Smarter VOD connection cleanup for apps that open a new connection on every seek instead of reusing the existing one: cleanup now checks worker ownership and active stream counts before tearing down shared Redis state, and matching idle sessions are reserved before reuse.
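For context, a minimal, self-contained sketch (not the project's actual code) of the decision the new cleanup path makes before touching shared Redis state. ConnectionState and redis_cleanup_action are hypothetical stand-ins for the consolidated connection state and the ownership/active-stream check that RedisBackedVODConnection.cleanup(connection_manager, current_worker_id) performs in the diff below:

from dataclasses import dataclass
from typing import Optional

@dataclass
class ConnectionState:
    # Hypothetical stand-in for the consolidated state stored in Redis.
    worker_id: str
    active_streams: int

def redis_cleanup_action(state: Optional[ConnectionState],
                         current_worker_id: Optional[str]) -> str:
    """Mirror the ownership/active-stream checks the new cleanup() applies.

    Local resources (response/session) are always closed first; this only
    decides whether the shared Redis keys may be deleted as well.
    """
    if state is None:
        return "local cleanup only (no connection state found)"
    if state.active_streams > 0:
        if current_worker_id and state.worker_id == current_worker_id:
            return "local cleanup only (we own the active streams)"
        return f"local cleanup only (streams owned by worker {state.worker_id})"
    # No active streams: the real cleanup() then acquires the distributed
    # lock, re-checks active_streams, and only then deletes the Redis keys.
    return "release Redis keys"

# A seek that opens a fresh connection no longer tears down the session
# another worker is still streaming from:
print(redis_cleanup_action(ConnectionState("worker-1", 1), "worker-2"))
print(redis_cleanup_action(ConnectionState("worker-1", 0), "worker-1"))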

SergeantPanda 2025-09-26 09:10:34 -05:00
parent 7bb4df78c8
commit 70f7484fb5

@@ -540,11 +540,9 @@ class RedisBackedVODConnection:
             }
         return {}
 
-    def cleanup(self, connection_manager=None):
-        """Clean up local resources and Redis state"""
-        # Get connection state before cleanup to handle profile decrementing
-        state = self._get_connection_state()
+    def cleanup(self, connection_manager=None, current_worker_id=None):
+        """Smart cleanup based on worker ownership and active streams"""
+        # Always clean up local resources first
         if self.local_response:
             self.local_response.close()
             self.local_response = None
@@ -552,38 +550,72 @@ class RedisBackedVODConnection:
             self.local_session.close()
             self.local_session = None
 
-        # Remove from Redis
-        if self.redis_client:
-            try:
-                # Use pipeline for atomic cleanup operations
-                pipe = self.redis_client.pipeline()
+        # Get current connection state to check ownership and active streams
+        state = self._get_connection_state()
 
-                # 1. Remove main connection state (now contains consolidated data)
-                pipe.delete(self.connection_key)
+        if not state:
+            logger.info(f"[{self.session_id}] No connection state found - local cleanup only")
+            return
 
-                # 2. Remove distributed lock
-                pipe.delete(self.lock_key)
+        # Check if there are active streams
+        if state.active_streams > 0:
+            # There are active streams - check ownership
+            if current_worker_id and state.worker_id == current_worker_id:
+                logger.info(f"[{self.session_id}] Active streams present ({state.active_streams}) and we own them - local cleanup only")
+            else:
+                logger.info(f"[{self.session_id}] Active streams present ({state.active_streams}) but owned by worker {state.worker_id} - local cleanup only")
+            return
 
-                # Execute all cleanup operations
-                pipe.execute()
+        # No active streams - we can clean up Redis state
+        if not self.redis_client:
+            logger.info(f"[{self.session_id}] No Redis client - local cleanup only")
+            return
 
-                logger.info(f"[{self.session_id}] Cleaned up all Redis keys (consolidated connection state, locks)")
+        # Acquire lock and do final check before cleanup to prevent race conditions
+        if not self._acquire_lock():
+            logger.warning(f"[{self.session_id}] Could not acquire lock for cleanup - skipping")
+            return
 
-                # Decrement profile connections if we have the state and connection manager
-                if state and state.m3u_profile_id and connection_manager:
-                    logger.info(f"[{self.session_id}] Decrementing profile connection count for profile {state.m3u_profile_id}")
-                    connection_manager._decrement_profile_connections(state.m3u_profile_id)
-                    logger.info(f"[{self.session_id}] Profile connection count decremented for profile {state.m3u_profile_id}")
-                else:
-                    if not state:
-                        logger.warning(f"[{self.session_id}] No connection state found during cleanup - cannot decrement profile connections")
-                    elif not state.m3u_profile_id:
-                        logger.warning(f"[{self.session_id}] No profile ID in connection state - cannot decrement profile connections")
-                    elif not connection_manager:
-                        logger.warning(f"[{self.session_id}] No connection manager provided - cannot decrement profile connections")
+        try:
+            # Re-check active streams with lock held to prevent race conditions
+            current_state = self._get_connection_state()
+            if not current_state:
+                logger.info(f"[{self.session_id}] Connection state no longer exists - cleanup already done")
+                return
 
-            except Exception as e:
-                logger.error(f"[{self.session_id}] Error cleaning up Redis state: {e}")
+            if current_state.active_streams > 0:
+                logger.info(f"[{self.session_id}] Active streams now present ({current_state.active_streams}) - skipping cleanup")
+                return
+
+            # Use pipeline for atomic cleanup operations
+            pipe = self.redis_client.pipeline()
+
+            # 1. Remove main connection state (contains consolidated data)
+            pipe.delete(self.connection_key)
+
+            # 2. Remove distributed lock (will be released below anyway)
+            pipe.delete(self.lock_key)
+
+            # Execute all cleanup operations
+            pipe.execute()
+
+            logger.info(f"[{self.session_id}] Cleaned up Redis keys (verified no active streams)")
+
+            # Decrement profile connections if we have the state and connection manager
+            if state.m3u_profile_id and connection_manager:
+                connection_manager._decrement_profile_connections(state.m3u_profile_id)
+                logger.info(f"[{self.session_id}] Profile connection count decremented for profile {state.m3u_profile_id}")
+            else:
+                if not state.m3u_profile_id:
+                    logger.warning(f"[{self.session_id}] No profile ID in connection state - cannot decrement profile connections")
+                elif not connection_manager:
+                    logger.warning(f"[{self.session_id}] No connection manager provided - cannot decrement profile connections")
+        except Exception as e:
+            logger.error(f"[{self.session_id}] Error cleaning up Redis state: {e}")
+        finally:
+            # Always release the lock
+            self._release_lock()
 
 # Modify the VODConnectionManager to use Redis-backed connections
@@ -694,6 +726,15 @@ class MultiWorkerVODConnectionManager:
                 logger.info(f"[{client_id}] Worker {self.worker_id} - Found matching idle session: {matching_session_id}")
                 effective_session_id = matching_session_id
                 client_id = matching_session_id  # Update client_id for logging consistency
+
+                # IMMEDIATELY reserve this session by incrementing active streams to prevent cleanup
+                temp_connection = RedisBackedVODConnection(effective_session_id, self.redis_client)
+                if temp_connection.increment_active_streams():
+                    logger.info(f"[{client_id}] Reserved idle session - incremented active streams")
+                else:
+                    logger.warning(f"[{client_id}] Failed to reserve idle session - falling back to new session")
+                    effective_session_id = session_id
+                    matching_session_id = None  # Clear the match so we create a new connection
             else:
                 logger.info(f"[{client_id}] Worker {self.worker_id} - No matching idle session found, using new session")
                 effective_session_id = session_id
@@ -761,14 +802,20 @@ class MultiWorkerVODConnectionManager:
         else:
             logger.info(f"[{client_id}] Worker {self.worker_id} - Using existing Redis-backed connection")
 
-            # Update session activity in consolidated connection state
+            # Transfer ownership to current worker and update session activity
             if redis_connection._acquire_lock():
                 try:
                     state = redis_connection._get_connection_state()
                     if state:
+                        old_worker = state.worker_id
                         state.last_activity = time.time()
-                        state.worker_id = self.worker_id  # Track which worker last accessed this
+                        state.worker_id = self.worker_id  # Transfer ownership to current worker
                         redis_connection._save_connection_state(state)
+
+                        if old_worker != self.worker_id:
+                            logger.info(f"[{client_id}] Ownership transferred from worker {old_worker} to {self.worker_id}")
+                        else:
+                            logger.debug(f"[{client_id}] Worker {self.worker_id} retaining ownership")
                 finally:
                     redis_connection._release_lock()
@@ -788,8 +835,13 @@ class MultiWorkerVODConnectionManager:
             try:
                 logger.info(f"[{client_id}] Worker {self.worker_id} - Starting Redis-backed stream")
 
-                # Increment active streams
-                redis_connection.increment_active_streams()
+                # Increment active streams (unless we already did it for session reuse)
+                if not matching_session_id:
+                    # New session - increment active streams
+                    redis_connection.increment_active_streams()
+                else:
+                    # Reused session - we already incremented when reserving the session
+                    logger.debug(f"[{client_id}] Using pre-reserved session - active streams already incremented")
 
                 bytes_sent = 0
                 chunk_count = 0
@@ -819,13 +871,13 @@ class MultiWorkerVODConnectionManager:
                 redis_connection.decrement_active_streams()
                 decremented = True
 
-                # Schedule cleanup if no active streams after normal completion
+                # Schedule smart cleanup if no active streams after normal completion
                 if not redis_connection.has_active_streams():
                     def delayed_cleanup():
                         time.sleep(1)  # Wait 1 second
                         if not redis_connection.has_active_streams():
-                            logger.info(f"[{client_id}] Worker {self.worker_id} - Cleaning up idle Redis connection after normal completion")
-                            redis_connection.cleanup(connection_manager=self)
+                            # Smart cleanup: check active streams and ownership
+                            logger.info(f"[{client_id}] Worker {self.worker_id} - Checking for smart cleanup after normal completion")
+                            redis_connection.cleanup(connection_manager=self, current_worker_id=self.worker_id)
 
                     import threading
                     cleanup_thread = threading.Thread(target=delayed_cleanup)
@@ -838,13 +890,13 @@ class MultiWorkerVODConnectionManager:
                 redis_connection.decrement_active_streams()
                 decremented = True
 
-                # Schedule cleanup if no active streams
+                # Schedule smart cleanup if no active streams
                 if not redis_connection.has_active_streams():
                     def delayed_cleanup():
                         time.sleep(1)  # Wait 1 second
                         if not redis_connection.has_active_streams():
-                            logger.info(f"[{client_id}] Worker {self.worker_id} - Cleaning up idle Redis connection")
-                            redis_connection.cleanup(connection_manager=self)
+                            # Smart cleanup: check active streams and ownership
+                            logger.info(f"[{client_id}] Worker {self.worker_id} - Checking for smart cleanup after client disconnect")
+                            redis_connection.cleanup(connection_manager=self, current_worker_id=self.worker_id)
 
                     import threading
                     cleanup_thread = threading.Thread(target=delayed_cleanup)
@@ -856,7 +908,8 @@ class MultiWorkerVODConnectionManager:
                 if not decremented:
                     redis_connection.decrement_active_streams()
                     decremented = True
-                redis_connection.cleanup(connection_manager=self)
+                # Smart cleanup on error - immediate cleanup since we're in error state
+                redis_connection.cleanup(connection_manager=self, current_worker_id=self.worker_id)
 
                 yield b"Error: Stream interrupted"
             finally: