diff --git a/apps/channels/models.py b/apps/channels/models.py index 5ba5974e..54f8b11a 100644 --- a/apps/channels/models.py +++ b/apps/channels/models.py @@ -109,7 +109,7 @@ class Channel(models.Model): def get_stream_profile(self): stream_profile = self.stream_profile if not stream_profile: - stream_profile = StreamProfile.objects.get(id=CoreSettings.objects.get(key="default-stream-profile").value) + stream_profile = CoreSettings.get_default_stream_profile() return stream_profile diff --git a/apps/proxy/ts_proxy/urls.py b/apps/proxy/ts_proxy/urls.py index 743f0d8f..0f88e4ff 100644 --- a/apps/proxy/ts_proxy/urls.py +++ b/apps/proxy/ts_proxy/urls.py @@ -4,7 +4,7 @@ from . import views app_name = 'ts_proxy' urlpatterns = [ - path('stream/', views.stream_ts, name='stream'), + # path('stream/', views.stream_ts, name='stream'), path('change_stream/', views.change_stream, name='change_stream'), path('status', views.channel_status, name='channel_status'), path('status/', views.channel_status, name='channel_status_detail'), diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py index 4dc449cb..34ab09e3 100644 --- a/apps/proxy/ts_proxy/views.py +++ b/apps/proxy/ts_proxy/views.py @@ -5,7 +5,7 @@ import random import sys import os import re -from django.http import StreamingHttpResponse, JsonResponse +from django.http import StreamingHttpResponse, JsonResponse, HttpResponseRedirect from django.views.decorators.csrf import csrf_exempt from django.views.decorators.http import require_http_methods, require_GET from django.shortcuts import get_object_or_404 @@ -14,7 +14,7 @@ from .server import ProxyServer from apps.proxy.ts_proxy.server import logging as server_logging from apps.channels.models import Channel, Stream from apps.m3u.models import M3UAccount, M3UAccountProfile -from core.models import UserAgent +from core.models import UserAgent, CoreSettings # Configure logging properly to ensure visibility logger = server_logging @@ -83,6 +83,8 @@ def stream_ts(request, channel_id): # Load in the user-agent for the account m3u_account = M3UAccount.objects.get(id=profile.m3u_account.id) user_agent = UserAgent.objects.get(id=m3u_account.user_agent.id).user_agent + if user_agent is None: + user_agent = CoreSettings.get_default_user_agent() # Generate stream URL based on the selected profile input_url = stream.custom_url or stream.url @@ -95,19 +97,22 @@ def stream_ts(request, channel_id): logger.debug(f"Generated stream url: {stream_url}") # Generate transcode command - # @TODO: once complete, provide option to direct proxy - transcode_cmd = channel.get_stream_profile().build_command(stream_url) + stream_profile = channel.get_stream_profile() + if stream_profile.is_redirect(): + return HttpResponseRedirect(stream_url) + + transcode_cmd = stream_profile.build_command(stream_url, user_agent or "") if not initialize_stream(channel_id, stream_url, user_agent, transcode_cmd): return JsonResponse({'error': 'Failed to initialize channel'}, status=500) - if user_agent is None: - # Extract if not set - for header in ['HTTP_USER_AGENT', 'User-Agent', 'user-agent']: - if header in request.META: - user_agent = request.META[header] - logger.debug(f"[{client_id}] Found user agent in header: {header}") - break + # Extract user agent from client + user_agent = None + for header in ['HTTP_USER_AGENT', 'User-Agent', 'user-agent']: + if header in request.META: + user_agent = request.META[header] + logger.debug(f"[{client_id}] Found user agent in header: {header}") + break # Wait for channel to become ready if it's initializing if proxy_server.redis_client: @@ -581,81 +586,81 @@ def channel_status(request, channel_id=None): # Check if Redis is available if not proxy_server.redis_client: return JsonResponse({'error': 'Redis connection not available'}, status=500) - + # Function for detailed channel info (used when channel_id is provided) def get_detailed_channel_info(channel_id): # Get channel metadata metadata_key = f"ts_proxy:channel:{channel_id}:metadata" metadata = proxy_server.redis_client.hgetall(metadata_key) - + if not metadata: return None - + # Get detailed info - existing implementation # Get channel metadata metadata_key = f"ts_proxy:channel:{channel_id}:metadata" metadata = proxy_server.redis_client.hgetall(metadata_key) - + if not metadata: return None - + # Basic channel info buffer_index_value = proxy_server.redis_client.get(f"ts_proxy:channel:{channel_id}:buffer:index") - + info = { 'channel_id': channel_id, 'state': metadata.get(b'state', b'unknown').decode('utf-8'), 'url': metadata.get(b'url', b'').decode('utf-8'), 'created_at': metadata.get(b'created_at', b'0').decode('utf-8'), 'owner': metadata.get(b'owner', b'unknown').decode('utf-8'), - + # Properly decode the buffer index value 'buffer_index': int(buffer_index_value.decode('utf-8')) if buffer_index_value else 0, } - + # Add timing information if b'state_changed_at' in metadata: state_changed_at = float(metadata[b'state_changed_at'].decode('utf-8')) info['state_changed_at'] = state_changed_at info['state_duration'] = time.time() - state_changed_at - + if b'created_at' in metadata: created_at = float(metadata[b'created_at'].decode('utf-8')) info['created_at'] = created_at info['uptime'] = time.time() - created_at - + # Get client information client_set_key = f"ts_proxy:channel:{channel_id}:clients" client_ids = proxy_server.redis_client.smembers(client_set_key) clients = [] - + for client_id in client_ids: client_id_str = client_id.decode('utf-8') client_key = f"ts_proxy:channel:{channel_id}:clients:{client_id_str}" client_data = proxy_server.redis_client.hgetall(client_key) - + if client_data: client_info = { 'client_id': client_id_str, 'user_agent': client_data.get(b'user_agent', b'unknown').decode('utf-8'), 'worker_id': client_data.get(b'worker_id', b'unknown').decode('utf-8'), } - + if b'connected_at' in client_data: connected_at = float(client_data[b'connected_at'].decode('utf-8')) client_info['connected_at'] = connected_at client_info['connection_duration'] = time.time() - connected_at - + if b'last_active' in client_data: last_active = float(client_data[b'last_active'].decode('utf-8')) client_info['last_active'] = last_active client_info['last_active_ago'] = time.time() - last_active - + clients.append(client_info) - + info['clients'] = clients info['client_count'] = len(clients) - + # Get buffer health with improved diagnostics buffer_stats = { 'chunks': info['buffer_index'], @@ -669,11 +674,11 @@ def channel_status(request, channel_id=None): chunk_sizes = [] chunk_keys_found = [] chunk_keys_missing = [] - + # Check if the keys exist before getting for i in range(info['buffer_index']-sample_chunks+1, info['buffer_index']+1): chunk_key = f"ts_proxy:channel:{channel_id}:buffer:chunk:{i}" - + # Check if key exists first if proxy_server.redis_client.exists(chunk_key): chunk_data = proxy_server.redis_client.get(chunk_key) @@ -681,11 +686,11 @@ def channel_status(request, channel_id=None): chunk_size = len(chunk_data) chunk_sizes.append(chunk_size) chunk_keys_found.append(i) - + # Check for TS alignment (packets are 188 bytes) ts_packets = chunk_size // 188 ts_aligned = chunk_size % 188 == 0 - + # Add for first chunk only to avoid too much data if len(chunk_keys_found) == 1: buffer_stats['diagnostics']['first_chunk'] = { @@ -697,18 +702,18 @@ def channel_status(request, channel_id=None): } else: chunk_keys_missing.append(i) - - # Add detailed diagnostics + + # Add detailed diagnostics if chunk_sizes: buffer_stats['avg_chunk_size'] = sum(chunk_sizes) / len(chunk_sizes) - buffer_stats['recent_chunk_sizes'] = chunk_sizes + buffer_stats['recent_chunk_sizes'] = chunk_sizes buffer_stats['keys_found'] = chunk_keys_found buffer_stats['keys_missing'] = chunk_keys_missing - + # Calculate data rate total_data = sum(chunk_sizes) buffer_stats['total_sample_bytes'] = total_data - + # Add TS packet analysis total_ts_packets = total_data // 188 buffer_stats['estimated_ts_packets'] = total_ts_packets @@ -719,29 +724,29 @@ def channel_status(request, channel_id=None): cursor = 0 buffer_key_pattern = f"ts_proxy:channel:{channel_id}:buffer:chunk:*" - + while True: cursor, keys = proxy_server.redis_client.scan(cursor, match=buffer_key_pattern, count=100) if keys: all_buffer_keys.extend([k.decode('utf-8') for k in keys]) if cursor == 0 or len(all_buffer_keys) >= 20: # Limit to 20 keys break - + buffer_stats['diagnostics']['all_buffer_keys'] = all_buffer_keys[:20] # First 20 keys buffer_stats['diagnostics']['total_buffer_keys'] = len(all_buffer_keys) - + except Exception as e: # Capture any errors for diagnostics buffer_stats['error'] = str(e) buffer_stats['diagnostics']['exception'] = str(e) - + # Add TTL information to see if chunks are expiring - chunk_ttl_key = f"ts_proxy:channel:{channel_id}:buffer:chunk:{info['buffer_index']}" + chunk_ttl_key = f"ts_proxy:channel:{channel_id}:buffer:chunk:{info['buffer_index']}" chunk_ttl = proxy_server.redis_client.ttl(chunk_ttl_key) buffer_stats['latest_chunk_ttl'] = chunk_ttl info['buffer_stats'] = buffer_stats - + # Get local worker info if available if channel_id in proxy_server.stream_managers: manager = proxy_server.stream_managers[channel_id] @@ -751,29 +756,29 @@ def channel_status(request, channel_id=None): 'last_data_time': manager.last_data_time, 'last_data_age': time.time() - manager.last_data_time } - + return info - + # Function for basic channel info (used for all channels summary) def get_basic_channel_info(channel_id): # Get channel metadata metadata_key = f"ts_proxy:channel:{channel_id}:metadata" metadata = proxy_server.redis_client.hgetall(metadata_key) - + if not metadata: return None - + # Basic channel info only - omit diagnostics and details buffer_index_value = proxy_server.redis_client.get(f"ts_proxy:channel:{channel_id}:buffer:index") - + # Count clients (using efficient count method) client_set_key = f"ts_proxy:channel:{channel_id}:clients" client_count = proxy_server.redis_client.scard(client_set_key) or 0 - + # Calculate uptime created_at = float(metadata.get(b'init_time', b'0').decode('utf-8')) uptime = time.time() - created_at if created_at > 0 else 0 - + # Simplified info info = { 'channel_id': channel_id, @@ -784,7 +789,7 @@ def channel_status(request, channel_id=None): 'client_count': client_count, 'uptime': uptime } - + # Quick health check if available locally if channel_id in proxy_server.stream_managers: manager = proxy_server.stream_managers[channel_id] @@ -801,31 +806,31 @@ def channel_status(request, channel_id=None): for client_id in list(client_ids)[:10]: client_id_str = client_id.decode('utf-8') client_key = f"ts_proxy:channel:{channel_id}:clients:{client_id_str}" - + # Efficient way - just retrieve the essentials client_info = { 'client_id': client_id_str, 'user_agent': proxy_server.redis_client.hget(client_key, 'user_agent') } - + if client_info['user_agent']: client_info['user_agent'] = client_info['user_agent'].decode('utf-8') else: client_info['user_agent'] = 'unknown' - + # Just get connected_at for client age connected_at_bytes = proxy_server.redis_client.hget(client_key, 'connected_at') if connected_at_bytes: connected_at = float(connected_at_bytes.decode('utf-8')) client_info['connected_since'] = time.time() - connected_at - + clients.append(client_info) # Add clients to info info['clients'] = clients - + return info - + # Handle single channel or all channels if channel_id: # Detailed info for specific channel @@ -838,7 +843,7 @@ def channel_status(request, channel_id=None): # Basic info for all channels channel_pattern = "ts_proxy:channel:*:metadata" all_channels = [] - + # Extract channel IDs from keys cursor = 0 while True: @@ -850,12 +855,12 @@ def channel_status(request, channel_id=None): channel_info = get_basic_channel_info(ch_id) if channel_info: all_channels.append(channel_info) - + if cursor == 0: break - + return JsonResponse({'channels': all_channels, 'count': len(all_channels)}) - + except Exception as e: logger.error(f"Error in channel_status: {e}", exc_info=True) return JsonResponse({'error': str(e)}, status=500) diff --git a/core/migrations/0005_streamprofile_locked_alter_streamprofile_command_and_more.py b/core/migrations/0005_streamprofile_locked_alter_streamprofile_command_and_more.py new file mode 100644 index 00000000..71f7de0b --- /dev/null +++ b/core/migrations/0005_streamprofile_locked_alter_streamprofile_command_and_more.py @@ -0,0 +1,34 @@ +# Generated by Django 5.1.6 on 2025-03-14 18:12 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0004_preload_core_settings'), + ] + + operations = [ + migrations.AddField( + model_name='streamprofile', + name='locked', + field=models.BooleanField(default=False, help_text="Protected - can't be deleted or modified"), + ), + migrations.AlterField( + model_name='streamprofile', + name='command', + field=models.CharField(blank=True, help_text="Command to execute (e.g., 'yt.sh', 'streamlink', or 'vlc')", max_length=255), + ), + migrations.AlterField( + model_name='streamprofile', + name='parameters', + field=models.TextField(blank=True, help_text='Command-line parameters. Use {userAgent} and {streamUrl} as placeholders.'), + ), + migrations.AlterField( + model_name='streamprofile', + name='user_agent', + field=models.ForeignKey(blank=True, help_text='Optional user agent to use. If not set, you can fall back to a default.', null=True, on_delete=django.db.models.deletion.SET_NULL, to='core.useragent'), + ), + ] diff --git a/core/migrations/0006_set_locked_stream_profiles.py b/core/migrations/0006_set_locked_stream_profiles.py new file mode 100644 index 00000000..1e390c9b --- /dev/null +++ b/core/migrations/0006_set_locked_stream_profiles.py @@ -0,0 +1,55 @@ +# Generated by Django 5.1.6 on 2025-03-14 17:46 + +from django.db import migrations, models +from core.models import CoreSettings, StreamProfile + +def lock_or_create_profiles(apps, schema_editor): + StreamProfile = apps.get_model("core", "StreamProfile") + UserAgent = apps.get_model("core", "UserAgent") + + # Define the system profiles that should exist + system_profiles = [ + { + "name": "ffmpeg", + "command": "ffmpeg", + "parameters": "-i {streamUrl} -c:v copy -c:a copy -f mpegts pipe:1", + }, + { + "name": "streamlink", + "command": "streamlink", + "parameters": "{streamUrl} best --stdout", + }, + ] + + for profile_data in system_profiles: + existing_profile = StreamProfile.objects.filter( + profile_name=profile_data["name"], + command=profile_data["command"], + parameters=profile_data["parameters"], + ).first() + + if existing_profile: + # Lock existing profile + existing_profile.locked = True + existing_profile.save() + else: + # Create a new locked profile + new_profile = StreamProfile.objects.create( + profile_name=profile_data["name"], + command=profile_data["command"], + parameters=profile_data["parameters"], + locked=True, + ) + +def reverse_migration(apps, schema_editor): + # No need to reverse changes + pass + +class Migration(migrations.Migration): + dependencies = [ + ('core', '0005_streamprofile_locked_alter_streamprofile_command_and_more'), + ] + + operations = [ + migrations.RunPython(lock_or_create_profiles, reverse_code=reverse_migration), + ] diff --git a/core/migrations/0007_create_proxy_and_redirect_stream_profiles.py b/core/migrations/0007_create_proxy_and_redirect_stream_profiles.py new file mode 100644 index 00000000..cf10496a --- /dev/null +++ b/core/migrations/0007_create_proxy_and_redirect_stream_profiles.py @@ -0,0 +1,32 @@ +# Generated by Django 5.1.6 on 2025-03-14 17:16 + +from django.db import migrations + +def create_proxy_stream_profile(apps, schema_editor): + StreamProfile = apps.get_model("core", "StreamProfile") + StreamProfile.objects.create( + profile_name="Proxy", + command="", + parameters="", + locked=True, + is_active=True, + user_agent="1", + ) + + StreamProfile.objects.create( + profile_name="Redirect", + command="", + parameters="", + locked=True, + is_active=True, + user_agent="1", + ) + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0006_set_locked_stream_profiles'), + ] + + operations = [ + ] diff --git a/core/models.py b/core/models.py index 2309fdf4..cae92f46 100644 --- a/core/models.py +++ b/core/models.py @@ -1,5 +1,6 @@ # core/models.py from django.db import models +from django.utils.text import slugify class UserAgent(models.Model): user_agent_name = models.CharField( @@ -27,35 +28,111 @@ class UserAgent(models.Model): def __str__(self): return self.user_agent_name +PROXY_PROFILE = 'Proxy' +REDIRECT_PROFILE = 'Redirect' + class StreamProfile(models.Model): profile_name = models.CharField(max_length=255, help_text="Name of the stream profile") command = models.CharField( max_length=255, - help_text="Command to execute (e.g., 'yt.sh', 'streamlink', or 'vlc')" + help_text="Command to execute (e.g., 'yt.sh', 'streamlink', or 'vlc')", + blank=True ) parameters = models.TextField( - help_text="Command-line parameters. Use {userAgent} and {streamUrl} as placeholders." + help_text="Command-line parameters. Use {userAgent} and {streamUrl} as placeholders.", + blank=True + ) + locked = models.BooleanField( + default=False, + help_text="Protected - can't be deleted or modified" ) is_active = models.BooleanField(default=True, help_text="Whether this profile is active") - user_agent = models.CharField( - max_length=512, - blank=True, + user_agent = models.ForeignKey( + "UserAgent", + on_delete=models.SET_NULL, null=True, + blank=True, help_text="Optional user agent to use. If not set, you can fall back to a default." ) def __str__(self): return self.profile_name - def build_command(self, stream_url): - cmd = [] - if self.command == "ffmpeg": - cmd = ["ffmpeg", "-i", stream_url] + self.parameters.split() + ["pipe:1"] - elif self.command == "streamlink": - cmd = ["streamlink", stream_url] + self.parameters.split() + def delete(self): + if self.locked(): + raise ValueError("This profile is locked and cannot be deleted.") + + self.delete() + + def save(self, *args, **kwargs): + if self.pk: # Only check existing records + orig = StreamProfile.objects.get(pk=self.pk) + if orig.is_protected: + allowed_fields = {"user_agent_id"} # Only allow this field to change + for field in self._meta.fields: + field_name = field.name + + # Convert user_agent to user_agent_id for comparison + orig_value = getattr(orig, field_name) + new_value = getattr(self, field_name) + + # Ensure that ForeignKey fields compare their ID values + if isinstance(orig_value, models.Model): + orig_value = orig_value.pk + if isinstance(new_value, models.Model): + new_value = new_value.pk + + if field_name not in allowed_fields and orig_value != new_value: + raise ValidationError(f"Cannot modify {field_name} on a protected profile.") + + super().save(*args, **kwargs) + + @classmethod + def update(cls, pk, **kwargs): + instance = cls.objects.get(pk=pk) + + if instance.is_protected: + allowed_fields = {"user_agent_id"} # Only allow updating this field + + for field_name, new_value in kwargs.items(): + if field_name not in allowed_fields: + raise ValidationError(f"Cannot modify {field_name} on a protected profile.") + + # Ensure user_agent ForeignKey updates correctly + if field_name == "user_agent" and isinstance(new_value, cls._meta.get_field("user_agent").related_model): + new_value = new_value.pk # Convert object to ID if needed + + setattr(instance, field_name, new_value) + + instance.save() + return instance + + def is_proxy(self): + if self.locked and self.name == PROXY_PROFILE: + return True + return False + + def is_redirect(self): + if self.locked and self.name == REDIRECT_PROFILE: + return True + return False + + def build_command(self, stream_url, user_agent): + if self.is_proxy(): + return [] + + replacements = { + "{streamUrl}": stream_url, + "{userAgent}": user_agent, + } + + cmd = [self.command] + [replacements.get(part, part) for part in self.parameters.split()] return cmd +DEFAULT_USER_AGENT_KEY= slugify("Default User-Agent") +DEFAULT_STREAM_PROFILE_KEY = slugify("Default Stream Profile") + class CoreSettings(models.Model): key = models.CharField( max_length=255, @@ -70,3 +147,14 @@ class CoreSettings(models.Model): def __str__(self): return "Core Settings" + + @classmethod + def get_default_user_agent(cls): + """Retrieve a system profile by name (or return None if not found).""" + default_ua_id = cls.objects.get(key=DEFAULT_USER_AGENT_KEY).value + return UserAgent.objects.get(id=default_ua_id) + + @classmethod + def get_default_stream_profile(cls): + default_sp_id = cls.objects.get(key=DEFAULT_STREAM_PROFILE_KEY).value + return StreamProfile.objects.get(id=default_sp_id) diff --git a/core/utils.py b/core/utils.py index 07a51ff8..149fcc8f 100644 --- a/core/utils.py +++ b/core/utils.py @@ -3,7 +3,7 @@ from django.conf import settings # Global Redis connection (Singleton) redis_client = redis.Redis( - host=getattr(settings, "REDIS_HOST", "localhost"), + host=settings.REDIS_HOST, port=6379, - db=int(getattr(settings, "REDIS_DB", "0")) + db=settings.REDIS_DB )