From efaa7f71951902dcaca617a8bbbe595b4f20a47d Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Fri, 21 Mar 2025 10:55:13 -0500 Subject: [PATCH] Singular redis-client. --- apps/proxy/ts_proxy/channel_status.py | 141 ++++---- apps/proxy/ts_proxy/client_manager.py | 23 +- apps/proxy/ts_proxy/server.py | 478 +++++++++++++++++--------- core/utils.py | 132 +++++-- docker/entrypoint.sh | 10 +- docker/uwsgi.dev.ini | 7 +- docker/uwsgi.ini | 7 +- scripts/wait_for_redis.py | 60 ++++ 8 files changed, 603 insertions(+), 255 deletions(-) create mode 100644 scripts/wait_for_redis.py diff --git a/apps/proxy/ts_proxy/channel_status.py b/apps/proxy/ts_proxy/channel_status.py index cc8a1e5b..39423170 100644 --- a/apps/proxy/ts_proxy/channel_status.py +++ b/apps/proxy/ts_proxy/channel_status.py @@ -4,6 +4,7 @@ import re from . import proxy_server from .redis_keys import RedisKeys from .constants import TS_PACKET_SIZE +from redis.exceptions import ConnectionError, TimeoutError logger = logging.getLogger("ts_proxy") @@ -172,76 +173,98 @@ class ChannelStatus: return info - # Function for basic channel info (used for all channels summary) - def get_basic_channel_info(channel_id): - # Get channel metadata - metadata_key = RedisKeys.channel_metadata(channel_id) - metadata = proxy_server.redis_client.hgetall(metadata_key) - - if not metadata: + @staticmethod + def _execute_redis_command(command_func): + """Execute Redis command with error handling""" + if not proxy_server.redis_client: return None - # Basic channel info only - omit diagnostics and details - buffer_index_key = RedisKeys.buffer_index(channel_id) - buffer_index_value = proxy_server.redis_client.get(buffer_index_key) + try: + return command_func() + except (ConnectionError, TimeoutError) as e: + logger.warning(f"Redis connection error in ChannelStatus: {e}") + return None + except Exception as e: + logger.error(f"Redis command error in ChannelStatus: {e}") + return None - # Count clients (using efficient count method) - client_set_key = RedisKeys.clients(channel_id) - client_count = proxy_server.redis_client.scard(client_set_key) or 0 + @staticmethod + def get_basic_channel_info(channel_id): + """Get basic channel information with Redis error handling""" + try: + # Use _execute_redis_command for Redis operations + metadata_key = RedisKeys.channel_metadata(channel_id) + metadata = ChannelStatus._execute_redis_command( + lambda: proxy_server.redis_client.hgetall(metadata_key) + ) - # Calculate uptime - created_at = float(metadata.get(b'init_time', b'0').decode('utf-8')) - uptime = time.time() - created_at if created_at > 0 else 0 + if not metadata: + return None - # Simplified info - info = { - 'channel_id': channel_id, - 'state': metadata.get(b'state', b'unknown').decode('utf-8'), - 'url': metadata.get(b'url', b'').decode('utf-8'), - 'profile': metadata.get(b'profile', b'unknown').decode('utf-8'), - 'owner': metadata.get(b'owner', b'unknown').decode('utf-8'), - 'buffer_index': int(buffer_index_value.decode('utf-8')) if buffer_index_value else 0, - 'client_count': client_count, - 'uptime': uptime - } + # Basic channel info only - omit diagnostics and details + buffer_index_key = RedisKeys.buffer_index(channel_id) + buffer_index_value = proxy_server.redis_client.get(buffer_index_key) - # Quick health check if available locally - if channel_id in proxy_server.stream_managers: - manager = proxy_server.stream_managers[channel_id] - info['healthy'] = manager.healthy + # Count clients (using efficient count method) + client_set_key = 
RedisKeys.clients(channel_id) + client_count = proxy_server.redis_client.scard(client_set_key) or 0 - # Get concise client information - clients = [] - client_ids = proxy_server.redis_client.smembers(client_set_key) + # Calculate uptime + created_at = float(metadata.get(b'init_time', b'0').decode('utf-8')) + uptime = time.time() - created_at if created_at > 0 else 0 - # Process only if we have clients and keep it limited - if client_ids: - # Get up to 10 clients for the basic view - for client_id in list(client_ids)[:10]: - client_id_str = client_id.decode('utf-8') - client_key = f"ts_proxy:channel:{channel_id}:clients:{client_id_str}" + # Simplified info + info = { + 'channel_id': channel_id, + 'state': metadata.get(b'state', b'unknown').decode('utf-8'), + 'url': metadata.get(b'url', b'').decode('utf-8'), + 'profile': metadata.get(b'profile', b'unknown').decode('utf-8'), + 'owner': metadata.get(b'owner', b'unknown').decode('utf-8'), + 'buffer_index': int(buffer_index_value.decode('utf-8')) if buffer_index_value else 0, + 'client_count': client_count, + 'uptime': uptime + } - # Efficient way - just retrieve the essentials - client_info = { - 'client_id': client_id_str, - 'user_agent': proxy_server.redis_client.hget(client_key, 'user_agent'), - 'ip_address': proxy_server.redis_client.hget(client_key, 'ip_address').decode('utf-8'), - } + # Quick health check if available locally + if channel_id in proxy_server.stream_managers: + manager = proxy_server.stream_managers[channel_id] + info['healthy'] = manager.healthy - if client_info['user_agent']: - client_info['user_agent'] = client_info['user_agent'].decode('utf-8') - else: - client_info['user_agent'] = 'unknown' + # Get concise client information + clients = [] + client_ids = proxy_server.redis_client.smembers(client_set_key) - # Just get connected_at for client age - connected_at_bytes = proxy_server.redis_client.hget(client_key, 'connected_at') - if connected_at_bytes: - connected_at = float(connected_at_bytes.decode('utf-8')) - client_info['connected_since'] = time.time() - connected_at + # Process only if we have clients and keep it limited + if client_ids: + # Get up to 10 clients for the basic view + for client_id in list(client_ids)[:10]: + client_id_str = client_id.decode('utf-8') + client_key = f"ts_proxy:channel:{channel_id}:clients:{client_id_str}" - clients.append(client_info) + # Efficient way - just retrieve the essentials + client_info = { + 'client_id': client_id_str, + 'user_agent': proxy_server.redis_client.hget(client_key, 'user_agent'), + 'ip_address': proxy_server.redis_client.hget(client_key, 'ip_address').decode('utf-8'), + } - # Add clients to info - info['clients'] = clients + if client_info['user_agent']: + client_info['user_agent'] = client_info['user_agent'].decode('utf-8') + else: + client_info['user_agent'] = 'unknown' - return info + # Just get connected_at for client age + connected_at_bytes = proxy_server.redis_client.hget(client_key, 'connected_at') + if connected_at_bytes: + connected_at = float(connected_at_bytes.decode('utf-8')) + client_info['connected_since'] = time.time() - connected_at + + clients.append(client_info) + + # Add clients to info + info['clients'] = clients + + return info + except Exception as e: + logger.error(f"Error getting channel info: {e}") + return None diff --git a/apps/proxy/ts_proxy/client_manager.py b/apps/proxy/ts_proxy/client_manager.py index ed5868a9..d19ce719 100644 --- a/apps/proxy/ts_proxy/client_manager.py +++ b/apps/proxy/ts_proxy/client_manager.py @@ -6,6 +6,7 @@ 
import time import json from typing import Set, Optional from apps.proxy.config import TSConfig as Config +from redis.exceptions import ConnectionError, TimeoutError from .constants import EventType from .config_helper import ConfigHelper from .redis_keys import RedisKeys @@ -120,6 +121,20 @@ class ClientManager: thread.start() logger.debug(f"Started client heartbeat thread for channel {self.channel_id} (interval: {self.heartbeat_interval}s)") + def _execute_redis_command(self, command_func): + """Execute Redis command with error handling""" + if not self.redis_client: + return None + + try: + return command_func() + except (ConnectionError, TimeoutError) as e: + logger.warning(f"Redis connection error in ClientManager: {e}") + return None + except Exception as e: + logger.error(f"Redis command error in ClientManager: {e}") + return None + def _notify_owner_of_activity(self): """Notify channel owner that clients are active on this worker""" if not self.redis_client or not self.clients: @@ -130,11 +145,15 @@ class ClientManager: # STANDARDIZED KEY: Worker info under channel namespace worker_key = f"ts_proxy:channel:{self.channel_id}:worker:{worker_id}" - self.redis_client.setex(worker_key, self.client_ttl, str(len(self.clients))) + self._execute_redis_command( + lambda: self.redis_client.setex(worker_key, self.client_ttl, str(len(self.clients))) + ) # STANDARDIZED KEY: Activity timestamp under channel namespace activity_key = f"ts_proxy:channel:{self.channel_id}:activity" - self.redis_client.setex(activity_key, self.client_ttl, str(time.time())) + self._execute_redis_command( + lambda: self.redis_client.setex(activity_key, self.client_ttl, str(time.time())) + ) except Exception as e: logger.error(f"Error notifying owner of client activity: {e}") diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index cfe5b4ce..664b0273 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -18,6 +18,8 @@ import json from typing import Dict, Optional, Set from apps.proxy.config import TSConfig as Config from apps.channels.models import Channel +from core.utils import redis_client as global_redis_client # Import the global Redis client +from redis.exceptions import ConnectionError, TimeoutError from .stream_manager import StreamManager from .stream_buffer import StreamBuffer from .client_manager import ClientManager @@ -43,19 +45,25 @@ class ProxyServer: hostname = socket.gethostname() self.worker_id = f"{hostname}:{pid}" - # Connect to Redis + # Connect to Redis - try using global client first self.redis_client = None - try: - import redis - from django.conf import settings + self.redis_connection_attempts = 0 + self.redis_max_retries = 3 + self.redis_retry_interval = 5 # seconds + + try: + # First try to use the global client from core.utils + if global_redis_client is not None: + self.redis_client = global_redis_client + logger.info(f"Using global Redis client") + logger.info(f"Worker ID: {self.worker_id}") + else: + # Fall back to direct connection with retry + self._setup_redis_connection() - redis_url = getattr(settings, 'REDIS_URL', 'redis://localhost:6379/0') - self.redis_client = redis.from_url(redis_url) - logger.info(f"Connected to Redis at {redis_url}") - logger.info(f"Worker ID: {self.worker_id}") except Exception as e: + logger.error(f"Failed to initialize Redis: {e}") self.redis_client = None - logger.error(f"Failed to connect to Redis: {e}") # Start cleanup thread self.cleanup_interval = getattr(Config, 'CLEANUP_INTERVAL', 60) @@ -64,179 
+72,302 @@ class ProxyServer: # Start event listener for Redis pubsub messages self._start_event_listener() + def _setup_redis_connection(self): + """Setup Redis connection with retry logic""" + import redis + from django.conf import settings + + while self.redis_connection_attempts < self.redis_max_retries: + try: + logger.info(f"Attempting to connect to Redis ({self.redis_connection_attempts+1}/{self.redis_max_retries})") + + # Get connection parameters from settings or environment + redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost')) + redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) + redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) + + # Create Redis client with reasonable timeouts + self.redis_client = redis.Redis( + host=redis_host, + port=redis_port, + db=redis_db, + socket_timeout=5, + socket_connect_timeout=5, + retry_on_timeout=True, + health_check_interval=30 + ) + + # Test connection + self.redis_client.ping() + logger.info(f"Successfully connected to Redis at {redis_host}:{redis_port}/{redis_db}") + logger.info(f"Worker ID: {self.worker_id}") + break + + except (ConnectionError, TimeoutError) as e: + self.redis_connection_attempts += 1 + if self.redis_connection_attempts >= self.redis_max_retries: + logger.error(f"Failed to connect to Redis after {self.redis_max_retries} attempts: {e}") + self.redis_client = None + else: + # Exponential backoff with a maximum of 30 seconds + retry_delay = min(self.redis_retry_interval * (2 ** (self.redis_connection_attempts - 1)), 30) + logger.warning(f"Redis connection failed. Retrying in {retry_delay}s... ({self.redis_connection_attempts}/{self.redis_max_retries})") + time.sleep(retry_delay) + + except Exception as e: + logger.error(f"Unexpected error connecting to Redis: {e}", exc_info=True) + self.redis_client = None + break + + def _execute_redis_command(self, command_func, *args, **kwargs): + """Execute Redis command with error handling and reconnection logic""" + if not self.redis_client: + return None + + try: + return command_func(*args, **kwargs) + except (ConnectionError, TimeoutError) as e: + logger.warning(f"Redis connection lost: {e}. 
Attempting to reconnect...") + try: + # Try to reconnect + self.redis_connection_attempts = 0 + self._setup_redis_connection() + if self.redis_client: + # Retry the command once + return command_func(*args, **kwargs) + except Exception as reconnect_error: + logger.error(f"Failed to reconnect to Redis: {reconnect_error}") + return None + except Exception as e: + logger.error(f"Redis command error: {e}") + return None + def _start_event_listener(self): """Listen for events from other workers""" if not self.redis_client: return def event_listener(): - try: - pubsub = self.redis_client.pubsub() - pubsub.psubscribe("ts_proxy:events:*") + retry_count = 0 + max_retries = 10 + base_retry_delay = 1 # Start with 1 second delay + max_retry_delay = 30 # Cap at 30 seconds - logger.info(f"Started Redis event listener for client activity") + while True: + try: + # Create a dedicated Redis client for PubSub with longer timeouts + # This avoids affecting the main Redis client operations + from django.conf import settings + import redis - for message in pubsub.listen(): - if message["type"] != "pmessage": - continue + redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost')) + redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) + redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) - try: - channel = message["channel"].decode("utf-8") - data = json.loads(message["data"].decode("utf-8")) + # Create a dedicated client with generous timeouts for PubSub connections + pubsub_client = redis.Redis( + host=redis_host, + port=redis_port, + db=redis_db, + socket_timeout=60, # Much longer timeout for PubSub operations + socket_connect_timeout=10, + socket_keepalive=True, # Enable TCP keepalive + health_check_interval=30 + ) - event_type = data.get("event") - channel_id = data.get("channel_id") + # Test connection before subscribing + pubsub_client.ping() - if channel_id and event_type: - # For owner, update client status immediately - if self.am_i_owner(channel_id): - if event_type == EventType.CLIENT_CONNECTED: - logger.debug(f"Owner received {EventType.CLIENT_CONNECTED} event for channel {channel_id}") - # Reset any disconnect timer - disconnect_key = RedisKeys.last_client_disconnect(channel_id) - self.redis_client.delete(disconnect_key) + # Create a new pubsub instance from the dedicated client + pubsub = pubsub_client.pubsub() + pubsub.psubscribe("ts_proxy:events:*") - elif event_type == EventType.CLIENT_DISCONNECTED: - logger.debug(f"Owner received {EventType.CLIENT_DISCONNECTED} event for channel {channel_id}") - # Check if any clients remain - if channel_id in self.client_managers: - # VERIFY REDIS CLIENT COUNT DIRECTLY - client_set_key = RedisKeys.clients(channel_id) - total = self.redis_client.scard(client_set_key) or 0 + logger.info(f"Started Redis event listener for client activity") - if total == 0: - logger.debug(f"No clients left after disconnect event - stopping channel {channel_id}") - # Set the disconnect timer for other workers to see - disconnect_key = RedisKeys.last_client_disconnect(channel_id) - self.redis_client.setex(disconnect_key, 60, str(time.time())) + # Reset retry count on successful connection + retry_count = 0 - # Get configured shutdown delay or default - shutdown_delay = getattr(Config, 'CHANNEL_SHUTDOWN_DELAY', 0) + for message in pubsub.listen(): + if message["type"] != "pmessage": + continue - if shutdown_delay > 0: - logger.info(f"Waiting {shutdown_delay}s before stopping channel...") - 
time.sleep(shutdown_delay) + try: + channel = message["channel"].decode("utf-8") + data = json.loads(message["data"].decode("utf-8")) - # Re-check client count before stopping - total = self.redis_client.scard(client_set_key) or 0 - if total > 0: - logger.info(f"New clients connected during shutdown delay - aborting shutdown") - self.redis_client.delete(disconnect_key) - return + event_type = data.get("event") + channel_id = data.get("channel_id") - # Stop the channel directly + if channel_id and event_type: + # For owner, update client status immediately + if self.am_i_owner(channel_id): + if event_type == EventType.CLIENT_CONNECTED: + logger.debug(f"Owner received {EventType.CLIENT_CONNECTED} event for channel {channel_id}") + # Reset any disconnect timer + disconnect_key = RedisKeys.last_client_disconnect(channel_id) + self.redis_client.delete(disconnect_key) + + elif event_type == EventType.CLIENT_DISCONNECTED: + logger.debug(f"Owner received {EventType.CLIENT_DISCONNECTED} event for channel {channel_id}") + # Check if any clients remain + if channel_id in self.client_managers: + # VERIFY REDIS CLIENT COUNT DIRECTLY + client_set_key = RedisKeys.clients(channel_id) + total = self.redis_client.scard(client_set_key) or 0 + + if total == 0: + logger.debug(f"No clients left after disconnect event - stopping channel {channel_id}") + # Set the disconnect timer for other workers to see + disconnect_key = RedisKeys.last_client_disconnect(channel_id) + self.redis_client.setex(disconnect_key, 60, str(time.time())) + + # Get configured shutdown delay or default + shutdown_delay = getattr(Config, 'CHANNEL_SHUTDOWN_DELAY', 0) + + if shutdown_delay > 0: + logger.info(f"Waiting {shutdown_delay}s before stopping channel...") + time.sleep(shutdown_delay) + + # Re-check client count before stopping + total = self.redis_client.scard(client_set_key) or 0 + if total > 0: + logger.info(f"New clients connected during shutdown delay - aborting shutdown") + self.redis_client.delete(disconnect_key) + return + + # Stop the channel directly + self.stop_channel(channel_id) + + + elif event_type == EventType.STREAM_SWITCH: + logger.info(f"Owner received {EventType.STREAM_SWITCH} request for channel {channel_id}") + # Handle stream switch request + new_url = data.get("url") + user_agent = data.get("user_agent") + + if new_url and channel_id in self.stream_managers: + # Update metadata in Redis + if self.redis_client: + metadata_key = RedisKeys.channel_metadata(channel_id) + self.redis_client.hset(metadata_key, "url", new_url) + if user_agent: + self.redis_client.hset(metadata_key, "user_agent", user_agent) + + # Set switch status + status_key = RedisKeys.switch_status(channel_id) + self.redis_client.set(status_key, "switching") + + # Perform the stream switch + stream_manager = self.stream_managers[channel_id] + success = stream_manager.update_url(new_url) + + if success: + logger.info(f"Stream switch initiated for channel {channel_id}") + + # Publish confirmation + switch_result = { + "event": EventType.STREAM_SWITCHED, # Use constant instead of string + "channel_id": channel_id, + "success": True, + "url": new_url, + "timestamp": time.time() + } + self.redis_client.publish( + f"ts_proxy:events:{channel_id}", + json.dumps(switch_result) + ) + + # Update status + if self.redis_client: + self.redis_client.set(status_key, "switched") + else: + logger.error(f"Failed to switch stream for channel {channel_id}") + + # Publish failure + switch_result = { + "event": EventType.STREAM_SWITCHED, + "channel_id": channel_id, + 
"success": False, + "url": new_url, + "timestamp": time.time() + } + self.redis_client.publish( + f"ts_proxy:events:{channel_id}", + json.dumps(switch_result) + ) + elif event_type == EventType.CHANNEL_STOP: + logger.info(f"Received {EventType.CHANNEL_STOP} event for channel {channel_id}") + # First mark channel as stopping in Redis + if self.redis_client: + # Set stopping state in metadata + metadata_key = RedisKeys.channel_metadata(channel_id) + if self.redis_client.exists(metadata_key): + self.redis_client.hset(metadata_key, mapping={ + "state": ChannelState.STOPPING, + "state_changed_at": str(time.time()) + }) + + # If we have local resources for this channel, clean them up + if channel_id in self.stream_buffers or channel_id in self.client_managers: + # Use existing stop_channel method + logger.info(f"Stopping local resources for channel {channel_id}") self.stop_channel(channel_id) + # Acknowledge stop by publishing a response + stop_response = { + "event": EventType.CHANNEL_STOPPED, + "channel_id": channel_id, + "worker_id": self.worker_id, + "timestamp": time.time() + } + self.redis_client.publish( + f"ts_proxy:events:{channel_id}", + json.dumps(stop_response) + ) + elif event_type == EventType.CLIENT_STOP: + client_id = data.get("client_id") + if client_id and channel_id: + logger.info(f"Received request to stop client {client_id} on channel {channel_id}") - elif event_type == EventType.STREAM_SWITCH: - logger.info(f"Owner received {EventType.STREAM_SWITCH} request for channel {channel_id}") - # Handle stream switch request - new_url = data.get("url") - user_agent = data.get("user_agent") + # Both remove from client manager AND set a key for the generator to detect + if channel_id in self.client_managers: + client_manager = self.client_managers[channel_id] + if client_id in client_manager.clients: + client_manager.remove_client(client_id) + logger.info(f"Removed client {client_id} from client manager") - if new_url and channel_id in self.stream_managers: - # Update metadata in Redis - if self.redis_client: - metadata_key = RedisKeys.channel_metadata(channel_id) - self.redis_client.hset(metadata_key, "url", new_url) - if user_agent: - self.redis_client.hset(metadata_key, "user_agent", user_agent) - - # Set switch status - status_key = RedisKeys.switch_status(channel_id) - self.redis_client.set(status_key, "switching") - - # Perform the stream switch - stream_manager = self.stream_managers[channel_id] - success = stream_manager.update_url(new_url) - - if success: - logger.info(f"Stream switch initiated for channel {channel_id}") - - # Publish confirmation - switch_result = { - "event": EventType.STREAM_SWITCHED, # Use constant instead of string - "channel_id": channel_id, - "success": True, - "url": new_url, - "timestamp": time.time() - } - self.redis_client.publish( - f"ts_proxy:events:{channel_id}", - json.dumps(switch_result) - ) - - # Update status + # Set a Redis key for the generator to detect if self.redis_client: - self.redis_client.set(status_key, "switched") - else: - logger.error(f"Failed to switch stream for channel {channel_id}") + stop_key = RedisKeys.client_stop(channel_id, client_id) + self.redis_client.setex(stop_key, 30, "true") # 30 second TTL + logger.info(f"Set stop key for client {client_id}") + except Exception as e: + logger.error(f"Error processing event message: {e}") - # Publish failure - switch_result = { - "event": EventType.STREAM_SWITCHED, - "channel_id": channel_id, - "success": False, - "url": new_url, - "timestamp": time.time() - } - 
self.redis_client.publish( - f"ts_proxy:events:{channel_id}", - json.dumps(switch_result) - ) - elif event_type == EventType.CHANNEL_STOP: - logger.info(f"Received {EventType.CHANNEL_STOP} event for channel {channel_id}") - # First mark channel as stopping in Redis - if self.redis_client: - # Set stopping state in metadata - metadata_key = RedisKeys.channel_metadata(channel_id) - if self.redis_client.exists(metadata_key): - self.redis_client.hset(metadata_key, mapping={ - "state": ChannelState.STOPPING, - "state_changed_at": str(time.time()) - }) + except (ConnectionError, TimeoutError) as e: + # Calculate exponential backoff with jitter + retry_count += 1 + delay = min(base_retry_delay * (2 ** (retry_count - 1)), max_retry_delay) + # Add some randomness to prevent thundering herd + jitter = random.uniform(0, 0.5 * delay) + final_delay = delay + jitter - # If we have local resources for this channel, clean them up - if channel_id in self.stream_buffers or channel_id in self.client_managers: - # Use existing stop_channel method - logger.info(f"Stopping local resources for channel {channel_id}") - self.stop_channel(channel_id) + logger.error(f"Error in event listener: {e}. Retrying in {final_delay:.1f}s (attempt {retry_count})") + time.sleep(final_delay) - # Acknowledge stop by publishing a response - stop_response = { - "event": EventType.CHANNEL_STOPPED, - "channel_id": channel_id, - "worker_id": self.worker_id, - "timestamp": time.time() - } - self.redis_client.publish( - f"ts_proxy:events:{channel_id}", - json.dumps(stop_response) - ) - elif event_type == EventType.CLIENT_STOP: - client_id = data.get("client_id") - if client_id and channel_id: - logger.info(f"Received request to stop client {client_id} on channel {channel_id}") + # Try to clean up the old connection + try: + if 'pubsub' in locals(): + pubsub.close() + if 'pubsub_client' in locals(): + pubsub_client.close() + except: + pass - # Both remove from client manager AND set a key for the generator to detect - if channel_id in self.client_managers: - client_manager = self.client_managers[channel_id] - if client_id in client_manager.clients: - client_manager.remove_client(client_id) - logger.info(f"Removed client {client_id} from client manager") - - # Set a Redis key for the generator to detect - if self.redis_client: - stop_key = RedisKeys.client_stop(channel_id, client_id) - self.redis_client.setex(stop_key, 30, "true") # 30 second TTL - logger.info(f"Set stop key for client {client_id}") - except Exception as e: - logger.error(f"Error processing event message: {e}") - except Exception as e: - logger.error(f"Error in event listener: {e}") - time.sleep(5) # Wait before reconnecting - # Try to restart the listener - self._start_event_listener() + except Exception as e: + logger.error(f"Error in event listener: {e}") + # Add a short delay to prevent rapid retries on persistent errors + time.sleep(5) thread = threading.Thread(target=event_listener, daemon=True) thread.name = "redis-event-listener" @@ -249,10 +380,9 @@ class ProxyServer: try: lock_key = RedisKeys.channel_owner(channel_id) - owner = self.redis_client.get(lock_key) - if owner: - return owner.decode('utf-8') - return None + return self._execute_redis_command( + lambda: self.redis_client.get(lock_key).decode('utf-8') if self.redis_client.get(lock_key) else None + ) except Exception as e: logger.error(f"Error getting channel owner: {e}") return None @@ -271,20 +401,32 @@ class ProxyServer: # Create a lock key with proper namespace lock_key = 
RedisKeys.channel_owner(channel_id) - # Use Redis SETNX for atomic locking - only succeeds if the key doesn't exist - acquired = self.redis_client.setnx(lock_key, self.worker_id) + # Use Redis SETNX for atomic locking with error handling + acquired = self._execute_redis_command( + lambda: self.redis_client.setnx(lock_key, self.worker_id) + ) + + if acquired is None: # Redis command failed + logger.warning(f"Redis command failed during ownership acquisition - assuming ownership") + return True # If acquired, set expiry to prevent orphaned locks if acquired: - self.redis_client.expire(lock_key, ttl) + self._execute_redis_command( + lambda: self.redis_client.expire(lock_key, ttl) + ) logger.info(f"Worker {self.worker_id} acquired ownership of channel {channel_id}") return True # If not acquired, check if we already own it (might be a retry) - current_owner = self.redis_client.get(lock_key) + current_owner = self._execute_redis_command( + lambda: self.redis_client.get(lock_key) + ) if current_owner and current_owner.decode('utf-8') == self.worker_id: # Refresh TTL - self.redis_client.expire(lock_key, ttl) + self._execute_redis_command( + lambda: self.redis_client.expire(lock_key, ttl) + ) logger.info(f"Worker {self.worker_id} refreshed ownership of channel {channel_id}") return True @@ -689,7 +831,7 @@ class ProxyServer: return True except Exception as e: - logger.error(f"Error stopping channel {channel_id}: {e}", exc_info=True) + logger.error(f"Error stopping channel {channel_id}: {e}") return False def check_inactive_channels(self): @@ -723,7 +865,9 @@ class ProxyServer: # Send worker heartbeat first if self.redis_client: worker_heartbeat_key = f"ts_proxy:worker:{self.worker_id}:heartbeat" - self.redis_client.setex(worker_heartbeat_key, 30, str(time.time())) + self._execute_redis_command( + lambda: self.redis_client.setex(worker_heartbeat_key, 30, str(time.time())) + ) # Refresh channel registry self.refresh_channel_registry() diff --git a/core/utils.py b/core/utils.py index e9b5f6d2..06a2f25e 100644 --- a/core/utils.py +++ b/core/utils.py @@ -1,28 +1,118 @@ import redis import logging +import time +import os from django.conf import settings +from redis.exceptions import ConnectionError, TimeoutError logger = logging.getLogger(__name__) -def get_redis_client(): - """Get Redis client with connection validation""" - try: - # Create Redis client - client = redis.Redis( - host=settings.REDIS_HOST, - port=getattr(settings, 'REDIS_PORT', 6379), - db=settings.REDIS_DB, - socket_timeout=5, - socket_connect_timeout=5 - ) - - # Validate connection with ping - client.ping() - logger.info(f"Connected to Redis at {settings.REDIS_HOST}:6379/{settings.REDIS_DB}") - return client - except Exception as e: - logger.error(f"Failed to connect to Redis: {e}") - return None +def get_redis_client(max_retries=5, retry_interval=1): + """Get Redis client with connection validation and retry logic""" + retry_count = 0 + while retry_count < max_retries: + try: + # Get connection parameters from settings or environment + redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost')) + redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) + redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) -# Initialize the global client -redis_client = get_redis_client() \ No newline at end of file + # Create Redis client with better defaults + client = redis.Redis( + host=redis_host, + port=redis_port, + db=redis_db, + socket_timeout=5, + 
socket_connect_timeout=5, + retry_on_timeout=True, + health_check_interval=30 + ) + + # Validate connection with ping + client.ping() + logger.info(f"Connected to Redis at {redis_host}:{redis_port}/{redis_db}") + return client + + except (ConnectionError, TimeoutError) as e: + retry_count += 1 + if retry_count >= max_retries: + logger.error(f"Failed to connect to Redis after {max_retries} attempts: {e}") + return None + else: + # Use exponential backoff for retries + wait_time = retry_interval * (2 ** (retry_count - 1)) + logger.warning(f"Redis connection failed. Retrying in {wait_time}s... ({retry_count}/{max_retries})") + time.sleep(wait_time) + + except Exception as e: + logger.error(f"Unexpected error connecting to Redis: {e}") + return None + +def get_redis_pubsub_client(max_retries=5, retry_interval=3): + """Get Redis client optimized for PubSub operations with longer timeouts""" + retry_count = 0 + while retry_count < max_retries: + try: + # Get connection parameters from settings or environment + redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost')) + redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) + redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) + + # Create Redis client with PubSub-optimized settings + client = redis.Redis( + host=redis_host, + port=redis_port, + db=redis_db, + socket_timeout=60, # Longer timeout for blocking operations + socket_connect_timeout=5, + socket_keepalive=True, # Enable TCP keepalive + health_check_interval=30, + retry_on_timeout=True + ) + + # Validate connection with ping + client.ping() + logger.info(f"Connected to Redis for PubSub at {redis_host}:{redis_port}/{redis_db}") + return client + + except (ConnectionError, TimeoutError) as e: + retry_count += 1 + if retry_count >= max_retries: + logger.error(f"Failed to connect to Redis for PubSub after {max_retries} attempts: {e}") + return None + else: + # Use exponential backoff for retries + wait_time = retry_interval * (2 ** (retry_count - 1)) + logger.warning(f"Redis PubSub connection failed. Retrying in {wait_time}s... 
({retry_count}/{max_retries})") + time.sleep(wait_time) + + except Exception as e: + logger.error(f"Unexpected error connecting to Redis for PubSub: {e}") + return None + +def execute_redis_command(redis_client, command_func, default_return=None): + """ + Execute a Redis command with proper error handling + + Args: + redis_client: The Redis client instance + command_func: Lambda function containing the Redis command to execute + default_return: Value to return if command fails + + Returns: + Command result or default_return on failure + """ + if redis_client is None: + return default_return + + try: + return command_func() + except (ConnectionError, TimeoutError) as e: + logger.warning(f"Redis connection error: {e}") + return default_return + except Exception as e: + logger.error(f"Redis command error: {e}") + return default_return + +# Initialize the global client with retry logic +redis_client = get_redis_client(max_retries=10, retry_interval=1) \ No newline at end of file diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index 2898ed2f..d04edcb0 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -85,10 +85,6 @@ else pids+=("$nginx_pid") fi -cd /app -python manage.py migrate --noinput -python manage.py collectstatic --noinput - uwsgi_file="/app/docker/uwsgi.ini" if [ "$DISPATCHARR_ENV" = "dev" ]; then uwsgi_file="/app/docker/uwsgi.dev.ini" @@ -100,6 +96,12 @@ uwsgi_pid=$(pgrep uwsgi | sort | head -n1) echo "✅ uwsgi started with PID $uwsgi_pid" pids+=("$uwsgi_pid") + + +cd /app +python manage.py migrate --noinput +python manage.py collectstatic --noinput + # Wait for at least one process to exit and log the process that exited first if [ ${#pids[@]} -gt 0 ]; then echo "⏳ Waiting for processes to exit..." diff --git a/docker/uwsgi.dev.ini b/docker/uwsgi.dev.ini index 93ef9fa0..5b23b183 100644 --- a/docker/uwsgi.dev.ini +++ b/docker/uwsgi.dev.ini @@ -2,9 +2,14 @@ ; exec-before = python manage.py collectstatic --noinput ; exec-before = python manage.py migrate --noinput +; First run Redis availability check script once +exec-pre = python /app/scripts/wait_for_redis.py + +; Start Redis first +attach-daemon = redis-server +; Then start other services attach-daemon = celery -A dispatcharr worker -l info attach-daemon = celery -A dispatcharr beat -l info -attach-daemon = redis-server attach-daemon = daphne -b 0.0.0.0 -p 8001 dispatcharr.asgi:application attach-daemon = cd /app/frontend && npm run dev diff --git a/docker/uwsgi.ini b/docker/uwsgi.ini index adb5de33..1b1de50a 100644 --- a/docker/uwsgi.ini +++ b/docker/uwsgi.ini @@ -2,9 +2,14 @@ ; exec-before = python manage.py collectstatic --noinput ; exec-before = python manage.py migrate --noinput +; First run Redis availability check script once +exec-pre = python /app/scripts/wait_for_redis.py + +; Start Redis first +attach-daemon = redis-server +; Then start other services attach-daemon = celery -A dispatcharr worker -l error attach-daemon = celery -A dispatcharr beat -l error -attach-daemon = redis-server attach-daemon = daphne -b 0.0.0.0 -p 8001 dispatcharr.asgi:application # Core settings diff --git a/scripts/wait_for_redis.py b/scripts/wait_for_redis.py new file mode 100644 index 00000000..306b9d49 --- /dev/null +++ b/scripts/wait_for_redis.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python +""" +Helper script to wait for Redis to be available before starting the application. 
+""" + +import redis +import time +import os +import sys +import logging + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +def wait_for_redis(host='localhost', port=6379, db=0, max_retries=30, retry_interval=2): + """Wait for Redis to become available""" + redis_client = None + retry_count = 0 + + logger.info(f"Waiting for Redis at {host}:{port}/{db}...") + + while retry_count < max_retries: + try: + redis_client = redis.Redis( + host=host, + port=port, + db=db, + socket_timeout=2, + socket_connect_timeout=2 + ) + redis_client.ping() + logger.info(f"✅ Redis at {host}:{port}/{db} is now available!") + return True + except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e: + retry_count += 1 + if retry_count >= max_retries: + logger.error(f"❌ Failed to connect to Redis after {max_retries} attempts: {e}") + return False + + logger.info(f"⏳ Redis not available yet, retrying in {retry_interval}s... ({retry_count}/{max_retries})") + time.sleep(retry_interval) + except Exception as e: + logger.error(f"❌ Unexpected error connecting to Redis: {e}") + return False + + return False + +if __name__ == "__main__": + host = os.environ.get('REDIS_HOST', 'localhost') + port = int(os.environ.get('REDIS_PORT', 6379)) + db = int(os.environ.get('REDIS_DB', 0)) + max_retries = int(os.environ.get('REDIS_WAIT_RETRIES', 30)) + retry_interval = int(os.environ.get('REDIS_WAIT_INTERVAL', 2)) + + logger.info(f"Starting Redis availability check at {host}:{port}/{db}") + + if wait_for_redis(host, port, db, max_retries, retry_interval): + sys.exit(0) + else: + sys.exit(1)