Get all live streams at once and filter by category.

SergeantPanda 2025-08-29 19:18:25 -05:00
parent d2b6096570
commit c24da847fc
2 changed files with 105 additions and 10 deletions
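
In short: instead of one API request per enabled category, the refresh now issues a single bulk request and filters the result locally. A minimal runnable sketch of that pattern (fetch_all and enabled_ids are hypothetical stand-ins, not names from this commit):

def fetch_all():  # hypothetical stand-in for the single provider call
    return [{"name": "A", "category_id": 7}, {"name": "B", "category_id": 9}]

enabled_ids = {"7"}  # category ids of enabled groups, kept as strings

# One request, then local filtering (vs. one request per category):
kept = [s for s in fetch_all() if str(s.get("category_id", "")) in enabled_ids]
assert [s["name"] for s in kept] == ["A"]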


@@ -315,6 +315,67 @@ def process_groups(account, groups):
         ChannelGroupM3UAccount.objects.bulk_create(relations, ignore_conflicts=True)
 
 
+def collect_xc_streams(account_id, enabled_groups):
+    """Collect all XC streams in a single API call and filter by enabled groups."""
+    account = M3UAccount.objects.get(id=account_id)
+    all_streams = []
+
+    # Create a mapping from category_id to group info for filtering
+    enabled_category_ids = {}
+    for group_name, props in enabled_groups.items():
+        if "xc_id" in props:
+            enabled_category_ids[str(props["xc_id"])] = {
+                "name": group_name,
+                "props": props
+            }
+
+    try:
+        with XCClient(
+            account.server_url,
+            account.username,
+            account.password,
+            account.get_user_agent(),
+        ) as xc_client:
+            # Fetch ALL live streams in a single API call (much more efficient)
+            logger.info("Fetching ALL live streams from XC provider...")
+            all_xc_streams = xc_client.get_all_live_streams()  # Get all streams without category filter
+
+            if not all_xc_streams:
+                logger.warning("No live streams returned from XC provider")
+                return []
+
+            logger.info(f"Retrieved {len(all_xc_streams)} total live streams from provider")
+
+            # Filter streams based on enabled categories
+            filtered_count = 0
+            for stream in all_xc_streams:
+                # Get the category_id for this stream
+                category_id = str(stream.get("category_id", ""))
+
+                # Only include streams from enabled categories
+                if category_id in enabled_category_ids:
+                    group_info = enabled_category_ids[category_id]
+
+                    # Convert XC stream to our standard format
+                    stream_data = {
+                        "name": stream["name"],
+                        "url": xc_client.get_stream_url(stream["stream_id"]),
+                        "attributes": {
+                            "tvg-id": stream.get("epg_channel_id", ""),
+                            "tvg-logo": stream.get("stream_icon", ""),
+                            "group-title": group_info["name"]
+                        }
+                    }
+                    all_streams.append(stream_data)
+                    filtered_count += 1
+    except Exception as e:
+        logger.error(f"Failed to fetch XC streams: {str(e)}")
+        return []
+
+    logger.info(f"Filtered {filtered_count} streams from {len(enabled_category_ids)} enabled categories")
+    return all_streams
+
+
 def process_xc_category(account_id, batch, groups, hash_keys):
     """Legacy Celery task wrapper - calls the direct function."""
     return process_xc_category_direct(account_id, batch, groups, hash_keys)
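
For context, a hypothetical call to the new helper, with the enabled_groups shape inferred from the filtering loop above (only groups whose props carry an "xc_id" enter the category map):

enabled_groups = {
    "Sports": {"xc_id": 7},   # mapped as str(7) -> "Sports"
    "Local": {},              # no "xc_id", so never matched
}
streams = collect_xc_streams(account_id=1, enabled_groups=enabled_groups)  # assumes an M3UAccount with id=1
# each entry: {"name": ..., "url": ..., "attributes": {"tvg-id": ..., "tvg-logo": ..., "group-title": "Sports"}}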
@@ -1861,24 +1922,30 @@ def refresh_single_m3u_account(account_id):
            f"Filtered {len(filtered_groups)} groups for processing: {filtered_groups}"
         )
 
-        # Batch the groups - use single group per batch for maximum parallelism
-        GROUP_BATCH_SIZE = 4  # Process 4 groups per batch for maximum XC parallelism
-        filtered_groups_list = list(filtered_groups.items())
+        # Collect all XC streams in a single API call and filter by enabled categories
+        logger.info("Fetching all XC streams from provider and filtering by enabled categories...")
+        all_xc_streams = collect_xc_streams(account_id, filtered_groups)
+
+        if not all_xc_streams:
+            logger.warning("No streams collected from XC groups")
+            return f"No streams found for XC account {account_id}", None
+
+        # Now batch by stream count (like standard M3U processing)
         batches = [
-            dict(filtered_groups_list[i : i + GROUP_BATCH_SIZE])
-            for i in range(0, len(filtered_groups_list), GROUP_BATCH_SIZE)
+            all_xc_streams[i : i + BATCH_SIZE]
+            for i in range(0, len(all_xc_streams), BATCH_SIZE)
         ]
-        logger.info(f"Created {len(batches)} batches for XC processing")
+        logger.info(f"Processing {len(all_xc_streams)} XC streams in {len(batches)} batches")
 
-        # Use threading for XC processing instead of Celery group - increase parallelism
+        # Use threading for XC stream processing - now with consistent batch sizes
         max_workers = min(4, len(batches))
-        logger.debug(f"Using {max_workers} threads for XC processing")
+        logger.debug(f"Using {max_workers} threads for XC stream processing")
 
         with ThreadPoolExecutor(max_workers=max_workers) as executor:
-            # Submit XC batch processing tasks using direct functions (now thread-safe)
+            # Submit stream batch processing tasks (reuse standard M3U processing)
             future_to_batch = {
-                executor.submit(process_xc_category_direct, account_id, batch, existing_groups, hash_keys): i
+                executor.submit(process_m3u_batch_direct, account_id, batch, existing_groups, hash_keys): i
                 for i, batch in enumerate(batches)
             }
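
The batching above is the same slice-into-fixed-chunks pattern used for standard M3U playlists; BATCH_SIZE is defined elsewhere in this module. A self-contained sketch, with a dummy worker standing in for process_m3u_batch_direct and an illustrative BATCH_SIZE value:

from concurrent.futures import ThreadPoolExecutor, as_completed

BATCH_SIZE = 500  # illustrative; the real constant lives elsewhere in this module
items = list(range(1234))
batches = [items[i : i + BATCH_SIZE] for i in range(0, len(items), BATCH_SIZE)]

def process_batch(batch):
    # stand-in for process_m3u_batch_direct
    return len(batch)

max_workers = min(4, len(batches))  # same worker cap as the code above
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    future_to_batch = {executor.submit(process_batch, b): i for i, b in enumerate(batches)}
    for future in as_completed(future_to_batch):
        print(f"batch {future_to_batch[future]}: {future.result()} items")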