mirror of
https://github.com/Dispatcharr/Dispatcharr.git
synced 2026-01-23 10:45:27 +00:00
Refactor XC Client usage to improve error handling and resource management with context management. Implement connection pooling for better performance.
This commit is contained in:
parent
65da85991c
commit
fafd93e958
2 changed files with 137 additions and 107 deletions
|
|
@ -285,57 +285,56 @@ def process_xc_category(account_id, batch, groups, hash_keys):
|
|||
stream_hashes = {}
|
||||
|
||||
try:
|
||||
xc_client = XCClient(account.server_url, account.username, account.password, account.get_user_agent())
|
||||
with XCClient(account.server_url, account.username, account.password, account.get_user_agent()) as xc_client:
|
||||
# Log the batch details to help with debugging
|
||||
logger.debug(f"Processing XC batch: {batch}")
|
||||
|
||||
# Log the batch details to help with debugging
|
||||
logger.debug(f"Processing XC batch: {batch}")
|
||||
|
||||
for group_name, props in batch.items():
|
||||
# Check if we have a valid xc_id for this group
|
||||
if 'xc_id' not in props:
|
||||
logger.error(f"Missing xc_id for group {group_name} in batch {batch}")
|
||||
continue
|
||||
|
||||
# Get actual group ID from the mapping
|
||||
group_id = groups.get(group_name)
|
||||
if not group_id:
|
||||
logger.error(f"Group {group_name} not found in enabled groups")
|
||||
continue
|
||||
|
||||
try:
|
||||
logger.debug(f"Fetching streams for XC category: {group_name} (ID: {props['xc_id']})")
|
||||
streams = xc_client.get_live_category_streams(props['xc_id'])
|
||||
|
||||
if not streams:
|
||||
logger.warning(f"No streams found for XC category {group_name} (ID: {props['xc_id']})")
|
||||
for group_name, props in batch.items():
|
||||
# Check if we have a valid xc_id for this group
|
||||
if 'xc_id' not in props:
|
||||
logger.error(f"Missing xc_id for group {group_name} in batch {batch}")
|
||||
continue
|
||||
|
||||
logger.debug(f"Found {len(streams)} streams for category {group_name}")
|
||||
# Get actual group ID from the mapping
|
||||
group_id = groups.get(group_name)
|
||||
if not group_id:
|
||||
logger.error(f"Group {group_name} not found in enabled groups")
|
||||
continue
|
||||
|
||||
for stream in streams:
|
||||
name = stream["name"]
|
||||
url = xc_client.get_stream_url(stream["stream_id"])
|
||||
tvg_id = stream.get("epg_channel_id", "")
|
||||
tvg_logo = stream.get("stream_icon", "")
|
||||
group_title = group_name
|
||||
try:
|
||||
logger.debug(f"Fetching streams for XC category: {group_name} (ID: {props['xc_id']})")
|
||||
streams = xc_client.get_live_category_streams(props['xc_id'])
|
||||
|
||||
stream_hash = Stream.generate_hash_key(name, url, tvg_id, hash_keys)
|
||||
stream_props = {
|
||||
"name": name,
|
||||
"url": url,
|
||||
"logo_url": tvg_logo,
|
||||
"tvg_id": tvg_id,
|
||||
"m3u_account": account,
|
||||
"channel_group_id": int(group_id),
|
||||
"stream_hash": stream_hash,
|
||||
"custom_properties": json.dumps(stream),
|
||||
}
|
||||
if not streams:
|
||||
logger.warning(f"No streams found for XC category {group_name} (ID: {props['xc_id']})")
|
||||
continue
|
||||
|
||||
if stream_hash not in stream_hashes:
|
||||
stream_hashes[stream_hash] = stream_props
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing XC category {group_name} (ID: {props['xc_id']}): {str(e)}")
|
||||
continue
|
||||
logger.debug(f"Found {len(streams)} streams for category {group_name}")
|
||||
|
||||
for stream in streams:
|
||||
name = stream["name"]
|
||||
url = xc_client.get_stream_url(stream["stream_id"])
|
||||
tvg_id = stream.get("epg_channel_id", "")
|
||||
tvg_logo = stream.get("stream_icon", "")
|
||||
group_title = group_name
|
||||
|
||||
stream_hash = Stream.generate_hash_key(name, url, tvg_id, hash_keys)
|
||||
stream_props = {
|
||||
"name": name,
|
||||
"url": url,
|
||||
"logo_url": tvg_logo,
|
||||
"tvg_id": tvg_id,
|
||||
"m3u_account": account,
|
||||
"channel_group_id": int(group_id),
|
||||
"stream_hash": stream_hash,
|
||||
"custom_properties": json.dumps(stream),
|
||||
}
|
||||
|
||||
if stream_hash not in stream_hashes:
|
||||
stream_hashes[stream_hash] = stream_props
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing XC category {group_name} (ID: {props['xc_id']}): {str(e)}")
|
||||
continue
|
||||
|
||||
# Process all found streams
|
||||
existing_streams = {s.stream_hash: s for s in Stream.objects.filter(stream_hash__in=stream_hashes.keys())}
|
||||
|
|
@ -622,62 +621,63 @@ def refresh_m3u_groups(account_id, use_cache=False, full_refresh=False):
|
|||
|
||||
# Create XCClient with explicit error handling
|
||||
try:
|
||||
xc_client = XCClient(server_url, account.username, account.password, user_agent_string)
|
||||
logger.info(f"XCClient instance created successfully")
|
||||
with XCClient(server_url, account.username, account.password, user_agent_string) as xc_client:
|
||||
logger.info(f"XCClient instance created successfully")
|
||||
|
||||
# Authenticate with detailed error handling
|
||||
try:
|
||||
logger.debug(f"Authenticating with XC server {server_url}")
|
||||
auth_result = xc_client.authenticate()
|
||||
logger.debug(f"Authentication response: {auth_result}")
|
||||
except Exception as e:
|
||||
error_msg = f"Failed to authenticate with XC server: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
account.status = M3UAccount.Status.ERROR
|
||||
account.last_message = error_msg
|
||||
account.save(update_fields=['status', 'last_message'])
|
||||
send_m3u_update(account_id, "processing_groups", 100, status="error", error=error_msg)
|
||||
release_task_lock('refresh_m3u_account_groups', account_id)
|
||||
return error_msg, None
|
||||
|
||||
# Get categories with detailed error handling
|
||||
try:
|
||||
logger.info(f"Getting live categories from XC server")
|
||||
xc_categories = xc_client.get_live_categories()
|
||||
logger.info(f"Found {len(xc_categories)} categories: {xc_categories}")
|
||||
|
||||
# Validate response
|
||||
if not isinstance(xc_categories, list):
|
||||
error_msg = f"Unexpected response from XC server: {xc_categories}"
|
||||
logger.error(error_msg)
|
||||
account.status = M3UAccount.Status.ERROR
|
||||
account.last_message = error_msg
|
||||
account.save(update_fields=['status', 'last_message'])
|
||||
send_m3u_update(account_id, "processing_groups", 100, status="error", error=error_msg)
|
||||
release_task_lock('refresh_m3u_account_groups', account_id)
|
||||
return error_msg, None
|
||||
|
||||
if len(xc_categories) == 0:
|
||||
logger.warning("No categories found in XC server response")
|
||||
|
||||
for category in xc_categories:
|
||||
cat_name = category.get("category_name", "Unknown Category")
|
||||
cat_id = category.get("category_id", "0")
|
||||
logger.info(f"Adding category: {cat_name} (ID: {cat_id})")
|
||||
groups[cat_name] = {
|
||||
"xc_id": cat_id,
|
||||
}
|
||||
except Exception as e:
|
||||
error_msg = f"Failed to get categories from XC server: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
account.status = M3UAccount.Status.ERROR
|
||||
account.last_message = error_msg
|
||||
account.save(update_fields=['status', 'last_message'])
|
||||
send_m3u_update(account_id, "processing_groups", 100, status="error", error=error_msg)
|
||||
release_task_lock('refresh_m3u_account_groups', account_id)
|
||||
return error_msg, None
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Failed to create XCClient: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
account.status = M3UAccount.Status.ERROR
|
||||
account.last_message = error_msg
|
||||
account.save(update_fields=['status', 'last_message'])
|
||||
send_m3u_update(account_id, "processing_groups", 100, status="error", error=error_msg)
|
||||
release_task_lock('refresh_m3u_account_groups', account_id)
|
||||
return error_msg, None
|
||||
|
||||
# Authenticate with detailed error handling
|
||||
try:
|
||||
logger.debug(f"Authenticating with XC server {server_url}")
|
||||
auth_result = xc_client.authenticate()
|
||||
logger.debug(f"Authentication response: {auth_result}")
|
||||
except Exception as e:
|
||||
error_msg = f"Failed to authenticate with XC server: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
account.status = M3UAccount.Status.ERROR
|
||||
account.last_message = error_msg
|
||||
account.save(update_fields=['status', 'last_message'])
|
||||
send_m3u_update(account_id, "processing_groups", 100, status="error", error=error_msg)
|
||||
release_task_lock('refresh_m3u_account_groups', account_id)
|
||||
return error_msg, None
|
||||
|
||||
# Get categories with detailed error handling
|
||||
try:
|
||||
logger.info(f"Getting live categories from XC server")
|
||||
xc_categories = xc_client.get_live_categories()
|
||||
logger.info(f"Found {len(xc_categories)} categories: {xc_categories}")
|
||||
|
||||
# Validate response
|
||||
if not isinstance(xc_categories, list):
|
||||
error_msg = f"Unexpected response from XC server: {xc_categories}"
|
||||
logger.error(error_msg)
|
||||
account.status = M3UAccount.Status.ERROR
|
||||
account.last_message = error_msg
|
||||
account.save(update_fields=['status', 'last_message'])
|
||||
send_m3u_update(account_id, "processing_groups", 100, status="error", error=error_msg)
|
||||
release_task_lock('refresh_m3u_account_groups', account_id)
|
||||
return error_msg, None
|
||||
|
||||
if len(xc_categories) == 0:
|
||||
logger.warning("No categories found in XC server response")
|
||||
|
||||
for category in xc_categories:
|
||||
cat_name = category.get("category_name", "Unknown Category")
|
||||
cat_id = category.get("category_id", "0")
|
||||
logger.info(f"Adding category: {cat_name} (ID: {cat_id})")
|
||||
groups[cat_name] = {
|
||||
"xc_id": cat_id,
|
||||
}
|
||||
except Exception as e:
|
||||
error_msg = f"Failed to get categories from XC server: {str(e)}"
|
||||
error_msg = f"Failed to create XC Client: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
account.status = M3UAccount.Status.ERROR
|
||||
account.last_message = error_msg
|
||||
|
|
@ -686,7 +686,7 @@ def refresh_m3u_groups(account_id, use_cache=False, full_refresh=False):
|
|||
release_task_lock('refresh_m3u_account_groups', account_id)
|
||||
return error_msg, None
|
||||
except Exception as e:
|
||||
error_msg = f"Unexpected error in XC processing: {str(e)}"
|
||||
error_msg = f"Unexpected error occurred in XC Client: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
account.status = M3UAccount.Status.ERROR
|
||||
account.last_message = error_msg
|
||||
|
|
|
|||
|
|
@ -17,20 +17,29 @@ class Client:
|
|||
# Fix: Properly handle all possible user_agent input types
|
||||
if user_agent:
|
||||
if isinstance(user_agent, str):
|
||||
# Direct string user agent
|
||||
user_agent_string = user_agent
|
||||
elif hasattr(user_agent, 'user_agent'):
|
||||
# UserAgent model object
|
||||
user_agent_string = user_agent.user_agent
|
||||
else:
|
||||
# Fallback for any other type
|
||||
logger.warning(f"Unexpected user_agent type: {type(user_agent)}, using default")
|
||||
user_agent_string = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'
|
||||
else:
|
||||
# No user agent provided
|
||||
user_agent_string = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'
|
||||
|
||||
self.headers = {'User-Agent': user_agent_string}
|
||||
# Create persistent session
|
||||
self.session = requests.Session()
|
||||
self.session.headers.update({'User-Agent': user_agent_string})
|
||||
|
||||
# Configure connection pooling
|
||||
adapter = requests.adapters.HTTPAdapter(
|
||||
pool_connections=1,
|
||||
pool_maxsize=2,
|
||||
max_retries=3,
|
||||
pool_block=False
|
||||
)
|
||||
self.session.mount('http://', adapter)
|
||||
self.session.mount('https://', adapter)
|
||||
|
||||
self.server_info = None
|
||||
|
||||
def _normalize_url(self, url):
|
||||
|
|
@ -53,7 +62,7 @@ class Client:
|
|||
url = f"{self.server_url}/{endpoint}"
|
||||
logger.debug(f"XC API Request: {url} with params: {params}")
|
||||
|
||||
response = requests.get(url, params=params, headers=self.headers, timeout=30)
|
||||
response = self.session.get(url, params=params, timeout=30)
|
||||
response.raise_for_status()
|
||||
|
||||
# Check if response is empty
|
||||
|
|
@ -186,3 +195,24 @@ class Client:
|
|||
def get_stream_url(self, stream_id):
|
||||
"""Get the playback URL for a stream"""
|
||||
return f"{self.server_url}/live/{self.username}/{self.password}/{stream_id}.ts"
|
||||
|
||||
def close(self):
|
||||
"""Close the session and cleanup resources"""
|
||||
if hasattr(self, 'session') and self.session:
|
||||
try:
|
||||
self.session.close()
|
||||
except Exception as e:
|
||||
logger.debug(f"Error closing XC session: {e}")
|
||||
|
||||
def __enter__(self):
|
||||
"""Enter the context manager"""
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
"""Exit the context manager and cleanup resources"""
|
||||
self.close()
|
||||
return False # Don't suppress exceptions
|
||||
|
||||
def __del__(self):
|
||||
"""Ensure session is closed when object is destroyed"""
|
||||
self.close()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue