From cf7ea093415a05da7b6fae84ccb2d9f1c4cbc4c6 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Sat, 13 Sep 2025 19:59:43 -0500 Subject: [PATCH] Implement asynchronous bulk channel creation from stream IDs with WebSocket progress updates --- apps/channels/api_views.py | 61 +++- apps/channels/tasks.py | 272 ++++++++++++++++++ frontend/src/WebSocket.jsx | 42 +++ frontend/src/api.js | 25 ++ .../src/components/tables/StreamsTable.jsx | 193 +++++++------ 5 files changed, 499 insertions(+), 94 deletions(-) diff --git a/apps/channels/api_views.py b/apps/channels/api_views.py index efc9073a..d965d385 100644 --- a/apps/channels/api_views.py +++ b/apps/channels/api_views.py @@ -202,7 +202,7 @@ class StreamViewSet(viewsets.ModelViewSet): {"error": "ids must be a list of integers"}, status=status.HTTP_400_BAD_REQUEST, ) - + streams = Stream.objects.filter(id__in=ids) serializer = self.get_serializer(streams, many=True) return Response(serializer.data) @@ -958,6 +958,65 @@ class ChannelViewSet(viewsets.ModelViewSet): return Response(response_data, status=status.HTTP_201_CREATED) + @swagger_auto_schema( + method="post", + operation_description=( + "Asynchronously bulk create channels from stream IDs. " + "Returns a task ID to track progress via WebSocket. " + "This is the recommended approach for large bulk operations." + ), + request_body=openapi.Schema( + type=openapi.TYPE_OBJECT, + required=["stream_ids"], + properties={ + "stream_ids": openapi.Schema( + type=openapi.TYPE_ARRAY, + items=openapi.Items(type=openapi.TYPE_INTEGER), + description="List of stream IDs to create channels from" + ), + "channel_profile_ids": openapi.Schema( + type=openapi.TYPE_ARRAY, + items=openapi.Items(type=openapi.TYPE_INTEGER), + description="(Optional) Channel profile ID(s) to add the channels to. If not provided, channels are added to all profiles." + ), + }, + ), + responses={202: "Task started successfully"}, + ) + @action(detail=False, methods=["post"], url_path="from-stream/bulk-async") + def from_stream_bulk_async(self, request): + from .tasks import bulk_create_channels_from_streams + + stream_ids = request.data.get("stream_ids", []) + channel_profile_ids = request.data.get("channel_profile_ids") + + if not stream_ids: + return Response( + {"error": "stream_ids is required and cannot be empty"}, + status=status.HTTP_400_BAD_REQUEST, + ) + + if not isinstance(stream_ids, list): + return Response( + {"error": "stream_ids must be a list of integers"}, + status=status.HTTP_400_BAD_REQUEST, + ) + + # Normalize channel_profile_ids to array if single ID provided + if channel_profile_ids is not None: + if not isinstance(channel_profile_ids, list): + channel_profile_ids = [channel_profile_ids] + + # Start the async task + task = bulk_create_channels_from_streams.delay(stream_ids, channel_profile_ids) + + return Response({ + "task_id": task.id, + "message": f"Bulk channel creation task started for {len(stream_ids)} streams", + "stream_count": len(stream_ids), + "status": "started" + }, status=status.HTTP_202_ACCEPTED) + # ───────────────────────────────────────────────────────── # 6) EPG Fuzzy Matching # ───────────────────────────────────────────────────────── diff --git a/apps/channels/tasks.py b/apps/channels/tasks.py index 5135ddaf..612ee27f 100755 --- a/apps/channels/tasks.py +++ b/apps/channels/tasks.py @@ -1597,3 +1597,275 @@ def prefetch_recording_artwork(recording_id): except Exception as e: logger.debug(f"prefetch_recording_artwork failed: {e}") return f"error: {e}" + + +@shared_task(bind=True) +def bulk_create_channels_from_streams(self, stream_ids, channel_profile_ids=None): + """ + Asynchronously create channels from a list of stream IDs. + Provides progress updates via WebSocket. + """ + from apps.channels.models import Stream, Channel, ChannelGroup, ChannelProfile, ChannelProfileMembership, Logo + from apps.epg.models import EPGData + from django.db import transaction + from django.shortcuts import get_object_or_404 + from core.utils import send_websocket_update + + task_id = self.request.id + total_streams = len(stream_ids) + created_channels = [] + errors = [] + + try: + # Send initial progress update + send_websocket_update('updates', 'update', { + 'type': 'bulk_channel_creation_progress', + 'task_id': task_id, + 'progress': 0, + 'total': total_streams, + 'status': 'starting', + 'message': f'Starting bulk creation of {total_streams} channels...' + }) + + # Gather current used numbers once + used_numbers = set(Channel.objects.all().values_list("channel_number", flat=True)) + next_number = 1 + + def get_auto_number(): + nonlocal next_number + while next_number in used_numbers: + next_number += 1 + used_numbers.add(next_number) + return next_number + + logos_to_create = [] + channels_to_create = [] + streams_map = [] + logo_map = [] + profile_map = [] + + # Process streams in batches to avoid memory issues + batch_size = 100 + processed = 0 + + for i in range(0, total_streams, batch_size): + batch_stream_ids = stream_ids[i:i + batch_size] + batch_streams = Stream.objects.filter(id__in=batch_stream_ids) + + # Send progress update + send_websocket_update('updates', 'update', { + 'type': 'bulk_channel_creation_progress', + 'task_id': task_id, + 'progress': processed, + 'total': total_streams, + 'status': 'processing', + 'message': f'Processing streams {processed + 1}-{min(processed + batch_size, total_streams)} of {total_streams}...' + }) + + for stream in batch_streams: + try: + name = stream.name + channel_group = stream.channel_group + stream_custom_props = stream.custom_properties or {} + + # Determine channel number + channel_number = None + if "tvg-chno" in stream_custom_props: + channel_number = float(stream_custom_props["tvg-chno"]) + elif "channel-number" in stream_custom_props: + channel_number = float(stream_custom_props["channel-number"]) + elif "num" in stream_custom_props: + channel_number = float(stream_custom_props["num"]) + + # Get TVC guide station ID + tvc_guide_stationid = None + if "tvc-guide-stationid" in stream_custom_props: + tvc_guide_stationid = stream_custom_props["tvc-guide-stationid"] + + if channel_number is None: + channel_number = get_auto_number() + elif ( + channel_number in used_numbers + or Channel.objects.filter(channel_number=channel_number).exists() + ): + channel_number = get_auto_number() + else: + used_numbers.add(channel_number) + + channel_data = { + "channel_number": channel_number, + "name": name, + "tvc_guide_stationid": tvc_guide_stationid, + "tvg_id": stream.tvg_id, + } + + # Only add channel_group_id if the stream has a channel group + if channel_group: + channel_data["channel_group_id"] = channel_group.id + + # Attempt to find existing EPGs with the same tvg-id + epgs = EPGData.objects.filter(tvg_id=stream.tvg_id) + if epgs: + channel_data["epg_data_id"] = epgs.first().id + + channel = Channel(**channel_data) + channels_to_create.append(channel) + streams_map.append([stream.id]) + + # Store profile IDs for this channel + profile_map.append(channel_profile_ids) + + # Handle logo + if stream.logo_url: + logos_to_create.append( + Logo( + url=stream.logo_url, + name=stream.name or stream.tvg_id, + ) + ) + logo_map.append(stream.logo_url) + else: + logo_map.append(None) + + processed += 1 + + except Exception as e: + errors.append({ + 'stream_id': stream.id if 'stream' in locals() else 'unknown', + 'error': str(e) + }) + processed += 1 + + # Create logos first + if logos_to_create: + send_websocket_update('updates', 'update', { + 'type': 'bulk_channel_creation_progress', + 'task_id': task_id, + 'progress': processed, + 'total': total_streams, + 'status': 'creating_logos', + 'message': f'Creating {len(logos_to_create)} logos...' + }) + Logo.objects.bulk_create(logos_to_create, ignore_conflicts=True) + + # Get logo objects for association + channel_logos = { + logo.url: logo + for logo in Logo.objects.filter( + url__in=[url for url in logo_map if url is not None] + ) + } + + # Create channels in database + if channels_to_create: + send_websocket_update('updates', 'update', { + 'type': 'bulk_channel_creation_progress', + 'task_id': task_id, + 'progress': processed, + 'total': total_streams, + 'status': 'creating_channels', + 'message': f'Creating {len(channels_to_create)} channels in database...' + }) + + with transaction.atomic(): + created_channels = Channel.objects.bulk_create(channels_to_create) + + # Update channels with logos and create stream associations + update = [] + channel_stream_associations = [] + channel_profile_memberships = [] + + for channel, stream_ids, logo_url, profile_ids in zip( + created_channels, streams_map, logo_map, profile_map + ): + if logo_url: + channel.logo = channel_logos[logo_url] + update.append(channel) + + # Create stream associations + for stream_id in stream_ids: + from apps.channels.models import ChannelStream + channel_stream_associations.append( + ChannelStream(channel=channel, stream_id=stream_id, order=0) + ) + + # Handle channel profile membership + if profile_ids: + try: + specific_profiles = ChannelProfile.objects.filter(id__in=profile_ids) + channel_profile_memberships.extend([ + ChannelProfileMembership( + channel_profile=profile, + channel=channel, + enabled=True + ) + for profile in specific_profiles + ]) + except Exception as e: + errors.append({ + 'channel_id': channel.id, + 'error': f'Failed to add to profiles: {str(e)}' + }) + else: + # Add to all profiles by default + all_profiles = ChannelProfile.objects.all() + channel_profile_memberships.extend([ + ChannelProfileMembership( + channel_profile=profile, + channel=channel, + enabled=True + ) + for profile in all_profiles + ]) + + # Bulk update channels with logos + if update: + Channel.objects.bulk_update(update, ["logo"]) + + # Bulk create channel-stream associations + if channel_stream_associations: + from apps.channels.models import ChannelStream + ChannelStream.objects.bulk_create(channel_stream_associations, ignore_conflicts=True) + + # Bulk create profile memberships + if channel_profile_memberships: + ChannelProfileMembership.objects.bulk_create(channel_profile_memberships, ignore_conflicts=True) + + # Send completion update + send_websocket_update('updates', 'update', { + 'type': 'bulk_channel_creation_progress', + 'task_id': task_id, + 'progress': total_streams, + 'total': total_streams, + 'status': 'completed', + 'message': f'Successfully created {len(created_channels)} channels', + 'created_count': len(created_channels), + 'error_count': len(errors), + 'errors': errors[:10] # Send first 10 errors only + }) + + # Send general channel update notification + send_websocket_update('updates', 'update', { + 'type': 'channels_created', + 'count': len(created_channels) + }) + + return { + 'status': 'completed', + 'created_count': len(created_channels), + 'error_count': len(errors), + 'errors': errors + } + + except Exception as e: + logger.error(f"Bulk channel creation failed: {e}") + send_websocket_update('updates', 'update', { + 'type': 'bulk_channel_creation_progress', + 'task_id': task_id, + 'progress': 0, + 'total': total_streams, + 'status': 'failed', + 'message': f'Task failed: {str(e)}', + 'error': str(e) + }) + raise diff --git a/frontend/src/WebSocket.jsx b/frontend/src/WebSocket.jsx index 819a923f..ef0a9cfc 100644 --- a/frontend/src/WebSocket.jsx +++ b/frontend/src/WebSocket.jsx @@ -549,6 +549,48 @@ export const WebsocketProvider = ({ children }) => { }); break; + case 'channels_created': + // General notification for channel creation + notifications.show({ + title: 'Channels Created', + message: `Successfully created ${parsedEvent.data.count || 'multiple'} channels`, + color: 'green', + autoClose: 4000, + }); + + // Refresh the channels table to show new channels + try { + await useChannelsStore.getState().fetchChannels(); + console.log('Channels refreshed after bulk creation'); + } catch (error) { + console.error( + 'Error refreshing channels after creation:', + error + ); + } + + break; + + case 'bulk_channel_creation_progress': + // Handle completion status globally to refresh channels + if (parsedEvent.data.status === 'completed') { + try { + await API.requeryChannels(); + console.log( + 'Channels refreshed after bulk creation completion' + ); + } catch (error) { + console.error( + 'Error refreshing channels after bulk completion:', + error + ); + } + } + + // Pass through to individual components for progress updates + setVal(parsedEvent); + break; + default: console.error( `Unknown websocket event type: ${parsedEvent.data?.type}` diff --git a/frontend/src/api.js b/frontend/src/api.js index b3cf5205..29d018bd 100644 --- a/frontend/src/api.js +++ b/frontend/src/api.js @@ -570,6 +570,31 @@ export default class API { } } + static async createChannelsFromStreamsAsync(streamIds, channelProfileIds = null) { + try { + const requestBody = { + stream_ids: streamIds, + }; + + if (channelProfileIds !== null) { + requestBody.channel_profile_ids = channelProfileIds; + } + + const response = await request( + `${host}/api/channels/channels/from-stream/bulk-async/`, + { + method: 'POST', + body: requestBody, + } + ); + + return response; + } catch (e) { + errorNotification('Failed to start bulk channel creation task', e); + throw e; + } + } + static async getStreams(ids = null) { try { const params = new URLSearchParams(); diff --git a/frontend/src/components/tables/StreamsTable.jsx b/frontend/src/components/tables/StreamsTable.jsx index a880f39b..7424a656 100644 --- a/frontend/src/components/tables/StreamsTable.jsx +++ b/frontend/src/components/tables/StreamsTable.jsx @@ -1,10 +1,17 @@ -import React, { useEffect, useMemo, useCallback, useState } from 'react'; +import React, { + useEffect, + useMemo, + useCallback, + useState, + useContext, +} from 'react'; import API from '../../api'; import StreamForm from '../forms/Stream'; import usePlaylistsStore from '../../store/playlists'; import useChannelsStore from '../../store/channels'; import useLogosStore from '../../store/logos'; import { copyToClipboard, useDebounce } from '../../utils'; +import { WebsocketContext } from '../../WebSocket'; import { SquarePlus, ListPlus, @@ -198,6 +205,10 @@ const StreamsTable = () => { const [selectedStreamIds, setSelectedStreamIds] = useState([]); const [isCreatingChannels, setIsCreatingChannels] = useState(false); const [creationProgress, setCreationProgress] = useState(''); + const [activeTaskId, setActiveTaskId] = useState(null); + + // WebSocket context for real-time updates + const [, , wsValue] = useContext(WebsocketContext); // const [allRowsSelected, setAllRowsSelected] = useState(false); // Add local storage for page size @@ -421,104 +432,38 @@ const StreamsTable = () => { ]); // Bulk creation: create channels from selected streams in one API call + // Bulk creation: create channels from selected streams asynchronously const createChannelsFromStreams = async () => { - setIsLoading(true); + if (selectedStreamIds.length === 0) return; + setIsCreatingChannels(true); - setCreationProgress('Preparing channel creation...'); - + setCreationProgress('Starting bulk channel creation...'); + try { const selectedChannelProfileId = useChannelsStore.getState().selectedProfileId; - // For very large selections (>1000), process in smaller batches - const BATCH_SIZE = 1000; - const shouldBatch = selectedStreamIds.length > BATCH_SIZE; - - if (shouldBatch) { - console.log(`Processing ${selectedStreamIds.length} streams in batches of ${BATCH_SIZE}`); - - for (let i = 0; i < selectedStreamIds.length; i += BATCH_SIZE) { - const batchIds = selectedStreamIds.slice(i, i + BATCH_SIZE); - const batchNumber = Math.floor(i / BATCH_SIZE) + 1; - const totalBatches = Math.ceil(selectedStreamIds.length / BATCH_SIZE); - - setCreationProgress(`Processing batch ${batchNumber}/${totalBatches} (${batchIds.length} streams)...`); - console.log(`Processing batch ${batchNumber}/${totalBatches}`); - - // Try to fetch the actual stream data for this batch - let streamsData = []; - try { - streamsData = await API.getStreamsByIds(batchIds); - console.log(`Successfully fetched ${streamsData.length} streams for batch`); - } catch (error) { - console.warn('Could not fetch stream details for batch, using IDs only:', error); - } + // Use the async API for all bulk operations + const response = await API.createChannelsFromStreamsAsync( + selectedStreamIds, + selectedChannelProfileId !== '0' ? [selectedChannelProfileId] : null + ); - const streamData = batchIds.map((streamId) => { - const stream = streamsData.find((s) => s.id === streamId); - return { - stream_id: streamId, - name: stream?.name || `Stream ${streamId}`, - ...(selectedChannelProfileId !== '0' && { - channel_profile_ids: selectedChannelProfileId, - }), - }; - }); + setActiveTaskId(response.task_id); + setCreationProgress( + `Task started for ${response.stream_count} streams. Processing in background...` + ); - await API.createChannelsFromStreams(streamData); - } - } else { - // Process all at once for smaller selections - setCreationProgress(`Fetching stream data for ${selectedStreamIds.length} streams...`); - - let streamsData = []; - try { - streamsData = await API.getStreamsByIds(selectedStreamIds); - console.log(`Successfully fetched ${streamsData.length} streams`); - } catch (error) { - console.warn('Could not fetch stream details, using IDs only:', error); - // If fetching streams fails, we'll create channels with fallback names - // This happens when there are too many streams or network issues - } - - setCreationProgress(`Creating ${selectedStreamIds.length} channels...`); - - const streamData = selectedStreamIds.map((streamId) => { - const stream = streamsData.find((s) => s.id === streamId); - return { - stream_id: streamId, - name: stream?.name || `Stream ${streamId}`, - ...(selectedChannelProfileId !== '0' && { - channel_profile_ids: selectedChannelProfileId, - }), - }; - }); - - console.log(`Creating ${streamData.length} channels from selected streams`); - await API.createChannelsFromStreams(streamData); - } - - setCreationProgress('Refreshing channels...'); - await API.requeryChannels(); - - // Refresh channel profiles to update the membership information - await useChannelsStore.getState().fetchChannelProfiles(); - - fetchLogos(); - - // Clear selection and refresh data - setSelectedStreamIds([]); - await fetchData(); - - setCreationProgress(''); + console.log( + `Bulk creation task started: ${response.task_id} for ${response.stream_count} streams` + ); + console.log('Active task ID set to:', response.task_id); } catch (error) { - console.error('Error creating channels from streams:', error); - setCreationProgress('Error occurred during channel creation'); - } finally { - setIsLoading(false); + console.error('Error starting bulk channel creation:', error); + setCreationProgress('Error starting bulk channel creation'); setIsCreatingChannels(false); - // Clear progress after a short delay to let user see completion/error - setTimeout(() => setCreationProgress(''), 3000); + // Clear error message after delay + setTimeout(() => setCreationProgress(''), 5000); } }; @@ -744,6 +689,69 @@ const StreamsTable = () => { fetchData(); }, [fetchData]); + // Listen for WebSocket updates for bulk creation progress + useEffect(() => { + if (wsValue) { + console.log('WebSocket message received:', wsValue); + + if ( + wsValue.data && + wsValue.data.type === 'bulk_channel_creation_progress' + ) { + const data = wsValue.data; + console.log('Bulk creation progress update:', data); + + // Only handle progress for our active task + if (activeTaskId && data.task_id === activeTaskId) { + const progress = data.progress || 0; + const total = data.total || 0; + const status = data.status; + const message = data.message; + + console.log( + `Task ${activeTaskId} progress: ${progress}/${total} (${status})` + ); + + if (status === 'completed') { + setCreationProgress( + `✅ Completed! Created ${data.created_count} channels` + ); + setIsCreatingChannels(false); + setActiveTaskId(null); + + // Clear selection and refresh data + setSelectedStreamIds([]); + fetchData(); + // Note: API.requeryChannels() is called by WebSocket handler globally + + // Clear success message after delay + setTimeout(() => setCreationProgress(''), 5000); + } else if (status === 'failed') { + setCreationProgress( + `❌ Task failed: ${data.error || 'Unknown error'}` + ); + setIsCreatingChannels(false); + setActiveTaskId(null); + + // Clear error message after longer delay + setTimeout(() => setCreationProgress(''), 10000); + } else { + // Update progress + const progressPercent = + total > 0 ? Math.round((progress / total) * 100) : 0; + setCreationProgress( + `${message} (${progress}/${total} - ${progressPercent}%)` + ); + } + } else { + console.log( + `Ignoring progress for task ${data.task_id}, active task is ${activeTaskId}` + ); + } + } + } + }, [wsValue, activeTaskId, fetchData]); + return ( <> { disabled={selectedStreamIds.length == 0 || isCreatingChannels} loading={isCreatingChannels} > - {isCreatingChannels - ? (creationProgress || 'Creating Channels...') - : `Create Channels (${selectedStreamIds.length})` - } + {isCreatingChannels + ? creationProgress || 'Creating Channels...' + : `Create Channels (${selectedStreamIds.length})`}