Use Redis to track provider connections so they work with multi-worker uWSGI.
This commit is contained in:
parent d04fdb6f69
commit 97b82e5520

3 changed files with 939 additions and 4 deletions
931	apps/proxy/vod_proxy/multi_worker_connection_manager.py	Normal file

@@ -0,0 +1,931 @@
"""
|
||||
Enhanced VOD Connection Manager with Redis-based connection sharing for multi-worker environments
|
||||
"""
|
||||
|
||||
import time
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
import random
|
||||
import re
|
||||
import requests
|
||||
import pickle
|
||||
import base64
|
||||
from typing import Optional, Dict, Any
|
||||
from django.http import StreamingHttpResponse, HttpResponse
|
||||
from core.utils import RedisClient
|
||||
from apps.vod.models import Movie, Episode
|
||||
from apps.m3u.models import M3UAccountProfile
|
||||
|
||||
logger = logging.getLogger("vod_proxy")
|
||||
|
||||
|
||||
class SerializableConnectionState:
    """Serializable connection state that can be stored in Redis"""

    def __init__(self, session_id: str, stream_url: str, headers: dict,
                 content_length: str = None, content_type: str = 'video/mp4',
                 final_url: str = None):
        self.session_id = session_id
        self.stream_url = stream_url
        self.headers = headers
        self.content_length = content_length
        self.content_type = content_type
        self.final_url = final_url
        self.last_activity = time.time()
        self.request_count = 0
        self.active_streams = 0

    def to_dict(self):
        """Convert to dictionary for Redis storage"""
        return {
            'session_id': self.session_id or '',
            'stream_url': self.stream_url or '',
            'headers': json.dumps(self.headers or {}),
            'content_length': str(self.content_length) if self.content_length is not None else '',
            'content_type': self.content_type or 'video/mp4',
            'final_url': self.final_url or '',
            'last_activity': str(self.last_activity),
            'request_count': str(self.request_count),
            'active_streams': str(self.active_streams)
        }

    @classmethod
    def from_dict(cls, data: dict):
        """Create from dictionary loaded from Redis"""
        obj = cls(
            session_id=data['session_id'],
            stream_url=data['stream_url'],
            headers=json.loads(data['headers']) if data.get('headers') else {},
            content_length=data.get('content_length') if data.get('content_length') else None,
            content_type=data.get('content_type', 'video/mp4'),
            final_url=data.get('final_url') if data.get('final_url') else None
        )
        obj.last_activity = float(data.get('last_activity', time.time()))
        obj.request_count = int(data.get('request_count', 0))
        obj.active_streams = int(data.get('active_streams', 0))
        return obj

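# Example round-trip through a Redis hash (illustrative sketch only; the
# session id, URL, and the decode_responses client are hypothetical, not taken
# from this module):
#
#     import redis
#     r = redis.Redis(decode_responses=True)
#     state = SerializableConnectionState("sess-1", "http://example.com/movie.mp4",
#                                         {"User-Agent": "VLC/3.0"})
#     r.hset("vod_persistent_connection:sess-1", mapping=state.to_dict())
#     restored = SerializableConnectionState.from_dict(
#         r.hgetall("vod_persistent_connection:sess-1"))
#     assert restored.stream_url == state.stream_url
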

class RedisBackedVODConnection:
    """Redis-backed VOD connection that can be accessed from any worker"""

    def __init__(self, session_id: str, redis_client=None):
        self.session_id = session_id
        self.redis_client = redis_client or RedisClient.get_client()
        self.connection_key = f"vod_persistent_connection:{session_id}"
        self.lock_key = f"vod_connection_lock:{session_id}"
        self.local_session = None   # Local requests session (per worker)
        self.local_response = None  # Local current response (per worker)

    def _get_connection_state(self) -> Optional[SerializableConnectionState]:
        """Get connection state from Redis"""
        if not self.redis_client:
            return None

        try:
            data = self.redis_client.hgetall(self.connection_key)
            if not data:
                return None

            # Convert bytes keys/values to strings if needed
            if isinstance(list(data.keys())[0], bytes):
                data = {k.decode('utf-8'): v.decode('utf-8') for k, v in data.items()}

            return SerializableConnectionState.from_dict(data)
        except Exception as e:
            logger.error(f"[{self.session_id}] Error getting connection state from Redis: {e}")
            return None

    def _save_connection_state(self, state: SerializableConnectionState):
        """Save connection state to Redis"""
        if not self.redis_client:
            return False

        try:
            data = state.to_dict()
            logger.debug(f"[{self.session_id}] Saving connection state: {data}")

            # Verify all values are valid for Redis; hashes cannot store None,
            # and to_dict() should have replaced every None with ''
            for key, value in data.items():
                if value is None:
                    logger.error(f"[{self.session_id}] None value found for key '{key}' - this should not happen")
                    return False

            self.redis_client.hset(self.connection_key, mapping=data)
            self.redis_client.expire(self.connection_key, 3600)  # 1 hour TTL
            return True
        except Exception as e:
            logger.error(f"[{self.session_id}] Error saving connection state to Redis: {e}")
            return False

    def _acquire_lock(self, timeout: int = 10) -> bool:
        """Acquire distributed lock for connection operations"""
        if not self.redis_client:
            return False

        try:
            # SET NX EX: succeeds only if the key does not exist, and expires
            # automatically so a crashed worker cannot hold the lock forever
            return self.redis_client.set(self.lock_key, "locked", nx=True, ex=timeout)
        except Exception as e:
            logger.error(f"[{self.session_id}] Error acquiring lock: {e}")
            return False

    def _release_lock(self):
        """Release distributed lock"""
        if not self.redis_client:
            return

        try:
            self.redis_client.delete(self.lock_key)
        except Exception as e:
            logger.error(f"[{self.session_id}] Error releasing lock: {e}")

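    # The lock above is a plain SET NX EX whose release deletes the key
    # unconditionally. A common hardening (a sketch, not part of this module;
    # it assumes redis-py's register_script) stores a random token and deletes
    # the key only if the token still matches, so a worker whose lock already
    # expired cannot release a lock now held by another worker:
    #
    #     import uuid
    #     token = uuid.uuid4().hex
    #     acquired = redis_client.set(lock_key, token, nx=True, ex=10)
    #     ...
    #     release = redis_client.register_script(
    #         "if redis.call('get', KEYS[1]) == ARGV[1] then "
    #         "return redis.call('del', KEYS[1]) else return 0 end")
    #     release(keys=[lock_key], args=[token])
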
    def create_connection(self, stream_url: str, headers: dict) -> bool:
        """Create a new connection state in Redis"""
        if not self._acquire_lock():
            logger.warning(f"[{self.session_id}] Could not acquire lock for connection creation")
            return False

        try:
            # Check if connection already exists
            existing_state = self._get_connection_state()
            if existing_state:
                logger.info(f"[{self.session_id}] Connection already exists in Redis")
                return True

            # Create new connection state
            state = SerializableConnectionState(self.session_id, stream_url, headers)
            success = self._save_connection_state(state)

            if success:
                logger.info(f"[{self.session_id}] Created new connection state in Redis")

            return success
        finally:
            self._release_lock()

    def get_stream(self, range_header: str = None):
        """Get stream with optional range header - works across workers"""
        # Get connection state from Redis
        state = self._get_connection_state()
        if not state:
            logger.error(f"[{self.session_id}] No connection state found in Redis")
            return None

        # Update activity and increment request count
        state.last_activity = time.time()
        state.request_count += 1

        try:
            # Create local session if needed
            if not self.local_session:
                self.local_session = requests.Session()

            # Prepare headers
            headers = state.headers.copy()
            if range_header:
                # Validate range against content length if available
                if state.content_length:
                    validated_range = self._validate_range_header(range_header, int(state.content_length))
                    if validated_range is None:
                        logger.warning(f"[{self.session_id}] Range not satisfiable: {range_header}")
                        return None
                    range_header = validated_range

                headers['Range'] = range_header
                logger.info(f"[{self.session_id}] Setting Range header: {range_header}")

            # Use the final URL if available, otherwise the original URL
            target_url = state.final_url if state.final_url else state.stream_url
            allow_redirects = not state.final_url  # Only follow redirects if we don't have the final URL yet

            logger.info(f"[{self.session_id}] Making request #{state.request_count} to {'final' if state.final_url else 'original'} URL")

            # Make request
            response = self.local_session.get(
                target_url,
                headers=headers,
                stream=True,
                timeout=(10, 30),
                allow_redirects=allow_redirects
            )
            response.raise_for_status()

            # Update state with response info on first request
            if state.request_count == 1:
                if not state.content_length:
                    state.content_length = response.headers.get('content-length')
                if not state.content_type:
                    state.content_type = response.headers.get('content-type', 'video/mp4')
                if not state.final_url:
                    state.final_url = response.url

                logger.info(f"[{self.session_id}] Updated connection state: length={state.content_length}, type={state.content_type}")

            # Save updated state
            self._save_connection_state(state)

            self.local_response = response
            return response

        except Exception as e:
            logger.error(f"[{self.session_id}] Error establishing connection: {e}")
            self.cleanup()
            raise

    def _validate_range_header(self, range_header: str, content_length: int):
        """Validate and clamp a Range header against the known content length"""
        try:
            if not range_header or not range_header.startswith('bytes='):
                return range_header

            range_part = range_header.replace('bytes=', '')
            if '-' not in range_part:
                return range_header

            start_str, end_str = range_part.split('-', 1)

            if start_str:
                # Normal range: "bytes=start-" or "bytes=start-end"
                start_byte = int(start_str)
                if start_byte >= content_length:
                    return None  # Not satisfiable
                end_byte = min(int(end_str), content_length - 1) if end_str else content_length - 1
            elif end_str:
                # Suffix range: "bytes=-N" means the last N bytes (RFC 7233)
                suffix_length = int(end_str)
                if suffix_length <= 0:
                    return None
                start_byte = max(content_length - suffix_length, 0)
                end_byte = content_length - 1
            else:
                return range_header  # "bytes=-" is malformed; pass it through

            # Ensure start <= end
            if start_byte > end_byte:
                return None

            return f"bytes={start_byte}-{end_byte}"

        except (ValueError, IndexError) as e:
            logger.warning(f"[{self.session_id}] Could not validate range header {range_header}: {e}")
            return range_header

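    # Expected behaviour of the validator for a hypothetical 1000-byte file:
    #
    #     "bytes=0-499"    -> "bytes=0-499"
    #     "bytes=500-"     -> "bytes=500-999"
    #     "bytes=0-99999"  -> "bytes=0-999"   (end clamped)
    #     "bytes=-200"     -> "bytes=800-999" (suffix range)
    #     "bytes=1200-"    -> None            (mapped to 416 by the caller)
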
    def increment_active_streams(self):
        """Increment active streams count in Redis"""
        if not self._acquire_lock():
            return False

        try:
            state = self._get_connection_state()
            if state:
                state.active_streams += 1
                state.last_activity = time.time()
                self._save_connection_state(state)
                logger.debug(f"[{self.session_id}] Active streams incremented to {state.active_streams}")
                return True
            return False
        finally:
            self._release_lock()

    def decrement_active_streams(self):
        """Decrement active streams count in Redis"""
        if not self._acquire_lock():
            return False

        try:
            state = self._get_connection_state()
            if state and state.active_streams > 0:
                state.active_streams -= 1
                state.last_activity = time.time()
                self._save_connection_state(state)
                logger.debug(f"[{self.session_id}] Active streams decremented to {state.active_streams}")
                return True
            return False
        finally:
            self._release_lock()

    def has_active_streams(self) -> bool:
        """Check if connection has any active streams"""
        state = self._get_connection_state()
        return state.active_streams > 0 if state else False

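    # Note: because the counter lives in a Redis hash, the locked
    # read-modify-write above could also be done atomically with HINCRBY
    # (a possible simplification, not what this module does):
    #
    #     redis_client.hincrby(connection_key, "active_streams", 1)
    #     redis_client.hset(connection_key, "last_activity", str(time.time()))
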
    def get_headers(self):
        """Get headers for response"""
        state = self._get_connection_state()
        if state:
            return {
                'content_length': state.content_length,
                'content_type': state.content_type,
                'final_url': state.final_url
            }
        return {}

    def cleanup(self):
        """Clean up local resources and Redis state"""
        if self.local_response:
            self.local_response.close()
            self.local_response = None
        if self.local_session:
            self.local_session.close()
            self.local_session = None

        # Remove from Redis
        if self.redis_client:
            try:
                self.redis_client.delete(self.connection_key)
                self.redis_client.delete(self.lock_key)
                logger.info(f"[{self.session_id}] Cleaned up Redis connection state")
            except Exception as e:
                logger.error(f"[{self.session_id}] Error cleaning up Redis state: {e}")


# Redis-backed replacement for VODConnectionManager in multi-worker deployments
class MultiWorkerVODConnectionManager:
    """Enhanced VOD connection manager that works across multiple uWSGI workers"""

    _instance = None

    @classmethod
    def get_instance(cls):
        """Get the singleton instance (one per worker process; shared state lives in Redis)"""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

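    # With threads = 8 and enable-threads in the uWSGI config changed by this
    # commit, the lazy initialization above can race on first use within a
    # worker. A minimal hardening sketch (an assumption, not part of the
    # original code) guards creation with a class-level lock:
    #
    #     _instance_lock = threading.Lock()
    #
    #     @classmethod
    #     def get_instance(cls):
    #         if cls._instance is None:
    #             with cls._instance_lock:
    #                 if cls._instance is None:
    #                     cls._instance = cls()
    #         return cls._instance
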
    def __init__(self):
        self.redis_client = RedisClient.get_client()
        self.connection_ttl = 3600  # 1 hour TTL for connections
        self.session_ttl = 1800     # 30 minutes TTL for sessions
        self.worker_id = self._get_worker_id()
        logger.info(f"MultiWorkerVODConnectionManager initialized for worker {self.worker_id}")

    def _get_worker_id(self):
        """Get unique worker ID for this process"""
        import os
        import socket
        try:
            # Use combination of hostname and PID for unique worker ID
            return f"{socket.gethostname()}-{os.getpid()}"
        except Exception:
            return f"worker-{random.randint(1000, 9999)}"

    def stream_content_with_session(self, session_id, content_obj, stream_url, m3u_profile,
                                    client_ip, user_agent, request,
                                    utc_start=None, utc_end=None, offset=None, range_header=None):
        """Stream content with Redis-backed persistent connection"""

        # Generate client ID
        content_type = "movie" if isinstance(content_obj, Movie) else "episode"
        content_uuid = str(content_obj.uuid)
        content_name = content_obj.name if hasattr(content_obj, 'name') else str(content_obj)
        client_id = session_id

        logger.info(f"[{client_id}] Worker {self.worker_id} - Redis-backed streaming request for {content_type} {content_name}")

        try:
            # Create Redis-backed connection
            redis_connection = RedisBackedVODConnection(session_id, self.redis_client)

            # Check if connection exists, create if not
            existing_state = redis_connection._get_connection_state()
            if not existing_state:
                logger.info(f"[{client_id}] Worker {self.worker_id} - Creating new Redis-backed connection")

                # Apply timeshift parameters
                modified_stream_url = self._apply_timeshift_parameters(stream_url, utc_start, utc_end, offset)

                # Prepare headers
                headers = {}
                if user_agent:
                    headers['User-Agent'] = user_agent

                # Forward important headers from request
                important_headers = ['authorization', 'x-forwarded-for', 'x-real-ip', 'referer', 'origin', 'accept']
                for header_name in important_headers:
                    django_header = f'HTTP_{header_name.upper().replace("-", "_")}'
                    if hasattr(request, 'META') and django_header in request.META:
                        headers[header_name] = request.META[django_header]

                # Add client IP
                if client_ip:
                    headers['X-Forwarded-For'] = client_ip
                    headers['X-Real-IP'] = client_ip

                # Add worker identification
                headers['X-Worker-ID'] = self.worker_id

                # Create connection state in Redis
                if not redis_connection.create_connection(modified_stream_url, headers):
                    logger.error(f"[{client_id}] Worker {self.worker_id} - Failed to create Redis connection")
                    return HttpResponse("Failed to create connection", status=500)

                # Create session tracking
                session_info = {
                    "content_type": content_type,
                    "content_uuid": content_uuid,
                    "content_name": content_name,
                    "created_at": str(time.time()),
                    "last_activity": str(time.time()),
                    "profile_id": str(m3u_profile.id),
                    "client_ip": client_ip,
                    "user_agent": user_agent,
                    "utc_start": utc_start or "",
                    "utc_end": utc_end or "",
                    "offset": str(offset) if offset else "",
                    "worker_id": self.worker_id,  # Track which worker created this
                    "connection_type": "redis_backed"
                }

                session_key = f"vod_session:{session_id}"
                if self.redis_client:
                    self.redis_client.hset(session_key, mapping=session_info)
                    self.redis_client.expire(session_key, self.session_ttl)

                logger.info(f"[{client_id}] Worker {self.worker_id} - Created session: {session_info}")
            else:
                logger.info(f"[{client_id}] Worker {self.worker_id} - Using existing Redis-backed connection")

                # Update session activity
                session_key = f"vod_session:{session_id}"
                if self.redis_client:
                    self.redis_client.hset(session_key, mapping={
                        "last_activity": str(time.time()),
                        "last_worker_id": self.worker_id  # Track which worker last accessed this
                    })
                    self.redis_client.expire(session_key, self.session_ttl)

            # Get stream from Redis-backed connection
            upstream_response = redis_connection.get_stream(range_header)

            if upstream_response is None:
                logger.warning(f"[{client_id}] Worker {self.worker_id} - Range not satisfiable")
                return HttpResponse("Requested Range Not Satisfiable", status=416)

            # Get connection headers
            connection_headers = redis_connection.get_headers()

            # Create streaming generator
            def stream_generator():
                decremented = False
                try:
                    logger.info(f"[{client_id}] Worker {self.worker_id} - Starting Redis-backed stream")

                    # Increment active streams
                    redis_connection.increment_active_streams()

                    bytes_sent = 0
                    chunk_count = 0

                    for chunk in upstream_response.iter_content(chunk_size=8192):
                        if chunk:
                            yield chunk
                            bytes_sent += len(chunk)
                            chunk_count += 1

                            # Update activity every 100 chunks
                            if chunk_count % 100 == 0:
                                self.update_connection_activity(
                                    content_type=content_type,
                                    content_uuid=content_uuid,
                                    client_id=client_id,
                                    bytes_sent=bytes_sent
                                )

                    logger.info(f"[{client_id}] Worker {self.worker_id} - Redis-backed stream completed: {bytes_sent} bytes sent")
                    redis_connection.decrement_active_streams()
                    decremented = True

                except GeneratorExit:
                    logger.info(f"[{client_id}] Worker {self.worker_id} - Client disconnected from Redis-backed stream")
                    if not decremented:
                        redis_connection.decrement_active_streams()
                        decremented = True

                    # Schedule cleanup if no active streams
                    if not redis_connection.has_active_streams():
                        def delayed_cleanup():
                            time.sleep(10)  # Wait 10 seconds
                            if not redis_connection.has_active_streams():
                                logger.info(f"[{client_id}] Worker {self.worker_id} - Cleaning up idle Redis connection")
                                redis_connection.cleanup()

                        cleanup_thread = threading.Thread(target=delayed_cleanup)
                        cleanup_thread.daemon = True
                        cleanup_thread.start()

                except Exception as e:
                    logger.error(f"[{client_id}] Worker {self.worker_id} - Error in Redis-backed stream: {e}")
                    if not decremented:
                        redis_connection.decrement_active_streams()
                        decremented = True
                    redis_connection.cleanup()
                    yield b"Error: Stream interrupted"

                finally:
                    if not decremented:
                        redis_connection.decrement_active_streams()

            # Create streaming response
            response = StreamingHttpResponse(
                streaming_content=stream_generator(),
                content_type=connection_headers.get('content_type', 'video/mp4')
            )

            # Set appropriate status code
            response.status_code = 206 if range_header else 200

            # Set required headers
            response['Cache-Control'] = 'no-cache'
            response['Pragma'] = 'no-cache'
            response['X-Content-Type-Options'] = 'nosniff'
            response['Connection'] = 'keep-alive'
            response['X-Worker-ID'] = self.worker_id  # Identify which worker served this

            if connection_headers.get('content_length'):
                response['Accept-Ranges'] = 'bytes'
                response['Content-Length'] = connection_headers['content_length']

                # Set Content-Range for partial requests
                if range_header and 'bytes=' in range_header:
                    try:
                        range_part = range_header.replace('bytes=', '')
                        if '-' in range_part:
                            start_byte, end_byte = range_part.split('-', 1)
                            start = int(start_byte) if start_byte else 0
                            end = int(end_byte) if end_byte else int(connection_headers['content_length']) - 1
                            total_size = int(connection_headers['content_length'])

                            content_range = f"bytes {start}-{end}/{total_size}"
                            response['Content-Range'] = content_range
                            # A 206 body carries only the requested part, so
                            # Content-Length must be the part size, not the total
                            response['Content-Length'] = str(end - start + 1)
                            logger.info(f"[{client_id}] Worker {self.worker_id} - Set Content-Range: {content_range}")
                    except Exception as e:
                        logger.warning(f"[{client_id}] Worker {self.worker_id} - Could not set Content-Range: {e}")

            logger.info(f"[{client_id}] Worker {self.worker_id} - Redis-backed response ready (status: {response.status_code})")
            return response

        except Exception as e:
            logger.error(f"[{client_id}] Worker {self.worker_id} - Error in Redis-backed stream_content_with_session: {e}", exc_info=True)
            return HttpResponse(f"Streaming error: {str(e)}", status=500)

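    # End to end, a player's Range request flows through this method roughly
    # like this (illustrative sketch; the proxy URL path is hypothetical, not
    # a documented endpoint):
    #
    #     import requests
    #     resp = requests.get(
    #         "http://dispatcharr.local/proxy/vod/movie/<uuid>",
    #         headers={"Range": "bytes=1048576-"},
    #         stream=True)
    #     assert resp.status_code == 206
    #     assert "Content-Range" in resp.headers
    #     # Any of the workers may answer; X-Worker-ID reveals which one did
    #     print(resp.headers.get("X-Worker-ID"))
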
    def _apply_timeshift_parameters(self, original_url, utc_start=None, utc_end=None, offset=None):
        """Apply timeshift parameters to URL"""
        if not any([utc_start, utc_end, offset]):
            return original_url

        try:
            from urllib.parse import urlparse, urlunparse, parse_qs, urlencode

            parsed_url = urlparse(original_url)
            query_params = parse_qs(parsed_url.query)
            path = parsed_url.path

            logger.info(f"Applying timeshift parameters: utc_start={utc_start}, utc_end={utc_end}, offset={offset}")

            # Add timeshift parameters (several provider spellings for each)
            if utc_start:
                query_params['utc_start'] = [utc_start]
                query_params['start'] = [utc_start]
                logger.info(f"Added utc_start/start parameter: {utc_start}")

            if utc_end:
                query_params['utc_end'] = [utc_end]
                query_params['end'] = [utc_end]
                logger.info(f"Added utc_end/end parameter: {utc_end}")

            if offset:
                try:
                    offset_seconds = int(offset)
                    query_params['offset'] = [str(offset_seconds)]
                    query_params['seek'] = [str(offset_seconds)]
                    query_params['t'] = [str(offset_seconds)]
                    logger.info(f"Added offset/seek/t parameter: {offset_seconds}")
                except ValueError:
                    logger.warning(f"Invalid offset value: {offset}")

            # Handle special catchup URL patterns
            if utc_start:
                try:
                    from datetime import datetime

                    # Parse the UTC start time
                    start_dt = datetime.fromisoformat(utc_start.replace('Z', '+00:00'))

                    # Check for catchup URL patterns like /catchup/YYYY-MM-DD/HH-MM-SS/
                    catchup_pattern = r'/catchup/\d{4}-\d{2}-\d{2}/\d{2}-\d{2}-\d{2}/'
                    if re.search(catchup_pattern, path):
                        # Replace the date/time in the path
                        date_part = start_dt.strftime('%Y-%m-%d')
                        time_part = start_dt.strftime('%H-%M-%S')

                        path = re.sub(catchup_pattern, f'/catchup/{date_part}/{time_part}/', path)
                        logger.info(f"Modified catchup path: {path}")
                except Exception as e:
                    logger.warning(f"Could not parse timeshift date: {e}")

            # Reconstruct URL
            new_query = urlencode(query_params, doseq=True)
            modified_url = urlunparse((
                parsed_url.scheme,
                parsed_url.netloc,
                path,
                parsed_url.params,
                new_query,
                parsed_url.fragment
            ))

            logger.info(f"Modified URL: {modified_url}")
            return modified_url

        except Exception as e:
            logger.error(f"Error applying timeshift parameters: {e}")
            return original_url

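    # For example, with utc_start="2024-01-01T12:00:00Z" a provider URL such as
    #
    #     http://host/catchup/2024-01-01/00-00-00/stream.ts?token=abc
    #
    # would come back as (query-parameter order may vary):
    #
    #     http://host/catchup/2024-01-01/12-00-00/stream.ts?token=abc&utc_start=...&start=...
    #
    # The host, path, and token here are made up for illustration.
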
    def cleanup_persistent_connection(self, session_id: str):
        """Clean up a specific Redis-backed persistent connection"""
        logger.info(f"[{session_id}] Cleaning up Redis-backed persistent connection")

        redis_connection = RedisBackedVODConnection(session_id, self.redis_client)
        redis_connection.cleanup()

        # Also clean up session data
        if self.redis_client:
            try:
                session_key = f"vod_session:{session_id}"
                self.redis_client.delete(session_key)
                logger.info(f"[{session_id}] Cleaned up session data")
            except Exception as e:
                logger.error(f"[{session_id}] Error cleaning up session data: {e}")

    def cleanup_stale_persistent_connections(self, max_age_seconds: int = 1800):
        """Clean up stale Redis-backed persistent connections"""
        if not self.redis_client:
            return

        try:
            logger.info(f"Cleaning up Redis-backed connections older than {max_age_seconds} seconds")

            # Find all persistent connection keys with a non-blocking SCAN
            pattern = "vod_persistent_connection:*"
            cursor = 0
            cleanup_count = 0
            current_time = time.time()

            while True:
                cursor, keys = self.redis_client.scan(cursor, match=pattern, count=100)

                for key in keys:
                    try:
                        # Get connection state
                        data = self.redis_client.hgetall(key)
                        if not data:
                            continue

                        # Convert bytes to strings if needed
                        if isinstance(list(data.keys())[0], bytes):
                            data = {k.decode('utf-8'): v.decode('utf-8') for k, v in data.items()}

                        last_activity = float(data.get('last_activity', 0))
                        active_streams = int(data.get('active_streams', 0))

                        # Clean up if stale and no active streams
                        if (current_time - last_activity > max_age_seconds) and active_streams == 0:
                            key_str = key.decode('utf-8') if isinstance(key, bytes) else key
                            session_id = key_str.replace('vod_persistent_connection:', '')
                            logger.info(f"Cleaning up stale connection: {session_id}")

                            # Clean up connection and related keys
                            redis_connection = RedisBackedVODConnection(session_id, self.redis_client)
                            redis_connection.cleanup()
                            cleanup_count += 1

                    except Exception as e:
                        logger.error(f"Error processing connection key {key}: {e}")
                        continue

                if cursor == 0:
                    break

            if cleanup_count > 0:
                logger.info(f"Cleaned up {cleanup_count} stale Redis-backed connections")
            else:
                logger.debug("No stale Redis-backed connections found")

        except Exception as e:
            logger.error(f"Error during Redis-backed connection cleanup: {e}")

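    # Nothing in this module calls the sweep above on a schedule. One minimal
    # way to run it periodically inside a worker (a sketch and an assumption,
    # not part of this commit) is a daemonized timer chain:
    #
    #     def _schedule_stale_sweep(manager, interval=300):
    #         def sweep():
    #             manager.cleanup_stale_persistent_connections()
    #             t = threading.Timer(interval, sweep)
    #             t.daemon = True
    #             t.start()
    #         sweep()
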
    def create_connection(self, content_type: str, content_uuid: str, content_name: str,
                          client_id: str, client_ip: str, user_agent: str,
                          m3u_profile: M3UAccountProfile) -> bool:
        """Create connection tracking in Redis (same as the original manager, but for Redis-backed connections)"""
        if not self.redis_client:
            logger.error("Redis client not available for VOD connection tracking")
            return False

        try:
            # Check profile connection limits
            profile_connections_key = f"profile_connections:{m3u_profile.id}"
            current_connections = self.redis_client.get(profile_connections_key)
            max_connections = getattr(m3u_profile, 'max_connections', 3)  # Default to 3

            if current_connections and int(current_connections) >= max_connections:
                logger.warning(f"Profile {m3u_profile.name} connection limit exceeded ({current_connections}/{max_connections})")
                return False

            # Create connection tracking
            connection_key = f"vod_proxy:connection:{content_type}:{content_uuid}:{client_id}"
            content_connections_key = f"vod_proxy:content:{content_type}:{content_uuid}:connections"

            # Check if connection already exists
            if self.redis_client.exists(connection_key):
                logger.info(f"Connection already exists for {client_id} - {content_type} {content_name}")
                self.redis_client.hset(connection_key, "last_activity", str(time.time()))
                return True

            # Connection data
            connection_data = {
                "content_type": content_type,
                "content_uuid": content_uuid,
                "content_name": content_name,
                "client_id": client_id,
                "client_ip": client_ip,
                "user_agent": user_agent,
                "m3u_profile_id": m3u_profile.id,
                "m3u_profile_name": m3u_profile.name,
                "connected_at": str(time.time()),
                "last_activity": str(time.time()),
                "bytes_sent": "0",
                "position_seconds": "0"
            }

            # Use pipeline for atomic operations
            pipe = self.redis_client.pipeline()
            pipe.hset(connection_key, mapping=connection_data)
            pipe.expire(connection_key, self.connection_ttl)
            pipe.incr(profile_connections_key)
            pipe.sadd(content_connections_key, client_id)
            pipe.expire(content_connections_key, self.connection_ttl)
            pipe.execute()

            logger.info(f"Created Redis-backed VOD connection: {client_id} for {content_type} {content_name}")
            return True

        except Exception as e:
            logger.error(f"Error creating Redis-backed connection: {e}")
            return False

    def remove_connection(self, content_type: str, content_uuid: str, client_id: str):
        """Remove connection tracking from Redis"""
        if not self.redis_client:
            return

        try:
            connection_key = f"vod_proxy:connection:{content_type}:{content_uuid}:{client_id}"
            content_connections_key = f"vod_proxy:content:{content_type}:{content_uuid}:connections"

            # Get connection data to find the profile
            connection_data = self.redis_client.hgetall(connection_key)
            if connection_data:
                # Convert bytes to strings if needed
                if isinstance(list(connection_data.keys())[0], bytes):
                    connection_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in connection_data.items()}

                profile_id = connection_data.get('m3u_profile_id')
                if profile_id:
                    profile_connections_key = f"profile_connections:{profile_id}"

                    # Use pipeline for atomic operations
                    pipe = self.redis_client.pipeline()
                    pipe.delete(connection_key)
                    pipe.srem(content_connections_key, client_id)
                    pipe.decr(profile_connections_key)
                    pipe.execute()

                    logger.info(f"Removed Redis-backed connection: {client_id}")

        except Exception as e:
            logger.error(f"Error removing Redis-backed connection: {e}")

    def update_connection_activity(self, content_type: str, content_uuid: str,
                                   client_id: str, bytes_sent: int):
        """Update connection activity in Redis"""
        if not self.redis_client:
            return

        try:
            connection_key = f"vod_proxy:connection:{content_type}:{content_uuid}:{client_id}"
            pipe = self.redis_client.pipeline()
            pipe.hset(connection_key, mapping={
                "last_activity": str(time.time()),
                "bytes_sent": str(bytes_sent)
            })
            pipe.expire(connection_key, self.connection_ttl)
            pipe.execute()
        except Exception as e:
            logger.error(f"Error updating connection activity: {e}")

    def find_matching_idle_session(self, content_type: str, content_uuid: str,
                                   client_ip: str, user_agent: str,
                                   utc_start=None, utc_end=None, offset=None) -> Optional[str]:
        """Find an existing Redis-backed session that matches the given criteria"""
        if not self.redis_client:
            return None

        try:
            # Search for sessions with matching content
            pattern = "vod_session:*"
            cursor = 0
            matching_sessions = []

            while True:
                cursor, keys = self.redis_client.scan(cursor, match=pattern, count=100)

                for key in keys:
                    try:
                        session_data = self.redis_client.hgetall(key)
                        if not session_data:
                            continue

                        # Convert bytes keys/values to strings if needed
                        if isinstance(list(session_data.keys())[0], bytes):
                            session_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in session_data.items()}

                        # Check if content matches
                        stored_content_type = session_data.get('content_type', '')
                        stored_content_uuid = session_data.get('content_uuid', '')

                        if stored_content_type != content_type or stored_content_uuid != content_uuid:
                            continue

                        # Extract session ID
                        key_str = key.decode('utf-8') if isinstance(key, bytes) else key
                        session_id = key_str.replace('vod_session:', '')

                        # Only reuse sessions whose Redis-backed connection has no active streams
                        redis_connection = RedisBackedVODConnection(session_id, self.redis_client)
                        if redis_connection.has_active_streams():
                            continue

                        # Calculate match score
                        score = 10  # Content match
                        match_reasons = ["content"]

                        # Check other criteria
                        stored_client_ip = session_data.get('client_ip', '')
                        stored_user_agent = session_data.get('user_agent', '')

                        if stored_client_ip and stored_client_ip == client_ip:
                            score += 5
                            match_reasons.append("ip")

                        if stored_user_agent and stored_user_agent == user_agent:
                            score += 3
                            match_reasons.append("user-agent")

                        # Check timeshift parameters
                        stored_utc_start = session_data.get('utc_start', '')
                        stored_utc_end = session_data.get('utc_end', '')
                        stored_offset = session_data.get('offset', '')

                        current_utc_start = utc_start or ""
                        current_utc_end = utc_end or ""
                        current_offset = str(offset) if offset else ""

                        if (stored_utc_start == current_utc_start and
                                stored_utc_end == current_utc_end and
                                stored_offset == current_offset):
                            score += 7
                            match_reasons.append("timeshift")

                        if score >= 13:  # Good-match threshold
                            matching_sessions.append({
                                'session_id': session_id,
                                'score': score,
                                'reasons': match_reasons,
                                'last_activity': float(session_data.get('last_activity', '0'))
                            })

                    except Exception as e:
                        logger.debug(f"Error processing session key {key}: {e}")
                        continue

                if cursor == 0:
                    break

            # Sort by score, then by last activity
            matching_sessions.sort(key=lambda x: (x['score'], x['last_activity']), reverse=True)

            if matching_sessions:
                best_match = matching_sessions[0]
                logger.info(f"Found matching Redis-backed idle session: {best_match['session_id']} "
                            f"(score: {best_match['score']}, reasons: {', '.join(best_match['reasons'])})")
                return best_match['session_id']

            return None

        except Exception as e:
            logger.error(f"Error finding matching idle session: {e}")
            return None
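    # Scoring example: a content match alone scores 10 and fails the
    # threshold of 13; content plus matching user-agent scores 13 and passes;
    # a request from the same IP for the same movie with identical (possibly
    # empty) timeshift parameters scores 10 + 5 + 7 = 22; a match on every
    # criterion scores the maximum 10 + 5 + 3 + 7 = 25.
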
@@ -15,6 +15,7 @@ from django.views import View
 from apps.vod.models import Movie, Series, Episode
 from apps.m3u.models import M3UAccount, M3UAccountProfile
 from apps.proxy.vod_proxy.connection_manager import VODConnectionManager
+from apps.proxy.vod_proxy.multi_worker_connection_manager import MultiWorkerVODConnectionManager
 from .utils import get_client_info, create_vod_response
 
 logger = logging.getLogger(__name__)
@@ -180,8 +181,9 @@ class VODStreamView(View):
             logger.error(f"[VOD-ERROR] Invalid stream URL: {final_stream_url}")
             return HttpResponse("Invalid stream URL", status=500)
 
-        # Get connection manager
-        connection_manager = VODConnectionManager.get_instance()
+        # Get connection manager (Redis-backed for multi-worker support)
+        connection_manager = MultiWorkerVODConnectionManager.get_instance()
+
 
         # Stream the content with session-based connection reuse
         logger.info("[VOD-STREAM] Calling connection manager to stream content")
@@ -304,7 +306,9 @@ class VODStreamView(View):
 
         # Now create a persistent connection for the session (if one doesn't exist)
         # This ensures the FUSE GET requests will reuse the same connection
-        connection_manager = VODConnectionManager.get_instance()
+        connection_manager = MultiWorkerVODConnectionManager.get_instance()
+
+
         logger.info(f"[VOD-HEAD] Pre-creating persistent connection for session: {session_id}")
 
         # We don't actually stream content here, just ensure connection is ready
@@ -27,7 +27,7 @@ die-on-term = true
 static-map = /static=/app/static
 
 # Worker configuration
-workers = 1
+workers = 8
 threads = 8
 enable-threads = true
 lazy-apps = true