# yourapp/tasks.py
from celery import shared_task
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
import json
import logging
import re
import time
import os
from core.utils import RedisClient, send_websocket_update
from apps.proxy.ts_proxy.channel_status import ChannelStatus
from apps.m3u.models import M3UAccount
from apps.epg.models import EPGSource
from apps.m3u.tasks import refresh_single_m3u_account
from apps.epg.tasks import refresh_epg_data
from .models import CoreSettings
from apps.channels.models import Stream, ChannelStream
from django.db import transaction

logger = logging.getLogger(__name__)

EPG_WATCH_DIR = '/data/epgs'
M3U_WATCH_DIR = '/data/m3us'
MIN_AGE_SECONDS = 6
STARTUP_SKIP_AGE = 30
REDIS_PREFIX = "processed_file:"
REDIS_TTL = 60 * 60 * 24 * 3  # expire keys after 3 days (optional)

# Store the last known value to compare with new data
last_known_data = {}

# Store when we last logged certain recurring messages
_last_log_times = {}
# Don't repeat similar log messages more often than this (in seconds)
LOG_THROTTLE_SECONDS = 300  # 5 minutes

# Track if this is the first scan since startup
_first_scan_completed = False


def throttled_log(logger_method, message, key=None, *args, **kwargs):
    """Only log messages with the same key once per throttle period."""
    if key is None:
        # Use message as key if no explicit key provided
        key = message

    now = time.time()
    if key not in _last_log_times or (now - _last_log_times[key]) >= LOG_THROTTLE_SECONDS:
        logger_method(message, *args, **kwargs)
        _last_log_times[key] = now


@shared_task
def beat_periodic_task():
    fetch_channel_stats()
    scan_and_process_files()


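# NOTE: Scheduling is configured outside this module. A minimal Celery beat entry
# driving beat_periodic_task might look like the sketch below; the entry name, the
# 30-second interval, and the settings-style configuration are illustrative
# assumptions, not taken from this project (the task path assumes Celery's default
# naming for a task defined in yourapp/tasks.py).
#
#     CELERY_BEAT_SCHEDULE = {
#         "core-periodic-scan": {
#             "task": "yourapp.tasks.beat_periodic_task",
#             "schedule": 30.0,  # seconds between runs (assumed)
#         },
#     }
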
@shared_task
def scan_and_process_files():
    global _first_scan_completed

    redis_client = RedisClient.get_client()
    now = time.time()

    # Check if directories exist
    dirs_exist = all(os.path.exists(d) for d in [M3U_WATCH_DIR, EPG_WATCH_DIR])
    if not dirs_exist:
        throttled_log(
            logger.warning,
            f"Watch directories missing: M3U ({os.path.exists(M3U_WATCH_DIR)}), EPG ({os.path.exists(EPG_WATCH_DIR)})",
            "watch_dirs_missing",
        )

    # Process M3U files
    m3u_files = [
        f for f in os.listdir(M3U_WATCH_DIR)
        if os.path.isfile(os.path.join(M3U_WATCH_DIR, f)) and (f.endswith('.m3u') or f.endswith('.m3u8'))
    ]

    m3u_processed = 0
    m3u_skipped = 0

    for filename in m3u_files:
        filepath = os.path.join(M3U_WATCH_DIR, filename)
        mtime = os.path.getmtime(filepath)
        age = now - mtime
        redis_key = REDIS_PREFIX + filepath
        stored_mtime = redis_client.get(redis_key)

        # Instead of assuming old files were processed, check if they exist in the database
        if not stored_mtime and age > STARTUP_SKIP_AGE:
            # Check if this file is already in the database
            existing_m3u = M3UAccount.objects.filter(file_path=filepath).exists()
            if existing_m3u:
                # Use trace level if not first scan
                if _first_scan_completed:
                    logger.trace(f"Skipping {filename}: Already exists in database")
                else:
                    logger.debug(f"Skipping {filename}: Already exists in database")
                redis_client.set(redis_key, mtime, ex=REDIS_TTL)
                m3u_skipped += 1
                continue
            else:
                logger.debug(f"Processing {filename} despite age: Not found in database")
                # Continue processing this file even though it's old

        # File too new - probably still being written
        if age < MIN_AGE_SECONDS:
            logger.debug(f"Skipping {filename}: Too new (age={age}s)")
            m3u_skipped += 1
            continue

        # Skip if we've already processed this mtime
        if stored_mtime and float(stored_mtime) >= mtime:
            # Use trace level if not first scan
            if _first_scan_completed:
                logger.trace(f"Skipping {filename}: Already processed this version")
            else:
                logger.debug(f"Skipping {filename}: Already processed this version")
            m3u_skipped += 1
            continue

        m3u_account, created = M3UAccount.objects.get_or_create(
            file_path=filepath,
            defaults={
                "name": filename,
                "is_active": CoreSettings.get_auto_import_mapped_files() in [True, "true", "True"],
            },
        )

        redis_client.set(redis_key, mtime, ex=REDIS_TTL)

        # More descriptive creation logging that includes active status
        if created:
            if m3u_account.is_active:
                logger.info(f"Created new M3U account '{filename}' (active)")
            else:
                logger.info(f"Created new M3U account '{filename}' (inactive due to auto-import setting)")

        if not m3u_account.is_active:
            # Use trace level if not first scan
            if _first_scan_completed:
                logger.trace(f"Skipping {filename}: M3U account is inactive")
            else:
                logger.debug(f"Skipping {filename}: M3U account is inactive")
            m3u_skipped += 1
            continue

        # Log update for existing files (we've already logged creation above)
        if not created:
            logger.info(f"Detected update to existing M3U file: {filename}")

        logger.info(f"Queueing refresh for M3U file: {filename}")
        refresh_single_m3u_account.delay(m3u_account.id)
        m3u_processed += 1

        channel_layer = get_channel_layer()
        async_to_sync(channel_layer.group_send)(
            "updates",
            {
                "type": "update",
                "data": {"success": True, "type": "m3u_file", "filename": filename},
            },
        )

    logger.trace(f"M3U processing complete: {m3u_processed} processed, {m3u_skipped} skipped, {len(m3u_files)} total")

    # Process EPG files
    try:
        epg_files = os.listdir(EPG_WATCH_DIR)
        logger.trace(f"Found {len(epg_files)} files in EPG directory")
    except Exception as e:
        logger.error(f"Error listing EPG directory: {e}")
        epg_files = []

    epg_processed = 0
    epg_skipped = 0
    epg_errors = 0

    for filename in epg_files:
        filepath = os.path.join(EPG_WATCH_DIR, filename)

        if not os.path.isfile(filepath):
            # Use trace level if not first scan
            if _first_scan_completed:
                logger.trace(f"Skipping {filename}: Not a file")
            else:
                logger.debug(f"Skipping {filename}: Not a file")
            epg_skipped += 1
            continue

        if not filename.endswith('.xml') and not filename.endswith('.gz') and not filename.endswith('.zip'):
            # Use trace level if not first scan
            if _first_scan_completed:
                logger.trace(f"Skipping {filename}: Not an XML, GZ or zip file")
            else:
                logger.debug(f"Skipping {filename}: Not an XML, GZ or zip file")
            epg_skipped += 1
            continue

        mtime = os.path.getmtime(filepath)
        age = now - mtime
        redis_key = REDIS_PREFIX + filepath
        stored_mtime = redis_client.get(redis_key)

        # Instead of assuming old files were processed, check if they exist in the database
        if not stored_mtime and age > STARTUP_SKIP_AGE:
            # Check if this file is already in the database
            existing_epg = EPGSource.objects.filter(file_path=filepath).exists()
            if existing_epg:
                # Use trace level if not first scan
                if _first_scan_completed:
                    logger.trace(f"Skipping {filename}: Already exists in database")
                else:
                    logger.debug(f"Skipping {filename}: Already exists in database")
                redis_client.set(redis_key, mtime, ex=REDIS_TTL)
                epg_skipped += 1
                continue
            else:
                logger.debug(f"Processing {filename} despite age: Not found in database")
                # Continue processing this file even though it's old

        # File too new - probably still being written
        if age < MIN_AGE_SECONDS:
            # Use trace level if not first scan
            if _first_scan_completed:
                logger.trace(f"Skipping {filename}: Too new, possibly still being written (age={age}s)")
            else:
                logger.debug(f"Skipping {filename}: Too new, possibly still being written (age={age}s)")
            epg_skipped += 1
            continue

        # Skip if we've already processed this mtime
        if stored_mtime and float(stored_mtime) >= mtime:
            # Use trace level if not first scan
            if _first_scan_completed:
                logger.trace(f"Skipping {filename}: Already processed this version")
            else:
                logger.debug(f"Skipping {filename}: Already processed this version")
            epg_skipped += 1
            continue

        try:
            epg_source, created = EPGSource.objects.get_or_create(
                file_path=filepath,
                defaults={
                    "name": filename,
                    "source_type": "xmltv",
                    "is_active": CoreSettings.get_auto_import_mapped_files() in [True, "true", "True"],
                },
            )

            redis_client.set(redis_key, mtime, ex=REDIS_TTL)

            # More descriptive creation logging that includes active status
            if created:
                if epg_source.is_active:
                    logger.info(f"Created new EPG source '{filename}' (active)")
                else:
                    logger.info(f"Created new EPG source '{filename}' (inactive due to auto-import setting)")

            if not epg_source.is_active:
                # Use trace level if not first scan
                if _first_scan_completed:
                    logger.trace(f"Skipping {filename}: EPG source is marked as inactive")
                else:
                    logger.debug(f"Skipping {filename}: EPG source is marked as inactive")
                epg_skipped += 1
                continue

            # Log update for existing files (we've already logged creation above)
            if not created:
                logger.info(f"Detected update to existing EPG file: {filename}")

            logger.info(f"Queueing refresh for EPG file: {filename}")
            refresh_epg_data.delay(epg_source.id)  # Trigger Celery task
            epg_processed += 1
        except Exception as e:
            logger.error(f"Error processing EPG file {filename}: {str(e)}", exc_info=True)
            epg_errors += 1
            continue

    logger.trace(f"EPG processing complete: {epg_processed} processed, {epg_skipped} skipped, {epg_errors} errors")

    # Mark that the first scan is complete
    _first_scan_completed = True


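# Helper used by beat_periodic_task above (plain function, not a Celery task): it scans
# Redis for ts_proxy channel metadata keys, collects per-channel summaries via
# ChannelStatus.get_basic_channel_info(), and broadcasts them to the "updates"
# websocket group as a "channel_stats" message.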
def fetch_channel_stats():
    redis_client = RedisClient.get_client()

    try:
        # Basic info for all channels
        channel_pattern = "ts_proxy:channel:*:metadata"
        all_channels = []

        # Extract channel IDs from keys
        cursor = 0
        while True:
            cursor, keys = redis_client.scan(cursor, match=channel_pattern)
            for key in keys:
                channel_id_match = re.search(r"ts_proxy:channel:(.*):metadata", key.decode('utf-8'))
                if channel_id_match:
                    ch_id = channel_id_match.group(1)
                    channel_info = ChannelStatus.get_basic_channel_info(ch_id)
                    if channel_info:
                        all_channels.append(channel_info)

            if cursor == 0:
                break

        send_websocket_update(
            "updates",
            "update",
            {
                "success": True,
                "type": "channel_stats",
                "stats": json.dumps({'channels': all_channels, 'count': len(all_channels)})
            },
            collect_garbage=True
        )

        # Explicitly clean up large data structures
        all_channels = None
    except Exception as e:
        logger.error(f"Error in channel_status: {e}", exc_info=True)
        return


@shared_task
def rehash_streams(keys):
    """
    Rehash all streams with new hash keys and handle duplicates.
    """
    batch_size = 1000
    queryset = Stream.objects.all()

    # Track statistics
    total_processed = 0
    duplicates_merged = 0
    hash_keys = {}

    total_records = queryset.count()
    logger.info(f"Starting rehash of {total_records} streams with keys: {keys}")

    # Send initial WebSocket update
    send_websocket_update(
        'updates', 'update',
        {
            "success": True,
            "type": "stream_rehash",
            "action": "starting",
            "progress": 0,
            "total_records": total_records,
            "message": f"Starting rehash of {total_records} streams"
        }
    )

    for start in range(0, total_records, batch_size):
        batch_processed = 0
        batch_duplicates = 0

        with transaction.atomic():
            batch = queryset[start:start + batch_size]

            for obj in batch:
                # Generate new hash
                new_hash = Stream.generate_hash_key(obj.name, obj.url, obj.tvg_id, keys)

                # Check if this hash already exists in our tracking dict or in database
                if new_hash in hash_keys:
                    # Found duplicate in current batch - merge the streams
                    existing_stream_id = hash_keys[new_hash]
                    existing_stream = Stream.objects.get(id=existing_stream_id)

                    # Move any channel relationships from duplicate to existing stream
                    ChannelStream.objects.filter(stream_id=obj.id).update(stream_id=existing_stream_id)

                    # Update the existing stream with the most recent data
                    if obj.updated_at > existing_stream.updated_at:
                        existing_stream.name = obj.name
                        existing_stream.url = obj.url
                        existing_stream.logo_url = obj.logo_url
                        existing_stream.tvg_id = obj.tvg_id
                        existing_stream.m3u_account = obj.m3u_account
                        existing_stream.channel_group = obj.channel_group
                        existing_stream.custom_properties = obj.custom_properties
                        existing_stream.last_seen = obj.last_seen
                        existing_stream.updated_at = obj.updated_at
                        existing_stream.save()

                    # Delete the duplicate
                    obj.delete()
                    batch_duplicates += 1
                else:
                    # Check if hash already exists in database (from previous batches or existing data)
                    existing_stream = Stream.objects.filter(stream_hash=new_hash).exclude(id=obj.id).first()
                    if existing_stream:
                        # Found duplicate in database - merge the streams
                        # Move any channel relationships from duplicate to existing stream
                        ChannelStream.objects.filter(stream_id=obj.id).update(stream_id=existing_stream.id)

                        # Update the existing stream with the most recent data
                        if obj.updated_at > existing_stream.updated_at:
                            existing_stream.name = obj.name
                            existing_stream.url = obj.url
                            existing_stream.logo_url = obj.logo_url
                            existing_stream.tvg_id = obj.tvg_id
                            existing_stream.m3u_account = obj.m3u_account
                            existing_stream.channel_group = obj.channel_group
                            existing_stream.custom_properties = obj.custom_properties
                            existing_stream.last_seen = obj.last_seen
                            existing_stream.updated_at = obj.updated_at
                            existing_stream.save()

                        # Delete the duplicate
                        obj.delete()
                        batch_duplicates += 1
                        hash_keys[new_hash] = existing_stream.id
                    else:
                        # Update hash for this stream
                        obj.stream_hash = new_hash
                        obj.save(update_fields=['stream_hash'])
                        hash_keys[new_hash] = obj.id

                batch_processed += 1

        total_processed += batch_processed
        duplicates_merged += batch_duplicates

        # Calculate progress percentage
        progress_percent = int((total_processed / total_records) * 100)
        current_batch = start // batch_size + 1
        total_batches = (total_records // batch_size) + 1

        # Send progress update via WebSocket
        send_websocket_update(
            'updates', 'update',
            {
                "success": True,
                "type": "stream_rehash",
                "action": "processing",
                "progress": progress_percent,
                "batch": current_batch,
                "total_batches": total_batches,
                "processed": total_processed,
                "duplicates_merged": duplicates_merged,
                "message": f"Processed batch {current_batch}/{total_batches}: {batch_processed} streams, "
                           f"{batch_duplicates} duplicates merged"
            }
        )

        logger.info(f"Rehashed batch {current_batch}/{total_batches}: "
                    f"{batch_processed} processed, {batch_duplicates} duplicates merged")

    logger.info(f"Rehashing complete: {total_processed} streams processed, "
                f"{duplicates_merged} duplicates merged")

    # Send completion update via WebSocket
    send_websocket_update(
        'updates', 'update',
        {
            "success": True,
            "type": "stream_rehash",
            "action": "completed",
            "progress": 100,
            "total_processed": total_processed,
            "duplicates_merged": duplicates_merged,
            "final_count": total_processed - duplicates_merged,
            "message": f"Rehashing complete: {total_processed} streams processed, {duplicates_merged} duplicates merged"
        },
        collect_garbage=True  # Force garbage collection after completion
    )

    return {
        'total_processed': total_processed,
        'duplicates_merged': duplicates_merged,
        'final_count': total_processed - duplicates_merged
    }
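
# Illustrative only: queueing the rehash task from elsewhere in the project. The
# field-name list below is an assumption; the real shape of `keys` is defined by
# Stream.generate_hash_key(), which receives it alongside each stream's name, url
# and tvg_id.
#
#     rehash_streams.delay(["name", "url", "tvg_id"])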