Mirror of https://github.com/Dispatcharr/Dispatcharr.git (synced 2026-01-23 02:35:14 +00:00)

Commit cb013c0a0d: merged in singular-redis branch
18 changed files with 945 additions and 302 deletions
@@ -4,8 +4,10 @@ import re
from . import proxy_server
from .redis_keys import RedisKeys
from .constants import TS_PACKET_SIZE
from redis.exceptions import ConnectionError, TimeoutError
from .utils import get_logger

logger = get_logger()

class ChannelStatus:
@@ -172,76 +174,98 @@ class ChannelStatus:
        return info

    @staticmethod
    def _execute_redis_command(command_func):
        """Execute Redis command with error handling"""
        if not proxy_server.redis_client:
            return None

        try:
            return command_func()
        except (ConnectionError, TimeoutError) as e:
            logger.warning(f"Redis connection error in ChannelStatus: {e}")
            return None
        except Exception as e:
            logger.error(f"Redis command error in ChannelStatus: {e}")
            return None

    @staticmethod
    def get_basic_channel_info(channel_id):
        """Get basic channel information with Redis error handling"""
        try:
            # Use _execute_redis_command for Redis operations
            metadata_key = RedisKeys.channel_metadata(channel_id)
            metadata = ChannelStatus._execute_redis_command(
                lambda: proxy_server.redis_client.hgetall(metadata_key)
            )

            if not metadata:
                return None

            # Basic channel info only - omit diagnostics and details
            buffer_index_key = RedisKeys.buffer_index(channel_id)
            buffer_index_value = proxy_server.redis_client.get(buffer_index_key)

            # Count clients (using efficient count method)
            client_set_key = RedisKeys.clients(channel_id)
            client_count = proxy_server.redis_client.scard(client_set_key) or 0

            # Calculate uptime
            created_at = float(metadata.get(b'init_time', b'0').decode('utf-8'))
            uptime = time.time() - created_at if created_at > 0 else 0

            # Simplified info
            info = {
                'channel_id': channel_id,
                'state': metadata.get(b'state', b'unknown').decode('utf-8'),
                'url': metadata.get(b'url', b'').decode('utf-8'),
                'profile': metadata.get(b'profile', b'unknown').decode('utf-8'),
                'owner': metadata.get(b'owner', b'unknown').decode('utf-8'),
                'buffer_index': int(buffer_index_value.decode('utf-8')) if buffer_index_value else 0,
                'client_count': client_count,
                'uptime': uptime
            }

            # Quick health check if available locally
            if channel_id in proxy_server.stream_managers:
                manager = proxy_server.stream_managers[channel_id]
                info['healthy'] = manager.healthy

            # Get concise client information
            clients = []
            client_ids = proxy_server.redis_client.smembers(client_set_key)

            # Process only if we have clients and keep it limited
            if client_ids:
                # Get up to 10 clients for the basic view
                for client_id in list(client_ids)[:10]:
                    client_id_str = client_id.decode('utf-8')
                    client_key = f"ts_proxy:channel:{channel_id}:clients:{client_id_str}"

                    # Efficient way - just retrieve the essentials
                    client_info = {
                        'client_id': client_id_str,
                        'user_agent': proxy_server.redis_client.hget(client_key, 'user_agent'),
                        'ip_address': proxy_server.redis_client.hget(client_key, 'ip_address').decode('utf-8'),
                    }

                    if client_info['user_agent']:
                        client_info['user_agent'] = client_info['user_agent'].decode('utf-8')
                    else:
                        client_info['user_agent'] = 'unknown'

                    # Just get connected_at for client age
                    connected_at_bytes = proxy_server.redis_client.hget(client_key, 'connected_at')
                    if connected_at_bytes:
                        connected_at = float(connected_at_bytes.decode('utf-8'))
                        client_info['connected_since'] = time.time() - connected_at

                    clients.append(client_info)

            # Add clients to info
            info['clients'] = clients

            return info
        except Exception as e:
            logger.error(f"Error getting channel info: {e}")
            return None
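Usage sketch (not part of the diff): a caller of the wrapper above only has to handle a None result, whether the channel is unknown or Redis is unreachable; the function name and return shape here are invented for illustration.

def channel_summary(channel_id):
    # Hypothetical caller: _execute_redis_command has already logged any Redis failure.
    info = ChannelStatus.get_basic_channel_info(channel_id)
    if info is None:
        return {'channel_id': channel_id, 'available': False}
    return {'channel_id': channel_id, 'available': True, 'clients': info['client_count']}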
@@ -6,11 +6,13 @@ import time
import json
from typing import Set, Optional
from apps.proxy.config import TSConfig as Config
from redis.exceptions import ConnectionError, TimeoutError
from .constants import EventType
from .config_helper import ConfigHelper
from .redis_keys import RedisKeys
from .utils import get_logger

logger = get_logger()

class ClientManager:
    """Manages client connections with no duplicates"""
@@ -120,6 +122,20 @@ ClientManager:
        thread.start()
        logger.debug(f"Started client heartbeat thread for channel {self.channel_id} (interval: {self.heartbeat_interval}s)")

    def _execute_redis_command(self, command_func):
        """Execute Redis command with error handling"""
        if not self.redis_client:
            return None

        try:
            return command_func()
        except (ConnectionError, TimeoutError) as e:
            logger.warning(f"Redis connection error in ClientManager: {e}")
            return None
        except Exception as e:
            logger.error(f"Redis command error in ClientManager: {e}")
            return None

    def _notify_owner_of_activity(self):
        """Notify channel owner that clients are active on this worker"""
        if not self.redis_client or not self.clients:
@@ -130,11 +146,15 @@ ClientManager:
            # STANDARDIZED KEY: Worker info under channel namespace
            worker_key = f"ts_proxy:channel:{self.channel_id}:worker:{worker_id}"
            self._execute_redis_command(
                lambda: self.redis_client.setex(worker_key, self.client_ttl, str(len(self.clients)))
            )

            # STANDARDIZED KEY: Activity timestamp under channel namespace
            activity_key = f"ts_proxy:channel:{self.channel_id}:activity"
            self._execute_redis_command(
                lambda: self.redis_client.setex(activity_key, self.client_ttl, str(time.time()))
            )
        except Exception as e:
            logger.error(f"Error notifying owner of client activity: {e}")
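For orientation, a sketch (not from the diff) of how the owner side could read back the two worker keys written above; the reader function is hypothetical, only the key shapes come from the code.

def read_worker_activity(redis_client, channel_id, worker_id):
    # ts_proxy:channel:{channel_id}:worker:{worker_id} -> client count reported by that worker
    # ts_proxy:channel:{channel_id}:activity           -> timestamp of the last client activity
    count = redis_client.get(f"ts_proxy:channel:{channel_id}:worker:{worker_id}")
    last_activity = redis_client.get(f"ts_proxy:channel:{channel_id}:activity")
    return count, last_activity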
@@ -2,11 +2,8 @@
Helper module to access configuration values with proper defaults.
"""

import logging
from apps.proxy.config import TSConfig as Config

logger = logging.getLogger("ts_proxy")

class ConfigHelper:
    """
    Helper class for accessing configuration values with sensible defaults.
@@ -83,3 +83,8 @@ RedisKeys:
    def transcode_active(channel_id):
        """Key indicating active transcode process"""
        return f"ts_proxy:channel:{channel_id}:transcode_active"

    @staticmethod
    def client_metadata(channel_id, client_id):
        """Key for client metadata hash"""
        return f"ts_proxy:channel:{channel_id}:clients:{client_id}"
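A quick illustration of the new key helper, with made-up IDs; this is the same hash key that StreamGenerator writes client stats to and ChannelStatus reads back.

# Hypothetical IDs, real key shape:
key = RedisKeys.client_metadata("42", "abc123")
assert key == "ts_proxy:channel:42:clients:abc123"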
@@ -18,14 +18,17 @@ import json
from typing import Dict, Optional, Set
from apps.proxy.config import TSConfig as Config
from apps.channels.models import Channel
from core.utils import redis_client as global_redis_client, redis_pubsub_client as global_redis_pubsub_client  # Import both global Redis clients
from redis.exceptions import ConnectionError, TimeoutError
from .stream_manager import StreamManager
from .stream_buffer import StreamBuffer
from .client_manager import ClientManager
from .redis_keys import RedisKeys
from .constants import ChannelState, EventType, StreamType
from .config_helper import ConfigHelper
from .utils import get_logger

logger = get_logger()

class ProxyServer:
    """Manages TS proxy server instance with worker coordination"""
@@ -43,19 +46,25 @@ ProxyServer:
        hostname = socket.gethostname()
        self.worker_id = f"{hostname}:{pid}"

        # Connect to Redis - try using global client first
        self.redis_client = None
        self.redis_connection_attempts = 0
        self.redis_max_retries = 3
        self.redis_retry_interval = 5  # seconds

        try:
            # First try to use the global client from core.utils
            if global_redis_client is not None:
                self.redis_client = global_redis_client
                logger.info(f"Using global Redis client")
                logger.info(f"Worker ID: {self.worker_id}")
            else:
                # Fall back to direct connection with retry
                self._setup_redis_connection()
        except Exception as e:
            logger.error(f"Failed to initialize Redis: {e}")
            self.redis_client = None

        # Start cleanup thread
        self.cleanup_interval = getattr(Config, 'CLEANUP_INTERVAL', 60)
@@ -64,179 +73,306 @@ ProxyServer:
        # Start event listener for Redis pubsub messages
        self._start_event_listener()

    def _setup_redis_connection(self):
        """Setup Redis connection with retry logic"""
        import redis
        from django.conf import settings

        while self.redis_connection_attempts < self.redis_max_retries:
            try:
                logger.info(f"Attempting to connect to Redis ({self.redis_connection_attempts+1}/{self.redis_max_retries})")

                # Get connection parameters from settings or environment
                redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost'))
                redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379)))
                redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0)))

                # Create Redis client with reasonable timeouts
                self.redis_client = redis.Redis(
                    host=redis_host,
                    port=redis_port,
                    db=redis_db,
                    socket_timeout=5,
                    socket_connect_timeout=5,
                    retry_on_timeout=True,
                    health_check_interval=30
                )

                # Test connection
                self.redis_client.ping()
                logger.info(f"Successfully connected to Redis at {redis_host}:{redis_port}/{redis_db}")
                logger.info(f"Worker ID: {self.worker_id}")
                break

            except (ConnectionError, TimeoutError) as e:
                self.redis_connection_attempts += 1
                if self.redis_connection_attempts >= self.redis_max_retries:
                    logger.error(f"Failed to connect to Redis after {self.redis_max_retries} attempts: {e}")
                    self.redis_client = None
                else:
                    # Exponential backoff with a maximum of 30 seconds
                    retry_delay = min(self.redis_retry_interval * (2 ** (self.redis_connection_attempts - 1)), 30)
                    logger.warning(f"Redis connection failed. Retrying in {retry_delay}s... ({self.redis_connection_attempts}/{self.redis_max_retries})")
                    time.sleep(retry_delay)

            except Exception as e:
                logger.error(f"Unexpected error connecting to Redis: {e}", exc_info=True)
                self.redis_client = None
                break

    def _execute_redis_command(self, command_func, *args, **kwargs):
        """Execute Redis command with error handling and reconnection logic"""
        if not self.redis_client:
            return None

        try:
            return command_func(*args, **kwargs)
        except (ConnectionError, TimeoutError) as e:
            logger.warning(f"Redis connection lost: {e}. Attempting to reconnect...")
            try:
                # Try to reconnect
                self.redis_connection_attempts = 0
                self._setup_redis_connection()
                if self.redis_client:
                    # Retry the command once
                    return command_func(*args, **kwargs)
            except Exception as reconnect_error:
                logger.error(f"Failed to reconnect to Redis: {reconnect_error}")
            return None
        except Exception as e:
            logger.error(f"Redis command error: {e}")
            return None

    def _start_event_listener(self):
        """Listen for events from other workers"""
        if not self.redis_client:
            return

        def event_listener():
            retry_count = 0
            max_retries = 10
            base_retry_delay = 1  # Start with 1 second delay
            max_retry_delay = 30  # Cap at 30 seconds

            while True:
                try:
                    # Use the global PubSub client if available
                    if global_redis_pubsub_client:
                        pubsub_client = global_redis_pubsub_client
                        logger.info("Using global Redis PubSub client for event listener")
                    else:
                        # Fall back to creating a dedicated client if global one is unavailable
                        from django.conf import settings
                        import redis

                        redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost'))
                        redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379)))
                        redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0)))

                        pubsub_client = redis.Redis(
                            host=redis_host,
                            port=redis_port,
                            db=redis_db,
                            socket_timeout=60,
                            socket_connect_timeout=10,
                            socket_keepalive=True,
                            health_check_interval=30
                        )
                        logger.info("Created dedicated Redis PubSub client for event listener")

                    # Test connection before subscribing
                    pubsub_client.ping()

                    # Create a pubsub instance from the client
                    pubsub = pubsub_client.pubsub()
                    pubsub.psubscribe("ts_proxy:events:*")

                    logger.info(f"Started Redis event listener for client activity")

                    # Reset retry count on successful connection
                    retry_count = 0

                    for message in pubsub.listen():
                        if message["type"] != "pmessage":
                            continue

                        try:
                            channel = message["channel"].decode("utf-8")
                            data = json.loads(message["data"].decode("utf-8"))

                            event_type = data.get("event")
                            channel_id = data.get("channel_id")

                            if channel_id and event_type:
                                # For owner, update client status immediately
                                if self.am_i_owner(channel_id):
                                    if event_type == EventType.CLIENT_CONNECTED:
                                        logger.debug(f"Owner received {EventType.CLIENT_CONNECTED} event for channel {channel_id}")
                                        # Reset any disconnect timer
                                        disconnect_key = RedisKeys.last_client_disconnect(channel_id)
                                        self.redis_client.delete(disconnect_key)

                                    elif event_type == EventType.CLIENT_DISCONNECTED:
                                        logger.debug(f"Owner received {EventType.CLIENT_DISCONNECTED} event for channel {channel_id}")
                                        # Check if any clients remain
                                        if channel_id in self.client_managers:
                                            # VERIFY REDIS CLIENT COUNT DIRECTLY
                                            client_set_key = RedisKeys.clients(channel_id)
                                            total = self.redis_client.scard(client_set_key) or 0

                                            if total == 0:
                                                logger.debug(f"No clients left after disconnect event - stopping channel {channel_id}")
                                                # Set the disconnect timer for other workers to see
                                                disconnect_key = RedisKeys.last_client_disconnect(channel_id)
                                                self.redis_client.setex(disconnect_key, 60, str(time.time()))

                                                # Get configured shutdown delay or default
                                                shutdown_delay = getattr(Config, 'CHANNEL_SHUTDOWN_DELAY', 0)

                                                if shutdown_delay > 0:
                                                    logger.info(f"Waiting {shutdown_delay}s before stopping channel...")
                                                    time.sleep(shutdown_delay)

                                                    # Re-check client count before stopping
                                                    total = self.redis_client.scard(client_set_key) or 0
                                                    if total > 0:
                                                        logger.info(f"New clients connected during shutdown delay - aborting shutdown")
                                                        self.redis_client.delete(disconnect_key)
                                                        return

                                                # Stop the channel directly
                                                self.stop_channel(channel_id)

                                    elif event_type == EventType.STREAM_SWITCH:
                                        logger.info(f"Owner received {EventType.STREAM_SWITCH} request for channel {channel_id}")
                                        # Handle stream switch request
                                        new_url = data.get("url")
                                        user_agent = data.get("user_agent")

                                        if new_url and channel_id in self.stream_managers:
                                            # Update metadata in Redis
                                            if self.redis_client:
                                                metadata_key = RedisKeys.channel_metadata(channel_id)
                                                self.redis_client.hset(metadata_key, "url", new_url)
                                                if user_agent:
                                                    self.redis_client.hset(metadata_key, "user_agent", user_agent)

                                                # Set switch status
                                                status_key = RedisKeys.switch_status(channel_id)
                                                self.redis_client.set(status_key, "switching")

                                            # Perform the stream switch
                                            stream_manager = self.stream_managers[channel_id]
                                            success = stream_manager.update_url(new_url)

                                            if success:
                                                logger.info(f"Stream switch initiated for channel {channel_id}")

                                                # Publish confirmation
                                                switch_result = {
                                                    "event": EventType.STREAM_SWITCHED,  # Use constant instead of string
                                                    "channel_id": channel_id,
                                                    "success": True,
                                                    "url": new_url,
                                                    "timestamp": time.time()
                                                }
                                                self.redis_client.publish(
                                                    f"ts_proxy:events:{channel_id}",
                                                    json.dumps(switch_result)
                                                )

                                                # Update status
                                                if self.redis_client:
                                                    self.redis_client.set(status_key, "switched")
                                            else:
                                                logger.error(f"Failed to switch stream for channel {channel_id}")

                                                # Publish failure
                                                switch_result = {
                                                    "event": EventType.STREAM_SWITCHED,
                                                    "channel_id": channel_id,
                                                    "success": False,
                                                    "url": new_url,
                                                    "timestamp": time.time()
                                                }
                                                self.redis_client.publish(
                                                    f"ts_proxy:events:{channel_id}",
                                                    json.dumps(switch_result)
                                                )

                                elif event_type == EventType.CHANNEL_STOP:
                                    logger.info(f"Received {EventType.CHANNEL_STOP} event for channel {channel_id}")
                                    # First mark channel as stopping in Redis
                                    if self.redis_client:
                                        # Set stopping state in metadata
                                        metadata_key = RedisKeys.channel_metadata(channel_id)
                                        if self.redis_client.exists(metadata_key):
                                            self.redis_client.hset(metadata_key, mapping={
                                                "state": ChannelState.STOPPING,
                                                "state_changed_at": str(time.time())
                                            })

                                    # If we have local resources for this channel, clean them up
                                    if channel_id in self.stream_buffers or channel_id in self.client_managers:
                                        # Use existing stop_channel method
                                        logger.info(f"Stopping local resources for channel {channel_id}")
                                        self.stop_channel(channel_id)

                                    # Acknowledge stop by publishing a response
                                    stop_response = {
                                        "event": EventType.CHANNEL_STOPPED,
                                        "channel_id": channel_id,
                                        "worker_id": self.worker_id,
                                        "timestamp": time.time()
                                    }
                                    self.redis_client.publish(
                                        f"ts_proxy:events:{channel_id}",
                                        json.dumps(stop_response)
                                    )

                                elif event_type == EventType.CLIENT_STOP:
                                    client_id = data.get("client_id")
                                    if client_id and channel_id:
                                        logger.info(f"Received request to stop client {client_id} on channel {channel_id}")

                                        # Both remove from client manager AND set a key for the generator to detect
                                        if channel_id in self.client_managers:
                                            client_manager = self.client_managers[channel_id]
                                            if client_id in client_manager.clients:
                                                client_manager.remove_client(client_id)
                                                logger.info(f"Removed client {client_id} from client manager")

                                        # Set a Redis key for the generator to detect
                                        if self.redis_client:
                                            stop_key = RedisKeys.client_stop(channel_id, client_id)
                                            self.redis_client.setex(stop_key, 30, "true")  # 30 second TTL
                                            logger.info(f"Set stop key for client {client_id}")
                        except Exception as e:
                            logger.error(f"Error processing event message: {e}")

                except (ConnectionError, TimeoutError) as e:
                    # Calculate exponential backoff with jitter
                    retry_count += 1
                    delay = min(base_retry_delay * (2 ** (retry_count - 1)), max_retry_delay)
                    # Add some randomness to prevent thundering herd
                    jitter = random.uniform(0, 0.5 * delay)
                    final_delay = delay + jitter

                    logger.error(f"Error in event listener: {e}. Retrying in {final_delay:.1f}s (attempt {retry_count})")
                    time.sleep(final_delay)

                    # Try to clean up the old connection
                    try:
                        if 'pubsub' in locals():
                            pubsub.close()
                        if 'pubsub_client' in locals():
                            pubsub_client.close()
                    except:
                        pass

                except Exception as e:
                    logger.error(f"Error in event listener: {e}")
                    # Add a short delay to prevent rapid retries on persistent errors
                    time.sleep(5)

        thread = threading.Thread(target=event_listener, daemon=True)
        thread.name = "redis-event-listener"
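A minimal sketch of the producing side of this listener, assuming the EventType constants are plain strings (as the JSON round-trip above implies); the helper name and IDs are illustrative only.

import json, time
from .constants import EventType

def publish_channel_stop(redis_client, channel_id):
    # Hypothetical helper: emits the event shape the CHANNEL_STOP branch above expects.
    event = {
        "event": EventType.CHANNEL_STOP,
        "channel_id": channel_id,
        "timestamp": time.time(),
    }
    redis_client.publish(f"ts_proxy:events:{channel_id}", json.dumps(event))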
@@ -249,10 +385,9 @@ ProxyServer:
        try:
            lock_key = RedisKeys.channel_owner(channel_id)
            return self._execute_redis_command(
                lambda: self.redis_client.get(lock_key).decode('utf-8') if self.redis_client.get(lock_key) else None
            )
        except Exception as e:
            logger.error(f"Error getting channel owner: {e}")
            return None
@@ -271,20 +406,32 @@ ProxyServer:
            # Create a lock key with proper namespace
            lock_key = RedisKeys.channel_owner(channel_id)

            # Use Redis SETNX for atomic locking with error handling
            acquired = self._execute_redis_command(
                lambda: self.redis_client.setnx(lock_key, self.worker_id)
            )

            if acquired is None:  # Redis command failed
                logger.warning(f"Redis command failed during ownership acquisition - assuming ownership")
                return True

            # If acquired, set expiry to prevent orphaned locks
            if acquired:
                self._execute_redis_command(
                    lambda: self.redis_client.expire(lock_key, ttl)
                )
                logger.info(f"Worker {self.worker_id} acquired ownership of channel {channel_id}")
                return True

            # If not acquired, check if we already own it (might be a retry)
            current_owner = self._execute_redis_command(
                lambda: self.redis_client.get(lock_key)
            )
            if current_owner and current_owner.decode('utf-8') == self.worker_id:
                # Refresh TTL
                self._execute_redis_command(
                    lambda: self.redis_client.expire(lock_key, ttl)
                )
                logger.info(f"Worker {self.worker_id} refreshed ownership of channel {channel_id}")
                return True
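The same locking idiom in isolation, as a sketch of why the two Redis calls are separate: SETNX gives atomic acquisition, and the follow-up TTL keeps a crashed owner from holding the lock forever. The function name is illustrative, not part of the diff.

def try_acquire(redis_client, lock_key, owner_id, ttl=30):
    if redis_client.setnx(lock_key, owner_id):   # only one worker wins the race
        redis_client.expire(lock_key, ttl)       # orphaned locks expire on their own
        return True
    current = redis_client.get(lock_key)
    # Re-entrant for the current owner (e.g. a retry refreshing its claim)
    return bool(current) and current.decode("utf-8") == owner_id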
@@ -689,7 +836,7 @@ ProxyServer:
            return True
        except Exception as e:
            logger.error(f"Error stopping channel {channel_id}: {e}")
            return False

    def check_inactive_channels(self):
@@ -722,14 +869,10 @@ ProxyServer:
            try:
                # Send worker heartbeat first
                if self.redis_client:
                    worker_heartbeat_key = f"ts_proxy:worker:{self.worker_id}:heartbeat"
                    self._execute_redis_command(
                        lambda: self.redis_client.setex(worker_heartbeat_key, 30, str(time.time()))
                    )

                # Refresh channel registry
                self.refresh_channel_registry()
@@ -10,8 +10,9 @@ from apps.proxy.config import TSConfig as Config
from .redis_keys import RedisKeys
from .config_helper import ConfigHelper
from .constants import TS_PACKET_SIZE
from .utils import get_logger

logger = get_logger()

class StreamBuffer:
    """Manages stream data buffering with optimized chunk storage"""
@@ -8,10 +8,11 @@ import logging
import threading
from apps.proxy.config import TSConfig as Config
from . import proxy_server
from .utils import create_ts_packet, get_logger
from .redis_keys import RedisKeys

logger = get_logger()

class StreamGenerator:
    """
@@ -162,11 +163,6 @@ StreamGenerator:
    def _stream_data_generator(self):
        """Generate stream data chunks based on buffer contents."""

        # Main streaming loop
        while True:
            # Check if resources still exist
@@ -174,12 +170,11 @@ StreamGenerator:
                break

            # Get chunks at client's position using improved strategy
            chunks, next_index = self.buffer.get_optimized_client_data(self.local_index)

            if chunks:
                yield from self._process_chunks(chunks, next_index)
                self.local_index = next_index
                self.last_yield_time = time.time()
                self.empty_reads = 0
                self.consecutive_empty = 0
@@ -188,11 +183,11 @@ StreamGenerator:
                self.empty_reads += 1
                self.consecutive_empty += 1

                if self._should_send_keepalive(self.local_index):
                    keepalive_packet = create_ts_packet('keepalive')
                    logger.debug(f"[{self.client_id}] Sending keepalive packet while waiting at buffer head")
                    yield keepalive_packet
                    self.bytes_sent += len(keepalive_packet)
                    self.last_yield_time = time.time()
                    self.consecutive_empty = 0  # Reset consecutive counter but keep total empty_reads
                    time.sleep(Config.KEEPALIVE_INTERVAL)
@@ -204,11 +199,11 @@ StreamGenerator:
                # Log empty reads periodically
                if self.empty_reads % 50 == 0:
                    stream_status = "healthy" if (self.stream_manager and self.stream_manager.healthy) else "unknown"
                    logger.debug(f"[{self.client_id}] Waiting for chunks beyond {self.local_index} (buffer at {self.buffer.index}, stream: {stream_status})")

                # Check for ghost clients
                if self._is_ghost_client(self.local_index):
                    logger.warning(f"[{self.client_id}] Possible ghost client: buffer has advanced {self.buffer.index - self.local_index} chunks ahead but client stuck at {self.local_index}")
                    break

                # Check for timeouts
@@ -258,7 +253,7 @@ StreamGenerator:
        return True

    def _process_chunks(self, chunks, next_index):
        """Process and yield chunks to the client."""
        # Process and send chunks
        total_size = sum(len(c) for c in chunks)
@@ -268,20 +263,34 @@ StreamGenerator:
        for chunk in chunks:
            try:
                yield chunk
                self.bytes_sent += len(chunk)
                self.chunks_sent += 1
                logger.debug(f"[{self.client_id}] Sent chunk {self.chunks_sent} ({len(chunk)} bytes) to client")
                # Log every 10 chunks and store in redis for visibility
                if self.chunks_sent % 10 == 0:
                    elapsed = time.time() - self.stream_start_time
                    rate = self.bytes_sent / elapsed / 1024 if elapsed > 0 else 0
                    logger.debug(f"[{self.client_id}] Stats: {self.chunks_sent} chunks, {self.bytes_sent/1024:.1f} KB, {rate:.1f} KB/s")

                    # Store stats in Redis client metadata
                    if proxy_server.redis_client:
                        try:
                            client_key = RedisKeys.client_metadata(self.channel_id, self.client_id)
                            stats = {
                                "chunks_sent": str(self.chunks_sent),
                                "bytes_sent": str(self.bytes_sent),
                                "transfer_rate_KBps": str(round(rate, 1)),
                                "stats_updated_at": str(time.time())
                            }
                            proxy_server.redis_client.hset(client_key, mapping=stats)
                            # No need to set expiration as client heartbeat will refresh this key
                        except Exception as e:
                            logger.warning(f"[{self.client_id}] Failed to store stats in Redis: {e}")
            except Exception as e:
                logger.error(f"[{self.client_id}] Error sending chunk to client: {e}")
                raise  # Re-raise to exit the generator

    def _should_send_keepalive(self, local_index):
        """Determine if a keepalive packet should be sent."""
        # Check if we're caught up to buffer head
@@ -13,13 +13,13 @@ from apps.channels.models import Channel, Stream
from apps.m3u.models import M3UAccount, M3UAccountProfile
from core.models import UserAgent, CoreSettings
from .stream_buffer import StreamBuffer
from .utils import detect_stream_type, get_logger
from .redis_keys import RedisKeys
from .constants import ChannelState, EventType, StreamType, TS_PACKET_SIZE
from .config_helper import ConfigHelper
from .url_utils import get_alternate_streams, get_stream_info_for_switch

logger = get_logger()

class StreamManager:
    """Manages a connection to a TS stream without using raw sockets"""
@@ -9,8 +9,9 @@ from django.shortcuts import get_object_or_404
from apps.channels.models import Channel, Stream
from apps.m3u.models import M3UAccount, M3UAccountProfile
from core.models import UserAgent, CoreSettings
from .utils import get_logger

logger = get_logger()

def generate_stream_url(channel_id: str) -> Tuple[str, str, bool]:
    """
@@ -1,6 +1,7 @@
import logging
import re
from urllib.parse import urlparse
import inspect

logger = logging.getLogger("ts_proxy")
@@ -77,4 +78,31 @@ def create_ts_packet(packet_type='null', message=None):
        msg_bytes = message.encode('utf-8')
        packet[4:4+min(len(msg_bytes), 180)] = msg_bytes[:180]

    return bytes(packet)

def get_logger(component_name=None):
    """
    Get a standardized logger with ts_proxy prefix and optional component name.

    Args:
        component_name (str, optional): Name of the component. If not provided,
                                        will try to detect from the calling module.

    Returns:
        logging.Logger: A configured logger with standardized naming.
    """
    if component_name:
        logger_name = f"ts_proxy.{component_name}"
    else:
        # Try to get the calling module name if not explicitly specified
        frame = inspect.currentframe().f_back
        module = inspect.getmodule(frame)
        if module:
            # Extract just the filename without extension
            module_name = module.__name__.split('.')[-1]
            logger_name = f"ts_proxy.{module_name}"
        else:
            # Default if detection fails
            logger_name = "ts_proxy"

    return logging.getLogger(logger_name)
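What the helper yields in practice; both calls return ordinary logging loggers under the ts_proxy prefix, and the second form relies on the inspect-based module detection above.

logger = get_logger("stream_manager")   # -> logging.getLogger("ts_proxy.stream_manager")
logger = get_logger()                   # called inside channel_status.py -> "ts_proxy.channel_status"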
@@ -22,9 +22,9 @@ from .constants import ChannelState, EventType, StreamType
from .config_helper import ConfigHelper
from .services.channel_service import ChannelService
from .url_utils import generate_stream_url, transform_url, get_stream_info_for_switch
from .utils import get_logger

logger = get_logger()


@api_view(['GET'])
core/redis_pubsub.py (new file, 223 lines)
@@ -0,0 +1,223 @@
"""
Redis PubSub utilities for maintaining long-lived connections.
"""
import threading
import time
import logging
import json
from redis import Redis
from redis.exceptions import ConnectionError, TimeoutError

logger = logging.getLogger(__name__)

class RedisPubSubManager:
    """
    A robust Redis PubSub manager that handles disconnections and reconnections.
    """

    def __init__(self, redis_client=None, auto_reconnect=True):
        """
        Initialize the PubSub manager.

        Args:
            redis_client: An existing Redis client to use
            auto_reconnect: Whether to automatically reconnect on failure
        """
        from .utils import get_redis_client

        self.redis_client = redis_client or get_redis_client()
        self.pubsub = None
        self.subscriptions = set()
        self.pattern_subscriptions = set()
        self.auto_reconnect = auto_reconnect
        self.running = True
        self.lock = threading.RLock()
        self.message_handlers = {}  # Map of channels to handler functions
        self.message_thread = None

    def subscribe(self, channel, handler=None):
        """
        Subscribe to a channel.

        Args:
            channel: The channel to subscribe to
            handler: Optional function to call when messages are received
        """
        with self.lock:
            self.subscriptions.add(channel)
            if handler:
                self.message_handlers[channel] = handler

            if self.pubsub:
                self.pubsub.subscribe(channel)
                logger.info(f"Subscribed to channel: {channel}")

    def psubscribe(self, pattern, handler=None):
        """
        Subscribe to a channel pattern.

        Args:
            pattern: The pattern to subscribe to
            handler: Optional function to call when messages are received
        """
        with self.lock:
            self.pattern_subscriptions.add(pattern)
            if handler:
                self.message_handlers[pattern] = handler

            if self.pubsub:
                self.pubsub.psubscribe(pattern)
                logger.info(f"Subscribed to pattern: {pattern}")

    def publish(self, channel, message):
        """
        Publish a message to a channel.

        Args:
            channel: The channel to publish to
            message: The message to publish (will be JSON-encoded if not a string)

        Returns:
            Number of clients that received the message
        """
        try:
            if not isinstance(message, str):
                message = json.dumps(message)
            return self.redis_client.publish(channel, message)
        except Exception as e:
            logger.error(f"Error publishing to {channel}: {e}")
            return 0

    def start_listening(self):
        """
        Start listening for messages in a background thread.
        """
        if not self.message_thread:
            self._connect()
            self.message_thread = threading.Thread(
                target=self._listen_for_messages,
                daemon=True,
                name="redis-pubsub-listener"
            )
            self.message_thread.start()
            logger.info("Started Redis PubSub listener thread")

    def stop(self):
        """
        Stop listening and clean up resources.
        """
        self.running = False
        if self.pubsub:
            try:
                self.pubsub.close()
            except:
                pass
            self.pubsub = None

    def _connect(self):
        """
        Establish a new PubSub connection and subscribe to all channels.
        """
        with self.lock:
            # Close any existing connection
            if self.pubsub:
                try:
                    self.pubsub.close()
                except:
                    pass

            # Create a new PubSub instance - critical: no timeout for subscribe operations
            # This prevents the connection from timing out while waiting for messages
            self.pubsub = self.redis_client.pubsub()

            # Resubscribe to all channels
            if self.subscriptions:
                self.pubsub.subscribe(*self.subscriptions)
                logger.info(f"Resubscribed to channels: {self.subscriptions}")

            # Resubscribe to all patterns
            if self.pattern_subscriptions:
                self.pubsub.psubscribe(*self.pattern_subscriptions)
                logger.info(f"Resubscribed to patterns: {self.pattern_subscriptions}")

    def _listen_for_messages(self):
        """
        Background thread that listens for messages and handles reconnections.
        """
        consecutive_errors = 0

        while self.running:
            try:
                # Check if we need to connect
                if not self.pubsub:
                    self._connect()

                # Listen for messages with NO timeout - this is critical!
                message = self.pubsub.get_message(timeout=None)

                if message:
                    # Don't process subscription confirmation messages
                    if message['type'] in ('subscribe', 'psubscribe'):
                        continue

                    channel = message.get('channel')
                    if channel:
                        # Decode binary channel name if needed
                        if isinstance(channel, bytes):
                            channel = channel.decode('utf-8')

                        # Find and call the appropriate handler
                        handler = self.message_handlers.get(channel)
                        if handler:
                            try:
                                handler(message)
                            except Exception as e:
                                logger.error(f"Error in message handler for {channel}: {e}")

                # Reset error counter on success
                consecutive_errors = 0

                # Small sleep to prevent excessive CPU usage
                time.sleep(0.01)

            except (ConnectionError, TimeoutError) as e:
                consecutive_errors += 1

                if not self.auto_reconnect:
                    logger.error(f"PubSub connection error and auto_reconnect is disabled: {e}")
                    break

                # Exponential backoff for reconnection attempts
                backoff = min(consecutive_errors * 0.5, 5)
                logger.warning(f"PubSub connection error, reconnecting in {backoff} seconds: {e}")
                time.sleep(backoff)

                # Reconnect
                self._connect()

            except Exception as e:
                logger.error(f"Unexpected error in PubSub listener: {e}")
                time.sleep(1)  # Prevent tight loop in case of persistent errors

        logger.info("PubSub listener thread stopping")

# Create a singleton instance
pubsub_manager = None

def get_pubsub_manager(redis_client=None):
    """
    Get or create the PubSub manager singleton.

    Args:
        redis_client: Optional Redis client to use

    Returns:
        The PubSub manager instance
    """
    global pubsub_manager

    if pubsub_manager is None:
        pubsub_manager = RedisPubSubManager(redis_client)
        pubsub_manager.start_listening()

    return pubsub_manager
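Usage sketch for the manager, assuming a reachable Redis; the channel name and handler are invented. Handlers run on the listener thread, so they should return quickly.

from core.redis_pubsub import get_pubsub_manager

def on_event(message):
    # message is the raw redis-py pubsub dict: type, channel, data, ...
    print("got", message.get("data"))

manager = get_pubsub_manager()                        # starts the listener thread once
manager.subscribe("ts_proxy:events:demo", on_event)   # handler keyed by exact channel name
manager.publish("ts_proxy:events:demo", {"event": "ping"})  # dicts are JSON-encoded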
core/utils.py (155 lines)
@@ -1,28 +1,141 @@
import redis
import logging
import time
import os
import threading
from django.conf import settings
from redis.exceptions import ConnectionError, TimeoutError

logger = logging.getLogger(__name__)

def get_redis_client(max_retries=5, retry_interval=1):
    """Get Redis client with connection validation and retry logic"""
    retry_count = 0
    while retry_count < max_retries:
        try:
            # Get connection parameters from settings or environment
            redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost'))
            redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379)))
            redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0)))

            # Use standardized settings
            socket_timeout = getattr(settings, 'REDIS_SOCKET_TIMEOUT', 5)
            socket_connect_timeout = getattr(settings, 'REDIS_SOCKET_CONNECT_TIMEOUT', 5)
            health_check_interval = getattr(settings, 'REDIS_HEALTH_CHECK_INTERVAL', 30)
            socket_keepalive = getattr(settings, 'REDIS_SOCKET_KEEPALIVE', True)
            retry_on_timeout = getattr(settings, 'REDIS_RETRY_ON_TIMEOUT', True)

            # Create Redis client with better defaults
            client = redis.Redis(
                host=redis_host,
                port=redis_port,
                db=redis_db,
                socket_timeout=socket_timeout,
                socket_connect_timeout=socket_connect_timeout,
                socket_keepalive=socket_keepalive,
                health_check_interval=health_check_interval,
                retry_on_timeout=retry_on_timeout
            )

            # Validate connection with ping
            client.ping()
            logger.info(f"Connected to Redis at {redis_host}:{redis_port}/{redis_db}")
            return client

        except (ConnectionError, TimeoutError) as e:
            retry_count += 1
            if retry_count >= max_retries:
                logger.error(f"Failed to connect to Redis after {max_retries} attempts: {e}")
                return None
            else:
                # Use exponential backoff for retries
                wait_time = retry_interval * (2 ** (retry_count - 1))
                logger.warning(f"Redis connection failed. Retrying in {wait_time}s... ({retry_count}/{max_retries})")
                time.sleep(wait_time)

        except Exception as e:
            logger.error(f"Unexpected error connecting to Redis: {e}")
            return None

def get_redis_pubsub_client(max_retries=5, retry_interval=3):
    """Get Redis client optimized for PubSub operations"""
    retry_count = 0
    while retry_count < max_retries:
        try:
            # Get connection parameters from settings or environment
            redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost'))
            redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379)))
            redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0)))

            # Use standardized settings but without socket timeouts for PubSub
            # Important: socket_timeout is None for PubSub operations
            socket_connect_timeout = getattr(settings, 'REDIS_SOCKET_CONNECT_TIMEOUT', 5)
            socket_keepalive = getattr(settings, 'REDIS_SOCKET_KEEPALIVE', True)
            health_check_interval = getattr(settings, 'REDIS_HEALTH_CHECK_INTERVAL', 30)
            retry_on_timeout = getattr(settings, 'REDIS_RETRY_ON_TIMEOUT', True)

            # Create Redis client with PubSub-optimized settings - no timeout
            client = redis.Redis(
                host=redis_host,
                port=redis_port,
                db=redis_db,
                socket_timeout=None,  # Critical: No timeout for PubSub operations
                socket_connect_timeout=socket_connect_timeout,
                socket_keepalive=socket_keepalive,
                health_check_interval=health_check_interval,
                retry_on_timeout=retry_on_timeout
            )

            # Validate connection with ping
            client.ping()
            logger.info(f"Connected to Redis for PubSub at {redis_host}:{redis_port}/{redis_db}")

            # We don't need the keepalive thread anymore since we're using proper PubSub handling
            return client

        except (ConnectionError, TimeoutError) as e:
            retry_count += 1
            if retry_count >= max_retries:
                logger.error(f"Failed to connect to Redis for PubSub after {max_retries} attempts: {e}")
                return None
            else:
                # Use exponential backoff for retries
                wait_time = retry_interval * (2 ** (retry_count - 1))
                logger.warning(f"Redis PubSub connection failed. Retrying in {wait_time}s... ({retry_count}/{max_retries})")
                time.sleep(wait_time)

        except Exception as e:
            logger.error(f"Unexpected error connecting to Redis for PubSub: {e}")
            return None

def execute_redis_command(redis_client, command_func, default_return=None):
    """
    Execute a Redis command with proper error handling

    Args:
        redis_client: The Redis client instance
        command_func: Lambda function containing the Redis command to execute
        default_return: Value to return if command fails

    Returns:
        Command result or default_return on failure
    """
    if redis_client is None:
        return default_return

    try:
        return command_func()
    except (ConnectionError, TimeoutError) as e:
        logger.warning(f"Redis connection error: {e}")
        return default_return
    except Exception as e:
        logger.error(f"Redis command error: {e}")
        return default_return

# Initialize the global clients with retry logic
redis_client = get_redis_client(max_retries=10, retry_interval=1)
redis_pubsub_client = get_redis_pubsub_client(max_retries=10, retry_interval=1)

# Import and initialize the PubSub manager
from .redis_pubsub import get_pubsub_manager
pubsub_manager = get_pubsub_manager(redis_client)
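Call-pattern sketch for execute_redis_command, with an invented key: the lambda defers the actual command so the helper can skip it when the client is missing and substitute default_return on failure.

from core.utils import redis_client, execute_redis_command

count = execute_redis_command(
    redis_client,
    lambda: redis_client.scard("ts_proxy:channel:demo:clients"),
    default_return=0,   # returned instead of raising if Redis is down
)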
@@ -192,8 +192,15 @@ SIMPLE_JWT = {
    'BLACKLIST_AFTER_ROTATION': True,  # Optional: Whether to blacklist refresh tokens
}

# Redis connection settings
REDIS_URL = 'redis://localhost:6379/0'
REDIS_SOCKET_TIMEOUT = 60  # Socket timeout in seconds
REDIS_SOCKET_CONNECT_TIMEOUT = 5  # Connection timeout in seconds
REDIS_HEALTH_CHECK_INTERVAL = 15  # Health check every 15 seconds
REDIS_SOCKET_KEEPALIVE = True  # Enable socket keepalive
REDIS_RETRY_ON_TIMEOUT = True  # Retry on timeout
REDIS_MAX_RETRIES = 10  # Maximum number of retries
REDIS_RETRY_INTERVAL = 1  # Initial retry interval in seconds

# Proxy Settings
PROXY_SETTINGS = {
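These values are read back through getattr() with matching defaults in core/utils.py, so a deployment can override any of them; a sketch of a local override module (module name and values are illustrative, not part of the diff).

# settings_local.py (hypothetical override module)
from dispatcharr.settings import *   # assumed project settings path

REDIS_SOCKET_TIMEOUT = 30             # tighter than the 60s default above
REDIS_HEALTH_CHECK_INTERVAL = 5       # probe the connection more often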
@@ -85,10 +85,6 @@ else
    pids+=("$nginx_pid")
fi

uwsgi_file="/app/docker/uwsgi.ini"
if [ "$DISPATCHARR_ENV" = "dev" ]; then
    uwsgi_file="/app/docker/uwsgi.dev.ini"
@@ -100,6 +96,12 @@ uwsgi_pid=$(pgrep uwsgi | sort | head -n1)
echo "✅ uwsgi started with PID $uwsgi_pid"
pids+=("$uwsgi_pid")

cd /app
python manage.py migrate --noinput
python manage.py collectstatic --noinput

# Wait for at least one process to exit and log the process that exited first
if [ ${#pids[@]} -gt 0 ]; then
    echo "⏳ Waiting for processes to exit..."
@@ -2,9 +2,14 @@
; exec-before = python manage.py collectstatic --noinput
; exec-before = python manage.py migrate --noinput

; First run Redis availability check script once
exec-pre = python /app/scripts/wait_for_redis.py

; Start Redis first
attach-daemon = redis-server
; Then start other services
attach-daemon = celery -A dispatcharr worker -l info
attach-daemon = celery -A dispatcharr beat -l info
attach-daemon = daphne -b 0.0.0.0 -p 8001 dispatcharr.asgi:application
attach-daemon = cd /app/frontend && npm run dev
@@ -2,9 +2,14 @@
; exec-before = python manage.py collectstatic --noinput
; exec-before = python manage.py migrate --noinput

; First run Redis availability check script once
exec-pre = python /app/scripts/wait_for_redis.py

; Start Redis first
attach-daemon = redis-server
; Then start other services
attach-daemon = celery -A dispatcharr worker -l error
attach-daemon = celery -A dispatcharr beat -l error
attach-daemon = daphne -b 0.0.0.0 -p 8001 dispatcharr.asgi:application

# Core settings
scripts/wait_for_redis.py (new file, 60 lines)
@@ -0,0 +1,60 @@
#!/usr/bin/env python
"""
Helper script to wait for Redis to be available before starting the application.
"""

import redis
import time
import os
import sys
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def wait_for_redis(host='localhost', port=6379, db=0, max_retries=30, retry_interval=2):
    """Wait for Redis to become available"""
    redis_client = None
    retry_count = 0

    logger.info(f"Waiting for Redis at {host}:{port}/{db}...")

    while retry_count < max_retries:
        try:
            redis_client = redis.Redis(
                host=host,
                port=port,
                db=db,
                socket_timeout=2,
                socket_connect_timeout=2
            )
            redis_client.ping()
            logger.info(f"✅ Redis at {host}:{port}/{db} is now available!")
            return True
        except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
            retry_count += 1
            if retry_count >= max_retries:
                logger.error(f"❌ Failed to connect to Redis after {max_retries} attempts: {e}")
                return False

            logger.info(f"⏳ Redis not available yet, retrying in {retry_interval}s... ({retry_count}/{max_retries})")
            time.sleep(retry_interval)
        except Exception as e:
            logger.error(f"❌ Unexpected error connecting to Redis: {e}")
            return False

    return False

if __name__ == "__main__":
    host = os.environ.get('REDIS_HOST', 'localhost')
    port = int(os.environ.get('REDIS_PORT', 6379))
    db = int(os.environ.get('REDIS_DB', 0))
    max_retries = int(os.environ.get('REDIS_WAIT_RETRIES', 30))
    retry_interval = int(os.environ.get('REDIS_WAIT_INTERVAL', 2))

    logger.info(f"Starting Redis availability check at {host}:{port}/{db}")

    if wait_for_redis(host, port, db, max_retries, retry_interval):
        sys.exit(0)
    else:
        sys.exit(1)
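Besides the uwsgi exec-pre hook above, the script can be driven from any wrapper; a sketch using the same environment variables the __main__ block reads (retry values here are arbitrary). Exit code 0 means Redis answered a ping within the retry budget.

import os, subprocess, sys

env = {**os.environ, "REDIS_WAIT_RETRIES": "10", "REDIS_WAIT_INTERVAL": "1"}
result = subprocess.run([sys.executable, "/app/scripts/wait_for_redis.py"], env=env)
if result.returncode != 0:
    raise SystemExit("Redis did not become available")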