forked from Mirrors/Dispatcharr
Rolled back some earlier memory optimizations that were causing issues with extremely large M3Us.
This commit is contained in:
parent 925850a012
commit f87ab4b071
3 changed files with 45 additions and 134 deletions
@@ -22,11 +22,11 @@ from core.utils import RedisClient, acquire_task_lock, release_task_lock
from core.models import CoreSettings, UserAgent
from asgiref.sync import async_to_sync
from core.xtream_codes import Client as XCClient
from core.utils import send_websocket_update

logger = logging.getLogger(__name__)

BATCH_SIZE = 1000
SKIP_EXTS = {}
m3u_dir = os.path.join(settings.MEDIA_ROOT, "cached_m3u")

def fetch_m3u_lines(account, use_cache=False):
@@ -184,48 +184,22 @@ def parse_extinf_line(line: str) -> dict:
    - 'name': the value from tvg-name (if present) or the display name otherwise.
    """
    if not line.startswith("#EXTINF:"):
        logger.debug(f"Not an EXTINF line: {line[:50]}...")
        return None

    content = line[len("#EXTINF:"):].strip()
    logger.debug(f"Parsing EXTINF content: {content[:100]}...")

    # Split on the first comma that is not inside quotes.
    parts = re.split(r',(?=(?:[^"]*"[^"]*")*[^"]*$)', content, maxsplit=1)
    if len(parts) != 2:
        logger.warning(f"Invalid EXTINF format - couldn't split at comma: {content[:100]}...")
        return None

    attributes_part, display_name = parts[0], parts[1].strip()

    # Debug raw attribute parsing
    logger.debug(f"Attribute part: {attributes_part[:100]}...")
    logger.debug(f"Display name: {display_name[:100]}...")

    # Extract attributes with more detailed logging
    try:
        attr_matches = re.findall(r'([^\s]+)=["\']([^"\']+)["\']', attributes_part)
        if not attr_matches:
            logger.warning(f"No attributes found in: {attributes_part[:100]}...")

        attrs = dict(attr_matches)
        logger.debug(f"Extracted attributes: {attrs}")
    except Exception as e:
        logger.error(f"Error parsing attributes: {str(e)}", exc_info=True)
        attrs = {}

    attrs = dict(re.findall(r'([^\s]+)=["\']([^"\']+)["\']', attributes_part))
    # Use tvg-name attribute if available; otherwise, use the display name.
    name = attrs.get('tvg-name', display_name)

    result = {
    return {
        'attributes': attrs,
        'display_name': display_name,
        'name': name
    }

    logger.debug(f"EXTINF parsed result: {result}")
    return result


def _matches_filters(stream_name: str, group_name: str, filters):
    """Check if a stream or group name matches a precompiled regex filter."""
    compiled_filters = [(re.compile(f.regex_pattern, re.IGNORECASE), f.exclude) for f in filters]
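
For reference, a minimal standalone sketch (not part of the commit) showing how the two regexes in parse_extinf_line behave; the sample channel line below is invented for illustration:

import re

line = '#EXTINF:-1 tvg-id="abc" tvg-name="Channel, One" group-title="News",Channel One'
content = line[len("#EXTINF:"):].strip()

# Split on the first comma that is not inside double quotes.
parts = re.split(r',(?=(?:[^"]*"[^"]*")*[^"]*$)', content, maxsplit=1)
attributes_part, display_name = parts[0], parts[1].strip()

# Attribute pairs look like key="value" or key='value'.
attrs = dict(re.findall(r'([^\s]+)=["\']([^"\']+)["\']', attributes_part))
print(attrs)                                # {'tvg-id': 'abc', 'tvg-name': 'Channel, One', 'group-title': 'News'}
print(display_name)                         # Channel One
print(attrs.get('tvg-name', display_name))  # Channel, One

Note how the quoted comma in tvg-name survives the split: the lookahead only allows a split where an even number of quotes remains to the right of the comma.
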
@@ -266,7 +240,7 @@ def process_groups(account, groups):
    groups_to_create = []
    for group_name, custom_props in groups.items():
        logger.debug(f"Handling group: {group_name}")
        if (group_name not in existing_groups) and (group_name not in SKIP_EXTS):
        if (group_name not in existing_groups):
            groups_to_create.append(ChannelGroup(
                name=group_name,
            ))
@@ -426,51 +400,24 @@ def process_m3u_batch(account_id, batch, groups, hash_keys):
    """Processes a batch of M3U streams using bulk operations."""
    account = M3UAccount.objects.get(id=account_id)

    logger.debug(f"Processing batch of {len(batch)} streams for account {account_id}")

    streams_to_create = []
    streams_to_update = []
    stream_hashes = {}
    invalid_streams = []
    # Initialize these variables to prevent UnboundLocalError during cleanup
    changed_streams = []
    unchanged_streams = []

    # Log hash key configuration
    logger.debug(f"Using hash keys: {hash_keys}")

    # compiled_filters = [(f.filter_type, re.compile(f.regex_pattern, re.IGNORECASE)) for f in filters]
    logger.debug(f"Processing batch of {len(batch)}")
    for stream_index, stream_info in enumerate(batch):
    for stream_info in batch:
        try:
            # Extract basic stream info with better error handling
            try:
                name = stream_info["name"]
                url = stream_info["url"]
                attrs = stream_info["attributes"]
                tvg_id = attrs.get("tvg-id", "")
                tvg_logo = attrs.get("tvg-logo", "")
                group_title = attrs.get("group-title", "Default Group")
            except KeyError as e:
                logger.warning(f"Missing required field in stream {stream_index}: {e}")
                logger.debug(f"Stream data: {stream_info}")
                invalid_streams.append((stream_index, f"Missing field: {e}"))
                continue
            name, url = stream_info["name"], stream_info["url"]
            tvg_id, tvg_logo = stream_info["attributes"].get("tvg-id", ""), stream_info["attributes"].get("tvg-logo", "")
            group_title = stream_info["attributes"].get("group-title", "Default Group")

            # Filter out disabled groups for this account
            if group_title not in groups:
                logger.debug(f"Skipping stream in disabled group: '{group_title}', name: '{name}'")
                continue

            # Generate hash key with error handling
            try:
                stream_hash = Stream.generate_hash_key(name, url, tvg_id, hash_keys)
                logger.debug(f"Generated hash {stream_hash} for stream '{name}'")
            except Exception as e:
                logger.error(f"Error generating hash for stream '{name}': {str(e)}")
                invalid_streams.append((stream_index, f"Hash generation error: {str(e)}"))
                logger.debug(f"Skipping stream in disabled group: {group_title}")
                continue

            stream_hash = Stream.generate_hash_key(name, url, tvg_id, hash_keys)
            stream_props = {
                "name": name,
                "url": url,
@@ -479,25 +426,14 @@ def process_m3u_batch(account_id, batch, groups, hash_keys):
                "m3u_account": account,
                "channel_group_id": int(groups.get(group_title)),
                "stream_hash": stream_hash,
                "custom_properties": json.dumps(attrs),
                "custom_properties": json.dumps(stream_info["attributes"]),
            }

            if stream_hash not in stream_hashes:
                stream_hashes[stream_hash] = stream_props
        except Exception as e:
            logger.error(f"Failed to process stream at index {stream_index}: {e}", exc_info=True)
            if "name" in stream_info:
                logger.error(f"Stream name: {stream_info['name']}")
            logger.error(f"Stream data: {json.dumps(stream_info)[:500]}")
            invalid_streams.append((stream_index, f"Processing error: {str(e)}"))

    # Log invalid stream summary
    if invalid_streams:
        logger.warning(f"Found {len(invalid_streams)} invalid streams in batch")
        for i, (idx, error) in enumerate(invalid_streams[:5]):  # Log first 5
            logger.warning(f"Invalid stream #{i+1} at index {idx}: {error}")
        if len(invalid_streams) > 5:
            logger.warning(f"... and {len(invalid_streams) - 5} more invalid streams")
            logger.error(f"Failed to process stream {name}: {e}")
            logger.error(json.dumps(stream_info))

    existing_streams = {s.stream_hash: s for s in Stream.objects.filter(stream_hash__in=stream_hashes.keys())}
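
The existing_streams lookup above feeds the create-versus-update split that happens further down in process_m3u_batch (not shown in this hunk). A rough, hedged sketch of that pattern, with plain dicts standing in for the Django model rows; the function and variable names here are illustrative, not the project's API:

def split_batch(stream_hashes, existing_streams):
    # stream_hashes: {hash: new_props}; existing_streams: {hash: current_props}
    to_create, to_update = [], []
    for stream_hash, props in stream_hashes.items():
        current = existing_streams.get(stream_hash)
        if current is None:
            to_create.append(props)      # hash never seen before -> bulk_create candidate
        elif current != props:
            to_update.append(props)      # same hash but changed fields -> bulk_update candidate
        # identical rows are left untouched
    return to_create, to_update
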
@@ -554,9 +490,9 @@ def process_m3u_batch(account_id, batch, groups, hash_keys):
    retval = f"Batch processed: {len(streams_to_create)} created, {len(streams_to_update)} updated."

    # Aggressive garbage collection
    del streams_to_create, streams_to_update, stream_hashes, existing_streams, stream_props, invalid_streams, changed_streams, unchanged_streams
    from core.utils import cleanup_memory
    cleanup_memory(log_usage=True, force_collection=True)
    #del streams_to_create, streams_to_update, stream_hashes, existing_streams
    #from core.utils import cleanup_memory
    #cleanup_memory(log_usage=True, force_collection=True)

    return retval
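
The rolled-back block above called core.utils.cleanup_memory after every batch. Its implementation is not shown in this diff; the following is only a hypothetical stand-in illustrating what such a helper usually does (an explicit gc pass plus optional usage logging):

import gc
import logging
import resource  # POSIX-only; used here just to report peak RSS

logger = logging.getLogger(__name__)

def cleanup_memory(log_usage=False, force_collection=False):
    # Hypothetical sketch, not the project's actual core.utils.cleanup_memory.
    if force_collection:
        collected = gc.collect()
        logger.debug(f"gc.collect() freed {collected} objects")
    if log_usage:
        # ru_maxrss is reported in kilobytes on Linux (bytes on macOS).
        peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        logger.debug(f"Peak RSS so far: {peak}")
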
@@ -1193,8 +1129,6 @@ def refresh_single_m3u_account(account_id):

    return f"Dispatched jobs complete."

from core.utils import send_websocket_update

def send_m3u_update(account_id, action, progress, **kwargs):
    # Start with the base data dictionary
    data = {
@@ -1215,51 +1149,13 @@ def send_m3u_update(account_id, action, progress, **kwargs):
    except:
        pass  # If account can't be retrieved, continue without these fields

    # Add the additional key-value pairs from kwargs with size limiting
    for key, value in kwargs.items():
        # Handle large arrays - limit to summary data
        if isinstance(value, (list, tuple)) and len(value) > 100:
            data[key] = f"{len(value)} items (truncated for performance)"
        # Handle very large integers - use abbreviations for streams
        elif key in ['streams_processed', 'streams_created', 'streams_updated', 'streams_deleted'] and isinstance(value, int) and value > 10000:
            # Format as "226K" instead of 226154
            data[key] = f"{value//1000}K" if value >= 1000 else value
        # Handle other large values that might be serialized to JSON
        elif isinstance(value, (dict, object)) and key not in ['status', 'action', 'progress']:
            try:
                # Use a safer approach for complex objects
                if hasattr(value, '__dict__'):
                    # Just store the class name and id if available
                    data[key] = f"{value.__class__.__name__}"
                    if hasattr(value, 'id'):
                        data[key] += f"(id={value.id})"
                else:
                    # For dictionaries, limit based on size
                    data[key] = value
            except:
                # If we can't serialize, skip this value
                data[key] = f"[Object of type {type(value).__name__}]"
        else:
            # Default case - add the value as is
            data[key] = value
    # Add the additional key-value pairs from kwargs
    data.update(kwargs)

    # Protect against message size limits in WebSocket protocol
    # Most implementations limit to ~1MB, we'll be conservative
    try:
        # Use the standardized function with memory management
        # Enable garbage collection for certain operations
        collect_garbage = action == "parsing" and progress % 25 == 0

        # Add extra garbage collection for large stream operations
        if any(key in kwargs for key in ['streams_processed', 'streams_created', 'streams_updated']) and \
           any(isinstance(kwargs.get(key), int) and kwargs.get(key, 0) > 10000
               for key in ['streams_processed', 'streams_created', 'streams_updated']):
            collect_garbage = True

        send_websocket_update('updates', 'update', data, collect_garbage=collect_garbage)
    except Exception as e:
        # Log the error but don't crash the process
        logger.warning(f"Error sending WebSocket update: {e}")
    # Use the standardized function with memory management
    # Enable garbage collection for certain operations
    collect_garbage = action == "parsing" and progress % 25 == 0
    send_websocket_update('updates', 'update', data, collect_garbage=collect_garbage)

    # Explicitly clear data reference to help garbage collection
    data = None
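
The size-limiting loop removed above abbreviated large stream counters before they were pushed over the WebSocket, while the restored code sends kwargs through unchanged. A quick standalone comparison (the payload values are invented for illustration):

kwargs = {"streams_processed": 226154, "status": "parsing"}

# Restored behaviour: values pass through untouched.
plain = dict(kwargs)

# Removed behaviour: large counters were abbreviated, e.g. 226154 -> "226K".
big_keys = ['streams_processed', 'streams_created', 'streams_updated', 'streams_deleted']
limited = {
    key: (f"{value//1000}K" if key in big_keys and isinstance(value, int) and value > 10000 else value)
    for key, value in kwargs.items()
}

print(plain)    # {'streams_processed': 226154, 'status': 'parsing'}
print(limited)  # {'streams_processed': '226K', 'status': 'parsing'}
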
@@ -59,9 +59,16 @@ class RedisClient:
            client.config_set('save', '')  # Disable RDB snapshots
            client.config_set('appendonly', 'no')  # Disable AOF logging

            # Set optimal memory settings
            client.config_set('maxmemory-policy', 'allkeys-lru')  # Use LRU eviction
            client.config_set('maxmemory', '256mb')  # Set reasonable memory limit
            # Set optimal memory settings with environment variable support
            # Get max memory from environment or use a larger default (512MB instead of 256MB)
            #max_memory = os.environ.get('REDIS_MAX_MEMORY', '512mb')
            #eviction_policy = os.environ.get('REDIS_EVICTION_POLICY', 'allkeys-lru')

            # Apply memory settings
            #client.config_set('maxmemory-policy', eviction_policy)
            #client.config_set('maxmemory', max_memory)

            #logger.info(f"Redis configured with maxmemory={max_memory}, policy={eviction_policy}")

            # Disable protected mode when in debug mode
            if os.environ.get('DISPATCHARR_DEBUG', '').lower() == 'true':
@@ -69,10 +76,18 @@ class RedisClient:
                logger.warning("Redis protected mode disabled for debug environment")

            logger.trace("Redis persistence disabled for better performance")
        except redis.exceptions.ResponseError:
            # This might fail if Redis is configured to prohibit CONFIG command
            # or if running in protected mode - that's okay
            logger.error("Could not modify Redis persistence settings (may be restricted)")
        except redis.exceptions.ResponseError as e:
            # Improve error handling for Redis configuration errors
            if "OOM" in str(e):
                logger.error(f"Redis OOM during configuration: {e}")
                # Try to increase maxmemory as an emergency measure
                try:
                    client.config_set('maxmemory', '768mb')
                    logger.warning("Applied emergency Redis memory increase to 768MB")
                except:
                    pass
            else:
                logger.error(f"Redis configuration error: {e}")

        logger.info(f"Connected to Redis at {redis_host}:{redis_port}/{redis_db}")
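
The commented-out block above sketches environment-driven Redis memory settings in place of the hard-coded 256mb limit. A minimal standalone version of that idea using redis-py; the connection details and defaults here are placeholders, not the project's configuration:

import os
import redis

client = redis.Redis(host=os.environ.get("REDIS_HOST", "localhost"), port=6379, db=0)

max_memory = os.environ.get("REDIS_MAX_MEMORY", "512mb")
eviction_policy = os.environ.get("REDIS_EVICTION_POLICY", "allkeys-lru")

try:
    client.config_set("maxmemory-policy", eviction_policy)
    client.config_set("maxmemory", max_memory)
except redis.exceptions.ResponseError:
    # CONFIG SET may be disabled (e.g. on managed Redis); rely on server-side settings instead.
    pass
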
@@ -50,7 +50,7 @@ app.conf.update(
)

# Add memory cleanup after task completion
@task_postrun.connect  # Use the imported signal
#@task_postrun.connect  # Use the imported signal
def cleanup_task_memory(**kwargs):
    """Clean up memory after each task completes"""
    # Get task name from kwargs
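
The now-commented-out decorator above detaches cleanup_task_memory from Celery's task_postrun signal. For context, the general shape of such a hook is sketched below; the gc-based body is an assumption for illustration, not the project's actual handler:

import gc
import logging

from celery.signals import task_postrun

logger = logging.getLogger(__name__)

@task_postrun.connect
def cleanup_task_memory(sender=None, task_id=None, task=None, **kwargs):
    # Run a garbage-collection pass after every task finishes.
    collected = gc.collect()
    logger.debug(f"task_postrun cleanup after {getattr(task, 'name', sender)}: collected {collected} objects")
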