diff --git a/apps/accounts/migrations/0001_initial.py b/apps/accounts/migrations/0001_initial.py
index 4ce00e01..bc92ebe6 100644
--- a/apps/accounts/migrations/0001_initial.py
+++ b/apps/accounts/migrations/0001_initial.py
@@ -1,4 +1,4 @@
-# Generated by Django 5.1.6 on 2025-03-02 00:01
+# Generated by Django 5.1.6 on 2025-03-02 13:52
import django.contrib.auth.models
import django.contrib.auth.validators
diff --git a/apps/channels/api_views.py b/apps/channels/api_views.py
index 75772509..d1858165 100644
--- a/apps/channels/api_views.py
+++ b/apps/channels/api_views.py
@@ -135,6 +135,7 @@ class ChannelViewSet(viewsets.ModelViewSet):
'tvg_id': stream.tvg_id,
'channel_group_id': channel_group.id,
'logo_url': stream.logo_url,
+ 'streams': [stream_id]
}
serializer = self.get_serializer(data=channel_data)
serializer.is_valid(raise_exception=True)
@@ -226,6 +227,7 @@ class ChannelViewSet(viewsets.ModelViewSet):
"tvg_id": stream.tvg_id,
"channel_group_id": channel_group.id,
"logo_url": stream.logo_url,
+ "streams": [stream_id],
}
serializer = self.get_serializer(data=channel_data)
if serializer.is_valid():
diff --git a/apps/channels/migrations/0001_initial.py b/apps/channels/migrations/0001_initial.py
index 2e3990e2..8401450e 100644
--- a/apps/channels/migrations/0001_initial.py
+++ b/apps/channels/migrations/0001_initial.py
@@ -1,4 +1,4 @@
-# Generated by Django 5.1.6 on 2025-03-02 00:01
+# Generated by Django 5.1.6 on 2025-03-02 13:52
import django.db.models.deletion
from django.db import migrations, models
@@ -21,6 +21,20 @@ class Migration(migrations.Migration):
('name', models.CharField(max_length=100, unique=True)),
],
),
+ migrations.CreateModel(
+ name='Channel',
+ fields=[
+ ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+ ('channel_number', models.IntegerField()),
+ ('channel_name', models.CharField(max_length=255)),
+ ('logo_url', models.URLField(blank=True, max_length=2000, null=True)),
+ ('logo_file', models.ImageField(blank=True, null=True, upload_to='logos/')),
+ ('tvg_id', models.CharField(blank=True, max_length=255, null=True)),
+ ('tvg_name', models.CharField(blank=True, max_length=255, null=True)),
+ ('stream_profile', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='channels', to='core.streamprofile')),
+ ('channel_group', models.ForeignKey(blank=True, help_text='Channel group this channel belongs to.', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='channels', to='channels.channelgroup')),
+ ],
+ ),
migrations.CreateModel(
name='Stream',
fields=[
@@ -44,18 +58,20 @@ class Migration(migrations.Migration):
},
),
migrations.CreateModel(
- name='Channel',
+ name='ChannelStream',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
- ('channel_number', models.IntegerField()),
- ('channel_name', models.CharField(max_length=255)),
- ('logo_url', models.URLField(blank=True, max_length=2000, null=True)),
- ('logo_file', models.ImageField(blank=True, null=True, upload_to='logos/')),
- ('tvg_id', models.CharField(blank=True, max_length=255, null=True)),
- ('tvg_name', models.CharField(blank=True, max_length=255, null=True)),
- ('stream_profile', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='channels', to='core.streamprofile')),
- ('channel_group', models.ForeignKey(blank=True, help_text='Channel group this channel belongs to.', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='channels', to='channels.channelgroup')),
- ('streams', models.ManyToManyField(blank=True, related_name='channels', to='channels.stream')),
+ ('order', models.PositiveIntegerField(default=0)),
+ ('channel', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='channels.channel')),
+ ('stream', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='channels.stream')),
],
+ options={
+ 'ordering': ['order'],
+ },
+ ),
+ migrations.AddField(
+ model_name='channel',
+ name='streams',
+ field=models.ManyToManyField(blank=True, related_name='channels', through='channels.ChannelStream', to='channels.stream'),
),
]
diff --git a/apps/channels/models.py b/apps/channels/models.py
index b965e32d..bd4045ba 100644
--- a/apps/channels/models.py
+++ b/apps/channels/models.py
@@ -61,6 +61,7 @@ class Channel(models.Model):
streams = models.ManyToManyField(
Stream,
blank=True,
+ through='ChannelStream',
related_name='channels'
)
@@ -84,7 +85,7 @@ class Channel(models.Model):
related_name='channels'
)
-
+
def clean(self):
# Enforce unique channel_number within a given group
existing = Channel.objects.filter(
@@ -109,3 +110,11 @@ class ChannelGroup(models.Model):
def __str__(self):
return self.name
+
+class ChannelStream(models.Model):
+ channel = models.ForeignKey(Channel, on_delete=models.CASCADE)
+ stream = models.ForeignKey(Stream, on_delete=models.CASCADE)
+ order = models.PositiveIntegerField(default=0) # Ordering field
+
+ class Meta:
+ ordering = ['order'] # Ensure streams are retrieved in order
diff --git a/apps/channels/serializers.py b/apps/channels/serializers.py
index 0e4fba76..a0dcb6a6 100644
--- a/apps/channels/serializers.py
+++ b/apps/channels/serializers.py
@@ -1,5 +1,5 @@
from rest_framework import serializers
-from .models import Stream, Channel, ChannelGroup
+from .models import Stream, Channel, ChannelGroup, ChannelStream
from core.models import StreamProfile
#
@@ -73,8 +73,10 @@ class ChannelSerializer(serializers.ModelSerializer):
required=False
)
- # Possibly show streams inline, or just by ID
- # streams = StreamSerializer(many=True, read_only=True)
+ streams = serializers.ListField(
+        child=serializers.IntegerField(), write_only=True, required=False
+ )
+ stream_ids = serializers.SerializerMethodField()
class Meta:
model = Channel
@@ -89,5 +91,39 @@ class ChannelSerializer(serializers.ModelSerializer):
'tvg_id',
'tvg_name',
'streams',
+ 'stream_ids',
'stream_profile_id',
]
+
+ def get_stream_ids(self, obj):
+ """Retrieve ordered stream IDs for GET requests."""
+ return list(obj.streams.all().order_by('channelstream__order').values_list('id', flat=True))
+
+ def create(self, validated_data):
+ stream_ids = validated_data.pop('streams', [])
+ channel = Channel.objects.create(**validated_data)
+
+ # Add streams in the specified order
+ for index, stream_id in enumerate(stream_ids):
+ ChannelStream.objects.create(channel=channel, stream_id=stream_id, order=index)
+
+ return channel
+
+ def update(self, instance, validated_data):
+ print("Validated Data:", validated_data)
+ stream_ids = validated_data.get('streams', None)
+ print(f'stream ids: {stream_ids}')
+
+ # Update basic fields
+        instance.channel_name = validated_data.get('channel_name', instance.channel_name)
+ instance.save()
+
+ if stream_ids is not None:
+ # Clear existing relationships
+ instance.channelstream_set.all().delete()
+
+ # Add new streams in order
+ for index, stream_id in enumerate(stream_ids):
+ ChannelStream.objects.create(channel=instance, stream_id=stream_id, order=index)
+
+ return instance
diff --git a/apps/dashboard/migrations/0001_initial.py b/apps/dashboard/migrations/0001_initial.py
index 1ce7b141..9c39a3b7 100644
--- a/apps/dashboard/migrations/0001_initial.py
+++ b/apps/dashboard/migrations/0001_initial.py
@@ -1,4 +1,4 @@
-# Generated by Django 5.1.6 on 2025-03-02 00:01
+# Generated by Django 5.1.6 on 2025-03-02 13:52
from django.db import migrations, models
diff --git a/apps/epg/migrations/0001_initial.py b/apps/epg/migrations/0001_initial.py
index dfb9c0f0..9454d514 100644
--- a/apps/epg/migrations/0001_initial.py
+++ b/apps/epg/migrations/0001_initial.py
@@ -1,4 +1,4 @@
-# Generated by Django 5.1.6 on 2025-03-02 00:01
+# Generated by Django 5.1.6 on 2025-03-02 13:52
import django.db.models.deletion
from django.db import migrations, models
diff --git a/apps/hdhr/migrations/0001_initial.py b/apps/hdhr/migrations/0001_initial.py
index 826c036d..54ad7c8c 100644
--- a/apps/hdhr/migrations/0001_initial.py
+++ b/apps/hdhr/migrations/0001_initial.py
@@ -1,4 +1,4 @@
-# Generated by Django 5.1.6 on 2025-03-02 00:01
+# Generated by Django 5.1.6 on 2025-03-02 13:52
from django.db import migrations, models
diff --git a/apps/m3u/migrations/0001_initial.py b/apps/m3u/migrations/0001_initial.py
index c78afaa1..eb92f063 100644
--- a/apps/m3u/migrations/0001_initial.py
+++ b/apps/m3u/migrations/0001_initial.py
@@ -1,4 +1,4 @@
-# Generated by Django 5.1.6 on 2025-03-02 00:01
+# Generated by Django 5.1.6 on 2025-03-02 13:52
import django.db.models.deletion
from django.db import migrations, models
diff --git a/core/migrations/0001_initial.py b/core/migrations/0001_initial.py
index 5ddb01cb..79757ec4 100644
--- a/core/migrations/0001_initial.py
+++ b/core/migrations/0001_initial.py
@@ -1,4 +1,4 @@
-# Generated by Django 5.1.6 on 2025-03-02 00:01
+# Generated by Django 5.1.6 on 2025-03-02 13:52
from django.db import migrations, models
diff --git a/core/views.py b/core/views.py
index 0ec84bde..59d69ee0 100644
--- a/core/views.py
+++ b/core/views.py
@@ -32,10 +32,16 @@ def settings_view(request):
def stream_view(request, stream_id):
"""
Streams the first available stream for the given channel.
- It uses the channel’s assigned StreamProfile.
+ It uses the channel’s assigned StreamProfile with a fallback to core default
A persistent Redis lock is used to prevent concurrent streaming on the same channel.
+ Priority:
+ - iterate through all streams
+ - iterate through each stream's m3u profile
"""
try:
+ redis_host = getattr(settings, "REDIS_HOST", "localhost")
+        redis_client = redis.Redis(host=redis_host, port=6379, db=0)
+
# Retrieve the channel by the provided stream_id.
channel = Channel.objects.get(channel_number=stream_id)
logger.debug("Channel retrieved: ID=%s, Name=%s", channel.id, channel.channel_name)
@@ -45,46 +51,78 @@ def stream_view(request, stream_id):
logger.error("No streams found for channel ID=%s", channel.id)
return HttpResponseServerError("No stream found for this channel.")
- # Get the first available stream.
- stream = channel.streams.first()
- logger.debug("Using stream: ID=%s, Name=%s", stream.id, stream.name)
-
- # Retrieve the M3U account associated with the stream.
- m3u_account = stream.m3u_account
- logger.debug("Using M3U account ID=%s, Name=%s", m3u_account.id, m3u_account.name)
-
- # Use the custom URL if available; otherwise, use the standard URL.
- input_url = stream.custom_url or stream.url
- logger.debug("Input URL: %s", input_url)
-
- # Determine which profile we can use.
- m3u_profiles = m3u_account.profiles.all()
- default_profile = next((obj for obj in m3u_profiles if obj.is_default), None)
- profiles = [obj for obj in m3u_profiles if not obj.is_default]
-
active_profile = None
- # -- Loop through profiles and pick the first active one --
- for profile in [default_profile] + profiles:
- logger.debug(f'Checking profile {profile.name}...')
- if not profile.is_active:
- logger.debug('Profile is not active, skipping.')
+ lock_key = None
+ persistent_lock = None
+
+ # iterate through channel's streams
+ for stream in channel.streams.all().order_by('channelstream__order'):
+ logger.debug(f"Checking stream: ID={stream.id}, Name={stream.name}")
+
+ # Retrieve the M3U account associated with the stream.
+ m3u_account = stream.m3u_account
+ logger.debug(f"Using M3U account ID={m3u_account.id}, Name={m3u_account.name}")
+
+ # Use the custom URL if available; otherwise, use the standard URL.
+ input_url = stream.custom_url or stream.url
+ logger.debug(f"Input URL: {input_url}")
+
+ # Determine which profile we can use.
+ m3u_profiles = m3u_account.profiles.all()
+ default_profile = next((obj for obj in m3u_profiles if obj.is_default), None)
+ profiles = [obj for obj in m3u_profiles if not obj.is_default]
+
+ # -- Loop through profiles and pick the first active one --
+            for profile in [p for p in [default_profile] + profiles if p is not None]:
+ logger.debug(f'Checking profile {profile.name}...')
+ if not profile.is_active:
+ logger.debug('Profile is not active, skipping.')
+ continue
+
+ # Acquire the persistent Redis lock, indexed by 0 through max_streams available in the profile
+ stream_index = 0
+ while True:
+ stream_index += 1
+ if stream_index > profile.max_streams:
+ # @TODO: we are bailing here if no profile was found, but we need to end up supporting looping through
+ # all available channel streams
+ logger.debug(f"Profile is using all available streams.")
+ break
+
+ lock_key = f"lock:{channel.id}:{stream.id}:{profile.id}:{stream_index}"
+ persistent_lock = PersistentLock(redis_client, lock_key, lock_timeout=120)
+
+ if not persistent_lock.acquire():
+ logger.error(f"Could not acquire persistent lock for profile {profile.id} index {stream_index}, currently in use.")
+ continue
+
+ break
+
+                if persistent_lock is not None and persistent_lock.has_lock:
+ break
+
+            if persistent_lock is None or not persistent_lock.has_lock:
+ logger.debug(f'Unable to get lock for profile {profile.id}:{profile.name}. Skipping...')
continue
- # *** DISABLE FAKE LOCKS: Ignore current_viewers/max_streams check ***
- logger.debug(f"Using M3U profile ID={profile.id} (ignoring viewer count limits)")
- active_profile = M3UAccountProfile.objects.get(id=profile.id)
- # Prepare the pattern replacement.
- logger.debug("Executing the following pattern replacement:")
- logger.debug(f" search: {profile.search_pattern}")
- safe_replace_pattern = re.sub(r'\$(\d+)', r'\\\1', profile.replace_pattern)
- logger.debug(f" replace: {profile.replace_pattern}")
- logger.debug(f" safe replace: {safe_replace_pattern}")
- stream_url = re.sub(profile.search_pattern, safe_replace_pattern, input_url)
- logger.debug(f"Generated stream url: {stream_url}")
+
break
- if active_profile is None:
- logger.exception("No available profiles for the stream")
- return HttpResponseServerError("No available profiles for the stream")
+        if persistent_lock is None or not persistent_lock.has_lock:
+ logger.debug(f"Unable to find any available streams or stream profiles.")
+ return HttpResponseServerError("Resource busy, please try again later.")
+
+ # *** DISABLE FAKE LOCKS: Ignore current_viewers/max_streams check ***
+        logger.debug(f"Using stream {stream.id}:{stream.name}, M3U profile {profile.id}:{profile.name}, stream index {stream_index}")
+ active_profile = M3UAccountProfile.objects.get(id=profile.id)
+
+ # Prepare the pattern replacement.
+ logger.debug("Executing the following pattern replacement:")
+ logger.debug(f" search: {active_profile.search_pattern}")
+ safe_replace_pattern = re.sub(r'\$(\d+)', r'\\\1', active_profile.replace_pattern)
+ logger.debug(f" replace: {active_profile.replace_pattern}")
+ logger.debug(f" safe replace: {safe_replace_pattern}")
+ stream_url = re.sub(active_profile.search_pattern, safe_replace_pattern, input_url)
+ logger.debug(f"Generated stream url: {stream_url}")
# Get the stream profile set on the channel.
stream_profile = channel.stream_profile
@@ -106,19 +144,9 @@ def stream_view(request, stream_id):
cmd = [stream_profile.command] + parameters.split()
logger.debug("Executing command: %s", cmd)
- # Acquire the persistent Redis lock.
- redis_host = getattr(settings, "REDIS_HOST", "localhost")
- redis_client = redis.Redis(host=settings.REDIS_HOST, port=6379, db=0)
- lock_key = f"lock:channel:{channel.id}"
- persistent_lock = PersistentLock(redis_client, lock_key, lock_timeout=120)
-
- if not persistent_lock.acquire():
- logger.error("Could not acquire persistent lock for channel %s", channel.id)
- return HttpResponseServerError("Resource busy, please try again later.")
-
try:
# Start the streaming process.
- process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=8192)
except Exception as e:
persistent_lock.release() # Ensure the lock is released on error.
logger.exception("Error starting stream for channel ID=%s", stream_id)
@@ -137,6 +165,7 @@ def stream_view(request, stream_id):
yield chunk
finally:
try:
+
proc.terminate()
logger.debug("Streaming process terminated for stream ID=%s", s.id)
except Exception as e:
@@ -144,6 +173,7 @@ def stream_view(request, stream_id):
persistent_lock.release()
logger.debug("Persistent lock released for channel ID=%s", channel.id)
+
return StreamingHttpResponse(
stream_generator(process, stream, persistent_lock),
content_type="video/MP2T"
diff --git a/dispatcharr/persistent_lock.py b/dispatcharr/persistent_lock.py
index 3df2b650..360c9b5d 100644
--- a/dispatcharr/persistent_lock.py
+++ b/dispatcharr/persistent_lock.py
@@ -5,7 +5,7 @@ import redis
class PersistentLock:
"""
A persistent, auto-expiring lock that uses Redis.
-
+
Usage:
1. Instantiate with a Redis client, a unique lock key (e.g. "lock:account:123"),
and an optional timeout (in seconds).
@@ -16,7 +16,7 @@ class PersistentLock:
def __init__(self, redis_client: redis.Redis, lock_key: str, lock_timeout: int = 120):
"""
Initialize the lock.
-
+
:param redis_client: An instance of redis.Redis.
:param lock_key: The unique key for the lock.
:param lock_timeout: Time-to-live for the lock in seconds.
@@ -25,6 +25,10 @@ class PersistentLock:
self.lock_key = lock_key
self.lock_timeout = lock_timeout
self.lock_token = None
+ self.has_lock = False
+
+    def is_locked(self) -> bool:
+        return self.has_lock
def acquire(self) -> bool:
"""
@@ -33,6 +37,9 @@ class PersistentLock:
self.lock_token = str(uuid.uuid4())
# Set the lock with NX (only if not exists) and EX (expire time)
result = self.redis_client.set(self.lock_key, self.lock_token, nx=True, ex=self.lock_timeout)
+ if result is not None:
+ self.has_lock = True
+
return result is not None
def refresh(self) -> bool:
@@ -43,6 +50,7 @@ class PersistentLock:
current_value = self.redis_client.get(self.lock_key)
if current_value and current_value.decode("utf-8") == self.lock_token:
self.redis_client.expire(self.lock_key, self.lock_timeout)
+            self.has_lock = True
return True
return False
diff --git a/frontend/public/index.html b/frontend/public/index.html
index de1fb039..5e0e89a0 100644
--- a/frontend/public/index.html
+++ b/frontend/public/index.html
@@ -10,7 +10,7 @@