diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index c0422126..097419a4 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -211,6 +211,17 @@ class ProxyServer: f"ts_proxy:events:{channel_id}", json.dumps(stop_response) ) + elif event_type == "client_stop": + client_id = data.get("client_id") + if client_id and channel_id: + logger.info(f"Received request to stop client {client_id} on channel {channel_id}") + + # Check if we have this client locally + if (channel_id in self.client_managers and + client_id in self.client_managers[channel_id].clients): + # Use the existing remove_client method to properly clean up + self.client_managers[channel_id].remove_client(client_id) + logger.info(f"Stopped client {client_id} on channel {channel_id} based on event") except Exception as e: logger.error(f"Error processing event message: {e}") except Exception as e: diff --git a/apps/proxy/ts_proxy/urls.py b/apps/proxy/ts_proxy/urls.py index bbba4ed4..cba06b6f 100644 --- a/apps/proxy/ts_proxy/urls.py +++ b/apps/proxy/ts_proxy/urls.py @@ -9,4 +9,5 @@ urlpatterns = [ path('status', views.channel_status, name='channel_status'), path('status/', views.channel_status, name='channel_status_detail'), path('stop/', views.stop_channel, name='stop_channel'), + path('stop_client/', views.stop_client, name='stop_client'), ] diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py index b926b369..52a1f6b9 100644 --- a/apps/proxy/ts_proxy/views.py +++ b/apps/proxy/ts_proxy/views.py @@ -683,4 +683,67 @@ def stop_channel(request, channel_id): except Exception as e: logger.error(f"Failed to stop channel: {e}", exc_info=True) + return JsonResponse({'error': str(e)}, status=500) + +@csrf_exempt +@require_http_methods(["POST"]) +def stop_client(request, channel_id): + """Stop a specific client connection using existing client management""" + try: + # Parse request body to get client ID + data = json.loads(request.body) + client_id = data.get('client_id') + + if not client_id: + return JsonResponse({'error': 'No client_id provided'}, status=400) + + logger.info(f"Request to stop client {client_id} on channel {channel_id}") + + # Check if channel exists + channel_exists = proxy_server.check_if_channel_exists(channel_id) + if not channel_exists: + logger.warning(f"Channel {channel_id} not found") + return JsonResponse({'error': 'Channel not found'}, status=404) + + # Two-part approach: + # 1. Handle locally if client is on this worker + # 2. Use events to inform other workers if needed + + local_client_stopped = False + if channel_id in proxy_server.client_managers: + client_manager = proxy_server.client_managers[channel_id] + # Use the existing remove_client method directly + with client_manager.lock: + if client_id in client_manager.clients: + client_manager.remove_client(client_id) + local_client_stopped = True + logger.info(f"Client {client_id} stopped locally on channel {channel_id}") + + # If client wasn't found locally, broadcast stop event for other workers + if not local_client_stopped and proxy_server.redis_client: + stop_request = { + "event": "client_stop", + "channel_id": channel_id, + "client_id": client_id, + "requester_worker_id": proxy_server.worker_id, + "timestamp": time.time() + } + + proxy_server.redis_client.publish( + f"ts_proxy:events:{channel_id}", + json.dumps(stop_request) + ) + logger.info(f"Published stop request for client {client_id} on channel {channel_id}") + + return JsonResponse({ + 'message': 'Client stop request processed', + 'channel_id': channel_id, + 'client_id': client_id, + 'locally_processed': local_client_stopped + }) + + except json.JSONDecodeError: + return JsonResponse({'error': 'Invalid JSON'}, status=400) + except Exception as e: + logger.error(f"Failed to stop client: {e}", exc_info=True) return JsonResponse({'error': str(e)}, status=500) \ No newline at end of file