Dispatcharr/apps/proxy/ts_proxy/views.py

456 lines
20 KiB
Python

import json
import threading
import time
import random
import re
from django.http import StreamingHttpResponse, JsonResponse, HttpResponseRedirect
from django.views.decorators.csrf import csrf_exempt
from django.shortcuts import get_object_or_404
from apps.proxy.config import TSConfig as Config
from .server import ProxyServer
from .channel_status import ChannelStatus
from .stream_generator import create_stream_generator
from .utils import get_client_ip
from .redis_keys import RedisKeys
import logging
from apps.channels.models import Channel, Stream
from apps.m3u.models import M3UAccount, M3UAccountProfile
from core.models import UserAgent, CoreSettings, PROXY_PROFILE_NAME
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from .constants import ChannelState, EventType, StreamType, ChannelMetadataField
from .config_helper import ConfigHelper
from .services.channel_service import ChannelService
from .url_utils import generate_stream_url, transform_url, get_stream_info_for_switch, get_stream_object, get_alternate_streams
from .utils import get_logger
from uuid import UUID
logger = get_logger()
@api_view(['GET'])
def stream_ts(request, channel_id):
"""Stream TS data to client with immediate response and keep-alive packets during initialization"""
channel = get_stream_object(channel_id)
client_user_agent = None
proxy_server = ProxyServer.get_instance()
try:
# Generate a unique client ID
client_id = f"client_{int(time.time() * 1000)}_{random.randint(1000, 9999)}"
client_ip = get_client_ip(request)
logger.info(f"[{client_id}] Requested stream for channel {channel_id}")
# Extract client user agent early
for header in ['HTTP_USER_AGENT', 'User-Agent', 'user-agent']:
if header in request.META:
client_user_agent = request.META[header]
logger.debug(f"[{client_id}] Client connected with user agent: {client_user_agent}")
break
# Check if we need to reinitialize the channel
needs_initialization = True
channel_state = None
# Get current channel state from Redis if available
if proxy_server.redis_client:
metadata_key = RedisKeys.channel_metadata(channel_id)
if proxy_server.redis_client.exists(metadata_key):
metadata = proxy_server.redis_client.hgetall(metadata_key)
state_field = ChannelMetadataField.STATE.encode('utf-8')
if state_field in metadata:
channel_state = metadata[state_field].decode('utf-8')
# Only skip initialization if channel is in a healthy state
valid_states = [ChannelState.ACTIVE, ChannelState.WAITING_FOR_CLIENTS]
if channel_state in valid_states:
# Verify the owner is still active
owner_field = ChannelMetadataField.OWNER.encode('utf-8')
if owner_field in metadata:
owner = metadata[owner_field].decode('utf-8')
owner_heartbeat_key = f"ts_proxy:worker:{owner}:heartbeat"
if proxy_server.redis_client.exists(owner_heartbeat_key):
# Owner is active and channel is in good state
needs_initialization = False
logger.info(f"[{client_id}] Channel {channel_id} in state {channel_state} with active owner {owner}")
# Start initialization if needed
channel_initializing = False
if needs_initialization or not proxy_server.check_if_channel_exists(channel_id):
# Force cleanup of any previous instance
if channel_state in [ChannelState.ERROR, ChannelState.STOPPING, ChannelState.STOPPED]:
logger.warning(f"[{client_id}] Channel {channel_id} in state {channel_state}, forcing cleanup")
proxy_server.stop_channel(channel_id)
# Initialize the channel (but don't wait for completion)
logger.info(f"[{client_id}] Starting channel {channel_id} initialization")
# Use the utility function to get stream URL and settings
stream_url, stream_user_agent, transcode, profile_value = generate_stream_url(channel_id)
if stream_url is None:
return JsonResponse({'error': 'Channel not available'}, status=404)
# Get the stream ID from the channel
stream_id, m3u_profile_id = channel.get_stream()
logger.info(f"Channel {channel_id} using stream ID {stream_id}, m3u account profile ID {m3u_profile_id}")
# Generate transcode command if needed
stream_profile = channel.get_stream_profile()
if stream_profile.is_redirect():
return HttpResponseRedirect(stream_url)
# Initialize channel with the stream's user agent (not the client's)
success = ChannelService.initialize_channel(
channel_id, stream_url, stream_user_agent, transcode, profile_value, stream_id, m3u_profile_id
)
if not success:
return JsonResponse({'error': 'Failed to initialize channel'}, status=500)
# If we're the owner, wait for connection to establish
if proxy_server.am_i_owner(channel_id):
manager = proxy_server.stream_managers.get(channel_id)
if manager:
wait_start = time.time()
timeout = ConfigHelper.connection_timeout()
while not manager.connected:
if time.time() - wait_start > timeout:
proxy_server.stop_channel(channel_id)
return JsonResponse({'error': 'Connection timeout'}, status=504)
if not manager.should_retry():
proxy_server.stop_channel(channel_id)
return JsonResponse({'error': 'Failed to connect'}, status=502)
time.sleep(0.1)
logger.info(f"[{client_id}] Successfully initialized channel {channel_id}")
channel_initializing = True
logger.info(f"[{client_id}] Channel {channel_id} initialization started")
# Register client - can do this regardless of initialization state
# Create local resources if needed
if channel_id not in proxy_server.stream_buffers or channel_id not in proxy_server.client_managers:
logger.debug(f"[{client_id}] Channel {channel_id} exists in Redis but not initialized in this worker - initializing now")
# Get URL from Redis metadata
url = None
stream_user_agent = None # Initialize the variable
if proxy_server.redis_client:
metadata_key = RedisKeys.channel_metadata(channel_id)
url_bytes = proxy_server.redis_client.hget(metadata_key, ChannelMetadataField.URL)
ua_bytes = proxy_server.redis_client.hget(metadata_key, ChannelMetadataField.USER_AGENT)
profile_bytes = proxy_server.redis_client.hget(metadata_key, ChannelMetadataField.STREAM_PROFILE)
if url_bytes:
url = url_bytes.decode('utf-8')
if ua_bytes:
stream_user_agent = ua_bytes.decode('utf-8')
# Extract transcode setting from Redis
if profile_bytes:
profile_str = profile_bytes.decode('utf-8')
use_transcode = (profile_str == PROXY_PROFILE_NAME or profile_str == 'None')
logger.debug(f"Using profile '{profile_str}' for channel {channel_id}, transcode={use_transcode}")
else:
# Default settings when profile not found in Redis
profile_str = 'None' # Default profile name
use_transcode = False # Default to direct streaming without transcoding
logger.debug(f"No profile found in Redis for channel {channel_id}, defaulting to transcode={use_transcode}")
# Use client_user_agent as fallback if stream_user_agent is None
success = proxy_server.initialize_channel(url, channel_id, stream_user_agent or client_user_agent, use_transcode)
if not success:
logger.error(f"[{client_id}] Failed to initialize channel {channel_id} locally")
return JsonResponse({'error': 'Failed to initialize channel locally'}, status=500)
logger.info(f"[{client_id}] Successfully initialized channel {channel_id} locally")
# Register client
buffer = proxy_server.stream_buffers[channel_id]
client_manager = proxy_server.client_managers[channel_id]
client_manager.add_client(client_id, client_ip, client_user_agent)
logger.info(f"[{client_id}] Client registered with channel {channel_id}")
# Create a stream generator for this client
generate = create_stream_generator(
channel_id, client_id, client_ip, client_user_agent, channel_initializing
)
# Return the StreamingHttpResponse from the main function
response = StreamingHttpResponse(
streaming_content=generate(),
content_type='video/mp2t'
)
response['Cache-Control'] = 'no-cache'
return response
except Exception as e:
logger.error(f"Error in stream_ts: {e}", exc_info=True)
return JsonResponse({'error': str(e)}, status=500)
@csrf_exempt
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def change_stream(request, channel_id):
"""Change stream URL for existing channel with enhanced diagnostics"""
proxy_server = ProxyServer.get_instance()
try:
data = json.loads(request.body)
new_url = data.get('url')
user_agent = data.get('user_agent')
if not new_url:
return JsonResponse({'error': 'No URL provided'}, status=400)
logger.info(f"Attempting to change stream URL for channel {channel_id} to {new_url}")
# Use the service layer instead of direct implementation
result = ChannelService.change_stream_url(channel_id, new_url, user_agent)
if result.get('status') == 'error':
return JsonResponse({
'error': result.get('message', 'Unknown error'),
'diagnostics': result.get('diagnostics', {})
}, status=404)
# Format response based on whether it was a direct update or event-based
if result.get('direct_update'):
return JsonResponse({
'message': 'Stream URL updated',
'channel': channel_id,
'url': new_url,
'owner': True,
'worker_id': proxy_server.worker_id
})
else:
return JsonResponse({
'message': 'Stream URL change requested',
'channel': channel_id,
'url': new_url,
'owner': False,
'worker_id': proxy_server.worker_id
})
except json.JSONDecodeError:
return JsonResponse({'error': 'Invalid JSON'}, status=400)
except Exception as e:
logger.error(f"Failed to change stream: {e}", exc_info=True)
return JsonResponse({'error': str(e)}, status=500)
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def channel_status(request, channel_id=None):
"""
Returns status information about channels with detail level based on request:
- /status/ returns basic summary of all channels
- /status/{channel_id} returns detailed info about specific channel
"""
proxy_server = ProxyServer.get_instance()
try:
# Check if Redis is available
if not proxy_server.redis_client:
return JsonResponse({'error': 'Redis connection not available'}, status=500)
# Handle single channel or all channels
if channel_id:
# Detailed info for specific channel
channel_info = ChannelStatus.get_detailed_channel_info(channel_id)
if channel_info:
return JsonResponse(channel_info)
else:
return JsonResponse({'error': f'Channel {channel_id} not found'}, status=404)
else:
# Basic info for all channels
channel_pattern = "ts_proxy:channel:*:metadata"
all_channels = []
# Extract channel IDs from keys
cursor = 0
while True:
cursor, keys = proxy_server.redis_client.scan(cursor, match=channel_pattern)
for key in keys:
channel_id_match = re.search(r"ts_proxy:channel:(.*):metadata", key.decode('utf-8'))
if channel_id_match:
ch_id = channel_id_match.group(1)
channel_info = ChannelStatus.get_basic_channel_info(ch_id)
if channel_info:
all_channels.append(channel_info)
if cursor == 0:
break
return JsonResponse({'channels': all_channels, 'count': len(all_channels)})
except Exception as e:
logger.error(f"Error in channel_status: {e}", exc_info=True)
return JsonResponse({'error': str(e)}, status=500)
@csrf_exempt
@api_view(['POST', 'DELETE'])
@permission_classes([IsAuthenticated])
def stop_channel(request, channel_id):
"""Stop a channel and release all associated resources using PubSub events"""
try:
logger.info(f"Request to stop channel {channel_id} received")
# Use the service layer instead of direct implementation
result = ChannelService.stop_channel(channel_id)
if result.get('status') == 'error':
return JsonResponse({'error': result.get('message', 'Unknown error')}, status=404)
return JsonResponse({
'message': 'Channel stop request sent',
'channel_id': channel_id,
'previous_state': result.get('previous_state')
})
except Exception as e:
logger.error(f"Failed to stop channel: {e}", exc_info=True)
return JsonResponse({'error': str(e)}, status=500)
@csrf_exempt
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def stop_client(request, channel_id):
"""Stop a specific client connection using existing client management"""
try:
# Parse request body to get client ID
data = json.loads(request.body)
client_id = data.get('client_id')
if not client_id:
return JsonResponse({'error': 'No client_id provided'}, status=400)
# Use the service layer instead of direct implementation
result = ChannelService.stop_client(channel_id, client_id)
if result.get('status') == 'error':
return JsonResponse({'error': result.get('message')}, status=404)
return JsonResponse({
'message': 'Client stop request processed',
'channel_id': channel_id,
'client_id': client_id,
'locally_processed': result.get('locally_processed', False)
})
except json.JSONDecodeError:
return JsonResponse({'error': 'Invalid JSON'}, status=400)
except Exception as e:
logger.error(f"Failed to stop client: {e}", exc_info=True)
return JsonResponse({'error': str(e)}, status=500)
@csrf_exempt
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def next_stream(request, channel_id):
"""Switch to the next available stream for a channel"""
proxy_server = ProxyServer.get_instance()
try:
logger.info(f"Request to switch to next stream for channel {channel_id} received")
# Check if the channel exists
channel = get_stream_object(channel_id)
# First check if channel is active in Redis
current_stream_id = None
profile_id = None
if proxy_server.redis_client:
metadata_key = RedisKeys.channel_metadata(channel_id)
if proxy_server.redis_client.exists(metadata_key):
# Get current stream ID from Redis
stream_id_bytes = proxy_server.redis_client.hget(metadata_key, ChannelMetadataField.STREAM_ID)
if stream_id_bytes:
current_stream_id = int(stream_id_bytes.decode('utf-8'))
logger.info(f"Found current stream ID {current_stream_id} in Redis for channel {channel_id}")
# Get M3U profile from Redis if available
profile_id_bytes = proxy_server.redis_client.hget(metadata_key, ChannelMetadataField.M3U_PROFILE)
if profile_id_bytes:
profile_id = int(profile_id_bytes.decode('utf-8'))
logger.info(f"Found M3U profile ID {profile_id} in Redis for channel {channel_id}")
if not current_stream_id:
# Channel is not running
return JsonResponse({'error': 'No current stream found for channel'}, status=404)
# Get all streams for this channel in their defined order
streams = list(channel.streams.all().order_by('channelstream__order'))
if len(streams) <= 1:
return JsonResponse({
'error': 'No alternate streams available for this channel',
'current_stream_id': current_stream_id
}, status=404)
# Find the current stream's position in the list
current_index = None
for i, stream in enumerate(streams):
if stream.id == current_stream_id:
current_index = i
break
if current_index is None:
logger.warning(f"Current stream ID {current_stream_id} not found in channel's streams list")
# Fall back to the first stream that's not the current one
next_stream = next((s for s in streams if s.id != current_stream_id), None)
if not next_stream:
return JsonResponse({
'error': 'Could not find current stream in channel list',
'current_stream_id': current_stream_id
}, status=404)
else:
# Get the next stream in the rotation (with wrap-around)
next_index = (current_index + 1) % len(streams)
next_stream = streams[next_index]
next_stream_id = next_stream.id
logger.info(f"Rotating to next stream ID {next_stream_id} for channel {channel_id}")
# Get full stream info including URL for the next stream
stream_info = get_stream_info_for_switch(channel_id, next_stream_id)
if 'error' in stream_info:
return JsonResponse({
'error': stream_info['error'],
'current_stream_id': current_stream_id,
'next_stream_id': next_stream_id
}, status=404)
# Now use the ChannelService to change the stream URL
result = ChannelService.change_stream_url(
channel_id,
stream_info['url'],
stream_info['user_agent'],
next_stream_id # Pass the stream_id to be stored in Redis
)
if result.get('status') == 'error':
return JsonResponse({
'error': result.get('message', 'Unknown error'),
'diagnostics': result.get('diagnostics', {}),
'current_stream_id': current_stream_id,
'next_stream_id': next_stream_id
}, status=404)
# Format success response
response_data = {
'message': 'Stream switched to next available',
'channel': channel_id,
'previous_stream_id': current_stream_id,
'new_stream_id': next_stream_id,
'new_url': stream_info['url'],
'owner': result.get('direct_update', False),
'worker_id': proxy_server.worker_id
}
return JsonResponse(response_data)
except Exception as e:
logger.error(f"Failed to switch to next stream: {e}", exc_info=True)
return JsonResponse({'error': str(e)}, status=500)