diff --git a/core/views.py b/core/views.py index 002dda60..97551af3 100644 --- a/core/views.py +++ b/core/views.py @@ -32,11 +32,8 @@ def settings_view(request): def stream_view(request, stream_id): """ Streams the first available stream for the given channel. - It uses the channel’s assigned StreamProfile with a fallback to core default + It uses the channel’s assigned StreamProfile. A persistent Redis lock is used to prevent concurrent streaming on the same channel. - Priority: - - iterate through all streams - - iterate through each stream's m3u profile """ try: redis_host = getattr(settings, "REDIS_HOST", "localhost") @@ -51,72 +48,59 @@ def stream_view(request, stream_id): logger.error("No streams found for channel ID=%s", channel.id) return HttpResponseServerError("No stream found for this channel.") + # Get the first available stream. + stream = channel.streams.first() + logger.debug("Using stream: ID=%s, Name=%s", stream.id, stream.name) + + # Retrieve the M3U account associated with the stream. + m3u_account = stream.m3u_account + logger.debug("Using M3U account ID=%s, Name=%s", m3u_account.id, m3u_account.name) + + # Use the custom URL if available; otherwise, use the standard URL. + input_url = stream.custom_url or stream.url + logger.debug("Input URL: %s", input_url) + + # Determine which profile we can use. + m3u_profiles = m3u_account.profiles.all() + default_profile = next((obj for obj in m3u_profiles if obj.is_default), None) + profiles = [obj for obj in m3u_profiles if not obj.is_default] + active_profile = None lock_key = None persistent_lock = None - - # iterate through channel's streams - for stream in channel.streams.all().order_by('channelstream__order'): - logger.debug(f"Checking stream: ID={stream.id}, Name={stream.name}") - - # Retrieve the M3U account associated with the stream. - m3u_account = stream.m3u_account - logger.debug(f"Using M3U account ID={m3u_account.id}, Name={m3u_account.name}") - - # Use the custom URL if available; otherwise, use the standard URL. - input_url = stream.custom_url or stream.url - logger.debug(f"Input URL: {input_url}") - - # Determine which profile we can use. - m3u_profiles = m3u_account.profiles.all() - default_profile = next((obj for obj in m3u_profiles if obj.is_default), None) - profiles = [obj for obj in m3u_profiles if not obj.is_default] - - # -- Loop through profiles and pick the first active one -- - for profile in [default_profile] + profiles: - logger.debug(f'Checking profile {profile.name}...') - if not profile.is_active: - logger.debug('Profile is not active, skipping.') - continue - - logger.debug(f'Profile has a max streams of {profile.max_streams}') - # Acquire the persistent Redis lock, indexed by 0 through max_streams available in the profile - stream_index = 0 - while True: - stream_index += 1 - if stream_index > profile.max_streams: - # @TODO: we are bailing here if no profile was found, but we need to end up supporting looping through - # all available channel streams - logger.debug(f"Profile is using all available streams.") - break - - lock_key = f"lock:{profile.id}:{stream_index}" - persistent_lock = PersistentLock(redis_client, lock_key, lock_timeout=120) - - logger.debug(f'Attempting to acquire lock: {lock_key}') - if not persistent_lock.acquire(): - logger.error(f"Could not acquire persistent lock for profile {profile.id} index {stream_index}, currently in use.") - continue - - break - - if persistent_lock.has_lock: - break - - if persistent_lock.has_lock == False: - logger.debug(f'Unable to get lock for profile {profile.id}:{profile.name}. Skipping...') + # -- Loop through profiles and pick the first active one -- + for profile in [default_profile] + profiles: + logger.debug(f'Checking profile {profile.name}...') + if not profile.is_active: + logger.debug('Profile is not active, skipping.') continue - break + logger.debug(f'Profile has a max streams of {profile.max_streams}, checking if any are available') + stream_index = 0 + while stream_index < profile.max_streams: + stream_index += 1 - if persistent_lock.has_lock == False: - logger.debug(f"Unable to find any available streams or stream profiles.") - return HttpResponseServerError("Resource busy, please try again later.") + lock_key = f"lock:{profile.id}:{stream_index}" + persistent_lock = PersistentLock(redis_client, lock_key, lock_timeout=120) + logger.debug(f'Attempting to acquire lock: {lock_key}') - # *** DISABLE FAKE LOCKS: Ignore current_viewers/max_streams check *** - logger.debug(f"Using stream {stream.id}{stream.name}, M3U profile {profile.id}{profile.name}, stream index {stream_index}") - active_profile = M3UAccountProfile.objects.get(id=profile.id) + if not persistent_lock.acquire(): + logger.error(f"Could not acquire persistent lock for profile {profile.id} index {stream_index}, currently in use.") + persistent_lock = None + continue + break + + if persistent_lock is not None: + logger.debug(f'Successfully acquired lock: {lock_key}') + active_profile = M3UAccountProfile.objects.get(id=profile.id) + break + + if active_profile is None or persistent_lock is None: + logger.exception("No available profiles for the stream") + return HttpResponseServerError("No available profiles for the stream") + + logger.debug(f"Using M3U profile ID={active_profile.id} (ignoring viewer count limits)") # Prepare the pattern replacement. logger.debug("Executing the following pattern replacement:") logger.debug(f" search: {active_profile.search_pattern}") @@ -148,7 +132,7 @@ def stream_view(request, stream_id): try: # Start the streaming process. - process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=8192) + process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) except Exception as e: persistent_lock.release() # Ensure the lock is released on error. logger.exception("Error starting stream for channel ID=%s", stream_id) @@ -167,7 +151,6 @@ def stream_view(request, stream_id): yield chunk finally: try: - proc.terminate() logger.debug("Streaming process terminated for stream ID=%s", s.id) except Exception as e: @@ -175,7 +158,6 @@ def stream_view(request, stream_id): persistent_lock.release() logger.debug("Persistent lock released for channel ID=%s", channel.id) - return StreamingHttpResponse( stream_generator(process, stream, persistent_lock), content_type="video/MP2T"