mirror of
https://github.com/Dispatcharr/Dispatcharr.git
synced 2026-01-23 02:35:14 +00:00
Initial integration.
This commit is contained in:
parent
604ff238d6
commit
85e41d5def
20 changed files with 484 additions and 303 deletions
1
apps/proxy/__init__.py
Normal file
1
apps/proxy/__init__.py
Normal file
|
|
@ -0,0 +1 @@
|
|||
"""Proxy application package"""
|
||||
15
apps/proxy/apps.py
Normal file
15
apps/proxy/apps.py
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
from django.apps import AppConfig
|
||||
|
||||
class ProxyConfig(AppConfig):
|
||||
default_auto_field = 'django.db.models.BigAutoField'
|
||||
name = 'apps.proxy'
|
||||
verbose_name = "Stream Proxies"
|
||||
|
||||
def ready(self):
|
||||
"""Initialize proxy servers when Django starts"""
|
||||
from .hls_proxy.server import ProxyServer as HLSProxyServer
|
||||
from .ts_proxy.server import ProxyServer as TSProxyServer
|
||||
|
||||
# Initialize proxy servers
|
||||
self.hls_proxy = HLSProxyServer()
|
||||
self.ts_proxy = TSProxyServer()
|
||||
24
apps/proxy/config.py
Normal file
24
apps/proxy/config.py
Normal file
|
|
@ -0,0 +1,24 @@
|
|||
"""Shared configuration between proxy types"""
|
||||
|
||||
class BaseConfig:
|
||||
DEFAULT_USER_AGENT = 'VLC/3.0.20 LibVLC/3.0.20'
|
||||
CHUNK_SIZE = 8192
|
||||
CLIENT_POLL_INTERVAL = 0.1
|
||||
MAX_RETRIES = 3
|
||||
|
||||
class HLSConfig(BaseConfig):
|
||||
MIN_SEGMENTS = 12
|
||||
MAX_SEGMENTS = 16
|
||||
WINDOW_SIZE = 12
|
||||
INITIAL_SEGMENTS = 3
|
||||
INITIAL_CONNECTION_WINDOW = 10
|
||||
CLIENT_TIMEOUT_FACTOR = 1.5
|
||||
CLIENT_CLEANUP_INTERVAL = 10
|
||||
FIRST_SEGMENT_TIMEOUT = 5.0
|
||||
INITIAL_BUFFER_SECONDS = 25.0
|
||||
MAX_INITIAL_SEGMENTS = 10
|
||||
BUFFER_READY_TIMEOUT = 30.0
|
||||
|
||||
class TSConfig(BaseConfig):
|
||||
BUFFER_SIZE = 1000
|
||||
RECONNECT_DELAY = 5
|
||||
0
apps/proxy/hls_proxy/__init__.py
Normal file
0
apps/proxy/hls_proxy/__init__.py
Normal file
|
|
@ -7,7 +7,6 @@ This proxy handles HLS live streams with support for:
|
|||
- Connection pooling and reuse
|
||||
"""
|
||||
|
||||
from flask import Flask, Response, request, jsonify
|
||||
import requests
|
||||
import threading
|
||||
import logging
|
||||
|
|
@ -18,33 +17,13 @@ import argparse
|
|||
from typing import Optional, Dict, List, Set, Deque
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Initialize Flask app
|
||||
app = Flask(__name__)
|
||||
from apps.proxy.config import HLSConfig as Config
|
||||
|
||||
# Global state management
|
||||
manifest_buffer = None # Stores current manifest content
|
||||
segment_buffers = {} # Maps sequence numbers to segment data
|
||||
buffer_lock = threading.Lock() # Synchronizes access to buffers
|
||||
|
||||
class Config:
|
||||
"""Configuration settings for stream handling and buffering"""
|
||||
# Buffer size settings
|
||||
MIN_SEGMENTS = 12 # Minimum segments to maintain
|
||||
MAX_SEGMENTS = 16 # Maximum segments to store
|
||||
WINDOW_SIZE = 12 # Number of segments in manifest window
|
||||
INITIAL_SEGMENTS = 3 # Initial segments to buffer before playback
|
||||
DEFAULT_USER_AGENT = 'VLC/3.0.20 LibVLC/3.0.20'
|
||||
INITIAL_CONNECTION_WINDOW = 10 # Seconds to wait for first client
|
||||
CLIENT_TIMEOUT_FACTOR = 1.5 # Multiplier for target duration to determine client timeout
|
||||
CLIENT_CLEANUP_INTERVAL = 10 # Seconds between client cleanup checks
|
||||
FIRST_SEGMENT_TIMEOUT = 5.0 # Seconds to wait for first segment
|
||||
|
||||
# Initial buffering settings
|
||||
INITIAL_BUFFER_SECONDS = 25.0 # Initial buffer in seconds before allowing clients
|
||||
MAX_INITIAL_SEGMENTS = 10 # Maximum segments to fetch during initialization
|
||||
BUFFER_READY_TIMEOUT = 30.0 # Maximum time to wait for initial buffer (seconds)
|
||||
|
||||
class StreamBuffer:
|
||||
"""
|
||||
Manages buffering of stream segments with thread-safe access.
|
||||
|
|
@ -564,7 +543,7 @@ class StreamFetcher:
|
|||
|
||||
# Normal operation - get latest segment if we haven't already
|
||||
latest_segment = manifest.segments[-1]
|
||||
if latest_segment.uri in downloaded_segments:
|
||||
if (latest_segment.uri in downloaded_segments):
|
||||
# Wait for next manifest update
|
||||
time.sleep(self.manager.target_duration * 0.5)
|
||||
continue
|
||||
|
|
@ -817,176 +796,33 @@ def fetch_stream(fetcher: StreamFetcher, stop_event: threading.Event, start_sequ
|
|||
retry_delay = min(retry_delay * 2, max_retry_delay)
|
||||
manifest_update_needed = True
|
||||
|
||||
|
||||
|
||||
@app.before_request
|
||||
def log_request_info():
|
||||
"""
|
||||
Log client connections and important requests.
|
||||
|
||||
Logs:
|
||||
INFO:
|
||||
- First manifest request from new client
|
||||
- Stream switch requests
|
||||
DEBUG:
|
||||
- All other requests
|
||||
|
||||
Format:
|
||||
{client_ip} - {method} {path}
|
||||
"""
|
||||
if request.path == '/stream.m3u8' and not segment_buffers:
|
||||
# First manifest request from a client
|
||||
logging.info(f"New client connected from {request.remote_addr}")
|
||||
elif request.path.startswith('/change_stream'):
|
||||
# Keep stream switch requests as INFO
|
||||
logging.info(f"Stream switch requested from {request.remote_addr}")
|
||||
else:
|
||||
# Move routine requests to DEBUG
|
||||
logging.debug(f"{request.remote_addr} - {request.method} {request.path}")
|
||||
|
||||
# Configure Werkzeug logger to DEBUG
|
||||
logging.getLogger('werkzeug').setLevel(logging.DEBUG)
|
||||
|
||||
class ProxyServer:
|
||||
"""Manages HLS proxy server instance"""
|
||||
|
||||
def __init__(self, user_agent: Optional[str] = None):
|
||||
self.app = Flask(__name__)
|
||||
self.stream_managers: Dict[str, StreamManager] = {}
|
||||
self.stream_buffers: Dict[str, StreamBuffer] = {}
|
||||
self.client_managers: Dict[str, ClientManager] = {}
|
||||
self.fetch_threads: Dict[str, threading.Thread] = {}
|
||||
self.user_agent: str = user_agent or Config.DEFAULT_USER_AGENT
|
||||
self._setup_routes()
|
||||
|
||||
# Remove Flask-specific routing
|
||||
def _setup_routes(self) -> None:
|
||||
"""Configure Flask routes"""
|
||||
self.app.add_url_rule(
|
||||
'/stream/<channel_id>', # Changed from /<channel_id>/stream.m3u8
|
||||
view_func=self.stream_endpoint
|
||||
)
|
||||
self.app.add_url_rule(
|
||||
'/stream/<channel_id>/segments/<path:segment_name>', # Updated to match new pattern
|
||||
view_func=self.get_segment
|
||||
)
|
||||
self.app.add_url_rule(
|
||||
'/change_stream/<channel_id>', # Changed from /<channel_id>/change_stream
|
||||
view_func=self.change_stream,
|
||||
methods=['POST']
|
||||
)
|
||||
|
||||
def initialize_channel(self, url: str, channel_id: str) -> None:
|
||||
"""Initialize a new channel stream"""
|
||||
if channel_id in self.stream_managers:
|
||||
self.stop_channel(channel_id)
|
||||
|
||||
manager = StreamManager(
|
||||
url,
|
||||
channel_id,
|
||||
user_agent=self.user_agent
|
||||
)
|
||||
buffer = StreamBuffer()
|
||||
client_manager = ClientManager()
|
||||
|
||||
# Set up references
|
||||
manager.client_manager = client_manager
|
||||
manager.proxy_server = self
|
||||
|
||||
# Store resources
|
||||
self.stream_managers[channel_id] = manager
|
||||
self.stream_buffers[channel_id] = buffer
|
||||
self.client_managers[channel_id] = client_manager
|
||||
|
||||
# Create and store fetcher
|
||||
fetcher = StreamFetcher(manager, buffer)
|
||||
manager.fetcher = fetcher
|
||||
|
||||
# Start fetch thread
|
||||
self.fetch_threads[channel_id] = threading.Thread(
|
||||
target=fetcher.fetch_loop,
|
||||
name=f"StreamFetcher-{channel_id}",
|
||||
daemon=True
|
||||
)
|
||||
self.fetch_threads[channel_id].start()
|
||||
|
||||
# Start cleanup monitoring immediately
|
||||
manager.start_cleanup_thread()
|
||||
|
||||
logging.info(f"Initialized channel {channel_id} with URL {url}")
|
||||
|
||||
def stop_channel(self, channel_id: str) -> None:
|
||||
"""Stop and cleanup a channel"""
|
||||
if channel_id in self.stream_managers:
|
||||
self.stream_managers[channel_id].stop()
|
||||
if channel_id in self.fetch_threads:
|
||||
self.fetch_threads[channel_id].join(timeout=5)
|
||||
self._cleanup_channel(channel_id)
|
||||
|
||||
def _cleanup_channel(self, channel_id: str) -> None:
|
||||
"""
|
||||
Remove all resources associated with a channel.
|
||||
|
||||
Args:
|
||||
channel_id: Channel to cleanup
|
||||
|
||||
Removes:
|
||||
- Stream manager instance
|
||||
- Segment buffer
|
||||
- Client manager
|
||||
- Fetch thread reference
|
||||
|
||||
Thread safety:
|
||||
Should only be called after stream manager is stopped
|
||||
and fetch thread has completed
|
||||
"""
|
||||
|
||||
for collection in [self.stream_managers, self.stream_buffers,
|
||||
self.client_managers, self.fetch_threads]:
|
||||
collection.pop(channel_id, None)
|
||||
|
||||
def run(self, host: str = '0.0.0.0', port: int = 5000) -> None:
|
||||
"""Start the proxy server"""
|
||||
try:
|
||||
self.app.run(host=host, port=port, threaded=True)
|
||||
except KeyboardInterrupt:
|
||||
logging.info("Shutting down gracefully...")
|
||||
self.shutdown()
|
||||
except Exception as e:
|
||||
logging.error(f"Server error: {e}")
|
||||
self.shutdown()
|
||||
raise
|
||||
|
||||
def shutdown(self) -> None:
|
||||
"""
|
||||
Stop all channels and cleanup resources.
|
||||
|
||||
Steps:
|
||||
1. Stop all active stream managers
|
||||
2. Join fetch threads
|
||||
3. Clean up channel resources
|
||||
4. Release system resources
|
||||
|
||||
Thread Safety:
|
||||
Safe to call from signal handlers or during shutdown
|
||||
"""
|
||||
for channel_id in list(self.stream_managers.keys()):
|
||||
self.stop_channel(channel_id)
|
||||
pass
|
||||
|
||||
# Update methods to return data instead of Flask Response objects
|
||||
def stream_endpoint(self, channel_id: str):
|
||||
"""Flask route handler for serving HLS manifests."""
|
||||
if channel_id not in self.stream_managers:
|
||||
return Response('Channel not found', status=404)
|
||||
return 'Channel not found', 404
|
||||
|
||||
manager = self.stream_managers[channel_id]
|
||||
|
||||
# Wait for initial buffer
|
||||
if not manager.buffer_ready.wait(Config.BUFFER_READY_TIMEOUT):
|
||||
logging.error(f"Timeout waiting for initial buffer for channel {channel_id}")
|
||||
return Response('Initial buffer not ready', status=503)
|
||||
return 'Initial buffer not ready', 503
|
||||
|
||||
try:
|
||||
if (channel_id not in self.stream_managers) or (not self.stream_managers[channel_id].running):
|
||||
return Response('Channel not found', status=404)
|
||||
return 'Channel not found', 404
|
||||
|
||||
manager = self.stream_managers[channel_id]
|
||||
buffer = self.stream_buffers[channel_id]
|
||||
|
|
@ -1006,7 +842,7 @@ class ProxyServer:
|
|||
|
||||
if time.time() - start_time > Config.FIRST_SEGMENT_TIMEOUT:
|
||||
logging.warning(f"Timeout waiting for first segment for channel {channel_id}")
|
||||
return Response('No segments available', status=503)
|
||||
return 'No segments available', 503
|
||||
|
||||
time.sleep(0.1) # Short sleep to prevent CPU spinning
|
||||
|
||||
|
|
@ -1050,7 +886,7 @@ class ProxyServer:
|
|||
|
||||
manifest_content = '\n'.join(new_manifest)
|
||||
logging.debug(f"Serving manifest with segments {min_seq}-{max_seq} (window: {len(window_segments)})")
|
||||
return Response(manifest_content, content_type='application/vnd.apple.mpegurl')
|
||||
return manifest_content, 200 # Return content and status code
|
||||
except ConnectionAbortedError:
|
||||
logging.debug("Client disconnected")
|
||||
return '', 499
|
||||
|
|
@ -1077,7 +913,7 @@ class ProxyServer:
|
|||
- Returns 404 on any error
|
||||
"""
|
||||
if channel_id not in self.stream_managers:
|
||||
return Response('Channel not found', status=404)
|
||||
return 'Channel not found', 404
|
||||
|
||||
try:
|
||||
# Record client activity
|
||||
|
|
@ -1089,7 +925,7 @@ class ProxyServer:
|
|||
|
||||
with buffer_lock:
|
||||
if segment_id in buffer:
|
||||
return Response(buffer[segment_id], content_type='video/MP2T')
|
||||
return buffer[segment_id], 200 # Return content and status code
|
||||
|
||||
logging.warning(f"Segment {segment_id} not found for channel {channel_id}")
|
||||
except Exception as e:
|
||||
|
|
@ -1122,45 +958,24 @@ class ProxyServer:
|
|||
- Maintains segment numbering
|
||||
"""
|
||||
if channel_id not in self.stream_managers:
|
||||
return jsonify({'error': 'Channel not found'}), 404
|
||||
return {'error': 'Channel not found'}, 404
|
||||
|
||||
new_url = request.json.get('url')
|
||||
if not new_url:
|
||||
return jsonify({'error': 'No URL provided'}), 400
|
||||
return {'error': 'No URL provided'}, 400
|
||||
|
||||
manager = self.stream_managers[channel_id]
|
||||
if manager.update_url(new_url):
|
||||
return jsonify({
|
||||
return {
|
||||
'message': 'Stream URL updated',
|
||||
'channel': channel_id,
|
||||
'url': new_url
|
||||
})
|
||||
return jsonify({
|
||||
}, 200
|
||||
return {
|
||||
'message': 'URL unchanged',
|
||||
'channel': channel_id,
|
||||
'url': new_url
|
||||
})
|
||||
|
||||
@app.before_request
|
||||
def log_request_info():
|
||||
"""
|
||||
Log client connections and important requests.
|
||||
|
||||
Log Levels:
|
||||
INFO:
|
||||
- First manifest request from new client
|
||||
- Stream switch requests
|
||||
DEBUG:
|
||||
- Segment requests
|
||||
- Routine manifest updates
|
||||
|
||||
Format:
|
||||
"{client_ip} - {method} {path}"
|
||||
|
||||
Side Effects:
|
||||
- Updates logging configuration based on request type
|
||||
- Tracks client connections
|
||||
"""
|
||||
}, 200
|
||||
|
||||
# Main Application Setup
|
||||
if __name__ == '__main__':
|
||||
10
apps/proxy/hls_proxy/urls.py
Normal file
10
apps/proxy/hls_proxy/urls.py
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
from django.urls import path
|
||||
from . import views
|
||||
|
||||
app_name = 'hls_proxy'
|
||||
|
||||
urlpatterns = [
|
||||
path('stream/<str:channel_id>', views.stream_endpoint, name='stream'),
|
||||
path('stream/<str:channel_id>/segments/<path:segment_name>', views.get_segment, name='segment'),
|
||||
path('change_stream/<str:channel_id>', views.change_stream, name='change_stream'),
|
||||
]
|
||||
36
apps/proxy/hls_proxy/views.py
Normal file
36
apps/proxy/hls_proxy/views.py
Normal file
|
|
@ -0,0 +1,36 @@
|
|||
from django.http import StreamingHttpResponse, JsonResponse, HttpResponse
|
||||
from django.views.decorators.csrf import csrf_exempt
|
||||
from django.views.decorators.http import require_http_methods
|
||||
import json
|
||||
from .server import ProxyServer, Config
|
||||
|
||||
proxy_server = ProxyServer()
|
||||
|
||||
@require_http_methods(["GET"])
|
||||
def stream_endpoint(request, channel_id):
|
||||
"""Serve HLS manifest"""
|
||||
response = proxy_server.stream_endpoint(channel_id)
|
||||
return StreamingHttpResponse(
|
||||
response.response[0],
|
||||
content_type='application/vnd.apple.mpegurl',
|
||||
status=response.status_code
|
||||
)
|
||||
|
||||
@require_http_methods(["GET"])
|
||||
def get_segment(request, channel_id, segment_name):
|
||||
"""Serve MPEG-TS segments"""
|
||||
response = proxy_server.get_segment(channel_id, segment_name)
|
||||
if response[1] == 404:
|
||||
return HttpResponse(status=404)
|
||||
return StreamingHttpResponse(response[0], content_type='video/MP2T')
|
||||
|
||||
@csrf_exempt
|
||||
@require_http_methods(["POST"])
|
||||
def change_stream(request, channel_id):
|
||||
"""Handle stream URL changes"""
|
||||
try:
|
||||
data = json.loads(request.body)
|
||||
response = proxy_server.change_stream(channel_id)
|
||||
return JsonResponse(response[0], status=response[1])
|
||||
except json.JSONDecodeError:
|
||||
return JsonResponse({'error': 'Invalid JSON'}, status=400)
|
||||
67
apps/proxy/management/commands/proxy.py
Normal file
67
apps/proxy/management/commands/proxy.py
Normal file
|
|
@ -0,0 +1,67 @@
|
|||
from django.core.management.base import BaseCommand
|
||||
from django.apps import apps
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = 'Manage proxy servers'
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument(
|
||||
'action',
|
||||
choices=['start', 'stop', 'restart'],
|
||||
help='Action to perform'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--type',
|
||||
choices=['hls', 'ts', 'all'],
|
||||
default='all',
|
||||
help='Type of proxy to manage'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--channel',
|
||||
help='Channel ID to manage'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--url',
|
||||
help='Stream URL (required for start)'
|
||||
)
|
||||
|
||||
def handle(self, *args, **options):
|
||||
proxy_app = apps.get_app_config('proxy')
|
||||
action = options['action']
|
||||
proxy_type = options['type']
|
||||
channel = options.get('channel')
|
||||
url = options.get('url')
|
||||
|
||||
try:
|
||||
if action == 'start':
|
||||
if not url:
|
||||
raise ValueError("URL is required for start action")
|
||||
if proxy_type in ('hls', 'all'):
|
||||
proxy_app.hls_proxy.initialize_channel(url, channel or 'default')
|
||||
if proxy_type in ('ts', 'all'):
|
||||
proxy_app.ts_proxy.initialize_channel(url, channel or 'default')
|
||||
self.stdout.write(self.style.SUCCESS('Started proxy servers'))
|
||||
|
||||
elif action == 'stop':
|
||||
if proxy_type in ('hls', 'all'):
|
||||
if channel:
|
||||
proxy_app.hls_proxy.stop_channel(channel)
|
||||
else:
|
||||
proxy_app.hls_proxy.shutdown()
|
||||
if proxy_type in ('ts', 'all'):
|
||||
if channel:
|
||||
proxy_app.ts_proxy.stop_channel(channel)
|
||||
else:
|
||||
proxy_app.ts_proxy.shutdown()
|
||||
self.stdout.write(self.style.SUCCESS('Stopped proxy servers'))
|
||||
|
||||
elif action == 'restart':
|
||||
self.handle(*args, **dict(options, action='stop'))
|
||||
self.handle(*args, **dict(options, action='start'))
|
||||
self.stdout.write(self.style.SUCCESS('Restarted proxy servers'))
|
||||
|
||||
except Exception as e:
|
||||
self.stderr.write(self.style.ERROR(f'Error: {e}'))
|
||||
19
apps/proxy/signals.py
Normal file
19
apps/proxy/signals.py
Normal file
|
|
@ -0,0 +1,19 @@
|
|||
from django.db.models.signals import pre_delete
|
||||
from django.dispatch import receiver
|
||||
from django.apps import apps
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@receiver(pre_delete)
|
||||
def cleanup_proxy_servers(sender, **kwargs):
|
||||
"""Clean up proxy servers when Django shuts down"""
|
||||
try:
|
||||
proxy_app = apps.get_app_config('proxy')
|
||||
for channel_id in list(proxy_app.hls_proxy.stream_managers.keys()):
|
||||
proxy_app.hls_proxy.stop_channel(channel_id)
|
||||
for channel_id in list(proxy_app.ts_proxy.stream_managers.keys()):
|
||||
proxy_app.ts_proxy.stop_channel(channel_id)
|
||||
logger.info("Proxy servers cleaned up successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"Error during proxy server cleanup: {e}")
|
||||
0
apps/proxy/ts_proxy/__init__.py
Normal file
0
apps/proxy/ts_proxy/__init__.py
Normal file
|
|
@ -7,23 +7,13 @@ Handles live TS stream proxying with support for:
|
|||
- Connection state tracking
|
||||
"""
|
||||
|
||||
from flask import Flask, Response, request, jsonify
|
||||
import requests
|
||||
import threading
|
||||
import logging
|
||||
from collections import deque
|
||||
import time
|
||||
import os
|
||||
from typing import Optional, Set, Deque, Dict
|
||||
|
||||
# Configuration
|
||||
class Config:
|
||||
CHUNK_SIZE: int = 8192 # Buffer chunk size (bytes)
|
||||
BUFFER_SIZE: int = 1000 # Number of chunks to keep in memory
|
||||
RECONNECT_DELAY: int = 5 # Seconds between reconnection attempts
|
||||
CLIENT_POLL_INTERVAL: float = 0.1 # Seconds between client buffer checks
|
||||
MAX_RETRIES: int = 3 # Maximum connection retry attempts
|
||||
DEFAULT_USER_AGENT: str = 'VLC/3.0.20 LibVLC/3.0.20' # Default user agent
|
||||
from apps.proxy.config import TSConfig as Config
|
||||
|
||||
class StreamManager:
|
||||
"""Manages TS stream state and connection handling"""
|
||||
|
|
@ -183,19 +173,12 @@ class ProxyServer:
|
|||
"""Manages TS proxy server instance"""
|
||||
|
||||
def __init__(self, user_agent: Optional[str] = None):
|
||||
self.app = Flask(__name__)
|
||||
self.stream_managers: Dict[str, StreamManager] = {}
|
||||
self.stream_buffers: Dict[str, StreamBuffer] = {}
|
||||
self.client_managers: Dict[str, ClientManager] = {}
|
||||
self.fetch_threads: Dict[str, threading.Thread] = {}
|
||||
self.user_agent: str = user_agent or Config.DEFAULT_USER_AGENT
|
||||
self._setup_routes()
|
||||
|
||||
def _setup_routes(self) -> None:
|
||||
"""Configure Flask routes"""
|
||||
self.app.route('/stream/<channel_id>')(self.stream_endpoint)
|
||||
self.app.route('/change_stream/<channel_id>', methods=['POST'])(self.change_stream)
|
||||
|
||||
def initialize_channel(self, url: str, channel_id: str) -> None:
|
||||
"""Initialize a new channel stream"""
|
||||
if channel_id in self.stream_managers:
|
||||
|
|
@ -221,7 +204,7 @@ class ProxyServer:
|
|||
)
|
||||
self.fetch_threads[channel_id].start()
|
||||
logging.info(f"Initialized channel {channel_id} with URL {url}")
|
||||
|
||||
|
||||
def stop_channel(self, channel_id: str) -> None:
|
||||
"""Stop and cleanup a channel"""
|
||||
if channel_id in self.stream_managers:
|
||||
|
|
@ -235,89 +218,8 @@ class ProxyServer:
|
|||
for collection in [self.stream_managers, self.stream_buffers,
|
||||
self.client_managers, self.fetch_threads]:
|
||||
collection.pop(channel_id, None)
|
||||
|
||||
def stream_endpoint(self, channel_id: str):
|
||||
"""Stream endpoint that serves TS data to clients"""
|
||||
if channel_id not in self.stream_managers:
|
||||
return Response('Channel not found', status=404)
|
||||
|
||||
def generate():
|
||||
client_id = threading.get_ident()
|
||||
buffer = self.stream_buffers[channel_id]
|
||||
client_manager = self.client_managers[channel_id]
|
||||
|
||||
client_manager.add_client(client_id)
|
||||
last_index = buffer.index
|
||||
|
||||
try:
|
||||
while True:
|
||||
with buffer.lock:
|
||||
if buffer.index > last_index:
|
||||
chunks_behind = buffer.index - last_index
|
||||
start_pos = max(0, len(buffer.buffer) - chunks_behind)
|
||||
|
||||
for i in range(start_pos, len(buffer.buffer)):
|
||||
yield buffer.buffer[i]
|
||||
last_index = buffer.index
|
||||
|
||||
threading.Event().wait(Config.CLIENT_POLL_INTERVAL)
|
||||
except GeneratorExit:
|
||||
remaining = client_manager.remove_client(client_id)
|
||||
if remaining == 0:
|
||||
logging.info(f"No clients remaining for channel {channel_id}")
|
||||
self.stop_channel(channel_id)
|
||||
|
||||
return Response(generate(), content_type='video/mp2t')
|
||||
|
||||
def change_stream(self, channel_id: str):
|
||||
"""Handle stream URL changes"""
|
||||
if channel_id not in self.stream_managers:
|
||||
return jsonify({'error': 'Channel not found'}), 404
|
||||
|
||||
new_url = request.json.get('url')
|
||||
if not new_url:
|
||||
return jsonify({'error': 'No URL provided'}), 400
|
||||
|
||||
manager = self.stream_managers[channel_id]
|
||||
if manager.update_url(new_url):
|
||||
return jsonify({
|
||||
'message': 'Stream URL updated',
|
||||
'channel': channel_id,
|
||||
'url': new_url
|
||||
})
|
||||
return jsonify({
|
||||
'message': 'URL unchanged',
|
||||
'channel': channel_id,
|
||||
'url': new_url
|
||||
})
|
||||
|
||||
def run(self, host: str = '0.0.0.0', port: int = 5000) -> None:
|
||||
"""Start the proxy server"""
|
||||
self.app.run(host=host, port=port, threaded=True)
|
||||
|
||||
def shutdown(self) -> None:
|
||||
"""Stop all channels and cleanup"""
|
||||
for channel_id in list(self.stream_managers.keys()):
|
||||
self.stop_channel(channel_id)
|
||||
|
||||
def main():
|
||||
"""Initialize and start the proxy server"""
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
datefmt='%Y-%m-%d %H:%M:%S'
|
||||
)
|
||||
|
||||
logging.getLogger('werkzeug').setLevel(logging.DEBUG)
|
||||
|
||||
proxy_server = ProxyServer()
|
||||
initial_url = os.getenv('STREAM_URL', 'http://example.com/stream.ts')
|
||||
proxy_server.initialize_channel(initial_url, "default_channel")
|
||||
|
||||
try:
|
||||
proxy_server.run()
|
||||
finally:
|
||||
proxy_server.shutdown()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
9
apps/proxy/ts_proxy/urls.py
Normal file
9
apps/proxy/ts_proxy/urls.py
Normal file
|
|
@ -0,0 +1,9 @@
|
|||
from django.urls import path
|
||||
from . import views
|
||||
|
||||
app_name = 'ts_proxy'
|
||||
|
||||
urlpatterns = [
|
||||
path('stream/<str:channel_id>', views.stream_ts, name='stream'),
|
||||
path('change_stream/<str:channel_id>', views.change_stream, name='change_stream'),
|
||||
]
|
||||
72
apps/proxy/ts_proxy/views.py
Normal file
72
apps/proxy/ts_proxy/views.py
Normal file
|
|
@ -0,0 +1,72 @@
|
|||
from django.http import StreamingHttpResponse, JsonResponse
|
||||
from django.views.decorators.csrf import csrf_exempt
|
||||
from django.views.decorators.http import require_http_methods
|
||||
import json
|
||||
from .server import ProxyServer
|
||||
|
||||
proxy_server = ProxyServer()
|
||||
|
||||
@require_http_methods(["GET"])
|
||||
def stream_ts(request, channel_id):
|
||||
"""Handle TS stream requests"""
|
||||
if channel_id not in proxy_server.stream_managers:
|
||||
return StreamingHttpResponse('Channel not found', status=404)
|
||||
|
||||
def generate():
|
||||
client_id = threading.get_ident()
|
||||
buffer = proxy_server.stream_buffers[channel_id]
|
||||
client_manager = proxy_server.client_managers[channel_id]
|
||||
|
||||
client_manager.add_client(client_id)
|
||||
last_index = buffer.index
|
||||
|
||||
try:
|
||||
while True:
|
||||
with buffer.lock:
|
||||
if buffer.index > last_index:
|
||||
chunks_behind = buffer.index - last_index
|
||||
start_pos = max(0, len(buffer.buffer) - chunks_behind)
|
||||
|
||||
for i in range(start_pos, len(buffer.buffer)):
|
||||
yield buffer.buffer[i]
|
||||
last_index = buffer.index
|
||||
|
||||
time.sleep(Config.CLIENT_POLL_INTERVAL)
|
||||
except Exception:
|
||||
remaining = client_manager.remove_client(client_id)
|
||||
if remaining == 0:
|
||||
proxy_server.stop_channel(channel_id)
|
||||
raise
|
||||
|
||||
return StreamingHttpResponse(
|
||||
generate(),
|
||||
content_type='video/MP2T'
|
||||
)
|
||||
|
||||
@csrf_exempt
|
||||
@require_http_methods(["POST"])
|
||||
def change_stream(request, channel_id):
|
||||
"""Handle stream URL changes"""
|
||||
try:
|
||||
data = json.loads(request.body)
|
||||
new_url = data.get('url')
|
||||
if not new_url:
|
||||
return JsonResponse({'error': 'No URL provided'}, status=400)
|
||||
|
||||
if channel_id not in proxy_server.stream_managers:
|
||||
return JsonResponse({'error': 'Channel not found'}, status=404)
|
||||
|
||||
manager = proxy_server.stream_managers[channel_id]
|
||||
if manager.update_url(new_url):
|
||||
return JsonResponse({
|
||||
'message': 'Stream URL updated',
|
||||
'channel': channel_id,
|
||||
'url': new_url
|
||||
})
|
||||
return JsonResponse({
|
||||
'message': 'URL unchanged',
|
||||
'channel': channel_id,
|
||||
'url': new_url
|
||||
})
|
||||
except json.JSONDecodeError:
|
||||
return JsonResponse({'error': 'Invalid JSON'}, status=400)
|
||||
14
apps/proxy/urls.py
Normal file
14
apps/proxy/urls.py
Normal file
|
|
@ -0,0 +1,14 @@
|
|||
from django.urls import path, include
|
||||
from rest_framework.routers import DefaultRouter
|
||||
from . import views
|
||||
|
||||
router = DefaultRouter()
|
||||
router.register(r'proxy', views.ProxyViewSet, basename='proxy')
|
||||
|
||||
app_name = 'proxy'
|
||||
|
||||
urlpatterns = [
|
||||
path('api/', include(router.urls)),
|
||||
path('hls/', include('apps.proxy.hls_proxy.urls')),
|
||||
path('ts/', include('apps.proxy.ts_proxy.urls')),
|
||||
]
|
||||
64
apps/proxy/views.py
Normal file
64
apps/proxy/views.py
Normal file
|
|
@ -0,0 +1,64 @@
|
|||
from rest_framework import viewsets, status
|
||||
from rest_framework.decorators import action
|
||||
from rest_framework.response import Response
|
||||
from django.apps import apps
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class ProxyViewSet(viewsets.ViewSet):
|
||||
"""ViewSet for managing proxy servers"""
|
||||
|
||||
@action(detail=False, methods=['post'])
|
||||
def start(self, request):
|
||||
"""Start a proxy server for a channel"""
|
||||
try:
|
||||
proxy_type = request.data.get('type', 'hls')
|
||||
channel_id = request.data.get('channel', 'default')
|
||||
url = request.data.get('url')
|
||||
|
||||
if not url:
|
||||
return Response(
|
||||
{'error': 'URL is required'},
|
||||
status=status.HTTP_400_BAD_REQUEST
|
||||
)
|
||||
|
||||
proxy_app = apps.get_app_config('proxy')
|
||||
proxy_server = getattr(proxy_app, f'{proxy_type}_proxy')
|
||||
proxy_server.initialize_channel(url, channel_id)
|
||||
|
||||
return Response({
|
||||
'message': f'{proxy_type.upper()} proxy started',
|
||||
'channel': channel_id,
|
||||
'url': url
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error starting proxy: {e}")
|
||||
return Response(
|
||||
{'error': str(e)},
|
||||
status=status.HTTP_500_INTERNAL_SERVER_ERROR
|
||||
)
|
||||
|
||||
@action(detail=False, methods=['post'])
|
||||
def stop(self, request):
|
||||
"""Stop a proxy server for a channel"""
|
||||
try:
|
||||
proxy_type = request.data.get('type', 'hls')
|
||||
channel_id = request.data.get('channel', 'default')
|
||||
|
||||
proxy_app = apps.get_app_config('proxy')
|
||||
proxy_server = getattr(proxy_app, f'{proxy_type}_proxy')
|
||||
proxy_server.stop_channel(channel_id)
|
||||
|
||||
return Response({
|
||||
'message': f'{proxy_type.upper()} proxy stopped',
|
||||
'channel': channel_id
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error stopping proxy: {e}")
|
||||
return Response(
|
||||
{'error': str(e)},
|
||||
status=status.HTTP_500_INTERNAL_SERVER_ERROR
|
||||
)
|
||||
|
|
@ -19,6 +19,7 @@ INSTALLED_APPS = [
|
|||
'apps.hdhr',
|
||||
'apps.m3u',
|
||||
'apps.output',
|
||||
'apps.proxy.apps.ProxyConfig',
|
||||
'core',
|
||||
'drf_yasg',
|
||||
'django.contrib.admin',
|
||||
|
|
@ -155,3 +156,25 @@ SIMPLE_JWT = {
|
|||
'ROTATE_REFRESH_TOKENS': False, # Optional: Whether to rotate refresh tokens
|
||||
'BLACKLIST_AFTER_ROTATION': True, # Optional: Whether to blacklist refresh tokens
|
||||
}
|
||||
|
||||
# Proxy Settings
|
||||
PROXY_SETTINGS = {
|
||||
'HLS': {
|
||||
'DEFAULT_URL': '', # Default HLS stream URL if needed
|
||||
'BUFFER_SIZE': 1000,
|
||||
'USER_AGENT': 'VLC/3.0.20 LibVLC/3.0.20',
|
||||
'CHUNK_SIZE': 8192,
|
||||
'CLIENT_POLL_INTERVAL': 0.1,
|
||||
'MAX_RETRIES': 3,
|
||||
'MIN_SEGMENTS': 12,
|
||||
'MAX_SEGMENTS': 16,
|
||||
'WINDOW_SIZE': 12,
|
||||
'INITIAL_SEGMENTS': 3,
|
||||
},
|
||||
'TS': {
|
||||
'DEFAULT_URL': '', # Default TS stream URL if needed
|
||||
'BUFFER_SIZE': 1000,
|
||||
'RECONNECT_DELAY': 5,
|
||||
'USER_AGENT': 'VLC/3.0.20 LibVLC/3.0.20',
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -50,6 +50,9 @@ urlpatterns = [
|
|||
# Catch-all route to serve React's index.html for non-API, non-admin paths
|
||||
path('', TemplateView.as_view(template_name='index.html')), # React entry point
|
||||
|
||||
# Add proxy apps
|
||||
path('proxy/', include('apps.proxy.urls')),
|
||||
|
||||
] + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)
|
||||
|
||||
# Serve static files for development (React's JS, CSS, etc.)
|
||||
|
|
|
|||
11
frontend/src/components/Navigation.js
Normal file
11
frontend/src/components/Navigation.js
Normal file
|
|
@ -0,0 +1,11 @@
|
|||
// ...existing imports...
|
||||
|
||||
const menuItems = [
|
||||
...existing items...,
|
||||
{
|
||||
key: 'proxy',
|
||||
label: 'Proxy Manager',
|
||||
icon: <ApiOutlined />,
|
||||
path: '/proxy',
|
||||
},
|
||||
];
|
||||
82
frontend/src/components/ProxyManager.js
Normal file
82
frontend/src/components/ProxyManager.js
Normal file
|
|
@ -0,0 +1,82 @@
|
|||
import React, { useState } from 'react';
|
||||
import { Button, Form, Input, Select, message } from 'antd';
|
||||
import axios from 'axios';
|
||||
|
||||
const { Option } = Select;
|
||||
|
||||
const ProxyManager = () => {
|
||||
const [form] = Form.useForm();
|
||||
const [loading, setLoading] = useState(false);
|
||||
|
||||
const handleSubmit = async (values) => {
|
||||
setLoading(true);
|
||||
try {
|
||||
const { action, ...data } = values;
|
||||
await axios.post(`/proxy/api/proxy/${action}/`, data);
|
||||
message.success(`Proxy ${action} successful`);
|
||||
form.resetFields();
|
||||
} catch (error) {
|
||||
message.error(error.response?.data?.error || 'An error occurred');
|
||||
} finally {
|
||||
setLoading(false);
|
||||
}
|
||||
};
|
||||
|
||||
return (
|
||||
<div className="proxy-manager">
|
||||
<h2>Proxy Manager</h2>
|
||||
<Form form={form} onFinish={handleSubmit} layout="vertical">
|
||||
<Form.Item
|
||||
name="type"
|
||||
label="Proxy Type"
|
||||
rules={[{ required: true }]}
|
||||
>
|
||||
<Select>
|
||||
<Option value="hls">HLS</Option>
|
||||
<Option value="ts">TS</Option>
|
||||
</Select>
|
||||
</Form.Item>
|
||||
|
||||
<Form.Item
|
||||
name="channel"
|
||||
label="Channel ID"
|
||||
rules={[{ required: true }]}
|
||||
>
|
||||
<Input />
|
||||
</Form.Item>
|
||||
|
||||
<Form.Item
|
||||
name="url"
|
||||
label="Stream URL"
|
||||
rules={[{ required: true, type: 'url' }]}
|
||||
>
|
||||
<Input />
|
||||
</Form.Item>
|
||||
|
||||
<Form.Item>
|
||||
<Button.Group>
|
||||
<Button
|
||||
type="primary"
|
||||
onClick={() => form.submit()}
|
||||
loading={loading}
|
||||
>
|
||||
Start Proxy
|
||||
</Button>
|
||||
<Button
|
||||
danger
|
||||
onClick={() => {
|
||||
form.setFieldsValue({ action: 'stop' });
|
||||
form.submit();
|
||||
}}
|
||||
loading={loading}
|
||||
>
|
||||
Stop Proxy
|
||||
</Button>
|
||||
</Button.Group>
|
||||
</Form.Item>
|
||||
</Form>
|
||||
</div>
|
||||
);
|
||||
};
|
||||
|
||||
export default ProxyManager;
|
||||
14
frontend/src/routes.js
Normal file
14
frontend/src/routes.js
Normal file
|
|
@ -0,0 +1,14 @@
|
|||
import ProxyManager from './components/ProxyManager';
|
||||
|
||||
// ...existing code...
|
||||
|
||||
const routes = [
|
||||
...existing routes...,
|
||||
{
|
||||
path: '/proxy',
|
||||
element: <ProxyManager />,
|
||||
name: 'Proxy Manager',
|
||||
},
|
||||
];
|
||||
|
||||
export default routes;
|
||||
Loading…
Add table
Add a link
Reference in a new issue