diff --git a/config/runtime.exs b/config/runtime.exs index df8ef50..4f0a5ca 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -47,7 +47,7 @@ config :pinchflat, Pinchflat.Repo, config :pinchflat, Oban, queues: [ default: 10, - fast_indexing: 6, + fast_indexing: yt_dlp_worker_count, media_collection_indexing: yt_dlp_worker_count, media_fetching: yt_dlp_worker_count, remote_metadata: yt_dlp_worker_count, diff --git a/lib/pinchflat/downloading/downloading_helpers.ex b/lib/pinchflat/downloading/downloading_helpers.ex index 5898533..eae187c 100644 --- a/lib/pinchflat/downloading/downloading_helpers.ex +++ b/lib/pinchflat/downloading/downloading_helpers.ex @@ -27,13 +27,15 @@ defmodule Pinchflat.Downloading.DownloadingHelpers do Returns :ok """ - def enqueue_pending_download_tasks(%Source{download_media: true} = source) do + def enqueue_pending_download_tasks(source, job_opts \\ []) + + def enqueue_pending_download_tasks(%Source{download_media: true} = source, job_opts) do source |> Media.list_pending_media_items_for() - |> Enum.each(&MediaDownloadWorker.kickoff_with_task/1) + |> Enum.each(&MediaDownloadWorker.kickoff_with_task(&1, %{}, job_opts)) end - def enqueue_pending_download_tasks(%Source{download_media: false}) do + def enqueue_pending_download_tasks(%Source{download_media: false}, _job_opts) do :ok end @@ -55,13 +57,13 @@ defmodule Pinchflat.Downloading.DownloadingHelpers do Returns {:ok, %Task{}} | {:error, :should_not_download} | {:error, any()} """ - def kickoff_download_if_pending(%MediaItem{} = media_item) do + def kickoff_download_if_pending(%MediaItem{} = media_item, job_opts \\ []) do media_item = Repo.preload(media_item, :source) if media_item.source.download_media && Media.pending_download?(media_item) do Logger.info("Kicking off download for media item ##{media_item.id} (#{media_item.media_id})") - MediaDownloadWorker.kickoff_with_task(media_item) + MediaDownloadWorker.kickoff_with_task(media_item, %{}, job_opts) else {:error, :should_not_download} end diff --git a/lib/pinchflat/downloading/media_download_worker.ex b/lib/pinchflat/downloading/media_download_worker.ex index e442344..7a6f8e8 100644 --- a/lib/pinchflat/downloading/media_download_worker.ex +++ b/lib/pinchflat/downloading/media_download_worker.ex @@ -3,6 +3,7 @@ defmodule Pinchflat.Downloading.MediaDownloadWorker do use Oban.Worker, queue: :media_fetching, + priority: 5, unique: [period: :infinity, states: [:available, :scheduled, :retryable, :executing]], tags: ["media_item", "media_fetching", "show_in_dashboard"] diff --git a/lib/pinchflat/fast_indexing/fast_indexing_helpers.ex b/lib/pinchflat/fast_indexing/fast_indexing_helpers.ex index c31388d..6edd3e5 100644 --- a/lib/pinchflat/fast_indexing/fast_indexing_helpers.ex +++ b/lib/pinchflat/fast_indexing/fast_indexing_helpers.ex @@ -40,7 +40,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpers do Returns [%MediaItem{}] where each item is a new media item that was created _but not necessarily downloaded_. """ - def kickoff_download_tasks_from_youtube_rss_feed(%Source{} = source) do + def index_and_kickoff_downloads(%Source{} = source) do # The media_profile is needed to determine the quality options to _then_ determine a more # accurate predicted filepath source = Repo.preload(source, [:media_profile]) @@ -53,6 +53,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpers do Enum.map(new_media_ids, fn media_id -> case create_media_item_from_media_id(source, media_id) do {:ok, media_item} -> + DownloadingHelpers.kickoff_download_if_pending(media_item, priority: 0) media_item err -> @@ -61,7 +62,9 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpers do end end) - DownloadingHelpers.enqueue_pending_download_tasks(source) + # Pick up any stragglers. Intentionally has a lower priority than the per-media item + # kickoff above + DownloadingHelpers.enqueue_pending_download_tasks(source, priority: 1) Enum.filter(maybe_new_media_items, & &1) end diff --git a/lib/pinchflat/fast_indexing/fast_indexing_worker.ex b/lib/pinchflat/fast_indexing/fast_indexing_worker.ex index 368da17..ed83bf3 100644 --- a/lib/pinchflat/fast_indexing/fast_indexing_worker.ex +++ b/lib/pinchflat/fast_indexing/fast_indexing_worker.ex @@ -38,8 +38,8 @@ defmodule Pinchflat.FastIndexing.FastIndexingWorker do Order of operations: 1. FastIndexingWorker (this module) periodically checks the YouTube RSS feed for new media. - with `FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed` - 2. If the above `kickoff_download_tasks_from_youtube_rss_feed` finds new media items in the RSS feed, + with `FastIndexingHelpers.index_and_kickoff_downloads` + 2. If the above `index_and_kickoff_downloads` finds new media items in the RSS feed, it indexes them with a yt-dlp call to create the media item records then kicks off downloading tasks (MediaDownloadWorker) for any new media items _that should be downloaded_. 3. Once downloads are kicked off, this worker sends a notification to the apprise server if applicable @@ -67,7 +67,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingWorker do new_media_items = source - |> FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed() + |> FastIndexingHelpers.index_and_kickoff_downloads() |> Enum.filter(&Media.pending_download?(&1)) if source.download_media do diff --git a/lib/pinchflat/slow_indexing/slow_indexing_helpers.ex b/lib/pinchflat/slow_indexing/slow_indexing_helpers.ex index ed70007..60d0446 100644 --- a/lib/pinchflat/slow_indexing/slow_indexing_helpers.ex +++ b/lib/pinchflat/slow_indexing/slow_indexing_helpers.ex @@ -39,7 +39,6 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do def kickoff_indexing_task(%Source{} = source, job_args \\ %{}, job_opts \\ []) do job_offset_seconds = if job_args[:force], do: 0, else: calculate_job_offset_seconds(source) - Tasks.delete_pending_tasks_for(source, "FastIndexingWorker") Tasks.delete_pending_tasks_for(source, "MediaCollectionIndexingWorker", include_executing: true) MediaCollectionIndexingWorker.kickoff_with_task(source, job_args, job_opts ++ [schedule_in: job_offset_seconds]) diff --git a/lib/pinchflat/sources/sources.ex b/lib/pinchflat/sources/sources.ex index 99b4a56..ad8365c 100644 --- a/lib/pinchflat/sources/sources.ex +++ b/lib/pinchflat/sources/sources.ex @@ -300,6 +300,10 @@ defmodule Pinchflat.Sources do %{__meta__: %{state: :built}} -> SlowIndexingHelpers.kickoff_indexing_task(source) + if Ecto.Changeset.get_field(changeset, :fast_index) do + FastIndexingHelpers.kickoff_indexing_task(source) + end + # If the record has been persisted, only run indexing if the # indexing frequency has been changed and is now greater than 0 %{__meta__: %{state: :loaded}} -> diff --git a/test/pinchflat/downloading/downloading_helpers_test.exs b/test/pinchflat/downloading/downloading_helpers_test.exs index 5ab4f40..cc2f62a 100644 --- a/test/pinchflat/downloading/downloading_helpers_test.exs +++ b/test/pinchflat/downloading/downloading_helpers_test.exs @@ -10,7 +10,7 @@ defmodule Pinchflat.Downloading.DownloadingHelpersTest do alias Pinchflat.Downloading.MediaDownloadWorker describe "enqueue_pending_download_tasks/1" do - test "it enqueues a job for each pending media item" do + test "enqueues a job for each pending media item" do source = source_fixture() media_item = media_item_fixture(source_id: source.id, media_filepath: nil) @@ -19,7 +19,7 @@ defmodule Pinchflat.Downloading.DownloadingHelpersTest do assert_enqueued(worker: MediaDownloadWorker, args: %{"id" => media_item.id}) end - test "it does not enqueue a job for media items with a filepath" do + test "does not enqueue a job for media items with a filepath" do source = source_fixture() _media_item = media_item_fixture(source_id: source.id, media_filepath: "some/filepath.mp4") @@ -28,7 +28,7 @@ defmodule Pinchflat.Downloading.DownloadingHelpersTest do refute_enqueued(worker: MediaDownloadWorker) end - test "it attaches a task to each enqueued job" do + test "attaches a task to each enqueued job" do source = source_fixture() media_item = media_item_fixture(source_id: source.id, media_filepath: nil) @@ -39,7 +39,7 @@ defmodule Pinchflat.Downloading.DownloadingHelpersTest do assert [_] = Tasks.list_tasks_for(media_item) end - test "it does not create a job if the source is set to not download" do + test "does not create a job if the source is set to not download" do source = source_fixture(download_media: false) assert :ok = DownloadingHelpers.enqueue_pending_download_tasks(source) @@ -47,17 +47,26 @@ defmodule Pinchflat.Downloading.DownloadingHelpersTest do refute_enqueued(worker: MediaDownloadWorker) end - test "it does not attach tasks if the source is set to not download" do + test "does not attach tasks if the source is set to not download" do source = source_fixture(download_media: false) media_item = media_item_fixture(source_id: source.id, media_filepath: nil) assert :ok = DownloadingHelpers.enqueue_pending_download_tasks(source) assert [] = Tasks.list_tasks_for(media_item) end + + test "can pass job options" do + source = source_fixture() + media_item = media_item_fixture(source_id: source.id, media_filepath: nil) + + assert :ok = DownloadingHelpers.enqueue_pending_download_tasks(source, priority: 1) + + assert_enqueued(worker: MediaDownloadWorker, args: %{"id" => media_item.id}, priority: 1) + end end describe "dequeue_pending_download_tasks/1" do - test "it deletes all pending tasks for a source's media items" do + test "deletes all pending tasks for a source's media items" do source = source_fixture() media_item = media_item_fixture(source_id: source.id, media_filepath: nil) @@ -109,6 +118,14 @@ defmodule Pinchflat.Downloading.DownloadingHelpersTest do refute_enqueued(worker: MediaDownloadWorker) end + + test "can pass job options" do + media_item = media_item_fixture(media_filepath: nil) + + assert {:ok, _} = DownloadingHelpers.kickoff_download_if_pending(media_item, priority: 1) + + assert_enqueued(worker: MediaDownloadWorker, args: %{"id" => media_item.id}, priority: 1) + end end describe "kickoff_redownload_for_existing_media/1" do diff --git a/test/pinchflat/downloading/media_download_worker_test.exs b/test/pinchflat/downloading/media_download_worker_test.exs index e104f27..812ec95 100644 --- a/test/pinchflat/downloading/media_download_worker_test.exs +++ b/test/pinchflat/downloading/media_download_worker_test.exs @@ -46,13 +46,20 @@ defmodule Pinchflat.Downloading.MediaDownloadWorkerTest do assert_enqueued(worker: MediaDownloadWorker, args: %{"id" => media_item.id, "force" => true}) end - test "can be called with additional job options", %{media_item: media_item} do - job_opts = [max_attempts: 5] - - assert {:ok, _} = MediaDownloadWorker.kickoff_with_task(media_item, %{}, job_opts) + test "has a priority of 5 by default", %{media_item: media_item} do + assert {:ok, _} = MediaDownloadWorker.kickoff_with_task(media_item) [job] = all_enqueued(worker: MediaDownloadWorker, args: %{"id" => media_item.id}) - assert job.max_attempts == 5 + + assert job.priority == 5 + end + + test "priority can be set", %{media_item: media_item} do + assert {:ok, _} = MediaDownloadWorker.kickoff_with_task(media_item, %{}, priority: 0) + + [job] = all_enqueued(worker: MediaDownloadWorker, args: %{"id" => media_item.id}) + + assert job.priority == 0 end end diff --git a/test/pinchflat/fast_indexing/fast_indexing_helpers_test.exs b/test/pinchflat/fast_indexing/fast_indexing_helpers_test.exs index 8ef98d7..a798275 100644 --- a/test/pinchflat/fast_indexing/fast_indexing_helpers_test.exs +++ b/test/pinchflat/fast_indexing/fast_indexing_helpers_test.exs @@ -38,36 +38,48 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do end end - describe "kickoff_download_tasks_from_youtube_rss_feed/1" do - test "enqueues a new worker for each new media_id in the source's RSS feed", %{source: source} do + describe "index_and_kickoff_downloads/1" do + test "enqueues a worker for each new media_id in the source's RSS feed", %{source: source} do expect(HTTPClientMock, :get, fn _url -> {:ok, "test_1"} end) - assert [media_item] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source) + assert [media_item] = FastIndexingHelpers.index_and_kickoff_downloads(source) assert [worker] = all_enqueued(worker: MediaDownloadWorker) assert worker.args["id"] == media_item.id + assert worker.priority == 0 end test "does not enqueue a new worker for the source's media IDs we already know about", %{source: source} do expect(HTTPClientMock, :get, fn _url -> {:ok, "test_1"} end) media_item_fixture(source_id: source.id, media_id: "test_1") - assert [] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source) + assert [] = FastIndexingHelpers.index_and_kickoff_downloads(source) refute_enqueued(worker: MediaDownloadWorker) end + test "kicks off a download task for all pending media but at a lower priority", %{source: source} do + pending_item = media_item_fixture(source_id: source.id, media_filepath: nil) + expect(HTTPClientMock, :get, fn _url -> {:ok, "test_1"} end) + + assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source) + + assert [worker_1, _worker_2] = all_enqueued(worker: MediaDownloadWorker) + assert worker_1.args["id"] == pending_item.id + assert worker_1.priority == 1 + end + test "returns the found media items", %{source: source} do expect(HTTPClientMock, :get, fn _url -> {:ok, "test_1"} end) - assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source) + assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source) end test "does not enqueue a download job if the source does not allow it" do expect(HTTPClientMock, :get, fn _url -> {:ok, "test_1"} end) source = source_fixture(%{download_media: false}) - assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source) + assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source) refute_enqueued(worker: MediaDownloadWorker) end @@ -75,7 +87,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do test "creates a download task record", %{source: source} do expect(HTTPClientMock, :get, fn _url -> {:ok, "test_1"} end) - assert [media_item] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source) + assert [media_item] = FastIndexingHelpers.index_and_kickoff_downloads(source) assert [_] = Tasks.list_tasks_for(media_item, "MediaDownloadWorker") end @@ -89,7 +101,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do {:ok, media_attributes_return_fixture()} end) - FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source) + FastIndexingHelpers.index_and_kickoff_downloads(source) end test "sets use_cookies if the source uses cookies" do @@ -103,7 +115,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do source = source_fixture(%{use_cookies: true}) - assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source) + assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source) end test "does not set use_cookies if the source does not use cookies" do @@ -117,7 +129,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do source = source_fixture(%{use_cookies: false}) - assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source) + assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source) end test "does not enqueue a download job if the media item does not match the format rules" do @@ -142,7 +154,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do {:ok, output} end) - assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source) + assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source) refute_enqueued(worker: MediaDownloadWorker) end @@ -154,7 +166,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do {:ok, "{}"} end) - assert [] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source) + assert [] = FastIndexingHelpers.index_and_kickoff_downloads(source) end test "does not blow up if a media item causes a yt-dlp error", %{source: source} do @@ -164,11 +176,11 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do {:error, "message", 1} end) - assert [] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source) + assert [] = FastIndexingHelpers.index_and_kickoff_downloads(source) end end - describe "kickoff_download_tasks_from_youtube_rss_feed/1 when testing backends" do + describe "index_and_kickoff_downloads/1 when testing backends" do test "uses the YouTube API if it is enabled", %{source: source} do expect(HTTPClientMock, :get, fn url, _headers -> assert url =~ "https://youtube.googleapis.com/youtube/v3/playlistItems" @@ -178,7 +190,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do Settings.set(youtube_api_key: "test_key") - assert [] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source) + assert [] = FastIndexingHelpers.index_and_kickoff_downloads(source) end test "the YouTube API creates records as expected", %{source: source} do @@ -188,7 +200,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do Settings.set(youtube_api_key: "test_key") - assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source) + assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source) end test "RSS is used as a backup if the API fails", %{source: source} do @@ -197,7 +209,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do Settings.set(youtube_api_key: "test_key") - assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source) + assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source) end test "RSS is used if the API is not enabled", %{source: source} do @@ -209,7 +221,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do Settings.set(youtube_api_key: nil) - assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source) + assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source) end end end diff --git a/test/pinchflat/slow_indexing/slow_indexing_helpers_test.exs b/test/pinchflat/slow_indexing/slow_indexing_helpers_test.exs index 2ef57ef..6e00622 100644 --- a/test/pinchflat/slow_indexing/slow_indexing_helpers_test.exs +++ b/test/pinchflat/slow_indexing/slow_indexing_helpers_test.exs @@ -96,26 +96,6 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpersTest do assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end end - test "deletes any pending media tasks for the source" do - source = source_fixture() - {:ok, job} = Oban.insert(FastIndexingWorker.new(%{"id" => source.id})) - task = task_fixture(source_id: source.id, job_id: job.id) - - assert {:ok, _} = SlowIndexingHelpers.kickoff_indexing_task(source) - - assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end - end - - test "deletes any fast indexing tasks for the source" do - source = source_fixture() - {:ok, job} = Oban.insert(FastIndexingWorker.new(%{"id" => source.id})) - task = task_fixture(source_id: source.id, job_id: job.id) - - assert {:ok, _} = SlowIndexingHelpers.kickoff_indexing_task(source) - - assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end - end - test "can be called with additional job arguments" do source = source_fixture(index_frequency_minutes: 1) job_args = %{"force" => true} diff --git a/test/pinchflat/sources_test.exs b/test/pinchflat/sources_test.exs index 766c71f..e8e02ae 100644 --- a/test/pinchflat/sources_test.exs +++ b/test/pinchflat/sources_test.exs @@ -294,6 +294,34 @@ defmodule Pinchflat.SourcesTest do assert_enqueued(worker: MediaCollectionIndexingWorker, args: %{"id" => source.id}) end + test "creation will schedule a fast indexing job if the fast_index option is set" do + expect(YtDlpRunnerMock, :run, &channel_mock/5) + + valid_attrs = %{ + media_profile_id: media_profile_fixture().id, + original_url: "https://www.youtube.com/channel/abc123", + fast_index: true + } + + assert {:ok, %Source{} = source} = Sources.create_source(valid_attrs) + + assert_enqueued(worker: FastIndexingWorker, args: %{"id" => source.id}) + end + + test "creation will not schedule a fast indexing job if the fast_index option is not set" do + expect(YtDlpRunnerMock, :run, &channel_mock/5) + + valid_attrs = %{ + media_profile_id: media_profile_fixture().id, + original_url: "https://www.youtube.com/channel/abc123", + fast_index: false + } + + assert {:ok, %Source{}} = Sources.create_source(valid_attrs) + + refute_enqueued(worker: FastIndexingWorker) + end + test "creation schedules an index test even if the index frequency is 0" do expect(YtDlpRunnerMock, :run, &channel_mock/5)