All clients connect and share the redis buffer.

This commit is contained in:
SergeantPanda 2025-03-09 18:38:12 -05:00
parent 905c01d933
commit ddec51ec5b

View file

@ -149,6 +149,11 @@ class StreamManager:
else:
logging.warning("Failed to add chunk to buffer")
# If successful, update last data timestamp in Redis
if success and hasattr(self.buffer, 'redis_client') and self.buffer.redis_client:
last_data_key = f"ts_proxy:channel:{self.buffer.channel_id}:last_data"
self.buffer.redis_client.set(last_data_key, str(time.time()), ex=60) # 1 minute expiry
return success
return False
@ -172,6 +177,21 @@ class StreamManager:
def run(self):
"""Main execution loop with stream health monitoring"""
try:
# Check if buffer already has data - in which case we might not need to connect
if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client:
buffer_index = self.buffer.redis_client.get(f"ts_proxy:buffer:{self.buffer.channel_id}:index")
if buffer_index and int(buffer_index) > 0:
# There's already data in Redis, check if it's recent (within last 10 seconds)
last_data_key = f"ts_proxy:channel:{self.buffer.channel_id}:last_data"
last_data = self.buffer.redis_client.get(last_data_key)
if last_data:
last_time = float(last_data)
if time.time() - last_time < 10:
logging.info(f"Recent data found in Redis, no need to reconnect")
self.connected = True
self.healthy = True
return
# Start health monitor thread
health_thread = threading.Thread(target=self._monitor_health, daemon=True)
health_thread.start()
@ -825,7 +845,7 @@ class ProxyServer:
logging.debug(f"Created StreamBuffer for channel {channel_id}")
self.stream_buffers[channel_id] = buffer
# Create stream manager (actual connection to provider)
# Only the owner worker creates the actual stream manager
stream_manager = StreamManager(channel_url, buffer, user_agent=channel_user_agent)
logging.debug(f"Created StreamManager for channel {channel_id}")
self.stream_managers[channel_id] = stream_manager
@ -839,7 +859,7 @@ class ProxyServer:
activity_key = f"ts_proxy:active_channel:{channel_id}"
self.redis_client.set(activity_key, "1", ex=300)
# Start stream manager thread
# Start stream manager thread only for the owner
thread = threading.Thread(target=stream_manager.run, daemon=True)
thread.name = f"stream-{channel_id}"
thread.start()