Enhancement: Add chunk timeout configuration to ConfigHelper. Improve StreamManager timeout handling for consistency. Only one heartbeat thread per worker is started now. Proxy read timeout reduced from 60 seconds to 5.

SergeantPanda 2025-10-11 18:08:20 -05:00
parent fbd83e61b7
commit fa08216600
4 changed files with 93 additions and 75 deletions
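As a rough illustration of the timeout change described in the commit message (a minimal sketch, not the project's actual code; open_stream and CHUNK_TIMEOUT_DEFAULT are hypothetical names):

import requests

CHUNK_TIMEOUT_DEFAULT = 5  # assumed default, mirroring ConfigHelper.chunk_timeout()

def open_stream(url: str, chunk_timeout: float = CHUNK_TIMEOUT_DEFAULT) -> requests.Response:
    """Open a streaming GET with a short connect timeout and the shared chunk read timeout."""
    session = requests.Session()
    # (connect timeout, read timeout): previously a fixed (10, 60), now (5, chunk_timeout)
    return session.get(url, stream=True, timeout=(5, chunk_timeout))

A 5-second read timeout lets a stalled upstream fail fast so retry logic can take over, instead of blocking for a full minute.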


@@ -8,7 +8,7 @@ import gevent
from typing import Set, Optional
from apps.proxy.config import TSConfig as Config
from redis.exceptions import ConnectionError, TimeoutError
from .constants import EventType
from .constants import EventType, ChannelState, ChannelMetadataField
from .config_helper import ConfigHelper
from .redis_keys import RedisKeys
from .utils import get_logger
@@ -26,6 +26,7 @@ class ClientManager:
self.lock = threading.Lock()
self.last_active_time = time.time()
self.worker_id = worker_id # Store worker ID as instance variable
self._heartbeat_running = True # Flag to control heartbeat thread
# STANDARDIZED KEYS: Move client set under channel namespace
self.client_set_key = RedisKeys.clients(channel_id)
@@ -77,56 +78,28 @@ class ClientManager:
logger.debug(f"Failed to trigger stats update: {e}")
def _start_heartbeat_thread(self):
"""Start thread to regularly refresh client presence in Redis"""
"""Start thread to regularly refresh client presence in Redis for local clients"""
def heartbeat_task():
no_clients_count = 0 # Track consecutive empty cycles
max_empty_cycles = 3 # Exit after this many consecutive empty checks
logger.debug(f"Started heartbeat thread for channel {self.channel_id} (interval: {self.heartbeat_interval}s)")
while True:
while self._heartbeat_running:
try:
# Wait for the interval
gevent.sleep(self.heartbeat_interval)
# Wait for the interval, but check stop flag frequently for quick shutdown
# Sleep in 1-second increments to allow faster response to stop signal
for _ in range(int(self.heartbeat_interval)):
if not self._heartbeat_running:
break
time.sleep(1)
# Final check before doing work
if not self._heartbeat_running:
break
# Send heartbeat for all local clients
with self.lock:
if not self.clients or not self.redis_client:
# No clients left, increment our counter
no_clients_count += 1
# Check if we're in a shutdown delay period before exiting
in_shutdown_delay = False
if self.redis_client:
try:
disconnect_key = RedisKeys.last_client_disconnect(self.channel_id)
disconnect_time_bytes = self.redis_client.get(disconnect_key)
if disconnect_time_bytes:
disconnect_time = float(disconnect_time_bytes.decode('utf-8'))
elapsed = time.time() - disconnect_time
shutdown_delay = ConfigHelper.channel_shutdown_delay()
if elapsed < shutdown_delay:
in_shutdown_delay = True
logger.debug(f"Channel {self.channel_id} in shutdown delay: {elapsed:.1f}s of {shutdown_delay}s elapsed")
except Exception as e:
logger.debug(f"Error checking shutdown delay: {e}")
# Only exit if we've seen no clients for several consecutive checks AND we're not in shutdown delay
if no_clients_count >= max_empty_cycles and not in_shutdown_delay:
logger.info(f"No clients for channel {self.channel_id} after {no_clients_count} consecutive checks and not in shutdown delay, exiting heartbeat thread")
return # This exits the thread
# Skip this cycle if we have no clients but continue if in shutdown delay
if not in_shutdown_delay:
continue
else:
# Reset counter during shutdown delay to prevent premature exit
no_clients_count = 0
continue
else:
# Reset counter when we see clients
no_clients_count = 0
# Skip this cycle if we have no local clients
if not self.clients:
continue
# IMPROVED GHOST DETECTION: Check for stale clients before sending heartbeats
current_time = time.time()
@@ -197,11 +170,20 @@ class ClientManager:
except Exception as e:
logger.error(f"Error in client heartbeat thread: {e}")
logger.debug(f"Heartbeat thread exiting for channel {self.channel_id}")
thread = threading.Thread(target=heartbeat_task, daemon=True)
thread.name = f"client-heartbeat-{self.channel_id}"
thread.start()
logger.debug(f"Started client heartbeat thread for channel {self.channel_id} (interval: {self.heartbeat_interval}s)")
def stop(self):
"""Stop the heartbeat thread and cleanup"""
logger.debug(f"Stopping ClientManager for channel {self.channel_id}")
self._heartbeat_running = False
# Give the thread a moment to exit gracefully
# Note: We don't join() here because it's a daemon thread and will exit on its own
def _execute_redis_command(self, command_func):
"""Execute Redis command with error handling"""
if not self.redis_client:
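A minimal sketch of the stop-flag pattern the heartbeat thread above relies on, assuming plain threading (HeartbeatWorker is a hypothetical stand-in for ClientManager):

import threading
import time

class HeartbeatWorker:
    """Hypothetical stand-in for ClientManager, showing only the stop-flag pattern."""

    def __init__(self, interval: int = 5):
        self.interval = interval
        self._running = True
        self._thread = threading.Thread(target=self._loop, daemon=True, name="client-heartbeat-sketch")
        self._thread.start()

    def _loop(self) -> None:
        while self._running:
            # Sleep in 1-second increments so stop() is noticed quickly.
            for _ in range(self.interval):
                if not self._running:
                    return
                time.sleep(1)
            if not self._running:
                return
            # ... refresh client presence in Redis here ...

    def stop(self) -> None:
        # Daemon thread: no join() needed, the flag alone lets the loop exit on its own.
        self._running = False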


@@ -100,3 +100,8 @@ class ConfigHelper:
def channel_init_grace_period():
"""Get channel initialization grace period in seconds"""
return Config.get_channel_init_grace_period()
@staticmethod
def chunk_timeout():
"""Get chunk timeout in seconds (used for both socket and HTTP read timeouts)"""
return ConfigHelper.get('CHUNK_TIMEOUT', 5) # Default 5 seconds
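Since the docstring says the value covers both socket and HTTP reads, the socket side might look roughly like this (a sketch; read_one_chunk, CHUNK_TIMEOUT and the chunk size are illustrative, only socket.settimeout/recv are real APIs):

import socket
from typing import Optional

CHUNK_TIMEOUT = 5  # assumed default of ConfigHelper.chunk_timeout()

def read_one_chunk(sock: socket.socket, chunk_size: int = 8192) -> Optional[bytes]:
    """Read one chunk from an already-connected socket, giving up after the shared timeout."""
    sock.settimeout(CHUNK_TIMEOUT)
    try:
        return sock.recv(chunk_size)
    except socket.timeout:
        return None  # caller decides whether to retry or close the connection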


@@ -495,17 +495,18 @@ class ProxyServer:
)
return True
# Create buffer and client manager instances
buffer = StreamBuffer(channel_id, redis_client=self.redis_client)
client_manager = ClientManager(
channel_id,
redis_client=self.redis_client,
worker_id=self.worker_id
)
# Create buffer and client manager instances (or reuse if they exist)
if channel_id not in self.stream_buffers:
buffer = StreamBuffer(channel_id, redis_client=self.redis_client)
self.stream_buffers[channel_id] = buffer
# Store in local tracking
self.stream_buffers[channel_id] = buffer
self.client_managers[channel_id] = client_manager
if channel_id not in self.client_managers:
client_manager = ClientManager(
channel_id,
redis_client=self.redis_client,
worker_id=self.worker_id
)
self.client_managers[channel_id] = client_manager
# IMPROVED: Set initializing state in Redis BEFORE any other operations
if self.redis_client:
@@ -559,13 +560,15 @@ class ProxyServer:
logger.info(f"Channel {channel_id} already owned by worker {current_owner}")
logger.info(f"This worker ({self.worker_id}) will read from Redis buffer only")
# Create buffer but not stream manager
buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client)
self.stream_buffers[channel_id] = buffer
# Create buffer but not stream manager (only if not already exists)
if channel_id not in self.stream_buffers:
buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client)
self.stream_buffers[channel_id] = buffer
# Create client manager with channel_id and redis_client
client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id)
self.client_managers[channel_id] = client_manager
# Create client manager with channel_id and redis_client (only if not already exists)
if channel_id not in self.client_managers:
client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id)
self.client_managers[channel_id] = client_manager
return True
@@ -580,13 +583,15 @@ class ProxyServer:
# Another worker just acquired ownership
logger.info(f"Another worker just acquired ownership of channel {channel_id}")
# Create buffer but not stream manager
buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client)
self.stream_buffers[channel_id] = buffer
# Create buffer but not stream manager (only if not already exists)
if channel_id not in self.stream_buffers:
buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client)
self.stream_buffers[channel_id] = buffer
# Create client manager with channel_id and redis_client
client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id)
self.client_managers[channel_id] = client_manager
# Create client manager with channel_id and redis_client (only if not already exists)
if channel_id not in self.client_managers:
client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id)
self.client_managers[channel_id] = client_manager
return True
@@ -641,13 +646,14 @@ class ProxyServer:
logger.info(f"Created StreamManager for channel {channel_id} with stream ID {channel_stream_id}")
self.stream_managers[channel_id] = stream_manager
# Create client manager with channel_id, redis_client AND worker_id
client_manager = ClientManager(
channel_id=channel_id,
redis_client=self.redis_client,
worker_id=self.worker_id
)
self.client_managers[channel_id] = client_manager
# Create client manager with channel_id, redis_client AND worker_id (only if not already exists)
if channel_id not in self.client_managers:
client_manager = ClientManager(
channel_id=channel_id,
redis_client=self.redis_client,
worker_id=self.worker_id
)
self.client_managers[channel_id] = client_manager
# Start stream manager thread only for the owner
thread = threading.Thread(target=stream_manager.run, daemon=True)
@@ -855,6 +861,10 @@ class ProxyServer:
# Clean up client manager - SAFE CHECK HERE TOO
if channel_id in self.client_managers:
try:
client_manager = self.client_managers[channel_id]
# Stop the heartbeat thread before deleting
if hasattr(client_manager, 'stop'):
client_manager.stop()
del self.client_managers[channel_id]
logger.info(f"Removed client manager for channel {channel_id}")
except KeyError:
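A self-contained sketch of the get-or-create guard and matching cleanup shown above, which is what keeps a worker down to one ClientManager, and therefore one heartbeat thread, per channel (DummyClientManager, get_or_create and remove are simplified stand-ins):

from typing import Dict

class DummyClientManager:
    """Stand-in for ClientManager in this sketch."""
    def __init__(self, channel_id: str):
        self.channel_id = channel_id
    def stop(self) -> None:
        pass  # the real stop() flips the heartbeat flag so the thread exits

client_managers: Dict[str, DummyClientManager] = {}

def get_or_create(channel_id: str) -> DummyClientManager:
    # Reuse any existing manager so a second heartbeat thread is never spawned for the channel.
    if channel_id not in client_managers:
        client_managers[channel_id] = DummyClientManager(channel_id)
    return client_managers[channel_id]

def remove(channel_id: str) -> None:
    manager = client_managers.pop(channel_id, None)
    if manager is not None:
        manager.stop()  # stop the heartbeat before dropping the last reference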


@@ -10,6 +10,7 @@ import gevent
import re
from typing import Optional, List
from django.shortcuts import get_object_or_404
from urllib3.exceptions import ReadTimeoutError
from apps.proxy.config import TSConfig as Config
from apps.channels.models import Channel, Stream
from apps.m3u.models import M3UAccount, M3UAccountProfile
@@ -761,10 +762,12 @@ class StreamManager:
self.current_session = session
# Stream the URL with proper timeout handling
# Use same chunk timeout as socket connections for consistency
chunk_timeout = ConfigHelper.chunk_timeout()
response = session.get(
self.url,
stream=True,
timeout=(10, 60) # 10s connect timeout, 60s read timeout
timeout=(5, chunk_timeout) # 5s connect timeout, configurable chunk timeout
)
self.current_response = response
@@ -832,6 +835,13 @@ class StreamManager:
else:
# Handle direct HTTP connection
chunk_count = 0
# Check if response is still valid before attempting to read
if not self.current_response:
logger.debug(f"Response object is None for channel {self.channel_id}, connection likely closed")
self.connected = False
return
try:
for chunk in self.current_response.iter_content(chunk_size=self.chunk_size):
# Check if we've been asked to stop
@@ -854,6 +864,17 @@ class StreamManager:
if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client:
last_data_key = RedisKeys.last_data(self.buffer.channel_id)
self.buffer.redis_client.set(last_data_key, str(time.time()), ex=60)
except (requests.exceptions.ReadTimeout, ReadTimeoutError, requests.exceptions.ConnectionError) as e:
if self.stop_requested or self.url_switching:
logger.debug(f"Expected connection error during shutdown/URL switch for channel {self.channel_id}: {e}")
else:
# Handle timeout errors - log and close connection, let main loop handle retry
logger.warning(f"Stream read timeout for channel {self.channel_id}: {e}")
# Close the current connection
self._close_connection()
return # Exit this method, main loop will retry based on retry_count
except (AttributeError, ConnectionError) as e:
if self.stop_requested or self.url_switching:
logger.debug(f"Expected connection error during shutdown/URL switch for channel {self.channel_id}: {e}")
@@ -1274,7 +1295,7 @@ class StreamManager:
try:
# Set timeout for chunk reads
chunk_timeout = ConfigHelper.get('CHUNK_TIMEOUT', 10) # Default 10 seconds
chunk_timeout = ConfigHelper.chunk_timeout() # Use centralized timeout configuration
try:
# Handle different socket types with timeout
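A hedged sketch of the retry-friendly timeout handling introduced above, assuming requests/urllib3 (fetch_chunks and on_chunk are hypothetical names):

import requests
from urllib3.exceptions import ReadTimeoutError

def fetch_chunks(url: str, on_chunk, chunk_size: int = 8192, chunk_timeout: float = 5.0) -> bool:
    """Return True if the stream ended normally, False if a timeout or connection error closed it."""
    response = requests.get(url, stream=True, timeout=(5, chunk_timeout))
    try:
        for chunk in response.iter_content(chunk_size=chunk_size):
            if chunk:
                on_chunk(chunk)
        return True
    except (requests.exceptions.ReadTimeout, ReadTimeoutError, requests.exceptions.ConnectionError):
        # Close and return; the caller's main loop decides whether to retry based on its retry count.
        return False
    finally:
        response.close()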