cleanup of streams not belonging to an active group

This commit is contained in:
dekzter 2025-03-24 18:56:19 -04:00
parent bc460d73b4
commit 6a89688e6c
2 changed files with 36 additions and 4 deletions

View file

@ -12,4 +12,4 @@ def refresh_account_on_save(sender, instance, created, **kwargs):
if it is active or newly created.
"""
if created:
refresh_m3u_groups(instance.id)
refresh_single_m3u_account.delay(instance.id)

View file

@ -8,6 +8,7 @@ from celery.result import AsyncResult
from celery import shared_task, current_app, group
from django.conf import settings
from django.core.cache import cache
from django.db import transaction
from .models import M3UAccount
from apps.channels.models import Stream, ChannelGroup, ChannelGroupM3UAccount
from asgiref.sync import async_to_sync
@ -179,7 +180,7 @@ def process_m3u_batch(account_id, batch, group_names, hash_keys):
account = M3UAccount.objects.get(id=account_id)
existing_groups = {group.name: group for group in ChannelGroup.objects.filter(
m3u_account__m3u_account=account, # Filter by the M3UAccount
m3u_account__enabled=True # Filter by the enabled flag in the join table
m3u_account__enabled=True # Filter by the enabled flag in the join table
)}
streams_to_create = []
@ -269,9 +270,29 @@ def process_m3u_batch(account_id, batch, group_names, hash_keys):
return f"Batch processed: {len(streams_to_create)} created, {len(streams_to_update)} updated."
def cleanup_streams(account_id):
account = M3UAccount.objects.get(id=account_id, is_active=True)
existing_groups = ChannelGroup.objects.filter(
m3u_account__m3u_account=account,
m3u_account__enabled=True,
).values_list('id', flat=True)
logger.info(f"Found {len(existing_groups)} active groups")
streams = Stream.objects.filter(m3u_account=account)
streams_to_delete = Stream.objects.filter(
m3u_account=account
).exclude(
channel_group__in=existing_groups # Exclude products having any of the excluded tags
)
# Delete the filtered products
streams_to_delete.delete()
logger.info(f"Cleanup complete")
def refresh_m3u_groups(account_id):
if not acquire_lock('refresh_m3u_account_groups', account_id):
return f"Task already running for account_id={account_id}."
return f"Task already running for account_id={account_id}.", None
# Record start time
start_time = time.time()
@ -345,7 +366,10 @@ def refresh_single_m3u_account(account_id, use_cache=False):
groups = data['groups']
if not extinf_data:
extinf_data, groups = refresh_m3u_groups(account_id)
try:
extinf_data, groups = refresh_m3u_groups(account_id)
except:
return "Failed to update m3u account"
hash_keys = CoreSettings.get_m3u_hash_key().split(",")
@ -371,6 +395,10 @@ def refresh_single_m3u_account(account_id, use_cache=False):
progress = int((completed_batches / total_batches) * 100)
# Send progress update via Channels
# Don't send 100% because we want to clean up after
if progress == 100:
progress = 99
send_progress_update(progress, account_id)
# Optionally remove completed task from the group to prevent processing it again
@ -378,6 +406,10 @@ def refresh_single_m3u_account(account_id, use_cache=False):
else:
logger.debug(f"Task is still running.")
# Run cleanup
cleanup_streams(account_id)
send_progress_update(100, account_id)
end_time = time.time()
# Calculate elapsed time