diff --git a/apps/channels/apps.py b/apps/channels/apps.py index 7761a15e..d6d29a80 100644 --- a/apps/channels/apps.py +++ b/apps/channels/apps.py @@ -9,12 +9,3 @@ class ChannelsConfig(AppConfig): def ready(self): # Import signals so they get registered. import apps.channels.signals - - # Kick off DVR recovery shortly after startup (idempotent via Redis lock) - try: - from .tasks import recover_recordings_on_startup - # Schedule with a short delay to allow migrations/DB readiness - recover_recordings_on_startup.apply_async(countdown=5) - except Exception: - # Avoid hard failures at startup if Celery isn't ready yet - pass diff --git a/apps/proxy/vod_proxy/multi_worker_connection_manager.py b/apps/proxy/vod_proxy/multi_worker_connection_manager.py index 10905e60..7fe1a80c 100644 --- a/apps/proxy/vod_proxy/multi_worker_connection_manager.py +++ b/apps/proxy/vod_proxy/multi_worker_connection_manager.py @@ -25,7 +25,13 @@ class SerializableConnectionState: def __init__(self, session_id: str, stream_url: str, headers: dict, content_length: str = None, content_type: str = 'video/mp4', - final_url: str = None, m3u_profile_id: int = None): + final_url: str = None, m3u_profile_id: int = None, + # Session metadata fields (previously stored in vod_session key) + content_obj_type: str = None, content_uuid: str = None, + content_name: str = None, client_ip: str = None, + user_agent: str = None, utc_start: str = None, + utc_end: str = None, offset: str = None, + worker_id: str = None, connection_type: str = "redis_backed"): self.session_id = session_id self.stream_url = stream_url self.headers = headers @@ -37,6 +43,23 @@ class SerializableConnectionState: self.request_count = 0 self.active_streams = 0 + # Session metadata (consolidated from vod_session key) + self.content_obj_type = content_obj_type + self.content_uuid = content_uuid + self.content_name = content_name + self.client_ip = client_ip + self.user_agent = user_agent + self.utc_start = utc_start or "" + self.utc_end = utc_end or "" + self.offset = offset or "" + self.worker_id = worker_id + self.connection_type = connection_type + self.created_at = time.time() + + # Additional tracking fields + self.bytes_sent = 0 + self.position_seconds = 0 + def to_dict(self): """Convert to dictionary for Redis storage""" return { @@ -49,7 +72,22 @@ class SerializableConnectionState: 'm3u_profile_id': str(self.m3u_profile_id) if self.m3u_profile_id is not None else '', 'last_activity': str(self.last_activity), 'request_count': str(self.request_count), - 'active_streams': str(self.active_streams) + 'active_streams': str(self.active_streams), + # Session metadata + 'content_obj_type': self.content_obj_type or '', + 'content_uuid': self.content_uuid or '', + 'content_name': self.content_name or '', + 'client_ip': self.client_ip or '', + 'user_agent': self.user_agent or '', + 'utc_start': self.utc_start or '', + 'utc_end': self.utc_end or '', + 'offset': self.offset or '', + 'worker_id': self.worker_id or '', + 'connection_type': self.connection_type or 'redis_backed', + 'created_at': str(self.created_at), + # Additional tracking fields + 'bytes_sent': str(self.bytes_sent), + 'position_seconds': str(self.position_seconds) } @classmethod @@ -62,11 +100,26 @@ class SerializableConnectionState: content_length=data.get('content_length') if data.get('content_length') else None, content_type=data.get('content_type', 'video/mp4'), final_url=data.get('final_url') if data.get('final_url') else None, - m3u_profile_id=int(data.get('m3u_profile_id')) if data.get('m3u_profile_id') else None + m3u_profile_id=int(data.get('m3u_profile_id')) if data.get('m3u_profile_id') else None, + # Session metadata + content_obj_type=data.get('content_obj_type') or None, + content_uuid=data.get('content_uuid') or None, + content_name=data.get('content_name') or None, + client_ip=data.get('client_ip') or None, + user_agent=data.get('user_agent') or None, + utc_start=data.get('utc_start') or '', + utc_end=data.get('utc_end') or '', + offset=data.get('offset') or '', + worker_id=data.get('worker_id') or None, + connection_type=data.get('connection_type', 'redis_backed') ) obj.last_activity = float(data.get('last_activity', time.time())) obj.request_count = int(data.get('request_count', 0)) obj.active_streams = int(data.get('active_streams', 0)) + obj.created_at = float(data.get('created_at', time.time())) + # Additional tracking fields + obj.bytes_sent = int(data.get('bytes_sent', 0)) + obj.position_seconds = int(data.get('position_seconds', 0)) return obj @@ -108,7 +161,7 @@ class RedisBackedVODConnection: try: data = state.to_dict() # Log the data being saved for debugging - logger.debug(f"[{self.session_id}] Saving connection state: {data}") + logger.trace(f"[{self.session_id}] Saving connection state: {data}") # Verify all values are valid for Redis for key, value in data.items(): @@ -144,8 +197,14 @@ class RedisBackedVODConnection: except Exception as e: logger.error(f"[{self.session_id}] Error releasing lock: {e}") - def create_connection(self, stream_url: str, headers: dict, m3u_profile_id: int = None) -> bool: - """Create a new connection state in Redis""" + def create_connection(self, stream_url: str, headers: dict, m3u_profile_id: int = None, + # Session metadata (consolidated from vod_session key) + content_obj_type: str = None, content_uuid: str = None, + content_name: str = None, client_ip: str = None, + user_agent: str = None, utc_start: str = None, + utc_end: str = None, offset: str = None, + worker_id: str = None) -> bool: + """Create a new connection state in Redis with consolidated session metadata""" if not self._acquire_lock(): logger.warning(f"[{self.session_id}] Could not acquire lock for connection creation") return False @@ -157,12 +216,27 @@ class RedisBackedVODConnection: logger.info(f"[{self.session_id}] Connection already exists in Redis") return True - # Create new connection state - state = SerializableConnectionState(self.session_id, stream_url, headers, m3u_profile_id=m3u_profile_id) + # Create new connection state with consolidated session metadata + state = SerializableConnectionState( + session_id=self.session_id, + stream_url=stream_url, + headers=headers, + m3u_profile_id=m3u_profile_id, + # Session metadata + content_obj_type=content_obj_type, + content_uuid=content_uuid, + content_name=content_name, + client_ip=client_ip, + user_agent=user_agent, + utc_start=utc_start, + utc_end=utc_end, + offset=offset, + worker_id=worker_id + ) success = self._save_connection_state(state) if success: - logger.info(f"[{self.session_id}] Created new connection state in Redis") + logger.info(f"[{self.session_id}] Created new connection state in Redis with consolidated session metadata") return success finally: @@ -325,6 +399,31 @@ class RedisBackedVODConnection: } return {} + def get_session_metadata(self): + """Get session metadata from consolidated connection state""" + state = self._get_connection_state() + if state: + return { + 'content_obj_type': state.content_obj_type, + 'content_uuid': state.content_uuid, + 'content_name': state.content_name, + 'client_ip': state.client_ip, + 'user_agent': state.user_agent, + 'utc_start': state.utc_start, + 'utc_end': state.utc_end, + 'offset': state.offset, + 'worker_id': state.worker_id, + 'connection_type': state.connection_type, + 'created_at': state.created_at, + 'last_activity': state.last_activity, + 'm3u_profile_id': state.m3u_profile_id, + 'bytes_sent': state.bytes_sent, + 'position_seconds': state.position_seconds, + 'active_streams': state.active_streams, + 'request_count': state.request_count + } + return {} + def cleanup(self, connection_manager=None): """Clean up local resources and Redis state""" # Get connection state before cleanup to handle profile decrementing @@ -340,44 +439,19 @@ class RedisBackedVODConnection: # Remove from Redis if self.redis_client: try: - # Get session information for cleanup - session_key = f"vod_session:{self.session_id}" - session_data = self.redis_client.hgetall(session_key) - - # Convert bytes to strings if needed - if session_data and isinstance(list(session_data.keys())[0], bytes): - session_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in session_data.items()} - # Use pipeline for atomic cleanup operations pipe = self.redis_client.pipeline() - # 1. Remove main connection state + # 1. Remove main connection state (now contains consolidated data) pipe.delete(self.connection_key) # 2. Remove distributed lock pipe.delete(self.lock_key) - # 3. Remove session tracking - pipe.delete(session_key) - - # 4. Clean up legacy vod_proxy connection keys if session data exists - if session_data: - content_type = session_data.get('content_type') - content_uuid = session_data.get('content_uuid') - - if content_type and content_uuid: - # Remove from vod_proxy connection tracking - vod_proxy_connection_key = f"vod_proxy:connection:{content_type}:{content_uuid}:{self.session_id}" - pipe.delete(vod_proxy_connection_key) - - # Remove from content connections set - content_connections_key = f"vod_proxy:content:{content_type}:{content_uuid}:connections" - pipe.srem(content_connections_key, self.session_id) - # Execute all cleanup operations pipe.execute() - logger.info(f"[{self.session_id}] Cleaned up all Redis keys (connection, session, locks)") + logger.info(f"[{self.session_id}] Cleaned up all Redis keys (consolidated connection state, locks)") # Decrement profile connections if we have the state and connection manager if state and state.m3u_profile_id and connection_manager: @@ -479,8 +553,28 @@ class MultiWorkerVODConnectionManager: logger.info(f"[{client_id}] Worker {self.worker_id} - Redis-backed streaming request for {content_type} {content_name}") try: + # First, try to find an existing idle session that matches our criteria + matching_session_id = self.find_matching_idle_session( + content_type=content_type, + content_uuid=content_uuid, + client_ip=client_ip, + user_agent=user_agent, + utc_start=utc_start, + utc_end=utc_end, + offset=offset + ) + + # Use matching session if found, otherwise use the provided session_id + if matching_session_id: + logger.info(f"[{client_id}] Worker {self.worker_id} - Found matching idle session: {matching_session_id}") + effective_session_id = matching_session_id + client_id = matching_session_id # Update client_id for logging consistency + else: + logger.info(f"[{client_id}] Worker {self.worker_id} - No matching idle session found, using new session") + effective_session_id = session_id + # Create Redis-backed connection - redis_connection = RedisBackedVODConnection(session_id, self.redis_client) + redis_connection = RedisBackedVODConnection(effective_session_id, self.redis_client) # Check if connection exists, create if not existing_state = redis_connection._get_connection_state() @@ -515,48 +609,42 @@ class MultiWorkerVODConnectionManager: # Add worker identification headers['X-Worker-ID'] = self.worker_id - # Create connection state in Redis - if not redis_connection.create_connection(modified_stream_url, headers, m3u_profile.id): + # Create connection state in Redis with consolidated session metadata + if not redis_connection.create_connection( + stream_url=modified_stream_url, + headers=headers, + m3u_profile_id=m3u_profile.id, + # Session metadata (consolidated from separate vod_session key) + content_obj_type=content_type, + content_uuid=content_uuid, + content_name=content_name, + client_ip=client_ip, + user_agent=user_agent, + utc_start=utc_start, + utc_end=utc_end, + offset=str(offset) if offset else None, + worker_id=self.worker_id + ): logger.error(f"[{client_id}] Worker {self.worker_id} - Failed to create Redis connection") return HttpResponse("Failed to create connection", status=500) # Increment profile connections after successful connection creation self._increment_profile_connections(m3u_profile) - # Create session tracking - session_info = { - "content_type": content_type, - "content_uuid": content_uuid, - "content_name": content_name, - "created_at": str(time.time()), - "last_activity": str(time.time()), - "profile_id": str(m3u_profile.id), - "client_ip": client_ip, - "user_agent": user_agent, - "utc_start": utc_start or "", - "utc_end": utc_end or "", - "offset": str(offset) if offset else "", - "worker_id": self.worker_id, # Track which worker created this - "connection_type": "redis_backed" - } - - session_key = f"vod_session:{session_id}" - if self.redis_client: - self.redis_client.hset(session_key, mapping=session_info) - self.redis_client.expire(session_key, self.session_ttl) - - logger.info(f"[{client_id}] Worker {self.worker_id} - Created session: {session_info}") + logger.info(f"[{client_id}] Worker {self.worker_id} - Created consolidated connection with session metadata") else: logger.info(f"[{client_id}] Worker {self.worker_id} - Using existing Redis-backed connection") - # Update session activity - session_key = f"vod_session:{session_id}" - if self.redis_client: - self.redis_client.hset(session_key, mapping={ - "last_activity": str(time.time()), - "last_worker_id": self.worker_id # Track which worker last accessed this - }) - self.redis_client.expire(session_key, self.session_ttl) + # Update session activity in consolidated connection state + if redis_connection._acquire_lock(): + try: + state = redis_connection._get_connection_state() + if state: + state.last_activity = time.time() + state.worker_id = self.worker_id # Track which worker last accessed this + redis_connection._save_connection_state(state) + finally: + redis_connection._release_lock() # Get stream from Redis-backed connection upstream_response = redis_connection.get_stream(range_header) @@ -586,14 +674,19 @@ class MultiWorkerVODConnectionManager: bytes_sent += len(chunk) chunk_count += 1 - # Update activity every 100 chunks + # Update activity every 100 chunks in consolidated connection state if chunk_count % 100 == 0: - self.update_connection_activity( - content_type=content_type, - content_uuid=content_uuid, - client_id=client_id, - bytes_sent=len(chunk) - ) + # Update the connection state + if redis_connection._acquire_lock(): + try: + state = redis_connection._get_connection_state() + if state: + state.last_activity = time.time() + # Store cumulative bytes sent in connection state + state.bytes_sent = bytes_sent # Use cumulative bytes_sent, not chunk size + redis_connection._save_connection_state(state) + finally: + redis_connection._release_lock() logger.info(f"[{client_id}] Worker {self.worker_id} - Redis-backed stream completed: {bytes_sent} bytes sent") redis_connection.decrement_active_streams() @@ -925,13 +1018,13 @@ class MultiWorkerVODConnectionManager: def find_matching_idle_session(self, content_type: str, content_uuid: str, client_ip: str, user_agent: str, utc_start=None, utc_end=None, offset=None) -> Optional[str]: - """Find existing Redis-backed session that matches criteria""" + """Find existing Redis-backed session that matches criteria using consolidated connection state""" if not self.redis_client: return None try: - # Search for sessions with matching content - pattern = "vod_session:*" + # Search for connections with consolidated session data + pattern = "vod_persistent_connection:*" cursor = 0 matching_sessions = [] @@ -940,23 +1033,23 @@ class MultiWorkerVODConnectionManager: for key in keys: try: - session_data = self.redis_client.hgetall(key) - if not session_data: + connection_data = self.redis_client.hgetall(key) + if not connection_data: continue # Convert bytes keys/values to strings if needed - if isinstance(list(session_data.keys())[0], bytes): - session_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in session_data.items()} + if isinstance(list(connection_data.keys())[0], bytes): + connection_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in connection_data.items()} - # Check if content matches - stored_content_type = session_data.get('content_type', '') - stored_content_uuid = session_data.get('content_uuid', '') + # Check if content matches (using consolidated data) + stored_content_type = connection_data.get('content_obj_type', '') + stored_content_uuid = connection_data.get('content_uuid', '') if stored_content_type != content_type or stored_content_uuid != content_uuid: continue # Extract session ID - session_id = key.decode('utf-8').replace('vod_session:', '') + session_id = key.decode('utf-8').replace('vod_persistent_connection:', '') # Check if Redis-backed connection exists and has no active streams redis_connection = RedisBackedVODConnection(session_id, self.redis_client) @@ -967,9 +1060,9 @@ class MultiWorkerVODConnectionManager: score = 10 # Content match match_reasons = ["content"] - # Check other criteria - stored_client_ip = session_data.get('client_ip', '') - stored_user_agent = session_data.get('user_agent', '') + # Check other criteria (using consolidated data) + stored_client_ip = connection_data.get('client_ip', '') + stored_user_agent = connection_data.get('user_agent', '') if stored_client_ip and stored_client_ip == client_ip: score += 5 @@ -979,10 +1072,10 @@ class MultiWorkerVODConnectionManager: score += 3 match_reasons.append("user-agent") - # Check timeshift parameters - stored_utc_start = session_data.get('utc_start', '') - stored_utc_end = session_data.get('utc_end', '') - stored_offset = session_data.get('offset', '') + # Check timeshift parameters (using consolidated data) + stored_utc_start = connection_data.get('utc_start', '') + stored_utc_end = connection_data.get('utc_end', '') + stored_offset = connection_data.get('offset', '') current_utc_start = utc_start or "" current_utc_end = utc_end or "" @@ -999,11 +1092,11 @@ class MultiWorkerVODConnectionManager: 'session_id': session_id, 'score': score, 'reasons': match_reasons, - 'last_activity': float(session_data.get('last_activity', '0')) + 'last_activity': float(connection_data.get('last_activity', '0')) }) except Exception as e: - logger.debug(f"Error processing session key {key}: {e}") + logger.debug(f"Error processing connection key {key}: {e}") continue if cursor == 0: @@ -1023,3 +1116,15 @@ class MultiWorkerVODConnectionManager: except Exception as e: logger.error(f"Error finding matching idle session: {e}") return None + + def get_session_info(self, session_id: str) -> Optional[dict]: + """Get session information from consolidated connection state (compatibility method)""" + if not self.redis_client: + return None + + try: + redis_connection = RedisBackedVODConnection(session_id, self.redis_client) + return redis_connection.get_session_metadata() + except Exception as e: + logger.error(f"Error getting session info for {session_id}: {e}") + return None \ No newline at end of file diff --git a/apps/proxy/vod_proxy/urls.py b/apps/proxy/vod_proxy/urls.py index 0d2b306e..c06426ce 100644 --- a/apps/proxy/vod_proxy/urls.py +++ b/apps/proxy/vod_proxy/urls.py @@ -18,4 +18,7 @@ urlpatterns = [ # Position tracking path('position//', views.VODPositionView.as_view(), name='vod_position'), + + # VOD Stats + path('stats/', views.VODStatsView.as_view(), name='vod_stats'), ] diff --git a/apps/proxy/vod_proxy/views.py b/apps/proxy/vod_proxy/views.py index 60472403..480f9393 100644 --- a/apps/proxy/vod_proxy/views.py +++ b/apps/proxy/vod_proxy/views.py @@ -664,3 +664,190 @@ class VODPositionView(View): except Exception as e: logger.error(f"Error updating VOD position: {e}") return JsonResponse({'error': str(e)}, status=500) + + +@method_decorator(csrf_exempt, name='dispatch') +class VODStatsView(View): + """Get VOD connection statistics""" + + def get(self, request): + """Get current VOD connection statistics""" + try: + connection_manager = MultiWorkerVODConnectionManager.get_instance() + redis_client = connection_manager.redis_client + + if not redis_client: + return JsonResponse({'error': 'Redis not available'}, status=500) + + # Get all VOD persistent connections (consolidated data) + pattern = "vod_persistent_connection:*" + cursor = 0 + connections = [] + current_time = time.time() + + while True: + cursor, keys = redis_client.scan(cursor, match=pattern, count=100) + + for key in keys: + try: + key_str = key.decode('utf-8') if isinstance(key, bytes) else key + connection_data = redis_client.hgetall(key) + + if connection_data: + # Extract session ID from key + session_id = key_str.replace('vod_persistent_connection:', '') + + # Decode Redis hash data + combined_data = {} + for k, v in connection_data.items(): + k_str = k.decode('utf-8') if isinstance(k, bytes) else k + v_str = v.decode('utf-8') if isinstance(v, bytes) else v + combined_data[k_str] = v_str + + # Get content info from the connection data (not the key) + content_type = combined_data.get('content_type', 'unknown') + content_uuid = combined_data.get('content_uuid', 'unknown') + client_id = session_id + + # Get content info with enhanced metadata + content_name = "Unknown" + content_metadata = {} + try: + if content_type == 'movie': + content_obj = Movie.objects.select_related('logo').get(uuid=content_uuid) + content_name = content_obj.name + content_metadata = { + 'year': content_obj.year, + 'rating': content_obj.rating, + 'genre': content_obj.genre, + 'duration_secs': content_obj.duration_secs, + 'description': content_obj.description, + 'logo_url': content_obj.logo.url if content_obj.logo else None, + 'tmdb_id': content_obj.tmdb_id, + 'imdb_id': content_obj.imdb_id + } + elif content_type == 'episode': + content_obj = Episode.objects.select_related('series', 'series__logo').get(uuid=content_uuid) + content_name = f"{content_obj.series.name} - {content_obj.name}" + content_metadata = { + 'series_name': content_obj.series.name, + 'episode_name': content_obj.name, + 'season_number': content_obj.season_number, + 'episode_number': content_obj.episode_number, + 'air_date': content_obj.air_date.isoformat() if content_obj.air_date else None, + 'rating': content_obj.rating, + 'duration_secs': content_obj.duration_secs, + 'description': content_obj.description, + 'logo_url': content_obj.series.logo.url if content_obj.series.logo else None, + 'series_year': content_obj.series.year, + 'series_genre': content_obj.series.genre, + 'tmdb_id': content_obj.tmdb_id, + 'imdb_id': content_obj.imdb_id + } + except: + pass + + # Get M3U profile information + m3u_profile_info = {} + # Use m3u_profile_id for consistency (rename profile_id) + m3u_profile_id = combined_data.get('profile_id') or combined_data.get('m3u_profile_id') + if m3u_profile_id: + try: + from apps.m3u.models import M3UAccountProfile + profile = M3UAccountProfile.objects.select_related('m3u_account').get(id=m3u_profile_id) + m3u_profile_info = { + 'profile_name': profile.name, + 'account_name': profile.m3u_account.name, + 'account_id': profile.m3u_account.id, + 'max_streams': profile.m3u_account.max_streams, + 'm3u_profile_id': int(m3u_profile_id) + } + except Exception as e: + logger.warning(f"Could not fetch M3U profile {m3u_profile_id}: {e}") + + # Also try to get profile info from stored data if database lookup fails + if not m3u_profile_info and (combined_data.get('m3u_profile_name') or combined_data.get('m3u_profile_id')): + m3u_profile_info = { + 'profile_name': combined_data.get('m3u_profile_name', 'Unknown Profile'), + 'm3u_profile_id': combined_data.get('m3u_profile_id'), + 'account_name': 'Unknown Account' # We don't store account name directly + } + + connection_info = { + 'content_type': content_type, + 'content_uuid': content_uuid, + 'content_name': content_name, + 'content_metadata': content_metadata, + 'm3u_profile': m3u_profile_info, + 'client_id': client_id, + 'client_ip': combined_data.get('client_ip', 'Unknown'), + 'user_agent': combined_data.get('user_agent', 'Unknown'), + 'connected_at': combined_data.get('connected_at'), + 'last_activity': combined_data.get('last_activity'), + 'm3u_profile_id': m3u_profile_id # Use m3u_profile_id instead of profile_id + } + + # Calculate connection duration + duration_calculated = False + if connection_info['connected_at']: + try: + connected_time = float(connection_info['connected_at']) + duration = current_time - connected_time + connection_info['duration'] = int(duration) + duration_calculated = True + except: + pass + + # Fallback: use last_activity if connected_at is not available + if not duration_calculated and connection_info['last_activity']: + try: + last_activity_time = float(connection_info['last_activity']) + # Estimate connection duration using client_id timestamp if available + if connection_info['client_id'].startswith('vod_'): + # Extract timestamp from client_id (format: vod_timestamp_random) + parts = connection_info['client_id'].split('_') + if len(parts) >= 2: + client_start_time = float(parts[1]) / 1000.0 # Convert ms to seconds + duration = current_time - client_start_time + connection_info['duration'] = int(duration) + duration_calculated = True + except: + pass + + # Final fallback + if not duration_calculated: + connection_info['duration'] = 0 + + connections.append(connection_info) + + except Exception as e: + logger.error(f"Error processing connection key {key}: {e}") + + if cursor == 0: + break + + # Group connections by content + content_stats = {} + for conn in connections: + content_key = f"{conn['content_type']}:{conn['content_uuid']}" + if content_key not in content_stats: + content_stats[content_key] = { + 'content_type': conn['content_type'], + 'content_name': conn['content_name'], + 'content_uuid': conn['content_uuid'], + 'content_metadata': conn['content_metadata'], + 'connection_count': 0, + 'connections': [] + } + content_stats[content_key]['connection_count'] += 1 + content_stats[content_key]['connections'].append(conn) + + return JsonResponse({ + 'vod_connections': list(content_stats.values()), + 'total_connections': len(connections), + 'timestamp': current_time + }) + + except Exception as e: + logger.error(f"Error getting VOD stats: {e}") + return JsonResponse({'error': str(e)}, status=500) diff --git a/frontend/src/api.js b/frontend/src/api.js index cb834bd5..0c763dd8 100644 --- a/frontend/src/api.js +++ b/frontend/src/api.js @@ -1274,6 +1274,16 @@ export default class API { } } + static async getVODStats() { + try { + const response = await request(`${host}/proxy/vod/stats/`); + + return response; + } catch (e) { + errorNotification('Failed to retrieve VOD stats', e); + } + } + static async stopChannel(id) { try { const response = await request(`${host}/proxy/ts/stop/${id}`, { diff --git a/frontend/src/pages/Stats.jsx b/frontend/src/pages/Stats.jsx index 4b796f84..4d7502f0 100644 --- a/frontend/src/pages/Stats.jsx +++ b/frontend/src/pages/Stats.jsx @@ -82,6 +82,316 @@ const getStartDate = (uptime) => { }); }; +// Create a VOD Card component similar to ChannelCard +const VODCard = ({ vodContent }) => { + const [dateFormatSetting] = useLocalStorage('date-format', 'mdy'); + const dateFormat = dateFormatSetting === 'mdy' ? 'MM/DD' : 'DD/MM'; + + // Get metadata from the VOD content + const metadata = vodContent.content_metadata || {}; + const contentType = vodContent.content_type; + const isMovie = contentType === 'movie'; + const isEpisode = contentType === 'episode'; + + // Get poster/logo URL + const posterUrl = metadata.logo_url || logo; + + // Format duration + const formatDuration = (seconds) => { + if (!seconds) return 'Unknown'; + const hours = Math.floor(seconds / 3600); + const minutes = Math.floor((seconds % 3600) / 60); + return hours > 0 ? `${hours}h ${minutes}m` : `${minutes}m`; + }; + + // Get display title + const getDisplayTitle = () => { + if (isMovie) { + return metadata.year + ? `${vodContent.content_name} (${metadata.year})` + : vodContent.content_name; + } else if (isEpisode) { + const season = metadata.season_number + ? `S${metadata.season_number.toString().padStart(2, '0')}` + : 'S??'; + const episode = metadata.episode_number + ? `E${metadata.episode_number.toString().padStart(2, '0')}` + : 'E??'; + return `${metadata.series_name} - ${season}${episode}`; + } + return vodContent.content_name; + }; + + // Get subtitle info + const getSubtitle = () => { + if (isMovie) { + const parts = []; + if (metadata.genre) parts.push(metadata.genre); + if (metadata.rating) parts.push(`Rated ${metadata.rating}`); + return parts.join(' • '); + } else if (isEpisode) { + return metadata.episode_name || 'Episode'; + } + return ''; + }; + + // Calculate duration for connection + const calculateConnectionDuration = (connection) => { + // If duration is provided by API, use it + if (connection.duration && connection.duration > 0) { + return dayjs.duration(connection.duration, 'seconds').humanize(); + } + + // Fallback: try to extract from client_id timestamp + if (connection.client_id && connection.client_id.startsWith('vod_')) { + try { + const parts = connection.client_id.split('_'); + if (parts.length >= 2) { + const clientStartTime = parseInt(parts[1]) / 1000; // Convert ms to seconds + const currentTime = Date.now() / 1000; + const duration = currentTime - clientStartTime; + return dayjs.duration(duration, 'seconds').humanize(); + } + } catch (e) { + // Ignore parsing errors + } + } + + return 'Unknown duration'; + }; + + // Get connection start time for tooltip + const getConnectionStartTime = (connection) => { + if (connection.connected_at) { + return dayjs(connection.connected_at * 1000).format( + `${dateFormat} HH:mm:ss` + ); + } + + // Fallback: calculate from client_id timestamp + if (connection.client_id && connection.client_id.startsWith('vod_')) { + try { + const parts = connection.client_id.split('_'); + if (parts.length >= 2) { + const clientStartTime = parseInt(parts[1]); + return dayjs(clientStartTime).format(`${dateFormat} HH:mm:ss`); + } + } catch (e) { + // Ignore parsing errors + } + } + + return 'Unknown'; + }; + + return ( + + + {/* Header with poster and basic info */} + + + content poster + + + + +
+ + {formatDuration(metadata.duration_secs)} +
+
+
+
+ + {/* Title and type */} + + + {getDisplayTitle()} + + + + + + + + + {/* Subtitle/episode info */} + {getSubtitle() && ( + + + {getSubtitle()} + + + )} + + {/* Content information badges */} + + + + {contentType.toUpperCase()} + + + + {metadata.year && ( + + + {metadata.year} + + + )} + + {metadata.rating && ( + + + {metadata.rating} + + + )} + + {metadata.genre && ( + + + {metadata.genre} + + + )} + + {isEpisode && metadata.season_number && ( + + + Season {metadata.season_number} + + + )} + + + {/* Connection statistics */} + + + + + + {vodContent.connection_count} + + + + + + + On Demand + + + + + {/* Connection details table */} + + + Active Connections: + + + {vodContent.connections.map((connection, index) => ( + + + + + {connection.client_ip} + + (Client: {connection.client_id.slice(0, 12)}...) + + + + + + + + {calculateConnectionDuration(connection)} + + + + + + {/* M3U Profile Information */} + {connection.m3u_profile && + (connection.m3u_profile.profile_name || + connection.m3u_profile.account_name) && ( + + + + M3U:{' '} + {connection.m3u_profile.account_name || + 'Unknown Account'}{' '} + →{' '} + {connection.m3u_profile.profile_name || + 'Default Profile'} + + + )} + + {/* User Agent info */} + {connection.user_agent && + connection.user_agent !== 'Unknown' && ( + + + {connection.user_agent.length > 60 + ? `${connection.user_agent.substring(0, 60)}...` + : connection.user_agent} + + + )} + + ))} + + +
+
+ ); +}; + // Create a separate component for each channel card to properly handle the hook const ChannelCard = ({ channel, @@ -728,6 +1038,7 @@ const ChannelsPage = () => { const [activeChannels, setActiveChannels] = useState({}); const [clients, setClients] = useState([]); + const [vodConnections, setVodConnections] = useState([]); const [isPollingActive, setIsPollingActive] = useState(false); // Use localStorage for stats refresh interval (in seconds) @@ -844,6 +1155,24 @@ const ChannelsPage = () => { } }, [setChannelStats]); + const fetchVODStats = useCallback(async () => { + try { + const response = await API.getVODStats(); + if (response) { + setVodConnections(response.vod_connections || []); + } else { + console.log('VOD API response was empty or null'); + } + } catch (error) { + console.error('Error fetching VOD stats:', error); + console.error('Error details:', { + message: error.message, + status: error.status, + body: error.body, + }); + } + }, []); + // Set up polling for stats when on stats page useEffect(() => { const location = window.location; @@ -854,10 +1183,12 @@ const ChannelsPage = () => { // Initial fetch fetchChannelStats(); + fetchVODStats(); // Set up interval const interval = setInterval(() => { fetchChannelStats(); + fetchVODStats(); }, refreshInterval); return () => { @@ -867,12 +1198,13 @@ const ChannelsPage = () => { } else { setIsPollingActive(false); } - }, [refreshInterval, fetchChannelStats]); + }, [refreshInterval, fetchChannelStats, fetchVODStats]); // Fetch initial stats on component mount (for immediate data when navigating to page) useEffect(() => { fetchChannelStats(); - }, [fetchChannelStats]); + fetchVODStats(); + }, [fetchChannelStats, fetchVODStats]); useEffect(() => { console.log('Processing channel stats:', channelStats); @@ -967,7 +1299,7 @@ const ChannelsPage = () => { Active Streams - Automatic Refresh Interval (seconds): + Refresh Interval (seconds): setRefreshIntervalSeconds(value || 0)} @@ -991,7 +1323,10 @@ const ChannelsPage = () => {