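"""HTTP views for the TS proxy.

The endpoints defined below cover streaming TS data to clients (stream_ts), a
password-protected XC-style entry point (stream_xc), switching a channel's
active stream (change_stream, next_stream), reporting channel status
(channel_status), and stopping channels or individual clients (stop_channel,
stop_client).
"""
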
import json
import threading
import time
import random
import re
import pathlib
from django.http import StreamingHttpResponse, JsonResponse, HttpResponseRedirect
from django.views.decorators.csrf import csrf_exempt
from django.shortcuts import get_object_or_404
from apps.proxy.config import TSConfig as Config
from .server import ProxyServer
from .channel_status import ChannelStatus
from .stream_generator import create_stream_generator
from .utils import get_client_ip
from .redis_keys import RedisKeys
import logging
from apps.channels.models import Channel, Stream
from apps.m3u.models import M3UAccount, M3UAccountProfile
from apps.accounts.models import User
from core.models import UserAgent, CoreSettings, PROXY_PROFILE_NAME
from rest_framework.decorators import api_view, permission_classes
from rest_framework.response import Response
from apps.accounts.permissions import (
    IsAdmin,
    permission_classes_by_method,
    permission_classes_by_action,
)
from .constants import ChannelState, EventType, StreamType, ChannelMetadataField
from .config_helper import ConfigHelper
from .services.channel_service import ChannelService
from core.utils import send_websocket_update
from .url_utils import (
    generate_stream_url,
    transform_url,
    get_stream_info_for_switch,
    get_stream_object,
    get_alternate_streams,
)
from .utils import get_logger
from uuid import UUID
import gevent
from dispatcharr.utils import network_access_allowed

logger = get_logger()


@api_view(["GET"])
def stream_ts(request, channel_id):
    """Stream TS data to client with immediate response and keep-alive packets during initialization"""
    if not network_access_allowed(request, "STREAMS"):
        return JsonResponse({"error": "Forbidden"}, status=403)

    channel = get_stream_object(channel_id)

    client_user_agent = None
    proxy_server = ProxyServer.get_instance()

    try:
        # Generate a unique client ID
        client_id = f"client_{int(time.time() * 1000)}_{random.randint(1000, 9999)}"
        client_ip = get_client_ip(request)
        logger.info(f"[{client_id}] Requested stream for channel {channel_id}")

        # Extract client user agent early
        for header in ["HTTP_USER_AGENT", "User-Agent", "user-agent"]:
            if header in request.META:
                client_user_agent = request.META[header]
                logger.debug(
                    f"[{client_id}] Client connected with user agent: {client_user_agent}"
                )
                break

        # Check if we need to reinitialize the channel
        needs_initialization = True
        channel_state = None
        channel_initializing = False

        # Get current channel state from Redis if available
        if proxy_server.redis_client:
            metadata_key = RedisKeys.channel_metadata(channel_id)
            if proxy_server.redis_client.exists(metadata_key):
                metadata = proxy_server.redis_client.hgetall(metadata_key)
                state_field = ChannelMetadataField.STATE.encode("utf-8")
                if state_field in metadata:
                    channel_state = metadata[state_field].decode("utf-8")

                if channel_state:
                    # Channel is being initialized or already active - no need for reinitialization
                    needs_initialization = False
                    logger.debug(
                        f"[{client_id}] Channel {channel_id} already in state {channel_state}, skipping initialization"
                    )

                    # Special handling for initializing/connecting states
                    if channel_state in [
                        ChannelState.INITIALIZING,
                        ChannelState.CONNECTING,
                    ]:
                        channel_initializing = True
                        logger.debug(
                            f"[{client_id}] Channel {channel_id} is still initializing, client will wait for completion"
                        )
                else:
                    # Only check for owner if channel is in a valid state
                    owner_field = ChannelMetadataField.OWNER.encode("utf-8")
                    if owner_field in metadata:
                        owner = metadata[owner_field].decode("utf-8")
                        owner_heartbeat_key = f"ts_proxy:worker:{owner}:heartbeat"
                        if proxy_server.redis_client.exists(owner_heartbeat_key):
                            # Owner is still active, so we don't need to reinitialize
                            needs_initialization = False
                            logger.debug(
                                f"[{client_id}] Channel {channel_id} has active owner {owner}"
                            )

        # Start initialization if needed
        if needs_initialization or not proxy_server.check_if_channel_exists(channel_id):
            logger.info(f"[{client_id}] Starting channel {channel_id} initialization")
            # Force cleanup of any previous instance if in terminal state
            if channel_state in [
                ChannelState.ERROR,
                ChannelState.STOPPING,
                ChannelState.STOPPED,
            ]:
                logger.warning(
                    f"[{client_id}] Channel {channel_id} in state {channel_state}, forcing cleanup"
                )
                ChannelService.stop_channel(channel_id)

            # Overall timeout with a short, slowly growing retry interval
            retry_timeout = 3  # 3 seconds total timeout
            retry_interval = 0.1  # 100ms before the first retry, growing 25ms per attempt
            wait_start_time = time.time()

            stream_url = None
            stream_user_agent = None
            transcode = False
            profile_value = None
            error_reason = None
            attempt = 0
            should_retry = True

            # Try to get a stream, retrying on a gradually increasing interval
            while should_retry and time.time() - wait_start_time < retry_timeout:
                attempt += 1
                stream_url, stream_user_agent, transcode, profile_value = (
                    generate_stream_url(channel_id)
                )

                if stream_url is not None:
                    logger.info(
                        f"[{client_id}] Successfully obtained stream for channel {channel_id} after {attempt} attempts"
                    )
                    break

                # On first failure, check if the error is retryable
                if attempt == 1:
                    _, _, error_reason = channel.get_stream()
                    if error_reason and "maximum connection limits" not in error_reason:
                        logger.warning(
                            f"[{client_id}] Can't retry - error not related to connection limits: {error_reason}"
                        )
                        should_retry = False
                        break

                # Check if we have time remaining for another sleep cycle
                elapsed_time = time.time() - wait_start_time
                remaining_time = retry_timeout - elapsed_time

                # If we don't have enough time for the next sleep interval, break
                # but only after we've already made an attempt (the while condition will try one more time)
                if remaining_time <= retry_interval:
                    logger.info(
                        f"[{client_id}] Insufficient time ({remaining_time:.1f}s) for another sleep cycle, will make one final attempt"
                    )
                    break

                # Wait before retrying
                logger.info(
                    f"[{client_id}] Waiting {retry_interval*1000:.0f}ms for a connection to become available (attempt {attempt}, {remaining_time:.1f}s remaining)"
                )
                gevent.sleep(retry_interval)
                retry_interval += 0.025  # Increase wait time by 25ms for next attempt

            # Make one final attempt if we still don't have a stream, should retry, and haven't exceeded timeout
            if stream_url is None and should_retry and time.time() - wait_start_time < retry_timeout:
                attempt += 1
                logger.info(
                    f"[{client_id}] Making final attempt {attempt} at timeout boundary"
                )
                stream_url, stream_user_agent, transcode, profile_value = (
                    generate_stream_url(channel_id)
                )
                if stream_url is not None:
                    logger.info(
                        f"[{client_id}] Successfully obtained stream on final attempt for channel {channel_id}"
                    )

            if stream_url is None:
                # Release the channel's stream lock if one was acquired
                # Note: Only call this if get_stream() actually assigned a stream
                # In our case, if stream_url is None, no stream was ever assigned, so don't release

                # Get the specific error message if available
                wait_duration = f"{int(time.time() - wait_start_time)}s"
                error_msg = (
                    error_reason
                    if error_reason
                    else "No available streams for this channel"
                )
                logger.info(
                    f"[{client_id}] Failed to obtain stream after {attempt} attempts over {wait_duration}: {error_msg}"
                )
                return JsonResponse(
                    {"error": error_msg, "waited": wait_duration}, status=503
                )  # 503 Service Unavailable is appropriate here

            # Get the stream ID from the channel
            stream_id, m3u_profile_id, _ = channel.get_stream()
            logger.info(
                f"Channel {channel_id} using stream ID {stream_id}, m3u account profile ID {m3u_profile_id}"
            )

            # Generate transcode command if needed
            stream_profile = channel.get_stream_profile()
            if stream_profile.is_redirect():
                # Validate the stream URL before redirecting
                from .url_utils import (
                    validate_stream_url,
                    get_alternate_streams,
                    get_stream_info_for_switch,
                )

                # Try initial URL
                logger.info(f"[{client_id}] Validating redirect URL: {stream_url}")
                is_valid, final_url, status_code, message = validate_stream_url(
                    stream_url, user_agent=stream_user_agent, timeout=(5, 5)
                )

                # If first URL doesn't validate, try alternates
                if not is_valid:
                    logger.warning(
                        f"[{client_id}] Primary stream URL failed validation: {message}"
                    )

                    # Track tried streams to avoid loops
                    tried_streams = {stream_id}

                    # Get alternate streams
                    alternates = get_alternate_streams(channel_id, stream_id)

                    # Try each alternate until one works
                    for alt in alternates:
                        if alt["stream_id"] in tried_streams:
                            continue

                        tried_streams.add(alt["stream_id"])

                        # Get stream info
                        alt_info = get_stream_info_for_switch(
                            channel_id, alt["stream_id"]
                        )
                        if "error" in alt_info:
                            logger.warning(
                                f"[{client_id}] Error getting alternate stream info: {alt_info['error']}"
                            )
                            continue

                        # Validate the alternate URL
                        logger.info(
                            f"[{client_id}] Trying alternate stream #{alt['stream_id']}: {alt_info['url']}"
                        )
                        is_valid, final_url, status_code, message = validate_stream_url(
                            alt_info["url"],
                            user_agent=alt_info["user_agent"],
                            timeout=(5, 5),
                        )

                        if is_valid:
                            logger.info(
                                f"[{client_id}] Alternate stream #{alt['stream_id']} validated successfully"
                            )
                            break
                        else:
                            logger.warning(
                                f"[{client_id}] Alternate stream #{alt['stream_id']} failed validation: {message}"
                            )
                # Release stream lock before redirecting
                channel.release_stream()
                # Final decision based on validation results
                if is_valid:
                    logger.info(
                        f"[{client_id}] Redirecting to validated URL: {final_url} ({message})"
                    )
                    return HttpResponseRedirect(final_url)
                else:
                    logger.error(
                        f"[{client_id}] All available redirect URLs failed validation"
                    )
                    return JsonResponse(
                        {"error": "All available streams failed validation"}, status=502
                    )  # 502 Bad Gateway

            # Initialize channel with the stream's user agent (not the client's)
            success = ChannelService.initialize_channel(
                channel_id,
                stream_url,
                stream_user_agent,
                transcode,
                profile_value,
                stream_id,
                m3u_profile_id,
            )

            if not success:
                return JsonResponse(
                    {"error": "Failed to initialize channel"}, status=500
                )
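
            # Owner-only connection wait below: bail out with 504 on hard timeout, with
            # 502 once retries are exhausted outside a transitional state or URL switch,
            # and otherwise keep polling every 100ms (gevent.sleep yields to other greenlets).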
            # If we're the owner, wait for connection to establish
            if proxy_server.am_i_owner(channel_id):
                manager = proxy_server.stream_managers.get(channel_id)
                if manager:
                    wait_start = time.time()
                    timeout = ConfigHelper.connection_timeout()
                    while not manager.connected:
                        if time.time() - wait_start > timeout:
                            proxy_server.stop_channel(channel_id)
                            return JsonResponse(
                                {"error": "Connection timeout"}, status=504
                            )

                        # Check if this manager should keep retrying or stop
                        if not manager.should_retry():
                            # Check channel state in Redis to make a better decision
                            metadata_key = RedisKeys.channel_metadata(channel_id)
                            current_state = None

                            if proxy_server.redis_client:
                                try:
                                    state_bytes = proxy_server.redis_client.hget(
                                        metadata_key, ChannelMetadataField.STATE
                                    )
                                    if state_bytes:
                                        current_state = state_bytes.decode("utf-8")
                                        logger.debug(
                                            f"[{client_id}] Current state of channel {channel_id}: {current_state}"
                                        )
                                except Exception as e:
                                    logger.warning(
                                        f"[{client_id}] Error getting channel state: {e}"
                                    )

                            # Allow normal transitional states to continue
                            if current_state in [
                                ChannelState.INITIALIZING,
                                ChannelState.CONNECTING,
                            ]:
                                logger.info(
                                    f"[{client_id}] Channel {channel_id} is in {current_state} state, continuing to wait"
                                )
                                # Reset wait timer to allow the transition to complete
                                wait_start = time.time()
                                continue

                            # Check if we're switching URLs
                            if (
                                hasattr(manager, "url_switching")
                                and manager.url_switching
                            ):
                                logger.info(
                                    f"[{client_id}] Stream manager is currently switching URLs for channel {channel_id}"
                                )
                                # Reset wait timer to give the switch a chance
                                wait_start = time.time()
                                continue

                            # If we reach here, we've exhausted retries and the channel isn't in a valid transitional state
                            logger.warning(
                                f"[{client_id}] Channel {channel_id} failed to connect and is not in transitional state"
                            )
                            proxy_server.stop_channel(channel_id)
                            return JsonResponse(
                                {"error": "Failed to connect"}, status=502
                            )

                        gevent.sleep(
                            0.1
                        )  # FIXED: Using gevent.sleep instead of time.sleep

            logger.info(f"[{client_id}] Successfully initialized channel {channel_id}")
            channel_initializing = True

        # Register client - can do this regardless of initialization state
        # Create local resources if needed
        if (
            channel_id not in proxy_server.stream_buffers
            or channel_id not in proxy_server.client_managers
        ):
            logger.debug(
                f"[{client_id}] Channel {channel_id} exists in Redis but not initialized in this worker - initializing now"
            )

            # Get URL from Redis metadata
            url = None
            stream_user_agent = None  # Initialize the variable
            use_transcode = False  # Safe default so the local init below never sees an unbound name

            if proxy_server.redis_client:
                metadata_key = RedisKeys.channel_metadata(channel_id)
                url_bytes = proxy_server.redis_client.hget(
                    metadata_key, ChannelMetadataField.URL
                )
                ua_bytes = proxy_server.redis_client.hget(
                    metadata_key, ChannelMetadataField.USER_AGENT
                )
                profile_bytes = proxy_server.redis_client.hget(
                    metadata_key, ChannelMetadataField.STREAM_PROFILE
                )

                if url_bytes:
                    url = url_bytes.decode("utf-8")
                if ua_bytes:
                    stream_user_agent = ua_bytes.decode("utf-8")
                # Extract transcode setting from Redis
                if profile_bytes:
                    profile_str = profile_bytes.decode("utf-8")
                    use_transcode = (
                        profile_str == PROXY_PROFILE_NAME or profile_str == "None"
                    )
                    logger.debug(
                        f"Using profile '{profile_str}' for channel {channel_id}, transcode={use_transcode}"
                    )
                else:
                    # Default settings when profile not found in Redis
                    profile_str = "None"  # Default profile name
                    use_transcode = (
                        False  # Default to direct streaming without transcoding
                    )
                    logger.debug(
                        f"No profile found in Redis for channel {channel_id}, defaulting to transcode={use_transcode}"
                    )

            # Use client_user_agent as fallback if stream_user_agent is None
            success = proxy_server.initialize_channel(
                url, channel_id, stream_user_agent or client_user_agent, use_transcode
            )
            if not success:
                logger.error(
                    f"[{client_id}] Failed to initialize channel {channel_id} locally"
                )
                return JsonResponse(
                    {"error": "Failed to initialize channel locally"}, status=500
                )

            logger.info(
                f"[{client_id}] Successfully initialized channel {channel_id} locally"
            )

        # Register client
        buffer = proxy_server.stream_buffers[channel_id]
        client_manager = proxy_server.client_managers[channel_id]
        client_manager.add_client(client_id, client_ip, client_user_agent)
        logger.info(f"[{client_id}] Client registered with channel {channel_id}")

        # Create a stream generator for this client
        generate = create_stream_generator(
            channel_id, client_id, client_ip, client_user_agent, channel_initializing
        )

        # Return the StreamingHttpResponse from the main function
        response = StreamingHttpResponse(
            streaming_content=generate(), content_type="video/mp2t"
        )
        response["Cache-Control"] = "no-cache"
        return response

    except Exception as e:
        logger.error(f"Error in stream_ts: {e}", exc_info=True)
        return JsonResponse({"error": str(e)}, status=500)


@api_view(["GET"])
def stream_xc(request, username, password, channel_id):
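    """XC-style (Xtream Codes compatible) streaming endpoint.

    Authenticates the user against the "xc_password" stored in their custom
    properties, strips any container extension from channel_id, applies
    user_level / channel-profile filtering for non-admin users, and then
    delegates the actual streaming to stream_ts() using the channel's UUID.
    """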
    user = get_object_or_404(User, username=username)

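    # The channel_id path segment may carry a container extension (e.g. "123.ts");
    # keep the suffix for the multi-output @TODO below and use the bare stem as the ID.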
    extension = pathlib.Path(channel_id).suffix
    channel_id = pathlib.Path(channel_id).stem

    custom_properties = user.custom_properties or {}

    if "xc_password" not in custom_properties:
        return Response({"error": "Invalid credentials"}, status=401)

    if custom_properties["xc_password"] != password:
        return Response({"error": "Invalid credentials"}, status=401)

    logger.debug(f"Fetching channel with ID: {channel_id}")
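    # Users below level 10 are restricted by user_level and, when channel profiles
    # are assigned, by profile membership; higher-level users bypass these filters.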
    if user.user_level < 10:
        user_profile_count = user.channel_profiles.count()

        # If user has ALL profiles or NO profiles, give unrestricted access
        if user_profile_count == 0:
            # No profile filtering - user sees all channels based on user_level
            filters = {
                "id": int(channel_id),
                "user_level__lte": user.user_level
            }
            channel = Channel.objects.filter(**filters).first()
        else:
            # User has specific limited profiles assigned
            filters = {
                "id": int(channel_id),
                "channelprofilemembership__enabled": True,
                "user_level__lte": user.user_level,
                "channelprofilemembership__channel_profile__in": user.channel_profiles.all()
            }
            channel = Channel.objects.filter(**filters).distinct().first()

        if not channel:
            return JsonResponse({"error": "Not found"}, status=404)
    else:
        channel = get_object_or_404(Channel, id=channel_id)

    # @TODO: we've got the file 'type' via extension, support this when we support multiple outputs
    return stream_ts(request._request, str(channel.uuid))


@csrf_exempt
@api_view(["POST"])
@permission_classes([IsAdmin])
def change_stream(request, channel_id):
    """Change stream URL for existing channel with enhanced diagnostics"""
    proxy_server = ProxyServer.get_instance()

    try:
        data = json.loads(request.body)
        new_url = data.get("url")
        user_agent = data.get("user_agent")
        stream_id = data.get("stream_id")
        # Default the profile ID up front so the change_stream_url call below
        # never references an unbound name when only a raw URL is supplied
        m3u_profile_id = None

        # If stream_id is provided, get the URL and user_agent from it
        if stream_id:
            logger.info(
                f"Stream ID {stream_id} provided, looking up stream info for channel {channel_id}"
            )
            stream_info = get_stream_info_for_switch(channel_id, stream_id)

            if "error" in stream_info:
                return JsonResponse(
                    {"error": stream_info["error"], "stream_id": stream_id}, status=404
                )

            # Use the info from the stream
            new_url = stream_info["url"]
            user_agent = stream_info["user_agent"]
            m3u_profile_id = stream_info.get("m3u_profile_id")
            # Stream ID will be passed to change_stream_url later
        elif not new_url:
            return JsonResponse(
                {"error": "Either url or stream_id must be provided"}, status=400
            )

        logger.info(
            f"Attempting to change stream for channel {channel_id} to {new_url}"
        )

        # Use the service layer instead of direct implementation
        # Pass stream_id to ensure proper connection tracking
        result = ChannelService.change_stream_url(
            channel_id, new_url, user_agent, stream_id, m3u_profile_id
        )

        # Get the local stream manager for this channel, if this worker has one
        stream_manager = proxy_server.stream_managers.get(channel_id)

        # If we have a stream manager, reset its tried_stream_ids when manually changing streams
        if stream_manager:
            # Reset tried streams when manually switching URL via API
            stream_manager.tried_stream_ids = set()
            logger.debug(
                f"Reset tried stream IDs for channel {channel_id} during manual stream change"
            )

        if result.get("status") == "error":
            return JsonResponse(
                {
                    "error": result.get("message", "Unknown error"),
                    "diagnostics": result.get("diagnostics", {}),
                },
                status=404,
            )

        # Format response based on whether it was a direct update or event-based
        response_data = {
            "message": "Stream changed successfully",
            "channel": channel_id,
            "url": new_url,
            "owner": result.get("direct_update", False),
            "worker_id": proxy_server.worker_id,
        }

        # Include stream_id in response if it was used
        if stream_id:
            response_data["stream_id"] = stream_id

        return JsonResponse(response_data)

    except json.JSONDecodeError:
        return JsonResponse({"error": "Invalid JSON"}, status=400)
    except Exception as e:
        logger.error(f"Failed to change stream: {e}", exc_info=True)
        return JsonResponse({"error": str(e)}, status=500)


@api_view(["GET"])
@permission_classes([IsAdmin])
def channel_status(request, channel_id=None):
    """
    Returns status information about channels with detail level based on request:
    - /status/ returns basic summary of all channels
    - /status/{channel_id} returns detailed info about specific channel
    """
    proxy_server = ProxyServer.get_instance()

    try:
        # Check if Redis is available
        if not proxy_server.redis_client:
            return JsonResponse({"error": "Redis connection not available"}, status=500)

        # Handle single channel or all channels
        if channel_id:
            # Detailed info for specific channel
            channel_info = ChannelStatus.get_detailed_channel_info(channel_id)
            if channel_info:
                return JsonResponse(channel_info)
            else:
                return JsonResponse(
                    {"error": f"Channel {channel_id} not found"}, status=404
                )
        else:
            # Basic info for all channels
            channel_pattern = "ts_proxy:channel:*:metadata"
            all_channels = []

            # Extract channel IDs from keys
            cursor = 0
            while True:
                cursor, keys = proxy_server.redis_client.scan(
                    cursor, match=channel_pattern
                )
                for key in keys:
                    channel_id_match = re.search(
                        r"ts_proxy:channel:(.*):metadata", key.decode("utf-8")
                    )
                    if channel_id_match:
                        ch_id = channel_id_match.group(1)
                        channel_info = ChannelStatus.get_basic_channel_info(ch_id)
                        if channel_info:
                            all_channels.append(channel_info)

                if cursor == 0:
                    break

            # Send WebSocket update with the stats
            # Format it the same way the original Celery task did
            send_websocket_update(
                "updates",
                "update",
                {
                    "success": True,
                    "type": "channel_stats",
                    "stats": json.dumps({'channels': all_channels, 'count': len(all_channels)})
                }
            )

            return JsonResponse({"channels": all_channels, "count": len(all_channels)})

    except Exception as e:
        logger.error(f"Error in channel_status: {e}", exc_info=True)
        return JsonResponse({"error": str(e)}, status=500)


@csrf_exempt
@api_view(["POST", "DELETE"])
@permission_classes([IsAdmin])
def stop_channel(request, channel_id):
    """Stop a channel and release all associated resources using PubSub events"""
    try:
        logger.info(f"Request to stop channel {channel_id} received")

        # Use the service layer instead of direct implementation
        result = ChannelService.stop_channel(channel_id)

        if result.get("status") == "error":
            return JsonResponse(
                {"error": result.get("message", "Unknown error")}, status=404
            )

        return JsonResponse(
            {
                "message": "Channel stop request sent",
                "channel_id": channel_id,
                "previous_state": result.get("previous_state"),
            }
        )

    except Exception as e:
        logger.error(f"Failed to stop channel: {e}", exc_info=True)
        return JsonResponse({"error": str(e)}, status=500)


@csrf_exempt
@api_view(["POST"])
@permission_classes([IsAdmin])
def stop_client(request, channel_id):
    """Stop a specific client connection using existing client management"""
    try:
        # Parse request body to get client ID
        data = json.loads(request.body)
        client_id = data.get("client_id")

        if not client_id:
            return JsonResponse({"error": "No client_id provided"}, status=400)

        # Use the service layer instead of direct implementation
        result = ChannelService.stop_client(channel_id, client_id)

        if result.get("status") == "error":
            return JsonResponse({"error": result.get("message")}, status=404)

        return JsonResponse(
            {
                "message": "Client stop request processed",
                "channel_id": channel_id,
                "client_id": client_id,
                "locally_processed": result.get("locally_processed", False),
            }
        )

    except json.JSONDecodeError:
        return JsonResponse({"error": "Invalid JSON"}, status=400)
    except Exception as e:
        logger.error(f"Failed to stop client: {e}", exc_info=True)
        return JsonResponse({"error": str(e)}, status=500)


@csrf_exempt
@api_view(["POST"])
@permission_classes([IsAdmin])
def next_stream(request, channel_id):
    """Switch to the next available stream for a channel"""
    proxy_server = ProxyServer.get_instance()

    try:
        logger.info(
            f"Request to switch to next stream for channel {channel_id} received"
        )

        # Check if the channel exists
        channel = get_stream_object(channel_id)

        # First check if channel is active in Redis
        current_stream_id = None
        profile_id = None

        if proxy_server.redis_client:
            metadata_key = RedisKeys.channel_metadata(channel_id)
            if proxy_server.redis_client.exists(metadata_key):
                # Get current stream ID from Redis
                stream_id_bytes = proxy_server.redis_client.hget(
                    metadata_key, ChannelMetadataField.STREAM_ID
                )
                if stream_id_bytes:
                    current_stream_id = int(stream_id_bytes.decode("utf-8"))
                    logger.info(
                        f"Found current stream ID {current_stream_id} in Redis for channel {channel_id}"
                    )

                # Get M3U profile from Redis if available
                profile_id_bytes = proxy_server.redis_client.hget(
                    metadata_key, ChannelMetadataField.M3U_PROFILE
                )
                if profile_id_bytes:
                    profile_id = int(profile_id_bytes.decode("utf-8"))
                    logger.info(
                        f"Found M3U profile ID {profile_id} in Redis for channel {channel_id}"
                    )

        if not current_stream_id:
            # Channel is not running
            return JsonResponse(
                {"error": "No current stream found for channel"}, status=404
            )

        # Get all streams for this channel in their defined order
        streams = list(channel.streams.all().order_by("channelstream__order"))

        if len(streams) <= 1:
            return JsonResponse(
                {
                    "error": "No alternate streams available for this channel",
                    "current_stream_id": current_stream_id,
                },
                status=404,
            )

        # Find the current stream's position in the list
        current_index = None
        for i, stream in enumerate(streams):
            if stream.id == current_stream_id:
                current_index = i
                break

        if current_index is None:
            logger.warning(
                f"Current stream ID {current_stream_id} not found in channel's streams list"
            )
            # Fall back to the first stream that's not the current one
            next_stream = next((s for s in streams if s.id != current_stream_id), None)
            if not next_stream:
                return JsonResponse(
                    {
                        "error": "Could not find current stream in channel list",
                        "current_stream_id": current_stream_id,
                    },
                    status=404,
                )
        else:
            # Get the next stream in the rotation (with wrap-around)
            next_index = (current_index + 1) % len(streams)
            next_stream = streams[next_index]

        next_stream_id = next_stream.id
        logger.info(
            f"Rotating to next stream ID {next_stream_id} for channel {channel_id}"
        )

        # Get full stream info including URL for the next stream
        stream_info = get_stream_info_for_switch(channel_id, next_stream_id)

        if "error" in stream_info:
            return JsonResponse(
                {
                    "error": stream_info["error"],
                    "current_stream_id": current_stream_id,
                    "next_stream_id": next_stream_id,
                },
                status=404,
            )

        # Now use the ChannelService to change the stream URL
        result = ChannelService.change_stream_url(
            channel_id,
            stream_info["url"],
            stream_info["user_agent"],
            next_stream_id,  # Pass the stream_id to be stored in Redis
        )

        if result.get("status") == "error":
            return JsonResponse(
                {
                    "error": result.get("message", "Unknown error"),
                    "diagnostics": result.get("diagnostics", {}),
                    "current_stream_id": current_stream_id,
                    "next_stream_id": next_stream_id,
                },
                status=404,
            )

        # Format success response
        response_data = {
            "message": "Stream switched to next available",
            "channel": channel_id,
            "previous_stream_id": current_stream_id,
            "new_stream_id": next_stream_id,
            "new_url": stream_info["url"],
            "owner": result.get("direct_update", False),
            "worker_id": proxy_server.worker_id,
        }

        return JsonResponse(response_data)

    except Exception as e:
        logger.error(f"Failed to switch to next stream: {e}", exc_info=True)
        return JsonResponse({"error": str(e)}, status=500)