From 50e9075bb50f99bff878266ebf8648da5bb72f51 Mon Sep 17 00:00:00 2001 From: dekzter Date: Sat, 25 Oct 2025 08:15:39 -0400 Subject: [PATCH] initial run of a binary and encoded redis client - no more encoding / decoding data into redis, huge PITA (still some outstanding spots I need to patch) --- apps/channels/api_views.py | 6 +- apps/channels/tasks.py | 4 +- apps/proxy/tasks.py | 2 +- apps/proxy/ts_proxy/channel_status.py | 208 ++++++------------ apps/proxy/ts_proxy/server.py | 75 +++---- .../ts_proxy/services/channel_service.py | 20 +- apps/proxy/ts_proxy/stream_manager.py | 14 +- apps/proxy/ts_proxy/url_utils.py | 8 +- .../multi_worker_connection_manager.py | 14 +- apps/proxy/vod_proxy/views.py | 16 +- core/redis_pubsub.py | 4 - core/tasks.py | 2 +- core/utils.py | 192 ++++++++-------- dispatcharr/persistent_lock.py | 2 +- 14 files changed, 253 insertions(+), 314 deletions(-) diff --git a/apps/channels/api_views.py b/apps/channels/api_views.py index 862de7f9..aa25ca85 100644 --- a/apps/channels/api_views.py +++ b/apps/channels/api_views.py @@ -1849,14 +1849,12 @@ class RecordingViewSet(viewsets.ModelViewSet): client_set_key = RedisKeys.clients(channel_uuid) client_ids = r.smembers(client_set_key) or [] stopped = 0 - for raw_id in client_ids: + for cid in client_ids: try: - cid = raw_id.decode("utf-8") if isinstance(raw_id, (bytes, bytearray)) else str(raw_id) meta_key = RedisKeys.client_metadata(channel_uuid, cid) ua = r.hget(meta_key, "user_agent") - ua_s = ua.decode("utf-8") if isinstance(ua, (bytes, bytearray)) else (ua or "") # Identify DVR recording client by its user agent - if ua_s and "Dispatcharr-DVR" in ua_s: + if ua and "Dispatcharr-DVR" in ua: try: ChannelService.stop_client(channel_uuid, cid) stopped += 1 diff --git a/apps/channels/tasks.py b/apps/channels/tasks.py index 3943cf16..28da31b3 100755 --- a/apps/channels/tasks.py +++ b/apps/channels/tasks.py @@ -1875,14 +1875,14 @@ def run_recording(recording_id, channel_id, start_time_str, end_time_str): md = r.hgetall(metadata_key) if md: def _gv(bkey): - return md.get(bkey.encode('utf-8')) + return md.get(bkey) def _d(bkey, cast=str): v = _gv(bkey) try: if v is None: return None - s = v.decode('utf-8') + s = v return cast(s) if cast is not str else s except Exception: return None diff --git a/apps/proxy/tasks.py b/apps/proxy/tasks.py index 68843712..d42e4d9a 100644 --- a/apps/proxy/tasks.py +++ b/apps/proxy/tasks.py @@ -31,7 +31,7 @@ def fetch_channel_stats(): while True: cursor, keys = redis_client.scan(cursor, match=channel_pattern) for key in keys: - channel_id_match = re.search(r"ts_proxy:channel:(.*):metadata", key.decode('utf-8')) + channel_id_match = re.search(r"ts_proxy:channel:(.*):metadata", key) if channel_id_match: ch_id = channel_id_match.group(1) channel_info = ChannelStatus.get_basic_channel_info(ch_id) diff --git a/apps/proxy/ts_proxy/channel_status.py b/apps/proxy/ts_proxy/channel_status.py index 8f1d0649..015d5ca2 100644 --- a/apps/proxy/ts_proxy/channel_status.py +++ b/apps/proxy/ts_proxy/channel_status.py @@ -38,19 +38,19 @@ class ChannelStatus: info = { 'channel_id': channel_id, - 'state': metadata.get(ChannelMetadataField.STATE.encode('utf-8'), b'unknown').decode('utf-8'), - 'url': metadata.get(ChannelMetadataField.URL.encode('utf-8'), b'').decode('utf-8'), - 'stream_profile': metadata.get(ChannelMetadataField.STREAM_PROFILE.encode('utf-8'), b'').decode('utf-8'), - 'started_at': metadata.get(ChannelMetadataField.INIT_TIME.encode('utf-8'), b'0').decode('utf-8'), - 'owner': metadata.get(ChannelMetadataField.OWNER.encode('utf-8'), b'unknown').decode('utf-8'), - 'buffer_index': int(buffer_index_value.decode('utf-8')) if buffer_index_value else 0, + 'state': metadata.get(ChannelMetadataField.STATE, 'unknown'), + 'url': metadata.get(ChannelMetadataField.URL, ''), + 'stream_profile': metadata.get(ChannelMetadataField.STREAM_PROFILE, ''), + 'started_at': metadata.get(ChannelMetadataField.INIT_TIME, '0'), + 'owner': metadata.get(ChannelMetadataField.OWNER, 'unknown'), + 'buffer_index': int(buffer_index_value) if buffer_index_value else 0, } # Add stream ID and name information - stream_id_bytes = metadata.get(ChannelMetadataField.STREAM_ID.encode('utf-8')) + stream_id_bytes = metadata.get(ChannelMetadataField.STREAM_ID) if stream_id_bytes: try: - stream_id = int(stream_id_bytes.decode('utf-8')) + stream_id = int(stream_id_bytes) info['stream_id'] = stream_id # Look up stream name from database @@ -65,10 +65,10 @@ class ChannelStatus: logger.warning(f"Invalid stream_id format in Redis: {stream_id_bytes}") # Add M3U profile information - m3u_profile_id_bytes = metadata.get(ChannelMetadataField.M3U_PROFILE.encode('utf-8')) + m3u_profile_id_bytes = metadata.get(ChannelMetadataField.M3U_PROFILE) if m3u_profile_id_bytes: try: - m3u_profile_id = int(m3u_profile_id_bytes.decode('utf-8')) + m3u_profile_id = int(m3u_profile_id_bytes) info['m3u_profile_id'] = m3u_profile_id # Look up M3U profile name from database @@ -83,22 +83,22 @@ class ChannelStatus: logger.warning(f"Invalid m3u_profile_id format in Redis: {m3u_profile_id_bytes}") # Add timing information - state_changed_field = ChannelMetadataField.STATE_CHANGED_AT.encode('utf-8') + state_changed_field = ChannelMetadataField.STATE_CHANGED_AT if state_changed_field in metadata: - state_changed_at = float(metadata[state_changed_field].decode('utf-8')) + state_changed_at = float(metadata[state_changed_field]) info['state_changed_at'] = state_changed_at info['state_duration'] = time.time() - state_changed_at - init_time_field = ChannelMetadataField.INIT_TIME.encode('utf-8') + init_time_field = ChannelMetadataField.INIT_TIME if init_time_field in metadata: - created_at = float(metadata[init_time_field].decode('utf-8')) + created_at = float(metadata[init_time_field]) info['started_at'] = created_at info['uptime'] = time.time() - created_at # Add data throughput information - total_bytes_field = ChannelMetadataField.TOTAL_BYTES.encode('utf-8') + total_bytes_field = ChannelMetadataField.TOTAL_BYTES if total_bytes_field in metadata: - total_bytes = int(metadata[total_bytes_field].decode('utf-8')) + total_bytes = int(metadata[total_bytes_field]) info['total_bytes'] = total_bytes # Format total bytes in human-readable form @@ -128,40 +128,40 @@ class ChannelStatus: clients = [] for client_id in client_ids: - client_id_str = client_id.decode('utf-8') + client_id_str = client_id client_key = RedisKeys.client_metadata(channel_id, client_id_str) client_data = proxy_server.redis_client.hgetall(client_key) if client_data: client_info = { 'client_id': client_id_str, - 'user_agent': client_data.get(b'user_agent', b'unknown').decode('utf-8'), - 'worker_id': client_data.get(b'worker_id', b'unknown').decode('utf-8'), + 'user_agent': client_data.get('user_agent', 'unknown'), + 'worker_id': client_data.get('worker_id', 'unknown'), } - if b'connected_at' in client_data: - connected_at = float(client_data[b'connected_at'].decode('utf-8')) + if 'connected_at' in client_data: + connected_at = float(client_data['connected_at']) client_info['connected_at'] = connected_at client_info['connection_duration'] = time.time() - connected_at - if b'last_active' in client_data: - last_active = float(client_data[b'last_active'].decode('utf-8')) + if 'last_active' in client_data: + last_active = float(client_data['last_active']) client_info['last_active'] = last_active client_info['last_active_ago'] = time.time() - last_active # Add transfer rate statistics - if b'bytes_sent' in client_data: - client_info['bytes_sent'] = int(client_data[b'bytes_sent'].decode('utf-8')) + if 'bytes_sent' in client_data: + client_info['bytes_sent'] = int(client_data['bytes_sent']) # Add average transfer rate - if b'avg_rate_KBps' in client_data: - client_info['avg_rate_KBps'] = float(client_data[b'avg_rate_KBps'].decode('utf-8')) - elif b'transfer_rate_KBps' in client_data: # For backward compatibility - client_info['avg_rate_KBps'] = float(client_data[b'transfer_rate_KBps'].decode('utf-8')) + if 'avg_rate_KBps' in client_data: + client_info['avg_rate_KBps'] = float(client_data['avg_rate_KBps']) + elif 'transfer_rate_KBps' in client_data: # For backward compatibility + client_info['avg_rate_KBps'] = float(client_data['transfer_rate_KBps']) # Add current transfer rate - if b'current_rate_KBps' in client_data: - client_info['current_rate_KBps'] = float(client_data[b'current_rate_KBps'].decode('utf-8')) + if 'current_rate_KBps' in client_data: + client_info['current_rate_KBps'] = float(client_data['current_rate_KBps']) clients.append(client_info) @@ -235,7 +235,7 @@ class ChannelStatus: while True: cursor, keys = proxy_server.redis_client.scan(cursor, match=buffer_key_pattern, count=100) if keys: - all_buffer_keys.extend([k.decode('utf-8') for k in keys]) + all_buffer_keys.extend([k for k in keys]) if cursor == 0 or len(all_buffer_keys) >= 20: # Limit to 20 keys break @@ -265,61 +265,22 @@ class ChannelStatus: } # Add FFmpeg stream information - video_codec = metadata.get(ChannelMetadataField.VIDEO_CODEC.encode('utf-8')) - if video_codec: - info['video_codec'] = video_codec.decode('utf-8') - - resolution = metadata.get(ChannelMetadataField.RESOLUTION.encode('utf-8')) - if resolution: - info['resolution'] = resolution.decode('utf-8') - - source_fps = metadata.get(ChannelMetadataField.SOURCE_FPS.encode('utf-8')) - if source_fps: - info['source_fps'] = float(source_fps.decode('utf-8')) - - pixel_format = metadata.get(ChannelMetadataField.PIXEL_FORMAT.encode('utf-8')) - if pixel_format: - info['pixel_format'] = pixel_format.decode('utf-8') - - source_bitrate = metadata.get(ChannelMetadataField.SOURCE_BITRATE.encode('utf-8')) - if source_bitrate: - info['source_bitrate'] = float(source_bitrate.decode('utf-8')) - - audio_codec = metadata.get(ChannelMetadataField.AUDIO_CODEC.encode('utf-8')) - if audio_codec: - info['audio_codec'] = audio_codec.decode('utf-8') - - sample_rate = metadata.get(ChannelMetadataField.SAMPLE_RATE.encode('utf-8')) - if sample_rate: - info['sample_rate'] = int(sample_rate.decode('utf-8')) - - audio_channels = metadata.get(ChannelMetadataField.AUDIO_CHANNELS.encode('utf-8')) - if audio_channels: - info['audio_channels'] = audio_channels.decode('utf-8') - - audio_bitrate = metadata.get(ChannelMetadataField.AUDIO_BITRATE.encode('utf-8')) - if audio_bitrate: - info['audio_bitrate'] = float(audio_bitrate.decode('utf-8')) + info['video_codec'] = metadata.get(ChannelMetadataField.VIDEO_CODEC) + info['resolution'] = metadata.get(ChannelMetadataField.RESOLUTION) + info['source_fps'] = metadata.get(ChannelMetadataField.SOURCE_FPS) + info['pixel_format'] = metadata.get(ChannelMetadataField.PIXEL_FORMAT) + info['source_bitrate'] = metadata.get(ChannelMetadataField.SOURCE_BITRATE) + info['audio_codec'] = metadata.get(ChannelMetadataField.AUDIO_CODEC) + info['sample_rate'] = metadata.get(ChannelMetadataField.SAMPLE_RATE) + info['audio_channels'] = metadata.get(ChannelMetadataField.AUDIO_CHANNELS) + info['audio_bitrate'] = metadata.get(ChannelMetadataField.AUDIO_BITRATE) # Add FFmpeg performance stats - ffmpeg_speed = metadata.get(ChannelMetadataField.FFMPEG_SPEED.encode('utf-8')) - if ffmpeg_speed: - info['ffmpeg_speed'] = float(ffmpeg_speed.decode('utf-8')) - - ffmpeg_fps = metadata.get(ChannelMetadataField.FFMPEG_FPS.encode('utf-8')) - if ffmpeg_fps: - info['ffmpeg_fps'] = float(ffmpeg_fps.decode('utf-8')) - - actual_fps = metadata.get(ChannelMetadataField.ACTUAL_FPS.encode('utf-8')) - if actual_fps: - info['actual_fps'] = float(actual_fps.decode('utf-8')) - - ffmpeg_bitrate = metadata.get(ChannelMetadataField.FFMPEG_BITRATE.encode('utf-8')) - if ffmpeg_bitrate: - info['ffmpeg_bitrate'] = float(ffmpeg_bitrate.decode('utf-8')) - stream_type = metadata.get(ChannelMetadataField.STREAM_TYPE.encode('utf-8')) - if stream_type: - info['stream_type'] = stream_type.decode('utf-8') + info['ffmpeg_speed'] = metadata.get(ChannelMetadataField.FFMPEG_SPEED) + info['ffmpeg_fps'] = metadata.get(ChannelMetadataField.FFMPEG_FPS) + info['actual_fps'] = metadata.get(ChannelMetadataField.ACTUAL_FPS) + info['ffmpeg_bitrate'] = metadata.get(ChannelMetadataField.FFMPEG_BITRATE) + info['stream_type'] = metadata.get(ChannelMetadataField.STREAM_TYPE) return info @@ -364,33 +325,27 @@ class ChannelStatus: client_count = proxy_server.redis_client.scard(client_set_key) or 0 # Calculate uptime - init_time_bytes = metadata.get(ChannelMetadataField.INIT_TIME.encode('utf-8'), b'0') - created_at = float(init_time_bytes.decode('utf-8')) + init_time_bytes = metadata.get(ChannelMetadataField.INIT_TIME, '0') + created_at = float(init_time_bytes) uptime = time.time() - created_at if created_at > 0 else 0 - # Safely decode bytes or use defaults - def safe_decode(bytes_value, default="unknown"): - if bytes_value is None: - return default - return bytes_value.decode('utf-8') - # Simplified info info = { 'channel_id': channel_id, - 'state': safe_decode(metadata.get(ChannelMetadataField.STATE.encode('utf-8'))), - 'url': safe_decode(metadata.get(ChannelMetadataField.URL.encode('utf-8')), ""), - 'stream_profile': safe_decode(metadata.get(ChannelMetadataField.STREAM_PROFILE.encode('utf-8')), ""), - 'owner': safe_decode(metadata.get(ChannelMetadataField.OWNER.encode('utf-8'))), - 'buffer_index': int(buffer_index_value.decode('utf-8')) if buffer_index_value else 0, + 'state': metadata.get(ChannelMetadataField.STATE), + 'url': metadata.get(ChannelMetadataField.URL, ""), + 'stream_profile': metadata.get(ChannelMetadataField.STREAM_PROFILE, ""), + 'owner': metadata.get(ChannelMetadataField.OWNER), + 'buffer_index': int(buffer_index_value) if buffer_index_value else 0, 'client_count': client_count, 'uptime': uptime } # Add stream ID and name information - stream_id_bytes = metadata.get(ChannelMetadataField.STREAM_ID.encode('utf-8')) + stream_id_bytes = metadata.get(ChannelMetadataField.STREAM_ID) if stream_id_bytes: try: - stream_id = int(stream_id_bytes.decode('utf-8')) + stream_id = int(stream_id_bytes) info['stream_id'] = stream_id # Look up stream name from database @@ -405,9 +360,9 @@ class ChannelStatus: logger.warning(f"Invalid stream_id format in Redis: {stream_id_bytes}") # Add data throughput information to basic info - total_bytes_bytes = proxy_server.redis_client.hget(metadata_key, ChannelMetadataField.TOTAL_BYTES.encode('utf-8')) + total_bytes_bytes = proxy_server.redis_client.hget(metadata_key, ChannelMetadataField.TOTAL_BYTES) if total_bytes_bytes: - total_bytes = int(total_bytes_bytes.decode('utf-8')) + total_bytes = int(total_bytes_bytes) info['total_bytes'] = total_bytes # Calculate and add bitrate @@ -434,26 +389,25 @@ class ChannelStatus: if client_ids: # Get up to 10 clients for the basic view for client_id in list(client_ids)[:10]: - client_id_str = client_id.decode('utf-8') - client_key = RedisKeys.client_metadata(channel_id, client_id_str) + client_key = RedisKeys.client_metadata(channel_id, client_id) # Efficient way - just retrieve the essentials client_info = { - 'client_id': client_id_str, + 'client_id': client_id, } # Safely get user_agent and ip_address user_agent_bytes = proxy_server.redis_client.hget(client_key, 'user_agent') - client_info['user_agent'] = safe_decode(user_agent_bytes) + client_info['user_agent'] = user_agent_bytes ip_address_bytes = proxy_server.redis_client.hget(client_key, 'ip_address') if ip_address_bytes: - client_info['ip_address'] = safe_decode(ip_address_bytes) + client_info['ip_address'] = ip_address_bytes # Just get connected_at for client age connected_at_bytes = proxy_server.redis_client.hget(client_key, 'connected_at') if connected_at_bytes: - connected_at = float(connected_at_bytes.decode('utf-8')) + connected_at = float(connected_at_bytes) client_info['connected_since'] = time.time() - connected_at clients.append(client_info) @@ -462,10 +416,10 @@ class ChannelStatus: info['clients'] = clients # Add M3U profile information - m3u_profile_id_bytes = metadata.get(ChannelMetadataField.M3U_PROFILE.encode('utf-8')) - if m3u_profile_id_bytes: + m3u_profile_id = metadata.get(ChannelMetadataField.M3U_PROFILE) + if m3u_profile_id: try: - m3u_profile_id = int(m3u_profile_id_bytes.decode('utf-8')) + m3u_profile_id = int(m3u_profile_id) info['m3u_profile_id'] = m3u_profile_id # Look up M3U profile name from database @@ -477,32 +431,16 @@ class ChannelStatus: except (ImportError, DatabaseError) as e: logger.warning(f"Failed to get M3U profile name for ID {m3u_profile_id}: {e}") except ValueError: - logger.warning(f"Invalid m3u_profile_id format in Redis: {m3u_profile_id_bytes}") + logger.warning(f"Invalid m3u_profile_id format in Redis: {m3u_profile_id}") # Add stream info to basic info as well - video_codec = metadata.get(ChannelMetadataField.VIDEO_CODEC.encode('utf-8')) - if video_codec: - info['video_codec'] = video_codec.decode('utf-8') - - resolution = metadata.get(ChannelMetadataField.RESOLUTION.encode('utf-8')) - if resolution: - info['resolution'] = resolution.decode('utf-8') - - source_fps = metadata.get(ChannelMetadataField.SOURCE_FPS.encode('utf-8')) - if source_fps: - info['source_fps'] = float(source_fps.decode('utf-8')) - ffmpeg_speed = metadata.get(ChannelMetadataField.FFMPEG_SPEED.encode('utf-8')) - if ffmpeg_speed: - info['ffmpeg_speed'] = float(ffmpeg_speed.decode('utf-8')) - audio_codec = metadata.get(ChannelMetadataField.AUDIO_CODEC.encode('utf-8')) - if audio_codec: - info['audio_codec'] = audio_codec.decode('utf-8') - audio_channels = metadata.get(ChannelMetadataField.AUDIO_CHANNELS.encode('utf-8')) - if audio_channels: - info['audio_channels'] = audio_channels.decode('utf-8') - stream_type = metadata.get(ChannelMetadataField.STREAM_TYPE.encode('utf-8')) - if stream_type: - info['stream_type'] = stream_type.decode('utf-8') + info['video_codec'] = metadata.get(ChannelMetadataField.VIDEO_CODEC) + info['resolution'] = metadata.get(ChannelMetadataField.RESOLUTION) + info['source_fps'] = metadata.get(ChannelMetadataField.SOURCE_FPS) + info['ffmpeg_speed'] = metadata.get(ChannelMetadataField.FFMPEG_SPEED) + info['audio_codec'] = metadata.get(ChannelMetadataField.AUDIO_CODEC) + info['audio_channels'] = metadata.get(ChannelMetadataField.AUDIO_CHANNELS) + info['stream_type'] = metadata.get(ChannelMetadataField.STREAM_TYPE) return info except Exception as e: diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index cca827a9..7a59f530 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -157,7 +157,8 @@ class ProxyServer: socket_timeout=60, socket_connect_timeout=10, socket_keepalive=True, - health_check_interval=30 + health_check_interval=30, + decode_responses=True ) logger.info("Created fallback Redis PubSub client for event listener") @@ -178,8 +179,8 @@ class ProxyServer: continue try: - channel = message["channel"].decode("utf-8") - data = json.loads(message["data"].decode("utf-8")) + channel = message["channel"] + data = json.loads(message["data"]) event_type = data.get("event") channel_id = data.get("channel_id") @@ -374,7 +375,7 @@ class ProxyServer: try: lock_key = RedisKeys.channel_owner(channel_id) return self._execute_redis_command( - lambda: self.redis_client.get(lock_key).decode('utf-8') if self.redis_client.get(lock_key) else None + lambda: self.redis_client.get(lock_key) if self.redis_client.get(lock_key) else None ) except Exception as e: logger.error(f"Error getting channel owner: {e}") @@ -415,7 +416,7 @@ class ProxyServer: current_owner = self._execute_redis_command( lambda: self.redis_client.get(lock_key) ) - if current_owner and current_owner.decode('utf-8') == self.worker_id: + if current_owner and current_owner == self.worker_id: # Refresh TTL self._execute_redis_command( lambda: self.redis_client.expire(lock_key, ttl) @@ -440,7 +441,7 @@ class ProxyServer: # Only delete if we're the current owner to prevent race conditions current = self.redis_client.get(lock_key) - if current and current.decode('utf-8') == self.worker_id: + if current and current == self.worker_id: self.redis_client.delete(lock_key) logger.info(f"Released ownership of channel {channel_id}") @@ -462,7 +463,7 @@ class ProxyServer: current = self.redis_client.get(lock_key) # Only extend if we're still the owner - if current and current.decode('utf-8') == self.worker_id: + if current and current == self.worker_id: self.redis_client.expire(lock_key, ttl) return True return False @@ -478,15 +479,15 @@ class ProxyServer: metadata_key = RedisKeys.channel_metadata(channel_id) if self.redis_client.exists(metadata_key): metadata = self.redis_client.hgetall(metadata_key) - if b'state' in metadata: - state = metadata[b'state'].decode('utf-8') + if 'state' in metadata: + state = metadata['state'] active_states = [ChannelState.INITIALIZING, ChannelState.CONNECTING, ChannelState.WAITING_FOR_CLIENTS, ChannelState.ACTIVE, ChannelState.BUFFERING] if state in active_states: logger.info(f"Channel {channel_id} already being initialized with state {state}") # Create buffer and client manager only if we don't have them if channel_id not in self.stream_buffers: - self.stream_buffers[channel_id] = StreamBuffer(channel_id, redis_client=self.redis_client) + self.stream_buffers[channel_id] = StreamBuffer(channel_id, redis_client=RedisClient.get_buffer()) if channel_id not in self.client_managers: self.client_managers[channel_id] = ClientManager( channel_id, @@ -497,7 +498,7 @@ class ProxyServer: # Create buffer and client manager instances (or reuse if they exist) if channel_id not in self.stream_buffers: - buffer = StreamBuffer(channel_id, redis_client=self.redis_client) + buffer = StreamBuffer(channel_id, redis_client=RedisClient.get_buffer()) self.stream_buffers[channel_id] = buffer if channel_id not in self.client_managers: @@ -536,18 +537,18 @@ class ProxyServer: # If no url was passed, try to get from Redis if not url and existing_metadata: - url_bytes = existing_metadata.get(b'url') + url_bytes = existing_metadata.get('url') if url_bytes: - channel_url = url_bytes.decode('utf-8') + channel_url = url_bytes - ua_bytes = existing_metadata.get(b'user_agent') + ua_bytes = existing_metadata.get('user_agent') if ua_bytes: - channel_user_agent = ua_bytes.decode('utf-8') + channel_user_agent = ua_bytes # Get stream ID from metadata if not provided - if not channel_stream_id and b'stream_id' in existing_metadata: + if not channel_stream_id and 'stream_id' in existing_metadata: try: - channel_stream_id = int(existing_metadata[b'stream_id'].decode('utf-8')) + channel_stream_id = int(existing_metadata['stream_id']) logger.debug(f"Found stream_id {channel_stream_id} in metadata for channel {channel_id}") except (ValueError, TypeError) as e: logger.debug(f"Could not parse stream_id from metadata: {e}") @@ -562,7 +563,7 @@ class ProxyServer: # Create buffer but not stream manager (only if not already exists) if channel_id not in self.stream_buffers: - buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client) + buffer = StreamBuffer(channel_id=channel_id, redis_client=RedisClient.get_buffer()) self.stream_buffers[channel_id] = buffer # Create client manager with channel_id and redis_client (only if not already exists) @@ -585,7 +586,7 @@ class ProxyServer: # Create buffer but not stream manager (only if not already exists) if channel_id not in self.stream_buffers: - buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client) + buffer = StreamBuffer(channel_id=channel_id, redis_client=RedisClient.get_buffer()) self.stream_buffers[channel_id] = buffer # Create client manager with channel_id and redis_client (only if not already exists) @@ -624,12 +625,12 @@ class ProxyServer: # Verify the stream_id was set correctly in Redis stream_id_value = self.redis_client.hget(metadata_key, "stream_id") if stream_id_value: - logger.info(f"Verified stream_id {stream_id_value.decode('utf-8')} is set in Redis for channel {channel_id}") + logger.info(f"Verified stream_id {stream_id_value} is set in Redis for channel {channel_id}") else: logger.warning(f"Failed to set stream_id in Redis for channel {channel_id}") # Create stream buffer - buffer = StreamBuffer(channel_id=channel_id, redis_client=self.redis_client) + buffer = StreamBuffer(channel_id=channel_id, redis_client=RedisClient.get_buffer()) logger.debug(f"Created StreamBuffer for channel {channel_id}") self.stream_buffers[channel_id] = buffer @@ -700,8 +701,8 @@ class ProxyServer: metadata = self.redis_client.hgetall(metadata_key) # Get channel state and owner - state = metadata.get(b'state', b'unknown').decode('utf-8') - owner = metadata.get(b'owner', b'').decode('utf-8') + state = metadata.get('state', 'unknown') + owner = metadata.get('owner', '') # States that indicate the channel is running properly valid_states = [ChannelState.ACTIVE, ChannelState.WAITING_FOR_CLIENTS, @@ -726,8 +727,8 @@ class ProxyServer: return True else: # Unknown or initializing state, check how long it's been in this state - if b'state_changed_at' in metadata: - state_changed_at = float(metadata[b'state_changed_at'].decode('utf-8')) + if 'state_changed_at' in metadata: + state_changed_at = float(metadata['state_changed_at']) state_age = time.time() - state_changed_at # If in initializing state for too long, consider it stale @@ -762,8 +763,8 @@ class ProxyServer: # If we have metadata, log details for debugging if metadata: - state = metadata.get(b'state', b'unknown').decode('utf-8') - owner = metadata.get(b'owner', b'unknown').decode('utf-8') + state = metadata.get('state', 'unknown') + owner = metadata.get('owner', 'unknown') logger.info(f"Zombie channel details - state: {state}, owner: {owner}") # Clean up Redis keys @@ -931,8 +932,8 @@ class ProxyServer: if self.redis_client: metadata_key = RedisKeys.channel_metadata(channel_id) metadata = self.redis_client.hgetall(metadata_key) - if metadata and b'state' in metadata: - channel_state = metadata[b'state'].decode('utf-8') + if metadata and 'state' in metadata: + channel_state = metadata['state'] # Check if channel has any clients left total_clients = 0 @@ -948,9 +949,9 @@ class ProxyServer: if channel_state in [ChannelState.CONNECTING, ChannelState.WAITING_FOR_CLIENTS]: # Get connection ready time from metadata connection_ready_time = None - if metadata and b'connection_ready_time' in metadata: + if metadata and 'connection_ready_time' in metadata: try: - connection_ready_time = float(metadata[b'connection_ready_time'].decode('utf-8')) + connection_ready_time = float(metadata['connection_ready_time']) except (ValueError, TypeError): pass @@ -981,8 +982,8 @@ class ProxyServer: # Grace period expired but we have clients - mark channel as active logger.info(f"Grace period expired with {total_clients} clients - marking channel {channel_id} as active") old_state = "unknown" - if metadata and b'state' in metadata: - old_state = metadata[b'state'].decode('utf-8') + if metadata and 'state' in metadata: + old_state = metadata['state'] if self.update_channel_state(channel_id, ChannelState.ACTIVE, { "grace_period_ended_at": str(time.time()), "clients_at_activation": str(total_clients) @@ -998,7 +999,7 @@ class ProxyServer: disconnect_value = self.redis_client.get(disconnect_key) if disconnect_value: try: - disconnect_time = float(disconnect_value.decode('utf-8')) + disconnect_time = float(disconnect_value) except (ValueError, TypeError) as e: logger.error(f"Invalid disconnect time for channel {channel_id}: {e}") @@ -1076,7 +1077,7 @@ class ProxyServer: for key in channel_keys: try: - channel_id = key.decode('utf-8').split(':')[2] + channel_id = key.split(':')[2] # Skip channels we already have locally if channel_id in self.stream_buffers: @@ -1170,8 +1171,8 @@ class ProxyServer: # Get current state for logging current_state = None metadata = self.redis_client.hgetall(metadata_key) - if metadata and b'state' in metadata: - current_state = metadata[b'state'].decode('utf-8') + if metadata and 'state' in metadata: + current_state = metadata['state'] # Only update if state is actually changing if current_state == new_state: diff --git a/apps/proxy/ts_proxy/services/channel_service.py b/apps/proxy/ts_proxy/services/channel_service.py index 551e2d27..6e7d3878 100644 --- a/apps/proxy/ts_proxy/services/channel_service.py +++ b/apps/proxy/ts_proxy/services/channel_service.py @@ -59,7 +59,7 @@ class ChannelService: # Verify the stream_id was set stream_id_value = proxy_server.redis_client.hget(metadata_key, ChannelMetadataField.STREAM_ID) if stream_id_value: - logger.debug(f"Verified stream_id {stream_id_value.decode('utf-8')} is now set in Redis") + logger.debug(f"Verified stream_id {stream_id_value} is now set in Redis") else: logger.error(f"Failed to set stream_id {stream_id} in Redis before initialization") @@ -129,7 +129,7 @@ class ChannelService: try: # This is inefficient but used for diagnostics - in production would use more targeted checks redis_keys = proxy_server.redis_client.keys(f"ts_proxy:*:{channel_id}*") - redis_keys = [k.decode('utf-8') for k in redis_keys] if redis_keys else [] + redis_keys = [k for k in redis_keys] if redis_keys else [] except Exception as e: logger.error(f"Error checking Redis keys: {e}") @@ -234,8 +234,8 @@ class ChannelService: metadata_key = RedisKeys.channel_metadata(channel_id) try: metadata = proxy_server.redis_client.hgetall(metadata_key) - if metadata and b'state' in metadata: - state = metadata[b'state'].decode('utf-8') + if metadata and 'state' in metadata: + state = metadata['state'] channel_info = {"state": state} # Immediately mark as stopping in metadata so clients detect it faster @@ -375,8 +375,8 @@ class ChannelService: metadata = proxy_server.redis_client.hgetall(metadata_key) # Extract state and owner - state = metadata.get(ChannelMetadataField.STATE.encode(), b'unknown').decode('utf-8') - owner = metadata.get(ChannelMetadataField.OWNER.encode(), b'unknown').decode('utf-8') + state = metadata.get(ChannelMetadataField.STATE.encode(), 'unknown') + owner = metadata.get(ChannelMetadataField.OWNER.encode(), 'unknown') # Valid states indicate channel is running properly valid_states = [ChannelState.ACTIVE, ChannelState.WAITING_FOR_CLIENTS, ChannelState.CONNECTING] @@ -402,7 +402,7 @@ class ChannelService: } if last_data: - last_data_time = float(last_data.decode('utf-8')) + last_data_time = float(last_data) data_age = time.time() - last_data_time details["last_data_age"] = data_age @@ -598,7 +598,7 @@ class ChannelService: def _update_stream_stats_in_db(stream_id, **stats): """Update stream stats in database""" from django.db import connection - + try: from apps.channels.models import Stream from django.utils import timezone @@ -624,7 +624,7 @@ class ChannelService: except Exception as e: logger.error(f"Error updating stream stats in database for stream {stream_id}: {e}") return False - + finally: # Always close database connection after update try: @@ -645,7 +645,7 @@ class ChannelService: metadata_key = RedisKeys.channel_metadata(channel_id) # First check if the key exists and what type it is - key_type = proxy_server.redis_client.type(metadata_key).decode('utf-8') + key_type = proxy_server.redis_client.type(metadata_key) logger.debug(f"Redis key {metadata_key} is of type: {key_type}") # Build metadata update dict diff --git a/apps/proxy/ts_proxy/stream_manager.py b/apps/proxy/ts_proxy/stream_manager.py index 99ae8027..73118768 100644 --- a/apps/proxy/ts_proxy/stream_manager.py +++ b/apps/proxy/ts_proxy/stream_manager.py @@ -89,7 +89,7 @@ class StreamManager: # Try to get stream_id specifically stream_id_bytes = buffer.redis_client.hget(metadata_key, "stream_id") if stream_id_bytes: - self.current_stream_id = int(stream_id_bytes.decode('utf-8')) + self.current_stream_id = int(stream_id_bytes) self.tried_stream_ids.add(self.current_stream_id) logger.info(f"Loaded stream ID {self.current_stream_id} from Redis for channel {buffer.channel_id}") else: @@ -362,7 +362,7 @@ class StreamManager: current_owner = self.buffer.redis_client.get(owner_key) # Use the worker_id that was passed in during initialization - if current_owner and self.worker_id and current_owner.decode('utf-8') == self.worker_id: + if current_owner and self.worker_id and current_owner == self.worker_id: # Determine the appropriate error message based on retry failures if self.tried_stream_ids and len(self.tried_stream_ids) > 0: error_message = f"All {len(self.tried_stream_ids)} stream options failed" @@ -948,10 +948,10 @@ class StreamManager: logger.debug(f"Updated m3u profile for channel {self.channel_id} to use profile from stream {stream_id}") else: logger.warning(f"Failed to update stream profile for channel {self.channel_id}") - + except Exception as e: logger.error(f"Error updating stream profile for channel {self.channel_id}: {e}") - + finally: # Always close database connection after profile update try: @@ -1348,9 +1348,9 @@ class StreamManager: current_state = None try: metadata = redis_client.hgetall(metadata_key) - state_field = ChannelMetadataField.STATE.encode('utf-8') + state_field = ChannelMetadataField.STATE if metadata and state_field in metadata: - current_state = metadata[state_field].decode('utf-8') + current_state = metadata[state_field] except Exception as e: logger.error(f"Error checking current state: {e}") @@ -1555,4 +1555,4 @@ class StreamManager: """Safely reset the URL switching state if it gets stuck""" self.url_switching = False self.url_switch_start_time = 0 - logger.info(f"Reset URL switching state for channel {self.channel_id}") \ No newline at end of file + logger.info(f"Reset URL switching state for channel {self.channel_id}") diff --git a/apps/proxy/ts_proxy/url_utils.py b/apps/proxy/ts_proxy/url_utils.py index db53cc74..0bfde312 100644 --- a/apps/proxy/ts_proxy/url_utils.py +++ b/apps/proxy/ts_proxy/url_utils.py @@ -215,9 +215,9 @@ def get_stream_info_for_switch(channel_id: str, target_stream_id: Optional[int] existing_stream_id = redis_client.get(f"channel_stream:{channel.id}") if existing_stream_id: # Decode bytes to string/int for proper Redis key lookup - existing_stream_id = existing_stream_id.decode('utf-8') + existing_stream_id = existing_stream_id existing_profile_id = redis_client.get(f"stream_profile:{existing_stream_id}") - if existing_profile_id and int(existing_profile_id.decode('utf-8')) == profile.id: + if existing_profile_id and int(existing_profile_id) == profile.id: channel_using_profile = True logger.debug(f"Channel {channel.id} already using profile {profile.id}") @@ -353,9 +353,9 @@ def get_alternate_streams(channel_id: str, current_stream_id: Optional[int] = No existing_stream_id = redis_client.get(f"channel_stream:{channel.id}") if existing_stream_id: # Decode bytes to string/int for proper Redis key lookup - existing_stream_id = existing_stream_id.decode('utf-8') + existing_stream_id = existing_stream_id existing_profile_id = redis_client.get(f"stream_profile:{existing_stream_id}") - if existing_profile_id and int(existing_profile_id.decode('utf-8')) == profile.id: + if existing_profile_id and int(existing_profile_id) == profile.id: channel_using_profile = True logger.debug(f"Channel {channel.id} already using profile {profile.id}") diff --git a/apps/proxy/vod_proxy/multi_worker_connection_manager.py b/apps/proxy/vod_proxy/multi_worker_connection_manager.py index fefc8739..e1fe4c4b 100644 --- a/apps/proxy/vod_proxy/multi_worker_connection_manager.py +++ b/apps/proxy/vod_proxy/multi_worker_connection_manager.py @@ -219,7 +219,7 @@ class RedisBackedVODConnection: # Convert bytes keys/values to strings if needed if isinstance(list(data.keys())[0], bytes): - data = {k.decode('utf-8'): v.decode('utf-8') for k, v in data.items()} + data = {k: v for k, v in data.items()} return SerializableConnectionState.from_dict(data) except Exception as e: @@ -1115,14 +1115,14 @@ class MultiWorkerVODConnectionManager: # Convert bytes to strings if needed if isinstance(list(data.keys())[0], bytes): - data = {k.decode('utf-8'): v.decode('utf-8') for k, v in data.items()} + data = {k: v for k, v in data.items()} last_activity = float(data.get('last_activity', 0)) active_streams = int(data.get('active_streams', 0)) # Clean up if stale and no active streams if (current_time - last_activity > max_age_seconds) and active_streams == 0: - session_id = key.decode('utf-8').replace('vod_persistent_connection:', '') + session_id = key.replace('vod_persistent_connection:', '') logger.info(f"Cleaning up stale connection: {session_id}") # Clean up connection and related keys @@ -1219,7 +1219,7 @@ class MultiWorkerVODConnectionManager: if connection_data: # Convert bytes to strings if needed if isinstance(list(connection_data.keys())[0], bytes): - connection_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in connection_data.items()} + connection_data = {k: v for k, v in connection_data.items()} profile_id = connection_data.get('m3u_profile_id') if profile_id: @@ -1279,7 +1279,7 @@ class MultiWorkerVODConnectionManager: # Convert bytes keys/values to strings if needed if isinstance(list(connection_data.keys())[0], bytes): - connection_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in connection_data.items()} + connection_data = {k: v for k, v in connection_data.items()} # Check if content matches (using consolidated data) stored_content_type = connection_data.get('content_obj_type', '') @@ -1289,7 +1289,7 @@ class MultiWorkerVODConnectionManager: continue # Extract session ID - session_id = key.decode('utf-8').replace('vod_persistent_connection:', '') + session_id = key.replace('vod_persistent_connection:', '') # Check if Redis-backed connection exists and has no active streams redis_connection = RedisBackedVODConnection(session_id, self.redis_client) @@ -1367,4 +1367,4 @@ class MultiWorkerVODConnectionManager: return redis_connection.get_session_metadata() except Exception as e: logger.error(f"Error getting session info for {session_id}: {e}") - return None \ No newline at end of file + return None diff --git a/apps/proxy/vod_proxy/views.py b/apps/proxy/vod_proxy/views.py index 00ed8a10..12e1d071 100644 --- a/apps/proxy/vod_proxy/views.py +++ b/apps/proxy/vod_proxy/views.py @@ -550,14 +550,7 @@ class VODStreamView(View): connection_data = redis_client.hgetall(persistent_connection_key) if connection_data: - # Decode Redis hash data - decoded_data = {} - for k, v in connection_data.items(): - k_str = k.decode('utf-8') if isinstance(k, bytes) else k - v_str = v.decode('utf-8') if isinstance(v, bytes) else v - decoded_data[k_str] = v_str - - existing_profile_id = decoded_data.get('m3u_profile_id') + existing_profile_id = connection_data.get('m3u_profile_id') if existing_profile_id: try: existing_profile = M3UAccountProfile.objects.get( @@ -770,19 +763,16 @@ class VODStatsView(View): for key in keys: try: - key_str = key.decode('utf-8') if isinstance(key, bytes) else key connection_data = redis_client.hgetall(key) if connection_data: # Extract session ID from key - session_id = key_str.replace('vod_persistent_connection:', '') + session_id = key.replace('vod_persistent_connection:', '') # Decode Redis hash data combined_data = {} for k, v in connection_data.items(): - k_str = k.decode('utf-8') if isinstance(k, bytes) else k - v_str = v.decode('utf-8') if isinstance(v, bytes) else v - combined_data[k_str] = v_str + combined_data[k] = v # Get content info from the connection data (using correct field names) content_type = combined_data.get('content_obj_type', 'unknown') diff --git a/core/redis_pubsub.py b/core/redis_pubsub.py index 5d0032b0..b1f48b1f 100644 --- a/core/redis_pubsub.py +++ b/core/redis_pubsub.py @@ -201,10 +201,6 @@ class RedisPubSubManager: channel = message.get('channel') if channel: - # Decode binary channel name if needed - if isinstance(channel, bytes): - channel = channel.decode('utf-8') - # Find and call the appropriate handler handler = self.message_handlers.get(channel) if handler: diff --git a/core/tasks.py b/core/tasks.py index f757613b..257d8660 100644 --- a/core/tasks.py +++ b/core/tasks.py @@ -404,7 +404,7 @@ def fetch_channel_stats(): while True: cursor, keys = redis_client.scan(cursor, match=channel_pattern) for key in keys: - channel_id_match = re.search(r"ts_proxy:channel:(.*):metadata", key.decode('utf-8')) + channel_id_match = re.search(r"ts_proxy:channel:(.*):metadata", key) if channel_id_match: ch_id = channel_id_match.group(1) channel_info = ChannelStatus.get_basic_channel_info(ch_id) diff --git a/core/utils.py b/core/utils.py index 36ac5fef..9cc4c26f 100644 --- a/core/utils.py +++ b/core/utils.py @@ -42,101 +42,116 @@ def natural_sort_key(text): return [convert(c) for c in re.split('([0-9]+)', text)] class RedisClient: + _initialized = False _client = None + _buffer = None _pubsub_client = None + @classmethod + def _init_client(cls, decode_responses=True, max_retries=5, retry_interval=1): + retry_count = 0 + while retry_count < max_retries: + try: + # Get connection parameters from settings or environment + redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost')) + redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) + redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) + + # Use standardized settings + socket_timeout = getattr(settings, 'REDIS_SOCKET_TIMEOUT', 5) + socket_connect_timeout = getattr(settings, 'REDIS_SOCKET_CONNECT_TIMEOUT', 5) + health_check_interval = getattr(settings, 'REDIS_HEALTH_CHECK_INTERVAL', 30) + socket_keepalive = getattr(settings, 'REDIS_SOCKET_KEEPALIVE', True) + retry_on_timeout = getattr(settings, 'REDIS_RETRY_ON_TIMEOUT', True) + + # Create Redis client with better defaults + client = redis.Redis( + host=redis_host, + port=redis_port, + db=redis_db, + socket_timeout=socket_timeout, + socket_connect_timeout=socket_connect_timeout, + socket_keepalive=socket_keepalive, + health_check_interval=health_check_interval, + retry_on_timeout=retry_on_timeout, + decode_responses=decode_responses + ) + + # Validate connection with ping + client.ping() + if cls._initialized is False: + client.flushdb() + cls._initialized = True + + # Disable persistence on first connection - improves performance + # Only try to disable if not in a read-only environment + try: + client.config_set('save', '') # Disable RDB snapshots + client.config_set('appendonly', 'no') # Disable AOF logging + + # Set optimal memory settings with environment variable support + # Get max memory from environment or use a larger default (512MB instead of 256MB) + #max_memory = os.environ.get('REDIS_MAX_MEMORY', '512mb') + #eviction_policy = os.environ.get('REDIS_EVICTION_POLICY', 'allkeys-lru') + + # Apply memory settings + #client.config_set('maxmemory-policy', eviction_policy) + #client.config_set('maxmemory', max_memory) + + #logger.info(f"Redis configured with maxmemory={max_memory}, policy={eviction_policy}") + + # Disable protected mode when in debug mode + if os.environ.get('DISPATCHARR_DEBUG', '').lower() == 'true': + client.config_set('protected-mode', 'no') # Disable protected mode in debug + logger.warning("Redis protected mode disabled for debug environment") + + logger.trace("Redis persistence disabled for better performance") + except redis.exceptions.ResponseError as e: + # Improve error handling for Redis configuration errors + if "OOM" in str(e): + logger.error(f"Redis OOM during configuration: {e}") + # Try to increase maxmemory as an emergency measure + try: + client.config_set('maxmemory', '768mb') + logger.warning("Applied emergency Redis memory increase to 768MB") + except: + pass + else: + logger.error(f"Redis configuration error: {e}") + + logger.info(f"Connected to Redis at {redis_host}:{redis_port}/{redis_db}") + break + + except (ConnectionError, TimeoutError) as e: + retry_count += 1 + if retry_count >= max_retries: + logger.error(f"Failed to connect to Redis after {max_retries} attempts: {e}") + return None + else: + # Use exponential backoff for retries + wait_time = retry_interval * (2 ** (retry_count - 1)) + logger.warning(f"Redis connection failed. Retrying in {wait_time}s... ({retry_count}/{max_retries})") + time.sleep(wait_time) + + except Exception as e: + logger.error(f"Unexpected error connecting to Redis: {e}") + return None + + return client + @classmethod def get_client(cls, max_retries=5, retry_interval=1): if cls._client is None: - retry_count = 0 - while retry_count < max_retries: - try: - # Get connection parameters from settings or environment - redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost')) - redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) - redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) - - # Use standardized settings - socket_timeout = getattr(settings, 'REDIS_SOCKET_TIMEOUT', 5) - socket_connect_timeout = getattr(settings, 'REDIS_SOCKET_CONNECT_TIMEOUT', 5) - health_check_interval = getattr(settings, 'REDIS_HEALTH_CHECK_INTERVAL', 30) - socket_keepalive = getattr(settings, 'REDIS_SOCKET_KEEPALIVE', True) - retry_on_timeout = getattr(settings, 'REDIS_RETRY_ON_TIMEOUT', True) - - # Create Redis client with better defaults - client = redis.Redis( - host=redis_host, - port=redis_port, - db=redis_db, - socket_timeout=socket_timeout, - socket_connect_timeout=socket_connect_timeout, - socket_keepalive=socket_keepalive, - health_check_interval=health_check_interval, - retry_on_timeout=retry_on_timeout - ) - - # Validate connection with ping - client.ping() - client.flushdb() - - # Disable persistence on first connection - improves performance - # Only try to disable if not in a read-only environment - try: - client.config_set('save', '') # Disable RDB snapshots - client.config_set('appendonly', 'no') # Disable AOF logging - - # Set optimal memory settings with environment variable support - # Get max memory from environment or use a larger default (512MB instead of 256MB) - #max_memory = os.environ.get('REDIS_MAX_MEMORY', '512mb') - #eviction_policy = os.environ.get('REDIS_EVICTION_POLICY', 'allkeys-lru') - - # Apply memory settings - #client.config_set('maxmemory-policy', eviction_policy) - #client.config_set('maxmemory', max_memory) - - #logger.info(f"Redis configured with maxmemory={max_memory}, policy={eviction_policy}") - - # Disable protected mode when in debug mode - if os.environ.get('DISPATCHARR_DEBUG', '').lower() == 'true': - client.config_set('protected-mode', 'no') # Disable protected mode in debug - logger.warning("Redis protected mode disabled for debug environment") - - logger.trace("Redis persistence disabled for better performance") - except redis.exceptions.ResponseError as e: - # Improve error handling for Redis configuration errors - if "OOM" in str(e): - logger.error(f"Redis OOM during configuration: {e}") - # Try to increase maxmemory as an emergency measure - try: - client.config_set('maxmemory', '768mb') - logger.warning("Applied emergency Redis memory increase to 768MB") - except: - pass - else: - logger.error(f"Redis configuration error: {e}") - - logger.info(f"Connected to Redis at {redis_host}:{redis_port}/{redis_db}") - - cls._client = client - break - - except (ConnectionError, TimeoutError) as e: - retry_count += 1 - if retry_count >= max_retries: - logger.error(f"Failed to connect to Redis after {max_retries} attempts: {e}") - return None - else: - # Use exponential backoff for retries - wait_time = retry_interval * (2 ** (retry_count - 1)) - logger.warning(f"Redis connection failed. Retrying in {wait_time}s... ({retry_count}/{max_retries})") - time.sleep(wait_time) - - except Exception as e: - logger.error(f"Unexpected error connecting to Redis: {e}") - return None - + cls._client = cls._init_client(decode_responses=True, max_retries=max_retries, retry_interval=retry_interval) return cls._client + @classmethod + def get_buffer(cls, max_retries=5, retry_interval=1): + """Get Redis client optimized for binary data (no decoding)""" + if cls._buffer is None: + cls._buffer = cls._init_client(decode_responses=False, max_retries=max_retries, retry_interval=retry_interval) + return cls._buffer + @classmethod def get_pubsub_client(cls, max_retries=5, retry_interval=1): """Get Redis client optimized for PubSub operations""" @@ -165,7 +180,8 @@ class RedisClient: socket_connect_timeout=socket_connect_timeout, socket_keepalive=socket_keepalive, health_check_interval=health_check_interval, - retry_on_timeout=retry_on_timeout + retry_on_timeout=retry_on_timeout, + decode_responses=True ) # Validate connection with ping diff --git a/dispatcharr/persistent_lock.py b/dispatcharr/persistent_lock.py index 360c9b5d..fe62be43 100644 --- a/dispatcharr/persistent_lock.py +++ b/dispatcharr/persistent_lock.py @@ -48,7 +48,7 @@ class PersistentLock: Returns True if the expiration was successfully extended. """ current_value = self.redis_client.get(self.lock_key) - if current_value and current_value.decode("utf-8") == self.lock_token: + if current_value and current_value == self.lock_token: self.redis_client.expire(self.lock_key, self.lock_timeout) self.has_lock = False return True