""" Transport Stream (TS) Proxy Server Handles live TS stream proxying with support for: - Stream switching - Buffer management - Multiple client connections - Connection state tracking """ import threading import logging import socket import random import time import sys import os import json import gevent # Add gevent import from typing import Dict, Optional, Set from apps.proxy.config import TSConfig as Config from apps.channels.models import Channel, Stream from core.utils import RedisClient from redis.exceptions import ConnectionError, TimeoutError from .stream_manager import StreamManager from .stream_buffer import StreamBuffer from .client_manager import ClientManager from .redis_keys import RedisKeys from .constants import ChannelState, EventType, StreamType from .config_helper import ConfigHelper from .utils import get_logger logger = get_logger() class ProxyServer: """Manages TS proxy server instance with worker coordination""" _instance = None @classmethod def get_instance(cls): if cls._instance is None: from .server import ProxyServer from .stream_manager import StreamManager from .stream_buffer import StreamBuffer from .client_manager import ClientManager cls._instance = ProxyServer() return cls._instance def __init__(self): """Initialize proxy server with worker identification""" self.stream_managers = {} self.stream_buffers = {} self.client_managers = {} # Generate a unique worker ID import socket import os pid = os.getpid() hostname = socket.gethostname() self.worker_id = f"{hostname}:{pid}" # Connect to Redis - use dedicated client for proxy self.redis_client = None self.redis_connection_attempts = 0 self.redis_max_retries = 3 self.redis_retry_interval = 5 # seconds try: # Use dedicated Redis client for proxy self.redis_client = RedisClient.get_client() if self.redis_client is not None: logger.info(f"Using dedicated Redis client for proxy server") logger.info(f"Worker ID: {self.worker_id}") else: # Fall back to direct connection with retry self._setup_redis_connection() except Exception as e: logger.error(f"Failed to initialize Redis: {e}") self.redis_client = None # Start cleanup thread self.cleanup_interval = getattr(Config, 'CLEANUP_INTERVAL', 60) self._start_cleanup_thread() # Start event listener for Redis pubsub messages self._start_event_listener() def _setup_redis_connection(self): """Setup Redis connection with retry logic""" # Try to use get_redis_client utility instead of direct connection self.redis_client = RedisClient.get_client(max_retries=self.redis_max_retries, retry_interval=self.redis_retry_interval) if self.redis_client: logger.info(f"Successfully connected to Redis using utility function") logger.info(f"Worker ID: {self.worker_id}") else: logger.error(f"Failed to connect to Redis after {self.redis_max_retries} attempts") def _execute_redis_command(self, command_func, *args, **kwargs): """Execute Redis command with error handling and reconnection logic""" if not self.redis_client: return None try: return command_func(*args, **kwargs) except (ConnectionError, TimeoutError) as e: logger.warning(f"Redis connection lost: {e}. Attempting to reconnect...") try: # Try to reconnect self.redis_connection_attempts = 0 self._setup_redis_connection() if self.redis_client: # Retry the command once return command_func(*args, **kwargs) except Exception as reconnect_error: logger.error(f"Failed to reconnect to Redis: {reconnect_error}") return None except Exception as e: logger.error(f"Redis command error: {e}") return None def _start_event_listener(self): """Listen for events from other workers""" if not self.redis_client: return def event_listener(): retry_count = 0 max_retries = 10 base_retry_delay = 1 # Start with 1 second delay max_retry_delay = 30 # Cap at 30 seconds pubsub_client = None pubsub = None while True: try: # Use dedicated PubSub client for event listener pubsub_client = RedisClient.get_pubsub_client() if pubsub_client: logger.info("Using dedicated Redis PubSub client for event listener") else: # Fall back to creating a dedicated client if utility fails logger.warning("Utility function for PubSub client failed, creating direct connection") from django.conf import settings import redis redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost')) redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) pubsub_client = redis.Redis( host=redis_host, port=redis_port, db=redis_db, socket_timeout=60, socket_connect_timeout=10, socket_keepalive=True, health_check_interval=30, decode_responses=True ) logger.info("Created fallback Redis PubSub client for event listener") # Test connection before subscribing pubsub_client.ping() # Create a pubsub instance from the client pubsub = pubsub_client.pubsub() pubsub.psubscribe("ts_proxy:events:*") logger.info(f"Started Redis event listener for client activity") # Reset retry count on successful connection retry_count = 0 for message in pubsub.listen(): if message["type"] != "pmessage": continue try: channel = message["channel"] data = json.loads(message["data"]) event_type = data.get("event") channel_id = data.get("channel_id") if channel_id and event_type: # For owner, update client status immediately if self.am_i_owner(channel_id): if event_type == EventType.CLIENT_CONNECTED: logger.debug(f"Owner received {EventType.CLIENT_CONNECTED} event for channel {channel_id}") # Reset any disconnect timer disconnect_key = RedisKeys.last_client_disconnect(channel_id) self.redis_client.delete(disconnect_key) elif event_type == EventType.CLIENT_DISCONNECTED: logger.debug(f"Owner received {EventType.CLIENT_DISCONNECTED} event for channel {channel_id}") # Check if any clients remain if channel_id in self.client_managers: # VERIFY REDIS CLIENT COUNT DIRECTLY client_set_key = RedisKeys.clients(channel_id) total = self.redis_client.scard(client_set_key) or 0 if total == 0: logger.debug(f"No clients left after disconnect event - stopping channel {channel_id}") # Set the disconnect timer for other workers to see disconnect_key = RedisKeys.last_client_disconnect(channel_id) self.redis_client.setex(disconnect_key, 60, str(time.time())) # Get configured shutdown delay or default shutdown_delay = ConfigHelper.channel_shutdown_delay() if shutdown_delay > 0: logger.info(f"Waiting {shutdown_delay}s before stopping channel...") gevent.sleep(shutdown_delay) # REPLACE: time.sleep(shutdown_delay) # Re-check client count before stopping total = self.redis_client.scard(client_set_key) or 0 if total > 0: logger.info(f"New clients connected during shutdown delay - aborting shutdown") self.redis_client.delete(disconnect_key) return # Stop the channel directly self.stop_channel(channel_id) elif event_type == EventType.STREAM_SWITCH: logger.info(f"Owner received {EventType.STREAM_SWITCH} request for channel {channel_id}") # Handle stream switch request new_url = data.get("url") user_agent = data.get("user_agent") if new_url and channel_id in self.stream_managers: # Update metadata in Redis if self.redis_client: metadata_key = RedisKeys.channel_metadata(channel_id) self.redis_client.hset(metadata_key, "url", new_url) if user_agent: self.redis_client.hset(metadata_key, "user_agent", user_agent) # Set switch status status_key = RedisKeys.switch_status(channel_id) self.redis_client.set(status_key, "switching") # Perform the stream switch stream_manager = self.stream_managers[channel_id] success = stream_manager.update_url(new_url) if success: logger.info(f"Stream switch initiated for channel {channel_id}") # Publish confirmation switch_result = { "event": EventType.STREAM_SWITCHED, # Use constant instead of string "channel_id": channel_id, "success": True, "url": new_url, "timestamp": time.time() } self.redis_client.publish( f"ts_proxy:events:{channel_id}", json.dumps(switch_result) ) # Update status if self.redis_client: self.redis_client.set(status_key, "switched") else: logger.error(f"Failed to switch stream for channel {channel_id}") # Publish failure switch_result = { "event": EventType.STREAM_SWITCHED, "channel_id": channel_id, "success": False, "url": new_url, "timestamp": time.time() } self.redis_client.publish( f"ts_proxy:events:{channel_id}", json.dumps(switch_result) ) elif event_type == EventType.CHANNEL_STOP: logger.info(f"Received {EventType.CHANNEL_STOP} event for channel {channel_id}") # First mark channel as stopping in Redis if self.redis_client: # Set stopping state in metadata metadata_key = RedisKeys.channel_metadata(channel_id) if self.redis_client.exists(metadata_key): self.redis_client.hset(metadata_key, mapping={ "state": ChannelState.STOPPING, "state_changed_at": str(time.time()) }) # If we have local resources for this channel, clean them up if channel_id in self.stream_buffers or channel_id in self.client_managers: # Use existing stop_channel method logger.info(f"Stopping local resources for channel {channel_id}") self.stop_channel(channel_id) # Acknowledge stop by publishing a response stop_response = { "event": EventType.CHANNEL_STOPPED, "channel_id": channel_id, "worker_id": self.worker_id, "timestamp": time.time() } self.redis_client.publish( f"ts_proxy:events:{channel_id}", json.dumps(stop_response) ) elif event_type == EventType.CLIENT_STOP: client_id = data.get("client_id") if client_id and channel_id: logger.info(f"Received request to stop client {client_id} on channel {channel_id}") # Both remove from client manager AND set a key for the generator to detect if channel_id in self.client_managers: client_manager = self.client_managers[channel_id] if client_id in client_manager.clients: client_manager.remove_client(client_id) logger.info(f"Removed client {client_id} from client manager") # Set a Redis key for the generator to detect if self.redis_client: stop_key = RedisKeys.client_stop(channel_id, client_id) self.redis_client.setex(stop_key, 30, "true") # 30 second TTL logger.info(f"Set stop key for client {client_id}") except Exception as e: logger.error(f"Error processing event message: {e}") except (ConnectionError, TimeoutError) as e: # Calculate exponential backoff with jitter retry_count += 1 delay = min(base_retry_delay * (2 ** (retry_count - 1)), max_retry_delay) # Add some randomness to prevent thundering herd jitter = random.uniform(0, 0.5 * delay) final_delay = delay + jitter logger.error(f"Error in event listener: {e}. Retrying in {final_delay:.1f}s (attempt {retry_count})") gevent.sleep(final_delay) # REPLACE: time.sleep(final_delay) except Exception as e: logger.error(f"Error in event listener: {e}") # Add a short delay to prevent rapid retries on persistent errors gevent.sleep(5) # REPLACE: time.sleep(5) finally: # Always clean up PubSub connections in all error paths try: if pubsub: pubsub.close() pubsub = None except Exception as e: logger.debug(f"Error closing pubsub: {e}") try: if pubsub_client: pubsub_client.close() pubsub_client = None except Exception as e: logger.debug(f"Error closing pubsub_client: {e}") thread = threading.Thread(target=event_listener, daemon=True) thread.name = "redis-event-listener" thread.start() def get_channel_owner(self, channel_id): """Get the worker ID that owns this channel with proper error handling""" if not self.redis_client: return None try: lock_key = RedisKeys.channel_owner(channel_id) return self._execute_redis_command( lambda: self.redis_client.get(lock_key) if self.redis_client.get(lock_key) else None ) except Exception as e: logger.error(f"Error getting channel owner: {e}") return None def am_i_owner(self, channel_id): """Check if this worker is the owner of the channel""" owner = self.get_channel_owner(channel_id) return owner == self.worker_id def try_acquire_ownership(self, channel_id, ttl=30): """Try to become the owner of this channel using proper locking""" if not self.redis_client: return True # If no Redis, always become owner try: # Create a lock key with proper namespace lock_key = RedisKeys.channel_owner(channel_id) # Use Redis SETNX for atomic locking with error handling acquired = self._execute_redis_command( lambda: self.redis_client.setnx(lock_key, self.worker_id) ) if acquired is None: # Redis command failed logger.warning(f"Redis command failed during ownership acquisition - assuming ownership") return True # If acquired, set expiry to prevent orphaned locks if acquired: self._execute_redis_command( lambda: self.redis_client.expire(lock_key, ttl) ) logger.info(f"Worker {self.worker_id} acquired ownership of channel {channel_id}") return True # If not acquired, check if we already own it (might be a retry) current_owner = self._execute_redis_command( lambda: self.redis_client.get(lock_key) ) if current_owner and current_owner == self.worker_id: # Refresh TTL self._execute_redis_command( lambda: self.redis_client.expire(lock_key, ttl) ) logger.info(f"Worker {self.worker_id} refreshed ownership of channel {channel_id}") return True # Someone else owns it return False except Exception as e: logger.error(f"Error acquiring channel ownership: {e}") return False def release_ownership(self, channel_id): """Release ownership of this channel safely""" if not self.redis_client: return try: lock_key = RedisKeys.channel_owner(channel_id) # Only delete if we're the current owner to prevent race conditions current = self.redis_client.get(lock_key) if current and current == self.worker_id: self.redis_client.delete(lock_key) logger.info(f"Released ownership of channel {channel_id}") # Also ensure channel stopping key is set to signal clients stop_key = RedisKeys.channel_stopping(channel_id) self.redis_client.setex(stop_key, 30, "true") logger.info(f"Set stopping signal for channel {channel_id} clients") except Exception as e: logger.error(f"Error releasing channel ownership: {e}") def extend_ownership(self, channel_id, ttl=30): """Extend ownership lease with grace period""" if not self.redis_client: return False try: lock_key = RedisKeys.channel_owner(channel_id) current = self.redis_client.get(lock_key) # Only extend if we're still the owner if current and current == self.worker_id: self.redis_client.expire(lock_key, ttl) return True return False except Exception as e: logger.error(f"Error extending ownership: {e}") return False def initialize_channel(self, url, channel_id, user_agent=None, transcode=False, stream_id=None): """Initialize a channel without redundant active key""" try: # IMPROVED: First check if channel is already being initialized by another process if self.redis_client: metadata_key = RedisKeys.channel_metadata(channel_id) if self.redis_client.exists(metadata_key): metadata = self.redis_client.hgetall(metadata_key) if 'state' in metadata: state = metadata['state'] active_states = [ChannelState.INITIALIZING, ChannelState.CONNECTING, ChannelState.WAITING_FOR_CLIENTS, ChannelState.ACTIVE, ChannelState.BUFFERING] if state in active_states: logger.info(f"Channel {channel_id} already being initialized with state {state}") # Create buffer and client manager only if we don't have them if channel_id not in self.stream_buffers: self.stream_buffers[channel_id] = StreamBuffer(channel_id, redis_client=RedisClient.get_buffer()) if channel_id not in self.client_managers: self.client_managers[channel_id] = ClientManager( channel_id, redis_client=self.redis_client, worker_id=self.worker_id ) return True # Create buffer and client manager instances (or reuse if they exist) if channel_id not in self.stream_buffers: buffer = StreamBuffer(channel_id, redis_client=RedisClient.get_buffer()) self.stream_buffers[channel_id] = buffer if channel_id not in self.client_managers: client_manager = ClientManager( channel_id, redis_client=self.redis_client, worker_id=self.worker_id ) self.client_managers[channel_id] = client_manager # IMPROVED: Set initializing state in Redis BEFORE any other operations if self.redis_client: # Set early initialization state to prevent race conditions metadata_key = RedisKeys.channel_metadata(channel_id) initial_metadata = { "state": ChannelState.INITIALIZING, "init_time": str(time.time()), "owner": self.worker_id } if stream_id: initial_metadata["stream_id"] = str(stream_id) self.redis_client.hset(metadata_key, mapping=initial_metadata) logger.info(f"Set early initializing state for channel {channel_id}") # Get channel URL from Redis if available channel_url = url channel_user_agent = user_agent channel_stream_id = stream_id # Store the stream ID # First check if channel metadata already exists existing_metadata = None metadata_key = RedisKeys.channel_metadata(channel_id) if self.redis_client: existing_metadata = self.redis_client.hgetall(metadata_key) # If no url was passed, try to get from Redis if not url and existing_metadata: url_bytes = existing_metadata.get('url') if url_bytes: channel_url = url_bytes ua_bytes = existing_metadata.get('user_agent') if ua_bytes: channel_user_agent = ua_bytes # Get stream ID from metadata if not provided if not channel_stream_id and 'stream_id' in existing_metadata: try: channel_stream_id = int(existing_metadata['stream_id']) logger.debug(f"Found stream_id {channel_stream_id} in metadata for channel {channel_id}") except (ValueError, TypeError) as e: logger.debug(f"Could not parse stream_id from metadata: {e}") # Check if channel is already owned current_owner = self.get_channel_owner(channel_id) # Exit early if another worker owns the channel if current_owner and current_owner != self.worker_id: logger.info(f"Channel {channel_id} already owned by worker {current_owner}") logger.info(f"This worker ({self.worker_id}) will read from Redis buffer only") # Create buffer but not stream manager (only if not already exists) if channel_id not in self.stream_buffers: buffer = StreamBuffer(channel_id=channel_id, redis_client=RedisClient.get_buffer()) self.stream_buffers[channel_id] = buffer # Create client manager with channel_id and redis_client (only if not already exists) if channel_id not in self.client_managers: client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id) self.client_managers[channel_id] = client_manager return True # Only continue with full initialization if URL is provided # or we can get it from Redis if not channel_url: logger.error(f"No URL available for channel {channel_id}") return False # Try to acquire ownership with Redis locking if not self.try_acquire_ownership(channel_id): # Another worker just acquired ownership logger.info(f"Another worker just acquired ownership of channel {channel_id}") # Create buffer but not stream manager (only if not already exists) if channel_id not in self.stream_buffers: buffer = StreamBuffer(channel_id=channel_id, redis_client=RedisClient.get_buffer()) self.stream_buffers[channel_id] = buffer # Create client manager with channel_id and redis_client (only if not already exists) if channel_id not in self.client_managers: client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id) self.client_managers[channel_id] = client_manager return True # We now own the channel - ONLY NOW should we set metadata with initializing state logger.info(f"Worker {self.worker_id} is now the owner of channel {channel_id}") if self.redis_client: # NOW create or update metadata with initializing state metadata = { "url": channel_url, "init_time": str(time.time()), "last_active": str(time.time()), "owner": self.worker_id, "state": ChannelState.INITIALIZING # Use constant instead of string literal } if channel_user_agent: metadata["user_agent"] = channel_user_agent # Make sure stream_id is always set in metadata and properly logged if channel_stream_id: metadata["stream_id"] = str(channel_stream_id) logger.info(f"Storing stream_id {channel_stream_id} in metadata for channel {channel_id}") else: logger.warning(f"No stream_id provided for channel {channel_id} during initialization") # Set channel metadata BEFORE creating the StreamManager self.redis_client.hset(metadata_key, mapping=metadata) self.redis_client.expire(metadata_key, 3600) # Increased TTL from 30 seconds to 1 hour # Verify the stream_id was set correctly in Redis stream_id_value = self.redis_client.hget(metadata_key, "stream_id") if stream_id_value: logger.info(f"Verified stream_id {stream_id_value} is set in Redis for channel {channel_id}") else: logger.warning(f"Failed to set stream_id in Redis for channel {channel_id}") # Create stream buffer buffer = StreamBuffer(channel_id=channel_id, redis_client=RedisClient.get_buffer()) logger.debug(f"Created StreamBuffer for channel {channel_id}") self.stream_buffers[channel_id] = buffer # Only the owner worker creates the actual stream manager stream_manager = StreamManager( channel_id, channel_url, buffer, user_agent=channel_user_agent, transcode=transcode, stream_id=channel_stream_id, # Pass stream ID to the manager worker_id=self.worker_id # Pass worker_id explicitly to eliminate circular dependency ) logger.info(f"Created StreamManager for channel {channel_id} with stream ID {channel_stream_id}") self.stream_managers[channel_id] = stream_manager # Create client manager with channel_id, redis_client AND worker_id (only if not already exists) if channel_id not in self.client_managers: client_manager = ClientManager( channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id ) self.client_managers[channel_id] = client_manager # Start stream manager thread only for the owner thread = threading.Thread(target=stream_manager.run, daemon=True) thread.name = f"stream-{channel_id}" thread.start() logger.info(f"Started stream manager thread for channel {channel_id}") # If we're the owner, we need to set the channel state rather than starting a grace period immediately if self.am_i_owner(channel_id): self.update_channel_state(channel_id, ChannelState.CONNECTING, { "init_time": str(time.time()), "owner": self.worker_id }) # Set connection attempt start time attempt_key = RedisKeys.connection_attempt(channel_id) self.redis_client.setex(attempt_key, 60, str(time.time())) logger.info(f"Channel {channel_id} in {ChannelState.CONNECTING} state - will start grace period after connection") return True except Exception as e: logger.error(f"Error initializing channel {channel_id}: {e}", exc_info=True) # Release ownership on failure self.release_ownership(channel_id) return False def check_if_channel_exists(self, channel_id): """ Check if a channel exists and is in a valid state. Enhanced to detect zombie channels after server restarts. """ # Check local memory first if channel_id in self.stream_managers or channel_id in self.stream_buffers: return True # Check Redis using the standard key pattern if self.redis_client: # Primary check - look for channel metadata metadata_key = RedisKeys.channel_metadata(channel_id) # If metadata exists, validate it's in a healthy state if self.redis_client.exists(metadata_key): metadata = self.redis_client.hgetall(metadata_key) # Get channel state and owner state = metadata.get('state', 'unknown') owner = metadata.get('owner', '') # States that indicate the channel is running properly valid_states = [ChannelState.ACTIVE, ChannelState.WAITING_FOR_CLIENTS, ChannelState.CONNECTING, ChannelState.BUFFERING, ChannelState.INITIALIZING] # If the channel is in a valid state, check if the owner is still active if state in valid_states: # Check if owner still exists by checking heartbeat owner_heartbeat_key = f"ts_proxy:worker:{owner}:heartbeat" owner_alive = self.redis_client.exists(owner_heartbeat_key) if owner_alive: return True else: # This is a zombie channel - owner is gone but metadata still exists logger.warning(f"Detected zombie channel {channel_id} - owner {owner} is no longer active") self._clean_zombie_channel(channel_id, metadata) return False elif state in [ChannelState.STOPPING, ChannelState.STOPPED, ChannelState.ERROR]: # These states indicate the channel should be reinitialized logger.info(f"Channel {channel_id} exists but in terminal state: {state}") return True else: # Unknown or initializing state, check how long it's been in this state if 'state_changed_at' in metadata: state_changed_at = float(metadata['state_changed_at']) state_age = time.time() - state_changed_at # If in initializing state for too long, consider it stale if state_age > 60: # 60 seconds threshold logger.warning(f"Channel {channel_id} stuck in {state} state for {state_age:.1f}s - treating as zombie") self._clean_zombie_channel(channel_id, metadata) return False # Otherwise assume it's still in progress return True # Additional checks if metadata doesn't exist additional_keys = [ RedisKeys.clients(channel_id), RedisKeys.buffer_index(channel_id), RedisKeys.channel_owner(channel_id) ] for key in additional_keys: if self.redis_client.exists(key): # Found orphaned keys without metadata - clean them up logger.warning(f"Found orphaned keys for channel {channel_id} without metadata - cleaning up") self._clean_redis_keys(channel_id) return False return False def _clean_zombie_channel(self, channel_id, metadata=None): """Clean up a zombie channel (channel with Redis keys but no active owner)""" try: logger.info(f"Cleaning up zombie channel {channel_id}") # If we have metadata, log details for debugging if metadata: state = metadata.get('state', 'unknown') owner = metadata.get('owner', 'unknown') logger.info(f"Zombie channel details - state: {state}, owner: {owner}") # Clean up Redis keys self._clean_redis_keys(channel_id) # Force release resources in the Channel model try: channel = Channel.objects.get(uuid=channel_id) channel.release_stream() logger.info(f"Released stream allocation for zombie channel {channel_id}") except Exception as e: try: stream = Stream.objects.get(stream_hash=channel_id) stream.release_stream() logger.info(f"Released stream allocation for zombie channel {channel_id}") except Exception as e: logger.error(f"Error releasing stream for zombie channel {channel_id}: {e}") return True except Exception as e: logger.error(f"Error cleaning zombie channel {channel_id}: {e}", exc_info=True) return False def stop_channel(self, channel_id): """Stop a channel with proper ownership handling""" try: logger.info(f"Stopping channel {channel_id}") # First set a stopping key that clients will check if self.redis_client: stop_key = RedisKeys.channel_stopping(channel_id) self.redis_client.setex(stop_key, 10, "true") # Only stop the actual stream manager if we're the owner if self.am_i_owner(channel_id): logger.info(f"This worker ({self.worker_id}) is the owner - closing provider connection") if channel_id in self.stream_managers: stream_manager = self.stream_managers[channel_id] # Signal thread to stop and close resources if hasattr(stream_manager, 'stop'): stream_manager.stop() else: stream_manager.running = False if hasattr(stream_manager, '_close_socket'): stream_manager._close_socket() # Wait for stream thread to finish stream_thread_name = f"stream-{channel_id}" stream_thread = None for thread in threading.enumerate(): if thread.name == stream_thread_name: stream_thread = thread break if stream_thread and stream_thread.is_alive(): logger.info(f"Waiting for stream thread to terminate") try: # Very short timeout to prevent hanging the app stream_thread.join(timeout=2.0) if stream_thread.is_alive(): logger.warning(f"Stream thread did not terminate within timeout") except RuntimeError: logger.debug(f"Could not join stream thread (may be current thread)") # Release ownership self.release_ownership(channel_id) logger.info(f"Released ownership of channel {channel_id}") # Always clean up local resources - WITH SAFE CHECKS if channel_id in self.stream_managers: del self.stream_managers[channel_id] logger.info(f"Removed stream manager for channel {channel_id}") # Stop buffer and ensure all its timers are cancelled - SAFE CHECK HERE if channel_id in self.stream_buffers: buffer = self.stream_buffers[channel_id] # Call stop on buffer to properly shut it down if hasattr(buffer, 'stop'): try: buffer.stop() logger.debug(f"Buffer for channel {channel_id} properly stopped") except Exception as e: logger.error(f"Error stopping buffer: {e}") # Save reference and check again before deleting try: if channel_id in self.stream_buffers: # Check again to prevent race conditions del self.stream_buffers[channel_id] logger.info(f"Removed stream buffer for channel {channel_id}") except KeyError: logger.debug(f"Buffer for channel {channel_id} already removed") # Clean up client manager - SAFE CHECK HERE TOO if channel_id in self.client_managers: try: client_manager = self.client_managers[channel_id] # Stop the heartbeat thread before deleting if hasattr(client_manager, 'stop'): client_manager.stop() del self.client_managers[channel_id] logger.info(f"Removed client manager for channel {channel_id}") except KeyError: logger.debug(f"Client manager for channel {channel_id} already removed") # Clean up Redis keys self._clean_redis_keys(channel_id) return True except Exception as e: logger.error(f"Error stopping channel {channel_id}: {e}") return False def check_inactive_channels(self): """Check for inactive channels (no clients) and stop them""" channels_to_stop = [] for channel_id, client_manager in self.client_managers.items(): if client_manager.get_client_count() == 0: channels_to_stop.append(channel_id) for channel_id in channels_to_stop: logger.info(f"Auto-stopping inactive channel {channel_id}") self.stop_channel(channel_id) def _cleanup_channel(self, channel_id: str) -> None: """Remove channel resources""" # Removed reference to non-existent fetch_threads collection for collection in [self.stream_managers, self.stream_buffers, self.client_managers]: collection.pop(channel_id, None) def shutdown(self) -> None: """Stop all channels and cleanup""" for channel_id in list(self.stream_managers.keys()): self.stop_channel(channel_id) def _start_cleanup_thread(self): """Start background thread to maintain ownership and clean up resources""" def cleanup_task(): while True: try: # Send worker heartbeat first if self.redis_client: worker_heartbeat_key = f"ts_proxy:worker:{self.worker_id}:heartbeat" self._execute_redis_command( lambda: self.redis_client.setex(worker_heartbeat_key, 30, str(time.time())) ) # Refresh channel registry self.refresh_channel_registry() # Create a unified list of all channels we have locally all_local_channels = set(self.stream_managers.keys()) | set(self.client_managers.keys()) # Single loop through all channels - process each exactly once for channel_id in list(all_local_channels): if self.am_i_owner(channel_id): # === OWNER CHANNEL HANDLING === # Extend ownership lease self.extend_ownership(channel_id) # Get channel state from metadata hash channel_state = "unknown" if self.redis_client: metadata_key = RedisKeys.channel_metadata(channel_id) metadata = self.redis_client.hgetall(metadata_key) if metadata and 'state' in metadata: channel_state = metadata['state'] # Check if channel has any clients left total_clients = 0 if channel_id in self.client_managers: client_manager = self.client_managers[channel_id] total_clients = client_manager.get_total_client_count() # Log client count periodically if time.time() % 30 < 1: # Every ~30 seconds logger.info(f"Channel {channel_id} has {total_clients} clients, state: {channel_state}") # If in connecting or waiting_for_clients state, check grace period if channel_state in [ChannelState.CONNECTING, ChannelState.WAITING_FOR_CLIENTS]: # Get connection ready time from metadata connection_ready_time = None if metadata and 'connection_ready_time' in metadata: try: connection_ready_time = float(metadata['connection_ready_time']) except (ValueError, TypeError): pass # If still connecting, give it more time if channel_state == ChannelState.CONNECTING: logger.debug(f"Channel {channel_id} still connecting - not checking for clients yet") continue # If waiting for clients, check grace period if connection_ready_time: grace_period = ConfigHelper.channel_init_grace_period() time_since_ready = time.time() - connection_ready_time # Add this debug log logger.debug(f"GRACE PERIOD CHECK: Channel {channel_id} in {channel_state} state, " f"time_since_ready={time_since_ready:.1f}s, grace_period={grace_period}s, " f"total_clients={total_clients}") if time_since_ready <= grace_period: # Still within grace period logger.debug(f"Channel {channel_id} in grace period - {time_since_ready:.1f}s of {grace_period}s elapsed") continue elif total_clients == 0: # Grace period expired with no clients logger.info(f"Grace period expired ({time_since_ready:.1f}s > {grace_period}s) with no clients - stopping channel {channel_id}") self.stop_channel(channel_id) else: # Grace period expired but we have clients - mark channel as active logger.info(f"Grace period expired with {total_clients} clients - marking channel {channel_id} as active") old_state = "unknown" if metadata and 'state' in metadata: old_state = metadata['state'] if self.update_channel_state(channel_id, ChannelState.ACTIVE, { "grace_period_ended_at": str(time.time()), "clients_at_activation": str(total_clients) }): logger.info(f"Channel {channel_id} activated with {total_clients} clients after grace period") # If active and no clients, start normal shutdown procedure elif channel_state not in [ChannelState.CONNECTING, ChannelState.WAITING_FOR_CLIENTS] and total_clients == 0: # Check if there's a pending no-clients timeout disconnect_key = RedisKeys.last_client_disconnect(channel_id) disconnect_time = None if self.redis_client: disconnect_value = self.redis_client.get(disconnect_key) if disconnect_value: try: disconnect_time = float(disconnect_value) except (ValueError, TypeError) as e: logger.error(f"Invalid disconnect time for channel {channel_id}: {e}") current_time = time.time() if not disconnect_time: # First time seeing zero clients, set timestamp if self.redis_client: self.redis_client.setex(disconnect_key, 60, str(current_time)) logger.warning(f"No clients detected for channel {channel_id}, starting shutdown timer") elif current_time - disconnect_time > ConfigHelper.channel_shutdown_delay(): # We've had no clients for the shutdown delay period logger.warning(f"No clients for {current_time - disconnect_time:.1f}s, stopping channel {channel_id}") self.stop_channel(channel_id) else: # Still in shutdown delay period logger.debug(f"Channel {channel_id} shutdown timer: " f"{current_time - disconnect_time:.1f}s of " f"{ConfigHelper.channel_shutdown_delay()}s elapsed") else: # There are clients or we're still connecting - clear any disconnect timestamp if self.redis_client: self.redis_client.delete(f"ts_proxy:channel:{channel_id}:last_client_disconnect_time") else: # === NON-OWNER CHANNEL HANDLING === # For channels we don't own, check if they've been stopped/cleaned up in Redis if self.redis_client: # Method 1: Check for stopping key stop_key = RedisKeys.channel_stopping(channel_id) if self.redis_client.exists(stop_key): logger.debug(f"Non-owner cleanup: Channel {channel_id} has stopping flag in Redis, cleaning up local resources") self._cleanup_local_resources(channel_id) continue # Method 2: Check if owner still exists owner_key = RedisKeys.channel_owner(channel_id) if not self.redis_client.exists(owner_key): logger.debug(f"Non-owner cleanup: Channel {channel_id} has no owner in Redis, cleaning up local resources") self._cleanup_local_resources(channel_id) continue # Method 3: Check if metadata still exists metadata_key = RedisKeys.channel_metadata(channel_id) if not self.redis_client.exists(metadata_key): logger.debug(f"Non-owner cleanup: Channel {channel_id} has no metadata in Redis, cleaning up local resources") self._cleanup_local_resources(channel_id) continue # Check for local client count - if zero, clean up our local resources if self.client_managers[channel_id].get_client_count() == 0: # We're not the owner, and we have no local clients - clean up our resources logger.debug(f"Non-owner cleanup: Channel {channel_id} has no local clients, cleaning up local resources") self._cleanup_local_resources(channel_id) except Exception as e: logger.error(f"Error in cleanup thread: {e}", exc_info=True) gevent.sleep(ConfigHelper.cleanup_check_interval()) # REPLACE: time.sleep(ConfigHelper.cleanup_check_interval()) thread = threading.Thread(target=cleanup_task, daemon=True) thread.name = "ts-proxy-cleanup" thread.start() logger.info(f"Started TS proxy cleanup thread (interval: {ConfigHelper.cleanup_check_interval()}s)") def _check_orphaned_channels(self): """Check for orphaned channels in Redis (owner worker crashed)""" if not self.redis_client: return try: # Get all active channel keys channel_pattern = "ts_proxy:channel:*:metadata" channel_keys = self.redis_client.keys(channel_pattern) for key in channel_keys: try: channel_id = key.split(':')[2] # Skip channels we already have locally if channel_id in self.stream_buffers: continue # Check if this channel has an owner owner = self.get_channel_owner(channel_id) if not owner: # Check if there are any clients client_set_key = RedisKeys.clients(channel_id) client_count = self.redis_client.scard(client_set_key) or 0 if client_count > 0: # Orphaned channel with clients - we could take ownership logger.info(f"Found orphaned channel {channel_id} with {client_count} clients") else: # Orphaned channel with no clients - clean it up logger.info(f"Cleaning up orphaned channel {channel_id}") self._clean_redis_keys(channel_id) except Exception as e: logger.error(f"Error processing channel key {key}: {e}") except Exception as e: logger.error(f"Error checking orphaned channels: {e}") def _clean_redis_keys(self, channel_id): """Clean up all Redis keys for a channel more efficiently""" # Release the channel, stream, and profile keys from the channel try: channel = Channel.objects.get(uuid=channel_id) channel.release_stream() except: stream = Stream.objects.get(stream_hash=channel_id) stream.release_stream() if not self.redis_client: return 0 try: # Define key patterns to scan for patterns = [ f"ts_proxy:channel:{channel_id}:*", # All channel keys RedisKeys.events_channel(channel_id) # Event channel ] total_deleted = 0 for pattern in patterns: cursor = 0 while True: cursor, keys = self.redis_client.scan(cursor, match=pattern, count=100) if keys: self.redis_client.delete(*keys) total_deleted += len(keys) # Exit when cursor returns to 0 if cursor == 0: break logger.info(f"Cleaned up {total_deleted} Redis keys for channel {channel_id}") return total_deleted except Exception as e: logger.error(f"Error cleaning Redis keys for channel {channel_id}: {e}") return 0 def refresh_channel_registry(self): """Refresh TTL for active channels using standard keys""" if not self.redis_client: return # Refresh registry entries for channels we own for channel_id in list(self.stream_buffers.keys()): # Use standard key pattern metadata_key = RedisKeys.channel_metadata(channel_id) # Update activity timestamp in metadata only self.redis_client.hset(metadata_key, "last_active", str(time.time())) self.redis_client.expire(metadata_key, 30) # Reset TTL on metadata hash logger.debug(f"Refreshed metadata TTL for channel {channel_id}") def update_channel_state(self, channel_id, new_state, additional_fields=None): """Update channel state with proper history tracking and logging""" if not self.redis_client: return False try: metadata_key = RedisKeys.channel_metadata(channel_id) # Get current state for logging current_state = None metadata = self.redis_client.hgetall(metadata_key) if metadata and 'state' in metadata: current_state = metadata['state'] # Only update if state is actually changing if current_state == new_state: logger.debug(f"Channel {channel_id} state unchanged: {current_state}") return True # Prepare update data update_data = { "state": new_state, "state_changed_at": str(time.time()) } # Add optional additional fields if additional_fields: update_data.update(additional_fields) # Update the metadata self.redis_client.hset(metadata_key, mapping=update_data) # Log the transition logger.info(f"Channel {channel_id} state transition: {current_state or 'None'} -> {new_state}") return True except Exception as e: logger.error(f"Error updating channel state: {e}") return False def _cleanup_local_resources(self, channel_id): """Clean up local resources for a channel without affecting Redis keys""" try: # Clean up local objects only if channel_id in self.stream_managers: if hasattr(self.stream_managers[channel_id], 'stop'): self.stream_managers[channel_id].stop() del self.stream_managers[channel_id] logger.info(f"Non-owner cleanup: Removed stream manager for channel {channel_id}") if channel_id in self.stream_buffers: del self.stream_buffers[channel_id] logger.info(f"Non-owner cleanup: Removed stream buffer for channel {channel_id}") if channel_id in self.client_managers: del self.client_managers[channel_id] logger.info(f"Non-owner cleanup: Removed client manager for channel {channel_id}") return True except Exception as e: logger.error(f"Error cleaning up local resources: {e}", exc_info=True) return False