Add rehash streams endpoint and UI integration for triggering stream rehashing

SergeantPanda 2025-07-11 14:11:41 -05:00
parent fafd93e958
commit 1c7fa21b86
4 changed files with 156 additions and 16 deletions

View file

@@ -2,7 +2,7 @@
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from .api_views import UserAgentViewSet, StreamProfileViewSet, CoreSettingsViewSet, environment, version, rehash_streams_endpoint

router = DefaultRouter()
router.register(r'useragents', UserAgentViewSet, basename='useragent')

@@ -12,5 +12,6 @@ router.register(r'settings', CoreSettingsViewSet, basename='settings')

urlpatterns = [
    path('settings/env/', environment, name='token_refresh'),
    path('version/', version, name='version'),
    path('rehash-streams/', rehash_streams_endpoint, name='rehash_streams'),
    path('', include(router.urls)),
]
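
A quick sanity check of the new route from a Django shell, as a minimal sketch; this diff only shows the app-level urls.py, so the mount point and any URL namespace are assumptions.

# Assumes the URL name 'rehash_streams' is not wrapped in a namespace;
# if it is, use reverse('<namespace>:rehash_streams') instead.
from django.urls import reverse

print(reverse('rehash_streams'))  # e.g. /api/core/rehash-streams/ if mounted under /api/core/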

View file

@@ -280,3 +280,40 @@ def version(request):
            "timestamp": __timestamp__,
        }
    )


@swagger_auto_schema(
    method="post",
    operation_description="Trigger rehashing of all streams",
    responses={200: "Rehash task started"},
)
@api_view(["POST"])
@permission_classes([Authenticated])
def rehash_streams_endpoint(request):
    """Trigger the rehash streams task"""
    try:
        # Get the current hash keys from settings
        hash_key_setting = CoreSettings.objects.get(key=STREAM_HASH_KEY)
        hash_keys = hash_key_setting.value.split(",")

        # Queue the rehash task
        task = rehash_streams.delay(hash_keys)

        return Response({
            "success": True,
            "message": "Stream rehashing task has been queued",
            "task_id": task.id
        }, status=status.HTTP_200_OK)
    except CoreSettings.DoesNotExist:
        return Response({
            "success": False,
            "message": "Hash key settings not found"
        }, status=status.HTTP_400_BAD_REQUEST)
    except Exception as e:
        logger.error(f"Error triggering rehash streams: {e}")
        return Response({
            "success": False,
            "message": "Failed to trigger rehash task"
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
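
For reference, a minimal sketch of triggering the endpoint from a script. The host, the /api/core/ prefix, and Bearer-token auth are assumptions inferred from the frontend call to '/core/rehash-streams/', not confirmed by this diff.

# Hypothetical base URL and token; adjust to the actual deployment.
import requests

BASE_URL = "http://localhost:9191/api/core"  # assumed mount point
TOKEN = "YOUR_ACCESS_TOKEN"                  # placeholder

resp = requests.post(
    f"{BASE_URL}/rehash-streams/",
    headers={"Authorization": f"Bearer {TOKEN}"},
    timeout=10,
)
resp.raise_for_status()
data = resp.json()
print(data["message"], data.get("task_id"))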

View file

@@ -312,32 +312,100 @@ def fetch_channel_stats():

@shared_task
def rehash_streams(keys):
    """
    Rehash all streams with new hash keys and handle duplicates.
    """
    batch_size = 1000
    queryset = Stream.objects.all()

    # Track statistics
    total_processed = 0
    duplicates_merged = 0
    hash_keys = {}
    total_records = queryset.count()

    logger.info(f"Starting rehash of {total_records} streams with keys: {keys}")

    for start in range(0, total_records, batch_size):
        batch_processed = 0
        batch_duplicates = 0

        with transaction.atomic():
            batch = queryset[start:start + batch_size]
            for obj in batch:
                # Generate new hash
                new_hash = Stream.generate_hash_key(obj.name, obj.url, obj.tvg_id, keys)

                # Check if this hash already exists in our tracking dict or in the database
                if new_hash in hash_keys:
                    # Found duplicate in current batch - merge the streams
                    existing_stream_id = hash_keys[new_hash]
                    existing_stream = Stream.objects.get(id=existing_stream_id)

                    # Move any channel relationships from duplicate to existing stream
                    ChannelStream.objects.filter(stream_id=obj.id).update(stream_id=existing_stream_id)

                    # Update the existing stream with the most recent data
                    if obj.updated_at > existing_stream.updated_at:
                        existing_stream.name = obj.name
                        existing_stream.url = obj.url
                        existing_stream.logo_url = obj.logo_url
                        existing_stream.tvg_id = obj.tvg_id
                        existing_stream.m3u_account = obj.m3u_account
                        existing_stream.channel_group = obj.channel_group
                        existing_stream.custom_properties = obj.custom_properties
                        existing_stream.last_seen = obj.last_seen
                        existing_stream.updated_at = obj.updated_at
                        existing_stream.save()

                    # Delete the duplicate
                    obj.delete()
                    batch_duplicates += 1
                else:
                    # Check if hash already exists in database (from previous batches or existing data)
                    existing_stream = Stream.objects.filter(stream_hash=new_hash).exclude(id=obj.id).first()
                    if existing_stream:
                        # Found duplicate in database - merge the streams
                        # Move any channel relationships from duplicate to existing stream
                        ChannelStream.objects.filter(stream_id=obj.id).update(stream_id=existing_stream.id)

                        # Update the existing stream with the most recent data
                        if obj.updated_at > existing_stream.updated_at:
                            existing_stream.name = obj.name
                            existing_stream.url = obj.url
                            existing_stream.logo_url = obj.logo_url
                            existing_stream.tvg_id = obj.tvg_id
                            existing_stream.m3u_account = obj.m3u_account
                            existing_stream.channel_group = obj.channel_group
                            existing_stream.custom_properties = obj.custom_properties
                            existing_stream.last_seen = obj.last_seen
                            existing_stream.updated_at = obj.updated_at
                            existing_stream.save()

                        # Delete the duplicate
                        obj.delete()
                        batch_duplicates += 1
                        hash_keys[new_hash] = existing_stream.id
                    else:
                        # Update hash for this stream
                        obj.stream_hash = new_hash
                        obj.save(update_fields=['stream_hash'])
                        hash_keys[new_hash] = obj.id
                        batch_processed += 1

        total_processed += batch_processed
        duplicates_merged += batch_duplicates

        logger.info(f"Rehashed batch {start//batch_size + 1}/{(total_records//batch_size) + 1}: "
                    f"{batch_processed} processed, {batch_duplicates} duplicates merged")

    logger.info(f"Rehashing complete: {total_processed} streams processed, "
                f"{duplicates_merged} duplicates merged")

    return {
        'total_processed': total_processed,
        'duplicates_merged': duplicates_merged,
        'final_count': total_processed - duplicates_merged
    }
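
Since the endpoint returns the Celery task_id and the task returns a stats dict, the outcome can be inspected after the fact. A minimal sketch using Celery's standard AsyncResult API, assuming it is run in a Django shell with the project's Celery app loaded and a result backend configured:

# Paste the task_id returned by POST /rehash-streams/; the id below is a placeholder.
from celery.result import AsyncResult

result = AsyncResult("PASTE-TASK-ID-HERE")
if result.ready():
    stats = result.get(timeout=5)
    # Expected shape per the task's return value:
    # {'total_processed': ..., 'duplicates_merged': ..., 'final_count': ...}
    print(stats)
else:
    print("Rehash still running:", result.state)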

View file

@@ -47,6 +47,8 @@ const SettingsPage = () => {
    useState([]);
  const [proxySettingsSaved, setProxySettingsSaved] = useState(false);
  const [rehashingStreams, setRehashingStreams] = useState(false);
  const [rehashSuccess, setRehashSuccess] = useState(false);

  // UI / local storage settings
  const [tableSize, setTableSize] = useLocalStorage('table-size', 'default');
@@ -245,6 +247,22 @@ const SettingsPage = () => {
    }
  };

  const onRehashStreams = async () => {
    setRehashingStreams(true);
    setRehashSuccess(false);
    try {
      await API.post('/core/rehash-streams/');
      setRehashSuccess(true);
      setTimeout(() => setRehashSuccess(false), 5000); // Clear success message after 5 seconds
    } catch (error) {
      console.error('Error rehashing streams:', error);
      // You might want to add error state handling here
    } finally {
      setRehashingStreams(false);
    }
  };

  return (
    <Center
      style={{
@@ -395,12 +413,28 @@ const SettingsPage = () => {
              key={form.key('m3u-hash-key')}
            />

            {rehashSuccess && (
              <Alert
                variant="light"
                color="green"
                title="Rehash task queued successfully"
              />
            )}

            <Flex
              mih={50}
              gap="xs"
              justify="space-between"
              align="flex-end"
            >
              <Button
                onClick={onRehashStreams}
                loading={rehashingStreams}
                variant="outline"
                color="blue"
              >
                Rehash Streams
              </Button>

              <Button
                type="submit"
                disabled={form.submitting}