Dispatcharr/apps/epg/api_views.py
SergeantPanda 0e1db3e39c Enhancement: - Stats page enhancements: Added "Now Playing" program information for active streams with smart polling that only fetches EPG data when programs are about to change (not on every stats refresh). Features include:
- Currently playing program title displayed with live broadcast indicator (green Radio icon)
  - Expandable program descriptions via chevron button
  - Efficient POST-based API endpoint (`/api/epg/current-programs/`) supporting batch channel queries or fetching all channels
  - Smart scheduling that fetches new program data 5 seconds after current program ends
  - Only polls when active channel list changes, not on stats refresh
- Channel preview button: Added preview functionality to active stream cards on stats page
2026-01-20 13:37:40 -06:00

505 lines
21 KiB
Python

import logging, os
from rest_framework import viewsets, status
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.decorators import action
from drf_yasg.utils import swagger_auto_schema
from drf_yasg import openapi
from django.utils import timezone
from datetime import timedelta
from .models import EPGSource, ProgramData, EPGData # Added ProgramData
from .serializers import (
ProgramDataSerializer,
EPGSourceSerializer,
EPGDataSerializer,
) # Updated serializer
from .tasks import refresh_epg_data
from apps.accounts.permissions import (
Authenticated,
permission_classes_by_action,
permission_classes_by_method,
)
logger = logging.getLogger(__name__)
# ─────────────────────────────
# 1) EPG Source API (CRUD)
# ─────────────────────────────
class EPGSourceViewSet(viewsets.ModelViewSet):
"""
API endpoint that allows EPG sources to be viewed or edited.
"""
queryset = EPGSource.objects.all()
serializer_class = EPGSourceSerializer
def get_permissions(self):
try:
return [perm() for perm in permission_classes_by_action[self.action]]
except KeyError:
return [Authenticated()]
def list(self, request, *args, **kwargs):
logger.debug("Listing all EPG sources.")
return super().list(request, *args, **kwargs)
@action(detail=False, methods=["post"])
def upload(self, request):
if "file" not in request.FILES:
return Response(
{"error": "No file uploaded"}, status=status.HTTP_400_BAD_REQUEST
)
file = request.FILES["file"]
file_name = file.name
file_path = os.path.join("/data/uploads/epgs", file_name)
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, "wb+") as destination:
for chunk in file.chunks():
destination.write(chunk)
new_obj_data = request.data.copy()
new_obj_data["file_path"] = file_path
serializer = self.get_serializer(data=new_obj_data)
serializer.is_valid(raise_exception=True)
self.perform_create(serializer)
return Response(serializer.data, status=status.HTTP_201_CREATED)
def partial_update(self, request, *args, **kwargs):
"""Handle partial updates with special logic for is_active field"""
instance = self.get_object()
# Check if we're toggling is_active
if (
"is_active" in request.data
and instance.is_active != request.data["is_active"]
):
# Set appropriate status based on new is_active value
if request.data["is_active"]:
request.data["status"] = "idle"
else:
request.data["status"] = "disabled"
# Continue with regular partial update
return super().partial_update(request, *args, **kwargs)
# ─────────────────────────────
# 2) Program API (CRUD)
# ─────────────────────────────
class ProgramViewSet(viewsets.ModelViewSet):
"""Handles CRUD operations for EPG programs"""
queryset = ProgramData.objects.all()
serializer_class = ProgramDataSerializer
def get_permissions(self):
try:
return [perm() for perm in permission_classes_by_action[self.action]]
except KeyError:
return [Authenticated()]
def list(self, request, *args, **kwargs):
logger.debug("Listing all EPG programs.")
return super().list(request, *args, **kwargs)
# ─────────────────────────────
# 3) EPG Grid View
# ─────────────────────────────
class EPGGridAPIView(APIView):
"""Returns all programs airing in the next 24 hours including currently running ones and recent ones"""
def get_permissions(self):
try:
return [
perm() for perm in permission_classes_by_method[self.request.method]
]
except KeyError:
return [Authenticated()]
@swagger_auto_schema(
operation_description="Retrieve programs from the previous hour, currently running and upcoming for the next 24 hours",
responses={200: ProgramDataSerializer(many=True)},
)
def get(self, request, format=None):
# Use current time instead of midnight
now = timezone.now()
one_hour_ago = now - timedelta(hours=1)
twenty_four_hours_later = now + timedelta(hours=24)
logger.debug(
f"EPGGridAPIView: Querying programs between {one_hour_ago} and {twenty_four_hours_later}."
)
# Use select_related to prefetch EPGData and include programs from the last hour
programs = ProgramData.objects.select_related("epg").filter(
# Programs that end after one hour ago (includes recently ended programs)
end_time__gt=one_hour_ago,
# AND start before the end time window
start_time__lt=twenty_four_hours_later,
)
count = programs.count()
logger.debug(
f"EPGGridAPIView: Found {count} program(s), including recently ended, currently running, and upcoming shows."
)
# Generate dummy programs for channels that have no EPG data OR dummy EPG sources
from apps.channels.models import Channel
from apps.epg.models import EPGSource
from django.db.models import Q
# Get channels with no EPG data at all (standard dummy)
channels_without_epg = Channel.objects.filter(Q(epg_data__isnull=True))
# Get channels with custom dummy EPG sources (generate on-demand with patterns)
channels_with_custom_dummy = Channel.objects.filter(
epg_data__epg_source__source_type='dummy'
).distinct()
# Log what we found
without_count = channels_without_epg.count()
custom_count = channels_with_custom_dummy.count()
if without_count > 0:
channel_names = [f"{ch.name} (ID: {ch.id})" for ch in channels_without_epg]
logger.debug(
f"EPGGridAPIView: Channels needing standard dummy EPG: {', '.join(channel_names)}"
)
if custom_count > 0:
channel_names = [f"{ch.name} (ID: {ch.id})" for ch in channels_with_custom_dummy]
logger.debug(
f"EPGGridAPIView: Channels needing custom dummy EPG: {', '.join(channel_names)}"
)
logger.debug(
f"EPGGridAPIView: Found {without_count} channels needing standard dummy, {custom_count} needing custom dummy EPG."
)
# Serialize the regular programs
serialized_programs = ProgramDataSerializer(programs, many=True).data
# Humorous program descriptions based on time of day - same as in output/views.py
time_descriptions = {
(0, 4): [
"Late Night with {channel} - Where insomniacs unite!",
"The 'Why Am I Still Awake?' Show on {channel}",
"Counting Sheep - A {channel} production for the sleepless",
],
(4, 8): [
"Dawn Patrol - Rise and shine with {channel}!",
"Early Bird Special - Coffee not included",
"Morning Zombies - Before coffee viewing on {channel}",
],
(8, 12): [
"Mid-Morning Meetings - Pretend you're paying attention while watching {channel}",
"The 'I Should Be Working' Hour on {channel}",
"Productivity Killer - {channel}'s daytime programming",
],
(12, 16): [
"Lunchtime Laziness with {channel}",
"The Afternoon Slump - Brought to you by {channel}",
"Post-Lunch Food Coma Theater on {channel}",
],
(16, 20): [
"Rush Hour - {channel}'s alternative to traffic",
"The 'What's For Dinner?' Debate on {channel}",
"Evening Escapism - {channel}'s remedy for reality",
],
(20, 24): [
"Prime Time Placeholder - {channel}'s finest not-programming",
"The 'Netflix Was Too Complicated' Show on {channel}",
"Family Argument Avoider - Courtesy of {channel}",
],
}
# Generate and append dummy programs
dummy_programs = []
# Import the function from output.views
from apps.output.views import generate_dummy_programs as gen_dummy_progs
# Handle channels with CUSTOM dummy EPG sources (with patterns)
for channel in channels_with_custom_dummy:
# For dummy EPGs, ALWAYS use channel UUID to ensure unique programs per channel
# This prevents multiple channels assigned to the same dummy EPG from showing identical data
# Each channel gets its own unique program data even if they share the same EPG source
dummy_tvg_id = str(channel.uuid)
try:
# Get the custom dummy EPG source
epg_source = channel.epg_data.epg_source if channel.epg_data else None
logger.debug(f"Generating custom dummy programs for channel: {channel.name} (ID: {channel.id})")
# Determine which name to parse based on custom properties
name_to_parse = channel.name
if epg_source and epg_source.custom_properties:
custom_props = epg_source.custom_properties
name_source = custom_props.get('name_source')
if name_source == 'stream':
# Get the stream index (1-based from user, convert to 0-based)
stream_index = custom_props.get('stream_index', 1) - 1
# Get streams ordered by channelstream order
channel_streams = channel.streams.all().order_by('channelstream__order')
if channel_streams.exists() and 0 <= stream_index < channel_streams.count():
stream = list(channel_streams)[stream_index]
name_to_parse = stream.name
logger.debug(f"Using stream name for parsing: {name_to_parse} (stream index: {stream_index})")
else:
logger.warning(f"Stream index {stream_index} not found for channel {channel.name}, falling back to channel name")
elif name_source == 'channel':
logger.debug(f"Using channel name for parsing: {name_to_parse}")
# Generate programs using custom patterns from the dummy EPG source
# Use the same tvg_id that will be set in the program data
generated = gen_dummy_progs(
channel_id=dummy_tvg_id,
channel_name=name_to_parse,
num_days=1,
program_length_hours=4,
epg_source=epg_source
)
# Custom dummy should always return data (either from patterns or fallback)
if generated:
logger.debug(f"Generated {len(generated)} custom dummy programs for {channel.name}")
# Convert generated programs to API format
for program in generated:
dummy_program = {
"id": f"dummy-custom-{channel.id}-{program['start_time'].hour}",
"epg": {"tvg_id": dummy_tvg_id, "name": channel.name},
"start_time": program['start_time'].isoformat(),
"end_time": program['end_time'].isoformat(),
"title": program['title'],
"description": program['description'],
"tvg_id": dummy_tvg_id,
"sub_title": None,
"custom_properties": None,
}
dummy_programs.append(dummy_program)
else:
logger.warning(f"No programs generated for custom dummy EPG channel: {channel.name}")
except Exception as e:
logger.error(
f"Error creating custom dummy programs for channel {channel.name} (ID: {channel.id}): {str(e)}"
)
# Handle channels with NO EPG data (standard dummy with humorous descriptions)
for channel in channels_without_epg:
# For channels with no EPG, use UUID to ensure uniqueness (matches frontend logic)
# The frontend uses: tvgRecord?.tvg_id ?? channel.uuid
# Since there's no EPG data, it will fall back to UUID
dummy_tvg_id = str(channel.uuid)
try:
logger.debug(f"Generating standard dummy programs for channel: {channel.name} (ID: {channel.id})")
# Create programs every 4 hours for the next 24 hours with humorous descriptions
for hour_offset in range(0, 24, 4):
# Use timedelta for time arithmetic instead of replace() to avoid hour overflow
start_time = now + timedelta(hours=hour_offset)
# Set minutes/seconds to zero for clean time blocks
start_time = start_time.replace(minute=0, second=0, microsecond=0)
end_time = start_time + timedelta(hours=4)
# Get the hour for selecting a description
hour = start_time.hour
day = 0 # Use 0 as we're only doing 1 day
# Find the appropriate time slot for description
for time_range, descriptions in time_descriptions.items():
start_range, end_range = time_range
if start_range <= hour < end_range:
# Pick a description using the sum of the hour and day as seed
# This makes it somewhat random but consistent for the same timeslot
description = descriptions[
(hour + day) % len(descriptions)
].format(channel=channel.name)
break
else:
# Fallback description if somehow no range matches
description = f"Placeholder program for {channel.name} - EPG data went on vacation"
# Create a dummy program in the same format as regular programs
dummy_program = {
"id": f"dummy-standard-{channel.id}-{hour_offset}",
"epg": {"tvg_id": dummy_tvg_id, "name": channel.name},
"start_time": start_time.isoformat(),
"end_time": end_time.isoformat(),
"title": f"{channel.name}",
"description": description,
"tvg_id": dummy_tvg_id,
"sub_title": None,
"custom_properties": None,
}
dummy_programs.append(dummy_program)
except Exception as e:
logger.error(
f"Error creating standard dummy programs for channel {channel.name} (ID: {channel.id}): {str(e)}"
)
# Combine regular and dummy programs
all_programs = list(serialized_programs) + dummy_programs
logger.debug(
f"EPGGridAPIView: Returning {len(all_programs)} total programs (including {len(dummy_programs)} dummy programs)."
)
return Response({"data": all_programs}, status=status.HTTP_200_OK)
# ─────────────────────────────
# 4) EPG Import View
# ─────────────────────────────
class EPGImportAPIView(APIView):
"""Triggers an EPG data refresh"""
def get_permissions(self):
try:
return [
perm() for perm in permission_classes_by_method[self.request.method]
]
except KeyError:
return [Authenticated()]
@swagger_auto_schema(
operation_description="Triggers an EPG data import",
responses={202: "EPG data import initiated"},
)
def post(self, request, format=None):
logger.info("EPGImportAPIView: Received request to import EPG data.")
epg_id = request.data.get("id", None)
# Check if this is a dummy EPG source
try:
from .models import EPGSource
epg_source = EPGSource.objects.get(id=epg_id)
if epg_source.source_type == 'dummy':
logger.info(f"EPGImportAPIView: Skipping refresh for dummy EPG source {epg_id}")
return Response(
{"success": False, "message": "Dummy EPG sources do not require refreshing."},
status=status.HTTP_400_BAD_REQUEST,
)
except EPGSource.DoesNotExist:
pass # Let the task handle the missing source
refresh_epg_data.delay(epg_id) # Trigger Celery task
logger.info("EPGImportAPIView: Task dispatched to refresh EPG data.")
return Response(
{"success": True, "message": "EPG data import initiated."},
status=status.HTTP_202_ACCEPTED,
)
# ─────────────────────────────
# 5) EPG Data View
# ─────────────────────────────
class EPGDataViewSet(viewsets.ReadOnlyModelViewSet):
"""
API endpoint that allows EPGData objects to be viewed.
"""
queryset = EPGData.objects.all()
serializer_class = EPGDataSerializer
def get_permissions(self):
try:
return [perm() for perm in permission_classes_by_action[self.action]]
except KeyError:
return [Authenticated()]
# ─────────────────────────────
# 6) Current Programs API
# ─────────────────────────────
class CurrentProgramsAPIView(APIView):
"""
Lightweight endpoint that returns currently playing programs for specified channel IDs.
Accepts POST with JSON body containing channel_ids array, or null/empty to fetch all channels.
"""
def get_permissions(self):
try:
return [
perm() for perm in permission_classes_by_method[self.request.method]
]
except KeyError:
return [Authenticated()]
@swagger_auto_schema(
operation_description="Get currently playing programs for specified channels or all channels",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'channel_ids': openapi.Schema(
type=openapi.TYPE_ARRAY,
items=openapi.Schema(type=openapi.TYPE_INTEGER),
description="Array of channel IDs. If null or omitted, returns all channels with current programs.",
nullable=True,
)
},
),
responses={200: openapi.Response('Current programs', ProgramDataSerializer(many=True))},
)
def post(self, request, format=None):
# Get channel IDs from request body
channel_ids = request.data.get('channel_ids', None)
# Import Channel model
from apps.channels.models import Channel
# Build query for channels with EPG data
query = Channel.objects.filter(epg_data__isnull=False)
# Filter by specific channel IDs if provided
if channel_ids is not None:
if not isinstance(channel_ids, list):
return Response(
{"error": "channel_ids must be an array of integers or null"},
status=status.HTTP_400_BAD_REQUEST
)
try:
channel_ids = [int(id) for id in channel_ids]
except (ValueError, TypeError):
return Response(
{"error": "channel_ids must contain valid integers"},
status=status.HTTP_400_BAD_REQUEST
)
query = query.filter(id__in=channel_ids)
# Get channels with EPG data
channels = query.select_related('epg_data')
# Get current time
now = timezone.now()
# Build list of current programs
current_programs = []
for channel in channels:
# Query for current program
program = ProgramData.objects.filter(
epg=channel.epg_data,
start_time__lte=now,
end_time__gt=now
).first()
if program:
# Serialize program and add channel_id for easy mapping
program_data = ProgramDataSerializer(program).data
program_data['channel_id'] = channel.id
current_programs.append(program_data)
return Response(current_programs, status=status.HTTP_200_OK)