mirror of
https://github.com/kieraneglin/pinchflat.git
synced 2026-01-23 02:24:24 +00:00
[Bugfix] Disallow concurrent slow-indexing runs for the same source (#384)
* Updated deps in case that does anything * Kills running slow-indexes when a new slow-index is enqueued * Revert deps upgrade (saving that for its own PR)
This commit is contained in:
parent
e0745bdfbe
commit
f6344d43d2
4 changed files with 63 additions and 64 deletions
|
|
@ -28,7 +28,7 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do
|
|||
"""
|
||||
def kickoff_indexing_task(%Source{} = source, job_args \\ %{}, job_opts \\ []) do
|
||||
Tasks.delete_pending_tasks_for(source, "FastIndexingWorker")
|
||||
Tasks.delete_pending_tasks_for(source, "MediaCollectionIndexingWorker")
|
||||
Tasks.delete_pending_tasks_for(source, "MediaCollectionIndexingWorker", include_executing: true)
|
||||
|
||||
MediaCollectionIndexingWorker.kickoff_with_task(source, job_args, job_opts)
|
||||
end
|
||||
|
|
|
|||
|
|
@ -53,20 +53,6 @@ defmodule Pinchflat.Tasks do
|
|||
)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Returns the list of pending tasks for a given record type and ID. Optionally allows you to specify
|
||||
which worker to include.
|
||||
|
||||
Returns [%Task{}, ...]
|
||||
"""
|
||||
def list_pending_tasks_for(record, worker_name \\ nil) do
|
||||
list_tasks_for(
|
||||
record,
|
||||
worker_name,
|
||||
[:available, :scheduled, :retryable]
|
||||
)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Gets a single task.
|
||||
|
||||
|
|
@ -127,13 +113,13 @@ defmodule Pinchflat.Tasks do
|
|||
|
||||
@doc """
|
||||
Deletes all tasks attached to a given record, cancelling any attached jobs.
|
||||
Optionally allows you to specify which worker to include.
|
||||
Optionally allows you to specify which worker and job states to include.
|
||||
|
||||
Returns :ok
|
||||
"""
|
||||
def delete_tasks_for(record, worker_name \\ nil) do
|
||||
def delete_tasks_for(record, worker_name \\ nil, job_states \\ Oban.Job.states()) do
|
||||
record
|
||||
|> list_tasks_for(worker_name)
|
||||
|> list_tasks_for(worker_name, job_states)
|
||||
|> Enum.each(&delete_task/1)
|
||||
end
|
||||
|
||||
|
|
@ -143,10 +129,12 @@ defmodule Pinchflat.Tasks do
|
|||
|
||||
Returns :ok
|
||||
"""
|
||||
def delete_pending_tasks_for(record, worker_name \\ nil) do
|
||||
record
|
||||
|> list_pending_tasks_for(worker_name)
|
||||
|> Enum.each(&delete_task/1)
|
||||
def delete_pending_tasks_for(record, worker_name \\ nil, opts \\ []) do
|
||||
include_executing = Keyword.get(opts, :include_executing, false)
|
||||
base_job_states = [:available, :scheduled, :retryable]
|
||||
job_states = if include_executing, do: base_job_states ++ [:executing], else: base_job_states
|
||||
|
||||
delete_tasks_for(record, worker_name, job_states)
|
||||
end
|
||||
|
||||
@doc """
|
||||
|
|
|
|||
|
|
@ -41,6 +41,17 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpersTest do
|
|||
assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end
|
||||
end
|
||||
|
||||
test "deletes any executing media collection tasks for the source" do
|
||||
source = source_fixture()
|
||||
{:ok, job} = Oban.insert(MediaCollectionIndexingWorker.new(%{"id" => source.id}))
|
||||
task = task_fixture(source_id: source.id, job_id: job.id)
|
||||
Repo.update_all(from(Oban.Job, where: [id: ^task.job_id], update: [set: [state: "executing"]]), [])
|
||||
|
||||
assert {:ok, _} = SlowIndexingHelpers.kickoff_indexing_task(source)
|
||||
|
||||
assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end
|
||||
end
|
||||
|
||||
test "deletes any pending media tasks for the source" do
|
||||
source = source_fixture()
|
||||
{:ok, job} = Oban.insert(FastIndexingWorker.new(%{"id" => source.id}))
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ defmodule Pinchflat.TasksTest do
|
|||
@invalid_attrs %{job_id: nil}
|
||||
|
||||
describe "schema" do
|
||||
test "it deletes a task when the job gets deleted" do
|
||||
test "deletes a task when the job gets deleted" do
|
||||
task = Repo.preload(task_fixture(), [:job])
|
||||
|
||||
{:ok, _} = Repo.delete(task.job)
|
||||
|
|
@ -20,7 +20,7 @@ defmodule Pinchflat.TasksTest do
|
|||
assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end
|
||||
end
|
||||
|
||||
test "it does not delete the other record when a job gets deleted" do
|
||||
test "does not delete the other record when a job gets deleted" do
|
||||
task = Repo.preload(task_fixture(), [:source, :job])
|
||||
|
||||
{:ok, _} = Repo.delete(task.job)
|
||||
|
|
@ -30,21 +30,21 @@ defmodule Pinchflat.TasksTest do
|
|||
end
|
||||
|
||||
describe "list_tasks/0" do
|
||||
test "it returns all tasks" do
|
||||
test "returns all tasks" do
|
||||
task = task_fixture()
|
||||
assert Tasks.list_tasks() == [task]
|
||||
end
|
||||
end
|
||||
|
||||
describe "list_tasks_for/3" do
|
||||
test "it lets you specify which record type/ID to join on" do
|
||||
test "lets you specify which record type/ID to join on" do
|
||||
source = source_fixture()
|
||||
task = task_fixture(source_id: source.id)
|
||||
|
||||
assert Tasks.list_tasks_for(source, nil, [:available]) == [task]
|
||||
end
|
||||
|
||||
test "it lets you specify which job states to include" do
|
||||
test "lets you specify which job states to include" do
|
||||
source = source_fixture()
|
||||
task = task_fixture(source_id: source.id)
|
||||
|
||||
|
|
@ -52,7 +52,7 @@ defmodule Pinchflat.TasksTest do
|
|||
assert Tasks.list_tasks_for(source, nil, [:cancelled]) == []
|
||||
end
|
||||
|
||||
test "it lets you specify which worker to include" do
|
||||
test "lets you specify which worker to include" do
|
||||
source = source_fixture()
|
||||
task = task_fixture(source_id: source.id)
|
||||
|
||||
|
|
@ -60,7 +60,7 @@ defmodule Pinchflat.TasksTest do
|
|||
assert Tasks.list_tasks_for(source, "FooBarWorker") == []
|
||||
end
|
||||
|
||||
test "it includes all workers if no worker is specified" do
|
||||
test "includes all workers if no worker is specified" do
|
||||
source = source_fixture()
|
||||
task = task_fixture(source_id: source.id)
|
||||
|
||||
|
|
@ -68,32 +68,8 @@ defmodule Pinchflat.TasksTest do
|
|||
end
|
||||
end
|
||||
|
||||
describe "list_pending_tasks_for/3" do
|
||||
test "it lists pending tasks" do
|
||||
source = source_fixture()
|
||||
task = task_fixture(source_id: source.id)
|
||||
|
||||
assert Tasks.list_pending_tasks_for(source) == [task]
|
||||
end
|
||||
|
||||
test "it does not list non-pending tasks" do
|
||||
task = Repo.preload(task_fixture(), [:job, :source])
|
||||
:ok = Oban.cancel_job(task.job)
|
||||
|
||||
assert Tasks.list_pending_tasks_for(task.source) == []
|
||||
end
|
||||
|
||||
test "it lets you specify which worker to include" do
|
||||
source = source_fixture()
|
||||
task = task_fixture(source_id: source.id)
|
||||
|
||||
assert Tasks.list_pending_tasks_for(source, "TestJobWorker") == [task]
|
||||
assert Tasks.list_pending_tasks_for(source, "FooBarWorker") == []
|
||||
end
|
||||
end
|
||||
|
||||
describe "get_task!/1" do
|
||||
test "it returns the task with given id" do
|
||||
test "returns the task with given id" do
|
||||
task = task_fixture()
|
||||
assert Tasks.get_task!(task.id) == task
|
||||
end
|
||||
|
|
@ -132,7 +108,7 @@ defmodule Pinchflat.TasksTest do
|
|||
end
|
||||
|
||||
describe "create_job_with_task/2" do
|
||||
test "it enqueues the given job" do
|
||||
test "enqueues the given job" do
|
||||
media_item = media_item_fixture()
|
||||
|
||||
refute_enqueued(worker: TestJobWorker)
|
||||
|
|
@ -140,7 +116,7 @@ defmodule Pinchflat.TasksTest do
|
|||
assert_enqueued(worker: TestJobWorker)
|
||||
end
|
||||
|
||||
test "it creates a task record if successful" do
|
||||
test "creates a task record if successful" do
|
||||
source = source_fixture()
|
||||
|
||||
assert {:ok, %Task{} = task} = Tasks.create_job_with_task(TestJobWorker.new(%{}), source)
|
||||
|
|
@ -148,7 +124,7 @@ defmodule Pinchflat.TasksTest do
|
|||
assert task.source_id == source.id
|
||||
end
|
||||
|
||||
test "it returns an error if the job already exists" do
|
||||
test "returns an error if the job already exists" do
|
||||
source = source_fixture()
|
||||
job = TestJobWorker.new(%{foo: "bar"}, unique: [period: :infinity])
|
||||
|
||||
|
|
@ -156,7 +132,7 @@ defmodule Pinchflat.TasksTest do
|
|||
assert {:error, :duplicate_job} = Tasks.create_job_with_task(job, source)
|
||||
end
|
||||
|
||||
test "it returns an error if the job fails to enqueue" do
|
||||
test "returns an error if the job fails to enqueue" do
|
||||
source = source_fixture()
|
||||
|
||||
assert {:error, %Ecto.Changeset{}} = Tasks.create_job_with_task(%Ecto.Changeset{}, source)
|
||||
|
|
@ -181,7 +157,7 @@ defmodule Pinchflat.TasksTest do
|
|||
end
|
||||
|
||||
describe "delete_tasks_for/2" do
|
||||
test "it deletes tasks attached to a source" do
|
||||
test "deletes tasks attached to a source" do
|
||||
source = source_fixture()
|
||||
task = task_fixture(source_id: source.id)
|
||||
|
||||
|
|
@ -189,7 +165,7 @@ defmodule Pinchflat.TasksTest do
|
|||
assert_raise Ecto.NoResultsError, fn -> Tasks.get_task!(task.id) end
|
||||
end
|
||||
|
||||
test "it deletes the tasks attached to a media_item" do
|
||||
test "deletes the tasks attached to a media_item" do
|
||||
media_item = media_item_fixture()
|
||||
task = task_fixture(media_item_id: media_item.id)
|
||||
|
||||
|
|
@ -208,6 +184,17 @@ defmodule Pinchflat.TasksTest do
|
|||
assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end
|
||||
end
|
||||
|
||||
test "deletion can specify which states to include" do
|
||||
source = source_fixture()
|
||||
task = task_fixture(source_id: source.id)
|
||||
|
||||
assert :ok = Tasks.delete_tasks_for(source, nil, [:executing])
|
||||
assert Repo.reload!(task)
|
||||
|
||||
assert :ok = Tasks.delete_tasks_for(source, nil, [:available])
|
||||
assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end
|
||||
end
|
||||
|
||||
test "deletion does not impact unintended records" do
|
||||
source = source_fixture()
|
||||
task = task_fixture(source_id: source.id)
|
||||
|
|
@ -221,7 +208,7 @@ defmodule Pinchflat.TasksTest do
|
|||
end
|
||||
|
||||
describe "delete_pending_tasks_for/1" do
|
||||
test "it deletes pending tasks attached to a source" do
|
||||
test "deletes pending tasks attached to a source" do
|
||||
source = source_fixture()
|
||||
task = task_fixture(source_id: source.id)
|
||||
|
||||
|
|
@ -229,7 +216,7 @@ defmodule Pinchflat.TasksTest do
|
|||
assert_raise Ecto.NoResultsError, fn -> Tasks.get_task!(task.id) end
|
||||
end
|
||||
|
||||
test "it does not delete non-pending tasks" do
|
||||
test "does not delete non-pending tasks" do
|
||||
source = source_fixture()
|
||||
task = Repo.preload(task_fixture(source_id: source.id), :job)
|
||||
:ok = Oban.cancel_job(task.job)
|
||||
|
|
@ -238,7 +225,7 @@ defmodule Pinchflat.TasksTest do
|
|||
assert Tasks.get_task!(task.id)
|
||||
end
|
||||
|
||||
test "it works on media_items" do
|
||||
test "works on media_items" do
|
||||
media_item = media_item_fixture()
|
||||
pending_task = task_fixture(media_item_id: media_item.id)
|
||||
cancelled_task = Repo.preload(task_fixture(media_item_id: media_item.id), :job)
|
||||
|
|
@ -259,10 +246,23 @@ defmodule Pinchflat.TasksTest do
|
|||
assert :ok = Tasks.delete_pending_tasks_for(media_item, "TestJobWorker")
|
||||
assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end
|
||||
end
|
||||
|
||||
test "deletion can optionall include executing tasks" do
|
||||
source = source_fixture()
|
||||
task = task_fixture(source_id: source.id)
|
||||
|
||||
from(Oban.Job, where: [id: ^task.job_id], update: [set: [state: "executing"]])
|
||||
|> Repo.update_all([])
|
||||
|
||||
assert :ok = Tasks.delete_pending_tasks_for(source, nil, include_executing: false)
|
||||
assert Repo.reload!(task)
|
||||
assert :ok = Tasks.delete_pending_tasks_for(source, nil, include_executing: true)
|
||||
assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end
|
||||
end
|
||||
end
|
||||
|
||||
describe "change_task/1" do
|
||||
test "it returns a task changeset" do
|
||||
test "returns a task changeset" do
|
||||
task = task_fixture()
|
||||
assert %Ecto.Changeset{} = Tasks.change_task(task)
|
||||
end
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue