Improved connection handling for redis pubsub.

This commit is contained in:
SergeantPanda 2025-03-22 08:48:39 -05:00
parent 77002beaac
commit d622c96aba
4 changed files with 290 additions and 33 deletions

View file

@ -18,7 +18,7 @@ import json
from typing import Dict, Optional, Set
from apps.proxy.config import TSConfig as Config
from apps.channels.models import Channel
from core.utils import redis_client as global_redis_client # Import the global Redis client
from core.utils import redis_client as global_redis_client, redis_pubsub_client as global_redis_pubsub_client # Import both global Redis clients
from redis.exceptions import ConnectionError, TimeoutError
from .stream_manager import StreamManager
from .stream_buffer import StreamBuffer
@ -156,30 +156,34 @@ class ProxyServer:
while True:
try:
# Create a dedicated Redis client for PubSub with longer timeouts
# This avoids affecting the main Redis client operations
from django.conf import settings
import redis
# Use the global PubSub client if available
if global_redis_pubsub_client:
pubsub_client = global_redis_pubsub_client
logger.info("Using global Redis PubSub client for event listener")
else:
# Fall back to creating a dedicated client if global one is unavailable
from django.conf import settings
import redis
redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost'))
redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379)))
redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0)))
redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost'))
redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379)))
redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0)))
# Create a dedicated client with generous timeouts for PubSub connections
pubsub_client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
socket_timeout=60, # Much longer timeout for PubSub operations
socket_connect_timeout=10,
socket_keepalive=True, # Enable TCP keepalive
health_check_interval=30
)
pubsub_client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
socket_timeout=60,
socket_connect_timeout=10,
socket_keepalive=True,
health_check_interval=30
)
logger.info("Created dedicated Redis PubSub client for event listener")
# Test connection before subscribing
pubsub_client.ping()
# Create a new pubsub instance from the dedicated client
# Create a pubsub instance from the client
pubsub = pubsub_client.pubsub()
pubsub.psubscribe("ts_proxy:events:*")

223
core/redis_pubsub.py Normal file
View file

@ -0,0 +1,223 @@
"""
Redis PubSub utilities for maintaining long-lived connections.
"""
import threading
import time
import logging
import json
from redis import Redis
from redis.exceptions import ConnectionError, TimeoutError
logger = logging.getLogger(__name__)
class RedisPubSubManager:
"""
A robust Redis PubSub manager that handles disconnections and reconnections.
"""
def __init__(self, redis_client=None, auto_reconnect=True):
"""
Initialize the PubSub manager.
Args:
redis_client: An existing Redis client to use
auto_reconnect: Whether to automatically reconnect on failure
"""
from .utils import get_redis_client
self.redis_client = redis_client or get_redis_client()
self.pubsub = None
self.subscriptions = set()
self.pattern_subscriptions = set()
self.auto_reconnect = auto_reconnect
self.running = True
self.lock = threading.RLock()
self.message_handlers = {} # Map of channels to handler functions
self.message_thread = None
def subscribe(self, channel, handler=None):
"""
Subscribe to a channel.
Args:
channel: The channel to subscribe to
handler: Optional function to call when messages are received
"""
with self.lock:
self.subscriptions.add(channel)
if handler:
self.message_handlers[channel] = handler
if self.pubsub:
self.pubsub.subscribe(channel)
logger.info(f"Subscribed to channel: {channel}")
def psubscribe(self, pattern, handler=None):
"""
Subscribe to a channel pattern.
Args:
pattern: The pattern to subscribe to
handler: Optional function to call when messages are received
"""
with self.lock:
self.pattern_subscriptions.add(pattern)
if handler:
self.message_handlers[pattern] = handler
if self.pubsub:
self.pubsub.psubscribe(pattern)
logger.info(f"Subscribed to pattern: {pattern}")
def publish(self, channel, message):
"""
Publish a message to a channel.
Args:
channel: The channel to publish to
message: The message to publish (will be JSON-encoded if not a string)
Returns:
Number of clients that received the message
"""
try:
if not isinstance(message, str):
message = json.dumps(message)
return self.redis_client.publish(channel, message)
except Exception as e:
logger.error(f"Error publishing to {channel}: {e}")
return 0
def start_listening(self):
"""
Start listening for messages in a background thread.
"""
if not self.message_thread:
self._connect()
self.message_thread = threading.Thread(
target=self._listen_for_messages,
daemon=True,
name="redis-pubsub-listener"
)
self.message_thread.start()
logger.info("Started Redis PubSub listener thread")
def stop(self):
"""
Stop listening and clean up resources.
"""
self.running = False
if self.pubsub:
try:
self.pubsub.close()
except:
pass
self.pubsub = None
def _connect(self):
"""
Establish a new PubSub connection and subscribe to all channels.
"""
with self.lock:
# Close any existing connection
if self.pubsub:
try:
self.pubsub.close()
except:
pass
# Create a new PubSub instance - critical: no timeout for subscribe operations
# This prevents the connection from timing out while waiting for messages
self.pubsub = self.redis_client.pubsub()
# Resubscribe to all channels
if self.subscriptions:
self.pubsub.subscribe(*self.subscriptions)
logger.info(f"Resubscribed to channels: {self.subscriptions}")
# Resubscribe to all patterns
if self.pattern_subscriptions:
self.pubsub.psubscribe(*self.pattern_subscriptions)
logger.info(f"Resubscribed to patterns: {self.pattern_subscriptions}")
def _listen_for_messages(self):
"""
Background thread that listens for messages and handles reconnections.
"""
consecutive_errors = 0
while self.running:
try:
# Check if we need to connect
if not self.pubsub:
self._connect()
# Listen for messages with NO timeout - this is critical!
message = self.pubsub.get_message(timeout=None)
if message:
# Don't process subscription confirmation messages
if message['type'] in ('subscribe', 'psubscribe'):
continue
channel = message.get('channel')
if channel:
# Decode binary channel name if needed
if isinstance(channel, bytes):
channel = channel.decode('utf-8')
# Find and call the appropriate handler
handler = self.message_handlers.get(channel)
if handler:
try:
handler(message)
except Exception as e:
logger.error(f"Error in message handler for {channel}: {e}")
# Reset error counter on success
consecutive_errors = 0
# Small sleep to prevent excessive CPU usage
time.sleep(0.01)
except (ConnectionError, TimeoutError) as e:
consecutive_errors += 1
if not self.auto_reconnect:
logger.error(f"PubSub connection error and auto_reconnect is disabled: {e}")
break
# Exponential backoff for reconnection attempts
backoff = min(consecutive_errors * 0.5, 5)
logger.warning(f"PubSub connection error, reconnecting in {backoff} seconds: {e}")
time.sleep(backoff)
# Reconnect
self._connect()
except Exception as e:
logger.error(f"Unexpected error in PubSub listener: {e}")
time.sleep(1) # Prevent tight loop in case of persistent errors
logger.info("PubSub listener thread stopping")
# Create a singleton instance
pubsub_manager = None
def get_pubsub_manager(redis_client=None):
"""
Get or create the PubSub manager singleton.
Args:
redis_client: Optional Redis client to use
Returns:
The PubSub manager instance
"""
global pubsub_manager
if pubsub_manager is None:
pubsub_manager = RedisPubSubManager(redis_client)
pubsub_manager.start_listening()
return pubsub_manager

View file

@ -2,6 +2,7 @@ import redis
import logging
import time
import os
import threading
from django.conf import settings
from redis.exceptions import ConnectionError, TimeoutError
@ -17,15 +18,23 @@ def get_redis_client(max_retries=5, retry_interval=1):
redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379)))
redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0)))
# Use standardized settings
socket_timeout = getattr(settings, 'REDIS_SOCKET_TIMEOUT', 5)
socket_connect_timeout = getattr(settings, 'REDIS_SOCKET_CONNECT_TIMEOUT', 5)
health_check_interval = getattr(settings, 'REDIS_HEALTH_CHECK_INTERVAL', 30)
socket_keepalive = getattr(settings, 'REDIS_SOCKET_KEEPALIVE', True)
retry_on_timeout = getattr(settings, 'REDIS_RETRY_ON_TIMEOUT', True)
# Create Redis client with better defaults
client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
socket_timeout=5,
socket_connect_timeout=5,
retry_on_timeout=True,
health_check_interval=30
socket_timeout=socket_timeout,
socket_connect_timeout=socket_connect_timeout,
socket_keepalive=socket_keepalive,
health_check_interval=health_check_interval,
retry_on_timeout=retry_on_timeout
)
# Validate connection with ping
@ -49,7 +58,7 @@ def get_redis_client(max_retries=5, retry_interval=1):
return None
def get_redis_pubsub_client(max_retries=5, retry_interval=3):
"""Get Redis client optimized for PubSub operations with longer timeouts"""
"""Get Redis client optimized for PubSub operations"""
retry_count = 0
while retry_count < max_retries:
try:
@ -58,21 +67,30 @@ def get_redis_pubsub_client(max_retries=5, retry_interval=3):
redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379)))
redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0)))
# Create Redis client with PubSub-optimized settings
# Use standardized settings but without socket timeouts for PubSub
# Important: socket_timeout is None for PubSub operations
socket_connect_timeout = getattr(settings, 'REDIS_SOCKET_CONNECT_TIMEOUT', 5)
socket_keepalive = getattr(settings, 'REDIS_SOCKET_KEEPALIVE', True)
health_check_interval = getattr(settings, 'REDIS_HEALTH_CHECK_INTERVAL', 30)
retry_on_timeout = getattr(settings, 'REDIS_RETRY_ON_TIMEOUT', True)
# Create Redis client with PubSub-optimized settings - no timeout
client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
socket_timeout=60, # Longer timeout for blocking operations
socket_connect_timeout=5,
socket_keepalive=True, # Enable TCP keepalive
health_check_interval=30,
retry_on_timeout=True
socket_timeout=None, # Critical: No timeout for PubSub operations
socket_connect_timeout=socket_connect_timeout,
socket_keepalive=socket_keepalive,
health_check_interval=health_check_interval,
retry_on_timeout=retry_on_timeout
)
# Validate connection with ping
client.ping()
logger.info(f"Connected to Redis for PubSub at {redis_host}:{redis_port}/{redis_db}")
# We don't need the keepalive thread anymore since we're using proper PubSub handling
return client
except (ConnectionError, TimeoutError) as e:
@ -114,5 +132,10 @@ def execute_redis_command(redis_client, command_func, default_return=None):
logger.error(f"Redis command error: {e}")
return default_return
# Initialize the global client with retry logic
redis_client = get_redis_client(max_retries=10, retry_interval=1)
# Initialize the global clients with retry logic
redis_client = get_redis_client(max_retries=10, retry_interval=1)
redis_pubsub_client = get_redis_pubsub_client(max_retries=10, retry_interval=1)
# Import and initialize the PubSub manager
from .redis_pubsub import get_pubsub_manager
pubsub_manager = get_pubsub_manager(redis_client)

View file

@ -179,8 +179,15 @@ SIMPLE_JWT = {
'BLACKLIST_AFTER_ROTATION': True, # Optional: Whether to blacklist refresh tokens
}
# Redis settings for TS proxy
# Redis connection settings
REDIS_URL = 'redis://localhost:6379/0'
REDIS_SOCKET_TIMEOUT = 60 # Socket timeout in seconds
REDIS_SOCKET_CONNECT_TIMEOUT = 5 # Connection timeout in seconds
REDIS_HEALTH_CHECK_INTERVAL = 15 # Health check every 15 seconds
REDIS_SOCKET_KEEPALIVE = True # Enable socket keepalive
REDIS_RETRY_ON_TIMEOUT = True # Retry on timeout
REDIS_MAX_RETRIES = 10 # Maximum number of retries
REDIS_RETRY_INTERVAL = 1 # Initial retry interval in seconds
# Proxy Settings
PROXY_SETTINGS = {