From d2b6096570db9cc9e809763f125af9259232633a Mon Sep 17 00:00:00 2001
From: SergeantPanda
Date: Fri, 29 Aug 2025 16:39:57 -0500
Subject: [PATCH] Changes to XC processing for faster parsing.

---
 apps/m3u/tasks.py | 59 ++++++++++++++---------------------------------
 1 file changed, 17 insertions(+), 42 deletions(-)

diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py
index af995d96..b5c6f46f 100644
--- a/apps/m3u/tasks.py
+++ b/apps/m3u/tasks.py
@@ -425,17 +425,16 @@ def process_xc_category_direct(account_id, batch, groups, hash_keys):
                         for key, value in stream_props.items():
                             setattr(obj, key, value)
                         obj.last_seen = timezone.now()
-                        obj.updated_at = (
-                            timezone.now()
-                        )  # Update timestamp only for changed streams
+                        obj.updated_at = timezone.now()  # Update timestamp only for changed streams
                         streams_to_update.append(obj)
-                        del existing_streams[stream_hash]
                     else:
                         # Always update last_seen, even if nothing else changed
                         obj.last_seen = timezone.now()
                         # Don't update updated_at for unchanged streams
                         streams_to_update.append(obj)
-                        existing_streams[stream_hash] = obj
+
+                    # Remove from existing_streams since we've processed it
+                    del existing_streams[stream_hash]
                 else:
                     stream_props["last_seen"] = timezone.now()
                     stream_props["updated_at"] = (
@@ -447,41 +446,20 @@ def process_xc_category_direct(account_id, batch, groups, hash_keys):
 
             with transaction.atomic():
                 if streams_to_create:
                     Stream.objects.bulk_create(streams_to_create, ignore_conflicts=True)
+
                 if streams_to_update:
-                    # We need to split the bulk update to correctly handle updated_at
-                    # First, get the subset of streams that have content changes
-                    changed_streams = [
-                        s
-                        for s in streams_to_update
-                        if hasattr(s, "updated_at") and s.updated_at
-                    ]
-                    unchanged_streams = [
-                        s
-                        for s in streams_to_update
-                        if not hasattr(s, "updated_at") or not s.updated_at
-                    ]
-
-                    # Update changed streams with all fields including updated_at
-                    if changed_streams:
-                        Stream.objects.bulk_update(
-                            changed_streams,
-                            {
-                                key
-                                for key in stream_props.keys()
-                                if key not in ["m3u_account", "stream_hash"]
-                                and key not in hash_keys
-                            }
-                            | {"last_seen", "updated_at"},
-                        )
-
-                    # Update unchanged streams with only last_seen
-                    if unchanged_streams:
-                        Stream.objects.bulk_update(unchanged_streams, ["last_seen"])
+                    # Simplified bulk update for better performance
+                    Stream.objects.bulk_update(
+                        streams_to_update,
+                        ['name', 'url', 'logo_url', 'tvg_id', 'custom_properties', 'last_seen', 'updated_at'],
+                        batch_size=150  # Smaller batch size for XC processing
+                    )
 
+                # Update last_seen for any remaining existing streams that weren't processed
                 if len(existing_streams.keys()) > 0:
                     Stream.objects.bulk_update(existing_streams.values(), ["last_seen"])
         except Exception as e:
-            logger.error(f"Bulk create failed for XC streams: {str(e)}")
+            logger.error(f"Bulk operation failed for XC streams: {str(e)}")
 
         retval = f"Batch processed: {len(streams_to_create)} created, {len(streams_to_update)} updated."
@@ -496,9 +474,6 @@ def process_xc_category_direct(account_id, batch, groups, hash_keys):
         del streams_to_create, streams_to_update, stream_hashes, existing_streams
         gc.collect()
 
-        # Clean up database connections for threading
-        connections.close_all()
-
         return retval
 
 
@@ -1886,8 +1861,8 @@ def refresh_single_m3u_account(account_id):
                     f"Filtered {len(filtered_groups)} groups for processing: {filtered_groups}"
                 )
 
-                # Batch the groups - use reasonable group batch size for XC processing
-                GROUP_BATCH_SIZE = 2  # Process 2 groups per batch for XC
+                # Batch the groups - use smaller batches for maximum XC parallelism
+                GROUP_BATCH_SIZE = 4  # Process 4 groups per batch for maximum XC parallelism
                 filtered_groups_list = list(filtered_groups.items())
                 batches = [
                     dict(filtered_groups_list[i : i + GROUP_BATCH_SIZE])
@@ -1896,8 +1871,8 @@
 
                 logger.info(f"Created {len(batches)} batches for XC processing")
 
-                # Use threading for XC processing instead of Celery group
-                max_workers = min(2, len(batches))
+                # Use threading for XC processing instead of Celery group - increase parallelism
+                max_workers = min(4, len(batches))
                 logger.debug(f"Using {max_workers} threads for XC processing")
 
                 with ThreadPoolExecutor(max_workers=max_workers) as executor:
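
Notes on the update path: the patch replaces the per-field bookkeeping (partitioning streams into changed/unchanged sets and computing a field set per batch) with one bulk_update over a fixed column list. A minimal sketch of that pattern, assuming a Django Stream model with the listed fields; the flush_updates helper name and the models import path are hypothetical, only the field list and batch_size come from the patch:

    from django.db import transaction

    from apps.m3u.models import Stream  # model referenced by the patch; import path assumed

    # Fixed column list: every row in the batch writes the same columns, so the
    # ORM can emit one UPDATE ... CASE statement per batch instead of first
    # splitting streams into "changed" and "unchanged" sets.
    XC_UPDATE_FIELDS = [
        "name", "url", "logo_url", "tvg_id",
        "custom_properties", "last_seen", "updated_at",
    ]

    def flush_updates(streams_to_update):
        """Persist pending stream updates in fixed-size batches."""
        if not streams_to_update:
            return
        with transaction.atomic():
            # batch_size=150 mirrors the value chosen in the patch; smaller
            # batches keep each generated SQL statement short.
            Stream.objects.bulk_update(
                streams_to_update, XC_UPDATE_FIELDS, batch_size=150
            )

Unchanged streams ride along in the same call: their content columns are rewritten with their existing values, and because the patch never sets updated_at on those objects, the old timestamp is written back unchanged. That trades some extra write volume for skipping the two list comprehensions and the per-batch field-set computation.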
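
The second half of the patch doubles both the group batch size (2 to 4) and the thread count (2 to 4). A sketch of the resulting fan-out, with process_batch standing in for process_xc_category_direct; the run_xc_batches helper and the empty-batch guard are additions here, not part of the patch:

    from concurrent.futures import ThreadPoolExecutor

    GROUP_BATCH_SIZE = 4  # groups per batch, as set in the patch

    def run_xc_batches(filtered_groups, process_batch):
        """Split the group dict into batches of four and run up to four at once."""
        items = list(filtered_groups.items())
        batches = [
            dict(items[i : i + GROUP_BATCH_SIZE])
            for i in range(0, len(items), GROUP_BATCH_SIZE)
        ]
        if not batches:  # guard: ThreadPoolExecutor rejects max_workers=0
            return []
        max_workers = min(4, len(batches))  # matches the patch
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(process_batch, batch) for batch in batches]
            return [future.result() for future in futures]

The removal of connections.close_all() fits the same pattern: Django keeps one database connection per thread, and since the executor reuses its worker threads across batches, leaving connections open avoids a reconnect on every batch; presumably that, plus the coarser batches, is where the "faster parsing" in the subject line comes from.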