From 3e3f4c85a16d576533f23dbd7df37b9bd7c186df Mon Sep 17 00:00:00 2001 From: dekzter Date: Sun, 2 Mar 2025 14:56:41 -0500 Subject: [PATCH] multi stream support --- apps/accounts/migrations/0001_initial.py | 2 +- apps/channels/api_views.py | 2 + apps/channels/migrations/0001_initial.py | 38 +++++-- apps/channels/models.py | 11 +- apps/channels/serializers.py | 42 +++++++- apps/dashboard/migrations/0001_initial.py | 2 +- apps/epg/migrations/0001_initial.py | 2 +- apps/hdhr/migrations/0001_initial.py | 2 +- apps/m3u/migrations/0001_initial.py | 2 +- core/migrations/0001_initial.py | 2 +- core/views.py | 126 +++++++++++++--------- dispatcharr/persistent_lock.py | 12 ++- frontend/public/index.html | 4 +- 13 files changed, 174 insertions(+), 73 deletions(-) diff --git a/apps/accounts/migrations/0001_initial.py b/apps/accounts/migrations/0001_initial.py index 4ce00e01..bc92ebe6 100644 --- a/apps/accounts/migrations/0001_initial.py +++ b/apps/accounts/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 5.1.6 on 2025-03-02 00:01 +# Generated by Django 5.1.6 on 2025-03-02 13:52 import django.contrib.auth.models import django.contrib.auth.validators diff --git a/apps/channels/api_views.py b/apps/channels/api_views.py index 75772509..d1858165 100644 --- a/apps/channels/api_views.py +++ b/apps/channels/api_views.py @@ -135,6 +135,7 @@ class ChannelViewSet(viewsets.ModelViewSet): 'tvg_id': stream.tvg_id, 'channel_group_id': channel_group.id, 'logo_url': stream.logo_url, + 'streams': [stream_id] } serializer = self.get_serializer(data=channel_data) serializer.is_valid(raise_exception=True) @@ -226,6 +227,7 @@ class ChannelViewSet(viewsets.ModelViewSet): "tvg_id": stream.tvg_id, "channel_group_id": channel_group.id, "logo_url": stream.logo_url, + "streams": [stream_id], } serializer = self.get_serializer(data=channel_data) if serializer.is_valid(): diff --git a/apps/channels/migrations/0001_initial.py b/apps/channels/migrations/0001_initial.py index 2e3990e2..8401450e 100644 --- a/apps/channels/migrations/0001_initial.py +++ b/apps/channels/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 5.1.6 on 2025-03-02 00:01 +# Generated by Django 5.1.6 on 2025-03-02 13:52 import django.db.models.deletion from django.db import migrations, models @@ -21,6 +21,20 @@ class Migration(migrations.Migration): ('name', models.CharField(max_length=100, unique=True)), ], ), + migrations.CreateModel( + name='Channel', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('channel_number', models.IntegerField()), + ('channel_name', models.CharField(max_length=255)), + ('logo_url', models.URLField(blank=True, max_length=2000, null=True)), + ('logo_file', models.ImageField(blank=True, null=True, upload_to='logos/')), + ('tvg_id', models.CharField(blank=True, max_length=255, null=True)), + ('tvg_name', models.CharField(blank=True, max_length=255, null=True)), + ('stream_profile', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='channels', to='core.streamprofile')), + ('channel_group', models.ForeignKey(blank=True, help_text='Channel group this channel belongs to.', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='channels', to='channels.channelgroup')), + ], + ), migrations.CreateModel( name='Stream', fields=[ @@ -44,18 +58,20 @@ class Migration(migrations.Migration): }, ), migrations.CreateModel( - name='Channel', + name='ChannelStream', fields=[ ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), - ('channel_number', models.IntegerField()), - ('channel_name', models.CharField(max_length=255)), - ('logo_url', models.URLField(blank=True, max_length=2000, null=True)), - ('logo_file', models.ImageField(blank=True, null=True, upload_to='logos/')), - ('tvg_id', models.CharField(blank=True, max_length=255, null=True)), - ('tvg_name', models.CharField(blank=True, max_length=255, null=True)), - ('stream_profile', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='channels', to='core.streamprofile')), - ('channel_group', models.ForeignKey(blank=True, help_text='Channel group this channel belongs to.', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='channels', to='channels.channelgroup')), - ('streams', models.ManyToManyField(blank=True, related_name='channels', to='channels.stream')), + ('order', models.PositiveIntegerField(default=0)), + ('channel', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='channels.channel')), + ('stream', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='channels.stream')), ], + options={ + 'ordering': ['order'], + }, + ), + migrations.AddField( + model_name='channel', + name='streams', + field=models.ManyToManyField(blank=True, related_name='channels', through='channels.ChannelStream', to='channels.stream'), ), ] diff --git a/apps/channels/models.py b/apps/channels/models.py index b965e32d..bd4045ba 100644 --- a/apps/channels/models.py +++ b/apps/channels/models.py @@ -61,6 +61,7 @@ class Channel(models.Model): streams = models.ManyToManyField( Stream, blank=True, + through='ChannelStream', related_name='channels' ) @@ -84,7 +85,7 @@ class Channel(models.Model): related_name='channels' ) - + def clean(self): # Enforce unique channel_number within a given group existing = Channel.objects.filter( @@ -109,3 +110,11 @@ class ChannelGroup(models.Model): def __str__(self): return self.name + +class ChannelStream(models.Model): + channel = models.ForeignKey(Channel, on_delete=models.CASCADE) + stream = models.ForeignKey(Stream, on_delete=models.CASCADE) + order = models.PositiveIntegerField(default=0) # Ordering field + + class Meta: + ordering = ['order'] # Ensure streams are retrieved in order diff --git a/apps/channels/serializers.py b/apps/channels/serializers.py index 0e4fba76..a0dcb6a6 100644 --- a/apps/channels/serializers.py +++ b/apps/channels/serializers.py @@ -1,5 +1,5 @@ from rest_framework import serializers -from .models import Stream, Channel, ChannelGroup +from .models import Stream, Channel, ChannelGroup, ChannelStream from core.models import StreamProfile # @@ -73,8 +73,10 @@ class ChannelSerializer(serializers.ModelSerializer): required=False ) - # Possibly show streams inline, or just by ID - # streams = StreamSerializer(many=True, read_only=True) + streams = serializers.ListField( + child=serializers.IntegerField(), write_only=True + ) + stream_ids = serializers.SerializerMethodField() class Meta: model = Channel @@ -89,5 +91,39 @@ class ChannelSerializer(serializers.ModelSerializer): 'tvg_id', 'tvg_name', 'streams', + 'stream_ids', 'stream_profile_id', ] + + def get_stream_ids(self, obj): + """Retrieve ordered stream IDs for GET requests.""" + return list(obj.streams.all().order_by('channelstream__order').values_list('id', flat=True)) + + def create(self, validated_data): + stream_ids = validated_data.pop('streams', []) + channel = Channel.objects.create(**validated_data) + + # Add streams in the specified order + for index, stream_id in enumerate(stream_ids): + ChannelStream.objects.create(channel=channel, stream_id=stream_id, order=index) + + return channel + + def update(self, instance, validated_data): + print("Validated Data:", validated_data) + stream_ids = validated_data.get('streams', None) + print(f'stream ids: {stream_ids}') + + # Update basic fields + instance.name = validated_data.get('channel_name', instance.channel_name) + instance.save() + + if stream_ids is not None: + # Clear existing relationships + instance.channelstream_set.all().delete() + + # Add new streams in order + for index, stream_id in enumerate(stream_ids): + ChannelStream.objects.create(channel=instance, stream_id=stream_id, order=index) + + return instance diff --git a/apps/dashboard/migrations/0001_initial.py b/apps/dashboard/migrations/0001_initial.py index 1ce7b141..9c39a3b7 100644 --- a/apps/dashboard/migrations/0001_initial.py +++ b/apps/dashboard/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 5.1.6 on 2025-03-02 00:01 +# Generated by Django 5.1.6 on 2025-03-02 13:52 from django.db import migrations, models diff --git a/apps/epg/migrations/0001_initial.py b/apps/epg/migrations/0001_initial.py index dfb9c0f0..9454d514 100644 --- a/apps/epg/migrations/0001_initial.py +++ b/apps/epg/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 5.1.6 on 2025-03-02 00:01 +# Generated by Django 5.1.6 on 2025-03-02 13:52 import django.db.models.deletion from django.db import migrations, models diff --git a/apps/hdhr/migrations/0001_initial.py b/apps/hdhr/migrations/0001_initial.py index 826c036d..54ad7c8c 100644 --- a/apps/hdhr/migrations/0001_initial.py +++ b/apps/hdhr/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 5.1.6 on 2025-03-02 00:01 +# Generated by Django 5.1.6 on 2025-03-02 13:52 from django.db import migrations, models diff --git a/apps/m3u/migrations/0001_initial.py b/apps/m3u/migrations/0001_initial.py index c78afaa1..eb92f063 100644 --- a/apps/m3u/migrations/0001_initial.py +++ b/apps/m3u/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 5.1.6 on 2025-03-02 00:01 +# Generated by Django 5.1.6 on 2025-03-02 13:52 import django.db.models.deletion from django.db import migrations, models diff --git a/core/migrations/0001_initial.py b/core/migrations/0001_initial.py index 5ddb01cb..79757ec4 100644 --- a/core/migrations/0001_initial.py +++ b/core/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 5.1.6 on 2025-03-02 00:01 +# Generated by Django 5.1.6 on 2025-03-02 13:52 from django.db import migrations, models diff --git a/core/views.py b/core/views.py index 0ec84bde..59d69ee0 100644 --- a/core/views.py +++ b/core/views.py @@ -32,10 +32,16 @@ def settings_view(request): def stream_view(request, stream_id): """ Streams the first available stream for the given channel. - It uses the channel’s assigned StreamProfile. + It uses the channel’s assigned StreamProfile with a fallback to core default A persistent Redis lock is used to prevent concurrent streaming on the same channel. + Priority: + - iterate through all streams + - iterate through each stream's m3u profile """ try: + redis_host = getattr(settings, "REDIS_HOST", "localhost") + redis_client = redis.Redis(host=settings.REDIS_HOST, port=6379, db=0) + # Retrieve the channel by the provided stream_id. channel = Channel.objects.get(channel_number=stream_id) logger.debug("Channel retrieved: ID=%s, Name=%s", channel.id, channel.channel_name) @@ -45,46 +51,78 @@ def stream_view(request, stream_id): logger.error("No streams found for channel ID=%s", channel.id) return HttpResponseServerError("No stream found for this channel.") - # Get the first available stream. - stream = channel.streams.first() - logger.debug("Using stream: ID=%s, Name=%s", stream.id, stream.name) - - # Retrieve the M3U account associated with the stream. - m3u_account = stream.m3u_account - logger.debug("Using M3U account ID=%s, Name=%s", m3u_account.id, m3u_account.name) - - # Use the custom URL if available; otherwise, use the standard URL. - input_url = stream.custom_url or stream.url - logger.debug("Input URL: %s", input_url) - - # Determine which profile we can use. - m3u_profiles = m3u_account.profiles.all() - default_profile = next((obj for obj in m3u_profiles if obj.is_default), None) - profiles = [obj for obj in m3u_profiles if not obj.is_default] - active_profile = None - # -- Loop through profiles and pick the first active one -- - for profile in [default_profile] + profiles: - logger.debug(f'Checking profile {profile.name}...') - if not profile.is_active: - logger.debug('Profile is not active, skipping.') + lock_key = None + persistent_lock = None + + # iterate through channel's streams + for stream in channel.streams.all().order_by('channelstream__order'): + logger.debug(f"Checking stream: ID={stream.id}, Name={stream.name}") + + # Retrieve the M3U account associated with the stream. + m3u_account = stream.m3u_account + logger.debug(f"Using M3U account ID={m3u_account.id}, Name={m3u_account.name}") + + # Use the custom URL if available; otherwise, use the standard URL. + input_url = stream.custom_url or stream.url + logger.debug(f"Input URL: {input_url}") + + # Determine which profile we can use. + m3u_profiles = m3u_account.profiles.all() + default_profile = next((obj for obj in m3u_profiles if obj.is_default), None) + profiles = [obj for obj in m3u_profiles if not obj.is_default] + + # -- Loop through profiles and pick the first active one -- + for profile in [default_profile] + profiles: + logger.debug(f'Checking profile {profile.name}...') + if not profile.is_active: + logger.debug('Profile is not active, skipping.') + continue + + # Acquire the persistent Redis lock, indexed by 0 through max_streams available in the profile + stream_index = 0 + while True: + stream_index += 1 + if stream_index > profile.max_streams: + # @TODO: we are bailing here if no profile was found, but we need to end up supporting looping through + # all available channel streams + logger.debug(f"Profile is using all available streams.") + break + + lock_key = f"lock:{channel.id}:{stream.id}:{profile.id}:{stream_index}" + persistent_lock = PersistentLock(redis_client, lock_key, lock_timeout=120) + + if not persistent_lock.acquire(): + logger.error(f"Could not acquire persistent lock for profile {profile.id} index {stream_index}, currently in use.") + continue + + break + + if persistent_lock.has_lock: + break + + if persistent_lock.has_lock == False: + logger.debug(f'Unable to get lock for profile {profile.id}:{profile.name}. Skipping...') continue - # *** DISABLE FAKE LOCKS: Ignore current_viewers/max_streams check *** - logger.debug(f"Using M3U profile ID={profile.id} (ignoring viewer count limits)") - active_profile = M3UAccountProfile.objects.get(id=profile.id) - # Prepare the pattern replacement. - logger.debug("Executing the following pattern replacement:") - logger.debug(f" search: {profile.search_pattern}") - safe_replace_pattern = re.sub(r'\$(\d+)', r'\\\1', profile.replace_pattern) - logger.debug(f" replace: {profile.replace_pattern}") - logger.debug(f" safe replace: {safe_replace_pattern}") - stream_url = re.sub(profile.search_pattern, safe_replace_pattern, input_url) - logger.debug(f"Generated stream url: {stream_url}") + break - if active_profile is None: - logger.exception("No available profiles for the stream") - return HttpResponseServerError("No available profiles for the stream") + if persistent_lock.has_lock == False: + logger.debug(f"Unable to find any available streams or stream profiles.") + return HttpResponseServerError("Resource busy, please try again later.") + + # *** DISABLE FAKE LOCKS: Ignore current_viewers/max_streams check *** + logger.debug(f"Using stream {stream.id}{stream.name}, M3U profile {profile.id}{profile.name}, stream index {stream_index}") + active_profile = M3UAccountProfile.objects.get(id=profile.id) + + # Prepare the pattern replacement. + logger.debug("Executing the following pattern replacement:") + logger.debug(f" search: {active_profile.search_pattern}") + safe_replace_pattern = re.sub(r'\$(\d+)', r'\\\1', active_profile.replace_pattern) + logger.debug(f" replace: {active_profile.replace_pattern}") + logger.debug(f" safe replace: {safe_replace_pattern}") + stream_url = re.sub(active_profile.search_pattern, safe_replace_pattern, input_url) + logger.debug(f"Generated stream url: {stream_url}") # Get the stream profile set on the channel. stream_profile = channel.stream_profile @@ -106,19 +144,9 @@ def stream_view(request, stream_id): cmd = [stream_profile.command] + parameters.split() logger.debug("Executing command: %s", cmd) - # Acquire the persistent Redis lock. - redis_host = getattr(settings, "REDIS_HOST", "localhost") - redis_client = redis.Redis(host=settings.REDIS_HOST, port=6379, db=0) - lock_key = f"lock:channel:{channel.id}" - persistent_lock = PersistentLock(redis_client, lock_key, lock_timeout=120) - - if not persistent_lock.acquire(): - logger.error("Could not acquire persistent lock for channel %s", channel.id) - return HttpResponseServerError("Resource busy, please try again later.") - try: # Start the streaming process. - process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=8192) except Exception as e: persistent_lock.release() # Ensure the lock is released on error. logger.exception("Error starting stream for channel ID=%s", stream_id) @@ -137,6 +165,7 @@ def stream_view(request, stream_id): yield chunk finally: try: + proc.terminate() logger.debug("Streaming process terminated for stream ID=%s", s.id) except Exception as e: @@ -144,6 +173,7 @@ def stream_view(request, stream_id): persistent_lock.release() logger.debug("Persistent lock released for channel ID=%s", channel.id) + return StreamingHttpResponse( stream_generator(process, stream, persistent_lock), content_type="video/MP2T" diff --git a/dispatcharr/persistent_lock.py b/dispatcharr/persistent_lock.py index 3df2b650..360c9b5d 100644 --- a/dispatcharr/persistent_lock.py +++ b/dispatcharr/persistent_lock.py @@ -5,7 +5,7 @@ import redis class PersistentLock: """ A persistent, auto-expiring lock that uses Redis. - + Usage: 1. Instantiate with a Redis client, a unique lock key (e.g. "lock:account:123"), and an optional timeout (in seconds). @@ -16,7 +16,7 @@ class PersistentLock: def __init__(self, redis_client: redis.Redis, lock_key: str, lock_timeout: int = 120): """ Initialize the lock. - + :param redis_client: An instance of redis.Redis. :param lock_key: The unique key for the lock. :param lock_timeout: Time-to-live for the lock in seconds. @@ -25,6 +25,10 @@ class PersistentLock: self.lock_key = lock_key self.lock_timeout = lock_timeout self.lock_token = None + self.has_lock = False + + def has_lock(self) -> bool: + return self.has_lock def acquire(self) -> bool: """ @@ -33,6 +37,9 @@ class PersistentLock: self.lock_token = str(uuid.uuid4()) # Set the lock with NX (only if not exists) and EX (expire time) result = self.redis_client.set(self.lock_key, self.lock_token, nx=True, ex=self.lock_timeout) + if result is not None: + self.has_lock = True + return result is not None def refresh(self) -> bool: @@ -43,6 +50,7 @@ class PersistentLock: current_value = self.redis_client.get(self.lock_key) if current_value and current_value.decode("utf-8") == self.lock_token: self.redis_client.expire(self.lock_key, self.lock_timeout) + self.has_lock = False return True return False diff --git a/frontend/public/index.html b/frontend/public/index.html index de1fb039..5e0e89a0 100644 --- a/frontend/public/index.html +++ b/frontend/public/index.html @@ -10,7 +10,7 @@