From d04ba07d10ee3a10a40b9fd76df723c819e673db Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Fri, 21 Mar 2025 19:59:54 -0500 Subject: [PATCH] Added client speed statistics to client metadata in redis. --- apps/proxy/ts_proxy/redis_keys.py | 5 +++ apps/proxy/ts_proxy/stream_generator.py | 56 ++++++++++++++----------- 2 files changed, 37 insertions(+), 24 deletions(-) diff --git a/apps/proxy/ts_proxy/redis_keys.py b/apps/proxy/ts_proxy/redis_keys.py index 1eaa8aa5..22b02648 100644 --- a/apps/proxy/ts_proxy/redis_keys.py +++ b/apps/proxy/ts_proxy/redis_keys.py @@ -83,3 +83,8 @@ class RedisKeys: def transcode_active(channel_id): """Key indicating active transcode process""" return f"ts_proxy:channel:{channel_id}:transcode_active" + + @staticmethod + def client_metadata(channel_id, client_id): + """Key for client metadata hash""" + return f"ts_proxy:channel:{channel_id}:clients:{client_id}" diff --git a/apps/proxy/ts_proxy/stream_generator.py b/apps/proxy/ts_proxy/stream_generator.py index 8a91f1dc..6a6341c9 100644 --- a/apps/proxy/ts_proxy/stream_generator.py +++ b/apps/proxy/ts_proxy/stream_generator.py @@ -162,11 +162,6 @@ class StreamGenerator: def _stream_data_generator(self): """Generate stream data chunks based on buffer contents.""" - bytes_sent = 0 - chunks_sent = 0 - stream_start_time = time.time() - local_index = self.local_index - # Main streaming loop while True: # Check if resources still exist @@ -174,12 +169,11 @@ class StreamGenerator: break # Get chunks at client's position using improved strategy - chunks, next_index = self.buffer.get_optimized_client_data(local_index) + chunks, next_index = self.buffer.get_optimized_client_data(self.local_index) if chunks: - yield from self._process_chunks(chunks, next_index, bytes_sent, chunks_sent, stream_start_time) - local_index = next_index - self.local_index = local_index + yield from self._process_chunks(chunks, next_index) + self.local_index = next_index self.last_yield_time = time.time() self.empty_reads = 0 self.consecutive_empty = 0 @@ -188,11 +182,11 @@ class StreamGenerator: self.empty_reads += 1 self.consecutive_empty += 1 - if self._should_send_keepalive(local_index): + if self._should_send_keepalive(self.local_index): keepalive_packet = create_ts_packet('keepalive') logger.debug(f"[{self.client_id}] Sending keepalive packet while waiting at buffer head") yield keepalive_packet - bytes_sent += len(keepalive_packet) + self.bytes_sent += len(keepalive_packet) self.last_yield_time = time.time() self.consecutive_empty = 0 # Reset consecutive counter but keep total empty_reads time.sleep(Config.KEEPALIVE_INTERVAL) @@ -204,11 +198,11 @@ class StreamGenerator: # Log empty reads periodically if self.empty_reads % 50 == 0: stream_status = "healthy" if (self.stream_manager and self.stream_manager.healthy) else "unknown" - logger.debug(f"[{self.client_id}] Waiting for chunks beyond {local_index} (buffer at {self.buffer.index}, stream: {stream_status})") + logger.debug(f"[{self.client_id}] Waiting for chunks beyond {self.local_index} (buffer at {self.buffer.index}, stream: {stream_status})") # Check for ghost clients - if self._is_ghost_client(local_index): - logger.warning(f"[{self.client_id}] Possible ghost client: buffer has advanced {self.buffer.index - local_index} chunks ahead but client stuck at {local_index}") + if self._is_ghost_client(self.local_index): + logger.warning(f"[{self.client_id}] Possible ghost client: buffer has advanced {self.buffer.index - self.local_index} chunks ahead but client stuck at {self.local_index}") break # Check for timeouts @@ -258,7 +252,7 @@ class StreamGenerator: return True - def _process_chunks(self, chunks, next_index, bytes_sent, chunks_sent, stream_start_time): + def _process_chunks(self, chunks, next_index): """Process and yield chunks to the client.""" # Process and send chunks total_size = sum(len(c) for c in chunks) @@ -268,20 +262,34 @@ class StreamGenerator: for chunk in chunks: try: yield chunk - bytes_sent += len(chunk) - chunks_sent += 1 + self.bytes_sent += len(chunk) + self.chunks_sent += 1 + logger.debug(f"[{self.client_id}] Sent chunk {self.chunks_sent} ({len(chunk)} bytes) to client") + # Log every 10 chunks and store in redis for visibility + if self.chunks_sent % 10 == 0: + elapsed = time.time() - self.stream_start_time + rate = self.bytes_sent / elapsed / 1024 if elapsed > 0 else 0 + logger.debug(f"[{self.client_id}] Stats: {self.chunks_sent} chunks, {self.bytes_sent/1024:.1f} KB, {rate:.1f} KB/s") + + # Store stats in Redis client metadata + if proxy_server.redis_client: + try: + client_key = RedisKeys.client_metadata(self.channel_id, self.client_id) + stats = { + "chunks_sent": str(self.chunks_sent), + "bytes_sent": str(self.bytes_sent), + "transfer_rate_KBps": str(round(rate, 1)), + "stats_updated_at": str(time.time()) + } + proxy_server.redis_client.hset(client_key, mapping=stats) + # No need to set expiration as client heartbeat will refresh this key + except Exception as e: + logger.warning(f"[{self.client_id}] Failed to store stats in Redis: {e}") - # Log every 100 chunks for visibility - if chunks_sent % 100 == 0: - elapsed = time.time() - stream_start_time - rate = bytes_sent / elapsed / 1024 if elapsed > 0 else 0 - logger.info(f"[{self.client_id}] Stats: {chunks_sent} chunks, {bytes_sent/1024:.1f}KB, {rate:.1f}KB/s") except Exception as e: logger.error(f"[{self.client_id}] Error sending chunk to client: {e}") raise # Re-raise to exit the generator - return bytes_sent, chunks_sent - def _should_send_keepalive(self, local_index): """Determine if a keepalive packet should be sent.""" # Check if we're caught up to buffer head