Rewored celery memory cleanup logic.

This commit is contained in:
SergeantPanda 2025-05-18 20:57:37 -05:00
parent f821743163
commit 7c809931d7
4 changed files with 53 additions and 35 deletions

View file

@ -216,9 +216,9 @@ def match_epg_channels():
finally:
# Final cleanup
gc.collect()
# Force an even more aggressive cleanup
import gc
gc.collect(generation=2)
# Use our standardized cleanup function for more thorough memory management
from core.utils import cleanup_memory
cleanup_memory(log_usage=True, force_collection=True)
@shared_task

View file

@ -496,7 +496,8 @@ def process_m3u_batch(account_id, batch, groups, hash_keys):
# Aggressive garbage collection
del streams_to_create, streams_to_update, stream_hashes, existing_streams
gc.collect()
from core.utils import cleanup_memory
cleanup_memory(log_usage=True, force_collection=True)
return retval
@ -1080,7 +1081,8 @@ def refresh_single_m3u_account(account_id):
# Aggressive garbage collection
del existing_groups, extinf_data, groups, batches
gc.collect()
from core.utils import cleanup_memory
cleanup_memory(log_usage=True, force_collection=True)
# Clean up cache file since we've fully processed it
if os.path.exists(cache_path):

View file

@ -49,29 +49,13 @@ def throttled_log(logger_method, message, key=None, *args, **kwargs):
def clear_memory():
"""Force aggressive garbage collection to free memory"""
import gc
# Run full garbage collection
gc.collect(generation=2)
# Find and break any reference cycles
gc.collect(generation=0)
# Clear any cached objects in memory
gc.collect(generation=1)
# Check if psutil is available for more advanced monitoring
try:
import psutil
process = psutil.Process()
if hasattr(process, 'memory_info'):
mem = process.memory_info().rss / (1024 * 1024)
logger.debug(f"Memory usage after cleanup: {mem:.2f} MB")
except (ImportError, Exception):
pass
from core.utils import cleanup_memory
cleanup_memory(log_usage=True, force_collection=True)
@shared_task
def beat_periodic_task():
fetch_channel_stats()
scan_and_process_files()
# Call memory cleanup after completing tasks
clear_memory()
@shared_task
def scan_and_process_files():

View file

@ -53,19 +53,51 @@ app.conf.update(
@task_postrun.connect # Use the imported signal
def cleanup_task_memory(**kwargs):
"""Clean up memory after each task completes"""
import gc
# Force garbage collection
gc.collect()
# Get task name from kwargs
task_name = kwargs.get('task').name if kwargs.get('task') else ''
# Log memory usage if psutil is installed
try:
import psutil
process = psutil.Process()
if hasattr(process, 'memory_info'):
mem = process.memory_info().rss / (1024 * 1024)
print(f"Memory usage after task: {mem:.2f} MB")
except (ImportError, Exception):
pass
# Only run cleanup for memory-intensive tasks
memory_intensive_tasks = [
'apps.m3u.tasks.refresh_single_m3u_account',
'apps.m3u.tasks.refresh_m3u_accounts',
'apps.m3u.tasks.process_m3u_batch',
'apps.m3u.tasks.process_xc_category',
'apps.epg.tasks.refresh_epg_data',
'apps.epg.tasks.refresh_all_epg_data',
'apps.epg.tasks.parse_programs_for_source',
'apps.epg.tasks.parse_programs_for_tvg_id',
'apps.channels.tasks.match_epg_channels',
'core.tasks.rehash_streams'
]
# Check if this is a memory-intensive task
if task_name in memory_intensive_tasks:
# Import cleanup_memory function
from core.utils import cleanup_memory
# Use the comprehensive cleanup function
cleanup_memory(log_usage=True, force_collection=True)
# Log memory usage if psutil is installed
try:
import psutil
process = psutil.Process()
if hasattr(process, 'memory_info'):
mem = process.memory_info().rss / (1024 * 1024)
print(f"Memory usage after {task_name}: {mem:.2f} MB")
except (ImportError, Exception):
pass
else:
# For non-intensive tasks, just log but don't force cleanup
try:
import psutil
process = psutil.Process()
if hasattr(process, 'memory_info'):
mem = process.memory_info().rss / (1024 * 1024)
if mem > 500: # Only log if using more than 500MB
print(f"High memory usage detected in {task_name}: {mem:.2f} MB")
except (ImportError, Exception):
pass
@app.on_after_configure.connect
def setup_celery_logging(**kwargs):