mirror of
https://github.com/Dispatcharr/Dispatcharr.git
synced 2026-01-23 02:35:14 +00:00
1202 lines
60 KiB
Python
1202 lines
60 KiB
Python
"""
|
|
Transport Stream (TS) Proxy Server
|
|
Handles live TS stream proxying with support for:
|
|
- Stream switching
|
|
- Buffer management
|
|
- Multiple client connections
|
|
- Connection state tracking
|
|
"""
|
|
|
|
import threading
|
|
import logging
|
|
import socket
|
|
import random
|
|
import time
|
|
import sys
|
|
import os
|
|
import json
|
|
import gevent # Add gevent import
|
|
from typing import Dict, Optional, Set
|
|
from apps.proxy.config import TSConfig as Config
|
|
from apps.channels.models import Channel, Stream
|
|
from core.utils import RedisClient
|
|
from redis.exceptions import ConnectionError, TimeoutError
|
|
from .stream_manager import StreamManager
|
|
from .stream_buffer import StreamBuffer
|
|
from .client_manager import ClientManager
|
|
from .redis_keys import RedisKeys
|
|
from .constants import ChannelState, EventType, StreamType
|
|
from .config_helper import ConfigHelper
|
|
from .utils import get_logger
|
|
|
|
logger = get_logger()
|
|
|
|
class ProxyServer:
|
|
"""Manages TS proxy server instance with worker coordination"""
|
|
_instance = None
|
|
|
|
@classmethod
def get_instance(cls):
    """Return the shared server instance, creating it on first use.

    The instance is cached on ``cls._instance``; later calls return the same
    object without constructing a new one. No lock guards the first-use
    check in this method.

    Returns:
        The cached server instance.
    """
    if cls._instance is None:
        # StreamManager, StreamBuffer and ClientManager are imported at module
        # level, so nothing needs to be imported again here.
        cls._instance = cls()

    return cls._instance
|
|
|
|
def __init__(self):
    """Set up per-worker state, obtain a Redis client, and start background threads.

    The worker ID is ``<hostname>:<pid>``. If no Redis client can be obtained,
    ``self.redis_client`` is left as None; the cleanup thread and the event
    listener are started either way (the listener returns immediately without
    a client).
    """
    # Per-channel registries, keyed by channel ID
    self.stream_managers = {}
    self.stream_buffers = {}
    self.client_managers = {}

    # Identify this worker by host name and process ID
    self.worker_id = f"{socket.gethostname()}:{os.getpid()}"

    # Redis connection bookkeeping
    self.redis_client = None
    self.redis_connection_attempts = 0
    self.redis_max_retries = 3
    self.redis_retry_interval = 5  # seconds

    try:
        self.redis_client = RedisClient.get_client()
        if self.redis_client is None:
            # First attempt produced no client: retry with explicit retry settings
            self._setup_redis_connection()
        else:
            logger.info("Using dedicated Redis client for proxy server")
            logger.info(f"Worker ID: {self.worker_id}")
    except Exception as init_error:
        logger.error(f"Failed to initialize Redis: {init_error}")
        self.redis_client = None

    # Background maintenance thread
    self.cleanup_interval = getattr(Config, 'CLEANUP_INTERVAL', 60)
    self._start_cleanup_thread()

    # Redis pub/sub listener for events published by other workers
    self._start_event_listener()
|
|
|
|
def _setup_redis_connection(self):
    """Request a Redis client with this server's retry settings and log the outcome.

    The result of ``RedisClient.get_client`` is stored on ``self.redis_client``
    whether or not it is usable.
    """
    client = RedisClient.get_client(
        max_retries=self.redis_max_retries,
        retry_interval=self.redis_retry_interval,
    )
    self.redis_client = client

    if not client:
        logger.error(f"Failed to connect to Redis after {self.redis_max_retries} attempts")
        return

    logger.info("Successfully connected to Redis using utility function")
    logger.info(f"Worker ID: {self.worker_id}")
|
|
|
|
def _execute_redis_command(self, command_func, *args, **kwargs):
    """Call ``command_func`` and retry it once after reconnecting on connection loss.

    Args:
        command_func: Callable performing the Redis operation.
        *args, **kwargs: Forwarded to ``command_func``.

    Returns:
        The callable's result, or None when there is no Redis client, the
        command fails, or the reconnect/retry does not succeed.
    """
    if not self.redis_client:
        return None

    try:
        return command_func(*args, **kwargs)
    except (ConnectionError, TimeoutError) as conn_error:
        logger.warning(f"Redis connection lost: {conn_error}. Attempting to reconnect...")
    except Exception as command_error:
        logger.error(f"Redis command error: {command_error}")
        return None

    # Only reached after a connection/timeout failure: reconnect, then retry once
    try:
        self.redis_connection_attempts = 0
        self._setup_redis_connection()
        if self.redis_client:
            return command_func(*args, **kwargs)
    except Exception as reconnect_error:
        logger.error(f"Failed to reconnect to Redis: {reconnect_error}")
    return None
|
|
|
|
def _start_event_listener(self):
    """Start a daemon thread that consumes ``ts_proxy:events:*`` pub/sub messages.

    Returns immediately (starting nothing) when there is no Redis client.
    Otherwise a thread named ``redis-event-listener`` subscribes to the event
    pattern and dispatches each JSON message by its ``event`` field:

    - CLIENT_CONNECTED / CLIENT_DISCONNECTED / STREAM_SWITCH are handled only
      when this worker owns the channel.
    - CHANNEL_STOP / CLIENT_STOP are handled in the ``elif`` branches that follow.

    Connection and timeout errors trigger a reconnect with exponential backoff
    plus jitter; any other error in the outer loop waits 5 seconds and retries.

    NOTE(review): the nesting below was reconstructed from a source whose
    indentation was lost. In particular the CHANNEL_STOP / CLIENT_STOP branches
    are attached to ``if self.am_i_owner(...)`` (their log messages do not say
    "Owner received"), which means the owning worker never reaches them -
    confirm against the canonical file.
    """
    if not self.redis_client:
        return

    def event_listener():
        retry_count = 0
        base_retry_delay = 1  # Start with 1 second delay
        max_retry_delay = 30  # Cap at 30 seconds

        while True:
            # Fresh handles per attempt so the error path knows exactly what to close
            pubsub = None
            pubsub_client = None
            try:
                # Use dedicated PubSub client for event listener
                pubsub_client = RedisClient.get_pubsub_client()
                if pubsub_client:
                    logger.info("Using dedicated Redis PubSub client for event listener")
                else:
                    # Fall back to creating a dedicated client if utility fails
                    logger.warning("Utility function for PubSub client failed, creating direct connection")
                    from django.conf import settings
                    import redis

                    redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost'))
                    redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379)))
                    redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0)))

                    pubsub_client = redis.Redis(
                        host=redis_host,
                        port=redis_port,
                        db=redis_db,
                        socket_timeout=60,
                        socket_connect_timeout=10,
                        socket_keepalive=True,
                        health_check_interval=30
                    )
                    logger.info("Created fallback Redis PubSub client for event listener")

                # Test connection before subscribing
                pubsub_client.ping()

                # Create a pubsub instance from the client
                pubsub = pubsub_client.pubsub()
                pubsub.psubscribe("ts_proxy:events:*")

                logger.info("Started Redis event listener for client activity")

                # Reset retry count on successful connection
                retry_count = 0

                for message in pubsub.listen():
                    if message["type"] != "pmessage":
                        continue

                    try:
                        channel = message["channel"].decode("utf-8")
                        data = json.loads(message["data"].decode("utf-8"))

                        event_type = data.get("event")
                        channel_id = data.get("channel_id")

                        if channel_id and event_type:
                            # For owner, update client status immediately
                            if self.am_i_owner(channel_id):
                                if event_type == EventType.CLIENT_CONNECTED:
                                    logger.debug(f"Owner received {EventType.CLIENT_CONNECTED} event for channel {channel_id}")
                                    # Reset any disconnect timer
                                    disconnect_key = RedisKeys.last_client_disconnect(channel_id)
                                    self.redis_client.delete(disconnect_key)

                                elif event_type == EventType.CLIENT_DISCONNECTED:
                                    logger.debug(f"Owner received {EventType.CLIENT_DISCONNECTED} event for channel {channel_id}")
                                    # Check if any clients remain
                                    if channel_id in self.client_managers:
                                        # Verify the client count directly in Redis
                                        client_set_key = RedisKeys.clients(channel_id)
                                        total = self.redis_client.scard(client_set_key) or 0

                                        if total == 0:
                                            logger.debug(f"No clients left after disconnect event - stopping channel {channel_id}")
                                            # Set the disconnect timer for other workers to see
                                            disconnect_key = RedisKeys.last_client_disconnect(channel_id)
                                            self.redis_client.setex(disconnect_key, 60, str(time.time()))

                                            # Get configured shutdown delay or default
                                            shutdown_delay = getattr(Config, 'CHANNEL_SHUTDOWN_DELAY', 0)

                                            if shutdown_delay > 0:
                                                logger.info(f"Waiting {shutdown_delay}s before stopping channel...")
                                                gevent.sleep(shutdown_delay)

                                                # Re-check client count before stopping
                                                total = self.redis_client.scard(client_set_key) or 0
                                                if total > 0:
                                                    logger.info("New clients connected during shutdown delay - aborting shutdown")
                                                    self.redis_client.delete(disconnect_key)
                                                    # Skip only this message. A `return` here would
                                                    # exit event_listener() and end the listener
                                                    # thread, so no further events would be handled.
                                                    continue

                                            # Stop the channel directly
                                            self.stop_channel(channel_id)

                                elif event_type == EventType.STREAM_SWITCH:
                                    logger.info(f"Owner received {EventType.STREAM_SWITCH} request for channel {channel_id}")
                                    # Handle stream switch request
                                    new_url = data.get("url")
                                    user_agent = data.get("user_agent")

                                    if new_url and channel_id in self.stream_managers:
                                        # Update metadata in Redis
                                        if self.redis_client:
                                            metadata_key = RedisKeys.channel_metadata(channel_id)
                                            self.redis_client.hset(metadata_key, "url", new_url)
                                            if user_agent:
                                                self.redis_client.hset(metadata_key, "user_agent", user_agent)

                                            # Set switch status
                                            status_key = RedisKeys.switch_status(channel_id)
                                            self.redis_client.set(status_key, "switching")

                                        # Perform the stream switch
                                        stream_manager = self.stream_managers[channel_id]
                                        success = stream_manager.update_url(new_url)

                                        if success:
                                            logger.info(f"Stream switch initiated for channel {channel_id}")

                                            # Publish confirmation
                                            switch_result = {
                                                "event": EventType.STREAM_SWITCHED,
                                                "channel_id": channel_id,
                                                "success": True,
                                                "url": new_url,
                                                "timestamp": time.time()
                                            }
                                            self.redis_client.publish(
                                                f"ts_proxy:events:{channel_id}",
                                                json.dumps(switch_result)
                                            )

                                            # Update status
                                            if self.redis_client:
                                                self.redis_client.set(status_key, "switched")
                                        else:
                                            logger.error(f"Failed to switch stream for channel {channel_id}")

                                            # Publish failure
                                            switch_result = {
                                                "event": EventType.STREAM_SWITCHED,
                                                "channel_id": channel_id,
                                                "success": False,
                                                "url": new_url,
                                                "timestamp": time.time()
                                            }
                                            self.redis_client.publish(
                                                f"ts_proxy:events:{channel_id}",
                                                json.dumps(switch_result)
                                            )
                            elif event_type == EventType.CHANNEL_STOP:
                                logger.info(f"Received {EventType.CHANNEL_STOP} event for channel {channel_id}")
                                # First mark channel as stopping in Redis
                                if self.redis_client:
                                    # Set stopping state in metadata
                                    metadata_key = RedisKeys.channel_metadata(channel_id)
                                    if self.redis_client.exists(metadata_key):
                                        self.redis_client.hset(metadata_key, mapping={
                                            "state": ChannelState.STOPPING,
                                            "state_changed_at": str(time.time())
                                        })

                                # If we have local resources for this channel, clean them up
                                if channel_id in self.stream_buffers or channel_id in self.client_managers:
                                    # Use existing stop_channel method
                                    logger.info(f"Stopping local resources for channel {channel_id}")
                                    self.stop_channel(channel_id)

                                # Acknowledge stop by publishing a response
                                stop_response = {
                                    "event": EventType.CHANNEL_STOPPED,
                                    "channel_id": channel_id,
                                    "worker_id": self.worker_id,
                                    "timestamp": time.time()
                                }
                                self.redis_client.publish(
                                    f"ts_proxy:events:{channel_id}",
                                    json.dumps(stop_response)
                                )
                            elif event_type == EventType.CLIENT_STOP:
                                client_id = data.get("client_id")
                                if client_id and channel_id:
                                    logger.info(f"Received request to stop client {client_id} on channel {channel_id}")

                                    # Both remove from client manager AND set a key for the generator to detect
                                    if channel_id in self.client_managers:
                                        client_manager = self.client_managers[channel_id]
                                        if client_id in client_manager.clients:
                                            client_manager.remove_client(client_id)
                                            logger.info(f"Removed client {client_id} from client manager")

                                    # Set a Redis key for the generator to detect
                                    if self.redis_client:
                                        stop_key = RedisKeys.client_stop(channel_id, client_id)
                                        self.redis_client.setex(stop_key, 30, "true")  # 30 second TTL
                                        logger.info(f"Set stop key for client {client_id}")
                    except Exception as e:
                        # One bad message must not take the listener down
                        logger.error(f"Error processing event message: {e}")

            except (ConnectionError, TimeoutError) as e:
                # Calculate exponential backoff with jitter
                retry_count += 1
                delay = min(base_retry_delay * (2 ** (retry_count - 1)), max_retry_delay)
                # Add some randomness to prevent thundering herd
                jitter = random.uniform(0, 0.5 * delay)
                final_delay = delay + jitter

                logger.error(f"Error in event listener: {e}. Retrying in {final_delay:.1f}s (attempt {retry_count})")
                gevent.sleep(final_delay)

                # Best-effort cleanup of the failed connection; each handle is
                # closed independently so one failure does not skip the other
                for handle in (pubsub, pubsub_client):
                    if handle is None:
                        continue
                    try:
                        handle.close()
                    except Exception as close_error:
                        logger.debug(f"Ignoring error while closing stale Redis handle: {close_error}")

            except Exception as e:
                logger.error(f"Error in event listener: {e}")
                # Add a short delay to prevent rapid retries on persistent errors
                gevent.sleep(5)

    thread = threading.Thread(target=event_listener, daemon=True)
    thread.name = "redis-event-listener"
    thread.start()
|
|
|
|
def get_channel_owner(self, channel_id):
    """Return the worker ID stored under the channel's owner key, or None.

    Args:
        channel_id: Channel whose owner key is looked up.

    Returns:
        The owner's worker ID as ``str``; None when there is no Redis client,
        the key is absent, or the lookup fails.
    """
    if not self.redis_client:
        return None

    try:
        lock_key = RedisKeys.channel_owner(channel_id)
        # Fetch once and decode that same value, so a key that expires mid-call
        # can never lead to calling .decode() on None
        owner = self._execute_redis_command(lambda: self.redis_client.get(lock_key))
        return owner.decode('utf-8') if owner else None
    except Exception as e:
        logger.error(f"Error getting channel owner: {e}")
        return None
|
|
|
|
def am_i_owner(self, channel_id):
    """Return True when the channel's recorded owner equals this worker's ID."""
    return self.get_channel_owner(channel_id) == self.worker_id
|
|
|
|
def try_acquire_ownership(self, channel_id, ttl=30):
    """Try to become the owner of a channel via a Redis lock key.

    Args:
        channel_id: Channel to claim.
        ttl: Lifetime of the ownership key in seconds.

    Returns:
        True when this worker holds (or is assumed to hold) ownership:
        no Redis client, the Redis command failed, the lock was newly
        acquired, or this worker already held it (TTL is refreshed).
        False when another worker holds the lock or an unexpected error occurs.
    """
    if not self.redis_client:
        return True  # If no Redis, always become owner

    try:
        # Create a lock key with proper namespace
        lock_key = RedisKeys.channel_owner(channel_id)

        # SET with NX and EX creates the key and its expiry in one atomic command,
        # so a crash can no longer leave a lock without a TTL. redis-py returns
        # None when NX prevents the write; bool() maps that to False so that None
        # keeps meaning "the command itself failed" below.
        acquired = self._execute_redis_command(
            lambda: bool(self.redis_client.set(lock_key, self.worker_id, nx=True, ex=ttl))
        )

        if acquired is None:  # Redis command failed
            logger.warning("Redis command failed during ownership acquisition - assuming ownership")
            return True

        if acquired:
            logger.info(f"Worker {self.worker_id} acquired ownership of channel {channel_id}")
            return True

        # If not acquired, check if we already own it (might be a retry)
        current_owner = self._execute_redis_command(
            lambda: self.redis_client.get(lock_key)
        )
        if current_owner and current_owner.decode('utf-8') == self.worker_id:
            # Refresh TTL
            self._execute_redis_command(
                lambda: self.redis_client.expire(lock_key, ttl)
            )
            logger.info(f"Worker {self.worker_id} refreshed ownership of channel {channel_id}")
            return True

        # Someone else owns it
        return False

    except Exception as e:
        logger.error(f"Error acquiring channel ownership: {e}")
        return False
|
|
|
|
def release_ownership(self, channel_id):
    """Delete the channel's owner key if this worker holds it, then flag the channel as stopping.

    Does nothing without a Redis client. Errors are logged, not raised.

    NOTE(review): the stopping key is written only in the "we are the owner"
    branch here; the source's indentation was lost, so confirm that placement.
    """
    if not self.redis_client:
        return

    try:
        lock_key = RedisKeys.channel_owner(channel_id)

        # Never delete a lock that belongs to another worker
        holder = self.redis_client.get(lock_key)
        if not holder or holder.decode('utf-8') != self.worker_id:
            return

        self.redis_client.delete(lock_key)
        logger.info(f"Released ownership of channel {channel_id}")

        # Signal clients that the channel is going away (30 second TTL)
        stop_key = RedisKeys.channel_stopping(channel_id)
        self.redis_client.setex(stop_key, 30, "true")
        logger.info(f"Set stopping signal for channel {channel_id} clients")

    except Exception as release_error:
        logger.error(f"Error releasing channel ownership: {release_error}")
|
|
|
|
def extend_ownership(self, channel_id, ttl=30):
    """Reset the owner key's TTL when this worker still holds the channel.

    Args:
        channel_id: Channel whose lease is extended.
        ttl: New expiry in seconds.

    Returns:
        True if the expiry was refreshed; False when there is no Redis client,
        the key is missing or held by another worker, or an error occurs.
    """
    if not self.redis_client:
        return False

    try:
        lock_key = RedisKeys.channel_owner(channel_id)
        holder = self.redis_client.get(lock_key)

        # Not ours (or gone): leave the key alone
        if not holder or holder.decode('utf-8') != self.worker_id:
            return False

        self.redis_client.expire(lock_key, ttl)
        return True
    except Exception as extend_error:
        logger.error(f"Error extending ownership: {extend_error}")
        return False
|
|
|
|
def initialize_channel(self, url, channel_id, user_agent=None, transcode=False, stream_id=None):
    """Initialize a channel on this worker, becoming its owner when possible.

    Local StreamBuffer/ClientManager entries are created for the channel. If
    this worker obtains the Redis ownership lock it also writes the channel
    metadata, creates a StreamManager, and starts it on a daemon thread named
    ``stream-<channel_id>``.

    Args:
        url: Upstream URL. When falsy, the ``url`` (and ``user_agent``) fields
            of existing Redis metadata are used if present.
        channel_id: Channel identifier used for Redis keys and local registries.
        user_agent: Optional user agent passed to the StreamManager.
        transcode: Passed through to the StreamManager.
        stream_id: Optional stream ID stored in metadata and passed to the
            StreamManager.

    Returns:
        True when the channel is usable from this worker (as owner, or reading
        another worker's buffer); False when no URL is available or an
        exception occurs (ownership is released in the latter case).

    NOTE(review): block nesting was reconstructed from a source whose
    indentation was lost - confirm against the canonical file.
    """
    try:
        # IMPROVED: First check if channel is already being initialized by another process
        if self.redis_client:
            metadata_key = RedisKeys.channel_metadata(channel_id)
            if self.redis_client.exists(metadata_key):
                metadata = self.redis_client.hgetall(metadata_key)
                if b'state' in metadata:
                    state = metadata[b'state'].decode('utf-8')
                    active_states = [ChannelState.INITIALIZING, ChannelState.CONNECTING,
                                     ChannelState.WAITING_FOR_CLIENTS, ChannelState.ACTIVE]
                    if state in active_states:
                        logger.info(f"Channel {channel_id} already being initialized with state {state}")
                        # Create buffer and client manager only if we don't have them
                        if channel_id not in self.stream_buffers:
                            self.stream_buffers[channel_id] = StreamBuffer(channel_id, redis_client=self.redis_client)
                        if channel_id not in self.client_managers:
                            self.client_managers[channel_id] = ClientManager(
                                channel_id,
                                redis_client=self.redis_client,
                                worker_id=self.worker_id
                            )
                        # Channel is already live somewhere: attach locally and stop here
                        return True

        # Create buffer and client manager instances
        buffer = StreamBuffer(channel_id, redis_client=self.redis_client)
        client_manager = ClientManager(
            channel_id,
            redis_client=self.redis_client,
            worker_id=self.worker_id
        )

        # Store in local tracking
        self.stream_buffers[channel_id] = buffer
        self.client_managers[channel_id] = client_manager

        # IMPROVED: Set initializing state in Redis BEFORE any other operations
        # NOTE(review): this write happens before the ownership check below, so it
        # records this worker as "owner" in metadata even if another worker holds the lock
        if self.redis_client:
            # Set early initialization state to prevent race conditions
            metadata_key = RedisKeys.channel_metadata(channel_id)
            initial_metadata = {
                "state": ChannelState.INITIALIZING,
                "init_time": str(time.time()),
                "owner": self.worker_id
            }
            if stream_id:
                initial_metadata["stream_id"] = str(stream_id)
            self.redis_client.hset(metadata_key, mapping=initial_metadata)
            logger.info(f"Set early initializing state for channel {channel_id}")

        # Start from the caller's values; Redis metadata may fill in the gaps below
        channel_url = url
        channel_user_agent = user_agent
        channel_stream_id = stream_id  # Store the stream ID

        # First check if channel metadata already exists
        existing_metadata = None
        metadata_key = RedisKeys.channel_metadata(channel_id)

        if self.redis_client:
            existing_metadata = self.redis_client.hgetall(metadata_key)

        # If no url was passed, try to get from Redis
        if not url and existing_metadata:
            url_bytes = existing_metadata.get(b'url')
            if url_bytes:
                channel_url = url_bytes.decode('utf-8')

            ua_bytes = existing_metadata.get(b'user_agent')
            if ua_bytes:
                channel_user_agent = ua_bytes.decode('utf-8')

            # Get stream ID from metadata if not provided
            if not channel_stream_id and b'stream_id' in existing_metadata:
                try:
                    channel_stream_id = int(existing_metadata[b'stream_id'].decode('utf-8'))
                    logger.debug(f"Found stream_id {channel_stream_id} in metadata for channel {channel_id}")
                except (ValueError, TypeError) as e:
                    logger.debug(f"Could not parse stream_id from metadata: {e}")

        # Check if channel is already owned
        current_owner = self.get_channel_owner(channel_id)

        # Exit early if another worker owns the channel
        if current_owner and current_owner != self.worker_id:
            logger.info(f"Channel {channel_id} already owned by worker {current_owner}")
            logger.info(f"This worker ({self.worker_id}) will read from Redis buffer only")

            # Create buffer but not stream manager
            # (replaces the buffer/client manager instances registered above)
            buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client)
            self.stream_buffers[channel_id] = buffer

            # Create client manager with channel_id and redis_client
            client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id)
            self.client_managers[channel_id] = client_manager

            return True

        # Only continue with full initialization if URL is provided
        # or we can get it from Redis
        if not channel_url:
            logger.error(f"No URL available for channel {channel_id}")
            return False

        # Try to acquire ownership with Redis locking
        if not self.try_acquire_ownership(channel_id):
            # Another worker just acquired ownership
            logger.info(f"Another worker just acquired ownership of channel {channel_id}")

            # Create buffer but not stream manager
            buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client)
            self.stream_buffers[channel_id] = buffer

            # Create client manager with channel_id and redis_client
            client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id)
            self.client_managers[channel_id] = client_manager

            return True

        # We now own the channel - ONLY NOW should we set metadata with initializing state
        logger.info(f"Worker {self.worker_id} is now the owner of channel {channel_id}")

        if self.redis_client:
            # NOW create or update metadata with initializing state
            metadata = {
                "url": channel_url,
                "init_time": str(time.time()),
                "last_active": str(time.time()),
                "owner": self.worker_id,
                "state": ChannelState.INITIALIZING  # Use constant instead of string literal
            }
            if channel_user_agent:
                metadata["user_agent"] = channel_user_agent

            # CRITICAL FIX: Make sure stream_id is always set in metadata and properly logged
            if channel_stream_id:
                metadata["stream_id"] = str(channel_stream_id)
                logger.info(f"Storing stream_id {channel_stream_id} in metadata for channel {channel_id}")
            else:
                logger.warning(f"No stream_id provided for channel {channel_id} during initialization")

            # Set channel metadata BEFORE creating the StreamManager
            self.redis_client.hset(metadata_key, mapping=metadata)
            self.redis_client.expire(metadata_key, 3600)  # Increased TTL from 30 seconds to 1 hour

            # Verify the stream_id was set correctly in Redis
            stream_id_value = self.redis_client.hget(metadata_key, "stream_id")
            if stream_id_value:
                logger.info(f"Verified stream_id {stream_id_value.decode('utf-8')} is set in Redis for channel {channel_id}")
            else:
                logger.warning(f"Failed to set stream_id in Redis for channel {channel_id}")

        # Create stream buffer
        buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client)
        logger.debug(f"Created StreamBuffer for channel {channel_id}")
        self.stream_buffers[channel_id] = buffer

        # Only the owner worker creates the actual stream manager
        stream_manager = StreamManager(
            channel_id,
            channel_url,
            buffer,
            user_agent=channel_user_agent,
            transcode=transcode,
            stream_id=channel_stream_id,  # Pass stream ID to the manager
            worker_id=self.worker_id  # Pass worker_id explicitly to eliminate circular dependency
        )
        logger.info(f"Created StreamManager for channel {channel_id} with stream ID {channel_stream_id}")
        self.stream_managers[channel_id] = stream_manager

        # Create client manager with channel_id, redis_client AND worker_id
        client_manager = ClientManager(
            channel_id=channel_id,
            redis_client=self.redis_client,
            worker_id=self.worker_id
        )
        self.client_managers[channel_id] = client_manager

        # Start stream manager thread only for the owner
        # (the thread name is what stop_channel searches for when joining)
        thread = threading.Thread(target=stream_manager.run, daemon=True)
        thread.name = f"stream-{channel_id}"
        thread.start()
        logger.info(f"Started stream manager thread for channel {channel_id}")

        # If we're the owner, we need to set the channel state rather than starting a grace period immediately
        if self.am_i_owner(channel_id):
            self.update_channel_state(channel_id, ChannelState.CONNECTING, {
                "init_time": str(time.time()),
                "owner": self.worker_id
            })

            # Set connection attempt start time (key expires after 60 seconds)
            attempt_key = RedisKeys.connection_attempt(channel_id)
            self.redis_client.setex(attempt_key, 60, str(time.time()))

            logger.info(f"Channel {channel_id} in {ChannelState.CONNECTING} state - will start grace period after connection")
        return True

    except Exception as e:
        logger.error(f"Error initializing channel {channel_id}: {e}", exc_info=True)
        # Release ownership on failure
        self.release_ownership(channel_id)
        return False
|
|
|
|
def check_if_channel_exists(self, channel_id):
    """Report whether a channel exists in a usable state, cleaning up stale ones.

    Local registries are consulted first; otherwise the channel's Redis
    metadata decides. Channels whose owner heartbeat key is missing, or that
    have sat in a non-running, non-terminal state for over 60 seconds, are
    treated as zombies and cleaned. Orphaned keys without metadata are removed.

    Returns:
        True if the channel is known locally, is running with a live owner,
        or is still within its start-up window; False otherwise.
    """
    # Local memory wins
    if channel_id in self.stream_managers or channel_id in self.stream_buffers:
        return True

    client = self.redis_client
    if not client:
        return False

    metadata_key = RedisKeys.channel_metadata(channel_id)

    if not client.exists(metadata_key):
        # No metadata: any leftover key for this channel is an orphan
        leftover_keys = (
            RedisKeys.clients(channel_id),
            RedisKeys.buffer_index(channel_id),
            RedisKeys.channel_owner(channel_id),
        )
        if any(client.exists(key) for key in leftover_keys):
            logger.warning(f"Found orphaned keys for channel {channel_id} without metadata - cleaning up")
            self._clean_redis_keys(channel_id)
        return False

    metadata = client.hgetall(metadata_key)
    state = metadata.get(b'state', b'unknown').decode('utf-8')
    owner = metadata.get(b'owner', b'').decode('utf-8')

    running_states = (ChannelState.ACTIVE, ChannelState.WAITING_FOR_CLIENTS, ChannelState.CONNECTING)
    if state in running_states:
        # A running channel is only valid while its owner's heartbeat key exists
        if client.exists(f"ts_proxy:worker:{owner}:heartbeat"):
            return True
        logger.warning(f"Detected zombie channel {channel_id} - owner {owner} is no longer active")
        self._clean_zombie_channel(channel_id, metadata)
        return False

    terminal_states = (ChannelState.STOPPING, ChannelState.STOPPED, ChannelState.ERROR)
    if state in terminal_states:
        # Caller is expected to reinitialize
        logger.info(f"Channel {channel_id} exists but in terminal state: {state}")
        return False

    # Unknown or initializing: tolerate it unless it has been stuck too long
    if b'state_changed_at' in metadata:
        state_age = time.time() - float(metadata[b'state_changed_at'].decode('utf-8'))
        if state_age > 60:  # 60 seconds threshold
            logger.warning(f"Channel {channel_id} stuck in {state} state for {state_age:.1f}s - treating as zombie")
            self._clean_zombie_channel(channel_id, metadata)
            return False

    # Still in progress
    return True
|
|
|
|
def _clean_zombie_channel(self, channel_id, metadata=None):
    """Remove a zombie channel's Redis keys and release its stream allocation.

    Args:
        channel_id: Channel to clean; looked up as a Channel ``uuid`` first,
            then as a Stream ``stream_hash`` if that fails.
        metadata: Optional Redis metadata hash (bytes keys), used only for logging.

    Returns:
        True when cleanup ran to the end (even if releasing the stream failed);
        False if an unexpected error interrupted it.
    """
    try:
        logger.info(f"Cleaning up zombie channel {channel_id}")

        # Log what we know about the channel for debugging
        if metadata:
            zombie_state = metadata.get(b'state', b'unknown').decode('utf-8')
            zombie_owner = metadata.get(b'owner', b'unknown').decode('utf-8')
            logger.info(f"Zombie channel details - state: {zombie_state}, owner: {zombie_owner}")

        self._clean_redis_keys(channel_id)

        # Release the stream held in the database: Channel first, Stream as fallback
        try:
            Channel.objects.get(uuid=channel_id).release_stream()
            logger.info(f"Released stream allocation for zombie channel {channel_id}")
        except Exception:
            try:
                Stream.objects.get(stream_hash=channel_id).release_stream()
                logger.info(f"Released stream allocation for zombie channel {channel_id}")
            except Exception as release_error:
                logger.error(f"Error releasing stream for zombie channel {channel_id}: {release_error}")

        return True
    except Exception as cleanup_error:
        logger.error(f"Error cleaning zombie channel {channel_id}: {cleanup_error}", exc_info=True)
        return False
|
|
|
|
def stop_channel(self, channel_id):
    """Stop a channel on this worker and remove its local and Redis state.

    Sets a short-lived "stopping" key, stops the StreamManager and releases
    ownership if this worker is the owner, drops the local registries, and
    finally calls ``_clean_redis_keys``.

    Returns:
        True on completion, False if an exception was raised and logged.

    NOTE(review): block nesting was reconstructed from a source whose
    indentation was lost - confirm against the canonical file.
    """
    try:
        logger.info(f"Stopping channel {channel_id}")

        # Clients poll this key; it expires after 10 seconds
        if self.redis_client:
            self.redis_client.setex(RedisKeys.channel_stopping(channel_id), 10, "true")

        # Only the owner touches the provider connection
        if self.am_i_owner(channel_id):
            logger.info(f"This worker ({self.worker_id}) is the owner - closing provider connection")
            if channel_id in self.stream_managers:
                manager = self.stream_managers[channel_id]

                # Ask the manager to stop, falling back to flag + socket close
                if hasattr(manager, 'stop'):
                    manager.stop()
                else:
                    manager.running = False
                    if hasattr(manager, '_close_socket'):
                        manager._close_socket()

                # Locate the stream thread by the name given at start-up
                wanted_name = f"stream-{channel_id}"
                stream_thread = next(
                    (t for t in threading.enumerate() if t.name == wanted_name),
                    None,
                )

                if stream_thread is not None and stream_thread.is_alive():
                    logger.info("Waiting for stream thread to terminate")
                    try:
                        # Very short timeout to prevent hanging the app
                        stream_thread.join(timeout=2.0)
                        if stream_thread.is_alive():
                            logger.warning("Stream thread did not terminate within timeout")
                    except RuntimeError:
                        logger.debug("Could not join stream thread (may be current thread)")

            self.release_ownership(channel_id)
            logger.info(f"Released ownership of channel {channel_id}")

        # Local registries are dropped whether or not we were the owner
        if channel_id in self.stream_managers:
            del self.stream_managers[channel_id]
            logger.info(f"Removed stream manager for channel {channel_id}")

        if channel_id in self.stream_buffers:
            channel_buffer = self.stream_buffers[channel_id]
            # Let the buffer shut itself down before it is dropped
            if hasattr(channel_buffer, 'stop'):
                try:
                    channel_buffer.stop()
                    logger.debug(f"Buffer for channel {channel_id} properly stopped")
                except Exception as buffer_error:
                    logger.error(f"Error stopping buffer: {buffer_error}")

            # Re-check membership: the entry may have been removed meanwhile
            try:
                if channel_id in self.stream_buffers:
                    del self.stream_buffers[channel_id]
                    logger.info(f"Removed stream buffer for channel {channel_id}")
            except KeyError:
                logger.debug(f"Buffer for channel {channel_id} already removed")

        if channel_id in self.client_managers:
            try:
                del self.client_managers[channel_id]
                logger.info(f"Removed client manager for channel {channel_id}")
            except KeyError:
                logger.debug(f"Client manager for channel {channel_id} already removed")

        self._clean_redis_keys(channel_id)

        return True
    except Exception as stop_error:
        logger.error(f"Error stopping channel {channel_id}: {stop_error}")
        return False
|
|
|
|
def check_inactive_channels(self):
    """Stop every local channel whose client manager reports zero clients.

    The registry is snapshotted before it is walked: ``stop_channel`` and the
    background cleanup thread both remove entries from
    ``self.client_managers``, and iterating the live dict while it shrinks
    raises ``RuntimeError: dictionary changed size during iteration``.
    """
    inactive = [
        channel_id
        for channel_id, client_manager in list(self.client_managers.items())
        if client_manager.get_client_count() == 0
    ]

    # Stop only after the scan is complete so stopping never affects the scan.
    for channel_id in inactive:
        logger.info(f"Auto-stopping inactive channel {channel_id}")
        self.stop_channel(channel_id)
|
|
|
|
def _cleanup_channel(self, channel_id: str) -> None:
    """Forget a channel in every local registry; absent entries are ignored."""
    managers, buffers, clients = (
        self.stream_managers,
        self.stream_buffers,
        self.client_managers,
    )
    managers.pop(channel_id, None)
    buffers.pop(channel_id, None)
    clients.pop(channel_id, None)
|
|
|
|
def shutdown(self) -> None:
    """Stop every channel that currently has a stream manager."""
    # Iterate over a snapshot: the ids are collected up front so removals
    # made while channels are being stopped cannot disturb the iteration.
    for active_id in tuple(self.stream_managers):
        self.stop_channel(active_id)
|
|
|
|
def _start_cleanup_thread(self):
    """Start the daemon thread that maintains ownership and frees idle channels.

    Every ``ConfigHelper.cleanup_check_interval()`` seconds the thread:

    1. Writes this worker's heartbeat key to Redis (30 second TTL).
    2. Calls ``refresh_channel_registry()``.
    3. Visits every channel that has a local stream manager or client manager:

       * channels this worker owns get their ownership lease extended and are
         stopped once the init grace period, or the shutdown delay, elapses
         with no clients;
       * channels owned elsewhere have their local resources released when
         the channel is stopping/gone in Redis or has no local clients.

    A failure while handling one channel is logged and does not prevent the
    remaining channels from being processed in the same pass.
    """
    startup_states = [ChannelState.CONNECTING, ChannelState.WAITING_FOR_CLIENTS]

    def handle_grace_period(channel_id, channel_state, metadata, total_clients):
        """Apply the init grace period to a channel that is not yet active."""
        if channel_state == ChannelState.CONNECTING:
            # If still connecting, give it more time
            logger.debug(f"Channel {channel_id} still connecting - not checking for clients yet")
            return

        # Get connection ready time from metadata
        connection_ready_time = None
        if metadata and b'connection_ready_time' in metadata:
            try:
                connection_ready_time = float(metadata[b'connection_ready_time'].decode('utf-8'))
            except (ValueError, TypeError):
                # A malformed timestamp is treated like a missing one.
                pass

        if not connection_ready_time:
            # Without a ready time there is nothing to measure against yet.
            return

        grace_period = ConfigHelper.get('CHANNEL_INIT_GRACE_PERIOD', 20)
        time_since_ready = time.time() - connection_ready_time

        logger.debug(f"GRACE PERIOD CHECK: Channel {channel_id} in {channel_state} state, "
                     f"time_since_ready={time_since_ready:.1f}s, grace_period={grace_period}s, "
                     f"total_clients={total_clients}")

        if time_since_ready <= grace_period:
            # Still within grace period
            logger.debug(f"Channel {channel_id} in grace period - {time_since_ready:.1f}s of {grace_period}s elapsed")
        elif total_clients == 0:
            # Grace period expired with no clients
            logger.info(f"Grace period expired ({time_since_ready:.1f}s > {grace_period}s) with no clients - stopping channel {channel_id}")
            self.stop_channel(channel_id)
        else:
            # Grace period expired but we have clients - mark channel as active
            logger.info(f"Grace period expired with {total_clients} clients - marking channel {channel_id} as active")
            if self.update_channel_state(channel_id, ChannelState.ACTIVE, {
                "grace_period_ended_at": str(time.time()),
                "clients_at_activation": str(total_clients)
            }):
                logger.info(f"Channel {channel_id} activated with {total_clients} clients after grace period")

    def handle_no_clients(channel_id):
        """Start, advance or fire the shutdown timer of a client-less channel."""
        disconnect_key = RedisKeys.last_client_disconnect(channel_id)
        disconnect_time = None

        if self.redis_client:
            disconnect_value = self.redis_client.get(disconnect_key)
            if disconnect_value:
                try:
                    disconnect_time = float(disconnect_value.decode('utf-8'))
                except (ValueError, TypeError) as e:
                    logger.error(f"Invalid disconnect time for channel {channel_id}: {e}")

        current_time = time.time()
        shutdown_delay = ConfigHelper.channel_shutdown_delay()

        if not disconnect_time:
            # First time seeing zero clients, set timestamp
            if self.redis_client:
                # The timestamp has to outlive the shutdown delay; with a
                # fixed 60s TTL a longer delay would let the key expire and
                # restart the timer on every pass, so the channel would
                # never be stopped.
                disconnect_ttl = max(
                    60,
                    int(shutdown_delay + 2 * ConfigHelper.cleanup_check_interval()) + 1,
                )
                self.redis_client.setex(disconnect_key, disconnect_ttl, str(current_time))
            logger.warning(f"No clients detected for channel {channel_id}, starting shutdown timer")
        elif current_time - disconnect_time > shutdown_delay:
            # We've had no clients for the shutdown delay period
            logger.warning(f"No clients for {current_time - disconnect_time:.1f}s, stopping channel {channel_id}")
            self.stop_channel(channel_id)
        else:
            # Still in shutdown delay period
            logger.debug(f"Channel {channel_id} shutdown timer: "
                         f"{current_time - disconnect_time:.1f}s of "
                         f"{shutdown_delay}s elapsed")

    def handle_owned_channel(channel_id):
        """Extend the lease of an owned channel and enforce its idle rules."""
        self.extend_ownership(channel_id)

        # Get channel state from metadata hash
        metadata = {}
        channel_state = "unknown"
        if self.redis_client:
            metadata = self.redis_client.hgetall(RedisKeys.channel_metadata(channel_id))
            if metadata and b'state' in metadata:
                channel_state = metadata[b'state'].decode('utf-8')

        # Check if channel has any clients left
        total_clients = 0
        client_manager = self.client_managers.get(channel_id)
        if client_manager is not None:
            total_clients = client_manager.get_total_client_count()

        # Log client count periodically
        if time.time() % 30 < 1:  # Every ~30 seconds
            logger.info(f"Channel {channel_id} has {total_clients} clients, state: {channel_state}")

        if channel_state in startup_states:
            handle_grace_period(channel_id, channel_state, metadata, total_clients)
        elif total_clients == 0:
            handle_no_clients(channel_id)
        elif self.redis_client:
            # There are clients again - clear any pending disconnect timestamp.
            # Uses the same key helper as the read/write in handle_no_clients.
            self.redis_client.delete(RedisKeys.last_client_disconnect(channel_id))

    def handle_non_owned_channel(channel_id):
        """Release local resources of a channel that another worker owns."""
        if self.redis_client:
            # Method 1: Check for stopping key
            if self.redis_client.exists(RedisKeys.channel_stopping(channel_id)):
                logger.debug(f"Non-owner cleanup: Channel {channel_id} has stopping flag in Redis, cleaning up local resources")
                self._cleanup_local_resources(channel_id)
                return

            # Method 2: Check if owner still exists
            if not self.redis_client.exists(RedisKeys.channel_owner(channel_id)):
                logger.debug(f"Non-owner cleanup: Channel {channel_id} has no owner in Redis, cleaning up local resources")
                self._cleanup_local_resources(channel_id)
                return

            # Method 3: Check if metadata still exists
            if not self.redis_client.exists(RedisKeys.channel_metadata(channel_id)):
                logger.debug(f"Non-owner cleanup: Channel {channel_id} has no metadata in Redis, cleaning up local resources")
                self._cleanup_local_resources(channel_id)
                return

        # The channel may be known only through a stream manager, so a
        # missing client manager counts as "no local clients" instead of
        # raising KeyError and aborting the pass.
        client_manager = self.client_managers.get(channel_id)
        if client_manager is None or client_manager.get_client_count() == 0:
            # We're not the owner, and we have no local clients - clean up our resources
            logger.debug(f"Non-owner cleanup: Channel {channel_id} has no local clients, cleaning up local resources")
            self._cleanup_local_resources(channel_id)

    def cleanup_task():
        while True:
            try:
                # Send worker heartbeat first
                if self.redis_client:
                    worker_heartbeat_key = f"ts_proxy:worker:{self.worker_id}:heartbeat"
                    self._execute_redis_command(
                        lambda: self.redis_client.setex(worker_heartbeat_key, 30, str(time.time()))
                    )

                # Refresh channel registry
                self.refresh_channel_registry()

                # Unified snapshot of all channels we have locally, so each
                # one is processed exactly once per pass.
                all_local_channels = set(self.stream_managers.keys()) | set(self.client_managers.keys())

                for channel_id in all_local_channels:
                    try:
                        if self.am_i_owner(channel_id):
                            handle_owned_channel(channel_id)
                        else:
                            handle_non_owned_channel(channel_id)
                    except Exception as e:
                        # Keep going: one broken channel must not starve the others.
                        logger.error(f"Error in cleanup thread for channel {channel_id}: {e}", exc_info=True)

            except Exception as e:
                logger.error(f"Error in cleanup thread: {e}", exc_info=True)

            # gevent.sleep is used here in place of time.sleep.
            gevent.sleep(ConfigHelper.cleanup_check_interval())

    thread = threading.Thread(target=cleanup_task, name="ts-proxy-cleanup", daemon=True)
    thread.start()
    logger.info(f"Started TS proxy cleanup thread (interval: {ConfigHelper.cleanup_check_interval()}s)")
|
|
|
|
def _check_orphaned_channels(self):
    """Check Redis for orphaned channels (no owner registered) and tidy them up.

    Walks every ``ts_proxy:channel:*:metadata`` key. Channels that have a
    local stream buffer, or for which ``get_channel_owner`` reports an owner,
    are skipped. An ownerless channel that still has clients is only logged;
    an ownerless channel without clients has its Redis keys removed through
    ``_clean_redis_keys``.

    Does nothing when no Redis client is configured. Errors are logged and
    never raised.
    """
    if not self.redis_client:
        return

    try:
        # SCAN walks the keyspace incrementally. KEYS would block the Redis
        # server while it matches the pattern against every key.
        channel_keys = self.redis_client.scan_iter(match="ts_proxy:channel:*:metadata", count=100)

        for key in channel_keys:
            try:
                # Accept both bytes keys and already-decoded str keys.
                key_name = key.decode('utf-8') if isinstance(key, bytes) else key
                channel_id = key_name.split(':')[2]

                # Skip channels we already have locally
                if channel_id in self.stream_buffers:
                    continue

                # Skip channels that still have an owner
                if self.get_channel_owner(channel_id):
                    continue

                # Check if there are any clients
                client_count = self.redis_client.scard(RedisKeys.clients(channel_id)) or 0

                if client_count > 0:
                    # Orphaned channel with clients - we could take ownership
                    logger.info(f"Found orphaned channel {channel_id} with {client_count} clients")
                else:
                    # Orphaned channel with no clients - clean it up
                    logger.info(f"Cleaning up orphaned channel {channel_id}")
                    self._clean_redis_keys(channel_id)
            except Exception as e:
                logger.error(f"Error processing channel key {key}: {e}")

    except Exception as e:
        logger.error(f"Error checking orphaned channels: {e}")
|
|
|
|
def _clean_redis_keys(self, channel_id):
    """Release the channel's stream and delete all of its Redis keys.

    The id is first looked up as a ``Channel`` UUID and, if that fails, as a
    ``Stream`` hash. A failure to release the stream is logged and does not
    stop the Redis cleanup.

    Args:
        channel_id: Channel UUID or stream hash identifying the channel.

    Returns:
        int: Number of Redis keys deleted; 0 when no Redis client is
        configured or the Redis cleanup raised.
    """
    # Release the channel, stream, and profile keys from the channel
    try:
        Channel.objects.get(uuid=channel_id).release_stream()
    except Exception:
        # Not resolvable as a Channel (or the release failed): try the id as a stream hash.
        try:
            Stream.objects.get(stream_hash=channel_id).release_stream()
        except Exception as e:
            # Previously this propagated and the Redis keys below were never
            # deleted. The release is best effort; the key cleanup is not.
            logger.warning(f"Could not release stream for channel {channel_id}: {e}")

    if not self.redis_client:
        return 0

    try:
        # Define key patterns to scan for
        patterns = [
            f"ts_proxy:channel:{channel_id}:*",  # All channel keys
            RedisKeys.events_channel(channel_id),  # Event channel
        ]

        total_deleted = 0

        for pattern in patterns:
            cursor = 0
            while True:
                cursor, keys = self.redis_client.scan(cursor, match=pattern, count=100)
                if keys:
                    self.redis_client.delete(*keys)
                    total_deleted += len(keys)

                # A full SCAN iteration is complete when the cursor returns to 0
                if cursor == 0:
                    break

        logger.info(f"Cleaned up {total_deleted} Redis keys for channel {channel_id}")
        return total_deleted

    except Exception as e:
        logger.error(f"Error cleaning Redis keys for channel {channel_id}: {e}")
        return 0
|
|
|
|
def refresh_channel_registry(self):
    """Keep the Redis metadata of locally buffered channels alive.

    For each channel in ``self.stream_buffers`` the ``last_active`` field of
    its metadata hash is set to the current time and the hash's TTL is reset
    to 30 seconds. Does nothing without a Redis client.
    """
    if not self.redis_client:
        return

    # Walk a snapshot of the ids so the registry may change meanwhile.
    for buffered_id in tuple(self.stream_buffers):
        hash_key = RedisKeys.channel_metadata(buffered_id)
        now = str(time.time())

        # Activity timestamp first, then re-arm the expiry on the hash.
        self.redis_client.hset(hash_key, "last_active", now)
        self.redis_client.expire(hash_key, 30)
        logger.debug(f"Refreshed metadata TTL for channel {buffered_id}")
|
|
|
|
def update_channel_state(self, channel_id, new_state, additional_fields=None):
    """Write a new state into the channel's Redis metadata hash.

    Args:
        channel_id: Channel whose metadata hash is updated.
        new_state: State value stored under the ``state`` field.
        additional_fields: Optional extra fields written together with the
            state. They are not written when the state is unchanged.

    Returns:
        bool: True when the state was written or already equal to
        ``new_state``; False when no Redis client is configured or the
        update raised.
    """
    if not self.redis_client:
        return False

    try:
        hash_key = RedisKeys.channel_metadata(channel_id)

        # Read the stored state so the transition can be logged.
        stored = self.redis_client.hgetall(hash_key)
        previous_state = (
            stored[b'state'].decode('utf-8')
            if stored and b'state' in stored
            else None
        )

        # Nothing to write when the state does not change.
        if previous_state == new_state:
            logger.debug(f"Channel {channel_id} state unchanged: {previous_state}")
            return True

        changes = dict(state=new_state, state_changed_at=str(time.time()))
        if additional_fields:
            changes.update(additional_fields)

        self.redis_client.hset(hash_key, mapping=changes)
        logger.info(f"Channel {channel_id} state transition: {previous_state or 'None'} -> {new_state}")
    except Exception as e:
        logger.error(f"Error updating channel state: {e}")
        return False

    return True
|
|
|
|
def _cleanup_local_resources(self, channel_id):
    """Drop this worker's local objects for a channel; Redis is not touched.

    Stops the channel's stream manager (when it has a ``stop`` method) and
    removes the channel from the stream manager, stream buffer and client
    manager registries.

    Entries are removed with ``dict.pop`` rather than check-then-``del``:
    the registries are also modified by other code paths, and a ``KeyError``
    from a concurrent removal would otherwise skip the remaining cleanup.

    Args:
        channel_id: Channel whose local resources are released.

    Returns:
        bool: True on success (including when nothing was registered),
        False when an error occurred; the error is logged.
    """
    # NOTE(review): stop_channel calls buffer.stop() before dropping a buffer,
    # this path does not - confirm whether non-owner buffers need stopping too.
    missing = object()
    try:
        manager = self.stream_managers.get(channel_id, missing)
        if manager is not missing:
            # Stop before unregistering, so a failing stop() leaves the
            # manager registered instead of losing track of it.
            if hasattr(manager, 'stop'):
                manager.stop()
            self.stream_managers.pop(channel_id, None)
            logger.info(f"Non-owner cleanup: Removed stream manager for channel {channel_id}")

        if self.stream_buffers.pop(channel_id, missing) is not missing:
            logger.info(f"Non-owner cleanup: Removed stream buffer for channel {channel_id}")

        if self.client_managers.pop(channel_id, missing) is not missing:
            logger.info(f"Non-owner cleanup: Removed client manager for channel {channel_id}")

        return True
    except Exception as e:
        logger.error(f"Error cleaning up local resources: {e}", exc_info=True)
        return False
|