diff --git a/.github/workflows/base-image.yml b/.github/workflows/base-image.yml
index 955043fb..1da33d4f 100644
--- a/.github/workflows/base-image.yml
+++ b/.github/workflows/base-image.yml
@@ -6,11 +6,13 @@ on:
     paths:
       - 'docker/DispatcharrBase'
       - '.github/workflows/base-image.yml'
+      - 'requirements.txt'
   pull_request:
     branches: [ main, dev ]
     paths:
       - 'docker/DispatcharrBase'
       - '.github/workflows/base-image.yml'
+      - 'requirements.txt'
   workflow_dispatch: # Allow manual triggering

 permissions:
diff --git a/apps/channels/tasks.py b/apps/channels/tasks.py
index 88d040e8..6217a4ca 100755
--- a/apps/channels/tasks.py
+++ b/apps/channels/tasks.py
@@ -7,6 +7,7 @@ import time
 import json
 import subprocess
 from datetime import datetime
+import gc

 from celery import shared_task
 from django.utils.text import slugify
@@ -63,146 +64,162 @@ def match_epg_channels():
     4) If a match is found, we set channel.tvg_id
     5) Summarize and log results.
     """
-    logger.info("Starting EPG matching logic...")
-
-    # Attempt to retrieve a "preferred-region" if configured
     try:
-        region_obj = CoreSettings.objects.get(key="preferred-region")
-        region_code = region_obj.value.strip().lower()
-    except CoreSettings.DoesNotExist:
-        region_code = None
+        logger.info("Starting EPG matching logic...")

-    matched_channels = []
-    channels_to_update = []
+        # Attempt to retrieve a "preferred-region" if configured
+        try:
+            region_obj = CoreSettings.objects.get(key="preferred-region")
+            region_code = region_obj.value.strip().lower()
+        except CoreSettings.DoesNotExist:
+            region_code = None

-    # Get channels that don't have EPG data assigned
-    channels_without_epg = Channel.objects.filter(epg_data__isnull=True)
-    logger.info(f"Found {channels_without_epg.count()} channels without EPG data")
+        matched_channels = []
+        channels_to_update = []

-    channels_json = []
-    for channel in channels_without_epg:
-        # Normalize TVG ID - strip whitespace and convert to lowercase
-        normalized_tvg_id = channel.tvg_id.strip().lower() if channel.tvg_id else ""
-        if normalized_tvg_id:
-            logger.info(f"Processing channel {channel.id} '{channel.name}' with TVG ID='{normalized_tvg_id}'")
+        # Get channels that don't have EPG data assigned
+        channels_without_epg = Channel.objects.filter(epg_data__isnull=True)
+        logger.info(f"Found {channels_without_epg.count()} channels without EPG data")

-        channels_json.append({
-            "id": channel.id,
-            "name": channel.name,
-            "tvg_id": normalized_tvg_id, # Use normalized TVG ID
-            "original_tvg_id": channel.tvg_id, # Keep original for reference
-            "fallback_name": normalized_tvg_id if normalized_tvg_id else channel.name,
-            "norm_chan": normalize_name(normalized_tvg_id if normalized_tvg_id else channel.name)
-        })
+        channels_json = []
+        for channel in channels_without_epg:
+            # Normalize TVG ID - strip whitespace and convert to lowercase
+            normalized_tvg_id = channel.tvg_id.strip().lower() if channel.tvg_id else ""
+            if normalized_tvg_id:
+                logger.info(f"Processing channel {channel.id} '{channel.name}' with TVG ID='{normalized_tvg_id}'")

-    # Similarly normalize EPG data TVG IDs
-    epg_json = []
-    for epg in EPGData.objects.all():
-        normalized_tvg_id = epg.tvg_id.strip().lower() if epg.tvg_id else ""
-        epg_json.append({
-            'id': epg.id,
-            'tvg_id': normalized_tvg_id, # Use normalized TVG ID
-            'original_tvg_id': epg.tvg_id, # Keep original for reference
-            'name': epg.name,
-            'norm_name': normalize_name(epg.name),
-            'epg_source_id': epg.epg_source.id if epg.epg_source else None,
-        })
+            channels_json.append({
+                "id": channel.id,
+                "name":
channel.name, + "tvg_id": normalized_tvg_id, # Use normalized TVG ID + "original_tvg_id": channel.tvg_id, # Keep original for reference + "fallback_name": normalized_tvg_id if normalized_tvg_id else channel.name, + "norm_chan": normalize_name(normalized_tvg_id if normalized_tvg_id else channel.name) + }) - # Log available EPG data TVG IDs for debugging - unique_epg_tvg_ids = set(e['tvg_id'] for e in epg_json if e['tvg_id']) - logger.info(f"Available EPG TVG IDs: {', '.join(sorted(unique_epg_tvg_ids))}") + # Similarly normalize EPG data TVG IDs + epg_json = [] + for epg in EPGData.objects.all(): + normalized_tvg_id = epg.tvg_id.strip().lower() if epg.tvg_id else "" + epg_json.append({ + 'id': epg.id, + 'tvg_id': normalized_tvg_id, # Use normalized TVG ID + 'original_tvg_id': epg.tvg_id, # Keep original for reference + 'name': epg.name, + 'norm_name': normalize_name(epg.name), + 'epg_source_id': epg.epg_source.id if epg.epg_source else None, + }) - payload = { - "channels": channels_json, - "epg_data": epg_json, - "region_code": region_code, - } + # Log available EPG data TVG IDs for debugging + unique_epg_tvg_ids = set(e['tvg_id'] for e in epg_json if e['tvg_id']) + logger.info(f"Available EPG TVG IDs: {', '.join(sorted(unique_epg_tvg_ids))}") - with tempfile.NamedTemporaryFile(delete=False) as temp_file: - temp_file.write(json.dumps(payload).encode('utf-8')) - temp_file_path = temp_file.name - - process = subprocess.Popen( - ['python', '/app/scripts/epg_match.py', temp_file_path], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True - ) - - # Log stderr in real-time - for line in iter(process.stderr.readline, ''): - if line: - logger.info(line.strip()) - - process.stderr.close() - stdout, stderr = process.communicate() - - os.remove(temp_file_path) - - if process.returncode != 0: - return f"Failed to process EPG matching: {stderr}" - - result = json.loads(stdout) - # This returns lists of dicts, not model objects - channels_to_update_dicts = result["channels_to_update"] - matched_channels = result["matched_channels"] - - # Convert your dict-based 'channels_to_update' into real Channel objects - if channels_to_update_dicts: - # Extract IDs of the channels that need updates - channel_ids = [d["id"] for d in channels_to_update_dicts] - - # Fetch them from DB - channels_qs = Channel.objects.filter(id__in=channel_ids) - channels_list = list(channels_qs) - - # Build a map from channel_id -> epg_data_id (or whatever fields you need) - epg_mapping = { - d["id"]: d["epg_data_id"] for d in channels_to_update_dicts + payload = { + "channels": channels_json, + "epg_data": epg_json, + "region_code": region_code, } - # Populate each Channel object with the updated epg_data_id - for channel_obj in channels_list: - # The script sets 'epg_data_id' in the returned dict - # We either assign directly, or fetch the EPGData instance if needed. 
- channel_obj.epg_data_id = epg_mapping.get(channel_obj.id) + with tempfile.NamedTemporaryFile(delete=False) as temp_file: + temp_file.write(json.dumps(payload).encode('utf-8')) + temp_file_path = temp_file.name - # Now we have real model objects, so bulk_update will work - Channel.objects.bulk_update(channels_list, ["epg_data"]) + # After writing to the file but before subprocess + # Explicitly delete the large data structures + del payload + gc.collect() - total_matched = len(matched_channels) - if total_matched: - logger.info(f"Match Summary: {total_matched} channel(s) matched.") - for (cid, cname, tvg) in matched_channels: - logger.info(f" - Channel ID={cid}, Name='{cname}' => tvg_id='{tvg}'") - else: - logger.info("No new channels were matched.") + process = subprocess.Popen( + ['python', '/app/scripts/epg_match.py', temp_file_path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) - logger.info("Finished EPG matching logic.") + # Log stderr in real-time + for line in iter(process.stderr.readline, ''): + if line: + logger.info(line.strip()) - # Send update with additional information for refreshing UI - channel_layer = get_channel_layer() - associations = [ - {"channel_id": chan["id"], "epg_data_id": chan["epg_data_id"]} - for chan in channels_to_update_dicts - ] + process.stderr.close() + stdout, stderr = process.communicate() - async_to_sync(channel_layer.group_send)( - 'updates', - { - 'type': 'update', - "data": { - "success": True, - "type": "epg_match", - "refresh_channels": True, # Flag to tell frontend to refresh channels - "matches_count": total_matched, - "message": f"EPG matching complete: {total_matched} channel(s) matched", - "associations": associations # Add the associations data + os.remove(temp_file_path) + + if process.returncode != 0: + return f"Failed to process EPG matching: {stderr}" + + result = json.loads(stdout) + # This returns lists of dicts, not model objects + channels_to_update_dicts = result["channels_to_update"] + matched_channels = result["matched_channels"] + + # Explicitly clean up large objects + del stdout, result + gc.collect() + + # Convert your dict-based 'channels_to_update' into real Channel objects + if channels_to_update_dicts: + # Extract IDs of the channels that need updates + channel_ids = [d["id"] for d in channels_to_update_dicts] + + # Fetch them from DB + channels_qs = Channel.objects.filter(id__in=channel_ids) + channels_list = list(channels_qs) + + # Build a map from channel_id -> epg_data_id (or whatever fields you need) + epg_mapping = { + d["id"]: d["epg_data_id"] for d in channels_to_update_dicts } - } - ) - return f"Done. Matched {total_matched} channel(s)." + # Populate each Channel object with the updated epg_data_id + for channel_obj in channels_list: + # The script sets 'epg_data_id' in the returned dict + # We either assign directly, or fetch the EPGData instance if needed. 
+ channel_obj.epg_data_id = epg_mapping.get(channel_obj.id) + + # Now we have real model objects, so bulk_update will work + Channel.objects.bulk_update(channels_list, ["epg_data"]) + + total_matched = len(matched_channels) + if total_matched: + logger.info(f"Match Summary: {total_matched} channel(s) matched.") + for (cid, cname, tvg) in matched_channels: + logger.info(f" - Channel ID={cid}, Name='{cname}' => tvg_id='{tvg}'") + else: + logger.info("No new channels were matched.") + + logger.info("Finished EPG matching logic.") + + # Send update with additional information for refreshing UI + channel_layer = get_channel_layer() + associations = [ + {"channel_id": chan["id"], "epg_data_id": chan["epg_data_id"]} + for chan in channels_to_update_dicts + ] + + async_to_sync(channel_layer.group_send)( + 'updates', + { + 'type': 'update', + "data": { + "success": True, + "type": "epg_match", + "refresh_channels": True, # Flag to tell frontend to refresh channels + "matches_count": total_matched, + "message": f"EPG matching complete: {total_matched} channel(s) matched", + "associations": associations # Add the associations data + } + } + ) + + return f"Done. Matched {total_matched} channel(s)." + finally: + # Final cleanup + gc.collect() + # Use our standardized cleanup function for more thorough memory management + from core.utils import cleanup_memory + cleanup_memory(log_usage=True, force_collection=True) @shared_task diff --git a/apps/epg/tasks.py b/apps/epg/tasks.py index f102630f..60428484 100644 --- a/apps/epg/tasks.py +++ b/apps/epg/tasks.py @@ -5,9 +5,12 @@ import gzip import os import uuid import requests -import xml.etree.ElementTree as ET import time # Add import for tracking download progress from datetime import datetime, timedelta, timezone as dt_timezone +import gc # Add garbage collection module +import json +from lxml import etree # Using lxml exclusively +import psutil # Add import for memory tracking from celery import shared_task from django.conf import settings @@ -20,7 +23,7 @@ from asgiref.sync import async_to_sync from channels.layers import get_channel_layer from .models import EPGSource, EPGData, ProgramData -from core.utils import acquire_task_lock, release_task_lock +from core.utils import acquire_task_lock, release_task_lock, send_websocket_update, cleanup_memory logger = logging.getLogger(__name__) @@ -38,15 +41,18 @@ def send_epg_update(source_id, action, progress, **kwargs): # Add the additional key-value pairs from kwargs data.update(kwargs) - # Now, send the updated data dictionary - channel_layer = get_channel_layer() - async_to_sync(channel_layer.group_send)( - 'updates', - { - 'type': 'update', - 'data': data - } - ) + # Use the standardized update function with garbage collection for program parsing + # This is a high-frequency operation that needs more aggressive memory management + collect_garbage = action == "parsing_programs" and progress % 10 == 0 + send_websocket_update('updates', 'update', data, collect_garbage=collect_garbage) + + # Explicitly clear references + data = None + + # For high-frequency parsing, occasionally force additional garbage collection + # to prevent memory buildup + if action == "parsing_programs" and progress % 50 == 0: + gc.collect() def delete_epg_refresh_task_by_id(epg_id): @@ -112,6 +118,8 @@ def refresh_all_epg_data(): for source in active_sources: refresh_epg_data(source.id) + # Force garbage collection between sources + gc.collect() logger.info("Finished refresh_epg_data task.") return "EPG data refreshed." 
@@ -123,6 +131,7 @@ def refresh_epg_data(source_id): logger.debug(f"EPG refresh for {source_id} already running") return + source = None try: # Try to get the EPG source try: @@ -139,12 +148,16 @@ def refresh_epg_data(source_id): # Release the lock and exit release_task_lock('refresh_epg_data', source_id) + # Force garbage collection before exit + gc.collect() return f"EPG source {source_id} does not exist, task cleaned up" # The source exists but is not active, just skip processing if not source.is_active: logger.info(f"EPG source {source_id} is not active. Skipping.") release_task_lock('refresh_epg_data', source_id) + # Force garbage collection before exit + gc.collect() return # Continue with the normal processing... @@ -154,12 +167,16 @@ def refresh_epg_data(source_id): if not fetch_success: logger.error(f"Failed to fetch XMLTV for source {source.name}") release_task_lock('refresh_epg_data', source_id) + # Force garbage collection before exit + gc.collect() return parse_channels_success = parse_channels_only(source) if not parse_channels_success: logger.error(f"Failed to parse channels for source {source.name}") release_task_lock('refresh_epg_data', source_id) + # Force garbage collection before exit + gc.collect() return parse_programs_for_source(source) @@ -171,14 +188,18 @@ def refresh_epg_data(source_id): except Exception as e: logger.error(f"Error in refresh_epg_data for source {source_id}: {e}", exc_info=True) try: - source = EPGSource.objects.get(id=source_id) - source.status = 'error' - source.last_message = f"Error refreshing EPG data: {str(e)}" - source.save(update_fields=['status', 'last_message']) - send_epg_update(source_id, "refresh", 100, status="error", error=str(e)) + if source: + source.status = 'error' + source.last_message = f"Error refreshing EPG data: {str(e)}" + source.save(update_fields=['status', 'last_message']) + send_epg_update(source_id, "refresh", 100, status="error", error=str(e)) except Exception as inner_e: logger.error(f"Error updating source status: {inner_e}") finally: + # Clear references to ensure proper garbage collection + source = None + # Force garbage collection before releasing the lock + gc.collect() release_task_lock('refresh_epg_data', source_id) @@ -186,7 +207,6 @@ def fetch_xmltv(source): # Handle cases with local file but no URL if not source.url and source.file_path and os.path.exists(source.file_path): logger.info(f"Using existing local file for EPG source: {source.name} at {source.file_path}") - # Set the status to success in the database source.status = 'success' source.save(update_fields=['status']) @@ -206,8 +226,12 @@ def fetch_xmltv(source): send_epg_update(source.id, "downloading", 100, status="error", error="No URL provided and no valid local file exists") return False + # Clean up existing cache file if os.path.exists(source.get_cache_file()): - os.remove(source.get_cache_file()) + try: + os.remove(source.get_cache_file()) + except Exception as e: + logger.warning(f"Failed to remove existing cache file: {e}") logger.info(f"Fetching XMLTV data from source: {source.name}") try: @@ -235,7 +259,7 @@ def fetch_xmltv(source): send_epg_update(source.id, "downloading", 0) # Use streaming response to track download progress - with requests.get(source.url, headers=headers, stream=True, timeout=30) as response: + with requests.get(source.url, headers=headers, stream=True, timeout=60) as response: # Handle 404 specifically if response.status_code == 404: logger.error(f"EPG URL not found (404): {source.url}") @@ -304,9 +328,10 @@ def 
fetch_xmltv(source): downloaded = 0 start_time = time.time() last_update_time = start_time + update_interval = 0.5 # Only update every 0.5 seconds with open(cache_file, 'wb') as f: - for chunk in response.iter_content(chunk_size=8192): + for chunk in response.iter_content(chunk_size=16384): # Increased chunk size for better performance if chunk: f.write(chunk) @@ -326,19 +351,23 @@ def fetch_xmltv(source): # Time remaining (in seconds) time_remaining = (total_size - downloaded) / (speed * 1024) if speed > 0 and total_size > 0 else 0 - # Only send updates every 0.5 seconds to avoid flooding + # Only send updates at specified intervals to avoid flooding current_time = time.time() - if current_time - last_update_time >= 0.5 and progress > 0: + if current_time - last_update_time >= update_interval and progress > 0: last_update_time = current_time send_epg_update( source.id, "downloading", progress, - speed=speed, - elapsed_time=elapsed_time, - time_remaining=time_remaining + speed=round(speed, 2), + elapsed_time=round(elapsed_time, 1), + time_remaining=round(time_remaining, 1), + downloaded=f"{downloaded / (1024 * 1024):.2f} MB" ) + # Explicitly delete the chunk to free memory immediately + del chunk + # Send completion notification send_epg_update(source.id, "downloading", 100) @@ -424,6 +453,20 @@ def fetch_xmltv(source): ) send_epg_update(source.id, "downloading", 100, status="error", error=user_message) return False + except requests.exceptions.Timeout as e: + # Handle timeout errors specifically + error_message = str(e) + user_message = f"Timeout error: EPG source '{source.name}' took too long to respond" + logger.error(f"Timeout error fetching XMLTV from {source.name}: {e}", exc_info=True) + + # Update source status + source.status = 'error' + source.last_message = user_message + source.save(update_fields=['status', 'last_message']) + + # Send notifications + send_epg_update(source.id, "downloading", 100, status="error", error=user_message) + return False except Exception as e: error_message = str(e) logger.error(f"Error fetching XMLTV from {source.name}: {e}", exc_info=True) @@ -487,85 +530,336 @@ def parse_channels_only(source): # Update status to error source.status = 'error' source.last_message = f"No URL provided, cannot fetch EPG data" - source.save(update_fields=['status', 'last_message']) - send_epg_update(source.id, "parsing_channels", 100, status="error", error="No URL provided") - return False + source.save(update_fields=['updated_at']) - file_path = new_path + # Initialize process variable for memory tracking only in debug mode + try: + process = None + # Get current log level as a number rather than string + current_log_level = logger.getEffectiveLevel() - logger.info(f"Parsing channels from EPG file: {file_path}") - existing_epgs = {e.tvg_id: e for e in EPGData.objects.filter(epg_source=source)} + # Only track memory usage when log level is DEBUG (10) or more verbose + # This is more future-proof than string comparisons + should_log_memory = current_log_level <= logging.DEBUG or settings.DEBUG - # Read entire file (decompress if .gz) - if file_path.endswith('.gz'): - with open(file_path, 'rb') as gz_file: - decompressed = gzip.decompress(gz_file.read()) - xml_data = decompressed.decode('utf-8') - else: - with open(file_path, 'r', encoding='utf-8') as xml_file: - xml_data = xml_file.read() + if should_log_memory: + process = psutil.Process() + initial_memory = process.memory_info().rss / 1024 / 1024 + logger.debug(f"[parse_channels_only] Initial memory usage: 
{initial_memory:.2f} MB") + else: + logger.debug("Memory tracking disabled in production mode") + except (ImportError, NameError): + process = None + should_log_memory = False + logger.warning("psutil not available for memory tracking") - # Update progress to show file read completed - send_epg_update(source.id, "parsing_channels", 25) + # Replace full dictionary load with more efficient lookup set + existing_tvg_ids = set() + existing_epgs = {} # Initialize the dictionary that will lazily load objects + last_id = 0 + chunk_size = 5000 - root = ET.fromstring(xml_data) - channels = root.findall('channel') + while True: + tvg_id_chunk = set(EPGData.objects.filter( + epg_source=source, + id__gt=last_id + ).order_by('id').values_list('tvg_id', flat=True)[:chunk_size]) + + if not tvg_id_chunk: + break + + existing_tvg_ids.update(tvg_id_chunk) + last_id = EPGData.objects.filter(tvg_id__in=tvg_id_chunk).order_by('-id')[0].id + # Update progress to show file read starting + send_epg_update(source.id, "parsing_channels", 10) + + # Stream parsing instead of loading entire file at once + is_gzipped = file_path.endswith('.gz') epgs_to_create = [] epgs_to_update = [] + total_channels = 0 + processed_channels = 0 + batch_size = 500 # Process in batches to limit memory usage + progress = 0 # Initialize progress variable here - logger.info(f"Found {len(channels)} entries in {file_path}") + # Track memory at key points + if process: + logger.info(f"[parse_channels_only] Memory before opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB") - # Update progress to show parsing started - send_epg_update(source.id, "parsing_channels", 50) + try: + # Create a parser with the desired options + #parser = etree.XMLParser(huge_tree=True, remove_blank_text=True) - total_channels = len(channels) - for i, channel_elem in enumerate(channels): - tvg_id = channel_elem.get('id', '').strip() - if not tvg_id: - continue # skip blank/invalid IDs + # Count channels for progress reporting - use proper lxml approach + # Open the file first + logger.info(f"Opening file for initial channel count: {file_path}") + source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb') + if process: + logger.info(f"[parse_channels_only] Memory after opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB") - display_name = channel_elem.findtext('display-name', default=tvg_id).strip() + # Count channels + try: + total_channels = EPGData.objects.filter(epg_source=source).count() + logger.info(f"Found {total_channels} existing channels for this source") + except Exception as e: + logger.error(f"Error counting channels: {e}") + total_channels = 500 # Default estimate - if tvg_id in existing_epgs: - epg_obj = existing_epgs[tvg_id] - if epg_obj.name != display_name: - epg_obj.name = display_name - epgs_to_update.append(epg_obj) - else: - epgs_to_create.append(EPGData( - tvg_id=tvg_id, - name=display_name, - epg_source=source, - )) + # Close the file to reset position + logger.debug(f"Closing initial file handle") + source_file.close() + if process: + logger.debug(f"[parse_channels_only] Memory after closing initial file: {process.memory_info().rss / 1024 / 1024:.2f} MB") - # Send occasional progress updates - if i % 100 == 0 or i == total_channels - 1: - progress = 50 + int((i / total_channels) * 40) # Scale to 50-90% range - send_epg_update(source.id, "parsing_channels", progress) + # Update progress after counting + send_epg_update(source.id, "parsing_channels", 25, total_channels=total_channels) - # Update 
progress before database operations - send_epg_update(source.id, "parsing_channels", 90) + # Reset file position for actual processing + logger.debug(f"Re-opening file for channel parsing: {file_path}") + source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb') + if process: + logger.debug(f"[parse_channels_only] Memory after re-opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB") + # Change iterparse to look for both channel and programme elements + logger.debug(f"Creating iterparse context for channels and programmes") + channel_parser = etree.iterparse(source_file, events=('end',), tag=('channel', 'programme')) + if process: + logger.debug(f"[parse_channels_only] Memory after creating iterparse: {process.memory_info().rss / 1024 / 1024:.2f} MB") + + channel_count = 0 + total_elements_processed = 0 # Track total elements processed, not just channels + for _, elem in channel_parser: + total_elements_processed += 1 + + # If we encounter a programme element, we've processed all channels + # Break out of the loop to avoid memory spike + if elem.tag == 'programme': + logger.debug(f"[parse_channels_only] Found first programme element after processing {processed_channels} channels - exiting channel parsing") + # Clean up the element before breaking + elem.clear() + parent = elem.getparent() + if parent is not None: + parent.remove(elem) + break + + # Only process channel elements + if elem.tag == 'channel': + channel_count += 1 + tvg_id = elem.get('id', '').strip() + if tvg_id: + display_name = None + for child in elem: + if child.tag == 'display-name' and child.text: + display_name = child.text.strip() + break + + if not display_name: + display_name = tvg_id + + # Use lazy loading approach to reduce memory usage + if tvg_id in existing_tvg_ids: + # Only fetch the object if we need to update it and it hasn't been loaded yet + if tvg_id not in existing_epgs: + try: + # This loads the full EPG object from the database and caches it + existing_epgs[tvg_id] = EPGData.objects.get(tvg_id=tvg_id, epg_source=source) + except EPGData.DoesNotExist: + # Handle race condition where record was deleted + existing_tvg_ids.remove(tvg_id) + epgs_to_create.append(EPGData( + tvg_id=tvg_id, + name=display_name, + epg_source=source, + )) + logger.debug(f"[parse_channels_only] Added new channel to epgs_to_create 1: {tvg_id} - {display_name}") + continue + + # We use the cached object to check if the name has changed + epg_obj = existing_epgs[tvg_id] + if epg_obj.name != display_name: + # Only update if the name actually changed + epg_obj.name = display_name + epgs_to_update.append(epg_obj) + logger.debug(f"[parse_channels_only] Added channel to update to epgs_to_update: {tvg_id} - {display_name}") + else: + # This is a new channel that doesn't exist in our database + epgs_to_create.append(EPGData( + tvg_id=tvg_id, + name=display_name, + epg_source=source, + )) + logger.debug(f"[parse_channels_only] Added new channel to epgs_to_create 2: {tvg_id} - {display_name}") + + processed_channels += 1 + + # Batch processing + if len(epgs_to_create) >= batch_size: + logger.info(f"[parse_channels_only] Bulk creating {len(epgs_to_create)} EPG entries") + EPGData.objects.bulk_create(epgs_to_create, ignore_conflicts=True) + if process: + logger.info(f"[parse_channels_only] Memory after bulk_create: {process.memory_info().rss / 1024 / 1024:.2f} MB") + del epgs_to_create # Explicit deletion + epgs_to_create = [] + gc.collect() + if process: + logger.info(f"[parse_channels_only] Memory after 
gc.collect(): {process.memory_info().rss / 1024 / 1024:.2f} MB") + + if len(epgs_to_update) >= batch_size: + logger.info(f"[parse_channels_only] Bulk updating {len(epgs_to_update)} EPG entries") + if process: + logger.info(f"[parse_channels_only] Memory before bulk_update: {process.memory_info().rss / 1024 / 1024:.2f} MB") + EPGData.objects.bulk_update(epgs_to_update, ["name"]) + if process: + logger.info(f"[parse_channels_only] Memory after bulk_update: {process.memory_info().rss / 1024 / 1024:.2f} MB") + epgs_to_update = [] + # Force garbage collection + gc.collect() + + # Periodically clear the existing_epgs cache to prevent memory buildup + if processed_channels % 1000 == 0: + logger.info(f"[parse_channels_only] Clearing existing_epgs cache at {processed_channels} channels") + existing_epgs.clear() + gc.collect() + if process: + logger.info(f"[parse_channels_only] Memory after clearing cache: {process.memory_info().rss / 1024 / 1024:.2f} MB") + + # Send progress updates + if processed_channels % 100 == 0 or processed_channels == total_channels: + progress = 25 + int((processed_channels / total_channels) * 65) if total_channels > 0 else 90 + send_epg_update( + source.id, + "parsing_channels", + progress, + processed=processed_channels, + total=total_channels + ) + logger.debug(f"[parse_channels_only] Processed channel: {tvg_id} - {display_name}") + if process: + logger.info(f"[parse_channels_only] Memory before elem cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB") + # Clear memory + try: + # First clear the element's content + elem.clear() + + # Get the parent before we might lose reference to it + parent = elem.getparent() + if parent is not None: + # Clean up preceding siblings + while elem.getprevious() is not None: + del parent[0] + + # Try to fully detach this element from parent + try: + parent.remove(elem) + del elem + del parent + except (ValueError, KeyError, TypeError): + # Element might already be removed or detached + pass + cleanup_memory(log_usage=should_log_memory, force_collection=True) + + except Exception as e: + # Just log the error and continue - don't let cleanup errors stop processing + logger.debug(f"[parse_channels_only] Non-critical error during XML element cleanup: {e}") + if process: + logger.info(f"[parse_channels_only] Memory after elem cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB") + # Check if we should break early to avoid excessive sleep + if processed_channels >= total_channels and total_channels > 0: + logger.info(f"[parse_channels_only] Expected channel numbers hit, continuing - processed {processed_channels}/{total_channels}") + logger.debug(f"[parse_channels_only] Memory usage after {processed_channels}: {process.memory_info().rss / 1024 / 1024:.2f} MB") + #break + logger.debug(f"[parse_channels_only] Total elements processed: {total_elements_processed}") + # Add periodic forced cleanup based on TOTAL ELEMENTS, not just channels + # This ensures we clean up even if processing many non-channel elements + if total_elements_processed % 1000 == 0: + logger.info(f"[parse_channels_only] Performing preventative memory cleanup after {total_elements_processed} elements (found {processed_channels} channels)") + # Close and reopen the parser to release memory + if source_file and channel_parser: + # First clear element references + elem.clear() + if elem.getparent() is not None: + elem.getparent().remove(elem) + + # Reset parser state + del channel_parser + channel_parser = None + gc.collect() + + # Perform thorough cleanup + 
cleanup_memory(log_usage=should_log_memory, force_collection=True) + + # Create a new parser context - continue looking for both tags + # This doesn't restart from the beginning but continues from current position + channel_parser = etree.iterparse(source_file, events=('end',), tag=('channel', 'programme')) + logger.info(f"[parse_channels_only] Recreated parser context after memory cleanup") + + # Also do cleanup based on processed channels as before + elif processed_channels % 1000 == 0 and processed_channels > 0: + logger.info(f"[parse_channels_only] Performing preventative memory cleanup at {processed_channels} channels") + # Close and reopen the parser to release memory + if source_file and channel_parser: + # First clear element references + elem.clear() + if elem.getparent() is not None: + elem.getparent().remove(elem) + + # Reset parser state + del channel_parser + channel_parser = None + gc.collect() + + # Perform thorough cleanup + cleanup_memory(log_usage=should_log_memory, force_collection=True) + + # Create a new parser context + # This doesn't restart from the beginning but continues from current position + channel_parser = etree.iterparse(source_file, events=('end',), tag=('channel', 'programme')) + logger.info(f"[parse_channels_only] Recreated parser context after memory cleanup") + + if processed_channels == total_channels: + logger.info(f"[parse_channels_only] Processed all channels current memory: {process.memory_info().rss / 1024 / 1024:.2f} MB") + + except (etree.XMLSyntaxError, Exception) as xml_error: + logger.error(f"[parse_channels_only] XML parsing failed: {xml_error}") + # Update status to error + source.status = 'error' + source.last_message = f"Error parsing XML file: {str(xml_error)}" + source.save(update_fields=['status', 'last_message']) + send_epg_update(source.id, "parsing_channels", 100, status="error", error=str(xml_error)) + return False + if process: + logger.debug(f"[parse_channels_only] Memory before final batch creation: {process.memory_info().rss / 1024 / 1024:.2f} MB") + + # Process any remaining items if epgs_to_create: EPGData.objects.bulk_create(epgs_to_create, ignore_conflicts=True) + logger.debug(f"[parse_channels_only] Created final batch of {len(epgs_to_create)} EPG entries") if epgs_to_update: EPGData.objects.bulk_update(epgs_to_update, ["name"]) + logger.debug(f"[parse_channels_only] Updated final batch of {len(epgs_to_update)} EPG entries") + if process: + logger.debug(f"[parse_channels_only] Memory after final batch creation: {process.memory_info().rss / 1024 / 1024:.2f} MB") + + # Update source status with channel count + source.status = 'success' + source.last_message = f"Successfully parsed {processed_channels} channels" + source.save(update_fields=['status', 'last_message']) # Send completion notification - send_epg_update(source.id, "parsing_channels", 100, status="success") - - channel_layer = get_channel_layer() - async_to_sync(channel_layer.group_send)( - 'updates', - { - 'type': 'update', - "data": {"success": True, "type": "epg_channels"} - } + send_epg_update( + source.id, + "parsing_channels", + 100, + status="success", + channels_count=processed_channels ) - logger.info("Finished parsing channel info.") + send_websocket_update('updates', 'update', {"success": True, "type": "epg_channels"}) + + logger.info(f"Finished parsing channel info. 
Found {processed_channels} channels.") + return True except FileNotFoundError: @@ -584,6 +878,39 @@ def parse_channels_only(source): source.save(update_fields=['status', 'last_message']) send_epg_update(source.id, "parsing_channels", 100, status="error", error=str(e)) return False + finally: + # Add more detailed cleanup in finally block + logger.debug("In finally block, ensuring cleanup") + try: + if 'channel_parser' in locals(): + del channel_parser + if 'elem' in locals(): + del elem + if 'parent' in locals(): + del parent + + if 'source_file' in locals(): + source_file.close() + del source_file + # Clear remaining large data structures + existing_epgs.clear() + epgs_to_create.clear() + epgs_to_update.clear() + existing_epgs = None + epgs_to_create = None + epgs_to_update = None + cleanup_memory(log_usage=should_log_memory, force_collection=True) + except Exception as e: + logger.warning(f"Cleanup error: {e}") + + try: + if process: + final_memory = process.memory_info().rss / 1024 / 1024 + logger.debug(f"[parse_channels_only] Final memory usage: {final_memory:.2f} MB") + process = None + except: + pass + @shared_task @@ -592,211 +919,322 @@ def parse_programs_for_tvg_id(epg_id): logger.info(f"Program parse for {epg_id} already in progress, skipping duplicate task") return "Task already running" - epg = EPGData.objects.get(id=epg_id) - epg_source = epg.epg_source + source_file = None + program_parser = None + programs_to_create = [] + try: + # Add memory tracking only in trace mode or higher + try: + process = None + # Get current log level as a number + current_log_level = logger.getEffectiveLevel() - if not Channel.objects.filter(epg_data=epg).exists(): - logger.info(f"No channels matched to EPG {epg.tvg_id}") - release_task_lock('parse_epg_programs', epg_id) - return + # Only track memory usage when log level is TRACE or more verbose + should_log_memory = current_log_level <= 5 - logger.info(f"Refreshing program data for tvg_id: {epg.tvg_id}") + if should_log_memory: + process = psutil.Process() + initial_memory = process.memory_info().rss / 1024 / 1024 + logger.info(f"[parse_programs_for_tvg_id] Initial memory usage: {initial_memory:.2f} MB") + mem_before = initial_memory + else: + logger.debug("Memory tracking disabled in production mode") + except ImportError: + process = None + should_log_memory = False - # First, remove all existing programs - ProgramData.objects.filter(epg=epg).delete() + epg = EPGData.objects.get(id=epg_id) + epg_source = epg.epg_source - file_path = epg_source.file_path - if not file_path: - file_path = epg_source.get_cache_file() - - # Check if the file exists - if not os.path.exists(file_path): - logger.error(f"EPG file not found at: {file_path}") - - # Update the file path in the database - new_path = epg_source.get_cache_file() - logger.info(f"Updating file_path from '{file_path}' to '{new_path}'") - epg_source.file_path = new_path - epg_source.save(update_fields=['file_path']) - - # Fetch new data before continuing - if epg_source.url: - logger.info(f"Fetching new EPG data from URL: {epg_source.url}") - # Properly check the return value from fetch_xmltv - fetch_success = fetch_xmltv(epg_source) - - # If fetch was not successful or the file still doesn't exist, abort - if not fetch_success: - logger.error(f"Failed to fetch EPG data, cannot parse programs for tvg_id: {epg.tvg_id}") - # Update status to error if not already set - epg_source.status = 'error' - epg_source.last_message = f"Failed to download EPG data, cannot parse programs" - 
epg_source.save(update_fields=['status', 'last_message']) - send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="Failed to download EPG file") - release_task_lock('parse_epg_programs', epg_id) - return - - # Also check if the file exists after download - if not os.path.exists(new_path): - logger.error(f"Failed to fetch EPG data, file still missing at: {new_path}") - epg_source.status = 'error' - epg_source.last_message = f"Failed to download EPG data, file missing after download" - epg_source.save(update_fields=['status', 'last_message']) - send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="File not found after download") - release_task_lock('parse_epg_programs', epg_id) - return - else: - logger.error(f"No URL provided for EPG source {epg_source.name}, cannot fetch new data") - # Update status to error - epg_source.status = 'error' - epg_source.last_message = f"No URL provided, cannot fetch EPG data" - epg_source.save(update_fields=['status', 'last_message']) - send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="No URL provided") + if not Channel.objects.filter(epg_data=epg).exists(): + logger.info(f"No channels matched to EPG {epg.tvg_id}") release_task_lock('parse_epg_programs', epg_id) return - file_path = new_path + logger.info(f"Refreshing program data for tvg_id: {epg.tvg_id}") - # Read entire file (decompress if .gz) - try: - if file_path.endswith('.gz'): - with open(file_path, 'rb') as gz_file: - decompressed = gzip.decompress(gz_file.read()) - xml_data = decompressed.decode('utf-8') - else: - with open(file_path, 'r', encoding='utf-8') as xml_file: - xml_data = xml_file.read() - except FileNotFoundError: - logger.error(f"EPG file not found at: {file_path}") - release_task_lock('parse_epg_programs', epg_id) - return - except Exception as e: - logger.error(f"Error reading EPG file {file_path}: {e}", exc_info=True) - release_task_lock('parse_epg_programs', epg_id) - return + # Optimize deletion with a single delete query instead of chunking + # This is faster for most database engines + ProgramData.objects.filter(epg=epg).delete() - root = ET.fromstring(xml_data) + file_path = epg_source.file_path + if not file_path: + file_path = epg_source.get_cache_file() - # Find only elements for this tvg_id - matched_programmes = [p for p in root.findall('programme') if p.get('channel') == epg.tvg_id] - logger.debug(f"Found {len(matched_programmes)} programmes for tvg_id={epg.tvg_id}") + # Check if the file exists + if not os.path.exists(file_path): + logger.error(f"EPG file not found at: {file_path}") - programs_to_create = [] - for prog in matched_programmes: - start_time = parse_xmltv_time(prog.get('start')) - end_time = parse_xmltv_time(prog.get('stop')) - title = prog.findtext('title', default='No Title') - desc = prog.findtext('desc', default='') - sub_title = prog.findtext('sub-title', default='') + # Update the file path in the database + new_path = epg_source.get_cache_file() + logger.info(f"Updating file_path from '{file_path}' to '{new_path}'") + epg_source.file_path = new_path + epg_source.save(update_fields=['file_path']) - # Extract custom properties - custom_props = {} + # Fetch new data before continuing + if epg_source.url: + logger.info(f"Fetching new EPG data from URL: {epg_source.url}") + # Properly check the return value from fetch_xmltv + fetch_success = fetch_xmltv(epg_source) - # Extract categories - categories = [] - for cat_elem in prog.findall('category'): - if cat_elem.text and 
cat_elem.text.strip(): - categories.append(cat_elem.text.strip()) - if categories: - custom_props['categories'] = categories + # If fetch was not successful or the file still doesn't exist, abort + if not fetch_success: + logger.error(f"Failed to fetch EPG data, cannot parse programs for tvg_id: {epg.tvg_id}") + # Update status to error if not already set + epg_source.status = 'error' + epg_source.last_message = f"Failed to download EPG data, cannot parse programs" + epg_source.save(update_fields=['status', 'last_message']) + send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="Failed to download EPG file") + release_task_lock('parse_epg_programs', epg_id) + return - # Extract episode numbers - for ep_num in prog.findall('episode-num'): - system = ep_num.get('system', '') - if system == 'xmltv_ns' and ep_num.text: - # Parse XMLTV episode-num format (season.episode.part) - parts = ep_num.text.split('.') - if len(parts) >= 2: - if parts[0].strip() != '': + # Also check if the file exists after download + if not os.path.exists(new_path): + logger.error(f"Failed to fetch EPG data, file still missing at: {new_path}") + epg_source.status = 'error' + epg_source.last_message = f"Failed to download EPG data, file missing after download" + epg_source.save(update_fields=['status', 'last_message']) + send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="File not found after download") + release_task_lock('parse_epg_programs', epg_id) + return + else: + logger.error(f"No URL provided for EPG source {epg_source.name}, cannot fetch new data") + # Update status to error + epg_source.status = 'error' + epg_source.last_message = f"No URL provided, cannot fetch EPG data" + epg_source.save(update_fields=['status', 'last_message']) + send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="No URL provided") + release_task_lock('parse_epg_programs', epg_id) + return + + file_path = new_path + + # Use streaming parsing to reduce memory usage + is_gzipped = file_path.endswith('.gz') + + logger.info(f"Parsing programs for tvg_id={epg.tvg_id} from {file_path}") + + # Memory usage tracking + if process: + mem_before = process.memory_info().rss / 1024 / 1024 + logger.info(f"[parse_programs_for_tvg_id] Memory before parsing {epg.tvg_id} - {mem_before:.2f} MB") + + programs_to_create = [] + batch_size = 1000 # Process in batches to limit memory usage + programs_processed = 0 + + try: + # Open the file properly + source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb') + + # Stream parse the file using lxml's iterparse + program_parser = etree.iterparse(source_file, events=('end',), tag='programme') + + for _, elem in program_parser: + if elem.get('channel') == epg.tvg_id: + try: + start_time = parse_xmltv_time(elem.get('start')) + end_time = parse_xmltv_time(elem.get('stop')) + title = None + desc = None + sub_title = None + + # Efficiently process child elements + for child in elem: + if child.tag == 'title': + title = child.text or 'No Title' + elif child.tag == 'desc': + desc = child.text or '' + elif child.tag == 'sub-title': + sub_title = child.text or '' + + if not title: + title = 'No Title' + + # Extract custom properties + custom_props = extract_custom_properties(elem) + custom_properties_json = None + + if custom_props: + logger.trace(f"Number of custom properties: {len(custom_props)}") + try: + custom_properties_json = json.dumps(custom_props) + except Exception as e: + logger.error(f"Error serializing custom 
properties to JSON: {e}", exc_info=True) + + programs_to_create.append(ProgramData( + epg=epg, + start_time=start_time, + end_time=end_time, + title=title, + description=desc, + sub_title=sub_title, + tvg_id=epg.tvg_id, + custom_properties=custom_properties_json + )) + programs_processed += 1 + del custom_props + del custom_properties_json + del start_time + del end_time + del title + del desc + del sub_title + elem.clear() + parent = elem.getparent() + if parent is not None: + while elem.getprevious() is not None: + del parent[0] + parent.remove(elem) + del elem + del parent + #gc.collect() + # Batch processing + if len(programs_to_create) >= batch_size: + ProgramData.objects.bulk_create(programs_to_create) + logger.debug(f"Saved batch of {len(programs_to_create)} programs for {epg.tvg_id}") + programs_to_create = [] + # Only call gc.collect() every few batches + if programs_processed % (batch_size * 5) == 0: + gc.collect() + + except Exception as e: + logger.error(f"Error processing program for {epg.tvg_id}: {e}", exc_info=True) + else: + # Immediately clean up non-matching elements to reduce memory pressure + elem.clear() + parent = elem.getparent() + if parent is not None: + while elem.getprevious() is not None: + del parent[0] try: - season = int(parts[0]) + 1 # XMLTV format is zero-based - custom_props['season'] = season - except ValueError: + parent.remove(elem) + except (ValueError, KeyError, TypeError): pass - if parts[1].strip() != '': + del elem + continue + + # Important: Clear the element to avoid memory leaks using a more robust approach + try: + # First clear the element's content + elem.clear() + # Get the parent before we might lose reference to it + parent = elem.getparent() + if parent is not None: + # Clean up preceding siblings + while elem.getprevious() is not None: + del parent[0] + # Try to fully detach this element from parent try: - episode = int(parts[1]) + 1 # XMLTV format is zero-based - custom_props['episode'] = episode - except ValueError: + parent.remove(elem) + del elem + del parent + except (ValueError, KeyError, TypeError): + # Element might already be removed or detached pass - elif system == 'onscreen' and ep_num.text: - # Just store the raw onscreen format - custom_props['onscreen_episode'] = ep_num.text.strip() - # Extract ratings - for rating_elem in prog.findall('rating'): - if rating_elem.findtext('value'): - custom_props['rating'] = rating_elem.findtext('value').strip() - if rating_elem.get('system'): - custom_props['rating_system'] = rating_elem.get('system') - break # Just use the first rating + except Exception as e: + # Just log the error and continue - don't let cleanup errors stop processing + logger.trace(f"Non-critical error during XML element cleanup: {e}") - # Extract credits (actors, directors, etc.) 
- credits_elem = prog.find('credits') - if credits_elem is not None: - credits = {} - for credit_type in ['director', 'actor', 'writer', 'presenter', 'producer']: - elements = credits_elem.findall(credit_type) - if elements: - names = [e.text.strip() for e in elements if e.text and e.text.strip()] - if names: - credits[credit_type] = names - if credits: - custom_props['credits'] = credits + # Make sure to close the file and release parser resources + if source_file: + source_file.close() + source_file = None - # Extract other common program metadata - if prog.findtext('date'): - custom_props['year'] = prog.findtext('date').strip()[:4] # Just the year part + if program_parser: + program_parser = None - if prog.findtext('country'): - custom_props['country'] = prog.findtext('country').strip() + gc.collect() - for icon_elem in prog.findall('icon'): - if icon_elem.get('src'): - custom_props['icon'] = icon_elem.get('src') - break # Just use the first icon + except etree.XMLSyntaxError as xml_error: + logger.error(f"XML syntax error parsing program data: {xml_error}") + raise + except Exception as e: + logger.error(f"Error parsing XML for programs: {e}", exc_info=True) + raise + finally: + # Ensure file is closed even if an exception occurs + if source_file: + source_file.close() + source_file = None + # Memory tracking after processing + if process: + mem_after = process.memory_info().rss / 1024 / 1024 + logger.info(f"[parse_programs_for_tvg_id] Memory after parsing 1 {epg.tvg_id} - {programs_processed} programs: {mem_after:.2f} MB (change: {mem_after-mem_before:.2f} MB)") - for kw in ['previously-shown', 'premiere', 'new']: - if prog.find(kw) is not None: - custom_props[kw.replace('-', '_')] = True + # Process any remaining items + if programs_to_create: + ProgramData.objects.bulk_create(programs_to_create) + logger.debug(f"Saved final batch of {len(programs_to_create)} programs for {epg.tvg_id}") + programs_to_create = None + custom_props = None + custom_properties_json = None - # Convert custom_props to JSON string if not empty - custom_properties_json = None - if custom_props: - import json + + logger.info(f"Completed program parsing for tvg_id={epg.tvg_id}.") + finally: + # Reset internal caches and pools that lxml might be keeping + try: + etree.clear_error_log() + except: + pass + # Explicit cleanup of all potentially large objects + if source_file: try: - custom_properties_json = json.dumps(custom_props) - except Exception as e: - logger.error(f"Error serializing custom properties to JSON: {e}", exc_info=True) + source_file.close() + except: + pass + source_file = None + program_parser = None + programs_to_create = None - programs_to_create.append(ProgramData( - epg=epg, - start_time=start_time, - end_time=end_time, - title=title, - description=desc, - sub_title=sub_title, - tvg_id=epg.tvg_id, - custom_properties=custom_properties_json - )) + epg_source = None + # Add comprehensive cleanup before releasing lock + cleanup_memory(log_usage=should_log_memory, force_collection=True) + # Memory tracking after processing + if process: + mem_after = process.memory_info().rss / 1024 / 1024 + logger.info(f"[parse_programs_for_tvg_id] Final memory usage {epg.tvg_id} - {programs_processed} programs: {mem_after:.2f} MB (change: {mem_after-mem_before:.2f} MB)") + process = None + epg = None + programs_processed = None + release_task_lock('parse_epg_programs', epg_id) - ProgramData.objects.bulk_create(programs_to_create) - - release_task_lock('parse_epg_programs', epg_id) - - logger.info(f"Completed 
program parsing for tvg_id={epg.tvg_id}.") def parse_programs_for_source(epg_source, tvg_id=None): # Send initial programs parsing notification send_epg_update(epg_source.id, "parsing_programs", 0) + should_log_memory = False + process = None + initial_memory = 0 + + # Add memory tracking only in trace mode or higher + try: + # Get current log level as a number + current_log_level = logger.getEffectiveLevel() + + # Only track memory usage when log level is TRACE or more verbose + should_log_memory = current_log_level <= 5 or settings.DEBUG # Assuming TRACE is level 5 or lower + + if should_log_memory: + process = psutil.Process() + initial_memory = process.memory_info().rss / 1024 / 1024 + logger.info(f"[parse_programs_for_source] Initial memory usage: {initial_memory:.2f} MB") + else: + logger.debug("Memory tracking disabled in production mode") + except ImportError: + logger.warning("psutil not available for memory tracking") + process = None + should_log_memory = False try: - epg_entries = EPGData.objects.filter(epg_source=epg_source) - total_entries = epg_entries.count() - processed = 0 + # Process EPG entries in batches rather than all at once + batch_size = 20 # Process fewer channels at once to reduce memory usage + epg_count = EPGData.objects.filter(epg_source=epg_source).count() - if total_entries == 0: + if epg_count == 0: logger.info(f"No EPG entries found for source: {epg_source.name}") # Update status - this is not an error, just no entries epg_source.status = 'success' @@ -804,31 +1242,51 @@ def parse_programs_for_source(epg_source, tvg_id=None): send_epg_update(epg_source.id, "parsing_programs", 100, status="success") return True - logger.info(f"Parsing programs for {total_entries} EPG entries from source: {epg_source.name}") + logger.info(f"Parsing programs for {epg_count} EPG entries from source: {epg_source.name}") failed_entries = [] program_count = 0 channel_count = 0 updated_count = 0 + processed = 0 + # Process in batches using cursor-based approach to limit memory usage + last_id = 0 + while True: + # Get a batch of EPG entries + batch_entries = list(EPGData.objects.filter( + epg_source=epg_source, + id__gt=last_id + ).order_by('id')[:batch_size]) - for epg in epg_entries: - if epg.tvg_id: - try: - result = parse_programs_for_tvg_id(epg.id) - if result == "Task already running": - logger.info(f"Program parse for {epg.id} already in progress, skipping") + if not batch_entries: + break # No more entries to process - processed += 1 - progress = min(95, int((processed / total_entries) * 100)) if total_entries > 0 else 50 - send_epg_update(epg_source.id, "parsing_programs", progress) - except Exception as e: - logger.error(f"Error parsing programs for tvg_id={epg.tvg_id}: {e}", exc_info=True) - failed_entries.append(f"{epg.tvg_id}: {str(e)}") + # Update last_id for next iteration + last_id = batch_entries[-1].id + + # Process this batch + for epg in batch_entries: + if epg.tvg_id: + try: + result = parse_programs_for_tvg_id(epg.id) + if result == "Task already running": + logger.info(f"Program parse for {epg.id} already in progress, skipping") + + processed += 1 + progress = min(95, int((processed / epg_count) * 100)) if epg_count > 0 else 50 + send_epg_update(epg_source.id, "parsing_programs", progress) + except Exception as e: + logger.error(f"Error parsing programs for tvg_id={epg.tvg_id}: {e}", exc_info=True) + failed_entries.append(f"{epg.tvg_id}: {str(e)}") + + # Force garbage collection after each batch + batch_entries = None # Remove reference to help garbage 
collection + gc.collect() # If there were failures, include them in the message but continue if failed_entries: epg_source.status = EPGSource.STATUS_SUCCESS # Still mark as success if some processed - error_summary = f"Failed to parse {len(failed_entries)} of {total_entries} entries" + error_summary = f"Failed to parse {len(failed_entries)} of {epg_count} entries" stats_summary = f"Processed {program_count} programs across {channel_count} channels. Updated: {updated_count}." epg_source.last_message = f"{stats_summary} Warning: {error_summary}" epg_source.updated_at = timezone.now() @@ -838,6 +1296,11 @@ def parse_programs_for_source(epg_source, tvg_id=None): send_epg_update(epg_source.id, "parsing_programs", 100, status="success", message=epg_source.last_message) + + # Explicitly release memory of large lists before returning + del failed_entries + gc.collect() + return True # If all successful, set a comprehensive success message @@ -864,8 +1327,25 @@ def parse_programs_for_source(epg_source, tvg_id=None): status="error", message=epg_source.last_message) return False + finally: + # Final memory cleanup and tracking + # Explicitly release any remaining large data structures + failed_entries = None + program_count = None + channel_count = None + updated_count = None + processed = None + gc.collect() + + # Add comprehensive memory cleanup at the end + cleanup_memory(log_usage=should_log_memory, force_collection=True) + if process: + final_memory = process.memory_info().rss / 1024 / 1024 + logger.info(f"[parse_programs_for_source] Final memory usage: {final_memory:.2f} MB difference: {final_memory - initial_memory:.2f} MB") + # Explicitly clear the process object to prevent potential memory leaks + process = None def fetch_schedules_direct(source): logger.info(f"Fetching Schedules Direct data from source: {source.name}") try: @@ -950,7 +1430,7 @@ def parse_xmltv_time(time_str): elif tz_sign == '-': dt_obj = dt_obj + timedelta(hours=tz_hours, minutes=tz_minutes) aware_dt = timezone.make_aware(dt_obj, timezone=dt_timezone.utc) - logger.debug(f"Parsed XMLTV time '{time_str}' to {aware_dt}") + logger.trace(f"Parsed XMLTV time '{time_str}' to {aware_dt}") return aware_dt except Exception as e: logger.error(f"Error parsing XMLTV time '{time_str}': {e}", exc_info=True) @@ -966,3 +1446,77 @@ def parse_schedules_direct_time(time_str): except Exception as e: logger.error(f"Error parsing Schedules Direct time '{time_str}': {e}", exc_info=True) raise + + +# Helper function to extract custom properties - moved to a separate function to clean up the code +def extract_custom_properties(prog): + # Create a new dictionary for each call + custom_props = {} + + # Extract categories with a single comprehension to reduce intermediate objects + categories = [cat.text.strip() for cat in prog.findall('category') if cat.text and cat.text.strip()] + if categories: + custom_props['categories'] = categories + + # Extract episode numbers + for ep_num in prog.findall('episode-num'): + system = ep_num.get('system', '') + if system == 'xmltv_ns' and ep_num.text: + # Parse XMLTV episode-num format (season.episode.part) + parts = ep_num.text.split('.') + if len(parts) >= 2: + if parts[0].strip() != '': + try: + season = int(parts[0]) + 1 # XMLTV format is zero-based + custom_props['season'] = season + except ValueError: + pass + if parts[1].strip() != '': + try: + episode = int(parts[1]) + 1 # XMLTV format is zero-based + custom_props['episode'] = episode + except ValueError: + pass + elif system == 'onscreen' and 
ep_num.text: + # Just store the raw onscreen format + custom_props['onscreen_episode'] = ep_num.text.strip() + + # Extract ratings more efficiently + rating_elem = prog.find('rating') + if rating_elem is not None: + value_elem = rating_elem.find('value') + if value_elem is not None and value_elem.text: + custom_props['rating'] = value_elem.text.strip() + if rating_elem.get('system'): + custom_props['rating_system'] = rating_elem.get('system') + + # Extract credits more efficiently + credits_elem = prog.find('credits') + if credits_elem is not None: + credits = {} + for credit_type in ['director', 'actor', 'writer', 'presenter', 'producer']: + names = [e.text.strip() for e in credits_elem.findall(credit_type) if e.text and e.text.strip()] + if names: + credits[credit_type] = names + if credits: + custom_props['credits'] = credits + + # Extract other common program metadata + date_elem = prog.find('date') + if date_elem is not None and date_elem.text: + custom_props['year'] = date_elem.text.strip()[:4] # Just the year part + + country_elem = prog.find('country') + if country_elem is not None and country_elem.text: + custom_props['country'] = country_elem.text.strip() + + icon_elem = prog.find('icon') + if icon_elem is not None and icon_elem.get('src'): + custom_props['icon'] = icon_elem.get('src') + + # Simpler approach for boolean flags + for kw in ['previously-shown', 'premiere', 'new']: + if prog.find(kw) is not None: + custom_props[kw.replace('-', '_')] = True + + return custom_props diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py index 71ee5f49..4a1f2645 100644 --- a/apps/m3u/tasks.py +++ b/apps/m3u/tasks.py @@ -496,7 +496,8 @@ def process_m3u_batch(account_id, batch, groups, hash_keys): # Aggressive garbage collection del streams_to_create, streams_to_update, stream_hashes, existing_streams - gc.collect() + from core.utils import cleanup_memory + cleanup_memory(log_usage=True, force_collection=True) return retval @@ -1080,7 +1081,8 @@ def refresh_single_m3u_account(account_id): # Aggressive garbage collection del existing_groups, extinf_data, groups, batches - gc.collect() + from core.utils import cleanup_memory + cleanup_memory(log_usage=True, force_collection=True) # Clean up cache file since we've fully processed it if os.path.exists(cache_path): @@ -1088,6 +1090,8 @@ def refresh_single_m3u_account(account_id): return f"Dispatched jobs complete." 
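
Reviewer note (not part of the patch): the memory fix in parse_programs_for_source above replaces a single full-table loop with cursor-based (keyset) batching plus a gc.collect() per batch. The sketch below shows that pattern in isolation, purely for reference; iterate_in_batches is a hypothetical helper name and the usage assumes a Django QuerySet ordered by an integer primary key, as in the patch.

    def iterate_in_batches(queryset, batch_size=20):
        """Yield successive batches ordered by primary key (keyset pagination)."""
        last_id = 0
        while True:
            # Filter past the last seen id instead of using OFFSET, so each query
            # stays cheap and only one batch of rows is resident at a time.
            batch = list(queryset.filter(id__gt=last_id).order_by("id")[:batch_size])
            if not batch:
                break
            yield batch
            last_id = batch[-1].id

    # Hypothetical usage mirroring the task above:
    #
    #     import gc
    #     for batch in iterate_in_batches(EPGData.objects.filter(epg_source=epg_source)):
    #         for entry in batch:
    #             parse_programs_for_tvg_id(entry.id)
    #         gc.collect()  # release the processed batch before fetching the next one

Django's QuerySet.iterator(chunk_size=...) would be another way to bound memory here; the explicit keyset loop has the advantage of making the per-batch gc.collect() boundary obvious.
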
+from core.utils import send_websocket_update + def send_m3u_update(account_id, action, progress, **kwargs): # Start with the base data dictionary data = { @@ -1111,12 +1115,10 @@ def send_m3u_update(account_id, action, progress, **kwargs): # Add the additional key-value pairs from kwargs data.update(kwargs) - # Now, send the updated data dictionary - channel_layer = get_channel_layer() - async_to_sync(channel_layer.group_send)( - 'updates', - { - 'type': 'update', - 'data': data - } - ) + # Use the standardized function with memory management + # Enable garbage collection for certain operations + collect_garbage = action == "parsing" and progress % 25 == 0 + send_websocket_update('updates', 'update', data, collect_garbage=collect_garbage) + + # Explicitly clear data reference to help garbage collection + data = None diff --git a/apps/proxy/tasks.py b/apps/proxy/tasks.py index a4aaf8e5..00e3e039 100644 --- a/apps/proxy/tasks.py +++ b/apps/proxy/tasks.py @@ -6,8 +6,10 @@ import redis import json import logging import re +import gc # Add import for garbage collection from core.utils import RedisClient from apps.proxy.ts_proxy.channel_status import ChannelStatus +from core.utils import send_websocket_update logger = logging.getLogger(__name__) @@ -43,11 +45,17 @@ def fetch_channel_stats(): return # return JsonResponse({'error': str(e)}, status=500) - channel_layer = get_channel_layer() - async_to_sync(channel_layer.group_send)( + send_websocket_update( "updates", + "update", { - "type": "update", - "data": {"success": True, "type": "channel_stats", "stats": json.dumps({'channels': all_channels, 'count': len(all_channels)})} + "success": True, + "type": "channel_stats", + "stats": json.dumps({'channels': all_channels, 'count': len(all_channels)}) }, + collect_garbage=True ) + + # Explicitly clean up large data structures + all_channels = None + gc.collect() diff --git a/core/tasks.py b/core/tasks.py index a6bd80cf..eee5b18f 100644 --- a/core/tasks.py +++ b/core/tasks.py @@ -2,13 +2,12 @@ from celery import shared_task from channels.layers import get_channel_layer from asgiref.sync import async_to_sync -import redis import json import logging import re import time import os -from core.utils import RedisClient +from core.utils import RedisClient, send_websocket_update from apps.proxy.ts_proxy.channel_status import ChannelStatus from apps.m3u.models import M3UAccount from apps.epg.models import EPGSource @@ -36,11 +35,6 @@ LOG_THROTTLE_SECONDS = 300 # 5 minutes # Track if this is the first scan since startup _first_scan_completed = False -@shared_task -def beat_periodic_task(): - fetch_channel_stats() - scan_and_process_files() - def throttled_log(logger_method, message, key=None, *args, **kwargs): """Only log messages with the same key once per throttle period""" if key is None: @@ -52,6 +46,11 @@ def throttled_log(logger_method, message, key=None, *args, **kwargs): logger_method(message, *args, **kwargs) _last_log_times[key] = now +@shared_task +def beat_periodic_task(): + fetch_channel_stats() + scan_and_process_files() + @shared_task def scan_and_process_files(): global _first_scan_completed @@ -293,19 +292,23 @@ def fetch_channel_stats(): if cursor == 0: break + send_websocket_update( + "updates", + "update", + { + "success": True, + "type": "channel_stats", + "stats": json.dumps({'channels': all_channels, 'count': len(all_channels)}) + }, + collect_garbage=True + ) + + # Explicitly clean up large data structures + all_channels = None + except Exception as e: logger.error(f"Error in 
channel_status: {e}", exc_info=True) return - # return JsonResponse({'error': str(e)}, status=500) - - channel_layer = get_channel_layer() - async_to_sync(channel_layer.group_send)( - "updates", - { - "type": "update", - "data": {"success": True, "type": "channel_stats", "stats": json.dumps({'channels': all_channels, 'count': len(all_channels)})} - }, - ) @shared_task def rehash_streams(keys): diff --git a/core/utils.py b/core/utils.py index 6b5e6815..fcff03e5 100644 --- a/core/utils.py +++ b/core/utils.py @@ -59,6 +59,10 @@ class RedisClient: client.config_set('save', '') # Disable RDB snapshots client.config_set('appendonly', 'no') # Disable AOF logging + # Set optimal memory settings + client.config_set('maxmemory-policy', 'allkeys-lru') # Use LRU eviction + client.config_set('maxmemory', '256mb') # Set reasonable memory limit + # Disable protected mode when in debug mode if os.environ.get('DISPATCHARR_DEBUG', '').lower() == 'true': client.config_set('protected-mode', 'no') # Disable protected mode in debug @@ -169,12 +173,118 @@ def release_task_lock(task_name, id): # Remove the lock redis_client.delete(lock_id) -def send_websocket_event(event, success, data): +def send_websocket_update(group_name, event_type, data, collect_garbage=False): + """ + Standardized function to send WebSocket updates with proper memory management. + + Args: + group_name: The WebSocket group to send to (e.g. 'updates') + event_type: The type of message (e.g. 'update') + data: The data to send + collect_garbage: Whether to force garbage collection after sending + """ channel_layer = get_channel_layer() - async_to_sync(channel_layer.group_send)( - 'updates', - { - 'type': 'update', - "data": {"success": True, "type": "epg_channels"} - } - ) + try: + async_to_sync(channel_layer.group_send)( + group_name, + { + 'type': event_type, + 'data': data + } + ) + except Exception as e: + logger.warning(f"Failed to send WebSocket update: {e}") + finally: + # Explicitly release references to help garbage collection + channel_layer = None + + # Force garbage collection if requested + if collect_garbage: + gc.collect() + +def send_websocket_event(event, success, data): + """Acquire a lock to prevent concurrent task execution.""" + data_payload = {"success": success, "type": event} + if data: + # Make a copy to avoid modifying the original + data_payload.update(data) + + # Use the standardized function + send_websocket_update('updates', 'update', data_payload) + + # Help garbage collection by clearing references + data_payload = None + +# Add memory monitoring utilities +def get_memory_usage(): + """Returns current memory usage in MB""" + import psutil + process = psutil.Process(os.getpid()) + return process.memory_info().rss / (1024 * 1024) + +def monitor_memory_usage(func): + """Decorator to monitor memory usage before and after function execution""" + def wrapper(*args, **kwargs): + import gc + # Force garbage collection before measuring + gc.collect() + + # Get initial memory usage + start_mem = get_memory_usage() + logger.debug(f"Memory usage before {func.__name__}: {start_mem:.2f} MB") + + # Call the original function + result = func(*args, **kwargs) + + # Force garbage collection before measuring again + gc.collect() + + # Get final memory usage + end_mem = get_memory_usage() + logger.debug(f"Memory usage after {func.__name__}: {end_mem:.2f} MB (Change: {end_mem - start_mem:.2f} MB)") + + return result + return wrapper + +def cleanup_memory(log_usage=False, force_collection=True): + """ + Comprehensive memory cleanup 
function to reduce memory footprint + + Args: + log_usage: Whether to log memory usage before and after cleanup + force_collection: Whether to force garbage collection + """ + logger.trace("Starting memory cleanup django memory cleanup") + # Skip logging if log level is not set to debug or more verbose (like trace) + current_log_level = logger.getEffectiveLevel() + if not current_log_level <= logging.DEBUG: + log_usage = False + if log_usage: + try: + import psutil + process = psutil.Process() + before_mem = process.memory_info().rss / (1024 * 1024) + logger.debug(f"Memory before cleanup: {before_mem:.2f} MB") + except (ImportError, Exception) as e: + logger.debug(f"Error getting memory usage: {e}") + + # Clear any object caches from Django ORM + from django.db import connection, reset_queries + reset_queries() + + # Force garbage collection + if force_collection: + # Run full collection + gc.collect(generation=2) + # Clear cyclic references + gc.collect(generation=0) + + if log_usage: + try: + import psutil + process = psutil.Process() + after_mem = process.memory_info().rss / (1024 * 1024) + logger.debug(f"Memory after cleanup: {after_mem:.2f} MB (change: {after_mem-before_mem:.2f} MB)") + except (ImportError, Exception): + pass + logger.trace("Memory cleanup complete for django") diff --git a/dispatcharr/celery.py b/dispatcharr/celery.py index a0ff2168..855acacd 100644 --- a/dispatcharr/celery.py +++ b/dispatcharr/celery.py @@ -2,6 +2,7 @@ import os from celery import Celery import logging +from celery.signals import task_postrun # Add import for signals # Initialize with defaults before Django settings are loaded DEFAULT_LOG_LEVEL = 'DEBUG' @@ -48,6 +49,56 @@ app.conf.update( worker_task_log_format='%(asctime)s %(levelname)s %(task_name)s: %(message)s', ) +# Add memory cleanup after task completion +@task_postrun.connect # Use the imported signal +def cleanup_task_memory(**kwargs): + """Clean up memory after each task completes""" + # Get task name from kwargs + task_name = kwargs.get('task').name if kwargs.get('task') else '' + + # Only run cleanup for memory-intensive tasks + memory_intensive_tasks = [ + 'apps.m3u.tasks.refresh_single_m3u_account', + 'apps.m3u.tasks.refresh_m3u_accounts', + 'apps.m3u.tasks.process_m3u_batch', + 'apps.m3u.tasks.process_xc_category', + 'apps.epg.tasks.refresh_epg_data', + 'apps.epg.tasks.refresh_all_epg_data', + 'apps.epg.tasks.parse_programs_for_source', + 'apps.epg.tasks.parse_programs_for_tvg_id', + 'apps.channels.tasks.match_epg_channels', + 'core.tasks.rehash_streams' + ] + + # Check if this is a memory-intensive task + if task_name in memory_intensive_tasks: + # Import cleanup_memory function + from core.utils import cleanup_memory + + # Use the comprehensive cleanup function + cleanup_memory(log_usage=True, force_collection=True) + + # Log memory usage if psutil is installed + try: + import psutil + process = psutil.Process() + if hasattr(process, 'memory_info'): + mem = process.memory_info().rss / (1024 * 1024) + print(f"Memory usage after {task_name}: {mem:.2f} MB") + except (ImportError, Exception): + pass + else: + # For non-intensive tasks, just log but don't force cleanup + try: + import psutil + process = psutil.Process() + if hasattr(process, 'memory_info'): + mem = process.memory_info().rss / (1024 * 1024) + if mem > 500: # Only log if using more than 500MB + print(f"High memory usage detected in {task_name}: {mem:.2f} MB") + except (ImportError, Exception): + pass + @app.on_after_configure.connect def setup_celery_logging(**kwargs): # 
Use our directly determined log level diff --git a/dispatcharr/settings.py b/dispatcharr/settings.py index ba5b18f9..de43c454 100644 --- a/dispatcharr/settings.py +++ b/dispatcharr/settings.py @@ -44,6 +44,36 @@ INSTALLED_APPS = [ "django_celery_beat", ] +# EPG Processing optimization settings +EPG_BATCH_SIZE = 1000 # Number of records to process in a batch +EPG_MEMORY_LIMIT = 512 # Memory limit in MB before forcing garbage collection +EPG_ENABLE_MEMORY_MONITORING = True # Whether to monitor memory usage during processing + +# Database optimization settings +DATABASE_STATEMENT_TIMEOUT = 300 # Seconds before timing out long-running queries +DATABASE_CONN_MAX_AGE = ( + 60 # Connection max age in seconds, helps with frequent reconnects +) + +# Disable atomic requests for performance-sensitive views +ATOMIC_REQUESTS = False + +# Cache settings - add caching for EPG operations +CACHES = { + "default": { + "BACKEND": "django.core.cache.backends.locmem.LocMemCache", + "LOCATION": "dispatcharr-epg-cache", + "TIMEOUT": 3600, # 1 hour cache timeout + "OPTIONS": { + "MAX_ENTRIES": 10000, + "CULL_FREQUENCY": 3, # Purge 1/3 of entries when max is reached + }, + } +} + +# Timeouts for external connections +REQUESTS_TIMEOUT = 30 # Seconds for external API requests + MIDDLEWARE = [ "django.middleware.security.SecurityMiddleware", "django.contrib.sessions.middleware.SessionMiddleware", @@ -165,6 +195,15 @@ CELERY_BROKER_TRANSPORT_OPTIONS = { CELERY_ACCEPT_CONTENT = ["json"] CELERY_TASK_SERIALIZER = "json" +# Memory management settings +# CELERY_WORKER_MAX_TASKS_PER_CHILD = 10 # Restart worker after 10 tasks to free memory +# CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # Don't prefetch tasks - process one at a time +# CELERY_TASK_ACKS_LATE = True # Only acknowledge tasks after they're processed +# CELERY_TASK_TIME_LIMIT = 3600 # 1 hour time limit per task +# CELERY_TASK_SOFT_TIME_LIMIT = 3540 # Soft limit 60 seconds before hard limit +# CELERY_WORKER_CANCEL_LONG_RUNNING_TASKS_ON_CONNECTION_LOSS = True # Cancel tasks if connection lost +# CELERY_TASK_IGNORE_RESULT = True # Don't store results unless explicitly needed + CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers.DatabaseScheduler" CELERY_BEAT_SCHEDULE = { "fetch-channel-statuses": { @@ -275,6 +314,11 @@ LOGGING = { "level": LOG_LEVEL, # Use environment-configured level "propagate": False, # Don't propagate to root logger to avoid duplicate logs }, + "core.utils": { + "handlers": ["console"], + "level": LOG_LEVEL, + "propagate": False, + }, "apps.proxy": { "handlers": ["console"], "level": LOG_LEVEL, # Use environment-configured level diff --git a/docker/uwsgi.debug.ini b/docker/uwsgi.debug.ini index 43ecd5ce..ea567e1e 100644 --- a/docker/uwsgi.debug.ini +++ b/docker/uwsgi.debug.ini @@ -8,7 +8,7 @@ exec-before = python /app/scripts/wait_for_redis.py ; Start Redis first attach-daemon = redis-server ; Then start other services -attach-daemon = celery -A dispatcharr worker +attach-daemon = celery -A dispatcharr worker --concurrency=4 attach-daemon = celery -A dispatcharr beat attach-daemon = daphne -b 0.0.0.0 -p 8001 dispatcharr.asgi:application attach-daemon = cd /app/frontend && npm run dev diff --git a/requirements.txt b/requirements.txt index 546a24b6..732ce9ad 100644 --- a/requirements.txt +++ b/requirements.txt @@ -30,3 +30,4 @@ channels channels-redis django-filter django-celery-beat +lxml==5.4.0 \ No newline at end of file
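
Reviewer note (not part of the patch): several hunks above assume a custom TRACE log level — the logger.trace(...) calls and the `current_log_level <= 5` check in parse_programs_for_source ("Assuming TRACE is level 5 or lower"). That level is presumably registered elsewhere in Dispatcharr; the sketch below shows one conventional way such a registration works, for reference only, with illustrative names.

    import logging

    TRACE = 5  # numeric value assumed by the `current_log_level <= 5` check above
    logging.addLevelName(TRACE, "TRACE")

    def _trace(self, message, *args, **kwargs):
        # Mirror Logger.debug/info: only build and emit the record when enabled.
        if self.isEnabledFor(TRACE):
            self._log(TRACE, message, args, **kwargs)

    logging.Logger.trace = _trace

    # With the level registered, the calls added in this diff resolve as expected:
    logging.basicConfig(level=TRACE)
    logger = logging.getLogger("apps.epg.tasks")
    logger.trace("Parsed XMLTV time ...")  # emitted only when the effective level is <= 5
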