Dispatcharr/apps/proxy/ts_proxy/server.py
SergeantPanda 89a23164ff Enhancement: Add system event logging and viewer with M3U/EPG endpoint caching
System Event Logging:
- Add SystemEvent model with 15 event types tracking channel operations, client connections, M3U/EPG activities, and buffering events
- Log detailed metrics for M3U/EPG refresh operations (streams/programs created/updated/deleted)
- Track M3U/EPG downloads with client information (IP address, user agent, profile, channel count)
- Record channel lifecycle events (start, stop, reconnect) with stream and client details
- Monitor client connections/disconnections and buffering events with stream metadata

Event Viewer UI:
- Add SystemEvents component with real-time updates via WebSocket
- Implement pagination, filtering by event type, and configurable auto-refresh
- Display events with color-coded badges and type-specific icons
- Integrate event viewer into Stats page with modal display
- Add event management settings (retention period, refresh rate)

M3U/EPG Endpoint Optimizations:
- Implement content caching with 5-minute TTL to reduce duplicate processing
- Add client-based event deduplication (2-second window) using IP and user agent hashing
- Support HEAD requests for efficient preflight checks
- Cache streamed EPG responses while maintaining streaming behavior for first request
2025-11-20 17:41:06 -06:00

1449 lines
73 KiB
Python

"""
Transport Stream (TS) Proxy Server
Handles live TS stream proxying with support for:
- Stream switching
- Buffer management
- Multiple client connections
- Connection state tracking
"""
import threading
import logging
import socket
import random
import time
import sys
import os
import json
import gevent # Add gevent import
from typing import Dict, Optional, Set
from apps.proxy.config import TSConfig as Config
from apps.channels.models import Channel, Stream
from core.utils import RedisClient, log_system_event
from redis.exceptions import ConnectionError, TimeoutError
from .stream_manager import StreamManager
from .stream_buffer import StreamBuffer
from .client_manager import ClientManager
from .redis_keys import RedisKeys
from .constants import ChannelState, EventType, StreamType
from .config_helper import ConfigHelper
from .utils import get_logger
logger = get_logger()
class ProxyServer:
"""Manages TS proxy server instance with worker coordination"""
_instance = None
@classmethod
def get_instance(cls):
if cls._instance is None:
from .server import ProxyServer
from .stream_manager import StreamManager
from .stream_buffer import StreamBuffer
from .client_manager import ClientManager
cls._instance = ProxyServer()
return cls._instance
def __init__(self):
"""Initialize proxy server with worker identification"""
self.stream_managers = {}
self.stream_buffers = {}
self.client_managers = {}
# Generate a unique worker ID
import socket
import os
pid = os.getpid()
hostname = socket.gethostname()
self.worker_id = f"{hostname}:{pid}"
# Connect to Redis - use dedicated client for proxy
self.redis_client = None
self.redis_connection_attempts = 0
self.redis_max_retries = 3
self.redis_retry_interval = 5 # seconds
try:
# Use dedicated Redis client for proxy
self.redis_client = RedisClient.get_client()
if self.redis_client is not None:
logger.info(f"Using dedicated Redis client for proxy server")
logger.info(f"Worker ID: {self.worker_id}")
else:
# Fall back to direct connection with retry
self._setup_redis_connection()
except Exception as e:
logger.error(f"Failed to initialize Redis: {e}")
self.redis_client = None
# Start cleanup thread
self.cleanup_interval = getattr(Config, 'CLEANUP_INTERVAL', 60)
self._start_cleanup_thread()
# Start event listener for Redis pubsub messages
self._start_event_listener()
def _setup_redis_connection(self):
"""Setup Redis connection with retry logic"""
# Try to use get_redis_client utility instead of direct connection
self.redis_client = RedisClient.get_client(max_retries=self.redis_max_retries,
retry_interval=self.redis_retry_interval)
if self.redis_client:
logger.info(f"Successfully connected to Redis using utility function")
logger.info(f"Worker ID: {self.worker_id}")
else:
logger.error(f"Failed to connect to Redis after {self.redis_max_retries} attempts")
def _execute_redis_command(self, command_func, *args, **kwargs):
"""Execute Redis command with error handling and reconnection logic"""
if not self.redis_client:
return None
try:
return command_func(*args, **kwargs)
except (ConnectionError, TimeoutError) as e:
logger.warning(f"Redis connection lost: {e}. Attempting to reconnect...")
try:
# Try to reconnect
self.redis_connection_attempts = 0
self._setup_redis_connection()
if self.redis_client:
# Retry the command once
return command_func(*args, **kwargs)
except Exception as reconnect_error:
logger.error(f"Failed to reconnect to Redis: {reconnect_error}")
return None
except Exception as e:
logger.error(f"Redis command error: {e}")
return None
def _start_event_listener(self):
"""Listen for events from other workers"""
if not self.redis_client:
return
def event_listener():
retry_count = 0
max_retries = 10
base_retry_delay = 1 # Start with 1 second delay
max_retry_delay = 30 # Cap at 30 seconds
pubsub_client = None
pubsub = None
while True:
try:
# Use dedicated PubSub client for event listener
pubsub_client = RedisClient.get_pubsub_client()
if pubsub_client:
logger.info("Using dedicated Redis PubSub client for event listener")
else:
# Fall back to creating a dedicated client if utility fails
logger.warning("Utility function for PubSub client failed, creating direct connection")
from django.conf import settings
import redis
redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost'))
redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379)))
redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0)))
pubsub_client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
socket_timeout=60,
socket_connect_timeout=10,
socket_keepalive=True,
health_check_interval=30
)
logger.info("Created fallback Redis PubSub client for event listener")
# Test connection before subscribing
pubsub_client.ping()
# Create a pubsub instance from the client
pubsub = pubsub_client.pubsub()
pubsub.psubscribe("ts_proxy:events:*")
logger.info(f"Started Redis event listener for client activity")
# Reset retry count on successful connection
retry_count = 0
for message in pubsub.listen():
if message["type"] != "pmessage":
continue
try:
channel = message["channel"].decode("utf-8")
data = json.loads(message["data"].decode("utf-8"))
event_type = data.get("event")
channel_id = data.get("channel_id")
if channel_id and event_type:
# For owner, update client status immediately
if self.am_i_owner(channel_id):
if event_type == EventType.CLIENT_CONNECTED:
logger.debug(f"Owner received {EventType.CLIENT_CONNECTED} event for channel {channel_id}")
# Reset any disconnect timer
disconnect_key = RedisKeys.last_client_disconnect(channel_id)
self.redis_client.delete(disconnect_key)
elif event_type == EventType.CLIENT_DISCONNECTED:
client_id = data.get("client_id")
worker_id = data.get("worker_id")
logger.debug(f"Owner received {EventType.CLIENT_DISCONNECTED} event for channel {channel_id}, client {client_id} from worker {worker_id}")
# Delegate to dedicated method
self.handle_client_disconnect(channel_id)
elif event_type == EventType.STREAM_SWITCH:
logger.info(f"Owner received {EventType.STREAM_SWITCH} request for channel {channel_id}")
# Handle stream switch request
new_url = data.get("url")
user_agent = data.get("user_agent")
if new_url and channel_id in self.stream_managers:
# Update metadata in Redis
if self.redis_client:
metadata_key = RedisKeys.channel_metadata(channel_id)
self.redis_client.hset(metadata_key, "url", new_url)
if user_agent:
self.redis_client.hset(metadata_key, "user_agent", user_agent)
# Set switch status
status_key = RedisKeys.switch_status(channel_id)
self.redis_client.set(status_key, "switching")
# Perform the stream switch
stream_manager = self.stream_managers[channel_id]
success = stream_manager.update_url(new_url)
if success:
logger.info(f"Stream switch initiated for channel {channel_id}")
# Publish confirmation
switch_result = {
"event": EventType.STREAM_SWITCHED, # Use constant instead of string
"channel_id": channel_id,
"success": True,
"url": new_url,
"timestamp": time.time()
}
self.redis_client.publish(
f"ts_proxy:events:{channel_id}",
json.dumps(switch_result)
)
# Update status
if self.redis_client:
self.redis_client.set(status_key, "switched")
else:
logger.error(f"Failed to switch stream for channel {channel_id}")
# Publish failure
switch_result = {
"event": EventType.STREAM_SWITCHED,
"channel_id": channel_id,
"success": False,
"url": new_url,
"timestamp": time.time()
}
self.redis_client.publish(
f"ts_proxy:events:{channel_id}",
json.dumps(switch_result)
)
elif event_type == EventType.CHANNEL_STOP:
logger.info(f"Received {EventType.CHANNEL_STOP} event for channel {channel_id}")
# First mark channel as stopping in Redis
if self.redis_client:
# Set stopping state in metadata
metadata_key = RedisKeys.channel_metadata(channel_id)
if self.redis_client.exists(metadata_key):
self.redis_client.hset(metadata_key, mapping={
"state": ChannelState.STOPPING,
"state_changed_at": str(time.time())
})
# If we have local resources for this channel, clean them up
if channel_id in self.stream_buffers or channel_id in self.client_managers:
# Use existing stop_channel method
logger.info(f"Stopping local resources for channel {channel_id}")
self.stop_channel(channel_id)
# Acknowledge stop by publishing a response
stop_response = {
"event": EventType.CHANNEL_STOPPED,
"channel_id": channel_id,
"worker_id": self.worker_id,
"timestamp": time.time()
}
self.redis_client.publish(
f"ts_proxy:events:{channel_id}",
json.dumps(stop_response)
)
elif event_type == EventType.CLIENT_STOP:
client_id = data.get("client_id")
if client_id and channel_id:
logger.info(f"Received request to stop client {client_id} on channel {channel_id}")
# Both remove from client manager AND set a key for the generator to detect
if channel_id in self.client_managers:
client_manager = self.client_managers[channel_id]
if client_id in client_manager.clients:
client_manager.remove_client(client_id)
logger.info(f"Removed client {client_id} from client manager")
# Set a Redis key for the generator to detect
if self.redis_client:
stop_key = RedisKeys.client_stop(channel_id, client_id)
self.redis_client.setex(stop_key, 30, "true") # 30 second TTL
logger.info(f"Set stop key for client {client_id}")
except Exception as e:
logger.error(f"Error processing event message: {e}")
except (ConnectionError, TimeoutError) as e:
# Calculate exponential backoff with jitter
retry_count += 1
delay = min(base_retry_delay * (2 ** (retry_count - 1)), max_retry_delay)
# Add some randomness to prevent thundering herd
jitter = random.uniform(0, 0.5 * delay)
final_delay = delay + jitter
logger.error(f"Error in event listener: {e}. Retrying in {final_delay:.1f}s (attempt {retry_count})")
gevent.sleep(final_delay) # REPLACE: time.sleep(final_delay)
except Exception as e:
logger.error(f"Error in event listener: {e}")
# Add a short delay to prevent rapid retries on persistent errors
gevent.sleep(5) # REPLACE: time.sleep(5)
finally:
# Always clean up PubSub connections in all error paths
try:
if pubsub:
pubsub.close()
pubsub = None
except Exception as e:
logger.debug(f"Error closing pubsub: {e}")
try:
if pubsub_client:
pubsub_client.close()
pubsub_client = None
except Exception as e:
logger.debug(f"Error closing pubsub_client: {e}")
thread = threading.Thread(target=event_listener, daemon=True)
thread.name = "redis-event-listener"
thread.start()
def get_channel_owner(self, channel_id):
"""Get the worker ID that owns this channel with proper error handling"""
if not self.redis_client:
return None
try:
lock_key = RedisKeys.channel_owner(channel_id)
return self._execute_redis_command(
lambda: self.redis_client.get(lock_key).decode('utf-8') if self.redis_client.get(lock_key) else None
)
except Exception as e:
logger.error(f"Error getting channel owner: {e}")
return None
def am_i_owner(self, channel_id):
"""Check if this worker is the owner of the channel"""
owner = self.get_channel_owner(channel_id)
return owner == self.worker_id
def try_acquire_ownership(self, channel_id, ttl=30):
"""Try to become the owner of this channel using proper locking"""
if not self.redis_client:
return True # If no Redis, always become owner
try:
# Create a lock key with proper namespace
lock_key = RedisKeys.channel_owner(channel_id)
# Use Redis SETNX for atomic locking with error handling
acquired = self._execute_redis_command(
lambda: self.redis_client.setnx(lock_key, self.worker_id)
)
if acquired is None: # Redis command failed
logger.warning(f"Redis command failed during ownership acquisition - assuming ownership")
return True
# If acquired, set expiry to prevent orphaned locks
if acquired:
self._execute_redis_command(
lambda: self.redis_client.expire(lock_key, ttl)
)
logger.info(f"Worker {self.worker_id} acquired ownership of channel {channel_id}")
return True
# If not acquired, check if we already own it (might be a retry)
current_owner = self._execute_redis_command(
lambda: self.redis_client.get(lock_key)
)
if current_owner and current_owner.decode('utf-8') == self.worker_id:
# Refresh TTL
self._execute_redis_command(
lambda: self.redis_client.expire(lock_key, ttl)
)
logger.info(f"Worker {self.worker_id} refreshed ownership of channel {channel_id}")
return True
# Someone else owns it
return False
except Exception as e:
logger.error(f"Error acquiring channel ownership: {e}")
return False
def release_ownership(self, channel_id):
"""Release ownership of this channel safely"""
if not self.redis_client:
return
try:
lock_key = RedisKeys.channel_owner(channel_id)
# Only delete if we're the current owner to prevent race conditions
current = self.redis_client.get(lock_key)
if current and current.decode('utf-8') == self.worker_id:
self.redis_client.delete(lock_key)
logger.info(f"Released ownership of channel {channel_id}")
# Also ensure channel stopping key is set to signal clients
stop_key = RedisKeys.channel_stopping(channel_id)
self.redis_client.setex(stop_key, 30, "true")
logger.info(f"Set stopping signal for channel {channel_id} clients")
except Exception as e:
logger.error(f"Error releasing channel ownership: {e}")
def extend_ownership(self, channel_id, ttl=30):
"""Extend ownership lease with grace period"""
if not self.redis_client:
return False
try:
lock_key = RedisKeys.channel_owner(channel_id)
current = self.redis_client.get(lock_key)
# Only extend if we're still the owner
if current and current.decode('utf-8') == self.worker_id:
self.redis_client.expire(lock_key, ttl)
return True
return False
except Exception as e:
logger.error(f"Error extending ownership: {e}")
return False
def initialize_channel(self, url, channel_id, user_agent=None, transcode=False, stream_id=None):
"""Initialize a channel without redundant active key"""
try:
# IMPROVED: First check if channel is already being initialized by another process
if self.redis_client:
metadata_key = RedisKeys.channel_metadata(channel_id)
if self.redis_client.exists(metadata_key):
metadata = self.redis_client.hgetall(metadata_key)
if b'state' in metadata:
state = metadata[b'state'].decode('utf-8')
active_states = [ChannelState.INITIALIZING, ChannelState.CONNECTING,
ChannelState.WAITING_FOR_CLIENTS, ChannelState.ACTIVE, ChannelState.BUFFERING]
if state in active_states:
logger.info(f"Channel {channel_id} already being initialized with state {state}")
# Create buffer and client manager only if we don't have them
if channel_id not in self.stream_buffers:
self.stream_buffers[channel_id] = StreamBuffer(channel_id, redis_client=self.redis_client)
if channel_id not in self.client_managers:
self.client_managers[channel_id] = ClientManager(
channel_id,
redis_client=self.redis_client,
worker_id=self.worker_id
)
return True
# Create buffer and client manager instances (or reuse if they exist)
if channel_id not in self.stream_buffers:
buffer = StreamBuffer(channel_id, redis_client=self.redis_client)
self.stream_buffers[channel_id] = buffer
if channel_id not in self.client_managers:
client_manager = ClientManager(
channel_id,
redis_client=self.redis_client,
worker_id=self.worker_id
)
self.client_managers[channel_id] = client_manager
# IMPROVED: Set initializing state in Redis BEFORE any other operations
if self.redis_client:
# Set early initialization state to prevent race conditions
metadata_key = RedisKeys.channel_metadata(channel_id)
initial_metadata = {
"state": ChannelState.INITIALIZING,
"init_time": str(time.time()),
"owner": self.worker_id
}
if stream_id:
initial_metadata["stream_id"] = str(stream_id)
self.redis_client.hset(metadata_key, mapping=initial_metadata)
logger.info(f"Set early initializing state for channel {channel_id}")
# Get channel URL from Redis if available
channel_url = url
channel_user_agent = user_agent
channel_stream_id = stream_id # Store the stream ID
# First check if channel metadata already exists
existing_metadata = None
metadata_key = RedisKeys.channel_metadata(channel_id)
if self.redis_client:
existing_metadata = self.redis_client.hgetall(metadata_key)
# If no url was passed, try to get from Redis
if not url and existing_metadata:
url_bytes = existing_metadata.get(b'url')
if url_bytes:
channel_url = url_bytes.decode('utf-8')
ua_bytes = existing_metadata.get(b'user_agent')
if ua_bytes:
channel_user_agent = ua_bytes.decode('utf-8')
# Get stream ID from metadata if not provided
if not channel_stream_id and b'stream_id' in existing_metadata:
try:
channel_stream_id = int(existing_metadata[b'stream_id'].decode('utf-8'))
logger.debug(f"Found stream_id {channel_stream_id} in metadata for channel {channel_id}")
except (ValueError, TypeError) as e:
logger.debug(f"Could not parse stream_id from metadata: {e}")
# Check if channel is already owned
current_owner = self.get_channel_owner(channel_id)
# Exit early if another worker owns the channel
if current_owner and current_owner != self.worker_id:
logger.info(f"Channel {channel_id} already owned by worker {current_owner}")
logger.info(f"This worker ({self.worker_id}) will read from Redis buffer only")
# Create buffer but not stream manager (only if not already exists)
if channel_id not in self.stream_buffers:
buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client)
self.stream_buffers[channel_id] = buffer
# Create client manager with channel_id and redis_client (only if not already exists)
if channel_id not in self.client_managers:
client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id)
self.client_managers[channel_id] = client_manager
return True
# Only continue with full initialization if URL is provided
# or we can get it from Redis
if not channel_url:
logger.error(f"No URL available for channel {channel_id}")
return False
# Try to acquire ownership with Redis locking
if not self.try_acquire_ownership(channel_id):
# Another worker just acquired ownership
logger.info(f"Another worker just acquired ownership of channel {channel_id}")
# Create buffer but not stream manager (only if not already exists)
if channel_id not in self.stream_buffers:
buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client)
self.stream_buffers[channel_id] = buffer
# Create client manager with channel_id and redis_client (only if not already exists)
if channel_id not in self.client_managers:
client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id)
self.client_managers[channel_id] = client_manager
return True
# We now own the channel - ONLY NOW should we set metadata with initializing state
logger.info(f"Worker {self.worker_id} is now the owner of channel {channel_id}")
if self.redis_client:
# NOW create or update metadata with initializing state
metadata = {
"url": channel_url,
"init_time": str(time.time()),
"last_active": str(time.time()),
"owner": self.worker_id,
"state": ChannelState.INITIALIZING # Use constant instead of string literal
}
if channel_user_agent:
metadata["user_agent"] = channel_user_agent
# Make sure stream_id is always set in metadata and properly logged
if channel_stream_id:
metadata["stream_id"] = str(channel_stream_id)
logger.info(f"Storing stream_id {channel_stream_id} in metadata for channel {channel_id}")
else:
logger.warning(f"No stream_id provided for channel {channel_id} during initialization")
# Set channel metadata BEFORE creating the StreamManager
self.redis_client.hset(metadata_key, mapping=metadata)
self.redis_client.expire(metadata_key, 3600) # Increased TTL from 30 seconds to 1 hour
# Verify the stream_id was set correctly in Redis
stream_id_value = self.redis_client.hget(metadata_key, "stream_id")
if stream_id_value:
logger.info(f"Verified stream_id {stream_id_value.decode('utf-8')} is set in Redis for channel {channel_id}")
else:
logger.warning(f"Failed to set stream_id in Redis for channel {channel_id}")
# Create stream buffer
buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client)
logger.debug(f"Created StreamBuffer for channel {channel_id}")
self.stream_buffers[channel_id] = buffer
# Only the owner worker creates the actual stream manager
stream_manager = StreamManager(
channel_id,
channel_url,
buffer,
user_agent=channel_user_agent,
transcode=transcode,
stream_id=channel_stream_id, # Pass stream ID to the manager
worker_id=self.worker_id # Pass worker_id explicitly to eliminate circular dependency
)
logger.info(f"Created StreamManager for channel {channel_id} with stream ID {channel_stream_id}")
self.stream_managers[channel_id] = stream_manager
# Log channel start event
try:
channel_obj = Channel.objects.get(uuid=channel_id)
# Get stream name if stream_id is available
stream_name = None
if channel_stream_id:
try:
stream_obj = Stream.objects.get(id=channel_stream_id)
stream_name = stream_obj.name
except Exception:
pass
log_system_event(
'channel_start',
channel_id=channel_id,
channel_name=channel_obj.name,
stream_name=stream_name,
stream_id=channel_stream_id
)
except Exception as e:
logger.error(f"Could not log channel start event: {e}")
# Create client manager with channel_id, redis_client AND worker_id (only if not already exists)
if channel_id not in self.client_managers:
client_manager = ClientManager(
channel_id=channel_id,
redis_client=self.redis_client,
worker_id=self.worker_id
)
self.client_managers[channel_id] = client_manager
# Start stream manager thread only for the owner
thread = threading.Thread(target=stream_manager.run, daemon=True)
thread.name = f"stream-{channel_id}"
thread.start()
logger.info(f"Started stream manager thread for channel {channel_id}")
# If we're the owner, we need to set the channel state rather than starting a grace period immediately
if self.am_i_owner(channel_id):
self.update_channel_state(channel_id, ChannelState.CONNECTING, {
"init_time": str(time.time()),
"owner": self.worker_id
})
# Set connection attempt start time
attempt_key = RedisKeys.connection_attempt(channel_id)
self.redis_client.setex(attempt_key, 60, str(time.time()))
logger.info(f"Channel {channel_id} in {ChannelState.CONNECTING} state - will start grace period after connection")
return True
except Exception as e:
logger.error(f"Error initializing channel {channel_id}: {e}", exc_info=True)
# Release ownership on failure
self.release_ownership(channel_id)
return False
def check_if_channel_exists(self, channel_id):
"""
Check if a channel exists and is in a valid state.
Enhanced to detect zombie channels after server restarts.
"""
# Check local memory first
if channel_id in self.stream_managers or channel_id in self.stream_buffers:
return True
# Check Redis using the standard key pattern
if self.redis_client:
# Primary check - look for channel metadata
metadata_key = RedisKeys.channel_metadata(channel_id)
# If metadata exists, validate it's in a healthy state
if self.redis_client.exists(metadata_key):
metadata = self.redis_client.hgetall(metadata_key)
# Get channel state and owner
state = metadata.get(b'state', b'unknown').decode('utf-8')
owner = metadata.get(b'owner', b'').decode('utf-8')
# States that indicate the channel is running properly or shutting down
valid_states = [ChannelState.ACTIVE, ChannelState.WAITING_FOR_CLIENTS,
ChannelState.CONNECTING, ChannelState.BUFFERING, ChannelState.INITIALIZING,
ChannelState.STOPPING]
# If the channel is in a valid state, check if the owner is still active
if state in valid_states:
# Check if owner still exists by checking heartbeat
owner_heartbeat_key = f"ts_proxy:worker:{owner}:heartbeat"
owner_alive = self.redis_client.exists(owner_heartbeat_key)
if owner_alive:
return True
else:
# This is a zombie channel - owner is gone but metadata still exists
logger.warning(f"Detected zombie channel {channel_id} - owner {owner} is no longer active")
# Check if there are any clients connected
client_set_key = RedisKeys.clients(channel_id)
client_count = self.redis_client.scard(client_set_key) or 0
if client_count > 0:
logger.warning(f"Zombie channel {channel_id} has {client_count} clients - attempting ownership takeover")
# Could potentially take ownership here in the future
# For now, just clean it up to be safe
else:
logger.warning(f"Zombie channel {channel_id} has no clients - cleaning up")
self._clean_zombie_channel(channel_id, metadata)
return False
elif state in [ChannelState.STOPPED, ChannelState.ERROR]:
# These terminal states indicate the channel should be cleaned up and reinitialized
logger.info(f"Channel {channel_id} in terminal state {state} - returning False to trigger cleanup")
return False
else:
# Unknown or initializing state, check how long it's been in this state
if b'state_changed_at' in metadata:
state_changed_at = float(metadata[b'state_changed_at'].decode('utf-8'))
state_age = time.time() - state_changed_at
# If in initializing state for too long, consider it stale
if state_age > 60: # 60 seconds threshold
logger.warning(f"Channel {channel_id} stuck in {state} state for {state_age:.1f}s - treating as zombie")
self._clean_zombie_channel(channel_id, metadata)
return False
# Otherwise assume it's still in progress
return True
# Additional checks if metadata doesn't exist
additional_keys = [
RedisKeys.clients(channel_id),
RedisKeys.buffer_index(channel_id),
RedisKeys.channel_owner(channel_id)
]
for key in additional_keys:
if self.redis_client.exists(key):
# Found orphaned keys without metadata - clean them up
logger.warning(f"Found orphaned keys for channel {channel_id} without metadata - cleaning up")
self._clean_redis_keys(channel_id)
return False
return False
def _clean_zombie_channel(self, channel_id, metadata=None):
"""Clean up a zombie channel (channel with Redis keys but no active owner)"""
try:
logger.info(f"Cleaning up zombie channel {channel_id}")
# If we have metadata, log details for debugging
if metadata:
state = metadata.get(b'state', b'unknown').decode('utf-8')
owner = metadata.get(b'owner', b'unknown').decode('utf-8')
logger.info(f"Zombie channel details - state: {state}, owner: {owner}")
# Clean up Redis keys
self._clean_redis_keys(channel_id)
# Force release resources in the Channel model
try:
channel = Channel.objects.get(uuid=channel_id)
channel.release_stream()
logger.info(f"Released stream allocation for zombie channel {channel_id}")
except Exception as e:
try:
stream = Stream.objects.get(stream_hash=channel_id)
stream.release_stream()
logger.info(f"Released stream allocation for zombie channel {channel_id}")
except Exception as e:
logger.error(f"Error releasing stream for zombie channel {channel_id}: {e}")
return True
except Exception as e:
logger.error(f"Error cleaning zombie channel {channel_id}: {e}", exc_info=True)
return False
def handle_client_disconnect(self, channel_id):
"""
Handle client disconnect event - check if channel should shut down.
Can be called directly by owner or via PubSub from non-owner workers.
"""
if channel_id not in self.client_managers:
return
try:
# VERIFY REDIS CLIENT COUNT DIRECTLY
client_set_key = RedisKeys.clients(channel_id)
total = self.redis_client.scard(client_set_key) or 0
if total == 0:
logger.debug(f"No clients left after disconnect event - stopping channel {channel_id}")
# Set the disconnect timer for other workers to see
disconnect_key = RedisKeys.last_client_disconnect(channel_id)
self.redis_client.setex(disconnect_key, 60, str(time.time()))
# Get configured shutdown delay or default
shutdown_delay = ConfigHelper.channel_shutdown_delay()
if shutdown_delay > 0:
logger.info(f"Waiting {shutdown_delay}s before stopping channel...")
gevent.sleep(shutdown_delay)
# Re-check client count before stopping
total = self.redis_client.scard(client_set_key) or 0
if total > 0:
logger.info(f"New clients connected during shutdown delay - aborting shutdown")
self.redis_client.delete(disconnect_key)
return
# Stop the channel directly
self.stop_channel(channel_id)
except Exception as e:
logger.error(f"Error handling client disconnect for channel {channel_id}: {e}")
def stop_channel(self, channel_id):
"""Stop a channel with proper ownership handling"""
try:
logger.info(f"Stopping channel {channel_id}")
# First set a stopping key that clients will check
if self.redis_client:
stop_key = RedisKeys.channel_stopping(channel_id)
self.redis_client.setex(stop_key, 10, "true")
# Only stop the actual stream manager if we're the owner
if self.am_i_owner(channel_id):
logger.info(f"This worker ({self.worker_id}) is the owner - closing provider connection")
if channel_id in self.stream_managers:
stream_manager = self.stream_managers[channel_id]
# Signal thread to stop and close resources
if hasattr(stream_manager, 'stop'):
stream_manager.stop()
else:
stream_manager.running = False
if hasattr(stream_manager, '_close_socket'):
stream_manager._close_socket()
# Wait for stream thread to finish
stream_thread_name = f"stream-{channel_id}"
stream_thread = None
for thread in threading.enumerate():
if thread.name == stream_thread_name:
stream_thread = thread
break
if stream_thread and stream_thread.is_alive():
logger.info(f"Waiting for stream thread to terminate")
try:
# Very short timeout to prevent hanging the app
stream_thread.join(timeout=2.0)
if stream_thread.is_alive():
logger.warning(f"Stream thread did not terminate within timeout")
except RuntimeError:
logger.debug(f"Could not join stream thread (may be current thread)")
# Release ownership
self.release_ownership(channel_id)
logger.info(f"Released ownership of channel {channel_id}")
# Log channel stop event (after cleanup, before releasing ownership section ends)
try:
channel_obj = Channel.objects.get(uuid=channel_id)
# Calculate runtime and get total bytes from metadata
runtime = None
total_bytes = None
if self.redis_client:
metadata_key = RedisKeys.channel_metadata(channel_id)
metadata = self.redis_client.hgetall(metadata_key)
if metadata:
# Calculate runtime from init_time
if b'init_time' in metadata:
try:
init_time = float(metadata[b'init_time'].decode('utf-8'))
runtime = round(time.time() - init_time, 2)
except Exception:
pass
# Get total bytes transferred
if b'total_bytes' in metadata:
try:
total_bytes = int(metadata[b'total_bytes'].decode('utf-8'))
except Exception:
pass
log_system_event(
'channel_stop',
channel_id=channel_id,
channel_name=channel_obj.name,
runtime=runtime,
total_bytes=total_bytes
)
except Exception as e:
logger.error(f"Could not log channel stop event: {e}")
# Always clean up local resources - WITH SAFE CHECKS
if channel_id in self.stream_managers:
del self.stream_managers[channel_id]
logger.info(f"Removed stream manager for channel {channel_id}")
# Stop buffer and ensure all its timers are cancelled - SAFE CHECK HERE
if channel_id in self.stream_buffers:
buffer = self.stream_buffers[channel_id]
# Call stop on buffer to properly shut it down
if hasattr(buffer, 'stop'):
try:
buffer.stop()
logger.debug(f"Buffer for channel {channel_id} properly stopped")
except Exception as e:
logger.error(f"Error stopping buffer: {e}")
# Save reference and check again before deleting
try:
if channel_id in self.stream_buffers: # Check again to prevent race conditions
del self.stream_buffers[channel_id]
logger.info(f"Removed stream buffer for channel {channel_id}")
except KeyError:
logger.debug(f"Buffer for channel {channel_id} already removed")
# Clean up client manager - SAFE CHECK HERE TOO
if channel_id in self.client_managers:
try:
client_manager = self.client_managers[channel_id]
# Stop the heartbeat thread before deleting
if hasattr(client_manager, 'stop'):
client_manager.stop()
del self.client_managers[channel_id]
logger.info(f"Removed client manager for channel {channel_id}")
except KeyError:
logger.debug(f"Client manager for channel {channel_id} already removed")
# Clean up Redis keys
self._clean_redis_keys(channel_id)
return True
except Exception as e:
logger.error(f"Error stopping channel {channel_id}: {e}")
return False
def check_inactive_channels(self):
"""Check for inactive channels (no clients) and stop them"""
channels_to_stop = []
for channel_id, client_manager in self.client_managers.items():
if client_manager.get_client_count() == 0:
channels_to_stop.append(channel_id)
for channel_id in channels_to_stop:
logger.info(f"Auto-stopping inactive channel {channel_id}")
self.stop_channel(channel_id)
def _cleanup_channel(self, channel_id: str) -> None:
"""Remove channel resources"""
# Removed reference to non-existent fetch_threads collection
for collection in [self.stream_managers, self.stream_buffers, self.client_managers]:
collection.pop(channel_id, None)
def shutdown(self) -> None:
"""Stop all channels and cleanup"""
for channel_id in list(self.stream_managers.keys()):
self.stop_channel(channel_id)
def _start_cleanup_thread(self):
"""Start background thread to maintain ownership and clean up resources"""
def cleanup_task():
while True:
try:
# Send worker heartbeat first
if self.redis_client:
worker_heartbeat_key = f"ts_proxy:worker:{self.worker_id}:heartbeat"
self._execute_redis_command(
lambda: self.redis_client.setex(worker_heartbeat_key, 30, str(time.time()))
)
# Refresh channel registry
self.refresh_channel_registry()
# Create a unified list of all channels we have locally
all_local_channels = set(self.stream_managers.keys()) | set(self.client_managers.keys())
# Single loop through all channels - process each exactly once
for channel_id in list(all_local_channels):
if self.am_i_owner(channel_id):
# === OWNER CHANNEL HANDLING ===
# Extend ownership lease
self.extend_ownership(channel_id)
# Get channel state from metadata hash
channel_state = "unknown"
if self.redis_client:
metadata_key = RedisKeys.channel_metadata(channel_id)
metadata = self.redis_client.hgetall(metadata_key)
if metadata and b'state' in metadata:
channel_state = metadata[b'state'].decode('utf-8')
# Check if channel has any clients left
total_clients = 0
if channel_id in self.client_managers:
client_manager = self.client_managers[channel_id]
total_clients = client_manager.get_total_client_count()
else:
# This can happen during reconnection attempts or crashes
# Check Redis directly for any connected clients
if self.redis_client:
client_set_key = RedisKeys.clients(channel_id)
total_clients = self.redis_client.scard(client_set_key) or 0
if total_clients == 0:
logger.warning(f"Channel {channel_id} is missing client_manager but we're the owner with 0 clients - will trigger cleanup")
# Log client count periodically
if time.time() % 30 < 1: # Every ~30 seconds
logger.info(f"Channel {channel_id} has {total_clients} clients, state: {channel_state}")
# If in connecting or waiting_for_clients state, check grace period
if channel_state in [ChannelState.CONNECTING, ChannelState.WAITING_FOR_CLIENTS]:
# Check if channel is already stopping
if self.redis_client:
stop_key = RedisKeys.channel_stopping(channel_id)
if self.redis_client.exists(stop_key):
logger.debug(f"Channel {channel_id} is already stopping - skipping monitor shutdown")
continue
# Get connection_ready_time from metadata (indicates if channel reached ready state)
connection_ready_time = None
if metadata and b'connection_ready_time' in metadata:
try:
connection_ready_time = float(metadata[b'connection_ready_time'].decode('utf-8'))
except (ValueError, TypeError):
pass
if total_clients == 0:
# Check if we have a connection_attempt timestamp (set when CONNECTING starts)
connection_attempt_time = None
attempt_key = RedisKeys.connection_attempt(channel_id)
if self.redis_client:
attempt_value = self.redis_client.get(attempt_key)
if attempt_value:
try:
connection_attempt_time = float(attempt_value.decode('utf-8'))
except (ValueError, TypeError):
pass
# Also get init time as a fallback
init_time = None
if metadata and b'init_time' in metadata:
try:
init_time = float(metadata[b'init_time'].decode('utf-8'))
except (ValueError, TypeError):
pass
# Use whichever timestamp we have (prefer connection_attempt as it's more recent)
start_time = connection_attempt_time or init_time
if start_time:
# Check which timeout to apply based on channel lifecycle
if connection_ready_time:
# Already reached ready - use shutdown_delay
time_since_ready = time.time() - connection_ready_time
shutdown_delay = ConfigHelper.channel_shutdown_delay()
if time_since_ready > shutdown_delay:
logger.warning(
f"Channel {channel_id} in {channel_state} state with 0 clients for {time_since_ready:.1f}s "
f"(after reaching ready, shutdown_delay: {shutdown_delay}s) - stopping channel"
)
self.stop_channel(channel_id)
continue
else:
# Never reached ready - use grace_period timeout
time_since_start = time.time() - start_time
connecting_timeout = ConfigHelper.channel_init_grace_period()
if time_since_start > connecting_timeout:
logger.warning(
f"Channel {channel_id} stuck in {channel_state} state for {time_since_start:.1f}s "
f"with no clients (timeout: {connecting_timeout}s) - stopping channel due to upstream issues"
)
self.stop_channel(channel_id)
continue
elif connection_ready_time:
# We have clients now, but check grace period for state transition
grace_period = ConfigHelper.channel_init_grace_period()
time_since_ready = time.time() - connection_ready_time
logger.debug(f"GRACE PERIOD CHECK: Channel {channel_id} in {channel_state} state, "
f"time_since_ready={time_since_ready:.1f}s, grace_period={grace_period}s, "
f"total_clients={total_clients}")
if time_since_ready <= grace_period:
# Still within grace period
logger.debug(f"Channel {channel_id} in grace period - {time_since_ready:.1f}s of {grace_period}s elapsed")
continue
else:
# Grace period expired with clients - mark channel as active
logger.info(f"Grace period expired with {total_clients} clients - marking channel {channel_id} as active")
if self.update_channel_state(channel_id, ChannelState.ACTIVE, {
"grace_period_ended_at": str(time.time()),
"clients_at_activation": str(total_clients)
}):
logger.info(f"Channel {channel_id} activated with {total_clients} clients after grace period")
# If active and no clients, start normal shutdown procedure
elif channel_state not in [ChannelState.CONNECTING, ChannelState.WAITING_FOR_CLIENTS] and total_clients == 0:
# Check if channel is already stopping
if self.redis_client:
stop_key = RedisKeys.channel_stopping(channel_id)
if self.redis_client.exists(stop_key):
logger.debug(f"Channel {channel_id} is already stopping - skipping monitor shutdown")
continue
# Check if there's a pending no-clients timeout
disconnect_key = RedisKeys.last_client_disconnect(channel_id)
disconnect_time = None
if self.redis_client:
disconnect_value = self.redis_client.get(disconnect_key)
if disconnect_value:
try:
disconnect_time = float(disconnect_value.decode('utf-8'))
except (ValueError, TypeError) as e:
logger.error(f"Invalid disconnect time for channel {channel_id}: {e}")
current_time = time.time()
if not disconnect_time:
# First time seeing zero clients, set timestamp
if self.redis_client:
self.redis_client.setex(disconnect_key, 60, str(current_time))
logger.warning(f"No clients detected for channel {channel_id}, starting shutdown timer")
elif current_time - disconnect_time > ConfigHelper.channel_shutdown_delay():
# We've had no clients for the shutdown delay period
logger.warning(f"No clients for {current_time - disconnect_time:.1f}s, stopping channel {channel_id}")
self.stop_channel(channel_id)
else:
# Still in shutdown delay period
logger.debug(f"Channel {channel_id} shutdown timer: "
f"{current_time - disconnect_time:.1f}s of "
f"{ConfigHelper.channel_shutdown_delay()}s elapsed")
else:
# There are clients or we're still connecting - clear any disconnect timestamp
if self.redis_client:
self.redis_client.delete(f"ts_proxy:channel:{channel_id}:last_client_disconnect_time")
else:
# === NON-OWNER CHANNEL HANDLING ===
# For channels we don't own, check if they've been stopped/cleaned up in Redis
if self.redis_client:
# Method 1: Check for stopping key
stop_key = RedisKeys.channel_stopping(channel_id)
if self.redis_client.exists(stop_key):
logger.debug(f"Non-owner cleanup: Channel {channel_id} has stopping flag in Redis, cleaning up local resources")
self._cleanup_local_resources(channel_id)
continue
# Method 2: Check if owner still exists
owner_key = RedisKeys.channel_owner(channel_id)
if not self.redis_client.exists(owner_key):
logger.debug(f"Non-owner cleanup: Channel {channel_id} has no owner in Redis, cleaning up local resources")
self._cleanup_local_resources(channel_id)
continue
# Method 3: Check if metadata still exists
metadata_key = RedisKeys.channel_metadata(channel_id)
if not self.redis_client.exists(metadata_key):
logger.debug(f"Non-owner cleanup: Channel {channel_id} has no metadata in Redis, cleaning up local resources")
self._cleanup_local_resources(channel_id)
continue
# Check for local client count - if zero, clean up our local resources
if channel_id in self.client_managers:
if self.client_managers[channel_id].get_client_count() == 0:
# We're not the owner, and we have no local clients - clean up our resources
logger.debug(f"Non-owner cleanup: Channel {channel_id} has no local clients, cleaning up local resources")
self._cleanup_local_resources(channel_id)
else:
# This shouldn't happen, but clean up anyway
logger.warning(f"Non-owner cleanup: Channel {channel_id} has no client_manager entry, cleaning up local resources")
self._cleanup_local_resources(channel_id)
except Exception as e:
logger.error(f"Error in cleanup thread: {e}", exc_info=True)
# Periodically check for orphaned channels (every 30 seconds)
if hasattr(self, '_last_orphan_check'):
if time.time() - self._last_orphan_check > 30:
try:
self._check_orphaned_metadata()
self._last_orphan_check = time.time()
except Exception as orphan_error:
logger.error(f"Error checking orphaned metadata: {orphan_error}", exc_info=True)
else:
self._last_orphan_check = time.time()
gevent.sleep(ConfigHelper.cleanup_check_interval()) # REPLACE: time.sleep(ConfigHelper.cleanup_check_interval())
thread = threading.Thread(target=cleanup_task, daemon=True)
thread.name = "ts-proxy-cleanup"
thread.start()
logger.info(f"Started TS proxy cleanup thread (interval: {ConfigHelper.cleanup_check_interval()}s)")
def _check_orphaned_channels(self):
"""Check for orphaned channels in Redis (owner worker crashed)"""
if not self.redis_client:
return
try:
# Get all active channel keys
channel_pattern = "ts_proxy:channel:*:metadata"
channel_keys = self.redis_client.keys(channel_pattern)
for key in channel_keys:
try:
channel_id = key.decode('utf-8').split(':')[2]
# Check if this channel has an owner
owner = self.get_channel_owner(channel_id)
if not owner:
# Check if there are any clients
client_set_key = RedisKeys.clients(channel_id)
client_count = self.redis_client.scard(client_set_key) or 0
if client_count > 0:
# Orphaned channel with clients - we could take ownership
logger.info(f"Found orphaned channel {channel_id} with {client_count} clients")
else:
# Orphaned channel with no clients - clean it up
logger.info(f"Cleaning up orphaned channel {channel_id}")
# If we have it locally, stop it properly to clean up processes
if channel_id in self.stream_managers or channel_id in self.client_managers:
logger.info(f"Orphaned channel {channel_id} is local - calling stop_channel")
self.stop_channel(channel_id)
else:
# Just clean up Redis keys for remote channels
self._clean_redis_keys(channel_id)
except Exception as e:
logger.error(f"Error processing channel key {key}: {e}")
except Exception as e:
logger.error(f"Error checking orphaned channels: {e}")
def _check_orphaned_metadata(self):
"""
Check for metadata entries that have no owner and no clients.
This catches zombie channels that weren't cleaned up properly.
"""
if not self.redis_client:
return
try:
# Get all channel metadata keys
channel_pattern = "ts_proxy:channel:*:metadata"
channel_keys = self.redis_client.keys(channel_pattern)
for key in channel_keys:
try:
channel_id = key.decode('utf-8').split(':')[2]
# Get metadata first
metadata = self.redis_client.hgetall(key)
if not metadata:
# Empty metadata - clean it up
logger.warning(f"Found empty metadata for channel {channel_id} - cleaning up")
# If we have it locally, stop it properly
if channel_id in self.stream_managers or channel_id in self.client_managers:
self.stop_channel(channel_id)
else:
self._clean_redis_keys(channel_id)
continue
# Get owner
owner = metadata.get(b'owner', b'').decode('utf-8') if b'owner' in metadata else ''
# Check if owner is still alive
owner_alive = False
if owner:
owner_heartbeat_key = f"ts_proxy:worker:{owner}:heartbeat"
owner_alive = self.redis_client.exists(owner_heartbeat_key)
# Check client count
client_set_key = RedisKeys.clients(channel_id)
client_count = self.redis_client.scard(client_set_key) or 0
# If no owner and no clients, clean it up
if not owner_alive and client_count == 0:
state = metadata.get(b'state', b'unknown').decode('utf-8') if b'state' in metadata else 'unknown'
logger.warning(f"Found orphaned metadata for channel {channel_id} (state: {state}, owner: {owner}, clients: {client_count}) - cleaning up")
# If we have it locally, stop it properly to clean up transcode/proxy processes
if channel_id in self.stream_managers or channel_id in self.client_managers:
logger.info(f"Channel {channel_id} is local - calling stop_channel to clean up processes")
self.stop_channel(channel_id)
else:
# Just clean up Redis keys for remote channels
self._clean_redis_keys(channel_id)
elif not owner_alive and client_count > 0:
# Owner is gone but clients remain - just log for now
logger.warning(f"Found orphaned channel {channel_id} with {client_count} clients but no owner - may need ownership takeover")
except Exception as e:
logger.error(f"Error processing metadata key {key}: {e}", exc_info=True)
except Exception as e:
logger.error(f"Error checking orphaned metadata: {e}", exc_info=True)
def _clean_redis_keys(self, channel_id):
"""Clean up all Redis keys for a channel more efficiently"""
# Release the channel, stream, and profile keys from the channel
try:
channel = Channel.objects.get(uuid=channel_id)
channel.release_stream()
except:
stream = Stream.objects.get(stream_hash=channel_id)
stream.release_stream()
if not self.redis_client:
return 0
try:
# Define key patterns to scan for
patterns = [
f"ts_proxy:channel:{channel_id}:*", # All channel keys
RedisKeys.events_channel(channel_id) # Event channel
]
total_deleted = 0
for pattern in patterns:
cursor = 0
while True:
cursor, keys = self.redis_client.scan(cursor, match=pattern, count=100)
if keys:
self.redis_client.delete(*keys)
total_deleted += len(keys)
# Exit when cursor returns to 0
if cursor == 0:
break
logger.info(f"Cleaned up {total_deleted} Redis keys for channel {channel_id}")
return total_deleted
except Exception as e:
logger.error(f"Error cleaning Redis keys for channel {channel_id}: {e}")
return 0
def refresh_channel_registry(self):
"""Refresh TTL for active channels using standard keys"""
if not self.redis_client:
return
# Refresh registry entries for channels we own
for channel_id in list(self.stream_buffers.keys()):
# Use standard key pattern
metadata_key = RedisKeys.channel_metadata(channel_id)
# Update activity timestamp in metadata only
self.redis_client.hset(metadata_key, "last_active", str(time.time()))
self.redis_client.expire(metadata_key, 30) # Reset TTL on metadata hash
logger.debug(f"Refreshed metadata TTL for channel {channel_id}")
def update_channel_state(self, channel_id, new_state, additional_fields=None):
"""Update channel state with proper history tracking and logging"""
if not self.redis_client:
return False
try:
metadata_key = RedisKeys.channel_metadata(channel_id)
# Get current state for logging
current_state = None
metadata = self.redis_client.hgetall(metadata_key)
if metadata and b'state' in metadata:
current_state = metadata[b'state'].decode('utf-8')
# Only update if state is actually changing
if current_state == new_state:
logger.debug(f"Channel {channel_id} state unchanged: {current_state}")
return True
# Prepare update data
update_data = {
"state": new_state,
"state_changed_at": str(time.time())
}
# Add optional additional fields
if additional_fields:
update_data.update(additional_fields)
# Update the metadata
self.redis_client.hset(metadata_key, mapping=update_data)
# Log the transition
logger.info(f"Channel {channel_id} state transition: {current_state or 'None'} -> {new_state}")
return True
except Exception as e:
logger.error(f"Error updating channel state: {e}")
return False
def _cleanup_local_resources(self, channel_id):
"""Clean up local resources for a channel without affecting Redis keys"""
try:
# Clean up local objects only
if channel_id in self.stream_managers:
if hasattr(self.stream_managers[channel_id], 'stop'):
self.stream_managers[channel_id].stop()
del self.stream_managers[channel_id]
logger.info(f"Non-owner cleanup: Removed stream manager for channel {channel_id}")
if channel_id in self.stream_buffers:
del self.stream_buffers[channel_id]
logger.info(f"Non-owner cleanup: Removed stream buffer for channel {channel_id}")
if channel_id in self.client_managers:
del self.client_managers[channel_id]
logger.info(f"Non-owner cleanup: Removed client manager for channel {channel_id}")
return True
except Exception as e:
logger.error(f"Error cleaning up local resources: {e}", exc_info=True)
return False