Dispatcharr/core/xtream_codes.py
2025-09-09 16:43:57 -05:00

465 lines
17 KiB
Python

import requests
import logging
import traceback
import json
logger = logging.getLogger(__name__)
class Client:
"""Xtream Codes API Client with robust error handling"""
def __init__(self, server_url, username, password, user_agent=None):
self.server_url = self._normalize_url(server_url)
self.username = username
self.password = password
self.user_agent = user_agent
# Fix: Properly handle all possible user_agent input types
if user_agent:
if isinstance(user_agent, str):
user_agent_string = user_agent
elif hasattr(user_agent, 'user_agent'):
user_agent_string = user_agent.user_agent
else:
logger.warning(f"Unexpected user_agent type: {type(user_agent)}, using default")
user_agent_string = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'
else:
user_agent_string = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'
# Create persistent session
self.session = requests.Session()
self.session.headers.update({'User-Agent': user_agent_string})
# Configure connection pooling
adapter = requests.adapters.HTTPAdapter(
pool_connections=1,
pool_maxsize=2,
max_retries=3,
pool_block=False
)
self.session.mount('http://', adapter)
self.session.mount('https://', adapter)
self.server_info = None
def _normalize_url(self, url):
"""Normalize server URL by removing trailing slashes and paths"""
if not url:
raise ValueError("Server URL cannot be empty")
url = url.rstrip('/')
# Remove any path after domain - we'll construct proper API URLs
# Split by protocol first to preserve it
if '://' in url:
protocol, rest = url.split('://', 1)
domain = rest.split('/', 1)[0]
return f"{protocol}://{domain}"
return url
def _make_request(self, endpoint, params=None):
"""Make request with detailed error handling"""
try:
url = f"{self.server_url}/{endpoint}"
logger.debug(f"XC API Request: {url} with params: {params}")
response = self.session.get(url, params=params, timeout=30)
response.raise_for_status()
# Check if response is empty
if not response.content:
error_msg = f"XC API returned empty response from {url}"
logger.error(error_msg)
raise ValueError(error_msg)
# Check for common blocking responses before trying to parse JSON
response_text = response.text.strip()
if response_text.lower() in ['blocked', 'forbidden', 'access denied', 'unauthorized']:
error_msg = f"XC API request blocked by server from {url}. Response: {response_text}"
logger.error(error_msg)
logger.error(f"This may indicate IP blocking, User-Agent filtering, or rate limiting")
raise ValueError(error_msg)
try:
data = response.json()
except requests.exceptions.JSONDecodeError as json_err:
error_msg = f"XC API returned invalid JSON from {url}. Response: {response.text[:1000]}"
logger.error(error_msg)
logger.error(f"JSON decode error: {str(json_err)}")
# Check if it looks like an HTML error page
if response_text.startswith('<'):
logger.error("Response appears to be HTML - server may be returning an error page")
raise ValueError(error_msg)
# Check for XC-specific error responses
if isinstance(data, dict) and data.get('user_info') is None and 'error' in data:
error_msg = f"XC API Error: {data.get('error', 'Unknown error')}"
logger.error(error_msg)
raise ValueError(error_msg)
return data
except requests.RequestException as e:
error_msg = f"XC API Request failed: {str(e)}"
logger.error(error_msg)
logger.error(f"Request details: URL={url}, Params={params}")
raise
except ValueError as e:
# This could be from JSON parsing or our explicit raises
logger.error(f"XC API Invalid response: {str(e)}")
raise
except Exception as e:
logger.error(f"XC API Unexpected error: {str(e)}")
logger.error(traceback.format_exc())
raise
def authenticate(self):
"""Authenticate and validate server response"""
try:
endpoint = "player_api.php"
params = {
'username': self.username,
'password': self.password
}
self.server_info = self._make_request(endpoint, params)
if not self.server_info or not self.server_info.get('user_info'):
error_msg = "Authentication failed: Invalid response from server"
logger.error(f"{error_msg}. Response: {self.server_info}")
raise ValueError(error_msg)
logger.info(f"XC Authentication successful for user {self.username}")
return self.server_info
except Exception as e:
logger.error(f"XC Authentication failed: {str(e)}")
logger.error(traceback.format_exc())
raise
def get_account_info(self):
"""Get account information from the last authentication response"""
if not self.server_info:
raise ValueError("Not authenticated. Call authenticate() first.")
from datetime import datetime
# Extract relevant account information
user_info = self.server_info.get('user_info', {})
server_info = self.server_info.get('server_info', {})
account_info = {
'last_refresh': datetime.utcnow().isoformat() + 'Z', # Explicit UTC with Z suffix
'auth_timestamp': datetime.utcnow().timestamp(),
'user_info': {
'username': user_info.get('username'),
'password': user_info.get('password'),
'message': user_info.get('message'),
'auth': user_info.get('auth'),
'status': user_info.get('status'),
'exp_date': user_info.get('exp_date'),
'is_trial': user_info.get('is_trial'),
'active_cons': user_info.get('active_cons'),
'created_at': user_info.get('created_at'),
'max_connections': user_info.get('max_connections'),
'allowed_output_formats': user_info.get('allowed_output_formats', [])
},
'server_info': {
'url': server_info.get('url'),
'port': server_info.get('port'),
'https_port': server_info.get('https_port'),
'server_protocol': server_info.get('server_protocol'),
'rtmp_port': server_info.get('rtmp_port'),
'timezone': server_info.get('timezone'),
'timestamp_now': server_info.get('timestamp_now'),
'time_now': server_info.get('time_now')
}
}
return account_info
def get_live_categories(self):
"""Get live TV categories"""
try:
if not self.server_info:
self.authenticate()
endpoint = "player_api.php"
params = {
'username': self.username,
'password': self.password,
'action': 'get_live_categories'
}
categories = self._make_request(endpoint, params)
if not isinstance(categories, list):
error_msg = f"Invalid categories response: {categories}"
logger.error(error_msg)
raise ValueError(error_msg)
logger.info(f"Successfully retrieved {len(categories)} live categories")
logger.debug(f"Categories: {json.dumps(categories[:5])}...")
return categories
except Exception as e:
logger.error(f"Failed to get live categories: {str(e)}")
logger.error(traceback.format_exc())
raise
def get_live_category_streams(self, category_id):
"""Get streams for a specific category"""
try:
if not self.server_info:
self.authenticate()
endpoint = "player_api.php"
params = {
'username': self.username,
'password': self.password,
'action': 'get_live_streams',
'category_id': category_id
}
streams = self._make_request(endpoint, params)
if not isinstance(streams, list):
error_msg = f"Invalid streams response for category {category_id}: {streams}"
logger.error(error_msg)
raise ValueError(error_msg)
logger.info(f"Successfully retrieved {len(streams)} streams for category {category_id}")
return streams
except Exception as e:
logger.error(f"Failed to get streams for category {category_id}: {str(e)}")
logger.error(traceback.format_exc())
raise
def get_all_live_streams(self):
"""Get all live streams (no category filter)"""
try:
if not self.server_info:
self.authenticate()
endpoint = "player_api.php"
params = {
'username': self.username,
'password': self.password,
'action': 'get_live_streams'
# No category_id = get all streams
}
streams = self._make_request(endpoint, params)
if not isinstance(streams, list):
error_msg = f"Invalid streams response for all live streams: {streams}"
logger.error(error_msg)
raise ValueError(error_msg)
logger.info(f"Successfully retrieved {len(streams)} total live streams")
return streams
except Exception as e:
logger.error(f"Failed to get all live streams: {str(e)}")
logger.error(traceback.format_exc())
raise
def get_stream_url(self, stream_id):
"""Get the playback URL for a stream"""
return f"{self.server_url}/live/{self.username}/{self.password}/{stream_id}.ts"
def get_episode_stream_url(self, stream_id, container_extension='mp4'):
"""Get the playback URL for an episode stream"""
return f"{self.server_url}/series/{self.username}/{self.password}/{stream_id}.{container_extension}"
def get_vod_stream_url(self, stream_id, container_extension='mp4'):
"""Get the playback URL for a VOD stream"""
return f"{self.server_url}/movie/{self.username}/{self.password}/{stream_id}.{container_extension}"
def get_vod_categories(self):
"""Get VOD categories"""
try:
if not self.server_info:
self.authenticate()
endpoint = "player_api.php"
params = {
'username': self.username,
'password': self.password,
'action': 'get_vod_categories'
}
categories = self._make_request(endpoint, params)
if not isinstance(categories, list):
error_msg = f"Invalid VOD categories response: {categories}"
logger.error(error_msg)
raise ValueError(error_msg)
logger.info(f"Successfully retrieved {len(categories)} VOD categories")
return categories
except Exception as e:
logger.error(f"Failed to get VOD categories: {str(e)}")
logger.error(traceback.format_exc())
raise
def get_vod_streams(self, category_id=None):
"""Get VOD streams for a specific category"""
try:
if not self.server_info:
self.authenticate()
endpoint = "player_api.php"
params = {
'username': self.username,
'password': self.password,
'action': 'get_vod_streams'
}
if category_id:
params['category_id'] = category_id
streams = self._make_request(endpoint, params)
if not isinstance(streams, list):
error_msg = f"Invalid VOD streams response for category {category_id}: {streams}"
logger.error(error_msg)
raise ValueError(error_msg)
logger.info(f"Successfully retrieved {len(streams)} VOD streams for category {category_id}")
return streams
except Exception as e:
logger.error(f"Failed to get VOD streams for category {category_id}: {str(e)}")
logger.error(traceback.format_exc())
raise
def get_vod_info(self, vod_id):
"""Get detailed information for a specific VOD"""
try:
if not self.server_info:
self.authenticate()
endpoint = "player_api.php"
params = {
'username': self.username,
'password': self.password,
'action': 'get_vod_info',
'vod_id': vod_id
}
vod_info = self._make_request(endpoint, params)
if not isinstance(vod_info, dict):
error_msg = f"Invalid VOD info response for vod_id {vod_id}: {vod_info}"
logger.error(error_msg)
raise ValueError(error_msg)
logger.info(f"Successfully retrieved VOD info for vod_id {vod_id}")
return vod_info
except Exception as e:
logger.error(f"Failed to get VOD info for vod_id {vod_id}: {str(e)}")
logger.error(traceback.format_exc())
raise
def get_series_categories(self):
"""Get series categories"""
try:
if not self.server_info:
self.authenticate()
endpoint = "player_api.php"
params = {
'username': self.username,
'password': self.password,
'action': 'get_series_categories'
}
categories = self._make_request(endpoint, params)
if not isinstance(categories, list):
error_msg = f"Invalid series categories response: {categories}"
logger.error(error_msg)
raise ValueError(error_msg)
logger.info(f"Successfully retrieved {len(categories)} series categories")
return categories
except Exception as e:
logger.error(f"Failed to get series categories: {str(e)}")
logger.error(traceback.format_exc())
raise
def get_series(self, category_id=None):
"""Get series for a specific category"""
try:
if not self.server_info:
self.authenticate()
endpoint = "player_api.php"
params = {
'username': self.username,
'password': self.password,
'action': 'get_series'
}
if category_id:
params['category_id'] = category_id
series = self._make_request(endpoint, params)
if not isinstance(series, list):
error_msg = f"Invalid series response for category {category_id}: {series}"
logger.error(error_msg)
raise ValueError(error_msg)
logger.info(f"Successfully retrieved {len(series)} series for category {category_id}")
return series
except Exception as e:
logger.error(f"Failed to get series for category {category_id}: {str(e)}")
logger.error(traceback.format_exc())
raise
def get_series_info(self, series_id):
"""Get detailed information for a specific series including episodes"""
try:
if not self.server_info:
self.authenticate()
endpoint = "player_api.php"
params = {
'username': self.username,
'password': self.password,
'action': 'get_series_info',
'series_id': series_id
}
series_info = self._make_request(endpoint, params)
if not isinstance(series_info, dict):
error_msg = f"Invalid series info response for series_id {series_id}: {series_info}"
logger.error(error_msg)
raise ValueError(error_msg)
logger.info(f"Successfully retrieved series info for series_id {series_id}")
return series_info
except Exception as e:
logger.error(f"Failed to get series info for series_id {series_id}: {str(e)}")
logger.error(traceback.format_exc())
raise
def close(self):
"""Close the session and cleanup resources"""
if hasattr(self, 'session') and self.session:
try:
self.session.close()
except Exception as e:
logger.debug(f"Error closing XC session: {e}")
def __enter__(self):
"""Enter the context manager"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit the context manager and cleanup resources"""
self.close()
return False # Don't suppress exceptions
def __del__(self):
"""Ensure session is closed when object is destroyed"""
self.close()