merged in dev

dekzter 2025-05-21 08:16:35 -04:00
commit a96c5f0f5c
11 changed files with 1218 additions and 426 deletions

@@ -6,11 +6,13 @@ on:
paths:
- 'docker/DispatcharrBase'
- '.github/workflows/base-image.yml'
- 'requirements.txt'
pull_request:
branches: [ main, dev ]
paths:
- 'docker/DispatcharrBase'
- '.github/workflows/base-image.yml'
- 'requirements.txt'
workflow_dispatch: # Allow manual triggering
permissions:

@@ -7,6 +7,7 @@ import time
import json
import subprocess
from datetime import datetime
import gc
from celery import shared_task
from django.utils.text import slugify
@@ -63,146 +64,162 @@ def match_epg_channels():
4) If a match is found, we set channel.tvg_id
5) Summarize and log results.
"""
logger.info("Starting EPG matching logic...")
matched_channels = []
channels_to_update = []

# Attempt to retrieve a "preferred-region" if configured
try:
region_obj = CoreSettings.objects.get(key="preferred-region")
region_code = region_obj.value.strip().lower()
except CoreSettings.DoesNotExist:
region_code = None

# Get channels that don't have EPG data assigned
channels_without_epg = Channel.objects.filter(epg_data__isnull=True)
logger.info(f"Found {channels_without_epg.count()} channels without EPG data")

channels_json = []
for channel in channels_without_epg:
# Normalize TVG ID - strip whitespace and convert to lowercase
normalized_tvg_id = channel.tvg_id.strip().lower() if channel.tvg_id else ""
if normalized_tvg_id:
logger.info(f"Processing channel {channel.id} '{channel.name}' with TVG ID='{normalized_tvg_id}'")

channels_json.append({
"id": channel.id,
"name": channel.name,
"tvg_id": normalized_tvg_id, # Use normalized TVG ID
"original_tvg_id": channel.tvg_id, # Keep original for reference
"fallback_name": normalized_tvg_id if normalized_tvg_id else channel.name,
"norm_chan": normalize_name(normalized_tvg_id if normalized_tvg_id else channel.name)
})

# Similarly normalize EPG data TVG IDs
epg_json = []
for epg in EPGData.objects.all():
normalized_tvg_id = epg.tvg_id.strip().lower() if epg.tvg_id else ""
epg_json.append({
'id': epg.id,
'tvg_id': normalized_tvg_id, # Use normalized TVG ID
'original_tvg_id': epg.tvg_id, # Keep original for reference
'name': epg.name,
'norm_name': normalize_name(epg.name),
'epg_source_id': epg.epg_source.id if epg.epg_source else None,
})

# Log available EPG data TVG IDs for debugging
unique_epg_tvg_ids = set(e['tvg_id'] for e in epg_json if e['tvg_id'])
logger.info(f"Available EPG TVG IDs: {', '.join(sorted(unique_epg_tvg_ids))}")

payload = {
"channels": channels_json,
"epg_data": epg_json,
"region_code": region_code,
}

with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_file.write(json.dumps(payload).encode('utf-8'))
temp_file_path = temp_file.name

# After writing to the file but before subprocess
# Explicitly delete the large data structures
del payload
gc.collect()

process = subprocess.Popen(
['python', '/app/scripts/epg_match.py', temp_file_path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)

# Log stderr in real-time
for line in iter(process.stderr.readline, ''):
if line:
logger.info(line.strip())

process.stderr.close()
stdout, stderr = process.communicate()

os.remove(temp_file_path)
if process.returncode != 0:
return f"Failed to process EPG matching: {stderr}"

result = json.loads(stdout)
# This returns lists of dicts, not model objects
channels_to_update_dicts = result["channels_to_update"]
matched_channels = result["matched_channels"]

# Explicitly clean up large objects
del stdout, result
gc.collect()

# Convert your dict-based 'channels_to_update' into real Channel objects
if channels_to_update_dicts:
# Extract IDs of the channels that need updates
channel_ids = [d["id"] for d in channels_to_update_dicts]
# Fetch them from DB
channels_qs = Channel.objects.filter(id__in=channel_ids)
channels_list = list(channels_qs)
# Build a map from channel_id -> epg_data_id (or whatever fields you need)
epg_mapping = {
d["id"]: d["epg_data_id"] for d in channels_to_update_dicts
}
# Populate each Channel object with the updated epg_data_id
for channel_obj in channels_list:
# The script sets 'epg_data_id' in the returned dict
# We either assign directly, or fetch the EPGData instance if needed.
channel_obj.epg_data_id = epg_mapping.get(channel_obj.id)
# Now we have real model objects, so bulk_update will work
Channel.objects.bulk_update(channels_list, ["epg_data"])

total_matched = len(matched_channels)
if total_matched:
logger.info(f"Match Summary: {total_matched} channel(s) matched.")
for (cid, cname, tvg) in matched_channels:
logger.info(f" - Channel ID={cid}, Name='{cname}' => tvg_id='{tvg}'")
else:
logger.info("No new channels were matched.")

logger.info("Finished EPG matching logic.")

# Send update with additional information for refreshing UI
channel_layer = get_channel_layer()
associations = [
{"channel_id": chan["id"], "epg_data_id": chan["epg_data_id"]}
for chan in channels_to_update_dicts
]

async_to_sync(channel_layer.group_send)(
'updates',
{
'type': 'update',
"data": {
"success": True,
"type": "epg_match",
"refresh_channels": True, # Flag to tell frontend to refresh channels
"matches_count": total_matched,
"message": f"EPG matching complete: {total_matched} channel(s) matched",
"associations": associations # Add the associations data
}
}
)

return f"Done. Matched {total_matched} channel(s)."
finally:
# Final cleanup
gc.collect()
# Use our standardized cleanup function for more thorough memory management
from core.utils import cleanup_memory
cleanup_memory(log_usage=True, force_collection=True)
@shared_task

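The matcher above hands its channel/EPG payload to an external script through a temporary JSON file rather than keeping everything inside the Celery worker. A minimal sketch of that hand-off pattern, assuming the script path referenced in the diff (run_external_matcher is a hypothetical name, not part of the commit):

import json
import os
import subprocess
import tempfile

def run_external_matcher(payload, script_path="/app/scripts/epg_match.py"):
    """Write the payload to a temp file, run the matcher script, and parse its stdout as JSON."""
    with tempfile.NamedTemporaryFile(delete=False) as temp_file:
        temp_file.write(json.dumps(payload).encode("utf-8"))
        temp_file_path = temp_file.name
    try:
        proc = subprocess.run(
            ["python", script_path, temp_file_path],
            capture_output=True,
            text=True,
        )
        if proc.returncode != 0:
            raise RuntimeError(f"matcher failed: {proc.stderr}")
        return json.loads(proc.stdout)
    finally:
        # Always remove the temp file, even if the script fails
        os.remove(temp_file_path)
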
File diff suppressed because it is too large.

@@ -496,7 +496,8 @@ def process_m3u_batch(account_id, batch, groups, hash_keys):
# Aggressive garbage collection
del streams_to_create, streams_to_update, stream_hashes, existing_streams
gc.collect()
from core.utils import cleanup_memory
cleanup_memory(log_usage=True, force_collection=True)
return retval
@@ -1080,7 +1081,8 @@ def refresh_single_m3u_account(account_id):
# Aggressive garbage collection
del existing_groups, extinf_data, groups, batches
gc.collect()
from core.utils import cleanup_memory
cleanup_memory(log_usage=True, force_collection=True)
# Clean up cache file since we've fully processed it
if os.path.exists(cache_path):
@@ -1088,6 +1090,8 @@ def refresh_single_m3u_account(account_id):
return f"Dispatched jobs complete."
from core.utils import send_websocket_update
def send_m3u_update(account_id, action, progress, **kwargs):
# Start with the base data dictionary
data = {
@@ -1111,12 +1115,10 @@ def send_m3u_update(account_id, action, progress, **kwargs):
# Add the additional key-value pairs from kwargs
data.update(kwargs)
# Now, send the updated data dictionary
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
'updates',
{
'type': 'update',
'data': data
}
)
# Use the standardized function with memory management
# Enable garbage collection for certain operations
collect_garbage = action == "parsing" and progress % 25 == 0
send_websocket_update('updates', 'update', data, collect_garbage=collect_garbage)
# Explicitly clear data reference to help garbage collection
data = None

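send_m3u_update now routes through the standardized send_websocket_update helper and only forces garbage collection on selected parsing ticks. A condensed sketch of that call pattern, assuming the helper added to core.utils in this commit (the wrapper name below is hypothetical):

from core.utils import send_websocket_update

def send_progress_update(account_id, action, progress, **kwargs):
    """Illustrative wrapper: build the payload and only force GC on every 25% parsing tick."""
    data = {"account_id": account_id, "action": action, "progress": progress}
    data.update(kwargs)
    collect_garbage = action == "parsing" and progress % 25 == 0
    send_websocket_update("updates", "update", data, collect_garbage=collect_garbage)
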
@@ -6,8 +6,10 @@ import redis
import json
import logging
import re
import gc # Add import for garbage collection
from core.utils import RedisClient
from apps.proxy.ts_proxy.channel_status import ChannelStatus
from core.utils import send_websocket_update
logger = logging.getLogger(__name__)
@@ -43,11 +45,17 @@ def fetch_channel_stats():
return
# return JsonResponse({'error': str(e)}, status=500)
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
send_websocket_update(
"updates",
"update",
{
"type": "update",
"data": {"success": True, "type": "channel_stats", "stats": json.dumps({'channels': all_channels, 'count': len(all_channels)})}
"success": True,
"type": "channel_stats",
"stats": json.dumps({'channels': all_channels, 'count': len(all_channels)})
},
collect_garbage=True
)
# Explicitly clean up large data structures
all_channels = None
gc.collect()

@@ -2,13 +2,12 @@
from celery import shared_task
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
import redis
import json
import logging
import re
import time
import os
from core.utils import RedisClient
from core.utils import RedisClient, send_websocket_update
from apps.proxy.ts_proxy.channel_status import ChannelStatus
from apps.m3u.models import M3UAccount
from apps.epg.models import EPGSource
@@ -36,11 +35,6 @@ LOG_THROTTLE_SECONDS = 300 # 5 minutes
# Track if this is the first scan since startup
_first_scan_completed = False
@shared_task
def beat_periodic_task():
fetch_channel_stats()
scan_and_process_files()
def throttled_log(logger_method, message, key=None, *args, **kwargs):
"""Only log messages with the same key once per throttle period"""
if key is None:
@@ -52,6 +46,11 @@ def throttled_log(logger_method, message, key=None, *args, **kwargs):
logger_method(message, *args, **kwargs)
_last_log_times[key] = now
@shared_task
def beat_periodic_task():
fetch_channel_stats()
scan_and_process_files()
@shared_task
def scan_and_process_files():
global _first_scan_completed
@@ -293,19 +292,23 @@ def fetch_channel_stats():
if cursor == 0:
break
send_websocket_update(
"updates",
"update",
{
"success": True,
"type": "channel_stats",
"stats": json.dumps({'channels': all_channels, 'count': len(all_channels)})
},
collect_garbage=True
)
# Explicitly clean up large data structures
all_channels = None
except Exception as e:
logger.error(f"Error in channel_status: {e}", exc_info=True)
return
# return JsonResponse({'error': str(e)}, status=500)
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
"updates",
{
"type": "update",
"data": {"success": True, "type": "channel_stats", "stats": json.dumps({'channels': all_channels, 'count': len(all_channels)})}
},
)
@shared_task
def rehash_streams(keys):

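throttled_log is only partially visible across the two hunks above; the intent is to emit a given log key at most once per LOG_THROTTLE_SECONDS. A self-contained sketch of that throttling technique (the body is a reconstruction under that assumption, not the commit's exact code):

import logging
import time

LOG_THROTTLE_SECONDS = 300  # 5 minutes
_last_log_times = {}

logger = logging.getLogger(__name__)

def throttled_log(logger_method, message, key=None, *args, **kwargs):
    """Only log messages with the same key once per throttle period."""
    if key is None:
        key = message  # fall back to the message text as the throttle key
    now = time.time()
    last = _last_log_times.get(key, 0)
    if now - last >= LOG_THROTTLE_SECONDS:
        logger_method(message, *args, **kwargs)
        _last_log_times[key] = now

# Example: emits at most once every 5 minutes for this key
throttled_log(logger.warning, "Redis unavailable, retrying", key="redis-down")
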
@@ -59,6 +59,10 @@ class RedisClient:
client.config_set('save', '') # Disable RDB snapshots
client.config_set('appendonly', 'no') # Disable AOF logging
# Set optimal memory settings
client.config_set('maxmemory-policy', 'allkeys-lru') # Use LRU eviction
client.config_set('maxmemory', '256mb') # Set reasonable memory limit
# Disable protected mode when in debug mode
if os.environ.get('DISPATCHARR_DEBUG', '').lower() == 'true':
client.config_set('protected-mode', 'no') # Disable protected mode in debug
@@ -169,12 +173,118 @@ def release_task_lock(task_name, id):
# Remove the lock
redis_client.delete(lock_id)
def send_websocket_event(event, success, data):
def send_websocket_update(group_name, event_type, data, collect_garbage=False):
"""
Standardized function to send WebSocket updates with proper memory management.
Args:
group_name: The WebSocket group to send to (e.g. 'updates')
event_type: The type of message (e.g. 'update')
data: The data to send
collect_garbage: Whether to force garbage collection after sending
"""
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
'updates',
{
'type': 'update',
"data": {"success": True, "type": "epg_channels"}
}
)
try:
async_to_sync(channel_layer.group_send)(
group_name,
{
'type': event_type,
'data': data
}
)
except Exception as e:
logger.warning(f"Failed to send WebSocket update: {e}")
finally:
# Explicitly release references to help garbage collection
channel_layer = None
# Force garbage collection if requested
if collect_garbage:
gc.collect()
def send_websocket_event(event, success, data):
"""Acquire a lock to prevent concurrent task execution."""
data_payload = {"success": success, "type": event}
if data:
# Make a copy to avoid modifying the original
data_payload.update(data)
# Use the standardized function
send_websocket_update('updates', 'update', data_payload)
# Help garbage collection by clearing references
data_payload = None
# Add memory monitoring utilities
def get_memory_usage():
"""Returns current memory usage in MB"""
import psutil
process = psutil.Process(os.getpid())
return process.memory_info().rss / (1024 * 1024)
def monitor_memory_usage(func):
"""Decorator to monitor memory usage before and after function execution"""
def wrapper(*args, **kwargs):
import gc
# Force garbage collection before measuring
gc.collect()
# Get initial memory usage
start_mem = get_memory_usage()
logger.debug(f"Memory usage before {func.__name__}: {start_mem:.2f} MB")
# Call the original function
result = func(*args, **kwargs)
# Force garbage collection before measuring again
gc.collect()
# Get final memory usage
end_mem = get_memory_usage()
logger.debug(f"Memory usage after {func.__name__}: {end_mem:.2f} MB (Change: {end_mem - start_mem:.2f} MB)")
return result
return wrapper
def cleanup_memory(log_usage=False, force_collection=True):
"""
Comprehensive memory cleanup function to reduce memory footprint
Args:
log_usage: Whether to log memory usage before and after cleanup
force_collection: Whether to force garbage collection
"""
logger.trace("Starting memory cleanup django memory cleanup")
# Skip logging if log level is not set to debug or more verbose (like trace)
current_log_level = logger.getEffectiveLevel()
if not current_log_level <= logging.DEBUG:
log_usage = False
if log_usage:
try:
import psutil
process = psutil.Process()
before_mem = process.memory_info().rss / (1024 * 1024)
logger.debug(f"Memory before cleanup: {before_mem:.2f} MB")
except (ImportError, Exception) as e:
logger.debug(f"Error getting memory usage: {e}")
# Clear any object caches from Django ORM
from django.db import connection, reset_queries
reset_queries()
# Force garbage collection
if force_collection:
# Run full collection
gc.collect(generation=2)
# Clear cyclic references
gc.collect(generation=0)
if log_usage:
try:
import psutil
process = psutil.Process()
after_mem = process.memory_info().rss / (1024 * 1024)
logger.debug(f"Memory after cleanup: {after_mem:.2f} MB (change: {after_mem-before_mem:.2f} MB)")
except (ImportError, Exception):
pass
logger.trace("Memory cleanup complete for django")

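A brief usage sketch for the utilities added above: decorating a function with monitor_memory_usage and calling cleanup_memory afterwards. The process_batch helper is hypothetical and exists only to illustrate the API:

from core.utils import cleanup_memory, monitor_memory_usage

@monitor_memory_usage
def process_batch(items):
    """Hypothetical memory-heavy helper, decorated so before/after RSS is logged at DEBUG."""
    return [item.upper() for item in items]

process_batch(["a", "b", "c"])
# Release Django query caches and force a full collection once the batch is done
cleanup_memory(log_usage=True, force_collection=True)
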
@@ -2,6 +2,7 @@
import os
from celery import Celery
import logging
from celery.signals import task_postrun # Add import for signals
# Initialize with defaults before Django settings are loaded
DEFAULT_LOG_LEVEL = 'DEBUG'
@@ -48,6 +49,56 @@ app.conf.update(
worker_task_log_format='%(asctime)s %(levelname)s %(task_name)s: %(message)s',
)
# Add memory cleanup after task completion
@task_postrun.connect # Use the imported signal
def cleanup_task_memory(**kwargs):
"""Clean up memory after each task completes"""
# Get task name from kwargs
task_name = kwargs.get('task').name if kwargs.get('task') else ''
# Only run cleanup for memory-intensive tasks
memory_intensive_tasks = [
'apps.m3u.tasks.refresh_single_m3u_account',
'apps.m3u.tasks.refresh_m3u_accounts',
'apps.m3u.tasks.process_m3u_batch',
'apps.m3u.tasks.process_xc_category',
'apps.epg.tasks.refresh_epg_data',
'apps.epg.tasks.refresh_all_epg_data',
'apps.epg.tasks.parse_programs_for_source',
'apps.epg.tasks.parse_programs_for_tvg_id',
'apps.channels.tasks.match_epg_channels',
'core.tasks.rehash_streams'
]
# Check if this is a memory-intensive task
if task_name in memory_intensive_tasks:
# Import cleanup_memory function
from core.utils import cleanup_memory
# Use the comprehensive cleanup function
cleanup_memory(log_usage=True, force_collection=True)
# Log memory usage if psutil is installed
try:
import psutil
process = psutil.Process()
if hasattr(process, 'memory_info'):
mem = process.memory_info().rss / (1024 * 1024)
print(f"Memory usage after {task_name}: {mem:.2f} MB")
except (ImportError, Exception):
pass
else:
# For non-intensive tasks, just log but don't force cleanup
try:
import psutil
process = psutil.Process()
if hasattr(process, 'memory_info'):
mem = process.memory_info().rss / (1024 * 1024)
if mem > 500: # Only log if using more than 500MB
print(f"High memory usage detected in {task_name}: {mem:.2f} MB")
except (ImportError, Exception):
pass
@app.on_after_configure.connect
def setup_celery_logging(**kwargs):
# Use our directly determined log level

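The cleanup hook above relies on Celery's task_postrun signal, which fires in the worker after each task returns. A minimal standalone sketch of such a handler (the app name and handler below are illustrative, not part of this commit):

from celery import Celery
from celery.signals import task_postrun

app = Celery("sketch", broker="memory://")

@task_postrun.connect
def log_finished_task(sender=None, task_id=None, task=None, retval=None, state=None, **kwargs):
    # 'task' is the task instance; its .name is the dotted path checked against memory_intensive_tasks
    name = task.name if task else "<unknown>"
    print(f"task {name} ({task_id}) finished with state={state}")
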
@@ -44,6 +44,36 @@ INSTALLED_APPS = [
"django_celery_beat",
]
# EPG Processing optimization settings
EPG_BATCH_SIZE = 1000 # Number of records to process in a batch
EPG_MEMORY_LIMIT = 512 # Memory limit in MB before forcing garbage collection
EPG_ENABLE_MEMORY_MONITORING = True # Whether to monitor memory usage during processing
# Database optimization settings
DATABASE_STATEMENT_TIMEOUT = 300 # Seconds before timing out long-running queries
DATABASE_CONN_MAX_AGE = (
60 # Connection max age in seconds, helps with frequent reconnects
)
# Disable atomic requests for performance-sensitive views
ATOMIC_REQUESTS = False
# Cache settings - add caching for EPG operations
CACHES = {
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
"LOCATION": "dispatcharr-epg-cache",
"TIMEOUT": 3600, # 1 hour cache timeout
"OPTIONS": {
"MAX_ENTRIES": 10000,
"CULL_FREQUENCY": 3, # Purge 1/3 of entries when max is reached
},
}
}
# Timeouts for external connections
REQUESTS_TIMEOUT = 30 # Seconds for external API requests
MIDDLEWARE = [
"django.middleware.security.SecurityMiddleware",
"django.contrib.sessions.middleware.SessionMiddleware",
@@ -165,6 +195,15 @@ CELERY_BROKER_TRANSPORT_OPTIONS = {
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
# Memory management settings
# CELERY_WORKER_MAX_TASKS_PER_CHILD = 10 # Restart worker after 10 tasks to free memory
# CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # Don't prefetch tasks - process one at a time
# CELERY_TASK_ACKS_LATE = True # Only acknowledge tasks after they're processed
# CELERY_TASK_TIME_LIMIT = 3600 # 1 hour time limit per task
# CELERY_TASK_SOFT_TIME_LIMIT = 3540 # Soft limit 60 seconds before hard limit
# CELERY_WORKER_CANCEL_LONG_RUNNING_TASKS_ON_CONNECTION_LOSS = True # Cancel tasks if connection lost
# CELERY_TASK_IGNORE_RESULT = True # Don't store results unless explicitly needed
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers.DatabaseScheduler"
CELERY_BEAT_SCHEDULE = {
"fetch-channel-statuses": {
@@ -275,6 +314,11 @@ LOGGING = {
"level": LOG_LEVEL, # Use environment-configured level
"propagate": False, # Don't propagate to root logger to avoid duplicate logs
},
"core.utils": {
"handlers": ["console"],
"level": LOG_LEVEL,
"propagate": False,
},
"apps.proxy": {
"handlers": ["console"],
"level": LOG_LEVEL, # Use environment-configured level

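The new CACHES block configures a process-local in-memory cache for EPG operations. A hypothetical example of caching an expensive EPG lookup through Django's cache API (the key name and query function are illustrative, not from this commit):

from django.core.cache import cache

def expensive_tvg_id_query():
    # Stand-in for a real EPGData query
    return ["abc.us", "def.uk"]

def get_epg_tvg_ids():
    """Cache an expensive lookup for an hour, matching the configured TIMEOUT."""
    tvg_ids = cache.get("epg:tvg_ids")
    if tvg_ids is None:
        tvg_ids = expensive_tvg_id_query()
        cache.set("epg:tvg_ids", tvg_ids, timeout=3600)
    return tvg_ids
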
@@ -8,7 +8,7 @@ exec-before = python /app/scripts/wait_for_redis.py
; Start Redis first
attach-daemon = redis-server
; Then start other services
attach-daemon = celery -A dispatcharr worker
attach-daemon = celery -A dispatcharr worker --concurrency=4
attach-daemon = celery -A dispatcharr beat
attach-daemon = daphne -b 0.0.0.0 -p 8001 dispatcharr.asgi:application
attach-daemon = cd /app/frontend && npm run dev

@@ -30,3 +30,4 @@ channels
channels-redis
django-filter
django-celery-beat
lxml==5.4.0