Skip to content

Commit

Permalink
Merge pull request #11672 from rabbitmq/bump-khepri-to-0.15.0
Browse files Browse the repository at this point in the history
Bump Ra from 2.13.6 to 2.14.0 + Khepri from 0.14.0 to 0.15.0
  • Loading branch information
dumbbell authored Sep 5, 2024
2 parents b9eebc4 + be9e5d8 commit 4a876db
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 73 deletions.
12 changes: 6 additions & 6 deletions MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,15 @@ erlang_package.hex_package(
erlang_package.hex_package(
name = "khepri",
build_file = "@rabbitmq-server//bazel:BUILD.khepri",
sha256 = "dccfaeb3583a04722e2258911f7f906ce67f8efac80504be4923aaafae6d4e21",
version = "0.14.0",
sha256 = "3fca316af28f0a7524be01164a3e9dd484505f18887c5c2065e0db40802522d1",
version = "0.15.0",
)

erlang_package.hex_package(
name = "khepri_mnesia_migration",
build_file = "@rabbitmq-server//bazel:BUILD.khepri_mnesia_migration",
sha256 = "f56d277ca7876371615cef9c5674c78854f31cf9f26ce97fd3f4b5a65573ccc4",
version = "0.5.0",
sha256 = "c2426e113ca9901180cc141967ef81c0beaba2bf702ed1456360b6ec02280a71",
version = "0.6.0",
)

erlang_package.hex_package(
Expand Down Expand Up @@ -253,8 +253,8 @@ erlang_package.hex_package(
name = "ra",
build_file = "@rabbitmq-server//bazel:BUILD.ra",
pkg = "ra",
sha256 = "0be7645dce4a76edd4c4642d0fa69639518c72b6b60a34fc86590d1909166aeb",
version = "2.13.6",
sha256 = "1d553dd971a0b398b7af0fa8c8458dda575715ff71c65c972e9500b24039b240",
version = "2.14.0",
)

erlang_package.git_package(
Expand Down
6 changes: 1 addition & 5 deletions deps/rabbit/src/rabbit_db_maintenance.erl
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,7 @@ get_consistent_in_mnesia(Node) ->

get_consistent_in_khepri(Node) ->
Path = khepri_maintenance_path(Node),
%% FIXME: Ra consistent queries are fragile in the sense that the query
%% function may run on a remote node and the function reference or MFA may
%% not be valid on that node. That's why we force a local query for now.
%Options = #{favor => consistent},
Options = #{favor => local},
Options = #{favor => consistency},
case rabbit_khepri:get(Path, Options) of
{ok, #node_maintenance_state{status = Status}} ->
Status;
Expand Down
89 changes: 30 additions & 59 deletions deps/rabbit/src/rabbit_khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -264,19 +264,23 @@ setup(_) ->
friendly_name => ?RA_FRIENDLY_NAME},
case khepri:start(?RA_SYSTEM, RaServerConfig) of
{ok, ?STORE_ID} ->
wait_for_leader(),
wait_for_register_projections(),
?LOG_DEBUG(
"Khepri-based " ?RA_FRIENDLY_NAME " ready",
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
ok;
RetryTimeout = retry_timeout(),
case khepri_cluster:wait_for_leader(?STORE_ID, RetryTimeout) of
ok ->
wait_for_register_projections(),
?LOG_DEBUG(
"Khepri-based " ?RA_FRIENDLY_NAME " ready",
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
ok;
{error, timeout} ->
exit(timeout_waiting_for_leader);
{error, _} = Error ->
exit(Error)
end;
{error, _} = Error ->
exit(Error)
end.

wait_for_leader() ->
wait_for_leader(retry_timeout(), retry_limit()).

retry_timeout() ->
case application:get_env(rabbit, khepri_leader_wait_retry_timeout) of
{ok, T} -> T;
Expand All @@ -289,25 +293,6 @@ retry_limit() ->
undefined -> 10
end.

wait_for_leader(_Timeout, 0) ->
exit(timeout_waiting_for_leader);
wait_for_leader(Timeout, Retries) ->
rabbit_log:info("Waiting for Khepri leader for ~tp ms, ~tp retries left",
[Timeout, Retries - 1]),
Options = #{timeout => Timeout,
favor => low_latency},
case khepri:exists(?STORE_ID, [], Options) of
Exists when is_boolean(Exists) ->
rabbit_log:info("Khepri leader elected"),
ok;
{error, timeout} -> %% Khepri >= 0.14.0
wait_for_leader(Timeout, Retries -1);
{error, {timeout, _ServerId}} -> %% Khepri < 0.14.0
wait_for_leader(Timeout, Retries -1);
{error, Reason} ->
throw(Reason)
end.

wait_for_register_projections() ->
wait_for_register_projections(retry_timeout(), retry_limit()).

Expand Down Expand Up @@ -940,50 +925,46 @@ cas(Path, Pattern, Data) ->
?STORE_ID, Path, Pattern, Data, ?DEFAULT_COMMAND_OPTIONS).

fold(Path, Pred, Acc) ->
khepri:fold(?STORE_ID, Path, Pred, Acc, #{favor => low_latency}).
khepri:fold(?STORE_ID, Path, Pred, Acc).

fold(Path, Pred, Acc, Options) ->
Options1 = Options#{favor => low_latency},
khepri:fold(?STORE_ID, Path, Pred, Acc, Options1).
khepri:fold(?STORE_ID, Path, Pred, Acc, Options).

foreach(Path, Pred) ->
khepri:foreach(?STORE_ID, Path, Pred, #{favor => low_latency}).
khepri:foreach(?STORE_ID, Path, Pred).

filter(Path, Pred) ->
khepri:filter(?STORE_ID, Path, Pred, #{favor => low_latency}).
khepri:filter(?STORE_ID, Path, Pred).

get(Path) ->
khepri:get(?STORE_ID, Path, #{favor => low_latency}).
khepri:get(?STORE_ID, Path).

get(Path, Options) ->
Options1 = Options#{favor => low_latency},
khepri:get(?STORE_ID, Path, Options1).
khepri:get(?STORE_ID, Path, Options).

get_many(PathPattern) ->
khepri:get_many(?STORE_ID, PathPattern, #{favor => low_latency}).
khepri:get_many(?STORE_ID, PathPattern).

adv_get(Path) ->
khepri_adv:get(?STORE_ID, Path, #{favor => low_latency}).
khepri_adv:get(?STORE_ID, Path).

adv_get_many(PathPattern) ->
khepri_adv:get_many(?STORE_ID, PathPattern, #{favor => low_latency}).
khepri_adv:get_many(?STORE_ID, PathPattern).

match(Path) ->
match(Path, #{}).

match(Path, Options) ->
Options1 = Options#{favor => low_latency},
khepri:get_many(?STORE_ID, Path, Options1).
khepri:get_many(?STORE_ID, Path, Options).

exists(Path) -> khepri:exists(?STORE_ID, Path, #{favor => low_latency}).
exists(Path) -> khepri:exists(?STORE_ID, Path).

list(Path) ->
khepri:get_many(
?STORE_ID, Path ++ [?KHEPRI_WILDCARD_STAR], #{favor => low_latency}).
?STORE_ID, Path ++ [?KHEPRI_WILDCARD_STAR]).

list_child_nodes(Path) ->
Options = #{props_to_return => [child_names],
favor => low_latency},
Options = #{props_to_return => [child_names]},
case khepri_adv:get_many(?STORE_ID, Path, Options) of
{ok, Result} ->
case maps:values(Result) of
Expand All @@ -997,8 +978,7 @@ list_child_nodes(Path) ->
end.

count_children(Path) ->
Options = #{props_to_return => [child_list_length],
favor => low_latency},
Options = #{props_to_return => [child_list_length]},
case khepri_adv:get_many(?STORE_ID, Path, Options) of
{ok, Map} ->
lists:sum([L || #{child_list_length := L} <- maps:values(Map)]);
Expand Down Expand Up @@ -1049,18 +1029,9 @@ transaction(Fun) ->
transaction(Fun, ReadWrite) ->
transaction(Fun, ReadWrite, #{}).

transaction(Fun, ReadWrite, Options0) ->
%% If the transaction is read-only, use the same default options we use
%% for most queries.
DefaultQueryOptions = case ReadWrite of
ro ->
#{favor => low_latency};
_ ->
#{}
end,
Options1 = maps:merge(DefaultQueryOptions, Options0),
Options = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options1),
case khepri:transaction(?STORE_ID, Fun, ReadWrite, Options) of
transaction(Fun, ReadWrite, Options) ->
Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options),
case khepri:transaction(?STORE_ID, Fun, ReadWrite, Options1) of
ok -> ok;
{ok, Result} -> Result;
{error, Reason} -> throw({error, Reason})
Expand Down
6 changes: 3 additions & 3 deletions rabbitmq-components.mk
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ dep_credentials_obfuscation = hex 3.4.0
dep_cuttlefish = hex 3.4.0
dep_gen_batch_server = hex 0.8.8
dep_jose = hex 1.11.10
dep_khepri = hex 0.14.0
dep_khepri_mnesia_migration = hex 0.5.0
dep_khepri = hex 0.15.0
dep_khepri_mnesia_migration = hex 0.6.0
dep_prometheus = hex 4.11.0
dep_ra = hex 2.13.6
dep_ra = hex 2.14.0
dep_ranch = hex 2.1.0
dep_recon = hex 2.5.3
dep_redbug = hex 2.0.7
Expand Down

0 comments on commit 4a876db

Please sign in to comment.