From c24da847fcaf759ea34f149744172639138a18e1 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Fri, 29 Aug 2025 19:18:25 -0500 Subject: [PATCH] Get all live streams at once and filter by category. --- apps/m3u/tasks.py | 87 +++++++++++++++++++++++++++++++++++++++----- core/xtream_codes.py | 28 ++++++++++++++ 2 files changed, 105 insertions(+), 10 deletions(-) diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py index b5c6f46f..0f75be39 100644 --- a/apps/m3u/tasks.py +++ b/apps/m3u/tasks.py @@ -315,6 +315,67 @@ def process_groups(account, groups): ChannelGroupM3UAccount.objects.bulk_create(relations, ignore_conflicts=True) +def collect_xc_streams(account_id, enabled_groups): + """Collect all XC streams in a single API call and filter by enabled groups.""" + account = M3UAccount.objects.get(id=account_id) + all_streams = [] + + # Create a mapping from category_id to group info for filtering + enabled_category_ids = {} + for group_name, props in enabled_groups.items(): + if "xc_id" in props: + enabled_category_ids[str(props["xc_id"])] = { + "name": group_name, + "props": props + } + + try: + with XCClient( + account.server_url, + account.username, + account.password, + account.get_user_agent(), + ) as xc_client: + + # Fetch ALL live streams in a single API call (much more efficient) + logger.info("Fetching ALL live streams from XC provider...") + all_xc_streams = xc_client.get_all_live_streams() # Get all streams without category filter + + if not all_xc_streams: + logger.warning("No live streams returned from XC provider") + return [] + + logger.info(f"Retrieved {len(all_xc_streams)} total live streams from provider") + + # Filter streams based on enabled categories + filtered_count = 0 + for stream in all_xc_streams: + # Get the category_id for this stream + category_id = str(stream.get("category_id", "")) + + # Only include streams from enabled categories + if category_id in enabled_category_ids: + group_info = enabled_category_ids[category_id] + + # Convert XC stream to our standard format + stream_data = { + "name": stream["name"], + "url": xc_client.get_stream_url(stream["stream_id"]), + "attributes": { + "tvg-id": stream.get("epg_channel_id", ""), + "tvg-logo": stream.get("stream_icon", ""), + "group-title": group_info["name"] + } + } + all_streams.append(stream_data) + filtered_count += 1 + + except Exception as e: + logger.error(f"Failed to fetch XC streams: {str(e)}") + return [] + + logger.info(f"Filtered {filtered_count} streams from {len(enabled_category_ids)} enabled categories") + return all_streams def process_xc_category(account_id, batch, groups, hash_keys): """Legacy Celery task wrapper - calls the direct function.""" return process_xc_category_direct(account_id, batch, groups, hash_keys) @@ -1861,24 +1922,30 @@ def refresh_single_m3u_account(account_id): f"Filtered {len(filtered_groups)} groups for processing: {filtered_groups}" ) - # Batch the groups - use single group per batch for maximum parallelism - GROUP_BATCH_SIZE = 4 # Process 4 groups per batch for maximum XC parallelism - filtered_groups_list = list(filtered_groups.items()) + # Collect all XC streams in a single API call and filter by enabled categories + logger.info("Fetching all XC streams from provider and filtering by enabled categories...") + all_xc_streams = collect_xc_streams(account_id, filtered_groups) + + if not all_xc_streams: + logger.warning("No streams collected from XC groups") + return f"No streams found for XC account {account_id}", None + + # Now batch by stream count (like standard M3U processing) batches = [ - dict(filtered_groups_list[i : i + GROUP_BATCH_SIZE]) - for i in range(0, len(filtered_groups_list), GROUP_BATCH_SIZE) + all_xc_streams[i : i + BATCH_SIZE] + for i in range(0, len(all_xc_streams), BATCH_SIZE) ] - logger.info(f"Created {len(batches)} batches for XC processing") + logger.info(f"Processing {len(all_xc_streams)} XC streams in {len(batches)} batches") - # Use threading for XC processing instead of Celery group - increase parallelism + # Use threading for XC stream processing - now with consistent batch sizes max_workers = min(4, len(batches)) - logger.debug(f"Using {max_workers} threads for XC processing") + logger.debug(f"Using {max_workers} threads for XC stream processing") with ThreadPoolExecutor(max_workers=max_workers) as executor: - # Submit XC batch processing tasks using direct functions (now thread-safe) + # Submit stream batch processing tasks (reuse standard M3U processing) future_to_batch = { - executor.submit(process_xc_category_direct, account_id, batch, existing_groups, hash_keys): i + executor.submit(process_m3u_batch_direct, account_id, batch, existing_groups, hash_keys): i for i, batch in enumerate(batches) } diff --git a/core/xtream_codes.py b/core/xtream_codes.py index 469f3a9c..6a30b5d4 100644 --- a/core/xtream_codes.py +++ b/core/xtream_codes.py @@ -192,6 +192,34 @@ class Client: logger.error(traceback.format_exc()) raise + def get_all_live_streams(self): + """Get all live streams (no category filter)""" + try: + if not self.server_info: + self.authenticate() + + endpoint = "player_api.php" + params = { + 'username': self.username, + 'password': self.password, + 'action': 'get_live_streams' + # No category_id = get all streams + } + + streams = self._make_request(endpoint, params) + + if not isinstance(streams, list): + error_msg = f"Invalid streams response for all live streams: {streams}" + logger.error(error_msg) + raise ValueError(error_msg) + + logger.info(f"Successfully retrieved {len(streams)} total live streams") + return streams + except Exception as e: + logger.error(f"Failed to get all live streams: {str(e)}") + logger.error(traceback.format_exc()) + raise + def get_stream_url(self, stream_id): """Get the playback URL for a stream""" return f"{self.server_url}/live/{self.username}/{self.password}/{stream_id}.ts"