Gracefully handles errors during channel cleanup.

SergeantPanda 2025-03-09 16:43:38 -05:00
parent 47c6346a6b
commit bfe4f95df2


@@ -84,10 +84,10 @@ class StreamManager:
return self.retry_count < self.max_retries
def stop(self) -> None:
"""Clean shutdown of stream manager"""
"""Stop the stream manager and close all resources"""
self.running = False
if self.socket:
self.socket.close()
self._close_socket()
logging.info("Stream manager resources released")
def _process_complete_packets(self):
"""Process TS packets with detailed logging"""
@@ -176,6 +176,8 @@ class StreamManager:
health_thread = threading.Thread(target=self._monitor_health, daemon=True)
health_thread.start()
current_response = None # Track the current response object
# Establish network connection
import socket
import requests
@@ -192,6 +194,8 @@ class StreamManager:
try:
# Create an initial connection to get socket
response = session.get(self.url, stream=True)
current_response = response # Store reference for cleanup
if response.status_code == 200:
self.connected = True
self.socket = response.raw._fp.fp.raw
@@ -201,43 +205,48 @@ class StreamManager:
# Main fetch loop
while self.running and self.connected:
if self.fetch_chunk():
self.last_data_time = time.time() # Update last data time
self.last_data_time = time.time()
else:
if not self.running:
break
# Short sleep to prevent CPU spinning
time.sleep(0.1)
else:
logging.error(f"Failed to connect to stream: HTTP {response.status_code}")
time.sleep(2) # Wait before retry
time.sleep(2)
finally:
# Properly close response before session
if current_response:
try:
# Close the response explicitly to avoid the urllib3 error
current_response.close()
except Exception as e:
logging.debug(f"Error closing response: {e}")
current_response = None
session.close()
else:
# Direct socket connection (UDP/TCP)
logging.error(f"Unsupported URL scheme: {self.url}")
# If we're still running but not connected, retry
# Connection retry logic
if self.running and not self.connected:
self.retry_count += 1
if self.retry_count > self.max_retries:
logging.error(f"Maximum retry attempts ({self.max_retries}) exceeded")
break
timeout = min(2 ** self.retry_count, 30) # Exponential backoff
timeout = min(2 ** self.retry_count, 30)
logging.info(f"Reconnecting in {timeout} seconds... (attempt {self.retry_count})")
time.sleep(timeout)
except Exception as e:
logging.error(f"Connection error: {e}")
self._close_socket()
time.sleep(5) # Wait before retry
time.sleep(5)
except Exception as e:
logging.error(f"Stream error: {e}")
self._close_socket()
finally:
# Final cleanup
self._close_socket()
logging.info("Stream manager stopped")
@@ -273,7 +282,8 @@ class StreamManager:
if self.socket:
try:
self.socket.close()
except Exception:
except Exception as e:
logging.debug(f"Error closing socket: {e}")
pass
self.socket = None
self.connected = False
@@ -795,34 +805,58 @@ class ProxyServer:
return False
def stop_channel(self, channel_id):
"""Stop a channel and clean up resources"""
"""Stop a channel and clean up resources with improved shutdown sequence"""
try:
logging.info(f"Stopping channel {channel_id}")
# Stop the stream manager
# Stop the stream manager first
stream_manager = None
if channel_id in self.stream_managers:
manager = self.stream_managers[channel_id]
manager.running = False
stream_manager = self.stream_managers[channel_id]
# Signal thread to stop and close network resources
if hasattr(stream_manager, 'stop'):
stream_manager.stop()
else:
stream_manager.running = False
# Close any socket connection
if hasattr(manager, '_close_socket'):
manager._close_socket()
# Remove from managers dict
if hasattr(stream_manager, '_close_socket'):
stream_manager._close_socket()
# Now look for the thread and wait for it to finish
stream_thread_name = f"stream-{channel_id}"
stream_thread = None
for thread in threading.enumerate():
if thread.name == stream_thread_name:
stream_thread = thread
break
if stream_thread and stream_thread.is_alive():
logging.debug(f"Waiting for stream thread to terminate gracefully")
try:
# Very short timeout to prevent hanging the app
stream_thread.join(timeout=1.0)
except RuntimeError:
logging.debug("Could not join stream thread (may be current thread)")
# Now it's safe to clean up the objects
if channel_id in self.stream_managers:
del self.stream_managers[channel_id]
logging.info(f"Removed stream manager for channel {channel_id}")
# Clean up buffer
if channel_id in self.stream_buffers:
del self.stream_buffers[channel_id]
logging.info(f"Removed stream buffer for channel {channel_id}")
# Clean up client manager
if channel_id in self.client_managers:
del self.client_managers[channel_id]
logging.info(f"Removed client manager for channel {channel_id}")
# Remove Redis activity key
# Clean up Redis
if self.redis_client:
activity_key = f"ts_proxy:active_channel:{channel_id}"
self.redis_client.delete(activity_key)
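The stop_channel rewrite follows a signal, close, join, delete sequence: ask the manager to stop, close its socket, wait briefly for the worker thread (found by its stream-<channel_id> name) so a stuck thread cannot hang the caller, and only then drop the shared references. The following is a minimal sketch of that sequence with a toy worker registry; the Worker class, the workers dict, and stop_worker are assumptions for illustration, not the proxy's API.

import logging
import threading
import time


class Worker:
    """Toy stand-in for a stream manager with a cooperative stop flag."""

    def __init__(self):
        self.running = True

    def stop(self):
        self.running = False  # the run loop checks this flag and exits

    def run(self):
        while self.running:
            time.sleep(0.1)


workers = {}  # channel_id -> Worker, stand-in for self.stream_managers


def stop_worker(worker_id, join_timeout=1.0):
    """Signal a worker to stop, wait briefly for its thread, then drop references."""
    worker = workers.get(worker_id)
    if worker is not None:
        worker.stop()  # signal first so the thread can unwind on its own

    # Find the thread by its conventional name and join with a short timeout
    # so a stuck (or current) thread cannot hang the shutdown path.
    thread_name = f"stream-{worker_id}"
    for thread in threading.enumerate():
        if thread.name == thread_name and thread.is_alive():
            try:
                thread.join(timeout=join_timeout)
            except RuntimeError:
                logging.debug("Could not join %s (may be the current thread)", thread_name)
            break

    # Only now is it safe to drop the shared references.
    workers.pop(worker_id, None)


# Usage: start a named worker thread, then shut it down.
worker = Worker()
workers["demo"] = worker
threading.Thread(target=worker.run, name="stream-demo", daemon=True).start()
stop_worker("demo")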