Enhancement: Improve resource cleanup in ProxyServer and StreamManager classes to avoid "SystemError: (libev) error creating signal/async pipe: Too many open files" errors

This commit is contained in:
SergeantPanda 2025-10-10 15:26:02 -05:00
parent 9dc54fdcff
commit fefab4c4c6
2 changed files with 43 additions and 10 deletions

View file

@@ -131,6 +131,8 @@ class ProxyServer:
max_retries = 10
base_retry_delay = 1 # Start with 1 second delay
max_retry_delay = 30 # Cap at 30 seconds
pubsub_client = None
pubsub = None
while True:
try:
@@ -339,20 +341,27 @@ class ProxyServer:
logger.error(f"Error in event listener: {e}. Retrying in {final_delay:.1f}s (attempt {retry_count})")
gevent.sleep(final_delay) # REPLACE: time.sleep(final_delay)
# Try to clean up the old connection
try:
if 'pubsub' in locals():
pubsub.close()
if 'pubsub_client' in locals():
pubsub_client.close()
except:
pass
except Exception as e:
logger.error(f"Error in event listener: {e}")
# Add a short delay to prevent rapid retries on persistent errors
gevent.sleep(5) # REPLACE: time.sleep(5)
finally:
# Always clean up PubSub connections in all error paths
try:
if pubsub:
pubsub.close()
pubsub = None
except Exception as e:
logger.debug(f"Error closing pubsub: {e}")
try:
if pubsub_client:
pubsub_client.close()
pubsub_client = None
except Exception as e:
logger.debug(f"Error closing pubsub_client: {e}")
thread = threading.Thread(target=event_listener, daemon=True)
thread.name = "redis-event-listener"
thread.start()
@@ -596,7 +605,7 @@ class ProxyServer:
if channel_user_agent:
metadata["user_agent"] = channel_user_agent
# CRITICAL FIX: Make sure stream_id is always set in metadata and properly logged
# Make sure stream_id is always set in metadata and properly logged
if channel_stream_id:
metadata["stream_id"] = str(channel_stream_id)
logger.info(f"Storing stream_id {channel_stream_id} in metadata for channel {channel_id}")

View file

@@ -1219,6 +1219,30 @@ class StreamManager:
except Exception as e:
logger.error(f"Final kill attempt failed for channel {self.channel_id}: {e}")
# Explicitly close all subprocess pipes to prevent file descriptor leaks
try:
if self.transcode_process.stdin:
self.transcode_process.stdin.close()
if self.transcode_process.stdout:
self.transcode_process.stdout.close()
if self.transcode_process.stderr:
self.transcode_process.stderr.close()
logger.debug(f"Closed all subprocess pipes for channel {self.channel_id}")
except Exception as e:
logger.debug(f"Error closing subprocess pipes for channel {self.channel_id}: {e}")
# Join stderr reader thread to ensure it's fully terminated
if hasattr(self, 'stderr_reader_thread') and self.stderr_reader_thread and self.stderr_reader_thread.is_alive():
try:
logger.debug(f"Waiting for stderr reader thread to terminate for channel {self.channel_id}")
self.stderr_reader_thread.join(timeout=2.0)
if self.stderr_reader_thread.is_alive():
logger.warning(f"Stderr reader thread did not terminate within timeout for channel {self.channel_id}")
except Exception as e:
logger.debug(f"Error joining stderr reader thread for channel {self.channel_id}: {e}")
finally:
self.stderr_reader_thread = None
self.transcode_process = None
self.transcode_process_active = False # Reset the flag