From 8bb9317f92da206f28e76e4f189fa17497a25e57 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Sun, 16 Mar 2025 18:11:35 -0500 Subject: [PATCH] Individual client stopping is fully working now. --- apps/proxy/ts_proxy/server.py | 18 ++++++++++++------ apps/proxy/ts_proxy/views.py | 26 +++++++++++++++++++++++--- 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index 097419a4..5dc4a6b5 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -216,12 +216,18 @@ class ProxyServer: if client_id and channel_id: logger.info(f"Received request to stop client {client_id} on channel {channel_id}") - # Check if we have this client locally - if (channel_id in self.client_managers and - client_id in self.client_managers[channel_id].clients): - # Use the existing remove_client method to properly clean up - self.client_managers[channel_id].remove_client(client_id) - logger.info(f"Stopped client {client_id} on channel {channel_id} based on event") + # Both remove from client manager AND set a key for the generator to detect + if channel_id in self.client_managers: + client_manager = self.client_managers[channel_id] + if client_id in client_manager.clients: + client_manager.remove_client(client_id) + logger.info(f"Removed client {client_id} from client manager") + + # Set a Redis key for the generator to detect + if self.redis_client: + stop_key = f"ts_proxy:channel:{channel_id}:client:{client_id}:stop" + self.redis_client.setex(stop_key, 30, "true") # 30 second TTL + logger.info(f"Set stop key for client {client_id}") except Exception as e: logger.error(f"Error processing event message: {e}") except Exception as e: diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py index 52a1f6b9..472fa190 100644 --- a/apps/proxy/ts_proxy/views.py +++ b/apps/proxy/ts_proxy/views.py @@ -116,7 +116,7 @@ def stream_ts(request, channel_id): # Register client - can do this regardless of initialization state # Create local resources if needed if channel_id not in proxy_server.stream_buffers or channel_id not in proxy_server.client_managers: - logger.warning(f"[{client_id}] Channel {channel_id} exists in Redis but not initialized in this worker - initializing now") + logger.debug(f"[{client_id}] Channel {channel_id} exists in Redis but not initialized in this worker - initializing now") # Get URL from Redis metadata url = None @@ -280,13 +280,27 @@ def stream_ts(request, channel_id): if channel_id not in proxy_server.client_managers: logger.info(f"[{client_id}] Client manager no longer exists, terminating stream") break - - # Keep the existing stopping flag check + + # Check if this specific client has been stopped if proxy_server.redis_client: + # Channel stop check stop_key = f"ts_proxy:channel:{channel_id}:stopping" if proxy_server.redis_client.exists(stop_key): logger.info(f"[{client_id}] Detected channel stop signal, terminating stream") break + + # Client stop check - NEW + client_stop_key = f"ts_proxy:channel:{channel_id}:client:{client_id}:stop" + if proxy_server.redis_client.exists(client_stop_key): + logger.info(f"[{client_id}] Detected client stop signal, terminating stream") + break + + # Also check if client has been removed from client_manager + if channel_id in proxy_server.client_managers: + client_manager = proxy_server.client_managers[channel_id] + if client_id not in client_manager.clients: + logger.info(f"[{client_id}] Client no longer in client manager, terminating stream") + break # Get chunks at client's position using improved strategy chunks, next_index = buffer.get_optimized_client_data(local_index) @@ -699,6 +713,12 @@ def stop_client(request, channel_id): logger.info(f"Request to stop client {client_id} on channel {channel_id}") + # Set a Redis key for the generator to detect regardless of whether client is local + if proxy_server.redis_client: + stop_key = f"ts_proxy:channel:{channel_id}:client:{client_id}:stop" + proxy_server.redis_client.setex(stop_key, 30, "true") # 30 second TTL + logger.info(f"Set stop key for client {client_id}") + # Check if channel exists channel_exists = proxy_server.check_if_channel_exists(channel_id) if not channel_exists: