From 17a8e94f643baee716db03056d44768bbe163e6f Mon Sep 17 00:00:00 2001 From: dekzter Date: Sun, 30 Mar 2025 11:53:26 -0400 Subject: [PATCH] django beat, refresh intervals for epg and m3u --- apps/channels/tasks.py | 13 +- ...refresh_interval_epgsource_refresh_task.py | 25 ++ .../0007_populate_periodic_tasks.py | 52 ++++ apps/epg/models.py | 5 + apps/epg/serializers.py | 2 +- apps/epg/signals.py | 49 +++- apps/epg/tasks.py | 35 ++- .../0005_m3uaccount_custom_properties.py | 18 -- ...5_m3uaccount_custom_properties_and_more.py | 30 +++ .../0006_populate_periodic_tasks.py | 52 ++++ apps/m3u/models.py | 5 + apps/m3u/serializers.py | 58 +---- apps/m3u/signals.py | 49 +++- apps/m3u/tasks.py | 37 ++- frontend/src/components/forms/Channel.jsx | 7 +- frontend/src/components/forms/EPG.jsx | 13 + frontend/src/components/forms/M3U.jsx | 236 ++++++++++-------- requirements.txt | 1 + 18 files changed, 466 insertions(+), 221 deletions(-) create mode 100644 apps/epg/migrations/0006_epgsource_refresh_interval_epgsource_refresh_task.py create mode 100644 apps/epg/migrations/0007_populate_periodic_tasks.py delete mode 100644 apps/m3u/migrations/0005_m3uaccount_custom_properties.py create mode 100644 apps/m3u/migrations/0005_m3uaccount_custom_properties_and_more.py create mode 100644 apps/m3u/migrations/0006_populate_periodic_tasks.py diff --git a/apps/channels/tasks.py b/apps/channels/tasks.py index 4726969e..d5cdb2c8 100644 --- a/apps/channels/tasks.py +++ b/apps/channels/tasks.py @@ -104,26 +104,29 @@ def match_epg_channels(): ) matched_channels = [] + channels_to_update = [] source = EPGSource.objects.filter(is_active=True).first() epg_file_path = getattr(source, 'file_path', None) if source else None with transaction.atomic(): for chan in Channel.objects.all(): - - # A) Skip if channel.tvg_id is already valid - if chan.tvg_id and EPGData.objects.filter(tvg_id=chan.tvg_id).exists(): + # skip if channel already assigned an EPG + if chan.epg_data: continue - # B) If channel has a tvg_id that doesn't exist in EPGData, do direct check + # If channel has a tvg_id that doesn't exist in EPGData, do direct check. + # I don't THINK this should happen now that we assign EPG on channel creation. 
if chan.tvg_id: epg_match = EPGData.objects.filter(tvg_id=chan.tvg_id).first() if epg_match: + chan.epg_data = epg_match logger.info(f"Channel {chan.id} '{chan.name}' => EPG found by tvg_id={chan.tvg_id}") + channels_to_update.append(chan) continue # C) Perform name-based fuzzy matching - fallback_name = chan.epg_data.name.strip() if chan.epg_data else chan.name + fallback_name = chan.tvg_id.strip() if chan.tvg_id else chan.name norm_chan = normalize_name(fallback_name) if not norm_chan: logger.info(f"Channel {chan.id} '{chan.name}' => empty after normalization, skipping") diff --git a/apps/epg/migrations/0006_epgsource_refresh_interval_epgsource_refresh_task.py b/apps/epg/migrations/0006_epgsource_refresh_interval_epgsource_refresh_task.py new file mode 100644 index 00000000..10cf17ba --- /dev/null +++ b/apps/epg/migrations/0006_epgsource_refresh_interval_epgsource_refresh_task.py @@ -0,0 +1,25 @@ +# Generated by Django 5.1.6 on 2025-03-29 17:31 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('django_celery_beat', '0019_alter_periodictasks_options'), + ('epg', '0005_programdata_custom_properties_and_more'), + ] + + operations = [ + migrations.AddField( + model_name='epgsource', + name='refresh_interval', + field=models.IntegerField(default=24), + ), + migrations.AddField( + model_name='epgsource', + name='refresh_task', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, to='django_celery_beat.periodictask'), + ), + ] diff --git a/apps/epg/migrations/0007_populate_periodic_tasks.py b/apps/epg/migrations/0007_populate_periodic_tasks.py new file mode 100644 index 00000000..edca8db9 --- /dev/null +++ b/apps/epg/migrations/0007_populate_periodic_tasks.py @@ -0,0 +1,52 @@ +from django.db import migrations +import json + +def create_default_refresh_tasks(apps, schema_editor): + """ + Creates a PeriodicTask for each existing EPGSource that doesn't have one. 
+ """ + IntervalSchedule = apps.get_model("django_celery_beat", "IntervalSchedule") + PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask") + EPGSource = apps.get_model("epg", "EPGSource") + + default_interval, _ = IntervalSchedule.objects.get_or_create( + every=24, + period="hours", + ) + + for account in EPGSource.objects.all(): + if account.refresh_task: + continue + + task_name = f"epg_source-refresh-{account.id}" + + refresh_task = PeriodicTask.objects.create( + name=task_name, + interval=default_interval, + task="apps.epg.tasks.refresh_epg_data", + kwargs=json.dumps({"account_id": account.id}), + ) + + account.refresh_task = refresh_task + account.save(update_fields=["refresh_task"]) + +def reverse_migration(apps, schema_editor): + IntervalSchedule = apps.get_model("django_celery_beat", "IntervalSchedule") + PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask") + EPGSource = apps.get_model("epg", "EPGSource") + + for account in EPGSource.objects.all(): + IntervalSchedule.objects.all().delete() + PeriodicTask.objects.all().delete() + + +class Migration(migrations.Migration): + + dependencies = [ + ("epg", "0006_epgsource_refresh_interval_epgsource_refresh_task"), + ("django_celery_beat", "0019_alter_periodictasks_options"), + ] + + operations = [ + migrations.RunPython(create_default_refresh_tasks, reverse_migration), + ] diff --git a/apps/epg/models.py b/apps/epg/models.py index c98e2d2b..3f9b018d 100644 --- a/apps/epg/models.py +++ b/apps/epg/models.py @@ -1,5 +1,6 @@ from django.db import models from django.utils import timezone +from django_celery_beat.models import PeriodicTask class EPGSource(models.Model): SOURCE_TYPE_CHOICES = [ @@ -12,6 +13,10 @@ class EPGSource(models.Model): api_key = models.CharField(max_length=255, blank=True, null=True) # For Schedules Direct is_active = models.BooleanField(default=True) file_path = models.CharField(max_length=1024, blank=True, null=True) + refresh_interval = models.IntegerField(default=24) + refresh_task = models.ForeignKey( + PeriodicTask, on_delete=models.SET_NULL, null=True, blank=True + ) def __str__(self): return self.name diff --git a/apps/epg/serializers.py b/apps/epg/serializers.py index 8ea4100d..e4a2a4b3 100644 --- a/apps/epg/serializers.py +++ b/apps/epg/serializers.py @@ -7,7 +7,7 @@ class EPGSourceSerializer(serializers.ModelSerializer): class Meta: model = EPGSource - fields = ['id', 'name', 'source_type', 'url', 'api_key', 'is_active', 'epg_data_ids'] + fields = ['id', 'name', 'source_type', 'url', 'api_key', 'is_active', 'epg_data_ids', 'refresh_interval'] def get_epg_data_ids(self, obj): return list(obj.epgs.values_list('id', flat=True)) diff --git a/apps/epg/signals.py b/apps/epg/signals.py index c8f4a62c..148207e5 100644 --- a/apps/epg/signals.py +++ b/apps/epg/signals.py @@ -1,10 +1,57 @@ -from django.db.models.signals import post_save +from django.db.models.signals import post_save, post_delete from django.dispatch import receiver from .models import EPGSource from .tasks import refresh_epg_data +from django_celery_beat.models import PeriodicTask, IntervalSchedule +import json @receiver(post_save, sender=EPGSource) def trigger_refresh_on_new_epg_source(sender, instance, created, **kwargs): # Trigger refresh only if the source is newly created and active if created and instance.is_active: refresh_epg_data.delay() + +@receiver(post_save, sender=EPGSource) +def create_or_update_refresh_task(sender, instance, **kwargs): + """ + Create or update a Celery Beat periodic task when an EPGSource is 
+    """
+    task_name = f"epg_source-refresh-{instance.id}"
+
+    interval, _ = IntervalSchedule.objects.get_or_create(
+        every=instance.refresh_interval or 24,  # fall back to 24h when disabled (0)
+        period=IntervalSchedule.HOURS
+    )
+
+    if not instance.refresh_task:
+        refresh_task = PeriodicTask.objects.create(
+            name=task_name,
+            interval=interval,
+            task="apps.epg.tasks.refresh_epg_data",
+            kwargs=json.dumps({"source_id": instance.id}),
+            enabled=instance.refresh_interval != 0,
+        )
+        EPGSource.objects.filter(id=instance.id).update(refresh_task=refresh_task)
+    else:
+        task = instance.refresh_task
+        updated_fields = []
+
+        if task.enabled != (instance.refresh_interval != 0):
+            task.enabled = instance.refresh_interval != 0
+            updated_fields.append("enabled")
+
+        if task.interval != interval:
+            task.interval = interval
+            updated_fields.append("interval")
+
+        if updated_fields:
+            task.save(update_fields=updated_fields)
+
+@receiver(post_delete, sender=EPGSource)
+def delete_refresh_task(sender, instance, **kwargs):
+    """
+    Delete the associated Celery Beat periodic task when an EPGSource is deleted.
+    """
+    if instance.refresh_task:
+        # Delete only the task; its IntervalSchedule may be shared.
+        instance.refresh_task.delete()
diff --git a/apps/epg/tasks.py b/apps/epg/tasks.py
index dba80de9..409e551e 100644
--- a/apps/epg/tasks.py
+++ b/apps/epg/tasks.py
@@ -18,28 +18,44 @@ from asgiref.sync import async_to_sync
 from channels.layers import get_channel_layer
 
 from .models import EPGSource, EPGData, ProgramData
+from core.utils import acquire_task_lock, release_task_lock
 
 logger = logging.getLogger(__name__)
 
 @shared_task
-def refresh_epg_data():
-    logger.info("Starting refresh_epg_data task.")
+def refresh_all_epg_data():
+    logger.info("Starting refresh_all_epg_data task.")
     active_sources = EPGSource.objects.filter(is_active=True)
     logger.debug(f"Found {active_sources.count()} active EPGSource(s).")
     for source in active_sources:
-        logger.info(f"Processing EPGSource: {source.name} (type: {source.source_type})")
-        if source.source_type == 'xmltv':
-            fetch_xmltv(source)
-            parse_channels_only(source)
-            parse_programs_for_source(source)
-        elif source.source_type == 'schedules_direct':
-            fetch_schedules_direct(source)
+        refresh_epg_data(source.id)
 
-    logger.info("Finished refresh_epg_data task.")
+    logger.info("Finished refresh_all_epg_data task.")
     return "EPG data refreshed."
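+# Per-source refresh entry point. Celery Beat invokes this through the
+# PeriodicTask rows created in apps/epg/signals.py; a task lock prevents
+# overlapping refreshes of the same source.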
+@shared_task
+def refresh_epg_data(source_id):
+    if not acquire_task_lock('refresh_epg_data', source_id):
+        logger.debug(f"EPG refresh for {source_id} already running")
+        return
+
+    try:
+        source = EPGSource.objects.get(id=source_id)
+        logger.info(f"Processing EPGSource: {source.name} (type: {source.source_type})")
+        if source.source_type == 'xmltv':
+            fetch_xmltv(source)
+            parse_channels_only(source)
+            parse_programs_for_source(source)
+        elif source.source_type == 'schedules_direct':
+            fetch_schedules_direct(source)
+    finally:
+        # Always release the lock, even if the refresh raises.
+        release_task_lock('refresh_epg_data', source_id)
 
 def fetch_xmltv(source):
     logger.info(f"Fetching XMLTV data from source: {source.name}")
@@ -128,11 +144,16 @@ def parse_channels_only(source):
 
 @shared_task
 def parse_programs_for_tvg_id(epg_id):
+    if not acquire_task_lock('parse_epg_programs', epg_id):
+        logger.debug(f"Program parse for {epg_id} already in progress")
+        return
+
     epg = EPGData.objects.get(id=epg_id)
     epg_source = epg.epg_source
 
     if not Channel.objects.filter(epg_data=epg).exists():
         logger.info(f"No channels matched to EPG {epg.tvg_id}")
+        release_task_lock('parse_epg_programs', epg_id)
         return
 
     logger.info(f"Refreshing program data for tvg_id: {epg.tvg_id}")
@@ -173,6 +194,9 @@ def parse_programs_for_tvg_id(epg_id):
         ))
 
     ProgramData.objects.bulk_create(programs_to_create)
+
+    release_task_lock('parse_epg_programs', epg_id)
+
     logger.info(f"Completed program parsing for tvg_id={epg.tvg_id}.")
 
 def parse_programs_for_source(epg_source, tvg_id=None):
diff --git a/apps/m3u/migrations/0005_m3uaccount_custom_properties.py b/apps/m3u/migrations/0005_m3uaccount_custom_properties.py
deleted file mode 100644
index 587a3496..00000000
--- a/apps/m3u/migrations/0005_m3uaccount_custom_properties.py
+++ /dev/null
@@ -1,18 +0,0 @@
-# Generated by Django 5.1.6 on 2025-03-27 17:01
-
-from django.db import migrations, models
-
-
-class Migration(migrations.Migration):
-
-    dependencies = [
-        ('m3u', '0004_m3uaccount_stream_profile'),
-    ]
-
-    operations = [
-        migrations.AddField(
-            model_name='m3uaccount',
-            name='custom_properties',
-            field=models.TextField(blank=True, null=True),
-        ),
-    ]
diff --git a/apps/m3u/migrations/0005_m3uaccount_custom_properties_and_more.py b/apps/m3u/migrations/0005_m3uaccount_custom_properties_and_more.py
new file mode 100644
index 00000000..3728bf7f
--- /dev/null
+++ b/apps/m3u/migrations/0005_m3uaccount_custom_properties_and_more.py
@@ -0,0 +1,30 @@
+# Generated by Django 5.1.6 on 2025-03-29 13:44
+
+import django.db.models.deletion
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('django_celery_beat', '0019_alter_periodictasks_options'),
+        ('m3u', '0004_m3uaccount_stream_profile'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='m3uaccount',
+            name='custom_properties',
+            field=models.TextField(blank=True, null=True),
+        ),
+        migrations.AddField(
+            model_name='m3uaccount',
+            name='refresh_interval',
+            field=models.IntegerField(default=24),
+        ),
+        migrations.AddField(
+            model_name='m3uaccount',
+            name='refresh_task',
+            field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, to='django_celery_beat.periodictask'),
+        ),
+    ]
diff --git a/apps/m3u/migrations/0006_populate_periodic_tasks.py b/apps/m3u/migrations/0006_populate_periodic_tasks.py
new file mode 100644
index 00000000..bc387979
--- /dev/null
+++ b/apps/m3u/migrations/0006_populate_periodic_tasks.py
@@ -0,0 +1,52 @@
+from django.db import migrations
+import json
+
+def create_default_refresh_tasks(apps, schema_editor):
+    """
+    Creates a PeriodicTask for each existing M3UAccount that doesn't have one.
+    """
+    IntervalSchedule = apps.get_model("django_celery_beat", "IntervalSchedule")
+    PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
+    M3UAccount = apps.get_model("m3u", "M3UAccount")
+
+    default_interval, _ = IntervalSchedule.objects.get_or_create(
+        every=24,
+        period="hours",
+    )
+
+    for account in M3UAccount.objects.all():
+        if account.refresh_task:
+            continue
+
+        task_name = f"m3u_account-refresh-{account.id}"
+
+        refresh_task = PeriodicTask.objects.create(
+            name=task_name,
+            interval=default_interval,
+            task="apps.m3u.tasks.refresh_single_m3u_account",
+            kwargs=json.dumps({"account_id": account.id}),
+        )
+
+        account.refresh_task = refresh_task
+        account.save(update_fields=["refresh_task"])
+
+def reverse_migration(apps, schema_editor):
+    PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
+    M3UAccount = apps.get_model("m3u", "M3UAccount")
+
+    for account in M3UAccount.objects.all():
+        # IntervalSchedule has no name field and the schedule may be shared,
+        # so only the per-account PeriodicTask is deleted here.
+        PeriodicTask.objects.filter(name=f"m3u_account-refresh-{account.id}").delete()
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ("m3u", "0005_m3uaccount_custom_properties_and_more"),
+        ("django_celery_beat", "0019_alter_periodictasks_options"),
+    ]
+
+    operations = [
+        migrations.RunPython(create_default_refresh_tasks, reverse_migration),
+    ]
diff --git a/apps/m3u/models.py b/apps/m3u/models.py
index 18e21e0b..e324e690 100644
--- a/apps/m3u/models.py
+++ b/apps/m3u/models.py
@@ -4,6 +4,7 @@ from core.models import UserAgent
 import re
 from django.dispatch import receiver
 from apps.channels.models import StreamProfile
+from django_celery_beat.models import PeriodicTask
 
 CUSTOM_M3U_ACCOUNT_NAME="custom"
 
@@ -68,6 +69,10 @@ class M3UAccount(models.Model):
         related_name='m3u_accounts'
     )
     custom_properties = models.TextField(null=True, blank=True)
+    refresh_interval = models.IntegerField(default=24)
+    refresh_task = models.ForeignKey(
+        PeriodicTask, on_delete=models.SET_NULL, null=True, blank=True
+    )
 
     def __str__(self):
         return self.name
diff --git a/apps/m3u/serializers.py b/apps/m3u/serializers.py
index 3946aac5..c6b1ce29 100644
--- a/apps/m3u/serializers.py
+++ b/apps/m3u/serializers.py
@@ -52,65 +52,9 @@ class M3UAccountSerializer(serializers.ModelSerializer):
         fields = [
             'id', 'name', 'server_url', 'uploaded_file', 'server_group', 'max_streams',
             'is_active', 'created_at', 'updated_at', 'filters', 'user_agent', 'profiles', 'locked',
-            'channel_groups',
+            'channel_groups', 'refresh_interval'
         ]
 
-    # def get_channel_groups(self, obj):
-    #     # Retrieve related ChannelGroupM3UAccount records for this M3UAccount
-    #     relations = ChannelGroupM3UAccount.objects.filter(m3u_account=obj).select_related('channel_group')
-
-    #     # Serialize the channel groups with their enabled status
-    #     return [
-    #         {
-    #             'channel_group_name': relation.channel_group.name,
-    #             'channel_group_id': relation.channel_group.id,
-    #             'enabled': relation.enabled,
-    #         }
-    #         for relation in relations
-    #     ]
-
-    # def to_representation(self, instance):
-    #     """Override the default to_representation method to include channel_groups"""
-    #     representation = super().to_representation(instance)
-
-    #     # Manually add the channel_groups to the representation
-    #     channel_groups = ChannelGroupM3UAccount.objects.filter(m3u_account=instance).select_related('channel_group')
-    #     representation['channel_groups'] = [
-    #         {
-    #             'id': relation.id,
-    #             'channel_group_name': relation.channel_group.name,
-    #             'channel_group_id': relation.channel_group.id,
-    #             'enabled': relation.enabled,
-    #         }
-    #         for relation in channel_groups
-    #     ]
-
-    #     return representation
-
-    # def update(self, instance, validated_data):
-    #     logger.info(validated_data)
-    #     channel_groups_data = validated_data.pop('channel_groups', None)
-    #     instance = super().update(instance, validated_data)
-
-    #     if channel_groups_data is not None:
-    #         logger.info(json.dumps(channel_groups_data))
-    #         # Remove existing relationships not included in the request
-    #         existing_groups = {cg.channel_group_id: cg for cg in instance.channel_group.all()}
-
-    #         # for group_id in set(existing_groups.keys()) - sent_group_ids:
-    #         #     existing_groups[group_id].delete()
-
-    #         # Create or update relationships
-    #         for cg_data in channel_groups_data:
-    #             logger.info(json.dumps(cg_data))
-    #             ChannelGroupM3UAccount.objects.update_or_create(
-    #                 channel_group=existing_groups[cg_data['channel_group_id']],
-    #                 m3u_account=instance,
-    #                 defaults={'enabled': cg_data.get('enabled', True)}
-    #             )
-
-    #     return instance
-
 
 class ServerGroupSerializer(serializers.ModelSerializer):
     """Serializer for Server Group"""
diff --git a/apps/m3u/signals.py b/apps/m3u/signals.py
index c07c1c4c..4e27370e 100644
--- a/apps/m3u/signals.py
+++ b/apps/m3u/signals.py
@@ -1,8 +1,10 @@
 # apps/m3u/signals.py
-from django.db.models.signals import post_save
+from django.db.models.signals import post_save, post_delete
 from django.dispatch import receiver
 from .models import M3UAccount
 from .tasks import refresh_single_m3u_account, refresh_m3u_groups
+from django_celery_beat.models import PeriodicTask, IntervalSchedule
+import json
 
 @receiver(post_save, sender=M3UAccount)
 def refresh_account_on_save(sender, instance, created, **kwargs):
@@ -13,3 +15,51 @@ def refresh_account_on_save(sender, instance, created, **kwargs):
     """
     if created:
         refresh_single_m3u_account.delay(instance.id)
+
+@receiver(post_save, sender=M3UAccount)
+def create_or_update_refresh_task(sender, instance, **kwargs):
+    """
+    Create or update a Celery Beat periodic task when an M3UAccount is created/updated.
+    """
+    task_name = f"m3u_account-refresh-{instance.id}"
+
+    interval, _ = IntervalSchedule.objects.get_or_create(
+        every=instance.refresh_interval or 24,  # fall back to 24h when disabled (0)
+        period=IntervalSchedule.HOURS
+    )
+
+    if not instance.refresh_task:
+        refresh_task = PeriodicTask.objects.create(
+            name=task_name,
+            interval=interval,
+            task="apps.m3u.tasks.refresh_single_m3u_account",
+            kwargs=json.dumps({"account_id": instance.id}),
+            enabled=instance.refresh_interval != 0,
+        )
+        M3UAccount.objects.filter(id=instance.id).update(refresh_task=refresh_task)
+    else:
+        task = instance.refresh_task
+        updated_fields = []
+
+        if task.enabled != (instance.refresh_interval != 0):
+            task.enabled = instance.refresh_interval != 0
+            updated_fields.append("enabled")
+
+        if task.interval != interval:
+            task.interval = interval
+            updated_fields.append("interval")
+
+        if updated_fields:
+            task.save(update_fields=updated_fields)
+
+@receiver(post_delete, sender=M3UAccount)
+def delete_refresh_task(sender, instance, **kwargs):
+    """
+    Delete the associated Celery Beat periodic task when an M3UAccount is deleted.
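+
+    The IntervalSchedule is left in place because other periodic tasks
+    may share it.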
+ """ + if instance.refresh_task: + instance.refresh_task.interval.delete() + instance.refresh_task.delete() diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py index 605cd7fe..9646828e 100644 --- a/apps/m3u/tasks.py +++ b/apps/m3u/tasks.py @@ -16,13 +16,12 @@ from channels.layers import get_channel_layer from django.utils import timezone import time import json -from core.utils import redis_client +from core.utils import redis_client, acquire_task_lock, release_task_lock from core.models import CoreSettings from asgiref.sync import async_to_sync logger = logging.getLogger(__name__) -LOCK_EXPIRE = 300 BATCH_SIZE = 1000 SKIP_EXTS = {} m3u_dir = os.path.join(settings.MEDIA_ROOT, "cached_m3u") @@ -104,19 +103,6 @@ def _matches_filters(stream_name: str, group_name: str, filters): return exclude return False -def acquire_lock(task_name, account_id): - """Acquire a lock to prevent concurrent task execution.""" - lock_id = f"task_lock_{task_name}_{account_id}" - lock_acquired = cache.add(lock_id, "locked", timeout=LOCK_EXPIRE) - if not lock_acquired: - logger.warning(f"Lock for {task_name} and account_id={account_id} already acquired. Task will not proceed.") - return lock_acquired - -def release_lock(task_name, account_id): - """Release the lock after task execution.""" - lock_id = f"task_lock_{task_name}_{account_id}" - cache.delete(lock_id) - @shared_task def refresh_m3u_accounts(): """Queue background parse for all active M3UAccounts.""" @@ -289,18 +275,19 @@ def cleanup_streams(account_id): logger.info(f"Cleanup complete") def refresh_m3u_groups(account_id): - if not acquire_lock('refresh_m3u_account_groups', account_id): + if not acquire_task_lock('refresh_m3u_account_groups', account_id): return f"Task already running for account_id={account_id}.", None # Record start time start_time = time.time() - send_progress_update(0, account_id) try: account = M3UAccount.objects.get(id=account_id, is_active=True) except M3UAccount.DoesNotExist: - release_lock('refresh_m3u_account_groups', account_id) - return f"M3UAccount with ID={account_id} not found or inactive." + release_task_lock('refresh_m3u_account_groups', account_id) + return f"M3UAccount with ID={account_id} not found or inactive.", None + + send_progress_update(0, account_id) lines = fetch_m3u_lines(account) extinf_data = [] @@ -329,14 +316,14 @@ def refresh_m3u_groups(account_id): process_groups(account, groups) - release_lock('refresh_m3u_account_groups`', account_id) + release_task_lock('refresh_m3u_account_groups', account_id) return extinf_data, groups @shared_task def refresh_single_m3u_account(account_id, use_cache=False): """Splits M3U processing into chunks and dispatches them as parallel tasks.""" - if not acquire_lock('refresh_single_m3u_account', account_id): + if not acquire_task_lock('refresh_single_m3u_account', account_id): return f"Task already running for account_id={account_id}." # Record start time @@ -347,7 +334,7 @@ def refresh_single_m3u_account(account_id, use_cache=False): account = M3UAccount.objects.get(id=account_id, is_active=True) filters = list(account.filters.all()) except M3UAccount.DoesNotExist: - release_lock('refresh_single_m3u_account', account_id) + release_task_lock('refresh_single_m3u_account', account_id) return f"M3UAccount with ID={account_id} not found or inactive." 
# Fetch M3U lines and handle potential issues @@ -366,7 +353,11 @@ def refresh_single_m3u_account(account_id, use_cache=False): if not extinf_data: try: extinf_data, groups = refresh_m3u_groups(account_id) + if not extinf_data or not groups: + release_task_lock('refresh_single_m3u_account', account_id) + return "Failed to update m3u account, task may already be running" except: + release_task_lock('refresh_single_m3u_account', account_id) return "Failed to update m3u account" hash_keys = CoreSettings.get_m3u_hash_key().split(",") @@ -415,7 +406,7 @@ def refresh_single_m3u_account(account_id, use_cache=False): print(f"Function took {elapsed_time} seconds to execute.") - release_lock('refresh_single_m3u_account', account_id) + release_task_lock('refresh_single_m3u_account', account_id) cursor = 0 while True: diff --git a/frontend/src/components/forms/Channel.jsx b/frontend/src/components/forms/Channel.jsx index 02f254e3..ff0745f2 100644 --- a/frontend/src/components/forms/Channel.jsx +++ b/frontend/src/components/forms/Channel.jsx @@ -472,7 +472,12 @@ const Channel = ({ channel = null, isOpen, onClose }) => { + EPG + + + } readOnly value={ formik.values.epg_data_id diff --git a/frontend/src/components/forms/EPG.jsx b/frontend/src/components/forms/EPG.jsx index 4fd46bcb..639dd0f3 100644 --- a/frontend/src/components/forms/EPG.jsx +++ b/frontend/src/components/forms/EPG.jsx @@ -34,6 +34,7 @@ const EPG = ({ epg = null, isOpen, onClose }) => { url: '', api_key: '', is_active: true, + refresh_interval: 24, }, validationSchema: Yup.object({ name: Yup.string().required('Name is required'), @@ -64,6 +65,7 @@ const EPG = ({ epg = null, isOpen, onClose }) => { url: epg.url, api_key: epg.api_key, is_active: epg.is_active, + refresh_interval: epg.refresh_interval, }); } else { formik.resetForm(); @@ -125,6 +127,17 @@ const EPG = ({ epg = null, isOpen, onClose }) => { ]} /> + + - - - )} - - + + formik.setFieldValue('is_active', e.target.checked) + } + /> + + + + {playlist && ( <> - setProfileModalOpen(false)} - /> - + + )} - - + + + + {playlist && ( + <> + setProfileModalOpen(false)} + /> + + + )} + ); }; diff --git a/requirements.txt b/requirements.txt index 53b6e023..716c64be 100644 --- a/requirements.txt +++ b/requirements.txt @@ -27,3 +27,4 @@ channels channels-redis daphne django-filter +django-celery-beat
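-- 
A minimal usage sketch for reviewers (Django shell; the source name and the
12-hour cadence are illustrative, not part of this patch):

    from apps.epg.models import EPGSource

    source = EPGSource.objects.get(name="my-xmltv-source")  # hypothetical name
    source.refresh_interval = 12  # hours; 0 disables the scheduled refresh
    source.save()  # post_save signal syncs the linked PeriodicTask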