forked from Mirrors/Dispatcharr
Singular redis-client.
parent 4738d301d1
commit efaa7f7195
8 changed files with 603 additions and 255 deletions
@@ -4,6 +4,7 @@ import re
from . import proxy_server
from .redis_keys import RedisKeys
from .constants import TS_PACKET_SIZE
+from redis.exceptions import ConnectionError, TimeoutError

logger = logging.getLogger("ts_proxy")
@ -172,76 +173,98 @@ class ChannelStatus:
|
|||
|
||||
return info
|
||||
|
||||
# Function for basic channel info (used for all channels summary)
|
||||
def get_basic_channel_info(channel_id):
|
||||
# Get channel metadata
|
||||
metadata_key = RedisKeys.channel_metadata(channel_id)
|
||||
metadata = proxy_server.redis_client.hgetall(metadata_key)
|
||||
|
||||
if not metadata:
|
||||
@staticmethod
|
||||
def _execute_redis_command(command_func):
|
||||
"""Execute Redis command with error handling"""
|
||||
if not proxy_server.redis_client:
|
||||
return None
|
||||
|
||||
# Basic channel info only - omit diagnostics and details
|
||||
buffer_index_key = RedisKeys.buffer_index(channel_id)
|
||||
buffer_index_value = proxy_server.redis_client.get(buffer_index_key)
|
||||
try:
|
||||
return command_func()
|
||||
except (ConnectionError, TimeoutError) as e:
|
||||
logger.warning(f"Redis connection error in ChannelStatus: {e}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Redis command error in ChannelStatus: {e}")
|
||||
return None
|
||||
|
||||
# Count clients (using efficient count method)
|
||||
client_set_key = RedisKeys.clients(channel_id)
|
||||
client_count = proxy_server.redis_client.scard(client_set_key) or 0
|
||||
@staticmethod
|
||||
def get_basic_channel_info(channel_id):
|
||||
"""Get basic channel information with Redis error handling"""
|
||||
try:
|
||||
# Use _execute_redis_command for Redis operations
|
||||
metadata_key = RedisKeys.channel_metadata(channel_id)
|
||||
metadata = ChannelStatus._execute_redis_command(
|
||||
lambda: proxy_server.redis_client.hgetall(metadata_key)
|
||||
)
|
||||
|
||||
# Calculate uptime
|
||||
created_at = float(metadata.get(b'init_time', b'0').decode('utf-8'))
|
||||
uptime = time.time() - created_at if created_at > 0 else 0
|
||||
if not metadata:
|
||||
return None
|
||||
|
||||
# Simplified info
|
||||
info = {
|
||||
'channel_id': channel_id,
|
||||
'state': metadata.get(b'state', b'unknown').decode('utf-8'),
|
||||
'url': metadata.get(b'url', b'').decode('utf-8'),
|
||||
'profile': metadata.get(b'profile', b'unknown').decode('utf-8'),
|
||||
'owner': metadata.get(b'owner', b'unknown').decode('utf-8'),
|
||||
'buffer_index': int(buffer_index_value.decode('utf-8')) if buffer_index_value else 0,
|
||||
'client_count': client_count,
|
||||
'uptime': uptime
|
||||
}
|
||||
# Basic channel info only - omit diagnostics and details
|
||||
buffer_index_key = RedisKeys.buffer_index(channel_id)
|
||||
buffer_index_value = proxy_server.redis_client.get(buffer_index_key)
|
||||
|
||||
# Quick health check if available locally
|
||||
if channel_id in proxy_server.stream_managers:
|
||||
manager = proxy_server.stream_managers[channel_id]
|
||||
info['healthy'] = manager.healthy
|
||||
# Count clients (using efficient count method)
|
||||
client_set_key = RedisKeys.clients(channel_id)
|
||||
client_count = proxy_server.redis_client.scard(client_set_key) or 0
|
||||
|
||||
# Get concise client information
|
||||
clients = []
|
||||
client_ids = proxy_server.redis_client.smembers(client_set_key)
|
||||
# Calculate uptime
|
||||
created_at = float(metadata.get(b'init_time', b'0').decode('utf-8'))
|
||||
uptime = time.time() - created_at if created_at > 0 else 0
|
||||
|
||||
# Process only if we have clients and keep it limited
|
||||
if client_ids:
|
||||
# Get up to 10 clients for the basic view
|
||||
for client_id in list(client_ids)[:10]:
|
||||
client_id_str = client_id.decode('utf-8')
|
||||
client_key = f"ts_proxy:channel:{channel_id}:clients:{client_id_str}"
|
||||
# Simplified info
|
||||
info = {
|
||||
'channel_id': channel_id,
|
||||
'state': metadata.get(b'state', b'unknown').decode('utf-8'),
|
||||
'url': metadata.get(b'url', b'').decode('utf-8'),
|
||||
'profile': metadata.get(b'profile', b'unknown').decode('utf-8'),
|
||||
'owner': metadata.get(b'owner', b'unknown').decode('utf-8'),
|
||||
'buffer_index': int(buffer_index_value.decode('utf-8')) if buffer_index_value else 0,
|
||||
'client_count': client_count,
|
||||
'uptime': uptime
|
||||
}
|
||||
|
||||
# Efficient way - just retrieve the essentials
|
||||
client_info = {
|
||||
'client_id': client_id_str,
|
||||
'user_agent': proxy_server.redis_client.hget(client_key, 'user_agent'),
|
||||
'ip_address': proxy_server.redis_client.hget(client_key, 'ip_address').decode('utf-8'),
|
||||
}
|
||||
# Quick health check if available locally
|
||||
if channel_id in proxy_server.stream_managers:
|
||||
manager = proxy_server.stream_managers[channel_id]
|
||||
info['healthy'] = manager.healthy
|
||||
|
||||
if client_info['user_agent']:
|
||||
client_info['user_agent'] = client_info['user_agent'].decode('utf-8')
|
||||
else:
|
||||
client_info['user_agent'] = 'unknown'
|
||||
# Get concise client information
|
||||
clients = []
|
||||
client_ids = proxy_server.redis_client.smembers(client_set_key)
|
||||
|
||||
# Just get connected_at for client age
|
||||
connected_at_bytes = proxy_server.redis_client.hget(client_key, 'connected_at')
|
||||
if connected_at_bytes:
|
||||
connected_at = float(connected_at_bytes.decode('utf-8'))
|
||||
client_info['connected_since'] = time.time() - connected_at
|
||||
# Process only if we have clients and keep it limited
|
||||
if client_ids:
|
||||
# Get up to 10 clients for the basic view
|
||||
for client_id in list(client_ids)[:10]:
|
||||
client_id_str = client_id.decode('utf-8')
|
||||
client_key = f"ts_proxy:channel:{channel_id}:clients:{client_id_str}"
|
||||
|
||||
clients.append(client_info)
|
||||
# Efficient way - just retrieve the essentials
|
||||
client_info = {
|
||||
'client_id': client_id_str,
|
||||
'user_agent': proxy_server.redis_client.hget(client_key, 'user_agent'),
|
||||
'ip_address': proxy_server.redis_client.hget(client_key, 'ip_address').decode('utf-8'),
|
||||
}
|
||||
|
||||
# Add clients to info
|
||||
info['clients'] = clients
|
||||
if client_info['user_agent']:
|
||||
client_info['user_agent'] = client_info['user_agent'].decode('utf-8')
|
||||
else:
|
||||
client_info['user_agent'] = 'unknown'
|
||||
|
||||
return info
|
||||
# Just get connected_at for client age
|
||||
connected_at_bytes = proxy_server.redis_client.hget(client_key, 'connected_at')
|
||||
if connected_at_bytes:
|
||||
connected_at = float(connected_at_bytes.decode('utf-8'))
|
||||
client_info['connected_since'] = time.time() - connected_at
|
||||
|
||||
clients.append(client_info)
|
||||
|
||||
# Add clients to info
|
||||
info['clients'] = clients
|
||||
|
||||
return info
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting channel info: {e}")
|
||||
return None
|
||||
|
|
|
|||
|
|
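For context, the pattern this hunk introduces can be sketched in isolation: every Redis call is passed in as a zero-argument lambda, and connection failures degrade to a None result instead of an exception, with hash values coming back as bytes that need decoding. This is a minimal, hedged sketch, not the project's code; the key name, channel id and wrapper name below are made up for illustration, assuming a local Redis.

import logging
import redis
from redis.exceptions import ConnectionError, TimeoutError

logger = logging.getLogger("ts_proxy")
redis_client = redis.Redis(host="localhost", port=6379, db=0)  # assumed local Redis

def execute_redis_command(command_func):
    """Run a Redis call, returning None instead of raising on connection problems."""
    try:
        return command_func()
    except (ConnectionError, TimeoutError) as e:
        logger.warning(f"Redis connection error: {e}")
        return None

# Usage with a made-up channel metadata key; hgetall returns a dict of bytes.
metadata = execute_redis_command(lambda: redis_client.hgetall("ts_proxy:channel:demo:metadata"))
if metadata is None:
    print("Redis unavailable; treating channel as unknown")
else:
    state = metadata.get(b"state", b"unknown").decode("utf-8")
    print(f"channel state: {state}")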
@@ -6,6 +6,7 @@ import time
import json
from typing import Set, Optional
from apps.proxy.config import TSConfig as Config
+from redis.exceptions import ConnectionError, TimeoutError
from .constants import EventType
from .config_helper import ConfigHelper
from .redis_keys import RedisKeys
@@ -120,6 +121,20 @@ class ClientManager:
        thread.start()
        logger.debug(f"Started client heartbeat thread for channel {self.channel_id} (interval: {self.heartbeat_interval}s)")

+    def _execute_redis_command(self, command_func):
+        """Execute Redis command with error handling"""
+        if not self.redis_client:
+            return None
+
+        try:
+            return command_func()
+        except (ConnectionError, TimeoutError) as e:
+            logger.warning(f"Redis connection error in ClientManager: {e}")
+            return None
+        except Exception as e:
+            logger.error(f"Redis command error in ClientManager: {e}")
+            return None
+
    def _notify_owner_of_activity(self):
        """Notify channel owner that clients are active on this worker"""
        if not self.redis_client or not self.clients:
@@ -130,11 +145,15 @@ class ClientManager:

            # STANDARDIZED KEY: Worker info under channel namespace
            worker_key = f"ts_proxy:channel:{self.channel_id}:worker:{worker_id}"
-            self.redis_client.setex(worker_key, self.client_ttl, str(len(self.clients)))
+            self._execute_redis_command(
+                lambda: self.redis_client.setex(worker_key, self.client_ttl, str(len(self.clients)))
+            )

            # STANDARDIZED KEY: Activity timestamp under channel namespace
            activity_key = f"ts_proxy:channel:{self.channel_id}:activity"
-            self.redis_client.setex(activity_key, self.client_ttl, str(time.time()))
+            self._execute_redis_command(
+                lambda: self.redis_client.setex(activity_key, self.client_ttl, str(time.time()))
+            )
        except Exception as e:
            logger.error(f"Error notifying owner of client activity: {e}")
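The two setex calls above lean on the key TTL for cleanup: if a worker stops refreshing, its keys simply expire. A standalone sketch of that idea follows; it is illustrative only, the 30-second TTL and the "demo"/"host:1234" values are assumptions, and only the key layout mirrors the diff.

import time
import redis

r = redis.Redis(host="localhost", port=6379, db=0)  # assumed local Redis

def notify_activity(channel_id: str, worker_id: str, client_count: int, ttl: int = 30) -> None:
    # Both keys disappear automatically if this worker dies and stops refreshing them.
    r.setex(f"ts_proxy:channel:{channel_id}:worker:{worker_id}", ttl, str(client_count))
    r.setex(f"ts_proxy:channel:{channel_id}:activity", ttl, str(time.time()))

notify_activity("demo", "host:1234", client_count=2)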
@@ -18,6 +18,8 @@ import json
from typing import Dict, Optional, Set
from apps.proxy.config import TSConfig as Config
from apps.channels.models import Channel
+from core.utils import redis_client as global_redis_client  # Import the global Redis client
+from redis.exceptions import ConnectionError, TimeoutError
from .stream_manager import StreamManager
from .stream_buffer import StreamBuffer
from .client_manager import ClientManager
@@ -43,19 +45,25 @@ class ProxyServer:
        hostname = socket.gethostname()
        self.worker_id = f"{hostname}:{pid}"

-        # Connect to Redis
-        self.redis_client = None
-        try:
-            import redis
-            from django.conf import settings
-            redis_url = getattr(settings, 'REDIS_URL', 'redis://localhost:6379/0')
-            self.redis_client = redis.from_url(redis_url)
-            logger.info(f"Connected to Redis at {redis_url}")
-            logger.info(f"Worker ID: {self.worker_id}")
-        except Exception as e:
-            self.redis_client = None
-            logger.error(f"Failed to connect to Redis: {e}")
+        # Connect to Redis - try using global client first
+        self.redis_client = None
+        self.redis_connection_attempts = 0
+        self.redis_max_retries = 3
+        self.redis_retry_interval = 5  # seconds
+
+        try:
+            # First try to use the global client from core.utils
+            if global_redis_client is not None:
+                self.redis_client = global_redis_client
+                logger.info(f"Using global Redis client")
+                logger.info(f"Worker ID: {self.worker_id}")
+            else:
+                # Fall back to direct connection with retry
+                self._setup_redis_connection()
+        except Exception as e:
+            logger.error(f"Failed to initialize Redis: {e}")
+            self.redis_client = None

        # Start cleanup thread
        self.cleanup_interval = getattr(Config, 'CLEANUP_INTERVAL', 60)
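A reduced sketch of the "use the shared client if it exists, otherwise connect directly" decision made in __init__ above; this is an assumption-laden illustration, not the commit's code, and make_redis_client is a stand-in for the combination of the global client check and _setup_redis_connection.

import redis

def make_redis_client(global_redis_client, redis_url="redis://localhost:6379/0"):
    """Prefer a process-wide shared client; fall back to a direct connection."""
    if global_redis_client is not None:
        return global_redis_client
    try:
        client = redis.from_url(redis_url)
        client.ping()  # validate before handing it out
        return client
    except Exception:
        return None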
@@ -64,179 +72,302 @@ class ProxyServer:
        # Start event listener for Redis pubsub messages
        self._start_event_listener()

+    def _setup_redis_connection(self):
+        """Setup Redis connection with retry logic"""
+        import redis
+        from django.conf import settings
+
+        while self.redis_connection_attempts < self.redis_max_retries:
+            try:
+                logger.info(f"Attempting to connect to Redis ({self.redis_connection_attempts+1}/{self.redis_max_retries})")
+
+                # Get connection parameters from settings or environment
+                redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost'))
+                redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379)))
+                redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0)))
+
+                # Create Redis client with reasonable timeouts
+                self.redis_client = redis.Redis(
+                    host=redis_host,
+                    port=redis_port,
+                    db=redis_db,
+                    socket_timeout=5,
+                    socket_connect_timeout=5,
+                    retry_on_timeout=True,
+                    health_check_interval=30
+                )
+
+                # Test connection
+                self.redis_client.ping()
+                logger.info(f"Successfully connected to Redis at {redis_host}:{redis_port}/{redis_db}")
+                logger.info(f"Worker ID: {self.worker_id}")
+                break
+
+            except (ConnectionError, TimeoutError) as e:
+                self.redis_connection_attempts += 1
+                if self.redis_connection_attempts >= self.redis_max_retries:
+                    logger.error(f"Failed to connect to Redis after {self.redis_max_retries} attempts: {e}")
+                    self.redis_client = None
+                else:
+                    # Exponential backoff with a maximum of 30 seconds
+                    retry_delay = min(self.redis_retry_interval * (2 ** (self.redis_connection_attempts - 1)), 30)
+                    logger.warning(f"Redis connection failed. Retrying in {retry_delay}s... ({self.redis_connection_attempts}/{self.redis_max_retries})")
+                    time.sleep(retry_delay)
+
+            except Exception as e:
+                logger.error(f"Unexpected error connecting to Redis: {e}", exc_info=True)
+                self.redis_client = None
+                break
+
+    def _execute_redis_command(self, command_func, *args, **kwargs):
+        """Execute Redis command with error handling and reconnection logic"""
+        if not self.redis_client:
+            return None
+
+        try:
+            return command_func(*args, **kwargs)
+        except (ConnectionError, TimeoutError) as e:
+            logger.warning(f"Redis connection lost: {e}. Attempting to reconnect...")
+            try:
+                # Try to reconnect
+                self.redis_connection_attempts = 0
+                self._setup_redis_connection()
+                if self.redis_client:
+                    # Retry the command once
+                    return command_func(*args, **kwargs)
+            except Exception as reconnect_error:
+                logger.error(f"Failed to reconnect to Redis: {reconnect_error}")
+            return None
+        except Exception as e:
+            logger.error(f"Redis command error: {e}")
+            return None
+
    def _start_event_listener(self):
        """Listen for events from other workers"""
        if not self.redis_client:
            return

        def event_listener():
-            try:
-                pubsub = self.redis_client.pubsub()
-                pubsub.psubscribe("ts_proxy:events:*")
+            retry_count = 0
+            max_retries = 10
+            base_retry_delay = 1  # Start with 1 second delay
+            max_retry_delay = 30  # Cap at 30 seconds
+
+            while True:
+                try:
+                    # Create a dedicated Redis client for PubSub with longer timeouts
+                    # This avoids affecting the main Redis client operations
+                    from django.conf import settings
+                    import redis
+
+                    redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost'))
+                    redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379)))
+                    redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0)))
+
+                    # Create a dedicated client with generous timeouts for PubSub connections
+                    pubsub_client = redis.Redis(
+                        host=redis_host,
+                        port=redis_port,
+                        db=redis_db,
+                        socket_timeout=60,  # Much longer timeout for PubSub operations
+                        socket_connect_timeout=10,
+                        socket_keepalive=True,  # Enable TCP keepalive
+                        health_check_interval=30
+                    )
+
+                    # Test connection before subscribing
+                    pubsub_client.ping()
+
+                    # Create a new pubsub instance from the dedicated client
+                    pubsub = pubsub_client.pubsub()
+                    pubsub.psubscribe("ts_proxy:events:*")

                    logger.info(f"Started Redis event listener for client activity")

+                    # Reset retry count on successful connection
+                    retry_count = 0
+
                    for message in pubsub.listen():
                        if message["type"] != "pmessage":
                            continue

                        try:
                            channel = message["channel"].decode("utf-8")
                            data = json.loads(message["data"].decode("utf-8"))

                            event_type = data.get("event")
                            channel_id = data.get("channel_id")

                            if channel_id and event_type:
                                # For owner, update client status immediately
                                if self.am_i_owner(channel_id):
                                    if event_type == EventType.CLIENT_CONNECTED:
                                        logger.debug(f"Owner received {EventType.CLIENT_CONNECTED} event for channel {channel_id}")
                                        # Reset any disconnect timer
                                        disconnect_key = RedisKeys.last_client_disconnect(channel_id)
                                        self.redis_client.delete(disconnect_key)

                                    elif event_type == EventType.CLIENT_DISCONNECTED:
                                        logger.debug(f"Owner received {EventType.CLIENT_DISCONNECTED} event for channel {channel_id}")
                                        # Check if any clients remain
                                        if channel_id in self.client_managers:
                                            # VERIFY REDIS CLIENT COUNT DIRECTLY
                                            client_set_key = RedisKeys.clients(channel_id)
                                            total = self.redis_client.scard(client_set_key) or 0

                                            if total == 0:
                                                logger.debug(f"No clients left after disconnect event - stopping channel {channel_id}")
                                                # Set the disconnect timer for other workers to see
                                                disconnect_key = RedisKeys.last_client_disconnect(channel_id)
                                                self.redis_client.setex(disconnect_key, 60, str(time.time()))

                                                # Get configured shutdown delay or default
                                                shutdown_delay = getattr(Config, 'CHANNEL_SHUTDOWN_DELAY', 0)

                                                if shutdown_delay > 0:
                                                    logger.info(f"Waiting {shutdown_delay}s before stopping channel...")
                                                    time.sleep(shutdown_delay)

                                                # Re-check client count before stopping
                                                total = self.redis_client.scard(client_set_key) or 0
                                                if total > 0:
                                                    logger.info(f"New clients connected during shutdown delay - aborting shutdown")
                                                    self.redis_client.delete(disconnect_key)
                                                    return

                                                # Stop the channel directly
                                                self.stop_channel(channel_id)

                                    elif event_type == EventType.STREAM_SWITCH:
                                        logger.info(f"Owner received {EventType.STREAM_SWITCH} request for channel {channel_id}")
                                        # Handle stream switch request
                                        new_url = data.get("url")
                                        user_agent = data.get("user_agent")

                                        if new_url and channel_id in self.stream_managers:
                                            # Update metadata in Redis
                                            if self.redis_client:
                                                metadata_key = RedisKeys.channel_metadata(channel_id)
                                                self.redis_client.hset(metadata_key, "url", new_url)
                                                if user_agent:
                                                    self.redis_client.hset(metadata_key, "user_agent", user_agent)

                                                # Set switch status
                                                status_key = RedisKeys.switch_status(channel_id)
                                                self.redis_client.set(status_key, "switching")

                                            # Perform the stream switch
                                            stream_manager = self.stream_managers[channel_id]
                                            success = stream_manager.update_url(new_url)

                                            if success:
                                                logger.info(f"Stream switch initiated for channel {channel_id}")

                                                # Publish confirmation
                                                switch_result = {
                                                    "event": EventType.STREAM_SWITCHED,  # Use constant instead of string
                                                    "channel_id": channel_id,
                                                    "success": True,
                                                    "url": new_url,
                                                    "timestamp": time.time()
                                                }
                                                self.redis_client.publish(
                                                    f"ts_proxy:events:{channel_id}",
                                                    json.dumps(switch_result)
                                                )

                                                # Update status
                                                if self.redis_client:
                                                    self.redis_client.set(status_key, "switched")
                                            else:
                                                logger.error(f"Failed to switch stream for channel {channel_id}")

                                                # Publish failure
                                                switch_result = {
                                                    "event": EventType.STREAM_SWITCHED,
                                                    "channel_id": channel_id,
                                                    "success": False,
                                                    "url": new_url,
                                                    "timestamp": time.time()
                                                }
                                                self.redis_client.publish(
                                                    f"ts_proxy:events:{channel_id}",
                                                    json.dumps(switch_result)
                                                )

                                elif event_type == EventType.CHANNEL_STOP:
                                    logger.info(f"Received {EventType.CHANNEL_STOP} event for channel {channel_id}")
                                    # First mark channel as stopping in Redis
                                    if self.redis_client:
                                        # Set stopping state in metadata
                                        metadata_key = RedisKeys.channel_metadata(channel_id)
                                        if self.redis_client.exists(metadata_key):
                                            self.redis_client.hset(metadata_key, mapping={
                                                "state": ChannelState.STOPPING,
                                                "state_changed_at": str(time.time())
                                            })

                                    # If we have local resources for this channel, clean them up
                                    if channel_id in self.stream_buffers or channel_id in self.client_managers:
                                        # Use existing stop_channel method
                                        logger.info(f"Stopping local resources for channel {channel_id}")
                                        self.stop_channel(channel_id)

                                    # Acknowledge stop by publishing a response
                                    stop_response = {
                                        "event": EventType.CHANNEL_STOPPED,
                                        "channel_id": channel_id,
                                        "worker_id": self.worker_id,
                                        "timestamp": time.time()
                                    }
                                    self.redis_client.publish(
                                        f"ts_proxy:events:{channel_id}",
                                        json.dumps(stop_response)
                                    )

                                elif event_type == EventType.CLIENT_STOP:
                                    client_id = data.get("client_id")
                                    if client_id and channel_id:
                                        logger.info(f"Received request to stop client {client_id} on channel {channel_id}")

                                        # Both remove from client manager AND set a key for the generator to detect
                                        if channel_id in self.client_managers:
                                            client_manager = self.client_managers[channel_id]
                                            if client_id in client_manager.clients:
                                                client_manager.remove_client(client_id)
                                                logger.info(f"Removed client {client_id} from client manager")

                                        # Set a Redis key for the generator to detect
                                        if self.redis_client:
                                            stop_key = RedisKeys.client_stop(channel_id, client_id)
                                            self.redis_client.setex(stop_key, 30, "true")  # 30 second TTL
                                            logger.info(f"Set stop key for client {client_id}")
                        except Exception as e:
                            logger.error(f"Error processing event message: {e}")

+                except (ConnectionError, TimeoutError) as e:
+                    # Calculate exponential backoff with jitter
+                    retry_count += 1
+                    delay = min(base_retry_delay * (2 ** (retry_count - 1)), max_retry_delay)
+                    # Add some randomness to prevent thundering herd
+                    jitter = random.uniform(0, 0.5 * delay)
+                    final_delay = delay + jitter
+
+                    logger.error(f"Error in event listener: {e}. Retrying in {final_delay:.1f}s (attempt {retry_count})")
+                    time.sleep(final_delay)
+
+                    # Try to clean up the old connection
+                    try:
+                        if 'pubsub' in locals():
+                            pubsub.close()
+                        if 'pubsub_client' in locals():
+                            pubsub_client.close()
+                    except:
+                        pass
+
                except Exception as e:
                    logger.error(f"Error in event listener: {e}")
-                    time.sleep(5)  # Wait before reconnecting
-                    # Try to restart the listener
-                    self._start_event_listener()
+                    # Add a short delay to prevent rapid retries on persistent errors
+                    time.sleep(5)

        thread = threading.Thread(target=event_listener, daemon=True)
        thread.name = "redis-event-listener"
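The reconnect loop added here boils down to: build a dedicated PubSub connection, listen, and on ConnectionError/TimeoutError back off exponentially with jitter before rebuilding. Below is a trimmed, self-contained sketch of that loop under assumed connection parameters; the handler, retry caps, and function name are illustrative, not the values or code above.

import json
import random
import time
import redis
from redis.exceptions import ConnectionError, TimeoutError

def listen_forever(host="localhost", port=6379, db=0, base_delay=1, max_delay=30):
    retry = 0
    while True:
        try:
            client = redis.Redis(host=host, port=port, db=db,
                                 socket_timeout=60, socket_keepalive=True)
            client.ping()
            pubsub = client.pubsub()
            pubsub.psubscribe("ts_proxy:events:*")
            retry = 0  # a successful (re)connect resets the backoff
            for message in pubsub.listen():
                if message["type"] != "pmessage":
                    continue
                data = json.loads(message["data"].decode("utf-8"))
                print("event:", data.get("event"), "channel:", data.get("channel_id"))
        except (ConnectionError, TimeoutError):
            retry += 1
            delay = min(base_delay * (2 ** (retry - 1)), max_delay)
            time.sleep(delay + random.uniform(0, 0.5 * delay))  # jitter avoids thundering herd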
@@ -249,10 +380,9 @@ class ProxyServer:

        try:
            lock_key = RedisKeys.channel_owner(channel_id)
-            owner = self.redis_client.get(lock_key)
-            if owner:
-                return owner.decode('utf-8')
-            return None
+            return self._execute_redis_command(
+                lambda: self.redis_client.get(lock_key).decode('utf-8') if self.redis_client.get(lock_key) else None
+            )
        except Exception as e:
            logger.error(f"Error getting channel owner: {e}")
            return None
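As written, the new lambda issues GET twice for the same key. A variant that fetches once is sketched below; it is illustrative only, not part of the commit, and the _fetch helper and owner_raw name are made up.

def get_channel_owner(self, channel_id):
    lock_key = RedisKeys.channel_owner(channel_id)

    def _fetch():
        owner_raw = self.redis_client.get(lock_key)  # single round trip
        return owner_raw.decode('utf-8') if owner_raw else None

    return self._execute_redis_command(_fetch)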
@@ -271,20 +401,32 @@ class ProxyServer:
        # Create a lock key with proper namespace
        lock_key = RedisKeys.channel_owner(channel_id)

-        # Use Redis SETNX for atomic locking - only succeeds if the key doesn't exist
-        acquired = self.redis_client.setnx(lock_key, self.worker_id)
+        # Use Redis SETNX for atomic locking with error handling
+        acquired = self._execute_redis_command(
+            lambda: self.redis_client.setnx(lock_key, self.worker_id)
+        )
+
+        if acquired is None:  # Redis command failed
+            logger.warning(f"Redis command failed during ownership acquisition - assuming ownership")
+            return True

        # If acquired, set expiry to prevent orphaned locks
        if acquired:
-            self.redis_client.expire(lock_key, ttl)
+            self._execute_redis_command(
+                lambda: self.redis_client.expire(lock_key, ttl)
+            )
            logger.info(f"Worker {self.worker_id} acquired ownership of channel {channel_id}")
            return True

        # If not acquired, check if we already own it (might be a retry)
-        current_owner = self.redis_client.get(lock_key)
+        current_owner = self._execute_redis_command(
+            lambda: self.redis_client.get(lock_key)
+        )
        if current_owner and current_owner.decode('utf-8') == self.worker_id:
            # Refresh TTL
-            self.redis_client.expire(lock_key, ttl)
+            self._execute_redis_command(
+                lambda: self.redis_client.expire(lock_key, ttl)
+            )
            logger.info(f"Worker {self.worker_id} refreshed ownership of channel {channel_id}")
            return True
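The ownership lock above is a SETNX followed by a separate EXPIRE; redis-py can also do both in one atomic call with set(..., nx=True, ex=ttl), which avoids a lock that never receives a TTL if the process dies between the two commands. A hedged sketch of that variant follows; the key layout, TTL, and function name are assumptions, not the project's RedisKeys scheme.

import redis

r = redis.Redis(host="localhost", port=6379, db=0)  # assumed local Redis

def try_acquire(channel_id: str, worker_id: str, ttl: int = 30) -> bool:
    key = f"ts_proxy:channel:{channel_id}:owner"  # assumed key format
    # nx=True: only set if the key does not exist; ex=ttl: expiry set atomically with the value.
    if r.set(key, worker_id, nx=True, ex=ttl):
        return True
    current = r.get(key)
    if current and current.decode("utf-8") == worker_id:
        r.expire(key, ttl)  # already ours; refresh the TTL
        return True
    return False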
@@ -689,7 +831,7 @@ class ProxyServer:

            return True
        except Exception as e:
-            logger.error(f"Error stopping channel {channel_id}: {e}", exc_info=True)
+            logger.error(f"Error stopping channel {channel_id}: {e}")
            return False

    def check_inactive_channels(self):
@@ -723,7 +865,9 @@ class ProxyServer:
                # Send worker heartbeat first
                if self.redis_client:
                    worker_heartbeat_key = f"ts_proxy:worker:{self.worker_id}:heartbeat"
-                    self.redis_client.setex(worker_heartbeat_key, 30, str(time.time()))
+                    self._execute_redis_command(
+                        lambda: self.redis_client.setex(worker_heartbeat_key, 30, str(time.time()))
+                    )

                # Refresh channel registry
                self.refresh_channel_registry()
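The heartbeat wrapped above is just a setex with a 30-second TTL, so another process can infer liveness from key existence. A small sketch of both sides follows; the key layout mirrors the diff, but the worker_alive check and the example worker id are illustrations, not code from the commit.

import time
import redis

r = redis.Redis(host="localhost", port=6379, db=0)  # assumed local Redis

def send_heartbeat(worker_id: str) -> None:
    r.setex(f"ts_proxy:worker:{worker_id}:heartbeat", 30, str(time.time()))

def worker_alive(worker_id: str) -> bool:
    # The key vanishes 30 seconds after the last heartbeat, so existence == liveness.
    return r.exists(f"ts_proxy:worker:{worker_id}:heartbeat") == 1

send_heartbeat("host:1234")
print(worker_alive("host:1234"))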
core/utils.py (132 changed lines)
@@ -1,28 +1,118 @@
import redis
import logging
+import time
+import os
from django.conf import settings
+from redis.exceptions import ConnectionError, TimeoutError

logger = logging.getLogger(__name__)

-def get_redis_client():
-    """Get Redis client with connection validation"""
-    try:
-        # Create Redis client
-        client = redis.Redis(
-            host=settings.REDIS_HOST,
-            port=getattr(settings, 'REDIS_PORT', 6379),
-            db=settings.REDIS_DB,
-            socket_timeout=5,
-            socket_connect_timeout=5
-        )
-
-        # Validate connection with ping
-        client.ping()
-        logger.info(f"Connected to Redis at {settings.REDIS_HOST}:6379/{settings.REDIS_DB}")
-        return client
-    except Exception as e:
-        logger.error(f"Failed to connect to Redis: {e}")
-        return None
-
-# Initialize the global client
-redis_client = get_redis_client()
+def get_redis_client(max_retries=5, retry_interval=1):
+    """Get Redis client with connection validation and retry logic"""
+    retry_count = 0
+    while retry_count < max_retries:
+        try:
+            # Get connection parameters from settings or environment
+            redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost'))
+            redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379)))
+            redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0)))
+
+            # Create Redis client with better defaults
+            client = redis.Redis(
+                host=redis_host,
+                port=redis_port,
+                db=redis_db,
+                socket_timeout=5,
+                socket_connect_timeout=5,
+                retry_on_timeout=True,
+                health_check_interval=30
+            )
+
+            # Validate connection with ping
+            client.ping()
+            logger.info(f"Connected to Redis at {redis_host}:{redis_port}/{redis_db}")
+            return client
+
+        except (ConnectionError, TimeoutError) as e:
+            retry_count += 1
+            if retry_count >= max_retries:
+                logger.error(f"Failed to connect to Redis after {max_retries} attempts: {e}")
+                return None
+            else:
+                # Use exponential backoff for retries
+                wait_time = retry_interval * (2 ** (retry_count - 1))
+                logger.warning(f"Redis connection failed. Retrying in {wait_time}s... ({retry_count}/{max_retries})")
+                time.sleep(wait_time)
+
+        except Exception as e:
+            logger.error(f"Unexpected error connecting to Redis: {e}")
+            return None
+
+def get_redis_pubsub_client(max_retries=5, retry_interval=3):
+    """Get Redis client optimized for PubSub operations with longer timeouts"""
+    retry_count = 0
+    while retry_count < max_retries:
+        try:
+            # Get connection parameters from settings or environment
+            redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost'))
+            redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379)))
+            redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0)))
+
+            # Create Redis client with PubSub-optimized settings
+            client = redis.Redis(
+                host=redis_host,
+                port=redis_port,
+                db=redis_db,
+                socket_timeout=60,  # Longer timeout for blocking operations
+                socket_connect_timeout=5,
+                socket_keepalive=True,  # Enable TCP keepalive
+                health_check_interval=30,
+                retry_on_timeout=True
+            )
+
+            # Validate connection with ping
+            client.ping()
+            logger.info(f"Connected to Redis for PubSub at {redis_host}:{redis_port}/{redis_db}")
+            return client
+
+        except (ConnectionError, TimeoutError) as e:
+            retry_count += 1
+            if retry_count >= max_retries:
+                logger.error(f"Failed to connect to Redis for PubSub after {max_retries} attempts: {e}")
+                return None
+            else:
+                # Use exponential backoff for retries
+                wait_time = retry_interval * (2 ** (retry_count - 1))
+                logger.warning(f"Redis PubSub connection failed. Retrying in {wait_time}s... ({retry_count}/{max_retries})")
+                time.sleep(wait_time)
+
+        except Exception as e:
+            logger.error(f"Unexpected error connecting to Redis for PubSub: {e}")
+            return None
+
+def execute_redis_command(redis_client, command_func, default_return=None):
+    """
+    Execute a Redis command with proper error handling
+
+    Args:
+        redis_client: The Redis client instance
+        command_func: Lambda function containing the Redis command to execute
+        default_return: Value to return if command fails
+
+    Returns:
+        Command result or default_return on failure
+    """
+    if redis_client is None:
+        return default_return
+
+    try:
+        return command_func()
+    except (ConnectionError, TimeoutError) as e:
+        logger.warning(f"Redis connection error: {e}")
+        return default_return
+    except Exception as e:
+        logger.error(f"Redis command error: {e}")
+        return default_return
+
+# Initialize the global client with retry logic
+redis_client = get_redis_client(max_retries=10, retry_interval=1)
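An illustrative use of the shared helpers above, assuming the module path core.utils from this commit; the key name and the zero default are made up for the example.

from core.utils import redis_client, execute_redis_command

# Returns 0 instead of raising (or returning None) when Redis is unreachable.
active = execute_redis_command(
    redis_client,
    lambda: redis_client.scard("ts_proxy:channel:example:clients"),
    default_return=0,
)
print(f"active clients: {active}")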
@@ -85,10 +85,6 @@ else
    pids+=("$nginx_pid")
fi

-cd /app
-python manage.py migrate --noinput
-python manage.py collectstatic --noinput
-
uwsgi_file="/app/docker/uwsgi.ini"
if [ "$DISPATCHARR_ENV" = "dev" ]; then
    uwsgi_file="/app/docker/uwsgi.dev.ini"
@@ -100,6 +96,12 @@ uwsgi_pid=$(pgrep uwsgi | sort | head -n1)
echo "✅ uwsgi started with PID $uwsgi_pid"
pids+=("$uwsgi_pid")

+cd /app
+python manage.py migrate --noinput
+python manage.py collectstatic --noinput
+
# Wait for at least one process to exit and log the process that exited first
if [ ${#pids[@]} -gt 0 ]; then
    echo "⏳ Waiting for processes to exit..."
@@ -2,9 +2,14 @@
; exec-before = python manage.py collectstatic --noinput
; exec-before = python manage.py migrate --noinput

+; First run Redis availability check script once
+exec-pre = python /app/scripts/wait_for_redis.py
+
+; Start Redis first
+attach-daemon = redis-server
+; Then start other services
attach-daemon = celery -A dispatcharr worker -l info
attach-daemon = celery -A dispatcharr beat -l info
-attach-daemon = redis-server
attach-daemon = daphne -b 0.0.0.0 -p 8001 dispatcharr.asgi:application
attach-daemon = cd /app/frontend && npm run dev
@@ -2,9 +2,14 @@
; exec-before = python manage.py collectstatic --noinput
; exec-before = python manage.py migrate --noinput

+; First run Redis availability check script once
+exec-pre = python /app/scripts/wait_for_redis.py
+
+; Start Redis first
+attach-daemon = redis-server
+; Then start other services
attach-daemon = celery -A dispatcharr worker -l error
attach-daemon = celery -A dispatcharr beat -l error
-attach-daemon = redis-server
attach-daemon = daphne -b 0.0.0.0 -p 8001 dispatcharr.asgi:application

# Core settings
scripts/wait_for_redis.py (new file, 60 lines)
@@ -0,0 +1,60 @@
#!/usr/bin/env python
"""
Helper script to wait for Redis to be available before starting the application.
"""

import redis
import time
import os
import sys
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def wait_for_redis(host='localhost', port=6379, db=0, max_retries=30, retry_interval=2):
    """Wait for Redis to become available"""
    redis_client = None
    retry_count = 0

    logger.info(f"Waiting for Redis at {host}:{port}/{db}...")

    while retry_count < max_retries:
        try:
            redis_client = redis.Redis(
                host=host,
                port=port,
                db=db,
                socket_timeout=2,
                socket_connect_timeout=2
            )
            redis_client.ping()
            logger.info(f"✅ Redis at {host}:{port}/{db} is now available!")
            return True
        except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
            retry_count += 1
            if retry_count >= max_retries:
                logger.error(f"❌ Failed to connect to Redis after {max_retries} attempts: {e}")
                return False

            logger.info(f"⏳ Redis not available yet, retrying in {retry_interval}s... ({retry_count}/{max_retries})")
            time.sleep(retry_interval)
        except Exception as e:
            logger.error(f"❌ Unexpected error connecting to Redis: {e}")
            return False

    return False

if __name__ == "__main__":
    host = os.environ.get('REDIS_HOST', 'localhost')
    port = int(os.environ.get('REDIS_PORT', 6379))
    db = int(os.environ.get('REDIS_DB', 0))
    max_retries = int(os.environ.get('REDIS_WAIT_RETRIES', 30))
    retry_interval = int(os.environ.get('REDIS_WAIT_INTERVAL', 2))

    logger.info(f"Starting Redis availability check at {host}:{port}/{db}")

    if wait_for_redis(host, port, db, max_retries, retry_interval):
        sys.exit(0)
    else:
        sys.exit(1)
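The script exits 0 once Redis answers a PING and 1 otherwise, which is what the uwsgi exec-pre line relies on. A hedged sketch of driving it the same way from Python; the path and environment values below are assumptions from this commit, not a documented interface.

import os
import subprocess
import sys

env = dict(os.environ, REDIS_HOST="localhost", REDIS_PORT="6379", REDIS_WAIT_RETRIES="10")
result = subprocess.run([sys.executable, "/app/scripts/wait_for_redis.py"], env=env)
if result.returncode != 0:
    sys.exit("Redis never became available; refusing to start")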