Mirror of https://github.com/Dispatcharr/Dispatcharr.git (synced 2026-01-23 02:35:14 +00:00)
Cleaning up added gc's
This commit is contained in:
parent 7c809931d7
commit 6087ecadf0

2 changed files with 40 additions and 69 deletions
@@ -536,7 +536,7 @@ def parse_channels_only(source):
     try:
         process = psutil.Process()
         initial_memory = process.memory_info().rss / 1024 / 1024
-        logger.info(f"Initial memory usage: {initial_memory:.2f} MB")
+        logger.info(f"[parse_channels_only] Initial memory usage: {initial_memory:.2f} MB")
     except (ImportError, NameError):
         process = None
         logger.warning("psutil not available for memory tracking")
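
The pattern this hunk establishes, probing psutil inside a try/except and falling back to process = None, recurs throughout the commit. A minimal sketch of the same guard factored into a helper (get_rss_mb is hypothetical, not code from this repository):

import logging

logger = logging.getLogger(__name__)

def get_rss_mb():
    # Return the current process RSS in MB, or None when psutil is missing
    try:
        import psutil
        return psutil.Process().memory_info().rss / 1024 / 1024
    except ImportError:
        logger.warning("psutil not available for memory tracking")
        return None

rss = get_rss_mb()
if rss is not None:
    logger.info(f"[parse_channels_only] Initial memory usage: {rss:.2f} MB")
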
@@ -574,7 +574,7 @@ def parse_channels_only(source):

     # Track memory at key points
     if process:
-        logger.info(f"Memory before opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
+        logger.info(f"[parse_channels_only] Memory before opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")

     try:
         # Create a parser with the desired options
@@ -585,7 +585,7 @@ def parse_channels_only(source):
         logger.info(f"Opening file for initial channel count: {file_path}")
         source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb')
         if process:
-            logger.info(f"Memory after opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
+            logger.info(f"[parse_channels_only] Memory after opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")

         # Count channels
         try:
@@ -599,7 +599,7 @@ def parse_channels_only(source):
         logger.info(f"Closing initial file handle")
         source_file.close()
         if process:
-            logger.info(f"Memory after closing initial file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
+            logger.info(f"[parse_channels_only] Memory after closing initial file: {process.memory_info().rss / 1024 / 1024:.2f} MB")

         # Update progress after counting
         send_epg_update(source.id, "parsing_channels", 25, total_channels=total_channels)
@@ -608,12 +608,12 @@ def parse_channels_only(source):
         logger.info(f"Re-opening file for channel parsing: {file_path}")
         source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb')
         if process:
-            logger.info(f"Memory after re-opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
+            logger.info(f"[parse_channels_only] Memory after re-opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")

         logger.info(f"Creating iterparse context")
         channel_parser = etree.iterparse(source_file, events=('end',), tag='channel')
         if process:
-            logger.info(f"Memory after creating iterparse: {process.memory_info().rss / 1024 / 1024:.2f} MB")
+            logger.info(f"[parse_channels_only] Memory after creating iterparse: {process.memory_info().rss / 1024 / 1024:.2f} MB")

         channel_count = 0
         for _, elem in channel_parser:
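
etree.iterparse(source_file, events=('end',), tag='channel') streams the XMLTV file instead of building the whole tree in memory. What the hunk does not show is how each element is released afterwards; the standard lxml idiom, sketched here under the assumption that elements are cleared right after use (as the later cleanup hunks suggest), looks like:

from lxml import etree

def stream_channels(source_file):
    # Yield each <channel> element as its closing tag is parsed
    channel_parser = etree.iterparse(source_file, events=('end',), tag='channel')
    channel_count = 0
    for _, elem in channel_parser:
        channel_count += 1
        # ... process elem here, e.g. elem.get('id') ...
        elem.clear()  # free the element's own children
        # Drop already-processed siblings still referenced by the root
        while elem.getprevious() is not None:
            del elem.getparent()[0]
    del channel_parser  # mirror the explicit deletion in this commit
    return channel_count
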
@@ -663,12 +663,12 @@ def parse_channels_only(source):
                 logger.info(f"Bulk creating {len(epgs_to_create)} EPG entries")
                 EPGData.objects.bulk_create(epgs_to_create, ignore_conflicts=True)
                 if process:
-                    logger.info(f"Memory after bulk_create: {process.memory_info().rss / 1024 / 1024:.2f} MB")
+                    logger.info(f"[parse_channels_only] Memory after bulk_create: {process.memory_info().rss / 1024 / 1024:.2f} MB")
                 del epgs_to_create # Explicit deletion
                 epgs_to_create = []
                 gc.collect()
                 if process:
-                    logger.info(f"Memory after gc.collect(): {process.memory_info().rss / 1024 / 1024:.2f} MB")
+                    logger.info(f"[parse_channels_only] Memory after gc.collect(): {process.memory_info().rss / 1024 / 1024:.2f} MB")

             if len(epgs_to_update) >= batch_size:
                 EPGData.objects.bulk_update(epgs_to_update, ["name"])
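
bulk_create with ignore_conflicts inserts the whole batch in one query, and the del/re-assign dance afterwards exists so the Python model instances become collectable. A sketch of that flush step in isolation (the EPGData import path is an assumption):

import gc
from apps.epg.models import EPGData  # assumed import path

def flush_batch(epgs_to_create):
    # One INSERT for the whole batch; rows that conflict are skipped
    EPGData.objects.bulk_create(epgs_to_create, ignore_conflicts=True)
    epgs_to_create.clear()  # release the model instances
    gc.collect()            # reclaim them immediately, as the diff does
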
@@ -682,7 +682,7 @@ def parse_channels_only(source):
                 existing_epgs.clear()
                 gc.collect()
                 if process:
-                    logger.info(f"Memory after clearing cache: {process.memory_info().rss / 1024 / 1024:.2f} MB")
+                    logger.info(f"[parse_channels_only] Memory after clearing cache: {process.memory_info().rss / 1024 / 1024:.2f} MB")

             # Send progress updates
             if processed_channels % 100 == 0 or processed_channels == total_channels:
@@ -715,6 +715,7 @@ def parse_channels_only(source):
                     except (ValueError, KeyError, TypeError):
                         # Element might already be removed or detached
                         pass
+                cleanup_memory(log_usage=True, force_collection=True)

             except Exception as e:
                 # Just log the error and continue - don't let cleanup errors stop processing
@@ -728,7 +729,7 @@ def parse_channels_only(source):
         # Explicit cleanup before sleeping
         logger.info(f"Completed channel parsing loop, processed {processed_channels} channels")
         if process:
-            logger.info(f"Memory before cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB")
+            logger.info(f"[parse_channels_only] Memory before cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB")

         # Explicit cleanup of the parser
         del channel_parser
@@ -742,7 +743,7 @@ def parse_channels_only(source):
         # Force garbage collection
         gc.collect()
         if process:
-            logger.info(f"Memory after final cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB")
+            logger.info(f"[parse_channels_only] Memory after final cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB")

         # Remove long sleep that might be causing issues
         # time.sleep(200) # This seems excessive and may be causing issues
@@ -767,10 +768,10 @@ def parse_channels_only(source):

         # Final garbage collection and memory tracking
         if process:
-            logger.info(f"Memory before final gc: {process.memory_info().rss / 1024 / 1024:.2f} MB")
+            logger.info(f"[parse_channels_only] Memory before final gc: {process.memory_info().rss / 1024 / 1024:.2f} MB")
         gc.collect()
         if process:
-            logger.info(f"Memory after final gc: {process.memory_info().rss / 1024 / 1024:.2f} MB")
+            logger.info(f"[parse_channels_only] Memory after final gc: {process.memory_info().rss / 1024 / 1024:.2f} MB")

         # Update source status with channel count
         source.status = 'success'
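
The before/after log pairs around gc.collect() repeat often enough that they could be one wrapper. This context manager is a hypothetical consolidation, not code from the commit:

import gc
import logging
from contextlib import contextmanager

logger = logging.getLogger(__name__)

@contextmanager
def logged_gc(label, process=None):
    # Log RSS around a forced collection, when a psutil Process is available
    if process:
        logger.info(f"[{label}] Memory before gc: {process.memory_info().rss / 1024 / 1024:.2f} MB")
    try:
        yield
    finally:
        gc.collect()
        if process:
            logger.info(f"[{label}] Memory after gc: {process.memory_info().rss / 1024 / 1024:.2f} MB")

Usage would be: with logged_gc("parse_channels_only", process): ... do the work ...
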
@@ -814,20 +815,19 @@ def parse_channels_only(source):
         logger.info("In finally block, ensuring cleanup")
         existing_tvg_ids = None
         existing_epgs = None
-        try:
-            process = psutil.Process()
-            final_memory = process.memory_info().rss / 1024 / 1024
-            logger.info(f"Final memory usage: {final_memory:.2f} MB")
-        except:
-            pass
-        # Explicitly clear the process object to prevent potential memory leaks
-        if 'process' in locals() and process is not None:
-            process = None
-
-        # Check final memory usage after clearing process
-        gc.collect()
+        # Add comprehensive cleanup at end of channel parsing
+        cleanup_memory(log_usage=True, force_collection=True)
+        try:
+            process = psutil.Process()
+            final_memory = process.memory_info().rss / 1024 / 1024
+            logger.info(f"[parse_channels_only] Final memory usage: {final_memory:.2f} MB")
+            process = None
+        except:
+            pass


 @shared_task
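
Distilled, the rewritten finally block swaps the ad-hoc gc calls for one cleanup_memory() pass plus a single guarded final reading. A sketch of the resulting shape (parse_work is a placeholder for the real function body):

try:
    parse_work()  # placeholder for the parsing logic
finally:
    cleanup_memory(log_usage=True, force_collection=True)
    try:
        import psutil
        process = psutil.Process()
        final_memory = process.memory_info().rss / 1024 / 1024
        logger.info(f"[parse_channels_only] Final memory usage: {final_memory:.2f} MB")
        process = None  # drop the handle once it has been read
    except Exception:
        pass
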
@@ -943,9 +943,6 @@ def parse_programs_for_tvg_id(epg_id):
     programs_processed = 0

     try:
-        # Create a parser with the desired options
-        #parser = etree.XMLParser(huge_tree=True, remove_blank_text=True)
-
         # Open the file properly
         source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb')

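
The conditional gzip.open/open relies on an is_gzipped flag computed upstream of this hunk. One conventional way to derive it (an assumption here, not shown in the diff) is the two-byte gzip magic number:

import gzip

def open_maybe_gzipped(file_path):
    with open(file_path, 'rb') as probe:
        is_gzipped = probe.read(2) == b'\x1f\x8b'  # gzip magic bytes
    return gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb')
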
@@ -1093,48 +1090,38 @@ def parse_programs_for_tvg_id(epg_id):
             programs_to_create = None
             custom_props = None
             custom_properties_json = None
-            #del programs_to_create
-            #programs_to_create = []
-
-
-            # Final garbage collection
-            gc.collect()
-
-            # One additional garbage collection specifically for lxml elements
-            # which can sometimes be retained due to reference cycles
-            gc.collect()
-
-            logger.info(f"Completed program parsing for tvg_id={epg.tvg_id}.")
-        finally:
-            # Reset internal caches and pools that lxml might be keeping
-            try:
-                etree.clear_error_log()
-            except:
-                pass

+        logger.info(f"Completed program parsing for tvg_id={epg.tvg_id}.")
+    finally:
         # Explicit cleanup of all potentially large objects
         if source_file:
             try:
                 source_file.close()
             except:
                 pass
-        # Memory tracking after processing
-        if process:
-            mem_after = process.memory_info().rss / 1024 / 1024
-            logger.info(f"[parse_programs_for_tvg_id] Memory after parsing 2 {epg.tvg_id} - {programs_processed} programs: {mem_after:.2f} MB (change: {mem_after-mem_before:.2f} MB)")
         source_file = None
         program_parser = None
         programs_to_create = None
-        epg = None
-
         epg_source = None
-        # Explicitly clear the process object to prevent potential memory leaks
-        if 'process' in locals() and process is not None:
-            process = None
-        # Force garbage collection before releasing lock
-        gc.collect()
-
+        # Add comprehensive cleanup before releasing lock
+        cleanup_memory(log_usage=True, force_collection=True)
+        # Memory tracking after processing
+        if process:
+            mem_after = process.memory_info().rss / 1024 / 1024
+            logger.info(f"[parse_programs_for_tvg_id] Final memory usage {epg.tvg_id} - {programs_processed} programs: {mem_after:.2f} MB (change: {mem_after-mem_before:.2f} MB)")
+        process = None
+        epg = None
+        programs_processed = None
         release_task_lock('parse_epg_programs', epg_id)
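
release_task_lock in the new finally block pairs with an acquire step this diff never shows. A minimal Redis-backed sketch of such a pair; the names, key format, and TTL are assumptions, not Dispatcharr's actual implementation:

import redis

r = redis.Redis()

def acquire_task_lock(task_name, obj_id, ttl=3600):
    # SET NX succeeds only for the first caller; EX expires stale locks
    return bool(r.set(f"lock:{task_name}:{obj_id}", "1", nx=True, ex=ttl))

def release_task_lock(task_name, obj_id):
    r.delete(f"lock:{task_name}:{obj_id}")

Releasing in the finally block, as the hunk does, guarantees the lock is dropped even when parsing raises.
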
@@ -1172,11 +1159,6 @@ def parse_programs_for_source(epg_source, tvg_id=None):
         channel_count = 0
         updated_count = 0
         processed = 0
-
-        # Memory check before batch processing
-        if process:
-            logger.info(f"[parse_programs_for_source] Memory before batch processing: {process.memory_info().rss / 1024 / 1024:.2f} MB")
-
         # Process in batches using cursor-based approach to limit memory usage
         last_id = 0
         while True:
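
The last_id/while True loop is keyset pagination: paging by primary key instead of OFFSET keeps every query cheap and bounds how many rows are alive at once. A sketch under the assumption that EPGData rows are being walked (the batch size and handler are illustrative):

def walk_epg_rows(handle_row, batch_size=500):
    last_id = 0
    while True:
        batch = list(
            EPGData.objects.filter(id__gt=last_id).order_by('id')[:batch_size]
        )
        if not batch:
            break
        for epg in batch:
            handle_row(epg)
        last_id = batch[-1].id
        batch = None  # drop the reference before fetching the next page
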
@@ -1207,18 +1189,10 @@ def parse_programs_for_source(epg_source, tvg_id=None):
                     logger.error(f"Error parsing programs for tvg_id={epg.tvg_id}: {e}", exc_info=True)
                     failed_entries.append(f"{epg.tvg_id}: {str(e)}")

-            # Memory check after processing batch
-            if process:
-                logger.info(f"[parse_programs_for_source] Memory after processing batch: {process.memory_info().rss / 1024 / 1024:.2f} MB")
-
             # Force garbage collection after each batch
             batch_entries = None # Remove reference to help garbage collection
             gc.collect()

-            # Memory check after garbage collection
-            if process:
-                logger.info(f"[parse_programs_for_source] Memory after gc: {process.memory_info().rss / 1024 / 1024:.2f} MB")
-
         # If there were failures, include them in the message but continue
         if failed_entries:
             epg_source.status = EPGSource.STATUS_SUCCESS # Still mark as success if some processed
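
The failed_entries handling keeps one bad tvg_id from aborting the whole run: each failure is logged and recorded, and the loop continues. A reduced sketch of the same pattern:

failed_entries = []
for epg in batch_entries:
    try:
        parse_programs_for_tvg_id(epg.id)
    except Exception as e:
        logger.error(f"Error parsing programs for tvg_id={epg.tvg_id}: {e}", exc_info=True)
        failed_entries.append(f"{epg.tvg_id}: {str(e)}")
# Partial success still counts as success, per the status handling above
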
@@ -1265,11 +1239,7 @@ def parse_programs_for_source(epg_source, tvg_id=None):
         return False
     finally:
-        # Final memory cleanup and tracking
-        if process:
-            # Force garbage collection before measuring
-            gc.collect()
-            final_memory = process.memory_info().rss / 1024 / 1024
-            logger.info(f"[parse_programs_for_source] Final memory usage: {final_memory:.2f} MB")

         # Explicitly release any remaining large data structures
         failed_entries = None
@@ -1279,14 +1249,13 @@ def parse_programs_for_source(epg_source, tvg_id=None):
         processed = None
         gc.collect()

-        # Explicitly clear the process object to prevent potential memory leaks
-        if 'process' in locals() and process is not None:
-            process = None
+        # Add comprehensive memory cleanup at the end
+        cleanup_memory(log_usage=True, force_collection=True)

+        if process:
+            final_memory = process.memory_info().rss / 1024 / 1024
+            logger.info(f"[parse_programs_for_source] Final memory usage: {final_memory:.2f} MB difference: {final_memory - initial_memory:.2f} MB")
+            # Explicitly clear the process object to prevent potential memory leaks
+            process = None

 def fetch_schedules_direct(source):
     logger.info(f"Fetching Schedules Direct data from source: {source.name}")
     try:
@@ -254,6 +254,7 @@ def cleanup_memory(log_usage=True, force_collection=True):
         log_usage: Whether to log memory usage before and after cleanup
         force_collection: Whether to force garbage collection
     """
+    logger.debug("Starting memory cleanup django memory cleanup")
     if log_usage:
         try:
             import psutil
@@ -282,3 +283,4 @@ def cleanup_memory(log_usage=True, force_collection=True):
             logger.debug(f"Memory after cleanup: {after_mem:.2f} MB (change: {after_mem-before_mem:.2f} MB)")
         except (ImportError, Exception):
             pass
+    logger.debug("Memory cleanup complete for django")
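
Only fragments of cleanup_memory are visible in this diff: the docstring arguments, the RSS delta logging, and the two debug lines the commit adds at entry and exit. A sketch consistent with those fragments (the middle of the real function may do more):

import gc
import logging

logger = logging.getLogger(__name__)

def cleanup_memory(log_usage=True, force_collection=True):
    """
    Args:
        log_usage: Whether to log memory usage before and after cleanup
        force_collection: Whether to force garbage collection
    """
    logger.debug("Starting memory cleanup django memory cleanup")
    before_mem = None
    if log_usage:
        try:
            import psutil
            before_mem = psutil.Process().memory_info().rss / 1024 / 1024
        except ImportError:
            pass
    if force_collection:
        gc.collect()
    if log_usage and before_mem is not None:
        try:
            import psutil
            after_mem = psutil.Process().memory_info().rss / 1024 / 1024
            logger.debug(f"Memory after cleanup: {after_mem:.2f} MB (change: {after_mem-before_mem:.2f} MB)")
        except (ImportError, Exception):
            pass
    logger.debug("Memory cleanup complete for django")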