diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index a4a89586..d4f95570 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -125,7 +125,8 @@ class StreamManager: if "'NoneType' object has no attribute 'read'" in str(e): logger.warning(f"Connection closed by server (read {chunk_count} chunks before disconnect)") else: - # Re-raise unexpected AttributeError + # Re-raise unexpected AttributeErrors + logger.error(f"Unexpected AttributeError: {e}") raise else: logger.error(f"Failed to connect to stream: HTTP {response.status_code}") @@ -235,7 +236,7 @@ class StreamManager: while self.running: try: now = time.time() - if now - self.last_data_time > getattr(Config.CONNECTION_TIMEOUT,10) and self.connected: + if now - self.last_data_time > getattr(Config, 'CONNECTION_TIMEOUT', 10) and self.connected: # Mark unhealthy if no data for too long if self.healthy: logger.warning(f"Stream unhealthy - no data for {now - self.last_data_time:.1f}s") @@ -595,22 +596,18 @@ class ClientManager: # Check if client exists in Redis at all exists = self.redis_client.exists(client_key) if not exists: - # Client entry has expired in Redis but still in our local set - logger.warning(f"Found ghost client {client_id} - expired in Redis but still in local set") + logger.debug(f"Client {client_id} no longer exists in Redis, removing locally") clients_to_remove.add(client_id) continue - + # Check for stale activity using last_active field last_active = self.redis_client.hget(client_key, "last_active") if last_active: last_active_time = float(last_active.decode('utf-8')) - time_since_activity = current_time - last_active_time + ghost_timeout = self.heartbeat_interval * getattr(Config, 'GHOST_CLIENT_MULTIPLIER', 5.0) - # If client hasn't been active for too long, mark for removal - # Use configurable threshold for detection - ghost_threshold = getattr(Config, 'GHOST_CLIENT_MULTIPLIER', 5.0) - if time_since_activity > self.heartbeat_interval * ghost_threshold: - logger.warning(f"Detected ghost client {client_id} - last active {time_since_activity:.1f}s ago") + if current_time - last_active_time > ghost_timeout: + logger.debug(f"Client {client_id} inactive for {current_time - last_active_time:.1f}s, removing as ghost") clients_to_remove.add(client_id) # Remove ghost clients in a separate step @@ -631,8 +628,8 @@ class ClientManager: # Skip if we just sent a heartbeat recently if client_id in self.last_heartbeat_time: - time_since_last = current_time - self.last_heartbeat_time[client_id] - if time_since_last < self.heartbeat_interval * 0.8: + time_since_heartbeat = current_time - self.last_heartbeat_time[client_id] + if time_since_heartbeat < self.heartbeat_interval * 0.5: # Only heartbeat at half interval minimum continue # Only update clients that remain @@ -1487,21 +1484,37 @@ class ProxyServer: logger.error(f"Error checking orphaned channels: {e}") def _clean_redis_keys(self, channel_id): - """Clean up all Redis keys for a channel""" + """Clean up all Redis keys for a channel more efficiently""" if not self.redis_client: - return + return 0 try: - # All keys are now under the channel namespace for easy pattern matching - channel_pattern = f"ts_proxy:channel:{channel_id}:*" - all_keys = self.redis_client.keys(channel_pattern) + # Define key patterns to scan for + patterns = [ + f"ts_proxy:channel:{channel_id}:*", # All channel keys + f"ts_proxy:events:{channel_id}" # Event channel + ] + + total_deleted = 0 + + for pattern in patterns: + cursor = 0 + while True: + cursor, keys = self.redis_client.scan(cursor, match=pattern, count=100) + if keys: + self.redis_client.delete(*keys) + total_deleted += len(keys) + + # Exit when cursor returns to 0 + if cursor == 0: + break + + logger.info(f"Cleaned up {total_deleted} Redis keys for channel {channel_id}") + return total_deleted - if all_keys: - # Delete all matching keys in bulk - self.redis_client.delete(*all_keys) - logger.info(f"Cleaned up {len(all_keys)} Redis keys for channel {channel_id}") except Exception as e: logger.error(f"Error cleaning Redis keys for channel {channel_id}: {e}") + return 0 def refresh_channel_registry(self): """Refresh TTL for active channels using standard keys"""