diff --git a/lib/pinchflat/slow_indexing/slow_indexing_helpers.ex b/lib/pinchflat/slow_indexing/slow_indexing_helpers.ex index 816d162..7b82a1f 100644 --- a/lib/pinchflat/slow_indexing/slow_indexing_helpers.ex +++ b/lib/pinchflat/slow_indexing/slow_indexing_helpers.ex @@ -28,7 +28,7 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do """ def kickoff_indexing_task(%Source{} = source, job_args \\ %{}, job_opts \\ []) do Tasks.delete_pending_tasks_for(source, "FastIndexingWorker") - Tasks.delete_pending_tasks_for(source, "MediaCollectionIndexingWorker") + Tasks.delete_pending_tasks_for(source, "MediaCollectionIndexingWorker", include_executing: true) MediaCollectionIndexingWorker.kickoff_with_task(source, job_args, job_opts) end diff --git a/lib/pinchflat/tasks/tasks.ex b/lib/pinchflat/tasks/tasks.ex index 7b94e3c..2dfef0a 100644 --- a/lib/pinchflat/tasks/tasks.ex +++ b/lib/pinchflat/tasks/tasks.ex @@ -53,20 +53,6 @@ defmodule Pinchflat.Tasks do ) end - @doc """ - Returns the list of pending tasks for a given record type and ID. Optionally allows you to specify - which worker to include. - - Returns [%Task{}, ...] - """ - def list_pending_tasks_for(record, worker_name \\ nil) do - list_tasks_for( - record, - worker_name, - [:available, :scheduled, :retryable] - ) - end - @doc """ Gets a single task. @@ -127,13 +113,13 @@ defmodule Pinchflat.Tasks do @doc """ Deletes all tasks attached to a given record, cancelling any attached jobs. - Optionally allows you to specify which worker to include. + Optionally allows you to specify which worker and job states to include. Returns :ok """ - def delete_tasks_for(record, worker_name \\ nil) do + def delete_tasks_for(record, worker_name \\ nil, job_states \\ Oban.Job.states()) do record - |> list_tasks_for(worker_name) + |> list_tasks_for(worker_name, job_states) |> Enum.each(&delete_task/1) end @@ -143,10 +129,12 @@ defmodule Pinchflat.Tasks do Returns :ok """ - def delete_pending_tasks_for(record, worker_name \\ nil) do - record - |> list_pending_tasks_for(worker_name) - |> Enum.each(&delete_task/1) + def delete_pending_tasks_for(record, worker_name \\ nil, opts \\ []) do + include_executing = Keyword.get(opts, :include_executing, false) + base_job_states = [:available, :scheduled, :retryable] + job_states = if include_executing, do: base_job_states ++ [:executing], else: base_job_states + + delete_tasks_for(record, worker_name, job_states) end @doc """ diff --git a/test/pinchflat/slow_indexing/slow_indexing_helpers_test.exs b/test/pinchflat/slow_indexing/slow_indexing_helpers_test.exs index cca56c1..734afb9 100644 --- a/test/pinchflat/slow_indexing/slow_indexing_helpers_test.exs +++ b/test/pinchflat/slow_indexing/slow_indexing_helpers_test.exs @@ -41,6 +41,17 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpersTest do assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end end + test "deletes any executing media collection tasks for the source" do + source = source_fixture() + {:ok, job} = Oban.insert(MediaCollectionIndexingWorker.new(%{"id" => source.id})) + task = task_fixture(source_id: source.id, job_id: job.id) + Repo.update_all(from(Oban.Job, where: [id: ^task.job_id], update: [set: [state: "executing"]]), []) + + assert {:ok, _} = SlowIndexingHelpers.kickoff_indexing_task(source) + + assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end + end + test "deletes any pending media tasks for the source" do source = source_fixture() {:ok, job} = Oban.insert(FastIndexingWorker.new(%{"id" => source.id})) diff --git a/test/pinchflat/tasks_test.exs b/test/pinchflat/tasks_test.exs index 36140d5..4b99f12 100644 --- a/test/pinchflat/tasks_test.exs +++ b/test/pinchflat/tasks_test.exs @@ -12,7 +12,7 @@ defmodule Pinchflat.TasksTest do @invalid_attrs %{job_id: nil} describe "schema" do - test "it deletes a task when the job gets deleted" do + test "deletes a task when the job gets deleted" do task = Repo.preload(task_fixture(), [:job]) {:ok, _} = Repo.delete(task.job) @@ -20,7 +20,7 @@ defmodule Pinchflat.TasksTest do assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end end - test "it does not delete the other record when a job gets deleted" do + test "does not delete the other record when a job gets deleted" do task = Repo.preload(task_fixture(), [:source, :job]) {:ok, _} = Repo.delete(task.job) @@ -30,21 +30,21 @@ defmodule Pinchflat.TasksTest do end describe "list_tasks/0" do - test "it returns all tasks" do + test "returns all tasks" do task = task_fixture() assert Tasks.list_tasks() == [task] end end describe "list_tasks_for/3" do - test "it lets you specify which record type/ID to join on" do + test "lets you specify which record type/ID to join on" do source = source_fixture() task = task_fixture(source_id: source.id) assert Tasks.list_tasks_for(source, nil, [:available]) == [task] end - test "it lets you specify which job states to include" do + test "lets you specify which job states to include" do source = source_fixture() task = task_fixture(source_id: source.id) @@ -52,7 +52,7 @@ defmodule Pinchflat.TasksTest do assert Tasks.list_tasks_for(source, nil, [:cancelled]) == [] end - test "it lets you specify which worker to include" do + test "lets you specify which worker to include" do source = source_fixture() task = task_fixture(source_id: source.id) @@ -60,7 +60,7 @@ defmodule Pinchflat.TasksTest do assert Tasks.list_tasks_for(source, "FooBarWorker") == [] end - test "it includes all workers if no worker is specified" do + test "includes all workers if no worker is specified" do source = source_fixture() task = task_fixture(source_id: source.id) @@ -68,32 +68,8 @@ defmodule Pinchflat.TasksTest do end end - describe "list_pending_tasks_for/3" do - test "it lists pending tasks" do - source = source_fixture() - task = task_fixture(source_id: source.id) - - assert Tasks.list_pending_tasks_for(source) == [task] - end - - test "it does not list non-pending tasks" do - task = Repo.preload(task_fixture(), [:job, :source]) - :ok = Oban.cancel_job(task.job) - - assert Tasks.list_pending_tasks_for(task.source) == [] - end - - test "it lets you specify which worker to include" do - source = source_fixture() - task = task_fixture(source_id: source.id) - - assert Tasks.list_pending_tasks_for(source, "TestJobWorker") == [task] - assert Tasks.list_pending_tasks_for(source, "FooBarWorker") == [] - end - end - describe "get_task!/1" do - test "it returns the task with given id" do + test "returns the task with given id" do task = task_fixture() assert Tasks.get_task!(task.id) == task end @@ -132,7 +108,7 @@ defmodule Pinchflat.TasksTest do end describe "create_job_with_task/2" do - test "it enqueues the given job" do + test "enqueues the given job" do media_item = media_item_fixture() refute_enqueued(worker: TestJobWorker) @@ -140,7 +116,7 @@ defmodule Pinchflat.TasksTest do assert_enqueued(worker: TestJobWorker) end - test "it creates a task record if successful" do + test "creates a task record if successful" do source = source_fixture() assert {:ok, %Task{} = task} = Tasks.create_job_with_task(TestJobWorker.new(%{}), source) @@ -148,7 +124,7 @@ defmodule Pinchflat.TasksTest do assert task.source_id == source.id end - test "it returns an error if the job already exists" do + test "returns an error if the job already exists" do source = source_fixture() job = TestJobWorker.new(%{foo: "bar"}, unique: [period: :infinity]) @@ -156,7 +132,7 @@ defmodule Pinchflat.TasksTest do assert {:error, :duplicate_job} = Tasks.create_job_with_task(job, source) end - test "it returns an error if the job fails to enqueue" do + test "returns an error if the job fails to enqueue" do source = source_fixture() assert {:error, %Ecto.Changeset{}} = Tasks.create_job_with_task(%Ecto.Changeset{}, source) @@ -181,7 +157,7 @@ defmodule Pinchflat.TasksTest do end describe "delete_tasks_for/2" do - test "it deletes tasks attached to a source" do + test "deletes tasks attached to a source" do source = source_fixture() task = task_fixture(source_id: source.id) @@ -189,7 +165,7 @@ defmodule Pinchflat.TasksTest do assert_raise Ecto.NoResultsError, fn -> Tasks.get_task!(task.id) end end - test "it deletes the tasks attached to a media_item" do + test "deletes the tasks attached to a media_item" do media_item = media_item_fixture() task = task_fixture(media_item_id: media_item.id) @@ -208,6 +184,17 @@ defmodule Pinchflat.TasksTest do assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end end + test "deletion can specify which states to include" do + source = source_fixture() + task = task_fixture(source_id: source.id) + + assert :ok = Tasks.delete_tasks_for(source, nil, [:executing]) + assert Repo.reload!(task) + + assert :ok = Tasks.delete_tasks_for(source, nil, [:available]) + assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end + end + test "deletion does not impact unintended records" do source = source_fixture() task = task_fixture(source_id: source.id) @@ -221,7 +208,7 @@ defmodule Pinchflat.TasksTest do end describe "delete_pending_tasks_for/1" do - test "it deletes pending tasks attached to a source" do + test "deletes pending tasks attached to a source" do source = source_fixture() task = task_fixture(source_id: source.id) @@ -229,7 +216,7 @@ defmodule Pinchflat.TasksTest do assert_raise Ecto.NoResultsError, fn -> Tasks.get_task!(task.id) end end - test "it does not delete non-pending tasks" do + test "does not delete non-pending tasks" do source = source_fixture() task = Repo.preload(task_fixture(source_id: source.id), :job) :ok = Oban.cancel_job(task.job) @@ -238,7 +225,7 @@ defmodule Pinchflat.TasksTest do assert Tasks.get_task!(task.id) end - test "it works on media_items" do + test "works on media_items" do media_item = media_item_fixture() pending_task = task_fixture(media_item_id: media_item.id) cancelled_task = Repo.preload(task_fixture(media_item_id: media_item.id), :job) @@ -259,10 +246,23 @@ defmodule Pinchflat.TasksTest do assert :ok = Tasks.delete_pending_tasks_for(media_item, "TestJobWorker") assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end end + + test "deletion can optionall include executing tasks" do + source = source_fixture() + task = task_fixture(source_id: source.id) + + from(Oban.Job, where: [id: ^task.job_id], update: [set: [state: "executing"]]) + |> Repo.update_all([]) + + assert :ok = Tasks.delete_pending_tasks_for(source, nil, include_executing: false) + assert Repo.reload!(task) + assert :ok = Tasks.delete_pending_tasks_for(source, nil, include_executing: true) + assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end + end end describe "change_task/1" do - test "it returns a task changeset" do + test "returns a task changeset" do task = task_fixture() assert %Ecto.Changeset{} = Tasks.change_task(task) end