From 92d09eea3b8c0034fc5cd65caa31ba359f3f957f Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Tue, 6 May 2025 18:00:37 -0500 Subject: [PATCH] Enhance XCClient with robust error handling and user agent management; improve M3U processing logic for better error reporting and validation. --- apps/m3u/tasks.py | 414 ++++++++++++++++++++++++++++++++----------- core/xtream_codes.py | 171 +++++++++++++++--- 2 files changed, 456 insertions(+), 129 deletions(-) diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py index 19172f85..32bae361 100644 --- a/apps/m3u/tasks.py +++ b/apps/m3u/tasks.py @@ -19,7 +19,7 @@ from django.utils import timezone import time import json from core.utils import RedisClient, acquire_task_lock, release_task_lock -from core.models import CoreSettings +from core.models import CoreSettings, UserAgent from asgiref.sync import async_to_sync from core.xtream_codes import Client as XCClient @@ -242,10 +242,13 @@ def process_groups(account, groups): relations = [] for group in group_objs: + # Ensure we include the xc_id in the custom_properties + custom_props = groups.get(group.name, {}) relations.append(ChannelGroupM3UAccount( channel_group=group, m3u_account=account, - custom_properties=json.dumps(groups[group.name]), + custom_properties=json.dumps(custom_props), + enabled=True, # Default to enabled )) ChannelGroupM3UAccount.objects.bulk_create( @@ -261,84 +264,117 @@ def process_xc_category(account_id, batch, groups, hash_keys): streams_to_update = [] stream_hashes = {} - xc_client = XCClient(account.server_url, account.username, account.password, account.get_user_agent()) - for group_name, props in batch.items(): - streams = xc_client.get_live_category_streams(props['xc_id']) - for stream in streams: - name = stream["name"] - url = xc_client.get_stream_url(stream["stream_id"]) - tvg_id = stream["epg_channel_id"] - tvg_logo = stream["stream_icon"] - group_title = group_name - - stream_hash = Stream.generate_hash_key(name, url, tvg_id, hash_keys) - stream_props = { - "name": name, - "url": url, - "logo_url": tvg_logo, - "tvg_id": tvg_id, - "m3u_account": account, - "channel_group_id": int(groups.get(group_title)), - "stream_hash": stream_hash, - "custom_properties": json.dumps(stream), - } - - if stream_hash not in stream_hashes: - stream_hashes[stream_hash] = stream_props - - existing_streams = {s.stream_hash: s for s in Stream.objects.filter(stream_hash__in=stream_hashes.keys())} - - for stream_hash, stream_props in stream_hashes.items(): - if stream_hash in existing_streams: - obj = existing_streams[stream_hash] - existing_attr = {field.name: getattr(obj, field.name) for field in Stream._meta.fields if field != 'channel_group_id'} - changed = any(existing_attr[key] != value for key, value in stream_props.items() if key != 'channel_group_id') - - if changed: - for key, value in stream_props.items(): - setattr(obj, key, value) - obj.last_seen = timezone.now() - obj.updated_at = timezone.now() # Update timestamp only for changed streams - streams_to_update.append(obj) - del existing_streams[stream_hash] - else: - # Always update last_seen, even if nothing else changed - obj.last_seen = timezone.now() - # Don't update updated_at for unchanged streams - streams_to_update.append(obj) - existing_streams[stream_hash] = obj - else: - stream_props["last_seen"] = timezone.now() - stream_props["updated_at"] = timezone.now() # Set initial updated_at for new streams - streams_to_create.append(Stream(**stream_props)) - try: - with transaction.atomic(): - if streams_to_create: - Stream.objects.bulk_create(streams_to_create, ignore_conflicts=True) - if streams_to_update: - # We need to split the bulk update to correctly handle updated_at - # First, get the subset of streams that have content changes - changed_streams = [s for s in streams_to_update if hasattr(s, 'updated_at') and s.updated_at] - unchanged_streams = [s for s in streams_to_update if not hasattr(s, 'updated_at') or not s.updated_at] + xc_client = XCClient(account.server_url, account.username, account.password, account.get_user_agent()) - # Update changed streams with all fields including updated_at - if changed_streams: - Stream.objects.bulk_update( - changed_streams, - {key for key in stream_props.keys() if key not in ["m3u_account", "stream_hash"] and key not in hash_keys} | {"last_seen", "updated_at"} - ) + # Log the batch details to help with debugging + logger.debug(f"Processing XC batch: {batch}") - # Update unchanged streams with only last_seen - if unchanged_streams: - Stream.objects.bulk_update(unchanged_streams, ["last_seen"]) + for group_name, props in batch.items(): + # Check if we have a valid xc_id for this group + if 'xc_id' not in props: + logger.error(f"Missing xc_id for group {group_name} in batch {batch}") + continue + + # Get actual group ID from the mapping + group_id = groups.get(group_name) + if not group_id: + logger.error(f"Group {group_name} not found in enabled groups") + continue + + try: + logger.info(f"Fetching streams for XC category: {group_name} (ID: {props['xc_id']})") + streams = xc_client.get_live_category_streams(props['xc_id']) + + if not streams: + logger.warning(f"No streams found for XC category {group_name} (ID: {props['xc_id']})") + continue + + logger.info(f"Found {len(streams)} streams for category {group_name}") + + for stream in streams: + name = stream["name"] + url = xc_client.get_stream_url(stream["stream_id"]) + tvg_id = stream.get("epg_channel_id", "") + tvg_logo = stream.get("stream_icon", "") + group_title = group_name + + stream_hash = Stream.generate_hash_key(name, url, tvg_id, hash_keys) + stream_props = { + "name": name, + "url": url, + "logo_url": tvg_logo, + "tvg_id": tvg_id, + "m3u_account": account, + "channel_group_id": int(group_id), + "stream_hash": stream_hash, + "custom_properties": json.dumps(stream), + } + + if stream_hash not in stream_hashes: + stream_hashes[stream_hash] = stream_props + except Exception as e: + logger.error(f"Error processing XC category {group_name} (ID: {props['xc_id']}): {str(e)}") + continue + + # Process all found streams + existing_streams = {s.stream_hash: s for s in Stream.objects.filter(stream_hash__in=stream_hashes.keys())} + + for stream_hash, stream_props in stream_hashes.items(): + if stream_hash in existing_streams: + obj = existing_streams[stream_hash] + existing_attr = {field.name: getattr(obj, field.name) for field in Stream._meta.fields if field != 'channel_group_id'} + changed = any(existing_attr[key] != value for key, value in stream_props.items() if key != 'channel_group_id') + + if changed: + for key, value in stream_props.items(): + setattr(obj, key, value) + obj.last_seen = timezone.now() + obj.updated_at = timezone.now() # Update timestamp only for changed streams + streams_to_update.append(obj) + del existing_streams[stream_hash] + else: + # Always update last_seen, even if nothing else changed + obj.last_seen = timezone.now() + # Don't update updated_at for unchanged streams + streams_to_update.append(obj) + existing_streams[stream_hash] = obj + else: + stream_props["last_seen"] = timezone.now() + stream_props["updated_at"] = timezone.now() # Set initial updated_at for new streams + streams_to_create.append(Stream(**stream_props)) + + try: + with transaction.atomic(): + if streams_to_create: + Stream.objects.bulk_create(streams_to_create, ignore_conflicts=True) + if streams_to_update: + # We need to split the bulk update to correctly handle updated_at + # First, get the subset of streams that have content changes + changed_streams = [s for s in streams_to_update if hasattr(s, 'updated_at') and s.updated_at] + unchanged_streams = [s for s in streams_to_update if not hasattr(s, 'updated_at') or not s.updated_at] + + # Update changed streams with all fields including updated_at + if changed_streams: + Stream.objects.bulk_update( + changed_streams, + {key for key in stream_props.keys() if key not in ["m3u_account", "stream_hash"] and key not in hash_keys} | {"last_seen", "updated_at"} + ) + + # Update unchanged streams with only last_seen + if unchanged_streams: + Stream.objects.bulk_update(unchanged_streams, ["last_seen"]) + + if len(existing_streams.keys()) > 0: + Stream.objects.bulk_update(existing_streams.values(), ["last_seen"]) + except Exception as e: + logger.error(f"Bulk create failed for XC streams: {str(e)}") + + retval = f"Batch processed: {len(streams_to_create)} created, {len(streams_to_update)} updated." - if len(existing_streams.keys()) > 0: - Stream.objects.bulk_update(existing_streams.values(), ["last_seen"]) except Exception as e: - logger.error(f"Bulk create failed: {str(e)}") - - retval = f"Batch processed: {len(streams_to_create)} created, {len(streams_to_update)} updated." + logger.error(f"XC category processing error: {str(e)}") + retval = f"Error processing XC batch: {str(e)}" # Aggressive garbage collection del streams_to_create, streams_to_update, stream_hashes, existing_streams @@ -493,26 +529,146 @@ def refresh_m3u_groups(account_id, use_cache=False, full_refresh=False): extinf_data = [] groups = {"Default Group": {}} - xc_client = None if account.account_type == M3UAccount.Types.XC: - xc_client = XCClient(account.server_url, account.username, account.password, account.get_user_agent()) - try: - xc_client.authenticate() - except Exception as e: - error_msg = f"Failed to authenticate with XC server: {str(e)}" + # Log detailed information about the account + logger.info(f"Processing XC account {account_id} with URL: {account.server_url}") + logger.info(f"Username: {account.username}, Has password: {'Yes' if account.password else 'No'}") + + # Validate required fields + if not account.server_url: + error_msg = "Missing server URL for Xtream Codes account" logger.error(error_msg) account.status = M3UAccount.Status.ERROR account.last_message = error_msg account.save(update_fields=['status', 'last_message']) send_m3u_update(account_id, "processing_groups", 100, status="error", error=error_msg) release_task_lock('refresh_m3u_account_groups', account_id) - return f"M3UAccount with ID={account_id} failed to authenticate with XC server.", None + return error_msg, None - xc_categories = xc_client.get_live_categories() - for category in xc_categories: - groups[category["category_name"]] = { - "xc_id": category["category_id"], - } + if not account.username or not account.password: + error_msg = "Missing username or password for Xtream Codes account" + logger.error(error_msg) + account.status = M3UAccount.Status.ERROR + account.last_message = error_msg + account.save(update_fields=['status', 'last_message']) + send_m3u_update(account_id, "processing_groups", 100, status="error", error=error_msg) + release_task_lock('refresh_m3u_account_groups', account_id) + return error_msg, None + + try: + # Ensure server URL is properly formatted + server_url = account.server_url.rstrip('/') + if not (server_url.startswith('http://') or server_url.startswith('https://')): + server_url = f"http://{server_url}" + + # User agent handling - completely rewritten + try: + # Debug the user agent issue + logger.info(f"Getting user agent for account {account.id}") + + # Use a hardcoded user agent string to avoid any issues with object structure + user_agent_string = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" + + try: + # Try to get the user agent directly from the database + if account.user_agent_id: + ua_obj = UserAgent.objects.get(id=account.user_agent_id) + if ua_obj and hasattr(ua_obj, 'user_agent') and ua_obj.user_agent: + user_agent_string = ua_obj.user_agent + logger.info(f"Using user agent from account: {user_agent_string}") + else: + # Get default user agent from CoreSettings + default_ua_id = CoreSettings.get_default_user_agent_id() + logger.info(f"Default user agent ID from settings: {default_ua_id}") + if default_ua_id: + ua_obj = UserAgent.objects.get(id=default_ua_id) + if ua_obj and hasattr(ua_obj, 'user_agent') and ua_obj.user_agent: + user_agent_string = ua_obj.user_agent + logger.info(f"Using default user agent: {user_agent_string}") + except Exception as e: + logger.warning(f"Error getting user agent, using fallback: {str(e)}") + + logger.info(f"Final user agent string: {user_agent_string}") + except Exception as e: + user_agent_string = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" + logger.warning(f"Exception in user agent handling, using fallback: {str(e)}") + + logger.info(f"Creating XCClient with URL: {server_url}, Username: {account.username}, User-Agent: {user_agent_string}") + + # Create XCClient with explicit error handling + try: + xc_client = XCClient(server_url, account.username, account.password, user_agent_string) + logger.info(f"XCClient instance created successfully") + except Exception as e: + error_msg = f"Failed to create XCClient: {str(e)}" + logger.error(error_msg) + account.status = M3UAccount.Status.ERROR + account.last_message = error_msg + account.save(update_fields=['status', 'last_message']) + send_m3u_update(account_id, "processing_groups", 100, status="error", error=error_msg) + release_task_lock('refresh_m3u_account_groups', account_id) + return error_msg, None + + # Authenticate with detailed error handling + try: + logger.info(f"Authenticating with XC server {server_url}") + auth_result = xc_client.authenticate() + logger.info(f"Authentication response: {auth_result}") + except Exception as e: + error_msg = f"Failed to authenticate with XC server: {str(e)}" + logger.error(error_msg) + account.status = M3UAccount.Status.ERROR + account.last_message = error_msg + account.save(update_fields=['status', 'last_message']) + send_m3u_update(account_id, "processing_groups", 100, status="error", error=error_msg) + release_task_lock('refresh_m3u_account_groups', account_id) + return error_msg, None + + # Get categories with detailed error handling + try: + logger.info(f"Getting live categories from XC server") + xc_categories = xc_client.get_live_categories() + logger.info(f"Found {len(xc_categories)} categories: {xc_categories}") + + # Validate response + if not isinstance(xc_categories, list): + error_msg = f"Unexpected response from XC server: {xc_categories}" + logger.error(error_msg) + account.status = M3UAccount.Status.ERROR + account.last_message = error_msg + account.save(update_fields=['status', 'last_message']) + send_m3u_update(account_id, "processing_groups", 100, status="error", error=error_msg) + release_task_lock('refresh_m3u_account_groups', account_id) + return error_msg, None + + if len(xc_categories) == 0: + logger.warning("No categories found in XC server response") + + for category in xc_categories: + cat_name = category.get("category_name", "Unknown Category") + cat_id = category.get("category_id", "0") + logger.info(f"Adding category: {cat_name} (ID: {cat_id})") + groups[cat_name] = { + "xc_id": cat_id, + } + except Exception as e: + error_msg = f"Failed to get categories from XC server: {str(e)}" + logger.error(error_msg) + account.status = M3UAccount.Status.ERROR + account.last_message = error_msg + account.save(update_fields=['status', 'last_message']) + send_m3u_update(account_id, "processing_groups", 100, status="error", error=error_msg) + release_task_lock('refresh_m3u_account_groups', account_id) + return error_msg, None + except Exception as e: + error_msg = f"Unexpected error in XC processing: {str(e)}" + logger.error(error_msg) + account.status = M3UAccount.Status.ERROR + account.last_message = error_msg + account.save(update_fields=['status', 'last_message']) + send_m3u_update(account_id, "processing_groups", 100, status="error", error=error_msg) + release_task_lock('refresh_m3u_account_groups', account_id) + return error_msg, None else: # Here's the key change - use the success flag from fetch_m3u_lines lines, success = fetch_m3u_lines(account, use_cache) @@ -609,28 +765,34 @@ def refresh_single_m3u_account(account_id): if not extinf_data: try: + logger.info(f"Calling refresh_m3u_groups for account {account_id}") result = refresh_m3u_groups(account_id, full_refresh=True) + logger.info(f"refresh_m3u_groups result: {result}") - # Check if the result indicates an error (None tuple or tuple with empty values) - if not result or not result[0] or not result[1]: - logger.error(f"Failed to refresh M3U groups for account {account_id}") - # The error already has been recorded by refresh_m3u_groups -# Just release the lock and exit - no need to set parsing status at all + # Check for completely empty result or missing groups + if not result or result[1] is None: + logger.error(f"Failed to refresh M3U groups for account {account_id}: {result}") release_task_lock('refresh_single_m3u_account', account_id) return "Failed to update m3u account - download failed or other error" extinf_data, groups = result - if not groups: - logger.error(f"No groups found for account {account_id}") - account.status = M3UAccount.Status.ERROR - account.last_message = "No channel groups found in M3U source" - account.save(update_fields=['status', 'last_message']) - send_m3u_update(account_id, "parsing", 100, status="error", error="No channel groups found") - release_task_lock('refresh_single_m3u_account', account_id) - return "Failed to update m3u account, no groups found" + # XC accounts can have empty extinf_data but valid groups + try: + account = M3UAccount.objects.get(id=account_id) + is_xc_account = account.account_type == M3UAccount.Types.XC + except M3UAccount.DoesNotExist: + is_xc_account = False + + # For XC accounts, empty extinf_data is normal at this stage + if not extinf_data and not is_xc_account: + logger.error(f"No streams found for non-XC account {account_id}") + account.status = M3UAccount.Status.ERROR + account.last_message = "No streams found in M3U source" + account.save(update_fields=['status', 'last_message']) + send_m3u_update(account_id, "parsing", 100, status="error", error="No streams found") except Exception as e: - logger.error(f"Exception in refresh_m3u_groups: {str(e)}") + logger.error(f"Exception in refresh_m3u_groups: {str(e)}", exc_info=True) account.status = M3UAccount.Status.ERROR account.last_message = f"Error refreshing M3U groups: {str(e)}" account.save(update_fields=['status', 'last_message']) @@ -638,8 +800,15 @@ def refresh_single_m3u_account(account_id): release_task_lock('refresh_single_m3u_account', account_id) return "Failed to update m3u account" -# Only proceed with parsing if we actually have data and no errors were encountered - if not extinf_data or not groups: + # Only proceed with parsing if we actually have data and no errors were encountered + # Get account type to handle XC accounts differently + try: + is_xc_account = account.account_type == M3UAccount.Types.XC + except Exception: + is_xc_account = False + + # Modified validation logic for different account types + if (not groups) or (not is_xc_account and not extinf_data): logger.error(f"No data to process for account {account_id}") account.status = M3UAccount.Status.ERROR account.last_message = "No data available for processing" @@ -665,11 +834,44 @@ def refresh_single_m3u_account(account_id): batches = [extinf_data[i:i + BATCH_SIZE] for i in range(0, len(extinf_data), BATCH_SIZE)] task_group = group(process_m3u_batch.s(account_id, batch, existing_groups, hash_keys) for batch in batches) else: - filtered_groups = [(k, v) for k, v in groups.items() if k in existing_groups] + # For XC accounts, get the groups with their custom properties containing xc_id + logger.info(f"Processing XC account with groups: {existing_groups}") + + # Get the ChannelGroupM3UAccount entries with their custom_properties + channel_group_relationships = ChannelGroupM3UAccount.objects.filter( + m3u_account=account, + enabled=True + ).select_related('channel_group') + + filtered_groups = {} + for rel in channel_group_relationships: + group_name = rel.channel_group.name + group_id = rel.channel_group.id + + # Load the custom properties with the xc_id + try: + custom_props = json.loads(rel.custom_properties) if rel.custom_properties else {} + if 'xc_id' in custom_props: + filtered_groups[group_name] = { + 'xc_id': custom_props['xc_id'], + 'channel_group_id': group_id + } + logger.info(f"Added group {group_name} with xc_id {custom_props['xc_id']}") + else: + logger.warning(f"No xc_id found in custom properties for group {group_name}") + except (json.JSONDecodeError, KeyError) as e: + logger.error(f"Error parsing custom properties for group {group_name}: {str(e)}") + + logger.info(f"Filtered {len(filtered_groups)} groups for processing: {filtered_groups}") + + # Batch the groups + filtered_groups_list = list(filtered_groups.items()) batches = [ - dict(filtered_groups[i:i + 2]) - for i in range(0, len(filtered_groups), 2) + dict(filtered_groups_list[i:i + 2]) + for i in range(0, len(filtered_groups_list), 2) ] + + logger.info(f"Created {len(batches)} batches for XC processing") task_group = group(process_xc_category.s(account_id, batch, existing_groups, hash_keys) for batch in batches) total_batches = len(batches) diff --git a/core/xtream_codes.py b/core/xtream_codes.py index 0fec0fed..17f3eaad 100644 --- a/core/xtream_codes.py +++ b/core/xtream_codes.py @@ -1,39 +1,164 @@ import requests +import logging +import traceback +import json + +logger = logging.getLogger(__name__) class Client: - host = "" - username = "" - password = "" - user_agent = "" + """Xtream Codes API Client with robust error handling""" - def __init__(self, host, username, password, user_agent): - self.host = host + def __init__(self, server_url, username, password, user_agent=None): + self.server_url = self._normalize_url(server_url) self.username = username self.password = password + self.user_agent = user_agent - # Handle UserAgent objects by extracting the string value - if hasattr(user_agent, 'user_agent'): # Check if it's a UserAgent model object - self.user_agent = user_agent.user_agent # Extract the string attribute + # Fix: Properly handle all possible user_agent input types + if user_agent: + if isinstance(user_agent, str): + # Direct string user agent + user_agent_string = user_agent + elif hasattr(user_agent, 'user_agent'): + # UserAgent model object + user_agent_string = user_agent.user_agent + else: + # Fallback for any other type + logger.warning(f"Unexpected user_agent type: {type(user_agent)}, using default") + user_agent_string = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)' else: - self.user_agent = str(user_agent) # Ensure it's a string in any case + # No user agent provided + user_agent_string = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)' + + self.headers = {'User-Agent': user_agent_string} + self.server_info = None + + def _normalize_url(self, url): + """Normalize server URL by removing trailing slashes and paths""" + if not url: + raise ValueError("Server URL cannot be empty") + + url = url.rstrip('/') + # Remove any path after domain - we'll construct proper API URLs + # Split by protocol first to preserve it + if '://' in url: + protocol, rest = url.split('://', 1) + domain = rest.split('/', 1)[0] + return f"{protocol}://{domain}" + return url + + def _make_request(self, endpoint, params=None): + """Make request with detailed error handling""" + try: + url = f"{self.server_url}/{endpoint}" + logger.debug(f"XC API Request: {url} with params: {params}") + + response = requests.get(url, params=params, headers=self.headers, timeout=30) + response.raise_for_status() + + data = response.json() + logger.debug(f"XC API Response: {url} status code: {response.status_code}") + + # Check for XC-specific error responses + if isinstance(data, dict) and data.get('user_info') is None and 'error' in data: + error_msg = f"XC API Error: {data.get('error', 'Unknown error')}" + logger.error(error_msg) + raise ValueError(error_msg) + + return data + except requests.RequestException as e: + error_msg = f"XC API Request failed: {str(e)}" + logger.error(error_msg) + logger.error(f"Request details: URL={url}, Params={params}") + raise + except ValueError as e: + # This could be from JSON parsing or our explicit raises + logger.error(f"XC API Invalid response: {str(e)}") + raise + except Exception as e: + logger.error(f"XC API Unexpected error: {str(e)}") + logger.error(traceback.format_exc()) + raise def authenticate(self): - response = requests.get(f"{self.host}/player_api.php?username={self.username}&password={self.password}", headers={ - 'User-Agent': self.user_agent, - }) - return response.json() + """Authenticate and validate server response""" + try: + endpoint = "player_api.php" + params = { + 'username': self.username, + 'password': self.password + } + + self.server_info = self._make_request(endpoint, params) + + if not self.server_info or not self.server_info.get('user_info'): + error_msg = "Authentication failed: Invalid response from server" + logger.error(f"{error_msg}. Response: {self.server_info}") + raise ValueError(error_msg) + + logger.info(f"XC Authentication successful for user {self.username}") + return self.server_info + except Exception as e: + logger.error(f"XC Authentication failed: {str(e)}") + logger.error(traceback.format_exc()) + raise def get_live_categories(self): - response = requests.get(f"{self.host}/player_api.php?username={self.username}&password={self.password}&action=get_live_categories", headers={ - 'User-Agent': self.user_agent, - }) - return response.json() + """Get live TV categories""" + try: + if not self.server_info: + self.authenticate() + + endpoint = "player_api.php" + params = { + 'username': self.username, + 'password': self.password, + 'action': 'get_live_categories' + } + + categories = self._make_request(endpoint, params) + + if not isinstance(categories, list): + error_msg = f"Invalid categories response: {categories}" + logger.error(error_msg) + raise ValueError(error_msg) + + logger.info(f"Successfully retrieved {len(categories)} live categories") + logger.debug(f"Categories: {json.dumps(categories[:5])}...") + return categories + except Exception as e: + logger.error(f"Failed to get live categories: {str(e)}") + logger.error(traceback.format_exc()) + raise def get_live_category_streams(self, category_id): - response = requests.get(f"{self.host}/player_api.php?username={self.username}&password={self.password}&action=get_live_streams&category_id={category_id}", headers={ - 'User-Agent': self.user_agent, - }) - return response.json() + """Get streams for a specific category""" + try: + if not self.server_info: + self.authenticate() + + endpoint = "player_api.php" + params = { + 'username': self.username, + 'password': self.password, + 'action': 'get_live_streams', + 'category_id': category_id + } + + streams = self._make_request(endpoint, params) + + if not isinstance(streams, list): + error_msg = f"Invalid streams response for category {category_id}: {streams}" + logger.error(error_msg) + raise ValueError(error_msg) + + logger.info(f"Successfully retrieved {len(streams)} streams for category {category_id}") + return streams + except Exception as e: + logger.error(f"Failed to get streams for category {category_id}: {str(e)}") + logger.error(traceback.format_exc()) + raise def get_stream_url(self, stream_id): - return f"{self.host}/{self.username}/{self.password}/{stream_id}" + """Get the playback URL for a stream""" + return f"{self.server_url}/live/{self.username}/{self.password}/{stream_id}.ts"