Mirror of https://github.com/Dispatcharr/Dispatcharr.git (synced 2026-01-23 02:35:14 +00:00)

Commit a58993ba7d: merged in sergeantpanda upstream
3 changed files with 346 additions and 153 deletions
@@ -155,7 +155,8 @@ class StreamManager:
                     if "'NoneType' object has no attribute 'read'" in str(e):
                         logger.warning(f"Connection closed by server (read {chunk_count} chunks before disconnect)")
                     else:
-                        # Re-raise unexpected AttributeError
+                        # Re-raise unexpected AttributeErrors
+                        logger.error(f"Unexpected AttributeError: {e}")
                         raise
                 else:
                     logger.error(f"Failed to connect to stream: HTTP {response.status_code}")
@@ -200,11 +201,15 @@ class StreamManager:

         except Exception as e:
             logger.error(f"Stream error: {e}", exc_info=True)
-            if self.socket is not None:
-                self._close_socket()
         finally:
             self.connected = False
+
+            if self.socket:
+                try:
+                    self._close_socket()
+                except:
+                    pass

             if self.current_response:
                 try:
                     self.current_response.close()
@@ -217,9 +222,6 @@ class StreamManager:
                 except:
                     pass

-            if self.socket:
-                self._close_socket()
-
         logger.info("Stream manager stopped")

     def stop(self):
@@ -270,21 +272,14 @@ class StreamManager:
         while self.running:
             try:
                 now = time.time()
-                if now - self.last_data_time > 10 and self.connected:
-                    # No data for 10 seconds, mark as unhealthy
+                if now - self.last_data_time > getattr(Config, 'CONNECTION_TIMEOUT', 10) and self.connected:
+                    # Mark unhealthy if no data for too long
                     if self.healthy:
-                        logger.warning("Stream health check: No data received for 10+ seconds")
+                        logger.warning(f"Stream unhealthy - no data for {now - self.last_data_time:.1f}s")
                         self.healthy = False
-
-                    # After 30 seconds with no data, force reconnection
-                    if now - self.last_data_time > 30:
-                        logger.warning("Stream appears dead, forcing reconnection")
-                        self._close_socket()
-                        self.connected = False
-                        self.last_data_time = time.time()  # Reset timer for the reconnect
                 elif self.connected and not self.healthy:
-                    # Stream is receiving data again after being unhealthy
-                    logger.info("Stream health restored, receiving data again")
+                    # Auto-recover health when data resumes
+                    logger.info("Stream health restored")
                     self.healthy = True

             except Exception as e:
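
The change above replaces the hard-coded 10 and 30 second thresholds with a single configurable value read via getattr(Config, 'CONNECTION_TIMEOUT', 10). A minimal, standalone sketch of the same watchdog pattern (class and attribute names here are illustrative, not the project's actual StreamManager):

```python
import time
import threading

class Config:
    CONNECTION_TIMEOUT = 2  # seconds without data before the stream is flagged unhealthy (short for demo)

class Watchdog:
    def __init__(self):
        self.last_data_time = time.time()
        self.healthy = True
        self.running = True

    def on_data(self):
        # Called by the reader whenever a chunk arrives
        self.last_data_time = time.time()

    def monitor(self):
        # Poll the idle time and flip the health flag both ways
        while self.running:
            idle = time.time() - self.last_data_time
            timeout = getattr(Config, 'CONNECTION_TIMEOUT', 10)
            if idle > timeout and self.healthy:
                print(f"Stream unhealthy - no data for {idle:.1f}s")
                self.healthy = False
            elif idle <= timeout and not self.healthy:
                print("Stream health restored")
                self.healthy = True
            time.sleep(0.5)

if __name__ == "__main__":
    wd = Watchdog()
    threading.Thread(target=wd.monitor, daemon=True).start()
    time.sleep(3)   # no data arrives, so the watchdog flags the stream
    wd.on_data()    # data resumes, health recovers on the next poll
    time.sleep(1)
    wd.running = False
```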
@@ -326,8 +321,13 @@ class StreamManager:
         self.connected = False

         if self.transcode_process:
-            self.transcode_process.terminate()
-            self.transcode_process.wait()
+            try:
+                self.transcode_process.terminate()
+                self.transcode_process.wait()
+            except Exception as e:
+                logging.debug(f"Error terminating transcode process: {e}")
+                pass
+
             self.transcode_process = None

     def fetch_chunk(self):
@@ -417,37 +417,6 @@ class StreamManager:
         except Exception as e:
             logger.error(f"Error setting waiting for clients state: {e}")

-    def _read_stream(self):
-        """Read from stream with minimal processing"""
-        try:
-            # Read up to CHUNK_SIZE bytes
-            chunk = self.sock.recv(self.CHUNK_SIZE)
-
-            if not chunk:
-                # Connection closed
-                logger.debug("Connection closed by remote host")
-                return False
-
-            # If we got data, just add it directly to the buffer
-            if chunk:
-                success = self.buffer.add_chunk(chunk)
-
-                # Update last data timestamp in Redis if successful
-                if success and hasattr(self.buffer, 'redis_client') and self.buffer.redis_client:
-                    last_data_key = f"ts_proxy:channel:{self.buffer.channel_id}:last_data"
-                    self.buffer.redis_client.set(last_data_key, str(time.time()), ex=60)  # 1 minute expiry
-
-                return success
-            return True
-
-        except socket.timeout:
-            # Expected timeout - no data available
-            return True
-        except Exception as e:
-            # Error reading from socket
-            logger.error(f"Error reading from stream: {e}")
-            return False
-
 class StreamBuffer:
     """Manages stream data buffering with optimized chunk storage"""

@@ -728,8 +697,7 @@ class ClientManager:
                 # Check if client exists in Redis at all
                 exists = self.redis_client.exists(client_key)
                 if not exists:
-                    # Client entry has expired in Redis but still in our local set
-                    logger.warning(f"Found ghost client {client_id} - expired in Redis but still in local set")
+                    logger.debug(f"Client {client_id} no longer exists in Redis, removing locally")
                     clients_to_remove.add(client_id)
                     continue

@@ -737,13 +705,10 @@ class ClientManager:
                 last_active = self.redis_client.hget(client_key, "last_active")
                 if last_active:
                     last_active_time = float(last_active.decode('utf-8'))
-                    time_since_activity = current_time - last_active_time
+                    ghost_timeout = self.heartbeat_interval * getattr(Config, 'GHOST_CLIENT_MULTIPLIER', 5.0)

-                    # If client hasn't been active for too long, mark for removal
-                    # Use configurable threshold for detection
-                    ghost_threshold = getattr(Config, 'GHOST_CLIENT_MULTIPLIER', 5.0)
-                    if time_since_activity > self.heartbeat_interval * ghost_threshold:
-                        logger.warning(f"Detected ghost client {client_id} - last active {time_since_activity:.1f}s ago")
+                    if current_time - last_active_time > ghost_timeout:
+                        logger.debug(f"Client {client_id} inactive for {current_time - last_active_time:.1f}s, removing as ghost")
                         clients_to_remove.add(client_id)

             # Remove ghost clients in a separate step
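
The ghost-client change reduces the check to a single comparison: a client is dropped once its last_active timestamp is older than heartbeat_interval multiplied by the configurable GHOST_CLIENT_MULTIPLIER. A small worked example of that arithmetic (standalone, not the project's ClientManager):

```python
import time

def is_ghost(last_active_time, heartbeat_interval=5.0, ghost_multiplier=5.0, now=None):
    """Return True when a client has missed enough heartbeats to be treated as gone."""
    now = time.time() if now is None else now
    ghost_timeout = heartbeat_interval * ghost_multiplier  # e.g. 5s * 5.0 = 25s budget
    return now - last_active_time > ghost_timeout

# Last seen 30 seconds ago exceeds the 25 second budget, so the client is removed.
print(is_ghost(last_active_time=1000.0, now=1030.0))  # True
# Last seen 10 seconds ago is still within budget.
print(is_ghost(last_active_time=1020.0, now=1030.0))  # False
```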
@@ -764,8 +729,8 @@ class ClientManager:

             # Skip if we just sent a heartbeat recently
             if client_id in self.last_heartbeat_time:
-                time_since_last = current_time - self.last_heartbeat_time[client_id]
-                if time_since_last < self.heartbeat_interval * 0.8:
+                time_since_heartbeat = current_time - self.last_heartbeat_time[client_id]
+                if time_since_heartbeat < self.heartbeat_interval * 0.5:  # Only heartbeat at half interval minimum
                     continue

             # Only update clients that remain
@@ -965,88 +930,6 @@ class ClientManager:
         except Exception as e:
             logger.error(f"Error refreshing client TTL: {e}")

-class StreamFetcher:
-    """Handles stream data fetching"""
-
-    def __init__(self, manager: StreamManager, buffer: StreamBuffer):
-        self.manager = manager
-        self.buffer = buffer
-
-    def fetch_loop(self) -> None:
-        """Main fetch loop for stream data"""
-        while self.manager.running:
-            try:
-                if not self._handle_connection():
-                    continue
-
-                with self.manager.session.get(self.manager.url, stream=True) as response:
-                    if response.status_code == 200:
-                        self._handle_successful_connection()
-                        self._process_stream(response)
-
-            except requests.exceptions.RequestException as e:
-                self._handle_connection_error(e)
-
-    def _handle_connection(self) -> bool:
-        """Handle connection state and retries"""
-        if not self.manager.connected:
-            if not self.manager.should_retry():
-                logger.error(f"Failed to connect after {self.manager.max_retries} attempts")
-                return False
-
-            if not self.manager.running:
-                return False
-
-            self.manager.retry_count += 1
-            logger.info(f"Connecting to stream: {self.manager.url} "
-                        f"(attempt {self.manager.retry_count}/{self.manager.max_retries})")
-        return True
-
-    def _handle_successful_connection(self) -> None:
-        """Handle successful stream connection"""
-        if not self.manager.connected:
-            logger.info("Stream connected successfully")
-            self.manager.connected = True
-            self.manager.retry_count = 0
-
-    def _process_stream(self, response: requests.Response) -> None:
-        """Process incoming stream data"""
-        for chunk in response.iter_content(chunk_size=Config.CHUNK_SIZE):
-            if not self.manager.running:
-                logger.info("Stream fetch stopped - shutting down")
-                return
-
-            if chunk:
-                if self.manager.ready_event.is_set():
-                    logger.info("Stream switch in progress, closing connection")
-                    self.manager.ready_event.clear()
-                    break
-
-                with self.buffer.lock:
-                    self.buffer.buffer.append(chunk)
-                    self.buffer.index += 1
-
-    def _handle_connection_error(self, error: Exception) -> None:
-        """Handle stream connection errors"""
-        logger.error(f"Stream connection error: {error}")
-        self.manager.connected = False
-
-        if not self.manager.running:
-            return
-
-        logger.info(f"Attempting to reconnect in {Config.RECONNECT_DELAY} seconds...")
-        if not wait_for_running(self.manager, Config.RECONNECT_DELAY):
-            return
-
-def wait_for_running(manager: StreamManager, delay: float) -> bool:
-    """Wait while checking manager running state"""
-    start = time.time()
-    while time.time() - start < delay:
-        if not manager.running:
-            return False
-        threading.Event().wait(0.1)
-    return True
-
 class ProxyServer:
     """Manages TS proxy server instance with worker coordination"""

@@ -1702,23 +1585,41 @@ class ProxyServer:
             logger.error(f"Error checking orphaned channels: {e}")

     def _clean_redis_keys(self, channel_id):
-        """Clean up all Redis keys for a channel"""
+        """Clean up all Redis keys for a channel more efficiently"""
         # Release the channel, stream, and profile keys from the channel
-        channel = Channel.objects.get(id=channel_id)
-        channel.release_stream()

         if not self.redis_client:
-            return
+            return 0

+        try:
+            channel = Channel.objects.get(id=channel_id)
+            channel.release_stream()
-        # All keys are now under the channel namespace for easy pattern matching
-        channel_pattern = f"ts_proxy:channel:{channel_id}:*"
-        all_keys = self.redis_client.keys(channel_pattern)
+            # Define key patterns to scan for
+            patterns = [
+                f"ts_proxy:channel:{channel_id}:*",  # All channel keys
+                f"ts_proxy:events:{channel_id}"  # Event channel
+            ]

-        if all_keys:
-            self.redis_client.delete(*all_keys)
-            logger.info(f"Cleaned up {len(all_keys)} Redis keys for channel {channel_id}")
+            total_deleted = 0

+            for pattern in patterns:
+                cursor = 0
+                while True:
+                    cursor, keys = self.redis_client.scan(cursor, match=pattern, count=100)
+                    if keys:
+                        self.redis_client.delete(*keys)
+                        total_deleted += len(keys)
+
+                    # Exit when cursor returns to 0
+                    if cursor == 0:
+                        break
+
+            logger.info(f"Cleaned up {total_deleted} Redis keys for channel {channel_id}")
+            return total_deleted

+        except Exception as e:
+            logger.error(f"Error cleaning Redis keys for channel {channel_id}: {e}")
+            return 0

     def refresh_channel_registry(self):
         """Refresh TTL for active channels using standard keys"""
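
The rewritten _clean_redis_keys swaps a single blocking KEYS lookup for cursor-based SCAN, deleting matches in batches and returning the total. The same pattern in isolation, using redis-py (host, port, and the example key prefix are placeholders):

```python
import redis

def delete_by_pattern(client: redis.Redis, pattern: str, batch: int = 100) -> int:
    """Delete every key matching pattern via SCAN and return how many were removed."""
    deleted = 0
    cursor = 0
    while True:
        cursor, keys = client.scan(cursor, match=pattern, count=batch)
        if keys:
            client.delete(*keys)
            deleted += len(keys)
        if cursor == 0:  # SCAN signals completion by returning cursor 0
            break
    return deleted

if __name__ == "__main__":
    r = redis.Redis(host="localhost", port=6379)
    removed = delete_by_pattern(r, "ts_proxy:channel:42:*")
    print(f"removed {removed} keys")
```

Unlike KEYS, SCAN never blocks the Redis server for a full keyspace walk, which matters when many channels hold per-chunk buffer keys.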

@@ -6,4 +6,6 @@ app_name = 'ts_proxy'
 urlpatterns = [
     path('stream/<str:channel_id>', views.stream_ts, name='stream'),
     path('change_stream/<str:channel_id>', views.change_stream, name='change_stream'),
+    path('status', views.channel_status, name='channel_status'),
+    path('status/<str:channel_id>', views.channel_status, name='channel_status_detail'),
 ]
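
With the two new routes registered, the status endpoints can be exercised like any other Django view. A hedged example using the requests library; the host, port, and the /proxy/ts prefix are assumptions that depend on how the project's root urls.py mounts this app:

```python
import requests

BASE = "http://localhost:9191/proxy/ts"  # placeholder base URL

# Summary of all active channels
summary = requests.get(f"{BASE}/status").json()
print(summary["count"], "channels")

# Detailed view of one channel, including clients and buffer diagnostics
detail = requests.get(f"{BASE}/status/42").json()
print(detail.get("state"), detail.get("client_count"))
```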

@@ -570,6 +570,296 @@ def change_stream(request, channel_id):
         logger.error(f"Failed to change stream: {e}", exc_info=True)
         return JsonResponse({'error': str(e)}, status=500)

+@require_GET
+def channel_status(request, channel_id=None):
+    """
+    Returns status information about channels with detail level based on request:
+    - /status/ returns basic summary of all channels
+    - /status/{channel_id} returns detailed info about specific channel
+    """
+    try:
+        # Check if Redis is available
+        if not proxy_server.redis_client:
+            return JsonResponse({'error': 'Redis connection not available'}, status=500)
+
+        # Function for detailed channel info (used when channel_id is provided)
+        def get_detailed_channel_info(channel_id):
+            # Get channel metadata
+            metadata_key = f"ts_proxy:channel:{channel_id}:metadata"
+            metadata = proxy_server.redis_client.hgetall(metadata_key)
+
+            if not metadata:
+                return None
+
+            # Get detailed info - existing implementation
+            # Get channel metadata
+            metadata_key = f"ts_proxy:channel:{channel_id}:metadata"
+            metadata = proxy_server.redis_client.hgetall(metadata_key)
+
+            if not metadata:
+                return None
+
+            # Basic channel info
+            buffer_index_value = proxy_server.redis_client.get(f"ts_proxy:channel:{channel_id}:buffer:index")
+
+            info = {
+                'channel_id': channel_id,
+                'state': metadata.get(b'state', b'unknown').decode('utf-8'),
+                'url': metadata.get(b'url', b'').decode('utf-8'),
+                'created_at': metadata.get(b'created_at', b'0').decode('utf-8'),
+                'owner': metadata.get(b'owner', b'unknown').decode('utf-8'),
+
+                # Properly decode the buffer index value
+                'buffer_index': int(buffer_index_value.decode('utf-8')) if buffer_index_value else 0,
+            }
+
+            # Add timing information
+            if b'state_changed_at' in metadata:
+                state_changed_at = float(metadata[b'state_changed_at'].decode('utf-8'))
+                info['state_changed_at'] = state_changed_at
+                info['state_duration'] = time.time() - state_changed_at
+
+            if b'created_at' in metadata:
+                created_at = float(metadata[b'created_at'].decode('utf-8'))
+                info['created_at'] = created_at
+                info['uptime'] = time.time() - created_at
+
+            # Get client information
+            client_set_key = f"ts_proxy:channel:{channel_id}:clients"
+            client_ids = proxy_server.redis_client.smembers(client_set_key)
+            clients = []
+
+            for client_id in client_ids:
+                client_id_str = client_id.decode('utf-8')
+                client_key = f"ts_proxy:channel:{channel_id}:clients:{client_id_str}"
+                client_data = proxy_server.redis_client.hgetall(client_key)
+
+                if client_data:
+                    client_info = {
+                        'client_id': client_id_str,
+                        'user_agent': client_data.get(b'user_agent', b'unknown').decode('utf-8'),
+                        'worker_id': client_data.get(b'worker_id', b'unknown').decode('utf-8'),
+                    }
+
+                    if b'connected_at' in client_data:
+                        connected_at = float(client_data[b'connected_at'].decode('utf-8'))
+                        client_info['connected_at'] = connected_at
+                        client_info['connection_duration'] = time.time() - connected_at
+
+                    if b'last_active' in client_data:
+                        last_active = float(client_data[b'last_active'].decode('utf-8'))
+                        client_info['last_active'] = last_active
+                        client_info['last_active_ago'] = time.time() - last_active
+
+                    clients.append(client_info)
+
+            info['clients'] = clients
+            info['client_count'] = len(clients)
+
+            # Get buffer health with improved diagnostics
+            buffer_stats = {
+                'chunks': info['buffer_index'],
+                'diagnostics': {}
+            }
+
+            # Sample a few recent chunks to check sizes with better error handling
+            if info['buffer_index'] > 0:
+                try:
+                    sample_chunks = min(5, info['buffer_index'])
+                    chunk_sizes = []
+                    chunk_keys_found = []
+                    chunk_keys_missing = []
+
+                    # Check if the keys exist before getting
+                    for i in range(info['buffer_index']-sample_chunks+1, info['buffer_index']+1):
+                        chunk_key = f"ts_proxy:channel:{channel_id}:buffer:chunk:{i}"
+
+                        # Check if key exists first
+                        if proxy_server.redis_client.exists(chunk_key):
+                            chunk_data = proxy_server.redis_client.get(chunk_key)
+                            if chunk_data:
+                                chunk_size = len(chunk_data)
+                                chunk_sizes.append(chunk_size)
+                                chunk_keys_found.append(i)
+
+                                # Check for TS alignment (packets are 188 bytes)
+                                ts_packets = chunk_size // 188
+                                ts_aligned = chunk_size % 188 == 0
+
+                                # Add for first chunk only to avoid too much data
+                                if len(chunk_keys_found) == 1:
+                                    buffer_stats['diagnostics']['first_chunk'] = {
+                                        'index': i,
+                                        'size': chunk_size,
+                                        'ts_packets': ts_packets,
+                                        'aligned': ts_aligned,
+                                        'first_byte': chunk_data[0] if chunk_size > 0 else None
+                                    }
+                        else:
+                            chunk_keys_missing.append(i)
+
+                    # Add detailed diagnostics
+                    if chunk_sizes:
+                        buffer_stats['avg_chunk_size'] = sum(chunk_sizes) / len(chunk_sizes)
+                        buffer_stats['recent_chunk_sizes'] = chunk_sizes
+                        buffer_stats['keys_found'] = chunk_keys_found
+                        buffer_stats['keys_missing'] = chunk_keys_missing
+
+                        # Calculate data rate
+                        total_data = sum(chunk_sizes)
+                        buffer_stats['total_sample_bytes'] = total_data
+
+                        # Add TS packet analysis
+                        total_ts_packets = total_data // 188
+                        buffer_stats['estimated_ts_packets'] = total_ts_packets
+                        buffer_stats['is_ts_aligned'] = all(size % 188 == 0 for size in chunk_sizes)
+                    else:
+                        # If no chunks found, scan for keys to help debug
+                        all_buffer_keys = []
+                        cursor = 0
+
+                        buffer_key_pattern = f"ts_proxy:channel:{channel_id}:buffer:chunk:*"
+
+                        while True:
+                            cursor, keys = proxy_server.redis_client.scan(cursor, match=buffer_key_pattern, count=100)
+                            if keys:
+                                all_buffer_keys.extend([k.decode('utf-8') for k in keys])
+                            if cursor == 0 or len(all_buffer_keys) >= 20:  # Limit to 20 keys
+                                break
+
+                        buffer_stats['diagnostics']['all_buffer_keys'] = all_buffer_keys[:20]  # First 20 keys
+                        buffer_stats['diagnostics']['total_buffer_keys'] = len(all_buffer_keys)
+
+                except Exception as e:
+                    # Capture any errors for diagnostics
+                    buffer_stats['error'] = str(e)
+                    buffer_stats['diagnostics']['exception'] = str(e)
+
+            # Add TTL information to see if chunks are expiring
+            chunk_ttl_key = f"ts_proxy:channel:{channel_id}:buffer:chunk:{info['buffer_index']}"
+            chunk_ttl = proxy_server.redis_client.ttl(chunk_ttl_key)
+            buffer_stats['latest_chunk_ttl'] = chunk_ttl
+
+            info['buffer_stats'] = buffer_stats
+
+            # Get local worker info if available
+            if channel_id in proxy_server.stream_managers:
+                manager = proxy_server.stream_managers[channel_id]
+                info['local_manager'] = {
+                    'healthy': manager.healthy,
+                    'connected': manager.connected,
+                    'last_data_time': manager.last_data_time,
+                    'last_data_age': time.time() - manager.last_data_time
+                }
+
+            return info
+
+        # Function for basic channel info (used for all channels summary)
+        def get_basic_channel_info(channel_id):
+            # Get channel metadata
+            metadata_key = f"ts_proxy:channel:{channel_id}:metadata"
+            metadata = proxy_server.redis_client.hgetall(metadata_key)
+
+            if not metadata:
+                return None
+
+            # Basic channel info only - omit diagnostics and details
+            buffer_index_value = proxy_server.redis_client.get(f"ts_proxy:channel:{channel_id}:buffer:index")
+
+            # Count clients (using efficient count method)
+            client_set_key = f"ts_proxy:channel:{channel_id}:clients"
+            client_count = proxy_server.redis_client.scard(client_set_key) or 0
+
+            # Calculate uptime
+            created_at = float(metadata.get(b'init_time', b'0').decode('utf-8'))
+            uptime = time.time() - created_at if created_at > 0 else 0
+
+            # Simplified info
+            info = {
+                'channel_id': channel_id,
+                'state': metadata.get(b'state', b'unknown').decode('utf-8'),
+                'url': metadata.get(b'url', b'').decode('utf-8'),
+                'owner': metadata.get(b'owner', b'unknown').decode('utf-8'),
+                'buffer_index': int(buffer_index_value.decode('utf-8')) if buffer_index_value else 0,
+                'client_count': client_count,
+                'uptime': uptime
+            }
+
+            # Quick health check if available locally
+            if channel_id in proxy_server.stream_managers:
+                manager = proxy_server.stream_managers[channel_id]
+                info['healthy'] = manager.healthy
+
+            # Get concise client information
+            clients = []
+            client_set_key = f"ts_proxy:channel:{channel_id}:clients"
+            client_ids = proxy_server.redis_client.smembers(client_set_key)
+
+            # Process only if we have clients and keep it limited
+            if client_ids:
+                # Get up to 10 clients for the basic view
+                for client_id in list(client_ids)[:10]:
+                    client_id_str = client_id.decode('utf-8')
+                    client_key = f"ts_proxy:channel:{channel_id}:clients:{client_id_str}"
+
+                    # Efficient way - just retrieve the essentials
+                    client_info = {
+                        'client_id': client_id_str,
+                        'user_agent': proxy_server.redis_client.hget(client_key, 'user_agent')
+                    }
+
+                    if client_info['user_agent']:
+                        client_info['user_agent'] = client_info['user_agent'].decode('utf-8')
+                    else:
+                        client_info['user_agent'] = 'unknown'
+
+                    # Just get connected_at for client age
+                    connected_at_bytes = proxy_server.redis_client.hget(client_key, 'connected_at')
+                    if connected_at_bytes:
+                        connected_at = float(connected_at_bytes.decode('utf-8'))
+                        client_info['connected_since'] = time.time() - connected_at
+
+                    clients.append(client_info)
+
+            # Add clients to info
+            info['clients'] = clients
+
+            return info
+
+        # Handle single channel or all channels
+        if channel_id:
+            # Detailed info for specific channel
+            channel_info = get_detailed_channel_info(channel_id)
+            if channel_info:
+                return JsonResponse(channel_info)
+            else:
+                return JsonResponse({'error': f'Channel {channel_id} not found'}, status=404)
+        else:
+            # Basic info for all channels
+            channel_pattern = "ts_proxy:channel:*:metadata"
+            all_channels = []
+
+            # Extract channel IDs from keys
+            cursor = 0
+            while True:
+                cursor, keys = proxy_server.redis_client.scan(cursor, match=channel_pattern)
+                for key in keys:
+                    channel_id_match = re.search(r"ts_proxy:channel:(.*):metadata", key.decode('utf-8'))
+                    if channel_id_match:
+                        ch_id = channel_id_match.group(1)
+                        channel_info = get_basic_channel_info(ch_id)
+                        if channel_info:
+                            all_channels.append(channel_info)
+
+                if cursor == 0:
+                    break
+
+            return JsonResponse({'channels': all_channels, 'count': len(all_channels)})
+
+    except Exception as e:
+        logger.error(f"Error in channel_status: {e}", exc_info=True)
+        return JsonResponse({'error': str(e)}, status=500)
+
 def get_client_data(buffer, local_index):
     """Get optimal amount of data for client"""
     # Define limits
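
One recurring detail in channel_status: redis-py returns hash fields and set members as bytes, which is why every value goes through a metadata.get(b'...', default).decode('utf-8') step. A tiny helper in the same spirit (illustrative only, not part of this commit):

```python
def hget_str(metadata: dict, field: bytes, default: str = "unknown") -> str:
    """Decode one bytes field from a redis-py HGETALL result, with a fallback."""
    value = metadata.get(field)
    return value.decode("utf-8") if value is not None else default

metadata = {b"state": b"active", b"url": b"http://example.com/stream.ts"}
print(hget_str(metadata, b"state"))  # "active"
print(hget_str(metadata, b"owner"))  # "unknown"
```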