mirror of
https://github.com/Dispatcharr/Dispatcharr.git
synced 2026-01-23 18:54:58 +00:00
This enhancement introduces a powerful custom dummy EPG system that allows users to generate EPG programs on-demand by parsing channel or stream names using configurable regex patterns.
Key Features:
- Custom Pattern Matching: Define regex patterns to extract information from channel/stream names (teams, leagues, times, dates, etc.)
- Flexible Name Source: Choose to parse either the channel name or a specific stream name (by index)
- Timezone-Aware Scheduling: Automatic DST handling using pytz timezone names (e.g., 'US/Eastern', 'Europe/London')
- Time Format Support: Parse both 12-hour (AM/PM) and 24-hour time formats
- Date Parsing: Extract dates from names with flexible month/day/year patterns
- Custom Templates: Format EPG titles and descriptions using captured groups with {placeholder} syntax
- Upcoming/Ended Customization: Define custom titles and descriptions for programs before and after scheduled events
- Live Preview: Test patterns and templates in real-time with sample input
- Smart Program Generation: Automatically creates "Upcoming" and "Ended" programs around scheduled events
Use Cases:
- Sports channels with event details in stream names (e.g., "NHL 01: Bruins VS Leafs @ 8:00PM ET")
- Movie channels with genre/title/year information
- Racing events with driver/track/series details
- Any scenario where EPG data is embedded in channel/stream naming conventions
Technical Implementation:
- Backend: Pattern matching engine with timezone conversion and program scheduling logic
- Frontend: Interactive form with validation, pattern testing, and visual group preview
- Name Source Options: Parse from channel name or selectable stream index (1-based)
- Fallback Behavior: Uses standard dummy EPG if patterns don't match
- Custom Properties: Stores all configuration in EPGSource.custom_properties JSON field
Configuration Options:
- Title Pattern: Extract primary information (required)
- Time Pattern: Extract hour/minute/AM-PM (optional)
- Date Pattern: Extract month/day/year (optional)
- Timezone: Event timezone with automatic DST support
- Program Duration: Length of generated programs in minutes
- Title Template: Format EPG title using captured groups
- Description Template: Format EPG description using captured groups
- Upcoming Title Template: Custom title for programs before event starts (optional)
- Upcoming Description Template: Custom description for programs before event starts (optional)
- Ended Title Template: Custom title for programs after event ends (optional)
- Ended Description Template: Custom description for programs after event ends (optional)
- Name Source: Channel name or stream name
- Stream Index: Which stream to use when parsing stream names (1, 2, 3, etc.)
Closes #293
1970 lines
89 KiB
Python
1970 lines
89 KiB
Python
# apps/epg/tasks.py
|
|
|
|
import logging
|
|
import gzip
|
|
import os
|
|
import uuid
|
|
import requests
|
|
import time # Add import for tracking download progress
|
|
from datetime import datetime, timedelta, timezone as dt_timezone
|
|
import gc # Add garbage collection module
|
|
import json
|
|
from lxml import etree # Using lxml exclusively
|
|
import psutil # Add import for memory tracking
|
|
import zipfile
|
|
|
|
from celery import shared_task
|
|
from django.conf import settings
|
|
from django.db import transaction
|
|
from django.utils import timezone
|
|
from apps.channels.models import Channel
|
|
from core.models import UserAgent, CoreSettings
|
|
|
|
from asgiref.sync import async_to_sync
|
|
from channels.layers import get_channel_layer
|
|
|
|
from .models import EPGSource, EPGData, ProgramData
|
|
from core.utils import acquire_task_lock, release_task_lock, send_websocket_update, cleanup_memory
|
|
|
|
# Module-level logger, named after this module, used by the functions below.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def validate_icon_url_fast(icon_url, max_length=None):
    """
    Cheap length check applied to icon URLs while parsing.

    Args:
        icon_url: Candidate URL; may be None or an empty string.
        max_length: Longest accepted URL. When None, the limit is read from
            the ``icon_url`` field of the EPGData model.

    Returns:
        ``icon_url`` unchanged when it is falsy or fits within the limit;
        None (after logging a warning) when it is longer than the limit.
    """
    if max_length is None:
        # Resolve the limit from the model field definition.
        max_length = EPGData._meta.get_field('icon_url').max_length

    # Falsy values and URLs within the limit pass straight through.
    if not icon_url or len(icon_url) <= max_length:
        return icon_url

    logger.warning(f"Icon URL too long ({len(icon_url)} > {max_length}), skipping: {icon_url[:100]}...")
    return None
|
|
|
|
|
|
# Number of bytes read per iteration when streaming data out of a gzip/zip archive.
MAX_EXTRACT_CHUNK_SIZE = 65536  # 64 KiB (64 * 1024 bytes)
|
|
|
|
|
|
def send_epg_update(source_id, action, progress, **kwargs):
    """
    Push an EPG download/parsing progress message over the WebSocket channel.

    Args:
        source_id: ID of the EPG source the message is about.
        action: Name of the current phase (e.g. "downloading", "parsing_programs").
        progress: Progress value for that phase.
        **kwargs: Extra key/value pairs merged into the message; a key that
            matches one of the base keys replaces the base value.
    """
    payload = {
        "progress": progress,
        "type": "epg_refresh",
        "source": source_id,
        "action": action,
        **kwargs,
    }

    # Program parsing emits updates at high frequency, so it gets extra
    # memory housekeeping: the sender is asked to collect on every 10th
    # progress step, and a further collection is forced here on every 50th.
    parsing_programs = action == "parsing_programs"

    send_websocket_update(
        'updates',
        'update',
        payload,
        collect_garbage=parsing_programs and progress % 10 == 0,
    )

    # Drop the local reference to the message explicitly.
    del payload

    if parsing_programs and progress % 50 == 0:
        gc.collect()
|
|
|
|
|
|
def delete_epg_refresh_task_by_id(epg_id):
    """
    Remove the periodic refresh task that belongs to an EPG source.

    The task is looked up by name (``epg_source-refresh-<epg_id>``). Once the
    task is deleted, its interval schedule is deleted as well, but only when
    this task was the sole periodic task referencing that interval.

    May be invoked directly or from the post_delete signal.

    Returns:
        True when a task was found and deleted. False when no task with that
        name exists or when any error occurred (errors are logged, not raised).
    """
    try:
        task = None
        task_name = f"epg_source-refresh-{epg_id}"

        # Locate the task by its name.
        try:
            from django_celery_beat.models import PeriodicTask, IntervalSchedule
            task = PeriodicTask.objects.get(name=task_name)
            logger.info(f"Found task by name: {task.id} for EPGSource {epg_id}")
        except PeriodicTask.DoesNotExist:
            logger.warning(f"No PeriodicTask found with name {task_name}")
            return False

        if not task:
            return False

        # Capture interval details up front; they are needed after the task is gone.
        interval_id = None
        tasks_with_same_interval = 0
        if getattr(task, 'interval', None):
            interval_id = task.interval.id

            # Total number of tasks on this interval, this one included.
            tasks_with_same_interval = PeriodicTask.objects.filter(interval_id=interval_id).count()
            logger.info(f"Interval {interval_id} is used by {tasks_with_same_interval} tasks total")

        # The task goes first.
        task_id = task.id
        task.delete()
        logger.info(f"Successfully deleted periodic task {task_id}")

        # The interval goes only if nothing else was using it.
        if interval_id:
            if tasks_with_same_interval == 1:
                try:
                    interval = IntervalSchedule.objects.get(id=interval_id)
                    logger.info(f"Deleting interval schedule {interval_id} (not shared with other tasks)")
                    interval.delete()
                    logger.info(f"Successfully deleted interval {interval_id}")
                except IntervalSchedule.DoesNotExist:
                    logger.warning(f"Interval {interval_id} no longer exists")
            else:
                logger.info(f"Not deleting interval {interval_id} as it's shared with {tasks_with_same_interval-1} other tasks")

        return True
    except Exception as e:
        logger.error(f"Error deleting periodic task for EPGSource {epg_id}: {str(e)}", exc_info=True)
        return False
|
|
|
|
|
|
@shared_task
def refresh_all_epg_data():
    """
    Celery task: refresh every active EPG source, one after another.

    Sources of type 'dummy' are excluded because they do not need refreshing.
    Each source is refreshed by calling ``refresh_epg_data`` directly (not
    queued), followed by a forced garbage collection.

    Returns:
        The string "EPG data refreshed.".
    """
    logger.info("Starting refresh_epg_data task.")

    # Dummy EPG sources are left out of the refresh entirely.
    active_sources = (
        EPGSource.objects
        .filter(is_active=True)
        .exclude(source_type='dummy')
    )
    logger.debug(f"Found {active_sources.count()} active EPGSource(s) (excluding dummy EPGs).")

    for epg_source in active_sources:
        refresh_epg_data(epg_source.id)
        # Reclaim memory before moving on to the next source.
        gc.collect()

    logger.info("Finished refresh_epg_data task.")
    return "EPG data refreshed."
|
|
|
|
|
|
@shared_task
def refresh_epg_data(source_id):
    """
    Celery task: refresh a single EPG source.

    A task lock keyed on ``source_id`` prevents concurrent refreshes of the
    same source. The lock is released exactly once, in the ``finally``
    clause, whichever path the function takes after acquiring it. (Releasing
    it on an early-return path as well would release it a second time from
    ``finally``, which could drop a lock that another worker acquired in
    between.)

    Flow:
        * Source missing   -> delete its orphaned periodic task and return a message.
        * Source inactive  -> skip.
        * Source is dummy  -> skip (dummy sources do not need refreshing).
        * 'xmltv'          -> fetch, parse channels, parse programs; stop at
                              the first step that reports failure.
        * 'schedules_direct' -> delegate to ``fetch_schedules_direct``.
        * Afterwards ``updated_at`` is saved and DVR series rule evaluation
          is queued (best effort).

    Any unexpected exception is logged, recorded on the source as an 'error'
    status and pushed to WebSocket clients; it is not re-raised.

    Args:
        source_id: Primary key of the EPGSource to refresh.

    Returns:
        A message string when the source does not exist, otherwise None.
    """
    if not acquire_task_lock('refresh_epg_data', source_id):
        logger.debug(f"EPG refresh for {source_id} already running")
        return

    source = None
    try:
        try:
            source = EPGSource.objects.get(id=source_id)
        except EPGSource.DoesNotExist:
            # The task fired for a source that no longer exists: remove the orphaned schedule.
            logger.warning(f"EPG source with ID {source_id} not found, but task was triggered. Cleaning up orphaned task.")

            if delete_epg_refresh_task_by_id(source_id):
                logger.info(f"Successfully cleaned up orphaned task for EPG source {source_id}")
            else:
                logger.info(f"No orphaned task found for EPG source {source_id}")

            return f"EPG source {source_id} does not exist, task cleaned up"

        # The source exists but is not active, just skip processing
        if not source.is_active:
            logger.info(f"EPG source {source_id} is not active. Skipping.")
            return

        # Skip refresh for dummy EPG sources - they don't need refreshing
        if source.source_type == 'dummy':
            logger.info(f"Skipping refresh for dummy EPG source {source.name} (ID: {source_id})")
            return

        logger.info(f"Processing EPGSource: {source.name} (type: {source.source_type})")
        if source.source_type == 'xmltv':
            if not fetch_xmltv(source):
                logger.error(f"Failed to fetch XMLTV for source {source.name}")
                return

            if not parse_channels_only(source):
                logger.error(f"Failed to parse channels for source {source.name}")
                return

            parse_programs_for_source(source)

        elif source.source_type == 'schedules_direct':
            fetch_schedules_direct(source)

        source.save(update_fields=['updated_at'])

        # After successful EPG refresh, evaluate DVR series rules to schedule new episodes.
        # This stays best-effort: a failure here must not mark the refresh as failed.
        try:
            from apps.channels.tasks import evaluate_series_rules
            evaluate_series_rules.delay()
        except Exception as series_err:
            logger.warning(f"Could not queue DVR series rule evaluation after EPG refresh: {series_err}")
    except Exception as e:
        logger.error(f"Error in refresh_epg_data for source {source_id}: {e}", exc_info=True)
        try:
            if source:
                source.status = 'error'
                source.last_message = f"Error refreshing EPG data: {str(e)}"
                source.save(update_fields=['status', 'last_message'])
                send_epg_update(source_id, "refresh", 100, status="error", error=str(e))
        except Exception as inner_e:
            logger.error(f"Error updating source status: {inner_e}")
    finally:
        # Clear references to ensure proper garbage collection
        source = None
        # Force garbage collection before releasing the lock
        gc.collect()
        # Single release point for every path that acquired the lock.
        release_task_lock('refresh_epg_data', source_id)
|
|
|
|
|
|
def _report_fetch_failure(source, message, *, error_code=None, details=None, progress_error=None):
    """
    Record a failed fetch on ``source`` and notify WebSocket clients.

    Args:
        source: The EPG source being fetched; ``status`` and ``last_message``
            are updated and saved.
        message: User-facing message stored in ``last_message``.
        error_code: When not None, an additional ``epg_fetch_error`` event
            carrying this code is sent to the 'updates' group.
        details: Optional extra text included in that event as "details".
        progress_error: Text for the final "downloading" progress update;
            defaults to ``message``.
    """
    source.status = 'error'
    source.last_message = message
    source.save(update_fields=['status', 'last_message'])

    if error_code is not None:
        payload = {
            "success": False,
            "type": "epg_fetch_error",
            "source_id": source.id,
            "source_name": source.name,
            "error_code": error_code,
            "message": message,
        }
        if details is not None:
            payload["details"] = details

        channel_layer = get_channel_layer()
        async_to_sync(channel_layer.group_send)(
            'updates',
            {'type': 'update', 'data': payload},
        )

    # Always finish the download progress at 100 with an error status.
    send_epg_update(
        source.id,
        "downloading",
        100,
        status="error",
        error=message if progress_error is None else progress_error,
    )


def fetch_xmltv(source):
    """
    Obtain the XMLTV file for ``source`` and store it in the EPG cache.

    Two modes:
        * No URL but an existing local ``file_path``: the local file is used.
          If it ends in .gz/.zip it is extracted into the cache directory
          (original kept) and ``extracted_file_path`` is saved. Status
          becomes 'success'.
        * URL present: the file is streamed to ``<cache>/<id>.tmp`` with
          throttled progress updates, its format is detected from content,
          it is renamed to its final name, extracted when compressed
          (compressed original removed), and ``file_path`` is saved. Status
          moves 'fetching' -> 'parsing'.

    Every failure (no URL/file, HTTP status >= 400, connection error,
    timeout, any other exception) is recorded on the source with status
    'error' and reported to WebSocket clients.

    Args:
        source: EPG source to fetch.

    Returns:
        True when a usable file is in place, False otherwise.
    """
    # Handle cases with local file but no URL
    if not source.url and source.file_path and os.path.exists(source.file_path):
        logger.info(f"Using existing local file for EPG source: {source.name} at {source.file_path}")

        # Extract the existing file first when it is compressed
        if source.file_path.endswith(('.gz', '.zip')):
            try:
                cache_dir = os.path.join(settings.MEDIA_ROOT, "cached_epg")
                os.makedirs(cache_dir, exist_ok=True)
                xml_path = os.path.join(cache_dir, f"{source.id}.xml")

                # Extract to the cache location keeping the original
                extracted_path = extract_compressed_file(source.file_path, xml_path, delete_original=False)

                if extracted_path:
                    logger.info(f"Extracted mapped compressed file to: {extracted_path}")
                    # Record the result in extracted_file_path; file_path keeps pointing at the original
                    source.extracted_file_path = extracted_path
                    source.save(update_fields=['extracted_file_path'])
                else:
                    logger.error(f"Failed to extract mapped compressed file. Using original file: {source.file_path}")
            except Exception as e:
                # Continue with the original file if extraction fails
                logger.error(f"Failed to extract existing compressed file: {e}")

        source.status = 'success'
        source.save(update_fields=['status'])

        send_epg_update(source.id, "downloading", 100, status="success")

        # Processing continues with parse_channels_only in the caller
        return True

    # Handle cases where no URL is provided and no valid file path exists
    if not source.url:
        _report_fetch_failure(source, "No URL provided and no valid local file exists")
        return False

    logger.info(f"Fetching XMLTV data from source: {source.name}")
    try:
        # Resolve the User-Agent: configured default if available, otherwise a fixed fallback
        default_user_agent_setting = CoreSettings.objects.filter(key='default-user-agent').first()
        user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:138.0) Gecko/20100101 Firefox/138.0"  # Fallback default
        if default_user_agent_setting and default_user_agent_setting.value:
            try:
                user_agent_obj = UserAgent.objects.filter(id=int(default_user_agent_setting.value)).first()
                if user_agent_obj and user_agent_obj.user_agent:
                    user_agent = user_agent_obj.user_agent
                    logger.debug(f"Using default user agent: {user_agent}")
            except Exception as e:
                logger.warning(f"Error retrieving default user agent, using fallback: {e}")

        headers = {
            'User-Agent': user_agent
        }

        # Update status to fetching before starting download
        source.status = 'fetching'
        source.save(update_fields=['status'])

        # Send initial download notification
        send_epg_update(source.id, "downloading", 0)

        # Use streaming response to track download progress
        with requests.get(source.url, headers=headers, stream=True, timeout=60) as response:
            # Handle 404 specifically
            if response.status_code == 404:
                logger.error(f"EPG URL not found (404): {source.url}")
                _report_fetch_failure(
                    source,
                    f"EPG source '{source.name}' returned 404 error - will retry on next scheduled run",
                    error_code=404,
                    progress_error="URL not found (404)",
                )
                return False

            # For all other error status codes
            if response.status_code >= 400:
                _report_fetch_failure(
                    source,
                    f"EPG source '{source.name}' encountered HTTP error {response.status_code}",
                    error_code=response.status_code,
                )
                return False

            response.raise_for_status()
            logger.debug("XMLTV data fetched successfully.")

            # Define base paths for consistent file naming
            cache_dir = os.path.join(settings.MEDIA_ROOT, "cached_epg")
            os.makedirs(cache_dir, exist_ok=True)

            # Create temporary download file with .tmp extension
            temp_download_path = os.path.join(cache_dir, f"{source.id}.tmp")

            # Content length (0 when the header is absent) drives progress reporting
            total_size = int(response.headers.get('content-length', 0))
            downloaded = 0
            start_time = time.time()
            last_update_time = start_time
            update_interval = 0.5  # Only update every 0.5 seconds

            # Download to temporary file
            with open(temp_download_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=16384):
                    if not chunk:
                        continue

                    f.write(chunk)

                    downloaded += len(chunk)
                    elapsed_time = time.time() - start_time

                    # Download speed in KB/s
                    speed = downloaded / elapsed_time / 1024 if elapsed_time > 0 else 0

                    # Progress percentage
                    if total_size > 0:
                        progress = min(100, int((downloaded / total_size) * 100))
                    else:
                        # No content length header: estimate against 10MB, capped at 95
                        progress = min(95, int((downloaded / (10 * 1024 * 1024)) * 100))

                    # Time remaining (in seconds)
                    time_remaining = (total_size - downloaded) / (speed * 1024) if speed > 0 and total_size > 0 else 0

                    # Only send updates at specified intervals to avoid flooding
                    current_time = time.time()
                    if current_time - last_update_time >= update_interval and progress > 0:
                        last_update_time = current_time
                        send_epg_update(
                            source.id,
                            "downloading",
                            progress,
                            speed=round(speed, 2),
                            elapsed_time=round(elapsed_time, 1),
                            time_remaining=round(time_remaining, 1),
                            downloaded=f"{downloaded / (1024 * 1024):.2f} MB"
                        )

                    # Explicitly delete the chunk to free memory immediately
                    del chunk

            # Send completion notification
            send_epg_update(source.id, "downloading", 100)

            # Detect the real format from the first 1KB of content, with the URL as a hint
            with open(temp_download_path, 'rb') as f:
                content_sample = f.read(1024)

            format_type, is_compressed, file_extension = detect_file_format(
                file_path=source.url,
                content=content_sample
            )

            logger.debug(f"File format detection results: type={format_type}, compressed={is_compressed}, extension={file_extension}")

            # Ensure consistent final paths
            compressed_path = os.path.join(cache_dir, f"{source.id}{file_extension}" if is_compressed else f"{source.id}.compressed")
            xml_path = os.path.join(cache_dir, f"{source.id}.xml")

            # Clean up old files before saving new ones
            if os.path.exists(compressed_path):
                try:
                    os.remove(compressed_path)
                    logger.debug(f"Removed old compressed file: {compressed_path}")
                except OSError as e:
                    logger.warning(f"Failed to remove old compressed file: {e}")

            if os.path.exists(xml_path):
                try:
                    os.remove(xml_path)
                    logger.debug(f"Removed old XML file: {xml_path}")
                except OSError as e:
                    logger.warning(f"Failed to remove old XML file: {e}")

            # Rename the temp file to appropriate final path
            if is_compressed:
                try:
                    os.rename(temp_download_path, compressed_path)
                    logger.debug(f"Renamed temp file to compressed file: {compressed_path}")
                    current_file_path = compressed_path
                except OSError as e:
                    logger.error(f"Failed to rename temp file to compressed file: {e}")
                    current_file_path = temp_download_path  # Fall back to using temp file
            else:
                try:
                    os.rename(temp_download_path, xml_path)
                    logger.debug(f"Renamed temp file to XML file: {xml_path}")
                    current_file_path = xml_path
                except OSError as e:
                    logger.error(f"Failed to rename temp file to XML file: {e}")
                    current_file_path = temp_download_path  # Fall back to using temp file

            # Now extract the file if it's compressed
            if is_compressed:
                try:
                    logger.info(f"Extracting compressed file {current_file_path}")
                    send_epg_update(source.id, "extracting", 0, message="Extracting downloaded file")

                    # Always extract to the standard XML path; the compressed original is removed on success
                    extracted = extract_compressed_file(current_file_path, xml_path, delete_original=True)

                    if extracted:
                        logger.info(f"Successfully extracted to {xml_path}, compressed file deleted")
                        send_epg_update(source.id, "extracting", 100, message=f"File extracted successfully, temporary file removed")
                        # Only the extracted file remains, so it becomes file_path
                        source.file_path = xml_path
                        source.extracted_file_path = None
                    else:
                        logger.error("Extraction failed, using compressed file")
                        send_epg_update(source.id, "extracting", 100, status="error", message="Extraction failed, using compressed file")
                        # Use the compressed file
                        source.file_path = current_file_path
                        source.extracted_file_path = None
                except Exception as e:
                    logger.error(f"Error extracting file: {str(e)}", exc_info=True)
                    send_epg_update(source.id, "extracting", 100, status="error", message=f"Error during extraction: {str(e)}")
                    # Use the compressed file if extraction fails
                    source.file_path = current_file_path
                    source.extracted_file_path = None
            else:
                # It's already an XML file
                source.file_path = current_file_path
                source.extracted_file_path = None

            # Update the source's file paths
            source.save(update_fields=['file_path', 'status', 'extracted_file_path'])

            # Update status to parsing
            source.status = 'parsing'
            source.save(update_fields=['status'])

            logger.info(f"Cached EPG file saved to {source.file_path}")
            return True

    except requests.exceptions.HTTPError as e:
        logger.error(f"HTTP Error fetching XMLTV from {source.name}: {e}", exc_info=True)

        # A requests.Response evaluates as False for 4xx/5xx statuses, so the
        # response must be compared against None rather than tested for truth.
        error_response = getattr(e, 'response', None)
        status_code = error_response.status_code if error_response is not None else 'unknown'
        error_message = str(e)

        # Create a user-friendly message
        user_message = f"EPG source '{source.name}' encountered HTTP error {status_code}"

        # Add specific handling for common HTTP errors
        if status_code == 404:
            user_message = f"EPG source '{source.name}' URL not found (404) - will retry on next scheduled run"
        elif status_code == 401 or status_code == 403:
            user_message = f"EPG source '{source.name}' access denied (HTTP {status_code}) - check credentials"
        elif status_code == 429:
            user_message = f"EPG source '{source.name}' rate limited (429) - try again later"
        elif isinstance(status_code, int) and status_code >= 500:
            # The isinstance guard keeps the 'unknown' placeholder out of a numeric comparison.
            user_message = f"EPG source '{source.name}' server error (HTTP {status_code}) - will retry later"

        _report_fetch_failure(source, user_message, error_code=status_code, details=error_message)
        return False

    except requests.exceptions.ConnectionError as e:
        # Handle connection errors separately
        user_message = f"Connection error: Unable to connect to EPG source '{source.name}'"
        logger.error(f"Connection error fetching XMLTV from {source.name}: {e}", exc_info=True)

        _report_fetch_failure(source, user_message, error_code="connection_error")
        return False

    except requests.exceptions.Timeout as e:
        # Handle timeout errors specifically
        user_message = f"Timeout error: EPG source '{source.name}' took too long to respond"
        logger.error(f"Timeout error fetching XMLTV from {source.name}: {e}", exc_info=True)

        _report_fetch_failure(source, user_message)
        return False

    except Exception as e:
        error_message = str(e)
        logger.error(f"Error fetching XMLTV from {source.name}: {e}", exc_info=True)

        # Update source status for general exceptions too
        _report_fetch_failure(source, f"Error: {error_message}")
        return False
|
|
|
|
|
|
def _copy_in_chunks(src, dst):
    """Copy binary stream ``src`` into ``dst``, MAX_EXTRACT_CHUNK_SIZE bytes at a time."""
    for chunk in iter(lambda: src.read(MAX_EXTRACT_CHUNK_SIZE), b''):
        dst.write(chunk)


def _remove_original_archive(file_path):
    """Delete the compressed original after extraction; a failure is only logged."""
    try:
        os.remove(file_path)
        logger.info(f"Deleted original compressed file: {file_path}")
    except Exception as e:
        logger.warning(f"Failed to delete original compressed file {file_path}: {e}")


def extract_compressed_file(file_path, output_path=None, delete_original=False):
    """
    Extracts a compressed file (.gz or .zip) to an XML file.

    The archive type is determined from the file's content via
    ``detect_file_format`` rather than from its extension. For ZIP archives
    the first member whose name ends in ``.xml`` is extracted; when there is
    none, each member's first 4KB is inspected and the first one detected as
    XML is used.

    Args:
        file_path: Path to the compressed file
        output_path: Specific path where the file should be extracted (optional).
            Defaults to ``file_path`` with its extension replaced by ``.xml``.
        delete_original: Whether to delete the original compressed file after successful extraction

    Returns:
        Path to the extracted XML file, or None if extraction failed
        (unsupported format, no XML member in a ZIP, or any error; errors
        are logged, not raised).
    """
    try:
        if output_path is None:
            extracted_path = f"{os.path.splitext(file_path)[0]}.xml"
        else:
            extracted_path = output_path

        # Make sure the output path doesn't already exist
        if os.path.exists(extracted_path):
            try:
                os.remove(extracted_path)
                logger.info(f"Removed existing extracted file: {extracted_path}")
            except Exception as e:
                logger.warning(f"Failed to remove existing extracted file: {e}")
                # If the stale file can't be removed and the caller did not ask
                # for a specific output, fall back to a unique filename.
                if output_path is None:
                    base_path = os.path.splitext(file_path)[0]
                    extracted_path = f"{base_path}_{uuid.uuid4().hex[:8]}.xml"

        # Detect the archive format from content instead of relying on extension
        with open(file_path, 'rb') as f:
            content_sample = f.read(4096)  # Read a larger sample to ensure accurate detection

        format_type, _, _ = detect_file_format(file_path=file_path, content=content_sample)

        if format_type == 'gzip':
            logger.debug(f"Extracting gzip file: {file_path}")
            try:
                with gzip.open(file_path, 'rb') as gz_file:
                    # Sample the decompressed content to check that it looks like XML
                    inner_sample = gz_file.read(4096)
                    detected_format, _, _ = detect_file_format(content=inner_sample)

                    if detected_format != 'xml':
                        # Continue anyway since GZIP only contains one file
                        logger.warning(f"GZIP file does not appear to contain XML content: {file_path} (detected as: {detected_format})")

                    # Rewind past the sample and extract the whole content
                    gz_file.seek(0)
                    with open(extracted_path, 'wb') as out_file:
                        _copy_in_chunks(gz_file, out_file)
            except Exception as e:
                logger.error(f"Error extracting GZIP file: {e}", exc_info=True)
                return None

            logger.info(f"Successfully extracted gzip file to: {extracted_path}")

            if delete_original:
                _remove_original_archive(file_path)

            return extracted_path

        if format_type == 'zip':
            logger.debug(f"Extracting zip file: {file_path}")
            with zipfile.ZipFile(file_path, 'r') as zip_file:
                # Find the first XML file in the ZIP archive
                xml_files = [name for name in zip_file.namelist() if name.lower().endswith('.xml')]

                if not xml_files:
                    logger.info("No files with .xml extension found in ZIP archive, checking content of all files")
                    # Look for XML content stored under a name without the .xml extension
                    for filename in zip_file.namelist():
                        if filename.endswith('/'):  # Skip directories
                            continue
                        try:
                            # ZipFile.read() has no size argument (its second
                            # parameter is the password), so open the member
                            # and read just the first 4KB for detection.
                            with zip_file.open(filename) as member:
                                member_sample = member.read(4096)
                            member_format, _, _ = detect_file_format(content=member_sample)
                            if member_format == 'xml':
                                logger.info(f"Found XML content in file without .xml extension: {filename}")
                                xml_files = [filename]
                                break
                        except Exception as e:
                            logger.warning(f"Error reading file {filename} from ZIP: {e}")

                if not xml_files:
                    logger.error("No XML file found in ZIP archive")
                    return None

                # Extract the first XML file
                with open(extracted_path, 'wb') as out_file:
                    with zip_file.open(xml_files[0], "r") as xml_file:
                        _copy_in_chunks(xml_file, out_file)

            logger.info(f"Successfully extracted zip file to: {extracted_path}")

            if delete_original:
                _remove_original_archive(file_path)

            return extracted_path

        logger.error(f"Unsupported or unrecognized compressed file format: {file_path} (detected as: {format_type})")
        return None

    except Exception as e:
        logger.error(f"Error extracting {file_path}: {str(e)}", exc_info=True)
        return None
|
|
|
|
|
|
def parse_channels_only(source):
    """Parse the ``<channel>`` elements of a source's XMLTV file into EPGData rows.

    The file path is taken from ``source.extracted_file_path``, then
    ``source.file_path``, then ``source.get_cache_file()``. If the file is
    missing and the source has a URL, ``fetch_xmltv(source)`` is called before
    parsing. Channels whose ``tvg_id`` is not yet stored for this source are
    bulk-created; stored ones are bulk-updated when their ``name`` or
    ``icon_url`` differs. Progress and the final outcome are reported through
    ``send_epg_update`` and written to ``source.status`` /
    ``source.last_message``.

    Args:
        source: The EPG source model instance to parse channels for.

    Returns:
        bool: ``True`` when parsing finished, ``False`` on any failure.
    """
    # Use extracted file if available, otherwise use the original file path
    file_path = source.extracted_file_path if source.extracted_file_path else source.file_path
    if not file_path:
        file_path = source.get_cache_file()

    # Send initial parsing notification
    send_epg_update(source.id, "parsing_channels", 0)

    process = None
    should_log_memory = False

    # Bound up front so the cleanup in ``finally`` is valid on every exit path,
    # including the early returns taken before parsing starts.
    source_file = None
    channel_parser = None
    existing_tvg_ids = set()
    existing_epgs = {}  # tvg_id -> EPGData, filled lazily while parsing
    epgs_to_create = []
    epgs_to_update = []

    try:
        # Check if the file exists
        if not os.path.exists(file_path):
            logger.error(f"EPG file does not exist at path: {file_path}")

            # Update the source's file_path to the default cache location
            new_path = source.get_cache_file()
            logger.info(f"Updating file_path from '{file_path}' to '{new_path}'")
            source.file_path = new_path
            source.save(update_fields=['file_path'])

            # If the source has a URL, fetch the data before continuing
            if source.url:
                logger.info(f"Fetching new EPG data from URL: {source.url}")
                fetch_success = fetch_xmltv(source)

                # Only proceed if fetch was successful AND file exists
                if not fetch_success:
                    logger.error(f"Failed to fetch EPG data from URL: {source.url}")
                    source.status = 'error'
                    source.last_message = "Failed to fetch EPG data from URL"
                    source.save(update_fields=['status', 'last_message'])
                    send_epg_update(source.id, "parsing_channels", 100, status="error", error="Failed to fetch EPG data")
                    return False

                # Verify the file was downloaded successfully
                if not os.path.exists(source.file_path):
                    logger.error(f"Failed to fetch EPG data, file still missing at: {source.file_path}")
                    source.status = 'error'
                    source.last_message = "Failed to fetch EPG data, file missing after download"
                    source.save(update_fields=['status', 'last_message'])
                    send_epg_update(source.id, "parsing_channels", 100, status="error", error="File not found after download")
                    return False

                # Update file_path with the new location
                file_path = source.file_path
            else:
                logger.error(f"No URL provided for EPG source {source.name}, cannot fetch new data")
                source.status = 'error'
                source.last_message = "No URL provided, cannot fetch EPG data"
                # Persist the fields that were actually modified, then stop:
                # there is no file to parse and nothing to download.
                source.save(update_fields=['status', 'last_message'])
                send_epg_update(source.id, "parsing_channels", 100, status="error", error="No URL provided")
                return False

        # Initialize process variable for memory tracking only in debug mode
        try:
            process = None
            current_log_level = logger.getEffectiveLevel()

            # Only track memory usage when log level is DEBUG (10) or more verbose
            should_log_memory = current_log_level <= logging.DEBUG or settings.DEBUG

            if should_log_memory:
                process = psutil.Process()
                initial_memory = process.memory_info().rss / 1024 / 1024
                logger.debug(f"[parse_channels_only] Initial memory usage: {initial_memory:.2f} MB")
        except (ImportError, NameError):
            process = None
            should_log_memory = False
            logger.warning("psutil not available for memory tracking")

        # Collect the tvg_ids already stored for this source, paging by primary
        # key. The cursor comes from the same (source-scoped, id-ordered) query
        # so rows of other sources can never move it.
        last_id = 0
        chunk_size = 5000

        while True:
            id_chunk = list(
                EPGData.objects.filter(epg_source=source, id__gt=last_id)
                .order_by('id')
                .values_list('id', 'tvg_id')[:chunk_size]
            )
            if not id_chunk:
                break

            existing_tvg_ids.update(stored_tvg_id for _, stored_tvg_id in id_chunk)
            last_id = id_chunk[-1][0]

        # Update progress to show file read starting
        send_epg_update(source.id, "parsing_channels", 10)

        total_channels = 0
        processed_channels = 0
        batch_size = 500  # Process in batches to limit memory usage
        progress = 0
        icon_url_max_length = EPGData._meta.get_field('icon_url').max_length

        # Track memory at key points
        if process:
            logger.debug(f"[parse_channels_only] Memory before opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")

        try:
            # Attempt to count existing channels in the database
            try:
                total_channels = EPGData.objects.filter(epg_source=source).count()
                logger.info(f"Found {total_channels} existing channels for this source")
            except Exception as e:
                logger.error(f"Error counting channels: {e}")
                total_channels = 500  # Default estimate
            if process:
                logger.debug(f"[parse_channels_only] Memory after closing initial file: {process.memory_info().rss / 1024 / 1024:.2f} MB")

            # Update progress after counting
            send_epg_update(source.id, "parsing_channels", 25, total_channels=total_channels)

            # Open the file - no need to check file type since it's always XML now
            logger.debug(f"Opening file for channel parsing: {file_path}")
            source_file = open(file_path, 'rb')

            if process:
                logger.debug(f"[parse_channels_only] Memory after opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")

            # iterparse yields both channel and programme elements so that
            # programme elements can be cleared as they stream past.
            logger.debug(f"Creating iterparse context for channels and programmes")
            channel_parser = etree.iterparse(source_file, events=('end',), tag=('channel', 'programme'), remove_blank_text=True, recover=True)
            if process:
                logger.debug(f"[parse_channels_only] Memory after creating iterparse: {process.memory_info().rss / 1024 / 1024:.2f} MB")

            channel_count = 0
            total_elements_processed = 0  # Track total elements processed, not just channels
            for _, elem in channel_parser:
                total_elements_processed += 1

                if elem.tag != 'channel':
                    logger.trace(f"[parse_channels_only] Skipping non-channel element: {elem.get('channel', 'unknown')} - {elem.get('start', 'unknown')} {elem.tag}")
                    clear_element(elem)
                    continue

                channel_count += 1
                tvg_id = elem.get('id', '').strip()
                if tvg_id:
                    display_name = None
                    icon_url = None
                    for child in elem:
                        if display_name is None and child.tag == 'display-name' and child.text:
                            display_name = child.text.strip()
                        elif child.tag == 'icon':
                            raw_icon_url = child.get('src', '').strip()
                            icon_url = validate_icon_url_fast(raw_icon_url, icon_url_max_length)
                        if display_name and icon_url:
                            break  # No need to continue if we have both

                    if not display_name:
                        display_name = tvg_id

                    # Use lazy loading approach to reduce memory usage
                    if tvg_id in existing_tvg_ids:
                        # Only fetch the object if it hasn't been loaded yet
                        if tvg_id not in existing_epgs:
                            try:
                                existing_epgs[tvg_id] = EPGData.objects.get(tvg_id=tvg_id, epg_source=source)
                            except EPGData.DoesNotExist:
                                # Handle race condition where record was deleted
                                existing_tvg_ids.remove(tvg_id)
                                epgs_to_create.append(EPGData(
                                    tvg_id=tvg_id,
                                    name=display_name,
                                    icon_url=icon_url,
                                    epg_source=source,
                                ))
                                logger.debug(f"[parse_channels_only] Added new channel to epgs_to_create 1: {tvg_id} - {display_name}")
                                processed_channels += 1
                                continue

                        # Compare the cached object against the parsed values
                        epg_obj = existing_epgs[tvg_id]
                        needs_update = False
                        if epg_obj.name != display_name:
                            epg_obj.name = display_name
                            needs_update = True
                        if epg_obj.icon_url != icon_url:
                            epg_obj.icon_url = icon_url
                            needs_update = True

                        if needs_update:
                            epgs_to_update.append(epg_obj)
                            logger.debug(f"[parse_channels_only] Added channel to update to epgs_to_update: {tvg_id} - {display_name}")
                        else:
                            logger.debug(f"[parse_channels_only] No changes needed for channel {tvg_id} - {display_name}")
                    else:
                        # This is a new channel that doesn't exist in our database
                        epgs_to_create.append(EPGData(
                            tvg_id=tvg_id,
                            name=display_name,
                            icon_url=icon_url,
                            epg_source=source,
                        ))
                        logger.debug(f"[parse_channels_only] Added new channel to epgs_to_create 2: {tvg_id} - {display_name}")

                    processed_channels += 1

                    # Batch processing
                    if len(epgs_to_create) >= batch_size:
                        logger.info(f"[parse_channels_only] Bulk creating {len(epgs_to_create)} EPG entries")
                        EPGData.objects.bulk_create(epgs_to_create, ignore_conflicts=True)
                        if process:
                            logger.info(f"[parse_channels_only] Memory after bulk_create: {process.memory_info().rss / 1024 / 1024:.2f} MB")
                        epgs_to_create = []
                        cleanup_memory(log_usage=should_log_memory, force_collection=True)
                        if process:
                            logger.info(f"[parse_channels_only] Memory after gc.collect(): {process.memory_info().rss / 1024 / 1024:.2f} MB")

                    if len(epgs_to_update) >= batch_size:
                        logger.info(f"[parse_channels_only] Bulk updating {len(epgs_to_update)} EPG entries")
                        if process:
                            logger.info(f"[parse_channels_only] Memory before bulk_update: {process.memory_info().rss / 1024 / 1024:.2f} MB")
                        EPGData.objects.bulk_update(epgs_to_update, ["name", "icon_url"])
                        if process:
                            logger.info(f"[parse_channels_only] Memory after bulk_update: {process.memory_info().rss / 1024 / 1024:.2f} MB")
                        epgs_to_update = []
                        cleanup_memory(log_usage=should_log_memory, force_collection=True)

                    # Periodically clear the existing_epgs cache to prevent memory buildup
                    if processed_channels % 1000 == 0:
                        logger.info(f"[parse_channels_only] Clearing existing_epgs cache at {processed_channels} channels")
                        existing_epgs.clear()
                        cleanup_memory(log_usage=should_log_memory, force_collection=True)
                        if process:
                            logger.info(f"[parse_channels_only] Memory after clearing cache: {process.memory_info().rss / 1024 / 1024:.2f} MB")

                    # Send progress updates
                    if processed_channels % 100 == 0 or processed_channels == total_channels:
                        if total_channels > 0:
                            # The file may hold more channels than the DB count used
                            # as the estimate, so cap this phase at 90%.
                            progress = min(90, 25 + int((processed_channels / total_channels) * 65))
                        else:
                            progress = 90
                        send_epg_update(
                            source.id,
                            "parsing_channels",
                            progress,
                            processed=processed_channels,
                            total=total_channels
                        )
                    if processed_channels > total_channels:
                        logger.debug(f"[parse_channels_only] Processed channel {tvg_id} - processed {processed_channels - total_channels} additional channels")
                    else:
                        logger.debug(f"[parse_channels_only] Processed channel {tvg_id} - processed {processed_channels}/{total_channels}")

                if process:
                    logger.debug(f"[parse_channels_only] Memory before elem cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB")
                # Release the parsed element; cleanup errors must not stop processing
                try:
                    clear_element(elem)
                except Exception as e:
                    logger.debug(f"[parse_channels_only] Non-critical error during XML element cleanup: {e}")
                if process:
                    logger.debug(f"[parse_channels_only] Memory after elem cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB")

                logger.debug(f"[parse_channels_only] Total elements processed: {total_elements_processed}")

        except Exception as xml_error:
            # Covers etree.XMLSyntaxError as well as I/O and database errors
            # raised while streaming the file.
            logger.error(f"[parse_channels_only] XML parsing failed: {xml_error}")
            source.status = 'error'
            source.last_message = f"Error parsing XML file: {str(xml_error)}"
            source.save(update_fields=['status', 'last_message'])
            send_epg_update(source.id, "parsing_channels", 100, status="error", error=str(xml_error))
            return False

        if process:
            logger.info(f"[parse_channels_only] Processed {processed_channels} channels current memory: {process.memory_info().rss / 1024 / 1024:.2f} MB")
        else:
            logger.info(f"[parse_channels_only] Processed {processed_channels} channels")

        # Process any remaining items
        if epgs_to_create:
            EPGData.objects.bulk_create(epgs_to_create, ignore_conflicts=True)
            logger.debug(f"[parse_channels_only] Created final batch of {len(epgs_to_create)} EPG entries")

        if epgs_to_update:
            EPGData.objects.bulk_update(epgs_to_update, ["name", "icon_url"])
            logger.debug(f"[parse_channels_only] Updated final batch of {len(epgs_to_update)} EPG entries")
        if process:
            logger.debug(f"[parse_channels_only] Memory after final batch creation: {process.memory_info().rss / 1024 / 1024:.2f} MB")

        # Update source status with channel count
        source.status = 'success'
        source.last_message = f"Successfully parsed {processed_channels} channels"
        source.save(update_fields=['status', 'last_message'])

        # Send completion notification
        send_epg_update(
            source.id,
            "parsing_channels",
            100,
            status="success",
            channels_count=processed_channels
        )

        send_websocket_update('updates', 'update', {"success": True, "type": "epg_channels"})

        logger.info(f"Finished parsing channel info. Found {processed_channels} channels.")

        return True

    except FileNotFoundError:
        logger.error(f"EPG file not found at: {file_path}")
        source.status = 'error'
        source.last_message = f"EPG file not found: {file_path}"
        source.save(update_fields=['status', 'last_message'])
        send_epg_update(source.id, "parsing_channels", 100, status="error", error="File not found")
        return False
    except Exception as e:
        logger.error(f"Error reading EPG file {file_path}: {e}", exc_info=True)
        source.status = 'error'
        source.last_message = f"Error parsing EPG file: {str(e)}"
        source.save(update_fields=['status', 'last_message'])
        send_epg_update(source.id, "parsing_channels", 100, status="error", error=str(e))
        return False
    finally:
        # Cleanup memory and close file
        if process:
            logger.debug(f"[parse_channels_only] Memory before cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB")
        try:
            # Output any errors in the channel_parser error log
            if channel_parser is not None:
                if hasattr(channel_parser, 'error_log') and len(channel_parser.error_log) > 0:
                    logger.debug(f"XML parser errors found ({len(channel_parser.error_log)} total):")
                    for i, error in enumerate(channel_parser.error_log):
                        logger.debug(f" Error {i+1}: {error}")
                channel_parser = None
            elem = None  # drop the last parsed element

            if source_file is not None:
                source_file.close()
                source_file = None

            # Clear remaining large data structures
            existing_epgs.clear()
            epgs_to_create.clear()
            epgs_to_update.clear()
            existing_epgs = None
            epgs_to_create = None
            epgs_to_update = None
            cleanup_memory(log_usage=should_log_memory, force_collection=True)
        except Exception as e:
            logger.warning(f"Cleanup error: {e}")

        try:
            if process:
                final_memory = process.memory_info().rss / 1024 / 1024
                logger.debug(f"[parse_channels_only] Final memory usage: {final_memory:.2f} MB")
            process = None
        except Exception:
            # Memory reporting is best-effort only.
            pass
|
|
|
|
|
|
|
|
@shared_task
def parse_programs_for_tvg_id(epg_id):
    """Rebuild the ProgramData rows of one EPGData entry from its source's XMLTV file.

    A task lock keyed on ``('parse_epg_programs', epg_id)`` guards the run. The
    lock is released exactly once, in the outer ``finally``, on every path that
    acquired it. Existing programs of the entry are deleted, then every
    ``<programme>`` element whose ``channel`` attribute equals the entry's
    ``tvg_id`` is stored in batches. If the file is missing,
    ``fetch_xmltv(epg_source)`` is called first.

    Args:
        epg_id: Primary key of the EPGData entry to refresh.

    Returns:
        ``"Task already running"`` when the lock could not be acquired,
        otherwise ``None``.

    Raises:
        Exception: Errors raised while opening or streaming the XML file are
            logged and re-raised, as is ``EPGData.DoesNotExist`` for an unknown
            ``epg_id``.
    """
    if not acquire_task_lock('parse_epg_programs', epg_id):
        logger.info(f"Program parse for {epg_id} already in progress, skipping duplicate task")
        return "Task already running"

    # Bound before the try so the ``finally`` block can always reference them.
    process = None
    should_log_memory = False
    mem_before = 0
    epg = None
    program_parser = None
    programs_to_create = []
    programs_processed = 0
    try:
        # Add memory tracking only in trace mode or higher
        try:
            current_log_level = logger.getEffectiveLevel()

            # Only track memory usage when log level is TRACE or more verbose or if running in DEBUG mode
            should_log_memory = current_log_level <= 5 or settings.DEBUG

            if should_log_memory:
                process = psutil.Process()
                initial_memory = process.memory_info().rss / 1024 / 1024
                logger.info(f"[parse_programs_for_tvg_id] Initial memory usage: {initial_memory:.2f} MB")
                mem_before = initial_memory
        except (ImportError, NameError):
            # Same guard as parse_channels_only: psutil may be unavailable.
            process = None
            should_log_memory = False

        epg = EPGData.objects.get(id=epg_id)
        epg_source = epg.epg_source

        if not Channel.objects.filter(epg_data=epg).exists():
            logger.info(f"No channels matched to EPG {epg.tvg_id}")
            return

        logger.info(f"Refreshing program data for tvg_id: {epg.tvg_id}")

        # A single delete query instead of chunking
        ProgramData.objects.filter(epg=epg).delete()

        file_path = epg_source.extracted_file_path if epg_source.extracted_file_path else epg_source.file_path
        if not file_path:
            file_path = epg_source.get_cache_file()

        # Check if the file exists
        if not os.path.exists(file_path):
            logger.error(f"EPG file not found at: {file_path}")

            if epg_source.url:
                # Update the file path in the database
                new_path = epg_source.get_cache_file()
                logger.info(f"Updating file_path from '{file_path}' to '{new_path}'")
                epg_source.file_path = new_path
                epg_source.save(update_fields=['file_path'])
                logger.info(f"Fetching new EPG data from URL: {epg_source.url}")
            else:
                logger.info(f"EPG source does not have a URL, using existing file path: {file_path} to rebuild cache")

            # Fetch new data before continuing
            if epg_source:
                fetch_success = fetch_xmltv(epg_source)

                # If fetch was not successful, abort
                if not fetch_success:
                    logger.error(f"Failed to fetch EPG data, cannot parse programs for tvg_id: {epg.tvg_id}")
                    epg_source.status = 'error'
                    epg_source.last_message = "Failed to download EPG data, cannot parse programs"
                    epg_source.save(update_fields=['status', 'last_message'])
                    send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="Failed to download EPG file")
                    return

                # Also check if the file exists after download
                if not os.path.exists(epg_source.file_path):
                    logger.error(f"Failed to fetch EPG data, file still missing at: {epg_source.file_path}")
                    epg_source.status = 'error'
                    epg_source.last_message = "Failed to download EPG data, file missing after download"
                    epg_source.save(update_fields=['status', 'last_message'])
                    send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="File not found after download")
                    return

                # Update file_path with the new location
                if epg_source.extracted_file_path:
                    file_path = epg_source.extracted_file_path
                else:
                    file_path = epg_source.file_path
            else:
                # NOTE(review): only reachable if the source object is falsy.
                logger.error(f"No URL provided for EPG source {epg_source.name}, cannot fetch new data")
                epg_source.status = 'error'
                epg_source.last_message = "No URL provided, cannot fetch EPG data"
                epg_source.save(update_fields=['status', 'last_message'])
                send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="No URL provided")
                return

        # Use streaming parsing to reduce memory usage
        logger.debug(f"Parsing programs for tvg_id={epg.tvg_id} from {file_path}")

        # Memory usage tracking
        if process:
            try:
                mem_before = process.memory_info().rss / 1024 / 1024
                logger.debug(f"[parse_programs_for_tvg_id] Memory before parsing {epg.tvg_id} - {mem_before:.2f} MB")
            except Exception as e:
                logger.warning(f"Error tracking memory: {e}")
                mem_before = 0

        programs_to_create = []
        batch_size = 1000  # Process in batches to limit memory usage

        try:
            logger.debug(f"Opening file for parsing: {file_path}")
            # The context manager closes the file on success and on error.
            with open(file_path, 'rb') as source_file:
                # Stream parse the file using lxml's iterparse
                program_parser = etree.iterparse(source_file, events=('end',), tag='programme', remove_blank_text=True, recover=True)

                for _, elem in program_parser:
                    if elem.get('channel') != epg.tvg_id:
                        # Immediately clean up non-matching elements to reduce memory pressure
                        if elem is not None:
                            clear_element(elem)
                        continue

                    try:
                        start_time = parse_xmltv_time(elem.get('start'))
                        end_time = parse_xmltv_time(elem.get('stop'))
                        title = None
                        desc = None
                        sub_title = None

                        for child in elem:
                            if child.tag == 'title':
                                title = child.text or 'No Title'
                            elif child.tag == 'desc':
                                desc = child.text or ''
                            elif child.tag == 'sub-title':
                                sub_title = child.text or ''

                        if not title:
                            title = 'No Title'

                        # Extract custom properties; store None when there are none
                        custom_props = extract_custom_properties(elem)
                        custom_properties_json = None
                        if custom_props:
                            logger.trace(f"Number of custom properties: {len(custom_props)}")
                            custom_properties_json = custom_props

                        programs_to_create.append(ProgramData(
                            epg=epg,
                            start_time=start_time,
                            end_time=end_time,
                            title=title,
                            description=desc,
                            sub_title=sub_title,
                            tvg_id=epg.tvg_id,
                            custom_properties=custom_properties_json
                        ))
                        programs_processed += 1
                        # Clear the element to free memory
                        clear_element(elem)
                        # Batch processing
                        if len(programs_to_create) >= batch_size:
                            ProgramData.objects.bulk_create(programs_to_create)
                            logger.debug(f"Saved batch of {len(programs_to_create)} programs for {epg.tvg_id}")
                            programs_to_create = []
                            # Only call gc.collect() every few batches
                            if programs_processed % (batch_size * 5) == 0:
                                gc.collect()
                    except Exception as e:
                        # A single bad programme must not abort the whole file.
                        logger.error(f"Error processing program for {epg.tvg_id}: {e}", exc_info=True)

            # Release parser resources
            program_parser = None
            gc.collect()

        except zipfile.BadZipFile as zip_error:
            logger.error(f"Bad ZIP file: {zip_error}")
            raise
        except etree.XMLSyntaxError as xml_error:
            logger.error(f"XML syntax error parsing program data: {xml_error}")
            raise
        except Exception as e:
            logger.error(f"Error parsing XML for programs: {e}", exc_info=True)
            raise
        finally:
            # Memory tracking after processing
            if process:
                try:
                    mem_after = process.memory_info().rss / 1024 / 1024
                    logger.info(f"[parse_programs_for_tvg_id] Memory after parsing 1 {epg.tvg_id} - {programs_processed} programs: {mem_after:.2f} MB (change: {mem_after-mem_before:.2f} MB)")
                except Exception as e:
                    logger.warning(f"Error tracking memory: {e}")

        # Process any remaining items
        if programs_to_create:
            ProgramData.objects.bulk_create(programs_to_create)
            logger.debug(f"Saved final batch of {len(programs_to_create)} programs for {epg.tvg_id}")
            programs_to_create = None

        logger.info(f"Completed program parsing for tvg_id={epg.tvg_id}.")
    finally:
        # Reset internal caches and pools that lxml might be keeping
        try:
            etree.clear_error_log()
        except Exception:
            # Best-effort only; never mask the real outcome of the task.
            pass
        # Drop references to potentially large objects
        program_parser = None
        programs_to_create = None
        epg_source = None
        # Add comprehensive cleanup before releasing lock
        cleanup_memory(log_usage=should_log_memory, force_collection=True)
        # Memory tracking after processing
        if process:
            try:
                mem_after = process.memory_info().rss / 1024 / 1024
                logger.info(f"[parse_programs_for_tvg_id] Final memory usage {epg.tvg_id} - {programs_processed} programs: {mem_after:.2f} MB (change: {mem_after-mem_before:.2f} MB)")
            except Exception as e:
                logger.warning(f"Error tracking memory: {e}")
            process = None
        epg = None
        # Single release point for the lock acquired at the top.
        release_task_lock('parse_epg_programs', epg_id)
|
|
|
|
|
|
|
|
def parse_programs_for_source(epg_source, tvg_id=None):
    """Refresh program data for every EPGData entry of ``epg_source``.

    Entries are walked in primary-key order in small batches, and
    ``parse_programs_for_tvg_id`` is called directly for each entry that has a
    ``tvg_id``. Per-entry failures are collected and reported as a warning in
    ``epg_source.last_message``; they do not fail the whole run.

    The counters in the stored status message mean:
      * programs - ProgramData rows stored for this source after the run,
      * channels - entries for which a parse was attempted,
      * Updated  - attempted entries whose parse call returned without raising
        and was not skipped as "Task already running".

    Args:
        epg_source: The EPG source model instance whose entries are refreshed.
        tvg_id: Not used by this function; kept so existing callers that pass
            it keep working.

    Returns:
        bool: ``True`` when the run completed (even with per-entry failures),
        ``False`` when an unexpected error aborted it.
    """
    # Send initial programs parsing notification
    send_epg_update(epg_source.id, "parsing_programs", 0)
    should_log_memory = False
    process = None
    initial_memory = 0

    # Add memory tracking only in trace mode or higher
    try:
        current_log_level = logger.getEffectiveLevel()

        # Only track memory usage when log level is TRACE or more verbose
        should_log_memory = current_log_level <= 5 or settings.DEBUG  # Assuming TRACE is level 5 or lower

        if should_log_memory:
            process = psutil.Process()
            initial_memory = process.memory_info().rss / 1024 / 1024
            logger.info(f"[parse_programs_for_source] Initial memory usage: {initial_memory:.2f} MB")
    except ImportError:
        logger.warning("psutil not available for memory tracking")
        process = None
        should_log_memory = False

    try:
        # Process EPG entries in batches rather than all at once
        batch_size = 20  # Process fewer channels at once to reduce memory usage
        epg_count = EPGData.objects.filter(epg_source=epg_source).count()

        if epg_count == 0:
            logger.info(f"No EPG entries found for source: {epg_source.name}")
            # Update status - this is not an error, just no entries
            epg_source.status = 'success'
            epg_source.save(update_fields=['status'])
            send_epg_update(epg_source.id, "parsing_programs", 100, status="success")
            return True

        logger.info(f"Parsing programs for {epg_count} EPG entries from source: {epg_source.name}")

        failed_entries = []
        channel_count = 0
        updated_count = 0
        processed = 0
        # Process in batches using cursor-based approach to limit memory usage
        last_id = 0
        while True:
            # Get a batch of EPG entries
            batch_entries = list(EPGData.objects.filter(
                epg_source=epg_source,
                id__gt=last_id
            ).order_by('id')[:batch_size])

            if not batch_entries:
                break  # No more entries to process

            # Update last_id for next iteration
            last_id = batch_entries[-1].id

            # Process this batch
            for epg in batch_entries:
                if not epg.tvg_id:
                    continue

                channel_count += 1
                try:
                    result = parse_programs_for_tvg_id(epg.id)
                    if result == "Task already running":
                        logger.info(f"Program parse for {epg.id} already in progress, skipping")
                    else:
                        updated_count += 1

                    processed += 1
                    progress = min(95, int((processed / epg_count) * 100)) if epg_count > 0 else 50
                    send_epg_update(epg_source.id, "parsing_programs", progress)
                except Exception as e:
                    logger.error(f"Error parsing programs for tvg_id={epg.tvg_id}: {e}", exc_info=True)
                    failed_entries.append(f"{epg.tvg_id}: {str(e)}")

            # Force garbage collection after each batch
            batch_entries = None  # Remove reference to help garbage collection
            gc.collect()

        # Report what is actually stored for this source after the run.
        program_count = ProgramData.objects.filter(epg__epg_source=epg_source).count()

        # Failures are reported as a warning but the run still counts as success.
        if failed_entries:
            error_summary = f"Failed to parse {len(failed_entries)} of {epg_count} entries"
            stats_summary = f"Processed {program_count} programs across {channel_count} channels. Updated: {updated_count}."
            final_message = f"{stats_summary} Warning: {error_summary}"
        else:
            final_message = f"Successfully processed {program_count} programs across {channel_count} channels. Updated: {updated_count}."

        epg_source.status = EPGSource.STATUS_SUCCESS
        epg_source.last_message = final_message
        epg_source.updated_at = timezone.now()
        epg_source.save(update_fields=['status', 'last_message', 'updated_at'])

        # Send completion notification with status
        send_epg_update(epg_source.id, "parsing_programs", 100,
                        status="success",
                        message=epg_source.last_message)

        if not failed_entries:
            logger.info(f"Completed parsing all programs for source: {epg_source.name}")
        return True

    except Exception as e:
        logger.error(f"Error in parse_programs_for_source: {e}", exc_info=True)
        # Update status to error
        epg_source.status = EPGSource.STATUS_ERROR
        epg_source.last_message = f"Error parsing programs: {str(e)}"
        epg_source.save(update_fields=['status', 'last_message'])
        send_epg_update(epg_source.id, "parsing_programs", 100,
                        status="error",
                        message=epg_source.last_message)
        return False
    finally:
        # Final memory cleanup and tracking
        failed_entries = None
        gc.collect()

        # Add comprehensive memory cleanup at the end
        cleanup_memory(log_usage=should_log_memory, force_collection=True)
        if process:
            final_memory = process.memory_info().rss / 1024 / 1024
            logger.info(f"[parse_programs_for_source] Final memory usage: {final_memory:.2f} MB difference: {final_memory - initial_memory:.2f} MB")
            # Explicitly clear the process object to prevent potential memory leaks
            process = None
|
|
def fetch_schedules_direct(source):
    """
    Download subscriptions and per-station schedules from Schedules Direct
    and persist them as EPGData / ProgramData rows.

    Args:
        source: EPG source object. ``source.name`` is used in log messages
            and ``source.api_key`` is sent as a Bearer token.

    Returns:
        None. Every exception raised while fetching or storing is logged
        and swallowed by the outer handler.
    """
    logger.info(f"Fetching Schedules Direct data from source: {source.name}")
    try:
        # Start from the built-in User-Agent and replace it with the
        # configured default when that setting resolves to a usable record.
        user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:138.0) Gecko/20100101 Firefox/138.0"  # Fallback default
        ua_setting = CoreSettings.objects.filter(key='default-user-agent').first()

        if ua_setting and ua_setting.value:
            try:
                ua_record = UserAgent.objects.filter(id=int(ua_setting.value)).first()
                if ua_record and ua_record.user_agent:
                    user_agent = ua_record.user_agent
                    logger.debug(f"Using default user agent: {user_agent}")
            except Exception as e:
                # A bad setting value must not abort the fetch; keep the fallback.
                logger.warning(f"Error retrieving default user agent, using fallback: {e}")

        # NOTE(review): api_url is empty and schedules_url below has no
        # scheme/host, so these requests presumably cannot succeed as
        # written - confirm the intended Schedules Direct endpoints.
        api_url = ''
        headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {source.api_key}',
            'User-Agent': user_agent
        }
        logger.debug(f"Requesting subscriptions from Schedules Direct using URL: {api_url}")
        response = requests.get(api_url, headers=headers, timeout=30)
        response.raise_for_status()
        subscriptions = response.json()
        logger.debug(f"Fetched subscriptions: {subscriptions}")

        for subscription in subscriptions:
            tvg_id = subscription.get('stationID')
            logger.debug(f"Processing subscription for tvg_id: {tvg_id}")
            schedules_url = f"/schedules/{tvg_id}"
            logger.debug(f"Requesting schedules from URL: {schedules_url}")
            sched_response = requests.get(schedules_url, headers=headers, timeout=30)
            sched_response.raise_for_status()
            schedules = sched_response.json()
            logger.debug(f"Fetched schedules: {schedules}")

            # One EPGData row per station, named after its tvg_id on creation.
            epg_data, epg_created = EPGData.objects.get_or_create(
                tvg_id=tvg_id,
                defaults={'name': tvg_id}
            )
            if epg_created:
                logger.info(f"Created new EPGData for tvg_id '{tvg_id}'.")
            else:
                logger.debug(f"Found existing EPGData for tvg_id '{tvg_id}'.")

            for entry in schedules.get('schedules', []):
                title = entry.get('title', 'No Title')
                desc = entry.get('description', '')
                start_time = parse_schedules_direct_time(entry.get('startTime'))
                end_time = parse_schedules_direct_time(entry.get('endTime'))
                # (epg, start_time, title) identifies a programme; the other
                # fields are refreshed on every run.
                _program, program_created = ProgramData.objects.update_or_create(
                    epg=epg_data,
                    start_time=start_time,
                    title=title,
                    defaults={
                        'end_time': end_time,
                        'description': desc,
                        'sub_title': ''
                    }
                )
                if program_created:
                    logger.info(f"Created ProgramData '{title}' for tvg_id '{tvg_id}'.")
                else:
                    logger.info(f"Updated ProgramData '{title}' for tvg_id '{tvg_id}'.")
    except Exception as e:
        logger.error(f"Error fetching Schedules Direct data from {source.name}: {e}", exc_info=True)
|
|
|
|
|
|
# -------------------------------
|
|
# Helper parse functions
|
|
# -------------------------------
|
|
def parse_xmltv_time(time_str):
    """
    Convert an XMLTV timestamp into a timezone-aware UTC datetime.

    The expected form is ``YYYYMMDDhhmmss`` optionally followed by a numeric
    UTC offset (``+HHMM`` / ``-HHMM``). The offset may be separated from the
    timestamp by any amount of whitespace, or by none at all.

    Args:
        time_str: The raw timestamp string from the XMLTV document.

    Returns:
        datetime: An aware datetime expressed in UTC. When no offset is
        present, or the suffix is not a numeric offset (e.g. a zone name),
        the timestamp is taken to be UTC already.

    Raises:
        Exception: Any parsing error (malformed timestamp, malformed numeric
        offset, non-string input) is logged and re-raised unchanged.
    """
    try:
        # Basic format validation
        if len(time_str) < 14:
            logger.warning(f"XMLTV timestamp too short: '{time_str}', using as-is")
            dt_obj = datetime.strptime(time_str, '%Y%m%d%H%M%S')
            return timezone.make_aware(dt_obj, timezone=dt_timezone.utc)

        # Parse base datetime
        dt_obj = datetime.strptime(time_str[:14], '%Y%m%d%H%M%S')

        # Whatever follows the 14 timestamp digits is the zone suffix. It is
        # located by stripping whitespace rather than by fixed column, so
        # "…120000+0100" and "…120000  +0100" are honoured too.
        tz_part = time_str[14:].strip()

        if not tz_part:
            # No timezone info, assume UTC
            aware_dt = timezone.make_aware(dt_obj, timezone=dt_timezone.utc)
            logger.trace(f"Parsed XMLTV time without timezone '{time_str}' as UTC: {aware_dt}")
            return aware_dt

        sign = tz_part[0]
        if sign in '+-':
            hhmm = tz_part[1:5]
            if len(hhmm) != 4 or not hhmm.isdecimal():
                # A numeric offset that cannot be read must not silently
                # become UTC; that would shift the programme in the guide.
                raise ValueError(f"Invalid XMLTV timezone offset: '{tz_part}'")
            offset = timedelta(hours=int(hhmm[:2]), minutes=int(hhmm[2:]))
            tz_offset = dt_timezone(offset if sign == '+' else -offset)
        else:
            # Non-numeric suffix (zone abbreviation etc.): fall back to UTC.
            tz_offset = dt_timezone.utc

        # Attach the source offset, then normalise to UTC
        aware_dt = dt_obj.replace(tzinfo=tz_offset).astimezone(dt_timezone.utc)

        logger.trace(f"Parsed XMLTV time '{time_str}' to {aware_dt}")
        return aware_dt

    except Exception as e:
        logger.error(f"Error parsing XMLTV time '{time_str}': {e}", exc_info=True)
        raise
|
|
|
|
|
|
def parse_schedules_direct_time(time_str):
    """
    Parse a Schedules Direct timestamp (``YYYY-MM-DDTHH:MM:SSZ``) into an
    aware UTC datetime.

    Args:
        time_str: Timestamp string in the format above.

    Returns:
        datetime: The parsed moment, marked as UTC.

    Raises:
        Exception: Any failure is logged with traceback and re-raised.
    """
    try:
        naive_moment = datetime.strptime(time_str, '%Y-%m-%dT%H:%M:%SZ')
        # The trailing "Z" in the format is a literal, so the UTC zone has
        # to be attached explicitly.
        aware_dt = timezone.make_aware(naive_moment, timezone=dt_timezone.utc)
        logger.debug(f"Parsed Schedules Direct time '{time_str}' to {aware_dt}")
        return aware_dt
    except Exception as e:
        logger.error(f"Error parsing Schedules Direct time '{time_str}': {e}", exc_info=True)
        raise
|
|
|
|
|
|
# Helper function to extract custom properties - moved to a separate function to clean up the code
|
|
def _xmltv_ns_number(component):
    """Return the one-based number held in one xmltv_ns component, or None.

    A component may be written as ``"N"`` or ``"N/total"``; only ``N`` is
    used. xmltv_ns numbers are zero-based, hence the ``+ 1``.
    """
    number = component.split('/', 1)[0].strip()
    if not number:
        return None
    try:
        return int(number) + 1
    except ValueError:
        # Unparseable component: best-effort extraction, just omit it.
        return None


def extract_custom_properties(prog):
    """
    Collect optional metadata from an XMLTV ``<programme>`` element.

    Args:
        prog: Element exposing ``find``, ``findall`` and ``get`` plus
            ``.text`` on its children (ElementTree-style API).

    Returns:
        dict: A fresh dictionary containing only the properties that were
        present. Possible keys: ``categories``, ``keywords``, ``season``,
        ``episode``, ``onscreen_episode``, ``dd_progid``,
        ``thetvdb.com_id`` / ``themoviedb.org_id`` / ``imdb.com_id``,
        ``rating``, ``rating_system``, ``star_ratings``, ``credits``,
        ``date``, ``country``, ``language``, ``original_language``,
        ``length``, ``video``, ``audio``, ``subtitles``, ``reviews``,
        ``images``, ``icon``, the boolean flags ``previously_shown``,
        ``premiere``, ``new``, ``live``, ``last_chance``, and the detail
        keys ``premiere_text``, ``last_chance_text``,
        ``previously_shown_details``.
    """
    # Create a new dictionary for each call
    custom_props = {}

    # Categories and keywords: keep only entries with non-blank text
    categories = [cat.text.strip() for cat in prog.findall('category') if cat.text and cat.text.strip()]
    if categories:
        custom_props['categories'] = categories

    keywords = [kw.text.strip() for kw in prog.findall('keyword') if kw.text and kw.text.strip()]
    if keywords:
        custom_props['keywords'] = keywords

    # Episode numbers, one branch per numbering system
    for ep_num in prog.findall('episode-num'):
        ep_text = ep_num.text
        if not ep_text:
            continue
        system = ep_num.get('system', '')
        if system == 'xmltv_ns':
            # Format is "season.episode.part"; each field may be "N/total".
            parts = ep_text.split('.')
            if len(parts) >= 2:
                season = _xmltv_ns_number(parts[0])
                if season is not None:
                    custom_props['season'] = season
                episode = _xmltv_ns_number(parts[1])
                if episode is not None:
                    custom_props['episode'] = episode
        elif system == 'onscreen':
            # Just store the raw onscreen format
            custom_props['onscreen_episode'] = ep_text.strip()
        elif system == 'dd_progid':
            custom_props['dd_progid'] = ep_text.strip()
        elif system in ('thetvdb.com', 'themoviedb.org', 'imdb.com'):
            custom_props[f'{system}_id'] = ep_text.strip()

    # Rating: the system attribute is only recorded alongside a value
    rating_elem = prog.find('rating')
    if rating_elem is not None:
        value_elem = rating_elem.find('value')
        if value_elem is not None and value_elem.text:
            custom_props['rating'] = value_elem.text.strip()
            if rating_elem.get('system'):
                custom_props['rating_system'] = rating_elem.get('system')

    # Star ratings
    star_ratings = []
    for star_rating in prog.findall('star-rating'):
        value_elem = star_rating.find('value')
        if value_elem is not None and value_elem.text:
            rating_data = {'value': value_elem.text.strip()}
            if star_rating.get('system'):
                rating_data['system'] = star_rating.get('system')
            star_ratings.append(rating_data)
    if star_ratings:
        custom_props['star_ratings'] = star_ratings

    # Credits: actors carry role/guest details, other roles are name lists
    credits_elem = prog.find('credits')
    if credits_elem is not None:
        credits = {}
        for credit_type in ['director', 'actor', 'writer', 'adapter', 'producer', 'composer', 'editor', 'presenter', 'commentator', 'guest']:
            if credit_type == 'actor':
                actors = []
                for actor_elem in credits_elem.findall('actor'):
                    if actor_elem.text and actor_elem.text.strip():
                        actor_data = {'name': actor_elem.text.strip()}
                        if actor_elem.get('role'):
                            actor_data['role'] = actor_elem.get('role')
                        if actor_elem.get('guest') == 'yes':
                            actor_data['guest'] = True
                        actors.append(actor_data)
                if actors:
                    credits['actor'] = actors
            else:
                names = [e.text.strip() for e in credits_elem.findall(credit_type) if e.text and e.text.strip()]
                if names:
                    credits[credit_type] = names
        if credits:
            custom_props['credits'] = credits

    # Simple single-valued text elements
    date_elem = prog.find('date')
    if date_elem is not None and date_elem.text:
        custom_props['date'] = date_elem.text.strip()

    country_elem = prog.find('country')
    if country_elem is not None and country_elem.text:
        custom_props['country'] = country_elem.text.strip()

    language_elem = prog.find('language')
    if language_elem is not None and language_elem.text:
        custom_props['language'] = language_elem.text.strip()

    orig_language_elem = prog.find('orig-language')
    if orig_language_elem is not None and orig_language_elem.text:
        custom_props['original_language'] = orig_language_elem.text.strip()

    # Length: skipped entirely when the value is not an integer
    length_elem = prog.find('length')
    if length_elem is not None and length_elem.text:
        try:
            length_value = int(length_elem.text.strip())
            length_units = length_elem.get('units', 'minutes')
            custom_props['length'] = {'value': length_value, 'units': length_units}
        except ValueError:
            pass  # best-effort: a malformed length is simply not recorded

    # Video information
    video_elem = prog.find('video')
    if video_elem is not None:
        video_info = {}
        for video_attr in ['present', 'colour', 'aspect', 'quality']:
            attr_elem = video_elem.find(video_attr)
            if attr_elem is not None and attr_elem.text:
                video_info[video_attr] = attr_elem.text.strip()
        if video_info:
            custom_props['video'] = video_info

    # Audio information
    audio_elem = prog.find('audio')
    if audio_elem is not None:
        audio_info = {}
        for audio_attr in ['present', 'stereo']:
            attr_elem = audio_elem.find(audio_attr)
            if attr_elem is not None and attr_elem.text:
                audio_info[audio_attr] = attr_elem.text.strip()
        if audio_info:
            custom_props['audio'] = audio_info

    # Subtitles: entries with neither a type nor a language are dropped
    subtitles = []
    for subtitle_elem in prog.findall('subtitles'):
        subtitle_data = {}
        if subtitle_elem.get('type'):
            subtitle_data['type'] = subtitle_elem.get('type')
        lang_elem = subtitle_elem.find('language')
        if lang_elem is not None and lang_elem.text:
            subtitle_data['language'] = lang_elem.text.strip()
        if subtitle_data:
            subtitles.append(subtitle_data)
    if subtitles:
        custom_props['subtitles'] = subtitles

    # Reviews
    reviews = []
    for review_elem in prog.findall('review'):
        if review_elem.text and review_elem.text.strip():
            review_data = {'content': review_elem.text.strip()}
            if review_elem.get('type'):
                review_data['type'] = review_elem.get('type')
            if review_elem.get('source'):
                review_data['source'] = review_elem.get('source')
            if review_elem.get('reviewer'):
                review_data['reviewer'] = review_elem.get('reviewer')
            reviews.append(review_data)
    if reviews:
        custom_props['reviews'] = reviews

    # Images
    images = []
    for image_elem in prog.findall('image'):
        if image_elem.text and image_elem.text.strip():
            image_data = {'url': image_elem.text.strip()}
            for attr in ['type', 'size', 'orient', 'system']:
                if image_elem.get(attr):
                    image_data[attr] = image_elem.get(attr)
            images.append(image_data)
    if images:
        custom_props['images'] = images

    icon_elem = prog.find('icon')
    if icon_elem is not None and icon_elem.get('src'):
        custom_props['icon'] = icon_elem.get('src')

    # Boolean flags: the mere presence of the element sets the flag
    for flag in ['previously-shown', 'premiere', 'new', 'live', 'last-chance']:
        if prog.find(flag) is not None:
            custom_props[flag.replace('-', '_')] = True

    # Optional details for flags already set by the loop above
    premiere_elem = prog.find('premiere')
    if premiere_elem is not None:
        if premiere_elem.text and premiere_elem.text.strip():
            custom_props['premiere_text'] = premiere_elem.text.strip()

    last_chance_elem = prog.find('last-chance')
    if last_chance_elem is not None:
        if last_chance_elem.text and last_chance_elem.text.strip():
            custom_props['last_chance_text'] = last_chance_elem.text.strip()

    prev_shown_elem = prog.find('previously-shown')
    if prev_shown_elem is not None:
        prev_shown_data = {}
        if prev_shown_elem.get('start'):
            prev_shown_data['start'] = prev_shown_elem.get('start')
        if prev_shown_elem.get('channel'):
            prev_shown_data['channel'] = prev_shown_elem.get('channel')
        if prev_shown_data:
            custom_props['previously_shown_details'] = prev_shown_data

    return custom_props
|
|
|
|
|
|
def clear_element(elem):
    """
    Free the memory held by an already-processed XML element.

    The element is cleared, every sibling that precedes it is deleted from
    the parent, and finally the element itself is detached. Uses
    ``getparent()`` / ``getprevious()``, so the element is presumably an
    lxml element - confirm against the caller.

    Args:
        elem: The element to discard.

    Returns:
        None. Failures are logged as warnings and never propagated.
    """
    try:
        elem.clear()
        container = elem.getparent()
        if container is None:
            # Root (or detached) element: nothing to unlink from.
            return
        # Earlier siblings sit at the front of the parent, so repeatedly
        # dropping index 0 removes them until elem is the first child.
        while elem.getprevious() is not None:
            del container[0]
        container.remove(elem)
    except Exception as e:
        logger.warning(f"Error clearing XML element: {e}", exc_info=True)
|
|
|
|
|
|
def detect_file_format(file_path=None, content=None):
    """
    Detect file format by examining content or file path.

    Detection order: magic numbers in ``content`` first, then the final
    extension of ``file_path``, then the MIME type guessed from the path.

    Args:
        file_path: Path to file as a string (optional)
        content: Raw file content bytes (optional); only the first 20 bytes
            are inspected

    Returns:
        tuple: (format_type, is_compressed, file_extension)
            format_type: 'gzip', 'zip', 'xml', or 'unknown'
            is_compressed: Boolean indicating if the file is compressed
            file_extension: Appropriate file extension including dot
                (.gz, .zip, .xml), or '.tmp' when the format is unknown
    """
    # First priority: check content magic numbers as they're most reliable
    if content:
        # We only need the first few bytes for magic number detection
        header = content[:20]

        # Check for gzip magic number (1f 8b)
        if header[:2] == b'\x1f\x8b':
            return 'gzip', True, '.gz'

        # Check for zip magic number (PK..)
        if header[:2] == b'PK':
            return 'zip', True, '.zip'

        # Check for XML: the standard declaration, or an XMLTV root tag.
        # The root usually carries attributes (<tv generator-info-name=...>),
        # so "<tv" followed by whitespace is accepted as well as "<tv>".
        xml_markers = (b'<?xml', b'<tv>', b'<tv ', b'<tv\t', b'<tv\r', b'<tv\n')
        if len(header) >= 5 and any(marker in header for marker in xml_markers):
            return 'xml', False, '.xml'

    # Second priority: check file extension - focus on the final extension for compression
    if file_path:
        logger.debug(f"Detecting file format for: {file_path}")

        # Handle compound extensions like .xml.gz - prioritize compression extensions
        lower_path = file_path.lower()

        if lower_path.endswith(('.gz', '.gzip')):
            return 'gzip', True, '.gz'
        if lower_path.endswith('.zip'):
            return 'zip', True, '.zip'
        if lower_path.endswith('.xml'):
            return 'xml', False, '.xml'

        # Fallback to mimetypes only if direct extension check doesn't work
        import mimetypes
        mime_type, _ = mimetypes.guess_type(file_path)
        logger.debug(f"Guessed MIME type: {mime_type}")
        if mime_type:
            if mime_type in ('application/gzip', 'application/x-gzip'):
                return 'gzip', True, '.gz'
            if mime_type == 'application/zip':
                return 'zip', True, '.zip'
            if mime_type in ('application/xml', 'text/xml'):
                return 'xml', False, '.xml'

    # If we reach here, we couldn't reliably determine the format
    return 'unknown', False, '.tmp'
|
|
|
|
|
|
def generate_dummy_epg(source):
    """
    DEPRECATED no-op kept only for backward compatibility.

    Dummy EPG programs are no longer pre-generated and stored in the
    database; they are produced on demand when requested (during XMLTV
    export or EPG grid display).

    See: apps/output/views.py - generate_custom_dummy_programs()

    Args:
        source: EPG source; only ``source.name`` is read, for the warning.

    Returns:
        bool: Always True.
    """
    deprecation_notice = (
        f"generate_dummy_epg() called for {source.name} but this function is deprecated. "
        f"Dummy EPG programs are now generated on-demand."
    )
    logger.warning(deprecation_notice)
    return True
|