From a959ba17482e4a01f1a1ce51defe68048f2cd962 Mon Sep 17 00:00:00 2001 From: 3l3m3nt Date: Tue, 7 Oct 2025 20:02:47 +1300 Subject: [PATCH 01/45] Fix: Add IPv6 CIDR validation in Settings. Add ::0/0 as a default as well as 0.0.0.0/0 --- frontend/src/pages/Settings.jsx | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/frontend/src/pages/Settings.jsx b/frontend/src/pages/Settings.jsx index fb1f9184..ac481ffa 100644 --- a/frontend/src/pages/Settings.jsx +++ b/frontend/src/pages/Settings.jsx @@ -94,14 +94,16 @@ const SettingsPage = () => { const networkAccessForm = useForm({ mode: 'controlled', initialValues: Object.keys(NETWORK_ACCESS_OPTIONS).reduce((acc, key) => { - acc[key] = '0.0.0.0/0'; + acc[key] = '0.0.0.0/0,::0/0'; return acc; }, {}), validate: Object.keys(NETWORK_ACCESS_OPTIONS).reduce((acc, key) => { acc[key] = (value) => { const cidrs = value.split(','); + const ipv4CidrRegex = /^([0-9]{1,3}\.){3}[0-9]{1,3}\/\d+$/; + const ipv6CidrRegex = /(?:(?:(?:[A-F0-9]{1,4}:){6}|(?=(?:[A-F0-9]{0,4}:){0,6}(?:[0-9]{1,3}\.){3}[0-9]{1,3}(?![:.\w]))(([0-9A-F]{1,4}:){0,5}|:)((:[0-9A-F]{1,4}){1,5}:|:)|::(?:[A-F0-9]{1,4}:){5})(?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)|(?:[A-F0-9]{1,4}:){7}[A-F0-9]{1,4}|(?=(?:[A-F0-9]{0,4}:){0,7}[A-F0-9]{0,4}(?![:.\w]))(([0-9A-F]{1,4}:){1,7}|:)((:[0-9A-F]{1,4}){1,7}|:)|(?:[A-F0-9]{1,4}:){7}:|:(:[A-F0-9]{1,4}){7})(?![:.\w])\/(?:12[0-8]|1[01][0-9]|[1-9]?[0-9])/; for (const cidr of cidrs) { - if (cidr.match(/^([0-9]{1,3}\.){3}[0-9]{1,3}\/\d+$/)) { + if (cidr.match(ipv4CidrRegex) || cidr.match(ipv6CidrRegex)) { continue; } @@ -164,7 +166,7 @@ const SettingsPage = () => { ); networkAccessForm.setValues( Object.keys(NETWORK_ACCESS_OPTIONS).reduce((acc, key) => { - acc[key] = networkAccessSettings[key] || '0.0.0.0/0'; + acc[key] = networkAccessSettings[key] || '0.0.0.0/0,::0/0'; return acc; }, {}) ); From 951af5f3fb01eb30eaacbcb9a7a3f2132b488291 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Thu, 9 Oct 2025 15:28:37 -0500 Subject: [PATCH 02/45] Enhancement: Add auto-enable settings for new groups and categories in M3U and VOD components Bug Fix: Remove orphaned categories for VOD and Series Fixes #540 Closes #208 --- apps/m3u/serializers.py | 43 ++++++++--- apps/m3u/tasks.py | 62 ++++++--------- apps/vod/tasks.py | 76 ++++++++++++++++--- .../src/components/forms/LiveGroupFilter.jsx | 18 ++++- .../src/components/forms/M3UGroupFilter.jsx | 29 +++++++ .../components/forms/VODCategoryFilter.jsx | 13 ++++ 6 files changed, 183 insertions(+), 58 deletions(-) diff --git a/apps/m3u/serializers.py b/apps/m3u/serializers.py index 05462d0f..a607dc07 100644 --- a/apps/m3u/serializers.py +++ b/apps/m3u/serializers.py @@ -136,6 +136,9 @@ class M3UAccountSerializer(serializers.ModelSerializer): validators=[validate_flexible_url], ) enable_vod = serializers.BooleanField(required=False, write_only=True) + auto_enable_new_groups_live = serializers.BooleanField(required=False, write_only=True) + auto_enable_new_groups_vod = serializers.BooleanField(required=False, write_only=True) + auto_enable_new_groups_series = serializers.BooleanField(required=False, write_only=True) class Meta: model = M3UAccount @@ -164,6 +167,9 @@ class M3UAccountSerializer(serializers.ModelSerializer): "status", "last_message", "enable_vod", + "auto_enable_new_groups_live", + "auto_enable_new_groups_vod", + "auto_enable_new_groups_series", ] extra_kwargs = { "password": { @@ -175,23 +181,36 @@ class 
M3UAccountSerializer(serializers.ModelSerializer): def to_representation(self, instance): data = super().to_representation(instance) - # Parse custom_properties to get VOD preference + # Parse custom_properties to get VOD preference and auto_enable_new_groups settings custom_props = instance.custom_properties or {} data["enable_vod"] = custom_props.get("enable_vod", False) + data["auto_enable_new_groups_live"] = custom_props.get("auto_enable_new_groups_live", True) + data["auto_enable_new_groups_vod"] = custom_props.get("auto_enable_new_groups_vod", True) + data["auto_enable_new_groups_series"] = custom_props.get("auto_enable_new_groups_series", True) return data def update(self, instance, validated_data): - # Handle enable_vod preference + # Handle enable_vod preference and auto_enable_new_groups settings enable_vod = validated_data.pop("enable_vod", None) + auto_enable_new_groups_live = validated_data.pop("auto_enable_new_groups_live", None) + auto_enable_new_groups_vod = validated_data.pop("auto_enable_new_groups_vod", None) + auto_enable_new_groups_series = validated_data.pop("auto_enable_new_groups_series", None) + # Get existing custom_properties + custom_props = instance.custom_properties or {} + + # Update preferences if enable_vod is not None: - # Get existing custom_properties - custom_props = instance.custom_properties or {} - - # Update VOD preference custom_props["enable_vod"] = enable_vod - validated_data["custom_properties"] = custom_props + if auto_enable_new_groups_live is not None: + custom_props["auto_enable_new_groups_live"] = auto_enable_new_groups_live + if auto_enable_new_groups_vod is not None: + custom_props["auto_enable_new_groups_vod"] = auto_enable_new_groups_vod + if auto_enable_new_groups_series is not None: + custom_props["auto_enable_new_groups_series"] = auto_enable_new_groups_series + + validated_data["custom_properties"] = custom_props # Pop out channel group memberships so we can handle them manually channel_group_data = validated_data.pop("channel_group", []) @@ -225,14 +244,20 @@ class M3UAccountSerializer(serializers.ModelSerializer): return instance def create(self, validated_data): - # Handle enable_vod preference during creation + # Handle enable_vod preference and auto_enable_new_groups settings during creation enable_vod = validated_data.pop("enable_vod", False) + auto_enable_new_groups_live = validated_data.pop("auto_enable_new_groups_live", True) + auto_enable_new_groups_vod = validated_data.pop("auto_enable_new_groups_vod", True) + auto_enable_new_groups_series = validated_data.pop("auto_enable_new_groups_series", True) # Parse existing custom_properties or create new custom_props = validated_data.get("custom_properties", {}) - # Set VOD preference + # Set preferences (default to True for auto_enable_new_groups) custom_props["enable_vod"] = enable_vod + custom_props["auto_enable_new_groups_live"] = auto_enable_new_groups_live + custom_props["auto_enable_new_groups_vod"] = auto_enable_new_groups_vod + custom_props["auto_enable_new_groups_series"] = auto_enable_new_groups_series validated_data["custom_properties"] = custom_props return super().create(validated_data) diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py index 0ba595c5..593b2704 100644 --- a/apps/m3u/tasks.py +++ b/apps/m3u/tasks.py @@ -488,25 +488,29 @@ def process_groups(account, groups): } logger.info(f"Currently {len(existing_groups)} existing groups") - group_objs = [] + # Check if we should auto-enable new groups based on account settings + account_custom_props = 
account.custom_properties or {} + auto_enable_new_groups_live = account_custom_props.get("auto_enable_new_groups_live", True) + + # Separate existing groups from groups that need to be created + existing_group_objs = [] groups_to_create = [] + for group_name, custom_props in groups.items(): - logger.debug(f"Handling group for M3U account {account.id}: {group_name}") - - if group_name not in existing_groups: - groups_to_create.append( - ChannelGroup( - name=group_name, - ) - ) + if group_name in existing_groups: + existing_group_objs.append(existing_groups[group_name]) else: - group_objs.append(existing_groups[group_name]) + groups_to_create.append(ChannelGroup(name=group_name)) + # Create new groups and fetch them back with IDs + newly_created_group_objs = [] if groups_to_create: - logger.debug(f"Creating {len(groups_to_create)} groups") - created = ChannelGroup.bulk_create_and_fetch(groups_to_create) - logger.debug(f"Created {len(created)} groups") - group_objs.extend(created) + logger.info(f"Creating {len(groups_to_create)} new groups for account {account.id}") + newly_created_group_objs = list(ChannelGroup.bulk_create_and_fetch(groups_to_create)) + logger.debug(f"Successfully created {len(newly_created_group_objs)} new groups") + + # Combine all groups + all_group_objs = existing_group_objs + newly_created_group_objs # Get existing relationships for this account existing_relationships = { @@ -536,7 +540,7 @@ def process_groups(account, groups): relations_to_delete.append(rel) logger.debug(f"Marking relationship for deletion: group '{group_name}' no longer exists in source for account {account.id}") - for group in group_objs: + for group in all_group_objs: custom_props = groups.get(group.name, {}) if group.name in existing_relationships: @@ -566,35 +570,17 @@ def process_groups(account, groups): else: logger.debug(f"xc_id unchanged for group '{group.name}' - account {account.id}") else: - # Create new relationship - but check if there's an existing relationship that might have user settings - # This can happen if the group was temporarily removed and is now back - try: - potential_existing = ChannelGroupM3UAccount.objects.filter( - m3u_account=account, - channel_group=group - ).first() + # Create new relationship - this group is new to this M3U account + # Use the auto_enable setting to determine if it should start enabled + if not auto_enable_new_groups_live: + logger.info(f"Group '{group.name}' is new to account {account.id} - creating relationship but DISABLED (auto_enable_new_groups_live=False)") - if potential_existing: - # Merge with existing custom properties to preserve user settings - existing_custom_props = potential_existing.custom_properties or {} - - # Merge new properties with existing ones - merged_custom_props = existing_custom_props.copy() - merged_custom_props.update(custom_props) - custom_props = merged_custom_props - logger.debug(f"Merged custom properties for existing relationship: group '{group.name}' - account {account.id}") - except Exception as e: - logger.debug(f"Could not check for existing relationship: {str(e)}") - # Fall back to using just the new custom properties - pass - - # Create new relationship relations_to_create.append( ChannelGroupM3UAccount( channel_group=group, m3u_account=account, custom_properties=custom_props, - enabled=True, # Default to enabled + enabled=auto_enable_new_groups_live, ) ) diff --git a/apps/vod/tasks.py b/apps/vod/tasks.py index 1a2e51ca..bc8ad80f 100644 --- a/apps/vod/tasks.py +++ b/apps/vod/tasks.py @@ -187,16 +187,28 @@ def 
batch_create_categories(categories_data, category_type, account): logger.debug(f"Found {len(existing_categories)} existing categories") + # Check if we should auto-enable new categories based on account settings + account_custom_props = account.custom_properties or {} + if category_type == 'movie': + auto_enable_new = account_custom_props.get("auto_enable_new_groups_vod", True) + else: # series + auto_enable_new = account_custom_props.get("auto_enable_new_groups_series", True) + # Create missing categories in batch new_categories = [] + for name in category_names: if name not in existing_categories: + # Always create new categories new_categories.append(VODCategory(name=name, category_type=category_type)) else: + # Existing category - create relationship with enabled based on auto_enable setting + # (category exists globally but is new to this account) relations_to_create.append(M3UVODCategoryRelation( category=existing_categories[name], m3u_account=account, custom_properties={}, + enabled=auto_enable_new, )) logger.debug(f"{len(new_categories)} new categories found") @@ -204,24 +216,68 @@ def batch_create_categories(categories_data, category_type, account): if new_categories: logger.debug("Creating new categories...") - created_categories = VODCategory.bulk_create_and_fetch(new_categories, ignore_conflicts=True) + created_categories = list(VODCategory.bulk_create_and_fetch(new_categories, ignore_conflicts=True)) + + # Create relations for newly created categories with enabled based on auto_enable setting + for cat in created_categories: + if not auto_enable_new: + logger.info(f"New {category_type} category '{cat.name}' created but DISABLED - auto_enable_new_groups is disabled for account {account.id}") + + relations_to_create.append( + M3UVODCategoryRelation( + category=cat, + m3u_account=account, + custom_properties={}, + enabled=auto_enable_new, + ) + ) + # Convert to dictionary for easy lookup newly_created = {cat.name: cat for cat in created_categories} - - relations_to_create += [ - M3UVODCategoryRelation( - category=cat, - m3u_account=account, - custom_properties={}, - ) for cat in newly_created.values() - ] - existing_categories.update(newly_created) # Create missing relations logger.debug("Updating category account relations...") M3UVODCategoryRelation.objects.bulk_create(relations_to_create, ignore_conflicts=True) + # Delete orphaned category relationships (categories no longer in the M3U source) + current_category_ids = set(existing_categories[name].id for name in category_names) + existing_relations = M3UVODCategoryRelation.objects.filter( + m3u_account=account, + category__category_type=category_type + ).select_related('category') + + relations_to_delete = [ + rel for rel in existing_relations + if rel.category_id not in current_category_ids + ] + + if relations_to_delete: + M3UVODCategoryRelation.objects.filter( + id__in=[rel.id for rel in relations_to_delete] + ).delete() + logger.info(f"Deleted {len(relations_to_delete)} orphaned {category_type} category relationships for account {account.id}: {[rel.category.name for rel in relations_to_delete]}") + + # Check if any of the deleted relationships left categories with no remaining associations + orphaned_category_ids = [] + for rel in relations_to_delete: + category = rel.category + + # Check if this category has any remaining M3U account relationships + remaining_relationships = M3UVODCategoryRelation.objects.filter( + category=category + ).exists() + + # If no relationships remain, it's safe to delete the category + if not 
remaining_relationships: + orphaned_category_ids.append(category.id) + logger.debug(f"Category '{category.name}' has no remaining associations and will be deleted") + + # Delete orphaned categories + if orphaned_category_ids: + VODCategory.objects.filter(id__in=orphaned_category_ids).delete() + logger.info(f"Deleted {len(orphaned_category_ids)} orphaned {category_type} categories with no remaining associations") + # 🔑 Fetch all relations for this account, for all categories # relations = { rel.id: rel for rel in M3UVODCategoryRelation.objects # .filter(category__in=existing_categories.values(), m3u_account=account) diff --git a/frontend/src/components/forms/LiveGroupFilter.jsx b/frontend/src/components/forms/LiveGroupFilter.jsx index 4a473afe..c5ac5f83 100644 --- a/frontend/src/components/forms/LiveGroupFilter.jsx +++ b/frontend/src/components/forms/LiveGroupFilter.jsx @@ -33,7 +33,13 @@ const OptionWithTooltip = forwardRef( ) ); -const LiveGroupFilter = ({ playlist, groupStates, setGroupStates }) => { +const LiveGroupFilter = ({ + playlist, + groupStates, + setGroupStates, + autoEnableNewGroupsLive, + setAutoEnableNewGroupsLive, +}) => { const channelGroups = useChannelsStore((s) => s.channelGroups); const profiles = useChannelsStore((s) => s.profiles); const streamProfiles = useStreamProfilesStore((s) => s.profiles); @@ -159,6 +165,16 @@ const LiveGroupFilter = ({ playlist, groupStates, setGroupStates }) => { + + setAutoEnableNewGroupsLive(event.currentTarget.checked) + } + size="sm" + description="When disabled, new groups from the M3U source will be created but disabled by default. You can enable them manually later." + /> + { const [isLoading, setIsLoading] = useState(false); const [movieCategoryStates, setMovieCategoryStates] = useState([]); const [seriesCategoryStates, setSeriesCategoryStates] = useState([]); + const [autoEnableNewGroupsLive, setAutoEnableNewGroupsLive] = useState(true); + const [autoEnableNewGroupsVod, setAutoEnableNewGroupsVod] = useState(true); + const [autoEnableNewGroupsSeries, setAutoEnableNewGroupsSeries] = + useState(true); + + useEffect(() => { + if (!playlist) return; + + // Initialize account-level settings + setAutoEnableNewGroupsLive(playlist.auto_enable_new_groups_live ?? true); + setAutoEnableNewGroupsVod(playlist.auto_enable_new_groups_vod ?? true); + setAutoEnableNewGroupsSeries( + playlist.auto_enable_new_groups_series ?? 
true + ); + }, [playlist]); useEffect(() => { if (Object.keys(channelGroups).length === 0) { @@ -116,6 +131,14 @@ const M3UGroupFilter = ({ playlist = null, isOpen, onClose }) => { })) .filter((state) => state.enabled !== state.original_enabled); + // Update account-level settings via the proper account endpoint + await API.updatePlaylist({ + id: playlist.id, + auto_enable_new_groups_live: autoEnableNewGroupsLive, + auto_enable_new_groups_vod: autoEnableNewGroupsVod, + auto_enable_new_groups_series: autoEnableNewGroupsSeries, + }); + // Update group settings via API endpoint await API.updateM3UGroupSettings( playlist.id, @@ -176,6 +199,8 @@ const M3UGroupFilter = ({ playlist = null, isOpen, onClose }) => { playlist={playlist} groupStates={groupStates} setGroupStates={setGroupStates} + autoEnableNewGroupsLive={autoEnableNewGroupsLive} + setAutoEnableNewGroupsLive={setAutoEnableNewGroupsLive} /> @@ -185,6 +210,8 @@ const M3UGroupFilter = ({ playlist = null, isOpen, onClose }) => { categoryStates={movieCategoryStates} setCategoryStates={setMovieCategoryStates} type="movie" + autoEnableNewGroups={autoEnableNewGroupsVod} + setAutoEnableNewGroups={setAutoEnableNewGroupsVod} /> @@ -194,6 +221,8 @@ const M3UGroupFilter = ({ playlist = null, isOpen, onClose }) => { categoryStates={seriesCategoryStates} setCategoryStates={setSeriesCategoryStates} type="series" + autoEnableNewGroups={autoEnableNewGroupsSeries} + setAutoEnableNewGroups={setAutoEnableNewGroupsSeries} /> diff --git a/frontend/src/components/forms/VODCategoryFilter.jsx b/frontend/src/components/forms/VODCategoryFilter.jsx index 7b922f06..a6dccdd2 100644 --- a/frontend/src/components/forms/VODCategoryFilter.jsx +++ b/frontend/src/components/forms/VODCategoryFilter.jsx @@ -10,6 +10,7 @@ import { Text, Divider, Box, + Checkbox, } from '@mantine/core'; import { CircleCheck, CircleX } from 'lucide-react'; import useVODStore from '../../store/useVODStore'; @@ -19,6 +20,8 @@ const VODCategoryFilter = ({ categoryStates, setCategoryStates, type, + autoEnableNewGroups, + setAutoEnableNewGroups, }) => { const categories = useVODStore((s) => s.categories); const [filter, setFilter] = useState(''); @@ -85,6 +88,16 @@ const VODCategoryFilter = ({ return ( + + setAutoEnableNewGroups(event.currentTarget.checked) + } + size="sm" + description="When disabled, new categories from the provider will be created but disabled by default. You can enable them manually later." + /> + Date: Thu, 9 Oct 2025 19:10:38 -0500 Subject: [PATCH 03/45] Fix: Ensure channel_id and channel.uuid are converted to strings before processing. This fixes an issue where sending a stream switch event would fail if the event was sent from a non owning worker. Fixes [Bug]: Manually switching active stream not working when using XC client. 
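The underlying issue: event payloads are serialized (presumably as JSON) before being published to other workers over Redis, so a UUID sent as-is comes back as a plain string and no longer compares equal to the original UUID object. A minimal sketch of the mismatch (illustrative only, not code from this repo):

    import json
    import uuid

    channel_id = uuid.uuid4()
    # Round-trip the payload the way a pubsub event would travel.
    event = json.loads(json.dumps({"channel_id": str(channel_id)}))

    assert event["channel_id"] != channel_id       # str never equals UUID
    assert event["channel_id"] == str(channel_id)  # normalize both sides
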
Fixes #269 --- .../proxy/ts_proxy/services/channel_service.py | 18 +++++++++--------- apps/proxy/ts_proxy/views.py | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/apps/proxy/ts_proxy/services/channel_service.py b/apps/proxy/ts_proxy/services/channel_service.py index 932479ea..ac8f3a10 100644 --- a/apps/proxy/ts_proxy/services/channel_service.py +++ b/apps/proxy/ts_proxy/services/channel_service.py @@ -600,25 +600,25 @@ class ChannelService: try: from apps.channels.models import Stream from django.utils import timezone - + stream = Stream.objects.get(id=stream_id) - + # Get existing stats or create new dict current_stats = stream.stream_stats or {} - + # Update with new stats for key, value in stats.items(): if value is not None: current_stats[key] = value - + # Save updated stats and timestamp stream.stream_stats = current_stats stream.stream_stats_updated_at = timezone.now() stream.save(update_fields=['stream_stats', 'stream_stats_updated_at']) - + logger.debug(f"Updated stream stats in database for stream {stream_id}: {stats}") return True - + except Exception as e: logger.error(f"Error updating stream stats in database for stream {stream_id}: {e}") return False @@ -678,7 +678,7 @@ class ChannelService: switch_request = { "event": EventType.STREAM_SWITCH, - "channel_id": channel_id, + "channel_id": str(channel_id), "url": new_url, "user_agent": user_agent, "stream_id": stream_id, @@ -703,7 +703,7 @@ class ChannelService: stop_request = { "event": EventType.CHANNEL_STOP, - "channel_id": channel_id, + "channel_id": str(channel_id), "requester_worker_id": proxy_server.worker_id, "timestamp": time.time() } @@ -726,7 +726,7 @@ class ChannelService: stop_request = { "event": EventType.CLIENT_STOP, - "channel_id": channel_id, + "channel_id": str(channel_id), "client_id": client_id, "requester_worker_id": proxy_server.worker_id, "timestamp": time.time() diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py index e31d0418..bb07d697 100644 --- a/apps/proxy/ts_proxy/views.py +++ b/apps/proxy/ts_proxy/views.py @@ -491,7 +491,7 @@ def stream_xc(request, username, password, channel_id): channel = get_object_or_404(Channel, id=channel_id) # @TODO: we've got the file 'type' via extension, support this when we support multiple outputs - return stream_ts(request._request, channel.uuid) + return stream_ts(request._request, str(channel.uuid)) @csrf_exempt From fefab4c4c6dffeedb31a95e92e69dbc004e229ea Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Fri, 10 Oct 2025 15:26:02 -0500 Subject: [PATCH 04/45] Enhancement: Improve resource cleanup in ProxyServer and StreamManager classes to avoid "SystemError: (libev) error creating signal/async pipe: Too many open files" errors --- apps/proxy/ts_proxy/server.py | 29 ++++++++++++++++++--------- apps/proxy/ts_proxy/stream_manager.py | 24 ++++++++++++++++++++++ 2 files changed, 43 insertions(+), 10 deletions(-) diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index da5daaa7..0d638c1a 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -131,6 +131,8 @@ class ProxyServer: max_retries = 10 base_retry_delay = 1 # Start with 1 second delay max_retry_delay = 30 # Cap at 30 seconds + pubsub_client = None + pubsub = None while True: try: @@ -339,20 +341,27 @@ class ProxyServer: logger.error(f"Error in event listener: {e}. 
Retrying in {final_delay:.1f}s (attempt {retry_count})") gevent.sleep(final_delay) # REPLACE: time.sleep(final_delay) - # Try to clean up the old connection - try: - if 'pubsub' in locals(): - pubsub.close() - if 'pubsub_client' in locals(): - pubsub_client.close() - except: - pass - except Exception as e: logger.error(f"Error in event listener: {e}") # Add a short delay to prevent rapid retries on persistent errors gevent.sleep(5) # REPLACE: time.sleep(5) + finally: + # Always clean up PubSub connections in all error paths + try: + if pubsub: + pubsub.close() + pubsub = None + except Exception as e: + logger.debug(f"Error closing pubsub: {e}") + + try: + if pubsub_client: + pubsub_client.close() + pubsub_client = None + except Exception as e: + logger.debug(f"Error closing pubsub_client: {e}") + thread = threading.Thread(target=event_listener, daemon=True) thread.name = "redis-event-listener" thread.start() @@ -596,7 +605,7 @@ class ProxyServer: if channel_user_agent: metadata["user_agent"] = channel_user_agent - # CRITICAL FIX: Make sure stream_id is always set in metadata and properly logged + # Make sure stream_id is always set in metadata and properly logged if channel_stream_id: metadata["stream_id"] = str(channel_stream_id) logger.info(f"Storing stream_id {channel_stream_id} in metadata for channel {channel_id}") diff --git a/apps/proxy/ts_proxy/stream_manager.py b/apps/proxy/ts_proxy/stream_manager.py index e80d4527..be6a9c4e 100644 --- a/apps/proxy/ts_proxy/stream_manager.py +++ b/apps/proxy/ts_proxy/stream_manager.py @@ -1219,6 +1219,30 @@ class StreamManager: except Exception as e: logger.error(f"Final kill attempt failed for channel {self.channel_id}: {e}") + # Explicitly close all subprocess pipes to prevent file descriptor leaks + try: + if self.transcode_process.stdin: + self.transcode_process.stdin.close() + if self.transcode_process.stdout: + self.transcode_process.stdout.close() + if self.transcode_process.stderr: + self.transcode_process.stderr.close() + logger.debug(f"Closed all subprocess pipes for channel {self.channel_id}") + except Exception as e: + logger.debug(f"Error closing subprocess pipes for channel {self.channel_id}: {e}") + + # Join stderr reader thread to ensure it's fully terminated + if hasattr(self, 'stderr_reader_thread') and self.stderr_reader_thread and self.stderr_reader_thread.is_alive(): + try: + logger.debug(f"Waiting for stderr reader thread to terminate for channel {self.channel_id}") + self.stderr_reader_thread.join(timeout=2.0) + if self.stderr_reader_thread.is_alive(): + logger.warning(f"Stderr reader thread did not terminate within timeout for channel {self.channel_id}") + except Exception as e: + logger.debug(f"Error joining stderr reader thread for channel {self.channel_id}: {e}") + finally: + self.stderr_reader_thread = None + self.transcode_process = None self.transcode_process_active = False # Reset the flag From f58bc81c3661eebb6fdee331521938a0c1dea578 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Fri, 10 Oct 2025 17:52:05 -0500 Subject: [PATCH 05/45] Enhancement: Optimize EPG program fetching by implementing chunked retrieval and explicit ordering to improve performance and reduce memory issues. 
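The retrieval pattern in this change reduces to roughly the following sketch (simplified; process_program stands in for the per-program XML generation below):

    offset, chunk_size = 0, 1000
    while True:
        # Slicing an explicitly ordered queryset issues a short
        # LIMIT/OFFSET query, so no long-lived server-side cursor
        # (or its memory) is held across the whole result set.
        chunk = list(programs_qs[offset:offset + chunk_size])
        if not chunk:
            break
        for prog in chunk:
            process_program(prog)
        offset += chunk_size
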
--- apps/output/views.py | 407 ++++++++++++++++++++++--------------------- 1 file changed, 211 insertions(+), 196 deletions(-) diff --git a/apps/output/views.py b/apps/output/views.py index 6eee7ccc..f10dfb2b 100644 --- a/apps/output/views.py +++ b/apps/output/views.py @@ -455,241 +455,256 @@ def generate_epg(request, profile_name=None, user=None): else: # For real EPG data - filter only if days parameter was specified if num_days > 0: - programs = channel.epg_data.programs.filter( + programs_qs = channel.epg_data.programs.filter( start_time__gte=now, start_time__lt=cutoff_date - ) + ).order_by('id') # Explicit ordering for consistent chunking else: # Return all programs if days=0 or not specified - programs = channel.epg_data.programs.all() + programs_qs = channel.epg_data.programs.all().order_by('id') - # Process programs in chunks to avoid memory issues + # Process programs in chunks to avoid cursor timeout issues program_batch = [] - batch_size = 100 + batch_size = 250 + chunk_size = 1000 # Fetch 1000 programs at a time from DB - for prog in programs.iterator(): # Use iterator to avoid loading all at once - start_str = prog.start_time.strftime("%Y%m%d%H%M%S %z") - stop_str = prog.end_time.strftime("%Y%m%d%H%M%S %z") + # Fetch chunks until no more results (avoids count() query) + offset = 0 + while True: + # Fetch a chunk of programs - this closes the cursor after fetching + program_chunk = list(programs_qs[offset:offset + chunk_size]) - program_xml = [f' '] - program_xml.append(f' {html.escape(prog.title)}') + # Break if no more programs + if not program_chunk: + break - # Add subtitle if available - if prog.sub_title: - program_xml.append(f" {html.escape(prog.sub_title)}") + # Process each program in the chunk + for prog in program_chunk: + start_str = prog.start_time.strftime("%Y%m%d%H%M%S %z") + stop_str = prog.end_time.strftime("%Y%m%d%H%M%S %z") - # Add description if available - if prog.description: - program_xml.append(f" {html.escape(prog.description)}") + program_xml = [f' '] + program_xml.append(f' {html.escape(prog.title)}') - # Process custom properties if available - if prog.custom_properties: - custom_data = prog.custom_properties or {} + # Add subtitle if available + if prog.sub_title: + program_xml.append(f" {html.escape(prog.sub_title)}") - # Add categories if available - if "categories" in custom_data and custom_data["categories"]: - for category in custom_data["categories"]: - program_xml.append(f" {html.escape(category)}") + # Add description if available + if prog.description: + program_xml.append(f" {html.escape(prog.description)}") - # Add keywords if available - if "keywords" in custom_data and custom_data["keywords"]: - for keyword in custom_data["keywords"]: - program_xml.append(f" {html.escape(keyword)}") + # Process custom properties if available + if prog.custom_properties: + custom_data = prog.custom_properties or {} - # Handle episode numbering - multiple formats supported - # Prioritize onscreen_episode over standalone episode for onscreen system - if "onscreen_episode" in custom_data: - program_xml.append(f' {html.escape(custom_data["onscreen_episode"])}') - elif "episode" in custom_data: - program_xml.append(f' E{custom_data["episode"]}') + # Add categories if available + if "categories" in custom_data and custom_data["categories"]: + for category in custom_data["categories"]: + program_xml.append(f" {html.escape(category)}") - # Handle dd_progid format - if 'dd_progid' in custom_data: - program_xml.append(f' {html.escape(custom_data["dd_progid"])}') + 
# Add keywords if available + if "keywords" in custom_data and custom_data["keywords"]: + for keyword in custom_data["keywords"]: + program_xml.append(f" {html.escape(keyword)}") - # Handle external database IDs - for system in ['thetvdb.com', 'themoviedb.org', 'imdb.com']: - if f'{system}_id' in custom_data: - program_xml.append(f' {html.escape(custom_data[f"{system}_id"])}') + # Handle episode numbering - multiple formats supported + # Prioritize onscreen_episode over standalone episode for onscreen system + if "onscreen_episode" in custom_data: + program_xml.append(f' {html.escape(custom_data["onscreen_episode"])}') + elif "episode" in custom_data: + program_xml.append(f' E{custom_data["episode"]}') - # Add season and episode numbers in xmltv_ns format if available - if "season" in custom_data and "episode" in custom_data: - season = ( - int(custom_data["season"]) - 1 - if str(custom_data["season"]).isdigit() - else 0 - ) - episode = ( - int(custom_data["episode"]) - 1 - if str(custom_data["episode"]).isdigit() - else 0 - ) - program_xml.append(f' {season}.{episode}.') + # Handle dd_progid format + if 'dd_progid' in custom_data: + program_xml.append(f' {html.escape(custom_data["dd_progid"])}') - # Add language information - if "language" in custom_data: - program_xml.append(f' {html.escape(custom_data["language"])}') + # Handle external database IDs + for system in ['thetvdb.com', 'themoviedb.org', 'imdb.com']: + if f'{system}_id' in custom_data: + program_xml.append(f' {html.escape(custom_data[f"{system}_id"])}') - if "original_language" in custom_data: - program_xml.append(f' {html.escape(custom_data["original_language"])}') + # Add season and episode numbers in xmltv_ns format if available + if "season" in custom_data and "episode" in custom_data: + season = ( + int(custom_data["season"]) - 1 + if str(custom_data["season"]).isdigit() + else 0 + ) + episode = ( + int(custom_data["episode"]) - 1 + if str(custom_data["episode"]).isdigit() + else 0 + ) + program_xml.append(f' {season}.{episode}.') - # Add length information - if "length" in custom_data and isinstance(custom_data["length"], dict): - length_value = custom_data["length"].get("value", "") - length_units = custom_data["length"].get("units", "minutes") - program_xml.append(f' {html.escape(str(length_value))}') + # Add language information + if "language" in custom_data: + program_xml.append(f' {html.escape(custom_data["language"])}') - # Add video information - if "video" in custom_data and isinstance(custom_data["video"], dict): - program_xml.append(" ") + if "original_language" in custom_data: + program_xml.append(f' {html.escape(custom_data["original_language"])}') - # Add audio information - if "audio" in custom_data and isinstance(custom_data["audio"], dict): - program_xml.append(" ") + # Add length information + if "length" in custom_data and isinstance(custom_data["length"], dict): + length_value = custom_data["length"].get("value", "") + length_units = custom_data["length"].get("units", "minutes") + program_xml.append(f' {html.escape(str(length_value))}') - # Add subtitles information - if "subtitles" in custom_data and isinstance(custom_data["subtitles"], list): - for subtitle in custom_data["subtitles"]: - if isinstance(subtitle, dict): - subtitle_type = subtitle.get("type", "") - type_attr = f' type="{html.escape(subtitle_type)}"' if subtitle_type else "" - program_xml.append(f" ") - if "language" in subtitle: - program_xml.append(f" {html.escape(subtitle['language'])}") - program_xml.append(" ") + # Add video 
information + if "video" in custom_data and isinstance(custom_data["video"], dict): + program_xml.append(" ") - # Add rating if available - if "rating" in custom_data: - rating_system = custom_data.get("rating_system", "TV Parental Guidelines") - program_xml.append(f' ') - program_xml.append(f' {html.escape(custom_data["rating"])}') - program_xml.append(f" ") + # Add audio information + if "audio" in custom_data and isinstance(custom_data["audio"], dict): + program_xml.append(" ") - # Add star ratings - if "star_ratings" in custom_data and isinstance(custom_data["star_ratings"], list): - for star_rating in custom_data["star_ratings"]: - if isinstance(star_rating, dict) and "value" in star_rating: - system_attr = f' system="{html.escape(star_rating["system"])}"' if "system" in star_rating else "" - program_xml.append(f" ") - program_xml.append(f" {html.escape(star_rating['value'])}") - program_xml.append(" ") + # Add subtitles information + if "subtitles" in custom_data and isinstance(custom_data["subtitles"], list): + for subtitle in custom_data["subtitles"]: + if isinstance(subtitle, dict): + subtitle_type = subtitle.get("type", "") + type_attr = f' type="{html.escape(subtitle_type)}"' if subtitle_type else "" + program_xml.append(f" ") + if "language" in subtitle: + program_xml.append(f" {html.escape(subtitle['language'])}") + program_xml.append(" ") - # Add reviews - if "reviews" in custom_data and isinstance(custom_data["reviews"], list): - for review in custom_data["reviews"]: - if isinstance(review, dict) and "content" in review: - review_type = review.get("type", "text") - attrs = [f'type="{html.escape(review_type)}"'] - if "source" in review: - attrs.append(f'source="{html.escape(review["source"])}"') - if "reviewer" in review: - attrs.append(f'reviewer="{html.escape(review["reviewer"])}"') - attr_str = " ".join(attrs) - program_xml.append(f' {html.escape(review["content"])}') + # Add rating if available + if "rating" in custom_data: + rating_system = custom_data.get("rating_system", "TV Parental Guidelines") + program_xml.append(f' ') + program_xml.append(f' {html.escape(custom_data["rating"])}') + program_xml.append(f" ") - # Add images - if "images" in custom_data and isinstance(custom_data["images"], list): - for image in custom_data["images"]: - if isinstance(image, dict) and "url" in image: - attrs = [] - for attr in ['type', 'size', 'orient', 'system']: - if attr in image: - attrs.append(f'{attr}="{html.escape(image[attr])}"') - attr_str = " " + " ".join(attrs) if attrs else "" - program_xml.append(f' {html.escape(image["url"])}') + # Add star ratings + if "star_ratings" in custom_data and isinstance(custom_data["star_ratings"], list): + for star_rating in custom_data["star_ratings"]: + if isinstance(star_rating, dict) and "value" in star_rating: + system_attr = f' system="{html.escape(star_rating["system"])}"' if "system" in star_rating else "" + program_xml.append(f" ") + program_xml.append(f" {html.escape(star_rating['value'])}") + program_xml.append(" ") - # Add enhanced credits handling - if "credits" in custom_data: - program_xml.append(" ") - credits = custom_data["credits"] + # Add reviews + if "reviews" in custom_data and isinstance(custom_data["reviews"], list): + for review in custom_data["reviews"]: + if isinstance(review, dict) and "content" in review: + review_type = review.get("type", "text") + attrs = [f'type="{html.escape(review_type)}"'] + if "source" in review: + attrs.append(f'source="{html.escape(review["source"])}"') + if "reviewer" in review: + 
attrs.append(f'reviewer="{html.escape(review["reviewer"])}"') + attr_str = " ".join(attrs) + program_xml.append(f' {html.escape(review["content"])}') - # Handle different credit types - for role in ['director', 'writer', 'adapter', 'producer', 'composer', 'editor', 'presenter', 'commentator', 'guest']: - if role in credits: - people = credits[role] - if isinstance(people, list): - for person in people: - program_xml.append(f" <{role}>{html.escape(person)}") - else: - program_xml.append(f" <{role}>{html.escape(people)}") + # Add images + if "images" in custom_data and isinstance(custom_data["images"], list): + for image in custom_data["images"]: + if isinstance(image, dict) and "url" in image: + attrs = [] + for attr in ['type', 'size', 'orient', 'system']: + if attr in image: + attrs.append(f'{attr}="{html.escape(image[attr])}"') + attr_str = " " + " ".join(attrs) if attrs else "" + program_xml.append(f' {html.escape(image["url"])}') - # Handle actors separately to include role and guest attributes - if "actor" in credits: - actors = credits["actor"] - if isinstance(actors, list): - for actor in actors: - if isinstance(actor, dict): - name = actor.get("name", "") - role_attr = f' role="{html.escape(actor["role"])}"' if "role" in actor else "" - guest_attr = ' guest="yes"' if actor.get("guest") else "" - program_xml.append(f" {html.escape(name)}") + # Add enhanced credits handling + if "credits" in custom_data: + program_xml.append(" ") + credits = custom_data["credits"] + + # Handle different credit types + for role in ['director', 'writer', 'adapter', 'producer', 'composer', 'editor', 'presenter', 'commentator', 'guest']: + if role in credits: + people = credits[role] + if isinstance(people, list): + for person in people: + program_xml.append(f" <{role}>{html.escape(person)}") else: - program_xml.append(f" {html.escape(actor)}") + program_xml.append(f" <{role}>{html.escape(people)}") + + # Handle actors separately to include role and guest attributes + if "actor" in credits: + actors = credits["actor"] + if isinstance(actors, list): + for actor in actors: + if isinstance(actor, dict): + name = actor.get("name", "") + role_attr = f' role="{html.escape(actor["role"])}"' if "role" in actor else "" + guest_attr = ' guest="yes"' if actor.get("guest") else "" + program_xml.append(f" {html.escape(name)}") + else: + program_xml.append(f" {html.escape(actor)}") + else: + program_xml.append(f" {html.escape(actors)}") + + program_xml.append(" ") + + # Add program date if available (full date, not just year) + if "date" in custom_data: + program_xml.append(f' {html.escape(custom_data["date"])}') + + # Add country if available + if "country" in custom_data: + program_xml.append(f' {html.escape(custom_data["country"])}') + + # Add icon if available + if "icon" in custom_data: + program_xml.append(f' ') + + # Add special flags as proper tags with enhanced handling + if custom_data.get("previously_shown", False): + prev_shown_details = custom_data.get("previously_shown_details", {}) + attrs = [] + if "start" in prev_shown_details: + attrs.append(f'start="{html.escape(prev_shown_details["start"])}"') + if "channel" in prev_shown_details: + attrs.append(f'channel="{html.escape(prev_shown_details["channel"])}"') + attr_str = " " + " ".join(attrs) if attrs else "" + program_xml.append(f" ") + + if custom_data.get("premiere", False): + premiere_text = custom_data.get("premiere_text", "") + if premiere_text: + program_xml.append(f" {html.escape(premiere_text)}") else: - program_xml.append(f" 
{html.escape(actors)}") + program_xml.append(" ") - program_xml.append(" ") + if custom_data.get("last_chance", False): + last_chance_text = custom_data.get("last_chance_text", "") + if last_chance_text: + program_xml.append(f" {html.escape(last_chance_text)}") + else: + program_xml.append(" ") - # Add program date if available (full date, not just year) - if "date" in custom_data: - program_xml.append(f' {html.escape(custom_data["date"])}') + if custom_data.get("new", False): + program_xml.append(" ") - # Add country if available - if "country" in custom_data: - program_xml.append(f' {html.escape(custom_data["country"])}') + if custom_data.get('live', False): + program_xml.append(' ') - # Add icon if available - if "icon" in custom_data: - program_xml.append(f' ') + program_xml.append(" ") - # Add special flags as proper tags with enhanced handling - if custom_data.get("previously_shown", False): - prev_shown_details = custom_data.get("previously_shown_details", {}) - attrs = [] - if "start" in prev_shown_details: - attrs.append(f'start="{html.escape(prev_shown_details["start"])}"') - if "channel" in prev_shown_details: - attrs.append(f'channel="{html.escape(prev_shown_details["channel"])}"') - attr_str = " " + " ".join(attrs) if attrs else "" - program_xml.append(f" ") + # Add to batch + program_batch.extend(program_xml) - if custom_data.get("premiere", False): - premiere_text = custom_data.get("premiere_text", "") - if premiere_text: - program_xml.append(f" {html.escape(premiere_text)}") - else: - program_xml.append(" ") + # Send batch when full or send keep-alive + if len(program_batch) >= batch_size: + yield '\n'.join(program_batch) + '\n' + program_batch = [] - if custom_data.get("last_chance", False): - last_chance_text = custom_data.get("last_chance_text", "") - if last_chance_text: - program_xml.append(f" {html.escape(last_chance_text)}") - else: - program_xml.append(" ") - - if custom_data.get("new", False): - program_xml.append(" ") - - if custom_data.get('live', False): - program_xml.append(' ') - - program_xml.append(" ") - - # Add to batch - program_batch.extend(program_xml) - - # Send batch when full or send keep-alive - if len(program_batch) >= batch_size: - yield '\n'.join(program_batch) + '\n' - program_batch = [] # Send keep-alive every batch + # Move to next chunk + offset += chunk_size # Send remaining programs in batch if program_batch: From d5f9ba7e5ebd24d9e3a52870d9300d44f5216e6c Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Fri, 10 Oct 2025 17:55:51 -0500 Subject: [PATCH 06/45] Sort EPG output by channel number. --- apps/output/views.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/output/views.py b/apps/output/views.py index f10dfb2b..15036710 100644 --- a/apps/output/views.py +++ b/apps/output/views.py @@ -342,9 +342,9 @@ def generate_epg(request, profile_name=None, user=None): channels = Channel.objects.filter( channelprofilemembership__channel_profile=channel_profile, channelprofilemembership__enabled=True, - ) + ).order_by("channel_number") else: - channels = Channel.objects.all() + channels = Channel.objects.all().order_by("channel_number") # Check if the request wants to use direct logo URLs instead of cache use_cached_logos = request.GET.get('cachedlogos', 'true').lower() != 'false' From d32abecb2570a88ab0c5f62883f0cab84b6aa97b Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Sat, 11 Oct 2025 10:36:04 -0500 Subject: [PATCH 07/45] Enhancement: Add confirmation dialog to stream delete functions. 
--- .../src/components/tables/StreamsTable.jsx | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/frontend/src/components/tables/StreamsTable.jsx b/frontend/src/components/tables/StreamsTable.jsx index c4ab3652..ff56973c 100644 --- a/frontend/src/components/tables/StreamsTable.jsx +++ b/frontend/src/components/tables/StreamsTable.jsx @@ -51,6 +51,7 @@ import useChannelsTableStore from '../../store/channelsTable'; import useWarningsStore from '../../store/warnings'; import { CustomTable, useTable } from './CustomTable'; import useLocalStorage from '../../hooks/useLocalStorage'; +import ConfirmationDialog from '../ConfirmationDialog'; const StreamRowActions = ({ theme, @@ -200,6 +201,12 @@ const StreamsTable = () => { const [rememberSingleChoice, setRememberSingleChoice] = useState(false); const [currentStreamForChannel, setCurrentStreamForChannel] = useState(null); + // Confirmation dialog state + const [confirmDeleteOpen, setConfirmDeleteOpen] = useState(false); + const [deleteTarget, setDeleteTarget] = useState(null); + const [streamToDelete, setStreamToDelete] = useState(null); + const [isBulkDelete, setIsBulkDelete] = useState(false); + // const [allRowsSelected, setAllRowsSelected] = useState(false); // Add local storage for page size @@ -510,13 +517,49 @@ const StreamsTable = () => { }; const deleteStream = async (id) => { + // Get stream details for the confirmation dialog + const streamObj = data.find((s) => s.id === id); + setStreamToDelete(streamObj); + setDeleteTarget(id); + setIsBulkDelete(false); + + // Skip warning if it's been suppressed + if (isWarningSuppressed('delete-stream')) { + return executeDeleteStream(id); + } + + setConfirmDeleteOpen(true); + }; + + const executeDeleteStream = async (id) => { await API.deleteStream(id); + fetchData(); + // Clear the selection for the deleted stream + setSelectedStreamIds([]); + table.setSelectedTableIds([]); + setConfirmDeleteOpen(false); }; const deleteStreams = async () => { + setIsBulkDelete(true); + setStreamToDelete(null); + + // Skip warning if it's been suppressed + if (isWarningSuppressed('delete-streams')) { + return executeDeleteStreams(); + } + + setConfirmDeleteOpen(true); + }; + + const executeDeleteStreams = async () => { setIsLoading(true); await API.deleteStreams(selectedStreamIds); setIsLoading(false); + fetchData(); + setSelectedStreamIds([]); + table.setSelectedTableIds([]); + setConfirmDeleteOpen(false); }; const closeStreamForm = () => { @@ -1175,6 +1218,39 @@ const StreamsTable = () => { + + setConfirmDeleteOpen(false)} + onConfirm={() => + isBulkDelete + ? executeDeleteStreams() + : executeDeleteStream(deleteTarget) + } + title={`Confirm ${isBulkDelete ? 'Bulk ' : ''}Stream Deletion`} + message={ + isBulkDelete ? ( + `Are you sure you want to delete ${selectedStreamIds.length} stream${selectedStreamIds.length !== 1 ? 's' : ''}? This action cannot be undone.` + ) : streamToDelete ? ( +
+ {`Are you sure you want to delete the following stream? + +Name: ${streamToDelete.name} +${streamToDelete.channel_group ? `Group: ${channelGroups[streamToDelete.channel_group]?.name || 'Unknown'}` : ''} +${streamToDelete.m3u_account ? `M3U Account: ${playlists.find((p) => p.id === streamToDelete.m3u_account)?.name || 'Unknown'}` : ''} + +This action cannot be undone.`} +
+ ) : ( + 'Are you sure you want to delete this stream? This action cannot be undone.' + ) + } + confirmLabel="Delete" + cancelLabel="Cancel" + actionKey={isBulkDelete ? 'delete-streams' : 'delete-stream'} + onSuppressChange={suppressWarning} + size="md" + /> ); }; From 6acb0da93393f0514d9751dab2c20c22983fa595 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Sat, 11 Oct 2025 10:46:52 -0500 Subject: [PATCH 08/45] Bug fix/Enhancement: Selecting many streams will no longer cause the stream table to create a new row for buttons. Also reordered buttons in the stream table slightly. --- .../src/components/tables/StreamsTable.jsx | 94 +++++++++---------- 1 file changed, 46 insertions(+), 48 deletions(-) diff --git a/frontend/src/components/tables/StreamsTable.jsx b/frontend/src/components/tables/StreamsTable.jsx index ff56973c..37ab86ee 100644 --- a/frontend/src/components/tables/StreamsTable.jsx +++ b/frontend/src/components/tables/StreamsTable.jsx @@ -874,8 +874,14 @@ const StreamsTable = () => { }} > {/* Top toolbar with Remove, Assign, Auto-match, and Add buttons */} - - + + - - - - + + - + + - - - - + +
+ {initialDataCount === 0 && (
From fbd83e61b70418aff350e9169d669831fc5afcf7 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Sat, 11 Oct 2025 10:54:11 -0500 Subject: [PATCH 09/45] Enhancement: Added tooltips to stream table fields. Also removed unneeded imports for logos. --- .../src/components/tables/StreamsTable.jsx | 65 ++++++++++--------- 1 file changed, 33 insertions(+), 32 deletions(-) diff --git a/frontend/src/components/tables/StreamsTable.jsx b/frontend/src/components/tables/StreamsTable.jsx index 37ab86ee..5e68b67b 100644 --- a/frontend/src/components/tables/StreamsTable.jsx +++ b/frontend/src/components/tables/StreamsTable.jsx @@ -3,7 +3,6 @@ import API from '../../api'; import StreamForm from '../forms/Stream'; import usePlaylistsStore from '../../store/playlists'; import useChannelsStore from '../../store/channels'; -import useLogosStore from '../../store/logos'; import { copyToClipboard, useDebounce } from '../../utils'; import { SquarePlus, @@ -67,7 +66,6 @@ const StreamRowActions = ({ (state) => state.channels.find((chan) => chan.id === selectedChannelIds[0])?.streams ); - const fetchLogos = useLogosStore((s) => s.fetchLogos); const addStreamToChannel = async () => { await API.updateChannel({ @@ -250,7 +248,6 @@ const StreamsTable = () => { const channelGroups = useChannelsStore((s) => s.channelGroups); const selectedChannelIds = useChannelsTableStore((s) => s.selectedChannelIds); - const fetchLogos = useChannelsStore((s) => s.fetchLogos); const channelSelectionStreams = useChannelsTableStore( (state) => state.channels.find((chan) => chan.id === selectedChannelIds[0])?.streams @@ -287,15 +284,17 @@ const StreamsTable = () => { grow: true, size: columnSizing.name || 200, cell: ({ getValue }) => ( - - {getValue()} - + + + {getValue()} + + ), }, { @@ -306,15 +305,17 @@ const StreamsTable = () => { : '', size: columnSizing.group || 150, cell: ({ getValue }) => ( - - {getValue()} - + + + {getValue()} + + ), }, { @@ -323,17 +324,17 @@ const StreamsTable = () => { accessorFn: (row) => playlists.find((playlist) => playlist.id === row.m3u_account)?.name, cell: ({ getValue }) => ( - - - {getValue()} - - + + + {getValue()} + + ), }, ], From fa08216600b014aad82b6dba7f9e1210a399c6d5 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Sat, 11 Oct 2025 18:08:20 -0500 Subject: [PATCH 10/45] Enhancement: Add chunk timeout configuration in ConfigHelper. Improve StreamManager timeout handling for consistency. Only 1 heartbeat thread per worker should be started now. Timeout on proxy reduced from 60 seconds to 5. 
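The stoppable heartbeat introduced here boils down to the following pattern (simplified sketch; refresh_presence stands in for the real Redis client-presence refresh):

    import time

    class Heartbeat:
        def __init__(self, interval):
            self.interval = interval
            self._running = True

        def run(self):
            while self._running:
                # Sleep in 1-second slices so stop() during shutdown is
                # noticed within a second instead of a full interval.
                for _ in range(int(self.interval)):
                    if not self._running:
                        return
                    time.sleep(1)
                self.refresh_presence()

        def refresh_presence(self):
            pass  # placeholder for the Redis heartbeat work

        def stop(self):
            self._running = False  # daemon thread exits on its own
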
--- apps/proxy/ts_proxy/client_manager.py | 70 ++++++++++----------------- apps/proxy/ts_proxy/config_helper.py | 5 ++ apps/proxy/ts_proxy/server.py | 68 +++++++++++++++----------- apps/proxy/ts_proxy/stream_manager.py | 25 +++++++++- 4 files changed, 93 insertions(+), 75 deletions(-) diff --git a/apps/proxy/ts_proxy/client_manager.py b/apps/proxy/ts_proxy/client_manager.py index d4b83d3a..3d89b3b8 100644 --- a/apps/proxy/ts_proxy/client_manager.py +++ b/apps/proxy/ts_proxy/client_manager.py @@ -8,7 +8,7 @@ import gevent from typing import Set, Optional from apps.proxy.config import TSConfig as Config from redis.exceptions import ConnectionError, TimeoutError -from .constants import EventType +from .constants import EventType, ChannelState, ChannelMetadataField from .config_helper import ConfigHelper from .redis_keys import RedisKeys from .utils import get_logger @@ -26,6 +26,7 @@ class ClientManager: self.lock = threading.Lock() self.last_active_time = time.time() self.worker_id = worker_id # Store worker ID as instance variable + self._heartbeat_running = True # Flag to control heartbeat thread # STANDARDIZED KEYS: Move client set under channel namespace self.client_set_key = RedisKeys.clients(channel_id) @@ -77,56 +78,28 @@ class ClientManager: logger.debug(f"Failed to trigger stats update: {e}") def _start_heartbeat_thread(self): - """Start thread to regularly refresh client presence in Redis""" + """Start thread to regularly refresh client presence in Redis for local clients""" def heartbeat_task(): - no_clients_count = 0 # Track consecutive empty cycles - max_empty_cycles = 3 # Exit after this many consecutive empty checks - logger.debug(f"Started heartbeat thread for channel {self.channel_id} (interval: {self.heartbeat_interval}s)") - while True: + while self._heartbeat_running: try: - # Wait for the interval - gevent.sleep(self.heartbeat_interval) + # Wait for the interval, but check stop flag frequently for quick shutdown + # Sleep in 1-second increments to allow faster response to stop signal + for _ in range(int(self.heartbeat_interval)): + if not self._heartbeat_running: + break + time.sleep(1) + + # Final check before doing work + if not self._heartbeat_running: + break # Send heartbeat for all local clients with self.lock: - if not self.clients or not self.redis_client: - # No clients left, increment our counter - no_clients_count += 1 - - # Check if we're in a shutdown delay period before exiting - in_shutdown_delay = False - if self.redis_client: - try: - disconnect_key = RedisKeys.last_client_disconnect(self.channel_id) - disconnect_time_bytes = self.redis_client.get(disconnect_key) - if disconnect_time_bytes: - disconnect_time = float(disconnect_time_bytes.decode('utf-8')) - elapsed = time.time() - disconnect_time - shutdown_delay = ConfigHelper.channel_shutdown_delay() - - if elapsed < shutdown_delay: - in_shutdown_delay = True - logger.debug(f"Channel {self.channel_id} in shutdown delay: {elapsed:.1f}s of {shutdown_delay}s elapsed") - except Exception as e: - logger.debug(f"Error checking shutdown delay: {e}") - - # Only exit if we've seen no clients for several consecutive checks AND we're not in shutdown delay - if no_clients_count >= max_empty_cycles and not in_shutdown_delay: - logger.info(f"No clients for channel {self.channel_id} after {no_clients_count} consecutive checks and not in shutdown delay, exiting heartbeat thread") - return # This exits the thread - - # Skip this cycle if we have no clients but continue if in shutdown delay - if not in_shutdown_delay: - 
continue - else: - # Reset counter during shutdown delay to prevent premature exit - no_clients_count = 0 - continue - else: - # Reset counter when we see clients - no_clients_count = 0 + # Skip this cycle if we have no local clients + if not self.clients: + continue # IMPROVED GHOST DETECTION: Check for stale clients before sending heartbeats current_time = time.time() @@ -197,11 +170,20 @@ class ClientManager: except Exception as e: logger.error(f"Error in client heartbeat thread: {e}") + logger.debug(f"Heartbeat thread exiting for channel {self.channel_id}") + thread = threading.Thread(target=heartbeat_task, daemon=True) thread.name = f"client-heartbeat-{self.channel_id}" thread.start() logger.debug(f"Started client heartbeat thread for channel {self.channel_id} (interval: {self.heartbeat_interval}s)") + def stop(self): + """Stop the heartbeat thread and cleanup""" + logger.debug(f"Stopping ClientManager for channel {self.channel_id}") + self._heartbeat_running = False + # Give the thread a moment to exit gracefully + # Note: We don't join() here because it's a daemon thread and will exit on its own + def _execute_redis_command(self, command_func): """Execute Redis command with error handling""" if not self.redis_client: diff --git a/apps/proxy/ts_proxy/config_helper.py b/apps/proxy/ts_proxy/config_helper.py index d59fa1f9..62b889dc 100644 --- a/apps/proxy/ts_proxy/config_helper.py +++ b/apps/proxy/ts_proxy/config_helper.py @@ -100,3 +100,8 @@ class ConfigHelper: def channel_init_grace_period(): """Get channel initialization grace period in seconds""" return Config.get_channel_init_grace_period() + + @staticmethod + def chunk_timeout(): + """Get chunk timeout in seconds (used for both socket and HTTP read timeouts)""" + return ConfigHelper.get('CHUNK_TIMEOUT', 5) # Default 5 seconds diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index 0d638c1a..cca827a9 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -495,17 +495,18 @@ class ProxyServer: ) return True - # Create buffer and client manager instances - buffer = StreamBuffer(channel_id, redis_client=self.redis_client) - client_manager = ClientManager( - channel_id, - redis_client=self.redis_client, - worker_id=self.worker_id - ) + # Create buffer and client manager instances (or reuse if they exist) + if channel_id not in self.stream_buffers: + buffer = StreamBuffer(channel_id, redis_client=self.redis_client) + self.stream_buffers[channel_id] = buffer - # Store in local tracking - self.stream_buffers[channel_id] = buffer - self.client_managers[channel_id] = client_manager + if channel_id not in self.client_managers: + client_manager = ClientManager( + channel_id, + redis_client=self.redis_client, + worker_id=self.worker_id + ) + self.client_managers[channel_id] = client_manager # IMPROVED: Set initializing state in Redis BEFORE any other operations if self.redis_client: @@ -559,13 +560,15 @@ class ProxyServer: logger.info(f"Channel {channel_id} already owned by worker {current_owner}") logger.info(f"This worker ({self.worker_id}) will read from Redis buffer only") - # Create buffer but not stream manager - buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client) - self.stream_buffers[channel_id] = buffer + # Create buffer but not stream manager (only if not already exists) + if channel_id not in self.stream_buffers: + buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client) + self.stream_buffers[channel_id] = buffer - # Create client 
manager with channel_id and redis_client - client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id) - self.client_managers[channel_id] = client_manager + # Create client manager with channel_id and redis_client (only if not already exists) + if channel_id not in self.client_managers: + client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id) + self.client_managers[channel_id] = client_manager return True @@ -580,13 +583,15 @@ class ProxyServer: # Another worker just acquired ownership logger.info(f"Another worker just acquired ownership of channel {channel_id}") - # Create buffer but not stream manager - buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client) - self.stream_buffers[channel_id] = buffer + # Create buffer but not stream manager (only if not already exists) + if channel_id not in self.stream_buffers: + buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client) + self.stream_buffers[channel_id] = buffer - # Create client manager with channel_id and redis_client - client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id) - self.client_managers[channel_id] = client_manager + # Create client manager with channel_id and redis_client (only if not already exists) + if channel_id not in self.client_managers: + client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id) + self.client_managers[channel_id] = client_manager return True @@ -641,13 +646,14 @@ class ProxyServer: logger.info(f"Created StreamManager for channel {channel_id} with stream ID {channel_stream_id}") self.stream_managers[channel_id] = stream_manager - # Create client manager with channel_id, redis_client AND worker_id - client_manager = ClientManager( - channel_id=channel_id, - redis_client=self.redis_client, - worker_id=self.worker_id - ) - self.client_managers[channel_id] = client_manager + # Create client manager with channel_id, redis_client AND worker_id (only if not already exists) + if channel_id not in self.client_managers: + client_manager = ClientManager( + channel_id=channel_id, + redis_client=self.redis_client, + worker_id=self.worker_id + ) + self.client_managers[channel_id] = client_manager # Start stream manager thread only for the owner thread = threading.Thread(target=stream_manager.run, daemon=True) @@ -855,6 +861,10 @@ class ProxyServer: # Clean up client manager - SAFE CHECK HERE TOO if channel_id in self.client_managers: try: + client_manager = self.client_managers[channel_id] + # Stop the heartbeat thread before deleting + if hasattr(client_manager, 'stop'): + client_manager.stop() del self.client_managers[channel_id] logger.info(f"Removed client manager for channel {channel_id}") except KeyError: diff --git a/apps/proxy/ts_proxy/stream_manager.py b/apps/proxy/ts_proxy/stream_manager.py index be6a9c4e..adc70137 100644 --- a/apps/proxy/ts_proxy/stream_manager.py +++ b/apps/proxy/ts_proxy/stream_manager.py @@ -10,6 +10,7 @@ import gevent import re from typing import Optional, List from django.shortcuts import get_object_or_404 +from urllib3.exceptions import ReadTimeoutError from apps.proxy.config import TSConfig as Config from apps.channels.models import Channel, Stream from apps.m3u.models import M3UAccount, M3UAccountProfile @@ -761,10 +762,12 @@ class StreamManager: self.current_session = session # Stream the URL with proper timeout handling + 
# Use same chunk timeout as socket connections for consistency
+            chunk_timeout = ConfigHelper.chunk_timeout()
             response = session.get(
                 self.url,
                 stream=True,
-                timeout=(10, 60)  # 10s connect timeout, 60s read timeout
+                timeout=(5, chunk_timeout)  # 5s connect timeout, configurable chunk timeout
             )
             self.current_response = response
@@ -832,6 +835,13 @@ class StreamManager:
             else:
                 # Handle direct HTTP connection
                 chunk_count = 0
+
+                # Check if response is still valid before attempting to read
+                if not self.current_response:
+                    logger.debug(f"Response object is None for channel {self.channel_id}, connection likely closed")
+                    self.connected = False
+                    return
+
                 try:
                     for chunk in self.current_response.iter_content(chunk_size=self.chunk_size):
                         # Check if we've been asked to stop
@@ -854,6 +864,17 @@ class StreamManager:
                             if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client:
                                 last_data_key = RedisKeys.last_data(self.buffer.channel_id)
                                 self.buffer.redis_client.set(last_data_key, str(time.time()), ex=60)
+                except (requests.exceptions.ReadTimeout, ReadTimeoutError, requests.exceptions.ConnectionError) as e:
+                    if self.stop_requested or self.url_switching:
+                        logger.debug(f"Expected connection error during shutdown/URL switch for channel {self.channel_id}: {e}")
+                    else:
+                        # Handle timeout errors - log and close connection, let main loop handle retry
+                        logger.warning(f"Stream read timeout for channel {self.channel_id}: {e}")
+
+                        # Close the current connection
+                        self._close_connection()
+
+                        return  # Exit this method, main loop will retry based on retry_count
                 except (AttributeError, ConnectionError) as e:
                     if self.stop_requested or self.url_switching:
                         logger.debug(f"Expected connection error during shutdown/URL switch for channel {self.channel_id}: {e}")
@@ -1274,7 +1295,7 @@ class StreamManager:
         try:
             # Set timeout for chunk reads
-            chunk_timeout = ConfigHelper.get('CHUNK_TIMEOUT', 10)  # Default 10 seconds
+            chunk_timeout = ConfigHelper.chunk_timeout()  # Use centralized timeout configuration
             try:
                 # Handle different socket types with timeout
From 74280baa85e2c815b91ca7f10750cf2778b4e67d Mon Sep 17 00:00:00 2001
From: SergeantPanda
Date: Sat, 11 Oct 2025 19:45:21 -0500
Subject: [PATCH 11/45] Changed read timeout for HTTP connections in the proxy
 server to 10 seconds to avoid unnecessary timeouts. Improved try_next_stream
 to not fail if the returned stream is the same; it will now try a different
 stream. Force a client to jump ahead in the buffer if it falls too far
 behind.
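
The catch-up behaviour this patch adds to the stream generator reduces to a small rule: if a client has drifted more than a threshold number of chunks behind the shared buffer head, reseat it a few chunks behind the head instead of letting it chase chunks that have already expired from Redis. A minimal TypeScript sketch of that rule follows; the 50-chunk threshold and the 10-chunk default mirror the diff below, while the function and parameter names are illustrative, not identifiers from this codebase.

    // Minimal sketch of the catch-up rule, assuming chunk indices are
    // monotonically increasing integers as in the Redis-backed buffer.
    function catchUpIndex(
      bufferIndex: number,       // head of the shared buffer
      localIndex: number,        // this client's read position
      initialBehind: number = 10, // how far behind the head a client is re-seated
      maxLag: number = 50        // threshold that triggers a jump
    ): number {
      const chunksBehind = bufferIndex - localIndex;
      if (chunksBehind <= maxLag) {
        return localIndex; // close enough: keep reading sequentially
      }
      // Jump to a few chunks behind the live head, but never move backwards.
      return Math.max(localIndex, bufferIndex - initialBehind);
    }

Clamping with Math.max() ensures a client that is already inside the window is never moved backwards by a reposition.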
--- apps/proxy/ts_proxy/stream_buffer.py | 10 +- apps/proxy/ts_proxy/stream_generator.py | 12 +++ apps/proxy/ts_proxy/stream_manager.py | 135 +++++++++++++++--------- 3 files changed, 106 insertions(+), 51 deletions(-) diff --git a/apps/proxy/ts_proxy/stream_buffer.py b/apps/proxy/ts_proxy/stream_buffer.py index a5169c3a..85feb5dd 100644 --- a/apps/proxy/ts_proxy/stream_buffer.py +++ b/apps/proxy/ts_proxy/stream_buffer.py @@ -303,6 +303,14 @@ class StreamBuffer: # Retrieve chunks chunks = self.get_chunks_exact(client_index, chunk_count) + # Check if we got significantly fewer chunks than expected (likely due to expiration) + # Only check if we expected multiple chunks and got none or very few + if chunk_count > 3 and len(chunks) == 0 and chunks_behind > 10: + # Chunks are missing - likely expired from Redis + # Return empty list to signal client should skip forward + logger.debug(f"Chunks missing for client at index {client_index}, buffer at {self.index} ({chunks_behind} behind)") + return [], client_index + # Check total size total_size = sum(len(c) for c in chunks) @@ -316,7 +324,7 @@ class StreamBuffer: additional_size = sum(len(c) for c in more_chunks) if total_size + additional_size <= MAX_SIZE: chunks.extend(more_chunks) - chunk_count += additional + chunk_count += len(more_chunks) # Fixed: count actual additional chunks retrieved return chunks, client_index + chunk_count diff --git a/apps/proxy/ts_proxy/stream_generator.py b/apps/proxy/ts_proxy/stream_generator.py index 817a7b82..368691b8 100644 --- a/apps/proxy/ts_proxy/stream_generator.py +++ b/apps/proxy/ts_proxy/stream_generator.py @@ -204,6 +204,18 @@ class StreamGenerator: self.empty_reads += 1 self.consecutive_empty += 1 + # Check if we're too far behind (chunks expired from Redis) + chunks_behind = self.buffer.index - self.local_index + if chunks_behind > 50: # If more than 50 chunks behind, jump forward + # Calculate new position: stay a few chunks behind current buffer + initial_behind = ConfigHelper.initial_behind_chunks() + new_index = max(self.local_index, self.buffer.index - initial_behind) + + logger.warning(f"[{self.client_id}] Client too far behind ({chunks_behind} chunks), jumping from {self.local_index} to {new_index}") + self.local_index = new_index + self.consecutive_empty = 0 # Reset since we're repositioning + continue # Try again immediately with new position + if self._should_send_keepalive(self.local_index): keepalive_packet = create_ts_packet('keepalive') logger.debug(f"[{self.client_id}] Sending keepalive packet while waiting at buffer head") diff --git a/apps/proxy/ts_proxy/stream_manager.py b/apps/proxy/ts_proxy/stream_manager.py index adc70137..e2f8663c 100644 --- a/apps/proxy/ts_proxy/stream_manager.py +++ b/apps/proxy/ts_proxy/stream_manager.py @@ -92,11 +92,13 @@ class StreamManager: self.tried_stream_ids.add(self.current_stream_id) logger.info(f"Loaded stream ID {self.current_stream_id} from Redis for channel {buffer.channel_id}") else: - logger.warning(f"No stream_id found in Redis for channel {channel_id}") + logger.warning(f"No stream_id found in Redis for channel {channel_id}. " + f"Stream switching will rely on URL comparison to avoid selecting the same stream.") except Exception as e: logger.warning(f"Error loading stream ID from Redis: {e}") else: - logger.warning(f"Unable to get stream ID for channel {channel_id} - stream switching may not work correctly") + logger.warning(f"Unable to get stream ID for channel {channel_id}. 
" + f"Stream switching will rely on URL comparison to avoid selecting the same stream.") logger.info(f"Initialized stream manager for channel {buffer.channel_id}") @@ -767,7 +769,7 @@ class StreamManager: response = session.get( self.url, stream=True, - timeout=(5, chunk_timeout) # 5s connect timeout, configurable chunk timeout + timeout=(5, 10) # 5s connect timeout, 10s read timeout ) self.current_response = response @@ -1378,7 +1380,17 @@ class StreamManager: # Only update if not already past connecting if not current_state or current_state in [ChannelState.INITIALIZING, ChannelState.CONNECTING]: # NEW CODE: Check if buffer has enough chunks - current_buffer_index = getattr(self.buffer, 'index', 0) + # IMPORTANT: Read from Redis, not local buffer.index, because in multi-worker setup + # each worker has its own StreamBuffer instance with potentially stale local index + buffer_index_key = RedisKeys.buffer_index(channel_id) + current_buffer_index = 0 + try: + redis_index = redis_client.get(buffer_index_key) + if redis_index: + current_buffer_index = int(redis_index) + except Exception as e: + logger.error(f"Error reading buffer index from Redis: {e}") + initial_chunks_needed = ConfigHelper.initial_behind_chunks() if current_buffer_index < initial_chunks_needed: @@ -1426,10 +1438,21 @@ class StreamManager: # Clean up completed timers self._buffer_check_timers = [t for t in self._buffer_check_timers if t.is_alive()] - if hasattr(self.buffer, 'index') and hasattr(self.buffer, 'channel_id'): - current_buffer_index = self.buffer.index - initial_chunks_needed = getattr(Config, 'INITIAL_BEHIND_CHUNKS', 10) + if hasattr(self.buffer, 'channel_id') and hasattr(self.buffer, 'redis_client'): channel_id = self.buffer.channel_id + redis_client = self.buffer.redis_client + + # IMPORTANT: Read from Redis, not local buffer.index + buffer_index_key = RedisKeys.buffer_index(channel_id) + current_buffer_index = 0 + try: + redis_index = redis_client.get(buffer_index_key) + if redis_index: + current_buffer_index = int(redis_index) + except Exception as e: + logger.error(f"Error reading buffer index from Redis: {e}") + + initial_chunks_needed = ConfigHelper.initial_behind_chunks() # Use ConfigHelper for consistency if current_buffer_index >= initial_chunks_needed: # We now have enough buffer, call _set_waiting_for_clients again @@ -1454,6 +1477,7 @@ class StreamManager: def _try_next_stream(self): """ Try to switch to the next available stream for this channel. + Will iterate through multiple alternate streams if needed to find one with a different URL. 
Returns: bool: True if successfully switched to a new stream, False otherwise @@ -1479,60 +1503,71 @@ class StreamManager: logger.warning(f"All {len(alternate_streams)} alternate streams have been tried for channel {self.channel_id}") return False - # Get the next stream to try - next_stream = untried_streams[0] - stream_id = next_stream['stream_id'] - profile_id = next_stream['profile_id'] # This is the M3U profile ID we need + # IMPROVED: Try multiple streams until we find one with a different URL + for next_stream in untried_streams: + stream_id = next_stream['stream_id'] + profile_id = next_stream['profile_id'] # This is the M3U profile ID we need - # Add to tried streams - self.tried_stream_ids.add(stream_id) + # Add to tried streams + self.tried_stream_ids.add(stream_id) - # Get stream info including URL using the profile_id we already have - logger.info(f"Trying next stream ID {stream_id} with profile ID {profile_id} for channel {self.channel_id}") - stream_info = get_stream_info_for_switch(self.channel_id, stream_id) + # Get stream info including URL using the profile_id we already have + logger.info(f"Trying next stream ID {stream_id} with profile ID {profile_id} for channel {self.channel_id}") + stream_info = get_stream_info_for_switch(self.channel_id, stream_id) - if 'error' in stream_info or not stream_info.get('url'): - logger.error(f"Error getting info for stream {stream_id} for channel {self.channel_id}: {stream_info.get('error', 'No URL')}") - return False + if 'error' in stream_info or not stream_info.get('url'): + logger.error(f"Error getting info for stream {stream_id} for channel {self.channel_id}: {stream_info.get('error', 'No URL')}") + continue # Try next stream instead of giving up - # Update URL and user agent - new_url = stream_info['url'] - new_user_agent = stream_info['user_agent'] - new_transcode = stream_info['transcode'] + # Update URL and user agent + new_url = stream_info['url'] + new_user_agent = stream_info['user_agent'] + new_transcode = stream_info['transcode'] - logger.info(f"Switching from URL {self.url} to {new_url} for channel {self.channel_id}") + # CRITICAL FIX: Check if the new URL is the same as current URL + # This can happen when current_stream_id is None and we accidentally select the same stream + if new_url == self.url: + logger.warning(f"Stream ID {stream_id} generates the same URL as current stream ({new_url}). 
" + f"Skipping this stream and trying next alternative.") + continue # Try next stream instead of giving up - # IMPORTANT: Just update the URL, don't stop the channel or release resources - switch_result = self.update_url(new_url, stream_id, profile_id) - if not switch_result: - logger.error(f"Failed to update URL for stream ID {stream_id} for channel {self.channel_id}") - return False + logger.info(f"Switching from URL {self.url} to {new_url} for channel {self.channel_id}") - # Update stream ID tracking - self.current_stream_id = stream_id + # IMPORTANT: Just update the URL, don't stop the channel or release resources + switch_result = self.update_url(new_url, stream_id, profile_id) + if not switch_result: + logger.error(f"Failed to update URL for stream ID {stream_id} for channel {self.channel_id}") + continue # Try next stream - # Store the new user agent and transcode settings - self.user_agent = new_user_agent - self.transcode = new_transcode + # Update stream ID tracking + self.current_stream_id = stream_id - # Update stream metadata in Redis - use the profile_id we got from get_alternate_streams - if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client: - metadata_key = RedisKeys.channel_metadata(self.channel_id) - self.buffer.redis_client.hset(metadata_key, mapping={ - ChannelMetadataField.URL: new_url, - ChannelMetadataField.USER_AGENT: new_user_agent, - ChannelMetadataField.STREAM_PROFILE: stream_info['stream_profile'], - ChannelMetadataField.M3U_PROFILE: str(profile_id), # Use the profile_id from get_alternate_streams - ChannelMetadataField.STREAM_ID: str(stream_id), - ChannelMetadataField.STREAM_SWITCH_TIME: str(time.time()), - ChannelMetadataField.STREAM_SWITCH_REASON: "max_retries_exceeded" - }) + # Store the new user agent and transcode settings + self.user_agent = new_user_agent + self.transcode = new_transcode - # Log the switch - logger.info(f"Stream metadata updated for channel {self.channel_id} to stream ID {stream_id} with M3U profile {profile_id}") + # Update stream metadata in Redis - use the profile_id we got from get_alternate_streams + if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client: + metadata_key = RedisKeys.channel_metadata(self.channel_id) + self.buffer.redis_client.hset(metadata_key, mapping={ + ChannelMetadataField.URL: new_url, + ChannelMetadataField.USER_AGENT: new_user_agent, + ChannelMetadataField.STREAM_PROFILE: stream_info['stream_profile'], + ChannelMetadataField.M3U_PROFILE: str(profile_id), # Use the profile_id from get_alternate_streams + ChannelMetadataField.STREAM_ID: str(stream_id), + ChannelMetadataField.STREAM_SWITCH_TIME: str(time.time()), + ChannelMetadataField.STREAM_SWITCH_REASON: "max_retries_exceeded" + }) - logger.info(f"Successfully switched to stream ID {stream_id} with URL {new_url} for channel {self.channel_id}") - return True + # Log the switch + logger.info(f"Stream metadata updated for channel {self.channel_id} to stream ID {stream_id} with M3U profile {profile_id}") + + logger.info(f"Successfully switched to stream ID {stream_id} with URL {new_url} for channel {self.channel_id}") + return True + + # If we get here, we tried all streams but none worked + logger.error(f"Tried {len(untried_streams)} alternate streams but none were suitable for channel {self.channel_id}") + return False except Exception as e: logger.error(f"Error trying next stream for channel {self.channel_id}: {e}", exc_info=True) From 404d2f82a3d6ea2455bd2d4e01cf870b431a8669 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Sun, 
12 Oct 2025 09:42:15 -0500 Subject: [PATCH 12/45] Switch HTTP streamer to a thread and pipe its output to a local pipe where the fetch chunk can access it the same way our transcode processes would be accessed. Simplifies the code. --- apps/proxy/ts_proxy/config_helper.py | 6 +- apps/proxy/ts_proxy/http_streamer.py | 138 +++++++++++++++++++++++++ apps/proxy/ts_proxy/stream_manager.py | 141 ++++++++++---------------- 3 files changed, 194 insertions(+), 91 deletions(-) create mode 100644 apps/proxy/ts_proxy/http_streamer.py diff --git a/apps/proxy/ts_proxy/config_helper.py b/apps/proxy/ts_proxy/config_helper.py index 62b889dc..d7d33558 100644 --- a/apps/proxy/ts_proxy/config_helper.py +++ b/apps/proxy/ts_proxy/config_helper.py @@ -103,5 +103,9 @@ class ConfigHelper: @staticmethod def chunk_timeout(): - """Get chunk timeout in seconds (used for both socket and HTTP read timeouts)""" + """ + Get chunk timeout in seconds (used for both socket and HTTP read timeouts). + This controls how long we wait for each chunk before timing out. + Set this higher (e.g., 30s) for slow providers that may have intermittent delays. + """ return ConfigHelper.get('CHUNK_TIMEOUT', 5) # Default 5 seconds diff --git a/apps/proxy/ts_proxy/http_streamer.py b/apps/proxy/ts_proxy/http_streamer.py new file mode 100644 index 00000000..147d2c93 --- /dev/null +++ b/apps/proxy/ts_proxy/http_streamer.py @@ -0,0 +1,138 @@ +""" +HTTP Stream Reader - Thread-based HTTP stream reader that writes to a pipe. +This allows us to use the same fetch_chunk() path for both transcode and HTTP streams. +""" + +import threading +import os +import requests +from requests.adapters import HTTPAdapter +from .utils import get_logger + +logger = get_logger() + + +class HTTPStreamReader: + """Thread-based HTTP stream reader that writes to a pipe""" + + def __init__(self, url, user_agent=None, chunk_size=8192): + self.url = url + self.user_agent = user_agent + self.chunk_size = chunk_size + self.session = None + self.response = None + self.thread = None + self.pipe_read = None + self.pipe_write = None + self.running = False + + def start(self): + """Start the HTTP stream reader thread""" + # Create a pipe (works on Windows and Unix) + self.pipe_read, self.pipe_write = os.pipe() + + # Start the reader thread + self.running = True + self.thread = threading.Thread(target=self._read_stream, daemon=True) + self.thread.start() + + logger.info(f"Started HTTP stream reader thread for {self.url}") + return self.pipe_read + + def _read_stream(self): + """Thread worker that reads HTTP stream and writes to pipe""" + try: + # Build headers + headers = {} + if self.user_agent: + headers['User-Agent'] = self.user_agent + + logger.info(f"HTTP reader connecting to {self.url}") + + # Create session + self.session = requests.Session() + + # Disable retries for faster failure detection + adapter = HTTPAdapter(max_retries=0, pool_connections=1, pool_maxsize=1) + self.session.mount('http://', adapter) + self.session.mount('https://', adapter) + + # Stream the URL + self.response = self.session.get( + self.url, + headers=headers, + stream=True, + timeout=(5, 30) # 5s connect, 30s read + ) + + if self.response.status_code != 200: + logger.error(f"HTTP {self.response.status_code} from {self.url}") + return + + logger.info(f"HTTP reader connected successfully, streaming data...") + + # Stream chunks to pipe + chunk_count = 0 + for chunk in self.response.iter_content(chunk_size=self.chunk_size): + if not self.running: + break + + if chunk: + try: + # Write binary data to pipe 
+ os.write(self.pipe_write, chunk) + chunk_count += 1 + + # Log progress periodically + if chunk_count % 1000 == 0: + logger.debug(f"HTTP reader streamed {chunk_count} chunks") + except OSError as e: + logger.error(f"Pipe write error: {e}") + break + + logger.info("HTTP stream ended") + + except requests.exceptions.RequestException as e: + logger.error(f"HTTP reader request error: {e}") + except Exception as e: + logger.error(f"HTTP reader unexpected error: {e}", exc_info=True) + finally: + self.running = False + # Close write end of pipe to signal EOF + try: + if self.pipe_write is not None: + os.close(self.pipe_write) + self.pipe_write = None + except: + pass + + def stop(self): + """Stop the HTTP stream reader""" + logger.info("Stopping HTTP stream reader") + self.running = False + + # Close response + if self.response: + try: + self.response.close() + except: + pass + + # Close session + if self.session: + try: + self.session.close() + except: + pass + + # Close write end of pipe + if self.pipe_write is not None: + try: + os.close(self.pipe_write) + self.pipe_write = None + except: + pass + + # Wait for thread + if self.thread and self.thread.is_alive(): + self.thread.join(timeout=2.0) diff --git a/apps/proxy/ts_proxy/stream_manager.py b/apps/proxy/ts_proxy/stream_manager.py index e2f8663c..d1f4ded6 100644 --- a/apps/proxy/ts_proxy/stream_manager.py +++ b/apps/proxy/ts_proxy/stream_manager.py @@ -114,6 +114,9 @@ class StreamManager: self.stderr_reader_thread = None self.ffmpeg_input_phase = True # Track if we're still reading input info + # Add HTTP reader thread property + self.http_reader = None + def _create_session(self): """Create and configure requests session with optimal settings""" session = requests.Session() @@ -740,9 +743,9 @@ class StreamManager: def _establish_http_connection(self): - """Establish a direct HTTP connection to the stream""" + """Establish HTTP connection using thread-based reader (same as transcode path)""" try: - logger.debug(f"Using TS Proxy to connect to stream: {self.url}") + logger.debug(f"Using HTTP streamer thread to connect to stream: {self.url}") # Check if we already have active HTTP connections if self.current_response or self.current_session: @@ -759,43 +762,39 @@ class StreamManager: logger.debug(f"Closing existing transcode process before establishing HTTP connection for channel {self.channel_id}") self._close_socket() - # Create new session for each connection attempt - session = self._create_session() - self.current_session = session + # Use HTTPStreamReader to fetch stream and pipe to a readable file descriptor + # This allows us to use the same fetch_chunk() path as transcode + from .http_streamer import HTTPStreamReader - # Stream the URL with proper timeout handling - # Use same chunk timeout as socket connections for consistency - chunk_timeout = ConfigHelper.chunk_timeout() - response = session.get( - self.url, - stream=True, - timeout=(5, 10) # 5s connect timeout, 10s read timeout + # Create and start the HTTP stream reader + self.http_reader = HTTPStreamReader( + url=self.url, + user_agent=self.user_agent, + chunk_size=self.chunk_size ) - self.current_response = response - if response.status_code == 200: - self.connected = True - self.healthy = True - logger.info(f"Successfully connected to stream source for channel {self.channel_id}") + # Start the reader thread and get the read end of the pipe + pipe_fd = self.http_reader.start() - # Store connection start time for stability tracking - self.connection_start_time = time.time() + # 
Wrap the file descriptor in a file object (same as transcode stdout) + import os + self.socket = os.fdopen(pipe_fd, 'rb', buffering=0) + self.connected = True + self.healthy = True - # Set channel state to waiting for clients - self._set_waiting_for_clients() + logger.info(f"Successfully started HTTP streamer thread for channel {self.channel_id}") + + # Store connection start time for stability tracking + self.connection_start_time = time.time() + + # Set channel state to waiting for clients + self._set_waiting_for_clients() + + return True - return True - else: - logger.error(f"Failed to connect to stream for channel {self.channel_id}: HTTP {response.status_code}") - self._close_connection() - return False - except requests.exceptions.RequestException as e: - logger.error(f"HTTP request error: {e}") - self._close_connection() - return False except Exception as e: logger.error(f"Error establishing HTTP connection for channel {self.channel_id}: {e}", exc_info=True) - self._close_connection() + self._close_socket() return False def _update_bytes_processed(self, chunk_size): @@ -823,66 +822,19 @@ class StreamManager: logger.error(f"Error updating bytes processed: {e}") def _process_stream_data(self): - """Process stream data until disconnect or error""" + """Process stream data until disconnect or error - unified path for both transcode and HTTP""" try: - if self.transcode: - # Handle transcoded stream data - while self.running and self.connected and not self.stop_requested and not self.needs_stream_switch: - if self.fetch_chunk(): - self.last_data_time = time.time() - else: - if not self.running: - break - gevent.sleep(0.1) # REPLACE time.sleep(0.1) - else: - # Handle direct HTTP connection - chunk_count = 0 - - # Check if response is still valid before attempting to read - if not self.current_response: - logger.debug(f"Response object is None for channel {self.channel_id}, connection likely closed") - self.connected = False - return - - try: - for chunk in self.current_response.iter_content(chunk_size=self.chunk_size): - # Check if we've been asked to stop - if self.stop_requested or self.url_switching or self.needs_stream_switch: - break - - if chunk: - # Track chunk size before adding to buffer - chunk_size = len(chunk) - self._update_bytes_processed(chunk_size) - - # Add chunk to buffer with TS packet alignment - success = self.buffer.add_chunk(chunk) - - if success: - self.last_data_time = time.time() - chunk_count += 1 - - # Update last data timestamp in Redis - if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client: - last_data_key = RedisKeys.last_data(self.buffer.channel_id) - self.buffer.redis_client.set(last_data_key, str(time.time()), ex=60) - except (requests.exceptions.ReadTimeout, ReadTimeoutError, requests.exceptions.ConnectionError) as e: - if self.stop_requested or self.url_switching: - logger.debug(f"Expected connection error during shutdown/URL switch for channel {self.channel_id}: {e}") - else: - # Handle timeout errors - log and close connection, let main loop handle retry - logger.warning(f"Stream read timeout for channel {self.channel_id}: {e}") - - # Close the current connection - self._close_connection() - - return # Exit this method, main loop will retry based on retry_count - except (AttributeError, ConnectionError) as e: - if self.stop_requested or self.url_switching: - logger.debug(f"Expected connection error during shutdown/URL switch for channel {self.channel_id}: {e}") - else: - logger.error(f"Unexpected stream error for channel 
{self.channel_id}: {e}") - raise + # Both transcode and HTTP now use the same subprocess/socket approach + # This gives us perfect control: check flags between chunks, timeout just returns False + while self.running and self.connected and not self.stop_requested and not self.needs_stream_switch: + if self.fetch_chunk(): + self.last_data_time = time.time() + else: + # fetch_chunk() returned False - could be timeout, no data, or error + if not self.running: + break + # Brief sleep before retry to avoid tight loop + gevent.sleep(0.1) except Exception as e: logger.error(f"Error processing stream data for channel {self.channel_id}: {e}", exc_info=True) @@ -1206,6 +1158,15 @@ class StreamManager: if self.current_response or self.current_session: self._close_connection() + # Stop HTTP reader thread if it exists + if hasattr(self, 'http_reader') and self.http_reader: + try: + logger.debug(f"Stopping HTTP reader thread for channel {self.channel_id}") + self.http_reader.stop() + self.http_reader = None + except Exception as e: + logger.debug(f"Error stopping HTTP reader for channel {self.channel_id}: {e}") + # Otherwise handle socket and transcode resources if self.socket: try: From 87d2131789b4644d2466740247d6d264c20607e1 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Mon, 13 Oct 2025 16:45:22 -0500 Subject: [PATCH 13/45] Bug fix: Fixes saving settings returning error. Fixes [Bug]: Error saving Stream Settings Fixes #535 --- frontend/src/pages/Settings.jsx | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/frontend/src/pages/Settings.jsx b/frontend/src/pages/Settings.jsx index d61b3b80..23928367 100644 --- a/frontend/src/pages/Settings.jsx +++ b/frontend/src/pages/Settings.jsx @@ -322,7 +322,8 @@ const SettingsPage = () => { let val = null; switch (key) { case 'm3u-hash-key': - val = value.value.split(','); + // Split comma-separated string, filter out empty strings + val = value.value ? value.value.split(',').filter(v => v) : []; break; case 'dvr-pre-offset-minutes': case 'dvr-post-offset-minutes': @@ -408,12 +409,26 @@ const SettingsPage = () => { for (const settingKey in values) { // Only compare against existing value if the setting exists const existing = settings[settingKey]; + + // Convert array values (like m3u-hash-key) to comma-separated strings + let stringValue; + if (Array.isArray(values[settingKey])) { + stringValue = values[settingKey].join(','); + } else { + stringValue = `${values[settingKey]}`; + } + + // Skip empty values to avoid validation errors + if (!stringValue) { + continue; + } + if (!existing) { // Create new setting on save - changedSettings[settingKey] = `${values[settingKey]}`; - } else if (String(values[settingKey]) !== String(existing.value)) { + changedSettings[settingKey] = stringValue; + } else if (stringValue !== String(existing.value)) { // If the user changed the setting's value from what's in the DB: - changedSettings[settingKey] = `${values[settingKey]}`; + changedSettings[settingKey] = stringValue; // Check if M3U hash key was changed if (settingKey === 'm3u-hash-key') { From 90d065df80a66d9773ef3b7edd33537462773a3f Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Mon, 13 Oct 2025 17:00:18 -0500 Subject: [PATCH 14/45] Enhancement: Show "Saved Successfully" message when changing stream settings and DVR settings. Also only show "Saved Successfully" when the api actually returns a success. 
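
The success banner added here leans on a convention that is easy to miss in the diff: the frontend API helpers return undefined on failure (they surface their own error notifications), so the save loop treats any falsy result as a failure and only flips the saved flag once every write has succeeded. A hedged sketch of that shape, with simplified signatures that are assumptions rather than the project's actual API module:

    // Sketch only: assumes helpers that resolve to a value on success and
    // undefined on error, mirroring the convention described in this patch.
    type Setting = { id?: number; key: string; value: string };

    async function saveAllSettings(
      changed: Record<string, string>,
      existing: Record<string, Setting | undefined>,
      updateSetting: (s: Setting) => Promise<Setting | undefined>,
      createSetting: (s: Setting) => Promise<Setting | undefined>
    ): Promise<boolean> {
      for (const key of Object.keys(changed)) {
        const prior = existing[key];
        const result = prior?.id
          ? await updateSetting({ ...prior, value: changed[key] })  // update existing row
          : await createSetting({ key, value: changed[key] });      // create missing row
        if (!result) {
          return false; // undefined means the helper already reported the error
        }
      }
      return true; // only now is it safe to show "Saved Successfully"
    }

Failing fast on the first falsy result keeps the helper's own error notification as the single piece of feedback, instead of pairing it with a misleading "Saved Successfully".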
--- frontend/src/pages/Settings.jsx | 143 +++++++++++++++++++++++--------- 1 file changed, 104 insertions(+), 39 deletions(-) diff --git a/frontend/src/pages/Settings.jsx b/frontend/src/pages/Settings.jsx index 23928367..a2693459 100644 --- a/frontend/src/pages/Settings.jsx +++ b/frontend/src/pages/Settings.jsx @@ -192,6 +192,7 @@ const SettingsPage = () => { useState([]); const [proxySettingsSaved, setProxySettingsSaved] = useState(false); + const [generalSettingsSaved, setGeneralSettingsSaved] = useState(false); const [rehashingStreams, setRehashingStreams] = useState(false); const [rehashSuccess, setRehashSuccess] = useState(false); const [rehashConfirmOpen, setRehashConfirmOpen] = useState(false); @@ -323,7 +324,7 @@ const SettingsPage = () => { switch (key) { case 'm3u-hash-key': // Split comma-separated string, filter out empty strings - val = value.value ? value.value.split(',').filter(v => v) : []; + val = value.value ? value.value.split(',').filter((v) => v) : []; break; case 'dvr-pre-offset-minutes': case 'dvr-post-offset-minutes': @@ -401,7 +402,17 @@ const SettingsPage = () => { loadComskipConfig(); }, []); + // Clear success states when switching accordion panels + useEffect(() => { + setGeneralSettingsSaved(false); + setProxySettingsSaved(false); + setNetworkAccessSaved(false); + setRehashSuccess(false); + }, [accordianValue]); + const onSubmit = async () => { + setGeneralSettingsSaved(false); + const values = form.getValues(); const changedSettings = {}; let m3uHashKeyChanged = false; @@ -409,7 +420,7 @@ const SettingsPage = () => { for (const settingKey in values) { // Only compare against existing value if the setting exists const existing = settings[settingKey]; - + // Convert array values (like m3u-hash-key) to comma-separated strings let stringValue; if (Array.isArray(values[settingKey])) { @@ -417,12 +428,12 @@ const SettingsPage = () => { } else { stringValue = `${values[settingKey]}`; } - + // Skip empty values to avoid validation errors if (!stringValue) { continue; } - + if (!existing) { // Create new setting on save changedSettings[settingKey] = stringValue; @@ -447,20 +458,36 @@ const SettingsPage = () => { } // Update each changed setting in the backend (create if missing) - for (const updatedKey in changedSettings) { - const existing = settings[updatedKey]; - if (existing && existing.id) { - await API.updateSetting({ - ...existing, - value: changedSettings[updatedKey], - }); - } else { - await API.createSetting({ - key: updatedKey, - name: updatedKey.replace(/-/g, ' '), - value: changedSettings[updatedKey], - }); + try { + for (const updatedKey in changedSettings) { + const existing = settings[updatedKey]; + if (existing && existing.id) { + const result = await API.updateSetting({ + ...existing, + value: changedSettings[updatedKey], + }); + // API functions return undefined on error + if (!result) { + throw new Error('Failed to update setting'); + } + } else { + const result = await API.createSetting({ + key: updatedKey, + name: updatedKey.replace(/-/g, ' '), + value: changedSettings[updatedKey], + }); + // API functions return undefined on error + if (!result) { + throw new Error('Failed to create setting'); + } + } } + + setGeneralSettingsSaved(true); + } catch (error) { + // Error notifications are already shown by API functions + // Just don't show the success message + console.error('Error saving settings:', error); } }; @@ -490,12 +517,19 @@ const SettingsPage = () => { const onProxySettingsSubmit = async () => { setProxySettingsSaved(false); - await 
API.updateSetting({ - ...settings['proxy-settings'], - value: JSON.stringify(proxySettingsForm.getValues()), - }); - - setProxySettingsSaved(true); + try { + const result = await API.updateSetting({ + ...settings['proxy-settings'], + value: JSON.stringify(proxySettingsForm.getValues()), + }); + // API functions return undefined on error + if (result) { + setProxySettingsSaved(true); + } + } catch (error) { + // Error notifications are already shown by API functions + console.error('Error saving proxy settings:', error); + } }; const onComskipUpload = async () => { @@ -583,29 +617,46 @@ const SettingsPage = () => { const executeSettingsSaveAndRehash = async () => { setRehashConfirmOpen(false); + setGeneralSettingsSaved(false); // Use the stored pending values that were captured before the dialog was shown const changedSettings = pendingChangedSettings || {}; // Update each changed setting in the backend (create if missing) - for (const updatedKey in changedSettings) { - const existing = settings[updatedKey]; - if (existing && existing.id) { - await API.updateSetting({ - ...existing, - value: changedSettings[updatedKey], - }); - } else { - await API.createSetting({ - key: updatedKey, - name: updatedKey.replace(/-/g, ' '), - value: changedSettings[updatedKey], - }); + try { + for (const updatedKey in changedSettings) { + const existing = settings[updatedKey]; + if (existing && existing.id) { + const result = await API.updateSetting({ + ...existing, + value: changedSettings[updatedKey], + }); + // API functions return undefined on error + if (!result) { + throw new Error('Failed to update setting'); + } + } else { + const result = await API.createSetting({ + key: updatedKey, + name: updatedKey.replace(/-/g, ' '), + value: changedSettings[updatedKey], + }); + // API functions return undefined on error + if (!result) { + throw new Error('Failed to create setting'); + } + } } - } - // Clear the pending values - setPendingChangedSettings(null); + // Clear the pending values + setPendingChangedSettings(null); + setGeneralSettingsSaved(true); + } catch (error) { + // Error notifications are already shown by API functions + // Just don't show the success message + console.error('Error saving settings:', error); + setPendingChangedSettings(null); + } }; const executeRehashStreamsOnly = async () => { @@ -726,6 +777,13 @@ const SettingsPage = () => {
+ {generalSettingsSaved && ( + + )} { Stream Settings + {generalSettingsSaved && ( + + )} + + {form.values.custom_properties?.name_source === 'stream' && ( + + )} + + { + const value = e.target.value; + setTitlePattern(value); + form.setFieldValue('custom_properties.title_pattern', value); + }} + error={form.errors['custom_properties.title_pattern']} + /> + + { + const value = e.target.value; + setTimePattern(value); + form.setFieldValue('custom_properties.time_pattern', value); + }} + /> + + { + const value = e.target.value; + setDatePattern(value); + form.setFieldValue('custom_properties.date_pattern', value); + }} + /> + + {/* Output Templates */} + + + + Use extracted groups from your patterns to format EPG titles and + descriptions. Reference groups using {'{groupname}'} syntax. + + + { + const value = e.target.value; + setTitleTemplate(value); + form.setFieldValue('custom_properties.title_template', value); + }} + /> + +
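
The pattern and template fields at the end of this hunk pair named-group extraction with "{groupname}" substitution, as the inline help text describes. A small sketch of that flow, assuming JavaScript named capture groups; the example pattern and template are illustrative, not the component's defaults:

    // Sketch: extract named groups from a stream title, then substitute them
    // into a "{groupname}" template, as the help text above describes.
    function renderEpgField(
      streamName: string,
      pattern: string,   // regex with named groups (assumed example below)
      template: string   // e.g. "{home} at {away} ({league})"
    ): string {
      const match = streamName.match(new RegExp(pattern));
      const groups = match?.groups ?? {};
      return template.replace(/\{(\w+)\}/g, (_, name) => groups[name] ?? '');
    }

    // renderEpgField("NHL: Bruins vs Rangers",
    //                "(?<league>\\w+): (?<home>.+) vs (?<away>.+)",
    //                "{home} at {away} ({league})")
    // -> "Bruins at Rangers (NHL)"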