Fixed a regression in the buffer checks that kept clients connected when they should have been disconnected after a stream failure.

SergeantPanda 2025-03-20 21:03:01 -05:00
parent cb02069fb7
commit 4738d301d1
5 changed files with 161 additions and 15 deletions

View file

@@ -78,3 +78,8 @@ class RedisKeys:
def worker_heartbeat(worker_id):
"""Key for worker heartbeat"""
return f"ts_proxy:worker:{worker_id}:heartbeat"
@staticmethod
def transcode_active(channel_id):
"""Key indicating active transcode process"""
return f"ts_proxy:channel:{channel_id}:transcode_active"

View file

@@ -308,6 +308,12 @@ class ProxyServer:
if current and current.decode('utf-8') == self.worker_id:
self.redis_client.delete(lock_key)
logger.info(f"Released ownership of channel {channel_id}")
# Also ensure channel stopping key is set to signal clients
stop_key = RedisKeys.channel_stopping(channel_id)
self.redis_client.setex(stop_key, 30, "true")
logger.info(f"Set stopping signal for channel {channel_id} clients")
except Exception as e:
logger.error(f"Error releasing channel ownership: {e}")
@@ -458,7 +464,8 @@ class ProxyServer:
buffer,
user_agent=channel_user_agent,
transcode=transcode,
stream_id=channel_stream_id # Pass stream ID to the manager
stream_id=channel_stream_id, # Pass stream ID to the manager
worker_id=self.worker_id # Pass worker_id explicitly to eliminate circular dependency
)
logger.info(f"Created StreamManager for channel {channel_id} with stream ID {channel_stream_id}")
self.stream_managers[channel_id] = stream_manager

View file

@@ -218,9 +218,19 @@ class ChannelService:
if metadata and b'state' in metadata:
state = metadata[b'state'].decode('utf-8')
channel_info = {"state": state}
# Immediately mark as stopping in metadata so clients detect it faster
proxy_server.redis_client.hset(metadata_key, "state", ChannelState.STOPPING)
proxy_server.redis_client.hset(metadata_key, "state_changed_at", str(time.time()))
except Exception as e:
logger.error(f"Error fetching channel state: {e}")
# Set stopping flag with higher TTL to ensure it persists
if proxy_server.redis_client:
stop_key = RedisKeys.channel_stopping(channel_id)
proxy_server.redis_client.setex(stop_key, 60, "true") # Higher TTL of 60 seconds
logger.info(f"Set channel stopping flag with 60s TTL for channel {channel_id}")
# Broadcast stop event to all workers via PubSub
if proxy_server.redis_client:
ChannelService._publish_channel_stop_event(channel_id)
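
_publish_channel_stop_event is not part of this commit. A hypothetical sketch of such a broadcast with redis-py pub/sub — the event channel name and payload fields below are illustrative assumptions, not the project's actual format:

import json
import time

def publish_channel_stop_event(redis_client, channel_id):
    # Hypothetical: fan the stop signal out to every worker via pub/sub so
    # each one can disconnect its local clients without waiting on a poll.
    payload = json.dumps({
        "event": "channel_stop",
        "channel_id": channel_id,
        "timestamp": time.time(),
    })
    redis_client.publish("ts_proxy:events", payload)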

View file

@@ -103,7 +103,7 @@ class StreamGenerator:
if state in ['waiting_for_clients', 'active']:
logger.info(f"[{self.client_id}] Channel {self.channel_id} now ready (state={state})")
return True
elif state in ['error', 'stopped']:
elif state in ['error', 'stopped', 'stopping']: # Added 'stopping' to error states
error_message = metadata.get(b'error_message', b'Unknown error').decode('utf-8')
logger.error(f"[{self.client_id}] Channel {self.channel_id} in error state: {state}, message: {error_message}")
# Send error packet before giving up
@@ -119,6 +119,13 @@ class StreamGenerator:
self.bytes_sent += len(keepalive_packet)
last_keepalive = time.time()
# Also check stopping key directly
stop_key = RedisKeys.channel_stopping(self.channel_id)
if proxy_server.redis_client.exists(stop_key):
logger.error(f"[{self.client_id}] Channel {self.channel_id} stopping flag detected during initialization")
yield create_ts_packet('error', "Error: Channel is stopping")
return False
# Wait a bit before checking again
time.sleep(0.1)
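
create_ts_packet is an existing project helper whose implementation is not shown here. Purely to illustrate the kind of data being yielded for keepalive and error signalling, a standard 188-byte MPEG-TS null packet (sync byte 0x47, PID 0x1FFF) can be built like this:

def make_null_ts_packet():
    # Illustrative only; the real create_ts_packet helper may also embed an
    # error payload or other marker data.
    packet = bytearray(b"\xff" * 188)
    packet[0] = 0x47  # sync byte
    packet[1] = 0x1F  # PID high bits (PID 0x1FFF = null packet)
    packet[2] = 0xFF  # PID low bits
    packet[3] = 0x10  # no adaptation field, payload only, continuity counter 0
    return bytes(packet)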
@@ -221,12 +228,21 @@ class StreamGenerator:
# Check if this specific client has been stopped (Redis keys, etc.)
if proxy_server.redis_client:
# Channel stop check
# Channel stop check - now backed by both the stopping key and the metadata state below
stop_key = RedisKeys.channel_stopping(self.channel_id)
if proxy_server.redis_client.exists(stop_key):
logger.info(f"[{self.client_id}] Detected channel stop signal, terminating stream")
return False
# Also check channel state in metadata
metadata_key = RedisKeys.channel_metadata(self.channel_id)
metadata = proxy_server.redis_client.hgetall(metadata_key)
if metadata and b'state' in metadata:
state = metadata[b'state'].decode('utf-8')
if state in ['error', 'stopped', 'stopping']:
logger.info(f"[{self.client_id}] Channel in {state} state, terminating stream")
return False
# Client stop check
client_stop_key = RedisKeys.client_stop(self.channel_id, self.client_id)
if proxy_server.redis_client.exists(client_stop_key):

View file

@@ -24,7 +24,7 @@ logger = logging.getLogger("ts_proxy")
class StreamManager:
"""Manages a connection to a TS stream without using raw sockets"""
def __init__(self, channel_id, url, buffer, user_agent=None, transcode=False, stream_id=None):
def __init__(self, channel_id, url, buffer, user_agent=None, transcode=False, stream_id=None, worker_id=None):
# Basic properties
self.channel_id = channel_id
self.url = url
@@ -36,6 +36,8 @@ class StreamManager:
self.current_response = None
self.current_session = None
self.url_switching = False
# Store worker_id for ownership checks
self.worker_id = worker_id
# Sockets used for transcode jobs
self.socket = None
@@ -89,6 +91,9 @@ class StreamManager:
logger.info(f"Initialized stream manager for channel {buffer.channel_id}")
# Flag tracking whether the transcoding process is active
self.transcode_process_active = False
def _create_session(self):
"""Create and configure requests session with optimal settings"""
session = requests.Session()
@@ -234,9 +239,61 @@ class StreamManager:
except Exception as e:
logger.error(f"Stream error: {e}", exc_info=True)
finally:
# Enhanced cleanup in the finally block
self.connected = False
# Explicitly cancel all timers
for timer in list(self._buffer_check_timers):
try:
if timer and timer.is_alive():
timer.cancel()
except Exception:
pass
self._buffer_check_timers.clear()
# Make sure transcode process is terminated
if self.transcode_process_active:
logger.info("Ensuring transcode process is terminated in finally block")
self._close_socket()
# Close all connections
self._close_all_connections()
logger.info(f"Stream manager stopped")
# Update channel state in Redis to prevent clients from waiting indefinitely
if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client:
try:
metadata_key = RedisKeys.channel_metadata(self.channel_id)
# Check if we're the owner before updating state
owner_key = RedisKeys.channel_owner(self.channel_id)
current_owner = self.buffer.redis_client.get(owner_key)
# Use the worker_id that was passed in during initialization
if current_owner and self.worker_id and current_owner.decode('utf-8') == self.worker_id:
# Determine the appropriate error message based on retry failures
if self.tried_stream_ids and len(self.tried_stream_ids) > 0:
error_message = f"All {len(self.tried_stream_ids)} stream options failed"
else:
error_message = f"Connection failed after {self.max_retries} attempts"
# Update metadata to indicate error state
update_data = {
"state": ChannelState.ERROR,
"state_changed_at": str(time.time()),
"error_message": error_message,
"error_time": str(time.time())
}
self.buffer.redis_client.hset(metadata_key, mapping=update_data)
logger.info(f"Updated channel {self.channel_id} state to ERROR in Redis after stream failure")
# Also set stopping key to ensure clients disconnect
stop_key = RedisKeys.channel_stopping(self.channel_id)
self.buffer.redis_client.setex(stop_key, 60, "true")
except Exception as e:
logger.error(f"Failed to update channel state in Redis: {e}")
logger.info(f"Stream manager stopped for channel {self.channel_id}")
def _establish_transcode_connection(self):
"""Establish a connection using transcoding"""
@@ -264,11 +321,14 @@ class StreamManager:
self.transcode_process = subprocess.Popen(
self.transcode_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL, # Suppress FFmpeg logs
stderr=subprocess.DEVNULL, # Suppress error logs
bufsize=188 * 64 # Buffer optimized for TS packets
)
self.socket = self.transcode_process.stdout # Read from FFmpeg output
# Set flag that transcoding process is active
self.transcode_process_active = True
self.socket = self.transcode_process.stdout # Read from the transcoder's standard output
self.connected = True
# Set channel state to waiting for clients
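
fetch_chunk (unchanged further down) reads from self.socket, which for transcode jobs is now the ffmpeg stdout pipe set above. As a rough sketch of why bufsize is tuned to 188 * 64, a packet-aligned read from that pipe looks like this (the real method also handles timeouts and buffer bookkeeping):

TS_PACKET_SIZE = 188

def read_ts_chunk(pipe, packets=64):
    # Read one buffer-sized, TS-packet-aligned chunk from the transcoder's stdout.
    data = pipe.read(TS_PACKET_SIZE * packets)
    if not data:
        return None  # EOF: the transcode process has exited
    return data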
@@ -392,6 +452,8 @@ class StreamManager:
def stop(self):
"""Stop the stream manager and cancel all timers"""
logger.info(f"Stopping stream manager for channel {self.channel_id}")
# Signal immediately that this manager is stopping
self.stopping = True
@@ -405,7 +467,6 @@ class StreamManager:
self._buffer_check_timers.clear()
# Set the stop flag before tearing anything down
self.stop_requested = True
@@ -423,6 +484,9 @@ class StreamManager:
except Exception:
pass
# Explicitly close socket/transcode resources
self._close_socket()
# Set running to false to ensure thread exits
self.running = False
@@ -530,15 +594,56 @@ class StreamManager:
self.socket = None
self.connected = False
# Enhanced transcode process cleanup with more aggressive termination
if self.transcode_process:
try:
# First try polite termination
logger.debug(f"Terminating transcode process for channel {self.channel_id}")
self.transcode_process.terminate()
self.transcode_process.wait()
# Give it a short time to terminate gracefully
try:
self.transcode_process.wait(timeout=1.0)
except subprocess.TimeoutExpired:
# If it doesn't terminate quickly, kill it
logger.warning(f"Transcode process didn't terminate within timeout, killing forcefully")
self.transcode_process.kill()
try:
self.transcode_process.wait(timeout=1.0)
except subprocess.TimeoutExpired:
logger.error(f"Failed to kill transcode process even with force")
except Exception as e:
logger.debug(f"Error terminating transcode process: {e}")
pass
# Final attempt: try to kill directly
try:
self.transcode_process.kill()
except Exception as e:
logger.error(f"Final kill attempt failed: {e}")
self.transcode_process = None
self.transcode_process_active = False # Reset the flag
# Clear transcode active key in Redis if available
if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client:
try:
transcode_key = RedisKeys.transcode_active(self.channel_id)
self.buffer.redis_client.delete(transcode_key)
logger.debug(f"Cleared transcode active flag for channel {self.channel_id}")
except Exception as e:
logger.debug(f"Error clearing transcode flag: {e}")
# Cancel any remaining buffer check timers
for timer in list(self._buffer_check_timers):
try:
if timer and timer.is_alive():
timer.cancel()
logger.debug(f"Cancelled buffer check timer during socket close for channel {self.channel_id}")
except Exception as e:
logger.debug(f"Error canceling timer during socket close: {e}")
self._buffer_check_timers = []
def fetch_chunk(self):
"""Fetch data from socket with direct pass-through to buffer"""
@@ -649,10 +754,10 @@ class StreamManager:
def _check_buffer_and_set_state(self):
"""Check buffer size and set state to waiting_for_clients when ready"""
try:
# First check if we're stopping or reconnecting
if getattr(self, 'stopping', False) or getattr(self, 'reconnecting', False):
# Enhanced stop detection with short-circuit return
if not self.running or getattr(self, 'stopping', False) or getattr(self, 'reconnecting', False):
logger.debug(f"Buffer check aborted - channel {self.buffer.channel_id} is stopping or reconnecting")
return
return False # Return value to indicate check was aborted
# Clean up completed timers
self._buffer_check_timers = [t for t in self._buffer_check_timers if t.is_alive()]
@@ -670,14 +775,17 @@ class StreamManager:
# Still waiting, log progress and schedule another check
logger.debug(f"Buffer filling for channel {channel_id}: {current_buffer_index}/{initial_chunks_needed} chunks")
# Schedule another check - NOW WITH TRACKING
if not getattr(self, 'stopping', False):
# Schedule another check - NOW WITH STOPPING CHECK
if self.running and not getattr(self, 'stopping', False):
timer = threading.Timer(0.5, self._check_buffer_and_set_state)
timer.daemon = True
timer.start()
self._buffer_check_timers.append(timer)
return True # Return value to indicate check was successful
except Exception as e:
logger.error(f"Error in buffer check: {e}")
return False
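
The buffer check is driven by self-rescheduling threading.Timer instances, and the fix is the guard that stops the chain once the manager shuts down. A self-contained sketch of that pattern (class and method names are illustrative, not the project's):

import threading

class BufferWatcher:
    def __init__(self):
        self.running = True
        self.stopping = False
        self._timers = []

    def check(self):
        # Short-circuit mirrors the fix: never reschedule while stopping.
        if not self.running or self.stopping:
            return False
        if self._buffer_ready():
            self._set_waiting_for_clients()
            return True
        timer = threading.Timer(0.5, self.check)  # poll again in 500 ms
        timer.daemon = True
        timer.start()
        self._timers.append(timer)
        return True

    def stop(self):
        self.stopping = True
        for t in self._timers:
            t.cancel()
        self._timers.clear()

    def _buffer_ready(self):
        return False  # placeholder for the real buffer-index comparison

    def _set_waiting_for_clients(self):
        pass  # placeholder for the Redis state update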
def _try_next_stream(self):
"""