diff --git a/apps/proxy/config.py b/apps/proxy/config.py index 4c77792f..d8fabfd6 100644 --- a/apps/proxy/config.py +++ b/apps/proxy/config.py @@ -29,7 +29,7 @@ class TSConfig(BaseConfig): MAX_RETRIES = 3 # maximum connection retry attempts # Buffer settings - INITIAL_BEHIND_CHUNKS = 4 # How many chunks behind to start a client + INITIAL_BEHIND_CHUNKS = 3 # How many chunks behind to start a client CHUNK_BATCH_SIZE = 5 # How many chunks to fetch in one batch KEEPALIVE_INTERVAL = 0.5 # Seconds between keepalive packets when at buffer head diff --git a/apps/proxy/ts_proxy/stream_manager.py b/apps/proxy/ts_proxy/stream_manager.py index 7de2d83f..44df13c2 100644 --- a/apps/proxy/ts_proxy/stream_manager.py +++ b/apps/proxy/ts_proxy/stream_manager.py @@ -388,7 +388,7 @@ class StreamManager: return False def _set_waiting_for_clients(self): - """Set channel state to waiting for clients after successful connection""" + """Set channel state to waiting for clients AFTER buffer has enough chunks""" try: if hasattr(self.buffer, 'channel_id') and hasattr(self.buffer, 'redis_client'): channel_id = self.buffer.channel_id @@ -396,8 +396,6 @@ class StreamManager: if channel_id and redis_client: current_time = str(time.time()) - - # SIMPLIFIED: Always use direct Redis update for reliability metadata_key = f"ts_proxy:channel:{channel_id}:metadata" # Check current state first @@ -411,19 +409,68 @@ class StreamManager: # Only update if not already past connecting if not current_state or current_state in ["initializing", "connecting"]: - # Update directly - don't rely on proxy_server reference + # NEW CODE: Check if buffer has enough chunks + current_buffer_index = getattr(self.buffer, 'index', 0) + initial_chunks_needed = getattr(Config, 'INITIAL_BEHIND_CHUNKS', 10) + + if current_buffer_index < initial_chunks_needed: + # Not enough buffer yet - set to connecting state if not already + if current_state != "connecting": + update_data = { + "state": "connecting", + "state_changed_at": current_time + } + redis_client.hset(metadata_key, mapping=update_data) + logger.info(f"Channel {channel_id} connected but waiting for buffer to fill: {current_buffer_index}/{initial_chunks_needed} chunks") + + # Schedule a retry to check buffer status again + timer = threading.Timer(0.5, self._check_buffer_and_set_state) + timer.daemon = True + timer.start() + return False + + # We have enough buffer, proceed with state change update_data = { "state": "waiting_for_clients", "connection_ready_time": current_time, - "state_changed_at": current_time + "state_changed_at": current_time, + "buffer_chunks": str(current_buffer_index) } redis_client.hset(metadata_key, mapping=update_data) # Get configured grace period or default grace_period = getattr(Config, 'CHANNEL_INIT_GRACE_PERIOD', 20) - logger.info(f"STREAM MANAGER: Updated channel {channel_id} state: {current_state or 'None'} → waiting_for_clients") + logger.info(f"STREAM MANAGER: Updated channel {channel_id} state: {current_state or 'None'} → waiting_for_clients with {current_buffer_index} buffer chunks") logger.info(f"Started initial connection grace period ({grace_period}s) for channel {channel_id}") else: logger.debug(f"Not changing state: channel {channel_id} already in {current_state} state") except Exception as e: - logger.error(f"Error setting waiting for clients state: {e}") \ No newline at end of file + logger.error(f"Error setting waiting for clients state: {e}") + + def _check_buffer_and_set_state(self): + """Check buffer size and set state to waiting_for_clients when ready""" + try: + # This method will be called asynchronously to check buffer status + # and update state when enough chunks are available + if hasattr(self.buffer, 'index') and hasattr(self.buffer, 'channel_id'): + current_buffer_index = self.buffer.index + initial_chunks_needed = getattr(Config, 'INITIAL_BEHIND_CHUNKS', 10) + channel_id = self.buffer.channel_id + + if current_buffer_index >= initial_chunks_needed: + # We now have enough buffer, call _set_waiting_for_clients again + logger.info(f"Buffer threshold reached for channel {channel_id}: {current_buffer_index}/{initial_chunks_needed} chunks") + self._set_waiting_for_clients() + else: + # Still waiting, log progress and schedule another check + logger.debug(f"Buffer filling for channel {channel_id}: {current_buffer_index}/{initial_chunks_needed} chunks") + if current_buffer_index > 0 and current_buffer_index % 5 == 0: + # Log less frequently to avoid spamming logs + logger.info(f"Buffer filling for channel {channel_id}: {current_buffer_index}/{initial_chunks_needed} chunks") + + # Schedule another check + timer = threading.Timer(0.5, self._check_buffer_and_set_state) + timer.daemon = True + timer.start() + except Exception as e: + logger.error(f"Error in buffer check: {e}") \ No newline at end of file