From b7fb9336be7b9bd9d052f554424a28c77d504864 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Tue, 12 Aug 2025 09:56:30 -0500 Subject: [PATCH] Reuse connections when seeking. --- apps/proxy/vod_proxy/connection_manager.py | 564 +++++++++++++-------- core/tasks.py | 14 + 2 files changed, 356 insertions(+), 222 deletions(-) diff --git a/apps/proxy/vod_proxy/connection_manager.py b/apps/proxy/vod_proxy/connection_manager.py index f25547cf..04540b8c 100644 --- a/apps/proxy/vod_proxy/connection_manager.py +++ b/apps/proxy/vod_proxy/connection_manager.py @@ -17,10 +17,183 @@ from apps.m3u.models import M3UAccountProfile logger = logging.getLogger("vod_proxy") +class PersistentVODConnection: + """Handles a single persistent connection to a VOD provider for a session""" + + def __init__(self, session_id: str, stream_url: str, headers: dict): + self.session_id = session_id + self.stream_url = stream_url + self.base_headers = headers + self.session = None + self.current_response = None + self.content_length = None + self.content_type = 'video/mp4' + self.final_url = None + self.lock = threading.Lock() + self.request_count = 0 # Track number of requests on this connection + + def _establish_connection(self, range_header=None): + """Establish or re-establish connection to provider""" + try: + if not self.session: + self.session = requests.Session() + + headers = self.base_headers.copy() + + # Validate range header against content length + if range_header and self.content_length: + logger.info(f"[{self.session_id}] Validating range {range_header} against content length {self.content_length}") + validated_range = self._validate_range_header(range_header, int(self.content_length)) + if validated_range is None: + # Range is not satisfiable, but don't raise error - return empty response + logger.warning(f"[{self.session_id}] Range not satisfiable: {range_header} for content length {self.content_length}") + return None + elif validated_range != range_header: + range_header = validated_range + logger.info(f"[{self.session_id}] Adjusted range header: {range_header}") + else: + logger.info(f"[{self.session_id}] Range header validated successfully: {range_header}") + elif range_header: + logger.info(f"[{self.session_id}] Range header provided but no content length available yet: {range_header}") + + if range_header: + headers['Range'] = range_header + logger.info(f"[{self.session_id}] Setting Range header: {range_header}") + + # Track request count for better logging + self.request_count += 1 + if self.request_count == 1: + logger.info(f"[{self.session_id}] Making initial request to provider") + target_url = self.stream_url + allow_redirects = True + else: + logger.info(f"[{self.session_id}] Making range request #{self.request_count} on SAME session (using final URL)") + # Use the final URL from first request to avoid redirect chain + target_url = self.final_url if self.final_url else self.stream_url + allow_redirects = False # No need to follow redirects again + logger.info(f"[{self.session_id}] Using cached final URL: {target_url}") + + response = self.session.get( + target_url, + headers=headers, + stream=True, + timeout=(10, 30), + allow_redirects=allow_redirects + ) + response.raise_for_status() + + # Log successful response + if self.request_count == 1: + logger.info(f"[{self.session_id}] Request #{self.request_count} successful: {response.status_code} (followed redirects)") + else: + logger.info(f"[{self.session_id}] Request #{self.request_count} successful: {response.status_code} (direct to final URL)") + + # Capture headers from final URL + if not self.content_length: + self.content_length = response.headers.get('content-length') + self.content_type = response.headers.get('content-type', 'video/mp4') + self.final_url = response.url + logger.info(f"[{self.session_id}] *** PERSISTENT CONNECTION - Final URL: {self.final_url} ***") + logger.info(f"[{self.session_id}] *** PERSISTENT CONNECTION - Content-Length: {self.content_length} ***") + + self.current_response = response + return response + + except Exception as e: + logger.error(f"[{self.session_id}] Error establishing connection: {e}") + self.cleanup() + raise + + def _validate_range_header(self, range_header, content_length): + """Validate and potentially adjust range header against content length""" + try: + if not range_header or not range_header.startswith('bytes='): + return range_header + + range_part = range_header.replace('bytes=', '') + if '-' not in range_part: + return range_header + + start_str, end_str = range_part.split('-', 1) + + # Parse start byte + if start_str: + start_byte = int(start_str) + if start_byte >= content_length: + # Start is beyond file end - not satisfiable + logger.warning(f"[{self.session_id}] Range start {start_byte} >= content length {content_length} - not satisfiable") + return None + else: + start_byte = 0 + + # Parse end byte + if end_str: + end_byte = int(end_str) + if end_byte >= content_length: + # Adjust end to file end + end_byte = content_length - 1 + logger.info(f"[{self.session_id}] Adjusted range end to {end_byte}") + else: + end_byte = content_length - 1 + + # Ensure start <= end + if start_byte > end_byte: + logger.warning(f"[{self.session_id}] Range start {start_byte} > end {end_byte} - not satisfiable") + return None + + validated_range = f"bytes={start_byte}-{end_byte}" + return validated_range + + except (ValueError, IndexError) as e: + logger.warning(f"[{self.session_id}] Could not validate range header {range_header}: {e}") + return range_header + + def get_stream(self, range_header=None): + """Get stream with optional range header - reuses connection for range requests""" + with self.lock: + # For range requests, we don't need to close the connection + # We can make a new request on the same session + if range_header: + logger.info(f"[{self.session_id}] Range request on existing connection: {range_header}") + # Close only the response stream, keep the session alive + if self.current_response: + logger.info(f"[{self.session_id}] Closing previous response stream (keeping connection alive)") + self.current_response.close() + self.current_response = None + + # Make new request (reuses connection if session exists) + response = self._establish_connection(range_header) + if response is None: + # Range not satisfiable - return None to indicate this + return None + + return self.current_response + + def get_headers(self): + """Get headers for response""" + return { + 'content_length': self.content_length, + 'content_type': self.content_type, + 'final_url': self.final_url + } + + def cleanup(self): + """Clean up connection resources""" + with self.lock: + if self.current_response: + self.current_response.close() + self.current_response = None + if self.session: + self.session.close() + self.session = None + logger.info(f"[{self.session_id}] Persistent connection cleaned up") + + class VODConnectionManager: """Manages VOD connections using Redis for tracking""" _instance = None + _persistent_connections = {} # session_id -> PersistentVODConnection @classmethod def get_instance(cls): @@ -32,6 +205,7 @@ class VODConnectionManager: def __init__(self): self.redis_client = RedisClient.get_client() self.connection_ttl = 3600 # 1 hour TTL for connections + self.session_ttl = 1800 # 30 minutes TTL for sessions def _get_connection_key(self, content_type: str, content_uuid: str, client_id: str) -> str: """Get Redis key for a specific connection""" @@ -564,10 +738,10 @@ class VODConnectionManager: def stream_content_with_session(self, session_id, content_obj, stream_url, m3u_profile, client_ip, user_agent, request, utc_start=None, utc_end=None, offset=None, range_header=None): """ - Stream VOD content with session-based connection reuse for timeshift operations + Stream VOD content with persistent connection per session - This method reuses existing upstream connections when the same session makes - timeshift requests, reducing provider connection usage to 1 per client session. + Maintains 1 open connection to provider per session that handles all range requests + dynamically based on client Range headers for seeking functionality. """ try: @@ -585,200 +759,101 @@ class VODConnectionManager: content_uuid = str(content_obj.uuid) content_name = getattr(content_obj, 'name', getattr(content_obj, 'title', 'Unknown')) - # Check if we have an existing session connection - session_key = f"vod_session:{session_id}" - session_info = None + # Check for existing connection or create new one + persistent_conn = self._persistent_connections.get(session_id) - if self.redis_client: - try: - session_data = self.redis_client.get(session_key) - if session_data: - session_info = json.loads(session_data.decode('utf-8')) - logger.info(f"[{client_id}] Found existing session: {session_info}") - except Exception as e: - logger.warning(f"[{client_id}] Error reading session data: {e}") + if not persistent_conn: + logger.info(f"[{client_id}] Creating NEW persistent connection for {content_type} {content_name}") - # If no existing session or session expired, create new connection tracking - # But only increment the profile counter ONCE per session - if not session_info: - connection_created = self.create_connection( - content_type=content_type, - content_uuid=content_uuid, - content_name=content_name, - client_id=client_id, - client_ip=client_ip, - user_agent=user_agent, - m3u_profile=m3u_profile - ) - - if not connection_created: - logger.error(f"Failed to create connection tracking for {content_type} {content_uuid}") - return HttpResponse("Connection limit exceeded", status=503) - - # Store session info in Redis + # Create session in Redis for tracking session_info = { - 'content_type': content_type, - 'content_uuid': content_uuid, - 'content_name': content_name, - 'created_at': time.time(), - 'profile_id': m3u_profile.id, - 'connection_counted': True # Mark that we've counted this connection + "content_type": content_type, + "content_uuid": content_uuid, + "content_name": content_name, + "created_at": str(time.time()), + "profile_id": str(m3u_profile.id), + "connection_counted": "True" } + session_key = f"vod_session:{session_id}" if self.redis_client: - try: - self.redis_client.setex( - session_key, - self.connection_ttl, - json.dumps(session_info) - ) - logger.info(f"[{client_id}] Created new session: {session_info}") - except Exception as e: - logger.error(f"[{client_id}] Error storing session data: {e}") - else: - # Session exists - don't create new connection tracking - # This prevents double-counting connections for the same session - logger.info(f"[{client_id}] Reusing existing session - no new connection created") # Apply timeshift parameters to URL - modified_stream_url = self._apply_timeshift_parameters( - stream_url, utc_start, utc_end, offset - ) + self.redis_client.hset(session_key, mapping=session_info) + self.redis_client.expire(session_key, self.session_ttl) - logger.info(f"[{client_id}] Modified stream URL for timeshift: {modified_stream_url}") + logger.info(f"[{client_id}] Created new session: {session_info}") - # Prepare headers - preserve ALL client headers for proper authentication - def prepare_headers(): - headers = {} + # Apply timeshift parameters to URL + modified_stream_url = self._apply_timeshift_parameters(stream_url, utc_start, utc_end, offset) + logger.info(f"[{client_id}] Modified stream URL for timeshift: {modified_stream_url}") - # Copy all relevant headers from the original request - header_mapping = { - 'HTTP_USER_AGENT': 'User-Agent', - 'HTTP_AUTHORIZATION': 'Authorization', - 'HTTP_X_FORWARDED_FOR': 'X-Forwarded-For', - 'HTTP_X_REAL_IP': 'X-Real-IP', - 'HTTP_REFERER': 'Referer', - 'HTTP_ORIGIN': 'Origin', - 'HTTP_ACCEPT': 'Accept', - 'HTTP_ACCEPT_LANGUAGE': 'Accept-Language', - 'HTTP_ACCEPT_ENCODING': 'Accept-Encoding', - 'HTTP_CONNECTION': 'Connection', - 'HTTP_CACHE_CONTROL': 'Cache-Control', - 'HTTP_COOKIE': 'Cookie', - 'HTTP_DNT': 'DNT', - 'HTTP_X_FORWARDED_PROTO': 'X-Forwarded-Proto', - 'HTTP_X_FORWARDED_PORT': 'X-Forwarded-Port', + # Prepare headers + headers = { + 'User-Agent': user_agent or 'VLC/3.0.21 LibVLC/3.0.21', + 'Accept': '*/*', + 'Connection': 'keep-alive' } - # Log all headers for debugging - logger.debug(f"[{client_id}] All available headers:") - for django_header, http_header in header_mapping.items(): - if hasattr(request, 'META') and django_header in request.META: - header_value = request.META[django_header] - headers[http_header] = header_value - logger.debug(f"[{client_id}] {http_header}: {header_value}") + # Add any authentication headers from profile + if hasattr(m3u_profile, 'auth_headers') and m3u_profile.auth_headers: + headers.update(m3u_profile.auth_headers) - # Check for any timeshift-related headers VLC might send - timeshift_headers = ['HTTP_TIME', 'HTTP_TIMESTAMP', 'HTTP_SEEK', 'HTTP_POSITION'] - for header in timeshift_headers: - if hasattr(request, 'META') and header in request.META: - value = request.META[header] - logger.info(f"[{client_id}] Found timeshift header {header}: {value}") - # Forward these as custom headers - headers[header.replace('HTTP_', '').replace('_', '-')] = value # Ensure we have a User-Agent - if 'User-Agent' not in headers and user_agent: - headers['User-Agent'] = user_agent + # Create persistent connection + persistent_conn = PersistentVODConnection(session_id, modified_stream_url, headers) + self._persistent_connections[session_id] = persistent_conn - # Add client IP headers - if client_ip: - headers['X-Forwarded-For'] = client_ip - headers['X-Real-IP'] = client_ip + # Track connection in profile + self.create_connection(content_type, content_uuid, content_name, client_id, client_ip, user_agent, m3u_profile) + else: + logger.info(f"[{client_id}] Using EXISTING persistent connection for {content_type} {content_name}") - # Add Range header for seeking support - if range_header: - headers['Range'] = range_header - logger.info(f"[{client_id}] Added Range header: {range_header}") + # Update session activity + session_key = f"vod_session:{session_id}" + if self.redis_client: + self.redis_client.hset(session_key, "last_activity", str(time.time())) + self.redis_client.expire(session_key, self.session_ttl) - return headers + logger.info(f"[{client_id}] Reusing existing session - no new connection created") - # STEP 1: Make a small Range request to get headers from the final URL after redirects - logger.info(f"[{client_id}] Making small Range request to get headers from final URL") - try: - probe_headers = prepare_headers() - # Add a small range request to get headers without downloading much data - probe_headers['Range'] = 'bytes=0-1024' - - probe_response = requests.get( - modified_stream_url, - headers=probe_headers, - timeout=(10, 30), - allow_redirects=True, - stream=True - ) - probe_response.raise_for_status() - - # Extract critical headers from the final URL response - upstream_content_length = None - upstream_content_range = probe_response.headers.get('content-range') - - # Parse Content-Range to get total file size: "bytes 0-1024/1559626615" - if upstream_content_range: - try: - parts = upstream_content_range.split('/') - if len(parts) == 2: - upstream_content_length = parts[1] - logger.info(f"[{client_id}] Extracted Content-Length from Content-Range: {upstream_content_length}") - except Exception as e: - logger.warning(f"[{client_id}] Could not parse Content-Range: {e}") - - # Fallback to Content-Length header if available - if not upstream_content_length: - upstream_content_length = probe_response.headers.get('content-length') - - upstream_content_type = probe_response.headers.get('content-type', 'video/mp4') - upstream_accept_ranges = probe_response.headers.get('accept-ranges', 'bytes') - upstream_last_modified = probe_response.headers.get('last-modified') - upstream_etag = probe_response.headers.get('etag') - - logger.info(f"[{client_id}] Final URL after redirects: {probe_response.url}") - logger.info(f"[{client_id}] Headers from final URL - Content-Length: {upstream_content_length}, Content-Type: {upstream_content_type}") - - # Close the probe response - probe_response.close() - - except Exception as e: - logger.warning(f"[{client_id}] Probe request failed, proceeding without upstream headers: {e}") - upstream_content_length = None - upstream_content_type = 'video/mp4' - upstream_accept_ranges = 'bytes' - upstream_last_modified = None - upstream_etag = None + # Log the incoming Range header for debugging + if range_header: + logger.info(f"[{client_id}] *** CLIENT RANGE REQUEST: {range_header} ***") - # STEP 2: Create streaming generator for actual content - def stream_generator(): - upstream_response = None + # Parse range for logging try: - logger.info(f"[{client_id}] Starting session-based VOD stream for {content_type} {content_name}") + if 'bytes=' in range_header: + range_part = range_header.replace('bytes=', '') + if '-' in range_part: + start_byte, end_byte = range_part.split('-', 1) + if start_byte and int(start_byte) > 0: + start_pos_mb = int(start_byte) / (1024 * 1024) + logger.info(f"[{client_id}] *** VLC SEEKING TO: {start_pos_mb:.1f} MB ***") + else: + logger.info(f"[{client_id}] Range request from start") + except Exception as e: + logger.warning(f"[{client_id}] Could not parse range header: {e}") + else: + logger.info(f"[{client_id}] Full content request (no Range header)") - headers = prepare_headers() + # Get stream from persistent connection with current range + upstream_response = persistent_conn.get_stream(range_header) - # Make request to upstream server - upstream_response = requests.get( - modified_stream_url, - headers=headers, - stream=True, - timeout=(10, 30), - allow_redirects=True - ) - upstream_response.raise_for_status() + # Handle range not satisfiable + if upstream_response is None: + logger.warning(f"[{client_id}] Range not satisfiable - returning 416 error") + return HttpResponse( + "Requested Range Not Satisfiable", + status=416, + headers={ + 'Content-Range': f'bytes */{persistent_conn.content_length}' if persistent_conn.content_length else 'bytes */*' + } + ) - # Log upstream response info - logger.info(f"[{client_id}] Upstream response status: {upstream_response.status_code}") - logger.info(f"[{client_id}] Final URL after redirects: {upstream_response.url}") - if 'content-type' in upstream_response.headers: - logger.info(f"[{client_id}] Upstream content-type: {upstream_response.headers['content-type']}") - if 'content-length' in upstream_response.headers: - logger.info(f"[{client_id}] Upstream content-length: {upstream_response.headers['content-length']}") - if 'content-range' in upstream_response.headers: - logger.info(f"[{client_id}] Upstream content-range: {upstream_response.headers['content-range']}") + connection_headers = persistent_conn.get_headers() + + # Create streaming generator + def stream_generator(): + try: + logger.info(f"[{client_id}] Starting stream from persistent connection") bytes_sent = 0 chunk_count = 0 @@ -798,26 +873,20 @@ class VODConnectionManager: bytes_sent=len(chunk) ) - logger.info(f"[{client_id}] Session-based VOD stream completed: {bytes_sent} bytes sent") + logger.info(f"[{client_id}] Persistent stream completed: {bytes_sent} bytes sent") - except requests.RequestException as e: - logger.error(f"[{client_id}] Error streaming from source: {e}") - yield b"Error: Unable to stream content" except Exception as e: - logger.error(f"[{client_id}] Error in session stream generator: {e}") - finally: - # Don't remove connection tracking for sessions - let it expire naturally - # This allows timeshift operations to reuse the connection slot - if upstream_response: - upstream_response.close() + logger.error(f"[{client_id}] Error in persistent stream: {e}") + # Don't cleanup connection on error - it might be reusable + yield b"Error: Stream interrupted" - # STEP 3: Create streaming response with headers from final URL + # Create streaming response response = StreamingHttpResponse( streaming_content=stream_generator(), - content_type=upstream_content_type + content_type=connection_headers['content_type'] ) - # Set status code based on request type + # Set status code based on range request if range_header: response.status_code = 206 logger.info(f"[{client_id}] Set response status to 206 for range request") @@ -825,66 +894,59 @@ class VODConnectionManager: response.status_code = 200 logger.info(f"[{client_id}] Set response status to 200 for full request") - # Set headers that VLC and other players expect + # Set headers that VLC expects response['Cache-Control'] = 'no-cache' response['Pragma'] = 'no-cache' response['X-Content-Type-Options'] = 'nosniff' response['Connection'] = 'keep-alive' - response['Accept-Ranges'] = upstream_accept_ranges + response['Accept-Ranges'] = 'bytes' - # CRITICAL: Forward Content-Length from final URL to enable VLC seeking - if upstream_content_length: - response['Content-Length'] = upstream_content_length - logger.info(f"[{client_id}] *** FORWARDED Content-Length from final URL: {upstream_content_length} *** (VLC seeking enabled)") + # CRITICAL: Forward Content-Length from persistent connection + if connection_headers['content_length']: + response['Content-Length'] = connection_headers['content_length'] + logger.info(f"[{client_id}] *** FORWARDED Content-Length: {connection_headers['content_length']} *** (VLC seeking enabled)") else: - logger.warning(f"[{client_id}] *** NO Content-Length from final URL *** (VLC seeking may not work)") + logger.warning(f"[{client_id}] *** NO Content-Length available *** (VLC seeking may not work)") - # Forward other useful headers from final URL - if upstream_last_modified: - response['Last-Modified'] = upstream_last_modified - if upstream_etag: - response['ETag'] = upstream_etag - - # Handle range requests - set Content-Range if this is a partial request - if range_header and upstream_content_length: - # Parse range header to set proper Content-Range + # Handle range requests - set Content-Range for partial responses + if range_header and connection_headers['content_length']: try: if 'bytes=' in range_header: range_part = range_header.replace('bytes=', '') if '-' in range_part: start_byte, end_byte = range_part.split('-', 1) start = int(start_byte) if start_byte else 0 - end = int(end_byte) if end_byte else int(upstream_content_length) - 1 - total_size = int(upstream_content_length) + end = int(end_byte) if end_byte else int(connection_headers['content_length']) - 1 + total_size = int(connection_headers['content_length']) content_range = f"bytes {start}-{end}/{total_size}" response['Content-Range'] = content_range logger.info(f"[{client_id}] Set Content-Range: {content_range}") except Exception as e: - logger.warning(f"[{client_id}] Could not parse range for Content-Range header: {e}") + logger.warning(f"[{client_id}] Could not set Content-Range: {e}") - # Log the critical headers we're sending to the client - logger.info(f"[{client_id}] SESSION Response headers to client - Status: {response.status_code}, Accept-Ranges: {response.get('Accept-Ranges', 'MISSING')}") - if 'Content-Length' in response: - logger.info(f"[{client_id}] SESSION Content-Length: {response['Content-Length']}") + # Log response headers + logger.info(f"[{client_id}] PERSISTENT Response - Status: {response.status_code}, Content-Length: {response.get('Content-Length', 'MISSING')}") if 'Content-Range' in response: - logger.info(f"[{client_id}] SESSION Content-Range: {response['Content-Range']}") - if 'Content-Type' in response: - logger.info(f"[{client_id}] SESSION Content-Type: {response['Content-Type']}") + logger.info(f"[{client_id}] PERSISTENT Content-Range: {response['Content-Range']}") - # Critical: Log what VLC needs to see for seeking to work + # Log VLC seeking status if response.status_code == 200: - if upstream_content_length: - logger.info(f"[{client_id}] SESSION VLC SEEKING INFO: ✅ Full content response (200) with Content-Length from final URL. VLC seeking should work!") + if connection_headers['content_length']: + logger.info(f"[{client_id}] ✅ PERSISTENT VLC SEEKING: Full response with Content-Length - seeking should work!") else: - logger.info(f"[{client_id}] SESSION VLC SEEKING INFO: ❌ Full content response (200) but NO Content-Length from final URL. VLC seeking will NOT work!") + logger.info(f"[{client_id}] ❌ PERSISTENT VLC SEEKING: Full response but no Content-Length - seeking won't work!") elif response.status_code == 206: - logger.info(f"[{client_id}] SESSION VLC SEEKING INFO: ✅ Partial content response (206). This confirms seeking is working if VLC requested a range.") + logger.info(f"[{client_id}] ✅ PERSISTENT VLC SEEKING: Partial response - seeking is working!") return response except Exception as e: - logger.error(f"Error in stream_content_with_session: {e}", exc_info=True) + logger.error(f"Error in persistent stream_content_with_session: {e}", exc_info=True) + # Cleanup persistent connection on error + if session_id in self._persistent_connections: + self._persistent_connections[session_id].cleanup() + del self._persistent_connections[session_id] return HttpResponse(f"Streaming error: {str(e)}", status=500) def _apply_timeshift_parameters(self, original_url, utc_start=None, utc_end=None, offset=None): @@ -971,7 +1033,65 @@ class VODConnectionManager: except Exception as e: logger.error(f"Error applying timeshift parameters: {e}") - return original_url# Global instance + return original_url + + def cleanup_persistent_connection(self, session_id: str): + """Clean up a specific persistent connection""" + if session_id in self._persistent_connections: + logger.info(f"[{session_id}] Cleaning up persistent connection") + self._persistent_connections[session_id].cleanup() + del self._persistent_connections[session_id] + + # Also remove session tracking + session_key = f"vod_session:{session_id}" + if self.redis_client: + try: + session_data = self.redis_client.hgetall(session_key) + if session_data: + # Remove from profile connections if counted + if session_data.get(b'connection_counted') == b'True': + profile_id = session_data.get(b'profile_id') + if profile_id: + profile_key = self._get_profile_connections_key(int(profile_id.decode('utf-8'))) + self.redis_client.decr(profile_key) + logger.info(f"[{session_id}] Decremented profile {profile_id.decode('utf-8')} connections") + + self.redis_client.delete(session_key) + logger.info(f"[{session_id}] Removed session tracking") + except Exception as e: + logger.error(f"[{session_id}] Error cleaning up session: {e}") + + def cleanup_stale_persistent_connections(self, max_age_seconds: int = 1800): + """Clean up stale persistent connections that haven't been used recently""" + current_time = time.time() + stale_sessions = [] + + for session_id, conn in self._persistent_connections.items(): + session_key = f"vod_session:{session_id}" + try: + if self.redis_client: + session_data = self.redis_client.hgetall(session_key) + if session_data: + created_at = float(session_data.get(b'created_at', b'0').decode('utf-8')) + if current_time - created_at > max_age_seconds: + stale_sessions.append(session_id) + else: + # Session data missing, connection is stale + stale_sessions.append(session_id) + except Exception as e: + logger.error(f"[{session_id}] Error checking session age: {e}") + stale_sessions.append(session_id) + + # Clean up stale connections + for session_id in stale_sessions: + logger.info(f"[{session_id}] Cleaning up stale persistent connection") + self.cleanup_persistent_connection(session_id) + + if stale_sessions: + logger.info(f"Cleaned up {len(stale_sessions)} stale persistent connections") + + +# Global instance _connection_manager = None def get_connection_manager() -> VODConnectionManager: diff --git a/core/tasks.py b/core/tasks.py index 47bc8cf0..8cc26481 100644 --- a/core/tasks.py +++ b/core/tasks.py @@ -633,3 +633,17 @@ def rehash_streams(keys): for account_id in acquired_locks: release_task_lock('refresh_single_m3u_account', account_id) logger.info(f"Released M3U task locks for {len(acquired_locks)} accounts") + + +@shared_task +def cleanup_vod_persistent_connections(): + """Clean up stale VOD persistent connections""" + try: + from apps.proxy.vod_proxy.connection_manager import connection_manager + + # Clean up connections older than 30 minutes + connection_manager.cleanup_stale_persistent_connections(max_age_seconds=1800) + logger.info("VOD persistent connection cleanup completed") + + except Exception as e: + logger.error(f"Error during VOD persistent connection cleanup: {e}")