Changes to XC processing for faster parsing.

This commit is contained in:
SergeantPanda 2025-08-29 16:39:57 -05:00
parent e3f988b071
commit d2b6096570
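The core of the change: the per-batch flush no longer splits streams into changed and unchanged sets for two separate bulk_update calls; it now issues a single bulk_update over a fixed field list with a smaller batch size, and the refresh task fans groups out across more worker threads. A minimal sketch of the consolidated flush, assuming a Django Stream model exposing the fields named in the diff (flush_batch is a hypothetical helper used only for illustration):

# Minimal sketch of the consolidated flush, assuming a Django `Stream` model
# with the fields listed below (names mirror the new code in the diff);
# `flush_batch` is a hypothetical helper used only for illustration.
from django.db import transaction

def flush_batch(streams_to_create, streams_to_update):
    with transaction.atomic():
        if streams_to_create:
            Stream.objects.bulk_create(streams_to_create, ignore_conflicts=True)
        if streams_to_update:
            # One bulk_update over a fixed field list replaces the old
            # changed/unchanged split; batch_size caps the rows per statement.
            Stream.objects.bulk_update(
                streams_to_update,
                ["name", "url", "logo_url", "tvg_id",
                 "custom_properties", "last_seen", "updated_at"],
                batch_size=150,
            )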


@@ -425,17 +425,16 @@ def process_xc_category_direct(account_id, batch, groups, hash_keys):
for key, value in stream_props.items():
setattr(obj, key, value)
obj.last_seen = timezone.now()
- obj.updated_at = (
-     timezone.now()
- ) # Update timestamp only for changed streams
+ obj.updated_at = timezone.now() # Update timestamp only for changed streams
streams_to_update.append(obj)
del existing_streams[stream_hash]
else:
# Always update last_seen, even if nothing else changed
obj.last_seen = timezone.now()
# Don't update updated_at for unchanged streams
streams_to_update.append(obj)
- existing_streams[stream_hash] = obj
+ # Remove from existing_streams since we've processed it
+ del existing_streams[stream_hash]
else:
stream_props["last_seen"] = timezone.now()
stream_props["updated_at"] = (
@@ -447,41 +446,20 @@ def process_xc_category_direct(account_id, batch, groups, hash_keys):
with transaction.atomic():
if streams_to_create:
Stream.objects.bulk_create(streams_to_create, ignore_conflicts=True)
if streams_to_update:
- # We need to split the bulk update to correctly handle updated_at
- # First, get the subset of streams that have content changes
- changed_streams = [
-     s
-     for s in streams_to_update
-     if hasattr(s, "updated_at") and s.updated_at
- ]
- unchanged_streams = [
-     s
-     for s in streams_to_update
-     if not hasattr(s, "updated_at") or not s.updated_at
- ]
- # Update changed streams with all fields including updated_at
- if changed_streams:
-     Stream.objects.bulk_update(
-         changed_streams,
-         {
-             key
-             for key in stream_props.keys()
-             if key not in ["m3u_account", "stream_hash"]
-             and key not in hash_keys
-         }
-         | {"last_seen", "updated_at"},
-     )
- # Update unchanged streams with only last_seen
- if unchanged_streams:
-     Stream.objects.bulk_update(unchanged_streams, ["last_seen"])
+ # Simplified bulk update for better performance
+ Stream.objects.bulk_update(
+     streams_to_update,
+     ['name', 'url', 'logo_url', 'tvg_id', 'custom_properties', 'last_seen', 'updated_at'],
+     batch_size=150 # Smaller batch size for XC processing
+ )
# Update last_seen for any remaining existing streams that weren't processed
if len(existing_streams.keys()) > 0:
Stream.objects.bulk_update(existing_streams.values(), ["last_seen"])
except Exception as e:
- logger.error(f"Bulk create failed for XC streams: {str(e)}")
+ logger.error(f"Bulk operation failed for XC streams: {str(e)}")
retval = f"Batch processed: {len(streams_to_create)} created, {len(streams_to_update)} updated."
@@ -496,9 +474,6 @@ def process_xc_category_direct(account_id, batch, groups, hash_keys):
del streams_to_create, streams_to_update, stream_hashes, existing_streams
gc.collect()
- # Clean up database connections for threading
- connections.close_all()
return retval
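The remaining hunks raise the fan-out: groups are batched four at a time and dispatched to up to four worker threads. A minimal sketch of that threaded dispatch, assuming process_xc_category_direct and filtered_groups as in the surrounding code (run_xc_batches is a hypothetical wrapper used only for illustration):

# Minimal sketch of the threaded dispatch tuned below; process_xc_category_direct
# and filtered_groups are assumed from the surrounding code, and run_xc_batches
# is a hypothetical wrapper used only for illustration.
from concurrent.futures import ThreadPoolExecutor

GROUP_BATCH_SIZE = 4  # groups per batch, matching the new value below

def run_xc_batches(account_id, filtered_groups, hash_keys):
    items = list(filtered_groups.items())
    batches = [dict(items[i:i + GROUP_BATCH_SIZE])
               for i in range(0, len(items), GROUP_BATCH_SIZE)]
    max_workers = min(4, len(batches))  # thread cap, matching the new value below
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(process_xc_category_direct,
                            account_id, batch, filtered_groups, hash_keys)
            for batch in batches
        ]
        # Collect results so any worker exception surfaces here
        return [f.result() for f in futures]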
@@ -1886,8 +1861,8 @@ def refresh_single_m3u_account(account_id):
f"Filtered {len(filtered_groups)} groups for processing: {filtered_groups}"
)
- # Batch the groups - use reasonable group batch size for XC processing
- GROUP_BATCH_SIZE = 2 # Process 2 groups per batch for XC
+ # Batch the groups - use small group batches for maximum parallelism
+ GROUP_BATCH_SIZE = 4 # Process 4 groups per batch for maximum XC parallelism
filtered_groups_list = list(filtered_groups.items())
batches = [
dict(filtered_groups_list[i : i + GROUP_BATCH_SIZE])
@@ -1896,8 +1871,8 @@ def refresh_single_m3u_account(account_id):
logger.info(f"Created {len(batches)} batches for XC processing")
- # Use threading for XC processing instead of Celery group
- max_workers = min(2, len(batches))
+ # Use threading for XC processing instead of Celery group - increase parallelism
+ max_workers = min(4, len(batches))
logger.debug(f"Using {max_workers} threads for XC processing")
with ThreadPoolExecutor(max_workers=max_workers) as executor: