diff --git a/apps/channels/models.py b/apps/channels/models.py index bd4045ba..5ba5974e 100644 --- a/apps/channels/models.py +++ b/apps/channels/models.py @@ -1,6 +1,12 @@ from django.db import models from django.core.exceptions import ValidationError from core.models import StreamProfile +from django.conf import settings +from core.models import StreamProfile, CoreSettings +from core.utils import redis_client +import logging + +logger = logging.getLogger(__name__) # If you have an M3UAccount model in apps.m3u, you can still import it: from apps.m3u.models import M3UAccount @@ -100,6 +106,89 @@ class Channel(models.Model): def __str__(self): return f"{self.channel_number} - {self.channel_name}" + def get_stream_profile(self): + stream_profile = self.stream_profile + if not stream_profile: + stream_profile = StreamProfile.objects.get(id=CoreSettings.objects.get(key="default-stream-profile").value) + + return stream_profile + + def get_stream(self): + """ + Finds an available stream for the requested channel and returns the selected stream and profile. + """ + + # 2. Check if a stream is already active for this channel + stream_id = redis_client.get(f"channel_stream:{self.id}") + if stream_id: + stream_id = int(stream_id) + profile_id = redis_client.get(f"stream_profile:{stream_id}") + if profile_id: + profile_id = int(profile_id) + return stream_id, profile_id + + # 3. Iterate through channel streams and their profiles + for stream in self.streams.all().order_by('channelstream__order'): + # Retrieve the M3U account associated with the stream. + m3u_account = stream.m3u_account + m3u_profiles = m3u_account.profiles.all() + default_profile = next((obj for obj in m3u_profiles if obj.is_default), None) + profiles = [default_profile] + [obj for obj in m3u_profiles if not obj.is_default] + + for profile in profiles: + # Skip inactive profiles + if profile.is_active == False: + continue + + profile_connections_key = f"profile_connections:{profile.id}" + current_connections = int(redis_client.get(profile_connections_key) or 0) + + # Check if profile has available slots (or unlimited connections) + if profile.max_streams == 0 or current_connections < profile.max_streams: + # Start a new stream + redis_client.set(f"channel_stream:{self.id}", stream.id) + redis_client.set(f"stream_profile:{stream.id}", profile.id) # Store only the matched profile + + # Increment connection count for profiles with limits + if profile.max_streams > 0: + redis_client.incr(profile_connections_key) + + return stream.id, profile.id # Return newly assigned stream and matched profile + + # 4. No available streams + return None, None + + def release_stream(self): + """ + Called when a stream is finished to release the lock. 
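+
+        Clears the Redis channel_stream:{channel_id} and stream_profile:{stream_id}
+        keys and, when the profile still has a positive connection count, decrements
+        profile_connections:{profile_id} so the slot becomes available again.
+
+        Intended to be paired with get_stream(); an illustrative (not prescriptive) caller:
+
+            stream_id, profile_id = channel.get_stream()
+            if stream_id is not None:
+                try:
+                    ...  # proxy the selected stream
+                finally:
+                    channel.release_stream()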
+ """ + stream_id = redis_client.get(f"channel_stream:{self.id}") + if not stream_id: + logger.debug("Invalid stream ID pulled from channel index") + return + + redis_client.delete(f"channel_stream:{self.id}") # Remove active stream + + stream_id = int(stream_id) + logger.debug(f"Found stream ID {stream_id} associated with channel stream {self.id}") + + # Get the matched profile for cleanup + profile_id = redis_client.get(f"stream_profile:{stream_id}") + if not profile_id: + logger.debug("Invalid profile ID pulled from stream index") + return + + redis_client.delete(f"stream_profile:{stream_id}") # Remove profile association + + profile_id = int(profile_id) + logger.debug(f"Found profile ID {profile_id} associated with stream {stream_id}") + + profile_connections_key = f"profile_connections:{profile_id}" + + # Only decrement if the profile had a max_connections limit + current_count = int(redis_client.get(profile_connections_key) or 0) + if current_count > 0: + redis_client.decr(profile_connections_key) class ChannelGroup(models.Model): name = models.CharField(max_length=100, unique=True) diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index 9129f4df..79985645 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -10,21 +10,23 @@ Handles live TS stream proxying with support for: import requests import threading import logging -import socket +import socket import random +import subprocess from collections import deque import time import sys from typing import Optional, Set, Deque, Dict import json from apps.proxy.config import TSConfig as Config +from apps.channels.models import Channel logger = logging.getLogger("ts_proxy") class StreamManager: """Manages a connection to a TS stream without using raw sockets""" - - def __init__(self, url, buffer, user_agent=None): + + def __init__(self, url, buffer, user_agent=None, transcode_cmd=[]): # Basic properties self.url = url self.buffer = buffer @@ -34,28 +36,33 @@ class StreamManager: self.max_retries = Config.MAX_RETRIES self.current_response = None self.current_session = None - + + # Sockets used for transcode jobs + self.socket = None + self.transcode_cmd = transcode_cmd + self.transcode_process = None + # User agent for connection self.user_agent = user_agent or Config.DEFAULT_USER_AGENT - + # Stream health monitoring self.last_data_time = time.time() self.healthy = True self.health_check_interval = Config.HEALTH_CHECK_INTERVAL self.chunk_size = getattr(Config, 'CHUNK_SIZE', 8192) - + logger.info(f"Initialized stream manager for channel {buffer.channel_id}") - + def _create_session(self): """Create and configure requests session with optimal settings""" session = requests.Session() - + # Configure session headers session.headers.update({ 'User-Agent': self.user_agent, 'Connection': 'keep-alive' }) - + # Set up connection pooling for better performance adapter = requests.adapters.HTTPAdapter( pool_connections=1, # Single connection for this stream @@ -63,146 +70,174 @@ class StreamManager: max_retries=3, # Auto-retry for failed requests pool_block=False # Don't block when pool is full ) - + # Apply adapter to both HTTP and HTTPS session.mount('http://', adapter) session.mount('https://', adapter) - + return session - + def run(self): """Main execution loop using HTTP streaming with improved connection handling""" try: # Start health monitor thread health_thread = threading.Thread(target=self._monitor_health, daemon=True) health_thread.start() - + logger.info(f"Starting stream for URL: 
{self.url}") - + while self.running: - try: - # Create new session for each connection attempt - session = self._create_session() - self.current_session = session - - # Stream the URL with proper timeout handling - response = session.get( - self.url, - stream=True, - timeout=(10, 60) # 10s connect timeout, 60s read timeout + if len(self.transcode_cmd) > 0: + self.transcode_process = subprocess.Popen( + self.transcode_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, # Suppress FFmpeg logs + bufsize=188 * 64 # Buffer optimized for TS packets ) - self.current_response = response - - if response.status_code == 200: - self.connected = True - self.healthy = True - logger.info("Successfully connected to stream source") - + self.socket = self.transcode_process.stdout # Read from FFmpeg output + self.connected = True + + if self.socket is not None: # Set channel state to waiting for clients self._set_waiting_for_clients() - - # Process the stream in chunks with improved error handling - try: - chunk_count = 0 - for chunk in response.iter_content(chunk_size=self.chunk_size): + + # Main fetch loop + while self.running and self.connected: + if self.fetch_chunk(): + self.last_data_time = time.time() + else: if not self.running: break - - if chunk: - # Add chunk to buffer with TS packet alignment - success = self.buffer.add_chunk(chunk) - - if success: - self.last_data_time = time.time() - chunk_count += 1 - - # Update last data timestamp in Redis - if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client: - last_data_key = f"ts_proxy:channel:{self.buffer.channel_id}:last_data" - self.buffer.redis_client.set(last_data_key, str(time.time()), ex=60) - except AttributeError as e: - # Handle the specific 'NoneType' object has no attribute 'read' error - if "'NoneType' object has no attribute 'read'" in str(e): - logger.warning(f"Connection closed by server (read {chunk_count} chunks before disconnect)") - else: - # Re-raise unexpected AttributeError - raise - else: - logger.error(f"Failed to connect to stream: HTTP {response.status_code}") - time.sleep(2) - - except requests.exceptions.ReadTimeout: - logger.warning("Read timeout - server stopped sending data") - self.connected = False - time.sleep(1) - - except requests.RequestException as e: - logger.error(f"HTTP request error: {e}") - self.connected = False - time.sleep(5) - - finally: - # Clean up response and session - if self.current_response: - try: - self.current_response.close() - except Exception as e: - logger.debug(f"Error closing response: {e}") - self.current_response = None - - if self.current_session: - try: - self.current_session.close() - except Exception as e: - logger.debug(f"Error closing session: {e}") - self.current_session = None - + time.sleep(0.1) + else: + try: + # Create new session for each connection attempt + session = self._create_session() + self.current_session = session + + # Stream the URL with proper timeout handling + response = session.get( + self.url, + stream=True, + timeout=(10, 60) # 10s connect timeout, 60s read timeout + ) + self.current_response = response + + if response.status_code == 200: + self.connected = True + self.healthy = True + logger.info("Successfully connected to stream source") + + # Set channel state to waiting for clients + self._set_waiting_for_clients() + + # Process the stream in chunks with improved error handling + try: + chunk_count = 0 + for chunk in response.iter_content(chunk_size=self.chunk_size): + if not self.running: + break + + if chunk: + # Add chunk to 
buffer with TS packet alignment + success = self.buffer.add_chunk(chunk) + + if success: + self.last_data_time = time.time() + chunk_count += 1 + + # Update last data timestamp in Redis + if hasattr(self.buffer, 'redis_client') and self.buffer.redis_client: + last_data_key = f"ts_proxy:channel:{self.buffer.channel_id}:last_data" + self.buffer.redis_client.set(last_data_key, str(time.time()), ex=60) + except AttributeError as e: + # Handle the specific 'NoneType' object has no attribute 'read' error + if "'NoneType' object has no attribute 'read'" in str(e): + logger.warning(f"Connection closed by server (read {chunk_count} chunks before disconnect)") + else: + # Re-raise unexpected AttributeError + raise + else: + logger.error(f"Failed to connect to stream: HTTP {response.status_code}") + time.sleep(2) + + except requests.exceptions.ReadTimeout: + logger.warning("Read timeout - server stopped sending data") + self.connected = False + time.sleep(1) + + except requests.RequestException as e: + logger.error(f"HTTP request error: {e}") + self.connected = False + time.sleep(5) + + finally: + # Clean up response and session + if self.current_response: + try: + self.current_response.close() + except Exception as e: + logger.debug(f"Error closing response: {e}") + self.current_response = None + + if self.current_session: + try: + self.current_session.close() + except Exception as e: + logger.debug(f"Error closing session: {e}") + self.current_session = None + # Connection retry logic if self.running and not self.connected: self.retry_count += 1 if self.retry_count > self.max_retries: logger.error(f"Maximum retry attempts ({self.max_retries}) exceeded") break - + timeout = min(2 ** self.retry_count, 30) logger.info(f"Reconnecting in {timeout} seconds... (attempt {self.retry_count})") time.sleep(timeout) - + except Exception as e: logger.error(f"Stream error: {e}", exc_info=True) + if self.socket is not None: + self._close_socket() finally: self.connected = False - + if self.current_response: try: self.current_response.close() except: pass - + if self.current_session: try: - self.current_session.close() + self.current_session.close() except: pass - + + if self.socket: + self._close_socket() + logger.info("Stream manager stopped") - + def stop(self): """Stop the stream manager and clean up resources""" self.running = False - + if self.current_response: try: self.current_response.close() except: pass - + if self.current_session: try: self.current_session.close() except: pass - + logger.info("Stream manager resources released") def update_url(self, new_url): @@ -210,20 +245,20 @@ class StreamManager: if new_url == self.url: logger.info(f"URL unchanged: {new_url}") return False - + logger.info(f"Switching stream URL from {self.url} to {new_url}") - + # Close existing HTTP connection resources instead of socket self._close_connection() # Use our new method instead of _close_socket - + # Update URL and reset connection state old_url = self.url self.url = new_url self.connected = False - + # Reset retry counter to allow immediate reconnect self.retry_count = 0 - + return True def should_retry(self) -> bool: @@ -240,7 +275,7 @@ class StreamManager: if self.healthy: logger.warning("Stream health check: No data received for 10+ seconds") self.healthy = False - + # After 30 seconds with no data, force reconnection if now - self.last_data_time > 30: logger.warning("Stream appears dead, forcing reconnection") @@ -251,12 +286,12 @@ class StreamManager: # Stream is receiving data again after being unhealthy 
logger.info("Stream health restored, receiving data again") self.healthy = True - + except Exception as e: logger.error(f"Error in health monitor: {e}") - + time.sleep(self.health_check_interval) - + def _close_connection(self): """Close HTTP connection resources""" # Close response if it exists @@ -266,7 +301,7 @@ class StreamManager: except Exception as e: logger.debug(f"Error closing response: {e}") self.current_response = None - + # Close session if it exists if hasattr(self, 'current_session') and self.current_session: try: @@ -278,13 +313,28 @@ class StreamManager: # Keep backward compatibility - let's create an alias to the new method def _close_socket(self): """Backward compatibility wrapper for _close_connection""" - return self._close_connection() + if self.current_response: + return self._close_connection() + if self.socket: + try: + self.socket.close() + except Exception as e: + logging.debug(f"Error closing socket: {e}") + pass + + self.socket = None + self.connected = False + + if self.transcode_process: + self.transcode_process.terminate() + self.transcode_process.wait() + self.transcode_process = None def fetch_chunk(self): """Fetch data from socket with direct pass-through to buffer""" if not self.connected or not self.socket: return False - + try: # Read data chunk - no need to align with TS packet size anymore try: @@ -293,35 +343,35 @@ class StreamManager: chunk = self.socket.recv(Config.CHUNK_SIZE) # Standard socket else: chunk = self.socket.read(Config.CHUNK_SIZE) # SocketIO object - + except AttributeError: # Fall back to read() if recv() isn't available chunk = self.socket.read(Config.CHUNK_SIZE) - + if not chunk: # Connection closed by server logger.warning("Server closed connection") self._close_socket() self.connected = False return False - + # Add directly to buffer without TS-specific processing success = self.buffer.add_chunk(chunk) - + # Update last data timestamp in Redis if successful if success and hasattr(self.buffer, 'redis_client') and self.buffer.redis_client: last_data_key = f"ts_proxy:channel:{self.buffer.channel_id}:last_data" self.buffer.redis_client.set(last_data_key, str(time.time()), ex=60) - + return True - + except (socket.timeout, socket.error) as e: # Socket error logger.error(f"Socket error: {e}") self._close_socket() self.connected = False return False - + except Exception as e: logger.error(f"Error in fetch_chunk: {e}") return False @@ -332,13 +382,13 @@ class StreamManager: if hasattr(self.buffer, 'channel_id') and hasattr(self.buffer, 'redis_client'): channel_id = self.buffer.channel_id redis_client = self.buffer.redis_client - + if channel_id and redis_client: current_time = str(time.time()) - + # SIMPLIFIED: Always use direct Redis update for reliability metadata_key = f"ts_proxy:channel:{channel_id}:metadata" - + # Check current state first current_state = None try: @@ -347,7 +397,7 @@ class StreamManager: current_state = metadata[b'state'].decode('utf-8') except Exception as e: logger.error(f"Error checking current state: {e}") - + # Only update if not already past connecting if not current_state or current_state in ["initializing", "connecting"]: # Update directly - don't rely on proxy_server reference @@ -357,7 +407,7 @@ class StreamManager: "state_changed_at": current_time } redis_client.hset(metadata_key, mapping=update_data) - + # Get configured grace period or default grace_period = getattr(Config, 'CHANNEL_INIT_GRACE_PERIOD', 20) logger.info(f"STREAM MANAGER: Updated channel {channel_id} state: {current_state or 'None'} → 
waiting_for_clients") @@ -372,24 +422,24 @@ class StreamManager: try: # Read up to CHUNK_SIZE bytes chunk = self.sock.recv(self.CHUNK_SIZE) - + if not chunk: # Connection closed logger.debug("Connection closed by remote host") return False - - # If we got data, just add it directly to the buffer + + # If we got data, just add it directly to the buffer if chunk: success = self.buffer.add_chunk(chunk) - + # Update last data timestamp in Redis if successful if success and hasattr(self.buffer, 'redis_client') and self.buffer.redis_client: last_data_key = f"ts_proxy:channel:{self.buffer.channel_id}:last_data" self.buffer.redis_client.set(last_data_key, str(time.time()), ex=60) # 1 minute expiry - + return success return True - + except socket.timeout: # Expected timeout - no data available return True @@ -400,20 +450,20 @@ class StreamManager: class StreamBuffer: """Manages stream data buffering with optimized chunk storage""" - + def __init__(self, channel_id=None, redis_client=None): self.channel_id = channel_id self.redis_client = redis_client self.lock = threading.Lock() self.index = 0 self.TS_PACKET_SIZE = 188 - + # STANDARDIZED KEYS: Move buffer keys under channel namespace self.buffer_index_key = f"ts_proxy:channel:{channel_id}:buffer:index" self.buffer_prefix = f"ts_proxy:channel:{channel_id}:buffer:chunk:" - + self.chunk_ttl = getattr(Config, 'REDIS_CHUNK_TTL', 60) - + # Initialize from Redis if available if self.redis_client and channel_id: try: @@ -423,38 +473,38 @@ class StreamBuffer: logger.info(f"Initialized buffer from Redis with index {self.index}") except Exception as e: logger.error(f"Error initializing buffer from Redis: {e}") - + self._write_buffer = bytearray() self.target_chunk_size = getattr(Config, 'BUFFER_CHUNK_SIZE', 188 * 5644) # ~1MB default - + def add_chunk(self, chunk): """Add data with optimized Redis storage and TS packet alignment""" if not chunk: return False - + try: # Accumulate partial packets between chunks if not hasattr(self, '_partial_packet'): self._partial_packet = bytearray() - + # Combine with any previous partial packet combined_data = bytearray(self._partial_packet) + bytearray(chunk) - + # Calculate complete packets complete_packets_size = (len(combined_data) // 188) * 188 - + if complete_packets_size == 0: # Not enough data for a complete packet self._partial_packet = combined_data return True - + # Split into complete packets and remainder complete_packets = combined_data[:complete_packets_size] self._partial_packet = combined_data[complete_packets_size:] - + # Add completed packets to write buffer self._write_buffer.extend(complete_packets) - + # Only write to Redis when we have enough data for an optimized chunk writes_done = 0 with self.lock: @@ -462,54 +512,54 @@ class StreamBuffer: # Extract a full chunk chunk_data = self._write_buffer[:self.target_chunk_size] self._write_buffer = self._write_buffer[self.target_chunk_size:] - + # Write optimized chunk to Redis if self.redis_client: chunk_index = self.redis_client.incr(self.buffer_index_key) chunk_key = f"{self.buffer_prefix}{chunk_index}" self.redis_client.setex(chunk_key, self.chunk_ttl, bytes(chunk_data)) - + # Update local tracking self.index = chunk_index writes_done += 1 - + if writes_done > 0: logger.debug(f"Added {writes_done} optimized chunks ({self.target_chunk_size} bytes each) to Redis") - + return True - + except Exception as e: logger.error(f"Error adding chunk to buffer: {e}") return False - + def get_chunks(self, start_index=None): """Get chunks from the buffer with detailed 
logging""" try: request_id = f"req_{random.randint(1000, 9999)}" logger.debug(f"[{request_id}] get_chunks called with start_index={start_index}") - + if not self.redis_client: logger.error("Redis not available, cannot retrieve chunks") return [] - + # If no start_index provided, use most recent chunks if start_index is None: start_index = max(0, self.index - 10) # Start closer to current position logger.debug(f"[{request_id}] No start_index provided, using {start_index}") - + # Get current index from Redis current_index = int(self.redis_client.get(self.buffer_index_key) or 0) - + # Calculate range of chunks to retrieve start_id = start_index + 1 chunks_behind = current_index - start_id - + # Adaptive chunk retrieval based on how far behind if chunks_behind > 100: fetch_count = 15 logger.debug(f"[{request_id}] Client very behind ({chunks_behind} chunks), fetching {fetch_count}") elif chunks_behind > 50: - fetch_count = 10 + fetch_count = 10 logger.debug(f"[{request_id}] Client moderately behind ({chunks_behind} chunks), fetching {fetch_count}") elif chunks_behind > 20: fetch_count = 5 @@ -517,87 +567,87 @@ class StreamBuffer: else: fetch_count = 3 logger.debug(f"[{request_id}] Client up-to-date (only {chunks_behind} chunks behind), fetching {fetch_count}") - + end_id = min(current_index + 1, start_id + fetch_count) - + if start_id >= end_id: logger.debug(f"[{request_id}] No new chunks to fetch (start_id={start_id}, end_id={end_id})") return [] - + # Log the range we're retrieving logger.debug(f"[{request_id}] Retrieving chunks {start_id} to {end_id-1} (total: {end_id-start_id})") - + # Directly fetch from Redis using pipeline for efficiency pipe = self.redis_client.pipeline() for idx in range(start_id, end_id): chunk_key = f"{self.buffer_prefix}{idx}" pipe.get(chunk_key) - + results = pipe.execute() - + # Process results chunks = [result for result in results if result is not None] - + # Count non-None results found_chunks = len(chunks) missing_chunks = len(results) - found_chunks - + if missing_chunks > 0: logger.debug(f"[{request_id}] Missing {missing_chunks}/{len(results)} chunks in Redis") - + # Update local tracking if chunks: self.index = end_id - 1 - + # Final log message chunk_sizes = [len(c) for c in chunks] total_bytes = sum(chunk_sizes) if chunks else 0 logger.debug(f"[{request_id}] Returning {len(chunks)} chunks ({total_bytes} bytes)") - + return chunks - + except Exception as e: logger.error(f"Error getting chunks from buffer: {e}", exc_info=True) return [] - + def get_chunks_exact(self, start_index, count): """Get exactly the requested number of chunks from given index""" try: if not self.redis_client: logger.error("Redis not available, cannot retrieve chunks") return [] - + # Calculate range to retrieve start_id = start_index + 1 end_id = start_id + count - + # Get current buffer position current_index = int(self.redis_client.get(self.buffer_index_key) or 0) - + # If requesting beyond current buffer, return what we have if start_id > current_index: return [] - + # Cap end at current buffer position end_id = min(end_id, current_index + 1) - + # Directly fetch from Redis using pipeline pipe = self.redis_client.pipeline() for idx in range(start_id, end_id): chunk_key = f"{self.buffer_prefix}{idx}" pipe.get(chunk_key) - + results = pipe.execute() - + # Filter out None results chunks = [result for result in results if result is not None] - + # Update local index if needed if chunks and start_id + len(chunks) - 1 > self.index: self.index = start_id + len(chunks) - 1 - + return 
chunks - + except Exception as e: logger.error(f"Error getting exact chunks: {e}", exc_info=True) return [] @@ -609,10 +659,10 @@ class StreamBuffer: if hasattr(self, '_write_buffer') and len(self._write_buffer) > 0: # Ensure remaining data is aligned to TS packets complete_size = (len(self._write_buffer) // 188) * 188 - + if complete_size > 0: final_chunk = self._write_buffer[:complete_size] - + # Write final chunk to Redis with self.lock: if self.redis_client: @@ -624,18 +674,18 @@ class StreamBuffer: logger.info(f"Flushed final chunk of {len(final_chunk)} bytes to Redis") except Exception as e: logger.error(f"Error flushing final chunk: {e}") - + # Clear buffers self._write_buffer = bytearray() if hasattr(self, '_partial_packet'): self._partial_packet = bytearray() - + except Exception as e: logger.error(f"Error during buffer stop: {e}") class ClientManager: """Manages client connections with no duplicates""" - + def __init__(self, channel_id=None, redis_client=None, heartbeat_interval=1, worker_id=None): self.channel_id = channel_id self.redis_client = redis_client @@ -643,17 +693,17 @@ class ClientManager: self.lock = threading.Lock() self.last_active_time = time.time() self.worker_id = worker_id # Store worker ID as instance variable - + # STANDARDIZED KEYS: Move client set under channel namespace self.client_set_key = f"ts_proxy:channel:{channel_id}:clients" self.client_ttl = getattr(Config, 'CLIENT_RECORD_TTL', 60) self.heartbeat_interval = getattr(Config, 'CLIENT_HEARTBEAT_INTERVAL', 10) self.last_heartbeat_time = {} - + # Start heartbeat thread for local clients self._start_heartbeat_thread() self._registered_clients = set() # Track already registered client IDs - + def _start_heartbeat_thread(self): """Start thread to regularly refresh client presence in Redis""" def heartbeat_task(): @@ -661,20 +711,20 @@ class ClientManager: try: # Wait for the interval time.sleep(self.heartbeat_interval) - + # Send heartbeat for all local clients with self.lock: if not self.clients or not self.redis_client: continue - + # IMPROVED GHOST DETECTION: Check for stale clients before sending heartbeats current_time = time.time() clients_to_remove = set() - + # First identify clients that should be removed for client_id in self.clients: client_key = f"ts_proxy:channel:{self.channel_id}:clients:{client_id}" - + # Check if client exists in Redis at all exists = self.redis_client.exists(client_key) if not exists: @@ -682,98 +732,98 @@ class ClientManager: logger.warning(f"Found ghost client {client_id} - expired in Redis but still in local set") clients_to_remove.add(client_id) continue - + # Check for stale activity using last_active field last_active = self.redis_client.hget(client_key, "last_active") if last_active: last_active_time = float(last_active.decode('utf-8')) time_since_activity = current_time - last_active_time - + # If client hasn't been active for too long, mark for removal # Use configurable threshold for detection ghost_threshold = getattr(Config, 'GHOST_CLIENT_MULTIPLIER', 5.0) if time_since_activity > self.heartbeat_interval * ghost_threshold: logger.warning(f"Detected ghost client {client_id} - last active {time_since_activity:.1f}s ago") clients_to_remove.add(client_id) - + # Remove ghost clients in a separate step for client_id in clients_to_remove: self.remove_client(client_id) - + if clients_to_remove: logger.info(f"Removed {len(clients_to_remove)} ghost clients from channel {self.channel_id}") - + # Now send heartbeats only for remaining clients pipe = 
self.redis_client.pipeline() current_time = time.time() - + for client_id in self.clients: # Skip clients we just marked for removal if client_id in clients_to_remove: continue - + # Skip if we just sent a heartbeat recently if client_id in self.last_heartbeat_time: time_since_last = current_time - self.last_heartbeat_time[client_id] if time_since_last < self.heartbeat_interval * 0.8: continue - + # Only update clients that remain client_key = f"ts_proxy:channel:{self.channel_id}:clients:{client_id}" pipe.hset(client_key, "last_active", str(current_time)) pipe.expire(client_key, self.client_ttl) - + # Keep client in the set with TTL pipe.sadd(self.client_set_key, client_id) pipe.expire(self.client_set_key, self.client_ttl) - + # Track last heartbeat locally self.last_heartbeat_time[client_id] = current_time - + # Execute all commands atomically pipe.execute() - + # Only notify if we have real clients if self.clients and not all(c in clients_to_remove for c in self.clients): self._notify_owner_of_activity() - + except Exception as e: logger.error(f"Error in client heartbeat thread: {e}") - + thread = threading.Thread(target=heartbeat_task, daemon=True) thread.name = f"client-heartbeat-{self.channel_id}" thread.start() logger.debug(f"Started client heartbeat thread for channel {self.channel_id} (interval: {self.heartbeat_interval}s)") - + def _notify_owner_of_activity(self): """Notify channel owner that clients are active on this worker""" if not self.redis_client or not self.clients: return - + try: worker_id = self.worker_id or "unknown" - + # STANDARDIZED KEY: Worker info under channel namespace worker_key = f"ts_proxy:channel:{self.channel_id}:worker:{worker_id}" self.redis_client.setex(worker_key, self.client_ttl, str(len(self.clients))) - + # STANDARDIZED KEY: Activity timestamp under channel namespace activity_key = f"ts_proxy:channel:{self.channel_id}:activity" self.redis_client.setex(activity_key, self.client_ttl, str(time.time())) except Exception as e: logger.error(f"Error notifying owner of client activity: {e}") - + def add_client(self, client_id, user_agent=None): """Add a client with duplicate prevention""" if client_id in self._registered_clients: logger.debug(f"Client {client_id} already registered, skipping") return False - + self._registered_clients.add(client_id) - + # FIX: Consistent key naming - note the 's' in 'clients' client_key = f"ts_proxy:channel:{self.channel_id}:clients:{client_id}" - + # Prepare client data current_time = str(time.time()) client_data = { @@ -782,27 +832,27 @@ class ClientManager: "last_active": current_time, "worker_id": self.worker_id or "unknown" } - + try: with self.lock: # Store client in local set self.clients.add(client_id) - + # Store in Redis if self.redis_client: # FIXED: Store client data just once with proper key self.redis_client.hset(client_key, mapping=client_data) self.redis_client.expire(client_key, self.client_ttl) - + # Add to the client set self.redis_client.sadd(self.client_set_key, client_id) self.redis_client.expire(self.client_set_key, self.client_ttl) - + # Clear any initialization timer self.redis_client.delete(f"ts_proxy:channel:{self.channel_id}:init_time") - + self._notify_owner_of_activity() - + # Publish client connected event with user agent event_data = { "event": "client_connected", @@ -811,60 +861,60 @@ class ClientManager: "worker_id": self.worker_id or "unknown", "timestamp": time.time() } - + if user_agent: event_data["user_agent"] = user_agent logger.debug(f"Storing user agent '{user_agent}' for client 
{client_id}") else: logger.debug(f"No user agent provided for client {client_id}") - + self.redis_client.publish( - f"ts_proxy:events:{self.channel_id}", + f"ts_proxy:events:{self.channel_id}", json.dumps(event_data) ) - + # Get total clients across all workers total_clients = self.get_total_client_count() logger.info(f"New client connected: {client_id} (local: {len(self.clients)}, total: {total_clients})") - + self.last_heartbeat_time[client_id] = time.time() - + return len(self.clients) - + except Exception as e: logger.error(f"Error adding client {client_id}: {e}") return False - + def remove_client(self, client_id): """Remove a client from this channel and Redis""" with self.lock: if client_id in self.clients: self.clients.remove(client_id) - + if client_id in self.last_heartbeat_time: del self.last_heartbeat_time[client_id] - + self.last_active_time = time.time() - + if self.redis_client: # Remove from channel's client set self.redis_client.srem(self.client_set_key, client_id) - + # STANDARDIZED KEY: Delete individual client keys client_key = f"ts_proxy:channel:{self.channel_id}:clients:{client_id}" self.redis_client.delete(client_key) - + # Check if this was the last client remaining = self.redis_client.scard(self.client_set_key) or 0 if remaining == 0: logger.warning(f"Last client removed: {client_id} - channel may shut down soon") - + # Trigger disconnect time tracking even if we're not the owner disconnect_key = f"ts_proxy:channel:{self.channel_id}:last_client_disconnect_time" self.redis_client.setex(disconnect_key, 60, str(time.time())) - + self._notify_owner_of_activity() - + # Publish client disconnected event event_data = json.dumps({ "event": "client_disconnected", @@ -875,41 +925,41 @@ class ClientManager: "remaining_clients": remaining }) self.redis_client.publish(f"ts_proxy:events:{self.channel_id}", event_data) - + total_clients = self.get_total_client_count() logger.info(f"Client disconnected: {client_id} (local: {len(self.clients)}, total: {total_clients})") - + return len(self.clients) - + def get_client_count(self): """Get local client count""" with self.lock: return len(self.clients) - + def get_total_client_count(self): """Get total client count across all workers""" if not self.redis_client: return len(self.clients) - + try: # Count members in the client set return self.redis_client.scard(self.client_set_key) or 0 except Exception as e: logger.error(f"Error getting total client count: {e}") return len(self.clients) # Fall back to local count - + def refresh_client_ttl(self): """Refresh TTL for active clients to prevent expiration""" if not self.redis_client: return - + try: # Refresh TTL for all clients belonging to this worker for client_id in self.clients: # STANDARDIZED: Use channel namespace for client keys client_key = f"ts_proxy:channel:{self.channel_id}:clients:{client_id}" self.redis_client.expire(client_key, self.client_ttl) - + # Refresh TTL on the set itself self.redis_client.expire(self.client_set_key, self.client_ttl) except Exception as e: @@ -917,7 +967,7 @@ class ClientManager: class StreamFetcher: """Handles stream data fetching""" - + def __init__(self, manager: StreamManager, buffer: StreamBuffer): self.manager = manager self.buffer = buffer @@ -943,10 +993,10 @@ class StreamFetcher: if not self.manager.should_retry(): logger.error(f"Failed to connect after {self.manager.max_retries} attempts") return False - + if not self.manager.running: return False - + self.manager.retry_count += 1 logger.info(f"Connecting to stream: {self.manager.url} " 
f"(attempt {self.manager.retry_count}/{self.manager.max_retries})") @@ -965,13 +1015,13 @@ class StreamFetcher: if not self.manager.running: logger.info("Stream fetch stopped - shutting down") return - + if chunk: if self.manager.ready_event.is_set(): logger.info("Stream switch in progress, closing connection") self.manager.ready_event.clear() break - + with self.buffer.lock: self.buffer.buffer.append(chunk) self.buffer.index += 1 @@ -980,10 +1030,10 @@ class StreamFetcher: """Handle stream connection errors""" logger.error(f"Stream connection error: {error}") self.manager.connected = False - + if not self.manager.running: return - + logger.info(f"Attempting to reconnect in {Config.RECONNECT_DELAY} seconds...") if not wait_for_running(self.manager, Config.RECONNECT_DELAY): return @@ -999,26 +1049,26 @@ def wait_for_running(manager: StreamManager, delay: float) -> bool: class ProxyServer: """Manages TS proxy server instance with worker coordination""" - + def __init__(self): """Initialize proxy server with worker identification""" self.stream_managers = {} self.stream_buffers = {} self.client_managers = {} - + # Generate a unique worker ID import socket import os pid = os.getpid() hostname = socket.gethostname() self.worker_id = f"{hostname}:{pid}" - + # Connect to Redis self.redis_client = None try: import redis from django.conf import settings - + redis_url = getattr(settings, 'REDIS_URL', 'redis://localhost:6379/0') self.redis_client = redis.from_url(redis_url) logger.info(f"Connected to Redis at {redis_url}") @@ -1026,37 +1076,37 @@ class ProxyServer: except Exception as e: self.redis_client = None logger.error(f"Failed to connect to Redis: {e}") - + # Start cleanup thread self.cleanup_interval = getattr(Config, 'CLEANUP_INTERVAL', 60) self._start_cleanup_thread() - + # Start event listener for Redis pubsub messages self._start_event_listener() - + def _start_event_listener(self): """Listen for events from other workers""" if not self.redis_client: return - + def event_listener(): try: pubsub = self.redis_client.pubsub() pubsub.psubscribe("ts_proxy:events:*") - + logger.info("Started Redis event listener for client activity") - + for message in pubsub.listen(): if message["type"] != "pmessage": continue - + try: channel = message["channel"].decode("utf-8") data = json.loads(message["data"].decode("utf-8")) - + event_type = data.get("event") channel_id = data.get("channel_id") - + if channel_id and event_type: # For owner, update client status immediately if self.am_i_owner(channel_id): @@ -1066,7 +1116,7 @@ class ProxyServer: # RENAMED: no_clients_since → last_client_disconnect_time disconnect_key = f"ts_proxy:channel:{channel_id}:last_client_disconnect_time" self.redis_client.delete(disconnect_key) - + elif event_type == "client_disconnected": logger.debug(f"Owner received client_disconnected event for channel {channel_id}") # Check if any clients remain @@ -1074,27 +1124,27 @@ class ProxyServer: # VERIFY REDIS CLIENT COUNT DIRECTLY client_set_key = f"ts_proxy:channel:{channel_id}:clients" total = self.redis_client.scard(client_set_key) or 0 - + if total == 0: logger.debug(f"No clients left after disconnect event - stopping channel {channel_id}") # Set the disconnect timer for other workers to see disconnect_key = f"ts_proxy:channel:{channel_id}:last_client_disconnect_time" self.redis_client.setex(disconnect_key, 60, str(time.time())) - + # Get configured shutdown delay or default shutdown_delay = getattr(Config, 'CHANNEL_SHUTDOWN_DELAY', 0) - + if shutdown_delay > 0: 
logger.info(f"Waiting {shutdown_delay}s before stopping channel...") time.sleep(shutdown_delay) - + # Re-check client count before stopping total = self.redis_client.scard(client_set_key) or 0 if total > 0: logger.info(f"New clients connected during shutdown delay - aborting shutdown") self.redis_client.delete(disconnect_key) return - + # Stop the channel directly self.stop_channel(channel_id) @@ -1104,7 +1154,7 @@ class ProxyServer: # Handle stream switch request new_url = data.get("url") user_agent = data.get("user_agent") - + if new_url and channel_id in self.stream_managers: # Update metadata in Redis if self.redis_client: @@ -1112,18 +1162,18 @@ class ProxyServer: self.redis_client.hset(metadata_key, "url", new_url) if user_agent: self.redis_client.hset(metadata_key, "user_agent", user_agent) - + # Set switch status status_key = f"ts_proxy:channel:{channel_id}:switch_status" self.redis_client.set(status_key, "switching") - + # Perform the stream switch stream_manager = self.stream_managers[channel_id] success = stream_manager.update_url(new_url) - + if success: logger.info(f"Stream switch initiated for channel {channel_id}") - + # Publish confirmation switch_result = { "event": "stream_switched", @@ -1133,16 +1183,16 @@ class ProxyServer: "timestamp": time.time() } self.redis_client.publish( - f"ts_proxy:events:{channel_id}", + f"ts_proxy:events:{channel_id}", json.dumps(switch_result) ) - + # Update status if self.redis_client: self.redis_client.set(status_key, "switched") else: logger.error(f"Failed to switch stream for channel {channel_id}") - + # Publish failure switch_result = { "event": "stream_switched", @@ -1152,7 +1202,7 @@ class ProxyServer: "timestamp": time.time() } self.redis_client.publish( - f"ts_proxy:events:{channel_id}", + f"ts_proxy:events:{channel_id}", json.dumps(switch_result) ) except Exception as e: @@ -1162,7 +1212,7 @@ class ProxyServer: time.sleep(5) # Wait before reconnecting # Try to restart the listener self._start_event_listener() - + thread = threading.Thread(target=event_listener, daemon=True) thread.name = "redis-event-listener" thread.start() @@ -1171,7 +1221,7 @@ class ProxyServer: """Get the worker ID that owns this channel with proper error handling""" if not self.redis_client: return None - + try: lock_key = f"ts_proxy:channel:{channel_id}:owner" owner = self.redis_client.get(lock_key) @@ -1181,30 +1231,30 @@ class ProxyServer: except Exception as e: logger.error(f"Error getting channel owner: {e}") return None - + def am_i_owner(self, channel_id): """Check if this worker is the owner of the channel""" owner = self.get_channel_owner(channel_id) return owner == self.worker_id - + def try_acquire_ownership(self, channel_id, ttl=30): """Try to become the owner of this channel using proper locking""" if not self.redis_client: return True # If no Redis, always become owner - + try: # Create a lock key with proper namespace lock_key = f"ts_proxy:channel:{channel_id}:owner" - + # Use Redis SETNX for atomic locking - only succeeds if the key doesn't exist acquired = self.redis_client.setnx(lock_key, self.worker_id) - + # If acquired, set expiry to prevent orphaned locks if acquired: self.redis_client.expire(lock_key, ttl) logger.info(f"Worker {self.worker_id} acquired ownership of channel {channel_id}") return True - + # If not acquired, check if we already own it (might be a retry) current_owner = self.redis_client.get(lock_key) if current_owner and current_owner.decode('utf-8') == self.worker_id: @@ -1212,22 +1262,22 @@ class ProxyServer: 
self.redis_client.expire(lock_key, ttl) logger.info(f"Worker {self.worker_id} refreshed ownership of channel {channel_id}") return True - + # Someone else owns it return False - + except Exception as e: logger.error(f"Error acquiring channel ownership: {e}") return False - + def release_ownership(self, channel_id): """Release ownership of this channel safely""" if not self.redis_client: return - + try: lock_key = f"ts_proxy:channel:{channel_id}:owner" - + # Only delete if we're the current owner to prevent race conditions current = self.redis_client.get(lock_key) if current and current.decode('utf-8') == self.worker_id: @@ -1235,16 +1285,16 @@ class ProxyServer: logger.info(f"Released ownership of channel {channel_id}") except Exception as e: logger.error(f"Error releasing channel ownership: {e}") - + def extend_ownership(self, channel_id, ttl=30): """Extend ownership lease with grace period""" if not self.redis_client: return False - + try: - lock_key = f"ts_proxy:channel:{channel_id}:owner" + lock_key = f"ts_proxy:channel:{channel_id}:owner" current = self.redis_client.get(lock_key) - + # Only extend if we're still the owner if current and current.decode('utf-8') == self.worker_id: self.redis_client.expire(lock_key, ttl) @@ -1253,73 +1303,73 @@ class ProxyServer: except Exception as e: logger.error(f"Error extending ownership: {e}") return False - - def initialize_channel(self, url, channel_id, user_agent=None): + + def initialize_channel(self, url, channel_id, user_agent=None, transcode_cmd=[]): """Initialize a channel without redundant active key""" try: # Get channel URL from Redis if available channel_url = url channel_user_agent = user_agent - + # First check if channel metadata already exists existing_metadata = None metadata_key = f"ts_proxy:channel:{channel_id}:metadata" - + if self.redis_client: existing_metadata = self.redis_client.hgetall(metadata_key) - + # If no url was passed, try to get from Redis if not url and existing_metadata: url_bytes = existing_metadata.get(b'url') if url_bytes: channel_url = url_bytes.decode('utf-8') - + ua_bytes = existing_metadata.get(b'user_agent') if ua_bytes: channel_user_agent = ua_bytes.decode('utf-8') - + # Check if channel is already owned current_owner = self.get_channel_owner(channel_id) - + # Exit early if another worker owns the channel if current_owner and current_owner != self.worker_id: logger.info(f"Channel {channel_id} already owned by worker {current_owner}") logger.info(f"This worker ({self.worker_id}) will read from Redis buffer only") - + # Create buffer but not stream manager buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client) self.stream_buffers[channel_id] = buffer - + # Create client manager with channel_id and redis_client client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id) self.client_managers[channel_id] = client_manager - + return True - + # Only continue with full initialization if URL is provided # or we can get it from Redis if not channel_url: logger.error(f"No URL available for channel {channel_id}") return False - + # Try to acquire ownership with Redis locking if not self.try_acquire_ownership(channel_id): # Another worker just acquired ownership logger.info(f"Another worker just acquired ownership of channel {channel_id}") - + # Create buffer but not stream manager buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client) self.stream_buffers[channel_id] = buffer - + # Create client manager with channel_id and 
redis_client client_manager = ClientManager(channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id) self.client_managers[channel_id] = client_manager - + return True - + # We now own the channel - ONLY NOW should we set metadata with initializing state logger.info(f"Worker {self.worker_id} is now the owner of channel {channel_id}") - + if self.redis_client: # NOW create or update metadata with initializing state metadata = { @@ -1331,49 +1381,49 @@ class ProxyServer: } if channel_user_agent: metadata["user_agent"] = channel_user_agent - + # Set channel metadata self.redis_client.hset(metadata_key, mapping=metadata) self.redis_client.expire(metadata_key, 3600) # 1 hour TTL - + # Create stream buffer buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client) logger.debug(f"Created StreamBuffer for channel {channel_id}") self.stream_buffers[channel_id] = buffer - + # Only the owner worker creates the actual stream manager - stream_manager = StreamManager(channel_url, buffer, user_agent=channel_user_agent) + stream_manager = StreamManager(channel_url, buffer, user_agent=channel_user_agent, transcode_cmd=transcode_cmd) logger.debug(f"Created StreamManager for channel {channel_id}") self.stream_managers[channel_id] = stream_manager - + # Create client manager with channel_id, redis_client AND worker_id client_manager = ClientManager( - channel_id=channel_id, + channel_id=channel_id, redis_client=self.redis_client, worker_id=self.worker_id ) self.client_managers[channel_id] = client_manager - + # Start stream manager thread only for the owner thread = threading.Thread(target=stream_manager.run, daemon=True) thread.name = f"stream-{channel_id}" thread.start() logger.info(f"Started stream manager thread for channel {channel_id}") - + # If we're the owner, we need to set the channel state rather than starting a grace period immediately if self.am_i_owner(channel_id): self.update_channel_state(channel_id, "connecting", { "init_time": str(time.time()), "owner": self.worker_id }) - + # Set connection attempt start time attempt_key = f"ts_proxy:channel:{channel_id}:connection_attempt_time" self.redis_client.setex(attempt_key, 60, str(time.time())) - - logger.info(f"Channel {channel_id} in connecting state - will start grace period after connection") + + logger.info(f"Channel {channel_id} in connecting state - will start grace period after connection") return True - + except Exception as e: logger.error(f"Error initializing channel {channel_id}: {e}", exc_info=True) # Release ownership on failure @@ -1385,40 +1435,40 @@ class ProxyServer: # Check local memory first if channel_id in self.stream_managers or channel_id in self.stream_buffers: return True - + # Check Redis using the standard key pattern if self.redis_client: # Primary check - look for channel metadata metadata_key = f"ts_proxy:channel:{channel_id}:metadata" - + # If metadata exists, return true if self.redis_client.exists(metadata_key): return True - + # Additional checks if metadata doesn't exist additional_keys = [ f"ts_proxy:channel:{channel_id}:clients", f"ts_proxy:channel:{channel_id}:buffer:index", f"ts_proxy:channel:{channel_id}:owner" ] - + for key in additional_keys: if self.redis_client.exists(key): return True - + return False def stop_channel(self, channel_id): """Stop a channel with proper ownership handling""" try: logger.info(f"Stopping channel {channel_id}") - + # Only stop the actual stream manager if we're the owner if self.am_i_owner(channel_id): logger.info(f"This worker 
({self.worker_id}) is the owner - closing provider connection") if channel_id in self.stream_managers: stream_manager = self.stream_managers[channel_id] - + # Signal thread to stop and close resources if hasattr(stream_manager, 'stop'): stream_manager.stop() @@ -1426,16 +1476,16 @@ class ProxyServer: stream_manager.running = False if hasattr(stream_manager, '_close_socket'): stream_manager._close_socket() - + # Wait for stream thread to finish stream_thread_name = f"stream-{channel_id}" stream_thread = None - + for thread in threading.enumerate(): if thread.name == stream_thread_name: stream_thread = thread break - + if stream_thread and stream_thread.is_alive(): logger.info(f"Waiting for stream thread to terminate") try: @@ -1445,27 +1495,27 @@ class ProxyServer: logger.warning(f"Stream thread did not terminate within timeout") except RuntimeError: logger.debug("Could not join stream thread (may be current thread)") - + # Release ownership self.release_ownership(channel_id) logger.info(f"Released ownership of channel {channel_id}") - + # Always clean up local resources if channel_id in self.stream_managers: del self.stream_managers[channel_id] logger.info(f"Removed stream manager for channel {channel_id}") - + if channel_id in self.stream_buffers: del self.stream_buffers[channel_id] logger.info(f"Removed stream buffer for channel {channel_id}") - + if channel_id in self.client_managers: del self.client_managers[channel_id] logger.info(f"Removed client manager for channel {channel_id}") - + # Clean up Redis keys self._clean_redis_keys(channel_id) - + return True except Exception as e: logger.error(f"Error stopping channel {channel_id}: {e}", exc_info=True) @@ -1474,18 +1524,18 @@ class ProxyServer: def check_inactive_channels(self): """Check for inactive channels (no clients) and stop them""" channels_to_stop = [] - + for channel_id, client_manager in self.client_managers.items(): if client_manager.get_client_count() == 0: channels_to_stop.append(channel_id) - + for channel_id in channels_to_stop: logger.info(f"Auto-stopping inactive channel {channel_id}") self.stop_channel(channel_id) def _cleanup_channel(self, channel_id: str) -> None: """Remove channel resources""" - for collection in [self.stream_managers, self.stream_buffers, + for collection in [self.stream_managers, self.stream_buffers, self.client_managers, self.fetch_threads]: collection.pop(channel_id, None) @@ -1504,7 +1554,7 @@ class ProxyServer: if self.am_i_owner(channel_id): # Extend ownership lease self.extend_ownership(channel_id) - + # Get channel state from metadata hash channel_state = "unknown" if self.redis_client: @@ -1518,11 +1568,11 @@ class ProxyServer: if channel_id in self.client_managers: client_manager = self.client_managers[channel_id] total_clients = client_manager.get_total_client_count() - + # Log client count periodically if time.time() % 30 < 1: # Every ~30 seconds logger.info(f"Channel {channel_id} has {total_clients} clients, state: {channel_state}") - + # If in connecting or waiting_for_clients state, check grace period if channel_state in ["connecting", "waiting_for_clients"]: # Get connection ready time from metadata @@ -1532,22 +1582,22 @@ class ProxyServer: connection_ready_time = float(metadata[b'connection_ready_time'].decode('utf-8')) except (ValueError, TypeError): pass - + # If still connecting, give it more time if channel_state == "connecting": logger.debug(f"Channel {channel_id} still connecting - not checking for clients yet") continue - + # If waiting for clients, check grace period if 
connection_ready_time: grace_period = getattr(Config, 'CHANNEL_INIT_GRACE_PERIOD', 20) time_since_ready = time.time() - connection_ready_time - + # Add this debug log - logger.debug(f"GRACE PERIOD CHECK: Channel {channel_id} in {channel_state} state, " + logger.debug(f"GRACE PERIOD CHECK: Channel {channel_id} in {channel_state} state, " f"time_since_ready={time_since_ready:.1f}s, grace_period={grace_period}s, " f"total_clients={total_clients}") - + if time_since_ready <= grace_period: # Still within grace period logger.debug(f"Channel {channel_id} in grace period - {time_since_ready:.1f}s of {grace_period}s elapsed") @@ -1572,7 +1622,7 @@ class ProxyServer: # Check if there's a pending no-clients timeout disconnect_key = f"ts_proxy:channel:{channel_id}:last_client_disconnect_time" disconnect_time = None - + if self.redis_client: disconnect_value = self.redis_client.get(disconnect_key) if disconnect_value: @@ -1580,9 +1630,9 @@ class ProxyServer: disconnect_time = float(disconnect_value.decode('utf-8')) except (ValueError, TypeError) as e: logger.error(f"Invalid disconnect time for channel {channel_id}: {e}") - + current_time = time.time() - + if not disconnect_time: # First time seeing zero clients, set timestamp if self.redis_client: @@ -1594,19 +1644,19 @@ class ProxyServer: self.stop_channel(channel_id) else: # Still in shutdown delay period - logger.debug(f"Channel {channel_id} shutdown timer: " - f"{current_time - disconnect_time:.1f}s of " + logger.debug(f"Channel {channel_id} shutdown timer: " + f"{current_time - disconnect_time:.1f}s of " f"{getattr(Config, 'CHANNEL_SHUTDOWN_DELAY', 5)}s elapsed") else: # There are clients or we're still connecting - clear any disconnect timestamp if self.redis_client: self.redis_client.delete(f"ts_proxy:channel:{channel_id}:last_client_disconnect_time") - + except Exception as e: logger.error(f"Error in cleanup thread: {e}", exc_info=True) - + time.sleep(getattr(Config, 'CLEANUP_CHECK_INTERVAL', 1)) - + thread = threading.Thread(target=cleanup_task, daemon=True) thread.name = "ts-proxy-cleanup" thread.start() @@ -1616,28 +1666,28 @@ class ProxyServer: """Check for orphaned channels in Redis (owner worker crashed)""" if not self.redis_client: return - + try: # Get all active channel keys channel_pattern = "ts_proxy:channel:*:metadata" channel_keys = self.redis_client.keys(channel_pattern) - + for key in channel_keys: try: channel_id = key.decode('utf-8').split(':')[2] - + # Skip channels we already have locally if channel_id in self.stream_buffers: continue - + # Check if this channel has an owner owner = self.get_channel_owner(channel_id) - + if not owner: # Check if there are any clients client_set_key = f"ts_proxy:channel:{channel_id}:clients" client_count = self.redis_client.scard(client_set_key) or 0 - + if client_count > 0: # Orphaned channel with clients - we could take ownership logger.info(f"Found orphaned channel {channel_id} with {client_count} clients") @@ -1647,7 +1697,7 @@ class ProxyServer: self._clean_redis_keys(channel_id) except Exception as e: logger.error(f"Error processing channel key {key}: {e}") - + except Exception as e: logger.error(f"Error checking orphaned channels: {e}") @@ -1655,16 +1705,18 @@ class ProxyServer: """Clean up all Redis keys for a channel""" if not self.redis_client: return - + try: + channel = Channel.objects.get(id=channel_id) + channel.release_stream() # All keys are now under the channel namespace for easy pattern matching channel_pattern = f"ts_proxy:channel:{channel_id}:*" all_keys = 
self.redis_client.keys(channel_pattern)
-
+
                if all_keys:
                    self.redis_client.delete(*all_keys)
                    logger.info(f"Cleaned up {len(all_keys)} Redis keys for channel {channel_id}")
-
+
        except Exception as e:
            logger.error(f"Error cleaning Redis keys for channel {channel_id}: {e}")
@@ -1672,12 +1724,12 @@ class ProxyServer:
         """Refresh TTL for active channels using standard keys"""
         if not self.redis_client:
             return
-
+
         # Refresh registry entries for channels we own
         for channel_id in self.stream_managers.keys():
             # Use standard key pattern
             metadata_key = f"ts_proxy:channel:{channel_id}:metadata"
-
+
             # Update activity timestamp in metadata only
             self.redis_client.hset(metadata_key, "last_active", str(time.time()))
             self.redis_client.expire(metadata_key, 3600)  # Reset TTL on metadata hash
@@ -1686,39 +1738,37 @@ class ProxyServer:
         """Update channel state with proper history tracking and logging"""
         if not self.redis_client:
             return False
-
+
         try:
             metadata_key = f"ts_proxy:channel:{channel_id}:metadata"
-
+
             # Get current state for logging
             current_state = None
             metadata = self.redis_client.hgetall(metadata_key)
             if metadata and b'state' in metadata:
                 current_state = metadata[b'state'].decode('utf-8')
-
+
             # Only update if state is actually changing
             if current_state == new_state:
                 logger.debug(f"Channel {channel_id} state unchanged: {current_state}")
                 return True
-
+
             # Prepare update data
             update_data = {
                 "state": new_state,
                 "state_changed_at": str(time.time())
             }
-
+
             # Add optional additional fields
             if additional_fields:
                 update_data.update(additional_fields)
-
+
             # Update the metadata
             self.redis_client.hset(metadata_key, mapping=update_data)
-
+
             # Log the transition
             logger.info(f"Channel {channel_id} state transition: {current_state or 'None'} → {new_state}")
             return True
         except Exception as e:
             logger.error(f"Error updating channel state: {e}")
             return False
-
-
diff --git a/apps/proxy/ts_proxy/urls.py b/apps/proxy/ts_proxy/urls.py
index 77c22534..50cbaa9c 100644
--- a/apps/proxy/ts_proxy/urls.py
+++ b/apps/proxy/ts_proxy/urls.py
@@ -5,6 +5,5 @@ app_name = 'ts_proxy'
 urlpatterns = [
     path('stream/', views.stream_ts, name='stream'),
-    path('initialize/', views.initialize_stream, name='initialize'),
     path('change_stream/', views.change_stream, name='change_stream'),
-]
\ No newline at end of file
+]
diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py
index 399e93c3..80a04021 100644
--- a/apps/proxy/ts_proxy/views.py
+++ b/apps/proxy/ts_proxy/views.py
@@ -4,12 +4,17 @@
 import time
 import random
 import sys
 import os
+import re
 from django.http import StreamingHttpResponse, JsonResponse
 from django.views.decorators.csrf import csrf_exempt
 from django.views.decorators.http import require_http_methods, require_GET
+from django.shortcuts import get_object_or_404
 from apps.proxy.config import TSConfig as Config
 from .server import ProxyServer
 from apps.proxy.ts_proxy.server import logging as server_logging
+from apps.channels.models import Channel, Stream
+from apps.m3u.models import M3UAccount, M3UAccountProfile
+from core.models import UserAgent

 # Configure logging properly to ensure visibility
 logger = server_logging
@@ -17,24 +22,14 @@ logger = server_logging

 # Initialize proxy server
 proxy_server = ProxyServer()

-@csrf_exempt
-@require_http_methods(["POST"])
-def initialize_stream(request, channel_id):
+def initialize_stream(channel_id, url, user_agent, transcode_cmd):
     """Initialize a new stream channel with initialization-based ownership"""
     try:
-        data = json.loads(request.body)
-        url = data.get('url')
-        if not url:
-            return JsonResponse({'error': 'No URL provided'}, status=400)
-
-        # Get optional user_agent from request
-        user_agent = data.get('user_agent')
-
         # Try to acquire ownership and create connection
-        success = proxy_server.initialize_channel(url, channel_id, user_agent)
+        success = proxy_server.initialize_channel(url, channel_id, user_agent, transcode_cmd)
         if not success:
-            return JsonResponse({'error': 'Failed to initialize channel'}, status=500)
-
+            return False
+
         # If we're the owner, wait for connection
         if proxy_server.am_i_owner(channel_id):
             # Wait for connection to be established
@@ -53,59 +48,84 @@ def initialize_stream(request, channel_id):
                         'error': 'Failed to connect'
                     }, status=502)
                 time.sleep(0.1)
-
+
         # Return success response with owner status
-        return JsonResponse({
-            'message': 'Stream initialized and connected',
-            'channel': channel_id,
-            'url': url,
-            'owner': proxy_server.am_i_owner(channel_id)
-        })
-
-    except json.JSONDecodeError:
-        return JsonResponse({'error': 'Invalid JSON'}, status=400)
+        return True
+
     except Exception as e:
         logger.error(f"Failed to initialize stream: {e}")
-        return JsonResponse({'error': str(e)}, status=500)
+        return False

 @require_GET
 def stream_ts(request, channel_id):
     """Stream TS data to client with single client registration"""
+    user_agent = None
+    logger.info(f"Fetching channel ID {channel_id}")
+    channel = get_object_or_404(Channel, pk=channel_id)
+
     try:
         # Generate a unique client ID
         client_id = f"client_{int(time.time() * 1000)}_{random.randint(1000, 9999)}"
         logger.info(f"[{client_id}] Requested stream for channel {channel_id}")
-
-        # Extract user agent only once
-        user_agent = None
-        for header in ['HTTP_USER_AGENT', 'User-Agent', 'user-agent']:
-            if header in request.META:
-                user_agent = request.META[header]
-                logger.debug(f"[{client_id}] Found user agent in header: {header}")
-                break
-
+
         # Check if channel exists or initialize it
         if not proxy_server.check_if_channel_exists(channel_id):
-            logger.error(f"[{client_id}] Channel {channel_id} not found")
-            return JsonResponse({'error': 'Channel not found'}, status=404)
-
+            stream_id, profile_id = channel.get_stream()
+            if stream_id is None or profile_id is None:
+                return JsonResponse({'error': 'Channel not available'}, status=404)
+
+            # Load in necessary objects for the stream
+            logger.info(f"Fetching stream ID {stream_id}")
+            stream = get_object_or_404(Stream, pk=stream_id)
+            logger.info(f"Fetching profile ID {profile_id}")
+            profile = get_object_or_404(M3UAccountProfile, pk=profile_id)
+
+            # Load in the user-agent for the account
+            m3u_account = M3UAccount.objects.get(id=profile.m3u_account.id)
+            user_agent = UserAgent.objects.get(id=m3u_account.user_agent.id).user_agent
+
+            # Generate stream URL based on the selected profile
+            input_url = stream.custom_url or stream.url
+            logger.debug("Executing the following pattern replacement:")
+            logger.debug(f"  search: {profile.search_pattern}")
+            safe_replace_pattern = re.sub(r'\$(\d+)', r'\\\1', profile.replace_pattern)
+            logger.debug(f"  replace: {profile.replace_pattern}")
+            logger.debug(f"  safe replace: {safe_replace_pattern}")
+            stream_url = re.sub(profile.search_pattern, safe_replace_pattern, input_url)
+            logger.debug(f"Generated stream url: {stream_url}")
+
+            # Generate transcode command
+            # @TODO: once complete, provide option to direct proxy
+            transcode_cmd = channel.get_stream_profile().build_command(stream_url)
+
+            if not initialize_stream(channel_id, stream_url, user_agent, transcode_cmd):
+                return JsonResponse({'error': 'Failed to initialize channel'}, status=500)
+
+        if user_agent is None:
+            # Extract if not set
+            for header in ['HTTP_USER_AGENT', 'User-Agent', 'user-agent']:
+                if header in request.META:
+                    user_agent = request.META[header]
+                    logger.debug(f"[{client_id}] Found user agent in header: {header}")
+                    break
+
         # Wait for channel to become ready if it's initializing
         if proxy_server.redis_client:
             wait_start = time.time()
             max_wait = getattr(Config, 'CLIENT_WAIT_TIMEOUT', 30)  # Maximum wait time in seconds
-
+
             # Check channel state
             metadata_key = f"ts_proxy:channel:{channel_id}:metadata"
             waiting = True
-
+
             while waiting and time.time() - wait_start < max_wait:
                 metadata = proxy_server.redis_client.hgetall(metadata_key)
                 if not metadata or b'state' not in metadata:
                     logger.warning(f"[{client_id}] Channel {channel_id} metadata missing")
                     break
-
+
                 state = metadata[b'state'].decode('utf-8')
-
+
                 # If channel is ready for clients, continue
                 if state in ['waiting_for_clients', 'active']:
                     logger.info(f"[{client_id}] Channel {channel_id} ready (state={state}), proceeding with connection")
@@ -119,7 +139,7 @@ def stream_ts(request, channel_id):
                 # Unknown or error state
                 logger.warning(f"[{client_id}] Channel {channel_id} in unexpected state: {state}")
                 break
-
+
         # Check if we timed out waiting
         if waiting and time.time() - wait_start >= max_wait:
             logger.warning(f"[{client_id}] Timeout waiting for channel {channel_id} to become ready")
@@ -128,7 +148,7 @@ def stream_ts(request, channel_id):
         # CRITICAL FIX: Ensure local resources are properly initialized before streaming
         if channel_id not in proxy_server.stream_buffers or channel_id not in proxy_server.client_managers:
             logger.warning(f"[{client_id}] Channel {channel_id} exists in Redis but not initialized in this worker - initializing now")
-
+
             # Get URL from Redis metadata
             url = None
             if proxy_server.redis_client:
@@ -136,40 +156,40 @@ def stream_ts(request, channel_id):
                 url_bytes = proxy_server.redis_client.hget(metadata_key, "url")
                 if url_bytes:
                     url = url_bytes.decode('utf-8')
-
+
             # Initialize local resources - pass the user_agent we extracted earlier
             success = proxy_server.initialize_channel(url, channel_id, user_agent)
             if not success:
                 logger.error(f"[{client_id}] Failed to initialize channel {channel_id} locally")
                 return JsonResponse({'error': 'Failed to initialize channel locally'}, status=500)
-
+
             logger.info(f"[{client_id}] Successfully initialized channel {channel_id} locally")
-
+
         # Get stream buffer and client manager
         buffer = proxy_server.stream_buffers[channel_id]
         client_manager = proxy_server.client_managers[channel_id]
         client_manager.add_client(client_id, user_agent)
         logger.info(f"[{client_id}] Client registered with channel {channel_id}")
-
+
         # Start stream response
-        def generate():
+        def generate():
             stream_start_time = time.time()
             bytes_sent = 0
             chunks_sent = 0
-
+
             try:
                 # ENHANCED USER AGENT DETECTION - check multiple possible headers
                 user_agent = None
-
+
                 # Try multiple possible header formats
                 ua_headers = ['HTTP_USER_AGENT', 'User-Agent', 'user-agent', 'User_Agent']
-
+
                 for header in ua_headers:
                     if header in request.META:
                         user_agent = request.META[header]
                         logger.debug(f"Found user agent in header: {header}")
                         break
-
+
                 # Try request.headers dictionary (Django 2.2+)
                 if not user_agent and hasattr(request, 'headers'):
                     for header in ['User-Agent', 'user-agent']:
@@ -177,7 +197,7 @@ def stream_ts(request, channel_id):
                             user_agent = request.headers[header]
                             logger.debug(f"Found user agent in request.headers: {header}")
                             break
-
+
                 # Final fallback - check if in any header with case-insensitive matching
                 if not user_agent:
                     for key, value in request.META.items():
@@ -185,63 +205,63 @@
                             user_agent = value
                             logger.debug(f"Found user agent in alternate header: {key}")
                             break
-
+
                 # Log headers for debugging user agent issues
                 if not user_agent:
                     # Log all headers to help troubleshoot
                     headers = {k: v for k, v in request.META.items() if k.startswith('HTTP_')}
                     logger.debug(f"No user agent found in request. Available headers: {headers}")
                     user_agent = "Unknown-Client"  # Default value instead of None
-
+
                 logger.info(f"[{client_id}] New client connected to channel {channel_id} with user agent: {user_agent}")
-
+
                 # Add client to manager with user agent
                 client_manager = proxy_server.client_managers[channel_id]
                 client_count = client_manager.add_client(client_id, user_agent)
-
+
                 # If this is the first client, try to acquire ownership
                 if client_count == 1 and not proxy_server.am_i_owner(channel_id):
                     if proxy_server.try_acquire_ownership(channel_id):
                         logger.info(f"[{client_id}] First client, acquiring channel ownership")
-
+
                         # Get channel metadata from Redis
                         if proxy_server.redis_client:
                             metadata_key = f"ts_proxy:channel:{channel_id}:metadata"
                             url_bytes = proxy_server.redis_client.hget(metadata_key, "url")
                             ua_bytes = proxy_server.redis_client.hget(metadata_key, "user_agent")
-
+
                             url = url_bytes.decode('utf-8') if url_bytes else None
                             user_agent = ua_bytes.decode('utf-8') if ua_bytes else None
-
+
                             if url:
                                 # Create and start stream connection
                                 from .server import StreamManager  # Import here to avoid circular import
-
+
                                 logger.info(f"[{client_id}] Creating stream connection for URL: {url}")
                                 buffer = proxy_server.stream_buffers[channel_id]
-
+
                                 stream_manager = StreamManager(url, buffer, user_agent=user_agent)
                                 proxy_server.stream_managers[channel_id] = stream_manager
-
+
                                 thread = threading.Thread(target=stream_manager.run, daemon=True)
                                 thread.name = f"stream-{channel_id}"
                                 thread.start()
-
+
                                 # Wait briefly for connection
                                 wait_start = time.time()
                                 while not stream_manager.connected:
                                     if time.time() - wait_start > Config.CONNECTION_TIMEOUT:
                                         break
                                     time.sleep(0.1)
-
+
                 # Get buffer - stream manager may not exist in this worker
                 buffer = proxy_server.stream_buffers.get(channel_id)
                 stream_manager = proxy_server.stream_managers.get(channel_id)
-
+
                 if not buffer:
                     logger.error(f"[{client_id}] No buffer found for channel {channel_id}")
                     return
-
+
                 # Client state tracking - use config for initial position
                 initial_behind = getattr(Config, 'INITIAL_BEHIND_CHUNKS', 10)
                 current_buffer_index = buffer.index
@@ -255,30 +275,30 @@
                 chunks_sent = 0
                 stream_start_time = time.time()
                 consecutive_empty = 0  # Track consecutive empty reads
-
+
                 # Timing parameters from config
                 ts_packet_size = 188
-                target_bitrate = Config.TARGET_BITRATE
+                target_bitrate = Config.TARGET_BITRATE
                 packets_per_second = target_bitrate / (8 * ts_packet_size)
-
+
                 logger.info(f"[{client_id}] Starting stream at index {local_index} (buffer at {buffer.index})")
-
+
                 # Check if we're the owner worker
                 is_owner_worker = proxy_server.am_i_owner(channel_id) if hasattr(proxy_server, 'am_i_owner') else True
-
+
                 # Main streaming loop
                 while True:
                     # Get chunks at client's position using improved strategy
                     chunks, next_index = get_client_data(buffer, local_index)
-
+
                     if chunks:
                         empty_reads = 0
                         consecutive_empty = 0
-
+
                         # Process and send chunks
                         total_size = sum(len(c) for c in chunks)
                         logger.debug(f"[{client_id}] Retrieved {len(chunks)} chunks ({total_size} bytes) from index {local_index+1} to {next_index}")
-
+
                         # CRITICAL FIX: Actually send the chunks to the client
                         for chunk in chunks:
                             try:
@@ -286,7 +306,7 @@
                                 yield chunk
                                 bytes_sent += len(chunk)
                                 chunks_sent += 1
-
+
                                 # Log every 100 chunks for visibility
                                 if chunks_sent % 100 == 0:
                                     elapsed = time.time() - stream_start_time
@@ -295,7 +315,7 @@
                             except Exception as e:
                                 logger.error(f"[{client_id}] Error sending chunk to client: {e}")
                                 raise  # Re-raise to exit the generator
-
+
                         # Update index after successfully sending all chunks
                         local_index = next_index
                         last_yield_time = time.time()
@@ -303,14 +323,14 @@
                         # No chunks available
                         empty_reads += 1
                         consecutive_empty += 1
-
+
                         # Check if we're caught up to buffer head
                         at_buffer_head = local_index >= buffer.index
-
+
                         # If we're at buffer head and no data is coming, send keepalive
                         # Only check stream manager health if it exists
                         stream_healthy = stream_manager.healthy if stream_manager else True
-
+
                         if at_buffer_head and not stream_healthy and consecutive_empty >= 5:
                             # Create a null TS packet as keepalive (188 bytes filled with padding)
                             # This prevents VLC from hitting EOF
@@ -318,7 +338,7 @@
                             keepalive_packet[0] = 0x47  # Sync byte
                             keepalive_packet[1] = 0x1F  # PID high bits (null packet)
                             keepalive_packet[2] = 0xFF  # PID low bits (null packet)
-
+
                             logger.debug(f"[{client_id}] Sending keepalive packet while waiting at buffer head")
                             yield bytes(keepalive_packet)
                             bytes_sent += len(keepalive_packet)
@@ -329,12 +349,12 @@
                             # Standard wait
                             sleep_time = min(0.1 * consecutive_empty, 1.0)  # Progressive backoff up to 1s
                             time.sleep(sleep_time)
-
+
                         # Log empty reads periodically
                         if empty_reads % 50 == 0:
                             stream_status = "healthy" if (stream_manager and stream_manager.healthy) else "unknown"
                             logger.debug(f"[{client_id}] Waiting for chunks beyond {local_index} (buffer at {buffer.index}, stream: {stream_status})")
-
+
                         # CRITICAL FIX: Check for client disconnect during wait periods
                         # Django/WSGI might not immediately detect disconnections, but we can check periodically
                         if consecutive_empty > 10:  # After some number of empty reads
@@ -349,7 +369,7 @@
                                 # Error reading from connection, likely closed
                                 logger.info(f"[{client_id}] Connection error, client likely disconnected")
                                 break
-
+
                         # Disconnect after long inactivity
                         # For non-owner workers, we're more lenient with timeout
                         if time.time() - last_yield_time > Config.STREAM_TIMEOUT:
@@ -360,25 +380,25 @@
                                 # Non-owner worker without data for too long
                                 logger.warning(f"[{client_id}] Non-owner worker with no data for {Config.STREAM_TIMEOUT}s, disconnecting")
                                 break
-
+
                         # ADD THIS: Check if worker has more recent chunks but still stuck
                         # This can indicate the client is disconnected but we're not detecting it
                         if consecutive_empty > 100 and buffer.index > local_index + 50:
                             logger.warning(f"[{client_id}] Possible ghost client: buffer has advanced {buffer.index - local_index} chunks ahead but client stuck at {local_index}")
                             break
-
+
             except Exception as e:
                 logger.error(f"[{client_id}] Stream error: {e}", exc_info=True)
             finally:
                 # Client cleanup
                 elapsed = time.time() - stream_start_time
                 local_clients = 0
-
+
                 if channel_id in proxy_server.client_managers:
                     local_clients = proxy_server.client_managers[channel_id].remove_client(client_id)
                     total_clients = proxy_server.client_managers[channel_id].get_total_client_count()
logger.info(f"[{client_id}] Disconnected after {elapsed:.2f}s, {bytes_sent/1024:.1f}KB in {chunks_sent} chunks (local: {local_clients}, total: {total_clients})") - + # If no clients left and we're the owner, schedule shutdown using the config value if local_clients == 0 and proxy_server.am_i_owner(channel_id): logger.info(f"No local clients left for channel {channel_id}, scheduling shutdown") @@ -387,7 +407,7 @@ def stream_ts(request, channel_id): shutdown_delay = getattr(Config, 'CHANNEL_SHUTDOWN_DELAY', 5) logger.info(f"Waiting {shutdown_delay}s before checking if channel should be stopped") time.sleep(shutdown_delay) - + # After delay, check global client count if channel_id in proxy_server.client_managers: total = proxy_server.client_managers[channel_id].get_total_client_count() @@ -396,17 +416,17 @@ def stream_ts(request, channel_id): proxy_server.stop_channel(channel_id) else: logger.info(f"Not shutting down channel {channel_id}, {total} clients still connected") - + shutdown_thread = threading.Thread(target=delayed_shutdown) shutdown_thread.daemon = True shutdown_thread.start() - + response = StreamingHttpResponse( streaming_content=generate(), content_type='video/mp2t' ) return response - + except Exception as e: logger.error(f"Error in stream_ts: {e}", exc_info=True) return JsonResponse({'error': str(e)}, status=500) @@ -419,16 +439,16 @@ def change_stream(request, channel_id): data = json.loads(request.body) new_url = data.get('url') user_agent = data.get('user_agent') - + if not new_url: return JsonResponse({'error': 'No URL provided'}, status=400) - + logger.info(f"Attempting to change stream URL for channel {channel_id} to {new_url}") - + # Enhanced channel detection in_local_managers = channel_id in proxy_server.stream_managers in_local_buffers = channel_id in proxy_server.stream_buffers - + # First check Redis directly before using our wrapper method redis_keys = None if proxy_server.redis_client: @@ -437,17 +457,17 @@ def change_stream(request, channel_id): redis_keys = [k.decode('utf-8') for k in redis_keys] if redis_keys else [] except Exception as e: logger.error(f"Error checking Redis keys: {e}") - + # Now use our standard check channel_exists = proxy_server.check_if_channel_exists(channel_id) - + # Log detailed diagnostics logger.info(f"Channel {channel_id} diagnostics: " f"in_local_managers={in_local_managers}, " f"in_local_buffers={in_local_buffers}, " f"redis_keys_count={len(redis_keys) if redis_keys else 0}, " f"channel_exists={channel_exists}") - + if not channel_exists: # If channel doesn't exist but we found Redis keys, force initialize it if redis_keys: @@ -463,17 +483,17 @@ def change_stream(request, channel_id): 'redis_keys': redis_keys, } }, status=404) - + # Update metadata in Redis regardless of ownership - this ensures URL is updated # even if the owner worker is handling another request if proxy_server.redis_client: try: metadata_key = f"ts_proxy:channel:{channel_id}:metadata" - + # First check if the key exists and what type it is key_type = proxy_server.redis_client.type(metadata_key).decode('utf-8') logger.debug(f"Redis key {metadata_key} is of type: {key_type}") - + # Use the appropriate method based on the key type if key_type == 'hash': proxy_server.redis_client.hset(metadata_key, "url", new_url) @@ -492,21 +512,21 @@ def change_stream(request, channel_id): if user_agent: metadata["user_agent"] = user_agent proxy_server.redis_client.hset(metadata_key, mapping=metadata) - + # Set switch request flag to ensure all workers see it switch_key = 
f"ts_proxy:channel:{channel_id}:switch_request" proxy_server.redis_client.setex(switch_key, 30, new_url) # 30 second TTL - + logger.info(f"Updated metadata for channel {channel_id} in Redis") except Exception as e: logger.error(f"Error updating Redis metadata: {e}", exc_info=True) - + # If we're the owner, update directly if proxy_server.am_i_owner(channel_id) and channel_id in proxy_server.stream_managers: logger.info(f"This worker is the owner, changing stream URL for channel {channel_id}") manager = proxy_server.stream_managers[channel_id] old_url = manager.url - + # Update the stream result = manager.update_url(new_url) logger.info(f"Stream URL changed from {old_url} to {new_url}, result: {result}") @@ -517,7 +537,7 @@ def change_stream(request, channel_id): 'owner': True, 'worker_id': proxy_server.worker_id }) - + # If we're not the owner, publish an event for the owner to pick up else: logger.info(f"This worker is not the owner, requesting URL change via Redis PubSub") @@ -530,12 +550,12 @@ def change_stream(request, channel_id): "requester": proxy_server.worker_id, "timestamp": time.time() } - + proxy_server.redis_client.publish( - f"ts_proxy:events:{channel_id}", + f"ts_proxy:events:{channel_id}", json.dumps(switch_request) ) - + return JsonResponse({ 'message': 'Stream URL change requested', 'channel': channel_id, @@ -543,7 +563,7 @@ def change_stream(request, channel_id): 'owner': False, 'worker_id': proxy_server.worker_id }) - + except json.JSONDecodeError: return JsonResponse({'error': 'Invalid JSON'}, status=400) except Exception as e: @@ -557,10 +577,10 @@ def get_client_data(buffer, local_index): MAX_CHUNKS = 20 # Safety limit to prevent memory spikes TARGET_SIZE = 1024 * 1024 # Target ~1MB per response (typical media buffer) MAX_SIZE = 2 * 1024 * 1024 # Hard cap at 2MB - + # Calculate how far behind we are chunks_behind = buffer.index - local_index - + # Determine optimal chunk count if chunks_behind <= MIN_CHUNKS: # Not much data, retrieve what's available @@ -571,23 +591,23 @@ def get_client_data(buffer, local_index): else: # Way behind, retrieve MAX_CHUNKS to avoid memory pressure chunk_count = MAX_CHUNKS - + # Retrieve chunks chunks = buffer.get_chunks_exact(local_index, chunk_count) - + # Check total size total_size = sum(len(c) for c in chunks) - + # If we're under target and have more chunks available, get more if total_size < TARGET_SIZE and chunks_behind > chunk_count: # Calculate how many more chunks we can get additional = min(MAX_CHUNKS - chunk_count, chunks_behind - chunk_count) more_chunks = buffer.get_chunks_exact(local_index + chunk_count, additional) - + # Check if adding more would exceed MAX_SIZE additional_size = sum(len(c) for c in more_chunks) if total_size + additional_size <= MAX_SIZE: chunks.extend(more_chunks) chunk_count += additional - - return chunks, local_index + chunk_count \ No newline at end of file + + return chunks, local_index + chunk_count diff --git a/core/models.py b/core/models.py index 6f353126..2309fdf4 100644 --- a/core/models.py +++ b/core/models.py @@ -47,6 +47,14 @@ class StreamProfile(models.Model): def __str__(self): return self.profile_name + def build_command(self, stream_url): + cmd = [] + if self.command == "ffmpeg": + cmd = ["ffmpeg", "-i", stream_url] + self.parameters.split() + ["pipe:1"] + elif self.command == "streamlink": + cmd = ["streamlink", stream_url] + self.parameters.split() + + return cmd class CoreSettings(models.Model): key = models.CharField( diff --git a/core/utils.py b/core/utils.py new file mode 100644 
index 00000000..07a51ff8
--- /dev/null
+++ b/core/utils.py
@@ -0,0 +1,9 @@
+import redis
+from django.conf import settings
+
+# Global Redis connection (Singleton)
+redis_client = redis.Redis(
+    host=getattr(settings, "REDIS_HOST", "localhost"),
+    port=6379,
+    db=int(getattr(settings, "REDIS_DB", "0"))
+)
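
Illustrative sketch (not part of the patch above): the profile-based URL rewriting that stream_ts now performs converts "$1"-style backreferences in an M3UAccountProfile's replace_pattern into the "\1" form that Python's re.sub expects, then applies search_pattern to the stream's URL. The helper name, patterns, and URL below are invented for demonstration only.

import re

def rewrite_stream_url(search_pattern: str, replace_pattern: str, input_url: str) -> str:
    # Convert "$1"-style group references to re.sub's "\1" form,
    # mirroring the substitution stream_ts performs before calling re.sub.
    safe_replace_pattern = re.sub(r'\$(\d+)', r'\\\1', replace_pattern)
    return re.sub(search_pattern, safe_replace_pattern, input_url)

# Hypothetical example (patterns are not taken from the patch):
# rewrite_stream_url(r'/live/(\d+)\.ts$', '/live/$1.m3u8',
#                    'http://provider.example/live/12345.ts')
# -> 'http://provider.example/live/12345.m3u8'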