From efaa7f71951902dcaca617a8bbbe595b4f20a47d Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Fri, 21 Mar 2025 10:55:13 -0500 Subject: [PATCH 1/4] Singular redis-client. --- apps/proxy/ts_proxy/channel_status.py | 141 ++++---- apps/proxy/ts_proxy/client_manager.py | 23 +- apps/proxy/ts_proxy/server.py | 478 +++++++++++++++++--------- core/utils.py | 132 +++++-- docker/entrypoint.sh | 10 +- docker/uwsgi.dev.ini | 7 +- docker/uwsgi.ini | 7 +- scripts/wait_for_redis.py | 60 ++++ 8 files changed, 603 insertions(+), 255 deletions(-) create mode 100644 scripts/wait_for_redis.py diff --git a/apps/proxy/ts_proxy/channel_status.py b/apps/proxy/ts_proxy/channel_status.py index cc8a1e5b..39423170 100644 --- a/apps/proxy/ts_proxy/channel_status.py +++ b/apps/proxy/ts_proxy/channel_status.py @@ -4,6 +4,7 @@ import re from . import proxy_server from .redis_keys import RedisKeys from .constants import TS_PACKET_SIZE +from redis.exceptions import ConnectionError, TimeoutError logger = logging.getLogger("ts_proxy") @@ -172,76 +173,98 @@ class ChannelStatus: return info - # Function for basic channel info (used for all channels summary) - def get_basic_channel_info(channel_id): - # Get channel metadata - metadata_key = RedisKeys.channel_metadata(channel_id) - metadata = proxy_server.redis_client.hgetall(metadata_key) - - if not metadata: + @staticmethod + def _execute_redis_command(command_func): + """Execute Redis command with error handling""" + if not proxy_server.redis_client: return None - # Basic channel info only - omit diagnostics and details - buffer_index_key = RedisKeys.buffer_index(channel_id) - buffer_index_value = proxy_server.redis_client.get(buffer_index_key) + try: + return command_func() + except (ConnectionError, TimeoutError) as e: + logger.warning(f"Redis connection error in ChannelStatus: {e}") + return None + except Exception as e: + logger.error(f"Redis command error in ChannelStatus: {e}") + return None - # Count clients (using efficient count method) - client_set_key = RedisKeys.clients(channel_id) - client_count = proxy_server.redis_client.scard(client_set_key) or 0 + @staticmethod + def get_basic_channel_info(channel_id): + """Get basic channel information with Redis error handling""" + try: + # Use _execute_redis_command for Redis operations + metadata_key = RedisKeys.channel_metadata(channel_id) + metadata = ChannelStatus._execute_redis_command( + lambda: proxy_server.redis_client.hgetall(metadata_key) + ) - # Calculate uptime - created_at = float(metadata.get(b'init_time', b'0').decode('utf-8')) - uptime = time.time() - created_at if created_at > 0 else 0 + if not metadata: + return None - # Simplified info - info = { - 'channel_id': channel_id, - 'state': metadata.get(b'state', b'unknown').decode('utf-8'), - 'url': metadata.get(b'url', b'').decode('utf-8'), - 'profile': metadata.get(b'profile', b'unknown').decode('utf-8'), - 'owner': metadata.get(b'owner', b'unknown').decode('utf-8'), - 'buffer_index': int(buffer_index_value.decode('utf-8')) if buffer_index_value else 0, - 'client_count': client_count, - 'uptime': uptime - } + # Basic channel info only - omit diagnostics and details + buffer_index_key = RedisKeys.buffer_index(channel_id) + buffer_index_value = proxy_server.redis_client.get(buffer_index_key) - # Quick health check if available locally - if channel_id in proxy_server.stream_managers: - manager = proxy_server.stream_managers[channel_id] - info['healthy'] = manager.healthy + # Count clients (using efficient count method) + client_set_key = 
RedisKeys.clients(channel_id) + client_count = proxy_server.redis_client.scard(client_set_key) or 0 - # Get concise client information - clients = [] - client_ids = proxy_server.redis_client.smembers(client_set_key) + # Calculate uptime + created_at = float(metadata.get(b'init_time', b'0').decode('utf-8')) + uptime = time.time() - created_at if created_at > 0 else 0 - # Process only if we have clients and keep it limited - if client_ids: - # Get up to 10 clients for the basic view - for client_id in list(client_ids)[:10]: - client_id_str = client_id.decode('utf-8') - client_key = f"ts_proxy:channel:{channel_id}:clients:{client_id_str}" + # Simplified info + info = { + 'channel_id': channel_id, + 'state': metadata.get(b'state', b'unknown').decode('utf-8'), + 'url': metadata.get(b'url', b'').decode('utf-8'), + 'profile': metadata.get(b'profile', b'unknown').decode('utf-8'), + 'owner': metadata.get(b'owner', b'unknown').decode('utf-8'), + 'buffer_index': int(buffer_index_value.decode('utf-8')) if buffer_index_value else 0, + 'client_count': client_count, + 'uptime': uptime + } - # Efficient way - just retrieve the essentials - client_info = { - 'client_id': client_id_str, - 'user_agent': proxy_server.redis_client.hget(client_key, 'user_agent'), - 'ip_address': proxy_server.redis_client.hget(client_key, 'ip_address').decode('utf-8'), - } + # Quick health check if available locally + if channel_id in proxy_server.stream_managers: + manager = proxy_server.stream_managers[channel_id] + info['healthy'] = manager.healthy - if client_info['user_agent']: - client_info['user_agent'] = client_info['user_agent'].decode('utf-8') - else: - client_info['user_agent'] = 'unknown' + # Get concise client information + clients = [] + client_ids = proxy_server.redis_client.smembers(client_set_key) - # Just get connected_at for client age - connected_at_bytes = proxy_server.redis_client.hget(client_key, 'connected_at') - if connected_at_bytes: - connected_at = float(connected_at_bytes.decode('utf-8')) - client_info['connected_since'] = time.time() - connected_at + # Process only if we have clients and keep it limited + if client_ids: + # Get up to 10 clients for the basic view + for client_id in list(client_ids)[:10]: + client_id_str = client_id.decode('utf-8') + client_key = f"ts_proxy:channel:{channel_id}:clients:{client_id_str}" - clients.append(client_info) + # Efficient way - just retrieve the essentials + client_info = { + 'client_id': client_id_str, + 'user_agent': proxy_server.redis_client.hget(client_key, 'user_agent'), + 'ip_address': proxy_server.redis_client.hget(client_key, 'ip_address').decode('utf-8'), + } - # Add clients to info - info['clients'] = clients + if client_info['user_agent']: + client_info['user_agent'] = client_info['user_agent'].decode('utf-8') + else: + client_info['user_agent'] = 'unknown' - return info + # Just get connected_at for client age + connected_at_bytes = proxy_server.redis_client.hget(client_key, 'connected_at') + if connected_at_bytes: + connected_at = float(connected_at_bytes.decode('utf-8')) + client_info['connected_since'] = time.time() - connected_at + + clients.append(client_info) + + # Add clients to info + info['clients'] = clients + + return info + except Exception as e: + logger.error(f"Error getting channel info: {e}") + return None diff --git a/apps/proxy/ts_proxy/client_manager.py b/apps/proxy/ts_proxy/client_manager.py index ed5868a9..d19ce719 100644 --- a/apps/proxy/ts_proxy/client_manager.py +++ b/apps/proxy/ts_proxy/client_manager.py @@ -6,6 +6,7 @@ 
import time import json from typing import Set, Optional from apps.proxy.config import TSConfig as Config +from redis.exceptions import ConnectionError, TimeoutError from .constants import EventType from .config_helper import ConfigHelper from .redis_keys import RedisKeys @@ -120,6 +121,20 @@ class ClientManager: thread.start() logger.debug(f"Started client heartbeat thread for channel {self.channel_id} (interval: {self.heartbeat_interval}s)") + def _execute_redis_command(self, command_func): + """Execute Redis command with error handling""" + if not self.redis_client: + return None + + try: + return command_func() + except (ConnectionError, TimeoutError) as e: + logger.warning(f"Redis connection error in ClientManager: {e}") + return None + except Exception as e: + logger.error(f"Redis command error in ClientManager: {e}") + return None + def _notify_owner_of_activity(self): """Notify channel owner that clients are active on this worker""" if not self.redis_client or not self.clients: @@ -130,11 +145,15 @@ class ClientManager: # STANDARDIZED KEY: Worker info under channel namespace worker_key = f"ts_proxy:channel:{self.channel_id}:worker:{worker_id}" - self.redis_client.setex(worker_key, self.client_ttl, str(len(self.clients))) + self._execute_redis_command( + lambda: self.redis_client.setex(worker_key, self.client_ttl, str(len(self.clients))) + ) # STANDARDIZED KEY: Activity timestamp under channel namespace activity_key = f"ts_proxy:channel:{self.channel_id}:activity" - self.redis_client.setex(activity_key, self.client_ttl, str(time.time())) + self._execute_redis_command( + lambda: self.redis_client.setex(activity_key, self.client_ttl, str(time.time())) + ) except Exception as e: logger.error(f"Error notifying owner of client activity: {e}") diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index cfe5b4ce..664b0273 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -18,6 +18,8 @@ import json from typing import Dict, Optional, Set from apps.proxy.config import TSConfig as Config from apps.channels.models import Channel +from core.utils import redis_client as global_redis_client # Import the global Redis client +from redis.exceptions import ConnectionError, TimeoutError from .stream_manager import StreamManager from .stream_buffer import StreamBuffer from .client_manager import ClientManager @@ -43,19 +45,25 @@ class ProxyServer: hostname = socket.gethostname() self.worker_id = f"{hostname}:{pid}" - # Connect to Redis + # Connect to Redis - try using global client first self.redis_client = None - try: - import redis - from django.conf import settings + self.redis_connection_attempts = 0 + self.redis_max_retries = 3 + self.redis_retry_interval = 5 # seconds + + try: + # First try to use the global client from core.utils + if global_redis_client is not None: + self.redis_client = global_redis_client + logger.info(f"Using global Redis client") + logger.info(f"Worker ID: {self.worker_id}") + else: + # Fall back to direct connection with retry + self._setup_redis_connection() - redis_url = getattr(settings, 'REDIS_URL', 'redis://localhost:6379/0') - self.redis_client = redis.from_url(redis_url) - logger.info(f"Connected to Redis at {redis_url}") - logger.info(f"Worker ID: {self.worker_id}") except Exception as e: + logger.error(f"Failed to initialize Redis: {e}") self.redis_client = None - logger.error(f"Failed to connect to Redis: {e}") # Start cleanup thread self.cleanup_interval = getattr(Config, 'CLEANUP_INTERVAL', 60) @@ -64,179 
+72,302 @@ class ProxyServer: # Start event listener for Redis pubsub messages self._start_event_listener() + def _setup_redis_connection(self): + """Setup Redis connection with retry logic""" + import redis + from django.conf import settings + + while self.redis_connection_attempts < self.redis_max_retries: + try: + logger.info(f"Attempting to connect to Redis ({self.redis_connection_attempts+1}/{self.redis_max_retries})") + + # Get connection parameters from settings or environment + redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost')) + redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) + redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) + + # Create Redis client with reasonable timeouts + self.redis_client = redis.Redis( + host=redis_host, + port=redis_port, + db=redis_db, + socket_timeout=5, + socket_connect_timeout=5, + retry_on_timeout=True, + health_check_interval=30 + ) + + # Test connection + self.redis_client.ping() + logger.info(f"Successfully connected to Redis at {redis_host}:{redis_port}/{redis_db}") + logger.info(f"Worker ID: {self.worker_id}") + break + + except (ConnectionError, TimeoutError) as e: + self.redis_connection_attempts += 1 + if self.redis_connection_attempts >= self.redis_max_retries: + logger.error(f"Failed to connect to Redis after {self.redis_max_retries} attempts: {e}") + self.redis_client = None + else: + # Exponential backoff with a maximum of 30 seconds + retry_delay = min(self.redis_retry_interval * (2 ** (self.redis_connection_attempts - 1)), 30) + logger.warning(f"Redis connection failed. Retrying in {retry_delay}s... ({self.redis_connection_attempts}/{self.redis_max_retries})") + time.sleep(retry_delay) + + except Exception as e: + logger.error(f"Unexpected error connecting to Redis: {e}", exc_info=True) + self.redis_client = None + break + + def _execute_redis_command(self, command_func, *args, **kwargs): + """Execute Redis command with error handling and reconnection logic""" + if not self.redis_client: + return None + + try: + return command_func(*args, **kwargs) + except (ConnectionError, TimeoutError) as e: + logger.warning(f"Redis connection lost: {e}. 
Attempting to reconnect...") + try: + # Try to reconnect + self.redis_connection_attempts = 0 + self._setup_redis_connection() + if self.redis_client: + # Retry the command once + return command_func(*args, **kwargs) + except Exception as reconnect_error: + logger.error(f"Failed to reconnect to Redis: {reconnect_error}") + return None + except Exception as e: + logger.error(f"Redis command error: {e}") + return None + def _start_event_listener(self): """Listen for events from other workers""" if not self.redis_client: return def event_listener(): - try: - pubsub = self.redis_client.pubsub() - pubsub.psubscribe("ts_proxy:events:*") + retry_count = 0 + max_retries = 10 + base_retry_delay = 1 # Start with 1 second delay + max_retry_delay = 30 # Cap at 30 seconds - logger.info(f"Started Redis event listener for client activity") + while True: + try: + # Create a dedicated Redis client for PubSub with longer timeouts + # This avoids affecting the main Redis client operations + from django.conf import settings + import redis - for message in pubsub.listen(): - if message["type"] != "pmessage": - continue + redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost')) + redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) + redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) - try: - channel = message["channel"].decode("utf-8") - data = json.loads(message["data"].decode("utf-8")) + # Create a dedicated client with generous timeouts for PubSub connections + pubsub_client = redis.Redis( + host=redis_host, + port=redis_port, + db=redis_db, + socket_timeout=60, # Much longer timeout for PubSub operations + socket_connect_timeout=10, + socket_keepalive=True, # Enable TCP keepalive + health_check_interval=30 + ) - event_type = data.get("event") - channel_id = data.get("channel_id") + # Test connection before subscribing + pubsub_client.ping() - if channel_id and event_type: - # For owner, update client status immediately - if self.am_i_owner(channel_id): - if event_type == EventType.CLIENT_CONNECTED: - logger.debug(f"Owner received {EventType.CLIENT_CONNECTED} event for channel {channel_id}") - # Reset any disconnect timer - disconnect_key = RedisKeys.last_client_disconnect(channel_id) - self.redis_client.delete(disconnect_key) + # Create a new pubsub instance from the dedicated client + pubsub = pubsub_client.pubsub() + pubsub.psubscribe("ts_proxy:events:*") - elif event_type == EventType.CLIENT_DISCONNECTED: - logger.debug(f"Owner received {EventType.CLIENT_DISCONNECTED} event for channel {channel_id}") - # Check if any clients remain - if channel_id in self.client_managers: - # VERIFY REDIS CLIENT COUNT DIRECTLY - client_set_key = RedisKeys.clients(channel_id) - total = self.redis_client.scard(client_set_key) or 0 + logger.info(f"Started Redis event listener for client activity") - if total == 0: - logger.debug(f"No clients left after disconnect event - stopping channel {channel_id}") - # Set the disconnect timer for other workers to see - disconnect_key = RedisKeys.last_client_disconnect(channel_id) - self.redis_client.setex(disconnect_key, 60, str(time.time())) + # Reset retry count on successful connection + retry_count = 0 - # Get configured shutdown delay or default - shutdown_delay = getattr(Config, 'CHANNEL_SHUTDOWN_DELAY', 0) + for message in pubsub.listen(): + if message["type"] != "pmessage": + continue - if shutdown_delay > 0: - logger.info(f"Waiting {shutdown_delay}s before stopping channel...") - 
time.sleep(shutdown_delay) + try: + channel = message["channel"].decode("utf-8") + data = json.loads(message["data"].decode("utf-8")) - # Re-check client count before stopping - total = self.redis_client.scard(client_set_key) or 0 - if total > 0: - logger.info(f"New clients connected during shutdown delay - aborting shutdown") - self.redis_client.delete(disconnect_key) - return + event_type = data.get("event") + channel_id = data.get("channel_id") - # Stop the channel directly + if channel_id and event_type: + # For owner, update client status immediately + if self.am_i_owner(channel_id): + if event_type == EventType.CLIENT_CONNECTED: + logger.debug(f"Owner received {EventType.CLIENT_CONNECTED} event for channel {channel_id}") + # Reset any disconnect timer + disconnect_key = RedisKeys.last_client_disconnect(channel_id) + self.redis_client.delete(disconnect_key) + + elif event_type == EventType.CLIENT_DISCONNECTED: + logger.debug(f"Owner received {EventType.CLIENT_DISCONNECTED} event for channel {channel_id}") + # Check if any clients remain + if channel_id in self.client_managers: + # VERIFY REDIS CLIENT COUNT DIRECTLY + client_set_key = RedisKeys.clients(channel_id) + total = self.redis_client.scard(client_set_key) or 0 + + if total == 0: + logger.debug(f"No clients left after disconnect event - stopping channel {channel_id}") + # Set the disconnect timer for other workers to see + disconnect_key = RedisKeys.last_client_disconnect(channel_id) + self.redis_client.setex(disconnect_key, 60, str(time.time())) + + # Get configured shutdown delay or default + shutdown_delay = getattr(Config, 'CHANNEL_SHUTDOWN_DELAY', 0) + + if shutdown_delay > 0: + logger.info(f"Waiting {shutdown_delay}s before stopping channel...") + time.sleep(shutdown_delay) + + # Re-check client count before stopping + total = self.redis_client.scard(client_set_key) or 0 + if total > 0: + logger.info(f"New clients connected during shutdown delay - aborting shutdown") + self.redis_client.delete(disconnect_key) + return + + # Stop the channel directly + self.stop_channel(channel_id) + + + elif event_type == EventType.STREAM_SWITCH: + logger.info(f"Owner received {EventType.STREAM_SWITCH} request for channel {channel_id}") + # Handle stream switch request + new_url = data.get("url") + user_agent = data.get("user_agent") + + if new_url and channel_id in self.stream_managers: + # Update metadata in Redis + if self.redis_client: + metadata_key = RedisKeys.channel_metadata(channel_id) + self.redis_client.hset(metadata_key, "url", new_url) + if user_agent: + self.redis_client.hset(metadata_key, "user_agent", user_agent) + + # Set switch status + status_key = RedisKeys.switch_status(channel_id) + self.redis_client.set(status_key, "switching") + + # Perform the stream switch + stream_manager = self.stream_managers[channel_id] + success = stream_manager.update_url(new_url) + + if success: + logger.info(f"Stream switch initiated for channel {channel_id}") + + # Publish confirmation + switch_result = { + "event": EventType.STREAM_SWITCHED, # Use constant instead of string + "channel_id": channel_id, + "success": True, + "url": new_url, + "timestamp": time.time() + } + self.redis_client.publish( + f"ts_proxy:events:{channel_id}", + json.dumps(switch_result) + ) + + # Update status + if self.redis_client: + self.redis_client.set(status_key, "switched") + else: + logger.error(f"Failed to switch stream for channel {channel_id}") + + # Publish failure + switch_result = { + "event": EventType.STREAM_SWITCHED, + "channel_id": channel_id, + 
"success": False, + "url": new_url, + "timestamp": time.time() + } + self.redis_client.publish( + f"ts_proxy:events:{channel_id}", + json.dumps(switch_result) + ) + elif event_type == EventType.CHANNEL_STOP: + logger.info(f"Received {EventType.CHANNEL_STOP} event for channel {channel_id}") + # First mark channel as stopping in Redis + if self.redis_client: + # Set stopping state in metadata + metadata_key = RedisKeys.channel_metadata(channel_id) + if self.redis_client.exists(metadata_key): + self.redis_client.hset(metadata_key, mapping={ + "state": ChannelState.STOPPING, + "state_changed_at": str(time.time()) + }) + + # If we have local resources for this channel, clean them up + if channel_id in self.stream_buffers or channel_id in self.client_managers: + # Use existing stop_channel method + logger.info(f"Stopping local resources for channel {channel_id}") self.stop_channel(channel_id) + # Acknowledge stop by publishing a response + stop_response = { + "event": EventType.CHANNEL_STOPPED, + "channel_id": channel_id, + "worker_id": self.worker_id, + "timestamp": time.time() + } + self.redis_client.publish( + f"ts_proxy:events:{channel_id}", + json.dumps(stop_response) + ) + elif event_type == EventType.CLIENT_STOP: + client_id = data.get("client_id") + if client_id and channel_id: + logger.info(f"Received request to stop client {client_id} on channel {channel_id}") - elif event_type == EventType.STREAM_SWITCH: - logger.info(f"Owner received {EventType.STREAM_SWITCH} request for channel {channel_id}") - # Handle stream switch request - new_url = data.get("url") - user_agent = data.get("user_agent") + # Both remove from client manager AND set a key for the generator to detect + if channel_id in self.client_managers: + client_manager = self.client_managers[channel_id] + if client_id in client_manager.clients: + client_manager.remove_client(client_id) + logger.info(f"Removed client {client_id} from client manager") - if new_url and channel_id in self.stream_managers: - # Update metadata in Redis - if self.redis_client: - metadata_key = RedisKeys.channel_metadata(channel_id) - self.redis_client.hset(metadata_key, "url", new_url) - if user_agent: - self.redis_client.hset(metadata_key, "user_agent", user_agent) - - # Set switch status - status_key = RedisKeys.switch_status(channel_id) - self.redis_client.set(status_key, "switching") - - # Perform the stream switch - stream_manager = self.stream_managers[channel_id] - success = stream_manager.update_url(new_url) - - if success: - logger.info(f"Stream switch initiated for channel {channel_id}") - - # Publish confirmation - switch_result = { - "event": EventType.STREAM_SWITCHED, # Use constant instead of string - "channel_id": channel_id, - "success": True, - "url": new_url, - "timestamp": time.time() - } - self.redis_client.publish( - f"ts_proxy:events:{channel_id}", - json.dumps(switch_result) - ) - - # Update status + # Set a Redis key for the generator to detect if self.redis_client: - self.redis_client.set(status_key, "switched") - else: - logger.error(f"Failed to switch stream for channel {channel_id}") + stop_key = RedisKeys.client_stop(channel_id, client_id) + self.redis_client.setex(stop_key, 30, "true") # 30 second TTL + logger.info(f"Set stop key for client {client_id}") + except Exception as e: + logger.error(f"Error processing event message: {e}") - # Publish failure - switch_result = { - "event": EventType.STREAM_SWITCHED, - "channel_id": channel_id, - "success": False, - "url": new_url, - "timestamp": time.time() - } - 
self.redis_client.publish( - f"ts_proxy:events:{channel_id}", - json.dumps(switch_result) - ) - elif event_type == EventType.CHANNEL_STOP: - logger.info(f"Received {EventType.CHANNEL_STOP} event for channel {channel_id}") - # First mark channel as stopping in Redis - if self.redis_client: - # Set stopping state in metadata - metadata_key = RedisKeys.channel_metadata(channel_id) - if self.redis_client.exists(metadata_key): - self.redis_client.hset(metadata_key, mapping={ - "state": ChannelState.STOPPING, - "state_changed_at": str(time.time()) - }) + except (ConnectionError, TimeoutError) as e: + # Calculate exponential backoff with jitter + retry_count += 1 + delay = min(base_retry_delay * (2 ** (retry_count - 1)), max_retry_delay) + # Add some randomness to prevent thundering herd + jitter = random.uniform(0, 0.5 * delay) + final_delay = delay + jitter - # If we have local resources for this channel, clean them up - if channel_id in self.stream_buffers or channel_id in self.client_managers: - # Use existing stop_channel method - logger.info(f"Stopping local resources for channel {channel_id}") - self.stop_channel(channel_id) + logger.error(f"Error in event listener: {e}. Retrying in {final_delay:.1f}s (attempt {retry_count})") + time.sleep(final_delay) - # Acknowledge stop by publishing a response - stop_response = { - "event": EventType.CHANNEL_STOPPED, - "channel_id": channel_id, - "worker_id": self.worker_id, - "timestamp": time.time() - } - self.redis_client.publish( - f"ts_proxy:events:{channel_id}", - json.dumps(stop_response) - ) - elif event_type == EventType.CLIENT_STOP: - client_id = data.get("client_id") - if client_id and channel_id: - logger.info(f"Received request to stop client {client_id} on channel {channel_id}") + # Try to clean up the old connection + try: + if 'pubsub' in locals(): + pubsub.close() + if 'pubsub_client' in locals(): + pubsub_client.close() + except: + pass - # Both remove from client manager AND set a key for the generator to detect - if channel_id in self.client_managers: - client_manager = self.client_managers[channel_id] - if client_id in client_manager.clients: - client_manager.remove_client(client_id) - logger.info(f"Removed client {client_id} from client manager") - - # Set a Redis key for the generator to detect - if self.redis_client: - stop_key = RedisKeys.client_stop(channel_id, client_id) - self.redis_client.setex(stop_key, 30, "true") # 30 second TTL - logger.info(f"Set stop key for client {client_id}") - except Exception as e: - logger.error(f"Error processing event message: {e}") - except Exception as e: - logger.error(f"Error in event listener: {e}") - time.sleep(5) # Wait before reconnecting - # Try to restart the listener - self._start_event_listener() + except Exception as e: + logger.error(f"Error in event listener: {e}") + # Add a short delay to prevent rapid retries on persistent errors + time.sleep(5) thread = threading.Thread(target=event_listener, daemon=True) thread.name = "redis-event-listener" @@ -249,10 +380,9 @@ class ProxyServer: try: lock_key = RedisKeys.channel_owner(channel_id) - owner = self.redis_client.get(lock_key) - if owner: - return owner.decode('utf-8') - return None + return self._execute_redis_command( + lambda: self.redis_client.get(lock_key).decode('utf-8') if self.redis_client.get(lock_key) else None + ) except Exception as e: logger.error(f"Error getting channel owner: {e}") return None @@ -271,20 +401,32 @@ class ProxyServer: # Create a lock key with proper namespace lock_key = 
RedisKeys.channel_owner(channel_id) - # Use Redis SETNX for atomic locking - only succeeds if the key doesn't exist - acquired = self.redis_client.setnx(lock_key, self.worker_id) + # Use Redis SETNX for atomic locking with error handling + acquired = self._execute_redis_command( + lambda: self.redis_client.setnx(lock_key, self.worker_id) + ) + + if acquired is None: # Redis command failed + logger.warning(f"Redis command failed during ownership acquisition - assuming ownership") + return True # If acquired, set expiry to prevent orphaned locks if acquired: - self.redis_client.expire(lock_key, ttl) + self._execute_redis_command( + lambda: self.redis_client.expire(lock_key, ttl) + ) logger.info(f"Worker {self.worker_id} acquired ownership of channel {channel_id}") return True # If not acquired, check if we already own it (might be a retry) - current_owner = self.redis_client.get(lock_key) + current_owner = self._execute_redis_command( + lambda: self.redis_client.get(lock_key) + ) if current_owner and current_owner.decode('utf-8') == self.worker_id: # Refresh TTL - self.redis_client.expire(lock_key, ttl) + self._execute_redis_command( + lambda: self.redis_client.expire(lock_key, ttl) + ) logger.info(f"Worker {self.worker_id} refreshed ownership of channel {channel_id}") return True @@ -689,7 +831,7 @@ class ProxyServer: return True except Exception as e: - logger.error(f"Error stopping channel {channel_id}: {e}", exc_info=True) + logger.error(f"Error stopping channel {channel_id}: {e}") return False def check_inactive_channels(self): @@ -723,7 +865,9 @@ class ProxyServer: # Send worker heartbeat first if self.redis_client: worker_heartbeat_key = f"ts_proxy:worker:{self.worker_id}:heartbeat" - self.redis_client.setex(worker_heartbeat_key, 30, str(time.time())) + self._execute_redis_command( + lambda: self.redis_client.setex(worker_heartbeat_key, 30, str(time.time())) + ) # Refresh channel registry self.refresh_channel_registry() diff --git a/core/utils.py b/core/utils.py index e9b5f6d2..06a2f25e 100644 --- a/core/utils.py +++ b/core/utils.py @@ -1,28 +1,118 @@ import redis import logging +import time +import os from django.conf import settings +from redis.exceptions import ConnectionError, TimeoutError logger = logging.getLogger(__name__) -def get_redis_client(): - """Get Redis client with connection validation""" - try: - # Create Redis client - client = redis.Redis( - host=settings.REDIS_HOST, - port=getattr(settings, 'REDIS_PORT', 6379), - db=settings.REDIS_DB, - socket_timeout=5, - socket_connect_timeout=5 - ) - - # Validate connection with ping - client.ping() - logger.info(f"Connected to Redis at {settings.REDIS_HOST}:6379/{settings.REDIS_DB}") - return client - except Exception as e: - logger.error(f"Failed to connect to Redis: {e}") - return None +def get_redis_client(max_retries=5, retry_interval=1): + """Get Redis client with connection validation and retry logic""" + retry_count = 0 + while retry_count < max_retries: + try: + # Get connection parameters from settings or environment + redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost')) + redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) + redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) -# Initialize the global client -redis_client = get_redis_client() \ No newline at end of file + # Create Redis client with better defaults + client = redis.Redis( + host=redis_host, + port=redis_port, + db=redis_db, + socket_timeout=5, + 
socket_connect_timeout=5, + retry_on_timeout=True, + health_check_interval=30 + ) + + # Validate connection with ping + client.ping() + logger.info(f"Connected to Redis at {redis_host}:{redis_port}/{redis_db}") + return client + + except (ConnectionError, TimeoutError) as e: + retry_count += 1 + if retry_count >= max_retries: + logger.error(f"Failed to connect to Redis after {max_retries} attempts: {e}") + return None + else: + # Use exponential backoff for retries + wait_time = retry_interval * (2 ** (retry_count - 1)) + logger.warning(f"Redis connection failed. Retrying in {wait_time}s... ({retry_count}/{max_retries})") + time.sleep(wait_time) + + except Exception as e: + logger.error(f"Unexpected error connecting to Redis: {e}") + return None + +def get_redis_pubsub_client(max_retries=5, retry_interval=3): + """Get Redis client optimized for PubSub operations with longer timeouts""" + retry_count = 0 + while retry_count < max_retries: + try: + # Get connection parameters from settings or environment + redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost')) + redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) + redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) + + # Create Redis client with PubSub-optimized settings + client = redis.Redis( + host=redis_host, + port=redis_port, + db=redis_db, + socket_timeout=60, # Longer timeout for blocking operations + socket_connect_timeout=5, + socket_keepalive=True, # Enable TCP keepalive + health_check_interval=30, + retry_on_timeout=True + ) + + # Validate connection with ping + client.ping() + logger.info(f"Connected to Redis for PubSub at {redis_host}:{redis_port}/{redis_db}") + return client + + except (ConnectionError, TimeoutError) as e: + retry_count += 1 + if retry_count >= max_retries: + logger.error(f"Failed to connect to Redis for PubSub after {max_retries} attempts: {e}") + return None + else: + # Use exponential backoff for retries + wait_time = retry_interval * (2 ** (retry_count - 1)) + logger.warning(f"Redis PubSub connection failed. Retrying in {wait_time}s... 
({retry_count}/{max_retries})") + time.sleep(wait_time) + + except Exception as e: + logger.error(f"Unexpected error connecting to Redis for PubSub: {e}") + return None + +def execute_redis_command(redis_client, command_func, default_return=None): + """ + Execute a Redis command with proper error handling + + Args: + redis_client: The Redis client instance + command_func: Lambda function containing the Redis command to execute + default_return: Value to return if command fails + + Returns: + Command result or default_return on failure + """ + if redis_client is None: + return default_return + + try: + return command_func() + except (ConnectionError, TimeoutError) as e: + logger.warning(f"Redis connection error: {e}") + return default_return + except Exception as e: + logger.error(f"Redis command error: {e}") + return default_return + +# Initialize the global client with retry logic +redis_client = get_redis_client(max_retries=10, retry_interval=1) \ No newline at end of file diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index 2898ed2f..d04edcb0 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -85,10 +85,6 @@ else pids+=("$nginx_pid") fi -cd /app -python manage.py migrate --noinput -python manage.py collectstatic --noinput - uwsgi_file="/app/docker/uwsgi.ini" if [ "$DISPATCHARR_ENV" = "dev" ]; then uwsgi_file="/app/docker/uwsgi.dev.ini" @@ -100,6 +96,12 @@ uwsgi_pid=$(pgrep uwsgi | sort | head -n1) echo "✅ uwsgi started with PID $uwsgi_pid" pids+=("$uwsgi_pid") + + +cd /app +python manage.py migrate --noinput +python manage.py collectstatic --noinput + # Wait for at least one process to exit and log the process that exited first if [ ${#pids[@]} -gt 0 ]; then echo "⏳ Waiting for processes to exit..." diff --git a/docker/uwsgi.dev.ini b/docker/uwsgi.dev.ini index 93ef9fa0..5b23b183 100644 --- a/docker/uwsgi.dev.ini +++ b/docker/uwsgi.dev.ini @@ -2,9 +2,14 @@ ; exec-before = python manage.py collectstatic --noinput ; exec-before = python manage.py migrate --noinput +; First run Redis availability check script once +exec-pre = python /app/scripts/wait_for_redis.py + +; Start Redis first +attach-daemon = redis-server +; Then start other services attach-daemon = celery -A dispatcharr worker -l info attach-daemon = celery -A dispatcharr beat -l info -attach-daemon = redis-server attach-daemon = daphne -b 0.0.0.0 -p 8001 dispatcharr.asgi:application attach-daemon = cd /app/frontend && npm run dev diff --git a/docker/uwsgi.ini b/docker/uwsgi.ini index adb5de33..1b1de50a 100644 --- a/docker/uwsgi.ini +++ b/docker/uwsgi.ini @@ -2,9 +2,14 @@ ; exec-before = python manage.py collectstatic --noinput ; exec-before = python manage.py migrate --noinput +; First run Redis availability check script once +exec-pre = python /app/scripts/wait_for_redis.py + +; Start Redis first +attach-daemon = redis-server +; Then start other services attach-daemon = celery -A dispatcharr worker -l error attach-daemon = celery -A dispatcharr beat -l error -attach-daemon = redis-server attach-daemon = daphne -b 0.0.0.0 -p 8001 dispatcharr.asgi:application # Core settings diff --git a/scripts/wait_for_redis.py b/scripts/wait_for_redis.py new file mode 100644 index 00000000..306b9d49 --- /dev/null +++ b/scripts/wait_for_redis.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python +""" +Helper script to wait for Redis to be available before starting the application. 
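+
+Illustrative standalone use (a sketch; assumes the script directory is on the
+Python path so this module can be imported as wait_for_redis):
+
+    from wait_for_redis import wait_for_redis
+
+    # Block until Redis answers PING, retrying with the given interval.
+    if not wait_for_redis(host="localhost", port=6379, db=0, max_retries=5, retry_interval=1):
+        raise SystemExit("Redis did not become available")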
+""" + +import redis +import time +import os +import sys +import logging + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +def wait_for_redis(host='localhost', port=6379, db=0, max_retries=30, retry_interval=2): + """Wait for Redis to become available""" + redis_client = None + retry_count = 0 + + logger.info(f"Waiting for Redis at {host}:{port}/{db}...") + + while retry_count < max_retries: + try: + redis_client = redis.Redis( + host=host, + port=port, + db=db, + socket_timeout=2, + socket_connect_timeout=2 + ) + redis_client.ping() + logger.info(f"✅ Redis at {host}:{port}/{db} is now available!") + return True + except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e: + retry_count += 1 + if retry_count >= max_retries: + logger.error(f"❌ Failed to connect to Redis after {max_retries} attempts: {e}") + return False + + logger.info(f"⏳ Redis not available yet, retrying in {retry_interval}s... ({retry_count}/{max_retries})") + time.sleep(retry_interval) + except Exception as e: + logger.error(f"❌ Unexpected error connecting to Redis: {e}") + return False + + return False + +if __name__ == "__main__": + host = os.environ.get('REDIS_HOST', 'localhost') + port = int(os.environ.get('REDIS_PORT', 6379)) + db = int(os.environ.get('REDIS_DB', 0)) + max_retries = int(os.environ.get('REDIS_WAIT_RETRIES', 30)) + retry_interval = int(os.environ.get('REDIS_WAIT_INTERVAL', 2)) + + logger.info(f"Starting Redis availability check at {host}:{port}/{db}") + + if wait_for_redis(host, port, db, max_retries, retry_interval): + sys.exit(0) + else: + sys.exit(1) From d04ba07d10ee3a10a40b9fd76df723c819e673db Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Fri, 21 Mar 2025 19:59:54 -0500 Subject: [PATCH 2/4] Added client speed statistics to client metadata in redis. 
--- apps/proxy/ts_proxy/redis_keys.py | 5 +++ apps/proxy/ts_proxy/stream_generator.py | 56 ++++++++++++++----------- 2 files changed, 37 insertions(+), 24 deletions(-) diff --git a/apps/proxy/ts_proxy/redis_keys.py b/apps/proxy/ts_proxy/redis_keys.py index 1eaa8aa5..22b02648 100644 --- a/apps/proxy/ts_proxy/redis_keys.py +++ b/apps/proxy/ts_proxy/redis_keys.py @@ -83,3 +83,8 @@ class RedisKeys: def transcode_active(channel_id): """Key indicating active transcode process""" return f"ts_proxy:channel:{channel_id}:transcode_active" + + @staticmethod + def client_metadata(channel_id, client_id): + """Key for client metadata hash""" + return f"ts_proxy:channel:{channel_id}:clients:{client_id}" diff --git a/apps/proxy/ts_proxy/stream_generator.py b/apps/proxy/ts_proxy/stream_generator.py index 8a91f1dc..6a6341c9 100644 --- a/apps/proxy/ts_proxy/stream_generator.py +++ b/apps/proxy/ts_proxy/stream_generator.py @@ -162,11 +162,6 @@ class StreamGenerator: def _stream_data_generator(self): """Generate stream data chunks based on buffer contents.""" - bytes_sent = 0 - chunks_sent = 0 - stream_start_time = time.time() - local_index = self.local_index - # Main streaming loop while True: # Check if resources still exist @@ -174,12 +169,11 @@ class StreamGenerator: break # Get chunks at client's position using improved strategy - chunks, next_index = self.buffer.get_optimized_client_data(local_index) + chunks, next_index = self.buffer.get_optimized_client_data(self.local_index) if chunks: - yield from self._process_chunks(chunks, next_index, bytes_sent, chunks_sent, stream_start_time) - local_index = next_index - self.local_index = local_index + yield from self._process_chunks(chunks, next_index) + self.local_index = next_index self.last_yield_time = time.time() self.empty_reads = 0 self.consecutive_empty = 0 @@ -188,11 +182,11 @@ class StreamGenerator: self.empty_reads += 1 self.consecutive_empty += 1 - if self._should_send_keepalive(local_index): + if self._should_send_keepalive(self.local_index): keepalive_packet = create_ts_packet('keepalive') logger.debug(f"[{self.client_id}] Sending keepalive packet while waiting at buffer head") yield keepalive_packet - bytes_sent += len(keepalive_packet) + self.bytes_sent += len(keepalive_packet) self.last_yield_time = time.time() self.consecutive_empty = 0 # Reset consecutive counter but keep total empty_reads time.sleep(Config.KEEPALIVE_INTERVAL) @@ -204,11 +198,11 @@ class StreamGenerator: # Log empty reads periodically if self.empty_reads % 50 == 0: stream_status = "healthy" if (self.stream_manager and self.stream_manager.healthy) else "unknown" - logger.debug(f"[{self.client_id}] Waiting for chunks beyond {local_index} (buffer at {self.buffer.index}, stream: {stream_status})") + logger.debug(f"[{self.client_id}] Waiting for chunks beyond {self.local_index} (buffer at {self.buffer.index}, stream: {stream_status})") # Check for ghost clients - if self._is_ghost_client(local_index): - logger.warning(f"[{self.client_id}] Possible ghost client: buffer has advanced {self.buffer.index - local_index} chunks ahead but client stuck at {local_index}") + if self._is_ghost_client(self.local_index): + logger.warning(f"[{self.client_id}] Possible ghost client: buffer has advanced {self.buffer.index - self.local_index} chunks ahead but client stuck at {self.local_index}") break # Check for timeouts @@ -258,7 +252,7 @@ class StreamGenerator: return True - def _process_chunks(self, chunks, next_index, bytes_sent, chunks_sent, stream_start_time): + def _process_chunks(self, 
chunks, next_index): """Process and yield chunks to the client.""" # Process and send chunks total_size = sum(len(c) for c in chunks) @@ -268,20 +262,34 @@ class StreamGenerator: for chunk in chunks: try: yield chunk - bytes_sent += len(chunk) - chunks_sent += 1 + self.bytes_sent += len(chunk) + self.chunks_sent += 1 + logger.debug(f"[{self.client_id}] Sent chunk {self.chunks_sent} ({len(chunk)} bytes) to client") + # Log every 10 chunks and store in redis for visibility + if self.chunks_sent % 10 == 0: + elapsed = time.time() - self.stream_start_time + rate = self.bytes_sent / elapsed / 1024 if elapsed > 0 else 0 + logger.debug(f"[{self.client_id}] Stats: {self.chunks_sent} chunks, {self.bytes_sent/1024:.1f} KB, {rate:.1f} KB/s") + + # Store stats in Redis client metadata + if proxy_server.redis_client: + try: + client_key = RedisKeys.client_metadata(self.channel_id, self.client_id) + stats = { + "chunks_sent": str(self.chunks_sent), + "bytes_sent": str(self.bytes_sent), + "transfer_rate_KBps": str(round(rate, 1)), + "stats_updated_at": str(time.time()) + } + proxy_server.redis_client.hset(client_key, mapping=stats) + # No need to set expiration as client heartbeat will refresh this key + except Exception as e: + logger.warning(f"[{self.client_id}] Failed to store stats in Redis: {e}") - # Log every 100 chunks for visibility - if chunks_sent % 100 == 0: - elapsed = time.time() - stream_start_time - rate = bytes_sent / elapsed / 1024 if elapsed > 0 else 0 - logger.info(f"[{self.client_id}] Stats: {chunks_sent} chunks, {bytes_sent/1024:.1f}KB, {rate:.1f}KB/s") except Exception as e: logger.error(f"[{self.client_id}] Error sending chunk to client: {e}") raise # Re-raise to exit the generator - return bytes_sent, chunks_sent - def _should_send_keepalive(self, local_index): """Determine if a keepalive packet should be sent.""" # Check if we're caught up to buffer head From 77002beaacb52497459449e81746c9f7b09338f0 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Sat, 22 Mar 2025 07:42:46 -0500 Subject: [PATCH 3/4] Improved logging to include current component name. --- apps/proxy/ts_proxy/channel_status.py | 3 ++- apps/proxy/ts_proxy/client_manager.py | 3 ++- apps/proxy/ts_proxy/config_helper.py | 3 --- apps/proxy/ts_proxy/server.py | 3 ++- apps/proxy/ts_proxy/stream_buffer.py | 3 ++- apps/proxy/ts_proxy/stream_generator.py | 5 +++-- apps/proxy/ts_proxy/stream_manager.py | 4 ++-- apps/proxy/ts_proxy/url_utils.py | 3 ++- apps/proxy/ts_proxy/utils.py | 30 ++++++++++++++++++++++++- apps/proxy/ts_proxy/views.py | 4 ++-- 10 files changed, 46 insertions(+), 15 deletions(-) diff --git a/apps/proxy/ts_proxy/channel_status.py b/apps/proxy/ts_proxy/channel_status.py index 39423170..95c561f5 100644 --- a/apps/proxy/ts_proxy/channel_status.py +++ b/apps/proxy/ts_proxy/channel_status.py @@ -5,8 +5,9 @@ from . 
import proxy_server from .redis_keys import RedisKeys from .constants import TS_PACKET_SIZE from redis.exceptions import ConnectionError, TimeoutError +from .utils import get_logger -logger = logging.getLogger("ts_proxy") +logger = get_logger() class ChannelStatus: diff --git a/apps/proxy/ts_proxy/client_manager.py b/apps/proxy/ts_proxy/client_manager.py index d19ce719..42d7e04d 100644 --- a/apps/proxy/ts_proxy/client_manager.py +++ b/apps/proxy/ts_proxy/client_manager.py @@ -10,8 +10,9 @@ from redis.exceptions import ConnectionError, TimeoutError from .constants import EventType from .config_helper import ConfigHelper from .redis_keys import RedisKeys +from .utils import get_logger -logger = logging.getLogger("ts_proxy") +logger = get_logger() class ClientManager: """Manages client connections with no duplicates""" diff --git a/apps/proxy/ts_proxy/config_helper.py b/apps/proxy/ts_proxy/config_helper.py index 5a16581a..c0576cb7 100644 --- a/apps/proxy/ts_proxy/config_helper.py +++ b/apps/proxy/ts_proxy/config_helper.py @@ -2,11 +2,8 @@ Helper module to access configuration values with proper defaults. """ -import logging from apps.proxy.config import TSConfig as Config -logger = logging.getLogger("ts_proxy") - class ConfigHelper: """ Helper class for accessing configuration values with sensible defaults. diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index 664b0273..c6a9ded2 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -26,8 +26,9 @@ from .client_manager import ClientManager from .redis_keys import RedisKeys from .constants import ChannelState, EventType, StreamType from .config_helper import ConfigHelper +from .utils import get_logger -logger = logging.getLogger("ts_proxy") +logger = get_logger() class ProxyServer: """Manages TS proxy server instance with worker coordination""" diff --git a/apps/proxy/ts_proxy/stream_buffer.py b/apps/proxy/ts_proxy/stream_buffer.py index 7bf4ba21..a94204ab 100644 --- a/apps/proxy/ts_proxy/stream_buffer.py +++ b/apps/proxy/ts_proxy/stream_buffer.py @@ -10,8 +10,9 @@ from apps.proxy.config import TSConfig as Config from .redis_keys import RedisKeys from .config_helper import ConfigHelper from .constants import TS_PACKET_SIZE +from .utils import get_logger -logger = logging.getLogger("ts_proxy") +logger = get_logger() class StreamBuffer: """Manages stream data buffering with optimized chunk storage""" diff --git a/apps/proxy/ts_proxy/stream_generator.py b/apps/proxy/ts_proxy/stream_generator.py index 6a6341c9..3d4f037c 100644 --- a/apps/proxy/ts_proxy/stream_generator.py +++ b/apps/proxy/ts_proxy/stream_generator.py @@ -8,10 +8,11 @@ import logging import threading from apps.proxy.config import TSConfig as Config from . 
import proxy_server -from .utils import create_ts_packet +from .utils import create_ts_packet, get_logger from .redis_keys import RedisKeys +from .utils import get_logger -logger = logging.getLogger("ts_proxy") +logger = get_logger() class StreamGenerator: """ diff --git a/apps/proxy/ts_proxy/stream_manager.py b/apps/proxy/ts_proxy/stream_manager.py index c47766eb..e8c3daac 100644 --- a/apps/proxy/ts_proxy/stream_manager.py +++ b/apps/proxy/ts_proxy/stream_manager.py @@ -13,13 +13,13 @@ from apps.channels.models import Channel, Stream from apps.m3u.models import M3UAccount, M3UAccountProfile from core.models import UserAgent, CoreSettings from .stream_buffer import StreamBuffer -from .utils import detect_stream_type +from .utils import detect_stream_type, get_logger from .redis_keys import RedisKeys from .constants import ChannelState, EventType, StreamType, TS_PACKET_SIZE from .config_helper import ConfigHelper from .url_utils import get_alternate_streams, get_stream_info_for_switch -logger = logging.getLogger("ts_proxy") +logger = get_logger() class StreamManager: """Manages a connection to a TS stream without using raw sockets""" diff --git a/apps/proxy/ts_proxy/url_utils.py b/apps/proxy/ts_proxy/url_utils.py index e478c1c0..aa624ca1 100644 --- a/apps/proxy/ts_proxy/url_utils.py +++ b/apps/proxy/ts_proxy/url_utils.py @@ -9,8 +9,9 @@ from django.shortcuts import get_object_or_404 from apps.channels.models import Channel, Stream from apps.m3u.models import M3UAccount, M3UAccountProfile from core.models import UserAgent, CoreSettings +from .utils import get_logger -logger = logging.getLogger("ts_proxy") +logger = get_logger() def generate_stream_url(channel_id: str) -> Tuple[str, str, bool]: """ diff --git a/apps/proxy/ts_proxy/utils.py b/apps/proxy/ts_proxy/utils.py index d8c96464..b568b804 100644 --- a/apps/proxy/ts_proxy/utils.py +++ b/apps/proxy/ts_proxy/utils.py @@ -1,6 +1,7 @@ import logging import re from urllib.parse import urlparse +import inspect logger = logging.getLogger("ts_proxy") @@ -77,4 +78,31 @@ def create_ts_packet(packet_type='null', message=None): msg_bytes = message.encode('utf-8') packet[4:4+min(len(msg_bytes), 180)] = msg_bytes[:180] - return bytes(packet) \ No newline at end of file + return bytes(packet) + +def get_logger(component_name=None): + """ + Get a standardized logger with ts_proxy prefix and optional component name. + + Args: + component_name (str, optional): Name of the component. If not provided, + will try to detect from the calling module. + + Returns: + logging.Logger: A configured logger with standardized naming. 
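+
+    Example (illustrative):
+        logger = get_logger("client_manager")  # -> logging.getLogger("ts_proxy.client_manager")
+        logger = get_logger()                  # component name inferred from the calling module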
+ """ + if component_name: + logger_name = f"ts_proxy.{component_name}" + else: + # Try to get the calling module name if not explicitly specified + frame = inspect.currentframe().f_back + module = inspect.getmodule(frame) + if module: + # Extract just the filename without extension + module_name = module.__name__.split('.')[-1] + logger_name = f"ts_proxy.{module_name}" + else: + # Default if detection fails + logger_name = "ts_proxy" + + return logging.getLogger(logger_name) \ No newline at end of file diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py index 75ef4fc5..fe87e677 100644 --- a/apps/proxy/ts_proxy/views.py +++ b/apps/proxy/ts_proxy/views.py @@ -22,9 +22,9 @@ from .constants import ChannelState, EventType, StreamType from .config_helper import ConfigHelper from .services.channel_service import ChannelService from .url_utils import generate_stream_url, transform_url, get_stream_info_for_switch +from .utils import get_logger -# Configure logging properly -logger = logging.getLogger("ts_proxy") +logger = get_logger() @api_view(['GET']) From d622c96aba4092e70448959827a3242171a88ea1 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Sat, 22 Mar 2025 08:48:39 -0500 Subject: [PATCH 4/4] Improved connection handling for redis pubsub. --- apps/proxy/ts_proxy/server.py | 42 ++++--- core/redis_pubsub.py | 223 ++++++++++++++++++++++++++++++++++ core/utils.py | 49 ++++++-- dispatcharr/settings.py | 9 +- 4 files changed, 290 insertions(+), 33 deletions(-) create mode 100644 core/redis_pubsub.py diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index c6a9ded2..2dd923fd 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -18,7 +18,7 @@ import json from typing import Dict, Optional, Set from apps.proxy.config import TSConfig as Config from apps.channels.models import Channel -from core.utils import redis_client as global_redis_client # Import the global Redis client +from core.utils import redis_client as global_redis_client, redis_pubsub_client as global_redis_pubsub_client # Import both global Redis clients from redis.exceptions import ConnectionError, TimeoutError from .stream_manager import StreamManager from .stream_buffer import StreamBuffer @@ -156,30 +156,34 @@ class ProxyServer: while True: try: - # Create a dedicated Redis client for PubSub with longer timeouts - # This avoids affecting the main Redis client operations - from django.conf import settings - import redis + # Use the global PubSub client if available + if global_redis_pubsub_client: + pubsub_client = global_redis_pubsub_client + logger.info("Using global Redis PubSub client for event listener") + else: + # Fall back to creating a dedicated client if global one is unavailable + from django.conf import settings + import redis - redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost')) - redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) - redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) + redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost')) + redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) + redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) - # Create a dedicated client with generous timeouts for PubSub connections - pubsub_client = redis.Redis( - host=redis_host, - port=redis_port, - db=redis_db, - socket_timeout=60, # Much longer timeout for PubSub 
operations - socket_connect_timeout=10, - socket_keepalive=True, # Enable TCP keepalive - health_check_interval=30 - ) + pubsub_client = redis.Redis( + host=redis_host, + port=redis_port, + db=redis_db, + socket_timeout=60, + socket_connect_timeout=10, + socket_keepalive=True, + health_check_interval=30 + ) + logger.info("Created dedicated Redis PubSub client for event listener") # Test connection before subscribing pubsub_client.ping() - # Create a new pubsub instance from the dedicated client + # Create a pubsub instance from the client pubsub = pubsub_client.pubsub() pubsub.psubscribe("ts_proxy:events:*") diff --git a/core/redis_pubsub.py b/core/redis_pubsub.py new file mode 100644 index 00000000..5fb57334 --- /dev/null +++ b/core/redis_pubsub.py @@ -0,0 +1,223 @@ +""" +Redis PubSub utilities for maintaining long-lived connections. +""" +import threading +import time +import logging +import json +from redis import Redis +from redis.exceptions import ConnectionError, TimeoutError + +logger = logging.getLogger(__name__) + +class RedisPubSubManager: + """ + A robust Redis PubSub manager that handles disconnections and reconnections. + """ + + def __init__(self, redis_client=None, auto_reconnect=True): + """ + Initialize the PubSub manager. + + Args: + redis_client: An existing Redis client to use + auto_reconnect: Whether to automatically reconnect on failure + """ + from .utils import get_redis_client + + self.redis_client = redis_client or get_redis_client() + self.pubsub = None + self.subscriptions = set() + self.pattern_subscriptions = set() + self.auto_reconnect = auto_reconnect + self.running = True + self.lock = threading.RLock() + self.message_handlers = {} # Map of channels to handler functions + self.message_thread = None + + def subscribe(self, channel, handler=None): + """ + Subscribe to a channel. + + Args: + channel: The channel to subscribe to + handler: Optional function to call when messages are received + """ + with self.lock: + self.subscriptions.add(channel) + if handler: + self.message_handlers[channel] = handler + + if self.pubsub: + self.pubsub.subscribe(channel) + logger.info(f"Subscribed to channel: {channel}") + + def psubscribe(self, pattern, handler=None): + """ + Subscribe to a channel pattern. + + Args: + pattern: The pattern to subscribe to + handler: Optional function to call when messages are received + """ + with self.lock: + self.pattern_subscriptions.add(pattern) + if handler: + self.message_handlers[pattern] = handler + + if self.pubsub: + self.pubsub.psubscribe(pattern) + logger.info(f"Subscribed to pattern: {pattern}") + + def publish(self, channel, message): + """ + Publish a message to a channel. + + Args: + channel: The channel to publish to + message: The message to publish (will be JSON-encoded if not a string) + + Returns: + Number of clients that received the message + """ + try: + if not isinstance(message, str): + message = json.dumps(message) + return self.redis_client.publish(channel, message) + except Exception as e: + logger.error(f"Error publishing to {channel}: {e}") + return 0 + + def start_listening(self): + """ + Start listening for messages in a background thread. + """ + if not self.message_thread: + self._connect() + self.message_thread = threading.Thread( + target=self._listen_for_messages, + daemon=True, + name="redis-pubsub-listener" + ) + self.message_thread.start() + logger.info("Started Redis PubSub listener thread") + + def stop(self): + """ + Stop listening and clean up resources. 
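+
+        Illustrative shutdown hook (assumes the manager was created earlier via
+        the module-level get_pubsub_manager() helper defined at the bottom of
+        this file):
+
+            from core.redis_pubsub import get_pubsub_manager
+            get_pubsub_manager().stop()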
+ """ + self.running = False + if self.pubsub: + try: + self.pubsub.close() + except: + pass + self.pubsub = None + + def _connect(self): + """ + Establish a new PubSub connection and subscribe to all channels. + """ + with self.lock: + # Close any existing connection + if self.pubsub: + try: + self.pubsub.close() + except: + pass + + # Create a new PubSub instance - critical: no timeout for subscribe operations + # This prevents the connection from timing out while waiting for messages + self.pubsub = self.redis_client.pubsub() + + # Resubscribe to all channels + if self.subscriptions: + self.pubsub.subscribe(*self.subscriptions) + logger.info(f"Resubscribed to channels: {self.subscriptions}") + + # Resubscribe to all patterns + if self.pattern_subscriptions: + self.pubsub.psubscribe(*self.pattern_subscriptions) + logger.info(f"Resubscribed to patterns: {self.pattern_subscriptions}") + + def _listen_for_messages(self): + """ + Background thread that listens for messages and handles reconnections. + """ + consecutive_errors = 0 + + while self.running: + try: + # Check if we need to connect + if not self.pubsub: + self._connect() + + # Listen for messages with NO timeout - this is critical! + message = self.pubsub.get_message(timeout=None) + + if message: + # Don't process subscription confirmation messages + if message['type'] in ('subscribe', 'psubscribe'): + continue + + channel = message.get('channel') + if channel: + # Decode binary channel name if needed + if isinstance(channel, bytes): + channel = channel.decode('utf-8') + + # Find and call the appropriate handler + handler = self.message_handlers.get(channel) + if handler: + try: + handler(message) + except Exception as e: + logger.error(f"Error in message handler for {channel}: {e}") + + # Reset error counter on success + consecutive_errors = 0 + + # Small sleep to prevent excessive CPU usage + time.sleep(0.01) + + except (ConnectionError, TimeoutError) as e: + consecutive_errors += 1 + + if not self.auto_reconnect: + logger.error(f"PubSub connection error and auto_reconnect is disabled: {e}") + break + + # Exponential backoff for reconnection attempts + backoff = min(consecutive_errors * 0.5, 5) + logger.warning(f"PubSub connection error, reconnecting in {backoff} seconds: {e}") + time.sleep(backoff) + + # Reconnect + self._connect() + + except Exception as e: + logger.error(f"Unexpected error in PubSub listener: {e}") + time.sleep(1) # Prevent tight loop in case of persistent errors + + logger.info("PubSub listener thread stopping") + +# Create a singleton instance +pubsub_manager = None + +def get_pubsub_manager(redis_client=None): + """ + Get or create the PubSub manager singleton. 
diff --git a/core/utils.py b/core/utils.py
index 06a2f25e..e61b0c14 100644
--- a/core/utils.py
+++ b/core/utils.py
@@ -2,6 +2,7 @@ import redis
 import logging
 import time
 import os
+import threading
 from django.conf import settings
 from redis.exceptions import ConnectionError, TimeoutError

@@ -17,15 +18,23 @@ def get_redis_client(max_retries=5, retry_interval=1):
             redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379)))
             redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0)))

+            # Use standardized settings
+            socket_timeout = getattr(settings, 'REDIS_SOCKET_TIMEOUT', 5)
+            socket_connect_timeout = getattr(settings, 'REDIS_SOCKET_CONNECT_TIMEOUT', 5)
+            health_check_interval = getattr(settings, 'REDIS_HEALTH_CHECK_INTERVAL', 30)
+            socket_keepalive = getattr(settings, 'REDIS_SOCKET_KEEPALIVE', True)
+            retry_on_timeout = getattr(settings, 'REDIS_RETRY_ON_TIMEOUT', True)
+
             # Create Redis client with better defaults
             client = redis.Redis(
                 host=redis_host,
                 port=redis_port,
                 db=redis_db,
-                socket_timeout=5,
-                socket_connect_timeout=5,
-                retry_on_timeout=True,
-                health_check_interval=30
+                socket_timeout=socket_timeout,
+                socket_connect_timeout=socket_connect_timeout,
+                socket_keepalive=socket_keepalive,
+                health_check_interval=health_check_interval,
+                retry_on_timeout=retry_on_timeout
             )

             # Validate connection with ping
@@ -49,7 +58,7 @@
             return None

 def get_redis_pubsub_client(max_retries=5, retry_interval=3):
-    """Get Redis client optimized for PubSub operations with longer timeouts"""
+    """Get Redis client optimized for PubSub operations"""
     retry_count = 0
     while retry_count < max_retries:
         try:
@@ -58,21 +67,30 @@
             redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379)))
             redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0)))

-            # Create Redis client with PubSub-optimized settings
+            # Use standardized settings but without socket timeouts for PubSub
+            # Important: socket_timeout is None for PubSub operations
+            socket_connect_timeout = getattr(settings, 'REDIS_SOCKET_CONNECT_TIMEOUT', 5)
+            socket_keepalive = getattr(settings, 'REDIS_SOCKET_KEEPALIVE', True)
+            health_check_interval = getattr(settings, 'REDIS_HEALTH_CHECK_INTERVAL', 30)
+            retry_on_timeout = getattr(settings, 'REDIS_RETRY_ON_TIMEOUT', True)
+
+            # Create Redis client with PubSub-optimized settings - no timeout
             client = redis.Redis(
                 host=redis_host,
                 port=redis_port,
                 db=redis_db,
-                socket_timeout=60,  # Longer timeout for blocking operations
-                socket_connect_timeout=5,
-                socket_keepalive=True,   # Enable TCP keepalive
-                health_check_interval=30,
-                retry_on_timeout=True
+                socket_timeout=None,  # Critical: No timeout for PubSub operations
+                socket_connect_timeout=socket_connect_timeout,
+                socket_keepalive=socket_keepalive,
+                health_check_interval=health_check_interval,
+                retry_on_timeout=retry_on_timeout
             )

             # Validate connection with ping
             client.ping()
             logger.info(f"Connected to Redis for PubSub at {redis_host}:{redis_port}/{redis_db}")

+
+            # We don't need the keepalive thread anymore since we're using proper PubSub handling
             return client
         except (ConnectionError, TimeoutError) as e:
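Unlike the dedicated client in server.py, this factory passes socket_timeout=None, so a consumer can sit in a blocking listen() indefinitely without the read ever timing out. A minimal sketch of that pattern, not part of the patch; handle_event is again a hypothetical placeholder:

    from core.utils import get_redis_pubsub_client

    client = get_redis_pubsub_client()
    if client is not None:                  # the factory returns None if Redis never became reachable
        pubsub = client.pubsub()
        pubsub.psubscribe("ts_proxy:events:*")
        for message in pubsub.listen():     # may block indefinitely; no socket timeout to trip
            if message["type"] == "pmessage":
                handle_event(message)       # hypothetical handler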
@@ -114,5 +132,10 @@ def execute_redis_command(redis_client, command_func, default_return=None):
         logger.error(f"Redis command error: {e}")
         return default_return

-# Initialize the global client with retry logic
-redis_client = get_redis_client(max_retries=10, retry_interval=1)
\ No newline at end of file
+# Initialize the global clients with retry logic
+redis_client = get_redis_client(max_retries=10, retry_interval=1)
+redis_pubsub_client = get_redis_pubsub_client(max_retries=10, retry_interval=1)
+
+# Import and initialize the PubSub manager
+from .redis_pubsub import get_pubsub_manager
+pubsub_manager = get_pubsub_manager(redis_client)
\ No newline at end of file
diff --git a/dispatcharr/settings.py b/dispatcharr/settings.py
index 5cd21169..07373af1 100644
--- a/dispatcharr/settings.py
+++ b/dispatcharr/settings.py
@@ -179,8 +179,15 @@ SIMPLE_JWT = {
     'BLACKLIST_AFTER_ROTATION': True,  # Optional: Whether to blacklist refresh tokens
 }

-# Redis settings for TS proxy
+# Redis connection settings
 REDIS_URL = 'redis://localhost:6379/0'
+REDIS_SOCKET_TIMEOUT = 60  # Socket timeout in seconds
+REDIS_SOCKET_CONNECT_TIMEOUT = 5  # Connection timeout in seconds
+REDIS_HEALTH_CHECK_INTERVAL = 15  # Health check every 15 seconds
+REDIS_SOCKET_KEEPALIVE = True  # Enable socket keepalive
+REDIS_RETRY_ON_TIMEOUT = True  # Retry on timeout
+REDIS_MAX_RETRIES = 10  # Maximum number of retries
+REDIS_RETRY_INTERVAL = 1  # Initial retry interval in seconds

 # Proxy Settings
 PROXY_SETTINGS = {