diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index cca827a9..61dac1e3 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -703,9 +703,10 @@ class ProxyServer: state = metadata.get(b'state', b'unknown').decode('utf-8') owner = metadata.get(b'owner', b'').decode('utf-8') - # States that indicate the channel is running properly + # States that indicate the channel is running properly or shutting down valid_states = [ChannelState.ACTIVE, ChannelState.WAITING_FOR_CLIENTS, - ChannelState.CONNECTING, ChannelState.BUFFERING, ChannelState.INITIALIZING] + ChannelState.CONNECTING, ChannelState.BUFFERING, ChannelState.INITIALIZING, + ChannelState.STOPPING] # If the channel is in a valid state, check if the owner is still active if state in valid_states: @@ -720,10 +721,10 @@ class ProxyServer: logger.warning(f"Detected zombie channel {channel_id} - owner {owner} is no longer active") self._clean_zombie_channel(channel_id, metadata) return False - elif state in [ChannelState.STOPPING, ChannelState.STOPPED, ChannelState.ERROR]: - # These states indicate the channel should be reinitialized - logger.info(f"Channel {channel_id} exists but in terminal state: {state}") - return True + elif state in [ChannelState.STOPPED, ChannelState.ERROR]: + # These terminal states indicate the channel should be cleaned up and reinitialized + logger.info(f"Channel {channel_id} in terminal state {state} - returning False to trigger cleanup") + return False else: # Unknown or initializing state, check how long it's been in this state if b'state_changed_at' in metadata: diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py index c1b803ab..91f254a7 100644 --- a/apps/proxy/ts_proxy/views.py +++ b/apps/proxy/ts_proxy/views.py @@ -84,11 +84,18 @@ def stream_ts(request, channel_id): if state_field in metadata: channel_state = metadata[state_field].decode("utf-8") - if channel_state: - # Channel is being initialized or already active - no need for reinitialization + # Active/running states - channel is operational, don't reinitialize + if channel_state in [ + ChannelState.ACTIVE, + ChannelState.WAITING_FOR_CLIENTS, + ChannelState.BUFFERING, + ChannelState.INITIALIZING, + ChannelState.CONNECTING, + ChannelState.STOPPING, + ]: needs_initialization = False logger.debug( - f"[{client_id}] Channel {channel_id} already in state {channel_state}, skipping initialization" + f"[{client_id}] Channel {channel_id} in state {channel_state}, skipping initialization" ) # Special handling for initializing/connecting states @@ -98,19 +105,34 @@ def stream_ts(request, channel_id): ]: channel_initializing = True logger.debug( - f"[{client_id}] Channel {channel_id} is still initializing, client will wait for completion" + f"[{client_id}] Channel {channel_id} is still initializing, client will wait" ) + # Terminal states - channel needs cleanup before reinitialization + elif channel_state in [ + ChannelState.ERROR, + ChannelState.STOPPED, + ]: + needs_initialization = True + logger.info( + f"[{client_id}] Channel {channel_id} in terminal state {channel_state}, will reinitialize" + ) + # Unknown/empty state - check if owner is alive else: - # Only check for owner if channel is in a valid state owner_field = ChannelMetadataField.OWNER.encode("utf-8") if owner_field in metadata: owner = metadata[owner_field].decode("utf-8") owner_heartbeat_key = f"ts_proxy:worker:{owner}:heartbeat" if proxy_server.redis_client.exists(owner_heartbeat_key): - # Owner is still active, so we don't need to reinitialize + # Owner is still active with unknown state - don't reinitialize needs_initialization = False logger.debug( - f"[{client_id}] Channel {channel_id} has active owner {owner}" + f"[{client_id}] Channel {channel_id} has active owner {owner}, skipping init" + ) + else: + # Owner dead - needs reinitialization + needs_initialization = True + logger.warning( + f"[{client_id}] Channel {channel_id} owner {owner} is dead, will reinitialize" ) # Start initialization if needed