From 70f7484fb56c415dbbcf027cd2ea6d2838ac175f Mon Sep 17 00:00:00 2001
From: SergeantPanda
Date: Fri, 26 Sep 2025 09:10:34 -0500
Subject: [PATCH] Better connection tracking for apps that do not reuse
 connections during seeking operations.

---
 .../multi_worker_connection_manager.py | 141 ++++++++++++------
 1 file changed, 97 insertions(+), 44 deletions(-)

diff --git a/apps/proxy/vod_proxy/multi_worker_connection_manager.py b/apps/proxy/vod_proxy/multi_worker_connection_manager.py
index 7577d2af..fefc8739 100644
--- a/apps/proxy/vod_proxy/multi_worker_connection_manager.py
+++ b/apps/proxy/vod_proxy/multi_worker_connection_manager.py
@@ -540,11 +540,9 @@ class RedisBackedVODConnection:
         }
         return {}

-    def cleanup(self, connection_manager=None):
-        """Clean up local resources and Redis state"""
-        # Get connection state before cleanup to handle profile decrementing
-        state = self._get_connection_state()
-
+    def cleanup(self, connection_manager=None, current_worker_id=None):
+        """Smart cleanup based on worker ownership and active streams"""
+        # Always clean up local resources first
         if self.local_response:
             self.local_response.close()
             self.local_response = None
@@ -552,38 +550,72 @@ class RedisBackedVODConnection:
             self.local_session.close()
             self.local_session = None

-        # Remove from Redis
-        if self.redis_client:
-            try:
-                # Use pipeline for atomic cleanup operations
-                pipe = self.redis_client.pipeline()
+        # Get current connection state to check ownership and active streams
+        state = self._get_connection_state()

-                # 1. Remove main connection state (now contains consolidated data)
-                pipe.delete(self.connection_key)
+        if not state:
+            logger.info(f"[{self.session_id}] No connection state found - local cleanup only")
+            return

-                # 2. Remove distributed lock
-                pipe.delete(self.lock_key)
+        # Check if there are active streams
+        if state.active_streams > 0:
+            # There are active streams - check ownership
+            if current_worker_id and state.worker_id == current_worker_id:
+                logger.info(f"[{self.session_id}] Active streams present ({state.active_streams}) and we own them - local cleanup only")
+            else:
+                logger.info(f"[{self.session_id}] Active streams present ({state.active_streams}) but owned by worker {state.worker_id} - local cleanup only")
+            return

-                # Execute all cleanup operations
-                pipe.execute()
+        # No active streams - we can clean up Redis state
+        if not self.redis_client:
+            logger.info(f"[{self.session_id}] No Redis client - local cleanup only")
+            return

-                logger.info(f"[{self.session_id}] Cleaned up all Redis keys (consolidated connection state, locks)")
+        # Acquire lock and do final check before cleanup to prevent race conditions
+        if not self._acquire_lock():
+            logger.warning(f"[{self.session_id}] Could not acquire lock for cleanup - skipping")
+            return

-                # Decrement profile connections if we have the state and connection manager
-                if state and state.m3u_profile_id and connection_manager:
-                    logger.info(f"[{self.session_id}] Decrementing profile connection count for profile {state.m3u_profile_id}")
-                    connection_manager._decrement_profile_connections(state.m3u_profile_id)
-                    logger.info(f"[{self.session_id}] Profile connection count decremented for profile {state.m3u_profile_id}")
-                else:
-                    if not state:
-                        logger.warning(f"[{self.session_id}] No connection state found during cleanup - cannot decrement profile connections")
-                    elif not state.m3u_profile_id:
-                        logger.warning(f"[{self.session_id}] No profile ID in connection state - cannot decrement profile connections")
-                    elif not connection_manager:
-                        logger.warning(f"[{self.session_id}] No connection manager provided - cannot decrement profile connections")
+        try:
+            # Re-check active streams with lock held to prevent race conditions
+            current_state = self._get_connection_state()
+            if not current_state:
+                logger.info(f"[{self.session_id}] Connection state no longer exists - cleanup already done")
+                return

-            except Exception as e:
-                logger.error(f"[{self.session_id}] Error cleaning up Redis state: {e}")
+            if current_state.active_streams > 0:
+                logger.info(f"[{self.session_id}] Active streams now present ({current_state.active_streams}) - skipping cleanup")
+                return
+
+            # Use pipeline for atomic cleanup operations
+            pipe = self.redis_client.pipeline()
+
+            # 1. Remove main connection state (contains consolidated data)
+            pipe.delete(self.connection_key)
+
+            # 2. Remove distributed lock (will be released below anyway)
+            pipe.delete(self.lock_key)
+
+            # Execute all cleanup operations
+            pipe.execute()
+
+            logger.info(f"[{self.session_id}] Cleaned up Redis keys (verified no active streams)")
+
+            # Decrement profile connections if we have the state and connection manager
+            if state.m3u_profile_id and connection_manager:
+                connection_manager._decrement_profile_connections(state.m3u_profile_id)
+                logger.info(f"[{self.session_id}] Profile connection count decremented for profile {state.m3u_profile_id}")
+            else:
+                if not state.m3u_profile_id:
+                    logger.warning(f"[{self.session_id}] No profile ID in connection state - cannot decrement profile connections")
+                elif not connection_manager:
+                    logger.warning(f"[{self.session_id}] No connection manager provided - cannot decrement profile connections")
+
+        except Exception as e:
+            logger.error(f"[{self.session_id}] Error cleaning up Redis state: {e}")
+        finally:
+            # Always release the lock
+            self._release_lock()

 # Modify the VODConnectionManager to use Redis-backed connections
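The rewritten cleanup() is a double-checked pattern: an unlocked fast path that backs off while streams are active or another worker owns the session, then a re-check of active_streams under the distributed lock before any keys are deleted. A minimal standalone sketch of that pattern, assuming redis-py and an illustrative key/field layout (not this module's actual schema):

    import redis

    r = redis.Redis(decode_responses=True)

    def cleanup_if_idle(session_id: str, worker_id: str) -> bool:
        conn_key = f"vod:conn:{session_id}"   # illustrative key names
        lock_key = f"vod:lock:{session_id}"

        # Unlocked fast path: never delete state while streams are active.
        state = r.hgetall(conn_key)
        if not state or int(state.get("active_streams", 0)) > 0:
            return False

        # Short-lived distributed lock (SET NX EX); back off if contended.
        if not r.set(lock_key, worker_id, nx=True, ex=10):
            return False
        try:
            # Re-check under the lock: another worker may have reserved the
            # session between the fast path and the lock acquisition.
            state = r.hgetall(conn_key)
            if not state or int(state.get("active_streams", 0)) > 0:
                return False
            # Atomic removal of connection state and lock, as in the patch.
            r.pipeline().delete(conn_key).delete(lock_key).execute()
            return True
        finally:
            # Idempotent release; a production version would verify the
            # lock token before deleting (e.g. via a Lua script).
            r.delete(lock_key)

The early returns ahead of the lock keep the common case cheap: only a session that actually looks idle ever pays for lock traffic.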
logger.warning(f"[{self.session_id}] No connection manager provided - cannot decrement profile connections") + try: + # Re-check active streams with lock held to prevent race conditions + current_state = self._get_connection_state() + if not current_state: + logger.info(f"[{self.session_id}] Connection state no longer exists - cleanup already done") + return - except Exception as e: - logger.error(f"[{self.session_id}] Error cleaning up Redis state: {e}") + if current_state.active_streams > 0: + logger.info(f"[{self.session_id}] Active streams now present ({current_state.active_streams}) - skipping cleanup") + return + + # Use pipeline for atomic cleanup operations + pipe = self.redis_client.pipeline() + + # 1. Remove main connection state (contains consolidated data) + pipe.delete(self.connection_key) + + # 2. Remove distributed lock (will be released below anyway) + pipe.delete(self.lock_key) + + # Execute all cleanup operations + pipe.execute() + + logger.info(f"[{self.session_id}] Cleaned up Redis keys (verified no active streams)") + + # Decrement profile connections if we have the state and connection manager + if state.m3u_profile_id and connection_manager: + connection_manager._decrement_profile_connections(state.m3u_profile_id) + logger.info(f"[{self.session_id}] Profile connection count decremented for profile {state.m3u_profile_id}") + else: + if not state.m3u_profile_id: + logger.warning(f"[{self.session_id}] No profile ID in connection state - cannot decrement profile connections") + elif not connection_manager: + logger.warning(f"[{self.session_id}] No connection manager provided - cannot decrement profile connections") + + except Exception as e: + logger.error(f"[{self.session_id}] Error cleaning up Redis state: {e}") + finally: + # Always release the lock + self._release_lock() # Modify the VODConnectionManager to use Redis-backed connections @@ -694,6 +726,15 @@ class MultiWorkerVODConnectionManager: logger.info(f"[{client_id}] Worker {self.worker_id} - Found matching idle session: {matching_session_id}") effective_session_id = matching_session_id client_id = matching_session_id # Update client_id for logging consistency + + # IMMEDIATELY reserve this session by incrementing active streams to prevent cleanup + temp_connection = RedisBackedVODConnection(effective_session_id, self.redis_client) + if temp_connection.increment_active_streams(): + logger.info(f"[{client_id}] Reserved idle session - incremented active streams") + else: + logger.warning(f"[{client_id}] Failed to reserve idle session - falling back to new session") + effective_session_id = session_id + matching_session_id = None # Clear the match so we create a new connection else: logger.info(f"[{client_id}] Worker {self.worker_id} - No matching idle session found, using new session") effective_session_id = session_id @@ -761,14 +802,20 @@ class MultiWorkerVODConnectionManager: else: logger.info(f"[{client_id}] Worker {self.worker_id} - Using existing Redis-backed connection") - # Update session activity in consolidated connection state + # Transfer ownership to current worker and update session activity if redis_connection._acquire_lock(): try: state = redis_connection._get_connection_state() if state: + old_worker = state.worker_id state.last_activity = time.time() - state.worker_id = self.worker_id # Track which worker last accessed this + state.worker_id = self.worker_id # Transfer ownership to current worker redis_connection._save_connection_state(state) + + if old_worker != self.worker_id: + 
logger.info(f"[{client_id}] Ownership transferred from worker {old_worker} to {self.worker_id}") + else: + logger.debug(f"[{client_id}] Worker {self.worker_id} retaining ownership") finally: redis_connection._release_lock() @@ -788,8 +835,13 @@ class MultiWorkerVODConnectionManager: try: logger.info(f"[{client_id}] Worker {self.worker_id} - Starting Redis-backed stream") - # Increment active streams - redis_connection.increment_active_streams() + # Increment active streams (unless we already did it for session reuse) + if not matching_session_id: + # New session - increment active streams + redis_connection.increment_active_streams() + else: + # Reused session - we already incremented when reserving the session + logger.debug(f"[{client_id}] Using pre-reserved session - active streams already incremented") bytes_sent = 0 chunk_count = 0 @@ -819,13 +871,13 @@ class MultiWorkerVODConnectionManager: redis_connection.decrement_active_streams() decremented = True - # Schedule cleanup if no active streams after normal completion + # Schedule smart cleanup if no active streams after normal completion if not redis_connection.has_active_streams(): def delayed_cleanup(): time.sleep(1) # Wait 1 second - if not redis_connection.has_active_streams(): - logger.info(f"[{client_id}] Worker {self.worker_id} - Cleaning up idle Redis connection after normal completion") - redis_connection.cleanup(connection_manager=self) + # Smart cleanup: check active streams and ownership + logger.info(f"[{client_id}] Worker {self.worker_id} - Checking for smart cleanup after normal completion") + redis_connection.cleanup(connection_manager=self, current_worker_id=self.worker_id) import threading cleanup_thread = threading.Thread(target=delayed_cleanup) @@ -838,13 +890,13 @@ class MultiWorkerVODConnectionManager: redis_connection.decrement_active_streams() decremented = True - # Schedule cleanup if no active streams + # Schedule smart cleanup if no active streams if not redis_connection.has_active_streams(): def delayed_cleanup(): time.sleep(1) # Wait 1 second - if not redis_connection.has_active_streams(): - logger.info(f"[{client_id}] Worker {self.worker_id} - Cleaning up idle Redis connection") - redis_connection.cleanup(connection_manager=self) + # Smart cleanup: check active streams and ownership + logger.info(f"[{client_id}] Worker {self.worker_id} - Checking for smart cleanup after client disconnect") + redis_connection.cleanup(connection_manager=self, current_worker_id=self.worker_id) import threading cleanup_thread = threading.Thread(target=delayed_cleanup) @@ -856,7 +908,8 @@ class MultiWorkerVODConnectionManager: if not decremented: redis_connection.decrement_active_streams() decremented = True - redis_connection.cleanup(connection_manager=self) + # Smart cleanup on error - immediate cleanup since we're in error state + redis_connection.cleanup(connection_manager=self, current_worker_id=self.worker_id) yield b"Error: Stream interrupted" finally: