From 2995d2c456b024cdc9564a8d4cc6ce0ee4aeb662 Mon Sep 17 00:00:00 2001 From: dekzter Date: Mon, 31 Mar 2025 10:03:49 -0400 Subject: [PATCH] added centralized task locking --- core/utils.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/core/utils.py b/core/utils.py index 073c2169..2132db3c 100644 --- a/core/utils.py +++ b/core/utils.py @@ -5,6 +5,7 @@ import os import threading from django.conf import settings from redis.exceptions import ConnectionError, TimeoutError +from django.core.cache import cache logger = logging.getLogger(__name__) @@ -145,6 +146,27 @@ def execute_redis_command(redis_client, command_func, default_return=None): logger.error(f"Redis command error: {e}") return default_return +def acquire_task_lock(task_name, id): + """Acquire a lock to prevent concurrent task execution.""" + redis_client = get_redis_client() + lock_id = f"task_lock_{task_name}_{id}" + + # Use the Redis SET command with NX (only set if not exists) and EX (set expiration) + lock_acquired = redis_client.set(lock_id, "locked", ex=300, nx=True) + + if not lock_acquired: + logger.warning(f"Lock for {task_name} and id={id} already acquired. Task will not proceed.") + + return lock_acquired + +def release_task_lock(task_name, id): + """Release the lock after task execution.""" + redis_client = get_redis_client() + lock_id = f"task_lock_{task_name}_{id}" + + # Remove the lock + redis_client.delete(lock_id) + # Initialize the global clients with retry logic # Skip Redis initialization if running as a management command if is_management_command(): @@ -162,4 +184,4 @@ if not is_management_command() and redis_client is not None: pubsub_manager = get_pubsub_manager(redis_client) else: logger.info("PubSub manager not initialized (running as management command or Redis not available)") - pubsub_manager = None \ No newline at end of file + pubsub_manager = None