Websockets, fixed channel name collision, added back in multi-stream per channel support

This commit is contained in:
dekzter 2025-03-05 17:04:43 -05:00
parent 993ab0828f
commit 3ecb49375c
10 changed files with 122 additions and 52 deletions

1
.gitignore vendored
View file

@ -6,3 +6,4 @@ __pycache__/
node_modules/ node_modules/
.history/ .history/
staticfiles/ staticfiles/
static/

View file

@ -9,7 +9,7 @@ class User(AbstractUser):
""" """
avatar_config = models.JSONField(default=dict, blank=True, null=True) avatar_config = models.JSONField(default=dict, blank=True, null=True)
channel_groups = models.ManyToManyField( channel_groups = models.ManyToManyField(
'channels.ChannelGroup', # Updated reference to renamed model 'dispatcharr_channels.ChannelGroup', # Updated reference to renamed model
blank=True, blank=True,
related_name="users" related_name="users"
) )

View file

@ -4,7 +4,8 @@ class ChannelsConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField' default_auto_field = 'django.db.models.BigAutoField'
name = 'apps.channels' name = 'apps.channels'
verbose_name = "Channel & Stream Management" verbose_name = "Channel & Stream Management"
label = 'dispatcharr_channels'
def ready(self): def ready(self):
# Import signals so they get registered. # Import signals so they get registered.
import apps.channels.signals import apps.channels.signals

View file

@ -9,6 +9,8 @@ from django.conf import settings
from django.core.cache import cache from django.core.cache import cache
from .models import M3UAccount from .models import M3UAccount
from apps.channels.models import Stream from apps.channels.models import Stream
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -120,7 +122,7 @@ def refresh_single_m3u_account(account_id):
# Extract tvg-id # Extract tvg-id
tvg_id_match = re.search(r'tvg-id="([^"]*)"', line) tvg_id_match = re.search(r'tvg-id="([^"]*)"', line)
tvg_id = tvg_id_match.group(1) if tvg_id_match else "" tvg_id = tvg_id_match.group(1) if tvg_id_match else ""
fallback_name = line.split(",", 1)[-1].strip() if "," in line else "Default Stream" fallback_name = line.split(",", 1)[-1].strip() if "," in line else "Default Stream"
name = tvg_name_match.group(1) if tvg_name_match else fallback_name name = tvg_name_match.group(1) if tvg_name_match else fallback_name
@ -178,6 +180,14 @@ def refresh_single_m3u_account(account_id):
logger.info(f"Completed parsing. Created {created_count} new Streams, updated {updated_count} existing Streams, excluded {excluded_count} Streams.") logger.info(f"Completed parsing. Created {created_count} new Streams, updated {updated_count} existing Streams, excluded {excluded_count} Streams.")
release_lock('refresh_single_m3u_account', account_id) release_lock('refresh_single_m3u_account', account_id)
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
"updates",
{
"type": "m3u_refresh",
"message": {"success": True, "message": "M3U refresh completed successfully"}
},
)
return f"Account {account_id} => Created {created_count}, updated {updated_count}, excluded {excluded_count} Streams." return f"Account {account_id} => Created {created_count}, updated {updated_count}, excluded {excluded_count} Streams."
def process_uploaded_m3u_file(file, account): def process_uploaded_m3u_file(file, account):

View file

@ -37,7 +37,7 @@ def stream_view(request, stream_id):
""" """
try: try:
redis_host = getattr(settings, "REDIS_HOST", "localhost") redis_host = getattr(settings, "REDIS_HOST", "localhost")
redis_client = redis.Redis(host=settings.REDIS_HOST, port=6379, db=0) redis_client = redis.Redis(host=settings.REDIS_HOST, port=6379, db=getattr(settings, "REDIS_DB", "0"))
# Retrieve the channel by the provided stream_id. # Retrieve the channel by the provided stream_id.
channel = Channel.objects.get(channel_number=stream_id) channel = Channel.objects.get(channel_number=stream_id)
@ -48,57 +48,70 @@ def stream_view(request, stream_id):
logger.error("No streams found for channel ID=%s", channel.id) logger.error("No streams found for channel ID=%s", channel.id)
return HttpResponseServerError("No stream found for this channel.") return HttpResponseServerError("No stream found for this channel.")
# Get the first available stream. active_stream = None
stream = channel.streams.first() m3u_account = None
logger.debug("Using stream: ID=%s, Name=%s", stream.id, stream.name)
# Retrieve the M3U account associated with the stream.
m3u_account = stream.m3u_account
logger.debug("Using M3U account ID=%s, Name=%s", m3u_account.id, m3u_account.name)
# Use the custom URL if available; otherwise, use the standard URL.
input_url = stream.custom_url or stream.url
logger.debug("Input URL: %s", input_url)
# Determine which profile we can use.
m3u_profiles = m3u_account.profiles.all()
default_profile = next((obj for obj in m3u_profiles if obj.is_default), None)
profiles = [obj for obj in m3u_profiles if not obj.is_default]
active_profile = None active_profile = None
lock_key = None lock_key = None
persistent_lock = None persistent_lock = None
# -- Loop through profiles and pick the first active one --
for profile in [default_profile] + profiles:
logger.debug(f'Checking profile {profile.name}...')
if not profile.is_active:
logger.debug('Profile is not active, skipping.')
continue
logger.debug(f'Profile has a max streams of {profile.max_streams}, checking if any are available') streams = channel.streams.all().order_by('channelstream__order')
stream_index = 0 logger.debug(f'Found {len(streams)} streams for channel {channel.channel_number}')
while stream_index < profile.max_streams: for stream in streams:
stream_index += 1 # Get the first available stream.
logger.debug("Checking stream: ID=%s, Name=%s", stream.id, stream.name)
lock_key = f"lock:{profile.id}:{stream_index}" # Retrieve the M3U account associated with the stream.
persistent_lock = PersistentLock(redis_client, lock_key, lock_timeout=120) m3u_account = stream.m3u_account
logger.debug(f'Attempting to acquire lock: {lock_key}') logger.debug("Stream M3U account ID=%s, Name=%s", m3u_account.id, m3u_account.name)
if not persistent_lock.acquire(): # Use the custom URL if available; otherwise, use the standard URL.
logger.error(f"Could not acquire persistent lock for profile {profile.id} index {stream_index}, currently in use.") input_url = stream.custom_url or stream.url
persistent_lock = None logger.debug("Input URL: %s", input_url)
# Determine which profile we can use.
m3u_profiles = m3u_account.profiles.all()
default_profile = next((obj for obj in m3u_profiles if obj.is_default), None)
profiles = [obj for obj in m3u_profiles if not obj.is_default]
# -- Loop through profiles and pick the first active one --
for profile in [default_profile] + profiles:
logger.debug(f'Checking profile {profile.name}...')
if not profile.is_active:
logger.debug('Profile is not active, skipping.')
continue continue
break logger.debug(f'Profile has a max streams of {profile.max_streams}, checking if any are available')
stream_index = 0
while stream_index < profile.max_streams:
stream_index += 1
if persistent_lock is not None: lock_key = f"lock:{profile.id}:{stream_index}"
logger.debug(f'Successfully acquired lock: {lock_key}') persistent_lock = PersistentLock(redis_client, lock_key, lock_timeout=120)
active_profile = M3UAccountProfile.objects.get(id=profile.id) logger.debug(f'Attempting to acquire lock: {lock_key}')
break
if active_profile is None or persistent_lock is None: if not persistent_lock.acquire():
logger.exception("No available profiles for the stream") logger.error(f"Could not acquire persistent lock for profile {profile.id} index {stream_index}, currently in use.")
return HttpResponseServerError("No available profiles for the stream") persistent_lock = None
continue
break
if persistent_lock is not None:
logger.debug(f'Successfully acquired lock: {lock_key}')
active_profile = M3UAccountProfile.objects.get(id=profile.id)
break
if active_profile is None or persistent_lock is None:
logger.exception("No available profiles for the stream")
continue
logger.debug(f"Found available stream profile: stream={stream.name}, profile={profile.name}")
break
if not active_profile:
logger.exception("No available streams for this channel")
return HttpResponseServerError("No available streams for this channel")
logger.debug(f"Using M3U profile ID={active_profile.id} (ignoring viewer count limits)") logger.debug(f"Using M3U profile ID={active_profile.id} (ignoring viewer count limits)")
# Prepare the pattern replacement. # Prepare the pattern replacement.

View file

@ -1,8 +1,14 @@
"""
ASGI config for dispatcharr project.
"""
import os import os
from django.core.asgi import get_asgi_application from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
import dispatcharr.routing
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dispatcharr.settings') os.environ.setdefault("DJANGO_SETTINGS_MODULE", "dispatcharr.settings")
application = get_asgi_application()
application = ProtocolTypeRouter({
"http": get_asgi_application(),
"websocket": AuthMiddlewareStack(
URLRouter(dispatcharr.routing.websocket_urlpatterns)
),
})

18
dispatcharr/consumers.py Normal file
View file

@ -0,0 +1,18 @@
import json
from channels.generic.websocket import AsyncWebsocketConsumer
class MyWebSocketConsumer(AsyncWebsocketConsumer):
async def connect(self):
await self.accept()
self.room_name = "updates"
await self.channel_layer.group_add(self.room_name, self.channel_name)
async def disconnect(self, close_code):
await self.channel_layer.group_discard(self.room_name, self.channel_name)
async def receive(self, text_data):
data = json.loads(text_data)
print("Received:", data)
async def m3u_refresh(self, event):
await self.send(text_data=json.dumps(event))

6
dispatcharr/routing.py Normal file
View file

@ -0,0 +1,6 @@
from django.urls import path
from dispatcharr.consumers import MyWebSocketConsumer
websocket_urlpatterns = [
path("ws/", MyWebSocketConsumer.as_asgi()),
]

View file

@ -6,6 +6,7 @@ BASE_DIR = Path(__file__).resolve().parent.parent
SECRET_KEY = 'REPLACE_ME_WITH_A_REAL_SECRET' SECRET_KEY = 'REPLACE_ME_WITH_A_REAL_SECRET'
REDIS_HOST = os.environ.get("REDIS_HOST", "localhost") REDIS_HOST = os.environ.get("REDIS_HOST", "localhost")
REDIS_DB = os.environ.get("REDIS_DB", "localhost")
DEBUG = True DEBUG = True
ALLOWED_HOSTS = ["*"] ALLOWED_HOSTS = ["*"]
@ -13,7 +14,7 @@ ALLOWED_HOSTS = ["*"]
INSTALLED_APPS = [ INSTALLED_APPS = [
'apps.api', 'apps.api',
'apps.accounts', 'apps.accounts',
'apps.channels', 'apps.channels.apps.ChannelsConfig',
'apps.dashboard', 'apps.dashboard',
'apps.epg', 'apps.epg',
'apps.hdhr', 'apps.hdhr',
@ -21,6 +22,8 @@ INSTALLED_APPS = [
'apps.output', 'apps.output',
'core', 'core',
'drf_yasg', 'drf_yasg',
'daphne',
'channels',
'django.contrib.admin', 'django.contrib.admin',
'django.contrib.auth', 'django.contrib.auth',
'django.contrib.contenttypes', 'django.contrib.contenttypes',
@ -69,6 +72,15 @@ TEMPLATES = [
WSGI_APPLICATION = 'dispatcharr.wsgi.application' WSGI_APPLICATION = 'dispatcharr.wsgi.application'
ASGI_APPLICATION = 'dispatcharr.asgi.application' ASGI_APPLICATION = 'dispatcharr.asgi.application'
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [(REDIS_HOST, 6379, REDIS_DB)], # Ensure Redis is running
},
},
}
if os.getenv('DB_ENGINE', None) == 'sqlite': if os.getenv('DB_ENGINE', None) == 'sqlite':
DATABASES = { DATABASES = {
'default': { 'default': {

View file

@ -6,6 +6,7 @@ from django.views.generic import TemplateView, RedirectView
from rest_framework import permissions from rest_framework import permissions
from drf_yasg.views import get_schema_view from drf_yasg.views import get_schema_view
from drf_yasg import openapi from drf_yasg import openapi
from .routing import websocket_urlpatterns
# Define schema_view for Swagger # Define schema_view for Swagger
schema_view = get_schema_view( schema_view = get_schema_view(
@ -24,7 +25,7 @@ schema_view = get_schema_view(
urlpatterns = [ urlpatterns = [
# API Routes # API Routes
path('api/', include(('apps.api.urls', 'api'), namespace='api')), path('api/', include(('apps.api.urls', 'api'), namespace='api')),
path('api', RedirectView.as_view(url='/api/', permanent=True)), path('api', RedirectView.as_view(url='/api/', permanent=True)),
# Admin # Admin
path('admin', RedirectView.as_view(url='/admin/', permanent=True)), # This fixes the issue path('admin', RedirectView.as_view(url='/admin/', permanent=True)), # This fixes the issue
@ -52,6 +53,8 @@ urlpatterns = [
] + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT) ] + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)
urlpatterns += websocket_urlpatterns
# Serve static files for development (React's JS, CSS, etc.) # Serve static files for development (React's JS, CSS, etc.)
if settings.DEBUG: if settings.DEBUG:
urlpatterns += static(settings.STATIC_URL, document_root=settings.STATIC_ROOT) urlpatterns += static(settings.STATIC_URL, document_root=settings.STATIC_ROOT)