From 85e41d5def8001361d32144fe8db7173a9949bf5 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Mon, 3 Mar 2025 18:40:30 -0600 Subject: [PATCH] Initial integration. --- apps/proxy/__init__.py | 1 + apps/proxy/apps.py | 15 ++ apps/proxy/config.py | 24 ++ apps/proxy/hls_proxy/__init__.py | 0 apps/proxy/{hls_proxy => hls_proxy/server.py} | 221 ++---------------- apps/proxy/hls_proxy/urls.py | 10 + apps/proxy/hls_proxy/views.py | 36 +++ apps/proxy/management/commands/proxy.py | 67 ++++++ apps/proxy/signals.py | 19 ++ apps/proxy/ts_proxy/__init__.py | 0 apps/proxy/{ts_proxy => ts_proxy/server.py} | 102 +------- apps/proxy/ts_proxy/urls.py | 9 + apps/proxy/ts_proxy/views.py | 72 ++++++ apps/proxy/urls.py | 14 ++ apps/proxy/views.py | 64 +++++ dispatcharr/settings.py | 23 ++ dispatcharr/urls.py | 3 + frontend/src/components/Navigation.js | 11 + frontend/src/components/ProxyManager.js | 82 +++++++ frontend/src/routes.js | 14 ++ 20 files changed, 484 insertions(+), 303 deletions(-) create mode 100644 apps/proxy/__init__.py create mode 100644 apps/proxy/apps.py create mode 100644 apps/proxy/config.py create mode 100644 apps/proxy/hls_proxy/__init__.py rename apps/proxy/{hls_proxy => hls_proxy/server.py} (84%) create mode 100644 apps/proxy/hls_proxy/urls.py create mode 100644 apps/proxy/hls_proxy/views.py create mode 100644 apps/proxy/management/commands/proxy.py create mode 100644 apps/proxy/signals.py create mode 100644 apps/proxy/ts_proxy/__init__.py rename apps/proxy/{ts_proxy => ts_proxy/server.py} (68%) create mode 100644 apps/proxy/ts_proxy/urls.py create mode 100644 apps/proxy/ts_proxy/views.py create mode 100644 apps/proxy/urls.py create mode 100644 apps/proxy/views.py create mode 100644 frontend/src/components/Navigation.js create mode 100644 frontend/src/components/ProxyManager.js create mode 100644 frontend/src/routes.js diff --git a/apps/proxy/__init__.py b/apps/proxy/__init__.py new file mode 100644 index 00000000..3310e884 --- /dev/null +++ b/apps/proxy/__init__.py @@ -0,0 +1 @@ +"""Proxy application package""" \ No newline at end of file diff --git a/apps/proxy/apps.py b/apps/proxy/apps.py new file mode 100644 index 00000000..c8c42088 --- /dev/null +++ b/apps/proxy/apps.py @@ -0,0 +1,15 @@ +from django.apps import AppConfig + +class ProxyConfig(AppConfig): + default_auto_field = 'django.db.models.BigAutoField' + name = 'apps.proxy' + verbose_name = "Stream Proxies" + + def ready(self): + """Initialize proxy servers when Django starts""" + from .hls_proxy.server import ProxyServer as HLSProxyServer + from .ts_proxy.server import ProxyServer as TSProxyServer + + # Initialize proxy servers + self.hls_proxy = HLSProxyServer() + self.ts_proxy = TSProxyServer() \ No newline at end of file diff --git a/apps/proxy/config.py b/apps/proxy/config.py new file mode 100644 index 00000000..2395e7e3 --- /dev/null +++ b/apps/proxy/config.py @@ -0,0 +1,24 @@ +"""Shared configuration between proxy types""" + +class BaseConfig: + DEFAULT_USER_AGENT = 'VLC/3.0.20 LibVLC/3.0.20' + CHUNK_SIZE = 8192 + CLIENT_POLL_INTERVAL = 0.1 + MAX_RETRIES = 3 + +class HLSConfig(BaseConfig): + MIN_SEGMENTS = 12 + MAX_SEGMENTS = 16 + WINDOW_SIZE = 12 + INITIAL_SEGMENTS = 3 + INITIAL_CONNECTION_WINDOW = 10 + CLIENT_TIMEOUT_FACTOR = 1.5 + CLIENT_CLEANUP_INTERVAL = 10 + FIRST_SEGMENT_TIMEOUT = 5.0 + INITIAL_BUFFER_SECONDS = 25.0 + MAX_INITIAL_SEGMENTS = 10 + BUFFER_READY_TIMEOUT = 30.0 + +class TSConfig(BaseConfig): + BUFFER_SIZE = 1000 + RECONNECT_DELAY = 5 \ No newline at end of file diff --git a/apps/proxy/hls_proxy/__init__.py b/apps/proxy/hls_proxy/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/apps/proxy/hls_proxy b/apps/proxy/hls_proxy/server.py similarity index 84% rename from apps/proxy/hls_proxy rename to apps/proxy/hls_proxy/server.py index 98ba38a5..c51e5e04 100644 --- a/apps/proxy/hls_proxy +++ b/apps/proxy/hls_proxy/server.py @@ -7,7 +7,6 @@ This proxy handles HLS live streams with support for: - Connection pooling and reuse """ -from flask import Flask, Response, request, jsonify import requests import threading import logging @@ -18,33 +17,13 @@ import argparse from typing import Optional, Dict, List, Set, Deque import sys import os - -# Initialize Flask app -app = Flask(__name__) +from apps.proxy.config import HLSConfig as Config # Global state management manifest_buffer = None # Stores current manifest content segment_buffers = {} # Maps sequence numbers to segment data buffer_lock = threading.Lock() # Synchronizes access to buffers -class Config: - """Configuration settings for stream handling and buffering""" - # Buffer size settings - MIN_SEGMENTS = 12 # Minimum segments to maintain - MAX_SEGMENTS = 16 # Maximum segments to store - WINDOW_SIZE = 12 # Number of segments in manifest window - INITIAL_SEGMENTS = 3 # Initial segments to buffer before playback - DEFAULT_USER_AGENT = 'VLC/3.0.20 LibVLC/3.0.20' - INITIAL_CONNECTION_WINDOW = 10 # Seconds to wait for first client - CLIENT_TIMEOUT_FACTOR = 1.5 # Multiplier for target duration to determine client timeout - CLIENT_CLEANUP_INTERVAL = 10 # Seconds between client cleanup checks - FIRST_SEGMENT_TIMEOUT = 5.0 # Seconds to wait for first segment - - # Initial buffering settings - INITIAL_BUFFER_SECONDS = 25.0 # Initial buffer in seconds before allowing clients - MAX_INITIAL_SEGMENTS = 10 # Maximum segments to fetch during initialization - BUFFER_READY_TIMEOUT = 30.0 # Maximum time to wait for initial buffer (seconds) - class StreamBuffer: """ Manages buffering of stream segments with thread-safe access. @@ -564,7 +543,7 @@ class StreamFetcher: # Normal operation - get latest segment if we haven't already latest_segment = manifest.segments[-1] - if latest_segment.uri in downloaded_segments: + if (latest_segment.uri in downloaded_segments): # Wait for next manifest update time.sleep(self.manager.target_duration * 0.5) continue @@ -817,176 +796,33 @@ def fetch_stream(fetcher: StreamFetcher, stop_event: threading.Event, start_sequ retry_delay = min(retry_delay * 2, max_retry_delay) manifest_update_needed = True - - -@app.before_request -def log_request_info(): - """ - Log client connections and important requests. - - Logs: - INFO: - - First manifest request from new client - - Stream switch requests - DEBUG: - - All other requests - - Format: - {client_ip} - {method} {path} - """ - if request.path == '/stream.m3u8' and not segment_buffers: - # First manifest request from a client - logging.info(f"New client connected from {request.remote_addr}") - elif request.path.startswith('/change_stream'): - # Keep stream switch requests as INFO - logging.info(f"Stream switch requested from {request.remote_addr}") - else: - # Move routine requests to DEBUG - logging.debug(f"{request.remote_addr} - {request.method} {request.path}") - -# Configure Werkzeug logger to DEBUG -logging.getLogger('werkzeug').setLevel(logging.DEBUG) - class ProxyServer: - """Manages HLS proxy server instance""" - def __init__(self, user_agent: Optional[str] = None): - self.app = Flask(__name__) self.stream_managers: Dict[str, StreamManager] = {} self.stream_buffers: Dict[str, StreamBuffer] = {} self.client_managers: Dict[str, ClientManager] = {} self.fetch_threads: Dict[str, threading.Thread] = {} self.user_agent: str = user_agent or Config.DEFAULT_USER_AGENT - self._setup_routes() + # Remove Flask-specific routing def _setup_routes(self) -> None: - """Configure Flask routes""" - self.app.add_url_rule( - '/stream/', # Changed from //stream.m3u8 - view_func=self.stream_endpoint - ) - self.app.add_url_rule( - '/stream//segments/', # Updated to match new pattern - view_func=self.get_segment - ) - self.app.add_url_rule( - '/change_stream/', # Changed from //change_stream - view_func=self.change_stream, - methods=['POST'] - ) - - def initialize_channel(self, url: str, channel_id: str) -> None: - """Initialize a new channel stream""" - if channel_id in self.stream_managers: - self.stop_channel(channel_id) - - manager = StreamManager( - url, - channel_id, - user_agent=self.user_agent - ) - buffer = StreamBuffer() - client_manager = ClientManager() - - # Set up references - manager.client_manager = client_manager - manager.proxy_server = self - - # Store resources - self.stream_managers[channel_id] = manager - self.stream_buffers[channel_id] = buffer - self.client_managers[channel_id] = client_manager - - # Create and store fetcher - fetcher = StreamFetcher(manager, buffer) - manager.fetcher = fetcher - - # Start fetch thread - self.fetch_threads[channel_id] = threading.Thread( - target=fetcher.fetch_loop, - name=f"StreamFetcher-{channel_id}", - daemon=True - ) - self.fetch_threads[channel_id].start() - - # Start cleanup monitoring immediately - manager.start_cleanup_thread() - - logging.info(f"Initialized channel {channel_id} with URL {url}") - - def stop_channel(self, channel_id: str) -> None: - """Stop and cleanup a channel""" - if channel_id in self.stream_managers: - self.stream_managers[channel_id].stop() - if channel_id in self.fetch_threads: - self.fetch_threads[channel_id].join(timeout=5) - self._cleanup_channel(channel_id) - - def _cleanup_channel(self, channel_id: str) -> None: - """ - Remove all resources associated with a channel. - - Args: - channel_id: Channel to cleanup - - Removes: - - Stream manager instance - - Segment buffer - - Client manager - - Fetch thread reference - - Thread safety: - Should only be called after stream manager is stopped - and fetch thread has completed - """ - - for collection in [self.stream_managers, self.stream_buffers, - self.client_managers, self.fetch_threads]: - collection.pop(channel_id, None) - - def run(self, host: str = '0.0.0.0', port: int = 5000) -> None: - """Start the proxy server""" - try: - self.app.run(host=host, port=port, threaded=True) - except KeyboardInterrupt: - logging.info("Shutting down gracefully...") - self.shutdown() - except Exception as e: - logging.error(f"Server error: {e}") - self.shutdown() - raise - - def shutdown(self) -> None: - """ - Stop all channels and cleanup resources. - - Steps: - 1. Stop all active stream managers - 2. Join fetch threads - 3. Clean up channel resources - 4. Release system resources - - Thread Safety: - Safe to call from signal handlers or during shutdown - """ - for channel_id in list(self.stream_managers.keys()): - self.stop_channel(channel_id) + pass + # Update methods to return data instead of Flask Response objects def stream_endpoint(self, channel_id: str): - """Flask route handler for serving HLS manifests.""" if channel_id not in self.stream_managers: - return Response('Channel not found', status=404) + return 'Channel not found', 404 manager = self.stream_managers[channel_id] # Wait for initial buffer if not manager.buffer_ready.wait(Config.BUFFER_READY_TIMEOUT): logging.error(f"Timeout waiting for initial buffer for channel {channel_id}") - return Response('Initial buffer not ready', status=503) + return 'Initial buffer not ready', 503 try: if (channel_id not in self.stream_managers) or (not self.stream_managers[channel_id].running): - return Response('Channel not found', status=404) + return 'Channel not found', 404 manager = self.stream_managers[channel_id] buffer = self.stream_buffers[channel_id] @@ -1006,7 +842,7 @@ class ProxyServer: if time.time() - start_time > Config.FIRST_SEGMENT_TIMEOUT: logging.warning(f"Timeout waiting for first segment for channel {channel_id}") - return Response('No segments available', status=503) + return 'No segments available', 503 time.sleep(0.1) # Short sleep to prevent CPU spinning @@ -1050,7 +886,7 @@ class ProxyServer: manifest_content = '\n'.join(new_manifest) logging.debug(f"Serving manifest with segments {min_seq}-{max_seq} (window: {len(window_segments)})") - return Response(manifest_content, content_type='application/vnd.apple.mpegurl') + return manifest_content, 200 # Return content and status code except ConnectionAbortedError: logging.debug("Client disconnected") return '', 499 @@ -1077,7 +913,7 @@ class ProxyServer: - Returns 404 on any error """ if channel_id not in self.stream_managers: - return Response('Channel not found', status=404) + return 'Channel not found', 404 try: # Record client activity @@ -1089,7 +925,7 @@ class ProxyServer: with buffer_lock: if segment_id in buffer: - return Response(buffer[segment_id], content_type='video/MP2T') + return buffer[segment_id], 200 # Return content and status code logging.warning(f"Segment {segment_id} not found for channel {channel_id}") except Exception as e: @@ -1122,45 +958,24 @@ class ProxyServer: - Maintains segment numbering """ if channel_id not in self.stream_managers: - return jsonify({'error': 'Channel not found'}), 404 + return {'error': 'Channel not found'}, 404 new_url = request.json.get('url') if not new_url: - return jsonify({'error': 'No URL provided'}), 400 + return {'error': 'No URL provided'}, 400 manager = self.stream_managers[channel_id] if manager.update_url(new_url): - return jsonify({ + return { 'message': 'Stream URL updated', 'channel': channel_id, 'url': new_url - }) - return jsonify({ + }, 200 + return { 'message': 'URL unchanged', 'channel': channel_id, 'url': new_url - }) - - @app.before_request - def log_request_info(): - """ - Log client connections and important requests. - - Log Levels: - INFO: - - First manifest request from new client - - Stream switch requests - DEBUG: - - Segment requests - - Routine manifest updates - - Format: - "{client_ip} - {method} {path}" - - Side Effects: - - Updates logging configuration based on request type - - Tracks client connections - """ + }, 200 # Main Application Setup if __name__ == '__main__': diff --git a/apps/proxy/hls_proxy/urls.py b/apps/proxy/hls_proxy/urls.py new file mode 100644 index 00000000..3f7b901b --- /dev/null +++ b/apps/proxy/hls_proxy/urls.py @@ -0,0 +1,10 @@ +from django.urls import path +from . import views + +app_name = 'hls_proxy' + +urlpatterns = [ + path('stream/', views.stream_endpoint, name='stream'), + path('stream//segments/', views.get_segment, name='segment'), + path('change_stream/', views.change_stream, name='change_stream'), +] \ No newline at end of file diff --git a/apps/proxy/hls_proxy/views.py b/apps/proxy/hls_proxy/views.py new file mode 100644 index 00000000..361475f0 --- /dev/null +++ b/apps/proxy/hls_proxy/views.py @@ -0,0 +1,36 @@ +from django.http import StreamingHttpResponse, JsonResponse, HttpResponse +from django.views.decorators.csrf import csrf_exempt +from django.views.decorators.http import require_http_methods +import json +from .server import ProxyServer, Config + +proxy_server = ProxyServer() + +@require_http_methods(["GET"]) +def stream_endpoint(request, channel_id): + """Serve HLS manifest""" + response = proxy_server.stream_endpoint(channel_id) + return StreamingHttpResponse( + response.response[0], + content_type='application/vnd.apple.mpegurl', + status=response.status_code + ) + +@require_http_methods(["GET"]) +def get_segment(request, channel_id, segment_name): + """Serve MPEG-TS segments""" + response = proxy_server.get_segment(channel_id, segment_name) + if response[1] == 404: + return HttpResponse(status=404) + return StreamingHttpResponse(response[0], content_type='video/MP2T') + +@csrf_exempt +@require_http_methods(["POST"]) +def change_stream(request, channel_id): + """Handle stream URL changes""" + try: + data = json.loads(request.body) + response = proxy_server.change_stream(channel_id) + return JsonResponse(response[0], status=response[1]) + except json.JSONDecodeError: + return JsonResponse({'error': 'Invalid JSON'}, status=400) \ No newline at end of file diff --git a/apps/proxy/management/commands/proxy.py b/apps/proxy/management/commands/proxy.py new file mode 100644 index 00000000..f82f08e8 --- /dev/null +++ b/apps/proxy/management/commands/proxy.py @@ -0,0 +1,67 @@ +from django.core.management.base import BaseCommand +from django.apps import apps +import logging + +logger = logging.getLogger(__name__) + +class Command(BaseCommand): + help = 'Manage proxy servers' + + def add_arguments(self, parser): + parser.add_argument( + 'action', + choices=['start', 'stop', 'restart'], + help='Action to perform' + ) + parser.add_argument( + '--type', + choices=['hls', 'ts', 'all'], + default='all', + help='Type of proxy to manage' + ) + parser.add_argument( + '--channel', + help='Channel ID to manage' + ) + parser.add_argument( + '--url', + help='Stream URL (required for start)' + ) + + def handle(self, *args, **options): + proxy_app = apps.get_app_config('proxy') + action = options['action'] + proxy_type = options['type'] + channel = options.get('channel') + url = options.get('url') + + try: + if action == 'start': + if not url: + raise ValueError("URL is required for start action") + if proxy_type in ('hls', 'all'): + proxy_app.hls_proxy.initialize_channel(url, channel or 'default') + if proxy_type in ('ts', 'all'): + proxy_app.ts_proxy.initialize_channel(url, channel or 'default') + self.stdout.write(self.style.SUCCESS('Started proxy servers')) + + elif action == 'stop': + if proxy_type in ('hls', 'all'): + if channel: + proxy_app.hls_proxy.stop_channel(channel) + else: + proxy_app.hls_proxy.shutdown() + if proxy_type in ('ts', 'all'): + if channel: + proxy_app.ts_proxy.stop_channel(channel) + else: + proxy_app.ts_proxy.shutdown() + self.stdout.write(self.style.SUCCESS('Stopped proxy servers')) + + elif action == 'restart': + self.handle(*args, **dict(options, action='stop')) + self.handle(*args, **dict(options, action='start')) + self.stdout.write(self.style.SUCCESS('Restarted proxy servers')) + + except Exception as e: + self.stderr.write(self.style.ERROR(f'Error: {e}')) \ No newline at end of file diff --git a/apps/proxy/signals.py b/apps/proxy/signals.py new file mode 100644 index 00000000..cce54b15 --- /dev/null +++ b/apps/proxy/signals.py @@ -0,0 +1,19 @@ +from django.db.models.signals import pre_delete +from django.dispatch import receiver +from django.apps import apps +import logging + +logger = logging.getLogger(__name__) + +@receiver(pre_delete) +def cleanup_proxy_servers(sender, **kwargs): + """Clean up proxy servers when Django shuts down""" + try: + proxy_app = apps.get_app_config('proxy') + for channel_id in list(proxy_app.hls_proxy.stream_managers.keys()): + proxy_app.hls_proxy.stop_channel(channel_id) + for channel_id in list(proxy_app.ts_proxy.stream_managers.keys()): + proxy_app.ts_proxy.stop_channel(channel_id) + logger.info("Proxy servers cleaned up successfully") + except Exception as e: + logger.error(f"Error during proxy server cleanup: {e}") \ No newline at end of file diff --git a/apps/proxy/ts_proxy/__init__.py b/apps/proxy/ts_proxy/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/apps/proxy/ts_proxy b/apps/proxy/ts_proxy/server.py similarity index 68% rename from apps/proxy/ts_proxy rename to apps/proxy/ts_proxy/server.py index ec6d62d7..95dc8720 100644 --- a/apps/proxy/ts_proxy +++ b/apps/proxy/ts_proxy/server.py @@ -7,23 +7,13 @@ Handles live TS stream proxying with support for: - Connection state tracking """ -from flask import Flask, Response, request, jsonify import requests import threading import logging from collections import deque import time -import os from typing import Optional, Set, Deque, Dict - -# Configuration -class Config: - CHUNK_SIZE: int = 8192 # Buffer chunk size (bytes) - BUFFER_SIZE: int = 1000 # Number of chunks to keep in memory - RECONNECT_DELAY: int = 5 # Seconds between reconnection attempts - CLIENT_POLL_INTERVAL: float = 0.1 # Seconds between client buffer checks - MAX_RETRIES: int = 3 # Maximum connection retry attempts - DEFAULT_USER_AGENT: str = 'VLC/3.0.20 LibVLC/3.0.20' # Default user agent +from apps.proxy.config import TSConfig as Config class StreamManager: """Manages TS stream state and connection handling""" @@ -183,19 +173,12 @@ class ProxyServer: """Manages TS proxy server instance""" def __init__(self, user_agent: Optional[str] = None): - self.app = Flask(__name__) self.stream_managers: Dict[str, StreamManager] = {} self.stream_buffers: Dict[str, StreamBuffer] = {} self.client_managers: Dict[str, ClientManager] = {} self.fetch_threads: Dict[str, threading.Thread] = {} self.user_agent: str = user_agent or Config.DEFAULT_USER_AGENT - self._setup_routes() - def _setup_routes(self) -> None: - """Configure Flask routes""" - self.app.route('/stream/')(self.stream_endpoint) - self.app.route('/change_stream/', methods=['POST'])(self.change_stream) - def initialize_channel(self, url: str, channel_id: str) -> None: """Initialize a new channel stream""" if channel_id in self.stream_managers: @@ -221,7 +204,7 @@ class ProxyServer: ) self.fetch_threads[channel_id].start() logging.info(f"Initialized channel {channel_id} with URL {url}") - + def stop_channel(self, channel_id: str) -> None: """Stop and cleanup a channel""" if channel_id in self.stream_managers: @@ -235,89 +218,8 @@ class ProxyServer: for collection in [self.stream_managers, self.stream_buffers, self.client_managers, self.fetch_threads]: collection.pop(channel_id, None) - - def stream_endpoint(self, channel_id: str): - """Stream endpoint that serves TS data to clients""" - if channel_id not in self.stream_managers: - return Response('Channel not found', status=404) - - def generate(): - client_id = threading.get_ident() - buffer = self.stream_buffers[channel_id] - client_manager = self.client_managers[channel_id] - - client_manager.add_client(client_id) - last_index = buffer.index - - try: - while True: - with buffer.lock: - if buffer.index > last_index: - chunks_behind = buffer.index - last_index - start_pos = max(0, len(buffer.buffer) - chunks_behind) - - for i in range(start_pos, len(buffer.buffer)): - yield buffer.buffer[i] - last_index = buffer.index - - threading.Event().wait(Config.CLIENT_POLL_INTERVAL) - except GeneratorExit: - remaining = client_manager.remove_client(client_id) - if remaining == 0: - logging.info(f"No clients remaining for channel {channel_id}") - self.stop_channel(channel_id) - - return Response(generate(), content_type='video/mp2t') - def change_stream(self, channel_id: str): - """Handle stream URL changes""" - if channel_id not in self.stream_managers: - return jsonify({'error': 'Channel not found'}), 404 - - new_url = request.json.get('url') - if not new_url: - return jsonify({'error': 'No URL provided'}), 400 - - manager = self.stream_managers[channel_id] - if manager.update_url(new_url): - return jsonify({ - 'message': 'Stream URL updated', - 'channel': channel_id, - 'url': new_url - }) - return jsonify({ - 'message': 'URL unchanged', - 'channel': channel_id, - 'url': new_url - }) - - def run(self, host: str = '0.0.0.0', port: int = 5000) -> None: - """Start the proxy server""" - self.app.run(host=host, port=port, threaded=True) - def shutdown(self) -> None: """Stop all channels and cleanup""" for channel_id in list(self.stream_managers.keys()): self.stop_channel(channel_id) - -def main(): - """Initialize and start the proxy server""" - logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s', - datefmt='%Y-%m-%d %H:%M:%S' - ) - - logging.getLogger('werkzeug').setLevel(logging.DEBUG) - - proxy_server = ProxyServer() - initial_url = os.getenv('STREAM_URL', 'http://example.com/stream.ts') - proxy_server.initialize_channel(initial_url, "default_channel") - - try: - proxy_server.run() - finally: - proxy_server.shutdown() - -if __name__ == '__main__': - main() diff --git a/apps/proxy/ts_proxy/urls.py b/apps/proxy/ts_proxy/urls.py new file mode 100644 index 00000000..798f4b11 --- /dev/null +++ b/apps/proxy/ts_proxy/urls.py @@ -0,0 +1,9 @@ +from django.urls import path +from . import views + +app_name = 'ts_proxy' + +urlpatterns = [ + path('stream/', views.stream_ts, name='stream'), + path('change_stream/', views.change_stream, name='change_stream'), +] \ No newline at end of file diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py new file mode 100644 index 00000000..a6f626de --- /dev/null +++ b/apps/proxy/ts_proxy/views.py @@ -0,0 +1,72 @@ +from django.http import StreamingHttpResponse, JsonResponse +from django.views.decorators.csrf import csrf_exempt +from django.views.decorators.http import require_http_methods +import json +from .server import ProxyServer + +proxy_server = ProxyServer() + +@require_http_methods(["GET"]) +def stream_ts(request, channel_id): + """Handle TS stream requests""" + if channel_id not in proxy_server.stream_managers: + return StreamingHttpResponse('Channel not found', status=404) + + def generate(): + client_id = threading.get_ident() + buffer = proxy_server.stream_buffers[channel_id] + client_manager = proxy_server.client_managers[channel_id] + + client_manager.add_client(client_id) + last_index = buffer.index + + try: + while True: + with buffer.lock: + if buffer.index > last_index: + chunks_behind = buffer.index - last_index + start_pos = max(0, len(buffer.buffer) - chunks_behind) + + for i in range(start_pos, len(buffer.buffer)): + yield buffer.buffer[i] + last_index = buffer.index + + time.sleep(Config.CLIENT_POLL_INTERVAL) + except Exception: + remaining = client_manager.remove_client(client_id) + if remaining == 0: + proxy_server.stop_channel(channel_id) + raise + + return StreamingHttpResponse( + generate(), + content_type='video/MP2T' + ) + +@csrf_exempt +@require_http_methods(["POST"]) +def change_stream(request, channel_id): + """Handle stream URL changes""" + try: + data = json.loads(request.body) + new_url = data.get('url') + if not new_url: + return JsonResponse({'error': 'No URL provided'}, status=400) + + if channel_id not in proxy_server.stream_managers: + return JsonResponse({'error': 'Channel not found'}, status=404) + + manager = proxy_server.stream_managers[channel_id] + if manager.update_url(new_url): + return JsonResponse({ + 'message': 'Stream URL updated', + 'channel': channel_id, + 'url': new_url + }) + return JsonResponse({ + 'message': 'URL unchanged', + 'channel': channel_id, + 'url': new_url + }) + except json.JSONDecodeError: + return JsonResponse({'error': 'Invalid JSON'}, status=400) \ No newline at end of file diff --git a/apps/proxy/urls.py b/apps/proxy/urls.py new file mode 100644 index 00000000..dab71150 --- /dev/null +++ b/apps/proxy/urls.py @@ -0,0 +1,14 @@ +from django.urls import path, include +from rest_framework.routers import DefaultRouter +from . import views + +router = DefaultRouter() +router.register(r'proxy', views.ProxyViewSet, basename='proxy') + +app_name = 'proxy' + +urlpatterns = [ + path('api/', include(router.urls)), + path('hls/', include('apps.proxy.hls_proxy.urls')), + path('ts/', include('apps.proxy.ts_proxy.urls')), +] \ No newline at end of file diff --git a/apps/proxy/views.py b/apps/proxy/views.py new file mode 100644 index 00000000..d816992f --- /dev/null +++ b/apps/proxy/views.py @@ -0,0 +1,64 @@ +from rest_framework import viewsets, status +from rest_framework.decorators import action +from rest_framework.response import Response +from django.apps import apps +import logging + +logger = logging.getLogger(__name__) + +class ProxyViewSet(viewsets.ViewSet): + """ViewSet for managing proxy servers""" + + @action(detail=False, methods=['post']) + def start(self, request): + """Start a proxy server for a channel""" + try: + proxy_type = request.data.get('type', 'hls') + channel_id = request.data.get('channel', 'default') + url = request.data.get('url') + + if not url: + return Response( + {'error': 'URL is required'}, + status=status.HTTP_400_BAD_REQUEST + ) + + proxy_app = apps.get_app_config('proxy') + proxy_server = getattr(proxy_app, f'{proxy_type}_proxy') + proxy_server.initialize_channel(url, channel_id) + + return Response({ + 'message': f'{proxy_type.upper()} proxy started', + 'channel': channel_id, + 'url': url + }) + + except Exception as e: + logger.error(f"Error starting proxy: {e}") + return Response( + {'error': str(e)}, + status=status.HTTP_500_INTERNAL_SERVER_ERROR + ) + + @action(detail=False, methods=['post']) + def stop(self, request): + """Stop a proxy server for a channel""" + try: + proxy_type = request.data.get('type', 'hls') + channel_id = request.data.get('channel', 'default') + + proxy_app = apps.get_app_config('proxy') + proxy_server = getattr(proxy_app, f'{proxy_type}_proxy') + proxy_server.stop_channel(channel_id) + + return Response({ + 'message': f'{proxy_type.upper()} proxy stopped', + 'channel': channel_id + }) + + except Exception as e: + logger.error(f"Error stopping proxy: {e}") + return Response( + {'error': str(e)}, + status=status.HTTP_500_INTERNAL_SERVER_ERROR + ) \ No newline at end of file diff --git a/dispatcharr/settings.py b/dispatcharr/settings.py index ab421fe7..1b596b6c 100644 --- a/dispatcharr/settings.py +++ b/dispatcharr/settings.py @@ -19,6 +19,7 @@ INSTALLED_APPS = [ 'apps.hdhr', 'apps.m3u', 'apps.output', + 'apps.proxy.apps.ProxyConfig', 'core', 'drf_yasg', 'django.contrib.admin', @@ -155,3 +156,25 @@ SIMPLE_JWT = { 'ROTATE_REFRESH_TOKENS': False, # Optional: Whether to rotate refresh tokens 'BLACKLIST_AFTER_ROTATION': True, # Optional: Whether to blacklist refresh tokens } + +# Proxy Settings +PROXY_SETTINGS = { + 'HLS': { + 'DEFAULT_URL': '', # Default HLS stream URL if needed + 'BUFFER_SIZE': 1000, + 'USER_AGENT': 'VLC/3.0.20 LibVLC/3.0.20', + 'CHUNK_SIZE': 8192, + 'CLIENT_POLL_INTERVAL': 0.1, + 'MAX_RETRIES': 3, + 'MIN_SEGMENTS': 12, + 'MAX_SEGMENTS': 16, + 'WINDOW_SIZE': 12, + 'INITIAL_SEGMENTS': 3, + }, + 'TS': { + 'DEFAULT_URL': '', # Default TS stream URL if needed + 'BUFFER_SIZE': 1000, + 'RECONNECT_DELAY': 5, + 'USER_AGENT': 'VLC/3.0.20 LibVLC/3.0.20', + } +} diff --git a/dispatcharr/urls.py b/dispatcharr/urls.py index 4aa72aa6..46992c56 100644 --- a/dispatcharr/urls.py +++ b/dispatcharr/urls.py @@ -50,6 +50,9 @@ urlpatterns = [ # Catch-all route to serve React's index.html for non-API, non-admin paths path('', TemplateView.as_view(template_name='index.html')), # React entry point + # Add proxy apps + path('proxy/', include('apps.proxy.urls')), + ] + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT) # Serve static files for development (React's JS, CSS, etc.) diff --git a/frontend/src/components/Navigation.js b/frontend/src/components/Navigation.js new file mode 100644 index 00000000..29fbc028 --- /dev/null +++ b/frontend/src/components/Navigation.js @@ -0,0 +1,11 @@ +// ...existing imports... + +const menuItems = [ + ...existing items..., + { + key: 'proxy', + label: 'Proxy Manager', + icon: , + path: '/proxy', + }, +]; \ No newline at end of file diff --git a/frontend/src/components/ProxyManager.js b/frontend/src/components/ProxyManager.js new file mode 100644 index 00000000..7baa3154 --- /dev/null +++ b/frontend/src/components/ProxyManager.js @@ -0,0 +1,82 @@ +import React, { useState } from 'react'; +import { Button, Form, Input, Select, message } from 'antd'; +import axios from 'axios'; + +const { Option } = Select; + +const ProxyManager = () => { + const [form] = Form.useForm(); + const [loading, setLoading] = useState(false); + + const handleSubmit = async (values) => { + setLoading(true); + try { + const { action, ...data } = values; + await axios.post(`/proxy/api/proxy/${action}/`, data); + message.success(`Proxy ${action} successful`); + form.resetFields(); + } catch (error) { + message.error(error.response?.data?.error || 'An error occurred'); + } finally { + setLoading(false); + } + }; + + return ( +
+

Proxy Manager

+
+ + + + + + + + + + + + + + + + + + +
+
+ ); +}; + +export default ProxyManager; \ No newline at end of file diff --git a/frontend/src/routes.js b/frontend/src/routes.js new file mode 100644 index 00000000..1de363cf --- /dev/null +++ b/frontend/src/routes.js @@ -0,0 +1,14 @@ +import ProxyManager from './components/ProxyManager'; + +// ...existing code... + +const routes = [ + ...existing routes..., + { + path: '/proxy', + element: , + name: 'Proxy Manager', + }, +]; + +export default routes;