Individual client stopping is fully working now.

This commit is contained in:
SergeantPanda 2025-03-16 18:11:35 -05:00
parent 03bfaeb24f
commit 8bb9317f92
2 changed files with 35 additions and 9 deletions

View file

@ -216,12 +216,18 @@ class ProxyServer:
if client_id and channel_id:
logger.info(f"Received request to stop client {client_id} on channel {channel_id}")
# Check if we have this client locally
if (channel_id in self.client_managers and
client_id in self.client_managers[channel_id].clients):
# Use the existing remove_client method to properly clean up
self.client_managers[channel_id].remove_client(client_id)
logger.info(f"Stopped client {client_id} on channel {channel_id} based on event")
# Both remove from client manager AND set a key for the generator to detect
if channel_id in self.client_managers:
client_manager = self.client_managers[channel_id]
if client_id in client_manager.clients:
client_manager.remove_client(client_id)
logger.info(f"Removed client {client_id} from client manager")
# Set a Redis key for the generator to detect
if self.redis_client:
stop_key = f"ts_proxy:channel:{channel_id}:client:{client_id}:stop"
self.redis_client.setex(stop_key, 30, "true") # 30 second TTL
logger.info(f"Set stop key for client {client_id}")
except Exception as e:
logger.error(f"Error processing event message: {e}")
except Exception as e:

View file

@ -116,7 +116,7 @@ def stream_ts(request, channel_id):
# Register client - can do this regardless of initialization state
# Create local resources if needed
if channel_id not in proxy_server.stream_buffers or channel_id not in proxy_server.client_managers:
logger.warning(f"[{client_id}] Channel {channel_id} exists in Redis but not initialized in this worker - initializing now")
logger.debug(f"[{client_id}] Channel {channel_id} exists in Redis but not initialized in this worker - initializing now")
# Get URL from Redis metadata
url = None
@ -280,13 +280,27 @@ def stream_ts(request, channel_id):
if channel_id not in proxy_server.client_managers:
logger.info(f"[{client_id}] Client manager no longer exists, terminating stream")
break
# Keep the existing stopping flag check
# Check if this specific client has been stopped
if proxy_server.redis_client:
# Channel stop check
stop_key = f"ts_proxy:channel:{channel_id}:stopping"
if proxy_server.redis_client.exists(stop_key):
logger.info(f"[{client_id}] Detected channel stop signal, terminating stream")
break
# Client stop check - NEW
client_stop_key = f"ts_proxy:channel:{channel_id}:client:{client_id}:stop"
if proxy_server.redis_client.exists(client_stop_key):
logger.info(f"[{client_id}] Detected client stop signal, terminating stream")
break
# Also check if client has been removed from client_manager
if channel_id in proxy_server.client_managers:
client_manager = proxy_server.client_managers[channel_id]
if client_id not in client_manager.clients:
logger.info(f"[{client_id}] Client no longer in client manager, terminating stream")
break
# Get chunks at client's position using improved strategy
chunks, next_index = buffer.get_optimized_client_data(local_index)
@ -699,6 +713,12 @@ def stop_client(request, channel_id):
logger.info(f"Request to stop client {client_id} on channel {channel_id}")
# Set a Redis key for the generator to detect regardless of whether client is local
if proxy_server.redis_client:
stop_key = f"ts_proxy:channel:{channel_id}:client:{client_id}:stop"
proxy_server.redis_client.setex(stop_key, 30, "true") # 30 second TTL
logger.info(f"Set stop key for client {client_id}")
# Check if channel exists
channel_exists = proxy_server.check_if_channel_exists(channel_id)
if not channel_exists: