diff --git a/lib/faktory_worker/queue_manager.ex b/lib/faktory_worker/queue_manager.ex index b3fa804..2116fd4 100644 --- a/lib/faktory_worker/queue_manager.ex +++ b/lib/faktory_worker/queue_manager.ex @@ -13,25 +13,41 @@ defmodule FaktoryWorker.QueueManager do def start_link(opts) do name = opts[:name] pool_opts = Keyword.get(opts, :worker_pool, []) + pool_size = Keyword.get(pool_opts, :size, 10) queues = Keyword.get(pool_opts, :queues, ["default"]) - state = Enum.map(queues, &map_queues/1) - Agent.start_link(fn -> state end, name: format_queue_manager_name(name)) + queues = Enum.map(queues, fn queue -> map_queues(queue, pool_size) end) + concurrency_per_worker = calc_concurrency_per_worker(pool_size, queues) + + Agent.start_link(fn -> {concurrency_per_worker, queues} end, + name: format_queue_manager_name(name) + ) end @spec checkout_queues(queue_manager_name :: atom()) :: list(String.t()) def checkout_queues(queue_manager_name) do - Agent.get_and_update(queue_manager_name, fn queues -> - queues - |> Enum.map_reduce([], &map_queue_to_fetch/2) - |> format_queues_to_fetch() + Agent.get_and_update(queue_manager_name, fn {concurrency_per_worker, queues} -> + sorted_by_concurrency = Enum.sort_by(queues, & &1.max_concurrency, :desc) + + {resp, state} = + sorted_by_concurrency + |> Enum.take(concurrency_per_worker) + |> Enum.map_reduce([], &map_queue_to_fetch/2) + |> format_queues_to_fetch() + + state_queues = + sorted_by_concurrency + |> Enum.drop(concurrency_per_worker) + |> Enum.concat(state) + + {resp, {concurrency_per_worker, state_queues}} end) end @spec checkin_queues(queue_manager_name :: atom(), queues :: list(String.t())) :: :ok def checkin_queues(queue_manager_name, queues) do - Agent.cast(queue_manager_name, fn state_queues -> - Enum.map(state_queues, &update_checkin_queues(&1, queues)) + Agent.cast(queue_manager_name, fn {concurrency_per_worker, state_queues} -> + {concurrency_per_worker, Enum.map(state_queues, &update_checkin_queues(&1, queues))} end) :ok @@ -42,19 +58,15 @@ defmodule FaktoryWorker.QueueManager do :"#{name}_queue_manager" end - defp map_queues(queue) when is_binary(queue) do - %Queue{name: queue, max_concurrency: :infinity} + defp map_queues(queue, pool_size) when is_binary(queue) do + %Queue{name: queue, max_concurrency: pool_size} end - defp map_queues({queue, opts}) when is_binary(queue) do - max_concurrency = Keyword.get(opts, :max_concurrency, :infinity) + defp map_queues({queue, opts}, pool_size) when is_binary(queue) do + max_concurrency = Keyword.get(opts, :max_concurrency, pool_size) %Queue{name: queue, max_concurrency: max_concurrency} end - defp map_queue_to_fetch(%{max_concurrency: :infinity} = queue, acc) do - {queue.name, [queue | acc]} - end - defp map_queue_to_fetch(%{max_concurrency: 0} = queue, acc) do {nil, [queue | acc]} end @@ -65,8 +77,6 @@ defmodule FaktoryWorker.QueueManager do {queue.name, [queue | acc]} end - defp update_checkin_queues(%{max_concurrency: :infinity} = queue, _), do: queue - defp update_checkin_queues(queue, checkin_queues) do if Enum.member?(checkin_queues, queue.name) do %{queue | max_concurrency: queue.max_concurrency + 1} @@ -80,4 +90,9 @@ defmodule FaktoryWorker.QueueManager do state = Enum.reverse(state) {queues, state} end + + defp calc_concurrency_per_worker(pool_size, queues) do + sum_max_concurrency = queues |> Enum.map(& &1.max_concurrency) |> Enum.sum() + ceil(sum_max_concurrency / pool_size) + end end diff --git a/lib/faktory_worker/worker.ex b/lib/faktory_worker/worker.ex index 2d81372..680c894 100644 --- a/lib/faktory_worker/worker.ex +++ b/lib/faktory_worker/worker.ex @@ -146,12 +146,12 @@ defmodule FaktoryWorker.Worker do %{ state - | worker_state: :running_job, - job_timeout_ref: timeout_ref, - job_start: job_start, - job_ref: job_ref, - job_id: job["jid"], - job: job + | worker_state: :running_job, + job_timeout_ref: timeout_ref, + job_start: job_start, + job_ref: job_ref, + job_id: job["jid"], + job: job } end @@ -200,13 +200,13 @@ defmodule FaktoryWorker.Worker do schedule_fetch(%{ state - | worker_state: :ok, - queues: nil, - job_timeout_ref: nil, - job_start: nil, - job_ref: nil, - job_id: nil, - job: nil + | worker_state: :ok, + queues: nil, + job_timeout_ref: nil, + job_start: nil, + job_ref: nil, + job_id: nil, + job: nil }) end @@ -223,18 +223,21 @@ defmodule FaktoryWorker.Worker do schedule_fetch(%{ state - | worker_state: :ok, - queues: nil, - job_timeout_ref: nil, - job_start: nil, - job_ref: nil, - job_id: nil, - job: nil + | worker_state: :ok, + queues: nil, + job_timeout_ref: nil, + job_start: nil, + job_ref: nil, + job_id: nil, + job: nil }) end defp checkout_queues(%{queues: nil} = state) do - QueueManager.checkout_queues(queue_manager_name(state)) + state + |> queue_manager_name + |> QueueManager.checkout_queues() + |> Enum.shuffle() end defp checkout_queues(%{queues: queues}), do: queues diff --git a/mix.exs b/mix.exs index 2381924..71ca277 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule FaktoryWorker.MixProject do def project do [ app: :faktory_worker, - version: "1.9.5", + version: "1.9.6", elixir: "~> 1.8", description: description(), package: package(), diff --git a/test/faktory_worker/queue_manager_test.exs b/test/faktory_worker/queue_manager_test.exs index 89b6fc4..154d9d4 100644 --- a/test/faktory_worker/queue_manager_test.exs +++ b/test/faktory_worker/queue_manager_test.exs @@ -17,17 +17,17 @@ defmodule FaktoryWorker.QueueManagerTest do test "should set the queues state" do opts = [ name: FaktoryWorker, - worker_pool: [queues: ["test1", {"test2", max_concurrency: 5}, "test3"]] + worker_pool: [size: 10, queues: ["test1", {"test2", max_concurrency: 5}, "test3"]] ] {:ok, pid} = QueueManager.start_link(opts) - state = :sys.get_state(pid) + {_concurrency_per_worker, queues} = :sys.get_state(pid) - assert state == [ - %QueueManager.Queue{name: "test1", max_concurrency: :infinity}, + assert queues == [ + %QueueManager.Queue{name: "test1", max_concurrency: 10}, %QueueManager.Queue{name: "test2", max_concurrency: 5}, - %QueueManager.Queue{name: "test3", max_concurrency: :infinity} + %QueueManager.Queue{name: "test3", max_concurrency: 10} ] end end @@ -42,31 +42,34 @@ defmodule FaktoryWorker.QueueManagerTest do test "should return a list of queues eligible to fetch on" do opts = [ name: FaktoryWorker, - worker_pool: [queues: ["test1", {"test2", max_concurrency: 5}, "test3"]] + worker_pool: [ + size: 10, + queues: ["test1", {"test2", max_concurrency: 5}, {"test3", max_concurrency: 11}] + ] ] {:ok, pid} = QueueManager.start_link(opts) queues = QueueManager.checkout_queues(pid) - assert queues == ["test1", "test2", "test3"] + assert queues == ["test3", "test1", "test2"] end test "should update the max concurrency state when checking out a queue" do opts = [ name: FaktoryWorker, - worker_pool: [queues: ["test1", {"test2", max_concurrency: 5}]] + worker_pool: [size: 10, queues: ["test1", {"test2", max_concurrency: 5}]] ] {:ok, pid} = QueueManager.start_link(opts) queues = QueueManager.checkout_queues(pid) - state = :sys.get_state(pid) + {_concurrency_per_worker, state} = :sys.get_state(pid) assert queues == ["test1", "test2"] assert state == [ - %QueueManager.Queue{name: "test1", max_concurrency: :infinity}, + %QueueManager.Queue{name: "test1", max_concurrency: 9}, %QueueManager.Queue{name: "test2", max_concurrency: 4} ] end @@ -74,21 +77,26 @@ defmodule FaktoryWorker.QueueManagerTest do test "should not return a queue when the max concurrency reaches 0" do opts = [ name: FaktoryWorker, - worker_pool: [queues: ["test1", {"test2", max_concurrency: 1}]] + worker_pool: [ + size: 10, + queues: [{"test1", max_concurrency: 1}, {"test2", max_concurrency: 2}] + ] ] {:ok, pid} = QueueManager.start_link(opts) queues_result1 = QueueManager.checkout_queues(pid) queues_result2 = QueueManager.checkout_queues(pid) + queues_result3 = QueueManager.checkout_queues(pid) - state = :sys.get_state(pid) + {_concurrency_per_worker, state} = :sys.get_state(pid) - assert queues_result1 == ["test1", "test2"] + assert queues_result1 == ["test2"] assert queues_result2 == ["test1"] + assert queues_result3 == ["test2"] assert state == [ - %QueueManager.Queue{name: "test1", max_concurrency: :infinity}, + %QueueManager.Queue{name: "test1", max_concurrency: 0}, %QueueManager.Queue{name: "test2", max_concurrency: 0} ] end @@ -98,17 +106,17 @@ defmodule FaktoryWorker.QueueManagerTest do test "it should update the queues max concurrency counts when checking in a queue with a max concurrency set" do opts = [ name: FaktoryWorker, - worker_pool: [queues: ["test1", {"test2", max_concurrency: 1}]] + worker_pool: [size: 10, queues: ["test1", {"test2", max_concurrency: 1}]] ] {:ok, pid} = QueueManager.start_link(opts) :ok = QueueManager.checkin_queues(pid, ["test1", "test2"]) - state = :sys.get_state(pid) + {_concurrency_per_worker, state} = :sys.get_state(pid) assert state == [ - %QueueManager.Queue{name: "test1", max_concurrency: :infinity}, + %QueueManager.Queue{name: "test1", max_concurrency: 11}, %QueueManager.Queue{name: "test2", max_concurrency: 2} ] end @@ -116,19 +124,87 @@ defmodule FaktoryWorker.QueueManagerTest do test "it should not update the max concurrency value when the queue is not checked in" do opts = [ name: FaktoryWorker, - worker_pool: [queues: ["test1", {"test2", max_concurrency: 1}]] + worker_pool: [size: 10, queues: ["test1", {"test2", max_concurrency: 1}]] ] {:ok, pid} = QueueManager.start_link(opts) :ok = QueueManager.checkin_queues(pid, ["test1"]) - state = :sys.get_state(pid) + {_concurrency_per_worker, state} = :sys.get_state(pid) assert state == [ - %QueueManager.Queue{name: "test1", max_concurrency: :infinity}, + %QueueManager.Queue{name: "test1", max_concurrency: 11}, %QueueManager.Queue{name: "test2", max_concurrency: 1} ] end end + + describe "queue distribution" do + test "it should distribute queues evenly in limited concurrency" do + opts = [ + name: FaktoryWorker, + worker_pool: [ + size: 3, + queues: [ + {"A", max_concurrency: 2}, + {"B", max_concurrency: 3}, + {"C", max_concurrency: 1}, + {"D", max_concurrency: 3} + ] + ] + ] + + {:ok, pid} = QueueManager.start_link(opts) + + queues_result1 = pid |> QueueManager.checkout_queues() |> Enum.sort() + queues_result2 = pid |> QueueManager.checkout_queues() |> Enum.sort() + queues_result3 = pid |> QueueManager.checkout_queues() |> Enum.sort() + queues_result4 = pid |> QueueManager.checkout_queues() |> Enum.sort() + + assert queues_result1 == ["A", "B", "D"] + assert queues_result2 == ["B", "C", "D"] + assert queues_result3 == ["A", "B", "D"] + assert queues_result4 == [] + + :ok = QueueManager.checkin_queues(pid, queues_result1) + + queues_result5 = pid |> QueueManager.checkout_queues() |> Enum.sort() + + assert queues_result1 == queues_result5 + end + + test "every worker will get every queue in infinite concurrency" do + opts = [ + name: FaktoryWorker, + worker_pool: [ + size: 3, + queues: [ + "A", + "B", + "C", + "D" + ] + ] + ] + + {:ok, pid} = QueueManager.start_link(opts) + + queues_result1 = pid |> QueueManager.checkout_queues() |> Enum.sort() + queues_result2 = pid |> QueueManager.checkout_queues() |> Enum.sort() + queues_result3 = pid |> QueueManager.checkout_queues() |> Enum.sort() + queues_result4 = pid |> QueueManager.checkout_queues() |> Enum.sort() + + assert queues_result1 == ["A", "B", "C", "D"] + assert queues_result2 == ["A", "B", "C", "D"] + assert queues_result3 == ["A", "B", "C", "D"] + assert queues_result4 == [] + + :ok = QueueManager.checkin_queues(pid, queues_result1) + + queues_result5 = pid |> QueueManager.checkout_queues() |> Enum.sort() + + assert queues_result1 == queues_result5 + end + end end diff --git a/test/faktory_worker/worker/server_test.exs b/test/faktory_worker/worker/server_test.exs index 518fd81..ea0b1aa 100644 --- a/test/faktory_worker/worker/server_test.exs +++ b/test/faktory_worker/worker/server_test.exs @@ -32,7 +32,7 @@ defmodule FaktoryWorker.Worker.ServerTest do id: :test_worker_1, start: {FaktoryWorker.Worker.Server, :start_link, - [[name: :test_worker_1, connection: [port: 7000]]]}, + [[name: :test_worker_1, connection: [port: 7000]]]}, type: :worker } end @@ -115,11 +115,11 @@ defmodule FaktoryWorker.Worker.ServerTest do pid = start_supervised!({QueueManager, name: FaktoryWorker, worker_pool: worker_pool}) state = %{faktory_name: FaktoryWorker, queues: QueueManager.checkout_queues(pid)} - manager_state_before = :sys.get_state(pid) + {_, manager_state_before} = :sys.get_state(pid) Server.terminate(:shutdown, state) - manager_state_after = :sys.get_state(pid) + {_, manager_state_after} = :sys.get_state(pid) assert manager_state_before == [%QueueManager.Queue{name: "test_queue", max_concurrency: 0}] assert manager_state_after == [%QueueManager.Queue{name: "test_queue", max_concurrency: 1}] diff --git a/test/faktory_worker/worker_test.exs b/test/faktory_worker/worker_test.exs index 4b01680..4b3acbc 100644 --- a/test/faktory_worker/worker_test.exs +++ b/test/faktory_worker/worker_test.exs @@ -988,11 +988,11 @@ defmodule FaktoryWorker.WorkerTest do pid = start_supervised!({QueueManager, name: FaktoryWorker, worker_pool: worker_pool}) state = %{faktory_name: FaktoryWorker, queues: QueueManager.checkout_queues(pid)} - manager_state_before = :sys.get_state(pid) + {_, manager_state_before} = :sys.get_state(pid) Worker.checkin_queues(state) - manager_state_after = :sys.get_state(pid) + {_, manager_state_after} = :sys.get_state(pid) assert manager_state_before == [%QueueManager.Queue{name: "test_queue", max_concurrency: 0}] assert manager_state_after == [%QueueManager.Queue{name: "test_queue", max_concurrency: 1}] @@ -1004,11 +1004,11 @@ defmodule FaktoryWorker.WorkerTest do pid = start_supervised!({QueueManager, name: FaktoryWorker, worker_pool: worker_pool}) state = %{faktory_name: FaktoryWorker, queues: nil} - manager_state_before = :sys.get_state(pid) + {_, manager_state_before} = :sys.get_state(pid) :ok = Worker.checkin_queues(state) - manager_state_after = :sys.get_state(pid) + {_, manager_state_after} = :sys.get_state(pid) assert manager_state_before == [%QueueManager.Queue{name: "test_queue", max_concurrency: 1}] assert manager_state_after == [%QueueManager.Queue{name: "test_queue", max_concurrency: 1}]