diff --git a/apps/epg/tasks.py b/apps/epg/tasks.py
index f102630f..395f49cb 100644
--- a/apps/epg/tasks.py
+++ b/apps/epg/tasks.py
@@ -5,9 +5,11 @@ import gzip
 import os
 import uuid
 import requests
-import xml.etree.ElementTree as ET
 import time  # Add import for tracking download progress
 from datetime import datetime, timedelta, timezone as dt_timezone
+import gc  # Add garbage collection module
+import json
+from lxml import etree  # Using lxml exclusively
 
 from celery import shared_task
 from django.conf import settings
@@ -40,13 +42,16 @@ def send_epg_update(source_id, action, progress, **kwargs):
 
     # Now, send the updated data dictionary
     channel_layer = get_channel_layer()
-    async_to_sync(channel_layer.group_send)(
-        'updates',
-        {
-            'type': 'update',
-            'data': data
-        }
-    )
+    try:
+        async_to_sync(channel_layer.group_send)(
+            'updates',
+            {
+                'type': 'update',
+                'data': data
+            }
+        )
+    except Exception as e:
+        logger.warning(f"Failed to send WebSocket update: {e}")
 
 
 def delete_epg_refresh_task_by_id(epg_id):
@@ -206,8 +211,12 @@ def fetch_xmltv(source):
             send_epg_update(source.id, "downloading", 100, status="error", error="No URL provided and no valid local file exists")
             return False
 
+    # Clean up existing cache file
     if os.path.exists(source.get_cache_file()):
-        os.remove(source.get_cache_file())
+        try:
+            os.remove(source.get_cache_file())
+        except Exception as e:
+            logger.warning(f"Failed to remove existing cache file: {e}")
 
     logger.info(f"Fetching XMLTV data from source: {source.name}")
     try:
@@ -235,7 +244,7 @@ def fetch_xmltv(source):
         send_epg_update(source.id, "downloading", 0)
 
         # Use streaming response to track download progress
-        with requests.get(source.url, headers=headers, stream=True, timeout=30) as response:
+        with requests.get(source.url, headers=headers, stream=True, timeout=60) as response:
             # Handle 404 specifically
             if response.status_code == 404:
                 logger.error(f"EPG URL not found (404): {source.url}")
@@ -304,9 +313,10 @@ def fetch_xmltv(source):
             downloaded = 0
             start_time = time.time()
             last_update_time = start_time
+            update_interval = 0.5  # Only update every 0.5 seconds
 
             with open(cache_file, 'wb') as f:
-                for chunk in response.iter_content(chunk_size=8192):
+                for chunk in response.iter_content(chunk_size=16384):  # Increased chunk size for better performance
                     if chunk:
                         f.write(chunk)
 
@@ -326,17 +336,18 @@ def fetch_xmltv(source):
                         # Time remaining (in seconds)
                         time_remaining = (total_size - downloaded) / (speed * 1024) if speed > 0 and total_size > 0 else 0
 
-                        # Only send updates every 0.5 seconds to avoid flooding
+                        # Only send updates at specified intervals to avoid flooding
                         current_time = time.time()
-                        if current_time - last_update_time >= 0.5 and progress > 0:
+                        if current_time - last_update_time >= update_interval and progress > 0:
                             last_update_time = current_time
                             send_epg_update(
                                 source.id,
                                 "downloading",
                                 progress,
-                                speed=speed,
-                                elapsed_time=elapsed_time,
-                                time_remaining=time_remaining
+                                speed=round(speed, 2),
+                                elapsed_time=round(elapsed_time, 1),
+                                time_remaining=round(time_remaining, 1),
+                                downloaded=f"{downloaded / (1024 * 1024):.2f} MB"
                             )
 
             # Send completion notification
@@ -424,6 +435,20 @@ def fetch_xmltv(source):
         )
         send_epg_update(source.id, "downloading", 100, status="error", error=user_message)
         return False
+    except requests.exceptions.Timeout as e:
+        # Handle timeout errors specifically
+        error_message = str(e)
+        user_message = f"Timeout error: EPG source '{source.name}' took too long to respond"
+        logger.error(f"Timeout error fetching XMLTV from {source.name}: {e}", exc_info=True)
+
+        # Update source status
+        source.status = 'error'
+        source.last_message = user_message
+        source.save(update_fields=['status', 'last_message'])
+
+        # Send notifications
+        send_epg_update(source.id, "downloading", 100, status="error", error=user_message)
+        return False
     except Exception as e:
         error_message = str(e)
         logger.error(f"Error fetching XMLTV from {source.name}: {e}", exc_info=True)
@@ -496,65 +521,133 @@ def parse_channels_only(source):
         logger.info(f"Parsing channels from EPG file: {file_path}")
         existing_epgs = {e.tvg_id: e for e in EPGData.objects.filter(epg_source=source)}
 
-        # Read entire file (decompress if .gz)
-        if file_path.endswith('.gz'):
-            with open(file_path, 'rb') as gz_file:
-                decompressed = gzip.decompress(gz_file.read())
-                xml_data = decompressed.decode('utf-8')
-        else:
-            with open(file_path, 'r', encoding='utf-8') as xml_file:
-                xml_data = xml_file.read()
+        # Update progress to show file read starting
+        send_epg_update(source.id, "parsing_channels", 10)
 
-        # Update progress to show file read completed
-        send_epg_update(source.id, "parsing_channels", 25)
-
-        root = ET.fromstring(xml_data)
-        channels = root.findall('channel')
+        # Stream parsing instead of loading entire file at once
+        is_gzipped = file_path.endswith('.gz')
 
         epgs_to_create = []
         epgs_to_update = []
+        total_channels = 0
+        processed_channels = 0
+        batch_size = 500  # Process in batches to limit memory usage
 
-        logger.info(f"Found {len(channels)} entries in {file_path}")
+        try:
+            # Create a parser with the desired options
+            parser = etree.XMLParser(huge_tree=True, remove_blank_text=True)
 
-        # Update progress to show parsing started
-        send_epg_update(source.id, "parsing_channels", 50)
+            # Count channels for progress reporting - use proper lxml approach
+            # Open the file first
+            source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb')
 
-        total_channels = len(channels)
-        for i, channel_elem in enumerate(channels):
-            tvg_id = channel_elem.get('id', '').strip()
-            if not tvg_id:
-                continue  # skip blank/invalid IDs
+            # Create an iterparse context without parser parameter
+            channel_finder = etree.iterparse(source_file, events=('end',), tag='channel')
 
-            display_name = channel_elem.findtext('display-name', default=tvg_id).strip()
+            # Count channels
+            total_channels = sum(1 for _ in channel_finder)
 
-            if tvg_id in existing_epgs:
-                epg_obj = existing_epgs[tvg_id]
-                if epg_obj.name != display_name:
-                    epg_obj.name = display_name
-                    epgs_to_update.append(epg_obj)
-            else:
-                epgs_to_create.append(EPGData(
-                    tvg_id=tvg_id,
-                    name=display_name,
-                    epg_source=source,
-                ))
+            # Close the file to reset position
+            source_file.close()
 
-            # Send occasional progress updates
-            if i % 100 == 0 or i == total_channels - 1:
-                progress = 50 + int((i / total_channels) * 40)  # Scale to 50-90% range
-                send_epg_update(source.id, "parsing_channels", progress)
+            # Update progress after counting
+            send_epg_update(source.id, "parsing_channels", 25, total_channels=total_channels)
 
-        # Update progress before database operations
-        send_epg_update(source.id, "parsing_channels", 90)
+            # Reset file position for actual processing
+            source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb')
+            channel_parser = etree.iterparse(source_file, events=('end',), tag='channel')
+            for _, elem in channel_parser:
+                tvg_id = elem.get('id', '').strip()
+                if tvg_id:
+                    display_name = None
+                    for child in elem:
+                        if child.tag == 'display-name' and child.text:
+                            display_name = child.text.strip()
+                            break
+
+                    if not display_name:
+                        display_name = tvg_id
+
+                    if tvg_id in existing_epgs:
+                        epg_obj = existing_epgs[tvg_id]
+                        if epg_obj.name != display_name:
+                            epg_obj.name = display_name
+                            epgs_to_update.append(epg_obj)
+                    else:
+                        epgs_to_create.append(EPGData(
+                            tvg_id=tvg_id,
+                            name=display_name,
+                            epg_source=source,
+                        ))
+
+                    processed_channels += 1
+
+                    # Batch processing
+                    if len(epgs_to_create) >= batch_size:
+                        EPGData.objects.bulk_create(epgs_to_create, ignore_conflicts=True)
+                        epgs_to_create = []
+                        # Force garbage collection
+                        gc.collect()
+
+                    if len(epgs_to_update) >= batch_size:
+                        EPGData.objects.bulk_update(epgs_to_update, ["name"])
+                        epgs_to_update = []
+                        # Force garbage collection
+                        gc.collect()
+
+                    # Send progress updates
+                    if processed_channels % 100 == 0 or processed_channels == total_channels:
+                        progress = 25 + int((processed_channels / total_channels) * 65) if total_channels > 0 else 90
+                        send_epg_update(
+                            source.id,
+                            "parsing_channels",
+                            progress,
+                            processed=processed_channels,
+                            total=total_channels
+                        )
+
+                # Clear memory
+                elem.clear()
+                while elem.getprevious() is not None:
+                    del elem.getparent()[0]
+
+            # Make sure to close the file
+            source_file.close()
+
+        except (etree.XMLSyntaxError, Exception) as xml_error:
+            # Instead of falling back, just handle the error
+            logger.error(f"XML parsing failed: {xml_error}")
+            # Update status to error
+            source.status = 'error'
+            source.last_message = f"Error parsing XML file: {str(xml_error)}"
+            source.save(update_fields=['status', 'last_message'])
+            send_epg_update(source.id, "parsing_channels", 100, status="error", error=str(xml_error))
+            return False
+
+        # Process any remaining items
         if epgs_to_create:
             EPGData.objects.bulk_create(epgs_to_create, ignore_conflicts=True)
         if epgs_to_update:
             EPGData.objects.bulk_update(epgs_to_update, ["name"])
 
+        # Final garbage collection
+        gc.collect()
+
+        # Update source status with channel count
+        source.status = 'success'
+        source.last_message = f"Successfully parsed {processed_channels} channels"
+        source.save(update_fields=['status', 'last_message'])
+
         # Send completion notification
-        send_epg_update(source.id, "parsing_channels", 100, status="success")
+        send_epg_update(
+            source.id,
+            "parsing_channels",
+            100,
+            status="success",
+            channels_count=processed_channels
+        )
 
         channel_layer = get_channel_layer()
         async_to_sync(channel_layer.group_send)(
@@ -565,7 +658,7 @@
             }
         )
 
-        logger.info("Finished parsing channel info.")
+        logger.info(f"Finished parsing channel info. Found {processed_channels} channels.")
         return True
 
     except FileNotFoundError:
@@ -592,199 +685,183 @@ def parse_programs_for_tvg_id(epg_id):
         logger.info(f"Program parse for {epg_id} already in progress, skipping duplicate task")
         return "Task already running"
 
-    epg = EPGData.objects.get(id=epg_id)
-    epg_source = epg.epg_source
+    try:
+        epg = EPGData.objects.get(id=epg_id)
+        epg_source = epg.epg_source
 
-    if not Channel.objects.filter(epg_data=epg).exists():
-        logger.info(f"No channels matched to EPG {epg.tvg_id}")
-        release_task_lock('parse_epg_programs', epg_id)
-        return
-
-    logger.info(f"Refreshing program data for tvg_id: {epg.tvg_id}")
-
-    # First, remove all existing programs
-    ProgramData.objects.filter(epg=epg).delete()
-
-    file_path = epg_source.file_path
-    if not file_path:
-        file_path = epg_source.get_cache_file()
-
-    # Check if the file exists
-    if not os.path.exists(file_path):
-        logger.error(f"EPG file not found at: {file_path}")
-
-        # Update the file path in the database
-        new_path = epg_source.get_cache_file()
-        logger.info(f"Updating file_path from '{file_path}' to '{new_path}'")
-        epg_source.file_path = new_path
-        epg_source.save(update_fields=['file_path'])
-
-        # Fetch new data before continuing
-        if epg_source.url:
-            logger.info(f"Fetching new EPG data from URL: {epg_source.url}")
-            # Properly check the return value from fetch_xmltv
-            fetch_success = fetch_xmltv(epg_source)
-
-            # If fetch was not successful or the file still doesn't exist, abort
-            if not fetch_success:
-                logger.error(f"Failed to fetch EPG data, cannot parse programs for tvg_id: {epg.tvg_id}")
-                # Update status to error if not already set
-                epg_source.status = 'error'
-                epg_source.last_message = f"Failed to download EPG data, cannot parse programs"
-                epg_source.save(update_fields=['status', 'last_message'])
-                send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="Failed to download EPG file")
-                release_task_lock('parse_epg_programs', epg_id)
-                return
-
-            # Also check if the file exists after download
-            if not os.path.exists(new_path):
-                logger.error(f"Failed to fetch EPG data, file still missing at: {new_path}")
-                epg_source.status = 'error'
-                epg_source.last_message = f"Failed to download EPG data, file missing after download"
-                epg_source.save(update_fields=['status', 'last_message'])
-                send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="File not found after download")
-                release_task_lock('parse_epg_programs', epg_id)
-                return
-        else:
-            logger.error(f"No URL provided for EPG source {epg_source.name}, cannot fetch new data")
-            # Update status to error
-            epg_source.status = 'error'
-            epg_source.last_message = f"No URL provided, cannot fetch EPG data"
-            epg_source.save(update_fields=['status', 'last_message'])
-            send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="No URL provided")
+        if not Channel.objects.filter(epg_data=epg).exists():
+            logger.info(f"No channels matched to EPG {epg.tvg_id}")
             release_task_lock('parse_epg_programs', epg_id)
             return
-        file_path = new_path
+        logger.info(f"Refreshing program data for tvg_id: {epg.tvg_id}")
 
-    # Read entire file (decompress if .gz)
-    try:
-        if file_path.endswith('.gz'):
-            with open(file_path, 'rb') as gz_file:
-                decompressed = gzip.decompress(gz_file.read())
-                xml_data = decompressed.decode('utf-8')
-        else:
-            with open(file_path, 'r', encoding='utf-8') as xml_file:
-                xml_data = xml_file.read()
-    except FileNotFoundError:
-        logger.error(f"EPG file not found at: {file_path}")
+        # First, remove all existing programs - use chunked delete to avoid memory issues
+        chunk_size = 5000
+        programs_to_delete = ProgramData.objects.filter(epg=epg)
+        total_programs = programs_to_delete.count()
+
+        if total_programs > 0:
+            logger.info(f"Deleting {total_programs} existing programs for {epg.tvg_id}")
+
+            # Get only the IDs to conserve memory
+            program_ids = list(programs_to_delete.values_list('id', flat=True))
+
+            # Delete in chunks using ID-based filtering
+            for i in range(0, len(program_ids), chunk_size):
+                chunk_ids = program_ids[i:i + chunk_size]
+                ProgramData.objects.filter(id__in=chunk_ids).delete()
+                gc.collect()  # Force garbage collection after batch delete
+
+        file_path = epg_source.file_path
+        if not file_path:
+            file_path = epg_source.get_cache_file()
+
+        # Check if the file exists
+        if not os.path.exists(file_path):
+            logger.error(f"EPG file not found at: {file_path}")
+
+            # Update the file path in the database
+            new_path = epg_source.get_cache_file()
+            logger.info(f"Updating file_path from '{file_path}' to '{new_path}'")
+            epg_source.file_path = new_path
+            epg_source.save(update_fields=['file_path'])
+
+            # Fetch new data before continuing
+            if epg_source.url:
+                logger.info(f"Fetching new EPG data from URL: {epg_source.url}")
+                # Properly check the return value from fetch_xmltv
+                fetch_success = fetch_xmltv(epg_source)
+
+                # If fetch was not successful or the file still doesn't exist, abort
+                if not fetch_success:
+                    logger.error(f"Failed to fetch EPG data, cannot parse programs for tvg_id: {epg.tvg_id}")
+                    # Update status to error if not already set
+                    epg_source.status = 'error'
+                    epg_source.last_message = f"Failed to download EPG data, cannot parse programs"
+                    epg_source.save(update_fields=['status', 'last_message'])
+                    send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="Failed to download EPG file")
+                    release_task_lock('parse_epg_programs', epg_id)
+                    return
+
+                # Also check if the file exists after download
+                if not os.path.exists(new_path):
+                    logger.error(f"Failed to fetch EPG data, file still missing at: {new_path}")
+                    epg_source.status = 'error'
+                    epg_source.last_message = f"Failed to download EPG data, file missing after download"
+                    epg_source.save(update_fields=['status', 'last_message'])
+                    send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="File not found after download")
+                    release_task_lock('parse_epg_programs', epg_id)
+                    return
+            else:
+                logger.error(f"No URL provided for EPG source {epg_source.name}, cannot fetch new data")
+                # Update status to error
+                epg_source.status = 'error'
+                epg_source.last_message = f"No URL provided, cannot fetch EPG data"
+                epg_source.save(update_fields=['status', 'last_message'])
+                send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error="No URL provided")
+                release_task_lock('parse_epg_programs', epg_id)
+                return
+
+            file_path = new_path
+
+        # Use streaming parsing to reduce memory usage
+        is_gzipped = file_path.endswith('.gz')
+
+        logger.info(f"Parsing programs for tvg_id={epg.tvg_id} from {file_path}")
+
+        programs_to_create = []
+        batch_size = 1000  # Process in batches to limit memory usage
+
+        try:
+            # Create a parser with the desired options
+            parser = etree.XMLParser(huge_tree=True, remove_blank_text=True)
+
+            # Open the file properly
+            source_file = gzip.open(file_path, 'rb') if is_gzipped else open(file_path, 'rb')
+
+            # Stream parse the file using lxml's iterparse (without parser parameter)
+            program_parser = etree.iterparse(source_file, events=('end',), tag='programme')
+
+            for _, elem in program_parser:
+                if elem.get('channel') == epg.tvg_id:
+                    try:
+                        start_time = parse_xmltv_time(elem.get('start'))
+                        end_time = parse_xmltv_time(elem.get('stop'))
+                        title = None
+                        desc = None
+                        sub_title = None
+
+                        # Efficiently process child elements
+                        for child in elem:
+                            if child.tag == 'title':
+                                title = child.text or 'No Title'
+                            elif child.tag == 'desc':
+                                desc = child.text or ''
+                            elif child.tag == 'sub-title':
+                                sub_title = child.text or ''
+
+                        if not title:
+                            title = 'No Title'
+
+                        # Extract custom properties
+                        custom_props = extract_custom_properties(elem)
+                        custom_properties_json = None
+                        if custom_props:
+                            try:
+                                custom_properties_json = json.dumps(custom_props)
+                            except Exception as e:
+                                logger.error(f"Error serializing custom properties to JSON: {e}", exc_info=True)
+
+                        programs_to_create.append(ProgramData(
+                            epg=epg,
+                            start_time=start_time,
+                            end_time=end_time,
+                            title=title,
+                            description=desc,
+                            sub_title=sub_title,
+                            tvg_id=epg.tvg_id,
+                            custom_properties=custom_properties_json
+                        ))
+
+                        # Batch processing
+                        if len(programs_to_create) >= batch_size:
+                            ProgramData.objects.bulk_create(programs_to_create)
+                            logger.debug(f"Saved batch of {len(programs_to_create)} programs for {epg.tvg_id}")
+                            programs_to_create = []
+                            # Force garbage collection after batch processing
+                            gc.collect()
+
+                    except Exception as e:
+                        logger.error(f"Error processing program for {epg.tvg_id}: {e}", exc_info=True)
+
+                # Important: Clear the element to avoid memory leaks (lxml specific method)
+                elem.clear()
+                # Also eliminate ancestors to prevent memory leaks
+                while elem.getprevious() is not None:
+                    del elem.getparent()[0]
+
+            # Make sure to close the file
+            source_file.close()
+
+        except etree.XMLSyntaxError as xml_error:
+            logger.error(f"XML syntax error parsing program data: {xml_error}")
+            raise
+        except Exception as e:
+            logger.error(f"Error parsing XML for programs: {e}", exc_info=True)
+            raise
+
+        # Process any remaining items
+        if programs_to_create:
+            ProgramData.objects.bulk_create(programs_to_create)
+            logger.debug(f"Saved final batch of {len(programs_to_create)} programs for {epg.tvg_id}")
+
+        # Final garbage collection
+        gc.collect()
+
+        logger.info(f"Completed program parsing for tvg_id={epg.tvg_id}.")
+    finally:
         release_task_lock('parse_epg_programs', epg_id)
-        return
-    except Exception as e:
-        logger.error(f"Error reading EPG file {file_path}: {e}", exc_info=True)
-        release_task_lock('parse_epg_programs', epg_id)
-        return
-
-    root = ET.fromstring(xml_data)
-
-    # Find only <programme> elements for this tvg_id
-    matched_programmes = [p for p in root.findall('programme') if p.get('channel') == epg.tvg_id]
-    logger.debug(f"Found {len(matched_programmes)} programmes for tvg_id={epg.tvg_id}")
-
-    programs_to_create = []
-    for prog in matched_programmes:
-        start_time = parse_xmltv_time(prog.get('start'))
-        end_time = parse_xmltv_time(prog.get('stop'))
-        title = prog.findtext('title', default='No Title')
-        desc = prog.findtext('desc', default='')
-        sub_title = prog.findtext('sub-title', default='')
-
-        # Extract custom properties
-        custom_props = {}
-
-        # Extract categories
-        categories = []
-        for cat_elem in prog.findall('category'):
-            if cat_elem.text and cat_elem.text.strip():
-                categories.append(cat_elem.text.strip())
-        if categories:
-            custom_props['categories'] = categories
-
-        # Extract episode numbers
-        for ep_num in prog.findall('episode-num'):
-            system = ep_num.get('system', '')
-            if system == 'xmltv_ns' and ep_num.text:
-                # Parse XMLTV episode-num format (season.episode.part)
-                parts = ep_num.text.split('.')
-                if len(parts) >= 2:
-                    if parts[0].strip() != '':
-                        try:
-                            season = int(parts[0]) + 1  # XMLTV format is zero-based
-                            custom_props['season'] = season
-                        except ValueError:
-                            pass
-                    if parts[1].strip() != '':
-                        try:
-                            episode = int(parts[1]) + 1  # XMLTV format is zero-based
-                            custom_props['episode'] = episode
-                        except ValueError:
-                            pass
-            elif system == 'onscreen' and ep_num.text:
-                # Just store the raw onscreen format
-                custom_props['onscreen_episode'] = ep_num.text.strip()
-
-        # Extract ratings
-        for rating_elem in prog.findall('rating'):
-            if rating_elem.findtext('value'):
-                custom_props['rating'] = rating_elem.findtext('value').strip()
-                if rating_elem.get('system'):
-                    custom_props['rating_system'] = rating_elem.get('system')
-                break  # Just use the first rating
-
-        # Extract credits (actors, directors, etc.)
-        credits_elem = prog.find('credits')
-        if credits_elem is not None:
-            credits = {}
-            for credit_type in ['director', 'actor', 'writer', 'presenter', 'producer']:
-                elements = credits_elem.findall(credit_type)
-                if elements:
-                    names = [e.text.strip() for e in elements if e.text and e.text.strip()]
-                    if names:
-                        credits[credit_type] = names
-            if credits:
-                custom_props['credits'] = credits
-
-        # Extract other common program metadata
-        if prog.findtext('date'):
-            custom_props['year'] = prog.findtext('date').strip()[:4]  # Just the year part
-
-        if prog.findtext('country'):
-            custom_props['country'] = prog.findtext('country').strip()
-
-        for icon_elem in prog.findall('icon'):
-            if icon_elem.get('src'):
-                custom_props['icon'] = icon_elem.get('src')
-                break  # Just use the first icon
-
-        for kw in ['previously-shown', 'premiere', 'new']:
-            if prog.find(kw) is not None:
-                custom_props[kw.replace('-', '_')] = True
-
-        # Convert custom_props to JSON string if not empty
-        custom_properties_json = None
-        if custom_props:
-            import json
-            try:
-                custom_properties_json = json.dumps(custom_props)
-            except Exception as e:
-                logger.error(f"Error serializing custom properties to JSON: {e}", exc_info=True)
-
-        programs_to_create.append(ProgramData(
-            epg=epg,
-            start_time=start_time,
-            end_time=end_time,
-            title=title,
-            description=desc,
-            sub_title=sub_title,
-            tvg_id=epg.tvg_id,
-            custom_properties=custom_properties_json
-        ))
-
-    ProgramData.objects.bulk_create(programs_to_create)
-
-    release_task_lock('parse_epg_programs', epg_id)
-
-    logger.info(f"Completed program parsing for tvg_id={epg.tvg_id}.")
 
 
 def parse_programs_for_source(epg_source, tvg_id=None):
@@ -966,3 +1043,81 @@ def parse_schedules_direct_time(time_str):
     except Exception as e:
         logger.error(f"Error parsing Schedules Direct time '{time_str}': {e}", exc_info=True)
         raise
+
+
+# Helper function to extract custom properties - moved to a separate function to clean up the code
+def extract_custom_properties(prog):
+    custom_props = {}
+
+    # Extract categories
+    categories = []
+    for cat_elem in prog.findall('category'):
+        if cat_elem.text and cat_elem.text.strip():
+            categories.append(cat_elem.text.strip())
+    if categories:
+        custom_props['categories'] = categories
+
+    # Extract episode numbers
+    for ep_num in prog.findall('episode-num'):
+        system = ep_num.get('system', '')
+        if system == 'xmltv_ns' and ep_num.text:
+            # Parse XMLTV episode-num format (season.episode.part)
+            parts = ep_num.text.split('.')
+            if len(parts) >= 2:
+                if parts[0].strip() != '':
+                    try:
+                        season = int(parts[0]) + 1  # XMLTV format is zero-based
+                        custom_props['season'] = season
+                    except ValueError:
+                        pass
+                if parts[1].strip() != '':
+                    try:
+                        episode = int(parts[1]) + 1  # XMLTV format is zero-based
+                        custom_props['episode'] = episode
+                    except ValueError:
+                        pass
+        elif system == 'onscreen' and ep_num.text:
+            # Just store the raw onscreen format
+            custom_props['onscreen_episode'] = ep_num.text.strip()
+
+    # Extract ratings
+    for rating_elem in prog.findall('rating'):
+        value_elem = rating_elem.find('value')
+        if value_elem is not None and value_elem.text:
+            custom_props['rating'] = value_elem.text.strip()
+            if rating_elem.get('system'):
+                custom_props['rating_system'] = rating_elem.get('system')
+            break  # Just use the first rating
+
+    # Extract credits (actors, directors, etc.)
+    credits_elem = prog.find('credits')
+    if credits_elem is not None:
+        credits = {}
+        for credit_type in ['director', 'actor', 'writer', 'presenter', 'producer']:
+            elements = credits_elem.findall(credit_type)
+            if elements:
+                names = [e.text.strip() for e in elements if e.text and e.text.strip()]
+                if names:
+                    credits[credit_type] = names
+        if credits:
+            custom_props['credits'] = credits
+
+    # Extract other common program metadata
+    date_elem = prog.find('date')
+    if date_elem is not None and date_elem.text:
+        custom_props['year'] = date_elem.text.strip()[:4]  # Just the year part
+
+    country_elem = prog.find('country')
+    if country_elem is not None and country_elem.text:
+        custom_props['country'] = country_elem.text.strip()
+
+    for icon_elem in prog.findall('icon'):
+        if icon_elem.get('src'):
+            custom_props['icon'] = icon_elem.get('src')
+            break  # Just use the first icon
+
+    for kw in ['previously-shown', 'premiere', 'new']:
+        if prog.find(kw) is not None:
+            custom_props[kw.replace('-', '_')] = True
+
+    return custom_props
diff --git a/core/utils.py b/core/utils.py
index 6b5e6815..01463ad9 100644
--- a/core/utils.py
+++ b/core/utils.py
@@ -59,6 +59,10 @@ class RedisClient:
             client.config_set('save', '')  # Disable RDB snapshots
             client.config_set('appendonly', 'no')  # Disable AOF logging
 
+            # Set optimal memory settings
+            client.config_set('maxmemory-policy', 'allkeys-lru')  # Use LRU eviction
+            client.config_set('maxmemory', '256mb')  # Set reasonable memory limit
+
             # Disable protected mode when in debug mode
             if os.environ.get('DISPATCHARR_DEBUG', '').lower() == 'true':
                 client.config_set('protected-mode', 'no')  # Disable protected mode in debug
@@ -178,3 +182,34 @@
             "data": {"success": True, "type": "epg_channels"}
         }
     )
+
+# Add memory monitoring utilities
+def get_memory_usage():
+    """Returns current memory usage in MB"""
+    import psutil
+    process = psutil.Process(os.getpid())
+    return process.memory_info().rss / (1024 * 1024)
+
+def monitor_memory_usage(func):
+    """Decorator to monitor memory usage before and after function execution"""
+    def wrapper(*args, **kwargs):
+        import gc
+        # Force garbage collection before measuring
+        gc.collect()
+
+        # Get initial memory usage
+        start_mem = get_memory_usage()
+        logger.debug(f"Memory usage before {func.__name__}: {start_mem:.2f} MB")
+
+        # Call the original function
+        result = func(*args, **kwargs)
+
+        # Force garbage collection before measuring again
+        gc.collect()
+
+        # Get final memory usage
+        end_mem = get_memory_usage()
+        logger.debug(f"Memory usage after {func.__name__}: {end_mem:.2f} MB (Change: {end_mem - start_mem:.2f} MB)")
+
+        return result
+    return wrapper
diff --git a/dispatcharr/settings.py b/dispatcharr/settings.py
index 02d04597..9eb7dd2b 100644
--- a/dispatcharr/settings.py
+++ b/dispatcharr/settings.py
@@ -43,6 +43,34 @@ INSTALLED_APPS = [
     'django_celery_beat',
 ]
 
+# EPG Processing optimization settings
+EPG_BATCH_SIZE = 1000  # Number of records to process in a batch
+EPG_MEMORY_LIMIT = 512  # Memory limit in MB before forcing garbage collection
+EPG_ENABLE_MEMORY_MONITORING = True  # Whether to monitor memory usage during processing
+
+# Database optimization settings
+DATABASE_STATEMENT_TIMEOUT = 300  # Seconds before timing out long-running queries
+DATABASE_CONN_MAX_AGE = 60  # Connection max age in seconds, helps with frequent reconnects
+
+# Disable atomic requests for performance-sensitive views
+ATOMIC_REQUESTS = False
+
+# Cache settings - add caching for EPG operations
+CACHES = {
+    'default': {
+        'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
+        'LOCATION': 'dispatcharr-epg-cache',
+        'TIMEOUT': 3600,  # 1 hour cache timeout
+        'OPTIONS': {
+            'MAX_ENTRIES': 10000,
+            'CULL_FREQUENCY': 3,  # Purge 1/3 of entries when max is reached
+        }
+    }
+}
+
+# Timeouts for external connections
+REQUESTS_TIMEOUT = 30  # Seconds for external API requests
+
 MIDDLEWARE = [
     'django.middleware.security.SecurityMiddleware',
     'django.contrib.sessions.middleware.SessionMiddleware',
diff --git a/requirements.txt b/requirements.txt
index 7d7117f4..8810e336 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -28,3 +28,5 @@ channels
 channels-redis
 django-filter
 django-celery-beat
+memory-profiler==0.61.0
+lxml==5.4.0