Merge pull request #553 from Dispatcharr:Proxy-changes

Enhance HTTP streaming and timeout configurations
SergeantPanda 2025-10-12 09:46:21 -05:00 committed by GitHub
commit 071561c570
7 changed files with 370 additions and 194 deletions


@@ -8,7 +8,7 @@ import gevent
from typing import Set, Optional
from apps.proxy.config import TSConfig as Config
from redis.exceptions import ConnectionError, TimeoutError
from .constants import EventType
from .constants import EventType, ChannelState, ChannelMetadataField
from .config_helper import ConfigHelper
from .redis_keys import RedisKeys
from .utils import get_logger
@@ -26,6 +26,7 @@ class ClientManager:
self.lock = threading.Lock()
self.last_active_time = time.time()
self.worker_id = worker_id # Store worker ID as instance variable
self._heartbeat_running = True # Flag to control heartbeat thread
# STANDARDIZED KEYS: Move client set under channel namespace
self.client_set_key = RedisKeys.clients(channel_id)
@@ -77,56 +78,28 @@ class ClientManager:
logger.debug(f"Failed to trigger stats update: {e}")
def _start_heartbeat_thread(self):
"""Start thread to regularly refresh client presence in Redis"""
"""Start thread to regularly refresh client presence in Redis for local clients"""
def heartbeat_task():
no_clients_count = 0 # Track consecutive empty cycles
max_empty_cycles = 3 # Exit after this many consecutive empty checks
logger.debug(f"Started heartbeat thread for channel {self.channel_id} (interval: {self.heartbeat_interval}s)")
while True:
while self._heartbeat_running:
try:
# Wait for the interval
gevent.sleep(self.heartbeat_interval)
# Wait for the interval, but check stop flag frequently for quick shutdown
# Sleep in 1-second increments to allow faster response to stop signal
for _ in range(int(self.heartbeat_interval)):
if not self._heartbeat_running:
break
time.sleep(1)
# Final check before doing work
if not self._heartbeat_running:
break
# Send heartbeat for all local clients
with self.lock:
if not self.clients or not self.redis_client:
# No clients left, increment our counter
no_clients_count += 1
# Check if we're in a shutdown delay period before exiting
in_shutdown_delay = False
if self.redis_client:
try:
disconnect_key = RedisKeys.last_client_disconnect(self.channel_id)
disconnect_time_bytes = self.redis_client.get(disconnect_key)
if disconnect_time_bytes:
disconnect_time = float(disconnect_time_bytes.decode('utf-8'))
elapsed = time.time() - disconnect_time
shutdown_delay = ConfigHelper.channel_shutdown_delay()
if elapsed < shutdown_delay:
in_shutdown_delay = True
logger.debug(f"Channel {self.channel_id} in shutdown delay: {elapsed:.1f}s of {shutdown_delay}s elapsed")
except Exception as e:
logger.debug(f"Error checking shutdown delay: {e}")
# Only exit if we've seen no clients for several consecutive checks AND we're not in shutdown delay
if no_clients_count >= max_empty_cycles and not in_shutdown_delay:
logger.info(f"No clients for channel {self.channel_id} after {no_clients_count} consecutive checks and not in shutdown delay, exiting heartbeat thread")
return # This exits the thread
# Skip this cycle if we have no clients but continue if in shutdown delay
if not in_shutdown_delay:
continue
else:
# Reset counter during shutdown delay to prevent premature exit
no_clients_count = 0
continue
else:
# Reset counter when we see clients
no_clients_count = 0
# Skip this cycle if we have no local clients
if not self.clients:
continue
# IMPROVED GHOST DETECTION: Check for stale clients before sending heartbeats
current_time = time.time()
@@ -197,11 +170,20 @@ class ClientManager:
except Exception as e:
logger.error(f"Error in client heartbeat thread: {e}")
logger.debug(f"Heartbeat thread exiting for channel {self.channel_id}")
thread = threading.Thread(target=heartbeat_task, daemon=True)
thread.name = f"client-heartbeat-{self.channel_id}"
thread.start()
logger.debug(f"Started client heartbeat thread for channel {self.channel_id} (interval: {self.heartbeat_interval}s)")
def stop(self):
"""Stop the heartbeat thread and cleanup"""
logger.debug(f"Stopping ClientManager for channel {self.channel_id}")
self._heartbeat_running = False
# Give the thread a moment to exit gracefully
# Note: We don't join() here because it's a daemon thread and will exit on its own
def _execute_redis_command(self, command_func):
"""Execute Redis command with error handling"""
if not self.redis_client:
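
The stop-flag shutdown above can also be expressed with threading.Event, which removes the 1-second polling granularity; a minimal sketch of that alternative (names are illustrative, not from this PR):

import threading

class HeartbeatWorker:
    def __init__(self, interval=5.0):
        self.interval = interval
        self._stop = threading.Event()

    def run(self):
        while not self._stop.is_set():
            # wait() returns True the moment stop() is called, so shutdown
            # latency is bounded by the caller, not by the interval
            if self._stop.wait(timeout=self.interval):
                break
            self.beat()

    def beat(self):
        pass  # per-cycle work (refresh client presence, ghost detection, ...)

    def stop(self):
        self._stop.set()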


@@ -100,3 +100,12 @@ class ConfigHelper:
def channel_init_grace_period():
"""Get channel initialization grace period in seconds"""
return Config.get_channel_init_grace_period()
@staticmethod
def chunk_timeout():
"""
Get chunk timeout in seconds (used for both socket and HTTP read timeouts).
This controls how long we wait for each chunk before timing out.
Set this higher (e.g., 30s) for slow providers that may have intermittent delays.
"""
return ConfigHelper.get('CHUNK_TIMEOUT', 5) # Default 5 seconds
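
A sketch of how a caller might apply this value as a per-chunk read timeout on a pipe or socket file object (the select-based wait is illustrative; the actual consumer is the fetch_chunk() path shown later in this diff):

import select

def read_chunk_with_timeout(fileobj, size, timeout):
    # Block until the descriptor is readable, or give up after `timeout`.
    ready, _, _ = select.select([fileobj], [], [], timeout)
    if not ready:
        return None  # "no chunk this cycle" - caller retries, not fatal
    return fileobj.read(size)

# timeout = ConfigHelper.chunk_timeout()  # 5s default; raise it for slow providers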


@@ -0,0 +1,138 @@
"""
HTTP Stream Reader - Thread-based HTTP stream reader that writes to a pipe.
This allows us to use the same fetch_chunk() path for both transcode and HTTP streams.
"""
import threading
import os
import requests
from requests.adapters import HTTPAdapter
from .utils import get_logger
logger = get_logger()
class HTTPStreamReader:
"""Thread-based HTTP stream reader that writes to a pipe"""
def __init__(self, url, user_agent=None, chunk_size=8192):
self.url = url
self.user_agent = user_agent
self.chunk_size = chunk_size
self.session = None
self.response = None
self.thread = None
self.pipe_read = None
self.pipe_write = None
self.running = False
def start(self):
"""Start the HTTP stream reader thread"""
# Create a pipe (works on Windows and Unix)
self.pipe_read, self.pipe_write = os.pipe()
# Start the reader thread
self.running = True
self.thread = threading.Thread(target=self._read_stream, daemon=True)
self.thread.start()
logger.info(f"Started HTTP stream reader thread for {self.url}")
return self.pipe_read
def _read_stream(self):
"""Thread worker that reads HTTP stream and writes to pipe"""
try:
# Build headers
headers = {}
if self.user_agent:
headers['User-Agent'] = self.user_agent
logger.info(f"HTTP reader connecting to {self.url}")
# Create session
self.session = requests.Session()
# Disable retries for faster failure detection
adapter = HTTPAdapter(max_retries=0, pool_connections=1, pool_maxsize=1)
self.session.mount('http://', adapter)
self.session.mount('https://', adapter)
# Stream the URL
self.response = self.session.get(
self.url,
headers=headers,
stream=True,
timeout=(5, 30) # 5s connect, 30s read
)
if self.response.status_code != 200:
logger.error(f"HTTP {self.response.status_code} from {self.url}")
return
logger.info(f"HTTP reader connected successfully, streaming data...")
# Stream chunks to pipe
chunk_count = 0
for chunk in self.response.iter_content(chunk_size=self.chunk_size):
if not self.running:
break
if chunk:
try:
# Write binary data to pipe
os.write(self.pipe_write, chunk)
chunk_count += 1
# Log progress periodically
if chunk_count % 1000 == 0:
logger.debug(f"HTTP reader streamed {chunk_count} chunks")
except OSError as e:
logger.error(f"Pipe write error: {e}")
break
logger.info("HTTP stream ended")
except requests.exceptions.RequestException as e:
logger.error(f"HTTP reader request error: {e}")
except Exception as e:
logger.error(f"HTTP reader unexpected error: {e}", exc_info=True)
finally:
self.running = False
# Close write end of pipe to signal EOF
try:
if self.pipe_write is not None:
os.close(self.pipe_write)
self.pipe_write = None
except OSError:
pass
def stop(self):
"""Stop the HTTP stream reader"""
logger.info("Stopping HTTP stream reader")
self.running = False
# Close response
if self.response:
try:
self.response.close()
except Exception:
pass
# Close session
if self.session:
try:
self.session.close()
except Exception:
pass
# Close write end of pipe
if self.pipe_write is not None:
try:
os.close(self.pipe_write)
self.pipe_write = None
except OSError:
pass
# Wait for thread
if self.thread and self.thread.is_alive():
self.thread.join(timeout=2.0)
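
A usage sketch for the class above (URL is a placeholder, error handling elided), mirroring how the stream manager consumes the pipe later in this diff:

import os
from http_streamer import HTTPStreamReader  # imported relatively in the app

reader = HTTPStreamReader("http://example.com/live.ts", user_agent="VLC/3.0.20")
fd = reader.start()                          # read end of the pipe
stream = os.fdopen(fd, 'rb', buffering=0)    # same wrapping as a transcode stdout
try:
    while True:
        chunk = stream.read(8192)
        if not chunk:                        # b'': writer closed its end (EOF)
            break
        # ... hand chunk to the buffer ...
finally:
    stream.close()
    reader.stop()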


@@ -495,17 +495,18 @@ class ProxyServer:
)
return True
# Create buffer and client manager instances
buffer = StreamBuffer(channel_id, redis_client=self.redis_client)
client_manager = ClientManager(
channel_id,
redis_client=self.redis_client,
worker_id=self.worker_id
)
# Create buffer and client manager instances (or reuse if they exist)
if channel_id not in self.stream_buffers:
buffer = StreamBuffer(channel_id, redis_client=self.redis_client)
self.stream_buffers[channel_id] = buffer
# Store in local tracking
self.stream_buffers[channel_id] = buffer
self.client_managers[channel_id] = client_manager
if channel_id not in self.client_managers:
client_manager = ClientManager(
channel_id,
redis_client=self.redis_client,
worker_id=self.worker_id
)
self.client_managers[channel_id] = client_manager
# IMPROVED: Set initializing state in Redis BEFORE any other operations
if self.redis_client:
@@ -559,13 +560,15 @@ class ProxyServer:
logger.info(f"Channel {channel_id} already owned by worker {current_owner}")
logger.info(f"This worker ({self.worker_id}) will read from Redis buffer only")
# Create buffer but not stream manager
buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client)
self.stream_buffers[channel_id] = buffer
# Create buffer but not stream manager (only if not already exists)
if channel_id not in self.stream_buffers:
buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client)
self.stream_buffers[channel_id] = buffer
# Create client manager with channel_id and redis_client
client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id)
self.client_managers[channel_id] = client_manager
# Create client manager with channel_id and redis_client (only if not already exists)
if channel_id not in self.client_managers:
client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id)
self.client_managers[channel_id] = client_manager
return True
@@ -580,13 +583,15 @@ class ProxyServer:
# Another worker just acquired ownership
logger.info(f"Another worker just acquired ownership of channel {channel_id}")
# Create buffer but not stream manager
buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client)
self.stream_buffers[channel_id] = buffer
# Create buffer but not stream manager (only if not already exists)
if channel_id not in self.stream_buffers:
buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client)
self.stream_buffers[channel_id] = buffer
# Create client manager with channel_id and redis_client
client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id)
self.client_managers[channel_id] = client_manager
# Create client manager with channel_id and redis_client (only if not already exists)
if channel_id not in self.client_managers:
client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id)
self.client_managers[channel_id] = client_manager
return True
@@ -641,13 +646,14 @@ class ProxyServer:
logger.info(f"Created StreamManager for channel {channel_id} with stream ID {channel_stream_id}")
self.stream_managers[channel_id] = stream_manager
# Create client manager with channel_id, redis_client AND worker_id
client_manager = ClientManager(
channel_id=channel_id,
redis_client=self.redis_client,
worker_id=self.worker_id
)
self.client_managers[channel_id] = client_manager
# Create client manager with channel_id, redis_client AND worker_id (only if not already exists)
if channel_id not in self.client_managers:
client_manager = ClientManager(
channel_id=channel_id,
redis_client=self.redis_client,
worker_id=self.worker_id
)
self.client_managers[channel_id] = client_manager
# Start stream manager thread only for the owner
thread = threading.Thread(target=stream_manager.run, daemon=True)
@@ -855,6 +861,10 @@ class ProxyServer:
# Clean up client manager - SAFE CHECK HERE TOO
if channel_id in self.client_managers:
try:
client_manager = self.client_managers[channel_id]
# Stop the heartbeat thread before deleting
if hasattr(client_manager, 'stop'):
client_manager.stop()
del self.client_managers[channel_id]
logger.info(f"Removed client manager for channel {channel_id}")
except KeyError:
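
The repeated existence checks above are a get-or-create idiom; a small helper could express it once (hypothetical, not part of the PR; dict.setdefault() is unsuitable here because it constructs the object even when the key already exists):

def get_or_create(registry, key, factory):
    # Construct only on first request so an existing instance
    # (and any threads it owns) is reused rather than replaced.
    if key not in registry:
        registry[key] = factory()
    return registry[key]

# e.g.: get_or_create(self.stream_buffers, channel_id,
#           lambda: StreamBuffer(channel_id, redis_client=self.redis_client))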


@@ -303,6 +303,14 @@ class StreamBuffer:
# Retrieve chunks
chunks = self.get_chunks_exact(client_index, chunk_count)
# Check if we got significantly fewer chunks than expected (likely due to expiration)
# Only check if we expected multiple chunks, got none at all, and are well behind
if chunk_count > 3 and len(chunks) == 0 and chunks_behind > 10:
# Chunks are missing - likely expired from Redis
# Return empty list to signal client should skip forward
logger.debug(f"Chunks missing for client at index {client_index}, buffer at {self.index} ({chunks_behind} behind)")
return [], client_index
# Check total size
total_size = sum(len(c) for c in chunks)
@@ -316,7 +324,7 @@ class StreamBuffer:
additional_size = sum(len(c) for c in more_chunks)
if total_size + additional_size <= MAX_SIZE:
chunks.extend(more_chunks)
chunk_count += additional
chunk_count += len(more_chunks) # Fixed: count actual additional chunks retrieved
return chunks, client_index + chunk_count
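
A worked illustration of why the one-line fix matters for the returned client index (values invented):

chunk_count = 5                      # chunks already retrieved
client_index = 100                   # client's position before the read
more_chunks = [b'\x47' * 188] * 3    # top-up read returned 3 more chunks

chunk_count += len(more_chunks)      # 8: advance by what was actually read
assert client_index + chunk_count == 108

# Advancing by anything other than len(more_chunks) would desynchronize
# the client's index from the chunks it was actually handed.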


@@ -204,6 +204,18 @@ class StreamGenerator:
self.empty_reads += 1
self.consecutive_empty += 1
# Check if we're too far behind (chunks expired from Redis)
chunks_behind = self.buffer.index - self.local_index
if chunks_behind > 50: # If more than 50 chunks behind, jump forward
# Calculate new position: stay a few chunks behind current buffer
initial_behind = ConfigHelper.initial_behind_chunks()
new_index = max(self.local_index, self.buffer.index - initial_behind)
logger.warning(f"[{self.client_id}] Client too far behind ({chunks_behind} chunks), jumping from {self.local_index} to {new_index}")
self.local_index = new_index
self.consecutive_empty = 0 # Reset since we're repositioning
continue # Try again immediately with new position
if self._should_send_keepalive(self.local_index):
keepalive_packet = create_ts_packet('keepalive')
logger.debug(f"[{self.client_id}] Sending keepalive packet while waiting at buffer head")

View file

@@ -10,6 +10,7 @@ import gevent
import re
from typing import Optional, List
from django.shortcuts import get_object_or_404
from urllib3.exceptions import ReadTimeoutError
from apps.proxy.config import TSConfig as Config
from apps.channels.models import Channel, Stream
from apps.m3u.models import M3UAccount, M3UAccountProfile
@@ -91,11 +92,13 @@ class StreamManager:
self.tried_stream_ids.add(self.current_stream_id)
logger.info(f"Loaded stream ID {self.current_stream_id} from Redis for channel {buffer.channel_id}")
else:
logger.warning(f"No stream_id found in Redis for channel {channel_id}")
logger.warning(f"No stream_id found in Redis for channel {channel_id}. "
f"Stream switching will rely on URL comparison to avoid selecting the same stream.")
except Exception as e:
logger.warning(f"Error loading stream ID from Redis: {e}")
else:
logger.warning(f"Unable to get stream ID for channel {channel_id} - stream switching may not work correctly")
logger.warning(f"Unable to get stream ID for channel {channel_id}. "
f"Stream switching will rely on URL comparison to avoid selecting the same stream.")
logger.info(f"Initialized stream manager for channel {buffer.channel_id}")
@@ -111,6 +114,9 @@ class StreamManager:
self.stderr_reader_thread = None
self.ffmpeg_input_phase = True # Track if we're still reading input info
# Add HTTP reader thread property
self.http_reader = None
def _create_session(self):
"""Create and configure requests session with optimal settings"""
session = requests.Session()
@@ -737,9 +743,9 @@ class StreamManager:
def _establish_http_connection(self):
"""Establish a direct HTTP connection to the stream"""
"""Establish HTTP connection using thread-based reader (same as transcode path)"""
try:
logger.debug(f"Using TS Proxy to connect to stream: {self.url}")
logger.debug(f"Using HTTP streamer thread to connect to stream: {self.url}")
# Check if we already have active HTTP connections
if self.current_response or self.current_session:
@@ -756,41 +762,39 @@ class StreamManager:
logger.debug(f"Closing existing transcode process before establishing HTTP connection for channel {self.channel_id}")
self._close_socket()
# Create new session for each connection attempt
session = self._create_session()
self.current_session = session
# Use HTTPStreamReader to fetch stream and pipe to a readable file descriptor
# This allows us to use the same fetch_chunk() path as transcode
from .http_streamer import HTTPStreamReader
# Stream the URL with proper timeout handling
response = session.get(
self.url,
stream=True,
timeout=(10, 60) # 10s connect timeout, 60s read timeout
# Create and start the HTTP stream reader
self.http_reader = HTTPStreamReader(
url=self.url,
user_agent=self.user_agent,
chunk_size=self.chunk_size
)
self.current_response = response
if response.status_code == 200:
self.connected = True
self.healthy = True
logger.info(f"Successfully connected to stream source for channel {self.channel_id}")
# Start the reader thread and get the read end of the pipe
pipe_fd = self.http_reader.start()
# Store connection start time for stability tracking
self.connection_start_time = time.time()
# Wrap the file descriptor in a file object (same as transcode stdout)
import os
self.socket = os.fdopen(pipe_fd, 'rb', buffering=0)
self.connected = True
self.healthy = True
# Set channel state to waiting for clients
self._set_waiting_for_clients()
logger.info(f"Successfully started HTTP streamer thread for channel {self.channel_id}")
# Store connection start time for stability tracking
self.connection_start_time = time.time()
# Set channel state to waiting for clients
self._set_waiting_for_clients()
return True
return True
else:
logger.error(f"Failed to connect to stream for channel {self.channel_id}: HTTP {response.status_code}")
self._close_connection()
return False
except requests.exceptions.RequestException as e:
logger.error(f"HTTP request error: {e}")
self._close_connection()
return False
except Exception as e:
logger.error(f"Error establishing HTTP connection for channel {self.channel_id}: {e}", exc_info=True)
self._close_connection()
self._close_socket()
return False
def _update_bytes_processed(self, chunk_size):
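
The os.pipe()/os.fdopen() wrapping is what lets both paths share fetch_chunk(); a self-contained sketch of the mechanics, independent of the classes above:

import os
import threading

r_fd, w_fd = os.pipe()                 # byte pipe: writer thread -> reader

def writer():
    os.write(w_fd, b'\x47' * 188)      # one TS-packet-sized payload
    os.close(w_fd)                     # closing the write end signals EOF

threading.Thread(target=writer).start()

stream = os.fdopen(r_fd, 'rb', buffering=0)  # file object, like Popen.stdout
print(len(stream.read(188)))           # 188
assert stream.read(1) == b''           # EOF once the writer end is closed
stream.close()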
@@ -818,48 +822,19 @@ class StreamManager:
logger.error(f"Error updating bytes processed: {e}")
def _process_stream_data(self):
"""Process stream data until disconnect or error"""
"""Process stream data until disconnect or error - unified path for both transcode and HTTP"""
try:
if self.transcode:
# Handle transcoded stream data
while self.running and self.connected and not self.stop_requested and not self.needs_stream_switch:
if self.fetch_chunk():
self.last_data_time = time.time()
else:
if not self.running:
break
gevent.sleep(0.1) # REPLACE time.sleep(0.1)
else:
# Handle direct HTTP connection
chunk_count = 0
try:
for chunk in self.current_response.iter_content(chunk_size=self.chunk_size):
# Check if we've been asked to stop
if self.stop_requested or self.url_switching or self.needs_stream_switch:
break
if chunk:
# Track chunk size before adding to buffer
chunk_size = len(chunk)
self._update_bytes_processed(chunk_size)
# Add chunk to buffer with TS packet alignment
success = self.buffer.add_chunk(chunk)
if success:
self.last_data_time = time.time()
chunk_count += 1
# Update last data timestamp in Redis
if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client:
last_data_key = RedisKeys.last_data(self.buffer.channel_id)
self.buffer.redis_client.set(last_data_key, str(time.time()), ex=60)
except (AttributeError, ConnectionError) as e:
if self.stop_requested or self.url_switching:
logger.debug(f"Expected connection error during shutdown/URL switch for channel {self.channel_id}: {e}")
else:
logger.error(f"Unexpected stream error for channel {self.channel_id}: {e}")
raise
# Both transcode and HTTP now use the same subprocess/socket approach
# This gives us perfect control: check flags between chunks, timeout just returns False
while self.running and self.connected and not self.stop_requested and not self.needs_stream_switch:
if self.fetch_chunk():
self.last_data_time = time.time()
else:
# fetch_chunk() returned False - could be timeout, no data, or error
if not self.running:
break
# Brief sleep before retry to avoid tight loop
gevent.sleep(0.1)
except Exception as e:
logger.error(f"Error processing stream data for channel {self.channel_id}: {e}", exc_info=True)
@@ -1183,6 +1158,15 @@ class StreamManager:
if self.current_response or self.current_session:
self._close_connection()
# Stop HTTP reader thread if it exists
if hasattr(self, 'http_reader') and self.http_reader:
try:
logger.debug(f"Stopping HTTP reader thread for channel {self.channel_id}")
self.http_reader.stop()
self.http_reader = None
except Exception as e:
logger.debug(f"Error stopping HTTP reader for channel {self.channel_id}: {e}")
# Otherwise handle socket and transcode resources
if self.socket:
try:
@@ -1274,7 +1258,7 @@ class StreamManager:
try:
# Set timeout for chunk reads
chunk_timeout = ConfigHelper.get('CHUNK_TIMEOUT', 10) # Default 10 seconds
chunk_timeout = ConfigHelper.chunk_timeout() # Use centralized timeout configuration
try:
# Handle different socket types with timeout
@@ -1357,7 +1341,17 @@ class StreamManager:
# Only update if not already past connecting
if not current_state or current_state in [ChannelState.INITIALIZING, ChannelState.CONNECTING]:
# NEW CODE: Check if buffer has enough chunks
current_buffer_index = getattr(self.buffer, 'index', 0)
# IMPORTANT: Read from Redis, not local buffer.index, because in multi-worker setup
# each worker has its own StreamBuffer instance with potentially stale local index
buffer_index_key = RedisKeys.buffer_index(channel_id)
current_buffer_index = 0
try:
redis_index = redis_client.get(buffer_index_key)
if redis_index:
current_buffer_index = int(redis_index)
except Exception as e:
logger.error(f"Error reading buffer index from Redis: {e}")
initial_chunks_needed = ConfigHelper.initial_behind_chunks()
if current_buffer_index < initial_chunks_needed:
@@ -1405,10 +1399,21 @@ class StreamManager:
# Clean up completed timers
self._buffer_check_timers = [t for t in self._buffer_check_timers if t.is_alive()]
if hasattr(self.buffer, 'index') and hasattr(self.buffer, 'channel_id'):
current_buffer_index = self.buffer.index
initial_chunks_needed = getattr(Config, 'INITIAL_BEHIND_CHUNKS', 10)
if hasattr(self.buffer, 'channel_id') and hasattr(self.buffer, 'redis_client'):
channel_id = self.buffer.channel_id
redis_client = self.buffer.redis_client
# IMPORTANT: Read from Redis, not local buffer.index
buffer_index_key = RedisKeys.buffer_index(channel_id)
current_buffer_index = 0
try:
redis_index = redis_client.get(buffer_index_key)
if redis_index:
current_buffer_index = int(redis_index)
except Exception as e:
logger.error(f"Error reading buffer index from Redis: {e}")
initial_chunks_needed = ConfigHelper.initial_behind_chunks() # Use ConfigHelper for consistency
if current_buffer_index >= initial_chunks_needed:
# We now have enough buffer, call _set_waiting_for_clients again
@@ -1433,6 +1438,7 @@ class StreamManager:
def _try_next_stream(self):
"""
Try to switch to the next available stream for this channel.
Will iterate through multiple alternate streams if needed to find one with a different URL.
Returns:
bool: True if successfully switched to a new stream, False otherwise
@@ -1458,60 +1464,71 @@ class StreamManager:
logger.warning(f"All {len(alternate_streams)} alternate streams have been tried for channel {self.channel_id}")
return False
# Get the next stream to try
next_stream = untried_streams[0]
stream_id = next_stream['stream_id']
profile_id = next_stream['profile_id'] # This is the M3U profile ID we need
# IMPROVED: Try multiple streams until we find one with a different URL
for next_stream in untried_streams:
stream_id = next_stream['stream_id']
profile_id = next_stream['profile_id'] # This is the M3U profile ID we need
# Add to tried streams
self.tried_stream_ids.add(stream_id)
# Add to tried streams
self.tried_stream_ids.add(stream_id)
# Get stream info including URL using the profile_id we already have
logger.info(f"Trying next stream ID {stream_id} with profile ID {profile_id} for channel {self.channel_id}")
stream_info = get_stream_info_for_switch(self.channel_id, stream_id)
# Get stream info including URL using the profile_id we already have
logger.info(f"Trying next stream ID {stream_id} with profile ID {profile_id} for channel {self.channel_id}")
stream_info = get_stream_info_for_switch(self.channel_id, stream_id)
if 'error' in stream_info or not stream_info.get('url'):
logger.error(f"Error getting info for stream {stream_id} for channel {self.channel_id}: {stream_info.get('error', 'No URL')}")
return False
if 'error' in stream_info or not stream_info.get('url'):
logger.error(f"Error getting info for stream {stream_id} for channel {self.channel_id}: {stream_info.get('error', 'No URL')}")
continue # Try next stream instead of giving up
# Update URL and user agent
new_url = stream_info['url']
new_user_agent = stream_info['user_agent']
new_transcode = stream_info['transcode']
# Update URL and user agent
new_url = stream_info['url']
new_user_agent = stream_info['user_agent']
new_transcode = stream_info['transcode']
logger.info(f"Switching from URL {self.url} to {new_url} for channel {self.channel_id}")
# CRITICAL FIX: Check if the new URL is the same as current URL
# This can happen when current_stream_id is None and we accidentally select the same stream
if new_url == self.url:
logger.warning(f"Stream ID {stream_id} generates the same URL as current stream ({new_url}). "
f"Skipping this stream and trying next alternative.")
continue # Try next stream instead of giving up
# IMPORTANT: Just update the URL, don't stop the channel or release resources
switch_result = self.update_url(new_url, stream_id, profile_id)
if not switch_result:
logger.error(f"Failed to update URL for stream ID {stream_id} for channel {self.channel_id}")
return False
logger.info(f"Switching from URL {self.url} to {new_url} for channel {self.channel_id}")
# Update stream ID tracking
self.current_stream_id = stream_id
# IMPORTANT: Just update the URL, don't stop the channel or release resources
switch_result = self.update_url(new_url, stream_id, profile_id)
if not switch_result:
logger.error(f"Failed to update URL for stream ID {stream_id} for channel {self.channel_id}")
continue # Try next stream
# Store the new user agent and transcode settings
self.user_agent = new_user_agent
self.transcode = new_transcode
# Update stream ID tracking
self.current_stream_id = stream_id
# Update stream metadata in Redis - use the profile_id we got from get_alternate_streams
if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client:
metadata_key = RedisKeys.channel_metadata(self.channel_id)
self.buffer.redis_client.hset(metadata_key, mapping={
ChannelMetadataField.URL: new_url,
ChannelMetadataField.USER_AGENT: new_user_agent,
ChannelMetadataField.STREAM_PROFILE: stream_info['stream_profile'],
ChannelMetadataField.M3U_PROFILE: str(profile_id), # Use the profile_id from get_alternate_streams
ChannelMetadataField.STREAM_ID: str(stream_id),
ChannelMetadataField.STREAM_SWITCH_TIME: str(time.time()),
ChannelMetadataField.STREAM_SWITCH_REASON: "max_retries_exceeded"
})
# Store the new user agent and transcode settings
self.user_agent = new_user_agent
self.transcode = new_transcode
# Log the switch
logger.info(f"Stream metadata updated for channel {self.channel_id} to stream ID {stream_id} with M3U profile {profile_id}")
# Update stream metadata in Redis - use the profile_id we got from get_alternate_streams
if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client:
metadata_key = RedisKeys.channel_metadata(self.channel_id)
self.buffer.redis_client.hset(metadata_key, mapping={
ChannelMetadataField.URL: new_url,
ChannelMetadataField.USER_AGENT: new_user_agent,
ChannelMetadataField.STREAM_PROFILE: stream_info['stream_profile'],
ChannelMetadataField.M3U_PROFILE: str(profile_id), # Use the profile_id from get_alternate_streams
ChannelMetadataField.STREAM_ID: str(stream_id),
ChannelMetadataField.STREAM_SWITCH_TIME: str(time.time()),
ChannelMetadataField.STREAM_SWITCH_REASON: "max_retries_exceeded"
})
logger.info(f"Successfully switched to stream ID {stream_id} with URL {new_url} for channel {self.channel_id}")
return True
# Log the switch
logger.info(f"Stream metadata updated for channel {self.channel_id} to stream ID {stream_id} with M3U profile {profile_id}")
logger.info(f"Successfully switched to stream ID {stream_id} with URL {new_url} for channel {self.channel_id}")
return True
# If we get here, we tried all streams but none worked
logger.error(f"Tried {len(untried_streams)} alternate streams but none were suitable for channel {self.channel_id}")
return False
except Exception as e:
logger.error(f"Error trying next stream for channel {self.channel_id}: {e}", exc_info=True)