memory optimization, m3u processing, re-added group filtering before m3u ingestion

dekzter 2025-04-06 11:36:16 -04:00
parent 472c20627c
commit ecc96f8b69
14 changed files with 108 additions and 103 deletions

View file

@@ -4,11 +4,11 @@ import os
import re
import requests
import time
import gc
from datetime import datetime
from celery import shared_task
from rapidfuzz import fuzz
from sentence_transformers import util
from django.conf import settings
from django.db import transaction
from django.utils.text import slugify
@@ -67,6 +67,8 @@ def match_epg_channels():
4) If a match is found, we set channel.tvg_id
5) Summarize and log results.
"""
from sentence_transformers import util
logger.info("Starting EPG matching logic...")
st_model = SentenceTransformer.get_model()
@@ -222,6 +224,8 @@ def match_epg_channels():
}
)
SentenceTransformer.clear()
gc.collect()
return f"Done. Matched {total_matched} channel(s)."
@shared_task
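
Note: the hunk above defers the sentence-transformers import into the task body and tears the model down (SentenceTransformer.clear() plus gc.collect()) once matching finishes. A minimal sketch of that load/use/release lifecycle, with an illustrative model name (the project's own singleton wrapper appears in a later hunk):

import gc

def run_matching_job(channel_names, epg_names):
    # Import lazily so idle Celery workers never pay the cost of loading
    # torch / sentence-transformers at module import time.
    from sentence_transformers import SentenceTransformer, util

    model = SentenceTransformer("all-MiniLM-L6-v2")  # illustrative model
    emb_channels = model.encode(channel_names, convert_to_tensor=True)
    emb_epg = model.encode(epg_names, convert_to_tensor=True)
    scores = util.cos_sim(emb_channels, emb_epg)

    # Drop every reference and force a collection pass so the worker's
    # resident memory shrinks back down between runs.
    del model, emb_channels, emb_epg
    gc.collect()
    return scores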

View file

@@ -14,7 +14,7 @@ def refresh_account_on_save(sender, instance, created, **kwargs):
if it is active or newly created.
"""
if created:
refresh_single_m3u_account.delay(instance.id)
refresh_m3u_groups(instance.id)
@receiver(post_save, sender=M3UAccount)
def create_or_update_refresh_task(sender, instance, **kwargs):

View file

@@ -3,6 +3,7 @@ import logging
import re
import requests
import os
import gc
from celery.app.control import Inspect
from celery.result import AsyncResult
from celery import shared_task, current_app, group
@@ -160,21 +161,15 @@ def process_groups(account, group_names):
)
@shared_task
def process_m3u_batch(account_id, batch, group_names, hash_keys):
def process_m3u_batch(account_id, batch, groups, hash_keys):
"""Processes a batch of M3U streams using bulk operations."""
account = M3UAccount.objects.get(id=account_id)
existing_groups = {group.name: group for group in ChannelGroup.objects.filter(
m3u_account__m3u_account=account, # Filter by the M3UAccount
m3u_account__enabled=True # Filter by the enabled flag in the join table
)}
streams_to_create = []
streams_to_update = []
stream_hashes = {}
# compiled_filters = [(f.filter_type, re.compile(f.regex_pattern, re.IGNORECASE)) for f in filters]
redis_client = RedisClient.get_client()
logger.debug(f"Processing batch of {len(batch)}")
for stream_info in batch:
name, url = stream_info["name"], stream_info["url"]
@@ -182,7 +177,7 @@ def process_m3u_batch(account_id, batch, group_names, hash_keys):
group_title = stream_info["attributes"].get("group-title", "Default Group")
# Filter out disabled groups for this account
if group_title not in existing_groups:
if group_title not in groups:
logger.debug(f"Skipping stream in disabled group: {group_title}")
continue
@@ -199,18 +194,18 @@ def process_m3u_batch(account_id, batch, group_names, hash_keys):
try:
stream_hash = Stream.generate_hash_key(name, url, tvg_id, hash_keys)
if redis_client.exists(f"m3u_refresh:{stream_hash}"):
# duplicate already processed by another batch
continue
# if redis_client.exists(f"m3u_refresh:{stream_hash}"):
# # duplicate already processed by another batch
# continue
redis_client.set(f"m3u_refresh:{stream_hash}", "true")
# redis_client.set(f"m3u_refresh:{stream_hash}", "true")
stream_props = {
"name": name,
"url": url,
"logo_url": tvg_logo,
"tvg_id": tvg_id,
"m3u_account": account,
"channel_group": existing_groups[group_title],
"channel_group_id": int(groups.get(group_title)),
"stream_hash": stream_hash,
"custom_properties": json.dumps(stream_info["attributes"]),
}
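
Note: each batch worker now receives a plain {group name: id} mapping from the parent task (so channel_group_id is set directly, with no per-batch group query), and duplicates are identified by a hash over configurable keys. A sketch of what such a hash can look like (hypothetical; the real Stream.generate_hash_key may differ):

import hashlib

def generate_hash_key(name, url, tvg_id, hash_keys):
    # hash_keys comes from CoreSettings.get_m3u_hash_key().split(","),
    # e.g. ["name", "url"]; only the selected fields feed the digest.
    fields = {"name": name, "url": url, "tvg_id": tvg_id}
    material = "|".join(str(fields.get(key, "")) for key in hash_keys)
    return hashlib.md5(material.encode("utf-8")).hexdigest()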
@@ -226,15 +221,13 @@ def process_m3u_batch(account_id, batch, group_names, hash_keys):
for stream_hash, stream_props in stream_hashes.items():
if stream_hash in existing_streams:
obj = existing_streams[stream_hash]
changed = False
for key, value in stream_props.items():
if hasattr(obj, key) and getattr(obj, key) == value:
continue
changed = True
setattr(obj, key, value)
existing_attr = {field.name: getattr(obj, field.name) for field in Stream._meta.fields if field != 'channel_group_id'}
changed = any(existing_attr[key] != value for key, value in stream_props.items() if key != 'channel_group_id')
obj.last_seen = timezone.now()
if changed:
for key, value in stream_props.items():
setattr(obj, key, value)
obj.last_seen = timezone.now()
streams_to_update.append(obj)
del existing_streams[stream_hash]
else:
@@ -244,15 +237,19 @@ def process_m3u_batch(account_id, batch, group_names, hash_keys):
streams_to_create.append(Stream(**stream_props))
try:
if streams_to_create:
Stream.objects.bulk_create(streams_to_create, ignore_conflicts=True)
if streams_to_update:
Stream.objects.bulk_update(streams_to_update, stream_props.keys())
if len(existing_streams.keys()) > 0:
Stream.objects.bulk_update(existing_streams.values(), ["last_seen"])
with transaction.atomic():
if streams_to_create:
Stream.objects.bulk_create(streams_to_create, ignore_conflicts=True)
if streams_to_update:
Stream.objects.bulk_update(streams_to_update, { key for key in stream_props.keys() if key not in ["m3u_account", "stream_hash"] and key not in hash_keys})
# if len(existing_streams.keys()) > 0:
# Stream.objects.bulk_update(existing_streams.values(), ["last_seen"])
except Exception as e:
logger.error(f"Bulk create failed: {str(e)}")
# Aggressive garbage collection
del streams_to_create, streams_to_update, stream_hash, existing_streams
gc.collect()
return f"Batch processed: {len(streams_to_create)} created, {len(streams_to_update)} updated."
@@ -276,26 +273,20 @@ def cleanup_streams(account_id):
logger.info(f"Cleanup complete")
def refresh_m3u_groups(account_id):
def refresh_m3u_groups(account_id, use_cache=False):
if not acquire_task_lock('refresh_m3u_account_groups', account_id):
return f"Task already running for account_id={account_id}.", None
# Record start time
start_time = time.time()
try:
account = M3UAccount.objects.get(id=account_id, is_active=True)
except M3UAccount.DoesNotExist:
release_task_lock('refresh_m3u_account_groups', account_id)
return f"M3UAccount with ID={account_id} not found or inactive.", None
send_progress_update(0, account_id)
lines = fetch_m3u_lines(account)
extinf_data = []
groups = set(["Default Group"])
for line in lines:
for line in fetch_m3u_lines(account, use_cache):
line = line.strip()
if line.startswith("#EXTINF"):
parsed = parse_extinf_line(line)
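
Note: iterating fetch_m3u_lines(account, use_cache) directly, instead of binding the whole line list first, lets the group scan stream through the playlist without holding it in memory. A standalone sketch of that pass (simplified parser; the project's parse_extinf_line extracts more attributes):

import re

GROUP_RE = re.compile(r'group-title="([^"]*)"')

def scan_groups(lines):
    """Collect group titles from an iterable of M3U lines (a generator works)."""
    groups = {"Default Group"}
    for line in lines:
        line = line.strip()
        if line.startswith("#EXTINF"):
            match = GROUP_RE.search(line)
            if match:
                groups.add(match.group(1))
    return groups

# Usage: scan_groups(open("playlist.m3u", encoding="utf-8"))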
@@ -365,9 +356,14 @@ def refresh_single_m3u_account(account_id, use_cache=False):
hash_keys = CoreSettings.get_m3u_hash_key().split(",")
existing_groups = {group.name: group.id for group in ChannelGroup.objects.filter(
m3u_account__m3u_account=account, # Filter by the M3UAccount
m3u_account__enabled=True # Filter by the enabled flag in the join table
)}
# Break into batches and process in parallel
batches = [extinf_data[i:i + BATCH_SIZE] for i in range(0, len(extinf_data), BATCH_SIZE)]
task_group = group(process_m3u_batch.s(account_id, batch, groups, hash_keys) for batch in batches)
task_group = group(process_m3u_batch.s(account_id, batch, existing_groups, hash_keys) for batch in batches)
total_batches = len(batches)
completed_batches = 0
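
Note: the parent task slices the parsed entries into fixed-size chunks and fans them out as a Celery group, handing each worker the prebuilt {name: id} mapping so batches never re-query enabled groups. A sketch of that fan-out (BATCH_SIZE illustrative; process_m3u_batch as above):

from celery import group

BATCH_SIZE = 1000  # illustrative; the project defines its own constant

def dispatch_batches(account_id, extinf_data, existing_groups, hash_keys):
    batches = [extinf_data[i:i + BATCH_SIZE]
               for i in range(0, len(extinf_data), BATCH_SIZE)]
    # One task signature per chunk, executed in parallel across workers.
    job = group(process_m3u_batch.s(account_id, batch, existing_groups, hash_keys)
                for batch in batches)
    return job.apply_async(), len(batches)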
@@ -409,15 +405,19 @@ def refresh_single_m3u_account(account_id, use_cache=False):
print(f"Function took {elapsed_time} seconds to execute.")
# Aggressive garbage collection
del existing_groups, extinf_data, groups, batches
gc.collect()
release_task_lock('refresh_single_m3u_account', account_id)
cursor = 0
while True:
cursor, keys = redis_client.scan(cursor, match=f"m3u_refresh:*", count=BATCH_SIZE)
if keys:
redis_client.delete(*keys) # Delete the matching keys
if cursor == 0:
break
# cursor = 0
# while True:
# cursor, keys = redis_client.scan(cursor, match=f"m3u_refresh:*", count=BATCH_SIZE)
# if keys:
# redis_client.delete(*keys) # Delete the matching keys
# if cursor == 0:
# break
return f"Dispatched jobs complete."

View file

@@ -8,6 +8,7 @@ from redis.exceptions import ConnectionError, TimeoutError
from django.core.cache import cache
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
import gc
logger = logging.getLogger(__name__)
@@ -176,7 +177,17 @@ class SentenceTransformer:
# If not present locally, download:
if not os.path.exists(os.path.join(MODEL_PATH, "config.json")):
logger.info(f"Local model not found in {MODEL_PATH}; downloading from {SENTENCE_MODEL_NAME}...")
st_model = st(SENTENCE_MODEL_NAME, cache_folder=MODEL_PATH)
cls._instance = st(SENTENCE_MODEL_NAME, cache_folder=MODEL_PATH)
else:
logger.info(f"Loading local model from {MODEL_PATH}")
st_model = st(MODEL_PATH)
cls._instance = st(MODEL_PATH)
return cls._instance
@classmethod
def clear(cls):
"""Clear the model instance and release memory."""
if cls._instance is not None:
del cls._instance
cls._instance = None
gc.collect()
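
Note: clear() completes a load-on-demand singleton: get_model() materializes the transformer once (the hunk also fixes the earlier bug where the result was assigned to a local st_model instead of cls._instance), and clear() drops the reference and forces GC so a long-lived worker can shed the model weights. A condensed sketch of the holder:

import gc
import os

class ModelHolder:
    """Lazy singleton around a heavy sentence-transformers model (sketch)."""
    _instance = None

    @classmethod
    def get_model(cls, model_path, model_name):
        if cls._instance is None:
            from sentence_transformers import SentenceTransformer as st
            if not os.path.exists(os.path.join(model_path, "config.json")):
                # No local copy yet: download into the cache folder.
                cls._instance = st(model_name, cache_folder=model_path)
            else:
                cls._instance = st(model_path)
        return cls._instance

    @classmethod
    def clear(cls):
        # Drop the only reference, then collect so the memory is
        # actually returned between jobs.
        if cls._instance is not None:
            cls._instance = None
            gc.collect()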

View file

@@ -30,7 +30,6 @@ INSTALLED_APPS = [
'apps.proxy.ts_proxy',
'core',
'drf_yasg',
'daphne',
'channels',
'django.contrib.admin',
'django.contrib.auth',

View file

@@ -89,10 +89,13 @@ elif [ "$DISPATCHARR_DEBUG" = "true" ]; then
uwsgi_file="/app/docker/uwsgi.debug.ini"
fi
if [[ "$DISPATCHARR_ENV" = "dev" ]]; then
. /app/docker/init/99-init-dev.sh
echo "Starting frontend dev environment"
su - $POSTGRES_USER -c "cd /app/frontend && npm run dev &"
npm_pid=$(pgrep vite | sort | head -n1)
echo "✅ vite started with PID $npm_pid"
pids+=("$npm_pid")
else
echo "🚀 Starting nginx..."
nginx
@@ -105,11 +108,36 @@ cd /app
python manage.py migrate --noinput
python manage.py collectstatic --noinput
echo "🚀 Starting uwsgi..."
su - $POSTGRES_USER -c "cd /app && uwsgi --ini $uwsgi_file &"
uwsgi_pid=$(pgrep uwsgi | sort | head -n1)
echo "✅ uwsgi started with PID $uwsgi_pid"
pids+=("$uwsgi_pid")
sed -i 's/protected-mode yes/protected-mode no/g' /etc/redis/redis.conf
su - $POSTGRES_USER -c "redis-server --protected-mode no &"
redis_pid=$(pgrep redis)
echo "✅ redis started with PID $redis_pid"
pids+=("$redis_pid")
echo "🚀 Starting gunicorn..."
su - $POSTGRES_USER -c "cd /app && gunicorn dispatcharr.asgi:application \
--bind 0.0.0.0:5656 \
--worker-class uvicorn.workers.UvicornWorker \
--workers 2 \
--threads 1 \
--timeout 600 \
--keep-alive 30 \
--access-logfile - \
--error-logfile - &"
gunicorn_pid=$(pgrep gunicorn | sort | head -n1)
echo "✅ gunicorn started with PID $gunicorn_pid"
pids+=("$gunicorn_pid")
echo "Starting celery and beat..."
su - $POSTGRES_USER -c "cd /app && celery -A dispatcharr worker -l info --autoscale=8,2 &"
celery_pid=$(pgrep celery | sort | head -n1)
echo "✅ celery started with PID $celery_pid"
pids+=("$celery_pid")
su - $POSTGRES_USER -c "cd /app && celery -A dispatcharr beat -l info &"
beat_pid=$(pgrep beat | sort | head -n1)
echo "✅ celery beat started with PID $beat_pid"
pids+=("$beat_pid")
# Wait for at least one process to exit and log the process that exited first
if [ ${#pids[@]} -gt 0 ]; then
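
Note: daphne and uWSGI give way to one gunicorn master running Uvicorn workers against dispatcharr.asgi:application on port 5656, which is why the nginx hunks below proxy both / and /ws/ to that single port. For such a setup the ASGI module is typically a channels ProtocolTypeRouter; a sketch (the routing import is hypothetical):

# dispatcharr/asgi.py -- illustrative layout
import os

from django.core.asgi import get_asgi_application

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "dispatcharr.settings")
django_asgi_app = get_asgi_application()  # plain HTTP handling

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
import dispatcharr.routing  # hypothetical module with websocket_urlpatterns

application = ProtocolTypeRouter({
    "http": django_asgi_app,                  # REST/admin traffic
    "websocket": AuthMiddlewareStack(         # /ws/ traffic, same port
        URLRouter(dispatcharr.routing.websocket_urlpatterns)
    ),
})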

View file

@@ -10,11 +10,12 @@ sed -i "s/NGINX_PORT/${DISPATCHARR_PORT}/g" /etc/nginx/sites-enabled/default
# NOTE: mac doesn't run as root, so only manage permissions
# if this script is running as root
if [ "$(id -u)" = "0" ]; then
touch /app/uwsgi.sock
chown $PUID:$PGID /app/uwsgi.sock
# Needs to own ALL of /data except db, we handle that below
chown -R $PUID:$PGID /data
chown -R $PUID:$PGID /app/logo_cache
chown -R $PUID:$PGID /app/media
# Permissions
chown -R postgres:postgres /data/db
chmod +x /data

View file

@@ -11,7 +11,7 @@ server {
# Serve Django via uWSGI
location / {
include uwsgi_params;
uwsgi_pass unix:/app/uwsgi.sock;
proxy_pass http://127.0.0.1:5656;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $host;
@@ -62,7 +62,7 @@ server {
# WebSockets for real-time communication
location /ws/ {
proxy_pass http://127.0.0.1:8001;
proxy_pass http://127.0.0.1:5656;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";

View file

@@ -29,7 +29,7 @@ export const WebsocketProvider = ({ children }) => {
useEffect(() => {
let wsUrl = `${window.location.host}/ws/`;
if (import.meta.env.DEV) {
wsUrl = `${window.location.hostname}:8001/ws/`;
wsUrl = `${window.location.hostname}:5656/ws/`;
}
if (window.location.protocol.match(/https/)) {

View file

@@ -58,7 +58,6 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => {
}),
onSubmit: async (values, { setSubmitting, resetForm }) => {
let newPlaylist;
console.log(values);
if (playlist?.id) {
await API.updatePlaylist({
id: playlist.id,
@@ -66,6 +65,7 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => {
uploaded_file: file,
});
} else {
setLoadingText('Fetching groups');
newPlaylist = await API.addPlaylist({
...values,
uploaded_file: file,
@@ -75,7 +75,6 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => {
// Don't prompt for group filters, but keeping this here
// in case we want to revive it
newPlaylist = null;
}
resetForm();

View file

@@ -59,7 +59,7 @@ const M3UGroupFilter = ({ playlist = null, isOpen, onClose }) => {
channel_groups: groupStates,
});
setIsLoading(false);
API.refreshPlaylist(playlist.id);
onClose();
};

View file

@@ -268,8 +268,6 @@ const ChannelsTable = ({}) => {
);
const [channelsEnabledHeaderSwitch, setChannelsEnabledHeaderSwitch] =
useState(false);
const [moreActionsAnchorEl, setMoreActionsAnchorEl] = useState(null);
const [actionsOpenRow, setActionsOpenRow] = useState(null);
const [hdhrUrl, setHDHRUrl] = useState(hdhrUrlBase);
const [epgUrl, setEPGUrl] = useState(epgUrlBase);
@@ -720,11 +718,6 @@ const ChannelsTable = ({}) => {
);
};
const handleMoreActionsClick = (event, rowId) => {
setMoreActionsAnchorEl(event.currentTarget);
setActionsOpenRow(rowId);
};
const table = useMantineReactTable({
...TableHelper.defaultProperties,
columns,
@@ -884,13 +877,7 @@ const ChannelsTable = ({}) => {
{env_mode == 'dev' && (
<Menu>
<Menu.Target>
<ActionIcon
onClick={(event) =>
handleMoreActionsClick(event, row.original.id)
}
variant="transparent"
size="sm"
>
<ActionIcon variant="transparent" size="sm">
<CircleEllipsis size="18" />
</ActionIcon>
</Menu.Target>

View file

@@ -49,10 +49,7 @@ const StreamsTable = ({}) => {
const [rowSelection, setRowSelection] = useState([]);
const [stream, setStream] = useState(null);
const [modalOpen, setModalOpen] = useState(false);
const [moreActionsAnchorEl, setMoreActionsAnchorEl] = useState(null);
const [groupOptions, setGroupOptions] = useState([]);
const [m3uOptions, setM3uOptions] = useState([]);
const [actionsOpenRow, setActionsOpenRow] = useState(null);
const [initialDataCount, setInitialDataCount] = useState(null);
const [data, setData] = useState([]); // Holds fetched data
@@ -62,7 +59,6 @@ const StreamsTable = ({}) => {
const [isLoading, setIsLoading] = useState(true);
const [sorting, setSorting] = useState([{ id: 'name', desc: '' }]);
const [selectedStreamIds, setSelectedStreamIds] = useState([]);
const [unselectedStreamIds, setUnselectedStreamIds] = useState([]);
// const [allRowsSelected, setAllRowsSelected] = useState(false);
const [pagination, setPagination] = useState({
pageIndex: 0,
@@ -74,7 +70,6 @@ const StreamsTable = ({}) => {
m3u_account: '',
});
const debouncedFilters = useDebounce(filters, 500);
const hasData = data.length > 0;
const navigate = useNavigate();
@@ -92,8 +87,6 @@ const StreamsTable = ({}) => {
} = useSettingsStore();
const { showVideo } = useVideoStore();
const isMoreActionsOpen = Boolean(moreActionsAnchorEl);
// Access the row virtualizer instance (optional)
const rowVirtualizerInstanceRef = useRef(null);
@@ -354,16 +347,6 @@ const StreamsTable = ({}) => {
});
};
const handleMoreActionsClick = (event, rowId) => {
setMoreActionsAnchorEl(event.currentTarget);
setActionsOpenRow(rowId);
};
const handleMoreActionsClose = () => {
setMoreActionsAnchorEl(null);
setActionsOpenRow(null);
};
const onRowSelectionChange = (updater) => {
setRowSelection((prevRowSelection) => {
const newRowSelection =
@@ -550,13 +533,7 @@ const StreamsTable = ({}) => {
<Menu>
<Menu.Target>
<ActionIcon
onClick={(event) =>
handleMoreActionsClick(event, row.original.id)
}
variant="transparent"
size="sm"
>
<ActionIcon variant="transparent" size="sm">
<EllipsisVertical size="18" />
</ActionIcon>
</Menu.Target>

View file

@@ -11,7 +11,6 @@ drf-yasg>=1.20.0
streamlink
python-vlc
yt-dlp
gevent==24.11.1
django-cors-headers
djangorestframework-simplejwt
m3u8
@@ -22,9 +21,9 @@ torch==2.6.0+cpu
# ML/NLP dependencies
sentence-transformers==3.4.1
uwsgi
channels
channels-redis
daphne
django-filter
django-celery-beat
gunicorn
uvicorn[standard]