Channel stop is working correclty.

This commit is contained in:
SergeantPanda 2025-03-16 12:35:06 -05:00
parent e47c3328e4
commit 9c6f31e014
2 changed files with 71 additions and 15 deletions

View file

@ -182,6 +182,35 @@ class ProxyServer:
f"ts_proxy:events:{channel_id}",
json.dumps(switch_result)
)
elif event_type == "channel_stop":
logger.info(f"Received channel stop event for channel {channel_id}")
# First mark channel as stopping in Redis
if self.redis_client:
# Set stopping state in metadata
metadata_key = f"ts_proxy:channel:{channel_id}:metadata"
if self.redis_client.exists(metadata_key):
self.redis_client.hset(metadata_key, mapping={
"state": "stopping",
"state_changed_at": str(time.time())
})
# If we have local resources for this channel, clean them up
if channel_id in self.stream_buffers or channel_id in self.client_managers:
# Use existing stop_channel method
logger.info(f"Stopping local resources for channel {channel_id}")
self.stop_channel(channel_id)
# Acknowledge stop by publishing a response
stop_response = {
"event": "channel_stopped",
"channel_id": channel_id,
"worker_id": self.worker_id,
"timestamp": time.time()
}
self.redis_client.publish(
f"ts_proxy:events:{channel_id}",
json.dumps(stop_response)
)
except Exception as e:
logger.error(f"Error processing event message: {e}")
except Exception as e:
@ -451,7 +480,13 @@ class ProxyServer:
"""Stop a channel with proper ownership handling"""
try:
logger.info(f"Stopping channel {channel_id}")
# First set a stopping key that clients will check
if self.redis_client:
stop_key = f"ts_proxy:channel:{channel_id}:stopping"
# Set with 60 second TTL - enough time for clients to notice
self.redis_client.setex(stop_key, 10, "true")
# Only stop the actual stream manager if we're the owner
if self.am_i_owner(channel_id):
logger.info(f"This worker ({self.worker_id}) is the owner - closing provider connection")

View file

@ -272,6 +272,13 @@ def stream_ts(request, channel_id):
# Main streaming loop
while True:
# Check if channel has been stopped
if proxy_server.redis_client:
stop_key = f"ts_proxy:channel:{channel_id}:stopping"
if proxy_server.redis_client.exists(stop_key):
logger.info(f"[{client_id}] Detected channel stop signal, terminating stream")
break # Exit loop immediately
# Get chunks at client's position using improved strategy
chunks, next_index = buffer.get_optimized_client_data(local_index)
@ -604,7 +611,7 @@ def channel_status(request, channel_id=None):
@csrf_exempt
@require_http_methods(["POST", "DELETE"])
def stop_channel(request, channel_id):
"""Stop a channel and release all associated resources"""
"""Stop a channel and release all associated resources using PubSub events"""
try:
logger.info(f"Request to stop channel {channel_id} received")
@ -626,8 +633,28 @@ def stop_channel(request, channel_id):
except Exception as e:
logger.error(f"Error fetching channel state: {e}")
# Stop the channel
result = proxy_server.stop_channel(channel_id)
# Broadcast stop event to all workers via PubSub
if proxy_server.redis_client:
stop_request = {
"event": "channel_stop",
"channel_id": channel_id,
"requester_worker_id": proxy_server.worker_id,
"timestamp": time.time()
}
# Publish the stop event
proxy_server.redis_client.publish(
f"ts_proxy:events:{channel_id}",
json.dumps(stop_request)
)
logger.info(f"Published channel stop event for {channel_id}")
# Also stop locally to ensure this worker cleans up right away
result = proxy_server.stop_channel(channel_id)
else:
# No Redis, just stop locally
result = proxy_server.stop_channel(channel_id)
# Release the channel in the channel model if applicable
try:
@ -639,17 +666,11 @@ def stop_channel(request, channel_id):
except Exception as e:
logger.error(f"Error releasing channel stream: {e}")
if result:
return JsonResponse({
'message': 'Channel stopped successfully',
'channel_id': channel_id,
'previous_state': channel_info
})
else:
return JsonResponse({
'error': 'Failed to stop channel',
'channel_id': channel_id
}, status=500)
return JsonResponse({
'message': 'Channel stop request sent',
'channel_id': channel_id,
'previous_state': channel_info
})
except Exception as e:
logger.error(f"Failed to stop channel: {e}", exc_info=True)