"""
|
|
Enhanced VOD Connection Manager with Redis-based connection sharing for multi-worker environments
|
|
"""
|
|
|
|
import time
|
|
import json
|
|
import logging
|
|
import threading
|
|
import random
|
|
import re
|
|
import requests
|
|
import pickle
|
|
import base64
|
|
import os
|
|
import socket
|
|
import mimetypes
|
|
from urllib.parse import urlparse
|
|
from typing import Optional, Dict, Any
|
|
from django.http import StreamingHttpResponse, HttpResponse
|
|
from core.utils import RedisClient
|
|
from apps.vod.models import Movie, Episode
|
|
from apps.m3u.models import M3UAccountProfile
|
|
|
|
logger = logging.getLogger("vod_proxy")


def get_vod_client_stop_key(client_id):
    """Get the Redis key for signaling a VOD client to stop"""
    return f"vod_proxy:client:{client_id}:stop"


def infer_content_type_from_url(url: str) -> Optional[str]:
    """
    Infer the MIME type from the file extension in a URL.

    Args:
        url: The stream URL

    Returns:
        MIME type string, or None if it cannot be determined
    """
    try:
        parsed_url = urlparse(url)
        path = parsed_url.path

        # Extract file extension
        _, ext = os.path.splitext(path)
        ext = ext.lower()

        # Common video format mappings
        video_mime_types = {
            '.mp4': 'video/mp4',
            '.mkv': 'video/x-matroska',
            '.avi': 'video/x-msvideo',
            '.mov': 'video/quicktime',
            '.wmv': 'video/x-ms-wmv',
            '.flv': 'video/x-flv',
            '.webm': 'video/webm',
            '.m4v': 'video/x-m4v',
            '.3gp': 'video/3gpp',
            '.ts': 'video/mp2t',
            '.m3u8': 'application/x-mpegURL',
            '.mpg': 'video/mpeg',
            '.mpeg': 'video/mpeg',
        }

        if ext in video_mime_types:
            logger.debug(f"Inferred content type '{video_mime_types[ext]}' from extension '{ext}' in URL: {url}")
            return video_mime_types[ext]

        # Fall back to the mimetypes module
        mime_type, _ = mimetypes.guess_type(path)
        if mime_type and mime_type.startswith('video/'):
            logger.debug(f"Inferred content type '{mime_type}' using mimetypes for URL: {url}")
            return mime_type

        logger.debug(f"Could not infer content type from URL: {url}")
        return None

    except Exception as e:
        logger.warning(f"Error inferring content type from URL '{url}': {e}")
        return None
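
# Example (illustrative, hypothetical URLs):
#
#   infer_content_type_from_url("http://host/movies/film.mkv")
#   # -> 'video/x-matroska'
#   infer_content_type_from_url("http://host/stream/live")
#   # -> None (no recognizable extension)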


class SerializableConnectionState:
    """Serializable connection state that can be stored in Redis"""

    def __init__(self, session_id: str, stream_url: str, headers: dict,
                 content_length: str = None, content_type: str = None,
                 final_url: str = None, m3u_profile_id: int = None,
                 # Session metadata fields (previously stored in the vod_session key)
                 content_obj_type: str = None, content_uuid: str = None,
                 content_name: str = None, client_ip: str = None,
                 client_user_agent: str = None, utc_start: str = None,
                 utc_end: str = None, offset: str = None,
                 worker_id: str = None, connection_type: str = "redis_backed"):
        self.session_id = session_id
        self.stream_url = stream_url
        self.headers = headers
        self.content_length = content_length
        self.content_type = content_type
        self.final_url = final_url
        self.m3u_profile_id = m3u_profile_id  # M3U profile ID, used for connection counting
        self.last_activity = time.time()
        self.request_count = 0
        self.active_streams = 0

        # Session metadata (consolidated from the vod_session key)
        self.content_obj_type = content_obj_type
        self.content_uuid = content_uuid
        self.content_name = content_name
        self.client_ip = client_ip
        self.client_user_agent = client_user_agent
        self.utc_start = utc_start or ""
        self.utc_end = utc_end or ""
        self.offset = offset or ""
        self.worker_id = worker_id
        self.connection_type = connection_type
        self.created_at = time.time()

        # Additional tracking fields
        self.bytes_sent = 0
        self.position_seconds = 0

        # Range/seek tracking for position calculation
        self.last_seek_byte = 0
        self.last_seek_percentage = 0.0
        self.total_content_size = 0
        self.last_seek_timestamp = 0.0

    def to_dict(self):
        """Convert to a dictionary for Redis storage"""
        return {
            'session_id': self.session_id or '',
            'stream_url': self.stream_url or '',
            'headers': json.dumps(self.headers or {}),
            'content_length': str(self.content_length) if self.content_length is not None else '',
            'content_type': self.content_type or '',
            'final_url': self.final_url or '',
            'm3u_profile_id': str(self.m3u_profile_id) if self.m3u_profile_id is not None else '',
            'last_activity': str(self.last_activity),
            'request_count': str(self.request_count),
            'active_streams': str(self.active_streams),
            # Session metadata
            'content_obj_type': self.content_obj_type or '',
            'content_uuid': self.content_uuid or '',
            'content_name': self.content_name or '',
            'client_ip': self.client_ip or '',
            'client_user_agent': self.client_user_agent or '',
            'utc_start': self.utc_start or '',
            'utc_end': self.utc_end or '',
            'offset': self.offset or '',
            'worker_id': self.worker_id or '',
            'connection_type': self.connection_type or 'redis_backed',
            'created_at': str(self.created_at),
            # Additional tracking fields
            'bytes_sent': str(self.bytes_sent),
            'position_seconds': str(self.position_seconds),
            # Range/seek tracking
            'last_seek_byte': str(self.last_seek_byte),
            'last_seek_percentage': str(self.last_seek_percentage),
            'total_content_size': str(self.total_content_size),
            'last_seek_timestamp': str(self.last_seek_timestamp)
        }

    @classmethod
    def from_dict(cls, data: dict):
        """Create an instance from a dictionary loaded from Redis"""
        obj = cls(
            session_id=data['session_id'],
            stream_url=data['stream_url'],
            headers=json.loads(data['headers']) if data['headers'] else {},
            content_length=data.get('content_length') if data.get('content_length') else None,
            content_type=data.get('content_type') or None,
            final_url=data.get('final_url') if data.get('final_url') else None,
            m3u_profile_id=int(data.get('m3u_profile_id')) if data.get('m3u_profile_id') else None,
            # Session metadata
            content_obj_type=data.get('content_obj_type') or None,
            content_uuid=data.get('content_uuid') or None,
            content_name=data.get('content_name') or None,
            client_ip=data.get('client_ip') or None,
            client_user_agent=data.get('client_user_agent') or data.get('user_agent') or None,
            utc_start=data.get('utc_start') or '',
            utc_end=data.get('utc_end') or '',
            offset=data.get('offset') or '',
            worker_id=data.get('worker_id') or None,
            connection_type=data.get('connection_type', 'redis_backed')
        )
        obj.last_activity = float(data.get('last_activity', time.time()))
        obj.request_count = int(data.get('request_count', 0))
        obj.active_streams = int(data.get('active_streams', 0))
        obj.created_at = float(data.get('created_at', time.time()))
        # Additional tracking fields
        obj.bytes_sent = int(data.get('bytes_sent', 0))
        obj.position_seconds = int(data.get('position_seconds', 0))
        # Range/seek tracking
        obj.last_seek_byte = int(data.get('last_seek_byte', 0))
        obj.last_seek_percentage = float(data.get('last_seek_percentage', 0.0))
        obj.total_content_size = int(data.get('total_content_size', 0))
        obj.last_seek_timestamp = float(data.get('last_seek_timestamp', 0.0))
        return obj
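
# Example (illustrative round trip through the Redis-safe dict form; all
# values are hypothetical):
#
#   state = SerializableConnectionState(
#       session_id="abc123",
#       stream_url="http://provider/movie.mp4",
#       headers={"User-Agent": "VLC/3.0"},
#   )
#   restored = SerializableConnectionState.from_dict(state.to_dict())
#   assert restored.stream_url == state.stream_url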


class RedisBackedVODConnection:
    """Redis-backed VOD connection that can be accessed from any worker"""

    def __init__(self, session_id: str, redis_client=None):
        self.session_id = session_id
        self.redis_client = redis_client or RedisClient.get_client()
        self.connection_key = f"vod_persistent_connection:{session_id}"
        self.lock_key = f"vod_connection_lock:{session_id}"
        self.local_session = None  # Local requests session
        self.local_response = None  # Local current response

    def _get_connection_state(self) -> Optional[SerializableConnectionState]:
        """Get the connection state from Redis"""
        if not self.redis_client:
            return None

        try:
            data = self.redis_client.hgetall(self.connection_key)
            if not data:
                return None

            # Convert bytes keys/values to strings if needed
            if isinstance(list(data.keys())[0], bytes):
                data = {k.decode('utf-8'): v.decode('utf-8') for k, v in data.items()}

            return SerializableConnectionState.from_dict(data)
        except Exception as e:
            logger.error(f"[{self.session_id}] Error getting connection state from Redis: {e}")
            return None

    def _save_connection_state(self, state: SerializableConnectionState):
        """Save the connection state to Redis"""
        if not self.redis_client:
            return False

        try:
            data = state.to_dict()
            # Log the data being saved for debugging
            logger.trace(f"[{self.session_id}] Saving connection state: {data}")

            # Verify all values are valid for Redis
            for key, value in data.items():
                if value is None:
                    logger.error(f"[{self.session_id}] None value found for key '{key}' - this should not happen")
                    return False

            self.redis_client.hset(self.connection_key, mapping=data)
            self.redis_client.expire(self.connection_key, 3600)  # 1 hour TTL
            return True
        except Exception as e:
            logger.error(f"[{self.session_id}] Error saving connection state to Redis: {e}")
            return False

    def _acquire_lock(self, timeout: int = 10) -> bool:
        """Acquire the distributed lock for connection operations"""
        if not self.redis_client:
            return False

        try:
            return self.redis_client.set(self.lock_key, "locked", nx=True, ex=timeout)
        except Exception as e:
            logger.error(f"[{self.session_id}] Error acquiring lock: {e}")
            return False

    def _release_lock(self):
        """Release the distributed lock"""
        if not self.redis_client:
            return

        try:
            self.redis_client.delete(self.lock_key)
        except Exception as e:
            logger.error(f"[{self.session_id}] Error releasing lock: {e}")
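
    # Example (illustrative lock pattern used throughout this class; the
    # session id is hypothetical):
    #
    #   conn = RedisBackedVODConnection("abc123")
    #   if conn._acquire_lock():
    #       try:
    #           state = conn._get_connection_state()
    #           ...  # mutate state, then conn._save_connection_state(state)
    #       finally:
    #           conn._release_lock()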

    def create_connection(self, stream_url: str, headers: dict, m3u_profile_id: int = None,
                          # Session metadata (consolidated from the vod_session key)
                          content_obj_type: str = None, content_uuid: str = None,
                          content_name: str = None, client_ip: str = None,
                          client_user_agent: str = None, utc_start: str = None,
                          utc_end: str = None, offset: str = None,
                          worker_id: str = None) -> bool:
        """Create a new connection state in Redis with consolidated session metadata"""
        if not self._acquire_lock():
            logger.warning(f"[{self.session_id}] Could not acquire lock for connection creation")
            return False

        try:
            # Check if the connection already exists
            existing_state = self._get_connection_state()
            if existing_state:
                logger.info(f"[{self.session_id}] Connection already exists in Redis")
                return True

            # Create a new connection state with consolidated session metadata
            state = SerializableConnectionState(
                session_id=self.session_id,
                stream_url=stream_url,
                headers=headers,
                m3u_profile_id=m3u_profile_id,
                # Session metadata
                content_obj_type=content_obj_type,
                content_uuid=content_uuid,
                content_name=content_name,
                client_ip=client_ip,
                client_user_agent=client_user_agent,
                utc_start=utc_start,
                utc_end=utc_end,
                offset=offset,
                worker_id=worker_id
            )
            success = self._save_connection_state(state)

            if success:
                logger.info(f"[{self.session_id}] Created new connection state in Redis with consolidated session metadata")

            return success
        finally:
            self._release_lock()

    def get_stream(self, range_header: str = None):
        """Get the stream with an optional Range header - works across workers"""
        # Get the connection state from Redis
        state = self._get_connection_state()
        if not state:
            logger.error(f"[{self.session_id}] No connection state found in Redis")
            return None

        # Update activity and increment the request count
        state.last_activity = time.time()
        state.request_count += 1

        try:
            # Create a local session if needed
            if not self.local_session:
                self.local_session = requests.Session()

            # Prepare headers
            headers = state.headers.copy()
            if range_header:
                # Validate the range against the content length if available
                if state.content_length:
                    validated_range = self._validate_range_header(range_header, int(state.content_length))
                    if validated_range is None:
                        logger.warning(f"[{self.session_id}] Range not satisfiable: {range_header}")
                        return None
                    range_header = validated_range

                headers['Range'] = range_header
                logger.info(f"[{self.session_id}] Setting Range header: {range_header}")

            # Use the final URL if available, otherwise the original URL
            target_url = state.final_url if state.final_url else state.stream_url
            allow_redirects = not state.final_url  # Only follow redirects if we don't have the final URL

            logger.info(f"[{self.session_id}] Making request #{state.request_count} to {'final' if state.final_url else 'original'} URL")

            # Make the request
            response = self.local_session.get(
                target_url,
                headers=headers,
                stream=True,
                timeout=(10, 30),
                allow_redirects=allow_redirects
            )
            response.raise_for_status()

            # Update the state with response info on the first request
            if state.request_count == 1:
                if not state.content_length:
                    # Try to get the full file size from the Content-Range header first (for range requests)
                    content_range = response.headers.get('content-range')
                    if content_range and '/' in content_range:
                        try:
                            # Parse "bytes 0-1023/12653476926" to get the total size
                            total_size = content_range.split('/')[-1]
                            if total_size.isdigit():
                                state.content_length = total_size
                                logger.debug(f"[{self.session_id}] Got full file size from Content-Range: {total_size}")
                            else:
                                # Fall back to Content-Length for the partial size
                                state.content_length = response.headers.get('content-length')
                        except Exception as e:
                            logger.warning(f"[{self.session_id}] Error parsing Content-Range: {e}")
                            state.content_length = response.headers.get('content-length')
                    else:
                        # No Content-Range; use Content-Length (for non-range requests)
                        state.content_length = response.headers.get('content-length')

                logger.debug(f"[{self.session_id}] Response headers received: {dict(response.headers)}")

                if not state.content_type:  # True for None, '', or any falsy value
                    # Get the content type from the provider's response headers
                    provider_content_type = (response.headers.get('content-type') or
                                             response.headers.get('Content-Type') or
                                             response.headers.get('CONTENT-TYPE'))

                    if provider_content_type:
                        logger.debug(f"[{self.session_id}] Using provider Content-Type: '{provider_content_type}'")
                        state.content_type = provider_content_type
                    else:
                        # Provider didn't send a Content-Type; infer one from the URL extension
                        inferred_content_type = infer_content_type_from_url(state.stream_url)
                        if inferred_content_type:
                            logger.info(f"[{self.session_id}] Provider missing Content-Type, inferred from URL: '{inferred_content_type}'")
                            state.content_type = inferred_content_type
                        else:
                            logger.debug(f"[{self.session_id}] No Content-Type from provider and could not infer from URL, using default: 'video/mp4'")
                            state.content_type = 'video/mp4'
                else:
                    logger.debug(f"[{self.session_id}] Content-Type already set in state: {state.content_type}")

                if not state.final_url:
                    state.final_url = response.url

                logger.info(f"[{self.session_id}] Updated connection state: length={state.content_length}, type={state.content_type}")

            # Save the updated state
            self._save_connection_state(state)

            self.local_response = response
            return response

        except Exception as e:
            logger.error(f"[{self.session_id}] Error establishing connection: {e}")
            self.cleanup()
            raise

    def _validate_range_header(self, range_header: str, content_length: int):
        """Validate a Range header against the content length.

        Note: suffix ranges (e.g. "bytes=-500", meaning the last 500 bytes)
        are simplified here to "bytes=0-500" rather than being resolved
        against the end of the file.
        """
        try:
            if not range_header or not range_header.startswith('bytes='):
                return range_header

            range_part = range_header.replace('bytes=', '')
            if '-' not in range_part:
                return range_header

            start_str, end_str = range_part.split('-', 1)

            # Parse the start byte
            if start_str:
                start_byte = int(start_str)
                if start_byte >= content_length:
                    return None  # Not satisfiable
            else:
                start_byte = 0

            # Parse the end byte
            if end_str:
                end_byte = int(end_str)
                if end_byte >= content_length:
                    end_byte = content_length - 1
            else:
                end_byte = content_length - 1

            # Ensure start <= end
            if start_byte > end_byte:
                return None

            return f"bytes={start_byte}-{end_byte}"

        except (ValueError, IndexError) as e:
            logger.warning(f"[{self.session_id}] Could not validate range header {range_header}: {e}")
            return range_header
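
    # Example (illustrative, content_length=1000):
    #
    #   _validate_range_header("bytes=0-499", 1000)    -> "bytes=0-499"
    #   _validate_range_header("bytes=900-2000", 1000) -> "bytes=900-999"
    #   _validate_range_header("bytes=1500-", 1000)    -> None (maps to a 416)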

    def increment_active_streams(self):
        """Increment the active stream count in Redis"""
        if not self._acquire_lock():
            return False

        try:
            state = self._get_connection_state()
            if state:
                state.active_streams += 1
                state.last_activity = time.time()
                self._save_connection_state(state)
                logger.debug(f"[{self.session_id}] Active streams incremented to {state.active_streams}")
                return True
            return False
        finally:
            self._release_lock()

    def decrement_active_streams(self):
        """Decrement the active stream count in Redis"""
        if not self._acquire_lock():
            return False

        try:
            state = self._get_connection_state()
            if state and state.active_streams > 0:
                state.active_streams -= 1
                state.last_activity = time.time()
                self._save_connection_state(state)
                logger.debug(f"[{self.session_id}] Active streams decremented to {state.active_streams}")
                return True
            return False
        finally:
            self._release_lock()

    def has_active_streams(self) -> bool:
        """Check if the connection has any active streams"""
        state = self._get_connection_state()
        return state.active_streams > 0 if state else False

    def get_headers(self):
        """Get headers for the response"""
        state = self._get_connection_state()
        if state:
            return {
                'content_length': state.content_length,
                'content_type': state.content_type or 'video/mp4',
                'final_url': state.final_url
            }
        return {}

    def get_session_metadata(self):
        """Get session metadata from the consolidated connection state"""
        state = self._get_connection_state()
        if state:
            return {
                'content_obj_type': state.content_obj_type,
                'content_uuid': state.content_uuid,
                'content_name': state.content_name,
                'client_ip': state.client_ip,
                'client_user_agent': state.client_user_agent,
                'utc_start': state.utc_start,
                'utc_end': state.utc_end,
                'offset': state.offset,
                'worker_id': state.worker_id,
                'connection_type': state.connection_type,
                'created_at': state.created_at,
                'last_activity': state.last_activity,
                'm3u_profile_id': state.m3u_profile_id,
                'bytes_sent': state.bytes_sent,
                'position_seconds': state.position_seconds,
                'active_streams': state.active_streams,
                'request_count': state.request_count,
                # Range/seek tracking
                'last_seek_byte': state.last_seek_byte,
                'last_seek_percentage': state.last_seek_percentage,
                'total_content_size': state.total_content_size,
                'last_seek_timestamp': state.last_seek_timestamp
            }
        return {}

    def cleanup(self, connection_manager=None, current_worker_id=None):
        """Smart cleanup based on worker ownership and active streams"""
        # Always clean up local resources first
        if self.local_response:
            self.local_response.close()
            self.local_response = None
        if self.local_session:
            self.local_session.close()
            self.local_session = None

        # Get the current connection state to check ownership and active streams
        state = self._get_connection_state()

        if not state:
            logger.info(f"[{self.session_id}] No connection state found - local cleanup only")
            return

        # Check if there are active streams
        if state.active_streams > 0:
            # There are active streams - check ownership
            if current_worker_id and state.worker_id == current_worker_id:
                logger.info(f"[{self.session_id}] Active streams present ({state.active_streams}) and we own them - local cleanup only")
            else:
                logger.info(f"[{self.session_id}] Active streams present ({state.active_streams}) but owned by worker {state.worker_id} - local cleanup only")
            return

        # No active streams - we can clean up the Redis state
        if not self.redis_client:
            logger.info(f"[{self.session_id}] No Redis client - local cleanup only")
            return

        # Acquire the lock and do a final check before cleanup to prevent race conditions
        if not self._acquire_lock():
            logger.warning(f"[{self.session_id}] Could not acquire lock for cleanup - skipping")
            return

        try:
            # Re-check active streams with the lock held to prevent race conditions
            current_state = self._get_connection_state()
            if not current_state:
                logger.info(f"[{self.session_id}] Connection state no longer exists - cleanup already done")
                return

            if current_state.active_streams > 0:
                logger.info(f"[{self.session_id}] Active streams now present ({current_state.active_streams}) - skipping cleanup")
                return

            # Use a pipeline for atomic cleanup operations
            pipe = self.redis_client.pipeline()

            # 1. Remove the main connection state (contains the consolidated data)
            pipe.delete(self.connection_key)

            # 2. Remove the distributed lock (would be released below anyway)
            pipe.delete(self.lock_key)

            # Execute all cleanup operations
            pipe.execute()

            logger.info(f"[{self.session_id}] Cleaned up Redis keys (verified no active streams)")

            # Decrement profile connections if we have the state and a connection manager
            if state.m3u_profile_id and connection_manager:
                connection_manager._decrement_profile_connections(state.m3u_profile_id)
                logger.info(f"[{self.session_id}] Profile connection count decremented for profile {state.m3u_profile_id}")
            else:
                if not state.m3u_profile_id:
                    logger.warning(f"[{self.session_id}] No profile ID in connection state - cannot decrement profile connections")
                elif not connection_manager:
                    logger.warning(f"[{self.session_id}] No connection manager provided - cannot decrement profile connections")

        except Exception as e:
            logger.error(f"[{self.session_id}] Error cleaning up Redis state: {e}")
        finally:
            # Always release the lock
            self._release_lock()


# Variant of VODConnectionManager that uses Redis-backed connections
class MultiWorkerVODConnectionManager:
    """Enhanced VOD connection manager that works across multiple uWSGI workers"""

    _instance = None

    @classmethod
    def get_instance(cls):
        """Get the singleton instance"""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def __init__(self):
        self.redis_client = RedisClient.get_client()
        self.connection_ttl = 3600  # 1 hour TTL for connections
        self.session_ttl = 1800  # 30 minutes TTL for sessions
        self.worker_id = self._get_worker_id()
        logger.info(f"MultiWorkerVODConnectionManager initialized for worker {self.worker_id}")

    def _get_worker_id(self):
        """Get a unique worker ID for this process"""
        try:
            # Combine the hostname and PID for a unique worker ID
            return f"{socket.gethostname()}-{os.getpid()}"
        except Exception:
            return f"worker-{random.randint(1000, 9999)}"

    def _get_profile_connections_key(self, profile_id: int) -> str:
        """Get the Redis key for tracking connections per profile - standardized with the TS proxy"""
        return f"profile_connections:{profile_id}"

    def _check_profile_limits(self, m3u_profile) -> bool:
        """Check if the profile has available connection slots"""
        if m3u_profile.max_streams == 0:  # Unlimited
            return True

        try:
            profile_connections_key = self._get_profile_connections_key(m3u_profile.id)
            current_connections = int(self.redis_client.get(profile_connections_key) or 0)

            logger.info(f"[PROFILE-CHECK] Profile {m3u_profile.id} has {current_connections}/{m3u_profile.max_streams} connections")
            return current_connections < m3u_profile.max_streams

        except Exception as e:
            logger.error(f"Error checking profile limits: {e}")
            return False
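
    # Example (illustrative): with max_streams = 2 and two streams already
    # counted, the Redis counter "profile_connections:<id>" holds 2, so
    # _check_profile_limits() returns False until a stream finishes and
    # _decrement_profile_connections() lowers the count.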

    def _increment_profile_connections(self, m3u_profile):
        """Increment the profile connection count"""
        try:
            profile_connections_key = self._get_profile_connections_key(m3u_profile.id)
            new_count = self.redis_client.incr(profile_connections_key)
            logger.info(f"[PROFILE-INCR] Profile {m3u_profile.id} connections: {new_count}")
            return new_count
        except Exception as e:
            logger.error(f"Error incrementing profile connections: {e}")
            return None

    def _decrement_profile_connections(self, m3u_profile_id: int):
        """Decrement the profile connection count"""
        try:
            profile_connections_key = self._get_profile_connections_key(m3u_profile_id)
            current_count = int(self.redis_client.get(profile_connections_key) or 0)
            if current_count > 0:
                new_count = self.redis_client.decr(profile_connections_key)
                logger.info(f"[PROFILE-DECR] Profile {m3u_profile_id} connections: {new_count}")
                return new_count
            else:
                logger.warning(f"[PROFILE-DECR] Profile {m3u_profile_id} already at 0 connections")
                return 0
        except Exception as e:
            logger.error(f"Error decrementing profile connections: {e}")
            return None

    def stream_content_with_session(self, session_id, content_obj, stream_url, m3u_profile,
                                    client_ip, client_user_agent, request,
                                    utc_start=None, utc_end=None, offset=None, range_header=None):
        """Stream content over a Redis-backed persistent connection"""

        # Generate the client ID
        content_type = "movie" if isinstance(content_obj, Movie) else "episode"
        content_uuid = str(content_obj.uuid)
        content_name = content_obj.name if hasattr(content_obj, 'name') else str(content_obj)
        client_id = session_id

        logger.info(f"[{client_id}] Worker {self.worker_id} - Redis-backed streaming request for {content_type} {content_name}")

        try:
            # First, try to find an existing idle session that matches our criteria
            matching_session_id = self.find_matching_idle_session(
                content_type=content_type,
                content_uuid=content_uuid,
                client_ip=client_ip,
                client_user_agent=client_user_agent,
                utc_start=utc_start,
                utc_end=utc_end,
                offset=offset
            )

            # Use the matching session if found, otherwise use the provided session_id
            if matching_session_id:
                logger.info(f"[{client_id}] Worker {self.worker_id} - Found matching idle session: {matching_session_id}")
                effective_session_id = matching_session_id
                client_id = matching_session_id  # Update client_id for logging consistency

                # IMMEDIATELY reserve this session by incrementing active streams to prevent cleanup
                temp_connection = RedisBackedVODConnection(effective_session_id, self.redis_client)
                if temp_connection.increment_active_streams():
                    logger.info(f"[{client_id}] Reserved idle session - incremented active streams")
                else:
                    logger.warning(f"[{client_id}] Failed to reserve idle session - falling back to new session")
                    effective_session_id = session_id
                    matching_session_id = None  # Clear the match so we create a new connection
            else:
                logger.info(f"[{client_id}] Worker {self.worker_id} - No matching idle session found, using new session")
                effective_session_id = session_id

            # Create the Redis-backed connection
            redis_connection = RedisBackedVODConnection(effective_session_id, self.redis_client)

            # Check if the connection exists; create it if not
            existing_state = redis_connection._get_connection_state()
            if not existing_state:
                logger.info(f"[{client_id}] Worker {self.worker_id} - Creating new Redis-backed connection")

                # Check profile limits before creating a new connection
                if not self._check_profile_limits(m3u_profile):
                    logger.warning(f"[{client_id}] Profile {m3u_profile.name} connection limit exceeded")
                    return HttpResponse("Connection limit exceeded for profile", status=429)

                # Apply timeshift parameters
                modified_stream_url = self._apply_timeshift_parameters(stream_url, utc_start, utc_end, offset)

                # Prepare headers for the provider request
                headers = {}
                # Use the M3U account's user-agent for provider requests, not the client's user-agent
                m3u_user_agent = m3u_profile.m3u_account.get_user_agent()
                if m3u_user_agent:
                    headers['User-Agent'] = m3u_user_agent.user_agent
                    logger.info(f"[{client_id}] Using M3U account user-agent: {m3u_user_agent.user_agent}")
                elif client_user_agent:
                    # Fall back to the client's user-agent if the M3U account doesn't have one
                    headers['User-Agent'] = client_user_agent
                    logger.info(f"[{client_id}] Using client user-agent (M3U fallback): {client_user_agent}")
                else:
                    logger.warning(f"[{client_id}] No user-agent available (neither M3U nor client)")

                # Forward important headers from the request
                important_headers = ['authorization', 'referer', 'origin', 'accept']
                for header_name in important_headers:
                    django_header = f'HTTP_{header_name.upper().replace("-", "_")}'
                    if hasattr(request, 'META') and django_header in request.META:
                        headers[header_name] = request.META[django_header]

                # Create the connection state in Redis with consolidated session metadata
                if not redis_connection.create_connection(
                    stream_url=modified_stream_url,
                    headers=headers,
                    m3u_profile_id=m3u_profile.id,
                    # Session metadata (consolidated from the separate vod_session key)
                    content_obj_type=content_type,
                    content_uuid=content_uuid,
                    content_name=content_name,
                    client_ip=client_ip,
                    client_user_agent=client_user_agent,
                    utc_start=utc_start,
                    utc_end=utc_end,
                    offset=str(offset) if offset else None,
                    worker_id=self.worker_id
                ):
                    logger.error(f"[{client_id}] Worker {self.worker_id} - Failed to create Redis connection")
                    return HttpResponse("Failed to create connection", status=500)

                # Increment profile connections after successful connection creation
                self._increment_profile_connections(m3u_profile)

                logger.info(f"[{client_id}] Worker {self.worker_id} - Created consolidated connection with session metadata")
            else:
                logger.info(f"[{client_id}] Worker {self.worker_id} - Using existing Redis-backed connection")

                # Transfer ownership to the current worker and update session activity
                if redis_connection._acquire_lock():
                    try:
                        state = redis_connection._get_connection_state()
                        if state:
                            old_worker = state.worker_id
                            state.last_activity = time.time()
                            state.worker_id = self.worker_id  # Transfer ownership to the current worker
                            redis_connection._save_connection_state(state)

                            if old_worker != self.worker_id:
                                logger.info(f"[{client_id}] Ownership transferred from worker {old_worker} to {self.worker_id}")
                            else:
                                logger.debug(f"[{client_id}] Worker {self.worker_id} retaining ownership")
                    finally:
                        redis_connection._release_lock()

            # Get the stream from the Redis-backed connection
            upstream_response = redis_connection.get_stream(range_header)

            if upstream_response is None:
                logger.warning(f"[{client_id}] Worker {self.worker_id} - Range not satisfiable")
                return HttpResponse("Requested Range Not Satisfiable", status=416)

            # Get connection headers
            connection_headers = redis_connection.get_headers()

            # Create the streaming generator
            def stream_generator():
                decremented = False
                stop_signal_detected = False
                try:
                    logger.info(f"[{client_id}] Worker {self.worker_id} - Starting Redis-backed stream")

                    # Increment active streams (unless we already did so while reserving a reused session)
                    if not matching_session_id:
                        # New session - increment active streams
                        redis_connection.increment_active_streams()
                    else:
                        # Reused session - we already incremented when reserving the session
                        logger.debug(f"[{client_id}] Using pre-reserved session - active streams already incremented")

                    bytes_sent = 0
                    chunk_count = 0

                    # Get the stop signal key for this client
                    stop_key = get_vod_client_stop_key(client_id)

                    for chunk in upstream_response.iter_content(chunk_size=8192):
                        if chunk:
                            yield chunk
                            bytes_sent += len(chunk)
                            chunk_count += 1

                            # Check for a stop signal every 100 chunks
                            if chunk_count % 100 == 0:
                                # Check if the stop signal has been set
                                if self.redis_client and self.redis_client.exists(stop_key):
                                    logger.info(f"[{client_id}] Worker {self.worker_id} - Stop signal detected, terminating stream")
                                    # Delete the stop key
                                    self.redis_client.delete(stop_key)
                                    stop_signal_detected = True
                                    break

                    # Update the connection state
                    logger.debug(f"Client: [{client_id}] Worker: {self.worker_id} sent {chunk_count} chunks for VOD: {content_name}")
                    if redis_connection._acquire_lock():
                        try:
                            state = redis_connection._get_connection_state()
                            if state:
                                state.last_activity = time.time()
                                # Store the cumulative bytes sent in the connection state
                                state.bytes_sent = bytes_sent  # Use cumulative bytes_sent, not the chunk size
                                redis_connection._save_connection_state(state)
                        finally:
                            redis_connection._release_lock()

                    if stop_signal_detected:
                        logger.info(f"[{client_id}] Worker {self.worker_id} - Stream stopped by signal: {bytes_sent} bytes sent")
                    else:
                        logger.info(f"[{client_id}] Worker {self.worker_id} - Redis-backed stream completed: {bytes_sent} bytes sent")
                    redis_connection.decrement_active_streams()
                    decremented = True

                    # Schedule smart cleanup if there are no active streams after normal completion
                    if not redis_connection.has_active_streams():
                        def delayed_cleanup():
                            time.sleep(1)  # Wait 1 second
                            # Smart cleanup: check active streams and ownership
                            logger.info(f"[{client_id}] Worker {self.worker_id} - Checking for smart cleanup after normal completion")
                            redis_connection.cleanup(connection_manager=self, current_worker_id=self.worker_id)

                        cleanup_thread = threading.Thread(target=delayed_cleanup)
                        cleanup_thread.daemon = True
                        cleanup_thread.start()

                except GeneratorExit:
                    logger.info(f"[{client_id}] Worker {self.worker_id} - Client disconnected from Redis-backed stream")
                    if not decremented:
                        redis_connection.decrement_active_streams()
                        decremented = True

                    # Schedule smart cleanup if there are no active streams
                    if not redis_connection.has_active_streams():
                        def delayed_cleanup():
                            time.sleep(1)  # Wait 1 second
                            # Smart cleanup: check active streams and ownership
                            logger.info(f"[{client_id}] Worker {self.worker_id} - Checking for smart cleanup after client disconnect")
                            redis_connection.cleanup(connection_manager=self, current_worker_id=self.worker_id)

                        cleanup_thread = threading.Thread(target=delayed_cleanup)
                        cleanup_thread.daemon = True
                        cleanup_thread.start()

                except Exception as e:
                    logger.error(f"[{client_id}] Worker {self.worker_id} - Error in Redis-backed stream: {e}")
                    if not decremented:
                        redis_connection.decrement_active_streams()
                        decremented = True
                    # Smart cleanup on error - immediate cleanup since we're in an error state
                    redis_connection.cleanup(connection_manager=self, current_worker_id=self.worker_id)
                    yield b"Error: Stream interrupted"

                finally:
                    if not decremented:
                        redis_connection.decrement_active_streams()

            # Create the streaming response
            response = StreamingHttpResponse(
                streaming_content=stream_generator(),
                content_type=connection_headers.get('content_type', 'video/mp4')
            )

            # Set the appropriate status code
            response.status_code = 206 if range_header else 200

            # Set required headers
            response['Cache-Control'] = 'no-cache'
            response['Pragma'] = 'no-cache'
            response['X-Content-Type-Options'] = 'nosniff'
            response['Connection'] = 'keep-alive'
            response['X-Worker-ID'] = self.worker_id  # Identify which worker served this

            if connection_headers.get('content_length'):
                response['Accept-Ranges'] = 'bytes'

                # For range requests, Content-Length should be the partial content size, not the full file size
                if range_header and 'bytes=' in range_header:
                    try:
                        range_part = range_header.replace('bytes=', '')
                        if '-' in range_part:
                            start_byte, end_byte = range_part.split('-', 1)
                            start = int(start_byte) if start_byte else 0

                            # Get the FULL content size from the connection state (from the initial request)
                            state = redis_connection._get_connection_state()
                            if state and state.content_length:
                                full_content_size = int(state.content_length)
                                end = int(end_byte) if end_byte else full_content_size - 1

                                # Calculate the partial content size for the Content-Length header
                                partial_content_size = end - start + 1
                                response['Content-Length'] = str(partial_content_size)

                                # Content-Range should show the full file size per the HTTP standard
                                content_range = f"bytes {start}-{end}/{full_content_size}"
                                response['Content-Range'] = content_range
                                logger.info(f"[{client_id}] Worker {self.worker_id} - Set Content-Range: {content_range}, Content-Length: {partial_content_size}")

                                # Store range information so the VOD stats API can calculate the playback position
                                if start > 0:
                                    try:
                                        position_percentage = (start / full_content_size) * 100
                                        current_timestamp = time.time()

                                        # Update the Redis connection state with the seek information
                                        if redis_connection._acquire_lock():
                                            try:
                                                # Refresh the state in case it changed
                                                state = redis_connection._get_connection_state()
                                                if state:
                                                    # Store range/seek information for the stats API
                                                    state.last_seek_byte = start
                                                    state.last_seek_percentage = position_percentage
                                                    state.total_content_size = full_content_size
                                                    state.last_seek_timestamp = current_timestamp
                                                    state.last_activity = current_timestamp
                                                    redis_connection._save_connection_state(state)
                                                    logger.info(f"[{client_id}] *** SEEK INFO STORED *** {position_percentage:.1f}% at byte {start:,}/{full_content_size:,} (timestamp: {current_timestamp})")
                                            finally:
                                                redis_connection._release_lock()
                                        else:
                                            logger.warning(f"[{client_id}] Could not acquire lock to update seek info")
                                    except Exception as pos_e:
                                        logger.error(f"[{client_id}] Error storing seek info: {pos_e}")
                            else:
                                # Fall back to the partial content size if the full size is not available
                                partial_size = int(connection_headers['content_length'])
                                end = int(end_byte) if end_byte else partial_size - 1
                                content_range = f"bytes {start}-{end}/{partial_size}"
                                response['Content-Range'] = content_range
                                response['Content-Length'] = str(end - start + 1)
                                logger.warning(f"[{client_id}] Using partial content size for Content-Range (full size not available): {content_range}")
                    except Exception as e:
                        logger.warning(f"[{client_id}] Worker {self.worker_id} - Could not set Content-Range: {e}")
                        response['Content-Length'] = connection_headers['content_length']
                else:
                    # For non-range requests, use the full content length
                    response['Content-Length'] = connection_headers['content_length']

            logger.info(f"[{client_id}] Worker {self.worker_id} - Redis-backed response ready (status: {response.status_code})")
            return response

        except Exception as e:
            logger.error(f"[{client_id}] Worker {self.worker_id} - Error in Redis-backed stream_content_with_session: {e}", exc_info=True)
            return HttpResponse(f"Streaming error: {str(e)}", status=500)
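
    # Example (illustrative call from a Django view; everything other than the
    # manager API shown here - the uuid use, the `movie` and `profile`
    # variables - is hypothetical):
    #
    #   manager = MultiWorkerVODConnectionManager.get_instance()
    #   return manager.stream_content_with_session(
    #       session_id=str(uuid.uuid4()),
    #       content_obj=movie,                       # a Movie instance
    #       stream_url=movie_stream_url,
    #       m3u_profile=profile,                     # an M3UAccountProfile
    #       client_ip=request.META.get("REMOTE_ADDR"),
    #       client_user_agent=request.META.get("HTTP_USER_AGENT", ""),
    #       request=request,
    #       range_header=request.META.get("HTTP_RANGE"),
    #   )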

    def _apply_timeshift_parameters(self, original_url, utc_start=None, utc_end=None, offset=None):
        """Apply timeshift parameters to a URL"""
        if not any([utc_start, utc_end, offset]):
            return original_url

        try:
            parsed_url = urlparse(original_url)
            query_params = parse_qs(parsed_url.query)
            path = parsed_url.path

            logger.info(f"Applying timeshift parameters: utc_start={utc_start}, utc_end={utc_end}, offset={offset}")

            # Add timeshift parameters
            if utc_start:
                query_params['utc_start'] = [utc_start]
                query_params['start'] = [utc_start]
                logger.info(f"Added utc_start/start parameter: {utc_start}")

            if utc_end:
                query_params['utc_end'] = [utc_end]
                query_params['end'] = [utc_end]
                logger.info(f"Added utc_end/end parameter: {utc_end}")

            if offset:
                try:
                    offset_seconds = int(offset)
                    query_params['offset'] = [str(offset_seconds)]
                    query_params['seek'] = [str(offset_seconds)]
                    query_params['t'] = [str(offset_seconds)]
                    logger.info(f"Added offset/seek/t parameter: {offset_seconds}")
                except ValueError:
                    logger.warning(f"Invalid offset value: {offset}")

            # Handle special catchup URL patterns
            if utc_start:
                try:
                    from datetime import datetime

                    # Parse the UTC start time
                    start_dt = datetime.fromisoformat(utc_start.replace('Z', '+00:00'))

                    # Check for catchup URL patterns like /catchup/YYYY-MM-DD/HH-MM-SS/
                    catchup_pattern = r'/catchup/\d{4}-\d{2}-\d{2}/\d{2}-\d{2}-\d{2}/'
                    if re.search(catchup_pattern, path):
                        # Replace the date/time in the path
                        date_part = start_dt.strftime('%Y-%m-%d')
                        time_part = start_dt.strftime('%H-%M-%S')

                        path = re.sub(catchup_pattern, f'/catchup/{date_part}/{time_part}/', path)
                        logger.info(f"Modified catchup path: {path}")
                except Exception as e:
                    logger.warning(f"Could not parse timeshift date: {e}")

            # Reconstruct the URL
            new_query = urlencode(query_params, doseq=True)
            modified_url = urlunparse((
                parsed_url.scheme,
                parsed_url.netloc,
                path,
                parsed_url.params,
                new_query,
                parsed_url.fragment
            ))

            logger.info(f"Modified URL: {modified_url}")
            return modified_url

        except Exception as e:
            logger.error(f"Error applying timeshift parameters: {e}")
            return original_url
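
    # Example (illustrative, hypothetical provider URL):
    #
    #   _apply_timeshift_parameters(
    #       "http://host/catchup/2024-01-01/00-00-00/stream.ts",
    #       utc_start="2024-01-02T20:30:00Z",
    #   )
    #   # -> "http://host/catchup/2024-01-02/20-30-00/stream.ts
    #   #     ?utc_start=2024-01-02T20%3A30%3A00Z&start=..."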

    def cleanup_persistent_connection(self, session_id: str):
        """Clean up a specific Redis-backed persistent connection"""
        logger.info(f"[{session_id}] Cleaning up Redis-backed persistent connection")

        redis_connection = RedisBackedVODConnection(session_id, self.redis_client)
        redis_connection.cleanup(connection_manager=self)

        # The cleanup method now handles all Redis keys, including session data

    def cleanup_stale_persistent_connections(self, max_age_seconds: int = 1800):
        """Clean up stale Redis-backed persistent connections"""
        if not self.redis_client:
            return

        try:
            logger.info(f"Cleaning up Redis-backed connections older than {max_age_seconds} seconds")

            # Find all persistent connection keys
            pattern = "vod_persistent_connection:*"
            cursor = 0
            cleanup_count = 0
            current_time = time.time()

            while True:
                cursor, keys = self.redis_client.scan(cursor, match=pattern, count=100)

                for key in keys:
                    try:
                        # Get the connection state
                        data = self.redis_client.hgetall(key)
                        if not data:
                            continue

                        # Convert bytes to strings if needed
                        if isinstance(list(data.keys())[0], bytes):
                            data = {k.decode('utf-8'): v.decode('utf-8') for k, v in data.items()}

                        last_activity = float(data.get('last_activity', 0))
                        active_streams = int(data.get('active_streams', 0))

                        # Clean up if stale and there are no active streams
                        if (current_time - last_activity > max_age_seconds) and active_streams == 0:
                            key_str = key.decode('utf-8') if isinstance(key, bytes) else key
                            session_id = key_str.replace('vod_persistent_connection:', '')
                            logger.info(f"Cleaning up stale connection: {session_id}")

                            # Clean up the connection and related keys
                            redis_connection = RedisBackedVODConnection(session_id, self.redis_client)
                            redis_connection.cleanup(connection_manager=self)
                            cleanup_count += 1

                    except Exception as e:
                        logger.error(f"Error processing connection key {key}: {e}")
                        continue

                if cursor == 0:
                    break

            if cleanup_count > 0:
                logger.info(f"Cleaned up {cleanup_count} stale Redis-backed connections")
            else:
                logger.debug("No stale Redis-backed connections found")

        except Exception as e:
            logger.error(f"Error during Redis-backed connection cleanup: {e}")

    def create_connection(self, content_type: str, content_uuid: str, content_name: str,
                          client_id: str, client_ip: str, user_agent: str,
                          m3u_profile: M3UAccountProfile) -> bool:
        """Create connection tracking in Redis (same as the original, but for Redis-backed connections)"""
        if not self.redis_client:
            logger.error("Redis client not available for VOD connection tracking")
            return False

        try:
            # Check profile connection limits
            # Note: this legacy tracking path reads a 'max_connections' attribute
            # (defaulting to 3), whereas _check_profile_limits() uses max_streams.
            profile_connections_key = f"profile_connections:{m3u_profile.id}"
            current_connections = self.redis_client.get(profile_connections_key)
            max_connections = getattr(m3u_profile, 'max_connections', 3)  # Default to 3

            if current_connections and int(current_connections) >= max_connections:
                logger.warning(f"Profile {m3u_profile.name} connection limit exceeded ({current_connections}/{max_connections})")
                return False

            # Create connection tracking
            connection_key = f"vod_proxy:connection:{content_type}:{content_uuid}:{client_id}"
            content_connections_key = f"vod_proxy:content:{content_type}:{content_uuid}:connections"

            # Check if the connection already exists
            if self.redis_client.exists(connection_key):
                logger.info(f"Connection already exists for {client_id} - {content_type} {content_name}")
                self.redis_client.hset(connection_key, "last_activity", str(time.time()))
                return True

            # Connection data
            connection_data = {
                "content_type": content_type,
                "content_uuid": content_uuid,
                "content_name": content_name,
                "client_id": client_id,
                "client_ip": client_ip,
                "user_agent": user_agent,
                "m3u_profile_id": m3u_profile.id,
                "m3u_profile_name": m3u_profile.name,
                "connected_at": str(time.time()),
                "last_activity": str(time.time()),
                "bytes_sent": "0",
                "position_seconds": "0"
            }

            # Use a pipeline for atomic operations
            pipe = self.redis_client.pipeline()
            pipe.hset(connection_key, mapping=connection_data)
            pipe.expire(connection_key, self.connection_ttl)
            pipe.incr(profile_connections_key)
            pipe.sadd(content_connections_key, client_id)
            pipe.expire(content_connections_key, self.connection_ttl)
            pipe.execute()

            logger.info(f"Created Redis-backed VOD connection: {client_id} for {content_type} {content_name}")
            return True

        except Exception as e:
            logger.error(f"Error creating Redis-backed connection: {e}")
            return False

    def remove_connection(self, content_type: str, content_uuid: str, client_id: str):
        """Remove connection tracking from Redis"""
        if not self.redis_client:
            return

        try:
            connection_key = f"vod_proxy:connection:{content_type}:{content_uuid}:{client_id}"
            content_connections_key = f"vod_proxy:content:{content_type}:{content_uuid}:connections"

            # Get the connection data to find the profile
            connection_data = self.redis_client.hgetall(connection_key)
            if connection_data:
                # Convert bytes to strings if needed
                if isinstance(list(connection_data.keys())[0], bytes):
                    connection_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in connection_data.items()}

                profile_id = connection_data.get('m3u_profile_id')
                if profile_id:
                    profile_connections_key = f"profile_connections:{profile_id}"

                    # Use a pipeline for atomic operations
                    pipe = self.redis_client.pipeline()
                    pipe.delete(connection_key)
                    pipe.srem(content_connections_key, client_id)
                    pipe.decr(profile_connections_key)
                    pipe.execute()

                    logger.info(f"Removed Redis-backed connection: {client_id}")

        except Exception as e:
            logger.error(f"Error removing Redis-backed connection: {e}")

    def update_connection_activity(self, content_type: str, content_uuid: str,
                                   client_id: str, bytes_sent: int):
        """Update connection activity in Redis"""
        if not self.redis_client:
            return

        try:
            connection_key = f"vod_proxy:connection:{content_type}:{content_uuid}:{client_id}"
            pipe = self.redis_client.pipeline()
            pipe.hset(connection_key, mapping={
                "last_activity": str(time.time()),
                "bytes_sent": str(bytes_sent)
            })
            pipe.expire(connection_key, self.connection_ttl)
            pipe.execute()
        except Exception as e:
            logger.error(f"Error updating connection activity: {e}")

    def find_matching_idle_session(self, content_type: str, content_uuid: str,
                                   client_ip: str, client_user_agent: str,
                                   utc_start=None, utc_end=None, offset=None) -> Optional[str]:
        """Find an existing Redis-backed session that matches the criteria, using the consolidated connection state"""
        if not self.redis_client:
            return None

        try:
            # Search for connections with consolidated session data
            pattern = "vod_persistent_connection:*"
            cursor = 0
            matching_sessions = []

            while True:
                cursor, keys = self.redis_client.scan(cursor, match=pattern, count=100)

                for key in keys:
                    try:
                        connection_data = self.redis_client.hgetall(key)
                        if not connection_data:
                            continue

                        # Convert bytes keys/values to strings if needed
                        if isinstance(list(connection_data.keys())[0], bytes):
                            connection_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in connection_data.items()}

                        # Check if the content matches (using consolidated data)
                        stored_content_type = connection_data.get('content_obj_type', '')
                        stored_content_uuid = connection_data.get('content_uuid', '')

                        if stored_content_type != content_type or stored_content_uuid != content_uuid:
                            continue

                        # Extract the session ID
                        key_str = key.decode('utf-8') if isinstance(key, bytes) else key
                        session_id = key_str.replace('vod_persistent_connection:', '')

                        # Check that the Redis-backed connection exists and has no active streams
                        redis_connection = RedisBackedVODConnection(session_id, self.redis_client)
                        if redis_connection.has_active_streams():
                            continue

                        # Calculate the match score
                        score = 10  # Content match
                        match_reasons = ["content"]

                        # Check other criteria (using consolidated data)
                        stored_client_ip = connection_data.get('client_ip', '')
                        stored_user_agent = connection_data.get('client_user_agent', '') or connection_data.get('user_agent', '')

                        if stored_client_ip and stored_client_ip == client_ip:
                            score += 5
                            match_reasons.append("ip")

                        if stored_user_agent and stored_user_agent == client_user_agent:
                            score += 3
                            match_reasons.append("user-agent")

                        # Check timeshift parameters (using consolidated data)
                        stored_utc_start = connection_data.get('utc_start', '')
                        stored_utc_end = connection_data.get('utc_end', '')
                        stored_offset = connection_data.get('offset', '')

                        current_utc_start = utc_start or ""
                        current_utc_end = utc_end or ""
                        current_offset = str(offset) if offset else ""

                        if (stored_utc_start == current_utc_start and
                                stored_utc_end == current_utc_end and
                                stored_offset == current_offset):
                            score += 7
                            match_reasons.append("timeshift")

                        if score >= 13:  # Good-match threshold
                            matching_sessions.append({
                                'session_id': session_id,
                                'score': score,
                                'reasons': match_reasons,
                                'last_activity': float(connection_data.get('last_activity', '0'))
                            })

                    except Exception as e:
                        logger.debug(f"Error processing connection key {key}: {e}")
                        continue

                if cursor == 0:
                    break

            # Sort by score, then by last activity
            matching_sessions.sort(key=lambda x: (x['score'], x['last_activity']), reverse=True)

            if matching_sessions:
                best_match = matching_sessions[0]
                logger.info(f"Found matching Redis-backed idle session: {best_match['session_id']} "
                            f"(score: {best_match['score']}, reasons: {', '.join(best_match['reasons'])})")
                return best_match['session_id']

            return None

        except Exception as e:
            logger.error(f"Error finding matching idle session: {e}")
            return None
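
    # Example (illustrative scoring): a candidate matching content (+10) and
    # timeshift parameters (+7) scores 17; content (+10) plus IP (+5) scores
    # 15; content plus user-agent alone (+13) just meets the threshold of 13.
    # A content-only match (10) is never reused.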

    def get_session_info(self, session_id: str) -> Optional[dict]:
        """Get session information from the consolidated connection state (compatibility method)"""
        if not self.redis_client:
            return None

        try:
            redis_connection = RedisBackedVODConnection(session_id, self.redis_client)
            return redis_connection.get_session_metadata()
        except Exception as e:
            logger.error(f"Error getting session info for {session_id}: {e}")
            return None