The struggle is real

SergeantPanda 2025-05-18 17:05:03 -05:00
parent 8133af5d20
commit ed665584e9
5 changed files with 234 additions and 143 deletions

View file

@@ -10,6 +10,7 @@ from datetime import datetime, timedelta, timezone as dt_timezone
import gc # Add garbage collection module
import json
from lxml import etree # Using lxml exclusively
import psutil # Add import for memory tracking
from celery import shared_task
from django.conf import settings
@@ -22,7 +23,7 @@ from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from .models import EPGSource, EPGData, ProgramData
from core.utils import acquire_task_lock, release_task_lock
from core.utils import acquire_task_lock, release_task_lock, send_websocket_update
logger = logging.getLogger(__name__)
@@ -40,18 +41,18 @@ def send_epg_update(source_id, action, progress, **kwargs):
# Add the additional key-value pairs from kwargs
data.update(kwargs)
# Now, send the updated data dictionary
channel_layer = get_channel_layer()
try:
async_to_sync(channel_layer.group_send)(
'updates',
{
'type': 'update',
'data': data
}
)
except Exception as e:
logger.warning(f"Failed to send WebSocket update: {e}")
# Use the standardized update function with garbage collection for program parsing
# This is a high-frequency operation that needs more aggressive memory management
collect_garbage = action == "parsing_programs" and progress % 10 == 0
send_websocket_update('updates', 'update', data, collect_garbage=collect_garbage)
# Explicitly clear references
data = None
# For high-frequency parsing, occasionally force additional garbage collection
# to prevent memory buildup
if action == "parsing_programs" and progress % 50 == 0:
gc.collect()
def delete_epg_refresh_task_by_id(epg_id):
@@ -529,18 +530,16 @@ def parse_channels_only(source):
# Update status to error
source.status = 'error'
source.last_message = "No URL provided, cannot fetch EPG data"
source.save(update_fields=['status', 'last_message'])
send_epg_update(source.id, "parsing_channels", 100, status="error", error="No URL provided")
return False
source.save(update_fields=['updated_at'])
file_path = new_path
logger.info(f"Parsing channels from EPG file: {file_path}")
# Add memory tracking at start
import psutil
process = psutil.Process()
initial_memory = process.memory_info().rss / 1024 / 1024
logger.info(f"Initial memory usage: {initial_memory:.2f} MB")
# Initialize process variable for memory tracking
try:
process = psutil.Process()
initial_memory = process.memory_info().rss / 1024 / 1024
logger.info(f"Initial memory usage: {initial_memory:.2f} MB")
except (ImportError, NameError):
process = None
logger.warning("psutil not available for memory tracking")
# Replace full dictionary load with more efficient lookup set
existing_tvg_ids = set()
@@ -574,7 +573,8 @@ def parse_channels_only(source):
progress = 0 # Initialize progress variable here
# Track memory at key points
logger.info(f"Memory before opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
if process:
logger.info(f"Memory before opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
try:
# Create a parser with the desired options
@@ -584,7 +584,8 @@ def parse_channels_only(source):
# Open the file first
logger.info(f"Opening file for initial channel count: {file_path}")
source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb')
logger.info(f"Memory after opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
if process:
logger.info(f"Memory after opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
# Count channels
try:
@@ -597,7 +598,8 @@ def parse_channels_only(source):
# Close the file to reset position
logger.info(f"Closing initial file handle")
source_file.close()
logger.info(f"Memory after closing initial file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
if process:
logger.info(f"Memory after closing initial file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
# Update progress after counting
send_epg_update(source.id, "parsing_channels", 25, total_channels=total_channels)
@@ -605,11 +607,13 @@ def parse_channels_only(source):
# Reset file position for actual processing
logger.info(f"Re-opening file for channel parsing: {file_path}")
source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb')
logger.info(f"Memory after re-opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
if process:
logger.info(f"Memory after re-opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB")
logger.info(f"Creating iterparse context")
channel_parser = etree.iterparse(source_file, events=('end',), tag='channel')
logger.info(f"Memory after creating iterparse: {process.memory_info().rss / 1024 / 1024:.2f} MB")
if process:
logger.info(f"Memory after creating iterparse: {process.memory_info().rss / 1024 / 1024:.2f} MB")
channel_count = 0
for _, elem in channel_parser:
@@ -658,11 +662,13 @@ def parse_channels_only(source):
if len(epgs_to_create) >= batch_size:
logger.info(f"Bulk creating {len(epgs_to_create)} EPG entries")
EPGData.objects.bulk_create(epgs_to_create, ignore_conflicts=True)
logger.info(f"Memory after bulk_create: {process.memory_info().rss / 1024 / 1024:.2f} MB")
if process:
logger.info(f"Memory after bulk_create: {process.memory_info().rss / 1024 / 1024:.2f} MB")
del epgs_to_create # Explicit deletion
epgs_to_create = []
gc.collect()
logger.info(f"Memory after gc.collect(): {process.memory_info().rss / 1024 / 1024:.2f} MB")
if process:
logger.info(f"Memory after gc.collect(): {process.memory_info().rss / 1024 / 1024:.2f} MB")
if len(epgs_to_update) >= batch_size:
EPGData.objects.bulk_update(epgs_to_update, ["name"])
@@ -675,7 +681,8 @@ def parse_channels_only(source):
logger.info(f"Clearing existing_epgs cache at {processed_channels} channels")
existing_epgs.clear()
gc.collect()
logger.info(f"Memory after clearing cache: {process.memory_info().rss / 1024 / 1024:.2f} MB")
if process:
logger.info(f"Memory after clearing cache: {process.memory_info().rss / 1024 / 1024:.2f} MB")
# Send progress updates
if processed_channels % 100 == 0 or processed_channels == total_channels:
@@ -689,9 +696,29 @@ def parse_channels_only(source):
)
logger.debug(f"Processed channel: {tvg_id} - {display_name}")
# Clear memory
elem.clear()
while elem.getprevious() is not None:
del elem.getparent()[0]
try:
# First clear the element's content
elem.clear()
# Get the parent before we might lose reference to it
parent = elem.getparent()
if parent is not None:
# Clean up preceding siblings
while elem.getprevious() is not None:
del parent[0]
# Try to fully detach this element from parent
try:
parent.remove(elem)
del elem
del parent
except (ValueError, KeyError, TypeError):
# Element might already be removed or detached
pass
except Exception as e:
# Just log the error and continue - don't let cleanup errors stop processing
logger.debug(f"Non-critical error during XML element cleanup: {e}")
# Check if we should break early to avoid excessive sleep
if processed_channels >= total_channels and total_channels > 0:
@@ -700,7 +727,8 @@ def parse_channels_only(source):
# Explicit cleanup before sleeping
logger.info(f"Completed channel parsing loop, processed {processed_channels} channels")
logger.info(f"Memory before cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB")
if process:
logger.info(f"Memory before cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB")
# Explicit cleanup of the parser
del channel_parser
@@ -713,7 +741,8 @@ def parse_channels_only(source):
# Force garbage collection
gc.collect()
logger.info(f"Memory after final cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB")
if process:
logger.info(f"Memory after final cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB")
# Remove long sleep that might be causing issues
# time.sleep(200) # This seems excessive and may be causing issues
@@ -737,9 +766,11 @@ def parse_channels_only(source):
logger.info(f"Updated final batch of {len(epgs_to_update)} EPG entries")
# Final garbage collection and memory tracking
logger.info(f"Memory before final gc: {process.memory_info().rss / 1024 / 1024:.2f} MB")
if process:
logger.info(f"Memory before final gc: {process.memory_info().rss / 1024 / 1024:.2f} MB")
gc.collect()
logger.info(f"Memory after final gc: {process.memory_info().rss / 1024 / 1024:.2f} MB")
if process:
logger.info(f"Memory after final gc: {process.memory_info().rss / 1024 / 1024:.2f} MB")
# Update source status with channel count
source.status = 'success'
@@ -755,14 +786,7 @@ def parse_channels_only(source):
channels_count=processed_channels
)
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
'updates',
{
'type': 'update',
"data": {"success": True, "type": "epg_channels"}
}
)
send_websocket_update('updates', 'update', {"success": True, "type": "epg_channels"})
logger.info(f"Finished parsing channel info. Found {processed_channels} channels.")
# Remove excessive sleep
@@ -790,15 +814,20 @@ def parse_channels_only(source):
logger.info("In finally block, ensuring cleanup")
existing_tvg_ids = None
existing_epgs = None
gc.collect()
# Check final memory usage
try:
import psutil
process = psutil.Process()
final_memory = process.memory_info().rss / 1024 / 1024
logger.info(f"Final memory usage: {final_memory:.2f} MB")
except:
pass
# Explicitly clear the process object to prevent potential memory leaks
if 'process' in locals() and process is not None:
process = None
# Check final memory usage after clearing process
gc.collect()
@shared_task
@@ -806,17 +835,16 @@ def parse_programs_for_tvg_id(epg_id):
if not acquire_task_lock('parse_epg_programs', epg_id):
logger.info(f"Program parse for {epg_id} already in progress, skipping duplicate task")
return "Task already running"
source_file = None
program_parser = None
programs_to_create = None
epg = None
epg_source = None
programs_processed = 0
try:
# Add memory tracking
try:
import psutil
process = psutil.Process()
initial_memory = process.memory_info().rss / 1024 / 1024
logger.info(f"[parse_programs_for_tvg_id] Initial memory usage: {initial_memory:.2f} MB")
@@ -834,39 +862,16 @@ def parse_programs_for_tvg_id(epg_id):
logger.info(f"Refreshing program data for tvg_id: {epg.tvg_id}")
# First, remove all existing programs - use chunked delete to avoid memory issues
# Delete old programs
chunk_size = 5000
programs_to_delete = ProgramData.objects.filter(epg=epg)
total_programs = programs_to_delete.count()
if total_programs > 0:
logger.info(f"Deleting {total_programs} existing programs for {epg.tvg_id}")
# More memory-efficient approach using cursor-based pagination
last_id = 0
while True:
# Get batch of IDs greater than the last ID processed
id_batch = list(programs_to_delete.filter(id__gt=last_id).order_by('id').values_list('id', flat=True)[:chunk_size])
if not id_batch:
break
# Store the last ID before deleting the batch variable
if id_batch:
max_id = id_batch[-1]
else:
max_id = 0
# Delete this batch
ProgramData.objects.filter(id__in=id_batch).delete()
# Release memory immediately
del id_batch
gc.collect()
# Update last_id for next iteration using our stored value
last_id = max_id
# Explicitly delete query objects
del programs_to_delete
del last_id
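# Keyset pagination on the primary key: each pass pulls at most chunk_size
# ids greater than the last id seen, deletes that batch, then advances the
# cursor, so only one batch of ids is ever held in memory.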
last_id = 0
while True:
ids = list(ProgramData.objects.filter(epg=epg, id__gt=last_id).order_by('id').values_list('id', flat=True)[:chunk_size])
if not ids:
break
ProgramData.objects.filter(id__in=ids).delete()
last_id = ids[-1]
del ids
gc.collect()
file_path = epg_source.file_path
@@ -929,7 +934,7 @@ def parse_programs_for_tvg_id(epg_id):
# Memory usage tracking
if process:
mem_before = process.memory_info().rss / 1024 / 1024
logger.info(f"[parse_programs_for_tvg_id] Memory before parsing: {mem_before:.2f} MB")
logger.info(f"[parse_programs_for_tvg_id] Memory before parsing {epg.tvg_id} - {mem_before:.2f} MB")
programs_to_create = []
batch_size = 1000 # Process in batches to limit memory usage
@@ -968,9 +973,9 @@ def parse_programs_for_tvg_id(epg_id):
# Extract custom properties
custom_props = extract_custom_properties(elem)
custom_properties_json = None
if custom_props:
logger.debug(f"Number of custom properties: {len(custom_props)}")
logger.trace(f"Number of custom properties: {len(custom_props)}")
try:
custom_properties_json = json.dumps(custom_props)
except Exception as e:
@@ -986,30 +991,58 @@ def parse_programs_for_tvg_id(epg_id):
tvg_id=epg.tvg_id,
custom_properties=custom_properties_json
))
programs_processed += 1
custom_props = None
custom_properties_json = None
del custom_props
del custom_properties_json
del start_time
del end_time
del title
del desc
del sub_title
elem.clear()
parent = elem.getparent()
if parent is not None:
while elem.getprevious() is not None:
del parent[0]
parent.remove(elem)
del elem
del parent
#gc.collect()
# Batch processing
if len(programs_to_create) >= batch_size:
ProgramData.objects.bulk_create(programs_to_create)
logger.debug(f"Saved batch of {len(programs_to_create)} programs for {epg.tvg_id}")
del programs_to_create # Explicit deletion
programs_to_create = []
# Force more aggressive garbage collection
custom_props = None
custom_properties_json = None
del programs_to_create
del custom_props
del custom_properties_json
gc.collect()
#continue
except Exception as e:
logger.error(f"Error processing program for {epg.tvg_id}: {e}", exc_info=True)
# Important: Clear the element to avoid memory leaks (lxml specific method)
elem.clear()
# Also eliminate ancestors to prevent memory leaks
while elem.getprevious() is not None:
del elem.getparent()[0]
# Important: Clear the element to avoid memory leaks using a more robust approach
try:
# First clear the element's content
elem.clear()
# Get the parent before we might lose reference to it
parent = elem.getparent()
if parent is not None:
# Clean up preceding siblings
while elem.getprevious() is not None:
del parent[0]
# Try to fully detach this element from parent
try:
parent.remove(elem)
del elem
del parent
except (ValueError, KeyError, TypeError):
# Element might already be removed or detached
pass
except Exception as e:
# Just log the error and continue - don't let cleanup errors stop processing
logger.debug(f"Non-critical error during XML element cleanup: {e}")
# Make sure to close the file and release parser resources
if source_file:
@@ -1032,18 +1065,21 @@ def parse_programs_for_tvg_id(epg_id):
if source_file:
source_file.close()
source_file = None
# Memory tracking after processing
if process:
mem_after = process.memory_info().rss / 1024 / 1024
logger.info(f"[parse_programs_for_tvg_id] Memory after parsing 1 {epg.tvg_id} - {programs_processed} programs: {mem_after:.2f} MB (change: {mem_after-mem_before:.2f} MB)")
# Process any remaining items
if programs_to_create:
ProgramData.objects.bulk_create(programs_to_create)
logger.debug(f"Saved final batch of {len(programs_to_create)} programs for {epg.tvg_id}")
del programs_to_create
programs_to_create = []
programs_to_create = None
custom_props = None
custom_properties_json = None
#del programs_to_create
#programs_to_create = []
# Memory tracking after processing
if process:
mem_after = process.memory_info().rss / 1024 / 1024
logger.info(f"[parse_programs_for_tvg_id] Memory after parsing {programs_processed} programs: {mem_after:.2f} MB (change: {mem_after-mem_before:.2f} MB)")
# Final garbage collection
gc.collect()
@@ -1066,25 +1102,31 @@ def parse_programs_for_tvg_id(epg_id):
source_file.close()
except:
pass
# Memory tracking after processing
if process:
mem_after = process.memory_info().rss / 1024 / 1024
logger.info(f"[parse_programs_for_tvg_id] Memory after parsing 2 {epg.tvg_id} - {programs_processed} programs: {mem_after:.2f} MB (change: {mem_after-mem_before:.2f} MB)")
source_file = None
program_parser = None
programs_to_create = None
epg = None
epg_source = None
# Explicitly clear the process object to prevent potential memory leaks
if 'process' in locals() and process is not None:
process = None
# Force garbage collection before releasing lock
gc.collect()
release_task_lock('parse_epg_programs', epg_id)
def parse_programs_for_source(epg_source, tvg_id=None):
# Send initial programs parsing notification
send_epg_update(epg_source.id, "parsing_programs", 0)
#time.sleep(100)
# Add memory tracking
try:
import psutil
process = psutil.Process()
initial_memory = process.memory_info().rss / 1024 / 1024
logger.info(f"[parse_programs_for_source] Initial memory usage: {initial_memory:.2f} MB")
@@ -1219,6 +1261,10 @@ def parse_programs_for_source(epg_source, tvg_id=None):
processed = None
gc.collect()
# Explicitly clear the process object to prevent potential memory leaks
if 'process' in locals() and process is not None:
process = None
def fetch_schedules_direct(source):
logger.info(f"Fetching Schedules Direct data from source: {source.name}")

View file

@@ -1088,6 +1088,8 @@ def refresh_single_m3u_account(account_id):
return f"Dispatched jobs complete."
from core.utils import send_websocket_update
def send_m3u_update(account_id, action, progress, **kwargs):
# Start with the base data dictionary
data = {
@@ -1111,12 +1113,10 @@ def send_m3u_update(account_id, action, progress, **kwargs):
# Add the additional key-value pairs from kwargs
data.update(kwargs)
# Now, send the updated data dictionary
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
'updates',
{
'type': 'update',
'data': data
}
)
# Use the standardized function with memory management
# Enable garbage collection for certain operations
collect_garbage = action == "parsing" and progress % 25 == 0
send_websocket_update('updates', 'update', data, collect_garbage=collect_garbage)
# Explicitly clear data reference to help garbage collection
data = None

View file

@@ -6,8 +6,10 @@ import redis
import json
import logging
import re
import gc # Add import for garbage collection
from core.utils import RedisClient
from apps.proxy.ts_proxy.channel_status import ChannelStatus
from core.utils import send_websocket_update
logger = logging.getLogger(__name__)
@@ -43,11 +45,17 @@ def fetch_channel_stats():
return
# return JsonResponse({'error': str(e)}, status=500)
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
send_websocket_update(
"updates",
"update",
{
"type": "update",
"data": {"success": True, "type": "channel_stats", "stats": json.dumps({'channels': all_channels, 'count': len(all_channels)})}
"success": True,
"type": "channel_stats",
"stats": json.dumps({'channels': all_channels, 'count': len(all_channels)})
},
collect_garbage=True
)
# Explicitly clean up large data structures
all_channels = None
gc.collect()

View file

@@ -8,7 +8,7 @@ import logging
import re
import time
import os
from core.utils import RedisClient
from core.utils import RedisClient, send_websocket_update
from apps.proxy.ts_proxy.channel_status import ChannelStatus
from apps.m3u.models import M3UAccount
from apps.epg.models import EPGSource
@@ -317,19 +317,24 @@ def fetch_channel_stats():
if cursor == 0:
break
send_websocket_update(
"updates",
"update",
{
"success": True,
"type": "channel_stats",
"stats": json.dumps({'channels': all_channels, 'count': len(all_channels)})
},
collect_garbage=True
)
# Explicitly clean up large data structures
all_channels = None
gc.collect()
except Exception as e:
logger.error(f"Error in channel_status: {e}", exc_info=True)
return
# return JsonResponse({'error': str(e)}, status=500)
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
"updates",
{
"type": "update",
"data": {"success": True, "type": "channel_stats", "stats": json.dumps({'channels': all_channels, 'count': len(all_channels)})}
},
)
@shared_task
def rehash_streams(keys):

View file

@@ -173,15 +173,47 @@ def release_task_lock(task_name, id):
# Remove the lock
redis_client.delete(lock_id)
def send_websocket_event(event, success, data):
def send_websocket_update(group_name, event_type, data, collect_garbage=False):
"""
Standardized function to send WebSocket updates with proper memory management.
Args:
group_name: The WebSocket group to send to (e.g. 'updates')
event_type: The type of message (e.g. 'update')
data: The data to send
collect_garbage: Whether to force garbage collection after sending
"""
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
'updates',
{
'type': 'update',
"data": {"success": True, "type": "epg_channels"}
}
)
try:
async_to_sync(channel_layer.group_send)(
group_name,
{
'type': event_type,
'data': data
}
)
except Exception as e:
logger.warning(f"Failed to send WebSocket update: {e}")
finally:
# Explicitly release references to help garbage collection
channel_layer = None
# Force garbage collection if requested
if collect_garbage:
gc.collect()
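# A minimal usage sketch, assuming the 'updates' group and 'update' consumer
# wired up elsewhere in this changeset:
#
#   send_websocket_update(
#       'updates',
#       'update',
#       {"success": True, "type": "epg_channels"},
#       collect_garbage=True,
#   )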
def send_websocket_event(event, success, data):
"""Acquire a lock to prevent concurrent task execution."""
data_payload = {"success": success, "type": event}
if data:
# Make a copy to avoid modifying the original
data_payload.update(data)
# Use the standardized function
send_websocket_update('updates', 'update', data_payload)
# Help garbage collection by clearing references
data_payload = None
# Add memory monitoring utilities
def get_memory_usage():