Added client speed statistics to client metadata in redis.

SergeantPanda 2025-03-21 19:59:54 -05:00
parent 6fdadd4287
commit d04ba07d10
2 changed files with 37 additions and 24 deletions


@@ -83,3 +83,8 @@ class RedisKeys:
     def transcode_active(channel_id):
         """Key indicating active transcode process"""
         return f"ts_proxy:channel:{channel_id}:transcode_active"
+
+    @staticmethod
+    def client_metadata(channel_id, client_id):
+        """Key for client metadata hash"""
+        return f"ts_proxy:channel:{channel_id}:clients:{client_id}"


@@ -162,11 +162,6 @@ class StreamGenerator:
     def _stream_data_generator(self):
         """Generate stream data chunks based on buffer contents."""

-        bytes_sent = 0
-        chunks_sent = 0
-        stream_start_time = time.time()
-        local_index = self.local_index
-
         # Main streaming loop
         while True:
             # Check if resources still exist
@@ -174,12 +169,11 @@ class StreamGenerator:
                 break

             # Get chunks at client's position using improved strategy
-            chunks, next_index = self.buffer.get_optimized_client_data(local_index)
+            chunks, next_index = self.buffer.get_optimized_client_data(self.local_index)

             if chunks:
-                yield from self._process_chunks(chunks, next_index, bytes_sent, chunks_sent, stream_start_time)
-                local_index = next_index
-                self.local_index = local_index
+                yield from self._process_chunks(chunks, next_index)
+                self.local_index = next_index
                 self.last_yield_time = time.time()
                 self.empty_reads = 0
                 self.consecutive_empty = 0
@@ -188,11 +182,11 @@ class StreamGenerator:
                 self.empty_reads += 1
                 self.consecutive_empty += 1

-                if self._should_send_keepalive(local_index):
+                if self._should_send_keepalive(self.local_index):
                     keepalive_packet = create_ts_packet('keepalive')
                     logger.debug(f"[{self.client_id}] Sending keepalive packet while waiting at buffer head")
                     yield keepalive_packet
-                    bytes_sent += len(keepalive_packet)
+                    self.bytes_sent += len(keepalive_packet)
                     self.last_yield_time = time.time()
                     self.consecutive_empty = 0  # Reset consecutive counter but keep total empty_reads
                     time.sleep(Config.KEEPALIVE_INTERVAL)
@@ -204,11 +198,11 @@ class StreamGenerator:
                 # Log empty reads periodically
                 if self.empty_reads % 50 == 0:
                     stream_status = "healthy" if (self.stream_manager and self.stream_manager.healthy) else "unknown"
-                    logger.debug(f"[{self.client_id}] Waiting for chunks beyond {local_index} (buffer at {self.buffer.index}, stream: {stream_status})")
+                    logger.debug(f"[{self.client_id}] Waiting for chunks beyond {self.local_index} (buffer at {self.buffer.index}, stream: {stream_status})")

                 # Check for ghost clients
-                if self._is_ghost_client(local_index):
-                    logger.warning(f"[{self.client_id}] Possible ghost client: buffer has advanced {self.buffer.index - local_index} chunks ahead but client stuck at {local_index}")
+                if self._is_ghost_client(self.local_index):
+                    logger.warning(f"[{self.client_id}] Possible ghost client: buffer has advanced {self.buffer.index - self.local_index} chunks ahead but client stuck at {self.local_index}")
                     break

                 # Check for timeouts
@@ -258,7 +252,7 @@ class StreamGenerator:
         return True

-    def _process_chunks(self, chunks, next_index, bytes_sent, chunks_sent, stream_start_time):
+    def _process_chunks(self, chunks, next_index):
         """Process and yield chunks to the client."""

         # Process and send chunks
         total_size = sum(len(c) for c in chunks)
@@ -268,20 +262,34 @@ class StreamGenerator:
         for chunk in chunks:
             try:
                 yield chunk
-                bytes_sent += len(chunk)
-                chunks_sent += 1
+                self.bytes_sent += len(chunk)
+                self.chunks_sent += 1
+                logger.debug(f"[{self.client_id}] Sent chunk {self.chunks_sent} ({len(chunk)} bytes) to client")
+
+                # Log every 10 chunks and store in redis for visibility
+                if self.chunks_sent % 10 == 0:
+                    elapsed = time.time() - self.stream_start_time
+                    rate = self.bytes_sent / elapsed / 1024 if elapsed > 0 else 0
+                    logger.debug(f"[{self.client_id}] Stats: {self.chunks_sent} chunks, {self.bytes_sent/1024:.1f} KB, {rate:.1f} KB/s")
+                    # Store stats in Redis client metadata
+                    if proxy_server.redis_client:
+                        try:
+                            client_key = RedisKeys.client_metadata(self.channel_id, self.client_id)
+                            stats = {
+                                "chunks_sent": str(self.chunks_sent),
+                                "bytes_sent": str(self.bytes_sent),
+                                "transfer_rate_KBps": str(round(rate, 1)),
+                                "stats_updated_at": str(time.time())
+                            }
+                            proxy_server.redis_client.hset(client_key, mapping=stats)
+                            # No need to set expiration as client heartbeat will refresh this key
+                        except Exception as e:
+                            logger.warning(f"[{self.client_id}] Failed to store stats in Redis: {e}")

-                # Log every 100 chunks for visibility
-                if chunks_sent % 100 == 0:
-                    elapsed = time.time() - stream_start_time
-                    rate = bytes_sent / elapsed / 1024 if elapsed > 0 else 0
-                    logger.info(f"[{self.client_id}] Stats: {chunks_sent} chunks, {bytes_sent/1024:.1f}KB, {rate:.1f}KB/s")
             except Exception as e:
                 logger.error(f"[{self.client_id}] Error sending chunk to client: {e}")
                 raise  # Re-raise to exit the generator
-
-        return bytes_sent, chunks_sent

     def _should_send_keepalive(self, local_index):
         """Determine if a keepalive packet should be sent."""
         # Check if we're caught up to buffer head
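
Because each client hash lives under ts_proxy:channel:{channel_id}:clients:{client_id}, a monitor can also enumerate the stats for every client on a channel. Again a sketch rather than part of the commit, assuming redis-py with decoded responses:

# Hypothetical monitoring sketch -- not part of this commit.
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def get_channel_client_stats(channel_id):
    """Collect the stored speed statistics for every client on a channel."""
    pattern = f"ts_proxy:channel:{channel_id}:clients:*"
    return {
        key.rsplit(":", 1)[-1]: r.hgetall(key)  # client_id -> stats hash
        for key in r.scan_iter(match=pattern)
    }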