Skip to content

Commit

Permalink
Add rebar_parallel pool, use in DAG scans
Browse files Browse the repository at this point in the history
This addresses #2767 by creating
a pool mechanism in rebar_parallel that keeps an interface as similar
as possible to the queue mechanism, with the one caveat that it allows
the asynchronous creation of tasks rather than requiring them all at
start time.

The mechanism is not tested super deeply, which is probably a mistake,
but I wanted to get a reviewable PR first.

The mechanism is also added to the rebar_compiler_dag module to cover
use cases that were handled by spawning an unbounded number of processes
before, which would cause problems when file descriptors ran low and
lots of files were being used and open in parallel. The pool
mechanism puts an upper bound on processing but also on resource usage.

So this PR may also come with a performance regression, and if so we'd
want to override the default 1-per-scheduler pool options to use a lot
more and hit a middle ground in performance vs. resource usage.
  • Loading branch information
ferd committed Jan 23, 2023
1 parent 69a5898 commit 16b6f35
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 53 deletions.
96 changes: 44 additions & 52 deletions apps/rebar/src/rebar_compiler_dag.erl
Original file line number Diff line number Diff line change
Expand Up @@ -132,46 +132,41 @@ filter_prefix(G, [{App, Out} | AppTail] = AppPaths, [File | FTail]) ->
filter_prefix(G, AppPaths, FTail)
end.

finalise_populate_sources(_G, _InDirs, Waiting) when Waiting =:= #{} ->
%% Drain the scan pool, then fold every collected dependency result
%% into the dependency graph `G'.
finalise_populate_sources(G, InDirs, Pool) ->
    Results = rebar_parallel:pool_terminate(Pool),
    finalise_populate_sources_(G, InDirs, Results).

%% Base case: every pooled scan result has been folded into the graph.
finalise_populate_sources_(_G, _InDirs, []) ->
    ok;
finalise_populate_sources(G, InDirs, Waiting) ->
%% wait for all deps to complete
receive
{deps, Pid, AbsIncls} ->
{Status, Source} = maps:get(Pid, Waiting),
%% the file hasn't been visited yet; set it to existing, but with
%% a last modified value that's null so it gets updated to something new.
[digraph:add_vertex(G, Src, 0) || Src <- AbsIncls,
digraph:vertex(G, Src) =:= false],
%% drop edges from deps that aren't included!
[digraph:del_edge(G, Edge) || Status == old,
Edge <- digraph:out_edges(G, Source),
{_, _Src, Path, _Label} <- [digraph:edge(G, Edge)],
not lists:member(Path, AbsIncls)],
%% Add the rest
[digraph:add_edge(G, Source, Incl) || Incl <- AbsIncls],
%% mark the digraph dirty when there is any change in
%% dependencies, for any application in the project
mark_dirty(G),
finalise_populate_sources(G, InDirs, Waiting);
{'DOWN', _MRef, process, Pid, normal} ->
finalise_populate_sources(G, InDirs, maps:remove(Pid, Waiting));
{'DOWN', _MRef, process, Pid, Reason} ->
{_Status, Source} = maps:get(Pid, Waiting),
?ERROR("Failed to get dependencies for ~s~n~p", [Source, Reason]),
?ABORT
end.
%% Fold one dependency-scan result into the graph. `Status' is `old' when
%% the source file was already a vertex (its edges may be stale) and `new'
%% when it was just added; `AbsIncls' are the deps found for `Source'.
finalise_populate_sources_(G, InDirs, [{Status, {deps, Source, AbsIncls}}|Acc]) ->
    %% the file hasn't been visited yet; set it to existing, but with
    %% a last modified value that's null so it gets updated to something new.
    [digraph:add_vertex(G, Src, 0) || Src <- AbsIncls,
                                      digraph:vertex(G, Src) =:= false],
    %% drop edges from deps that aren't included!
    [digraph:del_edge(G, Edge) || Status == old,
                                  Edge <- digraph:out_edges(G, Source),
                                  {_, _Src, Path, _Label} <- [digraph:edge(G, Edge)],
                                  not lists:member(Path, AbsIncls)],
    %% Add the rest
    [digraph:add_edge(G, Source, Incl) || Incl <- AbsIncls],
    %% mark the digraph dirty when there is any change in
    %% dependencies, for any application in the project
    mark_dirty(G),
    finalise_populate_sources_(G, InDirs, Acc).

%% @doc this function scans all the source files found and looks into
%% all the `InDirs' for deps (other source files, or files that aren't source
%% but still returned by the compiler module) that are related
%% to them.
populate_sources(G, Compiler, InDirs, Sources, DepOpts) ->
populate_sources(G, Compiler, InDirs, Sources, DepOpts, #{}).
Pool = rebar_parallel:pool(fun erlang:apply/2, [],
fun(Res, _) -> {ok, Res} end, []),
populate_sources(G, Compiler, InDirs, Sources, DepOpts, Pool).

populate_sources(G, _Compiler, InDirs, [], _DepOpts, Waiting) ->
finalise_populate_sources(G, InDirs, Waiting);
populate_sources(G, Compiler, InDirs, [Source|Erls], DepOpts, Waiting) ->
populate_sources(G, _Compiler, InDirs, [], _DepOpts, Pool) ->
finalise_populate_sources(G, InDirs, Pool);
populate_sources(G, Compiler, InDirs, [Source|Erls], DepOpts, Pool) ->
case digraph:vertex(G, Source) of
{_, LastUpdated} ->
case filelib:last_modified(Source) of
Expand All @@ -180,19 +175,21 @@ populate_sources(G, Compiler, InDirs, [Source|Erls], DepOpts, Waiting) ->
%% from the graph.
digraph:del_vertex(G, Source),
mark_dirty(G),
populate_sources(G, Compiler, InDirs, Erls, DepOpts, Waiting);
populate_sources(G, Compiler, InDirs, Erls, DepOpts, Pool);
LastModified when LastUpdated < LastModified ->
digraph:add_vertex(G, Source, LastModified),
Worker = prepopulate_deps(Compiler, InDirs, Source, DepOpts, self()),
populate_sources(G, Compiler, InDirs, Erls, DepOpts, Waiting#{Worker => {old, Source}});
Work = fun() -> {old, prepopulate_deps(Compiler, InDirs, Source, DepOpts)} end,
rebar_parallel:pool_task(Pool, Work),
populate_sources(G, Compiler, InDirs, Erls, DepOpts, Pool);
_ -> % unchanged
populate_sources(G, Compiler, InDirs, Erls, DepOpts, Waiting)
populate_sources(G, Compiler, InDirs, Erls, DepOpts, Pool)
end;
false ->
LastModified = filelib:last_modified(Source),
digraph:add_vertex(G, Source, LastModified),
Worker = prepopulate_deps(Compiler, InDirs, Source, DepOpts, self()),
populate_sources(G, Compiler, InDirs, Erls, DepOpts, Waiting#{Worker => {new, Source}})
Work = fun() -> {new, prepopulate_deps(Compiler, InDirs, Source, DepOpts)} end,
rebar_parallel:pool_task(Pool, Work),
populate_sources(G, Compiler, InDirs, Erls, DepOpts, Pool)
end.

%% @doc Scan all files in the digraph that are seen as dependencies, but are
Expand Down Expand Up @@ -450,20 +447,15 @@ maybe_rm_vertex(G, Source) ->
%% mark its timestamp to 0, which means we have no info on it.
%% Source files will be covered at a later point in their own scan, and
%% non-source files are going to be covered by `populate_deps/3'.
prepopulate_deps(Compiler, InDirs, Source, DepOpts, Control) ->
{Worker, _MRef} = spawn_monitor(
fun () ->
SourceDir = filename:dirname(Source),
AbsIncls = case erlang:function_exported(Compiler, dependencies, 4) of
false ->
Compiler:dependencies(Source, SourceDir, InDirs);
true ->
Compiler:dependencies(Source, SourceDir, InDirs, DepOpts)
end,
Control ! {deps, self(), AbsIncls}
end
),
Worker.
%% Compute the dependencies of a single source file by calling the
%% compiler module's dependencies/3 or dependencies/4 callback, whichever
%% is exported. Returns {deps, Source, AbsIncls} so the pool handler can
%% later fold the result into the graph.
prepopulate_deps(Compiler, InDirs, Source, DepOpts) ->
    SourceDir = filename:dirname(Source),
    AbsIncls = case erlang:function_exported(Compiler, dependencies, 4) of
        true -> Compiler:dependencies(Source, SourceDir, InDirs, DepOpts);
        false -> Compiler:dependencies(Source, SourceDir, InDirs)
    end,
    {deps, Source, AbsIncls}.

%% check that a dep file is up to date
refresh_dep(_G, {artifact, _}) ->
Expand Down
84 changes: 83 additions & 1 deletion apps/rebar/src/rebar_parallel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
%%% and extracted here to be reused in other circumstances.
%%% @end
-module(rebar_parallel).
-export([queue/5]).
-export([queue/5,
pool/4, pool/5, pool_task/2, pool_terminate/1]).
-include("rebar.hrl").

queue(Tasks, WorkF, WArgs, Handler, HArgs) ->
Expand All @@ -18,6 +19,36 @@ queue(Tasks, WorkF, WArgs, Handler, HArgs) ->
Pids = [spawn_monitor(Worker) || _ <- lists:seq(1, Jobs)],
parallel_dispatch(Tasks, Pids, Handler, HArgs).

%% @doc Start a pool with the default size: one worker per scheduler.
%% See pool/5 for the meaning of the arguments.
pool(WorkF, WArgs, Handler, HArgs) ->
    pool(WorkF, WArgs, Handler, HArgs, erlang:system_info(schedulers)).

%% @doc Start a pool of `Jobs' worker processes driven by a coordinator.
%% Each worker runs `WorkF(Task, WArgs)' on tasks submitted via
%% pool_task/2; results are passed to `Handler(Result, HArgs)', which
%% returns `ok' to drop the result or `{ok, Res}' to accumulate it.
%% Returns the coordinator pid, to be used with pool_task/2 and
%% pool_terminate/1. Blocks until the pool is ready to accept tasks.
pool(WorkF, WArgs, Handler, HArgs, Jobs) ->
    Parent = self(),
    Coordinator = spawn_link(fun() ->
        Coord = self(),
        Worker = fun() -> worker(Coord, WorkF, WArgs) end,
        Pids = [spawn_monitor(Worker) || _ <- lists:seq(1, Jobs)],
        %% Tag the readiness message with our own pid so the parent's
        %% selective receive cannot consume a stray 'pool_ready' term
        %% already sitting in its mailbox.
        Parent ! {pool_ready, Coord},
        pool_coordinator([], [], Pids, Handler, HArgs, [], undefined)
    end),
    %% `Coordinator' is bound here, so this matches only the message
    %% from the pool we just started.
    receive {pool_ready, Coordinator} -> ok end,
    Coordinator.

%% @doc Submit a task to the pool asynchronously. The task is queued by
%% the coordinator and handed to the next free worker.
pool_task(Pool, Task) ->
    _ = erlang:send(Pool, {task, Task}),
    ok.

%% @doc Ask the pool to finish its outstanding tasks and shut down, then
%% block until the accumulated results are returned. If the coordinator
%% dies before replying, raise an error with its exit reason.
pool_terminate(Pool) ->
    MRef = erlang:monitor(process, Pool),
    Pool ! {self(), terminate},
    receive
        {Pool, Results} ->
            erlang:demonitor(MRef, [flush]),
            Results;
        {'DOWN', MRef, process, Pool, Reason} ->
            error(Reason)
    end.

parallel_dispatch([], [], _, _) ->
[];
parallel_dispatch(Targets, Pids, Handler, Args) ->
Expand Down Expand Up @@ -54,3 +85,54 @@ worker(QueuePid, F, Args) ->
ok
end.

%% Coordinator loop for a pool. State:
%%   Tasks    - tasks queued while no worker was free
%%   FreePids - idle workers waiting for a task
%%   Pids     - all live workers as {Pid, MonitorRef} pairs
%%   Handler/HArgs - result handler; `ok' drops, `{ok, Res}' accumulates
%%   Acc      - accumulated handler results
%%   Report   - `undefined' until termination is requested, then the pid
%%              of the caller waiting for the final result list
pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Report) ->
    receive
        {task, Task} when Report =/= undefined ->
            %% Submitting work after pool_terminate/1 is a usage error.
            ?ERROR("Task added to pool after being terminated: ~p", [Task]),
            ?ABORT;
        {task, Task} when FreePids =:= [] ->
            %% No idle worker; queue the task until one reports ready.
            pool_coordinator([Task|Tasks], FreePids, Pids, Handler, HArgs, Acc, Report);
        {task, Task} ->
            [Pid|NewFree] = FreePids,
            Pid ! {task, Task},
            pool_coordinator(Tasks, NewFree, Pids, Handler, HArgs, Acc, Report);
        {ready, _Pid} when Tasks =:= [], Report =/= undefined ->
            %% And we're done
            %% terminate workers async, return results if done
            [Pid ! empty || {Pid,_Ref} <- Pids],
            pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Report);
        {ready, Pid} when Tasks =:= [] ->
            pool_coordinator(Tasks, [Pid|FreePids], Pids, Handler, HArgs, Acc, Report);
        {ready, Pid} ->
            [Task|NewTasks] = Tasks,
            Pid ! {task, Task},
            pool_coordinator(NewTasks, FreePids, Pids, Handler, HArgs, Acc, Report);
        {'DOWN', Mref, _, Pid, normal} ->
            NewPids = lists:delete({Pid, Mref}, Pids),
            NewFree = lists:delete(Pid, FreePids),
            case {NewPids, NewFree} of
                {[], []} when is_pid(Report) ->
                    %% Last worker is gone; hand the results to the caller.
                    Report ! {self(), Acc};
                {_, _} ->
                    pool_coordinator(Tasks, NewFree, NewPids, Handler, HArgs, Acc, Report)
            end;
        {'DOWN', _Mref, _, _Pid, Info} ->
            ?ERROR("Task failed: ~p", [Info]),
            ?ABORT;
        {result, Result} ->
            case Handler(Result, HArgs) of
                ok ->
                    pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Report);
                {ok, Res} ->
                    pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, [Res|Acc], Report)
            end;
        {Caller, terminate} ->
            if Pids =:= []; % no workers somehow
               length(Pids) =:= length(FreePids), Tasks =:= [] -> % All Idle
                   %% Bug fix: stop the idle workers before replying.
                   %% They are monitored, not linked, so without the
                   %% 'empty' message they would stay blocked in their
                   %% receive forever and leak once we exit.
                   [Pid ! empty || {Pid, _Ref} <- Pids],
                   Caller ! {self(), Acc};
               true ->
                   %% Still work to do
                   pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Caller)
            end
    end.

80 changes: 80 additions & 0 deletions apps/rebar/test/rebar_parallel_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
-module(rebar_parallel_SUITE).
-compile(export_all).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").

all() -> [empty_set, same_results, pool_fetcher, pool_misuse].

empty_set() ->
    [{doc, "Running with null tasks still works"}].
empty_set(_) ->
    Identity = fun(X, _) -> X end,
    Drop = fun(_, _) -> ok end,
    Keep = fun(X, _) -> {ok, X} end,
    %% Queue with no tasks returns no results, whether the handler
    %% drops or keeps them.
    ?assertEqual([], rebar_parallel:queue([], Identity, [arg], Drop, [arg])),
    ?assertEqual([], rebar_parallel:queue([], Identity, [arg], Keep, [arg])),
    %% Same for a pool that is terminated without ever receiving a task.
    DropPool = rebar_parallel:pool(Identity, [arg], Drop, [arg]),
    ?assertEqual([], rebar_parallel:pool_terminate(DropPool)),
    KeepPool = rebar_parallel:pool(Identity, [arg], Keep, [arg]),
    ?assertEqual([], rebar_parallel:pool_terminate(KeepPool)),
    ok.

same_results() ->
    [{doc, "The two parallel methods can be equivalent but the pool can "
           "be used to do asynchronous task creation"}].
same_results(_) ->
    Inputs = [1,2,3,4,5,6,7],
    Expected = [2,4,6,8,10,12,14],
    Double = fun(X, _) -> X*2 end,
    Keep = fun(X, _) -> {ok, X} end,
    %% Queue: all tasks supplied up front.
    QueueRes = rebar_parallel:queue(Inputs, Double, [], Keep, []),
    ?assertEqual(Expected, lists:sort(QueueRes)),
    %% Pool: tasks trickled in after startup, same final results.
    Pool = rebar_parallel:pool(Double, [], Keep, []),
    lists:foreach(fun(N) -> rebar_parallel:pool_task(Pool, N) end, Inputs),
    ?assertEqual(Expected, lists:sort(rebar_parallel:pool_terminate(Pool))),
    ok.

pool_fetcher() ->
    [{doc, "The fetcher from a pool can be from a different process "
           "and the other one will get an error."}].
pool_fetcher(_) ->
    Self = self(),
    Pool = rebar_parallel:pool(fun(X,_) -> X*2 end, [],
                               fun(X,_) -> {ok, X} end, []),
    lists:foreach(fun(N) -> rebar_parallel:pool_task(Pool, N) end,
                  [1,2,3,4,5,6,7]),
    %% A process other than the creator terminates the pool and relays
    %% the results back to the test process.
    spawn_link(fun() ->
        Self ! {res, lists:sort(rebar_parallel:pool_terminate(Pool))}
    end),
    receive
        {res, Sorted} -> ?assertEqual([2,4,6,8,10,12,14], Sorted)
    after 500 -> error(timeout)
    end,
    ok.

pool_misuse() ->
    [{doc, "Using the pool for tasks after it is terminated but before "
           "it returns, you get a crash even if it's async"}].
pool_misuse(_) ->
    Parent = self(),
    %% Slow workers keep the pool busy so termination stays in-flight.
    P = rebar_parallel:pool(fun(_,_) -> timer:sleep(1000) end, [],
                            fun(X,_) -> {ok, X} end, []),
    _ = [rebar_parallel:pool_task(P, N) || N <- [1,2,3,4,5,6,7]],
    %% Terminate from another process; the sleep below gives the
    %% terminate request time to reach the coordinator first.
    spawn(fun() -> Parent ! ok, rebar_parallel:pool_terminate(P) end),
    receive ok -> timer:sleep(100) end,
    %% The pool coordinator is linked to this process (spawn_link in
    %% pool/5), so trap exits to observe its abort instead of dying.
    Old = process_flag(trap_exit, true),
    rebar_parallel:pool_task(P, 0),
    receive
        {'EXIT', P, {{nocatch, rebar_abort}, _Stack}} ->
            process_flag(trap_exit, Old)
    after 1000 ->
        process_flag(trap_exit, Old),
        error(no_abort)
    end,
    ok.

0 comments on commit 16b6f35

Please sign in to comment.