diff --git a/core/tasks.py b/core/tasks.py
index ccc6f177..83682a69 100644
--- a/core/tasks.py
+++ b/core/tasks.py
@@ -27,12 +27,27 @@ REDIS_TTL = 60 * 60 * 24 * 3 # expire keys after 3 days (optional)
 
 # Store the last known value to compare with new data
 last_known_data = {}
+# Store when we last logged certain recurring messages
+_last_log_times = {}
+# Don't repeat similar log messages more often than this (in seconds)
+LOG_THROTTLE_SECONDS = 300 # 5 minutes
 
 @shared_task
 def beat_periodic_task():
     fetch_channel_stats()
     scan_and_process_files()
 
+def throttled_log(logger_method, message, key=None, *args, **kwargs):
+    """Only log messages with the same key once per throttle period"""
+    if key is None:
+        # Use message as key if no explicit key provided
+        key = message
+
+    now = time.time()
+    if key not in _last_log_times or (now - _last_log_times[key]) >= LOG_THROTTLE_SECONDS:
+        logger_method(message, *args, **kwargs)
+        _last_log_times[key] = now
+
 @shared_task
 def scan_and_process_files():
     redis_client = RedisClient.get_client()
@@ -40,21 +55,23 @@ def scan_and_process_files():
 
     # Add debug logging for the auto-import setting
     auto_import_value = CoreSettings.get_auto_import_mapped_files()
-    logger.info(f"Auto-import mapped files setting value: '{auto_import_value}' (type: {type(auto_import_value).__name__})")
+    logger.debug(f"Auto-import mapped files setting value: '{auto_import_value}' (type: {type(auto_import_value).__name__})")
 
     # Check if directories exist
-    logger.info(f"Checking M3U directory: {M3U_WATCH_DIR} (exists: {os.path.exists(M3U_WATCH_DIR)})")
-    logger.info(f"Checking EPG directory: {EPG_WATCH_DIR} (exists: {os.path.exists(EPG_WATCH_DIR)})")
+    dirs_exist = all(os.path.exists(d) for d in [M3U_WATCH_DIR, EPG_WATCH_DIR])
+    if not dirs_exist:
+        throttled_log(logger.warning, f"Watch directories missing: M3U ({os.path.exists(M3U_WATCH_DIR)}), EPG ({os.path.exists(EPG_WATCH_DIR)})", "watch_dirs_missing")
 
-    for filename in os.listdir(M3U_WATCH_DIR):
+    # Process M3U files
+    m3u_files = [f for f in os.listdir(M3U_WATCH_DIR)
+                 if os.path.isfile(os.path.join(M3U_WATCH_DIR, f)) and
+                 (f.endswith('.m3u') or f.endswith('.m3u8'))]
+
+    m3u_processed = 0
+    m3u_skipped = 0
+
+    for filename in m3u_files:
         filepath = os.path.join(M3U_WATCH_DIR, filename)
-
-        if not os.path.isfile(filepath):
-            continue
-
-        if not filename.endswith('.m3u') and not filename.endswith('.m3u8'):
-            continue
-
         mtime = os.path.getmtime(filepath)
         age = now - mtime
         redis_key = REDIS_PREFIX + filepath
@@ -65,22 +82,27 @@ def scan_and_process_files():
             # Check if this file is already in the database
             existing_m3u = M3UAccount.objects.filter(file_path=filepath).exists()
             if existing_m3u:
-                logger.info(f"Skipping {filename}: Already exists in database")
+                logger.debug(f"Skipping {filename}: Already exists in database")
                 redis_client.set(redis_key, mtime, ex=REDIS_TTL)
+                m3u_skipped += 1
                 continue
             else:
-                logger.info(f"Processing {filename} despite age: Not found in database")
+                logger.debug(f"Processing {filename} despite age: Not found in database")
                 # Continue processing this file even though it's old
 
         # File too new — probably still being written
         if age < MIN_AGE_SECONDS:
+            logger.debug(f"Skipping {filename}: Too new (age={age}s)")
+            m3u_skipped += 1
             continue
 
         # Skip if we've already processed this mtime
         if stored_mtime and float(stored_mtime) >= mtime:
+            logger.debug(f"Skipping {filename}: Already processed this version")
+            m3u_skipped += 1
             continue
 
-        m3u_account, _ = M3UAccount.objects.get_or_create(file_path=filepath, defaults={
+        m3u_account, created = M3UAccount.objects.get_or_create(file_path=filepath, defaults={
             "name": filename,
             "is_active": CoreSettings.get_auto_import_mapped_files() in [True, "true", "True"],
         })
@@ -88,10 +110,13 @@ def scan_and_process_files():
         redis_client.set(redis_key, mtime, ex=REDIS_TTL)
 
         if not m3u_account.is_active:
-            logger.info("M3U account is inactive, skipping.")
+            logger.debug(f"Skipping {filename}: M3U account is inactive")
+            m3u_skipped += 1
             continue
 
+        logger.info(f"Queueing refresh for M3U file: {filename}")
         refresh_single_m3u_account.delay(m3u_account.id)
+        m3u_processed += 1
 
         channel_layer = get_channel_layer()
         async_to_sync(channel_layer.group_send)(
@@ -102,23 +127,31 @@
             },
         )
 
+    logger.debug(f"M3U processing complete: {m3u_processed} processed, {m3u_skipped} skipped, {len(m3u_files)} total")
+
+    # Process EPG files
     try:
         epg_files = os.listdir(EPG_WATCH_DIR)
-        logger.info(f"Found {len(epg_files)} files in EPG directory: {epg_files}")
+        logger.debug(f"Found {len(epg_files)} files in EPG directory")
     except Exception as e:
         logger.error(f"Error listing EPG directory: {e}")
         epg_files = []
 
+    epg_processed = 0
+    epg_skipped = 0
+    epg_errors = 0
+
     for filename in epg_files:
         filepath = os.path.join(EPG_WATCH_DIR, filename)
-        logger.info(f"Processing potential EPG file: {filename}")
 
         if not os.path.isfile(filepath):
-            logger.info(f"Skipping {filename}: Not a file")
+            logger.debug(f"Skipping {filename}: Not a file")
+            epg_skipped += 1
             continue
 
         if not filename.endswith('.xml') and not filename.endswith('.gz'):
-            logger.info(f"Skipping {filename}: Not an XML or GZ file")
+            logger.debug(f"Skipping {filename}: Not an XML or GZ file")
+            epg_skipped += 1
             continue
 
         mtime = os.path.getmtime(filepath)
@@ -126,32 +159,32 @@
         redis_key = REDIS_PREFIX + filepath
         stored_mtime = redis_client.get(redis_key)
 
-        logger.info(f"File {filename}: age={age}s, MIN_AGE={MIN_AGE_SECONDS}s, stored_mtime={stored_mtime}")
-
         # Instead of assuming old files were processed, check if they exist in the database
         if not stored_mtime and age > STARTUP_SKIP_AGE:
             # Check if this file is already in the database
             existing_epg = EPGSource.objects.filter(file_path=filepath).exists()
             if existing_epg:
-                logger.info(f"Skipping {filename}: Already exists in database")
+                logger.debug(f"Skipping {filename}: Already exists in database")
                 redis_client.set(redis_key, mtime, ex=REDIS_TTL)
+                epg_skipped += 1
                 continue
             else:
-                logger.info(f"Processing {filename} despite age: Not found in database")
+                logger.debug(f"Processing {filename} despite age: Not found in database")
                 # Continue processing this file even though it's old
 
         # File too new — probably still being written
        if age < MIN_AGE_SECONDS:
-            logger.info(f"Skipping {filename}: Too new, possibly still being written (age={age}s < {MIN_AGE_SECONDS}s)")
+            logger.debug(f"Skipping {filename}: Too new, possibly still being written (age={age}s)")
+            epg_skipped += 1
             continue
 
         # Skip if we've already processed this mtime
         if stored_mtime and float(stored_mtime) >= mtime:
-            logger.info(f"Skipping {filename}: Already processed this version (stored={stored_mtime}, current={mtime})")
+            logger.debug(f"Skipping {filename}: Already processed this version")
+            epg_skipped += 1
             continue
 
         try:
-            logger.info(f"Creating/getting EPG source for {filename}")
             epg_source, created = EPGSource.objects.get_or_create(file_path=filepath, defaults={
                 "name": filename,
                 "source_type": "xmltv",
@@ -160,25 +193,26 @@
             # Add debug logging for created sources
             if created:
-                logger.info(f"Created new EPG source '{filename}' with is_active={epg_source.is_active}")
-            else:
-                logger.info(f"Found existing EPG source '{filename}' with is_active={epg_source.is_active}")
+                logger.info(f"Created new EPG source '{filename}'")
 
             redis_client.set(redis_key, mtime, ex=REDIS_TTL)
 
             if not epg_source.is_active:
-                logger.info(f"Skipping {filename}: EPG source is marked as inactive")
+                logger.debug(f"Skipping {filename}: EPG source is marked as inactive")
+                epg_skipped += 1
                 continue
 
-            logger.info(f"Triggering refresh_epg_data task for EPG source id={epg_source.id}")
+            logger.info(f"Queueing refresh for EPG file: {filename}")
             refresh_epg_data.delay(epg_source.id)  # Trigger Celery task
-
-            logger.info(f"Successfully queued refresh for EPG file: {filename}")
+            epg_processed += 1
         except Exception as e:
             logger.error(f"Error processing EPG file {filename}: {str(e)}", exc_info=True)
+            epg_errors += 1
            continue
 
+    logger.debug(f"EPG processing complete: {epg_processed} processed, {epg_skipped} skipped, {epg_errors} errors")
+
 
 def fetch_channel_stats():
     redis_client = RedisClient.get_client()
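
Note on the new throttled_log helper: it dedups recurring messages by key so the repeated "watch directories missing" warning is emitted at most once per LOG_THROTTLE_SECONDS across beat runs. Below is a minimal, self-contained sketch of that pattern for reference only; it copies the helper and constants from the diff into a standalone script, and the logging setup and logger name are illustrative assumptions, not code from this repo.

# Illustrative sketch only (not part of the diff): standalone copy of the
# throttling pattern introduced above.
import logging
import time

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("core.tasks")

_last_log_times = {}
LOG_THROTTLE_SECONDS = 300  # 5 minutes

def throttled_log(logger_method, message, key=None, *args, **kwargs):
    """Only log messages with the same key once per throttle period."""
    if key is None:
        key = message  # fall back to the message text as the dedup key
    now = time.time()
    if key not in _last_log_times or (now - _last_log_times[key]) >= LOG_THROTTLE_SECONDS:
        logger_method(message, *args, **kwargs)
        _last_log_times[key] = now

# First call emits the warning; the second call with the same key inside the
# 5-minute window is suppressed, so periodic scans don't flood the log.
throttled_log(logger.warning, "Watch directories missing: M3U (False), EPG (False)", "watch_dirs_missing")
throttled_log(logger.warning, "Watch directories missing: M3U (False), EPG (False)", "watch_dirs_missing")

Messages with different keys throttle independently, and direct logger.* calls elsewhere in the task are unaffected.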