diff --git a/apps/proxy/ts_proxy/client_manager.py b/apps/proxy/ts_proxy/client_manager.py index d4b83d3a..3d89b3b8 100644 --- a/apps/proxy/ts_proxy/client_manager.py +++ b/apps/proxy/ts_proxy/client_manager.py @@ -8,7 +8,7 @@ import gevent from typing import Set, Optional from apps.proxy.config import TSConfig as Config from redis.exceptions import ConnectionError, TimeoutError -from .constants import EventType +from .constants import EventType, ChannelState, ChannelMetadataField from .config_helper import ConfigHelper from .redis_keys import RedisKeys from .utils import get_logger @@ -26,6 +26,7 @@ class ClientManager: self.lock = threading.Lock() self.last_active_time = time.time() self.worker_id = worker_id # Store worker ID as instance variable + self._heartbeat_running = True # Flag to control heartbeat thread # STANDARDIZED KEYS: Move client set under channel namespace self.client_set_key = RedisKeys.clients(channel_id) @@ -77,56 +78,28 @@ class ClientManager: logger.debug(f"Failed to trigger stats update: {e}") def _start_heartbeat_thread(self): - """Start thread to regularly refresh client presence in Redis""" + """Start thread to regularly refresh client presence in Redis for local clients""" def heartbeat_task(): - no_clients_count = 0 # Track consecutive empty cycles - max_empty_cycles = 3 # Exit after this many consecutive empty checks - logger.debug(f"Started heartbeat thread for channel {self.channel_id} (interval: {self.heartbeat_interval}s)") - while True: + while self._heartbeat_running: try: - # Wait for the interval - gevent.sleep(self.heartbeat_interval) + # Wait for the interval, but check stop flag frequently for quick shutdown + # Sleep in 1-second increments to allow faster response to stop signal + for _ in range(int(self.heartbeat_interval)): + if not self._heartbeat_running: + break + time.sleep(1) + + # Final check before doing work + if not self._heartbeat_running: + break # Send heartbeat for all local clients with self.lock: - if not self.clients or not self.redis_client: - # No clients left, increment our counter - no_clients_count += 1 - - # Check if we're in a shutdown delay period before exiting - in_shutdown_delay = False - if self.redis_client: - try: - disconnect_key = RedisKeys.last_client_disconnect(self.channel_id) - disconnect_time_bytes = self.redis_client.get(disconnect_key) - if disconnect_time_bytes: - disconnect_time = float(disconnect_time_bytes.decode('utf-8')) - elapsed = time.time() - disconnect_time - shutdown_delay = ConfigHelper.channel_shutdown_delay() - - if elapsed < shutdown_delay: - in_shutdown_delay = True - logger.debug(f"Channel {self.channel_id} in shutdown delay: {elapsed:.1f}s of {shutdown_delay}s elapsed") - except Exception as e: - logger.debug(f"Error checking shutdown delay: {e}") - - # Only exit if we've seen no clients for several consecutive checks AND we're not in shutdown delay - if no_clients_count >= max_empty_cycles and not in_shutdown_delay: - logger.info(f"No clients for channel {self.channel_id} after {no_clients_count} consecutive checks and not in shutdown delay, exiting heartbeat thread") - return # This exits the thread - - # Skip this cycle if we have no clients but continue if in shutdown delay - if not in_shutdown_delay: - continue - else: - # Reset counter during shutdown delay to prevent premature exit - no_clients_count = 0 - continue - else: - # Reset counter when we see clients - no_clients_count = 0 + # Skip this cycle if we have no local clients + if not self.clients: + continue # IMPROVED GHOST DETECTION: Check for stale clients before sending heartbeats current_time = time.time() @@ -197,11 +170,20 @@ class ClientManager: except Exception as e: logger.error(f"Error in client heartbeat thread: {e}") + logger.debug(f"Heartbeat thread exiting for channel {self.channel_id}") + thread = threading.Thread(target=heartbeat_task, daemon=True) thread.name = f"client-heartbeat-{self.channel_id}" thread.start() logger.debug(f"Started client heartbeat thread for channel {self.channel_id} (interval: {self.heartbeat_interval}s)") + def stop(self): + """Stop the heartbeat thread and cleanup""" + logger.debug(f"Stopping ClientManager for channel {self.channel_id}") + self._heartbeat_running = False + # Give the thread a moment to exit gracefully + # Note: We don't join() here because it's a daemon thread and will exit on its own + def _execute_redis_command(self, command_func): """Execute Redis command with error handling""" if not self.redis_client: diff --git a/apps/proxy/ts_proxy/config_helper.py b/apps/proxy/ts_proxy/config_helper.py index d59fa1f9..d7d33558 100644 --- a/apps/proxy/ts_proxy/config_helper.py +++ b/apps/proxy/ts_proxy/config_helper.py @@ -100,3 +100,12 @@ class ConfigHelper: def channel_init_grace_period(): """Get channel initialization grace period in seconds""" return Config.get_channel_init_grace_period() + + @staticmethod + def chunk_timeout(): + """ + Get chunk timeout in seconds (used for both socket and HTTP read timeouts). + This controls how long we wait for each chunk before timing out. + Set this higher (e.g., 30s) for slow providers that may have intermittent delays. + """ + return ConfigHelper.get('CHUNK_TIMEOUT', 5) # Default 5 seconds diff --git a/apps/proxy/ts_proxy/http_streamer.py b/apps/proxy/ts_proxy/http_streamer.py new file mode 100644 index 00000000..147d2c93 --- /dev/null +++ b/apps/proxy/ts_proxy/http_streamer.py @@ -0,0 +1,138 @@ +""" +HTTP Stream Reader - Thread-based HTTP stream reader that writes to a pipe. +This allows us to use the same fetch_chunk() path for both transcode and HTTP streams. +""" + +import threading +import os +import requests +from requests.adapters import HTTPAdapter +from .utils import get_logger + +logger = get_logger() + + +class HTTPStreamReader: + """Thread-based HTTP stream reader that writes to a pipe""" + + def __init__(self, url, user_agent=None, chunk_size=8192): + self.url = url + self.user_agent = user_agent + self.chunk_size = chunk_size + self.session = None + self.response = None + self.thread = None + self.pipe_read = None + self.pipe_write = None + self.running = False + + def start(self): + """Start the HTTP stream reader thread""" + # Create a pipe (works on Windows and Unix) + self.pipe_read, self.pipe_write = os.pipe() + + # Start the reader thread + self.running = True + self.thread = threading.Thread(target=self._read_stream, daemon=True) + self.thread.start() + + logger.info(f"Started HTTP stream reader thread for {self.url}") + return self.pipe_read + + def _read_stream(self): + """Thread worker that reads HTTP stream and writes to pipe""" + try: + # Build headers + headers = {} + if self.user_agent: + headers['User-Agent'] = self.user_agent + + logger.info(f"HTTP reader connecting to {self.url}") + + # Create session + self.session = requests.Session() + + # Disable retries for faster failure detection + adapter = HTTPAdapter(max_retries=0, pool_connections=1, pool_maxsize=1) + self.session.mount('http://', adapter) + self.session.mount('https://', adapter) + + # Stream the URL + self.response = self.session.get( + self.url, + headers=headers, + stream=True, + timeout=(5, 30) # 5s connect, 30s read + ) + + if self.response.status_code != 200: + logger.error(f"HTTP {self.response.status_code} from {self.url}") + return + + logger.info(f"HTTP reader connected successfully, streaming data...") + + # Stream chunks to pipe + chunk_count = 0 + for chunk in self.response.iter_content(chunk_size=self.chunk_size): + if not self.running: + break + + if chunk: + try: + # Write binary data to pipe + os.write(self.pipe_write, chunk) + chunk_count += 1 + + # Log progress periodically + if chunk_count % 1000 == 0: + logger.debug(f"HTTP reader streamed {chunk_count} chunks") + except OSError as e: + logger.error(f"Pipe write error: {e}") + break + + logger.info("HTTP stream ended") + + except requests.exceptions.RequestException as e: + logger.error(f"HTTP reader request error: {e}") + except Exception as e: + logger.error(f"HTTP reader unexpected error: {e}", exc_info=True) + finally: + self.running = False + # Close write end of pipe to signal EOF + try: + if self.pipe_write is not None: + os.close(self.pipe_write) + self.pipe_write = None + except: + pass + + def stop(self): + """Stop the HTTP stream reader""" + logger.info("Stopping HTTP stream reader") + self.running = False + + # Close response + if self.response: + try: + self.response.close() + except: + pass + + # Close session + if self.session: + try: + self.session.close() + except: + pass + + # Close write end of pipe + if self.pipe_write is not None: + try: + os.close(self.pipe_write) + self.pipe_write = None + except: + pass + + # Wait for thread + if self.thread and self.thread.is_alive(): + self.thread.join(timeout=2.0) diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index 0d638c1a..cca827a9 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -495,17 +495,18 @@ class ProxyServer: ) return True - # Create buffer and client manager instances - buffer = StreamBuffer(channel_id, redis_client=self.redis_client) - client_manager = ClientManager( - channel_id, - redis_client=self.redis_client, - worker_id=self.worker_id - ) + # Create buffer and client manager instances (or reuse if they exist) + if channel_id not in self.stream_buffers: + buffer = StreamBuffer(channel_id, redis_client=self.redis_client) + self.stream_buffers[channel_id] = buffer - # Store in local tracking - self.stream_buffers[channel_id] = buffer - self.client_managers[channel_id] = client_manager + if channel_id not in self.client_managers: + client_manager = ClientManager( + channel_id, + redis_client=self.redis_client, + worker_id=self.worker_id + ) + self.client_managers[channel_id] = client_manager # IMPROVED: Set initializing state in Redis BEFORE any other operations if self.redis_client: @@ -559,13 +560,15 @@ class ProxyServer: logger.info(f"Channel {channel_id} already owned by worker {current_owner}") logger.info(f"This worker ({self.worker_id}) will read from Redis buffer only") - # Create buffer but not stream manager - buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client) - self.stream_buffers[channel_id] = buffer + # Create buffer but not stream manager (only if not already exists) + if channel_id not in self.stream_buffers: + buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client) + self.stream_buffers[channel_id] = buffer - # Create client manager with channel_id and redis_client - client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id) - self.client_managers[channel_id] = client_manager + # Create client manager with channel_id and redis_client (only if not already exists) + if channel_id not in self.client_managers: + client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id) + self.client_managers[channel_id] = client_manager return True @@ -580,13 +583,15 @@ class ProxyServer: # Another worker just acquired ownership logger.info(f"Another worker just acquired ownership of channel {channel_id}") - # Create buffer but not stream manager - buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client) - self.stream_buffers[channel_id] = buffer + # Create buffer but not stream manager (only if not already exists) + if channel_id not in self.stream_buffers: + buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client) + self.stream_buffers[channel_id] = buffer - # Create client manager with channel_id and redis_client - client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id) - self.client_managers[channel_id] = client_manager + # Create client manager with channel_id and redis_client (only if not already exists) + if channel_id not in self.client_managers: + client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id) + self.client_managers[channel_id] = client_manager return True @@ -641,13 +646,14 @@ class ProxyServer: logger.info(f"Created StreamManager for channel {channel_id} with stream ID {channel_stream_id}") self.stream_managers[channel_id] = stream_manager - # Create client manager with channel_id, redis_client AND worker_id - client_manager = ClientManager( - channel_id=channel_id, - redis_client=self.redis_client, - worker_id=self.worker_id - ) - self.client_managers[channel_id] = client_manager + # Create client manager with channel_id, redis_client AND worker_id (only if not already exists) + if channel_id not in self.client_managers: + client_manager = ClientManager( + channel_id=channel_id, + redis_client=self.redis_client, + worker_id=self.worker_id + ) + self.client_managers[channel_id] = client_manager # Start stream manager thread only for the owner thread = threading.Thread(target=stream_manager.run, daemon=True) @@ -855,6 +861,10 @@ class ProxyServer: # Clean up client manager - SAFE CHECK HERE TOO if channel_id in self.client_managers: try: + client_manager = self.client_managers[channel_id] + # Stop the heartbeat thread before deleting + if hasattr(client_manager, 'stop'): + client_manager.stop() del self.client_managers[channel_id] logger.info(f"Removed client manager for channel {channel_id}") except KeyError: diff --git a/apps/proxy/ts_proxy/stream_buffer.py b/apps/proxy/ts_proxy/stream_buffer.py index a5169c3a..85feb5dd 100644 --- a/apps/proxy/ts_proxy/stream_buffer.py +++ b/apps/proxy/ts_proxy/stream_buffer.py @@ -303,6 +303,14 @@ class StreamBuffer: # Retrieve chunks chunks = self.get_chunks_exact(client_index, chunk_count) + # Check if we got significantly fewer chunks than expected (likely due to expiration) + # Only check if we expected multiple chunks and got none or very few + if chunk_count > 3 and len(chunks) == 0 and chunks_behind > 10: + # Chunks are missing - likely expired from Redis + # Return empty list to signal client should skip forward + logger.debug(f"Chunks missing for client at index {client_index}, buffer at {self.index} ({chunks_behind} behind)") + return [], client_index + # Check total size total_size = sum(len(c) for c in chunks) @@ -316,7 +324,7 @@ class StreamBuffer: additional_size = sum(len(c) for c in more_chunks) if total_size + additional_size <= MAX_SIZE: chunks.extend(more_chunks) - chunk_count += additional + chunk_count += len(more_chunks) # Fixed: count actual additional chunks retrieved return chunks, client_index + chunk_count diff --git a/apps/proxy/ts_proxy/stream_generator.py b/apps/proxy/ts_proxy/stream_generator.py index 817a7b82..368691b8 100644 --- a/apps/proxy/ts_proxy/stream_generator.py +++ b/apps/proxy/ts_proxy/stream_generator.py @@ -204,6 +204,18 @@ class StreamGenerator: self.empty_reads += 1 self.consecutive_empty += 1 + # Check if we're too far behind (chunks expired from Redis) + chunks_behind = self.buffer.index - self.local_index + if chunks_behind > 50: # If more than 50 chunks behind, jump forward + # Calculate new position: stay a few chunks behind current buffer + initial_behind = ConfigHelper.initial_behind_chunks() + new_index = max(self.local_index, self.buffer.index - initial_behind) + + logger.warning(f"[{self.client_id}] Client too far behind ({chunks_behind} chunks), jumping from {self.local_index} to {new_index}") + self.local_index = new_index + self.consecutive_empty = 0 # Reset since we're repositioning + continue # Try again immediately with new position + if self._should_send_keepalive(self.local_index): keepalive_packet = create_ts_packet('keepalive') logger.debug(f"[{self.client_id}] Sending keepalive packet while waiting at buffer head") diff --git a/apps/proxy/ts_proxy/stream_manager.py b/apps/proxy/ts_proxy/stream_manager.py index be6a9c4e..d1f4ded6 100644 --- a/apps/proxy/ts_proxy/stream_manager.py +++ b/apps/proxy/ts_proxy/stream_manager.py @@ -10,6 +10,7 @@ import gevent import re from typing import Optional, List from django.shortcuts import get_object_or_404 +from urllib3.exceptions import ReadTimeoutError from apps.proxy.config import TSConfig as Config from apps.channels.models import Channel, Stream from apps.m3u.models import M3UAccount, M3UAccountProfile @@ -91,11 +92,13 @@ class StreamManager: self.tried_stream_ids.add(self.current_stream_id) logger.info(f"Loaded stream ID {self.current_stream_id} from Redis for channel {buffer.channel_id}") else: - logger.warning(f"No stream_id found in Redis for channel {channel_id}") + logger.warning(f"No stream_id found in Redis for channel {channel_id}. " + f"Stream switching will rely on URL comparison to avoid selecting the same stream.") except Exception as e: logger.warning(f"Error loading stream ID from Redis: {e}") else: - logger.warning(f"Unable to get stream ID for channel {channel_id} - stream switching may not work correctly") + logger.warning(f"Unable to get stream ID for channel {channel_id}. " + f"Stream switching will rely on URL comparison to avoid selecting the same stream.") logger.info(f"Initialized stream manager for channel {buffer.channel_id}") @@ -111,6 +114,9 @@ class StreamManager: self.stderr_reader_thread = None self.ffmpeg_input_phase = True # Track if we're still reading input info + # Add HTTP reader thread property + self.http_reader = None + def _create_session(self): """Create and configure requests session with optimal settings""" session = requests.Session() @@ -737,9 +743,9 @@ class StreamManager: def _establish_http_connection(self): - """Establish a direct HTTP connection to the stream""" + """Establish HTTP connection using thread-based reader (same as transcode path)""" try: - logger.debug(f"Using TS Proxy to connect to stream: {self.url}") + logger.debug(f"Using HTTP streamer thread to connect to stream: {self.url}") # Check if we already have active HTTP connections if self.current_response or self.current_session: @@ -756,41 +762,39 @@ class StreamManager: logger.debug(f"Closing existing transcode process before establishing HTTP connection for channel {self.channel_id}") self._close_socket() - # Create new session for each connection attempt - session = self._create_session() - self.current_session = session + # Use HTTPStreamReader to fetch stream and pipe to a readable file descriptor + # This allows us to use the same fetch_chunk() path as transcode + from .http_streamer import HTTPStreamReader - # Stream the URL with proper timeout handling - response = session.get( - self.url, - stream=True, - timeout=(10, 60) # 10s connect timeout, 60s read timeout + # Create and start the HTTP stream reader + self.http_reader = HTTPStreamReader( + url=self.url, + user_agent=self.user_agent, + chunk_size=self.chunk_size ) - self.current_response = response - if response.status_code == 200: - self.connected = True - self.healthy = True - logger.info(f"Successfully connected to stream source for channel {self.channel_id}") + # Start the reader thread and get the read end of the pipe + pipe_fd = self.http_reader.start() - # Store connection start time for stability tracking - self.connection_start_time = time.time() + # Wrap the file descriptor in a file object (same as transcode stdout) + import os + self.socket = os.fdopen(pipe_fd, 'rb', buffering=0) + self.connected = True + self.healthy = True - # Set channel state to waiting for clients - self._set_waiting_for_clients() + logger.info(f"Successfully started HTTP streamer thread for channel {self.channel_id}") + + # Store connection start time for stability tracking + self.connection_start_time = time.time() + + # Set channel state to waiting for clients + self._set_waiting_for_clients() + + return True - return True - else: - logger.error(f"Failed to connect to stream for channel {self.channel_id}: HTTP {response.status_code}") - self._close_connection() - return False - except requests.exceptions.RequestException as e: - logger.error(f"HTTP request error: {e}") - self._close_connection() - return False except Exception as e: logger.error(f"Error establishing HTTP connection for channel {self.channel_id}: {e}", exc_info=True) - self._close_connection() + self._close_socket() return False def _update_bytes_processed(self, chunk_size): @@ -818,48 +822,19 @@ class StreamManager: logger.error(f"Error updating bytes processed: {e}") def _process_stream_data(self): - """Process stream data until disconnect or error""" + """Process stream data until disconnect or error - unified path for both transcode and HTTP""" try: - if self.transcode: - # Handle transcoded stream data - while self.running and self.connected and not self.stop_requested and not self.needs_stream_switch: - if self.fetch_chunk(): - self.last_data_time = time.time() - else: - if not self.running: - break - gevent.sleep(0.1) # REPLACE time.sleep(0.1) - else: - # Handle direct HTTP connection - chunk_count = 0 - try: - for chunk in self.current_response.iter_content(chunk_size=self.chunk_size): - # Check if we've been asked to stop - if self.stop_requested or self.url_switching or self.needs_stream_switch: - break - - if chunk: - # Track chunk size before adding to buffer - chunk_size = len(chunk) - self._update_bytes_processed(chunk_size) - - # Add chunk to buffer with TS packet alignment - success = self.buffer.add_chunk(chunk) - - if success: - self.last_data_time = time.time() - chunk_count += 1 - - # Update last data timestamp in Redis - if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client: - last_data_key = RedisKeys.last_data(self.buffer.channel_id) - self.buffer.redis_client.set(last_data_key, str(time.time()), ex=60) - except (AttributeError, ConnectionError) as e: - if self.stop_requested or self.url_switching: - logger.debug(f"Expected connection error during shutdown/URL switch for channel {self.channel_id}: {e}") - else: - logger.error(f"Unexpected stream error for channel {self.channel_id}: {e}") - raise + # Both transcode and HTTP now use the same subprocess/socket approach + # This gives us perfect control: check flags between chunks, timeout just returns False + while self.running and self.connected and not self.stop_requested and not self.needs_stream_switch: + if self.fetch_chunk(): + self.last_data_time = time.time() + else: + # fetch_chunk() returned False - could be timeout, no data, or error + if not self.running: + break + # Brief sleep before retry to avoid tight loop + gevent.sleep(0.1) except Exception as e: logger.error(f"Error processing stream data for channel {self.channel_id}: {e}", exc_info=True) @@ -1183,6 +1158,15 @@ class StreamManager: if self.current_response or self.current_session: self._close_connection() + # Stop HTTP reader thread if it exists + if hasattr(self, 'http_reader') and self.http_reader: + try: + logger.debug(f"Stopping HTTP reader thread for channel {self.channel_id}") + self.http_reader.stop() + self.http_reader = None + except Exception as e: + logger.debug(f"Error stopping HTTP reader for channel {self.channel_id}: {e}") + # Otherwise handle socket and transcode resources if self.socket: try: @@ -1274,7 +1258,7 @@ class StreamManager: try: # Set timeout for chunk reads - chunk_timeout = ConfigHelper.get('CHUNK_TIMEOUT', 10) # Default 10 seconds + chunk_timeout = ConfigHelper.chunk_timeout() # Use centralized timeout configuration try: # Handle different socket types with timeout @@ -1357,7 +1341,17 @@ class StreamManager: # Only update if not already past connecting if not current_state or current_state in [ChannelState.INITIALIZING, ChannelState.CONNECTING]: # NEW CODE: Check if buffer has enough chunks - current_buffer_index = getattr(self.buffer, 'index', 0) + # IMPORTANT: Read from Redis, not local buffer.index, because in multi-worker setup + # each worker has its own StreamBuffer instance with potentially stale local index + buffer_index_key = RedisKeys.buffer_index(channel_id) + current_buffer_index = 0 + try: + redis_index = redis_client.get(buffer_index_key) + if redis_index: + current_buffer_index = int(redis_index) + except Exception as e: + logger.error(f"Error reading buffer index from Redis: {e}") + initial_chunks_needed = ConfigHelper.initial_behind_chunks() if current_buffer_index < initial_chunks_needed: @@ -1405,10 +1399,21 @@ class StreamManager: # Clean up completed timers self._buffer_check_timers = [t for t in self._buffer_check_timers if t.is_alive()] - if hasattr(self.buffer, 'index') and hasattr(self.buffer, 'channel_id'): - current_buffer_index = self.buffer.index - initial_chunks_needed = getattr(Config, 'INITIAL_BEHIND_CHUNKS', 10) + if hasattr(self.buffer, 'channel_id') and hasattr(self.buffer, 'redis_client'): channel_id = self.buffer.channel_id + redis_client = self.buffer.redis_client + + # IMPORTANT: Read from Redis, not local buffer.index + buffer_index_key = RedisKeys.buffer_index(channel_id) + current_buffer_index = 0 + try: + redis_index = redis_client.get(buffer_index_key) + if redis_index: + current_buffer_index = int(redis_index) + except Exception as e: + logger.error(f"Error reading buffer index from Redis: {e}") + + initial_chunks_needed = ConfigHelper.initial_behind_chunks() # Use ConfigHelper for consistency if current_buffer_index >= initial_chunks_needed: # We now have enough buffer, call _set_waiting_for_clients again @@ -1433,6 +1438,7 @@ class StreamManager: def _try_next_stream(self): """ Try to switch to the next available stream for this channel. + Will iterate through multiple alternate streams if needed to find one with a different URL. Returns: bool: True if successfully switched to a new stream, False otherwise @@ -1458,60 +1464,71 @@ class StreamManager: logger.warning(f"All {len(alternate_streams)} alternate streams have been tried for channel {self.channel_id}") return False - # Get the next stream to try - next_stream = untried_streams[0] - stream_id = next_stream['stream_id'] - profile_id = next_stream['profile_id'] # This is the M3U profile ID we need + # IMPROVED: Try multiple streams until we find one with a different URL + for next_stream in untried_streams: + stream_id = next_stream['stream_id'] + profile_id = next_stream['profile_id'] # This is the M3U profile ID we need - # Add to tried streams - self.tried_stream_ids.add(stream_id) + # Add to tried streams + self.tried_stream_ids.add(stream_id) - # Get stream info including URL using the profile_id we already have - logger.info(f"Trying next stream ID {stream_id} with profile ID {profile_id} for channel {self.channel_id}") - stream_info = get_stream_info_for_switch(self.channel_id, stream_id) + # Get stream info including URL using the profile_id we already have + logger.info(f"Trying next stream ID {stream_id} with profile ID {profile_id} for channel {self.channel_id}") + stream_info = get_stream_info_for_switch(self.channel_id, stream_id) - if 'error' in stream_info or not stream_info.get('url'): - logger.error(f"Error getting info for stream {stream_id} for channel {self.channel_id}: {stream_info.get('error', 'No URL')}") - return False + if 'error' in stream_info or not stream_info.get('url'): + logger.error(f"Error getting info for stream {stream_id} for channel {self.channel_id}: {stream_info.get('error', 'No URL')}") + continue # Try next stream instead of giving up - # Update URL and user agent - new_url = stream_info['url'] - new_user_agent = stream_info['user_agent'] - new_transcode = stream_info['transcode'] + # Update URL and user agent + new_url = stream_info['url'] + new_user_agent = stream_info['user_agent'] + new_transcode = stream_info['transcode'] - logger.info(f"Switching from URL {self.url} to {new_url} for channel {self.channel_id}") + # CRITICAL FIX: Check if the new URL is the same as current URL + # This can happen when current_stream_id is None and we accidentally select the same stream + if new_url == self.url: + logger.warning(f"Stream ID {stream_id} generates the same URL as current stream ({new_url}). " + f"Skipping this stream and trying next alternative.") + continue # Try next stream instead of giving up - # IMPORTANT: Just update the URL, don't stop the channel or release resources - switch_result = self.update_url(new_url, stream_id, profile_id) - if not switch_result: - logger.error(f"Failed to update URL for stream ID {stream_id} for channel {self.channel_id}") - return False + logger.info(f"Switching from URL {self.url} to {new_url} for channel {self.channel_id}") - # Update stream ID tracking - self.current_stream_id = stream_id + # IMPORTANT: Just update the URL, don't stop the channel or release resources + switch_result = self.update_url(new_url, stream_id, profile_id) + if not switch_result: + logger.error(f"Failed to update URL for stream ID {stream_id} for channel {self.channel_id}") + continue # Try next stream - # Store the new user agent and transcode settings - self.user_agent = new_user_agent - self.transcode = new_transcode + # Update stream ID tracking + self.current_stream_id = stream_id - # Update stream metadata in Redis - use the profile_id we got from get_alternate_streams - if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client: - metadata_key = RedisKeys.channel_metadata(self.channel_id) - self.buffer.redis_client.hset(metadata_key, mapping={ - ChannelMetadataField.URL: new_url, - ChannelMetadataField.USER_AGENT: new_user_agent, - ChannelMetadataField.STREAM_PROFILE: stream_info['stream_profile'], - ChannelMetadataField.M3U_PROFILE: str(profile_id), # Use the profile_id from get_alternate_streams - ChannelMetadataField.STREAM_ID: str(stream_id), - ChannelMetadataField.STREAM_SWITCH_TIME: str(time.time()), - ChannelMetadataField.STREAM_SWITCH_REASON: "max_retries_exceeded" - }) + # Store the new user agent and transcode settings + self.user_agent = new_user_agent + self.transcode = new_transcode - # Log the switch - logger.info(f"Stream metadata updated for channel {self.channel_id} to stream ID {stream_id} with M3U profile {profile_id}") + # Update stream metadata in Redis - use the profile_id we got from get_alternate_streams + if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client: + metadata_key = RedisKeys.channel_metadata(self.channel_id) + self.buffer.redis_client.hset(metadata_key, mapping={ + ChannelMetadataField.URL: new_url, + ChannelMetadataField.USER_AGENT: new_user_agent, + ChannelMetadataField.STREAM_PROFILE: stream_info['stream_profile'], + ChannelMetadataField.M3U_PROFILE: str(profile_id), # Use the profile_id from get_alternate_streams + ChannelMetadataField.STREAM_ID: str(stream_id), + ChannelMetadataField.STREAM_SWITCH_TIME: str(time.time()), + ChannelMetadataField.STREAM_SWITCH_REASON: "max_retries_exceeded" + }) - logger.info(f"Successfully switched to stream ID {stream_id} with URL {new_url} for channel {self.channel_id}") - return True + # Log the switch + logger.info(f"Stream metadata updated for channel {self.channel_id} to stream ID {stream_id} with M3U profile {profile_id}") + + logger.info(f"Successfully switched to stream ID {stream_id} with URL {new_url} for channel {self.channel_id}") + return True + + # If we get here, we tried all streams but none worked + logger.error(f"Tried {len(untried_streams)} alternate streams but none were suitable for channel {self.channel_id}") + return False except Exception as e: logger.error(f"Error trying next stream for channel {self.channel_id}: {e}", exc_info=True)