diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py index d6e0755b..d7e46cde 100644 --- a/apps/m3u/tasks.py +++ b/apps/m3u/tasks.py @@ -285,57 +285,56 @@ def process_xc_category(account_id, batch, groups, hash_keys): stream_hashes = {} try: - xc_client = XCClient(account.server_url, account.username, account.password, account.get_user_agent()) + with XCClient(account.server_url, account.username, account.password, account.get_user_agent()) as xc_client: + # Log the batch details to help with debugging + logger.debug(f"Processing XC batch: {batch}") - # Log the batch details to help with debugging - logger.debug(f"Processing XC batch: {batch}") - - for group_name, props in batch.items(): - # Check if we have a valid xc_id for this group - if 'xc_id' not in props: - logger.error(f"Missing xc_id for group {group_name} in batch {batch}") - continue - - # Get actual group ID from the mapping - group_id = groups.get(group_name) - if not group_id: - logger.error(f"Group {group_name} not found in enabled groups") - continue - - try: - logger.debug(f"Fetching streams for XC category: {group_name} (ID: {props['xc_id']})") - streams = xc_client.get_live_category_streams(props['xc_id']) - - if not streams: - logger.warning(f"No streams found for XC category {group_name} (ID: {props['xc_id']})") + for group_name, props in batch.items(): + # Check if we have a valid xc_id for this group + if 'xc_id' not in props: + logger.error(f"Missing xc_id for group {group_name} in batch {batch}") continue - logger.debug(f"Found {len(streams)} streams for category {group_name}") + # Get actual group ID from the mapping + group_id = groups.get(group_name) + if not group_id: + logger.error(f"Group {group_name} not found in enabled groups") + continue - for stream in streams: - name = stream["name"] - url = xc_client.get_stream_url(stream["stream_id"]) - tvg_id = stream.get("epg_channel_id", "") - tvg_logo = stream.get("stream_icon", "") - group_title = group_name + try: + logger.debug(f"Fetching streams for XC category: {group_name} (ID: {props['xc_id']})") + streams = xc_client.get_live_category_streams(props['xc_id']) - stream_hash = Stream.generate_hash_key(name, url, tvg_id, hash_keys) - stream_props = { - "name": name, - "url": url, - "logo_url": tvg_logo, - "tvg_id": tvg_id, - "m3u_account": account, - "channel_group_id": int(group_id), - "stream_hash": stream_hash, - "custom_properties": json.dumps(stream), - } + if not streams: + logger.warning(f"No streams found for XC category {group_name} (ID: {props['xc_id']})") + continue - if stream_hash not in stream_hashes: - stream_hashes[stream_hash] = stream_props - except Exception as e: - logger.error(f"Error processing XC category {group_name} (ID: {props['xc_id']}): {str(e)}") - continue + logger.debug(f"Found {len(streams)} streams for category {group_name}") + + for stream in streams: + name = stream["name"] + url = xc_client.get_stream_url(stream["stream_id"]) + tvg_id = stream.get("epg_channel_id", "") + tvg_logo = stream.get("stream_icon", "") + group_title = group_name + + stream_hash = Stream.generate_hash_key(name, url, tvg_id, hash_keys) + stream_props = { + "name": name, + "url": url, + "logo_url": tvg_logo, + "tvg_id": tvg_id, + "m3u_account": account, + "channel_group_id": int(group_id), + "stream_hash": stream_hash, + "custom_properties": json.dumps(stream), + } + + if stream_hash not in stream_hashes: + stream_hashes[stream_hash] = stream_props + except Exception as e: + logger.error(f"Error processing XC category {group_name} (ID: {props['xc_id']}): {str(e)}") + continue # Process all found streams existing_streams = {s.stream_hash: s for s in Stream.objects.filter(stream_hash__in=stream_hashes.keys())} @@ -622,62 +621,63 @@ def refresh_m3u_groups(account_id, use_cache=False, full_refresh=False): # Create XCClient with explicit error handling try: - xc_client = XCClient(server_url, account.username, account.password, user_agent_string) - logger.info(f"XCClient instance created successfully") + with XCClient(server_url, account.username, account.password, user_agent_string) as xc_client: + logger.info(f"XCClient instance created successfully") + + # Authenticate with detailed error handling + try: + logger.debug(f"Authenticating with XC server {server_url}") + auth_result = xc_client.authenticate() + logger.debug(f"Authentication response: {auth_result}") + except Exception as e: + error_msg = f"Failed to authenticate with XC server: {str(e)}" + logger.error(error_msg) + account.status = M3UAccount.Status.ERROR + account.last_message = error_msg + account.save(update_fields=['status', 'last_message']) + send_m3u_update(account_id, "processing_groups", 100, status="error", error=error_msg) + release_task_lock('refresh_m3u_account_groups', account_id) + return error_msg, None + + # Get categories with detailed error handling + try: + logger.info(f"Getting live categories from XC server") + xc_categories = xc_client.get_live_categories() + logger.info(f"Found {len(xc_categories)} categories: {xc_categories}") + + # Validate response + if not isinstance(xc_categories, list): + error_msg = f"Unexpected response from XC server: {xc_categories}" + logger.error(error_msg) + account.status = M3UAccount.Status.ERROR + account.last_message = error_msg + account.save(update_fields=['status', 'last_message']) + send_m3u_update(account_id, "processing_groups", 100, status="error", error=error_msg) + release_task_lock('refresh_m3u_account_groups', account_id) + return error_msg, None + + if len(xc_categories) == 0: + logger.warning("No categories found in XC server response") + + for category in xc_categories: + cat_name = category.get("category_name", "Unknown Category") + cat_id = category.get("category_id", "0") + logger.info(f"Adding category: {cat_name} (ID: {cat_id})") + groups[cat_name] = { + "xc_id": cat_id, + } + except Exception as e: + error_msg = f"Failed to get categories from XC server: {str(e)}" + logger.error(error_msg) + account.status = M3UAccount.Status.ERROR + account.last_message = error_msg + account.save(update_fields=['status', 'last_message']) + send_m3u_update(account_id, "processing_groups", 100, status="error", error=error_msg) + release_task_lock('refresh_m3u_account_groups', account_id) + return error_msg, None + except Exception as e: - error_msg = f"Failed to create XCClient: {str(e)}" - logger.error(error_msg) - account.status = M3UAccount.Status.ERROR - account.last_message = error_msg - account.save(update_fields=['status', 'last_message']) - send_m3u_update(account_id, "processing_groups", 100, status="error", error=error_msg) - release_task_lock('refresh_m3u_account_groups', account_id) - return error_msg, None - - # Authenticate with detailed error handling - try: - logger.debug(f"Authenticating with XC server {server_url}") - auth_result = xc_client.authenticate() - logger.debug(f"Authentication response: {auth_result}") - except Exception as e: - error_msg = f"Failed to authenticate with XC server: {str(e)}" - logger.error(error_msg) - account.status = M3UAccount.Status.ERROR - account.last_message = error_msg - account.save(update_fields=['status', 'last_message']) - send_m3u_update(account_id, "processing_groups", 100, status="error", error=error_msg) - release_task_lock('refresh_m3u_account_groups', account_id) - return error_msg, None - - # Get categories with detailed error handling - try: - logger.info(f"Getting live categories from XC server") - xc_categories = xc_client.get_live_categories() - logger.info(f"Found {len(xc_categories)} categories: {xc_categories}") - - # Validate response - if not isinstance(xc_categories, list): - error_msg = f"Unexpected response from XC server: {xc_categories}" - logger.error(error_msg) - account.status = M3UAccount.Status.ERROR - account.last_message = error_msg - account.save(update_fields=['status', 'last_message']) - send_m3u_update(account_id, "processing_groups", 100, status="error", error=error_msg) - release_task_lock('refresh_m3u_account_groups', account_id) - return error_msg, None - - if len(xc_categories) == 0: - logger.warning("No categories found in XC server response") - - for category in xc_categories: - cat_name = category.get("category_name", "Unknown Category") - cat_id = category.get("category_id", "0") - logger.info(f"Adding category: {cat_name} (ID: {cat_id})") - groups[cat_name] = { - "xc_id": cat_id, - } - except Exception as e: - error_msg = f"Failed to get categories from XC server: {str(e)}" + error_msg = f"Failed to create XC Client: {str(e)}" logger.error(error_msg) account.status = M3UAccount.Status.ERROR account.last_message = error_msg @@ -686,7 +686,7 @@ def refresh_m3u_groups(account_id, use_cache=False, full_refresh=False): release_task_lock('refresh_m3u_account_groups', account_id) return error_msg, None except Exception as e: - error_msg = f"Unexpected error in XC processing: {str(e)}" + error_msg = f"Unexpected error occurred in XC Client: {str(e)}" logger.error(error_msg) account.status = M3UAccount.Status.ERROR account.last_message = error_msg diff --git a/core/xtream_codes.py b/core/xtream_codes.py index 846e53d4..d068bacb 100644 --- a/core/xtream_codes.py +++ b/core/xtream_codes.py @@ -17,20 +17,29 @@ class Client: # Fix: Properly handle all possible user_agent input types if user_agent: if isinstance(user_agent, str): - # Direct string user agent user_agent_string = user_agent elif hasattr(user_agent, 'user_agent'): - # UserAgent model object user_agent_string = user_agent.user_agent else: - # Fallback for any other type logger.warning(f"Unexpected user_agent type: {type(user_agent)}, using default") user_agent_string = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)' else: - # No user agent provided user_agent_string = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)' - self.headers = {'User-Agent': user_agent_string} + # Create persistent session + self.session = requests.Session() + self.session.headers.update({'User-Agent': user_agent_string}) + + # Configure connection pooling + adapter = requests.adapters.HTTPAdapter( + pool_connections=1, + pool_maxsize=2, + max_retries=3, + pool_block=False + ) + self.session.mount('http://', adapter) + self.session.mount('https://', adapter) + self.server_info = None def _normalize_url(self, url): @@ -53,7 +62,7 @@ class Client: url = f"{self.server_url}/{endpoint}" logger.debug(f"XC API Request: {url} with params: {params}") - response = requests.get(url, params=params, headers=self.headers, timeout=30) + response = self.session.get(url, params=params, timeout=30) response.raise_for_status() # Check if response is empty @@ -186,3 +195,24 @@ class Client: def get_stream_url(self, stream_id): """Get the playback URL for a stream""" return f"{self.server_url}/live/{self.username}/{self.password}/{stream_id}.ts" + + def close(self): + """Close the session and cleanup resources""" + if hasattr(self, 'session') and self.session: + try: + self.session.close() + except Exception as e: + logger.debug(f"Error closing XC session: {e}") + + def __enter__(self): + """Enter the context manager""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Exit the context manager and cleanup resources""" + self.close() + return False # Don't suppress exceptions + + def __del__(self): + """Ensure session is closed when object is destroyed""" + self.close()