From d953386c574b0cac76973db56b8761e03983c1e2 Mon Sep 17 00:00:00 2001
From: SergeantPanda
Date: Thu, 13 Mar 2025 18:06:32 -0500
Subject: [PATCH 1/3] Code cleanup.

---
 apps/proxy/ts_proxy/server.py | 179 ++--------------------------------
 1 file changed, 7 insertions(+), 172 deletions(-)

diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py
index 9129f4df..a4a89586 100644
--- a/apps/proxy/ts_proxy/server.py
+++ b/apps/proxy/ts_proxy/server.py
@@ -235,21 +235,14 @@ class StreamManager:
         while self.running:
             try:
                 now = time.time()
-                if now - self.last_data_time > 10 and self.connected:
-                    # No data for 10 seconds, mark as unhealthy
+                if now - self.last_data_time > getattr(Config.CONNECTION_TIMEOUT,10) and self.connected:
+                    # Mark unhealthy if no data for too long
                     if self.healthy:
-                        logger.warning("Stream health check: No data received for 10+ seconds")
+                        logger.warning(f"Stream unhealthy - no data for {now - self.last_data_time:.1f}s")
                         self.healthy = False
-
-                    # After 30 seconds with no data, force reconnection
-                    if now - self.last_data_time > 30:
-                        logger.warning("Stream appears dead, forcing reconnection")
-                        self._close_socket()
-                        self.connected = False
-                        self.last_data_time = time.time()  # Reset timer for the reconnect
                 elif self.connected and not self.healthy:
-                    # Stream is receiving data again after being unhealthy
-                    logger.info("Stream health restored, receiving data again")
+                    # Auto-recover health when data resumes
+                    logger.info("Stream health restored")
                     self.healthy = True

             except Exception as e:
@@ -279,53 +272,7 @@ class StreamManager:
     def _close_socket(self):
         """Backward compatibility wrapper for _close_connection"""
         return self._close_connection()
-
-    def fetch_chunk(self):
-        """Fetch data from socket with direct pass-through to buffer"""
-        if not self.connected or not self.socket:
-            return False
-
-        try:
-            # Read data chunk - no need to align with TS packet size anymore
-            try:
-                # Try to read data chunk
-                if hasattr(self.socket, 'recv'):
-                    chunk = self.socket.recv(Config.CHUNK_SIZE)  # Standard socket
-                else:
-                    chunk = self.socket.read(Config.CHUNK_SIZE)  # SocketIO object
-
-            except AttributeError:
-                # Fall back to read() if recv() isn't available
-                chunk = self.socket.read(Config.CHUNK_SIZE)
-
-            if not chunk:
-                # Connection closed by server
-                logger.warning("Server closed connection")
-                self._close_socket()
-                self.connected = False
-                return False
-
-            # Add directly to buffer without TS-specific processing
-            success = self.buffer.add_chunk(chunk)
-
-            # Update last data timestamp in Redis if successful
-            if success and hasattr(self.buffer, 'redis_client') and self.buffer.redis_client:
-                last_data_key = f"ts_proxy:channel:{self.buffer.channel_id}:last_data"
-                self.buffer.redis_client.set(last_data_key, str(time.time()), ex=60)
-
-            return True
-
-        except (socket.timeout, socket.error) as e:
-            # Socket error
-            logger.error(f"Socket error: {e}")
-            self._close_socket()
-            self.connected = False
-            return False
-
-        except Exception as e:
-            logger.error(f"Error in fetch_chunk: {e}")
-            return False
-
+
     def _set_waiting_for_clients(self):
         """Set channel state to waiting for clients after successful connection"""
         try:
@@ -367,36 +314,6 @@ class StreamManager:
         except Exception as e:
             logger.error(f"Error setting waiting for clients state: {e}")

-    def _read_stream(self):
-        """Read from stream with minimal processing"""
-        try:
-            # Read up to CHUNK_SIZE bytes
-            chunk = self.sock.recv(self.CHUNK_SIZE)
-
-            if not chunk:
-                # Connection closed
-                logger.debug("Connection closed by remote host")
remote host") - return False - - # If we got data, just add it directly to the buffer - if chunk: - success = self.buffer.add_chunk(chunk) - - # Update last data timestamp in Redis if successful - if success and hasattr(self.buffer, 'redis_client') and self.buffer.redis_client: - last_data_key = f"ts_proxy:channel:{self.buffer.channel_id}:last_data" - self.buffer.redis_client.set(last_data_key, str(time.time()), ex=60) # 1 minute expiry - - return success - return True - - except socket.timeout: - # Expected timeout - no data available - return True - except Exception as e: - # Error reading from socket - logger.error(f"Error reading from stream: {e}") - return False class StreamBuffer: """Manages stream data buffering with optimized chunk storage""" @@ -915,88 +832,6 @@ class ClientManager: except Exception as e: logger.error(f"Error refreshing client TTL: {e}") -class StreamFetcher: - """Handles stream data fetching""" - - def __init__(self, manager: StreamManager, buffer: StreamBuffer): - self.manager = manager - self.buffer = buffer - - def fetch_loop(self) -> None: - """Main fetch loop for stream data""" - while self.manager.running: - try: - if not self._handle_connection(): - continue - - with self.manager.session.get(self.manager.url, stream=True) as response: - if response.status_code == 200: - self._handle_successful_connection() - self._process_stream(response) - - except requests.exceptions.RequestException as e: - self._handle_connection_error(e) - - def _handle_connection(self) -> bool: - """Handle connection state and retries""" - if not self.manager.connected: - if not self.manager.should_retry(): - logger.error(f"Failed to connect after {self.manager.max_retries} attempts") - return False - - if not self.manager.running: - return False - - self.manager.retry_count += 1 - logger.info(f"Connecting to stream: {self.manager.url} " - f"(attempt {self.manager.retry_count}/{self.manager.max_retries})") - return True - - def _handle_successful_connection(self) -> None: - """Handle successful stream connection""" - if not self.manager.connected: - logger.info("Stream connected successfully") - self.manager.connected = True - self.manager.retry_count = 0 - - def _process_stream(self, response: requests.Response) -> None: - """Process incoming stream data""" - for chunk in response.iter_content(chunk_size=Config.CHUNK_SIZE): - if not self.manager.running: - logger.info("Stream fetch stopped - shutting down") - return - - if chunk: - if self.manager.ready_event.is_set(): - logger.info("Stream switch in progress, closing connection") - self.manager.ready_event.clear() - break - - with self.buffer.lock: - self.buffer.buffer.append(chunk) - self.buffer.index += 1 - - def _handle_connection_error(self, error: Exception) -> None: - """Handle stream connection errors""" - logger.error(f"Stream connection error: {error}") - self.manager.connected = False - - if not self.manager.running: - return - - logger.info(f"Attempting to reconnect in {Config.RECONNECT_DELAY} seconds...") - if not wait_for_running(self.manager, Config.RECONNECT_DELAY): - return - -def wait_for_running(manager: StreamManager, delay: float) -> bool: - """Wait while checking manager running state""" - start = time.time() - while time.time() - start < delay: - if not manager.running: - return False - threading.Event().wait(0.1) - return True - class ProxyServer: """Manages TS proxy server instance with worker coordination""" @@ -1662,9 +1497,9 @@ class ProxyServer: all_keys = self.redis_client.keys(channel_pattern) if 
+                    # Delete all matching keys in bulk
                     self.redis_client.delete(*all_keys)
                     logger.info(f"Cleaned up {len(all_keys)} Redis keys for channel {channel_id}")
-
             except Exception as e:
                 logger.error(f"Error cleaning Redis keys for channel {channel_id}: {e}")

From 387c2491b7a46ae95cf5205fcb58bb8d73761d4d Mon Sep 17 00:00:00 2001
From: SergeantPanda
Date: Thu, 13 Mar 2025 18:34:49 -0500
Subject: [PATCH 2/3] Continuing cleanup.

---
 apps/proxy/ts_proxy/server.py | 57 +++++++++++++++++++++--------------
 1 file changed, 35 insertions(+), 22 deletions(-)

diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py
index a4a89586..d4f95570 100644
--- a/apps/proxy/ts_proxy/server.py
+++ b/apps/proxy/ts_proxy/server.py
@@ -125,7 +125,8 @@ class StreamManager:
                     if "'NoneType' object has no attribute 'read'" in str(e):
                         logger.warning(f"Connection closed by server (read {chunk_count} chunks before disconnect)")
                     else:
-                        # Re-raise unexpected AttributeError
+                        # Re-raise unexpected AttributeErrors
+                        logger.error(f"Unexpected AttributeError: {e}")
                         raise
             else:
                 logger.error(f"Failed to connect to stream: HTTP {response.status_code}")
@@ -235,7 +236,7 @@ class StreamManager:
         while self.running:
             try:
                 now = time.time()
-                if now - self.last_data_time > getattr(Config.CONNECTION_TIMEOUT,10) and self.connected:
+                if now - self.last_data_time > getattr(Config, 'CONNECTION_TIMEOUT', 10) and self.connected:
                     # Mark unhealthy if no data for too long
                     if self.healthy:
                         logger.warning(f"Stream unhealthy - no data for {now - self.last_data_time:.1f}s")
@@ -595,22 +596,18 @@ class ClientManager:
                     # Check if client exists in Redis at all
                     exists = self.redis_client.exists(client_key)
                     if not exists:
-                        # Client entry has expired in Redis but still in our local set
-                        logger.warning(f"Found ghost client {client_id} - expired in Redis but still in local set")
+                        logger.debug(f"Client {client_id} no longer exists in Redis, removing locally")
                         clients_to_remove.add(client_id)
                         continue
-
+
                    # Check for stale activity using last_active field
                     last_active = self.redis_client.hget(client_key, "last_active")
                     if last_active:
                         last_active_time = float(last_active.decode('utf-8'))
-                        time_since_activity = current_time - last_active_time
+                        ghost_timeout = self.heartbeat_interval * getattr(Config, 'GHOST_CLIENT_MULTIPLIER', 5.0)

-                        # If client hasn't been active for too long, mark for removal
-                        # Use configurable threshold for detection
-                        ghost_threshold = getattr(Config, 'GHOST_CLIENT_MULTIPLIER', 5.0)
-                        if time_since_activity > self.heartbeat_interval * ghost_threshold:
-                            logger.warning(f"Detected ghost client {client_id} - last active {time_since_activity:.1f}s ago")
+                        if current_time - last_active_time > ghost_timeout:
+                            logger.debug(f"Client {client_id} inactive for {current_time - last_active_time:.1f}s, removing as ghost")
                             clients_to_remove.add(client_id)

             # Remove ghost clients in a separate step
@@ -631,8 +628,8 @@ class ClientManager:

                 # Skip if we just sent a heartbeat recently
                 if client_id in self.last_heartbeat_time:
-                    time_since_last = current_time - self.last_heartbeat_time[client_id]
-                    if time_since_last < self.heartbeat_interval * 0.8:
+                    time_since_heartbeat = current_time - self.last_heartbeat_time[client_id]
+                    if time_since_heartbeat < self.heartbeat_interval * 0.5:  # Only heartbeat at half interval minimum
                         continue

                 # Only update clients that remain
@@ -1487,21 +1484,37 @@ class ProxyServer:
             logger.error(f"Error checking orphaned channels: {e}")

     def _clean_redis_keys(self, channel_id):
-        """Clean up all Redis keys for a channel"""
channel""" + """Clean up all Redis keys for a channel more efficiently""" if not self.redis_client: - return + return 0 try: - # All keys are now under the channel namespace for easy pattern matching - channel_pattern = f"ts_proxy:channel:{channel_id}:*" - all_keys = self.redis_client.keys(channel_pattern) + # Define key patterns to scan for + patterns = [ + f"ts_proxy:channel:{channel_id}:*", # All channel keys + f"ts_proxy:events:{channel_id}" # Event channel + ] + + total_deleted = 0 + + for pattern in patterns: + cursor = 0 + while True: + cursor, keys = self.redis_client.scan(cursor, match=pattern, count=100) + if keys: + self.redis_client.delete(*keys) + total_deleted += len(keys) + + # Exit when cursor returns to 0 + if cursor == 0: + break + + logger.info(f"Cleaned up {total_deleted} Redis keys for channel {channel_id}") + return total_deleted - if all_keys: - # Delete all matching keys in bulk - self.redis_client.delete(*all_keys) - logger.info(f"Cleaned up {len(all_keys)} Redis keys for channel {channel_id}") except Exception as e: logger.error(f"Error cleaning Redis keys for channel {channel_id}: {e}") + return 0 def refresh_channel_registry(self): """Refresh TTL for active channels using standard keys""" From 140937a192044e5224b3614742b6f262612d8242 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Thu, 13 Mar 2025 20:54:30 -0500 Subject: [PATCH 3/3] Added detailed status for active channels including connected clients. --- apps/proxy/ts_proxy/urls.py | 2 + apps/proxy/ts_proxy/views.py | 291 +++++++++++++++++++++++++++++++++++ 2 files changed, 293 insertions(+) diff --git a/apps/proxy/ts_proxy/urls.py b/apps/proxy/ts_proxy/urls.py index 77c22534..e5e3add6 100644 --- a/apps/proxy/ts_proxy/urls.py +++ b/apps/proxy/ts_proxy/urls.py @@ -7,4 +7,6 @@ urlpatterns = [ path('stream/', views.stream_ts, name='stream'), path('initialize/', views.initialize_stream, name='initialize'), path('change_stream/', views.change_stream, name='change_stream'), + path('status', views.channel_status, name='channel_status'), + path('status/', views.channel_status, name='channel_status_detail'), ] \ No newline at end of file diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py index 399e93c3..9df7cadd 100644 --- a/apps/proxy/ts_proxy/views.py +++ b/apps/proxy/ts_proxy/views.py @@ -4,6 +4,7 @@ import time import random import sys import os +import re from django.http import StreamingHttpResponse, JsonResponse from django.views.decorators.csrf import csrf_exempt from django.views.decorators.http import require_http_methods, require_GET @@ -550,6 +551,296 @@ def change_stream(request, channel_id): logger.error(f"Failed to change stream: {e}", exc_info=True) return JsonResponse({'error': str(e)}, status=500) +@require_GET +def channel_status(request, channel_id=None): + """ + Returns status information about channels with detail level based on request: + - /status/ returns basic summary of all channels + - /status/{channel_id} returns detailed info about specific channel + """ + try: + # Check if Redis is available + if not proxy_server.redis_client: + return JsonResponse({'error': 'Redis connection not available'}, status=500) + + # Function for detailed channel info (used when channel_id is provided) + def get_detailed_channel_info(channel_id): + # Get channel metadata + metadata_key = f"ts_proxy:channel:{channel_id}:metadata" + metadata = proxy_server.redis_client.hgetall(metadata_key) + + if not metadata: + return None + + # Get detailed info - existing implementation + # Get channel 
+            metadata_key = f"ts_proxy:channel:{channel_id}:metadata"
+            metadata = proxy_server.redis_client.hgetall(metadata_key)
+
+            if not metadata:
+                return None
+
+            # Basic channel info
+            buffer_index_value = proxy_server.redis_client.get(f"ts_proxy:channel:{channel_id}:buffer:index")
+
+            info = {
+                'channel_id': channel_id,
+                'state': metadata.get(b'state', b'unknown').decode('utf-8'),
+                'url': metadata.get(b'url', b'').decode('utf-8'),
+                'created_at': metadata.get(b'created_at', b'0').decode('utf-8'),
+                'owner': metadata.get(b'owner', b'unknown').decode('utf-8'),
+
+                # Properly decode the buffer index value
+                'buffer_index': int(buffer_index_value.decode('utf-8')) if buffer_index_value else 0,
+            }
+
+            # Add timing information
+            if b'state_changed_at' in metadata:
+                state_changed_at = float(metadata[b'state_changed_at'].decode('utf-8'))
+                info['state_changed_at'] = state_changed_at
+                info['state_duration'] = time.time() - state_changed_at
+
+            if b'created_at' in metadata:
+                created_at = float(metadata[b'created_at'].decode('utf-8'))
+                info['created_at'] = created_at
+                info['uptime'] = time.time() - created_at
+
+            # Get client information
+            client_set_key = f"ts_proxy:channel:{channel_id}:clients"
+            client_ids = proxy_server.redis_client.smembers(client_set_key)
+            clients = []
+
+            for client_id in client_ids:
+                client_id_str = client_id.decode('utf-8')
+                client_key = f"ts_proxy:channel:{channel_id}:clients:{client_id_str}"
+                client_data = proxy_server.redis_client.hgetall(client_key)
+
+                if client_data:
+                    client_info = {
+                        'client_id': client_id_str,
+                        'user_agent': client_data.get(b'user_agent', b'unknown').decode('utf-8'),
+                        'worker_id': client_data.get(b'worker_id', b'unknown').decode('utf-8'),
+                    }
+
+                    if b'connected_at' in client_data:
+                        connected_at = float(client_data[b'connected_at'].decode('utf-8'))
+                        client_info['connected_at'] = connected_at
+                        client_info['connection_duration'] = time.time() - connected_at
+
+                    if b'last_active' in client_data:
+                        last_active = float(client_data[b'last_active'].decode('utf-8'))
+                        client_info['last_active'] = last_active
+                        client_info['last_active_ago'] = time.time() - last_active
+
+                    clients.append(client_info)
+
+            info['clients'] = clients
+            info['client_count'] = len(clients)
+
+            # Get buffer health with improved diagnostics
+            buffer_stats = {
+                'chunks': info['buffer_index'],
+                'diagnostics': {}
+            }
+
+            # Sample a few recent chunks to check sizes with better error handling
+            if info['buffer_index'] > 0:
+                try:
+                    sample_chunks = min(5, info['buffer_index'])
+                    chunk_sizes = []
+                    chunk_keys_found = []
+                    chunk_keys_missing = []
+
+                    # Check if the keys exist before getting
+                    for i in range(info['buffer_index']-sample_chunks+1, info['buffer_index']+1):
+                        chunk_key = f"ts_proxy:channel:{channel_id}:buffer:chunk:{i}"
+
+                        # Check if key exists first
+                        if proxy_server.redis_client.exists(chunk_key):
+                            chunk_data = proxy_server.redis_client.get(chunk_key)
+                            if chunk_data:
+                                chunk_size = len(chunk_data)
+                                chunk_sizes.append(chunk_size)
+                                chunk_keys_found.append(i)
+
+                                # Check for TS alignment (packets are 188 bytes)
+                                ts_packets = chunk_size // 188
+                                ts_aligned = chunk_size % 188 == 0
+
+                                # Add for first chunk only to avoid too much data
+                                if len(chunk_keys_found) == 1:
+                                    buffer_stats['diagnostics']['first_chunk'] = {
+                                        'index': i,
+                                        'size': chunk_size,
+                                        'ts_packets': ts_packets,
+                                        'aligned': ts_aligned,
+                                        'first_byte': chunk_data[0] if chunk_size > 0 else None
+                                    }
+                        else:
+                            chunk_keys_missing.append(i)
+
+                    # Add detailed diagnostics
+                    if chunk_sizes:
+                        buffer_stats['avg_chunk_size'] = sum(chunk_sizes) / len(chunk_sizes)
+                        buffer_stats['recent_chunk_sizes'] = chunk_sizes
+                        buffer_stats['keys_found'] = chunk_keys_found
+                        buffer_stats['keys_missing'] = chunk_keys_missing
+
+                        # Calculate data rate
+                        total_data = sum(chunk_sizes)
+                        buffer_stats['total_sample_bytes'] = total_data
+
+                        # Add TS packet analysis
+                        total_ts_packets = total_data // 188
+                        buffer_stats['estimated_ts_packets'] = total_ts_packets
+                        buffer_stats['is_ts_aligned'] = all(size % 188 == 0 for size in chunk_sizes)
+                    else:
+                        # If no chunks found, scan for keys to help debug
+                        all_buffer_keys = []
+                        cursor = 0
+
+                        buffer_key_pattern = f"ts_proxy:channel:{channel_id}:buffer:chunk:*"
+
+                        while True:
+                            cursor, keys = proxy_server.redis_client.scan(cursor, match=buffer_key_pattern, count=100)
+                            if keys:
+                                all_buffer_keys.extend([k.decode('utf-8') for k in keys])
+                            if cursor == 0 or len(all_buffer_keys) >= 20:  # Limit to 20 keys
+                                break
+
+                        buffer_stats['diagnostics']['all_buffer_keys'] = all_buffer_keys[:20]  # First 20 keys
+                        buffer_stats['diagnostics']['total_buffer_keys'] = len(all_buffer_keys)
+
+                except Exception as e:
+                    # Capture any errors for diagnostics
+                    buffer_stats['error'] = str(e)
+                    buffer_stats['diagnostics']['exception'] = str(e)
+
+            # Add TTL information to see if chunks are expiring
+            chunk_ttl_key = f"ts_proxy:channel:{channel_id}:buffer:chunk:{info['buffer_index']}"
+            chunk_ttl = proxy_server.redis_client.ttl(chunk_ttl_key)
+            buffer_stats['latest_chunk_ttl'] = chunk_ttl
+
+            info['buffer_stats'] = buffer_stats
+
+            # Get local worker info if available
+            if channel_id in proxy_server.stream_managers:
+                manager = proxy_server.stream_managers[channel_id]
+                info['local_manager'] = {
+                    'healthy': manager.healthy,
+                    'connected': manager.connected,
+                    'last_data_time': manager.last_data_time,
+                    'last_data_age': time.time() - manager.last_data_time
+                }
+
+            return info
+
+        # Function for basic channel info (used for all channels summary)
+        def get_basic_channel_info(channel_id):
+            # Get channel metadata
+            metadata_key = f"ts_proxy:channel:{channel_id}:metadata"
+            metadata = proxy_server.redis_client.hgetall(metadata_key)
+
+            if not metadata:
+                return None
+
+            # Basic channel info only - omit diagnostics and details
+            buffer_index_value = proxy_server.redis_client.get(f"ts_proxy:channel:{channel_id}:buffer:index")
+
+            # Count clients (using efficient count method)
+            client_set_key = f"ts_proxy:channel:{channel_id}:clients"
+            client_count = proxy_server.redis_client.scard(client_set_key) or 0
+
+            # Calculate uptime
+            created_at = float(metadata.get(b'init_time', b'0').decode('utf-8'))
+            uptime = time.time() - created_at if created_at > 0 else 0
+
+            # Simplified info
+            info = {
+                'channel_id': channel_id,
+                'state': metadata.get(b'state', b'unknown').decode('utf-8'),
+                'url': metadata.get(b'url', b'').decode('utf-8'),
+                'owner': metadata.get(b'owner', b'unknown').decode('utf-8'),
+                'buffer_index': int(buffer_index_value.decode('utf-8')) if buffer_index_value else 0,
+                'client_count': client_count,
+                'uptime': uptime
+            }
+
+            # Quick health check if available locally
+            if channel_id in proxy_server.stream_managers:
+                manager = proxy_server.stream_managers[channel_id]
+                info['healthy'] = manager.healthy
+
+            # Get concise client information
+            clients = []
+            client_set_key = f"ts_proxy:channel:{channel_id}:clients"
+            client_ids = proxy_server.redis_client.smembers(client_set_key)
+
+            # Process only if we have clients and keep it limited
+            if client_ids:
+                # Get up to 10 clients for the basic view
+                for client_id in list(client_ids)[:10]:
+                    client_id_str = client_id.decode('utf-8')
+                    client_key = f"ts_proxy:channel:{channel_id}:clients:{client_id_str}"
+
+                    # Efficient way - just retrieve the essentials
+                    client_info = {
+                        'client_id': client_id_str,
+                        'user_agent': proxy_server.redis_client.hget(client_key, 'user_agent')
+                    }
+
+                    if client_info['user_agent']:
+                        client_info['user_agent'] = client_info['user_agent'].decode('utf-8')
+                    else:
+                        client_info['user_agent'] = 'unknown'
+
+                    # Just get connected_at for client age
+                    connected_at_bytes = proxy_server.redis_client.hget(client_key, 'connected_at')
+                    if connected_at_bytes:
+                        connected_at = float(connected_at_bytes.decode('utf-8'))
+                        client_info['connected_since'] = time.time() - connected_at
+
+                    clients.append(client_info)
+
+            # Add clients to info
+            info['clients'] = clients
+
+            return info
+
+        # Handle single channel or all channels
+        if channel_id:
+            # Detailed info for specific channel
+            channel_info = get_detailed_channel_info(channel_id)
+            if channel_info:
+                return JsonResponse(channel_info)
+            else:
+                return JsonResponse({'error': f'Channel {channel_id} not found'}, status=404)
+        else:
+            # Basic info for all channels
+            channel_pattern = "ts_proxy:channel:*:metadata"
+            all_channels = []
+
+            # Extract channel IDs from keys
+            cursor = 0
+            while True:
+                cursor, keys = proxy_server.redis_client.scan(cursor, match=channel_pattern)
+                for key in keys:
+                    channel_id_match = re.search(r"ts_proxy:channel:(.*):metadata", key.decode('utf-8'))
+                    if channel_id_match:
+                        ch_id = channel_id_match.group(1)
+                        channel_info = get_basic_channel_info(ch_id)
+                        if channel_info:
+                            all_channels.append(channel_info)
+
+                if cursor == 0:
+                    break
+
+            return JsonResponse({'channels': all_channels, 'count': len(all_channels)})
+
+    except Exception as e:
+        logger.error(f"Error in channel_status: {e}", exc_info=True)
+        return JsonResponse({'error': str(e)}, status=500)
+
 def get_client_data(buffer, local_index):
     """Get optimal amount of data for client"""
     # Define limits