From ddec51ec5b33d9d9b3493c2d65360dd8c2ef13e2 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Sun, 9 Mar 2025 18:38:12 -0500 Subject: [PATCH] All clients connect and share the redis buffer. --- apps/proxy/ts_proxy/server.py | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index 3d6066b4..3c010806 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -149,6 +149,11 @@ class StreamManager: else: logging.warning("Failed to add chunk to buffer") + # If successful, update last data timestamp in Redis + if success and hasattr(self.buffer, 'redis_client') and self.buffer.redis_client: + last_data_key = f"ts_proxy:channel:{self.buffer.channel_id}:last_data" + self.buffer.redis_client.set(last_data_key, str(time.time()), ex=60) # 1 minute expiry + return success return False @@ -172,6 +177,21 @@ class StreamManager: def run(self): """Main execution loop with stream health monitoring""" try: + # Check if buffer already has data - in which case we might not need to connect + if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client: + buffer_index = self.buffer.redis_client.get(f"ts_proxy:buffer:{self.buffer.channel_id}:index") + if buffer_index and int(buffer_index) > 0: + # There's already data in Redis, check if it's recent (within last 10 seconds) + last_data_key = f"ts_proxy:channel:{self.buffer.channel_id}:last_data" + last_data = self.buffer.redis_client.get(last_data_key) + if last_data: + last_time = float(last_data) + if time.time() - last_time < 10: + logging.info(f"Recent data found in Redis, no need to reconnect") + self.connected = True + self.healthy = True + return + # Start health monitor thread health_thread = threading.Thread(target=self._monitor_health, daemon=True) health_thread.start() @@ -825,7 +845,7 @@ class ProxyServer: logging.debug(f"Created StreamBuffer for channel {channel_id}") self.stream_buffers[channel_id] = buffer - # Create stream manager (actual connection to provider) + # Only the owner worker creates the actual stream manager stream_manager = StreamManager(channel_url, buffer, user_agent=channel_user_agent) logging.debug(f"Created StreamManager for channel {channel_id}") self.stream_managers[channel_id] = stream_manager @@ -839,7 +859,7 @@ class ProxyServer: activity_key = f"ts_proxy:active_channel:{channel_id}" self.redis_client.set(activity_key, "1", ex=300) - # Start stream manager thread + # Start stream manager thread only for the owner thread = threading.Thread(target=stream_manager.run, daemon=True) thread.name = f"stream-{channel_id}" thread.start()