forked from Mirrors/Dispatcharr
perf: optimize EPG program parsing for multi-channel sources
Dramatically improve EPG refresh performance by parsing the XML file once per source instead of once per channel. The new implementation: - Pre-filters to only process EPG entries mapped to actual channels - Parses the entire XML file in a single pass - Uses O(1) set lookups to skip unmapped channel programmes - Skips non-mapped channels entirely with minimal overhead For EPG sources with many channels but few mapped (e.g., 10,000 channels with 100 mapped to channels), this provides approximately: - 99% reduction in file open operations - 99% reduction in XML file scans - Proportional reduction in CPU and I/O overhead The parse_programs_for_tvg_id() function is retained for single-channel use cases (e.g., when a new channel is mapped via signals). Fixes inefficient repeated file parsing that was occurring with large EPG sources.
This commit is contained in:
parent
2de6ac5da1
commit
2a8ba9125c
2 changed files with 209 additions and 71 deletions
|
|
@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||
|
||||
### Changed
|
||||
|
||||
- **Performance**: EPG program parsing optimized for sources with many channels but only a fraction mapped. Now parses XML file once per source instead of once per channel, dramatically reducing I/O and CPU overhead. For sources with 10,000 channels and 100 mapped, this results in ~99x fewer file opens and ~100x fewer full file scans.
|
||||
- IPv6 access now allowed by default with all IPv6 CIDRs accepted - Thanks [@adrianmace](https://github.com/adrianmace)
|
||||
- nginx.conf updated to bind to both IPv4 and IPv6 ports - Thanks [@jordandalley](https://github.com/jordandalley)
|
||||
|
||||
|
|
|
|||
|
|
@ -1393,11 +1393,23 @@ def parse_programs_for_tvg_id(epg_id):
|
|||
|
||||
|
||||
def parse_programs_for_source(epg_source, tvg_id=None):
|
||||
"""
|
||||
Parse programs for all MAPPED channels from an EPG source in a single pass.
|
||||
|
||||
This is an optimized version that:
|
||||
1. Only processes EPG entries that are actually mapped to channels
|
||||
2. Parses the XML file ONCE instead of once per channel
|
||||
3. Skips programmes for unmapped channels entirely during parsing
|
||||
|
||||
This dramatically improves performance when an EPG source has many channels
|
||||
but only a fraction are mapped.
|
||||
"""
|
||||
# Send initial programs parsing notification
|
||||
send_epg_update(epg_source.id, "parsing_programs", 0)
|
||||
should_log_memory = False
|
||||
process = None
|
||||
initial_memory = 0
|
||||
source_file = None
|
||||
|
||||
# Add memory tracking only in trace mode or higher
|
||||
try:
|
||||
|
|
@ -1417,82 +1429,199 @@ def parse_programs_for_source(epg_source, tvg_id=None):
|
|||
should_log_memory = False
|
||||
|
||||
try:
|
||||
# Process EPG entries in batches rather than all at once
|
||||
batch_size = 20 # Process fewer channels at once to reduce memory usage
|
||||
epg_count = EPGData.objects.filter(epg_source=epg_source).count()
|
||||
# Only get EPG entries that are actually mapped to channels
|
||||
mapped_epg_ids = set(
|
||||
Channel.objects.filter(
|
||||
epg_data__epg_source=epg_source,
|
||||
epg_data__isnull=False
|
||||
).values_list('epg_data_id', flat=True)
|
||||
)
|
||||
|
||||
if epg_count == 0:
|
||||
logger.info(f"No EPG entries found for source: {epg_source.name}")
|
||||
# Update status - this is not an error, just no entries
|
||||
if not mapped_epg_ids:
|
||||
total_epg_count = EPGData.objects.filter(epg_source=epg_source).count()
|
||||
logger.info(f"No channels mapped to any EPG entries from source: {epg_source.name} "
|
||||
f"(source has {total_epg_count} EPG entries, 0 mapped)")
|
||||
# Update status - this is not an error, just no mapped entries
|
||||
epg_source.status = 'success'
|
||||
epg_source.save(update_fields=['status'])
|
||||
epg_source.last_message = f"No channels mapped to this EPG source ({total_epg_count} entries available)"
|
||||
epg_source.save(update_fields=['status', 'last_message'])
|
||||
send_epg_update(epg_source.id, "parsing_programs", 100, status="success")
|
||||
return True
|
||||
|
||||
logger.info(f"Parsing programs for {epg_count} EPG entries from source: {epg_source.name}")
|
||||
# Get the mapped EPG entries with their tvg_ids
|
||||
mapped_epgs = EPGData.objects.filter(id__in=mapped_epg_ids).values('id', 'tvg_id')
|
||||
tvg_id_to_epg_id = {epg['tvg_id']: epg['id'] for epg in mapped_epgs if epg['tvg_id']}
|
||||
mapped_tvg_ids = set(tvg_id_to_epg_id.keys())
|
||||
|
||||
failed_entries = []
|
||||
program_count = 0
|
||||
channel_count = 0
|
||||
updated_count = 0
|
||||
processed = 0
|
||||
# Process in batches using cursor-based approach to limit memory usage
|
||||
last_id = 0
|
||||
while True:
|
||||
# Get a batch of EPG entries
|
||||
batch_entries = list(EPGData.objects.filter(
|
||||
epg_source=epg_source,
|
||||
id__gt=last_id
|
||||
).order_by('id')[:batch_size])
|
||||
total_epg_count = EPGData.objects.filter(epg_source=epg_source).count()
|
||||
mapped_count = len(mapped_tvg_ids)
|
||||
|
||||
if not batch_entries:
|
||||
break # No more entries to process
|
||||
logger.info(f"Parsing programs for {mapped_count} MAPPED channels from source: {epg_source.name} "
|
||||
f"(skipping {total_epg_count - mapped_count} unmapped EPG entries)")
|
||||
|
||||
# Update last_id for next iteration
|
||||
last_id = batch_entries[-1].id
|
||||
# Get the file path
|
||||
file_path = epg_source.extracted_file_path if epg_source.extracted_file_path else epg_source.file_path
|
||||
if not file_path:
|
||||
file_path = epg_source.get_cache_file()
|
||||
|
||||
# Process this batch
|
||||
for epg in batch_entries:
|
||||
if epg.tvg_id:
|
||||
try:
|
||||
result = parse_programs_for_tvg_id(epg.id)
|
||||
if result == "Task already running":
|
||||
logger.info(f"Program parse for {epg.id} already in progress, skipping")
|
||||
# Check if the file exists
|
||||
if not os.path.exists(file_path):
|
||||
logger.error(f"EPG file not found at: {file_path}")
|
||||
|
||||
processed += 1
|
||||
progress = min(95, int((processed / epg_count) * 100)) if epg_count > 0 else 50
|
||||
send_epg_update(epg_source.id, "parsing_programs", progress)
|
||||
except Exception as e:
|
||||
logger.error(f"Error parsing programs for tvg_id={epg.tvg_id}: {e}", exc_info=True)
|
||||
failed_entries.append(f"{epg.tvg_id}: {str(e)}")
|
||||
if epg_source.url:
|
||||
# Update the file path in the database
|
||||
new_path = epg_source.get_cache_file()
|
||||
logger.info(f"Updating file_path from '{file_path}' to '{new_path}'")
|
||||
epg_source.file_path = new_path
|
||||
epg_source.save(update_fields=['file_path'])
|
||||
logger.info(f"Fetching new EPG data from URL: {epg_source.url}")
|
||||
|
||||
# Force garbage collection after each batch
|
||||
batch_entries = None # Remove reference to help garbage collection
|
||||
# Fetch new data before continuing
|
||||
fetch_success = fetch_xmltv(epg_source)
|
||||
|
||||
if not fetch_success:
|
||||
logger.error(f"Failed to fetch EPG data for source: {epg_source.name}")
|
||||
epg_source.status = 'error'
|
||||
epg_source.last_message = f"Failed to download EPG data"
|
||||
epg_source.save(update_fields=['status', 'last_message'])
|
||||
send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="Failed to download EPG file")
|
||||
return False
|
||||
|
||||
# Update file_path with the new location
|
||||
file_path = epg_source.extracted_file_path if epg_source.extracted_file_path else epg_source.file_path
|
||||
else:
|
||||
logger.error(f"No URL provided for EPG source {epg_source.name}, cannot fetch new data")
|
||||
epg_source.status = 'error'
|
||||
epg_source.last_message = f"No URL provided, cannot fetch EPG data"
|
||||
epg_source.save(update_fields=['status', 'last_message'])
|
||||
send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="No URL provided")
|
||||
return False
|
||||
|
||||
# Delete existing programs for all mapped EPGs in one query
|
||||
logger.info(f"Deleting existing programs for {mapped_count} mapped EPG entries...")
|
||||
deleted_count = ProgramData.objects.filter(epg_id__in=mapped_epg_ids).delete()[0]
|
||||
logger.info(f"Deleted {deleted_count} existing programs")
|
||||
|
||||
# SINGLE PASS PARSING: Parse the XML file once and process all mapped channels
|
||||
programs_to_create = []
|
||||
programs_by_channel = {tvg_id: 0 for tvg_id in mapped_tvg_ids} # Track count per channel
|
||||
total_programs = 0
|
||||
skipped_programs = 0
|
||||
batch_size = 1000
|
||||
last_progress_update = 0
|
||||
|
||||
try:
|
||||
logger.debug(f"Opening file for single-pass parsing: {file_path}")
|
||||
source_file = open(file_path, 'rb')
|
||||
|
||||
# Stream parse the file using lxml's iterparse
|
||||
program_parser = etree.iterparse(source_file, events=('end',), tag='programme', remove_blank_text=True, recover=True)
|
||||
|
||||
for _, elem in program_parser:
|
||||
channel_id = elem.get('channel')
|
||||
|
||||
# Skip programmes for unmapped channels immediately
|
||||
if channel_id not in mapped_tvg_ids:
|
||||
skipped_programs += 1
|
||||
# Clear element to free memory
|
||||
clear_element(elem)
|
||||
continue
|
||||
|
||||
# This programme is for a mapped channel - process it
|
||||
try:
|
||||
start_time = parse_xmltv_time(elem.get('start'))
|
||||
end_time = parse_xmltv_time(elem.get('stop'))
|
||||
title = None
|
||||
desc = None
|
||||
sub_title = None
|
||||
|
||||
# Efficiently process child elements
|
||||
for child in elem:
|
||||
if child.tag == 'title':
|
||||
title = child.text or 'No Title'
|
||||
elif child.tag == 'desc':
|
||||
desc = child.text or ''
|
||||
elif child.tag == 'sub-title':
|
||||
sub_title = child.text or ''
|
||||
|
||||
if not title:
|
||||
title = 'No Title'
|
||||
|
||||
# Extract custom properties
|
||||
custom_props = extract_custom_properties(elem)
|
||||
custom_properties_json = custom_props if custom_props else None
|
||||
|
||||
epg_id = tvg_id_to_epg_id[channel_id]
|
||||
programs_to_create.append(ProgramData(
|
||||
epg_id=epg_id,
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
title=title,
|
||||
description=desc,
|
||||
sub_title=sub_title,
|
||||
tvg_id=channel_id,
|
||||
custom_properties=custom_properties_json
|
||||
))
|
||||
total_programs += 1
|
||||
programs_by_channel[channel_id] += 1
|
||||
|
||||
# Clear the element to free memory
|
||||
clear_element(elem)
|
||||
|
||||
# Batch processing
|
||||
if len(programs_to_create) >= batch_size:
|
||||
ProgramData.objects.bulk_create(programs_to_create)
|
||||
logger.debug(f"Saved batch of {len(programs_to_create)} programs (total: {total_programs})")
|
||||
programs_to_create = []
|
||||
|
||||
# Send progress update (estimate based on programs processed)
|
||||
# We don't know total programs upfront, so use a rough estimate
|
||||
if total_programs - last_progress_update >= 5000:
|
||||
last_progress_update = total_programs
|
||||
# Cap at 90% until we're done
|
||||
progress = min(90, 10 + int((total_programs / max(total_programs + 10000, 1)) * 80))
|
||||
send_epg_update(epg_source.id, "parsing_programs", progress,
|
||||
processed=total_programs, channels=mapped_count)
|
||||
|
||||
# Periodic garbage collection
|
||||
if total_programs % (batch_size * 5) == 0:
|
||||
gc.collect()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing program for {channel_id}: {e}", exc_info=True)
|
||||
clear_element(elem)
|
||||
continue
|
||||
|
||||
# Process any remaining items
|
||||
if programs_to_create:
|
||||
ProgramData.objects.bulk_create(programs_to_create)
|
||||
logger.debug(f"Saved final batch of {len(programs_to_create)} programs")
|
||||
|
||||
except etree.XMLSyntaxError as xml_error:
|
||||
logger.error(f"XML syntax error parsing program data: {xml_error}")
|
||||
epg_source.status = EPGSource.STATUS_ERROR
|
||||
epg_source.last_message = f"XML parsing error: {str(xml_error)}"
|
||||
epg_source.save(update_fields=['status', 'last_message'])
|
||||
send_epg_update(epg_source.id, "parsing_programs", 100, status="error", message=str(xml_error))
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"Error parsing XML for programs: {e}", exc_info=True)
|
||||
raise
|
||||
finally:
|
||||
if source_file:
|
||||
source_file.close()
|
||||
source_file = None
|
||||
gc.collect()
|
||||
|
||||
# If there were failures, include them in the message but continue
|
||||
if failed_entries:
|
||||
epg_source.status = EPGSource.STATUS_SUCCESS # Still mark as success if some processed
|
||||
error_summary = f"Failed to parse {len(failed_entries)} of {epg_count} entries"
|
||||
stats_summary = f"Processed {program_count} programs across {channel_count} channels. Updated: {updated_count}."
|
||||
epg_source.last_message = f"{stats_summary} Warning: {error_summary}"
|
||||
epg_source.updated_at = timezone.now()
|
||||
epg_source.save(update_fields=['status', 'last_message', 'updated_at'])
|
||||
# Count channels that actually got programs
|
||||
channels_with_programs = sum(1 for count in programs_by_channel.values() if count > 0)
|
||||
|
||||
# Send completion notification with mixed status
|
||||
send_epg_update(epg_source.id, "parsing_programs", 100,
|
||||
status="success",
|
||||
message=epg_source.last_message)
|
||||
|
||||
# Explicitly release memory of large lists before returning
|
||||
del failed_entries
|
||||
gc.collect()
|
||||
|
||||
return True
|
||||
|
||||
# If all successful, set a comprehensive success message
|
||||
# Success message
|
||||
epg_source.status = EPGSource.STATUS_SUCCESS
|
||||
epg_source.last_message = f"Successfully processed {program_count} programs across {channel_count} channels. Updated: {updated_count}."
|
||||
epg_source.last_message = (
|
||||
f"Parsed {total_programs:,} programs for {channels_with_programs} channels "
|
||||
f"(skipped {skipped_programs:,} programmes for {total_epg_count - mapped_count} unmapped channels)"
|
||||
)
|
||||
epg_source.updated_at = timezone.now()
|
||||
epg_source.save(update_fields=['status', 'last_message', 'updated_at'])
|
||||
|
||||
|
|
@ -1500,9 +1629,10 @@ def parse_programs_for_source(epg_source, tvg_id=None):
|
|||
log_system_event(
|
||||
event_type='epg_refresh',
|
||||
source_name=epg_source.name,
|
||||
programs=program_count,
|
||||
channels=channel_count,
|
||||
updated=updated_count,
|
||||
programs=total_programs,
|
||||
channels=channels_with_programs,
|
||||
skipped_programs=skipped_programs,
|
||||
unmapped_channels=total_epg_count - mapped_count,
|
||||
)
|
||||
|
||||
# Send completion notification with status
|
||||
|
|
@ -1510,7 +1640,9 @@ def parse_programs_for_source(epg_source, tvg_id=None):
|
|||
status="success",
|
||||
message=epg_source.last_message)
|
||||
|
||||
logger.info(f"Completed parsing all programs for source: {epg_source.name}")
|
||||
logger.info(f"Completed parsing programs for source: {epg_source.name} - "
|
||||
f"{total_programs:,} programs for {channels_with_programs} channels, "
|
||||
f"skipped {skipped_programs:,} programmes for unmapped channels")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
|
|
@ -1525,14 +1657,19 @@ def parse_programs_for_source(epg_source, tvg_id=None):
|
|||
return False
|
||||
finally:
|
||||
# Final memory cleanup and tracking
|
||||
|
||||
if source_file:
|
||||
try:
|
||||
source_file.close()
|
||||
except:
|
||||
pass
|
||||
source_file = None
|
||||
|
||||
# Explicitly release any remaining large data structures
|
||||
failed_entries = None
|
||||
program_count = None
|
||||
channel_count = None
|
||||
updated_count = None
|
||||
processed = None
|
||||
programs_to_create = None
|
||||
programs_by_channel = None
|
||||
mapped_epg_ids = None
|
||||
mapped_tvg_ids = None
|
||||
tvg_id_to_epg_id = None
|
||||
gc.collect()
|
||||
|
||||
# Add comprehensive memory cleanup at the end
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue