Bug fix: If no streams are found during an XC account refresh we were not releasing the task lock and we weren't processing vods.

Fixes #449
This commit is contained in:
SergeantPanda 2025-10-06 20:04:22 -05:00
parent bc574c272c
commit 13874d64ad

View file

@ -2523,76 +2523,75 @@ def refresh_single_m3u_account(account_id):
if not all_xc_streams:
logger.warning("No streams collected from XC groups")
return f"No streams found for XC account {account_id}", None
else:
# Now batch by stream count (like standard M3U processing)
batches = [
all_xc_streams[i : i + BATCH_SIZE]
for i in range(0, len(all_xc_streams), BATCH_SIZE)
]
# Now batch by stream count (like standard M3U processing)
batches = [
all_xc_streams[i : i + BATCH_SIZE]
for i in range(0, len(all_xc_streams), BATCH_SIZE)
]
logger.info(f"Processing {len(all_xc_streams)} XC streams in {len(batches)} batches")
logger.info(f"Processing {len(all_xc_streams)} XC streams in {len(batches)} batches")
# Use threading for XC stream processing - now with consistent batch sizes
max_workers = min(4, len(batches))
logger.debug(f"Using {max_workers} threads for XC stream processing")
# Use threading for XC stream processing - now with consistent batch sizes
max_workers = min(4, len(batches))
logger.debug(f"Using {max_workers} threads for XC stream processing")
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit stream batch processing tasks (reuse standard M3U processing)
future_to_batch = {
executor.submit(process_m3u_batch_direct, account_id, batch, existing_groups, hash_keys): i
for i, batch in enumerate(batches)
}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit stream batch processing tasks (reuse standard M3U processing)
future_to_batch = {
executor.submit(process_m3u_batch_direct, account_id, batch, existing_groups, hash_keys): i
for i, batch in enumerate(batches)
}
completed_batches = 0
total_batches = len(batches)
completed_batches = 0
total_batches = len(batches)
# Process completed batches as they finish
for future in as_completed(future_to_batch):
batch_idx = future_to_batch[future]
try:
result = future.result()
completed_batches += 1
# Process completed batches as they finish
for future in as_completed(future_to_batch):
batch_idx = future_to_batch[future]
try:
result = future.result()
completed_batches += 1
# Extract stream counts from result
if isinstance(result, str):
try:
created_match = re.search(r"(\d+) created", result)
updated_match = re.search(r"(\d+) updated", result)
if created_match and updated_match:
created_count = int(created_match.group(1))
updated_count = int(updated_match.group(1))
streams_created += created_count
streams_updated += updated_count
except (AttributeError, ValueError):
pass
# Extract stream counts from result
if isinstance(result, str):
try:
created_match = re.search(r"(\d+) created", result)
updated_match = re.search(r"(\d+) updated", result)
if created_match and updated_match:
created_count = int(created_match.group(1))
updated_count = int(updated_match.group(1))
streams_created += created_count
streams_updated += updated_count
except (AttributeError, ValueError):
pass
# Send progress update
progress = int((completed_batches / total_batches) * 100)
current_elapsed = time.time() - start_time
# Send progress update
progress = int((completed_batches / total_batches) * 100)
current_elapsed = time.time() - start_time
if progress > 0:
estimated_total = (current_elapsed / progress) * 100
time_remaining = max(0, estimated_total - current_elapsed)
else:
time_remaining = 0
if progress > 0:
estimated_total = (current_elapsed / progress) * 100
time_remaining = max(0, estimated_total - current_elapsed)
else:
time_remaining = 0
send_m3u_update(
account_id,
"parsing",
progress,
elapsed_time=current_elapsed,
time_remaining=time_remaining,
streams_processed=streams_created + streams_updated,
)
send_m3u_update(
account_id,
"parsing",
progress,
elapsed_time=current_elapsed,
time_remaining=time_remaining,
streams_processed=streams_created + streams_updated,
)
logger.debug(f"XC thread batch {completed_batches}/{total_batches} completed")
logger.debug(f"XC thread batch {completed_batches}/{total_batches} completed")
except Exception as e:
logger.error(f"Error in XC thread batch {batch_idx}: {str(e)}")
completed_batches += 1 # Still count it to avoid hanging
except Exception as e:
logger.error(f"Error in XC thread batch {batch_idx}: {str(e)}")
completed_batches += 1 # Still count it to avoid hanging
logger.info(f"XC thread-based processing completed for account {account_id}")
logger.info(f"XC thread-based processing completed for account {account_id}")
# Ensure all database transactions are committed before cleanup
logger.info(
@ -2673,7 +2672,16 @@ def refresh_single_m3u_account(account_id):
release_task_lock("refresh_single_m3u_account", account_id)
# Aggressive garbage collection
del existing_groups, extinf_data, groups, batches
# Only delete variables if they exist
if 'existing_groups' in locals():
del existing_groups
if 'extinf_data' in locals():
del extinf_data
if 'groups' in locals():
del groups
if 'batches' in locals():
del batches
from core.utils import cleanup_memory
cleanup_memory(log_usage=True, force_collection=True)