Merge pull request #8 from SergeantPanda/Proxy

Proxy
This commit is contained in:
dekzter 2025-03-02 15:34:21 -05:00 committed by GitHub
commit 710b3bdba0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 1551 additions and 0 deletions

1227
apps/proxy/hls_proxy Normal file

File diff suppressed because it is too large Load diff

323
apps/proxy/ts_proxy Normal file
View file

@ -0,0 +1,323 @@
"""
Transport Stream (TS) Proxy Server
Handles live TS stream proxying with support for:
- Stream switching
- Buffer management
- Multiple client connections
- Connection state tracking
"""
from flask import Flask, Response, request, jsonify
import requests
import threading
import logging
from collections import deque
import time
import os
from typing import Optional, Set, Deque, Dict
# Configuration
class Config:
CHUNK_SIZE: int = 8192 # Buffer chunk size (bytes)
BUFFER_SIZE: int = 1000 # Number of chunks to keep in memory
RECONNECT_DELAY: int = 5 # Seconds between reconnection attempts
CLIENT_POLL_INTERVAL: float = 0.1 # Seconds between client buffer checks
MAX_RETRIES: int = 3 # Maximum connection retry attempts
DEFAULT_USER_AGENT: str = 'VLC/3.0.20 LibVLC/3.0.20' # Default user agent
class StreamManager:
"""Manages TS stream state and connection handling"""
def __init__(self, initial_url: str, channel_id: str, user_agent: Optional[str] = None):
self.current_url: str = initial_url
self.channel_id: str = channel_id
self.user_agent: str = user_agent or Config.DEFAULT_USER_AGENT
self.url_changed: threading.Event = threading.Event()
self.running: bool = True
self.session: requests.Session = self._create_session()
self.connected: bool = False
self.retry_count: int = 0
logging.info(f"Initialized stream manager for channel {channel_id}")
def _create_session(self) -> requests.Session:
"""Create and configure requests session"""
session = requests.Session()
session.headers.update({
'User-Agent': self.user_agent,
'Connection': 'keep-alive'
})
return session
def update_url(self, new_url: str) -> bool:
"""Update stream URL and signal connection change"""
if new_url != self.current_url:
logging.info(f"Stream switch initiated: {self.current_url} -> {new_url}")
self.current_url = new_url
self.connected = False
self.url_changed.set()
return True
return False
def should_retry(self) -> bool:
"""Check if connection retry is allowed"""
return self.retry_count < Config.MAX_RETRIES
def stop(self) -> None:
"""Clean shutdown of stream manager"""
self.running = False
if self.session:
self.session.close()
class StreamBuffer:
"""Manages stream data buffering"""
def __init__(self):
self.buffer: Deque[bytes] = deque(maxlen=Config.BUFFER_SIZE)
self.lock: threading.Lock = threading.Lock()
self.index: int = 0
class ClientManager:
"""Manages active client connections"""
def __init__(self):
self.active_clients: Set[int] = set()
self.lock: threading.Lock = threading.Lock()
def add_client(self, client_id: int) -> None:
"""Add new client connection"""
with self.lock:
self.active_clients.add(client_id)
logging.info(f"New client connected: {client_id} (total: {len(self.active_clients)})")
def remove_client(self, client_id: int) -> int:
"""Remove client and return remaining count"""
with self.lock:
self.active_clients.remove(client_id)
remaining = len(self.active_clients)
logging.info(f"Client disconnected: {client_id} (remaining: {remaining})")
return remaining
class StreamFetcher:
"""Handles stream data fetching"""
def __init__(self, manager: StreamManager, buffer: StreamBuffer):
self.manager = manager
self.buffer = buffer
def fetch_loop(self) -> None:
"""Main fetch loop for stream data"""
while self.manager.running:
try:
if not self._handle_connection():
continue
with self.manager.session.get(self.manager.current_url, stream=True) as response:
if response.status_code == 200:
self._handle_successful_connection()
self._process_stream(response)
except requests.exceptions.RequestException as e:
self._handle_connection_error(e)
def _handle_connection(self) -> bool:
"""Handle connection state and retries"""
if not self.manager.connected:
if not self.manager.should_retry():
logging.error(f"Failed to connect after {Config.MAX_RETRIES} attempts")
return False
if not self.manager.running:
return False
self.manager.retry_count += 1
logging.info(f"Connecting to stream: {self.manager.current_url} "
f"(attempt {self.manager.retry_count}/{Config.MAX_RETRIES})")
return True
def _handle_successful_connection(self) -> None:
"""Handle successful stream connection"""
if not self.manager.connected:
logging.info("Stream connected successfully")
self.manager.connected = True
self.manager.retry_count = 0
def _process_stream(self, response: requests.Response) -> None:
"""Process incoming stream data"""
for chunk in response.iter_content(chunk_size=Config.CHUNK_SIZE):
if not self.manager.running:
logging.info("Stream fetch stopped - shutting down")
return
if chunk:
if self.manager.url_changed.is_set():
logging.info("Stream switch in progress, closing connection")
self.manager.url_changed.clear()
break
with self.buffer.lock:
self.buffer.buffer.append(chunk)
self.buffer.index += 1
def _handle_connection_error(self, error: Exception) -> None:
"""Handle stream connection errors"""
logging.error(f"Stream connection error: {error}")
self.manager.connected = False
if not self.manager.running:
return
logging.info(f"Attempting to reconnect in {Config.RECONNECT_DELAY} seconds...")
if not wait_for_running(self.manager, Config.RECONNECT_DELAY):
return
def wait_for_running(manager: StreamManager, delay: float) -> bool:
"""Wait while checking manager running state"""
start = time.time()
while time.time() - start < delay:
if not manager.running:
return False
threading.Event().wait(0.1)
return True
class ProxyServer:
"""Manages TS proxy server instance"""
def __init__(self, user_agent: Optional[str] = None):
self.app = Flask(__name__)
self.stream_managers: Dict[str, StreamManager] = {}
self.stream_buffers: Dict[str, StreamBuffer] = {}
self.client_managers: Dict[str, ClientManager] = {}
self.fetch_threads: Dict[str, threading.Thread] = {}
self.user_agent: str = user_agent or Config.DEFAULT_USER_AGENT
self._setup_routes()
def _setup_routes(self) -> None:
"""Configure Flask routes"""
self.app.route('/stream/<channel_id>')(self.stream_endpoint)
self.app.route('/change_stream/<channel_id>', methods=['POST'])(self.change_stream)
def initialize_channel(self, url: str, channel_id: str) -> None:
"""Initialize a new channel stream"""
if channel_id in self.stream_managers:
self.stop_channel(channel_id)
self.stream_managers[channel_id] = StreamManager(
url,
channel_id,
user_agent=self.user_agent
)
self.stream_buffers[channel_id] = StreamBuffer()
self.client_managers[channel_id] = ClientManager()
fetcher = StreamFetcher(
self.stream_managers[channel_id],
self.stream_buffers[channel_id]
)
self.fetch_threads[channel_id] = threading.Thread(
target=fetcher.fetch_loop,
name=f"StreamFetcher-{channel_id}",
daemon=True
)
self.fetch_threads[channel_id].start()
logging.info(f"Initialized channel {channel_id} with URL {url}")
def stop_channel(self, channel_id: str) -> None:
"""Stop and cleanup a channel"""
if channel_id in self.stream_managers:
self.stream_managers[channel_id].stop()
if channel_id in self.fetch_threads:
self.fetch_threads[channel_id].join(timeout=5)
self._cleanup_channel(channel_id)
def _cleanup_channel(self, channel_id: str) -> None:
"""Remove channel resources"""
for collection in [self.stream_managers, self.stream_buffers,
self.client_managers, self.fetch_threads]:
collection.pop(channel_id, None)
def stream_endpoint(self, channel_id: str):
"""Stream endpoint that serves TS data to clients"""
if channel_id not in self.stream_managers:
return Response('Channel not found', status=404)
def generate():
client_id = threading.get_ident()
buffer = self.stream_buffers[channel_id]
client_manager = self.client_managers[channel_id]
client_manager.add_client(client_id)
last_index = buffer.index
try:
while True:
with buffer.lock:
if buffer.index > last_index:
chunks_behind = buffer.index - last_index
start_pos = max(0, len(buffer.buffer) - chunks_behind)
for i in range(start_pos, len(buffer.buffer)):
yield buffer.buffer[i]
last_index = buffer.index
threading.Event().wait(Config.CLIENT_POLL_INTERVAL)
except GeneratorExit:
remaining = client_manager.remove_client(client_id)
if remaining == 0:
logging.info(f"No clients remaining for channel {channel_id}")
self.stop_channel(channel_id)
return Response(generate(), content_type='video/mp2t')
def change_stream(self, channel_id: str):
"""Handle stream URL changes"""
if channel_id not in self.stream_managers:
return jsonify({'error': 'Channel not found'}), 404
new_url = request.json.get('url')
if not new_url:
return jsonify({'error': 'No URL provided'}), 400
manager = self.stream_managers[channel_id]
if manager.update_url(new_url):
return jsonify({
'message': 'Stream URL updated',
'channel': channel_id,
'url': new_url
})
return jsonify({
'message': 'URL unchanged',
'channel': channel_id,
'url': new_url
})
def run(self, host: str = '0.0.0.0', port: int = 5000) -> None:
"""Start the proxy server"""
self.app.run(host=host, port=port, threaded=True)
def shutdown(self) -> None:
"""Stop all channels and cleanup"""
for channel_id in list(self.stream_managers.keys()):
self.stop_channel(channel_id)
def main():
"""Initialize and start the proxy server"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logging.getLogger('werkzeug').setLevel(logging.DEBUG)
proxy_server = ProxyServer()
initial_url = os.getenv('STREAM_URL', 'http://example.com/stream.ts')
proxy_server.initialize_channel(initial_url, "default_channel")
try:
proxy_server.run()
finally:
proxy_server.shutdown()
if __name__ == '__main__':
main()

View file

@ -14,5 +14,6 @@ yt-dlp
gevent==24.11.1
django-cors-headers
djangorestframework-simplejwt
m3u8
rapidfuzz==3.12.1
sentence-transformers==3.4.1