Track VOD connections in Redis.

This commit is contained in:
SergeantPanda 2025-08-05 21:24:41 -05:00
parent 5e7987ce1a
commit d18817acb0
5 changed files with 362 additions and 35 deletions

View file

@ -10,6 +10,7 @@ import gc # Add import for garbage collection
from core.utils import RedisClient
from apps.proxy.ts_proxy.channel_status import ChannelStatus
from core.utils import send_websocket_update
from apps.proxy.vod_proxy.connection_manager import get_connection_manager
logger = logging.getLogger(__name__)
@ -59,3 +60,13 @@ def fetch_channel_stats():
# Explicitly clean up large data structures
all_channels = None
gc.collect()
@shared_task
def cleanup_vod_connections():
"""Clean up stale VOD connections"""
try:
connection_manager = get_connection_manager()
connection_manager.cleanup_stale_connections(max_age_seconds=3600) # 1 hour
logger.info("VOD connection cleanup completed")
except Exception as e:
logger.error(f"Error in VOD connection cleanup: {e}", exc_info=True)

View file

@ -5,4 +5,5 @@ app_name = 'proxy'
urlpatterns = [
path('ts/', include('apps.proxy.ts_proxy.urls')),
path('hls/', include('apps.proxy.hls_proxy.urls')),
path('vod/', include('apps.proxy.vod_proxy.urls')),
]

View file

@ -0,0 +1,306 @@
"""
VOD Connection Manager - Redis-based connection tracking for VOD streams
"""
import time
import json
import logging
import threading
from typing import Optional, Dict, Any
from core.utils import RedisClient
from apps.vod.models import Movie, Episode
from apps.m3u.models import M3UAccountProfile
logger = logging.getLogger("vod_proxy")
class VODConnectionManager:
"""Manages VOD connections using Redis for tracking"""
def __init__(self):
self.redis_client = RedisClient.get_client()
self.connection_ttl = 3600 # 1 hour TTL for connections
def _get_connection_key(self, content_type: str, content_uuid: str, client_id: str) -> str:
"""Get Redis key for a specific connection"""
return f"vod_proxy:connection:{content_type}:{content_uuid}:{client_id}"
def _get_profile_connections_key(self, profile_id: int) -> str:
"""Get Redis key for tracking connections per profile"""
return f"vod_proxy:profile:{profile_id}:connections"
def _get_content_connections_key(self, content_type: str, content_uuid: str) -> str:
"""Get Redis key for tracking connections per content"""
return f"vod_proxy:content:{content_type}:{content_uuid}:connections"
def create_connection(self, content_type: str, content_uuid: str, content_name: str,
client_id: str, client_ip: str, user_agent: str,
m3u_profile: M3UAccountProfile) -> bool:
"""
Create a new VOD connection with profile limit checking
Returns:
bool: True if connection was created, False if profile limit exceeded
"""
if not self.redis_client:
logger.error("Redis client not available for VOD connection tracking")
return False
try:
# Check profile connection limits
if not self._check_profile_limits(m3u_profile):
logger.warning(f"Profile {m3u_profile.name} connection limit exceeded")
return False
connection_key = self._get_connection_key(content_type, content_uuid, client_id)
profile_connections_key = self._get_profile_connections_key(m3u_profile.id)
content_connections_key = self._get_content_connections_key(content_type, content_uuid)
# Connection data
connection_data = {
"content_type": content_type,
"content_uuid": content_uuid,
"content_name": content_name,
"client_id": client_id,
"client_ip": client_ip,
"user_agent": user_agent,
"m3u_profile_id": m3u_profile.id,
"m3u_profile_name": m3u_profile.name,
"connected_at": str(time.time()),
"last_activity": str(time.time()),
"bytes_sent": "0",
"position_seconds": "0"
}
# Use pipeline for atomic operations
pipe = self.redis_client.pipeline()
# Store connection data
pipe.hset(connection_key, mapping=connection_data)
pipe.expire(connection_key, self.connection_ttl)
# Add to profile connections set
pipe.sadd(profile_connections_key, client_id)
pipe.expire(profile_connections_key, self.connection_ttl)
# Add to content connections set
pipe.sadd(content_connections_key, client_id)
pipe.expire(content_connections_key, self.connection_ttl)
# Execute all operations
pipe.execute()
logger.info(f"Created VOD connection: {client_id} for {content_type} {content_name}")
return True
except Exception as e:
logger.error(f"Error creating VOD connection: {e}")
return False
def _check_profile_limits(self, m3u_profile: M3UAccountProfile) -> bool:
"""Check if profile has available connection slots"""
if m3u_profile.max_streams == 0: # Unlimited
return True
try:
profile_connections_key = self._get_profile_connections_key(m3u_profile.id)
current_connections = self.redis_client.scard(profile_connections_key) or 0
return current_connections < m3u_profile.max_streams
except Exception as e:
logger.error(f"Error checking profile limits: {e}")
return False
def update_connection_activity(self, content_type: str, content_uuid: str,
client_id: str, bytes_sent: int = 0,
position_seconds: int = 0) -> bool:
"""Update connection activity"""
if not self.redis_client:
return False
try:
connection_key = self._get_connection_key(content_type, content_uuid, client_id)
update_data = {
"last_activity": str(time.time())
}
if bytes_sent > 0:
# Get current bytes and add to it
current_bytes = self.redis_client.hget(connection_key, "bytes_sent")
if current_bytes:
total_bytes = int(current_bytes.decode('utf-8')) + bytes_sent
else:
total_bytes = bytes_sent
update_data["bytes_sent"] = str(total_bytes)
if position_seconds > 0:
update_data["position_seconds"] = str(position_seconds)
# Update connection data
self.redis_client.hset(connection_key, mapping=update_data)
self.redis_client.expire(connection_key, self.connection_ttl)
return True
except Exception as e:
logger.error(f"Error updating connection activity: {e}")
return False
def remove_connection(self, content_type: str, content_uuid: str, client_id: str) -> bool:
"""Remove a VOD connection"""
if not self.redis_client:
return False
try:
connection_key = self._get_connection_key(content_type, content_uuid, client_id)
# Get connection data before removing
connection_data = self.redis_client.hgetall(connection_key)
if not connection_data:
return True # Already removed
# Get profile ID for cleanup
profile_id = None
if b"m3u_profile_id" in connection_data:
try:
profile_id = int(connection_data[b"m3u_profile_id"].decode('utf-8'))
except ValueError:
pass
# Use pipeline for atomic cleanup
pipe = self.redis_client.pipeline()
# Remove connection data
pipe.delete(connection_key)
# Remove from profile connections set
if profile_id:
profile_connections_key = self._get_profile_connections_key(profile_id)
pipe.srem(profile_connections_key, client_id)
# Remove from content connections set
content_connections_key = self._get_content_connections_key(content_type, content_uuid)
pipe.srem(content_connections_key, client_id)
# Execute cleanup
pipe.execute()
logger.info(f"Removed VOD connection: {client_id}")
return True
except Exception as e:
logger.error(f"Error removing connection: {e}")
return False
def get_connection_info(self, content_type: str, content_uuid: str, client_id: str) -> Optional[Dict[str, Any]]:
"""Get connection information"""
if not self.redis_client:
return None
try:
connection_key = self._get_connection_key(content_type, content_uuid, client_id)
connection_data = self.redis_client.hgetall(connection_key)
if not connection_data:
return None
# Convert bytes to strings and parse numbers
info = {}
for key, value in connection_data.items():
key_str = key.decode('utf-8')
value_str = value.decode('utf-8')
# Parse numeric fields
if key_str in ['connected_at', 'last_activity']:
info[key_str] = float(value_str)
elif key_str in ['bytes_sent', 'position_seconds', 'm3u_profile_id']:
info[key_str] = int(value_str)
else:
info[key_str] = value_str
return info
except Exception as e:
logger.error(f"Error getting connection info: {e}")
return None
def get_profile_connections(self, profile_id: int) -> int:
"""Get current connection count for a profile"""
if not self.redis_client:
return 0
try:
profile_connections_key = self._get_profile_connections_key(profile_id)
return self.redis_client.scard(profile_connections_key) or 0
except Exception as e:
logger.error(f"Error getting profile connections: {e}")
return 0
def get_content_connections(self, content_type: str, content_uuid: str) -> int:
"""Get current connection count for content"""
if not self.redis_client:
return 0
try:
content_connections_key = self._get_content_connections_key(content_type, content_uuid)
return self.redis_client.scard(content_connections_key) or 0
except Exception as e:
logger.error(f"Error getting content connections: {e}")
return 0
def cleanup_stale_connections(self, max_age_seconds: int = 3600):
"""Clean up stale connections that haven't been active recently"""
if not self.redis_client:
return
try:
pattern = "vod_proxy:connection:*"
cursor = 0
cleaned = 0
current_time = time.time()
while True:
cursor, keys = self.redis_client.scan(cursor, match=pattern, count=100)
for key in keys:
try:
key_str = key.decode('utf-8')
last_activity = self.redis_client.hget(key, "last_activity")
if last_activity:
last_activity_time = float(last_activity.decode('utf-8'))
if current_time - last_activity_time > max_age_seconds:
# Extract info for cleanup
parts = key_str.split(':')
if len(parts) >= 5:
content_type = parts[2]
content_uuid = parts[3]
client_id = parts[4]
self.remove_connection(content_type, content_uuid, client_id)
cleaned += 1
except Exception as e:
logger.error(f"Error processing key {key}: {e}")
if cursor == 0:
break
if cleaned > 0:
logger.info(f"Cleaned up {cleaned} stale VOD connections")
except Exception as e:
logger.error(f"Error during connection cleanup: {e}")
# Global instance
_connection_manager = None
def get_connection_manager() -> VODConnectionManager:
"""Get the global VOD connection manager instance"""
global _connection_manager
if _connection_manager is None:
_connection_manager = VODConnectionManager()
return _connection_manager

View file

@ -5,13 +5,12 @@ import requests
from django.http import StreamingHttpResponse, JsonResponse
from django.shortcuts import get_object_or_404
from django.views.decorators.csrf import csrf_exempt
from django.contrib.contenttypes.models import ContentType
from rest_framework.decorators import api_view
from apps.vod.models import Movie, Episode, VODConnection
from apps.m3u.models import M3UAccountProfile
from apps.vod.models import Movie, Episode
from dispatcharr.utils import network_access_allowed, get_client_ip
from core.models import UserAgent, CoreSettings
from .connection_manager import get_connection_manager
logger = logging.getLogger(__name__)
@ -47,12 +46,15 @@ def _stream_content(request, model_class, content_uuid, content_type_name):
logger.info(f"[{client_id}] VOD stream request for: {content.name}")
try:
# Get connection manager
connection_manager = get_connection_manager()
# Get available M3U profile for connection management
m3u_account = content.m3u_account
available_profile = None
for profile in m3u_account.profiles.filter(is_active=True):
current_connections = VODConnection.objects.filter(m3u_profile=profile).count()
current_connections = connection_manager.get_profile_connections(profile.id)
if profile.max_streams == 0 or current_connections < profile.max_streams:
available_profile = profile
break
@ -63,17 +65,23 @@ def _stream_content(request, model_class, content_uuid, content_type_name):
status=503
)
# Create connection tracking record using generic foreign key
content_type = ContentType.objects.get_for_model(content)
connection = VODConnection.objects.create(
content_type=content_type,
object_id=content.id,
m3u_profile=available_profile,
# Create connection tracking record in Redis
connection_created = connection_manager.create_connection(
content_type=content_type_name,
content_uuid=str(content_uuid),
content_name=content.name,
client_id=client_id,
client_ip=client_ip,
user_agent=client_user_agent
user_agent=client_user_agent,
m3u_profile=available_profile
)
if not connection_created:
return JsonResponse(
{"error": "Failed to create connection tracking"},
status=503
)
# Get user agent for upstream request
try:
user_agent_obj = m3u_account.get_user_agent()
@ -105,7 +113,7 @@ def _stream_content(request, model_class, content_uuid, content_type_name):
if response.status_code not in [200, 206]:
logger.error(f"[{client_id}] Upstream error: {response.status_code}")
connection.delete()
connection_manager.remove_connection(content_type_name, str(content_uuid), client_id)
return JsonResponse(
{"error": f"Upstream server error: {response.status_code}"},
status=response.status_code
@ -127,21 +135,19 @@ def _stream_content(request, model_class, content_uuid, content_type_name):
# Update connection activity periodically
if bytes_sent % (8192 * 10) == 0: # Every ~80KB
try:
connection.update_activity(bytes_sent=len(chunk))
except VODConnection.DoesNotExist:
# Connection was cleaned up, stop streaming
break
connection_manager.update_connection_activity(
content_type_name,
str(content_uuid),
client_id,
bytes_sent=len(chunk)
)
except Exception as e:
logger.error(f"[{client_id}] Streaming error: {e}")
finally:
# Clean up connection when streaming ends
try:
connection.delete()
logger.info(f"[{client_id}] Connection cleaned up")
except VODConnection.DoesNotExist:
pass
connection_manager.remove_connection(content_type_name, str(content_uuid), client_id)
logger.info(f"[{client_id}] Connection cleaned up")
# Build response with appropriate headers
streaming_response = StreamingHttpResponse(
@ -166,7 +172,7 @@ def _stream_content(request, model_class, content_uuid, content_type_name):
except requests.RequestException as e:
logger.error(f"[{client_id}] Request error: {e}")
connection.delete()
connection_manager.remove_connection(content_type_name, str(content_uuid), client_id)
return JsonResponse(
{"error": "Failed to connect to upstream server"},
status=502
@ -184,17 +190,17 @@ def _stream_content(request, model_class, content_uuid, content_type_name):
@api_view(["POST"])
def update_movie_position(request, movie_uuid):
"""Update playback position for a movie"""
return _update_position(request, Movie, movie_uuid)
return _update_position(request, Movie, movie_uuid, "movie")
@csrf_exempt
@api_view(["POST"])
def update_episode_position(request, episode_uuid):
"""Update playback position for an episode"""
return _update_position(request, Episode, episode_uuid)
return _update_position(request, Episode, episode_uuid, "episode")
def _update_position(request, model_class, content_uuid):
def _update_position(request, model_class, content_uuid, content_type_name):
"""Generic function to update playback position"""
if not network_access_allowed(request, "STREAMS"):
@ -208,18 +214,21 @@ def _update_position(request, model_class, content_uuid):
try:
content = get_object_or_404(model_class, uuid=content_uuid)
content_type = ContentType.objects.get_for_model(content)
connection = VODConnection.objects.get(
content_type=content_type,
object_id=content.id,
client_id=client_id
connection_manager = get_connection_manager()
# Update position in Redis
success = connection_manager.update_connection_activity(
content_type_name,
str(content_uuid),
client_id,
position_seconds=position
)
connection.update_activity(position=position)
if not success:
return JsonResponse({"error": "Connection not found"}, status=404)
return JsonResponse({"status": "success"})
except VODConnection.DoesNotExist:
return JsonResponse({"error": "Connection not found"}, status=404)
except Exception as e:
logger.error(f"Position update error: {e}")
return JsonResponse({"error": "Internal server error"}, status=500)

View file

@ -66,7 +66,7 @@ urlpatterns = [
# Optionally, serve the raw Swagger JSON
path("swagger.json", schema_view.without_ui(cache_timeout=0), name="schema-json"),
path("proxy/vod/", include("apps.proxy.vod_proxy.urls")),
# VOD proxy is now handled by the main proxy URLs above
# Catch-all routes should always be last
path("", TemplateView.as_view(template_name="index.html")), # React entry point
path("<path:unused_path>", TemplateView.as_view(template_name="index.html")),