Skip to content

Commit

Permalink
Save measurements to SD card
Browse files Browse the repository at this point in the history
  • Loading branch information
oxynad committed Aug 10, 2018
1 parent 6c3632d commit e733a4b
Show file tree
Hide file tree
Showing 12 changed files with 126 additions and 50 deletions.
29 changes: 14 additions & 15 deletions config/sys.config
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
[{node, [
{primary_workers, [
pinger_worker,
node_utils_server,
node_storage_server
node_utils_server
]},
{distributed_workers, [
generic_tasks_server,
Expand Down Expand Up @@ -42,18 +41,18 @@
]},

{logger, [
{level, debug}
{level, info}
]},

% {kernel, [
% {logger_level, info},
% {logger, [{handler,
% default,
% logger_std_h,
% #{level => info,
% formatter => {logger_formatter, #{single_line => true}}}
% }]}
% ]},
{kernel, [
{logger_level, info},
{logger, [{handler,
default,
logger_std_h,
#{level => info,
formatter => {logger_formatter, #{single_line => true}}}
}]}
]},

{lasp, [{membership, false},
{storage_backend, lasp_ets_storage_backend},
Expand Down Expand Up @@ -83,9 +82,9 @@
]},

{grisp, [{drivers, [
{spi, grisp_spi_drv},
{gpio, grisp_gpio_drv},
{i2c, grisp_i2c_drv}
{spi, grisp_spi_drv}
% {gpio, grisp_gpio_drv},
% {i2c, grisp_i2c_drv}
]},
{devices, [
% {gpio1, pmod_amp2},
Expand Down
4 changes: 2 additions & 2 deletions grisp/grisp_base/files/grisp.ini.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ image_path = /media/mmcsd-0-0/{{release_name}}/erts-{{erts_vsn}}/bin/beam.bin
args = erl.rtems -- +MHas aobf +MMmcs 5 +MHlmbcs 5240 -mode embedded -home . -pa . -root {{release_name}} -config {{release_name}}/releases/{{release_version}}/sys.config -boot {{release_name}}/releases/{{release_version}}/{{release_name}} -internal_epmd epmd_sup -kernel inetrc "./erl_inetrc" -sname {{release_name}} -setcookie MyCookie

[network]
ip_self=169.254.16.4
ip_self=169.254.16.1
wlan=enable
hostname=my_grisp_board_4
hostname=my_grisp_board_1
32 changes: 32 additions & 0 deletions priv/snippets/memory.snippets
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
erl -sname boo -setcookie MyCookie -remsh node@my_grisp_board_1
PidMSF = whereis(meteo_stats).
PidCAF = whereis(convergence_acknowledgement).
recon:info(PidMSF).
recon:info(PidMSF, heap_size).
recon:info(PidCAF).
recon:info(PidCAF, heap_size).
recon:get_state(node_ping_worker).
processes().
recon:proc_count(memory, 3).
[{X, erlang:round(recon_alloc:memory(X) / 1024)} || X <- [allocated, used, usage]].
_GC = [erlang:garbage_collect(Proc, [{type, 'major'}]) || Proc <- processes()].


lasp:query({<<"node@my_grisp_board_2">>, state_gset}).
{ok, Set} = lasp:query({<<"results">>, state_orset}).
sets:to_list(Set).


ets:match(node(), '$1').

ets:take(node(), {<<"node@my_grisp_board_2">>, state_gset}).
ets:take(Tmp, {1}).

X = #{"node@my_grisp_board_1" => #{1 => [3,4,5], 2 => [2,4,3], 3 => [4,3,2]}, "node@my_grisp_board_2" => #{1 => [2,2,5], 2 => [2,5,2], 3 => [6,3,2]}}.



TableName = list_to_atom(unicode:characters_to_list([atom_to_list(node()), "_results"], latin1)).
Tmp = ets:new(TableName, [ordered_set,named_table, public]).
ets:insert_new(Tmp, {node(),{{2,3,"444"}, {477,19,99}, #{lol => {3,6,1}}}}).
ets:take(Tmp, node()).
9 changes: 2 additions & 7 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
{lasp, {git, "https://github.com/lasp-lang/lasp.git", {branch, "master"}}},

partisan,
observer_cli,
% {lager, {git, "https://github.com/erlang-lager/lager.git", {branch, "master"}}}
{lager, {git, "https://github.com/erlang-lager/lager.git", {branch, "master"}}},
% see issue : https://github.com/elixir-lang/elixir/issues/7825
Expand All @@ -30,13 +31,6 @@
{rebar3_elixir_compile, ".*", {git, "https://github.com/GrispLasp/rebar3_elixir_compile.git", {branch, "master"}}}
]}.

{provider_hooks, [
{pre, [{compile, {ex, compile}}]},
{pre, [{release, {ex, compile}}]}
]}.






Expand Down Expand Up @@ -109,6 +103,7 @@
{flow, load},
{gen_stage, load},
{numerix, load},
{observer_cli, load},
node
]},
{sys_config, "config/sys.config"},
Expand Down
2 changes: 2 additions & 0 deletions rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
{<<"lasp_support">>,{pkg,<<"lasp_support">>,<<"0.0.3">>},1},
{<<"mapz">>,{pkg,<<"mapz">>,<<"0.3.0">>},1},
{<<"numerix">>,{elixir,<<"numerix">>,<<"0.5.1">>},0},
{<<"observer_cli">>,{pkg,<<"observer_cli">>,<<"1.3.3">>},0},
{<<"plumtree">>,{pkg,<<"plumtree">>,<<"0.5.0">>},1},
{<<"quickrand">>,{pkg,<<"quickrand">>,<<"1.7.4-rc1">>},2},
{<<"rand_compat">>,{pkg,<<"rand_compat">>,<<"0.0.3">>},1},
Expand All @@ -36,6 +37,7 @@
{<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>},
{<<"lasp_support">>, <<"C1B7E1A472037AE82C71D2D16A10B7D644A621B66AE5AFE834CECF170F2E9169">>},
{<<"mapz">>, <<"438D24746CE5A252101E00B2032EFDF7FC69EB32689D3B805DE5E6DD7F52614F">>},
{<<"observer_cli">>, <<"50E98C89FDE44C5B797D602D893EA30F7EE1B45B4D01AEAA3EA742D6B98B1EEE">>},
{<<"plumtree">>, <<"F3521CE631AD22AC00635E79C7F046A163BD1135889CDC1D0B18AB577F76EC70">>},
{<<"quickrand">>, <<"61A9AF693C8B4FC9F838818493BCEE127AA2EE70483BE32BAD20C760C208E8BA">>},
{<<"rand_compat">>, <<"011646BC1F0B0C432FE101B816F25B9BBB74A085713CEE1DAFD2D62E9415EAD3">>},
Expand Down
3 changes: 2 additions & 1 deletion src/node.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
{applications, [
kernel,
stdlib,
sasl
sasl,
grisp
% os_mon
% grisp,
% os_mon,
Expand Down
3 changes: 2 additions & 1 deletion src/node.app.src.prod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
{applications, [
kernel,
stdlib,
sasl
sasl,
grisp
% os_mon
% grisp,
% os_mon,
Expand Down
2 changes: 1 addition & 1 deletion src/node.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ end).


-define(ALL, lists:seq(1,12) ).
-define(DAN, lists:seq(1,4,1) ).
-define(DAN, lists:seq(1,3,1) ).
-define(ALEX, lists:seq(4,9,1) ).
-define(IGOR, lists:seq(10,12,1) ).
% NEVER FORGET THIS EPIC MACRO :
Expand Down
2 changes: 1 addition & 1 deletion src/node_benchmark_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ handle_info({benchmark_meteo_task, LoopCount}, State) ->
EvaluationMode = node_config:get(evaluation_mode, grisplasp),
logger:log(notice, "=== Starting meteo task benchmark in mode ~p ===~n", [EvaluationMode]),
SampleCount = 5,
SampleInterval = 5000,
SampleInterval = 3000,
node_generic_tasks_server:add_task({tasknav, all, fun () ->
case EvaluationMode of
grisplasp ->
Expand Down
84 changes: 65 additions & 19 deletions src/node_generic_tasks_functions_benchmark.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ meteorological_statistics_grisplasp(LoopCount, SampleCount, SampleInterval) ->

% logger:log(notice, "Starting Meteo statistics task benchmark with Lasp on GRiSP ~n"),


Self = self(),
MeteorologicalStatisticsFun = fun MSF (LoopCountRemaining, AccComputations) ->

Expand All @@ -33,7 +34,8 @@ meteorological_statistics_grisplasp(LoopCount, SampleCount, SampleInterval) ->
FoldFun = fun
(Elem, AccIn) when is_integer(Elem) andalso is_map(AccIn) ->
timer:sleep(SampleInterval),
T = node_stream_worker:maybe_get_time(),
% T = node_stream_worker:maybe_get_time(),
T = calendar:local_time(),
% T = calendar:local_time(),
[Pr, Tmp] = gen_server:call(Pid, {read, alt, [press_out, temp_out], #{}}),
% logger:log(notice, "Getting data from nav sensor pr: ~p tmp: ~p", [Pr, Tmp]),
Expand Down Expand Up @@ -69,7 +71,7 @@ meteorological_statistics_grisplasp(LoopCount, SampleCount, SampleInterval) ->
if LoopCountRemaining > 1 ->
MSF(LoopCountRemaining-1, NewAcc);
true ->
timer:sleep(5000), % Give time to the CA process to finish receiving acks
timer:sleep(5000), % Give time to the CAF process to finish receiving acks
convergence_acknowledgement ! {done, NewAcc}
end
end,
Expand Down Expand Up @@ -97,26 +99,16 @@ meteorological_statistics_grisplasp(LoopCount, SampleCount, SampleInterval) ->
PidCAF = spawn(fun () -> ConvergenceAcknowledgementFun([]) end),
PidMSF = spawn(fun () -> MeteorologicalStatisticsFun(LoopCount, #{}) end),
register(convergence_acknowledgement, PidCAF),
register(meteo_stats, PidMSF),

receive
% {Acks} ->
% logger:log(notice, "Received all acks ~p", [Acks]),
% logger:log(notice, "CRDT converged on all nodes"),
% MapFun = fun(Elem) ->
% logger:log(notice, "Elem is ~p", [Elem]),
% {From, TConverged} = Elem,
% TConvergence = TConverged - T2Computation,
% logger:log(notice, "CRDT converged on ~p after ~p ms", [From, TConvergence]),
% TConvergence
% end,
% ListConvergence = lists:map(MapFun, Acks),
% AverageConvergenceTime = average(ListConvergence),
% logger:log(notice, "Average convergence time: ~p ms", [AverageConvergenceTime]);
%
{done, Computations, Acks} ->
grisp_led:color(1, blue),
grisp_led:color(2, green),
logger:log(notice, "Meteo task is done, received acks and computations. Calculating computation time + convergence time..."),
% logger:log(notice, "Computations: ~p - Acks: ~p", [Computations, Acks]),
AckMap = lists:foldl(

NodesConvergenceTime = lists:foldl(
fun(Ack, Acc) ->
{From, TConverged, Cardinality} = Ack,
{T2Computation, _} = maps:get(Cardinality, Computations),
Expand All @@ -129,8 +121,62 @@ meteorological_statistics_grisplasp(LoopCount, SampleCount, SampleInterval) ->
maps:put(From, #{Cardinality => ConvergenceTime}, Acc)
end
end , #{}, Acks),
logger:log(notice, "Ack map is ~p", [AckMap]),
logger:log(notice, "Computations map is ~p", [Computations]),
% logger:log(notice, "Nodes convergence times is ~p", [NodesConvergenceTime]),
NodesConvergenceTimeCalculations = maps:map(fun(Node, ConvergenceTimes) ->
Convergences = maps:values(ConvergenceTimes),
SD = 'Elixir.Numerix.Statistics':std_dev(Convergences),
Var = 'Elixir.Numerix.Statistics':variance(Convergences),
Avg ='Elixir.Numerix.Statistics':mean(Convergences),
{{average, Avg}, {standard_deviation, SD}, {variance, Var}}
end, NodesConvergenceTime),

SumConvergenceTimes = lists:foldl(fun({{_, Mean},{_,_},{_,_}}, Acc) ->
Acc + Mean
end, 0, maps:values(NodesConvergenceTimeCalculations)),
MeanConvergenceTime = SumConvergenceTimes / length(maps:keys(NodesConvergenceTime)),

Variances = lists:foldl(fun({{_, _},{_,_},{_, Var}}, Acc) ->
Acc ++ [Var]
end, [], maps:values(NodesConvergenceTimeCalculations)),

SampleSize1 = length(maps:keys(maps:get(lists:nth(1, maps:keys(NodesConvergenceTime)), NodesConvergenceTime))),
SampleSize2 = length(maps:keys(maps:get(lists:nth(2, maps:keys(NodesConvergenceTime)), NodesConvergenceTime))),
PooledVarianceConvergence = (((SampleSize1-1)*lists:nth(1,Variances)) + ((SampleSize2-1)*lists:nth(2,Variances)))/((SampleSize1+SampleSize2)/2),
PooledStandardDeviationConvergence = math:sqrt(PooledVarianceConvergence),
ComputationFiltered = maps:map(fun(Cardinality, {T2Computation, ComputationTime}) ->
ComputationTime
end, Computations),
MeanComputationTime = 'Elixir.Numerix.Statistics':mean(maps:values(ComputationFiltered)),
StandardDeviationComputationTime = 'Elixir.Numerix.Statistics':std_dev(maps:values(ComputationFiltered)),
VarianceComputationTime = 'Elixir.Numerix.Statistics':variance(maps:values(ComputationFiltered)),
% logger:log(info, "nodes total converges averages = ~p ~n", [NodesConvergenceTimeAverage]),

%%%%%%%%%%%%%%%%%%%%%%%
% Save results to ETS %
%%%%%%%%%%%%%%%%%%%%%%%
% TableName = list_to_atom(unicode:characters_to_list([atom_to_list(node()), "_results"], latin1)),
% Tmp = ets:new(TableName, [ordered_set, named_table, public]),
% ResultSave = case ets:insert_new(Tmp, {node(), {ComputationFiltered}, {MeanComputationTime}, {VarianceComputationTime}, {NodesConvergenceTime}, {NodesConvergenceTimeCalculations},{MeanConvergenceTime},{PooledVarianceConvergence}}) of
% true ->
% FileName = node(),
% case ets:tab2file(Tmp, FileName, [{sync, true}]) of
% ok ->
% logger:log(notice, "Saved results to SD card."),
% true = ets:delete(Tmp),
% ok;
% {error, Reason} ->
% logger:log(error, "Could not save results to SD card"),
% {error, Reason}
% end;
% false ->
% logger:log(error, "Could not insert results in ets"),
% {error, insert}
% end,
%%%%%%%%%%%%%%%%%%%%%%%%
% Save results to file %
%%%%%%%%%%%%%%%%%%%%%%%%
file:write_file(atom_to_list(node()), io_lib:fwrite("Computations: ~p ~nMean Computation Time: ~pms~nStandard Deviation Computation Time: ~p~nVariance Computation Time: ~p~nNodes Convergence Time: ~p ~nNodes Convergence Calculations: ~p ~nMean Convergence Time: ~pms~nPooled Standard Deviation Convergence Time: ~p~nPooled Variance Convergence Time: ~p~n", [ComputationFiltered, MeanComputationTime, StandardDeviationComputationTime, VarianceComputationTime, NodesConvergenceTime, NodesConvergenceTimeCalculations, MeanConvergenceTime, PooledStandardDeviationConvergence, PooledVarianceConvergence])),
timer:sleep(2000),
exit(PidCAF, kill),
exit(PidMSF, kill)
end.
Expand Down
2 changes: 1 addition & 1 deletion src/node_storage_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ handle_info({monitor_memory_usage}, State) ->
if MemUsageValue > 32 ->
logger:log(info, "=== Memory usage is high, persisting global CRDT states === ~n"),
persist_crdts();
true -> logger:log(notice, "=== Usage ~p mb === ~n", [MemUsageValue])
true -> logger:log(info, "=== Usage ~p mb === ~n", [MemUsageValue])
end,
Interval = node_config:get(memcheck_interval, ?HMIN),
{noreply, State, Interval};
Expand Down
4 changes: 2 additions & 2 deletions src/node_storage_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -381,11 +381,11 @@ gc() ->
% https://blog.heroku.com/logplex-down-the-rabbit-hole
% _GC = [erlang:garbage_collect(Proc, [{type, 'major'}]) || Proc <- processes()].

_ = [logger:log(notice, "Mem = ~p ~n", [X]) || X <- mem()],
_ = [logger:log(info, "Mem = ~p ~n", [X]) || X <- mem()],
_GC = [erlang:garbage_collect(Proc, [{type, 'major'}]) || Proc <- processes()],

logger:log(notice, "Garbage was collected manually"),
_ = [logger:log(notice, "Mem = ~p ~n", [Y]) || Y <- mem()],
_ = [logger:log(info, "Mem = ~p ~n", [Y]) || Y <- mem()],
% ok = print_alloc().
ok.

Expand Down

0 comments on commit e733a4b

Please sign in to comment.