diff --git a/apps/proxy/hls_proxy b/apps/proxy/hls_proxy new file mode 100644 index 00000000..98ba38a5 --- /dev/null +++ b/apps/proxy/hls_proxy @@ -0,0 +1,1227 @@ +""" +HLS Proxy Server with Advanced Stream Switching Support +This proxy handles HLS live streams with support for: +- Stream switching with proper discontinuity handling +- Buffer management +- Segment validation +- Connection pooling and reuse +""" + +from flask import Flask, Response, request, jsonify +import requests +import threading +import logging +import m3u8 +import time +from urllib.parse import urlparse, urljoin +import argparse +from typing import Optional, Dict, List, Set, Deque +import sys +import os + +# Initialize Flask app +app = Flask(__name__) + +# Global state management +manifest_buffer = None # Stores current manifest content +segment_buffers = {} # Maps sequence numbers to segment data +buffer_lock = threading.Lock() # Synchronizes access to buffers + +class Config: + """Configuration settings for stream handling and buffering""" + # Buffer size settings + MIN_SEGMENTS = 12 # Minimum segments to maintain + MAX_SEGMENTS = 16 # Maximum segments to store + WINDOW_SIZE = 12 # Number of segments in manifest window + INITIAL_SEGMENTS = 3 # Initial segments to buffer before playback + DEFAULT_USER_AGENT = 'VLC/3.0.20 LibVLC/3.0.20' + INITIAL_CONNECTION_WINDOW = 10 # Seconds to wait for first client + CLIENT_TIMEOUT_FACTOR = 1.5 # Multiplier for target duration to determine client timeout + CLIENT_CLEANUP_INTERVAL = 10 # Seconds between client cleanup checks + FIRST_SEGMENT_TIMEOUT = 5.0 # Seconds to wait for first segment + + # Initial buffering settings + INITIAL_BUFFER_SECONDS = 25.0 # Initial buffer in seconds before allowing clients + MAX_INITIAL_SEGMENTS = 10 # Maximum segments to fetch during initialization + BUFFER_READY_TIMEOUT = 30.0 # Maximum time to wait for initial buffer (seconds) + +class StreamBuffer: + """ + Manages buffering of stream segments with thread-safe access. 
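+
+    Example (illustrative; real callers hold self.lock around access):
+        buf = StreamBuffer()
+        buf[42] = segment_bytes   # store; oldest entries evicted past MAX_SEGMENTS
+        data = buf[42]            # segment bytes, or None once evicted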
+ + Attributes: + buffer (Dict[int, bytes]): Maps sequence numbers to segment data + lock (threading.Lock): Thread safety for buffer access + + Features: + - Thread-safe segment storage and retrieval + - Automatic cleanup of old segments + - Sequence number based indexing + """ + + def __init__(self): + self.buffer: Dict[int, bytes] = {} # Maps sequence numbers to segment data + self.lock: threading.Lock = threading.Lock() + + def __getitem__(self, key: int) -> Optional[bytes]: + """Get segment data by sequence number""" + return self.buffer.get(key) + + def __setitem__(self, key: int, value: bytes): + """Store segment data by sequence number""" + self.buffer[key] = value + # Cleanup old segments if we exceed MAX_SEGMENTS + if len(self.buffer) > Config.MAX_SEGMENTS: + keys = sorted(self.buffer.keys()) + # Keep the most recent MAX_SEGMENTS + to_remove = keys[:-Config.MAX_SEGMENTS] + for k in to_remove: + del self.buffer[k] + + def __contains__(self, key: int) -> bool: + """Check if sequence number exists in buffer""" + return key in self.buffer + + def keys(self) -> List[int]: + """Get list of available sequence numbers""" + return list(self.buffer.keys()) + + def cleanup(self, keep_sequences: List[int]): + """Remove segments not in keep list""" + for seq in list(self.buffer.keys()): + if seq not in keep_sequences: + del self.buffer[seq] + +class ClientManager: + """Manages client connections and activity tracking""" + + def __init__(self): + self.last_activity = {} # Maps client IPs to last activity timestamp + self.lock = threading.Lock() + + def record_activity(self, client_ip: str): + """Record client activity timestamp""" + with self.lock: + prev_time = self.last_activity.get(client_ip) + current_time = time.time() + self.last_activity[client_ip] = current_time + if not prev_time: + logging.info(f"New client connected: {client_ip}") + else: + logging.debug(f"Client activity: {client_ip}") + + def cleanup_inactive(self, timeout: float) -> bool: + """Remove inactive clients""" + now = time.time() + with self.lock: + active_clients = { + ip: last_time + for ip, last_time in self.last_activity.items() + if (now - last_time) < timeout + } + + removed = set(self.last_activity.keys()) - set(active_clients.keys()) + if removed: + for ip in removed: + inactive_time = now - self.last_activity[ip] + logging.warning(f"Client {ip} inactive for {inactive_time:.1f}s, removing") + + self.last_activity = active_clients + if active_clients: + oldest = min(now - t for t in active_clients.values()) + logging.debug(f"Active clients: {len(active_clients)}, oldest activity: {oldest:.1f}s ago") + + return len(active_clients) == 0 + +class StreamManager: + """ + Manages HLS stream state and switching logic. 
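+
+    Example (illustrative URLs and channel id):
+        mgr = StreamManager("http://upstream/live.m3u8", "ch1")
+        mgr.update_url("http://backup/live.m3u8")  # returns True, marks a discontinuity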
+ + Attributes: + current_url (str): Current stream URL + channel_id (str): Unique channel identifier + running (bool): Stream activity flag + next_sequence (int): Next sequence number to assign + buffered_sequences (set): Currently buffered sequence numbers + source_changes (set): Sequences where stream source changed + + Features: + - Stream URL management + - Sequence number assignment + - Discontinuity tracking + - Thread coordination + - Buffer state management + """ + def __init__(self, initial_url: str, channel_id: str, user_agent: Optional[str] = None): + # Stream state + self.current_url = initial_url + self.channel_id = channel_id + self.user_agent = user_agent or Config.DEFAULT_USER_AGENT + self.running = True + self.switching_stream = False + + # Sequence tracking + self.next_sequence = 0 + self.highest_sequence = 0 + self.buffered_sequences = set() + self.downloaded_sources = {} + self.segment_durations = {} + + # Source tracking + self.current_source = None + self.source_changes = set() + self.stream_switch_count = 0 + + # Threading + self.fetcher = None + self.fetch_thread = None + self.url_changed = threading.Event() + + # Add manifest info + self.target_duration = 10.0 # Default, will be updated from manifest + self.manifest_version = 3 # Default, will be updated from manifest + + self.cleanup_thread = None + self.cleanup_running = False # New flag to control cleanup thread + self.cleanup_enabled = False # New flag to control when cleanup starts + self.initialization_time = time.time() # Add initialization timestamp + self.first_client_connected = False + self.cleanup_started = False # New flag to track cleanup state + + # Add client manager reference + self.client_manager = None + self.proxy_server = None # Reference to proxy server for cleanup + self.cleanup_thread = None + self.cleanup_interval = Config.CLIENT_CLEANUP_INTERVAL + + logging.info(f"Initialized stream manager for channel {channel_id}") + + # Buffer state tracking + self.buffer_ready = threading.Event() + self.buffered_duration = 0.0 + self.initial_buffering = True + + def update_url(self, new_url: str) -> bool: + """ + Handle stream URL changes with proper discontinuity marking. + + Args: + new_url: New stream URL to switch to + + Returns: + bool: True if URL changed, False if unchanged + + Side effects: + - Sets switching_stream flag + - Updates current_url + - Maintains sequence numbering + - Marks discontinuity point + - Signals fetch thread + """ + if new_url != self.current_url: + with buffer_lock: + self.switching_stream = True + self.current_url = new_url + + # Continue sequence numbering from last sequence + if self.buffered_sequences: + self.next_sequence = max(self.buffered_sequences) + 1 + + # Mark discontinuity at next sequence + self.source_changes.add(self.next_sequence) + + logging.info(f"Stream switch - next sequence will start at {self.next_sequence}") + + # Clear state but maintain sequence numbers + self.downloaded_sources.clear() + self.segment_durations.clear() + self.current_source = None + + # Signal thread to switch URL + self.url_changed.set() + + return True + return False + + def get_next_sequence(self, source_id: str) -> Optional[int]: + """ + Assign sequence numbers to segments with source change detection. 
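+
+        Example (illustrative source ids):
+            manager.get_next_sequence("1038_3693")  # -> a fresh sequence number
+            manager.get_next_sequence("1038_3693")  # -> None (already downloaded)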
+
+        Args:
+            source_id: Unique identifier for segment source
+
+        Returns:
+            int: Next available sequence number
+            None: If segment already downloaded
+
+        Side effects:
+            - Updates buffered sequences set
+            - Tracks source changes for discontinuity
+            - Maintains sequence numbering
+        """
+        if source_id in self.downloaded_sources:
+            return None
+
+        seq = self.next_sequence
+        while seq in self.buffered_sequences:
+            seq += 1
+
+        # Track source changes for discontinuity markers
+        source_prefix = source_id.split('_')[0]
+        if not self.switching_stream and self.current_source and self.current_source != source_prefix:
+            self.source_changes.add(seq)
+            logging.debug(f"Source change detected at sequence {seq}")
+        self.current_source = source_prefix
+
+        # Update tracking
+        self.downloaded_sources[source_id] = seq
+        self.buffered_sequences.add(seq)
+        self.next_sequence = seq + 1
+        self.highest_sequence = max(self.highest_sequence, seq)
+
+        return seq
+
+    def _fetch_loop(self):
+        """Background thread for continuous stream fetching.
+
+        Uses the fetcher attached by ProxyServer.initialize_channel
+        (self.fetcher), which owns the per-channel buffer.
+        """
+        while self.running:
+            try:
+                if self.fetcher is None:
+                    raise RuntimeError("No fetcher attached to stream manager")
+                fetch_stream(self.fetcher, self.url_changed, self.next_sequence)
+            except Exception as e:
+                logging.error(f"Stream error: {e}")
+                time.sleep(5)  # Wait before retry
+
+            self.url_changed.clear()
+
+    def start(self):
+        """Start the background fetch thread"""
+        if not self.fetch_thread or not self.fetch_thread.is_alive():
+            self.running = True
+            self.fetch_thread = threading.Thread(
+                target=self._fetch_loop,
+                name="StreamFetcher",
+                daemon=True  # Thread will exit when main program does
+            )
+            self.fetch_thread.start()
+            logging.info("Stream manager started")
+
+    def stop(self):
+        """Stop the stream manager and cleanup resources"""
+        self.running = False
+        self.cleanup_running = False
+        if self.fetch_thread and self.fetch_thread.is_alive():
+            self.url_changed.set()
+            self.fetch_thread.join(timeout=5)
+        logging.info(f"Stream manager stopped for channel {self.channel_id}")
+
+    def enable_cleanup(self):
+        """Enable cleanup after first client connects"""
+        if not self.first_client_connected:
+            self.first_client_connected = True
+            logging.info(f"First client connected to channel {self.channel_id}")
+
+    def start_cleanup_thread(self):
+        """Start background thread for client activity monitoring"""
+        def cleanup_loop():
+            # Wait for initial connection window
+            start_time = time.time()
+            while self.cleanup_running and (time.time() - start_time) < Config.INITIAL_CONNECTION_WINDOW:
+                if self.first_client_connected:
+                    break
+                time.sleep(1)
+
+            if not self.first_client_connected:
+                logging.info(f"Channel {self.channel_id}: No clients connected within {Config.INITIAL_CONNECTION_WINDOW}s window")
+                self.proxy_server.stop_channel(self.channel_id)
+                return
+
+            # Normal client activity monitoring
+            while self.cleanup_running and self.running:
+                try:
+                    timeout = self.target_duration * Config.CLIENT_TIMEOUT_FACTOR
+                    if self.client_manager.cleanup_inactive(timeout):
+                        logging.info(f"Channel {self.channel_id}: All clients disconnected for {timeout:.1f}s")
+                        self.proxy_server.stop_channel(self.channel_id)
+                        break
+                except Exception as e:
+                    logging.error(f"Cleanup error: {e}")
+                # Single pause per pass between cleanup checks
+                time.sleep(Config.CLIENT_CLEANUP_INTERVAL)
+
+        if not self.cleanup_started:
+            self.cleanup_started = True
+            self.cleanup_running = True
+            self.cleanup_thread = threading.Thread(
+                target=cleanup_loop,
+                name=f"Cleanup-{self.channel_id}",
+                daemon=True
+            )
+            
self.cleanup_thread.start() + logging.info(f"Started cleanup thread for channel {self.channel_id}") + +class StreamFetcher: + """ + Handles HTTP requests for stream segments with connection pooling. + + Attributes: + manager (StreamManager): Associated stream manager instance + buffer (StreamBuffer): Buffer for storing segments + session (requests.Session): Persistent HTTP session + redirect_cache (dict): Cache for redirect responses + + Features: + - Connection pooling and reuse + - Redirect caching + - Rate limiting + - Automatic retries + - Host fallback + """ + def __init__(self, manager: StreamManager, buffer: StreamBuffer): + self.manager = manager + self.buffer = buffer + self.stream_url = manager.current_url + self.session = requests.Session() + + # Configure session headers + self.session.headers.update({ + 'User-Agent': manager.user_agent, + 'Connection': 'keep-alive' + }) + + # Set up connection pooling + adapter = requests.adapters.HTTPAdapter( + pool_connections=2, # Number of connection pools + pool_maxsize=4, # Connections per pool + max_retries=3, # Auto-retry failed requests + pool_block=False # Don't block when pool is full + ) + + # Apply adapter to both HTTP and HTTPS + self.session.mount('http://', adapter) + self.session.mount('https://', adapter) + + # Request optimization + self.last_request_time = 0 + self.min_request_interval = 0.05 # Minimum time between requests + self.last_host = None # Cache last successful host + self.redirect_cache = {} # Cache redirect responses + self.redirect_cache_limit = 1000 + + def cleanup_redirect_cache(self): + """Remove old redirect cache entries""" + if len(self.redirect_cache) > self.redirect_cache_limit: + self.redirect_cache.clear() + + def get_base_host(self, url: str) -> str: + """ + Extract base host from URL. + + Args: + url: Full URL to parse + + Returns: + str: Base host in format 'scheme://hostname' + + Example: + 'http://example.com/path' -> 'http://example.com' + """ + try: + parsed = urlparse(url) + return f"{parsed.scheme}://{parsed.netloc}" + except Exception as e: + logging.error(f"Error extracting base host: {e}") + return url + + def download(self, url: str) -> tuple[bytes, str]: + """ + Download content with connection reuse and redirect handling. 
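+
+        Example (illustrative URL):
+            data, final_url = fetcher.download("http://host/seg_100.ts")
+            # if the request was redirected, the redirect target is cached and
+            # reused for later calls with the same original URL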
+ + Args: + url: URL to download from + + Returns: + tuple containing: + bytes: Downloaded content + str: Final URL after any redirects + + Features: + - Connection pooling/reuse + - Redirect caching + - Rate limiting + - Host fallback on failure + - Automatic retries + """ + now = time.time() + wait_time = self.last_request_time + self.min_request_interval - now + if (wait_time > 0): + time.sleep(wait_time) + + try: + # Use cached redirect if available + if url in self.redirect_cache: + logging.debug(f"Using cached redirect for {url}") + final_url = self.redirect_cache[url] + response = self.session.get(final_url, timeout=10) + else: + response = self.session.get(url, allow_redirects=True, timeout=10) + if response.history: # Cache redirects + logging.debug(f"Caching redirect for {url} -> {response.url}") + self.redirect_cache[url] = response.url + + self.last_request_time = time.time() + + if response.status_code == 200: + self.last_host = self.get_base_host(response.url) + + return response.content, response.url + + except Exception as e: + logging.error(f"Download error: {e}") + if self.last_host and not url.startswith(self.last_host): + # Use urljoin to handle path resolution + new_url = urljoin(self.last_host + '/', url.split('://')[-1].split('/', 1)[-1]) + logging.debug(f"Retrying with last host: {new_url}") + return self.download(new_url) + raise + + def fetch_loop(self): + """Main fetch loop for stream data""" + retry_delay = 1 + max_retry_delay = 8 + last_manifest_time = 0 + downloaded_segments = set() # Track downloaded segment URIs + + while self.manager.running: + try: + current_time = time.time() + + # Check manifest update timing + if last_manifest_time: + time_since_last = current_time - last_manifest_time + if time_since_last < (self.manager.target_duration * 0.5): + time.sleep(self.manager.target_duration * 0.5 - time_since_last) + continue + + # Get manifest data + manifest_data, final_url = self.download(self.manager.current_url) + manifest = m3u8.loads(manifest_data.decode()) + + # Update manifest info + if manifest.target_duration: + self.manager.target_duration = float(manifest.target_duration) + if manifest.version: + self.manager.manifest_version = manifest.version + + if not manifest.segments: + continue + + if self.manager.initial_buffering: + segments_to_fetch = [] + current_duration = 0.0 + successful_downloads = 0 # Initialize counter here + + # Start from the end of the manifest + for segment in reversed(manifest.segments): + current_duration += float(segment.duration) + segments_to_fetch.append(segment) + + # Stop when we have enough duration or hit max segments + if (current_duration >= Config.INITIAL_BUFFER_SECONDS or + len(segments_to_fetch) >= Config.MAX_INITIAL_SEGMENTS): + break + + # Reverse back to chronological order + segments_to_fetch.reverse() + + # Download initial segments + for segment in segments_to_fetch: + try: + segment_url = urljoin(final_url, segment.uri) + segment_data, _ = self.download(segment_url) + + validation = verify_segment(segment_data) + if validation.get('valid', False): + with self.buffer.lock: + seq = self.manager.next_sequence + self.buffer[seq] = segment_data + duration = float(segment.duration) + self.manager.segment_durations[seq] = duration + self.manager.buffered_duration += duration + self.manager.next_sequence += 1 + successful_downloads += 1 + logging.debug(f"Buffered initial segment {seq} (source: {segment.uri}, duration: {duration}s)") + except Exception as e: + logging.error(f"Initial segment download error: 
{e}") + + # Only mark buffer ready if we got some segments + if successful_downloads > 0: + self.manager.initial_buffering = False + self.manager.buffer_ready.set() + logging.info(f"Initial buffer ready with {successful_downloads} segments " + f"({self.manager.buffered_duration:.1f}s of content)") + continue + + # Normal operation - get latest segment if we haven't already + latest_segment = manifest.segments[-1] + if latest_segment.uri in downloaded_segments: + # Wait for next manifest update + time.sleep(self.manager.target_duration * 0.5) + continue + + try: + segment_url = urljoin(final_url, latest_segment.uri) + segment_data, _ = self.download(segment_url) + + # Try several times if segment validation fails + max_retries = 3 + retry_count = 0 + while retry_count < max_retries: + verification = verify_segment(segment_data) + if verification.get('valid', False): + break + logging.warning(f"Invalid segment, retry {retry_count + 1}/{max_retries}: {verification.get('error')}") + time.sleep(0.5) # Short delay before retry + segment_data, _ = self.download(segment_url) + retry_count += 1 + + if verification.get('valid', False): + with self.buffer.lock: + seq = self.manager.next_sequence + self.buffer[seq] = segment_data + self.manager.segment_durations[seq] = float(latest_segment.duration) + self.manager.next_sequence += 1 + downloaded_segments.add(latest_segment.uri) + logging.debug(f"Stored segment {seq} (source: {latest_segment.uri}, " + f"duration: {latest_segment.duration}s, " + f"size: {len(segment_data)})") + + # Update timing + last_manifest_time = time.time() + retry_delay = 1 # Reset retry delay on success + else: + logging.error(f"Segment validation failed after {max_retries} retries") + + except Exception as e: + logging.error(f"Segment download error: {e}") + continue + + # Cleanup old segment URIs from tracking + if len(downloaded_segments) > 100: + downloaded_segments.clear() + + except Exception as e: + logging.error(f"Fetch error: {e}") + time.sleep(retry_delay) + retry_delay = min(retry_delay * 2, max_retry_delay) + +def get_segment_sequence(segment_uri: str) -> Optional[int]: + """ + Extract sequence number from segment URI pattern. + + Args: + segment_uri: Segment filename or path + + Returns: + int: Extracted sequence number if found + None: If no valid sequence number can be extracted + + Handles common patterns like: + - Numerical sequences (e.g., segment_1234.ts) + - Complex patterns with stream IDs (e.g., stream_123_456.ts) + """ + + try: + # Try numerical sequence (e.g., 1038_3693.ts) + if '_' in segment_uri: + return int(segment_uri.split('_')[-1].split('.')[0]) + return None + except ValueError: + return None + +# Update verify_segment with more thorough checks +def verify_segment(data: bytes) -> dict: + """ + Verify MPEG-TS segment integrity and structure. 
+ + Args: + data: Raw segment data bytes + + Returns: + dict containing: + valid (bool): True if segment passes all checks + packets (int): Number of valid packets found + size (int): Total segment size in bytes + error (str): Description if validation fails + + Checks: + - Minimum size requirements + - Packet size alignment + - Sync byte presence + - Transport error indicators + """ + + # Check minimum size + if len(data) < 188: + return {'valid': False, 'error': 'Segment too short'} + + # Verify segment size is multiple of packet size + if len(data) % 188 != 0: + return {'valid': False, 'error': 'Invalid segment size'} + + valid_packets = 0 + total_packets = len(data) // 188 + + # Scan all packets in segment + for i in range(0, len(data), 188): + packet = data[i:i+188] + + # Check packet completeness + if len(packet) != 188: + return {'valid': False, 'error': 'Incomplete packet'} + + # Verify sync byte + if packet[0] != 0x47: + return {'valid': False, 'error': f'Invalid sync byte at offset {i}'} + + # Check transport error indicator + if packet[1] & 0x80: + return {'valid': False, 'error': 'Transport error indicator set'} + + valid_packets += 1 + + return { + 'valid': True, + 'packets': valid_packets, + 'size': len(data) + } + +def fetch_stream(fetcher: StreamFetcher, stop_event: threading.Event, start_sequence: int = 0): + """ + Main streaming function that handles manifest updates and segment downloads. + + Args: + fetcher: StreamFetcher instance to handle HTTP requests + stop_event: Threading event to signal when to stop fetching + start_sequence: Initial sequence number to start from + + The function implements the core HLS fetching logic: + - Fetches and parses manifest files + - Downloads new segments when they become available + - Handles stream switches with proper discontinuity marking + - Maintains buffer state and segment sequence numbering + """ + # Remove global stream_manager reference + retry_delay = 1 + max_retry_delay = 8 + last_segment_time = 0 + buffer_initialized = False + manifest_update_needed = True + segment_duration = None + + while not stop_event.is_set(): + try: + now = time.time() + + # Only update manifest when it's time for next segment + should_update = ( + manifest_update_needed or + not segment_duration or + (last_segment_time and (now - last_segment_time) >= segment_duration * 0.8) + ) + + if should_update: + manifest_data, final_url = fetcher.download(fetcher.stream_url) + manifest = m3u8.loads(manifest_data.decode()) + + if not manifest.segments: + continue + + with buffer_lock: + manifest_content = manifest_data.decode() + new_segments = {} + + if fetcher.manager.switching_stream: # Use fetcher.manager instead of stream_manager + # Stream switch - only get latest segment + manifest_segments = [manifest.segments[-1]] + seq_start = fetcher.manager.next_sequence + max_segments = 1 + logging.debug(f"Processing stream switch - getting latest segment at sequence {seq_start}") + elif not buffer_initialized: + # Initial buffer + manifest_segments = manifest.segments[-Config.INITIAL_SEGMENTS:] + seq_start = fetcher.manager.next_sequence + max_segments = Config.INITIAL_SEGMENTS + logging.debug(f"Starting initial buffer at sequence {seq_start}") + else: + # Normal operation + manifest_segments = [manifest.segments[-1]] + seq_start = fetcher.manager.next_sequence + max_segments = 1 + + # Map segments + segments_mapped = 0 + for segment in manifest_segments: + if segments_mapped >= max_segments: + break + + source_id = segment.uri.split('/')[-1].split('.')[0] + 
                        next_seq = fetcher.manager.get_next_sequence(source_id)
+
+                        if next_seq is not None:
+                            duration = float(segment.duration)
+                            new_segments[next_seq] = {
+                                'uri': segment.uri,
+                                'duration': duration,
+                                'source_id': source_id
+                            }
+                            fetcher.manager.segment_durations[next_seq] = duration
+                            segments_mapped += 1
+
+                    # Publish the manifest text to the module-level buffer
+                    global manifest_buffer
+                    manifest_buffer = manifest_content
+
+                # Download segments
+                for sequence_id, segment_info in new_segments.items():
+                    try:
+                        # Resolve the (possibly relative) URI against the final
+                        # manifest URL rather than blindly prefixing the last host
+                        segment_url = urljoin(final_url, segment_info['uri'])
+                        logging.debug(f"Downloading {segment_info['uri']} as segment {sequence_id}.ts "
+                                      f"(source: {segment_info['source_id']}, duration: {segment_info['duration']:.3f}s)")
+
+                        segment_data, _ = fetcher.download(segment_url)
+                        validation = verify_segment(segment_data)
+
+                        if validation.get('valid', False):
+                            with buffer_lock:
+                                segment_buffers[sequence_id] = segment_data
+                            logging.debug(f"Downloaded and verified segment {sequence_id} (packets: {validation['packets']})")
+
+                            if fetcher.manager.switching_stream:
+                                fetcher.manager.switching_stream = False
+                                stop_event.set()  # Force fetcher restart with new URL
+                                break
+                            elif not buffer_initialized and len(segment_buffers) >= Config.INITIAL_SEGMENTS:
+                                buffer_initialized = True
+                                manifest_update_needed = True
+                                break
+                    except Exception as e:
+                        logging.error(f"Segment download error: {e}")
+                        continue
+
+            else:
+                # Short sleep to prevent CPU spinning
+                threading.Event().wait(0.1)
+
+        except Exception as e:
+            logging.error(f"Manifest error: {e}")
+            threading.Event().wait(retry_delay)
+            retry_delay = min(retry_delay * 2, max_retry_delay)
+            manifest_update_needed = True
+
+
+@app.before_request
+def log_request_info():
+    """
+    Log client connections and important requests.
+
+    Logs:
+        INFO:
+            - First manifest request from new client
+            - Stream switch requests
+        DEBUG:
+            - All other requests
+
+    Format:
+        {client_ip} - {method} {path}
+    """
+    # Manifest requests are /stream/<channel_id>; segment requests contain
+    # /segments/ in the path
+    if request.path.startswith('/stream/') and '/segments/' not in request.path and not segment_buffers:
+        # First manifest request from a client
+        logging.info(f"New client connected from {request.remote_addr}")
+    elif request.path.startswith('/change_stream'):
+        # Keep stream switch requests as INFO
+        logging.info(f"Stream switch requested from {request.remote_addr}")
+    else:
+        # Move routine requests to DEBUG
+        logging.debug(f"{request.remote_addr} - {request.method} {request.path}")
+
+# Configure Werkzeug logger to DEBUG
+logging.getLogger('werkzeug').setLevel(logging.DEBUG)
+
+class ProxyServer:
+    """Manages HLS proxy server instance"""
+
+    def __init__(self, user_agent: Optional[str] = None):
+        self.app = Flask(__name__)
+        self.stream_managers: Dict[str, StreamManager] = {}
+        self.stream_buffers: Dict[str, StreamBuffer] = {}
+        self.client_managers: Dict[str, ClientManager] = {}
+        self.fetch_threads: Dict[str, threading.Thread] = {}
+        self.user_agent: str = user_agent or Config.DEFAULT_USER_AGENT
+        self._setup_routes()
+
+    def _setup_routes(self) -> None:
+        """Configure Flask routes"""
+        self.app.add_url_rule(
+            '/stream/<channel_id>',  # Changed from /<channel_id>/stream.m3u8
+            view_func=self.stream_endpoint
+        )
+        self.app.add_url_rule(
+            '/stream/<channel_id>/segments/<segment_name>',  # Updated to match new pattern
+            view_func=self.get_segment
+        )
+        self.app.add_url_rule(
+            '/change_stream/<channel_id>',  # Changed from /<channel_id>/change_stream
+            view_func=self.change_stream,
+            methods=['POST']
+        )
+
+    def initialize_channel(self, url: str, channel_id: str) -> None:
+        """Initialize a new channel stream"""
+        if channel_id in self.stream_managers:
+            self.stop_channel(channel_id)
+
+        manager = StreamManager(
+            url,
+            channel_id,
+            
user_agent=self.user_agent + ) + buffer = StreamBuffer() + client_manager = ClientManager() + + # Set up references + manager.client_manager = client_manager + manager.proxy_server = self + + # Store resources + self.stream_managers[channel_id] = manager + self.stream_buffers[channel_id] = buffer + self.client_managers[channel_id] = client_manager + + # Create and store fetcher + fetcher = StreamFetcher(manager, buffer) + manager.fetcher = fetcher + + # Start fetch thread + self.fetch_threads[channel_id] = threading.Thread( + target=fetcher.fetch_loop, + name=f"StreamFetcher-{channel_id}", + daemon=True + ) + self.fetch_threads[channel_id].start() + + # Start cleanup monitoring immediately + manager.start_cleanup_thread() + + logging.info(f"Initialized channel {channel_id} with URL {url}") + + def stop_channel(self, channel_id: str) -> None: + """Stop and cleanup a channel""" + if channel_id in self.stream_managers: + self.stream_managers[channel_id].stop() + if channel_id in self.fetch_threads: + self.fetch_threads[channel_id].join(timeout=5) + self._cleanup_channel(channel_id) + + def _cleanup_channel(self, channel_id: str) -> None: + """ + Remove all resources associated with a channel. + + Args: + channel_id: Channel to cleanup + + Removes: + - Stream manager instance + - Segment buffer + - Client manager + - Fetch thread reference + + Thread safety: + Should only be called after stream manager is stopped + and fetch thread has completed + """ + + for collection in [self.stream_managers, self.stream_buffers, + self.client_managers, self.fetch_threads]: + collection.pop(channel_id, None) + + def run(self, host: str = '0.0.0.0', port: int = 5000) -> None: + """Start the proxy server""" + try: + self.app.run(host=host, port=port, threaded=True) + except KeyboardInterrupt: + logging.info("Shutting down gracefully...") + self.shutdown() + except Exception as e: + logging.error(f"Server error: {e}") + self.shutdown() + raise + + def shutdown(self) -> None: + """ + Stop all channels and cleanup resources. + + Steps: + 1. Stop all active stream managers + 2. Join fetch threads + 3. Clean up channel resources + 4. 
Release system resources + + Thread Safety: + Safe to call from signal handlers or during shutdown + """ + for channel_id in list(self.stream_managers.keys()): + self.stop_channel(channel_id) + + def stream_endpoint(self, channel_id: str): + """Flask route handler for serving HLS manifests.""" + if channel_id not in self.stream_managers: + return Response('Channel not found', status=404) + + manager = self.stream_managers[channel_id] + + # Wait for initial buffer + if not manager.buffer_ready.wait(Config.BUFFER_READY_TIMEOUT): + logging.error(f"Timeout waiting for initial buffer for channel {channel_id}") + return Response('Initial buffer not ready', status=503) + + try: + if (channel_id not in self.stream_managers) or (not self.stream_managers[channel_id].running): + return Response('Channel not found', status=404) + + manager = self.stream_managers[channel_id] + buffer = self.stream_buffers[channel_id] + + # Record client activity and enable cleanup + client_ip = request.remote_addr + manager.enable_cleanup() + self.client_managers[channel_id].record_activity(client_ip) + + # Wait for first segment with timeout + start_time = time.time() + while True: + with buffer.lock: + available = sorted(buffer.keys()) + if available: + break + + if time.time() - start_time > Config.FIRST_SEGMENT_TIMEOUT: + logging.warning(f"Timeout waiting for first segment for channel {channel_id}") + return Response('No segments available', status=503) + + time.sleep(0.1) # Short sleep to prevent CPU spinning + + # Rest of manifest generation code... + with buffer.lock: + max_seq = max(available) + # Find the first segment after any discontinuity + discontinuity_start = min(available) + for seq in available: + if seq in manager.source_changes: + discontinuity_start = seq + break + + # Calculate window bounds starting from discontinuity + if len(available) <= Config.INITIAL_SEGMENTS: + min_seq = discontinuity_start + else: + min_seq = max( + discontinuity_start, + max_seq - Config.WINDOW_SIZE + 1 + ) + + # Build manifest with proper tags + new_manifest = ['#EXTM3U'] + new_manifest.append(f'#EXT-X-VERSION:{manager.manifest_version}') + new_manifest.append(f'#EXT-X-MEDIA-SEQUENCE:{min_seq}') + new_manifest.append(f'#EXT-X-TARGETDURATION:{int(manager.target_duration)}') + + # Filter segments within window + window_segments = [s for s in available if min_seq <= s <= max_seq] + + # Add segments with discontinuity handling + for seq in window_segments: + if seq in manager.source_changes: + new_manifest.append('#EXT-X-DISCONTINUITY') + logging.debug(f"Added discontinuity marker before segment {seq}") + + duration = manager.segment_durations.get(seq, 10.0) + new_manifest.append(f'#EXTINF:{duration},') + new_manifest.append(f'/stream/{channel_id}/segments/{seq}.ts') + + manifest_content = '\n'.join(new_manifest) + logging.debug(f"Serving manifest with segments {min_seq}-{max_seq} (window: {len(window_segments)})") + return Response(manifest_content, content_type='application/vnd.apple.mpegurl') + except ConnectionAbortedError: + logging.debug("Client disconnected") + return '', 499 + except Exception as e: + logging.error(f"Stream endpoint error: {e}") + return '', 500 + + def get_segment(self, channel_id: str, segment_name: str): + """ + Serve individual MPEG-TS segments to clients. 
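+
+        Example request (illustrative channel id and sequence number):
+            GET /stream/ch1/segments/123.ts  ->  200 with video/MP2T body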
+
+        Args:
+            channel_id: Unique identifier for the channel
+            segment_name: Segment filename (e.g., '123.ts')
+
+        Returns:
+            Flask Response:
+                - MPEG-TS segment data with video/MP2T content type
+                - 404 if segment or channel not found
+
+        Error Handling:
+            - Logs warning if segment not found
+            - Logs error on unexpected exceptions
+            - Returns 404 on any error
+        """
+        if channel_id not in self.stream_managers:
+            return Response('Channel not found', status=404)
+
+        try:
+            # Record client activity
+            client_ip = request.remote_addr
+            self.client_managers[channel_id].record_activity(client_ip)
+
+            segment_id = int(segment_name.split('.')[0])
+            buffer = self.stream_buffers[channel_id]
+
+            # Lock the per-channel buffer (the same lock the fetch thread
+            # holds while writing), not the module-level buffer_lock
+            with buffer.lock:
+                if segment_id in buffer:
+                    return Response(buffer[segment_id], content_type='video/MP2T')
+
+            logging.warning(f"Segment {segment_id} not found for channel {channel_id}")
+        except Exception as e:
+            logging.error(f"Error serving segment {segment_name}: {e}")
+        return '', 404
+
+    def change_stream(self, channel_id: str):
+        """
+        Handle stream URL changes via POST request.
+
+        Args:
+            channel_id: Channel to modify
+
+        Expected JSON body:
+            {
+                "url": "new_stream_url"
+            }
+
+        Returns:
+            JSON response with:
+                - Success/failure message
+                - Channel ID
+                - New/current URL
+                - HTTP 404 if channel not found
+                - HTTP 400 if URL missing from request
+
+        Side effects:
+            - Updates stream manager URL
+            - Triggers stream switch sequence
+            - Maintains segment numbering
+        """
+        if channel_id not in self.stream_managers:
+            return jsonify({'error': 'Channel not found'}), 404
+
+        new_url = request.json.get('url')
+        if not new_url:
+            return jsonify({'error': 'No URL provided'}), 400
+
+        manager = self.stream_managers[channel_id]
+        if manager.update_url(new_url):
+            return jsonify({
+                'message': 'Stream URL updated',
+                'channel': channel_id,
+                'url': new_url
+            })
+        return jsonify({
+            'message': 'URL unchanged',
+            'channel': channel_id,
+            'url': new_url
+        })
+
+    @app.before_request
+    def log_request_info():
+        """
+        Log client connections and important requests.
+ + Log Levels: + INFO: + - First manifest request from new client + - Stream switch requests + DEBUG: + - Segment requests + - Routine manifest updates + + Format: + "{client_ip} - {method} {path}" + + Side Effects: + - Updates logging configuration based on request type + - Tracks client connections + """ + +# Main Application Setup +if __name__ == '__main__': + # Command line argument parsing + parser = argparse.ArgumentParser(description='HLS Proxy Server with Stream Switching') + parser.add_argument( + '--url', '-u', + required=True, + help='Initial HLS stream URL to proxy' + ) + parser.add_argument( + '--channel', '-c', + required=True, + help='Channel ID for the stream (default: default)' + ) + parser.add_argument( + '--port', '-p', + type=int, + default=5000, + help='Local port to serve proxy on (default: 5000)' + ) + parser.add_argument( + '--host', '-H', + default='0.0.0.0', + help='Interface to bind server to (default: all interfaces)' + ) + parser.add_argument( + '--user-agent', '-ua', + help='Custom User-Agent string' + ) + parser.add_argument( + '--debug', + action='store_true', + help='Enable debug logging' + ) + args = parser.parse_args() + + # Configure logging + logging.basicConfig( + level=logging.DEBUG if args.debug else logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + try: + # Initialize proxy server + proxy = ProxyServer(user_agent=args.user_agent) + + # Initialize channel with provided URL + proxy.initialize_channel(args.url, args.channel) + + logging.info(f"Starting HLS proxy server on {args.host}:{args.port}") + logging.info(f"Initial stream URL: {args.url}") + logging.info(f"Channel ID: {args.channel}") + + # Run Flask development server + proxy.run(host=args.host, port=args.port) + + except Exception as e: + logging.error(f"Failed to start server: {e}") + sys.exit(1) + finally: + if 'proxy' in locals(): + proxy.shutdown() diff --git a/apps/proxy/ts_proxy b/apps/proxy/ts_proxy new file mode 100644 index 00000000..ec6d62d7 --- /dev/null +++ b/apps/proxy/ts_proxy @@ -0,0 +1,323 @@ +""" +Transport Stream (TS) Proxy Server +Handles live TS stream proxying with support for: +- Stream switching +- Buffer management +- Multiple client connections +- Connection state tracking +""" + +from flask import Flask, Response, request, jsonify +import requests +import threading +import logging +from collections import deque +import time +import os +from typing import Optional, Set, Deque, Dict + +# Configuration +class Config: + CHUNK_SIZE: int = 8192 # Buffer chunk size (bytes) + BUFFER_SIZE: int = 1000 # Number of chunks to keep in memory + RECONNECT_DELAY: int = 5 # Seconds between reconnection attempts + CLIENT_POLL_INTERVAL: float = 0.1 # Seconds between client buffer checks + MAX_RETRIES: int = 3 # Maximum connection retry attempts + DEFAULT_USER_AGENT: str = 'VLC/3.0.20 LibVLC/3.0.20' # Default user agent + +class StreamManager: + """Manages TS stream state and connection handling""" + + def __init__(self, initial_url: str, channel_id: str, user_agent: Optional[str] = None): + self.current_url: str = initial_url + self.channel_id: str = channel_id + self.user_agent: str = user_agent or Config.DEFAULT_USER_AGENT + self.url_changed: threading.Event = threading.Event() + self.running: bool = True + self.session: requests.Session = self._create_session() + self.connected: bool = False + self.retry_count: int = 0 + logging.info(f"Initialized stream manager for channel {channel_id}") + + def _create_session(self) -> 
requests.Session:
+        """Create and configure requests session"""
+        session = requests.Session()
+        session.headers.update({
+            'User-Agent': self.user_agent,
+            'Connection': 'keep-alive'
+        })
+        return session
+
+    def update_url(self, new_url: str) -> bool:
+        """Update stream URL and signal connection change"""
+        if new_url != self.current_url:
+            logging.info(f"Stream switch initiated: {self.current_url} -> {new_url}")
+            self.current_url = new_url
+            self.connected = False
+            self.url_changed.set()
+            return True
+        return False
+
+    def should_retry(self) -> bool:
+        """Check if connection retry is allowed"""
+        return self.retry_count < Config.MAX_RETRIES
+
+    def stop(self) -> None:
+        """Clean shutdown of stream manager"""
+        self.running = False
+        if self.session:
+            self.session.close()
+
+class StreamBuffer:
+    """Manages stream data buffering"""
+
+    def __init__(self):
+        self.buffer: Deque[bytes] = deque(maxlen=Config.BUFFER_SIZE)
+        self.lock: threading.Lock = threading.Lock()
+        self.index: int = 0
+
+class ClientManager:
+    """Manages active client connections"""
+
+    def __init__(self):
+        self.active_clients: Set[int] = set()
+        self.lock: threading.Lock = threading.Lock()
+
+    def add_client(self, client_id: int) -> None:
+        """Add new client connection"""
+        with self.lock:
+            self.active_clients.add(client_id)
+            logging.info(f"New client connected: {client_id} (total: {len(self.active_clients)})")
+
+    def remove_client(self, client_id: int) -> int:
+        """Remove client and return remaining count"""
+        with self.lock:
+            self.active_clients.discard(client_id)  # tolerate repeated removal
+            remaining = len(self.active_clients)
+            logging.info(f"Client disconnected: {client_id} (remaining: {remaining})")
+            return remaining
+
+class StreamFetcher:
+    """Handles stream data fetching"""
+
+    def __init__(self, manager: StreamManager, buffer: StreamBuffer):
+        self.manager = manager
+        self.buffer = buffer
+
+    def fetch_loop(self) -> None:
+        """Main fetch loop for stream data"""
+        while self.manager.running:
+            try:
+                if not self._handle_connection():
+                    break  # retries exhausted or shutting down
+
+                # Timeout so a dead upstream cannot hang the fetch thread
+                with self.manager.session.get(self.manager.current_url,
+                                              stream=True,
+                                              timeout=10) as response:
+                    if response.status_code == 200:
+                        self._handle_successful_connection()
+                        self._process_stream(response)
+
+            except requests.exceptions.RequestException as e:
+                self._handle_connection_error(e)
+
+    def _handle_connection(self) -> bool:
+        """Handle connection state and retries"""
+        if not self.manager.connected:
+            if not self.manager.should_retry():
+                logging.error(f"Failed to connect after {Config.MAX_RETRIES} attempts")
+                return False
+
+            if not self.manager.running:
+                return False
+
+            self.manager.retry_count += 1
+            logging.info(f"Connecting to stream: {self.manager.current_url} "
+                         f"(attempt {self.manager.retry_count}/{Config.MAX_RETRIES})")
+        return True
+
+    def _handle_successful_connection(self) -> None:
+        """Handle successful stream connection"""
+        if not self.manager.connected:
+            logging.info("Stream connected successfully")
+            self.manager.connected = True
+            self.manager.retry_count = 0
+
+    def _process_stream(self, response: requests.Response) -> None:
+        """Process incoming stream data"""
+        for chunk in response.iter_content(chunk_size=Config.CHUNK_SIZE):
+            if not self.manager.running:
+                logging.info("Stream fetch stopped - shutting down")
+                return
+
+            # Check for a pending switch before buffering so a slow or idle
+            # upstream cannot delay the changeover
+            if self.manager.url_changed.is_set():
+                logging.info("Stream switch in progress, closing connection")
+                self.manager.url_changed.clear()
+                break
+
+            if chunk:
+                with self.buffer.lock:
+                    self.buffer.buffer.append(chunk)
+                    self.buffer.index += 1
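+
+    # Note: buffer.index counts every chunk ever appended, while the deque
+    # keeps only the newest Config.BUFFER_SIZE chunks; stream_endpoint()
+    # below uses the index delta to work out how far behind a client is.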
+
+    def _handle_connection_error(self, error: Exception) -> None:
+        """Handle stream connection errors"""
+        logging.error(f"Stream connection error: {error}")
+        self.manager.connected = False
+
+        if not self.manager.running:
+            return
+
+        logging.info(f"Attempting to reconnect in {Config.RECONNECT_DELAY} seconds...")
+        if not wait_for_running(self.manager, Config.RECONNECT_DELAY):
+            return
+
+def wait_for_running(manager: StreamManager, delay: float) -> bool:
+    """Wait while checking manager running state"""
+    start = time.time()
+    while time.time() - start < delay:
+        if not manager.running:
+            return False
+        threading.Event().wait(0.1)
+    return True
+
+class ProxyServer:
+    """Manages TS proxy server instance"""
+
+    def __init__(self, user_agent: Optional[str] = None):
+        self.app = Flask(__name__)
+        self.stream_managers: Dict[str, StreamManager] = {}
+        self.stream_buffers: Dict[str, StreamBuffer] = {}
+        self.client_managers: Dict[str, ClientManager] = {}
+        self.fetch_threads: Dict[str, threading.Thread] = {}
+        self.user_agent: str = user_agent or Config.DEFAULT_USER_AGENT
+        self._setup_routes()
+
+    def _setup_routes(self) -> None:
+        """Configure Flask routes"""
+        self.app.route('/stream/<channel_id>')(self.stream_endpoint)
+        self.app.route('/change_stream/<channel_id>', methods=['POST'])(self.change_stream)
+
+    def initialize_channel(self, url: str, channel_id: str) -> None:
+        """Initialize a new channel stream"""
+        if channel_id in self.stream_managers:
+            self.stop_channel(channel_id)
+
+        self.stream_managers[channel_id] = StreamManager(
+            url,
+            channel_id,
+            user_agent=self.user_agent
+        )
+        self.stream_buffers[channel_id] = StreamBuffer()
+        self.client_managers[channel_id] = ClientManager()
+
+        fetcher = StreamFetcher(
+            self.stream_managers[channel_id],
+            self.stream_buffers[channel_id]
+        )
+
+        self.fetch_threads[channel_id] = threading.Thread(
+            target=fetcher.fetch_loop,
+            name=f"StreamFetcher-{channel_id}",
+            daemon=True
+        )
+        self.fetch_threads[channel_id].start()
+        logging.info(f"Initialized channel {channel_id} with URL {url}")
+
+    def stop_channel(self, channel_id: str) -> None:
+        """Stop and cleanup a channel"""
+        if channel_id in self.stream_managers:
+            self.stream_managers[channel_id].stop()
+            if channel_id in self.fetch_threads:
+                self.fetch_threads[channel_id].join(timeout=5)
+            self._cleanup_channel(channel_id)
+
+    def _cleanup_channel(self, channel_id: str) -> None:
+        """Remove channel resources"""
+        for collection in [self.stream_managers, self.stream_buffers,
+                           self.client_managers, self.fetch_threads]:
+            collection.pop(channel_id, None)
+
+    def stream_endpoint(self, channel_id: str):
+        """Stream endpoint that serves TS data to clients"""
+        if channel_id not in self.stream_managers:
+            return Response('Channel not found', status=404)
+
+        def generate():
+            client_id = threading.get_ident()
+            buffer = self.stream_buffers[channel_id]
+            client_manager = self.client_managers[channel_id]
+
+            client_manager.add_client(client_id)
+            last_index = buffer.index
+
+            try:
+                while True:
+                    chunks = []
+                    with buffer.lock:
+                        if buffer.index > last_index:
+                            chunks_behind = buffer.index - last_index
+                            start_pos = max(0, len(buffer.buffer) - chunks_behind)
+                            chunks = list(buffer.buffer)[start_pos:]
+                            last_index = buffer.index
+
+                    # Yield outside the lock so a slow client cannot stall
+                    # the fetch thread
+                    for chunk in chunks:
+                        yield chunk
+
+                    threading.Event().wait(Config.CLIENT_POLL_INTERVAL)
+            except GeneratorExit:
+                remaining = client_manager.remove_client(client_id)
+                if remaining == 0:
+                    logging.info(f"No clients remaining for channel {channel_id}")
+                    self.stop_channel(channel_id)
+
+        return Response(generate(), content_type='video/mp2t')
+
+    def change_stream(self, channel_id: str):
+        """Handle stream URL changes"""
+        if channel_id not in self.stream_managers:
+            return jsonify({'error': 'Channel not found'}), 404
+
+        new_url = request.json.get('url')
+        if not new_url:
+            return jsonify({'error': 'No URL provided'}), 400
+
+        manager = self.stream_managers[channel_id]
+        if manager.update_url(new_url):
+            return jsonify({
+                'message': 'Stream URL updated',
+                'channel': channel_id,
+                'url': new_url
+            })
+        return jsonify({
+            'message': 'URL unchanged',
+            'channel': channel_id,
+            'url': new_url
+        })
+
+    def run(self, host: str = '0.0.0.0', port: int = 5000) -> None:
+        """Start the proxy server"""
+        self.app.run(host=host, port=port, threaded=True)
+
+    def shutdown(self) -> None:
+        """Stop all channels and cleanup"""
+        for channel_id in list(self.stream_managers.keys()):
+            self.stop_channel(channel_id)
+
+def main():
+    """Initialize and start the proxy server"""
+    logging.basicConfig(
+        level=logging.INFO,
+        format='%(asctime)s - %(levelname)s - %(message)s',
+        datefmt='%Y-%m-%d %H:%M:%S'
+    )
+
+    logging.getLogger('werkzeug').setLevel(logging.DEBUG)
+
+    proxy_server = ProxyServer()
+    initial_url = os.getenv('STREAM_URL', 'http://example.com/stream.ts')
+    proxy_server.initialize_channel(initial_url, "default_channel")
+
+    try:
+        proxy_server.run()
+    finally:
+        proxy_server.shutdown()
+
+if __name__ == '__main__':
+    main()
diff --git a/requirements.txt b/requirements.txt
index 618c84b3..c184e82a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -14,5 +14,6 @@ yt-dlp
 gevent==24.11.1
 django-cors-headers
 djangorestframework-simplejwt
+m3u8
 rapidfuzz==3.12.1
 sentence-transformers==3.4.1
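
Usage sketch (host, channel id, and URLs are illustrative, not part of the diff):

    # HLS proxy: serve an upstream playlist as channel "ch1" on port 5000
    python apps/proxy/hls_proxy --url http://upstream/live.m3u8 --channel ch1 --port 5000

    # Play the proxied stream
    vlc http://localhost:5000/stream/ch1

    # Switch sources mid-stream (the proxy inserts an EXT-X-DISCONTINUITY tag)
    curl -X POST http://localhost:5000/change_stream/ch1 \
         -H 'Content-Type: application/json' \
         -d '{"url": "http://backup/live.m3u8"}'

    # TS proxy: reads STREAM_URL from the environment, serves /stream/default_channel
    STREAM_URL=http://upstream/stream.ts python apps/proxy/ts_proxy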