diff --git a/config/sys.config b/config/sys.config index 527bd20..a10ce12 100644 --- a/config/sys.config +++ b/config/sys.config @@ -2,8 +2,7 @@ [{node, [ {primary_workers, [ pinger_worker, - node_utils_server, - node_storage_server + node_utils_server ]}, {distributed_workers, [ generic_tasks_server, @@ -42,18 +41,18 @@ ]}, {logger, [ - {level, debug} + {level, info} ]}, -% {kernel, [ -% {logger_level, info}, -% {logger, [{handler, -% default, -% logger_std_h, -% #{level => info, -% formatter => {logger_formatter, #{single_line => true}}} -% }]} -% ]}, +{kernel, [ + {logger_level, info}, + {logger, [{handler, + default, + logger_std_h, + #{level => info, + formatter => {logger_formatter, #{single_line => true}}} + }]} +]}, {lasp, [{membership, false}, {storage_backend, lasp_ets_storage_backend}, @@ -83,9 +82,9 @@ ]}, {grisp, [{drivers, [ - {spi, grisp_spi_drv}, - {gpio, grisp_gpio_drv}, - {i2c, grisp_i2c_drv} + {spi, grisp_spi_drv} + % {gpio, grisp_gpio_drv}, + % {i2c, grisp_i2c_drv} ]}, {devices, [ % {gpio1, pmod_amp2}, diff --git a/grisp/grisp_base/files/grisp.ini.mustache b/grisp/grisp_base/files/grisp.ini.mustache index 377cb3c..6e895ab 100644 --- a/grisp/grisp_base/files/grisp.ini.mustache +++ b/grisp/grisp_base/files/grisp.ini.mustache @@ -5,6 +5,6 @@ image_path = /media/mmcsd-0-0/{{release_name}}/erts-{{erts_vsn}}/bin/beam.bin args = erl.rtems -- +MHas aobf +MMmcs 5 +MHlmbcs 5240 -mode embedded -home . -pa . -root {{release_name}} -config {{release_name}}/releases/{{release_version}}/sys.config -boot {{release_name}}/releases/{{release_version}}/{{release_name}} -internal_epmd epmd_sup -kernel inetrc "./erl_inetrc" -sname {{release_name}} -setcookie MyCookie [network] -ip_self=169.254.16.4 +ip_self=169.254.16.1 wlan=enable -hostname=my_grisp_board_4 +hostname=my_grisp_board_1 diff --git a/priv/snippets/memory.snippets b/priv/snippets/memory.snippets new file mode 100644 index 0000000..737d728 --- /dev/null +++ b/priv/snippets/memory.snippets @@ -0,0 +1,32 @@ +erl -sname boo -setcookie MyCookie -remsh node@my_grisp_board_1 +PidMSF = whereis(meteo_stats). +PidCAF = whereis(convergence_acknowledgement). +recon:info(PidMSF). +recon:info(PidMSF, heap_size). +recon:info(PidCAF). +recon:info(PidCAF, heap_size). +recon:get_state(node_ping_worker). +processes(). +recon:proc_count(memory, 3). +[{X, erlang:round(recon_alloc:memory(X) / 1024)} || X <- [allocated, used, usage]]. +_GC = [erlang:garbage_collect(Proc, [{type, 'major'}]) || Proc <- processes()]. + + +lasp:query({<<"node@my_grisp_board_2">>, state_gset}). +{ok, Set} = lasp:query({<<"results">>, state_orset}). +sets:to_list(Set). + + +ets:match(node(), '$1'). + +ets:take(node(), {<<"node@my_grisp_board_2">>, state_gset}). +ets:take(Tmp, {1}). + +X = #{"node@my_grisp_board_1" => #{1 => [3,4,5], 2 => [2,4,3], 3 => [4,3,2]}, "node@my_grisp_board_2" => #{1 => [2,2,5], 2 => [2,5,2], 3 => [6,3,2]}}. + + + +TableName = list_to_atom(unicode:characters_to_list([atom_to_list(node()), "_results"], latin1)). +Tmp = ets:new(TableName, [ordered_set,named_table, public]). +ets:insert_new(Tmp, {node(),{{2,3,"444"}, {477,19,99}, #{lol => {3,6,1}}}}). +ets:take(Tmp, node()). diff --git a/rebar.config b/rebar.config index 6da3f2b..e0f2c65 100644 --- a/rebar.config +++ b/rebar.config @@ -8,6 +8,7 @@ {lasp, {git, "https://github.com/lasp-lang/lasp.git", {branch, "master"}}}, partisan, + observer_cli, % {lager, {git, "https://github.com/erlang-lager/lager.git", {branch, "master"}}} {lager, {git, "https://github.com/erlang-lager/lager.git", {branch, "master"}}}, % see issue : https://github.com/elixir-lang/elixir/issues/7825 @@ -30,13 +31,6 @@ {rebar3_elixir_compile, ".*", {git, "https://github.com/GrispLasp/rebar3_elixir_compile.git", {branch, "master"}}} ]}. -{provider_hooks, [ - {pre, [{compile, {ex, compile}}]}, - {pre, [{release, {ex, compile}}]} -]}. - - - @@ -109,6 +103,7 @@ {flow, load}, {gen_stage, load}, {numerix, load}, + {observer_cli, load}, node ]}, {sys_config, "config/sys.config"}, diff --git a/rebar.lock b/rebar.lock index c9521b9..837d6c6 100644 --- a/rebar.lock +++ b/rebar.lock @@ -19,6 +19,7 @@ {<<"lasp_support">>,{pkg,<<"lasp_support">>,<<"0.0.3">>},1}, {<<"mapz">>,{pkg,<<"mapz">>,<<"0.3.0">>},1}, {<<"numerix">>,{elixir,<<"numerix">>,<<"0.5.1">>},0}, + {<<"observer_cli">>,{pkg,<<"observer_cli">>,<<"1.3.3">>},0}, {<<"plumtree">>,{pkg,<<"plumtree">>,<<"0.5.0">>},1}, {<<"quickrand">>,{pkg,<<"quickrand">>,<<"1.7.4-rc1">>},2}, {<<"rand_compat">>,{pkg,<<"rand_compat">>,<<"0.0.3">>},1}, @@ -36,6 +37,7 @@ {<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>}, {<<"lasp_support">>, <<"C1B7E1A472037AE82C71D2D16A10B7D644A621B66AE5AFE834CECF170F2E9169">>}, {<<"mapz">>, <<"438D24746CE5A252101E00B2032EFDF7FC69EB32689D3B805DE5E6DD7F52614F">>}, + {<<"observer_cli">>, <<"50E98C89FDE44C5B797D602D893EA30F7EE1B45B4D01AEAA3EA742D6B98B1EEE">>}, {<<"plumtree">>, <<"F3521CE631AD22AC00635E79C7F046A163BD1135889CDC1D0B18AB577F76EC70">>}, {<<"quickrand">>, <<"61A9AF693C8B4FC9F838818493BCEE127AA2EE70483BE32BAD20C760C208E8BA">>}, {<<"rand_compat">>, <<"011646BC1F0B0C432FE101B816F25B9BBB74A085713CEE1DAFD2D62E9415EAD3">>}, diff --git a/src/node.app.src b/src/node.app.src index 398166b..0bc6e7b 100644 --- a/src/node.app.src +++ b/src/node.app.src @@ -6,7 +6,8 @@ {applications, [ kernel, stdlib, - sasl + sasl, + grisp % os_mon % grisp, % os_mon, diff --git a/src/node.app.src.prod b/src/node.app.src.prod index 398166b..0bc6e7b 100644 --- a/src/node.app.src.prod +++ b/src/node.app.src.prod @@ -6,7 +6,8 @@ {applications, [ kernel, stdlib, - sasl + sasl, + grisp % os_mon % grisp, % os_mon, diff --git a/src/node.hrl b/src/node.hrl index e472bbc..82e0449 100644 --- a/src/node.hrl +++ b/src/node.hrl @@ -77,7 +77,7 @@ end). -define(ALL, lists:seq(1,12) ). --define(DAN, lists:seq(1,4,1) ). +-define(DAN, lists:seq(1,3,1) ). -define(ALEX, lists:seq(4,9,1) ). -define(IGOR, lists:seq(10,12,1) ). % NEVER FORGET THIS EPIC MACRO : diff --git a/src/node_benchmark_server.erl b/src/node_benchmark_server.erl index 06de905..5967aa0 100644 --- a/src/node_benchmark_server.erl +++ b/src/node_benchmark_server.erl @@ -44,7 +44,7 @@ handle_info({benchmark_meteo_task, LoopCount}, State) -> EvaluationMode = node_config:get(evaluation_mode, grisplasp), logger:log(notice, "=== Starting meteo task benchmark in mode ~p ===~n", [EvaluationMode]), SampleCount = 5, - SampleInterval = 5000, + SampleInterval = 3000, node_generic_tasks_server:add_task({tasknav, all, fun () -> case EvaluationMode of grisplasp -> diff --git a/src/node_generic_tasks_functions_benchmark.erl b/src/node_generic_tasks_functions_benchmark.erl index fdde56f..658acbb 100644 --- a/src/node_generic_tasks_functions_benchmark.erl +++ b/src/node_generic_tasks_functions_benchmark.erl @@ -18,6 +18,7 @@ meteorological_statistics_grisplasp(LoopCount, SampleCount, SampleInterval) -> % logger:log(notice, "Starting Meteo statistics task benchmark with Lasp on GRiSP ~n"), + Self = self(), MeteorologicalStatisticsFun = fun MSF (LoopCountRemaining, AccComputations) -> @@ -33,7 +34,8 @@ meteorological_statistics_grisplasp(LoopCount, SampleCount, SampleInterval) -> FoldFun = fun (Elem, AccIn) when is_integer(Elem) andalso is_map(AccIn) -> timer:sleep(SampleInterval), - T = node_stream_worker:maybe_get_time(), + % T = node_stream_worker:maybe_get_time(), + T = calendar:local_time(), % T = calendar:local_time(), [Pr, Tmp] = gen_server:call(Pid, {read, alt, [press_out, temp_out], #{}}), % logger:log(notice, "Getting data from nav sensor pr: ~p tmp: ~p", [Pr, Tmp]), @@ -69,7 +71,7 @@ meteorological_statistics_grisplasp(LoopCount, SampleCount, SampleInterval) -> if LoopCountRemaining > 1 -> MSF(LoopCountRemaining-1, NewAcc); true -> - timer:sleep(5000), % Give time to the CA process to finish receiving acks + timer:sleep(5000), % Give time to the CAF process to finish receiving acks convergence_acknowledgement ! {done, NewAcc} end end, @@ -97,26 +99,16 @@ meteorological_statistics_grisplasp(LoopCount, SampleCount, SampleInterval) -> PidCAF = spawn(fun () -> ConvergenceAcknowledgementFun([]) end), PidMSF = spawn(fun () -> MeteorologicalStatisticsFun(LoopCount, #{}) end), register(convergence_acknowledgement, PidCAF), + register(meteo_stats, PidMSF), receive - % {Acks} -> - % logger:log(notice, "Received all acks ~p", [Acks]), - % logger:log(notice, "CRDT converged on all nodes"), - % MapFun = fun(Elem) -> - % logger:log(notice, "Elem is ~p", [Elem]), - % {From, TConverged} = Elem, - % TConvergence = TConverged - T2Computation, - % logger:log(notice, "CRDT converged on ~p after ~p ms", [From, TConvergence]), - % TConvergence - % end, - % ListConvergence = lists:map(MapFun, Acks), - % AverageConvergenceTime = average(ListConvergence), - % logger:log(notice, "Average convergence time: ~p ms", [AverageConvergenceTime]); - % {done, Computations, Acks} -> + grisp_led:color(1, blue), + grisp_led:color(2, green), logger:log(notice, "Meteo task is done, received acks and computations. Calculating computation time + convergence time..."), % logger:log(notice, "Computations: ~p - Acks: ~p", [Computations, Acks]), - AckMap = lists:foldl( + + NodesConvergenceTime = lists:foldl( fun(Ack, Acc) -> {From, TConverged, Cardinality} = Ack, {T2Computation, _} = maps:get(Cardinality, Computations), @@ -129,8 +121,62 @@ meteorological_statistics_grisplasp(LoopCount, SampleCount, SampleInterval) -> maps:put(From, #{Cardinality => ConvergenceTime}, Acc) end end , #{}, Acks), - logger:log(notice, "Ack map is ~p", [AckMap]), - logger:log(notice, "Computations map is ~p", [Computations]), + % logger:log(notice, "Nodes convergence times is ~p", [NodesConvergenceTime]), + NodesConvergenceTimeCalculations = maps:map(fun(Node, ConvergenceTimes) -> + Convergences = maps:values(ConvergenceTimes), + SD = 'Elixir.Numerix.Statistics':std_dev(Convergences), + Var = 'Elixir.Numerix.Statistics':variance(Convergences), + Avg ='Elixir.Numerix.Statistics':mean(Convergences), + {{average, Avg}, {standard_deviation, SD}, {variance, Var}} + end, NodesConvergenceTime), + + SumConvergenceTimes = lists:foldl(fun({{_, Mean},{_,_},{_,_}}, Acc) -> + Acc + Mean + end, 0, maps:values(NodesConvergenceTimeCalculations)), + MeanConvergenceTime = SumConvergenceTimes / length(maps:keys(NodesConvergenceTime)), + + Variances = lists:foldl(fun({{_, _},{_,_},{_, Var}}, Acc) -> + Acc ++ [Var] + end, [], maps:values(NodesConvergenceTimeCalculations)), + + SampleSize1 = length(maps:keys(maps:get(lists:nth(1, maps:keys(NodesConvergenceTime)), NodesConvergenceTime))), + SampleSize2 = length(maps:keys(maps:get(lists:nth(2, maps:keys(NodesConvergenceTime)), NodesConvergenceTime))), + PooledVarianceConvergence = (((SampleSize1-1)*lists:nth(1,Variances)) + ((SampleSize2-1)*lists:nth(2,Variances)))/((SampleSize1+SampleSize2)/2), + PooledStandardDeviationConvergence = math:sqrt(PooledVarianceConvergence), + ComputationFiltered = maps:map(fun(Cardinality, {T2Computation, ComputationTime}) -> + ComputationTime + end, Computations), + MeanComputationTime = 'Elixir.Numerix.Statistics':mean(maps:values(ComputationFiltered)), + StandardDeviationComputationTime = 'Elixir.Numerix.Statistics':std_dev(maps:values(ComputationFiltered)), + VarianceComputationTime = 'Elixir.Numerix.Statistics':variance(maps:values(ComputationFiltered)), + % logger:log(info, "nodes total converges averages = ~p ~n", [NodesConvergenceTimeAverage]), + + %%%%%%%%%%%%%%%%%%%%%%% + % Save results to ETS % + %%%%%%%%%%%%%%%%%%%%%%% + % TableName = list_to_atom(unicode:characters_to_list([atom_to_list(node()), "_results"], latin1)), + % Tmp = ets:new(TableName, [ordered_set, named_table, public]), + % ResultSave = case ets:insert_new(Tmp, {node(), {ComputationFiltered}, {MeanComputationTime}, {VarianceComputationTime}, {NodesConvergenceTime}, {NodesConvergenceTimeCalculations},{MeanConvergenceTime},{PooledVarianceConvergence}}) of + % true -> + % FileName = node(), + % case ets:tab2file(Tmp, FileName, [{sync, true}]) of + % ok -> + % logger:log(notice, "Saved results to SD card."), + % true = ets:delete(Tmp), + % ok; + % {error, Reason} -> + % logger:log(error, "Could not save results to SD card"), + % {error, Reason} + % end; + % false -> + % logger:log(error, "Could not insert results in ets"), + % {error, insert} + % end, + %%%%%%%%%%%%%%%%%%%%%%%% + % Save results to file % + %%%%%%%%%%%%%%%%%%%%%%%% + file:write_file(atom_to_list(node()), io_lib:fwrite("Computations: ~p ~nMean Computation Time: ~pms~nStandard Deviation Computation Time: ~p~nVariance Computation Time: ~p~nNodes Convergence Time: ~p ~nNodes Convergence Calculations: ~p ~nMean Convergence Time: ~pms~nPooled Standard Deviation Convergence Time: ~p~nPooled Variance Convergence Time: ~p~n", [ComputationFiltered, MeanComputationTime, StandardDeviationComputationTime, VarianceComputationTime, NodesConvergenceTime, NodesConvergenceTimeCalculations, MeanConvergenceTime, PooledStandardDeviationConvergence, PooledVarianceConvergence])), + timer:sleep(2000), exit(PidCAF, kill), exit(PidMSF, kill) end. diff --git a/src/node_storage_server.erl b/src/node_storage_server.erl index b9615e4..7cff13a 100644 --- a/src/node_storage_server.erl +++ b/src/node_storage_server.erl @@ -42,7 +42,7 @@ handle_info({monitor_memory_usage}, State) -> if MemUsageValue > 32 -> logger:log(info, "=== Memory usage is high, persisting global CRDT states === ~n"), persist_crdts(); - true -> logger:log(notice, "=== Usage ~p mb === ~n", [MemUsageValue]) + true -> logger:log(info, "=== Usage ~p mb === ~n", [MemUsageValue]) end, Interval = node_config:get(memcheck_interval, ?HMIN), {noreply, State, Interval}; diff --git a/src/node_storage_util.erl b/src/node_storage_util.erl index 149b090..373139a 100644 --- a/src/node_storage_util.erl +++ b/src/node_storage_util.erl @@ -381,11 +381,11 @@ gc() -> % https://blog.heroku.com/logplex-down-the-rabbit-hole % _GC = [erlang:garbage_collect(Proc, [{type, 'major'}]) || Proc <- processes()]. - _ = [logger:log(notice, "Mem = ~p ~n", [X]) || X <- mem()], + _ = [logger:log(info, "Mem = ~p ~n", [X]) || X <- mem()], _GC = [erlang:garbage_collect(Proc, [{type, 'major'}]) || Proc <- processes()], logger:log(notice, "Garbage was collected manually"), - _ = [logger:log(notice, "Mem = ~p ~n", [Y]) || Y <- mem()], + _ = [logger:log(info, "Mem = ~p ~n", [Y]) || Y <- mem()], % ok = print_alloc(). ok.