mirror of https://github.com/Dispatcharr/Dispatcharr.git
synced 2026-01-23 02:35:14 +00:00
Greatly reduced the number of logs for M3U/EPG file importing/monitoring.
This commit is contained in:
parent 75a0262f5a
commit 5be46c6e36
1 changed file with 67 additions and 33 deletions
core/tasks.py
@@ -27,12 +27,27 @@ REDIS_TTL = 60 * 60 * 24 * 3  # expire keys after 3 days (optional)
 # Store the last known value to compare with new data
 last_known_data = {}
+# Store when we last logged certain recurring messages
+_last_log_times = {}
+# Don't repeat similar log messages more often than this (in seconds)
+LOG_THROTTLE_SECONDS = 300  # 5 minutes
 
 @shared_task
 def beat_periodic_task():
     fetch_channel_stats()
     scan_and_process_files()
 
+def throttled_log(logger_method, message, key=None, *args, **kwargs):
+    """Only log messages with the same key once per throttle period"""
+    if key is None:
+        # Use message as key if no explicit key provided
+        key = message
+
+    now = time.time()
+    if key not in _last_log_times or (now - _last_log_times[key]) >= LOG_THROTTLE_SECONDS:
+        logger_method(message, *args, **kwargs)
+        _last_log_times[key] = now
+
 @shared_task
 def scan_and_process_files():
     redis_client = RedisClient.get_client()
 
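Aside (not from this commit): the throttled_log helper added above is self-contained enough to sketch in isolation. With the module-level dict and constant copied from the diff, repeated calls with the same key inside the five-minute window collapse to a single log line:

import logging
import time

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("core.tasks")

_last_log_times = {}
LOG_THROTTLE_SECONDS = 300  # 5 minutes

def throttled_log(logger_method, message, key=None, *args, **kwargs):
    """Only log messages with the same key once per throttle period"""
    if key is None:
        key = message  # fall back to the message text as the dedup key
    now = time.time()
    if key not in _last_log_times or (now - _last_log_times[key]) >= LOG_THROTTLE_SECONDS:
        logger_method(message, *args, **kwargs)
        _last_log_times[key] = now

# A beat task firing every few seconds emits this warning once per window:
for _ in range(3):
    throttled_log(logger.warning, "Watch directories missing", "watch_dirs_missing")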
@@ -40,21 +55,23 @@ def scan_and_process_files():
     # Add debug logging for the auto-import setting
     auto_import_value = CoreSettings.get_auto_import_mapped_files()
-    logger.info(f"Auto-import mapped files setting value: '{auto_import_value}' (type: {type(auto_import_value).__name__})")
+    logger.debug(f"Auto-import mapped files setting value: '{auto_import_value}' (type: {type(auto_import_value).__name__})")
 
     # Check if directories exist
-    logger.info(f"Checking M3U directory: {M3U_WATCH_DIR} (exists: {os.path.exists(M3U_WATCH_DIR)})")
-    logger.info(f"Checking EPG directory: {EPG_WATCH_DIR} (exists: {os.path.exists(EPG_WATCH_DIR)})")
+    dirs_exist = all(os.path.exists(d) for d in [M3U_WATCH_DIR, EPG_WATCH_DIR])
+    if not dirs_exist:
+        throttled_log(logger.warning, f"Watch directories missing: M3U ({os.path.exists(M3U_WATCH_DIR)}), EPG ({os.path.exists(EPG_WATCH_DIR)})", "watch_dirs_missing")
 
-    for filename in os.listdir(M3U_WATCH_DIR):
+    # Process M3U files
+    m3u_files = [f for f in os.listdir(M3U_WATCH_DIR)
+                 if os.path.isfile(os.path.join(M3U_WATCH_DIR, f)) and
+                 (f.endswith('.m3u') or f.endswith('.m3u8'))]
+
+    m3u_processed = 0
+    m3u_skipped = 0
+
+    for filename in m3u_files:
         filepath = os.path.join(M3U_WATCH_DIR, filename)
 
-        if not os.path.isfile(filepath):
-            continue
-
-        if not filename.endswith('.m3u') and not filename.endswith('.m3u8'):
-            continue
-
         mtime = os.path.getmtime(filepath)
         age = now - mtime
         redis_key = REDIS_PREFIX + filepath
-
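Why the info-to-debug swaps throughout this commit quiet the logs: at the default INFO level, per-file chatter disappears while queueing and warning lines survive. A minimal sketch (not from this commit):

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("core.tasks")

logger.debug("Skipping playlist.m3u: Already processed this version")  # hidden at INFO
logger.info("Queueing refresh for M3U file: playlist.m3u")             # still visible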
@@ -65,22 +82,27 @@ def scan_and_process_files():
             # Check if this file is already in the database
             existing_m3u = M3UAccount.objects.filter(file_path=filepath).exists()
             if existing_m3u:
-                logger.info(f"Skipping {filename}: Already exists in database")
+                logger.debug(f"Skipping {filename}: Already exists in database")
                 redis_client.set(redis_key, mtime, ex=REDIS_TTL)
+                m3u_skipped += 1
                 continue
             else:
-                logger.info(f"Processing {filename} despite age: Not found in database")
+                logger.debug(f"Processing {filename} despite age: Not found in database")
                 # Continue processing this file even though it's old
 
         # File too new — probably still being written
         if age < MIN_AGE_SECONDS:
+            logger.debug(f"Skipping {filename}: Too new (age={age}s)")
+            m3u_skipped += 1
             continue
 
         # Skip if we've already processed this mtime
         if stored_mtime and float(stored_mtime) >= mtime:
+            logger.debug(f"Skipping {filename}: Already processed this version")
+            m3u_skipped += 1
             continue
 
-        m3u_account, _ = M3UAccount.objects.get_or_create(file_path=filepath, defaults={
+        m3u_account, created = M3UAccount.objects.get_or_create(file_path=filepath, defaults={
             "name": filename,
             "is_active": CoreSettings.get_auto_import_mapped_files() in [True, "true", "True"],
         })
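The skip logic above leans on per-file mtime records in Redis. A standalone sketch using redis-py directly (not from this commit; the project's RedisClient wrapper and the actual REDIS_PREFIX value live elsewhere in core/tasks.py, so the prefix here is an assumption, and a reachable Redis at localhost is assumed):

import redis

REDIS_PREFIX = "processed_file:"  # assumed prefix for illustration
REDIS_TTL = 60 * 60 * 24 * 3      # expire keys after 3 days, as in the diff

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def already_processed(filepath, mtime):
    """True if this exact version of the file was already handled."""
    stored = r.get(REDIS_PREFIX + filepath)
    return stored is not None and float(stored) >= mtime

def mark_processed(filepath, mtime):
    # TTL keeps stale entries from accumulating for files that disappear.
    r.set(REDIS_PREFIX + filepath, mtime, ex=REDIS_TTL)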
@@ -88,10 +110,13 @@ def scan_and_process_files():
         redis_client.set(redis_key, mtime, ex=REDIS_TTL)
 
         if not m3u_account.is_active:
-            logger.info("M3U account is inactive, skipping.")
+            logger.debug(f"Skipping {filename}: M3U account is inactive")
+            m3u_skipped += 1
             continue
 
         logger.info(f"Queueing refresh for M3U file: {filename}")
         refresh_single_m3u_account.delay(m3u_account.id)
+        m3u_processed += 1
+
     channel_layer = get_channel_layer()
     async_to_sync(channel_layer.group_send)(
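The group_send call this hunk ends on pushes a websocket notification via Django Channels; its payload sits outside the hunk, so this sketch (not from this commit) uses a hypothetical group name and event, and assumes it runs inside a configured Django Channels project where get_channel_layer() returns a real layer:

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
    "updates",                          # hypothetical group name
    {
        "type": "update",               # hypothetical consumer handler name
        "data": {"m3u_processed": 2},   # hypothetical payload
    },
)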
@@ -102,23 +127,31 @@ def scan_and_process_files():
         },
     )
 
+    logger.debug(f"M3U processing complete: {m3u_processed} processed, {m3u_skipped} skipped, {len(m3u_files)} total")
+
     # Process EPG files
     try:
         epg_files = os.listdir(EPG_WATCH_DIR)
-        logger.info(f"Found {len(epg_files)} files in EPG directory: {epg_files}")
+        logger.debug(f"Found {len(epg_files)} files in EPG directory")
     except Exception as e:
         logger.error(f"Error listing EPG directory: {e}")
         epg_files = []
 
+    epg_processed = 0
+    epg_skipped = 0
+    epg_errors = 0
+
     for filename in epg_files:
         filepath = os.path.join(EPG_WATCH_DIR, filename)
-        logger.info(f"Processing potential EPG file: {filename}")
 
         if not os.path.isfile(filepath):
-            logger.info(f"Skipping {filename}: Not a file")
+            logger.debug(f"Skipping {filename}: Not a file")
+            epg_skipped += 1
             continue
 
         if not filename.endswith('.xml') and not filename.endswith('.gz'):
-            logger.info(f"Skipping {filename}: Not an XML or GZ file")
+            logger.debug(f"Skipping {filename}: Not an XML or GZ file")
+            epg_skipped += 1
             continue
 
         mtime = os.path.getmtime(filepath)
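Both loops gate on file age before doing any work, so a half-uploaded file is simply retried on a later scan instead of being imported in a broken state. A sketch of that check in isolation (not from this commit; the MIN_AGE_SECONDS value is assumed, the real constant is defined elsewhere in core/tasks.py):

import os
import time

MIN_AGE_SECONDS = 10  # assumed value for illustration

def is_settled(filepath, now=None):
    """True once the file's mtime is old enough that it is unlikely to still be written."""
    now = time.time() if now is None else now
    age = now - os.path.getmtime(filepath)
    return age >= MIN_AGE_SECONDS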
@@ -126,32 +159,32 @@ def scan_and_process_files():
         redis_key = REDIS_PREFIX + filepath
         stored_mtime = redis_client.get(redis_key)
 
-        logger.info(f"File {filename}: age={age}s, MIN_AGE={MIN_AGE_SECONDS}s, stored_mtime={stored_mtime}")
-
         # Instead of assuming old files were processed, check if they exist in the database
         if not stored_mtime and age > STARTUP_SKIP_AGE:
             # Check if this file is already in the database
             existing_epg = EPGSource.objects.filter(file_path=filepath).exists()
             if existing_epg:
-                logger.info(f"Skipping {filename}: Already exists in database")
+                logger.debug(f"Skipping {filename}: Already exists in database")
                 redis_client.set(redis_key, mtime, ex=REDIS_TTL)
+                epg_skipped += 1
                 continue
             else:
-                logger.info(f"Processing {filename} despite age: Not found in database")
+                logger.debug(f"Processing {filename} despite age: Not found in database")
                 # Continue processing this file even though it's old
 
         # File too new — probably still being written
         if age < MIN_AGE_SECONDS:
-            logger.info(f"Skipping {filename}: Too new, possibly still being written (age={age}s < {MIN_AGE_SECONDS}s)")
+            logger.debug(f"Skipping {filename}: Too new, possibly still being written (age={age}s)")
+            epg_skipped += 1
             continue
 
         # Skip if we've already processed this mtime
         if stored_mtime and float(stored_mtime) >= mtime:
-            logger.info(f"Skipping {filename}: Already processed this version (stored={stored_mtime}, current={mtime})")
+            logger.debug(f"Skipping {filename}: Already processed this version")
+            epg_skipped += 1
             continue
 
         try:
-            logger.info(f"Creating/getting EPG source for {filename}")
             epg_source, created = EPGSource.objects.get_or_create(file_path=filepath, defaults={
                 "name": filename,
                 "source_type": "xmltv",
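The branch at the top of this hunk decides what to do with an old file that has no Redis record, which typically happens right after a restart once the TTL'd keys have expired. Condensed into a pure function (not from this commit; the STARTUP_SKIP_AGE value is assumed):

def should_skip_old_file(stored_mtime, age, exists_in_db, startup_skip_age=3600):
    """Mirror of the 'if not stored_mtime and age > STARTUP_SKIP_AGE' branch above."""
    if not stored_mtime and age > startup_skip_age:
        # Old and already imported -> skip; old but unknown -> process anyway.
        return exists_in_db
    return False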
@@ -160,25 +193,26 @@ def scan_and_process_files():
 
             # Add debug logging for created sources
             if created:
-                logger.info(f"Created new EPG source '{filename}' with is_active={epg_source.is_active}")
-            else:
-                logger.info(f"Found existing EPG source '{filename}' with is_active={epg_source.is_active}")
+                logger.info(f"Created new EPG source '{filename}'")
 
             redis_client.set(redis_key, mtime, ex=REDIS_TTL)
 
             if not epg_source.is_active:
-                logger.info(f"Skipping {filename}: EPG source is marked as inactive")
+                logger.debug(f"Skipping {filename}: EPG source is marked as inactive")
+                epg_skipped += 1
                 continue
 
-            logger.info(f"Triggering refresh_epg_data task for EPG source id={epg_source.id}")
+            logger.info(f"Queueing refresh for EPG file: {filename}")
             refresh_epg_data.delay(epg_source.id)  # Trigger Celery task
-
-            logger.info(f"Successfully queued refresh for EPG file: {filename}")
+            epg_processed += 1
 
         except Exception as e:
             logger.error(f"Error processing EPG file {filename}: {str(e)}", exc_info=True)
+            epg_errors += 1
            continue
 
+    logger.debug(f"EPG processing complete: {epg_processed} processed, {epg_skipped} skipped, {epg_errors} errors")
+
 def fetch_channel_stats():
     redis_client = RedisClient.get_client()
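On the `_` to `created` change in the M3U hunk and the `if created:` log kept here: Django's get_or_create returns (instance, created), and the defaults dict is only applied when a new row is inserted, so `created` is precisely the 'first time this file was seen' signal the single remaining info log keys off. A sketch with a hypothetical path, assuming the imports already present in core/tasks.py:

epg_source, created = EPGSource.objects.get_or_create(
    file_path="/data/epg/guide.xml",  # hypothetical path for illustration
    defaults={"name": "guide.xml", "source_type": "xmltv"},
)
if created:
    logger.info(f"Created new EPG source '{epg_source.name}'")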