Mirror of https://github.com/Dispatcharr/Dispatcharr.git (synced 2026-01-23 02:35:14 +00:00)

Merge pull request #12 from Dispatcharr/multi-stream: multi stream support

Commit fe4976fa72: 13 changed files with 174 additions and 73 deletions
@@ -1,4 +1,4 @@
-# Generated by Django 5.1.6 on 2025-03-02 00:01
+# Generated by Django 5.1.6 on 2025-03-02 13:52

 import django.contrib.auth.models
 import django.contrib.auth.validators
@@ -135,6 +135,7 @@ class ChannelViewSet(viewsets.ModelViewSet):
             'tvg_id': stream.tvg_id,
             'channel_group_id': channel_group.id,
             'logo_url': stream.logo_url,
+            'streams': [stream_id]
         }
         serializer = self.get_serializer(data=channel_data)
         serializer.is_valid(raise_exception=True)

@@ -226,6 +227,7 @@ class ChannelViewSet(viewsets.ModelViewSet):
                 "tvg_id": stream.tvg_id,
                 "channel_group_id": channel_group.id,
                 "logo_url": stream.logo_url,
+                "streams": [stream_id],
             }
             serializer = self.get_serializer(data=channel_data)
             if serializer.is_valid():
@@ -1,4 +1,4 @@
-# Generated by Django 5.1.6 on 2025-03-02 00:01
+# Generated by Django 5.1.6 on 2025-03-02 13:52

 import django.db.models.deletion
 from django.db import migrations, models
@@ -21,6 +21,20 @@ class Migration(migrations.Migration):
                 ('name', models.CharField(max_length=100, unique=True)),
             ],
         ),
+        migrations.CreateModel(
+            name='Channel',
+            fields=[
+                ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+                ('channel_number', models.IntegerField()),
+                ('channel_name', models.CharField(max_length=255)),
+                ('logo_url', models.URLField(blank=True, max_length=2000, null=True)),
+                ('logo_file', models.ImageField(blank=True, null=True, upload_to='logos/')),
+                ('tvg_id', models.CharField(blank=True, max_length=255, null=True)),
+                ('tvg_name', models.CharField(blank=True, max_length=255, null=True)),
+                ('stream_profile', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='channels', to='core.streamprofile')),
+                ('channel_group', models.ForeignKey(blank=True, help_text='Channel group this channel belongs to.', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='channels', to='channels.channelgroup')),
+            ],
+        ),
         migrations.CreateModel(
             name='Stream',
             fields=[
@@ -44,18 +58,20 @@ class Migration(migrations.Migration):
             },
         ),
         migrations.CreateModel(
-            name='Channel',
+            name='ChannelStream',
             fields=[
                 ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
-                ('channel_number', models.IntegerField()),
-                ('channel_name', models.CharField(max_length=255)),
-                ('logo_url', models.URLField(blank=True, max_length=2000, null=True)),
-                ('logo_file', models.ImageField(blank=True, null=True, upload_to='logos/')),
-                ('tvg_id', models.CharField(blank=True, max_length=255, null=True)),
-                ('tvg_name', models.CharField(blank=True, max_length=255, null=True)),
-                ('stream_profile', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='channels', to='core.streamprofile')),
-                ('channel_group', models.ForeignKey(blank=True, help_text='Channel group this channel belongs to.', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='channels', to='channels.channelgroup')),
-                ('streams', models.ManyToManyField(blank=True, related_name='channels', to='channels.stream')),
+                ('order', models.PositiveIntegerField(default=0)),
+                ('channel', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='channels.channel')),
+                ('stream', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='channels.stream')),
             ],
+            options={
+                'ordering': ['order'],
+            },
         ),
+        migrations.AddField(
+            model_name='channel',
+            name='streams',
+            field=models.ManyToManyField(blank=True, related_name='channels', through='channels.ChannelStream', to='channels.stream'),
+        ),
     ]
@@ -61,6 +61,7 @@ class Channel(models.Model):
     streams = models.ManyToManyField(
         Stream,
         blank=True,
+        through='ChannelStream',
         related_name='channels'
     )

@@ -84,7 +85,7 @@ class Channel(models.Model):
         related_name='channels'
     )

     def clean(self):
         # Enforce unique channel_number within a given group
         existing = Channel.objects.filter(

@@ -109,3 +110,11 @@ class ChannelGroup(models.Model):

     def __str__(self):
         return self.name
+
+class ChannelStream(models.Model):
+    channel = models.ForeignKey(Channel, on_delete=models.CASCADE)
+    stream = models.ForeignKey(Stream, on_delete=models.CASCADE)
+    order = models.PositiveIntegerField(default=0)  # Ordering field
+
+    class Meta:
+        ordering = ['order']  # Ensure streams are retrieved in order
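Note: the ordering behaviour introduced here lives entirely in the ChannelStream through model. A minimal sketch of how it might be exercised from a Django shell; the import path and the replace_streams helper are assumptions, not part of this commit:

    from apps.channels.models import Channel, ChannelStream  # import path is an assumption

    def replace_streams(channel: Channel, stream_ids: list[int]) -> None:
        # Hypothetical helper: point the channel at the given streams, preserving list order.
        channel.channelstream_set.all().delete()
        ChannelStream.objects.bulk_create([
            ChannelStream(channel=channel, stream_id=sid, order=index)
            for index, sid in enumerate(stream_ids)
        ])

    # Reading back relies on ChannelStream.Meta.ordering = ['order']:
    # channel.streams.all().order_by('channelstream__order') returns streams in priority order.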
@@ -1,5 +1,5 @@
 from rest_framework import serializers
-from .models import Stream, Channel, ChannelGroup
+from .models import Stream, Channel, ChannelGroup, ChannelStream
 from core.models import StreamProfile

 #

@@ -73,8 +73,10 @@ class ChannelSerializer(serializers.ModelSerializer):
         required=False
     )

     # Possibly show streams inline, or just by ID
     # streams = StreamSerializer(many=True, read_only=True)
+    streams = serializers.ListField(
+        child=serializers.IntegerField(), write_only=True
+    )
+    stream_ids = serializers.SerializerMethodField()

     class Meta:
         model = Channel

@@ -89,5 +91,39 @@ class ChannelSerializer(serializers.ModelSerializer):
             'tvg_id',
             'tvg_name',
+            'streams',
+            'stream_ids',
             'stream_profile_id',
         ]
+
+    def get_stream_ids(self, obj):
+        """Retrieve ordered stream IDs for GET requests."""
+        return list(obj.streams.all().order_by('channelstream__order').values_list('id', flat=True))
+
+    def create(self, validated_data):
+        stream_ids = validated_data.pop('streams', [])
+        channel = Channel.objects.create(**validated_data)
+
+        # Add streams in the specified order
+        for index, stream_id in enumerate(stream_ids):
+            ChannelStream.objects.create(channel=channel, stream_id=stream_id, order=index)
+
+        return channel
+
+    def update(self, instance, validated_data):
+        print("Validated Data:", validated_data)
+        stream_ids = validated_data.get('streams', None)
+        print(f'stream ids: {stream_ids}')
+
+        # Update basic fields
+        instance.name = validated_data.get('channel_name', instance.channel_name)
+        instance.save()
+
+        if stream_ids is not None:
+            # Clear existing relationships
+            instance.channelstream_set.all().delete()
+
+            # Add new streams in order
+            for index, stream_id in enumerate(stream_ids):
+                ChannelStream.objects.create(channel=instance, stream_id=stream_id, order=index)
+
+        return instance
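With streams write-only and stream_ids read-only, a client reorders a channel simply by PATCHing the IDs in the desired priority. A hypothetical request/response sketch; the endpoint path and IDs are placeholders, not taken from this diff:

    import requests  # illustrative client

    url = "http://localhost:8000/api/channels/channels/1/"  # hypothetical endpoint

    # Write: the order of IDs becomes ChannelStream.order 0, 1, 2 via ChannelSerializer.update().
    requests.patch(url, json={"streams": [42, 17, 99]}, timeout=10)

    # Read: 'stream_ids' is rebuilt from ChannelStream order by get_stream_ids().
    print(requests.get(url, timeout=10).json()["stream_ids"])  # expected: [42, 17, 99]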
@@ -1,4 +1,4 @@
-# Generated by Django 5.1.6 on 2025-03-02 00:01
+# Generated by Django 5.1.6 on 2025-03-02 13:52

 from django.db import migrations, models
@@ -1,4 +1,4 @@
-# Generated by Django 5.1.6 on 2025-03-02 00:01
+# Generated by Django 5.1.6 on 2025-03-02 13:52

 import django.db.models.deletion
 from django.db import migrations, models
@@ -1,4 +1,4 @@
-# Generated by Django 5.1.6 on 2025-03-02 00:01
+# Generated by Django 5.1.6 on 2025-03-02 13:52

 from django.db import migrations, models
@@ -1,4 +1,4 @@
-# Generated by Django 5.1.6 on 2025-03-02 00:01
+# Generated by Django 5.1.6 on 2025-03-02 13:52

 import django.db.models.deletion
 from django.db import migrations, models
@@ -1,4 +1,4 @@
-# Generated by Django 5.1.6 on 2025-03-02 00:01
+# Generated by Django 5.1.6 on 2025-03-02 13:52

 from django.db import migrations, models
core/views.py (126 changed lines)
@@ -32,10 +32,16 @@ def settings_view(request):
 def stream_view(request, stream_id):
     """
     Streams the first available stream for the given channel.
-    It uses the channel’s assigned StreamProfile.
+    It uses the channel’s assigned StreamProfile with a fallback to core default
+    A persistent Redis lock is used to prevent concurrent streaming on the same channel.
+    Priority:
+      - iterate through all streams
+      - iterate through each stream's m3u profile
     """
     try:
+        redis_host = getattr(settings, "REDIS_HOST", "localhost")
+        redis_client = redis.Redis(host=settings.REDIS_HOST, port=6379, db=0)

         # Retrieve the channel by the provided stream_id.
         channel = Channel.objects.get(channel_number=stream_id)
         logger.debug("Channel retrieved: ID=%s, Name=%s", channel.id, channel.channel_name)
@@ -45,46 +51,78 @@ def stream_view(request, stream_id):
             logger.error("No streams found for channel ID=%s", channel.id)
             return HttpResponseServerError("No stream found for this channel.")

-        # Get the first available stream.
-        stream = channel.streams.first()
-        logger.debug("Using stream: ID=%s, Name=%s", stream.id, stream.name)
-
-        # Retrieve the M3U account associated with the stream.
-        m3u_account = stream.m3u_account
-        logger.debug("Using M3U account ID=%s, Name=%s", m3u_account.id, m3u_account.name)
-
-        # Use the custom URL if available; otherwise, use the standard URL.
-        input_url = stream.custom_url or stream.url
-        logger.debug("Input URL: %s", input_url)
-
-        # Determine which profile we can use.
-        m3u_profiles = m3u_account.profiles.all()
-        default_profile = next((obj for obj in m3u_profiles if obj.is_default), None)
-        profiles = [obj for obj in m3u_profiles if not obj.is_default]
-
-        active_profile = None
-        # -- Loop through profiles and pick the first active one --
-        for profile in [default_profile] + profiles:
-            logger.debug(f'Checking profile {profile.name}...')
-            if not profile.is_active:
-                logger.debug('Profile is not active, skipping.')
+        lock_key = None
+        persistent_lock = None
+
+        # iterate through channel's streams
+        for stream in channel.streams.all().order_by('channelstream__order'):
+            logger.debug(f"Checking stream: ID={stream.id}, Name={stream.name}")
+
+            # Retrieve the M3U account associated with the stream.
+            m3u_account = stream.m3u_account
+            logger.debug(f"Using M3U account ID={m3u_account.id}, Name={m3u_account.name}")
+
+            # Use the custom URL if available; otherwise, use the standard URL.
+            input_url = stream.custom_url or stream.url
+            logger.debug(f"Input URL: {input_url}")
+
+            # Determine which profile we can use.
+            m3u_profiles = m3u_account.profiles.all()
+            default_profile = next((obj for obj in m3u_profiles if obj.is_default), None)
+            profiles = [obj for obj in m3u_profiles if not obj.is_default]
+
+            # -- Loop through profiles and pick the first active one --
+            for profile in [default_profile] + profiles:
+                logger.debug(f'Checking profile {profile.name}...')
+                if not profile.is_active:
+                    logger.debug('Profile is not active, skipping.')
+                    continue
+
+                # Acquire the persistent Redis lock, indexed by 0 through max_streams available in the profile
+                stream_index = 0
+                while True:
+                    stream_index += 1
+                    if stream_index > profile.max_streams:
+                        # @TODO: we are bailing here if no profile was found, but we need to end up supporting looping through
+                        # all available channel streams
+                        logger.debug(f"Profile is using all available streams.")
+                        break
+
+                    lock_key = f"lock:{channel.id}:{stream.id}:{profile.id}:{stream_index}"
+                    persistent_lock = PersistentLock(redis_client, lock_key, lock_timeout=120)
+
+                    if not persistent_lock.acquire():
+                        logger.error(f"Could not acquire persistent lock for profile {profile.id} index {stream_index}, currently in use.")
+                        continue
+
+                    break
+
+                if persistent_lock.has_lock:
+                    break
+
+            if persistent_lock.has_lock == False:
+                logger.debug(f'Unable to get lock for profile {profile.id}:{profile.name}. Skipping...')
                 continue
-            # *** DISABLE FAKE LOCKS: Ignore current_viewers/max_streams check ***
-            logger.debug(f"Using M3U profile ID={profile.id} (ignoring viewer count limits)")
-            active_profile = M3UAccountProfile.objects.get(id=profile.id)
-            # Prepare the pattern replacement.
-            logger.debug("Executing the following pattern replacement:")
-            logger.debug(f" search: {profile.search_pattern}")
-            safe_replace_pattern = re.sub(r'\$(\d+)', r'\\\1', profile.replace_pattern)
-            logger.debug(f" replace: {profile.replace_pattern}")
-            logger.debug(f" safe replace: {safe_replace_pattern}")
-            stream_url = re.sub(profile.search_pattern, safe_replace_pattern, input_url)
-            logger.debug(f"Generated stream url: {stream_url}")
-
-            break
-
-        if active_profile is None:
-            logger.exception("No available profiles for the stream")
-            return HttpResponseServerError("No available profiles for the stream")
+        if persistent_lock.has_lock == False:
+            logger.debug(f"Unable to find any available streams or stream profiles.")
+            return HttpResponseServerError("Resource busy, please try again later.")
+
+        # *** DISABLE FAKE LOCKS: Ignore current_viewers/max_streams check ***
+        logger.debug(f"Using stream {stream.id}{stream.name}, M3U profile {profile.id}{profile.name}, stream index {stream_index}")
+        active_profile = M3UAccountProfile.objects.get(id=profile.id)
+
+        # Prepare the pattern replacement.
+        logger.debug("Executing the following pattern replacement:")
+        logger.debug(f" search: {active_profile.search_pattern}")
+        safe_replace_pattern = re.sub(r'\$(\d+)', r'\\\1', active_profile.replace_pattern)
+        logger.debug(f" replace: {active_profile.replace_pattern}")
+        logger.debug(f" safe replace: {safe_replace_pattern}")
+        stream_url = re.sub(active_profile.search_pattern, safe_replace_pattern, input_url)
+        logger.debug(f"Generated stream url: {stream_url}")

         # Get the stream profile set on the channel.
         stream_profile = channel.stream_profile
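The lock key added above (lock:<channel>:<stream>:<profile>:<index>) bounds concurrency per M3U profile slot rather than per channel. A condensed, hypothetical paraphrase of that selection loop; it drops the default-profile-first ordering and the logging, and is not a drop-in replacement for the view:

    def pick_stream_and_lock(channel, redis_client):
        # Hypothetical helper mirroring the loop above: first stream/profile/slot whose lock we can take.
        for stream in channel.streams.all().order_by('channelstream__order'):  # channel's priority order
            for profile in stream.m3u_account.profiles.all():
                if not profile.is_active:
                    continue
                for slot in range(1, profile.max_streams + 1):  # one lock per allowed connection
                    lock = PersistentLock(redis_client, f"lock:{channel.id}:{stream.id}:{profile.id}:{slot}")
                    if lock.acquire():
                        return stream, profile, lock
        return None, None, None  # every stream/profile/slot is busy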
@@ -106,19 +144,9 @@ def stream_view(request, stream_id):
         cmd = [stream_profile.command] + parameters.split()
         logger.debug("Executing command: %s", cmd)

-        # Acquire the persistent Redis lock.
-        redis_host = getattr(settings, "REDIS_HOST", "localhost")
-        redis_client = redis.Redis(host=settings.REDIS_HOST, port=6379, db=0)
-        lock_key = f"lock:channel:{channel.id}"
-        persistent_lock = PersistentLock(redis_client, lock_key, lock_timeout=120)
-
-        if not persistent_lock.acquire():
-            logger.error("Could not acquire persistent lock for channel %s", channel.id)
-            return HttpResponseServerError("Resource busy, please try again later.")
-
         try:
             # Start the streaming process.
-            process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=8192)
         except Exception as e:
             persistent_lock.release()  # Ensure the lock is released on error.
             logger.exception("Error starting stream for channel ID=%s", stream_id)

@@ -137,6 +165,7 @@ def stream_view(request, stream_id):
                 yield chunk
         finally:
             try:

                 proc.terminate()
                 logger.debug("Streaming process terminated for stream ID=%s", s.id)
             except Exception as e:

@@ -144,6 +173,7 @@ def stream_view(request, stream_id):
             persistent_lock.release()
             logger.debug("Persistent lock released for channel ID=%s", channel.id)

         return StreamingHttpResponse(
             stream_generator(process, stream, persistent_lock),
             content_type="video/MP2T"
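The bufsize=8192 change above pairs with the generator that pipes the transcoder's stdout into the HTTP response. A stripped-down sketch of that pattern using the same Popen arguments; the helper name and chunk size are assumptions:

    import subprocess
    from django.http import StreamingHttpResponse

    def stream_subprocess_response(cmd, persistent_lock, chunk_size=4096):
        # Hypothetical helper: stream a transcoder's stdout as MPEG-TS, releasing the lock when done.
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=8192)

        def generator():
            try:
                while True:
                    chunk = process.stdout.read(chunk_size)
                    if not chunk:  # transcoder exited or closed its pipe
                        break
                    yield chunk
            finally:
                process.terminate()        # stop the transcoder when the client disconnects
                persistent_lock.release()  # free the profile slot for other viewers

        return StreamingHttpResponse(generator(), content_type="video/MP2T")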
@@ -5,7 +5,7 @@ import redis
 class PersistentLock:
     """
     A persistent, auto-expiring lock that uses Redis.
-
+
     Usage:
       1. Instantiate with a Redis client, a unique lock key (e.g. "lock:account:123"),
          and an optional timeout (in seconds).

@@ -16,7 +16,7 @@ class PersistentLock:
     def __init__(self, redis_client: redis.Redis, lock_key: str, lock_timeout: int = 120):
         """
         Initialize the lock.
-
+
         :param redis_client: An instance of redis.Redis.
         :param lock_key: The unique key for the lock.
         :param lock_timeout: Time-to-live for the lock in seconds.

@@ -25,6 +25,10 @@ class PersistentLock:
         self.lock_key = lock_key
         self.lock_timeout = lock_timeout
         self.lock_token = None
+        self.has_lock = False

+    def has_lock(self) -> bool:
+        return self.has_lock
+
     def acquire(self) -> bool:
         """

@@ -33,6 +37,9 @@ class PersistentLock:
         self.lock_token = str(uuid.uuid4())
         # Set the lock with NX (only if not exists) and EX (expire time)
         result = self.redis_client.set(self.lock_key, self.lock_token, nx=True, ex=self.lock_timeout)
+        if result is not None:
+            self.has_lock = True
+
         return result is not None

     def refresh(self) -> bool:

@@ -43,6 +50,7 @@ class PersistentLock:
         current_value = self.redis_client.get(self.lock_key)
         if current_value and current_value.decode("utf-8") == self.lock_token:
             self.redis_client.expire(self.lock_key, self.lock_timeout)
+            self.has_lock = False
             return True
         return False
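PersistentLock wraps a single Redis SET NX EX, so a holder must refresh before the TTL lapses and release when finished. A minimal usage sketch against the methods shown in this diff; the key name and the do_streaming_work call are illustrative placeholders:

    import redis

    redis_client = redis.Redis(host="localhost", port=6379, db=0)
    lock = PersistentLock(redis_client, "lock:1:42:7:1", lock_timeout=120)  # illustrative key

    if lock.acquire():  # SET key token NX EX 120; returns False if another holder has it
        try:
            do_streaming_work()  # placeholder for the long-running work
            lock.refresh()       # extend the TTL while the work is still running
        finally:
            lock.release()       # always give the slot back
    else:
        print("slot busy; try the next stream/profile/index")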
@@ -10,7 +10,7 @@
     <meta name="theme-color" content="#000000" />
     <meta
       name="description"
-      content="Web site created using create-react-app"
+      content="IPTV Master Control"
     />
     <link rel="apple-touch-icon" href="%PUBLIC_URL%/logo192.png" />
     <!--

@@ -42,7 +42,7 @@

-    <title>React App</title>
+    <title>Dispatcharr</title>
   </head>
   <body>
     <noscript>You need to enable JavaScript to run this app.</noscript>