"""
VOD Connection Manager - Redis-based connection tracking for VOD streams
"""
import time
import logging
import threading
import random
import re
import requests
from typing import Optional, Dict, Any
from django.http import StreamingHttpResponse, HttpResponse
from core.utils import RedisClient
from apps.m3u.models import M3UAccountProfile
logger = logging.getLogger("vod_proxy")
class PersistentVODConnection:
"""Handles a single persistent connection to a VOD provider for a session"""
def __init__(self, session_id: str, stream_url: str, headers: dict):
self.session_id = session_id
self.stream_url = stream_url
self.base_headers = headers
self.session = None
self.current_response = None
self.content_length = None
self.content_type = 'video/mp4'
self.final_url = None
        self.lock = threading.RLock()  # re-entrant: cleanup() can be reached while get_stream() already holds the lock
self.request_count = 0 # Track number of requests on this connection
self.last_activity = time.time() # Track last activity for cleanup
self.cleanup_timer = None # Timer for delayed cleanup
self.active_streams = 0 # Count of active stream generators
def _establish_connection(self, range_header=None):
"""Establish or re-establish connection to provider"""
try:
if not self.session:
self.session = requests.Session()
headers = self.base_headers.copy()
# Validate range header against content length
if range_header and self.content_length:
logger.info(f"[{self.session_id}] Validating range {range_header} against content length {self.content_length}")
validated_range = self._validate_range_header(range_header, int(self.content_length))
if validated_range is None:
                    # Range is not satisfiable - don't raise; return None so the caller can answer 416
logger.warning(f"[{self.session_id}] Range not satisfiable: {range_header} for content length {self.content_length}")
return None
elif validated_range != range_header:
range_header = validated_range
logger.info(f"[{self.session_id}] Adjusted range header: {range_header}")
else:
logger.info(f"[{self.session_id}] Range header validated successfully: {range_header}")
elif range_header:
logger.info(f"[{self.session_id}] Range header provided but no content length available yet: {range_header}")
if range_header:
headers['Range'] = range_header
logger.info(f"[{self.session_id}] Setting Range header: {range_header}")
# Track request count for better logging
self.request_count += 1
if self.request_count == 1:
logger.info(f"[{self.session_id}] Making initial request to provider")
target_url = self.stream_url
allow_redirects = True
else:
logger.info(f"[{self.session_id}] Making range request #{self.request_count} on SAME session (using final URL)")
# Use the final URL from first request to avoid redirect chain
target_url = self.final_url if self.final_url else self.stream_url
allow_redirects = False # No need to follow redirects again
logger.info(f"[{self.session_id}] Using cached final URL: {target_url}")
response = self.session.get(
target_url,
headers=headers,
stream=True,
timeout=(10, 30),
allow_redirects=allow_redirects
)
response.raise_for_status()
# Log successful response
if self.request_count == 1:
logger.info(f"[{self.session_id}] Request #{self.request_count} successful: {response.status_code} (followed redirects)")
else:
logger.info(f"[{self.session_id}] Request #{self.request_count} successful: {response.status_code} (direct to final URL)")
# Capture headers from final URL
if not self.content_length:
# First check if we have a pre-stored content length from HEAD request
try:
import redis
from django.conf import settings
redis_host = getattr(settings, 'REDIS_HOST', 'localhost')
redis_port = int(getattr(settings, 'REDIS_PORT', 6379))
redis_db = int(getattr(settings, 'REDIS_DB', 0))
r = redis.StrictRedis(host=redis_host, port=redis_port, db=redis_db, decode_responses=True)
content_length_key = f"vod_content_length:{self.session_id}"
stored_length = r.get(content_length_key)
if stored_length:
self.content_length = stored_length
logger.info(f"[{self.session_id}] *** USING PRE-STORED CONTENT LENGTH: {self.content_length} ***")
else:
# Fallback to response headers
self.content_length = response.headers.get('content-length')
logger.info(f"[{self.session_id}] *** USING RESPONSE CONTENT LENGTH: {self.content_length} ***")
except Exception as e:
logger.error(f"[{self.session_id}] Error checking Redis for content length: {e}")
# Fallback to response headers
self.content_length = response.headers.get('content-length')
self.content_type = response.headers.get('content-type', 'video/mp4')
self.final_url = response.url
logger.info(f"[{self.session_id}] *** PERSISTENT CONNECTION - Final URL: {self.final_url} ***")
logger.info(f"[{self.session_id}] *** PERSISTENT CONNECTION - Content-Length: {self.content_length} ***")
self.current_response = response
return response
except Exception as e:
logger.error(f"[{self.session_id}] Error establishing connection: {e}")
self.cleanup()
raise
def _validate_range_header(self, range_header, content_length):
"""Validate and potentially adjust range header against content length"""
try:
if not range_header or not range_header.startswith('bytes='):
return range_header
range_part = range_header.replace('bytes=', '')
if '-' not in range_part:
return range_header
start_str, end_str = range_part.split('-', 1)
# Parse start byte
if start_str:
start_byte = int(start_str)
if start_byte >= content_length:
# Start is beyond file end - not satisfiable
logger.warning(f"[{self.session_id}] Range start {start_byte} >= content length {content_length} - not satisfiable")
return None
else:
start_byte = 0
# Parse end byte
if end_str:
end_byte = int(end_str)
if end_byte >= content_length:
# Adjust end to file end
end_byte = content_length - 1
logger.info(f"[{self.session_id}] Adjusted range end to {end_byte}")
else:
end_byte = content_length - 1
# Ensure start <= end
if start_byte > end_byte:
logger.warning(f"[{self.session_id}] Range start {start_byte} > end {end_byte} - not satisfiable")
return None
validated_range = f"bytes={start_byte}-{end_byte}"
return validated_range
except (ValueError, IndexError) as e:
logger.warning(f"[{self.session_id}] Could not validate range header {range_header}: {e}")
return range_header
def get_stream(self, range_header=None):
"""Get stream with optional range header - reuses connection for range requests"""
with self.lock:
# Update activity timestamp
self.last_activity = time.time()
# Cancel any pending cleanup since connection is being reused
self.cancel_cleanup()
# For range requests, we don't need to close the connection
# We can make a new request on the same session
if range_header:
logger.info(f"[{self.session_id}] Range request on existing connection: {range_header}")
# Close only the response stream, keep the session alive
if self.current_response:
logger.info(f"[{self.session_id}] Closing previous response stream (keeping connection alive)")
self.current_response.close()
self.current_response = None
# Make new request (reuses connection if session exists)
response = self._establish_connection(range_header)
if response is None:
# Range not satisfiable - return None to indicate this
return None
return self.current_response
def cancel_cleanup(self):
"""Cancel any pending cleanup - called when connection is reused"""
if self.cleanup_timer:
self.cleanup_timer.cancel()
self.cleanup_timer = None
logger.info(f"[{self.session_id}] Cancelled pending cleanup - connection being reused for new request")
def increment_active_streams(self):
"""Increment the count of active streams"""
with self.lock:
self.active_streams += 1
logger.debug(f"[{self.session_id}] Active streams incremented to {self.active_streams}")
def decrement_active_streams(self):
"""Decrement the count of active streams"""
with self.lock:
if self.active_streams > 0:
self.active_streams -= 1
logger.debug(f"[{self.session_id}] Active streams decremented to {self.active_streams}")
else:
logger.warning(f"[{self.session_id}] Attempted to decrement active streams when already at 0")
def has_active_streams(self) -> bool:
"""Check if connection has any active streams"""
with self.lock:
return self.active_streams > 0
def schedule_cleanup_if_not_streaming(self, delay_seconds: int = 10):
"""Schedule cleanup only if no active streams"""
with self.lock:
if self.active_streams > 0:
logger.info(f"[{self.session_id}] Connection has {self.active_streams} active streams - NOT scheduling cleanup")
return False
# No active streams, proceed with delayed cleanup
if self.cleanup_timer:
self.cleanup_timer.cancel()
def delayed_cleanup():
logger.info(f"[{self.session_id}] Delayed cleanup triggered - checking if connection is still needed")
# Use the singleton VODConnectionManager instance
manager = VODConnectionManager.get_instance()
manager.cleanup_persistent_connection(self.session_id)
self.cleanup_timer = threading.Timer(delay_seconds, delayed_cleanup)
self.cleanup_timer.start()
logger.info(f"[{self.session_id}] Scheduled cleanup in {delay_seconds} seconds (connection not actively streaming)")
return True
def get_headers(self):
"""Get headers for response"""
return {
'content_length': self.content_length,
'content_type': self.content_type,
'final_url': self.final_url
}
def cleanup(self):
"""Clean up connection resources"""
with self.lock:
# Cancel any pending cleanup timer
if self.cleanup_timer:
self.cleanup_timer.cancel()
self.cleanup_timer = None
logger.debug(f"[{self.session_id}] Cancelled cleanup timer during manual cleanup")
# Clear active streams count
self.active_streams = 0
if self.current_response:
self.current_response.close()
self.current_response = None
if self.session:
self.session.close()
self.session = None
logger.info(f"[{self.session_id}] Persistent connection cleaned up")
class VODConnectionManager:
"""Manages VOD connections using Redis for tracking"""
_instance = None
    _persistent_connections = {}  # session_id -> PersistentVODConnection (class-level, shared by all instances)
@classmethod
def get_instance(cls):
"""Get the singleton instance of VODConnectionManager"""
if cls._instance is None:
cls._instance = cls()
return cls._instance
def __init__(self):
self.redis_client = RedisClient.get_client()
self.connection_ttl = 3600 # 1 hour TTL for connections
self.session_ttl = 1800 # 30 minutes TTL for sessions
def find_matching_idle_session(self, content_type: str, content_uuid: str,
client_ip: str, user_agent: str,
utc_start=None, utc_end=None, offset=None) -> Optional[str]:
"""
Find an existing session that matches content and client criteria with no active streams
Args:
content_type: Type of content (movie, episode, series)
content_uuid: UUID of the content
client_ip: Client IP address
user_agent: Client user agent
utc_start: UTC start time for timeshift
utc_end: UTC end time for timeshift
offset: Offset in seconds
Returns:
Session ID if matching idle session found, None otherwise
"""
if not self.redis_client:
return None
try:
# Search for sessions with matching content
pattern = "vod_session:*"
cursor = 0
matching_sessions = []
while True:
cursor, keys = self.redis_client.scan(cursor, match=pattern, count=100)
for key in keys:
try:
session_data = self.redis_client.hgetall(key)
if not session_data:
continue
# Extract session info
stored_content_type = session_data.get(b'content_type', b'').decode('utf-8')
stored_content_uuid = session_data.get(b'content_uuid', b'').decode('utf-8')
# Check if content matches
if stored_content_type != content_type or stored_content_uuid != content_uuid:
continue
# Extract session ID from key
session_id = key.decode('utf-8').replace('vod_session:', '')
# Check if session has an active persistent connection
persistent_conn = self._persistent_connections.get(session_id)
if not persistent_conn:
# No persistent connection exists, skip
continue
# Check if connection has no active streams
if persistent_conn.has_active_streams():
logger.debug(f"[{session_id}] Session has active streams - skipping")
continue
# Get stored client info for comparison
stored_client_ip = session_data.get(b'client_ip', b'').decode('utf-8')
stored_user_agent = session_data.get(b'user_agent', b'').decode('utf-8')
# Check timeshift parameters match
stored_utc_start = session_data.get(b'utc_start', b'').decode('utf-8')
stored_utc_end = session_data.get(b'utc_end', b'').decode('utf-8')
stored_offset = session_data.get(b'offset', b'').decode('utf-8')
current_utc_start = utc_start or ""
current_utc_end = utc_end or ""
current_offset = str(offset) if offset else ""
# Calculate match score
score = 0
match_reasons = []
# Content already matches (required)
score += 10
match_reasons.append("content")
# IP match (high priority)
if stored_client_ip and stored_client_ip == client_ip:
score += 5
match_reasons.append("ip")
# User-Agent match (medium priority)
if stored_user_agent and stored_user_agent == user_agent:
score += 3
match_reasons.append("user-agent")
# Timeshift parameters match (high priority for seeking)
if (stored_utc_start == current_utc_start and
stored_utc_end == current_utc_end and
stored_offset == current_offset):
score += 7
match_reasons.append("timeshift")
# Consider it a good match if we have at least content + one other criteria
                            if score >= 13:  # content(10) + user-agent(3) at minimum; ip/timeshift matches rank higher
matching_sessions.append({
'session_id': session_id,
'score': score,
'reasons': match_reasons,
'last_activity': float(session_data.get(b'last_activity', b'0').decode('utf-8'))
})
except Exception as e:
logger.debug(f"Error processing session key {key}: {e}")
continue
if cursor == 0:
break
# Sort by score (highest first), then by last activity (most recent first)
matching_sessions.sort(key=lambda x: (x['score'], x['last_activity']), reverse=True)
if matching_sessions:
best_match = matching_sessions[0]
logger.info(f"Found matching idle session: {best_match['session_id']} "
f"(score: {best_match['score']}, reasons: {', '.join(best_match['reasons'])})")
return best_match['session_id']
else:
logger.debug(f"No matching idle sessions found for {content_type} {content_uuid}")
return None
except Exception as e:
logger.error(f"Error finding matching idle session: {e}")
return None
def _get_connection_key(self, content_type: str, content_uuid: str, client_id: str) -> str:
"""Get Redis key for a specific connection"""
return f"vod_proxy:connection:{content_type}:{content_uuid}:{client_id}"
def _get_profile_connections_key(self, profile_id: int) -> str:
"""Get Redis key for tracking connections per profile - STANDARDIZED with TS proxy"""
return f"profile_connections:{profile_id}"
def _get_content_connections_key(self, content_type: str, content_uuid: str) -> str:
"""Get Redis key for tracking connections per content"""
return f"vod_proxy:content:{content_type}:{content_uuid}:connections"
def create_connection(self, content_type: str, content_uuid: str, content_name: str,
client_id: str, client_ip: str, user_agent: str,
m3u_profile: M3UAccountProfile) -> bool:
"""
Create a new VOD connection with profile limit checking
Returns:
bool: True if connection was created, False if profile limit exceeded
"""
if not self.redis_client:
logger.error("Redis client not available for VOD connection tracking")
return False
try:
# Check profile connection limits using standardized key
if not self._check_profile_limits(m3u_profile):
logger.warning(f"Profile {m3u_profile.name} connection limit exceeded")
return False
connection_key = self._get_connection_key(content_type, content_uuid, client_id)
profile_connections_key = self._get_profile_connections_key(m3u_profile.id)
content_connections_key = self._get_content_connections_key(content_type, content_uuid)
# Check if connection already exists to prevent duplicate counting
if self.redis_client.exists(connection_key):
logger.info(f"Connection already exists for {client_id} - {content_type} {content_name}")
# Update activity but don't increment profile counter
self.redis_client.hset(connection_key, "last_activity", str(time.time()))
return True
# Connection data
connection_data = {
"content_type": content_type,
"content_uuid": content_uuid,
"content_name": content_name,
"client_id": client_id,
"client_ip": client_ip,
"user_agent": user_agent,
"m3u_profile_id": m3u_profile.id,
"m3u_profile_name": m3u_profile.name,
"connected_at": str(time.time()),
"last_activity": str(time.time()),
"bytes_sent": "0",
"position_seconds": "0",
"last_position_update": str(time.time())
}
# Use pipeline for atomic operations
pipe = self.redis_client.pipeline()
# Store connection data
pipe.hset(connection_key, mapping=connection_data)
pipe.expire(connection_key, self.connection_ttl)
# Increment profile connections using standardized method
pipe.incr(profile_connections_key)
# Add to content connections set
pipe.sadd(content_connections_key, client_id)
pipe.expire(content_connections_key, self.connection_ttl)
# Execute all operations
pipe.execute()
logger.info(f"Created VOD connection: {client_id} for {content_type} {content_name}")
return True
except Exception as e:
logger.error(f"Error creating VOD connection: {e}")
return False
def _check_profile_limits(self, m3u_profile: M3UAccountProfile) -> bool:
"""Check if profile has available connection slots"""
if m3u_profile.max_streams == 0: # Unlimited
return True
try:
profile_connections_key = self._get_profile_connections_key(m3u_profile.id)
current_connections = int(self.redis_client.get(profile_connections_key) or 0)
return current_connections < m3u_profile.max_streams
except Exception as e:
logger.error(f"Error checking profile limits: {e}")
return False
def update_connection_activity(self, content_type: str, content_uuid: str,
client_id: str, bytes_sent: int = 0,
position_seconds: int = 0) -> bool:
"""Update connection activity"""
if not self.redis_client:
return False
try:
connection_key = self._get_connection_key(content_type, content_uuid, client_id)
update_data = {
"last_activity": str(time.time())
}
if bytes_sent > 0:
# Get current bytes and add to it
current_bytes = self.redis_client.hget(connection_key, "bytes_sent")
if current_bytes:
total_bytes = int(current_bytes.decode('utf-8')) + bytes_sent
else:
total_bytes = bytes_sent
update_data["bytes_sent"] = str(total_bytes)
if position_seconds > 0:
update_data["position_seconds"] = str(position_seconds)
# Update connection data
self.redis_client.hset(connection_key, mapping=update_data)
self.redis_client.expire(connection_key, self.connection_ttl)
return True
except Exception as e:
logger.error(f"Error updating connection activity: {e}")
return False
def remove_connection(self, content_type: str, content_uuid: str, client_id: str) -> bool:
"""Remove a VOD connection"""
if not self.redis_client:
return False
try:
connection_key = self._get_connection_key(content_type, content_uuid, client_id)
# Get connection data before removing
connection_data = self.redis_client.hgetall(connection_key)
if not connection_data:
return True # Already removed
# Get profile ID for cleanup
profile_id = None
if b"m3u_profile_id" in connection_data:
try:
profile_id = int(connection_data[b"m3u_profile_id"].decode('utf-8'))
except ValueError:
pass
# Use pipeline for atomic cleanup
pipe = self.redis_client.pipeline()
# Remove connection data
pipe.delete(connection_key)
# Decrement profile connections using standardized key
if profile_id:
profile_connections_key = self._get_profile_connections_key(profile_id)
current_count = int(self.redis_client.get(profile_connections_key) or 0)
if current_count > 0:
pipe.decr(profile_connections_key)
# Remove from content connections set
content_connections_key = self._get_content_connections_key(content_type, content_uuid)
pipe.srem(content_connections_key, client_id)
# Execute cleanup
pipe.execute()
logger.info(f"Removed VOD connection: {client_id}")
return True
except Exception as e:
logger.error(f"Error removing connection: {e}")
return False
def get_connection_info(self, content_type: str, content_uuid: str, client_id: str) -> Optional[Dict[str, Any]]:
"""Get connection information"""
if not self.redis_client:
return None
try:
connection_key = self._get_connection_key(content_type, content_uuid, client_id)
connection_data = self.redis_client.hgetall(connection_key)
if not connection_data:
return None
# Convert bytes to strings and parse numbers
info = {}
for key, value in connection_data.items():
key_str = key.decode('utf-8')
value_str = value.decode('utf-8')
# Parse numeric fields
if key_str in ['connected_at', 'last_activity']:
info[key_str] = float(value_str)
elif key_str in ['bytes_sent', 'position_seconds', 'm3u_profile_id']:
info[key_str] = int(value_str)
else:
info[key_str] = value_str
return info
except Exception as e:
logger.error(f"Error getting connection info: {e}")
return None
def get_profile_connections(self, profile_id: int) -> int:
"""Get current connection count for a profile using standardized key"""
if not self.redis_client:
return 0
try:
profile_connections_key = self._get_profile_connections_key(profile_id)
return int(self.redis_client.get(profile_connections_key) or 0)
except Exception as e:
logger.error(f"Error getting profile connections: {e}")
return 0
def get_content_connections(self, content_type: str, content_uuid: str) -> int:
"""Get current connection count for content"""
if not self.redis_client:
return 0
try:
content_connections_key = self._get_content_connections_key(content_type, content_uuid)
return self.redis_client.scard(content_connections_key) or 0
except Exception as e:
logger.error(f"Error getting content connections: {e}")
return 0
def cleanup_stale_connections(self, max_age_seconds: int = 3600):
"""Clean up stale connections that haven't been active recently"""
if not self.redis_client:
return
try:
pattern = "vod_proxy:connection:*"
cursor = 0
cleaned = 0
current_time = time.time()
while True:
cursor, keys = self.redis_client.scan(cursor, match=pattern, count=100)
for key in keys:
try:
key_str = key.decode('utf-8')
last_activity = self.redis_client.hget(key, "last_activity")
if last_activity:
last_activity_time = float(last_activity.decode('utf-8'))
if current_time - last_activity_time > max_age_seconds:
# Extract info for cleanup
parts = key_str.split(':')
if len(parts) >= 5:
content_type = parts[2]
content_uuid = parts[3]
client_id = parts[4]
self.remove_connection(content_type, content_uuid, client_id)
cleaned += 1
except Exception as e:
logger.error(f"Error processing key {key}: {e}")
if cursor == 0:
break
if cleaned > 0:
logger.info(f"Cleaned up {cleaned} stale VOD connections")
except Exception as e:
logger.error(f"Error during connection cleanup: {e}")
def stream_content(self, content_obj, stream_url, m3u_profile, client_ip, user_agent, request,
utc_start=None, utc_end=None, offset=None, range_header=None):
"""
Stream VOD content with connection tracking and timeshift support
Args:
content_obj: Movie or Episode object
stream_url: Final stream URL to proxy
m3u_profile: M3UAccountProfile instance
client_ip: Client IP address
user_agent: Client user agent
request: Django request object
utc_start: UTC start time for timeshift (e.g., '2023-01-01T12:00:00')
utc_end: UTC end time for timeshift
offset: Offset in seconds for seeking
range_header: HTTP Range header for partial content requests
Returns:
StreamingHttpResponse or HttpResponse with error
"""
try:
# Generate unique client ID
client_id = f"vod_{int(time.time() * 1000)}_{random.randint(1000, 9999)}"
# Determine content type and get content info
if hasattr(content_obj, 'episodes'): # Series
content_type = 'series'
elif hasattr(content_obj, 'series'): # Episode
content_type = 'episode'
else: # Movie
content_type = 'movie'
content_uuid = str(content_obj.uuid)
content_name = getattr(content_obj, 'name', getattr(content_obj, 'title', 'Unknown'))
# Create connection tracking
connection_created = self.create_connection(
content_type=content_type,
content_uuid=content_uuid,
content_name=content_name,
client_id=client_id,
client_ip=client_ip,
user_agent=user_agent,
m3u_profile=m3u_profile
)
if not connection_created:
logger.error(f"Failed to create connection tracking for {content_type} {content_uuid}")
return HttpResponse("Connection limit exceeded", status=503)
# Modify stream URL for timeshift functionality
modified_stream_url = self._apply_timeshift_parameters(
stream_url, utc_start, utc_end, offset
)
logger.info(f"[{client_id}] Modified stream URL for timeshift: {modified_stream_url}")
# Create streaming generator with simplified header handling
upstream_response = None
def stream_generator():
nonlocal upstream_response
try:
logger.info(f"[{client_id}] Starting VOD stream for {content_type} {content_name}")
# Prepare request headers
headers = {}
if user_agent:
headers['User-Agent'] = user_agent
# Forward important headers
important_headers = [
'authorization', 'x-forwarded-for', 'x-real-ip',
'referer', 'origin', 'accept'
]
for header_name in important_headers:
django_header = f'HTTP_{header_name.upper().replace("-", "_")}'
if hasattr(request, 'META') and django_header in request.META:
headers[header_name] = request.META[django_header]
logger.debug(f"[{client_id}] Forwarded header {header_name}")
# Add client IP
if client_ip:
headers['X-Forwarded-For'] = client_ip
headers['X-Real-IP'] = client_ip
# Add Range header if provided for seeking support
if range_header:
headers['Range'] = range_header
logger.info(f"[{client_id}] Added Range header: {range_header}")
# Make single request to upstream server with automatic redirect following
upstream_response = requests.get(modified_stream_url, headers=headers, stream=True, timeout=(10, 30), allow_redirects=True)
upstream_response.raise_for_status()
# Log upstream response info
logger.info(f"[{client_id}] Upstream response status: {upstream_response.status_code}")
logger.info(f"[{client_id}] Final URL after redirects: {upstream_response.url}")
logger.info(f"[{client_id}] Upstream content-type: {upstream_response.headers.get('content-type', 'unknown')}")
if 'content-length' in upstream_response.headers:
logger.info(f"[{client_id}] Upstream content-length: {upstream_response.headers['content-length']}")
if 'content-range' in upstream_response.headers:
logger.info(f"[{client_id}] Upstream content-range: {upstream_response.headers['content-range']}")
                    bytes_sent = 0
                    bytes_reported = 0  # bytes already flushed to Redis
                    chunk_count = 0
                    for chunk in upstream_response.iter_content(chunk_size=8192):
                        if chunk:
                            yield chunk
                            bytes_sent += len(chunk)
                            chunk_count += 1
                            # Every 100 chunks, report the bytes accumulated
                            # since the last update (not just the last chunk)
                            if chunk_count % 100 == 0:
                                self.update_connection_activity(
                                    content_type=content_type,
                                    content_uuid=content_uuid,
                                    client_id=client_id,
                                    bytes_sent=bytes_sent - bytes_reported
                                )
                                bytes_reported = bytes_sent
logger.info(f"[{client_id}] VOD stream completed: {bytes_sent} bytes sent")
except requests.RequestException as e:
logger.error(f"[{client_id}] Error streaming from source: {e}")
yield b"Error: Unable to stream content"
except Exception as e:
logger.error(f"[{client_id}] Error in stream generator: {e}")
finally:
# Clean up connection tracking
self.remove_connection(content_type, content_uuid, client_id)
if upstream_response:
upstream_response.close()
# Create streaming response with sensible defaults
response = StreamingHttpResponse(
streaming_content=stream_generator(),
content_type='video/mp4'
)
# Set status code based on request type
if range_header:
response.status_code = 206
logger.info(f"[{client_id}] Set response status to 206 for range request")
else:
response.status_code = 200
logger.info(f"[{client_id}] Set response status to 200 for full request")
# Set headers that VLC and other players expect
response['Cache-Control'] = 'no-cache'
response['Pragma'] = 'no-cache'
response['X-Content-Type-Options'] = 'nosniff'
response['Connection'] = 'keep-alive'
response['Accept-Ranges'] = 'bytes'
# Log the critical headers we're sending to the client
logger.info(f"[{client_id}] Response headers to client - Status: {response.status_code}, Accept-Ranges: {response.get('Accept-Ranges', 'MISSING')}")
if 'Content-Length' in response:
logger.info(f"[{client_id}] Content-Length: {response['Content-Length']}")
if 'Content-Range' in response:
logger.info(f"[{client_id}] Content-Range: {response['Content-Range']}")
if 'Content-Type' in response:
logger.info(f"[{client_id}] Content-Type: {response['Content-Type']}")
# Critical: Log what VLC needs to see for seeking to work
if response.status_code == 200:
logger.info(f"[{client_id}] VLC SEEKING INFO: Full content response (200). VLC should see Accept-Ranges and Content-Length to enable seeking.")
elif response.status_code == 206:
logger.info(f"[{client_id}] VLC SEEKING INFO: Partial content response (206). This confirms seeking is working if VLC requested a range.")
return response
except Exception as e:
logger.error(f"Error in stream_content: {e}", exc_info=True)
return HttpResponse(f"Streaming error: {str(e)}", status=500)
def stream_content_with_session(self, session_id, content_obj, stream_url, m3u_profile, client_ip, user_agent, request,
utc_start=None, utc_end=None, offset=None, range_header=None):
"""
Stream VOD content with persistent connection per session
Maintains 1 open connection to provider per session that handles all range requests
dynamically based on client Range headers for seeking functionality.
"""
try:
# Use session_id as client_id for connection tracking
client_id = session_id
# Determine content type and get content info
if hasattr(content_obj, 'episodes'): # Series
content_type = 'series'
elif hasattr(content_obj, 'series'): # Episode
content_type = 'episode'
else: # Movie
content_type = 'movie'
content_uuid = str(content_obj.uuid)
content_name = getattr(content_obj, 'name', getattr(content_obj, 'title', 'Unknown'))
# Check for existing connection or create new one
persistent_conn = self._persistent_connections.get(session_id)
# Cancel any pending cleanup timer for this session regardless of new/existing
if persistent_conn:
persistent_conn.cancel_cleanup()
# If no existing connection, try to find a matching idle session first
if not persistent_conn:
# Look for existing idle sessions that match content and client criteria
matching_session_id = self.find_matching_idle_session(
content_type, content_uuid, client_ip, user_agent,
utc_start, utc_end, offset
)
if matching_session_id:
logger.info(f"[{client_id}] Found matching idle session {matching_session_id} - redirecting client")
# Update the session activity and client info
session_key = f"vod_session:{matching_session_id}"
if self.redis_client:
update_data = {
"last_activity": str(time.time()),
"client_ip": client_ip, # Update in case IP changed
"user_agent": user_agent # Update in case user agent changed
}
self.redis_client.hset(session_key, mapping=update_data)
self.redis_client.expire(session_key, self.session_ttl)
# Get the existing persistent connection
persistent_conn = self._persistent_connections.get(matching_session_id)
if persistent_conn:
# Update the session_id to use the matching one
client_id = matching_session_id
session_id = matching_session_id
logger.info(f"[{client_id}] Successfully redirected to existing idle session")
else:
logger.warning(f"[{client_id}] Matching session found but no persistent connection - will create new")
if not persistent_conn:
logger.info(f"[{client_id}] Creating NEW persistent connection for {content_type} {content_name}")
# Create session in Redis for tracking
session_info = {
"content_type": content_type,
"content_uuid": content_uuid,
"content_name": content_name,
"created_at": str(time.time()),
"last_activity": str(time.time()),
"profile_id": str(m3u_profile.id),
"connection_counted": "True",
"client_ip": client_ip,
"user_agent": user_agent,
"utc_start": utc_start or "",
"utc_end": utc_end or "",
"offset": str(offset) if offset else ""
}
session_key = f"vod_session:{session_id}"
if self.redis_client:
self.redis_client.hset(session_key, mapping=session_info)
self.redis_client.expire(session_key, self.session_ttl)
logger.info(f"[{client_id}] Created new session: {session_info}")
# Apply timeshift parameters to URL
modified_stream_url = self._apply_timeshift_parameters(stream_url, utc_start, utc_end, offset)
logger.info(f"[{client_id}] Modified stream URL for timeshift: {modified_stream_url}")
# Prepare headers
headers = {
'User-Agent': user_agent or 'VLC/3.0.21 LibVLC/3.0.21',
'Accept': '*/*',
'Connection': 'keep-alive'
}
# Add any authentication headers from profile
if hasattr(m3u_profile, 'auth_headers') and m3u_profile.auth_headers:
headers.update(m3u_profile.auth_headers)
# Create persistent connection
persistent_conn = PersistentVODConnection(session_id, modified_stream_url, headers)
self._persistent_connections[session_id] = persistent_conn
# Track connection in profile
self.create_connection(content_type, content_uuid, content_name, client_id, client_ip, user_agent, m3u_profile)
else:
logger.info(f"[{client_id}] Using EXISTING persistent connection for {content_type} {content_name}")
# Update session activity
session_key = f"vod_session:{session_id}"
if self.redis_client:
self.redis_client.hset(session_key, "last_activity", str(time.time()))
self.redis_client.expire(session_key, self.session_ttl)
logger.info(f"[{client_id}] Reusing existing session - no new connection created")
# Log the incoming Range header for debugging
if range_header:
logger.info(f"[{client_id}] *** CLIENT RANGE REQUEST: {range_header} ***")
# Parse range for seeking detection
try:
if 'bytes=' in range_header:
range_part = range_header.replace('bytes=', '')
if '-' in range_part:
start_byte, end_byte = range_part.split('-', 1)
if start_byte and int(start_byte) > 0:
start_pos_mb = int(start_byte) / (1024 * 1024)
logger.info(f"[{client_id}] *** VLC SEEKING TO: {start_pos_mb:.1f} MB ***")
else:
logger.info(f"[{client_id}] Range request from start")
except Exception as e:
logger.warning(f"[{client_id}] Could not parse range header: {e}")
else:
logger.info(f"[{client_id}] Full content request (no Range header)")
# Get stream from persistent connection with current range
upstream_response = persistent_conn.get_stream(range_header)
# Handle range not satisfiable
if upstream_response is None:
logger.warning(f"[{client_id}] Range not satisfiable - returning 416 error")
return HttpResponse(
"Requested Range Not Satisfiable",
status=416,
headers={
'Content-Range': f'bytes */{persistent_conn.content_length}' if persistent_conn.content_length else 'bytes */*'
}
)
connection_headers = persistent_conn.get_headers()
# Ensure any pending cleanup is cancelled before starting stream
persistent_conn.cancel_cleanup()
# Create streaming generator
def stream_generator():
decremented = False # Track if we've already decremented the counter
try:
logger.info(f"[{client_id}] Starting stream from persistent connection")
# Increment active streams counter
persistent_conn.increment_active_streams()
                    bytes_sent = 0
                    bytes_reported = 0  # bytes already flushed to Redis
                    chunk_count = 0
                    for chunk in upstream_response.iter_content(chunk_size=8192):
                        if chunk:
                            yield chunk
                            bytes_sent += len(chunk)
                            chunk_count += 1
                            # Every 100 chunks, report the bytes accumulated
                            # since the last update (not just the last chunk)
                            if chunk_count % 100 == 0:
                                self.update_connection_activity(
                                    content_type=content_type,
                                    content_uuid=content_uuid,
                                    client_id=client_id,
                                    bytes_sent=bytes_sent - bytes_reported
                                )
                                bytes_reported = bytes_sent
logger.info(f"[{client_id}] Persistent stream completed normally: {bytes_sent} bytes sent")
# Stream completed normally - decrement counter
persistent_conn.decrement_active_streams()
decremented = True
except GeneratorExit:
# Client disconnected - decrement counter and schedule cleanup only if no active streams
logger.info(f"[{client_id}] Client disconnected - checking if cleanup should be scheduled")
persistent_conn.decrement_active_streams()
decremented = True
scheduled = persistent_conn.schedule_cleanup_if_not_streaming(delay_seconds=10)
if not scheduled:
logger.info(f"[{client_id}] Cleanup not scheduled - connection still has active streams")
except Exception as e:
logger.error(f"[{client_id}] Error in persistent stream: {e}")
# On error, decrement counter and cleanup the connection as it may be corrupted
persistent_conn.decrement_active_streams()
decremented = True
logger.info(f"[{client_id}] Cleaning up persistent connection due to error")
self.cleanup_persistent_connection(session_id)
yield b"Error: Stream interrupted"
finally:
# Safety net: only decrement if we haven't already
if not decremented:
logger.warning(f"[{client_id}] Stream generator exited without decrement - applying safety net")
persistent_conn.decrement_active_streams()
# This runs regardless of how the generator exits
logger.debug(f"[{client_id}] Stream generator finished")
# Create streaming response
response = StreamingHttpResponse(
streaming_content=stream_generator(),
content_type=connection_headers['content_type']
)
# Set status code based on range request
if range_header:
response.status_code = 206
logger.info(f"[{client_id}] Set response status to 206 for range request")
else:
response.status_code = 200
logger.info(f"[{client_id}] Set response status to 200 for full request")
# Set headers that VLC expects
response['Cache-Control'] = 'no-cache'
response['Pragma'] = 'no-cache'
response['X-Content-Type-Options'] = 'nosniff'
response['Connection'] = 'keep-alive'
response['Accept-Ranges'] = 'bytes'
# CRITICAL: Forward Content-Length from persistent connection
if connection_headers['content_length']:
response['Content-Length'] = connection_headers['content_length']
logger.info(f"[{client_id}] *** FORWARDED Content-Length: {connection_headers['content_length']} *** (VLC seeking enabled)")
else:
logger.warning(f"[{client_id}] *** NO Content-Length available *** (VLC seeking may not work)")
# Handle range requests - set Content-Range for partial responses
if range_header and connection_headers['content_length']:
try:
if 'bytes=' in range_header:
range_part = range_header.replace('bytes=', '')
if '-' in range_part:
start_byte, end_byte = range_part.split('-', 1)
start = int(start_byte) if start_byte else 0
end = int(end_byte) if end_byte else int(connection_headers['content_length']) - 1
total_size = int(connection_headers['content_length'])
content_range = f"bytes {start}-{end}/{total_size}"
response['Content-Range'] = content_range
logger.info(f"[{client_id}] Set Content-Range: {content_range}")
except Exception as e:
logger.warning(f"[{client_id}] Could not set Content-Range: {e}")
# Log response headers
logger.info(f"[{client_id}] PERSISTENT Response - Status: {response.status_code}, Content-Length: {response.get('Content-Length', 'MISSING')}")
if 'Content-Range' in response:
logger.info(f"[{client_id}] PERSISTENT Content-Range: {response['Content-Range']}")
# Log VLC seeking status
if response.status_code == 200:
if connection_headers['content_length']:
logger.info(f"[{client_id}] ✅ PERSISTENT VLC SEEKING: Full response with Content-Length - seeking should work!")
else:
logger.info(f"[{client_id}] ❌ PERSISTENT VLC SEEKING: Full response but no Content-Length - seeking won't work!")
elif response.status_code == 206:
logger.info(f"[{client_id}] ✅ PERSISTENT VLC SEEKING: Partial response - seeking is working!")
return response
except Exception as e:
logger.error(f"Error in persistent stream_content_with_session: {e}", exc_info=True)
# Cleanup persistent connection on error
if session_id in self._persistent_connections:
self._persistent_connections[session_id].cleanup()
del self._persistent_connections[session_id]
return HttpResponse(f"Streaming error: {str(e)}", status=500)
def _apply_timeshift_parameters(self, original_url, utc_start=None, utc_end=None, offset=None):
"""
Apply timeshift parameters to the stream URL
Args:
original_url: Original stream URL
utc_start: UTC start time (ISO format string)
utc_end: UTC end time (ISO format string)
offset: Offset in seconds
Returns:
Modified URL with timeshift parameters
"""
try:
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
parsed_url = urlparse(original_url)
query_params = parse_qs(parsed_url.query)
logger.debug(f"Original URL: {original_url}")
logger.debug(f"Original query params: {query_params}")
# Add timeshift parameters if provided
if utc_start:
# Support both utc_start and start parameter names
query_params['utc_start'] = [utc_start]
query_params['start'] = [utc_start] # Some providers use 'start'
logger.info(f"Added utc_start/start parameter: {utc_start}")
if utc_end:
# Support both utc_end and end parameter names
query_params['utc_end'] = [utc_end]
query_params['end'] = [utc_end] # Some providers use 'end'
logger.info(f"Added utc_end/end parameter: {utc_end}")
if offset:
try:
# Ensure offset is a valid number
offset_seconds = int(offset)
# Support multiple offset parameter names
query_params['offset'] = [str(offset_seconds)]
query_params['seek'] = [str(offset_seconds)] # Some providers use 'seek'
query_params['t'] = [str(offset_seconds)] # Some providers use 't'
logger.info(f"Added offset/seek/t parameter: {offset_seconds} seconds")
except (ValueError, TypeError):
logger.warning(f"Invalid offset value: {offset}, skipping")
# Handle special URL patterns for VOD providers
# Some providers embed timeshift info in the path rather than query params
path = parsed_url.path
# Check if this looks like an IPTV catchup URL pattern
catchup_pattern = r'/(\d{4}-\d{2}-\d{2})/(\d{2}-\d{2}-\d{2})'
if utc_start and re.search(catchup_pattern, path):
# Convert ISO format to provider-specific format if needed
try:
from datetime import datetime
start_dt = datetime.fromisoformat(utc_start.replace('Z', '+00:00'))
date_part = start_dt.strftime('%Y-%m-%d')
time_part = start_dt.strftime('%H-%M-%S')
# Replace existing date/time in path
path = re.sub(catchup_pattern, f'/{date_part}/{time_part}', path)
logger.info(f"Modified path for catchup: {path}")
except Exception as e:
logger.warning(f"Could not parse timeshift date: {e}")
# Reconstruct URL with new parameters
new_query = urlencode(query_params, doseq=True)
modified_url = urlunparse((
parsed_url.scheme,
parsed_url.netloc,
path, # Use potentially modified path
parsed_url.params,
new_query,
parsed_url.fragment
))
logger.info(f"Modified URL: {modified_url}")
return modified_url
except Exception as e:
logger.error(f"Error applying timeshift parameters: {e}")
return original_url
def cleanup_persistent_connection(self, session_id: str):
"""Clean up a specific persistent connection"""
if session_id in self._persistent_connections:
logger.info(f"[{session_id}] Cleaning up persistent connection")
self._persistent_connections[session_id].cleanup()
del self._persistent_connections[session_id]
# Clean up ALL Redis keys associated with this session
session_key = f"vod_session:{session_id}"
if self.redis_client:
try:
session_data = self.redis_client.hgetall(session_key)
if session_data:
# Get session details for connection cleanup
content_type = session_data.get(b'content_type', b'').decode('utf-8')
content_uuid = session_data.get(b'content_uuid', b'').decode('utf-8')
profile_id = session_data.get(b'profile_id')
# Generate client_id from session_id (matches what's used during streaming)
client_id = session_id
# Remove individual connection tracking keys created during streaming
if content_type and content_uuid:
logger.info(f"[{session_id}] Cleaning up connection tracking keys")
self.remove_connection(content_type, content_uuid, client_id)
# Remove from profile connections if counted (additional safety check)
if session_data.get(b'connection_counted') == b'True' and profile_id:
profile_key = self._get_profile_connections_key(int(profile_id.decode('utf-8')))
current_count = int(self.redis_client.get(profile_key) or 0)
if current_count > 0:
self.redis_client.decr(profile_key)
logger.info(f"[{session_id}] Decremented profile {profile_id.decode('utf-8')} connections")
# Remove session tracking key
self.redis_client.delete(session_key)
logger.info(f"[{session_id}] Removed session tracking")
# Clean up any additional session-related keys (pattern cleanup)
try:
# Look for any other keys that might be related to this session
pattern = f"*{session_id}*"
cursor = 0
session_related_keys = []
while True:
cursor, keys = self.redis_client.scan(cursor, match=pattern, count=100)
session_related_keys.extend(keys)
if cursor == 0:
break
if session_related_keys:
# Filter out keys we already deleted
remaining_keys = [k for k in session_related_keys if k.decode('utf-8') != session_key]
if remaining_keys:
self.redis_client.delete(*remaining_keys)
logger.info(f"[{session_id}] Cleaned up {len(remaining_keys)} additional session-related keys")
except Exception as scan_error:
logger.warning(f"[{session_id}] Error during pattern cleanup: {scan_error}")
except Exception as e:
logger.error(f"[{session_id}] Error cleaning up session: {e}")
def cleanup_stale_persistent_connections(self, max_age_seconds: int = 1800):
"""Clean up stale persistent connections that haven't been used recently"""
current_time = time.time()
stale_sessions = []
        # Iterate over a snapshot so concurrent cleanup can't invalidate the iterator
        for session_id, conn in list(self._persistent_connections.items()):
try:
# Check connection's last activity time first
if hasattr(conn, 'last_activity'):
time_since_last_activity = current_time - conn.last_activity
if time_since_last_activity > max_age_seconds:
logger.info(f"[{session_id}] Connection inactive for {time_since_last_activity:.1f}s (max: {max_age_seconds}s)")
stale_sessions.append(session_id)
continue
# Fallback to Redis session data if connection doesn't have last_activity
session_key = f"vod_session:{session_id}"
if self.redis_client:
session_data = self.redis_client.hgetall(session_key)
if session_data:
created_at = float(session_data.get(b'created_at', b'0').decode('utf-8'))
if current_time - created_at > max_age_seconds:
logger.info(f"[{session_id}] Session older than {max_age_seconds}s")
stale_sessions.append(session_id)
else:
# Session data missing, connection is stale
logger.info(f"[{session_id}] Session data missing from Redis")
stale_sessions.append(session_id)
except Exception as e:
logger.error(f"[{session_id}] Error checking session age: {e}")
stale_sessions.append(session_id)
# Clean up stale connections
for session_id in stale_sessions:
logger.info(f"[{session_id}] Cleaning up stale persistent connection")
self.cleanup_persistent_connection(session_id)
if stale_sessions:
logger.info(f"Cleaned up {len(stale_sessions)} stale persistent connections")
else:
logger.debug(f"No stale persistent connections found (checked {len(self._persistent_connections)} connections)")
# Global accessor - delegates to the class-level singleton so module-level
# callers and internal callers (e.g. the delayed-cleanup timer) share one instance
def get_connection_manager() -> VODConnectionManager:
    """Get the global VOD connection manager instance"""
    return VODConnectionManager.get_instance()
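# Typical usage from a proxy view (illustrative sketch; the surrounding
# names are assumptions, not part of this module):
#
#   manager = get_connection_manager()
#   return manager.stream_content_with_session(
#       session_id, movie, stream_url, m3u_profile,
#       client_ip, user_agent, request,
#       range_header=request.META.get("HTTP_RANGE"),
#   )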