From 85bb7aa0ec96f221016ac481efd57a4b1efea481 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Sat, 15 Mar 2025 11:44:59 -0500 Subject: [PATCH] Even more refractoring. --- apps/proxy/ts_proxy/__init__.py | 4 +- apps/proxy/ts_proxy/channel_status.py | 252 ++++++++++++++++++++++++++ apps/proxy/ts_proxy/stream_buffer.py | 1 + apps/proxy/ts_proxy/views.py | 251 +------------------------ 4 files changed, 259 insertions(+), 249 deletions(-) create mode 100644 apps/proxy/ts_proxy/channel_status.py diff --git a/apps/proxy/ts_proxy/__init__.py b/apps/proxy/ts_proxy/__init__.py index c4c9665f..2f48eb4b 100644 --- a/apps/proxy/ts_proxy/__init__.py +++ b/apps/proxy/ts_proxy/__init__.py @@ -1,9 +1,9 @@ """Transport Stream proxy module""" +# Only class imports, no instance creation from .server import ProxyServer from .stream_manager import StreamManager from .stream_buffer import StreamBuffer from .client_manager import ClientManager -# Create the singleton instance -proxy_server = ProxyServer() \ No newline at end of file +proxy_server = ProxyServer() diff --git a/apps/proxy/ts_proxy/channel_status.py b/apps/proxy/ts_proxy/channel_status.py new file mode 100644 index 00000000..0fba940b --- /dev/null +++ b/apps/proxy/ts_proxy/channel_status.py @@ -0,0 +1,252 @@ +import logging +import time +import re +from . import proxy_server + +logger = logging.getLogger("ts_proxy") + +class ChannelStatus: + + def get_detailed_channel_info(channel_id): + # Get channel metadata + metadata_key = f"ts_proxy:channel:{channel_id}:metadata" + metadata = proxy_server.redis_client.hgetall(metadata_key) + + if not metadata: + return None + + # Get detailed info - existing implementation + # Get channel metadata + metadata_key = f"ts_proxy:channel:{channel_id}:metadata" + metadata = proxy_server.redis_client.hgetall(metadata_key) + + if not metadata: + return None + + # Basic channel info + buffer_index_value = proxy_server.redis_client.get(f"ts_proxy:channel:{channel_id}:buffer:index") + + info = { + 'channel_id': channel_id, + 'state': metadata.get(b'state', b'unknown').decode('utf-8'), + 'url': metadata.get(b'url', b'').decode('utf-8'), + 'created_at': metadata.get(b'created_at', b'0').decode('utf-8'), + 'owner': metadata.get(b'owner', b'unknown').decode('utf-8'), + + # Properly decode the buffer index value + 'buffer_index': int(buffer_index_value.decode('utf-8')) if buffer_index_value else 0, + } + + # Add timing information + if b'state_changed_at' in metadata: + state_changed_at = float(metadata[b'state_changed_at'].decode('utf-8')) + info['state_changed_at'] = state_changed_at + info['state_duration'] = time.time() - state_changed_at + + if b'created_at' in metadata: + created_at = float(metadata[b'created_at'].decode('utf-8')) + info['created_at'] = created_at + info['uptime'] = time.time() - created_at + + # Get client information + client_set_key = f"ts_proxy:channel:{channel_id}:clients" + client_ids = proxy_server.redis_client.smembers(client_set_key) + clients = [] + + for client_id in client_ids: + client_id_str = client_id.decode('utf-8') + client_key = f"ts_proxy:channel:{channel_id}:clients:{client_id_str}" + client_data = proxy_server.redis_client.hgetall(client_key) + + if client_data: + client_info = { + 'client_id': client_id_str, + 'user_agent': client_data.get(b'user_agent', b'unknown').decode('utf-8'), + 'worker_id': client_data.get(b'worker_id', b'unknown').decode('utf-8'), + } + + if b'connected_at' in client_data: + connected_at = float(client_data[b'connected_at'].decode('utf-8')) + client_info['connected_at'] = connected_at + client_info['connection_duration'] = time.time() - connected_at + + if b'last_active' in client_data: + last_active = float(client_data[b'last_active'].decode('utf-8')) + client_info['last_active'] = last_active + client_info['last_active_ago'] = time.time() - last_active + + clients.append(client_info) + + info['clients'] = clients + info['client_count'] = len(clients) + + # Get buffer health with improved diagnostics + buffer_stats = { + 'chunks': info['buffer_index'], + 'diagnostics': {} + } + + # Sample a few recent chunks to check sizes with better error handling + if info['buffer_index'] > 0: + try: + sample_chunks = min(5, info['buffer_index']) + chunk_sizes = [] + chunk_keys_found = [] + chunk_keys_missing = [] + + # Check if the keys exist before getting + for i in range(info['buffer_index']-sample_chunks+1, info['buffer_index']+1): + chunk_key = f"ts_proxy:channel:{channel_id}:buffer:chunk:{i}" + + # Check if key exists first + if proxy_server.redis_client.exists(chunk_key): + chunk_data = proxy_server.redis_client.get(chunk_key) + if chunk_data: + chunk_size = len(chunk_data) + chunk_sizes.append(chunk_size) + chunk_keys_found.append(i) + + # Check for TS alignment (packets are 188 bytes) + ts_packets = chunk_size // 188 + ts_aligned = chunk_size % 188 == 0 + + # Add for first chunk only to avoid too much data + if len(chunk_keys_found) == 1: + buffer_stats['diagnostics']['first_chunk'] = { + 'index': i, + 'size': chunk_size, + 'ts_packets': ts_packets, + 'aligned': ts_aligned, + 'first_byte': chunk_data[0] if chunk_size > 0 else None + } + else: + chunk_keys_missing.append(i) + + # Add detailed diagnostics + if chunk_sizes: + buffer_stats['avg_chunk_size'] = sum(chunk_sizes) / len(chunk_sizes) + buffer_stats['recent_chunk_sizes'] = chunk_sizes + buffer_stats['keys_found'] = chunk_keys_found + buffer_stats['keys_missing'] = chunk_keys_missing + + # Calculate data rate + total_data = sum(chunk_sizes) + buffer_stats['total_sample_bytes'] = total_data + + # Add TS packet analysis + total_ts_packets = total_data // 188 + buffer_stats['estimated_ts_packets'] = total_ts_packets + buffer_stats['is_ts_aligned'] = all(size % 188 == 0 for size in chunk_sizes) + else: + # If no chunks found, scan for keys to help debug + all_buffer_keys = [] + cursor = 0 + + buffer_key_pattern = f"ts_proxy:channel:{channel_id}:buffer:chunk:*" + + while True: + cursor, keys = proxy_server.redis_client.scan(cursor, match=buffer_key_pattern, count=100) + if keys: + all_buffer_keys.extend([k.decode('utf-8') for k in keys]) + if cursor == 0 or len(all_buffer_keys) >= 20: # Limit to 20 keys + break + + buffer_stats['diagnostics']['all_buffer_keys'] = all_buffer_keys[:20] # First 20 keys + buffer_stats['diagnostics']['total_buffer_keys'] = len(all_buffer_keys) + + except Exception as e: + # Capture any errors for diagnostics + buffer_stats['error'] = str(e) + buffer_stats['diagnostics']['exception'] = str(e) + + # Add TTL information to see if chunks are expiring + chunk_ttl_key = f"ts_proxy:channel:{channel_id}:buffer:chunk:{info['buffer_index']}" + chunk_ttl = proxy_server.redis_client.ttl(chunk_ttl_key) + buffer_stats['latest_chunk_ttl'] = chunk_ttl + + info['buffer_stats'] = buffer_stats + + # Get local worker info if available + if channel_id in proxy_server.stream_managers: + manager = proxy_server.stream_managers[channel_id] + info['local_manager'] = { + 'healthy': manager.healthy, + 'connected': manager.connected, + 'last_data_time': manager.last_data_time, + 'last_data_age': time.time() - manager.last_data_time + } + + return info + + # Function for basic channel info (used for all channels summary) + def get_basic_channel_info(channel_id): + # Get channel metadata + metadata_key = f"ts_proxy:channel:{channel_id}:metadata" + metadata = proxy_server.redis_client.hgetall(metadata_key) + + if not metadata: + return None + + # Basic channel info only - omit diagnostics and details + buffer_index_value = proxy_server.redis_client.get(f"ts_proxy:channel:{channel_id}:buffer:index") + + # Count clients (using efficient count method) + client_set_key = f"ts_proxy:channel:{channel_id}:clients" + client_count = proxy_server.redis_client.scard(client_set_key) or 0 + + # Calculate uptime + created_at = float(metadata.get(b'init_time', b'0').decode('utf-8')) + uptime = time.time() - created_at if created_at > 0 else 0 + + # Simplified info + info = { + 'channel_id': channel_id, + 'state': metadata.get(b'state', b'unknown').decode('utf-8'), + 'url': metadata.get(b'url', b'').decode('utf-8'), + 'owner': metadata.get(b'owner', b'unknown').decode('utf-8'), + 'buffer_index': int(buffer_index_value.decode('utf-8')) if buffer_index_value else 0, + 'client_count': client_count, + 'uptime': uptime + } + + # Quick health check if available locally + if channel_id in proxy_server.stream_managers: + manager = proxy_server.stream_managers[channel_id] + info['healthy'] = manager.healthy + + # Get concise client information + clients = [] + client_set_key = f"ts_proxy:channel:{channel_id}:clients" + client_ids = proxy_server.redis_client.smembers(client_set_key) + + # Process only if we have clients and keep it limited + if client_ids: + # Get up to 10 clients for the basic view + for client_id in list(client_ids)[:10]: + client_id_str = client_id.decode('utf-8') + client_key = f"ts_proxy:channel:{channel_id}:clients:{client_id_str}" + + # Efficient way - just retrieve the essentials + client_info = { + 'client_id': client_id_str, + 'user_agent': proxy_server.redis_client.hget(client_key, 'user_agent') + } + + if client_info['user_agent']: + client_info['user_agent'] = client_info['user_agent'].decode('utf-8') + else: + client_info['user_agent'] = 'unknown' + + # Just get connected_at for client age + connected_at_bytes = proxy_server.redis_client.hget(client_key, 'connected_at') + if connected_at_bytes: + connected_at = float(connected_at_bytes.decode('utf-8')) + client_info['connected_since'] = time.time() - connected_at + + clients.append(client_info) + + # Add clients to info + info['clients'] = clients + + return info + diff --git a/apps/proxy/ts_proxy/stream_buffer.py b/apps/proxy/ts_proxy/stream_buffer.py index fe097af7..dc1e2101 100644 --- a/apps/proxy/ts_proxy/stream_buffer.py +++ b/apps/proxy/ts_proxy/stream_buffer.py @@ -5,6 +5,7 @@ import logging import time from collections import deque from typing import Optional, Deque +import random from apps.proxy.config import TSConfig as Config logger = logging.getLogger("ts_proxy") diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py index cb21a218..0d434c88 100644 --- a/apps/proxy/ts_proxy/views.py +++ b/apps/proxy/ts_proxy/views.py @@ -9,6 +9,7 @@ from django.views.decorators.http import require_http_methods, require_GET from django.shortcuts import get_object_or_404 from apps.proxy.config import TSConfig as Config from . import proxy_server +from .channel_status import ChannelStatus import logging from apps.channels.models import Channel, Stream from apps.m3u.models import M3UAccount, M3UAccountProfile @@ -543,256 +544,12 @@ def channel_status(request, channel_id=None): try: # Check if Redis is available if not proxy_server.redis_client: - return JsonResponse({'error': 'Redis connection not available'}, status=500) - - # Function for detailed channel info (used when channel_id is provided) - def get_detailed_channel_info(channel_id): - # Get channel metadata - metadata_key = f"ts_proxy:channel:{channel_id}:metadata" - metadata = proxy_server.redis_client.hgetall(metadata_key) - - if not metadata: - return None - - # Get detailed info - existing implementation - # Get channel metadata - metadata_key = f"ts_proxy:channel:{channel_id}:metadata" - metadata = proxy_server.redis_client.hgetall(metadata_key) - - if not metadata: - return None - - # Basic channel info - buffer_index_value = proxy_server.redis_client.get(f"ts_proxy:channel:{channel_id}:buffer:index") - - info = { - 'channel_id': channel_id, - 'state': metadata.get(b'state', b'unknown').decode('utf-8'), - 'url': metadata.get(b'url', b'').decode('utf-8'), - 'created_at': metadata.get(b'created_at', b'0').decode('utf-8'), - 'owner': metadata.get(b'owner', b'unknown').decode('utf-8'), - - # Properly decode the buffer index value - 'buffer_index': int(buffer_index_value.decode('utf-8')) if buffer_index_value else 0, - } - - # Add timing information - if b'state_changed_at' in metadata: - state_changed_at = float(metadata[b'state_changed_at'].decode('utf-8')) - info['state_changed_at'] = state_changed_at - info['state_duration'] = time.time() - state_changed_at - - if b'created_at' in metadata: - created_at = float(metadata[b'created_at'].decode('utf-8')) - info['created_at'] = created_at - info['uptime'] = time.time() - created_at - - # Get client information - client_set_key = f"ts_proxy:channel:{channel_id}:clients" - client_ids = proxy_server.redis_client.smembers(client_set_key) - clients = [] - - for client_id in client_ids: - client_id_str = client_id.decode('utf-8') - client_key = f"ts_proxy:channel:{channel_id}:clients:{client_id_str}" - client_data = proxy_server.redis_client.hgetall(client_key) - - if client_data: - client_info = { - 'client_id': client_id_str, - 'user_agent': client_data.get(b'user_agent', b'unknown').decode('utf-8'), - 'worker_id': client_data.get(b'worker_id', b'unknown').decode('utf-8'), - } - - if b'connected_at' in client_data: - connected_at = float(client_data[b'connected_at'].decode('utf-8')) - client_info['connected_at'] = connected_at - client_info['connection_duration'] = time.time() - connected_at - - if b'last_active' in client_data: - last_active = float(client_data[b'last_active'].decode('utf-8')) - client_info['last_active'] = last_active - client_info['last_active_ago'] = time.time() - last_active - - clients.append(client_info) - - info['clients'] = clients - info['client_count'] = len(clients) - - # Get buffer health with improved diagnostics - buffer_stats = { - 'chunks': info['buffer_index'], - 'diagnostics': {} - } - - # Sample a few recent chunks to check sizes with better error handling - if info['buffer_index'] > 0: - try: - sample_chunks = min(5, info['buffer_index']) - chunk_sizes = [] - chunk_keys_found = [] - chunk_keys_missing = [] - - # Check if the keys exist before getting - for i in range(info['buffer_index']-sample_chunks+1, info['buffer_index']+1): - chunk_key = f"ts_proxy:channel:{channel_id}:buffer:chunk:{i}" - - # Check if key exists first - if proxy_server.redis_client.exists(chunk_key): - chunk_data = proxy_server.redis_client.get(chunk_key) - if chunk_data: - chunk_size = len(chunk_data) - chunk_sizes.append(chunk_size) - chunk_keys_found.append(i) - - # Check for TS alignment (packets are 188 bytes) - ts_packets = chunk_size // 188 - ts_aligned = chunk_size % 188 == 0 - - # Add for first chunk only to avoid too much data - if len(chunk_keys_found) == 1: - buffer_stats['diagnostics']['first_chunk'] = { - 'index': i, - 'size': chunk_size, - 'ts_packets': ts_packets, - 'aligned': ts_aligned, - 'first_byte': chunk_data[0] if chunk_size > 0 else None - } - else: - chunk_keys_missing.append(i) - - # Add detailed diagnostics - if chunk_sizes: - buffer_stats['avg_chunk_size'] = sum(chunk_sizes) / len(chunk_sizes) - buffer_stats['recent_chunk_sizes'] = chunk_sizes - buffer_stats['keys_found'] = chunk_keys_found - buffer_stats['keys_missing'] = chunk_keys_missing - - # Calculate data rate - total_data = sum(chunk_sizes) - buffer_stats['total_sample_bytes'] = total_data - - # Add TS packet analysis - total_ts_packets = total_data // 188 - buffer_stats['estimated_ts_packets'] = total_ts_packets - buffer_stats['is_ts_aligned'] = all(size % 188 == 0 for size in chunk_sizes) - else: - # If no chunks found, scan for keys to help debug - all_buffer_keys = [] - cursor = 0 - - buffer_key_pattern = f"ts_proxy:channel:{channel_id}:buffer:chunk:*" - - while True: - cursor, keys = proxy_server.redis_client.scan(cursor, match=buffer_key_pattern, count=100) - if keys: - all_buffer_keys.extend([k.decode('utf-8') for k in keys]) - if cursor == 0 or len(all_buffer_keys) >= 20: # Limit to 20 keys - break - - buffer_stats['diagnostics']['all_buffer_keys'] = all_buffer_keys[:20] # First 20 keys - buffer_stats['diagnostics']['total_buffer_keys'] = len(all_buffer_keys) - - except Exception as e: - # Capture any errors for diagnostics - buffer_stats['error'] = str(e) - buffer_stats['diagnostics']['exception'] = str(e) - - # Add TTL information to see if chunks are expiring - chunk_ttl_key = f"ts_proxy:channel:{channel_id}:buffer:chunk:{info['buffer_index']}" - chunk_ttl = proxy_server.redis_client.ttl(chunk_ttl_key) - buffer_stats['latest_chunk_ttl'] = chunk_ttl - - info['buffer_stats'] = buffer_stats - - # Get local worker info if available - if channel_id in proxy_server.stream_managers: - manager = proxy_server.stream_managers[channel_id] - info['local_manager'] = { - 'healthy': manager.healthy, - 'connected': manager.connected, - 'last_data_time': manager.last_data_time, - 'last_data_age': time.time() - manager.last_data_time - } - - return info - - # Function for basic channel info (used for all channels summary) - def get_basic_channel_info(channel_id): - # Get channel metadata - metadata_key = f"ts_proxy:channel:{channel_id}:metadata" - metadata = proxy_server.redis_client.hgetall(metadata_key) - - if not metadata: - return None - - # Basic channel info only - omit diagnostics and details - buffer_index_value = proxy_server.redis_client.get(f"ts_proxy:channel:{channel_id}:buffer:index") - - # Count clients (using efficient count method) - client_set_key = f"ts_proxy:channel:{channel_id}:clients" - client_count = proxy_server.redis_client.scard(client_set_key) or 0 - - # Calculate uptime - created_at = float(metadata.get(b'init_time', b'0').decode('utf-8')) - uptime = time.time() - created_at if created_at > 0 else 0 - - # Simplified info - info = { - 'channel_id': channel_id, - 'state': metadata.get(b'state', b'unknown').decode('utf-8'), - 'url': metadata.get(b'url', b'').decode('utf-8'), - 'owner': metadata.get(b'owner', b'unknown').decode('utf-8'), - 'buffer_index': int(buffer_index_value.decode('utf-8')) if buffer_index_value else 0, - 'client_count': client_count, - 'uptime': uptime - } - - # Quick health check if available locally - if channel_id in proxy_server.stream_managers: - manager = proxy_server.stream_managers[channel_id] - info['healthy'] = manager.healthy - - # Get concise client information - clients = [] - client_set_key = f"ts_proxy:channel:{channel_id}:clients" - client_ids = proxy_server.redis_client.smembers(client_set_key) - - # Process only if we have clients and keep it limited - if client_ids: - # Get up to 10 clients for the basic view - for client_id in list(client_ids)[:10]: - client_id_str = client_id.decode('utf-8') - client_key = f"ts_proxy:channel:{channel_id}:clients:{client_id_str}" - - # Efficient way - just retrieve the essentials - client_info = { - 'client_id': client_id_str, - 'user_agent': proxy_server.redis_client.hget(client_key, 'user_agent') - } - - if client_info['user_agent']: - client_info['user_agent'] = client_info['user_agent'].decode('utf-8') - else: - client_info['user_agent'] = 'unknown' - - # Just get connected_at for client age - connected_at_bytes = proxy_server.redis_client.hget(client_key, 'connected_at') - if connected_at_bytes: - connected_at = float(connected_at_bytes.decode('utf-8')) - client_info['connected_since'] = time.time() - connected_at - - clients.append(client_info) - - # Add clients to info - info['clients'] = clients - - return info + return JsonResponse({'error': 'Redis connection not available'}, status=500) # Handle single channel or all channels if channel_id: # Detailed info for specific channel - channel_info = get_detailed_channel_info(channel_id) + channel_info = ChannelStatus.get_detailed_channel_info(channel_id) if channel_info: return JsonResponse(channel_info) else: @@ -810,7 +567,7 @@ def channel_status(request, channel_id=None): channel_id_match = re.search(r"ts_proxy:channel:(.*):metadata", key.decode('utf-8')) if channel_id_match: ch_id = channel_id_match.group(1) - channel_info = get_basic_channel_info(ch_id) + channel_info = ChannelStatus.get_basic_channel_info(ch_id) if channel_info: all_channels.append(channel_info)