Skip to content

Commit

Permalink
Support handle-max
Browse files Browse the repository at this point in the history
 ## What?
1. Support `handle-max` field in the AMQP 1.0 `begin` frame
2. Add a new setting `link_max_per_session` which defaults to 256.
3. Rename `session_max` to `session_max_per_connection`

 ## Why?
1. Operators might want to limit the number of links per session. A
   similar setting `consumer_max_per_channel` exists for AMQP 0.9.1.
2. We should use RabbitMQ 4.0 as an opportunity to set a sensible
   default as to how many links can be active on a given session simultaneously.
   The session code does iterate over every link in some scenarios (e.g.
   queue was deleted). At some point, it's better to just open 2nd
   session instead of attaching hundreds or thousands of links to a single session.
   A default `link_max_per_session` of 256 should be more than enough given
   that `session_max_per_connection` is 64. So, the defaults allow
   `256 * 64 = 16,384` links to be active on an AMQP 1.0 connection.
   (Operators might want to lower both defaults.)
3. The name is clearer given that we might introduce
   `session_max_per_node` in the future since
   `channel_max_per_node` exists for AMQP 0.9.1.

 ### Additional Context
> Link handles MAY be reused once a link is closed for both send and receive.

> To make it easier to monitor AMQP link attach frames, it is RECOMMENDED that
> implementations always assign the lowest available handle to this field.
  • Loading branch information
ansd committed Sep 6, 2024
1 parent 1fb2206 commit b2db635
Show file tree
Hide file tree
Showing 8 changed files with 382 additions and 316 deletions.
3 changes: 2 additions & 1 deletion deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ _APP_ENV = """[
{frame_max, 131072},
%% see rabbitmq-server#1593
{channel_max, 2047},
{session_max, 64},
{session_max_per_connection, 64},
{link_max_per_session, 256},
{ranch_connection_max, infinity},
{heartbeat, 60},
{msg_store_file_size_limit, 16777216},
Expand Down
3 changes: 2 additions & 1 deletion deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ define PROJECT_ENV
{frame_max, 131072},
%% see rabbitmq-server#1593
{channel_max, 2047},
{session_max, 64},
{session_max_per_connection, 64},
{link_max_per_session, 256},
{ranch_connection_max, infinity},
{heartbeat, 60},
{msg_store_file_size_limit, 16777216},
Expand Down
28 changes: 20 additions & 8 deletions deps/rabbit/priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -951,9 +951,16 @@ end}.
%% Sets the maximum number of AMQP 1.0 sessions that can be simultaneously
%% active on an AMQP 1.0 connection.
%%
%% {session_max, 1},
{mapping, "session_max", "rabbit.session_max",
[{datatype, integer}, {validators, ["positive_16_bit_integer"]}]}.
%% {session_max_per_connection, 1},
{mapping, "session_max_per_connection", "rabbit.session_max_per_connection",
[{datatype, integer}, {validators, ["positive_16_bit_unsigned_integer"]}]}.

%% Sets the maximum number of AMQP 1.0 links that can be simultaneously
%% active on an AMQP 1.0 session.
%%
%% {link_max_per_session, 10},
{mapping, "link_max_per_session", "rabbit.link_max_per_session",
[{datatype, integer}, {validators, ["positive_32_bit_unsigned_integer"]}]}.

%% Set the max permissible number of client connections per node.
%% `infinity` means "no limit".
Expand Down Expand Up @@ -2436,7 +2443,7 @@ end}.

{mapping, "raft.segment_max_entries", "ra.segment_max_entries", [
{datatype, integer},
{validators, ["non_zero_positive_integer", "positive_16_bit_integer"]}
{validators, ["non_zero_positive_integer", "positive_16_bit_unsigned_integer"]}
]}.

{translation, "ra.segment_max_entries",
Expand Down Expand Up @@ -2743,10 +2750,15 @@ fun(Int) when is_integer(Int) ->
Int >= 1
end}.

{validator, "positive_16_bit_integer", "number should be between 1 and 65535",
fun(Int) when is_integer(Int) ->
(Int >= 1) and (Int =< 65535)
end}.
{validator, "positive_16_bit_unsigned_integer", "number should be between 1 and 65535",
fun(Int) when is_integer(Int) ->
(Int >= 1) and (Int =< 16#ff_ff)
end}.

{validator, "positive_32_bit_unsigned_integer", "number should be between 1 and 4294967295",
fun(Int) when is_integer(Int) ->
(Int >= 1) and (Int =< 16#ff_ff_ff_ff)
end}.

{validator, "valid_regex", "string must be a valid regular expression",
fun("") -> false;
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_amqp_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ handle_connection_frame(
SendTimeoutSec, SendFun,
ReceiveTimeoutSec, ReceiveFun),
{ok, IncomingMaxFrameSize} = application:get_env(rabbit, frame_max),
{ok, SessionMax} = application:get_env(rabbit, session_max),
{ok, SessionMax} = application:get_env(rabbit, session_max_per_connection),
%% "The channel-max value is the highest channel number that can be used on the connection.
%% This value plus one is the maximum number of sessions that can be simultaneously active
%% on the connection." [2.7.1]
Expand Down
575 changes: 296 additions & 279 deletions deps/rabbit/src/rabbit_amqp_session.erl

Large diffs are not rendered by default.

63 changes: 45 additions & 18 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ groups() ->
incoming_window_closed_rabbitmq_internal_flow_quorum_queue,
tcp_back_pressure_rabbitmq_internal_flow_classic_queue,
tcp_back_pressure_rabbitmq_internal_flow_quorum_queue,
session_max
session_max_per_connection,
link_max_per_session
]},

{cluster_size_3, [shuffle],
Expand Down Expand Up @@ -3350,17 +3351,7 @@ async_notify(SenderSettleMode, QType, Config) ->
flush(settled),
ok = detach_link_sync(Sender),

case QType of
<<"stream">> ->
%% If it is a stream we need to wait until there is a local member
%% on the node we want to subscibe from before proceeding.
rabbit_ct_helpers:await_condition(
fun() -> rpc(Config, 0, ?MODULE, has_local_member,
[rabbit_misc:r(<<"/">>, queue, QName)])
end, 30_000);
_ ->
ok
end,
ok = wait_for_local_member(QType, QName, Config),
Filter = consume_from_first(QType),
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session, <<"test-receiver">>, Address,
Expand Down Expand Up @@ -3638,10 +3629,7 @@ leader_transfer_credit(QName, QType, Credit, Config) ->
ok = wait_for_accepts(NumMsgs),
ok = detach_link_sync(Sender),

%% Wait a bit to avoid the following error when attaching:
%% "stream queue <name> does not have a running replica on the local node"
timer:sleep(50),

ok = wait_for_local_member(QType, QName, Config),
Filter = consume_from_first(QType),
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session0, <<"receiver">>, Address,
Expand Down Expand Up @@ -5666,15 +5654,18 @@ tcp_back_pressure_rabbitmq_internal_flow(QType, Config) ->
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).

session_max(Config) ->
session_max_per_connection(Config) ->
App = rabbit,
Par = session_max,
Par = session_max_per_connection,
{ok, Default} = rpc(Config, application, get_env, [App, Par]),
%% Let's allow only 1 session per connection.
ok = rpc(Config, application, set_env, [App, Par, 1]),

OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
receive {amqp10_event, {connection, Connection, opened}} -> ok
after 5000 -> ct:fail(opened_timeout)
end,
%% The 1st session should succeed.
{ok, _Session1} = amqp10_client:begin_session_sync(Connection),
%% The 2nd session should fail.
Expand All @@ -5688,6 +5679,32 @@ session_max(Config) ->

ok = rpc(Config, application, set_env, [App, Par, Default]).

link_max_per_session(Config) ->
App = rabbit,
Par = link_max_per_session,
{ok, Default} = rpc(Config, application, get_env, [App, Par]),
%% Let's allow only 1 link per session.
ok = rpc(Config, application, set_env, [App, Par, 1]),

OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
receive {amqp10_event, {connection, Connection, opened}} -> ok
after 5000 -> ct:fail(opened_timeout)
end,
{ok, Session} = amqp10_client:begin_session_sync(Connection),
Address1 = rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"k1">>),
Address2 = rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"k2">>),
%% The 1st link should succeed.
{ok, Link1} = amqp10_client:attach_sender_link_sync(Session, <<"link-1">>, Address1),
ok = wait_for_credit(Link1),
%% Since the 2nd link should fail, we expect our session process to die.
?assert(is_process_alive(Session)),
{ok, _Link2} = amqp10_client:attach_sender_link(Session, <<"link-2">>, Address2),
eventually(?_assertNot(is_process_alive(Session))),

flush(test_succeeded),
ok = rpc(Config, application, set_env, [App, Par, Default]).

%% internal
%%

Expand Down Expand Up @@ -5985,6 +6002,16 @@ ready_messages(QName, Config)
ra_name(Q) ->
binary_to_atom(<<"%2F_", Q/binary>>).

wait_for_local_member(<<"stream">>, QName, Config) ->
%% If it is a stream we need to wait until there is a local member
%% on the node we want to subscribe from before proceeding.
rabbit_ct_helpers:await_condition(
fun() -> rpc(Config, 0, ?MODULE, has_local_member,
[rabbit_misc:r(<<"/">>, queue, QName)])
end, 30_000);
wait_for_local_member(_, _, _) ->
ok.

has_local_member(QName) ->
case rabbit_amqqueue:lookup(QName) of
{ok, Q} ->
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/test/amqp_credit_api_v2_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,15 @@ credit_api_v2(Config) ->
ok = amqp10_client:detach_link(QQSender),

%% Consume with credit API v1
CQAttachArgs = #{handle => 300,
CQAttachArgs = #{handle => 100,
name => <<"cq receiver 1">>,
role => {receiver, #{address => CQAddr,
durable => configuration}, self()},
snd_settle_mode => unsettled,
rcv_settle_mode => first,
filter => #{}},
{ok, CQReceiver1} = amqp10_client:attach_link(Session, CQAttachArgs),
QQAttachArgs = #{handle => 400,
QQAttachArgs = #{handle => 200,
name => <<"qq receiver 1">>,
role => {receiver, #{address => QQAddr,
durable => configuration}, self()},
Expand Down
20 changes: 14 additions & 6 deletions deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
Original file line number Diff line number Diff line change
Expand Up @@ -429,13 +429,21 @@ tcp_listen_options.exit_on_close = false",
"channel_max_per_node = infinity",
[{rabbit,[{channel_max_per_node, infinity}]}],
[]},
{session_max_1,
"session_max = 1",
[{rabbit,[{session_max, 1}]}],
{session_max_per_connection_1,
"session_max_per_connection = 1",
[{rabbit,[{session_max_per_connection, 1}]}],
[]},
{session_max,
"session_max = 65000",
[{rabbit,[{session_max, 65000}]}],
{session_max_per_connection,
"session_max_per_connection = 65000",
[{rabbit,[{session_max_per_connection, 65_000}]}],
[]},
{link_max_per_session_1,
"link_max_per_session = 1",
[{rabbit,[{link_max_per_session, 1}]}],
[]},
{link_max_per_session,
"link_max_per_session = 4200000000",
[{rabbit,[{link_max_per_session, 4_200_000_000}]}],
[]},
{consumer_max_per_channel,
"consumer_max_per_channel = 16",
Expand Down

0 comments on commit b2db635

Please sign in to comment.