From 1fcedab1abe1b492bcf4b3bdcb3a03d3e30f0b34 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Thu, 27 Mar 2025 09:49:51 -0500 Subject: [PATCH 01/13] Remote debugging initial commit. --- .dockerignore | 1 + .gitignore | 9 +++- docker/docker-compose.debug.yml | 19 +++++++ docker/entrypoint.sh | 18 ++++--- docker/init/99-init-dev.sh | 8 ++- docker/uwsgi.debug.ini | 81 +++++++++++++++++++++++++++++ scripts/debug_wrapper.py | 90 +++++++++++++++++++++++++++++++++ scripts/standalone_debug.py | 36 +++++++++++++ 8 files changed, 254 insertions(+), 8 deletions(-) create mode 100644 docker/docker-compose.debug.yml create mode 100644 docker/uwsgi.debug.ini create mode 100644 scripts/debug_wrapper.py create mode 100644 scripts/standalone_debug.py diff --git a/.dockerignore b/.dockerignore index e0cc78f0..5073af60 100755 --- a/.dockerignore +++ b/.dockerignore @@ -1,5 +1,6 @@ **/__pycache__ **/.venv +**/venv **/.classpath **/.dockerignore **/.env diff --git a/.gitignore b/.gitignore index b6631ac0..1dd591d4 100755 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ .DS_Store **/__pycache__/ **/.vscode/ +**/venv *.pyc node_modules/ .history/ @@ -10,4 +11,10 @@ docker/Dockerfile DEV static/ data/ .next -next-env.d.ts \ No newline at end of file +next-env.d.ts +media/ +celerybeat-schedule* +dump.rdb +debugpy* +uwsgi.sock +package-lock.json \ No newline at end of file diff --git a/docker/docker-compose.debug.yml b/docker/docker-compose.debug.yml new file mode 100644 index 00000000..40a87bfe --- /dev/null +++ b/docker/docker-compose.debug.yml @@ -0,0 +1,19 @@ +services: + dispatcharr: + # build: + # context: .. + # dockerfile: docker/Dockerfile.dev + image: dispatcharr/dispatcharr + container_name: dispatcharr_debug + ports: + - 5656:5656 # API port + - 9193:9191 # Web UI port + - 8001:8001 # Socket port + - 5678:5678 # Debugging port + volumes: + - ../:/app + environment: + - DISPATCHARR_ENV=dev + - DISPATCHARR_DEBUG=true + - REDIS_HOST=localhost + - CELERY_BROKER_URL=redis://localhost:6379/0 diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index d04edcb0..1e1cb22f 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -49,6 +49,7 @@ if [[ ! -f /etc/profile.d/dispatcharr.sh ]]; then echo "export POSTGRES_HOST=$POSTGRES_HOST" >> /etc/profile.d/dispatcharr.sh echo "export POSTGRES_PORT=$POSTGRES_PORT" >> /etc/profile.d/dispatcharr.sh echo "export DISPATCHARR_ENV=$DISPATCHARR_ENV" >> /etc/profile.d/dispatcharr.sh + echo "export DISPATCHARR_DEBUG=$DISPATCHARR_DEBUG" >> /etc/profile.d/dispatcharr.sh echo "export REDIS_HOST=$REDIS_HOST" >> /etc/profile.d/dispatcharr.sh echo "export REDIS_DB=$REDIS_DB" >> /etc/profile.d/dispatcharr.sh fi @@ -75,8 +76,18 @@ postgres_pid=$(su - postgres -c "/usr/lib/postgresql/14/bin/pg_ctl -D /data stat echo "✅ Postgres started with PID $postgres_pid" pids+=("$postgres_pid") -if [ "$DISPATCHARR_ENV" = "dev" ]; then + +uwsgi_file="/app/docker/uwsgi.ini" +if [ "$DISPATCHARR_ENV" = "dev" ] && [ "$DISPATCHARR_DEBUG" != "true" ]; then + uwsgi_file="/app/docker/uwsgi.dev.ini" +elif [ "$DISPATCHARR_DEBUG" = "true" ]; then + uwsgi_file="/app/docker/uwsgi.debug.ini" +fi + + +if [[ "$DISPATCHARR_ENV" = "dev" ]]; then . /app/docker/init/99-init-dev.sh + else echo "🚀 Starting nginx..." nginx @@ -85,10 +96,6 @@ else pids+=("$nginx_pid") fi -uwsgi_file="/app/docker/uwsgi.ini" -if [ "$DISPATCHARR_ENV" = "dev" ]; then - uwsgi_file="/app/docker/uwsgi.dev.ini" -fi echo "🚀 Starting uwsgi..." su - $POSTGRES_USER -c "cd /app && uwsgi --ini $uwsgi_file &" @@ -97,7 +104,6 @@ echo "✅ uwsgi started with PID $uwsgi_pid" pids+=("$uwsgi_pid") - cd /app python manage.py migrate --noinput python manage.py collectstatic --noinput diff --git a/docker/init/99-init-dev.sh b/docker/init/99-init-dev.sh index 3e9ecc0a..861c8307 100644 --- a/docker/init/99-init-dev.sh +++ b/docker/init/99-init-dev.sh @@ -15,5 +15,11 @@ fi # Install frontend dependencies cd /app/frontend && npm install - +# Install pip dependencies cd /app && pip install -r requirements.txt + +# Install debugpy for remote debugging +if [ "$DISPATCHARR_DEBUG" = "true" ]; then + echo "=== setting up debugpy ===" + pip install debugpy +fi diff --git a/docker/uwsgi.debug.ini b/docker/uwsgi.debug.ini new file mode 100644 index 00000000..f8df7bdc --- /dev/null +++ b/docker/uwsgi.debug.ini @@ -0,0 +1,81 @@ +[uwsgi] +; exec-before = python manage.py collectstatic --noinput +; exec-before = python manage.py migrate --noinput + +; First run Redis availability check script once +exec-before = python /app/scripts/wait_for_redis.py + +; Start Redis first +attach-daemon = redis-server +; Then start other services +attach-daemon = celery -A dispatcharr worker -l info +attach-daemon = celery -A dispatcharr beat -l info +attach-daemon = daphne -b 0.0.0.0 -p 8001 dispatcharr.asgi:application +attach-daemon = cd /app/frontend && npm run dev + +# Core settings +chdir = /app +module = scripts.debug_wrapper:application +virtualenv = /dispatcharrpy +master = true +env = DJANGO_SETTINGS_MODULE=dispatcharr.settings +socket = /app/uwsgi.sock +chmod-socket = 777 +vacuum = true +die-on-term = true +static-map = /static=/app/static + +# Worker configuration +workers = 1 +threads = 4 +enable-threads = true +lazy-apps = true + +# HTTP server +http = 0.0.0.0:5656 +http-keepalive = 1 +buffer-size = 65536 +http-timeout = 600 + +# Async mode (use gevent for high concurrency) +gevent = 100 +async = 100 + +# Performance tuning +thunder-lock = true +log-4xx = true +log-5xx = true +disable-logging = false + +; Longer timeouts for debugging sessions +harakiri = 3600 +socket-timeout = 3600 +http-timeout = 3600 + + +# Ignore unknown options +ignore-sigpipe = true +ignore-write-errors = true +disable-write-exception = true + +# Explicitly disable for-server option that confuses debugpy +for-server = false + +# Debugging settings +py-autoreload = 1 +honour-stdin = true + +# Environment variables +env = PYTHONPATH=/app +env = PYTHONUNBUFFERED=1 +env = PYDEVD_DISABLE_FILE_VALIDATION=1 +env = PYTHONUTF8=1 +env = PYTHONXOPT=-Xfrozen_modules=off +env = PYDEVD_DEBUG=1 +env = DEBUGPY_LOG_DIR=/app/debugpy_logs + +# Debugging control variables +env = WAIT_FOR_DEBUGGER=false +env = DEBUG_TIMEOUT=30 + + diff --git a/scripts/debug_wrapper.py b/scripts/debug_wrapper.py new file mode 100644 index 00000000..2339fe87 --- /dev/null +++ b/scripts/debug_wrapper.py @@ -0,0 +1,90 @@ +""" +Debug wrapper for the WSGI application. +This module initializes debugpy and then imports the actual application. +""" +import sys +import os +import time +import logging +import inspect + +# Configure logging to output to both console and file +os.makedirs('/app/debugpy_logs', exist_ok=True) +logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s [%(levelname)s] %(name)s - %(message)s', + handlers=[ + logging.FileHandler('/app/debugpy_logs/debug_wrapper.log'), + logging.StreamHandler(sys.stdout) + ] +) +logger = logging.getLogger('debug_wrapper') + +# Log system info +logger.info(f"Python version: {sys.version}") +logger.info(f"Current directory: {os.getcwd()}") +logger.info(f"Files in current directory: {os.listdir()}") +logger.info(f"Python path: {sys.path}") + +# Default timeout in seconds +DEBUG_TIMEOUT = int(os.environ.get('DEBUG_TIMEOUT', '30')) +# Whether to wait for debugger to attach +WAIT_FOR_DEBUGGER = os.environ.get('WAIT_FOR_DEBUGGER', 'false').lower() == 'true' + +logger.info(f"DEBUG_TIMEOUT: {DEBUG_TIMEOUT}") +logger.info(f"WAIT_FOR_DEBUGGER: {WAIT_FOR_DEBUGGER}") + +try: + import debugpy + from debugpy import configure + logger.info("Successfully imported debugpy") + + # Critical: Configure debugpy to use regular Python for the adapter, not uwsgi + python_path = '/usr/local/bin/python3' + if os.path.exists(python_path): + logger.info(f"Setting debugpy adapter to use Python interpreter: {python_path}") + debugpy.configure(python=python_path) + else: + logger.warning(f"Python path {python_path} not found. Using system default.") + + # Don't wait for connection, just set up the debugging session + logger.info("Initializing debugpy on 0.0.0.0:5678...") + try: + # Use connect instead of listen to avoid the adapter process + debugpy.listen(("0.0.0.0", 5678)) + logger.info("debugpy now listening on 0.0.0.0:5678") + + if WAIT_FOR_DEBUGGER: + logger.info(f"Waiting for debugger to attach (timeout: {DEBUG_TIMEOUT}s)...") + start_time = time.time() + while not debugpy.is_client_connected() and (time.time() - start_time < DEBUG_TIMEOUT): + time.sleep(1) + logger.info("Waiting for debugger connection...") + + if debugpy.is_client_connected(): + logger.info("Debugger attached!") + else: + logger.info(f"Debugger not attached after {DEBUG_TIMEOUT}s, continuing anyway...") + except Exception as e: + logger.error(f"Error with debugpy.listen: {e}", exc_info=True) + logger.info("Continuing without debugging...") + +except ImportError: + logger.error("debugpy not installed, continuing without debugging support") +except Exception as e: + logger.error(f"Failed to initialize debugpy: {e}", exc_info=True) + logger.info("Continuing without debugging support") + +# Now import the actual WSGI application +logger.info("Loading WSGI application...") +try: + from dispatcharr.wsgi import application + logger.info("WSGI application loaded successfully") + + # Log the application details + logger.info(f"Application type: {type(application)}") + logger.info(f"Application callable: {inspect.isfunction(application) or inspect.ismethod(application)}") + +except Exception as e: + logger.error(f"Error loading WSGI application: {e}", exc_info=True) + raise diff --git a/scripts/standalone_debug.py b/scripts/standalone_debug.py new file mode 100644 index 00000000..69558fab --- /dev/null +++ b/scripts/standalone_debug.py @@ -0,0 +1,36 @@ +""" +Standalone debug entry point for the Django application. +This provides a cleaner way to debug without uWSGI complications. + +Run this directly with Python to debug: + python standalone_debug.py +""" +import os +import sys +import debugpy +import logging + +# Configure basic logging +logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s [%(levelname)s] %(name)s - %(message)s' +) +logger = logging.getLogger('standalone_debug') + +# Setup Django environment +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dispatcharr.settings') + +# Setup debugpy and wait for connection +logger.info("Setting up debugpy...") +debugpy.listen(("0.0.0.0", 5678)) +logger.info("Waiting for debugger to attach... Connect to 0.0.0.0:5678") +debugpy.wait_for_client() +logger.info("Debugger attached!") + +# Import Django and run the development server +logger.info("Starting Django development server...") +import django +django.setup() + +from django.core.management import execute_from_command_line +execute_from_command_line(['manage.py', 'runserver', '0.0.0.0:8000']) From b9f4893261d7dd9672cdac3ed31544600f0d698c Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Thu, 27 Mar 2025 16:11:04 -0500 Subject: [PATCH 02/13] Fixes broken channels stats after field name change from 'profile' to 'stream_profile' --- apps/proxy/ts_proxy/channel_status.py | 5 ++--- frontend/src/pages/Stats.jsx | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/apps/proxy/ts_proxy/channel_status.py b/apps/proxy/ts_proxy/channel_status.py index 799da605..b9f1596a 100644 --- a/apps/proxy/ts_proxy/channel_status.py +++ b/apps/proxy/ts_proxy/channel_status.py @@ -37,8 +37,7 @@ class ChannelStatus: 'channel_id': channel_id, 'state': metadata.get(ChannelMetadataField.STATE.encode('utf-8'), b'unknown').decode('utf-8'), 'url': metadata.get(ChannelMetadataField.URL.encode('utf-8'), b'').decode('utf-8'), - 'profile': metadata.get(ChannelMetadataField.STREAM_PROFILE.encode('utf-8'), - metadata.get(ChannelMetadataField.PROFILE.encode('utf-8'), b'unknown')).decode('utf-8'), + 'stream_profile': metadata.get(ChannelMetadataField.STREAM_PROFILE.encode('utf-8'), b'').decode('utf-8'), 'started_at': metadata.get(ChannelMetadataField.INIT_TIME.encode('utf-8'), b'0').decode('utf-8'), 'owner': metadata.get(ChannelMetadataField.OWNER.encode('utf-8'), b'unknown').decode('utf-8'), 'buffer_index': int(buffer_index_value.decode('utf-8')) if buffer_index_value else 0, @@ -273,7 +272,7 @@ class ChannelStatus: 'channel_id': channel_id, 'state': metadata.get(ChannelMetadataField.STATE.encode('utf-8'), b'unknown').decode('utf-8'), 'url': metadata.get(ChannelMetadataField.URL.encode('utf-8'), b'').decode('utf-8'), - 'profile': metadata.get(ChannelMetadataField.PROFILE.encode('utf-8'), b'unknown').decode('utf-8'), + 'stream_profile': metadata.get(ChannelMetadataField.STREAM_PROFILE.encode('utf-8'), b'').decode('utf-8'), 'owner': metadata.get(ChannelMetadataField.OWNER.encode('utf-8'), b'unknown').decode('utf-8'), 'buffer_index': int(buffer_index_value.decode('utf-8')) if buffer_index_value else 0, 'client_count': client_count, diff --git a/frontend/src/pages/Stats.jsx b/frontend/src/pages/Stats.jsx index 62675ff6..1428daa2 100644 --- a/frontend/src/pages/Stats.jsx +++ b/frontend/src/pages/Stats.jsx @@ -315,7 +315,7 @@ const ChannelsPage = () => { ...channels[channelsByUUID[ch.channel_id]], bitrates, stream_profile: streamProfiles.find( - (profile) => profile.id == parseInt(ch.profile) + (profile) => profile.id == parseInt(ch.stream_profile) ), }; From d7d3138703d51a8b523ce399e6994a5b7886e133 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Thu, 27 Mar 2025 17:52:27 -0500 Subject: [PATCH 03/13] Changed variable name for clarity. --- apps/proxy/ts_proxy/url_utils.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/apps/proxy/ts_proxy/url_utils.py b/apps/proxy/ts_proxy/url_utils.py index de4c9109..f68caead 100644 --- a/apps/proxy/ts_proxy/url_utils.py +++ b/apps/proxy/ts_proxy/url_utils.py @@ -65,11 +65,9 @@ def generate_stream_url(channel_id: str) -> Tuple[str, str, bool]: else: transcode = True - # Get profile name as string - use id for backward compatibility - # but we'll store it in the STREAM_PROFILE field - profile_value = stream_profile.id + stream_profile_id = stream_profile.id - return stream_url, stream_user_agent, transcode, profile_value + return stream_url, stream_user_agent, transcode, stream_profile_id def transform_url(input_url: str, search_pattern: str, replace_pattern: str) -> str: """ From 3f440d855e534ffefa3f24f825704908220dbd7f Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Thu, 27 Mar 2025 18:50:48 -0500 Subject: [PATCH 04/13] Replaced invalid character in logs. --- apps/proxy/ts_proxy/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index 282c5a74..5c18064b 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -1157,7 +1157,7 @@ class ProxyServer: self.redis_client.hset(metadata_key, mapping=update_data) # Log the transition - logger.info(f"Channel {channel_id} state transition: {current_state or 'None'} → {new_state}") + logger.info(f"Channel {channel_id} state transition: {current_state or 'None'} -> {new_state}") return True except Exception as e: logger.error(f"Error updating channel state: {e}") From 4e9a52c80ebc8d39f0eb0f1b57d929243e3e2b8a Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Thu, 27 Mar 2025 18:53:59 -0500 Subject: [PATCH 05/13] Removed invalid character in logging. --- apps/proxy/ts_proxy/stream_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/proxy/ts_proxy/stream_manager.py b/apps/proxy/ts_proxy/stream_manager.py index 98a4ff0a..6cf31ffa 100644 --- a/apps/proxy/ts_proxy/stream_manager.py +++ b/apps/proxy/ts_proxy/stream_manager.py @@ -782,7 +782,7 @@ class StreamManager: # Get configured grace period or default grace_period = ConfigHelper.get('CHANNEL_INIT_GRACE_PERIOD', 20) - logger.info(f"STREAM MANAGER: Updated channel {channel_id} state: {current_state or 'None'} → {ChannelState.WAITING_FOR_CLIENTS} with {current_buffer_index} buffer chunks") + logger.info(f"STREAM MANAGER: Updated channel {channel_id} state: {current_state or 'None'} -> {ChannelState.WAITING_FOR_CLIENTS} with {current_buffer_index} buffer chunks") logger.info(f"Started initial connection grace period ({grace_period}s) for channel {channel_id}") else: logger.debug(f"Not changing state: channel {channel_id} already in {current_state} state") From ce6e019e6a2e390a7da8f0d877f4d2c557bbea73 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Thu, 27 Mar 2025 19:24:43 -0500 Subject: [PATCH 06/13] Add m3u_profile (id) to redis when channel is initialized. --- apps/proxy/ts_proxy/services/channel_service.py | 11 +++++++---- apps/proxy/ts_proxy/views.py | 6 +++--- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/apps/proxy/ts_proxy/services/channel_service.py b/apps/proxy/ts_proxy/services/channel_service.py index 451af7d5..8b8a3d26 100644 --- a/apps/proxy/ts_proxy/services/channel_service.py +++ b/apps/proxy/ts_proxy/services/channel_service.py @@ -20,7 +20,7 @@ class ChannelService: """Service class for channel operations""" @staticmethod - def initialize_channel(channel_id, stream_url, user_agent, transcode=False, profile_value=None, stream_id=None): + def initialize_channel(channel_id, stream_url, user_agent, transcode=False, stream_profile_value=None, stream_id=None, m3u_profile_id=None): """ Initialize a channel with the given parameters. @@ -29,8 +29,9 @@ class ChannelService: stream_url: URL of the stream user_agent: User agent for the stream connection transcode: Whether to transcode the stream - profile_value: Stream profile value to store in metadata + stream_profile_value: Stream profile value to store in metadata stream_id: ID of the stream being used + m3u_profile_id: ID of the M3U profile being used Returns: bool: Success status @@ -67,10 +68,12 @@ class ChannelService: if success and proxy_server.redis_client: metadata_key = RedisKeys.channel_metadata(channel_id) update_data = {} - if profile_value: - update_data[ChannelMetadataField.STREAM_PROFILE] = profile_value + if stream_profile_value: + update_data[ChannelMetadataField.STREAM_PROFILE] = stream_profile_value if stream_id: update_data[ChannelMetadataField.STREAM_ID] = str(stream_id) + if m3u_profile_id: + update_data[ChannelMetadataField.M3U_PROFILE] = str(m3u_profile_id) if update_data: proxy_server.redis_client.hset(metadata_key, mapping=update_data) diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py index 41074b6d..8455d490 100644 --- a/apps/proxy/ts_proxy/views.py +++ b/apps/proxy/ts_proxy/views.py @@ -91,8 +91,8 @@ def stream_ts(request, channel_id): return JsonResponse({'error': 'Channel not available'}, status=404) # Get the stream ID from the channel - stream_id, profile_id = channel.get_stream() - logger.info(f"Channel {channel_id} using stream ID {stream_id}, profile ID {profile_id}") + stream_id, m3u_profile_id = channel.get_stream() + logger.info(f"Channel {channel_id} using stream ID {stream_id}, m3u account profile ID {m3u_profile_id}") # Generate transcode command if needed stream_profile = channel.get_stream_profile() @@ -101,7 +101,7 @@ def stream_ts(request, channel_id): # Initialize channel with the stream's user agent (not the client's) success = ChannelService.initialize_channel( - channel_id, stream_url, stream_user_agent, transcode, profile_value, stream_id + channel_id, stream_url, stream_user_agent, transcode, profile_value, stream_id, m3u_profile_id ) if not success: From e2d9e233dab2d532877acb2e3b5b73943993cd98 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Thu, 27 Mar 2025 20:49:24 -0500 Subject: [PATCH 07/13] Added ability to send a next stream command. --- apps/proxy/ts_proxy/stream_manager.py | 3 +- apps/proxy/ts_proxy/url_utils.py | 14 ++--- apps/proxy/ts_proxy/urls.py | 1 + apps/proxy/ts_proxy/views.py | 76 ++++++++++++++++++++++++++- 4 files changed, 84 insertions(+), 10 deletions(-) diff --git a/apps/proxy/ts_proxy/stream_manager.py b/apps/proxy/ts_proxy/stream_manager.py index 6cf31ffa..317ec176 100644 --- a/apps/proxy/ts_proxy/stream_manager.py +++ b/apps/proxy/ts_proxy/stream_manager.py @@ -888,7 +888,8 @@ class StreamManager: self.buffer.redis_client.hset(metadata_key, mapping={ ChannelMetadataField.URL: new_url, ChannelMetadataField.USER_AGENT: new_user_agent, - ChannelMetadataField.STREAM_PROFILE: stream_info['profile'], + ChannelMetadataField.STREAM_PROFILE: stream_info['stream_profile'], + ChannelMetadataField.M3U_PROFILE: stream_info['m3u_profile_id'], ChannelMetadataField.STREAM_ID: str(stream_id), ChannelMetadataField.STREAM_SWITCH_TIME: str(time.time()), ChannelMetadataField.STREAM_SWITCH_REASON: "max_retries_exceeded" diff --git a/apps/proxy/ts_proxy/url_utils.py b/apps/proxy/ts_proxy/url_utils.py index f68caead..c4767dd1 100644 --- a/apps/proxy/ts_proxy/url_utils.py +++ b/apps/proxy/ts_proxy/url_utils.py @@ -132,21 +132,21 @@ def get_stream_info_for_switch(channel_id: str, target_stream_id: Optional[int] ).first() if default_profile: - profile_id = default_profile.id + m3u_profile_id = default_profile.id else: logger.error(f"No profile found for stream {stream_id}") return {'error': 'No profile found for stream'} else: # Use first available profile - profile_id = profiles.first().id + m3u_profile_id = profiles.first().id else: - stream_id, profile_id = channel.get_stream() - if stream_id is None or profile_id is None: + stream_id, m3u_profile_id = channel.get_stream() + if stream_id is None or m3u_profile_id is None: return {'error': 'No stream assigned to channel'} # Get the stream and profile objects directly stream = get_object_or_404(Stream, pk=stream_id) - profile = get_object_or_404(M3UAccountProfile, pk=profile_id) + profile = get_object_or_404(M3UAccountProfile, pk=m3u_profile_id) # Get the user agent from the M3U account m3u_account = M3UAccount.objects.get(id=profile.m3u_account.id) @@ -166,9 +166,9 @@ def get_stream_info_for_switch(channel_id: str, target_stream_id: Optional[int] 'url': stream_url, 'user_agent': user_agent, 'transcode': transcode, - 'profile': profile_value, + 'stream_profile': profile_value, 'stream_id': stream_id, - 'profile_id': profile_id + 'm3u_profile_id': m3u_profile_id } except Exception as e: logger.error(f"Error getting stream info for switch: {e}", exc_info=True) diff --git a/apps/proxy/ts_proxy/urls.py b/apps/proxy/ts_proxy/urls.py index cba06b6f..cb236aa2 100644 --- a/apps/proxy/ts_proxy/urls.py +++ b/apps/proxy/ts_proxy/urls.py @@ -10,4 +10,5 @@ urlpatterns = [ path('status/', views.channel_status, name='channel_status_detail'), path('stop/', views.stop_channel, name='stop_channel'), path('stop_client/', views.stop_client, name='stop_client'), + path('next_stream/', views.next_stream, name='next_stream'), ] diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py index 8455d490..67a75c4d 100644 --- a/apps/proxy/ts_proxy/views.py +++ b/apps/proxy/ts_proxy/views.py @@ -21,7 +21,7 @@ from rest_framework.permissions import IsAuthenticated from .constants import ChannelState, EventType, StreamType, ChannelMetadataField from .config_helper import ConfigHelper from .services.channel_service import ChannelService -from .url_utils import generate_stream_url, transform_url, get_stream_info_for_switch, get_stream_object +from .url_utils import generate_stream_url, transform_url, get_stream_info_for_switch, get_stream_object, get_alternate_streams from .utils import get_logger from uuid import UUID @@ -189,7 +189,7 @@ def stream_ts(request, channel_id): @csrf_exempt @api_view(['POST']) -@permission_classes([IsAuthenticated]) +#@permission_classes([IsAuthenticated]) def change_stream(request, channel_id): """Change stream URL for existing channel with enhanced diagnostics""" try: @@ -337,3 +337,75 @@ def stop_client(request, channel_id): except Exception as e: logger.error(f"Failed to stop client: {e}", exc_info=True) return JsonResponse({'error': str(e)}, status=500) + +@csrf_exempt +@api_view(['POST']) +#@permission_classes([IsAuthenticated]) +def next_stream(request, channel_id): + """Switch to the next available stream for a channel""" + try: + logger.info(f"Request to switch to next stream for channel {channel_id} received") + + # Check if the channel exists + channel = get_stream_object(channel_id) + + # Get current stream info to know which one we're currently using + current_stream_id, profile_id = channel.get_stream() + + if not current_stream_id: + return JsonResponse({'error': 'No current stream found for channel'}, status=404) + + # Get alternate streams excluding the current one + alternate_streams = get_alternate_streams(channel_id, current_stream_id) + + if not alternate_streams: + return JsonResponse({ + 'error': 'No alternate streams available for this channel', + 'current_stream_id': current_stream_id + }, status=404) + + # Pick the next stream from alternatives + next_stream = alternate_streams[0] # Get the first alternative + next_stream_id = next_stream['stream_id'] + + # Get full stream info including URL for the next stream + stream_info = get_stream_info_for_switch(channel_id, next_stream_id) + + if 'error' in stream_info: + return JsonResponse({ + 'error': stream_info['error'], + 'current_stream_id': current_stream_id, + 'next_stream_id': next_stream_id + }, status=404) + + # Now use the ChannelService to change the stream URL + result = ChannelService.change_stream_url( + channel_id, + stream_info['url'], + stream_info['user_agent'] + ) + + if result.get('status') == 'error': + return JsonResponse({ + 'error': result.get('message', 'Unknown error'), + 'diagnostics': result.get('diagnostics', {}), + 'current_stream_id': current_stream_id, + 'next_stream_id': next_stream_id + }, status=404) + + # Format success response + response_data = { + 'message': 'Stream switched to next available', + 'channel': channel_id, + 'previous_stream_id': current_stream_id, + 'new_stream_id': next_stream_id, + 'new_url': stream_info['url'], + 'owner': result.get('direct_update', False), + 'worker_id': proxy_server.worker_id + } + + return JsonResponse(response_data) + + except Exception as e: + logger.error(f"Failed to switch to next stream: {e}", exc_info=True) + return JsonResponse({'error': str(e)}, status=500) From 2f995b16fd286fdd9aaf70e6d9efad5c5a45c1d8 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Fri, 28 Mar 2025 08:50:57 -0500 Subject: [PATCH 08/13] Fixes stream switches not honouring user selected order. --- .../ts_proxy/services/channel_service.py | 33 +++++++++++-------- apps/proxy/ts_proxy/url_utils.py | 8 ++--- apps/proxy/ts_proxy/views.py | 25 +++++++++++--- 3 files changed, 42 insertions(+), 24 deletions(-) diff --git a/apps/proxy/ts_proxy/services/channel_service.py b/apps/proxy/ts_proxy/services/channel_service.py index 8b8a3d26..72209c96 100644 --- a/apps/proxy/ts_proxy/services/channel_service.py +++ b/apps/proxy/ts_proxy/services/channel_service.py @@ -95,6 +95,7 @@ class ChannelService: dict: Result information including success status and diagnostics """ # If no direct URL is provided but a target stream is, get URL from target stream + stream_id = None if not new_url and target_stream_id: stream_info = get_stream_info_for_switch(channel_id, target_stream_id) if 'error' in stream_info: @@ -104,6 +105,10 @@ class ChannelService: } new_url = stream_info['url'] user_agent = stream_info['user_agent'] + stream_id = target_stream_id + elif target_stream_id: + # If we have both URL and target_stream_id, use the target_stream_id + stream_id = target_stream_id # Check if channel exists in_local_managers = channel_id in proxy_server.stream_managers @@ -155,7 +160,7 @@ class ChannelService: # Update metadata in Redis regardless of ownership if proxy_server.redis_client: try: - ChannelService._update_channel_metadata(channel_id, new_url, user_agent) + ChannelService._update_channel_metadata(channel_id, new_url, user_agent, stream_id) result['metadata_updated'] = True except Exception as e: logger.error(f"Error updating Redis metadata: {e}", exc_info=True) @@ -180,7 +185,7 @@ class ChannelService: # If we're not the owner, publish an event for the owner to pick up logger.info(f"Not the owner, requesting URL change via Redis PubSub") if proxy_server.redis_client: - ChannelService._publish_stream_switch_event(channel_id, new_url, user_agent) + ChannelService._publish_stream_switch_event(channel_id, new_url, user_agent, stream_id) result.update({ 'direct_update': False, 'event_published': True, @@ -400,7 +405,7 @@ class ChannelService: # Helper methods for Redis operations @staticmethod - def _update_channel_metadata(channel_id, url, user_agent=None): + def _update_channel_metadata(channel_id, url, user_agent=None, stream_id=None): """Update channel metadata in Redis""" if not proxy_server.redis_client: return False @@ -411,23 +416,22 @@ class ChannelService: key_type = proxy_server.redis_client.type(metadata_key).decode('utf-8') logger.debug(f"Redis key {metadata_key} is of type: {key_type}") + # Build metadata update dict + metadata = {ChannelMetadataField.URL: url} + if user_agent: + metadata[ChannelMetadataField.USER_AGENT] = user_agent + if stream_id: + metadata[ChannelMetadataField.STREAM_ID] = str(stream_id) + logger.info(f"Updating stream ID to {stream_id} in Redis for channel {channel_id}") + # Use the appropriate method based on the key type if key_type == 'hash': - proxy_server.redis_client.hset(metadata_key, ChannelMetadataField.URL, url) - if user_agent: - proxy_server.redis_client.hset(metadata_key, ChannelMetadataField.USER_AGENT, user_agent) + proxy_server.redis_client.hset(metadata_key, mapping=metadata) elif key_type == 'none': # Key doesn't exist yet - # Create new hash with all required fields - metadata = {ChannelMetadataField.URL: url} - if user_agent: - metadata[ChannelMetadataField.USER_AGENT] = user_agent proxy_server.redis_client.hset(metadata_key, mapping=metadata) else: # If key exists with wrong type, delete it and recreate proxy_server.redis_client.delete(metadata_key) - metadata = {ChannelMetadataField.URL: url} - if user_agent: - metadata[ChannelMetadataField.USER_AGENT] = user_agent proxy_server.redis_client.hset(metadata_key, mapping=metadata) # Set switch request flag to ensure all workers see it @@ -438,7 +442,7 @@ class ChannelService: return True @staticmethod - def _publish_stream_switch_event(channel_id, new_url, user_agent=None): + def _publish_stream_switch_event(channel_id, new_url, user_agent=None, stream_id=None): """Publish a stream switch event to Redis pubsub""" if not proxy_server.redis_client: return False @@ -448,6 +452,7 @@ class ChannelService: "channel_id": channel_id, "url": new_url, "user_agent": user_agent, + "stream_id": stream_id, "requester": proxy_server.worker_id, "timestamp": time.time() } diff --git a/apps/proxy/ts_proxy/url_utils.py b/apps/proxy/ts_proxy/url_utils.py index c4767dd1..04440c03 100644 --- a/apps/proxy/ts_proxy/url_utils.py +++ b/apps/proxy/ts_proxy/url_utils.py @@ -194,8 +194,8 @@ def get_alternate_streams(channel_id: str, current_stream_id: Optional[int] = No logger.debug(f"Looking for alternate streams for channel {channel_id}, current stream ID: {current_stream_id}") - # Get all assigned streams for this channel - streams = channel.streams.all() + # Get all assigned streams for this channel using the correct ordering from the channelstream table + streams = channel.streams.all().order_by('channelstream__order') logger.debug(f"Channel {channel_id} has {streams.count()} total assigned streams") if not streams.exists(): @@ -204,7 +204,7 @@ def get_alternate_streams(channel_id: str, current_stream_id: Optional[int] = No alternate_streams = [] - # Process each stream + # Process each stream in the user-defined order for stream in streams: # Log each stream we're checking logger.debug(f"Checking stream ID {stream.id} ({stream.name}) for channel {channel_id}") @@ -215,8 +215,6 @@ def get_alternate_streams(channel_id: str, current_stream_id: Optional[int] = No continue # Find compatible profiles for this stream - # FIX: Looking at the error message, M3UAccountProfile doesn't have a 'stream' field - # We need to find which field relates M3UAccountProfile to Stream try: # Check if we can find profiles via m3u_account profiles = M3UAccountProfile.objects.filter(m3u_account=stream.m3u_account) diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py index 67a75c4d..375a7b79 100644 --- a/apps/proxy/ts_proxy/views.py +++ b/apps/proxy/ts_proxy/views.py @@ -346,13 +346,27 @@ def next_stream(request, channel_id): try: logger.info(f"Request to switch to next stream for channel {channel_id} received") - # Check if the channel exists - channel = get_stream_object(channel_id) + # First check if channel is active in Redis + current_stream_id = None + profile_id = None - # Get current stream info to know which one we're currently using - current_stream_id, profile_id = channel.get_stream() + if proxy_server.redis_client: + metadata_key = RedisKeys.channel_metadata(channel_id) + if proxy_server.redis_client.exists(metadata_key): + # Get current stream ID from Redis + stream_id_bytes = proxy_server.redis_client.hget(metadata_key, ChannelMetadataField.STREAM_ID) + if stream_id_bytes: + current_stream_id = int(stream_id_bytes.decode('utf-8')) + logger.info(f"Found current stream ID {current_stream_id} in Redis for channel {channel_id}") + + # Get M3U profile from Redis if available + profile_id_bytes = proxy_server.redis_client.hget(metadata_key, ChannelMetadataField.M3U_PROFILE) + if profile_id_bytes: + profile_id = int(profile_id_bytes.decode('utf-8')) + logger.info(f"Found M3U profile ID {profile_id} in Redis for channel {channel_id}") if not current_stream_id: + # Channel is not running return JsonResponse({'error': 'No current stream found for channel'}, status=404) # Get alternate streams excluding the current one @@ -382,7 +396,8 @@ def next_stream(request, channel_id): result = ChannelService.change_stream_url( channel_id, stream_info['url'], - stream_info['user_agent'] + stream_info['user_agent'], + next_stream_id # Pass the stream_id to be stored in Redis ) if result.get('status') == 'error': From cbf4fa33d6f00cccafafd0625a521bb9fcb23174 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Fri, 28 Mar 2025 09:04:04 -0500 Subject: [PATCH 09/13] Better method to keep track of next stream to use. --- apps/proxy/ts_proxy/views.py | 35 +++++++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py index 375a7b79..652d6055 100644 --- a/apps/proxy/ts_proxy/views.py +++ b/apps/proxy/ts_proxy/views.py @@ -346,6 +346,9 @@ def next_stream(request, channel_id): try: logger.info(f"Request to switch to next stream for channel {channel_id} received") + # Check if the channel exists + channel = get_stream_object(channel_id) + # First check if channel is active in Redis current_stream_id = None profile_id = None @@ -369,18 +372,38 @@ def next_stream(request, channel_id): # Channel is not running return JsonResponse({'error': 'No current stream found for channel'}, status=404) - # Get alternate streams excluding the current one - alternate_streams = get_alternate_streams(channel_id, current_stream_id) + # Get all streams for this channel in their defined order + streams = list(channel.streams.all().order_by('channelstream__order')) - if not alternate_streams: + if len(streams) <= 1: return JsonResponse({ 'error': 'No alternate streams available for this channel', 'current_stream_id': current_stream_id }, status=404) - # Pick the next stream from alternatives - next_stream = alternate_streams[0] # Get the first alternative - next_stream_id = next_stream['stream_id'] + # Find the current stream's position in the list + current_index = None + for i, stream in enumerate(streams): + if stream.id == current_stream_id: + current_index = i + break + + if current_index is None: + logger.warning(f"Current stream ID {current_stream_id} not found in channel's streams list") + # Fall back to the first stream that's not the current one + next_stream = next((s for s in streams if s.id != current_stream_id), None) + if not next_stream: + return JsonResponse({ + 'error': 'Could not find current stream in channel list', + 'current_stream_id': current_stream_id + }, status=404) + else: + # Get the next stream in the rotation (with wrap-around) + next_index = (current_index + 1) % len(streams) + next_stream = streams[next_index] + + next_stream_id = next_stream.id + logger.info(f"Rotating to next stream ID {next_stream_id} for channel {channel_id}") # Get full stream info including URL for the next stream stream_info = get_stream_info_for_switch(channel_id, next_stream_id) From 530788877d01583a7e6eceeddeb5d57d1b5f2527 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Fri, 28 Mar 2025 09:39:35 -0500 Subject: [PATCH 10/13] Implement heartbeat thread exit condition after consecutive empty checks --- apps/proxy/ts_proxy/client_manager.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/apps/proxy/ts_proxy/client_manager.py b/apps/proxy/ts_proxy/client_manager.py index 42d7e04d..98dbf072 100644 --- a/apps/proxy/ts_proxy/client_manager.py +++ b/apps/proxy/ts_proxy/client_manager.py @@ -38,6 +38,11 @@ class ClientManager: def _start_heartbeat_thread(self): """Start thread to regularly refresh client presence in Redis""" def heartbeat_task(): + no_clients_count = 0 # Track consecutive empty cycles + max_empty_cycles = 3 # Exit after this many consecutive empty checks + + logger.debug(f"Started heartbeat thread for channel {self.channel_id} (interval: {self.heartbeat_interval}s)") + while True: try: # Wait for the interval @@ -46,7 +51,19 @@ class ClientManager: # Send heartbeat for all local clients with self.lock: if not self.clients or not self.redis_client: + # No clients left, increment our counter + no_clients_count += 1 + + # If we've seen no clients for several consecutive checks, exit the thread + if no_clients_count >= max_empty_cycles: + logger.info(f"No clients for channel {self.channel_id} after {no_clients_count} consecutive checks, exiting heartbeat thread") + return # This exits the thread + + # Skip this cycle if we have no clients continue + else: + # Reset counter when we see clients + no_clients_count = 0 # IMPROVED GHOST DETECTION: Check for stale clients before sending heartbeats current_time = time.time() From c99e6b68bcf1c5bbcde1bb5acee9373c567a51a0 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Fri, 28 Mar 2025 09:39:57 -0500 Subject: [PATCH 11/13] Increase the number of threads from 4 to 8 in uwsgi configuration for improved concurrency --- docker/uwsgi.debug.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/uwsgi.debug.ini b/docker/uwsgi.debug.ini index f8df7bdc..957f4f4c 100644 --- a/docker/uwsgi.debug.ini +++ b/docker/uwsgi.debug.ini @@ -27,7 +27,7 @@ static-map = /static=/app/static # Worker configuration workers = 1 -threads = 4 +threads = 8 enable-threads = true lazy-apps = true From dde9e9687891645740293183762e7815c9467292 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Fri, 28 Mar 2025 11:03:36 -0500 Subject: [PATCH 12/13] Update Redis key prefix for Celery task results --- dispatcharr/settings.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dispatcharr/settings.py b/dispatcharr/settings.py index 9b381bfd..95e67c71 100644 --- a/dispatcharr/settings.py +++ b/dispatcharr/settings.py @@ -153,7 +153,7 @@ CELERY_RESULT_BACKEND = CELERY_BROKER_URL # Configure Redis key prefix CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS = { - 'prefix': 'celery-task:', # Set the Redis key prefix for Celery + 'global_keyprefix': 'celery-tasks:', # Set the Redis key prefix for Celery } # Set TTL (Time-to-Live) for task results (in seconds) From 8292e8ae171a9b7941ed52f2693caca8d86f2983 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Fri, 28 Mar 2025 11:58:07 -0500 Subject: [PATCH 13/13] Change next_stream and change_stream back to authentication required after changing for testing. --- apps/proxy/ts_proxy/views.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py index 652d6055..e65a1744 100644 --- a/apps/proxy/ts_proxy/views.py +++ b/apps/proxy/ts_proxy/views.py @@ -189,7 +189,7 @@ def stream_ts(request, channel_id): @csrf_exempt @api_view(['POST']) -#@permission_classes([IsAuthenticated]) +@permission_classes([IsAuthenticated]) def change_stream(request, channel_id): """Change stream URL for existing channel with enhanced diagnostics""" try: @@ -340,7 +340,7 @@ def stop_client(request, channel_id): @csrf_exempt @api_view(['POST']) -#@permission_classes([IsAuthenticated]) +@permission_classes([IsAuthenticated]) def next_stream(request, channel_id): """Switch to the next available stream for a channel""" try: