Mirror of https://github.com/Dispatcharr/Dispatcharr.git, synced 2026-01-23 02:35:14 +00:00
integrating proxy with rest of the application, added transcoding to ts proxy
parent 3a0df9c6cd
commit 0607957f67
5 changed files with 541 additions and 415 deletions
@@ -1,6 +1,8 @@
from django.db import models
from django.core.exceptions import ValidationError
from core.models import StreamProfile
from django.conf import settings
from core.models import StreamProfile, CoreSettings
from core.utils import redis_client

# If you have an M3UAccount model in apps.m3u, you can still import it:
from apps.m3u.models import M3UAccount
@@ -100,6 +102,75 @@ class Channel(models.Model):
    def __str__(self):
        return f"{self.channel_number} - {self.channel_name}"

    def get_stream_profile(self):
        stream_profile = self.stream_profile
        if not stream_profile:
            stream_profile = StreamProfile.objects.get(id=CoreSettings.objects.get(key="default-stream-profile").value)

        return stream_profile

    def get_stream(self):
        """
        Finds an available stream for the requested channel and returns the selected stream and profile.
        """

        # 2. Check if a stream is already active for this channel
        stream_id = redis_client.get(f"channel_stream:{self.id}")
        if stream_id:
            profile_id = redis_client.get(f"stream_profile:{stream_id}")
            if profile_id:
                return stream_id, profile_id

        # 3. Iterate through channel streams and their profiles
        for stream in self.streams.all().order_by('channelstream__order'):
            # Retrieve the M3U account associated with the stream.
            m3u_account = stream.m3u_account
            m3u_profiles = m3u_account.profiles.all()
            default_profile = next((obj for obj in m3u_profiles if obj.is_default), None)
            profiles = [default_profile] + [obj for obj in m3u_profiles if not obj.is_default]

            for profile in profiles:
                # Skip inactive profiles
                if profile.is_active == False:
                    continue

                profile_connections_key = f"profile_connections:{profile.id}"
                current_connections = int(redis_client.get(profile_connections_key) or 0)

                # Check if profile has available slots (or unlimited connections)
                if profile.max_streams == 0 or current_connections < profile.max_streams:
                    # Start a new stream
                    redis_client.set(f"channel_stream:{self.id}", stream.url)
                    redis_client.set(f"stream_profile:{stream.url}", profile.id)  # Store only the matched profile

                    # Increment connection count for profiles with limits
                    if profile.max_streams > 0:
                        redis_client.incr(profile_connections_key)

                    return stream.id, profile.id  # Return newly assigned stream and matched profile

        # 4. No available streams
        return None, None

    def release_stream(self):
        """
        Called when a stream is finished to release the lock.
        """
        stream_id = redis_client.get(f"channel_stream:{self.id}")
        if stream_id:
            redis_client.delete(f"channel_stream:{self.id}")  # Remove active stream

            # Get the matched profile for cleanup
            profile_id = redis_client.get(f"stream_profile:{stream_id}")
            if profile_id:
                profile_connections_key = f"profile_connections:{profile_id}"

                # Only decrement if the profile had a max_connections limit
                current_count = int(redis_client.get(profile_connections_key) or 0)
                if current_count > 0:
                    redis_client.decr(profile_connections_key)

                redis_client.delete(f"stream_profile:{stream_id}")  # Remove profile association


class ChannelGroup(models.Model):
    name = models.CharField(max_length=100, unique=True)
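Review note: get_stream() reserves a connection slot in Redis (channel_stream:<id>, stream_profile:<key>, profile_connections:<profile_id>) and release_stream() gives it back, so callers are expected to pair the two. A minimal usage sketch, with a placeholder channel id — this is not code from the commit:

from apps.channels.models import Channel

channel = Channel.objects.get(pk=1)              # placeholder id
stream_id, profile_id = channel.get_stream()     # reserves a slot, bumps profile_connections
if stream_id is None:
    raise RuntimeError("no stream with a free connection slot")
try:
    pass  # proxy the stream to the client, as stream_ts does further down in this diff
finally:
    channel.release_stream()                     # decrements the count and clears the Redis keys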
File diff suppressed because it is too large
@@ -5,6 +5,6 @@ app_name = 'ts_proxy'

urlpatterns = [
    path('stream/<str:channel_id>', views.stream_ts, name='stream'),
    path('initialize/<str:channel_id>', views.initialize_stream, name='initialize'),
    # path('initialize/<str:channel_id>', views.initialize_stream, name='initialize'),
    path('change_stream/<str:channel_id>', views.change_stream, name='change_stream'),
]
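Review note: with the initialize route commented out, clients only ever hit the stream endpoint. A hypothetical client fetch — the host, port and "/proxy/ts/" mount prefix are assumptions, since the including URLconf is not part of this diff:

import requests

channel_id = "1"   # placeholder
url = f"http://127.0.0.1:9191/proxy/ts/stream/{channel_id}"
with requests.get(url, stream=True, timeout=(5, 30)) as resp:
    resp.raise_for_status()
    with open("channel.ts", "wb") as out:
        for chunk in resp.iter_content(chunk_size=188 * 64):   # whole TS packets
            out.write(chunk)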
@@ -5,11 +5,16 @@ import time
import random
import sys
import os
import re
from django.http import StreamingHttpResponse, JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods, require_GET
from django.shortcuts import get_object_or_404
from apps.proxy.config import TSConfig as Config
from .server import ProxyServer
from apps.channels.models import Channel, Stream
from apps.m3u.models import M3UAccount, M3UAccountProfile
from core.models import UserAgent

# Configure logging properly to ensure visibility
logger = logging.getLogger(__name__)
@@ -26,24 +31,14 @@ print("TS PROXY VIEWS INITIALIZED", file=sys.stderr)
# Initialize proxy server
proxy_server = ProxyServer()

@csrf_exempt
@require_http_methods(["POST"])
def initialize_stream(request, channel_id):
def initialize_stream(channel_id, url, user_agent, transcode_cmd):
    """Initialize a new stream channel with initialization-based ownership"""
    try:
        data = json.loads(request.body)
        url = data.get('url')
        if not url:
            return JsonResponse({'error': 'No URL provided'}, status=400)

        # Get optional user_agent from request
        user_agent = data.get('user_agent')

        # Try to acquire ownership and create connection
        success = proxy_server.initialize_channel(url, channel_id, user_agent)
        success = proxy_server.initialize_channel(url, channel_id, user_agent, transcode_cmd)
        if not success:
            return JsonResponse({'error': 'Failed to initialize channel'}, status=500)

        # If we're the owner, wait for connection
        if proxy_server.am_i_owner(channel_id):
            # Wait for connection to be established
@@ -62,7 +57,7 @@ def initialize_stream(request, channel_id):
                        'error': 'Failed to connect'
                    }, status=502)
                time.sleep(0.1)

        # Return success response with owner status
        return JsonResponse({
            'message': 'Stream initialized and connected',
@@ -70,7 +65,7 @@ def initialize_stream(request, channel_id):
            'url': url,
            'owner': proxy_server.am_i_owner(channel_id)
        })

    except json.JSONDecodeError:
        return JsonResponse({'error': 'Invalid JSON'}, status=400)
    except Exception as e:
@@ -80,24 +75,54 @@ def initialize_stream(request, channel_id):
@require_GET
def stream_ts(request, channel_id):
    """Stream TS data to client with improved waiting for initialization"""
    user_agent = None
    channel = get_object_or_404(Channel, pk=channel_id)

    try:
        # Check if channel exists or initialize it
        if not proxy_server.check_if_channel_exists(channel_id):
            return JsonResponse({'error': 'Channel not found'}, status=404)
        stream_id, profile_id = channel.get_stream()
        if stream_id is None or profile_id is None:
            return JsonResponse({'error': 'Channel not available'}, status=404)

        # Load in necessary objects for the stream
        stream = get_object_or_404(Stream, pk=stream_id)
        profile = get_object_or_404(M3UAccountProfile, pk=profile_id)

        # Load in the user-agent for the account
        m3u_account = M3UAccount.objects.get(id=profile.m3u_account.id)
        user_agent = UserAgent.objects.get(id=m3u_account.user_agent.id).user_agent

        # Generate stream URL based on the selected profile
        input_url = stream.custom_url or stream.url
        logger.debug("Executing the following pattern replacement:")
        logger.debug(f"  search: {profile.search_pattern}")
        safe_replace_pattern = re.sub(r'\$(\d+)', r'\\\1', profile.replace_pattern)
        logger.debug(f"  replace: {profile.replace_pattern}")
        logger.debug(f"  safe replace: {safe_replace_pattern}")
        stream_url = re.sub(profile.search_pattern, safe_replace_pattern, input_url)
        logger.debug(f"Generated stream url: {stream_url}")

        # Generate transcode command
        # @TODO: once complete, provide option to direct proxy
        transcode_cmd = channel.get_stream_profile().build_command(stream_url)

        initialize_stream(channel_id, stream_url, user_agent, transcode_cmd)

        # Get user agent from request headers
        user_agent = None
        for header in ['HTTP_USER_AGENT', 'User-Agent', 'user-agent']:
            if header in request.META:
                user_agent = request.META[header]
                logger.debug(f"Found user agent in header: {header}")
                break

        if user_agent is None:
            for header in ['HTTP_USER_AGENT', 'User-Agent', 'user-agent']:
                if header in request.META:
                    user_agent = request.META[header]
                    logger.debug(f"Found user agent in header: {header}")
                    break

        # Wait for channel to become ready if it's initializing
        if proxy_server.redis_client:
            wait_start = time.time()
            max_wait = getattr(Config, 'CLIENT_WAIT_TIMEOUT', 30)  # Maximum wait time in seconds

            # Check channel state
            metadata_key = f"ts_proxy:channel:{channel_id}:metadata"
            while time.time() - wait_start < max_wait:
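Review note on the pattern rewrite above: M3U profiles apparently store group references in "$1" style, while Python's re.sub expects "\1", hence the safe_replace_pattern conversion. A standalone illustration with made-up patterns (none of these URLs or patterns come from the commit):

import re

search_pattern = r"^https://provider\.example/live/(\d+)\.ts$"
replace_pattern = "https://backup.example/live/$1.ts"                 # profile-style group reference

safe_replace_pattern = re.sub(r'\$(\d+)', r'\\\1', replace_pattern)   # "$1" -> "\1" for re.sub
stream_url = re.sub(search_pattern, safe_replace_pattern, "https://provider.example/live/12345.ts")
print(stream_url)   # https://backup.example/live/12345.ts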
@@ -105,9 +130,9 @@ def stream_ts(request, channel_id):
                if not metadata or b'state' not in metadata:
                    logger.warning(f"Channel {channel_id} metadata missing")
                    break

                state = metadata[b'state'].decode('utf-8')

                # If channel is already active or waiting for clients, no need to wait
                if state in ['waiting_for_clients', 'active']:
                    logger.debug(f"Channel {channel_id} ready (state={state})")
@@ -121,7 +146,7 @@ def stream_ts(request, channel_id):
                # Unknown or error state
                logger.warning(f"Channel {channel_id} in unexpected state: {state}")
                break

            # Check if we timed out waiting
            if time.time() - wait_start >= max_wait:
                logger.warning(f"Timeout waiting for channel {channel_id} to become ready")
@@ -131,7 +156,7 @@ def stream_ts(request, channel_id):
        # This handles the case where channel exists in Redis but not in this worker
        if channel_id not in proxy_server.stream_buffers or channel_id not in proxy_server.client_managers:
            logger.warning(f"Channel {channel_id} exists in Redis but not initialized in this worker - initializing now")

            # Get URL from Redis metadata if available
            url = None
            metadata_key = f"ts_proxy:channel:{channel_id}:metadata"
@@ -139,15 +164,15 @@ def stream_ts(request, channel_id):
                url_bytes = proxy_server.redis_client.hget(metadata_key, "url")
                if url_bytes:
                    url = url_bytes.decode('utf-8')

            # Initialize local resources (won't recreate stream connection if another worker owns it)
            success = proxy_server.initialize_channel(url, channel_id, user_agent)
            if not success:
                logger.error(f"Failed to initialize channel {channel_id} locally")
                return JsonResponse({'error': 'Failed to initialize channel locally'}, status=500)

            logger.info(f"Successfully initialized channel {channel_id} locally")

            # Double-check after initialization
            if channel_id not in proxy_server.client_managers:
                logger.error(f"Critical error: Channel {channel_id} client manager still missing after initialization")
@@ -159,20 +184,20 @@ def stream_ts(request, channel_id):
        stream_start_time = time.time()
        bytes_sent = 0
        chunks_sent = 0

        try:
            # ENHANCED USER AGENT DETECTION - check multiple possible headers
            user_agent = None

            # Try multiple possible header formats
            ua_headers = ['HTTP_USER_AGENT', 'User-Agent', 'user-agent', 'User_Agent']

            for header in ua_headers:
                if header in request.META:
                    user_agent = request.META[header]
                    logger.debug(f"Found user agent in header: {header}")
                    break

            # Try request.headers dictionary (Django 2.2+)
            if not user_agent and hasattr(request, 'headers'):
                for header in ['User-Agent', 'user-agent']:
@@ -180,7 +205,7 @@ def stream_ts(request, channel_id):
                        user_agent = request.headers[header]
                        logger.debug(f"Found user agent in request.headers: {header}")
                        break

            # Final fallback - check if in any header with case-insensitive matching
            if not user_agent:
                for key, value in request.META.items():
@@ -188,63 +213,63 @@ def stream_ts(request, channel_id):
                        user_agent = value
                        logger.debug(f"Found user agent in alternate header: {key}")
                        break

            # Log headers for debugging user agent issues
            if not user_agent:
                # Log all headers to help troubleshoot
                headers = {k: v for k, v in request.META.items() if k.startswith('HTTP_')}
                logger.debug(f"No user agent found in request. Available headers: {headers}")
                user_agent = "Unknown-Client"  # Default value instead of None

            logger.info(f"[{client_id}] New client connected to channel {channel_id} with user agent: {user_agent}")

            # Add client to manager with user agent
            client_manager = proxy_server.client_managers[channel_id]
            client_count = client_manager.add_client(client_id, user_agent)

            # If this is the first client, try to acquire ownership
            if client_count == 1 and not proxy_server.am_i_owner(channel_id):
                if proxy_server.try_acquire_ownership(channel_id):
                    logger.info(f"[{client_id}] First client, acquiring channel ownership")

                    # Get channel metadata from Redis
                    if proxy_server.redis_client:
                        metadata_key = f"ts_proxy:channel:{channel_id}:metadata"
                        url_bytes = proxy_server.redis_client.hget(metadata_key, "url")
                        ua_bytes = proxy_server.redis_client.hget(metadata_key, "user_agent")

                        url = url_bytes.decode('utf-8') if url_bytes else None
                        user_agent = ua_bytes.decode('utf-8') if ua_bytes else None

                        if url:
                            # Create and start stream connection
                            from .server import StreamManager  # Import here to avoid circular import

                            logger.info(f"[{client_id}] Creating stream connection for URL: {url}")
                            buffer = proxy_server.stream_buffers[channel_id]

                            stream_manager = StreamManager(url, buffer, user_agent=user_agent)
                            proxy_server.stream_managers[channel_id] = stream_manager

                            thread = threading.Thread(target=stream_manager.run, daemon=True)
                            thread.name = f"stream-{channel_id}"
                            thread.start()

                            # Wait briefly for connection
                            wait_start = time.time()
                            while not stream_manager.connected:
                                if time.time() - wait_start > Config.CONNECTION_TIMEOUT:
                                    break
                                time.sleep(0.1)

            # Get buffer - stream manager may not exist in this worker
            buffer = proxy_server.stream_buffers.get(channel_id)
            stream_manager = proxy_server.stream_managers.get(channel_id)

            if not buffer:
                logger.error(f"[{client_id}] No buffer found for channel {channel_id}")
                return

            # Client state tracking - use config for initial position
            local_index = max(0, buffer.index - Config.INITIAL_BEHIND_CHUNKS)
            initial_position = local_index
@@ -254,46 +279,46 @@ def stream_ts(request, channel_id):
            chunks_sent = 0
            stream_start_time = time.time()
            consecutive_empty = 0  # Track consecutive empty reads

            # Timing parameters from config
            ts_packet_size = 188
            target_bitrate = Config.TARGET_BITRATE
            target_bitrate = Config.TARGET_BITRATE
            packets_per_second = target_bitrate / (8 * ts_packet_size)

            logger.info(f"[{client_id}] Starting stream at index {local_index} (buffer at {buffer.index})")

            # Check if we're the owner worker
            is_owner_worker = proxy_server.am_i_owner(channel_id) if hasattr(proxy_server, 'am_i_owner') else True

            # Main streaming loop
            while True:
                # Get chunks at client's position
                chunks = buffer.get_chunks_exact(local_index, Config.CHUNK_BATCH_SIZE)

                if chunks:
                    # Reset empty counters since we got data
                    empty_reads = 0
                    consecutive_empty = 0

                    # Track and send chunks
                    chunk_sizes = [len(c) for c in chunks]
                    total_size = sum(chunk_sizes)
                    start_idx = local_index + 1
                    end_idx = local_index + len(chunks)

                    logger.debug(f"[{client_id}] Retrieved {len(chunks)} chunks ({total_size} bytes) from index {start_idx} to {end_idx}")

                    # Calculate total packet count for this batch to maintain timing
                    total_packets = sum(len(chunk) // ts_packet_size for chunk in chunks)
                    batch_start_time = time.time()
                    packets_sent_in_batch = 0

                    # Send chunks with pacing
                    for chunk in chunks:
                        packets_in_chunk = len(chunk) // ts_packet_size
                        bytes_sent += len(chunk)
                        chunks_sent += 1

                        # CRITICAL FIX: Detect client disconnection when yielding data
                        try:
                            yield chunk
@@ -303,24 +328,24 @@ def stream_ts(request, channel_id):
                            if channel_id in proxy_server.client_managers:
                                proxy_server.client_managers[channel_id].remove_client(client_id)
                            break  # Exit the generator

                        # Pacing logic
                        packets_sent_in_batch += packets_in_chunk
                        elapsed = time.time() - batch_start_time
                        target_time = packets_sent_in_batch / packets_per_second

                        # If we're sending too fast, add a small delay
                        if elapsed < target_time and packets_sent_in_batch < total_packets:
                            sleep_time = min(target_time - elapsed, 0.05)
                            if sleep_time > 0.001:
                                time.sleep(sleep_time)

                        # Log progress periodically
                        if chunks_sent % 100 == 0:
                            elapsed = time.time() - stream_start_time
                            rate = bytes_sent / elapsed / 1024 if elapsed > 0 else 0
                            logger.info(f"[{client_id}] Stats: {chunks_sent} chunks, {bytes_sent/1024:.1f}KB, {rate:.1f}KB/s")

                    # Update local index
                    local_index = end_idx
                    last_yield_time = time.time()
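Review note on the pacing arithmetic: the loop converts the configured bitrate into TS packets per second and sleeps whenever the batch is running ahead of that rate. A worked example under an assumed 8 Mbps target (the real value comes from TSConfig.TARGET_BITRATE, which is not part of this diff):

ts_packet_size = 188                                          # bytes per MPEG-TS packet
target_bitrate = 8_000_000                                    # bits per second (assumed)
packets_per_second = target_bitrate / (8 * ts_packet_size)    # ~5319 packets/s

packets_sent_in_batch = 2000
target_time = packets_sent_in_batch / packets_per_second      # ~0.376 s
# If the batch went out faster than target_time, the loop sleeps (at most 50 ms at a time),
# so clients receive roughly real-time delivery instead of a burst.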
@@ -328,14 +353,14 @@ def stream_ts(request, channel_id):
                    # No chunks available
                    empty_reads += 1
                    consecutive_empty += 1

                    # Check if we're caught up to buffer head
                    at_buffer_head = local_index >= buffer.index

                    # If we're at buffer head and no data is coming, send keepalive
                    # Only check stream manager health if it exists
                    stream_healthy = stream_manager.healthy if stream_manager else True

                    if at_buffer_head and not stream_healthy and consecutive_empty >= 5:
                        # Create a null TS packet as keepalive (188 bytes filled with padding)
                        # This prevents VLC from hitting EOF
@@ -343,7 +368,7 @@ def stream_ts(request, channel_id):
                        keepalive_packet[0] = 0x47  # Sync byte
                        keepalive_packet[1] = 0x1F  # PID high bits (null packet)
                        keepalive_packet[2] = 0xFF  # PID low bits (null packet)

                        logger.debug(f"[{client_id}] Sending keepalive packet while waiting at buffer head")
                        yield bytes(keepalive_packet)
                        bytes_sent += len(keepalive_packet)
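Review note: the keepalive is a standard MPEG-TS null packet. The 13-bit PID spans the low 5 bits of byte 1 and all of byte 2, so 0x1F / 0xFF yields PID 0x1FFF, which demultiplexers discard while the connection keeps flowing. A standalone check of that byte layout (not code from the commit):

keepalive_packet = bytearray(188)          # remaining bytes stay as zero padding
keepalive_packet[0] = 0x47                 # sync byte
keepalive_packet[1] = 0x1F
keepalive_packet[2] = 0xFF
pid = ((keepalive_packet[1] & 0x1F) << 8) | keepalive_packet[2]
assert pid == 0x1FFF                       # null PID: players ignore it instead of seeing EOF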
@@ -354,12 +379,12 @@ def stream_ts(request, channel_id):
                        # Standard wait
                        sleep_time = min(0.1 * consecutive_empty, 1.0)  # Progressive backoff up to 1s
                        time.sleep(sleep_time)

                    # Log empty reads periodically
                    if empty_reads % 50 == 0:
                        stream_status = "healthy" if (stream_manager and stream_manager.healthy) else "unknown"
                        logger.debug(f"[{client_id}] Waiting for chunks beyond {local_index} (buffer at {buffer.index}, stream: {stream_status})")

                    # CRITICAL FIX: Check for client disconnect during wait periods
                    # Django/WSGI might not immediately detect disconnections, but we can check periodically
                    if consecutive_empty > 10:  # After some number of empty reads
@@ -374,7 +399,7 @@ def stream_ts(request, channel_id):
                            # Error reading from connection, likely closed
                            logger.info(f"[{client_id}] Connection error, client likely disconnected")
                            break

                    # Disconnect after long inactivity
                    # For non-owner workers, we're more lenient with timeout
                    if time.time() - last_yield_time > Config.STREAM_TIMEOUT:
@@ -385,25 +410,25 @@ def stream_ts(request, channel_id):
                            # Non-owner worker without data for too long
                            logger.warning(f"[{client_id}] Non-owner worker with no data for {Config.STREAM_TIMEOUT}s, disconnecting")
                            break

                    # ADD THIS: Check if worker has more recent chunks but still stuck
                    # This can indicate the client is disconnected but we're not detecting it
                    if consecutive_empty > 100 and buffer.index > local_index + 50:
                        logger.warning(f"[{client_id}] Possible ghost client: buffer has advanced {buffer.index - local_index} chunks ahead but client stuck at {local_index}")
                        break

        except Exception as e:
            logger.error(f"[{client_id}] Stream error: {e}", exc_info=True)
        finally:
            # Client cleanup
            elapsed = time.time() - stream_start_time
            local_clients = 0

            if channel_id in proxy_server.client_managers:
                local_clients = proxy_server.client_managers[channel_id].remove_client(client_id)
                total_clients = proxy_server.client_managers[channel_id].get_total_client_count()
                logger.info(f"[{client_id}] Disconnected after {elapsed:.2f}s, {bytes_sent/1024:.1f}KB in {chunks_sent} chunks (local: {local_clients}, total: {total_clients})")

            # If no clients left and we're the owner, schedule shutdown using the config value
            if local_clients == 0 and proxy_server.am_i_owner(channel_id):
                logger.info(f"No local clients left for channel {channel_id}, scheduling shutdown")
@@ -412,7 +437,7 @@ def stream_ts(request, channel_id):
                    shutdown_delay = getattr(Config, 'CHANNEL_SHUTDOWN_DELAY', 5)
                    logger.info(f"Waiting {shutdown_delay}s before checking if channel should be stopped")
                    time.sleep(shutdown_delay)

                    # After delay, check global client count
                    if channel_id in proxy_server.client_managers:
                        total = proxy_server.client_managers[channel_id].get_total_client_count()
@@ -421,17 +446,17 @@ def stream_ts(request, channel_id):
                            proxy_server.stop_channel(channel_id)
                        else:
                            logger.info(f"Not shutting down channel {channel_id}, {total} clients still connected")

                shutdown_thread = threading.Thread(target=delayed_shutdown)
                shutdown_thread.daemon = True
                shutdown_thread.start()

        response = StreamingHttpResponse(
            streaming_content=generate(),
            content_type='video/mp2t'
        )
        return response

    except Exception as e:
        logger.error(f"Error in stream_ts: {e}", exc_info=True)
        return JsonResponse({'error': str(e)}, status=500)
@@ -444,16 +469,16 @@ def change_stream(request, channel_id):
        data = json.loads(request.body)
        new_url = data.get('url')
        user_agent = data.get('user_agent')

        if not new_url:
            return JsonResponse({'error': 'No URL provided'}, status=400)

        logger.info(f"Attempting to change stream URL for channel {channel_id} to {new_url}")

        # Enhanced channel detection
        in_local_managers = channel_id in proxy_server.stream_managers
        in_local_buffers = channel_id in proxy_server.stream_buffers

        # First check Redis directly before using our wrapper method
        redis_keys = None
        if proxy_server.redis_client:
@@ -462,17 +487,17 @@ def change_stream(request, channel_id):
                redis_keys = [k.decode('utf-8') for k in redis_keys] if redis_keys else []
            except Exception as e:
                logger.error(f"Error checking Redis keys: {e}")

        # Now use our standard check
        channel_exists = proxy_server.check_if_channel_exists(channel_id)

        # Log detailed diagnostics
        logger.info(f"Channel {channel_id} diagnostics: "
                    f"in_local_managers={in_local_managers}, "
                    f"in_local_buffers={in_local_buffers}, "
                    f"redis_keys_count={len(redis_keys) if redis_keys else 0}, "
                    f"channel_exists={channel_exists}")

        if not channel_exists:
            # If channel doesn't exist but we found Redis keys, force initialize it
            if redis_keys:
@@ -488,17 +513,17 @@ def change_stream(request, channel_id):
                    'redis_keys': redis_keys,
                }
            }, status=404)

        # Update metadata in Redis regardless of ownership - this ensures URL is updated
        # even if the owner worker is handling another request
        if proxy_server.redis_client:
            try:
                metadata_key = f"ts_proxy:channel:{channel_id}:metadata"

                # First check if the key exists and what type it is
                key_type = proxy_server.redis_client.type(metadata_key).decode('utf-8')
                logger.debug(f"Redis key {metadata_key} is of type: {key_type}")

                # Use the appropriate method based on the key type
                if key_type == 'hash':
                    proxy_server.redis_client.hset(metadata_key, "url", new_url)
@@ -517,21 +542,21 @@ def change_stream(request, channel_id):
                if user_agent:
                    metadata["user_agent"] = user_agent
                proxy_server.redis_client.hset(metadata_key, mapping=metadata)

                # Set switch request flag to ensure all workers see it
                switch_key = f"ts_proxy:channel:{channel_id}:switch_request"
                proxy_server.redis_client.setex(switch_key, 30, new_url)  # 30 second TTL

                logger.info(f"Updated metadata for channel {channel_id} in Redis")
            except Exception as e:
                logger.error(f"Error updating Redis metadata: {e}", exc_info=True)

        # If we're the owner, update directly
        if proxy_server.am_i_owner(channel_id) and channel_id in proxy_server.stream_managers:
            logger.info(f"This worker is the owner, changing stream URL for channel {channel_id}")
            manager = proxy_server.stream_managers[channel_id]
            old_url = manager.url

            # Update the stream
            result = manager.update_url(new_url)
            logger.info(f"Stream URL changed from {old_url} to {new_url}, result: {result}")
@@ -542,7 +567,7 @@ def change_stream(request, channel_id):
                'owner': True,
                'worker_id': proxy_server.worker_id
            })

        # If we're not the owner, publish an event for the owner to pick up
        else:
            logger.info(f"This worker is not the owner, requesting URL change via Redis PubSub")
@@ -555,12 +580,12 @@ def change_stream(request, channel_id):
                "requester": proxy_server.worker_id,
                "timestamp": time.time()
            }

            proxy_server.redis_client.publish(
                f"ts_proxy:events:{channel_id}",
                f"ts_proxy:events:{channel_id}",
                json.dumps(switch_request)
            )

            return JsonResponse({
                'message': 'Stream URL change requested',
                'channel': channel_id,
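Review note: only part of the switch_request payload is visible in this hunk, and the subscriber lives in the proxy server module whose diff is suppressed above. A hedged sketch of how the owning worker might consume the ts_proxy:events:<channel_id> channel — names like the "event" field are assumptions, not confirmed by this diff:

import json
import redis

r = redis.Redis()
channel_id = "42"                                   # placeholder
pubsub = r.pubsub()
pubsub.subscribe(f"ts_proxy:events:{channel_id}")

for message in pubsub.listen():
    if message["type"] != "message":
        continue
    event = json.loads(message["data"])
    if event.get("event") == "stream_switch":       # event name is an assumption
        new_url = event.get("url")
        # the owner worker would then call something like stream_manager.update_url(new_url)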
@@ -568,9 +593,9 @@ def change_stream(request, channel_id):
                'owner': False,
                'worker_id': proxy_server.worker_id
            })

    except json.JSONDecodeError:
        return JsonResponse({'error': 'Invalid JSON'}, status=400)
    except Exception as e:
        logger.error(f"Failed to change stream: {e}", exc_info=True)
        return JsonResponse({'error': str(e)}, status=500)
        return JsonResponse({'error': str(e)}, status=500)
@@ -47,6 +47,14 @@ class StreamProfile(models.Model):
    def __str__(self):
        return self.profile_name

    def build_command(self, stream_url):
        cmd = []
        if self.command == "ffmpeg":
            cmd = ["ffmpeg", "-i", stream_url] + self.parameters.split() + ["pipe:1"]
        elif self.command == "streamlink":
            cmd = ["streamlink", stream_url] + self.parameters.split()

        return cmd


class CoreSettings(models.Model):
    key = models.CharField(
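Review note: build_command() only returns an argv list; the actual transcode wiring lives in the suppressed proxy server diff. A rough sketch of how that output might be consumed, assuming an ffmpeg-based profile whose parameters emit MPEG-TS on stdout ("pipe:1") — illustrative only, not code from this commit:

import subprocess

def run_transcode(profile, stream_url):
    """profile is a StreamProfile; yields raw MPEG-TS chunks from the transcoder."""
    cmd = profile.build_command(stream_url)          # e.g. ["ffmpeg", "-i", url, ..., "pipe:1"]
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
    try:
        while True:
            chunk = proc.stdout.read(188 * 64)       # read whole TS packets
            if not chunk:
                break
            yield chunk                              # would be pushed into the channel's stream buffer
    finally:
        proc.kill()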