[Enhancement] Run fast indexing on source creation and at higher priority (#583)

* Updated default job priorities for downloading queue

* Added the ability to set priority to various downloading helpers

* Sets sources to fast index on creation
This commit is contained in:
Kieran 2025-01-22 14:54:15 -08:00 committed by GitHub
parent 704d29dc7e
commit 62214b80a6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 114 additions and 61 deletions

View file

@ -47,7 +47,7 @@ config :pinchflat, Pinchflat.Repo,
config :pinchflat, Oban,
queues: [
default: 10,
fast_indexing: 6,
fast_indexing: yt_dlp_worker_count,
media_collection_indexing: yt_dlp_worker_count,
media_fetching: yt_dlp_worker_count,
remote_metadata: yt_dlp_worker_count,

View file

@ -27,13 +27,15 @@ defmodule Pinchflat.Downloading.DownloadingHelpers do
Returns :ok
"""
def enqueue_pending_download_tasks(%Source{download_media: true} = source) do
def enqueue_pending_download_tasks(source, job_opts \\ [])
def enqueue_pending_download_tasks(%Source{download_media: true} = source, job_opts) do
source
|> Media.list_pending_media_items_for()
|> Enum.each(&MediaDownloadWorker.kickoff_with_task/1)
|> Enum.each(&MediaDownloadWorker.kickoff_with_task(&1, %{}, job_opts))
end
def enqueue_pending_download_tasks(%Source{download_media: false}) do
def enqueue_pending_download_tasks(%Source{download_media: false}, _job_opts) do
:ok
end
@ -55,13 +57,13 @@ defmodule Pinchflat.Downloading.DownloadingHelpers do
Returns {:ok, %Task{}} | {:error, :should_not_download} | {:error, any()}
"""
def kickoff_download_if_pending(%MediaItem{} = media_item) do
def kickoff_download_if_pending(%MediaItem{} = media_item, job_opts \\ []) do
media_item = Repo.preload(media_item, :source)
if media_item.source.download_media && Media.pending_download?(media_item) do
Logger.info("Kicking off download for media item ##{media_item.id} (#{media_item.media_id})")
MediaDownloadWorker.kickoff_with_task(media_item)
MediaDownloadWorker.kickoff_with_task(media_item, %{}, job_opts)
else
{:error, :should_not_download}
end

View file

@ -3,6 +3,7 @@ defmodule Pinchflat.Downloading.MediaDownloadWorker do
use Oban.Worker,
queue: :media_fetching,
priority: 5,
unique: [period: :infinity, states: [:available, :scheduled, :retryable, :executing]],
tags: ["media_item", "media_fetching", "show_in_dashboard"]

View file

@ -40,7 +40,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpers do
Returns [%MediaItem{}] where each item is a new media item that was created _but not necessarily
downloaded_.
"""
def kickoff_download_tasks_from_youtube_rss_feed(%Source{} = source) do
def index_and_kickoff_downloads(%Source{} = source) do
# The media_profile is needed to determine the quality options to _then_ determine a more
# accurate predicted filepath
source = Repo.preload(source, [:media_profile])
@ -53,6 +53,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpers do
Enum.map(new_media_ids, fn media_id ->
case create_media_item_from_media_id(source, media_id) do
{:ok, media_item} ->
DownloadingHelpers.kickoff_download_if_pending(media_item, priority: 0)
media_item
err ->
@ -61,7 +62,9 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpers do
end
end)
DownloadingHelpers.enqueue_pending_download_tasks(source)
# Pick up any stragglers. Intentionally has a lower priority than the per-media item
# kickoff above
DownloadingHelpers.enqueue_pending_download_tasks(source, priority: 1)
Enum.filter(maybe_new_media_items, & &1)
end

View file

@ -38,8 +38,8 @@ defmodule Pinchflat.FastIndexing.FastIndexingWorker do
Order of operations:
1. FastIndexingWorker (this module) periodically checks the YouTube RSS feed for new media.
with `FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed`
2. If the above `kickoff_download_tasks_from_youtube_rss_feed` finds new media items in the RSS feed,
with `FastIndexingHelpers.index_and_kickoff_downloads`
2. If the above `index_and_kickoff_downloads` finds new media items in the RSS feed,
it indexes them with a yt-dlp call to create the media item records then kicks off downloading
tasks (MediaDownloadWorker) for any new media items _that should be downloaded_.
3. Once downloads are kicked off, this worker sends a notification to the apprise server if applicable
@ -67,7 +67,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingWorker do
new_media_items =
source
|> FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed()
|> FastIndexingHelpers.index_and_kickoff_downloads()
|> Enum.filter(&Media.pending_download?(&1))
if source.download_media do

View file

@ -39,7 +39,6 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do
def kickoff_indexing_task(%Source{} = source, job_args \\ %{}, job_opts \\ []) do
job_offset_seconds = if job_args[:force], do: 0, else: calculate_job_offset_seconds(source)
Tasks.delete_pending_tasks_for(source, "FastIndexingWorker")
Tasks.delete_pending_tasks_for(source, "MediaCollectionIndexingWorker", include_executing: true)
MediaCollectionIndexingWorker.kickoff_with_task(source, job_args, job_opts ++ [schedule_in: job_offset_seconds])

View file

@ -300,6 +300,10 @@ defmodule Pinchflat.Sources do
%{__meta__: %{state: :built}} ->
SlowIndexingHelpers.kickoff_indexing_task(source)
if Ecto.Changeset.get_field(changeset, :fast_index) do
FastIndexingHelpers.kickoff_indexing_task(source)
end
# If the record has been persisted, only run indexing if the
# indexing frequency has been changed and is now greater than 0
%{__meta__: %{state: :loaded}} ->

View file

@ -10,7 +10,7 @@ defmodule Pinchflat.Downloading.DownloadingHelpersTest do
alias Pinchflat.Downloading.MediaDownloadWorker
describe "enqueue_pending_download_tasks/1" do
test "it enqueues a job for each pending media item" do
test "enqueues a job for each pending media item" do
source = source_fixture()
media_item = media_item_fixture(source_id: source.id, media_filepath: nil)
@ -19,7 +19,7 @@ defmodule Pinchflat.Downloading.DownloadingHelpersTest do
assert_enqueued(worker: MediaDownloadWorker, args: %{"id" => media_item.id})
end
test "it does not enqueue a job for media items with a filepath" do
test "does not enqueue a job for media items with a filepath" do
source = source_fixture()
_media_item = media_item_fixture(source_id: source.id, media_filepath: "some/filepath.mp4")
@ -28,7 +28,7 @@ defmodule Pinchflat.Downloading.DownloadingHelpersTest do
refute_enqueued(worker: MediaDownloadWorker)
end
test "it attaches a task to each enqueued job" do
test "attaches a task to each enqueued job" do
source = source_fixture()
media_item = media_item_fixture(source_id: source.id, media_filepath: nil)
@ -39,7 +39,7 @@ defmodule Pinchflat.Downloading.DownloadingHelpersTest do
assert [_] = Tasks.list_tasks_for(media_item)
end
test "it does not create a job if the source is set to not download" do
test "does not create a job if the source is set to not download" do
source = source_fixture(download_media: false)
assert :ok = DownloadingHelpers.enqueue_pending_download_tasks(source)
@ -47,17 +47,26 @@ defmodule Pinchflat.Downloading.DownloadingHelpersTest do
refute_enqueued(worker: MediaDownloadWorker)
end
test "it does not attach tasks if the source is set to not download" do
test "does not attach tasks if the source is set to not download" do
source = source_fixture(download_media: false)
media_item = media_item_fixture(source_id: source.id, media_filepath: nil)
assert :ok = DownloadingHelpers.enqueue_pending_download_tasks(source)
assert [] = Tasks.list_tasks_for(media_item)
end
test "can pass job options" do
source = source_fixture()
media_item = media_item_fixture(source_id: source.id, media_filepath: nil)
assert :ok = DownloadingHelpers.enqueue_pending_download_tasks(source, priority: 1)
assert_enqueued(worker: MediaDownloadWorker, args: %{"id" => media_item.id}, priority: 1)
end
end
describe "dequeue_pending_download_tasks/1" do
test "it deletes all pending tasks for a source's media items" do
test "deletes all pending tasks for a source's media items" do
source = source_fixture()
media_item = media_item_fixture(source_id: source.id, media_filepath: nil)
@ -109,6 +118,14 @@ defmodule Pinchflat.Downloading.DownloadingHelpersTest do
refute_enqueued(worker: MediaDownloadWorker)
end
test "can pass job options" do
media_item = media_item_fixture(media_filepath: nil)
assert {:ok, _} = DownloadingHelpers.kickoff_download_if_pending(media_item, priority: 1)
assert_enqueued(worker: MediaDownloadWorker, args: %{"id" => media_item.id}, priority: 1)
end
end
describe "kickoff_redownload_for_existing_media/1" do

View file

@ -46,13 +46,20 @@ defmodule Pinchflat.Downloading.MediaDownloadWorkerTest do
assert_enqueued(worker: MediaDownloadWorker, args: %{"id" => media_item.id, "force" => true})
end
test "can be called with additional job options", %{media_item: media_item} do
job_opts = [max_attempts: 5]
assert {:ok, _} = MediaDownloadWorker.kickoff_with_task(media_item, %{}, job_opts)
test "has a priority of 5 by default", %{media_item: media_item} do
assert {:ok, _} = MediaDownloadWorker.kickoff_with_task(media_item)
[job] = all_enqueued(worker: MediaDownloadWorker, args: %{"id" => media_item.id})
assert job.max_attempts == 5
assert job.priority == 5
end
test "priority can be set", %{media_item: media_item} do
assert {:ok, _} = MediaDownloadWorker.kickoff_with_task(media_item, %{}, priority: 0)
[job] = all_enqueued(worker: MediaDownloadWorker, args: %{"id" => media_item.id})
assert job.priority == 0
end
end

View file

@ -38,36 +38,48 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do
end
end
describe "kickoff_download_tasks_from_youtube_rss_feed/1" do
test "enqueues a new worker for each new media_id in the source's RSS feed", %{source: source} do
describe "index_and_kickoff_downloads/1" do
test "enqueues a worker for each new media_id in the source's RSS feed", %{source: source} do
expect(HTTPClientMock, :get, fn _url -> {:ok, "<yt:videoId>test_1</yt:videoId>"} end)
assert [media_item] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [media_item] = FastIndexingHelpers.index_and_kickoff_downloads(source)
assert [worker] = all_enqueued(worker: MediaDownloadWorker)
assert worker.args["id"] == media_item.id
assert worker.priority == 0
end
test "does not enqueue a new worker for the source's media IDs we already know about", %{source: source} do
expect(HTTPClientMock, :get, fn _url -> {:ok, "<yt:videoId>test_1</yt:videoId>"} end)
media_item_fixture(source_id: source.id, media_id: "test_1")
assert [] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [] = FastIndexingHelpers.index_and_kickoff_downloads(source)
refute_enqueued(worker: MediaDownloadWorker)
end
test "kicks off a download task for all pending media but at a lower priority", %{source: source} do
pending_item = media_item_fixture(source_id: source.id, media_filepath: nil)
expect(HTTPClientMock, :get, fn _url -> {:ok, "<yt:videoId>test_1</yt:videoId>"} end)
assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source)
assert [worker_1, _worker_2] = all_enqueued(worker: MediaDownloadWorker)
assert worker_1.args["id"] == pending_item.id
assert worker_1.priority == 1
end
test "returns the found media items", %{source: source} do
expect(HTTPClientMock, :get, fn _url -> {:ok, "<yt:videoId>test_1</yt:videoId>"} end)
assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source)
end
test "does not enqueue a download job if the source does not allow it" do
expect(HTTPClientMock, :get, fn _url -> {:ok, "<yt:videoId>test_1</yt:videoId>"} end)
source = source_fixture(%{download_media: false})
assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source)
refute_enqueued(worker: MediaDownloadWorker)
end
@ -75,7 +87,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do
test "creates a download task record", %{source: source} do
expect(HTTPClientMock, :get, fn _url -> {:ok, "<yt:videoId>test_1</yt:videoId>"} end)
assert [media_item] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [media_item] = FastIndexingHelpers.index_and_kickoff_downloads(source)
assert [_] = Tasks.list_tasks_for(media_item, "MediaDownloadWorker")
end
@ -89,7 +101,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do
{:ok, media_attributes_return_fixture()}
end)
FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
FastIndexingHelpers.index_and_kickoff_downloads(source)
end
test "sets use_cookies if the source uses cookies" do
@ -103,7 +115,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do
source = source_fixture(%{use_cookies: true})
assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source)
end
test "does not set use_cookies if the source does not use cookies" do
@ -117,7 +129,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do
source = source_fixture(%{use_cookies: false})
assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source)
end
test "does not enqueue a download job if the media item does not match the format rules" do
@ -142,7 +154,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do
{:ok, output}
end)
assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source)
refute_enqueued(worker: MediaDownloadWorker)
end
@ -154,7 +166,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do
{:ok, "{}"}
end)
assert [] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [] = FastIndexingHelpers.index_and_kickoff_downloads(source)
end
test "does not blow up if a media item causes a yt-dlp error", %{source: source} do
@ -164,11 +176,11 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do
{:error, "message", 1}
end)
assert [] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [] = FastIndexingHelpers.index_and_kickoff_downloads(source)
end
end
describe "kickoff_download_tasks_from_youtube_rss_feed/1 when testing backends" do
describe "index_and_kickoff_downloads/1 when testing backends" do
test "uses the YouTube API if it is enabled", %{source: source} do
expect(HTTPClientMock, :get, fn url, _headers ->
assert url =~ "https://youtube.googleapis.com/youtube/v3/playlistItems"
@ -178,7 +190,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do
Settings.set(youtube_api_key: "test_key")
assert [] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [] = FastIndexingHelpers.index_and_kickoff_downloads(source)
end
test "the YouTube API creates records as expected", %{source: source} do
@ -188,7 +200,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do
Settings.set(youtube_api_key: "test_key")
assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source)
end
test "RSS is used as a backup if the API fails", %{source: source} do
@ -197,7 +209,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do
Settings.set(youtube_api_key: "test_key")
assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source)
end
test "RSS is used if the API is not enabled", %{source: source} do
@ -209,7 +221,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do
Settings.set(youtube_api_key: nil)
assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source)
end
end
end

View file

@ -96,26 +96,6 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpersTest do
assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end
end
test "deletes any pending media tasks for the source" do
source = source_fixture()
{:ok, job} = Oban.insert(FastIndexingWorker.new(%{"id" => source.id}))
task = task_fixture(source_id: source.id, job_id: job.id)
assert {:ok, _} = SlowIndexingHelpers.kickoff_indexing_task(source)
assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end
end
test "deletes any fast indexing tasks for the source" do
source = source_fixture()
{:ok, job} = Oban.insert(FastIndexingWorker.new(%{"id" => source.id}))
task = task_fixture(source_id: source.id, job_id: job.id)
assert {:ok, _} = SlowIndexingHelpers.kickoff_indexing_task(source)
assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end
end
test "can be called with additional job arguments" do
source = source_fixture(index_frequency_minutes: 1)
job_args = %{"force" => true}

View file

@ -294,6 +294,34 @@ defmodule Pinchflat.SourcesTest do
assert_enqueued(worker: MediaCollectionIndexingWorker, args: %{"id" => source.id})
end
test "creation will schedule a fast indexing job if the fast_index option is set" do
expect(YtDlpRunnerMock, :run, &channel_mock/5)
valid_attrs = %{
media_profile_id: media_profile_fixture().id,
original_url: "https://www.youtube.com/channel/abc123",
fast_index: true
}
assert {:ok, %Source{} = source} = Sources.create_source(valid_attrs)
assert_enqueued(worker: FastIndexingWorker, args: %{"id" => source.id})
end
test "creation will not schedule a fast indexing job if the fast_index option is not set" do
expect(YtDlpRunnerMock, :run, &channel_mock/5)
valid_attrs = %{
media_profile_id: media_profile_fixture().id,
original_url: "https://www.youtube.com/channel/abc123",
fast_index: false
}
assert {:ok, %Source{}} = Sources.create_source(valid_attrs)
refute_enqueued(worker: FastIndexingWorker)
end
test "creation schedules an index test even if the index frequency is 0" do
expect(YtDlpRunnerMock, :run, &channel_mock/5)