diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py index b892caef..0ba595c5 100644 --- a/apps/m3u/tasks.py +++ b/apps/m3u/tasks.py @@ -2523,76 +2523,75 @@ def refresh_single_m3u_account(account_id): if not all_xc_streams: logger.warning("No streams collected from XC groups") - return f"No streams found for XC account {account_id}", None + else: + # Now batch by stream count (like standard M3U processing) + batches = [ + all_xc_streams[i : i + BATCH_SIZE] + for i in range(0, len(all_xc_streams), BATCH_SIZE) + ] - # Now batch by stream count (like standard M3U processing) - batches = [ - all_xc_streams[i : i + BATCH_SIZE] - for i in range(0, len(all_xc_streams), BATCH_SIZE) - ] + logger.info(f"Processing {len(all_xc_streams)} XC streams in {len(batches)} batches") - logger.info(f"Processing {len(all_xc_streams)} XC streams in {len(batches)} batches") + # Use threading for XC stream processing - now with consistent batch sizes + max_workers = min(4, len(batches)) + logger.debug(f"Using {max_workers} threads for XC stream processing") - # Use threading for XC stream processing - now with consistent batch sizes - max_workers = min(4, len(batches)) - logger.debug(f"Using {max_workers} threads for XC stream processing") + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # Submit stream batch processing tasks (reuse standard M3U processing) + future_to_batch = { + executor.submit(process_m3u_batch_direct, account_id, batch, existing_groups, hash_keys): i + for i, batch in enumerate(batches) + } - with ThreadPoolExecutor(max_workers=max_workers) as executor: - # Submit stream batch processing tasks (reuse standard M3U processing) - future_to_batch = { - executor.submit(process_m3u_batch_direct, account_id, batch, existing_groups, hash_keys): i - for i, batch in enumerate(batches) - } + completed_batches = 0 + total_batches = len(batches) - completed_batches = 0 - total_batches = len(batches) + # Process completed batches as they finish + for future in as_completed(future_to_batch): + batch_idx = future_to_batch[future] + try: + result = future.result() + completed_batches += 1 - # Process completed batches as they finish - for future in as_completed(future_to_batch): - batch_idx = future_to_batch[future] - try: - result = future.result() - completed_batches += 1 + # Extract stream counts from result + if isinstance(result, str): + try: + created_match = re.search(r"(\d+) created", result) + updated_match = re.search(r"(\d+) updated", result) + if created_match and updated_match: + created_count = int(created_match.group(1)) + updated_count = int(updated_match.group(1)) + streams_created += created_count + streams_updated += updated_count + except (AttributeError, ValueError): + pass - # Extract stream counts from result - if isinstance(result, str): - try: - created_match = re.search(r"(\d+) created", result) - updated_match = re.search(r"(\d+) updated", result) - if created_match and updated_match: - created_count = int(created_match.group(1)) - updated_count = int(updated_match.group(1)) - streams_created += created_count - streams_updated += updated_count - except (AttributeError, ValueError): - pass + # Send progress update + progress = int((completed_batches / total_batches) * 100) + current_elapsed = time.time() - start_time - # Send progress update - progress = int((completed_batches / total_batches) * 100) - current_elapsed = time.time() - start_time + if progress > 0: + estimated_total = (current_elapsed / progress) * 100 + time_remaining = max(0, estimated_total - current_elapsed) + else: + time_remaining = 0 - if progress > 0: - estimated_total = (current_elapsed / progress) * 100 - time_remaining = max(0, estimated_total - current_elapsed) - else: - time_remaining = 0 + send_m3u_update( + account_id, + "parsing", + progress, + elapsed_time=current_elapsed, + time_remaining=time_remaining, + streams_processed=streams_created + streams_updated, + ) - send_m3u_update( - account_id, - "parsing", - progress, - elapsed_time=current_elapsed, - time_remaining=time_remaining, - streams_processed=streams_created + streams_updated, - ) + logger.debug(f"XC thread batch {completed_batches}/{total_batches} completed") - logger.debug(f"XC thread batch {completed_batches}/{total_batches} completed") + except Exception as e: + logger.error(f"Error in XC thread batch {batch_idx}: {str(e)}") + completed_batches += 1 # Still count it to avoid hanging - except Exception as e: - logger.error(f"Error in XC thread batch {batch_idx}: {str(e)}") - completed_batches += 1 # Still count it to avoid hanging - - logger.info(f"XC thread-based processing completed for account {account_id}") + logger.info(f"XC thread-based processing completed for account {account_id}") # Ensure all database transactions are committed before cleanup logger.info( @@ -2673,7 +2672,16 @@ def refresh_single_m3u_account(account_id): release_task_lock("refresh_single_m3u_account", account_id) # Aggressive garbage collection - del existing_groups, extinf_data, groups, batches + # Only delete variables if they exist + if 'existing_groups' in locals(): + del existing_groups + if 'extinf_data' in locals(): + del extinf_data + if 'groups' in locals(): + del groups + if 'batches' in locals(): + del batches + from core.utils import cleanup_memory cleanup_memory(log_usage=True, force_collection=True)