Dispatcharr/apps/channels/api_views.py
Dispatcharr 8c364d3eb8 Fix DVR
Fixed bug where connection wouldn't release
2025-09-04 16:00:01 -05:00

1973 lines
77 KiB
Python

from rest_framework import viewsets, status
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.permissions import AllowAny
from rest_framework.decorators import action
from rest_framework.parsers import MultiPartParser, FormParser, JSONParser
from drf_yasg.utils import swagger_auto_schema
from drf_yasg import openapi
from django.shortcuts import get_object_or_404, get_list_or_404
from django.db import transaction
import os, json, requests, logging
from apps.accounts.permissions import (
Authenticated,
IsAdmin,
IsOwnerOfObject,
permission_classes_by_action,
permission_classes_by_method,
)
from core.models import UserAgent, CoreSettings
from core.utils import RedisClient
from .models import (
Stream,
Channel,
ChannelGroup,
Logo,
ChannelProfile,
ChannelProfileMembership,
Recording,
)
from .serializers import (
StreamSerializer,
ChannelSerializer,
ChannelGroupSerializer,
LogoSerializer,
ChannelProfileMembershipSerializer,
BulkChannelProfileMembershipSerializer,
ChannelProfileSerializer,
RecordingSerializer,
)
from .tasks import match_epg_channels, evaluate_series_rules, evaluate_series_rules_impl
import django_filters
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework.filters import SearchFilter, OrderingFilter
from apps.epg.models import EPGData
from apps.vod.models import Movie, Series
from django.db.models import Q
from django.http import StreamingHttpResponse, FileResponse, Http404
from django.utils import timezone
import mimetypes
from rest_framework.pagination import PageNumberPagination
logger = logging.getLogger(__name__)
class OrInFilter(django_filters.Filter):
"""
Custom filter that handles the OR condition instead of AND.
"""
def filter(self, queryset, value):
if value:
# Create a Q object for each value and combine them with OR
query = Q()
for val in value.split(","):
query |= Q(**{self.field_name: val})
return queryset.filter(query)
return queryset
class StreamPagination(PageNumberPagination):
page_size = 50 # Default page size to match frontend default
page_size_query_param = "page_size" # Allow clients to specify page size
max_page_size = 10000 # Prevent excessive page sizes
class StreamFilter(django_filters.FilterSet):
name = django_filters.CharFilter(lookup_expr="icontains")
channel_group_name = OrInFilter(
field_name="channel_group__name", lookup_expr="icontains"
)
m3u_account = django_filters.NumberFilter(field_name="m3u_account__id")
m3u_account_name = django_filters.CharFilter(
field_name="m3u_account__name", lookup_expr="icontains"
)
m3u_account_is_active = django_filters.BooleanFilter(
field_name="m3u_account__is_active"
)
class Meta:
model = Stream
fields = [
"name",
"channel_group_name",
"m3u_account",
"m3u_account_name",
"m3u_account_is_active",
]
# ─────────────────────────────────────────────────────────
# 1) Stream API (CRUD)
# ─────────────────────────────────────────────────────────
class StreamViewSet(viewsets.ModelViewSet):
queryset = Stream.objects.all()
serializer_class = StreamSerializer
pagination_class = StreamPagination
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_class = StreamFilter
search_fields = ["name", "channel_group__name"]
ordering_fields = ["name", "channel_group__name"]
ordering = ["-name"]
def get_permissions(self):
try:
return [perm() for perm in permission_classes_by_action[self.action]]
except KeyError:
return [Authenticated()]
def get_queryset(self):
qs = super().get_queryset()
# Exclude streams from inactive M3U accounts
qs = qs.exclude(m3u_account__is_active=False)
assigned = self.request.query_params.get("assigned")
if assigned is not None:
qs = qs.filter(channels__id=assigned)
unassigned = self.request.query_params.get("unassigned")
if unassigned == "1":
qs = qs.filter(channels__isnull=True)
channel_group = self.request.query_params.get("channel_group")
if channel_group:
group_names = channel_group.split(",")
qs = qs.filter(channel_group__name__in=group_names)
return qs
def list(self, request, *args, **kwargs):
ids = request.query_params.get("ids", None)
if ids:
ids = ids.split(",")
streams = get_list_or_404(Stream, id__in=ids)
serializer = self.get_serializer(streams, many=True)
return Response(serializer.data)
return super().list(request, *args, **kwargs)
@action(detail=False, methods=["get"], url_path="ids")
def get_ids(self, request, *args, **kwargs):
# Get the filtered queryset
queryset = self.get_queryset()
# Apply filtering, search, and ordering
queryset = self.filter_queryset(queryset)
# Return only the IDs from the queryset
stream_ids = queryset.values_list("id", flat=True)
# Return the response with the list of IDs
return Response(list(stream_ids))
@action(detail=False, methods=["get"], url_path="groups")
def get_groups(self, request, *args, **kwargs):
# Get unique ChannelGroup names that are linked to streams
group_names = (
ChannelGroup.objects.filter(streams__isnull=False)
.order_by("name")
.values_list("name", flat=True)
.distinct()
)
# Return the response with the list of unique group names
return Response(list(group_names))
# ─────────────────────────────────────────────────────────
# 2) Channel Group Management (CRUD)
# ─────────────────────────────────────────────────────────
class ChannelGroupViewSet(viewsets.ModelViewSet):
queryset = ChannelGroup.objects.all()
serializer_class = ChannelGroupSerializer
def get_permissions(self):
try:
return [perm() for perm in permission_classes_by_action[self.action]]
except KeyError:
return [Authenticated()]
def get_queryset(self):
"""Add annotation for association counts"""
from django.db.models import Count
return ChannelGroup.objects.annotate(
channel_count=Count('channels', distinct=True),
m3u_account_count=Count('m3u_accounts', distinct=True)
)
def update(self, request, *args, **kwargs):
"""Override update to check M3U associations"""
instance = self.get_object()
# Check if group has M3U account associations
if hasattr(instance, 'm3u_account') and instance.m3u_account.exists():
return Response(
{"error": "Cannot edit group with M3U account associations"},
status=status.HTTP_400_BAD_REQUEST
)
return super().update(request, *args, **kwargs)
def partial_update(self, request, *args, **kwargs):
"""Override partial_update to check M3U associations"""
instance = self.get_object()
# Check if group has M3U account associations
if hasattr(instance, 'm3u_account') and instance.m3u_account.exists():
return Response(
{"error": "Cannot edit group with M3U account associations"},
status=status.HTTP_400_BAD_REQUEST
)
return super().partial_update(request, *args, **kwargs)
@swagger_auto_schema(
method="post",
operation_description="Delete all channel groups that have no associations (no channels or M3U accounts)",
responses={200: "Cleanup completed"},
)
@action(detail=False, methods=["post"], url_path="cleanup")
def cleanup_unused_groups(self, request):
"""Delete all channel groups with no channels or M3U account associations"""
from django.db.models import Count
# Find groups with no channels and no M3U account associations
unused_groups = ChannelGroup.objects.annotate(
channel_count=Count('channels', distinct=True),
m3u_account_count=Count('m3u_accounts', distinct=True)
).filter(
channel_count=0,
m3u_account_count=0
)
deleted_count = unused_groups.count()
group_names = list(unused_groups.values_list('name', flat=True))
# Delete the unused groups
unused_groups.delete()
return Response({
"message": f"Successfully deleted {deleted_count} unused channel groups",
"deleted_count": deleted_count,
"deleted_groups": group_names
})
def destroy(self, request, *args, **kwargs):
"""Override destroy to check for associations before deletion"""
instance = self.get_object()
# Check if group has associated channels
if instance.channels.exists():
return Response(
{"error": "Cannot delete group with associated channels"},
status=status.HTTP_400_BAD_REQUEST
)
# Check if group has M3U account associations
if hasattr(instance, 'm3u_account') and instance.m3u_account.exists():
return Response(
{"error": "Cannot delete group with M3U account associations"},
status=status.HTTP_400_BAD_REQUEST
)
return super().destroy(request, *args, **kwargs)
# ─────────────────────────────────────────────────────────
# 3) Channel Management (CRUD)
# ─────────────────────────────────────────────────────────
class ChannelPagination(PageNumberPagination):
page_size = 50 # Default page size to match frontend default
page_size_query_param = "page_size" # Allow clients to specify page size
max_page_size = 10000 # Prevent excessive page sizes
def paginate_queryset(self, queryset, request, view=None):
if not request.query_params.get(self.page_query_param):
return None # disables pagination, returns full queryset
return super().paginate_queryset(queryset, request, view)
class ChannelFilter(django_filters.FilterSet):
name = django_filters.CharFilter(lookup_expr="icontains")
channel_group_name = OrInFilter(
field_name="channel_group__name", lookup_expr="icontains"
)
class Meta:
model = Channel
fields = [
"name",
"channel_group_name",
]
class ChannelViewSet(viewsets.ModelViewSet):
queryset = Channel.objects.all()
serializer_class = ChannelSerializer
pagination_class = ChannelPagination
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_class = ChannelFilter
search_fields = ["name", "channel_group__name"]
ordering_fields = ["channel_number", "name", "channel_group__name"]
ordering = ["-channel_number"]
def get_permissions(self):
if self.action in [
"edit_bulk",
"assign",
"from_stream",
"from_stream_bulk",
"match_epg",
"set_epg",
"batch_set_epg",
]:
return [IsAdmin()]
try:
return [perm() for perm in permission_classes_by_action[self.action]]
except KeyError:
return [Authenticated()]
def get_queryset(self):
qs = (
super()
.get_queryset()
.select_related(
"channel_group",
"logo",
"epg_data",
"stream_profile",
)
.prefetch_related("streams")
)
channel_group = self.request.query_params.get("channel_group")
if channel_group:
group_names = channel_group.split(",")
qs = qs.filter(channel_group__name__in=group_names)
if self.request.user.user_level < 10:
qs = qs.filter(user_level__lte=self.request.user.user_level)
return qs
def get_serializer_context(self):
context = super().get_serializer_context()
include_streams = (
self.request.query_params.get("include_streams", "false") == "true"
)
context["include_streams"] = include_streams
return context
@action(detail=False, methods=["patch"], url_path="edit/bulk")
def edit_bulk(self, request):
"""
Bulk edit channels.
Expects a list of channels with their updates.
"""
data = request.data
if not isinstance(data, list):
return Response(
{"error": "Expected a list of channel updates"},
status=status.HTTP_400_BAD_REQUEST,
)
updated_channels = []
errors = []
for channel_data in data:
channel_id = channel_data.get("id")
if not channel_id:
errors.append({"error": "Channel ID is required"})
continue
try:
channel = Channel.objects.get(id=channel_id)
# Handle channel_group_id properly - convert string to integer if needed
if 'channel_group_id' in channel_data:
group_id = channel_data['channel_group_id']
if group_id is not None:
try:
channel_data['channel_group_id'] = int(group_id)
except (ValueError, TypeError):
channel_data['channel_group_id'] = None
# Use the serializer to validate and update
serializer = ChannelSerializer(
channel, data=channel_data, partial=True
)
if serializer.is_valid():
updated_channel = serializer.save()
updated_channels.append(updated_channel)
else:
errors.append({
"channel_id": channel_id,
"errors": serializer.errors
})
except Channel.DoesNotExist:
errors.append({
"channel_id": channel_id,
"error": "Channel not found"
})
except Exception as e:
errors.append({
"channel_id": channel_id,
"error": str(e)
})
if errors:
return Response(
{"errors": errors, "updated_count": len(updated_channels)},
status=status.HTTP_400_BAD_REQUEST,
)
# Serialize the updated channels for response
serialized_channels = ChannelSerializer(updated_channels, many=True).data
return Response({
"message": f"Successfully updated {len(updated_channels)} channels",
"channels": serialized_channels
})
@action(detail=False, methods=["get"], url_path="ids")
def get_ids(self, request, *args, **kwargs):
# Get the filtered queryset
queryset = self.get_queryset()
# Apply filtering, search, and ordering
queryset = self.filter_queryset(queryset)
# Return only the IDs from the queryset
channel_ids = queryset.values_list("id", flat=True)
# Return the response with the list of IDs
return Response(list(channel_ids))
@swagger_auto_schema(
method="post",
operation_description="Auto-assign channel_number in bulk by an ordered list of channel IDs.",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
required=["channel_ids"],
properties={
"starting_number": openapi.Schema(
type=openapi.TYPE_NUMBER,
description="Starting channel number to assign (can be decimal)",
),
"channel_ids": openapi.Schema(
type=openapi.TYPE_ARRAY,
items=openapi.Items(type=openapi.TYPE_INTEGER),
description="Channel IDs to assign",
),
},
),
responses={200: "Channels have been auto-assigned!"},
)
@action(detail=False, methods=["post"], url_path="assign")
def assign(self, request):
with transaction.atomic():
channel_ids = request.data.get("channel_ids", [])
# Ensure starting_number is processed as a float
try:
channel_num = float(request.data.get("starting_number", 1))
except (ValueError, TypeError):
channel_num = 1.0
for channel_id in channel_ids:
Channel.objects.filter(id=channel_id).update(channel_number=channel_num)
channel_num = channel_num + 1
return Response(
{"message": "Channels have been auto-assigned!"}, status=status.HTTP_200_OK
)
@swagger_auto_schema(
method="post",
operation_description=(
"Create a new channel from an existing stream. "
"If 'channel_number' is provided, it will be used (if available); "
"otherwise, the next available channel number is assigned. "
"If 'channel_profile_ids' is provided, the channel will only be added to those profiles. "
"Accepts either a single ID or an array of IDs."
),
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
required=["stream_id"],
properties={
"stream_id": openapi.Schema(
type=openapi.TYPE_INTEGER, description="ID of the stream to link"
),
"channel_number": openapi.Schema(
type=openapi.TYPE_NUMBER,
description="(Optional) Desired channel number. Must not be in use.",
),
"name": openapi.Schema(
type=openapi.TYPE_STRING, description="Desired channel name"
),
"channel_profile_ids": openapi.Schema(
type=openapi.TYPE_ARRAY,
items=openapi.Items(type=openapi.TYPE_INTEGER),
description="(Optional) Channel profile ID(s) to add the channel to. Can be a single ID or array of IDs. If not provided, channel is added to all profiles."
),
},
),
responses={201: ChannelSerializer()},
)
@action(detail=False, methods=["post"], url_path="from-stream")
def from_stream(self, request):
stream_id = request.data.get("stream_id")
if not stream_id:
return Response(
{"error": "Missing stream_id"}, status=status.HTTP_400_BAD_REQUEST
)
stream = get_object_or_404(Stream, pk=stream_id)
channel_group = stream.channel_group
name = request.data.get("name")
if name is None:
name = stream.name
# Check if client provided a channel_number; if not, auto-assign one.
stream_custom_props = stream.custom_properties or {}
channel_number = None
if "tvg-chno" in stream_custom_props:
channel_number = float(stream_custom_props["tvg-chno"])
elif "channel-number" in stream_custom_props:
channel_number = float(stream_custom_props["channel-number"])
elif "num" in stream_custom_props:
channel_number = float(stream_custom_props["num"])
if channel_number is None:
provided_number = request.data.get("channel_number")
if provided_number is None:
channel_number = Channel.get_next_available_channel_number()
else:
try:
channel_number = float(provided_number)
except ValueError:
return Response(
{"error": "channel_number must be an integer."},
status=status.HTTP_400_BAD_REQUEST,
)
# If the provided number is already used, return an error.
if Channel.objects.filter(channel_number=channel_number).exists():
return Response(
{
"error": f"Channel number {channel_number} is already in use. Please choose a different number."
},
status=status.HTTP_400_BAD_REQUEST,
)
# Get the tvc_guide_stationid from custom properties if it exists
tvc_guide_stationid = None
if "tvc-guide-stationid" in stream_custom_props:
tvc_guide_stationid = stream_custom_props["tvc-guide-stationid"]
channel_data = {
"channel_number": channel_number,
"name": name,
"tvg_id": stream.tvg_id,
"tvc_guide_stationid": tvc_guide_stationid,
"streams": [stream_id],
}
# Only add channel_group_id if the stream has a channel group
if channel_group:
channel_data["channel_group_id"] = channel_group.id
if stream.logo_url:
logo, _ = Logo.objects.get_or_create(
url=stream.logo_url, defaults={"name": stream.name or stream.tvg_id}
)
channel_data["logo_id"] = logo.id
# Attempt to find existing EPGs with the same tvg-id
epgs = EPGData.objects.filter(tvg_id=stream.tvg_id)
if epgs:
channel_data["epg_data_id"] = epgs.first().id
serializer = self.get_serializer(data=channel_data)
serializer.is_valid(raise_exception=True)
with transaction.atomic():
channel = serializer.save()
channel.streams.add(stream)
# Handle channel profile membership
channel_profile_ids = request.data.get("channel_profile_ids")
if channel_profile_ids is not None:
# Normalize single ID to array
if not isinstance(channel_profile_ids, list):
channel_profile_ids = [channel_profile_ids]
if channel_profile_ids:
# Add channel only to the specified profiles
try:
channel_profiles = ChannelProfile.objects.filter(id__in=channel_profile_ids)
if len(channel_profiles) != len(channel_profile_ids):
missing_ids = set(channel_profile_ids) - set(channel_profiles.values_list('id', flat=True))
return Response(
{"error": f"Channel profiles with IDs {list(missing_ids)} not found"},
status=status.HTTP_400_BAD_REQUEST,
)
ChannelProfileMembership.objects.bulk_create([
ChannelProfileMembership(
channel_profile=profile,
channel=channel,
enabled=True
)
for profile in channel_profiles
])
except Exception as e:
return Response(
{"error": f"Error creating profile memberships: {str(e)}"},
status=status.HTTP_400_BAD_REQUEST,
)
else:
# Default behavior: add to all profiles
profiles = ChannelProfile.objects.all()
ChannelProfileMembership.objects.bulk_create([
ChannelProfileMembership(channel_profile=profile, channel=channel, enabled=True)
for profile in profiles
])
return Response(serializer.data, status=status.HTTP_201_CREATED)
@swagger_auto_schema(
method="post",
operation_description=(
"Bulk create channels from existing streams. For each object, if 'channel_number' is provided, "
"it is used (if available); otherwise, the next available number is auto-assigned. "
"Each object must include 'stream_id' and 'name'. "
"Supports single profile ID or array of profile IDs in 'channel_profile_ids'."
),
request_body=openapi.Schema(
type=openapi.TYPE_ARRAY,
items=openapi.Schema(
type=openapi.TYPE_OBJECT,
required=["stream_id"],
properties={
"stream_id": openapi.Schema(
type=openapi.TYPE_INTEGER,
description="ID of the stream to link",
),
"channel_number": openapi.Schema(
type=openapi.TYPE_NUMBER,
description="(Optional) Desired channel number. Must not be in use.",
),
"name": openapi.Schema(
type=openapi.TYPE_STRING, description="Desired channel name"
),
"channel_profile_ids": openapi.Schema(
type=openapi.TYPE_ARRAY,
items=openapi.Items(type=openapi.TYPE_INTEGER),
description="(Optional) Channel profile ID(s) to add the channel to. Can be a single ID or array of IDs."
),
},
),
),
responses={201: "Bulk channels created"},
)
@action(detail=False, methods=["post"], url_path="from-stream/bulk")
def from_stream_bulk(self, request):
data_list = request.data
if not isinstance(data_list, list):
return Response(
{"error": "Expected a list of channel objects"},
status=status.HTTP_400_BAD_REQUEST,
)
created_channels = []
errors = []
# Gather current used numbers once.
used_numbers = set(
Channel.objects.all().values_list("channel_number", flat=True)
)
next_number = 1
def get_auto_number():
nonlocal next_number
while next_number in used_numbers:
next_number += 1
used_numbers.add(next_number)
return next_number
logos_to_create = []
channels_to_create = []
streams_map = []
logo_map = []
profile_map = [] # Track which profiles each channel should be added to
for item in data_list:
stream_id = item.get("stream_id")
if not stream_id:
errors.append(
{
"item": item,
"error": "Missing required field: stream_id is required.",
}
)
continue
try:
stream = get_object_or_404(Stream, pk=stream_id)
except Exception as e:
errors.append({"item": item, "error": str(e)})
continue
name = item.get("name")
if name is None:
name = stream.name
channel_group = stream.channel_group
stream_custom_props = stream.custom_properties or {}
channel_number = None
if "tvg-chno" in stream_custom_props:
channel_number = float(stream_custom_props["tvg-chno"])
elif "channel-number" in stream_custom_props:
channel_number = float(stream_custom_props["channel-number"])
elif "num" in stream_custom_props:
channel_number = float(stream_custom_props["num"])
# Get the tvc_guide_stationid from custom properties if it exists
tvc_guide_stationid = None
if "tvc-guide-stationid" in stream_custom_props:
tvc_guide_stationid = stream_custom_props["tvc-guide-stationid"]
# Determine channel number: if provided, use it (if free); else auto assign.
if channel_number is None:
provided_number = item.get("channel_number")
if provided_number is None:
channel_number = get_auto_number()
else:
try:
channel_number = float(provided_number)
except ValueError:
errors.append(
{
"item": item,
"error": "channel_number must be a number.",
}
)
continue
if (
channel_number in used_numbers
or Channel.objects.filter(
channel_number=channel_number
).exists()
):
errors.append(
{
"item": item,
"error": f"Channel number {channel_number} is already in use.",
}
)
continue
used_numbers.add(channel_number)
channel_data = {
"channel_number": channel_number,
"name": name,
"tvc_guide_stationid": tvc_guide_stationid,
"tvg_id": stream.tvg_id,
}
# Only add channel_group_id if the stream has a channel group
if channel_group:
channel_data["channel_group_id"] = channel_group.id
# Attempt to find existing EPGs with the same tvg-id
epgs = EPGData.objects.filter(tvg_id=stream.tvg_id)
if epgs:
channel_data["epg_data_id"] = epgs.first().id
serializer = self.get_serializer(data=channel_data)
if serializer.is_valid():
validated_data = serializer.validated_data
channel = Channel(**validated_data)
channels_to_create.append(channel)
streams_map.append([stream_id])
# Store which profiles this channel should be added to - normalize to array
channel_profile_ids = item.get("channel_profile_ids")
if channel_profile_ids is not None:
# Normalize single ID to array
if not isinstance(channel_profile_ids, list):
channel_profile_ids = [channel_profile_ids]
profile_map.append(channel_profile_ids)
if stream.logo_url:
logos_to_create.append(
Logo(
url=stream.logo_url,
name=stream.name or stream.tvg_id,
)
)
logo_map.append(stream.logo_url)
else:
logo_map.append(None)
else:
errors.append({"item": item, "error": serializer.errors})
if logos_to_create:
Logo.objects.bulk_create(logos_to_create, ignore_conflicts=True)
channel_logos = {
logo.url: logo
for logo in Logo.objects.filter(
url__in=[url for url in logo_map if url is not None]
)
}
# Get all profiles for default assignment
all_profiles = ChannelProfile.objects.all()
channel_profile_memberships = []
if channels_to_create:
with transaction.atomic():
created_channels = Channel.objects.bulk_create(channels_to_create)
update = []
for channel, stream_ids, logo_url, channel_profile_ids in zip(
created_channels, streams_map, logo_map, profile_map
):
if logo_url:
channel.logo = channel_logos[logo_url]
update.append(channel)
# Handle channel profile membership based on channel_profile_ids
if channel_profile_ids:
# Add channel only to the specified profiles
try:
specific_profiles = ChannelProfile.objects.filter(id__in=channel_profile_ids)
channel_profile_memberships.extend([
ChannelProfileMembership(
channel_profile=profile,
channel=channel,
enabled=True
)
for profile in specific_profiles
])
except Exception:
# If profiles don't exist, add to all profiles as fallback
channel_profile_memberships.extend([
ChannelProfileMembership(
channel_profile=profile,
channel=channel,
enabled=True
)
for profile in all_profiles
])
else:
# Default behavior: add to all profiles
channel_profile_memberships.extend([
ChannelProfileMembership(
channel_profile=profile,
channel=channel,
enabled=True
)
for profile in all_profiles
])
# Bulk create profile memberships
if channel_profile_memberships:
ChannelProfileMembership.objects.bulk_create(
channel_profile_memberships
)
# Update logos
if update:
Channel.objects.bulk_update(update, ["logo"])
# Set stream relationships
for channel, stream_ids in zip(created_channels, streams_map):
channel.streams.set(stream_ids)
response_data = {"created": ChannelSerializer(created_channels, many=True).data}
if errors:
response_data["errors"] = errors
return Response(response_data, status=status.HTTP_201_CREATED)
# ─────────────────────────────────────────────────────────
# 6) EPG Fuzzy Matching
# ─────────────────────────────────────────────────────────
@swagger_auto_schema(
method="post",
operation_description="Kick off a Celery task that tries to fuzzy-match channels with EPG data.",
responses={202: "EPG matching task initiated"},
)
@action(detail=False, methods=["post"], url_path="match-epg")
def match_epg(self, request):
match_epg_channels.delay()
return Response(
{"message": "EPG matching task initiated."}, status=status.HTTP_202_ACCEPTED
)
# ─────────────────────────────────────────────────────────
# 7) Set EPG and Refresh
# ─────────────────────────────────────────────────────────
@swagger_auto_schema(
method="post",
operation_description="Set EPG data for a channel and refresh program data",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
required=["epg_data_id"],
properties={
"epg_data_id": openapi.Schema(
type=openapi.TYPE_INTEGER, description="EPG data ID to link"
)
},
),
responses={200: "EPG data linked and refresh triggered"},
)
@action(detail=True, methods=["post"], url_path="set-epg")
def set_epg(self, request, pk=None):
channel = self.get_object()
epg_data_id = request.data.get("epg_data_id")
# Handle removing EPG link
if epg_data_id in (None, "", "0", 0):
channel.epg_data = None
channel.save(update_fields=["epg_data"])
return Response(
{"message": f"EPG data removed from channel {channel.name}"}
)
try:
# Get the EPG data object
from apps.epg.models import EPGData
epg_data = EPGData.objects.get(pk=epg_data_id)
# Set the EPG data and save
channel.epg_data = epg_data
channel.save(update_fields=["epg_data"])
# Explicitly trigger program refresh for this EPG
from apps.epg.tasks import parse_programs_for_tvg_id
task_result = parse_programs_for_tvg_id.delay(epg_data.id)
# Prepare response with task status info
status_message = "EPG refresh queued"
if task_result.result == "Task already running":
status_message = "EPG refresh already in progress"
return Response(
{
"message": f"EPG data set to {epg_data.tvg_id} for channel {channel.name}. {status_message}.",
"channel": self.get_serializer(channel).data,
"task_status": status_message,
}
)
except Exception as e:
return Response({"error": str(e)}, status=400)
@swagger_auto_schema(
method="post",
operation_description="Associate multiple channels with EPG data without triggering a full refresh",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
"associations": openapi.Schema(
type=openapi.TYPE_ARRAY,
items=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
"channel_id": openapi.Schema(type=openapi.TYPE_INTEGER),
"epg_data_id": openapi.Schema(type=openapi.TYPE_INTEGER),
},
),
)
},
),
responses={200: "EPG data linked for multiple channels"},
)
@action(detail=False, methods=["post"], url_path="batch-set-epg")
def batch_set_epg(self, request):
"""Efficiently associate multiple channels with EPG data at once."""
associations = request.data.get("associations", [])
channels_updated = 0
programs_refreshed = 0
unique_epg_ids = set()
for assoc in associations:
channel_id = assoc.get("channel_id")
epg_data_id = assoc.get("epg_data_id")
if not channel_id:
continue
try:
# Get the channel
channel = Channel.objects.get(id=channel_id)
# Set the EPG data
channel.epg_data_id = epg_data_id
channel.save(update_fields=["epg_data"])
channels_updated += 1
# Track unique EPG data IDs
if epg_data_id:
unique_epg_ids.add(epg_data_id)
except Channel.DoesNotExist:
logger.error(f"Channel with ID {channel_id} not found")
except Exception as e:
logger.error(
f"Error setting EPG data for channel {channel_id}: {str(e)}"
)
# Trigger program refresh for unique EPG data IDs
from apps.epg.tasks import parse_programs_for_tvg_id
for epg_id in unique_epg_ids:
parse_programs_for_tvg_id.delay(epg_id)
programs_refreshed += 1
return Response(
{
"success": True,
"channels_updated": channels_updated,
"programs_refreshed": programs_refreshed,
}
)
# ─────────────────────────────────────────────────────────
# 4) Bulk Delete Streams
# ─────────────────────────────────────────────────────────
class BulkDeleteStreamsAPIView(APIView):
def get_permissions(self):
try:
return [
perm() for perm in permission_classes_by_method[self.request.method]
]
except KeyError:
return [Authenticated()]
@swagger_auto_schema(
operation_description="Bulk delete streams by ID",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
required=["stream_ids"],
properties={
"stream_ids": openapi.Schema(
type=openapi.TYPE_ARRAY,
items=openapi.Items(type=openapi.TYPE_INTEGER),
description="Stream IDs to delete",
)
},
),
responses={204: "Streams deleted"},
)
def delete(self, request, *args, **kwargs):
stream_ids = request.data.get("stream_ids", [])
Stream.objects.filter(id__in=stream_ids).delete()
return Response(
{"message": "Streams deleted successfully!"},
status=status.HTTP_204_NO_CONTENT,
)
# ─────────────────────────────────────────────────────────
# 5) Bulk Delete Channels
# ─────────────────────────────────────────────────────────
class BulkDeleteChannelsAPIView(APIView):
def get_permissions(self):
try:
return [
perm() for perm in permission_classes_by_method[self.request.method]
]
except KeyError:
return [Authenticated()]
@swagger_auto_schema(
operation_description="Bulk delete channels by ID",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
required=["channel_ids"],
properties={
"channel_ids": openapi.Schema(
type=openapi.TYPE_ARRAY,
items=openapi.Items(type=openapi.TYPE_INTEGER),
description="Channel IDs to delete",
)
},
),
responses={204: "Channels deleted"},
)
def delete(self, request):
channel_ids = request.data.get("channel_ids", [])
Channel.objects.filter(id__in=channel_ids).delete()
return Response(
{"message": "Channels deleted"}, status=status.HTTP_204_NO_CONTENT
)
# ─────────────────────────────────────────────────────────
# 6) Bulk Delete Logos
# ─────────────────────────────────────────────────────────
class BulkDeleteLogosAPIView(APIView):
def get_permissions(self):
try:
return [
perm() for perm in permission_classes_by_method[self.request.method]
]
except KeyError:
return [Authenticated()]
@swagger_auto_schema(
operation_description="Bulk delete logos by ID",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
required=["logo_ids"],
properties={
"logo_ids": openapi.Schema(
type=openapi.TYPE_ARRAY,
items=openapi.Items(type=openapi.TYPE_INTEGER),
description="Logo IDs to delete",
)
},
),
responses={204: "Logos deleted"},
)
def delete(self, request):
logo_ids = request.data.get("logo_ids", [])
delete_files = request.data.get("delete_files", False)
# Get logos and their usage info before deletion
logos_to_delete = Logo.objects.filter(id__in=logo_ids)
total_channels_affected = 0
local_files_deleted = 0
for logo in logos_to_delete:
# Handle file deletion for local files
if delete_files and logo.url and logo.url.startswith('/data/logos'):
try:
if os.path.exists(logo.url):
os.remove(logo.url)
local_files_deleted += 1
logger.info(f"Deleted local logo file: {logo.url}")
except Exception as e:
logger.error(f"Failed to delete logo file {logo.url}: {str(e)}")
return Response(
{"error": f"Failed to delete logo file {logo.url}: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
if logo.channels.exists():
channel_count = logo.channels.count()
total_channels_affected += channel_count
# Remove logo from channels
logo.channels.update(logo=None)
logger.info(f"Removed logo {logo.name} from {channel_count} channels before deletion")
# Delete logos
deleted_count = logos_to_delete.delete()[0]
message = f"Successfully deleted {deleted_count} logos"
if total_channels_affected > 0:
message += f" and removed them from {total_channels_affected} channels"
if local_files_deleted > 0:
message += f" and deleted {local_files_deleted} local files"
return Response(
{"message": message},
status=status.HTTP_204_NO_CONTENT
)
class CleanupUnusedLogosAPIView(APIView):
def get_permissions(self):
try:
return [
perm() for perm in permission_classes_by_method[self.request.method]
]
except KeyError:
return [Authenticated()]
@swagger_auto_schema(
operation_description="Delete all logos that are not used by any channels, movies, or series",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
"delete_files": openapi.Schema(
type=openapi.TYPE_BOOLEAN,
description="Whether to delete local logo files from disk",
default=False
)
},
),
responses={200: "Cleanup completed"},
)
def post(self, request):
"""Delete all logos with no channel, movie, or series associations"""
delete_files = request.data.get("delete_files", False)
# Find logos that are not used by channels, movies, or series
filter_conditions = Q(channels__isnull=True)
# Add VOD conditions if models are available
try:
filter_conditions &= Q(movie__isnull=True)
except:
pass
try:
filter_conditions &= Q(series__isnull=True)
except:
pass
unused_logos = Logo.objects.filter(filter_conditions)
deleted_count = unused_logos.count()
logo_names = list(unused_logos.values_list('name', flat=True))
local_files_deleted = 0
# Handle file deletion for local files if requested
if delete_files:
for logo in unused_logos:
if logo.url and logo.url.startswith('/data/logos'):
try:
if os.path.exists(logo.url):
os.remove(logo.url)
local_files_deleted += 1
logger.info(f"Deleted local logo file: {logo.url}")
except Exception as e:
logger.error(f"Failed to delete logo file {logo.url}: {str(e)}")
return Response(
{"error": f"Failed to delete logo file {logo.url}: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
# Delete the unused logos
unused_logos.delete()
message = f"Successfully deleted {deleted_count} unused logos"
if local_files_deleted > 0:
message += f" and deleted {local_files_deleted} local files"
return Response({
"message": message,
"deleted_count": deleted_count,
"deleted_logos": logo_names,
"local_files_deleted": local_files_deleted
})
class LogoPagination(PageNumberPagination):
page_size = 50 # Default page size to match frontend default
page_size_query_param = "page_size" # Allow clients to specify page size
max_page_size = 1000 # Prevent excessive page sizes
def paginate_queryset(self, queryset, request, view=None):
# Check if pagination should be disabled for specific requests
if request.query_params.get('no_pagination') == 'true':
return None # disables pagination, returns full queryset
return super().paginate_queryset(queryset, request, view)
class LogoViewSet(viewsets.ModelViewSet):
queryset = Logo.objects.all()
serializer_class = LogoSerializer
pagination_class = LogoPagination
parser_classes = (MultiPartParser, FormParser, JSONParser)
def get_permissions(self):
if self.action in ["upload"]:
return [IsAdmin()]
if self.action in ["cache"]:
return [AllowAny()]
try:
return [perm() for perm in permission_classes_by_action[self.action]]
except KeyError:
return [Authenticated()]
def get_queryset(self):
"""Optimize queryset with prefetch and add filtering"""
# Start with basic prefetch for channels
queryset = Logo.objects.prefetch_related('channels').order_by('name')
# Try to prefetch VOD relations if available
try:
queryset = queryset.prefetch_related('movie', 'series')
except:
# VOD app might not be available, continue without VOD prefetch
pass
# Filter by specific IDs
ids = self.request.query_params.getlist('ids')
if ids:
try:
# Convert string IDs to integers and filter
id_list = [int(id_str) for id_str in ids if id_str.isdigit()]
if id_list:
queryset = queryset.filter(id__in=id_list)
except (ValueError, TypeError):
pass # Invalid IDs, return empty queryset
queryset = Logo.objects.none()
# Filter by usage - now includes VOD content
used_filter = self.request.query_params.get('used', None)
if used_filter == 'true':
# Logo is used if it has any channels, movies, or series
filter_conditions = Q(channels__isnull=False)
# Add VOD conditions if models are available
try:
filter_conditions |= Q(movie__isnull=False)
except:
pass
try:
filter_conditions |= Q(series__isnull=False)
except:
pass
queryset = queryset.filter(filter_conditions).distinct()
elif used_filter == 'false':
# Logo is unused if it has no channels, movies, or series
filter_conditions = Q(channels__isnull=True)
# Add VOD conditions if models are available
try:
filter_conditions &= Q(movie__isnull=True)
except:
pass
try:
filter_conditions &= Q(series__isnull=True)
except:
pass
queryset = queryset.filter(filter_conditions)
# Filter for channel assignment (unused + channel-used, exclude VOD-only)
channel_assignable = self.request.query_params.get('channel_assignable', None)
if channel_assignable == 'true':
# Include logos that are either:
# 1. Completely unused, OR
# 2. Used by channels (but may also be used by VOD)
# Exclude logos that are ONLY used by VOD content
unused_condition = Q(channels__isnull=True)
channel_used_condition = Q(channels__isnull=False)
# Add VOD conditions if models are available
try:
unused_condition &= Q(movie__isnull=True) & Q(series__isnull=True)
except:
pass
# Combine: unused OR used by channels
filter_conditions = unused_condition | channel_used_condition
queryset = queryset.filter(filter_conditions).distinct()
# Filter by name
name_filter = self.request.query_params.get('name', None)
if name_filter:
queryset = queryset.filter(name__icontains=name_filter)
return queryset
def create(self, request, *args, **kwargs):
"""Create a new logo entry"""
serializer = self.get_serializer(data=request.data)
if serializer.is_valid():
logo = serializer.save()
return Response(self.get_serializer(logo).data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def update(self, request, *args, **kwargs):
"""Update an existing logo"""
partial = kwargs.pop('partial', False)
instance = self.get_object()
serializer = self.get_serializer(instance, data=request.data, partial=partial)
if serializer.is_valid():
logo = serializer.save()
return Response(self.get_serializer(logo).data)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def destroy(self, request, *args, **kwargs):
"""Delete a logo and remove it from any channels using it"""
logo = self.get_object()
delete_file = request.query_params.get('delete_file', 'false').lower() == 'true'
# Check if it's a local file that should be deleted
if delete_file and logo.url and logo.url.startswith('/data/logos'):
try:
if os.path.exists(logo.url):
os.remove(logo.url)
logger.info(f"Deleted local logo file: {logo.url}")
except Exception as e:
logger.error(f"Failed to delete logo file {logo.url}: {str(e)}")
return Response(
{"error": f"Failed to delete logo file: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
# Instead of preventing deletion, remove the logo from channels
if logo.channels.exists():
channel_count = logo.channels.count()
logo.channels.update(logo=None)
logger.info(f"Removed logo {logo.name} from {channel_count} channels before deletion")
return super().destroy(request, *args, **kwargs)
@action(detail=False, methods=["post"])
def upload(self, request):
if "file" not in request.FILES:
return Response(
{"error": "No file uploaded"}, status=status.HTTP_400_BAD_REQUEST
)
file = request.FILES["file"]
# Validate file
try:
from dispatcharr.utils import validate_logo_file
validate_logo_file(file)
except Exception as e:
return Response(
{"error": str(e)}, status=status.HTTP_400_BAD_REQUEST
)
file_name = file.name
file_path = os.path.join("/data/logos", file_name)
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, "wb+") as destination:
for chunk in file.chunks():
destination.write(chunk)
# Mark file as processed in Redis to prevent file scanner notifications
try:
redis_client = RedisClient.get_client()
if redis_client:
# Use the same key format as the file scanner
redis_key = f"processed_file:{file_path}"
redis_client.setex(redis_key, 60 * 60 * 24 * 3, "api_upload") # 3 day TTL
logger.debug(f"Marked uploaded logo file as processed in Redis: {file_path}")
except Exception as e:
logger.warning(f"Failed to mark logo file as processed in Redis: {e}")
# Get custom name from request data, fallback to filename
custom_name = request.data.get('name', '').strip()
logo_name = custom_name if custom_name else file_name
logo, _ = Logo.objects.get_or_create(
url=file_path,
defaults={
"name": logo_name,
},
)
# Use get_serializer to ensure proper context
serializer = self.get_serializer(logo)
return Response(
serializer.data,
status=status.HTTP_201_CREATED,
)
@action(detail=True, methods=["get"], permission_classes=[AllowAny])
def cache(self, request, pk=None):
"""Streams the logo file, whether it's local or remote."""
logo = self.get_object()
logo_url = logo.url
if logo_url.startswith("/data"): # Local file
if not os.path.exists(logo_url):
raise Http404("Image not found")
# Get proper mime type (first item of the tuple)
content_type, _ = mimetypes.guess_type(logo_url)
if not content_type:
content_type = "image/jpeg" # Default to a common image type
# Use context manager and set Content-Disposition to inline
response = StreamingHttpResponse(
open(logo_url, "rb"), content_type=content_type
)
response["Content-Disposition"] = 'inline; filename="{}"'.format(
os.path.basename(logo_url)
)
return response
else: # Remote image
try:
# Get the default user agent
try:
default_user_agent_id = CoreSettings.get_default_user_agent_id()
user_agent_obj = UserAgent.objects.get(id=int(default_user_agent_id))
user_agent = user_agent_obj.user_agent
except (CoreSettings.DoesNotExist, UserAgent.DoesNotExist, ValueError):
# Fallback to hardcoded if default not found
user_agent = 'Dispatcharr/1.0'
# Add proper timeouts to prevent hanging
remote_response = requests.get(
logo_url,
stream=True,
timeout=(3, 5), # (connect_timeout, read_timeout)
headers={'User-Agent': user_agent}
)
if remote_response.status_code == 200:
# Try to get content type from response headers first
content_type = remote_response.headers.get("Content-Type")
# If no content type in headers or it's empty, guess based on URL
if not content_type:
content_type, _ = mimetypes.guess_type(logo_url)
# If still no content type, default to common image type
if not content_type:
content_type = "image/jpeg"
response = StreamingHttpResponse(
remote_response.iter_content(chunk_size=8192),
content_type=content_type,
)
response["Content-Disposition"] = 'inline; filename="{}"'.format(
os.path.basename(logo_url)
)
return response
raise Http404("Remote image not found")
except requests.exceptions.Timeout:
logger.warning(f"Timeout fetching logo from {logo_url}")
raise Http404("Logo request timed out")
except requests.exceptions.ConnectionError:
logger.warning(f"Connection error fetching logo from {logo_url}")
raise Http404("Unable to connect to logo server")
except requests.RequestException as e:
logger.warning(f"Error fetching logo from {logo_url}: {e}")
raise Http404("Error fetching remote image")
class ChannelProfileViewSet(viewsets.ModelViewSet):
queryset = ChannelProfile.objects.all()
serializer_class = ChannelProfileSerializer
def get_queryset(self):
user = self.request.user
# If user_level is 10, return all ChannelProfiles
if hasattr(user, "user_level") and user.user_level == 10:
return ChannelProfile.objects.all()
# Otherwise, return only ChannelProfiles related to the user
return self.request.user.channel_profiles.all()
def get_permissions(self):
try:
return [perm() for perm in permission_classes_by_action[self.action]]
except KeyError:
return [Authenticated()]
class GetChannelStreamsAPIView(APIView):
def get_permissions(self):
try:
return [
perm() for perm in permission_classes_by_method[self.request.method]
]
except KeyError:
return [Authenticated()]
def get(self, request, channel_id):
channel = get_object_or_404(Channel, id=channel_id)
# Order the streams by channelstream__order to match the order in the channel view
streams = channel.streams.all().order_by("channelstream__order")
serializer = StreamSerializer(streams, many=True)
return Response(serializer.data)
class UpdateChannelMembershipAPIView(APIView):
permission_classes = [IsOwnerOfObject]
def patch(self, request, profile_id, channel_id):
"""Enable or disable a channel for a specific group"""
channel_profile = get_object_or_404(ChannelProfile, id=profile_id)
channel = get_object_or_404(Channel, id=channel_id)
try:
membership = ChannelProfileMembership.objects.get(
channel_profile=channel_profile, channel=channel
)
except ChannelProfileMembership.DoesNotExist:
# Create the membership if it does not exist (for custom channels)
membership = ChannelProfileMembership.objects.create(
channel_profile=channel_profile,
channel=channel,
enabled=False # Default to False, will be updated below
)
serializer = ChannelProfileMembershipSerializer(
membership, data=request.data, partial=True
)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=status.HTTP_200_OK)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class BulkUpdateChannelMembershipAPIView(APIView):
def get_permissions(self):
try:
return [
perm() for perm in permission_classes_by_method[self.request.method]
]
except KeyError:
return [Authenticated()]
def patch(self, request, profile_id):
"""Bulk enable or disable channels for a specific profile"""
# Get the channel profile
channel_profile = get_object_or_404(ChannelProfile, id=profile_id)
# Validate the incoming data using the serializer
serializer = BulkChannelProfileMembershipSerializer(data=request.data)
if serializer.is_valid():
updates = serializer.validated_data["channels"]
channel_ids = [entry["channel_id"] for entry in updates]
memberships = ChannelProfileMembership.objects.filter(
channel_profile=channel_profile, channel_id__in=channel_ids
)
membership_dict = {m.channel.id: m for m in memberships}
for entry in updates:
channel_id = entry["channel_id"]
enabled_status = entry["enabled"]
if channel_id in membership_dict:
membership_dict[channel_id].enabled = enabled_status
ChannelProfileMembership.objects.bulk_update(memberships, ["enabled"])
return Response({"status": "success"}, status=status.HTTP_200_OK)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class RecordingViewSet(viewsets.ModelViewSet):
queryset = Recording.objects.all()
serializer_class = RecordingSerializer
def get_permissions(self):
# Allow unauthenticated playback of recording files (like other streaming endpoints)
if getattr(self, 'action', None) == 'file':
return [AllowAny()]
try:
return [perm() for perm in permission_classes_by_action[self.action]]
except KeyError:
return [Authenticated()]
@action(detail=True, methods=["post"], url_path="comskip")
def comskip(self, request, pk=None):
"""Trigger comskip processing for this recording."""
from .tasks import comskip_process_recording
rec = get_object_or_404(Recording, pk=pk)
try:
comskip_process_recording.delay(rec.id)
return Response({"success": True, "queued": True})
except Exception as e:
return Response({"success": False, "error": str(e)}, status=400)
@action(detail=True, methods=["get"], url_path="file")
def file(self, request, pk=None):
"""Stream a recorded file with HTTP Range support for seeking."""
recording = get_object_or_404(Recording, pk=pk)
cp = recording.custom_properties or {}
file_path = cp.get("file_path")
file_name = cp.get("file_name") or "recording"
if not file_path or not os.path.exists(file_path):
raise Http404("Recording file not found")
# Guess content type
ext = os.path.splitext(file_path)[1].lower()
if ext == ".mp4":
content_type = "video/mp4"
elif ext == ".mkv":
content_type = "video/x-matroska"
else:
content_type = mimetypes.guess_type(file_path)[0] or "application/octet-stream"
file_size = os.path.getsize(file_path)
range_header = request.META.get("HTTP_RANGE", "").strip()
def file_iterator(path, start=0, end=None, chunk_size=8192):
with open(path, "rb") as f:
f.seek(start)
remaining = (end - start + 1) if end is not None else None
while True:
if remaining is not None and remaining <= 0:
break
bytes_to_read = min(chunk_size, remaining) if remaining is not None else chunk_size
data = f.read(bytes_to_read)
if not data:
break
if remaining is not None:
remaining -= len(data)
yield data
if range_header and range_header.startswith("bytes="):
# Parse Range header
try:
range_spec = range_header.split("=", 1)[1]
start_str, end_str = range_spec.split("-", 1)
start = int(start_str) if start_str else 0
end = int(end_str) if end_str else file_size - 1
start = max(0, start)
end = min(file_size - 1, end)
length = end - start + 1
resp = StreamingHttpResponse(
file_iterator(file_path, start, end),
status=206,
content_type=content_type,
)
resp["Content-Range"] = f"bytes {start}-{end}/{file_size}"
resp["Content-Length"] = str(length)
resp["Accept-Ranges"] = "bytes"
resp["Content-Disposition"] = f"inline; filename=\"{file_name}\""
return resp
except Exception:
# Fall back to full file if parsing fails
pass
# Full file response
response = FileResponse(open(file_path, "rb"), content_type=content_type)
response["Content-Length"] = str(file_size)
response["Accept-Ranges"] = "bytes"
response["Content-Disposition"] = f"inline; filename=\"{file_name}\""
return response
def destroy(self, request, *args, **kwargs):
"""Delete the Recording and ensure any active DVR client connection is closed.
Also removes the associated file(s) from disk if present.
"""
instance = self.get_object()
# Attempt to close the DVR client connection for this channel if active
try:
channel_uuid = str(instance.channel.uuid)
# Lazy imports to avoid module overhead if proxy isn't used
from core.utils import RedisClient
from apps.proxy.ts_proxy.redis_keys import RedisKeys
from apps.proxy.ts_proxy.services.channel_service import ChannelService
r = RedisClient.get_client()
if r:
client_set_key = RedisKeys.clients(channel_uuid)
client_ids = r.smembers(client_set_key) or []
stopped = 0
for raw_id in client_ids:
try:
cid = raw_id.decode("utf-8") if isinstance(raw_id, (bytes, bytearray)) else str(raw_id)
meta_key = RedisKeys.client_metadata(channel_uuid, cid)
ua = r.hget(meta_key, "user_agent")
ua_s = ua.decode("utf-8") if isinstance(ua, (bytes, bytearray)) else (ua or "")
# Identify DVR recording client by its user agent
if ua_s and "Dispatcharr-DVR" in ua_s:
try:
ChannelService.stop_client(channel_uuid, cid)
stopped += 1
except Exception as inner_e:
logger.debug(f"Failed to stop DVR client {cid} for channel {channel_uuid}: {inner_e}")
except Exception as inner:
logger.debug(f"Error while checking client metadata: {inner}")
if stopped:
logger.info(f"Stopped {stopped} DVR client(s) for channel {channel_uuid} due to recording cancellation")
# If no clients remain after stopping DVR clients, proactively stop the channel
try:
remaining = r.scard(client_set_key) or 0
except Exception:
remaining = 0
if remaining == 0:
try:
ChannelService.stop_channel(channel_uuid)
logger.info(f"Stopped channel {channel_uuid} (no clients remain)")
except Exception as sc_e:
logger.debug(f"Unable to stop channel {channel_uuid}: {sc_e}")
except Exception as e:
logger.debug(f"Unable to stop DVR clients for cancelled recording: {e}")
# Capture paths before deletion
cp = instance.custom_properties or {}
file_path = cp.get("file_path")
temp_ts_path = cp.get("_temp_file_path")
# Perform DB delete first, then try to remove files
response = super().destroy(request, *args, **kwargs)
# Notify frontends to refresh recordings
try:
from core.utils import send_websocket_update
send_websocket_update('updates', 'update', {"success": True, "type": "recordings_refreshed"})
except Exception:
pass
library_dir = '/app/data'
allowed_roots = ['/data/', library_dir.rstrip('/') + '/']
def _safe_remove(path: str):
if not path or not isinstance(path, str):
return
try:
if any(path.startswith(root) for root in allowed_roots) and os.path.exists(path):
os.remove(path)
logger.info(f"Deleted recording artifact: {path}")
except Exception as ex:
logger.warning(f"Failed to delete recording artifact {path}: {ex}")
_safe_remove(file_path)
_safe_remove(temp_ts_path)
return response
class BulkDeleteUpcomingRecordingsAPIView(APIView):
"""Delete all upcoming (future) recordings."""
def get_permissions(self):
try:
return [perm() for perm in permission_classes_by_method[self.request.method]]
except KeyError:
return [Authenticated()]
def post(self, request):
now = timezone.now()
qs = Recording.objects.filter(start_time__gt=now)
removed = qs.count()
qs.delete()
try:
from core.utils import send_websocket_update
send_websocket_update('updates', 'update', {"success": True, "type": "recordings_refreshed", "removed": removed})
except Exception:
pass
return Response({"success": True, "removed": removed})
class SeriesRulesAPIView(APIView):
"""Manage DVR series recording rules (list/add)."""
def get_permissions(self):
try:
return [perm() for perm in permission_classes_by_method[self.request.method]]
except KeyError:
return [Authenticated()]
def get(self, request):
return Response({"rules": CoreSettings.get_dvr_series_rules()})
def post(self, request):
data = request.data or {}
tvg_id = str(data.get("tvg_id") or "").strip()
mode = (data.get("mode") or "all").lower()
title = data.get("title") or ""
if mode not in ("all", "new"):
return Response({"error": "mode must be 'all' or 'new'"}, status=status.HTTP_400_BAD_REQUEST)
if not tvg_id:
return Response({"error": "tvg_id is required"}, status=status.HTTP_400_BAD_REQUEST)
rules = CoreSettings.get_dvr_series_rules()
# Upsert by tvg_id
existing = next((r for r in rules if str(r.get("tvg_id")) == tvg_id), None)
if existing:
existing.update({"mode": mode, "title": title})
else:
rules.append({"tvg_id": tvg_id, "mode": mode, "title": title})
CoreSettings.set_dvr_series_rules(rules)
# Evaluate immediately for this tvg_id (async)
try:
evaluate_series_rules.delay(tvg_id)
except Exception:
pass
return Response({"success": True, "rules": rules})
class DeleteSeriesRuleAPIView(APIView):
def get_permissions(self):
try:
return [perm() for perm in permission_classes_by_method[self.request.method]]
except KeyError:
return [Authenticated()]
def delete(self, request, tvg_id):
tvg_id = str(tvg_id)
rules = [r for r in CoreSettings.get_dvr_series_rules() if str(r.get("tvg_id")) != tvg_id]
CoreSettings.set_dvr_series_rules(rules)
return Response({"success": True, "rules": rules})
class EvaluateSeriesRulesAPIView(APIView):
def get_permissions(self):
try:
return [perm() for perm in permission_classes_by_method[self.request.method]]
except KeyError:
return [Authenticated()]
def post(self, request):
tvg_id = request.data.get("tvg_id")
# Run synchronously so UI sees results immediately
result = evaluate_series_rules_impl(str(tvg_id)) if tvg_id else evaluate_series_rules_impl()
return Response({"success": True, **result})
class BulkRemoveSeriesRecordingsAPIView(APIView):
"""Bulk remove scheduled recordings for a series rule.
POST body:
- tvg_id: required (EPG channel id)
- title: optional (series title)
- scope: 'title' (default) or 'channel'
"""
def get_permissions(self):
try:
return [perm() for perm in permission_classes_by_method[self.request.method]]
except KeyError:
return [Authenticated()]
def post(self, request):
from django.utils import timezone
tvg_id = str(request.data.get("tvg_id") or "").strip()
title = request.data.get("title")
scope = (request.data.get("scope") or "title").lower()
if not tvg_id:
return Response({"error": "tvg_id is required"}, status=status.HTTP_400_BAD_REQUEST)
qs = Recording.objects.filter(
start_time__gte=timezone.now(),
custom_properties__program__tvg_id=tvg_id,
)
if scope == "title" and title:
qs = qs.filter(custom_properties__program__title=title)
count = qs.count()
qs.delete()
try:
from core.utils import send_websocket_update
send_websocket_update('updates', 'update', {"success": True, "type": "recordings_refreshed", "removed": count})
except Exception:
pass
return Response({"success": True, "removed": count})