diff --git a/apps/channels/tasks.py b/apps/channels/tasks.py index 88d040e8..b4de5e07 100755 --- a/apps/channels/tasks.py +++ b/apps/channels/tasks.py @@ -63,146 +63,162 @@ def match_epg_channels(): 4) If a match is found, we set channel.tvg_id 5) Summarize and log results. """ - logger.info("Starting EPG matching logic...") - - # Attempt to retrieve a "preferred-region" if configured try: - region_obj = CoreSettings.objects.get(key="preferred-region") - region_code = region_obj.value.strip().lower() - except CoreSettings.DoesNotExist: - region_code = None + logger.info("Starting EPG matching logic...") - matched_channels = [] - channels_to_update = [] + # Attempt to retrieve a "preferred-region" if configured + try: + region_obj = CoreSettings.objects.get(key="preferred-region") + region_code = region_obj.value.strip().lower() + except CoreSettings.DoesNotExist: + region_code = None - # Get channels that don't have EPG data assigned - channels_without_epg = Channel.objects.filter(epg_data__isnull=True) - logger.info(f"Found {channels_without_epg.count()} channels without EPG data") + matched_channels = [] + channels_to_update = [] - channels_json = [] - for channel in channels_without_epg: - # Normalize TVG ID - strip whitespace and convert to lowercase - normalized_tvg_id = channel.tvg_id.strip().lower() if channel.tvg_id else "" - if normalized_tvg_id: - logger.info(f"Processing channel {channel.id} '{channel.name}' with TVG ID='{normalized_tvg_id}'") + # Get channels that don't have EPG data assigned + channels_without_epg = Channel.objects.filter(epg_data__isnull=True) + logger.info(f"Found {channels_without_epg.count()} channels without EPG data") - channels_json.append({ - "id": channel.id, - "name": channel.name, - "tvg_id": normalized_tvg_id, # Use normalized TVG ID - "original_tvg_id": channel.tvg_id, # Keep original for reference - "fallback_name": normalized_tvg_id if normalized_tvg_id else channel.name, - "norm_chan": normalize_name(normalized_tvg_id if normalized_tvg_id else channel.name) - }) + channels_json = [] + for channel in channels_without_epg: + # Normalize TVG ID - strip whitespace and convert to lowercase + normalized_tvg_id = channel.tvg_id.strip().lower() if channel.tvg_id else "" + if normalized_tvg_id: + logger.info(f"Processing channel {channel.id} '{channel.name}' with TVG ID='{normalized_tvg_id}'") - # Similarly normalize EPG data TVG IDs - epg_json = [] - for epg in EPGData.objects.all(): - normalized_tvg_id = epg.tvg_id.strip().lower() if epg.tvg_id else "" - epg_json.append({ - 'id': epg.id, - 'tvg_id': normalized_tvg_id, # Use normalized TVG ID - 'original_tvg_id': epg.tvg_id, # Keep original for reference - 'name': epg.name, - 'norm_name': normalize_name(epg.name), - 'epg_source_id': epg.epg_source.id if epg.epg_source else None, - }) + channels_json.append({ + "id": channel.id, + "name": channel.name, + "tvg_id": normalized_tvg_id, # Use normalized TVG ID + "original_tvg_id": channel.tvg_id, # Keep original for reference + "fallback_name": normalized_tvg_id if normalized_tvg_id else channel.name, + "norm_chan": normalize_name(normalized_tvg_id if normalized_tvg_id else channel.name) + }) - # Log available EPG data TVG IDs for debugging - unique_epg_tvg_ids = set(e['tvg_id'] for e in epg_json if e['tvg_id']) - logger.info(f"Available EPG TVG IDs: {', '.join(sorted(unique_epg_tvg_ids))}") + # Similarly normalize EPG data TVG IDs + epg_json = [] + for epg in EPGData.objects.all(): + normalized_tvg_id = epg.tvg_id.strip().lower() if epg.tvg_id else "" + 
epg_json.append({ + 'id': epg.id, + 'tvg_id': normalized_tvg_id, # Use normalized TVG ID + 'original_tvg_id': epg.tvg_id, # Keep original for reference + 'name': epg.name, + 'norm_name': normalize_name(epg.name), + 'epg_source_id': epg.epg_source.id if epg.epg_source else None, + }) - payload = { - "channels": channels_json, - "epg_data": epg_json, - "region_code": region_code, - } + # Log available EPG data TVG IDs for debugging + unique_epg_tvg_ids = set(e['tvg_id'] for e in epg_json if e['tvg_id']) + logger.info(f"Available EPG TVG IDs: {', '.join(sorted(unique_epg_tvg_ids))}") - with tempfile.NamedTemporaryFile(delete=False) as temp_file: - temp_file.write(json.dumps(payload).encode('utf-8')) - temp_file_path = temp_file.name - - process = subprocess.Popen( - ['python', '/app/scripts/epg_match.py', temp_file_path], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True - ) - - # Log stderr in real-time - for line in iter(process.stderr.readline, ''): - if line: - logger.info(line.strip()) - - process.stderr.close() - stdout, stderr = process.communicate() - - os.remove(temp_file_path) - - if process.returncode != 0: - return f"Failed to process EPG matching: {stderr}" - - result = json.loads(stdout) - # This returns lists of dicts, not model objects - channels_to_update_dicts = result["channels_to_update"] - matched_channels = result["matched_channels"] - - # Convert your dict-based 'channels_to_update' into real Channel objects - if channels_to_update_dicts: - # Extract IDs of the channels that need updates - channel_ids = [d["id"] for d in channels_to_update_dicts] - - # Fetch them from DB - channels_qs = Channel.objects.filter(id__in=channel_ids) - channels_list = list(channels_qs) - - # Build a map from channel_id -> epg_data_id (or whatever fields you need) - epg_mapping = { - d["id"]: d["epg_data_id"] for d in channels_to_update_dicts + payload = { + "channels": channels_json, + "epg_data": epg_json, + "region_code": region_code, } - # Populate each Channel object with the updated epg_data_id - for channel_obj in channels_list: - # The script sets 'epg_data_id' in the returned dict - # We either assign directly, or fetch the EPGData instance if needed. 
- channel_obj.epg_data_id = epg_mapping.get(channel_obj.id) + with tempfile.NamedTemporaryFile(delete=False) as temp_file: + temp_file.write(json.dumps(payload).encode('utf-8')) + temp_file_path = temp_file.name - # Now we have real model objects, so bulk_update will work - Channel.objects.bulk_update(channels_list, ["epg_data"]) + # After writing to the file but before subprocess + # Explicitly delete the large data structures + del payload + gc.collect() - total_matched = len(matched_channels) - if total_matched: - logger.info(f"Match Summary: {total_matched} channel(s) matched.") - for (cid, cname, tvg) in matched_channels: - logger.info(f" - Channel ID={cid}, Name='{cname}' => tvg_id='{tvg}'") - else: - logger.info("No new channels were matched.") + process = subprocess.Popen( + ['python', '/app/scripts/epg_match.py', temp_file_path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) - logger.info("Finished EPG matching logic.") + # Log stderr in real-time + for line in iter(process.stderr.readline, ''): + if line: + logger.info(line.strip()) - # Send update with additional information for refreshing UI - channel_layer = get_channel_layer() - associations = [ - {"channel_id": chan["id"], "epg_data_id": chan["epg_data_id"]} - for chan in channels_to_update_dicts - ] + process.stderr.close() + stdout, stderr = process.communicate() - async_to_sync(channel_layer.group_send)( - 'updates', - { - 'type': 'update', - "data": { - "success": True, - "type": "epg_match", - "refresh_channels": True, # Flag to tell frontend to refresh channels - "matches_count": total_matched, - "message": f"EPG matching complete: {total_matched} channel(s) matched", - "associations": associations # Add the associations data + os.remove(temp_file_path) + + if process.returncode != 0: + return f"Failed to process EPG matching: {stderr}" + + result = json.loads(stdout) + # This returns lists of dicts, not model objects + channels_to_update_dicts = result["channels_to_update"] + matched_channels = result["matched_channels"] + + # Explicitly clean up large objects + del stdout, result + gc.collect() + + # Convert your dict-based 'channels_to_update' into real Channel objects + if channels_to_update_dicts: + # Extract IDs of the channels that need updates + channel_ids = [d["id"] for d in channels_to_update_dicts] + + # Fetch them from DB + channels_qs = Channel.objects.filter(id__in=channel_ids) + channels_list = list(channels_qs) + + # Build a map from channel_id -> epg_data_id (or whatever fields you need) + epg_mapping = { + d["id"]: d["epg_data_id"] for d in channels_to_update_dicts } - } - ) - return f"Done. Matched {total_matched} channel(s)." + # Populate each Channel object with the updated epg_data_id + for channel_obj in channels_list: + # The script sets 'epg_data_id' in the returned dict + # We either assign directly, or fetch the EPGData instance if needed. 
+ channel_obj.epg_data_id = epg_mapping.get(channel_obj.id) + + # Now we have real model objects, so bulk_update will work + Channel.objects.bulk_update(channels_list, ["epg_data"]) + + total_matched = len(matched_channels) + if total_matched: + logger.info(f"Match Summary: {total_matched} channel(s) matched.") + for (cid, cname, tvg) in matched_channels: + logger.info(f" - Channel ID={cid}, Name='{cname}' => tvg_id='{tvg}'") + else: + logger.info("No new channels were matched.") + + logger.info("Finished EPG matching logic.") + + # Send update with additional information for refreshing UI + channel_layer = get_channel_layer() + associations = [ + {"channel_id": chan["id"], "epg_data_id": chan["epg_data_id"]} + for chan in channels_to_update_dicts + ] + + async_to_sync(channel_layer.group_send)( + 'updates', + { + 'type': 'update', + "data": { + "success": True, + "type": "epg_match", + "refresh_channels": True, # Flag to tell frontend to refresh channels + "matches_count": total_matched, + "message": f"EPG matching complete: {total_matched} channel(s) matched", + "associations": associations # Add the associations data + } + } + ) + + return f"Done. Matched {total_matched} channel(s)." + finally: + # Final cleanup + gc.collect() + # Force an even more aggressive cleanup + import gc + gc.collect(generation=2) @shared_task diff --git a/apps/epg/tasks.py b/apps/epg/tasks.py index 7f042e3a..74a77f0e 100644 --- a/apps/epg/tasks.py +++ b/apps/epg/tasks.py @@ -117,6 +117,8 @@ def refresh_all_epg_data(): for source in active_sources: refresh_epg_data(source.id) + # Force garbage collection between sources + gc.collect() logger.info("Finished refresh_epg_data task.") return "EPG data refreshed." @@ -128,6 +130,7 @@ def refresh_epg_data(source_id): logger.debug(f"EPG refresh for {source_id} already running") return + source = None try: # Try to get the EPG source try: @@ -144,12 +147,16 @@ def refresh_epg_data(source_id): # Release the lock and exit release_task_lock('refresh_epg_data', source_id) + # Force garbage collection before exit + gc.collect() return f"EPG source {source_id} does not exist, task cleaned up" # The source exists but is not active, just skip processing if not source.is_active: logger.info(f"EPG source {source_id} is not active. Skipping.") release_task_lock('refresh_epg_data', source_id) + # Force garbage collection before exit + gc.collect() return # Continue with the normal processing... 
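# --- Editor's note (illustrative sketch, not part of the patch) ---------------
# The refresh_epg_data() hunks above and below repeat the same exit sequence
# (gc.collect() followed by release_task_lock()) before every early return.
# A small context manager built on the module's existing release_task_lock()
# helper could express that once. The name epg_task_scope and the usage shown
# are hypothetical; imports are included only to keep the sketch self-contained.
import gc
from contextlib import contextmanager

@contextmanager
def epg_task_scope(task_name, key):
    """Run a task body, then force a GC pass and release its lock on any exit path."""
    try:
        yield
    finally:
        gc.collect()                          # single collection point instead of one per return
        release_task_lock(task_name, key)     # existing helper already used in this module

# Hypothetical usage inside refresh_epg_data(source_id):
#     with epg_task_scope('refresh_epg_data', source_id):
#         ... fetch XMLTV, parse_channels_only(source), parse_programs_for_source(source) ...
# -------------------------------------------------------------------------------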
@@ -159,12 +166,16 @@ def refresh_epg_data(source_id): if not fetch_success: logger.error(f"Failed to fetch XMLTV for source {source.name}") release_task_lock('refresh_epg_data', source_id) + # Force garbage collection before exit + gc.collect() return parse_channels_success = parse_channels_only(source) if not parse_channels_success: logger.error(f"Failed to parse channels for source {source.name}") release_task_lock('refresh_epg_data', source_id) + # Force garbage collection before exit + gc.collect() return parse_programs_for_source(source) @@ -176,14 +187,18 @@ def refresh_epg_data(source_id): except Exception as e: logger.error(f"Error in refresh_epg_data for source {source_id}: {e}", exc_info=True) try: - source = EPGSource.objects.get(id=source_id) - source.status = 'error' - source.last_message = f"Error refreshing EPG data: {str(e)}" - source.save(update_fields=['status', 'last_message']) - send_epg_update(source_id, "refresh", 100, status="error", error=str(e)) + if source: + source.status = 'error' + source.last_message = f"Error refreshing EPG data: {str(e)}" + source.save(update_fields=['status', 'last_message']) + send_epg_update(source_id, "refresh", 100, status="error", error=str(e)) except Exception as inner_e: logger.error(f"Error updating source status: {inner_e}") finally: + # Clear references to ensure proper garbage collection + source = None + # Force garbage collection before releasing the lock + gc.collect() release_task_lock('refresh_epg_data', source_id) @@ -191,7 +206,6 @@ def fetch_xmltv(source): # Handle cases with local file but no URL if not source.url and source.file_path and os.path.exists(source.file_path): logger.info(f"Using existing local file for EPG source: {source.name} at {source.file_path}") - # Set the status to success in the database source.status = 'success' source.save(update_fields=['status']) @@ -350,6 +364,9 @@ def fetch_xmltv(source): downloaded=f"{downloaded / (1024 * 1024):.2f} MB" ) + # Explicitly delete the chunk to free memory immediately + del chunk + # Send completion notification send_epg_update(source.id, "downloading", 100) @@ -517,9 +534,14 @@ def parse_channels_only(source): return False file_path = new_path - logger.info(f"Parsing channels from EPG file: {file_path}") + # Add memory tracking at start + import psutil + process = psutil.Process() + initial_memory = process.memory_info().rss / 1024 / 1024 + logger.info(f"Initial memory usage: {initial_memory:.2f} MB") + # Replace full dictionary load with more efficient lookup set existing_tvg_ids = set() existing_epgs = {} # Initialize the dictionary that will lazily load objects @@ -537,7 +559,7 @@ def parse_channels_only(source): existing_tvg_ids.update(tvg_id_chunk) last_id = EPGData.objects.filter(tvg_id__in=tvg_id_chunk).order_by('-id')[0].id - + #time.sleep(20) # Update progress to show file read starting send_epg_update(source.id, "parsing_channels", 10) @@ -549,35 +571,49 @@ def parse_channels_only(source): total_channels = 0 processed_channels = 0 batch_size = 500 # Process in batches to limit memory usage + progress = 0 # Initialize progress variable here + + # Track memory at key points + logger.info(f"Memory before opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB") try: # Create a parser with the desired options - parser = etree.XMLParser(huge_tree=True, remove_blank_text=True) + #parser = etree.XMLParser(huge_tree=True, remove_blank_text=True) # Count channels for progress reporting - use proper lxml approach # Open the file first + 
logger.info(f"Opening file for initial channel count: {file_path}") source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb') - - # Create an iterparse context without parser parameter - channel_finder = etree.iterparse(source_file, events=('end',), tag='channel') + logger.info(f"Memory after opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB") # Count channels try: total_channels = EPGData.objects.filter(epg_source=source).count() - except: + logger.info(f"Found {total_channels} existing channels for this source") + except Exception as e: + logger.error(f"Error counting channels: {e}") total_channels = 500 # Default estimate # Close the file to reset position + logger.info(f"Closing initial file handle") source_file.close() + logger.info(f"Memory after closing initial file: {process.memory_info().rss / 1024 / 1024:.2f} MB") # Update progress after counting send_epg_update(source.id, "parsing_channels", 25, total_channels=total_channels) # Reset file position for actual processing + logger.info(f"Re-opening file for channel parsing: {file_path}") source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb') - channel_parser = etree.iterparse(source_file, events=('end',), tag='channel') + logger.info(f"Memory after re-opening file: {process.memory_info().rss / 1024 / 1024:.2f} MB") + logger.info(f"Creating iterparse context") + channel_parser = etree.iterparse(source_file, events=('end',), tag='channel') + logger.info(f"Memory after creating iterparse: {process.memory_info().rss / 1024 / 1024:.2f} MB") + + channel_count = 0 for _, elem in channel_parser: + channel_count += 1 tvg_id = elem.get('id', '').strip() if tvg_id: display_name = None @@ -620,10 +656,13 @@ def parse_channels_only(source): # Batch processing if len(epgs_to_create) >= batch_size: + logger.info(f"Bulk creating {len(epgs_to_create)} EPG entries") EPGData.objects.bulk_create(epgs_to_create, ignore_conflicts=True) + logger.info(f"Memory after bulk_create: {process.memory_info().rss / 1024 / 1024:.2f} MB") + del epgs_to_create # Explicit deletion epgs_to_create = [] - # Force garbage collection gc.collect() + logger.info(f"Memory after gc.collect(): {process.memory_info().rss / 1024 / 1024:.2f} MB") if len(epgs_to_update) >= batch_size: EPGData.objects.bulk_update(epgs_to_update, ["name"]) @@ -633,8 +672,10 @@ def parse_channels_only(source): # Periodically clear the existing_epgs cache to prevent memory buildup if processed_channels % 1000 == 0: + logger.info(f"Clearing existing_epgs cache at {processed_channels} channels") existing_epgs.clear() gc.collect() + logger.info(f"Memory after clearing cache: {process.memory_info().rss / 1024 / 1024:.2f} MB") # Send progress updates if processed_channels % 100 == 0 or processed_channels == total_channels: @@ -646,17 +687,38 @@ def parse_channels_only(source): processed=processed_channels, total=total_channels ) - + logger.debug(f"Processed channel: {tvg_id} - {display_name}") # Clear memory elem.clear() while elem.getprevious() is not None: del elem.getparent()[0] - # Make sure to close the file + # Check if we should break early to avoid excessive sleep + if processed_channels >= total_channels and total_channels > 0: + logger.info(f"Breaking channel processing loop - processed {processed_channels}/{total_channels}") + break + + # Explicit cleanup before sleeping + logger.info(f"Completed channel parsing loop, processed {processed_channels} channels") + logger.info(f"Memory before cleanup: {process.memory_info().rss / 1024 / 
1024:.2f} MB") + + # Explicit cleanup of the parser + del channel_parser + logger.info(f"Deleted channel_parser object") + + # Close the file + logger.info(f"Closing file: {file_path}") source_file.close() + logger.info(f"File closed: {file_path}") + + # Force garbage collection + gc.collect() + logger.info(f"Memory after final cleanup: {process.memory_info().rss / 1024 / 1024:.2f} MB") + + # Remove long sleep that might be causing issues + # time.sleep(200) # This seems excessive and may be causing issues except (etree.XMLSyntaxError, Exception) as xml_error: - # Instead of falling back, just handle the error logger.error(f"XML parsing failed: {xml_error}") # Update status to error source.status = 'error' @@ -668,12 +730,16 @@ def parse_channels_only(source): # Process any remaining items if epgs_to_create: EPGData.objects.bulk_create(epgs_to_create, ignore_conflicts=True) + logger.info(f"Created final batch of {len(epgs_to_create)} EPG entries") if epgs_to_update: EPGData.objects.bulk_update(epgs_to_update, ["name"]) + logger.info(f"Updated final batch of {len(epgs_to_update)} EPG entries") - # Final garbage collection + # Final garbage collection and memory tracking + logger.info(f"Memory before final gc: {process.memory_info().rss / 1024 / 1024:.2f} MB") gc.collect() + logger.info(f"Memory after final gc: {process.memory_info().rss / 1024 / 1024:.2f} MB") # Update source status with channel count source.status = 'success' @@ -699,6 +765,8 @@ def parse_channels_only(source): ) logger.info(f"Finished parsing channel info. Found {processed_channels} channels.") + # Remove excessive sleep + # time.sleep(20) return True except FileNotFoundError: @@ -718,9 +786,19 @@ def parse_channels_only(source): send_epg_update(source.id, "parsing_channels", 100, status="error", error=str(e)) return False finally: + # Add more detailed cleanup in finally block + logger.info("In finally block, ensuring cleanup") existing_tvg_ids = None existing_epgs = None gc.collect() + # Check final memory usage + try: + import psutil + process = psutil.Process() + final_memory = process.memory_info().rss / 1024 / 1024 + logger.info(f"Final memory usage: {final_memory:.2f} MB") + except: + pass @shared_task @@ -729,7 +807,22 @@ def parse_programs_for_tvg_id(epg_id): logger.info(f"Program parse for {epg_id} already in progress, skipping duplicate task") return "Task already running" + source_file = None + program_parser = None + programs_to_create = None + epg = None + epg_source = None + try: + # Add memory tracking + try: + import psutil + process = psutil.Process() + initial_memory = process.memory_info().rss / 1024 / 1024 + logger.info(f"[parse_programs_for_tvg_id] Initial memory usage: {initial_memory:.2f} MB") + except ImportError: + process = None + epg = EPGData.objects.get(id=epg_id) epg_source = epg.epg_source @@ -756,12 +849,25 @@ def parse_programs_for_tvg_id(epg_id): if not id_batch: break + # Store the last ID before deleting the batch variable + if id_batch: + max_id = id_batch[-1] + else: + max_id = 0 + # Delete this batch ProgramData.objects.filter(id__in=id_batch).delete() + # Release memory immediately + del id_batch gc.collect() - # Update last_id for next iteration - last_id = id_batch[-1] if id_batch else 0 + # Update last_id for next iteration using our stored value + last_id = max_id + + # Explicitly delete query objects + del programs_to_delete + del last_id + gc.collect() file_path = epg_source.file_path if not file_path: @@ -820,17 +926,23 @@ def parse_programs_for_tvg_id(epg_id): 
logger.info(f"Parsing programs for tvg_id={epg.tvg_id} from {file_path}") + # Memory usage tracking + if process: + mem_before = process.memory_info().rss / 1024 / 1024 + logger.info(f"[parse_programs_for_tvg_id] Memory before parsing: {mem_before:.2f} MB") + programs_to_create = [] batch_size = 1000 # Process in batches to limit memory usage + programs_processed = 0 try: # Create a parser with the desired options - parser = etree.XMLParser(huge_tree=True, remove_blank_text=True) + #parser = etree.XMLParser(huge_tree=True, remove_blank_text=True) # Open the file properly source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb') - # Stream parse the file using lxml's iterparse (without parser parameter) + # Stream parse the file using lxml's iterparse program_parser = etree.iterparse(source_file, events=('end',), tag='programme') for _, elem in program_parser: @@ -858,6 +970,7 @@ def parse_programs_for_tvg_id(epg_id): custom_props = extract_custom_properties(elem) custom_properties_json = None if custom_props: + logger.debug(f"Number of custom properties: {len(custom_props)}") try: custom_properties_json = json.dumps(custom_props) except Exception as e: @@ -874,12 +987,19 @@ def parse_programs_for_tvg_id(epg_id): custom_properties=custom_properties_json )) + programs_processed += 1 + custom_props = None + custom_properties_json = None # Batch processing if len(programs_to_create) >= batch_size: ProgramData.objects.bulk_create(programs_to_create) logger.debug(f"Saved batch of {len(programs_to_create)} programs for {epg.tvg_id}") + del programs_to_create # Explicit deletion programs_to_create = [] - # Force garbage collection after batch processing + + # Force more aggressive garbage collection + custom_props = None + custom_properties_json = None gc.collect() except Exception as e: @@ -891,8 +1011,17 @@ def parse_programs_for_tvg_id(epg_id): while elem.getprevious() is not None: del elem.getparent()[0] - # Make sure to close the file - source_file.close() + # Make sure to close the file and release parser resources + if source_file: + source_file.close() + source_file = None + + if program_parser: + program_parser = None + + # Free parser memory + parser = None + gc.collect() except etree.XMLSyntaxError as xml_error: logger.error(f"XML syntax error parsing program data: {xml_error}") @@ -900,17 +1029,54 @@ def parse_programs_for_tvg_id(epg_id): except Exception as e: logger.error(f"Error parsing XML for programs: {e}", exc_info=True) raise + finally: + # Ensure file is closed even if an exception occurs + if source_file: + source_file.close() + source_file = None # Process any remaining items if programs_to_create: ProgramData.objects.bulk_create(programs_to_create) logger.debug(f"Saved final batch of {len(programs_to_create)} programs for {epg.tvg_id}") + del programs_to_create + programs_to_create = [] + + # Memory tracking after processing + if process: + mem_after = process.memory_info().rss / 1024 / 1024 + logger.info(f"[parse_programs_for_tvg_id] Memory after parsing {programs_processed} programs: {mem_after:.2f} MB (change: {mem_after-mem_before:.2f} MB)") # Final garbage collection gc.collect() + # One additional garbage collection specifically for lxml elements + # which can sometimes be retained due to reference cycles + gc.collect() + + # Reset internal caches and pools that lxml might be keeping + try: + etree.clear_error_log() + except: + pass + logger.info(f"Completed program parsing for tvg_id={epg.tvg_id}.") finally: + # Explicit cleanup of all 
potentially large objects + if source_file: + try: + source_file.close() + except: + pass + + source_file = None + program_parser = None + programs_to_create = None + epg = None + epg_source = None + + # Force garbage collection before releasing lock + gc.collect() release_task_lock('parse_epg_programs', epg_id) @@ -918,6 +1084,16 @@ def parse_programs_for_source(epg_source, tvg_id=None): # Send initial programs parsing notification send_epg_update(epg_source.id, "parsing_programs", 0) + # Add memory tracking + try: + import psutil + process = psutil.Process() + initial_memory = process.memory_info().rss / 1024 / 1024 + logger.info(f"[parse_programs_for_source] Initial memory usage: {initial_memory:.2f} MB") + except ImportError: + logger.warning("psutil not available for memory tracking") + process = None + try: # Process EPG entries in batches rather than all at once batch_size = 20 # Process fewer channels at once to reduce memory usage @@ -939,6 +1115,10 @@ def parse_programs_for_source(epg_source, tvg_id=None): updated_count = 0 processed = 0 + # Memory check before batch processing + if process: + logger.info(f"[parse_programs_for_source] Memory before batch processing: {process.memory_info().rss / 1024 / 1024:.2f} MB") + # Process in batches using cursor-based approach to limit memory usage last_id = 0 while True: @@ -969,10 +1149,18 @@ def parse_programs_for_source(epg_source, tvg_id=None): logger.error(f"Error parsing programs for tvg_id={epg.tvg_id}: {e}", exc_info=True) failed_entries.append(f"{epg.tvg_id}: {str(e)}") + # Memory check after processing batch + if process: + logger.info(f"[parse_programs_for_source] Memory after processing batch: {process.memory_info().rss / 1024 / 1024:.2f} MB") + # Force garbage collection after each batch batch_entries = None # Remove reference to help garbage collection gc.collect() + # Memory check after garbage collection + if process: + logger.info(f"[parse_programs_for_source] Memory after gc: {process.memory_info().rss / 1024 / 1024:.2f} MB") + # If there were failures, include them in the message but continue if failed_entries: epg_source.status = EPGSource.STATUS_SUCCESS # Still mark as success if some processed @@ -986,6 +1174,11 @@ def parse_programs_for_source(epg_source, tvg_id=None): send_epg_update(epg_source.id, "parsing_programs", 100, status="success", message=epg_source.last_message) + + # Explicitly release memory of large lists before returning + del failed_entries + gc.collect() + return True # If all successful, set a comprehensive success message @@ -1012,6 +1205,21 @@ def parse_programs_for_source(epg_source, tvg_id=None): status="error", message=epg_source.last_message) return False + finally: + # Final memory cleanup and tracking + if process: + # Force garbage collection before measuring + gc.collect() + final_memory = process.memory_info().rss / 1024 / 1024 + logger.info(f"[parse_programs_for_source] Final memory usage: {final_memory:.2f} MB") + + # Explicitly release any remaining large data structures + failed_entries = None + program_count = None + channel_count = None + updated_count = None + processed = None + gc.collect() def fetch_schedules_direct(source): @@ -1118,13 +1326,11 @@ def parse_schedules_direct_time(time_str): # Helper function to extract custom properties - moved to a separate function to clean up the code def extract_custom_properties(prog): + # Create a new dictionary for each call custom_props = {} - # Extract categories - categories = [] - for cat_elem in prog.findall('category'): - if 
cat_elem.text and cat_elem.text.strip(): - categories.append(cat_elem.text.strip()) + # Extract categories with a single comprehension to reduce intermediate objects + categories = [cat.text.strip() for cat in prog.findall('category') if cat.text and cat.text.strip()] if categories: custom_props['categories'] = categories @@ -1151,25 +1357,23 @@ def extract_custom_properties(prog): # Just store the raw onscreen format custom_props['onscreen_episode'] = ep_num.text.strip() - # Extract ratings - for rating_elem in prog.findall('rating'): + # Extract ratings more efficiently + rating_elem = prog.find('rating') + if rating_elem is not None: value_elem = rating_elem.find('value') if value_elem is not None and value_elem.text: custom_props['rating'] = value_elem.text.strip() if rating_elem.get('system'): custom_props['rating_system'] = rating_elem.get('system') - break # Just use the first rating - # Extract credits (actors, directors, etc.) + # Extract credits more efficiently credits_elem = prog.find('credits') if credits_elem is not None: credits = {} for credit_type in ['director', 'actor', 'writer', 'presenter', 'producer']: - elements = credits_elem.findall(credit_type) - if elements: - names = [e.text.strip() for e in elements if e.text and e.text.strip()] - if names: - credits[credit_type] = names + names = [e.text.strip() for e in credits_elem.findall(credit_type) if e.text and e.text.strip()] + if names: + credits[credit_type] = names if credits: custom_props['credits'] = credits @@ -1182,11 +1386,11 @@ def extract_custom_properties(prog): if country_elem is not None and country_elem.text: custom_props['country'] = country_elem.text.strip() - for icon_elem in prog.findall('icon'): - if icon_elem.get('src'): - custom_props['icon'] = icon_elem.get('src') - break # Just use the first icon + icon_elem = prog.find('icon') + if icon_elem is not None and icon_elem.get('src'): + custom_props['icon'] = icon_elem.get('src') + # Simpler approach for boolean flags for kw in ['previously-shown', 'premiere', 'new']: if prog.find(kw) is not None: custom_props[kw.replace('-', '_')] = True diff --git a/core/tasks.py b/core/tasks.py index a6bd80cf..fbd9277d 100644 --- a/core/tasks.py +++ b/core/tasks.py @@ -36,11 +36,6 @@ LOG_THROTTLE_SECONDS = 300 # 5 minutes # Track if this is the first scan since startup _first_scan_completed = False -@shared_task -def beat_periodic_task(): - fetch_channel_stats() - scan_and_process_files() - def throttled_log(logger_method, message, key=None, *args, **kwargs): """Only log messages with the same key once per throttle period""" if key is None: @@ -52,6 +47,32 @@ def throttled_log(logger_method, message, key=None, *args, **kwargs): logger_method(message, *args, **kwargs) _last_log_times[key] = now +def clear_memory(): + """Force aggressive garbage collection to free memory""" + import gc + # Run full garbage collection + gc.collect(generation=2) + # Find and break any reference cycles + gc.collect(generation=0) + # Clear any cached objects in memory + gc.collect(generation=1) + # Check if psutil is available for more advanced monitoring + try: + import psutil + process = psutil.Process() + if hasattr(process, 'memory_info'): + mem = process.memory_info().rss / (1024 * 1024) + logger.debug(f"Memory usage after cleanup: {mem:.2f} MB") + except (ImportError, Exception): + pass + +@shared_task +def beat_periodic_task(): + fetch_channel_stats() + scan_and_process_files() + # Call memory cleanup after completing tasks + clear_memory() + @shared_task def 
scan_and_process_files(): global _first_scan_completed @@ -270,6 +291,9 @@ def scan_and_process_files(): # Mark that the first scan is complete _first_scan_completed = True + # Force memory cleanup + clear_memory() + def fetch_channel_stats(): redis_client = RedisClient.get_client() diff --git a/dispatcharr/celery.py b/dispatcharr/celery.py index a0ff2168..b0debc76 100644 --- a/dispatcharr/celery.py +++ b/dispatcharr/celery.py @@ -2,6 +2,7 @@ import os from celery import Celery import logging +from celery.signals import task_postrun # Add import for signals # Initialize with defaults before Django settings are loaded DEFAULT_LOG_LEVEL = 'DEBUG' @@ -48,6 +49,24 @@ app.conf.update( worker_task_log_format='%(asctime)s %(levelname)s %(task_name)s: %(message)s', ) +# Add memory cleanup after task completion +@task_postrun.connect # Use the imported signal +def cleanup_task_memory(**kwargs): + """Clean up memory after each task completes""" + import gc + # Force garbage collection + gc.collect() + + # Log memory usage if psutil is installed + try: + import psutil + process = psutil.Process() + if hasattr(process, 'memory_info'): + mem = process.memory_info().rss / (1024 * 1024) + print(f"Memory usage after task: {mem:.2f} MB") + except (ImportError, Exception): + pass + @app.on_after_configure.connect def setup_celery_logging(**kwargs): # Use our directly determined log level diff --git a/dispatcharr/settings.py b/dispatcharr/settings.py index 9eb7dd2b..de8464d5 100644 --- a/dispatcharr/settings.py +++ b/dispatcharr/settings.py @@ -199,6 +199,15 @@ CELERY_BROKER_TRANSPORT_OPTIONS = { CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' +# Memory management settings +#CELERY_WORKER_MAX_TASKS_PER_CHILD = 10 # Restart worker after 10 tasks to free memory +#CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # Don't prefetch tasks - process one at a time +#CELERY_TASK_ACKS_LATE = True # Only acknowledge tasks after they're processed +#CELERY_TASK_TIME_LIMIT = 3600 # 1 hour time limit per task +#CELERY_TASK_SOFT_TIME_LIMIT = 3540 # Soft limit 60 seconds before hard limit +#CELERY_WORKER_CANCEL_LONG_RUNNING_TASKS_ON_CONNECTION_LOSS = True # Cancel tasks if connection lost +#CELERY_TASK_IGNORE_RESULT = True # Don't store results unless explicitly needed + CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers.DatabaseScheduler" CELERY_BEAT_SCHEDULE = { 'fetch-channel-statuses': { diff --git a/docker/uwsgi.debug.ini b/docker/uwsgi.debug.ini index 43ecd5ce..ea567e1e 100644 --- a/docker/uwsgi.debug.ini +++ b/docker/uwsgi.debug.ini @@ -8,7 +8,7 @@ exec-before = python /app/scripts/wait_for_redis.py ; Start Redis first attach-daemon = redis-server ; Then start other services -attach-daemon = celery -A dispatcharr worker +attach-daemon = celery -A dispatcharr worker --concurrency=4 attach-daemon = celery -A dispatcharr beat attach-daemon = daphne -b 0.0.0.0 -p 8001 dispatcharr.asgi:application attach-daemon = cd /app/frontend && npm run dev
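Editor's note (illustrative sketch, not part of the patch): the settings.py hunk leaves Celery's worker-recycling options commented out, while the celery.py hunk adds a per-task gc.collect() via the task_postrun signal. If explicit collection alone does not hold memory down, the conventional Celery-side lever is to recycle worker processes after a task count or memory ceiling. The values below are untuned placeholders and assume the usual namespace='CELERY' configuration implied by the project's existing CELERY_* settings.

CELERY_WORKER_MAX_TASKS_PER_CHILD = 50        # recycle a worker process after 50 tasks
CELERY_WORKER_MAX_MEMORY_PER_CHILD = 512000   # recycle when worker RSS exceeds ~500 MB (value is in KiB)
CELERY_WORKER_PREFETCH_MULTIPLIER = 1         # avoid holding a backlog of prefetched tasks in memory

These map to Celery's worker_max_tasks_per_child, worker_max_memory_per_child, and worker_prefetch_multiplier options, and would complement the --concurrency=4 change in docker/uwsgi.debug.ini.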