Switch bulk epg name and logo to backend celery tasks for efficiency and scrape epg channel logo during epg scanning.

This commit is contained in:
SergeantPanda 2025-09-16 17:17:07 -05:00
parent 8607d626fa
commit d2d1984797
9 changed files with 437 additions and 115 deletions

View file

@ -493,6 +493,68 @@ class ChannelViewSet(viewsets.ModelViewSet):
"channels": serialized_channels
})
@action(detail=False, methods=["post"], url_path="set-names-from-epg")
def set_names_from_epg(self, request):
"""
Trigger a Celery task to set channel names from EPG data
"""
from .tasks import set_channels_names_from_epg
data = request.data
channel_ids = data.get("channel_ids", [])
if not channel_ids:
return Response(
{"error": "channel_ids is required"},
status=status.HTTP_400_BAD_REQUEST,
)
if not isinstance(channel_ids, list):
return Response(
{"error": "channel_ids must be a list"},
status=status.HTTP_400_BAD_REQUEST,
)
# Start the Celery task
task = set_channels_names_from_epg.delay(channel_ids)
return Response({
"message": f"Started EPG name setting task for {len(channel_ids)} channels",
"task_id": task.id,
"channel_count": len(channel_ids)
})
@action(detail=False, methods=["post"], url_path="set-logos-from-epg")
def set_logos_from_epg(self, request):
"""
Trigger a Celery task to set channel logos from EPG data
"""
from .tasks import set_channels_logos_from_epg
data = request.data
channel_ids = data.get("channel_ids", [])
if not channel_ids:
return Response(
{"error": "channel_ids is required"},
status=status.HTTP_400_BAD_REQUEST,
)
if not isinstance(channel_ids, list):
return Response(
{"error": "channel_ids must be a list"},
status=status.HTTP_400_BAD_REQUEST,
)
# Start the Celery task
task = set_channels_logos_from_epg.delay(channel_ids)
return Response({
"message": f"Started EPG logo setting task for {len(channel_ids)} channels",
"task_id": task.id,
"channel_count": len(channel_ids)
})
@action(detail=False, methods=["get"], url_path="ids")
def get_ids(self, request, *args, **kwargs):
# Get the filtered queryset

View file

@ -2465,3 +2465,227 @@ def bulk_create_channels_from_streams(self, stream_ids, channel_profile_ids=None
'error': str(e)
})
raise
@shared_task(bind=True)
def set_channels_names_from_epg(self, channel_ids):
"""
Celery task to set channel names from EPG data for multiple channels
"""
from core.utils import send_websocket_update
task_id = self.request.id
total_channels = len(channel_ids)
updated_count = 0
errors = []
try:
logger.info(f"Starting EPG name setting task for {total_channels} channels")
# Send initial progress
send_websocket_update('updates', 'update', {
'type': 'epg_name_setting_progress',
'task_id': task_id,
'progress': 0,
'total': total_channels,
'status': 'running',
'message': 'Starting EPG name setting...'
})
batch_size = 100
for i in range(0, total_channels, batch_size):
batch_ids = channel_ids[i:i + batch_size]
batch_updates = []
# Get channels and their EPG data
channels = Channel.objects.filter(id__in=batch_ids).select_related('epg_data')
for channel in channels:
try:
if channel.epg_data and channel.epg_data.name:
if channel.name != channel.epg_data.name:
channel.name = channel.epg_data.name
batch_updates.append(channel)
updated_count += 1
except Exception as e:
errors.append(f"Channel {channel.id}: {str(e)}")
logger.error(f"Error processing channel {channel.id}: {e}")
# Bulk update the batch
if batch_updates:
Channel.objects.bulk_update(batch_updates, ['name'])
# Send progress update
progress = min(i + batch_size, total_channels)
send_websocket_update('updates', 'update', {
'type': 'epg_name_setting_progress',
'task_id': task_id,
'progress': progress,
'total': total_channels,
'status': 'running',
'message': f'Updated {updated_count} channel names...',
'updated_count': updated_count
})
# Send completion notification
send_websocket_update('updates', 'update', {
'type': 'epg_name_setting_progress',
'task_id': task_id,
'progress': total_channels,
'total': total_channels,
'status': 'completed',
'message': f'Successfully updated {updated_count} channel names from EPG data',
'updated_count': updated_count,
'error_count': len(errors),
'errors': errors
})
logger.info(f"EPG name setting task completed. Updated {updated_count} channels")
return {
'status': 'completed',
'updated_count': updated_count,
'error_count': len(errors),
'errors': errors
}
except Exception as e:
logger.error(f"EPG name setting task failed: {e}")
send_websocket_update('updates', 'update', {
'type': 'epg_name_setting_progress',
'task_id': task_id,
'progress': 0,
'total': total_channels,
'status': 'failed',
'message': f'Task failed: {str(e)}',
'error': str(e)
})
raise
@shared_task(bind=True)
def set_channels_logos_from_epg(self, channel_ids):
"""
Celery task to set channel logos from EPG data for multiple channels
Creates logos from EPG icon URLs if they don't exist
"""
from .models import Logo
from core.utils import send_websocket_update
import requests
from urllib.parse import urlparse
task_id = self.request.id
total_channels = len(channel_ids)
updated_count = 0
created_logos_count = 0
errors = []
try:
logger.info(f"Starting EPG logo setting task for {total_channels} channels")
# Send initial progress
send_websocket_update('updates', 'update', {
'type': 'epg_logo_setting_progress',
'task_id': task_id,
'progress': 0,
'total': total_channels,
'status': 'running',
'message': 'Starting EPG logo setting...'
})
batch_size = 50 # Smaller batch for logo processing
for i in range(0, total_channels, batch_size):
batch_ids = channel_ids[i:i + batch_size]
batch_updates = []
# Get channels and their EPG data
channels = Channel.objects.filter(id__in=batch_ids).select_related('epg_data', 'logo')
for channel in channels:
try:
if channel.epg_data and channel.epg_data.icon_url:
icon_url = channel.epg_data.icon_url.strip()
# Try to find existing logo with this URL
try:
logo = Logo.objects.get(url=icon_url)
except Logo.DoesNotExist:
# Create new logo from EPG icon URL
try:
# Generate a name for the logo
logo_name = channel.epg_data.name or f"Logo for {channel.epg_data.tvg_id}"
# Create the logo record
logo = Logo.objects.create(
name=logo_name,
url=icon_url
)
created_logos_count += 1
logger.info(f"Created new logo from EPG: {logo_name} - {icon_url}")
except Exception as create_error:
errors.append(f"Channel {channel.id}: Failed to create logo from {icon_url}: {str(create_error)}")
logger.error(f"Failed to create logo for channel {channel.id}: {create_error}")
continue
# Update channel logo if different
if channel.logo != logo:
channel.logo = logo
batch_updates.append(channel)
updated_count += 1
except Exception as e:
errors.append(f"Channel {channel.id}: {str(e)}")
logger.error(f"Error processing channel {channel.id}: {e}")
# Bulk update the batch
if batch_updates:
Channel.objects.bulk_update(batch_updates, ['logo'])
# Send progress update
progress = min(i + batch_size, total_channels)
send_websocket_update('updates', 'update', {
'type': 'epg_logo_setting_progress',
'task_id': task_id,
'progress': progress,
'total': total_channels,
'status': 'running',
'message': f'Updated {updated_count} channel logos, created {created_logos_count} new logos...',
'updated_count': updated_count,
'created_logos_count': created_logos_count
})
# Send completion notification
send_websocket_update('updates', 'update', {
'type': 'epg_logo_setting_progress',
'task_id': task_id,
'progress': total_channels,
'total': total_channels,
'status': 'completed',
'message': f'Successfully updated {updated_count} channel logos and created {created_logos_count} new logos from EPG data',
'updated_count': updated_count,
'created_logos_count': created_logos_count,
'error_count': len(errors),
'errors': errors
})
logger.info(f"EPG logo setting task completed. Updated {updated_count} channels, created {created_logos_count} logos")
return {
'status': 'completed',
'updated_count': updated_count,
'created_logos_count': created_logos_count,
'error_count': len(errors),
'errors': errors
}
except Exception as e:
logger.error(f"EPG logo setting task failed: {e}")
send_websocket_update('updates', 'update', {
'type': 'epg_logo_setting_progress',
'task_id': task_id,
'progress': 0,
'total': total_channels,
'status': 'failed',
'message': f'Task failed: {str(e)}',
'error': str(e)
})
raise

View file

@ -0,0 +1,18 @@
# Generated by Django 5.2.4 on 2025-09-16 22:01
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('epg', '0015_alter_programdata_custom_properties'),
]
operations = [
migrations.AddField(
model_name='epgdata',
name='icon_url',
field=models.URLField(blank=True, max_length=500, null=True),
),
]

View file

@ -127,6 +127,7 @@ class EPGData(models.Model):
# and a name (which might simply be the tvg_id if no real channel exists).
tvg_id = models.CharField(max_length=255, null=True, blank=True, db_index=True)
name = models.CharField(max_length=255)
icon_url = models.URLField(max_length=500, null=True, blank=True)
epg_source = models.ForeignKey(
EPGSource,
on_delete=models.CASCADE,

View file

@ -52,5 +52,6 @@ class EPGDataSerializer(serializers.ModelSerializer):
'id',
'tvg_id',
'name',
'icon_url',
'epg_source',
]

View file

@ -873,10 +873,12 @@ def parse_channels_only(source):
tvg_id = elem.get('id', '').strip()
if tvg_id:
display_name = None
icon_url = None
for child in elem:
if child.tag == 'display-name' and child.text:
display_name = child.text.strip()
break
elif child.tag == 'icon':
icon_url = child.get('src', '').strip()
if not display_name:
display_name = tvg_id
@ -894,17 +896,24 @@ def parse_channels_only(source):
epgs_to_create.append(EPGData(
tvg_id=tvg_id,
name=display_name,
icon_url=icon_url,
epg_source=source,
))
logger.debug(f"[parse_channels_only] Added new channel to epgs_to_create 1: {tvg_id} - {display_name}")
processed_channels += 1
continue
# We use the cached object to check if the name has changed
# We use the cached object to check if the name or icon_url has changed
epg_obj = existing_epgs[tvg_id]
needs_update = False
if epg_obj.name != display_name:
# Only update if the name actually changed
epg_obj.name = display_name
needs_update = True
if epg_obj.icon_url != icon_url:
epg_obj.icon_url = icon_url
needs_update = True
if needs_update:
epgs_to_update.append(epg_obj)
logger.debug(f"[parse_channels_only] Added channel to update to epgs_to_update: {tvg_id} - {display_name}")
else:
@ -915,6 +924,7 @@ def parse_channels_only(source):
epgs_to_create.append(EPGData(
tvg_id=tvg_id,
name=display_name,
icon_url=icon_url,
epg_source=source,
))
logger.debug(f"[parse_channels_only] Added new channel to epgs_to_create 2: {tvg_id} - {display_name}")
@ -937,7 +947,7 @@ def parse_channels_only(source):
logger.info(f"[parse_channels_only] Bulk updating {len(epgs_to_update)} EPG entries")
if process:
logger.info(f"[parse_channels_only] Memory before bulk_update: {process.memory_info().rss / 1024 / 1024:.2f} MB")
EPGData.objects.bulk_update(epgs_to_update, ["name"])
EPGData.objects.bulk_update(epgs_to_update, ["name", "icon_url"])
if process:
logger.info(f"[parse_channels_only] Memory after bulk_update: {process.memory_info().rss / 1024 / 1024:.2f} MB")
epgs_to_update = []
@ -1004,7 +1014,7 @@ def parse_channels_only(source):
logger.debug(f"[parse_channels_only] Created final batch of {len(epgs_to_create)} EPG entries")
if epgs_to_update:
EPGData.objects.bulk_update(epgs_to_update, ["name"])
EPGData.objects.bulk_update(epgs_to_update, ["name", "icon_url"])
logger.debug(f"[parse_channels_only] Updated final batch of {len(epgs_to_update)} EPG entries")
if process:
logger.debug(f"[parse_channels_only] Memory after final batch creation: {process.memory_info().rss / 1024 / 1024:.2f} MB")

View file

@ -516,6 +516,52 @@ export default class API {
}
}
static async setChannelNamesFromEpg(channelIds) {
try {
const response = await request(
`${host}/api/channels/channels/set-names-from-epg/`,
{
method: 'POST',
body: { channel_ids: channelIds },
}
);
notifications.show({
title: 'Task Started',
message: response.message,
color: 'blue',
});
return response;
} catch (e) {
errorNotification('Failed to start EPG name setting task', e);
throw e;
}
}
static async setChannelLogosFromEpg(channelIds) {
try {
const response = await request(
`${host}/api/channels/channels/set-logos-from-epg/`,
{
method: 'POST',
body: { channel_ids: channelIds },
}
);
notifications.show({
title: 'Task Started',
message: response.message,
color: 'blue',
});
return response;
} catch (e) {
errorNotification('Failed to start EPG logo setting task', e);
throw e;
}
}
static async assignChannelNumbers(channelIds, startingNum = 1) {
try {
const response = await request(`${host}/api/channels/channels/assign/`, {

View file

@ -207,21 +207,19 @@ const ChannelForm = ({ channel = null, isOpen, onClose }) => {
}
const tvg = tvgsById[epgDataId];
if (!tvg || !tvg.name) {
if (!tvg || !tvg.icon_url) {
notifications.show({
title: 'No EPG Name',
message: 'EPG data does not have a name to match against logos.',
title: 'No EPG Icon',
message: 'EPG data does not have an icon URL.',
color: 'orange',
});
return;
}
try {
// Try to find a logo that matches the EPG name
const matchingLogo = Object.values(logos).find(
(logo) =>
logo.name.toLowerCase().includes(tvg.name.toLowerCase()) ||
tvg.name.toLowerCase().includes(logo.name.toLowerCase())
// Try to find a logo that matches the EPG icon URL
let matchingLogo = Object.values(logos).find(
(logo) => logo.url === tvg.icon_url
);
if (matchingLogo) {
@ -232,11 +230,47 @@ const ChannelForm = ({ channel = null, isOpen, onClose }) => {
color: 'green',
});
} else {
// Logo doesn't exist - create it
notifications.show({
title: 'No Matching Logo',
message: `No existing logo found that matches "${tvg.name}". Consider uploading a logo or using the smart logo selection.`,
color: 'orange',
id: 'creating-logo',
title: 'Creating Logo',
message: `Creating new logo from EPG icon URL...`,
loading: true,
});
try {
const newLogoData = {
name: tvg.name || `Logo for ${tvg.icon_url}`,
url: tvg.icon_url,
};
// Create logo by calling the Logo API directly
const newLogo = await API.createLogo(newLogoData);
formik.setFieldValue('logo_id', newLogo.id);
// Refresh logos to update the cache
await ensureLogosLoaded();
notifications.update({
id: 'creating-logo',
title: 'Success',
message: `Created and assigned new logo "${newLogo.name}"`,
loading: false,
color: 'green',
autoClose: 5000,
});
} catch (createError) {
notifications.update({
id: 'creating-logo',
title: 'Error',
message: 'Failed to create logo from EPG icon URL',
loading: false,
color: 'red',
autoClose: 5000,
});
throw createError;
}
}
} catch (error) {
notifications.show({
@ -751,7 +785,7 @@ const ChannelForm = ({ channel = null, isOpen, onClose }) => {
variant="light"
onClick={handleSetLogoFromEpg}
disabled={!formik.values.epg_data_id}
title="Find matching logo based on EPG name"
title="Find matching logo based on EPG icon URL"
>
Use EPG Logo
</Button>

View file

@ -2,9 +2,7 @@ import React, { useState, useEffect, useMemo, useRef } from 'react';
import useChannelsStore from '../../store/channels';
import API from '../../api';
import useStreamProfilesStore from '../../store/streamProfiles';
import useEPGsStore from '../../store/epgs';
import ChannelGroupForm from './ChannelGroup';
import { useLogoSelection } from '../../hooks/useSmartLogos';
import {
Box,
Button,
@ -38,18 +36,8 @@ const ChannelBatchForm = ({ channelIds, isOpen, onClose }) => {
const groupListRef = useRef(null);
const channelGroups = useChannelsStore((s) => s.channelGroups);
const canEditChannelGroup = useChannelsStore((s) => s.canEditChannelGroup);
const streamProfiles = useStreamProfilesStore((s) => s.profiles);
const epgs = useEPGsStore((s) => s.epgs);
const tvgs = useEPGsStore((s) => s.tvgs);
const tvgsById = useEPGsStore((s) => s.tvgsById);
const {
logos,
ensureLogosLoaded,
isLoading: logosLoading,
} = useLogoSelection();
const [channelGroupModelOpen, setChannelGroupModalOpen] = useState(false);
const [selectedChannelGroup, setSelectedChannelGroup] = useState('-1');
@ -157,47 +145,24 @@ const ChannelBatchForm = ({ channelIds, isOpen, onClose }) => {
}
try {
const channelsMap = useChannelsStore.getState().channels;
const updates = [];
for (const id of channelIds) {
const channel = channelsMap[id];
if (channel && channel.epg_data_id) {
const tvg = tvgsById[channel.epg_data_id];
if (tvg && tvg.name) {
updates.push({
id,
name: tvg.name,
});
}
}
}
if (updates.length === 0) {
notifications.show({
title: 'No Updates Available',
message: 'No selected channels have EPG data with names.',
color: 'orange',
});
return;
}
await API.bulkUpdateChannels(updates);
await Promise.all([
API.requeryChannels(),
useChannelsStore.getState().fetchChannels(),
]);
// Start the backend task
await API.setChannelNamesFromEpg(channelIds);
// The task will send WebSocket updates for progress
// Just show that it started successfully
notifications.show({
title: 'Success',
message: `Updated names for ${updates.length} channels from EPG data.`,
color: 'green',
title: 'Task Started',
message: `Started setting names from EPG for ${channelIds.length} channels. Progress will be shown in notifications.`,
color: 'blue',
});
// Close the modal since the task is now running in background
onClose();
} catch (error) {
console.error('Failed to set names from EPG:', error);
console.error('Failed to start EPG name setting task:', error);
notifications.show({
title: 'Error',
message: 'Failed to set names from EPG data.',
message: 'Failed to start EPG name setting task.',
color: 'red',
});
}
@ -214,63 +179,24 @@ const ChannelBatchForm = ({ channelIds, isOpen, onClose }) => {
}
try {
// Ensure logos are loaded first
await ensureLogosLoaded();
const channelsMap = useChannelsStore.getState().channels;
const updates = [];
for (const id of channelIds) {
const channel = channelsMap[id];
if (channel && channel.epg_data_id) {
const tvg = tvgsById[channel.epg_data_id];
if (tvg && tvg.name) {
// Try to find a matching logo
const matchingLogo = Object.values(logos).find(
(logo) =>
logo.name.toLowerCase().includes(tvg.name.toLowerCase()) ||
tvg.name.toLowerCase().includes(logo.name.toLowerCase())
);
if (matchingLogo) {
updates.push({
id,
logo_id: matchingLogo.id,
});
}
}
}
}
if (updates.length === 0) {
notifications.show({
title: 'No Matching Logos',
message:
'No matching logos found for the selected channels based on their EPG names.',
color: 'orange',
});
return;
}
await API.bulkUpdateChannels(updates);
// Refresh both channels and logos data
await Promise.all([
API.requeryChannels(),
useChannelsStore.getState().fetchChannels(),
ensureLogosLoaded(), // Ensure logos are refreshed
]);
// Start the backend task
await API.setChannelLogosFromEpg(channelIds);
// The task will send WebSocket updates for progress
// Just show that it started successfully
notifications.show({
title: 'Success',
message: `Updated logos for ${updates.length} channels based on EPG names.`,
color: 'green',
title: 'Task Started',
message: `Started setting logos from EPG for ${channelIds.length} channels. Progress will be shown in notifications.`,
color: 'blue',
});
// Close the modal since the task is now running in background
onClose();
} catch (error) {
console.error('Failed to set logos from EPG:', error);
console.error('Failed to start EPG logo setting task:', error);
notifications.show({
title: 'Error',
message: 'Failed to set logos from EPG data.',
message: 'Failed to start EPG logo setting task.',
color: 'red',
});
}