diff --git a/.github/workflows/base-image.yml b/.github/workflows/base-image.yml index 955043fb..1da33d4f 100644 --- a/.github/workflows/base-image.yml +++ b/.github/workflows/base-image.yml @@ -6,11 +6,13 @@ on: paths: - 'docker/DispatcharrBase' - '.github/workflows/base-image.yml' + - 'requirements.txt' pull_request: branches: [ main, dev ] paths: - 'docker/DispatcharrBase' - '.github/workflows/base-image.yml' + - 'requirements.txt' workflow_dispatch: # Allow manual triggering permissions: diff --git a/apps/channels/api_views.py b/apps/channels/api_views.py index 23632704..890dd247 100644 --- a/apps/channels/api_views.py +++ b/apps/channels/api_views.py @@ -230,11 +230,11 @@ class ChannelViewSet(viewsets.ModelViewSet): type=openapi.TYPE_OBJECT, required=["channel_ids"], properties={ - "starting_number": openapi.Schema(type=openapi.TYPE_STRING, description="Starting channel number to assign"), + "starting_number": openapi.Schema(type=openapi.TYPE_NUMBER, description="Starting channel number to assign (can be decimal)"), "channel_ids": openapi.Schema( type=openapi.TYPE_ARRAY, items=openapi.Items(type=openapi.TYPE_INTEGER), - description="Channel IDs to assign" + description="Channel IDs to assign" ) } ), @@ -244,7 +244,12 @@ class ChannelViewSet(viewsets.ModelViewSet): def assign(self, request): with transaction.atomic(): channel_ids = request.data.get('channel_ids', []) - channel_num = request.data.get('starting_number', 1) + # Ensure starting_number is processed as a float + try: + channel_num = float(request.data.get('starting_number', 1)) + except (ValueError, TypeError): + channel_num = 1.0 + for channel_id in channel_ids: Channel.objects.filter(id=channel_id).update(channel_number=channel_num) channel_num = channel_num + 1 @@ -266,7 +271,7 @@ class ChannelViewSet(viewsets.ModelViewSet): type=openapi.TYPE_INTEGER, description="ID of the stream to link" ), "channel_number": openapi.Schema( - type=openapi.TYPE_INTEGER, + type=openapi.TYPE_NUMBER, description="(Optional) Desired channel number. Must not be in use." ), "name": openapi.Schema( @@ -293,9 +298,9 @@ class ChannelViewSet(viewsets.ModelViewSet): channel_number = None if 'tvg-chno' in stream_custom_props: - channel_number = int(stream_custom_props['tvg-chno']) + channel_number = float(stream_custom_props['tvg-chno']) elif 'channel-number' in stream_custom_props: - channel_number = int(stream_custom_props['channel-number']) + channel_number = float(stream_custom_props['channel-number']) if channel_number is None: provided_number = request.data.get('channel_number') @@ -303,7 +308,7 @@ class ChannelViewSet(viewsets.ModelViewSet): channel_number = Channel.get_next_available_channel_number() else: try: - channel_number = int(provided_number) + channel_number = float(provided_number) except ValueError: return Response({"error": "channel_number must be an integer."}, status=status.HTTP_400_BAD_REQUEST) # If the provided number is already used, return an error. @@ -362,7 +367,7 @@ class ChannelViewSet(viewsets.ModelViewSet): type=openapi.TYPE_INTEGER, description="ID of the stream to link" ), "channel_number": openapi.Schema( - type=openapi.TYPE_INTEGER, + type=openapi.TYPE_NUMBER, description="(Optional) Desired channel number. Must not be in use." 
), "name": openapi.Schema( @@ -419,9 +424,13 @@ class ChannelViewSet(viewsets.ModelViewSet): channel_number = None if 'tvg-chno' in stream_custom_props: - channel_number = int(stream_custom_props['tvg-chno']) + channel_number = float(stream_custom_props['tvg-chno']) elif 'channel-number' in stream_custom_props: - channel_number = int(stream_custom_props['channel-number']) + channel_number = float(stream_custom_props['channel-number']) + # Get the tvc_guide_stationid from custom properties if it exists + tvc_guide_stationid = None + if 'tvc-guide-stationid' in stream_custom_props: + tvc_guide_stationid = stream_custom_props['tvc-guide-stationid'] # Determine channel number: if provided, use it (if free); else auto assign. if channel_number is None: @@ -430,7 +439,7 @@ class ChannelViewSet(viewsets.ModelViewSet): channel_number = get_auto_number() else: try: - channel_number = int(provided_number) + channel_number = float(provided_number) except ValueError: errors.append({"item": item, "error": "channel_number must be an integer."}) continue @@ -442,6 +451,7 @@ class ChannelViewSet(viewsets.ModelViewSet): channel_data = { "channel_number": channel_number, "name": name, + 'tvc_guide_stationid': tvc_guide_stationid, "tvg_id": stream.tvg_id, "channel_group_id": channel_group.id, } diff --git a/apps/channels/forms.py b/apps/channels/forms.py index 342bd0fe..a566adbd 100644 --- a/apps/channels/forms.py +++ b/apps/channels/forms.py @@ -14,6 +14,13 @@ class ChannelGroupForm(forms.ModelForm): # Channel Form # class ChannelForm(forms.ModelForm): + # Explicitly define channel_number as FloatField to ensure decimal values work + channel_number = forms.FloatField( + required=False, + widget=forms.NumberInput(attrs={'step': '0.1'}), # Allow decimal steps + help_text="Channel number can include decimals (e.g., 1.1, 2.5)" + ) + channel_group = forms.ModelChoiceField( queryset=ChannelGroup.objects.all(), required=False, diff --git a/apps/channels/migrations/0020_alter_channel_channel_number.py b/apps/channels/migrations/0020_alter_channel_channel_number.py new file mode 100644 index 00000000..0a1b6ead --- /dev/null +++ b/apps/channels/migrations/0020_alter_channel_channel_number.py @@ -0,0 +1,18 @@ +# Generated by Django 5.1.6 on 2025-05-15 19:37 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('dispatcharr_channels', '0019_channel_tvc_guide_stationid'), + ] + + operations = [ + migrations.AlterField( + model_name='channel', + name='channel_number', + field=models.FloatField(db_index=True), + ), + ] diff --git a/apps/channels/models.py b/apps/channels/models.py index dc5c50fb..191eb45e 100644 --- a/apps/channels/models.py +++ b/apps/channels/models.py @@ -209,7 +209,7 @@ class ChannelManager(models.Manager): class Channel(models.Model): - channel_number = models.IntegerField() + channel_number = models.FloatField(db_index=True) name = models.CharField(max_length=255) logo = models.ForeignKey( 'Logo', diff --git a/apps/channels/serializers.py b/apps/channels/serializers.py index ce3910a4..5423037f 100644 --- a/apps/channels/serializers.py +++ b/apps/channels/serializers.py @@ -115,7 +115,14 @@ class BulkChannelProfileMembershipSerializer(serializers.Serializer): # class ChannelSerializer(serializers.ModelSerializer): # Show nested group data, or ID - channel_number = serializers.IntegerField(allow_null=True, required=False) + # Ensure channel_number is explicitly typed as FloatField and properly validated + channel_number = 
serializers.FloatField( + allow_null=True, + required=False, + error_messages={ + 'invalid': 'Channel number must be a valid decimal number.' + } + ) channel_group_id = serializers.PrimaryKeyRelatedField( queryset=ChannelGroup.objects.all(), source="channel_group", @@ -234,6 +241,16 @@ class ChannelSerializer(serializers.ModelSerializer): return instance + def validate_channel_number(self, value): + """Ensure channel_number is properly processed as a float""" + if value is None: + return value + + try: + # Ensure it's processed as a float + return float(value) + except (ValueError, TypeError): + raise serializers.ValidationError("Channel number must be a valid decimal number.") def validate_stream_profile(self, value): """Handle special case where empty/0 values mean 'use default' (null)""" diff --git a/apps/channels/tasks.py b/apps/channels/tasks.py index 88d040e8..6217a4ca 100755 --- a/apps/channels/tasks.py +++ b/apps/channels/tasks.py @@ -7,6 +7,7 @@ import time import json import subprocess from datetime import datetime +import gc from celery import shared_task from django.utils.text import slugify @@ -63,146 +64,162 @@ def match_epg_channels(): 4) If a match is found, we set channel.tvg_id 5) Summarize and log results. """ - logger.info("Starting EPG matching logic...") - - # Attempt to retrieve a "preferred-region" if configured try: - region_obj = CoreSettings.objects.get(key="preferred-region") - region_code = region_obj.value.strip().lower() - except CoreSettings.DoesNotExist: - region_code = None + logger.info("Starting EPG matching logic...") - matched_channels = [] - channels_to_update = [] + # Attempt to retrieve a "preferred-region" if configured + try: + region_obj = CoreSettings.objects.get(key="preferred-region") + region_code = region_obj.value.strip().lower() + except CoreSettings.DoesNotExist: + region_code = None - # Get channels that don't have EPG data assigned - channels_without_epg = Channel.objects.filter(epg_data__isnull=True) - logger.info(f"Found {channels_without_epg.count()} channels without EPG data") + matched_channels = [] + channels_to_update = [] - channels_json = [] - for channel in channels_without_epg: - # Normalize TVG ID - strip whitespace and convert to lowercase - normalized_tvg_id = channel.tvg_id.strip().lower() if channel.tvg_id else "" - if normalized_tvg_id: - logger.info(f"Processing channel {channel.id} '{channel.name}' with TVG ID='{normalized_tvg_id}'") + # Get channels that don't have EPG data assigned + channels_without_epg = Channel.objects.filter(epg_data__isnull=True) + logger.info(f"Found {channels_without_epg.count()} channels without EPG data") - channels_json.append({ - "id": channel.id, - "name": channel.name, - "tvg_id": normalized_tvg_id, # Use normalized TVG ID - "original_tvg_id": channel.tvg_id, # Keep original for reference - "fallback_name": normalized_tvg_id if normalized_tvg_id else channel.name, - "norm_chan": normalize_name(normalized_tvg_id if normalized_tvg_id else channel.name) - }) + channels_json = [] + for channel in channels_without_epg: + # Normalize TVG ID - strip whitespace and convert to lowercase + normalized_tvg_id = channel.tvg_id.strip().lower() if channel.tvg_id else "" + if normalized_tvg_id: + logger.info(f"Processing channel {channel.id} '{channel.name}' with TVG ID='{normalized_tvg_id}'") - # Similarly normalize EPG data TVG IDs - epg_json = [] - for epg in EPGData.objects.all(): - normalized_tvg_id = epg.tvg_id.strip().lower() if epg.tvg_id else "" - epg_json.append({ - 'id': epg.id, - 'tvg_id': 
normalized_tvg_id, # Use normalized TVG ID - 'original_tvg_id': epg.tvg_id, # Keep original for reference - 'name': epg.name, - 'norm_name': normalize_name(epg.name), - 'epg_source_id': epg.epg_source.id if epg.epg_source else None, - }) + channels_json.append({ + "id": channel.id, + "name": channel.name, + "tvg_id": normalized_tvg_id, # Use normalized TVG ID + "original_tvg_id": channel.tvg_id, # Keep original for reference + "fallback_name": normalized_tvg_id if normalized_tvg_id else channel.name, + "norm_chan": normalize_name(normalized_tvg_id if normalized_tvg_id else channel.name) + }) - # Log available EPG data TVG IDs for debugging - unique_epg_tvg_ids = set(e['tvg_id'] for e in epg_json if e['tvg_id']) - logger.info(f"Available EPG TVG IDs: {', '.join(sorted(unique_epg_tvg_ids))}") + # Similarly normalize EPG data TVG IDs + epg_json = [] + for epg in EPGData.objects.all(): + normalized_tvg_id = epg.tvg_id.strip().lower() if epg.tvg_id else "" + epg_json.append({ + 'id': epg.id, + 'tvg_id': normalized_tvg_id, # Use normalized TVG ID + 'original_tvg_id': epg.tvg_id, # Keep original for reference + 'name': epg.name, + 'norm_name': normalize_name(epg.name), + 'epg_source_id': epg.epg_source.id if epg.epg_source else None, + }) - payload = { - "channels": channels_json, - "epg_data": epg_json, - "region_code": region_code, - } + # Log available EPG data TVG IDs for debugging + unique_epg_tvg_ids = set(e['tvg_id'] for e in epg_json if e['tvg_id']) + logger.info(f"Available EPG TVG IDs: {', '.join(sorted(unique_epg_tvg_ids))}") - with tempfile.NamedTemporaryFile(delete=False) as temp_file: - temp_file.write(json.dumps(payload).encode('utf-8')) - temp_file_path = temp_file.name - - process = subprocess.Popen( - ['python', '/app/scripts/epg_match.py', temp_file_path], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True - ) - - # Log stderr in real-time - for line in iter(process.stderr.readline, ''): - if line: - logger.info(line.strip()) - - process.stderr.close() - stdout, stderr = process.communicate() - - os.remove(temp_file_path) - - if process.returncode != 0: - return f"Failed to process EPG matching: {stderr}" - - result = json.loads(stdout) - # This returns lists of dicts, not model objects - channels_to_update_dicts = result["channels_to_update"] - matched_channels = result["matched_channels"] - - # Convert your dict-based 'channels_to_update' into real Channel objects - if channels_to_update_dicts: - # Extract IDs of the channels that need updates - channel_ids = [d["id"] for d in channels_to_update_dicts] - - # Fetch them from DB - channels_qs = Channel.objects.filter(id__in=channel_ids) - channels_list = list(channels_qs) - - # Build a map from channel_id -> epg_data_id (or whatever fields you need) - epg_mapping = { - d["id"]: d["epg_data_id"] for d in channels_to_update_dicts + payload = { + "channels": channels_json, + "epg_data": epg_json, + "region_code": region_code, } - # Populate each Channel object with the updated epg_data_id - for channel_obj in channels_list: - # The script sets 'epg_data_id' in the returned dict - # We either assign directly, or fetch the EPGData instance if needed. 
- channel_obj.epg_data_id = epg_mapping.get(channel_obj.id) + with tempfile.NamedTemporaryFile(delete=False) as temp_file: + temp_file.write(json.dumps(payload).encode('utf-8')) + temp_file_path = temp_file.name - # Now we have real model objects, so bulk_update will work - Channel.objects.bulk_update(channels_list, ["epg_data"]) + # After writing to the file but before subprocess + # Explicitly delete the large data structures + del payload + gc.collect() - total_matched = len(matched_channels) - if total_matched: - logger.info(f"Match Summary: {total_matched} channel(s) matched.") - for (cid, cname, tvg) in matched_channels: - logger.info(f" - Channel ID={cid}, Name='{cname}' => tvg_id='{tvg}'") - else: - logger.info("No new channels were matched.") + process = subprocess.Popen( + ['python', '/app/scripts/epg_match.py', temp_file_path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) - logger.info("Finished EPG matching logic.") + # Log stderr in real-time + for line in iter(process.stderr.readline, ''): + if line: + logger.info(line.strip()) - # Send update with additional information for refreshing UI - channel_layer = get_channel_layer() - associations = [ - {"channel_id": chan["id"], "epg_data_id": chan["epg_data_id"]} - for chan in channels_to_update_dicts - ] + process.stderr.close() + stdout, stderr = process.communicate() - async_to_sync(channel_layer.group_send)( - 'updates', - { - 'type': 'update', - "data": { - "success": True, - "type": "epg_match", - "refresh_channels": True, # Flag to tell frontend to refresh channels - "matches_count": total_matched, - "message": f"EPG matching complete: {total_matched} channel(s) matched", - "associations": associations # Add the associations data + os.remove(temp_file_path) + + if process.returncode != 0: + return f"Failed to process EPG matching: {stderr}" + + result = json.loads(stdout) + # This returns lists of dicts, not model objects + channels_to_update_dicts = result["channels_to_update"] + matched_channels = result["matched_channels"] + + # Explicitly clean up large objects + del stdout, result + gc.collect() + + # Convert your dict-based 'channels_to_update' into real Channel objects + if channels_to_update_dicts: + # Extract IDs of the channels that need updates + channel_ids = [d["id"] for d in channels_to_update_dicts] + + # Fetch them from DB + channels_qs = Channel.objects.filter(id__in=channel_ids) + channels_list = list(channels_qs) + + # Build a map from channel_id -> epg_data_id (or whatever fields you need) + epg_mapping = { + d["id"]: d["epg_data_id"] for d in channels_to_update_dicts } - } - ) - return f"Done. Matched {total_matched} channel(s)." + # Populate each Channel object with the updated epg_data_id + for channel_obj in channels_list: + # The script sets 'epg_data_id' in the returned dict + # We either assign directly, or fetch the EPGData instance if needed. 
+ channel_obj.epg_data_id = epg_mapping.get(channel_obj.id) + + # Now we have real model objects, so bulk_update will work + Channel.objects.bulk_update(channels_list, ["epg_data"]) + + total_matched = len(matched_channels) + if total_matched: + logger.info(f"Match Summary: {total_matched} channel(s) matched.") + for (cid, cname, tvg) in matched_channels: + logger.info(f" - Channel ID={cid}, Name='{cname}' => tvg_id='{tvg}'") + else: + logger.info("No new channels were matched.") + + logger.info("Finished EPG matching logic.") + + # Send update with additional information for refreshing UI + channel_layer = get_channel_layer() + associations = [ + {"channel_id": chan["id"], "epg_data_id": chan["epg_data_id"]} + for chan in channels_to_update_dicts + ] + + async_to_sync(channel_layer.group_send)( + 'updates', + { + 'type': 'update', + "data": { + "success": True, + "type": "epg_match", + "refresh_channels": True, # Flag to tell frontend to refresh channels + "matches_count": total_matched, + "message": f"EPG matching complete: {total_matched} channel(s) matched", + "associations": associations # Add the associations data + } + } + ) + + return f"Done. Matched {total_matched} channel(s)." + finally: + # Final cleanup + gc.collect() + # Use our standardized cleanup function for more thorough memory management + from core.utils import cleanup_memory + cleanup_memory(log_usage=True, force_collection=True) @shared_task diff --git a/apps/epg/api_views.py b/apps/epg/api_views.py index 70bfa87d..240e2dcb 100644 --- a/apps/epg/api_views.py +++ b/apps/epg/api_views.py @@ -125,6 +125,40 @@ class EPGGridAPIView(APIView): # Serialize the regular programs serialized_programs = ProgramDataSerializer(programs, many=True).data + # Humorous program descriptions based on time of day - same as in output/views.py + time_descriptions = { + (0, 4): [ + "Late Night with {channel} - Where insomniacs unite!", + "The 'Why Am I Still Awake?' Show on {channel}", + "Counting Sheep - A {channel} production for the sleepless" + ], + (4, 8): [ + "Dawn Patrol - Rise and shine with {channel}!", + "Early Bird Special - Coffee not included", + "Morning Zombies - Before coffee viewing on {channel}" + ], + (8, 12): [ + "Mid-Morning Meetings - Pretend you're paying attention while watching {channel}", + "The 'I Should Be Working' Hour on {channel}", + "Productivity Killer - {channel}'s daytime programming" + ], + (12, 16): [ + "Lunchtime Laziness with {channel}", + "The Afternoon Slump - Brought to you by {channel}", + "Post-Lunch Food Coma Theater on {channel}" + ], + (16, 20): [ + "Rush Hour - {channel}'s alternative to traffic", + "The 'What's For Dinner?' 
Debate on {channel}", + "Evening Escapism - {channel}'s remedy for reality" + ], + (20, 24): [ + "Prime Time Placeholder - {channel}'s finest not-programming", + "The 'Netflix Was Too Complicated' Show on {channel}", + "Family Argument Avoider - Courtesy of {channel}" + ] + } + # Generate and append dummy programs dummy_programs = [] for channel in channels_without_epg: @@ -140,6 +174,22 @@ class EPGGridAPIView(APIView): start_time = start_time.replace(minute=0, second=0, microsecond=0) end_time = start_time + timedelta(hours=4) + # Get the hour for selecting a description + hour = start_time.hour + day = 0 # Use 0 as we're only doing 1 day + + # Find the appropriate time slot for description + for time_range, descriptions in time_descriptions.items(): + start_range, end_range = time_range + if start_range <= hour < end_range: + # Pick a description using the sum of the hour and day as seed + # This makes it somewhat random but consistent for the same timeslot + description = descriptions[(hour + day) % len(descriptions)].format(channel=channel.name) + break + else: + # Fallback description if somehow no range matches + description = f"Placeholder program for {channel.name} - EPG data went on vacation" + # Create a dummy program in the same format as regular programs dummy_program = { 'id': f"dummy-{channel.id}-{hour_offset}", # Create a unique ID @@ -150,7 +200,7 @@ class EPGGridAPIView(APIView): 'start_time': start_time.isoformat(), 'end_time': end_time.isoformat(), 'title': f"{channel.name}", - 'description': f"Placeholder program for {channel.name}", + 'description': description, 'tvg_id': dummy_tvg_id, 'sub_title': None, 'custom_properties': None diff --git a/apps/epg/migrations/0013_alter_epgsource_refresh_interval.py b/apps/epg/migrations/0013_alter_epgsource_refresh_interval.py new file mode 100644 index 00000000..64be2c3c --- /dev/null +++ b/apps/epg/migrations/0013_alter_epgsource_refresh_interval.py @@ -0,0 +1,18 @@ +# Generated by Django 5.1.6 on 2025-05-21 19:58 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('epg', '0012_alter_epgsource_status'), + ] + + operations = [ + migrations.AlterField( + model_name='epgsource', + name='refresh_interval', + field=models.IntegerField(default=0), + ), + ] diff --git a/apps/epg/migrations/0014_epgsource_extracted_file_path.py b/apps/epg/migrations/0014_epgsource_extracted_file_path.py new file mode 100644 index 00000000..9ee1170b --- /dev/null +++ b/apps/epg/migrations/0014_epgsource_extracted_file_path.py @@ -0,0 +1,18 @@ +# Generated by Django 5.1.6 on 2025-05-26 15:48 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('epg', '0013_alter_epgsource_refresh_interval'), + ] + + operations = [ + migrations.AddField( + model_name='epgsource', + name='extracted_file_path', + field=models.CharField(blank=True, help_text='Path to extracted XML file after decompression', max_length=1024, null=True), + ), + ] diff --git a/apps/epg/models.py b/apps/epg/models.py index ed8f2708..8abfb26f 100644 --- a/apps/epg/models.py +++ b/apps/epg/models.py @@ -32,7 +32,9 @@ class EPGSource(models.Model): api_key = models.CharField(max_length=255, blank=True, null=True) # For Schedules Direct is_active = models.BooleanField(default=True) file_path = models.CharField(max_length=1024, blank=True, null=True) - refresh_interval = models.IntegerField(default=24) + extracted_file_path = models.CharField(max_length=1024, blank=True, null=True, + 
help_text="Path to extracted XML file after decompression") + refresh_interval = models.IntegerField(default=0) refresh_task = models.ForeignKey( PeriodicTask, on_delete=models.SET_NULL, null=True, blank=True ) @@ -59,8 +61,46 @@ class EPGSource(models.Model): return self.name def get_cache_file(self): - # Decide on file extension - file_ext = ".gz" if self.url.lower().endswith('.gz') else ".xml" + import mimetypes + + # Use a temporary extension for initial download + # The actual extension will be determined after content inspection + file_ext = ".tmp" + + # If file_path is already set and contains an extension, use that + # This handles cases where we've already detected the proper type + if self.file_path and os.path.exists(self.file_path): + _, existing_ext = os.path.splitext(self.file_path) + if existing_ext: + file_ext = existing_ext + else: + # Try to detect the MIME type and map to extension + mime_type, _ = mimetypes.guess_type(self.file_path) + if mime_type: + if mime_type == 'application/gzip' or mime_type == 'application/x-gzip': + file_ext = '.gz' + elif mime_type == 'application/zip': + file_ext = '.zip' + elif mime_type == 'application/xml' or mime_type == 'text/xml': + file_ext = '.xml' + # For files without mime type detection, try peeking at content + else: + try: + with open(self.file_path, 'rb') as f: + header = f.read(4) + # Check for gzip magic number (1f 8b) + if header[:2] == b'\x1f\x8b': + file_ext = '.gz' + # Check for zip magic number (PK..) + elif header[:2] == b'PK': + file_ext = '.zip' + # Check for XML + elif header[:5] == b'': + file_ext = '.xml' + except Exception as e: + # If we can't read the file, just keep the default extension + pass + filename = f"{self.id}{file_ext}" # Build full path in MEDIA_ROOT/cached_epg diff --git a/apps/epg/signals.py b/apps/epg/signals.py index 6f98e84a..e8a004cb 100644 --- a/apps/epg/signals.py +++ b/apps/epg/signals.py @@ -3,8 +3,10 @@ from django.dispatch import receiver from .models import EPGSource from .tasks import refresh_epg_data, delete_epg_refresh_task_by_id from django_celery_beat.models import PeriodicTask, IntervalSchedule +from core.utils import is_protected_path import json import logging +import os logger = logging.getLogger(__name__) @@ -95,3 +97,31 @@ def update_status_on_active_change(sender, instance, **kwargs): except EPGSource.DoesNotExist: # New record, will use default status pass + +@receiver(post_delete, sender=EPGSource) +def delete_cached_files(sender, instance, **kwargs): + """ + Delete cached files associated with an EPGSource when it's deleted. + Only deletes files that aren't in protected directories. 
+ """ + # Check and delete the main file path if not protected + if instance.file_path and os.path.exists(instance.file_path): + if is_protected_path(instance.file_path): + logger.info(f"Skipping deletion of protected file: {instance.file_path}") + else: + try: + os.remove(instance.file_path) + logger.info(f"Deleted cached file: {instance.file_path}") + except OSError as e: + logger.error(f"Error deleting cached file {instance.file_path}: {e}") + + # Check and delete the extracted file path if it exists, is different from main path, and not protected + if instance.extracted_file_path and os.path.exists(instance.extracted_file_path) and instance.extracted_file_path != instance.file_path: + if is_protected_path(instance.extracted_file_path): + logger.info(f"Skipping deletion of protected extracted file: {instance.extracted_file_path}") + else: + try: + os.remove(instance.extracted_file_path) + logger.info(f"Deleted extracted file: {instance.extracted_file_path}") + except OSError as e: + logger.error(f"Error deleting extracted file {instance.extracted_file_path}: {e}") diff --git a/apps/epg/tasks.py b/apps/epg/tasks.py index f102630f..1a5f832e 100644 --- a/apps/epg/tasks.py +++ b/apps/epg/tasks.py @@ -5,9 +5,13 @@ import gzip import os import uuid import requests -import xml.etree.ElementTree as ET import time # Add import for tracking download progress from datetime import datetime, timedelta, timezone as dt_timezone +import gc # Add garbage collection module +import json +from lxml import etree # Using lxml exclusively +import psutil # Add import for memory tracking +import zipfile from celery import shared_task from django.conf import settings @@ -20,7 +24,7 @@ from asgiref.sync import async_to_sync from channels.layers import get_channel_layer from .models import EPGSource, EPGData, ProgramData -from core.utils import acquire_task_lock, release_task_lock +from core.utils import acquire_task_lock, release_task_lock, send_websocket_update, cleanup_memory logger = logging.getLogger(__name__) @@ -38,15 +42,18 @@ def send_epg_update(source_id, action, progress, **kwargs): # Add the additional key-value pairs from kwargs data.update(kwargs) - # Now, send the updated data dictionary - channel_layer = get_channel_layer() - async_to_sync(channel_layer.group_send)( - 'updates', - { - 'type': 'update', - 'data': data - } - ) + # Use the standardized update function with garbage collection for program parsing + # This is a high-frequency operation that needs more aggressive memory management + collect_garbage = action == "parsing_programs" and progress % 10 == 0 + send_websocket_update('updates', 'update', data, collect_garbage=collect_garbage) + + # Explicitly clear references + data = None + + # For high-frequency parsing, occasionally force additional garbage collection + # to prevent memory buildup + if action == "parsing_programs" and progress % 50 == 0: + gc.collect() def delete_epg_refresh_task_by_id(epg_id): @@ -112,6 +119,8 @@ def refresh_all_epg_data(): for source in active_sources: refresh_epg_data(source.id) + # Force garbage collection between sources + gc.collect() logger.info("Finished refresh_epg_data task.") return "EPG data refreshed." 
@@ -123,6 +132,7 @@ def refresh_epg_data(source_id): logger.debug(f"EPG refresh for {source_id} already running") return + source = None try: # Try to get the EPG source try: @@ -139,12 +149,16 @@ def refresh_epg_data(source_id): # Release the lock and exit release_task_lock('refresh_epg_data', source_id) + # Force garbage collection before exit + gc.collect() return f"EPG source {source_id} does not exist, task cleaned up" # The source exists but is not active, just skip processing if not source.is_active: logger.info(f"EPG source {source_id} is not active. Skipping.") release_task_lock('refresh_epg_data', source_id) + # Force garbage collection before exit + gc.collect() return # Continue with the normal processing... @@ -154,12 +168,16 @@ def refresh_epg_data(source_id): if not fetch_success: logger.error(f"Failed to fetch XMLTV for source {source.name}") release_task_lock('refresh_epg_data', source_id) + # Force garbage collection before exit + gc.collect() return parse_channels_success = parse_channels_only(source) if not parse_channels_success: logger.error(f"Failed to parse channels for source {source.name}") release_task_lock('refresh_epg_data', source_id) + # Force garbage collection before exit + gc.collect() return parse_programs_for_source(source) @@ -171,14 +189,18 @@ def refresh_epg_data(source_id): except Exception as e: logger.error(f"Error in refresh_epg_data for source {source_id}: {e}", exc_info=True) try: - source = EPGSource.objects.get(id=source_id) - source.status = 'error' - source.last_message = f"Error refreshing EPG data: {str(e)}" - source.save(update_fields=['status', 'last_message']) - send_epg_update(source_id, "refresh", 100, status="error", error=str(e)) + if source: + source.status = 'error' + source.last_message = f"Error refreshing EPG data: {str(e)}" + source.save(update_fields=['status', 'last_message']) + send_epg_update(source_id, "refresh", 100, status="error", error=str(e)) except Exception as inner_e: logger.error(f"Error updating source status: {inner_e}") finally: + # Clear references to ensure proper garbage collection + source = None + # Force garbage collection before releasing the lock + gc.collect() release_task_lock('refresh_epg_data', source_id) @@ -187,6 +209,28 @@ def fetch_xmltv(source): if not source.url and source.file_path and os.path.exists(source.file_path): logger.info(f"Using existing local file for EPG source: {source.name} at {source.file_path}") + # Check if the existing file is compressed and we need to extract it + if source.file_path.endswith(('.gz', '.zip')) and not source.file_path.endswith('.xml'): + try: + # Define the path for the extracted file in the cache directory + cache_dir = os.path.join(settings.MEDIA_ROOT, "cached_epg") + os.makedirs(cache_dir, exist_ok=True) + xml_path = os.path.join(cache_dir, f"{source.id}.xml") + + # Extract to the cache location keeping the original + extracted_path = extract_compressed_file(source.file_path, xml_path, delete_original=False) + + if extracted_path: + logger.info(f"Extracted mapped compressed file to: {extracted_path}") + # Update to use extracted_file_path instead of changing file_path + source.extracted_file_path = extracted_path + source.save(update_fields=['extracted_file_path']) + else: + logger.error(f"Failed to extract mapped compressed file. 
Using original file: {source.file_path}") + except Exception as e: + logger.error(f"Failed to extract existing compressed file: {e}") + # Continue with the original file if extraction fails + # Set the status to success in the database source.status = 'success' source.save(update_fields=['status']) @@ -206,9 +250,6 @@ def fetch_xmltv(source): send_epg_update(source.id, "downloading", 100, status="error", error="No URL provided and no valid local file exists") return False - if os.path.exists(source.get_cache_file()): - os.remove(source.get_cache_file()) - logger.info(f"Fetching XMLTV data from source: {source.name}") try: # Get default user agent from settings @@ -235,7 +276,7 @@ def fetch_xmltv(source): send_epg_update(source.id, "downloading", 0) # Use streaming response to track download progress - with requests.get(source.url, headers=headers, stream=True, timeout=30) as response: + with requests.get(source.url, headers=headers, stream=True, timeout=60) as response: # Handle 404 specifically if response.status_code == 404: logger.error(f"EPG URL not found (404): {source.url}") @@ -297,16 +338,23 @@ def fetch_xmltv(source): response.raise_for_status() logger.debug("XMLTV data fetched successfully.") - cache_file = source.get_cache_file() + # Define base paths for consistent file naming + cache_dir = os.path.join(settings.MEDIA_ROOT, "cached_epg") + os.makedirs(cache_dir, exist_ok=True) + + # Create temporary download file with .tmp extension + temp_download_path = os.path.join(cache_dir, f"{source.id}.tmp") # Check if we have content length for progress tracking total_size = int(response.headers.get('content-length', 0)) downloaded = 0 start_time = time.time() last_update_time = start_time + update_interval = 0.5 # Only update every 0.5 seconds - with open(cache_file, 'wb') as f: - for chunk in response.iter_content(chunk_size=8192): + # Download to temporary file + with open(temp_download_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=16384): # Increased chunk size for better performance if chunk: f.write(chunk) @@ -326,27 +374,115 @@ def fetch_xmltv(source): # Time remaining (in seconds) time_remaining = (total_size - downloaded) / (speed * 1024) if speed > 0 and total_size > 0 else 0 - # Only send updates every 0.5 seconds to avoid flooding + # Only send updates at specified intervals to avoid flooding current_time = time.time() - if current_time - last_update_time >= 0.5 and progress > 0: + if current_time - last_update_time >= update_interval and progress > 0: last_update_time = current_time send_epg_update( source.id, "downloading", progress, - speed=speed, - elapsed_time=elapsed_time, - time_remaining=time_remaining + speed=round(speed, 2), + elapsed_time=round(elapsed_time, 1), + time_remaining=round(time_remaining, 1), + downloaded=f"{downloaded / (1024 * 1024):.2f} MB" ) + # Explicitly delete the chunk to free memory immediately + del chunk + # Send completion notification send_epg_update(source.id, "downloading", 100) + # Determine the appropriate file extension based on content detection + with open(temp_download_path, 'rb') as f: + content_sample = f.read(1024) # Just need the first 1KB to detect format + + # Use our helper function to detect the format + format_type, is_compressed, file_extension = detect_file_format( + file_path=source.url, # Original URL as a hint + content=content_sample # Actual file content for detection + ) + + logger.debug(f"File format detection results: type={format_type}, compressed={is_compressed}, extension={file_extension}") 
+ + # Ensure consistent final paths + compressed_path = os.path.join(cache_dir, f"{source.id}{file_extension}" if is_compressed else f"{source.id}.compressed") + xml_path = os.path.join(cache_dir, f"{source.id}.xml") + + # Clean up old files before saving new ones + if os.path.exists(compressed_path): + try: + os.remove(compressed_path) + logger.debug(f"Removed old compressed file: {compressed_path}") + except OSError as e: + logger.warning(f"Failed to remove old compressed file: {e}") + + if os.path.exists(xml_path): + try: + os.remove(xml_path) + logger.debug(f"Removed old XML file: {xml_path}") + except OSError as e: + logger.warning(f"Failed to remove old XML file: {e}") + + # Rename the temp file to appropriate final path + if is_compressed: + try: + os.rename(temp_download_path, compressed_path) + logger.debug(f"Renamed temp file to compressed file: {compressed_path}") + current_file_path = compressed_path + except OSError as e: + logger.error(f"Failed to rename temp file to compressed file: {e}") + current_file_path = temp_download_path # Fall back to using temp file + else: + try: + os.rename(temp_download_path, xml_path) + logger.debug(f"Renamed temp file to XML file: {xml_path}") + current_file_path = xml_path + except OSError as e: + logger.error(f"Failed to rename temp file to XML file: {e}") + current_file_path = temp_download_path # Fall back to using temp file + + # Now extract the file if it's compressed + if is_compressed: + try: + logger.info(f"Extracting compressed file {current_file_path}") + send_epg_update(source.id, "extracting", 0, message="Extracting downloaded file") + + # Always extract to the standard XML path - set delete_original to True to clean up + extracted = extract_compressed_file(current_file_path, xml_path, delete_original=True) + + if extracted: + logger.info(f"Successfully extracted to {xml_path}, compressed file deleted") + send_epg_update(source.id, "extracting", 100, message=f"File extracted successfully, temporary file removed") + # Update to store only the extracted file path since the compressed file is now gone + source.file_path = xml_path + source.extracted_file_path = None + else: + logger.error("Extraction failed, using compressed file") + send_epg_update(source.id, "extracting", 100, status="error", message="Extraction failed, using compressed file") + # Use the compressed file + source.file_path = current_file_path + source.extracted_file_path = None + except Exception as e: + logger.error(f"Error extracting file: {str(e)}", exc_info=True) + send_epg_update(source.id, "extracting", 100, status="error", message=f"Error during extraction: {str(e)}") + # Use the compressed file if extraction fails + source.file_path = current_file_path + source.extracted_file_path = None + else: + # It's already an XML file + source.file_path = current_file_path + source.extracted_file_path = None + + # Update the source's file paths + source.save(update_fields=['file_path', 'status', 'extracted_file_path']) + # Update status to parsing source.status = 'parsing' source.save(update_fields=['status']) - logger.info(f"Cached EPG file saved to {cache_file}") + logger.info(f"Cached EPG file saved to {source.file_path}") return True except requests.exceptions.HTTPError as e: @@ -424,6 +560,20 @@ def fetch_xmltv(source): ) send_epg_update(source.id, "downloading", 100, status="error", error=user_message) return False + except requests.exceptions.Timeout as e: + # Handle timeout errors specifically + error_message = str(e) + user_message = f"Timeout error: EPG 
source '{source.name}' took too long to respond" + logger.error(f"Timeout error fetching XMLTV from {source.name}: {e}", exc_info=True) + + # Update source status + source.status = 'error' + source.last_message = user_message + source.save(update_fields=['status', 'last_message']) + + # Send notifications + send_epg_update(source.id, "downloading", 100, status="error", error=user_message) + return False except Exception as e: error_message = str(e) logger.error(f"Error fetching XMLTV from {source.name}: {e}", exc_info=True) @@ -438,14 +588,140 @@ def fetch_xmltv(source): return False +def extract_compressed_file(file_path, output_path=None, delete_original=False): + """ + Extracts a compressed file (.gz or .zip) to an XML file. + + Args: + file_path: Path to the compressed file + output_path: Specific path where the file should be extracted (optional) + delete_original: Whether to delete the original compressed file after successful extraction + + Returns: + Path to the extracted XML file, or None if extraction failed + """ + try: + if output_path is None: + base_path = os.path.splitext(file_path)[0] + extracted_path = f"{base_path}.xml" + else: + extracted_path = output_path + + # Make sure the output path doesn't already exist + if os.path.exists(extracted_path): + try: + os.remove(extracted_path) + logger.info(f"Removed existing extracted file: {extracted_path}") + except Exception as e: + logger.warning(f"Failed to remove existing extracted file: {e}") + # If we can't delete the existing file and no specific output was requested, + # create a unique filename instead + if output_path is None: + base_path = os.path.splitext(file_path)[0] + extracted_path = f"{base_path}_{uuid.uuid4().hex[:8]}.xml" + + # Use our detection helper to determine the file format instead of relying on extension + with open(file_path, 'rb') as f: + content_sample = f.read(4096) # Read a larger sample to ensure accurate detection + + format_type, is_compressed, _ = detect_file_format(file_path=file_path, content=content_sample) + + if format_type == 'gzip': + logger.debug(f"Extracting gzip file: {file_path}") + try: + # First check if the content is XML by reading a sample + with gzip.open(file_path, 'rb') as gz_file: + content_sample = gz_file.read(4096) # Read first 4KB for detection + detected_format, _, _ = detect_file_format(content=content_sample) + + if detected_format != 'xml': + logger.warning(f"GZIP file does not appear to contain XML content: {file_path} (detected as: {detected_format})") + # Continue anyway since GZIP only contains one file + + # Reset file pointer and extract the content + gz_file.seek(0) + with open(extracted_path, 'wb') as out_file: + out_file.write(gz_file.read()) + except Exception as e: + logger.error(f"Error extracting GZIP file: {e}", exc_info=True) + return None + + logger.info(f"Successfully extracted gzip file to: {extracted_path}") + + # Delete original compressed file if requested + if delete_original: + try: + os.remove(file_path) + logger.info(f"Deleted original compressed file: {file_path}") + except Exception as e: + logger.warning(f"Failed to delete original compressed file {file_path}: {e}") + + return extracted_path + + elif format_type == 'zip': + logger.debug(f"Extracting zip file: {file_path}") + with zipfile.ZipFile(file_path, 'r') as zip_file: + # Find the first XML file in the ZIP archive + xml_files = [f for f in zip_file.namelist() if f.lower().endswith('.xml')] + + if not xml_files: + logger.info("No files with .xml extension found in ZIP archive, checking 
content of all files") + # Check content of each file to see if any are XML without proper extension + for filename in zip_file.namelist(): + if not filename.endswith('/'): # Skip directories + try: + # Read a sample of the file content + content_sample = zip_file.read(filename, 4096) # Read up to 4KB for detection + format_type, _, _ = detect_file_format(content=content_sample) + if format_type == 'xml': + logger.info(f"Found XML content in file without .xml extension: {filename}") + xml_files = [filename] + break + except Exception as e: + logger.warning(f"Error reading file {filename} from ZIP: {e}") + + if not xml_files: + logger.error("No XML file found in ZIP archive") + return None + + # Extract the first XML file + xml_content = zip_file.read(xml_files[0]) + with open(extracted_path, 'wb') as out_file: + out_file.write(xml_content) + + logger.info(f"Successfully extracted zip file to: {extracted_path}") + + # Delete original compressed file if requested + if delete_original: + try: + os.remove(file_path) + logger.info(f"Deleted original compressed file: {file_path}") + except Exception as e: + logger.warning(f"Failed to delete original compressed file {file_path}: {e}") + + return extracted_path + + else: + logger.error(f"Unsupported or unrecognized compressed file format: {file_path} (detected as: {format_type})") + return None + + except Exception as e: + logger.error(f"Error extracting {file_path}: {str(e)}", exc_info=True) + return None + + def parse_channels_only(source): - file_path = source.file_path + # Use extracted file if available, otherwise use the original file path + file_path = source.extracted_file_path if source.extracted_file_path else source.file_path if not file_path: file_path = source.get_cache_file() # Send initial parsing notification send_epg_update(source.id, "parsing_channels", 0) + process = None + should_log_memory = False + try: # Check if the file exists if not os.path.exists(file_path): @@ -474,98 +750,267 @@ def parse_channels_only(source): return False # Verify the file was downloaded successfully - if not os.path.exists(new_path): - logger.error(f"Failed to fetch EPG data, file still missing at: {new_path}") + if not os.path.exists(source.file_path): + logger.error(f"Failed to fetch EPG data, file still missing at: {source.file_path}") # Update status to error source.status = 'error' source.last_message = f"Failed to fetch EPG data, file missing after download" source.save(update_fields=['status', 'last_message']) send_epg_update(source.id, "parsing_channels", 100, status="error", error="File not found after download") return False + + # Update file_path with the new location + file_path = source.file_path else: logger.error(f"No URL provided for EPG source {source.name}, cannot fetch new data") # Update status to error source.status = 'error' source.last_message = f"No URL provided, cannot fetch EPG data" - source.save(update_fields=['status', 'last_message']) - send_epg_update(source.id, "parsing_channels", 100, status="error", error="No URL provided") - return False + source.save(update_fields=['updated_at']) - file_path = new_path + # Initialize process variable for memory tracking only in debug mode + try: + process = None + # Get current log level as a number + current_log_level = logger.getEffectiveLevel() - logger.info(f"Parsing channels from EPG file: {file_path}") - existing_epgs = {e.tvg_id: e for e in EPGData.objects.filter(epg_source=source)} + # Only track memory usage when log level is DEBUG (10) or more verbose + # This is more 
future-proof than string comparisons + should_log_memory = current_log_level <= logging.DEBUG or settings.DEBUG - # Read entire file (decompress if .gz) - if file_path.endswith('.gz'): - with open(file_path, 'rb') as gz_file: - decompressed = gzip.decompress(gz_file.read()) - xml_data = decompressed.decode('utf-8') - else: - with open(file_path, 'r', encoding='utf-8') as xml_file: - xml_data = xml_file.read() + if should_log_memory: + process = psutil.Process() + initial_memory = process.memory_info().rss / 1024 / 1024 + logger.debug(f"[parse_channels_only] Initial memory usage: {initial_memory:.2f} MB") + except (ImportError, NameError): + process = None + should_log_memory = False + logger.warning("psutil not available for memory tracking") - # Update progress to show file read completed - send_epg_update(source.id, "parsing_channels", 25) + # Replace full dictionary load with more efficient lookup set + existing_tvg_ids = set() + existing_epgs = {} # Initialize the dictionary that will lazily load objects + last_id = 0 + chunk_size = 5000 - root = ET.fromstring(xml_data) - channels = root.findall('channel') + while True: + tvg_id_chunk = set(EPGData.objects.filter( + epg_source=source, + id__gt=last_id + ).order_by('id').values_list('tvg_id', flat=True)[:chunk_size]) + if not tvg_id_chunk: + break + + existing_tvg_ids.update(tvg_id_chunk) + last_id = EPGData.objects.filter(tvg_id__in=tvg_id_chunk).order_by('-id')[0].id + # Update progress to show file read starting + send_epg_update(source.id, "parsing_channels", 10) + + # Stream parsing instead of loading entire file at once + # This can be simplified since we now always have XML files epgs_to_create = [] epgs_to_update = [] + total_channels = 0 + processed_channels = 0 + batch_size = 500 # Process in batches to limit memory usage + progress = 0 # Initialize progress variable here - logger.info(f"Found {len(channels)} entries in {file_path}") + # Track memory at key points + if process: + logger.debug(f"[parse_channels_only] Memory before opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB") - # Update progress to show parsing started - send_epg_update(source.id, "parsing_channels", 50) + try: + # Attempt to count existing channels in the database + try: + total_channels = EPGData.objects.filter(epg_source=source).count() + logger.info(f"Found {total_channels} existing channels for this source") + except Exception as e: + logger.error(f"Error counting channels: {e}") + total_channels = 500 # Default estimate + if process: + logger.debug(f"[parse_channels_only] Memory after closing initial file: {process.memory_info().rss / 1024 / 1024:.2f} MB") - total_channels = len(channels) - for i, channel_elem in enumerate(channels): - tvg_id = channel_elem.get('id', '').strip() - if not tvg_id: - continue # skip blank/invalid IDs + # Update progress after counting + send_epg_update(source.id, "parsing_channels", 25, total_channels=total_channels) - display_name = channel_elem.findtext('display-name', default=tvg_id).strip() + # Open the file - no need to check file type since it's always XML now + logger.debug(f"Opening file for channel parsing: {file_path}") + source_file = open(file_path, 'rb') - if tvg_id in existing_epgs: - epg_obj = existing_epgs[tvg_id] - if epg_obj.name != display_name: - epg_obj.name = display_name - epgs_to_update.append(epg_obj) - else: - epgs_to_create.append(EPGData( - tvg_id=tvg_id, - name=display_name, - epg_source=source, - )) + if process: + logger.debug(f"[parse_channels_only] Memory after opening 
file: {process.memory_info().rss / 1024 / 1024:.2f} MB") - # Send occasional progress updates - if i % 100 == 0 or i == total_channels - 1: - progress = 50 + int((i / total_channels) * 40) # Scale to 50-90% range - send_epg_update(source.id, "parsing_channels", progress) + # Change iterparse to look for both channel and programme elements + logger.debug(f"Creating iterparse context for channels and programmes") + channel_parser = etree.iterparse(source_file, events=('end',), tag=('channel', 'programme')) + if process: + logger.debug(f"[parse_channels_only] Memory after creating iterparse: {process.memory_info().rss / 1024 / 1024:.2f} MB") - # Update progress before database operations - send_epg_update(source.id, "parsing_channels", 90) + channel_count = 0 + total_elements_processed = 0 # Track total elements processed, not just channels + for _, elem in channel_parser: + total_elements_processed += 1 + # Only process channel elements + if elem.tag == 'channel': + channel_count += 1 + tvg_id = elem.get('id', '').strip() + if tvg_id: + display_name = None + for child in elem: + if child.tag == 'display-name' and child.text: + display_name = child.text.strip() + break + + if not display_name: + display_name = tvg_id + + # Use lazy loading approach to reduce memory usage + if tvg_id in existing_tvg_ids: + # Only fetch the object if we need to update it and it hasn't been loaded yet + if tvg_id not in existing_epgs: + try: + # This loads the full EPG object from the database and caches it + existing_epgs[tvg_id] = EPGData.objects.get(tvg_id=tvg_id, epg_source=source) + except EPGData.DoesNotExist: + # Handle race condition where record was deleted + existing_tvg_ids.remove(tvg_id) + epgs_to_create.append(EPGData( + tvg_id=tvg_id, + name=display_name, + epg_source=source, + )) + logger.debug(f"[parse_channels_only] Added new channel to epgs_to_create 1: {tvg_id} - {display_name}") + processed_channels += 1 + continue + + # We use the cached object to check if the name has changed + epg_obj = existing_epgs[tvg_id] + if epg_obj.name != display_name: + # Only update if the name actually changed + epg_obj.name = display_name + epgs_to_update.append(epg_obj) + logger.debug(f"[parse_channels_only] Added channel to update to epgs_to_update: {tvg_id} - {display_name}") + else: + # No changes needed, just clear the element + logger.debug(f"[parse_channels_only] No changes needed for channel {tvg_id} - {display_name}") + else: + # This is a new channel that doesn't exist in our database + epgs_to_create.append(EPGData( + tvg_id=tvg_id, + name=display_name, + epg_source=source, + )) + logger.debug(f"[parse_channels_only] Added new channel to epgs_to_create 2: {tvg_id} - {display_name}") + + processed_channels += 1 + + # Batch processing + if len(epgs_to_create) >= batch_size: + logger.info(f"[parse_channels_only] Bulk creating {len(epgs_to_create)} EPG entries") + EPGData.objects.bulk_create(epgs_to_create, ignore_conflicts=True) + if process: + logger.info(f"[parse_channels_only] Memory after bulk_create: {process.memory_info().rss / 1024 / 1024:.2f} MB") + del epgs_to_create # Explicit deletion + epgs_to_create = [] + cleanup_memory(log_usage=should_log_memory, force_collection=True) + if process: + logger.info(f"[parse_channels_only] Memory after gc.collect(): {process.memory_info().rss / 1024 / 1024:.2f} MB") + + if len(epgs_to_update) >= batch_size: + logger.info(f"[parse_channels_only] Bulk updating {len(epgs_to_update)} EPG entries") + if process: + logger.info(f"[parse_channels_only] Memory before 
bulk_update: {process.memory_info().rss / 1024 / 1024:.2f} MB") + EPGData.objects.bulk_update(epgs_to_update, ["name"]) + if process: + logger.info(f"[parse_channels_only] Memory after bulk_update: {process.memory_info().rss / 1024 / 1024:.2f} MB") + epgs_to_update = [] + # Force garbage collection + cleanup_memory(log_usage=should_log_memory, force_collection=True) + + # Periodically clear the existing_epgs cache to prevent memory buildup + if processed_channels % 1000 == 0: + logger.info(f"[parse_channels_only] Clearing existing_epgs cache at {processed_channels} channels") + existing_epgs.clear() + cleanup_memory(log_usage=should_log_memory, force_collection=True) + if process: + logger.info(f"[parse_channels_only] Memory after clearing cache: {process.memory_info().rss / 1024 / 1024:.2f} MB") + + # Send progress updates + if processed_channels % 100 == 0 or processed_channels == total_channels: + progress = 25 + int((processed_channels / total_channels) * 65) if total_channels > 0 else 90 + send_epg_update( + source.id, + "parsing_channels", + progress, + processed=processed_channels, + total=total_channels + ) + if processed_channels > total_channels: + logger.debug(f"[parse_channels_only] Processed channel {tvg_id} - processed {processed_channels - total_channels} additional channels") + else: + logger.debug(f"[parse_channels_only] Processed channel {tvg_id} - processed {processed_channels}/{total_channels}") + if process: + logger.debug(f"[parse_channels_only] Memory before elem cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB") + # Clear memory + try: + # First clear the element's content + clear_element(elem) + + except Exception as e: + # Just log the error and continue - don't let cleanup errors stop processing + logger.debug(f"[parse_channels_only] Non-critical error during XML element cleanup: {e}") + if process: + logger.debug(f"[parse_channels_only] Memory after elem cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB") + + logger.debug(f"[parse_channels_only] Total elements processed: {total_elements_processed}") + + else: + clear_element(elem) + continue + + except (etree.XMLSyntaxError, Exception) as xml_error: + logger.error(f"[parse_channels_only] XML parsing failed: {xml_error}") + # Update status to error + source.status = 'error' + source.last_message = f"Error parsing XML file: {str(xml_error)}" + source.save(update_fields=['status', 'last_message']) + send_epg_update(source.id, "parsing_channels", 100, status="error", error=str(xml_error)) + return False + if process: + logger.info(f"[parse_channels_only] Processed {processed_channels} channels current memory: {process.memory_info().rss / 1024 / 1024:.2f} MB") + else: + logger.info(f"[parse_channels_only] Processed {processed_channels} channels") + # Process any remaining items if epgs_to_create: EPGData.objects.bulk_create(epgs_to_create, ignore_conflicts=True) + logger.debug(f"[parse_channels_only] Created final batch of {len(epgs_to_create)} EPG entries") if epgs_to_update: EPGData.objects.bulk_update(epgs_to_update, ["name"]) + logger.debug(f"[parse_channels_only] Updated final batch of {len(epgs_to_update)} EPG entries") + if process: + logger.debug(f"[parse_channels_only] Memory after final batch creation: {process.memory_info().rss / 1024 / 1024:.2f} MB") + + # Update source status with channel count + source.status = 'success' + source.last_message = f"Successfully parsed {processed_channels} channels" + source.save(update_fields=['status', 'last_message']) # Send completion notification - 
send_epg_update(source.id, "parsing_channels", 100, status="success") - - channel_layer = get_channel_layer() - async_to_sync(channel_layer.group_send)( - 'updates', - { - 'type': 'update', - "data": {"success": True, "type": "epg_channels"} - } + send_epg_update( + source.id, + "parsing_channels", + 100, + status="success", + channels_count=processed_channels ) - logger.info("Finished parsing channel info.") + send_websocket_update('updates', 'update', {"success": True, "type": "epg_channels"}) + + logger.info(f"Finished parsing channel info. Found {processed_channels} channels.") + return True except FileNotFoundError: @@ -584,6 +1029,40 @@ def parse_channels_only(source): source.save(update_fields=['status', 'last_message']) send_epg_update(source.id, "parsing_channels", 100, status="error", error=str(e)) return False + finally: + # Cleanup memory and close file + if process: + logger.debug(f"[parse_channels_only] Memory before cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB") + try: + if 'channel_parser' in locals(): + del channel_parser + if 'elem' in locals(): + del elem + if 'parent' in locals(): + del parent + + if 'source_file' in locals(): + source_file.close() + del source_file + # Clear remaining large data structures + existing_epgs.clear() + epgs_to_create.clear() + epgs_to_update.clear() + existing_epgs = None + epgs_to_create = None + epgs_to_update = None + cleanup_memory(log_usage=should_log_memory, force_collection=True) + except Exception as e: + logger.warning(f"Cleanup error: {e}") + + try: + if process: + final_memory = process.memory_info().rss / 1024 / 1024 + logger.debug(f"[parse_channels_only] Final memory usage: {final_memory:.2f} MB") + process = None + except: + pass + @shared_task @@ -592,211 +1071,294 @@ def parse_programs_for_tvg_id(epg_id): logger.info(f"Program parse for {epg_id} already in progress, skipping duplicate task") return "Task already running" - epg = EPGData.objects.get(id=epg_id) - epg_source = epg.epg_source + source_file = None + program_parser = None + programs_to_create = [] + programs_processed = 0 + try: + # Add memory tracking only in trace mode or higher + try: + process = None + # Get current log level as a number + current_log_level = logger.getEffectiveLevel() - if not Channel.objects.filter(epg_data=epg).exists(): - logger.info(f"No channels matched to EPG {epg.tvg_id}") - release_task_lock('parse_epg_programs', epg_id) - return + # Only track memory usage when log level is TRACE or more verbose or if running in DEBUG mode + should_log_memory = current_log_level <= 5 or settings.DEBUG - logger.info(f"Refreshing program data for tvg_id: {epg.tvg_id}") + if should_log_memory: + process = psutil.Process() + initial_memory = process.memory_info().rss / 1024 / 1024 + logger.info(f"[parse_programs_for_tvg_id] Initial memory usage: {initial_memory:.2f} MB") + mem_before = initial_memory + except ImportError: + process = None + should_log_memory = False - # First, remove all existing programs - ProgramData.objects.filter(epg=epg).delete() + epg = EPGData.objects.get(id=epg_id) + epg_source = epg.epg_source - file_path = epg_source.file_path - if not file_path: - file_path = epg_source.get_cache_file() - - # Check if the file exists - if not os.path.exists(file_path): - logger.error(f"EPG file not found at: {file_path}") - - # Update the file path in the database - new_path = epg_source.get_cache_file() - logger.info(f"Updating file_path from '{file_path}' to '{new_path}'") - epg_source.file_path = new_path - 
epg_source.save(update_fields=['file_path']) - - # Fetch new data before continuing - if epg_source.url: - logger.info(f"Fetching new EPG data from URL: {epg_source.url}") - # Properly check the return value from fetch_xmltv - fetch_success = fetch_xmltv(epg_source) - - # If fetch was not successful or the file still doesn't exist, abort - if not fetch_success: - logger.error(f"Failed to fetch EPG data, cannot parse programs for tvg_id: {epg.tvg_id}") - # Update status to error if not already set - epg_source.status = 'error' - epg_source.last_message = f"Failed to download EPG data, cannot parse programs" - epg_source.save(update_fields=['status', 'last_message']) - send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="Failed to download EPG file") - release_task_lock('parse_epg_programs', epg_id) - return - - # Also check if the file exists after download - if not os.path.exists(new_path): - logger.error(f"Failed to fetch EPG data, file still missing at: {new_path}") - epg_source.status = 'error' - epg_source.last_message = f"Failed to download EPG data, file missing after download" - epg_source.save(update_fields=['status', 'last_message']) - send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="File not found after download") - release_task_lock('parse_epg_programs', epg_id) - return - else: - logger.error(f"No URL provided for EPG source {epg_source.name}, cannot fetch new data") - # Update status to error - epg_source.status = 'error' - epg_source.last_message = f"No URL provided, cannot fetch EPG data" - epg_source.save(update_fields=['status', 'last_message']) - send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="No URL provided") + if not Channel.objects.filter(epg_data=epg).exists(): + logger.info(f"No channels matched to EPG {epg.tvg_id}") release_task_lock('parse_epg_programs', epg_id) return - file_path = new_path + logger.info(f"Refreshing program data for tvg_id: {epg.tvg_id}") - # Read entire file (decompress if .gz) - try: - if file_path.endswith('.gz'): - with open(file_path, 'rb') as gz_file: - decompressed = gzip.decompress(gz_file.read()) - xml_data = decompressed.decode('utf-8') - else: - with open(file_path, 'r', encoding='utf-8') as xml_file: - xml_data = xml_file.read() - except FileNotFoundError: - logger.error(f"EPG file not found at: {file_path}") - release_task_lock('parse_epg_programs', epg_id) - return - except Exception as e: - logger.error(f"Error reading EPG file {file_path}: {e}", exc_info=True) - release_task_lock('parse_epg_programs', epg_id) - return + # Optimize deletion with a single delete query instead of chunking + # This is faster for most database engines + ProgramData.objects.filter(epg=epg).delete() - root = ET.fromstring(xml_data) + file_path = epg_source.extracted_file_path if epg_source.extracted_file_path else epg_source.file_path + if not file_path: + file_path = epg_source.get_cache_file() - # Find only elements for this tvg_id - matched_programmes = [p for p in root.findall('programme') if p.get('channel') == epg.tvg_id] - logger.debug(f"Found {len(matched_programmes)} programmes for tvg_id={epg.tvg_id}") + # Check if the file exists + if not os.path.exists(file_path): + logger.error(f"EPG file not found at: {file_path}") - programs_to_create = [] - for prog in matched_programmes: - start_time = parse_xmltv_time(prog.get('start')) - end_time = parse_xmltv_time(prog.get('stop')) - title = prog.findtext('title', default='No Title') - desc = prog.findtext('desc', 
default='') - sub_title = prog.findtext('sub-title', default='') + if epg_source.url: + # Update the file path in the database + new_path = epg_source.get_cache_file() + logger.info(f"Updating file_path from '{file_path}' to '{new_path}'") + epg_source.file_path = new_path + epg_source.save(update_fields=['file_path']) + logger.info(f"Fetching new EPG data from URL: {epg_source.url}") + else: + logger.info(f"EPG source does not have a URL, using existing file path: {file_path} to rebuild cache") - # Extract custom properties - custom_props = {} + # Fetch new data before continuing + if epg_source: - # Extract categories - categories = [] - for cat_elem in prog.findall('category'): - if cat_elem.text and cat_elem.text.strip(): - categories.append(cat_elem.text.strip()) - if categories: - custom_props['categories'] = categories + # Properly check the return value from fetch_xmltv + fetch_success = fetch_xmltv(epg_source) - # Extract episode numbers - for ep_num in prog.findall('episode-num'): - system = ep_num.get('system', '') - if system == 'xmltv_ns' and ep_num.text: - # Parse XMLTV episode-num format (season.episode.part) - parts = ep_num.text.split('.') - if len(parts) >= 2: - if parts[0].strip() != '': - try: - season = int(parts[0]) + 1 # XMLTV format is zero-based - custom_props['season'] = season - except ValueError: - pass - if parts[1].strip() != '': - try: - episode = int(parts[1]) + 1 # XMLTV format is zero-based - custom_props['episode'] = episode - except ValueError: - pass - elif system == 'onscreen' and ep_num.text: - # Just store the raw onscreen format - custom_props['onscreen_episode'] = ep_num.text.strip() + # If fetch was not successful or the file still doesn't exist, abort + if not fetch_success: + logger.error(f"Failed to fetch EPG data, cannot parse programs for tvg_id: {epg.tvg_id}") + # Update status to error if not already set + epg_source.status = 'error' + epg_source.last_message = f"Failed to download EPG data, cannot parse programs" + epg_source.save(update_fields=['status', 'last_message']) + send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="Failed to download EPG file") + release_task_lock('parse_epg_programs', epg_id) + return - # Extract ratings - for rating_elem in prog.findall('rating'): - if rating_elem.findtext('value'): - custom_props['rating'] = rating_elem.findtext('value').strip() - if rating_elem.get('system'): - custom_props['rating_system'] = rating_elem.get('system') - break # Just use the first rating + # Also check if the file exists after download + if not os.path.exists(epg_source.file_path): + logger.error(f"Failed to fetch EPG data, file still missing at: {epg_source.file_path}") + epg_source.status = 'error' + epg_source.last_message = f"Failed to download EPG data, file missing after download" + epg_source.save(update_fields=['status', 'last_message']) + send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="File not found after download") + release_task_lock('parse_epg_programs', epg_id) + return - # Extract credits (actors, directors, etc.) 
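A worked example of the zero-based xmltv_ns numbering handled in this extraction code (the sample value is made up for illustration):

    # xmltv_ns encodes "season.episode.part" with zero-based season and episode,
    # so the raw value "3.12.0" means season 4, episode 13.
    text = "3.12.0"
    parts = text.split('.')
    season = int(parts[0]) + 1   # -> 4
    episode = int(parts[1]) + 1  # -> 13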
- credits_elem = prog.find('credits') - if credits_elem is not None: - credits = {} - for credit_type in ['director', 'actor', 'writer', 'presenter', 'producer']: - elements = credits_elem.findall(credit_type) - if elements: - names = [e.text.strip() for e in elements if e.text and e.text.strip()] - if names: - credits[credit_type] = names - if credits: - custom_props['credits'] = credits + # Update file_path with the new location + if epg_source.extracted_file_path: + file_path = epg_source.extracted_file_path + else: + file_path = epg_source.file_path + else: + logger.error(f"No URL provided for EPG source {epg_source.name}, cannot fetch new data") + # Update status to error + epg_source.status = 'error' + epg_source.last_message = f"No URL provided, cannot fetch EPG data" + epg_source.save(update_fields=['status', 'last_message']) + send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="No URL provided") + release_task_lock('parse_epg_programs', epg_id) + return - # Extract other common program metadata - if prog.findtext('date'): - custom_props['year'] = prog.findtext('date').strip()[:4] # Just the year part + # Use streaming parsing to reduce memory usage + # No need to check file type anymore since it's always XML + logger.debug(f"Parsing programs for tvg_id={epg.tvg_id} from {file_path}") - if prog.findtext('country'): - custom_props['country'] = prog.findtext('country').strip() - - for icon_elem in prog.findall('icon'): - if icon_elem.get('src'): - custom_props['icon'] = icon_elem.get('src') - break # Just use the first icon - - for kw in ['previously-shown', 'premiere', 'new']: - if prog.find(kw) is not None: - custom_props[kw.replace('-', '_')] = True - - # Convert custom_props to JSON string if not empty - custom_properties_json = None - if custom_props: - import json + # Memory usage tracking + if process: try: - custom_properties_json = json.dumps(custom_props) + mem_before = process.memory_info().rss / 1024 / 1024 + logger.debug(f"[parse_programs_for_tvg_id] Memory before parsing {epg.tvg_id} - {mem_before:.2f} MB") except Exception as e: - logger.error(f"Error serializing custom properties to JSON: {e}", exc_info=True) + logger.warning(f"Error tracking memory: {e}") + mem_before = 0 - programs_to_create.append(ProgramData( - epg=epg, - start_time=start_time, - end_time=end_time, - title=title, - description=desc, - sub_title=sub_title, - tvg_id=epg.tvg_id, - custom_properties=custom_properties_json - )) + programs_to_create = [] + batch_size = 1000 # Process in batches to limit memory usage - ProgramData.objects.bulk_create(programs_to_create) + try: + # Open the file directly - no need to check compression + logger.debug(f"Opening file for parsing: {file_path}") + source_file = open(file_path, 'rb') - release_task_lock('parse_epg_programs', epg_id) + # Stream parse the file using lxml's iterparse + program_parser = etree.iterparse(source_file, events=('end',), tag='programme') + + for _, elem in program_parser: + if elem.get('channel') == epg.tvg_id: + try: + start_time = parse_xmltv_time(elem.get('start')) + end_time = parse_xmltv_time(elem.get('stop')) + title = None + desc = None + sub_title = None + + # Efficiently process child elements + for child in elem: + if child.tag == 'title': + title = child.text or 'No Title' + elif child.tag == 'desc': + desc = child.text or '' + elif child.tag == 'sub-title': + sub_title = child.text or '' + + if not title: + title = 'No Title' + + # Extract custom properties + custom_props = 
extract_custom_properties(elem) + custom_properties_json = None + + if custom_props: + logger.trace(f"Number of custom properties: {len(custom_props)}") + try: + custom_properties_json = json.dumps(custom_props) + except Exception as e: + logger.error(f"Error serializing custom properties to JSON: {e}", exc_info=True) + + programs_to_create.append(ProgramData( + epg=epg, + start_time=start_time, + end_time=end_time, + title=title, + description=desc, + sub_title=sub_title, + tvg_id=epg.tvg_id, + custom_properties=custom_properties_json + )) + programs_processed += 1 + # Clear the element to free memory + clear_element(elem) + # Batch processing + if len(programs_to_create) >= batch_size: + ProgramData.objects.bulk_create(programs_to_create) + logger.debug(f"Saved batch of {len(programs_to_create)} programs for {epg.tvg_id}") + programs_to_create = [] + # Only call gc.collect() every few batches + if programs_processed % (batch_size * 5) == 0: + gc.collect() + + except Exception as e: + logger.error(f"Error processing program for {epg.tvg_id}: {e}", exc_info=True) + else: + # Immediately clean up non-matching elements to reduce memory pressure + if elem is not None: + clear_element(elem) + continue + + # Make sure to close the file and release parser resources + if source_file: + source_file.close() + source_file = None + + if program_parser: + program_parser = None + + gc.collect() + + except zipfile.BadZipFile as zip_error: + logger.error(f"Bad ZIP file: {zip_error}") + raise + except etree.XMLSyntaxError as xml_error: + logger.error(f"XML syntax error parsing program data: {xml_error}") + raise + except Exception as e: + logger.error(f"Error parsing XML for programs: {e}", exc_info=True) + raise + finally: + # Ensure file is closed even if an exception occurs + if source_file: + source_file.close() + source_file = None + # Memory tracking after processing + if process: + try: + mem_after = process.memory_info().rss / 1024 / 1024 + logger.info(f"[parse_programs_for_tvg_id] Memory after parsing 1 {epg.tvg_id} - {programs_processed} programs: {mem_after:.2f} MB (change: {mem_after-mem_before:.2f} MB)") + except Exception as e: + logger.warning(f"Error tracking memory: {e}") + + # Process any remaining items + if programs_to_create: + ProgramData.objects.bulk_create(programs_to_create) + logger.debug(f"Saved final batch of {len(programs_to_create)} programs for {epg.tvg_id}") + programs_to_create = None + custom_props = None + custom_properties_json = None + + + logger.info(f"Completed program parsing for tvg_id={epg.tvg_id}.") + finally: + # Reset internal caches and pools that lxml might be keeping + try: + etree.clear_error_log() + except: + pass + # Explicit cleanup of all potentially large objects + if source_file: + try: + source_file.close() + except: + pass + source_file = None + program_parser = None + programs_to_create = None + + epg_source = None + # Add comprehensive cleanup before releasing lock + cleanup_memory(log_usage=should_log_memory, force_collection=True) + # Memory tracking after processing + if process: + try: + mem_after = process.memory_info().rss / 1024 / 1024 + logger.info(f"[parse_programs_for_tvg_id] Final memory usage {epg.tvg_id} - {programs_processed} programs: {mem_after:.2f} MB (change: {mem_after-mem_before:.2f} MB)") + except Exception as e: + logger.warning(f"Error tracking memory: {e}") + process = None + epg = None + programs_processed = None + release_task_lock('parse_epg_programs', epg_id) - logger.info(f"Completed program parsing for 
tvg_id={epg.tvg_id}.") def parse_programs_for_source(epg_source, tvg_id=None): # Send initial programs parsing notification send_epg_update(epg_source.id, "parsing_programs", 0) + should_log_memory = False + process = None + initial_memory = 0 + + # Add memory tracking only in trace mode or higher + try: + # Get current log level as a number + current_log_level = logger.getEffectiveLevel() + + # Only track memory usage when log level is TRACE or more verbose + should_log_memory = current_log_level <= 5 or settings.DEBUG # Assuming TRACE is level 5 or lower + + if should_log_memory: + process = psutil.Process() + initial_memory = process.memory_info().rss / 1024 / 1024 + logger.info(f"[parse_programs_for_source] Initial memory usage: {initial_memory:.2f} MB") + except ImportError: + logger.warning("psutil not available for memory tracking") + process = None + should_log_memory = False try: - epg_entries = EPGData.objects.filter(epg_source=epg_source) - total_entries = epg_entries.count() - processed = 0 + # Process EPG entries in batches rather than all at once + batch_size = 20 # Process fewer channels at once to reduce memory usage + epg_count = EPGData.objects.filter(epg_source=epg_source).count() - if total_entries == 0: + if epg_count == 0: logger.info(f"No EPG entries found for source: {epg_source.name}") # Update status - this is not an error, just no entries epg_source.status = 'success' @@ -804,31 +1366,51 @@ def parse_programs_for_source(epg_source, tvg_id=None): send_epg_update(epg_source.id, "parsing_programs", 100, status="success") return True - logger.info(f"Parsing programs for {total_entries} EPG entries from source: {epg_source.name}") + logger.info(f"Parsing programs for {epg_count} EPG entries from source: {epg_source.name}") failed_entries = [] program_count = 0 channel_count = 0 updated_count = 0 + processed = 0 + # Process in batches using cursor-based approach to limit memory usage + last_id = 0 + while True: + # Get a batch of EPG entries + batch_entries = list(EPGData.objects.filter( + epg_source=epg_source, + id__gt=last_id + ).order_by('id')[:batch_size]) - for epg in epg_entries: - if epg.tvg_id: - try: - result = parse_programs_for_tvg_id(epg.id) - if result == "Task already running": - logger.info(f"Program parse for {epg.id} already in progress, skipping") + if not batch_entries: + break # No more entries to process - processed += 1 - progress = min(95, int((processed / total_entries) * 100)) if total_entries > 0 else 50 - send_epg_update(epg_source.id, "parsing_programs", progress) - except Exception as e: - logger.error(f"Error parsing programs for tvg_id={epg.tvg_id}: {e}", exc_info=True) - failed_entries.append(f"{epg.tvg_id}: {str(e)}") + # Update last_id for next iteration + last_id = batch_entries[-1].id + + # Process this batch + for epg in batch_entries: + if epg.tvg_id: + try: + result = parse_programs_for_tvg_id(epg.id) + if result == "Task already running": + logger.info(f"Program parse for {epg.id} already in progress, skipping") + + processed += 1 + progress = min(95, int((processed / epg_count) * 100)) if epg_count > 0 else 50 + send_epg_update(epg_source.id, "parsing_programs", progress) + except Exception as e: + logger.error(f"Error parsing programs for tvg_id={epg.tvg_id}: {e}", exc_info=True) + failed_entries.append(f"{epg.tvg_id}: {str(e)}") + + # Force garbage collection after each batch + batch_entries = None # Remove reference to help garbage collection + gc.collect() # If there were failures, include them in the message but continue 
if failed_entries: epg_source.status = EPGSource.STATUS_SUCCESS # Still mark as success if some processed - error_summary = f"Failed to parse {len(failed_entries)} of {total_entries} entries" + error_summary = f"Failed to parse {len(failed_entries)} of {epg_count} entries" stats_summary = f"Processed {program_count} programs across {channel_count} channels. Updated: {updated_count}." epg_source.last_message = f"{stats_summary} Warning: {error_summary}" epg_source.updated_at = timezone.now() @@ -838,6 +1420,11 @@ def parse_programs_for_source(epg_source, tvg_id=None): send_epg_update(epg_source.id, "parsing_programs", 100, status="success", message=epg_source.last_message) + + # Explicitly release memory of large lists before returning + del failed_entries + gc.collect() + return True # If all successful, set a comprehensive success message @@ -864,8 +1451,25 @@ def parse_programs_for_source(epg_source, tvg_id=None): status="error", message=epg_source.last_message) return False + finally: + # Final memory cleanup and tracking + # Explicitly release any remaining large data structures + failed_entries = None + program_count = None + channel_count = None + updated_count = None + processed = None + gc.collect() + + # Add comprehensive memory cleanup at the end + cleanup_memory(log_usage=should_log_memory, force_collection=True) + if process: + final_memory = process.memory_info().rss / 1024 / 1024 + logger.info(f"[parse_programs_for_source] Final memory usage: {final_memory:.2f} MB difference: {final_memory - initial_memory:.2f} MB") + # Explicitly clear the process object to prevent potential memory leaks + process = None def fetch_schedules_direct(source): logger.info(f"Fetching Schedules Direct data from source: {source.name}") try: @@ -941,17 +1545,42 @@ def fetch_schedules_direct(source): # ------------------------------- def parse_xmltv_time(time_str): try: + # Basic format validation + if len(time_str) < 14: + logger.warning(f"XMLTV timestamp too short: '{time_str}', using as-is") + dt_obj = datetime.strptime(time_str, '%Y%m%d%H%M%S') + return timezone.make_aware(dt_obj, timezone=dt_timezone.utc) + + # Parse base datetime dt_obj = datetime.strptime(time_str[:14], '%Y%m%d%H%M%S') - tz_sign = time_str[15] - tz_hours = int(time_str[16:18]) - tz_minutes = int(time_str[18:20]) - if tz_sign == '+': - dt_obj = dt_obj - timedelta(hours=tz_hours, minutes=tz_minutes) - elif tz_sign == '-': - dt_obj = dt_obj + timedelta(hours=tz_hours, minutes=tz_minutes) - aware_dt = timezone.make_aware(dt_obj, timezone=dt_timezone.utc) - logger.debug(f"Parsed XMLTV time '{time_str}' to {aware_dt}") - return aware_dt + + # Handle timezone if present + if len(time_str) >= 20: # Has timezone info + tz_sign = time_str[15] + tz_hours = int(time_str[16:18]) + tz_minutes = int(time_str[18:20]) + + # Create a timezone object + if tz_sign == '+': + tz_offset = dt_timezone(timedelta(hours=tz_hours, minutes=tz_minutes)) + elif tz_sign == '-': + tz_offset = dt_timezone(timedelta(hours=-tz_hours, minutes=-tz_minutes)) + else: + tz_offset = dt_timezone.utc + + # Make datetime aware with correct timezone + aware_dt = datetime.replace(dt_obj, tzinfo=tz_offset) + # Convert to UTC + aware_dt = aware_dt.astimezone(dt_timezone.utc) + + logger.trace(f"Parsed XMLTV time '{time_str}' to {aware_dt}") + return aware_dt + else: + # No timezone info, assume UTC + aware_dt = timezone.make_aware(dt_obj, timezone=dt_timezone.utc) + logger.trace(f"Parsed XMLTV time without timezone '{time_str}' as UTC: {aware_dt}") + return aware_dt + except 
Exception as e: logger.error(f"Error parsing XMLTV time '{time_str}': {e}", exc_info=True) raise @@ -966,3 +1595,156 @@ def parse_schedules_direct_time(time_str): except Exception as e: logger.error(f"Error parsing Schedules Direct time '{time_str}': {e}", exc_info=True) raise + + +# Helper function to extract custom properties - moved to a separate function to clean up the code +def extract_custom_properties(prog): + # Create a new dictionary for each call + custom_props = {} + + # Extract categories with a single comprehension to reduce intermediate objects + categories = [cat.text.strip() for cat in prog.findall('category') if cat.text and cat.text.strip()] + if categories: + custom_props['categories'] = categories + + # Extract episode numbers + for ep_num in prog.findall('episode-num'): + system = ep_num.get('system', '') + if system == 'xmltv_ns' and ep_num.text: + # Parse XMLTV episode-num format (season.episode.part) + parts = ep_num.text.split('.') + if len(parts) >= 2: + if parts[0].strip() != '': + try: + season = int(parts[0]) + 1 # XMLTV format is zero-based + custom_props['season'] = season + except ValueError: + pass + if parts[1].strip() != '': + try: + episode = int(parts[1]) + 1 # XMLTV format is zero-based + custom_props['episode'] = episode + except ValueError: + pass + elif system == 'onscreen' and ep_num.text: + # Just store the raw onscreen format + custom_props['onscreen_episode'] = ep_num.text.strip() + + # Extract ratings more efficiently + rating_elem = prog.find('rating') + if rating_elem is not None: + value_elem = rating_elem.find('value') + if value_elem is not None and value_elem.text: + custom_props['rating'] = value_elem.text.strip() + if rating_elem.get('system'): + custom_props['rating_system'] = rating_elem.get('system') + + # Extract credits more efficiently + credits_elem = prog.find('credits') + if credits_elem is not None: + credits = {} + for credit_type in ['director', 'actor', 'writer', 'presenter', 'producer']: + names = [e.text.strip() for e in credits_elem.findall(credit_type) if e.text and e.text.strip()] + if names: + credits[credit_type] = names + if credits: + custom_props['credits'] = credits + + # Extract other common program metadata + date_elem = prog.find('date') + if date_elem is not None and date_elem.text: + custom_props['year'] = date_elem.text.strip()[:4] # Just the year part + + country_elem = prog.find('country') + if country_elem is not None and country_elem.text: + custom_props['country'] = country_elem.text.strip() + + icon_elem = prog.find('icon') + if icon_elem is not None and icon_elem.get('src'): + custom_props['icon'] = icon_elem.get('src') + + # Simpler approach for boolean flags + for kw in ['previously-shown', 'premiere', 'new']: + if prog.find(kw) is not None: + custom_props[kw.replace('-', '_')] = True + + return custom_props + +def clear_element(elem): + """Clear an XML element and its parent to free memory.""" + try: + elem.clear() + parent = elem.getparent() + if parent is not None: + while elem.getprevious() is not None: + del parent[0] + parent.remove(elem) + except Exception as e: + logger.warning(f"Error clearing XML element: {e}", exc_info=True) + + +def detect_file_format(file_path=None, content=None): + """ + Detect file format by examining content or file path. 
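A quick usage sketch of the detect_file_format helper defined here (the byte string and path below are invented examples, not taken from the codebase):

    # gzip files start with the magic bytes 1f 8b, so content detection wins:
    fmt, compressed, ext = detect_file_format(content=b'\x1f\x8b\x08\x00abc')
    # -> ('gzip', True, '.gz')

    # With no content available, the final extension decides, so .xml.gz counts as gzip:
    fmt, compressed, ext = detect_file_format(file_path='/data/epgs/guide.xml.gz')
    # -> ('gzip', True, '.gz')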
+ + Args: + file_path: Path to file (optional) + content: Raw file content bytes (optional) + + Returns: + tuple: (format_type, is_compressed, file_extension) + format_type: 'gzip', 'zip', 'xml', or 'unknown' + is_compressed: Boolean indicating if the file is compressed + file_extension: Appropriate file extension including dot (.gz, .zip, .xml) + """ + # Default return values + format_type = 'unknown' + is_compressed = False + file_extension = '.tmp' + + # First priority: check content magic numbers as they're most reliable + if content: + # We only need the first few bytes for magic number detection + header = content[:20] if len(content) >= 20 else content + + # Check for gzip magic number (1f 8b) + if len(header) >= 2 and header[:2] == b'\x1f\x8b': + return 'gzip', True, '.gz' + + # Check for zip magic number (PK..) + if len(header) >= 2 and header[:2] == b'PK': + return 'zip', True, '.zip' + + # Check for XML - either standard XML header or XMLTV-specific tag + if len(header) >= 5 and (b'<?xml' in header or b'<tv>' in header): + return 'xml', False, '.xml' + + # Second priority: check file extension - focus on the final extension for compression + if file_path: + logger.debug(f"Detecting file format for: {file_path}") + + # Handle compound extensions like .xml.gz - prioritize compression extensions + lower_path = file_path.lower() + + # Check for compression extensions explicitly + if lower_path.endswith('.gz') or lower_path.endswith('.gzip'): + return 'gzip', True, '.gz' + elif lower_path.endswith('.zip'): + return 'zip', True, '.zip' + elif lower_path.endswith('.xml'): + return 'xml', False, '.xml' + + # Fallback to mimetypes only if direct extension check doesn't work + import mimetypes + mime_type, _ = mimetypes.guess_type(file_path) + logger.debug(f"Guessed MIME type: {mime_type}") + if mime_type: + if mime_type == 'application/gzip' or mime_type == 'application/x-gzip': + return 'gzip', True, '.gz' + elif mime_type == 'application/zip': + return 'zip', True, '.zip' + elif mime_type == 'application/xml' or mime_type == 'text/xml': + return 'xml', False, '.xml' + + # If we reach here, we couldn't reliably determine the format + return format_type, is_compressed, file_extension diff --git a/apps/hdhr/api_views.py b/apps/hdhr/api_views.py index 676c0fb9..278efc36 100644 --- a/apps/hdhr/api_views.py +++ b/apps/hdhr/api_views.py @@ -84,13 +84,19 @@ class DiscoverAPIView(APIView): logger.debug(f"Calculated tuner count: {tuner_count} (limited profiles: {limited_tuners}, custom streams: {custom_stream_count}, unlimited: {has_unlimited})") + # Create a unique DeviceID for the HDHomeRun device based on profile ID or a default value + device_ID = "12345678" # Default DeviceID + friendly_name = "Dispatcharr HDHomeRun" + if profile is not None: + device_ID = f"dispatcharr-hdhr-{profile}" + friendly_name = f"Dispatcharr HDHomeRun - {profile}" if not device: data = { - "FriendlyName": "Dispatcharr HDHomeRun", + "FriendlyName": friendly_name, "ModelNumber": "HDTC-2US", "FirmwareName": "hdhomerun3_atsc", "FirmwareVersion": "20200101", - "DeviceID": "12345678", + "DeviceID": device_ID, "DeviceAuth": "test_auth_token", "BaseURL": base_url, "LineupURL": f"{base_url}/lineup.json", @@ -129,16 +135,24 @@ class LineupAPIView(APIView): else: channels = Channel.objects.all().order_by('channel_number') - lineup = [ - { - "GuideNumber": str(ch.channel_number), + lineup = [] + for ch in channels: + # Format channel number as integer if it has no decimal component + if ch.channel_number is not None: + if ch.channel_number ==
int(ch.channel_number): + formatted_channel_number = str(int(ch.channel_number)) + else: + formatted_channel_number = str(ch.channel_number) + else: + formatted_channel_number = "" + + lineup.append({ + "GuideNumber": formatted_channel_number, "GuideName": ch.name, "URL": request.build_absolute_uri(f"/proxy/ts/stream/{ch.uuid}"), - "Guide_ID": str(ch.channel_number), - "Station": str(ch.channel_number), - } - for ch in channels - ] + "Guide_ID": formatted_channel_number, + "Station": formatted_channel_number, + }) return JsonResponse(lineup, safe=False) diff --git a/apps/m3u/api_views.py b/apps/m3u/api_views.py index 6176a0ca..daac92b1 100644 --- a/apps/m3u/api_views.py +++ b/apps/m3u/api_views.py @@ -51,7 +51,15 @@ class M3UAccountViewSet(viewsets.ModelViewSet): # Add file_path to the request data so it's available during creation request.data._mutable = True # Allow modification of the request data request.data['file_path'] = file_path # Include the file path if a file was uploaded - request.data.pop('server_url') + + # Handle the user_agent field - convert "null" string to None + if 'user_agent' in request.data and request.data['user_agent'] == 'null': + request.data['user_agent'] = None + + # Handle server_url appropriately + if 'server_url' in request.data and not request.data['server_url']: + request.data.pop('server_url') + request.data._mutable = False # Make the request data immutable again # Now call super().create() to create the instance @@ -82,16 +90,24 @@ class M3UAccountViewSet(viewsets.ModelViewSet): # Add file_path to the request data so it's available during creation request.data._mutable = True # Allow modification of the request data request.data['file_path'] = file_path # Include the file path if a file was uploaded - request.data.pop('server_url') + + # Handle the user_agent field - convert "null" string to None + if 'user_agent' in request.data and request.data['user_agent'] == 'null': + request.data['user_agent'] = None + + # Handle server_url appropriately + if 'server_url' in request.data and not request.data['server_url']: + request.data.pop('server_url') + request.data._mutable = False # Make the request data immutable again if instance.file_path and os.path.exists(instance.file_path): os.remove(instance.file_path) - # Now call super().create() to create the instance + # Now call super().update() to update the instance response = super().update(request, *args, **kwargs) - # After the instance is created, return the response + # After the instance is updated, return the response return response def partial_update(self, request, *args, **kwargs): diff --git a/apps/m3u/migrations/0012_alter_m3uaccount_refresh_interval.py b/apps/m3u/migrations/0012_alter_m3uaccount_refresh_interval.py new file mode 100644 index 00000000..7045810e --- /dev/null +++ b/apps/m3u/migrations/0012_alter_m3uaccount_refresh_interval.py @@ -0,0 +1,18 @@ +# Generated by Django 5.1.6 on 2025-05-21 19:58 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('m3u', '0011_alter_m3uaccount_status'), + ] + + operations = [ + migrations.AlterField( + model_name='m3uaccount', + name='refresh_interval', + field=models.IntegerField(default=0), + ), + ] diff --git a/apps/m3u/models.py b/apps/m3u/models.py index 503ac3da..a297fd18 100644 --- a/apps/m3u/models.py +++ b/apps/m3u/models.py @@ -96,7 +96,7 @@ class M3UAccount(models.Model): username = models.CharField(max_length=255, null=True, blank=True) password = models.CharField(max_length=255, 
null=True, blank=True) custom_properties = models.TextField(null=True, blank=True) - refresh_interval = models.IntegerField(default=24) + refresh_interval = models.IntegerField(default=0) refresh_task = models.ForeignKey( PeriodicTask, on_delete=models.SET_NULL, null=True, blank=True ) diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py index f6b1044c..b1b1170d 100644 --- a/apps/m3u/tasks.py +++ b/apps/m3u/tasks.py @@ -22,11 +22,11 @@ from core.utils import RedisClient, acquire_task_lock, release_task_lock from core.models import CoreSettings, UserAgent from asgiref.sync import async_to_sync from core.xtream_codes import Client as XCClient +from core.utils import send_websocket_update logger = logging.getLogger(__name__) BATCH_SIZE = 1000 -SKIP_EXTS = {} m3u_dir = os.path.join(settings.MEDIA_ROOT, "cached_m3u") def fetch_m3u_lines(account, use_cache=False): @@ -200,11 +200,6 @@ def parse_extinf_line(line: str) -> dict: 'name': name } -import re -import logging - -logger = logging.getLogger(__name__) - def _matches_filters(stream_name: str, group_name: str, filters): """Check if a stream or group name matches a precompiled regex filter.""" compiled_filters = [(re.compile(f.regex_pattern, re.IGNORECASE), f.exclude) for f in filters] @@ -245,7 +240,7 @@ def process_groups(account, groups): groups_to_create = [] for group_name, custom_props in groups.items(): logger.debug(f"Handling group: {group_name}") - if (group_name not in existing_groups) and (group_name not in SKIP_EXTS): + if (group_name not in existing_groups): groups_to_create.append(ChannelGroup( name=group_name, )) @@ -495,8 +490,9 @@ def process_m3u_batch(account_id, batch, groups, hash_keys): retval = f"Batch processed: {len(streams_to_create)} created, {len(streams_to_update)} updated." # Aggressive garbage collection - del streams_to_create, streams_to_update, stream_hashes, existing_streams - gc.collect() + #del streams_to_create, streams_to_update, stream_hashes, existing_streams + #from core.utils import cleanup_memory + #cleanup_memory(log_usage=True, force_collection=True) return retval @@ -695,25 +691,68 @@ def refresh_m3u_groups(account_id, use_cache=False, full_refresh=False): release_task_lock('refresh_m3u_account_groups', account_id) return f"Failed to fetch M3U data for account_id={account_id}.", None - for line in lines: + # Log basic file structure for debugging + logger.debug(f"Processing {len(lines)} lines from M3U file") + + line_count = 0 + extinf_count = 0 + url_count = 0 + valid_stream_count = 0 + problematic_lines = [] + + for line_index, line in enumerate(lines): + line_count += 1 line = line.strip() + if line.startswith("#EXTINF"): + extinf_count += 1 parsed = parse_extinf_line(line) if parsed: if "group-title" in parsed["attributes"]: - groups[parsed["attributes"]["group-title"]] = {} + group_name = parsed["attributes"]["group-title"] + # Log new groups as they're discovered + if group_name not in groups: + logger.debug(f"Found new group: '{group_name}'") + groups[group_name] = {} extinf_data.append(parsed) + else: + # Log problematic EXTINF lines + logger.warning(f"Failed to parse EXTINF at line {line_index+1}: {line[:200]}") + problematic_lines.append((line_index+1, line[:200])) + elif extinf_data and line.startswith("http"): + url_count += 1 # Associate URL with the last EXTINF line extinf_data[-1]["url"] = line + valid_stream_count += 1 + # Periodically log progress for large files + if valid_stream_count % 1000 == 0: + logger.debug(f"Processed {valid_stream_count} valid streams so far...") + + # Log 
summary statistics + logger.info(f"M3U parsing complete - Lines: {line_count}, EXTINF: {extinf_count}, URLs: {url_count}, Valid streams: {valid_stream_count}") + + if problematic_lines: + logger.warning(f"Found {len(problematic_lines)} problematic lines during parsing") + for i, (line_num, content) in enumerate(problematic_lines[:10]): # Log max 10 examples + logger.warning(f"Problematic line #{i+1} at line {line_num}: {content}") + if len(problematic_lines) > 10: + logger.warning(f"... and {len(problematic_lines) - 10} more problematic lines") + + # Log group statistics + logger.info(f"Found {len(groups)} groups in M3U file: {', '.join(list(groups.keys())[:20])}" + + ("..." if len(groups) > 20 else "")) + + # Cache processed data cache_path = os.path.join(m3u_dir, f"{account_id}.json") with open(cache_path, 'w', encoding='utf-8') as f: json.dump({ "extinf_data": extinf_data, "groups": groups, }, f) + logger.debug(f"Cached parsed M3U data to {cache_path}") send_m3u_update(account_id, "processing_groups", 0) @@ -830,11 +869,31 @@ def refresh_single_m3u_account(account_id): cache_path = os.path.join(m3u_dir, f"{account_id}.json") if os.path.exists(cache_path): - with open(cache_path, 'r') as file: - data = json.load(file) + try: + with open(cache_path, 'r') as file: + data = json.load(file) - extinf_data = data['extinf_data'] - groups = data['groups'] + extinf_data = data['extinf_data'] + groups = data['groups'] + except json.JSONDecodeError as e: + # Handle corrupted JSON file + logger.error(f"Error parsing cached M3U data for account {account_id}: {str(e)}") + + # Backup the corrupted file for potential analysis + backup_path = f"{cache_path}.corrupted" + try: + os.rename(cache_path, backup_path) + logger.info(f"Renamed corrupted cache file to {backup_path}") + except OSError as rename_err: + logger.warning(f"Failed to rename corrupted cache file: {str(rename_err)}") + + # Reset the data to empty structures + extinf_data = [] + groups = None + except Exception as e: + logger.error(f"Unexpected error reading cached M3U data: {str(e)}") + extinf_data = [] + groups = None if not extinf_data: try: @@ -903,6 +962,7 @@ def refresh_single_m3u_account(account_id): account.save(update_fields=['status']) if account.account_type == M3UAccount.Types.STADNARD: + logger.debug(f"Processing Standard account with groups: {existing_groups}") # Break into batches and process in parallel batches = [extinf_data[i:i + BATCH_SIZE] for i in range(0, len(extinf_data), BATCH_SIZE)] task_group = group(process_m3u_batch.s(account_id, batch, existing_groups, hash_keys) for batch in batches) @@ -1060,7 +1120,8 @@ def refresh_single_m3u_account(account_id): # Aggressive garbage collection del existing_groups, extinf_data, groups, batches - gc.collect() + from core.utils import cleanup_memory + cleanup_memory(log_usage=True, force_collection=True) # Clean up cache file since we've fully processed it if os.path.exists(cache_path): @@ -1091,12 +1152,10 @@ def send_m3u_update(account_id, action, progress, **kwargs): # Add the additional key-value pairs from kwargs data.update(kwargs) - # Now, send the updated data dictionary - channel_layer = get_channel_layer() - async_to_sync(channel_layer.group_send)( - 'updates', - { - 'type': 'update', - 'data': data - } - ) + # Use the standardized function with memory management + # Enable garbage collection for certain operations + collect_garbage = action == "parsing" and progress % 25 == 0 + send_websocket_update('updates', 'update', data, collect_garbage=collect_garbage) + + # 
Explicitly clear data reference to help garbage collection + data = None diff --git a/apps/output/views.py b/apps/output/views.py index 38b69bde..39b20a41 100644 --- a/apps/output/views.py +++ b/apps/output/views.py @@ -24,7 +24,18 @@ def generate_m3u(request, profile_name=None): m3u_content = "#EXTM3U\n" for channel in channels: group_title = channel.channel_group.name if channel.channel_group else "Default" - tvg_id = channel.channel_number or channel.id + + # Format channel number as integer if it has no decimal component + if channel.channel_number is not None: + if channel.channel_number == int(channel.channel_number): + formatted_channel_number = int(channel.channel_number) + else: + formatted_channel_number = channel.channel_number + else: + formatted_channel_number = "" + + # Use formatted channel number for tvg_id to ensure proper matching with EPG + tvg_id = str(formatted_channel_number) if formatted_channel_number != "" else str(channel.id) tvg_name = channel.name tvg_logo = "" @@ -36,11 +47,9 @@ def generate_m3u(request, profile_name=None): if channel.tvc_guide_stationid: tvc_guide_stationid = f'tvc-guide-stationid="{channel.tvc_guide_stationid}" ' - channel_number = channel.channel_number - extinf_line = ( f'#EXTINF:-1 tvg-id="{tvg_id}" tvg-name="{tvg_name}" tvg-logo="{tvg_logo}" ' - f'tvg-chno="{channel_number}" {tvc_guide_stationid}group-title="{group_title}",{channel.name}\n' + f'tvg-chno="{formatted_channel_number}" {tvc_guide_stationid}group-title="{group_title}",{channel.name}\n' ) base_url = request.build_absolute_uri('/')[:-1] @@ -53,26 +62,96 @@ def generate_m3u(request, profile_name=None): response['Content-Disposition'] = 'attachment; filename="channels.m3u"' return response -def generate_dummy_epg(name, channel_id, num_days=7, interval_hours=4): - xml_lines = [] +def generate_dummy_epg(channel_id, channel_name, xml_lines=None, num_days=1, program_length_hours=4): + """ + Generate dummy EPG programs for channels without EPG data. + Creates program blocks for a specified number of days. - # Loop through the number of days - for day_offset in range(num_days): - current_day = datetime.now() + timedelta(days=day_offset) + Args: + channel_id: The channel ID to use in the program entries + channel_name: The name of the channel to use in program titles + xml_lines: Optional list to append lines to, otherwise returns new list + num_days: Number of days to generate EPG data for (default: 1) + program_length_hours: Length of each program block in hours (default: 4) - # Loop through each 4-hour interval in the day - for hour in range(0, 24, interval_hours): - start_time = current_day.replace(hour=hour, minute=0, second=0, microsecond=0) - stop_time = start_time + timedelta(hours=interval_hours) + Returns: + List of XML lines for the dummy EPG entries + """ + if xml_lines is None: + xml_lines = [] - # Format the times as per the requested format - start_str = start_time.strftime("%Y%m%d%H%M%S") + " 0000" - stop_str = stop_time.strftime("%Y%m%d%H%M%S") + " 0000" + # Get current time rounded to hour + now = timezone.now() + now = now.replace(minute=0, second=0, microsecond=0) - # Create the XML-like programme entry with escaped name - xml_lines.append(f'') - xml_lines.append(f' {html.escape(name)}') - xml_lines.append(f'') + # Humorous program descriptions based on time of day + time_descriptions = { + (0, 4): [ + f"Late Night with {channel_name} - Where insomniacs unite!", + f"The 'Why Am I Still Awake?' 
Show on {channel_name}", + f"Counting Sheep - A {channel_name} production for the sleepless" + ], + (4, 8): [ + f"Dawn Patrol - Rise and shine with {channel_name}!", + f"Early Bird Special - Coffee not included", + f"Morning Zombies - Before coffee viewing on {channel_name}" + ], + (8, 12): [ + f"Mid-Morning Meetings - Pretend you're paying attention while watching {channel_name}", + f"The 'I Should Be Working' Hour on {channel_name}", + f"Productivity Killer - {channel_name}'s daytime programming" + ], + (12, 16): [ + f"Lunchtime Laziness with {channel_name}", + f"The Afternoon Slump - Brought to you by {channel_name}", + f"Post-Lunch Food Coma Theater on {channel_name}" + ], + (16, 20): [ + f"Rush Hour - {channel_name}'s alternative to traffic", + f"The 'What's For Dinner?' Debate on {channel_name}", + f"Evening Escapism - {channel_name}'s remedy for reality" + ], + (20, 24): [ + f"Prime Time Placeholder - {channel_name}'s finest not-programming", + f"The 'Netflix Was Too Complicated' Show on {channel_name}", + f"Family Argument Avoider - Courtesy of {channel_name}" + ] + } + + # Create programs for each day + for day in range(num_days): + day_start = now + timedelta(days=day) + + # Create programs with specified length throughout the day + for hour_offset in range(0, 24, program_length_hours): + # Calculate program start and end times + start_time = day_start + timedelta(hours=hour_offset) + end_time = start_time + timedelta(hours=program_length_hours) + + # Get the hour for selecting a description + hour = start_time.hour + + # Find the appropriate time slot for description + for time_range, descriptions in time_descriptions.items(): + start_range, end_range = time_range + if start_range <= hour < end_range: + # Pick a description using the sum of the hour and day as seed + # This makes it somewhat random but consistent for the same timeslot + description = descriptions[(hour + day) % len(descriptions)] + break + else: + # Fallback description if somehow no range matches + description = f"Placeholder program for {channel_name} - EPG data went on vacation" + + # Format times in XMLTV format + start_str = start_time.strftime("%Y%m%d%H%M%S %z") + stop_str = end_time.strftime("%Y%m%d%H%M%S %z") + + # Create program entry with escaped channel name + xml_lines.append(f' ') + xml_lines.append(f' {html.escape(channel_name)}') + xml_lines.append(f' {html.escape(description)}') + xml_lines.append(f' ') return xml_lines @@ -98,9 +177,17 @@ def generate_epg(request, profile_name=None): # Retrieve all active channels for channel in channels: - channel_id = channel.channel_number or channel.id + # Format channel number as integer if it has no decimal component - same as M3U generation + if channel.channel_number is not None: + if channel.channel_number == int(channel.channel_number): + formatted_channel_number = str(int(channel.channel_number)) + else: + formatted_channel_number = str(channel.channel_number) + else: + formatted_channel_number = str(channel.id) + display_name = channel.epg_data.name if channel.epg_data else channel.name - xml_lines.append(f' ') + xml_lines.append(f' ') xml_lines.append(f' {html.escape(display_name)}') # Add channel logo if available @@ -111,16 +198,34 @@ def generate_epg(request, profile_name=None): xml_lines.append(' ') for channel in channels: - channel_id = channel.channel_number or channel.id + # Use the same formatting for channel ID in program entries + if channel.channel_number is not None: + if channel.channel_number == int(channel.channel_number): + 
formatted_channel_number = str(int(channel.channel_number)) + else: + formatted_channel_number = str(channel.channel_number) + else: + formatted_channel_number = str(channel.id) + display_name = channel.epg_data.name if channel.epg_data else channel.name if not channel.epg_data: - xml_lines = xml_lines + generate_dummy_epg(display_name, channel_id) + # Use the enhanced dummy EPG generation function with defaults + # These values could be made configurable via settings or request parameters + num_days = 1 # Default to 1 days of dummy EPG data + program_length_hours = 4 # Default to 4-hour program blocks + generate_dummy_epg( + formatted_channel_number, + display_name, + xml_lines, + num_days=num_days, + program_length_hours=program_length_hours + ) else: programs = channel.epg_data.programs.all() for prog in programs: start_str = prog.start_time.strftime("%Y%m%d%H%M%S %z") stop_str = prog.end_time.strftime("%Y%m%d%H%M%S %z") - xml_lines.append(f' ') + xml_lines.append(f' ') xml_lines.append(f' {html.escape(prog.title)}') # Add subtitle if available diff --git a/apps/proxy/tasks.py b/apps/proxy/tasks.py index a4aaf8e5..00e3e039 100644 --- a/apps/proxy/tasks.py +++ b/apps/proxy/tasks.py @@ -6,8 +6,10 @@ import redis import json import logging import re +import gc # Add import for garbage collection from core.utils import RedisClient from apps.proxy.ts_proxy.channel_status import ChannelStatus +from core.utils import send_websocket_update logger = logging.getLogger(__name__) @@ -43,11 +45,17 @@ def fetch_channel_stats(): return # return JsonResponse({'error': str(e)}, status=500) - channel_layer = get_channel_layer() - async_to_sync(channel_layer.group_send)( + send_websocket_update( "updates", + "update", { - "type": "update", - "data": {"success": True, "type": "channel_stats", "stats": json.dumps({'channels': all_channels, 'count': len(all_channels)})} + "success": True, + "type": "channel_stats", + "stats": json.dumps({'channels': all_channels, 'count': len(all_channels)}) }, + collect_garbage=True ) + + # Explicitly clean up large data structures + all_channels = None + gc.collect() diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py index cebcc545..3d0a53d9 100644 --- a/apps/proxy/ts_proxy/server.py +++ b/apps/proxy/ts_proxy/server.py @@ -464,6 +464,28 @@ class ProxyServer: def initialize_channel(self, url, channel_id, user_agent=None, transcode=False, stream_id=None): """Initialize a channel without redundant active key""" try: + # IMPROVED: First check if channel is already being initialized by another process + if self.redis_client: + metadata_key = RedisKeys.channel_metadata(channel_id) + if self.redis_client.exists(metadata_key): + metadata = self.redis_client.hgetall(metadata_key) + if b'state' in metadata: + state = metadata[b'state'].decode('utf-8') + active_states = [ChannelState.INITIALIZING, ChannelState.CONNECTING, + ChannelState.WAITING_FOR_CLIENTS, ChannelState.ACTIVE] + if state in active_states: + logger.info(f"Channel {channel_id} already being initialized with state {state}") + # Create buffer and client manager only if we don't have them + if channel_id not in self.stream_buffers: + self.stream_buffers[channel_id] = StreamBuffer(channel_id, redis_client=self.redis_client) + if channel_id not in self.client_managers: + self.client_managers[channel_id] = ClientManager( + channel_id, + redis_client=self.redis_client, + worker_id=self.worker_id + ) + return True + # Create buffer and client manager instances buffer = StreamBuffer(channel_id, 
redis_client=self.redis_client) client_manager = ClientManager( @@ -476,6 +498,20 @@ class ProxyServer: self.stream_buffers[channel_id] = buffer self.client_managers[channel_id] = client_manager + # IMPROVED: Set initializing state in Redis BEFORE any other operations + if self.redis_client: + # Set early initialization state to prevent race conditions + metadata_key = RedisKeys.channel_metadata(channel_id) + initial_metadata = { + "state": ChannelState.INITIALIZING, + "init_time": str(time.time()), + "owner": self.worker_id + } + if stream_id: + initial_metadata["stream_id"] = str(stream_id) + self.redis_client.hset(metadata_key, mapping=initial_metadata) + logger.info(f"Set early initializing state for channel {channel_id}") + # Get channel URL from Redis if available channel_url = url channel_user_agent = user_agent diff --git a/apps/proxy/ts_proxy/stream_generator.py b/apps/proxy/ts_proxy/stream_generator.py index 97e02c16..817a7b82 100644 --- a/apps/proxy/ts_proxy/stream_generator.py +++ b/apps/proxy/ts_proxy/stream_generator.py @@ -120,9 +120,19 @@ class StreamGenerator: yield create_ts_packet('error', f"Error: {error_message}") return False else: + # Improved logging to track initialization progress + init_time = "unknown" + if b'init_time' in metadata: + try: + init_time_float = float(metadata[b'init_time'].decode('utf-8')) + init_duration = time.time() - init_time_float + init_time = f"{init_duration:.1f}s ago" + except: + pass + # Still initializing - send keepalive if needed if time.time() - last_keepalive >= keepalive_interval: - status_msg = f"Initializing: {state}" + status_msg = f"Initializing: {state} (started {init_time})" keepalive_packet = create_ts_packet('keepalive', status_msg) logger.debug(f"[{self.client_id}] Sending keepalive packet during initialization, state={state}") yield keepalive_packet diff --git a/apps/proxy/ts_proxy/stream_manager.py b/apps/proxy/ts_proxy/stream_manager.py index 7d158c09..054de05b 100644 --- a/apps/proxy/ts_proxy/stream_manager.py +++ b/apps/proxy/ts_proxy/stream_manager.py @@ -376,28 +376,33 @@ class StreamManager: logger.debug(f"Started stderr reader thread for channel {self.channel_id}") def _read_stderr(self): - """Read and log stderr output from the transcode process""" + """Read and log ffmpeg stderr output""" try: - if not self.transcode_process or not self.transcode_process.stderr: - logger.warning(f"No stderr to read for channel {self.channel_id}") - return - - for line in iter(self.transcode_process.stderr.readline, b''): - if not line: - break - - # Decode the line and strip whitespace - error_line = line.decode('utf-8', errors='replace').strip() - - # Skip empty lines - if not error_line: - continue - - # Log all stderr output as debug messages - logger.debug(f"Transcode stderr [{self.channel_id}]: {error_line}") - + for error_line in iter(self.transcode_process.stderr.readline, b''): + if error_line: + error_line = error_line.decode('utf-8', errors='replace').strip() + try: + # Wrap the logging call in a try-except to prevent crashes due to logging errors + logger.debug(f"Transcode stderr [{self.channel_id}]: {error_line}") + except OSError as e: + # If logging fails, try a simplified log message + if e.errno == 105: # No buffer space available + try: + # Try a much shorter message without the error content + logger.warning(f"Logging error (buffer full) in channel {self.channel_id}") + except: + # If even that fails, we have to silently continue + pass + except Exception: + # Ignore other logging errors to prevent thread 
crashes + pass except Exception as e: - logger.error(f"Error reading transcode stderr: {e}") + # Catch any other exceptions in the thread to prevent crashes + try: + logger.error(f"Error in stderr reader thread: {e}") + except: + # Again, if logging fails, continue silently + pass def _establish_http_connection(self): """Establish a direct HTTP connection to the stream""" diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py index ef232fd2..b90e1585 100644 --- a/apps/proxy/ts_proxy/views.py +++ b/apps/proxy/ts_proxy/views.py @@ -53,6 +53,7 @@ def stream_ts(request, channel_id): # Check if we need to reinitialize the channel needs_initialization = True channel_state = None + channel_initializing = False # Get current channel state from Redis if available if proxy_server.redis_client: @@ -63,30 +64,36 @@ def stream_ts(request, channel_id): if state_field in metadata: channel_state = metadata[state_field].decode('utf-8') - # Only skip initialization if channel is in a healthy state - valid_states = [ChannelState.ACTIVE, ChannelState.WAITING_FOR_CLIENTS] - if channel_state in valid_states: - # Verify the owner is still active + # IMPROVED: Check for *any* state that indicates initialization is in progress + active_states = [ChannelState.INITIALIZING, ChannelState.CONNECTING, ChannelState.WAITING_FOR_CLIENTS, ChannelState.ACTIVE] + if channel_state in active_states: + # Channel is being initialized or already active - no need for reinitialization + needs_initialization = False + logger.debug(f"[{client_id}] Channel {channel_id} already in state {channel_state}, skipping initialization") + + # Special handling for initializing/connecting states + if channel_state in [ChannelState.INITIALIZING, ChannelState.CONNECTING]: + channel_initializing = True + logger.debug(f"[{client_id}] Channel {channel_id} is still initializing, client will wait for completion") + else: + # Only check for owner if channel is in a valid state owner_field = ChannelMetadataField.OWNER.encode('utf-8') if owner_field in metadata: owner = metadata[owner_field].decode('utf-8') owner_heartbeat_key = f"ts_proxy:worker:{owner}:heartbeat" if proxy_server.redis_client.exists(owner_heartbeat_key): - # Owner is active and channel is in good state + # Owner is still active, so we don't need to reinitialize needs_initialization = False - logger.info(f"[{client_id}] Channel {channel_id} in state {channel_state} with active owner {owner}") + logger.debug(f"[{client_id}] Channel {channel_id} has active owner {owner}") # Start initialization if needed - channel_initializing = False if needs_initialization or not proxy_server.check_if_channel_exists(channel_id): - # Force cleanup of any previous instance + logger.info(f"[{client_id}] Starting channel {channel_id} initialization") + # Force cleanup of any previous instance if in terminal state if channel_state in [ChannelState.ERROR, ChannelState.STOPPING, ChannelState.STOPPED]: logger.warning(f"[{client_id}] Channel {channel_id} in state {channel_state}, forcing cleanup") proxy_server.stop_channel(channel_id) - # Initialize the channel (but don't wait for completion) - logger.info(f"[{client_id}] Starting channel {channel_id} initialization") - # Use max retry attempts and connection timeout from config max_retries = ConfigHelper.max_retries() retry_timeout = ConfigHelper.connection_timeout() diff --git a/core/tasks.py b/core/tasks.py index a6bd80cf..0fdaedf7 100644 --- a/core/tasks.py +++ b/core/tasks.py @@ -2,13 +2,12 @@ from celery import shared_task from 
channels.layers import get_channel_layer from asgiref.sync import async_to_sync -import redis import json import logging import re import time import os -from core.utils import RedisClient +from core.utils import RedisClient, send_websocket_update from apps.proxy.ts_proxy.channel_status import ChannelStatus from apps.m3u.models import M3UAccount from apps.epg.models import EPGSource @@ -36,11 +35,6 @@ LOG_THROTTLE_SECONDS = 300 # 5 minutes # Track if this is the first scan since startup _first_scan_completed = False -@shared_task -def beat_periodic_task(): - fetch_channel_stats() - scan_and_process_files() - def throttled_log(logger_method, message, key=None, *args, **kwargs): """Only log messages with the same key once per throttle period""" if key is None: @@ -52,6 +46,11 @@ def throttled_log(logger_method, message, key=None, *args, **kwargs): logger_method(message, *args, **kwargs) _last_log_times[key] = now +@shared_task +def beat_periodic_task(): + fetch_channel_stats() + scan_and_process_files() + @shared_task def scan_and_process_files(): global _first_scan_completed @@ -176,12 +175,12 @@ def scan_and_process_files(): epg_skipped += 1 continue - if not filename.endswith('.xml') and not filename.endswith('.gz'): + if not filename.endswith('.xml') and not filename.endswith('.gz') and not filename.endswith('.zip'): # Use trace level if not first scan if _first_scan_completed: - logger.trace(f"Skipping {filename}: Not an XML or GZ file") + logger.trace(f"Skipping {filename}: Not an XML, GZ or zip file") else: - logger.debug(f"Skipping {filename}: Not an XML or GZ file") + logger.debug(f"Skipping {filename}: Not an XML, GZ or zip file") epg_skipped += 1 continue @@ -293,19 +292,23 @@ def fetch_channel_stats(): if cursor == 0: break + send_websocket_update( + "updates", + "update", + { + "success": True, + "type": "channel_stats", + "stats": json.dumps({'channels': all_channels, 'count': len(all_channels)}) + }, + collect_garbage=True + ) + + # Explicitly clean up large data structures + all_channels = None + except Exception as e: logger.error(f"Error in channel_status: {e}", exc_info=True) return - # return JsonResponse({'error': str(e)}, status=500) - - channel_layer = get_channel_layer() - async_to_sync(channel_layer.group_send)( - "updates", - { - "type": "update", - "data": {"success": True, "type": "channel_stats", "stats": json.dumps({'channels': all_channels, 'count': len(all_channels)})} - }, - ) @shared_task def rehash_streams(keys): diff --git a/core/utils.py b/core/utils.py index 3a5d84f4..9951ce26 100644 --- a/core/utils.py +++ b/core/utils.py @@ -52,6 +52,43 @@ class RedisClient: # Validate connection with ping client.ping() client.flushdb() + + # Disable persistence on first connection - improves performance + # Only try to disable if not in a read-only environment + try: + client.config_set('save', '') # Disable RDB snapshots + client.config_set('appendonly', 'no') # Disable AOF logging + + # Set optimal memory settings with environment variable support + # Get max memory from environment or use a larger default (512MB instead of 256MB) + #max_memory = os.environ.get('REDIS_MAX_MEMORY', '512mb') + #eviction_policy = os.environ.get('REDIS_EVICTION_POLICY', 'allkeys-lru') + + # Apply memory settings + #client.config_set('maxmemory-policy', eviction_policy) + #client.config_set('maxmemory', max_memory) + + #logger.info(f"Redis configured with maxmemory={max_memory}, policy={eviction_policy}") + + # Disable protected mode when in debug mode + if 
os.environ.get('DISPATCHARR_DEBUG', '').lower() == 'true': + client.config_set('protected-mode', 'no') # Disable protected mode in debug + logger.warning("Redis protected mode disabled for debug environment") + + logger.trace("Redis persistence disabled for better performance") + except redis.exceptions.ResponseError as e: + # Improve error handling for Redis configuration errors + if "OOM" in str(e): + logger.error(f"Redis OOM during configuration: {e}") + # Try to increase maxmemory as an emergency measure + try: + client.config_set('maxmemory', '768mb') + logger.warning("Applied emergency Redis memory increase to 768MB") + except: + pass + else: + logger.error(f"Redis configuration error: {e}") + logger.info(f"Connected to Redis at {redis_host}:{redis_port}/{redis_db}") cls._client = client @@ -151,12 +188,145 @@ def release_task_lock(task_name, id): # Remove the lock redis_client.delete(lock_id) -def send_websocket_event(event, success, data): +def send_websocket_update(group_name, event_type, data, collect_garbage=False): + """ + Standardized function to send WebSocket updates with proper memory management. + + Args: + group_name: The WebSocket group to send to (e.g. 'updates') + event_type: The type of message (e.g. 'update') + data: The data to send + collect_garbage: Whether to force garbage collection after sending + """ channel_layer = get_channel_layer() - async_to_sync(channel_layer.group_send)( - 'updates', - { - 'type': 'update', - "data": {"success": True, "type": "epg_channels"} - } - ) + try: + async_to_sync(channel_layer.group_send)( + group_name, + { + 'type': event_type, + 'data': data + } + ) + except Exception as e: + logger.warning(f"Failed to send WebSocket update: {e}") + finally: + # Explicitly release references to help garbage collection + channel_layer = None + + # Force garbage collection if requested + if collect_garbage: + gc.collect() + +def send_websocket_event(event, success, data): + """Acquire a lock to prevent concurrent task execution.""" + data_payload = {"success": success, "type": event} + if data: + # Make a copy to avoid modifying the original + data_payload.update(data) + + # Use the standardized function + send_websocket_update('updates', 'update', data_payload) + + # Help garbage collection by clearing references + data_payload = None + +# Add memory monitoring utilities +def get_memory_usage(): + """Returns current memory usage in MB""" + import psutil + process = psutil.Process(os.getpid()) + return process.memory_info().rss / (1024 * 1024) + +def monitor_memory_usage(func): + """Decorator to monitor memory usage before and after function execution""" + def wrapper(*args, **kwargs): + import gc + # Force garbage collection before measuring + gc.collect() + + # Get initial memory usage + start_mem = get_memory_usage() + logger.debug(f"Memory usage before {func.__name__}: {start_mem:.2f} MB") + + # Call the original function + result = func(*args, **kwargs) + + # Force garbage collection before measuring again + gc.collect() + + # Get final memory usage + end_mem = get_memory_usage() + logger.debug(f"Memory usage after {func.__name__}: {end_mem:.2f} MB (Change: {end_mem - start_mem:.2f} MB)") + + return result + return wrapper + +def cleanup_memory(log_usage=False, force_collection=True): + """ + Comprehensive memory cleanup function to reduce memory footprint + + Args: + log_usage: Whether to log memory usage before and after cleanup + force_collection: Whether to force garbage collection + """ + logger.trace("Starting memory cleanup django 
memory cleanup") + # Skip logging if log level is not set to debug or more verbose (like trace) + current_log_level = logger.getEffectiveLevel() + if not current_log_level <= logging.DEBUG: + log_usage = False + if log_usage: + try: + import psutil + process = psutil.Process() + before_mem = process.memory_info().rss / (1024 * 1024) + logger.debug(f"Memory before cleanup: {before_mem:.2f} MB") + except (ImportError, Exception) as e: + logger.debug(f"Error getting memory usage: {e}") + + # Clear any object caches from Django ORM + from django.db import connection, reset_queries + reset_queries() + + # Force garbage collection + if force_collection: + # Run full collection + gc.collect(generation=2) + # Clear cyclic references + gc.collect(generation=0) + + if log_usage: + try: + import psutil + process = psutil.Process() + after_mem = process.memory_info().rss / (1024 * 1024) + logger.debug(f"Memory after cleanup: {after_mem:.2f} MB (change: {after_mem-before_mem:.2f} MB)") + except (ImportError, Exception): + pass + logger.trace("Memory cleanup complete for django") + +def is_protected_path(file_path): + """ + Determine if a file path is in a protected directory that shouldn't be deleted. + + Args: + file_path (str): The file path to check + + Returns: + bool: True if the path is protected, False otherwise + """ + if not file_path: + return False + + # List of protected directory prefixes + protected_dirs = [ + '/data/epgs', # EPG files mapped from host + '/data/uploads', # User uploaded files + '/data/m3us' # M3U files mapped from host + ] + + # Check if the path starts with any protected directory + for protected_dir in protected_dirs: + if file_path.startswith(protected_dir): + return True + + return False diff --git a/dispatcharr/celery.py b/dispatcharr/celery.py index a0ff2168..8856d330 100644 --- a/dispatcharr/celery.py +++ b/dispatcharr/celery.py @@ -2,6 +2,7 @@ import os from celery import Celery import logging +from celery.signals import task_postrun # Add import for signals # Initialize with defaults before Django settings are loaded DEFAULT_LOG_LEVEL = 'DEBUG' @@ -48,6 +49,56 @@ app.conf.update( worker_task_log_format='%(asctime)s %(levelname)s %(task_name)s: %(message)s', ) +# Add memory cleanup after task completion +#@task_postrun.connect # Use the imported signal +def cleanup_task_memory(**kwargs): + """Clean up memory after each task completes""" + # Get task name from kwargs + task_name = kwargs.get('task').name if kwargs.get('task') else '' + + # Only run cleanup for memory-intensive tasks + memory_intensive_tasks = [ + 'apps.m3u.tasks.refresh_single_m3u_account', + 'apps.m3u.tasks.refresh_m3u_accounts', + 'apps.m3u.tasks.process_m3u_batch', + 'apps.m3u.tasks.process_xc_category', + 'apps.epg.tasks.refresh_epg_data', + 'apps.epg.tasks.refresh_all_epg_data', + 'apps.epg.tasks.parse_programs_for_source', + 'apps.epg.tasks.parse_programs_for_tvg_id', + 'apps.channels.tasks.match_epg_channels', + 'core.tasks.rehash_streams' + ] + + # Check if this is a memory-intensive task + if task_name in memory_intensive_tasks: + # Import cleanup_memory function + from core.utils import cleanup_memory + + # Use the comprehensive cleanup function + cleanup_memory(log_usage=True, force_collection=True) + + # Log memory usage if psutil is installed + try: + import psutil + process = psutil.Process() + if hasattr(process, 'memory_info'): + mem = process.memory_info().rss / (1024 * 1024) + print(f"Memory usage after {task_name}: {mem:.2f} MB") + except (ImportError, Exception): + pass + else: 
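The cleanup_task_memory hook above ships with its @task_postrun.connect decorator commented out. A minimal sketch of wiring the same idea through Celery's task_postrun signal, assuming a plain gc.collect() is sufficient and listing only a few of the task names from the memory_intensive_tasks list (psutil reporting omitted):

    import gc
    from celery.signals import task_postrun

    # Abbreviated from the memory_intensive_tasks list in this patch
    MEMORY_INTENSIVE_TASKS = {
        'apps.m3u.tasks.refresh_single_m3u_account',
        'apps.epg.tasks.refresh_epg_data',
        'apps.channels.tasks.match_epg_channels',
        'core.tasks.rehash_streams',
    }

    @task_postrun.connect
    def free_memory_after_task(sender=None, task=None, **kwargs):
        # task may be absent for some dispatches, so resolve the name defensively
        name = getattr(task, 'name', '') or ''
        if name in MEMORY_INTENSIVE_TASKS:
            gc.collect()  # full collection after heavy M3U/EPG parsing releases cyclic garbage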
+ # For non-intensive tasks, just log but don't force cleanup + try: + import psutil + process = psutil.Process() + if hasattr(process, 'memory_info'): + mem = process.memory_info().rss / (1024 * 1024) + if mem > 500: # Only log if using more than 500MB + print(f"High memory usage detected in {task_name}: {mem:.2f} MB") + except (ImportError, Exception): + pass + @app.on_after_configure.connect def setup_celery_logging(**kwargs): # Use our directly determined log level diff --git a/dispatcharr/jwt_ws_auth.py b/dispatcharr/jwt_ws_auth.py index 3c7afeab..b478cd6f 100644 --- a/dispatcharr/jwt_ws_auth.py +++ b/dispatcharr/jwt_ws_auth.py @@ -6,7 +6,9 @@ from django.contrib.auth.models import AnonymousUser from django.contrib.auth import get_user_model from rest_framework_simplejwt.exceptions import InvalidToken, TokenError from rest_framework_simplejwt.authentication import JWTAuthentication +import logging +logger = logging.getLogger(__name__) User = get_user_model() @database_sync_to_async @@ -15,7 +17,11 @@ def get_user(validated_token): jwt_auth = JWTAuthentication() user = jwt_auth.get_user(validated_token) return user - except: + except User.DoesNotExist: + logger.warning(f"User from token does not exist. User ID: {validated_token.get('user_id', 'unknown')}") + return AnonymousUser() + except Exception as e: + logger.error(f"Error getting user from token: {str(e)}") return AnonymousUser() class JWTAuthMiddleware(BaseMiddleware): @@ -26,11 +32,16 @@ class JWTAuthMiddleware(BaseMiddleware): token = query_string.get("token", [None])[0] if token is not None: - validated_token = JWTAuthentication().get_validated_token(token) - scope["user"] = await get_user(validated_token) + try: + validated_token = JWTAuthentication().get_validated_token(token) + scope["user"] = await get_user(validated_token) + except (InvalidToken, TokenError) as e: + logger.warning(f"Invalid token: {str(e)}") + scope["user"] = AnonymousUser() else: scope["user"] = AnonymousUser() - except (InvalidToken, TokenError): + except Exception as e: + logger.error(f"Error in JWT authentication: {str(e)}") scope["user"] = AnonymousUser() return await super().__call__(scope, receive, send) diff --git a/dispatcharr/settings.py b/dispatcharr/settings.py index 8f3921c0..4e1e0d55 100644 --- a/dispatcharr/settings.py +++ b/dispatcharr/settings.py @@ -43,6 +43,34 @@ INSTALLED_APPS = [ 'django_celery_beat', ] +# EPG Processing optimization settings +EPG_BATCH_SIZE = 1000 # Number of records to process in a batch +EPG_MEMORY_LIMIT = 512 # Memory limit in MB before forcing garbage collection +EPG_ENABLE_MEMORY_MONITORING = True # Whether to monitor memory usage during processing + +# Database optimization settings +DATABASE_STATEMENT_TIMEOUT = 300 # Seconds before timing out long-running queries +DATABASE_CONN_MAX_AGE = 60 # Connection max age in seconds, helps with frequent reconnects + +# Disable atomic requests for performance-sensitive views +ATOMIC_REQUESTS = False + +# Cache settings - add caching for EPG operations +CACHES = { + 'default': { + 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', + 'LOCATION': 'dispatcharr-epg-cache', + 'TIMEOUT': 3600, # 1 hour cache timeout + 'OPTIONS': { + 'MAX_ENTRIES': 10000, + 'CULL_FREQUENCY': 3, # Purge 1/3 of entries when max is reached + } + } +} + +# Timeouts for external connections +REQUESTS_TIMEOUT = 30 # Seconds for external API requests + MIDDLEWARE = [ 'django.middleware.security.SecurityMiddleware', 'django.contrib.sessions.middleware.SessionMiddleware', @@ -171,11 +199,24 @@ 
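The CACHES block above configures a local-memory cache ('dispatcharr-epg-cache') with a one-hour timeout, but no consumer appears in this diff. A hedged sketch of how such a cache could memoize an expensive EPG lookup; cached_epg_payload and build_payload are hypothetical names, and the 3600-second timeout mirrors the setting above:

    from django.core.cache import cache

    def cached_epg_payload(source_id, build_payload, timeout=3600):
        """Return the EPG payload for a source, computing it at most once per hour.

        build_payload is any zero-argument callable that produces the payload.
        """
        return cache.get_or_set(f'epg:payload:{source_id}', build_payload, timeout)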
CELERY_BROKER_TRANSPORT_OPTIONS = { CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' +# Memory management settings +#CELERY_WORKER_MAX_TASKS_PER_CHILD = 10 # Restart worker after 10 tasks to free memory +#CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # Don't prefetch tasks - process one at a time +#CELERY_TASK_ACKS_LATE = True # Only acknowledge tasks after they're processed +#CELERY_TASK_TIME_LIMIT = 3600 # 1 hour time limit per task +#CELERY_TASK_SOFT_TIME_LIMIT = 3540 # Soft limit 60 seconds before hard limit +#CELERY_WORKER_CANCEL_LONG_RUNNING_TASKS_ON_CONNECTION_LOSS = True # Cancel tasks if connection lost +#CELERY_TASK_IGNORE_RESULT = True # Don't store results unless explicitly needed + CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers.DatabaseScheduler" CELERY_BEAT_SCHEDULE = { 'fetch-channel-statuses': { - 'task': 'core.tasks.beat_periodic_task', - 'schedule': 2.0, + 'task': 'apps.proxy.tasks.fetch_channel_stats', # Direct task call + 'schedule': 2.0, # Every 2 seconds + }, + 'scan-files': { + 'task': 'core.tasks.scan_and_process_files', # Direct task call + 'schedule': 20.0, # Every 20 seconds }, } @@ -280,6 +321,11 @@ LOGGING = { 'level': LOG_LEVEL, # Use environment-configured level 'propagate': False, # Don't propagate to root logger to avoid duplicate logs }, + 'core.utils': { + 'handlers': ['console'], + 'level': LOG_LEVEL, + 'propagate': False, + }, 'apps.proxy': { 'handlers': ['console'], 'level': LOG_LEVEL, # Use environment-configured level diff --git a/docker/uwsgi.debug.ini b/docker/uwsgi.debug.ini index 43ecd5ce..6ca855f3 100644 --- a/docker/uwsgi.debug.ini +++ b/docker/uwsgi.debug.ini @@ -8,7 +8,7 @@ exec-before = python /app/scripts/wait_for_redis.py ; Start Redis first attach-daemon = redis-server ; Then start other services -attach-daemon = celery -A dispatcharr worker +attach-daemon = celery -A dispatcharr worker --concurrency=4 attach-daemon = celery -A dispatcharr beat attach-daemon = daphne -b 0.0.0.0 -p 8001 dispatcharr.asgi:application attach-daemon = cd /app/frontend && npm run dev @@ -47,6 +47,7 @@ thunder-lock = true log-4xx = true log-5xx = true disable-logging = false +log-buffering = 1024 # Add buffer size limit for logging ; Longer timeouts for debugging sessions harakiri = 3600 diff --git a/docker/uwsgi.dev.ini b/docker/uwsgi.dev.ini index 62a5f352..f3e5238e 100644 --- a/docker/uwsgi.dev.ini +++ b/docker/uwsgi.dev.ini @@ -57,4 +57,5 @@ log-master = true logformat-strftime = true log-date = %%Y-%%m-%%d %%H:%%M:%%S,000 # Use formatted time with environment variable for log level -log-format = %(ftime) $(DISPATCHARR_LOG_LEVEL) uwsgi.requests Worker ID: %(wid) %(method) %(status) %(uri) %(msecs)ms \ No newline at end of file +log-format = %(ftime) $(DISPATCHARR_LOG_LEVEL) uwsgi.requests Worker ID: %(wid) %(method) %(status) %(uri) %(msecs)ms +log-buffering = 1024 # Add buffer size limit for logging \ No newline at end of file diff --git a/docker/uwsgi.ini b/docker/uwsgi.ini index 5068268c..32eb6e3c 100644 --- a/docker/uwsgi.ini +++ b/docker/uwsgi.ini @@ -55,4 +55,5 @@ log-master = true logformat-strftime = true log-date = %%Y-%%m-%%d %%H:%%M:%%S,000 # Use formatted time with environment variable for log level -log-format = %(ftime) $(DISPATCHARR_LOG_LEVEL) uwsgi.requests Worker ID: %(wid) %(method) %(status) %(uri) %(msecs)ms \ No newline at end of file +log-format = %(ftime) $(DISPATCHARR_LOG_LEVEL) uwsgi.requests Worker ID: %(wid) %(method) %(status) %(uri) %(msecs)ms +log-buffering = 1024 # Add buffer size limit for logging \ No newline 
at end of file diff --git a/frontend/src/App.jsx b/frontend/src/App.jsx index 1c032ab3..7295d12e 100644 --- a/frontend/src/App.jsx +++ b/frontend/src/App.jsx @@ -57,6 +57,12 @@ const App = () => { } } catch (error) { console.error('Error checking superuser status:', error); + // If authentication error, redirect to login + if (error.status === 401) { + localStorage.removeItem('token'); + localStorage.removeItem('refreshToken'); + window.location.href = '/login'; + } } } checkSuperuser(); diff --git a/frontend/src/api.js b/frontend/src/api.js index 60b22634..73bbde7d 100644 --- a/frontend/src/api.js +++ b/frontend/src/api.js @@ -103,14 +103,13 @@ export default class API { static async fetchSuperUser() { try { - const response = await request( - `${host}/api/accounts/initialize-superuser/`, - { auth: false } - ); - - return response; - } catch (e) { - errorNotification('Failed to fetch superuser', e); + return await request(`${host}/api/accounts/initialize-superuser/`, { + auth: false, + method: 'GET', + }); + } catch (error) { + console.error('Error checking superuser status:', error); + throw error; } } @@ -150,11 +149,21 @@ export default class API { } static async refreshToken(refresh) { - return await request(`${host}/api/accounts/token/refresh/`, { - auth: false, - method: 'POST', - body: { refresh }, - }); + try { + return await request(`${host}/api/accounts/token/refresh/`, { + auth: false, + method: 'POST', + body: { refresh }, + }); + } catch (error) { + // If user does not exist or token is invalid, clear tokens + if (error.status === 401 || error.message?.includes('does not exist')) { + localStorage.removeItem('token'); + localStorage.removeItem('refreshToken'); + window.location.href = '/login'; // Redirect to login + } + throw error; + } } static async logout() { @@ -359,8 +368,8 @@ export default class API { payload.channel_number !== null && payload.channel_number !== undefined ) { - const parsedNumber = parseInt(payload.channel_number, 10); - payload.channel_number = isNaN(parsedNumber) ? 
null : parsedNumber; + // Ensure channel_number is explicitly treated as a float + payload.channel_number = parseFloat(payload.channel_number); } const response = await request( diff --git a/frontend/src/components/forms/Channel.jsx b/frontend/src/components/forms/Channel.jsx index 71f7c7b2..ac048712 100644 --- a/frontend/src/components/forms/Channel.jsx +++ b/frontend/src/components/forms/Channel.jsx @@ -667,6 +667,9 @@ const ChannelForm = ({ channel = null, isOpen, onClose }) => { : '' } size="xs" + step={0.1} // Add step prop to allow decimal inputs + precision={1} // Specify decimal precision + removeTrailingZeros // Optional: remove trailing zeros for cleaner display /> { const epgs = useEPGsStore((state) => state.epgs); - const [file, setFile] = useState(null); - - const handleFileChange = (e) => { - const file = e.target.files[0]; - if (file) { - setFile(file); - } - }; + // Remove the file state and handler since we're not supporting file uploads + const [sourceType, setSourceType] = useState('xmltv'); const form = useForm({ mode: 'uncontrolled', @@ -47,114 +50,151 @@ const EPG = ({ epg = null, isOpen, onClose }) => { const values = form.getValues(); if (epg?.id) { - await API.updateEPG({ id: epg.id, ...values, file }); + // Remove file from API call + await API.updateEPG({ id: epg.id, ...values }); } else { + // Remove file from API call await API.addEPG({ ...values, - file, }); } form.reset(); - setFile(null); onClose(); }; useEffect(() => { if (epg) { - form.setValues({ + const values = { name: epg.name, source_type: epg.source_type, url: epg.url, api_key: epg.api_key, is_active: epg.is_active, refresh_interval: epg.refresh_interval, - }); + }; + form.setValues(values); + setSourceType(epg.source_type); // Update source type state } else { form.reset(); + setSourceType('xmltv'); // Reset to xmltv } }, [epg]); + // Function to handle source type changes + const handleSourceTypeChange = (value) => { + form.setFieldValue('source_type', value); + setSourceType(value); + }; + if (!isOpen) { return <>; } return ( - +
- + + {/* Left Column */} + + - + handleSourceTypeChange(event.currentTarget.value)} + /> - + + - + - How often to automatically refresh EPG data
- (0 to disable automatic refreshes)} - {...form.getInputProps('refresh_interval')} - key={form.key('refresh_interval')} - /> + {/* Right Column */} + + - + - - - + {/* Put checkbox at the same level as Refresh Interval */} + + Status + When enabled, this EPG source will auto update. + + + + + +
+ + {/* Full Width Section */} + + + + + + + +
); diff --git a/frontend/src/components/forms/M3U.jsx b/frontend/src/components/forms/M3U.jsx index 8e8fd932..9affa984 100644 --- a/frontend/src/components/forms/M3U.jsx +++ b/frontend/src/components/forms/M3U.jsx @@ -307,7 +307,7 @@ const M3U = ({ description="User-Agent header to use when accessing this M3U source" {...form.getInputProps('user_agent')} key={form.key('user_agent')} - data={[{ value: '0', label: '(use default)' }].concat( + data={[{ value: '0', label: '(Use Default)' }].concat( userAgents.map((ua) => ({ label: ua.name, value: `${ua.id}`, diff --git a/frontend/src/components/tables/ChannelsTable.jsx b/frontend/src/components/tables/ChannelsTable.jsx index 2716e994..cb81e988 100644 --- a/frontend/src/components/tables/ChannelsTable.jsx +++ b/frontend/src/components/tables/ChannelsTable.jsx @@ -593,11 +593,18 @@ const ChannelsTable = ({ }) => { id: 'channel_number', accessorKey: 'channel_number', size: 40, - cell: ({ getValue }) => ( - - {getValue()} - - ), + cell: ({ getValue }) => { + const value = getValue(); + // Format as integer if no decimal component + const formattedValue = value !== null && value !== undefined ? + (value === Math.floor(value) ? Math.floor(value) : value) : ''; + + return ( + + {formattedValue} + + ); + }, }, { id: 'name', diff --git a/frontend/src/components/tables/M3UsTable.jsx b/frontend/src/components/tables/M3UsTable.jsx index 0c5b2b4d..261e3984 100644 --- a/frontend/src/components/tables/M3UsTable.jsx +++ b/frontend/src/components/tables/M3UsTable.jsx @@ -210,24 +210,24 @@ const M3UTable = () => { - Parsing: + Parsing: {parseInt(data.progress)}% {data.elapsed_time && ( - Elapsed: + Elapsed: {elapsedTime} )} {data.time_remaining && ( - Remaining: + Remaining: {timeRemaining} )} {data.streams_processed && ( - Streams: + Streams: {data.streams_processed} )} @@ -424,7 +424,7 @@ const M3UTable = () => { if (data.status === 'success') { return ( - + {value} @@ -434,7 +434,7 @@ const M3UTable = () => { // For all other status values, just use dimmed text return ( - + {value} diff --git a/frontend/src/pages/Guide.jsx b/frontend/src/pages/Guide.jsx index b64c2517..c0042c31 100644 --- a/frontend/src/pages/Guide.jsx +++ b/frontend/src/pages/Guide.jsx @@ -78,31 +78,14 @@ export default function TVChannelGuide({ startDate, endDate }) { const fetched = await API.getGrid(); // GETs your EPG grid console.log(`Received ${fetched.length} programs`); - // Unique tvg_ids from returned programs - const programIds = [...new Set(fetched.map((p) => p.tvg_id))]; + // Include ALL channels, sorted by channel number - don't filter by EPG data + const sortedChannels = Object.values(channels) + .sort((a, b) => (a.channel_number || Infinity) - (b.channel_number || Infinity)); - // Filter your Redux/Zustand channels by matching tvg_id - const filteredChannels = Object.values(channels) - // Include channels with matching tvg_ids OR channels with null epg_data - .filter( - (ch) => - programIds.includes(tvgsById[ch.epg_data_id]?.tvg_id) || - programIds.includes(ch.uuid) || - ch.epg_data_id === null - ) - // Add sorting by channel_number - .sort( - (a, b) => - (a.channel_number || Infinity) - (b.channel_number || Infinity) - ); + console.log(`Using all ${sortedChannels.length} available channels`); - console.log( - `found ${filteredChannels.length} channels with matching tvg_ids` - ); - - setGuideChannels(filteredChannels); - setFilteredChannels(filteredChannels); // Initialize filtered channels - console.log(fetched); + setGuideChannels(sortedChannels); + 
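The ChannelsTable cell above renders whole channel numbers without the trailing .0 while keeping genuine decimals such as 2.5. The same rule expressed as a small Python helper for anyone formatting the new float channel_number on the server side; the function name is hypothetical:

    def format_channel_number(value):
        """Render 2.0 as '2' but keep 2.5 as '2.5'; empty string for missing values."""
        if value is None:
            return ''
        number = float(value)
        return str(int(number)) if number == int(number) else str(number)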
setFilteredChannels(sortedChannels); // Initialize filtered channels setPrograms(fetched); setLoading(false); }; @@ -135,9 +118,12 @@ export default function TVChannelGuide({ startDate, endDate }) { if (selectedProfileId !== 'all') { // Get the profile's enabled channels const profileChannels = profiles[selectedProfileId]?.channels || []; - const enabledChannelIds = profileChannels - .filter((pc) => pc.enabled) - .map((pc) => pc.id); + // Check if channels is a Set (from the error message, it likely is) + const enabledChannelIds = Array.isArray(profileChannels) + ? profileChannels.filter((pc) => pc.enabled).map((pc) => pc.id) + : profiles[selectedProfileId]?.channels instanceof Set + ? Array.from(profiles[selectedProfileId].channels) + : []; result = result.filter((channel) => enabledChannelIds.includes(channel.id) @@ -1210,13 +1196,51 @@ export default function TVChannelGuide({ startDate, endDate }) { paddingLeft: 0, // Remove any padding that might push content }} > - {channelPrograms.map((program) => { - return ( + {channelPrograms.length > 0 ? ( + channelPrograms.map((program) => (
{renderProgram(program, start)}
- ); - })} + )) + ) : ( + // Simple placeholder for channels with no program data - 2 hour blocks + <> + {/* Generate repeating placeholder blocks every 2 hours across the timeline */} + {Array.from({ length: Math.ceil(hourTimeline.length / 2) }).map((_, index) => ( + + + + No Program Information Available + + + + ))} + + )}
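The guide fallback above fills channels without program data using repeating two-hour "No Program Information Available" blocks. A hedged Python sketch of the same block-partitioning idea, in case equivalent placeholder spans are ever generated server-side; the function name is hypothetical:

    from datetime import timedelta

    def placeholder_blocks(start, end, hours=2):
        """Yield (block_start, block_end) pairs covering [start, end) in fixed-size blocks."""
        step = timedelta(hours=hours)
        cursor = start
        while cursor < end:
            yield cursor, min(cursor + step, end)
            cursor += step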
); diff --git a/frontend/src/pages/Stats.jsx b/frontend/src/pages/Stats.jsx index fc6705b0..fa3250ef 100644 --- a/frontend/src/pages/Stats.jsx +++ b/frontend/src/pages/Stats.jsx @@ -692,25 +692,40 @@ const ChannelsPage = () => { }, [channelStats, channels, channelsByUUID, streamProfiles]); return ( - - {Object.keys(activeChannels).length === 0 ? ( - - No active channels currently streaming - - ) : ( - Object.values(activeChannels).map((channel) => ( - - )) - )} - + + + {Object.keys(activeChannels).length === 0 ? ( + + No active channels currently streaming + + ) : ( + Object.values(activeChannels).map((channel) => ( + + + + )) + )} + + ); }; diff --git a/requirements.txt b/requirements.txt index 7d7117f4..daf3356e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -21,6 +21,7 @@ rapidfuzz==3.12.1 # PyTorch dependencies (CPU only) --extra-index-url https://download.pytorch.org/whl/cpu/ torch==2.6.0+cpu +tzlocal # ML/NLP dependencies sentence-transformers==3.4.1 @@ -28,3 +29,4 @@ channels channels-redis django-filter django-celery-beat +lxml==5.4.0 diff --git a/scripts/epg_match.py b/scripts/epg_match.py index ed86d865..890ffe3a 100644 --- a/scripts/epg_match.py +++ b/scripts/epg_match.py @@ -4,11 +4,15 @@ import sys import json import re import os -import sys +import logging + from rapidfuzz import fuzz from sentence_transformers import util from sentence_transformers import SentenceTransformer as st +# Set up logger +logger = logging.getLogger(__name__) + # Load the sentence-transformers model once at the module level SENTENCE_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2" MODEL_PATH = os.path.join("/app", "models", "all-MiniLM-L6-v2") @@ -18,18 +22,15 @@ BEST_FUZZY_THRESHOLD = 85 LOWER_FUZZY_THRESHOLD = 40 EMBED_SIM_THRESHOLD = 0.65 -def eprint(*args, **kwargs): - print(*args, file=sys.stderr, **kwargs) - def process_data(input_data): os.makedirs(MODEL_PATH, exist_ok=True) # If not present locally, download: if not os.path.exists(os.path.join(MODEL_PATH, "config.json")): - eprint(f"Local model not found in {MODEL_PATH}; downloading from {SENTENCE_MODEL_NAME}...") + logger.info(f"Local model not found in {MODEL_PATH}; downloading from {SENTENCE_MODEL_NAME}...") st_model = st(SENTENCE_MODEL_NAME, cache_folder=MODEL_PATH) else: - eprint(f"Loading local model from {MODEL_PATH}") + logger.info(f"Loading local model from {MODEL_PATH}") st_model = st(MODEL_PATH) channels = input_data["channels"] @@ -59,7 +60,7 @@ def process_data(input_data): # Add to matched_channels list so it's counted in the total matched_channels.append((chan['id'], fallback_name, epg_by_tvg_id["tvg_id"])) - eprint(f"Channel {chan['id']} '{fallback_name}' => EPG found by tvg_id={epg_by_tvg_id['tvg_id']}") + logger.info(f"Channel {chan['id']} '{fallback_name}' => EPG found by tvg_id={epg_by_tvg_id['tvg_id']}") continue # If channel has a tvg_id that doesn't exist in EPGData, do direct check. 
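requirements.txt above now pins lxml==5.4.0 alongside tzlocal, though no consumer of lxml appears in this diff. In keeping with the patch's memory-management theme, a hedged sketch of streaming a large XMLTV file with lxml.etree.iterparse rather than loading it whole; whether the project parses EPG data this way is an assumption:

    from lxml import etree

    def iter_programmes(xml_path):
        """Yield one <programme> element at a time, releasing each after use."""
        for _event, elem in etree.iterparse(xml_path, events=('end',), tag='programme'):
            yield elem
            # Clear the element and drop earlier siblings so the tree stays small
            elem.clear()
            while elem.getprevious() is not None:
                del elem.getparent()[0]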
@@ -67,15 +68,14 @@ def process_data(input_data): if chan["tvg_id"]: epg_match = [epg["id"] for epg in epg_data if epg["tvg_id"] == chan["tvg_id"]] if epg_match: - # Fix: Access the first element directly since epg_match contains the IDs themselves - chan["epg_data_id"] = epg_match[0] # Directly use the integer ID - eprint(f"Channel {chan['id']} '{chan['name']}' => EPG found by tvg_id={chan['tvg_id']}") + chan["epg_data_id"] = epg_match[0] + logger.info(f"Channel {chan['id']} '{chan['name']}' => EPG found by tvg_id={chan['tvg_id']}") channels_to_update.append(chan) continue # C) Perform name-based fuzzy matching if not chan["norm_chan"]: - eprint(f"Channel {chan['id']} '{chan['name']}' => empty after normalization, skipping") + logger.debug(f"Channel {chan['id']} '{chan['name']}' => empty after normalization, skipping") continue best_score = 0 @@ -99,7 +99,7 @@ def process_data(input_data): bonus = 15 score = base_score + bonus - eprint( + logger.debug( f"Channel {chan['id']} '{fallback_name}' => EPG row {row['id']}: " f"name='{row['name']}', norm_name='{row['norm_name']}', " f"combined_text='{combined_text}', dot_regions={dot_regions}, " @@ -112,7 +112,7 @@ def process_data(input_data): # If no best match was found, skip if not best_epg: - eprint(f"Channel {chan['id']} '{fallback_name}' => no EPG match at all.") + logger.debug(f"Channel {chan['id']} '{fallback_name}' => no EPG match at all.") continue # If best_score is above BEST_FUZZY_THRESHOLD => direct accept @@ -121,7 +121,7 @@ def process_data(input_data): channels_to_update.append(chan) matched_channels.append((chan['id'], fallback_name, best_epg["tvg_id"])) - eprint( + logger.info( f"Channel {chan['id']} '{fallback_name}' => matched tvg_id={best_epg['tvg_id']} " f"(score={best_score})" ) @@ -138,27 +138,35 @@ def process_data(input_data): channels_to_update.append(chan) matched_channels.append((chan['id'], fallback_name, matched_epg["tvg_id"])) - eprint( + logger.info( f"Channel {chan['id']} '{fallback_name}' => matched EPG tvg_id={matched_epg['tvg_id']} " f"(fuzzy={best_score}, cos-sim={top_value:.2f})" ) else: - eprint( + logger.info( f"Channel {chan['id']} '{fallback_name}' => fuzzy={best_score}, " f"cos-sim={top_value:.2f} < {EMBED_SIM_THRESHOLD}, skipping" ) else: - eprint( - f"Channel {chan['id']} '{fallback_name}' => fuzzy={best_score} < " - f"{LOWER_FUZZY_THRESHOLD}, skipping" + # No good match found - fuzzy score is too low + logger.info( + f"Channel {chan['id']} '{fallback_name}' => best fuzzy match score={best_score} < {LOWER_FUZZY_THRESHOLD}, skipping" ) return { "channels_to_update": channels_to_update, - "matched_channels": matched_channels, + "matched_channels": matched_channels } def main(): + # Configure logging + logging_level = os.environ.get('DISPATCHARR_LOG_LEVEL', 'INFO') + logging.basicConfig( + level=getattr(logging, logging_level), + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + stream=sys.stderr + ) + # Read input data from a file input_file_path = sys.argv[1] with open(input_file_path, 'r') as f:
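The matching flow above hinges on three constants defined earlier in the script: BEST_FUZZY_THRESHOLD = 85, LOWER_FUZZY_THRESHOLD = 40 and EMBED_SIM_THRESHOLD = 0.65. A condensed sketch of that accept / fall-back / skip rule; fuzz.token_sort_ratio stands in for the script's actual scorer, which also adds a region bonus, so this is illustrative only:

    from rapidfuzz import fuzz

    BEST_FUZZY_THRESHOLD = 85
    LOWER_FUZZY_THRESHOLD = 40
    EMBED_SIM_THRESHOLD = 0.65

    def accept_epg_match(channel_name, epg_name, cosine_sim=0.0):
        score = fuzz.token_sort_ratio(channel_name, epg_name)
        if score >= BEST_FUZZY_THRESHOLD:
            return True                               # strong fuzzy match: accept outright
        if score >= LOWER_FUZZY_THRESHOLD:
            return cosine_sim >= EMBED_SIM_THRESHOLD  # borderline: require embedding agreement
        return False                                  # too weak: skip the channel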