diff --git a/apps/channels/tasks.py b/apps/channels/tasks.py
index 4fd11adf..ce08e57c 100644
--- a/apps/channels/tasks.py
+++ b/apps/channels/tasks.py
@@ -4,11 +4,11 @@ import os
 import re
 import requests
 import time
+import gc
 
 from datetime import datetime
 from celery import shared_task
 from rapidfuzz import fuzz
-from sentence_transformers import util
 from django.conf import settings
 from django.db import transaction
 from django.utils.text import slugify
@@ -67,6 +67,8 @@ def match_epg_channels():
     4) If a match is found, we set channel.tvg_id
     5) Summarize and log results.
     """
+    from sentence_transformers import util
+
     logger.info("Starting EPG matching logic...")
 
     st_model = SentenceTransformer.get_model()
@@ -222,6 +224,8 @@ def match_epg_channels():
             }
         )
 
+    SentenceTransformer.clear()
+    gc.collect()
     return f"Done. Matched {total_matched} channel(s)."
 
 @shared_task
diff --git a/apps/m3u/signals.py b/apps/m3u/signals.py
index 1a885752..07e774b2 100644
--- a/apps/m3u/signals.py
+++ b/apps/m3u/signals.py
@@ -14,7 +14,7 @@ def refresh_account_on_save(sender, instance, created, **kwargs):
     if it is active or newly created.
     """
     if created:
-        refresh_single_m3u_account.delay(instance.id)
+        refresh_m3u_groups(instance.id)
 
 @receiver(post_save, sender=M3UAccount)
 def create_or_update_refresh_task(sender, instance, **kwargs):
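
The hunks above defer the heavy `sentence_transformers` import into the task body and explicitly release the model (plus a `gc.collect()`) once EPG matching finishes. A minimal, self-contained sketch of that load-use-release pattern follows; the class and the model name are illustrative stand-ins, not the project's actual `core.utils.SentenceTransformer` (which appears later in this diff).

```python
# Illustrative sketch only: mirrors the lazy-load/clear pattern in this diff,
# not the project's actual core.utils implementation. Model name is a placeholder.
import gc


class LazyModel:
    _instance = None

    @classmethod
    def get_model(cls):
        if cls._instance is None:
            # Heavy import deferred until a task actually needs it, so the
            # Celery worker does not pay the cost at startup.
            from sentence_transformers import SentenceTransformer as ST
            cls._instance = ST("sentence-transformers/all-MiniLM-L6-v2")
        return cls._instance

    @classmethod
    def clear(cls):
        # Drop the only reference and ask the collector to reclaim the weights.
        cls._instance = None
        gc.collect()


def match_channels(names):
    model = LazyModel.get_model()
    try:
        return model.encode(names)
    finally:
        # Release the model when the task ends instead of keeping it
        # resident in the long-lived worker process.
        LazyModel.clear()
```

Deferring the import keeps worker start-up light; clearing the singleton matters because Celery workers are long-lived and would otherwise hold the model in memory between runs.
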
diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py
index b99105c7..b5b4fd10 100644
--- a/apps/m3u/tasks.py
+++ b/apps/m3u/tasks.py
@@ -3,6 +3,7 @@ import logging
 import re
 import requests
 import os
+import gc
 from celery.app.control import Inspect
 from celery.result import AsyncResult
 from celery import shared_task, current_app, group
@@ -160,21 +161,15 @@ def process_groups(account, group_names):
     )
 
 @shared_task
-def process_m3u_batch(account_id, batch, group_names, hash_keys):
+def process_m3u_batch(account_id, batch, groups, hash_keys):
     """Processes a batch of M3U streams using bulk operations."""
     account = M3UAccount.objects.get(id=account_id)
-    existing_groups = {group.name: group for group in ChannelGroup.objects.filter(
-        m3u_account__m3u_account=account,  # Filter by the M3UAccount
-        m3u_account__enabled=True  # Filter by the enabled flag in the join table
-    )}
 
     streams_to_create = []
     streams_to_update = []
     stream_hashes = {}
 
     # compiled_filters = [(f.filter_type, re.compile(f.regex_pattern, re.IGNORECASE)) for f in filters]
-    redis_client = RedisClient.get_client()
-
     logger.debug(f"Processing batch of {len(batch)}")
 
     for stream_info in batch:
         name, url = stream_info["name"], stream_info["url"]
@@ -182,7 +177,7 @@ def process_m3u_batch(account_id, batch, group_names, hash_keys):
         group_title = stream_info["attributes"].get("group-title", "Default Group")
 
         # Filter out disabled groups for this account
-        if group_title not in existing_groups:
+        if group_title not in groups:
             logger.debug(f"Skipping stream in disabled group: {group_title}")
             continue
 
@@ -199,18 +194,18 @@ def process_m3u_batch(account_id, batch, group_names, hash_keys):
 
         try:
             stream_hash = Stream.generate_hash_key(name, url, tvg_id, hash_keys)
-            if redis_client.exists(f"m3u_refresh:{stream_hash}"):
-                # duplicate already processed by another batch
-                continue
+            # if redis_client.exists(f"m3u_refresh:{stream_hash}"):
+            #     # duplicate already processed by another batch
+            #     continue
 
-            redis_client.set(f"m3u_refresh:{stream_hash}", "true")
+            # redis_client.set(f"m3u_refresh:{stream_hash}", "true")
             stream_props = {
                 "name": name,
                 "url": url,
                 "logo_url": tvg_logo,
                 "tvg_id": tvg_id,
                 "m3u_account": account,
-                "channel_group": existing_groups[group_title],
+                "channel_group_id": int(groups.get(group_title)),
                 "stream_hash": stream_hash,
                 "custom_properties": json.dumps(stream_info["attributes"]),
             }
@@ -226,15 +221,13 @@ def process_m3u_batch(account_id, batch, group_names, hash_keys):
     for stream_hash, stream_props in stream_hashes.items():
         if stream_hash in existing_streams:
             obj = existing_streams[stream_hash]
-            changed = False
-            for key, value in stream_props.items():
-                if hasattr(obj, key) and getattr(obj, key) == value:
-                    continue
-                changed = True
-                setattr(obj, key, value)
+            existing_attr = {field.name: getattr(obj, field.name) for field in Stream._meta.fields if field != 'channel_group_id'}
+            changed = any(existing_attr[key] != value for key, value in stream_props.items() if key != 'channel_group_id')
 
-            obj.last_seen = timezone.now()
             if changed:
+                for key, value in stream_props.items():
+                    setattr(obj, key, value)
+                obj.last_seen = timezone.now()
                 streams_to_update.append(obj)
             del existing_streams[stream_hash]
         else:
@@ -244,15 +237,19 @@ def process_m3u_batch(account_id, batch, group_names, hash_keys):
             streams_to_create.append(Stream(**stream_props))
 
 
     try:
-        if streams_to_create:
-            Stream.objects.bulk_create(streams_to_create, ignore_conflicts=True)
-        if streams_to_update:
-            Stream.objects.bulk_update(streams_to_update, stream_props.keys())
-        if len(existing_streams.keys()) > 0:
-            Stream.objects.bulk_update(existing_streams.values(), ["last_seen"])
+        with transaction.atomic():
+            if streams_to_create:
+                Stream.objects.bulk_create(streams_to_create, ignore_conflicts=True)
+            if streams_to_update:
+                Stream.objects.bulk_update(streams_to_update, { key for key in stream_props.keys() if key not in ["m3u_account", "stream_hash"] and key not in hash_keys})
+            # if len(existing_streams.keys()) > 0:
+            #     Stream.objects.bulk_update(existing_streams.values(), ["last_seen"])
     except Exception as e:
         logger.error(f"Bulk create failed: {str(e)}")
 
+    # Aggressive garbage collection
+    del streams_to_create, streams_to_update, stream_hash, existing_streams
+    gc.collect()
     return f"Batch processed: {len(streams_to_create)} created, {len(streams_to_update)} updated."
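
The rewritten update path above is easier to follow outside diff form. Below is a dependency-free sketch of the same bookkeeping, with plain dicts standing in for `Stream` rows and illustrative field names; note that it builds the summary string before dropping the work lists, since a `del` that runs first would leave nothing for the return statement to reference.

```python
# Dependency-free sketch of the update-or-create bookkeeping in process_m3u_batch.
# Plain dicts stand in for Stream rows; field names are illustrative.
import gc


def plan_batch(existing_by_hash, incoming_by_hash, skip_fields=("stream_hash",)):
    to_create, to_update = [], []
    for stream_hash, props in incoming_by_hash.items():
        current = existing_by_hash.get(stream_hash)
        if current is None:
            to_create.append(props)
            continue
        # Only touch rows whose comparable fields actually changed.
        changed = any(
            current.get(key) != value
            for key, value in props.items()
            if key not in skip_fields
        )
        if changed:
            current.update(props)
            to_update.append(current)
    # Build the summary before releasing references to the work lists.
    summary = f"{len(to_create)} created, {len(to_update)} updated"
    del to_create, to_update
    gc.collect()
    return summary


existing = {"h1": {"stream_hash": "h1", "name": "Old", "url": "http://a"}}
incoming = {
    "h1": {"stream_hash": "h1", "name": "New", "url": "http://a"},
    "h2": {"stream_hash": "h2", "name": "Other", "url": "http://b"},
}
print(plan_batch(existing, incoming))  # -> "1 created, 1 updated"
```
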
@@ -276,26 +273,20 @@ def cleanup_streams(account_id):
     logger.info(f"Cleanup complete")
 
 
-def refresh_m3u_groups(account_id):
+def refresh_m3u_groups(account_id, use_cache=False):
     if not acquire_task_lock('refresh_m3u_account_groups', account_id):
         return f"Task already running for account_id={account_id}.", None
 
-    # Record start time
-    start_time = time.time()
-
     try:
         account = M3UAccount.objects.get(id=account_id, is_active=True)
     except M3UAccount.DoesNotExist:
         release_task_lock('refresh_m3u_account_groups', account_id)
         return f"M3UAccount with ID={account_id} not found or inactive.", None
 
-    send_progress_update(0, account_id)
-
-    lines = fetch_m3u_lines(account)
     extinf_data = []
     groups = set(["Default Group"])
 
-    for line in lines:
+    for line in fetch_m3u_lines(account, use_cache):
         line = line.strip()
         if line.startswith("#EXTINF"):
             parsed = parse_extinf_line(line)
@@ -365,9 +356,14 @@ def refresh_single_m3u_account(account_id, use_cache=False):
 
     hash_keys = CoreSettings.get_m3u_hash_key().split(",")
 
+    existing_groups = {group.name: group.id for group in ChannelGroup.objects.filter(
+        m3u_account__m3u_account=account,  # Filter by the M3UAccount
+        m3u_account__enabled=True  # Filter by the enabled flag in the join table
+    )}
+
     # Break into batches and process in parallel
     batches = [extinf_data[i:i + BATCH_SIZE] for i in range(0, len(extinf_data), BATCH_SIZE)]
-    task_group = group(process_m3u_batch.s(account_id, batch, groups, hash_keys) for batch in batches)
+    task_group = group(process_m3u_batch.s(account_id, batch, existing_groups, hash_keys) for batch in batches)
 
     total_batches = len(batches)
     completed_batches = 0
@@ -409,15 +405,19 @@ def refresh_single_m3u_account(account_id, use_cache=False):
 
     print(f"Function took {elapsed_time} seconds to execute.")
 
+    # Aggressive garbage collection
+    del existing_groups, extinf_data, groups, batches
+    gc.collect()
+
     release_task_lock('refresh_single_m3u_account', account_id)
 
-    cursor = 0
-    while True:
-        cursor, keys = redis_client.scan(cursor, match=f"m3u_refresh:*", count=BATCH_SIZE)
-        if keys:
-            redis_client.delete(*keys)  # Delete the matching keys
-        if cursor == 0:
-            break
+    # cursor = 0
+    # while True:
+    #     cursor, keys = redis_client.scan(cursor, match=f"m3u_refresh:*", count=BATCH_SIZE)
+    #     if keys:
+    #         redis_client.delete(*keys)  # Delete the matching keys
+    #     if cursor == 0:
+    #         break
 
     return f"Dispatched jobs complete."
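
A likely reason `refresh_single_m3u_account` now builds a plain `{group_name: group_id}` mapping and passes it into each `process_m3u_batch` signature is that Celery's default JSON serializer handles primitives but not ORM instances. A small sketch of that idea, with illustrative names and an assumed batch size:

```python
# Sketch: why the batch task receives a plain {group_name: group_id} mapping.
# Celery's default JSON serializer rejects ORM instances, while a dict of
# primitives round-trips cleanly to every worker. Names and batch size are
# illustrative, not taken from the project's settings.
import json


def build_batch_args(extinf_data, existing_groups, batch_size=500):
    """Yield JSON-serializable argument tuples, one per batch."""
    for i in range(0, len(extinf_data), batch_size):
        batch = extinf_data[i:i + batch_size]
        # existing_groups maps group name -> ChannelGroup primary key.
        yield (batch, existing_groups)


extinf_data = [{"name": f"Channel {n}", "url": f"http://example/{n}"} for n in range(3)]
existing_groups = {"Default Group": 1, "Sports": 7}

for args in build_batch_args(extinf_data, existing_groups, batch_size=2):
    json.dumps(args)  # would raise TypeError if a model instance slipped in
print("all batch arguments are JSON-serializable")
```
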
diff --git a/core/utils.py b/core/utils.py
index dcf973fb..d6f0b446 100644
--- a/core/utils.py
+++ b/core/utils.py
@@ -8,6 +8,7 @@ from redis.exceptions import ConnectionError, TimeoutError
 from django.core.cache import cache
 from asgiref.sync import async_to_sync
 from channels.layers import get_channel_layer
+import gc
 
 logger = logging.getLogger(__name__)
 
@@ -176,7 +177,17 @@ class SentenceTransformer:
         # If not present locally, download:
         if not os.path.exists(os.path.join(MODEL_PATH, "config.json")):
             logger.info(f"Local model not found in {MODEL_PATH}; downloading from {SENTENCE_MODEL_NAME}...")
-            st_model = st(SENTENCE_MODEL_NAME, cache_folder=MODEL_PATH)
+            cls._instance = st(SENTENCE_MODEL_NAME, cache_folder=MODEL_PATH)
         else:
             logger.info(f"Loading local model from {MODEL_PATH}")
-            st_model = st(MODEL_PATH)
+            cls._instance = st(MODEL_PATH)
+
+        return cls._instance
+
+    @classmethod
+    def clear(cls):
+        """Clear the model instance and release memory."""
+        if cls._instance is not None:
+            del cls._instance
+            cls._instance = None
+            gc.collect()
diff --git a/dispatcharr/settings.py b/dispatcharr/settings.py
index 1e45dd5a..59b3af0c 100644
--- a/dispatcharr/settings.py
+++ b/dispatcharr/settings.py
@@ -30,7 +30,6 @@ INSTALLED_APPS = [
     'apps.proxy.ts_proxy',
     'core',
     'drf_yasg',
-    'daphne',
     'channels',
     'django.contrib.admin',
     'django.contrib.auth',
diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh
index e6622206..d4f4007a 100755
--- a/docker/entrypoint.sh
+++ b/docker/entrypoint.sh
@@ -89,10 +89,13 @@ elif [ "$DISPATCHARR_DEBUG" = "true" ]; then
     uwsgi_file="/app/docker/uwsgi.debug.ini"
 fi
 
-
 if [[ "$DISPATCHARR_ENV" = "dev" ]]; then
     . /app/docker/init/99-init-dev.sh
-
+    echo "Starting frontend dev environment"
+    su - $POSTGRES_USER -c "cd /app/frontend && npm run dev &"
+    npm_pid=$(pgrep vite | sort | head -n1)
+    echo "✅ vite started with PID $npm_pid"
+    pids+=("$npm_pid")
 else
     echo "🚀 Starting nginx..."
     nginx
@@ -105,11 +108,36 @@ cd /app
 python manage.py migrate --noinput
 python manage.py collectstatic --noinput
 
-echo "🚀 Starting uwsgi..."
-su - $POSTGRES_USER -c "cd /app && uwsgi --ini $uwsgi_file &"
-uwsgi_pid=$(pgrep uwsgi | sort | head -n1)
-echo "✅ uwsgi started with PID $uwsgi_pid"
-pids+=("$uwsgi_pid")
+sed -i 's/protected-mode yes/protected-mode no/g' /etc/redis/redis.conf
+su - $POSTGRES_USER -c "redis-server --protected-mode no &"
+redis_pid=$(pgrep redis)
+echo "✅ redis started with PID $redis_pid"
+pids+=("$redis_pid")
+
+echo "🚀 Starting gunicorn..."
+su - $POSTGRES_USER -c "cd /app && gunicorn dispatcharr.asgi:application \
+    --bind 0.0.0.0:5656 \
+    --worker-class uvicorn.workers.UvicornWorker \
+    --workers 2 \
+    --threads 1 \
+    --timeout 600 \
+    --keep-alive 30 \
+    --access-logfile - \
+    --error-logfile - &"
+gunicorn_pid=$(pgrep gunicorn | sort | head -n1)
+echo "✅ gunicorn started with PID $gunicorn_pid"
+pids+=("$gunicorn_pid")
+
+echo "Starting celery and beat..."
+su - $POSTGRES_USER -c "cd /app && celery -A dispatcharr worker -l info --autoscale=8,2 &"
+celery_pid=$(pgrep celery | sort | head -n1)
+echo "✅ celery started with PID $celery_pid"
+pids+=("$celery_pid")
+
+su - $POSTGRES_USER -c "cd /app && celery -A dispatcharr beat -l info &"
+beat_pid=$(pgrep beat | sort | head -n1)
+echo "✅ celery beat started with PID $beat_pid"
+pids+=("$beat_pid")
 
 # Wait for at least one process to exit and log the process that exited first
 if [ ${#pids[@]} -gt 0 ]; then
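
The entrypoint now serves the ASGI application through gunicorn with uvicorn workers instead of uWSGI. For reference, the same flags could be expressed as a `gunicorn.conf.py`; such a file is not part of this diff, the values below simply mirror the command above.

```python
# gunicorn.conf.py -- equivalent of the CLI flags used in entrypoint.sh above.
# Using a config file instead of flags is not part of this diff; values mirror it.
bind = "0.0.0.0:5656"
worker_class = "uvicorn.workers.UvicornWorker"  # serve the ASGI app (dispatcharr.asgi:application)
workers = 2
threads = 1
timeout = 600
keepalive = 30
accesslog = "-"  # access log to stdout
errorlog = "-"   # error log to stdout
```

It would be loaded with `gunicorn -c gunicorn.conf.py dispatcharr.asgi:application`.
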
diff --git a/docker/init/03-init-dispatcharr.sh b/docker/init/03-init-dispatcharr.sh
index 3497c1d6..78417d35 100644
--- a/docker/init/03-init-dispatcharr.sh
+++ b/docker/init/03-init-dispatcharr.sh
@@ -10,11 +10,12 @@ sed -i "s/NGINX_PORT/${DISPATCHARR_PORT}/g" /etc/nginx/sites-enabled/default
 # NOTE: mac doesn't run as root, so only manage permissions
 # if this script is running as root
 if [ "$(id -u)" = "0" ]; then
-    touch /app/uwsgi.sock
-    chown $PUID:$PGID /app/uwsgi.sock
     # Needs to own ALL of /data except db, we handle that below
     chown -R $PUID:$PGID /data
+    chown -R $PUID:$PGID /app/logo_cache
+    chown -R $PUID:$PGID /app/media
+
     # Permissions
     chown -R postgres:postgres /data/db
     chmod +x /data
diff --git a/docker/nginx.conf b/docker/nginx.conf
index 4c26dc1c..71f17438 100644
--- a/docker/nginx.conf
+++ b/docker/nginx.conf
@@ -11,7 +11,7 @@ server {
     # Serve Django via uWSGI
     location / {
         include uwsgi_params;
-        uwsgi_pass unix:/app/uwsgi.sock;
+        proxy_pass http://127.0.0.1:5656;
         proxy_set_header X-Real-IP $remote_addr;
         proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
         proxy_set_header Host $host;
@@ -62,7 +62,7 @@ server {
 
     # WebSockets for real-time communication
     location /ws/ {
-        proxy_pass http://127.0.0.1:8001;
+        proxy_pass http://127.0.0.1:5656;
         proxy_http_version 1.1;
         proxy_set_header Upgrade $http_upgrade;
         proxy_set_header Connection "Upgrade";
diff --git a/frontend/src/WebSocket.jsx b/frontend/src/WebSocket.jsx
index 133f749e..616236cc 100644
--- a/frontend/src/WebSocket.jsx
+++ b/frontend/src/WebSocket.jsx
@@ -29,7 +29,7 @@ export const WebsocketProvider = ({ children }) => {
   useEffect(() => {
     let wsUrl = `${window.location.host}/ws/`;
     if (import.meta.env.DEV) {
-      wsUrl = `${window.location.hostname}:8001/ws/`;
+      wsUrl = `${window.location.hostname}:5656/ws/`;
     }
 
     if (window.location.protocol.match(/https/)) {
diff --git a/frontend/src/components/forms/M3U.jsx b/frontend/src/components/forms/M3U.jsx
index adc5f7ec..f8c70698 100644
--- a/frontend/src/components/forms/M3U.jsx
+++ b/frontend/src/components/forms/M3U.jsx
@@ -58,7 +58,6 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => {
     }),
     onSubmit: async (values, { setSubmitting, resetForm }) => {
       let newPlaylist;
-      console.log(values);
       if (playlist?.id) {
         await API.updatePlaylist({
           id: playlist.id,
@@ -66,6 +65,7 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => {
           uploaded_file: file,
         });
       } else {
+        setLoadingText('Fetching groups');
         newPlaylist = await API.addPlaylist({
           ...values,
           uploaded_file: file,
@@ -75,7 +75,6 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => {
 
         // Don't prompt for group filters, but keeping this here
         // in case we want to revive it
-        newPlaylist = null;
       }
 
       resetForm();
diff --git a/frontend/src/components/forms/M3UGroupFilter.jsx b/frontend/src/components/forms/M3UGroupFilter.jsx
index 89ffc0e4..2fe221de 100644
--- a/frontend/src/components/forms/M3UGroupFilter.jsx
+++ b/frontend/src/components/forms/M3UGroupFilter.jsx
@@ -59,7 +59,7 @@ const M3UGroupFilter = ({ playlist = null, isOpen, onClose }) => {
       channel_groups: groupStates,
     });
     setIsLoading(false);
-
+    API.refreshPlaylist(playlist.id);
     onClose();
   };
 
diff --git a/frontend/src/components/tables/ChannelsTable.jsx b/frontend/src/components/tables/ChannelsTable.jsx
index df95f00b..c69cc0a8 100644
--- a/frontend/src/components/tables/ChannelsTable.jsx
+++ b/frontend/src/components/tables/ChannelsTable.jsx
@@ -268,8 +268,6 @@ const ChannelsTable = ({}) => {
   );
   const [channelsEnabledHeaderSwitch, setChannelsEnabledHeaderSwitch] =
     useState(false);
-  const [moreActionsAnchorEl, setMoreActionsAnchorEl] = useState(null);
-  const [actionsOpenRow, setActionsOpenRow] = useState(null);
 
   const [hdhrUrl, setHDHRUrl] = useState(hdhrUrlBase);
   const [epgUrl, setEPGUrl] = useState(epgUrlBase);
@@ -720,11 +718,6 @@ const ChannelsTable = ({}) => {
     );
   };
 
-  const handleMoreActionsClick = (event, rowId) => {
-    setMoreActionsAnchorEl(event.currentTarget);
-    setActionsOpenRow(rowId);
-  };
-
   const table = useMantineReactTable({
     ...TableHelper.defaultProperties,
     columns,
@@ -884,13 +877,7 @@ const ChannelsTable = ({}) => {
       {env_mode == 'dev' && (