From 8f8b4ef371d2c657e190bdaf761cbfcad5048ca5 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Sat, 15 Mar 2025 14:35:34 -0500 Subject: [PATCH] More refractoring and slight modifications. --- apps/proxy/config.py | 2 +- apps/proxy/ts_proxy/stream_manager.py | 47 +++++-- apps/proxy/ts_proxy/views.py | 174 +++++++++++++------------- 3 files changed, 123 insertions(+), 100 deletions(-) diff --git a/apps/proxy/config.py b/apps/proxy/config.py index acddc39d..4c77792f 100644 --- a/apps/proxy/config.py +++ b/apps/proxy/config.py @@ -1,7 +1,7 @@ """Shared configuration between proxy types""" class BaseConfig: - DEFAULT_USER_AGENT = 'VLC/3.0.20 LibVLC/3.0.20' + DEFAULT_USER_AGENT = 'VLC/3.0.20 LibVLC/3.0.20' # Will only be used if connection to settings fail CHUNK_SIZE = 8192 CLIENT_POLL_INTERVAL = 0.1 MAX_RETRIES = 3 diff --git a/apps/proxy/ts_proxy/stream_manager.py b/apps/proxy/ts_proxy/stream_manager.py index b6b98634..7de2d83f 100644 --- a/apps/proxy/ts_proxy/stream_manager.py +++ b/apps/proxy/ts_proxy/stream_manager.py @@ -69,6 +69,9 @@ class StreamManager: def run(self): """Main execution loop using HTTP streaming with improved connection handling""" + # Add a stop flag to the class properties + self.stop_requested = False + try: # Start health monitor thread health_thread = threading.Thread(target=self._monitor_health, daemon=True) @@ -78,6 +81,8 @@ class StreamManager: while self.running: if len(self.transcode_cmd) > 0: + # Start command process for transcoding + logger.debug(f"Starting transcode process: {self.transcode_cmd}") self.transcode_process = subprocess.Popen( self.transcode_cmd, stdout=subprocess.PIPE, @@ -101,6 +106,8 @@ class StreamManager: time.sleep(0.1) else: try: + # Using direct HTTP streaming + logger.debug(f"Using TS Proxy to connect to stream: {self.url}") # Create new session for each connection attempt session = self._create_session() self.current_session = session @@ -125,7 +132,9 @@ class StreamManager: try: chunk_count = 0 for chunk in response.iter_content(chunk_size=self.chunk_size): - if not self.running: + # Check if we've been asked to stop + if self.stop_requested: + logger.info(f"Stream loop for channel {self.channel_id} stopping due to request") break if chunk: @@ -140,7 +149,15 @@ class StreamManager: if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client: last_data_key = f"ts_proxy:channel:{self.buffer.channel_id}:last_data" self.buffer.redis_client.set(last_data_key, str(time.time()), ex=60) - except AttributeError as e: + except (AttributeError, ConnectionError) as e: + if self.stop_requested: + # This is expected during shutdown, just log at debug level + logger.debug(f"Expected connection error during shutdown: {e}") + else: + # Unexpected error during normal operation + logger.error(f"Unexpected stream error: {e}") + # Handle the error appropriately + except Exception as e: # Handle the specific 'NoneType' object has no attribute 'read' error if "'NoneType' object has no attribute 'read'" in str(e): logger.warning(f"Connection closed by server (read {chunk_count} chunks before disconnect)") @@ -215,22 +232,26 @@ class StreamManager: logger.info("Stream manager stopped") def stop(self): - """Stop the stream manager and clean up resources""" - self.running = False - - if self.current_response: + """Stop this stream""" + # Set the flag first + self.stop_requested = True + + # Close any active response connection + if hasattr(self, 'current_response') and self.current_response: # CORRECT NAME try: - self.current_response.close() - except: + self.current_response.close() # CORRECT NAME + except Exception: pass - - if self.current_session: + + # Also close the session + if hasattr(self, 'current_session') and self.current_session: try: self.current_session.close() - except: + except Exception: pass - - logger.info("Stream manager resources released") + + # Set running to false to ensure thread exits + self.running = False def update_url(self, new_url): """Update stream URL and reconnect with HTTP streaming approach""" diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py index 881406ac..922f462a 100644 --- a/apps/proxy/ts_proxy/views.py +++ b/apps/proxy/ts_proxy/views.py @@ -21,7 +21,7 @@ logger = logging.getLogger("ts_proxy") @require_GET def stream_ts(request, channel_id): - """Stream TS data to client with integrated channel initialization""" + """Stream TS data to client with immediate response and keep-alive packets during initialization""" client_user_agent = None logger.info(f"Fetching channel ID {channel_id}") channel = get_object_or_404(Channel, pk=channel_id) @@ -38,9 +38,11 @@ def stream_ts(request, channel_id): logger.debug(f"[{client_id}] Client connected with user agent: {client_user_agent}") break - # Check if channel exists or initialize it + # Start initialization if needed + channel_initializing = False if not proxy_server.check_if_channel_exists(channel_id): - logger.info(f"[{client_id}] Channel {channel_id} needs initialization") + # Initialize the channel (but don't wait for completion) + logger.info(f"[{client_id}] Starting channel {channel_id} initialization") # Get stream details from channel model stream_id, profile_id = channel.get_stream() @@ -99,44 +101,11 @@ def stream_ts(request, channel_id): time.sleep(0.1) logger.info(f"[{client_id}] Successfully initialized channel {channel_id}") - - # Wait for channel to become ready if it's initializing - if proxy_server.redis_client: - wait_start = time.time() - max_wait = getattr(Config, 'CLIENT_WAIT_TIMEOUT', 30) # Maximum wait time in seconds - - # Check channel state - metadata_key = f"ts_proxy:channel:{channel_id}:metadata" - waiting = True - - while waiting and time.time() - wait_start < max_wait: - metadata = proxy_server.redis_client.hgetall(metadata_key) - if not metadata or b'state' not in metadata: - logger.warning(f"[{client_id}] Channel {channel_id} metadata missing") - break - - state = metadata[b'state'].decode('utf-8') - - # If channel is ready for clients, continue - if state in ['waiting_for_clients', 'active']: - logger.info(f"[{client_id}] Channel {channel_id} ready (state={state}), proceeding with connection") - waiting = False - elif state in ['initializing', 'connecting']: - # Channel is still initializing or connecting, wait a bit longer - elapsed = time.time() - wait_start - logger.info(f"[{client_id}] Waiting for channel {channel_id} to become ready ({elapsed:.1f}s), current state: {state}") - time.sleep(0.5) # Wait 500ms before checking again - else: - # Unknown or error state - logger.warning(f"[{client_id}] Channel {channel_id} in unexpected state: {state}") - break - - # Check if we timed out waiting - if waiting and time.time() - wait_start >= max_wait: - logger.warning(f"[{client_id}] Timeout waiting for channel {channel_id} to become ready") - return JsonResponse({'error': 'Timeout waiting for channel to initialize'}, status=503) - - # CRITICAL FIX: Ensure local resources are properly initialized before streaming + channel_initializing = True + logger.info(f"[{client_id}] Channel {channel_id} initialization started") + + # Register client - can do this regardless of initialization state + # Create local resources if needed if channel_id not in proxy_server.stream_buffers or channel_id not in proxy_server.client_managers: logger.warning(f"[{client_id}] Channel {channel_id} exists in Redis but not initialized in this worker - initializing now") @@ -161,62 +130,94 @@ def stream_ts(request, channel_id): return JsonResponse({'error': 'Failed to initialize channel locally'}, status=500) logger.info(f"[{client_id}] Successfully initialized channel {channel_id} locally") - - # Get stream buffer and client manager + + # Register client buffer = proxy_server.stream_buffers[channel_id] client_manager = proxy_server.client_managers[channel_id] client_manager.add_client(client_id, client_user_agent) logger.info(f"[{client_id}] Client registered with channel {channel_id}") - # Start stream response + # Define a single generate function def generate(): stream_start_time = time.time() bytes_sent = 0 chunks_sent = 0 - + + # Keep track of initialization state + initialization_start = time.time() + max_init_wait = getattr(Config, 'CLIENT_WAIT_TIMEOUT', 30) + channel_ready = not channel_initializing + keepalive_interval = 0.5 + last_keepalive = 0 + try: - - logger.info(f"[{client_id}] New client connected to channel {channel_id} with user agent: {client_user_agent}") - - # Add client to manager with user agent - client_manager = proxy_server.client_managers[channel_id] - client_count = client_manager.add_client(client_id, client_user_agent) - - # If this is the first client, try to acquire ownership - if client_count == 1 and not proxy_server.am_i_owner(channel_id): - if proxy_server.try_acquire_ownership(channel_id): - logger.info(f"[{client_id}] First client, acquiring channel ownership") - - # Get channel metadata from Redis + logger.info(f"[{client_id}] Stream generator started, channel_ready={channel_ready}") + + # Wait for initialization to complete if needed + if not channel_ready: + # While init is happening, send keepalive packets + while time.time() - initialization_start < max_init_wait: + # Check if initialization has completed if proxy_server.redis_client: metadata_key = f"ts_proxy:channel:{channel_id}:metadata" - url_bytes = proxy_server.redis_client.hget(metadata_key, "url") - ua_bytes = proxy_server.redis_client.hget(metadata_key, "user_agent") - - url = url_bytes.decode('utf-8') if url_bytes else None - user_agent = ua_bytes.decode('utf-8') if ua_bytes else None - - if url: - # Create and start stream connection - from .stream_manager import StreamManager - - logger.info(f"[{client_id}] Creating stream connection for URL: {url}") - buffer = proxy_server.stream_buffers[channel_id] - - stream_manager = StreamManager(url, buffer, user_agent=user_agent) - proxy_server.stream_managers[channel_id] = stream_manager - - thread = threading.Thread(target=stream_manager.run, daemon=True) - thread.name = f"stream-{channel_id}" - thread.start() - - # Wait briefly for connection - wait_start = time.time() - while not stream_manager.connected: - if time.time() - wait_start > Config.CONNECTION_TIMEOUT: - break - time.sleep(0.1) - + metadata = proxy_server.redis_client.hgetall(metadata_key) + + if metadata and b'state' in metadata: + state = metadata[b'state'].decode('utf-8') + if state in ['waiting_for_clients', 'active']: + logger.info(f"[{client_id}] Channel {channel_id} now ready (state={state})") + channel_ready = True + break + elif state in ['error', 'stopped']: + error_message = metadata.get(b'error_message', b'Unknown error').decode('utf-8') + logger.error(f"[{client_id}] Channel {channel_id} in error state: {state}, message: {error_message}") + # Send error in a comment TS packet before giving up + error_packet = bytearray(188) + error_packet[0] = 0x47 # Sync byte + error_packet[1] = 0x1F # PID high bits + error_packet[2] = 0xFF # PID low bits + error_msg = f"Error: {error_message}".encode('utf-8') + error_packet[4:4+min(len(error_msg), 180)] = error_msg[:180] + yield bytes(error_packet) + return + else: + # Still initializing - send keepalive if needed + if time.time() - last_keepalive >= keepalive_interval: + keepalive_packet = bytearray(188) + keepalive_packet[0] = 0x47 # Sync byte + keepalive_packet[1] = 0x1F # PID high bits (null packet) + keepalive_packet[2] = 0xFF # PID low bits (null packet) + + # Add status info in packet payload (will be ignored by players) + status_msg = f"Initializing: {state}".encode('utf-8') + keepalive_packet[4:4+min(len(status_msg), 180)] = status_msg[:180] + + logger.debug(f"[{client_id}] Sending keepalive packet during initialization, state={state}") + yield bytes(keepalive_packet) + bytes_sent += len(keepalive_packet) + last_keepalive = time.time() + + # Wait a bit before checking again (don't send too many keepalives) + time.sleep(0.1) + + # Check if we timed out waiting + if not channel_ready: + logger.warning(f"[{client_id}] Timed out waiting for initialization") + error_packet = bytearray(188) + error_packet[0] = 0x47 # Sync byte + error_packet[1] = 0x1F # PID high bits + error_packet[2] = 0xFF # PID low bits + error_msg = f"Error: Initialization timeout".encode('utf-8') + error_packet[4:4+min(len(error_msg), 180)] = error_msg[:180] + yield bytes(error_packet) + return + + # Channel is now ready - original streaming code goes here + logger.info(f"[{client_id}] Channel {channel_id} ready, starting normal streaming") + + # Reset start time for real streaming + stream_start_time = time.time() + # Get buffer - stream manager may not exist in this worker buffer = proxy_server.stream_buffers.get(channel_id) stream_manager = proxy_server.stream_managers.get(channel_id) @@ -384,12 +385,13 @@ def stream_ts(request, channel_id): shutdown_thread.daemon = True shutdown_thread.start() + # IMPORTANT: Return the StreamingHttpResponse from the main function response = StreamingHttpResponse( streaming_content=generate(), content_type='video/mp2t' ) response['Cache-Control'] = 'no-cache' - return response + return response # This now properly returns from stream_ts except Exception as e: logger.error(f"Error in stream_ts: {e}", exc_info=True)