diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index 6f2ab137..833686f2 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -84,10 +84,10 @@ class StreamManager: return self.retry_count < self.max_retries def stop(self) -> None: - """Clean shutdown of stream manager""" + """Stop the stream manager and close all resources""" self.running = False - if self.socket: - self.socket.close() + self._close_socket() + logging.info("Stream manager resources released") def _process_complete_packets(self): """Process TS packets with detailed logging""" @@ -176,6 +176,8 @@ class StreamManager: health_thread = threading.Thread(target=self._monitor_health, daemon=True) health_thread.start() + current_response = None # Track the current response object + # Establish network connection import socket import requests @@ -192,6 +194,8 @@ class StreamManager: try: # Create an initial connection to get socket response = session.get(self.url, stream=True) + current_response = response # Store reference for cleanup + if response.status_code == 200: self.connected = True self.socket = response.raw._fp.fp.raw @@ -201,43 +205,48 @@ class StreamManager: # Main fetch loop while self.running and self.connected: if self.fetch_chunk(): - self.last_data_time = time.time() # Update last data time + self.last_data_time = time.time() else: if not self.running: break - # Short sleep to prevent CPU spinning time.sleep(0.1) - else: logging.error(f"Failed to connect to stream: HTTP {response.status_code}") - time.sleep(2) # Wait before retry - + time.sleep(2) finally: + # Properly close response before session + if current_response: + try: + # Close the response explicitly to avoid the urllib3 error + current_response.close() + except Exception as e: + logging.debug(f"Error closing response: {e}") + current_response = None session.close() else: - # Direct socket connection (UDP/TCP) logging.error(f"Unsupported URL scheme: {self.url}") - - # If we're still running but not connected, retry + + # Connection retry logic if self.running and not self.connected: self.retry_count += 1 if self.retry_count > self.max_retries: logging.error(f"Maximum retry attempts ({self.max_retries}) exceeded") break - timeout = min(2 ** self.retry_count, 30) # Exponential backoff + timeout = min(2 ** self.retry_count, 30) logging.info(f"Reconnecting in {timeout} seconds... (attempt {self.retry_count})") time.sleep(timeout) except Exception as e: logging.error(f"Connection error: {e}") self._close_socket() - time.sleep(5) # Wait before retry + time.sleep(5) except Exception as e: logging.error(f"Stream error: {e}") self._close_socket() finally: + # Final cleanup self._close_socket() logging.info("Stream manager stopped") @@ -273,7 +282,8 @@ class StreamManager: if self.socket: try: self.socket.close() - except Exception: + except Exception as e: + logging.debug(f"Error closing socket: {e}") pass self.socket = None self.connected = False @@ -795,34 +805,58 @@ class ProxyServer: return False def stop_channel(self, channel_id): - """Stop a channel and clean up resources""" + """Stop a channel and clean up resources with improved shutdown sequence""" try: logging.info(f"Stopping channel {channel_id}") - # Stop the stream manager + # Stop the stream manager first + stream_manager = None if channel_id in self.stream_managers: - manager = self.stream_managers[channel_id] - manager.running = False + stream_manager = self.stream_managers[channel_id] + # Signal thread to stop and close network resources + if hasattr(stream_manager, 'stop'): + stream_manager.stop() + else: + stream_manager.running = False + # Close any socket connection - if hasattr(manager, '_close_socket'): - manager._close_socket() - - # Remove from managers dict + if hasattr(stream_manager, '_close_socket'): + stream_manager._close_socket() + + # Now look for the thread and wait for it to finish + stream_thread_name = f"stream-{channel_id}" + stream_thread = None + + for thread in threading.enumerate(): + if thread.name == stream_thread_name: + stream_thread = thread + break + + if stream_thread and stream_thread.is_alive(): + logging.debug(f"Waiting for stream thread to terminate gracefully") + try: + # Very short timeout to prevent hanging the app + stream_thread.join(timeout=1.0) + except RuntimeError: + logging.debug("Could not join stream thread (may be current thread)") + + # Now it's safe to clean up the objects + if channel_id in self.stream_managers: del self.stream_managers[channel_id] logging.info(f"Removed stream manager for channel {channel_id}") - # Clean up buffer +# Clean up buffer if channel_id in self.stream_buffers: del self.stream_buffers[channel_id] logging.info(f"Removed stream buffer for channel {channel_id}") - # Clean up client manager +# Clean up client manager if channel_id in self.client_managers: del self.client_managers[channel_id] logging.info(f"Removed client manager for channel {channel_id}") - # Remove Redis activity key + # Clean up Redis if self.redis_client: activity_key = f"ts_proxy:active_channel:{channel_id}" self.redis_client.delete(activity_key)