From 6087ecadf0cdd6a2d43b5bb130cfcd9e732458bd Mon Sep 17 00:00:00 2001
From: SergeantPanda
Date: Mon, 19 May 2025 09:42:21 -0500
Subject: [PATCH] Cleaning up added gc's

---
 apps/epg/tasks.py | 107 ++++++++++++++++------------------------------
 core/utils.py     |   2 +
 2 files changed, 40 insertions(+), 69 deletions(-)

diff --git a/apps/epg/tasks.py b/apps/epg/tasks.py
index d808a4c6..eb361429 100644
--- a/apps/epg/tasks.py
+++ b/apps/epg/tasks.py
@@ -536,7 +536,7 @@ def parse_channels_only(source):
     try:
         process = psutil.Process()
         initial_memory = process.memory_info().rss / 1024 / 1024
-        logger.info(f"Initial memory usage: {initial_memory:.2f} MB")
+        logger.info(f"[parse_channels_only] Initial memory usage: {initial_memory:.2f} MB")
     except (ImportError, NameError):
         process = None
         logger.warning("psutil not available for memory tracking")
@@ -574,7 +574,7 @@ def parse_channels_only(source):
 
         # Track memory at key points
         if process:
-            logger.info(f"Memory before opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
+            logger.info(f"[parse_channels_only] Memory before opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
 
         try:
             # Create a parser with the desired options
@@ -585,7 +585,7 @@ def parse_channels_only(source):
             logger.info(f"Opening file for initial channel count: {file_path}")
             source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb')
             if process:
-                logger.info(f"Memory after opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
+                logger.info(f"[parse_channels_only] Memory after opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
 
             # Count channels
             try:
@@ -599,7 +599,7 @@ def parse_channels_only(source):
             logger.info(f"Closing initial file handle")
             source_file.close()
             if process:
-                logger.info(f"Memory after closing initial file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
+                logger.info(f"[parse_channels_only] Memory after closing initial file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
 
             # Update progress after counting
             send_epg_update(source.id, "parsing_channels", 25, total_channels=total_channels)
@@ -608,12 +608,12 @@ def parse_channels_only(source):
             logger.info(f"Re-opening file for channel parsing: {file_path}")
             source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb')
             if process:
-                logger.info(f"Memory after re-opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
+                logger.info(f"[parse_channels_only] Memory after re-opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
 
             logger.info(f"Creating iterparse context")
             channel_parser = etree.iterparse(source_file, events=('end',), tag='channel')
             if process:
-                logger.info(f"Memory after creating iterparse: {process.memory_info().rss / 1024 / 1024:.2f} MB")
+                logger.info(f"[parse_channels_only] Memory after creating iterparse: {process.memory_info().rss / 1024 / 1024:.2f} MB")
 
             channel_count = 0
             for _, elem in channel_parser:
@@ -663,12 +663,12 @@ def parse_channels_only(source):
                     logger.info(f"Bulk creating {len(epgs_to_create)} EPG entries")
                     EPGData.objects.bulk_create(epgs_to_create, ignore_conflicts=True)
                     if process:
-                        logger.info(f"Memory after bulk_create: {process.memory_info().rss / 1024 / 1024:.2f} MB")
+                        logger.info(f"[parse_channels_only] Memory after bulk_create: {process.memory_info().rss / 1024 / 1024:.2f} MB")
                     del epgs_to_create # Explicit deletion
                     epgs_to_create = []
                     gc.collect()
                     if process:
-                        logger.info(f"Memory after gc.collect(): {process.memory_info().rss / 1024 / 1024:.2f} MB")
+                        logger.info(f"[parse_channels_only] Memory after gc.collect(): {process.memory_info().rss / 1024 / 1024:.2f} MB")
logger.info(f"[parse_channels_only] Memory after gc.collect(): {process.memory_info().rss / 1024 / 1024:.2f} MB") if len(epgs_to_update) >= batch_size: EPGData.objects.bulk_update(epgs_to_update, ["name"]) @@ -682,7 +682,7 @@ def parse_channels_only(source): existing_epgs.clear() gc.collect() if process: - logger.info(f"Memory after clearing cache: {process.memory_info().rss / 1024 / 1024:.2f} MB") + logger.info(f"[parse_channels_only] Memory after clearing cache: {process.memory_info().rss / 1024 / 1024:.2f} MB") # Send progress updates if processed_channels % 100 == 0 or processed_channels == total_channels: @@ -715,6 +715,7 @@ def parse_channels_only(source): except (ValueError, KeyError, TypeError): # Element might already be removed or detached pass + cleanup_memory(log_usage=True, force_collection=True) except Exception as e: # Just log the error and continue - don't let cleanup errors stop processing @@ -728,7 +729,7 @@ def parse_channels_only(source): # Explicit cleanup before sleeping logger.info(f"Completed channel parsing loop, processed {processed_channels} channels") if process: - logger.info(f"Memory before cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB") + logger.info(f"[parse_channels_only] Memory before cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB") # Explicit cleanup of the parser del channel_parser @@ -742,7 +743,7 @@ def parse_channels_only(source): # Force garbage collection gc.collect() if process: - logger.info(f"Memory after final cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB") + logger.info(f"[parse_channels_only] Memory after final cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB") # Remove long sleep that might be causing issues # time.sleep(200) # This seems excessive and may be causing issues @@ -767,10 +768,10 @@ def parse_channels_only(source): # Final garbage collection and memory tracking if process: - logger.info(f"Memory before final gc: {process.memory_info().rss / 1024 / 1024:.2f} MB") + logger.info(f"[parse_channels_only] Memory before final gc: {process.memory_info().rss / 1024 / 1024:.2f} MB") gc.collect() if process: - logger.info(f"Memory after final gc: {process.memory_info().rss / 1024 / 1024:.2f} MB") + logger.info(f"[parse_channels_only] Memory after final gc: {process.memory_info().rss / 1024 / 1024:.2f} MB") # Update source status with channel count source.status = 'success' @@ -814,20 +815,19 @@ def parse_channels_only(source): logger.info("In finally block, ensuring cleanup") existing_tvg_ids = None existing_epgs = None - try: - process = psutil.Process() - final_memory = process.memory_info().rss / 1024 / 1024 - logger.info(f"Final memory usage: {final_memory:.2f} MB") - except: - pass - # Explicitly clear the process object to prevent potential memory leaks - if 'process' in locals() and process is not None: - process = None # Check final memory usage after clearing process gc.collect() # Add comprehensive cleanup at end of channel parsing cleanup_memory(log_usage=True, force_collection=True) + try: + process = psutil.Process() + final_memory = process.memory_info().rss / 1024 / 1024 + logger.info(f"[parse_channels_only] Final memory usage: {final_memory:.2f} MB") + process = None + except: + pass + @shared_task @@ -943,9 +943,6 @@ def parse_programs_for_tvg_id(epg_id): programs_processed = 0 try: - # Create a parser with the desired options - #parser = etree.XMLParser(huge_tree=True, remove_blank_text=True) - # Open the file properly source_file = gzip.open(file_path, 'rb') if is_gzipped else 
@@ -1093,48 +1090,38 @@ def parse_programs_for_tvg_id(epg_id):
         programs_to_create = None
         custom_props = None
         custom_properties_json = None
-        #del programs_to_create
-        #programs_to_create = []
-        # Final garbage collection
-        gc.collect()
-
-        # One additional garbage collection specifically for lxml elements
-        # which can sometimes be retained due to reference cycles
-        gc.collect()
-
+        logger.info(f"Completed program parsing for tvg_id={epg.tvg_id}.")
+    finally:
         # Reset internal caches and pools that lxml might be keeping
         try:
             etree.clear_error_log()
         except:
             pass
-
-        logger.info(f"Completed program parsing for tvg_id={epg.tvg_id}.")
-    finally:
 
         # Explicit cleanup of all potentially large objects
         if source_file:
             try:
                 source_file.close()
             except:
                 pass
-        # Memory tracking after processing
-        if process:
-            mem_after = process.memory_info().rss / 1024 / 1024
-            logger.info(f"[parse_programs_for_tvg_id] Memory after parsing 2 {epg.tvg_id} - {programs_processed} programs: {mem_after:.2f} MB (change: {mem_after-mem_before:.2f} MB)")
 
         source_file = None
         program_parser = None
         programs_to_create = None
-        epg = None
+        epg_source = None
 
-        # Explicitly clear the process object to prevent potential memory leaks
-        if 'process' in locals() and process is not None:
-            process = None
         # Force garbage collection before releasing lock
         gc.collect()
 
         # Add comprehensive cleanup before releasing lock
         cleanup_memory(log_usage=True, force_collection=True)
 
+        # Memory tracking after processing
+        if process:
+            mem_after = process.memory_info().rss / 1024 / 1024
+            logger.info(f"[parse_programs_for_tvg_id] Final memory usage {epg.tvg_id} - {programs_processed} programs: {mem_after:.2f} MB (change: {mem_after-mem_before:.2f} MB)")
+            process = None
+        epg = None
+        programs_processed = None
 
         release_task_lock('parse_epg_programs', epg_id)
@@ -1172,11 +1159,6 @@ def parse_programs_for_source(epg_source, tvg_id=None):
     channel_count = 0
     updated_count = 0
     processed = 0
-
-    # Memory check before batch processing
-    if process:
-        logger.info(f"[parse_programs_for_source] Memory before batch processing: {process.memory_info().rss / 1024 / 1024:.2f} MB")
-
     # Process in batches using cursor-based approach to limit memory usage
     last_id = 0
     while True:
@@ -1207,18 +1189,10 @@ def parse_programs_for_source(epg_source, tvg_id=None):
                 logger.error(f"Error parsing programs for tvg_id={epg.tvg_id}: {e}", exc_info=True)
                 failed_entries.append(f"{epg.tvg_id}: {str(e)}")
 
-        # Memory check after processing batch
-        if process:
-            logger.info(f"[parse_programs_for_source] Memory after processing batch: {process.memory_info().rss / 1024 / 1024:.2f} MB")
-
         # Force garbage collection after each batch
         batch_entries = None # Remove reference to help garbage collection
         gc.collect()
 
-        # Memory check after garbage collection
-        if process:
-            logger.info(f"[parse_programs_for_source] Memory after gc: {process.memory_info().rss / 1024 / 1024:.2f} MB")
-
     # If there were failures, include them in the message but continue
     if failed_entries:
         epg_source.status = EPGSource.STATUS_SUCCESS # Still mark as success if some processed
@@ -1265,11 +1239,7 @@ def parse_programs_for_source(epg_source, tvg_id=None):
         return False
 
     finally:
         # Final memory cleanup and tracking
-        if process:
-            # Force garbage collection before measuring
-            gc.collect()
-            final_memory = process.memory_info().rss / 1024 / 1024
-            logger.info(f"[parse_programs_for_source] Final memory usage: {final_memory:.2f} MB")
+
         # Explicitly release any remaining large data structures
         failed_entries = None
@@ -1279,14 +1249,13 @@ def parse_programs_for_source(epg_source, tvg_id=None):
         processed = None
         gc.collect()
-        # Explicitly clear the process object to prevent potential memory leaks
-        if 'process' in locals() and process is not None:
-            process = None
-
         # Add comprehensive memory cleanup at the end
         cleanup_memory(log_usage=True, force_collection=True)
-
-
+        if process:
+            final_memory = process.memory_info().rss / 1024 / 1024
+            logger.info(f"[parse_programs_for_source] Final memory usage: {final_memory:.2f} MB difference: {final_memory - initial_memory:.2f} MB")
+            # Explicitly clear the process object to prevent potential memory leaks
+            process = None
 
 def fetch_schedules_direct(source):
     logger.info(f"Fetching Schedules Direct data from source: {source.name}")
     try:
diff --git a/core/utils.py b/core/utils.py
index 7143a349..1dabf3a6 100644
--- a/core/utils.py
+++ b/core/utils.py
@@ -254,6 +254,7 @@ def cleanup_memory(log_usage=True, force_collection=True):
         log_usage: Whether to log memory usage before and after cleanup
         force_collection: Whether to force garbage collection
     """
+    logger.debug("Starting Django memory cleanup")
     if log_usage:
         try:
             import psutil
@@ -282,3 +283,4 @@ def cleanup_memory(log_usage=True, force_collection=True):
             logger.debug(f"Memory after cleanup: {after_mem:.2f} MB (change: {after_mem-before_mem:.2f} MB)")
         except (ImportError, Exception):
             pass
+    logger.debug("Django memory cleanup complete")