# apps/channels/tasks.py import logging import os import select import re import requests import time import json import subprocess import signal from zoneinfo import ZoneInfo from datetime import datetime, timedelta import gc from celery import shared_task from django.utils.text import slugify from rapidfuzz import fuzz from apps.channels.models import Channel from apps.epg.models import EPGData from core.models import CoreSettings from channels.layers import get_channel_layer from asgiref.sync import async_to_sync from asgiref.sync import async_to_sync from channels.layers import get_channel_layer import tempfile from urllib.parse import quote logger = logging.getLogger(__name__) # PostgreSQL btree index has a limit of ~2704 bytes (1/3 of 8KB page size) # We use 2000 as a safe maximum to account for multibyte characters def validate_logo_url(logo_url, max_length=2000): """ Fast validation for logo URLs during bulk creation. Returns None if URL is too long (would exceed PostgreSQL btree index limit), original URL otherwise. PostgreSQL btree indexes have a maximum size of ~2704 bytes. URLs longer than this cannot be indexed and would cause database errors. These are typically base64-encoded images embedded in URLs. """ if logo_url and len(logo_url) > max_length: logger.warning(f"Logo URL too long ({len(logo_url)} > {max_length}), skipping: {logo_url[:100]}...") return None return logo_url def send_epg_matching_progress(total_channels, matched_channels, current_channel_name="", stage="matching"): """ Send EPG matching progress via WebSocket """ try: channel_layer = get_channel_layer() if channel_layer: progress_data = { 'type': 'epg_matching_progress', 'total': total_channels, 'matched': len(matched_channels) if isinstance(matched_channels, list) else matched_channels, 'remaining': total_channels - (len(matched_channels) if isinstance(matched_channels, list) else matched_channels), 'current_channel': current_channel_name, 'stage': stage, 'progress_percent': round((len(matched_channels) if isinstance(matched_channels, list) else matched_channels) / total_channels * 100, 1) if total_channels > 0 else 0 } async_to_sync(channel_layer.group_send)( "updates", { "type": "update", "data": { "type": "epg_matching_progress", **progress_data } } ) except Exception as e: logger.warning(f"Failed to send EPG matching progress: {e}") # Lazy loading for ML models - only imported/loaded when needed _ml_model_cache = { 'sentence_transformer': None } def get_sentence_transformer(): """Lazy load the sentence transformer model only when needed""" if _ml_model_cache['sentence_transformer'] is None: try: from sentence_transformers import SentenceTransformer from sentence_transformers import util model_name = "sentence-transformers/all-MiniLM-L6-v2" cache_dir = "/data/models" # Check environment variable to disable downloads disable_downloads = os.environ.get('DISABLE_ML_DOWNLOADS', 'false').lower() == 'true' if disable_downloads: # Check if model exists before attempting to load hf_model_path = os.path.join(cache_dir, f"models--{model_name.replace('/', '--')}") if not os.path.exists(hf_model_path): logger.warning("ML model not found and downloads disabled (DISABLE_ML_DOWNLOADS=true). Skipping ML matching.") return None, None # Ensure cache directory exists os.makedirs(cache_dir, exist_ok=True) # Let sentence-transformers handle all cache detection and management logger.info(f"Loading sentence transformer model (cache: {cache_dir})") _ml_model_cache['sentence_transformer'] = SentenceTransformer( model_name, cache_folder=cache_dir ) return _ml_model_cache['sentence_transformer'], util except ImportError: logger.warning("sentence-transformers not available - ML-enhanced matching disabled") return None, None except Exception as e: logger.error(f"Failed to load sentence transformer: {e}") return None, None else: from sentence_transformers import util return _ml_model_cache['sentence_transformer'], util # ML matching thresholds (same as original script) BEST_FUZZY_THRESHOLD = 85 LOWER_FUZZY_THRESHOLD = 40 EMBED_SIM_THRESHOLD = 0.65 # Words we remove to help with fuzzy + embedding matching COMMON_EXTRANEOUS_WORDS = [ "tv", "channel", "network", "television", "east", "west", "hd", "uhd", "24/7", "1080p", "720p", "540p", "480p", "film", "movie", "movies" ] def normalize_name(name: str) -> str: """ A more aggressive normalization that: - Lowercases - Removes bracketed/parenthesized text - Removes punctuation - Strips extraneous words - Collapses extra spaces """ if not name: return "" norm = name.lower() norm = re.sub(r"\[.*?\]", "", norm) # Extract and preserve important call signs from parentheses before removing them # This captures call signs like (KVLY), (KING), (KARE), etc. call_sign_match = re.search(r"\(([A-Z]{3,5})\)", name) preserved_call_sign = "" if call_sign_match: preserved_call_sign = " " + call_sign_match.group(1).lower() # Now remove all parentheses content norm = re.sub(r"\(.*?\)", "", norm) # Add back the preserved call sign norm = norm + preserved_call_sign norm = re.sub(r"[^\w\s]", "", norm) tokens = norm.split() tokens = [t for t in tokens if t not in COMMON_EXTRANEOUS_WORDS] norm = " ".join(tokens).strip() return norm def match_channels_to_epg(channels_data, epg_data, region_code=None, use_ml=True, send_progress=True): """ EPG matching logic that finds the best EPG matches for channels using multiple matching strategies including fuzzy matching and ML models. Automatically uses conservative thresholds for bulk matching (multiple channels) to avoid bad matches that create user cleanup work, and aggressive thresholds for single channel matching where users specifically requested a match attempt. """ channels_to_update = [] matched_channels = [] total_channels = len(channels_data) # Send initial progress if send_progress: send_epg_matching_progress(total_channels, 0, stage="starting") # Try to get ML models if requested (but don't load yet - lazy loading) st_model, util = None, None epg_embeddings = None ml_available = use_ml # Automatically determine matching strategy based on number of channels is_bulk_matching = len(channels_data) > 1 # Adjust matching thresholds based on operation type if is_bulk_matching: # Conservative thresholds for bulk matching to avoid creating cleanup work FUZZY_HIGH_CONFIDENCE = 90 # Only very high fuzzy scores FUZZY_MEDIUM_CONFIDENCE = 70 # Higher threshold for ML enhancement ML_HIGH_CONFIDENCE = 0.75 # Higher ML confidence required ML_LAST_RESORT = 0.65 # More conservative last resort FUZZY_LAST_RESORT_MIN = 50 # Higher fuzzy minimum for last resort logger.info(f"Using conservative thresholds for bulk matching ({total_channels} channels)") else: # More aggressive thresholds for single channel matching (user requested specific match) FUZZY_HIGH_CONFIDENCE = 85 # Original threshold FUZZY_MEDIUM_CONFIDENCE = 40 # Original threshold ML_HIGH_CONFIDENCE = 0.65 # Original threshold ML_LAST_RESORT = 0.50 # Original desperate threshold FUZZY_LAST_RESORT_MIN = 20 # Original minimum logger.info("Using aggressive thresholds for single channel matching") # Process each channel for index, chan in enumerate(channels_data): normalized_tvg_id = chan.get("tvg_id", "") fallback_name = chan["tvg_id"].strip() if chan["tvg_id"] else chan["name"] # Send progress update every 5 channels or for the first few if send_progress and (index < 5 or index % 5 == 0 or index == total_channels - 1): send_epg_matching_progress( total_channels, len(matched_channels), current_channel_name=chan["name"][:50], # Truncate long names stage="matching" ) normalized_tvg_id = chan.get("tvg_id", "") fallback_name = chan["tvg_id"].strip() if chan["tvg_id"] else chan["name"] # Step 1: Exact TVG ID match epg_by_tvg_id = next((epg for epg in epg_data if epg["tvg_id"] == normalized_tvg_id), None) if normalized_tvg_id and epg_by_tvg_id: chan["epg_data_id"] = epg_by_tvg_id["id"] channels_to_update.append(chan) matched_channels.append((chan['id'], fallback_name, epg_by_tvg_id["tvg_id"])) logger.info(f"Channel {chan['id']} '{fallback_name}' => EPG found by exact tvg_id={epg_by_tvg_id['tvg_id']}") continue # Step 2: Secondary TVG ID check (legacy compatibility) if chan["tvg_id"]: epg_match = [epg["id"] for epg in epg_data if epg["tvg_id"] == chan["tvg_id"]] if epg_match: chan["epg_data_id"] = epg_match[0] channels_to_update.append(chan) matched_channels.append((chan['id'], fallback_name, chan["tvg_id"])) logger.info(f"Channel {chan['id']} '{chan['name']}' => EPG found by secondary tvg_id={chan['tvg_id']}") continue # Step 2.5: Exact Gracenote ID match normalized_gracenote_id = chan.get("gracenote_id", "") if normalized_gracenote_id: epg_by_gracenote_id = next((epg for epg in epg_data if epg["tvg_id"] == normalized_gracenote_id), None) if epg_by_gracenote_id: chan["epg_data_id"] = epg_by_gracenote_id["id"] channels_to_update.append(chan) matched_channels.append((chan['id'], fallback_name, f"gracenote:{epg_by_gracenote_id['tvg_id']}")) logger.info(f"Channel {chan['id']} '{fallback_name}' => EPG found by exact gracenote_id={normalized_gracenote_id}") continue # Step 3: Name-based fuzzy matching if not chan["norm_chan"]: logger.debug(f"Channel {chan['id']} '{chan['name']}' => empty after normalization, skipping") continue best_score = 0 best_epg = None # Debug: show what we're matching against logger.debug(f"Fuzzy matching '{chan['norm_chan']}' against EPG entries...") # Find best fuzzy match for row in epg_data: if not row.get("norm_name"): continue base_score = fuzz.ratio(chan["norm_chan"], row["norm_name"]) bonus = 0 # Apply region-based bonus/penalty if region_code and row.get("tvg_id"): combined_text = row["tvg_id"].lower() + " " + row["name"].lower() dot_regions = re.findall(r'\.([a-z]{2})', combined_text) if dot_regions: if region_code in dot_regions: bonus = 15 # Bigger bonus for matching region else: bonus = -15 # Penalty for different region elif region_code in combined_text: bonus = 10 score = base_score + bonus # Debug the best few matches if score > 50: # Only show decent matches logger.debug(f" EPG '{row['name']}' (norm: '{row['norm_name']}') => score: {score} (base: {base_score}, bonus: {bonus})") # When scores are equal, prefer higher priority EPG source row_priority = row.get('epg_source_priority', 0) best_priority = best_epg.get('epg_source_priority', 0) if best_epg else -1 if score > best_score or (score == best_score and row_priority > best_priority): best_score = score best_epg = row # Log the best score we found if best_epg: logger.info(f"Channel {chan['id']} '{chan['name']}' => best match: '{best_epg['name']}' (score: {best_score})") else: logger.debug(f"Channel {chan['id']} '{chan['name']}' => no EPG entries with valid norm_name found") continue # High confidence match - accept immediately if best_score >= FUZZY_HIGH_CONFIDENCE: chan["epg_data_id"] = best_epg["id"] channels_to_update.append(chan) matched_channels.append((chan['id'], chan['name'], best_epg["tvg_id"])) logger.info(f"Channel {chan['id']} '{chan['name']}' => matched tvg_id={best_epg['tvg_id']} (score={best_score})") # Medium confidence - use ML if available (lazy load models here) elif best_score >= FUZZY_MEDIUM_CONFIDENCE and ml_available: # Lazy load ML models only when we actually need them if st_model is None: st_model, util = get_sentence_transformer() # Lazy generate embeddings only when we actually need them if epg_embeddings is None and st_model and any(row.get("norm_name") for row in epg_data): try: logger.info("Generating embeddings for EPG data using ML model (lazy loading)") epg_embeddings = st_model.encode( [row["norm_name"] for row in epg_data if row.get("norm_name")], convert_to_tensor=True ) except Exception as e: logger.warning(f"Failed to generate embeddings: {e}") epg_embeddings = None if epg_embeddings is not None and st_model: try: # Generate embedding for this channel chan_embedding = st_model.encode(chan["norm_chan"], convert_to_tensor=True) # Calculate similarity with all EPG embeddings sim_scores = util.cos_sim(chan_embedding, epg_embeddings)[0] top_index = int(sim_scores.argmax()) top_value = float(sim_scores[top_index]) if top_value >= ML_HIGH_CONFIDENCE: # Find the EPG entry that corresponds to this embedding index epg_with_names = [epg for epg in epg_data if epg.get("norm_name")] matched_epg = epg_with_names[top_index] chan["epg_data_id"] = matched_epg["id"] channels_to_update.append(chan) matched_channels.append((chan['id'], chan['name'], matched_epg["tvg_id"])) logger.info(f"Channel {chan['id']} '{chan['name']}' => matched EPG tvg_id={matched_epg['tvg_id']} (fuzzy={best_score}, ML-sim={top_value:.2f})") else: logger.info(f"Channel {chan['id']} '{chan['name']}' => fuzzy={best_score}, ML-sim={top_value:.2f} < {ML_HIGH_CONFIDENCE}, trying last resort...") # Last resort: try ML with very low fuzzy threshold if top_value >= ML_LAST_RESORT: # Dynamic last resort threshold epg_with_names = [epg for epg in epg_data if epg.get("norm_name")] matched_epg = epg_with_names[top_index] chan["epg_data_id"] = matched_epg["id"] channels_to_update.append(chan) matched_channels.append((chan['id'], chan['name'], matched_epg["tvg_id"])) logger.info(f"Channel {chan['id']} '{chan['name']}' => LAST RESORT match EPG tvg_id={matched_epg['tvg_id']} (fuzzy={best_score}, ML-sim={top_value:.2f})") else: logger.info(f"Channel {chan['id']} '{chan['name']}' => even last resort ML-sim {top_value:.2f} < {ML_LAST_RESORT}, skipping") except Exception as e: logger.warning(f"ML matching failed for channel {chan['id']}: {e}") # Fall back to non-ML decision logger.info(f"Channel {chan['id']} '{chan['name']}' => fuzzy score {best_score} below threshold, skipping") # Last resort: Try ML matching even with very low fuzzy scores elif best_score >= FUZZY_LAST_RESORT_MIN and ml_available: # Lazy load ML models for last resort attempts if st_model is None: st_model, util = get_sentence_transformer() # Lazy generate embeddings for last resort attempts if epg_embeddings is None and st_model and any(row.get("norm_name") for row in epg_data): try: logger.info("Generating embeddings for EPG data using ML model (last resort lazy loading)") epg_embeddings = st_model.encode( [row["norm_name"] for row in epg_data if row.get("norm_name")], convert_to_tensor=True ) except Exception as e: logger.warning(f"Failed to generate embeddings for last resort: {e}") epg_embeddings = None if epg_embeddings is not None and st_model: try: logger.info(f"Channel {chan['id']} '{chan['name']}' => trying ML as last resort (fuzzy={best_score})") # Generate embedding for this channel chan_embedding = st_model.encode(chan["norm_chan"], convert_to_tensor=True) # Calculate similarity with all EPG embeddings sim_scores = util.cos_sim(chan_embedding, epg_embeddings)[0] top_index = int(sim_scores.argmax()) top_value = float(sim_scores[top_index]) if top_value >= ML_LAST_RESORT: # Dynamic threshold for desperate attempts # Find the EPG entry that corresponds to this embedding index epg_with_names = [epg for epg in epg_data if epg.get("norm_name")] matched_epg = epg_with_names[top_index] chan["epg_data_id"] = matched_epg["id"] channels_to_update.append(chan) matched_channels.append((chan['id'], chan['name'], matched_epg["tvg_id"])) logger.info(f"Channel {chan['id']} '{chan['name']}' => DESPERATE LAST RESORT match EPG tvg_id={matched_epg['tvg_id']} (fuzzy={best_score}, ML-sim={top_value:.2f})") else: logger.info(f"Channel {chan['id']} '{chan['name']}' => desperate last resort ML-sim {top_value:.2f} < {ML_LAST_RESORT}, giving up") except Exception as e: logger.warning(f"Last resort ML matching failed for channel {chan['id']}: {e}") logger.info(f"Channel {chan['id']} '{chan['name']}' => best fuzzy score={best_score} < {FUZZY_MEDIUM_CONFIDENCE}, giving up") else: # No ML available or very low fuzzy score logger.info(f"Channel {chan['id']} '{chan['name']}' => best fuzzy score={best_score} < {FUZZY_MEDIUM_CONFIDENCE}, no ML fallback available") # Clean up ML models from memory after matching (infrequent operation) if _ml_model_cache['sentence_transformer'] is not None: logger.info("Cleaning up ML models from memory") _ml_model_cache['sentence_transformer'] = None gc.collect() # Send final progress update if send_progress: send_epg_matching_progress( total_channels, len(matched_channels), stage="completed" ) return { "channels_to_update": channels_to_update, "matched_channels": matched_channels } @shared_task def match_epg_channels(): """ Uses integrated EPG matching instead of external script. Provides the same functionality with better performance and maintainability. """ try: logger.info("Starting integrated EPG matching...") # Get region preference try: region_obj = CoreSettings.objects.get(key="preferred-region") region_code = region_obj.value.strip().lower() except CoreSettings.DoesNotExist: region_code = None # Get channels that don't have EPG data assigned channels_without_epg = Channel.objects.filter(epg_data__isnull=True) logger.info(f"Found {channels_without_epg.count()} channels without EPG data") channels_data = [] for channel in channels_without_epg: normalized_tvg_id = channel.tvg_id.strip().lower() if channel.tvg_id else "" normalized_gracenote_id = channel.tvc_guide_stationid.strip().lower() if channel.tvc_guide_stationid else "" channels_data.append({ "id": channel.id, "name": channel.name, "tvg_id": normalized_tvg_id, "original_tvg_id": channel.tvg_id, "gracenote_id": normalized_gracenote_id, "original_gracenote_id": channel.tvc_guide_stationid, "fallback_name": normalized_tvg_id if normalized_tvg_id else channel.name, "norm_chan": normalize_name(channel.name) # Always use channel name for fuzzy matching! }) # Get all EPG data from active sources, ordered by source priority (highest first) so we prefer higher priority matches epg_data = [] for epg in EPGData.objects.select_related('epg_source').filter(epg_source__is_active=True): normalized_tvg_id = epg.tvg_id.strip().lower() if epg.tvg_id else "" epg_data.append({ 'id': epg.id, 'tvg_id': normalized_tvg_id, 'original_tvg_id': epg.tvg_id, 'name': epg.name, 'norm_name': normalize_name(epg.name), 'epg_source_id': epg.epg_source.id if epg.epg_source else None, 'epg_source_priority': epg.epg_source.priority if epg.epg_source else 0, }) # Sort EPG data by source priority (highest first) so we prefer higher priority matches epg_data.sort(key=lambda x: x['epg_source_priority'], reverse=True) logger.info(f"Processing {len(channels_data)} channels against {len(epg_data)} EPG entries (from active sources only)") # Run EPG matching with progress updates - automatically uses conservative thresholds for bulk operations result = match_channels_to_epg(channels_data, epg_data, region_code, use_ml=True, send_progress=True) channels_to_update_dicts = result["channels_to_update"] matched_channels = result["matched_channels"] # Update channels in database if channels_to_update_dicts: channel_ids = [d["id"] for d in channels_to_update_dicts] channels_qs = Channel.objects.filter(id__in=channel_ids) channels_list = list(channels_qs) # Create mapping from channel_id to epg_data_id epg_mapping = {d["id"]: d["epg_data_id"] for d in channels_to_update_dicts} # Update each channel with matched EPG data for channel_obj in channels_list: epg_data_id = epg_mapping.get(channel_obj.id) if epg_data_id: try: epg_data_obj = EPGData.objects.get(id=epg_data_id) channel_obj.epg_data = epg_data_obj except EPGData.DoesNotExist: logger.error(f"EPG data {epg_data_id} not found for channel {channel_obj.id}") # Bulk update all channels Channel.objects.bulk_update(channels_list, ["epg_data"]) total_matched = len(matched_channels) if total_matched: logger.info(f"Match Summary: {total_matched} channel(s) matched.") for (cid, cname, tvg) in matched_channels: logger.info(f" - Channel ID={cid}, Name='{cname}' => tvg_id='{tvg}'") else: logger.info("No new channels were matched.") logger.info("Finished integrated EPG matching.") # Send WebSocket update channel_layer = get_channel_layer() associations = [ {"channel_id": chan["id"], "epg_data_id": chan["epg_data_id"]} for chan in channels_to_update_dicts ] async_to_sync(channel_layer.group_send)( 'updates', { 'type': 'update', "data": { "success": True, "type": "epg_match", "refresh_channels": True, "matches_count": total_matched, "message": f"EPG matching complete: {total_matched} channel(s) matched", "associations": associations } } ) return f"Done. Matched {total_matched} channel(s)." finally: # Clean up ML models from memory after bulk matching if _ml_model_cache['sentence_transformer'] is not None: logger.info("Cleaning up ML models from memory") _ml_model_cache['sentence_transformer'] = None # Memory cleanup gc.collect() from core.utils import cleanup_memory cleanup_memory(log_usage=True, force_collection=True) @shared_task def match_selected_channels_epg(channel_ids): """ Match EPG data for only the specified selected channels. Uses the same integrated EPG matching logic but processes only selected channels. """ try: logger.info(f"Starting integrated EPG matching for {len(channel_ids)} selected channels...") # Get region preference try: region_obj = CoreSettings.objects.get(key="preferred-region") region_code = region_obj.value.strip().lower() except CoreSettings.DoesNotExist: region_code = None # Get only the specified channels that don't have EPG data assigned channels_without_epg = Channel.objects.filter( id__in=channel_ids, epg_data__isnull=True ) logger.info(f"Found {channels_without_epg.count()} selected channels without EPG data") if not channels_without_epg.exists(): logger.info("No selected channels need EPG matching.") # Send WebSocket update channel_layer = get_channel_layer() async_to_sync(channel_layer.group_send)( 'updates', { 'type': 'update', "data": { "success": True, "type": "epg_match", "refresh_channels": True, "matches_count": 0, "message": "No selected channels need EPG matching", "associations": [] } } ) return "No selected channels needed EPG matching." channels_data = [] for channel in channels_without_epg: normalized_tvg_id = channel.tvg_id.strip().lower() if channel.tvg_id else "" normalized_gracenote_id = channel.tvc_guide_stationid.strip().lower() if channel.tvc_guide_stationid else "" channels_data.append({ "id": channel.id, "name": channel.name, "tvg_id": normalized_tvg_id, "original_tvg_id": channel.tvg_id, "gracenote_id": normalized_gracenote_id, "original_gracenote_id": channel.tvc_guide_stationid, "fallback_name": normalized_tvg_id if normalized_tvg_id else channel.name, "norm_chan": normalize_name(channel.name) }) # Get all EPG data from active sources, ordered by source priority (highest first) so we prefer higher priority matches epg_data = [] for epg in EPGData.objects.select_related('epg_source').filter(epg_source__is_active=True): normalized_tvg_id = epg.tvg_id.strip().lower() if epg.tvg_id else "" epg_data.append({ 'id': epg.id, 'tvg_id': normalized_tvg_id, 'original_tvg_id': epg.tvg_id, 'name': epg.name, 'norm_name': normalize_name(epg.name), 'epg_source_id': epg.epg_source.id if epg.epg_source else None, 'epg_source_priority': epg.epg_source.priority if epg.epg_source else 0, }) # Sort EPG data by source priority (highest first) so we prefer higher priority matches epg_data.sort(key=lambda x: x['epg_source_priority'], reverse=True) logger.info(f"Processing {len(channels_data)} selected channels against {len(epg_data)} EPG entries (from active sources only)") # Run EPG matching with progress updates - automatically uses appropriate thresholds result = match_channels_to_epg(channels_data, epg_data, region_code, use_ml=True, send_progress=True) channels_to_update_dicts = result["channels_to_update"] matched_channels = result["matched_channels"] # Update channels in database if channels_to_update_dicts: channel_ids_to_update = [d["id"] for d in channels_to_update_dicts] channels_qs = Channel.objects.filter(id__in=channel_ids_to_update) channels_list = list(channels_qs) # Create mapping from channel_id to epg_data_id epg_mapping = {d["id"]: d["epg_data_id"] for d in channels_to_update_dicts} # Update each channel with matched EPG data for channel_obj in channels_list: epg_data_id = epg_mapping.get(channel_obj.id) if epg_data_id: try: epg_data_obj = EPGData.objects.get(id=epg_data_id) channel_obj.epg_data = epg_data_obj except EPGData.DoesNotExist: logger.error(f"EPG data {epg_data_id} not found for channel {channel_obj.id}") # Bulk update all channels Channel.objects.bulk_update(channels_list, ["epg_data"]) total_matched = len(matched_channels) if total_matched: logger.info(f"Selected Channel Match Summary: {total_matched} channel(s) matched.") for (cid, cname, tvg) in matched_channels: logger.info(f" - Channel ID={cid}, Name='{cname}' => tvg_id='{tvg}'") else: logger.info("No selected channels were matched.") logger.info("Finished integrated EPG matching for selected channels.") # Send WebSocket update channel_layer = get_channel_layer() associations = [ {"channel_id": chan["id"], "epg_data_id": chan["epg_data_id"]} for chan in channels_to_update_dicts ] async_to_sync(channel_layer.group_send)( 'updates', { 'type': 'update', "data": { "success": True, "type": "epg_match", "refresh_channels": True, "matches_count": total_matched, "message": f"EPG matching complete: {total_matched} selected channel(s) matched", "associations": associations } } ) return f"Done. Matched {total_matched} selected channel(s)." finally: # Clean up ML models from memory after bulk matching if _ml_model_cache['sentence_transformer'] is not None: logger.info("Cleaning up ML models from memory") _ml_model_cache['sentence_transformer'] = None # Memory cleanup gc.collect() from core.utils import cleanup_memory cleanup_memory(log_usage=True, force_collection=True) @shared_task def match_single_channel_epg(channel_id): """ Try to match a single channel with EPG data using the integrated matching logic that includes both fuzzy and ML-enhanced matching. Returns a dict with match status and message. """ try: from apps.channels.models import Channel from apps.epg.models import EPGData logger.info(f"Starting integrated single channel EPG matching for channel ID {channel_id}") # Get the channel try: channel = Channel.objects.get(id=channel_id) except Channel.DoesNotExist: return {"matched": False, "message": "Channel not found"} # If channel already has EPG data, skip if channel.epg_data: return {"matched": False, "message": f"Channel '{channel.name}' already has EPG data assigned"} # Prepare single channel data for matching (same format as bulk matching) normalized_tvg_id = channel.tvg_id.strip().lower() if channel.tvg_id else "" normalized_gracenote_id = channel.tvc_guide_stationid.strip().lower() if channel.tvc_guide_stationid else "" channel_data = { "id": channel.id, "name": channel.name, "tvg_id": normalized_tvg_id, "original_tvg_id": channel.tvg_id, "gracenote_id": normalized_gracenote_id, "original_gracenote_id": channel.tvc_guide_stationid, "fallback_name": normalized_tvg_id if normalized_tvg_id else channel.name, "norm_chan": normalize_name(channel.name) # Always use channel name for fuzzy matching! } logger.info(f"Channel data prepared: name='{channel.name}', tvg_id='{normalized_tvg_id}', gracenote_id='{normalized_gracenote_id}', norm_chan='{channel_data['norm_chan']}'") # Debug: Test what the normalization does to preserve call signs test_name = "NBC 11 (KVLY) - Fargo" # Example for testing test_normalized = normalize_name(test_name) logger.debug(f"DEBUG normalization example: '{test_name}' → '{test_normalized}' (call sign preserved)") # Get all EPG data for matching from active sources - must include norm_name field # Ordered by source priority (highest first) so we prefer higher priority matches epg_data_list = [] for epg in EPGData.objects.select_related('epg_source').filter(epg_source__is_active=True, name__isnull=False).exclude(name=''): normalized_epg_tvg_id = epg.tvg_id.strip().lower() if epg.tvg_id else "" epg_data_list.append({ 'id': epg.id, 'tvg_id': normalized_epg_tvg_id, 'original_tvg_id': epg.tvg_id, 'name': epg.name, 'norm_name': normalize_name(epg.name), 'epg_source_id': epg.epg_source.id if epg.epg_source else None, 'epg_source_priority': epg.epg_source.priority if epg.epg_source else 0, }) # Sort EPG data by source priority (highest first) so we prefer higher priority matches epg_data_list.sort(key=lambda x: x['epg_source_priority'], reverse=True) if not epg_data_list: return {"matched": False, "message": "No EPG data available for matching (from active sources)"} logger.info(f"Matching single channel '{channel.name}' against {len(epg_data_list)} EPG entries") # Send progress for single channel matching send_epg_matching_progress(1, 0, current_channel_name=channel.name, stage="matching") # Use the EPG matching function - automatically uses aggressive thresholds for single channel result = match_channels_to_epg([channel_data], epg_data_list, send_progress=False) channels_to_update = result.get("channels_to_update", []) matched_channels = result.get("matched_channels", []) if channels_to_update: # Find our channel in the results channel_match = None for update in channels_to_update: if update["id"] == channel.id: channel_match = update break if channel_match: # Apply the match to the channel try: epg_data = EPGData.objects.get(id=channel_match['epg_data_id']) channel.epg_data = epg_data channel.save(update_fields=["epg_data"]) # Find match details from matched_channels for better reporting match_details = None for match_info in matched_channels: if match_info[0] == channel.id: # matched_channels format: (channel_id, channel_name, epg_info) match_details = match_info break success_msg = f"Channel '{channel.name}' matched with EPG '{epg_data.name}'" if match_details: success_msg += f" (matched via: {match_details[2]})" logger.info(success_msg) # Send completion progress for single channel send_epg_matching_progress(1, 1, current_channel_name=channel.name, stage="completed") # Clean up ML models from memory after single channel matching if _ml_model_cache['sentence_transformer'] is not None: logger.info("Cleaning up ML models from memory") _ml_model_cache['sentence_transformer'] = None gc.collect() return { "matched": True, "message": success_msg, "epg_name": epg_data.name, "epg_id": epg_data.id } except EPGData.DoesNotExist: return {"matched": False, "message": "Matched EPG data not found"} # No match found # Send completion progress for single channel (failed) send_epg_matching_progress(1, 0, current_channel_name=channel.name, stage="completed") # Clean up ML models from memory after single channel matching if _ml_model_cache['sentence_transformer'] is not None: logger.info("Cleaning up ML models from memory") _ml_model_cache['sentence_transformer'] = None gc.collect() return { "matched": False, "message": f"No suitable EPG match found for channel '{channel.name}'" } except Exception as e: logger.error(f"Error in integrated single channel EPG matching: {e}", exc_info=True) # Clean up ML models from memory even on error if _ml_model_cache['sentence_transformer'] is not None: logger.info("Cleaning up ML models from memory after error") _ml_model_cache['sentence_transformer'] = None gc.collect() return {"matched": False, "message": f"Error during matching: {str(e)}"} def evaluate_series_rules_impl(tvg_id: str | None = None): """Synchronous implementation of series rule evaluation; returns details for debugging.""" from django.utils import timezone from apps.channels.models import Recording, Channel from apps.epg.models import EPGData, ProgramData rules = CoreSettings.get_dvr_series_rules() result = {"scheduled": 0, "details": []} if not isinstance(rules, list) or not rules: return result # Optionally filter for tvg_id if tvg_id: rules = [r for r in rules if str(r.get("tvg_id")) == str(tvg_id)] if not rules: result["details"].append({"tvg_id": tvg_id, "status": "no_rule"}) return result now = timezone.now() horizon = now + timedelta(days=7) # Preload existing recordings' program ids to avoid duplicates existing_program_ids = set() for rec in Recording.objects.all().only("custom_properties"): try: pid = rec.custom_properties.get("program", {}).get("id") if rec.custom_properties else None if pid is not None: # Normalize to string for consistent comparisons existing_program_ids.add(str(pid)) except Exception: continue for rule in rules: rv_tvg = str(rule.get("tvg_id") or "").strip() mode = (rule.get("mode") or "all").lower() series_title = (rule.get("title") or "").strip() norm_series = normalize_name(series_title) if series_title else None if not rv_tvg: result["details"].append({"tvg_id": rv_tvg, "status": "invalid_rule"}) continue epg = EPGData.objects.filter(tvg_id=rv_tvg).first() if not epg: result["details"].append({"tvg_id": rv_tvg, "status": "no_epg_match"}) continue programs_qs = ProgramData.objects.filter( epg=epg, start_time__gte=now, start_time__lte=horizon, ) if series_title: programs_qs = programs_qs.filter(title__iexact=series_title) programs = list(programs_qs.order_by("start_time")) # Fallback: if no direct matches and we have a title, try normalized comparison in Python if series_title and not programs: all_progs = ProgramData.objects.filter( epg=epg, start_time__gte=now, start_time__lte=horizon, ).only("id", "title", "start_time", "end_time", "custom_properties", "tvg_id") programs = [p for p in all_progs if normalize_name(p.title) == norm_series] channel = Channel.objects.filter(epg_data=epg).order_by("channel_number").first() if not channel: result["details"].append({"tvg_id": rv_tvg, "status": "no_channel_for_epg"}) continue # # Many providers list multiple future airings of the same episode # (e.g., prime-time and a late-night repeat). Previously we scheduled # a recording for each airing which shows up as duplicates in the DVR. # # To avoid that, we collapse programs to the earliest airing per # unique episode using the best identifier available: # - season+episode from ProgramData.custom_properties # - onscreen_episode (e.g., S08E03) # - sub_title (episode name), scoped by tvg_id+series title # If none of the above exist, we fall back to keeping each program # (usually movies or specials without episode identifiers). # def _episode_key(p: "ProgramData"): try: props = p.custom_properties or {} season = props.get("season") episode = props.get("episode") onscreen = props.get("onscreen_episode") except Exception: season = episode = onscreen = None base = f"{p.tvg_id or ''}|{(p.title or '').strip().lower()}" # series scope if season is not None and episode is not None: return f"{base}|s{season}e{episode}" if onscreen: return f"{base}|{str(onscreen).strip().lower()}" if p.sub_title: return f"{base}|{p.sub_title.strip().lower()}" # No reliable episode identity; use the program id to avoid over-merging return f"id:{p.id}" # Optionally filter to only brand-new episodes before grouping if mode == "new": filtered = [] for p in programs: try: if (p.custom_properties or {}).get("new"): filtered.append(p) except Exception: pass programs = filtered # Pick the earliest airing for each episode key earliest_by_key = {} for p in programs: k = _episode_key(p) cur = earliest_by_key.get(k) if cur is None or p.start_time < cur.start_time: earliest_by_key[k] = p unique_programs = list(earliest_by_key.values()) created_here = 0 for prog in unique_programs: try: # Skip if already scheduled by program id if str(prog.id) in existing_program_ids: continue # Extra guard: skip if a recording exists for the same channel + timeslot try: from django.db.models import Q if Recording.objects.filter( channel=channel, start_time=prog.start_time, end_time=prog.end_time, ).filter(Q(custom_properties__program__id=prog.id) | Q(custom_properties__program__title=prog.title)).exists(): continue except Exception: continue # already scheduled/recorded # Apply global DVR pre/post offsets (in minutes) try: pre_min = int(CoreSettings.get_dvr_pre_offset_minutes()) except Exception: pre_min = 0 try: post_min = int(CoreSettings.get_dvr_post_offset_minutes()) except Exception: post_min = 0 adj_start = prog.start_time adj_end = prog.end_time try: if pre_min and pre_min > 0: adj_start = adj_start - timedelta(minutes=pre_min) except Exception: pass try: if post_min and post_min > 0: adj_end = adj_end + timedelta(minutes=post_min) except Exception: pass rec = Recording.objects.create( channel=channel, start_time=adj_start, end_time=adj_end, custom_properties={ "program": { "id": prog.id, "tvg_id": prog.tvg_id, "title": prog.title, "sub_title": prog.sub_title, "description": prog.description, "start_time": prog.start_time.isoformat(), "end_time": prog.end_time.isoformat(), } }, ) existing_program_ids.add(str(prog.id)) created_here += 1 try: prefetch_recording_artwork.apply_async(args=[rec.id], countdown=1) except Exception: pass except Exception as e: result["details"].append({"tvg_id": rv_tvg, "status": "error", "error": str(e)}) continue result["scheduled"] += created_here result["details"].append({"tvg_id": rv_tvg, "title": series_title, "status": "ok", "created": created_here}) # Notify frontend to refresh try: channel_layer = get_channel_layer() async_to_sync(channel_layer.group_send)( 'updates', {'type': 'update', 'data': {"success": True, "type": "recordings_refreshed", "scheduled": result["scheduled"]}}, ) except Exception: pass return result @shared_task def evaluate_series_rules(tvg_id: str | None = None): return evaluate_series_rules_impl(tvg_id) def reschedule_upcoming_recordings_for_offset_change_impl(): """Recalculate start/end for all future EPG-based recordings using current DVR offsets. Only recordings that have not yet started (start_time > now) and that were scheduled from EPG data (custom_properties.program present) are updated. """ from django.utils import timezone from django.utils.dateparse import parse_datetime from apps.channels.models import Recording now = timezone.now() try: pre_min = int(CoreSettings.get_dvr_pre_offset_minutes()) except Exception: pre_min = 0 try: post_min = int(CoreSettings.get_dvr_post_offset_minutes()) except Exception: post_min = 0 changed = 0 scanned = 0 for rec in Recording.objects.filter(start_time__gt=now).iterator(): scanned += 1 try: cp = rec.custom_properties or {} program = cp.get("program") if isinstance(cp, dict) else None if not isinstance(program, dict): continue base_start = program.get("start_time") base_end = program.get("end_time") if not base_start or not base_end: continue start_dt = parse_datetime(str(base_start)) end_dt = parse_datetime(str(base_end)) if start_dt is None or end_dt is None: continue adj_start = start_dt adj_end = end_dt try: if pre_min and pre_min > 0: adj_start = adj_start - timedelta(minutes=pre_min) except Exception: pass try: if post_min and post_min > 0: adj_end = adj_end + timedelta(minutes=post_min) except Exception: pass if rec.start_time != adj_start or rec.end_time != adj_end: rec.start_time = adj_start rec.end_time = adj_end rec.save(update_fields=["start_time", "end_time"]) changed += 1 except Exception: continue # Notify frontend to refresh try: channel_layer = get_channel_layer() async_to_sync(channel_layer.group_send)( 'updates', {'type': 'update', 'data': {"success": True, "type": "recordings_refreshed", "rescheduled": changed}}, ) except Exception: pass return {"changed": changed, "scanned": scanned, "pre": pre_min, "post": post_min} @shared_task def reschedule_upcoming_recordings_for_offset_change(): return reschedule_upcoming_recordings_for_offset_change_impl() def _notify_recordings_refresh(): try: from core.utils import send_websocket_update send_websocket_update('updates', 'update', {"success": True, "type": "recordings_refreshed"}) except Exception: pass def purge_recurring_rule_impl(rule_id: int) -> int: """Remove all future recordings created by a recurring rule.""" from django.utils import timezone from .models import Recording now = timezone.now() try: removed, _ = Recording.objects.filter( start_time__gte=now, custom_properties__rule__id=rule_id, ).delete() except Exception: removed = 0 if removed: _notify_recordings_refresh() return removed def sync_recurring_rule_impl(rule_id: int, drop_existing: bool = True, horizon_days: int = 14) -> int: """Ensure recordings exist for a recurring rule within the scheduling horizon.""" from django.utils import timezone from .models import RecurringRecordingRule, Recording rule = RecurringRecordingRule.objects.filter(pk=rule_id).select_related("channel").first() now = timezone.now() removed = 0 if drop_existing: removed = purge_recurring_rule_impl(rule_id) if not rule or not rule.enabled: return 0 days = rule.cleaned_days() if not days: return 0 tz_name = CoreSettings.get_system_time_zone() try: tz = ZoneInfo(tz_name) except Exception: logger.warning("Invalid or unsupported time zone '%s'; falling back to Server default", tz_name) tz = timezone.get_current_timezone() start_limit = rule.start_date or now.date() end_limit = rule.end_date horizon = now + timedelta(days=horizon_days) start_window = max(start_limit, now.date()) if drop_existing and end_limit: end_window = end_limit else: end_window = horizon.date() if end_limit and end_limit < end_window: end_window = end_limit if end_window < start_window: return 0 total_created = 0 for offset in range((end_window - start_window).days + 1): target_date = start_window + timedelta(days=offset) if target_date.weekday() not in days: continue if end_limit and target_date > end_limit: continue try: start_dt = timezone.make_aware(datetime.combine(target_date, rule.start_time), tz) end_dt = timezone.make_aware(datetime.combine(target_date, rule.end_time), tz) except Exception: continue if end_dt <= start_dt: end_dt = end_dt + timedelta(days=1) if start_dt <= now: continue exists = Recording.objects.filter( channel=rule.channel, start_time=start_dt, custom_properties__rule__id=rule.id, ).exists() if exists: continue description = rule.name or f"Recurring recording for {rule.channel.name}" cp = { "rule": { "type": "recurring", "id": rule.id, "days_of_week": days, "name": rule.name or "", }, "status": "scheduled", "description": description, "program": { "title": rule.name or rule.channel.name, "description": description, "start_time": start_dt.isoformat(), "end_time": end_dt.isoformat(), }, } try: Recording.objects.create( channel=rule.channel, start_time=start_dt, end_time=end_dt, custom_properties=cp, ) total_created += 1 except Exception as err: logger.warning(f"Failed to create recurring recording for rule {rule.id}: {err}") if removed or total_created: _notify_recordings_refresh() return total_created @shared_task def rebuild_recurring_rule(rule_id: int, horizon_days: int = 14): return sync_recurring_rule_impl(rule_id, drop_existing=True, horizon_days=horizon_days) @shared_task def maintain_recurring_recordings(): from .models import RecurringRecordingRule total = 0 for rule_id in RecurringRecordingRule.objects.filter(enabled=True).values_list("id", flat=True): try: total += sync_recurring_rule_impl(rule_id, drop_existing=False) except Exception as err: logger.warning(f"Recurring rule maintenance failed for {rule_id}: {err}") return total @shared_task def purge_recurring_rule(rule_id: int): return purge_recurring_rule_impl(rule_id) @shared_task def _safe_name(s): try: import re s = s or "" # Remove forbidden filename characters and normalize spaces s = re.sub(r'[\\/:*?"<>|]+', '', s) s = s.strip() return s except Exception: return s or "" def _parse_epg_tv_movie_info(program): """Return tuple (is_movie, season, episode, year, sub_title) from EPG ProgramData if available.""" is_movie = False season = None episode = None year = None sub_title = program.get('sub_title') if isinstance(program, dict) else None try: from apps.epg.models import ProgramData prog_id = program.get('id') if isinstance(program, dict) else None epg_program = ProgramData.objects.filter(id=prog_id).only('custom_properties').first() if prog_id else None if epg_program and epg_program.custom_properties: cp = epg_program.custom_properties # Determine categories cats = [c.lower() for c in (cp.get('categories') or []) if isinstance(c, str)] is_movie = 'movie' in cats or 'film' in cats season = cp.get('season') episode = cp.get('episode') onscreen = cp.get('onscreen_episode') if (season is None or episode is None) and isinstance(onscreen, str): import re as _re m = _re.search(r'[sS](\d+)[eE](\d+)', onscreen) if m: season = season or int(m.group(1)) episode = episode or int(m.group(2)) d = cp.get('date') if d: year = str(d)[:4] except Exception: pass return is_movie, season, episode, year, sub_title def _build_output_paths(channel, program, start_time, end_time): """ Build (final_path, temp_ts_path, final_filename) using DVR templates. """ from core.models import CoreSettings # Root for DVR recordings: fixed to /data/recordings inside the container library_root = '/data/recordings' is_movie, season, episode, year, sub_title = _parse_epg_tv_movie_info(program) show = _safe_name(program.get('title') if isinstance(program, dict) else channel.name) title = _safe_name(program.get('title') if isinstance(program, dict) else channel.name) sub_title = _safe_name(sub_title) season = int(season) if season is not None else 0 episode = int(episode) if episode is not None else 0 year = year or str(start_time.year) values = { 'show': show, 'title': title, 'sub_title': sub_title, 'season': season, 'episode': episode, 'year': year, 'channel': _safe_name(channel.name), 'start': start_time.strftime('%Y%m%d_%H%M%S'), 'end': end_time.strftime('%Y%m%d_%H%M%S'), } template = CoreSettings.get_dvr_movie_template() if is_movie else CoreSettings.get_dvr_tv_template() # Build relative path from templates with smart fallbacks rel_path = None if not is_movie and (season == 0 or episode == 0): # TV fallback template when S/E are missing try: tv_fb = CoreSettings.get_dvr_tv_fallback_template() rel_path = tv_fb.format(**values) except Exception: # Older setting support try: fallback_root = CoreSettings.get_dvr_tv_fallback_dir() except Exception: fallback_root = "TV_Shows" rel_path = f"{fallback_root}/{show}/{values['start']}.mkv" if not rel_path: try: rel_path = template.format(**values) except Exception: rel_path = None # Movie-specific fallback if formatting failed or title missing if is_movie and not rel_path: try: m_fb = CoreSettings.get_dvr_movie_fallback_template() rel_path = m_fb.format(**values) except Exception: rel_path = f"Movies/{values['start']}.mkv" # As a last resort for TV if not is_movie and not rel_path: rel_path = f"TV_Shows/{show}/S{season:02d}E{episode:02d}.mkv" # Keep any leading folder like 'Recordings/' from the template so users can # structure their library under /data as desired. if not rel_path.lower().endswith('.mkv'): rel_path = f"{rel_path}.mkv" # Normalize path (strip ./) if rel_path.startswith('./'): rel_path = rel_path[2:] final_path = rel_path if rel_path.startswith('/') else os.path.join(library_root, rel_path) final_path = os.path.normpath(final_path) # Ensure directory exists os.makedirs(os.path.dirname(final_path), exist_ok=True) # Derive temp TS path in same directory base_no_ext = os.path.splitext(os.path.basename(final_path))[0] temp_ts_path = os.path.join(os.path.dirname(final_path), f"{base_no_ext}.ts") return final_path, temp_ts_path, os.path.basename(final_path) @shared_task def run_recording(recording_id, channel_id, start_time_str, end_time_str): """ Execute a scheduled recording for the given channel/recording. Enhancements: - Accepts recording_id so we can persist metadata back to the Recording row - Persists basic file info (name/path) to Recording.custom_properties - Attempts to capture stream stats from TS proxy (codec, resolution, fps, etc.) - Attempts to capture a poster (via program.custom_properties) and store a Logo reference """ channel = Channel.objects.get(id=channel_id) start_time = datetime.fromisoformat(start_time_str) end_time = datetime.fromisoformat(end_time_str) duration_seconds = int((end_time - start_time).total_seconds()) # Build output paths from templates # We need program info; will refine after we load Recording cp below filename = None final_path = None temp_ts_path = None channel_layer = get_channel_layer() async_to_sync(channel_layer.group_send)( "updates", { "type": "update", "data": {"success": True, "type": "recording_started", "channel": channel.name} }, ) logger.info(f"Starting recording for channel {channel.name}") # Log system event for recording start try: from core.utils import log_system_event log_system_event( 'recording_start', channel_id=channel.uuid, channel_name=channel.name, recording_id=recording_id ) except Exception as e: logger.error(f"Could not log recording start event: {e}") # Try to resolve the Recording row up front recording_obj = None try: from .models import Recording, Logo recording_obj = Recording.objects.get(id=recording_id) # Prime custom_properties with file info/status cp = recording_obj.custom_properties or {} cp.update({ "status": "recording", "started_at": str(datetime.now()), }) # Provide a predictable playback URL for the frontend cp["file_url"] = f"/api/channels/recordings/{recording_id}/file/" cp["output_file_url"] = cp["file_url"] # Determine program info (may include id for deeper details) program = cp.get("program") or {} final_path, temp_ts_path, filename = _build_output_paths(channel, program, start_time, end_time) cp["file_name"] = filename cp["file_path"] = final_path cp["_temp_file_path"] = temp_ts_path # Resolve poster the same way VODs do: # 1) Prefer image(s) from EPG Program custom_properties (images/icon) # 2) Otherwise reuse an existing VOD logo matching title (Movie/Series) # 3) Otherwise save any direct poster URL from provided program fields program = (cp.get("program") or {}) if isinstance(cp, dict) else {} def pick_best_image_from_epg_props(epg_props): try: images = epg_props.get("images") or [] if not isinstance(images, list): return None # Prefer poster/cover and larger sizes size_order = {"xxl": 6, "xl": 5, "l": 4, "m": 3, "s": 2, "xs": 1} def score(img): t = (img.get("type") or "").lower() size = (img.get("size") or "").lower() return ( 2 if t in ("poster", "cover") else 1, size_order.get(size, 0) ) best = None for im in images: if not isinstance(im, dict): continue url = im.get("url") if not url: continue if best is None or score(im) > score(best): best = im return best.get("url") if best else None except Exception: return None poster_logo_id = None poster_url = None # Try EPG Program custom_properties by ID try: from apps.epg.models import ProgramData prog_id = program.get("id") if prog_id: epg_program = ProgramData.objects.filter(id=prog_id).only("custom_properties").first() if epg_program and epg_program.custom_properties: epg_props = epg_program.custom_properties or {} poster_url = pick_best_image_from_epg_props(epg_props) if not poster_url: icon = epg_props.get("icon") if isinstance(icon, str) and icon: poster_url = icon except Exception as e: logger.debug(f"EPG image lookup failed: {e}") # Fallback: reuse VOD Logo by matching title if not poster_url and not poster_logo_id: try: from apps.vod.models import Movie, Series title = program.get("title") or channel.name vod_logo = None movie = Movie.objects.filter(name__iexact=title).select_related("logo").first() if movie and movie.logo: vod_logo = movie.logo if not vod_logo: series = Series.objects.filter(name__iexact=title).select_related("logo").first() if series and series.logo: vod_logo = series.logo if vod_logo: poster_logo_id = vod_logo.id except Exception as e: logger.debug(f"VOD logo fallback failed: {e}") # External metadata lookups (TMDB/OMDb) when EPG/VOD didn't provide an image if not poster_url and not poster_logo_id: try: tmdb_key = os.environ.get('TMDB_API_KEY') omdb_key = os.environ.get('OMDB_API_KEY') title = (program.get('title') or channel.name or '').strip() year = None imdb_id = None # Try to derive year and imdb from EPG program custom_properties try: from apps.epg.models import ProgramData prog_id = program.get('id') epg_program = ProgramData.objects.filter(id=prog_id).only('custom_properties').first() if prog_id else None if epg_program and epg_program.custom_properties: d = epg_program.custom_properties.get('date') if d and len(str(d)) >= 4: year = str(d)[:4] imdb_id = epg_program.custom_properties.get('imdb.com_id') or imdb_id except Exception: pass # TMDB: by IMDb ID if not poster_url and tmdb_key and imdb_id: try: url = f"https://api.themoviedb.org/3/find/{quote(imdb_id)}?api_key={tmdb_key}&external_source=imdb_id" resp = requests.get(url, timeout=5) if resp.ok: data = resp.json() or {} picks = [] for k in ('movie_results', 'tv_results', 'tv_episode_results', 'tv_season_results'): lst = data.get(k) or [] picks.extend(lst) poster_path = None for item in picks: if item.get('poster_path'): poster_path = item['poster_path'] break if poster_path: poster_url = f"https://image.tmdb.org/t/p/w780{poster_path}" except Exception: pass # TMDB: by title (and year if available) if not poster_url and tmdb_key and title: try: q = quote(title) extra = f"&year={year}" if year else "" url = f"https://api.themoviedb.org/3/search/multi?api_key={tmdb_key}&query={q}{extra}" resp = requests.get(url, timeout=5) if resp.ok: data = resp.json() or {} results = data.get('results') or [] results.sort(key=lambda x: float(x.get('popularity') or 0), reverse=True) for item in results: if item.get('poster_path'): poster_url = f"https://image.tmdb.org/t/p/w780{item['poster_path']}" break except Exception: pass # OMDb fallback if not poster_url and omdb_key: try: if imdb_id: url = f"https://www.omdbapi.com/?apikey={omdb_key}&i={quote(imdb_id)}" elif title: yy = f"&y={year}" if year else "" url = f"https://www.omdbapi.com/?apikey={omdb_key}&t={quote(title)}{yy}" else: url = None if url: resp = requests.get(url, timeout=5) if resp.ok: data = resp.json() or {} p = data.get('Poster') if p and p != 'N/A': poster_url = p except Exception: pass except Exception as e: logger.debug(f"External poster lookup failed: {e}") # Keyless fallback providers (no API keys required) if not poster_url and not poster_logo_id: try: title = (program.get('title') or channel.name or '').strip() if title: # 1) TVMaze (TV shows) - singlesearch by title try: url = f"https://api.tvmaze.com/singlesearch/shows?q={quote(title)}" resp = requests.get(url, timeout=5) if resp.ok: data = resp.json() or {} img = (data.get('image') or {}) p = img.get('original') or img.get('medium') if p: poster_url = p except Exception: pass # 2) iTunes Search API (movies or tv shows) if not poster_url: try: for media in ('movie', 'tvShow'): url = f"https://itunes.apple.com/search?term={quote(title)}&media={media}&limit=1" resp = requests.get(url, timeout=5) if resp.ok: data = resp.json() or {} results = data.get('results') or [] if results: art = results[0].get('artworkUrl100') if art: # Scale up to 600x600 by convention poster_url = art.replace('100x100', '600x600') break except Exception: pass except Exception as e: logger.debug(f"Keyless poster lookup failed: {e}") # Last: check direct fields on provided program object if not poster_url and not poster_logo_id: for key in ("poster", "cover", "cover_big", "image", "icon"): val = program.get(key) if isinstance(val, dict): candidate = val.get("url") if candidate: poster_url = candidate break elif isinstance(val, str) and val: poster_url = val break # Create or assign Logo if not poster_logo_id and poster_url and len(poster_url) <= 1000: try: logo, _ = Logo.objects.get_or_create(url=poster_url, defaults={"name": program.get("title") or channel.name}) poster_logo_id = logo.id except Exception as e: logger.debug(f"Unable to persist poster to Logo: {e}") if poster_logo_id: cp["poster_logo_id"] = poster_logo_id if poster_url and "poster_url" not in cp: cp["poster_url"] = poster_url # Ensure destination exists so it's visible immediately try: os.makedirs(os.path.dirname(final_path), exist_ok=True) if not os.path.exists(final_path): open(final_path, 'ab').close() except Exception: pass recording_obj.custom_properties = cp recording_obj.save(update_fields=["custom_properties"]) except Exception as e: logger.debug(f"Unable to prime Recording metadata: {e}") interrupted = False interrupted_reason = None bytes_written = 0 from requests.exceptions import ReadTimeout, ConnectionError as ReqConnectionError, ChunkedEncodingError # Determine internal base URL(s) for TS streaming # Prefer explicit override, then try common ports for debug and docker explicit = os.environ.get('DISPATCHARR_INTERNAL_TS_BASE_URL') is_dev = (os.environ.get('DISPATCHARR_ENV', '').lower() == 'dev') or \ (os.environ.get('DISPATCHARR_DEBUG', '').lower() == 'true') or \ (os.environ.get('REDIS_HOST', 'redis') in ('localhost', '127.0.0.1')) candidates = [] if explicit: candidates.append(explicit) if is_dev: # Debug container typically exposes API on 5656 candidates.extend(['http://127.0.0.1:5656', 'http://127.0.0.1:9191']) # Docker service name fallback candidates.append(os.environ.get('DISPATCHARR_INTERNAL_API_BASE', 'http://web:9191')) # Last-resort localhost ports candidates.extend(['http://localhost:5656', 'http://localhost:9191']) chosen_base = None last_error = None bytes_written = 0 interrupted = False interrupted_reason = None # We'll attempt each base until we receive some data for base in candidates: try: test_url = f"{base.rstrip('/')}/proxy/ts/stream/{channel.uuid}" logger.info(f"DVR: trying TS base {base} -> {test_url}") with requests.get( test_url, headers={ 'User-Agent': 'Dispatcharr-DVR', }, stream=True, timeout=(10, 15), ) as response: response.raise_for_status() # Open the file and start copying; if we get any data within a short window, accept this base got_any_data = False test_window = 3.0 # seconds to detect first bytes window_start = time.time() with open(temp_ts_path, 'wb') as file: started_at = time.time() for chunk in response.iter_content(chunk_size=8192): if not chunk: # keep-alives may be empty; continue if not got_any_data and (time.time() - window_start) > test_window: break continue # We have data got_any_data = True chosen_base = base # Fall through to full recording loop using this same response/connection file.write(chunk) bytes_written += len(chunk) elapsed = time.time() - started_at if elapsed > duration_seconds: break # Continue draining the stream for chunk2 in response.iter_content(chunk_size=8192): if not chunk2: continue file.write(chunk2) bytes_written += len(chunk2) elapsed = time.time() - started_at if elapsed > duration_seconds: break break # exit outer for-loop once we switched to full drain # If we wrote any bytes, treat as success and stop trying candidates if bytes_written > 0: logger.info(f"DVR: selected TS base {base}; wrote initial {bytes_written} bytes") break else: last_error = f"no_data_from_{base}" logger.warning(f"DVR: no data received from {base} within {test_window}s, trying next base") # Clean up empty temp file try: if os.path.exists(temp_ts_path) and os.path.getsize(temp_ts_path) == 0: os.remove(temp_ts_path) except Exception: pass except Exception as e: last_error = str(e) logger.warning(f"DVR: attempt failed for base {base}: {e}") if chosen_base is None and bytes_written == 0: interrupted = True interrupted_reason = f"no_stream_data: {last_error or 'all_bases_failed'}" else: # If we ended before reaching planned duration, record reason actual_elapsed = 0 try: actual_elapsed = os.path.getsize(temp_ts_path) and (duration_seconds) # Best effort; we streamed until duration or disconnect above except Exception: pass # We cannot compute accurate elapsed here; fine to leave as is pass # If no bytes were written at all, mark detail if bytes_written == 0 and not interrupted: interrupted = True interrupted_reason = f"no_stream_data: {last_error or 'unknown'}" # Update DB status immediately so the UI reflects the change on the event below try: if recording_obj is None: from .models import Recording recording_obj = Recording.objects.get(id=recording_id) cp_now = recording_obj.custom_properties or {} cp_now.update({ "status": "interrupted" if interrupted else "completed", "ended_at": str(datetime.now()), "file_name": filename or cp_now.get("file_name"), "file_path": final_path or cp_now.get("file_path"), }) if interrupted and interrupted_reason: cp_now["interrupted_reason"] = interrupted_reason recording_obj.custom_properties = cp_now recording_obj.save(update_fields=["custom_properties"]) except Exception as e: logger.debug(f"Failed to update immediate recording status: {e}") async_to_sync(channel_layer.group_send)( "updates", { "type": "update", "data": {"success": True, "type": "recording_ended", "channel": channel.name} }, ) # After the loop, the file and response are closed automatically. logger.info(f"Finished recording for channel {channel.name}") # Log system event for recording end try: from core.utils import log_system_event log_system_event( 'recording_end', channel_id=channel.uuid, channel_name=channel.name, recording_id=recording_id, interrupted=interrupted, bytes_written=bytes_written ) except Exception as e: logger.error(f"Could not log recording end event: {e}") # Remux TS to MKV container remux_success = False try: if temp_ts_path and os.path.exists(temp_ts_path): subprocess.run([ "ffmpeg", "-y", "-i", temp_ts_path, "-c", "copy", final_path ], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) remux_success = os.path.exists(final_path) # Clean up temp file on success if remux_success: try: os.remove(temp_ts_path) except Exception: pass except Exception as e: logger.warning(f"MKV remux failed: {e}") # Persist final metadata to Recording (status, ended_at, and stream stats if available) try: if recording_obj is None: from .models import Recording recording_obj = Recording.objects.get(id=recording_id) cp = recording_obj.custom_properties or {} cp.update({ "ended_at": str(datetime.now()), }) if interrupted: cp["status"] = "interrupted" if interrupted_reason: cp["interrupted_reason"] = interrupted_reason else: cp["status"] = "completed" cp["bytes_written"] = bytes_written cp["remux_success"] = remux_success # Try to get stream stats from TS proxy Redis metadata try: from core.utils import RedisClient from apps.proxy.ts_proxy.redis_keys import RedisKeys from apps.proxy.ts_proxy.constants import ChannelMetadataField r = RedisClient.get_client() if r is not None: metadata_key = RedisKeys.channel_metadata(str(channel.uuid)) md = r.hgetall(metadata_key) if md: def _gv(bkey): return md.get(bkey.encode('utf-8')) def _d(bkey, cast=str): v = _gv(bkey) try: if v is None: return None s = v.decode('utf-8') return cast(s) if cast is not str else s except Exception: return None stream_info = {} # Video fields for key, caster in [ (ChannelMetadataField.VIDEO_CODEC, str), (ChannelMetadataField.RESOLUTION, str), (ChannelMetadataField.WIDTH, float), (ChannelMetadataField.HEIGHT, float), (ChannelMetadataField.SOURCE_FPS, float), (ChannelMetadataField.PIXEL_FORMAT, str), (ChannelMetadataField.VIDEO_BITRATE, float), ]: val = _d(key, caster) if val is not None: stream_info[key] = val # Audio fields for key, caster in [ (ChannelMetadataField.AUDIO_CODEC, str), (ChannelMetadataField.SAMPLE_RATE, float), (ChannelMetadataField.AUDIO_CHANNELS, str), (ChannelMetadataField.AUDIO_BITRATE, float), ]: val = _d(key, caster) if val is not None: stream_info[key] = val if stream_info: cp["stream_info"] = stream_info except Exception as e: logger.debug(f"Unable to capture stream stats for recording: {e}") # Removed: local thumbnail generation. We rely on EPG/VOD/TMDB/OMDb/keyless providers only. recording_obj.custom_properties = cp recording_obj.save(update_fields=["custom_properties"]) except Exception as e: logger.debug(f"Unable to finalize Recording metadata: {e}") # Optionally run comskip post-process try: from core.models import CoreSettings if CoreSettings.get_dvr_comskip_enabled(): comskip_process_recording.delay(recording_id) except Exception: pass @shared_task def recover_recordings_on_startup(): """ On service startup, reschedule or resume recordings to handle server restarts. - For recordings whose window includes 'now': mark interrupted and start a new recording for the remainder. - For future recordings: ensure a task is scheduled at start_time. Uses a Redis lock to ensure only one worker runs this recovery. """ try: from django.utils import timezone from .models import Recording from core.utils import RedisClient from .signals import schedule_recording_task redis = RedisClient.get_client() if redis: lock_key = "dvr:recover_lock" # Set lock with 60s TTL; only first winner proceeds if not redis.set(lock_key, "1", ex=60, nx=True): return "Recovery already in progress" now = timezone.now() # Resume in-window recordings active = Recording.objects.filter(start_time__lte=now, end_time__gt=now) for rec in active: try: cp = rec.custom_properties or {} # Mark interrupted due to restart; will flip to 'recording' when task starts cp["status"] = "interrupted" cp["interrupted_reason"] = "server_restarted" rec.custom_properties = cp rec.save(update_fields=["custom_properties"]) # Start recording for remaining window run_recording.apply_async( args=[rec.id, rec.channel_id, str(now), str(rec.end_time)], eta=now ) except Exception as e: logger.warning(f"Failed to resume recording {rec.id}: {e}") # Ensure future recordings are scheduled upcoming = Recording.objects.filter(start_time__gt=now, end_time__gt=now) for rec in upcoming: try: # Schedule task at start_time task_id = schedule_recording_task(rec) if task_id: rec.task_id = task_id rec.save(update_fields=["task_id"]) except Exception as e: logger.warning(f"Failed to schedule recording {rec.id}: {e}") return "Recovery complete" except Exception as e: logger.error(f"Error during DVR recovery: {e}") return f"Error: {e}" @shared_task def comskip_process_recording(recording_id: int): """Run comskip on the MKV to remove commercials and replace the file in place. Safe to call even if comskip is not installed; stores status in custom_properties.comskip. """ import shutil from django.db import DatabaseError from .models import Recording # Helper to broadcast status over websocket def _ws(status: str, extra: dict | None = None): try: from core.utils import send_websocket_update payload = {"success": True, "type": "comskip_status", "status": status, "recording_id": recording_id} if extra: payload.update(extra) send_websocket_update('updates', 'update', payload) except Exception: pass try: rec = Recording.objects.get(id=recording_id) except Recording.DoesNotExist: return "not_found" cp = rec.custom_properties.copy() if isinstance(rec.custom_properties, dict) else {} def _persist_custom_properties(): """Persist updated custom_properties without raising if the row disappeared.""" try: updated = Recording.objects.filter(pk=recording_id).update(custom_properties=cp) if not updated: logger.warning( "Recording %s vanished before comskip status could be saved", recording_id, ) return False except DatabaseError as db_err: logger.warning( "Failed to persist comskip status for recording %s: %s", recording_id, db_err, ) return False except Exception as unexpected: logger.warning( "Unexpected error while saving comskip status for recording %s: %s", recording_id, unexpected, ) return False return True file_path = (cp or {}).get("file_path") if not file_path or not os.path.exists(file_path): return "no_file" if isinstance(cp.get("comskip"), dict) and cp["comskip"].get("status") == "completed": return "already_processed" comskip_bin = shutil.which("comskip") if not comskip_bin: cp["comskip"] = {"status": "skipped", "reason": "comskip_not_installed"} _persist_custom_properties() _ws('skipped', {"reason": "comskip_not_installed"}) return "comskip_missing" base, _ = os.path.splitext(file_path) edl_path = f"{base}.edl" # Notify start _ws('started', {"title": (cp.get('program') or {}).get('title') or os.path.basename(file_path)}) try: cmd = [comskip_bin, "--output", os.path.dirname(file_path)] # Prefer user-specified INI, fall back to known defaults ini_candidates = [] try: custom_ini = CoreSettings.get_dvr_comskip_custom_path() if custom_ini: ini_candidates.append(custom_ini) except Exception as ini_err: logger.debug(f"Unable to load custom comskip.ini path: {ini_err}") ini_candidates.extend(["/etc/comskip/comskip.ini", "/app/docker/comskip.ini"]) selected_ini = None for ini_path in ini_candidates: if ini_path and os.path.exists(ini_path): selected_ini = ini_path cmd.extend([f"--ini={ini_path}"]) break cmd.append(file_path) subprocess.run( cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) except subprocess.CalledProcessError as e: stderr_tail = (e.stderr or "").strip().splitlines() stderr_tail = stderr_tail[-5:] if stderr_tail else [] detail = { "status": "error", "reason": "comskip_failed", "returncode": e.returncode, } if e.returncode and e.returncode < 0: try: detail["signal"] = signal.Signals(-e.returncode).name except Exception: detail["signal"] = f"signal_{-e.returncode}" if stderr_tail: detail["stderr"] = "\n".join(stderr_tail) if selected_ini: detail["ini_path"] = selected_ini cp["comskip"] = detail _persist_custom_properties() _ws('error', {"reason": "comskip_failed", "returncode": e.returncode}) return "comskip_failed" except Exception as e: cp["comskip"] = {"status": "error", "reason": f"comskip_failed: {e}"} _persist_custom_properties() _ws('error', {"reason": str(e)}) return "comskip_failed" if not os.path.exists(edl_path): cp["comskip"] = {"status": "error", "reason": "edl_not_found"} _persist_custom_properties() _ws('error', {"reason": "edl_not_found"}) return "no_edl" # Duration via ffprobe def _ffprobe_duration(path): try: p = subprocess.run([ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", path ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True) return float(p.stdout.strip()) except Exception: return None duration = _ffprobe_duration(file_path) if duration is None: cp["comskip"] = {"status": "error", "reason": "duration_unknown"} _persist_custom_properties() _ws('error', {"reason": "duration_unknown"}) return "no_duration" commercials = [] try: with open(edl_path, "r") as f: for line in f: parts = line.strip().split() if len(parts) >= 2: try: s = float(parts[0]); e = float(parts[1]) commercials.append((max(0.0, s), min(duration, e))) except Exception: pass except Exception: pass commercials.sort() keep = [] cur = 0.0 for s, e in commercials: if s > cur: keep.append((cur, max(cur, s))) cur = max(cur, e) if cur < duration: keep.append((cur, duration)) if not commercials or sum((e - s) for s, e in commercials) <= 0.5: cp["comskip"] = { "status": "completed", "skipped": True, "edl": os.path.basename(edl_path), } if selected_ini: cp["comskip"]["ini_path"] = selected_ini _persist_custom_properties() _ws('skipped', {"reason": "no_commercials", "commercials": 0}) return "no_commercials" workdir = os.path.dirname(file_path) parts = [] try: for idx, (s, e) in enumerate(keep): seg = os.path.join(workdir, f"segment_{idx:03d}.mkv") dur = max(0.0, e - s) if dur <= 0.01: continue subprocess.run([ "ffmpeg", "-y", "-ss", f"{s:.3f}", "-i", file_path, "-t", f"{dur:.3f}", "-c", "copy", "-avoid_negative_ts", "1", seg ], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) parts.append(seg) if not parts: raise RuntimeError("no_parts") list_path = os.path.join(workdir, "concat_list.txt") with open(list_path, "w") as lf: for pth in parts: escaped = pth.replace("'", "'\\''") lf.write(f"file '{escaped}'\n") output_path = os.path.join(workdir, f"{os.path.splitext(os.path.basename(file_path))[0]}.cut.mkv") subprocess.run([ "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_path, "-c", "copy", output_path ], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) try: os.replace(output_path, file_path) except Exception: shutil.copy(output_path, file_path) try: os.remove(list_path) except Exception: pass for pth in parts: try: os.remove(pth) except Exception: pass cp["comskip"] = { "status": "completed", "edl": os.path.basename(edl_path), "segments_kept": len(parts), "commercials": len(commercials), } if selected_ini: cp["comskip"]["ini_path"] = selected_ini _persist_custom_properties() _ws('completed', {"commercials": len(commercials), "segments_kept": len(parts)}) return "ok" except Exception as e: cp["comskip"] = {"status": "error", "reason": str(e)} _persist_custom_properties() _ws('error', {"reason": str(e)}) return f"error:{e}" def _resolve_poster_for_program(channel_name, program): """Internal helper that attempts to resolve a poster URL and/or Logo id. Returns (poster_logo_id, poster_url) where either may be None. """ poster_logo_id = None poster_url = None # Try EPG Program images first try: from apps.epg.models import ProgramData prog_id = program.get("id") if isinstance(program, dict) else None if prog_id: epg_program = ProgramData.objects.filter(id=prog_id).only("custom_properties").first() if epg_program and epg_program.custom_properties: epg_props = epg_program.custom_properties or {} def pick_best_image_from_epg_props(epg_props): images = epg_props.get("images") or [] if not isinstance(images, list): return None size_order = {"xxl": 6, "xl": 5, "l": 4, "m": 3, "s": 2, "xs": 1} def score(img): t = (img.get("type") or "").lower() size = (img.get("size") or "").lower() return (2 if t in ("poster", "cover") else 1, size_order.get(size, 0)) best = None for im in images: if not isinstance(im, dict): continue url = im.get("url") if not url: continue if best is None or score(im) > score(best): best = im return best.get("url") if best else None poster_url = pick_best_image_from_epg_props(epg_props) if not poster_url: icon = epg_props.get("icon") if isinstance(icon, str) and icon: poster_url = icon except Exception: pass # VOD logo fallback by title if not poster_url and not poster_logo_id: try: from apps.vod.models import Movie, Series title = (program.get("title") if isinstance(program, dict) else None) or channel_name vod_logo = None movie = Movie.objects.filter(name__iexact=title).select_related("logo").first() if movie and movie.logo: vod_logo = movie.logo if not vod_logo: series = Series.objects.filter(name__iexact=title).select_related("logo").first() if series and series.logo: vod_logo = series.logo if vod_logo: poster_logo_id = vod_logo.id except Exception: pass # Keyless providers (TVMaze & iTunes) if not poster_url and not poster_logo_id: try: title = (program.get('title') if isinstance(program, dict) else None) or channel_name if title: # TVMaze try: url = f"https://api.tvmaze.com/singlesearch/shows?q={quote(title)}" resp = requests.get(url, timeout=5) if resp.ok: data = resp.json() or {} img = (data.get('image') or {}) p = img.get('original') or img.get('medium') if p: poster_url = p except Exception: pass # iTunes if not poster_url: try: for media in ('movie', 'tvShow'): url = f"https://itunes.apple.com/search?term={quote(title)}&media={media}&limit=1" resp = requests.get(url, timeout=5) if resp.ok: data = resp.json() or {} results = data.get('results') or [] if results: art = results[0].get('artworkUrl100') if art: poster_url = art.replace('100x100', '600x600') break except Exception: pass except Exception: pass # Fallback: search existing Logo entries by name if we still have nothing if not poster_logo_id and not poster_url: try: from .models import Logo title = (program.get("title") if isinstance(program, dict) else None) or channel_name existing = Logo.objects.filter(name__iexact=title).first() if existing: poster_logo_id = existing.id poster_url = existing.url except Exception: pass # Save to Logo if URL available if not poster_logo_id and poster_url and len(poster_url) <= 1000: try: from .models import Logo logo, _ = Logo.objects.get_or_create(url=poster_url, defaults={"name": (program.get("title") if isinstance(program, dict) else None) or channel_name}) poster_logo_id = logo.id except Exception: pass return poster_logo_id, poster_url @shared_task def prefetch_recording_artwork(recording_id): """Prefetch poster info for a scheduled recording so the UI can show art in Upcoming.""" try: from .models import Recording rec = Recording.objects.get(id=recording_id) cp = rec.custom_properties or {} program = cp.get("program") or {} poster_logo_id, poster_url = _resolve_poster_for_program(rec.channel.name, program) updated = False if poster_logo_id and cp.get("poster_logo_id") != poster_logo_id: cp["poster_logo_id"] = poster_logo_id updated = True if poster_url and cp.get("poster_url") != poster_url: cp["poster_url"] = poster_url updated = True # Enrich with rating if available from ProgramData.custom_properties try: from apps.epg.models import ProgramData prog_id = program.get("id") if isinstance(program, dict) else None if prog_id: epg_program = ProgramData.objects.filter(id=prog_id).only("custom_properties").first() if epg_program and isinstance(epg_program.custom_properties, dict): rating_val = epg_program.custom_properties.get("rating") rating_sys = epg_program.custom_properties.get("rating_system") season_val = epg_program.custom_properties.get("season") episode_val = epg_program.custom_properties.get("episode") onscreen = epg_program.custom_properties.get("onscreen_episode") if rating_val and cp.get("rating") != rating_val: cp["rating"] = rating_val updated = True if rating_sys and cp.get("rating_system") != rating_sys: cp["rating_system"] = rating_sys updated = True if season_val is not None and cp.get("season") != season_val: cp["season"] = season_val updated = True if episode_val is not None and cp.get("episode") != episode_val: cp["episode"] = episode_val updated = True if onscreen and cp.get("onscreen_episode") != onscreen: cp["onscreen_episode"] = onscreen updated = True except Exception: pass if updated: rec.custom_properties = cp rec.save(update_fields=["custom_properties"]) try: from core.utils import send_websocket_update send_websocket_update('updates', 'update', {"success": True, "type": "recording_updated", "recording_id": rec.id}) except Exception: pass return "ok" except Exception as e: logger.debug(f"prefetch_recording_artwork failed: {e}") return f"error: {e}" @shared_task(bind=True) def bulk_create_channels_from_streams(self, stream_ids, channel_profile_ids=None, starting_channel_number=None): """ Asynchronously create channels from a list of stream IDs. Provides progress updates via WebSocket. Args: stream_ids: List of stream IDs to create channels from channel_profile_ids: Optional list of channel profile IDs to assign channels to starting_channel_number: Optional starting channel number behavior: - None: Use provider channel numbers, then auto-assign from 1 - 0: Start with lowest available number and increment by 1 - Other number: Use as starting number for auto-assignment """ from apps.channels.models import Stream, Channel, ChannelGroup, ChannelProfile, ChannelProfileMembership, Logo from apps.epg.models import EPGData from django.db import transaction from django.shortcuts import get_object_or_404 from core.utils import send_websocket_update task_id = self.request.id total_streams = len(stream_ids) created_channels = [] errors = [] try: # Send initial progress update send_websocket_update('updates', 'update', { 'type': 'bulk_channel_creation_progress', 'task_id': task_id, 'progress': 0, 'total': total_streams, 'status': 'starting', 'message': f'Starting bulk creation of {total_streams} channels...' }) # Gather current used numbers once used_numbers = set(Channel.objects.all().values_list("channel_number", flat=True)) # Initialize next_number based on starting_channel_number mode if starting_channel_number is None: # Mode 1: Use provider numbers when available, auto-assign when not next_number = 1 elif starting_channel_number == 0: # Mode 2: Start from lowest available number next_number = 1 else: # Mode 3: Start from specified number next_number = starting_channel_number def get_auto_number(): nonlocal next_number while next_number in used_numbers: next_number += 1 used_numbers.add(next_number) return next_number logos_to_create = [] channels_to_create = [] streams_map = [] logo_map = [] profile_map = [] # Process streams in batches to avoid memory issues batch_size = 100 processed = 0 for i in range(0, total_streams, batch_size): batch_stream_ids = stream_ids[i:i + batch_size] # Fetch streams and preserve the order from batch_stream_ids batch_streams_dict = {stream.id: stream for stream in Stream.objects.filter(id__in=batch_stream_ids)} batch_streams = [batch_streams_dict[stream_id] for stream_id in batch_stream_ids if stream_id in batch_streams_dict] # Send progress update send_websocket_update('updates', 'update', { 'type': 'bulk_channel_creation_progress', 'task_id': task_id, 'progress': processed, 'total': total_streams, 'status': 'processing', 'message': f'Processing streams {processed + 1}-{min(processed + batch_size, total_streams)} of {total_streams}...' }) for stream in batch_streams: try: name = stream.name channel_group = stream.channel_group stream_custom_props = stream.custom_properties or {} # Determine channel number based on starting_channel_number mode channel_number = None if starting_channel_number is None: # Mode 1: Use provider numbers when available if "tvg-chno" in stream_custom_props: channel_number = float(stream_custom_props["tvg-chno"]) elif "channel-number" in stream_custom_props: channel_number = float(stream_custom_props["channel-number"]) elif "num" in stream_custom_props: channel_number = float(stream_custom_props["num"]) # For modes 2 and 3 (starting_channel_number == 0 or specific number), # ignore provider numbers and use sequential assignment # Get TVC guide station ID tvc_guide_stationid = None if "tvc-guide-stationid" in stream_custom_props: tvc_guide_stationid = stream_custom_props["tvc-guide-stationid"] # Check if the determined/provider number is available if channel_number is not None and ( channel_number in used_numbers or Channel.objects.filter(channel_number=channel_number).exists() ): # Provider number is taken, use auto-assignment channel_number = get_auto_number() elif channel_number is not None: # Provider number is available, use it used_numbers.add(channel_number) else: # No provider number or ignoring provider numbers, use auto-assignment channel_number = get_auto_number() channel_data = { "channel_number": channel_number, "name": name, "tvc_guide_stationid": tvc_guide_stationid, "tvg_id": stream.tvg_id, } # Only add channel_group_id if the stream has a channel group if channel_group: channel_data["channel_group_id"] = channel_group.id # Attempt to find existing EPGs with the same tvg-id epgs = EPGData.objects.filter(tvg_id=stream.tvg_id) if epgs: channel_data["epg_data_id"] = epgs.first().id channel = Channel(**channel_data) channels_to_create.append(channel) streams_map.append([stream.id]) # Store profile IDs for this channel profile_map.append(channel_profile_ids) # Handle logo - validate URL length to avoid PostgreSQL btree index errors validated_logo_url = validate_logo_url(stream.logo_url) if stream.logo_url else None if validated_logo_url: logos_to_create.append( Logo( url=validated_logo_url, name=stream.name or stream.tvg_id, ) ) logo_map.append(validated_logo_url) else: logo_map.append(None) processed += 1 except Exception as e: errors.append({ 'stream_id': stream.id if 'stream' in locals() else 'unknown', 'error': str(e) }) processed += 1 # Create logos first if logos_to_create: send_websocket_update('updates', 'update', { 'type': 'bulk_channel_creation_progress', 'task_id': task_id, 'progress': processed, 'total': total_streams, 'status': 'creating_logos', 'message': f'Creating {len(logos_to_create)} logos...' }) Logo.objects.bulk_create(logos_to_create, ignore_conflicts=True) # Get logo objects for association channel_logos = { logo.url: logo for logo in Logo.objects.filter( url__in=[url for url in logo_map if url is not None] ) } # Create channels in database if channels_to_create: send_websocket_update('updates', 'update', { 'type': 'bulk_channel_creation_progress', 'task_id': task_id, 'progress': processed, 'total': total_streams, 'status': 'creating_channels', 'message': f'Creating {len(channels_to_create)} channels in database...' }) with transaction.atomic(): created_channels = Channel.objects.bulk_create(channels_to_create) # Update channels with logos and create stream associations update = [] channel_stream_associations = [] channel_profile_memberships = [] for channel, stream_ids, logo_url, profile_ids in zip( created_channels, streams_map, logo_map, profile_map ): if logo_url: channel.logo = channel_logos[logo_url] update.append(channel) # Create stream associations for stream_id in stream_ids: from apps.channels.models import ChannelStream channel_stream_associations.append( ChannelStream(channel=channel, stream_id=stream_id, order=0) ) # Handle channel profile membership # Semantics: # - None: add to ALL profiles (backward compatible default) # - Empty array []: add to NO profiles # - Sentinel [0] or 0 in array: add to ALL profiles (explicit) # - [1,2,...]: add to specified profile IDs only if profile_ids is None: # Omitted -> add to all profiles (backward compatible) all_profiles = ChannelProfile.objects.all() channel_profile_memberships.extend([ ChannelProfileMembership( channel_profile=profile, channel=channel, enabled=True ) for profile in all_profiles ]) elif isinstance(profile_ids, list) and len(profile_ids) == 0: # Empty array -> add to no profiles pass elif isinstance(profile_ids, list) and 0 in profile_ids: # Sentinel 0 -> add to all profiles (explicit) all_profiles = ChannelProfile.objects.all() channel_profile_memberships.extend([ ChannelProfileMembership( channel_profile=profile, channel=channel, enabled=True ) for profile in all_profiles ]) else: # Specific profile IDs try: specific_profiles = ChannelProfile.objects.filter(id__in=profile_ids) channel_profile_memberships.extend([ ChannelProfileMembership( channel_profile=profile, channel=channel, enabled=True ) for profile in specific_profiles ]) except Exception as e: errors.append({ 'channel_id': channel.id, 'error': f'Failed to add to profiles: {str(e)}' }) # Bulk update channels with logos if update: Channel.objects.bulk_update(update, ["logo"]) # Bulk create channel-stream associations if channel_stream_associations: from apps.channels.models import ChannelStream ChannelStream.objects.bulk_create(channel_stream_associations, ignore_conflicts=True) # Bulk create profile memberships if channel_profile_memberships: ChannelProfileMembership.objects.bulk_create(channel_profile_memberships, ignore_conflicts=True) # Send completion update send_websocket_update('updates', 'update', { 'type': 'bulk_channel_creation_progress', 'task_id': task_id, 'progress': total_streams, 'total': total_streams, 'status': 'completed', 'message': f'Successfully created {len(created_channels)} channels', 'created_count': len(created_channels), 'error_count': len(errors), 'errors': errors[:10] # Send first 10 errors only }) # Send general channel update notification send_websocket_update('updates', 'update', { 'type': 'channels_created', 'count': len(created_channels) }) return { 'status': 'completed', 'created_count': len(created_channels), 'error_count': len(errors), 'errors': errors } except Exception as e: logger.error(f"Bulk channel creation failed: {e}") send_websocket_update('updates', 'update', { 'type': 'bulk_channel_creation_progress', 'task_id': task_id, 'progress': 0, 'total': total_streams, 'status': 'failed', 'message': f'Task failed: {str(e)}', 'error': str(e) }) raise @shared_task(bind=True) def set_channels_names_from_epg(self, channel_ids): """ Celery task to set channel names from EPG data for multiple channels """ from core.utils import send_websocket_update task_id = self.request.id total_channels = len(channel_ids) updated_count = 0 errors = [] try: logger.info(f"Starting EPG name setting task for {total_channels} channels") # Send initial progress send_websocket_update('updates', 'update', { 'type': 'epg_name_setting_progress', 'task_id': task_id, 'progress': 0, 'total': total_channels, 'status': 'running', 'message': 'Starting EPG name setting...' }) batch_size = 100 for i in range(0, total_channels, batch_size): batch_ids = channel_ids[i:i + batch_size] batch_updates = [] # Get channels and their EPG data channels = Channel.objects.filter(id__in=batch_ids).select_related('epg_data') for channel in channels: try: if channel.epg_data and channel.epg_data.name: if channel.name != channel.epg_data.name: channel.name = channel.epg_data.name batch_updates.append(channel) updated_count += 1 except Exception as e: errors.append(f"Channel {channel.id}: {str(e)}") logger.error(f"Error processing channel {channel.id}: {e}") # Bulk update the batch if batch_updates: Channel.objects.bulk_update(batch_updates, ['name']) # Send progress update progress = min(i + batch_size, total_channels) send_websocket_update('updates', 'update', { 'type': 'epg_name_setting_progress', 'task_id': task_id, 'progress': progress, 'total': total_channels, 'status': 'running', 'message': f'Updated {updated_count} channel names...', 'updated_count': updated_count }) # Send completion notification send_websocket_update('updates', 'update', { 'type': 'epg_name_setting_progress', 'task_id': task_id, 'progress': total_channels, 'total': total_channels, 'status': 'completed', 'message': f'Successfully updated {updated_count} channel names from EPG data', 'updated_count': updated_count, 'error_count': len(errors), 'errors': errors }) logger.info(f"EPG name setting task completed. Updated {updated_count} channels") return { 'status': 'completed', 'updated_count': updated_count, 'error_count': len(errors), 'errors': errors } except Exception as e: logger.error(f"EPG name setting task failed: {e}") send_websocket_update('updates', 'update', { 'type': 'epg_name_setting_progress', 'task_id': task_id, 'progress': 0, 'total': total_channels, 'status': 'failed', 'message': f'Task failed: {str(e)}', 'error': str(e) }) raise @shared_task(bind=True) def set_channels_logos_from_epg(self, channel_ids): """ Celery task to set channel logos from EPG data for multiple channels Creates logos from EPG icon URLs if they don't exist """ from .models import Logo from core.utils import send_websocket_update import requests from urllib.parse import urlparse task_id = self.request.id total_channels = len(channel_ids) updated_count = 0 created_logos_count = 0 errors = [] try: logger.info(f"Starting EPG logo setting task for {total_channels} channels") # Send initial progress send_websocket_update('updates', 'update', { 'type': 'epg_logo_setting_progress', 'task_id': task_id, 'progress': 0, 'total': total_channels, 'status': 'running', 'message': 'Starting EPG logo setting...' }) batch_size = 50 # Smaller batch for logo processing for i in range(0, total_channels, batch_size): batch_ids = channel_ids[i:i + batch_size] batch_updates = [] # Get channels and their EPG data channels = Channel.objects.filter(id__in=batch_ids).select_related('epg_data', 'logo') for channel in channels: try: if channel.epg_data and channel.epg_data.icon_url: icon_url = channel.epg_data.icon_url.strip() # Try to find existing logo with this URL try: logo = Logo.objects.get(url=icon_url) except Logo.DoesNotExist: # Create new logo from EPG icon URL try: # Generate a name for the logo logo_name = channel.epg_data.name or f"Logo for {channel.epg_data.tvg_id}" # Create the logo record logo = Logo.objects.create( name=logo_name, url=icon_url ) created_logos_count += 1 logger.info(f"Created new logo from EPG: {logo_name} - {icon_url}") except Exception as create_error: errors.append(f"Channel {channel.id}: Failed to create logo from {icon_url}: {str(create_error)}") logger.error(f"Failed to create logo for channel {channel.id}: {create_error}") continue # Update channel logo if different if channel.logo != logo: channel.logo = logo batch_updates.append(channel) updated_count += 1 except Exception as e: errors.append(f"Channel {channel.id}: {str(e)}") logger.error(f"Error processing channel {channel.id}: {e}") # Bulk update the batch if batch_updates: Channel.objects.bulk_update(batch_updates, ['logo']) # Send progress update progress = min(i + batch_size, total_channels) send_websocket_update('updates', 'update', { 'type': 'epg_logo_setting_progress', 'task_id': task_id, 'progress': progress, 'total': total_channels, 'status': 'running', 'message': f'Updated {updated_count} channel logos, created {created_logos_count} new logos...', 'updated_count': updated_count, 'created_logos_count': created_logos_count }) # Send completion notification send_websocket_update('updates', 'update', { 'type': 'epg_logo_setting_progress', 'task_id': task_id, 'progress': total_channels, 'total': total_channels, 'status': 'completed', 'message': f'Successfully updated {updated_count} channel logos and created {created_logos_count} new logos from EPG data', 'updated_count': updated_count, 'created_logos_count': created_logos_count, 'error_count': len(errors), 'errors': errors }) logger.info(f"EPG logo setting task completed. Updated {updated_count} channels, created {created_logos_count} logos") return { 'status': 'completed', 'updated_count': updated_count, 'created_logos_count': created_logos_count, 'error_count': len(errors), 'errors': errors } except Exception as e: logger.error(f"EPG logo setting task failed: {e}") send_websocket_update('updates', 'update', { 'type': 'epg_logo_setting_progress', 'task_id': task_id, 'progress': 0, 'total': total_channels, 'status': 'failed', 'message': f'Task failed: {str(e)}', 'error': str(e) }) raise @shared_task(bind=True) def set_channels_tvg_ids_from_epg(self, channel_ids): """ Celery task to set channel TVG-IDs from EPG data for multiple channels """ from core.utils import send_websocket_update task_id = self.request.id total_channels = len(channel_ids) updated_count = 0 errors = [] try: logger.info(f"Starting EPG TVG-ID setting task for {total_channels} channels") # Send initial progress send_websocket_update('updates', 'update', { 'type': 'epg_tvg_id_setting_progress', 'task_id': task_id, 'progress': 0, 'total': total_channels, 'status': 'running', 'message': 'Starting EPG TVG-ID setting...' }) batch_size = 100 for i in range(0, total_channels, batch_size): batch_ids = channel_ids[i:i + batch_size] batch_updates = [] # Get channels and their EPG data channels = Channel.objects.filter(id__in=batch_ids).select_related('epg_data') for channel in channels: try: if channel.epg_data and channel.epg_data.tvg_id: if channel.tvg_id != channel.epg_data.tvg_id: channel.tvg_id = channel.epg_data.tvg_id batch_updates.append(channel) updated_count += 1 except Exception as e: errors.append(f"Channel {channel.id}: {str(e)}") logger.error(f"Error processing channel {channel.id}: {e}") # Bulk update the batch if batch_updates: Channel.objects.bulk_update(batch_updates, ['tvg_id']) # Send progress update progress = min(i + batch_size, total_channels) send_websocket_update('updates', 'update', { 'type': 'epg_tvg_id_setting_progress', 'task_id': task_id, 'progress': progress, 'total': total_channels, 'status': 'running', 'message': f'Updated {updated_count} channel TVG-IDs...', 'updated_count': updated_count }) # Send completion notification send_websocket_update('updates', 'update', { 'type': 'epg_tvg_id_setting_progress', 'task_id': task_id, 'progress': total_channels, 'total': total_channels, 'status': 'completed', 'message': f'Successfully updated {updated_count} channel TVG-IDs from EPG data', 'updated_count': updated_count, 'error_count': len(errors), 'errors': errors }) logger.info(f"EPG TVG-ID setting task completed. Updated {updated_count} channels") return { 'status': 'completed', 'updated_count': updated_count, 'error_count': len(errors), 'errors': errors } except Exception as e: logger.error(f"EPG TVG-ID setting task failed: {e}") send_websocket_update('updates', 'update', { 'type': 'epg_tvg_id_setting_progress', 'task_id': task_id, 'progress': 0, 'total': total_channels, 'status': 'failed', 'message': f'Task failed: {str(e)}', 'error': str(e) }) raise