diff --git a/apps/channels/models.py b/apps/channels/models.py index e012e3fd..60c35923 100644 --- a/apps/channels/models.py +++ b/apps/channels/models.py @@ -288,32 +288,62 @@ class Channel(models.Model): def get_stream(self): """ Finds an available stream for the requested channel and returns the selected stream and profile. + + Returns: + Tuple[Optional[int], Optional[int], Optional[str]]: (stream_id, profile_id, error_reason) """ redis_client = RedisClient.get_client() + error_reason = None - # 2. Check if a stream is already active for this channel - stream_id = redis_client.get(f"channel_stream:{self.id}") - if stream_id: - stream_id = int(stream_id) - profile_id = redis_client.get(f"stream_profile:{stream_id}") - if profile_id: - profile_id = int(profile_id) - return stream_id, profile_id + # Check if this channel has any streams + if not self.streams.exists(): + error_reason = "No streams assigned to channel" + return None, None, error_reason - # 3. Iterate through channel streams and their profiles + # Check if a stream is already active for this channel + stream_id_bytes = redis_client.get(f"channel_stream:{self.id}") + if stream_id_bytes: + try: + stream_id = int(stream_id_bytes) + profile_id_bytes = redis_client.get(f"stream_profile:{stream_id}") + if profile_id_bytes: + try: + profile_id = int(profile_id_bytes) + return stream_id, profile_id, None + except (ValueError, TypeError): + logger.debug(f"Invalid profile ID retrieved from Redis: {profile_id_bytes}") + except (ValueError, TypeError): + logger.debug(f"Invalid stream ID retrieved from Redis: {stream_id_bytes}") + + # No existing active stream, attempt to assign a new one + has_streams_but_maxed_out = False + has_active_profiles = False + + # Iterate through channel streams and their profiles for stream in self.streams.all().order_by('channelstream__order'): # Retrieve the M3U account associated with the stream. m3u_account = stream.m3u_account + if not m3u_account: + logger.debug(f"Stream {stream.id} has no M3U account") + continue + m3u_profiles = m3u_account.profiles.all() default_profile = next((obj for obj in m3u_profiles if obj.is_default), None) + + if not default_profile: + logger.debug(f"M3U account {m3u_account.id} has no default profile") + continue + profiles = [default_profile] + [obj for obj in m3u_profiles if not obj.is_default] for profile in profiles: - logger.info(profile) # Skip inactive profiles - if profile.is_active == False: + if not profile.is_active: + logger.debug(f"Skipping inactive profile {profile.id}") continue + has_active_profiles = True + profile_connections_key = f"profile_connections:{profile.id}" current_connections = int(redis_client.get(profile_connections_key) or 0) @@ -321,16 +351,27 @@ class Channel(models.Model): if profile.max_streams == 0 or current_connections < profile.max_streams: # Start a new stream redis_client.set(f"channel_stream:{self.id}", stream.id) - redis_client.set(f"stream_profile:{stream.id}", profile.id) # Store only the matched profile + redis_client.set(f"stream_profile:{stream.id}", profile.id) # Increment connection count for profiles with limits if profile.max_streams > 0: redis_client.incr(profile_connections_key) - return stream.id, profile.id # Return newly assigned stream and matched profile + return stream.id, profile.id, None # Return newly assigned stream and matched profile + else: + # This profile is at max connections + has_streams_but_maxed_out = True + logger.debug(f"Profile {profile.id} at max connections: {current_connections}/{profile.max_streams}") - # 4. No available streams - return None, None + # No available streams - determine specific reason + if has_streams_but_maxed_out: + error_reason = "All M3U profiles have reached maximum connection limits" + elif has_active_profiles: + error_reason = "No compatible profile found for any assigned stream" + else: + error_reason = "No active profiles found for any assigned stream" + + return None, None, error_reason def release_stream(self): """ diff --git a/apps/proxy/config.py b/apps/proxy/config.py index 28d3b872..f8151ad8 100644 --- a/apps/proxy/config.py +++ b/apps/proxy/config.py @@ -5,6 +5,10 @@ class BaseConfig: CHUNK_SIZE = 8192 CLIENT_POLL_INTERVAL = 0.1 MAX_RETRIES = 3 + RETRY_WAIT_INTERVAL = 0.5 # seconds to wait between retries + CONNECTION_TIMEOUT = 10 # seconds to wait for initial connection + MAX_STREAM_SWITCHES = 10 # Maximum number of stream switch attempts before giving up + BUFFER_CHUNK_SIZE = 188 * 1361 # ~256KB # Redis settings REDIS_CHUNK_TTL = 60 # Number in seconds - Chunks expire after 1 minute @@ -24,10 +28,6 @@ class HLSConfig(BaseConfig): class TSConfig(BaseConfig): """Configuration settings for TS proxy""" - # Connection settings - CONNECTION_TIMEOUT = 10 # seconds to wait for initial connection - MAX_RETRIES = 3 # maximum connection retry attempts - # Buffer settings INITIAL_BEHIND_CHUNKS = 4 # How many chunks behind to start a client (4 chunks = ~1MB) CHUNK_BATCH_SIZE = 5 # How many chunks to fetch in one batch @@ -52,8 +52,7 @@ class TSConfig(BaseConfig): # TS packets are 188 bytes # Make chunk size a multiple of TS packet size for perfect alignment # ~1MB is ideal for streaming (matches typical media buffer sizes) - BUFFER_CHUNK_SIZE = 188 * 1361 # ~256KB - # Maximum number of stream switch attempts before giving up - MAX_STREAM_SWITCHES = 10 + + diff --git a/apps/proxy/ts_proxy/config_helper.py b/apps/proxy/ts_proxy/config_helper.py index c0576cb7..f78ba0b6 100644 --- a/apps/proxy/ts_proxy/config_helper.py +++ b/apps/proxy/ts_proxy/config_helper.py @@ -70,3 +70,8 @@ class ConfigHelper: def max_stream_switches(): """Get maximum number of stream switch attempts""" return ConfigHelper.get('MAX_STREAM_SWITCHES', 10) + + @staticmethod + def retry_wait_interval(): + """Get wait interval between connection retries in seconds""" + return ConfigHelper.get('RETRY_WAIT_INTERVAL', 0.5) # Default to 0.5 second diff --git a/apps/proxy/ts_proxy/stream_buffer.py b/apps/proxy/ts_proxy/stream_buffer.py index a94204ab..4d73bdc2 100644 --- a/apps/proxy/ts_proxy/stream_buffer.py +++ b/apps/proxy/ts_proxy/stream_buffer.py @@ -94,7 +94,7 @@ class StreamBuffer: writes_done += 1 if writes_done > 0: - logger.debug(f"Added {writes_done} optimized chunks ({self.target_chunk_size} bytes each) to Redis") + logger.debug(f"Added {writes_done} chunks ({self.target_chunk_size} bytes each) to Redis for channel {self.channel_id} at index {self.index}") return True diff --git a/apps/proxy/ts_proxy/url_utils.py b/apps/proxy/ts_proxy/url_utils.py index 6f1b6495..f80aae75 100644 --- a/apps/proxy/ts_proxy/url_utils.py +++ b/apps/proxy/ts_proxy/url_utils.py @@ -34,41 +34,54 @@ def generate_stream_url(channel_id: str) -> Tuple[str, str, bool, Optional[int]] Returns: Tuple[str, str, bool, Optional[int]]: (stream_url, user_agent, transcode_flag, profile_id) """ - # Get channel and related objects - channel = get_stream_object(channel_id) - stream_id, profile_id = channel.get_stream() + try: + channel = get_stream_object(channel_id) - if stream_id is None or profile_id is None: - logger.error(f"No stream assigned to channel {channel_id}") + # Get stream and profile for this channel + # Note: get_stream now returns 3 values (stream_id, profile_id, error_reason) + stream_id, profile_id, error_reason = channel.get_stream() + + if not stream_id or not profile_id: + logger.error(f"No stream available for channel {channel_id}: {error_reason}") + return None, None, False, None + + # Look up the Stream and Profile objects + try: + stream = Stream.objects.get(id=stream_id) + profile = M3UAccountProfile.objects.get(id=profile_id) + except (Stream.DoesNotExist, M3UAccountProfile.DoesNotExist) as e: + logger.error(f"Error getting stream or profile: {e}") + return None, None, False, None + + # Get the M3U account profile for URL pattern + m3u_profile = profile + + # Get the appropriate user agent + m3u_account = M3UAccount.objects.get(id=m3u_profile.m3u_account.id) + stream_user_agent = UserAgent.objects.get(id=m3u_account.user_agent.id).user_agent + + if stream_user_agent is None: + stream_user_agent = UserAgent.objects.get(id=CoreSettings.get_default_user_agent_id()) + logger.debug(f"No user agent found for account, using default: {stream_user_agent}") + + # Generate stream URL based on the selected profile + input_url = stream.url + stream_url = transform_url(input_url, m3u_profile.search_pattern, m3u_profile.replace_pattern) + + # Check if transcoding is needed + stream_profile = channel.get_stream_profile() + if stream_profile.is_proxy() or stream_profile is None: + transcode = False + else: + transcode = True + + stream_profile_id = stream_profile.id + + return stream_url, stream_user_agent, transcode, stream_profile_id + except Exception as e: + logger.error(f"Error generating stream URL: {e}") return None, None, False, None - # Get the M3U account profile for URL pattern - stream = get_object_or_404(Stream, pk=stream_id) - m3u_profile = get_object_or_404(M3UAccountProfile, pk=profile_id) - - # Get the appropriate user agent - m3u_account = M3UAccount.objects.get(id=m3u_profile.m3u_account.id) - stream_user_agent = UserAgent.objects.get(id=m3u_account.user_agent.id).user_agent - - if stream_user_agent is None: - stream_user_agent = UserAgent.objects.get(id=CoreSettings.get_default_user_agent_id()) - logger.debug(f"No user agent found for account, using default: {stream_user_agent}") - - # Generate stream URL based on the selected profile - input_url = stream.url - stream_url = transform_url(input_url, m3u_profile.search_pattern, m3u_profile.replace_pattern) - - # Check if transcoding is needed - stream_profile = channel.get_stream_profile() - if stream_profile.is_proxy() or stream_profile is None: - transcode = False - else: - transcode = True - - stream_profile_id = stream_profile.id - - return stream_url, stream_user_agent, transcode, stream_profile_id - def transform_url(input_url: str, search_pattern: str, replace_pattern: str) -> str: """ Transform a URL using regex pattern replacement. @@ -140,9 +153,9 @@ def get_stream_info_for_switch(channel_id: str, target_stream_id: Optional[int] # Use first available profile m3u_profile_id = profiles.first().id else: - stream_id, m3u_profile_id = channel.get_stream() + stream_id, m3u_profile_id, error_reason = channel.get_stream() if stream_id is None or m3u_profile_id is None: - return {'error': 'No stream assigned to channel'} + return {'error': error_reason or 'No stream assigned to channel'} # Get the stream and profile objects directly stream = get_object_or_404(Stream, pk=stream_id) diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py index de140035..32d552da 100644 --- a/apps/proxy/ts_proxy/views.py +++ b/apps/proxy/ts_proxy/views.py @@ -86,8 +86,41 @@ def stream_ts(request, channel_id): # Initialize the channel (but don't wait for completion) logger.info(f"[{client_id}] Starting channel {channel_id} initialization") - # Use the utility function to get stream URL and settings - stream_url, stream_user_agent, transcode, profile_value = generate_stream_url(channel_id) + # Use max retry attempts and connection timeout from config + max_retries = ConfigHelper.max_retries() + retry_timeout = ConfigHelper.connection_timeout() + wait_start_time = time.time() + + stream_url = None + stream_user_agent = None + transcode = False + profile_value = None + error_reason = None + + # Try to get a stream with configured retries + for attempt in range(max_retries): + stream_url, stream_user_agent, transcode, profile_value = generate_stream_url(channel_id) + + if stream_url is not None: + logger.info(f"[{client_id}] Successfully obtained stream for channel {channel_id}") + break + + # If we failed because there are no streams assigned, don't retry + _, _, error_reason = channel.get_stream() + if error_reason and 'maximum connection limits' not in error_reason: + logger.warning(f"[{client_id}] Can't retry - error not related to connection limits: {error_reason}") + break + + # Don't exceed the overall connection timeout + if time.time() - wait_start_time > retry_timeout: + logger.warning(f"[{client_id}] Connection wait timeout exceeded ({retry_timeout}s)") + break + + # Wait before retrying (using exponential backoff with a cap) + wait_time = min(0.5 * (2 ** attempt), 2.0) # Caps at 2 seconds + logger.info(f"[{client_id}] Waiting {wait_time:.1f}s for a connection to become available (attempt {attempt+1}/{max_retries})") + time.sleep(wait_time) + if stream_url is None: # Make sure to release any stream locks that might have been acquired if hasattr(channel, 'streams') and channel.streams.exists(): @@ -98,10 +131,16 @@ def stream_ts(request, channel_id): except Exception as e: logger.error(f"[{client_id}] Error releasing stream: {e}") - return JsonResponse({'error': 'No available streams for this channel'}, status=404) + # Get the specific error message if available + wait_duration = f"{int(time.time() - wait_start_time)}s" + error_msg = error_reason if error_reason else 'No available streams for this channel' + return JsonResponse({ + 'error': error_msg, + 'waited': wait_duration + }, status=503) # 503 Service Unavailable is appropriate here # Get the stream ID from the channel - stream_id, m3u_profile_id = channel.get_stream() + stream_id, m3u_profile_id, _ = channel.get_stream() logger.info(f"Channel {channel_id} using stream ID {stream_id}, m3u account profile ID {m3u_profile_id}") # Generate transcode command if needed