Skip to content

Commit

Permalink
Merge pull request #12611 from rabbitmq/mergify/bp/v4.0.x/pr-12610
Browse files Browse the repository at this point in the history
Fix rabbitmq_federation queue_SUITE flake (backport #12610)
  • Loading branch information
michaelklishin authored Oct 29, 2024
2 parents 1fd9e33 + 408e4f1 commit 29b426e
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,20 @@ run_federated(Config) ->
timer:sleep(3000),
{stream, [Props]} = ?CMD:run([], Opts#{only_down => false}),
<<"upstream">> = proplists:get_value(upstream_queue, Props),
<<"fed.downstream">> = proplists:get_value(queue, Props),
<<"fed1.downstream">> = proplists:get_value(queue, Props),
<<"fed.tag">> = proplists:get_value(consumer_tag, Props),
running = proplists:get_value(status, Props)
end,
[rabbit_federation_test_util:q(<<"upstream">>),
rabbit_federation_test_util:q(<<"fed.downstream">>)]),
rabbit_federation_test_util:q(<<"fed1.downstream">>)]),
%% Down
rabbit_federation_test_util:with_ch(
Config,
fun(_) ->
{stream, []} = ?CMD:run([], Opts#{only_down => true})
end,
[rabbit_federation_test_util:q(<<"upstream">>),
rabbit_federation_test_util:q(<<"fed.downstream">>)]).
rabbit_federation_test_util:q(<<"fed1.downstream">>)]).

run_down_federated(Config) ->
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Expand All @@ -128,7 +128,7 @@ run_down_federated(Config) ->
end, 15000)
end,
[rabbit_federation_test_util:q(<<"upstream">>),
rabbit_federation_test_util:q(<<"fed.downstream">>)]),
rabbit_federation_test_util:q(<<"fed1.downstream">>)]),
%% Down
rabbit_federation_test_util:with_ch(
Config,
Expand All @@ -142,12 +142,12 @@ run_down_federated(Config) ->
end, 15000)
end,
[rabbit_federation_test_util:q(<<"upstream">>),
rabbit_federation_test_util:q(<<"fed.downstream">>)]).
rabbit_federation_test_util:q(<<"fed1.downstream">>)]).

output_federated(Config) ->
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Opts = #{node => A},
Input = {stream,[[{queue, <<"fed.downstream">>},
Input = {stream,[[{queue, <<"fed1.downstream">>},
{consumer_tag, <<"fed.tag">>},
{upstream_queue, <<"upstream">>},
{type, queue},
Expand All @@ -157,7 +157,7 @@ output_federated(Config) ->
{local_connection, <<"<[email protected]>">>},
{uri, <<"amqp://localhost:21000">>},
{timestamp, {{2016,11,21},{8,51,19}}}]]},
{stream, [#{queue := <<"fed.downstream">>,
{stream, [#{queue := <<"fed1.downstream">>,
upstream_queue := <<"upstream">>,
type := queue,
vhost := <<"/">>,
Expand Down
48 changes: 25 additions & 23 deletions deps/rabbitmq_federation/test/queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ end_per_testcase(Testcase, Config) ->
simple(Config) ->
with_ch(Config,
fun (Ch) ->
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>)
expect_federation(Ch, <<"upstream">>, <<"fed1.downstream">>)
end, upstream_downstream(Config)).

multiple_upstreams_pattern(Config) ->
Expand Down Expand Up @@ -200,9 +200,9 @@ multiple_downstreams(Config) ->
with_ch(Config,
fun (Ch) ->
timer:sleep(?INITIAL_WAIT),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream2">>, ?EXPECT_FEDERATION_TIMEOUT)
end, upstream_downstream(Config) ++ [q(<<"fed.downstream2">>, Args)]).
expect_federation(Ch, <<"upstream">>, <<"fed1.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Ch, <<"upstream2">>, <<"fed2.downstream">>, ?EXPECT_FEDERATION_TIMEOUT)
end, upstream_downstream(Config) ++ [q(<<"fed2.downstream">>, Args)]).

message_flow(Config) ->
%% TODO: specifc source / target here
Expand Down Expand Up @@ -236,11 +236,11 @@ dynamic_reconfiguration(Config) ->
with_ch(Config,
fun (Ch) ->
timer:sleep(?INITIAL_WAIT),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Ch, <<"upstream">>, <<"fed1.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),

%% Test that clearing connections works
clear_upstream(Config, 0, <<"localhost">>),
expect_no_federation(Ch, <<"upstream">>, <<"fed.downstream">>),
expect_no_federation(Ch, <<"upstream">>, <<"fed1.downstream">>),

%% Test that reading them and changing them works
set_upstream(Config, 0,
Expand All @@ -249,45 +249,46 @@ dynamic_reconfiguration(Config) ->
URI = rabbit_ct_broker_helpers:node_uri(Config, 0, [use_ipaddr]),
set_upstream(Config, 0, <<"localhost">>, URI),
set_upstream(Config, 0, <<"localhost">>, URI),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>)
expect_federation(Ch, <<"upstream">>, <<"fed1.downstream">>)
end, upstream_downstream(Config)).

federate_unfederate(Config) ->
Args = ?config(target_queue_args, Config),
with_ch(Config,
fun (Ch) ->
timer:sleep(?INITIAL_WAIT),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream2">>, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Ch, <<"upstream">>, <<"fed1.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Ch, <<"upstream2">>, <<"fed2.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),

%% clear the policy
rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"fed">>),

expect_no_federation(Ch, <<"upstream">>, <<"fed.downstream">>),
expect_no_federation(Ch, <<"upstream">>, <<"fed.downstream2">>),
expect_no_federation(Ch, <<"upstream">>, <<"fed1.downstream">>),
expect_no_federation(Ch, <<"upstream2">>, <<"fed2.downstream">>),

rabbit_ct_broker_helpers:set_policy(Config, 0,
<<"fed">>, <<"^fed\.">>, <<"all">>, [
<<"fed">>, <<"^fed1\.">>, <<"all">>, [
{<<"federation-upstream-set">>, <<"upstream">>}])
end, upstream_downstream(Config) ++ [q(<<"fed.downstream2">>, Args)]).
end, upstream_downstream(Config) ++ [q(<<"fed2.downstream">>, Args)]).

dynamic_plugin_stop_start(Config) ->
DownQ2 = <<"fed.downstream2">>,
DownQ2 = <<"fed2.downstream">>,
Args = ?config(target_queue_args, Config),
with_ch(Config,
fun (Ch) ->
timer:sleep(?INITIAL_WAIT),
UpQ = <<"upstream">>,
DownQ1 = <<"fed.downstream">>,
expect_federation(Ch, UpQ, DownQ1, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Ch, UpQ, DownQ2, ?EXPECT_FEDERATION_TIMEOUT),
UpQ1 = <<"upstream">>,
UpQ2 = <<"upstream2">>,
DownQ1 = <<"fed1.downstream">>,
expect_federation(Ch, UpQ1, DownQ1, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Ch, UpQ2, DownQ2, ?EXPECT_FEDERATION_TIMEOUT),

%% Disable the plugin, the link disappears
ct:pal("Stopping rabbitmq_federation"),
ok = rabbit_ct_broker_helpers:disable_plugin(Config, 0, "rabbitmq_federation"),

expect_no_federation(Ch, UpQ, DownQ1),
expect_no_federation(Ch, UpQ, DownQ2),
expect_no_federation(Ch, UpQ1, DownQ1),
expect_no_federation(Ch, UpQ2, DownQ2),

maybe_declare_queue(Config, Ch, q(DownQ1, Args)),
maybe_declare_queue(Config, Ch, q(DownQ2, Args)),
Expand All @@ -305,12 +306,13 @@ dynamic_plugin_stop_start(Config) ->
Entry || Entry <- Status,
proplists:get_value(queue, Entry) =:= DownQ1 orelse
proplists:get_value(queue, Entry) =:= DownQ2,
proplists:get_value(upstream_queue, Entry) =:= UpQ,
proplists:get_value(upstream_queue, Entry) =:= UpQ1 orelse
proplists:get_value(upstream_queue, Entry) =:= UpQ2,
proplists:get_value(status, Entry) =:= running
],
length(L) =:= 2
end),
expect_federation(Ch, UpQ, DownQ1, 120000)
expect_federation(Ch, UpQ1, DownQ1, 120000)
end, upstream_downstream(Config) ++ [q(DownQ2, Args)]).

restart_upstream(Config) ->
Expand Down Expand Up @@ -392,4 +394,4 @@ upstream_downstream() ->
upstream_downstream(Config) ->
SourceArgs = ?config(source_queue_args, Config),
TargetArgs = ?config(target_queue_args, Config),
[q(<<"upstream">>, SourceArgs), q(<<"fed.downstream">>, TargetArgs)].
[q(<<"upstream">>, SourceArgs), q(<<"fed1.downstream">>, TargetArgs)].
13 changes: 9 additions & 4 deletions deps/rabbitmq_federation/test/rabbit_federation_test_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,17 @@ setup_federation_with_upstream_params(Config, ExtraParams) ->

rabbit_ct_broker_helpers:rpc(
Config, 0, rabbit_policy, set,
[<<"/">>, <<"fed">>, <<"^fed\.">>, [{<<"federation-upstream-set">>, <<"upstream">>}],
[<<"/">>, <<"fed">>, <<"^fed1\.">>, [{<<"federation-upstream-set">>, <<"upstream">>}],
0, <<"all">>, <<"acting-user">>]),

rabbit_ct_broker_helpers:rpc(
Config, 0, rabbit_policy, set,
[<<"/">>, <<"fed12">>, <<"^fed12\.">>, [{<<"federation-upstream-set">>, <<"upstream12">>}],
[<<"/">>, <<"fed2">>, <<"^fed2\.">>, [{<<"federation-upstream-set">>, <<"upstream2">>}],
0, <<"all">>, <<"acting-user">>]),

rabbit_ct_broker_helpers:rpc(
Config, 0, rabbit_policy, set,
[<<"/">>, <<"fed12">>, <<"^fed3\.">>, [{<<"federation-upstream-set">>, <<"upstream12">>}],
2, <<"all">>, <<"acting-user">>]),

rabbit_ct_broker_helpers:set_policy(Config, 0,
Expand Down Expand Up @@ -144,10 +149,10 @@ setup_down_federation(Config) ->
{<<"queue">>, <<"upstream">>}]]),
rabbit_ct_broker_helpers:set_policy(
Config, 0,
<<"fed">>, <<"^fed\.">>, <<"all">>, [{<<"federation-upstream-set">>, <<"upstream">>}]),
<<"fed">>, <<"^fed1\.">>, <<"all">>, [{<<"federation-upstream-set">>, <<"upstream">>}]),
rabbit_ct_broker_helpers:set_policy(
Config, 0,
<<"fed">>, <<"^fed\.">>, <<"all">>, [{<<"federation-upstream-set">>, <<"upstream">>}]),
<<"fed">>, <<"^fed1\.">>, <<"all">>, [{<<"federation-upstream-set">>, <<"upstream">>}]),
Config.

wait_for_federation(Retries, Fun) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ run(Config) ->
ok = ?CMD:run([Id], Opts)
end,
[rabbit_federation_test_util:q(<<"upstream">>),
rabbit_federation_test_util:q(<<"fed.downstream">>)]).
rabbit_federation_test_util:q(<<"fed1.downstream">>)]).

run_not_found(Config) ->
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Expand Down

0 comments on commit 29b426e

Please sign in to comment.