Properly track m3u profile connections.

This commit is contained in:
SergeantPanda 2025-08-19 17:45:09 -05:00
parent 97b82e5520
commit 083eb264e6

View file

@ -25,13 +25,14 @@ class SerializableConnectionState:
def __init__(self, session_id: str, stream_url: str, headers: dict,
content_length: str = None, content_type: str = 'video/mp4',
final_url: str = None):
final_url: str = None, m3u_profile_id: int = None):
self.session_id = session_id
self.stream_url = stream_url
self.headers = headers
self.content_length = content_length
self.content_type = content_type
self.final_url = final_url
self.m3u_profile_id = m3u_profile_id # Store M3U profile ID for connection counting
self.last_activity = time.time()
self.request_count = 0
self.active_streams = 0
@ -45,6 +46,7 @@ class SerializableConnectionState:
'content_length': str(self.content_length) if self.content_length is not None else '',
'content_type': self.content_type or 'video/mp4',
'final_url': self.final_url or '',
'm3u_profile_id': str(self.m3u_profile_id) if self.m3u_profile_id is not None else '',
'last_activity': str(self.last_activity),
'request_count': str(self.request_count),
'active_streams': str(self.active_streams)
@ -59,7 +61,8 @@ class SerializableConnectionState:
headers=json.loads(data['headers']) if data['headers'] else {},
content_length=data.get('content_length') if data.get('content_length') else None,
content_type=data.get('content_type', 'video/mp4'),
final_url=data.get('final_url') if data.get('final_url') else None
final_url=data.get('final_url') if data.get('final_url') else None,
m3u_profile_id=int(data.get('m3u_profile_id')) if data.get('m3u_profile_id') else None
)
obj.last_activity = float(data.get('last_activity', time.time()))
obj.request_count = int(data.get('request_count', 0))
@ -141,7 +144,7 @@ class RedisBackedVODConnection:
except Exception as e:
logger.error(f"[{self.session_id}] Error releasing lock: {e}")
def create_connection(self, stream_url: str, headers: dict) -> bool:
def create_connection(self, stream_url: str, headers: dict, m3u_profile_id: int = None) -> bool:
"""Create a new connection state in Redis"""
if not self._acquire_lock():
logger.warning(f"[{self.session_id}] Could not acquire lock for connection creation")
@ -155,7 +158,7 @@ class RedisBackedVODConnection:
return True
# Create new connection state
state = SerializableConnectionState(self.session_id, stream_url, headers)
state = SerializableConnectionState(self.session_id, stream_url, headers, m3u_profile_id=m3u_profile_id)
success = self._save_connection_state(state)
if success:
@ -322,8 +325,11 @@ class RedisBackedVODConnection:
}
return {}
def cleanup(self):
def cleanup(self, connection_manager=None):
"""Clean up local resources and Redis state"""
# Get connection state before cleanup to handle profile decrementing
state = self._get_connection_state()
if self.local_response:
self.local_response.close()
self.local_response = None
@ -337,6 +343,11 @@ class RedisBackedVODConnection:
self.redis_client.delete(self.connection_key)
self.redis_client.delete(self.lock_key)
logger.info(f"[{self.session_id}] Cleaned up Redis connection state")
# Decrement profile connections if we have the state and connection manager
if state and state.m3u_profile_id and connection_manager:
connection_manager._decrement_profile_connections(state.m3u_profile_id)
except Exception as e:
logger.error(f"[{self.session_id}] Error cleaning up Redis state: {e}")
@ -372,6 +383,53 @@ class MultiWorkerVODConnectionManager:
import random
return f"worker-{random.randint(1000, 9999)}"
def _get_profile_connections_key(self, profile_id: int) -> str:
"""Get Redis key for tracking connections per profile - STANDARDIZED with TS proxy"""
return f"profile_connections:{profile_id}"
def _check_profile_limits(self, m3u_profile) -> bool:
"""Check if profile has available connection slots"""
if m3u_profile.max_streams == 0: # Unlimited
return True
try:
profile_connections_key = self._get_profile_connections_key(m3u_profile.id)
current_connections = int(self.redis_client.get(profile_connections_key) or 0)
logger.info(f"[PROFILE-CHECK] Profile {m3u_profile.id} has {current_connections}/{m3u_profile.max_streams} connections")
return current_connections < m3u_profile.max_streams
except Exception as e:
logger.error(f"Error checking profile limits: {e}")
return False
def _increment_profile_connections(self, m3u_profile):
"""Increment profile connection count"""
try:
profile_connections_key = self._get_profile_connections_key(m3u_profile.id)
new_count = self.redis_client.incr(profile_connections_key)
logger.info(f"[PROFILE-INCR] Profile {m3u_profile.id} connections: {new_count}")
return new_count
except Exception as e:
logger.error(f"Error incrementing profile connections: {e}")
return None
def _decrement_profile_connections(self, m3u_profile_id: int):
"""Decrement profile connection count"""
try:
profile_connections_key = self._get_profile_connections_key(m3u_profile_id)
current_count = int(self.redis_client.get(profile_connections_key) or 0)
if current_count > 0:
new_count = self.redis_client.decr(profile_connections_key)
logger.info(f"[PROFILE-DECR] Profile {m3u_profile_id} connections: {new_count}")
return new_count
else:
logger.warning(f"[PROFILE-DECR] Profile {m3u_profile_id} already at 0 connections")
return 0
except Exception as e:
logger.error(f"Error decrementing profile connections: {e}")
return None
def stream_content_with_session(self, session_id, content_obj, stream_url, m3u_profile,
client_ip, user_agent, request,
utc_start=None, utc_end=None, offset=None, range_header=None):
@ -394,6 +452,11 @@ class MultiWorkerVODConnectionManager:
if not existing_state:
logger.info(f"[{client_id}] Worker {self.worker_id} - Creating new Redis-backed connection")
# Check profile limits before creating new connection
if not self._check_profile_limits(m3u_profile):
logger.warning(f"[{client_id}] Profile {m3u_profile.name} connection limit exceeded")
return HttpResponse("Connection limit exceeded for profile", status=429)
# Apply timeshift parameters
modified_stream_url = self._apply_timeshift_parameters(stream_url, utc_start, utc_end, offset)
@ -418,10 +481,13 @@ class MultiWorkerVODConnectionManager:
headers['X-Worker-ID'] = self.worker_id
# Create connection state in Redis
if not redis_connection.create_connection(modified_stream_url, headers):
if not redis_connection.create_connection(modified_stream_url, headers, m3u_profile.id):
logger.error(f"[{client_id}] Worker {self.worker_id} - Failed to create Redis connection")
return HttpResponse("Failed to create connection", status=500)
# Increment profile connections after successful connection creation
self._increment_profile_connections(m3u_profile)
# Create session tracking
session_info = {
"content_type": content_type,
@ -510,7 +576,7 @@ class MultiWorkerVODConnectionManager:
time.sleep(10) # Wait 10 seconds
if not redis_connection.has_active_streams():
logger.info(f"[{client_id}] Worker {self.worker_id} - Cleaning up idle Redis connection")
redis_connection.cleanup()
redis_connection.cleanup(connection_manager=self)
import threading
cleanup_thread = threading.Thread(target=delayed_cleanup)
@ -522,7 +588,7 @@ class MultiWorkerVODConnectionManager:
if not decremented:
redis_connection.decrement_active_streams()
decremented = True
redis_connection.cleanup()
redis_connection.cleanup(connection_manager=self)
yield b"Error: Stream interrupted"
finally:
@ -651,7 +717,7 @@ class MultiWorkerVODConnectionManager:
logger.info(f"[{session_id}] Cleaning up Redis-backed persistent connection")
redis_connection = RedisBackedVODConnection(session_id, self.redis_client)
redis_connection.cleanup()
redis_connection.cleanup(connection_manager=self)
# Also clean up session data
if self.redis_client:
@ -700,7 +766,7 @@ class MultiWorkerVODConnectionManager:
# Clean up connection and related keys
redis_connection = RedisBackedVODConnection(session_id, self.redis_client)
redis_connection.cleanup()
redis_connection.cleanup(connection_manager=self)
cleanup_count += 1
except Exception as e: