Implement asynchronous bulk channel creation from stream IDs with WebSocket progress updates

This commit is contained in:
SergeantPanda 2025-09-13 19:59:43 -05:00
parent 5875c31750
commit cf7ea09341
5 changed files with 499 additions and 94 deletions

View file

@ -202,7 +202,7 @@ class StreamViewSet(viewsets.ModelViewSet):
{"error": "ids must be a list of integers"},
status=status.HTTP_400_BAD_REQUEST,
)
streams = Stream.objects.filter(id__in=ids)
serializer = self.get_serializer(streams, many=True)
return Response(serializer.data)
@ -958,6 +958,65 @@ class ChannelViewSet(viewsets.ModelViewSet):
return Response(response_data, status=status.HTTP_201_CREATED)
@swagger_auto_schema(
method="post",
operation_description=(
"Asynchronously bulk create channels from stream IDs. "
"Returns a task ID to track progress via WebSocket. "
"This is the recommended approach for large bulk operations."
),
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
required=["stream_ids"],
properties={
"stream_ids": openapi.Schema(
type=openapi.TYPE_ARRAY,
items=openapi.Items(type=openapi.TYPE_INTEGER),
description="List of stream IDs to create channels from"
),
"channel_profile_ids": openapi.Schema(
type=openapi.TYPE_ARRAY,
items=openapi.Items(type=openapi.TYPE_INTEGER),
description="(Optional) Channel profile ID(s) to add the channels to. If not provided, channels are added to all profiles."
),
},
),
responses={202: "Task started successfully"},
)
@action(detail=False, methods=["post"], url_path="from-stream/bulk-async")
def from_stream_bulk_async(self, request):
from .tasks import bulk_create_channels_from_streams
stream_ids = request.data.get("stream_ids", [])
channel_profile_ids = request.data.get("channel_profile_ids")
if not stream_ids:
return Response(
{"error": "stream_ids is required and cannot be empty"},
status=status.HTTP_400_BAD_REQUEST,
)
if not isinstance(stream_ids, list):
return Response(
{"error": "stream_ids must be a list of integers"},
status=status.HTTP_400_BAD_REQUEST,
)
# Normalize channel_profile_ids to array if single ID provided
if channel_profile_ids is not None:
if not isinstance(channel_profile_ids, list):
channel_profile_ids = [channel_profile_ids]
# Start the async task
task = bulk_create_channels_from_streams.delay(stream_ids, channel_profile_ids)
return Response({
"task_id": task.id,
"message": f"Bulk channel creation task started for {len(stream_ids)} streams",
"stream_count": len(stream_ids),
"status": "started"
}, status=status.HTTP_202_ACCEPTED)
# ─────────────────────────────────────────────────────────
# 6) EPG Fuzzy Matching
# ─────────────────────────────────────────────────────────

View file

@ -1597,3 +1597,275 @@ def prefetch_recording_artwork(recording_id):
except Exception as e:
logger.debug(f"prefetch_recording_artwork failed: {e}")
return f"error: {e}"
@shared_task(bind=True)
def bulk_create_channels_from_streams(self, stream_ids, channel_profile_ids=None):
"""
Asynchronously create channels from a list of stream IDs.
Provides progress updates via WebSocket.
"""
from apps.channels.models import Stream, Channel, ChannelGroup, ChannelProfile, ChannelProfileMembership, Logo
from apps.epg.models import EPGData
from django.db import transaction
from django.shortcuts import get_object_or_404
from core.utils import send_websocket_update
task_id = self.request.id
total_streams = len(stream_ids)
created_channels = []
errors = []
try:
# Send initial progress update
send_websocket_update('updates', 'update', {
'type': 'bulk_channel_creation_progress',
'task_id': task_id,
'progress': 0,
'total': total_streams,
'status': 'starting',
'message': f'Starting bulk creation of {total_streams} channels...'
})
# Gather current used numbers once
used_numbers = set(Channel.objects.all().values_list("channel_number", flat=True))
next_number = 1
def get_auto_number():
nonlocal next_number
while next_number in used_numbers:
next_number += 1
used_numbers.add(next_number)
return next_number
logos_to_create = []
channels_to_create = []
streams_map = []
logo_map = []
profile_map = []
# Process streams in batches to avoid memory issues
batch_size = 100
processed = 0
for i in range(0, total_streams, batch_size):
batch_stream_ids = stream_ids[i:i + batch_size]
batch_streams = Stream.objects.filter(id__in=batch_stream_ids)
# Send progress update
send_websocket_update('updates', 'update', {
'type': 'bulk_channel_creation_progress',
'task_id': task_id,
'progress': processed,
'total': total_streams,
'status': 'processing',
'message': f'Processing streams {processed + 1}-{min(processed + batch_size, total_streams)} of {total_streams}...'
})
for stream in batch_streams:
try:
name = stream.name
channel_group = stream.channel_group
stream_custom_props = stream.custom_properties or {}
# Determine channel number
channel_number = None
if "tvg-chno" in stream_custom_props:
channel_number = float(stream_custom_props["tvg-chno"])
elif "channel-number" in stream_custom_props:
channel_number = float(stream_custom_props["channel-number"])
elif "num" in stream_custom_props:
channel_number = float(stream_custom_props["num"])
# Get TVC guide station ID
tvc_guide_stationid = None
if "tvc-guide-stationid" in stream_custom_props:
tvc_guide_stationid = stream_custom_props["tvc-guide-stationid"]
if channel_number is None:
channel_number = get_auto_number()
elif (
channel_number in used_numbers
or Channel.objects.filter(channel_number=channel_number).exists()
):
channel_number = get_auto_number()
else:
used_numbers.add(channel_number)
channel_data = {
"channel_number": channel_number,
"name": name,
"tvc_guide_stationid": tvc_guide_stationid,
"tvg_id": stream.tvg_id,
}
# Only add channel_group_id if the stream has a channel group
if channel_group:
channel_data["channel_group_id"] = channel_group.id
# Attempt to find existing EPGs with the same tvg-id
epgs = EPGData.objects.filter(tvg_id=stream.tvg_id)
if epgs:
channel_data["epg_data_id"] = epgs.first().id
channel = Channel(**channel_data)
channels_to_create.append(channel)
streams_map.append([stream.id])
# Store profile IDs for this channel
profile_map.append(channel_profile_ids)
# Handle logo
if stream.logo_url:
logos_to_create.append(
Logo(
url=stream.logo_url,
name=stream.name or stream.tvg_id,
)
)
logo_map.append(stream.logo_url)
else:
logo_map.append(None)
processed += 1
except Exception as e:
errors.append({
'stream_id': stream.id if 'stream' in locals() else 'unknown',
'error': str(e)
})
processed += 1
# Create logos first
if logos_to_create:
send_websocket_update('updates', 'update', {
'type': 'bulk_channel_creation_progress',
'task_id': task_id,
'progress': processed,
'total': total_streams,
'status': 'creating_logos',
'message': f'Creating {len(logos_to_create)} logos...'
})
Logo.objects.bulk_create(logos_to_create, ignore_conflicts=True)
# Get logo objects for association
channel_logos = {
logo.url: logo
for logo in Logo.objects.filter(
url__in=[url for url in logo_map if url is not None]
)
}
# Create channels in database
if channels_to_create:
send_websocket_update('updates', 'update', {
'type': 'bulk_channel_creation_progress',
'task_id': task_id,
'progress': processed,
'total': total_streams,
'status': 'creating_channels',
'message': f'Creating {len(channels_to_create)} channels in database...'
})
with transaction.atomic():
created_channels = Channel.objects.bulk_create(channels_to_create)
# Update channels with logos and create stream associations
update = []
channel_stream_associations = []
channel_profile_memberships = []
for channel, stream_ids, logo_url, profile_ids in zip(
created_channels, streams_map, logo_map, profile_map
):
if logo_url:
channel.logo = channel_logos[logo_url]
update.append(channel)
# Create stream associations
for stream_id in stream_ids:
from apps.channels.models import ChannelStream
channel_stream_associations.append(
ChannelStream(channel=channel, stream_id=stream_id, order=0)
)
# Handle channel profile membership
if profile_ids:
try:
specific_profiles = ChannelProfile.objects.filter(id__in=profile_ids)
channel_profile_memberships.extend([
ChannelProfileMembership(
channel_profile=profile,
channel=channel,
enabled=True
)
for profile in specific_profiles
])
except Exception as e:
errors.append({
'channel_id': channel.id,
'error': f'Failed to add to profiles: {str(e)}'
})
else:
# Add to all profiles by default
all_profiles = ChannelProfile.objects.all()
channel_profile_memberships.extend([
ChannelProfileMembership(
channel_profile=profile,
channel=channel,
enabled=True
)
for profile in all_profiles
])
# Bulk update channels with logos
if update:
Channel.objects.bulk_update(update, ["logo"])
# Bulk create channel-stream associations
if channel_stream_associations:
from apps.channels.models import ChannelStream
ChannelStream.objects.bulk_create(channel_stream_associations, ignore_conflicts=True)
# Bulk create profile memberships
if channel_profile_memberships:
ChannelProfileMembership.objects.bulk_create(channel_profile_memberships, ignore_conflicts=True)
# Send completion update
send_websocket_update('updates', 'update', {
'type': 'bulk_channel_creation_progress',
'task_id': task_id,
'progress': total_streams,
'total': total_streams,
'status': 'completed',
'message': f'Successfully created {len(created_channels)} channels',
'created_count': len(created_channels),
'error_count': len(errors),
'errors': errors[:10] # Send first 10 errors only
})
# Send general channel update notification
send_websocket_update('updates', 'update', {
'type': 'channels_created',
'count': len(created_channels)
})
return {
'status': 'completed',
'created_count': len(created_channels),
'error_count': len(errors),
'errors': errors
}
except Exception as e:
logger.error(f"Bulk channel creation failed: {e}")
send_websocket_update('updates', 'update', {
'type': 'bulk_channel_creation_progress',
'task_id': task_id,
'progress': 0,
'total': total_streams,
'status': 'failed',
'message': f'Task failed: {str(e)}',
'error': str(e)
})
raise

View file

@ -549,6 +549,48 @@ export const WebsocketProvider = ({ children }) => {
});
break;
case 'channels_created':
// General notification for channel creation
notifications.show({
title: 'Channels Created',
message: `Successfully created ${parsedEvent.data.count || 'multiple'} channels`,
color: 'green',
autoClose: 4000,
});
// Refresh the channels table to show new channels
try {
await useChannelsStore.getState().fetchChannels();
console.log('Channels refreshed after bulk creation');
} catch (error) {
console.error(
'Error refreshing channels after creation:',
error
);
}
break;
case 'bulk_channel_creation_progress':
// Handle completion status globally to refresh channels
if (parsedEvent.data.status === 'completed') {
try {
await API.requeryChannels();
console.log(
'Channels refreshed after bulk creation completion'
);
} catch (error) {
console.error(
'Error refreshing channels after bulk completion:',
error
);
}
}
// Pass through to individual components for progress updates
setVal(parsedEvent);
break;
default:
console.error(
`Unknown websocket event type: ${parsedEvent.data?.type}`

View file

@ -570,6 +570,31 @@ export default class API {
}
}
static async createChannelsFromStreamsAsync(streamIds, channelProfileIds = null) {
try {
const requestBody = {
stream_ids: streamIds,
};
if (channelProfileIds !== null) {
requestBody.channel_profile_ids = channelProfileIds;
}
const response = await request(
`${host}/api/channels/channels/from-stream/bulk-async/`,
{
method: 'POST',
body: requestBody,
}
);
return response;
} catch (e) {
errorNotification('Failed to start bulk channel creation task', e);
throw e;
}
}
static async getStreams(ids = null) {
try {
const params = new URLSearchParams();

View file

@ -1,10 +1,17 @@
import React, { useEffect, useMemo, useCallback, useState } from 'react';
import React, {
useEffect,
useMemo,
useCallback,
useState,
useContext,
} from 'react';
import API from '../../api';
import StreamForm from '../forms/Stream';
import usePlaylistsStore from '../../store/playlists';
import useChannelsStore from '../../store/channels';
import useLogosStore from '../../store/logos';
import { copyToClipboard, useDebounce } from '../../utils';
import { WebsocketContext } from '../../WebSocket';
import {
SquarePlus,
ListPlus,
@ -198,6 +205,10 @@ const StreamsTable = () => {
const [selectedStreamIds, setSelectedStreamIds] = useState([]);
const [isCreatingChannels, setIsCreatingChannels] = useState(false);
const [creationProgress, setCreationProgress] = useState('');
const [activeTaskId, setActiveTaskId] = useState(null);
// WebSocket context for real-time updates
const [, , wsValue] = useContext(WebsocketContext);
// const [allRowsSelected, setAllRowsSelected] = useState(false);
// Add local storage for page size
@ -421,104 +432,38 @@ const StreamsTable = () => {
]);
// Bulk creation: create channels from selected streams in one API call
// Bulk creation: create channels from selected streams asynchronously
const createChannelsFromStreams = async () => {
setIsLoading(true);
if (selectedStreamIds.length === 0) return;
setIsCreatingChannels(true);
setCreationProgress('Preparing channel creation...');
setCreationProgress('Starting bulk channel creation...');
try {
const selectedChannelProfileId =
useChannelsStore.getState().selectedProfileId;
// For very large selections (>1000), process in smaller batches
const BATCH_SIZE = 1000;
const shouldBatch = selectedStreamIds.length > BATCH_SIZE;
if (shouldBatch) {
console.log(`Processing ${selectedStreamIds.length} streams in batches of ${BATCH_SIZE}`);
for (let i = 0; i < selectedStreamIds.length; i += BATCH_SIZE) {
const batchIds = selectedStreamIds.slice(i, i + BATCH_SIZE);
const batchNumber = Math.floor(i / BATCH_SIZE) + 1;
const totalBatches = Math.ceil(selectedStreamIds.length / BATCH_SIZE);
setCreationProgress(`Processing batch ${batchNumber}/${totalBatches} (${batchIds.length} streams)...`);
console.log(`Processing batch ${batchNumber}/${totalBatches}`);
// Try to fetch the actual stream data for this batch
let streamsData = [];
try {
streamsData = await API.getStreamsByIds(batchIds);
console.log(`Successfully fetched ${streamsData.length} streams for batch`);
} catch (error) {
console.warn('Could not fetch stream details for batch, using IDs only:', error);
}
// Use the async API for all bulk operations
const response = await API.createChannelsFromStreamsAsync(
selectedStreamIds,
selectedChannelProfileId !== '0' ? [selectedChannelProfileId] : null
);
const streamData = batchIds.map((streamId) => {
const stream = streamsData.find((s) => s.id === streamId);
return {
stream_id: streamId,
name: stream?.name || `Stream ${streamId}`,
...(selectedChannelProfileId !== '0' && {
channel_profile_ids: selectedChannelProfileId,
}),
};
});
setActiveTaskId(response.task_id);
setCreationProgress(
`Task started for ${response.stream_count} streams. Processing in background...`
);
await API.createChannelsFromStreams(streamData);
}
} else {
// Process all at once for smaller selections
setCreationProgress(`Fetching stream data for ${selectedStreamIds.length} streams...`);
let streamsData = [];
try {
streamsData = await API.getStreamsByIds(selectedStreamIds);
console.log(`Successfully fetched ${streamsData.length} streams`);
} catch (error) {
console.warn('Could not fetch stream details, using IDs only:', error);
// If fetching streams fails, we'll create channels with fallback names
// This happens when there are too many streams or network issues
}
setCreationProgress(`Creating ${selectedStreamIds.length} channels...`);
const streamData = selectedStreamIds.map((streamId) => {
const stream = streamsData.find((s) => s.id === streamId);
return {
stream_id: streamId,
name: stream?.name || `Stream ${streamId}`,
...(selectedChannelProfileId !== '0' && {
channel_profile_ids: selectedChannelProfileId,
}),
};
});
console.log(`Creating ${streamData.length} channels from selected streams`);
await API.createChannelsFromStreams(streamData);
}
setCreationProgress('Refreshing channels...');
await API.requeryChannels();
// Refresh channel profiles to update the membership information
await useChannelsStore.getState().fetchChannelProfiles();
fetchLogos();
// Clear selection and refresh data
setSelectedStreamIds([]);
await fetchData();
setCreationProgress('');
console.log(
`Bulk creation task started: ${response.task_id} for ${response.stream_count} streams`
);
console.log('Active task ID set to:', response.task_id);
} catch (error) {
console.error('Error creating channels from streams:', error);
setCreationProgress('Error occurred during channel creation');
} finally {
setIsLoading(false);
console.error('Error starting bulk channel creation:', error);
setCreationProgress('Error starting bulk channel creation');
setIsCreatingChannels(false);
// Clear progress after a short delay to let user see completion/error
setTimeout(() => setCreationProgress(''), 3000);
// Clear error message after delay
setTimeout(() => setCreationProgress(''), 5000);
}
};
@ -744,6 +689,69 @@ const StreamsTable = () => {
fetchData();
}, [fetchData]);
// Listen for WebSocket updates for bulk creation progress
useEffect(() => {
if (wsValue) {
console.log('WebSocket message received:', wsValue);
if (
wsValue.data &&
wsValue.data.type === 'bulk_channel_creation_progress'
) {
const data = wsValue.data;
console.log('Bulk creation progress update:', data);
// Only handle progress for our active task
if (activeTaskId && data.task_id === activeTaskId) {
const progress = data.progress || 0;
const total = data.total || 0;
const status = data.status;
const message = data.message;
console.log(
`Task ${activeTaskId} progress: ${progress}/${total} (${status})`
);
if (status === 'completed') {
setCreationProgress(
`✅ Completed! Created ${data.created_count} channels`
);
setIsCreatingChannels(false);
setActiveTaskId(null);
// Clear selection and refresh data
setSelectedStreamIds([]);
fetchData();
// Note: API.requeryChannels() is called by WebSocket handler globally
// Clear success message after delay
setTimeout(() => setCreationProgress(''), 5000);
} else if (status === 'failed') {
setCreationProgress(
`❌ Task failed: ${data.error || 'Unknown error'}`
);
setIsCreatingChannels(false);
setActiveTaskId(null);
// Clear error message after longer delay
setTimeout(() => setCreationProgress(''), 10000);
} else {
// Update progress
const progressPercent =
total > 0 ? Math.round((progress / total) * 100) : 0;
setCreationProgress(
`${message} (${progress}/${total} - ${progressPercent}%)`
);
}
} else {
console.log(
`Ignoring progress for task ${data.task_id}, active task is ${activeTaskId}`
);
}
}
}
}, [wsValue, activeTaskId, fetchData]);
return (
<>
<Flex
@ -838,10 +846,9 @@ const StreamsTable = () => {
disabled={selectedStreamIds.length == 0 || isCreatingChannels}
loading={isCreatingChannels}
>
{isCreatingChannels
? (creationProgress || 'Creating Channels...')
: `Create Channels (${selectedStreamIds.length})`
}
{isCreatingChannels
? creationProgress || 'Creating Channels...'
: `Create Channels (${selectedStreamIds.length})`}
</Button>
<Button