Changed read timeout for http connection for the proxy server to 10 secounds to avoid unnecessary timeouts.

Improved try_next_stream to not fail if the returned stream is the same. It will now try a different stream.
Force a client to jump ahead in the buffer if they fall to far behind.
This commit is contained in:
SergeantPanda 2025-10-11 19:45:21 -05:00
parent fa08216600
commit 74280baa85
3 changed files with 106 additions and 51 deletions

View file

@ -303,6 +303,14 @@ class StreamBuffer:
# Retrieve chunks
chunks = self.get_chunks_exact(client_index, chunk_count)
# Check if we got significantly fewer chunks than expected (likely due to expiration)
# Only check if we expected multiple chunks and got none or very few
if chunk_count > 3 and len(chunks) == 0 and chunks_behind > 10:
# Chunks are missing - likely expired from Redis
# Return empty list to signal client should skip forward
logger.debug(f"Chunks missing for client at index {client_index}, buffer at {self.index} ({chunks_behind} behind)")
return [], client_index
# Check total size
total_size = sum(len(c) for c in chunks)
@ -316,7 +324,7 @@ class StreamBuffer:
additional_size = sum(len(c) for c in more_chunks)
if total_size + additional_size <= MAX_SIZE:
chunks.extend(more_chunks)
chunk_count += additional
chunk_count += len(more_chunks) # Fixed: count actual additional chunks retrieved
return chunks, client_index + chunk_count

View file

@ -204,6 +204,18 @@ class StreamGenerator:
self.empty_reads += 1
self.consecutive_empty += 1
# Check if we're too far behind (chunks expired from Redis)
chunks_behind = self.buffer.index - self.local_index
if chunks_behind > 50: # If more than 50 chunks behind, jump forward
# Calculate new position: stay a few chunks behind current buffer
initial_behind = ConfigHelper.initial_behind_chunks()
new_index = max(self.local_index, self.buffer.index - initial_behind)
logger.warning(f"[{self.client_id}] Client too far behind ({chunks_behind} chunks), jumping from {self.local_index} to {new_index}")
self.local_index = new_index
self.consecutive_empty = 0 # Reset since we're repositioning
continue # Try again immediately with new position
if self._should_send_keepalive(self.local_index):
keepalive_packet = create_ts_packet('keepalive')
logger.debug(f"[{self.client_id}] Sending keepalive packet while waiting at buffer head")

View file

@ -92,11 +92,13 @@ class StreamManager:
self.tried_stream_ids.add(self.current_stream_id)
logger.info(f"Loaded stream ID {self.current_stream_id} from Redis for channel {buffer.channel_id}")
else:
logger.warning(f"No stream_id found in Redis for channel {channel_id}")
logger.warning(f"No stream_id found in Redis for channel {channel_id}. "
f"Stream switching will rely on URL comparison to avoid selecting the same stream.")
except Exception as e:
logger.warning(f"Error loading stream ID from Redis: {e}")
else:
logger.warning(f"Unable to get stream ID for channel {channel_id} - stream switching may not work correctly")
logger.warning(f"Unable to get stream ID for channel {channel_id}. "
f"Stream switching will rely on URL comparison to avoid selecting the same stream.")
logger.info(f"Initialized stream manager for channel {buffer.channel_id}")
@ -767,7 +769,7 @@ class StreamManager:
response = session.get(
self.url,
stream=True,
timeout=(5, chunk_timeout) # 5s connect timeout, configurable chunk timeout
timeout=(5, 10) # 5s connect timeout, 10s read timeout
)
self.current_response = response
@ -1378,7 +1380,17 @@ class StreamManager:
# Only update if not already past connecting
if not current_state or current_state in [ChannelState.INITIALIZING, ChannelState.CONNECTING]:
# NEW CODE: Check if buffer has enough chunks
current_buffer_index = getattr(self.buffer, 'index', 0)
# IMPORTANT: Read from Redis, not local buffer.index, because in multi-worker setup
# each worker has its own StreamBuffer instance with potentially stale local index
buffer_index_key = RedisKeys.buffer_index(channel_id)
current_buffer_index = 0
try:
redis_index = redis_client.get(buffer_index_key)
if redis_index:
current_buffer_index = int(redis_index)
except Exception as e:
logger.error(f"Error reading buffer index from Redis: {e}")
initial_chunks_needed = ConfigHelper.initial_behind_chunks()
if current_buffer_index < initial_chunks_needed:
@ -1426,10 +1438,21 @@ class StreamManager:
# Clean up completed timers
self._buffer_check_timers = [t for t in self._buffer_check_timers if t.is_alive()]
if hasattr(self.buffer, 'index') and hasattr(self.buffer, 'channel_id'):
current_buffer_index = self.buffer.index
initial_chunks_needed = getattr(Config, 'INITIAL_BEHIND_CHUNKS', 10)
if hasattr(self.buffer, 'channel_id') and hasattr(self.buffer, 'redis_client'):
channel_id = self.buffer.channel_id
redis_client = self.buffer.redis_client
# IMPORTANT: Read from Redis, not local buffer.index
buffer_index_key = RedisKeys.buffer_index(channel_id)
current_buffer_index = 0
try:
redis_index = redis_client.get(buffer_index_key)
if redis_index:
current_buffer_index = int(redis_index)
except Exception as e:
logger.error(f"Error reading buffer index from Redis: {e}")
initial_chunks_needed = ConfigHelper.initial_behind_chunks() # Use ConfigHelper for consistency
if current_buffer_index >= initial_chunks_needed:
# We now have enough buffer, call _set_waiting_for_clients again
@ -1454,6 +1477,7 @@ class StreamManager:
def _try_next_stream(self):
"""
Try to switch to the next available stream for this channel.
Will iterate through multiple alternate streams if needed to find one with a different URL.
Returns:
bool: True if successfully switched to a new stream, False otherwise
@ -1479,60 +1503,71 @@ class StreamManager:
logger.warning(f"All {len(alternate_streams)} alternate streams have been tried for channel {self.channel_id}")
return False
# Get the next stream to try
next_stream = untried_streams[0]
stream_id = next_stream['stream_id']
profile_id = next_stream['profile_id'] # This is the M3U profile ID we need
# IMPROVED: Try multiple streams until we find one with a different URL
for next_stream in untried_streams:
stream_id = next_stream['stream_id']
profile_id = next_stream['profile_id'] # This is the M3U profile ID we need
# Add to tried streams
self.tried_stream_ids.add(stream_id)
# Add to tried streams
self.tried_stream_ids.add(stream_id)
# Get stream info including URL using the profile_id we already have
logger.info(f"Trying next stream ID {stream_id} with profile ID {profile_id} for channel {self.channel_id}")
stream_info = get_stream_info_for_switch(self.channel_id, stream_id)
# Get stream info including URL using the profile_id we already have
logger.info(f"Trying next stream ID {stream_id} with profile ID {profile_id} for channel {self.channel_id}")
stream_info = get_stream_info_for_switch(self.channel_id, stream_id)
if 'error' in stream_info or not stream_info.get('url'):
logger.error(f"Error getting info for stream {stream_id} for channel {self.channel_id}: {stream_info.get('error', 'No URL')}")
return False
if 'error' in stream_info or not stream_info.get('url'):
logger.error(f"Error getting info for stream {stream_id} for channel {self.channel_id}: {stream_info.get('error', 'No URL')}")
continue # Try next stream instead of giving up
# Update URL and user agent
new_url = stream_info['url']
new_user_agent = stream_info['user_agent']
new_transcode = stream_info['transcode']
# Update URL and user agent
new_url = stream_info['url']
new_user_agent = stream_info['user_agent']
new_transcode = stream_info['transcode']
logger.info(f"Switching from URL {self.url} to {new_url} for channel {self.channel_id}")
# CRITICAL FIX: Check if the new URL is the same as current URL
# This can happen when current_stream_id is None and we accidentally select the same stream
if new_url == self.url:
logger.warning(f"Stream ID {stream_id} generates the same URL as current stream ({new_url}). "
f"Skipping this stream and trying next alternative.")
continue # Try next stream instead of giving up
# IMPORTANT: Just update the URL, don't stop the channel or release resources
switch_result = self.update_url(new_url, stream_id, profile_id)
if not switch_result:
logger.error(f"Failed to update URL for stream ID {stream_id} for channel {self.channel_id}")
return False
logger.info(f"Switching from URL {self.url} to {new_url} for channel {self.channel_id}")
# Update stream ID tracking
self.current_stream_id = stream_id
# IMPORTANT: Just update the URL, don't stop the channel or release resources
switch_result = self.update_url(new_url, stream_id, profile_id)
if not switch_result:
logger.error(f"Failed to update URL for stream ID {stream_id} for channel {self.channel_id}")
continue # Try next stream
# Store the new user agent and transcode settings
self.user_agent = new_user_agent
self.transcode = new_transcode
# Update stream ID tracking
self.current_stream_id = stream_id
# Update stream metadata in Redis - use the profile_id we got from get_alternate_streams
if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client:
metadata_key = RedisKeys.channel_metadata(self.channel_id)
self.buffer.redis_client.hset(metadata_key, mapping={
ChannelMetadataField.URL: new_url,
ChannelMetadataField.USER_AGENT: new_user_agent,
ChannelMetadataField.STREAM_PROFILE: stream_info['stream_profile'],
ChannelMetadataField.M3U_PROFILE: str(profile_id), # Use the profile_id from get_alternate_streams
ChannelMetadataField.STREAM_ID: str(stream_id),
ChannelMetadataField.STREAM_SWITCH_TIME: str(time.time()),
ChannelMetadataField.STREAM_SWITCH_REASON: "max_retries_exceeded"
})
# Store the new user agent and transcode settings
self.user_agent = new_user_agent
self.transcode = new_transcode
# Log the switch
logger.info(f"Stream metadata updated for channel {self.channel_id} to stream ID {stream_id} with M3U profile {profile_id}")
# Update stream metadata in Redis - use the profile_id we got from get_alternate_streams
if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client:
metadata_key = RedisKeys.channel_metadata(self.channel_id)
self.buffer.redis_client.hset(metadata_key, mapping={
ChannelMetadataField.URL: new_url,
ChannelMetadataField.USER_AGENT: new_user_agent,
ChannelMetadataField.STREAM_PROFILE: stream_info['stream_profile'],
ChannelMetadataField.M3U_PROFILE: str(profile_id), # Use the profile_id from get_alternate_streams
ChannelMetadataField.STREAM_ID: str(stream_id),
ChannelMetadataField.STREAM_SWITCH_TIME: str(time.time()),
ChannelMetadataField.STREAM_SWITCH_REASON: "max_retries_exceeded"
})
logger.info(f"Successfully switched to stream ID {stream_id} with URL {new_url} for channel {self.channel_id}")
return True
# Log the switch
logger.info(f"Stream metadata updated for channel {self.channel_id} to stream ID {stream_id} with M3U profile {profile_id}")
logger.info(f"Successfully switched to stream ID {stream_id} with URL {new_url} for channel {self.channel_id}")
return True
# If we get here, we tried all streams but none worked
logger.error(f"Tried {len(untried_streams)} alternate streams but none were suitable for channel {self.channel_id}")
return False
except Exception as e:
logger.error(f"Error trying next stream for channel {self.channel_id}: {e}", exc_info=True)