import logging, os from rest_framework import viewsets, status from rest_framework.response import Response from rest_framework.views import APIView from rest_framework.decorators import action from drf_yasg.utils import swagger_auto_schema from drf_yasg import openapi from django.utils import timezone from datetime import timedelta from .models import EPGSource, ProgramData, EPGData # Added ProgramData from .serializers import ( ProgramDataSerializer, EPGSourceSerializer, EPGDataSerializer, ) # Updated serializer from .tasks import refresh_epg_data from apps.accounts.permissions import ( Authenticated, permission_classes_by_action, permission_classes_by_method, ) logger = logging.getLogger(__name__) # ───────────────────────────── # 1) EPG Source API (CRUD) # ───────────────────────────── class EPGSourceViewSet(viewsets.ModelViewSet): """ API endpoint that allows EPG sources to be viewed or edited. """ queryset = EPGSource.objects.all() serializer_class = EPGSourceSerializer def get_permissions(self): try: return [perm() for perm in permission_classes_by_action[self.action]] except KeyError: return [Authenticated()] def list(self, request, *args, **kwargs): logger.debug("Listing all EPG sources.") return super().list(request, *args, **kwargs) @action(detail=False, methods=["post"]) def upload(self, request): if "file" not in request.FILES: return Response( {"error": "No file uploaded"}, status=status.HTTP_400_BAD_REQUEST ) file = request.FILES["file"] file_name = file.name file_path = os.path.join("/data/uploads/epgs", file_name) os.makedirs(os.path.dirname(file_path), exist_ok=True) with open(file_path, "wb+") as destination: for chunk in file.chunks(): destination.write(chunk) new_obj_data = request.data.copy() new_obj_data["file_path"] = file_path serializer = self.get_serializer(data=new_obj_data) serializer.is_valid(raise_exception=True) self.perform_create(serializer) return Response(serializer.data, status=status.HTTP_201_CREATED) def partial_update(self, request, *args, **kwargs): """Handle partial updates with special logic for is_active field""" instance = self.get_object() # Check if we're toggling is_active if ( "is_active" in request.data and instance.is_active != request.data["is_active"] ): # Set appropriate status based on new is_active value if request.data["is_active"]: request.data["status"] = "idle" else: request.data["status"] = "disabled" # Continue with regular partial update return super().partial_update(request, *args, **kwargs) # ───────────────────────────── # 2) Program API (CRUD) # ───────────────────────────── class ProgramViewSet(viewsets.ModelViewSet): """Handles CRUD operations for EPG programs""" queryset = ProgramData.objects.all() serializer_class = ProgramDataSerializer def get_permissions(self): try: return [perm() for perm in permission_classes_by_action[self.action]] except KeyError: return [Authenticated()] def list(self, request, *args, **kwargs): logger.debug("Listing all EPG programs.") return super().list(request, *args, **kwargs) # ───────────────────────────── # 3) EPG Grid View # ───────────────────────────── class EPGGridAPIView(APIView): """Returns all programs airing in the next 24 hours including currently running ones and recent ones""" def get_permissions(self): try: return [ perm() for perm in permission_classes_by_method[self.request.method] ] except KeyError: return [Authenticated()] @swagger_auto_schema( operation_description="Retrieve programs from the previous hour, currently running and upcoming for the next 24 hours", responses={200: ProgramDataSerializer(many=True)}, ) def get(self, request, format=None): # Use current time instead of midnight now = timezone.now() one_hour_ago = now - timedelta(hours=1) twenty_four_hours_later = now + timedelta(hours=24) logger.debug( f"EPGGridAPIView: Querying programs between {one_hour_ago} and {twenty_four_hours_later}." ) # Use select_related to prefetch EPGData and include programs from the last hour programs = ProgramData.objects.select_related("epg").filter( # Programs that end after one hour ago (includes recently ended programs) end_time__gt=one_hour_ago, # AND start before the end time window start_time__lt=twenty_four_hours_later, ) count = programs.count() logger.debug( f"EPGGridAPIView: Found {count} program(s), including recently ended, currently running, and upcoming shows." ) # Generate dummy programs for channels that have no EPG data OR dummy EPG sources from apps.channels.models import Channel from apps.epg.models import EPGSource from django.db.models import Q # Get channels with no EPG data at all (standard dummy) channels_without_epg = Channel.objects.filter(Q(epg_data__isnull=True)) # Get channels with custom dummy EPG sources (generate on-demand with patterns) channels_with_custom_dummy = Channel.objects.filter( epg_data__epg_source__source_type='dummy' ).distinct() # Log what we found without_count = channels_without_epg.count() custom_count = channels_with_custom_dummy.count() if without_count > 0: channel_names = [f"{ch.name} (ID: {ch.id})" for ch in channels_without_epg] logger.debug( f"EPGGridAPIView: Channels needing standard dummy EPG: {', '.join(channel_names)}" ) if custom_count > 0: channel_names = [f"{ch.name} (ID: {ch.id})" for ch in channels_with_custom_dummy] logger.debug( f"EPGGridAPIView: Channels needing custom dummy EPG: {', '.join(channel_names)}" ) logger.debug( f"EPGGridAPIView: Found {without_count} channels needing standard dummy, {custom_count} needing custom dummy EPG." ) # Serialize the regular programs serialized_programs = ProgramDataSerializer(programs, many=True).data # Humorous program descriptions based on time of day - same as in output/views.py time_descriptions = { (0, 4): [ "Late Night with {channel} - Where insomniacs unite!", "The 'Why Am I Still Awake?' Show on {channel}", "Counting Sheep - A {channel} production for the sleepless", ], (4, 8): [ "Dawn Patrol - Rise and shine with {channel}!", "Early Bird Special - Coffee not included", "Morning Zombies - Before coffee viewing on {channel}", ], (8, 12): [ "Mid-Morning Meetings - Pretend you're paying attention while watching {channel}", "The 'I Should Be Working' Hour on {channel}", "Productivity Killer - {channel}'s daytime programming", ], (12, 16): [ "Lunchtime Laziness with {channel}", "The Afternoon Slump - Brought to you by {channel}", "Post-Lunch Food Coma Theater on {channel}", ], (16, 20): [ "Rush Hour - {channel}'s alternative to traffic", "The 'What's For Dinner?' Debate on {channel}", "Evening Escapism - {channel}'s remedy for reality", ], (20, 24): [ "Prime Time Placeholder - {channel}'s finest not-programming", "The 'Netflix Was Too Complicated' Show on {channel}", "Family Argument Avoider - Courtesy of {channel}", ], } # Generate and append dummy programs dummy_programs = [] # Import the function from output.views from apps.output.views import generate_dummy_programs as gen_dummy_progs # Handle channels with CUSTOM dummy EPG sources (with patterns) for channel in channels_with_custom_dummy: # For dummy EPGs, ALWAYS use channel UUID to ensure unique programs per channel # This prevents multiple channels assigned to the same dummy EPG from showing identical data # Each channel gets its own unique program data even if they share the same EPG source dummy_tvg_id = str(channel.uuid) try: # Get the custom dummy EPG source epg_source = channel.epg_data.epg_source if channel.epg_data else None logger.debug(f"Generating custom dummy programs for channel: {channel.name} (ID: {channel.id})") # Determine which name to parse based on custom properties name_to_parse = channel.name if epg_source and epg_source.custom_properties: custom_props = epg_source.custom_properties name_source = custom_props.get('name_source') if name_source == 'stream': # Get the stream index (1-based from user, convert to 0-based) stream_index = custom_props.get('stream_index', 1) - 1 # Get streams ordered by channelstream order channel_streams = channel.streams.all().order_by('channelstream__order') if channel_streams.exists() and 0 <= stream_index < channel_streams.count(): stream = list(channel_streams)[stream_index] name_to_parse = stream.name logger.debug(f"Using stream name for parsing: {name_to_parse} (stream index: {stream_index})") else: logger.warning(f"Stream index {stream_index} not found for channel {channel.name}, falling back to channel name") elif name_source == 'channel': logger.debug(f"Using channel name for parsing: {name_to_parse}") # Generate programs using custom patterns from the dummy EPG source # Use the same tvg_id that will be set in the program data generated = gen_dummy_progs( channel_id=dummy_tvg_id, channel_name=name_to_parse, num_days=1, program_length_hours=4, epg_source=epg_source ) # Custom dummy should always return data (either from patterns or fallback) if generated: logger.debug(f"Generated {len(generated)} custom dummy programs for {channel.name}") # Convert generated programs to API format for program in generated: dummy_program = { "id": f"dummy-custom-{channel.id}-{program['start_time'].hour}", "epg": {"tvg_id": dummy_tvg_id, "name": channel.name}, "start_time": program['start_time'].isoformat(), "end_time": program['end_time'].isoformat(), "title": program['title'], "description": program['description'], "tvg_id": dummy_tvg_id, "sub_title": None, "custom_properties": None, } dummy_programs.append(dummy_program) else: logger.warning(f"No programs generated for custom dummy EPG channel: {channel.name}") except Exception as e: logger.error( f"Error creating custom dummy programs for channel {channel.name} (ID: {channel.id}): {str(e)}" ) # Handle channels with NO EPG data (standard dummy with humorous descriptions) for channel in channels_without_epg: # For channels with no EPG, use UUID to ensure uniqueness (matches frontend logic) # The frontend uses: tvgRecord?.tvg_id ?? channel.uuid # Since there's no EPG data, it will fall back to UUID dummy_tvg_id = str(channel.uuid) try: logger.debug(f"Generating standard dummy programs for channel: {channel.name} (ID: {channel.id})") # Create programs every 4 hours for the next 24 hours with humorous descriptions for hour_offset in range(0, 24, 4): # Use timedelta for time arithmetic instead of replace() to avoid hour overflow start_time = now + timedelta(hours=hour_offset) # Set minutes/seconds to zero for clean time blocks start_time = start_time.replace(minute=0, second=0, microsecond=0) end_time = start_time + timedelta(hours=4) # Get the hour for selecting a description hour = start_time.hour day = 0 # Use 0 as we're only doing 1 day # Find the appropriate time slot for description for time_range, descriptions in time_descriptions.items(): start_range, end_range = time_range if start_range <= hour < end_range: # Pick a description using the sum of the hour and day as seed # This makes it somewhat random but consistent for the same timeslot description = descriptions[ (hour + day) % len(descriptions) ].format(channel=channel.name) break else: # Fallback description if somehow no range matches description = f"Placeholder program for {channel.name} - EPG data went on vacation" # Create a dummy program in the same format as regular programs dummy_program = { "id": f"dummy-standard-{channel.id}-{hour_offset}", "epg": {"tvg_id": dummy_tvg_id, "name": channel.name}, "start_time": start_time.isoformat(), "end_time": end_time.isoformat(), "title": f"{channel.name}", "description": description, "tvg_id": dummy_tvg_id, "sub_title": None, "custom_properties": None, } dummy_programs.append(dummy_program) except Exception as e: logger.error( f"Error creating standard dummy programs for channel {channel.name} (ID: {channel.id}): {str(e)}" ) # Combine regular and dummy programs all_programs = list(serialized_programs) + dummy_programs logger.debug( f"EPGGridAPIView: Returning {len(all_programs)} total programs (including {len(dummy_programs)} dummy programs)." ) return Response({"data": all_programs}, status=status.HTTP_200_OK) # ───────────────────────────── # 4) EPG Import View # ───────────────────────────── class EPGImportAPIView(APIView): """Triggers an EPG data refresh""" def get_permissions(self): try: return [ perm() for perm in permission_classes_by_method[self.request.method] ] except KeyError: return [Authenticated()] @swagger_auto_schema( operation_description="Triggers an EPG data import", responses={202: "EPG data import initiated"}, ) def post(self, request, format=None): logger.info("EPGImportAPIView: Received request to import EPG data.") epg_id = request.data.get("id", None) # Check if this is a dummy EPG source try: from .models import EPGSource epg_source = EPGSource.objects.get(id=epg_id) if epg_source.source_type == 'dummy': logger.info(f"EPGImportAPIView: Skipping refresh for dummy EPG source {epg_id}") return Response( {"success": False, "message": "Dummy EPG sources do not require refreshing."}, status=status.HTTP_400_BAD_REQUEST, ) except EPGSource.DoesNotExist: pass # Let the task handle the missing source refresh_epg_data.delay(epg_id) # Trigger Celery task logger.info("EPGImportAPIView: Task dispatched to refresh EPG data.") return Response( {"success": True, "message": "EPG data import initiated."}, status=status.HTTP_202_ACCEPTED, ) # ───────────────────────────── # 5) EPG Data View # ───────────────────────────── class EPGDataViewSet(viewsets.ReadOnlyModelViewSet): """ API endpoint that allows EPGData objects to be viewed. """ queryset = EPGData.objects.all() serializer_class = EPGDataSerializer def get_permissions(self): try: return [perm() for perm in permission_classes_by_action[self.action]] except KeyError: return [Authenticated()]