diff --git a/apps/epg/tasks.py b/apps/epg/tasks.py index 03cb5e41..371e6c2d 100644 --- a/apps/epg/tasks.py +++ b/apps/epg/tasks.py @@ -208,6 +208,19 @@ def fetch_xmltv(source): # Handle cases with local file but no URL if not source.url and source.file_path and os.path.exists(source.file_path): logger.info(f"Using existing local file for EPG source: {source.name} at {source.file_path}") + + # Check if the existing file is compressed and we need to extract it + if source.file_path.endswith(('.gz', '.zip')) and not source.file_path.endswith('.xml'): + try: + extracted_path = extract_compressed_file(source.file_path) + if extracted_path: + logger.info(f"Extracted existing compressed file to: {extracted_path}") + source.file_path = extracted_path + source.save(update_fields=['file_path']) + except Exception as e: + logger.error(f"Failed to extract existing compressed file: {e}") + # Continue with the original file if extraction fails + # Set the status to success in the database source.status = 'success' source.save(update_fields=['status']) @@ -405,15 +418,42 @@ def fetch_xmltv(source): logger.warning(f"Continuing with temporary file: {cache_file}") new_cache_file = cache_file # Fall back to the tmp file - # Update the source's file_path to reflect the correct extension - source.file_path = new_cache_file + # Now extract the file if it's compressed + extracted_file = None + if file_extension in ('.gz', '.zip'): + try: + logger.info(f"Extracting compressed file {new_cache_file}") + send_epg_update(source.id, "extracting", 0, message="Extracting downloaded file") + + extracted_file = extract_compressed_file(new_cache_file) + + if extracted_file: + logger.info(f"Successfully extracted to {extracted_file}") + send_epg_update(source.id, "extracting", 100, message=f"File extracted successfully: {os.path.basename(extracted_file)}") + # Update the source's file_path to the extracted XML file + source.file_path = extracted_file + else: + logger.error("Extraction failed, using 
def extract_compressed_file(file_path, delete_original=True):
    """
    Extract a compressed file (.gz or .zip) to an XML file next to it.

    The output is named after the input with its last extension replaced by
    ``.xml`` (``guide.gz`` -> ``guide.xml``). For ZIP archives the first member
    whose name ends in ``.xml`` (case-insensitive) is extracted.

    The payload is streamed into a temporary sibling file and only moved over
    the final name once decompression has finished, so a corrupt archive never
    leaves a truncated XML file behind and never destroys a previous
    extraction. The input format is validated before anything on disk is
    touched.

    Args:
        file_path: Path to the compressed file
        delete_original: Whether to delete the original compressed file after successful extraction

    Returns:
        Path to the extracted XML file, or None if extraction failed
    """
    # Function-scope import: the module's import section is not part of this
    # block, so the new dependency is brought into scope locally.
    import shutil

    temp_path = None
    try:
        base_path = os.path.splitext(file_path)[0]
        extracted_path = f"{base_path}.xml"
        unique_suffix = uuid.uuid4().hex[:8]

        # Decide the format first. Unsupported inputs must return before any
        # file is created or removed (otherwise passing "foo.xml" would delete
        # "foo.xml" itself).
        lowered = file_path.lower()
        if lowered.endswith('.gz'):
            archive_kind = 'gzip'
        elif lowered.endswith('.zip'):
            archive_kind = 'zip'
        else:
            logger.error(f"Unsupported compressed file format: {file_path}")
            return None

        logger.debug(f"Extracting {archive_kind} file: {file_path}")
        temp_path = f"{base_path}_{unique_suffix}.xml.part"

        if archive_kind == 'gzip':
            # Stream in chunks instead of read(): EPG guides can be very large.
            with gzip.open(file_path, 'rb') as src, open(temp_path, 'wb') as dst:
                shutil.copyfileobj(src, dst)
        else:
            with zipfile.ZipFile(file_path, 'r') as zip_file:
                # Find the first XML file in the ZIP archive
                xml_files = [f for f in zip_file.namelist() if f.lower().endswith('.xml')]

                if not xml_files:
                    logger.error("No XML file found in ZIP archive")
                    return None

                with zip_file.open(xml_files[0], 'r') as src, open(temp_path, 'wb') as dst:
                    shutil.copyfileobj(src, dst)

        # Publish the finished file. os.replace overwrites an existing target;
        # if that is not possible, fall back to a unique filename.
        try:
            os.replace(temp_path, extracted_path)
        except OSError as e:
            logger.warning(f"Failed to remove existing extracted file: {e}")
            extracted_path = f"{base_path}_{unique_suffix}.xml"
            os.replace(temp_path, extracted_path)

        logger.info(f"Successfully extracted {archive_kind} file to: {extracted_path}")

    except Exception as e:
        logger.error(f"Error extracting {file_path}: {str(e)}", exc_info=True)
        # Do not leave a partially written temporary file behind.
        if temp_path and os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except OSError as cleanup_error:
                logger.warning(f"Failed to remove partial file {temp_path}: {cleanup_error}")
        return None

    # Delete original compressed file if requested. Failure here is only a
    # warning: the extraction itself succeeded.
    if delete_original:
        try:
            os.remove(file_path)
            logger.info(f"Deleted original compressed file: {file_path}")
        except Exception as e:
            logger.warning(f"Failed to delete original compressed file {file_path}: {e}")

    return extracted_path
parse_channels_only(source): # Update progress after counting send_epg_update(source.id, "parsing_channels", 25, total_channels=total_channels) - # Open the file based on its type - if is_gzipped: - logger.debug(f"Opening gzipped file for channel parsing: {file_path}") - source_file = gzip.open(file_path, 'rb') - elif is_zipped: - logger.debug(f"Opening zipped file for channel parsing: {file_path}") - # For ZIP files, need to open the first file in the archive - zip_archive = zipfile.ZipFile(file_path, 'r') - # Use the first file in the archive - first_file = zip_archive.namelist()[0] - source_file = zip_archive.open(first_file, 'r') - else: - logger.debug(f"Opening file for channel parsing: {file_path}") - source_file = open(file_path, 'rb') + # Open the file - no need to check file type since it's always XML now + logger.debug(f"Opening file for channel parsing: {file_path}") + source_file = open(file_path, 'rb') if process: logger.debug(f"[parse_channels_only] Memory after opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB") @@ -788,14 +895,6 @@ def parse_channels_only(source): clear_element(elem) continue - except zipfile.BadZipFile as zip_error: - logger.error(f"Bad ZIP file: {zip_error}") - # Update status to error - source.status = 'error' - source.last_message = f"Error parsing ZIP file: {str(zip_error)}" - source.save(update_fields=['status', 'last_message']) - send_epg_update(source.id, "parsing_channels", 100, status="error", error=str(zip_error)) - return False except (etree.XMLSyntaxError, Exception) as xml_error: logger.error(f"[parse_channels_only] XML parsing failed: {xml_error}") # Update status to error @@ -966,14 +1065,17 @@ def parse_programs_for_tvg_id(epg_id): return # Also check if the file exists after download - if not os.path.exists(new_path): - logger.error(f"Failed to fetch EPG data, file still missing at: {new_path}") + if not os.path.exists(epg_source.file_path): + logger.error(f"Failed to fetch EPG data, file still missing 
at: {epg_source.file_path}") epg_source.status = 'error' epg_source.last_message = f"Failed to download EPG data, file missing after download" epg_source.save(update_fields=['status', 'last_message']) send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="File not found after download") release_task_lock('parse_epg_programs', epg_id) return + + # Update file_path with the new location + file_path = epg_source.file_path else: logger.error(f"No URL provided for EPG source {epg_source.name}, cannot fetch new data") # Update status to error @@ -984,12 +1086,8 @@ def parse_programs_for_tvg_id(epg_id): release_task_lock('parse_epg_programs', epg_id) return - file_path = new_path - # Use streaming parsing to reduce memory usage - is_gzipped = file_path.endswith('.gz') - is_zipped = file_path.endswith('.zip') - + # No need to check file type anymore since it's always XML logger.info(f"Parsing programs for tvg_id={epg.tvg_id} from {file_path}") # Memory usage tracking @@ -1005,19 +1103,9 @@ def parse_programs_for_tvg_id(epg_id): batch_size = 1000 # Process in batches to limit memory usage try: - # Open the file based on its type - if is_gzipped: - logger.debug(f"Opening GZ file for parsing: {file_path}") - source_file = gzip.open(file_path, 'rb') - elif is_zipped: - logger.debug(f"Opening ZIP file for parsing: {file_path}") - # For ZIP files, need to open the first file in the archive - zip_archive = zipfile.ZipFile(file_path, 'r') - # Use the first file in the archive - first_file = zip_archive.namelist()[0] - source_file = zip_archive.open(first_file, 'r') - else: - source_file = open(file_path, 'rb') + # Open the file directly - no need to check compression + logger.debug(f"Opening file for parsing: {file_path}") + source_file = open(file_path, 'rb') # Stream parse the file using lxml's iterparse program_parser = etree.iterparse(source_file, events=('end',), tag='programme')