From 4738d301d1bf6e06ccb11707b7f86475dd2c65a6 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Thu, 20 Mar 2025 21:03:01 -0500 Subject: [PATCH] Fixed regression with buffer checks when clients should be disconnecting due to failure. --- apps/proxy/ts_proxy/redis_keys.py | 5 + apps/proxy/ts_proxy/server.py | 9 +- .../ts_proxy/services/channel_service.py | 10 ++ apps/proxy/ts_proxy/stream_generator.py | 20 ++- apps/proxy/ts_proxy/stream_manager.py | 132 ++++++++++++++++-- 5 files changed, 161 insertions(+), 15 deletions(-) diff --git a/apps/proxy/ts_proxy/redis_keys.py b/apps/proxy/ts_proxy/redis_keys.py index ebbcbc24..1eaa8aa5 100644 --- a/apps/proxy/ts_proxy/redis_keys.py +++ b/apps/proxy/ts_proxy/redis_keys.py @@ -78,3 +78,8 @@ class RedisKeys: def worker_heartbeat(worker_id): """Key for worker heartbeat""" return f"ts_proxy:worker:{worker_id}:heartbeat" + + @staticmethod + def transcode_active(channel_id): + """Key indicating active transcode process""" + return f"ts_proxy:channel:{channel_id}:transcode_active" diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index aa2e7ffc..cfe5b4ce 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -308,6 +308,12 @@ class ProxyServer: if current and current.decode('utf-8') == self.worker_id: self.redis_client.delete(lock_key) logger.info(f"Released ownership of channel {channel_id}") + + # Also ensure channel stopping key is set to signal clients + stop_key = RedisKeys.channel_stopping(channel_id) + self.redis_client.setex(stop_key, 30, "true") + logger.info(f"Set stopping signal for channel {channel_id} clients") + except Exception as e: logger.error(f"Error releasing channel ownership: {e}") @@ -458,7 +464,8 @@ class ProxyServer: buffer, user_agent=channel_user_agent, transcode=transcode, - stream_id=channel_stream_id # Pass stream ID to the manager + stream_id=channel_stream_id, # Pass stream ID to the manager + worker_id=self.worker_id # Pass worker_id explicitly to eliminate circular dependency ) logger.info(f"Created StreamManager for channel {channel_id} with stream ID {channel_stream_id}") self.stream_managers[channel_id] = stream_manager diff --git a/apps/proxy/ts_proxy/services/channel_service.py b/apps/proxy/ts_proxy/services/channel_service.py index e00e680d..210e4b0f 100644 --- a/apps/proxy/ts_proxy/services/channel_service.py +++ b/apps/proxy/ts_proxy/services/channel_service.py @@ -218,9 +218,19 @@ class ChannelService: if metadata and b'state' in metadata: state = metadata[b'state'].decode('utf-8') channel_info = {"state": state} + + # Immediately mark as stopping in metadata so clients detect it faster + proxy_server.redis_client.hset(metadata_key, "state", ChannelState.STOPPING) + proxy_server.redis_client.hset(metadata_key, "state_changed_at", str(time.time())) except Exception as e: logger.error(f"Error fetching channel state: {e}") + # Set stopping flag with higher TTL to ensure it persists + if proxy_server.redis_client: + stop_key = RedisKeys.channel_stopping(channel_id) + proxy_server.redis_client.setex(stop_key, 60, "true") # Higher TTL of 60 seconds + logger.info(f"Set channel stopping flag with 60s TTL for channel {channel_id}") + # Broadcast stop event to all workers via PubSub if proxy_server.redis_client: ChannelService._publish_channel_stop_event(channel_id) diff --git a/apps/proxy/ts_proxy/stream_generator.py b/apps/proxy/ts_proxy/stream_generator.py index dcaae961..8a91f1dc 100644 --- a/apps/proxy/ts_proxy/stream_generator.py +++ b/apps/proxy/ts_proxy/stream_generator.py @@ -103,7 +103,7 @@ class StreamGenerator: if state in ['waiting_for_clients', 'active']: logger.info(f"[{self.client_id}] Channel {self.channel_id} now ready (state={state})") return True - elif state in ['error', 'stopped']: + elif state in ['error', 'stopped', 'stopping']: # Added 'stopping' to error states error_message = metadata.get(b'error_message', b'Unknown error').decode('utf-8') logger.error(f"[{self.client_id}] Channel {self.channel_id} in error state: {state}, message: {error_message}") # Send error packet before giving up @@ -119,6 +119,13 @@ class StreamGenerator: self.bytes_sent += len(keepalive_packet) last_keepalive = time.time() + # Also check stopping key directly + stop_key = RedisKeys.channel_stopping(self.channel_id) + if proxy_server.redis_client.exists(stop_key): + logger.error(f"[{self.client_id}] Channel {self.channel_id} stopping flag detected during initialization") + yield create_ts_packet('error', "Error: Channel is stopping") + return False + # Wait a bit before checking again time.sleep(0.1) @@ -221,12 +228,21 @@ class StreamGenerator: # Check if this specific client has been stopped (Redis keys, etc.) if proxy_server.redis_client: - # Channel stop check + # Channel stop check - with extended key set stop_key = RedisKeys.channel_stopping(self.channel_id) if proxy_server.redis_client.exists(stop_key): logger.info(f"[{self.client_id}] Detected channel stop signal, terminating stream") return False + # Also check channel state in metadata + metadata_key = RedisKeys.channel_metadata(self.channel_id) + metadata = proxy_server.redis_client.hgetall(metadata_key) + if metadata and b'state' in metadata: + state = metadata[b'state'].decode('utf-8') + if state in ['error', 'stopped', 'stopping']: + logger.info(f"[{self.client_id}] Channel in {state} state, terminating stream") + return False + # Client stop check client_stop_key = RedisKeys.client_stop(self.channel_id, self.client_id) if proxy_server.redis_client.exists(client_stop_key): diff --git a/apps/proxy/ts_proxy/stream_manager.py b/apps/proxy/ts_proxy/stream_manager.py index 3d81472d..c47766eb 100644 --- a/apps/proxy/ts_proxy/stream_manager.py +++ b/apps/proxy/ts_proxy/stream_manager.py @@ -24,7 +24,7 @@ logger = logging.getLogger("ts_proxy") class StreamManager: """Manages a connection to a TS stream without using raw sockets""" - def __init__(self, channel_id, url, buffer, user_agent=None, transcode=False, stream_id=None): + def __init__(self, channel_id, url, buffer, user_agent=None, transcode=False, stream_id=None, worker_id=None): # Basic properties self.channel_id = channel_id self.url = url @@ -36,6 +36,8 @@ class StreamManager: self.current_response = None self.current_session = None self.url_switching = False + # Store worker_id for ownership checks + self.worker_id = worker_id # Sockets used for transcode jobs self.socket = None @@ -89,6 +91,9 @@ class StreamManager: logger.info(f"Initialized stream manager for channel {buffer.channel_id}") + # Add this flag for tracking transcoding process status + self.transcode_process_active = False + def _create_session(self): """Create and configure requests session with optimal settings""" session = requests.Session() @@ -234,9 +239,61 @@ class StreamManager: except Exception as e: logger.error(f"Stream error: {e}", exc_info=True) finally: + # Enhanced cleanup in the finally block self.connected = False + + # Explicitly cancel all timers + for timer in list(self._buffer_check_timers): + try: + if timer and timer.is_alive(): + timer.cancel() + except Exception: + pass + + self._buffer_check_timers.clear() + + # Make sure transcode process is terminated + if self.transcode_process_active: + logger.info("Ensuring transcode process is terminated in finally block") + self._close_socket() + + # Close all connections self._close_all_connections() - logger.info(f"Stream manager stopped") + + # Update channel state in Redis to prevent clients from waiting indefinitely + if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client: + try: + metadata_key = RedisKeys.channel_metadata(self.channel_id) + + # Check if we're the owner before updating state + owner_key = RedisKeys.channel_owner(self.channel_id) + current_owner = self.buffer.redis_client.get(owner_key) + + # Use the worker_id that was passed in during initialization + if current_owner and self.worker_id and current_owner.decode('utf-8') == self.worker_id: + # Determine the appropriate error message based on retry failures + if self.tried_stream_ids and len(self.tried_stream_ids) > 0: + error_message = f"All {len(self.tried_stream_ids)} stream options failed" + else: + error_message = f"Connection failed after {self.max_retries} attempts" + + # Update metadata to indicate error state + update_data = { + "state": ChannelState.ERROR, + "state_changed_at": str(time.time()), + "error_message": error_message, + "error_time": str(time.time()) + } + self.buffer.redis_client.hset(metadata_key, mapping=update_data) + logger.info(f"Updated channel {self.channel_id} state to ERROR in Redis after stream failure") + + # Also set stopping key to ensure clients disconnect + stop_key = RedisKeys.channel_stopping(self.channel_id) + self.buffer.redis_client.setex(stop_key, 60, "true") + except Exception as e: + logger.error(f"Failed to update channel state in Redis: {e}") + + logger.info(f"Stream manager stopped for channel {self.channel_id}") def _establish_transcode_connection(self): """Establish a connection using transcoding""" @@ -264,11 +321,14 @@ class StreamManager: self.transcode_process = subprocess.Popen( self.transcode_cmd, stdout=subprocess.PIPE, - stderr=subprocess.DEVNULL, # Suppress FFmpeg logs + stderr=subprocess.DEVNULL, # Suppress error logs bufsize=188 * 64 # Buffer optimized for TS packets ) - self.socket = self.transcode_process.stdout # Read from FFmpeg output + # Set flag that transcoding process is active + self.transcode_process_active = True + + self.socket = self.transcode_process.stdout # Read from std output self.connected = True # Set channel state to waiting for clients @@ -392,6 +452,8 @@ class StreamManager: def stop(self): """Stop the stream manager and cancel all timers""" + logger.info(f"Stopping stream manager for channel {self.channel_id}") + # Add at the beginning of your stop method self.stopping = True @@ -405,7 +467,6 @@ class StreamManager: self._buffer_check_timers.clear() - # Rest of your existing stop method... # Set the flag first self.stop_requested = True @@ -423,6 +484,9 @@ class StreamManager: except Exception: pass + # Explicitly close socket/transcode resources + self._close_socket() + # Set running to false to ensure thread exits self.running = False @@ -530,15 +594,56 @@ class StreamManager: self.socket = None self.connected = False + # Enhanced transcode process cleanup with more aggressive termination if self.transcode_process: try: + # First try polite termination + logger.debug(f"Terminating transcode process for channel {self.channel_id}") self.transcode_process.terminate() - self.transcode_process.wait() + + # Give it a short time to terminate gracefully + try: + self.transcode_process.wait(timeout=1.0) + except subprocess.TimeoutExpired: + # If it doesn't terminate quickly, kill it + logger.warning(f"Transcode process didn't terminate within timeout, killing forcefully") + self.transcode_process.kill() + + try: + self.transcode_process.wait(timeout=1.0) + except subprocess.TimeoutExpired: + logger.error(f"Failed to kill transcode process even with force") except Exception as e: logger.debug(f"Error terminating transcode process: {e}") - pass + + # Final attempt: try to kill directly + try: + self.transcode_process.kill() + except Exception as e: + logger.error(f"Final kill attempt failed: {e}") self.transcode_process = None + self.transcode_process_active = False # Reset the flag + + # Clear transcode active key in Redis if available + if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client: + try: + transcode_key = RedisKeys.transcode_active(self.channel_id) + self.buffer.redis_client.delete(transcode_key) + logger.debug(f"Cleared transcode active flag for channel {self.channel_id}") + except Exception as e: + logger.debug(f"Error clearing transcode flag: {e}") + + # Cancel any remaining buffer check timers + for timer in list(self._buffer_check_timers): + try: + if timer and timer.is_alive(): + timer.cancel() + logger.debug(f"Cancelled buffer check timer during socket close for channel {self.channel_id}") + except Exception as e: + logger.debug(f"Error canceling timer during socket close: {e}") + + self._buffer_check_timers = [] def fetch_chunk(self): """Fetch data from socket with direct pass-through to buffer""" @@ -649,10 +754,10 @@ class StreamManager: def _check_buffer_and_set_state(self): """Check buffer size and set state to waiting_for_clients when ready""" try: - # First check if we're stopping or reconnecting - if getattr(self, 'stopping', False) or getattr(self, 'reconnecting', False): + # Enhanced stop detection with short-circuit return + if not self.running or getattr(self, 'stopping', False) or getattr(self, 'reconnecting', False): logger.debug(f"Buffer check aborted - channel {self.buffer.channel_id} is stopping or reconnecting") - return + return False # Return value to indicate check was aborted # Clean up completed timers self._buffer_check_timers = [t for t in self._buffer_check_timers if t.is_alive()] @@ -670,14 +775,17 @@ class StreamManager: # Still waiting, log progress and schedule another check logger.debug(f"Buffer filling for channel {channel_id}: {current_buffer_index}/{initial_chunks_needed} chunks") - # Schedule another check - NOW WITH TRACKING - if not getattr(self, 'stopping', False): + # Schedule another check - NOW WITH STOPPING CHECK + if self.running and not getattr(self, 'stopping', False): timer = threading.Timer(0.5, self._check_buffer_and_set_state) timer.daemon = True timer.start() self._buffer_check_timers.append(timer) + + return True # Return value to indicate check was successful except Exception as e: logger.error(f"Error in buffer check: {e}") + return False def _try_next_stream(self): """