Skip to content

Commit

Permalink
fix queue distribution
Browse files Browse the repository at this point in the history
  • Loading branch information
criticalbh committed Oct 10, 2023
1 parent eff622f commit 3450de1
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 66 deletions.
3 changes: 3 additions & 0 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
erlang 25.3
elixir 1.14.4
nodejs 18.12.1
56 changes: 38 additions & 18 deletions lib/faktory_worker/queue_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,41 @@ defmodule FaktoryWorker.QueueManager do
def start_link(opts) do
name = opts[:name]
pool_opts = Keyword.get(opts, :worker_pool, [])
pool_size = Keyword.get(pool_opts, :size, 10)
queues = Keyword.get(pool_opts, :queues, ["default"])
state = Enum.map(queues, &map_queues/1)

Agent.start_link(fn -> state end, name: format_queue_manager_name(name))
queues = Enum.map(queues, fn queue -> map_queues(queue, pool_size) end)
concurrency_per_worker = calc_concurrency_per_worker(pool_size, queues)

Agent.start_link(fn -> {concurrency_per_worker, queues} end,
name: format_queue_manager_name(name)
)
end

@spec checkout_queues(queue_manager_name :: atom()) :: list(String.t())
def checkout_queues(queue_manager_name) do
Agent.get_and_update(queue_manager_name, fn queues ->
queues
|> Enum.map_reduce([], &map_queue_to_fetch/2)
|> format_queues_to_fetch()
Agent.get_and_update(queue_manager_name, fn {concurrency_per_worker, queues} ->
sorted_by_concurrency = Enum.sort_by(queues, & &1.max_concurrency, :desc)

{resp, state} =
sorted_by_concurrency
|> Enum.take(concurrency_per_worker)
|> Enum.map_reduce([], &map_queue_to_fetch/2)
|> format_queues_to_fetch()

state_queues =
sorted_by_concurrency
|> Enum.drop(concurrency_per_worker)
|> Enum.concat(state)

{resp, {concurrency_per_worker, state_queues}}
end)
end

@spec checkin_queues(queue_manager_name :: atom(), queues :: list(String.t())) :: :ok
def checkin_queues(queue_manager_name, queues) do
Agent.cast(queue_manager_name, fn state_queues ->
Enum.map(state_queues, &update_checkin_queues(&1, queues))
Agent.cast(queue_manager_name, fn {concurrency_per_worker, state_queues} ->
{concurrency_per_worker, Enum.map(state_queues, &update_checkin_queues(&1, queues))}
end)

:ok
Expand All @@ -42,19 +58,15 @@ defmodule FaktoryWorker.QueueManager do
:"#{name}_queue_manager"
end

defp map_queues(queue) when is_binary(queue) do
%Queue{name: queue, max_concurrency: :infinity}
defp map_queues(queue, pool_size) when is_binary(queue) do
%Queue{name: queue, max_concurrency: pool_size}
end

defp map_queues({queue, opts}) when is_binary(queue) do
max_concurrency = Keyword.get(opts, :max_concurrency, :infinity)
defp map_queues({queue, opts}, pool_size) when is_binary(queue) do
max_concurrency = Keyword.get(opts, :max_concurrency, pool_size)
%Queue{name: queue, max_concurrency: max_concurrency}
end

defp map_queue_to_fetch(%{max_concurrency: :infinity} = queue, acc) do
{queue.name, [queue | acc]}
end

defp map_queue_to_fetch(%{max_concurrency: 0} = queue, acc) do
{nil, [queue | acc]}
end
Expand All @@ -65,8 +77,6 @@ defmodule FaktoryWorker.QueueManager do
{queue.name, [queue | acc]}
end

defp update_checkin_queues(%{max_concurrency: :infinity} = queue, _), do: queue

defp update_checkin_queues(queue, checkin_queues) do
if Enum.member?(checkin_queues, queue.name) do
%{queue | max_concurrency: queue.max_concurrency + 1}
Expand All @@ -80,4 +90,14 @@ defmodule FaktoryWorker.QueueManager do
state = Enum.reverse(state)
{queues, state}
end

defp calc_concurrency_per_worker(pool_size, queues) do
sum_max_concurrency = queues |> Enum.map(& &1.max_concurrency) |> Enum.sum()
queues_per_worker = round(sum_max_concurrency / pool_size)

case queues_per_worker < 1 do
true -> 1
false -> queues_per_worker
end
end
end
45 changes: 24 additions & 21 deletions lib/faktory_worker/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,12 @@ defmodule FaktoryWorker.Worker do

%{
state
| worker_state: :running_job,
job_timeout_ref: timeout_ref,
job_start: job_start,
job_ref: job_ref,
job_id: job["jid"],
job: job
| worker_state: :running_job,
job_timeout_ref: timeout_ref,
job_start: job_start,
job_ref: job_ref,
job_id: job["jid"],
job: job
}
end

Expand Down Expand Up @@ -200,13 +200,13 @@ defmodule FaktoryWorker.Worker do

schedule_fetch(%{
state
| worker_state: :ok,
queues: nil,
job_timeout_ref: nil,
job_start: nil,
job_ref: nil,
job_id: nil,
job: nil
| worker_state: :ok,
queues: nil,
job_timeout_ref: nil,
job_start: nil,
job_ref: nil,
job_id: nil,
job: nil
})
end

Expand All @@ -223,18 +223,21 @@ defmodule FaktoryWorker.Worker do

schedule_fetch(%{
state
| worker_state: :ok,
queues: nil,
job_timeout_ref: nil,
job_start: nil,
job_ref: nil,
job_id: nil,
job: nil
| worker_state: :ok,
queues: nil,
job_timeout_ref: nil,
job_start: nil,
job_ref: nil,
job_id: nil,
job: nil
})
end

defp checkout_queues(%{queues: nil} = state) do
QueueManager.checkout_queues(queue_manager_name(state))
state
|> queue_manager_name
|> QueueManager.checkout_queues()
|> Enum.shuffle()
end

defp checkout_queues(%{queues: queues}), do: queues
Expand Down
116 changes: 96 additions & 20 deletions test/faktory_worker/queue_manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ defmodule FaktoryWorker.QueueManagerTest do
test "should set the queues state" do
opts = [
name: FaktoryWorker,
worker_pool: [queues: ["test1", {"test2", max_concurrency: 5}, "test3"]]
worker_pool: [size: 10, queues: ["test1", {"test2", max_concurrency: 5}, "test3"]]
]

{:ok, pid} = QueueManager.start_link(opts)

state = :sys.get_state(pid)
{_concurrency_per_worker, queues} = :sys.get_state(pid)

assert state == [
%QueueManager.Queue{name: "test1", max_concurrency: :infinity},
assert queues == [
%QueueManager.Queue{name: "test1", max_concurrency: 10},
%QueueManager.Queue{name: "test2", max_concurrency: 5},
%QueueManager.Queue{name: "test3", max_concurrency: :infinity}
%QueueManager.Queue{name: "test3", max_concurrency: 10}
]
end
end
Expand All @@ -42,53 +42,61 @@ defmodule FaktoryWorker.QueueManagerTest do
test "should return a list of queues eligible to fetch on" do
opts = [
name: FaktoryWorker,
worker_pool: [queues: ["test1", {"test2", max_concurrency: 5}, "test3"]]
worker_pool: [
size: 10,
queues: ["test1", {"test2", max_concurrency: 5}, {"test3", max_concurrency: 11}]
]
]

{:ok, pid} = QueueManager.start_link(opts)

queues = QueueManager.checkout_queues(pid)

assert queues == ["test1", "test2", "test3"]
assert queues == ["test3", "test1", "test2"]
end

test "should update the max concurrency state when checking out a queue" do
opts = [
name: FaktoryWorker,
worker_pool: [queues: ["test1", {"test2", max_concurrency: 5}]]
worker_pool: [size: 10, queues: ["test1", {"test2", max_concurrency: 5}]]
]

{:ok, pid} = QueueManager.start_link(opts)

queues = QueueManager.checkout_queues(pid)
state = :sys.get_state(pid)
{_concurrency_per_worker, state} = :sys.get_state(pid)

assert queues == ["test1", "test2"]

assert state == [
%QueueManager.Queue{name: "test1", max_concurrency: :infinity},
%QueueManager.Queue{name: "test1", max_concurrency: 9},
%QueueManager.Queue{name: "test2", max_concurrency: 4}
]
end

test "should not return a queue when the max concurrency reaches 0" do
opts = [
name: FaktoryWorker,
worker_pool: [queues: ["test1", {"test2", max_concurrency: 1}]]
worker_pool: [
size: 10,
queues: [{"test1", max_concurrency: 1}, {"test2", max_concurrency: 2}]
]
]

{:ok, pid} = QueueManager.start_link(opts)

queues_result1 = QueueManager.checkout_queues(pid)
queues_result2 = QueueManager.checkout_queues(pid)
queues_result3 = QueueManager.checkout_queues(pid)

state = :sys.get_state(pid)
{_concurrency_per_worker, state} = :sys.get_state(pid)

assert queues_result1 == ["test1", "test2"]
assert queues_result1 == ["test2"]
assert queues_result2 == ["test1"]
assert queues_result3 == ["test2"]

assert state == [
%QueueManager.Queue{name: "test1", max_concurrency: :infinity},
%QueueManager.Queue{name: "test1", max_concurrency: 0},
%QueueManager.Queue{name: "test2", max_concurrency: 0}
]
end
Expand All @@ -98,37 +106,105 @@ defmodule FaktoryWorker.QueueManagerTest do
test "it should update the queues max concurrency counts when checking in a queue with a max concurrency set" do
opts = [
name: FaktoryWorker,
worker_pool: [queues: ["test1", {"test2", max_concurrency: 1}]]
worker_pool: [size: 10, queues: ["test1", {"test2", max_concurrency: 1}]]
]

{:ok, pid} = QueueManager.start_link(opts)

:ok = QueueManager.checkin_queues(pid, ["test1", "test2"])

state = :sys.get_state(pid)
{_concurrency_per_worker, state} = :sys.get_state(pid)

assert state == [
%QueueManager.Queue{name: "test1", max_concurrency: :infinity},
%QueueManager.Queue{name: "test1", max_concurrency: 11},
%QueueManager.Queue{name: "test2", max_concurrency: 2}
]
end

test "it should not update the max concurrency value when the queue is not checked in" do
opts = [
name: FaktoryWorker,
worker_pool: [queues: ["test1", {"test2", max_concurrency: 1}]]
worker_pool: [size: 10, queues: ["test1", {"test2", max_concurrency: 1}]]
]

{:ok, pid} = QueueManager.start_link(opts)

:ok = QueueManager.checkin_queues(pid, ["test1"])

state = :sys.get_state(pid)
{_concurrency_per_worker, state} = :sys.get_state(pid)

assert state == [
%QueueManager.Queue{name: "test1", max_concurrency: :infinity},
%QueueManager.Queue{name: "test1", max_concurrency: 11},
%QueueManager.Queue{name: "test2", max_concurrency: 1}
]
end
end

describe "queue distribution" do
test "it should distribute queues evenly in limited concurrency" do
opts = [
name: FaktoryWorker,
worker_pool: [
size: 3,
queues: [
{"A", max_concurrency: 2},
{"B", max_concurrency: 3},
{"C", max_concurrency: 1},
{"D", max_concurrency: 3}
]
]
]

{:ok, pid} = QueueManager.start_link(opts)

queues_result1 = pid |> QueueManager.checkout_queues() |> Enum.sort()
queues_result2 = pid |> QueueManager.checkout_queues() |> Enum.sort()
queues_result3 = pid |> QueueManager.checkout_queues() |> Enum.sort()
queues_result4 = pid |> QueueManager.checkout_queues() |> Enum.sort()

assert queues_result1 == ["A", "B", "D"]
assert queues_result2 == ["B", "C", "D"]
assert queues_result3 == ["A", "B", "D"]
assert queues_result4 == []

:ok = QueueManager.checkin_queues(pid, queues_result1)

queues_result5 = pid |> QueueManager.checkout_queues() |> Enum.sort()

assert queues_result1 == queues_result5
end

test "every worker will get every queue in infinite concurrency" do
opts = [
name: FaktoryWorker,
worker_pool: [
size: 3,
queues: [
"A",
"B",
"C",
"D"
]
]
]

{:ok, pid} = QueueManager.start_link(opts)

queues_result1 = pid |> QueueManager.checkout_queues() |> Enum.sort()
queues_result2 = pid |> QueueManager.checkout_queues() |> Enum.sort()
queues_result3 = pid |> QueueManager.checkout_queues() |> Enum.sort()
queues_result4 = pid |> QueueManager.checkout_queues() |> Enum.sort()

assert queues_result1 == ["A", "B", "C", "D"]
assert queues_result2 == ["A", "B", "C", "D"]
assert queues_result3 == ["A", "B", "C", "D"]
assert queues_result4 == []

:ok = QueueManager.checkin_queues(pid, queues_result1)

queues_result5 = pid |> QueueManager.checkout_queues() |> Enum.sort()

assert queues_result1 == queues_result5
end
end
end
Loading

0 comments on commit 3450de1

Please sign in to comment.