[Enhancement] Overhaul indexing to be more efficient (#540)

* WIP - created methods for breaking on existing media

* WIP - got everything hooked up for POC

* Add some docs, tests

* Refactors

* Updated TODO
Kieran 2025-01-02 15:48:18 -08:00 committed by GitHub
parent 09d1653f4b
commit 9185f075ca
10 changed files with 236 additions and 48 deletions


@@ -79,21 +79,21 @@ defmodule Pinchflat.SlowIndexing.MediaCollectionIndexingWorker do
case {source.index_frequency_minutes, source.last_indexed_at} do
{index_freq, _} when index_freq > 0 ->
# If the indexing is on a schedule simply run indexing and reschedule
perform_indexing_and_notification(source)
perform_indexing_and_notification(source, was_forced: args["force"])
maybe_enqueue_fast_indexing_task(source)
reschedule_indexing(source)
{_, nil} ->
# If the source has never been indexed, index it once
# even if it's not meant to reschedule
perform_indexing_and_notification(source)
perform_indexing_and_notification(source, was_forced: args["force"])
:ok
_ ->
# If the source HAS been indexed and is not meant to reschedule,
# perform a no-op (unless forced)
if args["force"] do
perform_indexing_and_notification(source)
perform_indexing_and_notification(source, was_forced: true)
end
:ok
@@ -103,11 +103,11 @@ defmodule Pinchflat.SlowIndexing.MediaCollectionIndexingWorker do
Ecto.StaleEntryError -> Logger.info("#{__MODULE__} discarded: source #{source_id} stale")
end
defp perform_indexing_and_notification(source) do
defp perform_indexing_and_notification(source, indexing_opts) do
apprise_server = Settings.get!(:apprise_server)
SourceNotifications.wrap_new_media_notification(apprise_server, source, fn ->
SlowIndexingHelpers.index_and_enqueue_download_for_media_items(source)
SlowIndexingHelpers.index_and_enqueue_download_for_media_items(source, indexing_opts)
end)
end
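
For orientation, here is a minimal sketch of how the `force` flag above would reach this worker. The enqueue site is an assumption (it is not part of this diff); the arg and option names match the hunks above:

# Hypothetical enqueue site (e.g. a "force index" action) -- assumed, not shown in this diff.
# The "force" arg makes the worker index even when the source wouldn't normally reschedule,
# and is forwarded as `was_forced: true` so no download archive is built for that run.
%{id: source.id, force: true}
|> MediaCollectionIndexingWorker.new()
|> Oban.insert()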


@@ -5,6 +5,8 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do
Many of these methods are meant to kick off workers or to be consumed by them.
"""
use Pinchflat.Media.MediaQuery
require Logger
alias Pinchflat.Repo
@@ -14,6 +16,7 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do
alias Pinchflat.Sources.Source
alias Pinchflat.Media.MediaItem
alias Pinchflat.YtDlp.MediaCollection
alias Pinchflat.Utils.FilesystemUtils
alias Pinchflat.Downloading.DownloadingHelpers
alias Pinchflat.SlowIndexing.FileFollowerServer
alias Pinchflat.Downloading.DownloadOptionBuilder
@@ -22,13 +25,19 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do
alias Pinchflat.YtDlp.Media, as: YtDlpMedia
@doc """
Starts tasks for indexing a source's media regardless of the source's indexing
frequency. It's assumed the caller will check for indexing frequency.
Kills old indexing tasks and starts a new task to index the media collection.
The job is delayed based on the source's `index_frequency_minutes` setting unless
one of the following is true:
- The `force` option is set to true
- The source has never been indexed before
- The source has been indexed before, but the last indexing job ran more than
`index_frequency_minutes` minutes ago
Returns {:ok, %Task{}}
"""
def kickoff_indexing_task(%Source{} = source, job_args \\ %{}, job_opts \\ []) do
job_offset_seconds = calculate_job_offset_seconds(source)
job_offset_seconds = if job_args[:force], do: 0, else: calculate_job_offset_seconds(source)
Tasks.delete_pending_tasks_for(source, "FastIndexingWorker")
Tasks.delete_pending_tasks_for(source, "MediaCollectionIndexingWorker", include_executing: true)
@@ -52,8 +61,8 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do
@doc """
Given a media source, creates (indexes) the media by creating media_items for each
media ID in the source. Afterward, kicks off a download task for each pending media
item belonging to the source. You can't tell me the method name isn't descriptive!
Returns a list of media items or changesets (if the media item couldn't be created).
item belonging to the source. Returns a list of media items or changesets
(if the media item couldn't be created).
Indexing is slow and usually returns a list of all media data at once for record creation.
To help with this, we use a file follower to watch the file that yt-dlp writes to
@@ -61,23 +70,33 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do
clarity to the user experience. This has a few things to be aware of which are documented
below in the file watcher setup method.
Additionally, in the case of a repeat index we create a download archive file that
contains some media IDs that we've indexed in the past. Note that this archive doesn't
contain the most recent IDs but rather a subset of IDs that are offset by some amount.
Practically, this means that we'll re-index a small handful of media that we've recently
indexed, but this is a good thing since it'll let us pick up on any recent changes to the
most recent media items.
We don't create a download archive for playlists (only channels), nor do we create one if
the indexing was forced by the user.
NOTE: downloads are only enqueued if the source is set to download media. Downloads are
also enqueued for ALL pending media items, not just the ones that were indexed in this
job run. This should ensure that any stragglers are caught if, for some reason, they
weren't enqueued or somehow got de-queued.
Since indexing returns all media data EVERY TIME, we take that opportunity to update
indexing metadata for media items that have already been created.
Available options:
- `was_forced`: Whether the indexing was forced by the user
Returns [%MediaItem{} | %Ecto.Changeset{}]
"""
def index_and_enqueue_download_for_media_items(%Source{} = source) do
def index_and_enqueue_download_for_media_items(%Source{} = source, opts \\ []) do
# The media_profile is needed to determine the quality options to _then_ determine a more
# accurate predicted filepath
source = Repo.preload(source, [:media_profile])
# See the method definition below for more info on how file watchers work
# (important reading if you're not familiar with it)
{:ok, media_attributes} = setup_file_watcher_and_kickoff_indexing(source)
{:ok, media_attributes} = setup_file_watcher_and_kickoff_indexing(source, opts)
# Reload because the source may have been updated during the (long-running) indexing process
# and important settings like `download_media` may have changed.
source = Repo.reload!(source)
@@ -109,14 +128,16 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do
# It attempts a graceful shutdown of the file follower after the indexing is done,
# but the FileFollowerServer will also stop itself if it doesn't see any activity
# for a sufficiently long time.
defp setup_file_watcher_and_kickoff_indexing(source) do
defp setup_file_watcher_and_kickoff_indexing(source, opts) do
was_forced = Keyword.get(opts, :was_forced, false)
{:ok, pid} = FileFollowerServer.start_link()
handler = fn filepath -> setup_file_follower_watcher(pid, filepath, source) end
command_opts =
[output: DownloadOptionBuilder.build_output_path_for(source)] ++
DownloadOptionBuilder.build_quality_options_for(source)
DownloadOptionBuilder.build_quality_options_for(source) ++
build_download_archive_options(source, was_forced)
runner_opts = [file_listener_handler: handler, use_cookies: source.use_cookies]
result = MediaCollection.get_media_attributes_for_collection(source.original_url, command_opts, runner_opts)
@@ -166,4 +187,57 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do
max(0, index_frequency_seconds - offset_seconds)
end
# The download archive file works in tandem with --break-on-existing to stop
# yt-dlp once we've hit media items we've already indexed. But we generate
# this list with a bit of an offset so we do intentionally re-scan some media
# items to pick up any recent changes (see `get_media_items_for_download_archive`).
#
# From there, we format the media IDs in the way that yt-dlp expects (ie: "<extractor> <media_id>")
# and return the filepath to the caller.
defp create_download_archive_file(source) do
tmpfile = FilesystemUtils.generate_metadata_tmpfile(:txt)
archive_contents =
source
|> get_media_items_for_download_archive()
|> Enum.map_join("\n", fn media_item -> "youtube #{media_item.media_id}" end)
case File.write(tmpfile, archive_contents) do
:ok -> tmpfile
err -> err
end
end
# Sorting by `uploaded_at` is important because we want to re-index the most recent
# media items first, but there is no guarantee of any correlation between ID and uploaded_at.
#
# The offset is important because we want to re-index some media items that we've
# recently indexed to pick up on any changes. The limit is because we want this mechanism
# to work even if, for example, the video we were using as a stopping point was deleted.
# It's not a perfect system, but it should do well enough.
#
# The chosen limit and offset are arbitrary, independent, and vibes-based. Feel free to
# tweak as needed.
defp get_media_items_for_download_archive(source) do
MediaQuery.new()
|> where(^MediaQuery.for_source(source))
|> order_by(desc: :uploaded_at)
|> limit(50)
|> offset(20)
|> Repo.all()
end
# The download archive isn't useful for playlists (since those are ordered arbitrarily)
# and we don't want to use it if the indexing was forced by the user. In other words,
# only create an archive for channels that are being indexed as part of their regular
# indexing schedule
defp build_download_archive_options(%Source{collection_type: :playlist}, _was_forced), do: []
defp build_download_archive_options(_source, true), do: []
defp build_download_archive_options(source, _was_forced) do
archive_file = create_download_archive_file(source)
[:break_on_existing, download_archive: archive_file]
end
end
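
To make the new archive mechanism concrete, here is a standalone sketch of the file format `create_download_archive_file/1` produces and the options that accompany it. The media IDs and tmp path are made up; the real code pulls IDs via `get_media_items_for_download_archive/1` (ordered by `uploaded_at` desc, offset 20, limit 50):

# Illustrative only -- sample IDs standing in for the query results described above.
media_ids = ["dQw4w9WgXcQ", "9bZkp7q19f0"]

archive_contents = Enum.map_join(media_ids, "\n", fn id -> "youtube #{id}" end)
# => "youtube dQw4w9WgXcQ\nyoutube 9bZkp7q19f0"

tmpfile = Path.join(System.tmp_dir!(), "download_archive_example.txt")
File.write!(tmpfile, archive_contents)

# These ride along with the other yt-dlp options so the listing stops early
# once an archived ID is encountered:
[:break_on_existing, download_archive: tmpfile]

Once yt-dlp hits one of those IDs it aborts the listing, which is what makes repeat indexing cheap compared to re-scanning the whole channel.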


@@ -45,8 +45,20 @@ defmodule Pinchflat.Utils.FilesystemUtils do
Returns binary()
"""
def generate_metadata_tmpfile(type) do
filename = StringUtils.random_string(64)
# This "namespacing" is more to help with development since things get
# weird in my editor when there are thousands of files in a single directory
first_two = String.slice(filename, 0..1)
second_two = String.slice(filename, 2..3)
tmpfile_directory = Application.get_env(:pinchflat, :tmpfile_directory)
filepath = Path.join([tmpfile_directory, "#{StringUtils.random_string(64)}.#{type}"])
filepath =
Path.join([
tmpfile_directory,
first_two,
second_two,
"#{filename}.#{type}"
])
:ok = write_p!(filepath, "")
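
A worked example of the new two-level "namespacing", assuming a `:tmpfile_directory` of `/tmp/pinchflat` and the illustrative filename below (both values are assumptions, not taken from this diff):

filename = "4f9a" <> String.duplicate("0", 60)  # stands in for StringUtils.random_string(64)

Path.join([
  "/tmp/pinchflat",
  String.slice(filename, 0..1),  # "4f"
  String.slice(filename, 2..3),  # "9a"
  "#{filename}.json"
])
# => "/tmp/pinchflat/4f/9a/4f9a000...000.json"

Presumably `write_p!/2` creates the intermediate directories, so callers never have to create the two-level prefix themselves.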


@@ -39,7 +39,13 @@ defmodule Pinchflat.YtDlp.CommandRunner do
formatted_command_opts = [url] ++ CliUtils.parse_options(all_opts)
case CliUtils.wrap_cmd(command, formatted_command_opts, stderr_to_stdout: true) do
{_, 0} ->
# yt-dlp exit codes:
# 0 = Everything is successful
# 100 = yt-dlp must restart for update to complete
# 101 = Download cancelled by --max-downloads etc
# 2 = Error in user-provided options
# 1 = Any other error
{_, status} when status in [0, 101] ->
# IDEA: consider deleting the file after reading it. It's in the tmp dir, so it's not
# a huge deal, but it's still a good idea to clean up after ourselves.
# (even on error? especially on error?)
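
For context, exit code 101 is what the new archive flow is expected to produce: once yt-dlp hits an ID from the download archive, `--break-on-existing` aborts the run early and that abort is reported as 101 rather than 0, so it has to count as success here. A rough sketch of how the new options become flags, assuming `CliUtils.parse_options/1` follows the usual atom/keyword-to-flag convention (the exact output shape is an assumption):

# Assumed translation -- parse_options/1 is called in the hunk above, but this output shape is illustrative.
CliUtils.parse_options([:break_on_existing, download_archive: "/tmp/archive.txt"])
# => ["--break-on-existing", "--download-archive", "/tmp/archive.txt"]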


@@ -23,7 +23,6 @@ defmodule Pinchflat.YtDlp.MediaCollection do
Returns {:ok, [map()]} | {:error, any, ...}.
"""
def get_media_attributes_for_collection(url, command_opts \\ [], addl_opts \\ []) do
runner = Application.get_env(:pinchflat, :yt_dlp_runner)
# `ignore_no_formats_error` is necessary because yt-dlp will error out if
# the first video has not been released yet (ie: is a premiere). We don't care about
# available formats since we're just getting the media details
@@ -39,7 +38,7 @@ defmodule Pinchflat.YtDlp.MediaCollection do
file_listener_handler.(output_filepath)
end
case runner.run(url, action, all_command_opts, output_template, runner_opts) do
case backend_runner().run(url, action, all_command_opts, output_template, runner_opts) do
{:ok, output} ->
parsed_lines =
output
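
The removed local `runner` binding is replaced by a `backend_runner()` call; a minimal sketch of what that private helper presumably looks like (it is assumed to be defined elsewhere in this module and is not shown in the diff):

# Assumed shape -- same Application.get_env lookup as before, just behind a shared helper
# so every yt-dlp call site resolves the configured runner (e.g. YtDlpRunnerMock in tests)
# the same way.
defp backend_runner do
  Application.get_env(:pinchflat, :yt_dlp_runner)
end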


@@ -54,7 +54,7 @@
<.link
href={~p"/sources/#{@source}/force_index"}
method="post"
data-confirm="Are you sure you want to force an index of this source? This isn't normally needed."
data-confirm="Are you sure you want to index all content from this source? This isn't normally needed."
>
Force Index
</.link>


@@ -57,25 +57,51 @@ defmodule Pinchflat.SlowIndexing.MediaCollectionIndexingWorkerTest do
:ok
end
test "it indexes the source if it should be indexed" do
test "indexes the source if it should be indexed" do
expect(YtDlpRunnerMock, :run, fn _url, :get_media_attributes_for_collection, _opts, _ot, _addl_opts ->
{:ok, ""}
end)
source = source_fixture(index_frequency_minutes: 10)
perform_job(MediaCollectionIndexingWorker, %{id: source.id})
end
test "it indexes the source no matter what if the source has never been indexed before" do
test "indexes the source no matter what if the source has never been indexed before" do
expect(YtDlpRunnerMock, :run, fn _url, :get_media_attributes_for_collection, _opts, _ot, _addl_opts ->
{:ok, ""}
end)
source = source_fixture(index_frequency_minutes: 0, last_indexed_at: nil)
perform_job(MediaCollectionIndexingWorker, %{id: source.id})
end
test "it indexes the source no matter what if the 'force' arg is passed" do
test "indexes the source no matter what if the 'force' arg is passed" do
expect(YtDlpRunnerMock, :run, fn _url, :get_media_attributes_for_collection, _opts, _ot, _addl_opts ->
{:ok, ""}
end)
source = source_fixture(index_frequency_minutes: 0, last_indexed_at: DateTime.utc_now())
perform_job(MediaCollectionIndexingWorker, %{id: source.id, force: true})
end
test "it does not do any indexing if the source has been indexed and shouldn't be rescheduled" do
test "doesn't use a download archive if the index has been forced" do
expect(YtDlpRunnerMock, :run, fn _url, :get_media_attributes_for_collection, opts, _ot, _addl_opts ->
refute :break_on_existing in opts
refute Keyword.has_key?(opts, :download_archive)
{:ok, ""}
end)
source =
source_fixture(collection_type: :channel, index_frequency_minutes: 0, last_indexed_at: DateTime.utc_now())
perform_job(MediaCollectionIndexingWorker, %{id: source.id, force: true})
end
test "does not do any indexing if the source has been indexed and shouldn't be rescheduled" do
expect(YtDlpRunnerMock, :run, 0, fn _url, :get_media_attributes_for_collection, _opts, _ot, _addl_opts ->
{:ok, ""}
end)
@@ -85,7 +111,7 @@ defmodule Pinchflat.SlowIndexing.MediaCollectionIndexingWorkerTest do
perform_job(MediaCollectionIndexingWorker, %{id: source.id})
end
test "it does not reschedule if the source shouldn't be indexed" do
test "does not reschedule if the source shouldn't be indexed" do
stub(YtDlpRunnerMock, :run, fn _url, :get_media_attributes_for_collection, _opts, _ot, _addl_opts -> {:ok, ""} end)
source = source_fixture(index_frequency_minutes: -1)
@@ -94,7 +120,7 @@ defmodule Pinchflat.SlowIndexing.MediaCollectionIndexingWorkerTest do
refute_enqueued(worker: MediaCollectionIndexingWorker, args: %{"id" => source.id})
end
test "it kicks off a download job for each pending media item" do
test "kicks off a download job for each pending media item" do
expect(YtDlpRunnerMock, :run, fn _url, :get_media_attributes_for_collection, _opts, _ot, _addl_opts ->
{:ok, source_attributes_return_fixture()}
end)
@@ -105,7 +131,7 @@ defmodule Pinchflat.SlowIndexing.MediaCollectionIndexingWorkerTest do
assert length(all_enqueued(worker: MediaDownloadWorker)) == 3
end
test "it starts a job for any pending media item even if it's from another run" do
test "starts a job for any pending media item even if it's from another run" do
expect(YtDlpRunnerMock, :run, fn _url, :get_media_attributes_for_collection, _opts, _ot, _addl_opts ->
{:ok, source_attributes_return_fixture()}
end)
@@ -117,7 +143,7 @@ defmodule Pinchflat.SlowIndexing.MediaCollectionIndexingWorkerTest do
assert length(all_enqueued(worker: MediaDownloadWorker)) == 4
end
test "it does not kick off a job for media items that could not be saved" do
test "does not kick off a job for media items that could not be saved" do
expect(YtDlpRunnerMock, :run, fn _url, :get_media_attributes_for_collection, _opts, _ot, _addl_opts ->
{:ok, source_attributes_return_fixture()}
end)
@@ -130,7 +156,7 @@ defmodule Pinchflat.SlowIndexing.MediaCollectionIndexingWorkerTest do
assert length(all_enqueued(worker: MediaDownloadWorker))
end
test "it reschedules the job based on the index frequency" do
test "reschedules the job based on the index frequency" do
source = source_fixture(index_frequency_minutes: 10)
perform_job(MediaCollectionIndexingWorker, %{id: source.id})
@@ -141,7 +167,7 @@ defmodule Pinchflat.SlowIndexing.MediaCollectionIndexingWorkerTest do
)
end
test "it creates a task for the rescheduled job" do
test "creates a task for the rescheduled job" do
source = source_fixture(index_frequency_minutes: 10)
task_count_fetcher = fn ->
@@ -153,7 +179,7 @@ defmodule Pinchflat.SlowIndexing.MediaCollectionIndexingWorkerTest do
end)
end
test "it creates a future task for fast indexing if appropriate" do
test "creates a future task for fast indexing if appropriate" do
source = source_fixture(index_frequency_minutes: 10, fast_index: true)
perform_job(MediaCollectionIndexingWorker, %{id: source.id})
@@ -164,7 +190,7 @@ defmodule Pinchflat.SlowIndexing.MediaCollectionIndexingWorkerTest do
)
end
test "it deletes existing fast indexing tasks if a new one is created" do
test "deletes existing fast indexing tasks if a new one is created" do
source = source_fixture(index_frequency_minutes: 10, fast_index: true)
{:ok, job} = Oban.insert(FastIndexingWorker.new(%{"id" => source.id}))
task = task_fixture(source_id: source.id, job_id: job.id)
@@ -174,14 +200,14 @@ defmodule Pinchflat.SlowIndexing.MediaCollectionIndexingWorkerTest do
assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end
end
test "it does not create a task for fast indexing otherwise" do
test "does not create a task for fast indexing otherwise" do
source = source_fixture(index_frequency_minutes: 10, fast_index: false)
perform_job(MediaCollectionIndexingWorker, %{id: source.id})
refute_enqueued(worker: FastIndexingWorker)
end
test "it creates the basic media_item records" do
test "creates the basic media_item records" do
expect(YtDlpRunnerMock, :run, fn _url, :get_media_attributes_for_collection, _opts, _ot, _addl_opts ->
{:ok, source_attributes_return_fixture()}
end)


@@ -14,6 +14,10 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpersTest do
alias Pinchflat.SlowIndexing.SlowIndexingHelpers
alias Pinchflat.SlowIndexing.MediaCollectionIndexingWorker
setup do
{:ok, %{source: source_fixture()}}
end
describe "kickoff_indexing_task/3" do
test "schedules a job" do
source = source_fixture(index_frequency_minutes: 1)
@@ -53,6 +57,16 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpersTest do
assert_in_delta DateTime.diff(job.scheduled_at, DateTime.utc_now(), :second), 0, 1
end
test "schedules a job immediately if the user is forcing an index" do
source = source_fixture(index_frequency_minutes: 30, last_indexed_at: now_minus(5, :minutes))
assert {:ok, _} = SlowIndexingHelpers.kickoff_indexing_task(source, %{force: true})
[job] = all_enqueued(worker: MediaCollectionIndexingWorker, args: %{"id" => source.id})
assert_in_delta DateTime.diff(job.scheduled_at, DateTime.utc_now(), :second), 0, 1
end
test "creates and attaches a task" do
source = source_fixture(index_frequency_minutes: 1)
@@ -123,12 +137,6 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpersTest do
end
describe "delete_indexing_tasks/2" do
setup do
source = source_fixture()
{:ok, %{source: source}}
end
test "deletes slow indexing tasks for the source", %{source: source} do
{:ok, job} = Oban.insert(MediaCollectionIndexingWorker.new(%{"id" => source.id}))
_task = task_fixture(source_id: source.id, job_id: job.id)
@@ -172,13 +180,13 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpersTest do
end
end
describe "index_and_enqueue_download_for_media_items/1" do
describe "index_and_enqueue_download_for_media_items/2" do
setup do
stub(YtDlpRunnerMock, :run, fn _url, :get_media_attributes_for_collection, _opts, _ot, _addl_opts ->
{:ok, source_attributes_return_fixture()}
end)
{:ok, [source: source_fixture()]}
:ok
end
test "creates a media_item record for each media ID returned", %{source: source} do
@@ -315,11 +323,7 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpersTest do
end
end
describe "index_and_enqueue_download_for_media_items/1 when testing file watcher" do
setup do
{:ok, [source: source_fixture()]}
end
describe "index_and_enqueue_download_for_media_items/2 when testing file watcher" do
test "creates a new media item for everything already in the file", %{source: source} do
watcher_poll_interval = Application.get_env(:pinchflat, :file_watcher_poll_interval)
@@ -446,4 +450,62 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpersTest do
assert [] = SlowIndexingHelpers.index_and_enqueue_download_for_media_items(source)
end
end
describe "index_and_enqueue_download_for_media_items when testing the download archive" do
test "a download archive is used if the source is a channel", %{source: source} do
expect(YtDlpRunnerMock, :run, fn _url, :get_media_attributes_for_collection, opts, _ot, _addl_opts ->
assert :break_on_existing in opts
assert Keyword.has_key?(opts, :download_archive)
{:ok, source_attributes_return_fixture()}
end)
SlowIndexingHelpers.index_and_enqueue_download_for_media_items(source)
end
test "a download archive is not used if the source is not a channel" do
source = source_fixture(%{collection_type: :playlist})
expect(YtDlpRunnerMock, :run, fn _url, :get_media_attributes_for_collection, opts, _ot, _addl_opts ->
refute :break_on_existing in opts
refute Keyword.has_key?(opts, :download_archive)
{:ok, source_attributes_return_fixture()}
end)
SlowIndexingHelpers.index_and_enqueue_download_for_media_items(source)
end
test "a download archive is not used if the index has been forced to run" do
source = source_fixture(%{collection_type: :channel})
expect(YtDlpRunnerMock, :run, fn _url, :get_media_attributes_for_collection, opts, _ot, _addl_opts ->
refute :break_on_existing in opts
refute Keyword.has_key?(opts, :download_archive)
{:ok, source_attributes_return_fixture()}
end)
SlowIndexingHelpers.index_and_enqueue_download_for_media_items(source, was_forced: true)
end
test "the download archive is formatted correctly and contains the right video", %{source: source} do
media_items =
1..21
|> Enum.map(fn n ->
media_item_fixture(%{source_id: source.id, uploaded_at: now_minus(n, :days)})
end)
expect(YtDlpRunnerMock, :run, fn _url, :get_media_attributes_for_collection, opts, _ot, _addl_opts ->
archive_file = Keyword.get(opts, :download_archive)
last_media_item = List.last(media_items)
assert File.read!(archive_file) == "youtube #{last_media_item.media_id}"
{:ok, source_attributes_return_fixture()}
end)
SlowIndexingHelpers.index_and_enqueue_download_for_media_items(source)
end
end
end


@@ -17,6 +17,12 @@ defmodule Pinchflat.YtDlp.CommandRunnerTest do
assert {:ok, _output} = Runner.run(@media_url, :foo, [], "")
end
test "considers a 101 exit code as being successful" do
wrap_executable("/app/test/support/scripts/yt-dlp-mocks/101_exit_code.sh", fn ->
assert {:ok, _output} = Runner.run(@media_url, :foo, [], "")
end)
end
test "includes the media url as the first argument" do
assert {:ok, output} = Runner.run(@media_url, :foo, [:ignore_errors], "")


@@ -0,0 +1,3 @@
#!/bin/bash
exit 101