Not enough resources but still need to get things done? Welcome to the Workforce - get on track and Godspeed! you! to! management! (this introduction could use some more exclamation marks! there! fixed it!)
Workforce is a fast, reliable and customisable generic worker pool for Erlang/Elixir.
% #{
% sup_name => {local, workforce_supervisor},
% watcher_name => {local, workforce_watcher},
% pool_name => {local, workforce},
% worker => nil,
% default_workers => 20,
% max_workers => 25,
% max_queue => 0,
% hibernate_when_free => false,
% shrink_timeout => {5000, 10000}
% } % defaults
Config = #{worker => {my_nifty_worker, start_link, []}, default_workers => 5, max_workers => 6},
{ok, _Sup_Pid} = workforce_supervisor:start_link(Config),
{ok, Worker1} = workforce:checkout(workforce),
Worker1 ! {work_you_lazy_bum, 1},
receive {Worker1, done} end,
workforce:checkin(workforce, Worker1).
{deps, [{workforce, "0.1.0"}]}.
Add to your deps:
defp deps do
[
# ...
{:workforce, "~> 0.1.0"}
]
end
To use workforce correctly start it as part of a supervision tree and pass it a map containing at least the key worker
set to the {module, function, arguments_list}
that starts your worker. Optionally if you want to use the defaults you can pass directly the tuple to start_link workforce_supervisor:start_link({module, function, args}
. To customise the options use the map form and include any of the following keys:
- sup_name/watcher_name/pool_name: how each of the elements are going to be registered;
- default_workers: Number of workers to start by default;
- max_workers: Number of maximum workers, >= than default_workers, if higher, then an additional up to N = (Maximum - Default) workers will be started whenever the pooler requires to answer requests;
- max_queue: If the pooler is allowed to enqueue requests when no worker is available, 0 means no, -1 means yes in an unbound queue, N > 0 means yes up to N;
- hibernate_when_free - if the pooler should trigger an hibernate/gc whenever the number of available workers == the number of active workers (impacts performance);
- shrink_timeout - If max_workers > than default_workers, whenever an extra worker is started how much should it wait to check if a worker is no longer necessary (the defaults are quite okey for about everything - this option is mostly to ease testing). Can be passed as a positive integer (same timeout always) or a tuple {positive_integer, positive_integer}, where the first element is the first timeout from creation, and the second is all subsequent timeouts. A worker is decomissioned when the number of Active workers is > than the Default Workers, and the Number of Available Workers is > than the Default/5, checked on the specified timeout intervals.
The workforce API has 3 different kinds of checkout interfaces for different use cases.
This is the normal form and should be used whenever both the requester and the pooler are on the same node.
{ok, Worker1} = workforce:checkout(Pid_or_name_of_pool),
%defaults to: workforce:checkout(Pid_or_name, 5000, false)
Timeout = 3000 % a positive integer or infinity, use infinity with care
{ok, Worker2} = workforce:checkout(Pid_or_name_of_pool, Timeout),
ForceEnqueue = true, % default is false, forces this request to be enqueued independently of the pooler having the queue enabled or not
{ok, Worker3} = workforce:checkout(Pid_or_name_of_pool, 5000, true).
This form should be used whenever the processes requesting work from the pooler may be on different nodes from the pooler. Underneath, if both the caller and the pooler are on the same node it uses the regular checkout, if they're not then it uses a gen_statem:call
- this means it can raise any call
exception and you should take that into account. This mode doesn't allow for requests to be enqueued.
{ok, Worker1} = workforce:remote_checkout(Pid_or_name_of_pool).
% defaults to: workforce:remote_checkout(Pid_or_name, 5000)
To always force the remote path even when in the same node use workforce:remote_checkout(Pid_or_name, Timeout, force)
.
This form is for when you want full control of the request, e.g., you want to request workers from inside OTP behavioured processes without blocking. In this form you need to do mostly everything by yourself, in exchange it is completely asynchronous and you receive messages (use with care on distributed settings due to send and pray semantics - Amen!).
Mref = workforce:async_checkout(Pool_pid),
receive {Mref, overloaded} -> ok end,
demonitor(Mref, [flush]),
Mref2 = workforce:async_checkout(Pool_pid),
receive {Mref2, enqueued} -> ok end,
receive {Mref2, Worker1} when is_pid(Worker1) ->
workforce:async_confirm(Pool_pid, Mref2),
% do work
workforce:checkin(Pool_pid, Worker1),
demonitor(Mref2, [flush])
end.
A more complete example is further down.
workforce:checkin(Pool_pid_or_name, Worker_pid).
#{
state := State, % starting | ready
queue := Queue, % queue:queue()
pool := Pool, % queue:queue()
q_count := Qc, % non_neg_integer() - Number of active requests enqueued
p_count := Pc, % non_neg_integer() - Number of available workers
q_track := Qtrack, % map() - Mapping of enqueued requests #{From => 0 | 1} (1 cancelled)
active := Active, % non_neg_integer() - Number of Running workers
starting := Starting, % non_neg_integer() - Number of Workers currently starting
max_w := Max_w, % non_neg_integer() - Max number of allowed workers
default_w := Def_w, % non_neg_integer() - Default number of workers
max_q := Max_q, % integer() - -1 Unbound queue, 0 no queue, N > 0 Max N queue
monitors := Monitors, % map() - Worker monitors refs
checked_out := Cout, % map() - Currently checked out Worker mappings
in_transit := It, % map() - Current workers currently checked out but not yet aknowledged as received,
requests_monitors := Req_mons, % map() - Requester monitors refs mappings
watcher := Watcher, % pid() - Pid of the watcher process gen_server
ets := Ets, % reference() - ETS table reference for overload prechecks
overloaded := O, % 0 | 1 - Current internal state, 0 not overloaded, 1 overloaded
hibernate_when_free := HWF, % false | true - if should hibernate when totally free
shrink_timeout := St % non_neg_integer | {nni, nni} - Intervals to decomission each additionally started worker
} = workforce:state_as_map(Pool_pid_or_name).
Any of the API calls accepts the Pool name to be given as:
Pid :: pid()
Name :: atom() (when registered locally at start up)
{local, Name :: atom()}
{global, Term :: term()}
{via, Module :: atom(), Term :: term()}
workforce:checkout/1/2/3
(Pid_or_name, Timeout :: timeout() - defaults to 5000, ForceEnqueue :: boolean() - defaults to false)
% possible results
{error, timeout}
{error, overloaded}
{error, {pool_down, Reason}}
{error, {no_such_pool, Name}}
{ok, Pid}
workforce:remote_checkout/1/2/3
(Pid_or_name, Timeout :: timeout(), force)
% possible results
{error, overloaded}
{error, {no_such_pool, Name}}
{ok, Pid}
call exception
workforce:async_checkout/1/2
(Pid_or_name, ForceEnqueue :: boolean())
% possible results
reference()
{error, {no_such_pool, Name}}
Which can then be used to match incoming messages related to the request, the calling process may receive any of the following:
{reference(), overloaded}
{reference(), enqueued}
{reference(), Pid} when is_pid(Pid)
{'DOWN', reference(), _, Pool_pid, Reason}
On receiving a worker, the caller process needs to acknowledge it received the worker so the pooler can monitor the caller in order to return the worker in case of the caller crashing, with:
workforce:async_confirm(Pool_pid_or_name, Reference).
In case you want to cancel a request you can use:
workforce:async_cancel(Pool_pid_or_name, Reference).
This will also demonitor the Reference.
If you don't call async_cancel you need to demonitor it yourself after receiving the Worker pid.
If you receive {reference(), enqueued}
this means you will eventually either receive another {reference(), Pid}
when your time reaches in the queue, or {'DOWN'......}
if the pooler dies in the meanwhile.
workforce:checkin/2
(Pid_or_name, Worker_pid)
%possible results
ok
workforce:synchronous_checkin/2
(Pid_or_name, Worker :: pid(), Timeout :: timeout() - defaults to 5000)
%possible results
ok | {error, no_such_pool()} | no_return()
checkin/2
does a cast while synchronous_checkin/2,3
does a call. This can be useful if you want to implement the logic for checkin on the worker process itself (e.g. after doing wtv it checks itself in) since there you can use an infinity
timeout and it will either succeed or raise, meaning the worker gets back to the pool or dies (subsequently starting another one if needed).
The max_queue value should be used reasonably, usually a low number equal to the number of workers tends to improve the performance, while an unbound queue or a large queue might slow it down if the timeouts are short relative to its size and the average time a worker remains checked out. It all depends though and you should benchmark your particular scenarios to see if it improves or worsens. You can always force specific requests to be enqueued when checking out even if the queue is set to 0, the same if it's not set to 0 but full already, a forced checkout will always enqueue no matter what. This means you can have a queue for certain "regular" requests that is respected by these requests and then always force some specific requests to be enqueued.
The workforce_watcher is responsible for starting workers. Right now it's not a supervisor so a worker that repeatedly crashes will go unnoticed, meaning it won't trigger a supervisor failure. This might change in the future if requested.
It calls apply(M, F, A) and always links to the worker. On an controlled shutdown it will tell all workers to exit with shutdown
, and the same when an extra worker is to be removed.
-module(async_example).
-behaviour(gen_server).
-export([start_link/1, init/1, handle_continue/2, handle_call/3, handle_cast/2, handle_info/2]).
-record(state, {
refs = #{} :: map(),
workers = #{} :: map(),
monitors = #{} :: map(),
config = #{} :: map(),
}
).
start_link(Config)->
gen_server:start_link(?MODULE, Config, []).
init(Config)->
{ok, #state{config = Config}}.
handle_call({start_work, Data}, {Pid, _}, #state{refs = Refs} = State)->
Mref = workforce:async_checkout(workforce, true),
N_refs = maps:put(Mref, {Pid, waiting, Data}, Refs),
erlang:send_after(15000, self(), {Mref, timeout}),
{reply, {ok, Mref}, State#state{refs = N_refs}};
handle_call({Mref, finished, Results}, {Pid, _}, #state{monitors = Mons, workers = Workers} = State) ->
{{P_ref, Mref, Worker_pid, _}, N_mons} = maps:take(Pid, Mons),
{{Mref, Original_caller, _}, N_workers} = maps:take(Worker_pid, Workers),
ets:insert(some_public_ets, Results),
workforce:checkin(workforce, Worker_pid),
demonitor(P_ref, [flush]),
Original_caller ! {finished, Mref},
{reply, ok, State#state{monitors = N_mons, workers = N_workers}}.
handle_info({Mref, enqueued}, #state{refs = Refs} = State) when is_map_key(Mref, Refs) ->
N_refs = maps:update_with(Mref, fun({Pid, _, Data})-> {Pid, enqueued, Data} end, Refs),
{noreply, State#state{refs = N_refs}};
handle_info({Mref, Pid}, #state{refs = Refs, workers = Workers} = State) when is_pid(Pid), is_map_key(Mref, Refs) ->
{{Original_caller, _, Data}, N_refs} = maps:take(Mref, Refs),
N_workers = maps:put(Pid, {Mref, Original_caller, Data}, Workers),
workforce:async_confirm(workforce, Mref),
demonitor(Mref, [flush])
Original_caller ! {starting_work, Mref},
{noreply, State#state{refs = N_refs, workers = N_workers}, {continue, {work, Pid}}};
handle_info({Mref, overloaded}, #state{refs = Refs} = State) when is_map_key(Mref, Refs) ->
{{Original_caller, _, _}, N_refs} = maps:take(Mref, Refs),
Original_caller ! {failed, Mref},
{noreply, State#state{refs = N_refs}};
handle_info({Mref, timeout}, #state{refs = Refs} = State) when is_map_key(Mref, Refs) ->
{{Original_caller, _, _}, N_refs} = maps:take(Mref, Refs),
workforce:async_cancel(workforce, Mref),
Original_caller ! {failed, Mref},
{noreply, State#state{refs = N_refs}};
handle_info(_, State)-> {noreply, State}.
handle_continue({work, Pid}, #state{workers = Workers, monitors = Mons} = State) ->
{Mref, _, Data} = maps:get(Pid, Workers),
{P_pid, P_ref} = spawn_monitor(some_module, do_work, [Mref, self(), Pid, Data]),
N_mons = maps:put(P_pid, {P_ref, Mref, Pid, Data}, Mons),
{noreply, State#state{monitors = N_mons}}.
handle_cast(_, State) -> {noreply, State}.
This is a fairly contrived example, because there's no reason to be async, but there could be, it's just to illustrate how it can be used, there's missing handles for dealing with processes going down, etc...
There's some comparisons with poolboy in the benchmarking
folder, Read, and also the module used to run them. If you want to run them yourself you'll need to create a folder, copy all modules from /src into it, add the poolboy modules and then you can try it out from an erl shell. I'll try to checkout other pooler implementations to compare and add there. Those benchmarks were done with the first version that had an overload timeout of 50ms. In the meanwhile it was reduced and it's now faster but I haven't redone them.
- Maybe switch the watcher implementation to be a supervisor instead of a gen_server;
- Add common tests to test the various checkouts when requester node is different from the workforce node
© rooster image in the cocktail logo
Copyright [2019] [Micael Nussbaumer]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use the files contained in this library except in
compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.