forked from Mirrors/Dispatcharr
commit ecbef65891
19 changed files with 459 additions and 128 deletions
.github/workflows/ci.yml (vendored, 2 changes)

@@ -3,6 +3,8 @@ name: CI Pipeline
 on:
   push:
     branches: [dev]
+    paths-ignore:
+      - '**.md'
   pull_request:
     branches: [dev]
   workflow_dispatch:

CHANGELOG.md (20 changes)

@@ -7,6 +7,26 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ## [Unreleased]

+### Added
+
+- Sort buttons for 'Group' and 'M3U' columns in Streams table for improved stream organization and filtering - Thanks [@bobey6](https://github.com/bobey6)
+- EPG source priority field for controlling which EPG source is preferred when multiple sources have matching entries for a channel (higher numbers = higher priority) (Closes #603)
+
+### Changed
+
+- EPG program parsing optimized for sources with many channels but only a fraction mapped. Now parses XML file once per source instead of once per channel, dramatically reducing I/O and CPU overhead. For sources with 10,000 channels and 100 mapped, this results in ~99x fewer file opens and ~100x fewer full file scans. Orphaned programs for unmapped channels are also cleaned up during refresh to prevent database bloat. Database updates are now atomic to prevent clients from seeing empty/partial EPG data during refresh.
+- EPG table now displays detailed status messages including refresh progress, success messages, and last message for idle sources (matching M3U table behavior) (Closes #214)
+- IPv6 access now allowed by default with all IPv6 CIDRs accepted - Thanks [@adrianmace](https://github.com/adrianmace)
+- nginx.conf updated to bind to both IPv4 and IPv6 ports - Thanks [@jordandalley](https://github.com/jordandalley)
+- EPG matching now respects source priority and only uses active (enabled) EPG sources (Closes #672)
+- EPG form API Key field now only visible when Schedules Direct source type is selected
+
+### Fixed
+
+- EPG table "Updated" column now updates in real-time via WebSocket using the actual backend timestamp instead of requiring a page refresh
+- Bulk channel editor confirmation dialog now displays the correct stream profile name that will be applied to the selected channels.
+- uWSGI not found and 502 bad gateway on first startup
+
 ## [0.13.1] - 2025-12-06

 ### Fixed

@@ -124,7 +124,7 @@ class StreamViewSet(viewsets.ModelViewSet):
     filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
     filterset_class = StreamFilter
     search_fields = ["name", "channel_group__name"]
-    ordering_fields = ["name", "channel_group__name"]
+    ordering_fields = ["name", "channel_group__name", "m3u_account__name"]
     ordering = ["-name"]

     def get_permissions(self):

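With `m3u_account__name` added to `ordering_fields`, DRF's OrderingFilter accepts it as an `ordering` query parameter. A minimal sketch of the resulting request; the endpoint path and port are assumptions for illustration, not taken from this diff:

```python
# Hypothetical client call; adjust host and path to the actual deployment.
import requests

resp = requests.get(
    "http://localhost:9191/api/channels/streams/",  # path is an assumption
    params={"ordering": "-m3u_account__name"},      # '-' prefix = descending
    timeout=10,
)
print(resp.json())
```
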
@@ -295,7 +295,11 @@ def match_channels_to_epg(channels_data, epg_data, region_code=None, use_ml=True
                 if score > 50:  # Only show decent matches
                     logger.debug(f"  EPG '{row['name']}' (norm: '{row['norm_name']}') => score: {score} (base: {base_score}, bonus: {bonus})")

-                if score > best_score:
+                # When scores are equal, prefer higher priority EPG source
+                row_priority = row.get('epg_source_priority', 0)
+                best_priority = best_epg.get('epg_source_priority', 0) if best_epg else -1
+
+                if score > best_score or (score == best_score and row_priority > best_priority):
                     best_score = score
                     best_epg = row

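Distilled, the new comparison keeps the first best-scoring row unless a later row beats it outright or ties it with a higher source priority. A self-contained sketch with simplified, made-up rows:

```python
# Toy data; real rows come from EPGData with 'epg_source_priority' attached.
candidates = [
    {"name": "HBO East", "score": 90, "epg_source_priority": 0},
    {"name": "HBO",      "score": 90, "epg_source_priority": 5},
]

best_score, best_epg = 0, None
for row in candidates:
    row_priority = row.get("epg_source_priority", 0)
    best_priority = best_epg.get("epg_source_priority", 0) if best_epg else -1
    if row["score"] > best_score or (
        row["score"] == best_score and row_priority > best_priority
    ):
        best_score, best_epg = row["score"], row

print(best_epg["name"])  # "HBO" — the priority-5 source wins the tie
```
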
@@ -471,9 +475,9 @@ def match_epg_channels():
             "norm_chan": normalize_name(channel.name)  # Always use channel name for fuzzy matching!
         })

-    # Get all EPG data
+    # Get all EPG data from active sources, ordered by source priority (highest first) so we prefer higher priority matches
     epg_data = []
-    for epg in EPGData.objects.all():
+    for epg in EPGData.objects.select_related('epg_source').filter(epg_source__is_active=True):
         normalized_tvg_id = epg.tvg_id.strip().lower() if epg.tvg_id else ""
         epg_data.append({
             'id': epg.id,

@@ -482,9 +486,13 @@ def match_epg_channels():
             'name': epg.name,
             'norm_name': normalize_name(epg.name),
             'epg_source_id': epg.epg_source.id if epg.epg_source else None,
+            'epg_source_priority': epg.epg_source.priority if epg.epg_source else 0,
         })

-    logger.info(f"Processing {len(channels_data)} channels against {len(epg_data)} EPG entries")
+    # Sort EPG data by source priority (highest first) so we prefer higher priority matches
+    epg_data.sort(key=lambda x: x['epg_source_priority'], reverse=True)
+
+    logger.info(f"Processing {len(channels_data)} channels against {len(epg_data)} EPG entries (from active sources only)")

     # Run EPG matching with progress updates - automatically uses conservative thresholds for bulk operations
     result = match_channels_to_epg(channels_data, epg_data, region_code, use_ml=True, send_progress=True)

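Because `list.sort` is stable, entries with equal priority keep their original queryset order after the descending sort; only higher-priority entries move ahead. A toy illustration:

```python
epg_data = [
    {"id": 1, "epg_source_priority": 0},
    {"id": 2, "epg_source_priority": 5},
    {"id": 3, "epg_source_priority": 0},
]
epg_data.sort(key=lambda x: x["epg_source_priority"], reverse=True)
print([e["id"] for e in epg_data])  # [2, 1, 3] — ties stay in place
```
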
@@ -618,9 +626,9 @@ def match_selected_channels_epg(channel_ids):
             "norm_chan": normalize_name(channel.name)
         })

-    # Get all EPG data
+    # Get all EPG data from active sources, ordered by source priority (highest first) so we prefer higher priority matches
     epg_data = []
-    for epg in EPGData.objects.all():
+    for epg in EPGData.objects.select_related('epg_source').filter(epg_source__is_active=True):
         normalized_tvg_id = epg.tvg_id.strip().lower() if epg.tvg_id else ""
         epg_data.append({
             'id': epg.id,

@@ -629,9 +637,13 @@ def match_selected_channels_epg(channel_ids):
             'name': epg.name,
             'norm_name': normalize_name(epg.name),
             'epg_source_id': epg.epg_source.id if epg.epg_source else None,
+            'epg_source_priority': epg.epg_source.priority if epg.epg_source else 0,
         })

-    logger.info(f"Processing {len(channels_data)} selected channels against {len(epg_data)} EPG entries")
+    # Sort EPG data by source priority (highest first) so we prefer higher priority matches
+    epg_data.sort(key=lambda x: x['epg_source_priority'], reverse=True)
+
+    logger.info(f"Processing {len(channels_data)} selected channels against {len(epg_data)} EPG entries (from active sources only)")

     # Run EPG matching with progress updates - automatically uses appropriate thresholds
     result = match_channels_to_epg(channels_data, epg_data, region_code, use_ml=True, send_progress=True)

@@ -749,9 +761,10 @@ def match_single_channel_epg(channel_id):
         test_normalized = normalize_name(test_name)
         logger.debug(f"DEBUG normalization example: '{test_name}' → '{test_normalized}' (call sign preserved)")

-    # Get all EPG data for matching - must include norm_name field
+    # Get all EPG data for matching from active sources - must include norm_name field
+    # Ordered by source priority (highest first) so we prefer higher priority matches
     epg_data_list = []
-    for epg in EPGData.objects.filter(name__isnull=False).exclude(name=''):
+    for epg in EPGData.objects.select_related('epg_source').filter(epg_source__is_active=True, name__isnull=False).exclude(name=''):
         normalized_epg_tvg_id = epg.tvg_id.strip().lower() if epg.tvg_id else ""
         epg_data_list.append({
             'id': epg.id,

|
@ -760,10 +773,14 @@ def match_single_channel_epg(channel_id):
|
|||
'name': epg.name,
|
||||
'norm_name': normalize_name(epg.name),
|
||||
'epg_source_id': epg.epg_source.id if epg.epg_source else None,
|
||||
'epg_source_priority': epg.epg_source.priority if epg.epg_source else 0,
|
||||
})
|
||||
|
||||
# Sort EPG data by source priority (highest first) so we prefer higher priority matches
|
||||
epg_data_list.sort(key=lambda x: x['epg_source_priority'], reverse=True)
|
||||
|
||||
if not epg_data_list:
|
||||
return {"matched": False, "message": "No EPG data available for matching"}
|
||||
return {"matched": False, "message": "No EPG data available for matching (from active sources)"}
|
||||
|
||||
logger.info(f"Matching single channel '{channel.name}' against {len(epg_data_list)} EPG entries")
|
||||
|
||||
|
|
|
|||
apps/epg/migrations/0021_epgsource_priority.py (new file, 18 lines)

@@ -0,0 +1,18 @@
+# Generated by Django 5.2.4 on 2025-12-05 15:24
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('epg', '0020_migrate_time_to_starttime_placeholders'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='epgsource',
+            name='priority',
+            field=models.PositiveIntegerField(default=0, help_text='Priority for EPG matching (higher numbers = higher priority). Used when multiple EPG sources have matching entries for a channel.'),
+        ),
+    ]

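Because the field ships with `default=0`, running the standard `python manage.py migrate epg` backfills every existing source at priority 0, so upgraded installs keep their current matching behavior until a priority is raised.
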
@@ -45,6 +45,10 @@ class EPGSource(models.Model):
         null=True,
         help_text="Custom properties for dummy EPG configuration (regex patterns, timezone, duration, etc.)"
     )
+    priority = models.PositiveIntegerField(
+        default=0,
+        help_text="Priority for EPG matching (higher numbers = higher priority). Used when multiple EPG sources have matching entries for a channel."
+    )
     status = models.CharField(
         max_length=20,
         choices=STATUS_CHOICES,

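With the model field in place, callers can rank sources directly in the ORM. A minimal sketch; the import path is an assumption:

```python
from apps.epg.models import EPGSource  # import path is an assumption

# Highest-priority active sources first, mirroring the matching order.
for source in EPGSource.objects.filter(is_active=True).order_by("-priority"):
    print(source.name, source.priority)
```
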
@@ -24,6 +24,7 @@ class EPGSourceSerializer(serializers.ModelSerializer):
             'is_active',
             'file_path',
             'refresh_interval',
+            'priority',
             'status',
             'last_message',
             'created_at',

@@ -1393,11 +1393,23 @@ def parse_programs_for_tvg_id(epg_id):


 def parse_programs_for_source(epg_source, tvg_id=None):
+    """
+    Parse programs for all MAPPED channels from an EPG source in a single pass.
+
+    This is an optimized version that:
+    1. Only processes EPG entries that are actually mapped to channels
+    2. Parses the XML file ONCE instead of once per channel
+    3. Skips programmes for unmapped channels entirely during parsing
+
+    This dramatically improves performance when an EPG source has many channels
+    but only a fraction are mapped.
+    """
     # Send initial programs parsing notification
     send_epg_update(epg_source.id, "parsing_programs", 0)
     should_log_memory = False
     process = None
     initial_memory = 0
+    source_file = None

     # Add memory tracking only in trace mode or higher
     try:

@@ -1417,82 +1429,229 @@
         should_log_memory = False

     try:
-        # Process EPG entries in batches rather than all at once
-        batch_size = 20  # Process fewer channels at once to reduce memory usage
-        epg_count = EPGData.objects.filter(epg_source=epg_source).count()
+        # Only get EPG entries that are actually mapped to channels
+        mapped_epg_ids = set(
+            Channel.objects.filter(
+                epg_data__epg_source=epg_source,
+                epg_data__isnull=False
+            ).values_list('epg_data_id', flat=True)
+        )

-        if epg_count == 0:
-            logger.info(f"No EPG entries found for source: {epg_source.name}")
-            # Update status - this is not an error, just no entries
+        if not mapped_epg_ids:
+            total_epg_count = EPGData.objects.filter(epg_source=epg_source).count()
+            logger.info(f"No channels mapped to any EPG entries from source: {epg_source.name} "
+                        f"(source has {total_epg_count} EPG entries, 0 mapped)")
+            # Update status - this is not an error, just no mapped entries
             epg_source.status = 'success'
-            epg_source.save(update_fields=['status'])
+            epg_source.last_message = f"No channels mapped to this EPG source ({total_epg_count} entries available)"
+            epg_source.save(update_fields=['status', 'last_message'])
             send_epg_update(epg_source.id, "parsing_programs", 100, status="success")
             return True

-        logger.info(f"Parsing programs for {epg_count} EPG entries from source: {epg_source.name}")
+        # Get the mapped EPG entries with their tvg_ids
+        mapped_epgs = EPGData.objects.filter(id__in=mapped_epg_ids).values('id', 'tvg_id')
+        tvg_id_to_epg_id = {epg['tvg_id']: epg['id'] for epg in mapped_epgs if epg['tvg_id']}
+        mapped_tvg_ids = set(tvg_id_to_epg_id.keys())

-        failed_entries = []
-        program_count = 0
-        channel_count = 0
-        updated_count = 0
-        processed = 0
-        # Process in batches using cursor-based approach to limit memory usage
-        last_id = 0
-        while True:
-            # Get a batch of EPG entries
-            batch_entries = list(EPGData.objects.filter(
-                epg_source=epg_source,
-                id__gt=last_id
-            ).order_by('id')[:batch_size])
+        total_epg_count = EPGData.objects.filter(epg_source=epg_source).count()
+        mapped_count = len(mapped_tvg_ids)

-            if not batch_entries:
-                break  # No more entries to process
+        logger.info(f"Parsing programs for {mapped_count} MAPPED channels from source: {epg_source.name} "
+                    f"(skipping {total_epg_count - mapped_count} unmapped EPG entries)")

-            # Update last_id for next iteration
-            last_id = batch_entries[-1].id
+        # Get the file path
+        file_path = epg_source.extracted_file_path if epg_source.extracted_file_path else epg_source.file_path
+        if not file_path:
+            file_path = epg_source.get_cache_file()

-            # Process this batch
-            for epg in batch_entries:
-                if epg.tvg_id:
-                    try:
-                        result = parse_programs_for_tvg_id(epg.id)
-                        if result == "Task already running":
-                            logger.info(f"Program parse for {epg.id} already in progress, skipping")
+        # Check if the file exists
+        if not os.path.exists(file_path):
+            logger.error(f"EPG file not found at: {file_path}")

-                        processed += 1
-                        progress = min(95, int((processed / epg_count) * 100)) if epg_count > 0 else 50
-                        send_epg_update(epg_source.id, "parsing_programs", progress)
-                    except Exception as e:
-                        logger.error(f"Error parsing programs for tvg_id={epg.tvg_id}: {e}", exc_info=True)
-                        failed_entries.append(f"{epg.tvg_id}: {str(e)}")
+            if epg_source.url:
+                # Update the file path in the database
+                new_path = epg_source.get_cache_file()
+                logger.info(f"Updating file_path from '{file_path}' to '{new_path}'")
+                epg_source.file_path = new_path
+                epg_source.save(update_fields=['file_path'])
+                logger.info(f"Fetching new EPG data from URL: {epg_source.url}")

-            # Force garbage collection after each batch
-            batch_entries = None  # Remove reference to help garbage collection
+                # Fetch new data before continuing
+                fetch_success = fetch_xmltv(epg_source)

+                if not fetch_success:
+                    logger.error(f"Failed to fetch EPG data for source: {epg_source.name}")
+                    epg_source.status = 'error'
+                    epg_source.last_message = f"Failed to download EPG data"
+                    epg_source.save(update_fields=['status', 'last_message'])
+                    send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="Failed to download EPG file")
+                    return False
+
+                # Update file_path with the new location
+                file_path = epg_source.extracted_file_path if epg_source.extracted_file_path else epg_source.file_path
+            else:
+                logger.error(f"No URL provided for EPG source {epg_source.name}, cannot fetch new data")
+                epg_source.status = 'error'
+                epg_source.last_message = f"No URL provided, cannot fetch EPG data"
+                epg_source.save(update_fields=['status', 'last_message'])
+                send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="No URL provided")
+                return False
+
+        # SINGLE PASS PARSING: Parse the XML file once and collect all programs in memory
+        # We parse FIRST, then do an atomic delete+insert to avoid race conditions
+        # where clients might see empty/partial EPG data during the transition
+        all_programs_to_create = []
+        programs_by_channel = {tvg_id: 0 for tvg_id in mapped_tvg_ids}  # Track count per channel
+        total_programs = 0
+        skipped_programs = 0
+        last_progress_update = 0
+
+        try:
+            logger.debug(f"Opening file for single-pass parsing: {file_path}")
+            source_file = open(file_path, 'rb')
+
+            # Stream parse the file using lxml's iterparse
+            program_parser = etree.iterparse(source_file, events=('end',), tag='programme', remove_blank_text=True, recover=True)
+
+            for _, elem in program_parser:
+                channel_id = elem.get('channel')
+
+                # Skip programmes for unmapped channels immediately
+                if channel_id not in mapped_tvg_ids:
+                    skipped_programs += 1
+                    # Clear element to free memory
+                    clear_element(elem)
+                    continue
+
+                # This programme is for a mapped channel - process it
+                try:
+                    start_time = parse_xmltv_time(elem.get('start'))
+                    end_time = parse_xmltv_time(elem.get('stop'))
+                    title = None
+                    desc = None
+                    sub_title = None
+
+                    # Efficiently process child elements
+                    for child in elem:
+                        if child.tag == 'title':
+                            title = child.text or 'No Title'
+                        elif child.tag == 'desc':
+                            desc = child.text or ''
+                        elif child.tag == 'sub-title':
+                            sub_title = child.text or ''
+
+                    if not title:
+                        title = 'No Title'
+
+                    # Extract custom properties
+                    custom_props = extract_custom_properties(elem)
+                    custom_properties_json = custom_props if custom_props else None
+
+                    epg_id = tvg_id_to_epg_id[channel_id]
+                    all_programs_to_create.append(ProgramData(
+                        epg_id=epg_id,
+                        start_time=start_time,
+                        end_time=end_time,
+                        title=title,
+                        description=desc,
+                        sub_title=sub_title,
+                        tvg_id=channel_id,
+                        custom_properties=custom_properties_json
+                    ))
+                    total_programs += 1
+                    programs_by_channel[channel_id] += 1
+
+                    # Clear the element to free memory
+                    clear_element(elem)
+
+                    # Send progress update (estimate based on programs processed)
+                    if total_programs - last_progress_update >= 5000:
+                        last_progress_update = total_programs
+                        # Cap at 70% during parsing phase (save 30% for DB operations)
+                        progress = min(70, 10 + int((total_programs / max(total_programs + 10000, 1)) * 60))
+                        send_epg_update(epg_source.id, "parsing_programs", progress,
+                                        processed=total_programs, channels=mapped_count)
+
+                    # Periodic garbage collection during parsing
+                    if total_programs % 5000 == 0:
+                        gc.collect()
+
+                except Exception as e:
+                    logger.error(f"Error processing program for {channel_id}: {e}", exc_info=True)
+                    clear_element(elem)
+                    continue
+
+        except etree.XMLSyntaxError as xml_error:
+            logger.error(f"XML syntax error parsing program data: {xml_error}")
+            epg_source.status = EPGSource.STATUS_ERROR
+            epg_source.last_message = f"XML parsing error: {str(xml_error)}"
+            epg_source.save(update_fields=['status', 'last_message'])
+            send_epg_update(epg_source.id, "parsing_programs", 100, status="error", message=str(xml_error))
+            return False
+        except Exception as e:
+            logger.error(f"Error parsing XML for programs: {e}", exc_info=True)
+            raise
+        finally:
+            if source_file:
+                source_file.close()
+                source_file = None
+
+        # Now perform atomic delete + bulk insert
+        # This ensures clients never see empty/partial EPG data
+        logger.info(f"Parsed {total_programs} programs, performing atomic database update...")
+        send_epg_update(epg_source.id, "parsing_programs", 75, message="Updating database...")
+
+        batch_size = 1000
+        try:
+            with transaction.atomic():
+                # Delete existing programs for mapped EPGs
+                deleted_count = ProgramData.objects.filter(epg_id__in=mapped_epg_ids).delete()[0]
+                logger.debug(f"Deleted {deleted_count} existing programs")
+
+                # Clean up orphaned programs for unmapped EPG entries
+                unmapped_epg_ids = list(EPGData.objects.filter(
+                    epg_source=epg_source
+                ).exclude(id__in=mapped_epg_ids).values_list('id', flat=True))
+
+                if unmapped_epg_ids:
+                    orphaned_count = ProgramData.objects.filter(epg_id__in=unmapped_epg_ids).delete()[0]
+                    if orphaned_count > 0:
+                        logger.info(f"Cleaned up {orphaned_count} orphaned programs for {len(unmapped_epg_ids)} unmapped EPG entries")
+
+                # Bulk insert all new programs in batches within the same transaction
+                for i in range(0, len(all_programs_to_create), batch_size):
+                    batch = all_programs_to_create[i:i + batch_size]
+                    ProgramData.objects.bulk_create(batch)
+
+                    # Update progress during insertion
+                    progress = 75 + int((i / len(all_programs_to_create)) * 20) if all_programs_to_create else 95
+                    if i % (batch_size * 5) == 0:
+                        send_epg_update(epg_source.id, "parsing_programs", min(95, progress),
+                                        message=f"Inserting programs... {i}/{len(all_programs_to_create)}")
+
+            logger.info(f"Atomic update complete: deleted {deleted_count}, inserted {total_programs} programs")
+
+        except Exception as db_error:
+            logger.error(f"Database error during atomic update: {db_error}", exc_info=True)
+            epg_source.status = EPGSource.STATUS_ERROR
+            epg_source.last_message = f"Database error: {str(db_error)}"
+            epg_source.save(update_fields=['status', 'last_message'])
+            send_epg_update(epg_source.id, "parsing_programs", 100, status="error", message=str(db_error))
+            return False
+        finally:
+            # Clear the large list to free memory
+            all_programs_to_create = None
+            gc.collect()

-        # If there were failures, include them in the message but continue
-        if failed_entries:
-            epg_source.status = EPGSource.STATUS_SUCCESS  # Still mark as success if some processed
-            error_summary = f"Failed to parse {len(failed_entries)} of {epg_count} entries"
-            stats_summary = f"Processed {program_count} programs across {channel_count} channels. Updated: {updated_count}."
-            epg_source.last_message = f"{stats_summary} Warning: {error_summary}"
-            epg_source.updated_at = timezone.now()
-            epg_source.save(update_fields=['status', 'last_message', 'updated_at'])
+        # Count channels that actually got programs
+        channels_with_programs = sum(1 for count in programs_by_channel.values() if count > 0)

-            # Send completion notification with mixed status
-            send_epg_update(epg_source.id, "parsing_programs", 100,
-                            status="success",
-                            message=epg_source.last_message)
-
-            # Explicitly release memory of large lists before returning
-            del failed_entries
-            gc.collect()
-
-            return True
-
-        # If all successful, set a comprehensive success message
+        # Success message
         epg_source.status = EPGSource.STATUS_SUCCESS
-        epg_source.last_message = f"Successfully processed {program_count} programs across {channel_count} channels. Updated: {updated_count}."
+        epg_source.last_message = (
+            f"Parsed {total_programs:,} programs for {channels_with_programs} channels "
+            f"(skipped {skipped_programs:,} programs for {total_epg_count - mapped_count} unmapped channels)"
+        )
         epg_source.updated_at = timezone.now()
         epg_source.save(update_fields=['status', 'last_message', 'updated_at'])

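The pattern above, distilled: stream the XMLTV file once with `lxml.etree.iterparse`, keep only programmes whose `channel` attribute maps to a channel, then replace rows inside one transaction so readers never observe a half-refreshed table. A simplified sketch assuming the `ProgramData` model from this diff; `parse_time` is a stand-in for the repo's `parse_xmltv_time`:

```python
from datetime import datetime

from django.db import transaction
from lxml import etree

from apps.epg.models import ProgramData  # import path is an assumption


def parse_time(value):
    # Simplified stand-in for parse_xmltv_time ("20251206120000 +0000").
    return datetime.strptime(value, "%Y%m%d%H%M%S %z")


def single_pass_refresh(file_path, mapped_tvg_ids, tvg_id_to_epg_id):
    programs = []
    with open(file_path, "rb") as f:
        # One streaming pass over the whole file, regardless of channel count.
        for _, elem in etree.iterparse(f, events=("end",), tag="programme", recover=True):
            channel_id = elem.get("channel")
            if channel_id in mapped_tvg_ids:
                programs.append(ProgramData(
                    epg_id=tvg_id_to_epg_id[channel_id],
                    start_time=parse_time(elem.get("start")),
                    end_time=parse_time(elem.get("stop")),
                    title=elem.findtext("title") or "No Title",
                    tvg_id=channel_id,
                ))
            elem.clear()  # release parsed XML as we go

    # Atomic swap: the delete and bulk insert commit together or not at all.
    with transaction.atomic():
        ProgramData.objects.filter(epg_id__in=set(tvg_id_to_epg_id.values())).delete()
        ProgramData.objects.bulk_create(programs, batch_size=1000)
```
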
@@ -1500,17 +1659,21 @@
         log_system_event(
             event_type='epg_refresh',
             source_name=epg_source.name,
-            programs=program_count,
-            channels=channel_count,
-            updated=updated_count,
+            programs=total_programs,
+            channels=channels_with_programs,
+            skipped_programs=skipped_programs,
+            unmapped_channels=total_epg_count - mapped_count,
         )

         # Send completion notification with status
         send_epg_update(epg_source.id, "parsing_programs", 100,
                         status="success",
-                        message=epg_source.last_message)
+                        message=epg_source.last_message,
+                        updated_at=epg_source.updated_at.isoformat())

-        logger.info(f"Completed parsing all programs for source: {epg_source.name}")
+        logger.info(f"Completed parsing programs for source: {epg_source.name} - "
+                    f"{total_programs:,} programs for {channels_with_programs} channels, "
+                    f"skipped {skipped_programs:,} programs for unmapped channels")
         return True

     except Exception as e:

@@ -1525,14 +1688,19 @@
         return False
     finally:
         # Final memory cleanup and tracking
+        if source_file:
+            try:
+                source_file.close()
+            except:
+                pass
+            source_file = None
+
         # Explicitly release any remaining large data structures
         failed_entries = None
-        program_count = None
-        channel_count = None
-        updated_count = None
-        processed = None
-        programs_to_create = None
+        programs_by_channel = None
+        mapped_epg_ids = None
+        mapped_tvg_ids = None
+        tvg_id_to_epg_id = None
         gc.collect()

         # Add comprehensive memory cleanup at the end

@@ -44,7 +44,7 @@ def network_access_allowed(request, settings_key):
     cidrs = (
         network_access[settings_key].split(",")
         if settings_key in network_access
-        else ["0.0.0.0/0"]
+        else ["0.0.0.0/0", "::/0"]
     )

     network_allowed = False

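The reason `::/0` must join the default list: `ipaddress` treats a version mismatch as a non-match, so an IPv6 client can never satisfy an IPv4-only CIDR list. A self-contained check mirroring the logic:

```python
import ipaddress

def ip_allowed(client_ip, cidrs=("0.0.0.0/0", "::/0")):
    addr = ipaddress.ip_address(client_ip)
    # 'in' is False (not an error) when address and network versions differ.
    return any(addr in ipaddress.ip_network(c, strict=False) for c in cidrs)

print(ip_allowed("2001:db8::1", ("0.0.0.0/0",)))         # False: IPv4-only list
print(ip_allowed("2001:db8::1", ("0.0.0.0/0", "::/0")))  # True
```
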
@@ -41,8 +41,10 @@ export DISPATCHARR_PORT=${DISPATCHARR_PORT:-9191}
 export LIBVA_DRIVERS_PATH='/usr/local/lib/x86_64-linux-gnu/dri'
 export LD_LIBRARY_PATH='/usr/local/lib'
 export SECRET_FILE="/data/jwt"

 # Ensure Django secret key exists or generate a new one
 if [ ! -f "$SECRET_FILE" ]; then
     echo "Generating new Django secret key..."
+    old_umask=$(umask)
+    umask 077
     tmpfile="$(mktemp "${SECRET_FILE}.XXXXXX")" || { echo "mktemp failed"; exit 1; }
     python3 - <<'PY' >"$tmpfile" || { echo "secret generation failed"; rm -f "$tmpfile"; exit 1; }

@@ -50,11 +52,8 @@ import secrets
 print(secrets.token_urlsafe(64))
 PY
     mv -f "$tmpfile" "$SECRET_FILE" || { echo "move failed"; rm -f "$tmpfile"; exit 1; }
+    umask $old_umask
 fi
-
-chown $PUID:$PGID "$SECRET_FILE" || true
-chmod 600 "$SECRET_FILE" || true

 export DJANGO_SECRET_KEY="$(cat "$SECRET_FILE")"

 # Process priority configuration

@@ -203,7 +202,7 @@ fi
 # Users can override via UWSGI_NICE_LEVEL environment variable in docker-compose
 # Start with nice as root, then use setpriv to drop privileges to dispatch user
 # This preserves both the nice value and environment variables
-nice -n $UWSGI_NICE_LEVEL su -p - "$POSTGRES_USER" -c "cd /app && exec uwsgi $uwsgi_args" & uwsgi_pid=$!
+nice -n $UWSGI_NICE_LEVEL su - "$POSTGRES_USER" -c "cd /app && exec /dispatcharrpy/bin/uwsgi $uwsgi_args" & uwsgi_pid=$!
 echo "✅ uwsgi started with PID $uwsgi_pid (nice $UWSGI_NICE_LEVEL)"
 pids+=("$uwsgi_pid")

@@ -3,6 +3,7 @@ proxy_cache_path /app/logo_cache levels=1:2 keys_zone=logo_cache:10m

 server {
     listen NGINX_PORT;
+    listen [::]:NGINX_PORT;

     proxy_connect_timeout 75;
     proxy_send_timeout 300;

@@ -20,7 +20,6 @@ module = scripts.debug_wrapper:application
 virtualenv = /dispatcharrpy
 master = true
-env = DJANGO_SETTINGS_MODULE=dispatcharr.settings

 socket = /app/uwsgi.sock
 chmod-socket = 777
 vacuum = true

@@ -574,7 +574,7 @@ export const WebsocketProvider = ({ children }) => {
             const sourceId =
               parsedEvent.data.source || parsedEvent.data.account;
             const epg = epgs[sourceId];

             // Only update progress if the EPG still exists in the store
             // This prevents crashes when receiving updates for deleted EPGs
             if (epg) {

@@ -582,7 +582,9 @@ export const WebsocketProvider = ({ children }) => {
               updateEPGProgress(parsedEvent.data);
             } else {
               // EPG was deleted, ignore this update
-              console.debug(`Ignoring EPG refresh update for deleted EPG ${sourceId}`);
+              console.debug(
+                `Ignoring EPG refresh update for deleted EPG ${sourceId}`
+              );
               break;
             }

@@ -621,6 +623,10 @@ export const WebsocketProvider = ({ children }) => {
               status: parsedEvent.data.status || 'success',
               last_message:
                 parsedEvent.data.message || epg.last_message,
+              // Use the timestamp from the backend if provided
+              ...(parsedEvent.data.updated_at && {
+                updated_at: parsedEvent.data.updated_at,
+              }),
             });

             // Only show success notification if we've finished parsing programs and had no errors

@@ -135,8 +135,10 @@ const ChannelBatchForm = ({ channelIds, isOpen, onClose }) => {
       if (values.stream_profile_id === '0') {
         changes.push(`• Stream Profile: Use Default`);
       } else {
-        const profileName =
-          streamProfiles[values.stream_profile_id]?.name || 'Selected Profile';
+        const profile = streamProfiles.find(
+          (p) => `${p.id}` === `${values.stream_profile_id}`
+        );
+        const profileName = profile?.name || 'Selected Profile';
         changes.push(`• Stream Profile: ${profileName}`);
       }
     }

@@ -29,6 +29,7 @@ const EPG = ({ epg = null, isOpen, onClose }) => {
       api_key: '',
       is_active: true,
       refresh_interval: 24,
+      priority: 0,
     },

     validate: {

@@ -69,6 +70,7 @@ const EPG = ({ epg = null, isOpen, onClose }) => {
         api_key: epg.api_key,
         is_active: epg.is_active,
         refresh_interval: epg.refresh_interval,
+        priority: epg.priority ?? 0,
       };
       form.setValues(values);
       setSourceType(epg.source_type);

@@ -148,14 +150,24 @@ const EPG = ({ epg = null, isOpen, onClose }) => {
           key={form.key('url')}
         />

-        <TextInput
-          id="api_key"
-          name="api_key"
-          label="API Key"
-          description="API key for services that require authentication"
-          {...form.getInputProps('api_key')}
-          key={form.key('api_key')}
-          disabled={sourceType !== 'schedules_direct'}
-        />
+        {sourceType === 'schedules_direct' && (
+          <TextInput
+            id="api_key"
+            name="api_key"
+            label="API Key"
+            description="API key for services that require authentication"
+            {...form.getInputProps('api_key')}
+            key={form.key('api_key')}
+          />
+        )}
+
+        <NumberInput
+          min={0}
+          max={999}
+          label="Priority"
+          description="Priority for EPG matching (higher numbers = higher priority). Used when multiple EPG sources have matching entries for a channel."
+          {...form.getInputProps('priority')}
+          key={form.key('priority')}
+        />

         {/* Put checkbox at the same level as Refresh Interval */}

@@ -105,6 +105,7 @@ const CustomTableHeader = ({
               ...(header.column.columnDef.style &&
                 header.column.columnDef.style),
               height: '100%',
               width: '100%',
+              paddingRight: header.column.getCanResize() ? '8px' : '0px', // Add padding for resize handle
             }}
           >

@@ -160,6 +160,9 @@ const EPGsTable = () => {
       case 'downloading':
         label = 'Downloading';
         break;
+      case 'extracting':
+        label = 'Extracting';
+        break;
       case 'parsing_channels':
         label = 'Parsing Channels';
         break;

@@ -170,6 +173,22 @@ const EPGsTable = () => {
         return null;
     }

+    // Build additional info string from progress data
+    let additionalInfo = '';
+    if (progress.message) {
+      additionalInfo = progress.message;
+    } else if (
+      progress.processed !== undefined &&
+      progress.channels !== undefined
+    ) {
+      additionalInfo = `${progress.processed.toLocaleString()} programs for ${progress.channels} channels`;
+    } else if (
+      progress.processed !== undefined &&
+      progress.total !== undefined
+    ) {
+      additionalInfo = `${progress.processed.toLocaleString()} / ${progress.total.toLocaleString()}`;
+    }
+
     return (
       <Stack spacing={2}>
         <Text size="xs">

@@ -181,7 +200,14 @@ const EPGsTable = () => {
           style={{ margin: '2px 0' }}
         />
         {progress.speed && (
-          <Text size="xs">Speed: {parseInt(progress.speed)} KB/s</Text>
+          <Text size="xs" c="dimmed">
+            Speed: {parseInt(progress.speed)} KB/s
+          </Text>
         )}
+        {additionalInfo && (
+          <Text size="xs" c="dimmed" lineClamp={1}>
+            {additionalInfo}
+          </Text>
+        )}
       </Stack>
     );

@@ -286,14 +312,35 @@ const EPGsTable = () => {

     // Show success message for successful sources
     if (data.status === 'success') {
+      const successMessage =
+        data.last_message || 'EPG data refreshed successfully';
       return (
-        <Text
-          c="dimmed"
-          size="xs"
-          style={{ color: theme.colors.green[6], lineHeight: 1.3 }}
-        >
-          EPG data refreshed successfully
-        </Text>
+        <Tooltip label={successMessage} multiline width={300}>
+          <Text
+            c="dimmed"
+            size="xs"
+            lineClamp={2}
+            style={{ color: theme.colors.green[6], lineHeight: 1.3 }}
+          >
+            {successMessage}
+          </Text>
+        </Tooltip>
       );
     }

+    // Show last_message for idle sources (from previous refresh)
+    if (data.status === 'idle' && data.last_message) {
+      return (
+        <Tooltip label={data.last_message} multiline width={300}>
+          <Text
+            c="dimmed"
+            size="xs"
+            lineClamp={2}
+            style={{ lineHeight: 1.3 }}
+          >
+            {data.last_message}
+          </Text>
+        </Tooltip>
+      );
+    }
+

@@ -183,7 +183,7 @@ const StreamsTable = () => {
   const [pageCount, setPageCount] = useState(0);
   const [paginationString, setPaginationString] = useState('');
   const [isLoading, setIsLoading] = useState(true);
-  const [sorting, setSorting] = useState([{ id: 'name', desc: '' }]);
+  const [sorting, setSorting] = useState([{ id: 'name', desc: false }]);
   const [selectedStreamIds, setSelectedStreamIds] = useState([]);

   // Channel numbering modal state

@@ -299,6 +299,7 @@ const StreamsTable = () => {
       ),
     },
     {
       header: 'Group',
+      id: 'group',
       accessorFn: (row) =>
         channelGroups[row.channel_group]

@@ -320,6 +321,7 @@ const StreamsTable = () => {
       ),
     },
     {
       header: 'M3U',
+      id: 'm3u',
       size: columnSizing.m3u || 150,
       accessorFn: (row) =>

@@ -386,7 +388,14 @@ const StreamsTable = () => {

     // Apply sorting
     if (sorting.length > 0) {
-      const sortField = sorting[0].id;
+      const columnId = sorting[0].id;
+      // Map frontend column IDs to backend field names
+      const fieldMapping = {
+        name: 'name',
+        group: 'channel_group__name',
+        m3u: 'm3u_account__name',
+      };
+      const sortField = fieldMapping[columnId] || columnId;
       const sortDirection = sorting[0].desc ? '-' : '';
       params.append('ordering', `${sortDirection}${sortField}`);
     }

@@ -692,8 +701,8 @@ const StreamsTable = () => {
     const sortField = sorting[0]?.id;
     const sortDirection = sorting[0]?.desc;

-    if (sortField == column) {
-      if (sortDirection == false) {
+    if (sortField === column) {
+      if (sortDirection === false) {
         setSorting([
           {
             id: column,

@@ -701,7 +710,8 @@ const StreamsTable = () => {
         },
       ]);
     } else {
-      setSorting([]);
+      // Reset to default sort (name ascending) instead of clearing
+      setSorting([{ id: 'name', desc: false }]);
     }
   } else {
     setSorting([

@@ -726,7 +736,7 @@ const StreamsTable = () => {
     switch (header.id) {
       case 'name':
         return (
-          <Flex gap="sm">
+          <Flex align="center" style={{ width: '100%', flex: 1 }}>
            <TextInput
              name="name"
              placeholder="Name"

@@ -737,19 +747,23 @@ const StreamsTable = () => {
              variant="unstyled"
              className="table-input-header"
              leftSection={<Search size={14} opacity={0.5} />}
-            />
-            <Center>
-              {React.createElement(sortingIcon, {
-                onClick: () => onSortingChange('name'),
+              style={{ flex: 1, minWidth: 0 }}
+              rightSectionPointerEvents="auto"
+              rightSection={React.createElement(sortingIcon, {
+                onClick: (e) => {
+                  e.stopPropagation();
+                  onSortingChange('name');
+                },
                size: 14,
                style: { cursor: 'pointer' },
              })}
-            </Center>
+            />
          </Flex>
        );

      case 'group':
        return (
-          <Box onClick={handleSelectClick} style={{ width: '100%' }}>
+          <Flex align="center" style={{ width: '100%', flex: 1 }}>
            <MultiSelect
              placeholder="Group"
              searchable

@@ -761,13 +775,23 @@ const StreamsTable = () => {
              variant="unstyled"
              className="table-input-header custom-multiselect"
              clearable
+              style={{ flex: 1, minWidth: 0 }}
+              rightSectionPointerEvents="auto"
+              rightSection={React.createElement(sortingIcon, {
+                onClick: (e) => {
+                  e.stopPropagation();
+                  onSortingChange('group');
+                },
+                size: 14,
+                style: { cursor: 'pointer' },
+              })}
            />
-          </Box>
+          </Flex>
        );

      case 'm3u':
        return (
-          <Box onClick={handleSelectClick}>
+          <Flex align="center" style={{ width: '100%', flex: 1 }}>
            <Select
              placeholder="M3U"
              searchable

@@ -782,8 +806,18 @@ const StreamsTable = () => {
              }))}
              variant="unstyled"
              className="table-input-header"
+              style={{ flex: 1, minWidth: 0 }}
+              rightSectionPointerEvents="auto"
+              rightSection={React.createElement(sortingIcon, {
+                onClick: (e) => {
+                  e.stopPropagation();
+                  onSortingChange('m3u');
+                },
+                size: 14,
+                style: { cursor: 'pointer' },
+              })}
            />
-          </Box>
+          </Flex>
        );
    }
  };

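The toggle logic above, distilled: clicking a column cycles ascending, then descending, then back to the default name-ascending sort, rather than ever leaving the table unsorted. A language-neutral sketch in Python:

```python
def next_sort(current, column, default=("name", False)):
    cur_col, cur_desc = current
    if cur_col == column:
        return (column, True) if cur_desc is False else default
    return (column, False)

state = ("name", False)
for _ in range(3):
    state = next_sort(state, "group")
    print(state)  # ('group', False) -> ('group', True) -> ('name', False)
```
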
@@ -278,7 +278,7 @@ const SettingsPage = () => {
   const networkAccessForm = useForm({
     mode: 'controlled',
     initialValues: Object.keys(NETWORK_ACCESS_OPTIONS).reduce((acc, key) => {
-      acc[key] = '0.0.0.0/0,::0/0';
+      acc[key] = '0.0.0.0/0,::/0';
       return acc;
     }, {}),
     validate: Object.keys(NETWORK_ACCESS_OPTIONS).reduce((acc, key) => {

@@ -358,7 +358,7 @@ const SettingsPage = () => {
     );
     networkAccessForm.setValues(
       Object.keys(NETWORK_ACCESS_OPTIONS).reduce((acc, key) => {
-        acc[key] = networkAccessSettings[key] || '0.0.0.0/0,::0/0';
+        acc[key] = networkAccessSettings[key] || '0.0.0.0/0,::/0';
         return acc;
       }, {})
     );