From 8bf093d79b52d7bdac9702266ebaecb409740eba Mon Sep 17 00:00:00 2001
From: SergeantPanda
Date: Tue, 20 May 2025 13:53:02 -0500
Subject: [PATCH] Increased logging and cleanup

---
 apps/epg/tasks.py | 72 +++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 60 insertions(+), 12 deletions(-)

diff --git a/apps/epg/tasks.py b/apps/epg/tasks.py
index eb361429..dfb290c0 100644
--- a/apps/epg/tasks.py
+++ b/apps/epg/tasks.py
@@ -616,7 +616,9 @@ def parse_channels_only(source):
                logger.info(f"[parse_channels_only] Memory after creating iterparse: {process.memory_info().rss / 1024 / 1024:.2f} MB")
 
            channel_count = 0
+           total_elements_processed = 0  # Track total elements processed, not just channels
            for _, elem in channel_parser:
+               total_elements_processed += 1
                channel_count += 1
                tvg_id = elem.get('id', '').strip()
                if tvg_id:
@@ -660,7 +662,7 @@ def parse_channels_only(source):
 
                        # Batch processing
                        if len(epgs_to_create) >= batch_size:
-                           logger.info(f"Bulk creating {len(epgs_to_create)} EPG entries")
+                           logger.info(f"[parse_channels_only] Bulk creating {len(epgs_to_create)} EPG entries")
                            EPGData.objects.bulk_create(epgs_to_create, ignore_conflicts=True)
                            if process:
                                logger.info(f"[parse_channels_only] Memory after bulk_create: {process.memory_info().rss / 1024 / 1024:.2f} MB")
@@ -678,7 +680,7 @@ def parse_channels_only(source):
 
                            # Periodically clear the existing_epgs cache to prevent memory buildup
                            if processed_channels % 1000 == 0:
-                               logger.info(f"Clearing existing_epgs cache at {processed_channels} channels")
+                               logger.info(f"[parse_channels_only] Clearing existing_epgs cache at {processed_channels} channels")
                                existing_epgs.clear()
                                gc.collect()
                                if process:
@@ -694,7 +696,7 @@ def parse_channels_only(source):
                                processed=processed_channels,
                                total=total_channels
                            )
-                       logger.debug(f"Processed channel: {tvg_id} - {display_name}")
+                       logger.debug(f"[parse_channels_only] Processed channel: {tvg_id} - {display_name}")
 
                # Clear memory
                try:
                    # First clear the element's content
@@ -719,26 +721,72 @@ def parse_channels_only(source):
 
                except Exception as e:
                    # Just log the error and continue - don't let cleanup errors stop processing
-                   logger.debug(f"Non-critical error during XML element cleanup: {e}")
+                   logger.debug(f"[parse_channels_only] Non-critical error during XML element cleanup: {e}")
 
                # Check if we should break early to avoid excessive sleep
                if processed_channels >= total_channels and total_channels > 0:
-                   logger.info(f"Breaking channel processing loop - processed {processed_channels}/{total_channels}")
+                   logger.info(f"[parse_channels_only] Expected channel count reached, exiting channel loop - processed {processed_channels}/{total_channels}")
+                   if process: logger.debug(f"[parse_channels_only] Memory usage after {processed_channels} channels: {process.memory_info().rss / 1024 / 1024:.2f} MB")
                    break
 
+               logger.debug(f"[parse_channels_only] Total elements processed: {total_elements_processed}")
+               # Add periodic forced cleanup based on TOTAL ELEMENTS, not just channels
+               # This ensures we clean up even if processing many non-channel elements
+               if total_elements_processed % 1000 == 0:
+                   logger.info(f"[parse_channels_only] Performing preventative memory cleanup after {total_elements_processed} elements (found {processed_channels} channels)")
+                   # Close and reopen the parser to release memory
+                   if source_file and channel_parser:
+                       # First clear element references
+                       elem.clear()
+                       if elem.getparent() is not None:
+                           elem.getparent().remove(elem)
+
+                       # Reset parser state
+                       del channel_parser
+                       channel_parser = None
+                       gc.collect()
+
+                       # Perform thorough cleanup
+                       cleanup_memory(log_usage=True, force_collection=True)
+
+                       # Create a new parser context
+                       channel_parser = etree.iterparse(source_file, events=('end',), tag='channel')
+                       logger.info(f"[parse_channels_only] Recreated parser context after memory cleanup")
+
+               # Also do cleanup based on processed channels as before
+               elif processed_channels % 1000 == 0 and processed_channels > 0:
+                   logger.info(f"[parse_channels_only] Performing preventative memory cleanup at {processed_channels} channels")
+                   # Close and reopen the parser to release memory
+                   if source_file and channel_parser:
+                       # First clear element references
+                       elem.clear()
+                       if elem.getparent() is not None:
+                           elem.getparent().remove(elem)
+
+                       # Reset parser state
+                       del channel_parser
+                       channel_parser = None
+                       gc.collect()
+
+                       # Perform thorough cleanup
+                       cleanup_memory(log_usage=True, force_collection=True)
+
+                       # Create a new parser context
+                       channel_parser = etree.iterparse(source_file, events=('end',), tag='channel')
+                       logger.info(f"[parse_channels_only] Recreated parser context after memory cleanup")
 
            # Explicit cleanup before sleeping
-           logger.info(f"Completed channel parsing loop, processed {processed_channels} channels")
+           logger.info(f"[parse_channels_only] Completed channel parsing loop, processed {processed_channels} channels")
            if process:
                logger.info(f"[parse_channels_only] Memory before cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB")
 
            # Explicit cleanup of the parser
            del channel_parser
-           logger.info(f"Deleted channel_parser object")
+           logger.info(f"[parse_channels_only] Deleted channel_parser object")
 
            # Close the file
-           logger.info(f"Closing file: {file_path}")
+           logger.info(f"[parse_channels_only] Closing file: {file_path}")
            source_file.close()
-           logger.info(f"File closed: {file_path}")
+           logger.info(f"[parse_channels_only] File closed: {file_path}")
 
            # Force garbage collection
            gc.collect()
@@ -749,7 +797,7 @@ def parse_channels_only(source):
            # time.sleep(200)  # This seems excessive and may be causing issues
 
        except (etree.XMLSyntaxError, Exception) as xml_error:
-           logger.error(f"XML parsing failed: {xml_error}")
+           logger.error(f"[parse_channels_only] XML parsing failed: {xml_error}")
            # Update status to error
            source.status = 'error'
            source.last_message = f"Error parsing XML file: {str(xml_error)}"
@@ -760,11 +808,11 @@ def parse_channels_only(source):
        # Process any remaining items
        if epgs_to_create:
            EPGData.objects.bulk_create(epgs_to_create, ignore_conflicts=True)
-           logger.info(f"Created final batch of {len(epgs_to_create)} EPG entries")
+           logger.info(f"[parse_channels_only] Created final batch of {len(epgs_to_create)} EPG entries")
 
        if epgs_to_update:
            EPGData.objects.bulk_update(epgs_to_update, ["name"])
-           logger.info(f"Updated final batch of {len(epgs_to_update)} EPG entries")
+           logger.info(f"[parse_channels_only] Updated final batch of {len(epgs_to_update)} EPG entries")
 
        # Final garbage collection and memory tracking
        if process:
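
Note for reviewers: the cleanup in the large hunk above combines two ideas --
clearing each processed <channel> element and periodically recreating the
iterparse context. The sketch below isolates only the first idea, the
standard lxml "clear as you go" idiom that the patch builds on. It is a
minimal illustration, not code from this repository: stream_channels, the
guide.xml path, and the XMLTV-style display-name lookup are assumed names.

    from lxml import etree

    def stream_channels(path):
        """Yield (tvg_id, display_name) pairs while keeping memory flat."""
        context = etree.iterparse(path, events=('end',), tag='channel')
        for _, elem in context:
            tvg_id = (elem.get('id') or '').strip()
            name_el = elem.find('display-name')
            display_name = name_el.text if name_el is not None else tvg_id
            if tvg_id:
                yield tvg_id, display_name
            # Release this element's content, then drop already-processed
            # siblings so the tree behind the parser does not keep growing.
            elem.clear()
            while elem.getprevious() is not None:
                del elem.getparent()[0]
        del context  # release the parser and its partially built tree

    for tvg_id, name in stream_channels("guide.xml"):
        print(tvg_id, name)

If this sibling pruning alone keeps memory flat, the parser-recreation
branches may become unnecessary. Note also that recreating iterparse on a
partially read file object resumes from the current offset rather than the
document root, so that path is worth testing against a large real-world
guide before merge.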