mirror of https://github.com/Dispatcharr/Dispatcharr.git, synced 2026-01-23 18:54:58 +00:00
Improved try_next_stream so it no longer fails when the returned stream is the same one; it now tries a different stream instead. Force a client to jump ahead in the buffer if it falls too far behind.
340 lines
14 KiB
Python
"""Buffer management for TS streams"""
|
|
|
|
import threading
|
|
import logging
|
|
import time
|
|
from collections import deque
|
|
from typing import Optional, Deque
|
|
import random
|
|
from apps.proxy.config import TSConfig as Config
|
|
from .redis_keys import RedisKeys
|
|
from .config_helper import ConfigHelper
|
|
from .constants import TS_PACKET_SIZE
|
|
from .utils import get_logger
|
|
import gevent.event
|
|
import gevent # Make sure this import is at the top
|
|
|
|
logger = get_logger()
|
|
|
|
class StreamBuffer:
    """Manages stream data buffering with optimized chunk storage"""
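
    # A minimal usage sketch (the Redis client, channel id, and ts_bytes here
    # are hypothetical, not part of this module): one writer feeds raw TS
    # bytes in, and readers pull packet-aligned chunks back out by index.
    #
    #   buffer = StreamBuffer(channel_id=42, redis_client=redis.Redis())
    #   buffer.add_chunk(ts_bytes)                  # writer side
    #   chunks = buffer.get_chunks(start_index=0)   # reader side
    #   buffer.stop()                               # flush and clean up
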
    def __init__(self, channel_id=None, redis_client=None):
        self.channel_id = channel_id
        self.redis_client = redis_client
        self.lock = threading.Lock()
        self.index = 0
        self.TS_PACKET_SIZE = TS_PACKET_SIZE

        # STANDARDIZED KEYS: use the RedisKeys class instead of hardcoded patterns
        self.buffer_index_key = RedisKeys.buffer_index(channel_id) if channel_id else ""
        self.buffer_prefix = RedisKeys.buffer_chunk_prefix(channel_id) if channel_id else ""

        self.chunk_ttl = ConfigHelper.redis_chunk_ttl()

        # Initialize from Redis if available
        if self.redis_client and channel_id:
            try:
                current_index = self.redis_client.get(self.buffer_index_key)
                if current_index:
                    self.index = int(current_index)
                    logger.info(f"Initialized buffer from Redis with index {self.index}")
            except Exception as e:
                logger.error(f"Error initializing buffer from Redis: {e}")

        self._write_buffer = bytearray()
        self.target_chunk_size = ConfigHelper.get('BUFFER_CHUNK_SIZE', TS_PACKET_SIZE * 5644)  # ~1MB default

        # Track timers for proper cleanup
        self.stopping = False
        self.fill_timers = []
        self.chunk_available = gevent.event.Event()
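
    # Rough shape of the Redis state this class maintains (the exact key
    # strings come from RedisKeys, so the names below are illustrative only):
    #
    #   <buffer_index_key>    -> int, index of the newest chunk (INCR'd per write)
    #   <buffer_prefix><n>    -> bytes, chunk n, ~1MB of 188-byte-aligned TS data
    #                            (written with SETEX, so chunks expire after chunk_ttl)
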
    def add_chunk(self, chunk):
        """Add data with optimized Redis storage and TS packet alignment"""
        if not chunk:
            return False

        try:
            # Accumulate partial packets between chunks
            if not hasattr(self, '_partial_packet'):
                self._partial_packet = bytearray()

            # Combine with any previous partial packet
            combined_data = bytearray(self._partial_packet) + bytearray(chunk)

            # Calculate how many bytes form complete TS packets
            complete_packets_size = (len(combined_data) // self.TS_PACKET_SIZE) * self.TS_PACKET_SIZE

            if complete_packets_size == 0:
                # Not enough data for a complete packet; hold it for the next call
                self._partial_packet = combined_data
                return True

            # Split into complete packets and remainder
            complete_packets = combined_data[:complete_packets_size]
            self._partial_packet = combined_data[complete_packets_size:]

            # Add completed packets to the write buffer
            self._write_buffer.extend(complete_packets)

            # Only write to Redis when we have enough data for an optimized chunk
            writes_done = 0
            with self.lock:
                while len(self._write_buffer) >= self.target_chunk_size:
                    # Extract a full chunk
                    chunk_data = self._write_buffer[:self.target_chunk_size]
                    self._write_buffer = self._write_buffer[self.target_chunk_size:]

                    # Write the optimized chunk to Redis
                    if self.redis_client:
                        chunk_index = self.redis_client.incr(self.buffer_index_key)
                        chunk_key = RedisKeys.buffer_chunk(self.channel_id, chunk_index)
                        self.redis_client.setex(chunk_key, self.chunk_ttl, bytes(chunk_data))

                        # Update local tracking
                        self.index = chunk_index
                        writes_done += 1

            if writes_done > 0:
                logger.debug(f"Added {writes_done} chunks ({self.target_chunk_size} bytes each) to Redis for channel {self.channel_id} at index {self.index}")

            # Pulse the event: wake any waiters, then reset for the next notification
            self.chunk_available.set()
            self.chunk_available.clear()

            return True

        except Exception as e:
            logger.error(f"Error adding chunk to buffer: {e}")
            return False
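
    # Worked example of the alignment above (illustrative numbers): with
    # TS_PACKET_SIZE = 188, a 1000-byte read carries 5 complete packets
    # (5 * 188 = 940 bytes) into _write_buffer and holds the remaining
    # 60 bytes in _partial_packet until the next add_chunk() call, so every
    # chunk written to Redis starts on a packet boundary.
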
    def get_chunks(self, start_index=None):
        """Get chunks from the buffer with detailed logging"""
        try:
            request_id = f"req_{random.randint(1000, 9999)}"
            logger.debug(f"[{request_id}] get_chunks called with start_index={start_index}")

            if not self.redis_client:
                logger.error("Redis not available, cannot retrieve chunks")
                return []

            # If no start_index is provided, start near the current position
            if start_index is None:
                start_index = max(0, self.index - 10)
                logger.debug(f"[{request_id}] No start_index provided, using {start_index}")

            # Get the current index from Redis
            current_index = int(self.redis_client.get(self.buffer_index_key) or 0)

            # Calculate the range of chunks to retrieve
            start_id = start_index + 1
            chunks_behind = current_index - start_id

            # Adaptive chunk retrieval based on how far behind the client is
            if chunks_behind > 100:
                fetch_count = 15
                logger.debug(f"[{request_id}] Client very behind ({chunks_behind} chunks), fetching {fetch_count}")
            elif chunks_behind > 50:
                fetch_count = 10
                logger.debug(f"[{request_id}] Client moderately behind ({chunks_behind} chunks), fetching {fetch_count}")
            elif chunks_behind > 20:
                fetch_count = 5
                logger.debug(f"[{request_id}] Client slightly behind ({chunks_behind} chunks), fetching {fetch_count}")
            else:
                fetch_count = 3
                logger.debug(f"[{request_id}] Client up-to-date (only {chunks_behind} chunks behind), fetching {fetch_count}")

            end_id = min(current_index + 1, start_id + fetch_count)

            if start_id >= end_id:
                logger.debug(f"[{request_id}] No new chunks to fetch (start_id={start_id}, end_id={end_id})")
                return []

            # Log the range we're retrieving
            logger.debug(f"[{request_id}] Retrieving chunks {start_id} to {end_id - 1} (total: {end_id - start_id})")

            # Fetch directly from Redis, using a pipeline for efficiency
            pipe = self.redis_client.pipeline()
            for idx in range(start_id, end_id):
                chunk_key = RedisKeys.buffer_chunk(self.channel_id, idx)
                pipe.get(chunk_key)

            results = pipe.execute()

            # Keep only the chunks that were still present in Redis
            chunks = [result for result in results if result is not None]

            found_chunks = len(chunks)
            missing_chunks = len(results) - found_chunks
            if missing_chunks > 0:
                logger.debug(f"[{request_id}] Missing {missing_chunks}/{len(results)} chunks in Redis (likely expired)")

            # Update local tracking
            if chunks:
                self.index = end_id - 1

            # Final log message
            total_bytes = sum(len(c) for c in chunks)
            logger.debug(f"[{request_id}] Returning {len(chunks)} chunks ({total_bytes} bytes)")

            return chunks

        except Exception as e:
            logger.error(f"Error getting chunks from buffer: {e}", exc_info=True)
            return []
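
    # Example of the adaptive window (illustrative numbers): a client whose
    # start_index is 400 against a buffer at index 520 is 119 chunks behind
    # (start_id = 401, 520 - 401 = 119), so get_chunks() fetches 15 chunks,
    # i.e. indices 401-415; a nearly caught-up client gets only 3 per call.
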
    def get_chunks_exact(self, start_index, count):
        """Get exactly the requested number of chunks from the given index"""
        try:
            if not self.redis_client:
                logger.error("Redis not available, cannot retrieve chunks")
                return []

            # Calculate the range to retrieve
            start_id = start_index + 1
            end_id = start_id + count

            # Get the current buffer position
            current_index = int(self.redis_client.get(self.buffer_index_key) or 0)

            # If requesting beyond the current buffer, there is nothing to return yet
            if start_id > current_index:
                return []

            # Cap the end at the current buffer position
            end_id = min(end_id, current_index + 1)

            # Fetch directly from Redis using a pipeline
            pipe = self.redis_client.pipeline()
            for idx in range(start_id, end_id):
                chunk_key = RedisKeys.buffer_chunk(self.channel_id, idx)
                pipe.get(chunk_key)

            results = pipe.execute()

            # Filter out None results (expired or missing chunks)
            chunks = [result for result in results if result is not None]

            # Update the local index if needed
            if chunks and start_id + len(chunks) - 1 > self.index:
                self.index = start_id + len(chunks) - 1

            return chunks

        except Exception as e:
            logger.error(f"Error getting exact chunks: {e}", exc_info=True)
            return []
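
    # Example (illustrative numbers): get_chunks_exact(100, 5) asks Redis for
    # chunk indices 101 through 105 and returns whichever are still present.
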
    def stop(self):
        """Stop the buffer and cancel all timers"""
        # Set the stopping flag first to prevent new timer creation
        self.stopping = True

        # Cancel all pending timers (gevent greenlets, so kill() rather than cancel())
        timers_cancelled = 0
        for timer in list(self.fill_timers):
            try:
                if timer and not timer.dead:
                    timer.kill()
                    timers_cancelled += 1
            except Exception as e:
                logger.error(f"Error canceling timer: {e}")

        if timers_cancelled:
            logger.info(f"Cancelled {timers_cancelled} buffer timers for channel {self.channel_id}")

        # Clear the timer list
        self.fill_timers.clear()

        try:
            # Flush any remaining data in the write buffer
            if hasattr(self, '_write_buffer') and len(self._write_buffer) > 0:
                # Ensure the remaining data is aligned to TS packets
                complete_size = (len(self._write_buffer) // self.TS_PACKET_SIZE) * self.TS_PACKET_SIZE

                if complete_size > 0:
                    final_chunk = self._write_buffer[:complete_size]

                    # Write the final chunk to Redis
                    with self.lock:
                        if self.redis_client:
                            try:
                                chunk_index = self.redis_client.incr(self.buffer_index_key)
                                chunk_key = RedisKeys.buffer_chunk(self.channel_id, chunk_index)
                                self.redis_client.setex(chunk_key, self.chunk_ttl, bytes(final_chunk))
                                self.index = chunk_index
                                logger.info(f"Flushed final chunk of {len(final_chunk)} bytes to Redis")
                            except Exception as e:
                                logger.error(f"Error flushing final chunk: {e}")

            # Clear buffers
            self._write_buffer = bytearray()
            if hasattr(self, '_partial_packet'):
                self._partial_packet = bytearray()

        except Exception as e:
            logger.error(f"Error during buffer stop: {e}")
    def get_optimized_client_data(self, client_index):
        """Get the optimal amount of data for client streaming based on position and target size"""
        # Define limits
        MIN_CHUNKS = 3                 # Minimum chunks to read for efficiency
        MAX_CHUNKS = 20                # Safety limit to prevent memory spikes
        TARGET_SIZE = 1024 * 1024      # Target ~1MB per response (typical media buffer)
        MAX_SIZE = 2 * 1024 * 1024     # Hard cap at 2MB

        # Calculate how far behind the client is
        chunks_behind = self.index - client_index

        # Determine the optimal chunk count
        if chunks_behind <= MIN_CHUNKS:
            # Not much data available; retrieve what's there
            chunk_count = max(1, chunks_behind)
        elif chunks_behind <= MAX_CHUNKS:
            # A reasonable amount behind; catch up completely
            chunk_count = chunks_behind
        else:
            # Way behind; retrieve MAX_CHUNKS to avoid memory pressure
            chunk_count = MAX_CHUNKS

        # Retrieve the chunks
        chunks = self.get_chunks_exact(client_index, chunk_count)

        # Check whether we got no chunks at all despite expecting several
        # (likely because they expired from Redis)
        if chunk_count > 3 and len(chunks) == 0 and chunks_behind > 10:
            # Chunks are missing, most likely expired from Redis.
            # Return an empty list to signal that the client should skip forward.
            logger.debug(f"Chunks missing for client at index {client_index}, buffer at {self.index} ({chunks_behind} behind)")
            return [], client_index

        # Check the total size
        total_size = sum(len(c) for c in chunks)

        # If we're under target and more chunks are available, fetch more
        if total_size < TARGET_SIZE and chunks_behind > chunk_count:
            # Calculate how many more chunks we can get
            additional = min(MAX_CHUNKS - chunk_count, chunks_behind - chunk_count)
            more_chunks = self.get_chunks_exact(client_index + chunk_count, additional)

            # Only extend if adding them stays within MAX_SIZE
            additional_size = sum(len(c) for c in more_chunks)
            if total_size + additional_size <= MAX_SIZE:
                chunks.extend(more_chunks)
                chunk_count += len(more_chunks)  # Count the chunks actually retrieved

        return chunks, client_index + chunk_count
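
    # Sketch of the consumer side this method supports (a hypothetical client
    # loop, not part of this module): an empty chunk list with an unchanged
    # index tells the caller its data has expired, so it should jump ahead
    # toward the live edge rather than stall.
    #
    #   chunks, client_index = buffer.get_optimized_client_data(client_index)
    #   if not chunks and buffer.index - client_index > 10:
    #       client_index = buffer.index - 3   # jump near the live edge
    #   for chunk in chunks:
    #       send_to_client(chunk)             # hypothetical transport call
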
    # Safely create timers so they can be cleaned up in stop()
    def schedule_timer(self, delay, callback, *args, **kwargs):
        """Schedule a timer and track it for proper cleanup"""
        if self.stopping:
            return None

        # Use gevent.spawn_later rather than threading.Timer for gevent compatibility
        timer = gevent.spawn_later(delay, callback, *args, **kwargs)
        self.fill_timers.append(timer)
        return timer
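
    # Example (hypothetical callback): schedule a refill check in two seconds;
    # stop() will kill the greenlet if the buffer shuts down first.
    #
    #   buffer.schedule_timer(2.0, lambda: logger.debug("refill check"))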