Continuing cleanup.

SergeantPanda 2025-03-13 18:34:49 -05:00
parent d953386c57
commit 387c2491b7


@@ -125,7 +125,8 @@ class StreamManager:
if "'NoneType' object has no attribute 'read'" in str(e):
logger.warning(f"Connection closed by server (read {chunk_count} chunks before disconnect)")
else:
# Re-raise unexpected AttributeError
# Re-raise unexpected AttributeErrors
logger.error(f"Unexpected AttributeError: {e}")
raise
else:
logger.error(f"Failed to connect to stream: HTTP {response.status_code}")
@@ -235,7 +236,7 @@ class StreamManager:
while self.running:
try:
now = time.time()
if now - self.last_data_time > getattr(Config.CONNECTION_TIMEOUT,10) and self.connected:
if now - self.last_data_time > getattr(Config, 'CONNECTION_TIMEOUT', 10) and self.connected:
# Mark unhealthy if no data for too long
if self.healthy:
logger.warning(f"Stream unhealthy - no data for {now - self.last_data_time:.1f}s")
@@ -595,22 +596,18 @@ class ClientManager:
# Check if client exists in Redis at all
exists = self.redis_client.exists(client_key)
if not exists:
# Client entry has expired in Redis but still in our local set
logger.warning(f"Found ghost client {client_id} - expired in Redis but still in local set")
logger.debug(f"Client {client_id} no longer exists in Redis, removing locally")
clients_to_remove.add(client_id)
continue
# Check for stale activity using last_active field
last_active = self.redis_client.hget(client_key, "last_active")
if last_active:
last_active_time = float(last_active.decode('utf-8'))
time_since_activity = current_time - last_active_time
ghost_timeout = self.heartbeat_interval * getattr(Config, 'GHOST_CLIENT_MULTIPLIER', 5.0)
# If client hasn't been active for too long, mark for removal
# Use configurable threshold for detection
ghost_threshold = getattr(Config, 'GHOST_CLIENT_MULTIPLIER', 5.0)
if time_since_activity > self.heartbeat_interval * ghost_threshold:
logger.warning(f"Detected ghost client {client_id} - last active {time_since_activity:.1f}s ago")
if current_time - last_active_time > ghost_timeout:
logger.debug(f"Client {client_id} inactive for {current_time - last_active_time:.1f}s, removing as ghost")
clients_to_remove.add(client_id)
# Remove ghost clients in a separate step
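The reworked check computes one ghost timeout (heartbeat interval times GHOST_CLIENT_MULTIPLIER, default 5.0) and drops clients whose Redis hash has expired or whose last_active is older than that timeout. A hedged sketch of the same logic, assuming a redis-py client returning bytes and a client_id -> Redis key mapping (illustrative names, not the real ClientManager state):

```python
import time

def find_ghost_clients(redis_client, local_clients, heartbeat_interval, Config):
    """Return the client ids that should be dropped from the local set."""
    ghost_timeout = heartbeat_interval * getattr(Config, 'GHOST_CLIENT_MULTIPLIER', 5.0)
    current_time = time.time()
    clients_to_remove = set()
    for client_id, client_key in local_clients.items():
        if not redis_client.exists(client_key):
            # Hash already expired in Redis - drop the local entry.
            clients_to_remove.add(client_id)
            continue
        last_active = redis_client.hget(client_key, "last_active")
        if last_active and current_time - float(last_active.decode('utf-8')) > ghost_timeout:
            # No activity within the ghost timeout - treat as a ghost client.
            clients_to_remove.add(client_id)
    return clients_to_remove
```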
@@ -631,8 +628,8 @@ class ClientManager:
# Skip if we just sent a heartbeat recently
if client_id in self.last_heartbeat_time:
time_since_last = current_time - self.last_heartbeat_time[client_id]
if time_since_last < self.heartbeat_interval * 0.8:
time_since_heartbeat = current_time - self.last_heartbeat_time[client_id]
if time_since_heartbeat < self.heartbeat_interval * 0.5: # Only heartbeat at half interval minimum
continue
# Only update clients that remain
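With the threshold lowered from 0.8 to 0.5 of the heartbeat interval, a client is skipped only if its heartbeat was refreshed less than half an interval ago, so heartbeats can go out up to twice per interval. A one-function sketch of that throttle (names are illustrative):

```python
def should_send_heartbeat(client_id, last_heartbeat_time, heartbeat_interval, current_time):
    """Skip clients whose heartbeat was refreshed less than half an interval ago."""
    sent_at = last_heartbeat_time.get(client_id)
    if sent_at is not None and current_time - sent_at < heartbeat_interval * 0.5:
        return False  # heartbeat sent recently enough - skip this cycle
    return True
```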
@@ -1487,21 +1484,37 @@ class ProxyServer:
logger.error(f"Error checking orphaned channels: {e}")
def _clean_redis_keys(self, channel_id):
"""Clean up all Redis keys for a channel"""
"""Clean up all Redis keys for a channel more efficiently"""
if not self.redis_client:
return
return 0
try:
# All keys are now under the channel namespace for easy pattern matching
channel_pattern = f"ts_proxy:channel:{channel_id}:*"
all_keys = self.redis_client.keys(channel_pattern)
# Define key patterns to scan for
patterns = [
f"ts_proxy:channel:{channel_id}:*", # All channel keys
f"ts_proxy:events:{channel_id}" # Event channel
]
total_deleted = 0
for pattern in patterns:
cursor = 0
while True:
cursor, keys = self.redis_client.scan(cursor, match=pattern, count=100)
if keys:
self.redis_client.delete(*keys)
total_deleted += len(keys)
# Exit when cursor returns to 0
if cursor == 0:
break
logger.info(f"Cleaned up {total_deleted} Redis keys for channel {channel_id}")
return total_deleted
if all_keys:
# Delete all matching keys in bulk
self.redis_client.delete(*all_keys)
logger.info(f"Cleaned up {len(all_keys)} Redis keys for channel {channel_id}")
except Exception as e:
logger.error(f"Error cleaning Redis keys for channel {channel_id}: {e}")
return 0
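The rewritten helper replaces a single KEYS call with cursor-based SCAN batches, which avoids blocking Redis on large keyspaces, covers the separate events key, and reports how many keys it removed. For comparison, a hedged standalone sketch using redis-py's scan_iter convenience wrapper over the same cursor loop (same patterns as above, not the committed code):

```python
def clean_redis_keys(redis_client, channel_id):
    """Delete a channel's keys in SCAN-sized batches and return the count removed."""
    patterns = [
        f"ts_proxy:channel:{channel_id}:*",  # all channel keys
        f"ts_proxy:events:{channel_id}",     # event channel
    ]
    total_deleted = 0
    for pattern in patterns:
        batch = []
        for key in redis_client.scan_iter(match=pattern, count=100):
            batch.append(key)
            if len(batch) >= 100:
                total_deleted += redis_client.delete(*batch)
                batch.clear()
        if batch:
            total_deleted += redis_client.delete(*batch)
    return total_deleted
```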
def refresh_channel_registry(self):
"""Refresh TTL for active channels using standard keys"""