Dispatcharr/apps/proxy/ts_proxy/server.py

"""
Transport Stream (TS) Proxy Server
Handles live TS stream proxying with support for:
- Stream switching
- Buffer management
- Multiple client connections
- Connection state tracking
"""
import requests
import threading
import logging
import socket
import random
from collections import deque
import time
import sys
from typing import Optional, Set, Deque, Dict
import json
from apps.proxy.config import TSConfig as Config
# Configure root logger for this module
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - TS_PROXY - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler(sys.stdout)]
)
# Force immediate output
print("TS PROXY SERVER MODULE LOADED", file=sys.stderr)
class StreamManager:
"""Manages a connection to a TS stream with continuity tracking"""
def __init__(self, url, buffer, user_agent=None):
        # Core connection state
self.url = url
self.buffer = buffer
self.running = True
self.connected = False
self.socket = None
self.ready_event = threading.Event()
self.retry_count = 0
self.max_retries = Config.MAX_RETRIES
# User agent for connection
self.user_agent = user_agent or Config.DEFAULT_USER_AGENT
# TS packet handling
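        # MPEG-TS frames are fixed 188-byte packets, each starting with sync byte 0x47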
self.TS_PACKET_SIZE = 188
self.recv_buffer = bytearray()
self.sync_found = False
self.continuity_counters = {}
# Stream health monitoring
self.last_data_time = time.time()
self.healthy = True
self.health_check_interval = Config.HEALTH_CHECK_INTERVAL
# Buffer management
self._last_buffer_check = time.time()
logging.info(f"Initialized stream manager for channel {buffer.channel_id}")
def _create_session(self) -> requests.Session:
"""Create and configure requests session"""
session = requests.Session()
session.headers.update({
'User-Agent': self.user_agent,
'Connection': 'keep-alive'
})
return session
def update_url(self, new_url: str) -> bool:
"""Update stream URL and reconnect"""
if new_url == self.url:
return False
logging.info(f"Switching stream URL from {self.url} to {new_url}")
self.url = new_url
self.connected = False
self._close_socket() # Close existing connection
# Signal health monitor to reconnect immediately
self.last_data_time = 0
return True
def should_retry(self) -> bool:
"""Check if connection retry is allowed"""
return self.retry_count < self.max_retries
def stop(self) -> None:
"""Stop the stream manager and close all resources"""
self.running = False
self._close_socket()
logging.info("Stream manager resources released")
def _process_complete_packets(self):
"""Process TS packets with improved resync capability"""
try:
# Enhanced sync byte detection with re-sync capability
if (not self.sync_found or
(len(self.recv_buffer) >= 188 and self.recv_buffer[0] != 0x47)):
# Need to find sync pattern if we haven't found it yet or lost sync
if len(self.recv_buffer) >= 376: # Need at least 2 packet lengths
sync_found = False
# Look for at least two sync bytes (0x47) at 188-byte intervals
for i in range(min(188, len(self.recv_buffer) - 188)):
if (self.recv_buffer[i] == 0x47 and
self.recv_buffer[i + 188] == 0x47):
# If already had sync but lost it, log the issue
if self.sync_found:
logging.warning(f"Re-syncing TS stream at position {i} (lost sync)")
else:
logging.debug(f"TS sync found at position {i}")
# Trim buffer to start at first sync byte
self.recv_buffer = self.recv_buffer[i:]
self.sync_found = True
sync_found = True
break
# If we couldn't find sync in this buffer, discard partial data
if not sync_found:
logging.warning(f"Failed to find sync pattern - discarding {len(self.recv_buffer) - 188} bytes")
if len(self.recv_buffer) > 188:
self.recv_buffer = self.recv_buffer[-188:] # Keep last chunk for next attempt
return False
# If we don't have a complete packet yet, wait for more data
if len(self.recv_buffer) < 188:
return False
# Calculate how many complete packets we have
packet_count = len(self.recv_buffer) // 188
if packet_count == 0:
return False
# Verify all packets have sync bytes
all_synced = True
for i in range(0, packet_count):
if self.recv_buffer[i * 188] != 0x47:
all_synced = False
break
# If not all packets are synced, re-scan for sync
if not all_synced:
self.sync_found = False # Force re-sync on next call
return False
# Extract complete packets
packets = self.recv_buffer[:packet_count * 188]
# Keep remaining data in buffer
self.recv_buffer = self.recv_buffer[packet_count * 188:]
# Send packets to buffer
if packets:
# Log first and last sync byte to validate alignment
                first_sync = packets[0]
                last_sync = packets[188 * (packet_count - 1)]
if first_sync != 0x47 or last_sync != 0x47:
logging.warning(f"TS packet alignment issue: first_sync=0x{first_sync:02x}, last_sync=0x{last_sync:02x}")
# Don't process misaligned packets
return False
before_index = self.buffer.index
success = self.buffer.add_chunk(bytes(packets))
after_index = self.buffer.index
# Log successful write
if success:
logging.debug(f"Added chunk: {packet_count} packets, buffer index {before_index}{after_index}")
else:
logging.warning("Failed to add chunk to buffer")
# If successful, update last data timestamp in Redis
if success and hasattr(self.buffer, 'redis_client') and self.buffer.redis_client:
last_data_key = f"ts_proxy:channel:{self.buffer.channel_id}:last_data"
self.buffer.redis_client.set(last_data_key, str(time.time()), ex=60) # 1 minute expiry
return success
return False
except Exception as e:
logging.error(f"Error processing TS packets: {e}", exc_info=True)
self.sync_found = False # Reset sync state on error
return False
def _process_ts_data(self, chunk):
"""Process received data and add to buffer"""
if not chunk:
return False
# Add to existing buffer
self.recv_buffer.extend(chunk)
# Process complete packets now
return self._process_complete_packets()
def run(self):
"""Main execution loop with stream health monitoring"""
try:
# Check if buffer already has data - in which case we might not need to connect
if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client:
buffer_index = self.buffer.redis_client.get(f"ts_proxy:buffer:{self.buffer.channel_id}:index")
if buffer_index and int(buffer_index) > 0:
# There's already data in Redis, check if it's recent (within last 10 seconds)
last_data_key = f"ts_proxy:channel:{self.buffer.channel_id}:last_data"
last_data = self.buffer.redis_client.get(last_data_key)
if last_data:
last_time = float(last_data)
if time.time() - last_time < 10:
logging.info(f"Recent data found in Redis, no need to reconnect")
self.connected = True
self.healthy = True
return
# Start health monitor thread
health_thread = threading.Thread(target=self._monitor_health, daemon=True)
health_thread.start()
current_response = None # Track the current response object
current_session = None # Track the current session
            # Establish network connection (socket and requests are imported at module level)
            logging.info(f"Starting stream for URL: {self.url}")
while self.running:
try:
# Parse URL
if self.url.startswith("http"):
# HTTP connection
session = self._create_session()
current_session = session
try:
# Create an initial connection to get socket
response = session.get(self.url, stream=True)
current_response = response
if response.status_code == 200:
self.connected = True
self.socket = response.raw._fp.fp.raw
self.healthy = True
logging.info("Successfully connected to stream source")
# Connection successful - START GRACE PERIOD HERE
self._set_waiting_for_clients()
# Main fetch loop
while self.running and self.connected:
if self.fetch_chunk():
self.last_data_time = time.time()
else:
if not self.running:
break
time.sleep(0.1)
else:
logging.error(f"Failed to connect to stream: HTTP {response.status_code}")
time.sleep(2)
finally:
# Properly close response before session
if current_response:
try:
# Close the response explicitly to avoid the urllib3 error
current_response.close()
except Exception as e:
logging.debug(f"Error closing response: {e}")
current_response = None
if current_session:
try:
current_session.close()
except Exception as e:
logging.debug(f"Error closing session: {e}")
current_session = None
else:
logging.error(f"Unsupported URL scheme: {self.url}")
# Connection retry logic
if self.running and not self.connected:
self.retry_count += 1
if self.retry_count > self.max_retries:
logging.error(f"Maximum retry attempts ({self.max_retries}) exceeded")
break
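                        # Exponential backoff: attempt 1 → 2s, 2 → 4s, 3 → 8s, ... capped at 30s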
timeout = min(2 ** self.retry_count, 30)
logging.info(f"Reconnecting in {timeout} seconds... (attempt {self.retry_count})")
time.sleep(timeout)
except Exception as e:
logging.error(f"Connection error: {e}")
self._close_socket()
time.sleep(5)
except Exception as e:
logging.error(f"Stream error: {e}")
self._close_socket()
finally:
# Final cleanup
self._close_socket()
logging.info("Stream manager stopped")
def _monitor_health(self):
"""Monitor stream health and attempt recovery if needed"""
while self.running:
try:
now = time.time()
if now - self.last_data_time > 10 and self.connected:
# No data for 10 seconds, mark as unhealthy
if self.healthy:
logging.warning("Stream health check: No data received for 10+ seconds")
self.healthy = False
# After 30 seconds with no data, force reconnection
if now - self.last_data_time > 30:
logging.warning("Stream appears dead, forcing reconnection")
self._close_socket()
self.connected = False
self.last_data_time = time.time() # Reset timer for the reconnect
elif self.connected and not self.healthy:
# Stream is receiving data again after being unhealthy
logging.info("Stream health restored, receiving data again")
self.healthy = True
except Exception as e:
logging.error(f"Error in health monitor: {e}")
time.sleep(self.health_check_interval)
def _close_socket(self):
"""Close the socket connection safely"""
if self.socket:
try:
self.socket.close()
except Exception as e:
logging.debug(f"Error closing socket: {e}")
self.socket = None
self.connected = False
def fetch_chunk(self):
"""Fetch data from socket with improved buffer management"""
if not self.connected or not self.socket:
return False
try:
# SocketIO objects use read instead of recv and don't support settimeout
try:
# Try to read data chunk - use a multiple of TS packet size
if hasattr(self.socket, 'recv'):
chunk = self.socket.recv(188 * 64) # Standard socket
else:
chunk = self.socket.read(188 * 64) # SocketIO object
except AttributeError:
# Fall back to read() if recv() isn't available
chunk = self.socket.read(188 * 64)
if not chunk:
# Connection closed by server
logging.warning("Server closed connection")
self._close_socket()
self.connected = False
return False
# Process this chunk
self._process_ts_data(chunk)
# Memory management - clear any internal buffers periodically
current_time = time.time()
if current_time - self._last_buffer_check > 60: # Check every minute
self._last_buffer_check = current_time
if len(self.recv_buffer) > 188 * 1024: # If buffer is extremely large
logging.warning(f"Receive buffer unusually large ({len(self.recv_buffer)} bytes), trimming")
# Keep only recent data, aligned to TS packet boundary
keep_size = 188 * 128 # Keep reasonable buffer
self.recv_buffer = self.recv_buffer[-keep_size:]
return True
except (socket.timeout, socket.error) as e:
# Socket error
logging.error(f"Socket error: {e}")
self._close_socket()
self.connected = False
return False
except Exception as e:
logging.error(f"Error in fetch_chunk: {e}")
return False
def _set_waiting_for_clients(self):
"""Set channel state to waiting for clients after successful connection"""
try:
if hasattr(self.buffer, 'channel_id') and hasattr(self.buffer, 'redis_client'):
channel_id = self.buffer.channel_id
redis_client = self.buffer.redis_client
if channel_id and redis_client:
# Set state to waiting
state_key = f"ts_proxy:channel:{channel_id}:state"
redis_client.set(state_key, "waiting_for_clients")
# Set time when connection became ready and waiting for clients
# RENAMED: grace_start → connection_ready_time
ready_key = f"ts_proxy:channel:{channel_id}:connection_ready_time"
redis_client.setex(ready_key, 120, str(time.time()))
# Get configured grace period or default
grace_period = getattr(Config, 'CHANNEL_INIT_GRACE_PERIOD', 20)
logging.info(f"Started initial connection grace period ({grace_period}s) for channel {channel_id}")
except Exception as e:
logging.error(f"Error setting waiting for clients state: {e}")
class StreamBuffer:
"""Manages stream data buffering using Redis for persistence"""
def __init__(self, channel_id=None, redis_client=None):
self.channel_id = channel_id
self.redis_client = redis_client
self.lock = threading.Lock()
self.index = 0
self.TS_PACKET_SIZE = 188
# STANDARDIZED KEYS: Move buffer keys under channel namespace
self.buffer_index_key = f"ts_proxy:channel:{channel_id}:buffer:index"
self.buffer_prefix = f"ts_proxy:channel:{channel_id}:buffer:chunk:"
self.chunk_ttl = getattr(Config, 'REDIS_CHUNK_TTL', 60)
# Initialize from Redis if available
if self.redis_client and channel_id:
try:
current_index = self.redis_client.get(self.buffer_index_key)
if current_index:
self.index = int(current_index)
logging.info(f"Initialized buffer from Redis with index {self.index}")
except Exception as e:
logging.error(f"Error initializing buffer from Redis: {e}")
def add_chunk(self, chunk):
"""Add a chunk to the buffer"""
if not chunk:
return False
try:
# Ensure chunk is properly aligned with TS packets
if len(chunk) % self.TS_PACKET_SIZE != 0:
logging.warning(f"Received non-aligned chunk of size {len(chunk)}")
aligned_size = (len(chunk) // self.TS_PACKET_SIZE) * self.TS_PACKET_SIZE
                if aligned_size == 0:
return False
chunk = chunk[:aligned_size]
with self.lock:
# Increment index atomically
if self.redis_client:
# Use Redis to store and track chunks
chunk_index = self.redis_client.incr(self.buffer_index_key)
chunk_key = f"{self.buffer_prefix}{chunk_index}"
self.redis_client.setex(chunk_key, self.chunk_ttl, chunk)
# Update local tracking of position only
self.index = chunk_index
return True
else:
# No Redis - can't function in multi-worker mode
logging.error("Redis not available, cannot store chunks")
return False
except Exception as e:
logging.error(f"Error adding chunk to buffer: {e}")
return False
def get_chunks(self, start_index=None):
"""Get chunks from the buffer with detailed logging"""
try:
request_id = f"req_{random.randint(1000, 9999)}"
logging.debug(f"[{request_id}] get_chunks called with start_index={start_index}")
if not self.redis_client:
logging.error("Redis not available, cannot retrieve chunks")
return []
# If no start_index provided, use most recent chunks
if start_index is None:
start_index = max(0, self.index - 10) # Start closer to current position
logging.debug(f"[{request_id}] No start_index provided, using {start_index}")
# Get current index from Redis
current_index = int(self.redis_client.get(self.buffer_index_key) or 0)
# Calculate range of chunks to retrieve
start_id = start_index + 1
chunks_behind = current_index - start_id
# Adaptive chunk retrieval based on how far behind
if chunks_behind > 100:
fetch_count = 15
logging.debug(f"[{request_id}] Client very behind ({chunks_behind} chunks), fetching {fetch_count}")
elif chunks_behind > 50:
fetch_count = 10
logging.debug(f"[{request_id}] Client moderately behind ({chunks_behind} chunks), fetching {fetch_count}")
elif chunks_behind > 20:
fetch_count = 5
logging.debug(f"[{request_id}] Client slightly behind ({chunks_behind} chunks), fetching {fetch_count}")
else:
fetch_count = 3
logging.debug(f"[{request_id}] Client up-to-date (only {chunks_behind} chunks behind), fetching {fetch_count}")
end_id = min(current_index + 1, start_id + fetch_count)
if start_id >= end_id:
logging.debug(f"[{request_id}] No new chunks to fetch (start_id={start_id}, end_id={end_id})")
return []
# Log the range we're retrieving
logging.debug(f"[{request_id}] Retrieving chunks {start_id} to {end_id-1} (total: {end_id-start_id})")
# Directly fetch from Redis using pipeline for efficiency
pipe = self.redis_client.pipeline()
for idx in range(start_id, end_id):
chunk_key = f"{self.buffer_prefix}{idx}"
pipe.get(chunk_key)
results = pipe.execute()
# Process results
chunks = [result for result in results if result is not None]
# Count non-None results
found_chunks = len(chunks)
missing_chunks = len(results) - found_chunks
if missing_chunks > 0:
logging.debug(f"[{request_id}] Missing {missing_chunks}/{len(results)} chunks in Redis")
# Update local tracking
if chunks:
self.index = end_id - 1
# Final log message
chunk_sizes = [len(c) for c in chunks]
total_bytes = sum(chunk_sizes) if chunks else 0
logging.debug(f"[{request_id}] Returning {len(chunks)} chunks ({total_bytes} bytes)")
return chunks
except Exception as e:
logging.error(f"Error getting chunks from buffer: {e}", exc_info=True)
return []
def get_chunks_exact(self, start_index, count):
"""Get exactly the requested number of chunks from given index"""
try:
if not self.redis_client:
logging.error("Redis not available, cannot retrieve chunks")
return []
# Calculate range to retrieve
start_id = start_index + 1
end_id = start_id + count
# Get current buffer position
current_index = int(self.redis_client.get(self.buffer_index_key) or 0)
# If requesting beyond current buffer, return what we have
if start_id > current_index:
return []
# Cap end at current buffer position
end_id = min(end_id, current_index + 1)
# Directly fetch from Redis using pipeline
pipe = self.redis_client.pipeline()
for idx in range(start_id, end_id):
chunk_key = f"{self.buffer_prefix}{idx}"
pipe.get(chunk_key)
results = pipe.execute()
# Filter out None results
chunks = [result for result in results if result is not None]
# Update local index if needed
if chunks and start_id + len(chunks) - 1 > self.index:
self.index = start_id + len(chunks) - 1
return chunks
except Exception as e:
logging.error(f"Error getting exact chunks: {e}", exc_info=True)
return []
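
# A minimal sketch (not used by the proxy) of the adaptive batch sizing in
# StreamBuffer.get_chunks: clients that have fallen further behind the live
# buffer position are served larger batches so they can catch up.
def _example_fetch_count(chunks_behind: int) -> int:
    """Return how many chunks get_chunks would fetch for a given lag."""
    if chunks_behind > 100:
        return 15   # very behind
    elif chunks_behind > 50:
        return 10   # moderately behind
    elif chunks_behind > 20:
        return 5    # slightly behind
    return 3        # effectively live
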
class ClientManager:
"""Manages connected clients for a channel with cross-worker visibility"""
def __init__(self, channel_id, redis_client=None, worker_id=None):
self.channel_id = channel_id
self.redis_client = redis_client
self.worker_id = worker_id
self.clients = set()
self.lock = threading.Lock()
self.last_active_time = time.time()
# STANDARDIZED KEYS: Move client set under channel namespace
self.client_set_key = f"ts_proxy:channel:{channel_id}:clients"
self.client_ttl = getattr(Config, 'CLIENT_RECORD_TTL', 60)
self.heartbeat_interval = getattr(Config, 'CLIENT_HEARTBEAT_INTERVAL', 10)
self.last_heartbeat_time = {}
# Start heartbeat thread for local clients
self._start_heartbeat_thread()
def _start_heartbeat_thread(self):
"""Start thread to regularly refresh client presence in Redis"""
def heartbeat_task():
while True:
try:
# Wait for the interval
time.sleep(self.heartbeat_interval)
# Send heartbeat for all local clients
with self.lock:
if not self.clients or not self.redis_client:
continue
# Use pipeline for efficiency
pipe = self.redis_client.pipeline()
current_time = time.time()
# For each client, update its TTL and timestamp
for client_id in self.clients:
# Skip if we just sent a heartbeat recently
if client_id in self.last_heartbeat_time:
time_since_last = current_time - self.last_heartbeat_time[client_id]
if time_since_last < self.heartbeat_interval * 0.8:
continue
# FIXED: Update client hash instead of separate activity key
client_key = f"ts_proxy:channel:{self.channel_id}:clients:{client_id}"
# Only update the last_active field in the hash, preserving other fields
pipe.hset(client_key, "last_active", str(current_time))
pipe.expire(client_key, self.client_ttl) # Refresh TTL
# Keep client in the set with TTL
pipe.sadd(self.client_set_key, client_id)
pipe.expire(self.client_set_key, self.client_ttl)
# Track last heartbeat locally
self.last_heartbeat_time[client_id] = current_time
# Execute all commands atomically
pipe.execute()
# Notify channel owner of client activity
self._notify_owner_of_activity()
except Exception as e:
logging.error(f"Error in client heartbeat thread: {e}")
thread = threading.Thread(target=heartbeat_task, daemon=True)
thread.name = f"client-heartbeat-{self.channel_id}"
thread.start()
logging.debug(f"Started client heartbeat thread for channel {self.channel_id} (interval: {self.heartbeat_interval}s)")
def _notify_owner_of_activity(self):
"""Notify channel owner that clients are active on this worker"""
if not self.redis_client or not self.clients:
return
try:
worker_id = self.worker_id or "unknown"
# STANDARDIZED KEY: Worker info under channel namespace
worker_key = f"ts_proxy:channel:{self.channel_id}:worker:{worker_id}"
self.redis_client.setex(worker_key, self.client_ttl, str(len(self.clients)))
# STANDARDIZED KEY: Activity timestamp under channel namespace
activity_key = f"ts_proxy:channel:{self.channel_id}:activity"
self.redis_client.setex(activity_key, self.client_ttl, str(time.time()))
except Exception as e:
logging.error(f"Error notifying owner of client activity: {e}")
def add_client(self, client_id, user_agent=None):
"""Add a client to this channel locally and in Redis"""
with self.lock:
self.clients.add(client_id)
self.last_active_time = time.time()
if self.redis_client:
current_time = str(time.time())
# Add to channel's client set
self.redis_client.sadd(self.client_set_key, client_id)
self.redis_client.expire(self.client_set_key, self.client_ttl)
# STANDARDIZED KEY: Individual client under channel namespace
client_key = f"ts_proxy:channel:{self.channel_id}:clients:{client_id}"
# Store client info as a hash with all info in one place
client_data = {
"last_active": current_time,
"worker_id": self.worker_id or "unknown",
"connect_time": current_time
}
# Add user agent if provided
if user_agent:
client_data["user_agent"] = user_agent
# Use HSET to store client data as a hash
self.redis_client.hset(client_key, mapping=client_data)
self.redis_client.expire(client_key, self.client_ttl)
# Clear any initialization timer
self.redis_client.delete(f"ts_proxy:channel:{self.channel_id}:init_time")
self._notify_owner_of_activity()
# Publish client connected event with user agent
event_data = {
"event": "client_connected",
"channel_id": self.channel_id,
"client_id": client_id,
"worker_id": self.worker_id or "unknown",
"timestamp": time.time()
}
if user_agent:
event_data["user_agent"] = user_agent
logging.debug(f"Storing user agent '{user_agent}' for client {client_id}")
else:
logging.debug(f"No user agent provided for client {client_id}")
self.redis_client.publish(
f"ts_proxy:events:{self.channel_id}",
json.dumps(event_data)
)
# Get total clients across all workers
total_clients = self.get_total_client_count()
logging.info(f"New client connected: {client_id} (local: {len(self.clients)}, total: {total_clients})")
self.last_heartbeat_time[client_id] = time.time()
return len(self.clients)
def remove_client(self, client_id):
"""Remove a client from this channel and Redis"""
with self.lock:
if client_id in self.clients:
self.clients.remove(client_id)
if client_id in self.last_heartbeat_time:
del self.last_heartbeat_time[client_id]
self.last_active_time = time.time()
if self.redis_client:
# Remove from channel's client set
self.redis_client.srem(self.client_set_key, client_id)
# STANDARDIZED KEY: Delete individual client keys
client_key = f"ts_proxy:channel:{self.channel_id}:clients:{client_id}"
self.redis_client.delete(client_key)
# Check if this was the last client
remaining = self.redis_client.scard(self.client_set_key) or 0
if remaining == 0:
logging.warning(f"Last client removed: {client_id} - channel may shut down soon")
# Trigger disconnect time tracking even if we're not the owner
disconnect_key = f"ts_proxy:channel:{self.channel_id}:last_client_disconnect_time"
self.redis_client.setex(disconnect_key, 60, str(time.time()))
self._notify_owner_of_activity()
# Publish client disconnected event
event_data = json.dumps({
"event": "client_disconnected",
"channel_id": self.channel_id,
"client_id": client_id,
"worker_id": self.worker_id or "unknown",
"timestamp": time.time(),
"remaining_clients": remaining
})
self.redis_client.publish(f"ts_proxy:events:{self.channel_id}", event_data)
total_clients = self.get_total_client_count()
logging.info(f"Client disconnected: {client_id} (local: {len(self.clients)}, total: {total_clients})")
return len(self.clients)
def get_client_count(self):
"""Get local client count"""
with self.lock:
return len(self.clients)
def get_total_client_count(self):
"""Get total client count across all workers"""
if not self.redis_client:
return len(self.clients)
try:
# Count members in the client set
return self.redis_client.scard(self.client_set_key) or 0
except Exception as e:
logging.error(f"Error getting total client count: {e}")
return len(self.clients) # Fall back to local count
def refresh_client_ttl(self):
"""Refresh TTL for active clients to prevent expiration"""
if not self.redis_client:
return
try:
# Refresh TTL for all clients belonging to this worker
for client_id in self.clients:
# STANDARDIZED: Use channel namespace for client keys
client_key = f"ts_proxy:channel:{self.channel_id}:clients:{client_id}"
self.redis_client.expire(client_key, self.client_ttl)
# Refresh TTL on the set itself
self.redis_client.expire(self.client_set_key, self.client_ttl)
except Exception as e:
logging.error(f"Error refreshing client TTL: {e}")
class StreamFetcher:
"""Handles stream data fetching"""
def __init__(self, manager: StreamManager, buffer: StreamBuffer):
self.manager = manager
self.buffer = buffer
def fetch_loop(self) -> None:
"""Main fetch loop for stream data"""
while self.manager.running:
try:
if not self._handle_connection():
continue
                session = self.manager._create_session()
                with session.get(self.manager.url, stream=True) as response:
if response.status_code == 200:
self._handle_successful_connection()
self._process_stream(response)
except requests.exceptions.RequestException as e:
self._handle_connection_error(e)
def _handle_connection(self) -> bool:
"""Handle connection state and retries"""
if not self.manager.connected:
if not self.manager.should_retry():
logging.error(f"Failed to connect after {self.manager.max_retries} attempts")
return False
if not self.manager.running:
return False
self.manager.retry_count += 1
logging.info(f"Connecting to stream: {self.manager.url} "
f"(attempt {self.manager.retry_count}/{self.manager.max_retries})")
return True
def _handle_successful_connection(self) -> None:
"""Handle successful stream connection"""
if not self.manager.connected:
logging.info("Stream connected successfully")
self.manager.connected = True
self.manager.retry_count = 0
def _process_stream(self, response: requests.Response) -> None:
"""Process incoming stream data"""
for chunk in response.iter_content(chunk_size=Config.CHUNK_SIZE):
if not self.manager.running:
logging.info("Stream fetch stopped - shutting down")
return
if chunk:
if self.manager.ready_event.is_set():
logging.info("Stream switch in progress, closing connection")
self.manager.ready_event.clear()
break
                self.buffer.add_chunk(chunk)
def _handle_connection_error(self, error: Exception) -> None:
"""Handle stream connection errors"""
logging.error(f"Stream connection error: {error}")
self.manager.connected = False
if not self.manager.running:
return
logging.info(f"Attempting to reconnect in {Config.RECONNECT_DELAY} seconds...")
if not wait_for_running(self.manager, Config.RECONNECT_DELAY):
return
def wait_for_running(manager: StreamManager, delay: float) -> bool:
"""Wait while checking manager running state"""
start = time.time()
while time.time() - start < delay:
if not manager.running:
return False
        time.sleep(0.1)
return True
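
# A minimal sketch of the atomic ownership lock behind
# ProxyServer.try_acquire_ownership below (assumes a redis-py client; not used
# by the proxy). A single SET with nx/ex creates the key and its TTL in one
# step, so a crash between SETNX and EXPIRE cannot leave an orphaned lock.
def _example_acquire_channel_lock(redis_client, channel_id: str, worker_id: str, ttl: int = 30) -> bool:
    return bool(redis_client.set(
        f"ts_proxy:channel:{channel_id}:owner",
        worker_id,
        nx=True,  # only succeeds if no other worker holds the lock
        ex=ttl,   # lease expires automatically if the owner dies
    ))
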
class ProxyServer:
"""Manages TS proxy server instance with worker coordination"""
def __init__(self):
"""Initialize proxy server with worker identification"""
self.stream_managers = {}
self.stream_buffers = {}
self.client_managers = {}
# Generate a unique worker ID
import socket
import os
pid = os.getpid()
hostname = socket.gethostname()
self.worker_id = f"{hostname}:{pid}"
# Connect to Redis
self.redis_client = None
try:
import redis
from django.conf import settings
redis_url = getattr(settings, 'REDIS_URL', 'redis://localhost:6379/0')
self.redis_client = redis.from_url(redis_url)
logging.info(f"Connected to Redis at {redis_url}")
logging.info(f"Worker ID: {self.worker_id}")
except Exception as e:
self.redis_client = None
logging.error(f"Failed to connect to Redis: {e}")
# Start cleanup thread
self.cleanup_interval = getattr(Config, 'CLEANUP_INTERVAL', 60)
self._start_cleanup_thread()
# Start event listener for Redis pubsub messages
self._start_event_listener()
def _start_event_listener(self):
"""Listen for events from other workers"""
if not self.redis_client:
return
def event_listener():
try:
pubsub = self.redis_client.pubsub()
pubsub.psubscribe("ts_proxy:events:*")
logging.info("Started Redis event listener for client activity")
for message in pubsub.listen():
if message["type"] != "pmessage":
continue
try:
channel = message["channel"].decode("utf-8")
data = json.loads(message["data"].decode("utf-8"))
event_type = data.get("event")
channel_id = data.get("channel_id")
if channel_id and event_type:
# For owner, update client status immediately
if self.am_i_owner(channel_id):
if event_type == "client_connected":
logging.debug(f"Owner received client_connected event for channel {channel_id}")
# Reset any disconnect timer
# RENAMED: no_clients_since → last_client_disconnect_time
disconnect_key = f"ts_proxy:channel:{channel_id}:last_client_disconnect_time"
self.redis_client.delete(disconnect_key)
elif event_type == "client_disconnected":
logging.debug(f"Owner received client_disconnected event for channel {channel_id}")
# Check if any clients remain
if channel_id in self.client_managers:
# VERIFY REDIS CLIENT COUNT DIRECTLY
client_set_key = f"ts_proxy:channel:{channel_id}:clients"
total = self.redis_client.scard(client_set_key) or 0
if total == 0:
logging.debug(f"No clients left after disconnect event - stopping channel {channel_id}")
# Set the disconnect timer for other workers to see
disconnect_key = f"ts_proxy:channel:{channel_id}:last_client_disconnect_time"
self.redis_client.setex(disconnect_key, 60, str(time.time()))
# Get configured shutdown delay or default
shutdown_delay = getattr(Config, 'CHANNEL_SHUTDOWN_DELAY', 0)
if shutdown_delay > 0:
logging.info(f"Waiting {shutdown_delay}s before stopping channel...")
time.sleep(shutdown_delay)
# Re-check client count before stopping
total = self.redis_client.scard(client_set_key) or 0
if total > 0:
logging.info(f"New clients connected during shutdown delay - aborting shutdown")
self.redis_client.delete(disconnect_key)
                                                    continue  # keep listening; 'return' would kill the event listener thread
# Stop the channel directly
self.stop_channel(channel_id)
elif event_type == "stream_switch":
logging.info(f"Owner received stream switch request for channel {channel_id}")
# Handle stream switch request
new_url = data.get("url")
user_agent = data.get("user_agent")
if new_url and channel_id in self.stream_managers:
# Update metadata in Redis
if self.redis_client:
metadata_key = f"ts_proxy:channel:{channel_id}:metadata"
self.redis_client.hset(metadata_key, "url", new_url)
if user_agent:
self.redis_client.hset(metadata_key, "user_agent", user_agent)
# Set switch status
status_key = f"ts_proxy:channel:{channel_id}:switch_status"
self.redis_client.set(status_key, "switching")
# Perform the stream switch
stream_manager = self.stream_managers[channel_id]
success = stream_manager.update_url(new_url)
if success:
logging.info(f"Stream switch initiated for channel {channel_id}")
# Publish confirmation
switch_result = {
"event": "stream_switched",
"channel_id": channel_id,
"success": True,
"url": new_url,
"timestamp": time.time()
}
self.redis_client.publish(
f"ts_proxy:events:{channel_id}",
json.dumps(switch_result)
)
# Update status
if self.redis_client:
self.redis_client.set(status_key, "switched")
else:
logging.error(f"Failed to switch stream for channel {channel_id}")
# Publish failure
switch_result = {
"event": "stream_switched",
"channel_id": channel_id,
"success": False,
"url": new_url,
"timestamp": time.time()
}
self.redis_client.publish(
f"ts_proxy:events:{channel_id}",
json.dumps(switch_result)
)
except Exception as e:
logging.error(f"Error processing event message: {e}")
except Exception as e:
logging.error(f"Error in event listener: {e}")
time.sleep(5) # Wait before reconnecting
# Try to restart the listener
self._start_event_listener()
thread = threading.Thread(target=event_listener, daemon=True)
thread.name = "redis-event-listener"
thread.start()
def get_channel_owner(self, channel_id):
"""Get the worker ID that owns this channel with proper error handling"""
if not self.redis_client:
return None
try:
lock_key = f"ts_proxy:channel:{channel_id}:owner"
owner = self.redis_client.get(lock_key)
if owner:
return owner.decode('utf-8')
return None
except Exception as e:
logging.error(f"Error getting channel owner: {e}")
return None
def am_i_owner(self, channel_id):
"""Check if this worker is the owner of the channel"""
owner = self.get_channel_owner(channel_id)
return owner == self.worker_id
def try_acquire_ownership(self, channel_id, ttl=30):
"""Try to become the owner of this channel using proper locking"""
if not self.redis_client:
return True # If no Redis, always become owner
try:
# Create a lock key with proper namespace
lock_key = f"ts_proxy:channel:{channel_id}:owner"
# Use Redis SETNX for atomic locking - only succeeds if the key doesn't exist
acquired = self.redis_client.setnx(lock_key, self.worker_id)
# If acquired, set expiry to prevent orphaned locks
if acquired:
self.redis_client.expire(lock_key, ttl)
logging.info(f"Worker {self.worker_id} acquired ownership of channel {channel_id}")
return True
# If not acquired, check if we already own it (might be a retry)
current_owner = self.redis_client.get(lock_key)
if current_owner and current_owner.decode('utf-8') == self.worker_id:
# Refresh TTL
self.redis_client.expire(lock_key, ttl)
logging.info(f"Worker {self.worker_id} refreshed ownership of channel {channel_id}")
return True
# Someone else owns it
return False
except Exception as e:
logging.error(f"Error acquiring channel ownership: {e}")
return False
def release_ownership(self, channel_id):
"""Release ownership of this channel safely"""
if not self.redis_client:
return
try:
lock_key = f"ts_proxy:channel:{channel_id}:owner"
# Only delete if we're the current owner to prevent race conditions
current = self.redis_client.get(lock_key)
if current and current.decode('utf-8') == self.worker_id:
self.redis_client.delete(lock_key)
logging.info(f"Released ownership of channel {channel_id}")
except Exception as e:
logging.error(f"Error releasing channel ownership: {e}")
def extend_ownership(self, channel_id, ttl=30):
"""Extend ownership lease with grace period"""
if not self.redis_client:
return False
try:
lock_key = f"ts_proxy:channel:{channel_id}:owner"
current = self.redis_client.get(lock_key)
# Only extend if we're still the owner
if current and current.decode('utf-8') == self.worker_id:
self.redis_client.expire(lock_key, ttl)
return True
return False
except Exception as e:
logging.error(f"Error extending ownership: {e}")
return False
def initialize_channel(self, url, channel_id, user_agent=None):
"""Initialize a channel with standardized Redis keys"""
try:
# Get channel URL from Redis if available
channel_url = url
channel_user_agent = user_agent
if self.redis_client:
# Store stream metadata - can be done regardless of ownership
metadata_key = f"ts_proxy:channel:{channel_id}:metadata"
active_key = f"ts_proxy:channel:{channel_id}:active"
# Store metadata as hash
metadata = {
"url": url if url else "",
"init_time": str(time.time()),
"owner": self.worker_id,
"state": "initializing"
}
if user_agent:
metadata["user_agent"] = user_agent
# Set channel metadata
self.redis_client.hset(metadata_key, mapping=metadata)
self.redis_client.expire(metadata_key, 3600) # 1 hour TTL
# Set simple activity marker - used for quick existence checks
self.redis_client.setex(active_key, 300, "1") # 5 min TTL
# If no url was passed, try to get from Redis
if not url:
url_bytes = self.redis_client.hget(metadata_key, "url")
if url_bytes:
channel_url = url_bytes.decode('utf-8')
ua_bytes = self.redis_client.hget(metadata_key, "user_agent")
if ua_bytes:
channel_user_agent = ua_bytes.decode('utf-8')
# Check if channel is already owned
current_owner = self.get_channel_owner(channel_id)
# Exit early if another worker owns the channel
if current_owner and current_owner != self.worker_id:
logging.info(f"Channel {channel_id} already owned by worker {current_owner}")
logging.info(f"This worker ({self.worker_id}) will read from Redis buffer only")
# Create buffer but not stream manager
buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client)
self.stream_buffers[channel_id] = buffer
# Create client manager with channel_id and redis_client
client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id)
self.client_managers[channel_id] = client_manager
return True
# Only continue with full initialization if URL is provided
# or we can get it from Redis
if not channel_url:
logging.error(f"No URL available for channel {channel_id}")
return False
# Try to acquire ownership with Redis locking
if not self.try_acquire_ownership(channel_id):
# Another worker just acquired ownership
logging.info(f"Another worker just acquired ownership of channel {channel_id}")
# Create buffer but not stream manager
buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client)
self.stream_buffers[channel_id] = buffer
# Create client manager with channel_id and redis_client
client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id)
self.client_managers[channel_id] = client_manager
return True
# We now own the channel - create stream manager
logging.info(f"Worker {self.worker_id} is now the owner of channel {channel_id}")
# Create stream buffer
buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client)
logging.debug(f"Created StreamBuffer for channel {channel_id}")
self.stream_buffers[channel_id] = buffer
# Only the owner worker creates the actual stream manager
stream_manager = StreamManager(channel_url, buffer, user_agent=channel_user_agent)
logging.debug(f"Created StreamManager for channel {channel_id}")
self.stream_managers[channel_id] = stream_manager
# Create client manager with channel_id, redis_client AND worker_id
client_manager = ClientManager(
channel_id=channel_id,
redis_client=self.redis_client,
worker_id=self.worker_id
)
self.client_managers[channel_id] = client_manager
# Start stream manager thread only for the owner
thread = threading.Thread(target=stream_manager.run, daemon=True)
thread.name = f"stream-{channel_id}"
thread.start()
logging.info(f"Started stream manager thread for channel {channel_id}")
# If we're the owner, we need to set the channel state rather than starting a grace period immediately
if self.am_i_owner(channel_id):
# Set channel state to "connecting"
if self.redis_client:
state_key = f"ts_proxy:channel:{channel_id}:state"
self.redis_client.set(state_key, "connecting")
# Set connection attempt start time for monitoring
# RENAMED: connect_time → connection_attempt_time
attempt_key = f"ts_proxy:channel:{channel_id}:connection_attempt_time"
self.redis_client.setex(attempt_key, 60, str(time.time()))
logging.info(f"Channel {channel_id} in connecting state - will start grace period after connection")
return True
except Exception as e:
logging.error(f"Error initializing channel {channel_id}: {e}", exc_info=True)
# Release ownership on failure
self.release_ownership(channel_id)
return False
def check_if_channel_exists(self, channel_id):
"""Check if a channel exists using standardized key structure"""
# Check local memory first
if channel_id in self.stream_managers or channel_id in self.stream_buffers:
return True
# Check Redis using the standard key pattern
if self.redis_client:
# Primary check - look for channel metadata
metadata_key = f"ts_proxy:channel:{channel_id}:metadata"
# If metadata exists, return true
if self.redis_client.exists(metadata_key):
return True
# Additional checks if metadata doesn't exist
additional_keys = [
f"ts_proxy:channel:{channel_id}:active",
f"ts_proxy:channel:{channel_id}:clients",
f"ts_proxy:channel:{channel_id}:buffer:index",
f"ts_proxy:channel:{channel_id}:owner"
]
for key in additional_keys:
if self.redis_client.exists(key):
return True
return False
def stop_channel(self, channel_id):
"""Stop a channel with proper ownership handling"""
try:
logging.info(f"Stopping channel {channel_id}")
# Only stop the actual stream manager if we're the owner
if self.am_i_owner(channel_id):
logging.info(f"This worker ({self.worker_id}) is the owner - closing provider connection")
if channel_id in self.stream_managers:
stream_manager = self.stream_managers[channel_id]
# Signal thread to stop and close resources
if hasattr(stream_manager, 'stop'):
stream_manager.stop()
else:
stream_manager.running = False
if hasattr(stream_manager, '_close_socket'):
stream_manager._close_socket()
# Wait for stream thread to finish
stream_thread_name = f"stream-{channel_id}"
stream_thread = None
for thread in threading.enumerate():
if thread.name == stream_thread_name:
stream_thread = thread
break
if stream_thread and stream_thread.is_alive():
logging.info(f"Waiting for stream thread to terminate")
try:
# Very short timeout to prevent hanging the app
stream_thread.join(timeout=2.0)
if stream_thread.is_alive():
logging.warning(f"Stream thread did not terminate within timeout")
except RuntimeError:
logging.debug("Could not join stream thread (may be current thread)")
# Release ownership
self.release_ownership(channel_id)
logging.info(f"Released ownership of channel {channel_id}")
# Always clean up local resources
if channel_id in self.stream_managers:
del self.stream_managers[channel_id]
logging.info(f"Removed stream manager for channel {channel_id}")
if channel_id in self.stream_buffers:
del self.stream_buffers[channel_id]
logging.info(f"Removed stream buffer for channel {channel_id}")
if channel_id in self.client_managers:
del self.client_managers[channel_id]
logging.info(f"Removed client manager for channel {channel_id}")
# Clean up Redis keys
self._clean_redis_keys(channel_id)
return True
except Exception as e:
logging.error(f"Error stopping channel {channel_id}: {e}", exc_info=True)
return False
def check_inactive_channels(self):
"""Check for inactive channels (no clients) and stop them"""
channels_to_stop = []
for channel_id, client_manager in self.client_managers.items():
if client_manager.get_client_count() == 0:
channels_to_stop.append(channel_id)
for channel_id in channels_to_stop:
logging.info(f"Auto-stopping inactive channel {channel_id}")
self.stop_channel(channel_id)
def _cleanup_channel(self, channel_id: str) -> None:
"""Remove channel resources"""
        for collection in [self.stream_managers, self.stream_buffers,
                           self.client_managers]:
            collection.pop(channel_id, None)
def shutdown(self) -> None:
"""Stop all channels and cleanup"""
for channel_id in list(self.stream_managers.keys()):
self.stop_channel(channel_id)
def _start_cleanup_thread(self):
"""Start background thread to maintain ownership and clean up resources"""
def cleanup_task():
while True:
try:
# For channels we own, check total clients and cleanup as needed
for channel_id in list(self.stream_managers.keys()):
if self.am_i_owner(channel_id):
# Extend ownership lease
self.extend_ownership(channel_id)
# Get channel state
channel_state = "unknown"
if self.redis_client:
state_key = f"ts_proxy:channel:{channel_id}:state"
state_bytes = self.redis_client.get(state_key)
if state_bytes:
channel_state = state_bytes.decode('utf-8')
# Check if channel has any clients left
total_clients = 0
if channel_id in self.client_managers:
client_manager = self.client_managers[channel_id]
total_clients = client_manager.get_total_client_count()
# VERIFY REDIS CLIENT COUNT DIRECTLY - Double check client count
if self.redis_client:
client_set_key = f"ts_proxy:channel:{channel_id}:clients"
redis_client_count = self.redis_client.scard(client_set_key) or 0
if redis_client_count != total_clients:
logging.warning(f"Client count mismatch for channel {channel_id}: "
f"manager={total_clients}, redis={redis_client_count}")
# Trust Redis count as source of truth
total_clients = redis_client_count
# Log client count periodically
if time.time() % 30 < 1: # Every ~30 seconds
logging.info(f"Channel {channel_id} has {total_clients} clients, state: {channel_state}")
# If in waiting_for_clients state, check if grace period expired
if channel_state == "waiting_for_clients" and total_clients == 0:
# ... existing grace period logic ...
pass
# If active and no clients, start normal shutdown procedure
elif channel_state not in ["connecting", "waiting_for_clients"] and total_clients == 0:
# Check if there's a pending no-clients timeout
disconnect_key = f"ts_proxy:channel:{channel_id}:last_client_disconnect_time"
disconnect_time = None
if self.redis_client:
disconnect_value = self.redis_client.get(disconnect_key)
if disconnect_value:
try:
disconnect_time = float(disconnect_value.decode('utf-8'))
except (ValueError, TypeError) as e:
logging.error(f"Invalid disconnect time for channel {channel_id}: {e}")
current_time = time.time()
if not disconnect_time:
# First time seeing zero clients, set timestamp
if self.redis_client:
self.redis_client.setex(disconnect_key, 60, str(current_time))
logging.warning(f"No clients detected for channel {channel_id}, starting shutdown timer")
elif current_time - disconnect_time > getattr(Config, 'CHANNEL_SHUTDOWN_DELAY', 5):
# We've had no clients for the shutdown delay period - FORCE STOP
logging.warning(f"No clients for {current_time - disconnect_time:.1f}s, stopping channel {channel_id}")
self.stop_channel(channel_id)
else:
# Still in shutdown delay period
logging.debug(f"Channel {channel_id} shutdown timer: "
f"{current_time - disconnect_time:.1f}s of "
f"{getattr(Config, 'CHANNEL_SHUTDOWN_DELAY', 5)}s elapsed")
else:
# There are clients or we're still connecting - clear any disconnect timestamp
if self.redis_client:
self.redis_client.delete(f"ts_proxy:channel:{channel_id}:last_client_disconnect_time")
# ... rest of the cleanup thread ...
except Exception as e:
logging.error(f"Error in cleanup thread: {e}", exc_info=True)
time.sleep(getattr(Config, 'CLEANUP_CHECK_INTERVAL', 1))
thread = threading.Thread(target=cleanup_task, daemon=True)
thread.name = "ts-proxy-cleanup"
thread.start()
logging.info(f"Started TS proxy cleanup thread (interval: {getattr(Config, 'CLEANUP_CHECK_INTERVAL', 3)}s)")
def _check_orphaned_channels(self):
"""Check for orphaned channels in Redis (owner worker crashed)"""
if not self.redis_client:
return
try:
# Get all active channel keys
channel_pattern = "ts_proxy:channel:*:metadata"
channel_keys = self.redis_client.keys(channel_pattern)
for key in channel_keys:
try:
channel_id = key.decode('utf-8').split(':')[2]
# Skip channels we already have locally
if channel_id in self.stream_buffers:
continue
# Check if this channel has an owner
owner = self.get_channel_owner(channel_id)
if not owner:
# Check if there are any clients
client_set_key = f"ts_proxy:channel:{channel_id}:clients"
client_count = self.redis_client.scard(client_set_key) or 0
if client_count > 0:
# Orphaned channel with clients - we could take ownership
logging.info(f"Found orphaned channel {channel_id} with {client_count} clients")
else:
# Orphaned channel with no clients - clean it up
logging.info(f"Cleaning up orphaned channel {channel_id}")
self._clean_redis_keys(channel_id)
except Exception as e:
logging.error(f"Error processing channel key {key}: {e}")
except Exception as e:
logging.error(f"Error checking orphaned channels: {e}")
def _clean_redis_keys(self, channel_id):
"""Clean up all Redis keys for a channel"""
if not self.redis_client:
return
try:
# All keys are now under the channel namespace for easy pattern matching
channel_pattern = f"ts_proxy:channel:{channel_id}:*"
all_keys = self.redis_client.keys(channel_pattern)
if all_keys:
self.redis_client.delete(*all_keys)
logging.info(f"Cleaned up {len(all_keys)} Redis keys for channel {channel_id}")
except Exception as e:
logging.error(f"Error cleaning Redis keys for channel {channel_id}: {e}")
def refresh_channel_registry(self):
"""Refresh TTL for active channels using standard keys"""
if not self.redis_client:
return
# Refresh registry entries for channels we own
for channel_id in self.stream_managers.keys():
# Use standard key pattern
active_key = f"ts_proxy:channel:{channel_id}:active"
metadata_key = f"ts_proxy:channel:{channel_id}:metadata"
# Update activity timestamp
self.redis_client.setex(active_key, 300, "1") # 5 minute TTL
self.redis_client.hset(metadata_key, "last_active", str(time.time()))