From 7351264e8a4194cefb0076e7f4b31f26be0473c4 Mon Sep 17 00:00:00 2001 From: dekzter Date: Fri, 4 Apr 2025 16:18:12 -0400 Subject: [PATCH] centralized and lazy-loaded redis client singleton, check for manage.py commands so we don't init proxyservers (redis connection), put manage commmands before starting uwsgi --- apps/channels/models.py | 12 +- apps/m3u/serializers.py | 14 + apps/m3u/tasks.py | 4 +- apps/proxy/apps.py | 14 +- apps/proxy/tasks.py | 4 +- apps/proxy/ts_proxy/__init__.py | 9 - apps/proxy/ts_proxy/apps.py | 13 + apps/proxy/ts_proxy/channel_status.py | 8 +- apps/proxy/ts_proxy/server.py | 21 +- .../ts_proxy/services/channel_service.py | 18 +- apps/proxy/ts_proxy/stream_generator.py | 11 +- apps/proxy/ts_proxy/views.py | 9 +- core/utils.py | 239 ++++++++---------- dispatcharr/settings.py | 1 + docker/entrypoint.sh | 10 +- 15 files changed, 213 insertions(+), 174 deletions(-) create mode 100644 apps/proxy/ts_proxy/apps.py diff --git a/apps/channels/models.py b/apps/channels/models.py index 0e6d1c52..8a8ddcec 100644 --- a/apps/channels/models.py +++ b/apps/channels/models.py @@ -3,7 +3,7 @@ from django.core.exceptions import ValidationError from core.models import StreamProfile from django.conf import settings from core.models import StreamProfile, CoreSettings -from core.utils import redis_client, execute_redis_command +from core.utils import RedisClient import logging import uuid from datetime import datetime @@ -19,8 +19,7 @@ from apps.m3u.models import M3UAccount # Add fallback functions if Redis isn't available def get_total_viewers(channel_id): """Get viewer count from Redis or return 0 if Redis isn't available""" - if redis_client is None: - return 0 + redis_client = RedisClient.get_client() try: return int(redis_client.get(f"channel:{channel_id}:viewers") or 0) @@ -144,7 +143,7 @@ class Stream(models.Model): """ Finds an available stream for the requested channel and returns the selected stream and profile. """ - + redis_client = RedisClient.get_client() profile_id = redis_client.get(f"stream_profile:{self.id}") if profile_id: profile_id = int(profile_id) @@ -184,6 +183,8 @@ class Stream(models.Model): """ Called when a stream is finished to release the lock. """ + redis_client = RedisClient.get_client() + stream_id = self.id # Get the matched profile for cleanup profile_id = redis_client.get(f"stream_profile:{stream_id}") @@ -280,6 +281,7 @@ class Channel(models.Model): """ Finds an available stream for the requested channel and returns the selected stream and profile. """ + redis_client = RedisClient.get_client() # 2. Check if a stream is already active for this channel stream_id = redis_client.get(f"channel_stream:{self.id}") @@ -326,6 +328,8 @@ class Channel(models.Model): """ Called when a stream is finished to release the lock. """ + redis_client = RedisClient.get_client() + stream_id = redis_client.get(f"channel_stream:{self.id}") if not stream_id: logger.debug("Invalid stream ID pulled from channel index") diff --git a/apps/m3u/serializers.py b/apps/m3u/serializers.py index 67715717..b977486a 100644 --- a/apps/m3u/serializers.py +++ b/apps/m3u/serializers.py @@ -1,4 +1,5 @@ from rest_framework import serializers +from rest_framework.response import Response from .models import M3UAccount, M3UFilter, ServerGroup, M3UAccountProfile from core.models import UserAgent from apps.channels.models import ChannelGroup, ChannelGroupM3UAccount @@ -32,6 +33,19 @@ class M3UAccountProfileSerializer(serializers.ModelSerializer): return super().create(validated_data) + def update(self, instance, validated_data): + if instance.is_default: + raise serializers.ValidationError("Default profiles cannot be modified.") + return super().update(instance, validated_data) + + def destroy(self, request, *args, **kwargs): + instance = self.get_object() + if instance.is_default: + return Response( + {"error": "Default profiles cannot be deleted."}, + status=status.HTTP_400_BAD_REQUEST + ) + return super().destroy(request, *args, **kwargs) class M3UAccountSerializer(serializers.ModelSerializer): """Serializer for M3U Account""" diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py index 5d8e337c..a98f022b 100644 --- a/apps/m3u/tasks.py +++ b/apps/m3u/tasks.py @@ -16,7 +16,7 @@ from channels.layers import get_channel_layer from django.utils import timezone import time import json -from core.utils import redis_client, acquire_task_lock, release_task_lock +from core.utils import acquire_task_lock, release_task_lock from core.models import CoreSettings from asgiref.sync import async_to_sync @@ -173,6 +173,7 @@ def process_m3u_batch(account_id, batch, group_names, hash_keys): stream_hashes = {} # compiled_filters = [(f.filter_type, re.compile(f.regex_pattern, re.IGNORECASE)) for f in filters] + redis_client = RedisClient.get_client() logger.debug(f"Processing batch of {len(batch)}") for stream_info in batch: @@ -327,6 +328,7 @@ def refresh_single_m3u_account(account_id, use_cache=False): if not acquire_task_lock('refresh_single_m3u_account', account_id): return f"Task already running for account_id={account_id}." + redis_client = RedisClient.get_client() # Record start time start_time = time.time() send_progress_update(0, account_id) diff --git a/apps/proxy/apps.py b/apps/proxy/apps.py index c8c42088..d1c8b966 100644 --- a/apps/proxy/apps.py +++ b/apps/proxy/apps.py @@ -1,3 +1,4 @@ +import sys from django.apps import AppConfig class ProxyConfig(AppConfig): @@ -7,9 +8,10 @@ class ProxyConfig(AppConfig): def ready(self): """Initialize proxy servers when Django starts""" - from .hls_proxy.server import ProxyServer as HLSProxyServer - from .ts_proxy.server import ProxyServer as TSProxyServer - - # Initialize proxy servers - self.hls_proxy = HLSProxyServer() - self.ts_proxy = TSProxyServer() \ No newline at end of file + if 'manage.py' not in sys.argv: + from .hls_proxy.server import ProxyServer as HLSProxyServer + from .ts_proxy.server import ProxyServer as TSProxyServer + + # Initialize proxy servers + self.hls_proxy = HLSProxyServer() + self.ts_proxy = TSProxyServer() diff --git a/apps/proxy/tasks.py b/apps/proxy/tasks.py index 37a1f8f9..a4aaf8e5 100644 --- a/apps/proxy/tasks.py +++ b/apps/proxy/tasks.py @@ -6,7 +6,7 @@ import redis import json import logging import re -from core.utils import redis_client +from core.utils import RedisClient from apps.proxy.ts_proxy.channel_status import ChannelStatus logger = logging.getLogger(__name__) @@ -16,6 +16,8 @@ last_known_data = {} @shared_task def fetch_channel_stats(): + redis_client = RedisClient.get_client() + try: # Basic info for all channels channel_pattern = "ts_proxy:channel:*:metadata" diff --git a/apps/proxy/ts_proxy/__init__.py b/apps/proxy/ts_proxy/__init__.py index 2f48eb4b..e69de29b 100644 --- a/apps/proxy/ts_proxy/__init__.py +++ b/apps/proxy/ts_proxy/__init__.py @@ -1,9 +0,0 @@ -"""Transport Stream proxy module""" - -# Only class imports, no instance creation -from .server import ProxyServer -from .stream_manager import StreamManager -from .stream_buffer import StreamBuffer -from .client_manager import ClientManager - -proxy_server = ProxyServer() diff --git a/apps/proxy/ts_proxy/apps.py b/apps/proxy/ts_proxy/apps.py new file mode 100644 index 00000000..116720df --- /dev/null +++ b/apps/proxy/ts_proxy/apps.py @@ -0,0 +1,13 @@ +import sys +from django.apps import AppConfig + +class TSProxyConfig(AppConfig): + default_auto_field = 'django.db.models.BigAutoField' + name = 'apps.proxy.ts_proxy' + verbose_name = "TS Stream Proxies" + + def ready(self): + """Initialize proxy servers when Django starts""" + if 'manage.py' not in sys.argv: + from .server import ProxyServer + ProxyServer.get_instance() diff --git a/apps/proxy/ts_proxy/channel_status.py b/apps/proxy/ts_proxy/channel_status.py index b9f1596a..d4e33f02 100644 --- a/apps/proxy/ts_proxy/channel_status.py +++ b/apps/proxy/ts_proxy/channel_status.py @@ -1,7 +1,7 @@ import logging import time import re -from . import proxy_server +from .server import ProxyServer from .redis_keys import RedisKeys from .constants import TS_PACKET_SIZE, ChannelMetadataField from redis.exceptions import ConnectionError, TimeoutError @@ -22,6 +22,8 @@ class ChannelStatus: return (total_bytes * 8) / duration / 1000 def get_detailed_channel_info(channel_id): + proxy_server = ProxyServer.get_instance() + # Get channel metadata metadata_key = RedisKeys.channel_metadata(channel_id) metadata = proxy_server.redis_client.hgetall(metadata_key) @@ -230,6 +232,8 @@ class ChannelStatus: @staticmethod def _execute_redis_command(command_func): """Execute Redis command with error handling""" + proxy_server = ProxyServer.get_instance() + if not proxy_server.redis_client: return None @@ -245,6 +249,8 @@ class ChannelStatus: @staticmethod def get_basic_channel_info(channel_id): """Get basic channel information with Redis error handling""" + proxy_server = ProxyServer.get_instance() + try: # Use _execute_redis_command for Redis operations metadata_key = RedisKeys.channel_metadata(channel_id) diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index c11bb16a..1a04ffb5 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -18,7 +18,7 @@ import json from typing import Dict, Optional, Set from apps.proxy.config import TSConfig as Config from apps.channels.models import Channel, Stream -from core.utils import get_redis_client, get_redis_pubsub_client +from core.utils import RedisClient from redis.exceptions import ConnectionError, TimeoutError from .stream_manager import StreamManager from .stream_buffer import StreamBuffer @@ -32,6 +32,19 @@ logger = get_logger() class ProxyServer: """Manages TS proxy server instance with worker coordination""" + _instance = None + + @classmethod + def get_instance(cls): + if cls._instance is None: + from .server import ProxyServer + from .stream_manager import StreamManager + from .stream_buffer import StreamBuffer + from .client_manager import ClientManager + + cls._instance = ProxyServer() + + return cls._instance def __init__(self): """Initialize proxy server with worker identification""" @@ -54,7 +67,7 @@ class ProxyServer: try: # Use dedicated Redis client for proxy - self.redis_client = get_redis_client() + self.redis_client = RedisClient.get_client() if self.redis_client is not None: logger.info(f"Using dedicated Redis client for proxy server") logger.info(f"Worker ID: {self.worker_id}") @@ -76,7 +89,7 @@ class ProxyServer: def _setup_redis_connection(self): """Setup Redis connection with retry logic""" # Try to use get_redis_client utility instead of direct connection - self.redis_client = get_redis_client(max_retries=self.redis_max_retries, + self.redis_client = RedisClient.get_client(max_retries=self.redis_max_retries, retry_interval=self.redis_retry_interval) if self.redis_client: logger.info(f"Successfully connected to Redis using utility function") @@ -121,7 +134,7 @@ class ProxyServer: while True: try: # Use dedicated PubSub client for event listener - pubsub_client = get_redis_pubsub_client() + pubsub_client = RedisClient.get_pubsub_client() if pubsub_client: logger.info("Using dedicated Redis PubSub client for event listener") else: diff --git a/apps/proxy/ts_proxy/services/channel_service.py b/apps/proxy/ts_proxy/services/channel_service.py index 72209c96..3ac62af4 100644 --- a/apps/proxy/ts_proxy/services/channel_service.py +++ b/apps/proxy/ts_proxy/services/channel_service.py @@ -9,7 +9,7 @@ import json from django.shortcuts import get_object_or_404 from apps.channels.models import Channel from apps.proxy.config import TSConfig as Config -from .. import proxy_server +from ..server import ProxyServer from ..redis_keys import RedisKeys from ..constants import EventType, ChannelState, ChannelMetadataField from ..url_utils import get_stream_info_for_switch @@ -36,6 +36,7 @@ class ChannelService: Returns: bool: Success status """ + proxy_server = ProxyServer.get_instance() # FIXED: First, ensure that Redis metadata including stream_id is set BEFORE channel initialization # This ensures the stream ID is available when the StreamManager looks it up if stream_id and proxy_server.redis_client: @@ -94,6 +95,8 @@ class ChannelService: Returns: dict: Result information including success status and diagnostics """ + proxy_server = ProxyServer.get_instance() + # If no direct URL is provided but a target stream is, get URL from target stream stream_id = None if not new_url and target_stream_id: @@ -211,6 +214,8 @@ class ChannelService: Returns: dict: Result information including previous state if available """ + proxy_server = ProxyServer.get_instance() + # Check if channel exists channel_exists = proxy_server.check_if_channel_exists(channel_id) if not channel_exists: @@ -287,6 +292,7 @@ class ChannelService: dict: Result information """ logger.info(f"Request to stop client {client_id} on channel {channel_id}") + proxy_server = ProxyServer.get_instance() # Set a Redis key for immediate detection key_set = False @@ -350,6 +356,8 @@ class ChannelService: Returns: tuple: (valid, state, owner, details) - validity status, current state, owner, and diagnostic info """ + proxy_server = ProxyServer.get_instance() + if not proxy_server.redis_client: return False, None, None, {"error": "Redis not available"} @@ -407,6 +415,8 @@ class ChannelService: @staticmethod def _update_channel_metadata(channel_id, url, user_agent=None, stream_id=None): """Update channel metadata in Redis""" + proxy_server = ProxyServer.get_instance() + if not proxy_server.redis_client: return False @@ -444,6 +454,8 @@ class ChannelService: @staticmethod def _publish_stream_switch_event(channel_id, new_url, user_agent=None, stream_id=None): """Publish a stream switch event to Redis pubsub""" + proxy_server = ProxyServer.get_instance() + if not proxy_server.redis_client: return False @@ -466,6 +478,8 @@ class ChannelService: @staticmethod def _publish_channel_stop_event(channel_id): """Publish a channel stop event to Redis pubsub""" + proxy_server = ProxyServer.get_instance() + if not proxy_server.redis_client: return False @@ -487,6 +501,8 @@ class ChannelService: @staticmethod def _publish_client_stop_event(channel_id, client_id): """Publish a client stop event to Redis pubsub""" + proxy_server = ProxyServer.get_instance() + if not proxy_server.redis_client: return False diff --git a/apps/proxy/ts_proxy/stream_generator.py b/apps/proxy/ts_proxy/stream_generator.py index abf52d3a..f566c4ab 100644 --- a/apps/proxy/ts_proxy/stream_generator.py +++ b/apps/proxy/ts_proxy/stream_generator.py @@ -7,7 +7,7 @@ import time import logging import threading from apps.proxy.config import TSConfig as Config -from . import proxy_server +from .server import ProxyServer from .utils import create_ts_packet, get_logger from .redis_keys import RedisKeys from .utils import get_logger @@ -97,6 +97,7 @@ class StreamGenerator: max_init_wait = getattr(Config, 'CLIENT_WAIT_TIMEOUT', 30) keepalive_interval = 0.5 last_keepalive = 0 + proxy_server = ProxyServer.get_instance() # While init is happening, send keepalive packets while time.time() - initialization_start < max_init_wait: @@ -143,6 +144,8 @@ class StreamGenerator: def _setup_streaming(self): """Setup streaming parameters and check resources.""" + proxy_server = ProxyServer.get_instance() + # Get buffer - stream manager may not exist in this worker buffer = proxy_server.stream_buffers.get(self.channel_id) stream_manager = proxy_server.stream_managers.get(self.channel_id) @@ -218,6 +221,8 @@ class StreamGenerator: def _check_resources(self): """Check if required resources still exist.""" + proxy_server = ProxyServer.get_instance() + # Enhanced resource checks if self.channel_id not in proxy_server.stream_buffers: logger.info(f"[{self.client_id}] Channel buffer no longer exists, terminating stream") @@ -264,6 +269,7 @@ class StreamGenerator: # Process and send chunks total_size = sum(len(c) for c in chunks) logger.debug(f"[{self.client_id}] Retrieved {len(chunks)} chunks ({total_size} bytes) from index {self.local_index+1} to {next_index}") + proxy_server = ProxyServer.get_instance() # Send the chunks to the client for chunk in chunks: @@ -346,6 +352,7 @@ class StreamGenerator: elapsed = time.time() - self.stream_start_time local_clients = 0 total_clients = 0 + proxy_server = ProxyServer.get_instance() if self.channel_id in proxy_server.client_managers: client_manager = proxy_server.client_managers[self.channel_id] @@ -360,6 +367,8 @@ class StreamGenerator: """ Schedule channel shutdown if there are no clients left and we're the owner. """ + proxy_server = ProxyServer.get_instance() + # If no clients left and we're the owner, schedule shutdown using the config value if local_clients == 0 and proxy_server.am_i_owner(self.channel_id): logger.info(f"No local clients left for channel {self.channel_id}, scheduling shutdown") diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py index e65a1744..ad3ccbc9 100644 --- a/apps/proxy/ts_proxy/views.py +++ b/apps/proxy/ts_proxy/views.py @@ -7,7 +7,7 @@ from django.http import StreamingHttpResponse, JsonResponse, HttpResponseRedirec from django.views.decorators.csrf import csrf_exempt from django.shortcuts import get_object_or_404 from apps.proxy.config import TSConfig as Config -from . import proxy_server +from .server import ProxyServer from .channel_status import ChannelStatus from .stream_generator import create_stream_generator from .utils import get_client_ip @@ -34,6 +34,7 @@ def stream_ts(request, channel_id): channel = get_stream_object(channel_id) client_user_agent = None + proxy_server = ProxyServer.get_instance() try: # Generate a unique client ID @@ -192,6 +193,8 @@ def stream_ts(request, channel_id): @permission_classes([IsAuthenticated]) def change_stream(request, channel_id): """Change stream URL for existing channel with enhanced diagnostics""" + proxy_server = ProxyServer.get_instance() + try: data = json.loads(request.body) new_url = data.get('url') @@ -243,6 +246,8 @@ def channel_status(request, channel_id=None): - /status/ returns basic summary of all channels - /status/{channel_id} returns detailed info about specific channel """ + proxy_server = ProxyServer.get_instance() + try: # Check if Redis is available if not proxy_server.redis_client: @@ -343,6 +348,8 @@ def stop_client(request, channel_id): @permission_classes([IsAuthenticated]) def next_stream(request, channel_id): """Switch to the next available stream for a channel""" + proxy_server = ProxyServer.get_instance() + try: logger.info(f"Request to switch to next stream for channel {channel_id} received") diff --git a/core/utils.py b/core/utils.py index e89e5f33..ca6fa75f 100644 --- a/core/utils.py +++ b/core/utils.py @@ -14,144 +14,124 @@ logger = logging.getLogger(__name__) # Import the command detector from .command_utils import is_management_command -def get_redis_client(max_retries=5, retry_interval=1): - """Get Redis client with connection validation and retry logic""" - # Skip Redis connection for management commands like collectstatic - if is_management_command(): - logger.info("Running as management command - skipping Redis initialization") - return None +class RedisClient: + _client = None + _pubsub_client = None - retry_count = 0 - while retry_count < max_retries: - try: - # Get connection parameters from settings or environment - redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost')) - redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) - redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) + @classmethod + def get_client(cls, max_retries=5, retry_interval=1): + if cls._client is None: + retry_count = 0 + while retry_count < max_retries: + try: + # Get connection parameters from settings or environment + redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost')) + redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) + redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) - # Use standardized settings - socket_timeout = getattr(settings, 'REDIS_SOCKET_TIMEOUT', 5) - socket_connect_timeout = getattr(settings, 'REDIS_SOCKET_CONNECT_TIMEOUT', 5) - health_check_interval = getattr(settings, 'REDIS_HEALTH_CHECK_INTERVAL', 30) - socket_keepalive = getattr(settings, 'REDIS_SOCKET_KEEPALIVE', True) - retry_on_timeout = getattr(settings, 'REDIS_RETRY_ON_TIMEOUT', True) + # Use standardized settings + socket_timeout = getattr(settings, 'REDIS_SOCKET_TIMEOUT', 5) + socket_connect_timeout = getattr(settings, 'REDIS_SOCKET_CONNECT_TIMEOUT', 5) + health_check_interval = getattr(settings, 'REDIS_HEALTH_CHECK_INTERVAL', 30) + socket_keepalive = getattr(settings, 'REDIS_SOCKET_KEEPALIVE', True) + retry_on_timeout = getattr(settings, 'REDIS_RETRY_ON_TIMEOUT', True) - # Create Redis client with better defaults - client = redis.Redis( - host=redis_host, - port=redis_port, - db=redis_db, - socket_timeout=socket_timeout, - socket_connect_timeout=socket_connect_timeout, - socket_keepalive=socket_keepalive, - health_check_interval=health_check_interval, - retry_on_timeout=retry_on_timeout - ) + # Create Redis client with better defaults + client = redis.Redis( + host=redis_host, + port=redis_port, + db=redis_db, + socket_timeout=socket_timeout, + socket_connect_timeout=socket_connect_timeout, + socket_keepalive=socket_keepalive, + health_check_interval=health_check_interval, + retry_on_timeout=retry_on_timeout + ) - # Validate connection with ping - client.ping() - client.flushdb() - logger.info(f"Connected to Redis at {redis_host}:{redis_port}/{redis_db}") - return client + # Validate connection with ping + client.ping() + client.flushdb() + logger.info(f"Connected to Redis at {redis_host}:{redis_port}/{redis_db}") - except (ConnectionError, TimeoutError) as e: - retry_count += 1 - if retry_count >= max_retries: - logger.error(f"Failed to connect to Redis after {max_retries} attempts: {e}") - return None - else: - # Use exponential backoff for retries - wait_time = retry_interval * (2 ** (retry_count - 1)) - logger.warning(f"Redis connection failed. Retrying in {wait_time}s... ({retry_count}/{max_retries})") - time.sleep(wait_time) + cls._client = client + break - except Exception as e: - logger.error(f"Unexpected error connecting to Redis: {e}") - return None + except (ConnectionError, TimeoutError) as e: + retry_count += 1 + if retry_count >= max_retries: + logger.error(f"Failed to connect to Redis after {max_retries} attempts: {e}") + return None + else: + # Use exponential backoff for retries + wait_time = retry_interval * (2 ** (retry_count - 1)) + logger.warning(f"Redis connection failed. Retrying in {wait_time}s... ({retry_count}/{max_retries})") + time.sleep(wait_time) -def get_redis_pubsub_client(max_retries=5, retry_interval=1): - """Get Redis client optimized for PubSub operations""" - # Skip Redis connection for management commands like collectstatic - if is_management_command(): - logger.info("Running as management command - skipping Redis PubSub initialization") - return None + except Exception as e: + logger.error(f"Unexpected error connecting to Redis: {e}") + return None - retry_count = 0 - while retry_count < max_retries: - try: - # Get connection parameters from settings or environment - redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost')) - redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) - redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) + return cls._client - # Use standardized settings but without socket timeouts for PubSub - # Important: socket_timeout is None for PubSub operations - socket_connect_timeout = getattr(settings, 'REDIS_SOCKET_CONNECT_TIMEOUT', 5) - socket_keepalive = getattr(settings, 'REDIS_SOCKET_KEEPALIVE', True) - health_check_interval = getattr(settings, 'REDIS_HEALTH_CHECK_INTERVAL', 30) - retry_on_timeout = getattr(settings, 'REDIS_RETRY_ON_TIMEOUT', True) + @classmethod + def get_pubsub_client(cls, max_retries=5, retry_interval=1): + """Get Redis client optimized for PubSub operations""" + if cls._pubsub_client is None: + retry_count = 0 + while retry_count < max_retries: + try: + # Get connection parameters from settings or environment + redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost')) + redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379))) + redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0))) - # Create Redis client with PubSub-optimized settings - no timeout - client = redis.Redis( - host=redis_host, - port=redis_port, - db=redis_db, - socket_timeout=None, # Critical: No timeout for PubSub operations - socket_connect_timeout=socket_connect_timeout, - socket_keepalive=socket_keepalive, - health_check_interval=health_check_interval, - retry_on_timeout=retry_on_timeout - ) + # Use standardized settings but without socket timeouts for PubSub + # Important: socket_timeout is None for PubSub operations + socket_connect_timeout = getattr(settings, 'REDIS_SOCKET_CONNECT_TIMEOUT', 5) + socket_keepalive = getattr(settings, 'REDIS_SOCKET_KEEPALIVE', True) + health_check_interval = getattr(settings, 'REDIS_HEALTH_CHECK_INTERVAL', 30) + retry_on_timeout = getattr(settings, 'REDIS_RETRY_ON_TIMEOUT', True) - # Validate connection with ping - client.ping() - logger.info(f"Connected to Redis for PubSub at {redis_host}:{redis_port}/{redis_db}") + # Create Redis client with PubSub-optimized settings - no timeout + client = redis.Redis( + host=redis_host, + port=redis_port, + db=redis_db, + socket_timeout=None, # Critical: No timeout for PubSub operations + socket_connect_timeout=socket_connect_timeout, + socket_keepalive=socket_keepalive, + health_check_interval=health_check_interval, + retry_on_timeout=retry_on_timeout + ) - # We don't need the keepalive thread anymore since we're using proper PubSub handling - return client + # Validate connection with ping + client.ping() + logger.info(f"Connected to Redis for PubSub at {redis_host}:{redis_port}/{redis_db}") - except (ConnectionError, TimeoutError) as e: - retry_count += 1 - if retry_count >= max_retries: - logger.error(f"Failed to connect to Redis for PubSub after {max_retries} attempts: {e}") - return None - else: - # Use exponential backoff for retries - wait_time = retry_interval * (2 ** (retry_count - 1)) - logger.warning(f"Redis PubSub connection failed. Retrying in {wait_time}s... ({retry_count}/{max_retries})") - time.sleep(wait_time) + # We don't need the keepalive thread anymore since we're using proper PubSub handling + cls._pubsub_client = client + break - except Exception as e: - logger.error(f"Unexpected error connecting to Redis for PubSub: {e}") - return None + except (ConnectionError, TimeoutError) as e: + retry_count += 1 + if retry_count >= max_retries: + logger.error(f"Failed to connect to Redis for PubSub after {max_retries} attempts: {e}") + return None + else: + # Use exponential backoff for retries + wait_time = retry_interval * (2 ** (retry_count - 1)) + logger.warning(f"Redis PubSub connection failed. Retrying in {wait_time}s... ({retry_count}/{max_retries})") + time.sleep(wait_time) -def execute_redis_command(redis_client, command_func, default_return=None): - """ - Execute a Redis command with proper error handling + except Exception as e: + logger.error(f"Unexpected error connecting to Redis for PubSub: {e}") + return None - Args: - redis_client: The Redis client instance - command_func: Lambda function containing the Redis command to execute - default_return: Value to return if command fails - - Returns: - Command result or default_return on failure - """ - if redis_client is None: - return default_return - - try: - return command_func() - except (ConnectionError, TimeoutError) as e: - logger.warning(f"Redis connection error: {e}") - return default_return - except Exception as e: - logger.error(f"Redis command error: {e}") - return default_return + return cls._pubsub_client def acquire_task_lock(task_name, id): """Acquire a lock to prevent concurrent task execution.""" - redis_client = get_redis_client() + redis_client = RedisClient.get_client() lock_id = f"task_lock_{task_name}_{id}" # Use the Redis SET command with NX (only set if not exists) and EX (set expiration) @@ -164,7 +144,7 @@ def acquire_task_lock(task_name, id): def release_task_lock(task_name, id): """Release the lock after task execution.""" - redis_client = get_redis_client() + redis_client = RedisClient.get_client() lock_id = f"task_lock_{task_name}_{id}" # Remove the lock @@ -179,22 +159,3 @@ def send_websocket_event(event, success, data): "data": {"success": True, "type": "epg_channels"} } ) - -# Initialize the global clients with retry logic -# Skip Redis initialization if running as a management command -if __name__ == '__main__': - redis_client = None - redis_pubsub_client = None - logger.info("Running as management command - Redis clients set to None") -else: - redis_client = get_redis_client() - redis_pubsub_client = get_redis_pubsub_client() - -# Import and initialize the PubSub manager -# Skip if running as management command or if Redis client is None -if not is_management_command() and redis_client is not None: - from .redis_pubsub import get_pubsub_manager - pubsub_manager = get_pubsub_manager(redis_client) -else: - logger.info("PubSub manager not initialized (running as management command or Redis not available)") - pubsub_manager = None diff --git a/dispatcharr/settings.py b/dispatcharr/settings.py index 7617b6c7..7001e03b 100644 --- a/dispatcharr/settings.py +++ b/dispatcharr/settings.py @@ -22,6 +22,7 @@ INSTALLED_APPS = [ 'apps.m3u', 'apps.output', 'apps.proxy.apps.ProxyConfig', + 'apps.proxy.ts_proxy', 'core', 'drf_yasg', 'daphne', diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index fd6cde00..e6622206 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -82,7 +82,6 @@ postgres_pid=$(su - postgres -c "/usr/lib/postgresql/14/bin/pg_ctl -D ${POSTGRES echo "✅ Postgres started with PID $postgres_pid" pids+=("$postgres_pid") - uwsgi_file="/app/docker/uwsgi.ini" if [ "$DISPATCHARR_ENV" = "dev" ] && [ "$DISPATCHARR_DEBUG" != "true" ]; then uwsgi_file="/app/docker/uwsgi.dev.ini" @@ -102,17 +101,16 @@ else pids+=("$nginx_pid") fi +cd /app +python manage.py migrate --noinput +python manage.py collectstatic --noinput + echo "🚀 Starting uwsgi..." su - $POSTGRES_USER -c "cd /app && uwsgi --ini $uwsgi_file &" uwsgi_pid=$(pgrep uwsgi | sort | head -n1) echo "✅ uwsgi started with PID $uwsgi_pid" pids+=("$uwsgi_pid") - -cd /app -python manage.py migrate --noinput -python manage.py collectstatic --noinput - # Wait for at least one process to exit and log the process that exited first if [ ${#pids[@]} -gt 0 ]; then echo "⏳ Waiting for processes to exit..."