forked from Mirrors/Dispatcharr
django beat, refresh intervals for epg and m3u
This commit is contained in:
parent
905435c04e
commit
17a8e94f64
18 changed files with 466 additions and 221 deletions
|
|
@ -104,26 +104,29 @@ def match_epg_channels():
|
|||
)
|
||||
|
||||
matched_channels = []
|
||||
channels_to_update = []
|
||||
|
||||
source = EPGSource.objects.filter(is_active=True).first()
|
||||
epg_file_path = getattr(source, 'file_path', None) if source else None
|
||||
|
||||
with transaction.atomic():
|
||||
for chan in Channel.objects.all():
|
||||
|
||||
# A) Skip if channel.tvg_id is already valid
|
||||
if chan.tvg_id and EPGData.objects.filter(tvg_id=chan.tvg_id).exists():
|
||||
# skip if channel already assigned an EPG
|
||||
if chan.epg_data:
|
||||
continue
|
||||
|
||||
# B) If channel has a tvg_id that doesn't exist in EPGData, do direct check
|
||||
# If channel has a tvg_id that doesn't exist in EPGData, do direct check.
|
||||
# I don't THINK this should happen now that we assign EPG on channel creation.
|
||||
if chan.tvg_id:
|
||||
epg_match = EPGData.objects.filter(tvg_id=chan.tvg_id).first()
|
||||
if epg_match:
|
||||
chan.epg_data = epg_match
|
||||
logger.info(f"Channel {chan.id} '{chan.name}' => EPG found by tvg_id={chan.tvg_id}")
|
||||
channels_to_update.append(chan)
|
||||
continue
|
||||
|
||||
# C) Perform name-based fuzzy matching
|
||||
fallback_name = chan.epg_data.name.strip() if chan.epg_data else chan.name
|
||||
fallback_name = chan.tvg_id.strip() if chan.tvg_id else chan.name
|
||||
norm_chan = normalize_name(fallback_name)
|
||||
if not norm_chan:
|
||||
logger.info(f"Channel {chan.id} '{chan.name}' => empty after normalization, skipping")
|
||||
|
|
|
|||
|
|
@ -0,0 +1,25 @@
|
|||
# Generated by Django 5.1.6 on 2025-03-29 17:31
|
||||
|
||||
import django.db.models.deletion
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('django_celery_beat', '0019_alter_periodictasks_options'),
|
||||
('epg', '0005_programdata_custom_properties_and_more'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name='epgsource',
|
||||
name='refresh_interval',
|
||||
field=models.IntegerField(default=24),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='epgsource',
|
||||
name='refresh_task',
|
||||
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, to='django_celery_beat.periodictask'),
|
||||
),
|
||||
]
|
||||
52
apps/epg/migrations/0007_populate_periodic_tasks.py
Normal file
52
apps/epg/migrations/0007_populate_periodic_tasks.py
Normal file
|
|
@ -0,0 +1,52 @@
|
|||
from django.db import migrations
|
||||
import json
|
||||
|
||||
def create_default_refresh_tasks(apps, schema_editor):
|
||||
"""
|
||||
Creates a PeriodicTask for each existing EPGSource that doesn't have one.
|
||||
"""
|
||||
IntervalSchedule = apps.get_model("django_celery_beat", "IntervalSchedule")
|
||||
PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
|
||||
EPGSource = apps.get_model("epg", "EPGSource")
|
||||
|
||||
default_interval, _ = IntervalSchedule.objects.get_or_create(
|
||||
every=24,
|
||||
period="hours",
|
||||
)
|
||||
|
||||
for account in EPGSource.objects.all():
|
||||
if account.refresh_task:
|
||||
continue
|
||||
|
||||
task_name = f"epg_source-refresh-{account.id}"
|
||||
|
||||
refresh_task = PeriodicTask.objects.create(
|
||||
name=task_name,
|
||||
interval=default_interval,
|
||||
task="apps.epg.tasks.refresh_epg_data",
|
||||
kwargs=json.dumps({"account_id": account.id}),
|
||||
)
|
||||
|
||||
account.refresh_task = refresh_task
|
||||
account.save(update_fields=["refresh_task"])
|
||||
|
||||
def reverse_migration(apps, schema_editor):
|
||||
IntervalSchedule = apps.get_model("django_celery_beat", "IntervalSchedule")
|
||||
PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
|
||||
EPGSource = apps.get_model("epg", "EPGSource")
|
||||
|
||||
for account in EPGSource.objects.all():
|
||||
IntervalSchedule.objects.all().delete()
|
||||
PeriodicTask.objects.all().delete()
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("epg", "0006_epgsource_refresh_interval_epgsource_refresh_task"),
|
||||
("django_celery_beat", "0019_alter_periodictasks_options"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RunPython(create_default_refresh_tasks, reverse_migration),
|
||||
]
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
from django.db import models
|
||||
from django.utils import timezone
|
||||
from django_celery_beat.models import PeriodicTask
|
||||
|
||||
class EPGSource(models.Model):
|
||||
SOURCE_TYPE_CHOICES = [
|
||||
|
|
@ -12,6 +13,10 @@ class EPGSource(models.Model):
|
|||
api_key = models.CharField(max_length=255, blank=True, null=True) # For Schedules Direct
|
||||
is_active = models.BooleanField(default=True)
|
||||
file_path = models.CharField(max_length=1024, blank=True, null=True)
|
||||
refresh_interval = models.IntegerField(default=24)
|
||||
refresh_task = models.ForeignKey(
|
||||
PeriodicTask, on_delete=models.SET_NULL, null=True, blank=True
|
||||
)
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ class EPGSourceSerializer(serializers.ModelSerializer):
|
|||
|
||||
class Meta:
|
||||
model = EPGSource
|
||||
fields = ['id', 'name', 'source_type', 'url', 'api_key', 'is_active', 'epg_data_ids']
|
||||
fields = ['id', 'name', 'source_type', 'url', 'api_key', 'is_active', 'epg_data_ids', 'refresh_interval']
|
||||
|
||||
def get_epg_data_ids(self, obj):
|
||||
return list(obj.epgs.values_list('id', flat=True))
|
||||
|
|
|
|||
|
|
@ -1,10 +1,57 @@
|
|||
from django.db.models.signals import post_save
|
||||
from django.db.models.signals import post_save, post_delete
|
||||
from django.dispatch import receiver
|
||||
from .models import EPGSource
|
||||
from .tasks import refresh_epg_data
|
||||
from django_celery_beat.models import PeriodicTask, IntervalSchedule
|
||||
import json
|
||||
|
||||
@receiver(post_save, sender=EPGSource)
|
||||
def trigger_refresh_on_new_epg_source(sender, instance, created, **kwargs):
|
||||
# Trigger refresh only if the source is newly created and active
|
||||
if created and instance.is_active:
|
||||
refresh_epg_data.delay()
|
||||
|
||||
@receiver(post_save, sender=EPGSource)
|
||||
def create_or_update_refresh_task(sender, instance, **kwargs):
|
||||
"""
|
||||
Create or update a Celery Beat periodic task when an EPGSource is created/updated.
|
||||
"""
|
||||
task_name = f"epg_source-refresh-{instance.id}"
|
||||
|
||||
interval, _ = IntervalSchedule.objects.get_or_create(
|
||||
every=24,
|
||||
period=IntervalSchedule.HOURS
|
||||
)
|
||||
|
||||
if not instance.refresh_task:
|
||||
refresh_task = PeriodicTask.objects.create(
|
||||
name=task_name,
|
||||
interval=interval,
|
||||
task="apps.epg.tasks.refresh_epg_data",
|
||||
kwargs=json.dumps({"source_id": instance.id}),
|
||||
enabled=instance.refresh_interval != 0,
|
||||
)
|
||||
EPGSource.objects.filter(id=instance.id).update(refresh_task=refresh_task)
|
||||
else:
|
||||
task = instance.refresh_task
|
||||
updated_fields = []
|
||||
|
||||
if task.enabled != (instance.refresh_interval != 0):
|
||||
task.enabled = instance.refresh_interval != 0
|
||||
updated_fields.append("enabled")
|
||||
|
||||
if task.interval != interval:
|
||||
task.interval = interval
|
||||
updated_fields.append("interval")
|
||||
|
||||
if updated_fields:
|
||||
task.save(update_fields=updated_fields)
|
||||
|
||||
@receiver(post_delete, sender=EPGSource)
|
||||
def delete_refresh_task(sender, instance, **kwargs):
|
||||
"""
|
||||
Delete the associated Celery Beat periodic task when a Channel is deleted.
|
||||
"""
|
||||
if instance.refresh_task:
|
||||
instance.refresh_task.interval.delete()
|
||||
instance.refresh_task.delete()
|
||||
|
|
|
|||
|
|
@ -18,28 +18,39 @@ from asgiref.sync import async_to_sync
|
|||
from channels.layers import get_channel_layer
|
||||
|
||||
from .models import EPGSource, EPGData, ProgramData
|
||||
from core.utils import acquire_task_lock, release_task_lock
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@shared_task
|
||||
def refresh_epg_data():
|
||||
def refresh_all_epg_data():
|
||||
logger.info("Starting refresh_epg_data task.")
|
||||
active_sources = EPGSource.objects.filter(is_active=True)
|
||||
logger.debug(f"Found {active_sources.count()} active EPGSource(s).")
|
||||
|
||||
for source in active_sources:
|
||||
logger.info(f"Processing EPGSource: {source.name} (type: {source.source_type})")
|
||||
if source.source_type == 'xmltv':
|
||||
fetch_xmltv(source)
|
||||
parse_channels_only(source)
|
||||
parse_programs_for_source(source)
|
||||
elif source.source_type == 'schedules_direct':
|
||||
fetch_schedules_direct(source)
|
||||
refresh_epg_data(source.id)
|
||||
|
||||
logger.info("Finished refresh_epg_data task.")
|
||||
return "EPG data refreshed."
|
||||
|
||||
@shared_task
|
||||
def refresh_epg_data(source_id):
|
||||
if not acquire_task_lock('refresh_epg_data', source_id):
|
||||
logger.debug(f"EPG refresh for {source_id} already running")
|
||||
return
|
||||
|
||||
source = EPGSource.objects.get(id=source_id)
|
||||
logger.info(f"Processing EPGSource: {source.name} (type: {source.source_type})")
|
||||
if source.source_type == 'xmltv':
|
||||
fetch_xmltv(source)
|
||||
parse_channels_only(source)
|
||||
parse_programs_for_source(source)
|
||||
elif source.source_type == 'schedules_direct':
|
||||
fetch_schedules_direct(source)
|
||||
|
||||
release_task_lock('refresh_epg_data', source_id)
|
||||
|
||||
def fetch_xmltv(source):
|
||||
logger.info(f"Fetching XMLTV data from source: {source.name}")
|
||||
|
|
@ -128,11 +139,16 @@ def parse_channels_only(source):
|
|||
|
||||
@shared_task
|
||||
def parse_programs_for_tvg_id(epg_id):
|
||||
if not acquire_task_lock('parse_epg_programs', epg_id):
|
||||
logger.debug(f"Program parse for {epg_id} already in progress")
|
||||
return
|
||||
|
||||
epg = EPGData.objects.get(id=epg_id)
|
||||
epg_source = epg.epg_source
|
||||
|
||||
if not Channel.objects.filter(epg_data=epg).exists():
|
||||
logger.info(f"No channels matched to EPG {epg.tvg_id}")
|
||||
release_task_lock('parse_epg_programs', epg_id)
|
||||
return
|
||||
|
||||
logger.info(f"Refreshing program data for tvg_id: {epg.tvg_id}")
|
||||
|
|
@ -173,6 +189,9 @@ def parse_programs_for_tvg_id(epg_id):
|
|||
))
|
||||
|
||||
ProgramData.objects.bulk_create(programs_to_create)
|
||||
|
||||
release_task_lock('parse_epg_programs', epg_id)
|
||||
|
||||
logger.info(f"Completed program parsing for tvg_id={epg.tvg_id}.")
|
||||
|
||||
def parse_programs_for_source(epg_source, tvg_id=None):
|
||||
|
|
|
|||
|
|
@ -1,18 +0,0 @@
|
|||
# Generated by Django 5.1.6 on 2025-03-27 17:01
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('m3u', '0004_m3uaccount_stream_profile'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name='m3uaccount',
|
||||
name='custom_properties',
|
||||
field=models.TextField(blank=True, null=True),
|
||||
),
|
||||
]
|
||||
|
|
@ -0,0 +1,30 @@
|
|||
# Generated by Django 5.1.6 on 2025-03-29 13:44
|
||||
|
||||
import django.db.models.deletion
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('django_celery_beat', '0019_alter_periodictasks_options'),
|
||||
('m3u', '0004_m3uaccount_stream_profile'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name='m3uaccount',
|
||||
name='custom_properties',
|
||||
field=models.TextField(blank=True, null=True),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='m3uaccount',
|
||||
name='refresh_interval',
|
||||
field=models.IntegerField(default=24),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='m3uaccount',
|
||||
name='refresh_task',
|
||||
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, to='django_celery_beat.periodictask'),
|
||||
),
|
||||
]
|
||||
52
apps/m3u/migrations/0006_populate_periodic_tasks.py
Normal file
52
apps/m3u/migrations/0006_populate_periodic_tasks.py
Normal file
|
|
@ -0,0 +1,52 @@
|
|||
from django.db import migrations
|
||||
import json
|
||||
|
||||
def create_default_refresh_tasks(apps, schema_editor):
|
||||
"""
|
||||
Creates a PeriodicTask for each existing M3UAccount that doesn't have one.
|
||||
"""
|
||||
IntervalSchedule = apps.get_model("django_celery_beat", "IntervalSchedule")
|
||||
PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
|
||||
M3UAccount = apps.get_model("m3u", "M3UAccount")
|
||||
|
||||
default_interval, _ = IntervalSchedule.objects.get_or_create(
|
||||
every=24,
|
||||
period="hours",
|
||||
)
|
||||
|
||||
for account in M3UAccount.objects.all():
|
||||
if account.refresh_task:
|
||||
continue
|
||||
|
||||
task_name = f"m3u_account-refresh-{account.id}"
|
||||
|
||||
refresh_task = PeriodicTask.objects.create(
|
||||
name=task_name,
|
||||
interval=default_interval,
|
||||
task="apps.m3u.tasks.refresh_single_m3u_account",
|
||||
kwargs=json.dumps({"account_id": account.id}),
|
||||
)
|
||||
|
||||
account.refresh_task = refresh_task
|
||||
account.save(update_fields=["refresh_task"])
|
||||
|
||||
def reverse_migration(apps, schema_editor):
|
||||
IntervalSchedule = apps.get_model("django_celery_beat", "IntervalSchedule")
|
||||
PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
|
||||
M3UAccount = apps.get_model("m3u", "M3UAccount")
|
||||
|
||||
for account in M3UAccount.objects.all():
|
||||
IntervalSchedule.objects.filter(name=f"m3u_account-refresh-interval-{account.id}").delete()
|
||||
PeriodicTask.objects.filter(name=f"m3u_account-refresh-{account.id}").delete()
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("m3u", "0005_m3uaccount_custom_properties_and_more"),
|
||||
("django_celery_beat", "0019_alter_periodictasks_options"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RunPython(create_default_refresh_tasks, reverse_migration),
|
||||
]
|
||||
|
|
@ -4,6 +4,7 @@ from core.models import UserAgent
|
|||
import re
|
||||
from django.dispatch import receiver
|
||||
from apps.channels.models import StreamProfile
|
||||
from django_celery_beat.models import PeriodicTask
|
||||
|
||||
CUSTOM_M3U_ACCOUNT_NAME="custom"
|
||||
|
||||
|
|
@ -68,6 +69,10 @@ class M3UAccount(models.Model):
|
|||
related_name='m3u_accounts'
|
||||
)
|
||||
custom_properties = models.TextField(null=True, blank=True)
|
||||
refresh_interval = models.IntegerField(default=24)
|
||||
refresh_task = models.ForeignKey(
|
||||
PeriodicTask, on_delete=models.SET_NULL, null=True, blank=True
|
||||
)
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
|
|
|||
|
|
@ -52,65 +52,9 @@ class M3UAccountSerializer(serializers.ModelSerializer):
|
|||
fields = [
|
||||
'id', 'name', 'server_url', 'uploaded_file', 'server_group',
|
||||
'max_streams', 'is_active', 'created_at', 'updated_at', 'filters', 'user_agent', 'profiles', 'locked',
|
||||
'channel_groups',
|
||||
'channel_groups', 'refresh_interval'
|
||||
]
|
||||
|
||||
# def get_channel_groups(self, obj):
|
||||
# # Retrieve related ChannelGroupM3UAccount records for this M3UAccount
|
||||
# relations = ChannelGroupM3UAccount.objects.filter(m3u_account=obj).select_related('channel_group')
|
||||
|
||||
# # Serialize the channel groups with their enabled status
|
||||
# return [
|
||||
# {
|
||||
# 'channel_group_name': relation.channel_group.name,
|
||||
# 'channel_group_id': relation.channel_group.id,
|
||||
# 'enabled': relation.enabled,
|
||||
# }
|
||||
# for relation in relations
|
||||
# ]
|
||||
|
||||
# def to_representation(self, instance):
|
||||
# """Override the default to_representation method to include channel_groups"""
|
||||
# representation = super().to_representation(instance)
|
||||
|
||||
# # Manually add the channel_groups to the representation
|
||||
# channel_groups = ChannelGroupM3UAccount.objects.filter(m3u_account=instance).select_related('channel_group')
|
||||
# representation['channel_groups'] = [
|
||||
# {
|
||||
# 'id': relation.id,
|
||||
# 'channel_group_name': relation.channel_group.name,
|
||||
# 'channel_group_id': relation.channel_group.id,
|
||||
# 'enabled': relation.enabled,
|
||||
# }
|
||||
# for relation in channel_groups
|
||||
# ]
|
||||
|
||||
# return representation
|
||||
|
||||
# def update(self, instance, validated_data):
|
||||
# logger.info(validated_data)
|
||||
# channel_groups_data = validated_data.pop('channel_groups', None)
|
||||
# instance = super().update(instance, validated_data)
|
||||
|
||||
# if channel_groups_data is not None:
|
||||
# logger.info(json.dumps(channel_groups_data))
|
||||
# # Remove existing relationships not included in the request
|
||||
# existing_groups = {cg.channel_group_id: cg for cg in instance.channel_group.all()}
|
||||
|
||||
# # for group_id in set(existing_groups.keys()) - sent_group_ids:
|
||||
# # existing_groups[group_id].delete()
|
||||
|
||||
# # Create or update relationships
|
||||
# for cg_data in channel_groups_data:
|
||||
# logger.info(json.dumps(cg_data))
|
||||
# ChannelGroupM3UAccount.objects.update_or_create(
|
||||
# channel_group=existing_groups[cg_data['channel_group_id']],
|
||||
# m3u_account=instance,
|
||||
# defaults={'enabled': cg_data.get('enabled', True)}
|
||||
# )
|
||||
|
||||
# return instance
|
||||
|
||||
class ServerGroupSerializer(serializers.ModelSerializer):
|
||||
"""Serializer for Server Group"""
|
||||
|
||||
|
|
|
|||
|
|
@ -1,8 +1,10 @@
|
|||
# apps/m3u/signals.py
|
||||
from django.db.models.signals import post_save
|
||||
from django.db.models.signals import post_save, post_delete
|
||||
from django.dispatch import receiver
|
||||
from .models import M3UAccount
|
||||
from .tasks import refresh_single_m3u_account, refresh_m3u_groups
|
||||
from django_celery_beat.models import PeriodicTask, IntervalSchedule
|
||||
import json
|
||||
|
||||
@receiver(post_save, sender=M3UAccount)
|
||||
def refresh_account_on_save(sender, instance, created, **kwargs):
|
||||
|
|
@ -13,3 +15,48 @@ def refresh_account_on_save(sender, instance, created, **kwargs):
|
|||
"""
|
||||
if created:
|
||||
refresh_single_m3u_account.delay(instance.id)
|
||||
|
||||
@receiver(post_save, sender=M3UAccount)
|
||||
def create_or_update_refresh_task(sender, instance, **kwargs):
|
||||
"""
|
||||
Create or update a Celery Beat periodic task when an M3UAccount is created/updated.
|
||||
"""
|
||||
task_name = f"m3u_account-refresh-{instance.id}"
|
||||
|
||||
interval, _ = IntervalSchedule.objects.get_or_create(
|
||||
every=24,
|
||||
period=IntervalSchedule.HOURS
|
||||
)
|
||||
|
||||
if not instance.refresh_task:
|
||||
refresh_task = PeriodicTask.objects.create(
|
||||
name=task_name,
|
||||
interval=interval,
|
||||
task="apps.m3u.tasks.refresh_single_m3u_account",
|
||||
kwargs=json.dumps({"account_id": instance.id}),
|
||||
enabled=instance.refresh_interval != 0,
|
||||
)
|
||||
M3UAccount.objects.filter(id=instance.id).update(refresh_task=refresh_task)
|
||||
else:
|
||||
task = instance.refresh_task
|
||||
updated_fields = []
|
||||
|
||||
if task.enabled != (instance.refresh_interval != 0):
|
||||
task.enabled = instance.refresh_interval != 0
|
||||
updated_fields.append("enabled")
|
||||
|
||||
if task.interval != interval:
|
||||
task.interval = interval
|
||||
updated_fields.append("interval")
|
||||
|
||||
if updated_fields:
|
||||
task.save(update_fields=updated_fields)
|
||||
|
||||
@receiver(post_delete, sender=M3UAccount)
|
||||
def delete_refresh_task(sender, instance, **kwargs):
|
||||
"""
|
||||
Delete the associated Celery Beat periodic task when a Channel is deleted.
|
||||
"""
|
||||
if instance.refresh_task:
|
||||
instance.refresh_task.interval.delete()
|
||||
instance.refresh_task.delete()
|
||||
|
|
|
|||
|
|
@ -16,13 +16,12 @@ from channels.layers import get_channel_layer
|
|||
from django.utils import timezone
|
||||
import time
|
||||
import json
|
||||
from core.utils import redis_client
|
||||
from core.utils import redis_client, acquire_task_lock, release_task_lock
|
||||
from core.models import CoreSettings
|
||||
from asgiref.sync import async_to_sync
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
LOCK_EXPIRE = 300
|
||||
BATCH_SIZE = 1000
|
||||
SKIP_EXTS = {}
|
||||
m3u_dir = os.path.join(settings.MEDIA_ROOT, "cached_m3u")
|
||||
|
|
@ -104,19 +103,6 @@ def _matches_filters(stream_name: str, group_name: str, filters):
|
|||
return exclude
|
||||
return False
|
||||
|
||||
def acquire_lock(task_name, account_id):
|
||||
"""Acquire a lock to prevent concurrent task execution."""
|
||||
lock_id = f"task_lock_{task_name}_{account_id}"
|
||||
lock_acquired = cache.add(lock_id, "locked", timeout=LOCK_EXPIRE)
|
||||
if not lock_acquired:
|
||||
logger.warning(f"Lock for {task_name} and account_id={account_id} already acquired. Task will not proceed.")
|
||||
return lock_acquired
|
||||
|
||||
def release_lock(task_name, account_id):
|
||||
"""Release the lock after task execution."""
|
||||
lock_id = f"task_lock_{task_name}_{account_id}"
|
||||
cache.delete(lock_id)
|
||||
|
||||
@shared_task
|
||||
def refresh_m3u_accounts():
|
||||
"""Queue background parse for all active M3UAccounts."""
|
||||
|
|
@ -289,18 +275,19 @@ def cleanup_streams(account_id):
|
|||
logger.info(f"Cleanup complete")
|
||||
|
||||
def refresh_m3u_groups(account_id):
|
||||
if not acquire_lock('refresh_m3u_account_groups', account_id):
|
||||
if not acquire_task_lock('refresh_m3u_account_groups', account_id):
|
||||
return f"Task already running for account_id={account_id}.", None
|
||||
|
||||
# Record start time
|
||||
start_time = time.time()
|
||||
send_progress_update(0, account_id)
|
||||
|
||||
try:
|
||||
account = M3UAccount.objects.get(id=account_id, is_active=True)
|
||||
except M3UAccount.DoesNotExist:
|
||||
release_lock('refresh_m3u_account_groups', account_id)
|
||||
return f"M3UAccount with ID={account_id} not found or inactive."
|
||||
release_task_lock('refresh_m3u_account_groups', account_id)
|
||||
return f"M3UAccount with ID={account_id} not found or inactive.", None
|
||||
|
||||
send_progress_update(0, account_id)
|
||||
|
||||
lines = fetch_m3u_lines(account)
|
||||
extinf_data = []
|
||||
|
|
@ -329,14 +316,14 @@ def refresh_m3u_groups(account_id):
|
|||
|
||||
process_groups(account, groups)
|
||||
|
||||
release_lock('refresh_m3u_account_groups`', account_id)
|
||||
release_task_lock('refresh_m3u_account_groups', account_id)
|
||||
|
||||
return extinf_data, groups
|
||||
|
||||
@shared_task
|
||||
def refresh_single_m3u_account(account_id, use_cache=False):
|
||||
"""Splits M3U processing into chunks and dispatches them as parallel tasks."""
|
||||
if not acquire_lock('refresh_single_m3u_account', account_id):
|
||||
if not acquire_task_lock('refresh_single_m3u_account', account_id):
|
||||
return f"Task already running for account_id={account_id}."
|
||||
|
||||
# Record start time
|
||||
|
|
@ -347,7 +334,7 @@ def refresh_single_m3u_account(account_id, use_cache=False):
|
|||
account = M3UAccount.objects.get(id=account_id, is_active=True)
|
||||
filters = list(account.filters.all())
|
||||
except M3UAccount.DoesNotExist:
|
||||
release_lock('refresh_single_m3u_account', account_id)
|
||||
release_task_lock('refresh_single_m3u_account', account_id)
|
||||
return f"M3UAccount with ID={account_id} not found or inactive."
|
||||
|
||||
# Fetch M3U lines and handle potential issues
|
||||
|
|
@ -366,7 +353,11 @@ def refresh_single_m3u_account(account_id, use_cache=False):
|
|||
if not extinf_data:
|
||||
try:
|
||||
extinf_data, groups = refresh_m3u_groups(account_id)
|
||||
if not extinf_data or not groups:
|
||||
release_task_lock('refresh_single_m3u_account', account_id)
|
||||
return "Failed to update m3u account, task may already be running"
|
||||
except:
|
||||
release_task_lock('refresh_single_m3u_account', account_id)
|
||||
return "Failed to update m3u account"
|
||||
|
||||
hash_keys = CoreSettings.get_m3u_hash_key().split(",")
|
||||
|
|
@ -415,7 +406,7 @@ def refresh_single_m3u_account(account_id, use_cache=False):
|
|||
|
||||
print(f"Function took {elapsed_time} seconds to execute.")
|
||||
|
||||
release_lock('refresh_single_m3u_account', account_id)
|
||||
release_task_lock('refresh_single_m3u_account', account_id)
|
||||
|
||||
cursor = 0
|
||||
while True:
|
||||
|
|
|
|||
|
|
@ -472,7 +472,12 @@ const Channel = ({ channel = null, isOpen, onClose }) => {
|
|||
<TextInput
|
||||
id="epg_data_id"
|
||||
name="epg_data_id"
|
||||
label="EPG"
|
||||
label={
|
||||
<Group style={{ width: '100%' }}>
|
||||
<Box>EPG</Box>
|
||||
<Button size="xs">Use Dummy</Button>
|
||||
</Group>
|
||||
}
|
||||
readOnly
|
||||
value={
|
||||
formik.values.epg_data_id
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ const EPG = ({ epg = null, isOpen, onClose }) => {
|
|||
url: '',
|
||||
api_key: '',
|
||||
is_active: true,
|
||||
refresh_interval: 24,
|
||||
},
|
||||
validationSchema: Yup.object({
|
||||
name: Yup.string().required('Name is required'),
|
||||
|
|
@ -64,6 +65,7 @@ const EPG = ({ epg = null, isOpen, onClose }) => {
|
|||
url: epg.url,
|
||||
api_key: epg.api_key,
|
||||
is_active: epg.is_active,
|
||||
refresh_interval: epg.refresh_interval,
|
||||
});
|
||||
} else {
|
||||
formik.resetForm();
|
||||
|
|
@ -125,6 +127,17 @@ const EPG = ({ epg = null, isOpen, onClose }) => {
|
|||
]}
|
||||
/>
|
||||
|
||||
<NumberInput
|
||||
label="Refresh Interval (hours)"
|
||||
value={formik.values.refresh_interval}
|
||||
onChange={formik.handleChange}
|
||||
error={
|
||||
formik.errors.refresh_interval
|
||||
? formik.touched.refresh_interval
|
||||
: ''
|
||||
}
|
||||
/>
|
||||
|
||||
<Flex mih={50} gap="xs" justify="flex-end" align="flex-end">
|
||||
<Button
|
||||
type="submit"
|
||||
|
|
|
|||
|
|
@ -16,6 +16,10 @@ import {
|
|||
FileInput,
|
||||
Space,
|
||||
useMantineTheme,
|
||||
NumberInput,
|
||||
Divider,
|
||||
Stack,
|
||||
Group,
|
||||
} from '@mantine/core';
|
||||
import M3UGroupFilter from './M3UGroupFilter';
|
||||
import useChannelsStore from '../../store/channels';
|
||||
|
|
@ -47,6 +51,7 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => {
|
|||
user_agent: `${userAgents[0].id}`,
|
||||
is_active: true,
|
||||
max_streams: 0,
|
||||
refresh_interval: 24,
|
||||
},
|
||||
validationSchema: Yup.object({
|
||||
name: Yup.string().required('Name is required'),
|
||||
|
|
@ -98,6 +103,7 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => {
|
|||
max_streams: playlist.max_streams,
|
||||
user_agent: playlist.user_agent,
|
||||
is_active: playlist.is_active,
|
||||
refresh_interval: playlist.refresh_interval,
|
||||
});
|
||||
} else {
|
||||
formik.resetForm();
|
||||
|
|
@ -115,129 +121,147 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => {
|
|||
}
|
||||
|
||||
return (
|
||||
<Modal opened={isOpen} onClose={onClose} title="M3U Account">
|
||||
<Modal size={700} opened={isOpen} onClose={onClose} title="M3U Account">
|
||||
<LoadingOverlay
|
||||
visible={formik.isSubmitting}
|
||||
overlayBlur={2}
|
||||
loaderProps={loadingText ? { children: loadingText } : {}}
|
||||
/>
|
||||
|
||||
<div style={{ width: 400, position: 'relative' }}>
|
||||
<form onSubmit={formik.handleSubmit}>
|
||||
<TextInput
|
||||
fullWidth
|
||||
id="name"
|
||||
name="name"
|
||||
label="Name"
|
||||
value={formik.values.name}
|
||||
onChange={formik.handleChange}
|
||||
error={formik.touched.name && Boolean(formik.errors.name)}
|
||||
helperText={formik.touched.name && formik.errors.name}
|
||||
/>
|
||||
<form onSubmit={formik.handleSubmit}>
|
||||
<Group justify="space-between" align="top">
|
||||
<Stack gap="5" style={{ flex: 1 }}>
|
||||
<TextInput
|
||||
fullWidth
|
||||
id="name"
|
||||
name="name"
|
||||
label="Name"
|
||||
value={formik.values.name}
|
||||
onChange={formik.handleChange}
|
||||
error={formik.touched.name && Boolean(formik.errors.name)}
|
||||
helperText={formik.touched.name && formik.errors.name}
|
||||
/>
|
||||
|
||||
<TextInput
|
||||
fullWidth
|
||||
id="server_url"
|
||||
name="server_url"
|
||||
label="URL"
|
||||
value={formik.values.server_url}
|
||||
onChange={formik.handleChange}
|
||||
error={
|
||||
formik.touched.server_url && Boolean(formik.errors.server_url)
|
||||
}
|
||||
helperText={formik.touched.server_url && formik.errors.server_url}
|
||||
/>
|
||||
<TextInput
|
||||
fullWidth
|
||||
id="server_url"
|
||||
name="server_url"
|
||||
label="URL"
|
||||
value={formik.values.server_url}
|
||||
onChange={formik.handleChange}
|
||||
error={
|
||||
formik.touched.server_url && Boolean(formik.errors.server_url)
|
||||
}
|
||||
helperText={formik.touched.server_url && formik.errors.server_url}
|
||||
/>
|
||||
|
||||
<FileInput
|
||||
id="uploaded_file"
|
||||
label="Upload files"
|
||||
placeholder="Upload files"
|
||||
value={formik.uploaded_file}
|
||||
onChange={handleFileChange}
|
||||
/>
|
||||
<FileInput
|
||||
id="uploaded_file"
|
||||
label="Upload files"
|
||||
placeholder="Upload files"
|
||||
value={formik.uploaded_file}
|
||||
onChange={handleFileChange}
|
||||
/>
|
||||
</Stack>
|
||||
|
||||
<TextInput
|
||||
fullWidth
|
||||
id="max_streams"
|
||||
name="max_streams"
|
||||
label="Max Streams"
|
||||
placeholder="0 = Unlimited"
|
||||
value={formik.values.max_streams}
|
||||
onChange={formik.handleChange}
|
||||
error={formik.errors.max_streams ? formik.touched.max_streams : ''}
|
||||
/>
|
||||
<Divider size="sm" orientation="vertical" />
|
||||
|
||||
<NativeSelect
|
||||
id="user_agent"
|
||||
name="user_agent"
|
||||
label="User-Agent"
|
||||
value={formik.values.user_agent}
|
||||
onChange={formik.handleChange}
|
||||
error={formik.errors.user_agent ? formik.touched.user_agent : ''}
|
||||
data={userAgents.map((ua) => ({
|
||||
label: ua.name,
|
||||
value: `${ua.id}`,
|
||||
}))}
|
||||
/>
|
||||
<Stack gap="5" style={{ flex: 1 }}>
|
||||
<TextInput
|
||||
fullWidth
|
||||
id="max_streams"
|
||||
name="max_streams"
|
||||
label="Max Streams"
|
||||
placeholder="0 = Unlimited"
|
||||
value={formik.values.max_streams}
|
||||
onChange={formik.handleChange}
|
||||
error={
|
||||
formik.errors.max_streams ? formik.touched.max_streams : ''
|
||||
}
|
||||
/>
|
||||
|
||||
<Space h="md" />
|
||||
<NativeSelect
|
||||
id="user_agent"
|
||||
name="user_agent"
|
||||
label="User-Agent"
|
||||
value={formik.values.user_agent}
|
||||
onChange={formik.handleChange}
|
||||
error={formik.errors.user_agent ? formik.touched.user_agent : ''}
|
||||
data={userAgents.map((ua) => ({
|
||||
label: ua.name,
|
||||
value: `${ua.id}`,
|
||||
}))}
|
||||
/>
|
||||
|
||||
<Checkbox
|
||||
label="Is Active"
|
||||
name="is_active"
|
||||
checked={formik.values.is_active}
|
||||
onChange={(e) =>
|
||||
formik.setFieldValue('is_active', e.target.checked)
|
||||
}
|
||||
/>
|
||||
<NumberInput
|
||||
label="Refresh Interval (hours)"
|
||||
value={formik.values.refresh_interval}
|
||||
onChange={formik.handleChange}
|
||||
error={
|
||||
formik.errors.refresh_interval
|
||||
? formik.touched.refresh_interval
|
||||
: ''
|
||||
}
|
||||
/>
|
||||
|
||||
<Flex mih={50} gap="xs" justify="flex-end" align="flex-end">
|
||||
{playlist && (
|
||||
<>
|
||||
<Button
|
||||
variant="filled"
|
||||
// color={theme.custom.colors.buttonPrimary}
|
||||
size="sm"
|
||||
onClick={() => setGroupFilterModalOpen(true)}
|
||||
>
|
||||
Groups
|
||||
</Button>
|
||||
<Button
|
||||
variant="filled"
|
||||
// color={theme.custom.colors.buttonPrimary}
|
||||
size="sm"
|
||||
onClick={() => setProfileModalOpen(true)}
|
||||
>
|
||||
Profiles
|
||||
</Button>
|
||||
</>
|
||||
)}
|
||||
<Button
|
||||
type="submit"
|
||||
variant="filled"
|
||||
// color={theme.custom.colors.buttonPrimary}
|
||||
disabled={formik.isSubmitting}
|
||||
size="sm"
|
||||
>
|
||||
Save
|
||||
</Button>
|
||||
</Flex>
|
||||
<Checkbox
|
||||
label="Is Active"
|
||||
name="is_active"
|
||||
checked={formik.values.is_active}
|
||||
onChange={(e) =>
|
||||
formik.setFieldValue('is_active', e.target.checked)
|
||||
}
|
||||
/>
|
||||
</Stack>
|
||||
</Group>
|
||||
|
||||
<Flex mih={50} gap="xs" justify="flex-end" align="flex-end">
|
||||
{playlist && (
|
||||
<>
|
||||
<M3UProfiles
|
||||
playlist={playlist}
|
||||
isOpen={profileModalOpen}
|
||||
onClose={() => setProfileModalOpen(false)}
|
||||
/>
|
||||
<M3UGroupFilter
|
||||
isOpen={groupFilterModalOpen}
|
||||
playlist={playlist}
|
||||
onClose={closeGroupFilter}
|
||||
/>
|
||||
<Button
|
||||
variant="filled"
|
||||
// color={theme.custom.colors.buttonPrimary}
|
||||
size="sm"
|
||||
onClick={() => setGroupFilterModalOpen(true)}
|
||||
>
|
||||
Groups
|
||||
</Button>
|
||||
<Button
|
||||
variant="filled"
|
||||
// color={theme.custom.colors.buttonPrimary}
|
||||
size="sm"
|
||||
onClick={() => setProfileModalOpen(true)}
|
||||
>
|
||||
Profiles
|
||||
</Button>
|
||||
</>
|
||||
)}
|
||||
</form>
|
||||
</div>
|
||||
|
||||
<Button
|
||||
type="submit"
|
||||
variant="filled"
|
||||
// color={theme.custom.colors.buttonPrimary}
|
||||
disabled={formik.isSubmitting}
|
||||
size="sm"
|
||||
>
|
||||
Save
|
||||
</Button>
|
||||
</Flex>
|
||||
{playlist && (
|
||||
<>
|
||||
<M3UProfiles
|
||||
playlist={playlist}
|
||||
isOpen={profileModalOpen}
|
||||
onClose={() => setProfileModalOpen(false)}
|
||||
/>
|
||||
<M3UGroupFilter
|
||||
isOpen={groupFilterModalOpen}
|
||||
playlist={playlist}
|
||||
onClose={closeGroupFilter}
|
||||
/>
|
||||
</>
|
||||
)}
|
||||
</form>
|
||||
</Modal>
|
||||
);
|
||||
};
|
||||
|
|
|
|||
|
|
@ -27,3 +27,4 @@ channels
|
|||
channels-redis
|
||||
daphne
|
||||
django-filter
|
||||
django-celery-beat
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue