Dispatcharr/apps/media_library/metadata.py
Dispatcharr cd1ac0daea Updated Media Scanning
Replace delete alerts with confirmation modals and hide libraries immediately while deletion runs (Settings + Libraries).

Improve scan UX by handling unknown discovery totals and counting artwork only for shows/movies.

Tighten scan/metadata behavior with better cancellation handling, optional remote metadata, and stronger NFO parsing/episode matching.

Autosize the media detail modal scroll area for smoother viewing.
2026-01-03 21:03:11 -06:00

2111 lines
69 KiB
Python

import logging
import os
import re
import xml.etree.ElementTree as ET
from typing import Any, Dict, Optional, Tuple
import requests
from dateutil import parser as date_parser
from django.core.cache import cache
from django.utils import timezone
from core.models import CoreSettings
from apps.media_library.models import ArtworkAsset, MediaItem
from apps.media_library.utils import normalize_title
logger = logging.getLogger(__name__)
IMAGE_BASE_URL = "https://image.tmdb.org/t/p/original"
TMDB_API_BASE_URL = "https://api.themoviedb.org/3"
MOVIEDB_SEARCH_URL = "https://movie-db.org/api.php"
MOVIEDB_EPISODE_URL = "https://movie-db.org/episode.php"
MOVIEDB_CREDITS_URL = "https://movie-db.org/credits.php"
METADATA_CACHE_TIMEOUT = 60 * 60 * 6
MOVIEDB_HEALTH_CACHE_KEY = "movie-db:health"
MOVIEDB_HEALTH_SUCCESS_TTL = 60 * 15
MOVIEDB_HEALTH_FAILURE_TTL = 60 * 2
# Shared HTTP session + in-memory caches to reduce duplicate lookups.
_REQUESTS_SESSION = requests.Session()
_REQUESTS_SESSION.mount(
"https://",
requests.adapters.HTTPAdapter(pool_connections=20, pool_maxsize=40, max_retries=3),
)
_MOVIEDB_SEARCH_CACHE: dict[tuple[str, ...], tuple[dict | None, str | None]] = {}
_MOVIEDB_CREDITS_CACHE: dict[tuple[str, str], tuple[dict | None, str | None]] = {}
_TMDB_SEARCH_CACHE: dict[tuple[str, ...], tuple[dict | None, str | None]] = {}
_TMDB_DETAIL_CACHE: dict[tuple[str, str], tuple[dict | None, str | None]] = {}
# TMDB/Movie-DB genre IDs mapped to display values.
GENRE_ID_MAP: dict[int, str] = {
12: "Adventure",
14: "Fantasy",
16: "Animation",
18: "Drama",
27: "Horror",
28: "Action",
35: "Comedy",
36: "History",
37: "Western",
53: "Thriller",
80: "Crime",
99: "Documentary",
878: "Science Fiction",
9648: "Mystery",
10402: "Music",
10749: "Romance",
10751: "Family",
10752: "War",
10759: "Action & Adventure",
10762: "Kids",
10763: "News",
10764: "Reality",
10765: "Sci-Fi & Fantasy",
10766: "Soap",
10767: "Talk",
10768: "War & Politics",
10770: "TV Movie",
}
_YOUTUBE_ID_RE = re.compile(
r"(?:video_id=|v=|youtu\.be/|youtube\.com/embed/)([A-Za-z0-9_-]{6,})"
)
def _get_library_metadata_prefs(media_item: MediaItem) -> dict[str, Any]:
# Merge library language/region fields with optional metadata overrides.
prefs: dict[str, Any] = {"language": None, "region": None}
library = getattr(media_item, "library", None)
if not library:
return prefs
prefs["language"] = (library.metadata_language or "").strip() or None
prefs["region"] = (library.metadata_country or "").strip() or None
options = library.metadata_options or {}
if isinstance(options, dict):
if options.get("language"):
prefs["language"] = str(options["language"]).strip() or prefs["language"]
if options.get("region"):
prefs["region"] = str(options["region"]).strip() or prefs["region"]
return prefs
def _metadata_cache_key(
media_item: MediaItem,
prefs: dict[str, Any] | None = None,
*,
provider: str = "movie-db",
) -> str:
# Include provider + identifiers to avoid collisions across sources.
normalized = normalize_title(media_item.title) or media_item.normalized_title or ""
key_parts = [
provider,
str(media_item.item_type or ""),
normalized,
str(media_item.release_year or ""),
]
if media_item.movie_db_id:
key_parts.append(f"id:{media_item.movie_db_id}")
if media_item.item_type == MediaItem.TYPE_EPISODE:
season = media_item.season_number or ""
episode = media_item.episode_number or ""
key_parts.append(f"s{season}e{episode}")
parent = getattr(media_item, "parent", None)
if parent and parent.movie_db_id:
key_parts.append(f"parent:{parent.movie_db_id}")
if prefs:
language = (prefs.get("language") or "").strip()
region = (prefs.get("region") or "").strip()
if language or region:
key_parts.append(f"lang:{language}")
key_parts.append(f"region:{region}")
return "media-metadata:" + ":".join(key_parts)
def build_image_url(path: Optional[str]) -> Optional[str]:
if not path:
return None
return f"{IMAGE_BASE_URL}{path}"
def _to_serializable(obj):
# Force JSON-safe values so metadata payloads can be persisted.
if isinstance(obj, dict):
return {key: _to_serializable(value) for key, value in obj.items()}
if isinstance(obj, (list, tuple, set)):
return [_to_serializable(item) for item in obj]
if isinstance(obj, (str, int, float, bool)) or obj is None:
return obj
return str(obj)
def _get_tmdb_api_key() -> Optional[str]:
# Prefer stored setting, but allow env var for deployments.
key = CoreSettings.get_tmdb_api_key() or os.environ.get("TMDB_API_KEY")
if not key:
return None
key = str(key).strip()
return key or None
def _is_http_url(value: str | None) -> bool:
if not value:
return False
return value.startswith("http://") or value.startswith("https://")
def _safe_xml_text(node: ET.Element | None) -> Optional[str]:
if node is None:
return None
text = node.text or ""
text = text.strip()
return text or None
def _normalize_xml_tag(tag: Any) -> str:
if not isinstance(tag, str):
return ""
return tag.rsplit("}", 1)[-1].lower()
def _iter_nfo_nodes(root: ET.Element, *tags: str):
tag_set = {tag.lower() for tag in tags if tag}
for node in root.iter():
if _normalize_xml_tag(node.tag) in tag_set:
yield node
def _find_child_text(node: ET.Element, *tags: str) -> Optional[str]:
tag_set = {tag.lower() for tag in tags if tag}
for child in list(node):
if _normalize_xml_tag(child.tag) in tag_set:
value = _safe_xml_text(child)
if value:
return value
return None
def _parse_xml_int(text: str | None) -> Optional[int]:
if not text:
return None
try:
return int(float(text))
except (TypeError, ValueError):
return None
def _parse_xml_float(text: str | None) -> Optional[float]:
if not text:
return None
try:
return float(text)
except (TypeError, ValueError):
return None
def _normalize_numeric_rating(text: str | None) -> Optional[str]:
# MediaItem.rating expects numeric strings; skip non-numeric ratings.
if not text:
return None
value = str(text).strip()
if not value:
return None
try:
float(value)
except (TypeError, ValueError):
return None
return value
def check_movie_db_health(*, use_cache: bool = True) -> Tuple[bool, str | None]:
# Lightweight probe to decide whether Movie-DB is usable.
if use_cache:
cached = cache.get(MOVIEDB_HEALTH_CACHE_KEY)
if cached is not None:
return cached
try:
response = _REQUESTS_SESSION.get(
MOVIEDB_SEARCH_URL,
params={"type": "tv", "query": "The Simpsons"},
timeout=5,
)
except requests.RequestException as exc:
message = f"Movie-DB request failed: {exc}"
cache.set(MOVIEDB_HEALTH_CACHE_KEY, (False, message), MOVIEDB_HEALTH_FAILURE_TTL)
return False, message
if response.status_code != 200:
message = f"Movie-DB returned status {response.status_code}."
cache.set(MOVIEDB_HEALTH_CACHE_KEY, (False, message), MOVIEDB_HEALTH_FAILURE_TTL)
return False, message
try:
payload = response.json()
except ValueError:
message = "Movie-DB returned an unexpected response format."
cache.set(MOVIEDB_HEALTH_CACHE_KEY, (False, message), MOVIEDB_HEALTH_FAILURE_TTL)
return False, message
results = []
if isinstance(payload, dict):
results = payload.get("results") or []
elif isinstance(payload, list):
results = payload
if not results:
message = "Movie-DB did not return any results."
cache.set(MOVIEDB_HEALTH_CACHE_KEY, (False, message), MOVIEDB_HEALTH_FAILURE_TTL)
return False, message
cache.set(MOVIEDB_HEALTH_CACHE_KEY, (True, None), MOVIEDB_HEALTH_SUCCESS_TTL)
return True, None
def _parse_release_year(value: Optional[str]) -> Optional[int]:
if not value:
return None
try:
return date_parser.parse(value).year
except (ValueError, TypeError):
return None
def _nfo_rating(root: ET.Element) -> Optional[str]:
rating = _normalize_numeric_rating(_safe_xml_text(root.find("rating")))
if rating:
return rating
ratings = root.find("ratings")
if ratings is None:
return None
entries = ratings.findall("rating")
if not entries:
return None
default_entry = next(
(
entry
for entry in entries
if str(entry.get("default", "")).lower() in {"true", "1", "yes"}
),
None,
)
entry = default_entry or entries[0]
value = _normalize_numeric_rating(
_safe_xml_text(entry.find("value")) or _safe_xml_text(entry)
)
return value
def _nfo_list(root: ET.Element, tag: str) -> list[str]:
values: list[str] = []
for node in root.findall(tag):
value = _safe_xml_text(node)
if value:
values.append(value)
return values
def _nfo_cast(root: ET.Element) -> list[dict[str, Any]]:
cast: list[dict[str, Any]] = []
for actor in _iter_nfo_nodes(root, "actor"):
name = _find_child_text(actor, "name") or _safe_xml_text(actor)
if not name:
continue
cast.append(
{
"name": name,
"character": _find_child_text(actor, "role", "character"),
"profile_url": _find_child_text(actor, "thumb", "profile"),
}
)
return cast
def _nfo_crew(root: ET.Element) -> list[dict[str, Any]]:
crew: list[dict[str, Any]] = []
for director in _iter_nfo_nodes(root, "director"):
name = _find_child_text(director, "name") or _safe_xml_text(director)
if name:
crew.append({"name": name, "job": "Director", "department": "Directing"})
for writer in _iter_nfo_nodes(root, "credits", "writer"):
name = _find_child_text(writer, "name") or _safe_xml_text(writer)
if name:
crew.append({"name": name, "job": "Writer", "department": "Writing"})
return crew
def _nfo_art(root: ET.Element) -> tuple[Optional[str], Optional[str]]:
poster_url = None
backdrop_url = None
for thumb in root.findall("thumb"):
url = _safe_xml_text(thumb)
if not url:
continue
aspect = (thumb.get("aspect") or thumb.get("type") or "").lower()
if not poster_url and aspect in {"poster", "thumb"}:
poster_url = url
if not backdrop_url and aspect in {"fanart", "backdrop", "landscape"}:
backdrop_url = url
fanart = root.find("fanart")
if fanart is not None:
for thumb in fanart.findall("thumb"):
url = _safe_xml_text(thumb)
if url:
backdrop_url = backdrop_url or url
break
art = root.find("art")
if art is not None:
if not poster_url:
poster_url = _safe_xml_text(art.find("poster"))
if not backdrop_url:
backdrop_url = _safe_xml_text(art.find("fanart")) or _safe_xml_text(
art.find("backdrop")
)
if poster_url and not _is_http_url(poster_url):
poster_url = None
if backdrop_url and not _is_http_url(backdrop_url):
backdrop_url = None
return poster_url, backdrop_url
def _nfo_unique_ids(root: ET.Element) -> tuple[Optional[str], Optional[str]]:
imdb_id = None
tmdb_id = None
for node in root.findall("uniqueid"):
value = _safe_xml_text(node)
if not value:
continue
id_type = str(node.get("type", "")).lower()
if id_type == "imdb":
imdb_id = value
elif id_type in {"tmdb", "themoviedb"}:
tmdb_id = value
if not tmdb_id:
raw_id = _safe_xml_text(root.find("id"))
if raw_id and raw_id.isdigit():
tmdb_id = raw_id
return imdb_id, tmdb_id
def _nfo_runtime_minutes(root: ET.Element) -> Optional[int]:
runtime = _parse_xml_float(_safe_xml_text(root.find("runtime")))
if runtime:
return int(round(runtime))
duration = (
_safe_xml_text(root.find("./fileinfo/streamdetails/video/durationinseconds"))
or _safe_xml_text(root.find("./fileinfo/streamdetails/video/duration"))
)
seconds = _parse_xml_float(duration)
if seconds:
return int(round(seconds / 60))
return None
def _extract_youtube_id(value: str | None) -> Optional[str]:
if not value:
return None
text = str(value).strip()
if not text:
return None
match = _YOUTUBE_ID_RE.search(text)
if match:
return match.group(1)
if "://" not in text:
return text
return None
def _nfo_trailer(root: ET.Element) -> Optional[str]:
trailer = _safe_xml_text(root.find("trailer"))
if not trailer:
return None
return _extract_youtube_id(trailer) or trailer
_NFO_ROOT_TAGS = ("movie", "tvshow", "episodedetails", "episode")
def _extract_nfo_root_xml(payload: str) -> Optional[str]:
if not payload:
return None
match = re.search(
r"<(" + "|".join(_NFO_ROOT_TAGS) + r")\b[^>]*>",
payload,
flags=re.IGNORECASE,
)
if not match:
return None
tag = match.group(1).lower()
start = match.start()
payload_lower = payload.lower()
end_tag = f"</{tag}>"
end = payload_lower.rfind(end_tag)
if end == -1:
return None
end += len(end_tag)
return payload[start:end]
def _parse_nfo_root(nfo_path: str) -> tuple[Optional[ET.Element], Optional[str]]:
try:
tree = ET.parse(nfo_path)
except (ET.ParseError, OSError) as exc:
parse_error = exc
else:
root = tree.getroot()
if root is None:
return None, "NFO did not contain metadata."
return root, None
try:
with open(nfo_path, "r", encoding="utf-8-sig", errors="replace") as handle:
payload = handle.read()
except OSError as exc:
return None, f"Failed to parse NFO: {exc}"
extracted = _extract_nfo_root_xml(payload)
if not extracted:
return None, f"Failed to parse NFO: {parse_error}"
try:
root = ET.fromstring(extracted)
except ET.ParseError as exc:
return None, f"Failed to parse NFO: {exc}"
if root is None:
return None, "NFO did not contain metadata."
return root, None
def _find_library_base_path(file_path: str, library) -> Optional[str]:
# Identify which library location contains the file path.
if not file_path:
return None
normalized_file_path = os.path.normcase(
os.path.abspath(os.path.expanduser(file_path))
)
for location in library.locations.all():
base_path = os.path.normcase(
os.path.abspath(os.path.expanduser(location.path))
)
try:
if os.path.commonpath([base_path, normalized_file_path]) == base_path:
return base_path
except ValueError:
continue
return None
def _find_nfo_in_directory(
directory: str,
*,
preferred_names: list[str],
allow_single_fallback: bool = True,
) -> Optional[str]:
# Prefer named NFOs; optionally fallback to the only NFO when unambiguous.
try:
entries = [
entry
for entry in os.listdir(directory)
if entry.lower().endswith(".nfo")
]
except OSError:
return None
entries_by_lower = {entry.lower(): entry for entry in entries}
for name in preferred_names:
matched = entries_by_lower.get(name.lower())
if matched:
return os.path.join(directory, matched)
if allow_single_fallback and len(entries) == 1:
return os.path.join(directory, entries[0])
return None
def _iter_parent_dirs(start_dir: str, *, stop_dir: Optional[str] = None):
if not start_dir:
return
current = os.path.abspath(os.path.expanduser(start_dir))
stop = os.path.abspath(os.path.expanduser(stop_dir)) if stop_dir else None
stop_norm = os.path.normcase(stop) if stop else None
while True:
yield current
if stop_norm and os.path.normcase(current) == stop_norm:
break
parent = os.path.dirname(current)
if parent == current:
break
if stop:
try:
if os.path.commonpath([stop, parent]) != stop:
break
except ValueError:
break
current = parent
def _find_local_artwork_files(
directory: str | None,
) -> tuple[Optional[str], Optional[str]]:
# Prefer poster.jpg and fanart.jpg when present alongside NFO.
if not directory:
return None, None
try:
entries = [entry for entry in os.listdir(directory) if entry]
except OSError:
return None, None
entries_by_lower = {entry.lower(): entry for entry in entries}
poster_name = entries_by_lower.get("poster.jpg")
fanart_name = entries_by_lower.get("fanart.jpg")
poster_path = os.path.join(directory, poster_name) if poster_name else None
fanart_path = os.path.join(directory, fanart_name) if fanart_name else None
logger.debug(
"Local artwork files directory=%s poster=%s fanart=%s",
directory,
poster_path,
fanart_path,
)
return poster_path, fanart_path
def _local_artwork_url(media_item: MediaItem, asset_type: str) -> Optional[str]:
if not media_item.id:
return None
return f"/api/media-library/items/{media_item.id}/artwork/{asset_type}/"
def find_local_artwork_path(media_item: MediaItem, asset_type: str) -> Optional[str]:
# Resolve the local artwork path for API responses.
logger.debug(
"Artwork lookup start item_id=%s asset_type=%s",
media_item.id,
asset_type,
)
if asset_type not in {"poster", "backdrop"}:
logger.debug("Artwork lookup skipped: unsupported asset_type=%s", asset_type)
return None
nfo_path = _find_local_nfo_path(media_item)
directory = os.path.dirname(nfo_path) if nfo_path else None
if not directory:
file = media_item.files.filter(is_primary=True).first() or media_item.files.first()
if file and file.path:
directory = os.path.dirname(file.path)
logger.debug(
"Artwork lookup using media file directory=%s file_path=%s",
directory,
file.path,
)
else:
logger.debug("Artwork lookup failed: no media file path available")
if not directory:
return None
poster_path, fanart_path = _find_local_artwork_files(directory)
target_path = poster_path if asset_type == "poster" else fanart_path
if not target_path:
logger.debug(
"Artwork lookup missing target asset_type=%s directory=%s",
asset_type,
directory,
)
return None
base_path = _find_library_base_path(target_path, media_item.library)
if not base_path:
logger.debug(
"Artwork lookup failed: path not under library base path=%s library_id=%s",
target_path,
media_item.library_id,
)
return None
logger.debug(
"Artwork lookup success item_id=%s asset_type=%s path=%s base_path=%s",
media_item.id,
asset_type,
target_path,
base_path,
)
return target_path
def _find_local_nfo_path(media_item: MediaItem) -> Optional[str]:
# Locate the most likely NFO for movies, shows, or episodes.
files = list(media_item.files.all())
if not files and media_item.item_type == MediaItem.TYPE_SHOW:
episodes = list(
MediaItem.objects.filter(parent=media_item, item_type=MediaItem.TYPE_EPISODE)
.exclude(files__isnull=True)
.prefetch_related("files")[:5]
)
files = []
for episode in episodes:
files.extend(list(episode.files.all()))
for media_file in files:
file_path = media_file.path
if not file_path:
continue
directory = os.path.dirname(file_path)
base_name = os.path.splitext(os.path.basename(file_path))[0]
if media_item.item_type in {MediaItem.TYPE_MOVIE, MediaItem.TYPE_EPISODE}:
preferred = [f"{base_name}.nfo"]
if media_item.item_type == MediaItem.TYPE_MOVIE:
preferred.append("movie.nfo")
if media_item.item_type == MediaItem.TYPE_EPISODE:
preferred.append("episode.nfo")
candidate = _find_nfo_in_directory(
directory,
preferred_names=preferred,
allow_single_fallback=media_item.item_type == MediaItem.TYPE_MOVIE,
)
if candidate:
return candidate
continue
if media_item.item_type == MediaItem.TYPE_SHOW:
base_path = _find_library_base_path(file_path, media_item.library)
for root_dir in _iter_parent_dirs(directory, stop_dir=base_path):
candidate = _find_nfo_in_directory(
root_dir,
preferred_names=["tvshow.nfo"],
allow_single_fallback=False,
)
if candidate:
return candidate
return None
def fetch_local_nfo_metadata(
media_item: MediaItem,
) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
# Parse NFO XML and map to MediaItem metadata fields.
nfo_path = _find_local_nfo_path(media_item)
if not nfo_path:
return None, "No NFO file found."
root, error = _parse_nfo_root(nfo_path)
if error:
return None, error
imdb_id, tmdb_id = _nfo_unique_ids(root)
title = _safe_xml_text(root.find("title")) or _safe_xml_text(
root.find("originaltitle")
)
sort_title = _safe_xml_text(root.find("sorttitle"))
synopsis = _safe_xml_text(root.find("plot")) or _safe_xml_text(root.find("outline"))
tagline = _safe_xml_text(root.find("tagline"))
youtube_trailer = _nfo_trailer(root)
premiered = _safe_xml_text(root.find("premiered")) or _safe_xml_text(root.find("aired"))
release_year = _parse_xml_int(_safe_xml_text(root.find("year"))) or _parse_release_year(premiered)
runtime_minutes = _nfo_runtime_minutes(root)
genres = _nfo_list(root, "genre")
tags = _nfo_list(root, "tag")
studios = _nfo_list(root, "studio")
rating = _nfo_rating(root)
poster_url, backdrop_url = _nfo_art(root)
local_poster_path, local_backdrop_path = _find_local_artwork_files(
os.path.dirname(nfo_path)
)
if local_poster_path:
poster_url = _local_artwork_url(media_item, "poster") or poster_url
if local_backdrop_path:
backdrop_url = _local_artwork_url(media_item, "backdrop") or backdrop_url
cast = _nfo_cast(root)
crew = _nfo_crew(root)
if not any(
[
title,
synopsis,
tagline,
release_year,
runtime_minutes,
genres,
tags,
studios,
rating,
poster_url,
backdrop_url,
youtube_trailer,
imdb_id,
tmdb_id,
cast,
crew,
]
):
return None, "NFO did not contain usable metadata."
payload = _to_serializable(
{
"source": "nfo",
"title": title,
"sort_title": sort_title,
"synopsis": synopsis,
"tagline": tagline,
"youtube_trailer": youtube_trailer,
"release_year": release_year,
"runtime_minutes": runtime_minutes,
"genres": genres,
"tags": tags,
"studios": studios,
"rating": rating,
"poster": poster_url,
"backdrop": backdrop_url,
"imdb_id": imdb_id,
"tmdb_id": tmdb_id,
"raw": {
"path": nfo_path,
"title": title,
"sort_title": sort_title,
"synopsis": synopsis,
"tagline": tagline,
"youtube_trailer": youtube_trailer,
"release_year": release_year,
"runtime_minutes": runtime_minutes,
"genres": genres,
"tags": tags,
"studios": studios,
"rating": rating,
"poster": poster_url,
"backdrop": backdrop_url,
"imdb_id": imdb_id,
"tmdb_id": tmdb_id,
},
}
)
return payload, None
def _php_value_from_string(value: str):
lowered = value.lower()
if lowered == "true":
return True
if lowered == "false":
return False
if lowered in {"null", ""}:
return None
try:
if "." in value:
return float(value)
return int(value)
except ValueError:
return value
def _convert_numeric_container(value):
if isinstance(value, dict):
converted = {k: _convert_numeric_container(v) for k, v in value.items()}
numeric = True
indices: list[int] = []
for key in converted.keys():
try:
indices.append(int(key))
except (TypeError, ValueError):
numeric = False
break
if numeric:
return [converted[str(idx)] for idx in sorted(indices)]
return converted
if isinstance(value, list):
return [_convert_numeric_container(item) for item in value]
return value
def _parse_php_array_lines(lines: list[str], idx: int = 0):
def parse_body(current_idx: int):
result: dict[str, Any] = {}
while current_idx < len(lines):
token = lines[current_idx]
if token == ")":
return result, current_idx + 1
if token.startswith("[") and "] =>" in token:
key_part, value_part = token.split("] =>", 1)
key = key_part[1:].strip()
value = value_part.strip()
if value == "Array":
sub_value, next_idx = _parse_php_array_lines(lines, current_idx + 1)
result[key] = sub_value
current_idx = next_idx
continue
result[key] = _php_value_from_string(value)
current_idx += 1
return result, current_idx
if idx >= len(lines):
return {}, idx
token = lines[idx]
if token.lower() == "array":
idx += 1
if idx < len(lines) and lines[idx] == "(":
idx += 1
parsed, idx = parse_body(idx)
return _convert_numeric_container(parsed), idx
return {}, idx
def _parse_movie_db_php_response(payload: str) -> Optional[dict | list]:
if not payload:
return None
stripped = payload.replace("<pre>", "").replace("</pre>", "").strip()
if not stripped.lower().startswith("array"):
return None
lines = [line.strip() for line in stripped.splitlines() if line.strip()]
parsed, _ = _parse_php_array_lines(lines, 0)
return parsed
def _extract_imdb_id(payload: dict | None) -> Optional[str]:
if not payload or not isinstance(payload, dict):
return None
for key in ("imdb_id", "imdbId", "imdbID"):
value = payload.get(key)
if value:
return str(value)
external = payload.get("external_ids") or {}
if isinstance(external, dict):
for key in ("imdb_id", "imdbId", "imdbID"):
value = external.get(key)
if value:
return str(value)
return None
def _candidate_year(payload: dict) -> Optional[int]:
release_date = (
payload.get("release_date")
or payload.get("first_air_date")
or payload.get("air_date")
)
return _parse_release_year(release_date)
def _movie_db_title(payload: dict) -> str:
return (
payload.get("title")
or payload.get("name")
or payload.get("original_title")
or payload.get("original_name")
or ""
)
def _select_movie_db_candidate(
results: list[dict],
title: str,
*,
year: Optional[int] = None,
prefs: dict[str, Any] | None = None,
) -> Optional[dict]:
if not results:
return None
normalized_query = normalize_title(title)
language = (prefs or {}).get("language")
region = (prefs or {}).get("region")
best_score = None
best_result = None
for idx, result in enumerate(results):
if not isinstance(result, dict):
continue
normalized_title = normalize_title(_movie_db_title(result))
score = 0
if normalized_query and normalized_title == normalized_query:
score += 6
elif normalized_query and (
normalized_query in normalized_title or normalized_title in normalized_query
):
score += 3
if year:
candidate_year = _candidate_year(result)
if candidate_year and candidate_year == year:
score += 2
if language and result.get("original_language") == language:
score += 2
if region:
origin_country = result.get("origin_country") or []
if isinstance(origin_country, str):
origin_country = [origin_country]
if region in origin_country:
score += 2
if result.get("poster_path"):
score += 2
elif result.get("backdrop_path"):
score += 1
# Keep earlier results when scores tie.
score = (score, -idx)
if best_score is None or score > best_score:
best_score = score
best_result = result
return best_result or (results[0] if results else None)
def _movie_db_search(
media_type: str,
title: str,
*,
year: Optional[int] = None,
prefs: dict[str, Any] | None = None,
) -> tuple[Optional[dict], Optional[str]]:
language = (prefs or {}).get("language") or ""
region = (prefs or {}).get("region") or ""
normalized_key = (
media_type,
(title or "").strip().lower(),
str(year or ""),
str(language),
str(region),
)
if normalized_key in _MOVIEDB_SEARCH_CACHE:
return _MOVIEDB_SEARCH_CACHE[normalized_key]
try:
response = _REQUESTS_SESSION.get(
MOVIEDB_SEARCH_URL,
params={"type": media_type, "query": title},
timeout=6,
)
except requests.RequestException as exc:
message = f"Movie-DB search failed: {exc}"
logger.warning(message)
_MOVIEDB_SEARCH_CACHE[normalized_key] = (None, message)
return None, message
if response.status_code != 200:
message = f"Movie-DB search returned status {response.status_code}."
logger.debug(message)
_MOVIEDB_SEARCH_CACHE[normalized_key] = (None, message)
return None, message
try:
payload = response.json()
except ValueError:
message = "Movie-DB search returned an unexpected response format."
logger.debug(message)
_MOVIEDB_SEARCH_CACHE[normalized_key] = (None, message)
return None, message
results = []
if isinstance(payload, dict):
results = payload.get("results") or []
elif isinstance(payload, list):
results = payload
if not results:
message = "Movie-DB search returned no matches."
logger.debug(message)
_MOVIEDB_SEARCH_CACHE[normalized_key] = (None, message)
return None, message
candidate = _select_movie_db_candidate(
results, title, year=year, prefs=prefs
)
_MOVIEDB_SEARCH_CACHE[normalized_key] = (candidate, None)
return candidate, None
def _map_genre_ids(genre_ids: Any) -> list[str]:
if not genre_ids or not isinstance(genre_ids, (list, tuple, set)):
return []
genres: list[str] = []
seen: set[str] = set()
for entry in genre_ids:
try:
genre_id = int(entry)
except (TypeError, ValueError):
continue
name = GENRE_ID_MAP.get(genre_id)
if not name or name in seen:
continue
genres.append(name)
seen.add(name)
return genres
def _movie_db_fetch_credits(
media_type: str, content_id: Any
) -> tuple[Optional[dict], Optional[str]]:
if not content_id:
return None, "Movie-DB credits require an id."
normalized_key = (media_type, str(content_id))
if normalized_key in _MOVIEDB_CREDITS_CACHE:
return _MOVIEDB_CREDITS_CACHE[normalized_key]
params = {"id": content_id, "media_type": media_type}
try:
response = _REQUESTS_SESSION.get(
MOVIEDB_CREDITS_URL,
params=params,
timeout=6,
)
except requests.RequestException as exc:
message = f"Movie-DB credits lookup failed: {exc}"
logger.warning(message)
_MOVIEDB_CREDITS_CACHE[normalized_key] = (None, message)
return None, message
if response.status_code != 200 and media_type == "movie":
try:
response = _REQUESTS_SESSION.get(
MOVIEDB_CREDITS_URL,
params={"movie_id": content_id},
timeout=6,
)
except requests.RequestException as exc:
message = f"Movie-DB credits lookup failed: {exc}"
logger.warning(message)
_MOVIEDB_CREDITS_CACHE[normalized_key] = (None, message)
return None, message
if response.status_code != 200:
message = f"Movie-DB credits returned status {response.status_code}."
logger.debug(message)
_MOVIEDB_CREDITS_CACHE[normalized_key] = (None, message)
return None, message
try:
payload = response.json()
except ValueError:
message = "Movie-DB credits returned an unexpected response format."
logger.debug(message)
_MOVIEDB_CREDITS_CACHE[normalized_key] = (None, message)
return None, message
if not isinstance(payload, dict):
message = "Movie-DB credits returned an unexpected payload."
logger.debug(message)
_MOVIEDB_CREDITS_CACHE[normalized_key] = (None, message)
return None, message
_MOVIEDB_CREDITS_CACHE[normalized_key] = (payload, None)
return payload, None
def _normalize_credits(payload: dict | None) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
# Normalize cast/crew into the shared MediaItem schema.
if not payload:
return [], []
cast_entries = payload.get("cast") or []
crew_entries = payload.get("crew") or []
cast: list[dict[str, Any]] = []
for member in cast_entries[:12]:
if not isinstance(member, dict):
continue
name = member.get("name") or member.get("original_name")
if not name:
continue
cast.append(
{
"name": name,
"character": member.get("character"),
"profile_url": build_image_url(member.get("profile_path")),
}
)
crew: list[dict[str, Any]] = []
for member in crew_entries[:15]:
if not isinstance(member, dict):
continue
name = member.get("name") or member.get("original_name")
if not name:
continue
crew.append(
{
"name": name,
"job": member.get("job"),
"department": member.get("department"),
"profile_url": build_image_url(member.get("profile_path")),
}
)
return cast, crew
def _tmdb_title(payload: dict) -> str:
return payload.get("title") or payload.get("name") or ""
def _tmdb_candidate_year(payload: dict) -> Optional[int]:
return _parse_release_year(
payload.get("release_date") or payload.get("first_air_date")
)
def _tmdb_trailer_key(payload: dict | None) -> Optional[str]:
if not isinstance(payload, dict):
return None
videos = payload.get("videos")
if not isinstance(videos, dict):
return None
results = videos.get("results") or []
if not isinstance(results, list):
return None
candidates = [
entry
for entry in results
if isinstance(entry, dict)
and entry.get("site") == "YouTube"
and entry.get("key")
]
if not candidates:
return None
def score(entry: dict) -> tuple[int, int]:
score_value = 0
if entry.get("type") == "Trailer":
score_value += 3
elif entry.get("type") == "Teaser":
score_value += 1
if entry.get("official"):
score_value += 2
return score_value, 0
best = max(candidates, key=score)
return best.get("key")
def _select_tmdb_candidate(
results: list[dict],
title: str,
*,
year: Optional[int] = None,
prefs: dict[str, Any] | None = None,
) -> Optional[dict]:
# Score candidates by title, year, region, and artwork availability.
if not results:
return None
normalized_query = normalize_title(title)
region = (prefs or {}).get("region")
best_score = None
best_result = None
for idx, result in enumerate(results):
if not isinstance(result, dict):
continue
normalized_title = normalize_title(_tmdb_title(result))
score = 0
if normalized_query and normalized_title == normalized_query:
score += 6
elif normalized_query and (
normalized_query in normalized_title or normalized_title in normalized_query
):
score += 3
if year:
candidate_year = _tmdb_candidate_year(result)
if candidate_year and candidate_year == year:
score += 2
if region:
origin_country = result.get("origin_country") or []
if isinstance(origin_country, str):
origin_country = [origin_country]
if region in origin_country:
score += 2
if result.get("poster_path"):
score += 2
elif result.get("backdrop_path"):
score += 1
score = (score, -idx)
if best_score is None or score > best_score:
best_score = score
best_result = result
return best_result or (results[0] if results else None)
def _tmdb_request(
path: str,
*,
params: dict[str, Any],
timeout: int = 6,
) -> tuple[Optional[dict], Optional[str]]:
# TMDB API request wrapper with consistent error handling.
try:
response = _REQUESTS_SESSION.get(
f"{TMDB_API_BASE_URL}{path}",
params=params,
timeout=timeout,
)
except requests.RequestException as exc:
return None, f"TMDB request failed: {exc}"
if response.status_code != 200:
return None, f"TMDB returned status {response.status_code}."
try:
payload = response.json()
except ValueError:
return None, "TMDB returned an unexpected response format."
if not isinstance(payload, dict):
return None, "TMDB returned an unexpected payload."
return payload, None
def _tmdb_search(
media_type: str,
title: str,
*,
year: Optional[int] = None,
prefs: dict[str, Any] | None = None,
) -> tuple[Optional[dict], Optional[str]]:
api_key = _get_tmdb_api_key()
if not api_key:
return None, "TMDB API key is missing."
language = (prefs or {}).get("language") or ""
region = (prefs or {}).get("region") or ""
normalized_key = (
media_type,
(title or "").strip().lower(),
str(year or ""),
str(language),
str(region),
)
if normalized_key in _TMDB_SEARCH_CACHE:
return _TMDB_SEARCH_CACHE[normalized_key]
params = {"api_key": api_key, "query": title}
if language:
params["language"] = language
if region:
params["region"] = region
if media_type == "movie" and year:
params["year"] = year
if media_type == "tv" and year:
params["first_air_date_year"] = year
payload, error = _tmdb_request(
f"/search/{media_type}",
params=params,
)
if error:
_TMDB_SEARCH_CACHE[normalized_key] = (None, error)
return None, error
results = payload.get("results") or []
if not results:
message = "TMDB search returned no matches."
_TMDB_SEARCH_CACHE[normalized_key] = (None, message)
return None, message
candidate = _select_tmdb_candidate(results, title, year=year, prefs=prefs)
_TMDB_SEARCH_CACHE[normalized_key] = (candidate, None)
return candidate, None
def _tmdb_fetch_details(
media_type: str,
content_id: str,
*,
prefs: dict[str, Any] | None = None,
) -> tuple[Optional[dict], Optional[str]]:
api_key = _get_tmdb_api_key()
if not api_key:
return None, "TMDB API key is missing."
normalized_key = (media_type, str(content_id))
if normalized_key in _TMDB_DETAIL_CACHE:
return _TMDB_DETAIL_CACHE[normalized_key]
params = {"api_key": api_key, "append_to_response": "credits,videos"}
if prefs and prefs.get("language"):
params["language"] = prefs["language"]
payload, error = _tmdb_request(
f"/{media_type}/{content_id}",
params=params,
)
if error:
_TMDB_DETAIL_CACHE[normalized_key] = (None, error)
return None, error
_TMDB_DETAIL_CACHE[normalized_key] = (payload, None)
return payload, None
def _tmdb_fetch_episode_details(
tv_id: str,
season: int,
episode: int,
*,
prefs: dict[str, Any] | None = None,
) -> tuple[Optional[dict], Optional[str]]:
api_key = _get_tmdb_api_key()
if not api_key:
return None, "TMDB API key is missing."
params = {"api_key": api_key}
if prefs and prefs.get("language"):
params["language"] = prefs["language"]
payload, error = _tmdb_request(
f"/tv/{tv_id}/season/{season}/episode/{episode}",
params=params,
)
return payload, error
def fetch_tmdb_metadata(
media_item: MediaItem,
*,
use_cache: bool = True,
) -> tuple[Optional[Dict[str, Any]], Optional[str]]:
# TMDB metadata lookup for movies, shows, and episodes.
if media_item.item_type not in {
MediaItem.TYPE_MOVIE,
MediaItem.TYPE_SHOW,
MediaItem.TYPE_EPISODE,
}:
return None, "TMDB does not support this media type."
prefs = _get_library_metadata_prefs(media_item)
cache_key = _metadata_cache_key(media_item, prefs, provider="tmdb")
if use_cache:
cached = cache.get(cache_key)
if cached:
return cached, None
if media_item.item_type == MediaItem.TYPE_MOVIE:
direct_id = None
if media_item.movie_db_id and media_item.metadata_source in {"tmdb", "nfo"}:
direct_id = str(media_item.movie_db_id)
if direct_id:
details, message = _tmdb_fetch_details(
"movie",
direct_id,
prefs=prefs,
)
else:
candidate, message = _tmdb_search(
"movie",
media_item.title,
year=media_item.release_year,
prefs=prefs,
)
if not candidate:
return None, message
details, message = _tmdb_fetch_details(
"movie",
str(candidate.get("id")),
prefs=prefs,
)
if not details:
return None, message
credits_payload = details.get("credits") if isinstance(details, dict) else None
cast, crew = _normalize_credits(credits_payload)
genres = [entry.get("name") for entry in details.get("genres", []) if entry.get("name")]
runtime = details.get("runtime")
trailer_key = _tmdb_trailer_key(details)
release_year = _parse_release_year(details.get("release_date")) or media_item.release_year
metadata = _to_serializable(
{
"tmdb_id": str(details.get("id") or ""),
"imdb_id": details.get("imdb_id"),
"title": details.get("title") or media_item.title,
"synopsis": details.get("overview") or media_item.synopsis,
"tagline": details.get("tagline") or media_item.tagline,
"release_year": release_year,
"poster": build_image_url(details.get("poster_path")),
"backdrop": build_image_url(details.get("backdrop_path")),
"runtime_minutes": runtime,
"youtube_trailer": trailer_key,
"genres": genres,
"studios": [
entry.get("name")
for entry in details.get("production_companies", [])
if entry.get("name")
],
"rating": str(details.get("vote_average"))
if details.get("vote_average") is not None
else None,
"cast": cast,
"crew": crew,
"source": "tmdb",
"raw": details,
}
)
if use_cache:
cache.set(cache_key, metadata, METADATA_CACHE_TIMEOUT)
return metadata, None
if media_item.item_type == MediaItem.TYPE_SHOW:
direct_id = None
if media_item.movie_db_id and media_item.metadata_source in {"tmdb", "nfo"}:
direct_id = str(media_item.movie_db_id)
if direct_id:
details, message = _tmdb_fetch_details(
"tv",
direct_id,
prefs=prefs,
)
else:
candidate, message = _tmdb_search(
"tv",
media_item.title,
year=media_item.release_year,
prefs=prefs,
)
if not candidate:
return None, message
details, message = _tmdb_fetch_details(
"tv",
str(candidate.get("id")),
prefs=prefs,
)
if not details:
return None, message
credits_payload = details.get("credits") if isinstance(details, dict) else None
cast, crew = _normalize_credits(credits_payload)
genres = [entry.get("name") for entry in details.get("genres", []) if entry.get("name")]
runtime_list = details.get("episode_run_time") or []
runtime = runtime_list[0] if runtime_list else None
trailer_key = _tmdb_trailer_key(details)
release_year = _parse_release_year(details.get("first_air_date")) or media_item.release_year
metadata = _to_serializable(
{
"tmdb_id": str(details.get("id") or ""),
"title": details.get("name") or media_item.title,
"synopsis": details.get("overview") or media_item.synopsis,
"tagline": details.get("tagline") or media_item.tagline,
"release_year": release_year,
"poster": build_image_url(details.get("poster_path")),
"backdrop": build_image_url(details.get("backdrop_path")),
"runtime_minutes": runtime,
"youtube_trailer": trailer_key,
"genres": genres,
"studios": [
entry.get("name")
for entry in details.get("production_companies", [])
if entry.get("name")
],
"rating": str(details.get("vote_average"))
if details.get("vote_average") is not None
else None,
"cast": cast,
"crew": crew,
"source": "tmdb",
"raw": details,
}
)
if use_cache:
cache.set(cache_key, metadata, METADATA_CACHE_TIMEOUT)
return metadata, None
parent_series = media_item.parent or MediaItem.objects.filter(
pk=media_item.parent_id
).first()
if not parent_series:
return None, "Unable to determine parent series for episode."
tv_id = None
if parent_series.metadata_source in {"tmdb", "nfo"} and parent_series.movie_db_id:
tv_id = parent_series.movie_db_id
if not tv_id and isinstance(parent_series.metadata, dict):
tv_id = parent_series.metadata.get("id") or parent_series.metadata.get("tmdb_id")
if not tv_id:
series_candidate, message = _tmdb_search(
"tv",
parent_series.title,
year=parent_series.release_year or media_item.release_year,
prefs=prefs,
)
if not series_candidate:
return None, message
tv_id = series_candidate.get("id")
if not tv_id:
return None, "TMDB did not return a tvId for this series."
if media_item.season_number is None or media_item.episode_number is None:
return None, "Episode numbers are required for TMDB lookup."
try:
season_number = int(media_item.season_number)
episode_number = int(media_item.episode_number)
except (TypeError, ValueError):
return None, "Episode numbers are required for TMDB lookup."
details, message = _tmdb_fetch_episode_details(
str(tv_id),
season_number,
episode_number,
prefs=prefs,
)
if not details:
return None, message
release_year = _parse_release_year(details.get("air_date")) or media_item.release_year
cast = []
for star in details.get("guest_stars", [])[:12]:
if not isinstance(star, dict):
continue
name = star.get("name") or star.get("original_name")
if not name:
continue
cast.append(
{
"name": name,
"character": star.get("character"),
"profile_url": build_image_url(star.get("profile_path")),
}
)
crew = []
for member in details.get("crew", [])[:15]:
if not isinstance(member, dict):
continue
name = member.get("name") or member.get("original_name")
if not name:
continue
crew.append(
{
"name": name,
"job": member.get("job"),
"department": member.get("department"),
"profile_url": build_image_url(member.get("profile_path")),
}
)
metadata = _to_serializable(
{
"tmdb_id": str(details.get("id") or ""),
"title": details.get("name") or media_item.title,
"synopsis": details.get("overview") or media_item.synopsis,
"release_year": release_year,
"poster": build_image_url(details.get("still_path")),
"backdrop": build_image_url(details.get("still_path")),
"runtime_minutes": details.get("runtime"),
"genres": [],
"cast": cast,
"crew": crew,
"source": "tmdb",
"raw": details,
}
)
if use_cache:
cache.set(cache_key, metadata, METADATA_CACHE_TIMEOUT)
return metadata, None
def _movie_db_fetch_episode_details(
tv_id: Any, season: int, episode: int
) -> tuple[Optional[dict], Optional[str]]:
try:
response = _REQUESTS_SESSION.get(
MOVIEDB_EPISODE_URL,
params={"tvId": tv_id, "season": season, "episode": episode},
timeout=6,
)
except requests.RequestException as exc:
message = f"Movie-DB episode lookup failed: {exc}"
logger.warning(message)
return None, message
if response.status_code != 200:
message = f"Movie-DB episode endpoint returned status {response.status_code}."
logger.debug(message)
return None, message
parsed = _parse_movie_db_php_response(response.text)
if not parsed or not isinstance(parsed, dict):
message = "Movie-DB episode endpoint returned an unexpected payload."
logger.debug(message)
return None, message
return _convert_numeric_container(parsed), None
def fetch_movie_db_metadata(
media_item: MediaItem,
*,
use_cache: bool = True,
) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
# Movie-DB fallback metadata lookup when TMDB is unavailable.
if media_item.item_type not in {
MediaItem.TYPE_MOVIE,
MediaItem.TYPE_SHOW,
MediaItem.TYPE_EPISODE,
}:
return None, "Movie-DB does not support this media type."
prefs = _get_library_metadata_prefs(media_item)
cache_key = _metadata_cache_key(media_item, prefs, provider="movie-db")
if use_cache:
cached = cache.get(cache_key)
if cached:
return cached, None
if media_item.item_type == MediaItem.TYPE_MOVIE:
candidate, message = _movie_db_search(
"movie",
media_item.title,
year=media_item.release_year,
prefs=prefs,
)
if not candidate:
return None, message
movie_id = candidate.get("id")
genres = _map_genre_ids(candidate.get("genre_ids"))
credits_payload, _ = _movie_db_fetch_credits("movie", movie_id)
cast, crew = _normalize_credits(credits_payload)
imdb_id = _extract_imdb_id(candidate)
release_date = candidate.get("release_date") or candidate.get("first_air_date")
release_year = _parse_release_year(release_date) or media_item.release_year
metadata = _to_serializable(
{
"movie_db_id": str(movie_id or media_item.movie_db_id or ""),
"imdb_id": imdb_id,
"title": candidate.get("title")
or candidate.get("name")
or media_item.title,
"synopsis": candidate.get("overview") or media_item.synopsis,
"tagline": candidate.get("tagline") or media_item.tagline,
"release_year": release_year,
"poster": build_image_url(candidate.get("poster_path")),
"backdrop": build_image_url(candidate.get("backdrop_path")),
"runtime_minutes": candidate.get("runtime"),
"genres": genres,
"cast": cast,
"crew": crew,
"source": "movie-db",
"raw": candidate,
}
)
if use_cache:
cache.set(cache_key, metadata, METADATA_CACHE_TIMEOUT)
return metadata, None
if media_item.item_type == MediaItem.TYPE_SHOW:
candidate, message = _movie_db_search(
"tv",
media_item.title,
year=media_item.release_year,
prefs=prefs,
)
if not candidate:
return None, message
tv_id = candidate.get("id")
genres = _map_genre_ids(candidate.get("genre_ids"))
credits_payload, _ = _movie_db_fetch_credits("tv", tv_id)
cast, crew = _normalize_credits(credits_payload)
imdb_id = _extract_imdb_id(candidate)
release_date = candidate.get("first_air_date") or candidate.get("release_date")
release_year = _parse_release_year(release_date) or media_item.release_year
metadata = _to_serializable(
{
"movie_db_id": str(candidate.get("id") or media_item.movie_db_id or ""),
"imdb_id": imdb_id,
"title": candidate.get("name")
or candidate.get("title")
or media_item.title,
"synopsis": candidate.get("overview") or media_item.synopsis,
"tagline": candidate.get("tagline") or media_item.tagline,
"release_year": release_year,
"poster": build_image_url(candidate.get("poster_path")),
"backdrop": build_image_url(candidate.get("backdrop_path")),
"runtime_minutes": candidate.get("episode_run_time"),
"genres": genres,
"cast": cast,
"crew": crew,
"movie_db_tv_id": str(tv_id) if tv_id is not None else None,
"source": "movie-db",
"raw": candidate,
}
)
if use_cache:
cache.set(cache_key, metadata, METADATA_CACHE_TIMEOUT)
return metadata, None
parent_series = media_item.parent or MediaItem.objects.filter(
pk=media_item.parent_id
).first()
if not parent_series:
return None, "Unable to determine parent series for episode."
tv_id = None
if parent_series.metadata_source not in {"tmdb", "nfo"}:
tv_id = parent_series.movie_db_id or None
if not tv_id and isinstance(parent_series.metadata, dict):
tv_id = parent_series.metadata.get("movie_db_tv_id")
if tv_id and not parent_series.movie_db_id:
parent_series.movie_db_id = str(tv_id)
parent_series.save(update_fields=["movie_db_id", "updated_at"])
if not tv_id:
series_candidate, message = _movie_db_search(
"tv",
parent_series.title,
year=parent_series.release_year or media_item.release_year,
prefs=prefs,
)
if not series_candidate:
return None, message
tv_id = series_candidate.get("id")
if tv_id is not None:
tv_id_str = str(tv_id)
update_fields = []
if not parent_series.movie_db_id:
parent_series.movie_db_id = tv_id_str
update_fields.append("movie_db_id")
if isinstance(parent_series.metadata, dict):
updated_metadata = dict(parent_series.metadata)
else:
updated_metadata = {}
updated_metadata["movie_db_tv_id"] = tv_id_str
parent_series.metadata = updated_metadata
update_fields.append("metadata")
update_fields.append("updated_at")
parent_series.save(update_fields=update_fields)
if tv_id is None:
return None, "Movie-DB did not return a tvId for this series."
if media_item.season_number is None or media_item.episode_number is None:
return None, "Episode numbers are required for Movie-DB lookup."
try:
season_number = int(media_item.season_number)
episode_number = int(media_item.episode_number)
except (TypeError, ValueError):
return None, "Episode numbers are required for Movie-DB lookup."
episode_details, message = _movie_db_fetch_episode_details(
tv_id, season_number, episode_number
)
if not episode_details:
return None, message
release_year = (
_parse_release_year(episode_details.get("air_date"))
or media_item.release_year
)
guest_stars = episode_details.get("guest_stars") or []
crew_entries = episode_details.get("crew") or []
cast: list[dict[str, Any]] = []
for star in guest_stars:
if not isinstance(star, dict):
continue
name = star.get("name") or star.get("original_name")
if not name:
continue
cast.append(
{
"name": name,
"character": star.get("character"),
"profile_url": build_image_url(star.get("profile_path")),
}
)
crew: list[dict[str, Any]] = []
for member in crew_entries:
if not isinstance(member, dict):
continue
name = member.get("name") or member.get("original_name")
if not name:
continue
crew.append(
{
"name": name,
"job": member.get("job"),
"department": member.get("department"),
"profile_url": build_image_url(member.get("profile_path")),
}
)
metadata = _to_serializable(
{
"movie_db_id": str(episode_details.get("id") or media_item.movie_db_id or ""),
"title": episode_details.get("name") or media_item.title,
"synopsis": episode_details.get("overview") or media_item.synopsis,
"release_year": release_year,
"poster": build_image_url(episode_details.get("still_path")),
"backdrop": build_image_url(episode_details.get("still_path")),
"runtime_minutes": episode_details.get("runtime"),
"genres": [],
"cast": cast,
"crew": crew,
"movie_db_tv_id": str(tv_id) if tv_id is not None else None,
"source": "movie-db",
"raw": episode_details,
}
)
if use_cache:
cache.set(cache_key, metadata, METADATA_CACHE_TIMEOUT)
return metadata, None
def _is_empty_value(value: Any) -> bool:
if value is None:
return True
if isinstance(value, str):
return value.strip() == ""
if isinstance(value, (list, tuple, set, dict)):
return len(value) == 0
if isinstance(value, (int, float)) and value == 0:
return True
return False
def apply_metadata(
media_item: MediaItem,
metadata: Dict[str, Any],
*,
fill_missing_only: bool = False,
update_source: bool = True,
) -> MediaItem:
# Optionally fill only missing fields, preserving existing metadata.
changed = False
update_fields: list[str] = []
def can_update(current_value: Any) -> bool:
if not fill_missing_only:
return True
return _is_empty_value(current_value)
movie_db_id = metadata.get("movie_db_id") or metadata.get("tmdb_id")
if movie_db_id and movie_db_id != media_item.movie_db_id and can_update(media_item.movie_db_id):
media_item.movie_db_id = movie_db_id
update_fields.append("movie_db_id")
changed = True
imdb_id = metadata.get("imdb_id")
if imdb_id and imdb_id != media_item.imdb_id and can_update(media_item.imdb_id):
media_item.imdb_id = imdb_id
update_fields.append("imdb_id")
changed = True
title = metadata.get("title")
if title and title != media_item.title and can_update(media_item.title):
media_item.title = title
update_fields.append("title")
changed = True
sort_title = metadata.get("sort_title")
if sort_title and sort_title != media_item.sort_title and can_update(media_item.sort_title):
media_item.sort_title = sort_title
update_fields.append("sort_title")
changed = True
synopsis = metadata.get("synopsis")
if synopsis and synopsis != media_item.synopsis and can_update(media_item.synopsis):
media_item.synopsis = synopsis
update_fields.append("synopsis")
changed = True
tagline = metadata.get("tagline")
if tagline and tagline != media_item.tagline and can_update(media_item.tagline):
media_item.tagline = tagline
update_fields.append("tagline")
changed = True
youtube_trailer = metadata.get("youtube_trailer") or metadata.get("trailer")
if (
youtube_trailer
and youtube_trailer != media_item.youtube_trailer
and can_update(media_item.youtube_trailer)
):
media_item.youtube_trailer = youtube_trailer
update_fields.append("youtube_trailer")
changed = True
rating = metadata.get("rating")
if rating and rating != media_item.rating and can_update(media_item.rating):
media_item.rating = rating
update_fields.append("rating")
changed = True
release_year = metadata.get("release_year")
if release_year and release_year != media_item.release_year and can_update(media_item.release_year):
try:
media_item.release_year = int(release_year)
except (TypeError, ValueError):
media_item.release_year = media_item.release_year
else:
changed = True
update_fields.append("release_year")
runtime_minutes = metadata.get("runtime_minutes")
if runtime_minutes and can_update(media_item.runtime_ms):
new_runtime_ms = int(float(runtime_minutes) * 60 * 1000)
if media_item.runtime_ms != new_runtime_ms:
media_item.runtime_ms = new_runtime_ms
update_fields.append("runtime_ms")
changed = True
genres = metadata.get("genres")
if genres and genres != media_item.genres and can_update(media_item.genres):
media_item.genres = genres
update_fields.append("genres")
changed = True
tags = metadata.get("tags")
if tags and tags != media_item.tags and can_update(media_item.tags):
media_item.tags = tags
update_fields.append("tags")
changed = True
studios = metadata.get("studios")
if studios and studios != media_item.studios and can_update(media_item.studios):
media_item.studios = studios
update_fields.append("studios")
changed = True
cast = metadata.get("cast")
if cast and cast != media_item.cast and can_update(media_item.cast):
media_item.cast = cast
update_fields.append("cast")
changed = True
crew = metadata.get("crew")
if crew and crew != media_item.crew and can_update(media_item.crew):
media_item.crew = crew
update_fields.append("crew")
changed = True
poster = metadata.get("poster")
if poster and poster != media_item.poster_url and can_update(media_item.poster_url):
media_item.poster_url = poster
update_fields.append("poster_url")
changed = True
backdrop = metadata.get("backdrop")
if backdrop and backdrop != media_item.backdrop_url and can_update(media_item.backdrop_url):
media_item.backdrop_url = backdrop
update_fields.append("backdrop_url")
changed = True
raw_payload = metadata.get("raw")
if raw_payload and raw_payload != media_item.metadata and can_update(media_item.metadata):
media_item.metadata = raw_payload
update_fields.append("metadata")
changed = True
new_source = metadata.get("source", media_item.metadata_source or "unknown")
if (
update_source
and new_source
and new_source != media_item.metadata_source
and can_update(media_item.metadata_source)
):
media_item.metadata_source = new_source
update_fields.append("metadata_source")
changed = True
if changed or not media_item.metadata_last_synced_at:
media_item.metadata_last_synced_at = timezone.now()
if "metadata_last_synced_at" not in update_fields:
update_fields.append("metadata_last_synced_at")
if update_fields:
update_fields = list(dict.fromkeys(update_fields + ["updated_at"]))
media_item.save(update_fields=update_fields)
if poster:
ArtworkAsset.objects.update_or_create(
media_item=media_item,
asset_type=ArtworkAsset.TYPE_POSTER,
source=metadata.get("source", "movie-db"),
defaults={"external_url": poster},
)
if backdrop:
ArtworkAsset.objects.update_or_create(
media_item=media_item,
asset_type=ArtworkAsset.TYPE_BACKDROP,
source=metadata.get("source", "movie-db"),
defaults={"external_url": backdrop},
)
return media_item
def _needs_remote_metadata(media_item: MediaItem) -> bool:
# Determine if enough metadata is missing to justify remote lookups.
required_fields = [
media_item.synopsis,
media_item.release_year,
media_item.runtime_ms,
media_item.genres,
media_item.poster_url,
media_item.backdrop_url,
media_item.cast,
media_item.crew,
media_item.rating,
]
if media_item.item_type in {MediaItem.TYPE_MOVIE, MediaItem.TYPE_SHOW}:
required_fields.append(media_item.youtube_trailer)
return any(_is_empty_value(value) for value in required_fields)
def sync_metadata(
media_item: MediaItem,
*,
force: bool = False,
allow_remote: bool = True,
) -> Optional[MediaItem]:
# Orchestrate metadata sources: NFO -> TMDB -> Movie-DB.
prefer_local = CoreSettings.get_prefer_local_metadata()
has_local_metadata = False
if prefer_local:
local_metadata, local_error = fetch_local_nfo_metadata(media_item)
if local_metadata:
apply_metadata(media_item, local_metadata)
has_local_metadata = True
elif local_error:
logger.debug("Local metadata skipped for %s: %s", media_item, local_error)
if has_local_metadata and not allow_remote:
return media_item
if not allow_remote:
return media_item if has_local_metadata else None
tmdb_metadata = None
tmdb_error = None
if _get_tmdb_api_key():
tmdb_metadata, tmdb_error = fetch_tmdb_metadata(
media_item, use_cache=not force
)
if tmdb_metadata:
return apply_metadata(
media_item,
tmdb_metadata,
fill_missing_only=prefer_local and has_local_metadata,
update_source=not (prefer_local and has_local_metadata),
)
if tmdb_error:
logger.debug("TMDB metadata skipped for %s: %s", media_item, tmdb_error)
movie_db_ok, movie_db_message = check_movie_db_health()
if not movie_db_ok:
if movie_db_message:
logger.debug("Movie-DB metadata skipped for %s: %s", media_item, movie_db_message)
return media_item if has_local_metadata else None
metadata, error = fetch_movie_db_metadata(media_item, use_cache=not force)
if not metadata:
if error:
logger.debug("Movie-DB metadata skipped for %s: %s", media_item, error)
return media_item if has_local_metadata else None
return apply_metadata(
media_item,
metadata,
fill_missing_only=prefer_local and has_local_metadata,
update_source=not (prefer_local and has_local_metadata),
)