diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index c26f7175555c..ab57bb647b79 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -43,7 +43,8 @@ _APP_ENV = """[ {frame_max, 131072}, %% see rabbitmq-server#1593 {channel_max, 2047}, - {session_max, 64}, + {session_max_per_connection, 64}, + {link_max_per_session, 256}, {ranch_connection_max, infinity}, {heartbeat, 60}, {msg_store_file_size_limit, 16777216}, diff --git a/deps/rabbit/Makefile b/deps/rabbit/Makefile index 83a8601e5898..7130636dda8a 100644 --- a/deps/rabbit/Makefile +++ b/deps/rabbit/Makefile @@ -23,7 +23,8 @@ define PROJECT_ENV {frame_max, 131072}, %% see rabbitmq-server#1593 {channel_max, 2047}, - {session_max, 64}, + {session_max_per_connection, 64}, + {link_max_per_session, 256}, {ranch_connection_max, infinity}, {heartbeat, 60}, {msg_store_file_size_limit, 16777216}, diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index c94dc7a629b7..e1dfbe5b4c71 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -951,9 +951,16 @@ end}. %% Sets the maximum number of AMQP 1.0 sessions that can be simultaneously %% active on an AMQP 1.0 connection. %% -%% {session_max, 1}, -{mapping, "session_max", "rabbit.session_max", - [{datatype, integer}, {validators, ["positive_16_bit_integer"]}]}. +%% {session_max_per_connection, 1}, +{mapping, "session_max_per_connection", "rabbit.session_max_per_connection", + [{datatype, integer}, {validators, ["positive_16_bit_unsigned_integer"]}]}. + +%% Sets the maximum number of AMQP 1.0 links that can be simultaneously +%% active on an AMQP 1.0 session. +%% +%% {link_max_per_session, 10}, +{mapping, "link_max_per_session", "rabbit.link_max_per_session", + [{datatype, integer}, {validators, ["positive_32_bit_unsigned_integer"]}]}. %% Set the max permissible number of client connections per node. %% `infinity` means "no limit". @@ -2436,7 +2443,7 @@ end}. {mapping, "raft.segment_max_entries", "ra.segment_max_entries", [ {datatype, integer}, - {validators, ["non_zero_positive_integer", "positive_16_bit_integer"]} + {validators, ["non_zero_positive_integer", "positive_16_bit_unsigned_integer"]} ]}. {translation, "ra.segment_max_entries", @@ -2743,10 +2750,15 @@ fun(Int) when is_integer(Int) -> Int >= 1 end}. -{validator, "positive_16_bit_integer", "number should be between 1 and 65535", -fun(Int) when is_integer(Int) -> - (Int >= 1) and (Int =< 65535) -end}. +{validator, "positive_16_bit_unsigned_integer", "number should be between 1 and 65535", + fun(Int) when is_integer(Int) -> + (Int >= 1) and (Int =< 16#ff_ff) + end}. + +{validator, "positive_32_bit_unsigned_integer", "number should be between 1 and 4294967295", + fun(Int) when is_integer(Int) -> + (Int >= 1) and (Int =< 16#ff_ff_ff_ff) + end}. {validator, "valid_regex", "string must be a valid regular expression", fun("") -> false; diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index 2507f2ec1690..52e2ba2e8f9c 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -476,7 +476,7 @@ handle_connection_frame( SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun), {ok, IncomingMaxFrameSize} = application:get_env(rabbit, frame_max), - {ok, SessionMax} = application:get_env(rabbit, session_max), + {ok, SessionMax} = application:get_env(rabbit, session_max_per_connection), %% "The channel-max value is the highest channel number that can be used on the connection. %% This value plus one is the maximum number of sessions that can be simultaneously active %% on the connection." [2.7.1] diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 16f69733d68e..7ec8161ce7c8 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -67,7 +67,6 @@ %% sequence number initialized at an arbitrary point by the sender." [2.6.7] -define(INITIAL_DELIVERY_COUNT, ?UINT_MAX - 4). -define(INITIAL_OUTGOING_DELIVERY_ID, 0). --define(DEFAULT_MAX_HANDLE, ?UINT_MAX). -define(UINT(N), {uint, N}). %% [3.4] -define(OUTCOMES, [?V_1_0_SYMBOL_ACCEPTED, @@ -261,6 +260,7 @@ resource_alarms :: sets:set(rabbit_alarm:resource_alarm_source()), trace_state :: rabbit_trace:state(), conn_name :: binary(), + max_handle :: link_handle(), max_incoming_window :: pos_integer(), max_link_credit :: pos_integer(), max_queue_credit :: pos_integer() @@ -382,7 +382,7 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName, next_outgoing_id = ?UINT(RemoteNextOutgoingId), incoming_window = ?UINT(RemoteIncomingWindow), outgoing_window = ?UINT(RemoteOutgoingWindow), - handle_max = HandleMax0}}) -> + handle_max = ClientHandleMax}}) -> process_flag(trap_exit, true), process_flag(message_queue_data, off_heap), @@ -390,6 +390,19 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName, Alarms0 = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), Alarms = sets:from_list(Alarms0, [{version, 2}]), + {ok, LinkMax} = application:get_env(rabbit, link_max_per_session), + %% "The handle-max value is the highest handle value that can be used on the session." [2.7.2] + %% The lowest handle is 0. + HandleMax = LinkMax - 1, + %% Assert config is valid. + true = HandleMax >= 0 andalso HandleMax =< ?UINT_MAX, + EffectiveHandleMax = case ClientHandleMax of + undefined -> + HandleMax; + ?UINT(N) -> + min(N, HandleMax) + end, + MaxLinkCredit = application:get_env( rabbit, max_link_credit, ?DEFAULT_MAX_LINK_CREDIT), MaxQueueCredit = application:get_env( @@ -405,18 +418,14 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName, end, NextOutgoingId = ?INITIAL_OUTGOING_TRANSFER_ID, - HandleMax = case HandleMax0 of - ?UINT(Max) -> Max; - _ -> ?DEFAULT_MAX_HANDLE - end, Reply = #'v1_0.begin'{ %% "When an endpoint responds to a remotely initiated session, the remote-channel %% MUST be set to the channel on which the remote session sent the begin." [2.7.2] remote_channel = {ushort, ChannelNum}, - handle_max = ?UINT(HandleMax), next_outgoing_id = ?UINT(NextOutgoingId), incoming_window = ?UINT(IncomingWindow), - outgoing_window = ?UINT_OUTGOING_WINDOW}, + outgoing_window = ?UINT_OUTGOING_WINDOW, + handle_max = ?UINT(EffectiveHandleMax)}, rabbit_amqp_writer:send_command(WriterPid, ChannelNum, Reply), {ok, #state{next_incoming_id = RemoteNextOutgoingId, @@ -434,6 +443,7 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName, resource_alarms = Alarms, trace_state = rabbit_trace:init(Vhost), conn_name = ConnName, + max_handle = EffectiveHandleMax, max_incoming_window = MaxIncomingWindow, max_link_credit = MaxLinkCredit, max_queue_credit = MaxQueueCredit @@ -489,7 +499,7 @@ handle_info({{'DOWN', QName}, _MRef, process, QPid, Reason}, handle_cast({frame_body, FrameBody}, #state{cfg = #cfg{writer_pid = WriterPid, channel_num = Ch}} = State0) -> - try handle_control(FrameBody, State0) of + try handle_frame(FrameBody, State0) of {reply, Replies, State} when is_list(Replies) -> lists:foreach(fun (Reply) -> rabbit_amqp_writer:send_command(WriterPid, Ch, Reply) @@ -884,20 +894,243 @@ disposition(DeliveryState, First, Last) -> first = ?UINT(First), last = Last1}. -handle_control(#'v1_0.attach'{ - role = ?AMQP_ROLE_SENDER, - snd_settle_mode = ?V_1_0_SENDER_SETTLE_MODE_SETTLED, - name = Name = {utf8, LinkName}, - handle = Handle = ?UINT(HandleInt), - source = Source = #'v1_0.source'{address = ClientTerminusAddress}, - target = Target = #'v1_0.target'{address = {utf8, ?MANAGEMENT_NODE_ADDRESS}}, - initial_delivery_count = DeliveryCount = ?UINT(DeliveryCountInt), - properties = Properties - } = Attach, - #state{management_link_pairs = Pairs0, - incoming_management_links = Links - } = State0) -> +handle_frame({Performative = #'v1_0.transfer'{handle = ?UINT(Handle)}, Paylaod}, + State0 = #state{incoming_links = IncomingLinks}) -> + {Flows, State1} = session_flow_control_received_transfer(State0), + + {Reply, State} = + case IncomingLinks of + #{Handle := Link0} -> + case incoming_link_transfer(Performative, Paylaod, Link0, State1) of + {ok, Reply0, Link, State2} -> + {Reply0, State2#state{incoming_links = IncomingLinks#{Handle := Link}}}; + {error, Reply0} -> + %% "When an error occurs at a link endpoint, the endpoint MUST be detached + %% with appropriate error information supplied in the error field of the + %% detach frame. The link endpoint MUST then be destroyed." [2.6.5] + {Reply0, State1#state{incoming_links = maps:remove(Handle, IncomingLinks)}} + end; + _ -> + incoming_mgmt_link_transfer(Performative, Paylaod, State1) + end, + reply0(Reply ++ Flows, State); + +%% Although the AMQP message format [3.2] requires a body, it is valid to send a transfer frame without payload. +%% For example, when a large multi transfer message is streamed using the ProtonJ2 client, the client could send +%% a final #'v1_0.transfer'{more=false} frame without a payload. +handle_frame(Performative = #'v1_0.transfer'{}, State) -> + handle_frame({Performative, <<>>}, State); + +%% Flow control. These frames come with two pieces of information: +%% the session window, and optionally, credit for a particular link. +%% We'll deal with each of them separately. +handle_frame(#'v1_0.flow'{handle = Handle} = Flow, + #state{incoming_links = IncomingLinks, + outgoing_links = OutgoingLinks, + incoming_management_links = IncomingMgmtLinks, + outgoing_management_links = OutgoingMgmtLinks + } = State0) -> + State = session_flow_control_received_flow(Flow, State0), + S = case Handle of + undefined -> + %% "If not set, the flow frame is carrying only information + %% pertaining to the session endpoint." [2.7.4] + State; + ?UINT(HandleInt) -> + %% "If set, indicates that the flow frame carries flow state information + %% for the local link endpoint associated with the given handle." [2.7.4] + case OutgoingLinks of + #{HandleInt := OutgoingLink} -> + handle_outgoing_link_flow_control(OutgoingLink, Flow, State); + _ -> + case OutgoingMgmtLinks of + #{HandleInt := OutgoingMgmtLink} -> + handle_outgoing_mgmt_link_flow_control(OutgoingMgmtLink, Flow, State); + _ when is_map_key(HandleInt, IncomingLinks) orelse + is_map_key(HandleInt, IncomingMgmtLinks) -> + %% We're being told about available messages at the sender. + State; + _ -> + %% "If set to a handle that is not currently associated with + %% an attached link, the recipient MUST respond by ending the + %% session with an unattached-handle session error." [2.7.4] + rabbit_log:warning( + "Received Flow frame for unknown link handle: ~tp", [Flow]), + protocol_error( + ?V_1_0_SESSION_ERROR_UNATTACHED_HANDLE, + "Unattached link handle: ~b", [HandleInt]) + end + end + end, + {noreply, S}; + +handle_frame(#'v1_0.disposition'{role = ?AMQP_ROLE_RECEIVER, + first = ?UINT(First), + last = Last0, + state = Outcome, + settled = DispositionSettled} = Disposition, + #state{outgoing_unsettled_map = UnsettledMap0, + queue_states = QStates0} = State0) -> + Last = case Last0 of + ?UINT(L) -> + L; + undefined -> + %% "If not set, this is taken to be the same as first." [2.7.6] + First + end, + UnsettledMapSize = map_size(UnsettledMap0), + case UnsettledMapSize of + 0 -> + {noreply, State0}; + _ -> + DispositionRangeSize = diff(Last, First) + 1, + {Settled, UnsettledMap} = + case DispositionRangeSize =< UnsettledMapSize of + true -> + %% It is cheaper to iterate over the range of settled delivery IDs. + serial_number:foldl(fun settle_delivery_id/2, + {#{}, UnsettledMap0}, + First, Last); + false -> + %% It is cheaper to iterate over the outgoing unsettled map. + Iter = maps:iterator(UnsettledMap0, + fun(D1, D2) -> compare(D1, D2) =/= greater end), + {Settled0, UnsettledList} = + maps:fold( + fun (DeliveryId, + #outgoing_unsettled{queue_name = QName, + consumer_tag = Ctag, + msg_id = MsgId} = Unsettled, + {SettledAcc, UnsettledAcc}) -> + case serial_number:in_range(DeliveryId, First, Last) of + true -> + SettledAcc1 = maps_update_with( + {QName, Ctag}, + fun(MsgIds) -> [MsgId | MsgIds] end, + [MsgId], + SettledAcc), + {SettledAcc1, UnsettledAcc}; + false -> + {SettledAcc, [{DeliveryId, Unsettled} | UnsettledAcc]} + end + end, + {#{}, []}, Iter), + {Settled0, maps:from_list(UnsettledList)} + end, + + SettleOp = settle_op_from_outcome(Outcome), + {QStates, Actions} = + maps:fold( + fun({QName, Ctag}, MsgIdsRev, {QS0, ActionsAcc}) -> + MsgIds = lists:reverse(MsgIdsRev), + case rabbit_queue_type:settle(QName, SettleOp, Ctag, MsgIds, QS0) of + {ok, QS, Actions0} -> + messages_acknowledged(SettleOp, QName, QS, MsgIds), + {QS, ActionsAcc ++ Actions0}; + {protocol_error, _ErrorType, Reason, ReasonArgs} -> + protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, + Reason, ReasonArgs) + end + end, {QStates0, []}, Settled), + + State1 = State0#state{outgoing_unsettled_map = UnsettledMap, + queue_states = QStates}, + Reply = case DispositionSettled of + true -> []; + false -> [Disposition#'v1_0.disposition'{settled = true, + role = ?AMQP_ROLE_SENDER}] + end, + State = handle_queue_actions(Actions, State1), + reply0(Reply, State) + end; + +handle_frame(#'v1_0.attach'{handle = ?UINT(Handle)} = Attach, + #state{cfg = #cfg{max_handle = MaxHandle}} = State) -> ok = validate_attach(Attach), + case Handle > MaxHandle of + true -> + protocol_error(?V_1_0_CONNECTION_ERROR_FRAMING_ERROR, + "link handle value (~b) exceeds maximum link handle value (~b)", + [Handle, MaxHandle]); + false -> + handle_attach(Attach, State) + end; + +handle_frame(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)}, + State0 = #state{incoming_links = IncomingLinks, + outgoing_links = OutgoingLinks0, + outgoing_unsettled_map = Unsettled0, + outgoing_pending = Pending0, + queue_states = QStates0, + cfg = #cfg{user = #user{username = Username}}}) -> + {OutgoingLinks, Unsettled, Pending, QStates} = + case maps:take(HandleInt, OutgoingLinks0) of + {#outgoing_link{queue_name = QName}, OutgoingLinks1} -> + Ctag = handle_to_ctag(HandleInt), + {Unsettled1, Pending1} = remove_outgoing_link(Ctag, Unsettled0, Pending0), + case rabbit_amqqueue:lookup(QName) of + {ok, Q} -> + Spec = #{consumer_tag => Ctag, + reason => remove, + user => Username}, + case rabbit_queue_type:cancel(Q, Spec, QStates0) of + {ok, QStates1} -> + {OutgoingLinks1, Unsettled1, Pending1, QStates1}; + {error, Reason} -> + protocol_error( + ?V_1_0_AMQP_ERROR_INTERNAL_ERROR, + "Failed to remove consumer from ~s: ~tp", + [rabbit_misc:rs(amqqueue:get_name(Q)), Reason]) + end; + {error, not_found} -> + {OutgoingLinks1, Unsettled1, Pending1, QStates0} + end; + error -> + {OutgoingLinks0, Unsettled0, Pending0, QStates0} + end, + + State1 = State0#state{incoming_links = maps:remove(HandleInt, IncomingLinks), + outgoing_links = OutgoingLinks, + outgoing_unsettled_map = Unsettled, + outgoing_pending = Pending, + queue_states = QStates}, + State = maybe_detach_mgmt_link(HandleInt, State1), + maybe_detach_reply(Detach, State, State0), + publisher_or_consumer_deleted(State, State0), + {noreply, State}; + +handle_frame(#'v1_0.end'{}, + State0 = #state{cfg = #cfg{writer_pid = WriterPid, + channel_num = Ch}}) -> + State = send_delivery_state_changes(State0), + ok = try rabbit_amqp_writer:send_command_sync(WriterPid, Ch, #'v1_0.end'{}) + catch exit:{Reason, {gen_server, call, _ArgList}} + when Reason =:= shutdown orelse + Reason =:= noproc -> + %% AMQP connection and therefore the writer process got already terminated + %% before we had the chance to synchronously end the session. + ok + end, + {stop, normal, State}; + +handle_frame(Frame, _State) -> + protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, + "Unexpected frame ~tp", + [amqp10_framing:pprint(Frame)]). + +handle_attach(#'v1_0.attach'{ + role = ?AMQP_ROLE_SENDER, + snd_settle_mode = ?V_1_0_SENDER_SETTLE_MODE_SETTLED, + name = Name = {utf8, LinkName}, + handle = Handle = ?UINT(HandleInt), + source = Source = #'v1_0.source'{address = ClientTerminusAddress}, + target = Target = #'v1_0.target'{address = {utf8, ?MANAGEMENT_NODE_ADDRESS}}, + initial_delivery_count = DeliveryCount = ?UINT(DeliveryCountInt), + properties = Properties + } = Attach, + #state{management_link_pairs = Pairs0, + incoming_management_links = Links + } = State0) -> ok = check_paired(Properties), Pairs = case Pairs0 of #{LinkName := #management_link_pair{ @@ -942,20 +1175,19 @@ handle_control(#'v1_0.attach'{ link_credit = ?UINT(?MAX_MANAGEMENT_LINK_CREDIT)}, reply0([Reply, Flow], State); -handle_control(#'v1_0.attach'{ - role = ?AMQP_ROLE_RECEIVER, - name = Name = {utf8, LinkName}, - handle = Handle = ?UINT(HandleInt), - source = Source = #'v1_0.source'{address = {utf8, ?MANAGEMENT_NODE_ADDRESS}}, - target = Target = #'v1_0.target'{address = ClientTerminusAddress}, - rcv_settle_mode = RcvSettleMode, - max_message_size = MaybeMaxMessageSize, - properties = Properties - } = Attach, - #state{management_link_pairs = Pairs0, - outgoing_management_links = Links - } = State0) -> - ok = validate_attach(Attach), +handle_attach(#'v1_0.attach'{ + role = ?AMQP_ROLE_RECEIVER, + name = Name = {utf8, LinkName}, + handle = Handle = ?UINT(HandleInt), + source = Source = #'v1_0.source'{address = {utf8, ?MANAGEMENT_NODE_ADDRESS}}, + target = Target = #'v1_0.target'{address = ClientTerminusAddress}, + rcv_settle_mode = RcvSettleMode, + max_message_size = MaybeMaxMessageSize, + properties = Properties + } = Attach, + #state{management_link_pairs = Pairs0, + outgoing_management_links = Links + } = State0) -> ok = check_paired(Properties), Pairs = case Pairs0 of #{LinkName := #management_link_pair{ @@ -998,20 +1230,19 @@ handle_control(#'v1_0.attach'{ properties = Properties}, reply0(Reply, State); -handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, - name = LinkName, - handle = Handle = ?UINT(HandleInt), - source = Source, - snd_settle_mode = SndSettleMode, - target = Target, - initial_delivery_count = DeliveryCount = ?UINT(DeliveryCountInt) - } = Attach, - State0 = #state{incoming_links = IncomingLinks0, - permission_cache = PermCache0, - cfg = #cfg{max_link_credit = MaxLinkCredit, - vhost = Vhost, - user = User}}) -> - ok = validate_attach(Attach), +handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, + name = LinkName, + handle = Handle = ?UINT(HandleInt), + source = Source, + snd_settle_mode = SndSettleMode, + target = Target, + initial_delivery_count = DeliveryCount = ?UINT(DeliveryCountInt) + }, + State0 = #state{incoming_links = IncomingLinks0, + permission_cache = PermCache0, + cfg = #cfg{max_link_credit = MaxLinkCredit, + vhost = Vhost, + user = User}}) -> case ensure_target(Target, Vhost, User, PermCache0) of {ok, Exchange, RoutingKey, QNameBin, PermCache} -> MaxMessageSize = persistent_term:get(max_message_size), @@ -1051,21 +1282,20 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, [Reason]) end; -handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, - name = LinkName, - handle = Handle = ?UINT(HandleInt), - source = Source, - snd_settle_mode = SndSettleMode, - rcv_settle_mode = RcvSettleMode, - max_message_size = MaybeMaxMessageSize} = Attach, - State0 = #state{queue_states = QStates0, - outgoing_links = OutgoingLinks0, - permission_cache = PermCache0, - topic_permission_cache = TopicPermCache0, - cfg = #cfg{vhost = Vhost, - user = User = #user{username = Username}, - reader_pid = ReaderPid}}) -> - ok = validate_attach(Attach), +handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, + name = LinkName, + handle = Handle = ?UINT(HandleInt), + source = Source, + snd_settle_mode = SndSettleMode, + rcv_settle_mode = RcvSettleMode, + max_message_size = MaybeMaxMessageSize} = Attach, + State0 = #state{queue_states = QStates0, + outgoing_links = OutgoingLinks0, + permission_cache = PermCache0, + topic_permission_cache = TopicPermCache0, + cfg = #cfg{vhost = Vhost, + user = User = #user{username = Username}, + reader_pid = ReaderPid}}) -> {SndSettled, EffectiveSndSettleMode} = case SndSettleMode of ?V_1_0_SENDER_SETTLE_MODE_SETTLED -> @@ -1193,220 +1423,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, "Could not operate on ~s: ~tp", [rabbit_misc:rs(QName), Reason]) end - end; - -handle_control({Performative = #'v1_0.transfer'{handle = ?UINT(Handle)}, Paylaod}, - State0 = #state{incoming_links = IncomingLinks}) -> - {Flows, State1} = session_flow_control_received_transfer(State0), - - {Reply, State} = - case IncomingLinks of - #{Handle := Link0} -> - case incoming_link_transfer(Performative, Paylaod, Link0, State1) of - {ok, Reply0, Link, State2} -> - {Reply0, State2#state{incoming_links = IncomingLinks#{Handle := Link}}}; - {error, Reply0} -> - %% "When an error occurs at a link endpoint, the endpoint MUST be detached - %% with appropriate error information supplied in the error field of the - %% detach frame. The link endpoint MUST then be destroyed." [2.6.5] - {Reply0, State1#state{incoming_links = maps:remove(Handle, IncomingLinks)}} - end; - _ -> - incoming_mgmt_link_transfer(Performative, Paylaod, State1) - end, - reply0(Reply ++ Flows, State); - - -%% Although the AMQP message format [3.2] requires a body, it is valid to send a transfer frame without payload. -%% For example, when a large multi transfer message is streamed using the ProtonJ2 client, the client could send -%% a final #'v1_0.transfer'{more=false} frame without a payload. -handle_control(Performative = #'v1_0.transfer'{}, State) -> - handle_control({Performative, <<>>}, State); - -%% Flow control. These frames come with two pieces of information: -%% the session window, and optionally, credit for a particular link. -%% We'll deal with each of them separately. -handle_control(#'v1_0.flow'{handle = Handle} = Flow, - #state{incoming_links = IncomingLinks, - outgoing_links = OutgoingLinks, - incoming_management_links = IncomingMgmtLinks, - outgoing_management_links = OutgoingMgmtLinks - } = State0) -> - State = session_flow_control_received_flow(Flow, State0), - S = case Handle of - undefined -> - %% "If not set, the flow frame is carrying only information - %% pertaining to the session endpoint." [2.7.4] - State; - ?UINT(HandleInt) -> - %% "If set, indicates that the flow frame carries flow state information - %% for the local link endpoint associated with the given handle." [2.7.4] - case OutgoingLinks of - #{HandleInt := OutgoingLink} -> - handle_outgoing_link_flow_control(OutgoingLink, Flow, State); - _ -> - case OutgoingMgmtLinks of - #{HandleInt := OutgoingMgmtLink} -> - handle_outgoing_mgmt_link_flow_control(OutgoingMgmtLink, Flow, State); - _ when is_map_key(HandleInt, IncomingLinks) orelse - is_map_key(HandleInt, IncomingMgmtLinks) -> - %% We're being told about available messages at the sender. - State; - _ -> - %% "If set to a handle that is not currently associated with - %% an attached link, the recipient MUST respond by ending the - %% session with an unattached-handle session error." [2.7.4] - rabbit_log:warning( - "Received Flow frame for unknown link handle: ~tp", [Flow]), - protocol_error( - ?V_1_0_SESSION_ERROR_UNATTACHED_HANDLE, - "Unattached link handle: ~b", [HandleInt]) - end - end - end, - {noreply, S}; - -handle_control(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)}, - State0 = #state{incoming_links = IncomingLinks, - outgoing_links = OutgoingLinks0, - outgoing_unsettled_map = Unsettled0, - outgoing_pending = Pending0, - queue_states = QStates0, - cfg = #cfg{user = #user{username = Username}}}) -> - {OutgoingLinks, Unsettled, Pending, QStates} = - case maps:take(HandleInt, OutgoingLinks0) of - {#outgoing_link{queue_name = QName}, OutgoingLinks1} -> - Ctag = handle_to_ctag(HandleInt), - {Unsettled1, Pending1} = remove_outgoing_link(Ctag, Unsettled0, Pending0), - case rabbit_amqqueue:lookup(QName) of - {ok, Q} -> - Spec = #{consumer_tag => Ctag, - reason => remove, - user => Username}, - case rabbit_queue_type:cancel(Q, Spec, QStates0) of - {ok, QStates1} -> - {OutgoingLinks1, Unsettled1, Pending1, QStates1}; - {error, Reason} -> - protocol_error( - ?V_1_0_AMQP_ERROR_INTERNAL_ERROR, - "Failed to remove consumer from ~s: ~tp", - [rabbit_misc:rs(amqqueue:get_name(Q)), Reason]) - end; - {error, not_found} -> - {OutgoingLinks1, Unsettled1, Pending1, QStates0} - end; - error -> - {OutgoingLinks0, Unsettled0, Pending0, QStates0} - end, - - State1 = State0#state{incoming_links = maps:remove(HandleInt, IncomingLinks), - outgoing_links = OutgoingLinks, - outgoing_unsettled_map = Unsettled, - outgoing_pending = Pending, - queue_states = QStates}, - State = maybe_detach_mgmt_link(HandleInt, State1), - maybe_detach_reply(Detach, State, State0), - publisher_or_consumer_deleted(State, State0), - {noreply, State}; - -handle_control(#'v1_0.end'{}, - State0 = #state{cfg = #cfg{writer_pid = WriterPid, - channel_num = Ch}}) -> - State = send_delivery_state_changes(State0), - ok = try rabbit_amqp_writer:send_command_sync(WriterPid, Ch, #'v1_0.end'{}) - catch exit:{Reason, {gen_server, call, _ArgList}} - when Reason =:= shutdown orelse - Reason =:= noproc -> - %% AMQP connection and therefore the writer process got already terminated - %% before we had the chance to synchronously end the session. - ok - end, - {stop, normal, State}; - -handle_control(#'v1_0.disposition'{role = ?AMQP_ROLE_RECEIVER, - first = ?UINT(First), - last = Last0, - state = Outcome, - settled = DispositionSettled} = Disposition, - #state{outgoing_unsettled_map = UnsettledMap0, - queue_states = QStates0} = State0) -> - Last = case Last0 of - ?UINT(L) -> - L; - undefined -> - %% "If not set, this is taken to be the same as first." [2.7.6] - First - end, - UnsettledMapSize = map_size(UnsettledMap0), - case UnsettledMapSize of - 0 -> - {noreply, State0}; - _ -> - DispositionRangeSize = diff(Last, First) + 1, - {Settled, UnsettledMap} = - case DispositionRangeSize =< UnsettledMapSize of - true -> - %% It is cheaper to iterate over the range of settled delivery IDs. - serial_number:foldl(fun settle_delivery_id/2, - {#{}, UnsettledMap0}, - First, Last); - false -> - %% It is cheaper to iterate over the outgoing unsettled map. - Iter = maps:iterator(UnsettledMap0, - fun(D1, D2) -> compare(D1, D2) =/= greater end), - {Settled0, UnsettledList} = - maps:fold( - fun (DeliveryId, - #outgoing_unsettled{queue_name = QName, - consumer_tag = Ctag, - msg_id = MsgId} = Unsettled, - {SettledAcc, UnsettledAcc}) -> - case serial_number:in_range(DeliveryId, First, Last) of - true -> - SettledAcc1 = maps_update_with( - {QName, Ctag}, - fun(MsgIds) -> [MsgId | MsgIds] end, - [MsgId], - SettledAcc), - {SettledAcc1, UnsettledAcc}; - false -> - {SettledAcc, [{DeliveryId, Unsettled} | UnsettledAcc]} - end - end, - {#{}, []}, Iter), - {Settled0, maps:from_list(UnsettledList)} - end, - - SettleOp = settle_op_from_outcome(Outcome), - {QStates, Actions} = - maps:fold( - fun({QName, Ctag}, MsgIdsRev, {QS0, ActionsAcc}) -> - MsgIds = lists:reverse(MsgIdsRev), - case rabbit_queue_type:settle(QName, SettleOp, Ctag, MsgIds, QS0) of - {ok, QS, Actions0} -> - messages_acknowledged(SettleOp, QName, QS, MsgIds), - {QS, ActionsAcc ++ Actions0}; - {protocol_error, _ErrorType, Reason, ReasonArgs} -> - protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, - Reason, ReasonArgs) - end - end, {QStates0, []}, Settled), - - State1 = State0#state{outgoing_unsettled_map = UnsettledMap, - queue_states = QStates}, - Reply = case DispositionSettled of - true -> []; - false -> [Disposition#'v1_0.disposition'{settled = true, - role = ?AMQP_ROLE_SENDER}] - end, - State = handle_queue_actions(Actions, State1), - reply0(Reply, State) - end; - -handle_control(Frame, _State) -> - protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, - "Unexpected frame ~tp", - [amqp10_framing:pprint(Frame)]). + end. send_pending(#state{remote_incoming_window = RemoteIncomingWindow, outgoing_pending = Buf0 diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 62b2e6d1fb05..7b1d518307b3 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -141,7 +141,8 @@ groups() -> incoming_window_closed_rabbitmq_internal_flow_quorum_queue, tcp_back_pressure_rabbitmq_internal_flow_classic_queue, tcp_back_pressure_rabbitmq_internal_flow_quorum_queue, - session_max + session_max_per_connection, + link_max_per_session ]}, {cluster_size_3, [shuffle], @@ -3350,17 +3351,7 @@ async_notify(SenderSettleMode, QType, Config) -> flush(settled), ok = detach_link_sync(Sender), - case QType of - <<"stream">> -> - %% If it is a stream we need to wait until there is a local member - %% on the node we want to subscibe from before proceeding. - rabbit_ct_helpers:await_condition( - fun() -> rpc(Config, 0, ?MODULE, has_local_member, - [rabbit_misc:r(<<"/">>, queue, QName)]) - end, 30_000); - _ -> - ok - end, + ok = wait_for_local_member(QType, QName, Config), Filter = consume_from_first(QType), {ok, Receiver} = amqp10_client:attach_receiver_link( Session, <<"test-receiver">>, Address, @@ -3638,10 +3629,7 @@ leader_transfer_credit(QName, QType, Credit, Config) -> ok = wait_for_accepts(NumMsgs), ok = detach_link_sync(Sender), - %% Wait a bit to avoid the following error when attaching: - %% "stream queue does not have a running replica on the local node" - timer:sleep(50), - + ok = wait_for_local_member(QType, QName, Config), Filter = consume_from_first(QType), {ok, Receiver} = amqp10_client:attach_receiver_link( Session0, <<"receiver">>, Address, @@ -5666,15 +5654,18 @@ tcp_back_pressure_rabbitmq_internal_flow(QType, Config) -> ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection). -session_max(Config) -> +session_max_per_connection(Config) -> App = rabbit, - Par = session_max, + Par = session_max_per_connection, {ok, Default} = rpc(Config, application, get_env, [App, Par]), %% Let's allow only 1 session per connection. ok = rpc(Config, application, set_env, [App, Par, 1]), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), + receive {amqp10_event, {connection, Connection, opened}} -> ok + after 5000 -> ct:fail(opened_timeout) + end, %% The 1st session should succeed. {ok, _Session1} = amqp10_client:begin_session_sync(Connection), %% The 2nd session should fail. @@ -5688,6 +5679,32 @@ session_max(Config) -> ok = rpc(Config, application, set_env, [App, Par, Default]). +link_max_per_session(Config) -> + App = rabbit, + Par = link_max_per_session, + {ok, Default} = rpc(Config, application, get_env, [App, Par]), + %% Let's allow only 1 link per session. + ok = rpc(Config, application, set_env, [App, Par, 1]), + + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + receive {amqp10_event, {connection, Connection, opened}} -> ok + after 5000 -> ct:fail(opened_timeout) + end, + {ok, Session} = amqp10_client:begin_session_sync(Connection), + Address1 = rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"k1">>), + Address2 = rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"k2">>), + %% The 1st link should succeed. + {ok, Link1} = amqp10_client:attach_sender_link_sync(Session, <<"link-1">>, Address1), + ok = wait_for_credit(Link1), + %% Since the 2nd link should fail, we expect our session process to die. + ?assert(is_process_alive(Session)), + {ok, _Link2} = amqp10_client:attach_sender_link(Session, <<"link-2">>, Address2), + eventually(?_assertNot(is_process_alive(Session))), + + flush(test_succeeded), + ok = rpc(Config, application, set_env, [App, Par, Default]). + %% internal %% @@ -5985,6 +6002,16 @@ ready_messages(QName, Config) ra_name(Q) -> binary_to_atom(<<"%2F_", Q/binary>>). +wait_for_local_member(<<"stream">>, QName, Config) -> + %% If it is a stream we need to wait until there is a local member + %% on the node we want to subscribe from before proceeding. + rabbit_ct_helpers:await_condition( + fun() -> rpc(Config, 0, ?MODULE, has_local_member, + [rabbit_misc:r(<<"/">>, queue, QName)]) + end, 30_000); +wait_for_local_member(_, _, _) -> + ok. + has_local_member(QName) -> case rabbit_amqqueue:lookup(QName) of {ok, Q} -> diff --git a/deps/rabbit/test/amqp_credit_api_v2_SUITE.erl b/deps/rabbit/test/amqp_credit_api_v2_SUITE.erl index 76a12873e715..ba465e396fa3 100644 --- a/deps/rabbit/test/amqp_credit_api_v2_SUITE.erl +++ b/deps/rabbit/test/amqp_credit_api_v2_SUITE.erl @@ -97,7 +97,7 @@ credit_api_v2(Config) -> ok = amqp10_client:detach_link(QQSender), %% Consume with credit API v1 - CQAttachArgs = #{handle => 300, + CQAttachArgs = #{handle => 100, name => <<"cq receiver 1">>, role => {receiver, #{address => CQAddr, durable => configuration}, self()}, @@ -105,7 +105,7 @@ credit_api_v2(Config) -> rcv_settle_mode => first, filter => #{}}, {ok, CQReceiver1} = amqp10_client:attach_link(Session, CQAttachArgs), - QQAttachArgs = #{handle => 400, + QQAttachArgs = #{handle => 200, name => <<"qq receiver 1">>, role => {receiver, #{address => QQAddr, durable => configuration}, self()}, diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index a67bec3788b7..ec706686466b 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -429,13 +429,21 @@ tcp_listen_options.exit_on_close = false", "channel_max_per_node = infinity", [{rabbit,[{channel_max_per_node, infinity}]}], []}, - {session_max_1, - "session_max = 1", - [{rabbit,[{session_max, 1}]}], + {session_max_per_connection_1, + "session_max_per_connection = 1", + [{rabbit,[{session_max_per_connection, 1}]}], []}, - {session_max, - "session_max = 65000", - [{rabbit,[{session_max, 65000}]}], + {session_max_per_connection, + "session_max_per_connection = 65000", + [{rabbit,[{session_max_per_connection, 65_000}]}], + []}, + {link_max_per_session_1, + "link_max_per_session = 1", + [{rabbit,[{link_max_per_session, 1}]}], + []}, + {link_max_per_session, + "link_max_per_session = 4200000000", + [{rabbit,[{link_max_per_session, 4_200_000_000}]}], []}, {consumer_max_per_channel, "consumer_max_per_channel = 16",