diff --git a/apps/epg/tasks.py b/apps/epg/tasks.py index 9a3d76c1..e7bd06ad 100644 --- a/apps/epg/tasks.py +++ b/apps/epg/tasks.py @@ -585,7 +585,7 @@ def parse_channels_only(source): # Track memory at key points if process: - logger.info(f"[parse_channels_only] Memory before opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB") + logger.debug(f"[parse_channels_only] Memory before opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB") try: # Attempt to count existing channels in the database @@ -648,6 +648,7 @@ def parse_channels_only(source): epg_source=source, )) logger.debug(f"[parse_channels_only] Added new channel to epgs_to_create 1: {tvg_id} - {display_name}") + processed_channels += 1 continue # We use the cached object to check if the name has changed @@ -657,6 +658,9 @@ def parse_channels_only(source): epg_obj.name = display_name epgs_to_update.append(epg_obj) logger.debug(f"[parse_channels_only] Added channel to update to epgs_to_update: {tvg_id} - {display_name}") + else: + # No changes needed, just clear the element + logger.debug(f"[parse_channels_only] No changes needed for channel {tvg_id} - {display_name}") else: # This is a new channel that doesn't exist in our database epgs_to_create.append(EPGData( @@ -676,7 +680,7 @@ def parse_channels_only(source): logger.info(f"[parse_channels_only] Memory after bulk_create: {process.memory_info().rss / 1024 / 1024:.2f} MB") del epgs_to_create # Explicit deletion epgs_to_create = [] - gc.collect() + cleanup_memory(log_usage=should_log_memory, force_collection=True) if process: logger.info(f"[parse_channels_only] Memory after gc.collect(): {process.memory_info().rss / 1024 / 1024:.2f} MB") @@ -689,13 +693,13 @@ def parse_channels_only(source): logger.info(f"[parse_channels_only] Memory after bulk_update: {process.memory_info().rss / 1024 / 1024:.2f} MB") epgs_to_update = [] # Force garbage collection - gc.collect() + cleanup_memory(log_usage=should_log_memory, force_collection=True) # Periodically clear the existing_epgs cache to prevent memory buildup if processed_channels % 1000 == 0: logger.info(f"[parse_channels_only] Clearing existing_epgs cache at {processed_channels} channels") existing_epgs.clear() - gc.collect() + cleanup_memory(log_usage=should_log_memory, force_collection=True) if process: logger.info(f"[parse_channels_only] Memory after clearing cache: {process.memory_info().rss / 1024 / 1024:.2f} MB") @@ -709,62 +713,27 @@ def parse_channels_only(source): processed=processed_channels, total=total_channels ) - logger.debug(f"[parse_channels_only] Processed channel: {tvg_id} - {display_name}") + if processed_channels > total_channels: + logger.debug(f"[parse_channels_only] Processed channel {tvg_id} - {display_name} - processed {processed_channels - total_channels} additional channels") + else: + logger.debug(f"[parse_channels_only] Processed channel {tvg_id} - {display_name} - processed {processed_channels}/{total_channels}") if process: - logger.info(f"[parse_channels_only] Memory before elem cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB") + logger.debug(f"[parse_channels_only] Memory before elem cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB") # Clear memory try: # First clear the element's content - elem.clear() - - # Get the parent before we might lose reference to it - parent = elem.getparent() - if parent is not None: - # Clean up preceding siblings - while elem.getprevious() is not None: - del parent[0] - - # Try to fully detach this element from parent - try: - parent.remove(elem) - del elem - del parent - except (ValueError, KeyError, TypeError): - # Element might already be removed or detached - pass - #cleanup_memory(log_usage=should_log_memory, force_collection=True) + clear_element(elem) except Exception as e: # Just log the error and continue - don't let cleanup errors stop processing logger.debug(f"[parse_channels_only] Non-critical error during XML element cleanup: {e}") if process: - logger.info(f"[parse_channels_only] Memory after elem cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB") - # Check if we should break early to avoid excessive sleep - if processed_channels >= total_channels and total_channels > 0: - logger.info(f"[parse_channels_only] Expected channel numbers hit, continuing - processed {processed_channels}/{total_channels}") - if process: - logger.debug(f"[parse_channels_only] Memory usage after {processed_channels}: {process.memory_info().rss / 1024 / 1024:.2f} MB") + logger.debug(f"[parse_channels_only] Memory after elem cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB") logger.debug(f"[parse_channels_only] Total elements processed: {total_elements_processed}") - if processed_channels == total_channels: - if process: - logger.info(f"[parse_channels_only] Processed all channels current memory: {process.memory_info().rss / 1024 / 1024:.2f} MB") - else: - logger.info(f"[parse_channels_only] Processed all channels") else: - # Just clear the element to avoid memory leaks - if elem is not None: - elem.clear() - parent = elem.getparent() - if parent is not None: - while elem.getprevious() is not None: - del parent[0] - try: - parent.remove(elem) - except (ValueError, KeyError, TypeError): - pass - del elem + clear_element(elem) continue except (etree.XMLSyntaxError, Exception) as xml_error: @@ -776,8 +745,9 @@ def parse_channels_only(source): send_epg_update(source.id, "parsing_channels", 100, status="error", error=str(xml_error)) return False if process: - logger.debug(f"[parse_channels_only] Memory before final batch creation: {process.memory_info().rss / 1024 / 1024:.2f} MB") - + logger.info(f"[parse_channels_only] Processed {processed_channels} channels current memory: {process.memory_info().rss / 1024 / 1024:.2f} MB") + else: + logger.info(f"[parse_channels_only] Processed {processed_channels} channels") # Process any remaining items if epgs_to_create: EPGData.objects.bulk_create(epgs_to_create, ignore_conflicts=True) @@ -826,8 +796,9 @@ def parse_channels_only(source): send_epg_update(source.id, "parsing_channels", 100, status="error", error=str(e)) return False finally: - # Add more detailed cleanup in finally block - logger.debug("In finally block, ensuring cleanup") + # Cleanup memory and close file + if process: + logger.debug(f"[parse_channels_only] Memory before cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB") try: if 'channel_parser' in locals(): del channel_parser @@ -1053,41 +1024,10 @@ def parse_programs_for_tvg_id(epg_id): logger.error(f"Error processing program for {epg.tvg_id}: {e}", exc_info=True) else: # Immediately clean up non-matching elements to reduce memory pressure - elem.clear() - parent = elem.getparent() - if parent is not None: - while elem.getprevious() is not None: - del parent[0] - try: - parent.remove(elem) - except (ValueError, KeyError, TypeError): - pass - del elem + if elem is not None: + clear_element(elem) continue - # Important: Clear the element to avoid memory leaks using a more robust approach - try: - # First clear the element's content - elem.clear() - # Get the parent before we might lose reference to it - parent = elem.getparent() - if parent is not None: - # Clean up preceding siblings - while elem.getprevious() is not None: - del parent[0] - # Try to fully detach this element from parent - try: - parent.remove(elem) - del elem - del parent - except (ValueError, KeyError, TypeError): - # Element might already be removed or detached - pass - - except Exception as e: - # Just log the error and continue - don't let cleanup errors stop processing - logger.trace(f"Non-critical error during XML element cleanup: {e}") - # Make sure to close the file and release parser resources if source_file: source_file.close() @@ -1502,3 +1442,15 @@ def extract_custom_properties(prog): custom_props[kw.replace('-', '_')] = True return custom_props + +def clear_element(elem): + """Clear an XML element and its parent to free memory.""" + try: + elem.clear() + parent = elem.getparent() + if parent is not None: + while elem.getprevious() is not None: + del parent[0] + parent.remove(elem) + except Exception as e: + logger.warning(f"Error clearing XML element: {e}", exc_info=True)