Filesystem watch and processing of M3U and EPG files

dekzter 2025-04-06 15:58:55 -04:00
parent e185fbcda6
commit 354cd84c88
13 changed files with 296 additions and 27 deletions

View file

@@ -1,8 +1,9 @@
import logging
import logging, os
from rest_framework import viewsets, status
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.permissions import IsAuthenticated
from rest_framework.decorators import action
from drf_yasg.utils import swagger_auto_schema
from drf_yasg import openapi
from django.utils import timezone
@@ -26,6 +27,29 @@ class EPGSourceViewSet(viewsets.ModelViewSet):
        logger.debug("Listing all EPG sources.")
        return super().list(request, *args, **kwargs)

    @action(detail=False, methods=['post'])
    def upload(self, request):
        if 'file' not in request.FILES:
            return Response({'error': 'No file uploaded'}, status=status.HTTP_400_BAD_REQUEST)

        file = request.FILES['file']
        file_name = file.name
        file_path = os.path.join('/data/uploads/epgs', file_name)

        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        with open(file_path, 'wb+') as destination:
            for chunk in file.chunks():
                destination.write(chunk)

        new_obj_data = request.data.copy()
        new_obj_data['file_path'] = file_path

        serializer = self.get_serializer(data=new_obj_data)
        serializer.is_valid(raise_exception=True)
        self.perform_create(serializer)

        return Response(serializer.data, status=status.HTTP_201_CREATED)
# ─────────────────────────────
# 2) Program API (CRUD)
# ─────────────────────────────
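A minimal client-side sketch of the new upload action (the route prefix, token, and extra form fields here are assumptions; only the 'file' key is required by the check above):

import requests

# Hypothetical route; the real one depends on how the router registers EPGSourceViewSet.
url = "http://localhost:8000/api/epg/sources/upload/"

with open("guide.xml", "rb") as fh:
    resp = requests.post(
        url,
        headers={"Authorization": "Bearer <token>"},
        files={"file": fh},  # must be named 'file', or the action returns HTTP 400
        data={"name": "my-guide", "source_type": "xmltv"},  # assumed serializer fields
    )

resp.raise_for_status()
print(resp.json()["file_path"])  # server-side path under /data/uploads/epgs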

View file

@@ -53,6 +53,9 @@ def refresh_epg_data(source_id):
    release_task_lock('refresh_epg_data', source_id)

def fetch_xmltv(source):
    if not source.url:
        return

    logger.info(f"Fetching XMLTV data from source: {source.name}")
    try:
        response = requests.get(source.url, timeout=30)
View file

@@ -7,6 +7,8 @@ from drf_yasg import openapi
from django.shortcuts import get_object_or_404
from django.http import JsonResponse
from django.core.cache import cache
import os
from rest_framework.decorators import action
# Import all models, including UserAgent.
from .models import M3UAccount, M3UFilter, ServerGroup, M3UAccountProfile
@@ -29,6 +31,29 @@ class M3UAccountViewSet(viewsets.ModelViewSet):
    serializer_class = M3UAccountSerializer
    permission_classes = [IsAuthenticated]

    @action(detail=False, methods=['post'])
    def upload(self, request):
        if 'file' not in request.FILES:
            return Response({'error': 'No file uploaded'}, status=status.HTTP_400_BAD_REQUEST)

        file = request.FILES['file']
        file_name = file.name
        file_path = os.path.join('/data/uploads/m3us', file_name)

        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        with open(file_path, 'wb+') as destination:
            for chunk in file.chunks():
                destination.write(chunk)

        new_obj_data = request.data.copy()
        new_obj_data['file_path'] = file_path

        serializer = self.get_serializer(data=new_obj_data)
        serializer.is_valid(raise_exception=True)
        self.perform_create(serializer)

        return Response(serializer.data, status=status.HTTP_201_CREATED)

class M3UFilterViewSet(viewsets.ModelViewSet):
    """Handles CRUD operations for M3U filters"""
    queryset = M3UFilter.objects.all()
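This upload action mirrors the EPG one above; only the destination directory (/data/uploads/m3us) and the target model differ, so the request sketch shown earlier applies here with the M3U route.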

View file

@@ -0,0 +1,22 @@
# Generated by Django 5.1.6 on 2025-04-06 19:09

from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('m3u', '0006_populate_periodic_tasks'),
    ]

    operations = [
        migrations.RemoveField(
            model_name='m3uaccount',
            name='uploaded_file',
        ),
        migrations.AddField(
            model_name='m3uaccount',
            name='file_path',
            field=models.CharField(blank=True, max_length=255, null=True),
        ),
    ]
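Note there is no accompanying data migration, so paths of files previously stored through uploaded_file (under m3u_uploads/) are not carried over into file_path.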

View file

@@ -20,8 +20,8 @@ class M3UAccount(models.Model):
        null=True,
        help_text="The base URL of the M3U server (optional if a file is uploaded)"
    )
    uploaded_file = models.FileField(
        upload_to='m3u_uploads/',
    file_path = models.CharField(
        max_length=255,
        blank=True,
        null=True
    )
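Swapping the FileField for a plain CharField means the model now stores a filesystem path directly, so rows created by the upload endpoint and by the filesystem watcher can populate the same field without going through Django's media storage.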

View file

@@ -63,7 +63,7 @@ class M3UAccountSerializer(serializers.ModelSerializer):
    class Meta:
        model = M3UAccount
        fields = [
            'id', 'name', 'server_url', 'uploaded_file', 'server_group',
            'id', 'name', 'server_url', 'file_path', 'server_group',
            'max_streams', 'is_active', 'created_at', 'updated_at', 'filters', 'user_agent', 'profiles', 'locked',
            'channel_groups', 'refresh_interval'
        ]

View file

@@ -4,6 +4,7 @@ import re
import requests
import os
import gc
import gzip, zipfile
from celery.app.control import Inspect
from celery.result import AsyncResult
from celery import shared_task, current_app, group
@@ -38,26 +39,40 @@ def fetch_m3u_lines(account, use_cache=False):
logger.info(f"Fetching from URL {account.server_url}")
try:
response = requests.get(account.server_url, headers=headers, stream=True)
response.raise_for_status() # This will raise an HTTPError if the status is not 200
response.raise_for_status()
with open(file_path, 'wb') as file:
# Stream the content in chunks and write to the file
for chunk in response.iter_content(chunk_size=8192): # You can adjust the chunk size
if chunk: # Ensure chunk is not empty
for chunk in response.iter_content(chunk_size=8192):
if chunk:
file.write(chunk)
except requests.exceptions.RequestException as e:
logger.error(f"Error fetching M3U from URL {account.server_url}: {e}")
return [] # Return an empty list in case of error
return []
with open(file_path, 'r', encoding='utf-8') as f:
return f.readlines()
elif account.uploaded_file:
elif account.file_path:
try:
# Open the file and return the lines as a list or iterator
with open(account.uploaded_file.path, 'r', encoding='utf-8') as f:
return f.readlines() # Ensure you return lines from the file, not the file object
except IOError as e:
logger.error(f"Error opening file {account.uploaded_file.path}: {e}")
return [] # Return an empty list in case of error
if account.file_path.endswith('.gz'):
with gzip.open(account.file_path, 'rt', encoding='utf-8') as f:
return f.readlines()
elif account.file_path.endswith('.zip'):
with zipfile.ZipFile(account.file_path, 'r') as zip_file:
for name in zip_file.namelist():
if name.endswith('.m3u'):
with zip_file.open(name) as f:
return [line.decode('utf-8') for line in f.readlines()]
logger.warning(f"No .m3u file found in ZIP archive: {account.file_path}")
return []
else:
with open(account.file_path, 'r', encoding='utf-8') as f:
return f.readlines()
except (IOError, OSError, zipfile.BadZipFile, gzip.BadGzipFile) as e:
logger.error(f"Error opening file {account.file_path}: {e}")
return []
# Return an empty list if neither server_url nor uploaded_file is available
return []
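One behavior worth noting: the ZIP branch returns the first .m3u member it finds and ignores the rest. Since the watcher below picks up anything in /data/m3us, a quick way to exercise the new .gz branch is to drop a gzipped playlist into the watch directory; a minimal sketch:

import gzip

# Write a tiny gzipped playlist into the watch directory (M3U_WATCH_DIR in core/tasks.py);
# once it is older than MIN_AGE_SECONDS, the scan task picks it up and creates an account.
with gzip.open("/data/m3us/sample.m3u.gz", "wt", encoding="utf-8") as f:
    f.write("#EXTM3U\n")
    f.write('#EXTINF:-1 tvg-name="Example",Example Channel\n')
    f.write("http://example.com/stream.ts\n")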
@@ -247,11 +262,13 @@ def process_m3u_batch(account_id, batch, groups, hash_keys):
    except Exception as e:
        logger.error(f"Bulk create failed: {str(e)}")

    retval = f"Batch processed: {len(streams_to_create)} created, {len(streams_to_update)} updated."

    # Aggressive garbage collection
    del streams_to_create, streams_to_update, stream_hash, existing_streams
    gc.collect()

    return f"Batch processed: {len(streams_to_create)} created, {len(streams_to_update)} updated."
    return retval

def cleanup_streams(account_id):
    account = M3UAccount.objects.get(id=account_id, is_active=True)
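Capturing retval before the cleanup appears to be the point of this change: the old return statement built its message from streams_to_create and streams_to_update after the del had already removed them.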

core/tasks.py Normal file
View file

@@ -0,0 +1,159 @@
# core/tasks.py
from celery import shared_task
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
import redis
import json
import logging
import re
import time
import os
from core.utils import RedisClient
from apps.proxy.ts_proxy.channel_status import ChannelStatus
from apps.m3u.models import M3UAccount
from apps.epg.models import EPGSource
from apps.m3u.tasks import refresh_single_m3u_account
from apps.epg.tasks import refresh_epg_data
logger = logging.getLogger(__name__)
EPG_WATCH_DIR = '/data/epgs'
M3U_WATCH_DIR = '/data/m3us'
MIN_AGE_SECONDS = 6
STARTUP_SKIP_AGE = 30
REDIS_PREFIX = "processed_file:"
REDIS_TTL = 60 * 60 * 24 * 3 # expire keys after 3 days (optional)
# Store the last known value to compare with new data
last_known_data = {}

@shared_task
def beat_periodic_task():
    fetch_channel_stats()
    scan_and_process_files()

@shared_task
def scan_and_process_files():
    redis_client = RedisClient.get_client()
    now = time.time()

    for filename in os.listdir(M3U_WATCH_DIR):
        filepath = os.path.join(M3U_WATCH_DIR, filename)
        if not os.path.isfile(filepath):
            continue

        mtime = os.path.getmtime(filepath)
        age = now - mtime
        redis_key = REDIS_PREFIX + filepath
        stored_mtime = redis_client.get(redis_key)

        # Startup safety: skip old untracked files
        if not stored_mtime and age > STARTUP_SKIP_AGE:
            redis_client.set(redis_key, mtime, ex=REDIS_TTL)
            continue  # Assume already processed before startup

        # File too new — probably still being written
        if age < MIN_AGE_SECONDS:
            continue

        # Skip if we've already processed this mtime
        if stored_mtime and float(stored_mtime) >= mtime:
            continue

        m3u_account, _ = M3UAccount.objects.get_or_create(file_path=filepath, defaults={
            "name": filename,
        })

        refresh_single_m3u_account.delay(m3u_account.id)
        redis_client.set(redis_key, mtime, ex=REDIS_TTL)

        channel_layer = get_channel_layer()
        async_to_sync(channel_layer.group_send)(
            "updates",
            {
                "type": "update",
                "data": {"success": True, "type": "m3u_file", "filename": filename}
            },
        )

    for filename in os.listdir(EPG_WATCH_DIR):
        filepath = os.path.join(EPG_WATCH_DIR, filename)
        if not os.path.isfile(filepath):
            continue

        mtime = os.path.getmtime(filepath)
        age = now - mtime
        redis_key = REDIS_PREFIX + filepath
        stored_mtime = redis_client.get(redis_key)

        # Startup safety: skip old untracked files
        if not stored_mtime and age > STARTUP_SKIP_AGE:
            redis_client.set(redis_key, mtime, ex=REDIS_TTL)
            continue  # Assume already processed before startup

        # File too new — probably still being written
        if age < MIN_AGE_SECONDS:
            continue

        # Skip if we've already processed this mtime
        if stored_mtime and float(stored_mtime) >= mtime:
            continue

        epg_source, _ = EPGSource.objects.get_or_create(file_path=filepath, defaults={
            "name": filename,
            "source_type": "xmltv",
        })

        refresh_epg_data.delay(epg_source.id)  # Trigger Celery task
        redis_client.set(redis_key, mtime, ex=REDIS_TTL)

        channel_layer = get_channel_layer()
        async_to_sync(channel_layer.group_send)(
            "updates",
            {
                "type": "update",
                "data": {"success": True, "type": "epg_file", "filename": filename}
            },
        )

def fetch_channel_stats():
    redis_client = RedisClient.get_client()
    try:
        # Basic info for all channels
        channel_pattern = "ts_proxy:channel:*:metadata"
        all_channels = []

        # Extract channel IDs from keys
        cursor = 0
        while True:
            cursor, keys = redis_client.scan(cursor, match=channel_pattern)
            for key in keys:
                channel_id_match = re.search(r"ts_proxy:channel:(.*):metadata", key.decode('utf-8'))
                if channel_id_match:
                    ch_id = channel_id_match.group(1)
                    channel_info = ChannelStatus.get_basic_channel_info(ch_id)
                    if channel_info:
                        all_channels.append(channel_info)
            if cursor == 0:
                break
    except Exception as e:
        logger.error(f"Error in fetch_channel_stats: {e}", exc_info=True)
        return

    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        "updates",
        {
            "type": "update",
            "data": {"success": True, "type": "channel_stats", "stats": json.dumps({'channels': all_channels, 'count': len(all_channels)})}
        },
    )
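The scan logic debounces on mtime: a file younger than MIN_AGE_SECONDS is assumed to still be mid-write and is skipped until a later pass; a file with no Redis record that is already older than STARTUP_SKIP_AGE is assumed to predate startup and is recorded without processing; everything else is processed once per mtime, with the processed mtime cached under processed_file:<path> for three days. For testing, a scan can be queued by hand instead of waiting for the beat tick (a sketch, assuming a running worker and a Django shell):

from core.tasks import scan_and_process_files

# Queue one scan immediately; call scan_and_process_files() directly to run it inline.
scan_and_process_files.delay()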

View file

@@ -174,7 +174,7 @@ CELERY_TASK_SERIALIZER = 'json'
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers.DatabaseScheduler"
CELERY_BEAT_SCHEDULE = {
'fetch-channel-statuses': {
'task': 'apps.proxy.tasks.fetch_channel_stats',
'task': 'core.tasks.beat_periodic_task',
'schedule': 2.0,
},
}
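The schedule entry keeps its old name, but beat_periodic_task fans out to both fetch_channel_stats and scan_and_process_files, so the directory scan now runs on the same 2-second cadence as the channel stats.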

View file

@@ -2,6 +2,10 @@
mkdir -p /data/logos
mkdir -p /data/recordings
mkdir -p /data/uploads/m3us
mkdir -p /data/uploads/epgs
mkdir -p /data/m3us
mkdir -p /data/epgs
mkdir -p /app/logo_cache
mkdir -p /app/media
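The uploads/* directories back the new upload endpoints, while /data/m3us and /data/epgs are the directories core/tasks.py watches.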

View file

@@ -22,7 +22,7 @@ export const WebsocketProvider = ({ children }) => {
useChannelsStore();
const { fetchPlaylists, setRefreshProgress, setProfilePreview } =
usePlaylistsStore();
const { fetchEPGData } = useEPGsStore();
const { fetchEPGData, fetchEPGs } = useEPGsStore();
const ws = useRef(null);
@@ -57,6 +57,22 @@ export const WebsocketProvider = ({ children }) => {
socket.onmessage = async (event) => {
event = JSON.parse(event.data);
switch (event.data.type) {
case 'epg_file':
fetchEPGs();
notifications.show({
title: 'EPG File Detected',
message: `Processing ${event.data.filename}`,
});
break;
case 'm3u_file':
fetchPlaylists();
notifications.show({
title: 'M3U File Detected',
message: `Processing ${event.data.filename}`,
});
break;
case 'm3u_group_refresh':
fetchChannelGroups();
fetchPlaylists();
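The two new cases key off the event.data.type values emitted by the group_send calls in core/tasks.py ('m3u_file' and 'epg_file'), refreshing the relevant store before showing the notification.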

View file

@@ -500,14 +500,14 @@ export default class API {
static async addPlaylist(values) {
let body = null;
if (values.uploaded_file) {
if (values.file) {
body = new FormData();
for (const prop in values) {
body.append(prop, values[prop]);
}
} else {
body = { ...values };
delete body.uploaded_file;
delete body.file;
body = JSON.stringify(body);
}
@@ -515,7 +515,7 @@
method: 'POST',
headers: {
Authorization: `Bearer ${await API.getAuthToken()}`,
...(values.uploaded_file
...(values.file
? {}
: {
'Content-Type': 'application/json',

View file

@@ -63,13 +63,12 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => {
await API.updatePlaylist({
id: playlist.id,
...values,
uploaded_file: file,
file,
});
} else {
setLoadingText('Fetching groups');
newPlaylist = await API.addPlaylist({
...values,
uploaded_file: file,
file,
});
notifications.show({
@@ -160,10 +159,10 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => {
/>
<FileInput
id="uploaded_file"
id="file"
label="Upload files"
placeholder="Upload files"
value={formik.uploaded_file}
value={formik.file}
onChange={handleFileChange}
/>
</Stack>
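The id, value, and payload-key renames keep the form aligned with addPlaylist and updatePlaylist above, which now switch between FormData and JSON based on values.file instead of values.uploaded_file.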