diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index da5daaa7..0d638c1a 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -131,6 +131,8 @@ class ProxyServer: max_retries = 10 base_retry_delay = 1 # Start with 1 second delay max_retry_delay = 30 # Cap at 30 seconds + pubsub_client = None + pubsub = None while True: try: @@ -339,20 +341,27 @@ class ProxyServer: logger.error(f"Error in event listener: {e}. Retrying in {final_delay:.1f}s (attempt {retry_count})") gevent.sleep(final_delay) # REPLACE: time.sleep(final_delay) - # Try to clean up the old connection - try: - if 'pubsub' in locals(): - pubsub.close() - if 'pubsub_client' in locals(): - pubsub_client.close() - except: - pass - except Exception as e: logger.error(f"Error in event listener: {e}") # Add a short delay to prevent rapid retries on persistent errors gevent.sleep(5) # REPLACE: time.sleep(5) + finally: + # Always clean up PubSub connections in all error paths + try: + if pubsub: + pubsub.close() + pubsub = None + except Exception as e: + logger.debug(f"Error closing pubsub: {e}") + + try: + if pubsub_client: + pubsub_client.close() + pubsub_client = None + except Exception as e: + logger.debug(f"Error closing pubsub_client: {e}") + thread = threading.Thread(target=event_listener, daemon=True) thread.name = "redis-event-listener" thread.start() @@ -596,7 +605,7 @@ class ProxyServer: if channel_user_agent: metadata["user_agent"] = channel_user_agent - # CRITICAL FIX: Make sure stream_id is always set in metadata and properly logged + # Make sure stream_id is always set in metadata and properly logged if channel_stream_id: metadata["stream_id"] = str(channel_stream_id) logger.info(f"Storing stream_id {channel_stream_id} in metadata for channel {channel_id}") diff --git a/apps/proxy/ts_proxy/stream_manager.py b/apps/proxy/ts_proxy/stream_manager.py index e80d4527..be6a9c4e 100644 --- a/apps/proxy/ts_proxy/stream_manager.py +++ b/apps/proxy/ts_proxy/stream_manager.py @@ -1219,6 +1219,30 @@ class StreamManager: except Exception as e: logger.error(f"Final kill attempt failed for channel {self.channel_id}: {e}") + # Explicitly close all subprocess pipes to prevent file descriptor leaks + try: + if self.transcode_process.stdin: + self.transcode_process.stdin.close() + if self.transcode_process.stdout: + self.transcode_process.stdout.close() + if self.transcode_process.stderr: + self.transcode_process.stderr.close() + logger.debug(f"Closed all subprocess pipes for channel {self.channel_id}") + except Exception as e: + logger.debug(f"Error closing subprocess pipes for channel {self.channel_id}: {e}") + + # Join stderr reader thread to ensure it's fully terminated + if hasattr(self, 'stderr_reader_thread') and self.stderr_reader_thread and self.stderr_reader_thread.is_alive(): + try: + logger.debug(f"Waiting for stderr reader thread to terminate for channel {self.channel_id}") + self.stderr_reader_thread.join(timeout=2.0) + if self.stderr_reader_thread.is_alive(): + logger.warning(f"Stderr reader thread did not terminate within timeout for channel {self.channel_id}") + except Exception as e: + logger.debug(f"Error joining stderr reader thread for channel {self.channel_id}: {e}") + finally: + self.stderr_reader_thread = None + self.transcode_process = None self.transcode_process_active = False # Reset the flag