Enhance stream retrieval process with error handling and retry logic

- Updated `get_stream` method to return error reasons for better debugging.
- Improved `generate_stream_url` to handle errors and log specific issues.
- Implemented retry mechanism in `stream_ts` view for obtaining streams with exponential backoff.
- Added new configuration options for connection timeout and retry intervals.
This commit is contained in:
SergeantPanda 2025-04-10 08:38:53 -05:00
parent 5c74aec790
commit 6ccc2b9a6d
6 changed files with 158 additions and 61 deletions

View file

@ -288,32 +288,62 @@ class Channel(models.Model):
def get_stream(self):
"""
Finds an available stream for the requested channel and returns the selected stream and profile.
Returns:
Tuple[Optional[int], Optional[int], Optional[str]]: (stream_id, profile_id, error_reason)
"""
redis_client = RedisClient.get_client()
error_reason = None
# 2. Check if a stream is already active for this channel
stream_id = redis_client.get(f"channel_stream:{self.id}")
if stream_id:
stream_id = int(stream_id)
profile_id = redis_client.get(f"stream_profile:{stream_id}")
if profile_id:
profile_id = int(profile_id)
return stream_id, profile_id
# Check if this channel has any streams
if not self.streams.exists():
error_reason = "No streams assigned to channel"
return None, None, error_reason
# 3. Iterate through channel streams and their profiles
# Check if a stream is already active for this channel
stream_id_bytes = redis_client.get(f"channel_stream:{self.id}")
if stream_id_bytes:
try:
stream_id = int(stream_id_bytes)
profile_id_bytes = redis_client.get(f"stream_profile:{stream_id}")
if profile_id_bytes:
try:
profile_id = int(profile_id_bytes)
return stream_id, profile_id, None
except (ValueError, TypeError):
logger.debug(f"Invalid profile ID retrieved from Redis: {profile_id_bytes}")
except (ValueError, TypeError):
logger.debug(f"Invalid stream ID retrieved from Redis: {stream_id_bytes}")
# No existing active stream, attempt to assign a new one
has_streams_but_maxed_out = False
has_active_profiles = False
# Iterate through channel streams and their profiles
for stream in self.streams.all().order_by('channelstream__order'):
# Retrieve the M3U account associated with the stream.
m3u_account = stream.m3u_account
if not m3u_account:
logger.debug(f"Stream {stream.id} has no M3U account")
continue
m3u_profiles = m3u_account.profiles.all()
default_profile = next((obj for obj in m3u_profiles if obj.is_default), None)
if not default_profile:
logger.debug(f"M3U account {m3u_account.id} has no default profile")
continue
profiles = [default_profile] + [obj for obj in m3u_profiles if not obj.is_default]
for profile in profiles:
logger.info(profile)
# Skip inactive profiles
if profile.is_active == False:
if not profile.is_active:
logger.debug(f"Skipping inactive profile {profile.id}")
continue
has_active_profiles = True
profile_connections_key = f"profile_connections:{profile.id}"
current_connections = int(redis_client.get(profile_connections_key) or 0)
@ -321,16 +351,27 @@ class Channel(models.Model):
if profile.max_streams == 0 or current_connections < profile.max_streams:
# Start a new stream
redis_client.set(f"channel_stream:{self.id}", stream.id)
redis_client.set(f"stream_profile:{stream.id}", profile.id) # Store only the matched profile
redis_client.set(f"stream_profile:{stream.id}", profile.id)
# Increment connection count for profiles with limits
if profile.max_streams > 0:
redis_client.incr(profile_connections_key)
return stream.id, profile.id # Return newly assigned stream and matched profile
return stream.id, profile.id, None # Return newly assigned stream and matched profile
else:
# This profile is at max connections
has_streams_but_maxed_out = True
logger.debug(f"Profile {profile.id} at max connections: {current_connections}/{profile.max_streams}")
# 4. No available streams
return None, None
# No available streams - determine specific reason
if has_streams_but_maxed_out:
error_reason = "All M3U profiles have reached maximum connection limits"
elif has_active_profiles:
error_reason = "No compatible profile found for any assigned stream"
else:
error_reason = "No active profiles found for any assigned stream"
return None, None, error_reason
def release_stream(self):
"""

View file

@ -5,6 +5,10 @@ class BaseConfig:
CHUNK_SIZE = 8192
CLIENT_POLL_INTERVAL = 0.1
MAX_RETRIES = 3
RETRY_WAIT_INTERVAL = 0.5 # seconds to wait between retries
CONNECTION_TIMEOUT = 10 # seconds to wait for initial connection
MAX_STREAM_SWITCHES = 10 # Maximum number of stream switch attempts before giving up
BUFFER_CHUNK_SIZE = 188 * 1361 # ~256KB
# Redis settings
REDIS_CHUNK_TTL = 60 # Number in seconds - Chunks expire after 1 minute
@ -24,10 +28,6 @@ class HLSConfig(BaseConfig):
class TSConfig(BaseConfig):
"""Configuration settings for TS proxy"""
# Connection settings
CONNECTION_TIMEOUT = 10 # seconds to wait for initial connection
MAX_RETRIES = 3 # maximum connection retry attempts
# Buffer settings
INITIAL_BEHIND_CHUNKS = 4 # How many chunks behind to start a client (4 chunks = ~1MB)
CHUNK_BATCH_SIZE = 5 # How many chunks to fetch in one batch
@ -52,8 +52,7 @@ class TSConfig(BaseConfig):
# TS packets are 188 bytes
# Make chunk size a multiple of TS packet size for perfect alignment
# ~1MB is ideal for streaming (matches typical media buffer sizes)
BUFFER_CHUNK_SIZE = 188 * 1361 # ~256KB
# Maximum number of stream switch attempts before giving up
MAX_STREAM_SWITCHES = 10

View file

@ -70,3 +70,8 @@ class ConfigHelper:
def max_stream_switches():
"""Get maximum number of stream switch attempts"""
return ConfigHelper.get('MAX_STREAM_SWITCHES', 10)
@staticmethod
def retry_wait_interval():
"""Get wait interval between connection retries in seconds"""
return ConfigHelper.get('RETRY_WAIT_INTERVAL', 0.5) # Default to 0.5 second

View file

@ -94,7 +94,7 @@ class StreamBuffer:
writes_done += 1
if writes_done > 0:
logger.debug(f"Added {writes_done} optimized chunks ({self.target_chunk_size} bytes each) to Redis")
logger.debug(f"Added {writes_done} chunks ({self.target_chunk_size} bytes each) to Redis for channel {self.channel_id} at index {self.index}")
return True

View file

@ -34,41 +34,54 @@ def generate_stream_url(channel_id: str) -> Tuple[str, str, bool, Optional[int]]
Returns:
Tuple[str, str, bool, Optional[int]]: (stream_url, user_agent, transcode_flag, profile_id)
"""
# Get channel and related objects
channel = get_stream_object(channel_id)
stream_id, profile_id = channel.get_stream()
try:
channel = get_stream_object(channel_id)
if stream_id is None or profile_id is None:
logger.error(f"No stream assigned to channel {channel_id}")
# Get stream and profile for this channel
# Note: get_stream now returns 3 values (stream_id, profile_id, error_reason)
stream_id, profile_id, error_reason = channel.get_stream()
if not stream_id or not profile_id:
logger.error(f"No stream available for channel {channel_id}: {error_reason}")
return None, None, False, None
# Look up the Stream and Profile objects
try:
stream = Stream.objects.get(id=stream_id)
profile = M3UAccountProfile.objects.get(id=profile_id)
except (Stream.DoesNotExist, M3UAccountProfile.DoesNotExist) as e:
logger.error(f"Error getting stream or profile: {e}")
return None, None, False, None
# Get the M3U account profile for URL pattern
m3u_profile = profile
# Get the appropriate user agent
m3u_account = M3UAccount.objects.get(id=m3u_profile.m3u_account.id)
stream_user_agent = UserAgent.objects.get(id=m3u_account.user_agent.id).user_agent
if stream_user_agent is None:
stream_user_agent = UserAgent.objects.get(id=CoreSettings.get_default_user_agent_id())
logger.debug(f"No user agent found for account, using default: {stream_user_agent}")
# Generate stream URL based on the selected profile
input_url = stream.url
stream_url = transform_url(input_url, m3u_profile.search_pattern, m3u_profile.replace_pattern)
# Check if transcoding is needed
stream_profile = channel.get_stream_profile()
if stream_profile.is_proxy() or stream_profile is None:
transcode = False
else:
transcode = True
stream_profile_id = stream_profile.id
return stream_url, stream_user_agent, transcode, stream_profile_id
except Exception as e:
logger.error(f"Error generating stream URL: {e}")
return None, None, False, None
# Get the M3U account profile for URL pattern
stream = get_object_or_404(Stream, pk=stream_id)
m3u_profile = get_object_or_404(M3UAccountProfile, pk=profile_id)
# Get the appropriate user agent
m3u_account = M3UAccount.objects.get(id=m3u_profile.m3u_account.id)
stream_user_agent = UserAgent.objects.get(id=m3u_account.user_agent.id).user_agent
if stream_user_agent is None:
stream_user_agent = UserAgent.objects.get(id=CoreSettings.get_default_user_agent_id())
logger.debug(f"No user agent found for account, using default: {stream_user_agent}")
# Generate stream URL based on the selected profile
input_url = stream.url
stream_url = transform_url(input_url, m3u_profile.search_pattern, m3u_profile.replace_pattern)
# Check if transcoding is needed
stream_profile = channel.get_stream_profile()
if stream_profile.is_proxy() or stream_profile is None:
transcode = False
else:
transcode = True
stream_profile_id = stream_profile.id
return stream_url, stream_user_agent, transcode, stream_profile_id
def transform_url(input_url: str, search_pattern: str, replace_pattern: str) -> str:
"""
Transform a URL using regex pattern replacement.
@ -140,9 +153,9 @@ def get_stream_info_for_switch(channel_id: str, target_stream_id: Optional[int]
# Use first available profile
m3u_profile_id = profiles.first().id
else:
stream_id, m3u_profile_id = channel.get_stream()
stream_id, m3u_profile_id, error_reason = channel.get_stream()
if stream_id is None or m3u_profile_id is None:
return {'error': 'No stream assigned to channel'}
return {'error': error_reason or 'No stream assigned to channel'}
# Get the stream and profile objects directly
stream = get_object_or_404(Stream, pk=stream_id)

View file

@ -86,8 +86,41 @@ def stream_ts(request, channel_id):
# Initialize the channel (but don't wait for completion)
logger.info(f"[{client_id}] Starting channel {channel_id} initialization")
# Use the utility function to get stream URL and settings
stream_url, stream_user_agent, transcode, profile_value = generate_stream_url(channel_id)
# Use max retry attempts and connection timeout from config
max_retries = ConfigHelper.max_retries()
retry_timeout = ConfigHelper.connection_timeout()
wait_start_time = time.time()
stream_url = None
stream_user_agent = None
transcode = False
profile_value = None
error_reason = None
# Try to get a stream with configured retries
for attempt in range(max_retries):
stream_url, stream_user_agent, transcode, profile_value = generate_stream_url(channel_id)
if stream_url is not None:
logger.info(f"[{client_id}] Successfully obtained stream for channel {channel_id}")
break
# If we failed because there are no streams assigned, don't retry
_, _, error_reason = channel.get_stream()
if error_reason and 'maximum connection limits' not in error_reason:
logger.warning(f"[{client_id}] Can't retry - error not related to connection limits: {error_reason}")
break
# Don't exceed the overall connection timeout
if time.time() - wait_start_time > retry_timeout:
logger.warning(f"[{client_id}] Connection wait timeout exceeded ({retry_timeout}s)")
break
# Wait before retrying (using exponential backoff with a cap)
wait_time = min(0.5 * (2 ** attempt), 2.0) # Caps at 2 seconds
logger.info(f"[{client_id}] Waiting {wait_time:.1f}s for a connection to become available (attempt {attempt+1}/{max_retries})")
time.sleep(wait_time)
if stream_url is None:
# Make sure to release any stream locks that might have been acquired
if hasattr(channel, 'streams') and channel.streams.exists():
@ -98,10 +131,16 @@ def stream_ts(request, channel_id):
except Exception as e:
logger.error(f"[{client_id}] Error releasing stream: {e}")
return JsonResponse({'error': 'No available streams for this channel'}, status=404)
# Get the specific error message if available
wait_duration = f"{int(time.time() - wait_start_time)}s"
error_msg = error_reason if error_reason else 'No available streams for this channel'
return JsonResponse({
'error': error_msg,
'waited': wait_duration
}, status=503) # 503 Service Unavailable is appropriate here
# Get the stream ID from the channel
stream_id, m3u_profile_id = channel.get_stream()
stream_id, m3u_profile_id, _ = channel.get_stream()
logger.info(f"Channel {channel_id} using stream ID {stream_id}, m3u account profile ID {m3u_profile_id}")
# Generate transcode command if needed