Hooked up script runner for indexing

This commit is contained in:
Kieran Eglin 2024-05-28 10:43:54 -07:00
parent 5d624f545b
commit c5bf51dfb3
No known key found for this signature in database
GPG key ID: 193984967FCF432D
9 changed files with 182 additions and 15 deletions

View file

@ -16,10 +16,14 @@ defmodule Pinchflat.Downloading.DownloadingHelpers do
alias Pinchflat.Media.MediaItem
alias Pinchflat.Downloading.MediaDownloadWorker
alias Pinchflat.Lifecycle.UserScripts.CommandRunner, as: UserScriptRunner
@doc """
Starts tasks for downloading media for any of a sources _pending_ media items.
Jobs are not enqueued if the source is set to not download media. This will return :ok.
You can optionally set the `kickoff_delay` option to delay when the jobs are enqueued.
NOTE: this starts a download for each media item that is pending,
not just the ones that were indexed in this job run. This should ensure
that any stragglers are caught if, for some reason, they weren't enqueued
@ -30,8 +34,6 @@ defmodule Pinchflat.Downloading.DownloadingHelpers do
def enqueue_pending_download_tasks(source, opts \\ [])
def enqueue_pending_download_tasks(%Source{download_media: true} = source, opts) do
# TODO: test
# TODO: doc
kickoff_delay = Keyword.get(opts, :kickoff_delay, 0)
source
@ -59,11 +61,11 @@ defmodule Pinchflat.Downloading.DownloadingHelpers do
downloaded, based on the source's download settings and whether media is
considered pending.
You can optionally set the `kickoff_delay` option to delay when the jobs are enqueued.
Returns {:ok, %Task{}} | {:error, :should_not_download} | {:error, any()}
"""
def kickoff_download_if_pending(%MediaItem{} = media_item, opts \\ []) do
# TODO: test
# TODO: doc
kickoff_delay = Keyword.get(opts, :kickoff_delay, 0)
media_item = Repo.preload(media_item, :source)
@ -107,4 +109,32 @@ defmodule Pinchflat.Downloading.DownloadingHelpers do
|> Repo.all()
|> Enum.map(&MediaDownloadWorker.kickoff_with_task/1)
end
@doc """
Creates a media item from the attributes returned by the video backend
(read: yt-dlp) and runs the user script with a `media_indexed` event type.
Only runs the user script if the media item was created successfully and the media item
doesn't already exist in the database.
Returns {:ok, %MediaItem{}} | {:error, any()}
"""
def create_media_item_and_run_script(%Source{} = source, media_attrs_struct) do
media_already_exists =
MediaQuery.new()
|> where(^dynamic(^MediaQuery.for_source(source) and ^MediaQuery.media_id(media_attrs_struct.media_id)))
|> Repo.exists?()
case Media.create_media_item_from_backend_attrs(source, media_attrs_struct) do
{:ok, media_item} ->
if !media_already_exists do
UserScriptRunner.run(:media_indexed, media_item)
end
{:ok, media_item}
err ->
err
end
end
end

View file

@ -10,7 +10,6 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpers do
use Pinchflat.Media.MediaQuery
alias Pinchflat.Repo
alias Pinchflat.Media
alias Pinchflat.Sources.Source
alias Pinchflat.FastIndexing.YoutubeRss
alias Pinchflat.Downloading.DownloadingHelpers
@ -43,7 +42,6 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpers do
end)
# Wait 5s before enqueuing downloads to give the post-indexing user script a chance to run
# TODO: test
DownloadingHelpers.enqueue_pending_download_tasks(source, kickoff_delay: 5)
Enum.filter(maybe_new_media_items, & &1)
@ -59,8 +57,8 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpers do
url = "https://www.youtube.com/watch?v=#{media_id}"
case YtDlpMedia.get_media_attributes(url) do
{:ok, media_attrs} ->
Media.create_media_item_from_backend_attrs(source, media_attrs)
{:ok, media_attrs_struct} ->
DownloadingHelpers.create_media_item_and_run_script(source, media_attrs_struct)
err ->
err

View file

@ -12,6 +12,7 @@ defmodule Pinchflat.Lifecycle.UserScripts.CommandRunner do
@behaviour UserScriptCommandRunner
@event_types [
:media_indexed,
:media_downloaded,
:media_deleted
]

View file

@ -35,6 +35,8 @@ defmodule Pinchflat.Media.MediaQuery do
def culling_prevented, do: dynamic([mi], mi.prevent_culling == true)
def culled, do: dynamic([mi], not is_nil(mi.culled_at))
def redownloaded, do: dynamic([mi], not is_nil(mi.media_redownloaded_at))
def media_id(nil), do: dynamic(false)
def media_id(media_id), do: dynamic([mi], mi.media_id == ^media_id)
def upload_date_after_source_cutoff do
dynamic([mi, source], is_nil(source.download_cutoff_date) or mi.upload_date >= source.download_cutoff_date)

View file

@ -8,7 +8,6 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do
require Logger
alias Pinchflat.Repo
alias Pinchflat.Media
alias Pinchflat.Tasks
alias Pinchflat.Sources
alias Pinchflat.Sources.Source
@ -39,6 +38,9 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do
item belonging to the source. You can't tell me the method name isn't descriptive!
Returns a list of media items or changesets (if the media item couldn't be created).
For each new media item, the method will also run a user script with the `media_indexed`
event, if the script is present.
Indexing is slow and usually returns a list of all media data at once for record creation.
To help with this, we use a file follower to watch the file that yt-dlp writes to
so we can create media items as they come in. This parallelizes the process and adds
@ -64,8 +66,8 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do
source = Repo.reload!(source)
result =
Enum.map(media_attributes, fn media_attrs ->
case Media.create_media_item_from_backend_attrs(source, media_attrs) do
Enum.map(media_attributes, fn media_attrs_struct ->
case DownloadingHelpers.create_media_item_and_run_script(source, media_attrs_struct) do
{:ok, media_item} -> media_item
{:error, changeset} -> changeset
end
@ -73,7 +75,6 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do
Sources.update_source(source, %{last_indexed_at: DateTime.utc_now()})
# Wait 5s before enqueuing downloads to give the post-indexing user script a chance to run
# TODO: test
DownloadingHelpers.enqueue_pending_download_tasks(source, kickoff_delay: 5)
result
@ -120,15 +121,14 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do
end)
end
defp create_media_item_and_enqueue_download(source, media_attrs) do
defp create_media_item_and_enqueue_download(source, media_attrs_struct) do
# Reload because the source may have been updated during the (long-running) indexing process
# and important settings like `download_media` may have changed.
source = Repo.reload!(source)
case Media.create_media_item_from_backend_attrs(source, media_attrs) do
case DownloadingHelpers.create_media_item_and_run_script(source, media_attrs_struct) do
{:ok, %MediaItem{} = media_item} ->
# Wait 5s before enqueuing downloads to give the post-indexing user script a chance to run
# TODO: test
DownloadingHelpers.kickoff_download_if_pending(media_item, kickoff_delay: 5)
{:error, changeset} ->

View file

@ -6,9 +6,13 @@ defmodule Pinchflat.Downloading.DownloadingHelpersTest do
import Pinchflat.ProfilesFixtures
alias Pinchflat.Tasks
alias Pinchflat.Media.MediaItem
alias Pinchflat.Utils.FilesystemUtils
alias Pinchflat.Downloading.DownloadingHelpers
alias Pinchflat.Downloading.MediaDownloadWorker
alias Pinchflat.YtDlp.Media, as: YtDlpMedia
describe "enqueue_pending_download_tasks/1" do
test "it enqueues a job for each pending media item" do
source = source_fixture()
@ -19,6 +23,16 @@ defmodule Pinchflat.Downloading.DownloadingHelpersTest do
assert_enqueued(worker: MediaDownloadWorker, args: %{"id" => media_item.id})
end
test "it can optionally delay when those jobs are enqueued" do
source = source_fixture()
_media_item = media_item_fixture(source_id: source.id, media_filepath: nil)
assert :ok = DownloadingHelpers.enqueue_pending_download_tasks(source, kickoff_delay: 60)
[job] = all_enqueued(worker: MediaDownloadWorker)
assert_in_delta DateTime.diff(job.scheduled_at, now()), 60, 1
end
test "it does not enqueue a job for media items with a filepath" do
source = source_fixture()
_media_item = media_item_fixture(source_id: source.id, media_filepath: "some/filepath.mp4")
@ -84,6 +98,13 @@ defmodule Pinchflat.Downloading.DownloadingHelpersTest do
assert_enqueued(worker: MediaDownloadWorker, args: %{"id" => media_item.id})
end
test "it can optionally delay when those jobs are enqueued", %{media_item: media_item} do
assert {:ok, _} = DownloadingHelpers.kickoff_download_if_pending(media_item, kickoff_delay: 60)
[job] = all_enqueued(worker: MediaDownloadWorker)
assert_in_delta DateTime.diff(job.scheduled_at, now()), 60, 1
end
test "creates and returns a download task record", %{media_item: media_item} do
assert {:ok, task} = DownloadingHelpers.kickoff_download_if_pending(media_item)
@ -138,4 +159,67 @@ defmodule Pinchflat.Downloading.DownloadingHelpersTest do
refute_enqueued(worker: MediaDownloadWorker)
end
end
describe "create_media_item_and_run_script/2" do
setup do
FilesystemUtils.write_p!(filepath(), "")
File.chmod(filepath(), 0o755)
on_exit(fn -> File.rm(filepath()) end)
source = source_fixture()
media_attrs =
media_attributes_return_fixture()
|> Phoenix.json_library().decode!()
|> YtDlpMedia.response_to_struct()
{:ok, source: source, media_attrs: media_attrs}
end
test "creates a media item for a given source and attributes", %{source: source, media_attrs: media_attrs} do
assert {:ok, %MediaItem{} = media_item} = DownloadingHelpers.create_media_item_and_run_script(source, media_attrs)
assert media_item.source_id == source.id
assert media_item.title == media_attrs.title
assert media_item.media_id == media_attrs.media_id
assert media_item.original_url == media_attrs.original_url
assert media_item.description == media_attrs.description
end
test "returns an error if the media item cannot be created", %{source: source, media_attrs: media_attrs} do
media_attrs = %YtDlpMedia{media_attrs | media_id: nil}
assert {:error, %Ecto.Changeset{}} = DownloadingHelpers.create_media_item_and_run_script(source, media_attrs)
end
test "runs a script if the media item is created", %{source: source, media_attrs: media_attrs} do
# We *love* indirectly testing side effects
tmp_dir = Application.get_env(:pinchflat, :tmpfile_directory)
filename = "#{tmp_dir}/test_file-#{Enum.random(1..1000)}"
File.write(filepath(), "#!/bin/bash\ntouch #{filename}\n")
refute File.exists?(filename)
assert {:ok, %MediaItem{}} = DownloadingHelpers.create_media_item_and_run_script(source, media_attrs)
assert File.exists?(filename)
end
test "does not run a script if the media item already exists", %{source: source, media_attrs: media_attrs} do
{:ok, %MediaItem{}} = DownloadingHelpers.create_media_item_and_run_script(source, media_attrs)
tmp_dir = Application.get_env(:pinchflat, :tmpfile_directory)
filename = "#{tmp_dir}/test_file-#{Enum.random(1..1000)}"
File.write(filepath(), "#!/bin/bash\ntouch #{filename}\n")
refute File.exists?(filename)
assert {:ok, %MediaItem{}} = DownloadingHelpers.create_media_item_and_run_script(source, media_attrs)
refute File.exists?(filename)
end
defp filepath do
base_dir = Application.get_env(:pinchflat, :extras_directory)
Path.join([base_dir, "user-scripts", "lifecycle"])
end
end
end

View file

@ -28,6 +28,16 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do
assert worker.args["id"] == media_item.id
end
test "enqueues the worker with a small delay", %{source: source} do
expect(HTTPClientMock, :get, fn _url -> {:ok, "<yt:videoId>test_1</yt:videoId>"} end)
FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
[job | _] = all_enqueued(worker: MediaDownloadWorker)
assert_in_delta DateTime.diff(job.scheduled_at, now()), 5, 1
end
test "does not enqueue a new worker for the source's media IDs we already know about", %{source: source} do
expect(HTTPClientMock, :get, fn _url -> {:ok, "<yt:videoId>test_1</yt:videoId>"} end)
media_item_fixture(source_id: source.id, media_id: "test_1")

View file

@ -835,6 +835,18 @@ defmodule Pinchflat.MediaTest do
assert media_item_1.id == media_item_2.id
assert media_item_2.title == different_attrs.title
end
test "returns an error if the media item cannot be created" do
source = source_fixture()
media_attrs =
media_attributes_return_fixture()
|> Phoenix.json_library().decode!()
|> Map.put("id", nil)
|> YtDlpMedia.response_to_struct()
assert {:error, %Ecto.Changeset{}} = Media.create_media_item_from_backend_attrs(source, media_attrs)
end
end
describe "update_media_item/2" do

View file

@ -158,6 +158,16 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpersTest do
assert_enqueued(worker: MediaDownloadWorker, args: %{"id" => media_item.id})
end
test "it enqueues the job with a small delay", %{source: source} do
media_item = media_item_fixture(source_id: source.id, media_filepath: nil)
SlowIndexingHelpers.index_and_enqueue_download_for_media_items(source)
[job] = all_enqueued(worker: MediaDownloadWorker, args: %{"id" => media_item.id})
assert_in_delta DateTime.diff(job.scheduled_at, now()), 5, 1
end
test "it does not attach tasks if the source is set to not download" do
source = source_fixture(download_media: false)
media_item = media_item_fixture(source_id: source.id, media_filepath: nil)
@ -235,6 +245,26 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpersTest do
assert_enqueued(worker: MediaDownloadWorker)
end
test "sets a small delay on the download job", %{source: source} do
watcher_poll_interval = Application.get_env(:pinchflat, :file_watcher_poll_interval)
stub(YtDlpRunnerMock, :run, fn _url, _opts, _ot, addl_opts ->
filepath = Keyword.get(addl_opts, :output_filepath)
File.write(filepath, source_attributes_return_fixture())
# Need to add a delay to ensure the file watcher has time to read the file
:timer.sleep(watcher_poll_interval * 2)
# We know we're testing the file watcher since the syncronous call will only
# return an empty string (creating no records)
{:ok, ""}
end)
SlowIndexingHelpers.index_and_enqueue_download_for_media_items(source)
[job | _] = all_enqueued(worker: MediaDownloadWorker)
assert_in_delta DateTime.diff(job.scheduled_at, now()), 5, 1
end
test "does not enqueue downloads if the source is set to not download" do
watcher_poll_interval = Application.get_env(:pinchflat, :file_watcher_poll_interval)
source = source_fixture(download_media: false)