Refactor stats and vod proxy

This commit is contained in:
SergeantPanda 2025-09-05 19:30:13 -05:00
parent 4712a4305c
commit 1080b1fb94
6 changed files with 781 additions and 110 deletions

View file

@ -9,12 +9,3 @@ class ChannelsConfig(AppConfig):
def ready(self):
# Import signals so they get registered.
import apps.channels.signals
# Kick off DVR recovery shortly after startup (idempotent via Redis lock)
try:
from .tasks import recover_recordings_on_startup
# Schedule with a short delay to allow migrations/DB readiness
recover_recordings_on_startup.apply_async(countdown=5)
except Exception:
# Avoid hard failures at startup if Celery isn't ready yet
pass

View file

@ -25,7 +25,13 @@ class SerializableConnectionState:
def __init__(self, session_id: str, stream_url: str, headers: dict,
content_length: str = None, content_type: str = 'video/mp4',
final_url: str = None, m3u_profile_id: int = None):
final_url: str = None, m3u_profile_id: int = None,
# Session metadata fields (previously stored in vod_session key)
content_obj_type: str = None, content_uuid: str = None,
content_name: str = None, client_ip: str = None,
user_agent: str = None, utc_start: str = None,
utc_end: str = None, offset: str = None,
worker_id: str = None, connection_type: str = "redis_backed"):
self.session_id = session_id
self.stream_url = stream_url
self.headers = headers
@ -37,6 +43,23 @@ class SerializableConnectionState:
self.request_count = 0
self.active_streams = 0
# Session metadata (consolidated from vod_session key)
self.content_obj_type = content_obj_type
self.content_uuid = content_uuid
self.content_name = content_name
self.client_ip = client_ip
self.user_agent = user_agent
self.utc_start = utc_start or ""
self.utc_end = utc_end or ""
self.offset = offset or ""
self.worker_id = worker_id
self.connection_type = connection_type
self.created_at = time.time()
# Additional tracking fields
self.bytes_sent = 0
self.position_seconds = 0
def to_dict(self):
"""Convert to dictionary for Redis storage"""
return {
@ -49,7 +72,22 @@ class SerializableConnectionState:
'm3u_profile_id': str(self.m3u_profile_id) if self.m3u_profile_id is not None else '',
'last_activity': str(self.last_activity),
'request_count': str(self.request_count),
'active_streams': str(self.active_streams)
'active_streams': str(self.active_streams),
# Session metadata
'content_obj_type': self.content_obj_type or '',
'content_uuid': self.content_uuid or '',
'content_name': self.content_name or '',
'client_ip': self.client_ip or '',
'user_agent': self.user_agent or '',
'utc_start': self.utc_start or '',
'utc_end': self.utc_end or '',
'offset': self.offset or '',
'worker_id': self.worker_id or '',
'connection_type': self.connection_type or 'redis_backed',
'created_at': str(self.created_at),
# Additional tracking fields
'bytes_sent': str(self.bytes_sent),
'position_seconds': str(self.position_seconds)
}
@classmethod
@ -62,11 +100,26 @@ class SerializableConnectionState:
content_length=data.get('content_length') if data.get('content_length') else None,
content_type=data.get('content_type', 'video/mp4'),
final_url=data.get('final_url') if data.get('final_url') else None,
m3u_profile_id=int(data.get('m3u_profile_id')) if data.get('m3u_profile_id') else None
m3u_profile_id=int(data.get('m3u_profile_id')) if data.get('m3u_profile_id') else None,
# Session metadata
content_obj_type=data.get('content_obj_type') or None,
content_uuid=data.get('content_uuid') or None,
content_name=data.get('content_name') or None,
client_ip=data.get('client_ip') or None,
user_agent=data.get('user_agent') or None,
utc_start=data.get('utc_start') or '',
utc_end=data.get('utc_end') or '',
offset=data.get('offset') or '',
worker_id=data.get('worker_id') or None,
connection_type=data.get('connection_type', 'redis_backed')
)
obj.last_activity = float(data.get('last_activity', time.time()))
obj.request_count = int(data.get('request_count', 0))
obj.active_streams = int(data.get('active_streams', 0))
obj.created_at = float(data.get('created_at', time.time()))
# Additional tracking fields
obj.bytes_sent = int(data.get('bytes_sent', 0))
obj.position_seconds = int(data.get('position_seconds', 0))
return obj
@ -108,7 +161,7 @@ class RedisBackedVODConnection:
try:
data = state.to_dict()
# Log the data being saved for debugging
logger.debug(f"[{self.session_id}] Saving connection state: {data}")
logger.trace(f"[{self.session_id}] Saving connection state: {data}")
# Verify all values are valid for Redis
for key, value in data.items():
@ -144,8 +197,14 @@ class RedisBackedVODConnection:
except Exception as e:
logger.error(f"[{self.session_id}] Error releasing lock: {e}")
def create_connection(self, stream_url: str, headers: dict, m3u_profile_id: int = None) -> bool:
"""Create a new connection state in Redis"""
def create_connection(self, stream_url: str, headers: dict, m3u_profile_id: int = None,
# Session metadata (consolidated from vod_session key)
content_obj_type: str = None, content_uuid: str = None,
content_name: str = None, client_ip: str = None,
user_agent: str = None, utc_start: str = None,
utc_end: str = None, offset: str = None,
worker_id: str = None) -> bool:
"""Create a new connection state in Redis with consolidated session metadata"""
if not self._acquire_lock():
logger.warning(f"[{self.session_id}] Could not acquire lock for connection creation")
return False
@ -157,12 +216,27 @@ class RedisBackedVODConnection:
logger.info(f"[{self.session_id}] Connection already exists in Redis")
return True
# Create new connection state
state = SerializableConnectionState(self.session_id, stream_url, headers, m3u_profile_id=m3u_profile_id)
# Create new connection state with consolidated session metadata
state = SerializableConnectionState(
session_id=self.session_id,
stream_url=stream_url,
headers=headers,
m3u_profile_id=m3u_profile_id,
# Session metadata
content_obj_type=content_obj_type,
content_uuid=content_uuid,
content_name=content_name,
client_ip=client_ip,
user_agent=user_agent,
utc_start=utc_start,
utc_end=utc_end,
offset=offset,
worker_id=worker_id
)
success = self._save_connection_state(state)
if success:
logger.info(f"[{self.session_id}] Created new connection state in Redis")
logger.info(f"[{self.session_id}] Created new connection state in Redis with consolidated session metadata")
return success
finally:
@ -325,6 +399,31 @@ class RedisBackedVODConnection:
}
return {}
def get_session_metadata(self):
    """Return the consolidated session metadata for this connection.

    Reads the connection state from Redis via ``_get_connection_state`` and
    flattens the relevant attributes into a plain dict. Returns an empty
    dict when no state exists for this session.
    """
    state = self._get_connection_state()
    if not state:
        return {}
    # Flatten the state object attribute-by-attribute; keys intentionally
    # match the attribute names so callers can treat this as a snapshot.
    field_names = (
        'content_obj_type', 'content_uuid', 'content_name', 'client_ip',
        'user_agent', 'utc_start', 'utc_end', 'offset', 'worker_id',
        'connection_type', 'created_at', 'last_activity', 'm3u_profile_id',
        'bytes_sent', 'position_seconds', 'active_streams', 'request_count',
    )
    return {name: getattr(state, name) for name in field_names}
def cleanup(self, connection_manager=None):
"""Clean up local resources and Redis state"""
# Get connection state before cleanup to handle profile decrementing
@ -340,44 +439,19 @@ class RedisBackedVODConnection:
# Remove from Redis
if self.redis_client:
try:
# Get session information for cleanup
session_key = f"vod_session:{self.session_id}"
session_data = self.redis_client.hgetall(session_key)
# Convert bytes to strings if needed
if session_data and isinstance(list(session_data.keys())[0], bytes):
session_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in session_data.items()}
# Use pipeline for atomic cleanup operations
pipe = self.redis_client.pipeline()
# 1. Remove main connection state
# 1. Remove main connection state (now contains consolidated data)
pipe.delete(self.connection_key)
# 2. Remove distributed lock
pipe.delete(self.lock_key)
# 3. Remove session tracking
pipe.delete(session_key)
# 4. Clean up legacy vod_proxy connection keys if session data exists
if session_data:
content_type = session_data.get('content_type')
content_uuid = session_data.get('content_uuid')
if content_type and content_uuid:
# Remove from vod_proxy connection tracking
vod_proxy_connection_key = f"vod_proxy:connection:{content_type}:{content_uuid}:{self.session_id}"
pipe.delete(vod_proxy_connection_key)
# Remove from content connections set
content_connections_key = f"vod_proxy:content:{content_type}:{content_uuid}:connections"
pipe.srem(content_connections_key, self.session_id)
# Execute all cleanup operations
pipe.execute()
logger.info(f"[{self.session_id}] Cleaned up all Redis keys (connection, session, locks)")
logger.info(f"[{self.session_id}] Cleaned up all Redis keys (consolidated connection state, locks)")
# Decrement profile connections if we have the state and connection manager
if state and state.m3u_profile_id and connection_manager:
@ -479,8 +553,28 @@ class MultiWorkerVODConnectionManager:
logger.info(f"[{client_id}] Worker {self.worker_id} - Redis-backed streaming request for {content_type} {content_name}")
try:
# First, try to find an existing idle session that matches our criteria
matching_session_id = self.find_matching_idle_session(
content_type=content_type,
content_uuid=content_uuid,
client_ip=client_ip,
user_agent=user_agent,
utc_start=utc_start,
utc_end=utc_end,
offset=offset
)
# Use matching session if found, otherwise use the provided session_id
if matching_session_id:
logger.info(f"[{client_id}] Worker {self.worker_id} - Found matching idle session: {matching_session_id}")
effective_session_id = matching_session_id
client_id = matching_session_id # Update client_id for logging consistency
else:
logger.info(f"[{client_id}] Worker {self.worker_id} - No matching idle session found, using new session")
effective_session_id = session_id
# Create Redis-backed connection
redis_connection = RedisBackedVODConnection(session_id, self.redis_client)
redis_connection = RedisBackedVODConnection(effective_session_id, self.redis_client)
# Check if connection exists, create if not
existing_state = redis_connection._get_connection_state()
@ -515,48 +609,42 @@ class MultiWorkerVODConnectionManager:
# Add worker identification
headers['X-Worker-ID'] = self.worker_id
# Create connection state in Redis
if not redis_connection.create_connection(modified_stream_url, headers, m3u_profile.id):
# Create connection state in Redis with consolidated session metadata
if not redis_connection.create_connection(
stream_url=modified_stream_url,
headers=headers,
m3u_profile_id=m3u_profile.id,
# Session metadata (consolidated from separate vod_session key)
content_obj_type=content_type,
content_uuid=content_uuid,
content_name=content_name,
client_ip=client_ip,
user_agent=user_agent,
utc_start=utc_start,
utc_end=utc_end,
offset=str(offset) if offset else None,
worker_id=self.worker_id
):
logger.error(f"[{client_id}] Worker {self.worker_id} - Failed to create Redis connection")
return HttpResponse("Failed to create connection", status=500)
# Increment profile connections after successful connection creation
self._increment_profile_connections(m3u_profile)
# Create session tracking
session_info = {
"content_type": content_type,
"content_uuid": content_uuid,
"content_name": content_name,
"created_at": str(time.time()),
"last_activity": str(time.time()),
"profile_id": str(m3u_profile.id),
"client_ip": client_ip,
"user_agent": user_agent,
"utc_start": utc_start or "",
"utc_end": utc_end or "",
"offset": str(offset) if offset else "",
"worker_id": self.worker_id, # Track which worker created this
"connection_type": "redis_backed"
}
session_key = f"vod_session:{session_id}"
if self.redis_client:
self.redis_client.hset(session_key, mapping=session_info)
self.redis_client.expire(session_key, self.session_ttl)
logger.info(f"[{client_id}] Worker {self.worker_id} - Created session: {session_info}")
logger.info(f"[{client_id}] Worker {self.worker_id} - Created consolidated connection with session metadata")
else:
logger.info(f"[{client_id}] Worker {self.worker_id} - Using existing Redis-backed connection")
# Update session activity
session_key = f"vod_session:{session_id}"
if self.redis_client:
self.redis_client.hset(session_key, mapping={
"last_activity": str(time.time()),
"last_worker_id": self.worker_id # Track which worker last accessed this
})
self.redis_client.expire(session_key, self.session_ttl)
# Update session activity in consolidated connection state
if redis_connection._acquire_lock():
try:
state = redis_connection._get_connection_state()
if state:
state.last_activity = time.time()
state.worker_id = self.worker_id # Track which worker last accessed this
redis_connection._save_connection_state(state)
finally:
redis_connection._release_lock()
# Get stream from Redis-backed connection
upstream_response = redis_connection.get_stream(range_header)
@ -586,14 +674,19 @@ class MultiWorkerVODConnectionManager:
bytes_sent += len(chunk)
chunk_count += 1
# Update activity every 100 chunks
# Update activity every 100 chunks in consolidated connection state
if chunk_count % 100 == 0:
self.update_connection_activity(
content_type=content_type,
content_uuid=content_uuid,
client_id=client_id,
bytes_sent=len(chunk)
)
# Update the connection state
if redis_connection._acquire_lock():
try:
state = redis_connection._get_connection_state()
if state:
state.last_activity = time.time()
# Store cumulative bytes sent in connection state
state.bytes_sent = bytes_sent # Use cumulative bytes_sent, not chunk size
redis_connection._save_connection_state(state)
finally:
redis_connection._release_lock()
logger.info(f"[{client_id}] Worker {self.worker_id} - Redis-backed stream completed: {bytes_sent} bytes sent")
redis_connection.decrement_active_streams()
@ -925,13 +1018,13 @@ class MultiWorkerVODConnectionManager:
def find_matching_idle_session(self, content_type: str, content_uuid: str,
client_ip: str, user_agent: str,
utc_start=None, utc_end=None, offset=None) -> Optional[str]:
"""Find existing Redis-backed session that matches criteria"""
"""Find existing Redis-backed session that matches criteria using consolidated connection state"""
if not self.redis_client:
return None
try:
# Search for sessions with matching content
pattern = "vod_session:*"
# Search for connections with consolidated session data
pattern = "vod_persistent_connection:*"
cursor = 0
matching_sessions = []
@ -940,23 +1033,23 @@ class MultiWorkerVODConnectionManager:
for key in keys:
try:
session_data = self.redis_client.hgetall(key)
if not session_data:
connection_data = self.redis_client.hgetall(key)
if not connection_data:
continue
# Convert bytes keys/values to strings if needed
if isinstance(list(session_data.keys())[0], bytes):
session_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in session_data.items()}
if isinstance(list(connection_data.keys())[0], bytes):
connection_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in connection_data.items()}
# Check if content matches
stored_content_type = session_data.get('content_type', '')
stored_content_uuid = session_data.get('content_uuid', '')
# Check if content matches (using consolidated data)
stored_content_type = connection_data.get('content_obj_type', '')
stored_content_uuid = connection_data.get('content_uuid', '')
if stored_content_type != content_type or stored_content_uuid != content_uuid:
continue
# Extract session ID
session_id = key.decode('utf-8').replace('vod_session:', '')
session_id = key.decode('utf-8').replace('vod_persistent_connection:', '')
# Check if Redis-backed connection exists and has no active streams
redis_connection = RedisBackedVODConnection(session_id, self.redis_client)
@ -967,9 +1060,9 @@ class MultiWorkerVODConnectionManager:
score = 10 # Content match
match_reasons = ["content"]
# Check other criteria
stored_client_ip = session_data.get('client_ip', '')
stored_user_agent = session_data.get('user_agent', '')
# Check other criteria (using consolidated data)
stored_client_ip = connection_data.get('client_ip', '')
stored_user_agent = connection_data.get('user_agent', '')
if stored_client_ip and stored_client_ip == client_ip:
score += 5
@ -979,10 +1072,10 @@ class MultiWorkerVODConnectionManager:
score += 3
match_reasons.append("user-agent")
# Check timeshift parameters
stored_utc_start = session_data.get('utc_start', '')
stored_utc_end = session_data.get('utc_end', '')
stored_offset = session_data.get('offset', '')
# Check timeshift parameters (using consolidated data)
stored_utc_start = connection_data.get('utc_start', '')
stored_utc_end = connection_data.get('utc_end', '')
stored_offset = connection_data.get('offset', '')
current_utc_start = utc_start or ""
current_utc_end = utc_end or ""
@ -999,11 +1092,11 @@ class MultiWorkerVODConnectionManager:
'session_id': session_id,
'score': score,
'reasons': match_reasons,
'last_activity': float(session_data.get('last_activity', '0'))
'last_activity': float(connection_data.get('last_activity', '0'))
})
except Exception as e:
logger.debug(f"Error processing session key {key}: {e}")
logger.debug(f"Error processing connection key {key}: {e}")
continue
if cursor == 0:
@ -1023,3 +1116,15 @@ class MultiWorkerVODConnectionManager:
except Exception as e:
logger.error(f"Error finding matching idle session: {e}")
return None
def get_session_info(self, session_id: str) -> Optional[dict]:
    """Fetch consolidated session metadata for *session_id* (compatibility shim).

    Delegates to ``RedisBackedVODConnection.get_session_metadata``. Returns
    ``None`` when Redis is unavailable or the lookup fails; errors are
    logged rather than propagated.
    """
    if not self.redis_client:
        return None
    try:
        connection = RedisBackedVODConnection(session_id, self.redis_client)
        return connection.get_session_metadata()
    except Exception as e:
        logger.error(f"Error getting session info for {session_id}: {e}")
        return None

View file

@ -18,4 +18,7 @@ urlpatterns = [
# Position tracking
path('position/<uuid:content_id>/', views.VODPositionView.as_view(), name='vod_position'),
# VOD Stats
path('stats/', views.VODStatsView.as_view(), name='vod_stats'),
]

View file

@ -664,3 +664,190 @@ class VODPositionView(View):
except Exception as e:
logger.error(f"Error updating VOD position: {e}")
return JsonResponse({'error': str(e)}, status=500)
@method_decorator(csrf_exempt, name='dispatch')
class VODStatsView(View):
    """Report statistics about active VOD proxy connections.

    Scans the consolidated ``vod_persistent_connection:*`` Redis keys and
    returns the connections grouped by content as JSON.
    """

    def get(self, request):
        """Return current VOD connection statistics as JSON.

        Response shape: ``{'vod_connections': [...], 'total_connections': N,
        'timestamp': <epoch seconds>}``; 500 with an ``error`` key on failure.
        """
        try:
            connection_manager = MultiWorkerVODConnectionManager.get_instance()
            redis_client = connection_manager.redis_client
            if not redis_client:
                return JsonResponse({'error': 'Redis not available'}, status=500)

            pattern = "vod_persistent_connection:*"
            cursor = 0
            connections = []
            current_time = time.time()

            # SCAN guarantees full coverage without blocking Redis.
            while True:
                cursor, keys = redis_client.scan(cursor, match=pattern, count=100)
                for key in keys:
                    try:
                        key_str = key.decode('utf-8') if isinstance(key, bytes) else key
                        connection_data = redis_client.hgetall(key)
                        if not connection_data:
                            continue
                        # Session ID is the key suffix.
                        session_id = key_str.replace('vod_persistent_connection:', '')
                        # Decode byte keys/values from the Redis hash.
                        combined_data = {
                            (k.decode('utf-8') if isinstance(k, bytes) else k):
                            (v.decode('utf-8') if isinstance(v, bytes) else v)
                            for k, v in connection_data.items()
                        }
                        connections.append(
                            self._build_connection_info(session_id, combined_data, current_time)
                        )
                    except Exception as e:
                        logger.error(f"Error processing connection key {key}: {e}")
                if cursor == 0:
                    break

            # Group connections by content so the UI can render one card
            # per movie/episode with all of its viewers.
            content_stats = {}
            for conn in connections:
                content_key = f"{conn['content_type']}:{conn['content_uuid']}"
                if content_key not in content_stats:
                    content_stats[content_key] = {
                        'content_type': conn['content_type'],
                        'content_name': conn['content_name'],
                        'content_uuid': conn['content_uuid'],
                        'content_metadata': conn['content_metadata'],
                        'connection_count': 0,
                        'connections': []
                    }
                content_stats[content_key]['connection_count'] += 1
                content_stats[content_key]['connections'].append(conn)

            return JsonResponse({
                'vod_connections': list(content_stats.values()),
                'total_connections': len(connections),
                'timestamp': current_time
            })
        except Exception as e:
            logger.error(f"Error getting VOD stats: {e}")
            return JsonResponse({'error': str(e)}, status=500)

    def _build_connection_info(self, session_id, combined_data, current_time):
        """Assemble the stats dict for one connection from decoded hash data."""
        # BUG FIX: the consolidated state stores the content kind
        # ('movie'/'episode') under 'content_obj_type'; 'content_type' holds
        # the HTTP MIME type (e.g. 'video/mp4'), so reading it here made every
        # content lookup miss. Fall back to 'content_type' for legacy records.
        content_type = (combined_data.get('content_obj_type')
                        or combined_data.get('content_type', 'unknown'))
        content_uuid = combined_data.get('content_uuid', 'unknown')
        content_name, content_metadata = self._get_content_info(content_type, content_uuid)

        # Use m3u_profile_id for consistency ('profile_id' kept for legacy data).
        m3u_profile_id = combined_data.get('profile_id') or combined_data.get('m3u_profile_id')
        m3u_profile_info = self._get_profile_info(m3u_profile_id, combined_data)

        connection_info = {
            'content_type': content_type,
            'content_uuid': content_uuid,
            'content_name': content_name,
            'content_metadata': content_metadata,
            'm3u_profile': m3u_profile_info,
            'client_id': session_id,
            'client_ip': combined_data.get('client_ip', 'Unknown'),
            'user_agent': combined_data.get('user_agent', 'Unknown'),
            # BUG FIX: the consolidated state writes 'created_at', not
            # 'connected_at' — without this fallback the duration always
            # fell through to the client-id heuristic.
            'connected_at': combined_data.get('connected_at') or combined_data.get('created_at'),
            'last_activity': combined_data.get('last_activity'),
            'm3u_profile_id': m3u_profile_id
        }
        connection_info['duration'] = self._calculate_duration(connection_info, current_time)
        return connection_info

    def _get_content_info(self, content_type, content_uuid):
        """Resolve display name and metadata for a movie or episode.

        Returns ``("Unknown", {})`` when the content cannot be found.
        """
        try:
            if content_type == 'movie':
                content_obj = Movie.objects.select_related('logo').get(uuid=content_uuid)
                return content_obj.name, {
                    'year': content_obj.year,
                    'rating': content_obj.rating,
                    'genre': content_obj.genre,
                    'duration_secs': content_obj.duration_secs,
                    'description': content_obj.description,
                    'logo_url': content_obj.logo.url if content_obj.logo else None,
                    'tmdb_id': content_obj.tmdb_id,
                    'imdb_id': content_obj.imdb_id
                }
            if content_type == 'episode':
                content_obj = Episode.objects.select_related('series', 'series__logo').get(uuid=content_uuid)
                return f"{content_obj.series.name} - {content_obj.name}", {
                    'series_name': content_obj.series.name,
                    'episode_name': content_obj.name,
                    'season_number': content_obj.season_number,
                    'episode_number': content_obj.episode_number,
                    'air_date': content_obj.air_date.isoformat() if content_obj.air_date else None,
                    'rating': content_obj.rating,
                    'duration_secs': content_obj.duration_secs,
                    'description': content_obj.description,
                    'logo_url': content_obj.series.logo.url if content_obj.series.logo else None,
                    'series_year': content_obj.series.year,
                    'series_genre': content_obj.series.genre,
                    'tmdb_id': content_obj.tmdb_id,
                    'imdb_id': content_obj.imdb_id
                }
        except Exception:
            # Record missing or lookup failed; report the content as unknown.
            pass
        return "Unknown", {}

    def _get_profile_info(self, m3u_profile_id, combined_data):
        """Look up M3U profile details, falling back to stored hash fields."""
        if m3u_profile_id:
            try:
                from apps.m3u.models import M3UAccountProfile
                profile = M3UAccountProfile.objects.select_related('m3u_account').get(id=m3u_profile_id)
                return {
                    'profile_name': profile.name,
                    'account_name': profile.m3u_account.name,
                    'account_id': profile.m3u_account.id,
                    'max_streams': profile.m3u_account.max_streams,
                    'm3u_profile_id': int(m3u_profile_id)
                }
            except Exception as e:
                logger.warning(f"Could not fetch M3U profile {m3u_profile_id}: {e}")
        # Database lookup failed or no ID: try the data stored on the hash.
        if combined_data.get('m3u_profile_name') or combined_data.get('m3u_profile_id'):
            return {
                'profile_name': combined_data.get('m3u_profile_name', 'Unknown Profile'),
                'm3u_profile_id': combined_data.get('m3u_profile_id'),
                'account_name': 'Unknown Account'  # We don't store account name directly
            }
        return {}

    def _calculate_duration(self, connection_info, current_time):
        """Best-effort connection duration in whole seconds (0 when unknown)."""
        connected_at = connection_info.get('connected_at')
        if connected_at:
            try:
                return int(current_time - float(connected_at))
            except (TypeError, ValueError):
                pass
        # Fallback: client ids look like 'vod_<ms-timestamp>_<random>' —
        # derive the start time from the embedded millisecond timestamp.
        client_id = connection_info.get('client_id') or ''
        if connection_info.get('last_activity') and client_id.startswith('vod_'):
            try:
                parts = client_id.split('_')
                if len(parts) >= 2:
                    client_start_time = float(parts[1]) / 1000.0  # ms -> seconds
                    return int(current_time - client_start_time)
            except (TypeError, ValueError):
                pass
        return 0

View file

@ -1274,6 +1274,16 @@ export default class API {
}
}
static async getVODStats() {
try {
const response = await request(`${host}/proxy/vod/stats/`);
return response;
} catch (e) {
errorNotification('Failed to retrieve VOD stats', e);
}
}
static async stopChannel(id) {
try {
const response = await request(`${host}/proxy/ts/stop/${id}`, {

View file

@ -82,6 +82,316 @@ const getStartDate = (uptime) => {
});
};
// Create a VOD Card component similar to ChannelCard
/**
 * Card showing one VOD content item (movie or TV episode) and its active
 * proxy connections; layout mirrors ChannelCard.
 *
 * Props: `vodContent` — one grouped entry from the VOD stats endpoint:
 * { content_type, content_name, content_uuid, content_metadata,
 *   connection_count, connections[] }.
 */
const VODCard = ({ vodContent }) => {
const [dateFormatSetting] = useLocalStorage('date-format', 'mdy');
const dateFormat = dateFormatSetting === 'mdy' ? 'MM/DD' : 'DD/MM';
// Get metadata from the VOD content
const metadata = vodContent.content_metadata || {};
const contentType = vodContent.content_type;
const isMovie = contentType === 'movie';
const isEpisode = contentType === 'episode';
// Get poster/logo URL (falls back to the app logo when none is set)
const posterUrl = metadata.logo_url || logo;
// Format a duration in seconds as "Xh Ym" / "Ym"; 'Unknown' when absent
const formatDuration = (seconds) => {
if (!seconds) return 'Unknown';
const hours = Math.floor(seconds / 3600);
const minutes = Math.floor((seconds % 3600) / 60);
return hours > 0 ? `${hours}h ${minutes}m` : `${minutes}m`;
};
// Get display title: "Name (Year)" for movies, "Series - SxxExx" for episodes
const getDisplayTitle = () => {
if (isMovie) {
return metadata.year
? `${vodContent.content_name} (${metadata.year})`
: vodContent.content_name;
} else if (isEpisode) {
const season = metadata.season_number
? `S${metadata.season_number.toString().padStart(2, '0')}`
: 'S??';
const episode = metadata.episode_number
? `E${metadata.episode_number.toString().padStart(2, '0')}`
: 'E??';
return `${metadata.series_name} - ${season}${episode}`;
}
return vodContent.content_name;
};
// Get subtitle info: "genre • Rated X" for movies, episode name for episodes
const getSubtitle = () => {
if (isMovie) {
const parts = [];
if (metadata.genre) parts.push(metadata.genre);
if (metadata.rating) parts.push(`Rated ${metadata.rating}`);
return parts.join(' • ');
} else if (isEpisode) {
return metadata.episode_name || 'Episode';
}
return '';
};
// Humanized connection duration; falls back to parsing the timestamp
// embedded in client ids of the form 'vod_<ms-timestamp>_<random>'
const calculateConnectionDuration = (connection) => {
// If duration is provided by API, use it
if (connection.duration && connection.duration > 0) {
return dayjs.duration(connection.duration, 'seconds').humanize();
}
// Fallback: try to extract from client_id timestamp
if (connection.client_id && connection.client_id.startsWith('vod_')) {
try {
const parts = connection.client_id.split('_');
if (parts.length >= 2) {
const clientStartTime = parseInt(parts[1]) / 1000; // Convert ms to seconds
const currentTime = Date.now() / 1000;
const duration = currentTime - clientStartTime;
return dayjs.duration(duration, 'seconds').humanize();
}
} catch (e) {
// Ignore parsing errors
}
}
return 'Unknown duration';
};
// Formatted connection start time for the tooltip.
// NOTE(review): assumes connected_at is epoch *seconds* (multiplied by
// 1000 for dayjs) while client_id embeds epoch *milliseconds* — confirm
// against the stats API.
const getConnectionStartTime = (connection) => {
if (connection.connected_at) {
return dayjs(connection.connected_at * 1000).format(
`${dateFormat} HH:mm:ss`
);
}
// Fallback: calculate from client_id timestamp
if (connection.client_id && connection.client_id.startsWith('vod_')) {
try {
const parts = connection.client_id.split('_');
if (parts.length >= 2) {
const clientStartTime = parseInt(parts[1]);
return dayjs(clientStartTime).format(`${dateFormat} HH:mm:ss`);
}
} catch (e) {
// Ignore parsing errors
}
}
return 'Unknown';
};
return (
<Card
shadow="sm"
padding="md"
radius="md"
withBorder
style={{
color: '#fff',
backgroundColor: '#27272A',
maxWidth: '700px',
width: '100%',
}}
>
<Stack style={{ position: 'relative' }}>
{/* Header with poster and basic info */}
<Group justify="space-between">
<Box
style={{
width: '100px',
height: '50px',
display: 'flex',
alignItems: 'center',
justifyContent: 'center',
}}
>
<img
src={posterUrl}
style={{
maxWidth: '100%',
maxHeight: '100%',
objectFit: 'contain',
}}
alt="content poster"
/>
</Box>
<Group>
<Tooltip label="Content Duration">
<Center>
<Timer style={{ paddingRight: 5 }} />
{formatDuration(metadata.duration_secs)}
</Center>
</Tooltip>
</Group>
</Group>
{/* Title and type */}
<Flex justify="space-between" align="center">
<Group>
<Text fw={500}>{getDisplayTitle()}</Text>
</Group>
<Tooltip label="Content Type">
<Group gap={5}>
<Video size="18" />
{isMovie ? 'Movie' : 'TV Episode'}
</Group>
</Tooltip>
</Flex>
{/* Subtitle/episode info */}
{getSubtitle() && (
<Flex justify="flex-start" align="center" mt={-8}>
<Text size="sm" c="dimmed">
{getSubtitle()}
</Text>
</Flex>
)}
{/* Content information badges (type, year, rating, genre, season) */}
<Group gap="xs" mt="xs">
<Tooltip label="Content Type">
<Badge size="sm" variant="light" color={isMovie ? 'blue' : 'green'}>
{contentType.toUpperCase()}
</Badge>
</Tooltip>
{metadata.year && (
<Tooltip label="Release Year">
<Badge size="sm" variant="light" color="orange">
{metadata.year}
</Badge>
</Tooltip>
)}
{metadata.rating && (
<Tooltip label="Content Rating">
<Badge size="sm" variant="light" color="yellow">
{metadata.rating}
</Badge>
</Tooltip>
)}
{metadata.genre && (
<Tooltip label="Genre">
<Badge size="sm" variant="light" color="pink">
{metadata.genre}
</Badge>
</Tooltip>
)}
{isEpisode && metadata.season_number && (
<Tooltip label="Season Number">
<Badge size="sm" variant="light" color="cyan">
Season {metadata.season_number}
</Badge>
</Tooltip>
)}
</Group>
{/* Connection statistics (viewer count + VOD label) */}
<Group justify="space-between">
<Group gap={5}>
<Tooltip
label={`${vodContent.connection_count} active viewer${vodContent.connection_count !== 1 ? 's' : ''}`}
>
<Group gap={4} style={{ cursor: 'help' }}>
<Users size="18" />
<Text size="sm">{vodContent.connection_count}</Text>
</Group>
</Tooltip>
</Group>
<Tooltip label="VOD Content">
<Text size="sm" style={{ cursor: 'help' }}>
On Demand
</Text>
</Tooltip>
</Group>
{/* Connection details table: one row per active client connection */}
<Box mt="md">
<Text size="sm" fw={500} mb="xs">
Active Connections:
</Text>
<Stack gap="xs">
{vodContent.connections.map((connection, index) => (
<Box
key={`${connection.client_id}-${index}`}
p="xs"
style={{
backgroundColor: 'rgba(255, 255, 255, 0.05)',
borderRadius: '4px',
border: '1px solid #444',
}}
>
<Group justify="space-between" align="center">
<Group align="center" gap="xs">
<HardDriveDownload size={14} />
<Text size="xs">{connection.client_ip}</Text>
<Text size="xs" c="dimmed">
(Client: {connection.client_id.slice(0, 12)}...)
</Text>
</Group>
<Group align="center" gap="xs">
<Timer size={14} />
<Tooltip
label={`Connected at ${getConnectionStartTime(connection)}`}
>
<Text size="xs" c="dimmed">
{calculateConnectionDuration(connection)}
</Text>
</Tooltip>
</Group>
</Group>
{/* M3U Profile Information */}
{connection.m3u_profile &&
(connection.m3u_profile.profile_name ||
connection.m3u_profile.account_name) && (
<Group mt="xs" gap="xs">
<HardDriveUpload size={12} />
<Text size="xs" c="dimmed">
M3U:{' '}
{connection.m3u_profile.account_name ||
'Unknown Account'}{' '}
{' '}
{connection.m3u_profile.profile_name ||
'Default Profile'}
</Text>
</Group>
)}
{/* User Agent info (truncated to 60 chars) */}
{connection.user_agent &&
connection.user_agent !== 'Unknown' && (
<Group mt="xs" gap="xs">
<Text
size="xs"
c="dimmed"
style={{ fontFamily: 'monospace' }}
>
{connection.user_agent.length > 60
? `${connection.user_agent.substring(0, 60)}...`
: connection.user_agent}
</Text>
</Group>
)}
</Box>
))}
</Stack>
</Box>
</Stack>
</Card>
);
};
// Create a separate component for each channel card to properly handle the hook
const ChannelCard = ({
channel,
@ -728,6 +1038,7 @@ const ChannelsPage = () => {
const [activeChannels, setActiveChannels] = useState({});
const [clients, setClients] = useState([]);
const [vodConnections, setVodConnections] = useState([]);
const [isPollingActive, setIsPollingActive] = useState(false);
// Use localStorage for stats refresh interval (in seconds)
@ -844,6 +1155,24 @@ const ChannelsPage = () => {
}
}, [setChannelStats]);
// Fetch grouped VOD connection stats from the API and store them in
// component state. Errors are logged (with details) but never thrown,
// so the periodic stats polling keeps running after a failed request.
const fetchVODStats = useCallback(async () => {
try {
const response = await API.getVODStats();
if (response) {
setVodConnections(response.vod_connections || []);
} else {
console.log('VOD API response was empty or null');
}
} catch (error) {
console.error('Error fetching VOD stats:', error);
console.error('Error details:', {
message: error.message,
status: error.status,
body: error.body,
});
}
}, []);
// Set up polling for stats when on stats page
useEffect(() => {
const location = window.location;
@ -854,10 +1183,12 @@ const ChannelsPage = () => {
// Initial fetch
fetchChannelStats();
fetchVODStats();
// Set up interval
const interval = setInterval(() => {
fetchChannelStats();
fetchVODStats();
}, refreshInterval);
return () => {
@ -867,12 +1198,13 @@ const ChannelsPage = () => {
} else {
setIsPollingActive(false);
}
}, [refreshInterval, fetchChannelStats]);
}, [refreshInterval, fetchChannelStats, fetchVODStats]);
// Fetch initial stats on component mount (for immediate data when navigating to page)
useEffect(() => {
fetchChannelStats();
}, [fetchChannelStats]);
fetchVODStats();
}, [fetchChannelStats, fetchVODStats]);
useEffect(() => {
console.log('Processing channel stats:', channelStats);
@ -967,7 +1299,7 @@ const ChannelsPage = () => {
<Title order={3}>Active Streams</Title>
<Group align="center">
<Group align="center" gap="xs">
<Text size="sm">Automatic Refresh Interval (seconds):</Text>
<Text size="sm">Refresh Interval (seconds):</Text>
<NumberInput
value={refreshIntervalSeconds}
onChange={(value) => setRefreshIntervalSeconds(value || 0)}
@ -991,7 +1323,10 @@ const ChannelsPage = () => {
<Button
size="xs"
variant="subtle"
onClick={fetchChannelStats}
onClick={() => {
fetchChannelStats();
fetchVODStats();
}}
loading={false}
>
Refresh Now
@ -1033,6 +1368,46 @@ const ChannelsPage = () => {
))
)}
</div>
{/* VOD Connections Section */}
<Box style={{ padding: '10px', borderBottom: '1px solid #444' }}>
<Group justify="space-between" align="center">
<Title order={3}>VOD Connections</Title>
<Text size="sm" c="dimmed">
{vodConnections.length} active connection
{vodConnections.length !== 1 ? 's' : ''}
</Text>
</Group>
</Box>
<div
style={{
display: 'grid',
gap: '1rem',
padding: '10px',
gridTemplateColumns: 'repeat(auto-fill, minmax(400px, 1fr))',
}}
>
{vodConnections.length === 0 ? (
<Box
style={{
gridColumn: '1 / -1',
textAlign: 'center',
padding: '40px',
}}
>
<Text size="xl" color="dimmed">
No active VOD connections
</Text>
</Box>
) : (
vodConnections.map((vodContent) => (
<VODCard
key={`${vodContent.content_type}-${vodContent.content_uuid}`}
vodContent={vodContent}
/>
))
)}
</div>
</Box>
);
};