perf: optimize EPG program parsing and implement atomic database updates to reduce I/O overhead and prevent partial data visibility

SergeantPanda 2025-12-04 14:57:57 -06:00
parent 256ac2f55a
commit 5693ee7f9e
2 changed files with 62 additions and 43 deletions


@@ -13,7 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- **Performance**: EPG program parsing optimized for sources with many channels but only a fraction mapped. Now parses the XML file once per source instead of once per channel, dramatically reducing I/O and CPU overhead. For sources with 10,000 channels and 100 mapped, this results in ~99x fewer file opens and ~100x fewer full file scans. Orphaned programs for unmapped channels are also cleaned up during refresh to prevent database bloat.
- **Performance**: EPG program parsing optimized for sources with many channels but only a fraction mapped. Now parses the XML file once per source instead of once per channel, dramatically reducing I/O and CPU overhead. For sources with 10,000 channels and 100 mapped, this results in ~99x fewer file opens and ~100x fewer full file scans. Orphaned programs for unmapped channels are also cleaned up during refresh to prevent database bloat. Database updates are now atomic, so clients never see empty or partial EPG data during a refresh (a sketch of the pattern follows this list).
- IPv6 access now allowed by default with all IPv6 CIDRs accepted - Thanks [@adrianmace](https://github.com/adrianmace)
- nginx.conf updated to bind to both IPv4 and IPv6 ports - Thanks [@jordandalley](https://github.com/jordandalley)
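
The pattern behind the new entry above, as a minimal sketch (`ProgramData` and the 1,000-row batch size come from the diff below; `refresh_programs` and `parse_xmltv` are hypothetical stand-ins for the real task code):

```python
from django.db import transaction

from .models import ProgramData  # the EPG program model, as in the diff below


def refresh_programs(epg_source, mapped_epg_ids, parse_xmltv):
    # Phase 1: a single pass over the source's XML file, collecting every
    # program in memory. Nothing is written yet, so clients still see the
    # old guide data while parsing runs.
    new_programs = list(parse_xmltv(epg_source))  # hypothetical parser helper

    # Phase 2: delete + insert inside one transaction. Readers observe
    # either the complete old program set or the complete new one.
    with transaction.atomic():
        ProgramData.objects.filter(epg_id__in=mapped_epg_ids).delete()
        for i in range(0, len(new_programs), 1000):
            ProgramData.objects.bulk_create(new_programs[i:i + 1000])
```

Holding the parsed rows in memory is the price of the atomic swap; the diff below frees the list and runs gc.collect() as soon as the transaction finishes.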


@@ -1497,28 +1497,13 @@ def parse_programs_for_source(epg_source, tvg_id=None):
        send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="No URL provided")
        return False
    # Delete existing programs for all mapped EPGs in one query
    logger.info(f"Deleting existing programs for {mapped_count} mapped EPG entries...")
    deleted_count = ProgramData.objects.filter(epg_id__in=mapped_epg_ids).delete()[0]
    logger.info(f"Deleted {deleted_count} existing programs")
    # Clean up orphaned programs for unmapped EPG entries
    # These accumulate if a channel is unmapped after being mapped
    unmapped_epg_ids = EPGData.objects.filter(
        epg_source=epg_source
    ).exclude(id__in=mapped_epg_ids).values_list('id', flat=True)
    if unmapped_epg_ids:
        orphaned_count = ProgramData.objects.filter(epg_id__in=unmapped_epg_ids).delete()[0]
        if orphaned_count > 0:
            logger.info(f"Cleaned up {orphaned_count} orphaned programs for {len(unmapped_epg_ids)} unmapped EPG entries")
    # SINGLE PASS PARSING: Parse the XML file once and process all mapped channels
    programs_to_create = []
    # SINGLE PASS PARSING: Parse the XML file once and collect all programs in memory
    # We parse FIRST, then do an atomic delete+insert to avoid race conditions
    # where clients might see empty/partial EPG data during the transition
    all_programs_to_create = []
    programs_by_channel = {tvg_id: 0 for tvg_id in mapped_tvg_ids}  # Track count per channel
    total_programs = 0
    skipped_programs = 0
    batch_size = 1000
    last_progress_update = 0
    try:
@@ -1563,7 +1548,7 @@ def parse_programs_for_source(epg_source, tvg_id=None):
                custom_properties_json = custom_props if custom_props else None
                epg_id = tvg_id_to_epg_id[channel_id]
                programs_to_create.append(ProgramData(
                all_programs_to_create.append(ProgramData(
                    epg_id=epg_id,
                    start_time=start_time,
                    end_time=end_time,
@@ -1579,35 +1564,23 @@ def parse_programs_for_source(epg_source, tvg_id=None):
                # Clear the element to free memory
                clear_element(elem)
                # Batch processing
                if len(programs_to_create) >= batch_size:
                    ProgramData.objects.bulk_create(programs_to_create)
                    logger.debug(f"Saved batch of {len(programs_to_create)} programs (total: {total_programs})")
                    programs_to_create = []
                # Send progress update (estimate based on programs processed)
                if total_programs - last_progress_update >= 5000:
                    last_progress_update = total_programs
                    # Cap at 70% during parsing phase (save 30% for DB operations)
                    progress = min(70, 10 + int((total_programs / max(total_programs + 10000, 1)) * 60))
                    send_epg_update(epg_source.id, "parsing_programs", progress,
                                    processed=total_programs, channels=mapped_count)
                # Send progress update (estimate based on programs processed)
                # We don't know total programs upfront, so use a rough estimate
                if total_programs - last_progress_update >= 5000:
                    last_progress_update = total_programs
                    # Cap at 90% until we're done
                    progress = min(90, 10 + int((total_programs / max(total_programs + 10000, 1)) * 80))
                    send_epg_update(epg_source.id, "parsing_programs", progress,
                                    processed=total_programs, channels=mapped_count)
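                # Worked example of the estimator above: the ratio
                # total_programs / (total_programs + 10000) is always < 1, so the
                # bar creeps toward the cap without reaching it. At 40,000 programs:
                # 10 + int((40000 / 50000) * 80) = 74, still under the 90% cap.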
                # Periodic garbage collection
                if total_programs % (batch_size * 5) == 0:
                    gc.collect()
                # Periodic garbage collection during parsing
                if total_programs % 5000 == 0:
                    gc.collect()
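                # Note: clear_element() keeps the XML parser's footprint bounded;
                # the accumulated all_programs_to_create list is the dominant
                # remaining memory cost and is released after the database swap.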
            except Exception as e:
                logger.error(f"Error processing program for {channel_id}: {e}", exc_info=True)
                clear_element(elem)
                continue
        # Process any remaining items
        if programs_to_create:
            ProgramData.objects.bulk_create(programs_to_create)
            logger.debug(f"Saved final batch of {len(programs_to_create)} programs")
    except etree.XMLSyntaxError as xml_error:
        logger.error(f"XML syntax error parsing program data: {xml_error}")
        epg_source.status = EPGSource.STATUS_ERROR
@@ -1622,6 +1595,52 @@ def parse_programs_for_source(epg_source, tvg_id=None):
        if source_file:
            source_file.close()
            source_file = None
    # Now perform atomic delete + bulk insert
    # This ensures clients never see empty/partial EPG data
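    # Assuming a transactional backend (e.g. PostgreSQL), readers outside this
    # transaction keep seeing the previous ProgramData rows until commit, so the
    # delete below is never observable on its own.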
logger.info(f"Parsed {total_programs} programs, performing atomic database update...")
send_epg_update(epg_source.id, "parsing_programs", 75, message="Updating database...")
batch_size = 1000
try:
with transaction.atomic():
# Delete existing programs for mapped EPGs
deleted_count = ProgramData.objects.filter(epg_id__in=mapped_epg_ids).delete()[0]
logger.debug(f"Deleted {deleted_count} existing programs")
# Clean up orphaned programs for unmapped EPG entries
unmapped_epg_ids = list(EPGData.objects.filter(
epg_source=epg_source
).exclude(id__in=mapped_epg_ids).values_list('id', flat=True))
if unmapped_epg_ids:
orphaned_count = ProgramData.objects.filter(epg_id__in=unmapped_epg_ids).delete()[0]
if orphaned_count > 0:
logger.info(f"Cleaned up {orphaned_count} orphaned programs for {len(unmapped_epg_ids)} unmapped EPG entries")
# Bulk insert all new programs in batches within the same transaction
for i in range(0, len(all_programs_to_create), batch_size):
batch = all_programs_to_create[i:i + batch_size]
ProgramData.objects.bulk_create(batch)
# Update progress during insertion
progress = 75 + int((i / len(all_programs_to_create)) * 20) if all_programs_to_create else 95
if i % (batch_size * 5) == 0:
send_epg_update(epg_source.id, "parsing_programs", min(95, progress),
message=f"Inserting programs... {i}/{len(all_programs_to_create)}")
logger.info(f"Atomic update complete: deleted {deleted_count}, inserted {total_programs} programs")
except Exception as db_error:
logger.error(f"Database error during atomic update: {db_error}", exc_info=True)
epg_source.status = EPGSource.STATUS_ERROR
epg_source.last_message = f"Database error: {str(db_error)}"
epg_source.save(update_fields=['status', 'last_message'])
send_epg_update(epg_source.id, "parsing_programs", 100, status="error", message=str(db_error))
return False
finally:
# Clear the large list to free memory
all_programs_to_create = None
gc.collect()
# Count channels that actually got programs