From 250955309565b05190637cd1bb29c554792154d2 Mon Sep 17 00:00:00 2001
From: SergeantPanda
Date: Sun, 9 Mar 2025 15:10:56 -0500
Subject: [PATCH] Streaming is working very well.

---
 apps/proxy/config.py          |  2 +-
 apps/proxy/ts_proxy/server.py | 14 ++++++-----
 apps/proxy/ts_proxy/views.py  | 44 +++++++++++++++++++++++++----------
 3 files changed, 41 insertions(+), 19 deletions(-)

diff --git a/apps/proxy/config.py b/apps/proxy/config.py
index c9d82f0d..cda48d15 100644
--- a/apps/proxy/config.py
+++ b/apps/proxy/config.py
@@ -37,7 +37,7 @@ class TSConfig(BaseConfig):
     HEALTH_CHECK_INTERVAL = 5  # Check stream health every N seconds
 
     # Redis settings
-    REDIS_CHUNK_EXPIRY = 3600  # Chunks expire after 1 hour in Redis
+    REDIS_CHUNK_TTL = 3600  # Chunks expire after 1 hour in Redis (renamed from REDIS_CHUNK_EXPIRY)
 
     # User agent
     DEFAULT_USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36'
\ No newline at end of file
diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py
index 33d6a730..b13b2b1e 100644
--- a/apps/proxy/ts_proxy/server.py
+++ b/apps/proxy/ts_proxy/server.py
@@ -40,10 +40,10 @@ class StreamManager:
         self.socket = None
         self.ready_event = threading.Event()
         self.retry_count = 0
-        self.max_retries = 3
+        self.max_retries = Config.MAX_RETRIES
 
         # User agent for connection
-        self.user_agent = user_agent or 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
+        self.user_agent = user_agent or Config.DEFAULT_USER_AGENT
 
         # TS packet handling
         self.TS_PACKET_SIZE = 188
@@ -54,7 +54,7 @@ class StreamManager:
         # Stream health monitoring
         self.last_data_time = time.time()
         self.healthy = True
-        self.health_check_interval = 5  # Check health every 5 seconds
+        self.health_check_interval = Config.HEALTH_CHECK_INTERVAL
 
         # Buffer management
         self._last_buffer_check = time.time()
@@ -722,10 +722,12 @@ class ProxyServer:
             self.redis_client = None
             logging.error(f"Failed to connect to Redis: {e}")
 
-    def initialize_channel(self, url, channel_id):
-        """Initialize a channel with enhanced logging"""
+    def initialize_channel(self, url, channel_id, user_agent=None):
+        """Initialize a channel with enhanced logging and user-agent support"""
         try:
             logging.info(f"Initializing channel {channel_id} with URL: {url}")
+            if user_agent:
+                logging.info(f"Using custom User-Agent: {user_agent}")
 
             # Clean up any existing Redis entries for this channel
             if self.redis_client:
@@ -773,7 +775,7 @@ class ProxyServer:
 
             self.stream_buffers[channel_id] = buffer
 
-            stream_manager = StreamManager(url, buffer)
+            stream_manager = StreamManager(url, buffer, user_agent=user_agent)
             logging.debug(f"Created StreamManager for channel {channel_id}")
 
             self.stream_managers[channel_id] = stream_manager
diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py
index e5c043b7..81ece613 100644
--- a/apps/proxy/ts_proxy/views.py
+++ b/apps/proxy/ts_proxy/views.py
@@ -29,15 +29,18 @@ proxy_server = ProxyServer()
 @csrf_exempt
 @require_http_methods(["POST"])
 def initialize_stream(request, channel_id):
-    """Initialize a new stream channel"""
+    """Initialize a new stream channel with optional user-agent"""
     try:
         data = json.loads(request.body)
         url = data.get('url')
         if not url:
             return JsonResponse({'error': 'No URL provided'}, status=400)
 
-        # Start the channel
-        proxy_server.initialize_channel(url, channel_id)
+        # Get optional user_agent from request
+        user_agent = data.get('user_agent')
+
+        # Start the channel with user_agent if provided
+        proxy_server.initialize_channel(url, channel_id, user_agent)
 
         # Wait for connection to be established
         manager = proxy_server.stream_managers[channel_id]
@@ -90,8 +93,9 @@ def stream_ts(request, channel_id):
                 logger.error(f"[{client_id}] No buffer/stream manager for channel {channel_id}")
                 return
 
-            # Client state tracking
-            local_index = max(0, buffer.index - 30)  # Start 30 chunks behind
+            # Client state tracking - use config for initial position
+            local_index = max(0, buffer.index - Config.INITIAL_BEHIND_CHUNKS)
+            initial_position = local_index
             last_yield_time = time.time()
             empty_reads = 0
             bytes_sent = 0
@@ -99,9 +103,9 @@ def stream_ts(request, channel_id):
             stream_start_time = time.time()
             consecutive_empty = 0  # Track consecutive empty reads
 
-            # Timing parameters
+            # Timing parameters from config
             ts_packet_size = 188
-            target_bitrate = 8000000  # ~8 Mbps
+            target_bitrate = Config.TARGET_BITRATE
             packets_per_second = target_bitrate / (8 * ts_packet_size)
 
             logger.info(f"[{client_id}] Starting stream at index {local_index} (buffer at {buffer.index})")
@@ -109,7 +113,7 @@ def stream_ts(request, channel_id):
             # Main streaming loop
             while True:
                 # Get chunks at client's position
-                chunks = buffer.get_chunks_exact(local_index, 5)
+                chunks = buffer.get_chunks_exact(local_index, Config.CHUNK_BATCH_SIZE)
 
                 if chunks:
                     # Reset empty counters since we got data
@@ -124,12 +128,28 @@ def stream_ts(request, channel_id):
 
                     logger.debug(f"[{client_id}] Retrieved {len(chunks)} chunks ({total_size} bytes) from index {start_idx} to {end_idx}")
 
+                    # Calculate total packet count for this batch to maintain timing
+                    total_packets = sum(len(chunk) // ts_packet_size for chunk in chunks)
+                    batch_start_time = time.time()
+                    packets_sent_in_batch = 0
+
                     # Send chunks with pacing
                     for chunk in chunks:
+                        packets_in_chunk = len(chunk) // ts_packet_size
                         bytes_sent += len(chunk)
                         chunks_sent += 1
                         yield chunk
-                        time.sleep(0.01)  # Small spacing between chunks
+
+                        # Pacing logic
+                        packets_sent_in_batch += packets_in_chunk
+                        elapsed = time.time() - batch_start_time
+                        target_time = packets_sent_in_batch / packets_per_second
+
+                        # If we're sending too fast, add a small delay
+                        if elapsed < target_time and packets_sent_in_batch < total_packets:
+                            sleep_time = min(target_time - elapsed, 0.05)
+                            if sleep_time > 0.001:
+                                time.sleep(sleep_time)
 
                     # Log progress periodically
                     if chunks_sent % 100 == 0:
@@ -162,7 +182,7 @@ def stream_ts(request, channel_id):
                         bytes_sent += len(keepalive_packet)
                         last_yield_time = time.time()
                         consecutive_empty = 0  # Reset consecutive counter but keep total empty_reads
-                        time.sleep(0.5)  # Longer sleep after keepalive
+                        time.sleep(Config.KEEPALIVE_INTERVAL)
                     else:
                         # Standard wait
                         sleep_time = min(0.1 * consecutive_empty, 1.0)  # Progressive backoff up to 1s
@@ -173,8 +193,8 @@ def stream_ts(request, channel_id):
                         logger.debug(f"[{client_id}] Waiting for chunks beyond {local_index} (buffer at {buffer.index}, stream health: {stream_manager.healthy})")
 
                 # Disconnect after long inactivity, but only if stream is dead
-                if time.time() - last_yield_time > 30 and not stream_manager.healthy:
-                    logger.warning(f"[{client_id}] No data for 30s and stream unhealthy, disconnecting")
+                if time.time() - last_yield_time > Config.STREAM_TIMEOUT and not stream_manager.healthy:
+                    logger.warning(f"[{client_id}] No data for {Config.STREAM_TIMEOUT}s and stream unhealthy, disconnecting")
                     break
 
         except Exception as e:
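
Note on the pacing change in stream_ts: the old loop slept a flat 0.01s per chunk regardless of chunk size; the new loop sleeps only as long as needed to hold the output near the target bitrate, computed from TS packet count. The standalone sketch below illustrates that calculation only and is not part of the patch; the pace() helper is hypothetical, and it assumes Config.TARGET_BITRATE keeps the 8 Mbps value of the removed hard-coded target_bitrate. Unlike the patch, which restarts its timer for each batch fetched from the buffer, this sketch paces a single batch.

    # pacing_sketch.py -- illustrative only, not part of the patch
    import time

    TS_PACKET_SIZE = 188
    TARGET_BITRATE = 8_000_000  # bits/sec; assumed value of Config.TARGET_BITRATE
    PACKETS_PER_SECOND = TARGET_BITRATE / (8 * TS_PACKET_SIZE)  # ~5319 packets/sec

    def pace(chunks):
        """Yield chunks no faster than TARGET_BITRATE, mirroring the patch's loop."""
        total_packets = sum(len(c) // TS_PACKET_SIZE for c in chunks)
        start = time.time()
        sent = 0
        for chunk in chunks:
            yield chunk
            sent += len(chunk) // TS_PACKET_SIZE
            elapsed = time.time() - start
            target = sent / PACKETS_PER_SECOND  # seconds this many packets should take
            if elapsed < target and sent < total_packets:
                time.sleep(min(target - elapsed, 0.05))  # sleep is capped, as in the patch

    if __name__ == "__main__":
        # 100 chunks of 50 TS packets each (~940 KB) should take ~0.94 s at 8 Mbps.
        chunk = (b"\x47" + b"\x00" * 187) * 50
        t0 = time.time()
        for _ in pace([chunk] * 100):
            pass
        print(f"streamed in {time.time() - t0:.2f}s")

Capping each sleep at 0.05s keeps one slow wall-clock reading from stalling the generator, at the cost of briefly exceeding the target rate after a hiccup.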