diff --git a/apps/proxy/vod_proxy/multi_worker_connection_manager.py b/apps/proxy/vod_proxy/multi_worker_connection_manager.py new file mode 100644 index 00000000..1e7c3e68 --- /dev/null +++ b/apps/proxy/vod_proxy/multi_worker_connection_manager.py @@ -0,0 +1,931 @@ +""" +Enhanced VOD Connection Manager with Redis-based connection sharing for multi-worker environments +""" + +import time +import json +import logging +import threading +import random +import re +import requests +import pickle +import base64 +from typing import Optional, Dict, Any +from django.http import StreamingHttpResponse, HttpResponse +from core.utils import RedisClient +from apps.vod.models import Movie, Episode +from apps.m3u.models import M3UAccountProfile + +logger = logging.getLogger("vod_proxy") + + +class SerializableConnectionState: + """Serializable connection state that can be stored in Redis""" + + def __init__(self, session_id: str, stream_url: str, headers: dict, + content_length: str = None, content_type: str = 'video/mp4', + final_url: str = None): + self.session_id = session_id + self.stream_url = stream_url + self.headers = headers + self.content_length = content_length + self.content_type = content_type + self.final_url = final_url + self.last_activity = time.time() + self.request_count = 0 + self.active_streams = 0 + + def to_dict(self): + """Convert to dictionary for Redis storage""" + return { + 'session_id': self.session_id or '', + 'stream_url': self.stream_url or '', + 'headers': json.dumps(self.headers or {}), + 'content_length': str(self.content_length) if self.content_length is not None else '', + 'content_type': self.content_type or 'video/mp4', + 'final_url': self.final_url or '', + 'last_activity': str(self.last_activity), + 'request_count': str(self.request_count), + 'active_streams': str(self.active_streams) + } + + @classmethod + def from_dict(cls, data: dict): + """Create from dictionary loaded from Redis""" + obj = cls( + session_id=data['session_id'], + stream_url=data['stream_url'], + headers=json.loads(data['headers']) if data['headers'] else {}, + content_length=data.get('content_length') if data.get('content_length') else None, + content_type=data.get('content_type', 'video/mp4'), + final_url=data.get('final_url') if data.get('final_url') else None + ) + obj.last_activity = float(data.get('last_activity', time.time())) + obj.request_count = int(data.get('request_count', 0)) + obj.active_streams = int(data.get('active_streams', 0)) + return obj + + +class RedisBackedVODConnection: + """Redis-backed VOD connection that can be accessed from any worker""" + + def __init__(self, session_id: str, redis_client=None): + self.session_id = session_id + self.redis_client = redis_client or RedisClient.get_client() + self.connection_key = f"vod_persistent_connection:{session_id}" + self.lock_key = f"vod_connection_lock:{session_id}" + self.local_session = None # Local requests session + self.local_response = None # Local current response + + def _get_connection_state(self) -> Optional[SerializableConnectionState]: + """Get connection state from Redis""" + if not self.redis_client: + return None + + try: + data = self.redis_client.hgetall(self.connection_key) + if not data: + return None + + # Convert bytes keys/values to strings if needed + if isinstance(list(data.keys())[0], bytes): + data = {k.decode('utf-8'): v.decode('utf-8') for k, v in data.items()} + + return SerializableConnectionState.from_dict(data) + except Exception as e: + logger.error(f"[{self.session_id}] Error getting connection state from Redis: {e}") + return None + + def _save_connection_state(self, state: SerializableConnectionState): + """Save connection state to Redis""" + if not self.redis_client: + return False + + try: + data = state.to_dict() + # Log the data being saved for debugging + logger.debug(f"[{self.session_id}] Saving connection state: {data}") + + # Verify all values are valid for Redis + for key, value in data.items(): + if value is None: + logger.error(f"[{self.session_id}] None value found for key '{key}' - this should not happen") + return False + + self.redis_client.hset(self.connection_key, mapping=data) + self.redis_client.expire(self.connection_key, 3600) # 1 hour TTL + return True + except Exception as e: + logger.error(f"[{self.session_id}] Error saving connection state to Redis: {e}") + return False + + def _acquire_lock(self, timeout: int = 10) -> bool: + """Acquire distributed lock for connection operations""" + if not self.redis_client: + return False + + try: + return self.redis_client.set(self.lock_key, "locked", nx=True, ex=timeout) + except Exception as e: + logger.error(f"[{self.session_id}] Error acquiring lock: {e}") + return False + + def _release_lock(self): + """Release distributed lock""" + if not self.redis_client: + return + + try: + self.redis_client.delete(self.lock_key) + except Exception as e: + logger.error(f"[{self.session_id}] Error releasing lock: {e}") + + def create_connection(self, stream_url: str, headers: dict) -> bool: + """Create a new connection state in Redis""" + if not self._acquire_lock(): + logger.warning(f"[{self.session_id}] Could not acquire lock for connection creation") + return False + + try: + # Check if connection already exists + existing_state = self._get_connection_state() + if existing_state: + logger.info(f"[{self.session_id}] Connection already exists in Redis") + return True + + # Create new connection state + state = SerializableConnectionState(self.session_id, stream_url, headers) + success = self._save_connection_state(state) + + if success: + logger.info(f"[{self.session_id}] Created new connection state in Redis") + + return success + finally: + self._release_lock() + + def get_stream(self, range_header: str = None): + """Get stream with optional range header - works across workers""" + # Get connection state from Redis + state = self._get_connection_state() + if not state: + logger.error(f"[{self.session_id}] No connection state found in Redis") + return None + + # Update activity and increment request count + state.last_activity = time.time() + state.request_count += 1 + + try: + # Create local session if needed + if not self.local_session: + self.local_session = requests.Session() + + # Prepare headers + headers = state.headers.copy() + if range_header: + # Validate range against content length if available + if state.content_length: + validated_range = self._validate_range_header(range_header, int(state.content_length)) + if validated_range is None: + logger.warning(f"[{self.session_id}] Range not satisfiable: {range_header}") + return None + range_header = validated_range + + headers['Range'] = range_header + logger.info(f"[{self.session_id}] Setting Range header: {range_header}") + + # Use final URL if available, otherwise original URL + target_url = state.final_url if state.final_url else state.stream_url + allow_redirects = not state.final_url # Only follow redirects if we don't have final URL + + logger.info(f"[{self.session_id}] Making request #{state.request_count} to {'final' if state.final_url else 'original'} URL") + + # Make request + response = self.local_session.get( + target_url, + headers=headers, + stream=True, + timeout=(10, 30), + allow_redirects=allow_redirects + ) + response.raise_for_status() + + # Update state with response info on first request + if state.request_count == 1: + if not state.content_length: + state.content_length = response.headers.get('content-length') + if not state.content_type: + state.content_type = response.headers.get('content-type', 'video/mp4') + if not state.final_url: + state.final_url = response.url + + logger.info(f"[{self.session_id}] Updated connection state: length={state.content_length}, type={state.content_type}") + + # Save updated state + self._save_connection_state(state) + + self.local_response = response + return response + + except Exception as e: + logger.error(f"[{self.session_id}] Error establishing connection: {e}") + self.cleanup() + raise + + def _validate_range_header(self, range_header: str, content_length: int): + """Validate range header against content length""" + try: + if not range_header or not range_header.startswith('bytes='): + return range_header + + range_part = range_header.replace('bytes=', '') + if '-' not in range_part: + return range_header + + start_str, end_str = range_part.split('-', 1) + + # Parse start byte + if start_str: + start_byte = int(start_str) + if start_byte >= content_length: + return None # Not satisfiable + else: + start_byte = 0 + + # Parse end byte + if end_str: + end_byte = int(end_str) + if end_byte >= content_length: + end_byte = content_length - 1 + else: + end_byte = content_length - 1 + + # Ensure start <= end + if start_byte > end_byte: + return None + + return f"bytes={start_byte}-{end_byte}" + + except (ValueError, IndexError) as e: + logger.warning(f"[{self.session_id}] Could not validate range header {range_header}: {e}") + return range_header + + def increment_active_streams(self): + """Increment active streams count in Redis""" + if not self._acquire_lock(): + return False + + try: + state = self._get_connection_state() + if state: + state.active_streams += 1 + state.last_activity = time.time() + self._save_connection_state(state) + logger.debug(f"[{self.session_id}] Active streams incremented to {state.active_streams}") + return True + return False + finally: + self._release_lock() + + def decrement_active_streams(self): + """Decrement active streams count in Redis""" + if not self._acquire_lock(): + return False + + try: + state = self._get_connection_state() + if state and state.active_streams > 0: + state.active_streams -= 1 + state.last_activity = time.time() + self._save_connection_state(state) + logger.debug(f"[{self.session_id}] Active streams decremented to {state.active_streams}") + return True + return False + finally: + self._release_lock() + + def has_active_streams(self) -> bool: + """Check if connection has any active streams""" + state = self._get_connection_state() + return state.active_streams > 0 if state else False + + def get_headers(self): + """Get headers for response""" + state = self._get_connection_state() + if state: + return { + 'content_length': state.content_length, + 'content_type': state.content_type, + 'final_url': state.final_url + } + return {} + + def cleanup(self): + """Clean up local resources and Redis state""" + if self.local_response: + self.local_response.close() + self.local_response = None + if self.local_session: + self.local_session.close() + self.local_session = None + + # Remove from Redis + if self.redis_client: + try: + self.redis_client.delete(self.connection_key) + self.redis_client.delete(self.lock_key) + logger.info(f"[{self.session_id}] Cleaned up Redis connection state") + except Exception as e: + logger.error(f"[{self.session_id}] Error cleaning up Redis state: {e}") + + +# Modify the VODConnectionManager to use Redis-backed connections +class MultiWorkerVODConnectionManager: + """Enhanced VOD Connection Manager that works across multiple uwsgi workers""" + + _instance = None + + @classmethod + def get_instance(cls): + """Get the singleton instance""" + if cls._instance is None: + cls._instance = cls() + return cls._instance + + def __init__(self): + self.redis_client = RedisClient.get_client() + self.connection_ttl = 3600 # 1 hour TTL for connections + self.session_ttl = 1800 # 30 minutes TTL for sessions + self.worker_id = self._get_worker_id() + logger.info(f"MultiWorkerVODConnectionManager initialized for worker {self.worker_id}") + + def _get_worker_id(self): + """Get unique worker ID for this process""" + import os + import socket + try: + # Use combination of hostname and PID for unique worker ID + return f"{socket.gethostname()}-{os.getpid()}" + except: + import random + return f"worker-{random.randint(1000, 9999)}" + + def stream_content_with_session(self, session_id, content_obj, stream_url, m3u_profile, + client_ip, user_agent, request, + utc_start=None, utc_end=None, offset=None, range_header=None): + """Stream content with Redis-backed persistent connection""" + + # Generate client ID + content_type = "movie" if isinstance(content_obj, Movie) else "episode" + content_uuid = str(content_obj.uuid) + content_name = content_obj.name if hasattr(content_obj, 'name') else str(content_obj) + client_id = session_id + + logger.info(f"[{client_id}] Worker {self.worker_id} - Redis-backed streaming request for {content_type} {content_name}") + + try: + # Create Redis-backed connection + redis_connection = RedisBackedVODConnection(session_id, self.redis_client) + + # Check if connection exists, create if not + existing_state = redis_connection._get_connection_state() + if not existing_state: + logger.info(f"[{client_id}] Worker {self.worker_id} - Creating new Redis-backed connection") + + # Apply timeshift parameters + modified_stream_url = self._apply_timeshift_parameters(stream_url, utc_start, utc_end, offset) + + # Prepare headers + headers = {} + if user_agent: + headers['User-Agent'] = user_agent + + # Forward important headers from request + important_headers = ['authorization', 'x-forwarded-for', 'x-real-ip', 'referer', 'origin', 'accept'] + for header_name in important_headers: + django_header = f'HTTP_{header_name.upper().replace("-", "_")}' + if hasattr(request, 'META') and django_header in request.META: + headers[header_name] = request.META[django_header] + + # Add client IP + if client_ip: + headers['X-Forwarded-For'] = client_ip + headers['X-Real-IP'] = client_ip + + # Add worker identification + headers['X-Worker-ID'] = self.worker_id + + # Create connection state in Redis + if not redis_connection.create_connection(modified_stream_url, headers): + logger.error(f"[{client_id}] Worker {self.worker_id} - Failed to create Redis connection") + return HttpResponse("Failed to create connection", status=500) + + # Create session tracking + session_info = { + "content_type": content_type, + "content_uuid": content_uuid, + "content_name": content_name, + "created_at": str(time.time()), + "last_activity": str(time.time()), + "profile_id": str(m3u_profile.id), + "client_ip": client_ip, + "user_agent": user_agent, + "utc_start": utc_start or "", + "utc_end": utc_end or "", + "offset": str(offset) if offset else "", + "worker_id": self.worker_id, # Track which worker created this + "connection_type": "redis_backed" + } + + session_key = f"vod_session:{session_id}" + if self.redis_client: + self.redis_client.hset(session_key, mapping=session_info) + self.redis_client.expire(session_key, self.session_ttl) + + logger.info(f"[{client_id}] Worker {self.worker_id} - Created session: {session_info}") + else: + logger.info(f"[{client_id}] Worker {self.worker_id} - Using existing Redis-backed connection") + + # Update session activity + session_key = f"vod_session:{session_id}" + if self.redis_client: + self.redis_client.hset(session_key, mapping={ + "last_activity": str(time.time()), + "last_worker_id": self.worker_id # Track which worker last accessed this + }) + self.redis_client.expire(session_key, self.session_ttl) + + # Get stream from Redis-backed connection + upstream_response = redis_connection.get_stream(range_header) + + if upstream_response is None: + logger.warning(f"[{client_id}] Worker {self.worker_id} - Range not satisfiable") + return HttpResponse("Requested Range Not Satisfiable", status=416) + + # Get connection headers + connection_headers = redis_connection.get_headers() + + # Create streaming generator + def stream_generator(): + decremented = False + try: + logger.info(f"[{client_id}] Worker {self.worker_id} - Starting Redis-backed stream") + + # Increment active streams + redis_connection.increment_active_streams() + + bytes_sent = 0 + chunk_count = 0 + + for chunk in upstream_response.iter_content(chunk_size=8192): + if chunk: + yield chunk + bytes_sent += len(chunk) + chunk_count += 1 + + # Update activity every 100 chunks + if chunk_count % 100 == 0: + self.update_connection_activity( + content_type=content_type, + content_uuid=content_uuid, + client_id=client_id, + bytes_sent=len(chunk) + ) + + logger.info(f"[{client_id}] Worker {self.worker_id} - Redis-backed stream completed: {bytes_sent} bytes sent") + redis_connection.decrement_active_streams() + decremented = True + + except GeneratorExit: + logger.info(f"[{client_id}] Worker {self.worker_id} - Client disconnected from Redis-backed stream") + if not decremented: + redis_connection.decrement_active_streams() + decremented = True + + # Schedule cleanup if no active streams + if not redis_connection.has_active_streams(): + def delayed_cleanup(): + time.sleep(10) # Wait 10 seconds + if not redis_connection.has_active_streams(): + logger.info(f"[{client_id}] Worker {self.worker_id} - Cleaning up idle Redis connection") + redis_connection.cleanup() + + import threading + cleanup_thread = threading.Thread(target=delayed_cleanup) + cleanup_thread.daemon = True + cleanup_thread.start() + + except Exception as e: + logger.error(f"[{client_id}] Worker {self.worker_id} - Error in Redis-backed stream: {e}") + if not decremented: + redis_connection.decrement_active_streams() + decremented = True + redis_connection.cleanup() + yield b"Error: Stream interrupted" + + finally: + if not decremented: + redis_connection.decrement_active_streams() + + # Create streaming response + response = StreamingHttpResponse( + streaming_content=stream_generator(), + content_type=connection_headers.get('content_type', 'video/mp4') + ) + + # Set appropriate status code + response.status_code = 206 if range_header else 200 + + # Set required headers + response['Cache-Control'] = 'no-cache' + response['Pragma'] = 'no-cache' + response['X-Content-Type-Options'] = 'nosniff' + response['Connection'] = 'keep-alive' + response['X-Worker-ID'] = self.worker_id # Identify which worker served this + + if connection_headers.get('content_length'): + response['Accept-Ranges'] = 'bytes' + response['Content-Length'] = connection_headers['content_length'] + + # Set Content-Range for partial requests + if range_header and 'bytes=' in range_header: + try: + range_part = range_header.replace('bytes=', '') + if '-' in range_part: + start_byte, end_byte = range_part.split('-', 1) + start = int(start_byte) if start_byte else 0 + end = int(end_byte) if end_byte else int(connection_headers['content_length']) - 1 + total_size = int(connection_headers['content_length']) + + content_range = f"bytes {start}-{end}/{total_size}" + response['Content-Range'] = content_range + logger.info(f"[{client_id}] Worker {self.worker_id} - Set Content-Range: {content_range}") + except Exception as e: + logger.warning(f"[{client_id}] Worker {self.worker_id} - Could not set Content-Range: {e}") + + logger.info(f"[{client_id}] Worker {self.worker_id} - Redis-backed response ready (status: {response.status_code})") + return response + + except Exception as e: + logger.error(f"[{client_id}] Worker {self.worker_id} - Error in Redis-backed stream_content_with_session: {e}", exc_info=True) + return HttpResponse(f"Streaming error: {str(e)}", status=500) + + def _apply_timeshift_parameters(self, original_url, utc_start=None, utc_end=None, offset=None): + """Apply timeshift parameters to URL""" + if not any([utc_start, utc_end, offset]): + return original_url + + try: + from urllib.parse import urlparse, urlunparse, parse_qs, urlencode + + parsed_url = urlparse(original_url) + query_params = parse_qs(parsed_url.query) + path = parsed_url.path + + logger.info(f"Applying timeshift parameters: utc_start={utc_start}, utc_end={utc_end}, offset={offset}") + + # Add timeshift parameters + if utc_start: + query_params['utc_start'] = [utc_start] + query_params['start'] = [utc_start] + logger.info(f"Added utc_start/start parameter: {utc_start}") + + if utc_end: + query_params['utc_end'] = [utc_end] + query_params['end'] = [utc_end] + logger.info(f"Added utc_end/end parameter: {utc_end}") + + if offset: + try: + offset_seconds = int(offset) + query_params['offset'] = [str(offset_seconds)] + query_params['seek'] = [str(offset_seconds)] + query_params['t'] = [str(offset_seconds)] + logger.info(f"Added offset/seek/t parameter: {offset_seconds}") + except ValueError: + logger.warning(f"Invalid offset value: {offset}") + + # Handle special catchup URL patterns + if utc_start: + try: + from datetime import datetime + import re + + # Parse the UTC start time + start_dt = datetime.fromisoformat(utc_start.replace('Z', '+00:00')) + + # Check for catchup URL patterns like /catchup/YYYY-MM-DD/HH-MM-SS/ + catchup_pattern = r'/catchup/\d{4}-\d{2}-\d{2}/\d{2}-\d{2}-\d{2}/' + if re.search(catchup_pattern, path): + # Replace the date/time in the path + date_part = start_dt.strftime('%Y-%m-%d') + time_part = start_dt.strftime('%H-%M-%S') + + path = re.sub(catchup_pattern, f'/catchup/{date_part}/{time_part}/', path) + logger.info(f"Modified catchup path: {path}") + except Exception as e: + logger.warning(f"Could not parse timeshift date: {e}") + + # Reconstruct URL + new_query = urlencode(query_params, doseq=True) + modified_url = urlunparse(( + parsed_url.scheme, + parsed_url.netloc, + path, + parsed_url.params, + new_query, + parsed_url.fragment + )) + + logger.info(f"Modified URL: {modified_url}") + return modified_url + + except Exception as e: + logger.error(f"Error applying timeshift parameters: {e}") + return original_url + + def cleanup_persistent_connection(self, session_id: str): + """Clean up a specific Redis-backed persistent connection""" + logger.info(f"[{session_id}] Cleaning up Redis-backed persistent connection") + + redis_connection = RedisBackedVODConnection(session_id, self.redis_client) + redis_connection.cleanup() + + # Also clean up session data + if self.redis_client: + try: + session_key = f"vod_session:{session_id}" + self.redis_client.delete(session_key) + logger.info(f"[{session_id}] Cleaned up session data") + except Exception as e: + logger.error(f"[{session_id}] Error cleaning up session data: {e}") + + def cleanup_stale_persistent_connections(self, max_age_seconds: int = 1800): + """Clean up stale Redis-backed persistent connections""" + if not self.redis_client: + return + + try: + logger.info(f"Cleaning up Redis-backed connections older than {max_age_seconds} seconds") + + # Find all persistent connection keys + pattern = "vod_persistent_connection:*" + cursor = 0 + cleanup_count = 0 + current_time = time.time() + + while True: + cursor, keys = self.redis_client.scan(cursor, match=pattern, count=100) + + for key in keys: + try: + # Get connection state + data = self.redis_client.hgetall(key) + if not data: + continue + + # Convert bytes to strings if needed + if isinstance(list(data.keys())[0], bytes): + data = {k.decode('utf-8'): v.decode('utf-8') for k, v in data.items()} + + last_activity = float(data.get('last_activity', 0)) + active_streams = int(data.get('active_streams', 0)) + + # Clean up if stale and no active streams + if (current_time - last_activity > max_age_seconds) and active_streams == 0: + session_id = key.decode('utf-8').replace('vod_persistent_connection:', '') + logger.info(f"Cleaning up stale connection: {session_id}") + + # Clean up connection and related keys + redis_connection = RedisBackedVODConnection(session_id, self.redis_client) + redis_connection.cleanup() + cleanup_count += 1 + + except Exception as e: + logger.error(f"Error processing connection key {key}: {e}") + continue + + if cursor == 0: + break + + if cleanup_count > 0: + logger.info(f"Cleaned up {cleanup_count} stale Redis-backed connections") + else: + logger.debug("No stale Redis-backed connections found") + + except Exception as e: + logger.error(f"Error during Redis-backed connection cleanup: {e}") + + def create_connection(self, content_type: str, content_uuid: str, content_name: str, + client_id: str, client_ip: str, user_agent: str, + m3u_profile: M3UAccountProfile) -> bool: + """Create connection tracking in Redis (same as original but for Redis-backed connections)""" + if not self.redis_client: + logger.error("Redis client not available for VOD connection tracking") + return False + + try: + # Check profile connection limits + profile_connections_key = f"profile_connections:{m3u_profile.id}" + current_connections = self.redis_client.get(profile_connections_key) + max_connections = getattr(m3u_profile, 'max_connections', 3) # Default to 3 + + if current_connections and int(current_connections) >= max_connections: + logger.warning(f"Profile {m3u_profile.name} connection limit exceeded ({current_connections}/{max_connections})") + return False + + # Create connection tracking + connection_key = f"vod_proxy:connection:{content_type}:{content_uuid}:{client_id}" + content_connections_key = f"vod_proxy:content:{content_type}:{content_uuid}:connections" + + # Check if connection already exists + if self.redis_client.exists(connection_key): + logger.info(f"Connection already exists for {client_id} - {content_type} {content_name}") + self.redis_client.hset(connection_key, "last_activity", str(time.time())) + return True + + # Connection data + connection_data = { + "content_type": content_type, + "content_uuid": content_uuid, + "content_name": content_name, + "client_id": client_id, + "client_ip": client_ip, + "user_agent": user_agent, + "m3u_profile_id": m3u_profile.id, + "m3u_profile_name": m3u_profile.name, + "connected_at": str(time.time()), + "last_activity": str(time.time()), + "bytes_sent": "0", + "position_seconds": "0" + } + + # Use pipeline for atomic operations + pipe = self.redis_client.pipeline() + pipe.hset(connection_key, mapping=connection_data) + pipe.expire(connection_key, self.connection_ttl) + pipe.incr(profile_connections_key) + pipe.sadd(content_connections_key, client_id) + pipe.expire(content_connections_key, self.connection_ttl) + pipe.execute() + + logger.info(f"Created Redis-backed VOD connection: {client_id} for {content_type} {content_name}") + return True + + except Exception as e: + logger.error(f"Error creating Redis-backed connection: {e}") + return False + + def remove_connection(self, content_type: str, content_uuid: str, client_id: str): + """Remove connection tracking from Redis""" + if not self.redis_client: + return + + try: + connection_key = f"vod_proxy:connection:{content_type}:{content_uuid}:{client_id}" + content_connections_key = f"vod_proxy:content:{content_type}:{content_uuid}:connections" + + # Get connection data to find profile + connection_data = self.redis_client.hgetall(connection_key) + if connection_data: + # Convert bytes to strings if needed + if isinstance(list(connection_data.keys())[0], bytes): + connection_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in connection_data.items()} + + profile_id = connection_data.get('m3u_profile_id') + if profile_id: + profile_connections_key = f"profile_connections:{profile_id}" + + # Use pipeline for atomic operations + pipe = self.redis_client.pipeline() + pipe.delete(connection_key) + pipe.srem(content_connections_key, client_id) + pipe.decr(profile_connections_key) + pipe.execute() + + logger.info(f"Removed Redis-backed connection: {client_id}") + + except Exception as e: + logger.error(f"Error removing Redis-backed connection: {e}") + + def update_connection_activity(self, content_type: str, content_uuid: str, + client_id: str, bytes_sent: int): + """Update connection activity in Redis""" + if not self.redis_client: + return + + try: + connection_key = f"vod_proxy:connection:{content_type}:{content_uuid}:{client_id}" + pipe = self.redis_client.pipeline() + pipe.hset(connection_key, mapping={ + "last_activity": str(time.time()), + "bytes_sent": str(bytes_sent) + }) + pipe.expire(connection_key, self.connection_ttl) + pipe.execute() + except Exception as e: + logger.error(f"Error updating connection activity: {e}") + + def find_matching_idle_session(self, content_type: str, content_uuid: str, + client_ip: str, user_agent: str, + utc_start=None, utc_end=None, offset=None) -> Optional[str]: + """Find existing Redis-backed session that matches criteria""" + if not self.redis_client: + return None + + try: + # Search for sessions with matching content + pattern = "vod_session:*" + cursor = 0 + matching_sessions = [] + + while True: + cursor, keys = self.redis_client.scan(cursor, match=pattern, count=100) + + for key in keys: + try: + session_data = self.redis_client.hgetall(key) + if not session_data: + continue + + # Convert bytes keys/values to strings if needed + if isinstance(list(session_data.keys())[0], bytes): + session_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in session_data.items()} + + # Check if content matches + stored_content_type = session_data.get('content_type', '') + stored_content_uuid = session_data.get('content_uuid', '') + + if stored_content_type != content_type or stored_content_uuid != content_uuid: + continue + + # Extract session ID + session_id = key.decode('utf-8').replace('vod_session:', '') + + # Check if Redis-backed connection exists and has no active streams + redis_connection = RedisBackedVODConnection(session_id, self.redis_client) + if redis_connection.has_active_streams(): + continue + + # Calculate match score + score = 10 # Content match + match_reasons = ["content"] + + # Check other criteria + stored_client_ip = session_data.get('client_ip', '') + stored_user_agent = session_data.get('user_agent', '') + + if stored_client_ip and stored_client_ip == client_ip: + score += 5 + match_reasons.append("ip") + + if stored_user_agent and stored_user_agent == user_agent: + score += 3 + match_reasons.append("user-agent") + + # Check timeshift parameters + stored_utc_start = session_data.get('utc_start', '') + stored_utc_end = session_data.get('utc_end', '') + stored_offset = session_data.get('offset', '') + + current_utc_start = utc_start or "" + current_utc_end = utc_end or "" + current_offset = str(offset) if offset else "" + + if (stored_utc_start == current_utc_start and + stored_utc_end == current_utc_end and + stored_offset == current_offset): + score += 7 + match_reasons.append("timeshift") + + if score >= 13: # Good match threshold + matching_sessions.append({ + 'session_id': session_id, + 'score': score, + 'reasons': match_reasons, + 'last_activity': float(session_data.get('last_activity', '0')) + }) + + except Exception as e: + logger.debug(f"Error processing session key {key}: {e}") + continue + + if cursor == 0: + break + + # Sort by score and last activity + matching_sessions.sort(key=lambda x: (x['score'], x['last_activity']), reverse=True) + + if matching_sessions: + best_match = matching_sessions[0] + logger.info(f"Found matching Redis-backed idle session: {best_match['session_id']} " + f"(score: {best_match['score']}, reasons: {', '.join(best_match['reasons'])})") + return best_match['session_id'] + + return None + + except Exception as e: + logger.error(f"Error finding matching idle session: {e}") + return None diff --git a/apps/proxy/vod_proxy/views.py b/apps/proxy/vod_proxy/views.py index e13220eb..8a5abd88 100644 --- a/apps/proxy/vod_proxy/views.py +++ b/apps/proxy/vod_proxy/views.py @@ -15,6 +15,7 @@ from django.views import View from apps.vod.models import Movie, Series, Episode from apps.m3u.models import M3UAccount, M3UAccountProfile from apps.proxy.vod_proxy.connection_manager import VODConnectionManager +from apps.proxy.vod_proxy.multi_worker_connection_manager import MultiWorkerVODConnectionManager from .utils import get_client_info, create_vod_response logger = logging.getLogger(__name__) @@ -180,8 +181,9 @@ class VODStreamView(View): logger.error(f"[VOD-ERROR] Invalid stream URL: {final_stream_url}") return HttpResponse("Invalid stream URL", status=500) - # Get connection manager - connection_manager = VODConnectionManager.get_instance() + # Get connection manager (Redis-backed for multi-worker support) + connection_manager = MultiWorkerVODConnectionManager.get_instance() + # Stream the content with session-based connection reuse logger.info("[VOD-STREAM] Calling connection manager to stream content") @@ -304,7 +306,9 @@ class VODStreamView(View): # Now create a persistent connection for the session (if one doesn't exist) # This ensures the FUSE GET requests will reuse the same connection - connection_manager = VODConnectionManager.get_instance() + + connection_manager = MultiWorkerVODConnectionManager.get_instance() + logger.info(f"[VOD-HEAD] Pre-creating persistent connection for session: {session_id}") # We don't actually stream content here, just ensure connection is ready diff --git a/docker/uwsgi.debug.ini b/docker/uwsgi.debug.ini index e049df87..2bc42119 100644 --- a/docker/uwsgi.debug.ini +++ b/docker/uwsgi.debug.ini @@ -27,7 +27,7 @@ die-on-term = true static-map = /static=/app/static # Worker configuration -workers = 1 +workers = 8 threads = 8 enable-threads = true lazy-apps = true