From 6de565857db0deaff8649312b18b0186369f0146 Mon Sep 17 00:00:00 2001
From: SergeantPanda
Date: Wed, 21 May 2025 14:59:24 -0500
Subject: [PATCH 01/11] Set default refresh interval for files added via
 mapping to 0 since they will auto-update when modified.

---
 .../0013_alter_epgsource_refresh_interval.py  | 18 ++++++++++++++++++
 apps/epg/models.py                            |  2 +-
 .../0012_alter_m3uaccount_refresh_interval.py | 18 ++++++++++++++++++
 apps/m3u/models.py                            |  2 +-
 4 files changed, 38 insertions(+), 2 deletions(-)
 create mode 100644 apps/epg/migrations/0013_alter_epgsource_refresh_interval.py
 create mode 100644 apps/m3u/migrations/0012_alter_m3uaccount_refresh_interval.py

diff --git a/apps/epg/migrations/0013_alter_epgsource_refresh_interval.py b/apps/epg/migrations/0013_alter_epgsource_refresh_interval.py
new file mode 100644
index 00000000..64be2c3c
--- /dev/null
+++ b/apps/epg/migrations/0013_alter_epgsource_refresh_interval.py
@@ -0,0 +1,18 @@
+# Generated by Django 5.1.6 on 2025-05-21 19:58
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('epg', '0012_alter_epgsource_status'),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name='epgsource',
+            name='refresh_interval',
+            field=models.IntegerField(default=0),
+        ),
+    ]
diff --git a/apps/epg/models.py b/apps/epg/models.py
index ed8f2708..dce4e21b 100644
--- a/apps/epg/models.py
+++ b/apps/epg/models.py
@@ -32,7 +32,7 @@ class EPGSource(models.Model):
     api_key = models.CharField(max_length=255, blank=True, null=True)  # For Schedules Direct
     is_active = models.BooleanField(default=True)
     file_path = models.CharField(max_length=1024, blank=True, null=True)
-    refresh_interval = models.IntegerField(default=24)
+    refresh_interval = models.IntegerField(default=0)
     refresh_task = models.ForeignKey(
         PeriodicTask, on_delete=models.SET_NULL, null=True, blank=True
     )
diff --git a/apps/m3u/migrations/0012_alter_m3uaccount_refresh_interval.py b/apps/m3u/migrations/0012_alter_m3uaccount_refresh_interval.py
new file mode 100644
index 00000000..7045810e
--- /dev/null
+++ b/apps/m3u/migrations/0012_alter_m3uaccount_refresh_interval.py
@@ -0,0 +1,18 @@
+# Generated by Django 5.1.6 on 2025-05-21 19:58
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('m3u', '0011_alter_m3uaccount_status'),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name='m3uaccount',
+            name='refresh_interval',
+            field=models.IntegerField(default=0),
+        ),
+    ]
diff --git a/apps/m3u/models.py b/apps/m3u/models.py
index 503ac3da..a297fd18 100644
--- a/apps/m3u/models.py
+++ b/apps/m3u/models.py
@@ -96,7 +96,7 @@ class M3UAccount(models.Model):
     username = models.CharField(max_length=255, null=True, blank=True)
     password = models.CharField(max_length=255, null=True, blank=True)
     custom_properties = models.TextField(null=True, blank=True)
-    refresh_interval = models.IntegerField(default=24)
+    refresh_interval = models.IntegerField(default=0)
     refresh_task = models.ForeignKey(
         PeriodicTask, on_delete=models.SET_NULL, null=True, blank=True
     )

From e816fa6afd5243578a4847d599c7ac5fc15dd98d Mon Sep 17 00:00:00 2001
From: SergeantPanda
Date: Wed, 21 May 2025 15:53:52 -0500
Subject: [PATCH 02/11] Update epg form to be a closer match to m3u form.

--- frontend/src/components/forms/EPG.jsx | 202 +++++++++++++++----------- 1 file changed, 121 insertions(+), 81 deletions(-) diff --git a/frontend/src/components/forms/EPG.jsx b/frontend/src/components/forms/EPG.jsx index b886dccf..0c7f78c0 100644 --- a/frontend/src/components/forms/EPG.jsx +++ b/frontend/src/components/forms/EPG.jsx @@ -12,19 +12,22 @@ import { NativeSelect, NumberInput, Space, + Grid, + Group, + FileInput, + Title, + Text, + Divider, + Stack, + Box, } from '@mantine/core'; import { isNotEmpty, useForm } from '@mantine/form'; +import { IconUpload } from '@tabler/icons-react'; const EPG = ({ epg = null, isOpen, onClose }) => { const epgs = useEPGsStore((state) => state.epgs); - const [file, setFile] = useState(null); - - const handleFileChange = (e) => { - const file = e.target.files[0]; - if (file) { - setFile(file); - } - }; + // Remove the file state and handler since we're not supporting file uploads + const [sourceType, setSourceType] = useState('xmltv'); const form = useForm({ mode: 'uncontrolled', @@ -47,114 +50,151 @@ const EPG = ({ epg = null, isOpen, onClose }) => { const values = form.getValues(); if (epg?.id) { - await API.updateEPG({ id: epg.id, ...values, file }); + // Remove file from API call + await API.updateEPG({ id: epg.id, ...values }); } else { + // Remove file from API call await API.addEPG({ ...values, - file, }); } form.reset(); - setFile(null); onClose(); }; useEffect(() => { if (epg) { - form.setValues({ + const values = { name: epg.name, source_type: epg.source_type, url: epg.url, api_key: epg.api_key, is_active: epg.is_active, refresh_interval: epg.refresh_interval, - }); + }; + form.setValues(values); + setSourceType(epg.source_type); // Update source type state } else { form.reset(); + setSourceType('xmltv'); // Reset to xmltv } }, [epg]); + // Function to handle source type changes + const handleSourceTypeChange = (value) => { + form.setFieldValue('source_type', value); + setSourceType(value); + }; + if (!isOpen) { return <>; } return ( - +
- + + {/* Left Column */} + + - + handleSourceTypeChange(event.currentTarget.value)} + /> - + + - + - How often to automatically refresh EPG data
- (0 to disable automatic refreshes)} - {...form.getInputProps('refresh_interval')} - key={form.key('refresh_interval')} - /> + {/* Right Column */} + + - + - - - + {/* Put checkbox at the same level as Refresh Interval */} + + Status + When enabled, this EPG source will auto update. + + + + + +
+ + {/* Full Width Section */} + + + + + + + +
); From 0fcab93ac36b1e7ea29fd714563c6b797ee787a7 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Wed, 21 May 2025 16:28:15 -0500 Subject: [PATCH 03/11] Enhance WebSocket update handling by limiting data size and improving garbage collection for large operations --- apps/m3u/tasks.py | 50 +++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py index 4a1f2645..fb7969c6 100644 --- a/apps/m3u/tasks.py +++ b/apps/m3u/tasks.py @@ -1112,13 +1112,51 @@ def send_m3u_update(account_id, action, progress, **kwargs): except: pass # If account can't be retrieved, continue without these fields - # Add the additional key-value pairs from kwargs - data.update(kwargs) + # Add the additional key-value pairs from kwargs with size limiting + for key, value in kwargs.items(): + # Handle large arrays - limit to summary data + if isinstance(value, (list, tuple)) and len(value) > 100: + data[key] = f"{len(value)} items (truncated for performance)" + # Handle very large integers - use abbreviations for streams + elif key in ['streams_processed', 'streams_created', 'streams_updated', 'streams_deleted'] and isinstance(value, int) and value > 10000: + # Format as "226K" instead of 226154 + data[key] = f"{value//1000}K" if value >= 1000 else value + # Handle other large values that might be serialized to JSON + elif isinstance(value, (dict, object)) and key not in ['status', 'action', 'progress']: + try: + # Use a safer approach for complex objects + if hasattr(value, '__dict__'): + # Just store the class name and id if available + data[key] = f"{value.__class__.__name__}" + if hasattr(value, 'id'): + data[key] += f"(id={value.id})" + else: + # For dictionaries, limit based on size + data[key] = value + except: + # If we can't serialize, skip this value + data[key] = f"[Object of type {type(value).__name__}]" + else: + # Default case - add the value as is + data[key] = value - # Use the standardized function with memory management - # Enable garbage collection for certain operations - collect_garbage = action == "parsing" and progress % 25 == 0 - send_websocket_update('updates', 'update', data, collect_garbage=collect_garbage) + # Protect against message size limits in WebSocket protocol + # Most implementations limit to ~1MB, we'll be conservative + try: + # Use the standardized function with memory management + # Enable garbage collection for certain operations + collect_garbage = action == "parsing" and progress % 25 == 0 + + # Add extra garbage collection for large stream operations + if any(key in kwargs for key in ['streams_processed', 'streams_created', 'streams_updated']) and \ + any(isinstance(kwargs.get(key), int) and kwargs.get(key, 0) > 10000 + for key in ['streams_processed', 'streams_created', 'streams_updated']): + collect_garbage = True + + send_websocket_update('updates', 'update', data, collect_garbage=collect_garbage) + except Exception as e: + # Log the error but don't crash the process + logger.warning(f"Error sending WebSocket update: {e}") # Explicitly clear data reference to help garbage collection data = None From 57298eb81140d30eb5bd26daf0b42a147da14175 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Wed, 21 May 2025 16:47:00 -0500 Subject: [PATCH 04/11] Added some padding for parsing status messaging. 
--- frontend/src/components/tables/M3UsTable.jsx | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/frontend/src/components/tables/M3UsTable.jsx b/frontend/src/components/tables/M3UsTable.jsx index 0c5b2b4d..cd30fbaf 100644 --- a/frontend/src/components/tables/M3UsTable.jsx +++ b/frontend/src/components/tables/M3UsTable.jsx @@ -210,24 +210,24 @@ const M3UTable = () => { - Parsing: + Parsing: {parseInt(data.progress)}% {data.elapsed_time && ( - Elapsed: + Elapsed: {elapsedTime} )} {data.time_remaining && ( - Remaining: + Remaining: {timeRemaining} )} {data.streams_processed && ( - Streams: + Streams: {data.streams_processed} )} From 448f9bc6cf5641e775d81baccf32860fa412a1c9 Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Thu, 22 May 2025 09:46:34 -0500 Subject: [PATCH 05/11] Decreased line height for status messages to look better on smaller screens. --- frontend/src/components/tables/M3UsTable.jsx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/frontend/src/components/tables/M3UsTable.jsx b/frontend/src/components/tables/M3UsTable.jsx index cd30fbaf..261e3984 100644 --- a/frontend/src/components/tables/M3UsTable.jsx +++ b/frontend/src/components/tables/M3UsTable.jsx @@ -424,7 +424,7 @@ const M3UTable = () => { if (data.status === 'success') { return ( - + {value} @@ -434,7 +434,7 @@ const M3UTable = () => { // For all other status values, just use dimmed text return ( - + {value} From 48e76273d1a22a81ffde49b813c0a76fb6c1634d Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Thu, 22 May 2025 10:23:59 -0500 Subject: [PATCH 06/11] Attempt at fixing timezone issues. --- apps/epg/tasks.py | 45 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 10 deletions(-) diff --git a/apps/epg/tasks.py b/apps/epg/tasks.py index 71e468bd..f3f281e7 100644 --- a/apps/epg/tasks.py +++ b/apps/epg/tasks.py @@ -1445,17 +1445,42 @@ def fetch_schedules_direct(source): # ------------------------------- def parse_xmltv_time(time_str): try: + # Basic format validation + if len(time_str) < 14: + logger.warning(f"XMLTV timestamp too short: '{time_str}', using as-is") + dt_obj = datetime.strptime(time_str, '%Y%m%d%H%M%S') + return timezone.make_aware(dt_obj, timezone=dt_timezone.utc) + + # Parse base datetime dt_obj = datetime.strptime(time_str[:14], '%Y%m%d%H%M%S') - tz_sign = time_str[15] - tz_hours = int(time_str[16:18]) - tz_minutes = int(time_str[18:20]) - if tz_sign == '+': - dt_obj = dt_obj - timedelta(hours=tz_hours, minutes=tz_minutes) - elif tz_sign == '-': - dt_obj = dt_obj + timedelta(hours=tz_hours, minutes=tz_minutes) - aware_dt = timezone.make_aware(dt_obj, timezone=dt_timezone.utc) - logger.trace(f"Parsed XMLTV time '{time_str}' to {aware_dt}") - return aware_dt + + # Handle timezone if present + if len(time_str) >= 20: # Has timezone info + tz_sign = time_str[15] + tz_hours = int(time_str[16:18]) + tz_minutes = int(time_str[18:20]) + + # Create a timezone object + if tz_sign == '+': + tz_offset = dt_timezone(timedelta(hours=tz_hours, minutes=tz_minutes)) + elif tz_sign == '-': + tz_offset = dt_timezone(timedelta(hours=-tz_hours, minutes=-tz_minutes)) + else: + tz_offset = dt_timezone.utc + + # Make datetime aware with correct timezone + aware_dt = datetime.replace(dt_obj, tzinfo=tz_offset) + # Convert to UTC + aware_dt = aware_dt.astimezone(dt_timezone.utc) + + logger.trace(f"Parsed XMLTV time '{time_str}' to {aware_dt}") + return aware_dt + else: + # No timezone info, assume UTC + aware_dt = timezone.make_aware(dt_obj, 
timezone=dt_timezone.utc) + logger.trace(f"Parsed XMLTV time without timezone '{time_str}' as UTC: {aware_dt}") + return aware_dt + except Exception as e: logger.error(f"Error parsing XMLTV time '{time_str}': {e}", exc_info=True) raise From d01a69828a20dad53a2d21854405ac0b8bd93c2c Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Thu, 22 May 2025 11:58:44 -0500 Subject: [PATCH 07/11] Added additional logging for m3u processing --- apps/m3u/tasks.py | 130 +++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 117 insertions(+), 13 deletions(-) diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py index fb7969c6..39622024 100644 --- a/apps/m3u/tasks.py +++ b/apps/m3u/tasks.py @@ -184,22 +184,48 @@ def parse_extinf_line(line: str) -> dict: - 'name': the value from tvg-name (if present) or the display name otherwise. """ if not line.startswith("#EXTINF:"): + logger.debug(f"Not an EXTINF line: {line[:50]}...") return None + content = line[len("#EXTINF:"):].strip() + logger.debug(f"Parsing EXTINF content: {content[:100]}...") + # Split on the first comma that is not inside quotes. parts = re.split(r',(?=(?:[^"]*"[^"]*")*[^"]*$)', content, maxsplit=1) if len(parts) != 2: + logger.warning(f"Invalid EXTINF format - couldn't split at comma: {content[:100]}...") return None + attributes_part, display_name = parts[0], parts[1].strip() - attrs = dict(re.findall(r'([^\s]+)=["\']([^"\']+)["\']', attributes_part)) + + # Debug raw attribute parsing + logger.debug(f"Attribute part: {attributes_part[:100]}...") + logger.debug(f"Display name: {display_name[:100]}...") + + # Extract attributes with more detailed logging + try: + attr_matches = re.findall(r'([^\s]+)=["\']([^"\']+)["\']', attributes_part) + if not attr_matches: + logger.warning(f"No attributes found in: {attributes_part[:100]}...") + + attrs = dict(attr_matches) + logger.debug(f"Extracted attributes: {attrs}") + except Exception as e: + logger.error(f"Error parsing attributes: {str(e)}", exc_info=True) + attrs = {} + # Use tvg-name attribute if available; otherwise, use the display name. 
name = attrs.get('tvg-name', display_name) - return { + + result = { 'attributes': attrs, 'display_name': display_name, 'name': name } + logger.debug(f"EXTINF parsed result: {result}") + return result + import re import logging @@ -405,24 +431,48 @@ def process_m3u_batch(account_id, batch, groups, hash_keys): """Processes a batch of M3U streams using bulk operations.""" account = M3UAccount.objects.get(id=account_id) + logger.debug(f"Processing batch of {len(batch)} streams for account {account_id}") + streams_to_create = [] streams_to_update = [] stream_hashes = {} + invalid_streams = [] + + # Log hash key configuration + logger.debug(f"Using hash keys: {hash_keys}") # compiled_filters = [(f.filter_type, re.compile(f.regex_pattern, re.IGNORECASE)) for f in filters] logger.debug(f"Processing batch of {len(batch)}") - for stream_info in batch: + for stream_index, stream_info in enumerate(batch): try: - name, url = stream_info["name"], stream_info["url"] - tvg_id, tvg_logo = stream_info["attributes"].get("tvg-id", ""), stream_info["attributes"].get("tvg-logo", "") - group_title = stream_info["attributes"].get("group-title", "Default Group") + # Extract basic stream info with better error handling + try: + name = stream_info["name"] + url = stream_info["url"] + attrs = stream_info["attributes"] + tvg_id = attrs.get("tvg-id", "") + tvg_logo = attrs.get("tvg-logo", "") + group_title = attrs.get("group-title", "Default Group") + except KeyError as e: + logger.warning(f"Missing required field in stream {stream_index}: {e}") + logger.debug(f"Stream data: {stream_info}") + invalid_streams.append((stream_index, f"Missing field: {e}")) + continue # Filter out disabled groups for this account if group_title not in groups: - logger.debug(f"Skipping stream in disabled group: {group_title}") + logger.debug(f"Skipping stream in disabled group: '{group_title}', name: '{name}'") + continue + + # Generate hash key with error handling + try: + stream_hash = Stream.generate_hash_key(name, url, tvg_id, hash_keys) + logger.debug(f"Generated hash {stream_hash} for stream '{name}'") + except Exception as e: + logger.error(f"Error generating hash for stream '{name}': {str(e)}") + invalid_streams.append((stream_index, f"Hash generation error: {str(e)}")) continue - stream_hash = Stream.generate_hash_key(name, url, tvg_id, hash_keys) stream_props = { "name": name, "url": url, @@ -431,14 +481,25 @@ def process_m3u_batch(account_id, batch, groups, hash_keys): "m3u_account": account, "channel_group_id": int(groups.get(group_title)), "stream_hash": stream_hash, - "custom_properties": json.dumps(stream_info["attributes"]), + "custom_properties": json.dumps(attrs), } if stream_hash not in stream_hashes: stream_hashes[stream_hash] = stream_props except Exception as e: - logger.error(f"Failed to process stream {name}: {e}") - logger.error(json.dumps(stream_info)) + logger.error(f"Failed to process stream at index {stream_index}: {e}", exc_info=True) + if "name" in stream_info: + logger.error(f"Stream name: {stream_info['name']}") + logger.error(f"Stream data: {json.dumps(stream_info)[:500]}") + invalid_streams.append((stream_index, f"Processing error: {str(e)}")) + + # Log invalid stream summary + if invalid_streams: + logger.warning(f"Found {len(invalid_streams)} invalid streams in batch") + for i, (idx, error) in enumerate(invalid_streams[:5]): # Log first 5 + logger.warning(f"Invalid stream #{i+1} at index {idx}: {error}") + if len(invalid_streams) > 5: + logger.warning(f"... 
and {len(invalid_streams) - 5} more invalid streams") existing_streams = {s.stream_hash: s for s in Stream.objects.filter(stream_hash__in=stream_hashes.keys())} @@ -696,25 +757,68 @@ def refresh_m3u_groups(account_id, use_cache=False, full_refresh=False): release_task_lock('refresh_m3u_account_groups', account_id) return f"Failed to fetch M3U data for account_id={account_id}.", None - for line in lines: + # Log basic file structure for debugging + logger.debug(f"Processing {len(lines)} lines from M3U file") + + line_count = 0 + extinf_count = 0 + url_count = 0 + valid_stream_count = 0 + problematic_lines = [] + + for line_index, line in enumerate(lines): + line_count += 1 line = line.strip() + if line.startswith("#EXTINF"): + extinf_count += 1 parsed = parse_extinf_line(line) if parsed: if "group-title" in parsed["attributes"]: - groups[parsed["attributes"]["group-title"]] = {} + group_name = parsed["attributes"]["group-title"] + # Log new groups as they're discovered + if group_name not in groups: + logger.debug(f"Found new group: '{group_name}'") + groups[group_name] = {} extinf_data.append(parsed) + else: + # Log problematic EXTINF lines + logger.warning(f"Failed to parse EXTINF at line {line_index+1}: {line[:200]}") + problematic_lines.append((line_index+1, line[:200])) + elif extinf_data and line.startswith("http"): + url_count += 1 # Associate URL with the last EXTINF line extinf_data[-1]["url"] = line + valid_stream_count += 1 + # Periodically log progress for large files + if valid_stream_count % 1000 == 0: + logger.debug(f"Processed {valid_stream_count} valid streams so far...") + + # Log summary statistics + logger.info(f"M3U parsing complete - Lines: {line_count}, EXTINF: {extinf_count}, URLs: {url_count}, Valid streams: {valid_stream_count}") + + if problematic_lines: + logger.warning(f"Found {len(problematic_lines)} problematic lines during parsing") + for i, (line_num, content) in enumerate(problematic_lines[:10]): # Log max 10 examples + logger.warning(f"Problematic line #{i+1} at line {line_num}: {content}") + if len(problematic_lines) > 10: + logger.warning(f"... and {len(problematic_lines) - 10} more problematic lines") + + # Log group statistics + logger.info(f"Found {len(groups)} groups in M3U file: {', '.join(list(groups.keys())[:20])}" + + ("..." if len(groups) > 20 else "")) + + # Cache processed data cache_path = os.path.join(m3u_dir, f"{account_id}.json") with open(cache_path, 'w', encoding='utf-8') as f: json.dump({ "extinf_data": extinf_data, "groups": groups, }, f) + logger.debug(f"Cached parsed M3U data to {cache_path}") send_m3u_update(account_id, "processing_groups", 0) From 8302acd78a6640c79e571fee0082d2beca2bd65d Mon Sep 17 00:00:00 2001 From: SergeantPanda Date: Thu, 22 May 2025 15:34:20 -0500 Subject: [PATCH 08/11] Additional cleanup while processing batches. --- apps/m3u/tasks.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py index 39622024..3873af31 100644 --- a/apps/m3u/tasks.py +++ b/apps/m3u/tasks.py @@ -556,7 +556,7 @@ def process_m3u_batch(account_id, batch, groups, hash_keys): retval = f"Batch processed: {len(streams_to_create)} created, {len(streams_to_update)} updated." 
     # Aggressive garbage collection
-    del streams_to_create, streams_to_update, stream_hashes, existing_streams
+    del streams_to_create, streams_to_update, stream_hashes, existing_streams, stream_props, invalid_streams, changed_streams, unchanged_streams
     from core.utils import cleanup_memory
     cleanup_memory(log_usage=True, force_collection=True)
 
@@ -1028,6 +1028,7 @@ def refresh_single_m3u_account(account_id):
         account.save(update_fields=['status'])
 
     if account.account_type == M3UAccount.Types.STADNARD:
+        logger.debug(f"Processing Standard account with groups: {existing_groups}")
         # Break into batches and process in parallel
         batches = [extinf_data[i:i + BATCH_SIZE] for i in range(0, len(extinf_data), BATCH_SIZE)]
         task_group = group(process_m3u_batch.s(account_id, batch, existing_groups, hash_keys) for batch in batches)

From 925850a01217e82180fad40f02af4b75f5d4848e Mon Sep 17 00:00:00 2001
From: SergeantPanda
Date: Thu, 22 May 2025 16:51:20 -0500
Subject: [PATCH 09/11] Fix change_streams and unchanged_streams possibly not
 existing when trying to clean up.

---
 apps/m3u/tasks.py | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py
index 3873af31..4fb6dfca 100644
--- a/apps/m3u/tasks.py
+++ b/apps/m3u/tasks.py
@@ -226,11 +226,6 @@ def parse_extinf_line(line: str) -> dict:
     logger.debug(f"EXTINF parsed result: {result}")
     return result
 
-import re
-import logging
-
-logger = logging.getLogger(__name__)
-
 def _matches_filters(stream_name: str, group_name: str, filters):
     """Check if a stream or group name matches a precompiled regex filter."""
     compiled_filters = [(re.compile(f.regex_pattern, re.IGNORECASE), f.exclude) for f in filters]
@@ -437,6 +432,9 @@ def process_m3u_batch(account_id, batch, groups, hash_keys):
     streams_to_update = []
     stream_hashes = {}
     invalid_streams = []
+    # Initialize these variables to prevent UnboundLocalError during cleanup
+    changed_streams = []
+    unchanged_streams = []
 
     # Log hash key configuration
     logger.debug(f"Using hash keys: {hash_keys}")

From f87ab4b07171fa098e0cbb17fd400b52b458cb43 Mon Sep 17 00:00:00 2001
From: SergeantPanda
Date: Thu, 22 May 2025 21:52:28 -0500
Subject: [PATCH 10/11] Rolled back some earlier memory optimizations that
 were causing issues with extremely large M3Us.

---
 apps/m3u/tasks.py     | 148 +++++++-----------------------------------
 core/utils.py         |  29 +++++++--
 dispatcharr/celery.py |   2 +-
 3 files changed, 45 insertions(+), 134 deletions(-)

diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py
index 4fb6dfca..b1b1170d 100644
--- a/apps/m3u/tasks.py
+++ b/apps/m3u/tasks.py
@@ -22,11 +22,11 @@ from core.utils import RedisClient, acquire_task_lock, release_task_lock
 from core.models import CoreSettings, UserAgent
 from asgiref.sync import async_to_sync
 from core.xtream_codes import Client as XCClient
+from core.utils import send_websocket_update
 
 logger = logging.getLogger(__name__)
 
 BATCH_SIZE = 1000
-SKIP_EXTS = {}
 m3u_dir = os.path.join(settings.MEDIA_ROOT, "cached_m3u")
 
 def fetch_m3u_lines(account, use_cache=False):
@@ -184,48 +184,22 @@ def parse_extinf_line(line: str) -> dict:
       - 'name': the value from tvg-name (if present) or the display name otherwise.
     """
     if not line.startswith("#EXTINF:"):
-        logger.debug(f"Not an EXTINF line: {line[:50]}...")
         return None
-
     content = line[len("#EXTINF:"):].strip()
-    logger.debug(f"Parsing EXTINF content: {content[:100]}...")
-
     # Split on the first comma that is not inside quotes.
parts = re.split(r',(?=(?:[^"]*"[^"]*")*[^"]*$)', content, maxsplit=1) if len(parts) != 2: - logger.warning(f"Invalid EXTINF format - couldn't split at comma: {content[:100]}...") return None - attributes_part, display_name = parts[0], parts[1].strip() - - # Debug raw attribute parsing - logger.debug(f"Attribute part: {attributes_part[:100]}...") - logger.debug(f"Display name: {display_name[:100]}...") - - # Extract attributes with more detailed logging - try: - attr_matches = re.findall(r'([^\s]+)=["\']([^"\']+)["\']', attributes_part) - if not attr_matches: - logger.warning(f"No attributes found in: {attributes_part[:100]}...") - - attrs = dict(attr_matches) - logger.debug(f"Extracted attributes: {attrs}") - except Exception as e: - logger.error(f"Error parsing attributes: {str(e)}", exc_info=True) - attrs = {} - + attrs = dict(re.findall(r'([^\s]+)=["\']([^"\']+)["\']', attributes_part)) # Use tvg-name attribute if available; otherwise, use the display name. name = attrs.get('tvg-name', display_name) - - result = { + return { 'attributes': attrs, 'display_name': display_name, 'name': name } - logger.debug(f"EXTINF parsed result: {result}") - return result - def _matches_filters(stream_name: str, group_name: str, filters): """Check if a stream or group name matches a precompiled regex filter.""" compiled_filters = [(re.compile(f.regex_pattern, re.IGNORECASE), f.exclude) for f in filters] @@ -266,7 +240,7 @@ def process_groups(account, groups): groups_to_create = [] for group_name, custom_props in groups.items(): logger.debug(f"Handling group: {group_name}") - if (group_name not in existing_groups) and (group_name not in SKIP_EXTS): + if (group_name not in existing_groups): groups_to_create.append(ChannelGroup( name=group_name, )) @@ -426,51 +400,24 @@ def process_m3u_batch(account_id, batch, groups, hash_keys): """Processes a batch of M3U streams using bulk operations.""" account = M3UAccount.objects.get(id=account_id) - logger.debug(f"Processing batch of {len(batch)} streams for account {account_id}") - streams_to_create = [] streams_to_update = [] stream_hashes = {} - invalid_streams = [] - # Initialize these variables to prevent UnboundLocalError during cleanup - changed_streams = [] - unchanged_streams = [] - - # Log hash key configuration - logger.debug(f"Using hash keys: {hash_keys}") # compiled_filters = [(f.filter_type, re.compile(f.regex_pattern, re.IGNORECASE)) for f in filters] logger.debug(f"Processing batch of {len(batch)}") - for stream_index, stream_info in enumerate(batch): + for stream_info in batch: try: - # Extract basic stream info with better error handling - try: - name = stream_info["name"] - url = stream_info["url"] - attrs = stream_info["attributes"] - tvg_id = attrs.get("tvg-id", "") - tvg_logo = attrs.get("tvg-logo", "") - group_title = attrs.get("group-title", "Default Group") - except KeyError as e: - logger.warning(f"Missing required field in stream {stream_index}: {e}") - logger.debug(f"Stream data: {stream_info}") - invalid_streams.append((stream_index, f"Missing field: {e}")) - continue + name, url = stream_info["name"], stream_info["url"] + tvg_id, tvg_logo = stream_info["attributes"].get("tvg-id", ""), stream_info["attributes"].get("tvg-logo", "") + group_title = stream_info["attributes"].get("group-title", "Default Group") # Filter out disabled groups for this account if group_title not in groups: - logger.debug(f"Skipping stream in disabled group: '{group_title}', name: '{name}'") - continue - - # Generate hash key with error handling - try: - 
stream_hash = Stream.generate_hash_key(name, url, tvg_id, hash_keys) - logger.debug(f"Generated hash {stream_hash} for stream '{name}'") - except Exception as e: - logger.error(f"Error generating hash for stream '{name}': {str(e)}") - invalid_streams.append((stream_index, f"Hash generation error: {str(e)}")) + logger.debug(f"Skipping stream in disabled group: {group_title}") continue + stream_hash = Stream.generate_hash_key(name, url, tvg_id, hash_keys) stream_props = { "name": name, "url": url, @@ -479,25 +426,14 @@ def process_m3u_batch(account_id, batch, groups, hash_keys): "m3u_account": account, "channel_group_id": int(groups.get(group_title)), "stream_hash": stream_hash, - "custom_properties": json.dumps(attrs), + "custom_properties": json.dumps(stream_info["attributes"]), } if stream_hash not in stream_hashes: stream_hashes[stream_hash] = stream_props except Exception as e: - logger.error(f"Failed to process stream at index {stream_index}: {e}", exc_info=True) - if "name" in stream_info: - logger.error(f"Stream name: {stream_info['name']}") - logger.error(f"Stream data: {json.dumps(stream_info)[:500]}") - invalid_streams.append((stream_index, f"Processing error: {str(e)}")) - - # Log invalid stream summary - if invalid_streams: - logger.warning(f"Found {len(invalid_streams)} invalid streams in batch") - for i, (idx, error) in enumerate(invalid_streams[:5]): # Log first 5 - logger.warning(f"Invalid stream #{i+1} at index {idx}: {error}") - if len(invalid_streams) > 5: - logger.warning(f"... and {len(invalid_streams) - 5} more invalid streams") + logger.error(f"Failed to process stream {name}: {e}") + logger.error(json.dumps(stream_info)) existing_streams = {s.stream_hash: s for s in Stream.objects.filter(stream_hash__in=stream_hashes.keys())} @@ -554,9 +490,9 @@ def process_m3u_batch(account_id, batch, groups, hash_keys): retval = f"Batch processed: {len(streams_to_create)} created, {len(streams_to_update)} updated." # Aggressive garbage collection - del streams_to_create, streams_to_update, stream_hashes, existing_streams, stream_props, invalid_streams, changed_streams, unchanged_streams - from core.utils import cleanup_memory - cleanup_memory(log_usage=True, force_collection=True) + #del streams_to_create, streams_to_update, stream_hashes, existing_streams + #from core.utils import cleanup_memory + #cleanup_memory(log_usage=True, force_collection=True) return retval @@ -1193,8 +1129,6 @@ def refresh_single_m3u_account(account_id): return f"Dispatched jobs complete." 
-from core.utils import send_websocket_update - def send_m3u_update(account_id, action, progress, **kwargs): # Start with the base data dictionary data = { @@ -1215,51 +1149,13 @@ def send_m3u_update(account_id, action, progress, **kwargs): except: pass # If account can't be retrieved, continue without these fields - # Add the additional key-value pairs from kwargs with size limiting - for key, value in kwargs.items(): - # Handle large arrays - limit to summary data - if isinstance(value, (list, tuple)) and len(value) > 100: - data[key] = f"{len(value)} items (truncated for performance)" - # Handle very large integers - use abbreviations for streams - elif key in ['streams_processed', 'streams_created', 'streams_updated', 'streams_deleted'] and isinstance(value, int) and value > 10000: - # Format as "226K" instead of 226154 - data[key] = f"{value//1000}K" if value >= 1000 else value - # Handle other large values that might be serialized to JSON - elif isinstance(value, (dict, object)) and key not in ['status', 'action', 'progress']: - try: - # Use a safer approach for complex objects - if hasattr(value, '__dict__'): - # Just store the class name and id if available - data[key] = f"{value.__class__.__name__}" - if hasattr(value, 'id'): - data[key] += f"(id={value.id})" - else: - # For dictionaries, limit based on size - data[key] = value - except: - # If we can't serialize, skip this value - data[key] = f"[Object of type {type(value).__name__}]" - else: - # Default case - add the value as is - data[key] = value + # Add the additional key-value pairs from kwargs + data.update(kwargs) - # Protect against message size limits in WebSocket protocol - # Most implementations limit to ~1MB, we'll be conservative - try: - # Use the standardized function with memory management - # Enable garbage collection for certain operations - collect_garbage = action == "parsing" and progress % 25 == 0 - - # Add extra garbage collection for large stream operations - if any(key in kwargs for key in ['streams_processed', 'streams_created', 'streams_updated']) and \ - any(isinstance(kwargs.get(key), int) and kwargs.get(key, 0) > 10000 - for key in ['streams_processed', 'streams_created', 'streams_updated']): - collect_garbage = True - - send_websocket_update('updates', 'update', data, collect_garbage=collect_garbage) - except Exception as e: - # Log the error but don't crash the process - logger.warning(f"Error sending WebSocket update: {e}") + # Use the standardized function with memory management + # Enable garbage collection for certain operations + collect_garbage = action == "parsing" and progress % 25 == 0 + send_websocket_update('updates', 'update', data, collect_garbage=collect_garbage) # Explicitly clear data reference to help garbage collection data = None diff --git a/core/utils.py b/core/utils.py index fcff03e5..039b0695 100644 --- a/core/utils.py +++ b/core/utils.py @@ -59,9 +59,16 @@ class RedisClient: client.config_set('save', '') # Disable RDB snapshots client.config_set('appendonly', 'no') # Disable AOF logging - # Set optimal memory settings - client.config_set('maxmemory-policy', 'allkeys-lru') # Use LRU eviction - client.config_set('maxmemory', '256mb') # Set reasonable memory limit + # Set optimal memory settings with environment variable support + # Get max memory from environment or use a larger default (512MB instead of 256MB) + #max_memory = os.environ.get('REDIS_MAX_MEMORY', '512mb') + #eviction_policy = os.environ.get('REDIS_EVICTION_POLICY', 'allkeys-lru') + + # Apply memory settings + 
#client.config_set('maxmemory-policy', eviction_policy)
+            #client.config_set('maxmemory', max_memory)
+
+            #logger.info(f"Redis configured with maxmemory={max_memory}, policy={eviction_policy}")
 
             # Disable protected mode when in debug mode
             if os.environ.get('DISPATCHARR_DEBUG', '').lower() == 'true':
@@ -69,10 +76,18 @@ class RedisClient:
                 logger.warning("Redis protected mode disabled for debug environment")
 
             logger.trace("Redis persistence disabled for better performance")
-        except redis.exceptions.ResponseError:
-            # This might fail if Redis is configured to prohibit CONFIG command
-            # or if running in protected mode - that's okay
-            logger.error("Could not modify Redis persistence settings (may be restricted)")
+        except redis.exceptions.ResponseError as e:
+            # Improve error handling for Redis configuration errors
+            if "OOM" in str(e):
+                logger.error(f"Redis OOM during configuration: {e}")
+                # Try to increase maxmemory as an emergency measure
+                try:
+                    client.config_set('maxmemory', '768mb')
+                    logger.warning("Applied emergency Redis memory increase to 768MB")
+                except:
+                    pass
+            else:
+                logger.error(f"Redis configuration error: {e}")
 
         logger.info(f"Connected to Redis at {redis_host}:{redis_port}/{redis_db}")
 
diff --git a/dispatcharr/celery.py b/dispatcharr/celery.py
index 855acacd..8856d330 100644
--- a/dispatcharr/celery.py
+++ b/dispatcharr/celery.py
@@ -50,7 +50,7 @@ app.conf.update(
 )
 
 # Add memory cleanup after task completion
-@task_postrun.connect  # Use the imported signal
+#@task_postrun.connect  # Use the imported signal
 def cleanup_task_memory(**kwargs):
     """Clean up memory after each task completes"""
     # Get task name from kwargs

From 2c521b69ae47a48f9ab8e6f6078b5d33bdced33a Mon Sep 17 00:00:00 2001
From: dekzter
Date: Sat, 24 May 2025 07:18:08 -0400
Subject: [PATCH 11/11] tzlocal for upcoming MRs

---
 requirements.txt | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/requirements.txt b/requirements.txt
index 22a51fbc..daf3356e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -21,6 +21,7 @@ rapidfuzz==3.12.1
 # PyTorch dependencies (CPU only)
 --extra-index-url https://download.pytorch.org/whl/cpu/
 torch==2.6.0+cpu
+tzlocal
 
 # ML/NLP dependencies
 sentence-transformers==3.4.1
@@ -28,4 +29,4 @@ channels
 channels-redis
 django-filter
 django-celery-beat
-lxml==5.4.0
\ No newline at end of file
+lxml==5.4.0
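
As a reference for the timezone work above (PATCH 06 reworks parse_xmltv_time, and PATCH 11 pulls in tzlocal for follow-up merge requests), here is a minimal stdlib-only sketch of the offset handling the patch introduces: parse the naive YYYYMMDDHHMMSS prefix, attach the explicit +HHMM/-HHMM offset when present, and normalize to UTC with astimezone(). The helper name and sample timestamp are illustrative only and are not part of the Dispatcharr codebase; the real function also keeps the Django timezone helpers and logging shown in the diff.

from datetime import datetime, timedelta, timezone

def xmltv_to_utc(time_str: str) -> datetime:
    """Illustrative sketch: convert an XMLTV timestamp such as '20250522103000 +0200' to UTC."""
    # First 14 characters are the naive timestamp: YYYYMMDDHHMMSS
    dt = datetime.strptime(time_str[:14], "%Y%m%d%H%M%S")
    if len(time_str) >= 20:
        # Characters 15-19 carry the explicit offset, e.g. ' +0200'
        sign = 1 if time_str[15] == "+" else -1
        offset = timedelta(hours=int(time_str[16:18]), minutes=int(time_str[18:20]))
        dt = dt.replace(tzinfo=timezone(sign * offset))
    else:
        # No offset supplied: treat the timestamp as UTC, as the patch does
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)

# '20250522103000 +0200' normalizes to 2025-05-22 08:30:00+00:00
print(xmltv_to_utc("20250522103000 +0200"))

The design change this mirrors: the pre-patch code added or subtracted the offset by hand and then stamped the naive result as UTC, while the patched version builds a real tzinfo for the offset and lets astimezone() perform the conversion, falling back to UTC for timestamps that omit an offset.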