From f87ab4b07171fa098e0cbb17fd400b52b458cb43 Mon Sep 17 00:00:00 2001
From: SergeantPanda
Date: Thu, 22 May 2025 21:52:28 -0500
Subject: [PATCH] Rolled back some earlier memory optimizations that were causing issues with extremely large m3us.

---
 apps/m3u/tasks.py     | 148 +++++++-----------------------------------
 core/utils.py         |  29 +++++++--
 dispatcharr/celery.py |   2 +-
 3 files changed, 45 insertions(+), 134 deletions(-)

diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py
index 4fb6dfca..b1b1170d 100644
--- a/apps/m3u/tasks.py
+++ b/apps/m3u/tasks.py
@@ -22,11 +22,11 @@ from core.utils import RedisClient, acquire_task_lock, release_task_lock
 from core.models import CoreSettings, UserAgent
 from asgiref.sync import async_to_sync
 from core.xtream_codes import Client as XCClient
+from core.utils import send_websocket_update
 
 logger = logging.getLogger(__name__)
 
 BATCH_SIZE = 1000
-SKIP_EXTS = {}
 m3u_dir = os.path.join(settings.MEDIA_ROOT, "cached_m3u")
 
 def fetch_m3u_lines(account, use_cache=False):
@@ -184,48 +184,22 @@ def parse_extinf_line(line: str) -> dict:
       - 'name': the value from tvg-name (if present) or the display name otherwise.
     """
     if not line.startswith("#EXTINF:"):
-        logger.debug(f"Not an EXTINF line: {line[:50]}...")
         return None
-
     content = line[len("#EXTINF:"):].strip()
-    logger.debug(f"Parsing EXTINF content: {content[:100]}...")
-
     # Split on the first comma that is not inside quotes.
     parts = re.split(r',(?=(?:[^"]*"[^"]*")*[^"]*$)', content, maxsplit=1)
     if len(parts) != 2:
-        logger.warning(f"Invalid EXTINF format - couldn't split at comma: {content[:100]}...")
         return None
-
     attributes_part, display_name = parts[0], parts[1].strip()
-
-    # Debug raw attribute parsing
-    logger.debug(f"Attribute part: {attributes_part[:100]}...")
-    logger.debug(f"Display name: {display_name[:100]}...")
-
-    # Extract attributes with more detailed logging
-    try:
-        attr_matches = re.findall(r'([^\s]+)=["\']([^"\']+)["\']', attributes_part)
-        if not attr_matches:
-            logger.warning(f"No attributes found in: {attributes_part[:100]}...")
-
-        attrs = dict(attr_matches)
-        logger.debug(f"Extracted attributes: {attrs}")
-    except Exception as e:
-        logger.error(f"Error parsing attributes: {str(e)}", exc_info=True)
-        attrs = {}
-
+    attrs = dict(re.findall(r'([^\s]+)=["\']([^"\']+)["\']', attributes_part))
     # Use tvg-name attribute if available; otherwise, use the display name.
     name = attrs.get('tvg-name', display_name)
-
-    result = {
+    return {
         'attributes': attrs,
         'display_name': display_name,
         'name': name
     }
-    logger.debug(f"EXTINF parsed result: {result}")
-    return result
-
 
 def _matches_filters(stream_name: str, group_name: str, filters):
     """Check if a stream or group name matches a precompiled regex filter."""
     compiled_filters = [(re.compile(f.regex_pattern, re.IGNORECASE), f.exclude) for f in filters]
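As a quick sanity check on the slimmed-down parser, here is a self-contained sketch of the same regex logic applied to a made-up EXTINF line (the channel data below is purely illustrative, not from a real playlist):

```python
import re

def parse_extinf_line(line: str):
    # Mirrors the simplified parser above: None on malformed input,
    # otherwise a dict of attributes plus the resolved name.
    if not line.startswith("#EXTINF:"):
        return None
    content = line[len("#EXTINF:"):].strip()
    # Split on the first comma that is not inside quotes.
    parts = re.split(r',(?=(?:[^"]*"[^"]*")*[^"]*$)', content, maxsplit=1)
    if len(parts) != 2:
        return None
    attributes_part, display_name = parts[0], parts[1].strip()
    attrs = dict(re.findall(r'([^\s]+)=["\']([^"\']+)["\']', attributes_part))
    return {
        "attributes": attrs,
        "display_name": display_name,
        "name": attrs.get("tvg-name", display_name),
    }

# Hypothetical sample line, not taken from a real playlist:
sample = '#EXTINF:-1 tvg-id="news.hd" tvg-name="News HD" group-title="News",News HD'
print(parse_extinf_line(sample))
# -> {'attributes': {'tvg-id': 'news.hd', 'tvg-name': 'News HD', 'group-title': 'News'},
#     'display_name': 'News HD', 'name': 'News HD'}
```

Note that malformed lines now simply return None instead of emitting per-line debug logs, which is the memory/log-volume trade-off this rollback accepts.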
@@ -266,7 +240,7 @@ def process_groups(account, groups):
     groups_to_create = []
     for group_name, custom_props in groups.items():
         logger.debug(f"Handling group: {group_name}")
-        if (group_name not in existing_groups) and (group_name not in SKIP_EXTS):
+        if (group_name not in existing_groups):
             groups_to_create.append(ChannelGroup(
                 name=group_name,
             ))
@@ -426,51 +400,24 @@ def process_m3u_batch(account_id, batch, groups, hash_keys):
     """Processes a batch of M3U streams using bulk operations."""
     account = M3UAccount.objects.get(id=account_id)
 
-    logger.debug(f"Processing batch of {len(batch)} streams for account {account_id}")
-
     streams_to_create = []
     streams_to_update = []
     stream_hashes = {}
-    invalid_streams = []
-    # Initialize these variables to prevent UnboundLocalError during cleanup
-    changed_streams = []
-    unchanged_streams = []
-
-    # Log hash key configuration
-    logger.debug(f"Using hash keys: {hash_keys}")
 
     # compiled_filters = [(f.filter_type, re.compile(f.regex_pattern, re.IGNORECASE)) for f in filters]
     logger.debug(f"Processing batch of {len(batch)}")
-    for stream_index, stream_info in enumerate(batch):
+    for stream_info in batch:
         try:
-            # Extract basic stream info with better error handling
-            try:
-                name = stream_info["name"]
-                url = stream_info["url"]
-                attrs = stream_info["attributes"]
-                tvg_id = attrs.get("tvg-id", "")
-                tvg_logo = attrs.get("tvg-logo", "")
-                group_title = attrs.get("group-title", "Default Group")
-            except KeyError as e:
-                logger.warning(f"Missing required field in stream {stream_index}: {e}")
-                logger.debug(f"Stream data: {stream_info}")
-                invalid_streams.append((stream_index, f"Missing field: {e}"))
-                continue
+            name, url = stream_info["name"], stream_info["url"]
+            tvg_id, tvg_logo = stream_info["attributes"].get("tvg-id", ""), stream_info["attributes"].get("tvg-logo", "")
+            group_title = stream_info["attributes"].get("group-title", "Default Group")
 
             # Filter out disabled groups for this account
             if group_title not in groups:
-                logger.debug(f"Skipping stream in disabled group: '{group_title}', name: '{name}'")
-                continue
-
-            # Generate hash key with error handling
-            try:
-                stream_hash = Stream.generate_hash_key(name, url, tvg_id, hash_keys)
-                logger.debug(f"Generated hash {stream_hash} for stream '{name}'")
-            except Exception as e:
-                logger.error(f"Error generating hash for stream '{name}': {str(e)}")
-                invalid_streams.append((stream_index, f"Hash generation error: {str(e)}"))
+                logger.debug(f"Skipping stream in disabled group: {group_title}")
                 continue
 
+            stream_hash = Stream.generate_hash_key(name, url, tvg_id, hash_keys)
             stream_props = {
                 "name": name,
                 "url": url,
@@ -479,25 +426,14 @@ def process_m3u_batch(account_id, batch, groups, hash_keys):
                 "m3u_account": account,
                 "channel_group_id": int(groups.get(group_title)),
                 "stream_hash": stream_hash,
-                "custom_properties": json.dumps(attrs),
+                "custom_properties": json.dumps(stream_info["attributes"]),
             }
 
             if stream_hash not in stream_hashes:
                 stream_hashes[stream_hash] = stream_props
         except Exception as e:
-            logger.error(f"Failed to process stream at index {stream_index}: {e}", exc_info=True)
-            if "name" in stream_info:
-                logger.error(f"Stream name: {stream_info['name']}")
-            logger.error(f"Stream data: {json.dumps(stream_info)[:500]}")
-            invalid_streams.append((stream_index, f"Processing error: {str(e)}"))
-
-    # Log invalid stream summary
-    if invalid_streams:
-        logger.warning(f"Found {len(invalid_streams)} invalid streams in batch")
-        for i, (idx, error) in enumerate(invalid_streams[:5]):  # Log first 5
-            logger.warning(f"Invalid stream #{i+1} at index {idx}: {error}")
-        if len(invalid_streams) > 5:
-            logger.warning(f"... and {len(invalid_streams) - 5} more invalid streams")
+            logger.error(f"Failed to process stream {name}: {e}")
+            logger.error(json.dumps(stream_info))
 
     existing_streams = {s.stream_hash: s for s in Stream.objects.filter(stream_hash__in=stream_hashes.keys())}
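The batch loop now relies entirely on `Stream.generate_hash_key(name, url, tvg_id, hash_keys)` to de-duplicate rows before the bulk create/update; that model method is not part of this patch, so purely as a mental model, a configurable field hash might look like the hypothetical helper below (the field names and key list are assumptions, not the project's actual implementation):

```python
import hashlib

def generate_hash_key(name: str, url: str, tvg_id: str, hash_keys) -> str:
    # Hypothetical stand-in for Stream.generate_hash_key (not shown in this patch):
    # concatenate only the fields selected by hash_keys, then hash the result so
    # duplicate streams collapse to a single entry per batch.
    parts = {"name": name, "url": url, "tvg_id": tvg_id}
    material = "|".join(parts[key] for key in hash_keys if key in parts)
    return hashlib.md5(material.encode("utf-8")).hexdigest()

# Example: hashing on URL only means renamed duplicates of the same stream dedupe together.
print(generate_hash_key("News HD", "http://example.com/1.ts", "news.hd", ["url"]))
```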
if "name" in stream_info: - logger.error(f"Stream name: {stream_info['name']}") - logger.error(f"Stream data: {json.dumps(stream_info)[:500]}") - invalid_streams.append((stream_index, f"Processing error: {str(e)}")) - - # Log invalid stream summary - if invalid_streams: - logger.warning(f"Found {len(invalid_streams)} invalid streams in batch") - for i, (idx, error) in enumerate(invalid_streams[:5]): # Log first 5 - logger.warning(f"Invalid stream #{i+1} at index {idx}: {error}") - if len(invalid_streams) > 5: - logger.warning(f"... and {len(invalid_streams) - 5} more invalid streams") + logger.error(f"Failed to process stream {name}: {e}") + logger.error(json.dumps(stream_info)) existing_streams = {s.stream_hash: s for s in Stream.objects.filter(stream_hash__in=stream_hashes.keys())} @@ -554,9 +490,9 @@ def process_m3u_batch(account_id, batch, groups, hash_keys): retval = f"Batch processed: {len(streams_to_create)} created, {len(streams_to_update)} updated." # Aggressive garbage collection - del streams_to_create, streams_to_update, stream_hashes, existing_streams, stream_props, invalid_streams, changed_streams, unchanged_streams - from core.utils import cleanup_memory - cleanup_memory(log_usage=True, force_collection=True) + #del streams_to_create, streams_to_update, stream_hashes, existing_streams + #from core.utils import cleanup_memory + #cleanup_memory(log_usage=True, force_collection=True) return retval @@ -1193,8 +1129,6 @@ def refresh_single_m3u_account(account_id): return f"Dispatched jobs complete." -from core.utils import send_websocket_update - def send_m3u_update(account_id, action, progress, **kwargs): # Start with the base data dictionary data = { @@ -1215,51 +1149,13 @@ def send_m3u_update(account_id, action, progress, **kwargs): except: pass # If account can't be retrieved, continue without these fields - # Add the additional key-value pairs from kwargs with size limiting - for key, value in kwargs.items(): - # Handle large arrays - limit to summary data - if isinstance(value, (list, tuple)) and len(value) > 100: - data[key] = f"{len(value)} items (truncated for performance)" - # Handle very large integers - use abbreviations for streams - elif key in ['streams_processed', 'streams_created', 'streams_updated', 'streams_deleted'] and isinstance(value, int) and value > 10000: - # Format as "226K" instead of 226154 - data[key] = f"{value//1000}K" if value >= 1000 else value - # Handle other large values that might be serialized to JSON - elif isinstance(value, (dict, object)) and key not in ['status', 'action', 'progress']: - try: - # Use a safer approach for complex objects - if hasattr(value, '__dict__'): - # Just store the class name and id if available - data[key] = f"{value.__class__.__name__}" - if hasattr(value, 'id'): - data[key] += f"(id={value.id})" - else: - # For dictionaries, limit based on size - data[key] = value - except: - # If we can't serialize, skip this value - data[key] = f"[Object of type {type(value).__name__}]" - else: - # Default case - add the value as is - data[key] = value + # Add the additional key-value pairs from kwargs + data.update(kwargs) - # Protect against message size limits in WebSocket protocol - # Most implementations limit to ~1MB, we'll be conservative - try: - # Use the standardized function with memory management - # Enable garbage collection for certain operations - collect_garbage = action == "parsing" and progress % 25 == 0 - - # Add extra garbage collection for large stream operations - if any(key in kwargs for key in 
diff --git a/core/utils.py b/core/utils.py
index fcff03e5..039b0695 100644
--- a/core/utils.py
+++ b/core/utils.py
@@ -59,9 +59,16 @@ class RedisClient:
                 client.config_set('save', '')  # Disable RDB snapshots
                 client.config_set('appendonly', 'no')  # Disable AOF logging
 
-                # Set optimal memory settings
-                client.config_set('maxmemory-policy', 'allkeys-lru')  # Use LRU eviction
-                client.config_set('maxmemory', '256mb')  # Set reasonable memory limit
+                # Set optimal memory settings with environment variable support
+                # Get max memory from environment or use a larger default (512MB instead of 256MB)
+                #max_memory = os.environ.get('REDIS_MAX_MEMORY', '512mb')
+                #eviction_policy = os.environ.get('REDIS_EVICTION_POLICY', 'allkeys-lru')
+
+                # Apply memory settings
+                #client.config_set('maxmemory-policy', eviction_policy)
+                #client.config_set('maxmemory', max_memory)
+
+                #logger.info(f"Redis configured with maxmemory={max_memory}, policy={eviction_policy}")
 
                 # Disable protected mode when in debug mode
                 if os.environ.get('DISPATCHARR_DEBUG', '').lower() == 'true':
@@ -69,10 +76,18 @@ class RedisClient:
                     logger.warning("Redis protected mode disabled for debug environment")
 
                 logger.trace("Redis persistence disabled for better performance")
 
-            except redis.exceptions.ResponseError:
-                # This might fail if Redis is configured to prohibit CONFIG command
-                # or if running in protected mode - that's okay
-                logger.error("Could not modify Redis persistence settings (may be restricted)")
+            except redis.exceptions.ResponseError as e:
+                # Improve error handling for Redis configuration errors
+                if "OOM" in str(e):
+                    logger.error(f"Redis OOM during configuration: {e}")
+                    # Try to increase maxmemory as an emergency measure
+                    try:
+                        client.config_set('maxmemory', '768mb')
+                        logger.warning("Applied emergency Redis memory increase to 768MB")
+                    except:
+                        pass
+                else:
+                    logger.error(f"Redis configuration error: {e}")
 
             logger.info(f"Connected to Redis at {redis_host}:{redis_port}/{redis_db}")
diff --git a/dispatcharr/celery.py b/dispatcharr/celery.py
index 855acacd..8856d330 100644
--- a/dispatcharr/celery.py
+++ b/dispatcharr/celery.py
@@ -50,7 +50,7 @@ app.conf.update(
 )
 
 # Add memory cleanup after task completion
-@task_postrun.connect  # Use the imported signal
+#@task_postrun.connect  # Use the imported signal
 def cleanup_task_memory(**kwargs):
     """Clean up memory after each task completes"""
     # Get task name from kwargs
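For context on the decorator this patch comments out: Celery's `task_postrun` signal fires after every task completes, so re-enabling the cleanup later is just a matter of restoring the hookup. A minimal sketch of that pattern follows, using a generic `gc.collect()` body since the real `cleanup_task_memory` internals are outside this hunk:

```python
import gc
import logging

from celery.signals import task_postrun

logger = logging.getLogger(__name__)

@task_postrun.connect  # restoring a decorator like this re-enables the per-task cleanup
def cleanup_task_memory(sender=None, task_id=None, task=None, **kwargs):
    """Run a garbage-collection pass after each Celery task completes."""
    collected = gc.collect()
    logger.debug(f"Post-task cleanup for {getattr(sender, 'name', sender)}: "
                 f"collected {collected} objects")
```

With the decorator commented out, the function remains defined but is never invoked, which matches the intent of rolling back the per-task collection overhead.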