From 580aa1975c1697db09e4323957ac7d627e7ff511 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Thu, 3 Jul 2025 11:02:07 -0500 Subject: [PATCH] Add process management for safe connection handling in StreamManager - Introduced _wait_for_existing_processes_to_close method to ensure all existing processes and connections are fully closed before establishing new ones. - Updated _establish_transcode_connection and _establish_http_connection methods to check for and close lingering processes and connections. - Enhanced logging for better debugging and monitoring of connection states. --- apps/proxy/ts_proxy/stream_manager.py | 70 ++++++++++++++++++++++++++- 1 file changed, 69 insertions(+), 1 deletion(-) diff --git a/apps/proxy/ts_proxy/stream_manager.py b/apps/proxy/ts_proxy/stream_manager.py index 1abfb6d8..f7290436 100644 --- a/apps/proxy/ts_proxy/stream_manager.py +++ b/apps/proxy/ts_proxy/stream_manager.py @@ -135,6 +135,37 @@ class StreamManager: return session + def _wait_for_existing_processes_to_close(self, timeout=5.0): + """Wait for existing processes/connections to fully close before establishing new ones""" + start_time = time.time() + + while time.time() - start_time < timeout: + # Check if transcode process is still running + if self.transcode_process and self.transcode_process.poll() is None: + logger.debug(f"Waiting for existing transcode process to terminate for channel {self.channel_id}") + gevent.sleep(0.1) + continue + + # Check if HTTP connections are still active + if self.current_response or self.current_session: + logger.debug(f"Waiting for existing HTTP connections to close for channel {self.channel_id}") + gevent.sleep(0.1) + continue + + # Check if socket is still active + if self.socket: + logger.debug(f"Waiting for existing socket to close for channel {self.channel_id}") + gevent.sleep(0.1) + continue + + # All processes/connections are closed + logger.debug(f"All existing processes closed for channel {self.channel_id}") + return True + + # Timeout reached + logger.warning(f"Timeout waiting for existing processes to close for channel {self.channel_id} after {timeout}s") + return False + def run(self): """Main execution loop using HTTP streaming with improved connection handling and stream switching""" # Add a stop flag to the class properties @@ -323,6 +354,22 @@ class StreamManager: """Establish a connection using transcoding""" try: logger.debug(f"Building transcode command for channel {self.channel_id}") + + # Check if we already have a running transcode process + if self.transcode_process and self.transcode_process.poll() is None: + logger.info(f"Existing transcode process found for channel {self.channel_id}, closing before establishing new connection") + self._close_socket() + + # Wait for the process to fully terminate + if not self._wait_for_existing_processes_to_close(): + logger.error(f"Failed to close existing transcode process for channel {self.channel_id}") + return False + + # Also check for any lingering HTTP connections + if self.current_response or self.current_session: + logger.debug(f"Closing existing HTTP connections before establishing transcode connection for channel {self.channel_id}") + self._close_connection() + channel = get_stream_object(self.channel_id) # Use FFmpeg specifically for HLS streams @@ -656,6 +703,21 @@ class StreamManager: try: logger.debug(f"Using TS Proxy to connect to stream: {self.url}") + # Check if we already have active HTTP connections + if self.current_response or self.current_session: + logger.info(f"Existing HTTP connection found for channel {self.channel_id}, closing before establishing new connection") + self._close_connection() + + # Wait for connections to fully close + if not self._wait_for_existing_processes_to_close(): + logger.error(f"Failed to close existing HTTP connections for channel {self.channel_id}") + return False + + # Also check for any lingering transcode processes + if self.transcode_process and self.transcode_process.poll() is None: + logger.debug(f"Closing existing transcode process before establishing HTTP connection for channel {self.channel_id}") + self._close_socket() + # Create new session for each connection attempt session = self._create_session() self.current_session = session @@ -1000,12 +1062,18 @@ class StreamManager: logger.info("URL switching already in progress, skipping reconnect") return False - # Close existing connection + # Close existing connection and wait for it to fully terminate if self.transcode or self.socket: + logger.debug("Closing transcode process before reconnect") self._close_socket() else: + logger.debug("Closing HTTP connection before reconnect") self._close_connection() + # Wait for all processes to fully close before attempting reconnect + if not self._wait_for_existing_processes_to_close(): + logger.warning(f"Some processes may still be running during reconnect for channel {self.channel_id}") + self.connected = False # Attempt to establish a new connection using the same URL