Reuse connections when seeking.

This commit is contained in:
SergeantPanda 2025-08-12 09:56:30 -05:00
parent 07966424f8
commit b7fb9336be
2 changed files with 356 additions and 222 deletions

View file

@ -17,10 +17,183 @@ from apps.m3u.models import M3UAccountProfile
logger = logging.getLogger("vod_proxy")
class PersistentVODConnection:
    """A single persistent upstream connection to a VOD provider for one session.

    The first request follows redirects and caches the final URL plus the
    provider-reported Content-Length/Content-Type; subsequent range requests
    (seeks) are issued on the same ``requests.Session`` directly against the
    cached final URL, so each client session keeps exactly one provider
    connection.
    """

    def __init__(self, session_id: str, stream_url: str, headers: dict):
        self.session_id = session_id
        self.stream_url = stream_url
        self.base_headers = headers
        self.session = None            # lazily-created requests.Session
        self.current_response = None   # most recent streaming response
        self.content_length = None     # total size (str) learned from provider
        self.content_type = 'video/mp4'
        self.final_url = None          # URL after redirects from first request
        # RLock, not Lock: _establish_connection() calls cleanup() on error
        # while get_stream() already holds the lock; a plain Lock deadlocks.
        self.lock = threading.RLock()
        self.request_count = 0  # number of requests made on this connection

    def _establish_connection(self, range_header=None):
        """Open (or reuse) the provider connection, optionally with a Range header.

        Returns the streaming response, or ``None`` when the requested range is
        not satisfiable against the known content length. Raises on transport
        or HTTP errors (after cleaning up local state).
        """
        try:
            if not self.session:
                self.session = requests.Session()

            headers = self.base_headers.copy()

            # Validate/adjust the client range once the total size is known.
            if range_header and self.content_length:
                logger.info(f"[{self.session_id}] Validating range {range_header} against content length {self.content_length}")
                validated_range = self._validate_range_header(range_header, int(self.content_length))
                if validated_range is None:
                    # Not satisfiable: signal the caller instead of raising.
                    logger.warning(f"[{self.session_id}] Range not satisfiable: {range_header} for content length {self.content_length}")
                    return None
                elif validated_range != range_header:
                    range_header = validated_range
                    logger.info(f"[{self.session_id}] Adjusted range header: {range_header}")
                else:
                    logger.info(f"[{self.session_id}] Range header validated successfully: {range_header}")
            elif range_header:
                logger.info(f"[{self.session_id}] Range header provided but no content length available yet: {range_header}")

            if range_header:
                headers['Range'] = range_header
                logger.info(f"[{self.session_id}] Setting Range header: {range_header}")

            self.request_count += 1
            if self.request_count == 1:
                # First request: follow redirects so we learn the final URL.
                logger.info(f"[{self.session_id}] Making initial request to provider")
                target_url = self.stream_url
                allow_redirects = True
            else:
                # Later requests go straight to the cached final URL to skip
                # the provider's redirect chain.
                logger.info(f"[{self.session_id}] Making range request #{self.request_count} on SAME session (using final URL)")
                target_url = self.final_url if self.final_url else self.stream_url
                allow_redirects = False
                logger.info(f"[{self.session_id}] Using cached final URL: {target_url}")

            response = self.session.get(
                target_url,
                headers=headers,
                stream=True,
                timeout=(10, 30),
                allow_redirects=allow_redirects
            )

            # A cached final URL can expire and start redirecting again; 3xx
            # does not trigger raise_for_status(), so re-resolve explicitly
            # from the original stream URL.
            if not allow_redirects and response.is_redirect:
                logger.info(f"[{self.session_id}] Cached final URL redirected; re-resolving from original URL")
                response.close()
                self.final_url = None
                response = self.session.get(
                    self.stream_url,
                    headers=headers,
                    stream=True,
                    timeout=(10, 30),
                    allow_redirects=True
                )
            response.raise_for_status()

            if self.request_count == 1:
                logger.info(f"[{self.session_id}] Request #{self.request_count} successful: {response.status_code} (followed redirects)")
            else:
                logger.info(f"[{self.session_id}] Request #{self.request_count} successful: {response.status_code} (direct to final URL)")

            # Capture metadata once, from the first successful response.
            if not self.content_length:
                self.content_length = response.headers.get('content-length')
                self.content_type = response.headers.get('content-type', 'video/mp4')
                self.final_url = response.url
                logger.info(f"[{self.session_id}] *** PERSISTENT CONNECTION - Final URL: {self.final_url} ***")
                logger.info(f"[{self.session_id}] *** PERSISTENT CONNECTION - Content-Length: {self.content_length} ***")

            self.current_response = response
            return response
        except Exception as e:
            logger.error(f"[{self.session_id}] Error establishing connection: {e}")
            self.cleanup()
            raise

    def _validate_range_header(self, range_header, content_length):
        """Validate/clamp a ``bytes=``-style Range header against content_length.

        Returns the (possibly adjusted) header, ``None`` when the range is not
        satisfiable, or the input unchanged when it cannot be parsed.
        """
        try:
            if not range_header or not range_header.startswith('bytes='):
                return range_header

            range_part = range_header.replace('bytes=', '')
            if '-' not in range_part:
                return range_header
            start_str, end_str = range_part.split('-', 1)

            if not start_str:
                # Suffix range "bytes=-N" = the LAST N bytes (RFC 7233 2.1),
                # not bytes 0..N. A suffix longer than the file means the
                # whole file; a zero-length suffix is not satisfiable.
                if not end_str:
                    return range_header  # "bytes=-" is malformed; pass through
                suffix_len = int(end_str)
                if suffix_len <= 0:
                    return None
                start_byte = max(0, content_length - suffix_len)
                end_byte = content_length - 1
            else:
                start_byte = int(start_str)
                if start_byte >= content_length:
                    # Start is beyond file end - not satisfiable
                    logger.warning(f"[{self.session_id}] Range start {start_byte} >= content length {content_length} - not satisfiable")
                    return None
                if end_str:
                    end_byte = int(end_str)
                    if end_byte >= content_length:
                        # Clamp end to the last valid byte.
                        end_byte = content_length - 1
                        logger.info(f"[{self.session_id}] Adjusted range end to {end_byte}")
                else:
                    end_byte = content_length - 1

            if start_byte > end_byte:
                logger.warning(f"[{self.session_id}] Range start {start_byte} > end {end_byte} - not satisfiable")
                return None

            return f"bytes={start_byte}-{end_byte}"
        except (ValueError, IndexError) as e:
            # Unparseable range: forward as-is and let the provider decide.
            logger.warning(f"[{self.session_id}] Could not validate range header {range_header}: {e}")
            return range_header

    def get_stream(self, range_header=None):
        """Fetch a (possibly ranged) stream, reusing the provider session.

        Returns the streaming response, or ``None`` when the range is not
        satisfiable (caller should answer 416).
        """
        with self.lock:
            if range_header:
                logger.info(f"[{self.session_id}] Range request on existing connection: {range_header}")
            # Always close the previous response stream (full requests
            # included, otherwise it leaks) but keep the session - and its
            # pooled TCP connection - alive for reuse.
            if self.current_response:
                logger.info(f"[{self.session_id}] Closing previous response stream (keeping connection alive)")
                self.current_response.close()
                self.current_response = None

            response = self._establish_connection(range_header)
            if response is None:
                # Range not satisfiable - return None to indicate this
                return None
            return self.current_response

    def get_headers(self):
        """Return cached header info for building the client response."""
        return {
            'content_length': self.content_length,
            'content_type': self.content_type,
            'final_url': self.final_url
        }

    def cleanup(self):
        """Close the current response and the underlying session."""
        with self.lock:
            if self.current_response:
                self.current_response.close()
                self.current_response = None
            if self.session:
                self.session.close()
                self.session = None
            logger.info(f"[{self.session_id}] Persistent connection cleaned up")
class VODConnectionManager:
"""Manages VOD connections using Redis for tracking"""
_instance = None
_persistent_connections = {} # session_id -> PersistentVODConnection
@classmethod
def get_instance(cls):
@ -32,6 +205,7 @@ class VODConnectionManager:
def __init__(self):
    """Initialize the manager with a Redis client and tracking TTLs."""
    # TTLs in seconds: connections may live up to 1 hour, sessions 30 min.
    self.connection_ttl = 3600
    self.session_ttl = 1800
    # Redis backs all cross-process connection/session bookkeeping.
    self.redis_client = RedisClient.get_client()
def _get_connection_key(self, content_type: str, content_uuid: str, client_id: str) -> str:
"""Get Redis key for a specific connection"""
@ -564,10 +738,10 @@ class VODConnectionManager:
def stream_content_with_session(self, session_id, content_obj, stream_url, m3u_profile, client_ip, user_agent, request,
utc_start=None, utc_end=None, offset=None, range_header=None):
"""
Stream VOD content with session-based connection reuse for timeshift operations
Stream VOD content with persistent connection per session
This method reuses existing upstream connections when the same session makes
timeshift requests, reducing provider connection usage to 1 per client session.
Maintains 1 open connection to provider per session that handles all range requests
dynamically based on client Range headers for seeking functionality.
"""
try:
@ -585,200 +759,101 @@ class VODConnectionManager:
content_uuid = str(content_obj.uuid)
content_name = getattr(content_obj, 'name', getattr(content_obj, 'title', 'Unknown'))
# Check if we have an existing session connection
session_key = f"vod_session:{session_id}"
session_info = None
# Check for existing connection or create new one
persistent_conn = self._persistent_connections.get(session_id)
if self.redis_client:
try:
session_data = self.redis_client.get(session_key)
if session_data:
session_info = json.loads(session_data.decode('utf-8'))
logger.info(f"[{client_id}] Found existing session: {session_info}")
except Exception as e:
logger.warning(f"[{client_id}] Error reading session data: {e}")
if not persistent_conn:
logger.info(f"[{client_id}] Creating NEW persistent connection for {content_type} {content_name}")
# If no existing session or session expired, create new connection tracking
# But only increment the profile counter ONCE per session
if not session_info:
connection_created = self.create_connection(
content_type=content_type,
content_uuid=content_uuid,
content_name=content_name,
client_id=client_id,
client_ip=client_ip,
user_agent=user_agent,
m3u_profile=m3u_profile
)
if not connection_created:
logger.error(f"Failed to create connection tracking for {content_type} {content_uuid}")
return HttpResponse("Connection limit exceeded", status=503)
# Store session info in Redis
# Create session in Redis for tracking
session_info = {
'content_type': content_type,
'content_uuid': content_uuid,
'content_name': content_name,
'created_at': time.time(),
'profile_id': m3u_profile.id,
'connection_counted': True # Mark that we've counted this connection
"content_type": content_type,
"content_uuid": content_uuid,
"content_name": content_name,
"created_at": str(time.time()),
"profile_id": str(m3u_profile.id),
"connection_counted": "True"
}
session_key = f"vod_session:{session_id}"
if self.redis_client:
try:
self.redis_client.setex(
session_key,
self.connection_ttl,
json.dumps(session_info)
)
logger.info(f"[{client_id}] Created new session: {session_info}")
except Exception as e:
logger.error(f"[{client_id}] Error storing session data: {e}")
else:
# Session exists - don't create new connection tracking
# This prevents double-counting connections for the same session
logger.info(f"[{client_id}] Reusing existing session - no new connection created") # Apply timeshift parameters to URL
modified_stream_url = self._apply_timeshift_parameters(
stream_url, utc_start, utc_end, offset
)
self.redis_client.hset(session_key, mapping=session_info)
self.redis_client.expire(session_key, self.session_ttl)
logger.info(f"[{client_id}] Modified stream URL for timeshift: {modified_stream_url}")
logger.info(f"[{client_id}] Created new session: {session_info}")
# Prepare headers - preserve ALL client headers for proper authentication
def prepare_headers():
headers = {}
# Apply timeshift parameters to URL
modified_stream_url = self._apply_timeshift_parameters(stream_url, utc_start, utc_end, offset)
logger.info(f"[{client_id}] Modified stream URL for timeshift: {modified_stream_url}")
# Copy all relevant headers from the original request
header_mapping = {
'HTTP_USER_AGENT': 'User-Agent',
'HTTP_AUTHORIZATION': 'Authorization',
'HTTP_X_FORWARDED_FOR': 'X-Forwarded-For',
'HTTP_X_REAL_IP': 'X-Real-IP',
'HTTP_REFERER': 'Referer',
'HTTP_ORIGIN': 'Origin',
'HTTP_ACCEPT': 'Accept',
'HTTP_ACCEPT_LANGUAGE': 'Accept-Language',
'HTTP_ACCEPT_ENCODING': 'Accept-Encoding',
'HTTP_CONNECTION': 'Connection',
'HTTP_CACHE_CONTROL': 'Cache-Control',
'HTTP_COOKIE': 'Cookie',
'HTTP_DNT': 'DNT',
'HTTP_X_FORWARDED_PROTO': 'X-Forwarded-Proto',
'HTTP_X_FORWARDED_PORT': 'X-Forwarded-Port',
# Prepare headers
headers = {
'User-Agent': user_agent or 'VLC/3.0.21 LibVLC/3.0.21',
'Accept': '*/*',
'Connection': 'keep-alive'
}
# Log all headers for debugging
logger.debug(f"[{client_id}] All available headers:")
for django_header, http_header in header_mapping.items():
if hasattr(request, 'META') and django_header in request.META:
header_value = request.META[django_header]
headers[http_header] = header_value
logger.debug(f"[{client_id}] {http_header}: {header_value}")
# Add any authentication headers from profile
if hasattr(m3u_profile, 'auth_headers') and m3u_profile.auth_headers:
headers.update(m3u_profile.auth_headers)
# Check for any timeshift-related headers VLC might send
timeshift_headers = ['HTTP_TIME', 'HTTP_TIMESTAMP', 'HTTP_SEEK', 'HTTP_POSITION']
for header in timeshift_headers:
if hasattr(request, 'META') and header in request.META:
value = request.META[header]
logger.info(f"[{client_id}] Found timeshift header {header}: {value}")
# Forward these as custom headers
headers[header.replace('HTTP_', '').replace('_', '-')] = value # Ensure we have a User-Agent
if 'User-Agent' not in headers and user_agent:
headers['User-Agent'] = user_agent
# Create persistent connection
persistent_conn = PersistentVODConnection(session_id, modified_stream_url, headers)
self._persistent_connections[session_id] = persistent_conn
# Add client IP headers
if client_ip:
headers['X-Forwarded-For'] = client_ip
headers['X-Real-IP'] = client_ip
# Track connection in profile
self.create_connection(content_type, content_uuid, content_name, client_id, client_ip, user_agent, m3u_profile)
else:
logger.info(f"[{client_id}] Using EXISTING persistent connection for {content_type} {content_name}")
# Add Range header for seeking support
if range_header:
headers['Range'] = range_header
logger.info(f"[{client_id}] Added Range header: {range_header}")
# Update session activity
session_key = f"vod_session:{session_id}"
if self.redis_client:
self.redis_client.hset(session_key, "last_activity", str(time.time()))
self.redis_client.expire(session_key, self.session_ttl)
return headers
logger.info(f"[{client_id}] Reusing existing session - no new connection created")
# STEP 1: Make a small Range request to get headers from the final URL after redirects
logger.info(f"[{client_id}] Making small Range request to get headers from final URL")
try:
probe_headers = prepare_headers()
# Add a small range request to get headers without downloading much data
probe_headers['Range'] = 'bytes=0-1024'
probe_response = requests.get(
modified_stream_url,
headers=probe_headers,
timeout=(10, 30),
allow_redirects=True,
stream=True
)
probe_response.raise_for_status()
# Extract critical headers from the final URL response
upstream_content_length = None
upstream_content_range = probe_response.headers.get('content-range')
# Parse Content-Range to get total file size: "bytes 0-1024/1559626615"
if upstream_content_range:
try:
parts = upstream_content_range.split('/')
if len(parts) == 2:
upstream_content_length = parts[1]
logger.info(f"[{client_id}] Extracted Content-Length from Content-Range: {upstream_content_length}")
except Exception as e:
logger.warning(f"[{client_id}] Could not parse Content-Range: {e}")
# Fallback to Content-Length header if available
if not upstream_content_length:
upstream_content_length = probe_response.headers.get('content-length')
upstream_content_type = probe_response.headers.get('content-type', 'video/mp4')
upstream_accept_ranges = probe_response.headers.get('accept-ranges', 'bytes')
upstream_last_modified = probe_response.headers.get('last-modified')
upstream_etag = probe_response.headers.get('etag')
logger.info(f"[{client_id}] Final URL after redirects: {probe_response.url}")
logger.info(f"[{client_id}] Headers from final URL - Content-Length: {upstream_content_length}, Content-Type: {upstream_content_type}")
# Close the probe response
probe_response.close()
except Exception as e:
logger.warning(f"[{client_id}] Probe request failed, proceeding without upstream headers: {e}")
upstream_content_length = None
upstream_content_type = 'video/mp4'
upstream_accept_ranges = 'bytes'
upstream_last_modified = None
upstream_etag = None
# Log the incoming Range header for debugging
if range_header:
logger.info(f"[{client_id}] *** CLIENT RANGE REQUEST: {range_header} ***")
# STEP 2: Create streaming generator for actual content
def stream_generator():
upstream_response = None
# Parse range for logging
try:
logger.info(f"[{client_id}] Starting session-based VOD stream for {content_type} {content_name}")
if 'bytes=' in range_header:
range_part = range_header.replace('bytes=', '')
if '-' in range_part:
start_byte, end_byte = range_part.split('-', 1)
if start_byte and int(start_byte) > 0:
start_pos_mb = int(start_byte) / (1024 * 1024)
logger.info(f"[{client_id}] *** VLC SEEKING TO: {start_pos_mb:.1f} MB ***")
else:
logger.info(f"[{client_id}] Range request from start")
except Exception as e:
logger.warning(f"[{client_id}] Could not parse range header: {e}")
else:
logger.info(f"[{client_id}] Full content request (no Range header)")
headers = prepare_headers()
# Get stream from persistent connection with current range
upstream_response = persistent_conn.get_stream(range_header)
# Make request to upstream server
upstream_response = requests.get(
modified_stream_url,
headers=headers,
stream=True,
timeout=(10, 30),
allow_redirects=True
)
upstream_response.raise_for_status()
# Handle range not satisfiable
if upstream_response is None:
logger.warning(f"[{client_id}] Range not satisfiable - returning 416 error")
return HttpResponse(
"Requested Range Not Satisfiable",
status=416,
headers={
'Content-Range': f'bytes */{persistent_conn.content_length}' if persistent_conn.content_length else 'bytes */*'
}
)
# Log upstream response info
logger.info(f"[{client_id}] Upstream response status: {upstream_response.status_code}")
logger.info(f"[{client_id}] Final URL after redirects: {upstream_response.url}")
if 'content-type' in upstream_response.headers:
logger.info(f"[{client_id}] Upstream content-type: {upstream_response.headers['content-type']}")
if 'content-length' in upstream_response.headers:
logger.info(f"[{client_id}] Upstream content-length: {upstream_response.headers['content-length']}")
if 'content-range' in upstream_response.headers:
logger.info(f"[{client_id}] Upstream content-range: {upstream_response.headers['content-range']}")
connection_headers = persistent_conn.get_headers()
# Create streaming generator
def stream_generator():
try:
logger.info(f"[{client_id}] Starting stream from persistent connection")
bytes_sent = 0
chunk_count = 0
@ -798,26 +873,20 @@ class VODConnectionManager:
bytes_sent=len(chunk)
)
logger.info(f"[{client_id}] Session-based VOD stream completed: {bytes_sent} bytes sent")
logger.info(f"[{client_id}] Persistent stream completed: {bytes_sent} bytes sent")
except requests.RequestException as e:
logger.error(f"[{client_id}] Error streaming from source: {e}")
yield b"Error: Unable to stream content"
except Exception as e:
logger.error(f"[{client_id}] Error in session stream generator: {e}")
finally:
# Don't remove connection tracking for sessions - let it expire naturally
# This allows timeshift operations to reuse the connection slot
if upstream_response:
upstream_response.close()
logger.error(f"[{client_id}] Error in persistent stream: {e}")
# Don't cleanup connection on error - it might be reusable
yield b"Error: Stream interrupted"
# STEP 3: Create streaming response with headers from final URL
# Create streaming response
response = StreamingHttpResponse(
streaming_content=stream_generator(),
content_type=upstream_content_type
content_type=connection_headers['content_type']
)
# Set status code based on request type
# Set status code based on range request
if range_header:
response.status_code = 206
logger.info(f"[{client_id}] Set response status to 206 for range request")
@ -825,66 +894,59 @@ class VODConnectionManager:
response.status_code = 200
logger.info(f"[{client_id}] Set response status to 200 for full request")
# Set headers that VLC and other players expect
# Set headers that VLC expects
response['Cache-Control'] = 'no-cache'
response['Pragma'] = 'no-cache'
response['X-Content-Type-Options'] = 'nosniff'
response['Connection'] = 'keep-alive'
response['Accept-Ranges'] = upstream_accept_ranges
response['Accept-Ranges'] = 'bytes'
# CRITICAL: Forward Content-Length from final URL to enable VLC seeking
if upstream_content_length:
response['Content-Length'] = upstream_content_length
logger.info(f"[{client_id}] *** FORWARDED Content-Length from final URL: {upstream_content_length} *** (VLC seeking enabled)")
# CRITICAL: Forward Content-Length from persistent connection
if connection_headers['content_length']:
response['Content-Length'] = connection_headers['content_length']
logger.info(f"[{client_id}] *** FORWARDED Content-Length: {connection_headers['content_length']} *** (VLC seeking enabled)")
else:
logger.warning(f"[{client_id}] *** NO Content-Length from final URL *** (VLC seeking may not work)")
logger.warning(f"[{client_id}] *** NO Content-Length available *** (VLC seeking may not work)")
# Forward other useful headers from final URL
if upstream_last_modified:
response['Last-Modified'] = upstream_last_modified
if upstream_etag:
response['ETag'] = upstream_etag
# Handle range requests - set Content-Range if this is a partial request
if range_header and upstream_content_length:
# Parse range header to set proper Content-Range
# Handle range requests - set Content-Range for partial responses
if range_header and connection_headers['content_length']:
try:
if 'bytes=' in range_header:
range_part = range_header.replace('bytes=', '')
if '-' in range_part:
start_byte, end_byte = range_part.split('-', 1)
start = int(start_byte) if start_byte else 0
end = int(end_byte) if end_byte else int(upstream_content_length) - 1
total_size = int(upstream_content_length)
end = int(end_byte) if end_byte else int(connection_headers['content_length']) - 1
total_size = int(connection_headers['content_length'])
content_range = f"bytes {start}-{end}/{total_size}"
response['Content-Range'] = content_range
logger.info(f"[{client_id}] Set Content-Range: {content_range}")
except Exception as e:
logger.warning(f"[{client_id}] Could not parse range for Content-Range header: {e}")
logger.warning(f"[{client_id}] Could not set Content-Range: {e}")
# Log the critical headers we're sending to the client
logger.info(f"[{client_id}] SESSION Response headers to client - Status: {response.status_code}, Accept-Ranges: {response.get('Accept-Ranges', 'MISSING')}")
if 'Content-Length' in response:
logger.info(f"[{client_id}] SESSION Content-Length: {response['Content-Length']}")
# Log response headers
logger.info(f"[{client_id}] PERSISTENT Response - Status: {response.status_code}, Content-Length: {response.get('Content-Length', 'MISSING')}")
if 'Content-Range' in response:
logger.info(f"[{client_id}] SESSION Content-Range: {response['Content-Range']}")
if 'Content-Type' in response:
logger.info(f"[{client_id}] SESSION Content-Type: {response['Content-Type']}")
logger.info(f"[{client_id}] PERSISTENT Content-Range: {response['Content-Range']}")
# Critical: Log what VLC needs to see for seeking to work
# Log VLC seeking status
if response.status_code == 200:
if upstream_content_length:
logger.info(f"[{client_id}] SESSION VLC SEEKING INFO: ✅ Full content response (200) with Content-Length from final URL. VLC seeking should work!")
if connection_headers['content_length']:
logger.info(f"[{client_id}] ✅ PERSISTENT VLC SEEKING: Full response with Content-Length - seeking should work!")
else:
logger.info(f"[{client_id}] SESSION VLC SEEKING INFO: ❌ Full content response (200) but NO Content-Length from final URL. VLC seeking will NOT work!")
logger.info(f"[{client_id}] ❌ PERSISTENT VLC SEEKING: Full response but no Content-Length - seeking won't work!")
elif response.status_code == 206:
logger.info(f"[{client_id}] SESSION VLC SEEKING INFO: ✅ Partial content response (206). This confirms seeking is working if VLC requested a range.")
logger.info(f"[{client_id}] ✅ PERSISTENT VLC SEEKING: Partial response - seeking is working!")
return response
except Exception as e:
logger.error(f"Error in stream_content_with_session: {e}", exc_info=True)
logger.error(f"Error in persistent stream_content_with_session: {e}", exc_info=True)
# Cleanup persistent connection on error
if session_id in self._persistent_connections:
self._persistent_connections[session_id].cleanup()
del self._persistent_connections[session_id]
return HttpResponse(f"Streaming error: {str(e)}", status=500)
def _apply_timeshift_parameters(self, original_url, utc_start=None, utc_end=None, offset=None):
@ -971,7 +1033,65 @@ class VODConnectionManager:
except Exception as e:
logger.error(f"Error applying timeshift parameters: {e}")
return original_url# Global instance
return original_url
def cleanup_persistent_connection(self, session_id: str):
    """Tear down the persistent provider connection for one session.

    Closes the upstream connection, drops the in-memory entry, decrements
    the profile connection counter when this session was counted, and
    deletes the session hash from Redis.
    """
    if session_id not in self._persistent_connections:
        return
    logger.info(f"[{session_id}] Cleaning up persistent connection")
    self._persistent_connections.pop(session_id).cleanup()

    # Drop the Redis-side session tracking as well.
    session_key = f"vod_session:{session_id}"
    if not self.redis_client:
        return
    try:
        session_data = self.redis_client.hgetall(session_key)
        if session_data:
            # Undo the profile connection count if this session was counted.
            if session_data.get(b'connection_counted') == b'True':
                raw_profile_id = session_data.get(b'profile_id')
                if raw_profile_id:
                    decoded_id = raw_profile_id.decode('utf-8')
                    self.redis_client.decr(self._get_profile_connections_key(int(decoded_id)))
                    logger.info(f"[{session_id}] Decremented profile {decoded_id} connections")
            self.redis_client.delete(session_key)
            logger.info(f"[{session_id}] Removed session tracking")
    except Exception as e:
        logger.error(f"[{session_id}] Error cleaning up session: {e}")
def cleanup_stale_persistent_connections(self, max_age_seconds: int = 1800):
    """Close persistent connections whose sessions are older than
    *max_age_seconds* or whose Redis session record has disappeared.
    """
    now = time.time()
    expired = []

    # First pass: decide which sessions are stale (no mutation here).
    for session_id in list(self._persistent_connections):
        session_key = f"vod_session:{session_id}"
        try:
            if self.redis_client:
                session_data = self.redis_client.hgetall(session_key)
                if not session_data:
                    # No backing session record -> treat as stale.
                    expired.append(session_id)
                    continue
                created_at = float(session_data.get(b'created_at', b'0').decode('utf-8'))
                if now - created_at > max_age_seconds:
                    expired.append(session_id)
        except Exception as e:
            logger.error(f"[{session_id}] Error checking session age: {e}")
            expired.append(session_id)

    # Second pass: tear the stale ones down.
    for session_id in expired:
        logger.info(f"[{session_id}] Cleaning up stale persistent connection")
        self.cleanup_persistent_connection(session_id)
    if expired:
        logger.info(f"Cleaned up {len(expired)} stale persistent connections")
# Global instance
_connection_manager = None
def get_connection_manager() -> VODConnectionManager:

View file

@ -633,3 +633,17 @@ def rehash_streams(keys):
for account_id in acquired_locks:
release_task_lock('refresh_single_m3u_account', account_id)
logger.info(f"Released M3U task locks for {len(acquired_locks)} accounts")
@shared_task
def cleanup_vod_persistent_connections():
    """Periodic Celery task: close stale VOD persistent connections.

    Sessions older than 30 minutes have their provider connections closed
    and their Redis tracking removed. Errors are logged, never raised, so
    the beat schedule keeps running.
    """
    try:
        # The connection_manager module exposes its singleton via
        # get_connection_manager() (module-level holds `_connection_manager`);
        # importing a bare `connection_manager` attribute raises ImportError
        # and silently skipped every cleanup run.
        from apps.proxy.vod_proxy.connection_manager import get_connection_manager
        # Clean up connections older than 30 minutes
        get_connection_manager().cleanup_stale_persistent_connections(max_age_seconds=1800)
        logger.info("VOD persistent connection cleanup completed")
    except Exception as e:
        logger.error(f"Error during VOD persistent connection cleanup: {e}")