Dispatcharr/apps/proxy/ts_proxy/views.py

854 lines
35 KiB
Python

import json
import threading
import time
import random
import re
import pathlib
from django.http import StreamingHttpResponse, JsonResponse, HttpResponseRedirect
from django.views.decorators.csrf import csrf_exempt
from django.shortcuts import get_object_or_404
from apps.proxy.config import TSConfig as Config
from .server import ProxyServer
from .channel_status import ChannelStatus
from .stream_generator import create_stream_generator
from .utils import get_client_ip
from .redis_keys import RedisKeys
import logging
from apps.channels.models import Channel, Stream
from apps.m3u.models import M3UAccount, M3UAccountProfile
from apps.accounts.models import User
from core.models import UserAgent, CoreSettings, PROXY_PROFILE_NAME
from rest_framework.decorators import api_view, permission_classes
from rest_framework.response import Response
from apps.accounts.permissions import (
IsAdmin,
permission_classes_by_method,
permission_classes_by_action,
)
from .constants import ChannelState, EventType, StreamType, ChannelMetadataField
from .config_helper import ConfigHelper
from .services.channel_service import ChannelService
from core.utils import send_websocket_update
from .url_utils import (
generate_stream_url,
transform_url,
get_stream_info_for_switch,
get_stream_object,
get_alternate_streams,
)
from .utils import get_logger
from uuid import UUID
import gevent
from dispatcharr.utils import network_access_allowed
logger = get_logger()
@api_view(["GET"])
def stream_ts(request, channel_id):
if not network_access_allowed(request, "STREAMS"):
return JsonResponse({"error": "Forbidden"}, status=403)
"""Stream TS data to client with immediate response and keep-alive packets during initialization"""
channel = get_stream_object(channel_id)
client_user_agent = None
proxy_server = ProxyServer.get_instance()
try:
# Generate a unique client ID
client_id = f"client_{int(time.time() * 1000)}_{random.randint(1000, 9999)}"
client_ip = get_client_ip(request)
logger.info(f"[{client_id}] Requested stream for channel {channel_id}")
# Extract client user agent early
for header in ["HTTP_USER_AGENT", "User-Agent", "user-agent"]:
if header in request.META:
client_user_agent = request.META[header]
logger.debug(
f"[{client_id}] Client connected with user agent: {client_user_agent}"
)
break
# Check if we need to reinitialize the channel
needs_initialization = True
channel_state = None
channel_initializing = False
# Get current channel state from Redis if available
if proxy_server.redis_client:
metadata_key = RedisKeys.channel_metadata(channel_id)
if proxy_server.redis_client.exists(metadata_key):
metadata = proxy_server.redis_client.hgetall(metadata_key)
state_field = ChannelMetadataField.STATE.encode("utf-8")
if state_field in metadata:
channel_state = metadata[state_field].decode("utf-8")
if channel_state:
# Channel is being initialized or already active - no need for reinitialization
needs_initialization = False
logger.debug(
f"[{client_id}] Channel {channel_id} already in state {channel_state}, skipping initialization"
)
# Special handling for initializing/connecting states
if channel_state in [
ChannelState.INITIALIZING,
ChannelState.CONNECTING,
]:
channel_initializing = True
logger.debug(
f"[{client_id}] Channel {channel_id} is still initializing, client will wait for completion"
)
else:
# Only check for owner if channel is in a valid state
owner_field = ChannelMetadataField.OWNER.encode("utf-8")
if owner_field in metadata:
owner = metadata[owner_field].decode("utf-8")
owner_heartbeat_key = f"ts_proxy:worker:{owner}:heartbeat"
if proxy_server.redis_client.exists(owner_heartbeat_key):
# Owner is still active, so we don't need to reinitialize
needs_initialization = False
logger.debug(
f"[{client_id}] Channel {channel_id} has active owner {owner}"
)
# Start initialization if needed
if needs_initialization or not proxy_server.check_if_channel_exists(channel_id):
logger.info(f"[{client_id}] Starting channel {channel_id} initialization")
# Force cleanup of any previous instance if in terminal state
if channel_state in [
ChannelState.ERROR,
ChannelState.STOPPING,
ChannelState.STOPPED,
]:
logger.warning(
f"[{client_id}] Channel {channel_id} in state {channel_state}, forcing cleanup"
)
ChannelService.stop_channel(channel_id)
# Use fixed retry interval and timeout
retry_timeout = 1.5 # 1.5 seconds total timeout
retry_interval = 0.1 # 100ms between attempts
wait_start_time = time.time()
stream_url = None
stream_user_agent = None
transcode = False
profile_value = None
error_reason = None
attempt = 0
# Try to get a stream with fixed interval retries
while time.time() - wait_start_time < retry_timeout:
attempt += 1
stream_url, stream_user_agent, transcode, profile_value = (
generate_stream_url(channel_id)
)
if stream_url is not None:
logger.info(
f"[{client_id}] Successfully obtained stream for channel {channel_id} after {attempt} attempts"
)
break
# If we failed because there are no streams assigned, don't retry
_, _, error_reason = channel.get_stream()
if error_reason and "maximum connection limits" not in error_reason:
logger.warning(
f"[{client_id}] Can't retry - error not related to connection limits: {error_reason}"
)
break
# Wait 100ms before retrying
elapsed_time = time.time() - wait_start_time
remaining_time = retry_timeout - elapsed_time
if remaining_time > retry_interval:
logger.info(
f"[{client_id}] Waiting {retry_interval*1000:.0f}ms for a connection to become available (attempt {attempt}, {remaining_time:.1f}s remaining)"
)
gevent.sleep(retry_interval)
retry_interval += 0.025 # Increase wait time by 25ms for next attempt
if stream_url is None:
# Make sure to release any stream locks that might have been acquired
if hasattr(channel, "streams") and channel.streams.exists():
for stream in channel.streams.all():
try:
stream.release_stream()
logger.info(
f"[{client_id}] Released stream {stream.id} for channel {channel_id}"
)
except Exception as e:
logger.error(f"[{client_id}] Error releasing stream: {e}")
# Get the specific error message if available
wait_duration = f"{int(time.time() - wait_start_time)}s"
error_msg = (
error_reason
if error_reason
else "No available streams for this channel"
)
return JsonResponse(
{"error": error_msg, "waited": wait_duration}, status=503
) # 503 Service Unavailable is appropriate here
# Get the stream ID from the channel
stream_id, m3u_profile_id, _ = channel.get_stream()
logger.info(
f"Channel {channel_id} using stream ID {stream_id}, m3u account profile ID {m3u_profile_id}"
)
# Generate transcode command if needed
stream_profile = channel.get_stream_profile()
if stream_profile.is_redirect():
# Validate the stream URL before redirecting
from .url_utils import (
validate_stream_url,
get_alternate_streams,
get_stream_info_for_switch,
)
# Try initial URL
logger.info(f"[{client_id}] Validating redirect URL: {stream_url}")
is_valid, final_url, status_code, message = validate_stream_url(
stream_url, user_agent=stream_user_agent, timeout=(5, 5)
)
# If first URL doesn't validate, try alternates
if not is_valid:
logger.warning(
f"[{client_id}] Primary stream URL failed validation: {message}"
)
# Track tried streams to avoid loops
tried_streams = {stream_id}
# Get alternate streams
alternates = get_alternate_streams(channel_id, stream_id)
# Try each alternate until one works
for alt in alternates:
if alt["stream_id"] in tried_streams:
continue
tried_streams.add(alt["stream_id"])
# Get stream info
alt_info = get_stream_info_for_switch(
channel_id, alt["stream_id"]
)
if "error" in alt_info:
logger.warning(
f"[{client_id}] Error getting alternate stream info: {alt_info['error']}"
)
continue
# Validate the alternate URL
logger.info(
f"[{client_id}] Trying alternate stream #{alt['stream_id']}: {alt_info['url']}"
)
is_valid, final_url, status_code, message = validate_stream_url(
alt_info["url"],
user_agent=alt_info["user_agent"],
timeout=(5, 5),
)
if is_valid:
logger.info(
f"[{client_id}] Alternate stream #{alt['stream_id']} validated successfully"
)
break
else:
logger.warning(
f"[{client_id}] Alternate stream #{alt['stream_id']} failed validation: {message}"
)
# Release stream lock before redirecting
channel.release_stream()
# Final decision based on validation results
if is_valid:
logger.info(
f"[{client_id}] Redirecting to validated URL: {final_url} ({message})"
)
return HttpResponseRedirect(final_url)
else:
logger.error(
f"[{client_id}] All available redirect URLs failed validation"
)
return JsonResponse(
{"error": "All available streams failed validation"}, status=502
) # 502 Bad Gateway
# Initialize channel with the stream's user agent (not the client's)
success = ChannelService.initialize_channel(
channel_id,
stream_url,
stream_user_agent,
transcode,
profile_value,
stream_id,
m3u_profile_id,
)
if not success:
return JsonResponse(
{"error": "Failed to initialize channel"}, status=500
)
# If we're the owner, wait for connection to establish
if proxy_server.am_i_owner(channel_id):
manager = proxy_server.stream_managers.get(channel_id)
if manager:
wait_start = time.time()
timeout = ConfigHelper.connection_timeout()
while not manager.connected:
if time.time() - wait_start > timeout:
proxy_server.stop_channel(channel_id)
return JsonResponse(
{"error": "Connection timeout"}, status=504
)
# Check if this manager should keep retrying or stop
if not manager.should_retry():
# Check channel state in Redis to make a better decision
metadata_key = RedisKeys.channel_metadata(channel_id)
current_state = None
if proxy_server.redis_client:
try:
state_bytes = proxy_server.redis_client.hget(
metadata_key, ChannelMetadataField.STATE
)
if state_bytes:
current_state = state_bytes.decode("utf-8")
logger.debug(
f"[{client_id}] Current state of channel {channel_id}: {current_state}"
)
except Exception as e:
logger.warning(
f"[{client_id}] Error getting channel state: {e}"
)
# Allow normal transitional states to continue
if current_state in [
ChannelState.INITIALIZING,
ChannelState.CONNECTING,
]:
logger.info(
f"[{client_id}] Channel {channel_id} is in {current_state} state, continuing to wait"
)
# Reset wait timer to allow the transition to complete
wait_start = time.time()
continue
# Check if we're switching URLs
if (
hasattr(manager, "url_switching")
and manager.url_switching
):
logger.info(
f"[{client_id}] Stream manager is currently switching URLs for channel {channel_id}"
)
# Reset wait timer to give the switch a chance
wait_start = time.time()
continue
# If we reach here, we've exhausted retries and the channel isn't in a valid transitional state
logger.warning(
f"[{client_id}] Channel {channel_id} failed to connect and is not in transitional state"
)
proxy_server.stop_channel(channel_id)
return JsonResponse(
{"error": "Failed to connect"}, status=502
)
gevent.sleep(
0.1
) # FIXED: Using gevent.sleep instead of time.sleep
logger.info(f"[{client_id}] Successfully initialized channel {channel_id}")
channel_initializing = True
# Register client - can do this regardless of initialization state
# Create local resources if needed
if (
channel_id not in proxy_server.stream_buffers
or channel_id not in proxy_server.client_managers
):
logger.debug(
f"[{client_id}] Channel {channel_id} exists in Redis but not initialized in this worker - initializing now"
)
# Get URL from Redis metadata
url = None
stream_user_agent = None # Initialize the variable
if proxy_server.redis_client:
metadata_key = RedisKeys.channel_metadata(channel_id)
url_bytes = proxy_server.redis_client.hget(
metadata_key, ChannelMetadataField.URL
)
ua_bytes = proxy_server.redis_client.hget(
metadata_key, ChannelMetadataField.USER_AGENT
)
profile_bytes = proxy_server.redis_client.hget(
metadata_key, ChannelMetadataField.STREAM_PROFILE
)
if url_bytes:
url = url_bytes.decode("utf-8")
if ua_bytes:
stream_user_agent = ua_bytes.decode("utf-8")
# Extract transcode setting from Redis
if profile_bytes:
profile_str = profile_bytes.decode("utf-8")
use_transcode = (
profile_str == PROXY_PROFILE_NAME or profile_str == "None"
)
logger.debug(
f"Using profile '{profile_str}' for channel {channel_id}, transcode={use_transcode}"
)
else:
# Default settings when profile not found in Redis
profile_str = "None" # Default profile name
use_transcode = (
False # Default to direct streaming without transcoding
)
logger.debug(
f"No profile found in Redis for channel {channel_id}, defaulting to transcode={use_transcode}"
)
# Use client_user_agent as fallback if stream_user_agent is None
success = proxy_server.initialize_channel(
url, channel_id, stream_user_agent or client_user_agent, use_transcode
)
if not success:
logger.error(
f"[{client_id}] Failed to initialize channel {channel_id} locally"
)
return JsonResponse(
{"error": "Failed to initialize channel locally"}, status=500
)
logger.info(
f"[{client_id}] Successfully initialized channel {channel_id} locally"
)
# Register client
buffer = proxy_server.stream_buffers[channel_id]
client_manager = proxy_server.client_managers[channel_id]
client_manager.add_client(client_id, client_ip, client_user_agent)
logger.info(f"[{client_id}] Client registered with channel {channel_id}")
# Create a stream generator for this client
generate = create_stream_generator(
channel_id, client_id, client_ip, client_user_agent, channel_initializing
)
# Return the StreamingHttpResponse from the main function
response = StreamingHttpResponse(
streaming_content=generate(), content_type="video/mp2t"
)
response["Cache-Control"] = "no-cache"
return response
except Exception as e:
logger.error(f"Error in stream_ts: {e}", exc_info=True)
return JsonResponse({"error": str(e)}, status=500)
@api_view(["GET"])
def stream_xc(request, username, password, channel_id):
user = get_object_or_404(User, username=username)
extension = pathlib.Path(channel_id).suffix
channel_id = pathlib.Path(channel_id).stem
custom_properties = user.custom_properties or {}
if "xc_password" not in custom_properties:
return Response({"error": "Invalid credentials"}, status=401)
if custom_properties["xc_password"] != password:
return Response({"error": "Invalid credentials"}, status=401)
print(f"Fetchin channel with ID: {channel_id}")
if user.user_level < 10:
filters = {
"id": int(channel_id),
"channelprofilemembership__enabled": True,
"user_level__lte": user.user_level,
}
if user.channel_profiles.count() > 0:
channel_profiles = user.channel_profiles.all()
filters["channelprofilemembership__channel_profile__in"] = channel_profiles
channel = Channel.objects.filter(**filters).distinct().first()
if not channel:
return JsonResponse({"error": "Not found"}, status=404)
else:
channel = get_object_or_404(Channel, id=channel_id)
# @TODO: we've got the file 'type' via extension, support this when we support multiple outputs
return stream_ts(request._request, channel.uuid)
@csrf_exempt
@api_view(["POST"])
@permission_classes([IsAdmin])
def change_stream(request, channel_id):
"""Change stream URL for existing channel with enhanced diagnostics"""
proxy_server = ProxyServer.get_instance()
try:
data = json.loads(request.body)
new_url = data.get("url")
user_agent = data.get("user_agent")
stream_id = data.get("stream_id")
# If stream_id is provided, get the URL and user_agent from it
if stream_id:
logger.info(
f"Stream ID {stream_id} provided, looking up stream info for channel {channel_id}"
)
stream_info = get_stream_info_for_switch(channel_id, stream_id)
if "error" in stream_info:
return JsonResponse(
{"error": stream_info["error"], "stream_id": stream_id}, status=404
)
# Use the info from the stream
new_url = stream_info["url"]
user_agent = stream_info["user_agent"]
m3u_profile_id = stream_info.get("m3u_profile_id")
# Stream ID will be passed to change_stream_url later
elif not new_url:
return JsonResponse(
{"error": "Either url or stream_id must be provided"}, status=400
)
logger.info(
f"Attempting to change stream for channel {channel_id} to {new_url}"
)
# Use the service layer instead of direct implementation
# Pass stream_id to ensure proper connection tracking
result = ChannelService.change_stream_url(
channel_id, new_url, user_agent, stream_id, m3u_profile_id
)
# Get the stream manager before updating URL
stream_manager = proxy_server.stream_managers.get(channel_id)
# If we have a stream manager, reset its tried_stream_ids when manually changing streams
if stream_manager:
# Reset tried streams when manually switching URL via API
stream_manager.tried_stream_ids = set()
logger.debug(
f"Reset tried stream IDs for channel {channel_id} during manual stream change"
)
if result.get("status") == "error":
return JsonResponse(
{
"error": result.get("message", "Unknown error"),
"diagnostics": result.get("diagnostics", {}),
},
status=404,
)
# Format response based on whether it was a direct update or event-based
response_data = {
"message": "Stream changed successfully",
"channel": channel_id,
"url": new_url,
"owner": result.get("direct_update", False),
"worker_id": proxy_server.worker_id,
}
# Include stream_id in response if it was used
if stream_id:
response_data["stream_id"] = stream_id
return JsonResponse(response_data)
except json.JSONDecodeError:
return JsonResponse({"error": "Invalid JSON"}, status=400)
except Exception as e:
logger.error(f"Failed to change stream: {e}", exc_info=True)
return JsonResponse({"error": str(e)}, status=500)
@api_view(["GET"])
@permission_classes([IsAdmin])
def channel_status(request, channel_id=None):
"""
Returns status information about channels with detail level based on request:
- /status/ returns basic summary of all channels
- /status/{channel_id} returns detailed info about specific channel
"""
proxy_server = ProxyServer.get_instance()
try:
# Check if Redis is available
if not proxy_server.redis_client:
return JsonResponse({"error": "Redis connection not available"}, status=500)
# Handle single channel or all channels
if channel_id:
# Detailed info for specific channel
channel_info = ChannelStatus.get_detailed_channel_info(channel_id)
if channel_info:
return JsonResponse(channel_info)
else:
return JsonResponse(
{"error": f"Channel {channel_id} not found"}, status=404
)
else:
# Basic info for all channels
channel_pattern = "ts_proxy:channel:*:metadata"
all_channels = []
# Extract channel IDs from keys
cursor = 0
while True:
cursor, keys = proxy_server.redis_client.scan(
cursor, match=channel_pattern
)
for key in keys:
channel_id_match = re.search(
r"ts_proxy:channel:(.*):metadata", key.decode("utf-8")
)
if channel_id_match:
ch_id = channel_id_match.group(1)
channel_info = ChannelStatus.get_basic_channel_info(ch_id)
if channel_info:
all_channels.append(channel_info)
if cursor == 0:
break
# Send WebSocket update with the stats
# Format it the same way the original Celery task did
send_websocket_update(
"updates",
"update",
{
"success": True,
"type": "channel_stats",
"stats": json.dumps({'channels': all_channels, 'count': len(all_channels)})
}
)
return JsonResponse({"channels": all_channels, "count": len(all_channels)})
except Exception as e:
logger.error(f"Error in channel_status: {e}", exc_info=True)
return JsonResponse({"error": str(e)}, status=500)
@csrf_exempt
@api_view(["POST", "DELETE"])
@permission_classes([IsAdmin])
def stop_channel(request, channel_id):
"""Stop a channel and release all associated resources using PubSub events"""
try:
logger.info(f"Request to stop channel {channel_id} received")
# Use the service layer instead of direct implementation
result = ChannelService.stop_channel(channel_id)
if result.get("status") == "error":
return JsonResponse(
{"error": result.get("message", "Unknown error")}, status=404
)
return JsonResponse(
{
"message": "Channel stop request sent",
"channel_id": channel_id,
"previous_state": result.get("previous_state"),
}
)
except Exception as e:
logger.error(f"Failed to stop channel: {e}", exc_info=True)
return JsonResponse({"error": str(e)}, status=500)
@csrf_exempt
@api_view(["POST"])
@permission_classes([IsAdmin])
def stop_client(request, channel_id):
"""Stop a specific client connection using existing client management"""
try:
# Parse request body to get client ID
data = json.loads(request.body)
client_id = data.get("client_id")
if not client_id:
return JsonResponse({"error": "No client_id provided"}, status=400)
# Use the service layer instead of direct implementation
result = ChannelService.stop_client(channel_id, client_id)
if result.get("status") == "error":
return JsonResponse({"error": result.get("message")}, status=404)
return JsonResponse(
{
"message": "Client stop request processed",
"channel_id": channel_id,
"client_id": client_id,
"locally_processed": result.get("locally_processed", False),
}
)
except json.JSONDecodeError:
return JsonResponse({"error": "Invalid JSON"}, status=400)
except Exception as e:
logger.error(f"Failed to stop client: {e}", exc_info=True)
return JsonResponse({"error": str(e)}, status=500)
@csrf_exempt
@api_view(["POST"])
@permission_classes([IsAdmin])
def next_stream(request, channel_id):
"""Switch to the next available stream for a channel"""
proxy_server = ProxyServer.get_instance()
try:
logger.info(
f"Request to switch to next stream for channel {channel_id} received"
)
# Check if the channel exists
channel = get_stream_object(channel_id)
# First check if channel is active in Redis
current_stream_id = None
profile_id = None
if proxy_server.redis_client:
metadata_key = RedisKeys.channel_metadata(channel_id)
if proxy_server.redis_client.exists(metadata_key):
# Get current stream ID from Redis
stream_id_bytes = proxy_server.redis_client.hget(
metadata_key, ChannelMetadataField.STREAM_ID
)
if stream_id_bytes:
current_stream_id = int(stream_id_bytes.decode("utf-8"))
logger.info(
f"Found current stream ID {current_stream_id} in Redis for channel {channel_id}"
)
# Get M3U profile from Redis if available
profile_id_bytes = proxy_server.redis_client.hget(
metadata_key, ChannelMetadataField.M3U_PROFILE
)
if profile_id_bytes:
profile_id = int(profile_id_bytes.decode("utf-8"))
logger.info(
f"Found M3U profile ID {profile_id} in Redis for channel {channel_id}"
)
if not current_stream_id:
# Channel is not running
return JsonResponse(
{"error": "No current stream found for channel"}, status=404
)
# Get all streams for this channel in their defined order
streams = list(channel.streams.all().order_by("channelstream__order"))
if len(streams) <= 1:
return JsonResponse(
{
"error": "No alternate streams available for this channel",
"current_stream_id": current_stream_id,
},
status=404,
)
# Find the current stream's position in the list
current_index = None
for i, stream in enumerate(streams):
if stream.id == current_stream_id:
current_index = i
break
if current_index is None:
logger.warning(
f"Current stream ID {current_stream_id} not found in channel's streams list"
)
# Fall back to the first stream that's not the current one
next_stream = next((s for s in streams if s.id != current_stream_id), None)
if not next_stream:
return JsonResponse(
{
"error": "Could not find current stream in channel list",
"current_stream_id": current_stream_id,
},
status=404,
)
else:
# Get the next stream in the rotation (with wrap-around)
next_index = (current_index + 1) % len(streams)
next_stream = streams[next_index]
next_stream_id = next_stream.id
logger.info(
f"Rotating to next stream ID {next_stream_id} for channel {channel_id}"
)
# Get full stream info including URL for the next stream
stream_info = get_stream_info_for_switch(channel_id, next_stream_id)
if "error" in stream_info:
return JsonResponse(
{
"error": stream_info["error"],
"current_stream_id": current_stream_id,
"next_stream_id": next_stream_id,
},
status=404,
)
# Now use the ChannelService to change the stream URL
result = ChannelService.change_stream_url(
channel_id,
stream_info["url"],
stream_info["user_agent"],
next_stream_id, # Pass the stream_id to be stored in Redis
)
if result.get("status") == "error":
return JsonResponse(
{
"error": result.get("message", "Unknown error"),
"diagnostics": result.get("diagnostics", {}),
"current_stream_id": current_stream_id,
"next_stream_id": next_stream_id,
},
status=404,
)
# Format success response
response_data = {
"message": "Stream switched to next available",
"channel": channel_id,
"previous_stream_id": current_stream_id,
"new_stream_id": next_stream_id,
"new_url": stream_info["url"],
"owner": result.get("direct_update", False),
"worker_id": proxy_server.worker_id,
}
return JsonResponse(response_data)
except Exception as e:
logger.error(f"Failed to switch to next stream: {e}", exc_info=True)
return JsonResponse({"error": str(e)}, status=500)