refresh channel programs on epg change, fixed some notification bugs

This commit is contained in:
dekzter 2025-03-28 15:24:25 -04:00
parent dbee621d90
commit fa3af5ed6d
9 changed files with 150 additions and 115 deletions

View file

@ -77,13 +77,14 @@ class ChannelSerializer(serializers.ModelSerializer):
source="epg_data",
write_only=True,
required=False,
allow_null=True,
)
stream_profile_id = serializers.PrimaryKeyRelatedField(
queryset=StreamProfile.objects.all(),
source='stream_profile',
allow_null=True,
required=False
required=False,
)
streams = serializers.SerializerMethodField()
@ -137,7 +138,7 @@ class ChannelSerializer(serializers.ModelSerializer):
instance.name = validated_data.get('name', instance.name)
instance.logo_url = validated_data.get('logo_url', instance.logo_url)
instance.tvg_id = validated_data.get('tvg_id', instance.tvg_id)
instance.epg_data = validated_data.get('epg_data', instance.epg_data)
instance.epg_data = validated_data.get('epg_data', None)
# If serializer allows changing channel_group or stream_profile:
if 'channel_group' in validated_data:

View file

@ -1,9 +1,13 @@
# apps/channels/signals.py
from django.db.models.signals import m2m_changed, pre_save
from django.db.models.signals import m2m_changed, pre_save, post_save
from django.dispatch import receiver
from .models import Channel, Stream
from apps.m3u.models import M3UAccount
from apps.epg.tasks import parse_programs_for_tvg_id
import logging
logger = logging.getLogger(__name__)
@receiver(m2m_changed, sender=Channel.streams.through)
def update_channel_tvg_id_and_logo(sender, instance, action, reverse, model, pk_set, **kwargs):
@ -45,3 +49,8 @@ def set_default_m3u_account(sender, instance, **kwargs):
instance.m3u_account = default_account
else:
raise ValueError("No default M3UAccount found.")
@receiver(post_save, sender=Channel)
def refresh_epg_programs(sender, instance, created, **kwargs):
if instance.epg_data:
parse_programs_for_tvg_id.delay(instance.epg_data.id)

View file

@ -12,7 +12,6 @@ from django.db import transaction
from apps.channels.models import Channel
from apps.epg.models import EPGData, EPGSource
from core.models import CoreSettings
from apps.epg.tasks import parse_programs_for_tvg_id # <-- we import our new helper
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
@ -73,8 +72,7 @@ def match_epg_channels():
1) If channel.tvg_id is valid in EPGData, skip.
2) If channel has a tvg_id but not found in EPGData, attempt direct EPGData lookup.
3) Otherwise, perform name-based fuzzy matching with optional region-based bonus.
4) If a match is found, we set channel.tvg_id and also parse its programs
from the cached EPG file (parse_programs_for_tvg_id).
4) If a match is found, we set channel.tvg_id
5) Summarize and log results.
"""
logger.info("Starting EPG matching logic...")
@ -172,11 +170,6 @@ def match_epg_channels():
chan.epg_data = all_epg[best_epg["epg_id"]]
chan.save()
# Attempt to parse program data for this channel
if epg_file_path:
parse_programs_for_tvg_id(epg_file_path, all_epg[best_epg["epg_id"]])
logger.info(f"Loaded program data for tvg_id={best_epg['tvg_id']}")
matched_channels.append((chan.id, fallback_name, best_epg["tvg_id"]))
logger.info(
f"Channel {chan.id} '{fallback_name}' => matched tvg_id={best_epg['tvg_id']} "
@ -194,10 +187,6 @@ def match_epg_channels():
chan.epg_data = all_epg[matched_epg["epg_id"]]
chan.save()
if epg_file_path:
parse_programs_for_tvg_id(epg_file_path, all_epg[matched_epg["epg_id"]])
logger.info(f"Loaded program data for tvg_id={matched_epg['tvg_id']}")
matched_channels.append((chan.id, fallback_name, matched_epg["tvg_id"]))
logger.info(
f"Channel {chan.id} '{fallback_name}' => matched EPG tvg_id={matched_epg['tvg_id']} "

View file

@ -32,6 +32,8 @@ def refresh_epg_data():
logger.info(f"Processing EPGSource: {source.name} (type: {source.source_type})")
if source.source_type == 'xmltv':
fetch_xmltv(source)
parse_channels_only(source)
parse_programs_for_source(source)
elif source.source_type == 'schedules_direct':
fetch_schedules_direct(source)
@ -64,21 +66,12 @@ def fetch_xmltv(source):
source.file_path = file_path
source.save(update_fields=['file_path'])
# Now parse <channel> blocks only
parse_channels_only(source, file_path)
epg_entries = EPGData.objects.filter(epg_source=source)
for epg in epg_entries:
if epg.tvg_id:
if Channel.objects.filter(epg_data=epg).exists():
logger.info(f"Refreshing program data for tvg_id: {epg.tvg_id}")
parse_programs_for_tvg_id(file_path, epg)
except Exception as e:
logger.error(f"Error fetching XMLTV from {source.name}: {e}", exc_info=True)
def parse_channels_only(source, file_path):
def parse_channels_only(source):
file_path = source.file_path
logger.info(f"Parsing channels from EPG file: {file_path}")
existing_epgs = {e.tvg_id: e for e in EPGData.objects.filter(epg_source=source)}
@ -133,17 +126,27 @@ def parse_channels_only(source, file_path):
logger.info("Finished parsing channel info.")
@shared_task
def parse_programs_for_tvg_id(epg_id):
epg = EPGData.objects.get(id=epg_id)
epg_source = epg.epg_source
def parse_programs_for_tvg_id(file_path, epg):
logger.info(f"Parsing <programme> for tvg_id={epg.tvg_id} from {file_path}")
if not Channel.objects.filter(epg_data=epg).exists():
logger.info(f"No channels matched to EPG {epg.tvg_id}")
return
logger.info(f"Refreshing program data for tvg_id: {epg.tvg_id}")
# First, remove all existing programs
ProgramData.objects.filter(epg=epg).delete()
# Read entire file (decompress if .gz)
if file_path.endswith('.gz'):
with open(file_path, 'rb') as gz_file:
if epg_source.file_path.endswith('.gz'):
with open(epg_source.file_path, 'rb') as gz_file:
decompressed = gzip.decompress(gz_file.read())
xml_data = decompressed.decode('utf-8')
else:
with open(file_path, 'r', encoding='utf-8') as xml_file:
with open(epg_source.file_path, 'r', encoding='utf-8') as xml_file:
xml_data = xml_file.read()
root = ET.fromstring(xml_data)
@ -152,28 +155,32 @@ def parse_programs_for_tvg_id(file_path, epg):
matched_programmes = [p for p in root.findall('programme') if p.get('channel') == epg.tvg_id]
logger.debug(f"Found {len(matched_programmes)} programmes for tvg_id={epg.tvg_id}")
with transaction.atomic():
for prog in matched_programmes:
start_time = parse_xmltv_time(prog.get('start'))
end_time = parse_xmltv_time(prog.get('stop'))
title = prog.findtext('title', default='No Title')
desc = prog.findtext('desc', default='')
programs_to_create = []
for prog in matched_programmes:
start_time = parse_xmltv_time(prog.get('start'))
end_time = parse_xmltv_time(prog.get('stop'))
title = prog.findtext('title', default='No Title')
desc = prog.findtext('desc', default='')
obj, created = ProgramData.objects.update_or_create(
epg=epg,
start_time=start_time,
title=title,
defaults={
'end_time': end_time,
'description': desc,
'sub_title': '',
'tvg_id': epg.tvg_id,
}
)
if created:
logger.debug(f"Created ProgramData: {title} [{start_time} - {end_time}]")
programs_to_create.append(ProgramData(
epg=epg,
start_time=start_time,
title=title,
end_time=end_time,
description=desc,
sub_title='',
tvg_id=epg.tvg_id,
))
ProgramData.objects.bulk_create(programs_to_create)
logger.info(f"Completed program parsing for tvg_id={epg.tvg_id}.")
def parse_programs_for_source(epg_source, tvg_id=None):
file_path = epg_source.file_path
epg_entries = EPGData.objects.filter(epg_source=epg_source)
for epg in epg_entries:
if epg.tvg_id:
parse_programs_for_tvg_id(epg.id)
def fetch_schedules_direct(source):
logger.info(f"Fetching Schedules Direct data from source: {source.name}")

View file

@ -3,6 +3,7 @@ from django.urls import reverse
from apps.channels.models import Channel
from apps.epg.models import ProgramData
from django.utils import timezone
from datetime import datetime, timedelta
def generate_m3u(request):
"""
@ -33,6 +34,29 @@ def generate_m3u(request):
response['Content-Disposition'] = 'attachment; filename="channels.m3u"'
return response
def generate_dummy_epg(name, channel_id, num_days=7, interval_hours=4):
xml_lines = []
# Loop through the number of days
for day_offset in range(num_days):
current_day = datetime.now() + timedelta(days=day_offset)
# Loop through each 4-hour interval in the day
for hour in range(0, 24, interval_hours):
start_time = current_day.replace(hour=hour, minute=0, second=0, microsecond=0)
stop_time = start_time + timedelta(hours=interval_hours)
# Format the times as per the requested format
start_str = start_time.strftime("%Y%m%d%H%M%S") + " 0000"
stop_str = stop_time.strftime("%Y%m%d%H%M%S") + " 0000"
# Create the XML-like programme entry
xml_lines.append(f'<programme start="{start_str}" stop="{stop_str}" channel="{channel_id}">')
xml_lines.append(f' <title lang="en">{name}</title>')
xml_lines.append(f'</programme>')
return xml_lines
def generate_epg(request):
"""
Dynamically generate an XMLTV (EPG) file using the new EPGData/ProgramData models.
@ -40,37 +64,33 @@ def generate_epg(request):
by their associated EPGData record.
This version does not filter by time, so it includes the entire EPG saved in the DB.
"""
# Retrieve all ProgramData records and join the related EPGData record.
programs = ProgramData.objects.select_related('epg').all().order_by('start_time')
# Group programmes by their EPGData record.
epg_programs = {}
for prog in programs:
epg = prog.epg
epg_programs.setdefault(epg, []).append(prog)
xml_lines = []
xml_lines.append('<?xml version="1.0" encoding="UTF-8"?>')
xml_lines.append('<tv generator-info-name="Dispatcharr" generator-info-url="https://example.com">')
# Output channel definitions based on EPGData.
# Use the EPGData's tvg_id (or a fallback) as the channel identifier.
for epg in epg_programs.keys():
channel_id = epg.tvg_id if epg.tvg_id else f"default-{epg.id}"
# Retrieve all active channels
channels = Channel.objects.all()
for channel in channels:
channel_id = channel.epg_data.tvg_id if channel.epg_data else f"default-{channel.id}"
display_name = channel.epg_data.name if channel.epg_data else channel.name
xml_lines.append(f' <channel id="{channel_id}">')
xml_lines.append(f' <display-name>{epg.name}</display-name>')
xml_lines.append(f' <display-name>{display_name}</display-name>')
xml_lines.append(' </channel>')
# Output programme entries referencing the channel id from EPGData.
for epg, progs in epg_programs.items():
channel_id = epg.tvg_id if epg.tvg_id else f"default-{epg.id}"
for prog in progs:
start_str = prog.start_time.strftime("%Y%m%d%H%M%S %z")
stop_str = prog.end_time.strftime("%Y%m%d%H%M%S %z")
xml_lines.append(f' <programme start="{start_str}" stop="{stop_str}" channel="{channel_id}">')
xml_lines.append(f' <title>{prog.title}</title>')
xml_lines.append(f' <desc>{prog.description}</desc>')
xml_lines.append(' </programme>')
for channel in channels:
channel_id = channel.epg_data.tvg_id if channel.epg_data else f"default-{channel.id}"
display_name = channel.epg_data.name if channel.epg_data else channel.name
if not channel.epg_data:
xml_lines = xml_lines + generate_dummy_epg(display_name, channel_id)
else:
programs = channel.epg_data.programs.all()
for prog in programs:
start_str = prog.start_time.strftime("%Y%m%d%H%M%S %z")
stop_str = prog.end_time.strftime("%Y%m%d%H%M%S %z")
xml_lines.append(f' <programme start="{start_str}" stop="{stop_str}" channel="{channel_id}">')
xml_lines.append(f' <title>{prog.title}</title>')
xml_lines.append(f' <desc>{prog.description}</desc>')
xml_lines.append(' </programme>')
xml_lines.append('</tv>')
xml_content = "\n".join(xml_lines)

View file

@ -57,14 +57,13 @@ export const WebsocketProvider = ({ children }) => {
event = JSON.parse(event.data);
switch (event.data.type) {
case 'm3u_refresh':
console.log('inside m3u_refresh event');
if (event.data.success) {
fetchStreams();
notifications.show({
message: event.data.message,
color: 'green.5',
});
} else if (event.data.progress) {
} else if (event.data.progress !== undefined) {
if (event.data.progress == 100) {
fetchStreams();
fetchChannelGroups();

View file

@ -37,10 +37,9 @@ export default function M3URefreshNotification() {
return;
}
console.log('starting progress bar');
const notificationId = notifications.show({
loading: true,
title: `M3U Refresh: ${playlist.name}`,
title: `M3U Refresh`,
message: `Starting...`,
autoClose: false,
withCloseButton: false,
@ -48,6 +47,9 @@ export default function M3URefreshNotification() {
setProgress({
...progress,
...(playlist && {
title: `M3U Refresh: ${playlist.name}`,
}),
[id]: notificationId,
});
} else {

View file

@ -27,8 +27,9 @@ import {
useMantineTheme,
Popover,
ScrollArea,
Tooltip,
} from '@mantine/core';
import { ListOrdered, SquarePlus, SquareX } from 'lucide-react';
import { ListOrdered, SquarePlus, SquareX, X } from 'lucide-react';
import useEPGsStore from '../../store/epgs';
import { Dropzone } from '@mantine/dropzone';
import { FixedSizeList as List } from 'react-window';
@ -94,10 +95,6 @@ const Channel = ({ channel = null, isOpen, onClose }) => {
values.stream_profile_id = null;
}
if (values.stream_profile_id == null) {
delete values.stream_profile_id;
}
if (channel?.id) {
await API.updateChannel({
id: channel.id,
@ -368,20 +365,6 @@ const Channel = ({ channel = null, isOpen, onClose }) => {
)}
size="xs"
/>
<TextInput
id="channel_number"
name="channel_number"
label="Channel #"
value={formik.values.channel_number}
onChange={formik.handleChange}
error={
formik.errors.channel_number
? formik.touched.channel_number
: ''
}
size="xs"
/>
</Stack>
<Divider size="sm" orientation="vertical" />
@ -455,6 +438,20 @@ const Channel = ({ channel = null, isOpen, onClose }) => {
<Divider size="sm" orientation="vertical" />
<Stack gap="5" style={{ flex: 1 }} justify="flex-start">
<TextInput
id="channel_number"
name="channel_number"
label="Channel #"
value={formik.values.channel_number}
onChange={formik.handleChange}
error={
formik.errors.channel_number
? formik.touched.channel_number
: ''
}
size="xs"
/>
<TextInput
id="tvg_id"
name="tvg_id"
@ -480,10 +477,27 @@ const Channel = ({ channel = null, isOpen, onClose }) => {
value={
formik.values.epg_data_id
? tvgsById[formik.values.epg_data_id].name
: ''
: 'Dummy'
}
onClick={() => setEpgPopoverOpened(true)}
size="xs"
rightSection={
<Tooltip label="Use dummy EPG">
<ActionIcon
// color={theme.tailwind.green[5]}
color="white"
onClick={(e) => {
e.stopPropagation();
formik.setFieldValue('epg_data_id', null);
}}
title="Create new group"
size="small"
variant="transparent"
>
<X size="20" />
</ActionIcon>
</Tooltip>
}
/>
</Popover.Target>
@ -531,10 +545,14 @@ const Channel = ({ channel = null, isOpen, onClose }) => {
justify="left"
size="xs"
onClick={() => {
formik.setFieldValue(
'epg_data_id',
filteredTvgs[index].id
);
if (filteredTvgs[index].id == '0') {
formik.setFieldValue('epg_data_id', null);
} else {
formik.setFieldValue(
'epg_data_id',
filteredTvgs[index].id
);
}
setEpgPopoverOpened(false);
}}
>
@ -546,16 +564,6 @@ const Channel = ({ channel = null, isOpen, onClose }) => {
</ScrollArea>
</Popover.Dropdown>
</Popover>
<TextInput
id="logo_url"
name="logo_url"
label="Logo URL (Optional)"
style={{ marginBottom: 2 }}
value={formik.values.logo_url}
onChange={formik.handleChange}
size="xs"
/>
</Stack>
</Group>

View file

@ -63,7 +63,7 @@ export default function TVChannelGuide({ startDate, endDate }) {
// Filter your Redux/Zustand channels by matching tvg_id
const filteredChannels = Object.values(channels).filter((ch) =>
programIds.includes(ch.tvg_id)
programIds.includes(ch.epg_data?.tvg_id)
);
console.log(
`found ${filteredChannels.length} channels with matching tvg_ids`
@ -155,7 +155,7 @@ export default function TVChannelGuide({ startDate, endDate }) {
// Helper: find channel by tvg_id
function findChannelByTvgId(tvgId) {
return guideChannels.find((ch) => ch.tvg_id === tvgId);
return guideChannels.find((ch) => ch.epg_data?.tvg_id === tvgId);
}
// The Watch Now click => show floating video
@ -420,7 +420,7 @@ export default function TVChannelGuide({ startDate, endDate }) {
{/* Channel rows */}
{guideChannels.map((channel) => {
const channelPrograms = programs.filter(
(p) => p.tvg_id === channel.tvg_id
(p) => p.tvg_id === channel.epg_data?.tvg_id
);
return (
<Box