from celery import shared_task
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
import json
import logging
import re
import time
import os
from core.utils import RedisClient, send_websocket_update, acquire_task_lock, release_task_lock
from apps.proxy.ts_proxy.channel_status import ChannelStatus
from apps.m3u.models import M3UAccount
from apps.epg.models import EPGSource
from apps.m3u.tasks import refresh_single_m3u_account
from apps.epg.tasks import refresh_epg_data
from .models import CoreSettings
from apps.channels.models import Stream, ChannelStream
from django.db import transaction

logger = logging.getLogger(__name__)

EPG_WATCH_DIR = '/data/epgs'
M3U_WATCH_DIR = '/data/m3us'
LOGO_WATCH_DIR = '/data/logos'
MIN_AGE_SECONDS = 6
STARTUP_SKIP_AGE = 30
REDIS_PREFIX = "processed_file:"
REDIS_TTL = 60 * 60 * 24 * 3  # expire keys after 3 days (optional)
SUPPORTED_LOGO_FORMATS = ['.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp', '.svg']

# Store the last known value to compare with new data
last_known_data = {}

# Store when we last logged certain recurring messages
_last_log_times = {}

# Don't repeat similar log messages more often than this (in seconds)
LOG_THROTTLE_SECONDS = 300  # 5 minutes

# Track if this is the first scan since startup
_first_scan_completed = False


def throttled_log(logger_method, message, key=None, *args, **kwargs):
    """Only log messages with the same key once per throttle period."""
    if key is None:
        # Use message as key if no explicit key provided
        key = message

    now = time.time()
    if key not in _last_log_times or (now - _last_log_times[key]) >= LOG_THROTTLE_SECONDS:
        logger_method(message, *args, **kwargs)
        _last_log_times[key] = now


@shared_task
def beat_periodic_task():
    fetch_channel_stats()
    scan_and_process_files()
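
# Illustrative wiring for the periodic entry point above. The schedule name and interval are
# assumptions (the real entry lives in the project's Celery/Django settings, not in this module),
# and the task path assumes this file is core/tasks.py:
#
#     CELERY_BEAT_SCHEDULE = {
#         "core-periodic-scan": {                       # hypothetical schedule name
#             "task": "core.tasks.beat_periodic_task",
#             "schedule": 10.0,                         # assumed interval in seconds
#         },
#     }
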

@shared_task
def scan_and_process_files():
    global _first_scan_completed
    redis_client = RedisClient.get_client()
    now = time.time()

    # Check if directories exist
    dirs_exist = all(os.path.exists(d) for d in [M3U_WATCH_DIR, EPG_WATCH_DIR, LOGO_WATCH_DIR])
    if not dirs_exist:
        throttled_log(
            logger.warning,
            f"Watch directories missing: M3U ({os.path.exists(M3U_WATCH_DIR)}), "
            f"EPG ({os.path.exists(EPG_WATCH_DIR)}), LOGO ({os.path.exists(LOGO_WATCH_DIR)})",
            "watch_dirs_missing"
        )

    # Process M3U files (skip the listing entirely if the directory is missing)
    m3u_files = []
    if os.path.exists(M3U_WATCH_DIR):
        m3u_files = [f for f in os.listdir(M3U_WATCH_DIR)
                     if os.path.isfile(os.path.join(M3U_WATCH_DIR, f))
                     and (f.endswith('.m3u') or f.endswith('.m3u8'))]

    m3u_processed = 0
    m3u_skipped = 0

    for filename in m3u_files:
        filepath = os.path.join(M3U_WATCH_DIR, filename)
        mtime = os.path.getmtime(filepath)
        age = now - mtime
        redis_key = REDIS_PREFIX + filepath
        stored_mtime = redis_client.get(redis_key)

        # Instead of assuming old files were processed, check if they exist in the database
        if not stored_mtime and age > STARTUP_SKIP_AGE:
            # Check if this file is already in the database
            existing_m3u = M3UAccount.objects.filter(file_path=filepath).exists()
            if existing_m3u:
                # Use trace level if not first scan
                if _first_scan_completed:
                    logger.trace(f"Skipping {filename}: Already exists in database")
                else:
                    logger.debug(f"Skipping {filename}: Already exists in database")
                redis_client.set(redis_key, mtime, ex=REDIS_TTL)
                m3u_skipped += 1
                continue
            else:
                logger.debug(f"Processing {filename} despite age: Not found in database")
                # Continue processing this file even though it's old

        # File too new — probably still being written
        if age < MIN_AGE_SECONDS:
            logger.debug(f"Skipping {filename}: Too new (age={age}s)")
            m3u_skipped += 1
            continue

        # Skip if we've already processed this mtime
        if stored_mtime and float(stored_mtime) >= mtime:
            # Use trace level if not first scan
            if _first_scan_completed:
                logger.trace(f"Skipping {filename}: Already processed this version")
            else:
                logger.debug(f"Skipping {filename}: Already processed this version")
            m3u_skipped += 1
            continue

        m3u_account, created = M3UAccount.objects.get_or_create(file_path=filepath, defaults={
            "name": filename,
            "is_active": CoreSettings.get_auto_import_mapped_files() in [True, "true", "True"],
        })

        redis_client.set(redis_key, mtime, ex=REDIS_TTL)

        # More descriptive creation logging that includes active status
        if created:
            if m3u_account.is_active:
                logger.info(f"Created new M3U account '{filename}' (active)")
            else:
                logger.info(f"Created new M3U account '{filename}' (inactive due to auto-import setting)")

        if not m3u_account.is_active:
            # Use trace level if not first scan
            if _first_scan_completed:
                logger.trace(f"Skipping {filename}: M3U account is inactive")
            else:
                logger.debug(f"Skipping {filename}: M3U account is inactive")
            m3u_skipped += 1
            continue

        # Log update for existing files (we've already logged creation above)
        if not created:
            logger.info(f"Detected update to existing M3U file: {filename}")

        logger.info(f"Queueing refresh for M3U file: {filename}")
        refresh_single_m3u_account.delay(m3u_account.id)
        m3u_processed += 1

        channel_layer = get_channel_layer()
        async_to_sync(channel_layer.group_send)(
            "updates",
            {
                "type": "update",
                "data": {"success": True, "type": "m3u_file", "filename": filename}
            },
        )

    logger.trace(f"M3U processing complete: {m3u_processed} processed, {m3u_skipped} skipped, {len(m3u_files)} total")
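
    # Summary of the drop-in flow above: copying e.g. "playlist.m3u" into /data/m3us creates
    # (or re-uses) an M3UAccount keyed on the file path, records the file's mtime in Redis under
    # "processed_file:<path>", and queues refresh_single_m3u_account for active accounts.
    # Re-saving the file with a newer mtime triggers another refresh; unchanged files are
    # skipped via the stored mtime.
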
    # Process EPG files
    try:
        epg_files = os.listdir(EPG_WATCH_DIR)
        logger.trace(f"Found {len(epg_files)} files in EPG directory")
    except Exception as e:
        logger.error(f"Error listing EPG directory: {e}")
        epg_files = []

    epg_processed = 0
    epg_skipped = 0
    epg_errors = 0

    for filename in epg_files:
        filepath = os.path.join(EPG_WATCH_DIR, filename)

        if not os.path.isfile(filepath):
            # Use trace level if not first scan
            if _first_scan_completed:
                logger.trace(f"Skipping {filename}: Not a file")
            else:
                logger.debug(f"Skipping {filename}: Not a file")
            epg_skipped += 1
            continue

        if not filename.endswith('.xml') and not filename.endswith('.gz') and not filename.endswith('.zip'):
            # Use trace level if not first scan
            if _first_scan_completed:
                logger.trace(f"Skipping {filename}: Not an XML, GZ or zip file")
            else:
                logger.debug(f"Skipping {filename}: Not an XML, GZ or zip file")
            epg_skipped += 1
            continue

        mtime = os.path.getmtime(filepath)
        age = now - mtime
        redis_key = REDIS_PREFIX + filepath
        stored_mtime = redis_client.get(redis_key)

        # Instead of assuming old files were processed, check if they exist in the database
        if not stored_mtime and age > STARTUP_SKIP_AGE:
            # Check if this file is already in the database
            existing_epg = EPGSource.objects.filter(file_path=filepath).exists()
            if existing_epg:
                # Use trace level if not first scan
                if _first_scan_completed:
                    logger.trace(f"Skipping {filename}: Already exists in database")
                else:
                    logger.debug(f"Skipping {filename}: Already exists in database")
                redis_client.set(redis_key, mtime, ex=REDIS_TTL)
                epg_skipped += 1
                continue
            else:
                logger.debug(f"Processing {filename} despite age: Not found in database")
                # Continue processing this file even though it's old

        # File too new — probably still being written
        if age < MIN_AGE_SECONDS:
            # Use trace level if not first scan
            if _first_scan_completed:
                logger.trace(f"Skipping {filename}: Too new, possibly still being written (age={age}s)")
            else:
                logger.debug(f"Skipping {filename}: Too new, possibly still being written (age={age}s)")
            epg_skipped += 1
            continue

        # Skip if we've already processed this mtime
        if stored_mtime and float(stored_mtime) >= mtime:
            # Use trace level if not first scan
            if _first_scan_completed:
                logger.trace(f"Skipping {filename}: Already processed this version")
            else:
                logger.debug(f"Skipping {filename}: Already processed this version")
            epg_skipped += 1
            continue

        try:
            epg_source, created = EPGSource.objects.get_or_create(file_path=filepath, defaults={
                "name": filename,
                "source_type": "xmltv",
                "is_active": CoreSettings.get_auto_import_mapped_files() in [True, "true", "True"],
            })

            redis_client.set(redis_key, mtime, ex=REDIS_TTL)

            # More descriptive creation logging that includes active status
            if created:
                if epg_source.is_active:
                    logger.info(f"Created new EPG source '{filename}' (active)")
                else:
                    logger.info(f"Created new EPG source '{filename}' (inactive due to auto-import setting)")

            if not epg_source.is_active:
                # Use trace level if not first scan
                if _first_scan_completed:
                    logger.trace(f"Skipping {filename}: EPG source is marked as inactive")
                else:
                    logger.debug(f"Skipping {filename}: EPG source is marked as inactive")
                epg_skipped += 1
                continue

            # Log update for existing files (we've already logged creation above)
            if not created:
                logger.info(f"Detected update to existing EPG file: {filename}")

            logger.info(f"Queueing refresh for EPG file: {filename}")
            refresh_epg_data.delay(epg_source.id)  # Trigger Celery task
            epg_processed += 1
        except Exception as e:
            logger.error(f"Error processing EPG file {filename}: {str(e)}", exc_info=True)
            epg_errors += 1
            continue

    logger.trace(f"EPG processing complete: {epg_processed} processed, {epg_skipped} skipped, {epg_errors} errors")

    # Process Logo files (including subdirectories)
    try:
        logo_files = []
        if os.path.exists(LOGO_WATCH_DIR):
            for root, dirs, files in os.walk(LOGO_WATCH_DIR):
                for filename in files:
                    logo_files.append(os.path.join(root, filename))
        logger.trace(f"Found {len(logo_files)} files in LOGO directory (including subdirectories)")
    except Exception as e:
        logger.error(f"Error listing LOGO directory: {e}")
        logo_files = []

    logo_processed = 0
    logo_skipped = 0
    logo_errors = 0

    for filepath in logo_files:
        filename = os.path.basename(filepath)

        if not os.path.isfile(filepath):
            if _first_scan_completed:
                logger.trace(f"Skipping {filename}: Not a file")
            else:
                logger.debug(f"Skipping {filename}: Not a file")
            logo_skipped += 1
            continue

        # Check if file has supported logo extension
        file_ext = os.path.splitext(filename)[1].lower()
        if file_ext not in SUPPORTED_LOGO_FORMATS:
            if _first_scan_completed:
                logger.trace(f"Skipping {filename}: Not a supported logo format")
            else:
                logger.debug(f"Skipping {filename}: Not a supported logo format")
            logo_skipped += 1
            continue

        mtime = os.path.getmtime(filepath)
        age = now - mtime
        redis_key = REDIS_PREFIX + filepath
        stored_mtime = redis_client.get(redis_key)

        # Check if logo already exists in database
        if not stored_mtime and age > STARTUP_SKIP_AGE:
            from apps.channels.models import Logo
            existing_logo = Logo.objects.filter(url=filepath).exists()
            if existing_logo:
                if _first_scan_completed:
                    logger.trace(f"Skipping {filename}: Already exists in database")
                else:
                    logger.debug(f"Skipping {filename}: Already exists in database")
                redis_client.set(redis_key, mtime, ex=REDIS_TTL)
                logo_skipped += 1
                continue
            else:
                logger.debug(f"Processing {filename} despite age: Not found in database")

        # File too new — probably still being written
        if age < MIN_AGE_SECONDS:
            if _first_scan_completed:
                logger.trace(f"Skipping {filename}: Too new, possibly still being written (age={age}s)")
            else:
                logger.debug(f"Skipping {filename}: Too new, possibly still being written (age={age}s)")
            logo_skipped += 1
            continue

        # Skip if we've already processed this mtime
        if stored_mtime and float(stored_mtime) >= mtime:
            if _first_scan_completed:
                logger.trace(f"Skipping {filename}: Already processed this version")
            else:
                logger.debug(f"Skipping {filename}: Already processed this version")
            logo_skipped += 1
            continue

        try:
            from apps.channels.models import Logo

            # Create logo entry with just the filename (without extension) as name
            logo_name = os.path.splitext(filename)[0]
            logo, created = Logo.objects.get_or_create(
                url=filepath,
                defaults={
                    "name": logo_name,
                }
            )

            redis_client.set(redis_key, mtime, ex=REDIS_TTL)

            if created:
                logger.info(f"Created new logo entry: {logo_name}")
            else:
                logger.debug(f"Logo entry already exists: {logo_name}")

            logo_processed += 1
        except Exception as e:
            logger.error(f"Error processing logo file {filename}: {str(e)}", exc_info=True)
            logo_errors += 1
            continue

    logger.trace(f"LOGO processing complete: {logo_processed} processed, {logo_skipped} skipped, {logo_errors} errors")

    # Send summary websocket update for logo processing
    if logo_processed > 0 or logo_errors > 0:
        send_websocket_update(
            "updates",
            "update",
            {
                "success": True,
                "type": "logo_processing_summary",
                "processed": logo_processed,
                "skipped": logo_skipped,
                "errors": logo_errors,
                "total_files": len(logo_files),
                "message": f"Logo processing complete: {logo_processed} processed, {logo_skipped} skipped, {logo_errors} errors"
            }
        )

    # Mark that the first scan is complete
    _first_scan_completed = True


def fetch_channel_stats():
    redis_client = RedisClient.get_client()

    try:
        # Basic info for all channels
        channel_pattern = "ts_proxy:channel:*:metadata"
        all_channels = []

        # Extract channel IDs from keys
        cursor = 0
        while True:
            cursor, keys = redis_client.scan(cursor, match=channel_pattern)
            for key in keys:
                channel_id_match = re.search(r"ts_proxy:channel:(.*):metadata", key.decode('utf-8'))
                if channel_id_match:
                    ch_id = channel_id_match.group(1)
                    channel_info = ChannelStatus.get_basic_channel_info(ch_id)
                    if channel_info:
                        all_channels.append(channel_info)

            if cursor == 0:
                break

        send_websocket_update(
            "updates",
            "update",
            {
                "success": True,
                "type": "channel_stats",
                "stats": json.dumps({'channels': all_channels, 'count': len(all_channels)})
            },
            collect_garbage=True
        )

        # Explicitly clean up large data structures
        all_channels = None
    except Exception as e:
        logger.error(f"Error in channel_status: {e}", exc_info=True)

    return
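
# Shape of the "channel_stats" update emitted above. The per-channel dictionaries come from
# ChannelStatus.get_basic_channel_info(), so their exact fields are defined elsewhere; note that
# "stats" is a JSON-encoded string, not a nested object:
#
#     {
#         "success": True,
#         "type": "channel_stats",
#         "stats": '{"channels": [...], "count": 2}'
#     }
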

@shared_task
def rehash_streams(keys):
    """
    Regenerate stream hashes for all streams based on current hash key configuration.
    This task checks for and blocks M3U refresh tasks to prevent conflicts.
    """
    from apps.channels.models import Stream
    from apps.m3u.models import M3UAccount

    logger.info("Starting stream rehash process")

    # Get all M3U account IDs for locking
    m3u_account_ids = list(M3UAccount.objects.filter(is_active=True).values_list('id', flat=True))

    # Check if any M3U refresh tasks are currently running
    blocked_accounts = []
    for account_id in m3u_account_ids:
        if not acquire_task_lock('refresh_single_m3u_account', account_id):
            blocked_accounts.append(account_id)

    if blocked_accounts:
        # Release any locks we did acquire
        for account_id in m3u_account_ids:
            if account_id not in blocked_accounts:
                release_task_lock('refresh_single_m3u_account', account_id)

        logger.warning(f"Rehash blocked: M3U refresh tasks running for accounts: {blocked_accounts}")

        # Send WebSocket notification to inform user
        send_websocket_update(
            'updates',
            'update',
            {
                "success": False,
                "type": "stream_rehash",
                "action": "blocked",
                "blocked_accounts": len(blocked_accounts),
                "total_accounts": len(m3u_account_ids),
                "message": f"Stream rehash blocked: M3U refresh tasks are currently running for {len(blocked_accounts)} accounts. Please try again later."
            }
        )

        return f"Rehash blocked: M3U refresh tasks running for {len(blocked_accounts)} accounts"

    acquired_locks = m3u_account_ids.copy()

    try:
        batch_size = 1000
        queryset = Stream.objects.all()

        # Track statistics
        total_processed = 0
        duplicates_merged = 0
        hash_keys = {}

        total_records = queryset.count()
        logger.info(f"Starting rehash of {total_records} streams with keys: {keys}")

        # Send initial WebSocket update
        send_websocket_update(
            'updates',
            'update',
            {
                "success": True,
                "type": "stream_rehash",
                "action": "starting",
                "progress": 0,
                "total_records": total_records,
                "message": f"Starting rehash of {total_records} streams"
            }
        )

        for start in range(0, total_records, batch_size):
            batch_processed = 0
            batch_duplicates = 0

            with transaction.atomic():
                batch = queryset[start:start + batch_size]

                for obj in batch:
                    # Generate new hash
                    group_name = obj.channel_group.name if obj.channel_group else None
                    new_hash = Stream.generate_hash_key(obj.name, obj.url, obj.tvg_id, keys,
                                                        m3u_id=obj.m3u_account_id, group=group_name)

                    # Check if this hash already exists in our tracking dict or in database
                    if new_hash in hash_keys:
                        # Found duplicate in current batch - merge the streams
                        existing_stream_id = hash_keys[new_hash]
                        existing_stream = Stream.objects.get(id=existing_stream_id)

                        # Move any channel relationships from duplicate to existing stream
                        # Handle potential unique constraint violations
                        for channel_stream in ChannelStream.objects.filter(stream_id=obj.id):
                            # Check if this channel already has a relationship with the target stream
                            existing_relationship = ChannelStream.objects.filter(
                                channel_id=channel_stream.channel_id,
                                stream_id=existing_stream_id
                            ).first()

                            if existing_relationship:
                                # Relationship already exists, just delete the duplicate
                                channel_stream.delete()
                            else:
                                # Safe to update the relationship
                                channel_stream.stream_id = existing_stream_id
                                channel_stream.save()

                        # Update the existing stream with the most recent data
                        if obj.updated_at > existing_stream.updated_at:
                            existing_stream.name = obj.name
                            existing_stream.url = obj.url
                            existing_stream.logo_url = obj.logo_url
                            existing_stream.tvg_id = obj.tvg_id
                            existing_stream.m3u_account = obj.m3u_account
                            existing_stream.channel_group = obj.channel_group
                            existing_stream.custom_properties = obj.custom_properties
                            existing_stream.last_seen = obj.last_seen
                            existing_stream.updated_at = obj.updated_at
                            existing_stream.save()

                        # Delete the duplicate
                        obj.delete()
                        batch_duplicates += 1
                    else:
                        # Check if hash already exists in database (from previous batches or existing data)
                        existing_stream = Stream.objects.filter(stream_hash=new_hash).exclude(id=obj.id).first()
                        if existing_stream:
                            # Found duplicate in database - merge the streams
                            # Move any channel relationships from duplicate to existing stream
                            # Handle potential unique constraint violations
                            for channel_stream in ChannelStream.objects.filter(stream_id=obj.id):
                                # Check if this channel already has a relationship with the target stream
                                existing_relationship = ChannelStream.objects.filter(
                                    channel_id=channel_stream.channel_id,
                                    stream_id=existing_stream.id
                                ).first()

                                if existing_relationship:
                                    # Relationship already exists, just delete the duplicate
                                    channel_stream.delete()
                                else:
                                    # Safe to update the relationship
                                    channel_stream.stream_id = existing_stream.id
                                    channel_stream.save()

                            # Update the existing stream with the most recent data
                            if obj.updated_at > existing_stream.updated_at:
                                existing_stream.name = obj.name
                                existing_stream.url = obj.url
                                existing_stream.logo_url = obj.logo_url
                                existing_stream.tvg_id = obj.tvg_id
                                existing_stream.m3u_account = obj.m3u_account
                                existing_stream.channel_group = obj.channel_group
                                existing_stream.custom_properties = obj.custom_properties
                                existing_stream.last_seen = obj.last_seen
                                existing_stream.updated_at = obj.updated_at
                                existing_stream.save()

                            # Delete the duplicate
                            obj.delete()
                            batch_duplicates += 1
                            hash_keys[new_hash] = existing_stream.id
                        else:
                            # Update hash for this stream
                            obj.stream_hash = new_hash
                            obj.save(update_fields=['stream_hash'])
                            hash_keys[new_hash] = obj.id

                    batch_processed += 1

            total_processed += batch_processed
            duplicates_merged += batch_duplicates

            # Calculate progress percentage
            progress_percent = int((total_processed / total_records) * 100)
            current_batch = start // batch_size + 1
            total_batches = (total_records // batch_size) + 1

            # Send progress update via WebSocket
            send_websocket_update(
                'updates',
                'update',
                {
                    "success": True,
                    "type": "stream_rehash",
                    "action": "processing",
                    "progress": progress_percent,
                    "batch": current_batch,
                    "total_batches": total_batches,
                    "processed": total_processed,
                    "duplicates_merged": duplicates_merged,
                    "message": f"Processed batch {current_batch}/{total_batches}: {batch_processed} streams, {batch_duplicates} duplicates merged"
                }
            )

            logger.info(f"Rehashed batch {current_batch}/{total_batches}: "
                        f"{batch_processed} processed, {batch_duplicates} duplicates merged")

        logger.info(f"Rehashing complete: {total_processed} streams processed, "
                    f"{duplicates_merged} duplicates merged")

        # Send completion update via WebSocket
        send_websocket_update(
            'updates',
            'update',
            {
                "success": True,
                "type": "stream_rehash",
                "action": "completed",
                "progress": 100,
                "total_processed": total_processed,
                "duplicates_merged": duplicates_merged,
                "final_count": total_processed - duplicates_merged,
                "message": f"Rehashing complete: {total_processed} streams processed, {duplicates_merged} duplicates merged"
            },
            collect_garbage=True  # Force garbage collection after completion
        )

        logger.info("Stream rehash completed successfully")
        return f"Successfully rehashed {total_processed} streams"

    except Exception as e:
        logger.error(f"Error during stream rehash: {e}")
        raise
    finally:
        # Always release all acquired M3U locks
        for account_id in acquired_locks:
            release_task_lock('refresh_single_m3u_account', account_id)
        logger.info(f"Released M3U task locks for {len(acquired_locks)} accounts")
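
# Illustrative invocation of the rehash task. The key names below are hypothetical; pass
# whichever field names Stream.generate_hash_key understands in this project:
#
#     rehash_streams.delay(["name", "url", "tvg_id"])
#
# While any refresh_single_m3u_account lock is held, the task returns early and emits a
# "blocked" stream_rehash WebSocket message instead of rehashing.
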

@shared_task
def cleanup_vod_persistent_connections():
    """Clean up stale VOD persistent connections."""
    try:
        from apps.proxy.vod_proxy.connection_manager import VODConnectionManager

        # Clean up connections older than 30 minutes
        VODConnectionManager.cleanup_stale_persistent_connections(max_age_seconds=1800)
        logger.info("VOD persistent connection cleanup completed")
    except Exception as e:
        logger.error(f"Error during VOD persistent connection cleanup: {e}")