From 354cd84c884fcf894495bf6d553c111c00bcb27d Mon Sep 17 00:00:00 2001 From: dekzter Date: Sun, 6 Apr 2025 15:58:55 -0400 Subject: [PATCH] filesystem watch and process of m3u and epg --- apps/epg/api_views.py | 26 ++- apps/epg/tasks.py | 3 + apps/m3u/api_views.py | 25 +++ ...ount_uploaded_file_m3uaccount_file_path.py | 22 +++ apps/m3u/models.py | 4 +- apps/m3u/serializers.py | 2 +- apps/m3u/tasks.py | 43 +++-- core/tasks.py | 159 ++++++++++++++++++ dispatcharr/settings.py | 2 +- docker/init/03-init-dispatcharr.sh | 4 + frontend/src/WebSocket.jsx | 18 +- frontend/src/api.js | 6 +- frontend/src/components/forms/M3U.jsx | 9 +- 13 files changed, 296 insertions(+), 27 deletions(-) create mode 100644 apps/m3u/migrations/0007_remove_m3uaccount_uploaded_file_m3uaccount_file_path.py create mode 100644 core/tasks.py diff --git a/apps/epg/api_views.py b/apps/epg/api_views.py index 74ef9380..f0bf4792 100644 --- a/apps/epg/api_views.py +++ b/apps/epg/api_views.py @@ -1,8 +1,9 @@ -import logging +import logging, os from rest_framework import viewsets, status from rest_framework.response import Response from rest_framework.views import APIView from rest_framework.permissions import IsAuthenticated +from rest_framework.decorators import action from drf_yasg.utils import swagger_auto_schema from drf_yasg import openapi from django.utils import timezone @@ -26,6 +27,29 @@ class EPGSourceViewSet(viewsets.ModelViewSet): logger.debug("Listing all EPG sources.") return super().list(request, *args, **kwargs) + @action(detail=False, methods=['post']) + def upload(self, request): + if 'file' not in request.FILES: + return Response({'error': 'No file uploaded'}, status=status.HTTP_400_BAD_REQUEST) + + file = request.FILES['file'] + file_name = file.name + file_path = os.path.join('/data/uploads/epgs', file_name) + + os.makedirs(os.path.dirname(file_path), exist_ok=True) + with open(file_path, 'wb+') as destination: + for chunk in file.chunks(): + destination.write(chunk) + + new_obj_data = request.data.copy() + new_obj_data['file_path'] = file_path + + serializer = self.get_serializer(data=new_obj_data) + serializer.is_valid(raise_exception=True) + self.perform_create(serializer) + + return Response(serializer.data, status=status.HTTP_201_CREATED) + # ───────────────────────────── # 2) Program API (CRUD) # ───────────────────────────── diff --git a/apps/epg/tasks.py b/apps/epg/tasks.py index fd75ec81..3b84df6d 100644 --- a/apps/epg/tasks.py +++ b/apps/epg/tasks.py @@ -53,6 +53,9 @@ def refresh_epg_data(source_id): release_task_lock('refresh_epg_data', source_id) def fetch_xmltv(source): + if not source.url: + return + logger.info(f"Fetching XMLTV data from source: {source.name}") try: response = requests.get(source.url, timeout=30) diff --git a/apps/m3u/api_views.py b/apps/m3u/api_views.py index e3d3b9d1..8737a07a 100644 --- a/apps/m3u/api_views.py +++ b/apps/m3u/api_views.py @@ -7,6 +7,8 @@ from drf_yasg import openapi from django.shortcuts import get_object_or_404 from django.http import JsonResponse from django.core.cache import cache +import os +from rest_framework.decorators import action # Import all models, including UserAgent. from .models import M3UAccount, M3UFilter, ServerGroup, M3UAccountProfile @@ -29,6 +31,29 @@ class M3UAccountViewSet(viewsets.ModelViewSet): serializer_class = M3UAccountSerializer permission_classes = [IsAuthenticated] + @action(detail=False, methods=['post']) + def upload(self, request): + if 'file' not in request.FILES: + return Response({'error': 'No file uploaded'}, status=status.HTTP_400_BAD_REQUEST) + + file = request.FILES['file'] + file_name = file.name + file_path = os.path.join('/data/uploads/m3us', file_name) + + os.makedirs(os.path.dirname(file_path), exist_ok=True) + with open(file_path, 'wb+') as destination: + for chunk in file.chunks(): + destination.write(chunk) + + new_obj_data = request.data.copy() + new_obj_data['file_path'] = file_path + + serializer = self.get_serializer(data=new_obj_data) + serializer.is_valid(raise_exception=True) + self.perform_create(serializer) + + return Response(serializer.data, status=status.HTTP_201_CREATED) + class M3UFilterViewSet(viewsets.ModelViewSet): """Handles CRUD operations for M3U filters""" queryset = M3UFilter.objects.all() diff --git a/apps/m3u/migrations/0007_remove_m3uaccount_uploaded_file_m3uaccount_file_path.py b/apps/m3u/migrations/0007_remove_m3uaccount_uploaded_file_m3uaccount_file_path.py new file mode 100644 index 00000000..086eff29 --- /dev/null +++ b/apps/m3u/migrations/0007_remove_m3uaccount_uploaded_file_m3uaccount_file_path.py @@ -0,0 +1,22 @@ +# Generated by Django 5.1.6 on 2025-04-06 19:09 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('m3u', '0006_populate_periodic_tasks'), + ] + + operations = [ + migrations.RemoveField( + model_name='m3uaccount', + name='uploaded_file', + ), + migrations.AddField( + model_name='m3uaccount', + name='file_path', + field=models.CharField(blank=True, max_length=255, null=True), + ), + ] diff --git a/apps/m3u/models.py b/apps/m3u/models.py index e324e690..cc84d768 100644 --- a/apps/m3u/models.py +++ b/apps/m3u/models.py @@ -20,8 +20,8 @@ class M3UAccount(models.Model): null=True, help_text="The base URL of the M3U server (optional if a file is uploaded)" ) - uploaded_file = models.FileField( - upload_to='m3u_uploads/', + file_path = models.CharField( + max_length=255, blank=True, null=True ) diff --git a/apps/m3u/serializers.py b/apps/m3u/serializers.py index b977486a..e7dbfcea 100644 --- a/apps/m3u/serializers.py +++ b/apps/m3u/serializers.py @@ -63,7 +63,7 @@ class M3UAccountSerializer(serializers.ModelSerializer): class Meta: model = M3UAccount fields = [ - 'id', 'name', 'server_url', 'uploaded_file', 'server_group', + 'id', 'name', 'server_url', 'file_path', 'server_group', 'max_streams', 'is_active', 'created_at', 'updated_at', 'filters', 'user_agent', 'profiles', 'locked', 'channel_groups', 'refresh_interval' ] diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py index 460bb181..e2de2af3 100644 --- a/apps/m3u/tasks.py +++ b/apps/m3u/tasks.py @@ -4,6 +4,7 @@ import re import requests import os import gc +import gzip, zipfile from celery.app.control import Inspect from celery.result import AsyncResult from celery import shared_task, current_app, group @@ -38,26 +39,40 @@ def fetch_m3u_lines(account, use_cache=False): logger.info(f"Fetching from URL {account.server_url}") try: response = requests.get(account.server_url, headers=headers, stream=True) - response.raise_for_status() # This will raise an HTTPError if the status is not 200 + response.raise_for_status() with open(file_path, 'wb') as file: - # Stream the content in chunks and write to the file - for chunk in response.iter_content(chunk_size=8192): # You can adjust the chunk size - if chunk: # Ensure chunk is not empty + for chunk in response.iter_content(chunk_size=8192): + if chunk: file.write(chunk) except requests.exceptions.RequestException as e: logger.error(f"Error fetching M3U from URL {account.server_url}: {e}") - return [] # Return an empty list in case of error + return [] with open(file_path, 'r', encoding='utf-8') as f: return f.readlines() - elif account.uploaded_file: + elif account.file_path: try: - # Open the file and return the lines as a list or iterator - with open(account.uploaded_file.path, 'r', encoding='utf-8') as f: - return f.readlines() # Ensure you return lines from the file, not the file object - except IOError as e: - logger.error(f"Error opening file {account.uploaded_file.path}: {e}") - return [] # Return an empty list in case of error + if account.file_path.endswith('.gz'): + with gzip.open(account.file_path, 'rt', encoding='utf-8') as f: + return f.readlines() + + elif account.file_path.endswith('.zip'): + with zipfile.ZipFile(account.file_path, 'r') as zip_file: + for name in zip_file.namelist(): + if name.endswith('.m3u'): + with zip_file.open(name) as f: + return [line.decode('utf-8') for line in f.readlines()] + logger.warning(f"No .m3u file found in ZIP archive: {account.file_path}") + return [] + + else: + with open(account.file_path, 'r', encoding='utf-8') as f: + return f.readlines() + + except (IOError, OSError, zipfile.BadZipFile, gzip.BadGzipFile) as e: + logger.error(f"Error opening file {account.file_path}: {e}") + return [] + # Return an empty list if neither server_url nor uploaded_file is available return [] @@ -247,11 +262,13 @@ def process_m3u_batch(account_id, batch, groups, hash_keys): except Exception as e: logger.error(f"Bulk create failed: {str(e)}") + retval = f"Batch processed: {len(streams_to_create)} created, {len(streams_to_update)} updated." + # Aggressive garbage collection del streams_to_create, streams_to_update, stream_hash, existing_streams gc.collect() - return f"Batch processed: {len(streams_to_create)} created, {len(streams_to_update)} updated." + return retval def cleanup_streams(account_id): account = M3UAccount.objects.get(id=account_id, is_active=True) diff --git a/core/tasks.py b/core/tasks.py new file mode 100644 index 00000000..62c20b3e --- /dev/null +++ b/core/tasks.py @@ -0,0 +1,159 @@ +# yourapp/tasks.py +from celery import shared_task +from channels.layers import get_channel_layer +from asgiref.sync import async_to_sync +import redis +import json +import logging +import re +import time +import os +from core.utils import RedisClient +from apps.proxy.ts_proxy.channel_status import ChannelStatus +from apps.m3u.models import M3UAccount +from apps.epg.models import EPGSource +from apps.m3u.tasks import refresh_single_m3u_account +from apps.epg.tasks import refresh_epg_data + +logger = logging.getLogger(__name__) + +EPG_WATCH_DIR = '/data/epgs' +M3U_WATCH_DIR = '/data/m3us' +MIN_AGE_SECONDS = 6 +STARTUP_SKIP_AGE = 30 +REDIS_PREFIX = "processed_file:" +REDIS_TTL = 60 * 60 * 24 * 3 # expire keys after 3 days (optional) + +# Store the last known value to compare with new data +last_known_data = {} + +@shared_task +def beat_periodic_task(): + fetch_channel_stats() + scan_and_process_files() + +@shared_task +def scan_and_process_files(): + redis_client = RedisClient.get_client() + now = time.time() + + for filename in os.listdir(M3U_WATCH_DIR): + filepath = os.path.join(M3U_WATCH_DIR, filename) + + if not os.path.isfile(filepath): + continue + + mtime = os.path.getmtime(filepath) + age = now - mtime + redis_key = REDIS_PREFIX + filepath + stored_mtime = redis_client.get(redis_key) + + # Startup safety: skip old untracked files + if not stored_mtime and age > STARTUP_SKIP_AGE: + redis_client.set(redis_key, mtime, ex=REDIS_TTL) + continue # Assume already processed before startup + + # File too new — probably still being written + if age < MIN_AGE_SECONDS: + continue + + # Skip if we've already processed this mtime + if stored_mtime and float(stored_mtime) >= mtime: + continue + + + m3u_account, _ = M3UAccount.objects.get_or_create(file_path=filepath, defaults={ + "name": filename, + }) + + refresh_single_m3u_account.delay(m3u_account.id) + redis_client.set(redis_key, mtime, ex=REDIS_TTL) + redis_client.set(redis_key, mtime, ex=REDIS_TTL) + + channel_layer = get_channel_layer() + async_to_sync(channel_layer.group_send)( + "updates", + { + "type": "update", + "data": {"success": True, "type": "m3u_file", "filename": filename} + }, + ) + + for filename in os.listdir(EPG_WATCH_DIR): + filepath = os.path.join(EPG_WATCH_DIR, filename) + + if not os.path.isfile(filepath): + continue + + mtime = os.path.getmtime(filepath) + age = now - mtime + redis_key = REDIS_PREFIX + filepath + stored_mtime = redis_client.get(redis_key) + + # Startup safety: skip old untracked files + if not stored_mtime and age > STARTUP_SKIP_AGE: + redis_client.set(redis_key, mtime, ex=REDIS_TTL) + continue # Assume already processed before startup + + # File too new — probably still being written + if age < MIN_AGE_SECONDS: + continue + + # Skip if we've already processed this mtime + if stored_mtime and float(stored_mtime) >= mtime: + continue + + epg_source, _ = EPGSource.objects.get_or_create(file_path=filepath, defaults={ + "name": filename, + "source_type": "xmltv", + }) + + refresh_epg_data.delay(epg_source.id) # Trigger Celery task + redis_client.set(redis_key, mtime, ex=REDIS_TTL) + redis_client.set(redis_key, mtime, ex=REDIS_TTL) + + channel_layer = get_channel_layer() + async_to_sync(channel_layer.group_send)( + "updates", + { + "type": "update", + "data": {"success": True, "type": "epg_file", "filename": filename} + }, + ) + +def fetch_channel_stats(): + redis_client = RedisClient.get_client() + + try: + # Basic info for all channels + channel_pattern = "ts_proxy:channel:*:metadata" + all_channels = [] + + # Extract channel IDs from keys + cursor = 0 + while True: + cursor, keys = redis_client.scan(cursor, match=channel_pattern) + for key in keys: + channel_id_match = re.search(r"ts_proxy:channel:(.*):metadata", key.decode('utf-8')) + if channel_id_match: + ch_id = channel_id_match.group(1) + channel_info = ChannelStatus.get_basic_channel_info(ch_id) + if channel_info: + all_channels.append(channel_info) + + if cursor == 0: + break + + except Exception as e: + logger.error(f"Error in channel_status: {e}", exc_info=True) + return + # return JsonResponse({'error': str(e)}, status=500) + + channel_layer = get_channel_layer() + async_to_sync(channel_layer.group_send)( + "updates", + { + "type": "update", + "data": {"success": True, "type": "channel_stats", "stats": json.dumps({'channels': all_channels, 'count': len(all_channels)})} + }, + ) diff --git a/dispatcharr/settings.py b/dispatcharr/settings.py index 59b3af0c..92f77eb9 100644 --- a/dispatcharr/settings.py +++ b/dispatcharr/settings.py @@ -174,7 +174,7 @@ CELERY_TASK_SERIALIZER = 'json' CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers.DatabaseScheduler" CELERY_BEAT_SCHEDULE = { 'fetch-channel-statuses': { - 'task': 'apps.proxy.tasks.fetch_channel_stats', + 'task': 'core.tasks.beat_periodic_task', 'schedule': 2.0, }, } diff --git a/docker/init/03-init-dispatcharr.sh b/docker/init/03-init-dispatcharr.sh index 78417d35..9417acd8 100644 --- a/docker/init/03-init-dispatcharr.sh +++ b/docker/init/03-init-dispatcharr.sh @@ -2,6 +2,10 @@ mkdir -p /data/logos mkdir -p /data/recordings +mkdir -p /data/uploads/m3us +mkdir -p /data/uploads/epgs +mkdir -p /data/m3us +mkdir -p /data/epgs mkdir -p /app/logo_cache mkdir -p /app/media diff --git a/frontend/src/WebSocket.jsx b/frontend/src/WebSocket.jsx index 4fa08216..73939cf3 100644 --- a/frontend/src/WebSocket.jsx +++ b/frontend/src/WebSocket.jsx @@ -22,7 +22,7 @@ export const WebsocketProvider = ({ children }) => { useChannelsStore(); const { fetchPlaylists, setRefreshProgress, setProfilePreview } = usePlaylistsStore(); - const { fetchEPGData } = useEPGsStore(); + const { fetchEPGData, fetchEPGs } = useEPGsStore(); const ws = useRef(null); @@ -57,6 +57,22 @@ export const WebsocketProvider = ({ children }) => { socket.onmessage = async (event) => { event = JSON.parse(event.data); switch (event.data.type) { + case 'epg_file': + fetchEPGs(); + notifications.show({ + title: 'EPG File Detected', + message: `Processing ${event.data.filename}`, + }); + break; + + case 'm3u_file': + fetchPlaylists(); + notifications.show({ + title: 'M3U File Detected', + message: `Processing ${event.data.filename}`, + }); + break; + case 'm3u_group_refresh': fetchChannelGroups(); fetchPlaylists(); diff --git a/frontend/src/api.js b/frontend/src/api.js index 2d40959f..9e1dfd55 100644 --- a/frontend/src/api.js +++ b/frontend/src/api.js @@ -500,14 +500,14 @@ export default class API { static async addPlaylist(values) { let body = null; - if (values.uploaded_file) { + if (values.file) { body = new FormData(); for (const prop in values) { body.append(prop, values[prop]); } } else { body = { ...values }; - delete body.uploaded_file; + delete body.file; body = JSON.stringify(body); } @@ -515,7 +515,7 @@ export default class API { method: 'POST', headers: { Authorization: `Bearer ${await API.getAuthToken()}`, - ...(values.uploaded_file + ...(values.file ? {} : { 'Content-Type': 'application/json', diff --git a/frontend/src/components/forms/M3U.jsx b/frontend/src/components/forms/M3U.jsx index f0feb28a..0b3ce020 100644 --- a/frontend/src/components/forms/M3U.jsx +++ b/frontend/src/components/forms/M3U.jsx @@ -63,13 +63,12 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => { await API.updatePlaylist({ id: playlist.id, ...values, - uploaded_file: file, + file, }); } else { - setLoadingText('Fetching groups'); newPlaylist = await API.addPlaylist({ ...values, - uploaded_file: file, + file, }); notifications.show({ @@ -160,10 +159,10 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => { />