From e88e3928dd86aa9ef3c91bdccfede352cb195ea9 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Thu, 8 May 2025 20:17:22 -0500 Subject: [PATCH] If EPG source is not found when task is run, delete the task so it doesn't keep happening. --- apps/epg/signals.py | 55 ++++++-------------------------- apps/epg/tasks.py | 76 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 84 insertions(+), 47 deletions(-) diff --git a/apps/epg/signals.py b/apps/epg/signals.py index 7fa7f54f..6f98e84a 100644 --- a/apps/epg/signals.py +++ b/apps/epg/signals.py @@ -1,7 +1,7 @@ from django.db.models.signals import post_save, post_delete, pre_save from django.dispatch import receiver from .models import EPGSource -from .tasks import refresh_epg_data +from .tasks import refresh_epg_data, delete_epg_refresh_task_by_id from django_celery_beat.models import PeriodicTask, IntervalSchedule import json import logging @@ -59,57 +59,20 @@ def delete_refresh_task(sender, instance, **kwargs): Delete the associated Celery Beat periodic task when an EPGSource is deleted. """ try: + # First try the foreign key relationship to find the task ID task = None - task_name = f"epg_source-refresh-{instance.id}" - - # First try the foreign key relationship if instance.refresh_task: logger.info(f"Found task via foreign key: {instance.refresh_task.id} for EPGSource {instance.id}") task = instance.refresh_task + + # Store task ID before deletion if we need to bypass the helper function + if task: + delete_epg_refresh_task_by_id(instance.id) else: - # If relationship is broken, look for task by name - logger.warning(f"No refresh_task found via foreign key for EPGSource {instance.id}, looking up by name") - from django_celery_beat.models import PeriodicTask - try: - task = PeriodicTask.objects.get(name=task_name) - logger.info(f"Found task by name: {task.id} for EPGSource {instance.id}") - except PeriodicTask.DoesNotExist: - logger.warning(f"No PeriodicTask found with name {task_name}") - return - - # Now delete the task and its interval - if task: - # Store interval info before deleting the task - interval_id = None - if hasattr(task, 'interval') and task.interval: - interval_id = task.interval.id - - # Count how many TOTAL tasks use this interval (including this one) - from django_celery_beat.models import PeriodicTask - tasks_with_same_interval = PeriodicTask.objects.filter(interval_id=interval_id).count() - logger.info(f"Interval {interval_id} is used by {tasks_with_same_interval} tasks total") - - # Delete the task first - task_id = task.id - task.delete() - logger.info(f"Successfully deleted periodic task {task_id}") - - # Now check if we should delete the interval - # We only delete if it was the ONLY task using this interval - # (meaning remaining count would be zero after our deletion) - if interval_id and tasks_with_same_interval == 1: - from django_celery_beat.models import IntervalSchedule - try: - interval = IntervalSchedule.objects.get(id=interval_id) - logger.info(f"Deleting interval schedule {interval_id} (not shared with other tasks)") - interval.delete() - logger.info(f"Successfully deleted interval {interval_id}") - except IntervalSchedule.DoesNotExist: - logger.warning(f"Interval {interval_id} no longer exists") - elif interval_id: - logger.info(f"Not deleting interval {interval_id} as it's shared with {tasks_with_same_interval-1} other tasks") + # Otherwise use the helper function + delete_epg_refresh_task_by_id(instance.id) except Exception as e: - logger.error(f"Error deleting periodic task for EPGSource {instance.id}: {str(e)}", exc_info=True) + logger.error(f"Error in delete_refresh_task signal handler: {str(e)}", exc_info=True) @receiver(pre_save, sender=EPGSource) def update_status_on_active_change(sender, instance, **kwargs): diff --git a/apps/epg/tasks.py b/apps/epg/tasks.py index 52f8f5a1..f102630f 100644 --- a/apps/epg/tasks.py +++ b/apps/epg/tasks.py @@ -49,6 +49,61 @@ def send_epg_update(source_id, action, progress, **kwargs): ) +def delete_epg_refresh_task_by_id(epg_id): + """ + Delete the periodic task associated with an EPG source ID. + Can be called directly or from the post_delete signal. + Returns True if a task was found and deleted, False otherwise. + """ + try: + task = None + task_name = f"epg_source-refresh-{epg_id}" + + # Look for task by name + try: + from django_celery_beat.models import PeriodicTask, IntervalSchedule + task = PeriodicTask.objects.get(name=task_name) + logger.info(f"Found task by name: {task.id} for EPGSource {epg_id}") + except PeriodicTask.DoesNotExist: + logger.warning(f"No PeriodicTask found with name {task_name}") + return False + + # Now delete the task and its interval + if task: + # Store interval info before deleting the task + interval_id = None + if hasattr(task, 'interval') and task.interval: + interval_id = task.interval.id + + # Count how many TOTAL tasks use this interval (including this one) + tasks_with_same_interval = PeriodicTask.objects.filter(interval_id=interval_id).count() + logger.info(f"Interval {interval_id} is used by {tasks_with_same_interval} tasks total") + + # Delete the task first + task_id = task.id + task.delete() + logger.info(f"Successfully deleted periodic task {task_id}") + + # Now check if we should delete the interval + # We only delete if it was the ONLY task using this interval + if interval_id and tasks_with_same_interval == 1: + try: + interval = IntervalSchedule.objects.get(id=interval_id) + logger.info(f"Deleting interval schedule {interval_id} (not shared with other tasks)") + interval.delete() + logger.info(f"Successfully deleted interval {interval_id}") + except IntervalSchedule.DoesNotExist: + logger.warning(f"Interval {interval_id} no longer exists") + elif interval_id: + logger.info(f"Not deleting interval {interval_id} as it's shared with {tasks_with_same_interval-1} other tasks") + + return True + return False + except Exception as e: + logger.error(f"Error deleting periodic task for EPGSource {epg_id}: {str(e)}", exc_info=True) + return False + + @shared_task def refresh_all_epg_data(): logger.info("Starting refresh_epg_data task.") @@ -69,11 +124,30 @@ def refresh_epg_data(source_id): return try: - source = EPGSource.objects.get(id=source_id) + # Try to get the EPG source + try: + source = EPGSource.objects.get(id=source_id) + except EPGSource.DoesNotExist: + # The EPG source doesn't exist, so delete the periodic task if it exists + logger.warning(f"EPG source with ID {source_id} not found, but task was triggered. Cleaning up orphaned task.") + + # Call the shared function to delete the task + if delete_epg_refresh_task_by_id(source_id): + logger.info(f"Successfully cleaned up orphaned task for EPG source {source_id}") + else: + logger.info(f"No orphaned task found for EPG source {source_id}") + + # Release the lock and exit + release_task_lock('refresh_epg_data', source_id) + return f"EPG source {source_id} does not exist, task cleaned up" + + # The source exists but is not active, just skip processing if not source.is_active: logger.info(f"EPG source {source_id} is not active. Skipping.") + release_task_lock('refresh_epg_data', source_id) return + # Continue with the normal processing... logger.info(f"Processing EPGSource: {source.name} (type: {source.source_type})") if source.source_type == 'xmltv': fetch_success = fetch_xmltv(source)