From ed665584e997cc849c90c7016a8bd646e699fc9e Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Sun, 18 May 2025 17:05:03 -0500 Subject: [PATCH] The struggle is real --- apps/epg/tasks.py | 268 ++++++++++++++++++++++++++------------------ apps/m3u/tasks.py | 18 +-- apps/proxy/tasks.py | 16 ++- core/tasks.py | 27 +++-- core/utils.py | 48 ++++++-- 5 files changed, 234 insertions(+), 143 deletions(-) diff --git a/apps/epg/tasks.py b/apps/epg/tasks.py index 04a96ae0..bc59771b 100644 --- a/apps/epg/tasks.py +++ b/apps/epg/tasks.py @@ -10,6 +10,7 @@ from datetime import datetime, timedelta, timezone as dt_timezone import gc # Add garbage collection module import json from lxml import etree # Using lxml exclusively +import psutil # Add import for memory tracking from celery import shared_task from django.conf import settings @@ -22,7 +23,7 @@ from asgiref.sync import async_to_sync from channels.layers import get_channel_layer from .models import EPGSource, EPGData, ProgramData -from core.utils import acquire_task_lock, release_task_lock +from core.utils import acquire_task_lock, release_task_lock, send_websocket_update logger = logging.getLogger(__name__) @@ -40,18 +41,18 @@ def send_epg_update(source_id, action, progress, **kwargs): # Add the additional key-value pairs from kwargs data.update(kwargs) - # Now, send the updated data dictionary - channel_layer = get_channel_layer() - try: - async_to_sync(channel_layer.group_send)( - 'updates', - { - 'type': 'update', - 'data': data - } - ) - except Exception as e: - logger.warning(f"Failed to send WebSocket update: {e}") + # Use the standardized update function with garbage collection for program parsing + # This is a high-frequency operation that needs more aggressive memory management + collect_garbage = action == "parsing_programs" and progress % 10 == 0 + send_websocket_update('updates', 'update', data, collect_garbage=collect_garbage) + + # Explicitly clear references + data = None + + # For high-frequency parsing, occasionally force additional garbage collection + # to prevent memory buildup + if action == "parsing_programs" and progress % 50 == 0: + gc.collect() def delete_epg_refresh_task_by_id(epg_id): @@ -529,18 +530,16 @@ def parse_channels_only(source): # Update status to error source.status = 'error' source.last_message = f"No URL provided, cannot fetch EPG data" - source.save(update_fields=['status', 'last_message']) - send_epg_update(source.id, "parsing_channels", 100, status="error", error="No URL provided") - return False + source.save(update_fields=['updated_at']) - file_path = new_path - logger.info(f"Parsing channels from EPG file: {file_path}") - - # Add memory tracking at start - import psutil - process = psutil.Process() - initial_memory = process.memory_info().rss / 1024 / 1024 - logger.info(f"Initial memory usage: {initial_memory:.2f} MB") + # Initialize process variable for memory tracking + try: + process = psutil.Process() + initial_memory = process.memory_info().rss / 1024 / 1024 + logger.info(f"Initial memory usage: {initial_memory:.2f} MB") + except (ImportError, NameError): + process = None + logger.warning("psutil not available for memory tracking") # Replace full dictionary load with more efficient lookup set existing_tvg_ids = set() @@ -574,7 +573,8 @@ def parse_channels_only(source): progress = 0 # Initialize progress variable here # Track memory at key points - logger.info(f"Memory before opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB") + if process: + logger.info(f"Memory before opening file: 
{process.memory_info().rss / 1024 / 1024:.2f} MB") try: # Create a parser with the desired options @@ -584,7 +584,8 @@ def parse_channels_only(source): # Open the file first logger.info(f"Opening file for initial channel count: {file_path}") source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb') - logger.info(f"Memory after opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB") + if process: + logger.info(f"Memory after opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB") # Count channels try: @@ -597,7 +598,8 @@ def parse_channels_only(source): # Close the file to reset position logger.info(f"Closing initial file handle") source_file.close() - logger.info(f"Memory after closing initial file: {process.memory_info().rss / 1024 / 1024:.2f} MB") + if process: + logger.info(f"Memory after closing initial file: {process.memory_info().rss / 1024 / 1024:.2f} MB") # Update progress after counting send_epg_update(source.id, "parsing_channels", 25, total_channels=total_channels) @@ -605,11 +607,13 @@ def parse_channels_only(source): # Reset file position for actual processing logger.info(f"Re-opening file for channel parsing: {file_path}") source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb') - logger.info(f"Memory after re-opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB") + if process: + logger.info(f"Memory after re-opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB") logger.info(f"Creating iterparse context") channel_parser = etree.iterparse(source_file, events=('end',), tag='channel') - logger.info(f"Memory after creating iterparse: {process.memory_info().rss / 1024 / 1024:.2f} MB") + if process: + logger.info(f"Memory after creating iterparse: {process.memory_info().rss / 1024 / 1024:.2f} MB") channel_count = 0 for _, elem in channel_parser: @@ -658,11 +662,13 @@ def parse_channels_only(source): if len(epgs_to_create) >= batch_size: logger.info(f"Bulk creating {len(epgs_to_create)} EPG entries") EPGData.objects.bulk_create(epgs_to_create, ignore_conflicts=True) - logger.info(f"Memory after bulk_create: {process.memory_info().rss / 1024 / 1024:.2f} MB") + if process: + logger.info(f"Memory after bulk_create: {process.memory_info().rss / 1024 / 1024:.2f} MB") del epgs_to_create # Explicit deletion epgs_to_create = [] gc.collect() - logger.info(f"Memory after gc.collect(): {process.memory_info().rss / 1024 / 1024:.2f} MB") + if process: + logger.info(f"Memory after gc.collect(): {process.memory_info().rss / 1024 / 1024:.2f} MB") if len(epgs_to_update) >= batch_size: EPGData.objects.bulk_update(epgs_to_update, ["name"]) @@ -675,7 +681,8 @@ def parse_channels_only(source): logger.info(f"Clearing existing_epgs cache at {processed_channels} channels") existing_epgs.clear() gc.collect() - logger.info(f"Memory after clearing cache: {process.memory_info().rss / 1024 / 1024:.2f} MB") + if process: + logger.info(f"Memory after clearing cache: {process.memory_info().rss / 1024 / 1024:.2f} MB") # Send progress updates if processed_channels % 100 == 0 or processed_channels == total_channels: @@ -689,9 +696,29 @@ def parse_channels_only(source): ) logger.debug(f"Processed channel: {tvg_id} - {display_name}") # Clear memory - elem.clear() - while elem.getprevious() is not None: - del elem.getparent()[0] + try: + # First clear the element's content + elem.clear() + + # Get the parent before we might lose reference to it + parent = elem.getparent() + if parent is not None: + # Clean up preceding siblings + 
while elem.getprevious() is not None: + del parent[0] + + # Try to fully detach this element from parent + try: + parent.remove(elem) + del elem + del parent + except (ValueError, KeyError, TypeError): + # Element might already be removed or detached + pass + + except Exception as e: + # Just log the error and continue - don't let cleanup errors stop processing + logger.debug(f"Non-critical error during XML element cleanup: {e}") # Check if we should break early to avoid excessive sleep if processed_channels >= total_channels and total_channels > 0: @@ -700,7 +727,8 @@ def parse_channels_only(source): # Explicit cleanup before sleeping logger.info(f"Completed channel parsing loop, processed {processed_channels} channels") - logger.info(f"Memory before cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB") + if process: + logger.info(f"Memory before cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB") # Explicit cleanup of the parser del channel_parser @@ -713,7 +741,8 @@ def parse_channels_only(source): # Force garbage collection gc.collect() - logger.info(f"Memory after final cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB") + if process: + logger.info(f"Memory after final cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB") # Remove long sleep that might be causing issues # time.sleep(200) # This seems excessive and may be causing issues @@ -737,9 +766,11 @@ def parse_channels_only(source): logger.info(f"Updated final batch of {len(epgs_to_update)} EPG entries") # Final garbage collection and memory tracking - logger.info(f"Memory before final gc: {process.memory_info().rss / 1024 / 1024:.2f} MB") + if process: + logger.info(f"Memory before final gc: {process.memory_info().rss / 1024 / 1024:.2f} MB") gc.collect() - logger.info(f"Memory after final gc: {process.memory_info().rss / 1024 / 1024:.2f} MB") + if process: + logger.info(f"Memory after final gc: {process.memory_info().rss / 1024 / 1024:.2f} MB") # Update source status with channel count source.status = 'success' @@ -755,14 +786,7 @@ def parse_channels_only(source): channels_count=processed_channels ) - channel_layer = get_channel_layer() - async_to_sync(channel_layer.group_send)( - 'updates', - { - 'type': 'update', - "data": {"success": True, "type": "epg_channels"} - } - ) + send_websocket_update('updates', 'update', {"success": True, "type": "epg_channels"}) logger.info(f"Finished parsing channel info. 
Found {processed_channels} channels.") # Remove excessive sleep @@ -790,15 +814,20 @@ def parse_channels_only(source): logger.info("In finally block, ensuring cleanup") existing_tvg_ids = None existing_epgs = None - gc.collect() - # Check final memory usage try: - import psutil process = psutil.Process() final_memory = process.memory_info().rss / 1024 / 1024 logger.info(f"Final memory usage: {final_memory:.2f} MB") except: pass + # Explicitly clear the process object to prevent potential memory leaks + if 'process' in locals() and process is not None: + process = None + + # Check final memory usage after clearing process + gc.collect() + + @shared_task @@ -806,17 +835,16 @@ def parse_programs_for_tvg_id(epg_id): if not acquire_task_lock('parse_epg_programs', epg_id): logger.info(f"Program parse for {epg_id} already in progress, skipping duplicate task") return "Task already running" - source_file = None program_parser = None programs_to_create = None epg = None epg_source = None + programs_processed = 0 try: # Add memory tracking try: - import psutil process = psutil.Process() initial_memory = process.memory_info().rss / 1024 / 1024 logger.info(f"[parse_programs_for_tvg_id] Initial memory usage: {initial_memory:.2f} MB") @@ -834,39 +862,16 @@ def parse_programs_for_tvg_id(epg_id): logger.info(f"Refreshing program data for tvg_id: {epg.tvg_id}") # First, remove all existing programs - use chunked delete to avoid memory issues + # Delete old programs chunk_size = 5000 - programs_to_delete = ProgramData.objects.filter(epg=epg) - total_programs = programs_to_delete.count() - - if total_programs > 0: - logger.info(f"Deleting {total_programs} existing programs for {epg.tvg_id}") - - # More memory-efficient approach using cursor-based pagination - last_id = 0 - while True: - # Get batch of IDs greater than the last ID processed - id_batch = list(programs_to_delete.filter(id__gt=last_id).order_by('id').values_list('id', flat=True)[:chunk_size]) - if not id_batch: - break - - # Store the last ID before deleting the batch variable - if id_batch: - max_id = id_batch[-1] - else: - max_id = 0 - - # Delete this batch - ProgramData.objects.filter(id__in=id_batch).delete() - # Release memory immediately - del id_batch - gc.collect() - - # Update last_id for next iteration using our stored value - last_id = max_id - - # Explicitly delete query objects - del programs_to_delete - del last_id + last_id = 0 + while True: + ids = list(ProgramData.objects.filter(epg=epg, id__gt=last_id).order_by('id').values_list('id', flat=True)[:chunk_size]) + if not ids: + break + ProgramData.objects.filter(id__in=ids).delete() + last_id = ids[-1] + del ids gc.collect() file_path = epg_source.file_path @@ -929,7 +934,7 @@ def parse_programs_for_tvg_id(epg_id): # Memory usage tracking if process: mem_before = process.memory_info().rss / 1024 / 1024 - logger.info(f"[parse_programs_for_tvg_id] Memory before parsing: {mem_before:.2f} MB") + logger.info(f"[parse_programs_for_tvg_id] Memory before parsing {epg.tvg_id} - {mem_before:.2f} MB") programs_to_create = [] batch_size = 1000 # Process in batches to limit memory usage @@ -968,9 +973,9 @@ def parse_programs_for_tvg_id(epg_id): # Extract custom properties custom_props = extract_custom_properties(elem) - custom_properties_json = None + if custom_props: - logger.debug(f"Number of custom properties: {len(custom_props)}") + logger.trace(f"Number of custom properties: {len(custom_props)}") try: custom_properties_json = json.dumps(custom_props) except Exception as e: @@ -986,30 
+991,58 @@ def parse_programs_for_tvg_id(epg_id): tvg_id=epg.tvg_id, custom_properties=custom_properties_json )) - programs_processed += 1 - custom_props = None - custom_properties_json = None + del custom_props + del custom_properties_json + del start_time + del end_time + del title + del desc + del sub_title + elem.clear() + parent = elem.getparent() + if parent is not None: + while elem.getprevious() is not None: + del parent[0] + parent.remove(elem) + del elem + del parent + #gc.collect() # Batch processing if len(programs_to_create) >= batch_size: ProgramData.objects.bulk_create(programs_to_create) logger.debug(f"Saved batch of {len(programs_to_create)} programs for {epg.tvg_id}") - del programs_to_create # Explicit deletion - programs_to_create = [] - - # Force more aggressive garbage collection - custom_props = None - custom_properties_json = None + del programs_to_create + del custom_props + del custom_properties_json gc.collect() + #continue except Exception as e: logger.error(f"Error processing program for {epg.tvg_id}: {e}", exc_info=True) - # Important: Clear the element to avoid memory leaks (lxml specific method) - elem.clear() - # Also eliminate ancestors to prevent memory leaks - while elem.getprevious() is not None: - del elem.getparent()[0] + # Important: Clear the element to avoid memory leaks using a more robust approach + try: + # First clear the element's content + elem.clear() + # Get the parent before we might lose reference to it + parent = elem.getparent() + if parent is not None: + # Clean up preceding siblings + while elem.getprevious() is not None: + del parent[0] + # Try to fully detach this element from parent + try: + parent.remove(elem) + del elem + del parent + except (ValueError, KeyError, TypeError): + # Element might already be removed or detached + pass + + except Exception as e: + # Just log the error and continue - don't let cleanup errors stop processing + logger.debug(f"Non-critical error during XML element cleanup: {e}") # Make sure to close the file and release parser resources if source_file: @@ -1032,18 +1065,21 @@ def parse_programs_for_tvg_id(epg_id): if source_file: source_file.close() source_file = None + # Memory tracking after processing + if process: + mem_after = process.memory_info().rss / 1024 / 1024 + logger.info(f"[parse_programs_for_tvg_id] Memory after parsing 1 {epg.tvg_id} - {programs_processed} programs: {mem_after:.2f} MB (change: {mem_after-mem_before:.2f} MB)") # Process any remaining items if programs_to_create: ProgramData.objects.bulk_create(programs_to_create) logger.debug(f"Saved final batch of {len(programs_to_create)} programs for {epg.tvg_id}") - del programs_to_create - programs_to_create = [] + programs_to_create = None + custom_props = None + custom_properties_json = None + #del programs_to_create + #programs_to_create = [] - # Memory tracking after processing - if process: - mem_after = process.memory_info().rss / 1024 / 1024 - logger.info(f"[parse_programs_for_tvg_id] Memory after parsing {programs_processed} programs: {mem_after:.2f} MB (change: {mem_after-mem_before:.2f} MB)") # Final garbage collection gc.collect() @@ -1066,25 +1102,31 @@ def parse_programs_for_tvg_id(epg_id): source_file.close() except: pass - + # Memory tracking after processing + if process: + mem_after = process.memory_info().rss / 1024 / 1024 + logger.info(f"[parse_programs_for_tvg_id] Memory after parsing 2 {epg.tvg_id} - {programs_processed} programs: {mem_after:.2f} MB (change: {mem_after-mem_before:.2f} MB)") source_file = None 
program_parser = None programs_to_create = None epg = None epg_source = None - + # Explicitly clear the process object to prevent potential memory leaks + if 'process' in locals() and process is not None: + process = None # Force garbage collection before releasing lock gc.collect() + release_task_lock('parse_epg_programs', epg_id) + def parse_programs_for_source(epg_source, tvg_id=None): # Send initial programs parsing notification send_epg_update(epg_source.id, "parsing_programs", 0) - + #time.sleep(100) # Add memory tracking try: - import psutil process = psutil.Process() initial_memory = process.memory_info().rss / 1024 / 1024 logger.info(f"[parse_programs_for_source] Initial memory usage: {initial_memory:.2f} MB") @@ -1219,6 +1261,10 @@ def parse_programs_for_source(epg_source, tvg_id=None): processed = None gc.collect() + # Explicitly clear the process object to prevent potential memory leaks + if 'process' in locals() and process is not None: + process = None + def fetch_schedules_direct(source): logger.info(f"Fetching Schedules Direct data from source: {source.name}") diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py index 71ee5f49..513c550d 100644 --- a/apps/m3u/tasks.py +++ b/apps/m3u/tasks.py @@ -1088,6 +1088,8 @@ def refresh_single_m3u_account(account_id): return f"Dispatched jobs complete." +from core.utils import send_websocket_update + def send_m3u_update(account_id, action, progress, **kwargs): # Start with the base data dictionary data = { @@ -1111,12 +1113,10 @@ def send_m3u_update(account_id, action, progress, **kwargs): # Add the additional key-value pairs from kwargs data.update(kwargs) - # Now, send the updated data dictionary - channel_layer = get_channel_layer() - async_to_sync(channel_layer.group_send)( - 'updates', - { - 'type': 'update', - 'data': data - } - ) + # Use the standardized function with memory management + # Enable garbage collection for certain operations + collect_garbage = action == "parsing" and progress % 25 == 0 + send_websocket_update('updates', 'update', data, collect_garbage=collect_garbage) + + # Explicitly clear data reference to help garbage collection + data = None diff --git a/apps/proxy/tasks.py b/apps/proxy/tasks.py index a4aaf8e5..00e3e039 100644 --- a/apps/proxy/tasks.py +++ b/apps/proxy/tasks.py @@ -6,8 +6,10 @@ import redis import json import logging import re +import gc # Add import for garbage collection from core.utils import RedisClient from apps.proxy.ts_proxy.channel_status import ChannelStatus +from core.utils import send_websocket_update logger = logging.getLogger(__name__) @@ -43,11 +45,17 @@ def fetch_channel_stats(): return # return JsonResponse({'error': str(e)}, status=500) - channel_layer = get_channel_layer() - async_to_sync(channel_layer.group_send)( + send_websocket_update( "updates", + "update", { - "type": "update", - "data": {"success": True, "type": "channel_stats", "stats": json.dumps({'channels': all_channels, 'count': len(all_channels)})} + "success": True, + "type": "channel_stats", + "stats": json.dumps({'channels': all_channels, 'count': len(all_channels)}) }, + collect_garbage=True ) + + # Explicitly clean up large data structures + all_channels = None + gc.collect() diff --git a/core/tasks.py b/core/tasks.py index fbd9277d..5e60a4e4 100644 --- a/core/tasks.py +++ b/core/tasks.py @@ -8,7 +8,7 @@ import logging import re import time import os -from core.utils import RedisClient +from core.utils import RedisClient, send_websocket_update from apps.proxy.ts_proxy.channel_status import ChannelStatus from 
apps.m3u.models import M3UAccount from apps.epg.models import EPGSource @@ -317,19 +317,24 @@ def fetch_channel_stats(): if cursor == 0: break + send_websocket_update( + "updates", + "update", + { + "success": True, + "type": "channel_stats", + "stats": json.dumps({'channels': all_channels, 'count': len(all_channels)}) + }, + collect_garbage=True + ) + + # Explicitly clean up large data structures + all_channels = None + gc.collect() + except Exception as e: logger.error(f"Error in channel_status: {e}", exc_info=True) return - # return JsonResponse({'error': str(e)}, status=500) - - channel_layer = get_channel_layer() - async_to_sync(channel_layer.group_send)( - "updates", - { - "type": "update", - "data": {"success": True, "type": "channel_stats", "stats": json.dumps({'channels': all_channels, 'count': len(all_channels)})} - }, - ) @shared_task def rehash_streams(keys): diff --git a/core/utils.py b/core/utils.py index 01463ad9..abe1c1f2 100644 --- a/core/utils.py +++ b/core/utils.py @@ -173,15 +173,47 @@ def release_task_lock(task_name, id): # Remove the lock redis_client.delete(lock_id) -def send_websocket_event(event, success, data): +def send_websocket_update(group_name, event_type, data, collect_garbage=False): + """ + Standardized function to send WebSocket updates with proper memory management. + + Args: + group_name: The WebSocket group to send to (e.g. 'updates') + event_type: The type of message (e.g. 'update') + data: The data to send + collect_garbage: Whether to force garbage collection after sending + """ channel_layer = get_channel_layer() - async_to_sync(channel_layer.group_send)( - 'updates', - { - 'type': 'update', - "data": {"success": True, "type": "epg_channels"} - } - ) + try: + async_to_sync(channel_layer.group_send)( + group_name, + { + 'type': event_type, + 'data': data + } + ) + except Exception as e: + logger.warning(f"Failed to send WebSocket update: {e}") + finally: + # Explicitly release references to help garbage collection + channel_layer = None + + # Force garbage collection if requested + if collect_garbage: + gc.collect() + +def send_websocket_event(event, success, data): + """Acquire a lock to prevent concurrent task execution.""" + data_payload = {"success": success, "type": event} + if data: + # Make a copy to avoid modifying the original + data_payload.update(data) + + # Use the standardized function + send_websocket_update('updates', 'update', data_payload) + + # Help garbage collection by clearing references + data_payload = None # Add memory monitoring utilities def get_memory_usage():
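
The element-cleanup idiom repeated throughout parse_channels_only and parse_programs_for_tvg_id is easier to see in isolation: clear the finished element, then drop its already-processed preceding siblings through the parent so the partially built tree never grows. A minimal standalone sketch of that idiom, run against an in-memory XMLTV-style snippet instead of a real gzipped EPG file:

    import io
    from lxml import etree

    xml = b"""<tv>
      <channel id="demo.1"><display-name>Demo One</display-name></channel>
      <channel id="demo.2"><display-name>Demo Two</display-name></channel>
    </tv>"""

    for _, elem in etree.iterparse(io.BytesIO(xml), events=("end",), tag="channel"):
        tvg_id = elem.get("id")
        name = elem.findtext("display-name") or tvg_id
        print(tvg_id, name)

        # Free the finished element, then delete fully processed preceding
        # siblings via the parent so memory stays flat while streaming.
        elem.clear()
        parent = elem.getparent()
        if parent is not None:
            while elem.getprevious() is not None:
                del parent[0]

The additional parent.remove(elem) in the patch detaches the current element as well; the surrounding try/except only guards against the element already having been detached, which is why cleanup failures are logged at debug level and processing continues.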
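
The cursor-style chunked delete that replaces the old count-then-delete loop in parse_programs_for_tvg_id can also be written as a small reusable helper. A hedged sketch, assuming any Django queryset over a model with an integer auto primary key; delete_in_chunks is an illustrative name, not something the patch adds:

    import gc

    def delete_in_chunks(queryset, chunk_size=5000):
        """Delete rows in id-ordered batches to bound memory use."""
        last_id = 0
        while True:
            ids = list(
                queryset.filter(id__gt=last_id)
                .order_by("id")
                .values_list("id", flat=True)[:chunk_size]
            )
            if not ids:
                break
            queryset.model.objects.filter(id__in=ids).delete()
            last_id = ids[-1]
            del ids
        gc.collect()

    # Roughly what the patch inlines as:
    # delete_in_chunks(ProgramData.objects.filter(epg=epg))

Collecting once after the loop, rather than per batch, matches where the patch ends up after dropping the per-batch gc.collect() from the old implementation.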
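
Every per-app progress notifier now funnels through the new core.utils.send_websocket_update, so channel-layer error handling and optional garbage collection live in one place. A usage sketch of the calling pattern the patch applies in apps/epg and apps/m3u; send_progress is an illustrative stand-in for the real send_epg_update / send_m3u_update callers, and the thresholds shown are the ones in the diff:

    from core.utils import send_websocket_update

    def send_progress(source_id, action, progress, **extra):
        # Illustrative payload; the real base dict is built in send_epg_update.
        data = {"source": source_id, "action": action, "progress": progress}
        data.update(extra)
        # Only pay for a forced gc.collect() on the high-frequency
        # program-parsing updates, and only every 10th progress value,
        # matching the throttling in the patch.
        collect_garbage = action == "parsing_programs" and progress % 10 == 0
        send_websocket_update("updates", "update", data, collect_garbage=collect_garbage)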