diff --git a/core/tasks.py b/core/tasks.py
index 4ee00926..0b0c4705 100644
--- a/core/tasks.py
+++ b/core/tasks.py
@@ -7,7 +7,7 @@ import logging
 import re
 import time
 import os
-from core.utils import RedisClient, send_websocket_update
+from core.utils import RedisClient, send_websocket_update, acquire_task_lock, release_task_lock
 from apps.proxy.ts_proxy.channel_status import ChannelStatus
 from apps.m3u.models import M3UAccount
 from apps.epg.models import EPGSource
@@ -311,78 +311,81 @@ def fetch_channel_stats():
     return
 
 @shared_task
 def rehash_streams(keys):
     """
-    Rehash all streams with new hash keys and handle duplicates.
+    Regenerate stream hashes for all streams using the provided hash keys and merge duplicates.
+    Aborts if an M3U refresh is running, and blocks new refreshes via per-account task locks.
     """
-    batch_size = 1000
-    queryset = Stream.objects.all()
+    from apps.channels.models import Stream
+    from apps.m3u.models import M3UAccount
 
-    # Track statistics
-    total_processed = 0
-    duplicates_merged = 0
-    hash_keys = {}
+    logger.info("Starting stream rehash process")
 
-    total_records = queryset.count()
-    logger.info(f"Starting rehash of {total_records} streams with keys: {keys}")
+    # Get all M3U account IDs for locking
+    m3u_account_ids = list(M3UAccount.objects.filter(is_active=True).values_list('id', flat=True))
 
-    # Send initial WebSocket update
-    send_websocket_update(
-        'updates',
-        'update',
-        {
-            "success": True,
-            "type": "stream_rehash",
-            "action": "starting",
-            "progress": 0,
-            "total_records": total_records,
-            "message": f"Starting rehash of {total_records} streams"
-        }
-    )
+    # Check if any M3U refresh tasks are currently running
+    blocked_accounts = []
+    for account_id in m3u_account_ids:
+        if not acquire_task_lock('refresh_single_m3u_account', account_id, timeout=0):
+            blocked_accounts.append(account_id)
 
-    for start in range(0, total_records, batch_size):
-        batch_processed = 0
-        batch_duplicates = 0
+    if blocked_accounts:
+        # Release any locks we did acquire
+        for account_id in m3u_account_ids:
+            if account_id not in blocked_accounts:
+                release_task_lock('refresh_single_m3u_account', account_id)
 
-        with transaction.atomic():
-            batch = queryset[start:start + batch_size]
+        logger.warning(f"Rehash blocked: M3U refresh tasks running for accounts: {blocked_accounts}")
+        return f"Rehash blocked: M3U refresh tasks running for {len(blocked_accounts)} accounts"
 
-            for obj in batch:
-                # Generate new hash
-                new_hash = Stream.generate_hash_key(obj.name, obj.url, obj.tvg_id, keys)
+    acquired_locks = m3u_account_ids.copy()
 
-                # Check if this hash already exists in our tracking dict or in database
-                if new_hash in hash_keys:
-                    # Found duplicate in current batch - merge the streams
-                    existing_stream_id = hash_keys[new_hash]
-                    existing_stream = Stream.objects.get(id=existing_stream_id)
+    try:
+        batch_size = 1000
+        queryset = Stream.objects.all()
 
-                    # Move any channel relationships from duplicate to existing stream
-                    ChannelStream.objects.filter(stream_id=obj.id).update(stream_id=existing_stream_id)
+        # Track statistics
+        total_processed = 0
+        duplicates_merged = 0
+        hash_keys = {}
 
-                    # Update the existing stream with the most recent data
-                    if obj.updated_at > existing_stream.updated_at:
-                        existing_stream.name = obj.name
-                        existing_stream.url = obj.url
-                        existing_stream.logo_url = obj.logo_url
-                        existing_stream.tvg_id = obj.tvg_id
-                        existing_stream.m3u_account = obj.m3u_account
-                        existing_stream.channel_group = obj.channel_group
-                        existing_stream.custom_properties = obj.custom_properties
-                        existing_stream.last_seen = obj.last_seen
-                        existing_stream.updated_at = obj.updated_at
-                        existing_stream.save()
+        total_records = queryset.count()
+        logger.info(f"Starting rehash of {total_records} streams")
+
+        # Send initial WebSocket update
+        send_websocket_update(
+            'updates',
+            'update',
+            {
+                "success": True,
+                "type": "stream_rehash",
+                "action": "starting",
+                "progress": 0,
+                "total_records": total_records,
+                "message": f"Starting rehash of {total_records} streams"
+            }
+        )
+
+        for start in range(0, total_records, batch_size):
+            batch_processed = 0
+            batch_duplicates = 0
+
+            with transaction.atomic():
+                batch = queryset[start:start + batch_size]
+
+                for obj in batch:
+                    # Generate new hash
+                    new_hash = Stream.generate_hash_key(obj.name, obj.url, obj.tvg_id, keys)
+
+                    # Check if this hash already exists in our tracking dict or in database
+                    if new_hash in hash_keys:
+                        # Found duplicate in current batch - merge the streams
+                        existing_stream_id = hash_keys[new_hash]
+                        existing_stream = Stream.objects.get(id=existing_stream_id)
 
-                    # Delete the duplicate
-                    obj.delete()
-                    batch_duplicates += 1
-                else:
-                    # Check if hash already exists in database (from previous batches or existing data)
-                    existing_stream = Stream.objects.filter(stream_hash=new_hash).exclude(id=obj.id).first()
-                    if existing_stream:
-                        # Found duplicate in database - merge the streams
                         # Move any channel relationships from duplicate to existing stream
-                        ChannelStream.objects.filter(stream_id=obj.id).update(stream_id=existing_stream.id)
+                        ChannelStream.objects.filter(stream_id=obj.id).update(stream_id=existing_stream_id)
 
                         # Update the existing stream with the most recent data
                         if obj.updated_at > existing_stream.updated_at:
@@ -400,65 +403,95 @@ def rehash_streams(keys):
 
                         # Delete the duplicate
                         obj.delete()
                         batch_duplicates += 1
-                        hash_keys[new_hash] = existing_stream.id
                     else:
-                        # Update hash for this stream
-                        obj.stream_hash = new_hash
-                        obj.save(update_fields=['stream_hash'])
-                        hash_keys[new_hash] = obj.id
+                        # Check if hash already exists in database (from previous batches or existing data)
+                        existing_stream = Stream.objects.filter(stream_hash=new_hash).exclude(id=obj.id).first()
+                        if existing_stream:
+                            # Found duplicate in database - merge the streams
+                            # Move any channel relationships from duplicate to existing stream
+                            ChannelStream.objects.filter(stream_id=obj.id).update(stream_id=existing_stream.id)
 
-                batch_processed += 1
+                            # Update the existing stream with the most recent data
+                            if obj.updated_at > existing_stream.updated_at:
+                                existing_stream.name = obj.name
+                                existing_stream.url = obj.url
+                                existing_stream.logo_url = obj.logo_url
+                                existing_stream.tvg_id = obj.tvg_id
+                                existing_stream.m3u_account = obj.m3u_account
+                                existing_stream.channel_group = obj.channel_group
+                                existing_stream.custom_properties = obj.custom_properties
+                                existing_stream.last_seen = obj.last_seen
+                                existing_stream.updated_at = obj.updated_at
+                                existing_stream.save()
 
-        total_processed += batch_processed
-        duplicates_merged += batch_duplicates
+                            # Delete the duplicate
+                            obj.delete()
+                            batch_duplicates += 1
+                            hash_keys[new_hash] = existing_stream.id
+                        else:
+                            # Update hash for this stream
+                            obj.stream_hash = new_hash
+                            obj.save(update_fields=['stream_hash'])
+                            hash_keys[new_hash] = obj.id
 
-        # Calculate progress percentage
-        progress_percent = int((total_processed / total_records) * 100)
-        current_batch = start // batch_size + 1
-        total_batches = (total_records // batch_size) + 1
+                    batch_processed += 1
 
-        # Send progress update via WebSocket
+            total_processed += batch_processed
+            duplicates_merged += batch_duplicates
+
+            # Calculate progress percentage
+            progress_percent = int((total_processed / total_records) * 100)
+            current_batch = start // batch_size + 1
+            total_batches = (total_records // batch_size) + 1
+
+            # Send progress update via WebSocket
+            send_websocket_update(
+                'updates',
+                'update',
+                {
+                    "success": True,
+                    "type": "stream_rehash",
+                    "action": "processing",
+                    "progress": progress_percent,
+                    "batch": current_batch,
+                    "total_batches": total_batches,
+                    "processed": total_processed,
+                    "duplicates_merged": duplicates_merged,
+                    "message": f"Processed batch {current_batch}/{total_batches}: {batch_processed} streams, {batch_duplicates} duplicates merged"
+                }
+            )
+
+            logger.info(f"Rehashed batch {current_batch}/{total_batches}: "
+                        f"{batch_processed} processed, {batch_duplicates} duplicates merged")
+
+        logger.info(f"Rehashing complete: {total_processed} streams processed, "
+                    f"{duplicates_merged} duplicates merged")
+
+        # Send completion update via WebSocket
         send_websocket_update(
             'updates',
             'update',
             {
                 "success": True,
                 "type": "stream_rehash",
-                "action": "processing",
-                "progress": progress_percent,
-                "batch": current_batch,
-                "total_batches": total_batches,
-                "processed": total_processed,
+                "action": "completed",
+                "progress": 100,
+                "total_processed": total_processed,
                 "duplicates_merged": duplicates_merged,
-                "message": f"Processed batch {current_batch}/{total_batches}: {batch_processed} streams, {batch_duplicates} duplicates merged"
-            }
+                "final_count": total_processed - duplicates_merged,
+                "message": f"Rehashing complete: {total_processed} streams processed, {duplicates_merged} duplicates merged"
+            },
+            collect_garbage=True  # Force garbage collection after completion
         )
 
-        logger.info(f"Rehashed batch {current_batch}/{total_batches}: "
-                    f"{batch_processed} processed, {batch_duplicates} duplicates merged")
+        logger.info("Stream rehash completed successfully")
+        return f"Successfully rehashed {total_processed} streams"
 
-    logger.info(f"Rehashing complete: {total_processed} streams processed, "
-                f"{duplicates_merged} duplicates merged")
-
-    # Send completion update via WebSocket
-    send_websocket_update(
-        'updates',
-        'update',
-        {
-            "success": True,
-            "type": "stream_rehash",
-            "action": "completed",
-            "progress": 100,
-            "total_processed": total_processed,
-            "duplicates_merged": duplicates_merged,
-            "final_count": total_processed - duplicates_merged,
-            "message": f"Rehashing complete: {total_processed} streams processed, {duplicates_merged} duplicates merged"
-        },
-        collect_garbage=True  # Force garbage collection after completion
-    )
-
-    return {
-        'total_processed': total_processed,
-        'duplicates_merged': duplicates_merged,
-        'final_count': total_processed - duplicates_merged
-    }
+    except Exception as e:
+        logger.error(f"Error during stream rehash: {e}")
+        raise
+    finally:
+        # Always release all acquired M3U locks
+        for account_id in acquired_locks:
+            release_task_lock('refresh_single_m3u_account', account_id)
+        logger.info(f"Released M3U task locks for {len(acquired_locks)} accounts")
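Reviewer note: acquire_task_lock and release_task_lock come from core.utils and are not shown in this diff. The patch relies on them behaving as non-blocking, per-(task, object) mutexes that expire on their own if a worker dies. A minimal sketch of that contract, assuming a Redis-backed lock built on the RedisClient helper already imported above (the key format, TTL value, and get_client() accessor are illustrative assumptions, not the actual core.utils implementation):

    # Sketch only: the locking contract rehash_streams() depends on.
    def acquire_task_lock(task_name, object_id, timeout=0):
        """Single attempt to take the lock; with timeout=0 this returns False
        immediately when another worker already holds it."""
        client = RedisClient.get_client()  # assumed accessor on the RedisClient helper
        key = f"task_lock:{task_name}:{object_id}"
        # SET ... NX EX: succeeds only if the key is absent; the TTL ensures the
        # lock expires even if the owner crashes before calling release_task_lock().
        return bool(client.set(key, "locked", nx=True, ex=3600))

    def release_task_lock(task_name, object_id):
        client = RedisClient.get_client()
        client.delete(f"task_lock:{task_name}:{object_id}")

Under that contract, taking the lock for every active account up front makes the rehash mutually exclusive with each refresh_single_m3u_account run in both directions: a running refresh aborts the rehash, and a running rehash blocks new refreshes until the finally block releases the locks.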