From d622c96aba4092e70448959827a3242171a88ea1 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Sat, 22 Mar 2025 08:48:39 -0500 Subject: [PATCH] Improved connection handling for redis pubsub. --- apps/proxy/ts_proxy/server.py | 42 ++++--- core/redis_pubsub.py | 223 ++++++++++++++++++++++++++++++++++ core/utils.py | 49 ++++++-- dispatcharr/settings.py | 9 +- 4 files changed, 290 insertions(+), 33 deletions(-) create mode 100644 core/redis_pubsub.py diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index c6a9ded2..2dd923fd 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -18,7 +18,7 @@ import json from typing import Dict, Optional, Set from apps.proxy.config import TSConfig as Config from apps.channels.models import Channel -from core.utils import redis_client as global_redis_client # Import the global Redis client +from core.utils import redis_client as global_redis_client, redis_pubsub_client as global_redis_pubsub_client # Import both global Redis clients from redis.exceptions import ConnectionError, TimeoutError from .stream_manager import StreamManager from .stream_buffer import StreamBuffer @@ -156,30 +156,34 @@ class ProxyServer: while True: try: - # Create a dedicated Redis client for PubSub with longer timeouts - # This avoids affecting the main Redis client operations - from django.conf import settings - import redis + # Use the global PubSub client if available + if global_redis_pubsub_client: + pubsub_client = global_redis_pubsub_client + logger.info("Using global Redis PubSub client for event listener") + else: + # Fall back to creating a dedicated client if global one is unavailable + from django.conf import settings + import redis - redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost')) - redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) - redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) + redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost')) + redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) + redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) - # Create a dedicated client with generous timeouts for PubSub connections - pubsub_client = redis.Redis( - host=redis_host, - port=redis_port, - db=redis_db, - socket_timeout=60, # Much longer timeout for PubSub operations - socket_connect_timeout=10, - socket_keepalive=True, # Enable TCP keepalive - health_check_interval=30 - ) + pubsub_client = redis.Redis( + host=redis_host, + port=redis_port, + db=redis_db, + socket_timeout=60, + socket_connect_timeout=10, + socket_keepalive=True, + health_check_interval=30 + ) + logger.info("Created dedicated Redis PubSub client for event listener") # Test connection before subscribing pubsub_client.ping() - # Create a new pubsub instance from the dedicated client + # Create a pubsub instance from the client pubsub = pubsub_client.pubsub() pubsub.psubscribe("ts_proxy:events:*") diff --git a/core/redis_pubsub.py b/core/redis_pubsub.py new file mode 100644 index 00000000..5fb57334 --- /dev/null +++ b/core/redis_pubsub.py @@ -0,0 +1,223 @@ +""" +Redis PubSub utilities for maintaining long-lived connections. +""" +import threading +import time +import logging +import json +from redis import Redis +from redis.exceptions import ConnectionError, TimeoutError + +logger = logging.getLogger(__name__) + +class RedisPubSubManager: + """ + A robust Redis PubSub manager that handles disconnections and reconnections. + """ + + def __init__(self, redis_client=None, auto_reconnect=True): + """ + Initialize the PubSub manager. + + Args: + redis_client: An existing Redis client to use + auto_reconnect: Whether to automatically reconnect on failure + """ + from .utils import get_redis_client + + self.redis_client = redis_client or get_redis_client() + self.pubsub = None + self.subscriptions = set() + self.pattern_subscriptions = set() + self.auto_reconnect = auto_reconnect + self.running = True + self.lock = threading.RLock() + self.message_handlers = {} # Map of channels to handler functions + self.message_thread = None + + def subscribe(self, channel, handler=None): + """ + Subscribe to a channel. + + Args: + channel: The channel to subscribe to + handler: Optional function to call when messages are received + """ + with self.lock: + self.subscriptions.add(channel) + if handler: + self.message_handlers[channel] = handler + + if self.pubsub: + self.pubsub.subscribe(channel) + logger.info(f"Subscribed to channel: {channel}") + + def psubscribe(self, pattern, handler=None): + """ + Subscribe to a channel pattern. + + Args: + pattern: The pattern to subscribe to + handler: Optional function to call when messages are received + """ + with self.lock: + self.pattern_subscriptions.add(pattern) + if handler: + self.message_handlers[pattern] = handler + + if self.pubsub: + self.pubsub.psubscribe(pattern) + logger.info(f"Subscribed to pattern: {pattern}") + + def publish(self, channel, message): + """ + Publish a message to a channel. + + Args: + channel: The channel to publish to + message: The message to publish (will be JSON-encoded if not a string) + + Returns: + Number of clients that received the message + """ + try: + if not isinstance(message, str): + message = json.dumps(message) + return self.redis_client.publish(channel, message) + except Exception as e: + logger.error(f"Error publishing to {channel}: {e}") + return 0 + + def start_listening(self): + """ + Start listening for messages in a background thread. + """ + if not self.message_thread: + self._connect() + self.message_thread = threading.Thread( + target=self._listen_for_messages, + daemon=True, + name="redis-pubsub-listener" + ) + self.message_thread.start() + logger.info("Started Redis PubSub listener thread") + + def stop(self): + """ + Stop listening and clean up resources. + """ + self.running = False + if self.pubsub: + try: + self.pubsub.close() + except: + pass + self.pubsub = None + + def _connect(self): + """ + Establish a new PubSub connection and subscribe to all channels. + """ + with self.lock: + # Close any existing connection + if self.pubsub: + try: + self.pubsub.close() + except: + pass + + # Create a new PubSub instance - critical: no timeout for subscribe operations + # This prevents the connection from timing out while waiting for messages + self.pubsub = self.redis_client.pubsub() + + # Resubscribe to all channels + if self.subscriptions: + self.pubsub.subscribe(*self.subscriptions) + logger.info(f"Resubscribed to channels: {self.subscriptions}") + + # Resubscribe to all patterns + if self.pattern_subscriptions: + self.pubsub.psubscribe(*self.pattern_subscriptions) + logger.info(f"Resubscribed to patterns: {self.pattern_subscriptions}") + + def _listen_for_messages(self): + """ + Background thread that listens for messages and handles reconnections. + """ + consecutive_errors = 0 + + while self.running: + try: + # Check if we need to connect + if not self.pubsub: + self._connect() + + # Listen for messages with NO timeout - this is critical! + message = self.pubsub.get_message(timeout=None) + + if message: + # Don't process subscription confirmation messages + if message['type'] in ('subscribe', 'psubscribe'): + continue + + channel = message.get('channel') + if channel: + # Decode binary channel name if needed + if isinstance(channel, bytes): + channel = channel.decode('utf-8') + + # Find and call the appropriate handler + handler = self.message_handlers.get(channel) + if handler: + try: + handler(message) + except Exception as e: + logger.error(f"Error in message handler for {channel}: {e}") + + # Reset error counter on success + consecutive_errors = 0 + + # Small sleep to prevent excessive CPU usage + time.sleep(0.01) + + except (ConnectionError, TimeoutError) as e: + consecutive_errors += 1 + + if not self.auto_reconnect: + logger.error(f"PubSub connection error and auto_reconnect is disabled: {e}") + break + + # Exponential backoff for reconnection attempts + backoff = min(consecutive_errors * 0.5, 5) + logger.warning(f"PubSub connection error, reconnecting in {backoff} seconds: {e}") + time.sleep(backoff) + + # Reconnect + self._connect() + + except Exception as e: + logger.error(f"Unexpected error in PubSub listener: {e}") + time.sleep(1) # Prevent tight loop in case of persistent errors + + logger.info("PubSub listener thread stopping") + +# Create a singleton instance +pubsub_manager = None + +def get_pubsub_manager(redis_client=None): + """ + Get or create the PubSub manager singleton. + + Args: + redis_client: Optional Redis client to use + + Returns: + The PubSub manager instance + """ + global pubsub_manager + + if pubsub_manager is None: + pubsub_manager = RedisPubSubManager(redis_client) + pubsub_manager.start_listening() + + return pubsub_manager diff --git a/core/utils.py b/core/utils.py index 06a2f25e..e61b0c14 100644 --- a/core/utils.py +++ b/core/utils.py @@ -2,6 +2,7 @@ import redis import logging import time import os +import threading from django.conf import settings from redis.exceptions import ConnectionError, TimeoutError @@ -17,15 +18,23 @@ def get_redis_client(max_retries=5, retry_interval=1): redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) + # Use standardized settings + socket_timeout = getattr(settings, 'REDIS_SOCKET_TIMEOUT', 5) + socket_connect_timeout = getattr(settings, 'REDIS_SOCKET_CONNECT_TIMEOUT', 5) + health_check_interval = getattr(settings, 'REDIS_HEALTH_CHECK_INTERVAL', 30) + socket_keepalive = getattr(settings, 'REDIS_SOCKET_KEEPALIVE', True) + retry_on_timeout = getattr(settings, 'REDIS_RETRY_ON_TIMEOUT', True) + # Create Redis client with better defaults client = redis.Redis( host=redis_host, port=redis_port, db=redis_db, - socket_timeout=5, - socket_connect_timeout=5, - retry_on_timeout=True, - health_check_interval=30 + socket_timeout=socket_timeout, + socket_connect_timeout=socket_connect_timeout, + socket_keepalive=socket_keepalive, + health_check_interval=health_check_interval, + retry_on_timeout=retry_on_timeout ) # Validate connection with ping @@ -49,7 +58,7 @@ def get_redis_client(max_retries=5, retry_interval=1): return None def get_redis_pubsub_client(max_retries=5, retry_interval=3): - """Get Redis client optimized for PubSub operations with longer timeouts""" + """Get Redis client optimized for PubSub operations""" retry_count = 0 while retry_count < max_retries: try: @@ -58,21 +67,30 @@ def get_redis_pubsub_client(max_retries=5, retry_interval=3): redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) - # Create Redis client with PubSub-optimized settings + # Use standardized settings but without socket timeouts for PubSub + # Important: socket_timeout is None for PubSub operations + socket_connect_timeout = getattr(settings, 'REDIS_SOCKET_CONNECT_TIMEOUT', 5) + socket_keepalive = getattr(settings, 'REDIS_SOCKET_KEEPALIVE', True) + health_check_interval = getattr(settings, 'REDIS_HEALTH_CHECK_INTERVAL', 30) + retry_on_timeout = getattr(settings, 'REDIS_RETRY_ON_TIMEOUT', True) + + # Create Redis client with PubSub-optimized settings - no timeout client = redis.Redis( host=redis_host, port=redis_port, db=redis_db, - socket_timeout=60, # Longer timeout for blocking operations - socket_connect_timeout=5, - socket_keepalive=True, # Enable TCP keepalive - health_check_interval=30, - retry_on_timeout=True + socket_timeout=None, # Critical: No timeout for PubSub operations + socket_connect_timeout=socket_connect_timeout, + socket_keepalive=socket_keepalive, + health_check_interval=health_check_interval, + retry_on_timeout=retry_on_timeout ) # Validate connection with ping client.ping() logger.info(f"Connected to Redis for PubSub at {redis_host}:{redis_port}/{redis_db}") + + # We don't need the keepalive thread anymore since we're using proper PubSub handling return client except (ConnectionError, TimeoutError) as e: @@ -114,5 +132,10 @@ def execute_redis_command(redis_client, command_func, default_return=None): logger.error(f"Redis command error: {e}") return default_return -# Initialize the global client with retry logic -redis_client = get_redis_client(max_retries=10, retry_interval=1) \ No newline at end of file +# Initialize the global clients with retry logic +redis_client = get_redis_client(max_retries=10, retry_interval=1) +redis_pubsub_client = get_redis_pubsub_client(max_retries=10, retry_interval=1) + +# Import and initialize the PubSub manager +from .redis_pubsub import get_pubsub_manager +pubsub_manager = get_pubsub_manager(redis_client) \ No newline at end of file diff --git a/dispatcharr/settings.py b/dispatcharr/settings.py index 5cd21169..07373af1 100644 --- a/dispatcharr/settings.py +++ b/dispatcharr/settings.py @@ -179,8 +179,15 @@ SIMPLE_JWT = { 'BLACKLIST_AFTER_ROTATION': True, # Optional: Whether to blacklist refresh tokens } -# Redis settings for TS proxy +# Redis connection settings REDIS_URL = 'redis://localhost:6379/0' +REDIS_SOCKET_TIMEOUT = 60 # Socket timeout in seconds +REDIS_SOCKET_CONNECT_TIMEOUT = 5 # Connection timeout in seconds +REDIS_HEALTH_CHECK_INTERVAL = 15 # Health check every 15 seconds +REDIS_SOCKET_KEEPALIVE = True # Enable socket keepalive +REDIS_RETRY_ON_TIMEOUT = True # Retry on timeout +REDIS_MAX_RETRIES = 10 # Maximum number of retries +REDIS_RETRY_INTERVAL = 1 # Initial retry interval in seconds # Proxy Settings PROXY_SETTINGS = {