Streaming media item creation during indexing (#49)

* Implemented streaming during indexing

* Updated file watcher to enqueue download; refactored download worker methods

* Updated File Follower Server timeout
Kieran 2024-03-04 17:14:02 -08:00 committed by GitHub
parent b370c97bfb
commit f55cdc80dd
22 changed files with 613 additions and 98 deletions

View file

@ -15,6 +15,8 @@ alias Pinchflat.Sources
alias Pinchflat.MediaClient.{SourceDetails, VideoDownloader}
alias Pinchflat.Metadata.{Zipper, ThumbnailFetcher}
alias Pinchflat.Utils.FilesystemUtils.FileFollowerServer
defmodule IexHelpers do
def playlist_url do
"https://www.youtube.com/playlist?list=PLmqC3wPkeL8kSlTCcSMDD63gmSi7evcXS"

View file

@ -20,7 +20,8 @@ config :pinchflat,
# Setting AUTH_USERNAME and AUTH_PASSWORD implies you want to use basic auth.
# If either is unset, basic auth will not be used.
basic_auth_username: System.get_env("AUTH_USERNAME"),
basic_auth_password: System.get_env("AUTH_PASSWORD")
basic_auth_password: System.get_env("AUTH_PASSWORD"),
file_watcher_poll_interval: 1000
# Configures the endpoint
config :pinchflat, PinchflatWeb.Endpoint,

View file

@ -5,7 +5,8 @@ config :pinchflat,
yt_dlp_executable: Path.join([File.cwd!(), "/test/support/scripts/yt-dlp-mocks/repeater.sh"]),
media_directory: Path.join([System.tmp_dir!(), "test", "videos"]),
metadata_directory: Path.join([System.tmp_dir!(), "test", "metadata"]),
tmpfile_directory: Path.join([System.tmp_dir!(), "test", "tmpfiles"])
tmpfile_directory: Path.join([System.tmp_dir!(), "test", "tmpfiles"]),
file_watcher_poll_interval: 50
config :pinchflat, Oban, testing: :manual

View file

@ -66,6 +66,24 @@ defmodule Pinchflat.Media do
|> Repo.all()
end
@doc """
For a given media_item, tells you if it is pending download. This is defined as
the media_item having a `media_filepath` of `nil` and matching the format selection
rules of the parent media_profile.
Intentionally does not take the `download_media` setting of the source into account.
Returns boolean()
"""
def pending_download?(%MediaItem{} = media_item) do
media_profile = Repo.preload(media_item, source: :media_profile).source.media_profile
MediaItem
|> where([mi], mi.id == ^media_item.id and is_nil(mi.media_filepath))
|> where(^build_format_clauses(media_profile))
|> Repo.exists?()
end
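A minimal usage sketch of the new helper (media_item is assumed to be a persisted %MediaItem{} whose source and media_profile exist in the database); this is the check SourceTasks uses later in this commit to gate per-item download enqueueing:

if Media.pending_download?(media_item) do
  # safe to enqueue a download task for this item
end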
@doc """
Returns a list of media_items that match the search term. Adds a `matching_search_term`
virtual field to the result set.

View file

@ -4,4 +4,5 @@ defmodule Pinchflat.MediaClient.Backends.BackendCommandRunner do
"""
@callback run(binary(), keyword(), binary()) :: {:ok, binary()} | {:error, binary(), integer()}
@callback run(binary(), keyword(), binary(), keyword()) :: {:ok, binary()} | {:error, binary(), integer()}
end

View file

@ -6,6 +6,7 @@ defmodule Pinchflat.MediaClient.Backends.YtDlp.CommandRunner do
require Logger
alias Pinchflat.Utils.StringUtils
alias Pinchflat.Utils.FilesystemUtils, as: FSUtils
alias Pinchflat.MediaClient.Backends.BackendCommandRunner
@behaviour BackendCommandRunner
@ -15,19 +16,20 @@ defmodule Pinchflat.MediaClient.Backends.YtDlp.CommandRunner do
a file and then returns its contents because yt-dlp will return warnings
to stdout even if the command is successful, but these will break JSON parsing.
Returns {:ok, binary()} | {:error, output, status}.
Additional Opts:
- :output_filepath - the path to save the output to. If not provided, a temporary
file will be created and used. Useful if you need a reference to the file
for a file watcher.
IDEA: Indexing takes a long time, but the output is actually streamed to stdout.
Maybe we could listen to that stream instead so we can index videos as they're discovered.
See: https://stackoverflow.com/a/49061086/5665799
Returns {:ok, binary()} | {:error, output, status}.
"""
@impl BackendCommandRunner
def run(url, command_opts, output_template) do
def run(url, command_opts, output_template, addl_opts \\ []) do
command = backend_executable()
# These must stay in exactly this order, hence why I'm giving it its own variable.
# Also, can't use RAM file since yt-dlp needs a concrete filepath.
json_output_path = generate_json_output_path()
print_to_file_opts = [{:print_to_file, output_template}, json_output_path]
output_filepath = Keyword.get(addl_opts, :output_filepath, FSUtils.generate_metadata_tmpfile(:json))
print_to_file_opts = [{:print_to_file, output_template}, output_filepath]
formatted_command_opts = [url] ++ parse_options(command_opts ++ print_to_file_opts)
Logger.info("[yt-dlp] called with: #{Enum.join(formatted_command_opts, " ")}")
@ -36,24 +38,13 @@ defmodule Pinchflat.MediaClient.Backends.YtDlp.CommandRunner do
{_, 0} ->
# IDEA: consider deleting the file after reading it
# (even on error? especially on error?)
File.read(json_output_path)
File.read(output_filepath)
{output, status} ->
{:error, output, status}
end
end
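An illustrative call using the new :output_filepath option (the options and output path below are assumptions for the sketch, not values from this changeset; the URL is the playlist from the IEx helpers above):

alias Pinchflat.MediaClient.Backends.YtDlp.CommandRunner

{:ok, output} =
  CommandRunner.run(
    "https://www.youtube.com/playlist?list=PLmqC3wPkeL8kSlTCcSMDD63gmSi7evcXS",
    [:simulate, :skip_download],
    "%(.{id,title,was_live,original_url,description})j",
    output_filepath: "/tmp/indexing-output.json"
  )

Because the caller chose the path, it can hand "/tmp/indexing-output.json" to a file watcher before run/4 returns.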
defp generate_json_output_path do
tmpfile_directory = Application.get_env(:pinchflat, :tmpfile_directory)
filepath = Path.join([tmpfile_directory, "#{StringUtils.random_string(64)}.json"])
# Ensure the file can be created and written to BEFORE we run the `yt-dlp` command
:ok = File.mkdir_p!(Path.dirname(filepath))
:ok = File.write(filepath, "")
filepath
end
# We want to satisfy the following behaviours:
#
# 1. If the key is an atom, convert it to a string and convert it to kebab case (for convenience)

View file

@ -4,19 +4,33 @@ defmodule Pinchflat.MediaClient.Backends.YtDlp.VideoCollection do
videos (aka: a source [ie: channels, playlists]).
"""
require Logger
alias Pinchflat.Utils.FunctionUtils
alias Pinchflat.Utils.FilesystemUtils
@doc """
Returns a list of maps representing the videos in the collection.
Options:
- :file_listener_handler - a function that will be called with the path to the
file that yt-dlp will write to. This is useful for
setting up a file watcher to process the file as it gets written to.
Returns {:ok, [map()]} | {:error, any, ...}.
"""
def get_media_attributes(url, command_opts \\ []) do
def get_media_attributes(url, addl_opts \\ []) do
runner = Application.get_env(:pinchflat, :yt_dlp_runner)
opts = command_opts ++ [:simulate, :skip_download]
command_opts = [:simulate, :skip_download]
output_template = "%(.{id,title,was_live,original_url,description})j"
output_filepath = FilesystemUtils.generate_metadata_tmpfile(:json)
file_listener_handler = Keyword.get(addl_opts, :file_listener_handler, false)
case runner.run(url, opts, output_template) do
if file_listener_handler do
file_listener_handler.(output_filepath)
end
case runner.run(url, command_opts, output_template, output_filepath: output_filepath) do
{:ok, output} ->
output
|> String.split("\n", trim: true)

View file

@ -22,16 +22,21 @@ defmodule Pinchflat.MediaClient.SourceDetails do
Returns a list of basic video data maps for the given source URL OR
source record using the given backend.
Options:
- :file_listener_handler - a function that will be called with the path to the
file that will be written to by yt-dlp. This is useful for
setting up a file watcher to read the file as it gets written to.
Returns {:ok, [map()]} | {:error, any, ...}.
"""
def get_media_attributes(sourceable, backend \\ :yt_dlp)
def get_media_attributes(sourceable, opts \\ [], backend \\ :yt_dlp)
def get_media_attributes(%Source{} = source, backend) do
source_module(backend).get_media_attributes(source.collection_id)
def get_media_attributes(%Source{} = source, opts, backend) do
get_media_attributes(source.collection_id, opts, backend)
end
def get_media_attributes(source_url, backend) when is_binary(source_url) do
source_module(backend).get_media_attributes(source_url)
def get_media_attributes(source_url, opts, backend) when is_binary(source_url) do
source_module(backend).get_media_attributes(source_url, opts)
end
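A hedged sketch of the updated public API with a file listener; source is assumed to be a %Source{} and the handler body is illustrative:

handler = fn filepath ->
  # e.g. point a FileFollowerServer at filepath so JSON lines can be
  # processed while yt-dlp is still writing them
  IO.puts("yt-dlp is streaming to #{filepath}")
end

{:ok, media_attrs} = SourceDetails.get_media_attributes(source, file_listener_handler: handler)

With the default backend this delegates to the yt-dlp VideoCollection module, which invokes the handler with the tmpfile path before running yt-dlp.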
defp source_module(backend) do

View file

@ -2,7 +2,6 @@ defmodule Pinchflat.Tasks do
@moduledoc """
The Tasks context.
"""
import Ecto.Query, warn: false
alias Pinchflat.Repo

View file

@ -1,7 +1,9 @@
defmodule Pinchflat.Tasks.MediaItemTasks do
@moduledoc """
This module contains methods used by or used to control tasks (aka workers)
related to media items.
Contains methods used by OR used to create/manage tasks for media items.
Tasks/workers are meant to be thin wrappers so most of the actual work they
do is also defined here. Essentially, a one-stop-shop for media-related tasks/workers.
"""
alias Pinchflat.Media

View file

@ -1,16 +1,22 @@
defmodule Pinchflat.Tasks.SourceTasks do
@moduledoc """
This module contains methods used by or used to control tasks (aka workers)
related to sources.
Contains methods used by OR used to create/manage tasks for sources.
Tasks/workers are meant to be thin wrappers so most of the actual work they
do is also defined here. Essentially, a one-stop-shop for source-related tasks/workers.
"""
require Logger
alias Pinchflat.Media
alias Pinchflat.Tasks
alias Pinchflat.Sources
alias Pinchflat.Sources.Source
alias Pinchflat.Media.MediaItem
alias Pinchflat.MediaClient.SourceDetails
alias Pinchflat.Workers.MediaIndexingWorker
alias Pinchflat.Workers.VideoDownloadWorker
alias Pinchflat.Utils.FilesystemUtils.FileFollowerServer
@doc """
Starts tasks for indexing a source's media regardless of the source's indexing
@ -35,30 +41,37 @@ defmodule Pinchflat.Tasks.SourceTasks do
@doc """
Given a media source, creates (indexes) the media by creating media_items for each
media ID in the source.
media ID in the source. Afterward, kicks off a download task for each pending media
item belonging to the source. You can't tell me the method name isn't descriptive!
Indexing is slow and usually returns a list of all media data at once for record creation.
To help with this, we use a file follower to watch the file that yt-dlp writes to
so we can create media items as they come in. This parallelizes the process and adds
clarity to the user experience. There are a few things to be aware of, which are documented
below in the file watcher setup method.
NOTE: downloads are only enqueued if the source is set to download media. Downloads are
also enqueued for ALL pending media items, not just the ones that were indexed in this
job run. This should ensure that any stragglers are caught if, for some reason, they
weren't enqueued or somehow got de-queued.
Since indexing returns all media data EVERY TIME, we rely on the unique index of the
media_id to prevent duplicates. Due to both the file follower and the fact that future
indexing will index a lot of existing data, this method will MOSTLY return error
changesets (from the unique index violation) and not media items. This is intended.
Returns [%MediaItem{}, ...] | [%Ecto.Changeset{}, ...]
"""
def index_media_items(%Source{} = source) do
{:ok, media_attributes} = SourceDetails.get_media_attributes(source.original_url)
def index_and_enqueue_download_for_media_items(%Source{} = source) do
# See the method definition below for more info on how file watchers work
# (important reading if you're not familiar with it)
{:ok, media_attributes} = get_media_attributes_and_setup_file_watcher(source)
result = Enum.map(media_attributes, fn media_attrs -> create_media_item_from_attributes(source, media_attrs) end)
Sources.update_source(source, %{last_indexed_at: DateTime.utc_now()})
enqueue_pending_media_tasks(source)
media_attributes
|> Enum.map(fn media_attrs ->
attrs = %{
source_id: source.id,
title: media_attrs["title"],
media_id: media_attrs["id"],
original_url: media_attrs["original_url"],
livestream: media_attrs["was_live"],
description: media_attrs["description"]
}
case Media.create_media_item(attrs) do
{:ok, media_item} -> media_item
{:error, changeset} -> changeset
end
end)
result
end
@doc """
@ -70,8 +83,6 @@ defmodule Pinchflat.Tasks.SourceTasks do
that any stragglers are caught if, for some reason, they weren't enqueued
or somehow got de-queued.
I'm not sure of a case where this would happen, but it's cheap insurance.
Returns :ok
"""
def enqueue_pending_media_tasks(%Source{download_media: true} = source) do
@ -99,4 +110,79 @@ defmodule Pinchflat.Tasks.SourceTasks do
|> Media.list_pending_media_items_for()
|> Enum.each(&Tasks.delete_pending_tasks_for/1)
end
# The file follower is a GenServer that watches a file for new lines and
# processes them. This works well, but we have to be resilient to partially-written
# lines (ie: you should gracefully fail if you can't parse a line).
#
# This works in tandem with the normal (blocking) media indexing behaviour. When
# the `get_media_attributes` method completes it'll return the FULL result to
# the caller for parsing. Ideally, every item in the list will have already
# been processed by the file follower, but if not, the caller handles creation
# of any media items that were missed/initially failed.
#
# It attempts a graceful shutdown of the file follower after the indexing is done,
# but the FileFollowerServer will also stop itself if it doesn't see any activity
# for a sufficiently long time.
defp get_media_attributes_and_setup_file_watcher(source) do
{:ok, pid} = FileFollowerServer.start_link()
handler = fn filepath -> setup_file_follower_watcher(pid, filepath, source) end
result = SourceDetails.get_media_attributes(source.original_url, file_listener_handler: handler)
FileFollowerServer.stop(pid)
result
end
defp setup_file_follower_watcher(pid, filepath, source) do
FileFollowerServer.watch_file(pid, filepath, fn line ->
case Phoenix.json_library().decode(line) do
{:ok, media_attrs} ->
Logger.debug("FileFollowerServer Handler: Got media attributes: #{inspect(media_attrs)}")
create_media_item_and_enqueue_download(source, media_attrs)
err ->
Logger.debug("FileFollowerServer Handler: Error decoding JSON: #{inspect(err)}")
err
end
end)
end
defp create_media_item_and_enqueue_download(source, media_attrs) do
maybe_media_item = create_media_item_from_attributes(source, media_attrs)
case maybe_media_item do
%MediaItem{} = media_item ->
if source.download_media && Media.pending_download?(media_item) do
Logger.debug("FileFollowerServer Handler: Enqueuing download task for #{inspect(media_attrs)}")
media_item
|> Map.take([:id])
|> VideoDownloadWorker.new()
|> Tasks.create_job_with_task(media_item)
end
changeset ->
changeset
end
end
defp create_media_item_from_attributes(source, media_attrs) do
attrs = %{
source_id: source.id,
title: media_attrs["title"],
media_id: media_attrs["id"],
original_url: media_attrs["original_url"],
livestream: media_attrs["was_live"],
description: media_attrs["description"]
}
case Media.create_media_item(attrs) do
{:ok, media_item} -> media_item
{:error, changeset} -> changeset
end
end
end
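To make the resilience note above concrete, here is a sketch of what the line handler sees; the payloads are illustrative and the exact error term depends on the configured JSON library:

# A complete line decodes and becomes a media item (or a duplicate-index changeset):
Phoenix.json_library().decode(~s({"id": "video1", "title": "First Video"}))
#=> {:ok, %{"id" => "video1", "title" => "First Video"}}

# A partially-written line fails to decode; the handler logs it and moves on, and
# the blocking indexing pass that follows creates the missed item instead:
Phoenix.json_library().decode(~s({"id": "video2", "ti))
#=> {:error, _decode_error}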

View file

@ -0,0 +1,23 @@
defmodule Pinchflat.Utils.FilesystemUtils do
@moduledoc """
Utility methods for working with the filesystem
"""
alias Pinchflat.Utils.StringUtils
@doc """
Generates a temporary file and returns its path. The file is empty and has the given type.
Generates all the directories in the path if they don't exist.
Returns binary()
"""
def generate_metadata_tmpfile(type) do
tmpfile_directory = Application.get_env(:pinchflat, :tmpfile_directory)
filepath = Path.join([tmpfile_directory, "#{StringUtils.random_string(64)}.#{type}"])
:ok = File.mkdir_p!(Path.dirname(filepath))
:ok = File.write(filepath, "")
filepath
end
end
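A quick usage sketch; the exact filename is random, so the value shown is illustrative:

path = FilesystemUtils.generate_metadata_tmpfile(:json)
#=> e.g. "/tmp/test/tmpfiles/4f9a...c2.json" (64-char random name under :tmpfile_directory)
File.exists?(path) #=> true
File.read!(path)   #=> ""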

View file

@ -0,0 +1,121 @@
defmodule Pinchflat.Utils.FilesystemUtils.FileFollowerServer do
@moduledoc """
A GenServer that watches a file for new lines and processes them as they come in.
This is useful for tailing log files and other similar tasks. If there's no activity
for a certain amount of time, the server will stop itself.
"""
use GenServer
require Logger
@poll_interval_ms Application.compile_env(:pinchflat, :file_watcher_poll_interval)
@activity_timeout_ms 60_000
# Client API
@doc """
Starts the file follower server
Returns {:ok, pid} or {:error, reason}
"""
def start_link() do
GenServer.start_link(__MODULE__, [])
end
@doc """
Starts the file watcher for a given filepath and handler function.
Returns :ok
"""
def watch_file(process, filepath, handler) do
GenServer.cast(process, {:watch_file, filepath, handler})
end
@doc """
Stops the file watcher and closes the file.
Returns :ok
"""
def stop(process) do
GenServer.cast(process, :stop)
end
# Server Callbacks
@impl true
def init(_opts) do
# Start with a blank state because, based on the common calling
# pattern for this module, we'll need a reference to the server's
# PID before we start watching any files so we can later stop the
# server gracefully.
{:ok, %{}}
end
@impl true
def handle_cast({:watch_file, filepath, handler}, _old_state) do
{:ok, io_device} = :file.open(filepath, [:raw, :read_ahead, :binary])
state = %{
io_device: io_device,
last_activity: DateTime.utc_now(),
handler: handler
}
Process.send(self(), :read_new_lines, [])
{:noreply, state}
end
@impl true
def handle_cast(:stop, state) do
Logger.debug("Gracefully stopping file follower")
:file.close(state.io_device)
{:stop, :normal, state}
end
@impl true
def handle_info(:read_new_lines, state) do
last_activity = state.last_activity
# If no new lines have been written for a certain amount of time, stop the server
if DateTime.diff(DateTime.utc_now(), last_activity, :millisecond) > @activity_timeout_ms do
Logger.debug("No activity for #{@activity_timeout_ms}ms. Requesting stop.")
stop(self())
{:noreply, state}
else
attempt_process_new_lines(state)
end
end
defp attempt_process_new_lines(state) do
io_device = state.io_device
# This reads one line at a time. If a line is found, it
# will be passed to the handler, we'll note the time of
# the last activity, and then we'll immediately call this
# again to read the next line.
#
# If there are no lines, it waits for the poll interval
# before trying again.
case :file.read_line(io_device) do
{:ok, line} ->
state.handler.(line)
Process.send(self(), :read_new_lines, [])
{:noreply, %{state | last_activity: DateTime.utc_now()}}
:eof ->
Logger.debug("EOF reached, waiting before trying to read new lines")
Process.send_after(self(), :read_new_lines, @poll_interval_ms)
{:noreply, state}
{:error, reason} ->
Logger.error("Error reading file: #{reason}")
stop(self())
{:noreply, state}
end
end
end
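The expected lifecycle, mirroring how SourceTasks uses it above (the filepath and handler are illustrative):

{:ok, pid} = FileFollowerServer.start_link()

:ok =
  FileFollowerServer.watch_file(pid, "/tmp/indexing-output.json", fn line ->
    # invoked once per line read from the file; handlers should tolerate
    # partially-written lines
    IO.inspect(line, label: "new line")
  end)

# ...after the blocking work completes...
FileFollowerServer.stop(pid)

If stop/1 is never called, the 60-second activity timeout above stops the server on its own.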

View file

@ -43,13 +43,14 @@ defmodule Pinchflat.Workers.MediaIndexingWorker do
case {source.index_frequency_minutes, source.last_indexed_at} do
{index_freq, _} when index_freq > 0 ->
# If the indexing is on a schedule simply run indexing and reschedule
index_media(source)
SourceTasks.index_and_enqueue_download_for_media_items(source)
reschedule_indexing(source)
{_, nil} ->
# If the source has never been indexed, index it once
# even if it's not meant to reschedule
index_media(source)
SourceTasks.index_and_enqueue_download_for_media_items(source)
:ok
_ ->
# If the source HAS been indexed and is not meant to reschedule,
@ -58,12 +59,6 @@ defmodule Pinchflat.Workers.MediaIndexingWorker do
end
end
defp index_media(source) do
SourceTasks.index_media_items(source)
# This method handles the case where a source is set to not download media
SourceTasks.enqueue_pending_media_tasks(source)
end
defp reschedule_indexing(source) do
source
|> Map.take([:id])

View file

@ -10,7 +10,7 @@ defmodule Pinchflat.MediaClient.Backends.YtDlp.CommandRunnerTest do
on_exit(&reset_executable/0)
end
describe "run/2" do
describe "run/4" do
test "it returns the output and status when the command succeeds" do
assert {:ok, _output} = Runner.run(@video_url, [], "")
end
@ -57,6 +57,12 @@ defmodule Pinchflat.MediaClient.Backends.YtDlp.CommandRunnerTest do
assert {:error, "", 1} = Runner.run(@video_url, [], "")
end)
end
test "optionally lets you specify an output_filepath" do
assert {:ok, output} = Runner.run(@video_url, [], "%(id)s", output_filepath: "/tmp/yt-dlp-output.json")
assert String.contains?(output, "--print-to-file %(id)s /tmp/yt-dlp-output.json")
end
end
defp wrap_executable(new_executable, fun) do

View file

@ -11,14 +11,16 @@ defmodule Pinchflat.MediaClient.Backends.YtDlp.VideoCollectionTest do
describe "get_media_attributes/2" do
test "returns a list of video attributes with no blank elements" do
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, source_attributes_return_fixture() <> "\n\n"} end)
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts ->
{:ok, source_attributes_return_fixture() <> "\n\n"}
end)
assert {:ok, [%{"id" => "video1"}, %{"id" => "video2"}, %{"id" => "video3"}]} =
VideoCollection.get_media_attributes(@channel_url)
end
test "it passes the expected default args" do
expect(YtDlpRunnerMock, :run, fn _url, opts, ot ->
expect(YtDlpRunnerMock, :run, fn _url, opts, ot, _addl_opts ->
assert opts == [:simulate, :skip_download]
assert ot == "%(.{id,title,was_live,original_url,description})j"
@ -28,20 +30,35 @@ defmodule Pinchflat.MediaClient.Backends.YtDlp.VideoCollectionTest do
assert {:ok, _} = VideoCollection.get_media_attributes(@channel_url)
end
test "it passes the expected custom args" do
expect(YtDlpRunnerMock, :run, fn _url, opts, _ot ->
assert opts == [:custom_arg, :simulate, :skip_download]
test "returns the error straight through when the command fails" do
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts -> {:error, "Big issue", 1} end)
assert {:error, "Big issue", 1} = VideoCollection.get_media_attributes(@channel_url)
end
test "passes the explict tmpfile path to runner" do
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, addl_opts ->
assert [{:output_filepath, filepath}] = addl_opts
assert String.ends_with?(filepath, ".json")
{:ok, ""}
end)
assert {:ok, _} = VideoCollection.get_media_attributes(@channel_url, [:custom_arg])
assert {:ok, _} = VideoCollection.get_media_attributes(@channel_url)
end
test "returns the error straight through when the command fails" do
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:error, "Big issue", 1} end)
test "supports an optional file_listener_handler that gets passed a filename" do
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts -> {:ok, ""} end)
current_self = self()
assert {:error, "Big issue", 1} = VideoCollection.get_media_attributes(@channel_url)
handler = fn filename ->
send(current_self, {:handler, filename})
end
assert {:ok, _} = VideoCollection.get_media_attributes(@channel_url, file_listener_handler: handler)
assert_receive {:handler, filename}
assert String.ends_with?(filename, ".json")
end
end

View file

@ -45,7 +45,7 @@ defmodule Pinchflat.MediaClient.SourceDetailsTest do
describe "get_media_attributes/2 when passed a string" do
test "it passes the expected arguments to the backend" do
expect(YtDlpRunnerMock, :run, fn @channel_url, opts, ot ->
expect(YtDlpRunnerMock, :run, fn @channel_url, opts, ot, _addl_opts ->
assert opts == [:simulate, :skip_download]
assert ot == "%(.{id,title,was_live,original_url,description})j"
@ -56,7 +56,7 @@ defmodule Pinchflat.MediaClient.SourceDetailsTest do
end
test "it returns a list of maps" do
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot ->
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts ->
{:ok, source_attributes_return_fixture()}
end)
@ -68,7 +68,7 @@ defmodule Pinchflat.MediaClient.SourceDetailsTest do
test "it calls the backend with the source's collection ID" do
source = source_fixture()
expect(YtDlpRunnerMock, :run, fn url, _opts, _ot ->
expect(YtDlpRunnerMock, :run, fn url, _opts, _ot, _addl_opts ->
assert source.collection_id == url
{:ok, source_attributes_return_fixture()}
end)
@ -77,7 +77,7 @@ defmodule Pinchflat.MediaClient.SourceDetailsTest do
end
test "it builds options based on the source's media profile" do
expect(YtDlpRunnerMock, :run, fn _url, opts, _ot ->
expect(YtDlpRunnerMock, :run, fn _url, opts, _ot, _addl_opts ->
assert opts == [:simulate, :skip_download]
{:ok, ""}
end)
@ -91,5 +91,22 @@ defmodule Pinchflat.MediaClient.SourceDetailsTest do
source = source_fixture(media_profile_id: media_profile.id)
assert {:ok, _} = SourceDetails.get_media_attributes(source)
end
test "lets you pass through an optional file_listener_handler" do
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts ->
{:ok, source_attributes_return_fixture()}
end)
source = source_fixture()
current_self = self()
handler = fn filename ->
send(current_self, {:handler, filename})
end
assert {:ok, _} = SourceDetails.get_media_attributes(source, file_listener_handler: handler)
assert_receive {:handler, _}
end
end
end

View file

@ -215,6 +215,34 @@ defmodule Pinchflat.MediaTest do
end
end
describe "pending_download?/1" do
test "returns true when the media hasn't been downloaded" do
media_item = media_item_fixture(%{media_filepath: nil})
assert Media.pending_download?(media_item)
end
test "returns false if the media has been downloaded" do
media_item = media_item_fixture(%{media_filepath: "/video/#{Faker.File.file_name(:video)}"})
refute Media.pending_download?(media_item)
end
test "returns false if the media hasn't been downloaded but the profile doesn't DL shorts" do
source = source_fixture(%{media_profile_id: media_profile_fixture(%{shorts_behaviour: :exclude}).id})
media_item = media_item_fixture(%{source_id: source.id, media_filepath: nil, original_url: "/shorts/"})
refute Media.pending_download?(media_item)
end
test "returns false if the media hasn't been downloaded but the profile doesn't DL livestreams" do
source = source_fixture(%{media_profile_id: media_profile_fixture(%{livestream_behaviour: :exclude}).id})
media_item = media_item_fixture(%{source_id: source.id, media_filepath: nil, livestream: true})
refute Media.pending_download?(media_item)
end
end
describe "search/1" do
setup do
media_item =

View file

@ -5,6 +5,7 @@ defmodule Pinchflat.Tasks.SourceTasksTest do
import Pinchflat.TasksFixtures
import Pinchflat.MediaFixtures
import Pinchflat.SourcesFixtures
import Pinchflat.ProfilesFixtures
alias Pinchflat.Tasks
alias Pinchflat.Tasks.Task
@ -43,15 +44,17 @@ defmodule Pinchflat.Tasks.SourceTasksTest do
end
end
describe "index_media_items/1" do
describe "index_and_enqueue_download_for_media_items/1" do
setup do
stub(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, source_attributes_return_fixture()} end)
stub(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts ->
{:ok, source_attributes_return_fixture()}
end)
{:ok, [source: source_fixture()]}
end
test "it creates a media_item record for each media ID returned", %{source: source} do
assert media_items = SourceTasks.index_media_items(source)
assert media_items = SourceTasks.index_and_enqueue_download_for_media_items(source)
assert Enum.count(media_items) == 3
assert ["video1", "video2", "video3"] == Enum.map(media_items, & &1.media_id)
@ -63,15 +66,15 @@ defmodule Pinchflat.Tasks.SourceTasksTest do
test "it attaches all media_items to the given source", %{source: source} do
source_id = source.id
assert media_items = SourceTasks.index_media_items(source)
assert media_items = SourceTasks.index_and_enqueue_download_for_media_items(source)
assert Enum.count(media_items) == 3
assert Enum.all?(media_items, fn %MediaItem{source_id: ^source_id} -> true end)
end
test "it won't duplicate media_items based on media_id and source", %{source: source} do
_first_run = SourceTasks.index_media_items(source)
_duplicate_run = SourceTasks.index_media_items(source)
_first_run = SourceTasks.index_and_enqueue_download_for_media_items(source)
_duplicate_run = SourceTasks.index_and_enqueue_download_for_media_items(source)
media_items = Repo.preload(source, :media_items).media_items
assert Enum.count(media_items) == 3
@ -80,8 +83,8 @@ defmodule Pinchflat.Tasks.SourceTasksTest do
test "it can duplicate media_ids for different sources", %{source: source} do
other_source = source_fixture()
media_items = SourceTasks.index_media_items(source)
media_items_other_source = SourceTasks.index_media_items(other_source)
media_items = SourceTasks.index_and_enqueue_download_for_media_items(source)
media_items_other_source = SourceTasks.index_and_enqueue_download_for_media_items(other_source)
assert Enum.count(media_items) == 3
assert Enum.count(media_items_other_source) == 3
@ -91,8 +94,8 @@ defmodule Pinchflat.Tasks.SourceTasksTest do
end
test "it returns a list of media_items or changesets", %{source: source} do
first_run = SourceTasks.index_media_items(source)
duplicate_run = SourceTasks.index_media_items(source)
first_run = SourceTasks.index_and_enqueue_download_for_media_items(source)
duplicate_run = SourceTasks.index_and_enqueue_download_for_media_items(source)
assert Enum.all?(first_run, fn %MediaItem{} -> true end)
assert Enum.all?(duplicate_run, fn %Ecto.Changeset{} -> true end)
@ -101,11 +104,122 @@ defmodule Pinchflat.Tasks.SourceTasksTest do
test "it updates the source's last_indexed_at field", %{source: source} do
assert source.last_indexed_at == nil
SourceTasks.index_media_items(source)
SourceTasks.index_and_enqueue_download_for_media_items(source)
source = Repo.reload!(source)
assert DateTime.diff(DateTime.utc_now(), source.last_indexed_at) < 2
end
test "it enqueues a job for each pending media item" do
source = source_fixture()
media_item = media_item_fixture(source_id: source.id, media_filepath: nil)
SourceTasks.index_and_enqueue_download_for_media_items(source)
assert_enqueued(worker: VideoDownloadWorker, args: %{"id" => media_item.id})
end
test "it does not attach tasks if the source is set to not download" do
source = source_fixture(download_media: false)
media_item = media_item_fixture(source_id: source.id, media_filepath: nil)
SourceTasks.index_and_enqueue_download_for_media_items(source)
assert [] = Tasks.list_tasks_for(:media_item_id, media_item.id)
end
end
describe "index_and_enqueue_download_for_media_items/1 when testing file watcher" do
setup do
{:ok, [source: source_fixture()]}
end
test "creates a new media item for everything already in the file", %{source: source} do
watcher_poll_interval = Application.get_env(:pinchflat, :file_watcher_poll_interval)
stub(YtDlpRunnerMock, :run, fn _url, _opts, _ot, addl_opts ->
filepath = Keyword.get(addl_opts, :output_filepath)
File.write(filepath, source_attributes_return_fixture())
# Need to add a delay to ensure the file watcher has time to read the file
:timer.sleep(watcher_poll_interval * 2)
# We know we're testing the file watcher since the synchronous call will only
# return an empty string (creating no records)
{:ok, ""}
end)
assert Repo.aggregate(MediaItem, :count, :id) == 0
SourceTasks.index_and_enqueue_download_for_media_items(source)
assert Repo.aggregate(MediaItem, :count, :id) == 3
end
test "enqueues a download for everything already in the file", %{source: source} do
watcher_poll_interval = Application.get_env(:pinchflat, :file_watcher_poll_interval)
stub(YtDlpRunnerMock, :run, fn _url, _opts, _ot, addl_opts ->
filepath = Keyword.get(addl_opts, :output_filepath)
File.write(filepath, source_attributes_return_fixture())
# Need to add a delay to ensure the file watcher has time to read the file
:timer.sleep(watcher_poll_interval * 2)
# We know we're testing the file watcher since the synchronous call will only
# return an empty string (creating no records)
{:ok, ""}
end)
refute_enqueued(worker: VideoDownloadWorker)
SourceTasks.index_and_enqueue_download_for_media_items(source)
assert_enqueued(worker: VideoDownloadWorker)
end
test "does not enqueue downloads if the source is set to not download" do
watcher_poll_interval = Application.get_env(:pinchflat, :file_watcher_poll_interval)
source = source_fixture(download_media: false)
stub(YtDlpRunnerMock, :run, fn _url, _opts, _ot, addl_opts ->
filepath = Keyword.get(addl_opts, :output_filepath)
File.write(filepath, source_attributes_return_fixture())
# Need to add a delay to ensure the file watcher has time to read the file
:timer.sleep(watcher_poll_interval * 2)
# We know we're testing the file watcher since the synchronous call will only
# return an empty string (creating no records)
{:ok, ""}
end)
SourceTasks.index_and_enqueue_download_for_media_items(source)
refute_enqueued(worker: VideoDownloadWorker)
end
test "does not enqueue downloads for media that doesn't match the profile's format options" do
watcher_poll_interval = Application.get_env(:pinchflat, :file_watcher_poll_interval)
profile = media_profile_fixture(%{shorts_behaviour: :exclude})
source = source_fixture(%{media_profile_id: profile.id})
stub(YtDlpRunnerMock, :run, fn _url, _opts, _ot, addl_opts ->
filepath = Keyword.get(addl_opts, :output_filepath)
contents =
Phoenix.json_library().encode!(%{
id: "video2",
title: "Video 2",
original_url: "https://example.com/shorts/video2",
was_live: true,
description: "desc2"
})
File.write(filepath, contents)
# Need to add a delay to ensure the file watcher has time to read the file
:timer.sleep(watcher_poll_interval * 2)
# We know we're testing the file watcher since the synchronous call will only
# return an empty string (creating no records)
{:ok, ""}
end)
SourceTasks.index_and_enqueue_download_for_media_items(source)
refute_enqueued(worker: VideoDownloadWorker)
end
end
describe "enqueue_pending_media_tasks/1" do

View file

@ -0,0 +1,52 @@
defmodule Pinchflat.Utils.FilesystemUtils.FileFollowerServerTest do
use ExUnit.Case, async: true
alias Pinchflat.Utils.FilesystemUtils
alias Pinchflat.Utils.FilesystemUtils.FileFollowerServer
setup do
{:ok, pid} = FileFollowerServer.start_link()
tmpfile = FilesystemUtils.generate_metadata_tmpfile(:txt)
{:ok, %{pid: pid, tmpfile: tmpfile}}
end
describe "watch_file" do
test "calls the handler for each existing line in the file", %{pid: pid, tmpfile: tmpfile} do
File.write!(tmpfile, "line1\nline2")
parent = self()
handler = fn line -> send(parent, line) end
FileFollowerServer.watch_file(pid, tmpfile, handler)
assert_receive "line1\n"
assert_receive "line2"
end
test "calls the handler for each new line in the file", %{pid: pid, tmpfile: tmpfile} do
parent = self()
file = File.open!(tmpfile, [:append])
handler = fn line -> send(parent, line) end
FileFollowerServer.watch_file(pid, tmpfile, handler)
IO.binwrite(file, "line1\n")
assert_receive "line1\n"
IO.binwrite(file, "line2")
assert_receive "line2"
end
end
describe "stop" do
test "stops the watcher", %{pid: pid, tmpfile: tmpfile} do
handler = fn _line -> :noop end
FileFollowerServer.watch_file(pid, tmpfile, handler)
refute is_nil(Process.info(pid))
FileFollowerServer.stop(pid)
# Gotta wait for the server to stop async
:timer.sleep(10)
assert is_nil(Process.info(pid))
end
end
end

View file

@ -0,0 +1,16 @@
defmodule Pinchflat.Utils.FilesystemUtilsTest do
use ExUnit.Case, async: true
alias Pinchflat.Utils.FilesystemUtils
describe "generate_metadata_tmpfile/1" do
test "creates a tmpfile and returns its path" do
res = FilesystemUtils.generate_metadata_tmpfile(:json)
assert String.ends_with?(res, ".json")
assert File.exists?(res)
File.rm!(res)
end
end
end

View file

@ -13,7 +13,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do
describe "perform/1" do
test "it indexes the source if it should be indexed" do
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, ""} end)
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts -> {:ok, ""} end)
source = source_fixture(index_frequency_minutes: 10)
@ -21,7 +21,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do
end
test "it indexes the source no matter what if the source has never been indexed before" do
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, ""} end)
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts -> {:ok, ""} end)
source = source_fixture(index_frequency_minutes: 0, last_indexed_at: nil)
@ -29,7 +29,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do
end
test "it does not do any indexing if the source has been indexed and shouldn't be rescheduled" do
expect(YtDlpRunnerMock, :run, 0, fn _url, _opts, _ot -> {:ok, ""} end)
expect(YtDlpRunnerMock, :run, 0, fn _url, _opts, _ot, _addl_opts -> {:ok, ""} end)
source = source_fixture(index_frequency_minutes: -1, last_indexed_at: DateTime.utc_now())
@ -37,7 +37,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do
end
test "it does not reschedule if the source shouldn't be indexed" do
stub(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, ""} end)
stub(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts -> {:ok, ""} end)
source = source_fixture(index_frequency_minutes: -1)
perform_job(MediaIndexingWorker, %{id: source.id})
@ -46,7 +46,9 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do
end
test "it kicks off a download job for each pending media item" do
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, source_attributes_return_fixture()} end)
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts ->
{:ok, source_attributes_return_fixture()}
end)
source = source_fixture(index_frequency_minutes: 10)
perform_job(MediaIndexingWorker, %{id: source.id})
@ -55,7 +57,9 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do
end
test "it starts a job for any pending media item even if it's from another run" do
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, source_attributes_return_fixture()} end)
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts ->
{:ok, source_attributes_return_fixture()}
end)
source = source_fixture(index_frequency_minutes: 10)
media_item_fixture(%{source_id: source.id, media_filepath: nil})
@ -65,7 +69,9 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do
end
test "it does not kick off a job for media items that could not be saved" do
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, source_attributes_return_fixture()} end)
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts ->
{:ok, source_attributes_return_fixture()}
end)
source = source_fixture(index_frequency_minutes: 10)
media_item_fixture(%{source_id: source.id, media_filepath: nil, media_id: "video1"})
@ -76,7 +82,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do
end
test "it reschedules the job based on the index frequency" do
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, ""} end)
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts -> {:ok, ""} end)
source = source_fixture(index_frequency_minutes: 10)
perform_job(MediaIndexingWorker, %{id: source.id})
@ -89,7 +95,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do
end
test "it creates a task for the rescheduled job" do
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, ""} end)
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts -> {:ok, ""} end)
source = source_fixture(index_frequency_minutes: 10)
task_count_fetcher = fn -> Enum.count(Tasks.list_tasks()) end
@ -100,7 +106,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do
end
test "it creates the basic media_item records" do
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, source_attributes_return_fixture()} end)
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts -> {:ok, source_attributes_return_fixture()} end)
source = source_fixture(index_frequency_minutes: 10)