From 9c6f31e0149fc0dc63703dd69fa9af7660d1fdc1 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Sun, 16 Mar 2025 12:35:06 -0500 Subject: [PATCH] Channel stop is working correclty. --- apps/proxy/ts_proxy/server.py | 37 +++++++++++++++++++++++++- apps/proxy/ts_proxy/views.py | 49 +++++++++++++++++++++++++---------- 2 files changed, 71 insertions(+), 15 deletions(-) diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index 1f54d0f9..de992096 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -182,6 +182,35 @@ class ProxyServer: f"ts_proxy:events:{channel_id}", json.dumps(switch_result) ) + elif event_type == "channel_stop": + logger.info(f"Received channel stop event for channel {channel_id}") + # First mark channel as stopping in Redis + if self.redis_client: + # Set stopping state in metadata + metadata_key = f"ts_proxy:channel:{channel_id}:metadata" + if self.redis_client.exists(metadata_key): + self.redis_client.hset(metadata_key, mapping={ + "state": "stopping", + "state_changed_at": str(time.time()) + }) + + # If we have local resources for this channel, clean them up + if channel_id in self.stream_buffers or channel_id in self.client_managers: + # Use existing stop_channel method + logger.info(f"Stopping local resources for channel {channel_id}") + self.stop_channel(channel_id) + + # Acknowledge stop by publishing a response + stop_response = { + "event": "channel_stopped", + "channel_id": channel_id, + "worker_id": self.worker_id, + "timestamp": time.time() + } + self.redis_client.publish( + f"ts_proxy:events:{channel_id}", + json.dumps(stop_response) + ) except Exception as e: logger.error(f"Error processing event message: {e}") except Exception as e: @@ -451,7 +480,13 @@ class ProxyServer: """Stop a channel with proper ownership handling""" try: logger.info(f"Stopping channel {channel_id}") - + + # First set a stopping key that clients will check + if self.redis_client: + stop_key = f"ts_proxy:channel:{channel_id}:stopping" + # Set with 60 second TTL - enough time for clients to notice + self.redis_client.setex(stop_key, 10, "true") + # Only stop the actual stream manager if we're the owner if self.am_i_owner(channel_id): logger.info(f"This worker ({self.worker_id}) is the owner - closing provider connection") diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py index 401828ce..e52416bb 100644 --- a/apps/proxy/ts_proxy/views.py +++ b/apps/proxy/ts_proxy/views.py @@ -272,6 +272,13 @@ def stream_ts(request, channel_id): # Main streaming loop while True: + # Check if channel has been stopped + if proxy_server.redis_client: + stop_key = f"ts_proxy:channel:{channel_id}:stopping" + if proxy_server.redis_client.exists(stop_key): + logger.info(f"[{client_id}] Detected channel stop signal, terminating stream") + break # Exit loop immediately + # Get chunks at client's position using improved strategy chunks, next_index = buffer.get_optimized_client_data(local_index) @@ -604,7 +611,7 @@ def channel_status(request, channel_id=None): @csrf_exempt @require_http_methods(["POST", "DELETE"]) def stop_channel(request, channel_id): - """Stop a channel and release all associated resources""" + """Stop a channel and release all associated resources using PubSub events""" try: logger.info(f"Request to stop channel {channel_id} received") @@ -626,8 +633,28 @@ def stop_channel(request, channel_id): except Exception as e: logger.error(f"Error fetching channel state: {e}") - # Stop the channel - result = proxy_server.stop_channel(channel_id) + # Broadcast stop event to all workers via PubSub + if proxy_server.redis_client: + stop_request = { + "event": "channel_stop", + "channel_id": channel_id, + "requester_worker_id": proxy_server.worker_id, + "timestamp": time.time() + } + + # Publish the stop event + proxy_server.redis_client.publish( + f"ts_proxy:events:{channel_id}", + json.dumps(stop_request) + ) + + logger.info(f"Published channel stop event for {channel_id}") + + # Also stop locally to ensure this worker cleans up right away + result = proxy_server.stop_channel(channel_id) + else: + # No Redis, just stop locally + result = proxy_server.stop_channel(channel_id) # Release the channel in the channel model if applicable try: @@ -639,17 +666,11 @@ def stop_channel(request, channel_id): except Exception as e: logger.error(f"Error releasing channel stream: {e}") - if result: - return JsonResponse({ - 'message': 'Channel stopped successfully', - 'channel_id': channel_id, - 'previous_state': channel_info - }) - else: - return JsonResponse({ - 'error': 'Failed to stop channel', - 'channel_id': channel_id - }, status=500) + return JsonResponse({ + 'message': 'Channel stop request sent', + 'channel_id': channel_id, + 'previous_state': channel_info + }) except Exception as e: logger.error(f"Failed to stop channel: {e}", exc_info=True)