From 083eb264e6b6465ecea6987e302d49b3363a2696 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Tue, 19 Aug 2025 17:45:09 -0500 Subject: [PATCH] Properly track m3u profile connections. --- .../multi_worker_connection_manager.py | 86 ++++++++++++++++--- 1 file changed, 76 insertions(+), 10 deletions(-) diff --git a/apps/proxy/vod_proxy/multi_worker_connection_manager.py b/apps/proxy/vod_proxy/multi_worker_connection_manager.py index 1e7c3e68..c510c8a4 100644 --- a/apps/proxy/vod_proxy/multi_worker_connection_manager.py +++ b/apps/proxy/vod_proxy/multi_worker_connection_manager.py @@ -25,13 +25,14 @@ class SerializableConnectionState: def __init__(self, session_id: str, stream_url: str, headers: dict, content_length: str = None, content_type: str = 'video/mp4', - final_url: str = None): + final_url: str = None, m3u_profile_id: int = None): self.session_id = session_id self.stream_url = stream_url self.headers = headers self.content_length = content_length self.content_type = content_type self.final_url = final_url + self.m3u_profile_id = m3u_profile_id # Store M3U profile ID for connection counting self.last_activity = time.time() self.request_count = 0 self.active_streams = 0 @@ -45,6 +46,7 @@ class SerializableConnectionState: 'content_length': str(self.content_length) if self.content_length is not None else '', 'content_type': self.content_type or 'video/mp4', 'final_url': self.final_url or '', + 'm3u_profile_id': str(self.m3u_profile_id) if self.m3u_profile_id is not None else '', 'last_activity': str(self.last_activity), 'request_count': str(self.request_count), 'active_streams': str(self.active_streams) @@ -59,7 +61,8 @@ class SerializableConnectionState: headers=json.loads(data['headers']) if data['headers'] else {}, content_length=data.get('content_length') if data.get('content_length') else None, content_type=data.get('content_type', 'video/mp4'), - final_url=data.get('final_url') if data.get('final_url') else None + final_url=data.get('final_url') if data.get('final_url') else None, + m3u_profile_id=int(data.get('m3u_profile_id')) if data.get('m3u_profile_id') else None ) obj.last_activity = float(data.get('last_activity', time.time())) obj.request_count = int(data.get('request_count', 0)) @@ -141,7 +144,7 @@ class RedisBackedVODConnection: except Exception as e: logger.error(f"[{self.session_id}] Error releasing lock: {e}") - def create_connection(self, stream_url: str, headers: dict) -> bool: + def create_connection(self, stream_url: str, headers: dict, m3u_profile_id: int = None) -> bool: """Create a new connection state in Redis""" if not self._acquire_lock(): logger.warning(f"[{self.session_id}] Could not acquire lock for connection creation") @@ -155,7 +158,7 @@ class RedisBackedVODConnection: return True # Create new connection state - state = SerializableConnectionState(self.session_id, stream_url, headers) + state = SerializableConnectionState(self.session_id, stream_url, headers, m3u_profile_id=m3u_profile_id) success = self._save_connection_state(state) if success: @@ -322,8 +325,11 @@ class RedisBackedVODConnection: } return {} - def cleanup(self): + def cleanup(self, connection_manager=None): """Clean up local resources and Redis state""" + # Get connection state before cleanup to handle profile decrementing + state = self._get_connection_state() + if self.local_response: self.local_response.close() self.local_response = None @@ -337,6 +343,11 @@ class RedisBackedVODConnection: self.redis_client.delete(self.connection_key) self.redis_client.delete(self.lock_key) logger.info(f"[{self.session_id}] Cleaned up Redis connection state") + + # Decrement profile connections if we have the state and connection manager + if state and state.m3u_profile_id and connection_manager: + connection_manager._decrement_profile_connections(state.m3u_profile_id) + except Exception as e: logger.error(f"[{self.session_id}] Error cleaning up Redis state: {e}") @@ -372,6 +383,53 @@ class MultiWorkerVODConnectionManager: import random return f"worker-{random.randint(1000, 9999)}" + def _get_profile_connections_key(self, profile_id: int) -> str: + """Get Redis key for tracking connections per profile - STANDARDIZED with TS proxy""" + return f"profile_connections:{profile_id}" + + def _check_profile_limits(self, m3u_profile) -> bool: + """Check if profile has available connection slots""" + if m3u_profile.max_streams == 0: # Unlimited + return True + + try: + profile_connections_key = self._get_profile_connections_key(m3u_profile.id) + current_connections = int(self.redis_client.get(profile_connections_key) or 0) + + logger.info(f"[PROFILE-CHECK] Profile {m3u_profile.id} has {current_connections}/{m3u_profile.max_streams} connections") + return current_connections < m3u_profile.max_streams + + except Exception as e: + logger.error(f"Error checking profile limits: {e}") + return False + + def _increment_profile_connections(self, m3u_profile): + """Increment profile connection count""" + try: + profile_connections_key = self._get_profile_connections_key(m3u_profile.id) + new_count = self.redis_client.incr(profile_connections_key) + logger.info(f"[PROFILE-INCR] Profile {m3u_profile.id} connections: {new_count}") + return new_count + except Exception as e: + logger.error(f"Error incrementing profile connections: {e}") + return None + + def _decrement_profile_connections(self, m3u_profile_id: int): + """Decrement profile connection count""" + try: + profile_connections_key = self._get_profile_connections_key(m3u_profile_id) + current_count = int(self.redis_client.get(profile_connections_key) or 0) + if current_count > 0: + new_count = self.redis_client.decr(profile_connections_key) + logger.info(f"[PROFILE-DECR] Profile {m3u_profile_id} connections: {new_count}") + return new_count + else: + logger.warning(f"[PROFILE-DECR] Profile {m3u_profile_id} already at 0 connections") + return 0 + except Exception as e: + logger.error(f"Error decrementing profile connections: {e}") + return None + def stream_content_with_session(self, session_id, content_obj, stream_url, m3u_profile, client_ip, user_agent, request, utc_start=None, utc_end=None, offset=None, range_header=None): @@ -394,6 +452,11 @@ class MultiWorkerVODConnectionManager: if not existing_state: logger.info(f"[{client_id}] Worker {self.worker_id} - Creating new Redis-backed connection") + # Check profile limits before creating new connection + if not self._check_profile_limits(m3u_profile): + logger.warning(f"[{client_id}] Profile {m3u_profile.name} connection limit exceeded") + return HttpResponse("Connection limit exceeded for profile", status=429) + # Apply timeshift parameters modified_stream_url = self._apply_timeshift_parameters(stream_url, utc_start, utc_end, offset) @@ -418,10 +481,13 @@ class MultiWorkerVODConnectionManager: headers['X-Worker-ID'] = self.worker_id # Create connection state in Redis - if not redis_connection.create_connection(modified_stream_url, headers): + if not redis_connection.create_connection(modified_stream_url, headers, m3u_profile.id): logger.error(f"[{client_id}] Worker {self.worker_id} - Failed to create Redis connection") return HttpResponse("Failed to create connection", status=500) + # Increment profile connections after successful connection creation + self._increment_profile_connections(m3u_profile) + # Create session tracking session_info = { "content_type": content_type, @@ -510,7 +576,7 @@ class MultiWorkerVODConnectionManager: time.sleep(10) # Wait 10 seconds if not redis_connection.has_active_streams(): logger.info(f"[{client_id}] Worker {self.worker_id} - Cleaning up idle Redis connection") - redis_connection.cleanup() + redis_connection.cleanup(connection_manager=self) import threading cleanup_thread = threading.Thread(target=delayed_cleanup) @@ -522,7 +588,7 @@ class MultiWorkerVODConnectionManager: if not decremented: redis_connection.decrement_active_streams() decremented = True - redis_connection.cleanup() + redis_connection.cleanup(connection_manager=self) yield b"Error: Stream interrupted" finally: @@ -651,7 +717,7 @@ class MultiWorkerVODConnectionManager: logger.info(f"[{session_id}] Cleaning up Redis-backed persistent connection") redis_connection = RedisBackedVODConnection(session_id, self.redis_client) - redis_connection.cleanup() + redis_connection.cleanup(connection_manager=self) # Also clean up session data if self.redis_client: @@ -700,7 +766,7 @@ class MultiWorkerVODConnectionManager: # Clean up connection and related keys redis_connection = RedisBackedVODConnection(session_id, self.redis_client) - redis_connection.cleanup() + redis_connection.cleanup(connection_manager=self) cleanup_count += 1 except Exception as e: