From 8f4e05b0b82e08a7ccbfc917ce4a5f703dda8594 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Mon, 26 May 2025 15:10:54 -0500 Subject: [PATCH] Add extracted_file_path to EPGSource model and update extraction logic --- ... => 0014_epgsource_extracted_file_path.py} | 6 +- apps/epg/models.py | 4 +- apps/epg/tasks.py | 168 +++++++++++++----- core/tasks.py | 101 +---------- 4 files changed, 140 insertions(+), 139 deletions(-) rename apps/epg/migrations/{0014_epgsource_original_file_path.py => 0014_epgsource_extracted_file_path.py} (54%) diff --git a/apps/epg/migrations/0014_epgsource_original_file_path.py b/apps/epg/migrations/0014_epgsource_extracted_file_path.py similarity index 54% rename from apps/epg/migrations/0014_epgsource_original_file_path.py rename to apps/epg/migrations/0014_epgsource_extracted_file_path.py index e6c04fec..9ee1170b 100644 --- a/apps/epg/migrations/0014_epgsource_original_file_path.py +++ b/apps/epg/migrations/0014_epgsource_extracted_file_path.py @@ -1,4 +1,4 @@ -# Generated by Django 5.1.6 on 2025-05-25 23:00 +# Generated by Django 5.1.6 on 2025-05-26 15:48 from django.db import migrations, models @@ -12,7 +12,7 @@ class Migration(migrations.Migration): operations = [ migrations.AddField( model_name='epgsource', - name='original_file_path', - field=models.CharField(blank=True, help_text='Original path to compressed file before extraction', max_length=1024, null=True), + name='extracted_file_path', + field=models.CharField(blank=True, help_text='Path to extracted XML file after decompression', max_length=1024, null=True), ), ] diff --git a/apps/epg/models.py b/apps/epg/models.py index 8fd4b2f5..8abfb26f 100644 --- a/apps/epg/models.py +++ b/apps/epg/models.py @@ -32,8 +32,8 @@ class EPGSource(models.Model): api_key = models.CharField(max_length=255, blank=True, null=True) # For Schedules Direct is_active = models.BooleanField(default=True) file_path = models.CharField(max_length=1024, blank=True, null=True) - original_file_path = models.CharField(max_length=1024, blank=True, null=True, - help_text="Original path to compressed file before extraction") + extracted_file_path = models.CharField(max_length=1024, blank=True, null=True, + help_text="Path to extracted XML file after decompression") refresh_interval = models.IntegerField(default=0) refresh_task = models.ForeignKey( PeriodicTask, on_delete=models.SET_NULL, null=True, blank=True diff --git a/apps/epg/tasks.py b/apps/epg/tasks.py index cdf942e8..5238e9d3 100644 --- a/apps/epg/tasks.py +++ b/apps/epg/tasks.py @@ -212,11 +212,21 @@ def fetch_xmltv(source): # Check if the existing file is compressed and we need to extract it if source.file_path.endswith(('.gz', '.zip')) and not source.file_path.endswith('.xml'): try: - extracted_path = extract_compressed_file(source.file_path) + # Define the path for the extracted file in the cache directory + cache_dir = os.path.join(settings.MEDIA_ROOT, "cached_epg") + os.makedirs(cache_dir, exist_ok=True) + xml_path = os.path.join(cache_dir, f"{source.id}.xml") + + # Extract to the cache location keeping the original + extracted_path = extract_compressed_file(source.file_path, xml_path, delete_original=False) + if extracted_path: - logger.info(f"Extracted existing compressed file to: {extracted_path}") - source.file_path = extracted_path - source.save(update_fields=['file_path']) + logger.info(f"Extracted mapped compressed file to: {extracted_path}") + # Update to use extracted_file_path instead of changing file_path + source.extracted_file_path = extracted_path + 
source.save(update_fields=['extracted_file_path']) + else: + logger.error(f"Failed to extract mapped compressed file. Using original file: {source.file_path}") except Exception as e: logger.error(f"Failed to extract existing compressed file: {e}") # Continue with the original file if extraction fails @@ -331,13 +341,9 @@ def fetch_xmltv(source): # Define base paths for consistent file naming cache_dir = os.path.join(settings.MEDIA_ROOT, "cached_epg") os.makedirs(cache_dir, exist_ok=True) - + # Create temporary download file with .tmp extension temp_download_path = os.path.join(cache_dir, f"{source.id}.tmp") - - # Define final paths based on content type - compressed_path = os.path.join(cache_dir, f"{source.id}.compressed"); - xml_path = os.path.join(cache_dir, f"{source.id}.xml"); # Check if we have content length for progress tracking total_size = int(response.headers.get('content-length', 0)) @@ -388,16 +394,21 @@ def fetch_xmltv(source): # Send completion notification send_epg_update(source.id, "downloading", 100) - # Determine the appropriate file extension based on content type or URL - content_type = response.headers.get('Content-Type', '').lower() - original_url = source.url.lower() + # Determine the appropriate file extension based on content detection + with open(temp_download_path, 'rb') as f: + content_sample = f.read(1024) # Just need the first 1KB to detect format - # Is this file compressed? - is_compressed = False - if 'application/x-gzip' in content_type or 'application/gzip' in content_type or 'application/zip' in content_type: - is_compressed = True - elif original_url.endswith('.gz') or original_url.endswith('.zip'): - is_compressed = True + # Use our helper function to detect the format + format_type, is_compressed, file_extension = detect_file_format( + file_path=source.url, # Original URL as a hint + content=content_sample # Actual file content for detection + ) + + logger.debug(f"File format detection results: type={format_type}, compressed={is_compressed}, extension={file_extension}") + + # Ensure consistent final paths + compressed_path = os.path.join(cache_dir, f"{source.id}{file_extension}" if is_compressed else f"{source.id}.compressed") + xml_path = os.path.join(cache_dir, f"{source.id}.xml") # Clean up old files before saving new ones if os.path.exists(compressed_path): @@ -439,32 +450,33 @@ def fetch_xmltv(source): send_epg_update(source.id, "extracting", 0, message="Extracting downloaded file") # Always extract to the standard XML path - extracted = extract_compressed_file(current_file_path, xml_path) + extracted = extract_compressed_file(current_file_path, xml_path, delete_original=False) if extracted: logger.info(f"Successfully extracted to {xml_path}") send_epg_update(source.id, "extracting", 100, message=f"File extracted successfully") - # Update the source's file_path to the extracted XML file - source.file_path = xml_path - - # Store the original compressed file path - source.original_file_path = current_file_path + # Update to store both paths properly + source.file_path = current_file_path + source.extracted_file_path = xml_path else: logger.error("Extraction failed, using compressed file") send_epg_update(source.id, "extracting", 100, status="error", message="Extraction failed, using compressed file") # Use the compressed file source.file_path = current_file_path + source.extracted_file_path = None except Exception as e: logger.error(f"Error extracting file: {str(e)}", exc_info=True) send_epg_update(source.id, "extracting", 100, status="error", 
message=f"Error during extraction: {str(e)}")
                 # Use the compressed file if extraction fails
                 source.file_path = current_file_path
+                source.extracted_file_path = None
         else:
             # It's already an XML file
             source.file_path = current_file_path
+            source.extracted_file_path = None
 
-        # Update the source's file_path to reflect the correct file
-        source.save(update_fields=['file_path', 'status', 'original_file_path'])
+        # Update the source's file paths
+        source.save(update_fields=['file_path', 'status', 'extracted_file_path'])
 
         # Update status to parsing
         source.status = 'parsing'
@@ -608,7 +620,13 @@ def extract_compressed_file(file_path, output_path=None, delete_original=False):
             base_path = os.path.splitext(file_path)[0]
             extracted_path = f"{base_path}_{uuid.uuid4().hex[:8]}.xml"
 
-        if file_path.endswith('.gz'):
+        # Use our detection helper to determine the file format instead of relying on extension
+        with open(file_path, 'rb') as f:
+            content_sample = f.read(4096)  # Read a larger sample to ensure accurate detection
+
+        format_type, is_compressed, _ = detect_file_format(file_path=file_path, content=content_sample)
+
+        if format_type == 'gzip':
             logger.debug(f"Extracting gzip file: {file_path}")
             with gzip.open(file_path, 'rb') as gz_file:
                 with open(extracted_path, 'wb') as out_file:
@@ -625,7 +643,7 @@ def extract_compressed_file(file_path, output_path=None, delete_original=False):
 
             return extracted_path
 
-        elif file_path.endswith('.zip'):
+        elif format_type == 'zip':
             logger.debug(f"Extracting zip file: {file_path}")
             with zipfile.ZipFile(file_path, 'r') as zip_file:
                 # Find the first XML file in the ZIP archive
@@ -653,7 +671,7 @@ def extract_compressed_file(file_path, output_path=None, delete_original=False):
             return extracted_path
 
         else:
-            logger.error(f"Unsupported compressed file format: {file_path}")
+            logger.error(f"Unsupported or unrecognized compressed file format: {file_path} (detected as: {format_type})")
             return None
 
     except Exception as e:
@@ -662,7 +680,8 @@ def extract_compressed_file(file_path, output_path=None, delete_original=False):
 
 
 def parse_channels_only(source):
-    file_path = source.file_path
+    # Use extracted file if available, otherwise use the original file path
+    file_path = source.extracted_file_path if source.extracted_file_path else source.file_path
     if not file_path:
         file_path = source.get_cache_file()
 
@@ -1058,7 +1077,7 @@ def parse_programs_for_tvg_id(epg_id):
         # This is faster for most database engines
         ProgramData.objects.filter(epg=epg).delete()
 
-        file_path = epg_source.file_path
+        file_path = epg_source.extracted_file_path if epg_source.extracted_file_path else epg_source.file_path
         if not file_path:
             file_path = epg_source.get_cache_file()
 
@@ -1113,13 +1132,13 @@ def parse_programs_for_tvg_id(epg_id):
 
         # Use streaming parsing to reduce memory usage
         # No need to check file type anymore since it's always XML
-        logger.info(f"Parsing programs for tvg_id={epg.tvg_id} from {file_path}")
+        logger.debug(f"Parsing programs for tvg_id={epg.tvg_id} from {file_path}")
 
         # Memory usage tracking
         if process:
             try:
                 mem_before = process.memory_info().rss / 1024 / 1024
-                logger.info(f"[parse_programs_for_tvg_id] Memory before parsing {epg.tvg_id} - {mem_before:.2f} MB")
+                logger.debug(f"[parse_programs_for_tvg_id] Memory before parsing {epg.tvg_id} - {mem_before:.2f} MB")
             except Exception as e:
                 logger.warning(f"Error tracking memory: {e}")
                 mem_before = 0
@@ -1613,14 +1632,81 @@ def extract_custom_properties(prog):
 
     return custom_props
 
-def clear_element(elem):
-    """Clear an XML element and its parent to free memory."""
-    try:
-        elem.clear()
-        parent = elem.getparent()
-        if parent is not None:
-            while elem.getprevious() is not None:
-                del parent[0]
+def clear_element(elem):
+    """Clear an XML element and its parent to free memory."""
+    try:
+        elem.clear()
+        parent = elem.getparent()
+        if parent is not None:
+            while elem.getprevious() is not None:
+                del parent[0]
             parent.remove(elem)
     except Exception as e:
         logger.warning(f"Error clearing XML element: {e}", exc_info=True)
+
+
+def detect_file_format(file_path=None, content=None):
+    """
+    Detect file format by examining content or file path.
+
+    Args:
+        file_path: Path to file (optional)
+        content: Raw file content bytes (optional)
+
+    Returns:
+        tuple: (format_type, is_compressed, file_extension)
+            format_type: 'gzip', 'zip', 'xml', or 'unknown'
+            is_compressed: Boolean indicating if the file is compressed
+            file_extension: Appropriate file extension including dot (.gz, .zip, .xml)
+    """
+    # Default return values
+    format_type = 'unknown'
+    is_compressed = False
+    file_extension = '.tmp'
+
+    # First priority: check content magic numbers as they're most reliable
+    if content:
+        # We only need the first few bytes for magic number detection
+        header = content[:20] if len(content) >= 20 else content
+
+        # Check for gzip magic number (1f 8b)
+        if len(header) >= 2 and header[:2] == b'\x1f\x8b':
+            return 'gzip', True, '.gz'
+
+        # Check for zip magic number (PK..)
+        if len(header) >= 2 and header[:2] == b'PK':
+            return 'zip', True, '.zip'
+
+        # Check for XML - either standard XML header or XMLTV-specific tag
+        if len(header) >= 5 and (b'<?xml' in header or b'<tv' in header):
+            return 'xml', False, '.xml'
+
+    # Second priority: check file extension - focus on the final extension for compression
+    if file_path:
+        logger.debug(f"Detecting file format for: {file_path}")
+
+        # Handle compound extensions like .xml.gz - prioritize compression extensions
+        lower_path = file_path.lower()
+
+        # Check for compression extensions explicitly
+        if lower_path.endswith('.gz') or lower_path.endswith('.gzip'):
+            return 'gzip', True, '.gz'
+        elif lower_path.endswith('.zip'):
+            return 'zip', True, '.zip'
+        elif lower_path.endswith('.xml'):
+            return 'xml', False, '.xml'
+
+        # Fallback to mimetypes only if direct extension check doesn't work
+        import mimetypes
+        mime_type, _ = mimetypes.guess_type(file_path)
+        logger.debug(f"Guessed MIME type: {mime_type}")
+        if mime_type:
+            if mime_type == 'application/gzip' or mime_type == 'application/x-gzip':
+                return 'gzip', True, '.gz'
+            elif mime_type == 'application/zip':
+                return 'zip', True, '.zip'
+            elif mime_type == 'application/xml' or mime_type == 'text/xml':
+                return 'xml', False, '.xml'
+
+    # If we reach here, we couldn't reliably determine the format
+    return format_type, is_compressed, file_extension
diff --git a/core/tasks.py b/core/tasks.py
index 062ffaf2..0fdaedf7 100644
--- a/core/tasks.py
+++ b/core/tasks.py
@@ -7,9 +7,6 @@ import logging
 import re
 import time
 import os
-import gzip
-import zipfile
-import uuid
 from core.utils import RedisClient, send_websocket_update
 from apps.proxy.ts_proxy.channel_status import ChannelStatus
 from apps.m3u.models import M3UAccount
@@ -19,13 +16,11 @@ from apps.epg.tasks import refresh_epg_data
 from .models import CoreSettings
 from apps.channels.models import Stream, ChannelStream
 from django.db import transaction
-from django.conf import settings
 
 logger = logging.getLogger(__name__)
 
 EPG_WATCH_DIR = '/data/epgs'
 M3U_WATCH_DIR = '/data/m3us'
-EPG_CACHE_DIR = os.path.join(settings.MEDIA_ROOT, "cached_epg")
MIN_AGE_SECONDS = 6 STARTUP_SKIP_AGE = 30 REDIS_PREFIX = "processed_file:" @@ -51,58 +46,6 @@ def throttled_log(logger_method, message, key=None, *args, **kwargs): logger_method(message, *args, **kwargs) _last_log_times[key] = now -def extract_compressed_file(file_path): - """ - Extracts a compressed file (.gz or .zip) to an XML file in the cache directory. - - Args: - file_path: Path to the compressed file - - Returns: - Path to the extracted XML file, or None if extraction failed - """ - try: - # Create cache directory if it doesn't exist - os.makedirs(EPG_CACHE_DIR, exist_ok=True) - - # Generate a unique filename for the extracted file - extracted_filename = f"extracted_{uuid.uuid4().hex[:8]}.xml" - extracted_path = os.path.join(EPG_CACHE_DIR, extracted_filename) - - if file_path.endswith('.gz'): - logger.debug(f"Extracting gzip file: {file_path}") - with gzip.open(file_path, 'rb') as gz_file: - with open(extracted_path, 'wb') as out_file: - out_file.write(gz_file.read()) - logger.info(f"Successfully extracted gzip file to: {extracted_path}") - return extracted_path - - elif file_path.endswith('.zip'): - logger.debug(f"Extracting zip file: {file_path}") - with zipfile.ZipFile(file_path, 'r') as zip_file: - # Find the first XML file in the ZIP archive - xml_files = [f for f in zip_file.namelist() if f.lower().endswith('.xml')] - - if not xml_files: - logger.error("No XML file found in ZIP archive") - return None - - # Extract the first XML file - xml_content = zip_file.read(xml_files[0]) - with open(extracted_path, 'wb') as out_file: - out_file.write(xml_content) - - logger.info(f"Successfully extracted zip file to: {extracted_path}") - return extracted_path - - else: - logger.error(f"Unsupported compressed file format: {file_path}") - return None - - except Exception as e: - logger.error(f"Error extracting {file_path}: {str(e)}", exc_info=True) - return None - @shared_task def beat_periodic_task(): fetch_channel_stats() @@ -235,9 +178,9 @@ def scan_and_process_files(): if not filename.endswith('.xml') and not filename.endswith('.gz') and not filename.endswith('.zip'): # Use trace level if not first scan if _first_scan_completed: - logger.trace(f"Skipping {filename}: Not an XML, GZ, or ZIP file") + logger.trace(f"Skipping {filename}: Not an XML, GZ or zip file") else: - logger.debug(f"Skipping {filename}: Not an XML, GZ, or ZIP file") + logger.debug(f"Skipping {filename}: Not an XML, GZ or zip file") epg_skipped += 1 continue @@ -249,7 +192,7 @@ def scan_and_process_files(): # Instead of assuming old files were processed, check if they exist in the database if not stored_mtime and age > STARTUP_SKIP_AGE: # Check if this file is already in the database - existing_epg = EPGSource.objects.filter(original_file_path=filepath).exists() + existing_epg = EPGSource.objects.filter(file_path=filepath).exists() if existing_epg: # Use trace level if not first scan if _first_scan_completed: @@ -284,39 +227,11 @@ def scan_and_process_files(): continue try: - extracted_path = None - is_compressed = filename.endswith('.gz') or filename.endswith('.zip') - - # Extract compressed files - if is_compressed: - logger.info(f"Detected compressed EPG file: {filename}, extracting") - extracted_path = extract_compressed_file(filepath) - - if not extracted_path: - logger.error(f"Failed to extract compressed file: {filename}") - epg_errors += 1 - continue - - logger.info(f"Successfully extracted {filename} to {extracted_path}") - - # Set the file path to use (either extracted or original) - file_to_use = 
extracted_path if extracted_path else filepath - - epg_source, created = EPGSource.objects.get_or_create( - original_file_path=filepath, - defaults={ - "name": filename, - "source_type": "xmltv", - "is_active": CoreSettings.get_auto_import_mapped_files() in [True, "true", "True"], - "file_path": file_to_use - } - ) - - # If the source already exists but we extracted a new file, update its file_path - if not created and extracted_path: - epg_source.file_path = extracted_path - epg_source.save(update_fields=['file_path']) - logger.info(f"Updated existing EPG source with new extracted file: {extracted_path}") + epg_source, created = EPGSource.objects.get_or_create(file_path=filepath, defaults={ + "name": filename, + "source_type": "xmltv", + "is_active": CoreSettings.get_auto_import_mapped_files() in [True, "true", "True"], + }) redis_client.set(redis_key, mtime, ex=REDIS_TTL)
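
Reviewer note: after this change, compressed sources keep the downloaded archive in `file_path` and the decompressed XML in `extracted_file_path`, and `detect_file_format` trusts content magic numbers over file extensions. The snippet below is a minimal, self-contained sketch of that content-sniffing idea, runnable outside Django; `sniff_format` and the temp-file demo are illustrative stand-ins written for this note, not code from the patch (the real helper is `detect_file_format` in `apps/epg/tasks.py`).

    import gzip
    import tempfile

    def sniff_format(header: bytes):
        # Hypothetical stand-in mirroring detect_file_format()'s magic-number branch.
        if header[:2] == b'\x1f\x8b':                # gzip magic bytes
            return 'gzip', True, '.gz'
        if header[:2] == b'PK':                      # 'PK' signature shared by zip headers
            return 'zip', True, '.zip'
        if b'<?xml' in header or b'<tv' in header:   # XML declaration or XMLTV root element
            return 'xml', False, '.xml'
        return 'unknown', False, '.tmp'

    # Demo: gzip a tiny XMLTV document, then sniff the first 1 KB of the file,
    # much as fetch_xmltv samples a finished download before extraction.
    with tempfile.NamedTemporaryFile(suffix='.xml.gz') as tmp:
        tmp.write(gzip.compress(b'<?xml version="1.0"?><tv></tv>'))
        tmp.flush()
        tmp.seek(0)
        sample = tmp.read(1024)

    print(sniff_format(sample))  # -> ('gzip', True, '.gz')

Because content wins over extension, a gzipped file misnamed `guide.xml` still takes the gzip branch of `extract_compressed_file`, which is the case the old extension-based checks could not handle.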