mirror of https://github.com/Dispatcharr/Dispatcharr.git
Add extracted_file_path to EPGSource model and update extraction logic
commit 8f4e05b0b8 (parent 182a009d69)
4 changed files with 140 additions and 139 deletions
@@ -1,4 +1,4 @@
-# Generated by Django 5.1.6 on 2025-05-25 23:00
+# Generated by Django 5.1.6 on 2025-05-26 15:48
 
 from django.db import migrations, models
 
@@ -12,7 +12,7 @@ class Migration(migrations.Migration):
     operations = [
         migrations.AddField(
             model_name='epgsource',
-            name='original_file_path',
-            field=models.CharField(blank=True, help_text='Original path to compressed file before extraction', max_length=1024, null=True),
+            name='extracted_file_path',
+            field=models.CharField(blank=True, help_text='Path to extracted XML file after decompression', max_length=1024, null=True),
         ),
     ]
@@ -32,8 +32,8 @@ class EPGSource(models.Model):
     api_key = models.CharField(max_length=255, blank=True, null=True)  # For Schedules Direct
     is_active = models.BooleanField(default=True)
     file_path = models.CharField(max_length=1024, blank=True, null=True)
-    original_file_path = models.CharField(max_length=1024, blank=True, null=True,
-                                          help_text="Original path to compressed file before extraction")
+    extracted_file_path = models.CharField(max_length=1024, blank=True, null=True,
+                                           help_text="Path to extracted XML file after decompression")
     refresh_interval = models.IntegerField(default=0)
     refresh_task = models.ForeignKey(
         PeriodicTask, on_delete=models.SET_NULL, null=True, blank=True
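
Note: downstream code in this commit repeatedly prefers the extracted XML and
falls back to the raw download (see parse_channels_only and
parse_programs_for_tvg_id below). A minimal sketch of how that rule could be
captured once on the model; the effective_file_path property is illustrative
only, not part of this commit:

    class EPGSource(models.Model):
        # ... fields as in the hunk above ...

        @property
        def effective_file_path(self):
            # Prefer the decompressed XML when extraction succeeded,
            # otherwise fall back to the original download.
            return self.extracted_file_path or self.file_path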
@@ -212,11 +212,21 @@ def fetch_xmltv(source):
         # Check if the existing file is compressed and we need to extract it
         if source.file_path.endswith(('.gz', '.zip')) and not source.file_path.endswith('.xml'):
             try:
-                extracted_path = extract_compressed_file(source.file_path)
+                # Define the path for the extracted file in the cache directory
+                cache_dir = os.path.join(settings.MEDIA_ROOT, "cached_epg")
+                os.makedirs(cache_dir, exist_ok=True)
+                xml_path = os.path.join(cache_dir, f"{source.id}.xml")
+
+                # Extract to the cache location keeping the original
+                extracted_path = extract_compressed_file(source.file_path, xml_path, delete_original=False)
+
                 if extracted_path:
-                    logger.info(f"Extracted existing compressed file to: {extracted_path}")
-                    source.file_path = extracted_path
-                    source.save(update_fields=['file_path'])
+                    logger.info(f"Extracted mapped compressed file to: {extracted_path}")
+                    # Update to use extracted_file_path instead of changing file_path
+                    source.extracted_file_path = extracted_path
+                    source.save(update_fields=['extracted_file_path'])
                 else:
                     logger.error(f"Failed to extract mapped compressed file. Using original file: {source.file_path}")
             except Exception as e:
                 logger.error(f"Failed to extract existing compressed file: {e}")
                 # Continue with the original file if extraction fails
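
Note: the new call matches the helper's signature shown further down,
extract_compressed_file(file_path, output_path=None, delete_original=False).
A short sketch of the call semantics, using hypothetical paths:

    # Decompress into the cache; the source archive stays on disk
    extracted = extract_compressed_file("/data/epgs/guide.xml.gz",
                                        "/media/cached_epg/42.xml",
                                        delete_original=False)
    if extracted:  # returns the output path on success, None on failure
        source.extracted_file_path = extracted
        source.save(update_fields=['extracted_file_path'])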
@@ -331,13 +341,9 @@ def fetch_xmltv(source):
         # Define base paths for consistent file naming
         cache_dir = os.path.join(settings.MEDIA_ROOT, "cached_epg")
         os.makedirs(cache_dir, exist_ok=True)
 
         # Create temporary download file with .tmp extension
         temp_download_path = os.path.join(cache_dir, f"{source.id}.tmp")
 
-        # Define final paths based on content type
-        compressed_path = os.path.join(cache_dir, f"{source.id}.compressed");
-        xml_path = os.path.join(cache_dir, f"{source.id}.xml");
-
         # Check if we have content length for progress tracking
         total_size = int(response.headers.get('content-length', 0))
@@ -388,16 +394,21 @@ def fetch_xmltv(source):
         # Send completion notification
         send_epg_update(source.id, "downloading", 100)
 
-        # Determine the appropriate file extension based on content type or URL
-        content_type = response.headers.get('Content-Type', '').lower()
-        original_url = source.url.lower()
+        # Determine the appropriate file extension based on content detection
+        with open(temp_download_path, 'rb') as f:
+            content_sample = f.read(1024)  # Just need the first 1KB to detect format
 
-        # Is this file compressed?
-        is_compressed = False
-        if 'application/x-gzip' in content_type or 'application/gzip' in content_type or 'application/zip' in content_type:
-            is_compressed = True
-        elif original_url.endswith('.gz') or original_url.endswith('.zip'):
-            is_compressed = True
+        # Use our helper function to detect the format
+        format_type, is_compressed, file_extension = detect_file_format(
+            file_path=source.url,  # Original URL as a hint
+            content=content_sample  # Actual file content for detection
+        )
+
+        logger.debug(f"File format detection results: type={format_type}, compressed={is_compressed}, extension={file_extension}")
+
+        # Ensure consistent final paths
+        compressed_path = os.path.join(cache_dir, f"{source.id}{file_extension}" if is_compressed else f"{source.id}.compressed")
+        xml_path = os.path.join(cache_dir, f"{source.id}.xml")
 
         # Clean up old files before saving new ones
         if os.path.exists(compressed_path):
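
Note: a worked example of the naming scheme above, assuming a hypothetical
MEDIA_ROOT of "/media" and a source with id=42:

    # gzip download: detect_file_format -> ('gzip', True, '.gz')
    compressed_path  # -> "/media/cached_epg/42.gz"
    xml_path         # -> "/media/cached_epg/42.xml"

    # undetected format: is_compressed=False, so the fallback name is used
    compressed_path  # -> "/media/cached_epg/42.compressed"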
@@ -439,32 +450,33 @@ def fetch_xmltv(source):
                 send_epg_update(source.id, "extracting", 0, message="Extracting downloaded file")
 
                 # Always extract to the standard XML path
-                extracted = extract_compressed_file(current_file_path, xml_path)
+                extracted = extract_compressed_file(current_file_path, xml_path, delete_original=False)
 
                 if extracted:
                     logger.info(f"Successfully extracted to {xml_path}")
                     send_epg_update(source.id, "extracting", 100, message=f"File extracted successfully")
-                    # Update the source's file_path to the extracted XML file
-                    source.file_path = xml_path
-
-                    # Store the original compressed file path
-                    source.original_file_path = current_file_path
+                    # Update to store both paths properly
+                    source.file_path = current_file_path
+                    source.extracted_file_path = xml_path
                 else:
                     logger.error("Extraction failed, using compressed file")
                     send_epg_update(source.id, "extracting", 100, status="error", message="Extraction failed, using compressed file")
                     # Use the compressed file
                     source.file_path = current_file_path
+                    source.extracted_file_path = None
             except Exception as e:
                 logger.error(f"Error extracting file: {str(e)}", exc_info=True)
                 send_epg_update(source.id, "extracting", 100, status="error", message=f"Error during extraction: {str(e)}")
                 # Use the compressed file if extraction fails
                 source.file_path = current_file_path
+                source.extracted_file_path = None
         else:
             # It's already an XML file
             source.file_path = current_file_path
+            source.extracted_file_path = None
 
-        # Update the source's file_path to reflect the correct file
-        source.save(update_fields=['file_path', 'status', 'original_file_path'])
+        # Update the source's file paths
+        source.save(update_fields=['file_path', 'status', 'extracted_file_path'])
 
         # Update status to parsing
         source.status = 'parsing'
@@ -608,7 +620,13 @@ def extract_compressed_file(file_path, output_path=None, delete_original=False):
             base_path = os.path.splitext(file_path)[0]
             extracted_path = f"{base_path}_{uuid.uuid4().hex[:8]}.xml"
 
-        if file_path.endswith('.gz'):
+        # Use our detection helper to determine the file format instead of relying on extension
+        with open(file_path, 'rb') as f:
+            content_sample = f.read(4096)  # Read a larger sample to ensure accurate detection
+
+        format_type, is_compressed, _ = detect_file_format(file_path=file_path, content=content_sample)
+
+        if format_type == 'gzip':
             logger.debug(f"Extracting gzip file: {file_path}")
             with gzip.open(file_path, 'rb') as gz_file:
                 with open(extracted_path, 'wb') as out_file:
@@ -625,7 +643,7 @@ def extract_compressed_file(file_path, output_path=None, delete_original=False):
 
             return extracted_path
 
-        elif file_path.endswith('.zip'):
+        elif format_type == 'zip':
            logger.debug(f"Extracting zip file: {file_path}")
            with zipfile.ZipFile(file_path, 'r') as zip_file:
                # Find the first XML file in the ZIP archive
@@ -653,7 +671,7 @@ def extract_compressed_file(file_path, output_path=None, delete_original=False):
             return extracted_path
 
         else:
-            logger.error(f"Unsupported compressed file format: {file_path}")
+            logger.error(f"Unsupported or unrecognized compressed file format: {file_path} (detected as: {format_type})")
             return None
 
     except Exception as e:
@@ -662,7 +680,8 @@ def extract_compressed_file(file_path, output_path=None, delete_original=False):
 
 
 def parse_channels_only(source):
-    file_path = source.file_path
+    # Use extracted file if available, otherwise use the original file path
+    file_path = source.extracted_file_path if source.extracted_file_path else source.file_path
     if not file_path:
         file_path = source.get_cache_file()
 
@@ -1058,7 +1077,7 @@ def parse_programs_for_tvg_id(epg_id):
     # This is faster for most database engines
     ProgramData.objects.filter(epg=epg).delete()
 
-    file_path = epg_source.file_path
+    file_path = epg_source.extracted_file_path if epg_source.extracted_file_path else epg_source.file_path
     if not file_path:
         file_path = epg_source.get_cache_file()
 
@@ -1113,13 +1132,13 @@ def parse_programs_for_tvg_id(epg_id):
 
     # Use streaming parsing to reduce memory usage
     # No need to check file type anymore since it's always XML
-    logger.info(f"Parsing programs for tvg_id={epg.tvg_id} from {file_path}")
+    logger.debug(f"Parsing programs for tvg_id={epg.tvg_id} from {file_path}")
 
     # Memory usage tracking
     if process:
         try:
             mem_before = process.memory_info().rss / 1024 / 1024
-            logger.info(f"[parse_programs_for_tvg_id] Memory before parsing {epg.tvg_id} - {mem_before:.2f} MB")
+            logger.debug(f"[parse_programs_for_tvg_id] Memory before parsing {epg.tvg_id} - {mem_before:.2f} MB")
         except Exception as e:
             logger.warning(f"Error tracking memory: {e}")
             mem_before = 0
@@ -1613,14 +1632,81 @@ def extract_custom_properties(prog):
 
     return custom_props
 
-def clear_element(elem):
-    """Clear an XML element and its parent to free memory."""
-    try:
-        elem.clear()
-        parent = elem.getparent()
-        if parent is not None:
-            while elem.getprevious() is not None:
-                del parent[0]
+def clear_element(elem):
+    """Clear an XML element and its parent to free memory."""
+    try:
+        elem.clear()
+        parent = elem.getparent()
+        if parent is not None:
+            while elem.getprevious() is not None:
+                del parent[0]
+            parent.remove(elem)
+    except Exception as e:
+        logger.warning(f"Error clearing XML element: {e}", exc_info=True)
+
+
+def detect_file_format(file_path=None, content=None):
+    """
+    Detect file format by examining content or file path.
+
+    Args:
+        file_path: Path to file (optional)
+        content: Raw file content bytes (optional)
+
+    Returns:
+        tuple: (format_type, is_compressed, file_extension)
+            format_type: 'gzip', 'zip', 'xml', or 'unknown'
+            is_compressed: Boolean indicating if the file is compressed
+            file_extension: Appropriate file extension including dot (.gz, .zip, .xml)
+    """
+    # Default return values
+    format_type = 'unknown'
+    is_compressed = False
+    file_extension = '.tmp'
+
+    # First priority: check content magic numbers as they're most reliable
+    if content:
+        # We only need the first few bytes for magic number detection
+        header = content[:20] if len(content) >= 20 else content
+
+        # Check for gzip magic number (1f 8b)
+        if len(header) >= 2 and header[:2] == b'\x1f\x8b':
+            return 'gzip', True, '.gz'
+
+        # Check for zip magic number (PK..)
+        if len(header) >= 2 and header[:2] == b'PK':
+            return 'zip', True, '.zip'
+
+        # Check for XML - either standard XML header or XMLTV-specific tag
+        if len(header) >= 5 and (b'<?xml' in header or b'<tv>' in header):
+            return 'xml', False, '.xml'
+
+    # Second priority: check file extension - focus on the final extension for compression
+    if file_path:
+        logger.debug(f"Detecting file format for: {file_path}")
+
+        # Handle compound extensions like .xml.gz - prioritize compression extensions
+        lower_path = file_path.lower()
+
+        # Check for compression extensions explicitly
+        if lower_path.endswith('.gz') or lower_path.endswith('.gzip'):
+            return 'gzip', True, '.gz'
+        elif lower_path.endswith('.zip'):
+            return 'zip', True, '.zip'
+        elif lower_path.endswith('.xml'):
+            return 'xml', False, '.xml'
+
+        # Fallback to mimetypes only if direct extension check doesn't work
+        import mimetypes
+        mime_type, _ = mimetypes.guess_type(file_path)
+        logger.debug(f"Guessed MIME type: {mime_type}")
+        if mime_type:
+            if mime_type == 'application/gzip' or mime_type == 'application/x-gzip':
+                return 'gzip', True, '.gz'
+            elif mime_type == 'application/zip':
+                return 'zip', True, '.zip'
+            elif mime_type == 'application/xml' or mime_type == 'text/xml':
+                return 'xml', False, '.xml'
+
+    # If we reach here, we couldn't reliably determine the format
+    return format_type, is_compressed, file_extension
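
Note: a quick illustration of the precedence implemented above: content magic
numbers win over the path hint, and the extension is only consulted when no
content is supplied. Standalone sketch, not part of the diff:

    import gzip

    payload = gzip.compress(b"<?xml version='1.0'?><tv></tv>")

    # Content says gzip even though the path hint ends in .xml
    detect_file_format(file_path="guide.xml", content=payload)
    # -> ('gzip', True, '.gz')

    # No content available: fall back to the extension
    detect_file_format(file_path="guide.zip")
    # -> ('zip', True, '.zip')

clear_element relies on getparent()/getprevious(), which exist only on lxml
elements, so its natural home is an lxml iterparse loop; a sketch, assuming
lxml:

    from lxml import etree

    for _, elem in etree.iterparse("guide.xml", tag="programme"):
        ...                  # process the <programme> element
        clear_element(elem)  # free the element and its cleared siblings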
core/tasks.py (101 lines changed)
@@ -7,9 +7,6 @@ import logging
 import re
 import time
 import os
-import gzip
-import zipfile
-import uuid
 from core.utils import RedisClient, send_websocket_update
 from apps.proxy.ts_proxy.channel_status import ChannelStatus
 from apps.m3u.models import M3UAccount
@@ -19,13 +16,11 @@ from apps.epg.tasks import refresh_epg_data
 from .models import CoreSettings
 from apps.channels.models import Stream, ChannelStream
 from django.db import transaction
-from django.conf import settings
 
 logger = logging.getLogger(__name__)
 
 EPG_WATCH_DIR = '/data/epgs'
 M3U_WATCH_DIR = '/data/m3us'
-EPG_CACHE_DIR = os.path.join(settings.MEDIA_ROOT, "cached_epg")
 MIN_AGE_SECONDS = 6
 STARTUP_SKIP_AGE = 30
 REDIS_PREFIX = "processed_file:"
@@ -51,58 +46,6 @@ def throttled_log(logger_method, message, key=None, *args, **kwargs):
         logger_method(message, *args, **kwargs)
         _last_log_times[key] = now
 
-def extract_compressed_file(file_path):
-    """
-    Extracts a compressed file (.gz or .zip) to an XML file in the cache directory.
-
-    Args:
-        file_path: Path to the compressed file
-
-    Returns:
-        Path to the extracted XML file, or None if extraction failed
-    """
-    try:
-        # Create cache directory if it doesn't exist
-        os.makedirs(EPG_CACHE_DIR, exist_ok=True)
-
-        # Generate a unique filename for the extracted file
-        extracted_filename = f"extracted_{uuid.uuid4().hex[:8]}.xml"
-        extracted_path = os.path.join(EPG_CACHE_DIR, extracted_filename)
-
-        if file_path.endswith('.gz'):
-            logger.debug(f"Extracting gzip file: {file_path}")
-            with gzip.open(file_path, 'rb') as gz_file:
-                with open(extracted_path, 'wb') as out_file:
-                    out_file.write(gz_file.read())
-            logger.info(f"Successfully extracted gzip file to: {extracted_path}")
-            return extracted_path
-
-        elif file_path.endswith('.zip'):
-            logger.debug(f"Extracting zip file: {file_path}")
-            with zipfile.ZipFile(file_path, 'r') as zip_file:
-                # Find the first XML file in the ZIP archive
-                xml_files = [f for f in zip_file.namelist() if f.lower().endswith('.xml')]
-
-                if not xml_files:
-                    logger.error("No XML file found in ZIP archive")
-                    return None
-
-                # Extract the first XML file
-                xml_content = zip_file.read(xml_files[0])
-                with open(extracted_path, 'wb') as out_file:
-                    out_file.write(xml_content)
-
-                logger.info(f"Successfully extracted zip file to: {extracted_path}")
-                return extracted_path
-
-        else:
-            logger.error(f"Unsupported compressed file format: {file_path}")
-            return None
-
-    except Exception as e:
-        logger.error(f"Error extracting {file_path}: {str(e)}", exc_info=True)
-        return None
-
 @shared_task
 def beat_periodic_task():
     fetch_channel_stats()
@@ -235,9 +178,9 @@ def scan_and_process_files():
             if not filename.endswith('.xml') and not filename.endswith('.gz') and not filename.endswith('.zip'):
                 # Use trace level if not first scan
                 if _first_scan_completed:
-                    logger.trace(f"Skipping {filename}: Not an XML, GZ, or ZIP file")
+                    logger.trace(f"Skipping {filename}: Not an XML, GZ or zip file")
                 else:
-                    logger.debug(f"Skipping {filename}: Not an XML, GZ, or ZIP file")
+                    logger.debug(f"Skipping {filename}: Not an XML, GZ or zip file")
                 epg_skipped += 1
                 continue
 
@@ -249,7 +192,7 @@ def scan_and_process_files():
             # Instead of assuming old files were processed, check if they exist in the database
             if not stored_mtime and age > STARTUP_SKIP_AGE:
                 # Check if this file is already in the database
-                existing_epg = EPGSource.objects.filter(original_file_path=filepath).exists()
+                existing_epg = EPGSource.objects.filter(file_path=filepath).exists()
                 if existing_epg:
                     # Use trace level if not first scan
                     if _first_scan_completed:
@@ -284,39 +227,11 @@ def scan_and_process_files():
                 continue
 
             try:
-                extracted_path = None
-                is_compressed = filename.endswith('.gz') or filename.endswith('.zip')
-
-                # Extract compressed files
-                if is_compressed:
-                    logger.info(f"Detected compressed EPG file: {filename}, extracting")
-                    extracted_path = extract_compressed_file(filepath)
-
-                    if not extracted_path:
-                        logger.error(f"Failed to extract compressed file: {filename}")
-                        epg_errors += 1
-                        continue
-
-                    logger.info(f"Successfully extracted {filename} to {extracted_path}")
-
-                # Set the file path to use (either extracted or original)
-                file_to_use = extracted_path if extracted_path else filepath
-
-                epg_source, created = EPGSource.objects.get_or_create(
-                    original_file_path=filepath,
-                    defaults={
-                        "name": filename,
-                        "source_type": "xmltv",
-                        "is_active": CoreSettings.get_auto_import_mapped_files() in [True, "true", "True"],
-                        "file_path": file_to_use
-                    }
-                )
-
-                # If the source already exists but we extracted a new file, update its file_path
-                if not created and extracted_path:
-                    epg_source.file_path = extracted_path
-                    epg_source.save(update_fields=['file_path'])
-                    logger.info(f"Updated existing EPG source with new extracted file: {extracted_path}")
+                epg_source, created = EPGSource.objects.get_or_create(file_path=filepath, defaults={
+                    "name": filename,
+                    "source_type": "xmltv",
+                    "is_active": CoreSettings.get_auto_import_mapped_files() in [True, "true", "True"],
+                })
 
                 redis_client.set(redis_key, mtime, ex=REDIS_TTL)
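
Note: with the extraction block reverted, mapped files are registered untouched
and keyed by their watch-directory path; decompression now happens later in the
EPG refresh path, which records the XML location in extracted_file_path (see
the EPG changes above). A sketch of the resulting behavior, with a hypothetical
file name:

    # First scan: creates the row using the defaults
    source, created = EPGSource.objects.get_or_create(
        file_path="/data/epgs/guide.xml.gz",
        defaults={"name": "guide.xml.gz", "source_type": "xmltv"},
    )
    # Subsequent scans: the same lookup returns the existing row
    # (created=False) and the defaults are ignored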