Merge pull request #574 from Dispatcharr/dev

Version 0.11.0
SergeantPanda 2025-10-21 19:55:15 -05:00 committed by GitHub
commit d5de69cd6a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
53 changed files with 4269 additions and 798 deletions

View file

@ -0,0 +1,54 @@
# Generated migration to backfill stream_hash for existing custom streams
from django.db import migrations
import hashlib
def backfill_custom_stream_hashes(apps, schema_editor):
"""
Generate stream_hash for all custom streams that don't have one.
Uses stream ID to create a stable hash that won't change when name/url is edited.
"""
Stream = apps.get_model('dispatcharr_channels', 'Stream')
custom_streams_without_hash = Stream.objects.filter(
is_custom=True,
stream_hash__isnull=True
)
updated_count = 0
for stream in custom_streams_without_hash:
# Generate a stable hash using the stream's ID
# This ensures the hash never changes even if name/url is edited
unique_string = f"custom_stream_{stream.id}"
stream.stream_hash = hashlib.sha256(unique_string.encode()).hexdigest()
stream.save(update_fields=['stream_hash'])
updated_count += 1
if updated_count > 0:
print(f"Backfilled stream_hash for {updated_count} custom streams")
else:
print("No custom streams needed stream_hash backfill")
def reverse_backfill(apps, schema_editor):
"""
Reverse migration - clear stream_hash for custom streams.
Note: This will break preview functionality for custom streams.
"""
Stream = apps.get_model('dispatcharr_channels', 'Stream')
custom_streams = Stream.objects.filter(is_custom=True)
count = custom_streams.update(stream_hash=None)
print(f"Cleared stream_hash for {count} custom streams")
class Migration(migrations.Migration):
dependencies = [
('dispatcharr_channels', '0028_channel_created_at_channel_updated_at'),
]
operations = [
migrations.RunPython(backfill_custom_stream_hashes, reverse_backfill),
]
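
As a quick sanity check on the backfill above (an illustrative snippet, not part of this changeset), the expected hash can be recomputed from a stream's primary key alone, since nothing else feeds the digest:

import hashlib

def expected_custom_stream_hash(stream_id: int) -> str:
    # Mirrors the migration: only the primary key feeds the digest, so the
    # hash stays stable when the stream's name or URL is edited later.
    return hashlib.sha256(f"custom_stream_{stream_id}".encode()).hexdigest()

assert expected_custom_stream_hash(42) == hashlib.sha256(b"custom_stream_42").hexdigest()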

View file

@ -152,8 +152,14 @@ class Stream(models.Model):
stream = cls.objects.create(**fields_to_update)
return stream, True # True means it was created
# @TODO: honor stream's stream profile
def get_stream_profile(self):
"""
Get the stream profile for this stream.
Uses the stream's own profile if set, otherwise returns the default.
"""
if self.stream_profile:
return self.stream_profile
stream_profile = StreamProfile.objects.get(
id=CoreSettings.get_default_stream_profile_id()
)

View file

@ -45,6 +45,20 @@ def set_default_m3u_account(sender, instance, **kwargs):
else:
raise ValueError("No default M3UAccount found.")
@receiver(post_save, sender=Stream)
def generate_custom_stream_hash(sender, instance, created, **kwargs):
"""
Generate a stable stream_hash for custom streams after creation.
Uses the stream's ID to ensure the hash never changes even if name/url is edited.
"""
if instance.is_custom and not instance.stream_hash and created:
import hashlib
# Use stream ID for a stable, unique hash that never changes
unique_string = f"custom_stream_{instance.id}"
instance.stream_hash = hashlib.sha256(unique_string.encode()).hexdigest()
# Use update to avoid triggering signals again
Stream.objects.filter(id=instance.id).update(stream_hash=instance.stream_hash)
@receiver(post_save, sender=Channel)
def refresh_epg_programs(sender, instance, created, **kwargs):
"""

View file

@ -147,23 +147,37 @@ class EPGGridAPIView(APIView):
f"EPGGridAPIView: Found {count} program(s), including recently ended, currently running, and upcoming shows."
)
# Generate dummy programs for channels that have no EPG data
# Generate dummy programs for channels that have no EPG data OR dummy EPG sources
from apps.channels.models import Channel
from apps.epg.models import EPGSource
from django.db.models import Q
# Get channels with no EPG data
# Get channels with no EPG data at all (standard dummy)
channels_without_epg = Channel.objects.filter(Q(epg_data__isnull=True))
channels_count = channels_without_epg.count()
# Log more detailed information about channels missing EPG data
if channels_count > 0:
# Get channels with custom dummy EPG sources (generate on-demand with patterns)
channels_with_custom_dummy = Channel.objects.filter(
epg_data__epg_source__source_type='dummy'
).distinct()
# Log what we found
without_count = channels_without_epg.count()
custom_count = channels_with_custom_dummy.count()
if without_count > 0:
channel_names = [f"{ch.name} (ID: {ch.id})" for ch in channels_without_epg]
logger.warning(
f"EPGGridAPIView: Missing EPG data for these channels: {', '.join(channel_names)}"
logger.debug(
f"EPGGridAPIView: Channels needing standard dummy EPG: {', '.join(channel_names)}"
)
if custom_count > 0:
channel_names = [f"{ch.name} (ID: {ch.id})" for ch in channels_with_custom_dummy]
logger.debug(
f"EPGGridAPIView: Channels needing custom dummy EPG: {', '.join(channel_names)}"
)
logger.debug(
f"EPGGridAPIView: Found {channels_count} channels with no EPG data."
f"EPGGridAPIView: Found {without_count} channels needing standard dummy, {custom_count} needing custom dummy EPG."
)
# Serialize the regular programs
@ -205,12 +219,91 @@ class EPGGridAPIView(APIView):
# Generate and append dummy programs
dummy_programs = []
for channel in channels_without_epg:
# Use the channel UUID as tvg_id for dummy programs to match in the guide
# Import the function from output.views
from apps.output.views import generate_dummy_programs as gen_dummy_progs
# Handle channels with CUSTOM dummy EPG sources (with patterns)
for channel in channels_with_custom_dummy:
# For dummy EPGs, ALWAYS use channel UUID to ensure unique programs per channel
# This prevents multiple channels assigned to the same dummy EPG from showing identical data
# Each channel gets its own unique program data even if they share the same EPG source
dummy_tvg_id = str(channel.uuid)
try:
# Create programs every 4 hours for the next 24 hours
# Get the custom dummy EPG source
epg_source = channel.epg_data.epg_source if channel.epg_data else None
logger.debug(f"Generating custom dummy programs for channel: {channel.name} (ID: {channel.id})")
# Determine which name to parse based on custom properties
name_to_parse = channel.name
if epg_source and epg_source.custom_properties:
custom_props = epg_source.custom_properties
name_source = custom_props.get('name_source')
if name_source == 'stream':
# Get the stream index (1-based from user, convert to 0-based)
stream_index = custom_props.get('stream_index', 1) - 1
# Get streams ordered by channelstream order
channel_streams = channel.streams.all().order_by('channelstream__order')
if channel_streams.exists() and 0 <= stream_index < channel_streams.count():
stream = list(channel_streams)[stream_index]
name_to_parse = stream.name
logger.debug(f"Using stream name for parsing: {name_to_parse} (stream index: {stream_index})")
else:
logger.warning(f"Stream index {stream_index} not found for channel {channel.name}, falling back to channel name")
elif name_source == 'channel':
logger.debug(f"Using channel name for parsing: {name_to_parse}")
# Generate programs using custom patterns from the dummy EPG source
# Use the same tvg_id that will be set in the program data
generated = gen_dummy_progs(
channel_id=dummy_tvg_id,
channel_name=name_to_parse,
num_days=1,
program_length_hours=4,
epg_source=epg_source
)
# Custom dummy should always return data (either from patterns or fallback)
if generated:
logger.debug(f"Generated {len(generated)} custom dummy programs for {channel.name}")
# Convert generated programs to API format
for program in generated:
dummy_program = {
"id": f"dummy-custom-{channel.id}-{program['start_time'].hour}",
"epg": {"tvg_id": dummy_tvg_id, "name": channel.name},
"start_time": program['start_time'].isoformat(),
"end_time": program['end_time'].isoformat(),
"title": program['title'],
"description": program['description'],
"tvg_id": dummy_tvg_id,
"sub_title": None,
"custom_properties": None,
}
dummy_programs.append(dummy_program)
else:
logger.warning(f"No programs generated for custom dummy EPG channel: {channel.name}")
except Exception as e:
logger.error(
f"Error creating custom dummy programs for channel {channel.name} (ID: {channel.id}): {str(e)}"
)
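# For reference (illustrative, not part of this changeset), the loop above only
# reads two keys from a dummy EPG source's custom_properties, e.g.:
#     {"name_source": "stream", "stream_index": 2}
# name_source chooses whether the channel name or a stream name is parsed, and
# stream_index is the 1-based stream position (converted to 0-based above).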
# Handle channels with NO EPG data (standard dummy with humorous descriptions)
for channel in channels_without_epg:
# For channels with no EPG, use UUID to ensure uniqueness (matches frontend logic)
# The frontend uses: tvgRecord?.tvg_id ?? channel.uuid
# Since there's no EPG data, it will fall back to UUID
dummy_tvg_id = str(channel.uuid)
try:
logger.debug(f"Generating standard dummy programs for channel: {channel.name} (ID: {channel.id})")
# Create programs every 4 hours for the next 24 hours with humorous descriptions
for hour_offset in range(0, 24, 4):
# Use timedelta for time arithmetic instead of replace() to avoid hour overflow
start_time = now + timedelta(hours=hour_offset)
@ -238,7 +331,7 @@ class EPGGridAPIView(APIView):
# Create a dummy program in the same format as regular programs
dummy_program = {
"id": f"dummy-{channel.id}-{hour_offset}", # Create a unique ID
"id": f"dummy-standard-{channel.id}-{hour_offset}",
"epg": {"tvg_id": dummy_tvg_id, "name": channel.name},
"start_time": start_time.isoformat(),
"end_time": end_time.isoformat(),
@ -252,7 +345,7 @@ class EPGGridAPIView(APIView):
except Exception as e:
logger.error(
f"Error creating dummy programs for channel {channel.name} (ID: {channel.id}): {str(e)}"
f"Error creating standard dummy programs for channel {channel.name} (ID: {channel.id}): {str(e)}"
)
# Combine regular and dummy programs
@ -284,7 +377,22 @@ class EPGImportAPIView(APIView):
)
def post(self, request, format=None):
logger.info("EPGImportAPIView: Received request to import EPG data.")
refresh_epg_data.delay(request.data.get("id", None)) # Trigger Celery task
epg_id = request.data.get("id", None)
# Check if this is a dummy EPG source
try:
from .models import EPGSource
epg_source = EPGSource.objects.get(id=epg_id)
if epg_source.source_type == 'dummy':
logger.info(f"EPGImportAPIView: Skipping refresh for dummy EPG source {epg_id}")
return Response(
{"success": False, "message": "Dummy EPG sources do not require refreshing."},
status=status.HTTP_400_BAD_REQUEST,
)
except EPGSource.DoesNotExist:
pass # Let the task handle the missing source
refresh_epg_data.delay(epg_id) # Trigger Celery task
logger.info("EPGImportAPIView: Task dispatched to refresh EPG data.")
return Response(
{"success": True, "message": "EPG data import initiated."},
@ -308,3 +416,4 @@ class EPGDataViewSet(viewsets.ReadOnlyModelViewSet):
return [perm() for perm in permission_classes_by_action[self.action]]
except KeyError:
return [Authenticated()]

View file

@ -0,0 +1,23 @@
# Generated by Django 5.2.4 on 2025-10-17 17:02
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('epg', '0017_alter_epgsource_url'),
]
operations = [
migrations.AddField(
model_name='epgsource',
name='custom_properties',
field=models.JSONField(blank=True, default=dict, help_text='Custom properties for dummy EPG configuration (regex patterns, timezone, duration, etc.)', null=True),
),
migrations.AlterField(
model_name='epgsource',
name='source_type',
field=models.CharField(choices=[('xmltv', 'XMLTV URL'), ('schedules_direct', 'Schedules Direct API'), ('dummy', 'Custom Dummy EPG')], max_length=20),
),
]

View file

@ -8,6 +8,7 @@ class EPGSource(models.Model):
SOURCE_TYPE_CHOICES = [
('xmltv', 'XMLTV URL'),
('schedules_direct', 'Schedules Direct API'),
('dummy', 'Custom Dummy EPG'),
]
STATUS_IDLE = 'idle'
@ -38,6 +39,12 @@ class EPGSource(models.Model):
refresh_task = models.ForeignKey(
PeriodicTask, on_delete=models.SET_NULL, null=True, blank=True
)
custom_properties = models.JSONField(
default=dict,
blank=True,
null=True,
help_text="Custom properties for dummy EPG configuration (regex patterns, timezone, duration, etc.)"
)
status = models.CharField(
max_length=20,
choices=STATUS_CHOICES,

View file

@ -28,6 +28,7 @@ class EPGSourceSerializer(serializers.ModelSerializer):
'last_message',
'created_at',
'updated_at',
'custom_properties',
'epg_data_ids'
]

View file

@ -1,9 +1,9 @@
from django.db.models.signals import post_save, post_delete, pre_save
from django.dispatch import receiver
from .models import EPGSource
from .models import EPGSource, EPGData
from .tasks import refresh_epg_data, delete_epg_refresh_task_by_id
from django_celery_beat.models import PeriodicTask, IntervalSchedule
from core.utils import is_protected_path
from core.utils import is_protected_path, send_websocket_update
import json
import logging
import os
@ -12,15 +12,77 @@ logger = logging.getLogger(__name__)
@receiver(post_save, sender=EPGSource)
def trigger_refresh_on_new_epg_source(sender, instance, created, **kwargs):
# Trigger refresh only if the source is newly created and active
if created and instance.is_active:
# Trigger refresh only if the source is newly created, active, and not a dummy EPG
if created and instance.is_active and instance.source_type != 'dummy':
refresh_epg_data.delay(instance.id)
@receiver(post_save, sender=EPGSource)
def create_dummy_epg_data(sender, instance, created, **kwargs):
"""
Automatically create EPGData for dummy EPG sources when they are created.
This allows channels to be assigned to dummy EPGs immediately without
requiring a refresh first.
"""
if instance.source_type == 'dummy':
# Ensure dummy EPGs always have idle status and no status message
if instance.status != EPGSource.STATUS_IDLE or instance.last_message:
instance.status = EPGSource.STATUS_IDLE
instance.last_message = None
instance.save(update_fields=['status', 'last_message'])
# Create a URL-friendly tvg_id from the dummy EPG name
# Replace spaces and hyphens with underscores
friendly_tvg_id = instance.name.replace(' ', '_').replace('-', '_')
# Remove any characters that aren't alphanumeric or underscores
friendly_tvg_id = ''.join(c for c in friendly_tvg_id if c.isalnum() or c == '_')
# Convert to lowercase for consistency
friendly_tvg_id = friendly_tvg_id.lower()
# Prefix with 'dummy_' to make it clear this is a dummy EPG
friendly_tvg_id = f"dummy_{friendly_tvg_id}"
# Create or update the EPGData record
epg_data, data_created = EPGData.objects.get_or_create(
tvg_id=friendly_tvg_id,
epg_source=instance,
defaults={
'name': instance.name,
'icon_url': None
}
)
# Update name if it changed and record already existed
if not data_created and epg_data.name != instance.name:
epg_data.name = instance.name
epg_data.save(update_fields=['name'])
if data_created:
logger.info(f"Auto-created EPGData for dummy EPG source: {instance.name} (ID: {instance.id})")
# Send websocket update to notify frontend that EPG data has been created
# This allows the channel form to immediately show the new dummy EPG without refreshing
send_websocket_update('updates', 'update', {
'type': 'epg_data_created',
'source_id': instance.id,
'source_name': instance.name,
'epg_data_id': epg_data.id
})
else:
logger.debug(f"EPGData already exists for dummy EPG source: {instance.name} (ID: {instance.id})")
@receiver(post_save, sender=EPGSource)
def create_or_update_refresh_task(sender, instance, **kwargs):
"""
Create or update a Celery Beat periodic task when an EPGSource is created/updated.
Skip creating tasks for dummy EPG sources as they don't need refreshing.
"""
# Skip task creation for dummy EPGs
if instance.source_type == 'dummy':
# If there's an existing task, disable it
if instance.refresh_task:
instance.refresh_task.enabled = False
instance.refresh_task.save(update_fields=['enabled'])
return
task_name = f"epg_source-refresh-{instance.id}"
interval, _ = IntervalSchedule.objects.get_or_create(
every=int(instance.refresh_interval),
@ -80,7 +142,14 @@ def delete_refresh_task(sender, instance, **kwargs):
def update_status_on_active_change(sender, instance, **kwargs):
"""
When an EPGSource's is_active field changes, update the status accordingly.
For dummy EPGs, always ensure status is idle and no status message.
"""
# Dummy EPGs should always be idle with no status message
if instance.source_type == 'dummy':
instance.status = EPGSource.STATUS_IDLE
instance.last_message = None
return
if instance.pk: # Only for existing records, not new ones
try:
# Get the current record from the database
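
The tvg_id derivation in create_dummy_epg_data above amounts to a small slug transformation; an illustrative standalone version (not part of this changeset):

def friendly_dummy_tvg_id(name: str) -> str:
    # Mirrors the signal: spaces and hyphens become underscores, any character
    # that is not alphanumeric or an underscore is dropped, then lowercase
    # and prefix with 'dummy_'.
    slug = name.replace(' ', '_').replace('-', '_')
    slug = ''.join(c for c in slug if c.isalnum() or c == '_')
    return f"dummy_{slug.lower()}"

assert friendly_dummy_tvg_id("My Sports-Channel!") == "dummy_my_sports_channel"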

View file

@ -133,8 +133,9 @@ def delete_epg_refresh_task_by_id(epg_id):
@shared_task
def refresh_all_epg_data():
logger.info("Starting refresh_epg_data task.")
active_sources = EPGSource.objects.filter(is_active=True)
logger.debug(f"Found {active_sources.count()} active EPGSource(s).")
# Exclude dummy EPG sources from refresh - they don't need refreshing
active_sources = EPGSource.objects.filter(is_active=True).exclude(source_type='dummy')
logger.debug(f"Found {active_sources.count()} active EPGSource(s) (excluding dummy EPGs).")
for source in active_sources:
refresh_epg_data(source.id)
@ -180,6 +181,13 @@ def refresh_epg_data(source_id):
gc.collect()
return
# Skip refresh for dummy EPG sources - they don't need refreshing
if source.source_type == 'dummy':
logger.info(f"Skipping refresh for dummy EPG source {source.name} (ID: {source_id})")
release_task_lock('refresh_epg_data', source_id)
gc.collect()
return
# Continue with the normal processing...
logger.info(f"Processing EPGSource: {source.name} (type: {source.source_type})")
if source.source_type == 'xmltv':
@ -1943,3 +1951,20 @@ def detect_file_format(file_path=None, content=None):
# If we reach here, we couldn't reliably determine the format
return format_type, is_compressed, file_extension
def generate_dummy_epg(source):
"""
DEPRECATED: This function is no longer used.
Dummy EPG programs are now generated on-demand when they are requested
(during XMLTV export or EPG grid display), rather than being pre-generated
and stored in the database.
See: apps/output/views.py - generate_custom_dummy_programs()
This function remains for backward compatibility but should not be called.
"""
logger.warning(f"generate_dummy_epg() called for {source.name} but this function is deprecated. "
f"Dummy EPG programs are now generated on-demand.")
return True

View file

@ -136,6 +136,9 @@ class M3UAccountSerializer(serializers.ModelSerializer):
validators=[validate_flexible_url],
)
enable_vod = serializers.BooleanField(required=False, write_only=True)
auto_enable_new_groups_live = serializers.BooleanField(required=False, write_only=True)
auto_enable_new_groups_vod = serializers.BooleanField(required=False, write_only=True)
auto_enable_new_groups_series = serializers.BooleanField(required=False, write_only=True)
class Meta:
model = M3UAccount
@ -164,6 +167,9 @@ class M3UAccountSerializer(serializers.ModelSerializer):
"status",
"last_message",
"enable_vod",
"auto_enable_new_groups_live",
"auto_enable_new_groups_vod",
"auto_enable_new_groups_series",
]
extra_kwargs = {
"password": {
@ -175,23 +181,36 @@ class M3UAccountSerializer(serializers.ModelSerializer):
def to_representation(self, instance):
data = super().to_representation(instance)
# Parse custom_properties to get VOD preference
# Parse custom_properties to get VOD preference and auto_enable_new_groups settings
custom_props = instance.custom_properties or {}
data["enable_vod"] = custom_props.get("enable_vod", False)
data["auto_enable_new_groups_live"] = custom_props.get("auto_enable_new_groups_live", True)
data["auto_enable_new_groups_vod"] = custom_props.get("auto_enable_new_groups_vod", True)
data["auto_enable_new_groups_series"] = custom_props.get("auto_enable_new_groups_series", True)
return data
def update(self, instance, validated_data):
# Handle enable_vod preference
# Handle enable_vod preference and auto_enable_new_groups settings
enable_vod = validated_data.pop("enable_vod", None)
auto_enable_new_groups_live = validated_data.pop("auto_enable_new_groups_live", None)
auto_enable_new_groups_vod = validated_data.pop("auto_enable_new_groups_vod", None)
auto_enable_new_groups_series = validated_data.pop("auto_enable_new_groups_series", None)
# Get existing custom_properties
custom_props = instance.custom_properties or {}
# Update preferences
if enable_vod is not None:
# Get existing custom_properties
custom_props = instance.custom_properties or {}
# Update VOD preference
custom_props["enable_vod"] = enable_vod
validated_data["custom_properties"] = custom_props
if auto_enable_new_groups_live is not None:
custom_props["auto_enable_new_groups_live"] = auto_enable_new_groups_live
if auto_enable_new_groups_vod is not None:
custom_props["auto_enable_new_groups_vod"] = auto_enable_new_groups_vod
if auto_enable_new_groups_series is not None:
custom_props["auto_enable_new_groups_series"] = auto_enable_new_groups_series
validated_data["custom_properties"] = custom_props
# Pop out channel group memberships so we can handle them manually
channel_group_data = validated_data.pop("channel_group", [])
@ -225,14 +244,20 @@ class M3UAccountSerializer(serializers.ModelSerializer):
return instance
def create(self, validated_data):
# Handle enable_vod preference during creation
# Handle enable_vod preference and auto_enable_new_groups settings during creation
enable_vod = validated_data.pop("enable_vod", False)
auto_enable_new_groups_live = validated_data.pop("auto_enable_new_groups_live", True)
auto_enable_new_groups_vod = validated_data.pop("auto_enable_new_groups_vod", True)
auto_enable_new_groups_series = validated_data.pop("auto_enable_new_groups_series", True)
# Parse existing custom_properties or create new
custom_props = validated_data.get("custom_properties", {})
# Set VOD preference
# Set preferences (default to True for auto_enable_new_groups)
custom_props["enable_vod"] = enable_vod
custom_props["auto_enable_new_groups_live"] = auto_enable_new_groups_live
custom_props["auto_enable_new_groups_vod"] = auto_enable_new_groups_vod
custom_props["auto_enable_new_groups_series"] = auto_enable_new_groups_series
validated_data["custom_properties"] = custom_props
return super().create(validated_data)
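
With the serializer changes above, an account created without any of the new flags ends up with custom_properties like the following (illustrative; enable_vod defaults to False, the auto-enable flags default to True):

example_custom_properties = {
    "enable_vod": False,
    "auto_enable_new_groups_live": True,
    "auto_enable_new_groups_vod": True,
    "auto_enable_new_groups_series": True,
}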

View file

@ -488,25 +488,29 @@ def process_groups(account, groups):
}
logger.info(f"Currently {len(existing_groups)} existing groups")
group_objs = []
# Check if we should auto-enable new groups based on account settings
account_custom_props = account.custom_properties or {}
auto_enable_new_groups_live = account_custom_props.get("auto_enable_new_groups_live", True)
# Separate existing groups from groups that need to be created
existing_group_objs = []
groups_to_create = []
for group_name, custom_props in groups.items():
logger.debug(f"Handling group for M3U account {account.id}: {group_name}")
if group_name not in existing_groups:
groups_to_create.append(
ChannelGroup(
name=group_name,
)
)
if group_name in existing_groups:
existing_group_objs.append(existing_groups[group_name])
else:
group_objs.append(existing_groups[group_name])
groups_to_create.append(ChannelGroup(name=group_name))
# Create new groups and fetch them back with IDs
newly_created_group_objs = []
if groups_to_create:
logger.debug(f"Creating {len(groups_to_create)} groups")
created = ChannelGroup.bulk_create_and_fetch(groups_to_create)
logger.debug(f"Created {len(created)} groups")
group_objs.extend(created)
logger.info(f"Creating {len(groups_to_create)} new groups for account {account.id}")
newly_created_group_objs = list(ChannelGroup.bulk_create_and_fetch(groups_to_create))
logger.debug(f"Successfully created {len(newly_created_group_objs)} new groups")
# Combine all groups
all_group_objs = existing_group_objs + newly_created_group_objs
# Get existing relationships for this account
existing_relationships = {
@ -536,7 +540,7 @@ def process_groups(account, groups):
relations_to_delete.append(rel)
logger.debug(f"Marking relationship for deletion: group '{group_name}' no longer exists in source for account {account.id}")
for group in group_objs:
for group in all_group_objs:
custom_props = groups.get(group.name, {})
if group.name in existing_relationships:
@ -566,35 +570,17 @@ def process_groups(account, groups):
else:
logger.debug(f"xc_id unchanged for group '{group.name}' - account {account.id}")
else:
# Create new relationship - but check if there's an existing relationship that might have user settings
# This can happen if the group was temporarily removed and is now back
try:
potential_existing = ChannelGroupM3UAccount.objects.filter(
m3u_account=account,
channel_group=group
).first()
# Create new relationship - this group is new to this M3U account
# Use the auto_enable setting to determine if it should start enabled
if not auto_enable_new_groups_live:
logger.info(f"Group '{group.name}' is new to account {account.id} - creating relationship but DISABLED (auto_enable_new_groups_live=False)")
if potential_existing:
# Merge with existing custom properties to preserve user settings
existing_custom_props = potential_existing.custom_properties or {}
# Merge new properties with existing ones
merged_custom_props = existing_custom_props.copy()
merged_custom_props.update(custom_props)
custom_props = merged_custom_props
logger.debug(f"Merged custom properties for existing relationship: group '{group.name}' - account {account.id}")
except Exception as e:
logger.debug(f"Could not check for existing relationship: {str(e)}")
# Fall back to using just the new custom properties
pass
# Create new relationship
relations_to_create.append(
ChannelGroupM3UAccount(
channel_group=group,
m3u_account=account,
custom_properties=custom_props,
enabled=True, # Default to enabled
enabled=auto_enable_new_groups_live,
)
)
@ -1562,7 +1548,7 @@ def sync_auto_channels(account_id, scan_start_time=None):
# Get force_dummy_epg, group_override, and regex patterns from group custom_properties
group_custom_props = {}
force_dummy_epg = False
force_dummy_epg = False # Backward compatibility: legacy option to disable EPG
override_group_id = None
name_regex_pattern = None
name_replace_pattern = None
@ -1571,6 +1557,8 @@ def sync_auto_channels(account_id, scan_start_time=None):
channel_sort_order = None
channel_sort_reverse = False
stream_profile_id = None
custom_logo_id = None
custom_epg_id = None # New option: select specific EPG source (takes priority over force_dummy_epg)
if group_relation.custom_properties:
group_custom_props = group_relation.custom_properties
force_dummy_epg = group_custom_props.get("force_dummy_epg", False)
@ -1581,11 +1569,13 @@ def sync_auto_channels(account_id, scan_start_time=None):
)
name_match_regex = group_custom_props.get("name_match_regex")
channel_profile_ids = group_custom_props.get("channel_profile_ids")
custom_epg_id = group_custom_props.get("custom_epg_id")
channel_sort_order = group_custom_props.get("channel_sort_order")
channel_sort_reverse = group_custom_props.get(
"channel_sort_reverse", False
)
stream_profile_id = group_custom_props.get("stream_profile_id")
custom_logo_id = group_custom_props.get("custom_logo_id")
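# For reference (illustrative values, not part of this changeset), a group
# relation's custom_properties for auto channel sync can look like:
#     {
#         "force_dummy_epg": False,     # legacy flag: disable EPG auto-matching
#         "custom_epg_id": 7,           # preferred: pin a specific EPG source (e.g. a dummy EPG)
#         "custom_logo_id": 12,         # use this Logo instead of the stream's logo
#         "name_match_regex": "^US\\|",
#         "channel_profile_ids": [1, 3],
#         "channel_sort_order": "name",
#         "channel_sort_reverse": False,
#         "stream_profile_id": 2,
#     }
# When both are set, custom_epg_id takes priority over force_dummy_epg.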
# Determine which group to use for created channels
target_group = channel_group
@ -1840,7 +1830,25 @@ def sync_auto_channels(account_id, scan_start_time=None):
# Handle logo updates
current_logo = None
if stream.logo_url:
if custom_logo_id:
# Use the custom logo specified in group settings
from apps.channels.models import Logo
try:
current_logo = Logo.objects.get(id=custom_logo_id)
except Logo.DoesNotExist:
logger.warning(
f"Custom logo with ID {custom_logo_id} not found for existing channel, falling back to stream logo"
)
# Fall back to stream logo if custom logo not found
if stream.logo_url:
current_logo, _ = Logo.objects.get_or_create(
url=stream.logo_url,
defaults={
"name": stream.name or stream.tvg_id or "Unknown"
},
)
elif stream.logo_url:
# No custom logo configured, use stream logo
from apps.channels.models import Logo
current_logo, _ = Logo.objects.get_or_create(
@ -1856,10 +1864,42 @@ def sync_auto_channels(account_id, scan_start_time=None):
# Handle EPG data updates
current_epg_data = None
if stream.tvg_id and not force_dummy_epg:
if custom_epg_id:
# Use the custom EPG specified in group settings (e.g., a dummy EPG)
from apps.epg.models import EPGSource
try:
epg_source = EPGSource.objects.get(id=custom_epg_id)
# For dummy EPGs, select the first (and typically only) EPGData entry from this source
if epg_source.source_type == 'dummy':
current_epg_data = EPGData.objects.filter(
epg_source=epg_source
).first()
if not current_epg_data:
logger.warning(
f"No EPGData found for dummy EPG source {epg_source.name} (ID: {custom_epg_id})"
)
else:
# For non-dummy sources, try to find existing EPGData by tvg_id
if stream.tvg_id:
current_epg_data = EPGData.objects.filter(
tvg_id=stream.tvg_id,
epg_source=epg_source
).first()
except EPGSource.DoesNotExist:
logger.warning(
f"Custom EPG source with ID {custom_epg_id} not found for existing channel, falling back to auto-match"
)
# Fall back to auto-match by tvg_id
if stream.tvg_id and not force_dummy_epg:
current_epg_data = EPGData.objects.filter(
tvg_id=stream.tvg_id
).first()
elif stream.tvg_id and not force_dummy_epg:
# Auto-match EPG by tvg_id (original behavior)
current_epg_data = EPGData.objects.filter(
tvg_id=stream.tvg_id
).first()
# If force_dummy_epg is True and no custom_epg_id, current_epg_data stays None
if existing_channel.epg_data != current_epg_data:
existing_channel.epg_data = current_epg_data
@ -1949,19 +1989,81 @@ def sync_auto_channels(account_id, scan_start_time=None):
ChannelProfileMembership.objects.bulk_create(memberships)
# Try to match EPG data
if stream.tvg_id and not force_dummy_epg:
if custom_epg_id:
# Use the custom EPG specified in group settings (e.g., a dummy EPG)
from apps.epg.models import EPGSource
try:
epg_source = EPGSource.objects.get(id=custom_epg_id)
# For dummy EPGs, select the first (and typically only) EPGData entry from this source
if epg_source.source_type == 'dummy':
epg_data = EPGData.objects.filter(
epg_source=epg_source
).first()
if epg_data:
channel.epg_data = epg_data
channel.save(update_fields=["epg_data"])
else:
logger.warning(
f"No EPGData found for dummy EPG source {epg_source.name} (ID: {custom_epg_id})"
)
else:
# For non-dummy sources, try to find existing EPGData by tvg_id
if stream.tvg_id:
epg_data = EPGData.objects.filter(
tvg_id=stream.tvg_id,
epg_source=epg_source
).first()
if epg_data:
channel.epg_data = epg_data
channel.save(update_fields=["epg_data"])
except EPGSource.DoesNotExist:
logger.warning(
f"Custom EPG source with ID {custom_epg_id} not found, falling back to auto-match"
)
# Fall back to auto-match by tvg_id
if stream.tvg_id and not force_dummy_epg:
epg_data = EPGData.objects.filter(
tvg_id=stream.tvg_id
).first()
if epg_data:
channel.epg_data = epg_data
channel.save(update_fields=["epg_data"])
elif stream.tvg_id and not force_dummy_epg:
# Auto-match EPG by tvg_id (original behavior)
epg_data = EPGData.objects.filter(
tvg_id=stream.tvg_id
).first()
if epg_data:
channel.epg_data = epg_data
channel.save(update_fields=["epg_data"])
elif stream.tvg_id and force_dummy_epg:
elif force_dummy_epg:
# Force dummy EPG with no custom EPG selected (set to None)
channel.epg_data = None
channel.save(update_fields=["epg_data"])
# Handle logo
if stream.logo_url:
if custom_logo_id:
# Use the custom logo specified in group settings
from apps.channels.models import Logo
try:
custom_logo = Logo.objects.get(id=custom_logo_id)
channel.logo = custom_logo
channel.save(update_fields=["logo"])
except Logo.DoesNotExist:
logger.warning(
f"Custom logo with ID {custom_logo_id} not found, falling back to stream logo"
)
# Fall back to stream logo if custom logo not found
if stream.logo_url:
logo, _ = Logo.objects.get_or_create(
url=stream.logo_url,
defaults={
"name": stream.name or stream.tvg_id or "Unknown"
},
)
channel.logo = logo
channel.save(update_fields=["logo"])
elif stream.logo_url:
from apps.channels.models import Logo
logo, _ = Logo.objects.get_or_create(

File diff suppressed because it is too large

View file

@ -1,4 +1,6 @@
"""Shared configuration between proxy types"""
import time
from django.db import connection
class BaseConfig:
DEFAULT_USER_AGENT = 'VLC/3.0.20 LibVLC/3.0.20' # Only used if the connection to settings fails
@ -12,13 +14,29 @@ class BaseConfig:
BUFFERING_TIMEOUT = 15 # Seconds to wait for buffering before switching streams
BUFFER_SPEED = 1 # Speed at which to consider the stream buffering; 1x is normal speed, 2x is double speed, etc.
# Cache for proxy settings (class-level, shared across all instances)
_proxy_settings_cache = None
_proxy_settings_cache_time = 0
_proxy_settings_cache_ttl = 10 # Cache for 10 seconds
@classmethod
def get_proxy_settings(cls):
"""Get proxy settings from CoreSettings JSON data with fallback to defaults"""
"""Get proxy settings from CoreSettings JSON data with fallback to defaults (cached)"""
# Check if cache is still valid
now = time.time()
if cls._proxy_settings_cache is not None and (now - cls._proxy_settings_cache_time) < cls._proxy_settings_cache_ttl:
return cls._proxy_settings_cache
# Cache miss or expired - fetch from database
try:
from core.models import CoreSettings
return CoreSettings.get_proxy_settings()
settings = CoreSettings.get_proxy_settings()
cls._proxy_settings_cache = settings
cls._proxy_settings_cache_time = now
return settings
except Exception:
# Return defaults if database query fails
return {
"buffering_timeout": 15,
"buffering_speed": 1.0,
@ -26,6 +44,13 @@ class BaseConfig:
"channel_shutdown_delay": 0,
"channel_init_grace_period": 5,
}
finally:
# Always close the connection after reading settings
try:
connection.close()
except Exception:
pass
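# Note on the cache above (illustrative, not part of this changeset): expiry is
# purely time-based, so a settings change can take up to
# _proxy_settings_cache_ttl (10 s) to be picked up. Resetting the class-level
# attributes forces the next call to re-read the database:
#     BaseConfig._proxy_settings_cache = None
#     BaseConfig._proxy_settings_cache_time = 0
#     settings = BaseConfig.get_proxy_settings()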
@classmethod
def get_redis_chunk_ttl(cls):

View file

@ -8,7 +8,7 @@ import gevent
from typing import Set, Optional
from apps.proxy.config import TSConfig as Config
from redis.exceptions import ConnectionError, TimeoutError
from .constants import EventType
from .constants import EventType, ChannelState, ChannelMetadataField
from .config_helper import ConfigHelper
from .redis_keys import RedisKeys
from .utils import get_logger
@ -26,6 +26,7 @@ class ClientManager:
self.lock = threading.Lock()
self.last_active_time = time.time()
self.worker_id = worker_id # Store worker ID as instance variable
self._heartbeat_running = True # Flag to control heartbeat thread
# STANDARDIZED KEYS: Move client set under channel namespace
self.client_set_key = RedisKeys.clients(channel_id)
@ -77,56 +78,28 @@ class ClientManager:
logger.debug(f"Failed to trigger stats update: {e}")
def _start_heartbeat_thread(self):
"""Start thread to regularly refresh client presence in Redis"""
"""Start thread to regularly refresh client presence in Redis for local clients"""
def heartbeat_task():
no_clients_count = 0 # Track consecutive empty cycles
max_empty_cycles = 3 # Exit after this many consecutive empty checks
logger.debug(f"Started heartbeat thread for channel {self.channel_id} (interval: {self.heartbeat_interval}s)")
while True:
while self._heartbeat_running:
try:
# Wait for the interval
gevent.sleep(self.heartbeat_interval)
# Wait for the interval, but check stop flag frequently for quick shutdown
# Sleep in 1-second increments to allow faster response to stop signal
for _ in range(int(self.heartbeat_interval)):
if not self._heartbeat_running:
break
time.sleep(1)
# Final check before doing work
if not self._heartbeat_running:
break
# Send heartbeat for all local clients
with self.lock:
if not self.clients or not self.redis_client:
# No clients left, increment our counter
no_clients_count += 1
# Check if we're in a shutdown delay period before exiting
in_shutdown_delay = False
if self.redis_client:
try:
disconnect_key = RedisKeys.last_client_disconnect(self.channel_id)
disconnect_time_bytes = self.redis_client.get(disconnect_key)
if disconnect_time_bytes:
disconnect_time = float(disconnect_time_bytes.decode('utf-8'))
elapsed = time.time() - disconnect_time
shutdown_delay = ConfigHelper.channel_shutdown_delay()
if elapsed < shutdown_delay:
in_shutdown_delay = True
logger.debug(f"Channel {self.channel_id} in shutdown delay: {elapsed:.1f}s of {shutdown_delay}s elapsed")
except Exception as e:
logger.debug(f"Error checking shutdown delay: {e}")
# Only exit if we've seen no clients for several consecutive checks AND we're not in shutdown delay
if no_clients_count >= max_empty_cycles and not in_shutdown_delay:
logger.info(f"No clients for channel {self.channel_id} after {no_clients_count} consecutive checks and not in shutdown delay, exiting heartbeat thread")
return # This exits the thread
# Skip this cycle if we have no clients but continue if in shutdown delay
if not in_shutdown_delay:
continue
else:
# Reset counter during shutdown delay to prevent premature exit
no_clients_count = 0
continue
else:
# Reset counter when we see clients
no_clients_count = 0
# Skip this cycle if we have no local clients
if not self.clients:
continue
# IMPROVED GHOST DETECTION: Check for stale clients before sending heartbeats
current_time = time.time()
@ -197,11 +170,20 @@ class ClientManager:
except Exception as e:
logger.error(f"Error in client heartbeat thread: {e}")
logger.debug(f"Heartbeat thread exiting for channel {self.channel_id}")
thread = threading.Thread(target=heartbeat_task, daemon=True)
thread.name = f"client-heartbeat-{self.channel_id}"
thread.start()
logger.debug(f"Started client heartbeat thread for channel {self.channel_id} (interval: {self.heartbeat_interval}s)")
def stop(self):
"""Stop the heartbeat thread and cleanup"""
logger.debug(f"Stopping ClientManager for channel {self.channel_id}")
self._heartbeat_running = False
# Give the thread a moment to exit gracefully
# Note: We don't join() here because it's a daemon thread and will exit on its own
def _execute_redis_command(self, command_func):
"""Execute Redis command with error handling"""
if not self.redis_client:

View file

@ -100,3 +100,12 @@ class ConfigHelper:
def channel_init_grace_period():
"""Get channel initialization grace period in seconds"""
return Config.get_channel_init_grace_period()
@staticmethod
def chunk_timeout():
"""
Get chunk timeout in seconds (used for both socket and HTTP read timeouts).
This controls how long we wait for each chunk before timing out.
Set this higher (e.g., 30s) for slow providers that may have intermittent delays.
"""
return ConfigHelper.get('CHUNK_TIMEOUT', 5) # Default 5 seconds

View file

@ -0,0 +1,138 @@
"""
HTTP Stream Reader - Thread-based HTTP stream reader that writes to a pipe.
This allows us to use the same fetch_chunk() path for both transcode and HTTP streams.
"""
import threading
import os
import requests
from requests.adapters import HTTPAdapter
from .utils import get_logger
logger = get_logger()
class HTTPStreamReader:
"""Thread-based HTTP stream reader that writes to a pipe"""
def __init__(self, url, user_agent=None, chunk_size=8192):
self.url = url
self.user_agent = user_agent
self.chunk_size = chunk_size
self.session = None
self.response = None
self.thread = None
self.pipe_read = None
self.pipe_write = None
self.running = False
def start(self):
"""Start the HTTP stream reader thread"""
# Create a pipe (works on Windows and Unix)
self.pipe_read, self.pipe_write = os.pipe()
# Start the reader thread
self.running = True
self.thread = threading.Thread(target=self._read_stream, daemon=True)
self.thread.start()
logger.info(f"Started HTTP stream reader thread for {self.url}")
return self.pipe_read
def _read_stream(self):
"""Thread worker that reads HTTP stream and writes to pipe"""
try:
# Build headers
headers = {}
if self.user_agent:
headers['User-Agent'] = self.user_agent
logger.info(f"HTTP reader connecting to {self.url}")
# Create session
self.session = requests.Session()
# Disable retries for faster failure detection
adapter = HTTPAdapter(max_retries=0, pool_connections=1, pool_maxsize=1)
self.session.mount('http://', adapter)
self.session.mount('https://', adapter)
# Stream the URL
self.response = self.session.get(
self.url,
headers=headers,
stream=True,
timeout=(5, 30) # 5s connect, 30s read
)
if self.response.status_code != 200:
logger.error(f"HTTP {self.response.status_code} from {self.url}")
return
logger.info(f"HTTP reader connected successfully, streaming data...")
# Stream chunks to pipe
chunk_count = 0
for chunk in self.response.iter_content(chunk_size=self.chunk_size):
if not self.running:
break
if chunk:
try:
# Write binary data to pipe
os.write(self.pipe_write, chunk)
chunk_count += 1
# Log progress periodically
if chunk_count % 1000 == 0:
logger.debug(f"HTTP reader streamed {chunk_count} chunks")
except OSError as e:
logger.error(f"Pipe write error: {e}")
break
logger.info("HTTP stream ended")
except requests.exceptions.RequestException as e:
logger.error(f"HTTP reader request error: {e}")
except Exception as e:
logger.error(f"HTTP reader unexpected error: {e}", exc_info=True)
finally:
self.running = False
# Close write end of pipe to signal EOF
try:
if self.pipe_write is not None:
os.close(self.pipe_write)
self.pipe_write = None
except:
pass
def stop(self):
"""Stop the HTTP stream reader"""
logger.info("Stopping HTTP stream reader")
self.running = False
# Close response
if self.response:
try:
self.response.close()
except:
pass
# Close session
if self.session:
try:
self.session.close()
except:
pass
# Close write end of pipe
if self.pipe_write is not None:
try:
os.close(self.pipe_write)
self.pipe_write = None
except:
pass
# Wait for thread
if self.thread and self.thread.is_alive():
self.thread.join(timeout=2.0)
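
A minimal usage sketch of the class above (illustrative, not part of this changeset; the URL is a placeholder, and the wiring mirrors what StreamManager does further down in this diff):

import os

reader = HTTPStreamReader("http://example.com/stream.ts", user_agent="VLC/3.0.20", chunk_size=8192)
pipe_fd = reader.start()                        # read end of the pipe
stream = os.fdopen(pipe_fd, 'rb', buffering=0)  # same file-object interface as a transcode stdout
try:
    chunk = stream.read(8192)                   # consume like any other socket/file
finally:
    reader.stop()                               # closes the HTTP session and the write end (EOF)
    stream.close()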

View file

@ -131,6 +131,8 @@ class ProxyServer:
max_retries = 10
base_retry_delay = 1 # Start with 1 second delay
max_retry_delay = 30 # Cap at 30 seconds
pubsub_client = None
pubsub = None
while True:
try:
@ -339,20 +341,27 @@ class ProxyServer:
logger.error(f"Error in event listener: {e}. Retrying in {final_delay:.1f}s (attempt {retry_count})")
gevent.sleep(final_delay) # REPLACE: time.sleep(final_delay)
# Try to clean up the old connection
try:
if 'pubsub' in locals():
pubsub.close()
if 'pubsub_client' in locals():
pubsub_client.close()
except:
pass
except Exception as e:
logger.error(f"Error in event listener: {e}")
# Add a short delay to prevent rapid retries on persistent errors
gevent.sleep(5) # REPLACE: time.sleep(5)
finally:
# Always clean up PubSub connections in all error paths
try:
if pubsub:
pubsub.close()
pubsub = None
except Exception as e:
logger.debug(f"Error closing pubsub: {e}")
try:
if pubsub_client:
pubsub_client.close()
pubsub_client = None
except Exception as e:
logger.debug(f"Error closing pubsub_client: {e}")
thread = threading.Thread(target=event_listener, daemon=True)
thread.name = "redis-event-listener"
thread.start()
@ -486,17 +495,18 @@ class ProxyServer:
)
return True
# Create buffer and client manager instances
buffer = StreamBuffer(channel_id, redis_client=self.redis_client)
client_manager = ClientManager(
channel_id,
redis_client=self.redis_client,
worker_id=self.worker_id
)
# Create buffer and client manager instances (or reuse if they exist)
if channel_id not in self.stream_buffers:
buffer = StreamBuffer(channel_id, redis_client=self.redis_client)
self.stream_buffers[channel_id] = buffer
# Store in local tracking
self.stream_buffers[channel_id] = buffer
self.client_managers[channel_id] = client_manager
if channel_id not in self.client_managers:
client_manager = ClientManager(
channel_id,
redis_client=self.redis_client,
worker_id=self.worker_id
)
self.client_managers[channel_id] = client_manager
# IMPROVED: Set initializing state in Redis BEFORE any other operations
if self.redis_client:
@ -550,13 +560,15 @@ class ProxyServer:
logger.info(f"Channel {channel_id} already owned by worker {current_owner}")
logger.info(f"This worker ({self.worker_id}) will read from Redis buffer only")
# Create buffer but not stream manager
buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client)
self.stream_buffers[channel_id] = buffer
# Create buffer but not stream manager (only if not already exists)
if channel_id not in self.stream_buffers:
buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client)
self.stream_buffers[channel_id] = buffer
# Create client manager with channel_id and redis_client
client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id)
self.client_managers[channel_id] = client_manager
# Create client manager with channel_id and redis_client (only if not already exists)
if channel_id not in self.client_managers:
client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id)
self.client_managers[channel_id] = client_manager
return True
@ -571,13 +583,15 @@ class ProxyServer:
# Another worker just acquired ownership
logger.info(f"Another worker just acquired ownership of channel {channel_id}")
# Create buffer but not stream manager
buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client)
self.stream_buffers[channel_id] = buffer
# Create buffer but not stream manager (only if not already exists)
if channel_id not in self.stream_buffers:
buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client)
self.stream_buffers[channel_id] = buffer
# Create client manager with channel_id and redis_client
client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id)
self.client_managers[channel_id] = client_manager
# Create client manager with channel_id and redis_client (only if not already exists)
if channel_id not in self.client_managers:
client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id)
self.client_managers[channel_id] = client_manager
return True
@ -596,7 +610,7 @@ class ProxyServer:
if channel_user_agent:
metadata["user_agent"] = channel_user_agent
# CRITICAL FIX: Make sure stream_id is always set in metadata and properly logged
# Make sure stream_id is always set in metadata and properly logged
if channel_stream_id:
metadata["stream_id"] = str(channel_stream_id)
logger.info(f"Storing stream_id {channel_stream_id} in metadata for channel {channel_id}")
@ -632,13 +646,14 @@ class ProxyServer:
logger.info(f"Created StreamManager for channel {channel_id} with stream ID {channel_stream_id}")
self.stream_managers[channel_id] = stream_manager
# Create client manager with channel_id, redis_client AND worker_id
client_manager = ClientManager(
channel_id=channel_id,
redis_client=self.redis_client,
worker_id=self.worker_id
)
self.client_managers[channel_id] = client_manager
# Create client manager with channel_id, redis_client AND worker_id (only if not already exists)
if channel_id not in self.client_managers:
client_manager = ClientManager(
channel_id=channel_id,
redis_client=self.redis_client,
worker_id=self.worker_id
)
self.client_managers[channel_id] = client_manager
# Start stream manager thread only for the owner
thread = threading.Thread(target=stream_manager.run, daemon=True)
@ -846,6 +861,10 @@ class ProxyServer:
# Clean up client manager - SAFE CHECK HERE TOO
if channel_id in self.client_managers:
try:
client_manager = self.client_managers[channel_id]
# Stop the heartbeat thread before deleting
if hasattr(client_manager, 'stop'):
client_manager.stop()
del self.client_managers[channel_id]
logger.info(f"Removed client manager for channel {channel_id}")
except KeyError:

View file

@ -597,31 +597,40 @@ class ChannelService:
@staticmethod
def _update_stream_stats_in_db(stream_id, **stats):
"""Update stream stats in database"""
from django.db import connection
try:
from apps.channels.models import Stream
from django.utils import timezone
stream = Stream.objects.get(id=stream_id)
# Get existing stats or create new dict
current_stats = stream.stream_stats or {}
# Update with new stats
for key, value in stats.items():
if value is not None:
current_stats[key] = value
# Save updated stats and timestamp
stream.stream_stats = current_stats
stream.stream_stats_updated_at = timezone.now()
stream.save(update_fields=['stream_stats', 'stream_stats_updated_at'])
logger.debug(f"Updated stream stats in database for stream {stream_id}: {stats}")
return True
except Exception as e:
logger.error(f"Error updating stream stats in database for stream {stream_id}: {e}")
return False
finally:
# Always close database connection after update
try:
connection.close()
except Exception:
pass
# Helper methods for Redis operations
@ -678,7 +687,7 @@ class ChannelService:
switch_request = {
"event": EventType.STREAM_SWITCH,
"channel_id": channel_id,
"channel_id": str(channel_id),
"url": new_url,
"user_agent": user_agent,
"stream_id": stream_id,
@ -703,7 +712,7 @@ class ChannelService:
stop_request = {
"event": EventType.CHANNEL_STOP,
"channel_id": channel_id,
"channel_id": str(channel_id),
"requester_worker_id": proxy_server.worker_id,
"timestamp": time.time()
}
@ -726,7 +735,7 @@ class ChannelService:
stop_request = {
"event": EventType.CLIENT_STOP,
"channel_id": channel_id,
"channel_id": str(channel_id),
"client_id": client_id,
"requester_worker_id": proxy_server.worker_id,
"timestamp": time.time()

View file

@ -303,6 +303,14 @@ class StreamBuffer:
# Retrieve chunks
chunks = self.get_chunks_exact(client_index, chunk_count)
# Check if we got significantly fewer chunks than expected (likely due to expiration)
# Only check if we expected multiple chunks and got none or very few
if chunk_count > 3 and len(chunks) == 0 and chunks_behind > 10:
# Chunks are missing - likely expired from Redis
# Return empty list to signal client should skip forward
logger.debug(f"Chunks missing for client at index {client_index}, buffer at {self.index} ({chunks_behind} behind)")
return [], client_index
# Check total size
total_size = sum(len(c) for c in chunks)
@ -316,7 +324,7 @@ class StreamBuffer:
additional_size = sum(len(c) for c in more_chunks)
if total_size + additional_size <= MAX_SIZE:
chunks.extend(more_chunks)
chunk_count += additional
chunk_count += len(more_chunks) # Fixed: count actual additional chunks retrieved
return chunks, client_index + chunk_count

View file

@ -204,6 +204,18 @@ class StreamGenerator:
self.empty_reads += 1
self.consecutive_empty += 1
# Check if we're too far behind (chunks expired from Redis)
chunks_behind = self.buffer.index - self.local_index
if chunks_behind > 50: # If more than 50 chunks behind, jump forward
# Calculate new position: stay a few chunks behind current buffer
initial_behind = ConfigHelper.initial_behind_chunks()
new_index = max(self.local_index, self.buffer.index - initial_behind)
logger.warning(f"[{self.client_id}] Client too far behind ({chunks_behind} chunks), jumping from {self.local_index} to {new_index}")
self.local_index = new_index
self.consecutive_empty = 0 # Reset since we're repositioning
continue # Try again immediately with new position
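# Worked example of the jump above (illustrative, not part of this changeset):
# with buffer.index == 500, local_index == 400 and an assumed
# ConfigHelper.initial_behind_chunks() of 4, the client is 100 chunks behind,
# so it repositions to max(400, 500 - 4) == 496 and resumes just behind the
# live edge rather than chasing chunks that have already expired from Redis.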
if self._should_send_keepalive(self.local_index):
keepalive_packet = create_ts_packet('keepalive')
logger.debug(f"[{self.client_id}] Sending keepalive packet while waiting at buffer head")

View file

@ -9,7 +9,9 @@ import subprocess
import gevent
import re
from typing import Optional, List
from django.db import connection
from django.shortcuts import get_object_or_404
from urllib3.exceptions import ReadTimeoutError
from apps.proxy.config import TSConfig as Config
from apps.channels.models import Channel, Stream
from apps.m3u.models import M3UAccount, M3UAccountProfile
@ -91,11 +93,13 @@ class StreamManager:
self.tried_stream_ids.add(self.current_stream_id)
logger.info(f"Loaded stream ID {self.current_stream_id} from Redis for channel {buffer.channel_id}")
else:
logger.warning(f"No stream_id found in Redis for channel {channel_id}")
logger.warning(f"No stream_id found in Redis for channel {channel_id}. "
f"Stream switching will rely on URL comparison to avoid selecting the same stream.")
except Exception as e:
logger.warning(f"Error loading stream ID from Redis: {e}")
else:
logger.warning(f"Unable to get stream ID for channel {channel_id} - stream switching may not work correctly")
logger.warning(f"Unable to get stream ID for channel {channel_id}. "
f"Stream switching will rely on URL comparison to avoid selecting the same stream.")
logger.info(f"Initialized stream manager for channel {buffer.channel_id}")
@ -111,6 +115,9 @@ class StreamManager:
self.stderr_reader_thread = None
self.ffmpeg_input_phase = True # Track if we're still reading input info
# Add HTTP reader thread property
self.http_reader = None
def _create_session(self):
"""Create and configure requests session with optimal settings"""
session = requests.Session()
@ -378,6 +385,12 @@ class StreamManager:
except Exception as e:
logger.error(f"Failed to update channel state in Redis: {e} for channel {self.channel_id}", exc_info=True)
# Close database connection for this thread
try:
connection.close()
except Exception:
pass
logger.info(f"Stream manager stopped for channel {self.channel_id}")
def _establish_transcode_connection(self):
@ -737,9 +750,9 @@ class StreamManager:
def _establish_http_connection(self):
"""Establish a direct HTTP connection to the stream"""
"""Establish HTTP connection using thread-based reader (same as transcode path)"""
try:
logger.debug(f"Using TS Proxy to connect to stream: {self.url}")
logger.debug(f"Using HTTP streamer thread to connect to stream: {self.url}")
# Check if we already have active HTTP connections
if self.current_response or self.current_session:
@ -756,41 +769,39 @@ class StreamManager:
logger.debug(f"Closing existing transcode process before establishing HTTP connection for channel {self.channel_id}")
self._close_socket()
# Create new session for each connection attempt
session = self._create_session()
self.current_session = session
# Use HTTPStreamReader to fetch stream and pipe to a readable file descriptor
# This allows us to use the same fetch_chunk() path as transcode
from .http_streamer import HTTPStreamReader
# Stream the URL with proper timeout handling
response = session.get(
self.url,
stream=True,
timeout=(10, 60) # 10s connect timeout, 60s read timeout
# Create and start the HTTP stream reader
self.http_reader = HTTPStreamReader(
url=self.url,
user_agent=self.user_agent,
chunk_size=self.chunk_size
)
self.current_response = response
if response.status_code == 200:
self.connected = True
self.healthy = True
logger.info(f"Successfully connected to stream source for channel {self.channel_id}")
# Start the reader thread and get the read end of the pipe
pipe_fd = self.http_reader.start()
# Store connection start time for stability tracking
self.connection_start_time = time.time()
# Wrap the file descriptor in a file object (same as transcode stdout)
import os
self.socket = os.fdopen(pipe_fd, 'rb', buffering=0)
self.connected = True
self.healthy = True
# Set channel state to waiting for clients
self._set_waiting_for_clients()
logger.info(f"Successfully started HTTP streamer thread for channel {self.channel_id}")
# Store connection start time for stability tracking
self.connection_start_time = time.time()
# Set channel state to waiting for clients
self._set_waiting_for_clients()
return True
return True
else:
logger.error(f"Failed to connect to stream for channel {self.channel_id}: HTTP {response.status_code}")
self._close_connection()
return False
except requests.exceptions.RequestException as e:
logger.error(f"HTTP request error: {e}")
self._close_connection()
return False
except Exception as e:
logger.error(f"Error establishing HTTP connection for channel {self.channel_id}: {e}", exc_info=True)
self._close_connection()
self._close_socket()
return False
def _update_bytes_processed(self, chunk_size):
@ -818,48 +829,19 @@ class StreamManager:
logger.error(f"Error updating bytes processed: {e}")
def _process_stream_data(self):
"""Process stream data until disconnect or error"""
"""Process stream data until disconnect or error - unified path for both transcode and HTTP"""
try:
if self.transcode:
# Handle transcoded stream data
while self.running and self.connected and not self.stop_requested and not self.needs_stream_switch:
if self.fetch_chunk():
self.last_data_time = time.time()
else:
if not self.running:
break
gevent.sleep(0.1) # REPLACE time.sleep(0.1)
else:
# Handle direct HTTP connection
chunk_count = 0
try:
for chunk in self.current_response.iter_content(chunk_size=self.chunk_size):
# Check if we've been asked to stop
if self.stop_requested or self.url_switching or self.needs_stream_switch:
break
if chunk:
# Track chunk size before adding to buffer
chunk_size = len(chunk)
self._update_bytes_processed(chunk_size)
# Add chunk to buffer with TS packet alignment
success = self.buffer.add_chunk(chunk)
if success:
self.last_data_time = time.time()
chunk_count += 1
# Update last data timestamp in Redis
if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client:
last_data_key = RedisKeys.last_data(self.buffer.channel_id)
self.buffer.redis_client.set(last_data_key, str(time.time()), ex=60)
except (AttributeError, ConnectionError) as e:
if self.stop_requested or self.url_switching:
logger.debug(f"Expected connection error during shutdown/URL switch for channel {self.channel_id}: {e}")
else:
logger.error(f"Unexpected stream error for channel {self.channel_id}: {e}")
raise
# Both transcode and HTTP now use the same subprocess/socket approach
# This gives us perfect control: check flags between chunks, timeout just returns False
while self.running and self.connected and not self.stop_requested and not self.needs_stream_switch:
if self.fetch_chunk():
self.last_data_time = time.time()
else:
# fetch_chunk() returned False - could be timeout, no data, or error
if not self.running:
break
# Brief sleep before retry to avoid tight loop
gevent.sleep(0.1)
except Exception as e:
logger.error(f"Error processing stream data for channel {self.channel_id}: {e}", exc_info=True)
@ -948,6 +930,7 @@ class StreamManager:
# Import both models for proper resource management
from apps.channels.models import Stream, Channel
from django.db import connection
# Update stream profile if we're switching streams
if self.current_stream_id and stream_id and self.current_stream_id != stream_id:
@ -965,8 +948,16 @@ class StreamManager:
logger.debug(f"Updated m3u profile for channel {self.channel_id} to use profile from stream {stream_id}")
else:
logger.warning(f"Failed to update stream profile for channel {self.channel_id}")
except Exception as e:
logger.error(f"Error updating stream profile for channel {self.channel_id}: {e}")
finally:
# Always close database connection after profile update
try:
connection.close()
except Exception:
pass
# CRITICAL: Set a flag to prevent immediate reconnection with old URL
self.url_switching = True
@ -1183,6 +1174,15 @@ class StreamManager:
if self.current_response or self.current_session:
self._close_connection()
# Stop HTTP reader thread if it exists
if hasattr(self, 'http_reader') and self.http_reader:
try:
logger.debug(f"Stopping HTTP reader thread for channel {self.channel_id}")
self.http_reader.stop()
self.http_reader = None
except Exception as e:
logger.debug(f"Error stopping HTTP reader for channel {self.channel_id}: {e}")
# Otherwise handle socket and transcode resources
if self.socket:
try:
@ -1219,6 +1219,30 @@ class StreamManager:
except Exception as e:
logger.error(f"Final kill attempt failed for channel {self.channel_id}: {e}")
# Explicitly close all subprocess pipes to prevent file descriptor leaks
try:
if self.transcode_process.stdin:
self.transcode_process.stdin.close()
if self.transcode_process.stdout:
self.transcode_process.stdout.close()
if self.transcode_process.stderr:
self.transcode_process.stderr.close()
logger.debug(f"Closed all subprocess pipes for channel {self.channel_id}")
except Exception as e:
logger.debug(f"Error closing subprocess pipes for channel {self.channel_id}: {e}")
# Join stderr reader thread to ensure it's fully terminated
if hasattr(self, 'stderr_reader_thread') and self.stderr_reader_thread and self.stderr_reader_thread.is_alive():
try:
logger.debug(f"Waiting for stderr reader thread to terminate for channel {self.channel_id}")
self.stderr_reader_thread.join(timeout=2.0)
if self.stderr_reader_thread.is_alive():
logger.warning(f"Stderr reader thread did not terminate within timeout for channel {self.channel_id}")
except Exception as e:
logger.debug(f"Error joining stderr reader thread for channel {self.channel_id}: {e}")
finally:
self.stderr_reader_thread = None
self.transcode_process = None
self.transcode_process_active = False # Reset the flag
@ -1250,7 +1274,7 @@ class StreamManager:
try:
# Set timeout for chunk reads
chunk_timeout = ConfigHelper.get('CHUNK_TIMEOUT', 10) # Default 10 seconds
chunk_timeout = ConfigHelper.chunk_timeout() # Use centralized timeout configuration
try:
# Handle different socket types with timeout
@ -1333,7 +1357,17 @@ class StreamManager:
# Only update if not already past connecting
if not current_state or current_state in [ChannelState.INITIALIZING, ChannelState.CONNECTING]:
# NEW CODE: Check if buffer has enough chunks
current_buffer_index = getattr(self.buffer, 'index', 0)
# IMPORTANT: Read from Redis, not local buffer.index, because in multi-worker setup
# each worker has its own StreamBuffer instance with potentially stale local index
buffer_index_key = RedisKeys.buffer_index(channel_id)
current_buffer_index = 0
try:
redis_index = redis_client.get(buffer_index_key)
if redis_index:
current_buffer_index = int(redis_index)
except Exception as e:
logger.error(f"Error reading buffer index from Redis: {e}")
initial_chunks_needed = ConfigHelper.initial_behind_chunks()
if current_buffer_index < initial_chunks_needed:
@ -1381,10 +1415,21 @@ class StreamManager:
# Clean up completed timers
self._buffer_check_timers = [t for t in self._buffer_check_timers if t.is_alive()]
if hasattr(self.buffer, 'index') and hasattr(self.buffer, 'channel_id'):
current_buffer_index = self.buffer.index
initial_chunks_needed = getattr(Config, 'INITIAL_BEHIND_CHUNKS', 10)
if hasattr(self.buffer, 'channel_id') and hasattr(self.buffer, 'redis_client'):
channel_id = self.buffer.channel_id
redis_client = self.buffer.redis_client
# IMPORTANT: Read from Redis, not local buffer.index
buffer_index_key = RedisKeys.buffer_index(channel_id)
current_buffer_index = 0
try:
redis_index = redis_client.get(buffer_index_key)
if redis_index:
current_buffer_index = int(redis_index)
except Exception as e:
logger.error(f"Error reading buffer index from Redis: {e}")
initial_chunks_needed = ConfigHelper.initial_behind_chunks() # Use ConfigHelper for consistency
if current_buffer_index >= initial_chunks_needed:
# We now have enough buffer, call _set_waiting_for_clients again
@ -1409,6 +1454,7 @@ class StreamManager:
def _try_next_stream(self):
"""
Try to switch to the next available stream for this channel.
Will iterate through multiple alternate streams if needed to find one with a different URL.
Returns:
bool: True if successfully switched to a new stream, False otherwise
@ -1434,60 +1480,71 @@ class StreamManager:
logger.warning(f"All {len(alternate_streams)} alternate streams have been tried for channel {self.channel_id}")
return False
# Get the next stream to try
next_stream = untried_streams[0]
stream_id = next_stream['stream_id']
profile_id = next_stream['profile_id'] # This is the M3U profile ID we need
# IMPROVED: Try multiple streams until we find one with a different URL
for next_stream in untried_streams:
stream_id = next_stream['stream_id']
profile_id = next_stream['profile_id'] # This is the M3U profile ID we need
# Add to tried streams
self.tried_stream_ids.add(stream_id)
# Add to tried streams
self.tried_stream_ids.add(stream_id)
# Get stream info including URL using the profile_id we already have
logger.info(f"Trying next stream ID {stream_id} with profile ID {profile_id} for channel {self.channel_id}")
stream_info = get_stream_info_for_switch(self.channel_id, stream_id)
# Get stream info including URL using the profile_id we already have
logger.info(f"Trying next stream ID {stream_id} with profile ID {profile_id} for channel {self.channel_id}")
stream_info = get_stream_info_for_switch(self.channel_id, stream_id)
if 'error' in stream_info or not stream_info.get('url'):
logger.error(f"Error getting info for stream {stream_id} for channel {self.channel_id}: {stream_info.get('error', 'No URL')}")
return False
if 'error' in stream_info or not stream_info.get('url'):
logger.error(f"Error getting info for stream {stream_id} for channel {self.channel_id}: {stream_info.get('error', 'No URL')}")
continue # Try next stream instead of giving up
# Update URL and user agent
new_url = stream_info['url']
new_user_agent = stream_info['user_agent']
new_transcode = stream_info['transcode']
# Update URL and user agent
new_url = stream_info['url']
new_user_agent = stream_info['user_agent']
new_transcode = stream_info['transcode']
logger.info(f"Switching from URL {self.url} to {new_url} for channel {self.channel_id}")
# CRITICAL FIX: Check if the new URL is the same as current URL
# This can happen when current_stream_id is None and we accidentally select the same stream
if new_url == self.url:
logger.warning(f"Stream ID {stream_id} generates the same URL as current stream ({new_url}). "
f"Skipping this stream and trying next alternative.")
continue # Try next stream instead of giving up
# IMPORTANT: Just update the URL, don't stop the channel or release resources
switch_result = self.update_url(new_url, stream_id, profile_id)
if not switch_result:
logger.error(f"Failed to update URL for stream ID {stream_id} for channel {self.channel_id}")
return False
logger.info(f"Switching from URL {self.url} to {new_url} for channel {self.channel_id}")
# Update stream ID tracking
self.current_stream_id = stream_id
# IMPORTANT: Just update the URL, don't stop the channel or release resources
switch_result = self.update_url(new_url, stream_id, profile_id)
if not switch_result:
logger.error(f"Failed to update URL for stream ID {stream_id} for channel {self.channel_id}")
continue # Try next stream
# Store the new user agent and transcode settings
self.user_agent = new_user_agent
self.transcode = new_transcode
# Update stream ID tracking
self.current_stream_id = stream_id
# Update stream metadata in Redis - use the profile_id we got from get_alternate_streams
if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client:
metadata_key = RedisKeys.channel_metadata(self.channel_id)
self.buffer.redis_client.hset(metadata_key, mapping={
ChannelMetadataField.URL: new_url,
ChannelMetadataField.USER_AGENT: new_user_agent,
ChannelMetadataField.STREAM_PROFILE: stream_info['stream_profile'],
ChannelMetadataField.M3U_PROFILE: str(profile_id), # Use the profile_id from get_alternate_streams
ChannelMetadataField.STREAM_ID: str(stream_id),
ChannelMetadataField.STREAM_SWITCH_TIME: str(time.time()),
ChannelMetadataField.STREAM_SWITCH_REASON: "max_retries_exceeded"
})
# Store the new user agent and transcode settings
self.user_agent = new_user_agent
self.transcode = new_transcode
# Log the switch
logger.info(f"Stream metadata updated for channel {self.channel_id} to stream ID {stream_id} with M3U profile {profile_id}")
# Update stream metadata in Redis - use the profile_id we got from get_alternate_streams
if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client:
metadata_key = RedisKeys.channel_metadata(self.channel_id)
self.buffer.redis_client.hset(metadata_key, mapping={
ChannelMetadataField.URL: new_url,
ChannelMetadataField.USER_AGENT: new_user_agent,
ChannelMetadataField.STREAM_PROFILE: stream_info['stream_profile'],
ChannelMetadataField.M3U_PROFILE: str(profile_id), # Use the profile_id from get_alternate_streams
ChannelMetadataField.STREAM_ID: str(stream_id),
ChannelMetadataField.STREAM_SWITCH_TIME: str(time.time()),
ChannelMetadataField.STREAM_SWITCH_REASON: "max_retries_exceeded"
})
logger.info(f"Successfully switched to stream ID {stream_id} with URL {new_url} for channel {self.channel_id}")
return True
# Log the switch
logger.info(f"Stream metadata updated for channel {self.channel_id} to stream ID {stream_id} with M3U profile {profile_id}")
logger.info(f"Successfully switched to stream ID {stream_id} with URL {new_url} for channel {self.channel_id}")
return True
# If we get here, we tried all streams but none worked
logger.error(f"Tried {len(untried_streams)} alternate streams but none were suitable for channel {self.channel_id}")
return False
except Exception as e:
logger.error(f"Error trying next stream for channel {self.channel_id}: {e}", exc_info=True)

View file

@ -8,7 +8,7 @@ from typing import Optional, Tuple, List
from django.shortcuts import get_object_or_404
from apps.channels.models import Channel, Stream
from apps.m3u.models import M3UAccount, M3UAccountProfile
from core.models import UserAgent, CoreSettings
from core.models import UserAgent, CoreSettings, StreamProfile
from .utils import get_logger
from uuid import UUID
import requests
@ -26,16 +26,67 @@ def get_stream_object(id: str):
def generate_stream_url(channel_id: str) -> Tuple[str, str, bool, Optional[int]]:
"""
Generate the appropriate stream URL for a channel based on its profile settings.
Generate the appropriate stream URL for a channel or stream based on its profile settings.
Args:
channel_id: The UUID of the channel
channel_id: The UUID of the channel or stream hash
Returns:
Tuple[str, str, bool, Optional[int]]: (stream_url, user_agent, transcode_flag, profile_id)
"""
try:
channel = get_stream_object(channel_id)
channel_or_stream = get_stream_object(channel_id)
# Handle direct stream preview (custom streams)
if isinstance(channel_or_stream, Stream):
stream = channel_or_stream
logger.info(f"Previewing stream directly: {stream.id} ({stream.name})")
# For custom streams, we need to get the M3U account and profile
m3u_account = stream.m3u_account
if not m3u_account:
logger.error(f"Stream {stream.id} has no M3U account")
return None, None, False, None
# Get the default profile for this M3U account (custom streams use default)
m3u_profiles = m3u_account.profiles.all()
profile = next((obj for obj in m3u_profiles if obj.is_default), None)
if not profile:
logger.error(f"No default profile found for M3U account {m3u_account.id}")
return None, None, False, None
# Get the appropriate user agent
stream_user_agent = m3u_account.get_user_agent().user_agent
if stream_user_agent is None:
stream_user_agent = UserAgent.objects.get(id=CoreSettings.get_default_user_agent_id()).user_agent
logger.debug(f"No user agent found for account, using default: {stream_user_agent}")
# Get stream URL (no transformation for custom streams)
stream_url = stream.url
# Check if the stream has its own stream_profile set, otherwise use default
if stream.stream_profile:
stream_profile = stream.stream_profile
logger.debug(f"Using stream's own stream profile: {stream_profile.name}")
else:
stream_profile = StreamProfile.objects.get(
id=CoreSettings.get_default_stream_profile_id()
)
logger.debug(f"Using default stream profile: {stream_profile.name}")
# Check if transcoding is needed
if stream_profile is None or stream_profile.is_proxy():
transcode = False
else:
transcode = True
stream_profile_id = stream_profile.id
return stream_url, stream_user_agent, transcode, stream_profile_id
# Handle channel preview (existing logic)
channel = channel_or_stream
# Get stream and profile for this channel
# Note: get_stream now returns 3 values (stream_id, profile_id, error_reason)
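A brief usage sketch of the return contract (illustrative only; the channel branch is truncated in this diff, so the meaning of profile_id there is inferred):

stream_url, user_agent, transcode, profile_id = generate_stream_url(channel_id)
if stream_url is None:
    # Nothing could be resolved (no M3U account, no default profile, ...); caller retries or errors out.
    pass
elif transcode:
    # profile_id identifies the stream profile whose command should wrap stream_url.
    pass
else:
    # Proxy profile: stream_url is fetched directly using user_agent.
    pass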

View file

@ -128,7 +128,7 @@ def stream_ts(request, channel_id):
ChannelService.stop_channel(channel_id)
# Use fixed retry interval and timeout
retry_timeout = 1.5 # 1.5 seconds total timeout
retry_timeout = 3 # 3 seconds total timeout
retry_interval = 0.1 # 100ms between attempts
wait_start_time = time.time()
@ -138,9 +138,10 @@ def stream_ts(request, channel_id):
profile_value = None
error_reason = None
attempt = 0
should_retry = True
# Try to get a stream with fixed interval retries
while time.time() - wait_start_time < retry_timeout:
while should_retry and time.time() - wait_start_time < retry_timeout:
attempt += 1
stream_url, stream_user_agent, transcode, profile_value = (
generate_stream_url(channel_id)
@ -152,35 +153,53 @@ def stream_ts(request, channel_id):
)
break
# If we failed because there are no streams assigned, don't retry
_, _, error_reason = channel.get_stream()
if error_reason and "maximum connection limits" not in error_reason:
logger.warning(
f"[{client_id}] Can't retry - error not related to connection limits: {error_reason}"
# On first failure, check if the error is retryable
if attempt == 1:
_, _, error_reason = channel.get_stream()
if error_reason and "maximum connection limits" not in error_reason:
logger.warning(
f"[{client_id}] Can't retry - error not related to connection limits: {error_reason}"
)
should_retry = False
break
# Check if we have time remaining for another sleep cycle
elapsed_time = time.time() - wait_start_time
remaining_time = retry_timeout - elapsed_time
# If we don't have enough time for the next sleep interval, break
# but only after we've already made an attempt (a final attempt is made after the loop)
if remaining_time <= retry_interval:
logger.info(
f"[{client_id}] Insufficient time ({remaining_time:.1f}s) for another sleep cycle, will make one final attempt"
)
break
# Wait 100ms before retrying
elapsed_time = time.time() - wait_start_time
remaining_time = retry_timeout - elapsed_time
if remaining_time > retry_interval:
# Wait before retrying
logger.info(
f"[{client_id}] Waiting {retry_interval*1000:.0f}ms for a connection to become available (attempt {attempt}, {remaining_time:.1f}s remaining)"
)
gevent.sleep(retry_interval)
retry_interval += 0.025 # Increase wait time by 25ms for next attempt
# Make one final attempt if we still don't have a stream, should retry, and haven't exceeded timeout
if stream_url is None and should_retry and time.time() - wait_start_time < retry_timeout:
attempt += 1
logger.info(
f"[{client_id}] Making final attempt {attempt} at timeout boundary"
)
stream_url, stream_user_agent, transcode, profile_value = (
generate_stream_url(channel_id)
)
if stream_url is not None:
logger.info(
f"[{client_id}] Waiting {retry_interval*1000:.0f}ms for a connection to become available (attempt {attempt}, {remaining_time:.1f}s remaining)"
f"[{client_id}] Successfully obtained stream on final attempt for channel {channel_id}"
)
gevent.sleep(retry_interval)
retry_interval += 0.025 # Increase wait time by 25ms for next attempt
if stream_url is None:
# Make sure to release any stream locks that might have been acquired
if hasattr(channel, "streams") and channel.streams.exists():
for stream in channel.streams.all():
try:
stream.release_stream()
logger.info(
f"[{client_id}] Released stream {stream.id} for channel {channel_id}"
)
except Exception as e:
logger.error(f"[{client_id}] Error releasing stream: {e}")
# Release the channel's stream lock if one was acquired
# Note: Only call this if get_stream() actually assigned a stream
# In our case, if stream_url is None, no stream was ever assigned, so don't release
# Get the specific error message if available
wait_duration = f"{int(time.time() - wait_start_time)}s"
@ -189,6 +208,9 @@ def stream_ts(request, channel_id):
if error_reason
else "No available streams for this channel"
)
logger.info(
f"[{client_id}] Failed to obtain stream after {attempt} attempts over {wait_duration}: {error_msg}"
)
return JsonResponse(
{"error": error_msg, "waited": wait_duration}, status=503
) # 503 Service Unavailable is appropriate here
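As a rough sanity check of the retry window (assuming each generate_stream_url call returns immediately, so only the sleeps consume time):

timeout, interval, elapsed, cycles = 3.0, 0.1, 0.0, 0
while elapsed < timeout and timeout - elapsed > interval:
    cycles += 1
    elapsed += interval
    interval += 0.025
print(cycles, round(elapsed, 2))  # 12 sleep cycles, ~2.85s, before the final boundary attempt

Under this model the new 3-second timeout allows about 12 retry cycles plus the final attempt, compared with about 7 cycles under the previous 1.5-second timeout.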
@ -474,24 +496,33 @@ def stream_xc(request, username, password, channel_id):
print(f"Fetchin channel with ID: {channel_id}")
if user.user_level < 10:
filters = {
"id": int(channel_id),
"channelprofilemembership__enabled": True,
"user_level__lte": user.user_level,
}
user_profile_count = user.channel_profiles.count()
if user.channel_profiles.count() > 0:
channel_profiles = user.channel_profiles.all()
filters["channelprofilemembership__channel_profile__in"] = channel_profiles
# If user has ALL profiles or NO profiles, give unrestricted access
if user_profile_count == 0:
# No profile filtering - user sees all channels based on user_level
filters = {
"id": int(channel_id),
"user_level__lte": user.user_level
}
channel = Channel.objects.filter(**filters).first()
else:
# User has specific limited profiles assigned
filters = {
"id": int(channel_id),
"channelprofilemembership__enabled": True,
"user_level__lte": user.user_level,
"channelprofilemembership__channel_profile__in": user.channel_profiles.all()
}
channel = Channel.objects.filter(**filters).distinct().first()
channel = Channel.objects.filter(**filters).distinct().first()
if not channel:
return JsonResponse({"error": "Not found"}, status=404)
else:
channel = get_object_or_404(Channel, id=channel_id)
# @TODO: we've got the file 'type' via extension, support this when we support multiple outputs
return stream_ts(request._request, channel.uuid)
return stream_ts(request._request, str(channel.uuid))
@csrf_exempt

View file

@ -187,16 +187,28 @@ def batch_create_categories(categories_data, category_type, account):
logger.debug(f"Found {len(existing_categories)} existing categories")
# Check if we should auto-enable new categories based on account settings
account_custom_props = account.custom_properties or {}
if category_type == 'movie':
auto_enable_new = account_custom_props.get("auto_enable_new_groups_vod", True)
else: # series
auto_enable_new = account_custom_props.get("auto_enable_new_groups_series", True)
# Create missing categories in batch
new_categories = []
for name in category_names:
if name not in existing_categories:
# Always create new categories
new_categories.append(VODCategory(name=name, category_type=category_type))
else:
# Existing category - create relationship with enabled based on auto_enable setting
# (category exists globally but is new to this account)
relations_to_create.append(M3UVODCategoryRelation(
category=existing_categories[name],
m3u_account=account,
custom_properties={},
enabled=auto_enable_new,
))
logger.debug(f"{len(new_categories)} new categories found")
@ -204,24 +216,68 @@ def batch_create_categories(categories_data, category_type, account):
if new_categories:
logger.debug("Creating new categories...")
created_categories = VODCategory.bulk_create_and_fetch(new_categories, ignore_conflicts=True)
created_categories = list(VODCategory.bulk_create_and_fetch(new_categories, ignore_conflicts=True))
# Create relations for newly created categories with enabled based on auto_enable setting
for cat in created_categories:
if not auto_enable_new:
logger.info(f"New {category_type} category '{cat.name}' created but DISABLED - auto_enable_new_groups is disabled for account {account.id}")
relations_to_create.append(
M3UVODCategoryRelation(
category=cat,
m3u_account=account,
custom_properties={},
enabled=auto_enable_new,
)
)
# Convert to dictionary for easy lookup
newly_created = {cat.name: cat for cat in created_categories}
relations_to_create += [
M3UVODCategoryRelation(
category=cat,
m3u_account=account,
custom_properties={},
) for cat in newly_created.values()
]
existing_categories.update(newly_created)
# Create missing relations
logger.debug("Updating category account relations...")
M3UVODCategoryRelation.objects.bulk_create(relations_to_create, ignore_conflicts=True)
# Delete orphaned category relationships (categories no longer in the M3U source)
current_category_ids = set(existing_categories[name].id for name in category_names)
existing_relations = M3UVODCategoryRelation.objects.filter(
m3u_account=account,
category__category_type=category_type
).select_related('category')
relations_to_delete = [
rel for rel in existing_relations
if rel.category_id not in current_category_ids
]
if relations_to_delete:
M3UVODCategoryRelation.objects.filter(
id__in=[rel.id for rel in relations_to_delete]
).delete()
logger.info(f"Deleted {len(relations_to_delete)} orphaned {category_type} category relationships for account {account.id}: {[rel.category.name for rel in relations_to_delete]}")
# Check if any of the deleted relationships left categories with no remaining associations
orphaned_category_ids = []
for rel in relations_to_delete:
category = rel.category
# Check if this category has any remaining M3U account relationships
remaining_relationships = M3UVODCategoryRelation.objects.filter(
category=category
).exists()
# If no relationships remain, it's safe to delete the category
if not remaining_relationships:
orphaned_category_ids.append(category.id)
logger.debug(f"Category '{category.name}' has no remaining associations and will be deleted")
# Delete orphaned categories
if orphaned_category_ids:
VODCategory.objects.filter(id__in=orphaned_category_ids).delete()
logger.info(f"Deleted {len(orphaned_category_ids)} orphaned {category_type} categories with no remaining associations")
# 🔑 Fetch all relations for this account, for all categories
# relations = { rel.id: rel for rel in M3UVODCategoryRelation.objects
# .filter(category__in=existing_categories.values(), m3u_account=account)

View file

@ -2,7 +2,7 @@
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from .api_views import UserAgentViewSet, StreamProfileViewSet, CoreSettingsViewSet, environment, version, rehash_streams_endpoint
from .api_views import UserAgentViewSet, StreamProfileViewSet, CoreSettingsViewSet, environment, version, rehash_streams_endpoint, TimezoneListView
router = DefaultRouter()
router.register(r'useragents', UserAgentViewSet, basename='useragent')
@ -12,5 +12,6 @@ urlpatterns = [
path('settings/env/', environment, name='token_refresh'),
path('version/', version, name='version'),
path('rehash-streams/', rehash_streams_endpoint, name='rehash_streams'),
path('timezones/', TimezoneListView.as_view(), name='timezones'),
path('', include(router.urls)),
]

View file

@ -5,10 +5,12 @@ import ipaddress
import logging
from rest_framework import viewsets, status
from rest_framework.response import Response
from rest_framework.views import APIView
from django.shortcuts import get_object_or_404
from rest_framework.permissions import IsAuthenticated
from rest_framework.decorators import api_view, permission_classes, action
from drf_yasg.utils import swagger_auto_schema
from drf_yasg import openapi
from .models import (
UserAgent,
StreamProfile,
@ -328,25 +330,69 @@ def rehash_streams_endpoint(request):
# Get the current hash keys from settings
hash_key_setting = CoreSettings.objects.get(key=STREAM_HASH_KEY)
hash_keys = hash_key_setting.value.split(",")
# Queue the rehash task
task = rehash_streams.delay(hash_keys)
return Response({
"success": True,
"message": "Stream rehashing task has been queued",
"task_id": task.id
}, status=status.HTTP_200_OK)
except CoreSettings.DoesNotExist:
return Response({
"success": False,
"message": "Hash key settings not found"
}, status=status.HTTP_400_BAD_REQUEST)
except Exception as e:
logger.error(f"Error triggering rehash streams: {e}")
return Response({
"success": False,
"message": "Failed to trigger rehash task"
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# ─────────────────────────────
# Timezone List API
# ─────────────────────────────
class TimezoneListView(APIView):
"""
API endpoint that returns all available timezones supported by pytz.
Returns a list of timezone names grouped by region for easy selection.
This is a general utility endpoint that can be used throughout the application.
"""
def get_permissions(self):
return [IsAuthenticated()]
@swagger_auto_schema(
operation_description="Get list of all supported timezones",
responses={200: openapi.Response('List of timezones with grouping by region')}
)
def get(self, request):
import pytz
# Get all common timezones (excludes deprecated ones)
all_timezones = sorted(pytz.common_timezones)
# Group by region for better UX
grouped = {}
for tz in all_timezones:
if '/' in tz:
region = tz.split('/')[0]
if region not in grouped:
grouped[region] = []
grouped[region].append(tz)
else:
# Handle special zones like UTC, GMT, etc.
if 'Other' not in grouped:
grouped['Other'] = []
grouped['Other'].append(tz)
return Response({
'timezones': all_timezones,
'grouped': grouped,
'count': len(all_timezones)
})
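For reference, the payload this endpoint returns looks roughly like the following (values are illustrative and the exact count depends on the installed pytz version):

# GET /api/core/timezones/
{
    "timezones": ["Africa/Abidjan", "...", "US/Pacific", "UTC"],
    "grouped": {
        "Africa": ["Africa/Abidjan", "..."],
        "America": ["America/New_York", "..."],
        "Other": ["GMT", "UTC"]
    },
    "count": 430  # approximate; varies by pytz release
}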

View file

@ -50,13 +50,21 @@ app.conf.update(
)
# Add memory cleanup after task completion
#@task_postrun.connect # Use the imported signal
@task_postrun.connect # Use the imported signal
def cleanup_task_memory(**kwargs):
"""Clean up memory after each task completes"""
"""Clean up memory and database connections after each task completes"""
from django.db import connection
# Get task name from kwargs
task_name = kwargs.get('task').name if kwargs.get('task') else ''
# Only run cleanup for memory-intensive tasks
# Close database connection for this Celery worker process
try:
connection.close()
except Exception:
pass
# Only run memory cleanup for memory-intensive tasks
memory_intensive_tasks = [
'apps.m3u.tasks.refresh_single_m3u_account',
'apps.m3u.tasks.refresh_m3u_accounts',

View file

@ -134,6 +134,7 @@ else:
"PASSWORD": os.environ.get("POSTGRES_PASSWORD", "secret"),
"HOST": os.environ.get("POSTGRES_HOST", "localhost"),
"PORT": int(os.environ.get("POSTGRES_PORT", 5432)),
"CONN_MAX_AGE": DATABASE_CONN_MAX_AGE,
}
}

View file

@ -14,6 +14,15 @@ services:
- REDIS_HOST=localhost
- CELERY_BROKER_URL=redis://localhost:6379/0
- DISPATCHARR_LOG_LEVEL=info
# Process Priority Configuration (Optional)
# Lower values = higher priority. Range: -20 (highest) to 19 (lowest)
# Negative values require cap_add: SYS_NICE (uncomment below)
#- UWSGI_NICE_LEVEL=-5 # uWSGI/FFmpeg/Streaming (default: 0, recommended: -5 for high priority)
#- CELERY_NICE_LEVEL=5 # Celery/EPG/Background tasks (default: 5, low priority)
#
# Uncomment to enable high priority for streaming (required if UWSGI_NICE_LEVEL < 0)
#cap_add:
# - SYS_NICE
# Optional for hardware acceleration
#devices:
# - /dev/dri:/dev/dri # For Intel/AMD GPU acceleration (VA-API)

View file

@ -18,3 +18,12 @@ services:
- REDIS_HOST=localhost
- CELERY_BROKER_URL=redis://localhost:6379/0
- DISPATCHARR_LOG_LEVEL=trace
# Process Priority Configuration (Optional)
# Lower values = higher priority. Range: -20 (highest) to 19 (lowest)
# Negative values require cap_add: SYS_NICE (uncomment below)
#- UWSGI_NICE_LEVEL=-5 # uWSGI/FFmpeg/Streaming (default: 0, recommended: -5 for high priority)
#- CELERY_NICE_LEVEL=5 # Celery/EPG/Background tasks (default: 5, low priority)
#
# Uncomment to enable high priority for streaming (required if UWSGI_NICE_LEVEL < 0)
#cap_add:
# - SYS_NICE

View file

@ -17,6 +17,15 @@ services:
- REDIS_HOST=localhost
- CELERY_BROKER_URL=redis://localhost:6379/0
- DISPATCHARR_LOG_LEVEL=debug
# Process Priority Configuration (Optional)
# Lower values = higher priority. Range: -20 (highest) to 19 (lowest)
# Negative values require cap_add: SYS_NICE (uncomment below)
#- UWSGI_NICE_LEVEL=-5 # uWSGI/FFmpeg/Streaming (default: 0, recommended: -5 for high priority)
#- CELERY_NICE_LEVEL=5 # Celery/EPG/Background tasks (default: 5, low priority)
#
# Uncomment to enable high priority for streaming (required if UWSGI_NICE_LEVEL < 0)
#cap_add:
# - SYS_NICE
pgadmin:
image: dpage/pgadmin4

View file

@ -17,6 +17,15 @@ services:
- REDIS_HOST=redis
- CELERY_BROKER_URL=redis://redis:6379/0
- DISPATCHARR_LOG_LEVEL=info
# Process Priority Configuration (Optional)
# Lower values = higher priority. Range: -20 (highest) to 19 (lowest)
# Negative values require cap_add: SYS_NICE (uncomment below)
#- UWSGI_NICE_LEVEL=-5 # uWSGI/FFmpeg/Streaming (default: 0, recommended: -5 for high priority)
#- CELERY_NICE_LEVEL=5 # Celery/EPG/Background tasks (default: 5, low priority)
#
# Uncomment to enable high priority for streaming (required if UWSGI_NICE_LEVEL < 0)
#cap_add:
# - SYS_NICE
# Optional for hardware acceleration
#group_add:
# - video

View file

@ -40,6 +40,18 @@ export REDIS_DB=${REDIS_DB:-0}
export DISPATCHARR_PORT=${DISPATCHARR_PORT:-9191}
export LIBVA_DRIVERS_PATH='/usr/local/lib/x86_64-linux-gnu/dri'
export LD_LIBRARY_PATH='/usr/local/lib'
# Process priority configuration
# UWSGI_NICE_LEVEL: Absolute nice value for uWSGI/streaming (default: 0 = normal priority)
# CELERY_NICE_LEVEL: Absolute nice value for Celery/background tasks (default: 5 = low priority)
# Note: The script will automatically calculate the relative offset for Celery since it's spawned by uWSGI
export UWSGI_NICE_LEVEL=${UWSGI_NICE_LEVEL:-0}
CELERY_NICE_ABSOLUTE=${CELERY_NICE_LEVEL:-5}
# Calculate relative nice value for Celery (since nice is relative to parent process)
# Celery is spawned by uWSGI, so we need to add the offset to reach the desired absolute value
export CELERY_NICE_LEVEL=$((CELERY_NICE_ABSOLUTE - UWSGI_NICE_LEVEL))
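# Worked example (illustrative): with UWSGI_NICE_LEVEL=-5 and CELERY_NICE_LEVEL=5, the line above
# exports CELERY_NICE_LEVEL=5-(-5)=10, so Celery (spawned by the nice -5 uWSGI parent) ends up at absolute nice 5.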
# Set LIBVA_DRIVER_NAME if user has specified it
if [ -v LIBVA_DRIVER_NAME ]; then
export LIBVA_DRIVER_NAME
@ -78,6 +90,7 @@ if [[ ! -f /etc/profile.d/dispatcharr.sh ]]; then
DISPATCHARR_ENV DISPATCHARR_DEBUG DISPATCHARR_LOG_LEVEL
REDIS_HOST REDIS_DB POSTGRES_DIR DISPATCHARR_PORT
DISPATCHARR_VERSION DISPATCHARR_TIMESTAMP LIBVA_DRIVERS_PATH LIBVA_DRIVER_NAME LD_LIBRARY_PATH
CELERY_NICE_LEVEL UWSGI_NICE_LEVEL
)
# Process each variable for both profile.d and environment
@ -96,7 +109,16 @@ fi
chmod +x /etc/profile.d/dispatcharr.sh
pip install django-filter
# Ensure root's .bashrc sources the profile.d scripts for interactive non-login shells
if ! grep -q "profile.d/dispatcharr.sh" /root/.bashrc 2>/dev/null; then
cat >> /root/.bashrc << 'EOF'
# Source Dispatcharr environment variables
if [ -f /etc/profile.d/dispatcharr.sh ]; then
. /etc/profile.d/dispatcharr.sh
fi
EOF
fi
# Run init scripts
echo "Starting user setup..."
@ -161,10 +183,13 @@ if [ "$DISPATCHARR_DEBUG" != "true" ]; then
uwsgi_args+=" --disable-logging"
fi
# Launch uwsgi -p passes environment variables to the process
su -p - $POSTGRES_USER -c "cd /app && uwsgi $uwsgi_args &"
uwsgi_pid=$(pgrep uwsgi | sort | head -n1)
echo "✅ uwsgi started with PID $uwsgi_pid"
# Launch uwsgi with configurable nice level (default: 0, normal priority)
# Users can override via UWSGI_NICE_LEVEL environment variable in docker-compose
# Start with nice as root, then use setpriv to drop privileges to dispatch user
# This preserves both the nice value and environment variables
cd /app && nice -n $UWSGI_NICE_LEVEL setpriv --reuid=$POSTGRES_USER --regid=$POSTGRES_USER --clear-groups -- uwsgi $uwsgi_args &
uwsgi_pid=$!
echo "✅ uwsgi started with PID $uwsgi_pid (nice $UWSGI_NICE_LEVEL)"
pids+=("$uwsgi_pid")
# sed -i 's/protected-mode yes/protected-mode no/g' /etc/redis/redis.conf
@ -209,7 +234,7 @@ echo "🔍 Running hardware acceleration check..."
# Wait for at least one process to exit and log the process that exited first
if [ ${#pids[@]} -gt 0 ]; then
echo "⏳ Waiting for processes to exit..."
echo "⏳ Dispatcharr is running. Monitoring processes..."
while kill -0 "${pids[@]}" 2>/dev/null; do
sleep 1 # Wait for a second before checking again
done

View file

@ -1,25 +1,60 @@
#!/bin/bash
mkdir -p /data/logos
mkdir -p /data/recordings
mkdir -p /data/uploads/m3us
mkdir -p /data/uploads/epgs
mkdir -p /data/m3us
mkdir -p /data/epgs
mkdir -p /data/plugins
mkdir -p /app/logo_cache
mkdir -p /app/media
# Define directories that need to exist and be owned by PUID:PGID
DATA_DIRS=(
"/data/logos"
"/data/recordings"
"/data/uploads/m3us"
"/data/uploads/epgs"
"/data/m3us"
"/data/epgs"
"/data/plugins"
)
APP_DIRS=(
"/app/logo_cache"
"/app/media"
)
# Create all directories
for dir in "${DATA_DIRS[@]}" "${APP_DIRS[@]}"; do
mkdir -p "$dir"
done
# Ensure /app itself is owned by PUID:PGID (needed for uwsgi socket creation)
if [ "$(id -u)" = "0" ] && [ -d "/app" ]; then
if [ "$(stat -c '%u:%g' /app)" != "$PUID:$PGID" ]; then
echo "Fixing ownership for /app (non-recursive)"
chown $PUID:$PGID /app
fi
fi
sed -i "s/NGINX_PORT/${DISPATCHARR_PORT}/g" /etc/nginx/sites-enabled/default
# NOTE: mac doesn't run as root, so only manage permissions
# if this script is running as root
if [ "$(id -u)" = "0" ]; then
# Needs to own ALL of /data except db, we handle that below
chown -R $PUID:$PGID /data
chown -R $PUID:$PGID /app
# Fix data directories (non-recursive to avoid touching user files)
for dir in "${DATA_DIRS[@]}"; do
if [ -d "$dir" ] && [ "$(stat -c '%u:%g' "$dir")" != "$PUID:$PGID" ]; then
echo "Fixing ownership for $dir"
chown $PUID:$PGID "$dir"
fi
done
# Fix app directories (recursive since they're managed by the app)
for dir in "${APP_DIRS[@]}"; do
if [ -d "$dir" ] && [ "$(stat -c '%u:%g' "$dir")" != "$PUID:$PGID" ]; then
echo "Fixing ownership for $dir (recursive)"
chown -R $PUID:$PGID "$dir"
fi
done
# Database permissions
if [ -d /data/db ] && [ "$(stat -c '%u' /data/db)" != "$(id -u postgres)" ]; then
echo "Fixing ownership for /data/db"
chown -R postgres:postgres /data/db
fi
# Permissions
chown -R postgres:postgres /data/db
chmod +x /data
fi
fi

View file

@ -7,9 +7,10 @@ exec-before = python /app/scripts/wait_for_redis.py
; Start Redis first
attach-daemon = redis-server
; Then start other services
attach-daemon = nice -n 5 celery -A dispatcharr worker --autoscale=6,1
attach-daemon = nice -n 5 celery -A dispatcharr beat
; Then start other services with configurable nice level (default: 5 for low priority)
; Users can override via CELERY_NICE_LEVEL environment variable in docker-compose
attach-daemon = nice -n $(CELERY_NICE_LEVEL) celery -A dispatcharr worker --autoscale=6,1
attach-daemon = nice -n $(CELERY_NICE_LEVEL) celery -A dispatcharr beat
attach-daemon = daphne -b 0.0.0.0 -p 8001 dispatcharr.asgi:application
attach-daemon = cd /app/frontend && npm run dev

View file

@ -9,9 +9,10 @@ exec-pre = python /app/scripts/wait_for_redis.py
; Start Redis first
attach-daemon = redis-server
; Then start other services
attach-daemon = nice -n 5 celery -A dispatcharr worker --autoscale=6,1
attach-daemon = nice -n 5 celery -A dispatcharr beat
; Then start other services with configurable nice level (default: 5 for low priority)
; Users can override via CELERY_NICE_LEVEL environment variable in docker-compose
attach-daemon = nice -n $(CELERY_NICE_LEVEL) celery -A dispatcharr worker --autoscale=6,1
attach-daemon = nice -n $(CELERY_NICE_LEVEL) celery -A dispatcharr beat
attach-daemon = daphne -b 0.0.0.0 -p 8001 dispatcharr.asgi:application
attach-daemon = cd /app/frontend && npm run dev

View file

@ -9,9 +9,10 @@ exec-pre = python /app/scripts/wait_for_redis.py
; Start Redis first
attach-daemon = redis-server
; Then start other services
attach-daemon = nice -n 5 celery -A dispatcharr worker --autoscale=6,1
attach-daemon = nice -n 5 celery -A dispatcharr beat
; Then start other services with configurable nice level (default: 5 for low priority)
; Users can override via CELERY_NICE_LEVEL environment variable in docker-compose
attach-daemon = nice -n $(CELERY_NICE_LEVEL) celery -A dispatcharr worker --autoscale=6,1
attach-daemon = nice -n $(CELERY_NICE_LEVEL) celery -A dispatcharr beat
attach-daemon = daphne -b 0.0.0.0 -p 8001 dispatcharr.asgi:application
# Core settings

View file

@ -642,6 +642,16 @@ export const WebsocketProvider = ({ children }) => {
}
break;
case 'epg_data_created':
// A new EPG data entry was created (e.g., for a dummy EPG)
// Fetch EPG data so the channel form can immediately assign it
try {
await fetchEPGData();
} catch (e) {
console.warn('Failed to refresh EPG data after creation:', e);
}
break;
case 'stream_rehash':
// Handle stream rehash progress updates
if (parsedEvent.data.action === 'starting') {

View file

@ -1118,6 +1118,21 @@ export default class API {
}
}
static async getTimezones() {
try {
const response = await request(`${host}/api/core/timezones/`);
return response;
} catch (e) {
errorNotification('Failed to retrieve timezones', e);
// Return fallback data instead of throwing
return {
timezones: ['UTC', 'US/Eastern', 'US/Central', 'US/Mountain', 'US/Pacific'],
grouped: {},
count: 5
};
}
}
static async getStreamProfiles() {
try {
const response = await request(`${host}/api/core/streamprofiles/`);

File diff suppressed because it is too large

View file

@ -1,31 +1,22 @@
// Modal.js
import React, { useState, useEffect } from 'react';
import API from '../../api';
import useEPGsStore from '../../store/epgs';
import {
LoadingOverlay,
TextInput,
Button,
Checkbox,
Modal,
Flex,
NativeSelect,
NumberInput,
Space,
Grid,
Group,
FileInput,
Title,
Text,
Divider,
Stack,
Group,
Divider,
Box,
Text,
} from '@mantine/core';
import { isNotEmpty, useForm } from '@mantine/form';
const EPG = ({ epg = null, isOpen, onClose }) => {
const epgs = useEPGsStore((state) => state.epgs);
// Remove the file state and handler since we're not supporting file uploads
const [sourceType, setSourceType] = useState('xmltv');
const form = useForm({
@ -49,13 +40,9 @@ const EPG = ({ epg = null, isOpen, onClose }) => {
const values = form.getValues();
if (epg?.id) {
// Remove file from API call
await API.updateEPG({ id: epg.id, ...values });
} else {
// Remove file from API call
await API.addEPG({
...values,
});
await API.addEPG(values);
}
form.reset();
@ -73,11 +60,12 @@ const EPG = ({ epg = null, isOpen, onClose }) => {
refresh_interval: epg.refresh_interval,
};
form.setValues(values);
setSourceType(epg.source_type); // Update source type state
setSourceType(epg.source_type);
} else {
form.reset();
setSourceType('xmltv'); // Reset to xmltv
setSourceType('xmltv');
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [epg]);
// Function to handle source type changes
@ -156,7 +144,7 @@ const EPG = ({ epg = null, isOpen, onClose }) => {
description="API key for services that require authentication"
{...form.getInputProps('api_key')}
key={form.key('api_key')}
disabled={sourceType !== 'schedules_direct'} // Use the state variable
disabled={sourceType !== 'schedules_direct'}
/>
{/* Put checkbox at the same level as Refresh Interval */}
@ -171,8 +159,8 @@ const EPG = ({ epg = null, isOpen, onClose }) => {
style={{
display: 'flex',
alignItems: 'center',
height: '30px', // Reduced height
marginTop: '-4px', // Slight negative margin to move it up
height: '30px',
marginTop: '-4px',
}}
>
<Checkbox

View file

@ -16,11 +16,20 @@ import {
Box,
MultiSelect,
Tooltip,
Popover,
ScrollArea,
Center,
} from '@mantine/core';
import { Info } from 'lucide-react';
import useChannelsStore from '../../store/channels';
import useStreamProfilesStore from '../../store/streamProfiles';
import { CircleCheck, CircleX } from 'lucide-react';
import { useChannelLogoSelection } from '../../hooks/useSmartLogos';
import { FixedSizeList as List } from 'react-window';
import LazyLogo from '../LazyLogo';
import LogoForm from './Logo';
import logo from '../../images/logo.png';
import API from '../../api';
// Custom item component for MultiSelect with tooltip
const OptionWithTooltip = forwardRef(
@ -33,12 +42,33 @@ const OptionWithTooltip = forwardRef(
)
);
const LiveGroupFilter = ({ playlist, groupStates, setGroupStates }) => {
const LiveGroupFilter = ({
playlist,
groupStates,
setGroupStates,
autoEnableNewGroupsLive,
setAutoEnableNewGroupsLive,
}) => {
const channelGroups = useChannelsStore((s) => s.channelGroups);
const profiles = useChannelsStore((s) => s.profiles);
const streamProfiles = useStreamProfilesStore((s) => s.profiles);
const fetchStreamProfiles = useStreamProfilesStore((s) => s.fetchProfiles);
const [groupFilter, setGroupFilter] = useState('');
const [epgSources, setEpgSources] = useState([]);
// Logo selection functionality
const {
logos: channelLogos,
ensureLogosLoaded,
isLoading: logosLoading,
} = useChannelLogoSelection();
const [logoModalOpen, setLogoModalOpen] = useState(false);
const [currentEditingGroupId, setCurrentEditingGroupId] = useState(null);
// Ensure logos are loaded when component mounts
useEffect(() => {
ensureLogosLoaded();
}, [ensureLogosLoaded]);
// Fetch stream profiles when component mounts
useEffect(() => {
@ -47,6 +77,19 @@ const LiveGroupFilter = ({ playlist, groupStates, setGroupStates }) => {
}
}, [streamProfiles.length, fetchStreamProfiles]);
// Fetch EPG sources when component mounts
useEffect(() => {
const fetchEPGSources = async () => {
try {
const sources = await API.getEPGs();
setEpgSources(sources || []);
} catch (error) {
console.error('Failed to fetch EPG sources:', error);
}
};
fetchEPGSources();
}, []);
useEffect(() => {
if (Object.keys(channelGroups).length === 0) {
return;
@ -62,7 +105,7 @@ const LiveGroupFilter = ({ playlist, groupStates, setGroupStates }) => {
typeof group.custom_properties === 'string'
? JSON.parse(group.custom_properties)
: group.custom_properties;
} catch (e) {
} catch {
customProps = {};
}
}
@ -109,21 +152,27 @@ const LiveGroupFilter = ({ playlist, groupStates, setGroupStates }) => {
);
};
// Toggle force_dummy_epg in custom_properties for a group
const toggleForceDummyEPG = (id) => {
setGroupStates(
groupStates.map((state) => {
if (state.channel_group == id) {
const customProps = { ...(state.custom_properties || {}) };
customProps.force_dummy_epg = !customProps.force_dummy_epg;
return {
...state,
custom_properties: customProps,
};
}
return state;
})
);
// Handle logo selection from LogoForm
const handleLogoSuccess = ({ logo }) => {
if (logo && logo.id && currentEditingGroupId !== null) {
setGroupStates(
groupStates.map((state) => {
if (state.channel_group === currentEditingGroupId) {
return {
...state,
custom_properties: {
...state.custom_properties,
custom_logo_id: logo.id,
},
};
}
return state;
})
);
ensureLogosLoaded(); // Refresh logos
}
setLogoModalOpen(false);
setCurrentEditingGroupId(null);
};
const selectAll = () => {
@ -159,6 +208,16 @@ const LiveGroupFilter = ({ playlist, groupStates, setGroupStates }) => {
</Text>
</Alert>
<Checkbox
label="Automatically enable new groups discovered on future scans"
checked={autoEnableNewGroupsLive}
onChange={(event) =>
setAutoEnableNewGroupsLive(event.currentTarget.checked)
}
size="sm"
description="When disabled, new groups from the M3U source will be created but disabled by default. You can enable them manually later."
/>
<Flex gap="sm">
<TextInput
placeholder="Filter groups..."
@ -254,10 +313,10 @@ const LiveGroupFilter = ({ playlist, groupStates, setGroupStates }) => {
placeholder="Select options..."
data={[
{
value: 'force_dummy_epg',
label: 'Force Dummy EPG',
value: 'force_epg',
label: 'Force EPG Source',
description:
'Assign a dummy EPG to all channels in this group if no EPG is matched',
'Force a specific EPG source for all auto-synced channels, or disable EPG assignment entirely',
},
{
value: 'group_override',
@ -295,12 +354,22 @@ const LiveGroupFilter = ({ playlist, groupStates, setGroupStates }) => {
description:
'Assign a specific stream profile to all channels in this group during auto sync',
},
{
value: 'custom_logo',
label: 'Custom Logo',
description:
'Assign a custom logo to all auto-synced channels in this group',
},
]}
itemComponent={OptionWithTooltip}
value={(() => {
const selectedValues = [];
if (group.custom_properties?.force_dummy_epg) {
selectedValues.push('force_dummy_epg');
if (
group.custom_properties?.custom_epg_id !==
undefined ||
group.custom_properties?.force_dummy_epg
) {
selectedValues.push('force_epg');
}
if (
group.custom_properties?.group_override !==
@ -340,6 +409,12 @@ const LiveGroupFilter = ({ playlist, groupStates, setGroupStates }) => {
) {
selectedValues.push('stream_profile_assignment');
}
if (
group.custom_properties?.custom_logo_id !==
undefined
) {
selectedValues.push('custom_logo');
}
return selectedValues;
})()}
onChange={(values) => {
@ -353,13 +428,25 @@ const LiveGroupFilter = ({ playlist, groupStates, setGroupStates }) => {
...(state.custom_properties || {}),
};
// Handle force_dummy_epg
if (
selectedOptions.includes('force_dummy_epg')
) {
newCustomProps.force_dummy_epg = true;
// Handle force_epg
if (selectedOptions.includes('force_epg')) {
// Migrate from old force_dummy_epg if present
if (
newCustomProps.force_dummy_epg &&
newCustomProps.custom_epg_id === undefined
) {
// Migrate: force_dummy_epg=true becomes custom_epg_id=null
newCustomProps.custom_epg_id = null;
delete newCustomProps.force_dummy_epg;
} else if (
newCustomProps.custom_epg_id === undefined
) {
// New configuration: initialize with null (no EPG/default dummy)
newCustomProps.custom_epg_id = null;
}
} else {
delete newCustomProps.force_dummy_epg;
// Only remove custom_epg_id when deselected
delete newCustomProps.custom_epg_id;
}
// Handle group_override
@ -459,6 +546,17 @@ const LiveGroupFilter = ({ playlist, groupStates, setGroupStates }) => {
delete newCustomProps.stream_profile_id;
}
// Handle custom_logo
if (selectedOptions.includes('custom_logo')) {
if (
newCustomProps.custom_logo_id === undefined
) {
newCustomProps.custom_logo_id = null;
}
} else {
delete newCustomProps.custom_logo_id;
}
return {
...state,
custom_properties: newCustomProps,
@ -785,6 +883,315 @@ const LiveGroupFilter = ({ playlist, groupStates, setGroupStates }) => {
/>
</Tooltip>
)}
{/* Show logo selector only if custom_logo is selected */}
{group.custom_properties?.custom_logo_id !==
undefined && (
<Box>
<Group justify="space-between">
<Popover
opened={group.logoPopoverOpened || false}
onChange={(opened) => {
setGroupStates(
groupStates.map((state) => {
if (
state.channel_group ===
group.channel_group
) {
return {
...state,
logoPopoverOpened: opened,
};
}
return state;
})
);
if (opened) {
ensureLogosLoaded();
}
}}
withArrow
>
<Popover.Target>
<TextInput
label="Custom Logo"
readOnly
value={
channelLogos[
group.custom_properties?.custom_logo_id
]?.name || 'Default'
}
onClick={() => {
setGroupStates(
groupStates.map((state) => {
if (
state.channel_group ===
group.channel_group
) {
return {
...state,
logoPopoverOpened: true,
};
}
return {
...state,
logoPopoverOpened: false,
};
})
);
}}
size="xs"
/>
</Popover.Target>
<Popover.Dropdown
onMouseDown={(e) => e.stopPropagation()}
>
<Group>
<TextInput
placeholder="Filter logos..."
size="xs"
value={group.logoFilter || ''}
onChange={(e) => {
const val = e.currentTarget.value;
setGroupStates(
groupStates.map((state) =>
state.channel_group ===
group.channel_group
? {
...state,
logoFilter: val,
}
: state
)
);
}}
/>
{logosLoading && (
<Text size="xs" c="dimmed">
Loading...
</Text>
)}
</Group>
<ScrollArea style={{ height: 200 }}>
{(() => {
const logoOptions = [
{ id: '0', name: 'Default' },
...Object.values(channelLogos),
];
const filteredLogos = logoOptions.filter(
(logo) =>
logo.name
.toLowerCase()
.includes(
(
group.logoFilter || ''
).toLowerCase()
)
);
if (filteredLogos.length === 0) {
return (
<Center style={{ height: 200 }}>
<Text size="sm" c="dimmed">
{group.logoFilter
? 'No logos match your filter'
: 'No logos available'}
</Text>
</Center>
);
}
return (
<List
height={200}
itemCount={filteredLogos.length}
itemSize={55}
style={{ width: '100%' }}
>
{({ index, style }) => {
const logoItem = filteredLogos[index];
return (
<div
style={{
...style,
cursor: 'pointer',
padding: '5px',
borderRadius: '4px',
}}
onClick={() => {
setGroupStates(
groupStates.map((state) => {
if (
state.channel_group ===
group.channel_group
) {
return {
...state,
custom_properties: {
...state.custom_properties,
custom_logo_id:
logoItem.id,
},
logoPopoverOpened: false,
};
}
return state;
})
);
}}
onMouseEnter={(e) => {
e.currentTarget.style.backgroundColor =
'rgb(68, 68, 68)';
}}
onMouseLeave={(e) => {
e.currentTarget.style.backgroundColor =
'transparent';
}}
>
<Center
style={{
flexDirection: 'column',
gap: '2px',
}}
>
<img
src={
logoItem.cache_url || logo
}
height="30"
style={{
maxWidth: 80,
objectFit: 'contain',
}}
alt={logoItem.name || 'Logo'}
onError={(e) => {
if (e.target.src !== logo) {
e.target.src = logo;
}
}}
/>
<Text
size="xs"
c="dimmed"
ta="center"
style={{
maxWidth: 80,
overflow: 'hidden',
textOverflow: 'ellipsis',
whiteSpace: 'nowrap',
}}
>
{logoItem.name || 'Default'}
</Text>
</Center>
</div>
);
}}
</List>
);
})()}
</ScrollArea>
</Popover.Dropdown>
</Popover>
<Stack gap="xs" align="center">
<LazyLogo
logoId={group.custom_properties?.custom_logo_id}
alt="custom logo"
style={{ height: 40 }}
/>
</Stack>
</Group>
<Button
onClick={() => {
setCurrentEditingGroupId(group.channel_group);
setLogoModalOpen(true);
}}
fullWidth
variant="default"
size="xs"
mt="xs"
>
Upload or Create Logo
</Button>
</Box>
)}
{/* Show EPG selector when force_epg is selected */}
{(group.custom_properties?.custom_epg_id !== undefined ||
group.custom_properties?.force_dummy_epg) && (
<Tooltip
label="Force a specific EPG source for all auto-synced channels in this group. For dummy EPGs, all channels will share the same EPG data. For regular EPG sources (XMLTV, Schedules Direct), channels will be matched by their tvg_id within that source. Select 'No EPG' to disable EPG assignment."
withArrow
>
<Select
label="EPG Source"
placeholder="No EPG (Disabled)"
value={(() => {
// Handle migration from force_dummy_epg
if (
group.custom_properties?.custom_epg_id !==
undefined
) {
// Convert to string, use '0' for null/no EPG
return group.custom_properties.custom_epg_id ===
null
? '0'
: group.custom_properties.custom_epg_id.toString();
} else if (
group.custom_properties?.force_dummy_epg
) {
// Show "No EPG" for old force_dummy_epg configs
return '0';
}
return '0';
})()}
onChange={(value) => {
// Convert back: '0' means no EPG (null)
const newValue =
value === '0' ? null : parseInt(value);
setGroupStates(
groupStates.map((state) => {
if (
state.channel_group === group.channel_group
) {
return {
...state,
custom_properties: {
...state.custom_properties,
custom_epg_id: newValue,
},
};
}
return state;
})
);
}}
data={[
{ value: '0', label: 'No EPG (Disabled)' },
...epgSources.map((source) => ({
value: source.id.toString(),
label: `${source.name} (${
source.source_type === 'dummy'
? 'Dummy'
: source.source_type === 'xmltv'
? 'XMLTV'
: source.source_type ===
'schedules_direct'
? 'Schedules Direct'
: source.source_type
})`,
})),
]}
clearable
searchable
size="xs"
/>
</Tooltip>
)}
</>
)}
</Stack>
@ -792,6 +1199,16 @@ const LiveGroupFilter = ({ playlist, groupStates, setGroupStates }) => {
))}
</SimpleGrid>
</Box>
{/* Logo Upload Modal */}
<LogoForm
isOpen={logoModalOpen}
onClose={() => {
setLogoModalOpen(false);
setCurrentEditingGroupId(null);
}}
onSuccess={handleLogoSuccess}
/>
</Stack>
);
};

View file

@ -55,6 +55,21 @@ const M3UGroupFilter = ({ playlist = null, isOpen, onClose }) => {
const [isLoading, setIsLoading] = useState(false);
const [movieCategoryStates, setMovieCategoryStates] = useState([]);
const [seriesCategoryStates, setSeriesCategoryStates] = useState([]);
const [autoEnableNewGroupsLive, setAutoEnableNewGroupsLive] = useState(true);
const [autoEnableNewGroupsVod, setAutoEnableNewGroupsVod] = useState(true);
const [autoEnableNewGroupsSeries, setAutoEnableNewGroupsSeries] =
useState(true);
useEffect(() => {
if (!playlist) return;
// Initialize account-level settings
setAutoEnableNewGroupsLive(playlist.auto_enable_new_groups_live ?? true);
setAutoEnableNewGroupsVod(playlist.auto_enable_new_groups_vod ?? true);
setAutoEnableNewGroupsSeries(
playlist.auto_enable_new_groups_series ?? true
);
}, [playlist]);
useEffect(() => {
if (Object.keys(channelGroups).length === 0) {
@ -116,6 +131,14 @@ const M3UGroupFilter = ({ playlist = null, isOpen, onClose }) => {
}))
.filter((state) => state.enabled !== state.original_enabled);
// Update account-level settings via the proper account endpoint
await API.updatePlaylist({
id: playlist.id,
auto_enable_new_groups_live: autoEnableNewGroupsLive,
auto_enable_new_groups_vod: autoEnableNewGroupsVod,
auto_enable_new_groups_series: autoEnableNewGroupsSeries,
});
// Update group settings via API endpoint
await API.updateM3UGroupSettings(
playlist.id,
@ -176,6 +199,8 @@ const M3UGroupFilter = ({ playlist = null, isOpen, onClose }) => {
playlist={playlist}
groupStates={groupStates}
setGroupStates={setGroupStates}
autoEnableNewGroupsLive={autoEnableNewGroupsLive}
setAutoEnableNewGroupsLive={setAutoEnableNewGroupsLive}
/>
</Tabs.Panel>
@ -185,6 +210,8 @@ const M3UGroupFilter = ({ playlist = null, isOpen, onClose }) => {
categoryStates={movieCategoryStates}
setCategoryStates={setMovieCategoryStates}
type="movie"
autoEnableNewGroups={autoEnableNewGroupsVod}
setAutoEnableNewGroups={setAutoEnableNewGroupsVod}
/>
</Tabs.Panel>
@ -194,6 +221,8 @@ const M3UGroupFilter = ({ playlist = null, isOpen, onClose }) => {
categoryStates={seriesCategoryStates}
setCategoryStates={setSeriesCategoryStates}
type="series"
autoEnableNewGroups={autoEnableNewGroupsSeries}
setAutoEnableNewGroups={setAutoEnableNewGroupsSeries}
/>
</Tabs.Panel>
</Tabs>

View file

@ -25,10 +25,22 @@ const Stream = ({ stream = null, isOpen, onClose }) => {
}),
onSubmit: async (values, { setSubmitting, resetForm }) => {
console.log(values);
// Convert string IDs back to integers for the API
const payload = {
...values,
channel_group: values.channel_group
? parseInt(values.channel_group, 10)
: null,
stream_profile_id: values.stream_profile_id
? parseInt(values.stream_profile_id, 10)
: null,
};
if (stream?.id) {
await API.updateStream({ id: stream.id, ...values });
await API.updateStream({ id: stream.id, ...payload });
} else {
await API.addStream(values);
await API.addStream(payload);
}
resetForm();
@ -42,12 +54,18 @@ const Stream = ({ stream = null, isOpen, onClose }) => {
formik.setValues({
name: stream.name,
url: stream.url,
channel_group: stream.channel_group,
stream_profile_id: stream.stream_profile_id,
// Convert IDs to strings to match Select component values
channel_group: stream.channel_group
? String(stream.channel_group)
: null,
stream_profile_id: stream.stream_profile_id
? String(stream.stream_profile_id)
: '',
});
} else {
formik.resetForm();
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [stream]);
if (!isOpen) {

View file

@ -10,6 +10,7 @@ import {
Text,
Divider,
Box,
Checkbox,
} from '@mantine/core';
import { CircleCheck, CircleX } from 'lucide-react';
import useVODStore from '../../store/useVODStore';
@ -19,6 +20,8 @@ const VODCategoryFilter = ({
categoryStates,
setCategoryStates,
type,
autoEnableNewGroups,
setAutoEnableNewGroups,
}) => {
const categories = useVODStore((s) => s.categories);
const [filter, setFilter] = useState('');
@ -85,6 +88,16 @@ const VODCategoryFilter = ({
return (
<Stack style={{ paddingTop: 10 }}>
<Checkbox
label={`Automatically enable new ${type === 'movie' ? 'movie' : 'series'} categories discovered on future scans`}
checked={autoEnableNewGroups}
onChange={(event) =>
setAutoEnableNewGroups(event.currentTarget.checked)
}
size="sm"
description="When disabled, new categories from the provider will be created but disabled by default. You can enable them manually later."
/>
<Flex gap="sm">
<TextInput
placeholder="Filter categories..."

View file

@ -307,6 +307,7 @@ const ChannelsTable = ({}) => {
const [channelToDelete, setChannelToDelete] = useState(null);
// Column sizing state for resizable columns
// Store in localStorage but with empty object as default
const [columnSizing, setColumnSizing] = useLocalStorage(
'channels-table-column-sizing',
{}
@ -882,7 +883,12 @@ const ChannelsTable = ({}) => {
),
},
],
[selectedProfileId, channelGroups, logos, theme, columnSizing]
// Note: columnSizing is intentionally excluded from dependencies to prevent
// columns from being recreated during drag operations (which causes infinite loops).
// The column.size values are only used for INITIAL sizing - TanStack Table manages
// the actual sizes through its own state after initialization.
// eslint-disable-next-line react-hooks/exhaustive-deps
[selectedProfileId, channelGroups, logos, theme]
);
const renderHeaderCell = (header) => {
@ -979,17 +985,18 @@ const ChannelsTable = ({}) => {
filters,
pagination,
sorting,
columnSizing,
setColumnSizing,
manualPagination: true,
manualSorting: true,
manualFiltering: true,
enableRowSelection: true,
onRowSelectionChange: onRowSelectionChange,
state: {
columnSizing,
pagination,
sorting,
},
onColumnSizingChange: setColumnSizing,
columnResizeMode: 'onChange',
getExpandedRowHeight: (row) => {
return 20 + 28 * row.original.streams.length;
},

View file

@ -2,6 +2,7 @@ import React, { useEffect, useMemo, useRef, useState } from 'react';
import API from '../../api';
import useEPGsStore from '../../store/epgs';
import EPGForm from '../forms/EPG';
import DummyEPGForm from '../forms/DummyEPG';
import { TableHelper } from '../../helpers';
import {
ActionIcon,
@ -17,6 +18,7 @@ import {
Progress,
Stack,
Group,
Menu,
} from '@mantine/core';
import { notifications } from '@mantine/notifications';
import {
@ -27,6 +29,7 @@ import {
SquareMinus,
SquarePen,
SquarePlus,
ChevronDown,
} from 'lucide-react';
import dayjs from 'dayjs';
import useSettingsStore from '../../store/settings';
@ -62,6 +65,7 @@ const getStatusColor = (status) => {
const RowActions = ({ tableSize, row, editEPG, deleteEPG, refreshEPG }) => {
const iconSize =
tableSize == 'default' ? 'sm' : tableSize == 'compact' ? 'xs' : 'md';
const isDummyEPG = row.original.source_type === 'dummy';
return (
<>
@ -88,7 +92,7 @@ const RowActions = ({ tableSize, row, editEPG, deleteEPG, refreshEPG }) => {
size={iconSize} // Use standardized icon size
color="blue.5" // Red color for delete actions
onClick={() => refreshEPG(row.original.id)}
disabled={!row.original.is_active}
disabled={!row.original.is_active || isDummyEPG}
>
<RefreshCcw size={tableSize === 'compact' ? 16 : 18} />{' '}
{/* Small icon size */}
@ -100,6 +104,7 @@ const RowActions = ({ tableSize, row, editEPG, deleteEPG, refreshEPG }) => {
const EPGsTable = () => {
const [epg, setEPG] = useState(null);
const [epgModalOpen, setEPGModalOpen] = useState(false);
const [dummyEpgModalOpen, setDummyEpgModalOpen] = useState(false);
const [rowSelection, setRowSelection] = useState([]);
const [confirmDeleteOpen, setConfirmDeleteOpen] = useState(false);
const [deleteTarget, setDeleteTarget] = useState(null);
@ -224,11 +229,14 @@ const EPGsTable = () => {
size: 100,
cell: ({ row }) => {
const data = row.original;
const isDummyEPG = data.source_type === 'dummy';
// Dummy EPGs always show idle status
const displayStatus = isDummyEPG ? 'idle' : data.status;
// Always show status text, even when there's progress happening
return (
<Text size="sm" fw={500} c={getStatusColor(data.status)}>
{formatStatusText(data.status)}
<Text size="sm" fw={500} c={getStatusColor(displayStatus)}>
{formatStatusText(displayStatus)}
</Text>
);
},
@ -241,6 +249,12 @@ const EPGsTable = () => {
grow: true,
cell: ({ row }) => {
const data = row.original;
const isDummyEPG = data.source_type === 'dummy';
// Dummy EPGs don't have status messages
if (isDummyEPG) {
return null;
}
// Check if there's an active progress for this EPG - show progress first if active
if (
@ -305,15 +319,19 @@ const EPGsTable = () => {
mantineTableBodyCellProps: {
align: 'left',
},
cell: ({ row, cell }) => (
<Box sx={{ display: 'flex', justifyContent: 'center' }}>
<Switch
size="xs"
checked={cell.getValue()}
onChange={() => toggleActive(row.original)}
/>
</Box>
),
cell: ({ row, cell }) => {
const isDummyEPG = row.original.source_type === 'dummy';
return (
<Box sx={{ display: 'flex', justifyContent: 'center' }}>
<Switch
size="xs"
checked={cell.getValue()}
onChange={() => toggleActive(row.original)}
disabled={isDummyEPG}
/>
</Box>
);
},
},
{
id: 'actions',
@ -329,9 +347,24 @@ const EPGsTable = () => {
const editEPG = async (epg = null) => {
setEPG(epg);
// Open the appropriate modal based on source type
if (epg?.source_type === 'dummy') {
setDummyEpgModalOpen(true);
} else {
setEPGModalOpen(true);
}
};
const createStandardEPG = () => {
setEPG(null);
setEPGModalOpen(true);
};
const createDummyEPG = () => {
setEPG(null);
setDummyEpgModalOpen(true);
};
const deleteEPG = async (id) => {
// Get EPG details for the confirmation dialog
const epgObj = epgs[id];
@ -365,6 +398,11 @@ const EPGsTable = () => {
setEPGModalOpen(false);
};
const closeDummyEPGForm = () => {
setEPG(null);
setDummyEpgModalOpen(false);
};
useEffect(() => {
setData(
Object.values(epgs).sort((a, b) => {
@ -522,21 +560,31 @@ const EPGsTable = () => {
>
EPGs
</Text>
<Button
leftSection={<SquarePlus size={18} />}
variant="light"
size="xs"
onClick={() => editEPG()}
p={5}
color="green"
style={{
borderWidth: '1px',
borderColor: 'green',
color: 'white',
}}
>
Add EPG
</Button>
<Menu shadow="md" width={200}>
<Menu.Target>
<Button
leftSection={<SquarePlus size={18} />}
rightSection={<ChevronDown size={16} />}
variant="light"
size="xs"
p={5}
color="green"
style={{
borderWidth: '1px',
borderColor: 'green',
color: 'white',
}}
>
Add EPG
</Button>
</Menu.Target>
<Menu.Dropdown>
<Menu.Item onClick={createStandardEPG}>
Standard EPG Source
</Menu.Item>
<Menu.Item onClick={createDummyEPG}>Dummy EPG Source</Menu.Item>
</Menu.Dropdown>
</Menu>
</Flex>
<Paper
@ -579,6 +627,11 @@ const EPGsTable = () => {
</Box>
<EPGForm epg={epg} isOpen={epgModalOpen} onClose={closeEPGForm} />
<DummyEPGForm
epg={epg}
isOpen={dummyEpgModalOpen}
onClose={closeDummyEPGForm}
/>
<ConfirmationDialog
opened={confirmDeleteOpen}

View file

@ -3,7 +3,6 @@ import API from '../../api';
import StreamForm from '../forms/Stream';
import usePlaylistsStore from '../../store/playlists';
import useChannelsStore from '../../store/channels';
import useLogosStore from '../../store/logos';
import { copyToClipboard, useDebounce } from '../../utils';
import {
SquarePlus,
@ -51,6 +50,7 @@ import useChannelsTableStore from '../../store/channelsTable';
import useWarningsStore from '../../store/warnings';
import { CustomTable, useTable } from './CustomTable';
import useLocalStorage from '../../hooks/useLocalStorage';
import ConfirmationDialog from '../ConfirmationDialog';
const StreamRowActions = ({
theme,
@ -66,7 +66,6 @@ const StreamRowActions = ({
(state) =>
state.channels.find((chan) => chan.id === selectedChannelIds[0])?.streams
);
const fetchLogos = useLogosStore((s) => s.fetchLogos);
const addStreamToChannel = async () => {
await API.updateChannel({
@ -200,6 +199,12 @@ const StreamsTable = () => {
const [rememberSingleChoice, setRememberSingleChoice] = useState(false);
const [currentStreamForChannel, setCurrentStreamForChannel] = useState(null);
// Confirmation dialog state
const [confirmDeleteOpen, setConfirmDeleteOpen] = useState(false);
const [deleteTarget, setDeleteTarget] = useState(null);
const [streamToDelete, setStreamToDelete] = useState(null);
const [isBulkDelete, setIsBulkDelete] = useState(false);
// const [allRowsSelected, setAllRowsSelected] = useState(false);
// Add local storage for page size
@ -243,7 +248,6 @@ const StreamsTable = () => {
const channelGroups = useChannelsStore((s) => s.channelGroups);
const selectedChannelIds = useChannelsTableStore((s) => s.selectedChannelIds);
const fetchLogos = useChannelsStore((s) => s.fetchLogos);
const channelSelectionStreams = useChannelsTableStore(
(state) =>
state.channels.find((chan) => chan.id === selectedChannelIds[0])?.streams
@ -280,15 +284,17 @@ const StreamsTable = () => {
grow: true,
size: columnSizing.name || 200,
cell: ({ getValue }) => (
<Box
style={{
whiteSpace: 'pre',
overflow: 'hidden',
textOverflow: 'ellipsis',
}}
>
{getValue()}
</Box>
<Tooltip label={getValue()} openDelay={500}>
<Box
style={{
whiteSpace: 'pre',
overflow: 'hidden',
textOverflow: 'ellipsis',
}}
>
{getValue()}
</Box>
</Tooltip>
),
},
{
@ -299,15 +305,17 @@ const StreamsTable = () => {
: '',
size: columnSizing.group || 150,
cell: ({ getValue }) => (
<Box
style={{
whiteSpace: 'pre',
overflow: 'hidden',
textOverflow: 'ellipsis',
}}
>
{getValue()}
</Box>
<Tooltip label={getValue()} openDelay={500}>
<Box
style={{
whiteSpace: 'pre',
overflow: 'hidden',
textOverflow: 'ellipsis',
}}
>
{getValue()}
</Box>
</Tooltip>
),
},
{
@ -316,17 +324,17 @@ const StreamsTable = () => {
accessorFn: (row) =>
playlists.find((playlist) => playlist.id === row.m3u_account)?.name,
cell: ({ getValue }) => (
<Box
style={{
whiteSpace: 'nowrap',
overflow: 'hidden',
textOverflow: 'ellipsis',
}}
>
<Tooltip label={getValue()} openDelay={500}>
<Box>{getValue()}</Box>
</Tooltip>
</Box>
<Tooltip label={getValue()} openDelay={500}>
<Box
style={{
whiteSpace: 'nowrap',
overflow: 'hidden',
textOverflow: 'ellipsis',
}}
>
{getValue()}
</Box>
</Tooltip>
),
},
],
@ -510,13 +518,49 @@ const StreamsTable = () => {
};
const deleteStream = async (id) => {
// Get stream details for the confirmation dialog
const streamObj = data.find((s) => s.id === id);
setStreamToDelete(streamObj);
setDeleteTarget(id);
setIsBulkDelete(false);
// Skip warning if it's been suppressed
if (isWarningSuppressed('delete-stream')) {
return executeDeleteStream(id);
}
setConfirmDeleteOpen(true);
};
const executeDeleteStream = async (id) => {
await API.deleteStream(id);
fetchData();
// Clear the selection for the deleted stream
setSelectedStreamIds([]);
table.setSelectedTableIds([]);
setConfirmDeleteOpen(false);
};
const deleteStreams = async () => {
setIsBulkDelete(true);
setStreamToDelete(null);
// Skip warning if it's been suppressed
if (isWarningSuppressed('delete-streams')) {
return executeDeleteStreams();
}
setConfirmDeleteOpen(true);
};
const executeDeleteStreams = async () => {
setIsLoading(true);
await API.deleteStreams(selectedStreamIds);
setIsLoading(false);
fetchData();
setSelectedStreamIds([]);
table.setSelectedTableIds([]);
setConfirmDeleteOpen(false);
};
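  // Minimal sketch of the confirm-or-skip pattern used by deleteStream and
  // deleteStreams above: run immediately when the warning has been suppressed,
  // otherwise open the ConfirmationDialog, whose onConfirm calls the execute
  // function. confirmOrRun is a hypothetical helper, not part of this file.
  const confirmOrRun = (actionKey, run, openDialog) => {
    if (isWarningSuppressed(actionKey)) {
      return run(); // user previously chose "don't ask again" for this action
    }
    openDialog(); // deferred: the dialog's onConfirm invokes run()
  };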
const closeStreamForm = () => {
@ -831,8 +875,14 @@ const StreamsTable = () => {
}}
>
{/* Top toolbar with Remove, Assign, Auto-match, and Add buttons */}
<Group justify="space-between" style={{ paddingLeft: 10 }}>
<Box>
<Flex
justify="space-between"
align="center"
wrap="nowrap"
style={{ padding: 10 }}
gap={6}
>
<Flex gap={6} wrap="nowrap" style={{ flexShrink: 0 }}>
<Button
leftSection={<SquarePlus size={18} />}
variant={
@ -866,55 +916,47 @@ const StreamsTable = () => {
>
Add Streams to Channel
</Button>
</Box>
<Box
style={{
display: 'flex',
justifyContent: 'flex-end',
padding: 10,
}}
>
<Flex gap={6}>
<Button
leftSection={<SquareMinus size={18} />}
variant="default"
size="xs"
onClick={deleteStreams}
disabled={selectedStreamIds.length == 0}
>
Remove
</Button>
<Button
leftSection={<SquarePlus size={18} />}
variant="default"
size="xs"
onClick={createChannelsFromStreams}
p={5}
disabled={selectedStreamIds.length == 0}
>
{`Create Channels (${selectedStreamIds.length})`}
</Button>
</Flex>
<Button
leftSection={<SquarePlus size={18} />}
variant="default"
size="xs"
onClick={createChannelsFromStreams}
p={5}
disabled={selectedStreamIds.length == 0}
>
{`Create Channels (${selectedStreamIds.length})`}
</Button>
<Flex gap={6} wrap="nowrap" style={{ flexShrink: 0 }}>
<Button
leftSection={<SquarePlus size={18} />}
variant="light"
size="xs"
onClick={() => editStream()}
p={5}
color={theme.tailwind.green[5]}
style={{
borderWidth: '1px',
borderColor: theme.tailwind.green[5],
color: 'white',
}}
>
Create Stream
</Button>
<Button
leftSection={<SquarePlus size={18} />}
variant="light"
size="xs"
onClick={() => editStream()}
p={5}
color={theme.tailwind.green[5]}
style={{
borderWidth: '1px',
borderColor: theme.tailwind.green[5],
color: 'white',
}}
>
Create Stream
</Button>
</Flex>
</Box>
</Group>
<Button
leftSection={<SquareMinus size={18} />}
variant="default"
size="xs"
onClick={deleteStreams}
disabled={selectedStreamIds.length == 0}
>
Remove
</Button>
</Flex>
</Flex>
{initialDataCount === 0 && (
<Center style={{ paddingTop: 20 }}>
@ -1175,6 +1217,39 @@ const StreamsTable = () => {
</Group>
</Stack>
</Modal>
<ConfirmationDialog
opened={confirmDeleteOpen}
onClose={() => setConfirmDeleteOpen(false)}
onConfirm={() =>
isBulkDelete
? executeDeleteStreams()
: executeDeleteStream(deleteTarget)
}
title={`Confirm ${isBulkDelete ? 'Bulk ' : ''}Stream Deletion`}
message={
isBulkDelete ? (
`Are you sure you want to delete ${selectedStreamIds.length} stream${selectedStreamIds.length !== 1 ? 's' : ''}? This action cannot be undone.`
) : streamToDelete ? (
<div style={{ whiteSpace: 'pre-line' }}>
{`Are you sure you want to delete the following stream?
Name: ${streamToDelete.name}
${streamToDelete.channel_group ? `Group: ${channelGroups[streamToDelete.channel_group]?.name || 'Unknown'}` : ''}
${streamToDelete.m3u_account ? `M3U Account: ${playlists.find((p) => p.id === streamToDelete.m3u_account)?.name || 'Unknown'}` : ''}
This action cannot be undone.`}
</div>
) : (
'Are you sure you want to delete this stream? This action cannot be undone.'
)
}
confirmLabel="Delete"
cancelLabel="Cancel"
actionKey={isBulkDelete ? 'delete-streams' : 'delete-stream'}
onSuppressChange={suppressWarning}
size="md"
/>
</>
);
};

View file

@ -250,6 +250,7 @@ export default function TVChannelGuide({ startDate, endDate }) {
const logos = useLogosStore((s) => s.logos);
const tvgsById = useEPGsStore((s) => s.tvgsById);
const epgs = useEPGsStore((s) => s.epgs);
const [programs, setPrograms] = useState([]);
const [guideChannels, setGuideChannels] = useState([]);
@ -400,8 +401,8 @@ export default function TVChannelGuide({ startDate, endDate }) {
: defaultEnd;
const channelIdByTvgId = useMemo(
() => buildChannelIdMap(guideChannels, tvgsById),
[guideChannels, tvgsById]
() => buildChannelIdMap(guideChannels, tvgsById, epgs),
[guideChannels, tvgsById, epgs]
);
const channelById = useMemo(() => {
@ -1476,6 +1477,7 @@ export default function TVChannelGuide({ startDate, endDate }) {
{filteredChannels.length > 0 ? (
<VariableSizeList
className="guide-list-outer"
height={virtualizedHeight}
width={virtualizedWidth}
itemCount={filteredChannels.length}

View file

@ -192,6 +192,7 @@ const SettingsPage = () => {
useState([]);
const [proxySettingsSaved, setProxySettingsSaved] = useState(false);
const [generalSettingsSaved, setGeneralSettingsSaved] = useState(false);
const [rehashingStreams, setRehashingStreams] = useState(false);
const [rehashSuccess, setRehashSuccess] = useState(false);
const [rehashConfirmOpen, setRehashConfirmOpen] = useState(false);
@ -277,14 +278,16 @@ const SettingsPage = () => {
const networkAccessForm = useForm({
mode: 'controlled',
initialValues: Object.keys(NETWORK_ACCESS_OPTIONS).reduce((acc, key) => {
acc[key] = '0.0.0.0/0';
acc[key] = '0.0.0.0/0,::0/0';
return acc;
}, {}),
validate: Object.keys(NETWORK_ACCESS_OPTIONS).reduce((acc, key) => {
acc[key] = (value) => {
const cidrs = value.split(',');
const ipv4CidrRegex = /^([0-9]{1,3}\.){3}[0-9]{1,3}\/\d+$/;
const ipv6CidrRegex = /(?:(?:(?:[A-F0-9]{1,4}:){6}|(?=(?:[A-F0-9]{0,4}:){0,6}(?:[0-9]{1,3}\.){3}[0-9]{1,3}(?![:.\w]))(([0-9A-F]{1,4}:){0,5}|:)((:[0-9A-F]{1,4}){1,5}:|:)|::(?:[A-F0-9]{1,4}:){5})(?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)|(?:[A-F0-9]{1,4}:){7}[A-F0-9]{1,4}|(?=(?:[A-F0-9]{0,4}:){0,7}[A-F0-9]{0,4}(?![:.\w]))(([0-9A-F]{1,4}:){1,7}|:)((:[0-9A-F]{1,4}){1,7}|:)|(?:[A-F0-9]{1,4}:){7}:|:(:[A-F0-9]{1,4}){7})(?![:.\w])\/(?:12[0-8]|1[01][0-9]|[1-9]?[0-9])/;
for (const cidr of cidrs) {
if (cidr.match(/^([0-9]{1,3}\.){3}[0-9]{1,3}\/\d+$/)) {
if (cidr.match(ipv4CidrRegex) || cidr.match(ipv6CidrRegex)) {
continue;
}
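        // Simplified sketch of the comma-separated CIDR check above; these loose
        // stand-in regexes are illustrative only (the ipv6CidrRegex used here is
        // far more thorough about valid IPv6 forms).
        const looseIpv4Cidr = /^([0-9]{1,3}\.){3}[0-9]{1,3}\/\d{1,2}$/;
        const looseIpv6Cidr = /^[0-9A-Fa-f:]+\/\d{1,3}$/;
        const isValidCidrList = (list) =>
          list
            .split(',')
            .every(
              (cidr) =>
                looseIpv4Cidr.test(cidr.trim()) || looseIpv6Cidr.test(cidr.trim())
            );
        // isValidCidrList('0.0.0.0/0,::0/0') -> true  (the new default value)
        // isValidCidrList('10.0.0.0/8')      -> true
        // isValidCidrList('not-a-cidr')      -> false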
@ -322,7 +325,8 @@ const SettingsPage = () => {
let val = null;
switch (key) {
case 'm3u-hash-key':
val = value.value.split(',');
// Split comma-separated string, filter out empty strings
val = value.value ? value.value.split(',').filter((v) => v) : [];
break;
case 'dvr-pre-offset-minutes':
case 'dvr-post-offset-minutes':
@ -353,7 +357,7 @@ const SettingsPage = () => {
);
networkAccessForm.setValues(
Object.keys(NETWORK_ACCESS_OPTIONS).reduce((acc, key) => {
acc[key] = networkAccessSettings[key] || '0.0.0.0/0';
acc[key] = networkAccessSettings[key] || '0.0.0.0/0,::0/0';
return acc;
}, {})
);
@ -400,7 +404,17 @@ const SettingsPage = () => {
loadComskipConfig();
}, []);
// Clear success states when switching accordion panels
useEffect(() => {
setGeneralSettingsSaved(false);
setProxySettingsSaved(false);
setNetworkAccessSaved(false);
setRehashSuccess(false);
}, [accordianValue]);
const onSubmit = async () => {
setGeneralSettingsSaved(false);
const values = form.getValues();
const changedSettings = {};
let m3uHashKeyChanged = false;
@ -408,12 +422,26 @@ const SettingsPage = () => {
for (const settingKey in values) {
// Only compare against existing value if the setting exists
const existing = settings[settingKey];
// Convert array values (like m3u-hash-key) to comma-separated strings
let stringValue;
if (Array.isArray(values[settingKey])) {
stringValue = values[settingKey].join(',');
} else {
stringValue = `${values[settingKey]}`;
}
// Skip empty values to avoid validation errors
if (!stringValue) {
continue;
}
if (!existing) {
// Create new setting on save
changedSettings[settingKey] = `${values[settingKey]}`;
} else if (String(values[settingKey]) !== String(existing.value)) {
changedSettings[settingKey] = stringValue;
} else if (stringValue !== String(existing.value)) {
// If the user changed the setting's value from what's in the DB:
changedSettings[settingKey] = `${values[settingKey]}`;
changedSettings[settingKey] = stringValue;
// Check if M3U hash key was changed
if (settingKey === 'm3u-hash-key') {
@ -432,20 +460,36 @@ const SettingsPage = () => {
}
// Update each changed setting in the backend (create if missing)
for (const updatedKey in changedSettings) {
const existing = settings[updatedKey];
if (existing && existing.id) {
await API.updateSetting({
...existing,
value: changedSettings[updatedKey],
});
} else {
await API.createSetting({
key: updatedKey,
name: updatedKey.replace(/-/g, ' '),
value: changedSettings[updatedKey],
});
try {
for (const updatedKey in changedSettings) {
const existing = settings[updatedKey];
if (existing && existing.id) {
const result = await API.updateSetting({
...existing,
value: changedSettings[updatedKey],
});
// API functions return undefined on error
if (!result) {
throw new Error('Failed to update setting');
}
} else {
const result = await API.createSetting({
key: updatedKey,
name: updatedKey.replace(/-/g, ' '),
value: changedSettings[updatedKey],
});
// API functions return undefined on error
if (!result) {
throw new Error('Failed to create setting');
}
}
}
setGeneralSettingsSaved(true);
} catch (error) {
// Error notifications are already shown by API functions
// Just don't show the success message
console.error('Error saving settings:', error);
}
};
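  // Minimal sketch of the save pattern used in onSubmit above (and repeated in
  // executeSettingsSaveAndRehash below): because the API helpers resolve to
  // undefined on failure rather than rejecting, each result is checked and
  // turned into a thrown error so the success alert only appears once every
  // setting has persisted. saveSetting is a hypothetical helper, not part of
  // this file.
  const saveSetting = async (key, value) => {
    const existing = settings[key];
    const result = existing?.id
      ? await API.updateSetting({ ...existing, value })
      : await API.createSetting({ key, name: key.replace(/-/g, ' '), value });
    if (!result) {
      // The API helper has already shown its own error notification; propagate
      // so the caller skips setGeneralSettingsSaved(true).
      throw new Error(`Failed to save setting "${key}"`);
    }
    return result;
  };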
@ -475,12 +519,19 @@ const SettingsPage = () => {
const onProxySettingsSubmit = async () => {
setProxySettingsSaved(false);
await API.updateSetting({
...settings['proxy-settings'],
value: JSON.stringify(proxySettingsForm.getValues()),
});
setProxySettingsSaved(true);
try {
const result = await API.updateSetting({
...settings['proxy-settings'],
value: JSON.stringify(proxySettingsForm.getValues()),
});
// API functions return undefined on error
if (result) {
setProxySettingsSaved(true);
}
} catch (error) {
// Error notifications are already shown by API functions
console.error('Error saving proxy settings:', error);
}
};
const onComskipUpload = async () => {
@ -568,29 +619,46 @@ const SettingsPage = () => {
const executeSettingsSaveAndRehash = async () => {
setRehashConfirmOpen(false);
setGeneralSettingsSaved(false);
// Use the stored pending values that were captured before the dialog was shown
const changedSettings = pendingChangedSettings || {};
// Update each changed setting in the backend (create if missing)
for (const updatedKey in changedSettings) {
const existing = settings[updatedKey];
if (existing && existing.id) {
await API.updateSetting({
...existing,
value: changedSettings[updatedKey],
});
} else {
await API.createSetting({
key: updatedKey,
name: updatedKey.replace(/-/g, ' '),
value: changedSettings[updatedKey],
});
try {
for (const updatedKey in changedSettings) {
const existing = settings[updatedKey];
if (existing && existing.id) {
const result = await API.updateSetting({
...existing,
value: changedSettings[updatedKey],
});
// API functions return undefined on error
if (!result) {
throw new Error('Failed to update setting');
}
} else {
const result = await API.createSetting({
key: updatedKey,
name: updatedKey.replace(/-/g, ' '),
value: changedSettings[updatedKey],
});
// API functions return undefined on error
if (!result) {
throw new Error('Failed to create setting');
}
}
}
}
// Clear the pending values
setPendingChangedSettings(null);
// Clear the pending values
setPendingChangedSettings(null);
setGeneralSettingsSaved(true);
} catch (error) {
// Error notifications are already shown by API functions
// Just don't show the success message
console.error('Error saving settings:', error);
setPendingChangedSettings(null);
}
};
const executeRehashStreamsOnly = async () => {
@ -711,6 +779,13 @@ const SettingsPage = () => {
<Accordion.Panel>
<form onSubmit={form.onSubmit(onSubmit)}>
<Stack gap="sm">
{generalSettingsSaved && (
<Alert
variant="light"
color="green"
title="Saved Successfully"
/>
)}
<Switch
label="Enable Comskip (remove commercials after recording)"
{...form.getInputProps('dvr-comskip-enabled', {
@ -874,6 +949,13 @@ const SettingsPage = () => {
<Accordion.Control>Stream Settings</Accordion.Control>
<Accordion.Panel>
<form onSubmit={form.onSubmit(onSubmit)}>
{generalSettingsSaved && (
<Alert
variant="light"
color="green"
title="Saved Successfully"
/>
)}
<Select
searchable
{...form.getInputProps('default-user-agent')}

View file

@ -67,3 +67,14 @@
.tv-guide {
position: relative;
}
/* Hide bottom horizontal scrollbar for the guide's virtualized list only */
.tv-guide .guide-list-outer {
/* Prevent horizontal page scrollbar while preserving internal scroll behavior */
overflow-x: hidden !important;
}
/* Also hide any residual horizontal scrollbar in WebKit-based browsers */
.tv-guide .guide-list-outer::-webkit-scrollbar {
height: 0px;
}

View file

@ -3,13 +3,30 @@ import dayjs from 'dayjs';
export const PROGRAM_HEIGHT = 90;
export const EXPANDED_PROGRAM_HEIGHT = 180;
export function buildChannelIdMap(channels, tvgsById) {
export function buildChannelIdMap(channels, tvgsById, epgs = {}) {
const map = new Map();
channels.forEach((channel) => {
const tvgRecord = channel.epg_data_id
? tvgsById[channel.epg_data_id]
: null;
const tvgId = tvgRecord?.tvg_id ?? channel.uuid;
// For dummy EPG sources, ALWAYS use channel UUID to ensure unique programs per channel
// This prevents multiple channels with the same dummy EPG from showing identical data
let tvgId;
if (tvgRecord?.epg_source) {
const epgSource = epgs[tvgRecord.epg_source];
if (epgSource?.source_type === 'dummy') {
// Dummy EPG: use channel UUID for uniqueness
tvgId = channel.uuid;
} else {
// Regular EPG: use tvg_id from EPG data, or fall back to channel UUID
tvgId = tvgRecord.tvg_id ?? channel.uuid;
}
} else {
// No EPG data: use channel UUID
tvgId = channel.uuid;
}
if (tvgId) {
const tvgKey = String(tvgId);
if (!map.has(tvgKey)) {

View file

@ -16,6 +16,7 @@ django-cors-headers
djangorestframework-simplejwt
m3u8
rapidfuzz==3.13.0
regex # Required by transformers but also used for advanced regex features
tzlocal
# PyTorch dependencies (CPU only)