Merge pull request #59 from Dispatcharr/dev

0.3.0
This commit is contained in:
SergeantPanda 2025-04-15 10:09:12 -05:00 committed by GitHub
commit 8bb0487d1b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 262 additions and 19 deletions

View file

@ -133,4 +133,4 @@ Have a question? Want to suggest a feature? Just want to say hi?\
---
### 🚀 *Happy Streaming! The Dispatcharr Team*
### 🚀 *Happy Streaming! The Dispatcharr Team*

View file

@ -3,10 +3,12 @@ from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.permissions import IsAuthenticated
from django.http import JsonResponse, HttpResponseForbidden, HttpResponse
import logging
from drf_yasg.utils import swagger_auto_schema
from drf_yasg import openapi
from django.shortcuts import get_object_or_404
from apps.channels.models import Channel, ChannelProfile
from django.db import models
from apps.channels.models import Channel, ChannelProfile, Stream
from .models import HDHRDevice
from .serializers import HDHRDeviceSerializer
from django.contrib.auth.decorators import login_required
@ -15,6 +17,9 @@ from django.views import View
from django.utils.decorators import method_decorator
from django.contrib.auth.decorators import login_required
from django.views.decorators.csrf import csrf_exempt
from apps.m3u.models import M3UAccountProfile
# Configure logger
logger = logging.getLogger(__name__)
@login_required
def hdhr_dashboard_view(request):
@ -46,6 +51,36 @@ class DiscoverAPIView(APIView):
base_url = request.build_absolute_uri(f'/{"/".join(uri_parts)}/').rstrip('/')
device = HDHRDevice.objects.first()
# Calculate tuner count from active profiles (excluding default "custom Default" profile)
profiles = M3UAccountProfile.objects.filter(is_active=True).exclude(id=1)
# 1. Check if any profile has unlimited streams (max_streams=0)
has_unlimited = profiles.filter(max_streams=0).exists()
# 2. Calculate tuner count from limited profiles
limited_tuners = 0
if not has_unlimited:
limited_tuners = profiles.filter(max_streams__gt=0).aggregate(
total=models.Sum('max_streams')
).get('total', 0) or 0
# 3. Add custom stream count to tuner count
custom_stream_count = Stream.objects.filter(is_custom=True).count()
logger.debug(f"Found {custom_stream_count} custom streams")
# 4. Calculate final tuner count
if has_unlimited:
# If there are unlimited profiles, start with 10 plus custom streams
tuner_count = 10 + custom_stream_count
else:
# Otherwise use the limited profile sum plus custom streams
tuner_count = limited_tuners + custom_stream_count
# 5. Ensure minimum of 2 tuners
tuner_count = max(2, tuner_count)
logger.debug(f"Calculated tuner count: {tuner_count} (limited profiles: {limited_tuners}, custom streams: {custom_stream_count}, unlimited: {has_unlimited})")
if not device:
data = {
"FriendlyName": "Dispatcharr HDHomeRun",
@ -56,7 +91,7 @@ class DiscoverAPIView(APIView):
"DeviceAuth": "test_auth_token",
"BaseURL": base_url,
"LineupURL": f"{base_url}/lineup.json",
"TunerCount": 10,
"TunerCount": tuner_count,
}
else:
data = {
@ -68,7 +103,7 @@ class DiscoverAPIView(APIView):
"DeviceAuth": "test_auth_token",
"BaseURL": base_url,
"LineupURL": f"{base_url}/lineup.json",
"TunerCount": 10,
"TunerCount": tuner_count,
}
return JsonResponse(data)

View file

@ -52,7 +52,7 @@ class DiscoverAPIView(APIView):
"DeviceAuth": "test_auth_token",
"BaseURL": base_url,
"LineupURL": f"{base_url}/lineup.json",
"TunerCount": "10",
"TunerCount": 10,
}
else:
data = {
@ -64,7 +64,7 @@ class DiscoverAPIView(APIView):
"DeviceAuth": "test_auth_token",
"BaseURL": base_url,
"LineupURL": f"{base_url}/lineup.json",
"TunerCount": "10",
"TunerCount": 10,
}
return JsonResponse(data)

View file

@ -35,7 +35,7 @@ class TSConfig(BaseConfig):
# Streaming settings
TARGET_BITRATE = 8000000 # Target bitrate (8 Mbps)
STREAM_TIMEOUT = 10 # Disconnect after this many seconds of no data
STREAM_TIMEOUT = 20 # Disconnect after this many seconds of no data
HEALTH_CHECK_INTERVAL = 5 # Check stream health every N seconds
# Resource management

View file

@ -571,9 +571,6 @@ class StreamManager:
# Reset retry counter to allow immediate reconnect
self.retry_count = 0
# Reset tried streams when manually switching URL
self.tried_stream_ids = set()
# Also reset buffer position to prevent stale data after URL change
if hasattr(self.buffer, 'reset_buffer_position'):
try:

View file

@ -11,6 +11,7 @@ from apps.m3u.models import M3UAccount, M3UAccountProfile
from core.models import UserAgent, CoreSettings
from .utils import get_logger
from uuid import UUID
import requests
logger = get_logger()
@ -95,14 +96,14 @@ def transform_url(input_url: str, search_pattern: str, replace_pattern: str) ->
str: The transformed URL
"""
try:
logger.info("Executing URL pattern replacement:")
logger.info(f" base URL: {input_url}")
logger.info(f" search: {search_pattern}")
logger.debug("Executing URL pattern replacement:")
logger.debug(f" base URL: {input_url}")
logger.debug(f" search: {search_pattern}")
# Handle backreferences in the replacement pattern
safe_replace_pattern = re.sub(r'\$(\d+)', r'\\\1', replace_pattern)
logger.info(f" replace: {replace_pattern}")
logger.info(f" safe replace: {safe_replace_pattern}")
logger.debug(f" replace: {replace_pattern}")
logger.debug(f" safe replace: {safe_replace_pattern}")
# Apply the transformation
stream_url = re.sub(search_pattern, safe_replace_pattern, input_url)
@ -268,3 +269,115 @@ def get_alternate_streams(channel_id: str, current_stream_id: Optional[int] = No
except Exception as e:
logger.error(f"Error getting alternate streams for channel {channel_id}: {e}", exc_info=True)
return []
def validate_stream_url(url, user_agent=None, timeout=(5, 5)):
    """
    Validate that a stream URL is accessible without downloading full content.

    Tries a cheap HEAD request first; if the server rejects or fails it
    (many stream servers return 405 for HEAD), falls back to a streaming
    GET and reads a single small chunk to confirm data actually flows.

    Args:
        url (str): The URL to validate.
        user_agent (str, optional): User-Agent header for the request.
            Passing None suppresses the default User-Agent entirely
            (requests drops headers whose value is None).
        timeout (tuple): (connect, read) timeouts in seconds.

    Returns:
        tuple: (is_valid, final_url, status_code, message)
            status_code is 0 when no HTTP response was obtained.
    """
    try:
        # Context manager guarantees the session (and its pooled
        # connections) is closed on every path — replaces the fragile
        # `if 'session' in locals(): session.close()` cleanup.
        with requests.Session() as session:
            session.headers.update({
                'User-Agent': user_agent,
                'Connection': 'close'  # Don't keep connection alive
            })

            # Make HEAD request first as it's faster and doesn't download content
            head_response = session.head(
                url,
                timeout=timeout,
                allow_redirects=True
            )
            if 200 <= head_response.status_code < 300:
                # HEAD request successful
                return True, head_response.url, head_response.status_code, "Valid (HEAD request)"

            # Try a GET request with stream=True to avoid downloading all content
            get_response = session.get(
                url,
                stream=True,
                timeout=timeout,
                allow_redirects=True
            )
            try:
                # IMPORTANT: Check status code first before checking content
                if not (200 <= get_response.status_code < 300):
                    logger.warning(f"Stream validation failed with HTTP status {get_response.status_code}")
                    return (False, get_response.url, get_response.status_code,
                            f"Invalid HTTP status: {get_response.status_code}")

                # Pull one small chunk (10 x 188-byte TS packets) to prove
                # the server actually sends data.
                try:
                    chunk = next(get_response.iter_content(chunk_size=188 * 10))
                    is_valid = len(chunk) > 0
                    message = f"Valid (GET request, received {len(chunk)} bytes)"
                except StopIteration:
                    is_valid = False
                    message = "Empty response from server"

                # Content type is informational only: a stream that delivered
                # data is accepted even if its Content-Type is unrecognized.
                content_type = get_response.headers.get('Content-Type', '').lower()
                valid_content_types = [
                    'video/',
                    'audio/',
                    'mpegurl',
                    'octet-stream',
                    'mp2t',
                    'mp4',
                    'mpeg',
                    'dash+xml',
                    'application/mp4',
                    'application/mpeg',
                    'application/x-mpegurl',
                    'application/vnd.apple.mpegurl',
                    'application/ogg',
                    'm3u',
                    'playlist',
                    'binary/',
                    'rtsp',
                    'rtmp',
                    'hls',
                    'ts'
                ]
                content_type_valid = any(type_str in content_type for type_str in valid_content_types)

                if content_type:
                    content_type_msg = f" (Content-Type: {content_type}"
                    if content_type_valid:
                        content_type_msg += ", recognized as valid stream format)"
                    else:
                        content_type_msg += ", unrecognized but may still work)"
                    message += content_type_msg

                return is_valid, get_response.url, get_response.status_code, message
            finally:
                # Always release the streamed response — the original leaked
                # the connection on the invalid-status early return.
                get_response.close()

    # Narrowest handlers first: Timeout and TooManyRedirects are
    # subclasses of RequestException, so order matters.
    except requests.exceptions.Timeout:
        return False, url, 0, "Timeout connecting to stream"
    except requests.exceptions.TooManyRedirects:
        return False, url, 0, "Too many redirects"
    except requests.exceptions.RequestException as e:
        return False, url, 0, f"Request error: {str(e)}"
    except Exception as e:
        return False, url, 0, f"Validation error: {str(e)}"

View file

@ -146,7 +146,63 @@ def stream_ts(request, channel_id):
# Generate transcode command if needed
stream_profile = channel.get_stream_profile()
if stream_profile.is_redirect():
return HttpResponseRedirect(stream_url)
# Validate the stream URL before redirecting
from .url_utils import validate_stream_url, get_alternate_streams, get_stream_info_for_switch
# Try initial URL
logger.info(f"[{client_id}] Validating redirect URL: {stream_url}")
is_valid, final_url, status_code, message = validate_stream_url(
stream_url,
user_agent=stream_user_agent,
timeout=(5, 5)
)
# If first URL doesn't validate, try alternates
if not is_valid:
logger.warning(f"[{client_id}] Primary stream URL failed validation: {message}")
# Track tried streams to avoid loops
tried_streams = {stream_id}
# Get alternate streams
alternates = get_alternate_streams(channel_id, stream_id)
# Try each alternate until one works
for alt in alternates:
if alt['stream_id'] in tried_streams:
continue
tried_streams.add(alt['stream_id'])
# Get stream info
alt_info = get_stream_info_for_switch(channel_id, alt['stream_id'])
if 'error' in alt_info:
logger.warning(f"[{client_id}] Error getting alternate stream info: {alt_info['error']}")
continue
# Validate the alternate URL
logger.info(f"[{client_id}] Trying alternate stream #{alt['stream_id']}: {alt_info['url']}")
is_valid, final_url, status_code, message = validate_stream_url(
alt_info['url'],
user_agent=alt_info['user_agent'],
timeout=(5, 5)
)
if is_valid:
logger.info(f"[{client_id}] Alternate stream #{alt['stream_id']} validated successfully")
break
else:
logger.warning(f"[{client_id}] Alternate stream #{alt['stream_id']} failed validation: {message}")
# Final decision based on validation results
if is_valid:
logger.info(f"[{client_id}] Redirecting to validated URL: {final_url} ({message})")
return HttpResponseRedirect(final_url)
else:
logger.error(f"[{client_id}] All available redirect URLs failed validation")
return JsonResponse({
'error': 'All available streams failed validation'
}, status=502) # 502 Bad Gateway
# Initialize channel with the stream's user agent (not the client's)
success = ChannelService.initialize_channel(
@ -166,9 +222,41 @@ def stream_ts(request, channel_id):
if time.time() - wait_start > timeout:
proxy_server.stop_channel(channel_id)
return JsonResponse({'error': 'Connection timeout'}, status=504)
# Check if this manager should keep retrying or stop
if not manager.should_retry():
# Check channel state in Redis to make a better decision
metadata_key = RedisKeys.channel_metadata(channel_id)
current_state = None
if proxy_server.redis_client:
try:
state_bytes = proxy_server.redis_client.hget(metadata_key, ChannelMetadataField.STATE)
if state_bytes:
current_state = state_bytes.decode('utf-8')
logger.debug(f"[{client_id}] Current state of channel {channel_id}: {current_state}")
except Exception as e:
logger.warning(f"[{client_id}] Error getting channel state: {e}")
# Allow normal transitional states to continue
if current_state in [ChannelState.INITIALIZING, ChannelState.CONNECTING]:
logger.info(f"[{client_id}] Channel {channel_id} is in {current_state} state, continuing to wait")
# Reset wait timer to allow the transition to complete
wait_start = time.time()
continue
# Check if we're switching URLs
if hasattr(manager, 'url_switching') and manager.url_switching:
logger.info(f"[{client_id}] Stream manager is currently switching URLs for channel {channel_id}")
# Reset wait timer to give the switch a chance
wait_start = time.time()
continue
# If we reach here, we've exhausted retries and the channel isn't in a valid transitional state
logger.warning(f"[{client_id}] Channel {channel_id} failed to connect and is not in transitional state")
proxy_server.stop_channel(channel_id)
return JsonResponse({'error': 'Failed to connect'}, status=502)
time.sleep(0.1)
logger.info(f"[{client_id}] Successfully initialized channel {channel_id}")
@ -256,6 +344,15 @@ def change_stream(request, channel_id):
# Use the service layer instead of direct implementation
result = ChannelService.change_stream_url(channel_id, new_url, user_agent)
# Get the stream manager before updating URL
stream_manager = proxy_server.stream_managers.get(channel_id)
# If we have a stream manager, reset its tried_stream_ids when manually changing streams
if stream_manager:
# Reset tried streams when manually switching URL via API
stream_manager.tried_stream_ids = set()
logger.debug(f"Reset tried stream IDs for channel {channel_id} during manual stream change")
if result.get('status') == 'error':
return JsonResponse({
'error': result.get('message', 'Unknown error'),

View file

@ -25,8 +25,9 @@ die-on-term = true
static-map = /static=/app/static
# Worker management (Optimize for I/O bound tasks)
workers = 2
enable-threads = false
workers = 4
threads = 4
enable-threads = true
# Optimize for streaming
http = 0.0.0.0:5656

View file

@ -2,4 +2,4 @@
Dispatcharr version information.
"""
__version__ = '0.2.1' # Follow semantic versioning (MAJOR.MINOR.PATCH)
__build__ = '0' # Auto-incremented on builds
__build__ = '8' # Auto-incremented on builds