From 6a89688e6c0035eb80111328cc3f32dba8a33328 Mon Sep 17 00:00:00 2001 From: dekzter Date: Mon, 24 Mar 2025 18:56:19 -0400 Subject: [PATCH] cleanup of streams not belonging to an active group --- apps/m3u/signals.py | 2 +- apps/m3u/tasks.py | 38 +++++++++++++++++++++++++++++++++++--- 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/apps/m3u/signals.py b/apps/m3u/signals.py index c2abc4c8..c07c1c4c 100644 --- a/apps/m3u/signals.py +++ b/apps/m3u/signals.py @@ -12,4 +12,4 @@ def refresh_account_on_save(sender, instance, created, **kwargs): if it is active or newly created. """ if created: - refresh_m3u_groups(instance.id) + refresh_single_m3u_account.delay(instance.id) diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py index e0995543..82f2310b 100644 --- a/apps/m3u/tasks.py +++ b/apps/m3u/tasks.py @@ -8,6 +8,7 @@ from celery.result import AsyncResult from celery import shared_task, current_app, group from django.conf import settings from django.core.cache import cache +from django.db import transaction from .models import M3UAccount from apps.channels.models import Stream, ChannelGroup, ChannelGroupM3UAccount from asgiref.sync import async_to_sync @@ -179,7 +180,7 @@ def process_m3u_batch(account_id, batch, group_names, hash_keys): account = M3UAccount.objects.get(id=account_id) existing_groups = {group.name: group for group in ChannelGroup.objects.filter( m3u_account__m3u_account=account, # Filter by the M3UAccount - m3u_account__enabled=True # Filter by the enabled flag in the join table + m3u_account__enabled=True # Filter by the enabled flag in the join table )} streams_to_create = [] @@ -269,9 +270,29 @@ def process_m3u_batch(account_id, batch, group_names, hash_keys): return f"Batch processed: {len(streams_to_create)} created, {len(streams_to_update)} updated." +def cleanup_streams(account_id): + account = M3UAccount.objects.get(id=account_id, is_active=True) + existing_groups = ChannelGroup.objects.filter( + m3u_account__m3u_account=account, + m3u_account__enabled=True, + ).values_list('id', flat=True) + logger.info(f"Found {len(existing_groups)} active groups") + streams = Stream.objects.filter(m3u_account=account) + + streams_to_delete = Stream.objects.filter( + m3u_account=account + ).exclude( + channel_group__in=existing_groups # Exclude products having any of the excluded tags + ) + + # Delete the filtered products + streams_to_delete.delete() + + logger.info(f"Cleanup complete") + def refresh_m3u_groups(account_id): if not acquire_lock('refresh_m3u_account_groups', account_id): - return f"Task already running for account_id={account_id}." + return f"Task already running for account_id={account_id}.", None # Record start time start_time = time.time() @@ -345,7 +366,10 @@ def refresh_single_m3u_account(account_id, use_cache=False): groups = data['groups'] if not extinf_data: - extinf_data, groups = refresh_m3u_groups(account_id) + try: + extinf_data, groups = refresh_m3u_groups(account_id) + except: + return "Failed to update m3u account" hash_keys = CoreSettings.get_m3u_hash_key().split(",") @@ -371,6 +395,10 @@ def refresh_single_m3u_account(account_id, use_cache=False): progress = int((completed_batches / total_batches) * 100) # Send progress update via Channels + # Don't send 100% because we want to clean up after + if progress == 100: + progress = 99 + send_progress_update(progress, account_id) # Optionally remove completed task from the group to prevent processing it again @@ -378,6 +406,10 @@ def refresh_single_m3u_account(account_id, use_cache=False): else: logger.debug(f"Task is still running.") + # Run cleanup + cleanup_streams(account_id) + send_progress_update(100, account_id) + end_time = time.time() # Calculate elapsed time