From d6e05445f3929c89fe4f911dbe5c5d00a5227dca Mon Sep 17 00:00:00 2001 From: dekzter Date: Sat, 22 Mar 2025 09:56:00 -0400 Subject: [PATCH] m3u group filters --- apps/channels/serializers.py | 6 +- apps/epg/models.py | 2 +- apps/epg/tasks.py | 2 + apps/m3u/api_views.py | 36 ++++- apps/m3u/serializers.py | 69 ++++++++- apps/m3u/signals.py | 6 +- apps/m3u/tasks.py | 138 ++++++++++------- apps/proxy/ts_proxy/server.py | 10 +- core/models.py | 4 +- frontend/src/components/FloatingVideo.jsx | 29 +--- .../src/components/M3URefreshNotification.jsx | 2 + frontend/src/components/forms/M3U.jsx | 77 +++++++--- .../src/components/forms/M3UGroupFilter.jsx | 144 ++++++++++++++++++ .../src/components/tables/ChannelsTable.jsx | 3 +- frontend/src/components/tables/M3UsTable.jsx | 16 +- .../components/tables/StreamProfilesTable.jsx | 73 +++++++-- .../src/components/tables/StreamsTable.jsx | 6 +- frontend/src/store/channels.jsx | 20 ++- 18 files changed, 513 insertions(+), 130 deletions(-) create mode 100644 frontend/src/components/forms/M3UGroupFilter.jsx diff --git a/apps/channels/serializers.py b/apps/channels/serializers.py index dd92c47a..a075297d 100644 --- a/apps/channels/serializers.py +++ b/apps/channels/serializers.py @@ -148,9 +148,11 @@ class ChannelSerializer(serializers.ModelSerializer): return instance class ChannelGroupM3UAccountSerializer(serializers.ModelSerializer): + enabled = serializers.BooleanField() + class Meta: model = ChannelGroupM3UAccount - fields = ['channel_group', 'enabled'] + fields = ['id', 'channel_group', 'enabled'] # Optionally, if you only need the id of the ChannelGroup, you can customize it like this: - channel_group = serializers.PrimaryKeyRelatedField(queryset=ChannelGroup.objects.all()) + # channel_group = serializers.PrimaryKeyRelatedField(queryset=ChannelGroup.objects.all()) diff --git a/apps/epg/models.py b/apps/epg/models.py index 206791a6..305d30ed 100644 --- a/apps/epg/models.py +++ b/apps/epg/models.py @@ -19,7 +19,7 @@ class 
EPGSource(models.Model): class EPGData(models.Model): # Removed the Channel foreign key. We now just store the original tvg_id # and a name (which might simply be the tvg_id if no real channel exists). - tvg_id = models.CharField(max_length=255, null=True, blank=True) + tvg_id = models.CharField(max_length=255, null=True, blank=True, unique=True) name = models.CharField(max_length=255) def __str__(self): diff --git a/apps/epg/tasks.py b/apps/epg/tasks.py index dd6bb2a0..1fa7d5d4 100644 --- a/apps/epg/tasks.py +++ b/apps/epg/tasks.py @@ -106,6 +106,8 @@ def parse_channels_only(file_path): epg_obj.save() logger.debug(f"Channel <{tvg_id}> => EPGData.id={epg_obj.id}, created={created}") + parse_programs_for_tvg_id(file_path, tvg_id) + logger.info("Finished parsing channel info.") diff --git a/apps/m3u/api_views.py b/apps/m3u/api_views.py index df9c13fc..508b6a89 100644 --- a/apps/m3u/api_views.py +++ b/apps/m3u/api_views.py @@ -11,6 +11,7 @@ from django.core.cache import cache # Import all models, including UserAgent. from .models import M3UAccount, M3UFilter, ServerGroup, M3UAccountProfile from core.models import UserAgent +from apps.channels.models import ChannelGroupM3UAccount from core.serializers import UserAgentSerializer # Import all serializers, including the UserAgentSerializer. 
from .serializers import ( @@ -24,10 +25,43 @@ from .tasks import refresh_single_m3u_account, refresh_m3u_accounts class M3UAccountViewSet(viewsets.ModelViewSet): """Handles CRUD operations for M3U accounts""" - queryset = M3UAccount.objects.all() + queryset = M3UAccount.objects.prefetch_related('channel_group') serializer_class = M3UAccountSerializer permission_classes = [IsAuthenticated] + def update(self, request, *args, **kwargs): + # Get the M3UAccount instance we're updating + instance = self.get_object() + + # Handle updates to the 'enabled' flag of the related ChannelGroupM3UAccount instances + updates = request.data.get('channel_groups', []) + + for update_data in updates: + channel_group_id = update_data.get('channel_group') + enabled = update_data.get('enabled') + + try: + # Get the specific relationship to update + relationship = ChannelGroupM3UAccount.objects.get( + m3u_account=instance, channel_group_id=channel_group_id + ) + relationship.enabled = enabled + relationship.save() + except ChannelGroupM3UAccount.DoesNotExist: + return Response( + {"error": "ChannelGroupM3UAccount not found for the given M3UAccount and ChannelGroup."}, + status=status.HTTP_400_BAD_REQUEST + ) + + # After updating the ChannelGroupM3UAccount relationships, reload the M3UAccount instance + instance.refresh_from_db() + + refresh_single_m3u_account.delay(instance.id) + + # Serialize and return the updated M3UAccount data + serializer = self.get_serializer(instance) + return Response(serializer.data) + class M3UFilterViewSet(viewsets.ModelViewSet): """Handles CRUD operations for M3U filters""" queryset = M3UFilter.objects.all() diff --git a/apps/m3u/serializers.py b/apps/m3u/serializers.py index dbd635ef..3946aac5 100644 --- a/apps/m3u/serializers.py +++ b/apps/m3u/serializers.py @@ -1,8 +1,11 @@ from rest_framework import serializers from .models import M3UAccount, M3UFilter, ServerGroup, M3UAccountProfile from core.models import UserAgent -from apps.channels.models import 
ChannelGroup -from apps.channels.serializers import ChannelGroupM3UAccountSerializer +from apps.channels.models import ChannelGroup, ChannelGroupM3UAccount +from apps.channels.serializers import ChannelGroupM3UAccountSerializer, ChannelGroupSerializer +import logging + +logger = logging.getLogger(__name__) class M3UFilterSerializer(serializers.ModelSerializer): """Serializer for M3U Filters""" @@ -40,14 +43,74 @@ class M3UAccountSerializer(serializers.ModelSerializer): ) profiles = M3UAccountProfileSerializer(many=True, read_only=True) read_only_fields = ['locked'] + # channel_groups = serializers.SerializerMethodField() + channel_groups = ChannelGroupM3UAccountSerializer(source='channel_group.all', many=True, required=False) + class Meta: model = M3UAccount fields = [ 'id', 'name', 'server_url', 'uploaded_file', 'server_group', - 'max_streams', 'is_active', 'created_at', 'updated_at', 'filters', 'user_agent', 'profiles', 'locked' + 'max_streams', 'is_active', 'created_at', 'updated_at', 'filters', 'user_agent', 'profiles', 'locked', + 'channel_groups', ] + # def get_channel_groups(self, obj): + # # Retrieve related ChannelGroupM3UAccount records for this M3UAccount + # relations = ChannelGroupM3UAccount.objects.filter(m3u_account=obj).select_related('channel_group') + + # # Serialize the channel groups with their enabled status + # return [ + # { + # 'channel_group_name': relation.channel_group.name, + # 'channel_group_id': relation.channel_group.id, + # 'enabled': relation.enabled, + # } + # for relation in relations + # ] + + # def to_representation(self, instance): + # """Override the default to_representation method to include channel_groups""" + # representation = super().to_representation(instance) + + # # Manually add the channel_groups to the representation + # channel_groups = ChannelGroupM3UAccount.objects.filter(m3u_account=instance).select_related('channel_group') + # representation['channel_groups'] = [ + # { + # 'id': relation.id, + # 
'channel_group_name': relation.channel_group.name, + # 'channel_group_id': relation.channel_group.id, + # 'enabled': relation.enabled, + # } + # for relation in channel_groups + # ] + + # return representation + + # def update(self, instance, validated_data): + # logger.info(validated_data) + # channel_groups_data = validated_data.pop('channel_groups', None) + # instance = super().update(instance, validated_data) + + # if channel_groups_data is not None: + # logger.info(json.dumps(channel_groups_data)) + # # Remove existing relationships not included in the request + # existing_groups = {cg.channel_group_id: cg for cg in instance.channel_group.all()} + + # # for group_id in set(existing_groups.keys()) - sent_group_ids: + # # existing_groups[group_id].delete() + + # # Create or update relationships + # for cg_data in channel_groups_data: + # logger.info(json.dumps(cg_data)) + # ChannelGroupM3UAccount.objects.update_or_create( + # channel_group=existing_groups[cg_data['channel_group_id']], + # m3u_account=instance, + # defaults={'enabled': cg_data.get('enabled', True)} + # ) + + # return instance + class ServerGroupSerializer(serializers.ModelSerializer): """Serializer for Server Group""" diff --git a/apps/m3u/signals.py b/apps/m3u/signals.py index 8b6179ca..c2abc4c8 100644 --- a/apps/m3u/signals.py +++ b/apps/m3u/signals.py @@ -2,7 +2,7 @@ from django.db.models.signals import post_save from django.dispatch import receiver from .models import M3UAccount -from .tasks import refresh_single_m3u_account +from .tasks import refresh_single_m3u_account, refresh_m3u_groups @receiver(post_save, sender=M3UAccount) def refresh_account_on_save(sender, instance, created, **kwargs): @@ -11,5 +11,5 @@ def refresh_account_on_save(sender, instance, created, **kwargs): call a Celery task that fetches & parses that single account if it is active or newly created. 
""" - if created or instance.is_active: - refresh_single_m3u_account.delay(instance.id) + if created: + refresh_m3u_groups(instance.id) diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py index 5dd7c43d..e0995543 100644 --- a/apps/m3u/tasks.py +++ b/apps/m3u/tasks.py @@ -25,21 +25,31 @@ logger = logging.getLogger(__name__) LOCK_EXPIRE = 300 BATCH_SIZE = 1000 SKIP_EXTS = {} +m3u_dir = os.path.join(settings.MEDIA_ROOT, "cached_m3u") + +def fetch_m3u_lines(account, use_cache=False): + os.makedirs(m3u_dir, exist_ok=True) + file_path = os.path.join(m3u_dir, f"{account.id}.m3u") -def fetch_m3u_lines(account): """Fetch M3U file lines efficiently.""" if account.server_url: - headers = {"User-Agent": account.user_agent.user_agent} - logger.info(f"Fetching from URL {account.server_url}") - try: - # Perform the HTTP request with stream and handle any potential issues - with requests.get(account.server_url, timeout=60, headers=headers, stream=True) as response: + if not use_cache or not os.path.exists(file_path): + headers = {"User-Agent": account.user_agent.user_agent} + logger.info(f"Fetching from URL {account.server_url}") + try: + response = requests.get(account.server_url, headers=headers, stream=True) response.raise_for_status() # This will raise an HTTPError if the status is not 200 - # Return an iterator for the lines - return response.iter_lines(decode_unicode=True) - except requests.exceptions.RequestException as e: - logger.error(f"Error fetching M3U from URL {account.server_url}: {e}") - return [] # Return an empty list in case of error + with open(file_path, 'wb') as file: + # Stream the content in chunks and write to the file + for chunk in response.iter_content(chunk_size=8192): # You can adjust the chunk size + if chunk: # Ensure chunk is not empty + file.write(chunk) + except requests.exceptions.RequestException as e: + logger.error(f"Error fetching M3U from URL {account.server_url}: {e}") + return [] # Return an empty list in case of error + + with 
open(file_path, 'r', encoding='utf-8') as f: + return f.readlines() elif account.uploaded_file: try: # Open the file and return the lines as a list or iterator @@ -137,6 +147,7 @@ def process_groups(account, group_names): groups = [] groups_to_create = [] for group_name in group_names: + logger.info(f"Handling group: {group_name}") if group_name in existing_groups: groups.append(existing_groups[group_name]) else: @@ -166,7 +177,10 @@ def process_groups(account, group_names): def process_m3u_batch(account_id, batch, group_names, hash_keys): """Processes a batch of M3U streams using bulk operations.""" account = M3UAccount.objects.get(id=account_id) - existing_groups = {group.name: group for group in ChannelGroup.objects.filter(name__in=group_names)} + existing_groups = {group.name: group for group in ChannelGroup.objects.filter( + m3u_account__m3u_account=account, # Filter by the M3UAccount + m3u_account__enabled=True # Filter by the enabled flag in the join table + )} streams_to_create = [] streams_to_update = [] @@ -174,12 +188,17 @@ def process_m3u_batch(account_id, batch, group_names, hash_keys): # compiled_filters = [(f.filter_type, re.compile(f.regex_pattern, re.IGNORECASE)) for f in filters] - logger.info(f"Processing batch of {len(batch)}") + logger.debug(f"Processing batch of {len(batch)}") for stream_info in batch: name, url = stream_info["name"], stream_info["url"] tvg_id, tvg_logo = stream_info["attributes"].get("tvg-id", ""), stream_info["attributes"].get("tvg-logo", "") group_title = stream_info["attributes"].get("group-title", "Default Group") + # Filter out disabled groups for this account + if group_title not in existing_groups: + logger.debug(f"Skipping stream in disabled group: {group_title}") + continue + # if any(url.lower().endswith(ext) for ext in SKIP_EXTS) or len(url) > 2000: # continue @@ -250,8 +269,53 @@ def process_m3u_batch(account_id, batch, group_names, hash_keys): return f"Batch processed: {len(streams_to_create)} created, 
{len(streams_to_update)} updated." +def refresh_m3u_groups(account_id): + if not acquire_lock('refresh_m3u_account_groups', account_id): + return f"Task already running for account_id={account_id}." + + # Record start time + start_time = time.time() + send_progress_update(0, account_id) + + try: + account = M3UAccount.objects.get(id=account_id, is_active=True) + except M3UAccount.DoesNotExist: + release_lock('refresh_m3u_account_groups', account_id) + return f"M3UAccount with ID={account_id} not found or inactive." + + lines = fetch_m3u_lines(account) + extinf_data = [] + groups = set(["Default Group"]) + + for line in lines: + line = line.strip() + if line.startswith("#EXTINF"): + parsed = parse_extinf_line(line) + if parsed: + if "group-title" in parsed["attributes"]: + groups.add(parsed["attributes"]["group-title"]) + + extinf_data.append(parsed) + elif extinf_data and line.startswith("http"): + # Associate URL with the last EXTINF line + extinf_data[-1]["url"] = line + + groups = list(groups) + cache_path = os.path.join(m3u_dir, f"{account_id}.json") + with open(cache_path, 'w', encoding='utf-8') as f: + json.dump({ + "extinf_data": extinf_data, + "groups": groups, + }, f) + + process_groups(account, groups) + + release_lock('refresh_m3u_account_groups', account_id) + + return extinf_data, groups + @shared_task -def refresh_single_m3u_account(account_id): +def refresh_single_m3u_account(account_id, use_cache=False): """Splits M3U processing into chunks and dispatches them as parallel tasks.""" if not acquire_lock('refresh_single_m3u_account', account_id): return f"Task already running for account_id={account_id}." @@ -269,47 +333,19 @@ def refresh_single_m3u_account(account_id): # Fetch M3U lines and handle potential issues # lines = fetch_m3u_lines(account) # Extracted fetch logic into separate function - - lines = [] - if account.server_url: - if not account.user_agent: - err_msg = f"User-Agent not provided for account id {account_id}."
- logger.error(err_msg) - release_lock('refresh_single_m3u_account', account_id) - return err_msg - - headers = {"User-Agent": account.user_agent.user_agent} - response = requests.get(account.server_url, headers=headers) - response.raise_for_status() - lines = response.text.splitlines() - elif account.uploaded_file: - file_path = account.uploaded_file.path - with open(file_path, 'r', encoding='utf-8') as f: - lines = f.read().splitlines() - extinf_data = [] - stream_hashes = [] - groups = set("Default Group") + groups = None - for line in lines: - line = line.strip() - if line.startswith("#EXTINF"): - parsed = parse_extinf_line(line) - if parsed: - groups.add(parsed["attributes"].get("group-title", "Default Group")) - extinf_data.append(parsed) - elif extinf_data and line.startswith("http"): - # Associate URL with the last EXTINF line - extinf_data[-1]["url"] = line + cache_path = os.path.join(m3u_dir, f"{account_id}.json") + if use_cache and os.path.exists(cache_path): + with open(cache_path, 'r') as file: + data = json.load(file) + + extinf_data = data['extinf_data'] + groups = data['groups'] if not extinf_data: - release_lock('refresh_single_m3u_account', account_id) - return "No valid EXTINF data found." 
- - groups = list(groups) - # Retrieve all unique groups so we can create / associate them before - # processing the streams themselves - process_groups(account, groups) + extinf_data, groups = refresh_m3u_groups(account_id) hash_keys = CoreSettings.get_m3u_hash_key().split(",") diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index aa2e7ffc..c539532e 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -715,8 +715,14 @@ class ProxyServer: try: # Send worker heartbeat first if self.redis_client: - worker_heartbeat_key = f"ts_proxy:worker:{self.worker_id}:heartbeat" - self.redis_client.setex(worker_heartbeat_key, 30, str(time.time())) + while True: + try: + worker_heartbeat_key = f"ts_proxy:worker:{self.worker_id}:heartbeat" + self.redis_client.setex(worker_heartbeat_key, 30, str(time.time())) + break + except: + logger.debug("Waiting for redis connection...") + time.sleep(1) # Refresh channel registry self.refresh_channel_registry() diff --git a/core/models.py b/core/models.py index 29acb530..a4fa92d4 100644 --- a/core/models.py +++ b/core/models.py @@ -67,7 +67,7 @@ class StreamProfile(models.Model): def save(self, *args, **kwargs): if self.pk: # Only check existing records orig = StreamProfile.objects.get(pk=self.pk) - if orig.is_protected: + if orig.locked: allowed_fields = {"user_agent_id"} # Only allow this field to change for field in self._meta.fields: field_name = field.name @@ -91,7 +91,7 @@ class StreamProfile(models.Model): def update(cls, pk, **kwargs): instance = cls.objects.get(pk=pk) - if instance.is_protected: + if instance.locked: allowed_fields = {"user_agent_id"} # Only allow updating this field for field_name, new_value in kwargs.items(): diff --git a/frontend/src/components/FloatingVideo.jsx b/frontend/src/components/FloatingVideo.jsx index 1c0c1400..62e53e3e 100644 --- a/frontend/src/components/FloatingVideo.jsx +++ b/frontend/src/components/FloatingVideo.jsx @@ -3,6 +3,8 @@ import React, 
{ useEffect, useRef } from 'react'; import Draggable from 'react-draggable'; import useVideoStore from '../store/useVideoStore'; import mpegts from 'mpegts.js'; +import { ActionIcon, Flex } from '@mantine/core'; +import { SquareX } from 'lucide-react'; export default function FloatingVideo() { const { isVisible, streamUrl, hideVideo } = useVideoStore(); @@ -65,28 +67,11 @@ export default function FloatingVideo() { }} > {/* Simple header row with a close button */} -
- -
+ + + + + {/* The