More refactoring and slight modifications.

This commit is contained in:
SergeantPanda 2025-03-15 14:35:34 -05:00
parent 0be133d341
commit 8f8b4ef371
3 changed files with 123 additions and 100 deletions

View file

@ -1,7 +1,7 @@
"""Shared configuration between proxy types"""
class BaseConfig:
DEFAULT_USER_AGENT = 'VLC/3.0.20 LibVLC/3.0.20'
DEFAULT_USER_AGENT = 'VLC/3.0.20 LibVLC/3.0.20' # Will only be used if connection to settings fail
CHUNK_SIZE = 8192
CLIENT_POLL_INTERVAL = 0.1
MAX_RETRIES = 3

View file

@ -69,6 +69,9 @@ class StreamManager:
def run(self):
"""Main execution loop using HTTP streaming with improved connection handling"""
# Add a stop flag to the class properties
self.stop_requested = False
try:
# Start health monitor thread
health_thread = threading.Thread(target=self._monitor_health, daemon=True)
@ -78,6 +81,8 @@ class StreamManager:
while self.running:
if len(self.transcode_cmd) > 0:
# Start command process for transcoding
logger.debug(f"Starting transcode process: {self.transcode_cmd}")
self.transcode_process = subprocess.Popen(
self.transcode_cmd,
stdout=subprocess.PIPE,
@ -101,6 +106,8 @@ class StreamManager:
time.sleep(0.1)
else:
try:
# Using direct HTTP streaming
logger.debug(f"Using TS Proxy to connect to stream: {self.url}")
# Create new session for each connection attempt
session = self._create_session()
self.current_session = session
@ -125,7 +132,9 @@ class StreamManager:
try:
chunk_count = 0
for chunk in response.iter_content(chunk_size=self.chunk_size):
if not self.running:
# Check if we've been asked to stop
if self.stop_requested:
logger.info(f"Stream loop for channel {self.channel_id} stopping due to request")
break
if chunk:
@ -140,7 +149,15 @@ class StreamManager:
if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client:
last_data_key = f"ts_proxy:channel:{self.buffer.channel_id}:last_data"
self.buffer.redis_client.set(last_data_key, str(time.time()), ex=60)
except AttributeError as e:
except (AttributeError, ConnectionError) as e:
if self.stop_requested:
# This is expected during shutdown, just log at debug level
logger.debug(f"Expected connection error during shutdown: {e}")
else:
# Unexpected error during normal operation
logger.error(f"Unexpected stream error: {e}")
# Handle the error appropriately
except Exception as e:
# Handle the specific 'NoneType' object has no attribute 'read' error
if "'NoneType' object has no attribute 'read'" in str(e):
logger.warning(f"Connection closed by server (read {chunk_count} chunks before disconnect)")
@ -215,22 +232,26 @@ class StreamManager:
logger.info("Stream manager stopped")
def stop(self):
"""Stop the stream manager and clean up resources"""
self.running = False
if self.current_response:
"""Stop this stream"""
# Set the flag first
self.stop_requested = True
# Close any active response connection
if hasattr(self, 'current_response') and self.current_response: # CORRECT NAME
try:
self.current_response.close()
except:
self.current_response.close() # CORRECT NAME
except Exception:
pass
if self.current_session:
# Also close the session
if hasattr(self, 'current_session') and self.current_session:
try:
self.current_session.close()
except:
except Exception:
pass
logger.info("Stream manager resources released")
# Set running to false to ensure thread exits
self.running = False
def update_url(self, new_url):
"""Update stream URL and reconnect with HTTP streaming approach"""

View file

@ -21,7 +21,7 @@ logger = logging.getLogger("ts_proxy")
@require_GET
def stream_ts(request, channel_id):
"""Stream TS data to client with integrated channel initialization"""
"""Stream TS data to client with immediate response and keep-alive packets during initialization"""
client_user_agent = None
logger.info(f"Fetching channel ID {channel_id}")
channel = get_object_or_404(Channel, pk=channel_id)
@ -38,9 +38,11 @@ def stream_ts(request, channel_id):
logger.debug(f"[{client_id}] Client connected with user agent: {client_user_agent}")
break
# Check if channel exists or initialize it
# Start initialization if needed
channel_initializing = False
if not proxy_server.check_if_channel_exists(channel_id):
logger.info(f"[{client_id}] Channel {channel_id} needs initialization")
# Initialize the channel (but don't wait for completion)
logger.info(f"[{client_id}] Starting channel {channel_id} initialization")
# Get stream details from channel model
stream_id, profile_id = channel.get_stream()
@ -99,44 +101,11 @@ def stream_ts(request, channel_id):
time.sleep(0.1)
logger.info(f"[{client_id}] Successfully initialized channel {channel_id}")
# Wait for channel to become ready if it's initializing
if proxy_server.redis_client:
wait_start = time.time()
max_wait = getattr(Config, 'CLIENT_WAIT_TIMEOUT', 30) # Maximum wait time in seconds
# Check channel state
metadata_key = f"ts_proxy:channel:{channel_id}:metadata"
waiting = True
while waiting and time.time() - wait_start < max_wait:
metadata = proxy_server.redis_client.hgetall(metadata_key)
if not metadata or b'state' not in metadata:
logger.warning(f"[{client_id}] Channel {channel_id} metadata missing")
break
state = metadata[b'state'].decode('utf-8')
# If channel is ready for clients, continue
if state in ['waiting_for_clients', 'active']:
logger.info(f"[{client_id}] Channel {channel_id} ready (state={state}), proceeding with connection")
waiting = False
elif state in ['initializing', 'connecting']:
# Channel is still initializing or connecting, wait a bit longer
elapsed = time.time() - wait_start
logger.info(f"[{client_id}] Waiting for channel {channel_id} to become ready ({elapsed:.1f}s), current state: {state}")
time.sleep(0.5) # Wait 500ms before checking again
else:
# Unknown or error state
logger.warning(f"[{client_id}] Channel {channel_id} in unexpected state: {state}")
break
# Check if we timed out waiting
if waiting and time.time() - wait_start >= max_wait:
logger.warning(f"[{client_id}] Timeout waiting for channel {channel_id} to become ready")
return JsonResponse({'error': 'Timeout waiting for channel to initialize'}, status=503)
# CRITICAL FIX: Ensure local resources are properly initialized before streaming
channel_initializing = True
logger.info(f"[{client_id}] Channel {channel_id} initialization started")
# Register client - can do this regardless of initialization state
# Create local resources if needed
if channel_id not in proxy_server.stream_buffers or channel_id not in proxy_server.client_managers:
logger.warning(f"[{client_id}] Channel {channel_id} exists in Redis but not initialized in this worker - initializing now")
@ -161,62 +130,94 @@ def stream_ts(request, channel_id):
return JsonResponse({'error': 'Failed to initialize channel locally'}, status=500)
logger.info(f"[{client_id}] Successfully initialized channel {channel_id} locally")
# Get stream buffer and client manager
# Register client
buffer = proxy_server.stream_buffers[channel_id]
client_manager = proxy_server.client_managers[channel_id]
client_manager.add_client(client_id, client_user_agent)
logger.info(f"[{client_id}] Client registered with channel {channel_id}")
# Start stream response
# Define a single generate function
def generate():
stream_start_time = time.time()
bytes_sent = 0
chunks_sent = 0
# Keep track of initialization state
initialization_start = time.time()
max_init_wait = getattr(Config, 'CLIENT_WAIT_TIMEOUT', 30)
channel_ready = not channel_initializing
keepalive_interval = 0.5
last_keepalive = 0
try:
logger.info(f"[{client_id}] New client connected to channel {channel_id} with user agent: {client_user_agent}")
# Add client to manager with user agent
client_manager = proxy_server.client_managers[channel_id]
client_count = client_manager.add_client(client_id, client_user_agent)
# If this is the first client, try to acquire ownership
if client_count == 1 and not proxy_server.am_i_owner(channel_id):
if proxy_server.try_acquire_ownership(channel_id):
logger.info(f"[{client_id}] First client, acquiring channel ownership")
# Get channel metadata from Redis
logger.info(f"[{client_id}] Stream generator started, channel_ready={channel_ready}")
# Wait for initialization to complete if needed
if not channel_ready:
# While init is happening, send keepalive packets
while time.time() - initialization_start < max_init_wait:
# Check if initialization has completed
if proxy_server.redis_client:
metadata_key = f"ts_proxy:channel:{channel_id}:metadata"
url_bytes = proxy_server.redis_client.hget(metadata_key, "url")
ua_bytes = proxy_server.redis_client.hget(metadata_key, "user_agent")
url = url_bytes.decode('utf-8') if url_bytes else None
user_agent = ua_bytes.decode('utf-8') if ua_bytes else None
if url:
# Create and start stream connection
from .stream_manager import StreamManager
logger.info(f"[{client_id}] Creating stream connection for URL: {url}")
buffer = proxy_server.stream_buffers[channel_id]
stream_manager = StreamManager(url, buffer, user_agent=user_agent)
proxy_server.stream_managers[channel_id] = stream_manager
thread = threading.Thread(target=stream_manager.run, daemon=True)
thread.name = f"stream-{channel_id}"
thread.start()
# Wait briefly for connection
wait_start = time.time()
while not stream_manager.connected:
if time.time() - wait_start > Config.CONNECTION_TIMEOUT:
break
time.sleep(0.1)
metadata = proxy_server.redis_client.hgetall(metadata_key)
if metadata and b'state' in metadata:
state = metadata[b'state'].decode('utf-8')
if state in ['waiting_for_clients', 'active']:
logger.info(f"[{client_id}] Channel {channel_id} now ready (state={state})")
channel_ready = True
break
elif state in ['error', 'stopped']:
error_message = metadata.get(b'error_message', b'Unknown error').decode('utf-8')
logger.error(f"[{client_id}] Channel {channel_id} in error state: {state}, message: {error_message}")
# Send error in a comment TS packet before giving up
error_packet = bytearray(188)
error_packet[0] = 0x47 # Sync byte
error_packet[1] = 0x1F # PID high bits
error_packet[2] = 0xFF # PID low bits
error_msg = f"Error: {error_message}".encode('utf-8')
error_packet[4:4+min(len(error_msg), 180)] = error_msg[:180]
yield bytes(error_packet)
return
else:
# Still initializing - send keepalive if needed
if time.time() - last_keepalive >= keepalive_interval:
keepalive_packet = bytearray(188)
keepalive_packet[0] = 0x47 # Sync byte
keepalive_packet[1] = 0x1F # PID high bits (null packet)
keepalive_packet[2] = 0xFF # PID low bits (null packet)
# Add status info in packet payload (will be ignored by players)
status_msg = f"Initializing: {state}".encode('utf-8')
keepalive_packet[4:4+min(len(status_msg), 180)] = status_msg[:180]
logger.debug(f"[{client_id}] Sending keepalive packet during initialization, state={state}")
yield bytes(keepalive_packet)
bytes_sent += len(keepalive_packet)
last_keepalive = time.time()
# Wait a bit before checking again (don't send too many keepalives)
time.sleep(0.1)
# Check if we timed out waiting
if not channel_ready:
logger.warning(f"[{client_id}] Timed out waiting for initialization")
error_packet = bytearray(188)
error_packet[0] = 0x47 # Sync byte
error_packet[1] = 0x1F # PID high bits
error_packet[2] = 0xFF # PID low bits
error_msg = f"Error: Initialization timeout".encode('utf-8')
error_packet[4:4+min(len(error_msg), 180)] = error_msg[:180]
yield bytes(error_packet)
return
# Channel is now ready - original streaming code goes here
logger.info(f"[{client_id}] Channel {channel_id} ready, starting normal streaming")
# Reset start time for real streaming
stream_start_time = time.time()
# Get buffer - stream manager may not exist in this worker
buffer = proxy_server.stream_buffers.get(channel_id)
stream_manager = proxy_server.stream_managers.get(channel_id)
@ -384,12 +385,13 @@ def stream_ts(request, channel_id):
shutdown_thread.daemon = True
shutdown_thread.start()
# IMPORTANT: Return the StreamingHttpResponse from the main function
response = StreamingHttpResponse(
streaming_content=generate(),
content_type='video/mp2t'
)
response['Cache-Control'] = 'no-cache'
return response
return response # This now properly returns from stream_ts
except Exception as e:
logger.error(f"Error in stream_ts: {e}", exc_info=True)