From 91a85020c35db9b86f9f0858d69372502424a92b Mon Sep 17 00:00:00 2001 From: dekzter Date: Thu, 27 Mar 2025 09:26:04 -0400 Subject: [PATCH 1/2] modifications to allow previewing of a raw stream --- apps/channels/models.py | 15 +++++++++++---- apps/channels/serializers.py | 3 ++- apps/proxy/ts_proxy/server.py | 18 +++++++++++++----- .../ts_proxy/services/channel_service.py | 7 +++++-- apps/proxy/ts_proxy/stream_manager.py | 6 ++---- apps/proxy/ts_proxy/url_utils.py | 19 +++++++++++++++++-- apps/proxy/ts_proxy/views.py | 7 ++++--- 7 files changed, 54 insertions(+), 21 deletions(-) diff --git a/apps/channels/models.py b/apps/channels/models.py index 04a9fd16..ceeae01e 100644 --- a/apps/channels/models.py +++ b/apps/channels/models.py @@ -133,15 +133,21 @@ class Stream(models.Model): stream = cls.objects.create(**fields_to_update) return stream, True # True means it was created + # @TODO: honor stream's stream profile + def get_stream_profile(self): + stream_profile = StreamProfile.objects.get(id=CoreSettings.get_default_stream_profile_id()) + + return stream_profile + def get_stream(self): """ Finds an available stream for the requested channel and returns the selected stream and profile. """ - profile_id = redis_client.get(f"stream_profile:{stream_id}") + profile_id = redis_client.get(f"stream_profile:{self.id}") if profile_id: profile_id = int(profile_id) - return profile_id + return self.id, profile_id # Retrieve the M3U account associated with the stream. m3u_account = self.m3u_account @@ -168,10 +174,10 @@ class Stream(models.Model): if profile.max_streams > 0: redis_client.incr(profile_connections_key) - return profile.id # Return newly assigned stream and matched profile + return self.id, profile.id # Return newly assigned stream and matched profile # 4. No available streams - return None + return None, None def release_stream(self): """ @@ -260,6 +266,7 @@ class Channel(models.Model): def __str__(self): return f"{self.channel_number} - {self.name}" + # @TODO: honor stream's stream profile def get_stream_profile(self): stream_profile = self.stream_profile if not stream_profile: diff --git a/apps/channels/serializers.py b/apps/channels/serializers.py index 65843dea..9f31eea2 100644 --- a/apps/channels/serializers.py +++ b/apps/channels/serializers.py @@ -14,7 +14,7 @@ class StreamSerializer(serializers.ModelSerializer): allow_null=True, required=False ) - read_only_fields = ['is_custom', 'm3u_account'] + read_only_fields = ['is_custom', 'm3u_account', 'stream_hash'] class Meta: model = Stream @@ -31,6 +31,7 @@ class StreamSerializer(serializers.ModelSerializer): 'stream_profile_id', 'is_custom', 'channel_group', + 'stream_hash', ] def get_fields(self): diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index 2dd923fd..282c5a74 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -17,7 +17,7 @@ import os import json from typing import Dict, Optional, Set from apps.proxy.config import TSConfig as Config -from apps.channels.models import Channel +from apps.channels.models import Channel, Stream from core.utils import redis_client as global_redis_client, redis_pubsub_client as global_redis_pubsub_client # Import both global Redis clients from redis.exceptions import ConnectionError, TimeoutError from .stream_manager import StreamManager @@ -740,12 +740,16 @@ class ProxyServer: # Force release resources in the Channel model try: - from apps.channels.models import Channel channel = Channel.objects.get(uuid=channel_id) channel.release_stream() logger.info(f"Released stream allocation for zombie channel {channel_id}") except Exception as e: - logger.error(f"Error releasing stream for zombie channel {channel_id}: {e}") + try: + stream = Stream.objects.get(stream_hash=channel_id) + stream.release_stream() + logger.info(f"Released stream allocation for zombie channel {channel_id}") + except Exception as e: + logger.error(f"Error releasing stream for zombie channel {channel_id}: {e}") return True except Exception as e: @@ -1067,8 +1071,12 @@ class ProxyServer: def _clean_redis_keys(self, channel_id): """Clean up all Redis keys for a channel more efficiently""" # Release the channel, stream, and profile keys from the channel - channel = Channel.objects.get(uuid=channel_id) - channel.release_stream() + try: + channel = Channel.objects.get(uuid=channel_id) + channel.release_stream() + except: + stream = Stream.objects.get(stream_hash=channel_id) + stream.release_stream() if not self.redis_client: return 0 diff --git a/apps/proxy/ts_proxy/services/channel_service.py b/apps/proxy/ts_proxy/services/channel_service.py index 210e4b0f..d10d5fed 100644 --- a/apps/proxy/ts_proxy/services/channel_service.py +++ b/apps/proxy/ts_proxy/services/channel_service.py @@ -248,8 +248,11 @@ class ChannelService: logger.info(f"Released channel {channel_id} stream allocation") model_released = True except Channel.DoesNotExist: - logger.warning(f"Could not find Channel model for UUID {channel_id}") - model_released = False + logger.warning(f"Could not find Channel model for UUID {channel_id}, attempting stream hash") + stream = Stream.objects.get(stream_hash=channel_id) + stream.release_stream() + logger.info(f"Released stream {channel_id} stream allocation") + model_released = True except Exception as e: logger.error(f"Error releasing channel stream: {e}") model_released = False diff --git a/apps/proxy/ts_proxy/stream_manager.py b/apps/proxy/ts_proxy/stream_manager.py index db0d6bb9..d2247135 100644 --- a/apps/proxy/ts_proxy/stream_manager.py +++ b/apps/proxy/ts_proxy/stream_manager.py @@ -17,7 +17,7 @@ from .utils import detect_stream_type, get_logger from .redis_keys import RedisKeys from .constants import ChannelState, EventType, StreamType, TS_PACKET_SIZE from .config_helper import ConfigHelper -from .url_utils import get_alternate_streams, get_stream_info_for_switch +from .url_utils import get_alternate_streams, get_stream_info_for_switch, get_stream_object logger = get_logger() @@ -304,7 +304,7 @@ class StreamManager: """Establish a connection using transcoding""" try: logger.debug(f"Building transcode command for channel {self.channel_id}") - channel = get_object_or_404(Channel, uuid=self.channel_id) + channel = get_stream_object(self.channel_id) # Use FFmpeg specifically for HLS streams if hasattr(self, 'force_ffmpeg') and self.force_ffmpeg: @@ -908,5 +908,3 @@ class StreamManager: except Exception as e: logger.error(f"Error trying next stream for channel {self.channel_id}: {e}", exc_info=True) return False - - diff --git a/apps/proxy/ts_proxy/url_utils.py b/apps/proxy/ts_proxy/url_utils.py index a0ed4476..927ea7e8 100644 --- a/apps/proxy/ts_proxy/url_utils.py +++ b/apps/proxy/ts_proxy/url_utils.py @@ -10,9 +10,20 @@ from apps.channels.models import Channel, Stream from apps.m3u.models import M3UAccount, M3UAccountProfile from core.models import UserAgent, CoreSettings from .utils import get_logger +from uuid import UUID logger = get_logger() +def get_stream_object(id: str): + try: + uuid_obj = UUID(id, version=4) + logger.info(f"Fetching channel ID {id}") + return get_object_or_404(Channel, uuid=id) + except: + # UUID check failed, assume stream hash + logger.info(f"Fetching stream hash {id}") + return get_object_or_404(Stream, stream_hash=id) + def generate_stream_url(channel_id: str) -> Tuple[str, str, bool]: """ Generate the appropriate stream URL for a channel based on its profile settings. @@ -24,7 +35,7 @@ def generate_stream_url(channel_id: str) -> Tuple[str, str, bool]: Tuple[str, str, bool]: (stream_url, user_agent, transcode_flag) """ # Get channel and related objects - channel = get_object_or_404(Channel, uuid=channel_id) + channel = get_stream_object(channel_id) stream_id, profile_id = channel.get_stream() if stream_id is None or profile_id is None: @@ -177,7 +188,11 @@ def get_alternate_streams(channel_id: str, current_stream_id: Optional[int] = No """ try: # Get channel object - channel = get_object_or_404(Channel, uuid=channel_id) + channel = get_stream_object(channel_id) + if isinstance(channel, Stream): + logger.error(f"Stream is not a channel") + return [] + logger.debug(f"Looking for alternate streams for channel {channel_id}, current stream ID: {current_stream_id}") # Get all assigned streams for this channel diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py index fe87e677..3ba89123 100644 --- a/apps/proxy/ts_proxy/views.py +++ b/apps/proxy/ts_proxy/views.py @@ -21,8 +21,9 @@ from rest_framework.permissions import IsAuthenticated from .constants import ChannelState, EventType, StreamType from .config_helper import ConfigHelper from .services.channel_service import ChannelService -from .url_utils import generate_stream_url, transform_url, get_stream_info_for_switch +from .url_utils import generate_stream_url, transform_url, get_stream_info_for_switch, get_stream_object from .utils import get_logger +from uuid import UUID logger = get_logger() @@ -30,9 +31,9 @@ logger = get_logger() @api_view(['GET']) def stream_ts(request, channel_id): """Stream TS data to client with immediate response and keep-alive packets during initialization""" + channel = get_stream_object(channel_id) + client_user_agent = None - logger.info(f"Fetching channel ID {channel_id}") - channel = get_object_or_404(Channel, uuid=channel_id) try: # Generate a unique client ID From d7f96005b88156640cc7be24f3ea950e8ae94b12 Mon Sep 17 00:00:00 2001 From: dekzter Date: Thu, 27 Mar 2025 09:27:45 -0400 Subject: [PATCH 2/2] stream preview functionality --- .../src/components/tables/StreamsTable.jsx | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/frontend/src/components/tables/StreamsTable.jsx b/frontend/src/components/tables/StreamsTable.jsx index 8622752c..19b15aa4 100644 --- a/frontend/src/components/tables/StreamsTable.jsx +++ b/frontend/src/components/tables/StreamsTable.jsx @@ -44,6 +44,8 @@ import { IconSquarePlus, } from '@tabler/icons-react'; import { useNavigate } from 'react-router-dom'; +import useSettingsStore from '../../store/settings'; +import useVideoStore from '../../store/useVideoStore'; const StreamsTable = ({}) => { const theme = useMantineTheme(); @@ -91,6 +93,10 @@ const StreamsTable = ({}) => { const channelSelectionStreams = useChannelsStore( (state) => state.channels[state.channelsPageSelection[0]?.id]?.streams ); + const { + environment: { env_mode }, + } = useSettingsStore(); + const { showVideo } = useVideoStore(); const isMoreActionsOpen = Boolean(moreActionsAnchorEl); @@ -432,6 +438,14 @@ const StreamsTable = ({}) => { setPagination(updater); }; + function handleWatchStream(streamHash) { + let vidUrl = `/proxy/ts/stream/${streamHash}`; + if (env_mode == 'dev') { + vidUrl = `${window.location.protocol}//${window.location.hostname}:5656${vidUrl}`; + } + showVideo(vidUrl); + } + const table = useMantineReactTable({ ...TableHelper.defaultProperties, columns, @@ -565,6 +579,11 @@ const StreamsTable = ({}) => { > Delete Stream + handleWatchStream(row.original.stream_hash)} + > + Preview Stream +