Acquire locks when rehash starts.

SergeantPanda 2025-07-11 15:13:29 -05:00
parent 8ec489d26f
commit 073fe72a49
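
The change wraps the whole rehash in the project's per-account task locks: before doing any work, the task tries to take the refresh_single_m3u_account lock for every active M3U account, bails out if any of them is already held, and otherwise keeps the locks until the rehash finishes or fails. A condensed sketch of that guard pattern follows; run_rehash_with_locks and the do_rehash callable are illustrative names, not part of the commit, while acquire_task_lock and release_task_lock are the helpers imported from core.utils in the diff below.

from core.utils import acquire_task_lock, release_task_lock

def run_rehash_with_locks(account_ids, do_rehash):
    # Try to take the refresh lock for every account up front (timeout=0: do not wait).
    blocked = [a for a in account_ids
               if not acquire_task_lock('refresh_single_m3u_account', a, timeout=0)]
    if blocked:
        # Give back the locks that were acquired before bailing out.
        for a in account_ids:
            if a not in blocked:
                release_task_lock('refresh_single_m3u_account', a)
        return f"Rehash blocked: M3U refresh tasks running for {len(blocked)} accounts"
    try:
        return do_rehash()
    finally:
        # Locks are always released, even if the rehash raises.
        for a in account_ids:
            release_task_lock('refresh_single_m3u_account', a)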


@@ -7,7 +7,7 @@ import logging
 import re
 import time
 import os
-from core.utils import RedisClient, send_websocket_update
+from core.utils import RedisClient, send_websocket_update, acquire_task_lock, release_task_lock
 from apps.proxy.ts_proxy.channel_status import ChannelStatus
 from apps.m3u.models import M3UAccount
 from apps.epg.models import EPGSource
@@ -311,78 +311,81 @@ def fetch_channel_stats():
         return


 @shared_task
-def rehash_streams(keys):
-    """
-    Rehash all streams with new hash keys and handle duplicates.
-    """
-    batch_size = 1000
-    queryset = Stream.objects.all()
-
-    # Track statistics
-    total_processed = 0
-    duplicates_merged = 0
-    hash_keys = {}
-
-    total_records = queryset.count()
-    logger.info(f"Starting rehash of {total_records} streams with keys: {keys}")
-
-    # Send initial WebSocket update
-    send_websocket_update(
-        'updates',
-        'update',
-        {
-            "success": True,
-            "type": "stream_rehash",
-            "action": "starting",
-            "progress": 0,
-            "total_records": total_records,
-            "message": f"Starting rehash of {total_records} streams"
-        }
-    )
-
-    for start in range(0, total_records, batch_size):
-        batch_processed = 0
-        batch_duplicates = 0
-
-        with transaction.atomic():
-            batch = queryset[start:start + batch_size]
-
-            for obj in batch:
-                # Generate new hash
-                new_hash = Stream.generate_hash_key(obj.name, obj.url, obj.tvg_id, keys)
-
-                # Check if this hash already exists in our tracking dict or in database
-                if new_hash in hash_keys:
-                    # Found duplicate in current batch - merge the streams
-                    existing_stream_id = hash_keys[new_hash]
-                    existing_stream = Stream.objects.get(id=existing_stream_id)
-
-                    # Move any channel relationships from duplicate to existing stream
-                    ChannelStream.objects.filter(stream_id=obj.id).update(stream_id=existing_stream_id)
-
-                    # Update the existing stream with the most recent data
-                    if obj.updated_at > existing_stream.updated_at:
-                        existing_stream.name = obj.name
-                        existing_stream.url = obj.url
-                        existing_stream.logo_url = obj.logo_url
-                        existing_stream.tvg_id = obj.tvg_id
-                        existing_stream.m3u_account = obj.m3u_account
-                        existing_stream.channel_group = obj.channel_group
-                        existing_stream.custom_properties = obj.custom_properties
-                        existing_stream.last_seen = obj.last_seen
-                        existing_stream.updated_at = obj.updated_at
-                        existing_stream.save()
-
-                    # Delete the duplicate
-                    obj.delete()
-                    batch_duplicates += 1
-                else:
-                    # Check if hash already exists in database (from previous batches or existing data)
-                    existing_stream = Stream.objects.filter(stream_hash=new_hash).exclude(id=obj.id).first()
-                    if existing_stream:
-                        # Found duplicate in database - merge the streams
-                        # Move any channel relationships from duplicate to existing stream
-                        ChannelStream.objects.filter(stream_id=obj.id).update(stream_id=existing_stream.id)
+def rehash_streams():
+    """
+    Regenerate stream hashes for all streams based on current hash key configuration.
+    This task checks for and blocks M3U refresh tasks to prevent conflicts.
+    """
+    from apps.channels.models import Stream
+    from apps.m3u.models import M3UAccount
+
+    logger.info("Starting stream rehash process")
+
+    # Get all M3U account IDs for locking
+    m3u_account_ids = list(M3UAccount.objects.filter(is_active=True).values_list('id', flat=True))
+
+    # Check if any M3U refresh tasks are currently running
+    blocked_accounts = []
+    for account_id in m3u_account_ids:
+        if not acquire_task_lock('refresh_single_m3u_account', account_id, timeout=0):
+            blocked_accounts.append(account_id)
+
+    if blocked_accounts:
+        # Release any locks we did acquire
+        for account_id in m3u_account_ids:
+            if account_id not in blocked_accounts:
+                release_task_lock('refresh_single_m3u_account', account_id)
+
+        logger.warning(f"Rehash blocked: M3U refresh tasks running for accounts: {blocked_accounts}")
+        return f"Rehash blocked: M3U refresh tasks running for {len(blocked_accounts)} accounts"
+
+    acquired_locks = m3u_account_ids.copy()
+
+    try:
+        batch_size = 1000
+        queryset = Stream.objects.all()
+
+        # Track statistics
+        total_processed = 0
+        duplicates_merged = 0
+        hash_keys = {}
+
+        total_records = queryset.count()
+        logger.info(f"Starting rehash of {total_records} streams")
+
+        # Send initial WebSocket update
+        send_websocket_update(
+            'updates',
+            'update',
+            {
+                "success": True,
+                "type": "stream_rehash",
+                "action": "starting",
+                "progress": 0,
+                "total_records": total_records,
+                "message": f"Starting rehash of {total_records} streams"
+            }
+        )
+
+        for start in range(0, total_records, batch_size):
+            batch_processed = 0
+            batch_duplicates = 0
+
+            with transaction.atomic():
+                batch = queryset[start:start + batch_size]
+
+                for obj in batch:
+                    # Generate new hash
+                    new_hash = Stream.generate_hash_key(obj.name, obj.url, obj.tvg_id, keys)
+
+                    # Check if this hash already exists in our tracking dict or in database
+                    if new_hash in hash_keys:
+                        # Found duplicate in current batch - merge the streams
+                        existing_stream_id = hash_keys[new_hash]
+                        existing_stream = Stream.objects.get(id=existing_stream_id)
+
+                        # Move any channel relationships from duplicate to existing stream
+                        ChannelStream.objects.filter(stream_id=obj.id).update(stream_id=existing_stream_id)

                         # Update the existing stream with the most recent data
                         if obj.updated_at > existing_stream.updated_at:
@@ -400,65 +403,95 @@ def rehash_streams(keys):
                         # Delete the duplicate
                         obj.delete()
                         batch_duplicates += 1
-                        hash_keys[new_hash] = existing_stream.id
-                    else:
-                        # Update hash for this stream
-                        obj.stream_hash = new_hash
-                        obj.save(update_fields=['stream_hash'])
-                        hash_keys[new_hash] = obj.id
-
-                batch_processed += 1
-
-        total_processed += batch_processed
-        duplicates_merged += batch_duplicates
-
-        # Calculate progress percentage
-        progress_percent = int((total_processed / total_records) * 100)
-        current_batch = start // batch_size + 1
-        total_batches = (total_records // batch_size) + 1
-
-        # Send progress update via WebSocket
-        send_websocket_update(
-            'updates',
-            'update',
-            {
-                "success": True,
-                "type": "stream_rehash",
-                "action": "processing",
-                "progress": progress_percent,
-                "batch": current_batch,
-                "total_batches": total_batches,
-                "processed": total_processed,
-                "duplicates_merged": duplicates_merged,
-                "message": f"Processed batch {current_batch}/{total_batches}: {batch_processed} streams, {batch_duplicates} duplicates merged"
-            }
-        )
-
-        logger.info(f"Rehashed batch {current_batch}/{total_batches}: "
-                    f"{batch_processed} processed, {batch_duplicates} duplicates merged")
-
-    logger.info(f"Rehashing complete: {total_processed} streams processed, "
-                f"{duplicates_merged} duplicates merged")
-
-    # Send completion update via WebSocket
-    send_websocket_update(
-        'updates',
-        'update',
-        {
-            "success": True,
-            "type": "stream_rehash",
-            "action": "completed",
-            "progress": 100,
-            "total_processed": total_processed,
-            "duplicates_merged": duplicates_merged,
-            "final_count": total_processed - duplicates_merged,
-            "message": f"Rehashing complete: {total_processed} streams processed, {duplicates_merged} duplicates merged"
-        },
-        collect_garbage=True  # Force garbage collection after completion
-    )
-
-    return {
-        'total_processed': total_processed,
-        'duplicates_merged': duplicates_merged,
-        'final_count': total_processed - duplicates_merged
-    }
+                    else:
+                        # Check if hash already exists in database (from previous batches or existing data)
+                        existing_stream = Stream.objects.filter(stream_hash=new_hash).exclude(id=obj.id).first()
+
+                        if existing_stream:
+                            # Found duplicate in database - merge the streams
+                            # Move any channel relationships from duplicate to existing stream
+                            ChannelStream.objects.filter(stream_id=obj.id).update(stream_id=existing_stream.id)
+
+                            # Update the existing stream with the most recent data
+                            if obj.updated_at > existing_stream.updated_at:
+                                existing_stream.name = obj.name
+                                existing_stream.url = obj.url
+                                existing_stream.logo_url = obj.logo_url
+                                existing_stream.tvg_id = obj.tvg_id
+                                existing_stream.m3u_account = obj.m3u_account
+                                existing_stream.channel_group = obj.channel_group
+                                existing_stream.custom_properties = obj.custom_properties
+                                existing_stream.last_seen = obj.last_seen
+                                existing_stream.updated_at = obj.updated_at
+                                existing_stream.save()
+
+                            # Delete the duplicate
+                            obj.delete()
+                            batch_duplicates += 1
+                            hash_keys[new_hash] = existing_stream.id
+                        else:
+                            # Update hash for this stream
+                            obj.stream_hash = new_hash
+                            obj.save(update_fields=['stream_hash'])
+                            hash_keys[new_hash] = obj.id
+
+                    batch_processed += 1
+
+            total_processed += batch_processed
+            duplicates_merged += batch_duplicates
+
+            # Calculate progress percentage
+            progress_percent = int((total_processed / total_records) * 100)
+            current_batch = start // batch_size + 1
+            total_batches = (total_records // batch_size) + 1
+
+            # Send progress update via WebSocket
+            send_websocket_update(
+                'updates',
+                'update',
+                {
+                    "success": True,
+                    "type": "stream_rehash",
+                    "action": "processing",
+                    "progress": progress_percent,
+                    "batch": current_batch,
+                    "total_batches": total_batches,
+                    "processed": total_processed,
+                    "duplicates_merged": duplicates_merged,
+                    "message": f"Processed batch {current_batch}/{total_batches}: {batch_processed} streams, {batch_duplicates} duplicates merged"
+                }
+            )
+
+            logger.info(f"Rehashed batch {current_batch}/{total_batches}: "
+                        f"{batch_processed} processed, {batch_duplicates} duplicates merged")
+
+        logger.info(f"Rehashing complete: {total_processed} streams processed, "
+                    f"{duplicates_merged} duplicates merged")
+
+        # Send completion update via WebSocket
+        send_websocket_update(
+            'updates',
+            'update',
+            {
+                "success": True,
+                "type": "stream_rehash",
+                "action": "completed",
+                "progress": 100,
+                "total_processed": total_processed,
+                "duplicates_merged": duplicates_merged,
+                "final_count": total_processed - duplicates_merged,
+                "message": f"Rehashing complete: {total_processed} streams processed, {duplicates_merged} duplicates merged"
+            },
+            collect_garbage=True  # Force garbage collection after completion
+        )
+
+        logger.info("Stream rehash completed successfully")
+        return f"Successfully rehashed {total_processed} streams"
+
+    except Exception as e:
+        logger.error(f"Error during stream rehash: {e}")
+        raise
+    finally:
+        # Always release all acquired M3U locks
+        for account_id in acquired_locks:
+            release_task_lock('refresh_single_m3u_account', account_id)
+        logger.info(f"Released M3U task locks for {len(acquired_locks)} accounts")