Add types and docs to rebar_parallel
ferd committed Jan 26, 2023
1 parent 0b99c8f commit b23b689
Showing 3 changed files with 151 additions and 25 deletions.
4 changes: 2 additions & 2 deletions apps/rebar/src/rebar_compiler_dag.erl
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ populate_sources(G, Compiler, InDirs, [Source|Erls], DepOpts, Pool) ->
LastModified when LastUpdated < LastModified ->
digraph:add_vertex(G, Source, LastModified),
Work = fun() -> {old, prepopulate_deps(Compiler, InDirs, Source, DepOpts)} end,
rebar_parallel:pool_task(Pool, Work),
rebar_parallel:pool_task_async(Pool, Work),
populate_sources(G, Compiler, InDirs, Erls, DepOpts, Pool);
_ -> % unchanged
populate_sources(G, Compiler, InDirs, Erls, DepOpts, Pool)
Expand All @@ -188,7 +188,7 @@ populate_sources(G, Compiler, InDirs, [Source|Erls], DepOpts, Pool) ->
LastModified = filelib:last_modified(Source),
digraph:add_vertex(G, Source, LastModified),
Work = fun() -> {new, prepopulate_deps(Compiler, InDirs, Source, DepOpts)} end,
rebar_parallel:pool_task(Pool, Work),
rebar_parallel:pool_task_async(Pool, Work),
populate_sources(G, Compiler, InDirs, Erls, DepOpts, Pool)

Expand Down
164 changes: 145 additions & 19 deletions apps/rebar/src/rebar_parallel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,38 @@
%%% Original design by Max Fedorov in the rebar compiler, then generalised
%%% and extracted here to be reused in other circumstances.
%%% It also contains an asynchronous version of the queue built with a
%%% naive pool, which allows similar semantics in worker definitions, but
%%% without demanding to know all the tasks to run ahead of time.
%%% @end
pool/4, pool/5, pool_task/2, pool_terminate/1]).
pool/4, pool/5, pool_task_async/2, pool_terminate/1]).

%% @doc Create a queue using as many workers as there are schedulers,
%% that will spread all `Task' entries among them based on whichever
%% is available first.
%% The value returned by the worker function `WorkF' for each task
%% is then passed to a `Handler' which either discards its result
%% after having done a side-effectful operation (by returning `ok')
%% as in a `lists:foreach/2' call, or returns a value that gets
%% added to an accumulator (by returning `{ok, Val}'). The handler
%% can return both types as required.
%% The accumulated list of values is in no specific order and depends
%% on how tasks were scheduled and completed.
-spec queue([Task], WorkF, WArgs, Handler, HArgs) -> [Ret] when
Task :: term(),
WorkF :: fun((Task, WArgs) -> TmpRet),
WArgs :: term(),
Handler :: fun((TmpRet, HArgs) -> NoRet | AccVal),
HArgs :: term(),
NoRet :: ok,
AccVal :: {ok, Ret},
Ret :: term().
queue(Tasks, WorkF, WArgs, Handler, HArgs) ->
Parent = self(),
Worker = fun() -> worker(Parent, WorkF, WArgs) end,
Expand All @@ -19,25 +45,90 @@ queue(Tasks, WorkF, WArgs, Handler, HArgs) ->
Pids = [spawn_monitor(Worker) || _ <- lists:seq(1, Jobs)],
parallel_dispatch(Tasks, Pids, Handler, HArgs).

%% @doc Create a pool using as many workers as there are schedulers,
%% and for which tasks can be added by calling `pool_task_async/2'.
%% The value returned by the worker function `WorkF' for each task
%% is then passed to a `Handler' which either discards its result
%% after having done a side-effectful operation (by returning `ok')
%% as in a `lists:foreach/2' call, or returns a value that gets
%% added to an accumulator (by returning `{ok, Val}'). The handler
%% can return both types as required.
%% The accumulated list of values is in no specific order and depends
%% on how tasks were scheduled and completed, and can only
%% be obtained by calling `pool_terminate/1'.
%% The pool process is linked to its initial caller and will error
%% out via a link if any task crashes or other invalid states are found.
-spec pool(WorkF, WArgs, Handler, HArgs) -> pid() when
WorkF :: fun((Task, WArgs) -> TmpRet),
Task :: term(),
WArgs :: term(),
Handler :: fun((TmpRet, HArgs) -> NoRet | AccVal),
HArgs :: term(),
NoRet :: ok,
AccVal :: {ok, term()}.
pool(WorkF, WArgs, Handler, HArgs) ->
pool(WorkF, WArgs, Handler, HArgs, erlang:system_info(schedulers)).

pool(WorkF, WArgs, Handler, HArgs, Jobs) ->
%% @doc Create a pool using `PoolSize' workers and for which tasks can be
%% added by calling `pool_task_async/2'.
%% The value returned by the worker function `WorkF' for each task
%% is then passed to a `Handler' which either discards its result
%% after having done a side-effectful operation (by returning `ok')
%% as in a `lists:foreach/2' call, or returns a value that gets
%% added to an accumulator (by returning `{ok, Val}'). The handler
%% can return both types as required.
%% The accumulated list of values is in no specific order and depends
%% on how tasks were scheduled and completed, and can only
%% be obtained by calling `pool_terminate/1'.
%% The pool process is linked to its initial caller and will error
%% out via a link if any task crashes or other invalid states are found.
-spec pool(WorkF, WArgs, Handler, HArgs, PoolSize) -> pid() when
WorkF :: fun((Task, WArgs) -> TmpRet),
Task :: term(),
WArgs :: term(),
Handler :: fun((TmpRet, HArgs) -> NoRet | AccVal),
HArgs :: term(),
PoolSize :: pos_integer(),
NoRet :: ok,
AccVal :: {ok, term()}.
pool(WorkF, WArgs, Handler, HArgs, PoolSize) when PoolSize > 0 ->
Parent = self(),
Coordinator = spawn_link(fun() ->
Coord = self(),
Worker = fun() -> worker(Coord, WorkF, WArgs) end,
Pids = [spawn_monitor(Worker) || _ <- lists:seq(1, Jobs)],
Pids = [spawn_monitor(Worker) || _ <- lists:seq(1, PoolSize)],
Parent ! pool_ready,
pool_coordinator([], [], Pids, Handler, HArgs, [], undefined)
receive pool_ready -> ok end,

pool_task(Pid, Task) ->
%% @doc Add a task to a pool.
%% This call is asynchronous and does no validation about whether the pool
%% process exists or not. If the pool has already been terminated or is
%% in the process of being terminated, the task may trigger the pool to
%% abort and error out to point out invalid usage.
-spec pool_task_async(pid(), term()) -> ok.
pool_task_async(Pid, Task) ->
Pid ! {task, Task},

%% @doc Mark the pool as terminated. At this point it will stop
%% accepting new tasks but will keep processing those that have been
%% scheduled.
%% Once all tasks are complete and workers have shut down, the
%% accumulated value will be returned.
%% Any process may terminate the pool, and the pool may only be
%% terminated once.
-spec pool_terminate(pid()) -> [term()].
pool_terminate(Pid) ->
Ref = erlang:monitor(process, Pid),
Pid ! {self(), terminate},
Expand All @@ -49,6 +140,15 @@ pool_terminate(Pid) ->

%%% INTERNAL %%%

%%% Queue implementation %%%
%% @private the queue is rather straightforward. `Targets' represents the tasks
%% yet to be run, which are sent to workers in `Pids' as they mark themselves
%% as free. When workers are idle and no tasks are left, they are shut down.
%% Once the shutdown is complete, the result is returned.
parallel_dispatch([], [], _, _) ->
parallel_dispatch(Targets, Pids, Handler, Args) ->
Expand All @@ -75,45 +175,55 @@ parallel_dispatch(Targets, Pids, Handler, Args) ->

worker(QueuePid, F, Args) ->
QueuePid ! {ready, self()},
{task, Task} ->
QueuePid ! {result, F(Task, Args)},
worker(QueuePid, F, Args);
empty ->

%%% Pool implementation %%%
%% @private The pool supports asynchronous task addition, which makes it
%% significantly hairier than the task queue. It uses `Tasks' to track
%% enqueued tasks, `FreePids' to track the workers that currently do not
%% have work to do, `Pids' to track all workers (and know what remains to
%% be shut down at the end), an accumulator (`Acc') for results that must
%% be returned on-demand, and a `Report' value that is either `undefined'
%% or marks the Pid of the process calling for a report.
%% Tasks and free processes can grow individually at different times, and
%% we only demand that workers start shutting down once a `Report' entry
%% has been defined, which returns once tasks that were in flight have all
%% terminated.
pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Report) ->
{task, Task} when Report =/= undefined ->
?ERROR("Task added to pool after being terminated: ~p", [Task]),
{task, Task} when FreePids =:= [] ->
%% no worker is free, enqueue.
pool_coordinator([Task|Tasks], FreePids, Pids, Handler, HArgs, Acc, Report);
{task, Task} ->
%% workers are free, assign right away
[Pid|NewFree] = FreePids,
Pid ! {task, Task},
pool_coordinator(Tasks, NewFree, Pids, Handler, HArgs, Acc, Report);
{ready, _Pid} when Tasks =:= [], Report =/= undefined ->
%% And we're done
%% terminate workers async, return results if done
%% And we're done!
%% terminate workers async, and wait for their shutdown
[Pid ! empty || {Pid,_Ref} <- Pids],
pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Report);
{ready, Pid} when Tasks =:= [] ->
%% worker free, no tasks to run, put in the free list
pool_coordinator(Tasks, [Pid|FreePids], Pids, Handler, HArgs, Acc, Report);
{ready, Pid} ->
%% worker free, tasks are available, assign right away
[Task|NewTasks] = Tasks,
Pid ! {task, Task},
pool_coordinator(NewTasks, FreePids, Pids, Handler, HArgs, Acc, Report);
{'DOWN', Mref, _, Pid, normal} ->
%% worker terminated as expected
NewPids = lists:delete({Pid, Mref}, Pids),
NewFree = lists:delete(Pid, FreePids),
case NewPids of
[] when is_pid(Report) ->
%% shutdown complete!
Report ! {self(), Acc};
_ ->
%% still shutting down
pool_coordinator(Tasks, NewFree, NewPids, Handler, HArgs, Acc, Report)
{'DOWN', _Mref, _, _Pid, Info} ->
Expand All @@ -126,13 +236,29 @@ pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Report) ->
{ok, Res} ->
pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, [Res|Acc], Report)
{Caller, terminate} ->
{Caller, terminate} when Report =:= undefined ->
%% We're being asked to return results!
if Pids =:= []; % no workers somehow
length(Pids) =:= length(FreePids), Tasks =:= [] -> % All Idle
length(Pids) =:= length(FreePids), Tasks =:= [] -> % All Idle, no work to do
Caller ! {self(), Acc};
true ->
%% Still work to do
pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Caller)
{Caller, terminate} when is_pid(Report) ->
?ERROR("Another process (~p) already terminates the pool, demand from ~p is invalid",
[Report, Caller]),

%%% Shared components %%%

worker(QueuePid, F, Args) ->
QueuePid ! {ready, self()},
{task, Task} ->
QueuePid ! {result, F(Task, Args)},
worker(QueuePid, F, Args);
empty ->
8 changes: 4 additions & 4 deletions apps/rebar/test/rebar_parallel_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ same_results(_) ->
fun(X,_) -> {ok, X} end, []))),
P = rebar_parallel:pool(fun(X,_) -> X*2 end, [],
fun(X,_) -> {ok, X} end, []),
_ = [rebar_parallel:pool_task(P, N) || N <- [1,2,3,4,5,6,7]],
_ = [rebar_parallel:pool_task_async(P, N) || N <- [1,2,3,4,5,6,7]],
Expand All @@ -49,7 +49,7 @@ pool_fetcher(_) ->
Parent = self(),
P = rebar_parallel:pool(fun(X,_) -> X*2 end, [],
fun(X,_) -> {ok, X} end, []),
_ = [rebar_parallel:pool_task(P, N) || N <- [1,2,3,4,5,6,7]],
_ = [rebar_parallel:pool_task_async(P, N) || N <- [1,2,3,4,5,6,7]],
spawn_link(fun() -> Parent ! {res, lists:sort(rebar_parallel:pool_terminate(P))} end),
{res, X} -> ?assertEqual([2,4,6,8,10,12,14], X)
Expand All @@ -64,11 +64,11 @@ pool_misuse(_) ->
Parent = self(),
P = rebar_parallel:pool(fun(_,_) -> timer:sleep(1000) end, [],
fun(X,_) -> {ok, X} end, []),
_ = [rebar_parallel:pool_task(P, N) || N <- [1,2,3,4,5,6,7]],
_ = [rebar_parallel:pool_task_async(P, N) || N <- [1,2,3,4,5,6,7]],
spawn(fun() -> Parent ! ok, rebar_parallel:pool_terminate(P) end),
receive ok -> timer:sleep(100) end,
Old = process_flag(trap_exit, true),
rebar_parallel:pool_task(P, 0),
rebar_parallel:pool_task_async(P, 0),
{'EXIT', P, {{nocatch, rebar_abort}, _Stack}} ->
process_flag(trap_exit, Old)
Expand Down

