Add process management for safe connection handling in StreamManager

- Introduced _wait_for_existing_processes_to_close method to ensure all existing processes and connections are fully closed before establishing new ones.
- Updated _establish_transcode_connection and _establish_http_connection methods to check for and close lingering processes and connections.
- Enhanced logging for better debugging and monitoring of connection states.
This commit is contained in:
SergeantPanda 2025-07-03 11:02:07 -05:00
parent 8e2c6c7780
commit 580aa1975c

View file

@ -135,6 +135,37 @@ class StreamManager:
return session
def _wait_for_existing_processes_to_close(self, timeout=5.0):
"""Wait for existing processes/connections to fully close before establishing new ones"""
start_time = time.time()
while time.time() - start_time < timeout:
# Check if transcode process is still running
if self.transcode_process and self.transcode_process.poll() is None:
logger.debug(f"Waiting for existing transcode process to terminate for channel {self.channel_id}")
gevent.sleep(0.1)
continue
# Check if HTTP connections are still active
if self.current_response or self.current_session:
logger.debug(f"Waiting for existing HTTP connections to close for channel {self.channel_id}")
gevent.sleep(0.1)
continue
# Check if socket is still active
if self.socket:
logger.debug(f"Waiting for existing socket to close for channel {self.channel_id}")
gevent.sleep(0.1)
continue
# All processes/connections are closed
logger.debug(f"All existing processes closed for channel {self.channel_id}")
return True
# Timeout reached
logger.warning(f"Timeout waiting for existing processes to close for channel {self.channel_id} after {timeout}s")
return False
def run(self):
"""Main execution loop using HTTP streaming with improved connection handling and stream switching"""
# Add a stop flag to the class properties
@ -323,6 +354,22 @@ class StreamManager:
"""Establish a connection using transcoding"""
try:
logger.debug(f"Building transcode command for channel {self.channel_id}")
# Check if we already have a running transcode process
if self.transcode_process and self.transcode_process.poll() is None:
logger.info(f"Existing transcode process found for channel {self.channel_id}, closing before establishing new connection")
self._close_socket()
# Wait for the process to fully terminate
if not self._wait_for_existing_processes_to_close():
logger.error(f"Failed to close existing transcode process for channel {self.channel_id}")
return False
# Also check for any lingering HTTP connections
if self.current_response or self.current_session:
logger.debug(f"Closing existing HTTP connections before establishing transcode connection for channel {self.channel_id}")
self._close_connection()
channel = get_stream_object(self.channel_id)
# Use FFmpeg specifically for HLS streams
@ -656,6 +703,21 @@ class StreamManager:
try:
logger.debug(f"Using TS Proxy to connect to stream: {self.url}")
# Check if we already have active HTTP connections
if self.current_response or self.current_session:
logger.info(f"Existing HTTP connection found for channel {self.channel_id}, closing before establishing new connection")
self._close_connection()
# Wait for connections to fully close
if not self._wait_for_existing_processes_to_close():
logger.error(f"Failed to close existing HTTP connections for channel {self.channel_id}")
return False
# Also check for any lingering transcode processes
if self.transcode_process and self.transcode_process.poll() is None:
logger.debug(f"Closing existing transcode process before establishing HTTP connection for channel {self.channel_id}")
self._close_socket()
# Create new session for each connection attempt
session = self._create_session()
self.current_session = session
@ -1000,12 +1062,18 @@ class StreamManager:
logger.info("URL switching already in progress, skipping reconnect")
return False
# Close existing connection
# Close existing connection and wait for it to fully terminate
if self.transcode or self.socket:
logger.debug("Closing transcode process before reconnect")
self._close_socket()
else:
logger.debug("Closing HTTP connection before reconnect")
self._close_connection()
# Wait for all processes to fully close before attempting reconnect
if not self._wait_for_existing_processes_to_close():
logger.warning(f"Some processes may still be running during reconnect for channel {self.channel_id}")
self.connected = False
# Attempt to establish a new connection using the same URL