Enhancement: Improve zombie channel handling in ProxyServer by checking client connections and cleaning up orphaned metadata, ensuring better resource management and stability.

This commit is contained in:
SergeantPanda 2025-11-18 10:02:35 -06:00
parent d8df848136
commit b6c3234e96

View file

@ -719,6 +719,18 @@ class ProxyServer:
else:
# This is a zombie channel - owner is gone but metadata still exists
logger.warning(f"Detected zombie channel {channel_id} - owner {owner} is no longer active")
# Check if there are any clients connected
client_set_key = RedisKeys.clients(channel_id)
client_count = self.redis_client.scard(client_set_key) or 0
if client_count > 0:
logger.warning(f"Zombie channel {channel_id} has {client_count} clients - attempting ownership takeover")
# Could potentially take ownership here in the future
# For now, just clean it up to be safe
else:
logger.warning(f"Zombie channel {channel_id} has no clients - cleaning up")
self._clean_zombie_channel(channel_id, metadata)
return False
elif state in [ChannelState.STOPPED, ChannelState.ERROR]:
@ -940,6 +952,15 @@ class ProxyServer:
if channel_id in self.client_managers:
client_manager = self.client_managers[channel_id]
total_clients = client_manager.get_total_client_count()
else:
# This can happen during reconnection attempts or crashes
# Check Redis directly for any connected clients
if self.redis_client:
client_set_key = RedisKeys.clients(channel_id)
total_clients = self.redis_client.scard(client_set_key) or 0
if total_clients == 0:
logger.warning(f"Channel {channel_id} is missing client_manager but we're the owner with 0 clients - will trigger cleanup")
# Log client count periodically
if time.time() % 30 < 1: # Every ~30 seconds
@ -1086,14 +1107,30 @@ class ProxyServer:
continue
# Check for local client count - if zero, clean up our local resources
if self.client_managers[channel_id].get_client_count() == 0:
# We're not the owner, and we have no local clients - clean up our resources
logger.debug(f"Non-owner cleanup: Channel {channel_id} has no local clients, cleaning up local resources")
if channel_id in self.client_managers:
if self.client_managers[channel_id].get_client_count() == 0:
# We're not the owner, and we have no local clients - clean up our resources
logger.debug(f"Non-owner cleanup: Channel {channel_id} has no local clients, cleaning up local resources")
self._cleanup_local_resources(channel_id)
else:
# This shouldn't happen, but clean up anyway
logger.warning(f"Non-owner cleanup: Channel {channel_id} has no client_manager entry, cleaning up local resources")
self._cleanup_local_resources(channel_id)
except Exception as e:
logger.error(f"Error in cleanup thread: {e}", exc_info=True)
# Periodically check for orphaned channels (every 30 seconds)
if hasattr(self, '_last_orphan_check'):
if time.time() - self._last_orphan_check > 30:
try:
self._check_orphaned_metadata()
self._last_orphan_check = time.time()
except Exception as orphan_error:
logger.error(f"Error checking orphaned metadata: {orphan_error}", exc_info=True)
else:
self._last_orphan_check = time.time()
gevent.sleep(ConfigHelper.cleanup_check_interval()) # REPLACE: time.sleep(ConfigHelper.cleanup_check_interval())
thread = threading.Thread(target=cleanup_task, daemon=True)
@ -1115,10 +1152,6 @@ class ProxyServer:
try:
channel_id = key.decode('utf-8').split(':')[2]
# Skip channels we already have locally
if channel_id in self.stream_buffers:
continue
# Check if this channel has an owner
owner = self.get_channel_owner(channel_id)
@ -1133,13 +1166,84 @@ class ProxyServer:
else:
# Orphaned channel with no clients - clean it up
logger.info(f"Cleaning up orphaned channel {channel_id}")
self._clean_redis_keys(channel_id)
# If we have it locally, stop it properly to clean up processes
if channel_id in self.stream_managers or channel_id in self.client_managers:
logger.info(f"Orphaned channel {channel_id} is local - calling stop_channel")
self.stop_channel(channel_id)
else:
# Just clean up Redis keys for remote channels
self._clean_redis_keys(channel_id)
except Exception as e:
logger.error(f"Error processing channel key {key}: {e}")
except Exception as e:
logger.error(f"Error checking orphaned channels: {e}")
def _check_orphaned_metadata(self):
"""
Check for metadata entries that have no owner and no clients.
This catches zombie channels that weren't cleaned up properly.
"""
if not self.redis_client:
return
try:
# Get all channel metadata keys
channel_pattern = "ts_proxy:channel:*:metadata"
channel_keys = self.redis_client.keys(channel_pattern)
for key in channel_keys:
try:
channel_id = key.decode('utf-8').split(':')[2]
# Get metadata first
metadata = self.redis_client.hgetall(key)
if not metadata:
# Empty metadata - clean it up
logger.warning(f"Found empty metadata for channel {channel_id} - cleaning up")
# If we have it locally, stop it properly
if channel_id in self.stream_managers or channel_id in self.client_managers:
self.stop_channel(channel_id)
else:
self._clean_redis_keys(channel_id)
continue
# Get owner
owner = metadata.get(b'owner', b'').decode('utf-8') if b'owner' in metadata else ''
# Check if owner is still alive
owner_alive = False
if owner:
owner_heartbeat_key = f"ts_proxy:worker:{owner}:heartbeat"
owner_alive = self.redis_client.exists(owner_heartbeat_key)
# Check client count
client_set_key = RedisKeys.clients(channel_id)
client_count = self.redis_client.scard(client_set_key) or 0
# If no owner and no clients, clean it up
if not owner_alive and client_count == 0:
state = metadata.get(b'state', b'unknown').decode('utf-8') if b'state' in metadata else 'unknown'
logger.warning(f"Found orphaned metadata for channel {channel_id} (state: {state}, owner: {owner}, clients: {client_count}) - cleaning up")
# If we have it locally, stop it properly to clean up transcode/proxy processes
if channel_id in self.stream_managers or channel_id in self.client_managers:
logger.info(f"Channel {channel_id} is local - calling stop_channel to clean up processes")
self.stop_channel(channel_id)
else:
# Just clean up Redis keys for remote channels
self._clean_redis_keys(channel_id)
elif not owner_alive and client_count > 0:
# Owner is gone but clients remain - just log for now
logger.warning(f"Found orphaned channel {channel_id} with {client_count} clients but no owner - may need ownership takeover")
except Exception as e:
logger.error(f"Error processing metadata key {key}: {e}", exc_info=True)
except Exception as e:
logger.error(f"Error checking orphaned metadata: {e}", exc_info=True)
def _clean_redis_keys(self, channel_id):
"""Clean up all Redis keys for a channel more efficiently"""
# Release the channel, stream, and profile keys from the channel