diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel
index d88ab2efcd79..3939ab11aef4 100644
--- a/deps/rabbit/BUILD.bazel
+++ b/deps/rabbit/BUILD.bazel
@@ -958,6 +958,14 @@ rabbitmq_suite(
],
)
+rabbitmq_suite(
+ name = "unit_cluster_formation_sort_nodes_SUITE",
+ size = "small",
+ deps = [
+ "@meck//:erlang_app",
+ ],
+)
+
rabbitmq_suite(
name = "unit_collections_SUITE",
size = "small",
diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl
index 207bb4474a5b..13ff05c6a2f6 100644
--- a/deps/rabbit/app.bzl
+++ b/deps/rabbit/app.bzl
@@ -1735,6 +1735,14 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
)
+ erlang_bytecode(
+ name = "unit_cluster_formation_sort_nodes_SUITE_beam_files",
+ testonly = True,
+ srcs = ["test/unit_cluster_formation_sort_nodes_SUITE.erl"],
+ outs = ["test/unit_cluster_formation_sort_nodes_SUITE.beam"],
+ app_name = "rabbit",
+ erlc_opts = "//:test_erlc_opts",
+ )
erlang_bytecode(
name = "unit_collections_SUITE_beam_files",
testonly = True,
diff --git a/deps/rabbit/src/rabbit_db.erl b/deps/rabbit/src/rabbit_db.erl
index f377756d9e94..2dafb7a73877 100644
--- a/deps/rabbit/src/rabbit_db.erl
+++ b/deps/rabbit/src/rabbit_db.erl
@@ -51,7 +51,6 @@ init() ->
#{domain => ?RMQLOG_DOMAIN_DB}),
ensure_dir_exists(),
- rabbit_peer_discovery:log_configured_backend(),
rabbit_peer_discovery:maybe_init(),
pre_init(IsVirgin),
@@ -66,8 +65,8 @@ init() ->
"DB: initialization successeful",
#{domain => ?RMQLOG_DOMAIN_DB}),
+ rabbit_peer_discovery:maybe_register(),
init_finished(),
- post_init(IsVirgin),
ok;
Error ->
@@ -82,12 +81,6 @@ pre_init(IsVirgin) ->
OtherMembers = rabbit_nodes:nodes_excl_me(Members),
rabbit_db_cluster:ensure_feature_flags_are_in_sync(OtherMembers, IsVirgin).
-post_init(false = _IsVirgin) ->
- rabbit_peer_discovery:maybe_register();
-post_init(true = _IsVirgin) ->
- %% Registration handled by rabbit_peer_discovery.
- ok.
-
init_using_mnesia() ->
?LOG_DEBUG(
"DB: initialize Mnesia",
diff --git a/deps/rabbit/src/rabbit_mnesia.erl b/deps/rabbit/src/rabbit_mnesia.erl
index fa98deac504b..1f7d2452f212 100644
--- a/deps/rabbit/src/rabbit_mnesia.erl
+++ b/deps/rabbit/src/rabbit_mnesia.erl
@@ -114,13 +114,24 @@
init() ->
ensure_mnesia_running(),
ensure_mnesia_dir(),
+ %% If this node is virgin, we call peer discovery to see if this node
+ %% should start as a standalone node or if it should join a cluster.
case is_virgin_node() of
true ->
rabbit_log:info("Node database directory at ~ts is empty. "
"Assuming we need to join an existing cluster or initialise from scratch...",
[dir()]),
- rabbit_peer_discovery:maybe_create_cluster(
- fun create_cluster_callback/2);
+ rabbit_peer_discovery:sync_desired_cluster();
+ false ->
+ ok
+ end,
+ %% Peer discovery may have been a no-op if it decided that all other nodes
+ %% should join this one. Therefore, we need to look at if this node is
+ %% still virgin and finish our init of Mnesia accordingly. In particular,
+ %% this second part creates all our Mnesia tables.
+ case is_virgin_node() of
+ true ->
+ init_db_and_upgrade([node()], disc, true, _Retry = true);
false ->
NodeType = node_type(),
case is_node_type_permitted(NodeType) of
@@ -141,23 +152,6 @@ init() ->
ok = rabbit_node_monitor:global_sync(),
ok.
-create_cluster_callback(none, NodeType) ->
- DiscNodes = [node()],
- NodeType1 = case is_node_type_permitted(NodeType) of
- false -> disc;
- true -> NodeType
- end,
- init_db_and_upgrade(DiscNodes, NodeType1, true, _Retry = true),
- ok;
-create_cluster_callback(RemoteNode, NodeType) ->
- {ok, {_, DiscNodes, _}} = discover_cluster0(RemoteNode),
- NodeType1 = case is_node_type_permitted(NodeType) of
- false -> disc;
- true -> NodeType
- end,
- init_db_and_upgrade(DiscNodes, NodeType1, true, _Retry = true),
- ok.
-
%% Make the node join a cluster. The node will be reset automatically
%% before we actually cluster it. The nodes provided will be used to
%% find out about the nodes in the cluster.
diff --git a/deps/rabbit/src/rabbit_peer_discovery.erl b/deps/rabbit/src/rabbit_peer_discovery.erl
index a5ba1e6d6007..d341115fcd0f 100644
--- a/deps/rabbit/src/rabbit_peer_discovery.erl
+++ b/deps/rabbit/src/rabbit_peer_discovery.erl
@@ -8,6 +8,7 @@
-module(rabbit_peer_discovery).
-include_lib("kernel/include/logger.hrl").
+-include_lib("stdlib/include/assert.hrl").
-include_lib("rabbit_common/include/logging.hrl").
@@ -15,21 +16,24 @@
%% API
%%
--export([maybe_init/0, maybe_create_cluster/1, discover_cluster_nodes/0,
- backend/0, node_type/0,
- normalize/1, format_discovered_nodes/1, log_configured_backend/0,
- register/0, unregister/0, maybe_register/0, maybe_unregister/0,
- lock/0, unlock/1, discovery_retries/0]).
--export([append_node_prefix/1, node_prefix/0, locking_retry_timeout/0,
- lock_acquisition_failure_mode/0]).
+-export([maybe_init/0,
+ sync_desired_cluster/0,
+ maybe_register/0,
+ maybe_unregister/0,
+ discover_cluster_nodes/0]).
+-export([backend/0,
+ node_type/0,
+ normalize/1,
+ append_node_prefix/1,
+ node_prefix/0]).
-ifdef(TEST).
--export([maybe_create_cluster/3]).
+-export([sort_nodes_and_props/1,
+ join_selected_node/3]).
-endif.
--type create_cluster_callback() :: fun((node(),
- rabbit_db_cluster:node_type())
- -> ok).
+-type backend() :: atom().
+-type node_and_props() :: {node(), [node()], non_neg_integer()}.
-define(DEFAULT_BACKEND, rabbit_peer_discovery_classic_config).
@@ -41,43 +45,36 @@
-define(DEFAULT_PREFIX, "rabbit").
%% default discovery retries and interval.
--define(DEFAULT_DISCOVERY_RETRY_COUNT, 10).
--define(DEFAULT_DISCOVERY_RETRY_INTERVAL_MS, 500).
+-define(DEFAULT_DISCOVERY_RETRY_COUNT, 30).
+-define(DEFAULT_DISCOVERY_RETRY_INTERVAL_MS, 1000).
-define(NODENAME_PART_SEPARATOR, "@").
--spec backend() -> atom().
+-define(PT_PEER_DISC_BACKEND, {?MODULE, backend}).
-backend() ->
- case application:get_env(rabbit, cluster_formation) of
- {ok, Proplist} ->
- proplists:get_value(peer_discovery_backend, Proplist, ?DEFAULT_BACKEND);
- undefined ->
- ?DEFAULT_BACKEND
- end.
+-compile({no_auto_import, [register/1, unregister/1]}).
+-spec backend() -> backend().
+backend() ->
+ case application:get_env(rabbit, cluster_formation) of
+ {ok, Proplist} ->
+ Backend = proplists:get_value(
+ peer_discovery_backend, Proplist, ?DEFAULT_BACKEND),
+ ?assert(is_atom(Backend)),
+ Backend;
+ undefined ->
+ ?DEFAULT_BACKEND
+ end.
-spec node_type() -> rabbit_types:node_type().
node_type() ->
- case application:get_env(rabbit, cluster_formation) of
- {ok, Proplist} ->
- proplists:get_value(node_type, Proplist, ?DEFAULT_NODE_TYPE);
- undefined ->
- ?DEFAULT_NODE_TYPE
- end.
-
--spec locking_retry_timeout() -> {Retries :: integer(), Timeout :: integer()}.
-
-locking_retry_timeout() ->
case application:get_env(rabbit, cluster_formation) of
{ok, Proplist} ->
- Retries = proplists:get_value(lock_retry_limit, Proplist, 10),
- Timeout = proplists:get_value(lock_retry_timeout, Proplist, 30000),
- {Retries, Timeout};
+ proplists:get_value(node_type, Proplist, ?DEFAULT_NODE_TYPE);
undefined ->
- {10, 30000}
+ ?DEFAULT_NODE_TYPE
end.
-spec lock_acquisition_failure_mode() -> ignore | fail.
@@ -90,142 +87,552 @@ lock_acquisition_failure_mode() ->
fail
end.
--spec log_configured_backend() -> ok.
-
-log_configured_backend() ->
- rabbit_log:info("Configured peer discovery backend: ~ts", [backend()]).
+-spec maybe_init() -> ok.
+%% @doc Initializes the peer discovery subsystem.
maybe_init() ->
Backend = backend(),
+ ?LOG_DEBUG(
+ "Peer discovery: configured backend: ~tp",
+ [Backend],
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+
+ %% We cache the configured backend as well. This is used by
+ %% `sync_desired_cluster/0' and `maybe_unregister/0', to ensure that the
+ %% same backend is used to create/sync the cluster and (un)register the
+ %% node, even if the configuration changed in between.
+ persistent_term:put(?PT_PEER_DISC_BACKEND, Backend),
+
_ = code:ensure_loaded(Backend),
case erlang:function_exported(Backend, init, 0) of
true ->
- rabbit_log:debug("Peer discovery backend supports initialisation"),
+ ?LOG_DEBUG(
+ "Peer discovery: backend supports initialisation",
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
case Backend:init() of
ok ->
- rabbit_log:debug("Peer discovery backend initialisation succeeded"),
+ ?LOG_DEBUG(
+ "Peer discovery: backend initialisation succeeded",
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok;
- {error, Error} ->
- rabbit_log:warning("Peer discovery backend initialisation failed: ~tp.", [Error]),
+ {error, _Reason} = Error ->
+ ?LOG_WARNING(
+ "Peer discovery: backend initialisation failed: ~tp.",
+ [Error],
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok
end;
false ->
- rabbit_log:debug("Peer discovery backend does not support initialisation"),
+ ?LOG_DEBUG(
+ "Peer discovery: backend does not support initialisation",
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok
end.
-maybe_create_cluster(CreateClusterCallback) ->
- {Retries, Timeout} = locking_retry_timeout(),
- maybe_create_cluster(Retries, Timeout, CreateClusterCallback).
+-spec sync_desired_cluster() -> ok.
+%% @doc Creates or synchronizes the cluster membership of this node based on a
+%% peer discovery backend.
+%%
+%% If the peer discovery backend finds nodes that this node should cluster
+%% with, this function calls {@link rabbit_db_cluster:join/2} to join one of
+%% these nodes.
+%%
+%% This function always returns `ok', whether this node joined a cluster or
+%% is left to boot as a standalone node.
+%%
+%% Currently, it only expands the cluster. It won't take care of kicking
+%% members that are not listed by the backend.
+
+sync_desired_cluster() ->
+ Backend = persistent_term:get(?PT_PEER_DISC_BACKEND),
+
+ %% We handle retries at the top level: steps are followed sequentially and
+ %% if one of them fails, we retry the whole process.
+ {Retries, RetryDelay} = discovery_retries(),
+
+ sync_desired_cluster(Backend, Retries, RetryDelay).
+
+-spec sync_desired_cluster(Backend, RetriesLeft, RetryDelay) -> ok when
+ Backend :: backend(),
+ RetriesLeft :: non_neg_integer(),
+ RetryDelay :: non_neg_integer().
+%% @private
+
+sync_desired_cluster(Backend, RetriesLeft, RetryDelay) ->
+ %% The peer discovery process follows the following steps:
+ %% 1. It uses the configured backend to query the nodes that should form
+ %% a cluster. It takes care of checking the validity of the returned
+ %% values: the list of nodes is made of atoms and the node type is
+ %% valid.
+ %% 2. It queries some properties for each node in the list. This is used
+ %% to filter out unreachable nodes and to sort the final list. The
+ %% sorting is important because it determines which node it will try
+ %% to join.
+ %% 3. It joins the selected node using a regular `join_cluster'. This
+ %% step is protected by a lock if the backend supports this
+ %% mechanism.
+ case discover_cluster_nodes(Backend) of
+ {ok, {[ThisNode], _NodeType}} when ThisNode =:= node() ->
+ ?LOG_DEBUG(
+ "Peer discovery: no nodes to cluster with according to "
+ "backend; proceeding as a standalone node",
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ ok;
+ {ok, {DiscoveredNodes, NodeType}} ->
+ NodesAndProps = query_node_props(DiscoveredNodes),
+ case can_use_discovered_nodes(DiscoveredNodes, NodesAndProps) of
+ true ->
+ SelectedNode = select_node_to_join(NodesAndProps),
+ Ret = join_selected_node(Backend, SelectedNode, NodeType),
+ case Ret of
+ ok ->
+ %% TODO: Check if there are multiple "concurrent"
+ %% clusters in `NodesAndProps' instead of one, or
+ %% standalone ready nodes that joined no one.
+ %%
+ %% TODO: After some delay, re-evaluate peer
+ %% discovery, in case there are again multiple
+ %% clusters or standalone ready nodes.
+ %%
+ %% TODO: Remove members which are not in the list
+ %% returned by the backend.
+ ok;
+ {error, _Reason} ->
+ retry_sync_desired_cluster(
+ Backend, RetriesLeft, RetryDelay)
+ end;
+ false ->
+ retry_sync_desired_cluster(
+ Backend, RetriesLeft, RetryDelay)
+ end;
+ {error, _Reason} ->
+ retry_sync_desired_cluster(Backend, RetriesLeft, RetryDelay)
+ end.
-maybe_create_cluster(0, _, CreateClusterCallback)
- when is_function(CreateClusterCallback, 2) ->
- case lock_acquisition_failure_mode() of
- ignore ->
- ?LOG_WARNING(
- "Peer discovery: Could not acquire a peer discovery lock, "
- "out of retries", [],
+-spec retry_sync_desired_cluster(Backend, RetriesLeft, RetryDelay) -> ok when
+ Backend :: backend(),
+ RetriesLeft :: non_neg_integer(),
+ RetryDelay :: non_neg_integer().
+%% @private
+
+retry_sync_desired_cluster(Backend, RetriesLeft, RetryDelay)
+ when RetriesLeft > 0 ->
+ RetriesLeft1 = RetriesLeft - 1,
+ ?LOG_DEBUG(
+ "Peer discovery: retrying to create/sync cluster in ~b ms "
+ "(~b attempts left)",
+ [RetryDelay, RetriesLeft1],
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ timer:sleep(RetryDelay),
+ sync_desired_cluster(Backend, RetriesLeft1, RetryDelay);
+retry_sync_desired_cluster(_Backend, 0, _RetryDelay) ->
+ ?LOG_ERROR(
+ "Peer discovery: could not discover and join another node; "
+ "proceeding as a standalone node",
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ ok.
+
+-spec discover_cluster_nodes() -> {ok, Discovery} | {error, any()} when
+ Discovery :: {DiscoveredNodes, NodeType},
+ DiscoveredNodes :: [node()],
+ NodeType :: rabbit_types:node_type().
+%% @doc Queries the peer discovery backend to discover nodes.
+%% An `{error, Reason}' from the backend query is returned unchanged.
+%% This is used by the CLI.
+
+discover_cluster_nodes() ->
+ Backend = persistent_term:get(?PT_PEER_DISC_BACKEND, backend()),
+ discover_cluster_nodes(Backend).
+
+-spec discover_cluster_nodes(Backend) -> Ret when
+ Backend :: backend(),
+ Discovery :: {DiscoveredNodes, NodeType},
+ DiscoveredNodes :: [node()],
+ NodeType :: rabbit_types:node_type(),
+ Ret :: {ok, Discovery} | {error, Reason},
+ Reason :: any().
+%% @private
+
+discover_cluster_nodes(Backend) ->
+ %% The returned list of nodes and the node type are only sanity-checked by
+ %% this function. In other words, the list contains atoms and the node
+ %% type is valid. Nodes availability and inter-node compatibility are
+ %% taken care of later.
+ Ret = Backend:list_nodes(),
+ ?LOG_DEBUG(
+ "Peer discovery: backend returned the following configuration:~n"
+ " ~tp",
+ [Ret],
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ case normalize(Ret) of
+ {ok, {DiscoveredNodes, NodeType} = Discovery} ->
+ check_discovered_nodes_list_validity(DiscoveredNodes, NodeType),
+ {ok, Discovery};
+ {error, _} = Error ->
+ ?LOG_ERROR(
+ "Peer discovery: failed to query the list of nodes from the "
+ "backend: ~0tp",
+ [Error],
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ Error
+ end.
+
+-spec check_discovered_nodes_list_validity(DiscoveredNodes, NodeType) ->
+ Ret when
+ DiscoveredNodes :: [node()],
+ NodeType :: rabbit_types:node_type(),
+ Ret :: ok.
+%% @doc Checks that node names are atoms and the type is disc, disk or ram.
+%% @private
+check_discovered_nodes_list_validity(DiscoveredNodes, NodeType)
+ when is_list(DiscoveredNodes) andalso
+ (NodeType =:= disc orelse NodeType =:= disk orelse NodeType =:= ram) ->
+ BadNodenames = lists:filter(
+ fun(Nodename) -> not is_atom(Nodename) end,
+ DiscoveredNodes),
+ case BadNodenames of
+ [] -> ok;
+ _ -> e({invalid_cluster_node_names, BadNodenames})
+ end;
+check_discovered_nodes_list_validity(DiscoveredNodes, BadNodeType)
+ when is_list(DiscoveredNodes) ->
+ e({invalid_cluster_node_type, BadNodeType}).
+
+-spec query_node_props(Nodes) -> NodesAndProps when
+ Nodes :: [node()],
+ NodesAndProps :: [node_and_props()].
+%% @doc Queries properties for each node in `Nodes' and sorts the list using
+%% these properties.
+%%
+%% The following properties are queried:
+%%
+%% - the cluster membership of the node, i.e. the list of nodes it is
+%% clustered with, including itself
+%% - the node's Erlang VM start time
+%%
+%%
+%% If a node can't be queried because it is unavailable, it is excluded from
+%% the returned list.
+%%
+%% These properties are then used to sort the list of nodes according to the
+%% following criteria:
+%%
+%% - Nodes are sorted by cluster size, from the biggest to the smallest.
+%% - For nodes with the same cluster size, nodes are sorted by start time,
+%% from the oldest node to the youngest.
+%% - For nodes with the same cluster size and start time, nodes are sorted by
+%% names, alphabetically.
+%%
+%% The goal is that every node returned by the backend will select the same
+%% node to join (the first in the list). The cluster size criterion is here to
+%% make sure the node joins the cluster that is being expanded instead of
+%% another standalone node. The start time is used because it's a better
+%% criterion to sort nodes in a deterministic way than their name in case nodes
+%% start in an arbitrary number and the first node in alphabetical order
+%% becomes available later.
+%%
+%% An example of what we want to avoid is e.g. node 4 joining node 1, then node
+%% 1 joining node 2. Indeed, now that peer discovery uses {@link
+%% rabbit_db_cluster:join/2} instead of its own code path, there is the risk
+%% that node 1 kicks node 4 out of the cluster by joining node 2 because
+%% `join_cluster' includes a reset.
+%%
+%% @private
+
+query_node_props(Nodes) when Nodes =/= [] ->
+ %% TODO: Replace with `rabbit_nodes:list_members/0' when the oldest
+ %% supported version has it.
+ MembersPerNode = erpc:multicall(Nodes, rabbit_nodes, all, []),
+ query_node_props1(Nodes, MembersPerNode, []);
+query_node_props([]) ->
+ [].
+
+query_node_props1(
+ [Node | Nodes], [{ok, Members} | MembersPerNode], NodesAndProps) ->
+ NodeAndProps = {Node, Members},
+ NodesAndProps1 = [NodeAndProps | NodesAndProps],
+ query_node_props1(Nodes, MembersPerNode, NodesAndProps1);
+query_node_props1(
+ [Node | Nodes], [{error, _} = Error | MembersPerNode], NodesAndProps) ->
+ %% We consider that an error means the remote node is unreachable or not
+ %% ready. Therefore, we exclude it from the list of discovered nodes as we
+ %% won't be able to join it anyway.
+ ?LOG_DEBUG(
+ "Peer discovery: failed to query cluster members of node '~ts': ~0tp~n"
+ "Peer discovery: node '~ts' excluded from the discovered nodes",
+ [Node, Error, Node],
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ query_node_props1(Nodes, MembersPerNode, NodesAndProps);
+query_node_props1([], [], NodesAndProps) ->
+ NodesAndProps1 = lists:reverse(NodesAndProps),
+ query_node_props2(NodesAndProps1, []).
+
+query_node_props2([{Node, Members} | Rest], NodesAndProps) ->
+ try
+ StartTime = get_node_start_time(Node, microsecond),
+ NodeAndProps = {Node, Members, StartTime},
+ NodesAndProps1 = [NodeAndProps | NodesAndProps],
+ query_node_props2(Rest, NodesAndProps1)
+ catch
+ _:Error:_ ->
+ %% If one of the erpc calls we use to get the start time fails,
+ %% there is something wrong with the remote node because it
+ %% doesn't depend on RabbitMQ. We exclude it from the discovered
+ %% nodes.
+ ?LOG_DEBUG(
+ "Peer discovery: failed to query start time of node '~ts': "
+ "~0tp~n"
+ "Peer discovery: node '~ts' excluded from the discovered nodes",
+ [Node, Error, Node],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
- run_peer_discovery(CreateClusterCallback),
- maybe_register();
- fail ->
- exit(cannot_acquire_startup_lock)
+ query_node_props2(Rest, NodesAndProps)
end;
-maybe_create_cluster(Retries, Timeout, CreateClusterCallback)
- when is_function(CreateClusterCallback, 2) ->
- LockResult = lock(),
+query_node_props2([], NodesAndProps) ->
+ NodesAndProps1 = lists:reverse(NodesAndProps),
+ sort_nodes_and_props(NodesAndProps1).
+
+-spec get_node_start_time(Node, Unit) -> StartTime when
+ Node :: node(),
+ Unit :: erlang:time_unit(),
+ StartTime :: non_neg_integer().
+%% @doc Returns the start time of the given `Node' in `Unit'.
+%%
+%% The start time is an arbitrary point in time (in the past or the future),
+%% expressed in native time unit. It is a monotonic time that is specific to
+%% that node. It can't be compared as is with other nodes' start time. To
+%% convert it to a system time so that we can compare it, we must add the
+%% node's time offset.
+%%
+%% Both the start time and the time offset are expressed in native time unit.
+%% Again, this can't be compared to other nodes' native time unit values. We
+%% must convert it to a common time unit first.
+%%
+%% See the documentation of {@link erlang:time_offset/0} at
+%% https://www.erlang.org/doc/man/erlang#time_offset-0 to get the full
+%% explanation of the computation.
+%%
+%% @private
+
+get_node_start_time(Node, Unit) ->
+ NativeStartTime = erpc:call(Node, erlang, system_info, [start_time]),
+ TimeOffset = erpc:call(Node, erlang, time_offset, []),
+ SystemStartTime = NativeStartTime + TimeOffset,
+ StartTime = erpc:call(
+ Node, erlang, convert_time_unit,
+ [SystemStartTime, native, Unit]),
+ StartTime.
+
+-spec sort_nodes_and_props(NodesAndProps) -> SortedNodesAndProps when
+ NodesAndProps :: [node_and_props()],
+ SortedNodesAndProps :: [node_and_props()].
+%% @doc Sorts the list of nodes according to their properties.
+%%
+%% See {@link query_node_props/1} for an explanation of the criteria used to
+%% sort the list.
+%%
+%% @see query_node_props/1.
+%%
+%% @private
+
+sort_nodes_and_props(NodesAndProps) ->
+ NodesAndProps1 = lists:sort(
+ fun(
+ {NodeA, MembersA, StartTimeA},
+ {NodeB, MembersB, StartTimeB}) ->
+ length(MembersA) > length(MembersB) orelse
+ (length(MembersA) =:= length(MembersB) andalso
+ StartTimeA < StartTimeB) orelse
+ (length(MembersA) =:= length(MembersB) andalso
+ StartTimeA =:= StartTimeB andalso
+ NodeA =< NodeB)
+ end, NodesAndProps),
+ ?LOG_DEBUG(
+ lists:flatten(
+ ["Peer discovery: sorted list of nodes and their properties "
+ "considered to create/sync the cluster:"] ++
+ ["~n - ~0tp" || _ <- NodesAndProps1]),
+ NodesAndProps1,
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ NodesAndProps1.
+
+-spec can_use_discovered_nodes(DiscoveredNodes, NodesAndProps) -> CanUse when
+ DiscoveredNodes :: [node()],
+ NodesAndProps :: [node_and_props()],
+ CanUse :: boolean().
+%% @doc Indicates if the list of discovered nodes is good enough to proceed
+%% with peer discovery.
+%%
+%% It is possible that we queried the backend early enough that it doesn't yet
+%% know about the nodes that should form a cluster. To reduce the chance of a
+%% list of nodes which makes little sense, we check two criteria:
+%%
+%% - We want this node to be part of the list of nodes we could query.
+%% - If we have a cluster size hint and the expected size is greater than 1,
+%% we want the list to have at least two nodes. The cluster size hint is
+%% computed from the configured target cluster size hint and the length of the
+%% nodes list returned by the backend. This function picks the maximum of the
+%% two. This is useful for backends such as the classic config one where the
+%% returned list is static (i.e. it can be used as the cluster size hint).
+%%
+%%
+%% @private
+
+can_use_discovered_nodes(DiscoveredNodes, NodesAndProps)
+ when NodesAndProps =/= [] ->
+ Nodes = [Node || {Node, _Members, _StartTime} <- NodesAndProps],
+
+ ThisNode = node(),
+ ThisNodeIsIncluded = lists:member(ThisNode, Nodes),
+ case ThisNodeIsIncluded of
+ true ->
+ ok;
+ false ->
+ ?LOG_DEBUG(
+ "Peer discovery: not satisfyied with discovered peers: the "
+ "list does not contain this node",
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC})
+ end,
+
+ %% We consider that the list of nodes returned by the backend can be a
+ %% cluster size hint too. That's why we pick the maximum between the
+ %% configured one and the list length.
+ ClusterSizeHint = erlang:max(
+ rabbit_nodes:target_cluster_size_hint(),
+ length(DiscoveredNodes)),
+ HasEnoughNodes = ClusterSizeHint =< 1 orelse length(Nodes) >= 2,
+ case HasEnoughNodes of
+ true ->
+ ok;
+ false ->
+ ?LOG_DEBUG(
+ "Peer discovery: not satisfyied with discovered peers: the "
+ "list should contain at least two nodes with a configured "
+ "cluster size hint of ~b nodes",
+ [ClusterSizeHint],
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC})
+ end,
+
+ ThisNodeIsIncluded andalso HasEnoughNodes;
+can_use_discovered_nodes(_DiscoveredNodes, []) ->
+ ?LOG_DEBUG(
+ "Peer discovery: discovered no peer nodes to cluster with. "
+ "Some discovery backends can filter nodes out based on a "
+ "readiness criteria. "
+ "Enabling debug logging might help troubleshoot.",
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ false.
+
+-spec select_node_to_join(NodesAndProps) -> SelectedNode when
+ NodesAndProps :: [node_and_props()],
+ SelectedNode :: node().
+%% @doc Selects the node to join among the sorted list of nodes.
+%%
+%% The selection is simple: we take the first entry. It corresponds to the
+%% oldest node we could reach, clustered with the greatest number of nodes.
+%%
+%% @private
+
+select_node_to_join([{Node, _Members, _StartTime} | _]) ->
+ ?LOG_INFO(
+ "Peer discovery: node '~ts' selected for auto-clustering",
+ [Node],
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ Node.
+
+-spec join_selected_node(Backend, Node, NodeType) -> Ret when
+ Backend :: backend(),
+ Node :: node(),
+ NodeType :: rabbit_types:node_type(),
+ Ret :: ok | {error, Reason},
+ Reason :: any().
+%% @doc Joins the selected node.
+%%
+%% This function relies on {@link rabbit_db_cluster:join/2}. It acquires a lock
+%% before proceeding with the join if the backend provides such a mechanism.
+%%
+%% If the selected node is this node, this is a no-op and no lock is acquired.
+%%
+%% @private
+
+join_selected_node(_Backend, ThisNode, _NodeType) when ThisNode =:= node() ->
+ ?LOG_DEBUG(
+ "Peer discovery: the selected node is this node; proceed with boot",
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ ok;
+join_selected_node(Backend, SelectedNode, NodeType) ->
+ ?LOG_DEBUG(
+ "Peer discovery: trying to acquire lock",
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ LockResult = lock(Backend),
?LOG_DEBUG(
- "Peer discovery: rabbit_peer_discovery:lock/0 returned ~tp",
+ "Peer discovery: rabbit_peer_discovery:lock/0 returned ~0tp",
[LockResult],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
case LockResult of
not_supported ->
- run_peer_discovery(CreateClusterCallback),
- maybe_register();
+ ?LOG_DEBUG(
+ "Peer discovery: no lock acquired",
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ join_selected_node_locked(SelectedNode, NodeType);
{ok, Data} ->
+ ?LOG_DEBUG(
+ "Peer discovery: lock acquired",
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
try
- run_peer_discovery(CreateClusterCallback),
- maybe_register()
+ join_selected_node_locked(SelectedNode, NodeType)
after
- unlock(Data)
+ ?LOG_DEBUG(
+ "Peer discovery: lock released",
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ unlock(Backend, Data)
end;
- {error, _Reason} ->
- timer:sleep(Timeout),
- maybe_create_cluster(
- Retries - 1, Timeout, CreateClusterCallback)
+ {error, _Reason} = Error ->
+ ?LOG_WARNING(
+ "Peer discovery: failed to acquire a lock: ~0tp",
+ [Error],
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ case lock_acquisition_failure_mode() of
+ ignore -> join_selected_node_locked(SelectedNode, NodeType);
+ fail -> Error
+ end
end.
--spec run_peer_discovery(CreateClusterCallback) -> Ret when
- CreateClusterCallback :: create_cluster_callback(),
- Ret :: ok | {Nodes, NodeType},
- Nodes :: [node()],
- NodeType :: rabbit_db_cluster:node_type().
-
-run_peer_discovery(CreateClusterCallback) ->
- {RetriesLeft, DelayInterval} = discovery_retries(),
- run_peer_discovery_with_retries(
- RetriesLeft, DelayInterval, CreateClusterCallback).
-
--spec run_peer_discovery_with_retries(
- Retries, DelayInterval, CreateClusterCallback) -> ok when
- CreateClusterCallback :: create_cluster_callback(),
- Retries :: non_neg_integer(),
- DelayInterval :: non_neg_integer().
-
-run_peer_discovery_with_retries(
- 0, _DelayInterval, _CreateClusterCallback) ->
- ok;
-run_peer_discovery_with_retries(
- RetriesLeft, DelayInterval, CreateClusterCallback) ->
- FindBadNodeNames = fun
- (Name, BadNames) when is_atom(Name) -> BadNames;
- (Name, BadNames) -> [Name | BadNames]
- end,
- {DiscoveredNodes0, NodeType} =
- case discover_cluster_nodes() of
- {error, Reason} ->
- RetriesLeft1 = RetriesLeft - 1,
- ?LOG_ERROR(
- "Peer discovery: Failed to discover nodes: ~tp. "
- "Will retry after a delay of ~b ms, ~b retries left...",
- [Reason, DelayInterval, RetriesLeft1],
- #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
- timer:sleep(DelayInterval),
- run_peer_discovery_with_retries(
- RetriesLeft1, DelayInterval, CreateClusterCallback);
- {ok, {Nodes, Type} = Config}
- when is_list(Nodes) andalso
- (Type == disc orelse Type == disk orelse Type == ram) ->
- case lists:foldr(FindBadNodeNames, [], Nodes) of
- [] -> Config;
- BadNames -> e({invalid_cluster_node_names, BadNames})
- end;
- {ok, {_, BadType}} when BadType /= disc andalso BadType /= ram ->
- e({invalid_cluster_node_type, BadType});
- {ok, _} ->
- e(invalid_cluster_nodes_conf)
+-spec join_selected_node_locked(Node, NodeType) -> Ret when
+ Node :: node(),
+ NodeType :: rabbit_types:node_type(),
+ Ret :: ok | {error, Reason},
+ Reason :: any().
+
+join_selected_node_locked(Node, NodeType) ->
+ %% We used to synchronize feature flags here before we updated the cluster
+ %% membership. We don't do it anymore because the `join_cluster' code
+ %% resets the joining node and copies the feature flags states from the
+ %% cluster.
+ try
+ Ret = rabbit_db_cluster:join(Node, NodeType),
+ ?assertNotEqual({ok, already_member}, Ret),
+ case Ret of
+ ok ->
+ ?LOG_INFO(
+ "Peer discovery: this node (~ts) successfully joined "
+ "node '~ts' cluster",
+ [node(), Node],
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC});
+ Error1 ->
+ ?LOG_WARNING(
+ "Peer discovery: could not auto-cluster with node '~ts': "
+ "~0tp",
+ [Node, Error1],
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC})
end,
- DiscoveredNodes = lists:usort(DiscoveredNodes0),
- ?LOG_INFO(
- "Peer discovery: All discovered existing cluster peers: ~ts",
- [format_discovered_nodes(DiscoveredNodes)],
- #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
- Peers = rabbit_nodes:nodes_excl_me(DiscoveredNodes),
- case Peers of
- [] ->
- ?LOG_INFO(
- "Peer discovery: Discovered no peer nodes to cluster with. "
- "Some discovery backends can filter nodes out based on a "
- "readiness criteria. "
- "Enabling debug logging might help troubleshoot.",
- #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
- CreateClusterCallback(none, disc);
- _ ->
- ?LOG_INFO(
- "Peer discovery: Peer nodes we can cluster with: ~ts",
- [format_discovered_nodes(Peers)],
+ Ret
+ catch
+ throw:Error2 ->
+ ?LOG_WARNING(
+ "Peer discovery: could not auto-cluster with node '~ts': ~0tp",
+ [Node, Error2],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
- join_discovered_peers(Peers, NodeType, CreateClusterCallback)
+ Error2
end.
-spec e(any()) -> no_return().
@@ -238,135 +645,47 @@ error_description({invalid_cluster_node_names, BadNames}) ->
error_description({invalid_cluster_node_type, BadType}) ->
"In the 'cluster_nodes' configuration key, the node type is invalid "
"(expected 'disc' or 'ram'): " ++
- lists:flatten(io_lib:format("~tp", [BadType]));
-error_description(invalid_cluster_nodes_conf) ->
- "The 'cluster_nodes' configuration key is invalid, it must be of the "
- "form {[Nodes], Type}, where Nodes is a list of node names and "
- "Type is either 'disc' or 'ram'".
-
-%% Attempts to join discovered, reachable and compatible (in terms of Mnesia
-%% internal protocol version and such) cluster peers in order.
-join_discovered_peers(TryNodes, NodeType, CreateClusterCallback) ->
- {RetriesLeft, DelayInterval} = discovery_retries(),
- join_discovered_peers_with_retries(
- TryNodes, NodeType, RetriesLeft, DelayInterval, CreateClusterCallback).
-
-join_discovered_peers_with_retries(
- TryNodes, _NodeType, 0, _DelayInterval, CreateClusterCallback) ->
- ?LOG_INFO(
- "Peer discovery: Could not successfully contact any node of: ~ts "
- "(as in Erlang distribution). "
- "Starting as a blank standalone node...",
- [string:join(lists:map(fun atom_to_list/1, TryNodes), ",")],
- #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
- init_single_node(CreateClusterCallback);
-join_discovered_peers_with_retries(
- TryNodes, NodeType, RetriesLeft, DelayInterval, CreateClusterCallback) ->
- case find_reachable_peer_to_cluster_with(TryNodes) of
- {ok, Node} ->
- ?LOG_INFO(
- "Peer discovery: Node '~ts' selected for auto-clustering",
- [Node],
- #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
- create_cluster(Node, NodeType, CreateClusterCallback);
- none ->
- RetriesLeft1 = RetriesLeft - 1,
- ?LOG_INFO(
- "Peer discovery: Trying to join discovered peers failed. "
- "Will retry after a delay of ~b ms, ~b retries left...",
- [DelayInterval, RetriesLeft1],
- #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
- timer:sleep(DelayInterval),
- join_discovered_peers_with_retries(
- TryNodes, NodeType, RetriesLeft1, DelayInterval,
- CreateClusterCallback)
- end.
-
-find_reachable_peer_to_cluster_with([]) ->
- none;
-find_reachable_peer_to_cluster_with([Node | Nodes]) when Node =/= node() ->
- case rabbit_db_cluster:check_compatibility(Node) of
- ok ->
- {ok, Node};
- Error ->
- ?LOG_WARNING(
- "Peer discovery: Could not auto-cluster with node ~ts: ~0p",
- [Node, Error],
- #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
- find_reachable_peer_to_cluster_with(Nodes)
- end;
-find_reachable_peer_to_cluster_with([Node | Nodes]) when Node =:= node() ->
- find_reachable_peer_to_cluster_with(Nodes).
-
-init_single_node(CreateClusterCallback) ->
- IsVirgin = rabbit_db:is_virgin_node(),
- rabbit_db_cluster:ensure_feature_flags_are_in_sync([], IsVirgin),
- CreateClusterCallback(none, disc),
- ok.
-
-create_cluster(RemoteNode, NodeType, CreateClusterCallback) ->
- %% We want to synchronize feature flags first before we update the cluster
- %% membership. This is needed to ensure the local list of Mnesia tables
- %% matches the rest of the cluster for example, in case a feature flag
- %% adds or removes tables.
- %%
- %% For instance, a feature flag may remove a table (so it's gone from the
- %% cluster). If we were to wait for that table locally before
- %% synchronizing feature flags, we would wait forever; indeed the feature
- %% flag being disabled before sync, `rabbit_table:definitions()' would
- %% return the old table.
- %%
- %% Feature flags need to be synced before any change to Mnesia membership.
- %% If enabling feature flags fails, Mnesia could remain in an inconsistent
- %% state that prevents later joining the nodes.
- IsVirgin = rabbit_db:is_virgin_node(),
- rabbit_db_cluster:ensure_feature_flags_are_in_sync([RemoteNode], IsVirgin),
- CreateClusterCallback(RemoteNode, NodeType),
- rabbit_node_monitor:notify_joined_cluster(),
- ok.
-
-%% This module doesn't currently sanity-check the return value of
-%% `Backend:list_nodes()`. Therefore, it could return something invalid:
-%% thus the `{œk, any()} in the spec.
-%%
-%% `rabbit_mnesia:init_from_config()` does some verifications.
-
--spec discover_cluster_nodes() ->
- {ok, {Nodes :: [node()], NodeType :: rabbit_types:node_type()} | any()} |
- {error, Reason :: string()}.
-
-discover_cluster_nodes() ->
- Backend = backend(),
- normalize(Backend:list_nodes()).
-
+ lists:flatten(io_lib:format("~tp", [BadType])).
-spec maybe_register() -> ok.
maybe_register() ->
- Backend = backend(),
- case Backend:supports_registration() of
- true ->
- register(),
- Backend:post_registration();
- false ->
- rabbit_log:info("Peer discovery backend ~ts does not support registration, skipping registration.", [Backend]),
- ok
- end.
-
+ Backend = persistent_term:get(?PT_PEER_DISC_BACKEND, backend()),
+ case Backend:supports_registration() of
+ true ->
+ ?LOG_DEBUG(
+ "Peer discovery: registering this node",
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ register(Backend),
+ _ = Backend:post_registration(),
+ ok;
+ false ->
+ ?LOG_DEBUG(
+ "Peer discovery: registration unsupported, skipping register",
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ ok
+ end.
-spec maybe_unregister() -> ok.
maybe_unregister() ->
- Backend = backend(),
- case Backend:supports_registration() of
- true ->
- unregister();
- false ->
- rabbit_log:info("Peer discovery backend ~ts does not support registration, skipping unregistration.", [Backend]),
- ok
- end.
+ Backend = persistent_term:get(?PT_PEER_DISC_BACKEND, backend()),
+ case Backend:supports_registration() of
+ true ->
+ ?LOG_DEBUG(
+ "Peer discovery: unregistering this node",
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ unregister(Backend);
+ false ->
+ ?LOG_DEBUG(
+ "Peer discovery: registration unsupported, skipping unregister",
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ ok
+ end.
--spec discovery_retries() -> {Retries :: integer(), Interval :: integer()}.
+-spec discovery_retries() -> {Retries, RetryDelay} when
+ Retries :: non_neg_integer(),
+ RetryDelay :: non_neg_integer().
discovery_retries() ->
case application:get_env(rabbit, cluster_formation) of
@@ -378,57 +697,89 @@ discovery_retries() ->
{?DEFAULT_DISCOVERY_RETRY_COUNT, ?DEFAULT_DISCOVERY_RETRY_INTERVAL_MS}
end.
--spec register() -> ok.
+-spec register(Backend) -> ok when
+ Backend :: backend().
-register() ->
- Backend = backend(),
- rabbit_log:info("Will register with peer discovery backend ~ts", [Backend]),
- case Backend:register() of
- ok -> ok;
- {error, Error} ->
- rabbit_log:error("Failed to register with peer discovery backend ~ts: ~tp",
- [Backend, Error]),
- ok
- end.
+register(Backend) ->
+ ?LOG_INFO(
+ "Peer discovery: will register with peer discovery backend ~ts",
+ [Backend],
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ case Backend:register() of
+ ok ->
+ ok;
+ {error, _Reason} = Error ->
+ ?LOG_ERROR(
+ "Peer discovery: failed to register with peer discovery "
+ "backend ~ts: ~tp",
+ [Backend, Error],
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ ok
+ end.
--spec unregister() -> ok.
+-spec unregister(Backend) -> ok when
+ Backend :: backend().
-unregister() ->
- Backend = backend(),
- rabbit_log:info("Will unregister with peer discovery backend ~ts", [Backend]),
+unregister(Backend) ->
+ ?LOG_INFO(
+ "Peer discovery: will unregister with peer discovery backend ~ts",
+ [Backend],
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
case Backend:unregister() of
- ok -> ok;
- {error, Error} ->
- rabbit_log:error("Failed to unregister with peer discovery backend ~ts: ~tp",
- [Backend, Error]),
+ ok ->
+ ok;
+ {error, _Reason} = Error ->
+ ?LOG_ERROR(
+ "Peer discovery: failed to unregister with peer discovery "
+ "backend ~ts: ~tp",
+ [Backend, Error],
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok
end.
--spec lock() -> {ok, Data :: term()} | not_supported | {error, Reason :: string()}.
+-spec lock(Backend) -> Ret when
+ Backend :: backend(),
+ Ret :: {ok, Data} | not_supported | {error, Reason},
+ Data :: any(),
+ Reason :: string().
-lock() ->
- Backend = backend(),
- rabbit_log:info("Will try to lock with peer discovery backend ~ts", [Backend]),
+lock(Backend) ->
+ ?LOG_INFO(
+ "Peer discovery: will try to lock with peer discovery backend ~ts",
+ [Backend],
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
case Backend:lock(node()) of
{error, Reason} = Error ->
- rabbit_log:error("Failed to lock with peer discovery backend ~ts: ~tp",
- [Backend, Reason]),
+ ?LOG_ERROR(
+ "Peer discovery: failed to lock with peer discovery "
+ "backend ~ts: ~0tp",
+ [Backend, Reason],
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
Error;
Any ->
Any
end.
--spec unlock(Data :: term()) -> ok | {error, Reason :: string()}.
+-spec unlock(Backend, Data) -> Ret when
+ Backend :: backend(),
+ Data :: any(),
+ Ret :: ok | {error, Reason},
+ Reason :: string().
-unlock(Data) ->
- Backend = backend(),
- rabbit_log:info("Will try to unlock with peer discovery backend ~ts", [Backend]),
+unlock(Backend, Data) ->
+ ?LOG_INFO(
+ "Peer discovery: will try to unlock with peer discovery "
+ "backend ~ts",
+ [Backend],
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
case Backend:unlock(Data) of
{error, Reason} = Error ->
- rabbit_log:error("Failed to unlock with peer discovery backend ~ts: ~tp, "
- "lock data: ~tp",
- [Backend, Reason, Data]),
+ ?LOG_ERROR(
+ "Peer discovery: failed to unlock with peer discovery "
+ "backend ~ts: ~0tp, lock data: ~0tp",
+ [Backend, Reason, Data],
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
Error;
Any ->
Any
@@ -459,16 +810,6 @@ normalize({ok, {Nodes, NodeType}}) when is_list(Nodes) andalso is_atom(NodeType)
normalize({error, Reason}) ->
{error, Reason}.
--spec format_discovered_nodes(Nodes :: list()) -> string().
-
-format_discovered_nodes(Nodes) ->
- %% NOTE: in OTP 21 string:join/2 is deprecated but still available.
- %% Its recommended replacement is not a drop-in one, though, so
- %% we will not be switching just yet.
- string:join(lists:map(fun rabbit_data_coercion:to_list/1, Nodes), ", ").
-
-
-
-spec node_prefix() -> string().
node_prefix() ->
@@ -477,8 +818,6 @@ node_prefix() ->
[_] -> ?DEFAULT_PREFIX
end.
-
-
-spec append_node_prefix(Value :: binary() | string()) -> string().
append_node_prefix(Value) when is_binary(Value) orelse is_list(Value) ->
diff --git a/deps/rabbit/src/rabbit_peer_discovery_classic_config.erl b/deps/rabbit/src/rabbit_peer_discovery_classic_config.erl
index 2c2aef2c615a..de5e46873545 100644
--- a/deps/rabbit/src/rabbit_peer_discovery_classic_config.erl
+++ b/deps/rabbit/src/rabbit_peer_discovery_classic_config.erl
@@ -22,8 +22,17 @@
list_nodes() ->
case application:get_env(rabbit, cluster_nodes, {[], disc}) of
- {_Nodes, _NodeType} = Pair -> {ok, Pair};
- Nodes when is_list(Nodes) -> {ok, {Nodes, disc}}
+ {Nodes, NodeType} ->
+ {ok, {add_this_node(Nodes), NodeType}};
+ Nodes when is_list(Nodes) ->
+ {ok, {add_this_node(Nodes), disc}}
+ end.
+
+add_this_node(Nodes) ->
+ ThisNode = node(),
+ case lists:member(ThisNode, Nodes) of
+ true -> Nodes;
+ false -> [ThisNode | Nodes]
end.
-spec lock(Node :: node()) -> {ok, {{ResourceId :: string(), LockRequesterId :: node()}, Nodes :: [node()]}} |
diff --git a/deps/rabbit/test/clustering_management_SUITE.erl b/deps/rabbit/test/clustering_management_SUITE.erl
index a54cdd7d83dc..1dc26a051813 100644
--- a/deps/rabbit/test/clustering_management_SUITE.erl
+++ b/deps/rabbit/test/clustering_management_SUITE.erl
@@ -988,6 +988,20 @@ is_not_supported(Ret) ->
classic_config_discovery_node_list(Config) ->
[Rabbit, Hare] = cluster_members(Config),
+ %% We restart the node that is reconfigured during this testcase to make
+ %% sure it has the latest start time. This ensures that peer discovery will
+ %% always select the other node as the one to join.
+ %%
+ %% We do this because this testcase does not really reflect a real world
+ %% situation. Indeed, both nodes have inconsistent peer discovery
+ %% configuration and the configuration is changed at runtime using internal
+ %% calls (which we don't support).
+ %%
+ %% Without this, if node 2 was started first, it would select itself and
+ %% boot as a standalone node, expecting node 1 to join it. But node 1 is
+ %% already running and is never restarted/reconfigured, so it never would.
+ rabbit_ct_broker_helpers:restart_node(Config, Hare),
+
ok = stop_app(Config, Hare),
ok = reset(Config, Hare),
ok = rpc:call(Hare, application, set_env,
diff --git a/deps/rabbit/test/peer_discovery_classic_config_SUITE.erl b/deps/rabbit/test/peer_discovery_classic_config_SUITE.erl
index 4341387a25a3..ce35d0250e8b 100644
--- a/deps/rabbit/test/peer_discovery_classic_config_SUITE.erl
+++ b/deps/rabbit/test/peer_discovery_classic_config_SUITE.erl
@@ -126,8 +126,6 @@ init_per_testcase(successful_discovery_with_a_subset_of_nodes_coming_online = Te
{rabbit, [
{cluster_nodes, {NodeNamesWithHostname, disc}},
{cluster_formation, [
- {discovery_retry_limit, 2},
- {discovery_retry_interval, 100},
{internal_lock_retries, 10}
]}
]}),
diff --git a/deps/rabbit/test/unit_cluster_formation_locking_mocks_SUITE.erl b/deps/rabbit/test/unit_cluster_formation_locking_mocks_SUITE.erl
index 8082607a364e..95cc3b2ba967 100644
--- a/deps/rabbit/test/unit_cluster_formation_locking_mocks_SUITE.erl
+++ b/deps/rabbit/test/unit_cluster_formation_locking_mocks_SUITE.erl
@@ -47,25 +47,37 @@ end_per_testcase(_, _) ->
init_with_lock_exits_after_errors(_Config) ->
meck:expect(rabbit_peer_discovery_classic_config, lock, fun(_) -> {error, "test error"} end),
- ?assertExit(cannot_acquire_startup_lock, rabbit_peer_discovery:maybe_create_cluster(2, 10, fun(_, _) -> ok end)),
+ ?assertEqual(
+ {error, "test error"},
+ rabbit_peer_discovery:join_selected_node(rabbit_peer_discovery_classic_config, missing@localhost, disc)),
?assert(meck:validate(rabbit_peer_discovery_classic_config)),
passed.
+%% The `aborted_feature_flags_compat_check' error means the function called
+%% `rabbit_db_cluster:join/2', so it passed the locking step. The error is
+%% expected because the test runs outside of a working RabbitMQ.
+
init_with_lock_ignore_after_errors(_Config) ->
meck:expect(rabbit_peer_discovery_classic_config, lock, fun(_) -> {error, "test error"} end),
- ?assertEqual(ok, rabbit_peer_discovery:maybe_create_cluster(2, 10, fun(_, _) -> ok end)),
+ ?assertEqual(
+ {error, {aborted_feature_flags_compat_check, {error, feature_flags_file_not_set}}},
+ rabbit_peer_discovery:join_selected_node(rabbit_peer_discovery_classic_config, missing@localhost, disc)),
?assert(meck:validate(rabbit_peer_discovery_classic_config)),
passed.
init_with_lock_not_supported(_Config) ->
meck:expect(rabbit_peer_discovery_classic_config, lock, fun(_) -> not_supported end),
- ?assertEqual(ok, rabbit_peer_discovery:maybe_create_cluster(2, 10, fun(_, _) -> ok end)),
+ ?assertEqual(
+ {error, {aborted_feature_flags_compat_check, {error, feature_flags_file_not_set}}},
+ rabbit_peer_discovery:join_selected_node(rabbit_peer_discovery_classic_config, missing@localhost, disc)),
?assert(meck:validate(rabbit_peer_discovery_classic_config)),
passed.
init_with_lock_supported(_Config) ->
meck:expect(rabbit_peer_discovery_classic_config, lock, fun(_) -> {ok, data} end),
meck:expect(rabbit_peer_discovery_classic_config, unlock, fun(data) -> ok end),
- ?assertEqual(ok, rabbit_peer_discovery:maybe_create_cluster(2, 10, fun(_, _) -> ok end)),
+ ?assertEqual(
+ {error, {aborted_feature_flags_compat_check, {error, feature_flags_file_not_set}}},
+ rabbit_peer_discovery:join_selected_node(rabbit_peer_discovery_classic_config, missing@localhost, disc)),
?assert(meck:validate(rabbit_peer_discovery_classic_config)),
passed.
diff --git a/deps/rabbit/test/unit_cluster_formation_sort_nodes_SUITE.erl b/deps/rabbit/test/unit_cluster_formation_sort_nodes_SUITE.erl
new file mode 100644
index 000000000000..85745fe5fe11
--- /dev/null
+++ b/deps/rabbit/test/unit_cluster_formation_sort_nodes_SUITE.erl
@@ -0,0 +1,214 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(unit_cluster_formation_sort_nodes_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-export([all/0,
+ groups/0,
+ init_per_suite/1, end_per_suite/1,
+ init_per_group/2, end_per_group/2,
+ init_per_testcase/2, end_per_testcase/2,
+
+ sort_single_node/1,
+ sort_by_cluster_size/1,
+ sort_by_start_time/1,
+ sort_by_node_name/1,
+ cluster_size_has_precedence_over_start_time/1,
+ start_time_has_precedence_over_node_name/1,
+ failed_in_ci_1/1,
+ failed_in_ci_2/1]).
+
+all() ->
+ [
+ {group, parallel_tests}
+ ].
+
+groups() ->
+ [
+ {parallel_tests, [parallel],
+ [
+ sort_single_node,
+ sort_by_cluster_size,
+ sort_by_start_time,
+ sort_by_node_name,
+ cluster_size_has_precedence_over_start_time,
+ start_time_has_precedence_over_node_name,
+ failed_in_ci_1,
+ failed_in_ci_2
+ ]}
+ ].
+
+init_per_suite(Config) ->
+ Config.
+
+end_per_suite(Config) ->
+ Config.
+
+init_per_group(_Group, Config) ->
+ Config.
+
+end_per_group(_Group, Config) ->
+ Config.
+
+init_per_testcase(_Testcase, Config) ->
+ Config.
+
+end_per_testcase(_Testcase, Config) ->
+ Config.
+
+sort_single_node(_Config) ->
+ NodesAndProps = [{a, [a], 100}],
+ ?assertEqual(
+ NodesAndProps,
+ rabbit_peer_discovery:sort_nodes_and_props(NodesAndProps)).
+
+sort_by_cluster_size(_Config) ->
+ NodesAndProps = [{a, [a], 100},
+ {a, [a, a], 100}],
+ ?assertEqual(
+ [{a, [a, a], 100},
+ {a, [a], 100}],
+ rabbit_peer_discovery:sort_nodes_and_props(NodesAndProps)).
+
+sort_by_start_time(_Config) ->
+ NodesAndProps = [{a, [a], 20},
+ {a, [a], 10}],
+ ?assertEqual(
+ [{a, [a], 10},
+ {a, [a], 20}],
+ rabbit_peer_discovery:sort_nodes_and_props(NodesAndProps)).
+
+sort_by_node_name(_Config) ->
+ NodesAndProps = [{b, [b], 100},
+ {a, [a], 100}],
+ ?assertEqual(
+ [{a, [a], 100},
+ {b, [b], 100}],
+ rabbit_peer_discovery:sort_nodes_and_props(NodesAndProps)).
+
+cluster_size_has_precedence_over_start_time(_Config) ->
+ NodesAndProps = [{a, [a], 100},
+ {b, [b, c], 90}],
+ ?assertEqual(
+ [{b, [b, c], 90},
+ {a, [a], 100}],
+ rabbit_peer_discovery:sort_nodes_and_props(NodesAndProps)).
+
+start_time_has_precedence_over_node_name(_Config) ->
+ NodesAndProps = [{a, [a], 100},
+ {b, [b], 90}],
+ ?assertEqual(
+ [{b, [b], 90},
+ {a, [a], 100}],
+ rabbit_peer_discovery:sort_nodes_and_props(NodesAndProps)).
+
+failed_in_ci_1(_Config) ->
+ NodesAndProps = [{'successful_discovery-cluster_size_7-7@localhost',
+ ['successful_discovery-cluster_size_7-7@localhost'],
+ 1699635835018},
+ {'successful_discovery-cluster_size_7-6@localhost',
+ ['successful_discovery-cluster_size_7-6@localhost'],
+ 1699635835006},
+ {'successful_discovery-cluster_size_7-5@localhost',
+ ['successful_discovery-cluster_size_7-5@localhost'],
+ 1699635835019},
+ {'successful_discovery-cluster_size_7-4@localhost',
+ ['successful_discovery-cluster_size_7-4@localhost'],
+ 1699635835007},
+ {'successful_discovery-cluster_size_7-3@localhost',
+ ['successful_discovery-cluster_size_7-3@localhost'],
+ 1699635835006},
+ {'successful_discovery-cluster_size_7-2@localhost',
+ ['successful_discovery-cluster_size_7-2@localhost'],
+ 1699635835013},
+ {'successful_discovery-cluster_size_7-1@localhost',
+ ['successful_discovery-cluster_size_7-1@localhost'],
+ 1699635835011}],
+ ?assertEqual(
+ [{'successful_discovery-cluster_size_7-3@localhost',
+ ['successful_discovery-cluster_size_7-3@localhost'],
+ 1699635835006},
+ {'successful_discovery-cluster_size_7-6@localhost',
+ ['successful_discovery-cluster_size_7-6@localhost'],
+ 1699635835006},
+ {'successful_discovery-cluster_size_7-4@localhost',
+ ['successful_discovery-cluster_size_7-4@localhost'],
+ 1699635835007},
+ {'successful_discovery-cluster_size_7-1@localhost',
+ ['successful_discovery-cluster_size_7-1@localhost'],
+ 1699635835011},
+ {'successful_discovery-cluster_size_7-2@localhost',
+ ['successful_discovery-cluster_size_7-2@localhost'],
+ 1699635835013},
+ {'successful_discovery-cluster_size_7-7@localhost',
+ ['successful_discovery-cluster_size_7-7@localhost'],
+ 1699635835018},
+ {'successful_discovery-cluster_size_7-5@localhost',
+ ['successful_discovery-cluster_size_7-5@localhost'],
+ 1699635835019}],
+ rabbit_peer_discovery:sort_nodes_and_props(NodesAndProps)).
+
+failed_in_ci_2(_Config) ->
+ NodesAndProps = [{'successful_discovery-cluster_size_7-7@localhost',
+ ['successful_discovery-cluster_size_7-1@localhost',
+ 'successful_discovery-cluster_size_7-7@localhost',
+ 'successful_discovery-cluster_size_7-2@localhost'],
+ 1699635835018},
+ {'successful_discovery-cluster_size_7-6@localhost',
+ ['successful_discovery-cluster_size_7-6@localhost'],
+ 1699635835006},
+ {'successful_discovery-cluster_size_7-5@localhost',
+ ['successful_discovery-cluster_size_7-5@localhost'],
+ 1699635835019},
+ {'successful_discovery-cluster_size_7-4@localhost',
+ ['successful_discovery-cluster_size_7-4@localhost'],
+ 1699635835007},
+ {'successful_discovery-cluster_size_7-3@localhost',
+ ['successful_discovery-cluster_size_7-3@localhost'],
+ 1699635835006},
+ {'successful_discovery-cluster_size_7-2@localhost',
+ ['successful_discovery-cluster_size_7-1@localhost',
+ 'successful_discovery-cluster_size_7-7@localhost',
+ 'successful_discovery-cluster_size_7-2@localhost'],
+ 1699635835013},
+ {'successful_discovery-cluster_size_7-1@localhost',
+ ['successful_discovery-cluster_size_7-1@localhost',
+ 'successful_discovery-cluster_size_7-7@localhost',
+ 'successful_discovery-cluster_size_7-2@localhost'],
+ 1699635835011}],
+ ?assertEqual(
+ [{'successful_discovery-cluster_size_7-1@localhost',
+ ['successful_discovery-cluster_size_7-1@localhost',
+ 'successful_discovery-cluster_size_7-7@localhost',
+ 'successful_discovery-cluster_size_7-2@localhost'],
+ 1699635835011},
+ {'successful_discovery-cluster_size_7-2@localhost',
+ ['successful_discovery-cluster_size_7-1@localhost',
+ 'successful_discovery-cluster_size_7-7@localhost',
+ 'successful_discovery-cluster_size_7-2@localhost'],
+ 1699635835013},
+ {'successful_discovery-cluster_size_7-7@localhost',
+ ['successful_discovery-cluster_size_7-1@localhost',
+ 'successful_discovery-cluster_size_7-7@localhost',
+ 'successful_discovery-cluster_size_7-2@localhost'],
+ 1699635835018},
+ {'successful_discovery-cluster_size_7-3@localhost',
+ ['successful_discovery-cluster_size_7-3@localhost'],
+ 1699635835006},
+ {'successful_discovery-cluster_size_7-6@localhost',
+ ['successful_discovery-cluster_size_7-6@localhost'],
+ 1699635835006},
+ {'successful_discovery-cluster_size_7-4@localhost',
+ ['successful_discovery-cluster_size_7-4@localhost'],
+ 1699635835007},
+ {'successful_discovery-cluster_size_7-5@localhost',
+ ['successful_discovery-cluster_size_7-5@localhost'],
+ 1699635835019}],
+ rabbit_peer_discovery:sort_nodes_and_props(NodesAndProps)).
diff --git a/deps/rabbitmq_cli/test/diagnostics/discover_peers_command_test.exs b/deps/rabbitmq_cli/test/diagnostics/discover_peers_command_test.exs
index 339fab55d83e..791d07dc484f 100644
--- a/deps/rabbitmq_cli/test/diagnostics/discover_peers_command_test.exs
+++ b/deps/rabbitmq_cli/test/diagnostics/discover_peers_command_test.exs
@@ -37,6 +37,7 @@ defmodule DiscoverPeersCommandTest do
@tag test_timeout: 15000
test "run: returns a list of nodes when the backend isn't configured", context do
- assert match?({:ok, {[], _}}, @command.run([], context[:opts]))
+ # An unpinned variable in a pattern matches anything: expect exactly one node.
+ assert match?({:ok, {[_only_node], _}}, @command.run([], context[:opts]))
end
end