Mirror of https://github.com/Dispatcharr/Dispatcharr.git, synced 2026-01-23 02:35:14 +00:00

Commit 7351264e8a (parent 5abaddebf8)

centralized and lazy-loaded Redis client singleton; check for manage.py commands so we don't init proxy servers (Redis connection); run manage commands before starting uwsgi

15 changed files with 213 additions and 174 deletions
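The core of the change: Redis connections that used to be created at import time as module-level globals are replaced by a process-wide singleton that connects only on first use. A minimal sketch of the pattern, simplified from the core/utils.py hunk below (the real get_client adds retry with exponential backoff and settings-driven connection parameters):

    import redis

    class RedisClient:
        _client = None  # one shared client per process

        @classmethod
        def get_client(cls):
            # Nothing touches Redis at import time; the connection is opened
            # the first time a caller asks for the client, then cached.
            if cls._client is None:
                client = redis.Redis(host="localhost", port=6379, db=0)
                client.ping()  # validate the connection eagerly
                cls._client = client
            return cls._client
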
@@ -3,7 +3,7 @@ from django.core.exceptions import ValidationError
-from core.models import StreamProfile
 from django.conf import settings
+from core.models import StreamProfile, CoreSettings
-from core.utils import redis_client, execute_redis_command
+from core.utils import RedisClient
 import logging
 import uuid
 from datetime import datetime
@@ -19,8 +19,7 @@ from apps.m3u.models import M3UAccount
 # Add fallback functions if Redis isn't available
 def get_total_viewers(channel_id):
     """Get viewer count from Redis or return 0 if Redis isn't available"""
-    if redis_client is None:
-        return 0
+    redis_client = RedisClient.get_client()

     try:
         return int(redis_client.get(f"channel:{channel_id}:viewers") or 0)
@@ -144,7 +143,7 @@ class Stream(models.Model):
         """
         Finds an available stream for the requested channel and returns the selected stream and profile.
         """
-
+        redis_client = RedisClient.get_client()
         profile_id = redis_client.get(f"stream_profile:{self.id}")
         if profile_id:
             profile_id = int(profile_id)
@@ -184,6 +183,8 @@ class Stream(models.Model):
         """
         Called when a stream is finished to release the lock.
         """
+        redis_client = RedisClient.get_client()
+
         stream_id = self.id
         # Get the matched profile for cleanup
         profile_id = redis_client.get(f"stream_profile:{stream_id}")
@@ -280,6 +281,7 @@ class Channel(models.Model):
         """
         Finds an available stream for the requested channel and returns the selected stream and profile.
         """
+        redis_client = RedisClient.get_client()

         # 2. Check if a stream is already active for this channel
         stream_id = redis_client.get(f"channel_stream:{self.id}")
@@ -326,6 +328,8 @@ class Channel(models.Model):
         """
         Called when a stream is finished to release the lock.
         """
+        redis_client = RedisClient.get_client()
+
         stream_id = redis_client.get(f"channel_stream:{self.id}")
         if not stream_id:
             logger.debug("Invalid stream ID pulled from channel index")

@@ -1,4 +1,5 @@
 from rest_framework import serializers
+from rest_framework.response import Response
 from .models import M3UAccount, M3UFilter, ServerGroup, M3UAccountProfile
 from core.models import UserAgent
 from apps.channels.models import ChannelGroup, ChannelGroupM3UAccount
@@ -32,6 +33,19 @@ class M3UAccountProfileSerializer(serializers.ModelSerializer):

         return super().create(validated_data)

+    def update(self, instance, validated_data):
+        if instance.is_default:
+            raise serializers.ValidationError("Default profiles cannot be modified.")
+        return super().update(instance, validated_data)
+
+    def destroy(self, request, *args, **kwargs):
+        instance = self.get_object()
+        if instance.is_default:
+            return Response(
+                {"error": "Default profiles cannot be deleted."},
+                status=status.HTTP_400_BAD_REQUEST
+            )
+        return super().destroy(request, *args, **kwargs)

 class M3UAccountSerializer(serializers.ModelSerializer):
     """Serializer for M3U Account"""

@@ -16,7 +16,7 @@ from channels.layers import get_channel_layer
 from django.utils import timezone
 import time
 import json
-from core.utils import redis_client, acquire_task_lock, release_task_lock
+from core.utils import acquire_task_lock, release_task_lock
 from core.models import CoreSettings
 from asgiref.sync import async_to_sync

@@ -173,6 +173,7 @@ def process_m3u_batch(account_id, batch, group_names, hash_keys):
     stream_hashes = {}

     # compiled_filters = [(f.filter_type, re.compile(f.regex_pattern, re.IGNORECASE)) for f in filters]
+    redis_client = RedisClient.get_client()

     logger.debug(f"Processing batch of {len(batch)}")
     for stream_info in batch:
@@ -327,6 +328,7 @@ def refresh_single_m3u_account(account_id, use_cache=False):
     if not acquire_task_lock('refresh_single_m3u_account', account_id):
         return f"Task already running for account_id={account_id}."

+    redis_client = RedisClient.get_client()
     # Record start time
     start_time = time.time()
     send_progress_update(0, account_id)

@@ -1,3 +1,4 @@
+import sys
 from django.apps import AppConfig

 class ProxyConfig(AppConfig):
@@ -7,9 +8,10 @@ class ProxyConfig(AppConfig):

     def ready(self):
         """Initialize proxy servers when Django starts"""
-        from .hls_proxy.server import ProxyServer as HLSProxyServer
-        from .ts_proxy.server import ProxyServer as TSProxyServer
-
-        # Initialize proxy servers
-        self.hls_proxy = HLSProxyServer()
-        self.ts_proxy = TSProxyServer()
+        if 'manage.py' not in sys.argv:
+            from .hls_proxy.server import ProxyServer as HLSProxyServer
+            from .ts_proxy.server import ProxyServer as TSProxyServer
+
+            # Initialize proxy servers
+            self.hls_proxy = HLSProxyServer()
+            self.ts_proxy = TSProxyServer()
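
The `'manage.py' not in sys.argv` guard keeps one-off management commands (migrate, collectstatic, shell) from constructing proxy servers and opening Redis connections, while uwsgi/daphne workers, which are not launched through manage.py, still initialize normally in ready(). Note the exact-match semantics: the check only detects an argv element that is literally `manage.py`. The codebase's `is_management_command` helper (imported in core/utils.py but not shown in this diff) presumably performs a similar check; a hypothetical substring-based variant would also catch path-prefixed invocations:

    import sys

    def is_management_command() -> bool:
        # Sketch only: matches `python manage.py migrate` as well as
        # `python ./manage.py migrate` by substring rather than equality.
        return any('manage.py' in arg for arg in sys.argv)
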
@@ -6,7 +6,7 @@ import redis
 import json
 import logging
 import re
-from core.utils import redis_client
+from core.utils import RedisClient
 from apps.proxy.ts_proxy.channel_status import ChannelStatus

 logger = logging.getLogger(__name__)
@@ -16,6 +16,8 @@ last_known_data = {}

 @shared_task
 def fetch_channel_stats():
+    redis_client = RedisClient.get_client()
+
     try:
         # Basic info for all channels
         channel_pattern = "ts_proxy:channel:*:metadata"

@@ -1,9 +0,0 @@
-"""Transport Stream proxy module"""
-
-# Only class imports, no instance creation
-from .server import ProxyServer
-from .stream_manager import StreamManager
-from .stream_buffer import StreamBuffer
-from .client_manager import ClientManager
-
-proxy_server = ProxyServer()

apps/proxy/ts_proxy/apps.py (new file, 13 lines)
@@ -0,0 +1,13 @@
+import sys
+from django.apps import AppConfig
+
+class TSProxyConfig(AppConfig):
+    default_auto_field = 'django.db.models.BigAutoField'
+    name = 'apps.proxy.ts_proxy'
+    verbose_name = "TS Stream Proxies"
+
+    def ready(self):
+        """Initialize proxy servers when Django starts"""
+        if 'manage.py' not in sys.argv:
+            from .server import ProxyServer
+            ProxyServer.get_instance()
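
Because the settings change below registers the app as plain 'apps.proxy.ts_proxy', Django should pick up TSProxyConfig automatically as the lone AppConfig subclass in this apps.py, making ready() the single place the TS proxy is instantiated for a real server process. A sketch of the intended startup flow, under that assumption:

    # uwsgi worker startup (no manage.py in sys.argv):
    #   django.setup() -> TSProxyConfig.ready() -> ProxyServer.get_instance()
    from apps.proxy.ts_proxy.server import ProxyServer

    proxy = ProxyServer.get_instance()          # constructed on first call
    assert proxy is ProxyServer.get_instance()  # later calls reuse the same object
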
@@ -1,7 +1,7 @@
 import logging
 import time
 import re
-from . import proxy_server
+from .server import ProxyServer
 from .redis_keys import RedisKeys
 from .constants import TS_PACKET_SIZE, ChannelMetadataField
 from redis.exceptions import ConnectionError, TimeoutError
@@ -22,6 +22,8 @@ class ChannelStatus:
         return (total_bytes * 8) / duration / 1000

     def get_detailed_channel_info(channel_id):
+        proxy_server = ProxyServer.get_instance()
+
         # Get channel metadata
         metadata_key = RedisKeys.channel_metadata(channel_id)
         metadata = proxy_server.redis_client.hgetall(metadata_key)
@@ -230,6 +232,8 @@ class ChannelStatus:
     @staticmethod
     def _execute_redis_command(command_func):
         """Execute Redis command with error handling"""
+        proxy_server = ProxyServer.get_instance()
+
         if not proxy_server.redis_client:
             return None

@@ -245,6 +249,8 @@ class ChannelStatus:
     @staticmethod
     def get_basic_channel_info(channel_id):
         """Get basic channel information with Redis error handling"""
+        proxy_server = ProxyServer.get_instance()
+
         try:
             # Use _execute_redis_command for Redis operations
             metadata_key = RedisKeys.channel_metadata(channel_id)

@@ -18,7 +18,7 @@ import json
 from typing import Dict, Optional, Set
 from apps.proxy.config import TSConfig as Config
 from apps.channels.models import Channel, Stream
-from core.utils import get_redis_client, get_redis_pubsub_client
+from core.utils import RedisClient
 from redis.exceptions import ConnectionError, TimeoutError
 from .stream_manager import StreamManager
 from .stream_buffer import StreamBuffer
@@ -32,6 +32,19 @@ logger = get_logger()

 class ProxyServer:
     """Manages TS proxy server instance with worker coordination"""
+    _instance = None
+
+    @classmethod
+    def get_instance(cls):
+        if cls._instance is None:
+            from .server import ProxyServer
+            from .stream_manager import StreamManager
+            from .stream_buffer import StreamBuffer
+            from .client_manager import ClientManager
+
+            cls._instance = ProxyServer()
+
+        return cls._instance

     def __init__(self):
         """Initialize proxy server with worker identification"""
@@ -54,7 +67,7 @@ class ProxyServer:

         try:
             # Use dedicated Redis client for proxy
-            self.redis_client = get_redis_client()
+            self.redis_client = RedisClient.get_client()
             if self.redis_client is not None:
                 logger.info(f"Using dedicated Redis client for proxy server")
                 logger.info(f"Worker ID: {self.worker_id}")
@@ -76,7 +89,7 @@ class ProxyServer:
     def _setup_redis_connection(self):
         """Setup Redis connection with retry logic"""
         # Try to use get_redis_client utility instead of direct connection
-        self.redis_client = get_redis_client(max_retries=self.redis_max_retries,
+        self.redis_client = RedisClient.get_client(max_retries=self.redis_max_retries,
                                              retry_interval=self.redis_retry_interval)
         if self.redis_client:
             logger.info(f"Successfully connected to Redis using utility function")
@@ -121,7 +134,7 @@ class ProxyServer:
         while True:
             try:
                 # Use dedicated PubSub client for event listener
-                pubsub_client = get_redis_pubsub_client()
+                pubsub_client = RedisClient.get_pubsub_client()
                 if pubsub_client:
                     logger.info("Using dedicated Redis PubSub client for event listener")
                 else:

@@ -9,7 +9,7 @@ import json
 from django.shortcuts import get_object_or_404
 from apps.channels.models import Channel
 from apps.proxy.config import TSConfig as Config
-from .. import proxy_server
+from ..server import ProxyServer
 from ..redis_keys import RedisKeys
 from ..constants import EventType, ChannelState, ChannelMetadataField
 from ..url_utils import get_stream_info_for_switch
@@ -36,6 +36,7 @@ class ChannelService:
         Returns:
             bool: Success status
         """
+        proxy_server = ProxyServer.get_instance()
         # FIXED: First, ensure that Redis metadata including stream_id is set BEFORE channel initialization
         # This ensures the stream ID is available when the StreamManager looks it up
         if stream_id and proxy_server.redis_client:
@@ -94,6 +95,8 @@ class ChannelService:
         Returns:
             dict: Result information including success status and diagnostics
         """
+        proxy_server = ProxyServer.get_instance()
+
         # If no direct URL is provided but a target stream is, get URL from target stream
         stream_id = None
         if not new_url and target_stream_id:
@@ -211,6 +214,8 @@ class ChannelService:
         Returns:
             dict: Result information including previous state if available
         """
+        proxy_server = ProxyServer.get_instance()
+
         # Check if channel exists
         channel_exists = proxy_server.check_if_channel_exists(channel_id)
         if not channel_exists:
@@ -287,6 +292,7 @@ class ChannelService:
             dict: Result information
         """
         logger.info(f"Request to stop client {client_id} on channel {channel_id}")
+        proxy_server = ProxyServer.get_instance()

         # Set a Redis key for immediate detection
         key_set = False
@@ -350,6 +356,8 @@ class ChannelService:
         Returns:
             tuple: (valid, state, owner, details) - validity status, current state, owner, and diagnostic info
         """
+        proxy_server = ProxyServer.get_instance()
+
         if not proxy_server.redis_client:
             return False, None, None, {"error": "Redis not available"}

@@ -407,6 +415,8 @@ class ChannelService:
     @staticmethod
     def _update_channel_metadata(channel_id, url, user_agent=None, stream_id=None):
         """Update channel metadata in Redis"""
+        proxy_server = ProxyServer.get_instance()
+
         if not proxy_server.redis_client:
             return False

@@ -444,6 +454,8 @@ class ChannelService:
     @staticmethod
     def _publish_stream_switch_event(channel_id, new_url, user_agent=None, stream_id=None):
         """Publish a stream switch event to Redis pubsub"""
+        proxy_server = ProxyServer.get_instance()
+
         if not proxy_server.redis_client:
             return False

@@ -466,6 +478,8 @@ class ChannelService:
     @staticmethod
     def _publish_channel_stop_event(channel_id):
         """Publish a channel stop event to Redis pubsub"""
+        proxy_server = ProxyServer.get_instance()
+
         if not proxy_server.redis_client:
             return False

@@ -487,6 +501,8 @@ class ChannelService:
     @staticmethod
     def _publish_client_stop_event(channel_id, client_id):
         """Publish a client stop event to Redis pubsub"""
+        proxy_server = ProxyServer.get_instance()
+
         if not proxy_server.redis_client:
             return False

@@ -7,7 +7,7 @@ import time
 import logging
 import threading
 from apps.proxy.config import TSConfig as Config
-from . import proxy_server
+from .server import ProxyServer
-from .utils import create_ts_packet, get_logger
 from .redis_keys import RedisKeys
+from .utils import get_logger
@@ -97,6 +97,7 @@ class StreamGenerator:
         max_init_wait = getattr(Config, 'CLIENT_WAIT_TIMEOUT', 30)
         keepalive_interval = 0.5
         last_keepalive = 0
+        proxy_server = ProxyServer.get_instance()

         # While init is happening, send keepalive packets
         while time.time() - initialization_start < max_init_wait:
@@ -143,6 +144,8 @@ class StreamGenerator:

     def _setup_streaming(self):
         """Setup streaming parameters and check resources."""
+        proxy_server = ProxyServer.get_instance()
+
         # Get buffer - stream manager may not exist in this worker
         buffer = proxy_server.stream_buffers.get(self.channel_id)
         stream_manager = proxy_server.stream_managers.get(self.channel_id)
@@ -218,6 +221,8 @@ class StreamGenerator:

     def _check_resources(self):
         """Check if required resources still exist."""
+        proxy_server = ProxyServer.get_instance()
+
         # Enhanced resource checks
         if self.channel_id not in proxy_server.stream_buffers:
             logger.info(f"[{self.client_id}] Channel buffer no longer exists, terminating stream")
@@ -264,6 +269,7 @@ class StreamGenerator:
         # Process and send chunks
         total_size = sum(len(c) for c in chunks)
         logger.debug(f"[{self.client_id}] Retrieved {len(chunks)} chunks ({total_size} bytes) from index {self.local_index+1} to {next_index}")
+        proxy_server = ProxyServer.get_instance()

         # Send the chunks to the client
         for chunk in chunks:
@@ -346,6 +352,7 @@ class StreamGenerator:
         elapsed = time.time() - self.stream_start_time
         local_clients = 0
         total_clients = 0
+        proxy_server = ProxyServer.get_instance()

         if self.channel_id in proxy_server.client_managers:
             client_manager = proxy_server.client_managers[self.channel_id]
@@ -360,6 +367,8 @@ class StreamGenerator:
         """
         Schedule channel shutdown if there are no clients left and we're the owner.
         """
+        proxy_server = ProxyServer.get_instance()
+
         # If no clients left and we're the owner, schedule shutdown using the config value
         if local_clients == 0 and proxy_server.am_i_owner(self.channel_id):
             logger.info(f"No local clients left for channel {self.channel_id}, scheduling shutdown")

@@ -7,7 +7,7 @@ from django.http import StreamingHttpResponse, JsonResponse, HttpResponseRedirect
 from django.views.decorators.csrf import csrf_exempt
 from django.shortcuts import get_object_or_404
 from apps.proxy.config import TSConfig as Config
-from . import proxy_server
+from .server import ProxyServer
 from .channel_status import ChannelStatus
 from .stream_generator import create_stream_generator
 from .utils import get_client_ip
@@ -34,6 +34,7 @@ def stream_ts(request, channel_id):
     channel = get_stream_object(channel_id)

     client_user_agent = None
+    proxy_server = ProxyServer.get_instance()

     try:
         # Generate a unique client ID
@@ -192,6 +193,8 @@ def stream_ts(request, channel_id):
 @permission_classes([IsAuthenticated])
 def change_stream(request, channel_id):
     """Change stream URL for existing channel with enhanced diagnostics"""
+    proxy_server = ProxyServer.get_instance()
+
     try:
         data = json.loads(request.body)
         new_url = data.get('url')
@@ -243,6 +246,8 @@ def channel_status(request, channel_id=None):
     - /status/ returns basic summary of all channels
     - /status/{channel_id} returns detailed info about specific channel
     """
+    proxy_server = ProxyServer.get_instance()
+
     try:
         # Check if Redis is available
         if not proxy_server.redis_client:
@@ -343,6 +348,8 @@ def stop_client(request, channel_id):
 @permission_classes([IsAuthenticated])
 def next_stream(request, channel_id):
     """Switch to the next available stream for a channel"""
+    proxy_server = ProxyServer.get_instance()
+
     try:
         logger.info(f"Request to switch to next stream for channel {channel_id} received")

core/utils.py (239 lines)
@@ -14,144 +14,124 @@ logger = logging.getLogger(__name__)
 # Import the command detector
 from .command_utils import is_management_command

-def get_redis_client(max_retries=5, retry_interval=1):
-    """Get Redis client with connection validation and retry logic"""
-    # Skip Redis connection for management commands like collectstatic
-    if is_management_command():
-        logger.info("Running as management command - skipping Redis initialization")
-        return None
-
-    retry_count = 0
-    while retry_count < max_retries:
-        try:
-            # Get connection parameters from settings or environment
-            redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost'))
-            redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379)))
-            redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0)))
-
-            # Use standardized settings
-            socket_timeout = getattr(settings, 'REDIS_SOCKET_TIMEOUT', 5)
-            socket_connect_timeout = getattr(settings, 'REDIS_SOCKET_CONNECT_TIMEOUT', 5)
-            health_check_interval = getattr(settings, 'REDIS_HEALTH_CHECK_INTERVAL', 30)
-            socket_keepalive = getattr(settings, 'REDIS_SOCKET_KEEPALIVE', True)
-            retry_on_timeout = getattr(settings, 'REDIS_RETRY_ON_TIMEOUT', True)
-
-            # Create Redis client with better defaults
-            client = redis.Redis(
-                host=redis_host,
-                port=redis_port,
-                db=redis_db,
-                socket_timeout=socket_timeout,
-                socket_connect_timeout=socket_connect_timeout,
-                socket_keepalive=socket_keepalive,
-                health_check_interval=health_check_interval,
-                retry_on_timeout=retry_on_timeout
-            )
-
-            # Validate connection with ping
-            client.ping()
-            client.flushdb()
-            logger.info(f"Connected to Redis at {redis_host}:{redis_port}/{redis_db}")
-            return client
-
-        except (ConnectionError, TimeoutError) as e:
-            retry_count += 1
-            if retry_count >= max_retries:
-                logger.error(f"Failed to connect to Redis after {max_retries} attempts: {e}")
-                return None
-            else:
-                # Use exponential backoff for retries
-                wait_time = retry_interval * (2 ** (retry_count - 1))
-                logger.warning(f"Redis connection failed. Retrying in {wait_time}s... ({retry_count}/{max_retries})")
-                time.sleep(wait_time)
-
-        except Exception as e:
-            logger.error(f"Unexpected error connecting to Redis: {e}")
-            return None
-
-def get_redis_pubsub_client(max_retries=5, retry_interval=1):
-    """Get Redis client optimized for PubSub operations"""
-    # Skip Redis connection for management commands like collectstatic
-    if is_management_command():
-        logger.info("Running as management command - skipping Redis PubSub initialization")
-        return None
-
-    retry_count = 0
-    while retry_count < max_retries:
-        try:
-            # Get connection parameters from settings or environment
-            redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost'))
-            redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379)))
-            redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0)))
-
-            # Use standardized settings but without socket timeouts for PubSub
-            # Important: socket_timeout is None for PubSub operations
-            socket_connect_timeout = getattr(settings, 'REDIS_SOCKET_CONNECT_TIMEOUT', 5)
-            socket_keepalive = getattr(settings, 'REDIS_SOCKET_KEEPALIVE', True)
-            health_check_interval = getattr(settings, 'REDIS_HEALTH_CHECK_INTERVAL', 30)
-            retry_on_timeout = getattr(settings, 'REDIS_RETRY_ON_TIMEOUT', True)
-
-            # Create Redis client with PubSub-optimized settings - no timeout
-            client = redis.Redis(
-                host=redis_host,
-                port=redis_port,
-                db=redis_db,
-                socket_timeout=None,  # Critical: No timeout for PubSub operations
-                socket_connect_timeout=socket_connect_timeout,
-                socket_keepalive=socket_keepalive,
-                health_check_interval=health_check_interval,
-                retry_on_timeout=retry_on_timeout
-            )
-
-            # Validate connection with ping
-            client.ping()
-            logger.info(f"Connected to Redis for PubSub at {redis_host}:{redis_port}/{redis_db}")
-
-            # We don't need the keepalive thread anymore since we're using proper PubSub handling
-            return client
-
-        except (ConnectionError, TimeoutError) as e:
-            retry_count += 1
-            if retry_count >= max_retries:
-                logger.error(f"Failed to connect to Redis for PubSub after {max_retries} attempts: {e}")
-                return None
-            else:
-                # Use exponential backoff for retries
-                wait_time = retry_interval * (2 ** (retry_count - 1))
-                logger.warning(f"Redis PubSub connection failed. Retrying in {wait_time}s... ({retry_count}/{max_retries})")
-                time.sleep(wait_time)
-
-        except Exception as e:
-            logger.error(f"Unexpected error connecting to Redis for PubSub: {e}")
-            return None
-
-def execute_redis_command(redis_client, command_func, default_return=None):
-    """
-    Execute a Redis command with proper error handling
-
-    Args:
-        redis_client: The Redis client instance
-        command_func: Lambda function containing the Redis command to execute
-        default_return: Value to return if command fails
-
-    Returns:
-        Command result or default_return on failure
-    """
-    if redis_client is None:
-        return default_return
-
-    try:
-        return command_func()
-    except (ConnectionError, TimeoutError) as e:
-        logger.warning(f"Redis connection error: {e}")
-        return default_return
-    except Exception as e:
-        logger.error(f"Redis command error: {e}")
-        return default_return
+class RedisClient:
+    _client = None
+    _pubsub_client = None
+
+    @classmethod
+    def get_client(cls, max_retries=5, retry_interval=1):
+        if cls._client is None:
+            retry_count = 0
+            while retry_count < max_retries:
+                try:
+                    # Get connection parameters from settings or environment
+                    redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost'))
+                    redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379)))
+                    redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0)))
+
+                    # Use standardized settings
+                    socket_timeout = getattr(settings, 'REDIS_SOCKET_TIMEOUT', 5)
+                    socket_connect_timeout = getattr(settings, 'REDIS_SOCKET_CONNECT_TIMEOUT', 5)
+                    health_check_interval = getattr(settings, 'REDIS_HEALTH_CHECK_INTERVAL', 30)
+                    socket_keepalive = getattr(settings, 'REDIS_SOCKET_KEEPALIVE', True)
+                    retry_on_timeout = getattr(settings, 'REDIS_RETRY_ON_TIMEOUT', True)
+
+                    # Create Redis client with better defaults
+                    client = redis.Redis(
+                        host=redis_host,
+                        port=redis_port,
+                        db=redis_db,
+                        socket_timeout=socket_timeout,
+                        socket_connect_timeout=socket_connect_timeout,
+                        socket_keepalive=socket_keepalive,
+                        health_check_interval=health_check_interval,
+                        retry_on_timeout=retry_on_timeout
+                    )
+
+                    # Validate connection with ping
+                    client.ping()
+                    client.flushdb()
+                    logger.info(f"Connected to Redis at {redis_host}:{redis_port}/{redis_db}")
+
+                    cls._client = client
+                    break
+
+                except (ConnectionError, TimeoutError) as e:
+                    retry_count += 1
+                    if retry_count >= max_retries:
+                        logger.error(f"Failed to connect to Redis after {max_retries} attempts: {e}")
+                        return None
+                    else:
+                        # Use exponential backoff for retries
+                        wait_time = retry_interval * (2 ** (retry_count - 1))
+                        logger.warning(f"Redis connection failed. Retrying in {wait_time}s... ({retry_count}/{max_retries})")
+                        time.sleep(wait_time)
+
+                except Exception as e:
+                    logger.error(f"Unexpected error connecting to Redis: {e}")
+                    return None
+
+        return cls._client
+
+    @classmethod
+    def get_pubsub_client(cls, max_retries=5, retry_interval=1):
+        """Get Redis client optimized for PubSub operations"""
+        if cls._pubsub_client is None:
+            retry_count = 0
+            while retry_count < max_retries:
+                try:
+                    # Get connection parameters from settings or environment
+                    redis_host = os.environ.get("REDIS_HOST", getattr(settings, 'REDIS_HOST', 'localhost'))
+                    redis_port = int(os.environ.get("REDIS_PORT", getattr(settings, 'REDIS_PORT', 6379)))
+                    redis_db = int(os.environ.get("REDIS_DB", getattr(settings, 'REDIS_DB', 0)))
+
+                    # Use standardized settings but without socket timeouts for PubSub
+                    # Important: socket_timeout is None for PubSub operations
+                    socket_connect_timeout = getattr(settings, 'REDIS_SOCKET_CONNECT_TIMEOUT', 5)
+                    socket_keepalive = getattr(settings, 'REDIS_SOCKET_KEEPALIVE', True)
+                    health_check_interval = getattr(settings, 'REDIS_HEALTH_CHECK_INTERVAL', 30)
+                    retry_on_timeout = getattr(settings, 'REDIS_RETRY_ON_TIMEOUT', True)
+
+                    # Create Redis client with PubSub-optimized settings - no timeout
+                    client = redis.Redis(
+                        host=redis_host,
+                        port=redis_port,
+                        db=redis_db,
+                        socket_timeout=None,  # Critical: No timeout for PubSub operations
+                        socket_connect_timeout=socket_connect_timeout,
+                        socket_keepalive=socket_keepalive,
+                        health_check_interval=health_check_interval,
+                        retry_on_timeout=retry_on_timeout
+                    )
+
+                    # Validate connection with ping
+                    client.ping()
+                    logger.info(f"Connected to Redis for PubSub at {redis_host}:{redis_port}/{redis_db}")
+
+                    # We don't need the keepalive thread anymore since we're using proper PubSub handling
+                    cls._pubsub_client = client
+                    break
+
+                except (ConnectionError, TimeoutError) as e:
+                    retry_count += 1
+                    if retry_count >= max_retries:
+                        logger.error(f"Failed to connect to Redis for PubSub after {max_retries} attempts: {e}")
+                        return None
+                    else:
+                        # Use exponential backoff for retries
+                        wait_time = retry_interval * (2 ** (retry_count - 1))
+                        logger.warning(f"Redis PubSub connection failed. Retrying in {wait_time}s... ({retry_count}/{max_retries})")
+                        time.sleep(wait_time)
+
+                except Exception as e:
+                    logger.error(f"Unexpected error connecting to Redis for PubSub: {e}")
+                    return None
+
+        return cls._pubsub_client

 def acquire_task_lock(task_name, id):
     """Acquire a lock to prevent concurrent task execution."""
-    redis_client = get_redis_client()
+    redis_client = RedisClient.get_client()
     lock_id = f"task_lock_{task_name}_{id}"

     # Use the Redis SET command with NX (only set if not exists) and EX (set expiration)
@@ -164,7 +144,7 @@ def acquire_task_lock(task_name, id):

 def release_task_lock(task_name, id):
     """Release the lock after task execution."""
-    redis_client = get_redis_client()
+    redis_client = RedisClient.get_client()
     lock_id = f"task_lock_{task_name}_{id}"

     # Remove the lock
@@ -179,22 +159,3 @@ def send_websocket_event(event, success, data):
             "data": {"success": True, "type": "epg_channels"}
         }
     )
-
-# Initialize the global clients with retry logic
-# Skip Redis initialization if running as a management command
-if __name__ == '__main__':
-    redis_client = None
-    redis_pubsub_client = None
-    logger.info("Running as management command - Redis clients set to None")
-else:
-    redis_client = get_redis_client()
-    redis_pubsub_client = get_redis_pubsub_client()
-
-# Import and initialize the PubSub manager
-# Skip if running as management command or if Redis client is None
-if not is_management_command() and redis_client is not None:
-    from .redis_pubsub import get_pubsub_manager
-    pubsub_manager = get_pubsub_manager(redis_client)
-else:
-    logger.info("PubSub manager not initialized (running as management command or Redis not available)")
-    pubsub_manager = None
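
With those globals removed, importing core.utils no longer connects to Redis; callers ask for the shared client when they actually need it. A usage sketch of the API as this commit leaves it (`viewer_count` is an illustrative name; the None check is the caller's job, since get_client returns None when every retry fails):

    from core.utils import RedisClient

    def viewer_count(channel_id):
        # First call connects (with retry/backoff); later calls reuse the cached client.
        client = RedisClient.get_client()
        if client is None:  # all connection attempts failed
            return 0
        return int(client.get(f"channel:{channel_id}:viewers") or 0)

One side effect visible in the hunk above: get_client() still calls client.flushdb() after the first successful ping, so the first caller in a fresh process clears the configured Redis database.
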
@@ -22,6 +22,7 @@ INSTALLED_APPS = [
     'apps.m3u',
     'apps.output',
     'apps.proxy.apps.ProxyConfig',
+    'apps.proxy.ts_proxy',
     'core',
     'drf_yasg',
     'daphne',

@@ -82,7 +82,6 @@ postgres_pid=$(su - postgres -c "/usr/lib/postgresql/14/bin/pg_ctl -D ${POSTGRES
 echo "✅ Postgres started with PID $postgres_pid"
 pids+=("$postgres_pid")

-
 uwsgi_file="/app/docker/uwsgi.ini"
 if [ "$DISPATCHARR_ENV" = "dev" ] && [ "$DISPATCHARR_DEBUG" != "true" ]; then
     uwsgi_file="/app/docker/uwsgi.dev.ini"
@@ -102,17 +101,16 @@ else
     pids+=("$nginx_pid")
 fi

+cd /app
+python manage.py migrate --noinput
+python manage.py collectstatic --noinput
+
 echo "🚀 Starting uwsgi..."
 su - $POSTGRES_USER -c "cd /app && uwsgi --ini $uwsgi_file &"
 uwsgi_pid=$(pgrep uwsgi | sort | head -n1)
 echo "✅ uwsgi started with PID $uwsgi_pid"
 pids+=("$uwsgi_pid")

-
-cd /app
-python manage.py migrate --noinput
-python manage.py collectstatic --noinput
-
 # Wait for at least one process to exit and log the process that exited first
 if [ ${#pids[@]} -gt 0 ]; then
     echo "⏳ Waiting for processes to exit..."