diff --git a/apps/accounts/api_views.py b/apps/accounts/api_views.py
index f6b48e55..c4b544b0 100644
--- a/apps/accounts/api_views.py
+++ b/apps/accounts/api_views.py
@@ -22,7 +22,22 @@ class TokenObtainPairView(TokenObtainPairView):
         if not network_access_allowed(request, "UI"):
             return Response({"error": "Forbidden"}, status=status.HTTP_403_FORBIDDEN)

-        return super().post(request, *args, **kwargs)
+        # Get the response from the parent class first
+        response = super().post(request, *args, **kwargs)
+
+        # If login was successful, update last_login
+        if response.status_code == 200:
+            username = request.data.get("username")
+            if username:
+                from django.utils import timezone
+                try:
+                    user = User.objects.get(username=username)
+                    user.last_login = timezone.now()
+                    user.save(update_fields=['last_login'])
+                except User.DoesNotExist:
+                    pass  # User doesn't exist, but login somehow succeeded
+
+        return response


 class TokenRefreshView(TokenRefreshView):
@@ -87,6 +102,11 @@ class AuthViewSet(viewsets.ViewSet):

         if user:
             login(request, user)
+            # Update last_login timestamp
+            from django.utils import timezone
+            user.last_login = timezone.now()
+            user.save(update_fields=['last_login'])
+
             return Response(
                 {
                     "message": "Login successful",
diff --git a/apps/accounts/serializers.py b/apps/accounts/serializers.py
index 5aa81f3e..865d29af 100644
--- a/apps/accounts/serializers.py
+++ b/apps/accounts/serializers.py
@@ -39,6 +39,14 @@ class UserSerializer(serializers.ModelSerializer):
             "password",
             "channel_profiles",
             "custom_properties",
+            "avatar_config",
+            "is_active",
+            "is_staff",
+            "is_superuser",
+            "last_login",
+            "date_joined",
+            "first_name",
+            "last_name",
         ]

     def create(self, validated_data):
diff --git a/apps/proxy/config.py b/apps/proxy/config.py
index ca246b78..9ce5b66c 100644
--- a/apps/proxy/config.py
+++ b/apps/proxy/config.py
@@ -57,6 +57,8 @@ class TSConfig(BaseConfig):
     INITIAL_BEHIND_CHUNKS = 4  # How many chunks behind to start a client (4 chunks = ~1MB)
     CHUNK_BATCH_SIZE = 5  # How many chunks to fetch in one batch
     KEEPALIVE_INTERVAL = 0.5  # Seconds between keepalive packets when at buffer head
+    # Chunk read timeout
+    CHUNK_TIMEOUT = 5  # Seconds to wait for each chunk read

     # Streaming settings
     TARGET_BITRATE = 8000000  # Target bitrate (8 Mbps)
@@ -80,6 +82,8 @@ class TSConfig(BaseConfig):
     FAILOVER_GRACE_PERIOD = 20  # Extra time (seconds) to allow for stream switching before disconnecting clients
     URL_SWITCH_TIMEOUT = 20  # Max time allowed for a stream switch operation

+    # Database-dependent settings with fallbacks
+
     @classmethod
     def get_channel_shutdown_delay(cls):
diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py
index 4699091a..da5daaa7 100644
--- a/apps/proxy/ts_proxy/server.py
+++ b/apps/proxy/ts_proxy/server.py
@@ -708,7 +708,7 @@ class ProxyServer:
             elif state in [ChannelState.STOPPING, ChannelState.STOPPED, ChannelState.ERROR]:
                 # These states indicate the channel should be reinitialized
                 logger.info(f"Channel {channel_id} exists but in terminal state: {state}")
-                return False
+                return True
             else:
                 # Unknown or initializing state, check how long it's been in this state
                 if b'state_changed_at' in metadata:
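A side note on the last_login handling above: the lookup-by-username in TokenObtainPairView repeats work the JWT serializer has already done, and the except-branch comment itself flags the oddity of a successful login for a missing user. djangorestframework-simplejwt (which the view wraps) can record last_login itself on the authenticated user object. A minimal settings-level sketch, not part of this patch and assuming SimpleJWT is the token backend:

    # settings.py -- sketch only
    SIMPLE_JWT = {
        # TokenObtainPairSerializer calls django.contrib.auth.models.update_last_login
        # on every successful token obtain, no second query by username needed.
        "UPDATE_LAST_LOGIN": True,
    }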
diff --git a/apps/proxy/ts_proxy/stream_manager.py b/apps/proxy/ts_proxy/stream_manager.py
index 64a764bf..f7c538c2 100644
--- a/apps/proxy/ts_proxy/stream_manager.py
+++ b/apps/proxy/ts_proxy/stream_manager.py
@@ -135,6 +135,37 @@ class StreamManager:

         return session

+    def _wait_for_existing_processes_to_close(self, timeout=5.0):
+        """Wait for existing processes/connections to fully close before establishing new ones"""
+        start_time = time.time()
+
+        while time.time() - start_time < timeout:
+            # Check if transcode process is still running
+            if self.transcode_process and self.transcode_process.poll() is None:
+                logger.debug(f"Waiting for existing transcode process to terminate for channel {self.channel_id}")
+                gevent.sleep(0.1)
+                continue
+
+            # Check if HTTP connections are still active
+            if self.current_response or self.current_session:
+                logger.debug(f"Waiting for existing HTTP connections to close for channel {self.channel_id}")
+                gevent.sleep(0.1)
+                continue
+
+            # Check if socket is still active
+            if self.socket:
+                logger.debug(f"Waiting for existing socket to close for channel {self.channel_id}")
+                gevent.sleep(0.1)
+                continue
+
+            # All processes/connections are closed
+            logger.debug(f"All existing processes closed for channel {self.channel_id}")
+            return True
+
+        # Timeout reached
+        logger.warning(f"Timeout waiting for existing processes to close for channel {self.channel_id} after {timeout}s")
+        return False
+
     def run(self):
         """Main execution loop using HTTP streaming with improved connection handling and stream switching"""
         # Add a stop flag to the class properties
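The helper above is a poll-until-closed loop with a deadline. Reduced to its core and detached from this class, the pattern looks like the sketch below (hypothetical helper; plain time.sleep stands in for gevent.sleep):

    import time

    def wait_until(condition, timeout=5.0, interval=0.1):
        """Poll condition() until it returns True or the deadline passes."""
        deadline = time.time() + timeout
        while time.time() < deadline:
            if condition():
                return True
            time.sleep(interval)
        return False

    # Usage: wait_until(lambda: proc.poll() is not None)  # True once the process exits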
@@ -151,7 +182,7 @@ class StreamManager:
         health_thread = threading.Thread(target=self._monitor_health, daemon=True)
         health_thread.start()

-        logger.info(f"Starting stream for URL: {self.url}")
+        logger.info(f"Starting stream for URL: {self.url} for channel {self.channel_id}")

         # Main stream switching loop - we'll try different streams if needed
         while self.running and stream_switch_attempts <= max_stream_switches:
@@ -162,11 +193,37 @@ class StreamManager:
                                f"Resetting switching state.")
                 self._reset_url_switching_state()

+            # NEW: Check for health monitor recovery requests
+            if hasattr(self, 'needs_reconnect') and self.needs_reconnect and not self.url_switching:
+                logger.info(f"Health monitor requested reconnect for channel {self.channel_id}")
+                self.needs_reconnect = False
+
+                # Attempt reconnect without changing streams
+                if self._attempt_reconnect():
+                    logger.info(f"Health-requested reconnect successful for channel {self.channel_id}")
+                    continue  # Go back to main loop
+                else:
+                    logger.warning(f"Health-requested reconnect failed, will try stream switch for channel {self.channel_id}")
+                    self.needs_stream_switch = True
+
+            if hasattr(self, 'needs_stream_switch') and self.needs_stream_switch and not self.url_switching:
+                logger.info(f"Health monitor requested stream switch for channel {self.channel_id}")
+                self.needs_stream_switch = False
+
+                if self._try_next_stream():
+                    logger.info(f"Health-requested stream switch successful for channel {self.channel_id}")
+                    stream_switch_attempts += 1
+                    self.retry_count = 0  # Reset retries for new stream
+                    continue  # Go back to main loop with new stream
+                else:
+                    logger.error(f"Health-requested stream switch failed for channel {self.channel_id}")
+                    # Continue with normal flow
+
             # Check stream type before connecting
             stream_type = detect_stream_type(self.url)
             if self.transcode == False and stream_type == StreamType.HLS:
-                logger.info(f"Detected HLS stream: {self.url}")
-                logger.info(f"HLS streams will be handled with FFmpeg for now - future version will support HLS natively")
+                logger.info(f"Detected HLS stream: {self.url} for channel {self.channel_id}")
+                logger.info(f"HLS streams will be handled with FFmpeg for now - future version will support HLS natively for channel {self.channel_id}")
                 # Enable transcoding for HLS streams
                 self.transcode = True
                 # We'll override the stream profile selection with ffmpeg in the transcoding section
@@ -175,13 +232,13 @@ class StreamManager:
                 self.retry_count = 0
                 url_failed = False
                 if self.url_switching:
-                    logger.debug("Skipping connection attempt during URL switch")
+                    logger.debug(f"Skipping connection attempt during URL switch for channel {self.channel_id}")
                     gevent.sleep(0.1)  # REPLACE time.sleep(0.1)
                     continue

                 # Connection retry loop for current URL
-                while self.running and self.retry_count < self.max_retries and not url_failed:
+                while self.running and self.retry_count < self.max_retries and not url_failed and not self.needs_stream_switch:
-                    logger.info(f"Connection attempt {self.retry_count + 1}/{self.max_retries} for URL: {self.url}")
+                    logger.info(f"Connection attempt {self.retry_count + 1}/{self.max_retries} for URL: {self.url} for channel {self.channel_id}")

                     # Handle connection based on whether we transcode or not
                     connection_result = False
@@ -203,8 +260,12 @@ class StreamManager:
                             # This indicates we had a stable connection for a while before failing
                             connection_duration = time.time() - connection_start_time
                             stable_connection_threshold = 30  # 30 seconds threshold
+
+                            if self.needs_stream_switch:
+                                logger.info(f"Stream needs to switch after {connection_duration:.1f} seconds for channel: {self.channel_id}")
+                                break  # Exit to switch streams
                             if connection_duration > stable_connection_threshold:
-                                logger.info(f"Stream was stable for {connection_duration:.1f} seconds, resetting switch attempts counter")
+                                logger.info(f"Stream was stable for {connection_duration:.1f} seconds, resetting switch attempts counter for channel: {self.channel_id}")
                                 stream_switch_attempts = 0

                     # Connection failed or ended - decide what to do next
@@ -219,15 +280,15 @@ class StreamManager:
                     # If we've reached max retries, mark this URL as failed
                     if self.retry_count >= self.max_retries:
                         url_failed = True
-                        logger.warning(f"Maximum retry attempts ({self.max_retries}) reached for URL: {self.url}")
+                        logger.warning(f"Maximum retry attempts ({self.max_retries}) reached for URL: {self.url} for channel: {self.channel_id}")
                     else:
                         # Wait with exponential backoff before retrying
                         timeout = min(.25 * self.retry_count, 3)  # Cap at 3 seconds
-                        logger.info(f"Reconnecting in {timeout} seconds... (attempt {self.retry_count}/{self.max_retries})")
+                        logger.info(f"Reconnecting in {timeout} seconds... (attempt {self.retry_count}/{self.max_retries}) for channel: {self.channel_id}")
                         gevent.sleep(timeout)  # REPLACE time.sleep(timeout)

                 except Exception as e:
-                    logger.error(f"Connection error: {e}", exc_info=True)
+                    logger.error(f"Connection error on channel: {self.channel_id}: {e}", exc_info=True)
                     self.retry_count += 1
                     self.connected = False
@@ -236,25 +297,25 @@ class StreamManager:
                     else:
                         # Wait with exponential backoff before retrying
                         timeout = min(.25 * self.retry_count, 3)  # Cap at 3 seconds
-                        logger.info(f"Reconnecting in {timeout} seconds after error... (attempt {self.retry_count}/{self.max_retries})")
+                        logger.info(f"Reconnecting in {timeout} seconds after error... (attempt {self.retry_count}/{self.max_retries}) for channel: {self.channel_id}")
                         gevent.sleep(timeout)  # REPLACE time.sleep(timeout)

             # If URL failed and we're still running, try switching to another stream
             if url_failed and self.running:
-                logger.info(f"URL {self.url} failed after {self.retry_count} attempts, trying next stream")
+                logger.info(f"URL {self.url} failed after {self.retry_count} attempts, trying next stream for channel: {self.channel_id}")

                 # Try to switch to next stream
                 switch_result = self._try_next_stream()
                 if switch_result:
                     # Successfully switched to a new stream, continue with the new URL
                     stream_switch_attempts += 1
-                    logger.info(f"Successfully switched to new URL: {self.url} (switch attempt {stream_switch_attempts}/{max_stream_switches})")
+                    logger.info(f"Successfully switched to new URL: {self.url} (switch attempt {stream_switch_attempts}/{max_stream_switches}) for channel: {self.channel_id}")
                     # Reset retry count for the new stream - important for the loop to work correctly
                     self.retry_count = 0
                     # Continue outer loop with new URL - DON'T add a break statement here
                 else:
                     # No more streams to try
-                    logger.error(f"Failed to find alternative streams after {stream_switch_attempts} attempts")
+                    logger.error(f"Failed to find alternative streams after {stream_switch_attempts} attempts for channel: {self.channel_id}")
                     break
             elif not self.running:
                 # Normal shutdown was requested
@@ -278,7 +339,7 @@ class StreamManager:

             # Make sure transcode process is terminated
             if self.transcode_process_active:
-                logger.info("Ensuring transcode process is terminated in finally block")
+                logger.info(f"Ensuring transcode process is terminated in finally block for channel: {self.channel_id}")
                 self._close_socket()

             # Close all connections
@@ -315,7 +376,7 @@ class StreamManager:
                     stop_key = RedisKeys.channel_stopping(self.channel_id)
                     self.buffer.redis_client.setex(stop_key, 60, "true")
                 except Exception as e:
-                    logger.error(f"Failed to update channel state in Redis: {e}")
+                    logger.error(f"Failed to update channel state in Redis: {e} for channel {self.channel_id}", exc_info=True)

             logger.info(f"Stream manager stopped for channel {self.channel_id}")

@@ -323,6 +384,22 @@ class StreamManager:
         """Establish a connection using transcoding"""
         try:
             logger.debug(f"Building transcode command for channel {self.channel_id}")
+
+            # Check if we already have a running transcode process
+            if self.transcode_process and self.transcode_process.poll() is None:
+                logger.info(f"Existing transcode process found for channel {self.channel_id}, closing before establishing new connection")
+                self._close_socket()
+
+                # Wait for the process to fully terminate
+                if not self._wait_for_existing_processes_to_close():
+                    logger.error(f"Failed to close existing transcode process for channel {self.channel_id}")
+                    return False
+
+            # Also check for any lingering HTTP connections
+            if self.current_response or self.current_session:
+                logger.debug(f"Closing existing HTTP connections before establishing transcode connection for channel {self.channel_id}")
+                self._close_connection()
+
             channel = get_stream_object(self.channel_id)

             # Use FFmpeg specifically for HLS streams
@@ -334,13 +411,13 @@ class StreamManager:
                 except StreamProfile.DoesNotExist:
                     # Fall back to channel's profile if FFmpeg not found
                     stream_profile = channel.get_stream_profile()
-                    logger.warning("FFmpeg profile not found, using channel default profile")
+                    logger.warning(f"FFmpeg profile not found, using channel default profile for channel: {self.channel_id}")
             else:
                 stream_profile = channel.get_stream_profile()

             # Build and start transcode command
             self.transcode_cmd = stream_profile.build_command(self.url, self.user_agent)
-            logger.debug(f"Starting transcode process: {self.transcode_cmd}")
+            logger.debug(f"Starting transcode process: {self.transcode_cmd} for channel: {self.channel_id}")

             # Modified to capture stderr instead of discarding it
             self.transcode_process = subprocess.Popen(
@@ -367,7 +444,7 @@ class StreamManager:
             return True

         except Exception as e:
-            logger.error(f"Error establishing transcode connection: {e}", exc_info=True)
+            logger.error(f"Error establishing transcode connection for channel: {self.channel_id}: {e}", exc_info=True)
             self._close_socket()
             return False

@@ -516,25 +593,25 @@ class StreamManager:
                     # Determine log level based on content
                     if any(keyword in content_lower for keyword in ['error', 'failed', 'cannot', 'invalid', 'corrupt']):
-                        logger.error(f"FFmpeg stderr: {content}")
+                        logger.error(f"FFmpeg stderr for channel {self.channel_id}: {content}")
                     elif any(keyword in content_lower for keyword in ['warning', 'deprecated', 'ignoring']):
-                        logger.warning(f"FFmpeg stderr: {content}")
+                        logger.warning(f"FFmpeg stderr for channel {self.channel_id}: {content}")
                     elif content.startswith('frame=') or 'fps=' in content or 'speed=' in content:
                         # Stats lines - log at trace level to avoid spam
-                        logger.trace(f"FFmpeg stats: {content}")
+                        logger.trace(f"FFmpeg stats for channel {self.channel_id}: {content}")
                     elif any(keyword in content_lower for keyword in ['input', 'output', 'stream', 'video', 'audio']):
                         # Stream info - log at info level
-                        logger.info(f"FFmpeg info: {content}")
+                        logger.info(f"FFmpeg info for channel {self.channel_id}: {content}")
                         if content.startswith('Input #0'):
                             # If it's input 0, parse stream info
                             from .services.channel_service import ChannelService
                             ChannelService.parse_and_store_stream_info(self.channel_id, content, "input")
                     else:
                         # Everything else at debug level
-                        logger.debug(f"FFmpeg stderr: {content}")
+                        logger.debug(f"FFmpeg stderr for channel {self.channel_id}: {content}")

         except Exception as e:
-            logger.error(f"Error logging stderr content: {e}")
+            logger.error(f"Error logging stderr content for channel {self.channel_id}: {e}")

     def _parse_ffmpeg_stats(self, stats_line):
         """Parse FFmpeg stats line and extract speed, fps, and bitrate"""
@@ -576,7 +653,7 @@ class StreamManager:
             actual_fps_str = f"{actual_fps:.1f}" if actual_fps is not None else "N/A"
             ffmpeg_output_bitrate_str = f"{ffmpeg_output_bitrate:.1f}" if ffmpeg_output_bitrate is not None else "N/A"
             # Log the stats
-            logger.debug(f"FFmpeg stats - Speed: {ffmpeg_speed}x, FFmpeg FPS: {ffmpeg_fps}, "
+            logger.debug(f"FFmpeg stats for channel {self.channel_id} - Speed: {ffmpeg_speed}x, FFmpeg FPS: {ffmpeg_fps}, "
                          f"Actual FPS: {actual_fps_str}, "
                          f"Output Bitrate: {ffmpeg_output_bitrate_str} kbps")
             # If we have a valid speed, check for buffering
@@ -656,6 +733,21 @@ class StreamManager:
         try:
             logger.debug(f"Using TS Proxy to connect to stream: {self.url}")

+            # Check if we already have active HTTP connections
+            if self.current_response or self.current_session:
+                logger.info(f"Existing HTTP connection found for channel {self.channel_id}, closing before establishing new connection")
+                self._close_connection()
+
+                # Wait for connections to fully close
+                if not self._wait_for_existing_processes_to_close():
+                    logger.error(f"Failed to close existing HTTP connections for channel {self.channel_id}")
+                    return False
+
+            # Also check for any lingering transcode processes
+            if self.transcode_process and self.transcode_process.poll() is None:
+                logger.debug(f"Closing existing transcode process before establishing HTTP connection for channel {self.channel_id}")
+                self._close_socket()
+
             # Create new session for each connection attempt
             session = self._create_session()
             self.current_session = session
@@ -671,7 +763,7 @@ class StreamManager:
             if response.status_code == 200:
                 self.connected = True
                 self.healthy = True
-                logger.info(f"Successfully connected to stream source")
+                logger.info(f"Successfully connected to stream source for channel {self.channel_id}")

                 # Store connection start time for stability tracking
                 self.connection_start_time = time.time()
@@ -681,7 +773,7 @@ class StreamManager:

                 return True
             else:
-                logger.error(f"Failed to connect to stream: HTTP {response.status_code}")
+                logger.error(f"Failed to connect to stream for channel {self.channel_id}: HTTP {response.status_code}")
                 self._close_connection()
                 return False
         except requests.exceptions.RequestException as e:
@@ -689,7 +781,7 @@ class StreamManager:
             self._close_connection()
             return False
         except Exception as e:
-            logger.error(f"Error establishing HTTP connection: {e}", exc_info=True)
+            logger.error(f"Error establishing HTTP connection for channel {self.channel_id}: {e}", exc_info=True)
             self._close_connection()
             return False

@@ -722,7 +814,7 @@ class StreamManager:
         try:
             if self.transcode:
                 # Handle transcoded stream data
-                while self.running and self.connected:
+                while self.running and self.connected and not self.stop_requested and not self.needs_stream_switch:
                     if self.fetch_chunk():
                         self.last_data_time = time.time()
                     else:
@@ -735,7 +827,7 @@ class StreamManager:
                 try:
                     for chunk in self.current_response.iter_content(chunk_size=self.chunk_size):
                         # Check if we've been asked to stop
-                        if self.stop_requested or self.url_switching:
+                        if self.stop_requested or self.url_switching or self.needs_stream_switch:
                             break

                         if chunk:
@@ -756,35 +848,35 @@ class StreamManager:
                                     self.buffer.redis_client.set(last_data_key, str(time.time()), ex=60)
                 except (AttributeError, ConnectionError) as e:
                     if self.stop_requested or self.url_switching:
-                        logger.debug(f"Expected connection error during shutdown/URL switch: {e}")
+                        logger.debug(f"Expected connection error during shutdown/URL switch for channel {self.channel_id}: {e}")
                     else:
-                        logger.error(f"Unexpected stream error: {e}")
+                        logger.error(f"Unexpected stream error for channel {self.channel_id}: {e}")
                         raise
         except Exception as e:
-            logger.error(f"Error processing stream data: {e}", exc_info=True)
+            logger.error(f"Error processing stream data for channel {self.channel_id}: {e}", exc_info=True)

         # If we exit the loop, connection is closed or failed
         self.connected = False

     def _close_all_connections(self):
         """Close all connection resources"""
-        if self.socket:
+        if self.socket or self.transcode_process:
             try:
                 self._close_socket()
             except Exception as e:
-                logger.debug(f"Error closing socket: {e}")
+                logger.debug(f"Error closing socket for channel {self.channel_id}: {e}")

         if self.current_response:
             try:
                 self.current_response.close()
             except Exception as e:
-                logger.debug(f"Error closing response: {e}")
+                logger.debug(f"Error closing response for channel {self.channel_id}: {e}")

         if self.current_session:
             try:
                 self.current_session.close()
             except Exception as e:
-                logger.debug(f"Error closing session: {e}")
+                logger.debug(f"Error closing session for channel {self.channel_id}: {e}")

         # Clear references
         self.socket = None
@@ -811,7 +903,7 @@ class StreamManager:
                 if timer and timer.is_alive():
                     timer.cancel()
             except Exception as e:
-                logger.error(f"Error canceling buffer check timer: {e}")
+                logger.error(f"Error canceling buffer check timer for channel {self.channel_id}: {e}")
         self._buffer_check_timers.clear()

@@ -844,7 +936,7 @@ class StreamManager:
                 logger.info(f"URL unchanged: {new_url}")
                 return False

-            logger.info(f"Switching stream URL from {self.url} to {new_url}")
+            logger.info(f"Switching stream URL from {self.url} to {new_url} for channel {self.channel_id}")

             # Import both models for proper resource management
             from apps.channels.models import Stream, Channel
@@ -875,10 +967,10 @@ class StreamManager:
             try:
                 # Check which type of connection we're using and close it properly
                 if self.transcode or self.socket:
-                    logger.debug("Closing transcode process before URL change")
+                    logger.debug(f"Closing transcode process before URL change for channel {self.channel_id}")
                     self._close_socket()
                 else:
-                    logger.debug("Closing HTTP connection before URL change")
+                    logger.debug(f"Closing HTTP connection before URL change for channel {self.channel_id}")
                     self._close_connection()

                 # Update URL and reset connection state
@@ -892,7 +984,7 @@ class StreamManager:
                     self.current_stream_id = stream_id
                     # Add stream ID to tried streams for proper tracking
                     self.tried_stream_ids.add(stream_id)
-                    logger.info(f"Updated stream ID from {old_stream_id} to {stream_id} for channel {self.buffer.channel_id}")
+                    logger.info(f"Updated stream ID from {old_stream_id} to {stream_id} for channel {self.channel_id}")

                 # Reset retry counter to allow immediate reconnect
                 self.retry_count = 0
@@ -907,25 +999,27 @@ class StreamManager:
                 return True

             except Exception as e:
-                logger.error(f"Error during URL update: {e}", exc_info=True)
+                logger.error(f"Error during URL update for channel {self.channel_id}: {e}", exc_info=True)
                 return False
             finally:
                 # CRITICAL FIX: Always reset the URL switching flag when done, whether successful or not
                 self.url_switching = False
-                logger.info(f"Stream switch completed for channel {self.buffer.channel_id}")
+                logger.info(f"Stream switch completed for channel {self.channel_id}")

     def should_retry(self) -> bool:
         """Check if connection retry is allowed"""
         return self.retry_count < self.max_retries

     def _monitor_health(self):
-        """Monitor stream health and attempt recovery if needed"""
+        """Monitor stream health and set flags for the main loop to handle recovery"""
         consecutive_unhealthy_checks = 0
-        health_recovery_attempts = 0
-        reconnect_attempts = 0
-        max_health_recovery_attempts = ConfigHelper.get('MAX_HEALTH_RECOVERY_ATTEMPTS', 2)
-        max_reconnect_attempts = ConfigHelper.get('MAX_RECONNECT_ATTEMPTS', 3)
-        min_stable_time = ConfigHelper.get('MIN_STABLE_TIME_BEFORE_RECONNECT', 30)  # seconds
+        max_unhealthy_checks = 3
+
+        # Add flags for the main loop to check
+        self.needs_reconnect = False
+        self.needs_stream_switch = False
+        self.last_health_action_time = 0
+        action_cooldown = 30  # Prevent rapid recovery attempts

         while self.running:
             try:
@@ -934,48 +1028,43 @@ class StreamManager:
                 timeout_threshold = getattr(Config, 'CONNECTION_TIMEOUT', 10)

                 if inactivity_duration > timeout_threshold and self.connected:
-                    # Mark unhealthy if no data for too long
                     if self.healthy:
-                        logger.warning(f"Stream unhealthy - no data for {inactivity_duration:.1f}s")
+                        logger.warning(f"Stream unhealthy for channel {self.channel_id} - no data for {inactivity_duration:.1f}s")
                         self.healthy = False

-                    # Track consecutive unhealthy checks
                     consecutive_unhealthy_checks += 1

-                    # After several unhealthy checks in a row, try recovery
-                    if consecutive_unhealthy_checks >= 3 and health_recovery_attempts < max_health_recovery_attempts:
-                        # Calculate how long the stream was stable before failing
+                    # Only set flags if enough time has passed since last action
+                    if (consecutive_unhealthy_checks >= max_unhealthy_checks and
+                            now - self.last_health_action_time > action_cooldown):
+
+                        # Calculate stability to decide on action type
                         connection_start_time = getattr(self, 'connection_start_time', 0)
                         stable_time = self.last_data_time - connection_start_time if connection_start_time > 0 else 0

-                        if stable_time >= min_stable_time and reconnect_attempts < max_reconnect_attempts:
-                            # Stream was stable for a while, try reconnecting first
-                            logger.warning(f"Stream was stable for {stable_time:.1f}s before failing. "
-                                           f"Attempting reconnect {reconnect_attempts + 1}/{max_reconnect_attempts}")
-                            reconnect_attempts += 1
-                            threading.Thread(target=self._attempt_reconnect, daemon=True).start()
+                        if stable_time >= 30:  # Stream was stable, try reconnect first
+                            if not self.needs_reconnect:
+                                logger.info(f"Setting reconnect flag for stable stream (stable for {stable_time:.1f}s) for channel {self.channel_id}")
+                                self.needs_reconnect = True
+                                self.last_health_action_time = now
                         else:
-                            # Stream was not stable long enough, or reconnects failed too many times
-                            # Try switching to another stream
-                            if reconnect_attempts > 0:
-                                logger.warning(f"Reconnect attempts exhausted ({reconnect_attempts}/{max_reconnect_attempts}). "
-                                               f"Attempting stream switch recovery")
-                            else:
-                                logger.warning(f"Stream was only stable for {stable_time:.1f}s (<{min_stable_time}s). "
-                                               f"Skipping reconnect, attempting stream switch")
+                            # Stream wasn't stable, suggest stream switch
+                            if not self.needs_stream_switch:
+                                logger.info(f"Setting stream switch flag for unstable stream (stable for {stable_time:.1f}s) for channel {self.channel_id}")
+                                self.needs_stream_switch = True
+                                self.last_health_action_time = now
+
+                        consecutive_unhealthy_checks = 0  # Reset after setting flag

-                            health_recovery_attempts += 1
-                            reconnect_attempts = 0  # Reset for next time
-                            threading.Thread(target=self._attempt_health_recovery, daemon=True).start()
                 elif self.connected and not self.healthy:
                     # Auto-recover health when data resumes
-                    logger.info(f"Stream health restored")
+                    logger.info(f"Stream health restored for channel {self.channel_id} - data resumed after {inactivity_duration:.1f}s")
                     self.healthy = True
                     consecutive_unhealthy_checks = 0
-                    health_recovery_attempts = 0
-                    reconnect_attempts = 0
+                    # Clear recovery flags when healthy again
+                    self.needs_reconnect = False
+                    self.needs_stream_switch = False

-                # If healthy, reset unhealthy counter (but keep other state)
                 if self.healthy:
                     consecutive_unhealthy_checks = 0
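With this change the monitor thread only raises flags and the run() loop performs the actual recovery, which avoids the old version's overlapping daemon-thread reconnects. The patch shares plain booleans between the two threads; a sketch of the same handoff using threading.Event, which makes the set/consume handshake explicit (names hypothetical):

    import threading
    import time

    class RecoveryFlags:
        def __init__(self, cooldown=30):
            self.reconnect = threading.Event()
            self.cooldown = cooldown
            self._last_action = 0.0

        def request_reconnect(self):
            # Monitor thread: raise the flag at most once per cooldown window
            now = time.time()
            if now - self._last_action > self.cooldown:
                self.reconnect.set()
                self._last_action = now

        def take_reconnect(self):
            # Main loop: consume the flag exactly once, then act on it
            if self.reconnect.is_set():
                self.reconnect.clear()
                return True
            return False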
@@ -991,45 +1080,52 @@ class StreamManager:

         # Don't try to reconnect if we're already switching URLs
         if self.url_switching:
-            # Add timeout check to prevent permanent deadlock
-            if time.time() - self.url_switch_start_time > self.url_switch_timeout:
-                logger.warning(f"URL switching has been in progress too long ({time.time() - self.url_switch_start_time:.1f}s), "
-                               f"resetting switching state and allowing reconnect")
-                self._reset_url_switching_state()
-            else:
-                logger.info("URL switching already in progress, skipping reconnect")
-                return False
+            logger.info(f"URL switching already in progress, skipping reconnect for channel {self.channel_id}")
+            return False

-        # Close existing connection
-        if self.transcode or self.socket:
-            self._close_socket()
-        else:
-            self._close_connection()
+        # Set a flag to prevent concurrent operations
+        if hasattr(self, 'reconnecting') and self.reconnecting:
+            logger.info(f"Reconnect already in progress, skipping for channel {self.channel_id}")
+            return False

-        self.connected = False
+        self.reconnecting = True

-        # Attempt to establish a new connection using the same URL
-        connection_result = False
         try:
+            # Close existing connection and wait for it to fully terminate
+            if self.transcode or self.socket:
+                logger.debug(f"Closing transcode process before reconnect for channel {self.channel_id}")
+                self._close_socket()
+            else:
+                logger.debug(f"Closing HTTP connection before reconnect for channel {self.channel_id}")
+                self._close_connection()
+
+            # Wait for all processes to fully close before attempting reconnect
+            if not self._wait_for_existing_processes_to_close():
+                logger.warning(f"Some processes may still be running during reconnect for channel {self.channel_id}")
+
+            self.connected = False
+
+            # Attempt to establish a new connection using the same URL
+            connection_result = False
             if self.transcode:
                 connection_result = self._establish_transcode_connection()
             else:
                 connection_result = self._establish_http_connection()

             if connection_result:
-                # Store connection start time to measure stability
                 self.connection_start_time = time.time()
                 logger.info(f"Reconnect successful for channel {self.channel_id}")
                 return True
             else:
                 logger.warning(f"Reconnect failed for channel {self.channel_id}")
                 return False
-        except Exception as e:
-            logger.error(f"Error during reconnect: {e}", exc_info=True)
-            return False
+
+        finally:
+            self.reconnecting = False

         except Exception as e:
-            logger.error(f"Error in reconnect attempt: {e}", exc_info=True)
+            logger.error(f"Error in reconnect attempt for channel {self.channel_id}: {e}", exc_info=True)
+            self.reconnecting = False
             return False

     def _attempt_health_recovery(self):
@@ -1039,7 +1135,7 @@ class StreamManager:

         # Don't try to switch if we're already in the process of switching URLs
         if self.url_switching:
-            logger.info("URL switching already in progress, skipping health recovery")
+            logger.info(f"URL switching already in progress, skipping health recovery for channel {self.channel_id}")
             return

         # Try to switch to next stream
@@ -1052,7 +1148,7 @@ class StreamManager:
             return False

         except Exception as e:
-            logger.error(f"Error in health recovery attempt: {e}", exc_info=True)
+            logger.error(f"Error in health recovery attempt for channel {self.channel_id}: {e}", exc_info=True)
             return False

     def _close_connection(self):
@@ -1062,7 +1158,7 @@ class StreamManager:
             try:
                 self.current_response.close()
             except Exception as e:
-                logger.debug(f"Error closing response: {e}")
+                logger.debug(f"Error closing response for channel {self.channel_id}: {e}")
             self.current_response = None

         # Close session if it exists
@@ -1070,7 +1166,7 @@ class StreamManager:
             try:
                 self.current_session.close()
             except Exception as e:
-                logger.debug(f"Error closing session: {e}")
+                logger.debug(f"Error closing session for channel {self.channel_id}: {e}")
             self.current_session = None

     def _close_socket(self):
@@ -1084,7 +1180,7 @@ class StreamManager:
             try:
                 self.socket.close()
             except Exception as e:
-                logger.debug(f"Error closing socket: {e}")
+                logger.debug(f"Error closing socket for channel {self.channel_id}: {e}")
                 pass

         # Enhanced transcode process cleanup with more aggressive termination
@@ -1099,21 +1195,21 @@ class StreamManager:
                     self.transcode_process.wait(timeout=1.0)
                 except subprocess.TimeoutExpired:
                     # If it doesn't terminate quickly, kill it
-                    logger.warning(f"Transcode process didn't terminate within timeout, killing forcefully")
+                    logger.warning(f"Transcode process didn't terminate within timeout, killing forcefully for channel {self.channel_id}")
                     self.transcode_process.kill()
                     try:
                         self.transcode_process.wait(timeout=1.0)
                     except subprocess.TimeoutExpired:
-                        logger.error(f"Failed to kill transcode process even with force")
+                        logger.error(f"Failed to kill transcode process even with force for channel {self.channel_id}")
             except Exception as e:
-                logger.debug(f"Error terminating transcode process: {e}")
+                logger.debug(f"Error terminating transcode process for channel {self.channel_id}: {e}")
                 # Final attempt: try to kill directly
                 try:
                     self.transcode_process.kill()
                 except Exception as e:
-                    logger.error(f"Final kill attempt failed: {e}")
+                    logger.error(f"Final kill attempt failed for channel {self.channel_id}: {e}")

             self.transcode_process = None
             self.transcode_process_active = False  # Reset the flag
@@ -1125,7 +1221,7 @@ class StreamManager:
                     self.buffer.redis_client.delete(transcode_key)
                     logger.debug(f"Cleared transcode active flag for channel {self.channel_id}")
                 except Exception as e:
-                    logger.debug(f"Error clearing transcode flag: {e}")
+                    logger.debug(f"Error clearing transcode flag for channel {self.channel_id}: {e}")

         self.socket = None
         self.connected = False

         # Cancel any remaining buffer check timers
@@ -1135,31 +1231,47 @@ class StreamManager:
                     timer.cancel()
                     logger.debug(f"Cancelled buffer check timer during socket close for channel {self.channel_id}")
                 except Exception as e:
-                    logger.debug(f"Error canceling timer during socket close: {e}")
+                    logger.debug(f"Error canceling timer during socket close for channel {self.channel_id}: {e}")
         self._buffer_check_timers = []

     def fetch_chunk(self):
-        """Fetch data from socket with direct pass-through to buffer"""
+        """Fetch data from socket with timeout handling"""
         if not self.connected or not self.socket:
             return False

         try:
-            # Read data chunk - no need to align with TS packet size anymore
-            try:
-                # Try to read data chunk
-                if hasattr(self.socket, 'recv'):
-                    chunk = self.socket.recv(Config.CHUNK_SIZE)  # Standard socket
-                else:
-                    chunk = self.socket.read(Config.CHUNK_SIZE)  # SocketIO object
+            # Set timeout for chunk reads
+            chunk_timeout = ConfigHelper.get('CHUNK_TIMEOUT', 10)  # Default 10 seconds

-            except AttributeError:
-                # Fall back to read() if recv() isn't available
-                chunk = self.socket.read(Config.CHUNK_SIZE)
+            try:
+                # Handle different socket types with timeout
+                if hasattr(self.socket, 'recv'):
+                    # Standard socket - set timeout
+                    original_timeout = self.socket.gettimeout()
+                    self.socket.settimeout(chunk_timeout)
+                    chunk = self.socket.recv(Config.CHUNK_SIZE)
+                    self.socket.settimeout(original_timeout)  # Restore original timeout
+                else:
+                    # SocketIO object (transcode process stdout) - use select for timeout
+                    import select
+                    ready, _, _ = select.select([self.socket], [], [], chunk_timeout)
+
+                    if not ready:
+                        # Timeout occurred
+                        logger.debug(f"Chunk read timeout ({chunk_timeout}s) for channel {self.channel_id}")
+                        return False
+
+                    chunk = self.socket.read(Config.CHUNK_SIZE)
+
+            except socket.timeout:
+                # Socket timeout occurred
+                logger.debug(f"Socket timeout ({chunk_timeout}s) for channel {self.channel_id}")
+                return False

             if not chunk:
                 # Connection closed by server
-                logger.warning("Server closed connection")
+                logger.warning(f"Server closed connection for channel {self.channel_id}")
                 self._close_socket()
                 self.connected = False
                 return False
@@ -1170,6 +1282,7 @@ class StreamManager:

             # Add directly to buffer without TS-specific processing
             success = self.buffer.add_chunk(chunk)

+            # Update last data timestamp in Redis if successful
             if success and hasattr(self.buffer, 'redis_client') and self.buffer.redis_client:
                 last_data_key = RedisKeys.last_data(self.buffer.channel_id)
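fetch_chunk now bounds both read paths: settimeout()/recv() for real sockets, and select() for the file object wrapping the transcoder's stdout (a pipe has no recv(), but it does have a fileno(), which is all select needs on POSIX). The select path in isolation, as a runnable sketch (the command and 5-second timeout are illustrative; assumes ffmpeg is on PATH):

    import select
    import subprocess

    proc = subprocess.Popen(["ffmpeg", "-version"], stdout=subprocess.PIPE)

    # Block for at most 5 seconds waiting for the pipe to become readable
    ready, _, _ = select.select([proc.stdout], [], [], 5.0)
    if ready:
        chunk = proc.stdout.read1(65536)  # read whatever is available, up to 64 KB
    else:
        chunk = None  # timed out - the caller decides whether to retry or reconnect

One caveat on the recv() path above: the original socket timeout is only restored when the read succeeds, so a socket.timeout leaves the shortened timeout in place; wrapping the recv() in try/finally would be more robust.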
@@ -1247,7 +1360,7 @@ class StreamManager:
             else:
                 logger.debug(f"Not changing state: channel {channel_id} already in {current_state} state")
         except Exception as e:
-            logger.error(f"Error setting waiting for clients state: {e}")
+            logger.error(f"Error setting waiting for clients state for channel {channel_id}: {e}")

     def _check_buffer_and_set_state(self):
         """Check buffer size and set state to waiting_for_clients when ready"""
@@ -1282,7 +1395,7 @@ class StreamManager:

             return True  # Return value to indicate check was successful
         except Exception as e:
-            logger.error(f"Error in buffer check: {e}")
+            logger.error(f"Error in buffer check for channel {self.channel_id}: {e}")
             return False

     def _try_next_stream(self):
@@ -1326,7 +1439,7 @@ class StreamManager:
             stream_info = get_stream_info_for_switch(self.channel_id, stream_id)

             if 'error' in stream_info or not stream_info.get('url'):
-                logger.error(f"Error getting info for stream {stream_id}: {stream_info.get('error', 'No URL')}")
+                logger.error(f"Error getting info for stream {stream_id} for channel {self.channel_id}: {stream_info.get('error', 'No URL')}")
                 return False

             # Update URL and user agent
@@ -1339,7 +1452,7 @@ class StreamManager:
             # IMPORTANT: Just update the URL, don't stop the channel or release resources
             switch_result = self.update_url(new_url, stream_id, profile_id)
             if not switch_result:
-                logger.error(f"Failed to update URL for stream ID {stream_id}")
+                logger.error(f"Failed to update URL for stream ID {stream_id} for channel {self.channel_id}")
                 return False

             # Update stream ID tracking
@@ -1365,7 +1478,7 @@ class StreamManager:

             # Log the switch
             logger.info(f"Stream metadata updated for channel {self.channel_id} to stream ID {stream_id} with M3U profile {profile_id}")
-            logger.info(f"Successfully switched to stream ID {stream_id} with URL {new_url}")
+            logger.info(f"Successfully switched to stream ID {stream_id} with URL {new_url} for channel {self.channel_id}")

             return True

         except Exception as e:
diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py
index b9ba3e65..7192937d 100644
--- a/apps/proxy/ts_proxy/views.py
+++ b/apps/proxy/ts_proxy/views.py
@@ -83,15 +83,7 @@ def stream_ts(request, channel_id):
                 if state_field in metadata:
                     channel_state = metadata[state_field].decode("utf-8")

-                    # IMPROVED: Check for *any* state that indicates initialization is in progress
-                    active_states = [
-                        ChannelState.INITIALIZING,
-                        ChannelState.CONNECTING,
-                        ChannelState.WAITING_FOR_CLIENTS,
-                        ChannelState.ACTIVE,
-                        ChannelState.BUFFERING,
-                    ]
-                    if channel_state in active_states:
+                    if channel_state:
                         # Channel is being initialized or already active - no need for reinitialization
                         needs_initialization = False
                         logger.debug(
@@ -132,7 +124,7 @@ def stream_ts(request, channel_id):
                 logger.warning(
                     f"[{client_id}] Channel {channel_id} in state {channel_state}, forcing cleanup"
                 )
-                proxy_server.stop_channel(channel_id)
+                ChannelService.stop_channel(channel_id)

         # Use max retry attempts and connection timeout from config
         max_retries = ConfigHelper.max_retries()
diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh
index 2d9f7fdc..412cf808 100755
--- a/docker/entrypoint.sh
+++ b/docker/entrypoint.sh
@@ -98,9 +98,11 @@ chmod +x /etc/profile.d/dispatcharr.sh
 pip install django-filter

 # Run init scripts
-echo "Starting init process..."
+echo "Starting user setup..."
 . /app/docker/init/01-user-setup.sh
+echo "Setting up PostgreSQL..."
 . /app/docker/init/02-postgres.sh
+echo "Starting init process..."
 . /app/docker/init/03-init-dispatcharr.sh

 # Start PostgreSQL
diff --git a/frontend/src/components/FloatingVideo.jsx b/frontend/src/components/FloatingVideo.jsx
index 7f1e1c53..8b131dd3 100644
--- a/frontend/src/components/FloatingVideo.jsx
+++ b/frontend/src/components/FloatingVideo.jsx
@@ -185,7 +185,12 @@ export default function FloatingVideo() {
   }, [isVisible, streamUrl]);

   // Modified hideVideo handler to clean up player first
-  const handleClose = () => {
+  const handleClose = (e) => {
+    // Prevent event propagation to avoid triggering drag events
+    if (e) {
+      e.stopPropagation();
+      e.preventDefault();
+    }
     safeDestroyPlayer();
     // Small delay before hiding the video component to ensure cleanup is complete
     setTimeout(() => {
@@ -215,8 +220,24 @@ export default function FloatingVideo() {
       }}
     >
       {/* Simple header row with a close button */}
-      
-      
+      
+        e.stopPropagation()}
+        onTouchStart={(e) => e.stopPropagation()}
+        style={{
+          minHeight: '32px',
+          minWidth: '32px',
+          cursor: 'pointer',
+          touchAction: 'manipulation'
+        }}
+      />

       {/* Video container with relative positioning for the overlay */}
diff --git a/frontend/src/components/Sidebar.jsx b/frontend/src/components/Sidebar.jsx
index 83bc2fc3..6d69e9e7 100644
--- a/frontend/src/components/Sidebar.jsx
+++ b/frontend/src/components/Sidebar.jsx
@@ -90,45 +90,45 @@ const Sidebar = ({ collapsed, toggleDrawer, drawerWidth, miniDrawerWidth }) => {
   const navItems =
     authUser && authUser.user_level == USER_LEVELS.ADMIN
       ? [
-          {
-            label: 'Channels',
-            icon: ,
-            path: '/channels',
-            badge: `(${Object.keys(channels).length})`,
-          },
-          {
-            label: 'M3U & EPG Manager',
-            icon: ,
-            path: '/sources',
-          },
-          { label: 'TV Guide', icon: , path: '/guide' },
-          { label: 'DVR', icon: