diff --git a/deps/rabbitmq_federation/test/federation_status_command_SUITE.erl b/deps/rabbitmq_federation/test/federation_status_command_SUITE.erl index 229afd494d4d..eff5e969be4b 100644 --- a/deps/rabbitmq_federation/test/federation_status_command_SUITE.erl +++ b/deps/rabbitmq_federation/test/federation_status_command_SUITE.erl @@ -95,12 +95,12 @@ run_federated(Config) -> timer:sleep(3000), {stream, [Props]} = ?CMD:run([], Opts#{only_down => false}), <<"upstream">> = proplists:get_value(upstream_queue, Props), - <<"fed.downstream">> = proplists:get_value(queue, Props), + <<"fed1.downstream">> = proplists:get_value(queue, Props), <<"fed.tag">> = proplists:get_value(consumer_tag, Props), running = proplists:get_value(status, Props) end, [rabbit_federation_test_util:q(<<"upstream">>), - rabbit_federation_test_util:q(<<"fed.downstream">>)]), + rabbit_federation_test_util:q(<<"fed1.downstream">>)]), %% Down rabbit_federation_test_util:with_ch( Config, @@ -108,7 +108,7 @@ run_federated(Config) -> {stream, []} = ?CMD:run([], Opts#{only_down => true}) end, [rabbit_federation_test_util:q(<<"upstream">>), - rabbit_federation_test_util:q(<<"fed.downstream">>)]). + rabbit_federation_test_util:q(<<"fed1.downstream">>)]). run_down_federated(Config) -> [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -128,7 +128,7 @@ run_down_federated(Config) -> end, 15000) end, [rabbit_federation_test_util:q(<<"upstream">>), - rabbit_federation_test_util:q(<<"fed.downstream">>)]), + rabbit_federation_test_util:q(<<"fed1.downstream">>)]), %% Down rabbit_federation_test_util:with_ch( Config, @@ -142,12 +142,12 @@ run_down_federated(Config) -> end, 15000) end, [rabbit_federation_test_util:q(<<"upstream">>), - rabbit_federation_test_util:q(<<"fed.downstream">>)]). + rabbit_federation_test_util:q(<<"fed1.downstream">>)]). output_federated(Config) -> [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Opts = #{node => A}, - Input = {stream,[[{queue, <<"fed.downstream">>}, + Input = {stream,[[{queue, <<"fed1.downstream">>}, {consumer_tag, <<"fed.tag">>}, {upstream_queue, <<"upstream">>}, {type, queue}, @@ -157,7 +157,7 @@ output_federated(Config) -> {local_connection, <<"">>}, {uri, <<"amqp://localhost:21000">>}, {timestamp, {{2016,11,21},{8,51,19}}}]]}, - {stream, [#{queue := <<"fed.downstream">>, + {stream, [#{queue := <<"fed1.downstream">>, upstream_queue := <<"upstream">>, type := queue, vhost := <<"/">>, diff --git a/deps/rabbitmq_federation/test/queue_SUITE.erl b/deps/rabbitmq_federation/test/queue_SUITE.erl index 77afe87a1236..60779fc3fdf2 100644 --- a/deps/rabbitmq_federation/test/queue_SUITE.erl +++ b/deps/rabbitmq_federation/test/queue_SUITE.erl @@ -160,7 +160,7 @@ end_per_testcase(Testcase, Config) -> simple(Config) -> with_ch(Config, fun (Ch) -> - expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>) + expect_federation(Ch, <<"upstream">>, <<"fed1.downstream">>) end, upstream_downstream(Config)). multiple_upstreams_pattern(Config) -> @@ -200,9 +200,9 @@ multiple_downstreams(Config) -> with_ch(Config, fun (Ch) -> timer:sleep(?INITIAL_WAIT), - expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT), - expect_federation(Ch, <<"upstream">>, <<"fed.downstream2">>, ?EXPECT_FEDERATION_TIMEOUT) - end, upstream_downstream(Config) ++ [q(<<"fed.downstream2">>, Args)]). + expect_federation(Ch, <<"upstream">>, <<"fed1.downstream">>, ?EXPECT_FEDERATION_TIMEOUT), + expect_federation(Ch, <<"upstream2">>, <<"fed2.downstream">>, ?EXPECT_FEDERATION_TIMEOUT) + end, upstream_downstream(Config) ++ [q(<<"fed2.downstream">>, Args)]). message_flow(Config) -> %% TODO: specifc source / target here @@ -236,11 +236,11 @@ dynamic_reconfiguration(Config) -> with_ch(Config, fun (Ch) -> timer:sleep(?INITIAL_WAIT), - expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT), + expect_federation(Ch, <<"upstream">>, <<"fed1.downstream">>, ?EXPECT_FEDERATION_TIMEOUT), %% Test that clearing connections works clear_upstream(Config, 0, <<"localhost">>), - expect_no_federation(Ch, <<"upstream">>, <<"fed.downstream">>), + expect_no_federation(Ch, <<"upstream">>, <<"fed1.downstream">>), %% Test that reading them and changing them works set_upstream(Config, 0, @@ -249,7 +249,7 @@ dynamic_reconfiguration(Config) -> URI = rabbit_ct_broker_helpers:node_uri(Config, 0, [use_ipaddr]), set_upstream(Config, 0, <<"localhost">>, URI), set_upstream(Config, 0, <<"localhost">>, URI), - expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>) + expect_federation(Ch, <<"upstream">>, <<"fed1.downstream">>) end, upstream_downstream(Config)). federate_unfederate(Config) -> @@ -257,37 +257,38 @@ federate_unfederate(Config) -> with_ch(Config, fun (Ch) -> timer:sleep(?INITIAL_WAIT), - expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT), - expect_federation(Ch, <<"upstream">>, <<"fed.downstream2">>, ?EXPECT_FEDERATION_TIMEOUT), + expect_federation(Ch, <<"upstream">>, <<"fed1.downstream">>, ?EXPECT_FEDERATION_TIMEOUT), + expect_federation(Ch, <<"upstream2">>, <<"fed2.downstream">>, ?EXPECT_FEDERATION_TIMEOUT), %% clear the policy rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"fed">>), - expect_no_federation(Ch, <<"upstream">>, <<"fed.downstream">>), - expect_no_federation(Ch, <<"upstream">>, <<"fed.downstream2">>), + expect_no_federation(Ch, <<"upstream">>, <<"fed1.downstream">>), + expect_no_federation(Ch, <<"upstream2">>, <<"fed2.downstream">>), rabbit_ct_broker_helpers:set_policy(Config, 0, - <<"fed">>, <<"^fed\.">>, <<"all">>, [ + <<"fed">>, <<"^fed1\.">>, <<"all">>, [ {<<"federation-upstream-set">>, <<"upstream">>}]) - end, upstream_downstream(Config) ++ [q(<<"fed.downstream2">>, Args)]). + end, upstream_downstream(Config) ++ [q(<<"fed2.downstream">>, Args)]). dynamic_plugin_stop_start(Config) -> - DownQ2 = <<"fed.downstream2">>, + DownQ2 = <<"fed2.downstream">>, Args = ?config(target_queue_args, Config), with_ch(Config, fun (Ch) -> timer:sleep(?INITIAL_WAIT), - UpQ = <<"upstream">>, - DownQ1 = <<"fed.downstream">>, - expect_federation(Ch, UpQ, DownQ1, ?EXPECT_FEDERATION_TIMEOUT), - expect_federation(Ch, UpQ, DownQ2, ?EXPECT_FEDERATION_TIMEOUT), + UpQ1 = <<"upstream">>, + UpQ2 = <<"upstream2">>, + DownQ1 = <<"fed1.downstream">>, + expect_federation(Ch, UpQ1, DownQ1, ?EXPECT_FEDERATION_TIMEOUT), + expect_federation(Ch, UpQ2, DownQ2, ?EXPECT_FEDERATION_TIMEOUT), %% Disable the plugin, the link disappears ct:pal("Stopping rabbitmq_federation"), ok = rabbit_ct_broker_helpers:disable_plugin(Config, 0, "rabbitmq_federation"), - expect_no_federation(Ch, UpQ, DownQ1), - expect_no_federation(Ch, UpQ, DownQ2), + expect_no_federation(Ch, UpQ1, DownQ1), + expect_no_federation(Ch, UpQ2, DownQ2), maybe_declare_queue(Config, Ch, q(DownQ1, Args)), maybe_declare_queue(Config, Ch, q(DownQ2, Args)), @@ -305,12 +306,13 @@ dynamic_plugin_stop_start(Config) -> Entry || Entry <- Status, proplists:get_value(queue, Entry) =:= DownQ1 orelse proplists:get_value(queue, Entry) =:= DownQ2, - proplists:get_value(upstream_queue, Entry) =:= UpQ, + proplists:get_value(upstream_queue, Entry) =:= UpQ1 orelse + proplists:get_value(upstream_queue, Entry) =:= UpQ2, proplists:get_value(status, Entry) =:= running ], length(L) =:= 2 end), - expect_federation(Ch, UpQ, DownQ1, 120000) + expect_federation(Ch, UpQ1, DownQ1, 120000) end, upstream_downstream(Config) ++ [q(DownQ2, Args)]). restart_upstream(Config) -> @@ -392,4 +394,4 @@ upstream_downstream() -> upstream_downstream(Config) -> SourceArgs = ?config(source_queue_args, Config), TargetArgs = ?config(target_queue_args, Config), - [q(<<"upstream">>, SourceArgs), q(<<"fed.downstream">>, TargetArgs)]. + [q(<<"upstream">>, SourceArgs), q(<<"fed1.downstream">>, TargetArgs)]. diff --git a/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl b/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl index 209cbb2b3faa..250f8fcbdca5 100644 --- a/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl +++ b/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl @@ -96,12 +96,17 @@ setup_federation_with_upstream_params(Config, ExtraParams) -> rabbit_ct_broker_helpers:rpc( Config, 0, rabbit_policy, set, - [<<"/">>, <<"fed">>, <<"^fed\.">>, [{<<"federation-upstream-set">>, <<"upstream">>}], + [<<"/">>, <<"fed">>, <<"^fed1\.">>, [{<<"federation-upstream-set">>, <<"upstream">>}], 0, <<"all">>, <<"acting-user">>]), rabbit_ct_broker_helpers:rpc( Config, 0, rabbit_policy, set, - [<<"/">>, <<"fed12">>, <<"^fed12\.">>, [{<<"federation-upstream-set">>, <<"upstream12">>}], + [<<"/">>, <<"fed2">>, <<"^fed2\.">>, [{<<"federation-upstream-set">>, <<"upstream2">>}], + 0, <<"all">>, <<"acting-user">>]), + + rabbit_ct_broker_helpers:rpc( + Config, 0, rabbit_policy, set, + [<<"/">>, <<"fed12">>, <<"^fed3\.">>, [{<<"federation-upstream-set">>, <<"upstream12">>}], 2, <<"all">>, <<"acting-user">>]), rabbit_ct_broker_helpers:set_policy(Config, 0, @@ -144,10 +149,10 @@ setup_down_federation(Config) -> {<<"queue">>, <<"upstream">>}]]), rabbit_ct_broker_helpers:set_policy( Config, 0, - <<"fed">>, <<"^fed\.">>, <<"all">>, [{<<"federation-upstream-set">>, <<"upstream">>}]), + <<"fed">>, <<"^fed1\.">>, <<"all">>, [{<<"federation-upstream-set">>, <<"upstream">>}]), rabbit_ct_broker_helpers:set_policy( Config, 0, - <<"fed">>, <<"^fed\.">>, <<"all">>, [{<<"federation-upstream-set">>, <<"upstream">>}]), + <<"fed">>, <<"^fed1\.">>, <<"all">>, [{<<"federation-upstream-set">>, <<"upstream">>}]), Config. wait_for_federation(Retries, Fun) -> diff --git a/deps/rabbitmq_federation/test/restart_federation_link_command_SUITE.erl b/deps/rabbitmq_federation/test/restart_federation_link_command_SUITE.erl index f7c1d14a8def..2b504e8d347b 100644 --- a/deps/rabbitmq_federation/test/restart_federation_link_command_SUITE.erl +++ b/deps/rabbitmq_federation/test/restart_federation_link_command_SUITE.erl @@ -87,7 +87,7 @@ run(Config) -> ok = ?CMD:run([Id], Opts) end, [rabbit_federation_test_util:q(<<"upstream">>), - rabbit_federation_test_util:q(<<"fed.downstream">>)]). + rabbit_federation_test_util:q(<<"fed1.downstream">>)]). run_not_found(Config) -> [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),