diff --git a/core/api_urls.py b/core/api_urls.py index e30eb698..30714d44 100644 --- a/core/api_urls.py +++ b/core/api_urls.py @@ -2,7 +2,7 @@ from django.urls import path, include from rest_framework.routers import DefaultRouter -from .api_views import UserAgentViewSet, StreamProfileViewSet, CoreSettingsViewSet, environment, version +from .api_views import UserAgentViewSet, StreamProfileViewSet, CoreSettingsViewSet, environment, version, rehash_streams_endpoint router = DefaultRouter() router.register(r'useragents', UserAgentViewSet, basename='useragent') @@ -12,5 +12,6 @@ router.register(r'settings', CoreSettingsViewSet, basename='settings') urlpatterns = [ path('settings/env/', environment, name='token_refresh'), path('version/', version, name='version'), + path('rehash-streams/', rehash_streams_endpoint, name='rehash_streams'), path('', include(router.urls)), ] diff --git a/core/api_views.py b/core/api_views.py index b416cf92..6b9743f6 100644 --- a/core/api_views.py +++ b/core/api_views.py @@ -280,3 +280,40 @@ def version(request): "timestamp": __timestamp__, } ) + + +@swagger_auto_schema( + method="post", + operation_description="Trigger rehashing of all streams", + responses={200: "Rehash task started"}, +) +@api_view(["POST"]) +@permission_classes([Authenticated]) +def rehash_streams_endpoint(request): + """Trigger the rehash streams task""" + try: + # Get the current hash keys from settings + hash_key_setting = CoreSettings.objects.get(key=STREAM_HASH_KEY) + hash_keys = hash_key_setting.value.split(",") + + # Queue the rehash task + task = rehash_streams.delay(hash_keys) + + return Response({ + "success": True, + "message": "Stream rehashing task has been queued", + "task_id": task.id + }, status=status.HTTP_200_OK) + + except CoreSettings.DoesNotExist: + return Response({ + "success": False, + "message": "Hash key settings not found" + }, status=status.HTTP_400_BAD_REQUEST) + + except Exception as e: + logger.error(f"Error triggering rehash streams: {e}") + return Response({ + "success": False, + "message": "Failed to trigger rehash task" + }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) diff --git a/core/tasks.py b/core/tasks.py index 0fdaedf7..157ffadc 100644 --- a/core/tasks.py +++ b/core/tasks.py @@ -312,32 +312,100 @@ def fetch_channel_stats(): @shared_task def rehash_streams(keys): + """ + Rehash all streams with new hash keys and handle duplicates. 
+ """ batch_size = 1000 queryset = Stream.objects.all() + # Track statistics + total_processed = 0 + duplicates_merged = 0 hash_keys = {} + total_records = queryset.count() + logger.info(f"Starting rehash of {total_records} streams with keys: {keys}") + for start in range(0, total_records, batch_size): + batch_processed = 0 + batch_duplicates = 0 + with transaction.atomic(): batch = queryset[start:start + batch_size] + for obj in batch: - stream_hash = Stream.generate_hash_key(obj.name, obj.url, obj.tvg_id, keys) - if stream_hash in hash_keys: - # Handle duplicate keys and remove any without channels - stream_channels = ChannelStream.objects.filter(stream_id=obj.id).count() - if stream_channels == 0: + # Generate new hash + new_hash = Stream.generate_hash_key(obj.name, obj.url, obj.tvg_id, keys) + + # Check if this hash already exists in our tracking dict or in database + if new_hash in hash_keys: + # Found duplicate in current batch - merge the streams + existing_stream_id = hash_keys[new_hash] + existing_stream = Stream.objects.get(id=existing_stream_id) + + # Move any channel relationships from duplicate to existing stream + ChannelStream.objects.filter(stream_id=obj.id).update(stream_id=existing_stream_id) + + # Update the existing stream with the most recent data + if obj.updated_at > existing_stream.updated_at: + existing_stream.name = obj.name + existing_stream.url = obj.url + existing_stream.logo_url = obj.logo_url + existing_stream.tvg_id = obj.tvg_id + existing_stream.m3u_account = obj.m3u_account + existing_stream.channel_group = obj.channel_group + existing_stream.custom_properties = obj.custom_properties + existing_stream.last_seen = obj.last_seen + existing_stream.updated_at = obj.updated_at + existing_stream.save() + + # Delete the duplicate + obj.delete() + batch_duplicates += 1 + else: + # Check if hash already exists in database (from previous batches or existing data) + existing_stream = Stream.objects.filter(stream_hash=new_hash).exclude(id=obj.id).first() + if existing_stream: + # Found duplicate in database - merge the streams + # Move any channel relationships from duplicate to existing stream + ChannelStream.objects.filter(stream_id=obj.id).update(stream_id=existing_stream.id) + + # Update the existing stream with the most recent data + if obj.updated_at > existing_stream.updated_at: + existing_stream.name = obj.name + existing_stream.url = obj.url + existing_stream.logo_url = obj.logo_url + existing_stream.tvg_id = obj.tvg_id + existing_stream.m3u_account = obj.m3u_account + existing_stream.channel_group = obj.channel_group + existing_stream.custom_properties = obj.custom_properties + existing_stream.last_seen = obj.last_seen + existing_stream.updated_at = obj.updated_at + existing_stream.save() + + # Delete the duplicate obj.delete() - continue + batch_duplicates += 1 + hash_keys[new_hash] = existing_stream.id + else: + # Update hash for this stream + obj.stream_hash = new_hash + obj.save(update_fields=['stream_hash']) + hash_keys[new_hash] = obj.id + batch_processed += 1 - existing_stream_channels = ChannelStream.objects.filter(stream_id=hash_keys[stream_hash]).count() - if existing_stream_channels == 0: - Stream.objects.filter(id=hash_keys[stream_hash]).delete() + total_processed += batch_processed + duplicates_merged += batch_duplicates - obj.stream_hash = stream_hash - obj.save(update_fields=['stream_hash']) - hash_keys[stream_hash] = obj.id + logger.info(f"Rehashed batch {start//batch_size + 1}/{(total_records//batch_size) + 1}: " + f"{batch_processed} 
 
-        logger.debug(f"Re-hashed {batch_size} streams")
+    logger.info(f"Rehashing complete: {total_processed} streams processed, "
+                f"{duplicates_merged} duplicates merged")
 
-    logger.debug(f"Re-hashing complete")
+
+    return {
+        'total_processed': total_processed,
+        'duplicates_merged': duplicates_merged,
+        'final_count': total_processed - duplicates_merged
+    }
diff --git a/frontend/src/pages/Settings.jsx b/frontend/src/pages/Settings.jsx
index a5b07fa2..865ac6c7 100644
--- a/frontend/src/pages/Settings.jsx
+++ b/frontend/src/pages/Settings.jsx
@@ -47,6 +47,8 @@ const SettingsPage = () => {
     useState([]);
   const [proxySettingsSaved, setProxySettingsSaved] = useState(false);
+  const [rehashingStreams, setRehashingStreams] = useState(false);
+  const [rehashSuccess, setRehashSuccess] = useState(false);
 
   // UI / local storage settings
   const [tableSize, setTableSize] = useLocalStorage('table-size', 'default');
@@ -245,6 +247,22 @@ const SettingsPage = () => {
     }
   };
 
+  const onRehashStreams = async () => {
+    setRehashingStreams(true);
+    setRehashSuccess(false);
+
+    try {
+      await API.post('/core/rehash-streams/');
+      setRehashSuccess(true);
+      setTimeout(() => setRehashSuccess(false), 5000); // Clear success message after 5 seconds
+    } catch (error) {
+      console.error('Error rehashing streams:', error);
+      // TODO: surface an error notification to the user
+    } finally {
+      setRehashingStreams(false);
+    }
+  };
+
   return (
[final Settings.jsx hunk: the hunk header and most of the added JSX markup were lost in extraction; the surviving lines are below]
       key={form.key('m3u-hash-key')}
     />
+    {rehashSuccess && (
+      [success-alert JSX lost in extraction]
+    )}
+    [remaining JSX lost in extraction - presumably the control that triggers onRehashStreams]
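Since the markup for that final hunk did not survive extraction, here is a minimal sketch of what the added block plausibly looks like, assuming Mantine's Alert and Button components; the component names, props, wrapper component, and copy are assumptions for illustration, not the PR's actual code:

import { Alert, Button } from '@mantine/core';

// Hypothetical reconstruction of the markup added after the m3u-hash-key field:
// a transient success alert plus a button wired to the onRehashStreams handler.
function RehashControls({ rehashSuccess, rehashingStreams, onRehashStreams }) {
  return (
    <>
      {rehashSuccess && (
        <Alert color="green" title="Rehash queued">
          Stream rehashing has been queued and will run in the background.
        </Alert>
      )}
      <Button onClick={onRehashStreams} loading={rehashingStreams} disabled={rehashingStreams}>
        Rehash Streams
      </Button>
    </>
  );
}

In the actual Settings page these elements would sit inline in the existing form rather than in a separate component; the sketch only illustrates the intended behavior, namely a five-second success notice driven by rehashSuccess and a trigger that is disabled while rehashingStreams is true.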