Dispatcharr/apps/channels/tasks.py
2025-09-04 13:45:25 -05:00

1497 lines
61 KiB
Python
Executable file

# apps/channels/tasks.py
import logging
import os
import select
import re
import requests
import time
import json
import subprocess
from datetime import datetime, timedelta
import gc
from celery import shared_task
from django.utils.text import slugify
from apps.channels.models import Channel
from apps.epg.models import EPGData
from core.models import CoreSettings
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
import tempfile
from urllib.parse import quote
logger = logging.getLogger(__name__)
# Words we remove to help with fuzzy + embedding matching
COMMON_EXTRANEOUS_WORDS = [
    "tv", "channel", "network", "television",
    "east", "west", "hd", "uhd", "24/7",
    "1080p", "720p", "540p", "480p",
    "film", "movie", "movies"
]

# Token-level view of COMMON_EXTRANEOUS_WORDS. normalize_name() strips
# punctuation *before* it filters tokens, so an entry such as "24/7" reaches
# the filter as "247". The entries get the same treatment here so that they can
# actually match; a frozenset also makes the membership test O(1).
_EXTRANEOUS_TOKENS = frozenset(
    token
    for word in COMMON_EXTRANEOUS_WORDS
    for token in re.sub(r"[^\w\s]", "", word.lower()).split()
)


def normalize_name(name: str) -> str:
    """
    Aggressively normalize a channel/program name for fuzzy comparison.

    The name is:
    - lowercased
    - stripped of bracketed/parenthesized text
    - stripped of punctuation
    - stripped of the extraneous words listed in COMMON_EXTRANEOUS_WORDS
    - collapsed to single spaces

    Args:
        name: Raw name; ``None`` or an empty string is accepted.

    Returns:
        The normalized name, or "" when nothing is left.
    """
    if not name:
        return ""

    norm = name.lower()
    norm = re.sub(r"\[.*?\]", "", norm)
    norm = re.sub(r"\(.*?\)", "", norm)
    norm = re.sub(r"[^\w\s]", "", norm)

    # split() without arguments already discards leading/trailing/repeated
    # whitespace, so joining the kept tokens yields a collapsed string.
    return " ".join(t for t in norm.split() if t not in _EXTRANEOUS_TOKENS)
@shared_task
def match_epg_channels():
    """
    Match channels that have no EPG data to EPGData rows.

    Steps:
      1) Collect every Channel without epg_data and every EPGData row, with
         normalized (stripped, lower-cased) tvg_ids and normalized names.
      2) Write that payload to a temp file and run /app/scripts/epg_match.py on
         it in a child process, relaying the child's stderr to the log.
      3) Parse the JSON the script prints on stdout ("channels_to_update" and
         "matched_channels") and bulk-update Channel.epg_data accordingly.
      4) Log a summary and send an "epg_match" update to the 'updates' group.

    Returns:
        "Done. Matched N channel(s)." on success, or
        "Failed to process EPG matching" when the script exits non-zero.
    """
    try:
        logger.info("Starting EPG matching logic...")

        # Attempt to retrieve a "preferred-region" if configured
        try:
            region_obj = CoreSettings.objects.get(key="preferred-region")
            region_code = region_obj.value.strip().lower()
        except CoreSettings.DoesNotExist:
            region_code = None

        # Get channels that don't have EPG data assigned
        channels_without_epg = Channel.objects.filter(epg_data__isnull=True)
        logger.info(f"Found {channels_without_epg.count()} channels without EPG data")

        channels_json = []
        for channel in channels_without_epg:
            # Normalize TVG ID - strip whitespace and convert to lowercase
            normalized_tvg_id = channel.tvg_id.strip().lower() if channel.tvg_id else ""
            if normalized_tvg_id:
                logger.info(f"Processing channel {channel.id} '{channel.name}' with TVG ID='{normalized_tvg_id}'")

            channels_json.append({
                "id": channel.id,
                "name": channel.name,
                "tvg_id": normalized_tvg_id,  # Use normalized TVG ID
                "original_tvg_id": channel.tvg_id,  # Keep original for reference
                "fallback_name": normalized_tvg_id if normalized_tvg_id else channel.name,
                "norm_chan": normalize_name(normalized_tvg_id if normalized_tvg_id else channel.name)
            })

        # Similarly normalize EPG data TVG IDs
        epg_json = []
        for epg in EPGData.objects.all():
            normalized_tvg_id = epg.tvg_id.strip().lower() if epg.tvg_id else ""
            epg_json.append({
                'id': epg.id,
                'tvg_id': normalized_tvg_id,  # Use normalized TVG ID
                'original_tvg_id': epg.tvg_id,  # Keep original for reference
                'name': epg.name,
                'norm_name': normalize_name(epg.name),
                'epg_source_id': epg.epg_source.id if epg.epg_source else None,
            })

        # Log available EPG data TVG IDs for debugging
        unique_epg_tvg_ids = set(e['tvg_id'] for e in epg_json if e['tvg_id'])
        logger.info(f"Available EPG TVG IDs: {', '.join(sorted(unique_epg_tvg_ids))}")

        payload = {
            "channels": channels_json,
            "epg_data": epg_json,
            "region_code": region_code,
        }

        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            temp_file.write(json.dumps(payload).encode('utf-8'))
            temp_file_path = temp_file.name

        # The payload now lives on disk. Drop every reference to the large
        # lists (not just the wrapping dict) so they can actually be freed
        # while the child process runs.
        del payload, channels_json, epg_json
        gc.collect()

        try:
            # The context manager closes both pipes and reaps the child.
            with subprocess.Popen(
                ['python', '/app/scripts/epg_match.py', temp_file_path],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            ) as process:
                stdout_parts = []
                block_size = 1024

                while True:
                    # Wait up to 1 second for either pipe to become readable
                    readable, _, _ = select.select([process.stdout, process.stderr], [], [], 1)
                    for stream in readable:
                        if stream is process.stdout:
                            stdout_parts.append(stream.read(block_size))
                        else:
                            error = stream.readline()
                            if error:
                                logger.info(error.strip())
                    if process.poll() is not None:
                        break

                # The child has exited, but the pipes may still hold output
                # that the loop above did not get to. Without this drain the
                # JSON result is truncated whenever more than one block is
                # pending at exit time.
                stdout_parts.append(process.stdout.read())
                for error in process.stderr:
                    logger.info(error.strip())

                process.wait()
        finally:
            # Remove the payload file whether or not the child could be run.
            try:
                os.remove(temp_file_path)
            except OSError as e:
                logger.warning(f"Could not remove EPG match temp file {temp_file_path}: {e}")

        if process.returncode != 0:
            return "Failed to process EPG matching"

        stdout = "".join(stdout_parts)
        del stdout_parts
        result = json.loads(stdout)

        # This returns lists of dicts, not model objects
        channels_to_update_dicts = result["channels_to_update"]
        matched_channels = result["matched_channels"]

        # Explicitly clean up large objects
        del stdout, result
        gc.collect()

        # Convert the dict-based 'channels_to_update' into real Channel objects
        if channels_to_update_dicts:
            # Build a map from channel_id -> epg_data_id
            epg_mapping = {
                d["id"]: d["epg_data_id"] for d in channels_to_update_dicts
            }

            # Fetch the channels that need updates from the DB
            channels_list = list(Channel.objects.filter(id__in=list(epg_mapping)))

            # Populate each Channel object with the epg_data_id the script chose
            for channel_obj in channels_list:
                channel_obj.epg_data_id = epg_mapping.get(channel_obj.id)

            # Now we have real model objects, so bulk_update will work
            Channel.objects.bulk_update(channels_list, ["epg_data"])

        total_matched = len(matched_channels)
        if total_matched:
            logger.info(f"Match Summary: {total_matched} channel(s) matched.")
            for (cid, cname, tvg) in matched_channels:
                logger.info(f"  - Channel ID={cid}, Name='{cname}' => tvg_id='{tvg}'")
        else:
            logger.info("No new channels were matched.")

        logger.info("Finished EPG matching logic.")

        # Send update with additional information for refreshing UI
        channel_layer = get_channel_layer()
        associations = [
            {"channel_id": chan["id"], "epg_data_id": chan["epg_data_id"]}
            for chan in channels_to_update_dicts
        ]

        async_to_sync(channel_layer.group_send)(
            'updates',
            {
                'type': 'update',
                "data": {
                    "success": True,
                    "type": "epg_match",
                    "refresh_channels": True,  # Flag to tell frontend to refresh channels
                    "matches_count": total_matched,
                    "message": f"EPG matching complete: {total_matched} channel(s) matched",
                    "associations": associations  # Add the associations data
                }
            }
        )

        return f"Done. Matched {total_matched} channel(s)."
    finally:
        # Final cleanup
        gc.collect()
        # Use our standardized cleanup function for more thorough memory management
        from core.utils import cleanup_memory
        cleanup_memory(log_usage=True, force_collection=True)
def _series_episode_key(p):
    """
    Build the key used to collapse repeat airings of the same episode.

    Uses the best identifier available, in this order: season+episode from
    custom_properties, onscreen_episode (e.g. S08E03), then sub_title, each
    scoped by tvg_id and the lower-cased title. Programs with none of those
    get a key based on their own id so they are never merged with another.
    """
    try:
        props = p.custom_properties or {}
        season = props.get("season")
        episode = props.get("episode")
        onscreen = props.get("onscreen_episode")
    except Exception:
        season = episode = onscreen = None

    base = f"{p.tvg_id or ''}|{(p.title or '').strip().lower()}"  # series scope
    if season is not None and episode is not None:
        return f"{base}|s{season}e{episode}"
    if onscreen:
        return f"{base}|{str(onscreen).strip().lower()}"
    if p.sub_title:
        return f"{base}|{p.sub_title.strip().lower()}"
    # No reliable episode identity; use the program id to avoid over-merging
    return f"id:{p.id}"


def evaluate_series_rules_impl(tvg_id: str | None = None):
    """
    Synchronous implementation of series rule evaluation; returns details for debugging.

    For every DVR series rule (optionally only those for ``tvg_id``), looks up
    the programs airing on the matching EPG within the next 7 days, collapses
    repeat airings to the earliest one per episode, and creates a Recording for
    each episode that is not already scheduled.

    Args:
        tvg_id: When given, only rules whose "tvg_id" equals it are evaluated.

    Returns:
        dict with "scheduled" (number of recordings created) and "details"
        (one status entry per rule, plus one per failed program).
    """
    from django.db.models import Q
    from django.utils import timezone
    from apps.channels.models import Recording, Channel
    from apps.epg.models import EPGData, ProgramData

    rules = CoreSettings.get_dvr_series_rules()
    result = {"scheduled": 0, "details": []}
    if not isinstance(rules, list) or not rules:
        return result

    # Optionally filter for tvg_id
    if tvg_id:
        rules = [r for r in rules if str(r.get("tvg_id")) == str(tvg_id)]
        if not rules:
            result["details"].append({"tvg_id": tvg_id, "status": "no_rule"})
            return result

    now = timezone.now()
    horizon = now + timedelta(days=7)

    # Preload existing recordings' program ids to avoid duplicates
    existing_program_ids = set()
    for rec in Recording.objects.all().only("custom_properties"):
        try:
            pid = rec.custom_properties.get("program", {}).get("id") if rec.custom_properties else None
            if pid is not None:
                # Normalize to string for consistent comparisons
                existing_program_ids.add(str(pid))
        except Exception:
            continue

    for rule in rules:
        rv_tvg = str(rule.get("tvg_id") or "").strip()
        mode = (rule.get("mode") or "all").lower()
        series_title = (rule.get("title") or "").strip()
        norm_series = normalize_name(series_title) if series_title else None

        if not rv_tvg:
            result["details"].append({"tvg_id": rv_tvg, "status": "invalid_rule"})
            continue

        epg = EPGData.objects.filter(tvg_id=rv_tvg).first()
        if not epg:
            result["details"].append({"tvg_id": rv_tvg, "status": "no_epg_match"})
            continue

        # Without a channel nothing can be recorded, so check before querying programs.
        channel = Channel.objects.filter(epg_data=epg).order_by("channel_number").first()
        if not channel:
            result["details"].append({"tvg_id": rv_tvg, "status": "no_channel_for_epg"})
            continue

        window_qs = ProgramData.objects.filter(
            epg=epg,
            start_time__gte=now,
            start_time__lte=horizon,
        )
        programs_qs = window_qs.filter(title__iexact=series_title) if series_title else window_qs
        programs = list(programs_qs.order_by("start_time"))

        # Fallback: if no direct matches and we have a title, try normalized comparison in Python
        if series_title and not programs:
            # sub_title and description are read further down (episode key and
            # Recording payload); loading them here avoids one extra query per
            # program for each deferred field.
            all_progs = window_qs.only(
                "id", "title", "sub_title", "description",
                "start_time", "end_time", "custom_properties", "tvg_id",
            )
            programs = [p for p in all_progs if normalize_name(p.title) == norm_series]

        # Optionally filter to only brand-new episodes before grouping
        if mode == "new":
            filtered = []
            for p in programs:
                try:
                    if (p.custom_properties or {}).get("new"):
                        filtered.append(p)
                except Exception:
                    # custom_properties is not dict-like; treat as "not new"
                    pass
            programs = filtered

        # Many providers list multiple future airings of the same episode
        # (e.g., prime-time and a late-night repeat). Scheduling each airing
        # shows up as duplicates in the DVR, so keep only the earliest airing
        # per episode key.
        earliest_by_key = {}
        for p in programs:
            k = _series_episode_key(p)
            cur = earliest_by_key.get(k)
            if cur is None or p.start_time < cur.start_time:
                earliest_by_key[k] = p

        created_here = 0
        for prog in earliest_by_key.values():
            try:
                # Skip if already scheduled by program id
                if str(prog.id) in existing_program_ids:
                    continue

                # Extra guard: skip if a recording exists for the same channel + timeslot
                try:
                    already_scheduled = Recording.objects.filter(
                        channel=channel,
                        start_time=prog.start_time,
                        end_time=prog.end_time,
                    ).filter(
                        Q(custom_properties__program__id=prog.id)
                        | Q(custom_properties__program__title=prog.title)
                    ).exists()
                except Exception as e:
                    # Keep the original conservative choice (do not schedule
                    # when the duplicate check cannot run) but make it visible.
                    logger.warning(f"Duplicate check failed for program {prog.id}; skipping: {e}")
                    continue
                if already_scheduled:
                    continue  # already scheduled/recorded

                rec = Recording.objects.create(
                    channel=channel,
                    start_time=prog.start_time,
                    end_time=prog.end_time,
                    custom_properties={
                        "program": {
                            "id": prog.id,
                            "tvg_id": prog.tvg_id,
                            "title": prog.title,
                            "sub_title": prog.sub_title,
                            "description": prog.description,
                            "start_time": prog.start_time.isoformat(),
                            "end_time": prog.end_time.isoformat(),
                        }
                    },
                )
                existing_program_ids.add(str(prog.id))
                created_here += 1

                # Artwork is best effort; a failure to enqueue must not undo the recording
                try:
                    prefetch_recording_artwork.apply_async(args=[rec.id], countdown=1)
                except Exception as e:
                    logger.debug(f"Could not enqueue artwork prefetch for recording {rec.id}: {e}")
            except Exception as e:
                result["details"].append({"tvg_id": rv_tvg, "status": "error", "error": str(e)})
                continue

        result["scheduled"] += created_here
        result["details"].append({"tvg_id": rv_tvg, "title": series_title, "status": "ok", "created": created_here})

    # Notify frontend to refresh
    try:
        channel_layer = get_channel_layer()
        async_to_sync(channel_layer.group_send)(
            'updates',
            {'type': 'update', 'data': {"success": True, "type": "recordings_refreshed", "scheduled": result["scheduled"]}},
        )
    except Exception as e:
        logger.debug(f"Could not send recordings_refreshed update: {e}")

    return result
@shared_task
def evaluate_series_rules(tvg_id: str | None = None):
    """Celery task wrapper: run evaluate_series_rules_impl(tvg_id) and return its result."""
    return evaluate_series_rules_impl(tvg_id)
@shared_task
def _safe_name(s):
try:
import re
s = s or ""
# Remove forbidden filename characters and normalize spaces
s = re.sub(r'[\\/:*?"<>|]+', '', s)
s = s.strip()
return s
except Exception:
return s or ""
def _parse_epg_tv_movie_info(program):
"""Return tuple (is_movie, season, episode, year, sub_title) from EPG ProgramData if available."""
is_movie = False
season = None
episode = None
year = None
sub_title = program.get('sub_title') if isinstance(program, dict) else None
try:
from apps.epg.models import ProgramData
prog_id = program.get('id') if isinstance(program, dict) else None
epg_program = ProgramData.objects.filter(id=prog_id).only('custom_properties').first() if prog_id else None
if epg_program and epg_program.custom_properties:
cp = epg_program.custom_properties
# Determine categories
cats = [c.lower() for c in (cp.get('categories') or []) if isinstance(c, str)]
is_movie = 'movie' in cats or 'film' in cats
season = cp.get('season')
episode = cp.get('episode')
onscreen = cp.get('onscreen_episode')
if (season is None or episode is None) and isinstance(onscreen, str):
import re as _re
m = _re.search(r'[sS](\d+)[eE](\d+)', onscreen)
if m:
season = season or int(m.group(1))
episode = episode or int(m.group(2))
d = cp.get('date')
if d:
year = str(d)[:4]
except Exception:
pass
return is_movie, season, episode, year, sub_title
def _build_output_paths(channel, program, start_time, end_time):
    """
    Build (final_path, temp_ts_path, final_filename) using DVR templates.

    The relative path comes from the movie or TV template in CoreSettings,
    with fallbacks when season/episode are missing or formatting fails. The
    result always ends in ".mkv"; relative paths are placed under /app/data.
    The destination directory is created as a side effect.

    Args:
        channel: Channel being recorded; its name is used for the {channel}
            placeholder and as the title when the program has none.
        program: Program dict from the recording's custom_properties (may be
            empty or a non-dict).
        start_time: Recording start (datetime), used for {start} and the
            default {year}.
        end_time: Recording end (datetime), used for {end}.

    Returns:
        Tuple of (final .mkv path, temporary .ts path in the same directory,
        base name of the final file).
    """
    from core.models import CoreSettings

    def _as_int(value):
        # EPG values may be None or non-numeric strings; both mean "unknown".
        try:
            return int(value)
        except (TypeError, ValueError):
            return 0

    # Root for DVR recordings: fixed to /app/data inside the container
    library_root = '/app/data'

    is_movie, season, episode, year, sub_title = _parse_epg_tv_movie_info(program)

    # A program dict without a title (e.g. a manual recording) falls back to
    # the channel name instead of producing an empty folder/file name.
    raw_title = program.get('title') if isinstance(program, dict) else None
    show = title = _safe_name(raw_title or channel.name)
    sub_title = _safe_name(sub_title)
    season = _as_int(season)
    episode = _as_int(episode)
    year = year or str(start_time.year)

    values = {
        'show': show,
        'title': title,
        'sub_title': sub_title,
        'season': season,
        'episode': episode,
        'year': year,
        'channel': _safe_name(channel.name),
        'start': start_time.strftime('%Y%m%d_%H%M%S'),
        'end': end_time.strftime('%Y%m%d_%H%M%S'),
    }

    template = CoreSettings.get_dvr_movie_template() if is_movie else CoreSettings.get_dvr_tv_template()

    # Build relative path from templates with smart fallbacks
    rel_path = None
    if not is_movie and (season == 0 or episode == 0):
        # TV fallback template when S/E are missing
        try:
            tv_fb = CoreSettings.get_dvr_tv_fallback_template()
            rel_path = tv_fb.format(**values)
        except Exception:
            # Older setting support
            try:
                fallback_root = CoreSettings.get_dvr_tv_fallback_dir()
            except Exception:
                fallback_root = "TV_Shows"
            rel_path = f"{fallback_root}/{show}/{values['start']}.mkv"

    if not rel_path:
        try:
            rel_path = template.format(**values)
        except Exception:
            rel_path = None

    # Movie-specific fallback if formatting failed or title missing
    if is_movie and not rel_path:
        try:
            m_fb = CoreSettings.get_dvr_movie_fallback_template()
            rel_path = m_fb.format(**values)
        except Exception:
            rel_path = f"Movies/{values['start']}.mkv"

    # As a last resort for TV
    if not is_movie and not rel_path:
        rel_path = f"TV_Shows/{show}/S{season:02d}E{episode:02d}.mkv"

    # Keep any leading folder like 'Recordings/' from the template so users can
    # structure their library under /app/data as desired.
    if not rel_path.lower().endswith('.mkv'):
        rel_path = f"{rel_path}.mkv"

    # Normalize path (strip ./)
    if rel_path.startswith('./'):
        rel_path = rel_path[2:]

    final_path = rel_path if rel_path.startswith('/') else os.path.join(library_root, rel_path)
    final_path = os.path.normpath(final_path)

    # Ensure directory exists
    target_dir = os.path.dirname(final_path)
    os.makedirs(target_dir, exist_ok=True)

    # Derive temp TS path in same directory
    final_name = os.path.basename(final_path)
    base_no_ext = os.path.splitext(final_name)[0]
    temp_ts_path = os.path.join(target_dir, f"{base_no_ext}.ts")

    return final_path, temp_ts_path, final_name
@shared_task
def run_recording(recording_id, channel_id, start_time_str, end_time_str):
    """
    Execute a scheduled recording for the given channel/recording.

    Flow:
      1) Announce "recording_started" and prime Recording.custom_properties
         with status, playback URL, output paths and a poster (EPG image,
         VOD logo, TMDB/OMDb, TVMaze/iTunes, or fields on the program dict).
      2) Stream the channel from the internal TS proxy into a temporary .ts
         file until the planned duration has elapsed or the stream ends.
      3) Persist the outcome, announce "recording_ended", remux the .ts file
         to .mkv with ffmpeg and store final metadata (bytes written, remux
         result, stream stats from Redis when available).
      4) Queue comskip post-processing when enabled.

    Args:
        recording_id: Primary key of the Recording row to update.
        channel_id: Primary key of the Channel to record.
        start_time_str: ISO-format start time.
        end_time_str: ISO-format end time.
    """
    channel = Channel.objects.get(id=channel_id)

    start_time = datetime.fromisoformat(start_time_str)
    end_time = datetime.fromisoformat(end_time_str)
    duration_seconds = int((end_time - start_time).total_seconds())

    # Output paths are resolved once the Recording's program info is loaded below
    filename = None
    final_path = None
    temp_ts_path = None

    channel_layer = get_channel_layer()

    async_to_sync(channel_layer.group_send)(
        "updates",
        {
            "type": "update",
            "data": {"success": True, "type": "recording_started", "channel": channel.name}
        },
    )

    logger.info(f"Starting recording for channel {channel.name}")

    # Try to resolve the Recording row up front
    recording_obj = None
    try:
        from .models import Recording, Logo
        recording_obj = Recording.objects.get(id=recording_id)

        # Prime custom_properties with file info/status
        cp = recording_obj.custom_properties or {}
        cp.update({
            "status": "recording",
            "started_at": str(datetime.now()),
        })

        # Provide a predictable playback URL for the frontend
        cp["file_url"] = f"/api/channels/recordings/{recording_id}/file/"
        cp["output_file_url"] = cp["file_url"]

        # Determine program info (may include id for deeper details)
        program = cp.get("program") or {}
        final_path, temp_ts_path, filename = _build_output_paths(channel, program, start_time, end_time)
        cp["file_name"] = filename
        cp["file_path"] = final_path
        cp["_temp_file_path"] = temp_ts_path

        # Resolve poster the same way VODs do:
        # 1) Prefer image(s) from EPG Program custom_properties (images/icon)
        # 2) Otherwise reuse an existing VOD logo matching title (Movie/Series)
        # 3) Otherwise save any direct poster URL from provided program fields
        def pick_best_image_from_epg_props(epg_props):
            try:
                images = epg_props.get("images") or []
                if not isinstance(images, list):
                    return None

                # Prefer poster/cover and larger sizes
                size_order = {"xxl": 6, "xl": 5, "l": 4, "m": 3, "s": 2, "xs": 1}

                def score(img):
                    t = (img.get("type") or "").lower()
                    size = (img.get("size") or "").lower()
                    return (
                        2 if t in ("poster", "cover") else 1,
                        size_order.get(size, 0)
                    )

                best = None
                for im in images:
                    if not isinstance(im, dict):
                        continue
                    url = im.get("url")
                    if not url:
                        continue
                    if best is None or score(im) > score(best):
                        best = im
                return best.get("url") if best else None
            except Exception:
                return None

        poster_logo_id = None
        poster_url = None

        # Try EPG Program custom_properties by ID
        try:
            from apps.epg.models import ProgramData
            prog_id = program.get("id")
            if prog_id:
                epg_program = ProgramData.objects.filter(id=prog_id).only("custom_properties").first()
                if epg_program and epg_program.custom_properties:
                    epg_props = epg_program.custom_properties or {}
                    poster_url = pick_best_image_from_epg_props(epg_props)
                    if not poster_url:
                        icon = epg_props.get("icon")
                        if isinstance(icon, str) and icon:
                            poster_url = icon
        except Exception as e:
            logger.debug(f"EPG image lookup failed: {e}")

        # Fallback: reuse VOD Logo by matching title
        if not poster_url and not poster_logo_id:
            try:
                from apps.vod.models import Movie, Series
                title = program.get("title") or channel.name
                vod_logo = None
                movie = Movie.objects.filter(name__iexact=title).select_related("logo").first()
                if movie and movie.logo:
                    vod_logo = movie.logo
                if not vod_logo:
                    series = Series.objects.filter(name__iexact=title).select_related("logo").first()
                    if series and series.logo:
                        vod_logo = series.logo
                if vod_logo:
                    poster_logo_id = vod_logo.id
            except Exception as e:
                logger.debug(f"VOD logo fallback failed: {e}")

        # External metadata lookups (TMDB/OMDb) when EPG/VOD didn't provide an image
        if not poster_url and not poster_logo_id:
            try:
                tmdb_key = os.environ.get('TMDB_API_KEY')
                omdb_key = os.environ.get('OMDB_API_KEY')
                title = (program.get('title') or channel.name or '').strip()
                year = None
                imdb_id = None

                # Try to derive year and imdb from EPG program custom_properties
                try:
                    from apps.epg.models import ProgramData
                    prog_id = program.get('id')
                    epg_program = ProgramData.objects.filter(id=prog_id).only('custom_properties').first() if prog_id else None
                    if epg_program and epg_program.custom_properties:
                        d = epg_program.custom_properties.get('date')
                        if d and len(str(d)) >= 4:
                            year = str(d)[:4]
                        imdb_id = epg_program.custom_properties.get('imdb.com_id') or imdb_id
                except Exception:
                    pass

                # TMDB: by IMDb ID
                if not poster_url and tmdb_key and imdb_id:
                    try:
                        url = f"https://api.themoviedb.org/3/find/{quote(imdb_id)}?api_key={tmdb_key}&external_source=imdb_id"
                        resp = requests.get(url, timeout=5)
                        if resp.ok:
                            data = resp.json() or {}
                            picks = []
                            for k in ('movie_results', 'tv_results', 'tv_episode_results', 'tv_season_results'):
                                lst = data.get(k) or []
                                picks.extend(lst)
                            poster_path = None
                            for item in picks:
                                if item.get('poster_path'):
                                    poster_path = item['poster_path']
                                    break
                            if poster_path:
                                poster_url = f"https://image.tmdb.org/t/p/w780{poster_path}"
                    except Exception:
                        pass

                # TMDB: by title (and year if available)
                if not poster_url and tmdb_key and title:
                    try:
                        q = quote(title)
                        extra = f"&year={year}" if year else ""
                        url = f"https://api.themoviedb.org/3/search/multi?api_key={tmdb_key}&query={q}{extra}"
                        resp = requests.get(url, timeout=5)
                        if resp.ok:
                            data = resp.json() or {}
                            results = data.get('results') or []
                            results.sort(key=lambda x: float(x.get('popularity') or 0), reverse=True)
                            for item in results:
                                if item.get('poster_path'):
                                    poster_url = f"https://image.tmdb.org/t/p/w780{item['poster_path']}"
                                    break
                    except Exception:
                        pass

                # OMDb fallback
                if not poster_url and omdb_key:
                    try:
                        if imdb_id:
                            url = f"https://www.omdbapi.com/?apikey={omdb_key}&i={quote(imdb_id)}"
                        elif title:
                            yy = f"&y={year}" if year else ""
                            url = f"https://www.omdbapi.com/?apikey={omdb_key}&t={quote(title)}{yy}"
                        else:
                            url = None
                        if url:
                            resp = requests.get(url, timeout=5)
                            if resp.ok:
                                data = resp.json() or {}
                                p = data.get('Poster')
                                if p and p != 'N/A':
                                    poster_url = p
                    except Exception:
                        pass
            except Exception as e:
                logger.debug(f"External poster lookup failed: {e}")

        # Keyless fallback providers (no API keys required)
        if not poster_url and not poster_logo_id:
            try:
                title = (program.get('title') or channel.name or '').strip()
                if title:
                    # 1) TVMaze (TV shows) - singlesearch by title
                    try:
                        url = f"https://api.tvmaze.com/singlesearch/shows?q={quote(title)}"
                        resp = requests.get(url, timeout=5)
                        if resp.ok:
                            data = resp.json() or {}
                            img = (data.get('image') or {})
                            p = img.get('original') or img.get('medium')
                            if p:
                                poster_url = p
                    except Exception:
                        pass

                    # 2) iTunes Search API (movies or tv shows)
                    if not poster_url:
                        try:
                            for media in ('movie', 'tvShow'):
                                url = f"https://itunes.apple.com/search?term={quote(title)}&media={media}&limit=1"
                                resp = requests.get(url, timeout=5)
                                if resp.ok:
                                    data = resp.json() or {}
                                    results = data.get('results') or []
                                    if results:
                                        art = results[0].get('artworkUrl100')
                                        if art:
                                            # Scale up to 600x600 by convention
                                            poster_url = art.replace('100x100', '600x600')
                                            break
                        except Exception:
                            pass
            except Exception as e:
                logger.debug(f"Keyless poster lookup failed: {e}")

        # Last: check direct fields on provided program object
        if not poster_url and not poster_logo_id:
            for key in ("poster", "cover", "cover_big", "image", "icon"):
                val = program.get(key)
                if isinstance(val, dict):
                    candidate = val.get("url")
                    if candidate:
                        poster_url = candidate
                        break
                elif isinstance(val, str) and val:
                    poster_url = val
                    break

        # Create or assign Logo
        if not poster_logo_id and poster_url and len(poster_url) <= 1000:
            try:
                logo, _ = Logo.objects.get_or_create(url=poster_url, defaults={"name": program.get("title") or channel.name})
                poster_logo_id = logo.id
            except Exception as e:
                logger.debug(f"Unable to persist poster to Logo: {e}")

        if poster_logo_id:
            cp["poster_logo_id"] = poster_logo_id
        if poster_url and "poster_url" not in cp:
            cp["poster_url"] = poster_url

        # Ensure destination exists so it's visible immediately
        try:
            os.makedirs(os.path.dirname(final_path), exist_ok=True)
            if not os.path.exists(final_path):
                open(final_path, 'ab').close()
        except Exception:
            pass

        recording_obj.custom_properties = cp
        recording_obj.save(update_fields=["custom_properties"])
    except Exception as e:
        logger.debug(f"Unable to prime Recording metadata: {e}")

    # Determine internal base URL(s) for TS streaming
    # Prefer explicit override, then try common ports for debug and docker
    explicit = os.environ.get('DISPATCHARR_INTERNAL_TS_BASE_URL')
    is_dev = (os.environ.get('DISPATCHARR_ENV', '').lower() == 'dev') or \
        (os.environ.get('DISPATCHARR_DEBUG', '').lower() == 'true') or \
        (os.environ.get('REDIS_HOST', 'redis') in ('localhost', '127.0.0.1'))

    candidates = []
    if explicit:
        candidates.append(explicit)
    if is_dev:
        # Debug container typically exposes API on 5656
        candidates.extend(['http://127.0.0.1:5656', 'http://127.0.0.1:9191'])
    # Docker service name fallback
    candidates.append(os.environ.get('DISPATCHARR_INTERNAL_API_BASE', 'http://web:9191'))
    # Last-resort localhost ports
    candidates.extend(['http://localhost:5656', 'http://localhost:9191'])

    last_error = None
    bytes_written = 0
    interrupted = False
    interrupted_reason = None

    # We'll attempt each base until we receive some data
    for base in candidates:
        try:
            test_url = f"{base.rstrip('/')}/proxy/ts/stream/{channel.uuid}"
            logger.info(f"DVR: trying TS base {base} -> {test_url}")

            with requests.get(
                test_url,
                headers={
                    'User-Agent': 'Dispatcharr-DVR',
                },
                stream=True,
                timeout=(10, 15),
            ) as response:
                response.raise_for_status()

                # If no data shows up within a short window, give up on this base
                got_any_data = False
                test_window = 3.0  # seconds to detect first bytes
                window_start = time.time()

                with open(temp_ts_path, 'wb') as file:
                    started_at = time.time()
                    for chunk in response.iter_content(chunk_size=8192):
                        if not chunk:
                            # keep-alives may be empty; only they can expire the test window
                            if not got_any_data and (time.time() - window_start) > test_window:
                                break
                            continue

                        # We have data: keep copying on this same connection
                        # until the planned duration has elapsed.
                        got_any_data = True
                        file.write(chunk)
                        bytes_written += len(chunk)
                        if time.time() - started_at > duration_seconds:
                            break

            # If we wrote any bytes, treat as success and stop trying candidates
            if bytes_written > 0:
                logger.info(f"DVR: selected TS base {base}; wrote initial {bytes_written} bytes")
                break

            last_error = f"no_data_from_{base}"
            logger.warning(f"DVR: no data received from {base} within {test_window}s, trying next base")
            # Clean up empty temp file
            try:
                if os.path.exists(temp_ts_path) and os.path.getsize(temp_ts_path) == 0:
                    os.remove(temp_ts_path)
            except Exception:
                pass
        except Exception as e:
            last_error = str(e)
            logger.warning(f"DVR: attempt failed for base {base}: {e}")
            if bytes_written > 0:
                # Part of the programme is already on disk. Trying another base
                # would reopen the temp file in 'wb' mode and throw that data
                # away, so keep what we have and report the early end instead.
                interrupted = True
                interrupted_reason = f"stream_error: {last_error}"
                break

    # If no bytes were written at all, every base failed
    if bytes_written == 0:
        interrupted = True
        interrupted_reason = f"no_stream_data: {last_error or 'all_bases_failed'}"

    # Update DB status immediately so the UI reflects the change on the event below
    try:
        if recording_obj is None:
            from .models import Recording
            recording_obj = Recording.objects.get(id=recording_id)
        cp_now = recording_obj.custom_properties or {}
        cp_now.update({
            "status": "interrupted" if interrupted else "completed",
            "ended_at": str(datetime.now()),
            "file_name": filename or cp_now.get("file_name"),
            "file_path": final_path or cp_now.get("file_path"),
        })
        if interrupted and interrupted_reason:
            cp_now["interrupted_reason"] = interrupted_reason
        recording_obj.custom_properties = cp_now
        recording_obj.save(update_fields=["custom_properties"])
    except Exception as e:
        logger.debug(f"Failed to update immediate recording status: {e}")

    async_to_sync(channel_layer.group_send)(
        "updates",
        {
            "type": "update",
            "data": {"success": True, "type": "recording_ended", "channel": channel.name}
        },
    )

    # The temp file and the HTTP response were closed by their context managers.
    logger.info(f"Finished recording for channel {channel.name}")

    # Remux TS to MKV container
    remux_success = False
    try:
        if temp_ts_path and os.path.exists(temp_ts_path):
            subprocess.run([
                "ffmpeg", "-y", "-i", temp_ts_path, "-c", "copy", final_path
            ], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            remux_success = os.path.exists(final_path)
            # Clean up temp file on success
            if remux_success:
                try:
                    os.remove(temp_ts_path)
                except Exception:
                    pass
    except Exception as e:
        logger.warning(f"MKV remux failed: {e}")

    # Persist final metadata to Recording (status, ended_at, and stream stats if available)
    try:
        if recording_obj is None:
            from .models import Recording
            recording_obj = Recording.objects.get(id=recording_id)

        cp = recording_obj.custom_properties or {}
        cp.update({
            "ended_at": str(datetime.now()),
        })
        if interrupted:
            cp["status"] = "interrupted"
            if interrupted_reason:
                cp["interrupted_reason"] = interrupted_reason
        else:
            cp["status"] = "completed"
        cp["bytes_written"] = bytes_written
        cp["remux_success"] = remux_success

        # Try to get stream stats from TS proxy Redis metadata
        try:
            from core.utils import RedisClient
            from apps.proxy.ts_proxy.redis_keys import RedisKeys
            from apps.proxy.ts_proxy.constants import ChannelMetadataField

            r = RedisClient.get_client()
            if r is not None:
                metadata_key = RedisKeys.channel_metadata(str(channel.uuid))
                md = r.hgetall(metadata_key)
                if md:
                    def _gv(bkey):
                        # Looks the field up by its UTF-8 encoded name
                        return md.get(bkey.encode('utf-8'))

                    def _d(bkey, cast=str):
                        # Decode the raw value and cast it; None when missing or malformed
                        v = _gv(bkey)
                        try:
                            if v is None:
                                return None
                            s = v.decode('utf-8')
                            return cast(s) if cast is not str else s
                        except Exception:
                            return None

                    stream_info = {}
                    for key, caster in [
                        # Video fields
                        (ChannelMetadataField.VIDEO_CODEC, str),
                        (ChannelMetadataField.RESOLUTION, str),
                        (ChannelMetadataField.WIDTH, float),
                        (ChannelMetadataField.HEIGHT, float),
                        (ChannelMetadataField.SOURCE_FPS, float),
                        (ChannelMetadataField.PIXEL_FORMAT, str),
                        (ChannelMetadataField.VIDEO_BITRATE, float),
                        # Audio fields
                        (ChannelMetadataField.AUDIO_CODEC, str),
                        (ChannelMetadataField.SAMPLE_RATE, float),
                        (ChannelMetadataField.AUDIO_CHANNELS, str),
                        (ChannelMetadataField.AUDIO_BITRATE, float),
                    ]:
                        val = _d(key, caster)
                        if val is not None:
                            stream_info[key] = val

                    if stream_info:
                        cp["stream_info"] = stream_info
        except Exception as e:
            logger.debug(f"Unable to capture stream stats for recording: {e}")

        # Removed: local thumbnail generation. We rely on EPG/VOD/TMDB/OMDb/keyless providers only.
        recording_obj.custom_properties = cp
        recording_obj.save(update_fields=["custom_properties"])
    except Exception as e:
        logger.debug(f"Unable to finalize Recording metadata: {e}")

    # Optionally run comskip post-process
    try:
        from core.models import CoreSettings
        if CoreSettings.get_dvr_comskip_enabled():
            comskip_process_recording.delay(recording_id)
    except Exception:
        pass
@shared_task
def recover_recordings_on_startup():
    """
    Re-establish DVR recordings after a service restart.

    - Recordings whose window contains the current time are flagged as
      interrupted (reason ``server_restarted``) and a fresh ``run_recording``
      task is queued immediately for the remaining window.
    - Recordings that start in the future get their task (re)scheduled and
      the returned task id stored on the row.

    When a Redis client is available, a 60 second ``dvr:recover_lock`` key
    ensures that only the first worker to reach this point does the work.

    Returns a short status string describing the outcome.
    """
    try:
        from django.utils import timezone
        from .models import Recording
        from core.utils import RedisClient
        from .signals import schedule_recording_task

        client = RedisClient.get_client()
        # Set lock with 60s TTL; only the first winner proceeds
        if client and not client.set("dvr:recover_lock", "1", ex=60, nx=True):
            return "Recovery already in progress"

        now = timezone.now()

        def _resume(recording):
            # Mark interrupted due to restart; will flip to 'recording' when task starts
            props = recording.custom_properties or {}
            props["status"] = "interrupted"
            props["interrupted_reason"] = "server_restarted"
            recording.custom_properties = props
            recording.save(update_fields=["custom_properties"])
            # Start recording for the remaining window
            run_recording.apply_async(
                args=[recording.id, recording.channel_id, str(now), str(recording.end_time)],
                eta=now,
            )

        def _schedule(recording):
            # Schedule the task at start_time and remember its id
            new_task_id = schedule_recording_task(recording)
            if new_task_id:
                recording.task_id = new_task_id
                recording.save(update_fields=["task_id"])

        # Resume in-window recordings
        for rec in Recording.objects.filter(start_time__lte=now, end_time__gt=now):
            try:
                _resume(rec)
            except Exception as exc:
                logger.warning(f"Failed to resume recording {rec.id}: {exc}")

        # Ensure future recordings are scheduled
        for rec in Recording.objects.filter(start_time__gt=now, end_time__gt=now):
            try:
                _schedule(rec)
            except Exception as exc:
                logger.warning(f"Failed to schedule recording {rec.id}: {exc}")

        return "Recovery complete"
    except Exception as exc:
        logger.error(f"Error during DVR recovery: {exc}")
        return f"Error: {exc}"
@shared_task
def comskip_process_recording(recording_id: int):
    """Run comskip on a finished recording and cut the detected commercials out in place.

    Safe to call even if comskip is not installed. The outcome is stored under
    ``custom_properties["comskip"]`` and broadcast as a ``comskip_status``
    websocket update.

    All intermediate files (segments, concat list, stitched output) live in a
    private temporary directory created next to the recording, and that
    directory is removed whether the cut succeeds or fails.

    Args:
        recording_id: Primary key of the ``Recording`` to process.

    Returns:
        A short status string: ``"not_found"``, ``"no_file"``,
        ``"already_processed"``, ``"comskip_missing"``, ``"comskip_failed"``,
        ``"no_edl"``, ``"no_duration"``, ``"no_commercials"``, ``"ok"`` or
        ``"error:<message>"``.
    """
    import shutil
    import tempfile
    from .models import Recording

    def _ws(status: str, extra: dict | None = None):
        """Broadcast a comskip status update over the websocket (best effort)."""
        try:
            from core.utils import send_websocket_update
            payload = {"success": True, "type": "comskip_status", "status": status, "recording_id": recording_id}
            if extra:
                payload.update(extra)
            send_websocket_update('updates', 'update', payload)
        except Exception:
            # A failed broadcast must never abort the processing itself.
            pass

    try:
        rec = Recording.objects.get(id=recording_id)
    except Recording.DoesNotExist:
        return "not_found"

    cp = rec.custom_properties or {}
    file_path = cp.get("file_path")
    if not file_path or not os.path.exists(file_path):
        return "no_file"
    if isinstance(cp.get("comskip"), dict) and cp["comskip"].get("status") == "completed":
        return "already_processed"

    def _save_state(state: dict):
        """Persist the comskip outcome on the recording row."""
        cp["comskip"] = state
        rec.custom_properties = cp
        rec.save(update_fields=["custom_properties"])

    def _ffprobe_duration(path):
        """Return the container duration in seconds, or None if ffprobe fails."""
        try:
            probe = subprocess.run([
                "ffprobe", "-v", "error", "-show_entries", "format=duration",
                "-of", "default=noprint_wrappers=1:nokey=1", path
            ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True)
            return float(probe.stdout.strip())
        except Exception:
            return None

    def _read_commercials(path, total):
        """Parse EDL rows into sorted (start, end) pairs clamped to [0, total]."""
        found = []
        try:
            with open(path, "r") as edl:
                for line in edl:
                    fields = line.split()
                    if len(fields) < 2:
                        continue
                    try:
                        start, end = float(fields[0]), float(fields[1])
                    except ValueError:
                        continue
                    start, end = max(0.0, start), min(total, end)
                    # Drop rows that are empty or inverted once clamped; they
                    # describe nothing to cut and would skew the totals.
                    if end > start:
                        found.append((start, end))
        except (OSError, UnicodeDecodeError) as exc:
            logger.debug(f"Unable to read EDL {path}: {exc}")
        found.sort()
        return found

    def _kept_ranges(cuts, total):
        """Return the (start, end) ranges that remain after removing *cuts*."""
        ranges = []
        cursor = 0.0
        for start, end in cuts:
            if start > cursor:
                ranges.append((cursor, start))
            cursor = max(cursor, end)
        if cursor < total:
            ranges.append((cursor, total))
        return ranges

    comskip_bin = shutil.which("comskip")
    if not comskip_bin:
        _save_state({"status": "skipped", "reason": "comskip_not_installed"})
        _ws('skipped', {"reason": "comskip_not_installed"})
        return "comskip_missing"

    # "." keeps a bare filename usable as a directory argument below.
    workdir = os.path.dirname(file_path) or "."
    base, _ = os.path.splitext(file_path)
    edl_path = f"{base}.edl"

    # Notify start
    _ws('started', {"title": (cp.get('program') or {}).get('title') or os.path.basename(file_path)})

    try:
        cmd = [comskip_bin, "--output", workdir]
        # Prefer system ini if present to squelch warning and get sane defaults
        for ini_path in ("/etc/comskip/comskip.ini", "/app/docker/comskip.ini"):
            if os.path.exists(ini_path):
                cmd.append(f"--ini={ini_path}")
                break
        cmd.append(file_path)
        subprocess.run(cmd, check=True)
    except Exception as e:
        _save_state({"status": "error", "reason": f"comskip_failed: {e}"})
        _ws('error', {"reason": str(e)})
        return "comskip_failed"

    if not os.path.exists(edl_path):
        _save_state({"status": "error", "reason": "edl_not_found"})
        _ws('error', {"reason": "edl_not_found"})
        return "no_edl"

    duration = _ffprobe_duration(file_path)
    if duration is None:
        _save_state({"status": "error", "reason": "duration_unknown"})
        _ws('error', {"reason": "duration_unknown"})
        return "no_duration"

    commercials = _read_commercials(edl_path, duration)
    keep = _kept_ranges(commercials, duration)

    # Nothing (or next to nothing) to cut: leave the file untouched.
    if not commercials or sum((e - s) for s, e in commercials) <= 0.5:
        _save_state({"status": "completed", "skipped": True, "edl": os.path.basename(edl_path)})
        _ws('skipped', {"reason": "no_commercials", "commercials": 0})
        return "no_commercials"

    tmpdir = None
    try:
        # A private directory inside the recording directory: unique per run
        # (no clashes between recordings processed side by side) and on the
        # same filesystem as the target, so the final os.replace is a rename.
        tmpdir = tempfile.mkdtemp(prefix=f".comskip_{recording_id}_", dir=workdir)

        parts = []
        for idx, (s, e) in enumerate(keep):
            dur = e - s
            if dur <= 0.01:
                continue
            seg = os.path.join(tmpdir, f"segment_{idx:03d}.mkv")
            subprocess.run([
                "ffmpeg", "-y", "-ss", f"{s:.3f}", "-i", file_path, "-t", f"{dur:.3f}",
                "-c", "copy", "-avoid_negative_ts", "1", seg
            ], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            parts.append(seg)
        if not parts:
            raise RuntimeError("no_parts")

        list_path = os.path.join(tmpdir, "concat_list.txt")
        with open(list_path, "w") as lf:
            # Bare segment names are resolved relative to the list file, so
            # quotes or other special characters in the recording's directory
            # name can never corrupt the concat script.
            for seg in parts:
                lf.write(f"file '{os.path.basename(seg)}'\n")

        output_path = os.path.join(tmpdir, f"{os.path.basename(base)}.cut.mkv")
        subprocess.run([
            "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_path, "-c", "copy", output_path
        ], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        try:
            os.replace(output_path, file_path)
        except OSError:
            # Rename refused (e.g. unusual mount layout): fall back to a copy.
            shutil.copy(output_path, file_path)

        _save_state({
            "status": "completed",
            "edl": os.path.basename(edl_path),
            "segments_kept": len(parts),
            "commercials": len(commercials),
        })
        _ws('completed', {"commercials": len(commercials), "segments_kept": len(parts)})
        return "ok"
    except Exception as e:
        _save_state({"status": "error", "reason": str(e)})
        _ws('error', {"reason": str(e)})
        return f"error:{e}"
    finally:
        # Remove segments, concat list and any leftover output on every path.
        if tmpdir:
            shutil.rmtree(tmpdir, ignore_errors=True)
def _resolve_poster_for_program(channel_name, program):
"""Internal helper that attempts to resolve a poster URL and/or Logo id.
Returns (poster_logo_id, poster_url) where either may be None.
"""
poster_logo_id = None
poster_url = None
# Try EPG Program images first
try:
from apps.epg.models import ProgramData
prog_id = program.get("id") if isinstance(program, dict) else None
if prog_id:
epg_program = ProgramData.objects.filter(id=prog_id).only("custom_properties").first()
if epg_program and epg_program.custom_properties:
epg_props = epg_program.custom_properties or {}
def pick_best_image_from_epg_props(epg_props):
images = epg_props.get("images") or []
if not isinstance(images, list):
return None
size_order = {"xxl": 6, "xl": 5, "l": 4, "m": 3, "s": 2, "xs": 1}
def score(img):
t = (img.get("type") or "").lower()
size = (img.get("size") or "").lower()
return (2 if t in ("poster", "cover") else 1, size_order.get(size, 0))
best = None
for im in images:
if not isinstance(im, dict):
continue
url = im.get("url")
if not url:
continue
if best is None or score(im) > score(best):
best = im
return best.get("url") if best else None
poster_url = pick_best_image_from_epg_props(epg_props)
if not poster_url:
icon = epg_props.get("icon")
if isinstance(icon, str) and icon:
poster_url = icon
except Exception:
pass
# VOD logo fallback by title
if not poster_url and not poster_logo_id:
try:
from apps.vod.models import Movie, Series
title = (program.get("title") if isinstance(program, dict) else None) or channel_name
vod_logo = None
movie = Movie.objects.filter(name__iexact=title).select_related("logo").first()
if movie and movie.logo:
vod_logo = movie.logo
if not vod_logo:
series = Series.objects.filter(name__iexact=title).select_related("logo").first()
if series and series.logo:
vod_logo = series.logo
if vod_logo:
poster_logo_id = vod_logo.id
except Exception:
pass
# Keyless providers (TVMaze & iTunes)
if not poster_url and not poster_logo_id:
try:
title = (program.get('title') if isinstance(program, dict) else None) or channel_name
if title:
# TVMaze
try:
url = f"https://api.tvmaze.com/singlesearch/shows?q={quote(title)}"
resp = requests.get(url, timeout=5)
if resp.ok:
data = resp.json() or {}
img = (data.get('image') or {})
p = img.get('original') or img.get('medium')
if p:
poster_url = p
except Exception:
pass
# iTunes
if not poster_url:
try:
for media in ('movie', 'tvShow'):
url = f"https://itunes.apple.com/search?term={quote(title)}&media={media}&limit=1"
resp = requests.get(url, timeout=5)
if resp.ok:
data = resp.json() or {}
results = data.get('results') or []
if results:
art = results[0].get('artworkUrl100')
if art:
poster_url = art.replace('100x100', '600x600')
break
except Exception:
pass
except Exception:
pass
# Fallback: search existing Logo entries by name if we still have nothing
if not poster_logo_id and not poster_url:
try:
from .models import Logo
title = (program.get("title") if isinstance(program, dict) else None) or channel_name
existing = Logo.objects.filter(name__iexact=title).first()
if existing:
poster_logo_id = existing.id
poster_url = existing.url
except Exception:
pass
# Save to Logo if URL available
if not poster_logo_id and poster_url and len(poster_url) <= 1000:
try:
from .models import Logo
logo, _ = Logo.objects.get_or_create(url=poster_url, defaults={"name": (program.get("title") if isinstance(program, dict) else None) or channel_name})
poster_logo_id = logo.id
except Exception:
pass
return poster_logo_id, poster_url
@shared_task
def prefetch_recording_artwork(recording_id):
    """Resolve poster art (and EPG extras) ahead of time for a scheduled recording.

    Stores ``poster_logo_id`` / ``poster_url`` in the recording's
    ``custom_properties`` and, when the linked EPG program carries them, also
    copies ``rating``, ``rating_system``, ``season``, ``episode`` and
    ``onscreen_episode``. The row is saved, and a ``recording_updated``
    websocket message sent, only if something actually changed.

    Returns "ok" on success or "error: <message>" on failure.
    """
    try:
        from .models import Recording
        recording = Recording.objects.get(id=recording_id)
        props = recording.custom_properties or {}
        program = props.get("program") or {}
        logo_id, art_url = _resolve_poster_for_program(recording.channel.name, program)

        changed = False
        for field, value in (("poster_logo_id", logo_id), ("poster_url", art_url)):
            if value and props.get(field) != value:
                props[field] = value
                changed = True

        # Enrich with rating if available from ProgramData.custom_properties
        try:
            from apps.epg.models import ProgramData
            program_pk = program.get("id") if isinstance(program, dict) else None
            if program_pk:
                epg_row = ProgramData.objects.filter(id=program_pk).only("custom_properties").first()
                if epg_row and isinstance(epg_row.custom_properties, dict):
                    epg_props = epg_row.custom_properties
                    # (field, accept_falsy): season/episode are copied whenever
                    # they are not None; the others only when truthy.
                    copy_rules = (
                        ("rating", False),
                        ("rating_system", False),
                        ("season", True),
                        ("episode", True),
                        ("onscreen_episode", False),
                    )
                    for field, accept_falsy in copy_rules:
                        value = epg_props.get(field)
                        wanted = (value is not None) if accept_falsy else bool(value)
                        if wanted and props.get(field) != value:
                            props[field] = value
                            changed = True
        except Exception:
            pass

        if changed:
            recording.custom_properties = props
            recording.save(update_fields=["custom_properties"])
            try:
                from core.utils import send_websocket_update
                send_websocket_update('updates', 'update', {"success": True, "type": "recording_updated", "recording_id": recording.id})
            except Exception:
                pass
        return "ok"
    except Exception as exc:
        logger.debug(f"prefetch_recording_artwork failed: {exc}")
        return f"error: {exc}"