EPG processing enhancements. Celery memory management.

SergeantPanda 2025-05-17 16:42:37 -05:00
parent 7fe618b037
commit 1174e2e0c7
6 changed files with 444 additions and 172 deletions

View file

@ -63,146 +63,162 @@ def match_epg_channels():
4) If a match is found, we set channel.tvg_id
5) Summarize and log results.
"""
logger.info("Starting EPG matching logic...")
# Attempt to retrieve a "preferred-region" if configured
try:
region_obj = CoreSettings.objects.get(key="preferred-region")
region_code = region_obj.value.strip().lower()
except CoreSettings.DoesNotExist:
region_code = None
logger.info("Starting EPG matching logic...")
matched_channels = []
channels_to_update = []
# Attempt to retrieve a "preferred-region" if configured
try:
region_obj = CoreSettings.objects.get(key="preferred-region")
region_code = region_obj.value.strip().lower()
except CoreSettings.DoesNotExist:
region_code = None
# Get channels that don't have EPG data assigned
channels_without_epg = Channel.objects.filter(epg_data__isnull=True)
logger.info(f"Found {channels_without_epg.count()} channels without EPG data")
matched_channels = []
channels_to_update = []
channels_json = []
for channel in channels_without_epg:
# Normalize TVG ID - strip whitespace and convert to lowercase
normalized_tvg_id = channel.tvg_id.strip().lower() if channel.tvg_id else ""
if normalized_tvg_id:
logger.info(f"Processing channel {channel.id} '{channel.name}' with TVG ID='{normalized_tvg_id}'")
# Get channels that don't have EPG data assigned
channels_without_epg = Channel.objects.filter(epg_data__isnull=True)
logger.info(f"Found {channels_without_epg.count()} channels without EPG data")
channels_json.append({
"id": channel.id,
"name": channel.name,
"tvg_id": normalized_tvg_id, # Use normalized TVG ID
"original_tvg_id": channel.tvg_id, # Keep original for reference
"fallback_name": normalized_tvg_id if normalized_tvg_id else channel.name,
"norm_chan": normalize_name(normalized_tvg_id if normalized_tvg_id else channel.name)
})
channels_json = []
for channel in channels_without_epg:
# Normalize TVG ID - strip whitespace and convert to lowercase
normalized_tvg_id = channel.tvg_id.strip().lower() if channel.tvg_id else ""
if normalized_tvg_id:
logger.info(f"Processing channel {channel.id} '{channel.name}' with TVG ID='{normalized_tvg_id}'")
# Similarly normalize EPG data TVG IDs
epg_json = []
for epg in EPGData.objects.all():
normalized_tvg_id = epg.tvg_id.strip().lower() if epg.tvg_id else ""
epg_json.append({
'id': epg.id,
'tvg_id': normalized_tvg_id, # Use normalized TVG ID
'original_tvg_id': epg.tvg_id, # Keep original for reference
'name': epg.name,
'norm_name': normalize_name(epg.name),
'epg_source_id': epg.epg_source.id if epg.epg_source else None,
})
channels_json.append({
"id": channel.id,
"name": channel.name,
"tvg_id": normalized_tvg_id, # Use normalized TVG ID
"original_tvg_id": channel.tvg_id, # Keep original for reference
"fallback_name": normalized_tvg_id if normalized_tvg_id else channel.name,
"norm_chan": normalize_name(normalized_tvg_id if normalized_tvg_id else channel.name)
})
# Log available EPG data TVG IDs for debugging
unique_epg_tvg_ids = set(e['tvg_id'] for e in epg_json if e['tvg_id'])
logger.info(f"Available EPG TVG IDs: {', '.join(sorted(unique_epg_tvg_ids))}")
# Similarly normalize EPG data TVG IDs
epg_json = []
for epg in EPGData.objects.all():
normalized_tvg_id = epg.tvg_id.strip().lower() if epg.tvg_id else ""
epg_json.append({
'id': epg.id,
'tvg_id': normalized_tvg_id, # Use normalized TVG ID
'original_tvg_id': epg.tvg_id, # Keep original for reference
'name': epg.name,
'norm_name': normalize_name(epg.name),
'epg_source_id': epg.epg_source.id if epg.epg_source else None,
})
payload = {
"channels": channels_json,
"epg_data": epg_json,
"region_code": region_code,
}
# Log available EPG data TVG IDs for debugging
unique_epg_tvg_ids = set(e['tvg_id'] for e in epg_json if e['tvg_id'])
logger.info(f"Available EPG TVG IDs: {', '.join(sorted(unique_epg_tvg_ids))}")
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_file.write(json.dumps(payload).encode('utf-8'))
temp_file_path = temp_file.name
process = subprocess.Popen(
['python', '/app/scripts/epg_match.py', temp_file_path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# Log stderr in real-time
for line in iter(process.stderr.readline, ''):
if line:
logger.info(line.strip())
process.stderr.close()
stdout, stderr = process.communicate()
os.remove(temp_file_path)
if process.returncode != 0:
return f"Failed to process EPG matching: {stderr}"
result = json.loads(stdout)
# This returns lists of dicts, not model objects
channels_to_update_dicts = result["channels_to_update"]
matched_channels = result["matched_channels"]
# Convert your dict-based 'channels_to_update' into real Channel objects
if channels_to_update_dicts:
# Extract IDs of the channels that need updates
channel_ids = [d["id"] for d in channels_to_update_dicts]
# Fetch them from DB
channels_qs = Channel.objects.filter(id__in=channel_ids)
channels_list = list(channels_qs)
# Build a map from channel_id -> epg_data_id (or whatever fields you need)
epg_mapping = {
d["id"]: d["epg_data_id"] for d in channels_to_update_dicts
payload = {
"channels": channels_json,
"epg_data": epg_json,
"region_code": region_code,
}
# Populate each Channel object with the updated epg_data_id
for channel_obj in channels_list:
# The script sets 'epg_data_id' in the returned dict
# We either assign directly, or fetch the EPGData instance if needed.
channel_obj.epg_data_id = epg_mapping.get(channel_obj.id)
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_file.write(json.dumps(payload).encode('utf-8'))
temp_file_path = temp_file.name
# Now we have real model objects, so bulk_update will work
Channel.objects.bulk_update(channels_list, ["epg_data"])
# After writing to the file but before subprocess
# Explicitly delete the large data structures
del payload
gc.collect()
total_matched = len(matched_channels)
if total_matched:
logger.info(f"Match Summary: {total_matched} channel(s) matched.")
for (cid, cname, tvg) in matched_channels:
logger.info(f" - Channel ID={cid}, Name='{cname}' => tvg_id='{tvg}'")
else:
logger.info("No new channels were matched.")
process = subprocess.Popen(
['python', '/app/scripts/epg_match.py', temp_file_path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
logger.info("Finished EPG matching logic.")
# Log stderr in real-time
for line in iter(process.stderr.readline, ''):
if line:
logger.info(line.strip())
# Send update with additional information for refreshing UI
channel_layer = get_channel_layer()
associations = [
{"channel_id": chan["id"], "epg_data_id": chan["epg_data_id"]}
for chan in channels_to_update_dicts
]
process.stderr.close()
stdout, stderr = process.communicate()
async_to_sync(channel_layer.group_send)(
'updates',
{
'type': 'update',
"data": {
"success": True,
"type": "epg_match",
"refresh_channels": True, # Flag to tell frontend to refresh channels
"matches_count": total_matched,
"message": f"EPG matching complete: {total_matched} channel(s) matched",
"associations": associations # Add the associations data
os.remove(temp_file_path)
if process.returncode != 0:
return f"Failed to process EPG matching: {stderr}"
result = json.loads(stdout)
# This returns lists of dicts, not model objects
channels_to_update_dicts = result["channels_to_update"]
matched_channels = result["matched_channels"]
# Explicitly clean up large objects
del stdout, result
gc.collect()
# Convert your dict-based 'channels_to_update' into real Channel objects
if channels_to_update_dicts:
# Extract IDs of the channels that need updates
channel_ids = [d["id"] for d in channels_to_update_dicts]
# Fetch them from DB
channels_qs = Channel.objects.filter(id__in=channel_ids)
channels_list = list(channels_qs)
# Build a map from channel_id -> epg_data_id (or whatever fields you need)
epg_mapping = {
d["id"]: d["epg_data_id"] for d in channels_to_update_dicts
}
}
)
return f"Done. Matched {total_matched} channel(s)."
# Populate each Channel object with the updated epg_data_id
for channel_obj in channels_list:
# The script sets 'epg_data_id' in the returned dict
# We either assign directly, or fetch the EPGData instance if needed.
channel_obj.epg_data_id = epg_mapping.get(channel_obj.id)
# Now we have real model objects, so bulk_update will work
Channel.objects.bulk_update(channels_list, ["epg_data"])
total_matched = len(matched_channels)
if total_matched:
logger.info(f"Match Summary: {total_matched} channel(s) matched.")
for (cid, cname, tvg) in matched_channels:
logger.info(f" - Channel ID={cid}, Name='{cname}' => tvg_id='{tvg}'")
else:
logger.info("No new channels were matched.")
logger.info("Finished EPG matching logic.")
# Send update with additional information for refreshing UI
channel_layer = get_channel_layer()
associations = [
{"channel_id": chan["id"], "epg_data_id": chan["epg_data_id"]}
for chan in channels_to_update_dicts
]
async_to_sync(channel_layer.group_send)(
'updates',
{
'type': 'update',
"data": {
"success": True,
"type": "epg_match",
"refresh_channels": True, # Flag to tell frontend to refresh channels
"matches_count": total_matched,
"message": f"EPG matching complete: {total_matched} channel(s) matched",
"associations": associations # Add the associations data
}
}
)
return f"Done. Matched {total_matched} channel(s)."
finally:
# Final cleanup
gc.collect()
# Force an even more aggressive cleanup
import gc
gc.collect(generation=2)
@shared_task
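
The matching itself is delegated to /app/scripts/epg_match.py; the task above only talks to it through a temp-file path in argv and a JSON result on stdout, with stderr streamed into the task logger. As a rough illustration of that contract only — not the actual script, whose fuzzy name/region matching is not part of this diff — a minimal stand-in that matches purely on normalized tvg_id could look like this:

#!/usr/bin/env python
"""Minimal sketch of the I/O contract match_epg_channels() relies on.
This is NOT the real /app/scripts/epg_match.py; the exact-tvg_id match
below is a stand-in for its actual matching logic."""
import json
import sys


def main() -> int:
    # argv[1] is the temp file the Celery task wrote the payload to
    with open(sys.argv[1], "r", encoding="utf-8") as fh:
        payload = json.load(fh)

    # Index EPG rows by their normalized tvg_id
    epg_by_tvg_id = {e["tvg_id"]: e for e in payload["epg_data"] if e["tvg_id"]}

    channels_to_update = []
    matched_channels = []
    for chan in payload["channels"]:
        epg = epg_by_tvg_id.get(chan["tvg_id"])
        if epg is None:
            continue
        # The task only needs the channel id and the EPGData id for bulk_update
        channels_to_update.append({"id": chan["id"], "epg_data_id": epg["id"]})
        matched_channels.append((chan["id"], chan["name"], epg["tvg_id"]))

    # Diagnostics go to stderr (the task streams stderr into its logger);
    # the JSON result must be the only thing written to stdout.
    print(f"matched {len(matched_channels)} channel(s)", file=sys.stderr)
    json.dump(
        {"channels_to_update": channels_to_update,
         "matched_channels": matched_channels},
        sys.stdout,
    )
    return 0


if __name__ == "__main__":
    sys.exit(main())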

View file

@ -117,6 +117,8 @@ def refresh_all_epg_data():
for source in active_sources:
refresh_epg_data(source.id)
# Force garbage collection between sources
gc.collect()
logger.info("Finished refresh_epg_data task.")
return "EPG data refreshed."
@ -128,6 +130,7 @@ def refresh_epg_data(source_id):
logger.debug(f"EPG refresh for {source_id} already running")
return
source = None
try:
# Try to get the EPG source
try:
@ -144,12 +147,16 @@ def refresh_epg_data(source_id):
# Release the lock and exit
release_task_lock('refresh_epg_data', source_id)
# Force garbage collection before exit
gc.collect()
return f"EPG source {source_id} does not exist, task cleaned up"
# The source exists but is not active, just skip processing
if not source.is_active:
logger.info(f"EPG source {source_id} is not active. Skipping.")
release_task_lock('refresh_epg_data', source_id)
# Force garbage collection before exit
gc.collect()
return
# Continue with the normal processing...
@ -159,12 +166,16 @@ def refresh_epg_data(source_id):
if not fetch_success:
logger.error(f"Failed to fetch XMLTV for source {source.name}")
release_task_lock('refresh_epg_data', source_id)
# Force garbage collection before exit
gc.collect()
return
parse_channels_success = parse_channels_only(source)
if not parse_channels_success:
logger.error(f"Failed to parse channels for source {source.name}")
release_task_lock('refresh_epg_data', source_id)
# Force garbage collection before exit
gc.collect()
return
parse_programs_for_source(source)
@ -176,14 +187,18 @@ def refresh_epg_data(source_id):
except Exception as e:
logger.error(f"Error in refresh_epg_data for source {source_id}: {e}", exc_info=True)
try:
source = EPGSource.objects.get(id=source_id)
source.status = 'error'
source.last_message = f"Error refreshing EPG data: {str(e)}"
source.save(update_fields=['status', 'last_message'])
send_epg_update(source_id, "refresh", 100, status="error", error=str(e))
if source:
source.status = 'error'
source.last_message = f"Error refreshing EPG data: {str(e)}"
source.save(update_fields=['status', 'last_message'])
send_epg_update(source_id, "refresh", 100, status="error", error=str(e))
except Exception as inner_e:
logger.error(f"Error updating source status: {inner_e}")
finally:
# Clear references to ensure proper garbage collection
source = None
# Force garbage collection before releasing the lock
gc.collect()
release_task_lock('refresh_epg_data', source_id)
@ -191,7 +206,6 @@ def fetch_xmltv(source):
# Handle cases with local file but no URL
if not source.url and source.file_path and os.path.exists(source.file_path):
logger.info(f"Using existing local file for EPG source: {source.name} at {source.file_path}")
# Set the status to success in the database
source.status = 'success'
source.save(update_fields=['status'])
@ -350,6 +364,9 @@ def fetch_xmltv(source):
downloaded=f"{downloaded / (1024 * 1024):.2f} MB"
)
# Explicitly delete the chunk to free memory immediately
del chunk
# Send completion notification
send_epg_update(source.id, "downloading", 100)
@ -517,9 +534,14 @@ def parse_channels_only(source):
return False
file_path = new_path
logger.info(f"Parsing channels from EPG file: {file_path}")
# Add memory tracking at start
import psutil
process = psutil.Process()
initial_memory = process.memory_info().rss / 1024 / 1024
logger.info(f"Initial memory usage: {initial_memory:.2f} MB")
# Replace full dictionary load with more efficient lookup set
existing_tvg_ids = set()
existing_epgs = {} # Initialize the dictionary that will lazily load objects
@ -537,7 +559,7 @@ def parse_channels_only(source):
existing_tvg_ids.update(tvg_id_chunk)
last_id = EPGData.objects.filter(tvg_id__in=tvg_id_chunk).order_by('-id')[0].id
#time.sleep(20)
# Update progress to show file read starting
send_epg_update(source.id, "parsing_channels", 10)
@ -549,35 +571,49 @@ def parse_channels_only(source):
total_channels = 0
processed_channels = 0
batch_size = 500 # Process in batches to limit memory usage
progress = 0 # Initialize progress variable here
# Track memory at key points
logger.info(f"Memory before opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
try:
# Create a parser with the desired options
parser = etree.XMLParser(huge_tree=True, remove_blank_text=True)
#parser = etree.XMLParser(huge_tree=True, remove_blank_text=True)
# Count channels for progress reporting - use proper lxml approach
# Open the file first
logger.info(f"Opening file for initial channel count: {file_path}")
source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb')
# Create an iterparse context without parser parameter
channel_finder = etree.iterparse(source_file, events=('end',), tag='channel')
logger.info(f"Memory after opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
# Count channels
try:
total_channels = EPGData.objects.filter(epg_source=source).count()
except:
logger.info(f"Found {total_channels} existing channels for this source")
except Exception as e:
logger.error(f"Error counting channels: {e}")
total_channels = 500 # Default estimate
# Close the file to reset position
logger.info(f"Closing initial file handle")
source_file.close()
logger.info(f"Memory after closing initial file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
# Update progress after counting
send_epg_update(source.id, "parsing_channels", 25, total_channels=total_channels)
# Reset file position for actual processing
logger.info(f"Re-opening file for channel parsing: {file_path}")
source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb')
channel_parser = etree.iterparse(source_file, events=('end',), tag='channel')
logger.info(f"Memory after re-opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
logger.info(f"Creating iterparse context")
channel_parser = etree.iterparse(source_file, events=('end',), tag='channel')
logger.info(f"Memory after creating iterparse: {process.memory_info().rss / 1024 / 1024:.2f} MB")
channel_count = 0
for _, elem in channel_parser:
channel_count += 1
tvg_id = elem.get('id', '').strip()
if tvg_id:
display_name = None
@ -620,10 +656,13 @@ def parse_channels_only(source):
# Batch processing
if len(epgs_to_create) >= batch_size:
logger.info(f"Bulk creating {len(epgs_to_create)} EPG entries")
EPGData.objects.bulk_create(epgs_to_create, ignore_conflicts=True)
logger.info(f"Memory after bulk_create: {process.memory_info().rss / 1024 / 1024:.2f} MB")
del epgs_to_create # Explicit deletion
epgs_to_create = []
# Force garbage collection
gc.collect()
logger.info(f"Memory after gc.collect(): {process.memory_info().rss / 1024 / 1024:.2f} MB")
if len(epgs_to_update) >= batch_size:
EPGData.objects.bulk_update(epgs_to_update, ["name"])
@ -633,8 +672,10 @@ def parse_channels_only(source):
# Periodically clear the existing_epgs cache to prevent memory buildup
if processed_channels % 1000 == 0:
logger.info(f"Clearing existing_epgs cache at {processed_channels} channels")
existing_epgs.clear()
gc.collect()
logger.info(f"Memory after clearing cache: {process.memory_info().rss / 1024 / 1024:.2f} MB")
# Send progress updates
if processed_channels % 100 == 0 or processed_channels == total_channels:
@ -646,17 +687,38 @@ def parse_channels_only(source):
processed=processed_channels,
total=total_channels
)
logger.debug(f"Processed channel: {tvg_id} - {display_name}")
# Clear memory
elem.clear()
while elem.getprevious() is not None:
del elem.getparent()[0]
# Make sure to close the file
# Check if we should break early to avoid excessive sleep
if processed_channels >= total_channels and total_channels > 0:
logger.info(f"Breaking channel processing loop - processed {processed_channels}/{total_channels}")
break
# Explicit cleanup before sleeping
logger.info(f"Completed channel parsing loop, processed {processed_channels} channels")
logger.info(f"Memory before cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB")
# Explicit cleanup of the parser
del channel_parser
logger.info(f"Deleted channel_parser object")
# Close the file
logger.info(f"Closing file: {file_path}")
source_file.close()
logger.info(f"File closed: {file_path}")
# Force garbage collection
gc.collect()
logger.info(f"Memory after final cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB")
# Remove long sleep that might be causing issues
# time.sleep(200) # This seems excessive and may be causing issues
except (etree.XMLSyntaxError, Exception) as xml_error:
# Instead of falling back, just handle the error
logger.error(f"XML parsing failed: {xml_error}")
# Update status to error
source.status = 'error'
@ -668,12 +730,16 @@ def parse_channels_only(source):
# Process any remaining items
if epgs_to_create:
EPGData.objects.bulk_create(epgs_to_create, ignore_conflicts=True)
logger.info(f"Created final batch of {len(epgs_to_create)} EPG entries")
if epgs_to_update:
EPGData.objects.bulk_update(epgs_to_update, ["name"])
logger.info(f"Updated final batch of {len(epgs_to_update)} EPG entries")
# Final garbage collection
# Final garbage collection and memory tracking
logger.info(f"Memory before final gc: {process.memory_info().rss / 1024 / 1024:.2f} MB")
gc.collect()
logger.info(f"Memory after final gc: {process.memory_info().rss / 1024 / 1024:.2f} MB")
# Update source status with channel count
source.status = 'success'
@ -699,6 +765,8 @@ def parse_channels_only(source):
)
logger.info(f"Finished parsing channel info. Found {processed_channels} channels.")
# Remove excessive sleep
# time.sleep(20)
return True
except FileNotFoundError:
@ -718,9 +786,19 @@ def parse_channels_only(source):
send_epg_update(source.id, "parsing_channels", 100, status="error", error=str(e))
return False
finally:
# Add more detailed cleanup in finally block
logger.info("In finally block, ensuring cleanup")
existing_tvg_ids = None
existing_epgs = None
gc.collect()
# Check final memory usage
try:
import psutil
process = psutil.Process()
final_memory = process.memory_info().rss / 1024 / 1024
logger.info(f"Final memory usage: {final_memory:.2f} MB")
except:
pass
@shared_task
@ -729,7 +807,22 @@ def parse_programs_for_tvg_id(epg_id):
logger.info(f"Program parse for {epg_id} already in progress, skipping duplicate task")
return "Task already running"
source_file = None
program_parser = None
programs_to_create = None
epg = None
epg_source = None
try:
# Add memory tracking
try:
import psutil
process = psutil.Process()
initial_memory = process.memory_info().rss / 1024 / 1024
logger.info(f"[parse_programs_for_tvg_id] Initial memory usage: {initial_memory:.2f} MB")
except ImportError:
process = None
epg = EPGData.objects.get(id=epg_id)
epg_source = epg.epg_source
@ -756,12 +849,25 @@ def parse_programs_for_tvg_id(epg_id):
if not id_batch:
break
# Store the last ID before deleting the batch variable
if id_batch:
max_id = id_batch[-1]
else:
max_id = 0
# Delete this batch
ProgramData.objects.filter(id__in=id_batch).delete()
# Release memory immediately
del id_batch
gc.collect()
# Update last_id for next iteration
last_id = id_batch[-1] if id_batch else 0
# Update last_id for next iteration using our stored value
last_id = max_id
# Explicitly delete query objects
del programs_to_delete
del last_id
gc.collect()
file_path = epg_source.file_path
if not file_path:
@ -820,17 +926,23 @@ def parse_programs_for_tvg_id(epg_id):
logger.info(f"Parsing programs for tvg_id={epg.tvg_id} from {file_path}")
# Memory usage tracking
if process:
mem_before = process.memory_info().rss / 1024 / 1024
logger.info(f"[parse_programs_for_tvg_id] Memory before parsing: {mem_before:.2f} MB")
programs_to_create = []
batch_size = 1000 # Process in batches to limit memory usage
programs_processed = 0
try:
# Create a parser with the desired options
parser = etree.XMLParser(huge_tree=True, remove_blank_text=True)
#parser = etree.XMLParser(huge_tree=True, remove_blank_text=True)
# Open the file properly
source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb')
# Stream parse the file using lxml's iterparse (without parser parameter)
# Stream parse the file using lxml's iterparse
program_parser = etree.iterparse(source_file, events=('end',), tag='programme')
for _, elem in program_parser:
@ -858,6 +970,7 @@ def parse_programs_for_tvg_id(epg_id):
custom_props = extract_custom_properties(elem)
custom_properties_json = None
if custom_props:
logger.debug(f"Number of custom properties: {len(custom_props)}")
try:
custom_properties_json = json.dumps(custom_props)
except Exception as e:
@ -874,12 +987,19 @@ def parse_programs_for_tvg_id(epg_id):
custom_properties=custom_properties_json
))
programs_processed += 1
custom_props = None
custom_properties_json = None
# Batch processing
if len(programs_to_create) >= batch_size:
ProgramData.objects.bulk_create(programs_to_create)
logger.debug(f"Saved batch of {len(programs_to_create)} programs for {epg.tvg_id}")
del programs_to_create # Explicit deletion
programs_to_create = []
# Force garbage collection after batch processing
# Force more aggressive garbage collection
custom_props = None
custom_properties_json = None
gc.collect()
except Exception as e:
@ -891,8 +1011,17 @@ def parse_programs_for_tvg_id(epg_id):
while elem.getprevious() is not None:
del elem.getparent()[0]
# Make sure to close the file
source_file.close()
# Make sure to close the file and release parser resources
if source_file:
source_file.close()
source_file = None
if program_parser:
program_parser = None
# Free parser memory
parser = None
gc.collect()
except etree.XMLSyntaxError as xml_error:
logger.error(f"XML syntax error parsing program data: {xml_error}")
@ -900,17 +1029,54 @@ def parse_programs_for_tvg_id(epg_id):
except Exception as e:
logger.error(f"Error parsing XML for programs: {e}", exc_info=True)
raise
finally:
# Ensure file is closed even if an exception occurs
if source_file:
source_file.close()
source_file = None
# Process any remaining items
if programs_to_create:
ProgramData.objects.bulk_create(programs_to_create)
logger.debug(f"Saved final batch of {len(programs_to_create)} programs for {epg.tvg_id}")
del programs_to_create
programs_to_create = []
# Memory tracking after processing
if process:
mem_after = process.memory_info().rss / 1024 / 1024
logger.info(f"[parse_programs_for_tvg_id] Memory after parsing {programs_processed} programs: {mem_after:.2f} MB (change: {mem_after-mem_before:.2f} MB)")
# Final garbage collection
gc.collect()
# One additional garbage collection specifically for lxml elements
# which can sometimes be retained due to reference cycles
gc.collect()
# Reset internal caches and pools that lxml might be keeping
try:
etree.clear_error_log()
except:
pass
logger.info(f"Completed program parsing for tvg_id={epg.tvg_id}.")
finally:
# Explicit cleanup of all potentially large objects
if source_file:
try:
source_file.close()
except:
pass
source_file = None
program_parser = None
programs_to_create = None
epg = None
epg_source = None
# Force garbage collection before releasing lock
gc.collect()
release_task_lock('parse_epg_programs', epg_id)
@ -918,6 +1084,16 @@ def parse_programs_for_source(epg_source, tvg_id=None):
# Send initial programs parsing notification
send_epg_update(epg_source.id, "parsing_programs", 0)
# Add memory tracking
try:
import psutil
process = psutil.Process()
initial_memory = process.memory_info().rss / 1024 / 1024
logger.info(f"[parse_programs_for_source] Initial memory usage: {initial_memory:.2f} MB")
except ImportError:
logger.warning("psutil not available for memory tracking")
process = None
try:
# Process EPG entries in batches rather than all at once
batch_size = 20 # Process fewer channels at once to reduce memory usage
@ -939,6 +1115,10 @@ def parse_programs_for_source(epg_source, tvg_id=None):
updated_count = 0
processed = 0
# Memory check before batch processing
if process:
logger.info(f"[parse_programs_for_source] Memory before batch processing: {process.memory_info().rss / 1024 / 1024:.2f} MB")
# Process in batches using cursor-based approach to limit memory usage
last_id = 0
while True:
@ -969,10 +1149,18 @@ def parse_programs_for_source(epg_source, tvg_id=None):
logger.error(f"Error parsing programs for tvg_id={epg.tvg_id}: {e}", exc_info=True)
failed_entries.append(f"{epg.tvg_id}: {str(e)}")
# Memory check after processing batch
if process:
logger.info(f"[parse_programs_for_source] Memory after processing batch: {process.memory_info().rss / 1024 / 1024:.2f} MB")
# Force garbage collection after each batch
batch_entries = None # Remove reference to help garbage collection
gc.collect()
# Memory check after garbage collection
if process:
logger.info(f"[parse_programs_for_source] Memory after gc: {process.memory_info().rss / 1024 / 1024:.2f} MB")
# If there were failures, include them in the message but continue
if failed_entries:
epg_source.status = EPGSource.STATUS_SUCCESS # Still mark as success if some processed
@ -986,6 +1174,11 @@ def parse_programs_for_source(epg_source, tvg_id=None):
send_epg_update(epg_source.id, "parsing_programs", 100,
status="success",
message=epg_source.last_message)
# Explicitly release memory of large lists before returning
del failed_entries
gc.collect()
return True
# If all successful, set a comprehensive success message
@ -1012,6 +1205,21 @@ def parse_programs_for_source(epg_source, tvg_id=None):
status="error",
message=epg_source.last_message)
return False
finally:
# Final memory cleanup and tracking
if process:
# Force garbage collection before measuring
gc.collect()
final_memory = process.memory_info().rss / 1024 / 1024
logger.info(f"[parse_programs_for_source] Final memory usage: {final_memory:.2f} MB")
# Explicitly release any remaining large data structures
failed_entries = None
program_count = None
channel_count = None
updated_count = None
processed = None
gc.collect()
def fetch_schedules_direct(source):
@ -1118,13 +1326,11 @@ def parse_schedules_direct_time(time_str):
# Helper function to extract custom properties - moved to a separate function to clean up the code
def extract_custom_properties(prog):
# Create a new dictionary for each call
custom_props = {}
# Extract categories
categories = []
for cat_elem in prog.findall('category'):
if cat_elem.text and cat_elem.text.strip():
categories.append(cat_elem.text.strip())
# Extract categories with a single comprehension to reduce intermediate objects
categories = [cat.text.strip() for cat in prog.findall('category') if cat.text and cat.text.strip()]
if categories:
custom_props['categories'] = categories
@ -1151,25 +1357,23 @@ def extract_custom_properties(prog):
# Just store the raw onscreen format
custom_props['onscreen_episode'] = ep_num.text.strip()
# Extract ratings
for rating_elem in prog.findall('rating'):
# Extract ratings more efficiently
rating_elem = prog.find('rating')
if rating_elem is not None:
value_elem = rating_elem.find('value')
if value_elem is not None and value_elem.text:
custom_props['rating'] = value_elem.text.strip()
if rating_elem.get('system'):
custom_props['rating_system'] = rating_elem.get('system')
break # Just use the first rating
# Extract credits (actors, directors, etc.)
# Extract credits more efficiently
credits_elem = prog.find('credits')
if credits_elem is not None:
credits = {}
for credit_type in ['director', 'actor', 'writer', 'presenter', 'producer']:
elements = credits_elem.findall(credit_type)
if elements:
names = [e.text.strip() for e in elements if e.text and e.text.strip()]
if names:
credits[credit_type] = names
names = [e.text.strip() for e in credits_elem.findall(credit_type) if e.text and e.text.strip()]
if names:
credits[credit_type] = names
if credits:
custom_props['credits'] = credits
@ -1182,11 +1386,11 @@ def extract_custom_properties(prog):
if country_elem is not None and country_elem.text:
custom_props['country'] = country_elem.text.strip()
for icon_elem in prog.findall('icon'):
if icon_elem.get('src'):
custom_props['icon'] = icon_elem.get('src')
break # Just use the first icon
icon_elem = prog.find('icon')
if icon_elem is not None and icon_elem.get('src'):
custom_props['icon'] = icon_elem.get('src')
# Simpler approach for boolean flags
for kw in ['previously-shown', 'premiere', 'new']:
if prog.find(kw) is not None:
custom_props[kw.replace('-', '_')] = True
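
The batched cleanup of old programs in parse_programs_for_tvg_id is only partially visible in the hunk above (the queryset setup sits outside it). For reference, a self-contained sketch of the cursor-based delete pattern it implements might look like the following; the ProgramData import path, the epg=... filter field, and the batch size are assumptions, not taken from this diff:

import gc

from apps.epg.models import ProgramData  # import path assumed for this sketch


def delete_programs_in_batches(epg, batch_size=1000):
    """Delete ProgramData rows for one EPGData entry in ID-ordered batches,
    so no single query has to materialize the whole set at once.
    Sketch only: the epg=... filter field is assumed from the surrounding task."""
    last_id = 0
    while True:
        # Grab the next slice of IDs above the cursor
        id_batch = list(
            ProgramData.objects.filter(epg=epg, id__gt=last_id)
            .order_by("id")
            .values_list("id", flat=True)[:batch_size]
        )
        if not id_batch:
            break

        # Store the last ID before deleting the batch variable
        max_id = id_batch[-1]

        # Delete this batch
        ProgramData.objects.filter(id__in=id_batch).delete()

        # Release memory immediately and advance the cursor
        del id_batch
        gc.collect()
        last_id = max_id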

View file

@ -36,11 +36,6 @@ LOG_THROTTLE_SECONDS = 300 # 5 minutes
# Track if this is the first scan since startup
_first_scan_completed = False
@shared_task
def beat_periodic_task():
fetch_channel_stats()
scan_and_process_files()
def throttled_log(logger_method, message, key=None, *args, **kwargs):
"""Only log messages with the same key once per throttle period"""
if key is None:
@ -52,6 +47,32 @@ def throttled_log(logger_method, message, key=None, *args, **kwargs):
logger_method(message, *args, **kwargs)
_last_log_times[key] = now
def clear_memory():
"""Force aggressive garbage collection to free memory"""
import gc
# Run full garbage collection
gc.collect(generation=2)
# Find and break any reference cycles
gc.collect(generation=0)
# Clear any cached objects in memory
gc.collect(generation=1)
# Check if psutil is available for more advanced monitoring
try:
import psutil
process = psutil.Process()
if hasattr(process, 'memory_info'):
mem = process.memory_info().rss / (1024 * 1024)
logger.debug(f"Memory usage after cleanup: {mem:.2f} MB")
except (ImportError, Exception):
pass
@shared_task
def beat_periodic_task():
fetch_channel_stats()
scan_and_process_files()
# Call memory cleanup after completing tasks
clear_memory()
@shared_task
def scan_and_process_files():
global _first_scan_completed
@ -270,6 +291,9 @@ def scan_and_process_files():
# Mark that the first scan is complete
_first_scan_completed = True
# Force memory cleanup
clear_memory()
def fetch_channel_stats():
redis_client = RedisClient.get_client()

View file

@ -2,6 +2,7 @@
import os
from celery import Celery
import logging
from celery.signals import task_postrun # Add import for signals
# Initialize with defaults before Django settings are loaded
DEFAULT_LOG_LEVEL = 'DEBUG'
@ -48,6 +49,24 @@ app.conf.update(
worker_task_log_format='%(asctime)s %(levelname)s %(task_name)s: %(message)s',
)
# Add memory cleanup after task completion
@task_postrun.connect # Use the imported signal
def cleanup_task_memory(**kwargs):
"""Clean up memory after each task completes"""
import gc
# Force garbage collection
gc.collect()
# Log memory usage if psutil is installed
try:
import psutil
process = psutil.Process()
if hasattr(process, 'memory_info'):
mem = process.memory_info().rss / (1024 * 1024)
print(f"Memory usage after task: {mem:.2f} MB")
except (ImportError, Exception):
pass
@app.on_after_configure.connect
def setup_celery_logging(**kwargs):
# Use our directly determined log level

View file

@ -199,6 +199,15 @@ CELERY_BROKER_TRANSPORT_OPTIONS = {
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
# Memory management settings
#CELERY_WORKER_MAX_TASKS_PER_CHILD = 10 # Restart worker after 10 tasks to free memory
#CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # Don't prefetch tasks - process one at a time
#CELERY_TASK_ACKS_LATE = True # Only acknowledge tasks after they're processed
#CELERY_TASK_TIME_LIMIT = 3600 # 1 hour time limit per task
#CELERY_TASK_SOFT_TIME_LIMIT = 3540 # Soft limit 60 seconds before hard limit
#CELERY_WORKER_CANCEL_LONG_RUNNING_TASKS_ON_CONNECTION_LOSS = True # Cancel tasks if connection lost
#CELERY_TASK_IGNORE_RESULT = True # Don't store results unless explicitly needed
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers.DatabaseScheduler"
CELERY_BEAT_SCHEDULE = {
'fetch-channel-statuses': {
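
The commented-out CELERY_* knobs above all have lowercase equivalents that can be set directly on the Celery app object, alongside the logging options dispatcharr/celery.py already configures via app.conf.update. A hedged sketch of enabling them that way — the values mirror the commented settings and are illustrative, not part of this commit:

# dispatcharr/celery.py (sketch) -- values mirror the commented settings above
from celery import Celery

app = Celery("dispatcharr")

app.conf.update(
    # Recycle each worker process after N tasks so leaked memory
    # (lxml trees, large JSON payloads) is returned to the OS.
    worker_max_tasks_per_child=10,
    # Pull one task at a time instead of prefetching a batch per process.
    worker_prefetch_multiplier=1,
    # Acknowledge a task only after it finishes, so a recycled or killed
    # worker does not silently drop the task it was running.
    task_acks_late=True,
    # Hard and soft per-task time limits, in seconds.
    task_time_limit=3600,
    task_soft_time_limit=3540,
)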

View file

@ -8,7 +8,7 @@ exec-before = python /app/scripts/wait_for_redis.py
; Start Redis first
attach-daemon = redis-server
; Then start other services
attach-daemon = celery -A dispatcharr worker
attach-daemon = celery -A dispatcharr worker --concurrency=4
attach-daemon = celery -A dispatcharr beat
attach-daemon = daphne -b 0.0.0.0 -p 8001 dispatcharr.asgi:application
attach-daemon = cd /app/frontend && npm run dev