Peak inside gz files to see if they contain xml files. Check file list for zip files to see if they contain xml files.

This commit is contained in:
SergeantPanda 2025-05-26 15:49:51 -05:00
parent 8f4e05b0b8
commit 5d2c604a4a

View file

@ -449,15 +449,15 @@ def fetch_xmltv(source):
logger.info(f"Extracting compressed file {current_file_path}")
send_epg_update(source.id, "extracting", 0, message="Extracting downloaded file")
# Always extract to the standard XML path
extracted = extract_compressed_file(current_file_path, xml_path, delete_original=False)
# Always extract to the standard XML path - set delete_original to True to clean up
extracted = extract_compressed_file(current_file_path, xml_path, delete_original=True)
if extracted:
logger.info(f"Successfully extracted to {xml_path}")
send_epg_update(source.id, "extracting", 100, message=f"File extracted successfully")
# Update to store both paths properly
source.file_path = current_file_path
source.extracted_file_path = xml_path
logger.info(f"Successfully extracted to {xml_path}, compressed file deleted")
send_epg_update(source.id, "extracting", 100, message=f"File extracted successfully, temporary file removed")
# Update to store only the extracted file path since the compressed file is now gone
source.file_path = xml_path
source.extracted_file_path = None
else:
logger.error("Extraction failed, using compressed file")
send_epg_update(source.id, "extracting", 100, status="error", message="Extraction failed, using compressed file")
@ -628,11 +628,26 @@ def extract_compressed_file(file_path, output_path=None, delete_original=False):
if format_type == 'gzip':
logger.debug(f"Extracting gzip file: {file_path}")
with gzip.open(file_path, 'rb') as gz_file:
with open(extracted_path, 'wb') as out_file:
out_file.write(gz_file.read())
try:
# First check if the content is XML by reading a sample
with gzip.open(file_path, 'rb') as gz_file:
content_sample = gz_file.read(4096) # Read first 4KB for detection
detected_format, _, _ = detect_file_format(content=content_sample)
if detected_format != 'xml':
logger.warning(f"GZIP file does not appear to contain XML content: {file_path} (detected as: {detected_format})")
# Continue anyway since GZIP only contains one file
# Reset file pointer and extract the content
gz_file.seek(0)
with open(extracted_path, 'wb') as out_file:
out_file.write(gz_file.read())
except Exception as e:
logger.error(f"Error extracting GZIP file: {e}", exc_info=True)
return None
logger.info(f"Successfully extracted gzip file to: {extracted_path}")
# Delete original compressed file if requested
if delete_original:
try:
@ -649,6 +664,22 @@ def extract_compressed_file(file_path, output_path=None, delete_original=False):
# Find the first XML file in the ZIP archive
xml_files = [f for f in zip_file.namelist() if f.lower().endswith('.xml')]
if not xml_files:
logger.info("No files with .xml extension found in ZIP archive, checking content of all files")
# Check content of each file to see if any are XML without proper extension
for filename in zip_file.namelist():
if not filename.endswith('/'): # Skip directories
try:
# Read a sample of the file content
content_sample = zip_file.read(filename, 4096) # Read up to 4KB for detection
format_type, _, _ = detect_file_format(content=content_sample)
if format_type == 'xml':
logger.info(f"Found XML content in file without .xml extension: {filename}")
xml_files = [filename]
break
except Exception as e:
logger.warning(f"Error reading file {filename} from ZIP: {e}")
if not xml_files:
logger.error("No XML file found in ZIP archive")
return None