From 0876746d5f94ad914e560d5ad0162741d88b3c29 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 18 May 2021 01:01:08 +0200 Subject: [PATCH] Remove randomized startup delays On initial cluster formation, only one node in a multi node cluster should initialize the Mnesia database schema (i.e. form the cluster). To ensure that for nodes starting up in parallel, RabbitMQ peer discovery backends have used either locks or randomized startup delays. Locks work great: When a node holds the lock, it either starts a new blank node (if there is no other node in the cluster), or it joins an existing node. This makes it impossible to have two nodes forming the cluster at the same time. Consul and etcd peer discovery backends use locks. The lock is acquired in the consul and etcd infrastructure, respectively. For other peer discovery backends (classic, DNS, AWS), randomized startup delays were used. They work good enough in most cases. However, in https://github.com/rabbitmq/cluster-operator/issues/662 we observed that in 1% - 10% of the cases (the more nodes or the smaller the randomized startup delay range, the higher the chances), two nodes decide to form the cluster. That's bad since it will end up in a single Erlang cluster, but in two RabbitMQ clusters. Even worse, no obvious alert got triggered or error message logged. To solve this issue, one could increase the randomized startup delay range from e.g. 0m - 1m to 0m - 3m. However, this makes initial cluster formation very slow since it will take up to 3 minutes until every node is ready. In rare cases, we still end up with two nodes forming the cluster. Another way to solve the problem is to name a dedicated node to be the seed node (forming the cluster). This was explored in https://github.com/rabbitmq/cluster-operator/pull/689 and works well. Two minor downsides to this approach are: 1. If the seed node never becomes available, the whole cluster won't be formed (which is okay), and 2. it doesn't integrate with existing dynamic peer discovery backends (e.g. K8s, AWS) since nodes are not yet known at deploy time. In this commit, we take a better approach: We remove randomized startup delays altogether. We replace them with locks. However, instead of implementing our own lock implementation in an external system (e.g. in K8s), we re-use Erlang's locking mechanism global:set_lock/3. global:set_lock/3 has some convenient properties: 1. It accepts a list of nodes to set the lock on. 2. The nodes in that list connect to each other (i.e. create an Erlang cluster). 3. The method is synchronous with a timeout (number of retries). It blocks until the lock becomes available. 4. If a process that holds a lock dies, or the node goes down, the lock held by the process is deleted. The list of nodes passed to global:set_lock/3 corresponds to the nodes the peer discovery backend discovers (lists). Two special cases worth mentioning: 1. That list can be all desired nodes in the cluster (e.g. in classic peer discovery where nodes are known at deploy time) while only a subset of nodes is available. In that case, global:set_lock/3 still sets the lock not blocking until all nodes can be connected to. This is good since nodes might start sequentially (non-parallel). 2. In dynamic peer discovery backends (e.g. K8s, AWS), this list can be just a subset of desired nodes since nodes might not startup in parallel. That's also not a problem as long as the following requirement is met: "The peer disovery backend does not list two disjoint sets of nodes (on different nodes) at the same time." For example, in a 2-node cluster, the peer discovery backend must not list only node 1 on node 1 and only node 2 on node 2. Existing peer discovery backends fullfil that requirement because the resource the nodes are discovered from is global. For example, in K8s, once node 1 is part of the Endpoints object, it will be returned on both node 1 and node 2. Likewise, in AWS, once node 1 started, the described list of instances with a specific tag will include node 1 when the AWS peer discovery backend runs on node 1 or node 2. Removing randomized startup delays also makes cluster formation considerably faster (up to 1 minute faster if that was the upper bound in the range). --- deps/rabbit/priv/schema/rabbit.schema | 36 +++--- deps/rabbit/src/rabbit_mnesia.erl | 15 +-- deps/rabbit/src/rabbit_nodes.erl | 20 +++ deps/rabbit/src/rabbit_peer_discovery.erl | 61 +-------- .../rabbit_peer_discovery_classic_config.erl | 56 ++++----- .../config_schema_SUITE_data/rabbit.snippets | 17 ++- .../peer_discovery_classic_config_SUITE.erl | 60 +++------ deps/rabbitmq_peer_discovery_aws/BUILD.bazel | 3 + deps/rabbitmq_peer_discovery_aws/Makefile | 2 +- .../src/rabbit_peer_discovery_aws.erl | 49 ++++++-- .../src/rabbitmq_peer_discovery_aws.erl | 4 +- .../test/aws_ecs_util.erl | 2 +- .../test/integration_SUITE.erl | 2 +- .../test/unit_SUITE.erl | 43 ++++++- .../rabbitmq_peer_discovery_consul_SUITE.erl | 6 - ...rabbitmq_peer_discovery_etcd_v3_client.erl | 2 +- .../src/rabbit_peer_discovery_k8s.erl | 118 ++++++++++-------- .../src/rabbitmq_peer_discovery_k8s.erl | 13 +- .../rabbitmq_peer_discovery_k8s_SUITE.erl | 56 +++++++-- 19 files changed, 305 insertions(+), 260 deletions(-) diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 1a0168f7b64b..c2d2e508ca85 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -945,11 +945,11 @@ fun(Conf) -> end}. %% Cluster formation: Randomized startup delay +%% +%% DEPRECATED: This is a no-op. Old configs are still allowed, but a warning will be printed. -{mapping, "cluster_formation.randomized_startup_delay_range.min", "rabbit.cluster_formation.randomized_startup_delay_range", - [{datatype, integer}]}. -{mapping, "cluster_formation.randomized_startup_delay_range.max", "rabbit.cluster_formation.randomized_startup_delay_range", - [{datatype, integer}]}. +{mapping, "cluster_formation.randomized_startup_delay_range.min", "rabbit.cluster_formation.randomized_startup_delay_range", []}. +{mapping, "cluster_formation.randomized_startup_delay_range.max", "rabbit.cluster_formation.randomized_startup_delay_range", []}. {translation, "rabbit.cluster_formation.randomized_startup_delay_range", fun(Conf) -> @@ -957,19 +957,25 @@ fun(Conf) -> Max = cuttlefish:conf_get("cluster_formation.randomized_startup_delay_range.max", Conf, undefined), case {Min, Max} of - {undefined, undefined} -> - cuttlefish:unset(); - {undefined, Max} -> - %% fallback default - {5, Max}; - {Min, undefined} -> - %% fallback default - {Min, 60}; - {Min, Max} -> - {Min, Max} - end + {undefined, undefined} -> + ok; + _ -> + cuttlefish:warn("cluster_formation.randomized_startup_delay_range.min and " + "cluster_formation.randomized_startup_delay_range.max are deprecated") + end, + cuttlefish:unset() end}. +%% Cluster formation: lock acquisition retries as passed to https://erlang.org/doc/man/global.html#set_lock-3 +%% +%% Currently used in classic, k8s, and aws peer discovery backends. + +{mapping, "cluster_formation.internal_lock_retries", "rabbit.cluster_formation.internal_lock_retries", + [ + {datatype, integer}, + {validators, ["non_zero_positive_integer"]} + ]}. + %% Cluster formation: discovery failure retries {mapping, "cluster_formation.lock_retry_limit", "rabbit.cluster_formation.lock_retry_limit", diff --git a/deps/rabbit/src/rabbit_mnesia.erl b/deps/rabbit/src/rabbit_mnesia.erl index c19a52288e56..216055103fde 100644 --- a/deps/rabbit/src/rabbit_mnesia.erl +++ b/deps/rabbit/src/rabbit_mnesia.erl @@ -105,21 +105,18 @@ init_with_lock(Retries, Timeout, RunPeerDiscovery) -> rabbit_log:debug("rabbit_peer_discovery:lock returned ~p", [LockResult]), case LockResult of not_supported -> - rabbit_log:info("Peer discovery backend does not support locking, falling back to randomized delay"), - %% See rabbitmq/rabbitmq-server#1202 for details. - rabbit_peer_discovery:maybe_inject_randomized_delay(), RunPeerDiscovery(), rabbit_peer_discovery:maybe_register(); - {error, _Reason} -> - timer:sleep(Timeout), - init_with_lock(Retries - 1, Timeout, RunPeerDiscovery); {ok, Data} -> try RunPeerDiscovery(), rabbit_peer_discovery:maybe_register() after rabbit_peer_discovery:unlock(Data) - end + end; + {error, _Reason} -> + timer:sleep(Timeout), + init_with_lock(Retries - 1, Timeout, RunPeerDiscovery) end. -spec run_peer_discovery() -> ok | {[node()], node_type()}. @@ -178,7 +175,7 @@ join_discovered_peers(TryNodes, NodeType) -> join_discovered_peers_with_retries(TryNodes, NodeType, RetriesLeft, DelayInterval). join_discovered_peers_with_retries(TryNodes, _NodeType, 0, _DelayInterval) -> - rabbit_log:warning( + rabbit_log:info( "Could not successfully contact any node of: ~s (as in Erlang distribution). " "Starting as a blank standalone node...", [string:join(lists:map(fun atom_to_list/1, TryNodes), ",")]), @@ -193,7 +190,7 @@ join_discovered_peers_with_retries(TryNodes, NodeType, RetriesLeft, DelayInterva rabbit_node_monitor:notify_joined_cluster(); none -> RetriesLeft1 = RetriesLeft - 1, - rabbit_log:error("Trying to join discovered peers failed. Will retry after a delay of ~b ms, ~b retries left...", + rabbit_log:info("Trying to join discovered peers failed. Will retry after a delay of ~b ms, ~b retries left...", [DelayInterval, RetriesLeft1]), timer:sleep(DelayInterval), join_discovered_peers_with_retries(TryNodes, NodeType, RetriesLeft1, DelayInterval) diff --git a/deps/rabbit/src/rabbit_nodes.erl b/deps/rabbit/src/rabbit_nodes.erl index 39ba1e6e5760..ff0e28a2a136 100644 --- a/deps/rabbit/src/rabbit_nodes.erl +++ b/deps/rabbit/src/rabbit_nodes.erl @@ -15,6 +15,7 @@ boot/0]). -export([persistent_cluster_id/0, seed_internal_cluster_id/0, seed_user_provided_cluster_name/0]). -export([all_running_with_hashes/0]). +-export([lock_id/1, lock_retries/0]). -include_lib("kernel/include/inet.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). @@ -23,6 +24,12 @@ -define(INTERNAL_CLUSTER_ID_PARAM_NAME, internal_cluster_id). +% Retries as passed to https://erlang.org/doc/man/global.html#set_lock-3 +% To understand how retries map to the timeout, read +% https://github.com/erlang/otp/blob/d256ae477014158a49bb860b283df9c040011197/lib/kernel/src/global.erl#L2062-L2075 +% 80 corresponds to a timeout of ca 300 seconds. +-define(DEFAULT_LOCK_RETRIES, 80). + %%---------------------------------------------------------------------------- %% API %%---------------------------------------------------------------------------- @@ -160,3 +167,16 @@ await_running_count_with_retries(TargetCount, Retries) -> -spec all_running_with_hashes() -> #{non_neg_integer() => node()}. all_running_with_hashes() -> maps:from_list([{erlang:phash2(Node), Node} || Node <- all_running()]). + +-spec lock_id(Node :: node()) -> {ResourceId :: string(), LockRequesterId :: node()}. +lock_id(Node) -> + {cookie_hash(), Node}. + +-spec lock_retries() -> integer(). +lock_retries() -> + case application:get_env(rabbit, cluster_formation) of + {ok, PropList} -> + proplists:get_value(internal_lock_retries, PropList, ?DEFAULT_LOCK_RETRIES); + undefined -> + ?DEFAULT_LOCK_RETRIES + end. diff --git a/deps/rabbit/src/rabbit_peer_discovery.erl b/deps/rabbit/src/rabbit_peer_discovery.erl index 3a815919420e..f386f36210bf 100644 --- a/deps/rabbit/src/rabbit_peer_discovery.erl +++ b/deps/rabbit/src/rabbit_peer_discovery.erl @@ -14,8 +14,7 @@ -export([maybe_init/0, discover_cluster_nodes/0, backend/0, node_type/0, normalize/1, format_discovered_nodes/1, log_configured_backend/0, register/0, unregister/0, maybe_register/0, maybe_unregister/0, - maybe_inject_randomized_delay/0, lock/0, unlock/1, - discovery_retries/0]). + lock/0, unlock/1, discovery_retries/0]). -export([append_node_prefix/1, node_prefix/0, locking_retry_timeout/0, lock_acquisition_failure_mode/0]). @@ -28,9 +27,6 @@ %% default node prefix to attach to discovered hostnames -define(DEFAULT_PREFIX, "rabbit"). -%% default randomized delay range, in seconds --define(DEFAULT_STARTUP_RANDOMIZED_DELAY, {5, 60}). - %% default discovery retries and interval. -define(DEFAULT_DISCOVERY_RETRY_COUNT, 10). -define(DEFAULT_DISCOVERY_RETRY_INTERVAL_MS, 500). @@ -159,61 +155,6 @@ discovery_retries() -> {?DEFAULT_DISCOVERY_RETRY_COUNT, ?DEFAULT_DISCOVERY_RETRY_INTERVAL_MS} end. - --spec maybe_inject_randomized_delay() -> ok. -maybe_inject_randomized_delay() -> - Backend = backend(), - case Backend:supports_registration() of - true -> - rabbit_log:info("Peer discovery backend ~s supports registration.", [Backend]), - inject_randomized_delay(); - false -> - rabbit_log:info("Peer discovery backend ~s does not support registration, skipping randomized startup delay.", [Backend]), - ok - end. - --spec inject_randomized_delay() -> ok. - -inject_randomized_delay() -> - {Min, Max} = randomized_delay_range_in_ms(), - case {Min, Max} of - %% When the max value is set to 0, consider the delay to be disabled. - %% In addition, `rand:uniform/1` will fail with a "no function clause" - %% when the argument is 0. - {_, 0} -> - rabbit_log:info("Randomized delay range's upper bound is set to 0. Considering it disabled."), - ok; - {_, N} when is_number(N) -> - rand:seed(exsplus), - RandomVal = rand:uniform(round(N)), - rabbit_log:debug("Randomized startup delay: configured range is from ~p to ~p milliseconds, PRNG pick: ~p...", - [Min, Max, RandomVal]), - Effective = case RandomVal < Min of - true -> Min; - false -> RandomVal - end, - rabbit_log:info("Will wait for ~p milliseconds before proceeding with registration...", [Effective]), - timer:sleep(Effective), - ok - end. - --spec randomized_delay_range_in_ms() -> {integer(), integer()}. - -randomized_delay_range_in_ms() -> - Backend = backend(), - Default = case erlang:function_exported(Backend, randomized_startup_delay_range, 0) of - true -> Backend:randomized_startup_delay_range(); - false -> ?DEFAULT_STARTUP_RANDOMIZED_DELAY - end, - {Min, Max} = case application:get_env(rabbit, cluster_formation) of - {ok, Proplist} -> - proplists:get_value(randomized_startup_delay_range, Proplist, Default); - undefined -> - Default - end, - {Min * 1000, Max * 1000}. - - -spec register() -> ok. register() -> diff --git a/deps/rabbit/src/rabbit_peer_discovery_classic_config.erl b/deps/rabbit/src/rabbit_peer_discovery_classic_config.erl index 0ef976f60993..cc62dd4dd5e1 100644 --- a/deps/rabbit/src/rabbit_peer_discovery_classic_config.erl +++ b/deps/rabbit/src/rabbit_peer_discovery_classic_config.erl @@ -26,12 +26,36 @@ list_nodes() -> Nodes when is_list(Nodes) -> {ok, {Nodes, disc}} end. +-spec lock(Node :: node()) -> {ok, {ResourceId :: string(), LockRequesterId :: node()}} | {error, Reason :: string()}. + +lock(Node) -> + {ok, {Nodes, _NodeType}} = list_nodes(), + case lists:member(Node, Nodes) of + false when Nodes =/= [] -> + rabbit_log:warning("Local node ~s is not part of configured nodes ~p. " + "This might lead to incorrect cluster formation.", [Node, Nodes]); + _ -> ok + end, + LockId = rabbit_nodes:lock_id(Node), + Retries = rabbit_nodes:lock_retries(), + case global:set_lock(LockId, Nodes, Retries) of + true -> + {ok, LockId}; + false -> + {error, io_lib:format("Acquiring lock taking too long, bailing out after ~b retries", [Retries])} + end. + +-spec unlock({ResourceId :: string(), LockRequesterId :: node()}) -> ok. + +unlock(LockId) -> + {ok, {Nodes, _NodeType}} = list_nodes(), + global:del_lock(LockId, Nodes), + ok. + -spec supports_registration() -> boolean(). supports_registration() -> - %% If we don't have any nodes configured, skip randomized delay and similar operations - %% as we don't want to delay startup for no reason. MK. - has_any_peer_nodes_configured(). + false. -spec register() -> ok. @@ -47,29 +71,3 @@ unregister() -> post_registration() -> ok. - --spec lock(Node :: atom()) -> not_supported. - -lock(_Node) -> - not_supported. - --spec unlock(Data :: term()) -> ok. - -unlock(_Data) -> - ok. - -%% -%% Helpers -%% - -has_any_peer_nodes_configured() -> - case application:get_env(rabbit, cluster_nodes, []) of - {[], _NodeType} -> - false; - {Nodes, _NodeType} when is_list(Nodes) -> - true; - [] -> - false; - Nodes when is_list(Nodes) -> - true - end. diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index 405a2b788331..f54212c2e547 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -540,23 +540,22 @@ tcp_listen_options.exit_on_close = false", {cluster_formation_randomized_startup_delay_both_values, "cluster_formation.randomized_startup_delay_range.min = 10 cluster_formation.randomized_startup_delay_range.max = 30", - [{rabbit, [{cluster_formation, [ - {randomized_startup_delay_range, {10, 30}} - ]}]}], + [], []}, {cluster_formation_randomized_startup_delay_min_only, "cluster_formation.randomized_startup_delay_range.min = 10", - [{rabbit, [{cluster_formation, [ - {randomized_startup_delay_range, {10, 60}} - ]}]}], + [], []}, {cluster_formation_randomized_startup_delay_max_only, "cluster_formation.randomized_startup_delay_range.max = 30", - [{rabbit, [{cluster_formation, [ - {randomized_startup_delay_range, {5, 30}} - ]}]}], + [], + []}, + + {cluster_formation_internal_lock_retries, + "cluster_formation.internal_lock_retries = 10", + [{rabbit,[{cluster_formation,[{internal_lock_retries,10}]}]}], []}, {cluster_formation_dns, diff --git a/deps/rabbit/test/peer_discovery_classic_config_SUITE.erl b/deps/rabbit/test/peer_discovery_classic_config_SUITE.erl index 9585c466cf25..7ec621088c8f 100644 --- a/deps/rabbit/test/peer_discovery_classic_config_SUITE.erl +++ b/deps/rabbit/test/peer_discovery_classic_config_SUITE.erl @@ -37,7 +37,7 @@ suite() -> {timetrap, {minutes, 5}} ]. --define(TIMEOUT, 60000). +-define(TIMEOUT, 120_000). %% %% Setup/teardown. @@ -50,12 +50,6 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). -init_per_group(_, Config) -> - Config. - -end_per_group(_, Config) -> - Config. - init_per_testcase(successful_discovery = Testcase, Config) -> Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), @@ -75,7 +69,7 @@ init_per_testcase(successful_discovery = Testcase, Config) -> {rabbit, [ {cluster_nodes, {NodeNamesWithHostname, disc}}, {cluster_formation, [ - {randomized_startup_delay_range, {1, 20}} + {internal_lock_retries, 10} ]} ]}), rabbit_ct_helpers:run_steps(Config3, @@ -106,7 +100,7 @@ init_per_testcase(successful_discovery_with_a_subset_of_nodes_coming_online = Te ]}, {cluster_nodes, {NodeNamesWithHostname, disc}}, {cluster_formation, [ - {randomized_startup_delay_range, {1, 20}} + {internal_lock_retries, 10} ]} ]}), rabbit_ct_helpers:run_steps(Config3, @@ -123,7 +117,7 @@ init_per_testcase(no_nodes_configured = Testcase, Config) -> {rabbit, [ {cluster_nodes, {[], disc}}, {cluster_formation, [ - {randomized_startup_delay_range, {1, 20}} + {internal_lock_retries, 10} ]} ]}), rabbit_ct_helpers:run_steps(Config3, @@ -147,41 +141,21 @@ end_per_testcase(Testcase, Config) -> %% Test cases %% successful_discovery(Config) -> - with_retry(Config, [1, 2], fun () -> - ?awaitMatch( - {M1, M2} when length(M1) =:= 3; length(M2) =:= 3, - {cluster_members_online(Config, 0), - cluster_members_online(Config, 1)}, - ?TIMEOUT) - end). + ?awaitMatch( + {M1, M2} when length(M1) =:= 3; length(M2) =:= 3, + {cluster_members_online(Config, 0), + cluster_members_online(Config, 1)}, + ?TIMEOUT). successful_discovery_with_a_subset_of_nodes_coming_online(Config) -> - with_retry(Config, [1], fun () -> - ?awaitMatch( - {M1, M2} when length(M1) =:= 2; length(M2) =:= 2, - {cluster_members_online(Config, 0), - cluster_members_online(Config, 1)}, - ?TIMEOUT) - end). + ?awaitMatch( + {M1, M2} when length(M1) =:= 2; length(M2) =:= 2, + {cluster_members_online(Config, 0), + cluster_members_online(Config, 1)}, + ?TIMEOUT). no_nodes_configured(Config) -> - with_retry(Config, [1], fun () -> - ?awaitMatch( - M when length(M) < 2, + ?awaitMatch( + M when length(M) < 2, cluster_members_online(Config, 0), - ?TIMEOUT) - end). - -reset_and_restart_node(Config, I) when is_integer(I) andalso I >= 0 -> - Name = rabbit_ct_broker_helpers:get_node_config(Config, I, nodename), - rabbit_control_helper:command(stop_app, Name). - -with_retry(Config, Nodes, Fun) -> - try - Fun() - catch - error:{awaitMatch, _} -> - ct:pal(?LOW_IMPORTANCE, "Possible dead-lock; resetting/restarting these nodes: ~p", [Nodes]), - [reset_and_restart_node(Config, N) || N <- Nodes], - Fun() - end. + ?TIMEOUT). diff --git a/deps/rabbitmq_peer_discovery_aws/BUILD.bazel b/deps/rabbitmq_peer_discovery_aws/BUILD.bazel index 432381107aa3..a0b11a695336 100644 --- a/deps/rabbitmq_peer_discovery_aws/BUILD.bazel +++ b/deps/rabbitmq_peer_discovery_aws/BUILD.bazel @@ -82,4 +82,7 @@ rabbitmq_suite( rabbitmq_suite( name = "unit_SUITE", size = "small", + runtime_deps = [ + "@meck//:bazel_erlang_lib", + ], ) diff --git a/deps/rabbitmq_peer_discovery_aws/Makefile b/deps/rabbitmq_peer_discovery_aws/Makefile index 5f617f28eae8..6ec7bdddba33 100644 --- a/deps/rabbitmq_peer_discovery_aws/Makefile +++ b/deps/rabbitmq_peer_discovery_aws/Makefile @@ -3,7 +3,7 @@ PROJECT_DESCRIPTION = AWS-based RabbitMQ peer discovery backend LOCAL_DEPS = inets DEPS = rabbit_common rabbitmq_peer_discovery_common rabbitmq_aws rabbit -TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers ct_helper +TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers ct_helper meck dep_ct_helper = git https://github.com/extend/ct_helper.git master DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk diff --git a/deps/rabbitmq_peer_discovery_aws/src/rabbit_peer_discovery_aws.erl b/deps/rabbitmq_peer_discovery_aws/src/rabbit_peer_discovery_aws.erl index 15746ab958a8..7c355d4094c7 100644 --- a/deps/rabbitmq_peer_discovery_aws/src/rabbit_peer_discovery_aws.erl +++ b/deps/rabbitmq_peer_discovery_aws/src/rabbit_peer_discovery_aws.erl @@ -99,9 +99,7 @@ list_nodes() -> -spec supports_registration() -> boolean(). supports_registration() -> - %% see rabbitmq-peer-discovery-aws#17 - true. - + false. -spec register() -> ok. register() -> @@ -116,15 +114,44 @@ unregister() -> post_registration() -> ok. --spec lock(Node :: atom()) -> not_supported. - -lock(_Node) -> - not_supported. - --spec unlock(Data :: term()) -> ok. +-spec lock(Node :: node()) -> {ok, {ResourceId :: string(), LockRequesterId :: node()}} | {error, Reason :: string()}. -unlock(_Data) -> - ok. +lock(Node) -> + %% call list_nodes/0 externally such that meck can mock the function + case ?MODULE:list_nodes() of + {ok, {[], disc}} -> + {error, "Cannot lock since no nodes got discovered."}; + {ok, {Nodes, disc}} -> + case lists:member(Node, Nodes) of + true -> + rabbit_log:info("Will try to lock connecting to nodes ~p", [Nodes]), + LockId = rabbit_nodes:lock_id(Node), + Retries = rabbit_nodes:lock_retries(), + case global:set_lock(LockId, Nodes, Retries) of + true -> + {ok, LockId}; + false -> + {error, io_lib:format("Acquiring lock taking too long, bailing out after ~b retries", [Retries])} + end; + false -> + %% Don't try to acquire the global lock when our own node is not discoverable by peers. + %% We shouldn't run into this branch because our node is running and should have been discovered. + {error, lists:flatten(io_lib:format("Local node ~s is not part of discovered nodes ~p", [Node, Nodes]))} + end; + {error, _} = Error -> + Error + end. + +-spec unlock({ResourceId :: string(), LockRequesterId :: node()}) -> ok | {error, Reason :: string()}. + +unlock(LockId) -> + case ?MODULE:list_nodes() of + {ok, {Nodes, disc}} -> + global:del_lock(LockId, Nodes), + ok; + {error, _} = Error -> + Error + end. %% %% Implementation diff --git a/deps/rabbitmq_peer_discovery_aws/src/rabbitmq_peer_discovery_aws.erl b/deps/rabbitmq_peer_discovery_aws/src/rabbitmq_peer_discovery_aws.erl index 55e83a566f3f..05bae26c202a 100644 --- a/deps/rabbitmq_peer_discovery_aws/src/rabbitmq_peer_discovery_aws.erl +++ b/deps/rabbitmq_peer_discovery_aws/src/rabbitmq_peer_discovery_aws.erl @@ -45,10 +45,10 @@ unregister() -> post_registration() -> ?DELEGATE:post_registration(). --spec lock(Node :: atom()) -> not_supported. +-spec lock(Node :: node()) -> {ok, {ResourceId :: string(), LockRequesterId :: node()}} | {error, Reason :: string()}. lock(Node) -> ?DELEGATE:lock(Node). --spec unlock(Data :: term()) -> ok. +-spec unlock({ResourceId :: string(), LockRequesterId :: node()}) -> ok | {error, Reason :: string()}. unlock(Data) -> ?DELEGATE:unlock(Data). diff --git a/deps/rabbitmq_peer_discovery_aws/test/aws_ecs_util.erl b/deps/rabbitmq_peer_discovery_aws/test/aws_ecs_util.erl index ed71a6c6a964..02f80aa48776 100644 --- a/deps/rabbitmq_peer_discovery_aws/test/aws_ecs_util.erl +++ b/deps/rabbitmq_peer_discovery_aws/test/aws_ecs_util.erl @@ -23,7 +23,7 @@ public_dns_names/1, fetch_nodes_endpoint/2]). --define(ECS_CLUSTER_TIMEOUT, 120000). +-define(ECS_CLUSTER_TIMEOUT, 120_000). %% NOTE: %% These helpers assume certain permissions associated with the aws credentials diff --git a/deps/rabbitmq_peer_discovery_aws/test/integration_SUITE.erl b/deps/rabbitmq_peer_discovery_aws/test/integration_SUITE.erl index 97130e9c20bd..8b969908e853 100644 --- a/deps/rabbitmq_peer_discovery_aws/test/integration_SUITE.erl +++ b/deps/rabbitmq_peer_discovery_aws/test/integration_SUITE.erl @@ -12,7 +12,7 @@ -include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -define(CLUSTER_SIZE, 3). --define(TIMEOUT_MILLIS, 180000). +-define(TIMEOUT_MILLIS, 180_000). -export([all/0, suite/0, diff --git a/deps/rabbitmq_peer_discovery_aws/test/unit_SUITE.erl b/deps/rabbitmq_peer_discovery_aws/test/unit_SUITE.erl index 791f7550081a..61e8ed2561af 100644 --- a/deps/rabbitmq_peer_discovery_aws/test/unit_SUITE.erl +++ b/deps/rabbitmq_peer_discovery_aws/test/unit_SUITE.erl @@ -14,7 +14,8 @@ all() -> [ - {group, unit} + {group, unit}, + {group, lock} ]. groups() -> @@ -23,7 +24,14 @@ groups() -> maybe_add_tag_filters, get_hostname_name_from_reservation_set, registration_support - ]}]. + ]}, + {lock, [], [ + lock_single_node, + lock_multiple_nodes, + lock_local_node_not_discovered, + lock_list_nodes_fails + ]} + ]. %%% %%% Testcases @@ -63,7 +71,36 @@ get_hostname_name_from_reservation_set(_Config) -> }. registration_support(_Config) -> - ?assertEqual(rabbit_peer_discovery_aws:supports_registration(), true). + ?assertEqual(false, rabbit_peer_discovery_aws:supports_registration()). + +lock_single_node(_Config) -> + LocalNode = node(), + meck:expect(rabbit_peer_discovery_aws, list_nodes, 0, {ok, {[LocalNode], disc}}), + + {ok, LockId} = rabbit_peer_discovery_aws:lock(LocalNode), + ?assertEqual(ok, rabbit_peer_discovery_aws:unlock(LockId)). + +lock_multiple_nodes(_Config) -> + application:set_env(rabbit, cluster_formation, [{internal_lock_retries, 2}]), + LocalNode = node(), + OtherNode = other@host, + meck:expect(rabbit_peer_discovery_aws, list_nodes, 0, {ok, {[OtherNode, LocalNode], disc}}), + + {ok, {LockResourceId, OtherNode}} = rabbit_peer_discovery_aws:lock(OtherNode), + ?assertEqual({error, "Acquiring lock taking too long, bailing out after 2 retries"}, + rabbit_peer_discovery_aws:lock(LocalNode)), + ?assertEqual(ok, rabbitmq_peer_discovery_aws:unlock({LockResourceId, OtherNode})), + ?assertEqual({ok, {LockResourceId, LocalNode}}, rabbit_peer_discovery_aws:lock(LocalNode)), + ?assertEqual(ok, rabbitmq_peer_discovery_aws:unlock({LockResourceId, LocalNode})). + +lock_local_node_not_discovered(_Config) -> + meck:expect(rabbit_peer_discovery_aws, list_nodes, 0, {ok, {[n1@host, n2@host], disc}} ), + Expectation = {error, "Local node me@host is not part of discovered nodes [n1@host,n2@host]"}, + ?assertEqual(Expectation, rabbit_peer_discovery_aws:lock(me@host)). + +lock_list_nodes_fails(_Config) -> + meck:expect(rabbit_peer_discovery_aws, list_nodes, 0, {error, "failed for some reason"}), + ?assertEqual({error, "failed for some reason"}, rabbit_peer_discovery_aws:lock(me@host)). %%% %%% Implementation diff --git a/deps/rabbitmq_peer_discovery_consul/test/rabbitmq_peer_discovery_consul_SUITE.erl b/deps/rabbitmq_peer_discovery_consul/test/rabbitmq_peer_discovery_consul_SUITE.erl index b422445c0923..acbc7690c37f 100644 --- a/deps/rabbitmq_peer_discovery_consul/test/rabbitmq_peer_discovery_consul_SUITE.erl +++ b/deps/rabbitmq_peer_discovery_consul/test/rabbitmq_peer_discovery_consul_SUITE.erl @@ -325,7 +325,6 @@ list_nodes_return_value_basic_test(_Config) -> application:set_env(rabbit, cluster_formation, [ {peer_discovery_backend, rabbit_peer_discovery_consul}, - {randomized_startup_delay_range, {0, 1}}, {peer_discovery_consul, [ {consul_host, "localhost"}, {consul_port, 8500} @@ -344,7 +343,6 @@ list_nodes_return_value_basic_long_node_name_test(_Config) -> application:set_env(rabbit, cluster_formation, [ {peer_discovery_backend, rabbit_peer_discovery_consul}, - {randomized_startup_delay_range, {0, 1}}, {peer_discovery_consul, [ {consul_host, "localhost"}, {consul_port, 8500}, @@ -364,7 +362,6 @@ list_nodes_return_value_long_node_name_and_custom_domain_test(_Config) -> application:set_env(rabbit, cluster_formation, [ {peer_discovery_backend, rabbit_peer_discovery_consul}, - {randomized_startup_delay_range, {0, 1}}, {peer_discovery_consul, [ {consul_host, "localhost"}, {consul_port, 8500}, @@ -385,7 +382,6 @@ list_nodes_return_value_srv_address_test(_Config) -> application:set_env(rabbit, cluster_formation, [ {peer_discovery_backend, rabbit_peer_discovery_consul}, - {randomized_startup_delay_range, {0, 1}}, {peer_discovery_consul, [ {consul_host, "localhost"}, {consul_port, 8500} @@ -404,7 +400,6 @@ list_nodes_return_value_nodes_in_warning_state_included_test(_Config) -> application:set_env(rabbit, cluster_formation, [ {peer_discovery_backend, rabbit_peer_discovery_consul}, - {randomized_startup_delay_range, {0, 1}}, {peer_discovery_consul, [ {consul_host, "localhost"}, {consul_port, 8500} @@ -425,7 +420,6 @@ list_nodes_return_value_nodes_in_warning_state_filtered_out_test(_Config) -> application:set_env(rabbit, cluster_formation, [ {peer_discovery_backend, rabbit_peer_discovery_consul}, - {randomized_startup_delay_range, {0, 1}}, {peer_discovery_consul, [ {consul_host, "localhost"}, {consul_port, 8500} diff --git a/deps/rabbitmq_peer_discovery_etcd/src/rabbitmq_peer_discovery_etcd_v3_client.erl b/deps/rabbitmq_peer_discovery_etcd/src/rabbitmq_peer_discovery_etcd_v3_client.erl index 96cd3cdfa4ca..60855086f3a7 100644 --- a/deps/rabbitmq_peer_discovery_etcd/src/rabbitmq_peer_discovery_etcd_v3_client.erl +++ b/deps/rabbitmq_peer_discovery_etcd/src/rabbitmq_peer_discovery_etcd_v3_client.erl @@ -39,7 +39,7 @@ %% don't allow node lease key TTL to be lower than this %% as overly low values can cause annoying timeouts in etcd client operations -define(MINIMUM_NODE_KEY_LEASE_TTL, 15). -%% default randomized delay range is 5s to 60s, so this value +%% default randomized delay range was 5s to 60s, so this value %% produces a comparable delay -define(DEFAULT_LOCK_WAIT_TTL, 70). %% don't allow lock lease TTL to be lower than this diff --git a/deps/rabbitmq_peer_discovery_k8s/src/rabbit_peer_discovery_k8s.erl b/deps/rabbitmq_peer_discovery_k8s/src/rabbit_peer_discovery_k8s.erl index d238de97575e..7075d0e5ca01 100644 --- a/deps/rabbitmq_peer_discovery_k8s/src/rabbit_peer_discovery_k8s.erl +++ b/deps/rabbitmq_peer_discovery_k8s/src/rabbit_peer_discovery_k8s.erl @@ -14,9 +14,9 @@ -include_lib("rabbitmq_peer_discovery_common/include/rabbit_peer_discovery.hrl"). -include("rabbit_peer_discovery_k8s.hrl"). + -export([init/0, list_nodes/0, supports_registration/0, register/0, unregister/0, - post_registration/0, lock/1, unlock/1, randomized_startup_delay_range/0, - send_event/3, generate_v1_event/7]). + post_registration/0, lock/1, unlock/1, send_event/3, generate_v1_event/7]). -ifdef(TEST). -compile(export_all). @@ -38,24 +38,27 @@ init() -> -spec list_nodes() -> {ok, {Nodes :: list(), NodeType :: rabbit_types:node_type()}} | {error, Reason :: string()}. list_nodes() -> - case make_request() of - {ok, Response} -> - Addresses = extract_node_list(Response), - {ok, {lists:map(fun node_name/1, Addresses), disc}}; - {error, Reason} -> - Details = io_lib:format("Failed to fetch a list of nodes from Kubernetes API: ~s", [Reason]), - rabbit_log:error(Details), - send_event("Warning", "Failed", Details), - {error, Reason} - end. + case make_request() of + {ok, Response} -> + Addresses = extract_node_list(Response), + Nodes = lists:map(fun node_name/1, Addresses), + {ok, {Nodes, disc}}; + {error, Reason} -> + Details = io_lib:format("Failed to fetch a list of nodes from Kubernetes API: ~s", [Reason]), + rabbit_log:error(Details), + send_event("Warning", "Failed", Details), + {error, Reason} + end. -spec supports_registration() -> boolean(). supports_registration() -> - %% see rabbitmq-peer-discovery-aws#17, - %% rabbitmq-peer-discovery-k8s#23 - true. + true. %% to send event in post_registration/0 +-spec post_registration() -> ok | {error, Reason :: string()}. +post_registration() -> + Details = io_lib:format("Node ~s is registered", [node()]), + send_event("Normal", "Created", Details). -spec register() -> ok. register() -> @@ -65,28 +68,43 @@ register() -> unregister() -> ok. --spec post_registration() -> ok | {error, Reason :: string()}. -post_registration() -> - Details = io_lib:format("Node ~s is registered", [node()]), - send_event("Normal", "Created", Details). - --spec lock(Node :: atom()) -> not_supported. - -lock(_Node) -> - not_supported. - --spec unlock(Data :: term()) -> ok. - -unlock(_Data) -> - ok. - --spec randomized_startup_delay_range() -> {integer(), integer()}. - -randomized_startup_delay_range() -> - %% Pods in a stateful set are initialized one by one, - %% so RSD is not really necessary for this plugin. - %% See https://www.rabbitmq.com/cluster-formation.html#peer-discovery-k8s for details. - {0, 2}. +-spec lock(Node :: node()) -> {ok, {ResourceId :: string(), LockRequesterId :: node()}} | {error, Reason :: string()}. + +lock(Node) -> + %% call list_nodes/0 externally such that meck can mock the function + case ?MODULE:list_nodes() of + {ok, {Nodes, disc}} -> + case lists:member(Node, Nodes) of + true -> + rabbit_log:info("Will try to lock connecting to nodes ~p", [Nodes]), + LockId = rabbit_nodes:lock_id(Node), + Retries = rabbit_nodes:lock_retries(), + case global:set_lock(LockId, Nodes, Retries) of + true -> + {ok, LockId}; + false -> + {error, io_lib:format("Acquiring lock taking too long, bailing out after ~b retries", [Retries])} + end; + false -> + %% Don't try to acquire the global lock when local node is not discoverable by peers. + %% This branch is just an additional safety check. We should never run into this branch + %% because the local Pod is in state 'Running' and we listed both ready and not-ready addresses. + {error, lists:flatten(io_lib:format("Local node ~s is not part of discovered nodes ~p", [Node, Nodes]))} + end; + {error, _} = Error -> + Error + end. + +-spec unlock({ResourceId :: string(), LockRequesterId :: node()}) -> ok | {error, Reason :: string()}. + +unlock(LockId) -> + case ?MODULE:list_nodes() of + {ok, {Nodes, disc}} -> + global:del_lock(LockId, Nodes), + ok; + {error, _} = Error -> + Error + end. %% %% Implementation @@ -125,32 +143,28 @@ node_name(Address) -> ?UTIL_MODULE:node_name( ?UTIL_MODULE:as_string(Address) ++ get_config_key(k8s_hostname_suffix, M)). - -%% @spec maybe_ready_address(k8s_subsets()) -> list() -%% @doc Return a list of ready nodes -%% SubSet can contain also "notReadyAddresses" +%% @spec address(k8s_subsets()) -> list() +%% @doc Return a list of both ready and not-ready nodes. +%% For the purpose of peer discovery, consider both ready and not-ready addresses. +%% Discover peers as quickly as possible not waiting for their readiness check to succeed. %% @end %% --spec maybe_ready_address([map()]) -> list(). +-spec address([map()]) -> list(). -maybe_ready_address(Subset) -> - case maps:get(<<"notReadyAddresses">>, Subset, undefined) of - undefined -> ok; - NotReadyAddresses -> - Formatted = string:join([binary_to_list(get_address(X)) || X <- NotReadyAddresses], ", "), - rabbit_log:info("k8s endpoint listing returned nodes not yet ready: ~s", [Formatted]) - end, - maps:get(<<"addresses">>, Subset, []). +address(Subset) -> + maps:get(<<"notReadyAddresses">>, Subset, []) ++ + maps:get(<<"addresses">>, Subset, []). %% @doc Return a list of nodes -%% see https://kubernetes.io/docs/api-reference/v1/definitions/#_v1_endpoints +%% "The set of all endpoints is the union of all subsets." +%% https://kubernetes.io/docs/reference/kubernetes-api/service-resources/endpoints-v1/ %% @end %% -spec extract_node_list(map()) -> list(). extract_node_list(Response) -> IpLists = [[get_address(Address) - || Address <- maybe_ready_address(Subset)] || Subset <- maps:get(<<"subsets">>, Response, [])], + || Address <- address(Subset)] || Subset <- maps:get(<<"subsets">>, Response, [])], sets:to_list(sets:union(lists:map(fun sets:from_list/1, IpLists))). diff --git a/deps/rabbitmq_peer_discovery_k8s/src/rabbitmq_peer_discovery_k8s.erl b/deps/rabbitmq_peer_discovery_k8s/src/rabbitmq_peer_discovery_k8s.erl index f8c198e1327b..e4a718088b4e 100644 --- a/deps/rabbitmq_peer_discovery_k8s/src/rabbitmq_peer_discovery_k8s.erl +++ b/deps/rabbitmq_peer_discovery_k8s/src/rabbitmq_peer_discovery_k8s.erl @@ -5,15 +5,14 @@ %% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. %% -%% This module exists as an alias for rabbit_peer_discovery_aws. +%% This module exists as an alias for rabbit_peer_discovery_k8s. %% Some users assume that the discovery module is the same as plugin %% name. This module tries to fill the naming gap between module and plugin names. -module(rabbitmq_peer_discovery_k8s). -behaviour(rabbit_peer_discovery_backend). -export([init/0, list_nodes/0, supports_registration/0, register/0, unregister/0, - post_registration/0, lock/1, unlock/1, randomized_startup_delay_range/0, - send_event/3, generate_v1_event/7]). + post_registration/0, lock/1, unlock/1, send_event/3, generate_v1_event/7]). -define(DELEGATE, rabbit_peer_discovery_k8s). @@ -45,18 +44,14 @@ unregister() -> post_registration() -> ?DELEGATE:post_registration(). --spec lock(Node :: atom()) -> not_supported. +-spec lock(Node :: node()) -> {ok, {ResourceId :: string(), LockRequesterId :: node()}} | {error, Reason :: string()}. lock(Node) -> ?DELEGATE:lock(Node). --spec unlock(Data :: term()) -> ok. +-spec unlock({ResourceId :: string(), LockRequesterId :: node()}) -> ok | {error, Reason :: string()}. unlock(Data) -> ?DELEGATE:unlock(Data). --spec randomized_startup_delay_range() -> {integer(), integer()}. -randomized_startup_delay_range() -> - ?DELEGATE:randomized_startup_delay_range(). - generate_v1_event(Namespace, Name, Type, Message, Reason, Timestamp, HostName) -> ?DELEGATE:generate_v1_event(Namespace, Name, Type, Message, Reason, Timestamp, HostName). diff --git a/deps/rabbitmq_peer_discovery_k8s/test/rabbitmq_peer_discovery_k8s_SUITE.erl b/deps/rabbitmq_peer_discovery_k8s/test/rabbitmq_peer_discovery_k8s_SUITE.erl index 97b7326a7a70..45b41f802bbe 100644 --- a/deps/rabbitmq_peer_discovery_k8s/test/rabbitmq_peer_discovery_k8s_SUITE.erl +++ b/deps/rabbitmq_peer_discovery_k8s/test/rabbitmq_peer_discovery_k8s_SUITE.erl @@ -14,9 +14,14 @@ -include_lib("eunit/include/eunit.hrl"). +%% rabbitmq/cluster-operator contains an implicit integration test +%% for the rabbitmq_peer_discovery_k8s plugin added by +%% https://github.com/rabbitmq/cluster-operator/pull/704 + all() -> [ - {group, unit} + {group, unit}, + {group, lock} ]. groups() -> @@ -31,15 +36,21 @@ groups() -> node_name_suffix_test, registration_support, event_v1_test - ]}]. + ]}, + {lock, [], [ + lock_single_node, + lock_multiple_nodes, + lock_local_node_not_discovered, + lock_list_nodes_fails + ]} + ]. init_per_testcase(T, Config) when T == node_name_empty_test; T == node_name_suffix_test -> meck:new(net_kernel, [passthrough, unstick]), meck:expect(net_kernel, longnames, fun() -> true end), Config; -init_per_testcase(_, Config) -> - Config. +init_per_testcase(_, Config) -> Config. end_per_testcase(_, _Config) -> meck:unload(), @@ -52,7 +63,7 @@ end_per_testcase(_, _Config) -> %%% registration_support(_Config) -> - ?assertEqual(rabbit_peer_discovery_k8s:supports_registration(), true). + ?assertEqual(true, rabbit_peer_discovery_k8s:supports_registration()). extract_node_list_long_test(_Config) -> {ok, Response} = @@ -92,9 +103,10 @@ extract_node_list_with_not_ready_addresses_test(_Config) -> {ok, Response} = rabbit_json:try_decode( rabbit_data_coercion:to_binary( - "{\"kind\":\"Endpoints\",\"apiVersion\":\"v1\",\"metadata\":{\"name\":\"rabbitmq\",\"namespace\":\"test-rabbitmq\",\"selfLink\":\"\/api\/v1\/namespaces\/test-rabbitmq\/endpoints\/rabbitmq\",\"uid\":\"4ff733b8-3ad2-11e7-a40d-080027cbdcae\",\"resourceVersion\":\"170098\",\"creationTimestamp\":\"2017-05-17T07:27:41Z\",\"labels\":{\"app\":\"rabbitmq\",\"type\":\"LoadBalancer\"}},\"subsets\":[{\"notReadyAddresses\":[{\"ip\":\"172.17.0.2\",\"hostname\":\"rabbitmq-0\",\"nodeName\":\"minikube\",\"targetRef\":{\"kind\":\"Pod\",\"namespace\":\"test-rabbitmq\",\"name\":\"rabbitmq-0\",\"uid\":\"e980fe5a-3afd-11e7-a40d-080027cbdcae\",\"resourceVersion\":\"170044\"}},{\"ip\":\"172.17.0.4\",\"hostname\":\"rabbitmq-1\",\"nodeName\":\"minikube\",\"targetRef\":{\"kind\":\"Pod\",\"namespace\":\"test-rabbitmq\",\"name\":\"rabbitmq-1\",\"uid\":\"f6285603-3afd-11e7-a40d-080027cbdcae\",\"resourceVersion\":\"170071\"}},{\"ip\":\"172.17.0.5\",\"hostname\":\"rabbitmq-2\",\"nodeName\":\"minikube\",\"targetRef\":{\"kind\":\"Pod\",\"namespace\":\"test-rabbitmq\",\"name\":\"rabbitmq-2\",\"uid\":\"fd5a86dc-3afd-11e7-a40d-080027cbdcae\",\"resourceVersion\":\"170096\"}}],\"ports\":[{\"name\":\"amqp\",\"port\":5672,\"protocol\":\"TCP\"},{\"name\":\"http\",\"port\":15672,\"protocol\":\"TCP\"}]}]}")), - Expectation = [], - ?assertEqual(Expectation, rabbit_peer_discovery_k8s:extract_node_list(Response)). + "{\"kind\":\"Endpoints\",\"apiVersion\":\"v1\",\"metadata\":{\"name\":\"rabbitmq\",\"namespace\":\"test-rabbitmq\",\"selfLink\":\"\/api\/v1\/namespaces\/test-rabbitmq\/endpoints\/rabbitmq\",\"uid\":\"4ff733b8-3ad2-11e7-a40d-080027cbdcae\",\"resourceVersion\":\"170098\",\"creationTimestamp\":\"2017-05-17T07:27:41Z\",\"labels\":{\"app\":\"rabbitmq\",\"type\":\"LoadBalancer\"}},\"subsets\":[{\"addresses\":[{\"ip\":\"10.1.29.8\",\"targetRef\":{\"kind\":\"Pod\",\"namespace\":\"default\",\"name\":\"mariadb-tco7k\",\"uid\":\"fb59cc71-558c-11e6-86e9-ecf4bbd91e6c\",\"resourceVersion\":\"13034802\"}}],\"ports\":[{\"name\":\"mysql\",\"port\":3306,\"protocol\":\"TCP\"}]},{\"notReadyAddresses\":[{\"ip\":\"172.17.0.2\",\"hostname\":\"rabbitmq-0\",\"nodeName\":\"minikube\",\"targetRef\":{\"kind\":\"Pod\",\"namespace\":\"test-rabbitmq\",\"name\":\"rabbitmq-0\",\"uid\":\"e980fe5a-3afd-11e7-a40d-080027cbdcae\",\"resourceVersion\":\"170044\"}},{\"ip\":\"172.17.0.4\",\"hostname\":\"rabbitmq-1\",\"nodeName\":\"minikube\",\"targetRef\":{\"kind\":\"Pod\",\"namespace\":\"test-rabbitmq\",\"name\":\"rabbitmq-1\",\"uid\":\"f6285603-3afd-11e7-a40d-080027cbdcae\",\"resourceVersion\":\"170071\"}},{\"ip\":\"172.17.0.5\",\"hostname\":\"rabbitmq-2\",\"nodeName\":\"minikube\",\"targetRef\":{\"kind\":\"Pod\",\"namespace\":\"test-rabbitmq\",\"name\":\"rabbitmq-2\",\"uid\":\"fd5a86dc-3afd-11e7-a40d-080027cbdcae\",\"resourceVersion\":\"170096\"}}],\"ports\":[{\"name\":\"amqp\",\"port\":5672,\"protocol\":\"TCP\"},{\"name\":\"http\",\"port\":15672,\"protocol\":\"TCP\"}]}]}")), + Expectation = [<<"10.1.29.8">>, + <<"172.17.0.2">>, <<"172.17.0.4">>, <<"172.17.0.5">>], + ?assertEqual(Expectation, lists:sort(rabbit_peer_discovery_k8s:extract_node_list(Response))). node_name_empty_test(_Config) -> Expectation = 'rabbit@rabbitmq-0', @@ -130,3 +142,31 @@ event_v1_test(_Config) -> ?assertEqual(Expectation, rabbit_peer_discovery_k8s:generate_v1_event(<<"namespace">>, "test", "Normal", "Reason", "MyMessage", "2019-12-06T15:10:23+00:00", "MyHostName")). + +lock_single_node(_Config) -> + LocalNode = node(), + meck:expect(rabbit_peer_discovery_k8s, list_nodes, 0, {ok, {[LocalNode], disc}}), + + {ok, LockId} = rabbit_peer_discovery_k8s:lock(LocalNode), + ?assertEqual(ok, rabbit_peer_discovery_k8s:unlock(LockId)). + +lock_multiple_nodes(_Config) -> + application:set_env(rabbit, cluster_formation, [{internal_lock_retries, 2}]), + LocalNode = node(), + OtherNode = other@host, + meck:expect(rabbit_peer_discovery_k8s, list_nodes, 0, {ok, {[OtherNode, LocalNode], disc}}), + + {ok, {LockResourceId, OtherNode}} = rabbit_peer_discovery_k8s:lock(OtherNode), + ?assertEqual({error, "Acquiring lock taking too long, bailing out after 2 retries"}, rabbit_peer_discovery_k8s:lock(LocalNode)), + ?assertEqual(ok, rabbitmq_peer_discovery_k8s:unlock({LockResourceId, OtherNode})), + ?assertEqual({ok, {LockResourceId, LocalNode}}, rabbit_peer_discovery_k8s:lock(LocalNode)), + ?assertEqual(ok, rabbitmq_peer_discovery_k8s:unlock({LockResourceId, LocalNode})). + +lock_local_node_not_discovered(_Config) -> + meck:expect(rabbit_peer_discovery_k8s, list_nodes, 0, {ok, {[n1@host, n2@host], disc}} ), + Expectation = {error, "Local node me@host is not part of discovered nodes [n1@host,n2@host]"}, + ?assertEqual(Expectation, rabbit_peer_discovery_k8s:lock(me@host)). + +lock_list_nodes_fails(_Config) -> + meck:expect(rabbit_peer_discovery_k8s, list_nodes, 0, {error, "K8s API unavailable"}), + ?assertEqual({error, "K8s API unavailable"}, rabbit_peer_discovery_k8s:lock(me@host)).