diff --git a/.github/workflows/docker-build.yml b/.github/workflows/docker-build.yml new file mode 100644 index 00000000..46f7ee8b --- /dev/null +++ b/.github/workflows/docker-build.yml @@ -0,0 +1,55 @@ +name: Build and Push Multi-Arch Docker Image + +on: + push: + branches: + - main + - dev + +jobs: + build-and-push: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Log in to GitHub Container Registry + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Determine image tag and branch + id: set-tag-branch + run: | + if [[ "${{ github.ref }}" == "refs/heads/main" ]]; then + echo "TAG=latest" >> $GITHUB_ENV + echo "BRANCH=main" >> $GITHUB_ENV + elif [[ "${{ github.ref }}" == "refs/heads/dev" ]]; then + echo "TAG=dev" >> $GITHUB_ENV + echo "BRANCH=dev" >> $GITHUB_ENV + fi + + - name: Convert repository name to lowercase + run: echo "REPO_NAME=$(echo '${{ github.repository }}' | tr '[:upper:]' '[:lower:]')" >> $GITHUB_ENV + + - name: Build and push Docker image + uses: docker/build-push-action@v5 + with: + context: docker + file: docker/Dockerfile + push: true + platforms: linux/amd64,linux/arm64 + build-args: | + BRANCH=${{ env.BRANCH }} + tags: | + ghcr.io/${{ env.REPO_NAME }}:${{ env.TAG }} + ghcr.io/${{ env.REPO_NAME }}:${{ github.sha }} diff --git a/apps/channels/admin.py b/apps/channels/admin.py index dd7e09a1..302811af 100644 --- a/apps/channels/admin.py +++ b/apps/channels/admin.py @@ -6,13 +6,16 @@ class StreamAdmin(admin.ModelAdmin): list_display = ( 'id', # Primary Key 'name', - 'group_name', + 'channel_group', 'url', 'current_viewers', 'updated_at', ) - list_filter = ('group_name',) - search_fields = ('id', 'name', 'url', 'group_name') # Added 'id' for searching by ID + + list_filter = ('channel_group',) # Filter by 'channel_group' (foreign key) + + search_fields = ('id', 'name', 'url', 'channel_group__name') # Search by 'ChannelGroup' name + ordering = ('-updated_at',) @admin.register(Channel) diff --git a/apps/channels/api_views.py b/apps/channels/api_views.py index 9744f7f0..579d9dfa 100644 --- a/apps/channels/api_views.py +++ b/apps/channels/api_views.py @@ -23,14 +23,14 @@ class StreamPagination(PageNumberPagination): class StreamFilter(django_filters.FilterSet): name = django_filters.CharFilter(lookup_expr='icontains') - group_name = django_filters.CharFilter(lookup_expr='icontains') + channel_group_name = django_filters.CharFilter(field_name="channel_group__name", lookup_expr="icontains") m3u_account = django_filters.NumberFilter(field_name="m3u_account__id") m3u_account_name = django_filters.CharFilter(field_name="m3u_account__name", lookup_expr="icontains") m3u_account_is_active = django_filters.BooleanFilter(field_name="m3u_account__is_active") class Meta: model = Stream - fields = ['name', 'group_name', 'm3u_account', 'm3u_account_name', 'm3u_account_is_active'] + fields = ['name', 'channel_group_name', 'm3u_account', 'm3u_account_name', 'm3u_account_is_active'] # ───────────────────────────────────────────────────────── # 1) Stream API (CRUD) @@ -43,20 +43,27 @@ class StreamViewSet(viewsets.ModelViewSet): filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter] filterset_class = StreamFilter - search_fields = ['name', 'group_name'] - ordering_fields = ['name', 'group_name'] + search_fields = ['name', 'channel_group__name'] + ordering_fields = ['name', 'channel_group__name'] ordering = ['-name'] def get_queryset(self): qs = super().get_queryset() # Exclude streams from inactive M3U accounts qs = qs.exclude(m3u_account__is_active=False) + assigned = self.request.query_params.get('assigned') if assigned is not None: qs = qs.filter(channels__id=assigned) + unassigned = self.request.query_params.get('unassigned') if unassigned == '1': qs = qs.filter(channels__isnull=True) + + channel_group = self.request.query_params.get('channel_group') + if channel_group: + qs = qs.filter(channel_group__name=channel_group) + return qs @action(detail=False, methods=['get'], url_path='ids') @@ -75,7 +82,8 @@ class StreamViewSet(viewsets.ModelViewSet): @action(detail=False, methods=['get'], url_path='groups') def get_groups(self, request, *args, **kwargs): - group_names = Stream.objects.exclude(group_name__isnull=True).exclude(group_name="").order_by().values_list('group_name', flat=True).distinct() + # Get unique ChannelGroup names that are linked to streams + group_names = ChannelGroup.objects.filter(streams__isnull=False).order_by('name').values_list('name', flat=True).distinct() # Return the response with the list of unique group names return Response(list(group_names)) @@ -158,7 +166,7 @@ class ChannelViewSet(viewsets.ModelViewSet): if not stream_id: return Response({"error": "Missing stream_id"}, status=status.HTTP_400_BAD_REQUEST) stream = get_object_or_404(Stream, pk=stream_id) - channel_group, _ = ChannelGroup.objects.get_or_create(name=stream.group_name) + channel_group = stream.channel_group # Check if client provided a channel_number; if not, auto-assign one. provided_number = request.data.get('channel_number') @@ -254,7 +262,7 @@ class ChannelViewSet(viewsets.ModelViewSet): errors.append({"item": item, "error": str(e)}) continue - channel_group, _ = ChannelGroup.objects.get_or_create(name=stream.group_name) + channel_group = stream.channel_group # Determine channel number: if provided, use it (if free); else auto assign. provided_number = item.get('channel_number') diff --git a/apps/channels/forms.py b/apps/channels/forms.py index baf169af..bee073b6 100644 --- a/apps/channels/forms.py +++ b/apps/channels/forms.py @@ -42,5 +42,5 @@ class StreamForm(forms.ModelForm): 'logo_url', 'tvg_id', 'local_file', - 'group_name', + 'channel_group', ] diff --git a/apps/channels/migrations/0005_stream_channel_group_stream_last_seen_and_more.py b/apps/channels/migrations/0005_stream_channel_group_stream_last_seen_and_more.py new file mode 100644 index 00000000..61a95220 --- /dev/null +++ b/apps/channels/migrations/0005_stream_channel_group_stream_last_seen_and_more.py @@ -0,0 +1,44 @@ +# Generated by Django 5.1.6 on 2025-03-19 16:33 + +import datetime +import django.db.models.deletion +import uuid +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('dispatcharr_channels', '0004_stream_is_custom'), + ('m3u', '0003_create_custom_account'), + ] + + operations = [ + migrations.AddField( + model_name='stream', + name='channel_group', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='streams', to='dispatcharr_channels.channelgroup'), + ), + migrations.AddField( + model_name='stream', + name='last_seen', + field=models.DateTimeField(db_index=True, default=datetime.datetime.now), + ), + migrations.AlterField( + model_name='channel', + name='uuid', + field=models.UUIDField(db_index=True, default=uuid.uuid4, editable=False, unique=True), + ), + migrations.CreateModel( + name='ChannelGroupM3UAccount', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('enabled', models.BooleanField(default=True)), + ('channel_group', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='m3u_account', to='dispatcharr_channels.channelgroup')), + ('m3u_account', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='channel_group', to='m3u.m3uaccount')), + ], + options={ + 'unique_together': {('channel_group', 'm3u_account')}, + }, + ), + ] diff --git a/apps/channels/migrations/0006_migrate_stream_groups.py b/apps/channels/migrations/0006_migrate_stream_groups.py new file mode 100644 index 00000000..94fc8235 --- /dev/null +++ b/apps/channels/migrations/0006_migrate_stream_groups.py @@ -0,0 +1,51 @@ +# In your app's migrations folder, create a new migration file +# e.g., migrations/000X_migrate_channel_group_to_foreign_key.py + +from django.db import migrations + +def migrate_channel_group(apps, schema_editor): + Stream = apps.get_model('dispatcharr_channels', 'Stream') + ChannelGroup = apps.get_model('dispatcharr_channels', 'ChannelGroup') + ChannelGroupM3UAccount = apps.get_model('dispatcharr_channels', 'ChannelGroup') + M3UAccount = apps.get_model('m3u', 'M3UAccount') + + streams_to_update = [] + for stream in Stream.objects.all(): + # If the stream has a 'channel_group' string, try to find or create the ChannelGroup + if stream.group_name: # group_name holds the channel group string + channel_group_name = stream.group_name.strip() + + # Try to find the ChannelGroup by name + channel_group, created = ChannelGroup.objects.get_or_create(name=channel_group_name) + + # Set the foreign key to the found or newly created ChannelGroup + stream.channel_group = channel_group + + streams_to_update.append(stream) + + # If the stream has an M3U account, ensure the M3U account is linked + if stream.m3u_account: + ChannelGroupM3UAccount.objects.get_or_create( + channel_group=channel_group, + m3u_account=stream.m3u_account, + enabled=True # Or set it to whatever the default logic is + ) + + Stream.objects.bulk_update(streams_to_update, ['channel_group']) + +def reverse_migration(apps, schema_editor): + # This reverse migration would undo the changes, setting `channel_group` to `None` and clearing any relationships. + Stream = apps.get_model('yourapp', 'Stream') + for stream in Stream.objects.all(): + stream.channel_group = None + stream.save() + +class Migration(migrations.Migration): + + dependencies = [ + ('dispatcharr_channels', '0005_stream_channel_group_stream_last_seen_and_more'), + ] + + operations = [ + migrations.RunPython(migrate_channel_group, reverse_code=reverse_migration), + ] diff --git a/apps/channels/migrations/0007_remove_stream_group_name.py b/apps/channels/migrations/0007_remove_stream_group_name.py new file mode 100644 index 00000000..3d7b567a --- /dev/null +++ b/apps/channels/migrations/0007_remove_stream_group_name.py @@ -0,0 +1,17 @@ +# Generated by Django 5.1.6 on 2025-03-19 16:43 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('dispatcharr_channels', '0006_migrate_stream_groups'), + ] + + operations = [ + migrations.RemoveField( + model_name='stream', + name='group_name', + ), + ] diff --git a/apps/channels/migrations/0008_stream_stream_hash.py b/apps/channels/migrations/0008_stream_stream_hash.py new file mode 100644 index 00000000..5279dd19 --- /dev/null +++ b/apps/channels/migrations/0008_stream_stream_hash.py @@ -0,0 +1,23 @@ +# Generated by Django 5.1.6 on 2025-03-19 18:21 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('dispatcharr_channels', '0007_remove_stream_group_name'), + ] + + operations = [ + migrations.AddField( + model_name='stream', + name='stream_hash', + field=models.CharField(db_index=True, help_text='Unique hash for this stream from the M3U account', max_length=255, null=True, unique=True), + ), + migrations.AlterField( + model_name='stream', + name='logo_url', + field=models.TextField(blank=True, null=True), + ), + ] diff --git a/apps/channels/models.py b/apps/channels/models.py index c501d927..ec95e309 100644 --- a/apps/channels/models.py +++ b/apps/channels/models.py @@ -3,15 +3,49 @@ from django.core.exceptions import ValidationError from core.models import StreamProfile from django.conf import settings from core.models import StreamProfile, CoreSettings -from core.utils import redis_client +from core.utils import redis_client, execute_redis_command import logging import uuid +from datetime import datetime +import hashlib +import json logger = logging.getLogger(__name__) # If you have an M3UAccount model in apps.m3u, you can still import it: from apps.m3u.models import M3UAccount +# Add fallback functions if Redis isn't available +def get_total_viewers(channel_id): + """Get viewer count from Redis or return 0 if Redis isn't available""" + if redis_client is None: + return 0 + + try: + return int(redis_client.get(f"channel:{channel_id}:viewers") or 0) + except Exception: + return 0 + +class ChannelGroup(models.Model): + name = models.CharField(max_length=100, unique=True) + + def related_channels(self): + # local import if needed to avoid cyc. Usually fine in a single file though + return Channel.objects.filter(channel_group=self) + + def __str__(self): + return self.name + + @classmethod + def bulk_create_and_fetch(cls, objects): + # Perform the bulk create operation + cls.objects.bulk_create(objects) + + # Use a unique field to fetch the created objects (assuming 'name' is unique) + created_objects = cls.objects.filter(name__in=[obj.name for obj in objects]) + + return created_objects + class Stream(models.Model): """ Represents a single stream (e.g. from an M3U source or custom URL). @@ -25,12 +59,18 @@ class Stream(models.Model): blank=True, related_name="streams", ) - logo_url = models.URLField(max_length=2000, blank=True, null=True) + logo_url = models.TextField(blank=True, null=True) tvg_id = models.CharField(max_length=255, blank=True, null=True) local_file = models.FileField(upload_to='uploads/', blank=True, null=True) current_viewers = models.PositiveIntegerField(default=0) updated_at = models.DateTimeField(auto_now=True) - group_name = models.CharField(max_length=255, blank=True, null=True) + channel_group = models.ForeignKey( + ChannelGroup, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name='streams' + ) stream_profile = models.ForeignKey( StreamProfile, null=True, @@ -42,6 +82,14 @@ class Stream(models.Model): default=False, help_text="Whether this is a user-created stream or from an M3U account" ) + stream_hash = models.CharField( + max_length=255, + null=True, + unique=True, + help_text="Unique hash for this stream from the M3U account", + db_index=True, + ) + last_seen = models.DateTimeField(db_index=True, default=datetime.now) class Meta: # If you use m3u_account, you might do unique_together = ('name','url','m3u_account') @@ -52,6 +100,37 @@ class Stream(models.Model): def __str__(self): return self.name or self.url or f"Stream ID {self.id}" + @classmethod + def generate_hash_key(cls, name, url, tvg_id, keys=None): + if keys is None: + keys = CoreSettings.get_m3u_hash_key().split(",") + + stream_parts = { + "name": name, "url": url, "tvg_id": tvg_id + } + + hash_parts = {key: stream_parts[key] for key in keys if key in stream_parts} + + # Serialize and hash the dictionary + serialized_obj = json.dumps(hash_parts, sort_keys=True) # sort_keys ensures consistent ordering + hash_object = hashlib.sha256(serialized_obj.encode()) + return hash_object.hexdigest() + + @classmethod + def update_or_create_by_hash(cls, hash_value, **fields_to_update): + try: + # Try to find the Stream object with the given hash + stream = cls.objects.get(stream_hash=hash_value) + # If it exists, update the fields + for field, value in fields_to_update.items(): + setattr(stream, field, value) + stream.save() # Save the updated object + return stream, False # False means it was updated, not created + except cls.DoesNotExist: + # If it doesn't exist, create a new object with the given hash + fields_to_update['stream_hash'] = hash_value # Make sure the hash field is set + stream = cls.objects.create(**fields_to_update) + return stream, True # True means it was created class ChannelManager(models.Manager): def active(self): @@ -95,7 +174,7 @@ class Channel(models.Model): related_name='channels' ) - uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True) + uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True, db_index=True) def clean(self): # Enforce unique channel_number within a given group @@ -198,16 +277,6 @@ class Channel(models.Model): if current_count > 0: redis_client.decr(profile_connections_key) -class ChannelGroup(models.Model): - name = models.CharField(max_length=100, unique=True) - - def related_channels(self): - # local import if needed to avoid cyc. Usually fine in a single file though - return Channel.objects.filter(channel_group=self) - - def __str__(self): - return self.name - class ChannelStream(models.Model): channel = models.ForeignKey(Channel, on_delete=models.CASCADE) stream = models.ForeignKey(Stream, on_delete=models.CASCADE) @@ -215,3 +284,22 @@ class ChannelStream(models.Model): class Meta: ordering = ['order'] # Ensure streams are retrieved in order + +class ChannelGroupM3UAccount(models.Model): + channel_group = models.ForeignKey( + ChannelGroup, + on_delete=models.CASCADE, + related_name='m3u_account' + ) + m3u_account = models.ForeignKey( + M3UAccount, + on_delete=models.CASCADE, + related_name='channel_group' + ) + enabled = models.BooleanField(default=True) + + class Meta: + unique_together = ('channel_group', 'm3u_account') + + def __str__(self): + return f"{self.channel_group.name} - {self.m3u_account.name} (Enabled: {self.enabled})" diff --git a/apps/channels/serializers.py b/apps/channels/serializers.py index b1f7e022..a075297d 100644 --- a/apps/channels/serializers.py +++ b/apps/channels/serializers.py @@ -1,5 +1,5 @@ from rest_framework import serializers -from .models import Stream, Channel, ChannelGroup, ChannelStream +from .models import Stream, Channel, ChannelGroup, ChannelStream, ChannelGroupM3UAccount from core.models import StreamProfile # @@ -26,9 +26,9 @@ class StreamSerializer(serializers.ModelSerializer): 'local_file', 'current_viewers', 'updated_at', - 'group_name', 'stream_profile_id', 'is_custom', + 'channel_group', ] def get_fields(self): @@ -41,7 +41,7 @@ class StreamSerializer(serializers.ModelSerializer): fields['url'].read_only = True fields['m3u_account'].read_only = True fields['tvg_id'].read_only = True - fields['group_name'].read_only = True + fields['channel_group'].read_only = True return fields @@ -146,3 +146,13 @@ class ChannelSerializer(serializers.ModelSerializer): ChannelStream.objects.create(channel=instance, stream_id=stream.id, order=index) return instance + +class ChannelGroupM3UAccountSerializer(serializers.ModelSerializer): + enabled = serializers.BooleanField() + + class Meta: + model = ChannelGroupM3UAccount + fields = ['id', 'channel_group', 'enabled'] + + # Optionally, if you only need the id of the ChannelGroup, you can customize it like this: + # channel_group = serializers.PrimaryKeyRelatedField(queryset=ChannelGroup.objects.all()) diff --git a/apps/channels/views.py b/apps/channels/views.py index b28cc123..8af3fa30 100644 --- a/apps/channels/views.py +++ b/apps/channels/views.py @@ -16,7 +16,7 @@ class StreamDashboardView(View): def get(self, request, *args, **kwargs): streams = Stream.objects.values( 'id', 'name', 'url', - 'group_name', 'current_viewers' + 'channel_group', 'current_viewers' ) return JsonResponse({'data': list(streams)}, safe=False) diff --git a/apps/epg/models.py b/apps/epg/models.py index 206791a6..305d30ed 100644 --- a/apps/epg/models.py +++ b/apps/epg/models.py @@ -19,7 +19,7 @@ class EPGSource(models.Model): class EPGData(models.Model): # Removed the Channel foreign key. We now just store the original tvg_id # and a name (which might simply be the tvg_id if no real channel exists). - tvg_id = models.CharField(max_length=255, null=True, blank=True) + tvg_id = models.CharField(max_length=255, null=True, blank=True, unique=True) name = models.CharField(max_length=255) def __str__(self): diff --git a/apps/epg/tasks.py b/apps/epg/tasks.py index d7109a9a..9b9fa49f 100644 --- a/apps/epg/tasks.py +++ b/apps/epg/tasks.py @@ -110,6 +110,8 @@ def parse_channels_only(file_path): epg_obj.save() logger.debug(f"Channel <{tvg_id}> => EPGData.id={epg_obj.id}, created={created}") + parse_programs_for_tvg_id(file_path, tvg_id) + logger.info("Finished parsing channel info.") diff --git a/apps/m3u/api_views.py b/apps/m3u/api_views.py index df9c13fc..508b6a89 100644 --- a/apps/m3u/api_views.py +++ b/apps/m3u/api_views.py @@ -11,6 +11,7 @@ from django.core.cache import cache # Import all models, including UserAgent. from .models import M3UAccount, M3UFilter, ServerGroup, M3UAccountProfile from core.models import UserAgent +from apps.channels.models import ChannelGroupM3UAccount from core.serializers import UserAgentSerializer # Import all serializers, including the UserAgentSerializer. from .serializers import ( @@ -24,10 +25,43 @@ from .tasks import refresh_single_m3u_account, refresh_m3u_accounts class M3UAccountViewSet(viewsets.ModelViewSet): """Handles CRUD operations for M3U accounts""" - queryset = M3UAccount.objects.all() + queryset = M3UAccount.objects.prefetch_related('channel_group') serializer_class = M3UAccountSerializer permission_classes = [IsAuthenticated] + def update(self, request, *args, **kwargs): + # Get the M3UAccount instance we're updating + instance = self.get_object() + + # Handle updates to the 'enabled' flag of the related ChannelGroupM3UAccount instances + updates = request.data.get('channel_groups', []) + + for update_data in updates: + channel_group_id = update_data.get('channel_group') + enabled = update_data.get('enabled') + + try: + # Get the specific relationship to update + relationship = ChannelGroupM3UAccount.objects.get( + m3u_account=instance, channel_group_id=channel_group_id + ) + relationship.enabled = enabled + relationship.save() + except ChannelGroupM3UAccount.DoesNotExist: + return Response( + {"error": "ChannelGroupM3UAccount not found for the given M3UAccount and ChannelGroup."}, + status=status.HTTP_400_BAD_REQUEST + ) + + # After updating the ChannelGroupM3UAccount relationships, reload the M3UAccount instance + instance.refresh_from_db() + + refresh_single_m3u_account.delay(instance.id) + + # Serialize and return the updated M3UAccount data + serializer = self.get_serializer(instance) + return Response(serializer.data) + class M3UFilterViewSet(viewsets.ModelViewSet): """Handles CRUD operations for M3U filters""" queryset = M3UFilter.objects.all() diff --git a/apps/m3u/migrations/0004_m3uaccount_stream_profile.py b/apps/m3u/migrations/0004_m3uaccount_stream_profile.py new file mode 100644 index 00000000..65f69fcd --- /dev/null +++ b/apps/m3u/migrations/0004_m3uaccount_stream_profile.py @@ -0,0 +1,20 @@ +# Generated by Django 5.1.6 on 2025-03-19 16:33 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0009_m3u_hash_settings'), + ('m3u', '0003_create_custom_account'), + ] + + operations = [ + migrations.AddField( + model_name='m3uaccount', + name='stream_profile', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='m3u_accounts', to='core.streamprofile'), + ), + ] diff --git a/apps/m3u/models.py b/apps/m3u/models.py index 2b3a3020..773261df 100644 --- a/apps/m3u/models.py +++ b/apps/m3u/models.py @@ -3,6 +3,7 @@ from django.core.exceptions import ValidationError from core.models import UserAgent import re from django.dispatch import receiver +from apps.channels.models import StreamProfile CUSTOM_M3U_ACCOUNT_NAME="custom" @@ -59,6 +60,13 @@ class M3UAccount(models.Model): default=False, help_text="Protected - can't be deleted or modified" ) + stream_profile = models.ForeignKey( + StreamProfile, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name='m3u_accounts' + ) def __str__(self): return self.name @@ -86,6 +94,16 @@ class M3UAccount(models.Model): def get_custom_account(cls): return cls.objects.get(name=CUSTOM_M3U_ACCOUNT_NAME, locked=True) + # def get_channel_groups(self): + # return ChannelGroup.objects.filter(m3u_account__m3u_account=self) + + # def is_channel_group_enabled(self, channel_group): + # """Check if the specified ChannelGroup is enabled for this M3UAccount.""" + # return self.channel_group.filter(channel_group=channel_group, enabled=True).exists() + + # def get_enabled_streams(self): + # """Return all streams linked to this account with enabled ChannelGroups.""" + # return self.streams.filter(channel_group__in=ChannelGroup.objects.filter(m3u_account__enabled=True)) class M3UFilter(models.Model): """Defines filters for M3U accounts based on stream name or group title.""" diff --git a/apps/m3u/serializers.py b/apps/m3u/serializers.py index 391794a1..3946aac5 100644 --- a/apps/m3u/serializers.py +++ b/apps/m3u/serializers.py @@ -1,13 +1,19 @@ from rest_framework import serializers from .models import M3UAccount, M3UFilter, ServerGroup, M3UAccountProfile from core.models import UserAgent +from apps.channels.models import ChannelGroup, ChannelGroupM3UAccount +from apps.channels.serializers import ChannelGroupM3UAccountSerializer, ChannelGroupSerializer +import logging + +logger = logging.getLogger(__name__) class M3UFilterSerializer(serializers.ModelSerializer): """Serializer for M3U Filters""" + channel_groups = ChannelGroupM3UAccountSerializer(source='m3u_account', many=True) class Meta: model = M3UFilter - fields = ['id', 'filter_type', 'regex_pattern', 'exclude'] + fields = ['id', 'filter_type', 'regex_pattern', 'exclude', 'channel_groups'] from rest_framework import serializers from .models import M3UAccountProfile @@ -37,14 +43,74 @@ class M3UAccountSerializer(serializers.ModelSerializer): ) profiles = M3UAccountProfileSerializer(many=True, read_only=True) read_only_fields = ['locked'] + # channel_groups = serializers.SerializerMethodField() + channel_groups = ChannelGroupM3UAccountSerializer(source='channel_group.all', many=True, required=False) + class Meta: model = M3UAccount fields = [ 'id', 'name', 'server_url', 'uploaded_file', 'server_group', - 'max_streams', 'is_active', 'created_at', 'updated_at', 'filters', 'user_agent', 'profiles', 'locked' + 'max_streams', 'is_active', 'created_at', 'updated_at', 'filters', 'user_agent', 'profiles', 'locked', + 'channel_groups', ] + # def get_channel_groups(self, obj): + # # Retrieve related ChannelGroupM3UAccount records for this M3UAccount + # relations = ChannelGroupM3UAccount.objects.filter(m3u_account=obj).select_related('channel_group') + + # # Serialize the channel groups with their enabled status + # return [ + # { + # 'channel_group_name': relation.channel_group.name, + # 'channel_group_id': relation.channel_group.id, + # 'enabled': relation.enabled, + # } + # for relation in relations + # ] + + # def to_representation(self, instance): + # """Override the default to_representation method to include channel_groups""" + # representation = super().to_representation(instance) + + # # Manually add the channel_groups to the representation + # channel_groups = ChannelGroupM3UAccount.objects.filter(m3u_account=instance).select_related('channel_group') + # representation['channel_groups'] = [ + # { + # 'id': relation.id, + # 'channel_group_name': relation.channel_group.name, + # 'channel_group_id': relation.channel_group.id, + # 'enabled': relation.enabled, + # } + # for relation in channel_groups + # ] + + # return representation + + # def update(self, instance, validated_data): + # logger.info(validated_data) + # channel_groups_data = validated_data.pop('channel_groups', None) + # instance = super().update(instance, validated_data) + + # if channel_groups_data is not None: + # logger.info(json.dumps(channel_groups_data)) + # # Remove existing relationships not included in the request + # existing_groups = {cg.channel_group_id: cg for cg in instance.channel_group.all()} + + # # for group_id in set(existing_groups.keys()) - sent_group_ids: + # # existing_groups[group_id].delete() + + # # Create or update relationships + # for cg_data in channel_groups_data: + # logger.info(json.dumps(cg_data)) + # ChannelGroupM3UAccount.objects.update_or_create( + # channel_group=existing_groups[cg_data['channel_group_id']], + # m3u_account=instance, + # defaults={'enabled': cg_data.get('enabled', True)} + # ) + + # return instance + class ServerGroupSerializer(serializers.ModelSerializer): """Serializer for Server Group""" diff --git a/apps/m3u/signals.py b/apps/m3u/signals.py index 8b6179ca..c2abc4c8 100644 --- a/apps/m3u/signals.py +++ b/apps/m3u/signals.py @@ -2,7 +2,7 @@ from django.db.models.signals import post_save from django.dispatch import receiver from .models import M3UAccount -from .tasks import refresh_single_m3u_account +from .tasks import refresh_single_m3u_account, refresh_m3u_groups @receiver(post_save, sender=M3UAccount) def refresh_account_on_save(sender, instance, created, **kwargs): @@ -11,5 +11,5 @@ def refresh_account_on_save(sender, instance, created, **kwargs): call a Celery task that fetches & parses that single account if it is active or newly created. """ - if created or instance.is_active: - refresh_single_m3u_account.delay(instance.id) + if created: + refresh_m3u_groups(instance.id) diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py index 068626b2..e0995543 100644 --- a/apps/m3u/tasks.py +++ b/apps/m3u/tasks.py @@ -4,17 +4,63 @@ import re import requests import os from celery.app.control import Inspect -from celery import shared_task, current_app +from celery.result import AsyncResult +from celery import shared_task, current_app, group from django.conf import settings from django.core.cache import cache from .models import M3UAccount -from apps.channels.models import Stream +from apps.channels.models import Stream, ChannelGroup, ChannelGroupM3UAccount from asgiref.sync import async_to_sync from channels.layers import get_channel_layer +from django.utils import timezone +import time +from channels.layers import get_channel_layer +import json +from core.utils import redis_client +from core.models import CoreSettings +from asgiref.sync import async_to_sync logger = logging.getLogger(__name__) -LOCK_EXPIRE = 120 # Lock expires after 120 seconds +LOCK_EXPIRE = 300 +BATCH_SIZE = 1000 +SKIP_EXTS = {} +m3u_dir = os.path.join(settings.MEDIA_ROOT, "cached_m3u") + +def fetch_m3u_lines(account, use_cache=False): + os.makedirs(m3u_dir, exist_ok=True) + file_path = os.path.join(m3u_dir, f"{account.id}.m3u") + + """Fetch M3U file lines efficiently.""" + if account.server_url: + if not use_cache or not os.path.exists(file_path): + headers = {"User-Agent": account.user_agent.user_agent} + logger.info(f"Fetching from URL {account.server_url}") + try: + response = requests.get(account.server_url, headers=headers, stream=True) + response.raise_for_status() # This will raise an HTTPError if the status is not 200 + with open(file_path, 'wb') as file: + # Stream the content in chunks and write to the file + for chunk in response.iter_content(chunk_size=8192): # You can adjust the chunk size + if chunk: # Ensure chunk is not empty + file.write(chunk) + except requests.exceptions.RequestException as e: + logger.error(f"Error fetching M3U from URL {account.server_url}: {e}") + return [] # Return an empty list in case of error + + with open(file_path, 'r', encoding='utf-8') as f: + return f.readlines() + elif account.uploaded_file: + try: + # Open the file and return the lines as a list or iterator + with open(account.uploaded_file.path, 'r', encoding='utf-8') as f: + return f.readlines() # Ensure you return lines from the file, not the file object + except IOError as e: + logger.error(f"Error opening file {account.uploaded_file.path}: {e}") + return [] # Return an empty list in case of error + + # Return an empty list if neither server_url nor uploaded_file is available + return [] def parse_extinf_line(line: str) -> dict: """ @@ -35,7 +81,7 @@ def parse_extinf_line(line: str) -> dict: if len(parts) != 2: return None attributes_part, display_name = parts[0], parts[1].strip() - attrs = dict(re.findall(r'(\w+)=["\']([^"\']+)["\']', attributes_part)) + attrs = dict(re.findall(r'([^\s]+)=["\']([^"\']+)["\']', attributes_part)) # Use tvg-name attribute if available; otherwise, use the display name. name = attrs.get('tvg-name', display_name) return { @@ -44,20 +90,18 @@ def parse_extinf_line(line: str) -> dict: 'name': name } -def _get_group_title(extinf_line: str) -> str: - """Extract group title from EXTINF line.""" - match = re.search(r'group-title="([^"]*)"', extinf_line) - return match.group(1) if match else "Default Group" +import re +import logging -def _matches_filters(stream_name: str, group_name: str, filters) -> bool: - logger.info("Testing filter") - for f in filters: - pattern = f.regex_pattern +logger = logging.getLogger(__name__) + +def _matches_filters(stream_name: str, group_name: str, filters): + """Check if a stream or group name matches a precompiled regex filter.""" + compiled_filters = [(re.compile(f.regex_pattern, re.IGNORECASE), f.exclude) for f in filters] + for pattern, exclude in compiled_filters: target = group_name if f.filter_type == 'group' else stream_name - logger.info(f"Testing {pattern} on: {target}") - if re.search(pattern, target or '', re.IGNORECASE): - logger.debug(f"Filter matched: {pattern} on {target}. Exclude={f.exclude}") - return f.exclude + if pattern.search(target or ''): + return exclude return False def acquire_lock(task_name, account_id): @@ -86,198 +130,279 @@ def refresh_m3u_accounts(): logger.info(msg) return msg -@shared_task -def refresh_single_m3u_account(account_id): - logger.info(f"Task {refresh_single_m3u_account.request.id}: Starting refresh for account_id={account_id}") +def check_field_lengths(streams_to_create): + for stream in streams_to_create: + for field, value in stream.__dict__.items(): + if isinstance(value, str) and len(value) > 255: + print(f"{field} --- {value}") + print("") + print("") + +@shared_task +def process_groups(account, group_names): + existing_groups = {group.name: group for group in ChannelGroup.objects.filter(name__in=group_names)} + logger.info(f"Currently {len(existing_groups)} existing groups") + + groups = [] + groups_to_create = [] + for group_name in group_names: + logger.info(f"Handling group: {group_name}") + if group_name in existing_groups: + groups.append(existing_groups[group_name]) + else: + groups_to_create.append(ChannelGroup( + name=group_name, + )) + + if groups_to_create: + logger.info(f"Creating {len(groups_to_create)} groups") + created = ChannelGroup.bulk_create_and_fetch(groups_to_create) + logger.info(f"Created {len(created)} groups") + groups.extend(created) + + relations = [] + for group in groups: + relations.append(ChannelGroupM3UAccount( + channel_group=group, + m3u_account=account, + )) + + ChannelGroupM3UAccount.objects.bulk_create( + relations, + ignore_conflicts=True + ) + +@shared_task +def process_m3u_batch(account_id, batch, group_names, hash_keys): + """Processes a batch of M3U streams using bulk operations.""" + account = M3UAccount.objects.get(id=account_id) + existing_groups = {group.name: group for group in ChannelGroup.objects.filter( + m3u_account__m3u_account=account, # Filter by the M3UAccount + m3u_account__enabled=True # Filter by the enabled flag in the join table + )} + + streams_to_create = [] + streams_to_update = [] + stream_hashes = {} + + # compiled_filters = [(f.filter_type, re.compile(f.regex_pattern, re.IGNORECASE)) for f in filters] + + logger.debug(f"Processing batch of {len(batch)}") + for stream_info in batch: + name, url = stream_info["name"], stream_info["url"] + tvg_id, tvg_logo = stream_info["attributes"].get("tvg-id", ""), stream_info["attributes"].get("tvg-logo", "") + group_title = stream_info["attributes"].get("group-title", "Default Group") + + # Filter out disabled groups for this account + if group_title not in existing_groups: + logger.debug(f"Skipping stream in disabled group: {group_title}") + continue + + # if any(url.lower().endswith(ext) for ext in SKIP_EXTS) or len(url) > 2000: + # continue + + # if _matches_filters(name, group_title, account.filters.all()): + # continue + + # if any(compiled_pattern.search(current_info['name']) for ftype, compiled_pattern in compiled_filters if ftype == 'name'): + # excluded_count += 1 + # current_info = None + # continue + + try: + stream_hash = Stream.generate_hash_key(name, url, tvg_id, hash_keys) + if redis_client.exists(f"m3u_refresh:{stream_hash}"): + # duplicate already processed by another batch + continue + + redis_client.set(f"m3u_refresh:{stream_hash}", "true") + stream_props = { + "name": name, + "url": url, + "logo_url": tvg_logo, + "tvg_id": tvg_id, + "m3u_account": account, + "channel_group": existing_groups[group_title], + "stream_hash": stream_hash, + } + + if stream_hash not in stream_hashes: + stream_hashes[stream_hash] = stream_props + except Exception as e: + logger.error(f"Failed to process stream {name}: {e}") + logger.error(json.dumps(stream_info)) + + existing_streams = {s.stream_hash: s for s in Stream.objects.filter(stream_hash__in=stream_hashes.keys())} + logger.info(f"Hashed {len(stream_hashes.keys())} unique streams") + + for stream_hash, stream_props in stream_hashes.items(): + if stream_hash in existing_streams: + obj = existing_streams[stream_hash] + changed = False + for key, value in stream_props.items(): + if getattr(obj, key) == value: + continue + changed = True + setattr(obj, key, value) + + obj.last_seen = timezone.now() + if changed: + streams_to_update.append(obj) + del existing_streams[stream_hash] + else: + existing_streams[stream_hash] = obj + else: + stream_props["last_seen"] = timezone.now() + streams_to_create.append(Stream(**stream_props)) + + try: + if streams_to_create: + Stream.objects.bulk_create(streams_to_create, ignore_conflicts=True) + if streams_to_update: + Stream.objects.bulk_update(streams_to_update, stream_props.keys()) + if len(existing_streams.keys()) > 0: + Stream.objects.bulk_update(existing_streams.values(), ["last_seen"]) + except Exception as e: + logger.error(f"Bulk create failed: {str(e)}") + + + return f"Batch processed: {len(streams_to_create)} created, {len(streams_to_update)} updated." + +def refresh_m3u_groups(account_id): + if not acquire_lock('refresh_m3u_account_groups', account_id): + return f"Task already running for account_id={account_id}." + + # Record start time + start_time = time.time() + send_progress_update(0, account_id) + + try: + account = M3UAccount.objects.get(id=account_id, is_active=True) + except M3UAccount.DoesNotExist: + release_lock('refresh_m3u_account_groups', account_id) + return f"M3UAccount with ID={account_id} not found or inactive." + + lines = fetch_m3u_lines(account) + extinf_data = [] + groups = set(["Default Group"]) + + for line in lines: + line = line.strip() + if line.startswith("#EXTINF"): + parsed = parse_extinf_line(line) + if parsed: + if "group-title" in parsed["attributes"]: + groups.add(parsed["attributes"]["group-title"]) + + extinf_data.append(parsed) + elif extinf_data and line.startswith("http"): + # Associate URL with the last EXTINF line + extinf_data[-1]["url"] = line + + groups = list(groups) + cache_path = os.path.join(m3u_dir, f"{account_id}.json") + with open(cache_path, 'w', encoding='utf-8') as f: + json.dump({ + "extinf_data": extinf_data, + "groups": groups, + }, f) + + process_groups(account, groups) + + release_lock('refresh_m3u_account_groups`', account_id) + + return extinf_data, groups + +@shared_task +def refresh_single_m3u_account(account_id, use_cache=False): + """Splits M3U processing into chunks and dispatches them as parallel tasks.""" if not acquire_lock('refresh_single_m3u_account', account_id): return f"Task already running for account_id={account_id}." + # Record start time + start_time = time.time() + send_progress_update(0, account_id) + try: account = M3UAccount.objects.get(id=account_id, is_active=True) filters = list(account.filters.all()) - logger.info(f"Found active M3UAccount (id={account.id}, name={account.name}).") except M3UAccount.DoesNotExist: - msg = f"M3UAccount with ID={account_id} not found or inactive." - logger.warning(msg) release_lock('refresh_single_m3u_account', account_id) - return msg - except Exception as e: - logger.error(f"Error fetching M3UAccount {account_id}: {e}") - release_lock('refresh_single_m3u_account', account_id) - return str(e) + return f"M3UAccount with ID={account_id} not found or inactive." - try: - lines = [] - if account.server_url: - if not account.user_agent: - err_msg = f"User-Agent not provided for account id {account_id}." - logger.error(err_msg) - release_lock('refresh_single_m3u_account', account_id) - return err_msg + # Fetch M3U lines and handle potential issues + # lines = fetch_m3u_lines(account) # Extracted fetch logic into separate function + extinf_data = [] + groups = None - headers = {"User-Agent": account.user_agent.user_agent} - response = requests.get(account.server_url, headers=headers) - response.raise_for_status() - lines = response.text.splitlines() - elif account.uploaded_file: - file_path = account.uploaded_file.path - with open(file_path, 'r', encoding='utf-8') as f: - lines = f.read().splitlines() - else: - err_msg = f"No server_url or uploaded_file provided for account_id={account_id}." - logger.error(err_msg) - release_lock('refresh_single_m3u_account', account_id) - return err_msg - except Exception as e: - err_msg = f"Failed fetching M3U: {e}" - logger.error(err_msg) - release_lock('refresh_single_m3u_account', account_id) - return err_msg + cache_path = os.path.join(m3u_dir, f"{account_id}.json") + if use_cache and os.path.exists(cache_path): + with open(cache_path, 'r') as file: + data = json.load(file) - logger.info(f"M3U has {len(lines)} lines. Now parsing for Streams.") - skip_exts = ('.mkv', '.mp4', '.m4v', '.wav', '.avi', '.flv', '.m4p', '.mpg', - '.mpeg', '.m2v', '.mp2', '.mpe', '.mpv') + extinf_data = data['extinf_data'] + groups = data['groups'] - created_count, updated_count, excluded_count = 0, 0, 0 - current_info = None + if not extinf_data: + extinf_data, groups = refresh_m3u_groups(account_id) - for line in lines: - line = line.strip() - if line.startswith('#EXTINF'): - extinf = parse_extinf_line(line) - if not extinf: - continue - name = extinf['name'] - tvg_id = extinf['attributes'].get('tvg-id', '') - tvg_logo = extinf['attributes'].get('tvg-logo', '') - # Prefer group-title from attributes if available. - group_title = extinf['attributes'].get('group-title', _get_group_title(line)) - logger.debug(f"Parsed EXTINF: name={name}, logo_url={tvg_logo}, tvg_id={tvg_id}, group_title={group_title}") - current_info = { - "name": name, - "logo_url": tvg_logo, - "group_title": group_title, - "tvg_id": tvg_id, - } - elif current_info and line.startswith('http'): - lower_line = line.lower() - if any(lower_line.endswith(ext) for ext in skip_exts): - logger.debug(f"Skipping file with unsupported extension: {line}") - current_info = None - continue + hash_keys = CoreSettings.get_m3u_hash_key().split(",") - if len(line) > 2000: - logger.warning(f"Stream URL too long, skipping: {line}") - excluded_count += 1 - current_info = None - continue + # Break into batches and process in parallel + batches = [extinf_data[i:i + BATCH_SIZE] for i in range(0, len(extinf_data), BATCH_SIZE)] + task_group = group(process_m3u_batch.s(account_id, batch, groups, hash_keys) for batch in batches) - if _matches_filters(current_info['name'], current_info['group_title'], filters): - logger.info(f"Stream excluded by filter: {current_info['name']} in group {current_info['group_title']}") - excluded_count += 1 - current_info = None - continue + total_batches = len(batches) + completed_batches = 0 + logger.debug(f"Dispatched {len(batches)} parallel tasks for account_id={account_id}.") - defaults = { - "logo_url": current_info["logo_url"], - "tvg_id": current_info["tvg_id"] - } - try: - obj, created = Stream.objects.update_or_create( - name=current_info["name"], - url=line, - m3u_account=account, - group_name=current_info["group_title"], - defaults=defaults - ) - if created: - created_count += 1 - else: - updated_count += 1 - except Exception as e: - logger.error(f"Failed to update/create stream {current_info['name']}: {e}") - finally: - current_info = None + # result = task_group.apply_async() + result = task_group.apply_async() + + while completed_batches < total_batches: + for async_result in result: + if async_result.ready(): # If the task has completed + task_result = async_result.result # The result of the task + logger.debug(f"Task completed with result: {task_result}") + completed_batches += 1 + + # Calculate progress + progress = int((completed_batches / total_batches) * 100) + + # Send progress update via Channels + send_progress_update(progress, account_id) + + # Optionally remove completed task from the group to prevent processing it again + result.remove(async_result) + else: + logger.debug(f"Task is still running.") + + end_time = time.time() + + # Calculate elapsed time + elapsed_time = end_time - start_time + + print(f"Function took {elapsed_time} seconds to execute.") - logger.info(f"Completed parsing. Created {created_count} new Streams, updated {updated_count} existing Streams, excluded {excluded_count} Streams.") release_lock('refresh_single_m3u_account', account_id) + + cursor = 0 + while True: + cursor, keys = redis_client.scan(cursor, match=f"m3u_refresh:*", count=BATCH_SIZE) + if keys: + redis_client.delete(*keys) # Delete the matching keys + if cursor == 0: + break + + return f"Dispatched jobs complete." + +def send_progress_update(progress, account_id): channel_layer = get_channel_layer() async_to_sync(channel_layer.group_send)( - "updates", + 'updates', { - "type": "update", - "data": {"success": True, "type": "m3u_refresh", "message": "M3U refresh completed successfully"} - }, + 'type': 'update', + "data": {"progress": progress, "type": "m3u_refresh", "account": account_id} + } ) - return f"Account {account_id} => Created {created_count}, updated {updated_count}, excluded {excluded_count} Streams." - -def process_uploaded_m3u_file(file, account): - """Save and parse an uploaded M3U file.""" - upload_dir = os.path.join(settings.MEDIA_ROOT, 'm3u_uploads') - os.makedirs(upload_dir, exist_ok=True) - file_path = os.path.join(upload_dir, file.name) - - with open(file_path, 'wb+') as destination: - for chunk in file.chunks(): - destination.write(chunk) - - try: - parse_m3u_file(file_path, account) - except Exception as e: - logger.error(f"Error parsing uploaded M3U file {file_path}: {e}") - -def parse_m3u_file(file_path, account): - """Parse a local M3U file and create or update Streams.""" - skip_exts = ('.mkv', '.mp4', '.ts', '.m4v', '.wav', '.avi', '.flv', '.m4p', '.mpg', - '.mpeg', '.m2v', '.mp2', '.mpe', '.mpv') - - try: - with open(file_path, 'r', encoding='utf-8') as f: - lines = f.read().splitlines() - except Exception as e: - logger.error(f"Failed to read M3U file {file_path}: {e}") - return f"Error reading M3U file {file_path}" - - created_count, updated_count, excluded_count = 0, 0, 0 - current_info = None - - for line in lines: - line = line.strip() - if line.startswith('#EXTINF'): - extinf = parse_extinf_line(line) - if not extinf: - continue - name = extinf['name'] - tvg_id = extinf['attributes'].get('tvg-id', '') - tvg_logo = extinf['attributes'].get('tvg-logo', '') - current_info = {"name": name, "logo_url": tvg_logo, "tvg_id": tvg_id} - elif current_info and line.startswith('http'): - lower_line = line.lower() - if any(lower_line.endswith(ext) for ext in skip_exts): - logger.info(f"Skipping file with unsupported extension: {line}") - current_info = None - continue - - defaults = { - "logo_url": current_info["logo_url"], - "tvg_id": current_info.get("tvg_id", "") - } - - try: - obj, created = Stream.objects.update_or_create( - name=current_info["name"], - url=line, - m3u_account=account, - defaults=defaults - ) - if created: - created_count += 1 - else: - updated_count += 1 - except Exception as e: - logger.error(f"Failed to update/create stream {current_info['name']}: {e}") - finally: - current_info = None - - return f"Parsed local M3U file {file_path}, created {created_count} Streams, updated {updated_count} Streams, excluded {excluded_count} Streams." diff --git a/apps/proxy/ts_proxy/channel_status.py b/apps/proxy/ts_proxy/channel_status.py index cc8a1e5b..95c561f5 100644 --- a/apps/proxy/ts_proxy/channel_status.py +++ b/apps/proxy/ts_proxy/channel_status.py @@ -4,8 +4,10 @@ import re from . import proxy_server from .redis_keys import RedisKeys from .constants import TS_PACKET_SIZE +from redis.exceptions import ConnectionError, TimeoutError +from .utils import get_logger -logger = logging.getLogger("ts_proxy") +logger = get_logger() class ChannelStatus: @@ -172,76 +174,98 @@ class ChannelStatus: return info - # Function for basic channel info (used for all channels summary) - def get_basic_channel_info(channel_id): - # Get channel metadata - metadata_key = RedisKeys.channel_metadata(channel_id) - metadata = proxy_server.redis_client.hgetall(metadata_key) - - if not metadata: + @staticmethod + def _execute_redis_command(command_func): + """Execute Redis command with error handling""" + if not proxy_server.redis_client: return None - # Basic channel info only - omit diagnostics and details - buffer_index_key = RedisKeys.buffer_index(channel_id) - buffer_index_value = proxy_server.redis_client.get(buffer_index_key) + try: + return command_func() + except (ConnectionError, TimeoutError) as e: + logger.warning(f"Redis connection error in ChannelStatus: {e}") + return None + except Exception as e: + logger.error(f"Redis command error in ChannelStatus: {e}") + return None - # Count clients (using efficient count method) - client_set_key = RedisKeys.clients(channel_id) - client_count = proxy_server.redis_client.scard(client_set_key) or 0 + @staticmethod + def get_basic_channel_info(channel_id): + """Get basic channel information with Redis error handling""" + try: + # Use _execute_redis_command for Redis operations + metadata_key = RedisKeys.channel_metadata(channel_id) + metadata = ChannelStatus._execute_redis_command( + lambda: proxy_server.redis_client.hgetall(metadata_key) + ) - # Calculate uptime - created_at = float(metadata.get(b'init_time', b'0').decode('utf-8')) - uptime = time.time() - created_at if created_at > 0 else 0 + if not metadata: + return None - # Simplified info - info = { - 'channel_id': channel_id, - 'state': metadata.get(b'state', b'unknown').decode('utf-8'), - 'url': metadata.get(b'url', b'').decode('utf-8'), - 'profile': metadata.get(b'profile', b'unknown').decode('utf-8'), - 'owner': metadata.get(b'owner', b'unknown').decode('utf-8'), - 'buffer_index': int(buffer_index_value.decode('utf-8')) if buffer_index_value else 0, - 'client_count': client_count, - 'uptime': uptime - } + # Basic channel info only - omit diagnostics and details + buffer_index_key = RedisKeys.buffer_index(channel_id) + buffer_index_value = proxy_server.redis_client.get(buffer_index_key) - # Quick health check if available locally - if channel_id in proxy_server.stream_managers: - manager = proxy_server.stream_managers[channel_id] - info['healthy'] = manager.healthy + # Count clients (using efficient count method) + client_set_key = RedisKeys.clients(channel_id) + client_count = proxy_server.redis_client.scard(client_set_key) or 0 - # Get concise client information - clients = [] - client_ids = proxy_server.redis_client.smembers(client_set_key) + # Calculate uptime + created_at = float(metadata.get(b'init_time', b'0').decode('utf-8')) + uptime = time.time() - created_at if created_at > 0 else 0 - # Process only if we have clients and keep it limited - if client_ids: - # Get up to 10 clients for the basic view - for client_id in list(client_ids)[:10]: - client_id_str = client_id.decode('utf-8') - client_key = f"ts_proxy:channel:{channel_id}:clients:{client_id_str}" + # Simplified info + info = { + 'channel_id': channel_id, + 'state': metadata.get(b'state', b'unknown').decode('utf-8'), + 'url': metadata.get(b'url', b'').decode('utf-8'), + 'profile': metadata.get(b'profile', b'unknown').decode('utf-8'), + 'owner': metadata.get(b'owner', b'unknown').decode('utf-8'), + 'buffer_index': int(buffer_index_value.decode('utf-8')) if buffer_index_value else 0, + 'client_count': client_count, + 'uptime': uptime + } - # Efficient way - just retrieve the essentials - client_info = { - 'client_id': client_id_str, - 'user_agent': proxy_server.redis_client.hget(client_key, 'user_agent'), - 'ip_address': proxy_server.redis_client.hget(client_key, 'ip_address').decode('utf-8'), - } + # Quick health check if available locally + if channel_id in proxy_server.stream_managers: + manager = proxy_server.stream_managers[channel_id] + info['healthy'] = manager.healthy - if client_info['user_agent']: - client_info['user_agent'] = client_info['user_agent'].decode('utf-8') - else: - client_info['user_agent'] = 'unknown' + # Get concise client information + clients = [] + client_ids = proxy_server.redis_client.smembers(client_set_key) - # Just get connected_at for client age - connected_at_bytes = proxy_server.redis_client.hget(client_key, 'connected_at') - if connected_at_bytes: - connected_at = float(connected_at_bytes.decode('utf-8')) - client_info['connected_since'] = time.time() - connected_at + # Process only if we have clients and keep it limited + if client_ids: + # Get up to 10 clients for the basic view + for client_id in list(client_ids)[:10]: + client_id_str = client_id.decode('utf-8') + client_key = f"ts_proxy:channel:{channel_id}:clients:{client_id_str}" - clients.append(client_info) + # Efficient way - just retrieve the essentials + client_info = { + 'client_id': client_id_str, + 'user_agent': proxy_server.redis_client.hget(client_key, 'user_agent'), + 'ip_address': proxy_server.redis_client.hget(client_key, 'ip_address').decode('utf-8'), + } - # Add clients to info - info['clients'] = clients + if client_info['user_agent']: + client_info['user_agent'] = client_info['user_agent'].decode('utf-8') + else: + client_info['user_agent'] = 'unknown' - return info + # Just get connected_at for client age + connected_at_bytes = proxy_server.redis_client.hget(client_key, 'connected_at') + if connected_at_bytes: + connected_at = float(connected_at_bytes.decode('utf-8')) + client_info['connected_since'] = time.time() - connected_at + + clients.append(client_info) + + # Add clients to info + info['clients'] = clients + + return info + except Exception as e: + logger.error(f"Error getting channel info: {e}") + return None diff --git a/apps/proxy/ts_proxy/client_manager.py b/apps/proxy/ts_proxy/client_manager.py index ed5868a9..42d7e04d 100644 --- a/apps/proxy/ts_proxy/client_manager.py +++ b/apps/proxy/ts_proxy/client_manager.py @@ -6,11 +6,13 @@ import time import json from typing import Set, Optional from apps.proxy.config import TSConfig as Config +from redis.exceptions import ConnectionError, TimeoutError from .constants import EventType from .config_helper import ConfigHelper from .redis_keys import RedisKeys +from .utils import get_logger -logger = logging.getLogger("ts_proxy") +logger = get_logger() class ClientManager: """Manages client connections with no duplicates""" @@ -120,6 +122,20 @@ class ClientManager: thread.start() logger.debug(f"Started client heartbeat thread for channel {self.channel_id} (interval: {self.heartbeat_interval}s)") + def _execute_redis_command(self, command_func): + """Execute Redis command with error handling""" + if not self.redis_client: + return None + + try: + return command_func() + except (ConnectionError, TimeoutError) as e: + logger.warning(f"Redis connection error in ClientManager: {e}") + return None + except Exception as e: + logger.error(f"Redis command error in ClientManager: {e}") + return None + def _notify_owner_of_activity(self): """Notify channel owner that clients are active on this worker""" if not self.redis_client or not self.clients: @@ -130,11 +146,15 @@ class ClientManager: # STANDARDIZED KEY: Worker info under channel namespace worker_key = f"ts_proxy:channel:{self.channel_id}:worker:{worker_id}" - self.redis_client.setex(worker_key, self.client_ttl, str(len(self.clients))) + self._execute_redis_command( + lambda: self.redis_client.setex(worker_key, self.client_ttl, str(len(self.clients))) + ) # STANDARDIZED KEY: Activity timestamp under channel namespace activity_key = f"ts_proxy:channel:{self.channel_id}:activity" - self.redis_client.setex(activity_key, self.client_ttl, str(time.time())) + self._execute_redis_command( + lambda: self.redis_client.setex(activity_key, self.client_ttl, str(time.time())) + ) except Exception as e: logger.error(f"Error notifying owner of client activity: {e}") diff --git a/apps/proxy/ts_proxy/config_helper.py b/apps/proxy/ts_proxy/config_helper.py index 5a16581a..c0576cb7 100644 --- a/apps/proxy/ts_proxy/config_helper.py +++ b/apps/proxy/ts_proxy/config_helper.py @@ -2,11 +2,8 @@ Helper module to access configuration values with proper defaults. """ -import logging from apps.proxy.config import TSConfig as Config -logger = logging.getLogger("ts_proxy") - class ConfigHelper: """ Helper class for accessing configuration values with sensible defaults. diff --git a/apps/proxy/ts_proxy/redis_keys.py b/apps/proxy/ts_proxy/redis_keys.py index 1eaa8aa5..22b02648 100644 --- a/apps/proxy/ts_proxy/redis_keys.py +++ b/apps/proxy/ts_proxy/redis_keys.py @@ -83,3 +83,8 @@ class RedisKeys: def transcode_active(channel_id): """Key indicating active transcode process""" return f"ts_proxy:channel:{channel_id}:transcode_active" + + @staticmethod + def client_metadata(channel_id, client_id): + """Key for client metadata hash""" + return f"ts_proxy:channel:{channel_id}:clients:{client_id}" diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index cfe5b4ce..2dd923fd 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -18,14 +18,17 @@ import json from typing import Dict, Optional, Set from apps.proxy.config import TSConfig as Config from apps.channels.models import Channel +from core.utils import redis_client as global_redis_client, redis_pubsub_client as global_redis_pubsub_client # Import both global Redis clients +from redis.exceptions import ConnectionError, TimeoutError from .stream_manager import StreamManager from .stream_buffer import StreamBuffer from .client_manager import ClientManager from .redis_keys import RedisKeys from .constants import ChannelState, EventType, StreamType from .config_helper import ConfigHelper +from .utils import get_logger -logger = logging.getLogger("ts_proxy") +logger = get_logger() class ProxyServer: """Manages TS proxy server instance with worker coordination""" @@ -43,19 +46,25 @@ class ProxyServer: hostname = socket.gethostname() self.worker_id = f"{hostname}:{pid}" - # Connect to Redis + # Connect to Redis - try using global client first self.redis_client = None - try: - import redis - from django.conf import settings + self.redis_connection_attempts = 0 + self.redis_max_retries = 3 + self.redis_retry_interval = 5 # seconds + + try: + # First try to use the global client from core.utils + if global_redis_client is not None: + self.redis_client = global_redis_client + logger.info(f"Using global Redis client") + logger.info(f"Worker ID: {self.worker_id}") + else: + # Fall back to direct connection with retry + self._setup_redis_connection() - redis_url = getattr(settings, 'REDIS_URL', 'redis://localhost:6379/0') - self.redis_client = redis.from_url(redis_url) - logger.info(f"Connected to Redis at {redis_url}") - logger.info(f"Worker ID: {self.worker_id}") except Exception as e: + logger.error(f"Failed to initialize Redis: {e}") self.redis_client = None - logger.error(f"Failed to connect to Redis: {e}") # Start cleanup thread self.cleanup_interval = getattr(Config, 'CLEANUP_INTERVAL', 60) @@ -64,179 +73,306 @@ class ProxyServer: # Start event listener for Redis pubsub messages self._start_event_listener() + def _setup_redis_connection(self): + """Setup Redis connection with retry logic""" + import redis + from django.conf import settings + + while self.redis_connection_attempts < self.redis_max_retries: + try: + logger.info(f"Attempting to connect to Redis ({self.redis_connection_attempts+1}/{self.redis_max_retries})") + + # Get connection parameters from settings or environment + redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost')) + redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) + redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) + + # Create Redis client with reasonable timeouts + self.redis_client = redis.Redis( + host=redis_host, + port=redis_port, + db=redis_db, + socket_timeout=5, + socket_connect_timeout=5, + retry_on_timeout=True, + health_check_interval=30 + ) + + # Test connection + self.redis_client.ping() + logger.info(f"Successfully connected to Redis at {redis_host}:{redis_port}/{redis_db}") + logger.info(f"Worker ID: {self.worker_id}") + break + + except (ConnectionError, TimeoutError) as e: + self.redis_connection_attempts += 1 + if self.redis_connection_attempts >= self.redis_max_retries: + logger.error(f"Failed to connect to Redis after {self.redis_max_retries} attempts: {e}") + self.redis_client = None + else: + # Exponential backoff with a maximum of 30 seconds + retry_delay = min(self.redis_retry_interval * (2 ** (self.redis_connection_attempts - 1)), 30) + logger.warning(f"Redis connection failed. Retrying in {retry_delay}s... ({self.redis_connection_attempts}/{self.redis_max_retries})") + time.sleep(retry_delay) + + except Exception as e: + logger.error(f"Unexpected error connecting to Redis: {e}", exc_info=True) + self.redis_client = None + break + + def _execute_redis_command(self, command_func, *args, **kwargs): + """Execute Redis command with error handling and reconnection logic""" + if not self.redis_client: + return None + + try: + return command_func(*args, **kwargs) + except (ConnectionError, TimeoutError) as e: + logger.warning(f"Redis connection lost: {e}. Attempting to reconnect...") + try: + # Try to reconnect + self.redis_connection_attempts = 0 + self._setup_redis_connection() + if self.redis_client: + # Retry the command once + return command_func(*args, **kwargs) + except Exception as reconnect_error: + logger.error(f"Failed to reconnect to Redis: {reconnect_error}") + return None + except Exception as e: + logger.error(f"Redis command error: {e}") + return None + def _start_event_listener(self): """Listen for events from other workers""" if not self.redis_client: return def event_listener(): - try: - pubsub = self.redis_client.pubsub() - pubsub.psubscribe("ts_proxy:events:*") + retry_count = 0 + max_retries = 10 + base_retry_delay = 1 # Start with 1 second delay + max_retry_delay = 30 # Cap at 30 seconds - logger.info(f"Started Redis event listener for client activity") + while True: + try: + # Use the global PubSub client if available + if global_redis_pubsub_client: + pubsub_client = global_redis_pubsub_client + logger.info("Using global Redis PubSub client for event listener") + else: + # Fall back to creating a dedicated client if global one is unavailable + from django.conf import settings + import redis - for message in pubsub.listen(): - if message["type"] != "pmessage": - continue + redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost')) + redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) + redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) - try: - channel = message["channel"].decode("utf-8") - data = json.loads(message["data"].decode("utf-8")) + pubsub_client = redis.Redis( + host=redis_host, + port=redis_port, + db=redis_db, + socket_timeout=60, + socket_connect_timeout=10, + socket_keepalive=True, + health_check_interval=30 + ) + logger.info("Created dedicated Redis PubSub client for event listener") - event_type = data.get("event") - channel_id = data.get("channel_id") + # Test connection before subscribing + pubsub_client.ping() - if channel_id and event_type: - # For owner, update client status immediately - if self.am_i_owner(channel_id): - if event_type == EventType.CLIENT_CONNECTED: - logger.debug(f"Owner received {EventType.CLIENT_CONNECTED} event for channel {channel_id}") - # Reset any disconnect timer - disconnect_key = RedisKeys.last_client_disconnect(channel_id) - self.redis_client.delete(disconnect_key) + # Create a pubsub instance from the client + pubsub = pubsub_client.pubsub() + pubsub.psubscribe("ts_proxy:events:*") - elif event_type == EventType.CLIENT_DISCONNECTED: - logger.debug(f"Owner received {EventType.CLIENT_DISCONNECTED} event for channel {channel_id}") - # Check if any clients remain - if channel_id in self.client_managers: - # VERIFY REDIS CLIENT COUNT DIRECTLY - client_set_key = RedisKeys.clients(channel_id) - total = self.redis_client.scard(client_set_key) or 0 + logger.info(f"Started Redis event listener for client activity") - if total == 0: - logger.debug(f"No clients left after disconnect event - stopping channel {channel_id}") - # Set the disconnect timer for other workers to see - disconnect_key = RedisKeys.last_client_disconnect(channel_id) - self.redis_client.setex(disconnect_key, 60, str(time.time())) + # Reset retry count on successful connection + retry_count = 0 - # Get configured shutdown delay or default - shutdown_delay = getattr(Config, 'CHANNEL_SHUTDOWN_DELAY', 0) + for message in pubsub.listen(): + if message["type"] != "pmessage": + continue - if shutdown_delay > 0: - logger.info(f"Waiting {shutdown_delay}s before stopping channel...") - time.sleep(shutdown_delay) + try: + channel = message["channel"].decode("utf-8") + data = json.loads(message["data"].decode("utf-8")) - # Re-check client count before stopping - total = self.redis_client.scard(client_set_key) or 0 - if total > 0: - logger.info(f"New clients connected during shutdown delay - aborting shutdown") - self.redis_client.delete(disconnect_key) - return + event_type = data.get("event") + channel_id = data.get("channel_id") - # Stop the channel directly + if channel_id and event_type: + # For owner, update client status immediately + if self.am_i_owner(channel_id): + if event_type == EventType.CLIENT_CONNECTED: + logger.debug(f"Owner received {EventType.CLIENT_CONNECTED} event for channel {channel_id}") + # Reset any disconnect timer + disconnect_key = RedisKeys.last_client_disconnect(channel_id) + self.redis_client.delete(disconnect_key) + + elif event_type == EventType.CLIENT_DISCONNECTED: + logger.debug(f"Owner received {EventType.CLIENT_DISCONNECTED} event for channel {channel_id}") + # Check if any clients remain + if channel_id in self.client_managers: + # VERIFY REDIS CLIENT COUNT DIRECTLY + client_set_key = RedisKeys.clients(channel_id) + total = self.redis_client.scard(client_set_key) or 0 + + if total == 0: + logger.debug(f"No clients left after disconnect event - stopping channel {channel_id}") + # Set the disconnect timer for other workers to see + disconnect_key = RedisKeys.last_client_disconnect(channel_id) + self.redis_client.setex(disconnect_key, 60, str(time.time())) + + # Get configured shutdown delay or default + shutdown_delay = getattr(Config, 'CHANNEL_SHUTDOWN_DELAY', 0) + + if shutdown_delay > 0: + logger.info(f"Waiting {shutdown_delay}s before stopping channel...") + time.sleep(shutdown_delay) + + # Re-check client count before stopping + total = self.redis_client.scard(client_set_key) or 0 + if total > 0: + logger.info(f"New clients connected during shutdown delay - aborting shutdown") + self.redis_client.delete(disconnect_key) + return + + # Stop the channel directly + self.stop_channel(channel_id) + + + elif event_type == EventType.STREAM_SWITCH: + logger.info(f"Owner received {EventType.STREAM_SWITCH} request for channel {channel_id}") + # Handle stream switch request + new_url = data.get("url") + user_agent = data.get("user_agent") + + if new_url and channel_id in self.stream_managers: + # Update metadata in Redis + if self.redis_client: + metadata_key = RedisKeys.channel_metadata(channel_id) + self.redis_client.hset(metadata_key, "url", new_url) + if user_agent: + self.redis_client.hset(metadata_key, "user_agent", user_agent) + + # Set switch status + status_key = RedisKeys.switch_status(channel_id) + self.redis_client.set(status_key, "switching") + + # Perform the stream switch + stream_manager = self.stream_managers[channel_id] + success = stream_manager.update_url(new_url) + + if success: + logger.info(f"Stream switch initiated for channel {channel_id}") + + # Publish confirmation + switch_result = { + "event": EventType.STREAM_SWITCHED, # Use constant instead of string + "channel_id": channel_id, + "success": True, + "url": new_url, + "timestamp": time.time() + } + self.redis_client.publish( + f"ts_proxy:events:{channel_id}", + json.dumps(switch_result) + ) + + # Update status + if self.redis_client: + self.redis_client.set(status_key, "switched") + else: + logger.error(f"Failed to switch stream for channel {channel_id}") + + # Publish failure + switch_result = { + "event": EventType.STREAM_SWITCHED, + "channel_id": channel_id, + "success": False, + "url": new_url, + "timestamp": time.time() + } + self.redis_client.publish( + f"ts_proxy:events:{channel_id}", + json.dumps(switch_result) + ) + elif event_type == EventType.CHANNEL_STOP: + logger.info(f"Received {EventType.CHANNEL_STOP} event for channel {channel_id}") + # First mark channel as stopping in Redis + if self.redis_client: + # Set stopping state in metadata + metadata_key = RedisKeys.channel_metadata(channel_id) + if self.redis_client.exists(metadata_key): + self.redis_client.hset(metadata_key, mapping={ + "state": ChannelState.STOPPING, + "state_changed_at": str(time.time()) + }) + + # If we have local resources for this channel, clean them up + if channel_id in self.stream_buffers or channel_id in self.client_managers: + # Use existing stop_channel method + logger.info(f"Stopping local resources for channel {channel_id}") self.stop_channel(channel_id) + # Acknowledge stop by publishing a response + stop_response = { + "event": EventType.CHANNEL_STOPPED, + "channel_id": channel_id, + "worker_id": self.worker_id, + "timestamp": time.time() + } + self.redis_client.publish( + f"ts_proxy:events:{channel_id}", + json.dumps(stop_response) + ) + elif event_type == EventType.CLIENT_STOP: + client_id = data.get("client_id") + if client_id and channel_id: + logger.info(f"Received request to stop client {client_id} on channel {channel_id}") - elif event_type == EventType.STREAM_SWITCH: - logger.info(f"Owner received {EventType.STREAM_SWITCH} request for channel {channel_id}") - # Handle stream switch request - new_url = data.get("url") - user_agent = data.get("user_agent") + # Both remove from client manager AND set a key for the generator to detect + if channel_id in self.client_managers: + client_manager = self.client_managers[channel_id] + if client_id in client_manager.clients: + client_manager.remove_client(client_id) + logger.info(f"Removed client {client_id} from client manager") - if new_url and channel_id in self.stream_managers: - # Update metadata in Redis - if self.redis_client: - metadata_key = RedisKeys.channel_metadata(channel_id) - self.redis_client.hset(metadata_key, "url", new_url) - if user_agent: - self.redis_client.hset(metadata_key, "user_agent", user_agent) - - # Set switch status - status_key = RedisKeys.switch_status(channel_id) - self.redis_client.set(status_key, "switching") - - # Perform the stream switch - stream_manager = self.stream_managers[channel_id] - success = stream_manager.update_url(new_url) - - if success: - logger.info(f"Stream switch initiated for channel {channel_id}") - - # Publish confirmation - switch_result = { - "event": EventType.STREAM_SWITCHED, # Use constant instead of string - "channel_id": channel_id, - "success": True, - "url": new_url, - "timestamp": time.time() - } - self.redis_client.publish( - f"ts_proxy:events:{channel_id}", - json.dumps(switch_result) - ) - - # Update status + # Set a Redis key for the generator to detect if self.redis_client: - self.redis_client.set(status_key, "switched") - else: - logger.error(f"Failed to switch stream for channel {channel_id}") + stop_key = RedisKeys.client_stop(channel_id, client_id) + self.redis_client.setex(stop_key, 30, "true") # 30 second TTL + logger.info(f"Set stop key for client {client_id}") + except Exception as e: + logger.error(f"Error processing event message: {e}") - # Publish failure - switch_result = { - "event": EventType.STREAM_SWITCHED, - "channel_id": channel_id, - "success": False, - "url": new_url, - "timestamp": time.time() - } - self.redis_client.publish( - f"ts_proxy:events:{channel_id}", - json.dumps(switch_result) - ) - elif event_type == EventType.CHANNEL_STOP: - logger.info(f"Received {EventType.CHANNEL_STOP} event for channel {channel_id}") - # First mark channel as stopping in Redis - if self.redis_client: - # Set stopping state in metadata - metadata_key = RedisKeys.channel_metadata(channel_id) - if self.redis_client.exists(metadata_key): - self.redis_client.hset(metadata_key, mapping={ - "state": ChannelState.STOPPING, - "state_changed_at": str(time.time()) - }) + except (ConnectionError, TimeoutError) as e: + # Calculate exponential backoff with jitter + retry_count += 1 + delay = min(base_retry_delay * (2 ** (retry_count - 1)), max_retry_delay) + # Add some randomness to prevent thundering herd + jitter = random.uniform(0, 0.5 * delay) + final_delay = delay + jitter - # If we have local resources for this channel, clean them up - if channel_id in self.stream_buffers or channel_id in self.client_managers: - # Use existing stop_channel method - logger.info(f"Stopping local resources for channel {channel_id}") - self.stop_channel(channel_id) + logger.error(f"Error in event listener: {e}. Retrying in {final_delay:.1f}s (attempt {retry_count})") + time.sleep(final_delay) - # Acknowledge stop by publishing a response - stop_response = { - "event": EventType.CHANNEL_STOPPED, - "channel_id": channel_id, - "worker_id": self.worker_id, - "timestamp": time.time() - } - self.redis_client.publish( - f"ts_proxy:events:{channel_id}", - json.dumps(stop_response) - ) - elif event_type == EventType.CLIENT_STOP: - client_id = data.get("client_id") - if client_id and channel_id: - logger.info(f"Received request to stop client {client_id} on channel {channel_id}") + # Try to clean up the old connection + try: + if 'pubsub' in locals(): + pubsub.close() + if 'pubsub_client' in locals(): + pubsub_client.close() + except: + pass - # Both remove from client manager AND set a key for the generator to detect - if channel_id in self.client_managers: - client_manager = self.client_managers[channel_id] - if client_id in client_manager.clients: - client_manager.remove_client(client_id) - logger.info(f"Removed client {client_id} from client manager") - - # Set a Redis key for the generator to detect - if self.redis_client: - stop_key = RedisKeys.client_stop(channel_id, client_id) - self.redis_client.setex(stop_key, 30, "true") # 30 second TTL - logger.info(f"Set stop key for client {client_id}") - except Exception as e: - logger.error(f"Error processing event message: {e}") - except Exception as e: - logger.error(f"Error in event listener: {e}") - time.sleep(5) # Wait before reconnecting - # Try to restart the listener - self._start_event_listener() + except Exception as e: + logger.error(f"Error in event listener: {e}") + # Add a short delay to prevent rapid retries on persistent errors + time.sleep(5) thread = threading.Thread(target=event_listener, daemon=True) thread.name = "redis-event-listener" @@ -249,10 +385,9 @@ class ProxyServer: try: lock_key = RedisKeys.channel_owner(channel_id) - owner = self.redis_client.get(lock_key) - if owner: - return owner.decode('utf-8') - return None + return self._execute_redis_command( + lambda: self.redis_client.get(lock_key).decode('utf-8') if self.redis_client.get(lock_key) else None + ) except Exception as e: logger.error(f"Error getting channel owner: {e}") return None @@ -271,20 +406,32 @@ class ProxyServer: # Create a lock key with proper namespace lock_key = RedisKeys.channel_owner(channel_id) - # Use Redis SETNX for atomic locking - only succeeds if the key doesn't exist - acquired = self.redis_client.setnx(lock_key, self.worker_id) + # Use Redis SETNX for atomic locking with error handling + acquired = self._execute_redis_command( + lambda: self.redis_client.setnx(lock_key, self.worker_id) + ) + + if acquired is None: # Redis command failed + logger.warning(f"Redis command failed during ownership acquisition - assuming ownership") + return True # If acquired, set expiry to prevent orphaned locks if acquired: - self.redis_client.expire(lock_key, ttl) + self._execute_redis_command( + lambda: self.redis_client.expire(lock_key, ttl) + ) logger.info(f"Worker {self.worker_id} acquired ownership of channel {channel_id}") return True # If not acquired, check if we already own it (might be a retry) - current_owner = self.redis_client.get(lock_key) + current_owner = self._execute_redis_command( + lambda: self.redis_client.get(lock_key) + ) if current_owner and current_owner.decode('utf-8') == self.worker_id: # Refresh TTL - self.redis_client.expire(lock_key, ttl) + self._execute_redis_command( + lambda: self.redis_client.expire(lock_key, ttl) + ) logger.info(f"Worker {self.worker_id} refreshed ownership of channel {channel_id}") return True @@ -689,7 +836,7 @@ class ProxyServer: return True except Exception as e: - logger.error(f"Error stopping channel {channel_id}: {e}", exc_info=True) + logger.error(f"Error stopping channel {channel_id}: {e}") return False def check_inactive_channels(self): @@ -723,7 +870,9 @@ class ProxyServer: # Send worker heartbeat first if self.redis_client: worker_heartbeat_key = f"ts_proxy:worker:{self.worker_id}:heartbeat" - self.redis_client.setex(worker_heartbeat_key, 30, str(time.time())) + self._execute_redis_command( + lambda: self.redis_client.setex(worker_heartbeat_key, 30, str(time.time())) + ) # Refresh channel registry self.refresh_channel_registry() diff --git a/apps/proxy/ts_proxy/stream_buffer.py b/apps/proxy/ts_proxy/stream_buffer.py index 7bf4ba21..a94204ab 100644 --- a/apps/proxy/ts_proxy/stream_buffer.py +++ b/apps/proxy/ts_proxy/stream_buffer.py @@ -10,8 +10,9 @@ from apps.proxy.config import TSConfig as Config from .redis_keys import RedisKeys from .config_helper import ConfigHelper from .constants import TS_PACKET_SIZE +from .utils import get_logger -logger = logging.getLogger("ts_proxy") +logger = get_logger() class StreamBuffer: """Manages stream data buffering with optimized chunk storage""" diff --git a/apps/proxy/ts_proxy/stream_generator.py b/apps/proxy/ts_proxy/stream_generator.py index 8a91f1dc..3d4f037c 100644 --- a/apps/proxy/ts_proxy/stream_generator.py +++ b/apps/proxy/ts_proxy/stream_generator.py @@ -8,10 +8,11 @@ import logging import threading from apps.proxy.config import TSConfig as Config from . import proxy_server -from .utils import create_ts_packet +from .utils import create_ts_packet, get_logger from .redis_keys import RedisKeys +from .utils import get_logger -logger = logging.getLogger("ts_proxy") +logger = get_logger() class StreamGenerator: """ @@ -162,11 +163,6 @@ class StreamGenerator: def _stream_data_generator(self): """Generate stream data chunks based on buffer contents.""" - bytes_sent = 0 - chunks_sent = 0 - stream_start_time = time.time() - local_index = self.local_index - # Main streaming loop while True: # Check if resources still exist @@ -174,12 +170,11 @@ class StreamGenerator: break # Get chunks at client's position using improved strategy - chunks, next_index = self.buffer.get_optimized_client_data(local_index) + chunks, next_index = self.buffer.get_optimized_client_data(self.local_index) if chunks: - yield from self._process_chunks(chunks, next_index, bytes_sent, chunks_sent, stream_start_time) - local_index = next_index - self.local_index = local_index + yield from self._process_chunks(chunks, next_index) + self.local_index = next_index self.last_yield_time = time.time() self.empty_reads = 0 self.consecutive_empty = 0 @@ -188,11 +183,11 @@ class StreamGenerator: self.empty_reads += 1 self.consecutive_empty += 1 - if self._should_send_keepalive(local_index): + if self._should_send_keepalive(self.local_index): keepalive_packet = create_ts_packet('keepalive') logger.debug(f"[{self.client_id}] Sending keepalive packet while waiting at buffer head") yield keepalive_packet - bytes_sent += len(keepalive_packet) + self.bytes_sent += len(keepalive_packet) self.last_yield_time = time.time() self.consecutive_empty = 0 # Reset consecutive counter but keep total empty_reads time.sleep(Config.KEEPALIVE_INTERVAL) @@ -204,11 +199,11 @@ class StreamGenerator: # Log empty reads periodically if self.empty_reads % 50 == 0: stream_status = "healthy" if (self.stream_manager and self.stream_manager.healthy) else "unknown" - logger.debug(f"[{self.client_id}] Waiting for chunks beyond {local_index} (buffer at {self.buffer.index}, stream: {stream_status})") + logger.debug(f"[{self.client_id}] Waiting for chunks beyond {self.local_index} (buffer at {self.buffer.index}, stream: {stream_status})") # Check for ghost clients - if self._is_ghost_client(local_index): - logger.warning(f"[{self.client_id}] Possible ghost client: buffer has advanced {self.buffer.index - local_index} chunks ahead but client stuck at {local_index}") + if self._is_ghost_client(self.local_index): + logger.warning(f"[{self.client_id}] Possible ghost client: buffer has advanced {self.buffer.index - self.local_index} chunks ahead but client stuck at {self.local_index}") break # Check for timeouts @@ -258,7 +253,7 @@ class StreamGenerator: return True - def _process_chunks(self, chunks, next_index, bytes_sent, chunks_sent, stream_start_time): + def _process_chunks(self, chunks, next_index): """Process and yield chunks to the client.""" # Process and send chunks total_size = sum(len(c) for c in chunks) @@ -268,20 +263,34 @@ class StreamGenerator: for chunk in chunks: try: yield chunk - bytes_sent += len(chunk) - chunks_sent += 1 + self.bytes_sent += len(chunk) + self.chunks_sent += 1 + logger.debug(f"[{self.client_id}] Sent chunk {self.chunks_sent} ({len(chunk)} bytes) to client") + # Log every 10 chunks and store in redis for visibility + if self.chunks_sent % 10 == 0: + elapsed = time.time() - self.stream_start_time + rate = self.bytes_sent / elapsed / 1024 if elapsed > 0 else 0 + logger.debug(f"[{self.client_id}] Stats: {self.chunks_sent} chunks, {self.bytes_sent/1024:.1f} KB, {rate:.1f} KB/s") + + # Store stats in Redis client metadata + if proxy_server.redis_client: + try: + client_key = RedisKeys.client_metadata(self.channel_id, self.client_id) + stats = { + "chunks_sent": str(self.chunks_sent), + "bytes_sent": str(self.bytes_sent), + "transfer_rate_KBps": str(round(rate, 1)), + "stats_updated_at": str(time.time()) + } + proxy_server.redis_client.hset(client_key, mapping=stats) + # No need to set expiration as client heartbeat will refresh this key + except Exception as e: + logger.warning(f"[{self.client_id}] Failed to store stats in Redis: {e}") - # Log every 100 chunks for visibility - if chunks_sent % 100 == 0: - elapsed = time.time() - stream_start_time - rate = bytes_sent / elapsed / 1024 if elapsed > 0 else 0 - logger.info(f"[{self.client_id}] Stats: {chunks_sent} chunks, {bytes_sent/1024:.1f}KB, {rate:.1f}KB/s") except Exception as e: logger.error(f"[{self.client_id}] Error sending chunk to client: {e}") raise # Re-raise to exit the generator - return bytes_sent, chunks_sent - def _should_send_keepalive(self, local_index): """Determine if a keepalive packet should be sent.""" # Check if we're caught up to buffer head diff --git a/apps/proxy/ts_proxy/stream_manager.py b/apps/proxy/ts_proxy/stream_manager.py index c47766eb..e8c3daac 100644 --- a/apps/proxy/ts_proxy/stream_manager.py +++ b/apps/proxy/ts_proxy/stream_manager.py @@ -13,13 +13,13 @@ from apps.channels.models import Channel, Stream from apps.m3u.models import M3UAccount, M3UAccountProfile from core.models import UserAgent, CoreSettings from .stream_buffer import StreamBuffer -from .utils import detect_stream_type +from .utils import detect_stream_type, get_logger from .redis_keys import RedisKeys from .constants import ChannelState, EventType, StreamType, TS_PACKET_SIZE from .config_helper import ConfigHelper from .url_utils import get_alternate_streams, get_stream_info_for_switch -logger = logging.getLogger("ts_proxy") +logger = get_logger() class StreamManager: """Manages a connection to a TS stream without using raw sockets""" diff --git a/apps/proxy/ts_proxy/url_utils.py b/apps/proxy/ts_proxy/url_utils.py index e478c1c0..a0ed4476 100644 --- a/apps/proxy/ts_proxy/url_utils.py +++ b/apps/proxy/ts_proxy/url_utils.py @@ -9,8 +9,9 @@ from django.shortcuts import get_object_or_404 from apps.channels.models import Channel, Stream from apps.m3u.models import M3UAccount, M3UAccountProfile from core.models import UserAgent, CoreSettings +from .utils import get_logger -logger = logging.getLogger("ts_proxy") +logger = get_logger() def generate_stream_url(channel_id: str) -> Tuple[str, str, bool]: """ @@ -54,7 +55,7 @@ def generate_stream_url(channel_id: str) -> Tuple[str, str, bool]: transcode = True # Get profile name as string - profile_value = str(stream_profile) + profile_value = stream_profile.id return stream_url, stream_user_agent, transcode, profile_value diff --git a/apps/proxy/ts_proxy/utils.py b/apps/proxy/ts_proxy/utils.py index d8c96464..b568b804 100644 --- a/apps/proxy/ts_proxy/utils.py +++ b/apps/proxy/ts_proxy/utils.py @@ -1,6 +1,7 @@ import logging import re from urllib.parse import urlparse +import inspect logger = logging.getLogger("ts_proxy") @@ -77,4 +78,31 @@ def create_ts_packet(packet_type='null', message=None): msg_bytes = message.encode('utf-8') packet[4:4+min(len(msg_bytes), 180)] = msg_bytes[:180] - return bytes(packet) \ No newline at end of file + return bytes(packet) + +def get_logger(component_name=None): + """ + Get a standardized logger with ts_proxy prefix and optional component name. + + Args: + component_name (str, optional): Name of the component. If not provided, + will try to detect from the calling module. + + Returns: + logging.Logger: A configured logger with standardized naming. + """ + if component_name: + logger_name = f"ts_proxy.{component_name}" + else: + # Try to get the calling module name if not explicitly specified + frame = inspect.currentframe().f_back + module = inspect.getmodule(frame) + if module: + # Extract just the filename without extension + module_name = module.__name__.split('.')[-1] + logger_name = f"ts_proxy.{module_name}" + else: + # Default if detection fails + logger_name = "ts_proxy" + + return logging.getLogger(logger_name) \ No newline at end of file diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py index 75ef4fc5..fe87e677 100644 --- a/apps/proxy/ts_proxy/views.py +++ b/apps/proxy/ts_proxy/views.py @@ -22,9 +22,9 @@ from .constants import ChannelState, EventType, StreamType from .config_helper import ConfigHelper from .services.channel_service import ChannelService from .url_utils import generate_stream_url, transform_url, get_stream_info_for_switch +from .utils import get_logger -# Configure logging properly -logger = logging.getLogger("ts_proxy") +logger = get_logger() @api_view(['GET']) diff --git a/core/command_utils.py b/core/command_utils.py new file mode 100644 index 00000000..f6adfd88 --- /dev/null +++ b/core/command_utils.py @@ -0,0 +1,34 @@ +import sys +import os + +def is_management_command(excluded_commands=None): + """ + Detect if we're running a Django management command like migrate, collectstatic, etc. + + Args: + excluded_commands: List of commands that should still use Redis (e.g. runserver) + + Returns: + bool: True if we're running a management command + """ + # First check if we're in build mode + if os.environ.get("DISPATCHARR_BUILD") == "1": + return True + + if excluded_commands is None: + excluded_commands = ['runserver', 'runworker', 'daphne'] + + # Check if we're running via manage.py + if not ('manage.py' in sys.argv[0]): + return False + + # Check if we have a command argument + if len(sys.argv) > 1: + command = sys.argv[1] + # Return False if command is in excluded list - these commands DO need Redis + if command in excluded_commands: + return False + # Otherwise it's a command that should work without Redis + return True + + return False diff --git a/core/migrations/0009_m3u_hash_settings.py b/core/migrations/0009_m3u_hash_settings.py new file mode 100644 index 00000000..eab5f141 --- /dev/null +++ b/core/migrations/0009_m3u_hash_settings.py @@ -0,0 +1,22 @@ +# Generated by Django 5.1.6 on 2025-03-01 14:01 + +from django.db import migrations +from django.utils.text import slugify + +def preload_core_settings(apps, schema_editor): + CoreSettings = apps.get_model("core", "CoreSettings") + CoreSettings.objects.create( + key=slugify("M3U Hash Key"), + name="M3U Hash Key", + value="name,url,tvg_id", + ) + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0008_rename_profile_name_streamprofile_name_and_more'), + ] + + operations = [ + migrations.RunPython(preload_core_settings), + ] diff --git a/core/models.py b/core/models.py index 86ea2623..a4fa92d4 100644 --- a/core/models.py +++ b/core/models.py @@ -67,7 +67,7 @@ class StreamProfile(models.Model): def save(self, *args, **kwargs): if self.pk: # Only check existing records orig = StreamProfile.objects.get(pk=self.pk) - if orig.is_protected: + if orig.locked: allowed_fields = {"user_agent_id"} # Only allow this field to change for field in self._meta.fields: field_name = field.name @@ -91,7 +91,7 @@ class StreamProfile(models.Model): def update(cls, pk, **kwargs): instance = cls.objects.get(pk=pk) - if instance.is_protected: + if instance.locked: allowed_fields = {"user_agent_id"} # Only allow updating this field for field_name, new_value in kwargs.items(): @@ -142,9 +142,9 @@ class StreamProfile(models.Model): DEFAULT_USER_AGENT_KEY= slugify("Default User-Agent") DEFAULT_STREAM_PROFILE_KEY = slugify("Default Stream Profile") +STREAM_HASH_KEY = slugify("M3U Hash Key") PREFERRED_REGION_KEY = slugify("Preferred Region") - class CoreSettings(models.Model): key = models.CharField( max_length=255, @@ -170,10 +170,12 @@ class CoreSettings(models.Model): return cls.objects.get(key=DEFAULT_STREAM_PROFILE_KEY).value @classmethod + def get_m3u_hash_key(cls): + return cls.objects.get(key=STREAM_HASH_KEY).value + def get_preferred_region(cls): """Retrieve the preferred region setting (or return None if not found).""" try: return cls.objects.get(key=PREFERRED_REGION_KEY).value except cls.DoesNotExist: return None - diff --git a/core/redis_pubsub.py b/core/redis_pubsub.py new file mode 100644 index 00000000..5d0032b0 --- /dev/null +++ b/core/redis_pubsub.py @@ -0,0 +1,267 @@ +""" +Redis PubSub utilities for maintaining long-lived connections. +""" +import threading +import time +import logging +import json +from redis import Redis +from redis.exceptions import ConnectionError, TimeoutError + +logger = logging.getLogger(__name__) + +class DummyPubSub: + """Dummy PubSub implementation when Redis isn't available""" + def __init__(self): + pass + + def subscribe(self, *args, **kwargs): + pass + + def psubscribe(self, *args, **kwargs): + pass + + def get_message(self, *args, **kwargs): + return None + + def close(self): + pass + +class RedisPubSubManager: + """ + A robust Redis PubSub manager that handles disconnections and reconnections. + """ + + def __init__(self, redis_client=None, auto_reconnect=True): + """ + Initialize the PubSub manager. + + Args: + redis_client: An existing Redis client to use + auto_reconnect: Whether to automatically reconnect on failure + """ + self.redis_client = redis_client + self.pubsub = None + self.subscriptions = set() + self.pattern_subscriptions = set() + self.auto_reconnect = auto_reconnect + self.running = True + self.lock = threading.RLock() + self.message_handlers = {} # Map of channels to handler functions + self.message_thread = None + self.is_dummy = redis_client is None + + def subscribe(self, channel, handler=None): + """ + Subscribe to a channel. + + Args: + channel: The channel to subscribe to + handler: Optional function to call when messages are received + """ + if self.is_dummy: + return + + with self.lock: + self.subscriptions.add(channel) + if handler: + self.message_handlers[channel] = handler + + if self.pubsub: + self.pubsub.subscribe(channel) + logger.info(f"Subscribed to channel: {channel}") + + def psubscribe(self, pattern, handler=None): + """ + Subscribe to a channel pattern. + + Args: + pattern: The pattern to subscribe to + handler: Optional function to call when messages are received + """ + if self.is_dummy: + return + + with self.lock: + self.pattern_subscriptions.add(pattern) + if handler: + self.message_handlers[pattern] = handler + + if self.pubsub: + self.pubsub.psubscribe(pattern) + logger.info(f"Subscribed to pattern: {pattern}") + + def publish(self, channel, message): + """ + Publish a message to a channel. + + Args: + channel: The channel to publish to + message: The message to publish (will be JSON-encoded if not a string) + + Returns: + Number of clients that received the message + """ + if self.is_dummy: + return 0 + + try: + if not isinstance(message, str): + message = json.dumps(message) + return self.redis_client.publish(channel, message) + except Exception as e: + logger.error(f"Error publishing to {channel}: {e}") + return 0 + + def start_listening(self): + """ + Start listening for messages in a background thread. + """ + if self.is_dummy: + logger.debug("Running with dummy Redis client - not starting listener") + return + + if not self.message_thread: + self._connect() + self.message_thread = threading.Thread( + target=self._listen_for_messages, + daemon=True, + name="redis-pubsub-listener" + ) + self.message_thread.start() + logger.info("Started Redis PubSub listener thread") + + def stop(self): + """ + Stop listening and clean up resources. + """ + if self.is_dummy: + return + + self.running = False + if self.pubsub: + try: + self.pubsub.close() + except: + pass + self.pubsub = None + + def _connect(self): + """ + Establish a new PubSub connection and subscribe to all channels. + """ + if self.is_dummy: + self.pubsub = DummyPubSub() + return + + with self.lock: + # Close any existing connection + if self.pubsub: + try: + self.pubsub.close() + except: + pass + + # Create a new PubSub instance - critical: no timeout for subscribe operations + # This prevents the connection from timing out while waiting for messages + self.pubsub = self.redis_client.pubsub() + + # Resubscribe to all channels + if self.subscriptions: + self.pubsub.subscribe(*self.subscriptions) + logger.info(f"Resubscribed to channels: {self.subscriptions}") + + # Resubscribe to all patterns + if self.pattern_subscriptions: + self.pubsub.psubscribe(*self.pattern_subscriptions) + logger.info(f"Resubscribed to patterns: {self.pattern_subscriptions}") + + def _listen_for_messages(self): + """ + Background thread that listens for messages and handles reconnections. + """ + if self.is_dummy: + return + + consecutive_errors = 0 + + while self.running: + try: + # Check if we need to connect + if not self.pubsub: + self._connect() + + # Listen for messages with NO timeout - this is critical! + message = self.pubsub.get_message(timeout=None) + + if message: + # Don't process subscription confirmation messages + if message['type'] in ('subscribe', 'psubscribe'): + continue + + channel = message.get('channel') + if channel: + # Decode binary channel name if needed + if isinstance(channel, bytes): + channel = channel.decode('utf-8') + + # Find and call the appropriate handler + handler = self.message_handlers.get(channel) + if handler: + try: + handler(message) + except Exception as e: + logger.error(f"Error in message handler for {channel}: {e}") + + # Reset error counter on success + consecutive_errors = 0 + + # Small sleep to prevent excessive CPU usage + time.sleep(0.01) + + except (ConnectionError, TimeoutError) as e: + consecutive_errors += 1 + + if not self.auto_reconnect: + logger.error(f"PubSub connection error and auto_reconnect is disabled: {e}") + break + + # Exponential backoff for reconnection attempts + backoff = min(consecutive_errors * 0.5, 5) + logger.warning(f"PubSub connection error, reconnecting in {backoff} seconds: {e}") + time.sleep(backoff) + + # Reconnect + self._connect() + + except Exception as e: + logger.error(f"Unexpected error in PubSub listener: {e}") + time.sleep(1) # Prevent tight loop in case of persistent errors + + logger.info("PubSub listener thread stopping") + +# Create a singleton instance +pubsub_manager = None + +def get_pubsub_manager(redis_client=None): + """ + Get or create the PubSub manager singleton. + + Args: + redis_client: Optional Redis client to use + + Returns: + The PubSub manager instance + """ + global pubsub_manager + + if pubsub_manager is None: + pubsub_manager = RedisPubSubManager(redis_client) + # Only start if redis_client is not None + if redis_client is not None: + try: + pubsub_manager.start_listening() + except Exception as e: + logger.error(f"Failed to start PubSub listener: {e}") + + return pubsub_manager diff --git a/core/utils.py b/core/utils.py index e9b5f6d2..073c2169 100644 --- a/core/utils.py +++ b/core/utils.py @@ -1,28 +1,165 @@ import redis import logging +import time +import os +import threading from django.conf import settings +from redis.exceptions import ConnectionError, TimeoutError logger = logging.getLogger(__name__) -def get_redis_client(): - """Get Redis client with connection validation""" - try: - # Create Redis client - client = redis.Redis( - host=settings.REDIS_HOST, - port=getattr(settings, 'REDIS_PORT', 6379), - db=settings.REDIS_DB, - socket_timeout=5, - socket_connect_timeout=5 - ) - - # Validate connection with ping - client.ping() - logger.info(f"Connected to Redis at {settings.REDIS_HOST}:6379/{settings.REDIS_DB}") - return client - except Exception as e: - logger.error(f"Failed to connect to Redis: {e}") +# Import the command detector +from .command_utils import is_management_command + +def get_redis_client(max_retries=5, retry_interval=1): + """Get Redis client with connection validation and retry logic""" + # Skip Redis connection for management commands like collectstatic + if is_management_command(): + logger.info("Running as management command - skipping Redis initialization") return None -# Initialize the global client -redis_client = get_redis_client() \ No newline at end of file + retry_count = 0 + while retry_count < max_retries: + try: + # Get connection parameters from settings or environment + redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost')) + redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) + redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) + + # Use standardized settings + socket_timeout = getattr(settings, 'REDIS_SOCKET_TIMEOUT', 5) + socket_connect_timeout = getattr(settings, 'REDIS_SOCKET_CONNECT_TIMEOUT', 5) + health_check_interval = getattr(settings, 'REDIS_HEALTH_CHECK_INTERVAL', 30) + socket_keepalive = getattr(settings, 'REDIS_SOCKET_KEEPALIVE', True) + retry_on_timeout = getattr(settings, 'REDIS_RETRY_ON_TIMEOUT', True) + + # Create Redis client with better defaults + client = redis.Redis( + host=redis_host, + port=redis_port, + db=redis_db, + socket_timeout=socket_timeout, + socket_connect_timeout=socket_connect_timeout, + socket_keepalive=socket_keepalive, + health_check_interval=health_check_interval, + retry_on_timeout=retry_on_timeout + ) + + # Validate connection with ping + client.ping() + logger.info(f"Connected to Redis at {redis_host}:{redis_port}/{redis_db}") + return client + + except (ConnectionError, TimeoutError) as e: + retry_count += 1 + if retry_count >= max_retries: + logger.error(f"Failed to connect to Redis after {max_retries} attempts: {e}") + return None + else: + # Use exponential backoff for retries + wait_time = retry_interval * (2 ** (retry_count - 1)) + logger.warning(f"Redis connection failed. Retrying in {wait_time}s... ({retry_count}/{max_retries})") + time.sleep(wait_time) + + except Exception as e: + logger.error(f"Unexpected error connecting to Redis: {e}") + return None + +def get_redis_pubsub_client(max_retries=5, retry_interval=1): + """Get Redis client optimized for PubSub operations""" + # Skip Redis connection for management commands like collectstatic + if is_management_command(): + logger.info("Running as management command - skipping Redis PubSub initialization") + return None + + retry_count = 0 + while retry_count < max_retries: + try: + # Get connection parameters from settings or environment + redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost')) + redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) + redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) + + # Use standardized settings but without socket timeouts for PubSub + # Important: socket_timeout is None for PubSub operations + socket_connect_timeout = getattr(settings, 'REDIS_SOCKET_CONNECT_TIMEOUT', 5) + socket_keepalive = getattr(settings, 'REDIS_SOCKET_KEEPALIVE', True) + health_check_interval = getattr(settings, 'REDIS_HEALTH_CHECK_INTERVAL', 30) + retry_on_timeout = getattr(settings, 'REDIS_RETRY_ON_TIMEOUT', True) + + # Create Redis client with PubSub-optimized settings - no timeout + client = redis.Redis( + host=redis_host, + port=redis_port, + db=redis_db, + socket_timeout=None, # Critical: No timeout for PubSub operations + socket_connect_timeout=socket_connect_timeout, + socket_keepalive=socket_keepalive, + health_check_interval=health_check_interval, + retry_on_timeout=retry_on_timeout + ) + + # Validate connection with ping + client.ping() + logger.info(f"Connected to Redis for PubSub at {redis_host}:{redis_port}/{redis_db}") + + # We don't need the keepalive thread anymore since we're using proper PubSub handling + return client + + except (ConnectionError, TimeoutError) as e: + retry_count += 1 + if retry_count >= max_retries: + logger.error(f"Failed to connect to Redis for PubSub after {max_retries} attempts: {e}") + return None + else: + # Use exponential backoff for retries + wait_time = retry_interval * (2 ** (retry_count - 1)) + logger.warning(f"Redis PubSub connection failed. Retrying in {wait_time}s... ({retry_count}/{max_retries})") + time.sleep(wait_time) + + except Exception as e: + logger.error(f"Unexpected error connecting to Redis for PubSub: {e}") + return None + +def execute_redis_command(redis_client, command_func, default_return=None): + """ + Execute a Redis command with proper error handling + + Args: + redis_client: The Redis client instance + command_func: Lambda function containing the Redis command to execute + default_return: Value to return if command fails + + Returns: + Command result or default_return on failure + """ + if redis_client is None: + return default_return + + try: + return command_func() + except (ConnectionError, TimeoutError) as e: + logger.warning(f"Redis connection error: {e}") + return default_return + except Exception as e: + logger.error(f"Redis command error: {e}") + return default_return + +# Initialize the global clients with retry logic +# Skip Redis initialization if running as a management command +if is_management_command(): + redis_client = None + redis_pubsub_client = None + logger.info("Running as management command - Redis clients set to None") +else: + redis_client = get_redis_client() + redis_pubsub_client = get_redis_pubsub_client() + +# Import and initialize the PubSub manager +# Skip if running as management command or if Redis client is None +if not is_management_command() and redis_client is not None: + from .redis_pubsub import get_pubsub_manager + pubsub_manager = get_pubsub_manager(redis_client) +else: + logger.info("PubSub manager not initialized (running as management command or Redis not available)") + pubsub_manager = None \ No newline at end of file diff --git a/dispatcharr/settings.py b/dispatcharr/settings.py index 5cd21169..9b381bfd 100644 --- a/dispatcharr/settings.py +++ b/dispatcharr/settings.py @@ -151,6 +151,19 @@ AUTH_USER_MODEL = 'accounts.User' CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0') CELERY_RESULT_BACKEND = CELERY_BROKER_URL +# Configure Redis key prefix +CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS = { + 'prefix': 'celery-task:', # Set the Redis key prefix for Celery +} + +# Set TTL (Time-to-Live) for task results (in seconds) +CELERY_RESULT_EXPIRES = 3600 # 1 hour TTL for task results + +# Optionally, set visibility timeout for task retries (if using Redis) +CELERY_BROKER_TRANSPORT_OPTIONS = { + 'visibility_timeout': 3600, # Time in seconds that a task remains invisible during retries +} + CELERY_BEAT_SCHEDULE = { 'fetch-channel-statuses': { 'task': 'apps.proxy.tasks.fetch_channel_stats', @@ -179,8 +192,15 @@ SIMPLE_JWT = { 'BLACKLIST_AFTER_ROTATION': True, # Optional: Whether to blacklist refresh tokens } -# Redis settings for TS proxy +# Redis connection settings REDIS_URL = 'redis://localhost:6379/0' +REDIS_SOCKET_TIMEOUT = 60 # Socket timeout in seconds +REDIS_SOCKET_CONNECT_TIMEOUT = 5 # Connection timeout in seconds +REDIS_HEALTH_CHECK_INTERVAL = 15 # Health check every 15 seconds +REDIS_SOCKET_KEEPALIVE = True # Enable socket keepalive +REDIS_RETRY_ON_TIMEOUT = True # Retry on timeout +REDIS_MAX_RETRIES = 10 # Maximum number of retries +REDIS_RETRY_INTERVAL = 1 # Initial retry interval in seconds # Proxy Settings PROXY_SETTINGS = { diff --git a/docker/Dockerfile b/docker/Dockerfile index 54aa75e5..b833a9f7 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,9 +1,13 @@ FROM python:3.13-slim AS builder +# Define build argument with default value of "main" +ARG BRANCH=main + ENV PATH="/dispatcharrpy/bin:$PATH" \ VIRTUAL_ENV=/dispatcharrpy \ DJANGO_SETTINGS_MODULE=dispatcharr.settings \ - PYTHONUNBUFFERED=1 + PYTHONUNBUFFERED=1 \ + DISPATCHARR_BUILD=1 RUN apt-get update && \ apt-get install -y --no-install-recommends \ @@ -23,14 +27,14 @@ RUN apt-get update && \ nodejs && \ python -m pip install virtualenv && \ virtualenv /dispatcharrpy && \ - git clone https://github.com/Dispatcharr/Dispatcharr /app && \ + git clone -b ${BRANCH} https://github.com/Dispatcharr/Dispatcharr /app && \ cd /app && \ rm -rf .git && \ cd /app && \ pip install --no-cache-dir -r requirements.txt && \ python manage.py collectstatic --noinput && \ cd /app/frontend && \ - npm install && \ + npm install --legacy-peer-deps && \ npm run build && \ find . -maxdepth 1 ! -name '.' ! -name 'dist' -exec rm -rf '{}' \; @@ -45,43 +49,44 @@ ENV PATH="/dispatcharrpy/bin:$PATH" \ COPY --from=builder /dispatcharrpy /dispatcharrpy COPY --from=builder /app /app +# Install base dependencies with memory optimization RUN apt-get update && \ - apt-get install -y --no-install-recommends \ + DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \ curl \ ffmpeg \ - gnupg2 \ - gpg \ libpcre3 \ libpq-dev \ - lsb-release \ nginx \ procps \ streamlink \ - wget && \ + wget \ + gnupg2 \ + lsb-release && \ cp /app/docker/nginx.conf /etc/nginx/sites-enabled/default && \ - echo "=== setting up postgres ====" && \ - echo "deb http://apt.postgresql.org/pub/repos/apt/ bookworm-pgdg main" > /etc/apt/sources.list.d/pgdg.list && \ - wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add - && \ - echo "=== setting up redis ===" && \ - curl -fsSL https://packages.redis.io/gpg | gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +# Set up Redis repository in a separate step +RUN curl -fsSL https://packages.redis.io/gpg | gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg && \ chmod 644 /usr/share/keyrings/redis-archive-keyring.gpg && \ echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | tee /etc/apt/sources.list.d/redis.list && \ apt-get update && \ - apt-get install -y \ - postgresql-14 \ - postgresql-contrib-14 \ - redis-server && \ - mkdir /data && \ - apt-get remove -y \ - gnupg2 \ - gpg \ - lsb-release && \ + DEBIAN_FRONTEND=noninteractive apt-get install -y redis-server && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +# Set up PostgreSQL repository and install in a separate step +RUN echo "=== setting up postgres ====" && \ + sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list' && \ + wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | gpg --dearmor -o /usr/share/keyrings/postgresql-keyring.gpg && \ + echo "deb [signed-by=/usr/share/keyrings/postgresql-keyring.gpg] http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list && \ + apt-get update && \ + DEBIAN_FRONTEND=noninteractive apt-get install -y postgresql-14 postgresql-contrib-14 && \ + mkdir -p /data && \ + apt-get remove -y gnupg2 lsb-release && \ apt-get clean && \ apt-get autoremove -y && \ - rm -rf \ - /tmp/* \ - /var/lib/apt/lists/* \ - /var/tmp/* + rm -rf /tmp/* /var/lib/apt/lists/* /var/tmp/* WORKDIR /app diff --git a/docker/build-dev.sh b/docker/build-dev.sh index 0b723671..c5c79474 100755 --- a/docker/build-dev.sh +++ b/docker/build-dev.sh @@ -1,3 +1,3 @@ #!/bin/bash -docker build -t dispatcharr/dispatcharr:dev -f Dockerfile .. +docker build --build-arg BRANCH=dev -t dispatcharr/dispatcharr:dev -f Dockerfile .. diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index 2898ed2f..d04edcb0 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -85,10 +85,6 @@ else pids+=("$nginx_pid") fi -cd /app -python manage.py migrate --noinput -python manage.py collectstatic --noinput - uwsgi_file="/app/docker/uwsgi.ini" if [ "$DISPATCHARR_ENV" = "dev" ]; then uwsgi_file="/app/docker/uwsgi.dev.ini" @@ -100,6 +96,12 @@ uwsgi_pid=$(pgrep uwsgi | sort | head -n1) echo "✅ uwsgi started with PID $uwsgi_pid" pids+=("$uwsgi_pid") + + +cd /app +python manage.py migrate --noinput +python manage.py collectstatic --noinput + # Wait for at least one process to exit and log the process that exited first if [ ${#pids[@]} -gt 0 ]; then echo "⏳ Waiting for processes to exit..." diff --git a/docker/init/03-init-dispatcharr.sh b/docker/init/03-init-dispatcharr.sh index 951958a2..722e916b 100644 --- a/docker/init/03-init-dispatcharr.sh +++ b/docker/init/03-init-dispatcharr.sh @@ -10,4 +10,10 @@ if [ "$(id -u)" = "0" ]; then chown -R $PUID:$PGID /app chown $PUID:www-data /app/uwsgi.sock chmod 777 /app/uwsgi.sock + + # Create and set permissions for the cached_m3u directory + mkdir -p /app/media/cached_m3u + chown -R $PUID:$PGID /app/media/cached_m3u + chmod 777 /app/media/cached_m3u + echo "Created and set permissions for cached_m3u directory" fi diff --git a/docker/uwsgi.dev.ini b/docker/uwsgi.dev.ini index 93ef9fa0..5b23b183 100644 --- a/docker/uwsgi.dev.ini +++ b/docker/uwsgi.dev.ini @@ -2,9 +2,14 @@ ; exec-before = python manage.py collectstatic --noinput ; exec-before = python manage.py migrate --noinput +; First run Redis availability check script once +exec-pre = python /app/scripts/wait_for_redis.py + +; Start Redis first +attach-daemon = redis-server +; Then start other services attach-daemon = celery -A dispatcharr worker -l info attach-daemon = celery -A dispatcharr beat -l info -attach-daemon = redis-server attach-daemon = daphne -b 0.0.0.0 -p 8001 dispatcharr.asgi:application attach-daemon = cd /app/frontend && npm run dev diff --git a/docker/uwsgi.ini b/docker/uwsgi.ini index adb5de33..1b1de50a 100644 --- a/docker/uwsgi.ini +++ b/docker/uwsgi.ini @@ -2,9 +2,14 @@ ; exec-before = python manage.py collectstatic --noinput ; exec-before = python manage.py migrate --noinput +; First run Redis availability check script once +exec-pre = python /app/scripts/wait_for_redis.py + +; Start Redis first +attach-daemon = redis-server +; Then start other services attach-daemon = celery -A dispatcharr worker -l error attach-daemon = celery -A dispatcharr beat -l error -attach-daemon = redis-server attach-daemon = daphne -b 0.0.0.0 -p 8001 dispatcharr.asgi:application # Core settings diff --git a/frontend/src/App.jsx b/frontend/src/App.jsx index 4d2476b4..bfe43fc1 100644 --- a/frontend/src/App.jsx +++ b/frontend/src/App.jsx @@ -26,6 +26,7 @@ import './index.css'; import mantineTheme from './mantineTheme'; import API from './api'; import { Notifications } from '@mantine/notifications'; +import M3URefreshNotification from './components/M3URefreshNotification'; const drawerWidth = 240; const miniDrawerWidth = 60; @@ -143,6 +144,7 @@ const App = () => { + diff --git a/frontend/src/WebSocket.jsx b/frontend/src/WebSocket.jsx index d361409b..0538887d 100644 --- a/frontend/src/WebSocket.jsx +++ b/frontend/src/WebSocket.jsx @@ -8,6 +8,8 @@ import React, { import useStreamsStore from './store/streams'; import { notifications } from '@mantine/notifications'; import useChannelsStore from './store/channels'; +import usePlaylistsStore from './store/playlists'; +import useEPGsStore from './store/epgs'; export const WebsocketContext = createContext(false, null, () => {}); @@ -16,7 +18,9 @@ export const WebsocketProvider = ({ children }) => { const [val, setVal] = useState(null); const { fetchStreams } = useStreamsStore(); - const { setChannelStats } = useChannelsStore(); + const { setChannelStats, fetchChannelGroups } = useChannelsStore(); + const { setRefreshProgress } = usePlaylistsStore(); + const { fetchEPGData } = useEPGsStore(); const ws = useRef(null); @@ -52,12 +56,20 @@ export const WebsocketProvider = ({ children }) => { event = JSON.parse(event.data); switch (event.data.type) { case 'm3u_refresh': + console.log('inside m3u_refresh event'); if (event.data.success) { fetchStreams(); notifications.show({ message: event.data.message, color: 'green.5', }); + } else if (event.data.progress) { + if (event.data.progress == 100) { + fetchStreams(); + fetchChannelGroups(); + fetchEPGData(); + } + setRefreshProgress(event.data.account, event.data.progress); } break; diff --git a/frontend/src/api.js b/frontend/src/api.js index 60403bd0..0590e2d4 100644 --- a/frontend/src/api.js +++ b/frontend/src/api.js @@ -581,6 +581,18 @@ export default class API { return retval; } + static async getEPGData() { + const response = await fetch(`${host}/api/epg/epgdata/`, { + headers: { + Authorization: `Bearer ${await API.getAuthToken()}`, + 'Content-Type': 'application/json', + }, + }); + + const retval = await response.json(); + return retval; + } + // Notice there's a duplicated "refreshPlaylist" method above; // you might want to rename or remove one if it's not needed. diff --git a/frontend/src/components/FloatingVideo.jsx b/frontend/src/components/FloatingVideo.jsx index 1c0c1400..62e53e3e 100644 --- a/frontend/src/components/FloatingVideo.jsx +++ b/frontend/src/components/FloatingVideo.jsx @@ -3,6 +3,8 @@ import React, { useEffect, useRef } from 'react'; import Draggable from 'react-draggable'; import useVideoStore from '../store/useVideoStore'; import mpegts from 'mpegts.js'; +import { ActionIcon, Flex } from '@mantine/core'; +import { SquareX } from 'lucide-react'; export default function FloatingVideo() { const { isVisible, streamUrl, hideVideo } = useVideoStore(); @@ -65,28 +67,11 @@ export default function FloatingVideo() { }} > {/* Simple header row with a close button */} -
- -
+ + + + + {/* The