Mirror of https://github.com/Dispatcharr/Dispatcharr.git (synced 2026-01-23 18:54:58 +00:00)

Extract compressed files after downloading and delete original.
parent d270e988bd
commit 7dbd41afa8

1 changed file with 139 additions and 51 deletions
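
The change is easiest to read from the bottom up: a new extract_compressed_file() helper turns a downloaded .gz or .zip into a plain .xml and deletes the compressed original, so every later code path can open the file directly instead of branching on compression type. A minimal sketch of that pattern, standard library only (the function name and paths here are illustrative, not taken from the diff):

    import gzip
    import os
    import zipfile

    def unpack_epg_download(path):
        # Illustrative only: decompress a downloaded .gz/.zip to .xml,
        # then drop the original once the write has succeeded.
        target = os.path.splitext(path)[0] + '.xml'
        if path.endswith('.gz'):
            with gzip.open(path, 'rb') as src, open(target, 'wb') as dst:
                dst.write(src.read())
        elif path.endswith('.zip'):
            with zipfile.ZipFile(path) as archive:
                names = [n for n in archive.namelist() if n.lower().endswith('.xml')]
                if not names:
                    return None  # nothing usable in the archive
                with open(target, 'wb') as dst:
                    dst.write(archive.read(names[0]))
        else:
            return path  # already plain XML
        os.remove(path)  # safe now: the extracted copy exists
        return target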
@@ -208,6 +208,19 @@ def fetch_xmltv(source):
     # Handle cases with local file but no URL
     if not source.url and source.file_path and os.path.exists(source.file_path):
         logger.info(f"Using existing local file for EPG source: {source.name} at {source.file_path}")
+
+        # Check if the existing file is compressed and we need to extract it
+        if source.file_path.endswith(('.gz', '.zip')) and not source.file_path.endswith('.xml'):
+            try:
+                extracted_path = extract_compressed_file(source.file_path)
+                if extracted_path:
+                    logger.info(f"Extracted existing compressed file to: {extracted_path}")
+                    source.file_path = extracted_path
+                    source.save(update_fields=['file_path'])
+            except Exception as e:
+                logger.error(f"Failed to extract existing compressed file: {e}")
+                # Continue with the original file if extraction fails
+
         # Set the status to success in the database
         source.status = 'success'
         source.save(update_fields=['status'])
@@ -405,15 +418,42 @@ def fetch_xmltv(source):
             logger.warning(f"Continuing with temporary file: {cache_file}")
             new_cache_file = cache_file  # Fall back to the tmp file
 
-        # Update the source's file_path to reflect the correct extension
-        source.file_path = new_cache_file
+        # Now extract the file if it's compressed
+        extracted_file = None
+        if file_extension in ('.gz', '.zip'):
+            try:
+                logger.info(f"Extracting compressed file {new_cache_file}")
+                send_epg_update(source.id, "extracting", 0, message="Extracting downloaded file")
+
+                extracted_file = extract_compressed_file(new_cache_file)
+
+                if extracted_file:
+                    logger.info(f"Successfully extracted to {extracted_file}")
+                    send_epg_update(source.id, "extracting", 100, message=f"File extracted successfully: {os.path.basename(extracted_file)}")
+                    # Update the source's file_path to the extracted XML file
+                    source.file_path = extracted_file
+                else:
+                    logger.error("Extraction failed, using compressed file")
+                    send_epg_update(source.id, "extracting", 100, status="error", message="Extraction failed, using compressed file")
+                    # Use the compressed file
+                    source.file_path = new_cache_file
+            except Exception as e:
+                logger.error(f"Error extracting file: {str(e)}", exc_info=True)
+                send_epg_update(source.id, "extracting", 100, status="error", message=f"Error during extraction: {str(e)}")
+                # Use the compressed file if extraction fails
+                source.file_path = new_cache_file
+        else:
+            # It's already an XML file
+            source.file_path = new_cache_file
+
+        # Update the source's file_path to reflect the correct file
         source.save(update_fields=['file_path', 'status'])
 
         # Update status to parsing
         source.status = 'parsing'
         source.save(update_fields=['status'])
 
-        logger.info(f"Cached EPG file saved to {cache_file}")
+        logger.info(f"Cached EPG file saved to {source.file_path}")
         return True
 
     except requests.exceptions.HTTPError as e:
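
Throughout this hunk, progress is reported via send_epg_update(source_id, stage, progress, ...), which is defined elsewhere in the repo and is not part of this diff. A hypothetical stand-in that matches the call sites above, handy for exercising the extraction path in isolation:

    import logging

    logger = logging.getLogger(__name__)

    def send_epg_update(source_id, stage, progress, status=None, message=None, error=None):
        # Hypothetical stub: the real function presumably pushes a status
        # update to the frontend; here we just log the same fields.
        logger.debug("EPG %s: %s %d%% status=%s message=%s error=%s",
                     source_id, stage, progress, status, message, error)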
@@ -519,6 +559,83 @@ def fetch_xmltv(source):
         return False
 
 
+def extract_compressed_file(file_path, delete_original=True):
+    """
+    Extracts a compressed file (.gz or .zip) to an XML file.
+
+    Args:
+        file_path: Path to the compressed file
+        delete_original: Whether to delete the original compressed file after successful extraction
+
+    Returns:
+        Path to the extracted XML file, or None if extraction failed
+    """
+    try:
+        base_path = os.path.splitext(file_path)[0]
+        extracted_path = f"{base_path}.xml"
+
+        # Make sure the output path doesn't already exist
+        if os.path.exists(extracted_path):
+            try:
+                os.remove(extracted_path)
+            except Exception as e:
+                logger.warning(f"Failed to remove existing extracted file: {e}")
+                # Try with a unique filename instead
+                extracted_path = f"{base_path}_{uuid.uuid4().hex[:8]}.xml"
+
+        if file_path.endswith('.gz'):
+            logger.debug(f"Extracting gzip file: {file_path}")
+            with gzip.open(file_path, 'rb') as gz_file:
+                with open(extracted_path, 'wb') as out_file:
+                    out_file.write(gz_file.read())
+            logger.info(f"Successfully extracted gzip file to: {extracted_path}")
+
+            # Delete original compressed file if requested
+            if delete_original:
+                try:
+                    os.remove(file_path)
+                    logger.info(f"Deleted original compressed file: {file_path}")
+                except Exception as e:
+                    logger.warning(f"Failed to delete original compressed file {file_path}: {e}")
+
+            return extracted_path
+
+        elif file_path.endswith('.zip'):
+            logger.debug(f"Extracting zip file: {file_path}")
+            with zipfile.ZipFile(file_path, 'r') as zip_file:
+                # Find the first XML file in the ZIP archive
+                xml_files = [f for f in zip_file.namelist() if f.lower().endswith('.xml')]
+
+                if not xml_files:
+                    logger.error("No XML file found in ZIP archive")
+                    return None
+
+                # Extract the first XML file
+                xml_content = zip_file.read(xml_files[0])
+                with open(extracted_path, 'wb') as out_file:
+                    out_file.write(xml_content)
+
+            logger.info(f"Successfully extracted zip file to: {extracted_path}")
+
+            # Delete original compressed file if requested
+            if delete_original:
+                try:
+                    os.remove(file_path)
+                    logger.info(f"Deleted original compressed file: {file_path}")
+                except Exception as e:
+                    logger.warning(f"Failed to delete original compressed file {file_path}: {e}")
+
+            return extracted_path
+
+        else:
+            logger.error(f"Unsupported compressed file format: {file_path}")
+            return None
+
+    except Exception as e:
+        logger.error(f"Error extracting {file_path}: {str(e)}", exc_info=True)
+        return None
+
+
 def parse_channels_only(source):
     file_path = source.file_path
     if not file_path:
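
A usage sketch for the new helper as defined above (the path is hypothetical):

    xml_path = extract_compressed_file('/data/epg/source_42.gz')  # hypothetical path
    if xml_path:
        print(xml_path)  # /data/epg/source_42.xml; the .gz is gone if deletion succeeded
    else:
        pass  # extraction failed; callers above fall back to the compressed file

Note the deliberate asymmetry: a failed delete only logs a warning and still returns the extracted path, while any extraction error returns None so callers can keep using the original file.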
@@ -558,14 +675,17 @@ def parse_channels_only(source):
             return False
 
         # Verify the file was downloaded successfully
-        if not os.path.exists(new_path):
-            logger.error(f"Failed to fetch EPG data, file still missing at: {new_path}")
+        if not os.path.exists(source.file_path):
+            logger.error(f"Failed to fetch EPG data, file still missing at: {source.file_path}")
             # Update status to error
             source.status = 'error'
             source.last_message = f"Failed to fetch EPG data, file missing after download"
             source.save(update_fields=['status', 'last_message'])
             send_epg_update(source.id, "parsing_channels", 100, status="error", error="File not found after download")
             return False
+
+        # Update file_path with the new location
+        file_path = source.file_path
     else:
         logger.error(f"No URL provided for EPG source {source.name}, cannot fetch new data")
         # Update status to error
@@ -576,7 +696,7 @@ def parse_channels_only(source):
     # Initialize process variable for memory tracking only in debug mode
     try:
         process = None
-        # Get current log level as a number rather than string
+        # Get current log level as a number
         current_log_level = logger.getEffectiveLevel()
 
         # Only track memory usage when log level is DEBUG (10) or more verbose
@@ -613,9 +733,7 @@ def parse_channels_only(source):
         send_epg_update(source.id, "parsing_channels", 10)
 
         # Stream parsing instead of loading entire file at once
-        is_gzipped = file_path.endswith('.gz')
-        is_zipped = file_path.endswith('.zip')
-
+        # This can be simplified since we now always have XML files
         epgs_to_create = []
         epgs_to_update = []
         total_channels = 0
@@ -641,20 +759,9 @@ def parse_channels_only(source):
             # Update progress after counting
             send_epg_update(source.id, "parsing_channels", 25, total_channels=total_channels)
 
-            # Open the file based on its type
-            if is_gzipped:
-                logger.debug(f"Opening gzipped file for channel parsing: {file_path}")
-                source_file = gzip.open(file_path, 'rb')
-            elif is_zipped:
-                logger.debug(f"Opening zipped file for channel parsing: {file_path}")
-                # For ZIP files, need to open the first file in the archive
-                zip_archive = zipfile.ZipFile(file_path, 'r')
-                # Use the first file in the archive
-                first_file = zip_archive.namelist()[0]
-                source_file = zip_archive.open(first_file, 'r')
-            else:
-                logger.debug(f"Opening file for channel parsing: {file_path}")
-                source_file = open(file_path, 'rb')
+            # Open the file - no need to check file type since it's always XML now
+            logger.debug(f"Opening file for channel parsing: {file_path}")
+            source_file = open(file_path, 'rb')
 
             if process:
                 logger.debug(f"[parse_channels_only] Memory after opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
@@ -788,14 +895,6 @@ def parse_channels_only(source):
                     clear_element(elem)
                     continue
 
-        except zipfile.BadZipFile as zip_error:
-            logger.error(f"Bad ZIP file: {zip_error}")
-            # Update status to error
-            source.status = 'error'
-            source.last_message = f"Error parsing ZIP file: {str(zip_error)}"
-            source.save(update_fields=['status', 'last_message'])
-            send_epg_update(source.id, "parsing_channels", 100, status="error", error=str(zip_error))
-            return False
         except (etree.XMLSyntaxError, Exception) as xml_error:
             logger.error(f"[parse_channels_only] XML parsing failed: {xml_error}")
             # Update status to error
@@ -966,14 +1065,17 @@ def parse_programs_for_tvg_id(epg_id):
             return
 
         # Also check if the file exists after download
-        if not os.path.exists(new_path):
-            logger.error(f"Failed to fetch EPG data, file still missing at: {new_path}")
+        if not os.path.exists(epg_source.file_path):
+            logger.error(f"Failed to fetch EPG data, file still missing at: {epg_source.file_path}")
             epg_source.status = 'error'
             epg_source.last_message = f"Failed to download EPG data, file missing after download"
             epg_source.save(update_fields=['status', 'last_message'])
             send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="File not found after download")
             release_task_lock('parse_epg_programs', epg_id)
             return
+
+        # Update file_path with the new location
+        file_path = epg_source.file_path
     else:
         logger.error(f"No URL provided for EPG source {epg_source.name}, cannot fetch new data")
         # Update status to error
@@ -984,12 +1086,8 @@ def parse_programs_for_tvg_id(epg_id):
         release_task_lock('parse_epg_programs', epg_id)
         return
 
-    file_path = new_path
-
     # Use streaming parsing to reduce memory usage
-    is_gzipped = file_path.endswith('.gz')
-    is_zipped = file_path.endswith('.zip')
-
+    # No need to check file type anymore since it's always XML
     logger.info(f"Parsing programs for tvg_id={epg.tvg_id} from {file_path}")
 
     # Memory usage tracking
@@ -1005,19 +1103,9 @@ def parse_programs_for_tvg_id(epg_id):
     batch_size = 1000  # Process in batches to limit memory usage
 
     try:
-        # Open the file based on its type
-        if is_gzipped:
-            logger.debug(f"Opening GZ file for parsing: {file_path}")
-            source_file = gzip.open(file_path, 'rb')
-        elif is_zipped:
-            logger.debug(f"Opening ZIP file for parsing: {file_path}")
-            # For ZIP files, need to open the first file in the archive
-            zip_archive = zipfile.ZipFile(file_path, 'r')
-            # Use the first file in the archive
-            first_file = zip_archive.namelist()[0]
-            source_file = zip_archive.open(first_file, 'r')
-        else:
-            source_file = open(file_path, 'rb')
+        # Open the file directly - no need to check compression
+        logger.debug(f"Opening file for parsing: {file_path}")
+        source_file = open(file_path, 'rb')
 
         # Stream parse the file using lxml's iterparse
         program_parser = etree.iterparse(source_file, events=('end',), tag='programme')
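
Because the cached file is now guaranteed to be plain XML, the streaming parse can run straight off an ordinary file handle. A self-contained sketch of the iterparse-and-clear pattern this code relies on (tag name as in the diff; the cleanup mirrors what clear_element presumably does):

    from lxml import etree

    def stream_programmes(xml_path):
        # Yield one <programme> element at a time while keeping memory flat.
        with open(xml_path, 'rb') as source_file:
            for _, elem in etree.iterparse(source_file, events=('end',), tag='programme'):
                yield elem
                # Free the element and any already-processed siblings
                # that the parser still holds in the tree.
                elem.clear()
                while elem.getprevious() is not None:
                    del elem.getparent()[0]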