Dispatcharr/core/xtream_codes.py

392 lines
14 KiB
Python

import requests
import logging
import traceback
import json
logger = logging.getLogger(__name__)
class Client:
"""Xtream Codes API Client with robust error handling"""
def __init__(self, server_url, username, password, user_agent=None):
self.server_url = self._normalize_url(server_url)
self.username = username
self.password = password
self.user_agent = user_agent
# Fix: Properly handle all possible user_agent input types
if user_agent:
if isinstance(user_agent, str):
user_agent_string = user_agent
elif hasattr(user_agent, 'user_agent'):
user_agent_string = user_agent.user_agent
else:
logger.warning(f"Unexpected user_agent type: {type(user_agent)}, using default")
user_agent_string = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'
else:
user_agent_string = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'
# Create persistent session
self.session = requests.Session()
self.session.headers.update({'User-Agent': user_agent_string})
# Configure connection pooling
adapter = requests.adapters.HTTPAdapter(
pool_connections=1,
pool_maxsize=2,
max_retries=3,
pool_block=False
)
self.session.mount('http://', adapter)
self.session.mount('https://', adapter)
self.server_info = None
def _normalize_url(self, url):
"""Normalize server URL by removing trailing slashes and paths"""
if not url:
raise ValueError("Server URL cannot be empty")
url = url.rstrip('/')
# Remove any path after domain - we'll construct proper API URLs
# Split by protocol first to preserve it
if '://' in url:
protocol, rest = url.split('://', 1)
domain = rest.split('/', 1)[0]
return f"{protocol}://{domain}"
return url
def _make_request(self, endpoint, params=None):
"""Make request with detailed error handling"""
try:
url = f"{self.server_url}/{endpoint}"
logger.debug(f"XC API Request: {url} with params: {params}")
response = self.session.get(url, params=params, timeout=30)
response.raise_for_status()
# Check if response is empty
if not response.content:
error_msg = f"XC API returned empty response from {url}"
logger.error(error_msg)
raise ValueError(error_msg)
# Check for common blocking responses before trying to parse JSON
response_text = response.text.strip()
if response_text.lower() in ['blocked', 'forbidden', 'access denied', 'unauthorized']:
error_msg = f"XC API request blocked by server from {url}. Response: {response_text}"
logger.error(error_msg)
logger.error(f"This may indicate IP blocking, User-Agent filtering, or rate limiting")
raise ValueError(error_msg)
try:
data = response.json()
except requests.exceptions.JSONDecodeError as json_err:
error_msg = f"XC API returned invalid JSON from {url}. Response: {response.text[:1000]}"
logger.error(error_msg)
logger.error(f"JSON decode error: {str(json_err)}")
# Check if it looks like an HTML error page
if response_text.startswith('<'):
logger.error("Response appears to be HTML - server may be returning an error page")
raise ValueError(error_msg)
# Check for XC-specific error responses
if isinstance(data, dict) and data.get('user_info') is None and 'error' in data:
error_msg = f"XC API Error: {data.get('error', 'Unknown error')}"
logger.error(error_msg)
raise ValueError(error_msg)
return data
except requests.RequestException as e:
error_msg = f"XC API Request failed: {str(e)}"
logger.error(error_msg)
logger.error(f"Request details: URL={url}, Params={params}")
raise
except ValueError as e:
# This could be from JSON parsing or our explicit raises
logger.error(f"XC API Invalid response: {str(e)}")
raise
except Exception as e:
logger.error(f"XC API Unexpected error: {str(e)}")
logger.error(traceback.format_exc())
raise
def authenticate(self):
"""Authenticate and validate server response"""
try:
endpoint = "player_api.php"
params = {
'username': self.username,
'password': self.password
}
self.server_info = self._make_request(endpoint, params)
if not self.server_info or not self.server_info.get('user_info'):
error_msg = "Authentication failed: Invalid response from server"
logger.error(f"{error_msg}. Response: {self.server_info}")
raise ValueError(error_msg)
logger.info(f"XC Authentication successful for user {self.username}")
return self.server_info
except Exception as e:
logger.error(f"XC Authentication failed: {str(e)}")
logger.error(traceback.format_exc())
raise
def get_live_categories(self):
"""Get live TV categories"""
try:
if not self.server_info:
self.authenticate()
endpoint = "player_api.php"
params = {
'username': self.username,
'password': self.password,
'action': 'get_live_categories'
}
categories = self._make_request(endpoint, params)
if not isinstance(categories, list):
error_msg = f"Invalid categories response: {categories}"
logger.error(error_msg)
raise ValueError(error_msg)
logger.info(f"Successfully retrieved {len(categories)} live categories")
logger.debug(f"Categories: {json.dumps(categories[:5])}...")
return categories
except Exception as e:
logger.error(f"Failed to get live categories: {str(e)}")
logger.error(traceback.format_exc())
raise
def get_live_category_streams(self, category_id):
"""Get streams for a specific category"""
try:
if not self.server_info:
self.authenticate()
endpoint = "player_api.php"
params = {
'username': self.username,
'password': self.password,
'action': 'get_live_streams',
'category_id': category_id
}
streams = self._make_request(endpoint, params)
if not isinstance(streams, list):
error_msg = f"Invalid streams response for category {category_id}: {streams}"
logger.error(error_msg)
raise ValueError(error_msg)
logger.info(f"Successfully retrieved {len(streams)} streams for category {category_id}")
return streams
except Exception as e:
logger.error(f"Failed to get streams for category {category_id}: {str(e)}")
logger.error(traceback.format_exc())
raise
def get_stream_url(self, stream_id):
"""Get the playback URL for a stream"""
return f"{self.server_url}/live/{self.username}/{self.password}/{stream_id}.ts"
def get_vod_categories(self):
"""Get VOD categories"""
try:
if not self.server_info:
self.authenticate()
endpoint = "player_api.php"
params = {
'username': self.username,
'password': self.password,
'action': 'get_vod_categories'
}
categories = self._make_request(endpoint, params)
if not isinstance(categories, list):
error_msg = f"Invalid VOD categories response: {categories}"
logger.error(error_msg)
raise ValueError(error_msg)
logger.info(f"Successfully retrieved {len(categories)} VOD categories")
return categories
except Exception as e:
logger.error(f"Failed to get VOD categories: {str(e)}")
logger.error(traceback.format_exc())
raise
def get_vod_streams(self, category_id=None):
"""Get VOD streams for a specific category"""
try:
if not self.server_info:
self.authenticate()
endpoint = "player_api.php"
params = {
'username': self.username,
'password': self.password,
'action': 'get_vod_streams'
}
if category_id:
params['category_id'] = category_id
streams = self._make_request(endpoint, params)
if not isinstance(streams, list):
error_msg = f"Invalid VOD streams response for category {category_id}: {streams}"
logger.error(error_msg)
raise ValueError(error_msg)
logger.info(f"Successfully retrieved {len(streams)} VOD streams for category {category_id}")
return streams
except Exception as e:
logger.error(f"Failed to get VOD streams for category {category_id}: {str(e)}")
logger.error(traceback.format_exc())
raise
def get_vod_info(self, vod_id):
"""Get detailed information for a specific VOD"""
try:
if not self.server_info:
self.authenticate()
endpoint = "player_api.php"
params = {
'username': self.username,
'password': self.password,
'action': 'get_vod_info',
'vod_id': vod_id
}
vod_info = self._make_request(endpoint, params)
if not isinstance(vod_info, dict):
error_msg = f"Invalid VOD info response for vod_id {vod_id}: {vod_info}"
logger.error(error_msg)
raise ValueError(error_msg)
logger.info(f"Successfully retrieved VOD info for vod_id {vod_id}")
return vod_info
except Exception as e:
logger.error(f"Failed to get VOD info for vod_id {vod_id}: {str(e)}")
logger.error(traceback.format_exc())
raise
def get_series_categories(self):
"""Get series categories"""
try:
if not self.server_info:
self.authenticate()
endpoint = "player_api.php"
params = {
'username': self.username,
'password': self.password,
'action': 'get_series_categories'
}
categories = self._make_request(endpoint, params)
if not isinstance(categories, list):
error_msg = f"Invalid series categories response: {categories}"
logger.error(error_msg)
raise ValueError(error_msg)
logger.info(f"Successfully retrieved {len(categories)} series categories")
return categories
except Exception as e:
logger.error(f"Failed to get series categories: {str(e)}")
logger.error(traceback.format_exc())
raise
def get_series(self, category_id=None):
"""Get series for a specific category"""
try:
if not self.server_info:
self.authenticate()
endpoint = "player_api.php"
params = {
'username': self.username,
'password': self.password,
'action': 'get_series'
}
if category_id:
params['category_id'] = category_id
series = self._make_request(endpoint, params)
if not isinstance(series, list):
error_msg = f"Invalid series response for category {category_id}: {series}"
logger.error(error_msg)
raise ValueError(error_msg)
logger.info(f"Successfully retrieved {len(series)} series for category {category_id}")
return series
except Exception as e:
logger.error(f"Failed to get series for category {category_id}: {str(e)}")
logger.error(traceback.format_exc())
raise
def get_series_info(self, series_id):
"""Get detailed information for a specific series including episodes"""
try:
if not self.server_info:
self.authenticate()
endpoint = "player_api.php"
params = {
'username': self.username,
'password': self.password,
'action': 'get_series_info',
'series_id': series_id
}
series_info = self._make_request(endpoint, params)
if not isinstance(series_info, dict):
error_msg = f"Invalid series info response for series_id {series_id}: {series_info}"
logger.error(error_msg)
raise ValueError(error_msg)
logger.info(f"Successfully retrieved series info for series_id {series_id}")
return series_info
except Exception as e:
logger.error(f"Failed to get series info for series_id {series_id}: {str(e)}")
logger.error(traceback.format_exc())
raise
def get_vod_stream_url(self, vod_id, container_extension="mp4"):
"""Get the playback URL for a VOD"""
return f"{self.server_url}/movie/{self.username}/{self.password}/{vod_id}.{container_extension}"
def close(self):
"""Close the session and cleanup resources"""
if hasattr(self, 'session') and self.session:
try:
self.session.close()
except Exception as e:
logger.debug(f"Error closing XC session: {e}")
def __enter__(self):
"""Enter the context manager"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit the context manager and cleanup resources"""
self.close()
return False # Don't suppress exceptions
def __del__(self):
"""Ensure session is closed when object is destroyed"""
self.close()