Mirror of https://github.com/Dispatcharr/Dispatcharr.git (synced 2026-01-23 02:35:14 +00:00)
Initial rework of EPG processing.
This commit is contained in:
parent eecf879119
commit f18ca4de37

4 changed files with 464 additions and 244 deletions

@@ -5,9 +5,11 @@ import gzip
import os
import uuid
import requests
import xml.etree.ElementTree as ET
import time # Add import for tracking download progress
from datetime import datetime, timedelta, timezone as dt_timezone
import gc # Add garbage collection module
import json
from lxml import etree # Using lxml exclusively

from celery import shared_task
from django.conf import settings

@@ -40,13 +42,16 @@ def send_epg_update(source_id, action, progress, **kwargs):

# Now, send the updated data dictionary
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
'updates',
{
'type': 'update',
'data': data
}
)
try:
async_to_sync(channel_layer.group_send)(
'updates',
{
'type': 'update',
'data': data
}
)
except Exception as e:
logger.warning(f"Failed to send WebSocket update: {e}")


def delete_epg_refresh_task_by_id(epg_id):

@@ -206,8 +211,12 @@ def fetch_xmltv(source):
send_epg_update(source.id, "downloading", 100, status="error", error="No URL provided and no valid local file exists")
return False

# Clean up existing cache file
if os.path.exists(source.get_cache_file()):
os.remove(source.get_cache_file())
try:
os.remove(source.get_cache_file())
except Exception as e:
logger.warning(f"Failed to remove existing cache file: {e}")

logger.info(f"Fetching XMLTV data from source: {source.name}")
try:

@@ -235,7 +244,7 @@ def fetch_xmltv(source):
send_epg_update(source.id, "downloading", 0)

# Use streaming response to track download progress
with requests.get(source.url, headers=headers, stream=True, timeout=30) as response:
with requests.get(source.url, headers=headers, stream=True, timeout=60) as response:
# Handle 404 specifically
if response.status_code == 404:
logger.error(f"EPG URL not found (404): {source.url}")

@@ -304,9 +313,10 @@ def fetch_xmltv(source):
downloaded = 0
start_time = time.time()
last_update_time = start_time
update_interval = 0.5 # Only update every 0.5 seconds

with open(cache_file, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
for chunk in response.iter_content(chunk_size=16384): # Increased chunk size for better performance
if chunk:
f.write(chunk)

@@ -326,17 +336,18 @@ def fetch_xmltv(source):
# Time remaining (in seconds)
time_remaining = (total_size - downloaded) / (speed * 1024) if speed > 0 and total_size > 0 else 0

# Only send updates every 0.5 seconds to avoid flooding
# Only send updates at specified intervals to avoid flooding
current_time = time.time()
if current_time - last_update_time >= 0.5 and progress > 0:
if current_time - last_update_time >= update_interval and progress > 0:
last_update_time = current_time
send_epg_update(
source.id,
"downloading",
progress,
speed=speed,
elapsed_time=elapsed_time,
time_remaining=time_remaining
speed=round(speed, 2),
elapsed_time=round(elapsed_time, 1),
time_remaining=round(time_remaining, 1),
downloaded=f"{downloaded / (1024 * 1024):.2f} MB"
)

# Send completion notification

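Note: the throttled-progress logic in the hunk above boils down to a small, generic pattern. A minimal standalone sketch for reference; the URL, destination path, and notify callback are placeholders here, not names from the commit:

    import time
    import requests

    def download_with_progress(url, dest_path, notify, chunk_size=16384, update_interval=0.5):
        """Stream a URL to disk, reporting progress at most every update_interval seconds."""
        with requests.get(url, stream=True, timeout=60) as response:
            response.raise_for_status()
            total_size = int(response.headers.get('Content-Length', 0))
            downloaded = 0
            start_time = time.time()
            last_update_time = start_time
            with open(dest_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if not chunk:
                        continue
                    f.write(chunk)
                    downloaded += len(chunk)
                    now = time.time()
                    if total_size and now - last_update_time >= update_interval:
                        last_update_time = now
                        elapsed = now - start_time
                        speed_kbps = downloaded / 1024 / elapsed if elapsed > 0 else 0
                        notify(
                            progress=int(downloaded / total_size * 100),
                            speed=round(speed_kbps, 2),
                            elapsed_time=round(elapsed, 1),
                            downloaded=f"{downloaded / (1024 * 1024):.2f} MB",
                        )
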
@@ -424,6 +435,20 @@ def fetch_xmltv(source):
)
send_epg_update(source.id, "downloading", 100, status="error", error=user_message)
return False
except requests.exceptions.Timeout as e:
# Handle timeout errors specifically
error_message = str(e)
user_message = f"Timeout error: EPG source '{source.name}' took too long to respond"
logger.error(f"Timeout error fetching XMLTV from {source.name}: {e}", exc_info=True)

# Update source status
source.status = 'error'
source.last_message = user_message
source.save(update_fields=['status', 'last_message'])

# Send notifications
send_epg_update(source.id, "downloading", 100, status="error", error=user_message)
return False
except Exception as e:
error_message = str(e)
logger.error(f"Error fetching XMLTV from {source.name}: {e}", exc_info=True)

@@ -496,65 +521,133 @@ def parse_channels_only(source):
logger.info(f"Parsing channels from EPG file: {file_path}")
existing_epgs = {e.tvg_id: e for e in EPGData.objects.filter(epg_source=source)}

# Read entire file (decompress if .gz)
if file_path.endswith('.gz'):
with open(file_path, 'rb') as gz_file:
decompressed = gzip.decompress(gz_file.read())
xml_data = decompressed.decode('utf-8')
else:
with open(file_path, 'r', encoding='utf-8') as xml_file:
xml_data = xml_file.read()
# Update progress to show file read starting
send_epg_update(source.id, "parsing_channels", 10)

# Update progress to show file read completed
send_epg_update(source.id, "parsing_channels", 25)

root = ET.fromstring(xml_data)
channels = root.findall('channel')
# Stream parsing instead of loading entire file at once
is_gzipped = file_path.endswith('.gz')

epgs_to_create = []
epgs_to_update = []
total_channels = 0
processed_channels = 0
batch_size = 500 # Process in batches to limit memory usage

logger.info(f"Found {len(channels)} <channel> entries in {file_path}")
try:
# Create a parser with the desired options
parser = etree.XMLParser(huge_tree=True, remove_blank_text=True)

# Update progress to show parsing started
send_epg_update(source.id, "parsing_channels", 50)
# Count channels for progress reporting - use proper lxml approach
# Open the file first
source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb')

total_channels = len(channels)
for i, channel_elem in enumerate(channels):
tvg_id = channel_elem.get('id', '').strip()
if not tvg_id:
continue # skip blank/invalid IDs
# Create an iterparse context without parser parameter
channel_finder = etree.iterparse(source_file, events=('end',), tag='channel')

display_name = channel_elem.findtext('display-name', default=tvg_id).strip()
# Count channels
total_channels = sum(1 for _ in channel_finder)

if tvg_id in existing_epgs:
epg_obj = existing_epgs[tvg_id]
if epg_obj.name != display_name:
epg_obj.name = display_name
epgs_to_update.append(epg_obj)
else:
epgs_to_create.append(EPGData(
tvg_id=tvg_id,
name=display_name,
epg_source=source,
))
# Close the file to reset position
source_file.close()

# Send occasional progress updates
if i % 100 == 0 or i == total_channels - 1:
progress = 50 + int((i / total_channels) * 40) # Scale to 50-90% range
send_epg_update(source.id, "parsing_channels", progress)
# Update progress after counting
send_epg_update(source.id, "parsing_channels", 25, total_channels=total_channels)

# Update progress before database operations
send_epg_update(source.id, "parsing_channels", 90)
# Reset file position for actual processing
source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb')
channel_parser = etree.iterparse(source_file, events=('end',), tag='channel')

for _, elem in channel_parser:
tvg_id = elem.get('id', '').strip()
if tvg_id:
display_name = None
for child in elem:
if child.tag == 'display-name' and child.text:
display_name = child.text.strip()
break

if not display_name:
display_name = tvg_id

if tvg_id in existing_epgs:
epg_obj = existing_epgs[tvg_id]
if epg_obj.name != display_name:
epg_obj.name = display_name
epgs_to_update.append(epg_obj)
else:
epgs_to_create.append(EPGData(
tvg_id=tvg_id,
name=display_name,
epg_source=source,
))

processed_channels += 1

# Batch processing
if len(epgs_to_create) >= batch_size:
EPGData.objects.bulk_create(epgs_to_create, ignore_conflicts=True)
epgs_to_create = []
# Force garbage collection
gc.collect()

if len(epgs_to_update) >= batch_size:
EPGData.objects.bulk_update(epgs_to_update, ["name"])
epgs_to_update = []
# Force garbage collection
gc.collect()

# Send progress updates
if processed_channels % 100 == 0 or processed_channels == total_channels:
progress = 25 + int((processed_channels / total_channels) * 65) if total_channels > 0 else 90
send_epg_update(
source.id,
"parsing_channels",
progress,
processed=processed_channels,
total=total_channels
)

# Clear memory
elem.clear()
while elem.getprevious() is not None:
del elem.getparent()[0]

# Make sure to close the file
source_file.close()

except (etree.XMLSyntaxError, Exception) as xml_error:
# Instead of falling back, just handle the error
logger.error(f"XML parsing failed: {xml_error}")
# Update status to error
source.status = 'error'
source.last_message = f"Error parsing XML file: {str(xml_error)}"
source.save(update_fields=['status', 'last_message'])
send_epg_update(source.id, "parsing_channels", 100, status="error", error=str(xml_error))
return False

# Process any remaining items
if epgs_to_create:
EPGData.objects.bulk_create(epgs_to_create, ignore_conflicts=True)

if epgs_to_update:
EPGData.objects.bulk_update(epgs_to_update, ["name"])

# Final garbage collection
gc.collect()

# Update source status with channel count
source.status = 'success'
source.last_message = f"Successfully parsed {processed_channels} channels"
source.save(update_fields=['status', 'last_message'])

# Send completion notification
send_epg_update(source.id, "parsing_channels", 100, status="success")
send_epg_update(
source.id,
"parsing_channels",
100,
status="success",
channels_count=processed_channels
)

channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(

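Note: the channel parsing above leans on lxml's iterparse plus explicit element clearing to keep memory flat. A simplified sketch of just that pattern, assuming an XMLTV file on disk (plain or gzipped) as in the commit:

    import gzip
    from lxml import etree

    def iter_channels(file_path):
        """Yield (tvg_id, display_name) pairs from an XMLTV file without loading it whole."""
        opener = gzip.open if file_path.endswith('.gz') else open
        with opener(file_path, 'rb') as source_file:
            for _, elem in etree.iterparse(source_file, events=('end',), tag='channel'):
                tvg_id = (elem.get('id') or '').strip()
                if tvg_id:
                    display_name = (elem.findtext('display-name') or tvg_id).strip()
                    yield tvg_id, display_name
                # Release the element and any fully parsed preceding siblings
                elem.clear()
                while elem.getprevious() is not None:
                    del elem.getparent()[0]
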
@@ -565,7 +658,7 @@ def parse_channels_only(source):
}
)

logger.info("Finished parsing channel info.")
logger.info(f"Finished parsing channel info. Found {processed_channels} channels.")
return True

except FileNotFoundError:

@@ -592,199 +685,183 @@ def parse_programs_for_tvg_id(epg_id):
logger.info(f"Program parse for {epg_id} already in progress, skipping duplicate task")
return "Task already running"

epg = EPGData.objects.get(id=epg_id)
epg_source = epg.epg_source
try:
epg = EPGData.objects.get(id=epg_id)
epg_source = epg.epg_source

if not Channel.objects.filter(epg_data=epg).exists():
logger.info(f"No channels matched to EPG {epg.tvg_id}")
release_task_lock('parse_epg_programs', epg_id)
return

logger.info(f"Refreshing program data for tvg_id: {epg.tvg_id}")

# First, remove all existing programs
ProgramData.objects.filter(epg=epg).delete()

file_path = epg_source.file_path
if not file_path:
file_path = epg_source.get_cache_file()

# Check if the file exists
if not os.path.exists(file_path):
logger.error(f"EPG file not found at: {file_path}")

# Update the file path in the database
new_path = epg_source.get_cache_file()
logger.info(f"Updating file_path from '{file_path}' to '{new_path}'")
epg_source.file_path = new_path
epg_source.save(update_fields=['file_path'])

# Fetch new data before continuing
if epg_source.url:
logger.info(f"Fetching new EPG data from URL: {epg_source.url}")
# Properly check the return value from fetch_xmltv
fetch_success = fetch_xmltv(epg_source)

# If fetch was not successful or the file still doesn't exist, abort
if not fetch_success:
logger.error(f"Failed to fetch EPG data, cannot parse programs for tvg_id: {epg.tvg_id}")
# Update status to error if not already set
epg_source.status = 'error'
epg_source.last_message = f"Failed to download EPG data, cannot parse programs"
epg_source.save(update_fields=['status', 'last_message'])
send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="Failed to download EPG file")
release_task_lock('parse_epg_programs', epg_id)
return

# Also check if the file exists after download
if not os.path.exists(new_path):
logger.error(f"Failed to fetch EPG data, file still missing at: {new_path}")
epg_source.status = 'error'
epg_source.last_message = f"Failed to download EPG data, file missing after download"
epg_source.save(update_fields=['status', 'last_message'])
send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="File not found after download")
release_task_lock('parse_epg_programs', epg_id)
return
else:
logger.error(f"No URL provided for EPG source {epg_source.name}, cannot fetch new data")
# Update status to error
epg_source.status = 'error'
epg_source.last_message = f"No URL provided, cannot fetch EPG data"
epg_source.save(update_fields=['status', 'last_message'])
send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="No URL provided")
if not Channel.objects.filter(epg_data=epg).exists():
logger.info(f"No channels matched to EPG {epg.tvg_id}")
release_task_lock('parse_epg_programs', epg_id)
return

file_path = new_path
logger.info(f"Refreshing program data for tvg_id: {epg.tvg_id}")

# Read entire file (decompress if .gz)
try:
if file_path.endswith('.gz'):
with open(file_path, 'rb') as gz_file:
decompressed = gzip.decompress(gz_file.read())
xml_data = decompressed.decode('utf-8')
else:
with open(file_path, 'r', encoding='utf-8') as xml_file:
xml_data = xml_file.read()
except FileNotFoundError:
logger.error(f"EPG file not found at: {file_path}")
# First, remove all existing programs - use chunked delete to avoid memory issues
chunk_size = 5000
programs_to_delete = ProgramData.objects.filter(epg=epg)
total_programs = programs_to_delete.count()

if total_programs > 0:
logger.info(f"Deleting {total_programs} existing programs for {epg.tvg_id}")

# Get only the IDs to conserve memory
program_ids = list(programs_to_delete.values_list('id', flat=True))

# Delete in chunks using ID-based filtering
for i in range(0, len(program_ids), chunk_size):
chunk_ids = program_ids[i:i + chunk_size]
ProgramData.objects.filter(id__in=chunk_ids).delete()
gc.collect() # Force garbage collection after batch delete

file_path = epg_source.file_path
if not file_path:
file_path = epg_source.get_cache_file()

# Check if the file exists
if not os.path.exists(file_path):
logger.error(f"EPG file not found at: {file_path}")

# Update the file path in the database
new_path = epg_source.get_cache_file()
logger.info(f"Updating file_path from '{file_path}' to '{new_path}'")
epg_source.file_path = new_path
epg_source.save(update_fields=['file_path'])

# Fetch new data before continuing
if epg_source.url:
logger.info(f"Fetching new EPG data from URL: {epg_source.url}")
# Properly check the return value from fetch_xmltv
fetch_success = fetch_xmltv(epg_source)

# If fetch was not successful or the file still doesn't exist, abort
if not fetch_success:
logger.error(f"Failed to fetch EPG data, cannot parse programs for tvg_id: {epg.tvg_id}")
# Update status to error if not already set
epg_source.status = 'error'
epg_source.last_message = f"Failed to download EPG data, cannot parse programs"
epg_source.save(update_fields=['status', 'last_message'])
send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="Failed to download EPG file")
release_task_lock('parse_epg_programs', epg_id)
return

# Also check if the file exists after download
if not os.path.exists(new_path):
logger.error(f"Failed to fetch EPG data, file still missing at: {new_path}")
epg_source.status = 'error'
epg_source.last_message = f"Failed to download EPG data, file missing after download"
epg_source.save(update_fields=['status', 'last_message'])
send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="File not found after download")
release_task_lock('parse_epg_programs', epg_id)
return
else:
logger.error(f"No URL provided for EPG source {epg_source.name}, cannot fetch new data")
# Update status to error
epg_source.status = 'error'
epg_source.last_message = f"No URL provided, cannot fetch EPG data"
epg_source.save(update_fields=['status', 'last_message'])
send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="No URL provided")
release_task_lock('parse_epg_programs', epg_id)
return

file_path = new_path

# Use streaming parsing to reduce memory usage
is_gzipped = file_path.endswith('.gz')

logger.info(f"Parsing programs for tvg_id={epg.tvg_id} from {file_path}")

programs_to_create = []
batch_size = 1000 # Process in batches to limit memory usage

try:
# Create a parser with the desired options
parser = etree.XMLParser(huge_tree=True, remove_blank_text=True)

# Open the file properly
source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb')

# Stream parse the file using lxml's iterparse (without parser parameter)
program_parser = etree.iterparse(source_file, events=('end',), tag='programme')

for _, elem in program_parser:
if elem.get('channel') == epg.tvg_id:
try:
start_time = parse_xmltv_time(elem.get('start'))
end_time = parse_xmltv_time(elem.get('stop'))
title = None
desc = None
sub_title = None

# Efficiently process child elements
for child in elem:
if child.tag == 'title':
title = child.text or 'No Title'
elif child.tag == 'desc':
desc = child.text or ''
elif child.tag == 'sub-title':
sub_title = child.text or ''

if not title:
title = 'No Title'

# Extract custom properties
custom_props = extract_custom_properties(elem)
custom_properties_json = None
if custom_props:
try:
custom_properties_json = json.dumps(custom_props)
except Exception as e:
logger.error(f"Error serializing custom properties to JSON: {e}", exc_info=True)

programs_to_create.append(ProgramData(
epg=epg,
start_time=start_time,
end_time=end_time,
title=title,
description=desc,
sub_title=sub_title,
tvg_id=epg.tvg_id,
custom_properties=custom_properties_json
))

# Batch processing
if len(programs_to_create) >= batch_size:
ProgramData.objects.bulk_create(programs_to_create)
logger.debug(f"Saved batch of {len(programs_to_create)} programs for {epg.tvg_id}")
programs_to_create = []
# Force garbage collection after batch processing
gc.collect()

except Exception as e:
logger.error(f"Error processing program for {epg.tvg_id}: {e}", exc_info=True)

# Important: Clear the element to avoid memory leaks (lxml specific method)
elem.clear()
# Also eliminate ancestors to prevent memory leaks
while elem.getprevious() is not None:
del elem.getparent()[0]

# Make sure to close the file
source_file.close()

except etree.XMLSyntaxError as xml_error:
logger.error(f"XML syntax error parsing program data: {xml_error}")
raise
except Exception as e:
logger.error(f"Error parsing XML for programs: {e}", exc_info=True)
raise

# Process any remaining items
if programs_to_create:
ProgramData.objects.bulk_create(programs_to_create)
logger.debug(f"Saved final batch of {len(programs_to_create)} programs for {epg.tvg_id}")

# Final garbage collection
gc.collect()

logger.info(f"Completed program parsing for tvg_id={epg.tvg_id}.")
finally:
release_task_lock('parse_epg_programs', epg_id)
return
except Exception as e:
logger.error(f"Error reading EPG file {file_path}: {e}", exc_info=True)
release_task_lock('parse_epg_programs', epg_id)
return

root = ET.fromstring(xml_data)

# Find only <programme> elements for this tvg_id
matched_programmes = [p for p in root.findall('programme') if p.get('channel') == epg.tvg_id]
logger.debug(f"Found {len(matched_programmes)} programmes for tvg_id={epg.tvg_id}")

programs_to_create = []
for prog in matched_programmes:
start_time = parse_xmltv_time(prog.get('start'))
end_time = parse_xmltv_time(prog.get('stop'))
title = prog.findtext('title', default='No Title')
desc = prog.findtext('desc', default='')
sub_title = prog.findtext('sub-title', default='')

# Extract custom properties
custom_props = {}

# Extract categories
categories = []
for cat_elem in prog.findall('category'):
if cat_elem.text and cat_elem.text.strip():
categories.append(cat_elem.text.strip())
if categories:
custom_props['categories'] = categories

# Extract episode numbers
for ep_num in prog.findall('episode-num'):
system = ep_num.get('system', '')
if system == 'xmltv_ns' and ep_num.text:
# Parse XMLTV episode-num format (season.episode.part)
parts = ep_num.text.split('.')
if len(parts) >= 2:
if parts[0].strip() != '':
try:
season = int(parts[0]) + 1 # XMLTV format is zero-based
custom_props['season'] = season
except ValueError:
pass
if parts[1].strip() != '':
try:
episode = int(parts[1]) + 1 # XMLTV format is zero-based
custom_props['episode'] = episode
except ValueError:
pass
elif system == 'onscreen' and ep_num.text:
# Just store the raw onscreen format
custom_props['onscreen_episode'] = ep_num.text.strip()

# Extract ratings
for rating_elem in prog.findall('rating'):
if rating_elem.findtext('value'):
custom_props['rating'] = rating_elem.findtext('value').strip()
if rating_elem.get('system'):
custom_props['rating_system'] = rating_elem.get('system')
break # Just use the first rating

# Extract credits (actors, directors, etc.)
credits_elem = prog.find('credits')
if credits_elem is not None:
credits = {}
for credit_type in ['director', 'actor', 'writer', 'presenter', 'producer']:
elements = credits_elem.findall(credit_type)
if elements:
names = [e.text.strip() for e in elements if e.text and e.text.strip()]
if names:
credits[credit_type] = names
if credits:
custom_props['credits'] = credits

# Extract other common program metadata
if prog.findtext('date'):
custom_props['year'] = prog.findtext('date').strip()[:4] # Just the year part

if prog.findtext('country'):
custom_props['country'] = prog.findtext('country').strip()

for icon_elem in prog.findall('icon'):
if icon_elem.get('src'):
custom_props['icon'] = icon_elem.get('src')
break # Just use the first icon

for kw in ['previously-shown', 'premiere', 'new']:
if prog.find(kw) is not None:
custom_props[kw.replace('-', '_')] = True

# Convert custom_props to JSON string if not empty
custom_properties_json = None
if custom_props:
import json
try:
custom_properties_json = json.dumps(custom_props)
except Exception as e:
logger.error(f"Error serializing custom properties to JSON: {e}", exc_info=True)

programs_to_create.append(ProgramData(
epg=epg,
start_time=start_time,
end_time=end_time,
title=title,
description=desc,
sub_title=sub_title,
tvg_id=epg.tvg_id,
custom_properties=custom_properties_json
))

ProgramData.objects.bulk_create(programs_to_create)

release_task_lock('parse_epg_programs', epg_id)

logger.info(f"Completed program parsing for tvg_id={epg.tvg_id}.")


def parse_programs_for_source(epg_source, tvg_id=None):

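Note: the chunked delete introduced above is a generic Django pattern; a hedged sketch of it in isolation (the ProgramData usage in the comment mirrors the diff, the chunk size is arbitrary):

    def delete_in_chunks(queryset, chunk_size=5000):
        """Delete a large queryset in ID-based chunks to bound memory use and lock time."""
        model = queryset.model
        ids = list(queryset.values_list('id', flat=True))
        for i in range(0, len(ids), chunk_size):
            model.objects.filter(id__in=ids[i:i + chunk_size]).delete()
        return len(ids)

    # e.g. delete_in_chunks(ProgramData.objects.filter(epg=epg))
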
@@ -966,3 +1043,81 @@ def parse_schedules_direct_time(time_str):
except Exception as e:
logger.error(f"Error parsing Schedules Direct time '{time_str}': {e}", exc_info=True)
raise


# Helper function to extract custom properties - moved to a separate function to clean up the code
def extract_custom_properties(prog):
custom_props = {}

# Extract categories
categories = []
for cat_elem in prog.findall('category'):
if cat_elem.text and cat_elem.text.strip():
categories.append(cat_elem.text.strip())
if categories:
custom_props['categories'] = categories

# Extract episode numbers
for ep_num in prog.findall('episode-num'):
system = ep_num.get('system', '')
if system == 'xmltv_ns' and ep_num.text:
# Parse XMLTV episode-num format (season.episode.part)
parts = ep_num.text.split('.')
if len(parts) >= 2:
if parts[0].strip() != '':
try:
season = int(parts[0]) + 1 # XMLTV format is zero-based
custom_props['season'] = season
except ValueError:
pass
if parts[1].strip() != '':
try:
episode = int(parts[1]) + 1 # XMLTV format is zero-based
custom_props['episode'] = episode
except ValueError:
pass
elif system == 'onscreen' and ep_num.text:
# Just store the raw onscreen format
custom_props['onscreen_episode'] = ep_num.text.strip()

# Extract ratings
for rating_elem in prog.findall('rating'):
value_elem = rating_elem.find('value')
if value_elem is not None and value_elem.text:
custom_props['rating'] = value_elem.text.strip()
if rating_elem.get('system'):
custom_props['rating_system'] = rating_elem.get('system')
break # Just use the first rating

# Extract credits (actors, directors, etc.)
credits_elem = prog.find('credits')
if credits_elem is not None:
credits = {}
for credit_type in ['director', 'actor', 'writer', 'presenter', 'producer']:
elements = credits_elem.findall(credit_type)
if elements:
names = [e.text.strip() for e in elements if e.text and e.text.strip()]
if names:
credits[credit_type] = names
if credits:
custom_props['credits'] = credits

# Extract other common program metadata
date_elem = prog.find('date')
if date_elem is not None and date_elem.text:
custom_props['year'] = date_elem.text.strip()[:4] # Just the year part

country_elem = prog.find('country')
if country_elem is not None and country_elem.text:
custom_props['country'] = country_elem.text.strip()

for icon_elem in prog.findall('icon'):
if icon_elem.get('src'):
custom_props['icon'] = icon_elem.get('src')
break # Just use the first icon

for kw in ['previously-shown', 'premiere', 'new']:
if prog.find(kw) is not None:
custom_props[kw.replace('-', '_')] = True

return custom_props

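Note: to illustrate what the new helper returns, here is a made-up programme element run through it (the XML snippet is invented for the example and assumes extract_custom_properties is in scope):

    from lxml import etree

    xml = (
        '<programme start="20250101180000 +0000" stop="20250101190000 +0000" channel="demo.tv">'
        '<title>Example Show</title>'
        '<category>News</category>'
        '<episode-num system="xmltv_ns">1.4.</episode-num>'
        '</programme>'
    )
    props = extract_custom_properties(etree.fromstring(xml))
    # props -> {'categories': ['News'], 'season': 2, 'episode': 5}
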
@@ -59,6 +59,10 @@ class RedisClient:
client.config_set('save', '') # Disable RDB snapshots
client.config_set('appendonly', 'no') # Disable AOF logging

# Set optimal memory settings
client.config_set('maxmemory-policy', 'allkeys-lru') # Use LRU eviction
client.config_set('maxmemory', '256mb') # Set reasonable memory limit

# Disable protected mode when in debug mode
if os.environ.get('DISPATCHARR_DEBUG', '').lower() == 'true':
client.config_set('protected-mode', 'no') # Disable protected mode in debug

@@ -178,3 +182,34 @@ def send_websocket_event(event, success, data):
"data": {"success": True, "type": "epg_channels"}
}
)

# Add memory monitoring utilities
def get_memory_usage():
"""Returns current memory usage in MB"""
import psutil
process = psutil.Process(os.getpid())
return process.memory_info().rss / (1024 * 1024)

def monitor_memory_usage(func):
"""Decorator to monitor memory usage before and after function execution"""
def wrapper(*args, **kwargs):
import gc
# Force garbage collection before measuring
gc.collect()

# Get initial memory usage
start_mem = get_memory_usage()
logger.debug(f"Memory usage before {func.__name__}: {start_mem:.2f} MB")

# Call the original function
result = func(*args, **kwargs)

# Force garbage collection before measuring again
gc.collect()

# Get final memory usage
end_mem = get_memory_usage()
logger.debug(f"Memory usage after {func.__name__}: {end_mem:.2f} MB (Change: {end_mem - start_mem:.2f} MB)")

return result
return wrapper

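Note: a brief usage sketch for the new decorator; the task name below is hypothetical, and get_memory_usage assumes psutil is installed in the environment:

    @monitor_memory_usage
    def rebuild_epg_cache(source_id):
        # ... heavy parsing work; memory before/after is logged at debug level ...
        pass

    rebuild_epg_cache(42)
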
@@ -43,6 +43,34 @@ INSTALLED_APPS = [
'django_celery_beat',
]

# EPG Processing optimization settings
EPG_BATCH_SIZE = 1000 # Number of records to process in a batch
EPG_MEMORY_LIMIT = 512 # Memory limit in MB before forcing garbage collection
EPG_ENABLE_MEMORY_MONITORING = True # Whether to monitor memory usage during processing

# Database optimization settings
DATABASE_STATEMENT_TIMEOUT = 300 # Seconds before timing out long-running queries
DATABASE_CONN_MAX_AGE = 60 # Connection max age in seconds, helps with frequent reconnects

# Disable atomic requests for performance-sensitive views
ATOMIC_REQUESTS = False

# Cache settings - add caching for EPG operations
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
'LOCATION': 'dispatcharr-epg-cache',
'TIMEOUT': 3600, # 1 hour cache timeout
'OPTIONS': {
'MAX_ENTRIES': 10000,
'CULL_FREQUENCY': 3, # Purge 1/3 of entries when max is reached
}
}
}

# Timeouts for external connections
REQUESTS_TIMEOUT = 30 # Seconds for external API requests

MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',

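Note: task code would typically read these new settings through getattr with a fallback rather than hard-coding the values; a sketch of that usage (not part of this commit; get_memory_usage is the helper added above):

    import gc
    from django.conf import settings

    batch_size = getattr(settings, 'EPG_BATCH_SIZE', 1000)
    memory_limit_mb = getattr(settings, 'EPG_MEMORY_LIMIT', 512)

    if getattr(settings, 'EPG_ENABLE_MEMORY_MONITORING', False):
        if get_memory_usage() > memory_limit_mb:
            gc.collect()  # force a collection once the soft memory limit is exceeded
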
@@ -28,3 +28,5 @@ channels
channels-redis
django-filter
django-celery-beat
memory-profiler==0.61.0
lxml==5.4.0