From 8ec489d26f8504135911980aceb534685ae50737 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Fri, 11 Jul 2025 14:47:12 -0500 Subject: [PATCH] Send websocket updates during rehash. --- core/tasks.py | 55 +++++++++++++++++++++++++++++++++++++- frontend/src/WebSocket.jsx | 37 +++++++++++++++++++++++++ 2 files changed, 91 insertions(+), 1 deletion(-) diff --git a/core/tasks.py b/core/tasks.py index 157ffadc..4ee00926 100644 --- a/core/tasks.py +++ b/core/tasks.py @@ -326,6 +326,20 @@ def rehash_streams(keys): total_records = queryset.count() logger.info(f"Starting rehash of {total_records} streams with keys: {keys}") + # Send initial WebSocket update + send_websocket_update( + 'updates', + 'update', + { + "success": True, + "type": "stream_rehash", + "action": "starting", + "progress": 0, + "total_records": total_records, + "message": f"Starting rehash of {total_records} streams" + } + ) + for start in range(0, total_records, batch_size): batch_processed = 0 batch_duplicates = 0 @@ -398,12 +412,51 @@ def rehash_streams(keys): total_processed += batch_processed duplicates_merged += batch_duplicates - logger.info(f"Rehashed batch {start//batch_size + 1}/{(total_records//batch_size) + 1}: " + # Calculate progress percentage + progress_percent = int((total_processed / total_records) * 100) + current_batch = start // batch_size + 1 + total_batches = (total_records // batch_size) + 1 + + # Send progress update via WebSocket + send_websocket_update( + 'updates', + 'update', + { + "success": True, + "type": "stream_rehash", + "action": "processing", + "progress": progress_percent, + "batch": current_batch, + "total_batches": total_batches, + "processed": total_processed, + "duplicates_merged": duplicates_merged, + "message": f"Processed batch {current_batch}/{total_batches}: {batch_processed} streams, {batch_duplicates} duplicates merged" + } + ) + + logger.info(f"Rehashed batch {current_batch}/{total_batches}: " f"{batch_processed} processed, {batch_duplicates} duplicates merged") logger.info(f"Rehashing complete: {total_processed} streams processed, " f"{duplicates_merged} duplicates merged") + # Send completion update via WebSocket + send_websocket_update( + 'updates', + 'update', + { + "success": True, + "type": "stream_rehash", + "action": "completed", + "progress": 100, + "total_processed": total_processed, + "duplicates_merged": duplicates_merged, + "final_count": total_processed - duplicates_merged, + "message": f"Rehashing complete: {total_processed} streams processed, {duplicates_merged} duplicates merged" + }, + collect_garbage=True # Force garbage collection after completion + ) + return { 'total_processed': total_processed, 'duplicates_merged': duplicates_merged, diff --git a/frontend/src/WebSocket.jsx b/frontend/src/WebSocket.jsx index aeed8826..314f82e9 100644 --- a/frontend/src/WebSocket.jsx +++ b/frontend/src/WebSocket.jsx @@ -372,6 +372,43 @@ export const WebsocketProvider = ({ children }) => { } break; + case 'stream_rehash': + // Handle stream rehash progress updates + if (parsedEvent.data.action === 'starting') { + notifications.show({ + id: 'stream-rehash-progress', // Persistent ID + title: 'Stream Rehash Started', + message: parsedEvent.data.message, + color: 'blue.5', + autoClose: false, // Don't auto-close + withCloseButton: false, // No close button during processing + loading: true, // Show loading indicator + }); + } else if (parsedEvent.data.action === 'processing') { + // Update the existing notification + notifications.update({ + id: 'stream-rehash-progress', + title: 'Stream Rehash in Progress', + message: `${parsedEvent.data.progress}% complete - ${parsedEvent.data.processed} streams processed, ${parsedEvent.data.duplicates_merged} duplicates merged`, + color: 'blue.5', + autoClose: false, + withCloseButton: false, + loading: true, + }); + } else if (parsedEvent.data.action === 'completed') { + // Update to completion state + notifications.update({ + id: 'stream-rehash-progress', + title: 'Stream Rehash Complete', + message: `Processed ${parsedEvent.data.total_processed} streams, merged ${parsedEvent.data.duplicates_merged} duplicates. Final count: ${parsedEvent.data.final_count}`, + color: 'green.5', + autoClose: 8000, // Auto-close after completion + withCloseButton: true, // Allow manual close + loading: false, // Remove loading indicator + }); + } + break; + default: console.error( `Unknown websocket event type: ${parsedEvent.data?.type}`