diff --git a/apps/channels/tasks.py b/apps/channels/tasks.py index b4de5e07..d59169e5 100755 --- a/apps/channels/tasks.py +++ b/apps/channels/tasks.py @@ -216,9 +216,9 @@ def match_epg_channels(): finally: # Final cleanup gc.collect() - # Force an even more aggressive cleanup - import gc - gc.collect(generation=2) + # Use our standardized cleanup function for more thorough memory management + from core.utils import cleanup_memory + cleanup_memory(log_usage=True, force_collection=True) @shared_task diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py index 513c550d..4a1f2645 100644 --- a/apps/m3u/tasks.py +++ b/apps/m3u/tasks.py @@ -496,7 +496,8 @@ def process_m3u_batch(account_id, batch, groups, hash_keys): # Aggressive garbage collection del streams_to_create, streams_to_update, stream_hashes, existing_streams - gc.collect() + from core.utils import cleanup_memory + cleanup_memory(log_usage=True, force_collection=True) return retval @@ -1080,7 +1081,8 @@ def refresh_single_m3u_account(account_id): # Aggressive garbage collection del existing_groups, extinf_data, groups, batches - gc.collect() + from core.utils import cleanup_memory + cleanup_memory(log_usage=True, force_collection=True) # Clean up cache file since we've fully processed it if os.path.exists(cache_path): diff --git a/core/tasks.py b/core/tasks.py index 5e60a4e4..9af367f9 100644 --- a/core/tasks.py +++ b/core/tasks.py @@ -49,29 +49,13 @@ def throttled_log(logger_method, message, key=None, *args, **kwargs): def clear_memory(): """Force aggressive garbage collection to free memory""" - import gc - # Run full garbage collection - gc.collect(generation=2) - # Find and break any reference cycles - gc.collect(generation=0) - # Clear any cached objects in memory - gc.collect(generation=1) - # Check if psutil is available for more advanced monitoring - try: - import psutil - process = psutil.Process() - if hasattr(process, 'memory_info'): - mem = process.memory_info().rss / (1024 * 1024) - logger.debug(f"Memory usage after cleanup: {mem:.2f} MB") - except (ImportError, Exception): - pass + from core.utils import cleanup_memory + cleanup_memory(log_usage=True, force_collection=True) @shared_task def beat_periodic_task(): fetch_channel_stats() scan_and_process_files() - # Call memory cleanup after completing tasks - clear_memory() @shared_task def scan_and_process_files(): diff --git a/dispatcharr/celery.py b/dispatcharr/celery.py index b0debc76..855acacd 100644 --- a/dispatcharr/celery.py +++ b/dispatcharr/celery.py @@ -53,19 +53,51 @@ app.conf.update( @task_postrun.connect # Use the imported signal def cleanup_task_memory(**kwargs): """Clean up memory after each task completes""" - import gc - # Force garbage collection - gc.collect() + # Get task name from kwargs + task_name = kwargs.get('task').name if kwargs.get('task') else '' - # Log memory usage if psutil is installed - try: - import psutil - process = psutil.Process() - if hasattr(process, 'memory_info'): - mem = process.memory_info().rss / (1024 * 1024) - print(f"Memory usage after task: {mem:.2f} MB") - except (ImportError, Exception): - pass + # Only run cleanup for memory-intensive tasks + memory_intensive_tasks = [ + 'apps.m3u.tasks.refresh_single_m3u_account', + 'apps.m3u.tasks.refresh_m3u_accounts', + 'apps.m3u.tasks.process_m3u_batch', + 'apps.m3u.tasks.process_xc_category', + 'apps.epg.tasks.refresh_epg_data', + 'apps.epg.tasks.refresh_all_epg_data', + 'apps.epg.tasks.parse_programs_for_source', + 'apps.epg.tasks.parse_programs_for_tvg_id', + 'apps.channels.tasks.match_epg_channels', + 'core.tasks.rehash_streams' + ] + + # Check if this is a memory-intensive task + if task_name in memory_intensive_tasks: + # Import cleanup_memory function + from core.utils import cleanup_memory + + # Use the comprehensive cleanup function + cleanup_memory(log_usage=True, force_collection=True) + + # Log memory usage if psutil is installed + try: + import psutil + process = psutil.Process() + if hasattr(process, 'memory_info'): + mem = process.memory_info().rss / (1024 * 1024) + print(f"Memory usage after {task_name}: {mem:.2f} MB") + except (ImportError, Exception): + pass + else: + # For non-intensive tasks, just log but don't force cleanup + try: + import psutil + process = psutil.Process() + if hasattr(process, 'memory_info'): + mem = process.memory_info().rss / (1024 * 1024) + if mem > 500: # Only log if using more than 500MB + print(f"High memory usage detected in {task_name}: {mem:.2f} MB") + except (ImportError, Exception): + pass @app.on_after_configure.connect def setup_celery_logging(**kwargs):