Skip to content

Commit

Permalink
Simplify session reply frames
Browse files Browse the repository at this point in the history
This commit is only refactoring.

To avoid confusion with reply and noreply gen_server return values, this
commit uses different return values for handle_frame/2.
  • Loading branch information
ansd committed Sep 9, 2024
1 parent 61f53e2 commit 7baff37
Showing 1 changed file with 29 additions and 40 deletions.
69 changes: 29 additions & 40 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -500,15 +500,10 @@ handle_cast({frame_body, FrameBody},
#state{cfg = #cfg{writer_pid = WriterPid,
channel_num = Ch}} = State0) ->
try handle_frame(FrameBody, State0) of
{reply, Replies, State} when is_list(Replies) ->
lists:foreach(fun (Reply) ->
rabbit_amqp_writer:send_command(WriterPid, Ch, Reply)
end, Replies),
noreply(State);
{reply, Reply, State} ->
rabbit_amqp_writer:send_command(WriterPid, Ch, Reply),
noreply(State);
{noreply, State} ->
{ok, ReplyFrames, State} ->
lists:foreach(fun(Frame) ->
rabbit_amqp_writer:send_command(WriterPid, Ch, Frame)
end, ReplyFrames),
noreply(State);
{stop, _, _} = Stop ->
Stop
Expand Down Expand Up @@ -913,7 +908,7 @@ handle_frame({Performative = #'v1_0.transfer'{handle = ?UINT(Handle)}, Paylaod},
_ ->
incoming_mgmt_link_transfer(Performative, Paylaod, State1)
end,
reply0(Reply ++ Flows, State);
reply_frames(Reply ++ Flows, State);

%% Although the AMQP message format [3.2] requires a body, it is valid to send a transfer frame without payload.
%% For example, when a large multi transfer message is streamed using the ProtonJ2 client, the client could send
Expand Down Expand Up @@ -962,7 +957,7 @@ handle_frame(#'v1_0.flow'{handle = Handle} = Flow,
end
end
end,
{noreply, S};
reply_frames([], S);

handle_frame(#'v1_0.disposition'{role = ?AMQP_ROLE_RECEIVER,
first = ?UINT(First),
Expand All @@ -981,7 +976,7 @@ handle_frame(#'v1_0.disposition'{role = ?AMQP_ROLE_RECEIVER,
UnsettledMapSize = map_size(UnsettledMap0),
case UnsettledMapSize of
0 ->
{noreply, State0};
reply_frames([], State0);
_ ->
DispositionRangeSize = diff(Last, First) + 1,
{Settled, UnsettledMap} =
Expand Down Expand Up @@ -1041,7 +1036,7 @@ handle_frame(#'v1_0.disposition'{role = ?AMQP_ROLE_RECEIVER,
role = ?AMQP_ROLE_SENDER}]
end,
State = handle_queue_actions(Actions, State1),
reply0(Reply, State)
reply_frames(Reply, State)
end;

handle_frame(#'v1_0.attach'{handle = ?UINT(Handle)} = Attach,
Expand Down Expand Up @@ -1095,9 +1090,9 @@ handle_frame(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)},
outgoing_pending = Pending,
queue_states = QStates},
State = maybe_detach_mgmt_link(HandleInt, State1),
maybe_detach_reply(Detach, State, State0),
Reply = detach_reply(Detach, State, State0),
publisher_or_consumer_deleted(State, State0),
{noreply, State};
reply_frames(Reply, State);

handle_frame(#'v1_0.end'{},
State0 = #state{cfg = #cfg{writer_pid = WriterPid,
Expand All @@ -1118,6 +1113,9 @@ handle_frame(Frame, _State) ->
"Unexpected frame ~tp",
[amqp10_framing:pprint(Frame)]).

reply_frames(Frames, State) ->
{ok, session_flow_fields(Frames, State), State}.

handle_attach(#'v1_0.attach'{
role = ?AMQP_ROLE_SENDER,
snd_settle_mode = ?V_1_0_SENDER_SETTLE_MODE_SETTLED,
Expand Down Expand Up @@ -1173,7 +1171,7 @@ handle_attach(#'v1_0.attach'{
Flow = #'v1_0.flow'{handle = Handle,
delivery_count = DeliveryCount,
link_credit = ?UINT(?MAX_MANAGEMENT_LINK_CREDIT)},
reply0([Reply, Flow], State);
reply_frames([Reply, Flow], State);

handle_attach(#'v1_0.attach'{
role = ?AMQP_ROLE_RECEIVER,
Expand Down Expand Up @@ -1228,7 +1226,7 @@ handle_attach(#'v1_0.attach'{
%% Echo back that we will respect the client's requested max-message-size.
max_message_size = MaybeMaxMessageSize,
properties = Properties},
reply0(Reply, State);
reply_frames([Reply], State);

handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
name = LinkName,
Expand Down Expand Up @@ -1275,7 +1273,7 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
State = State0#state{incoming_links = IncomingLinks,
permission_cache = PermCache},
rabbit_global_counters:publisher_created(?PROTOCOL),
reply0([Reply, Flow], State);
reply_frames([Reply, Flow], State);
{error, Reason} ->
protocol_error(?V_1_0_AMQP_ERROR_INVALID_FIELD,
"Attach rejected: ~tp",
Expand Down Expand Up @@ -1416,7 +1414,7 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
end
end) of
{ok, Reply, State} ->
reply0(Reply, State);
reply_frames(Reply, State);
{error, Reason} ->
protocol_error(
?V_1_0_AMQP_ERROR_INTERNAL_ERROR,
Expand Down Expand Up @@ -1842,11 +1840,6 @@ record_outgoing_unsettled(#pending_delivery{queue_ack_required = false}, State)
%% Also, queue client already acked to queue on behalf of us.
State.

reply0([], State) ->
{noreply, State};
reply0(Reply, State) ->
{reply, session_flow_fields(Reply, State), State}.

%% Implements section "receiving a transfer" in 2.5.6
session_flow_control_received_transfer(
#state{next_incoming_id = NextIncomingId,
Expand Down Expand Up @@ -3278,26 +3271,22 @@ publisher_or_consumer_deleted(

%% If we previously already sent a detach with an error condition, and the Detach we
%% receive here is therefore the client's reply, do not reply again with a 3rd detach.
maybe_detach_reply(
Detach,
#state{incoming_links = NewIncomingLinks,
outgoing_links = NewOutgoingLinks,
incoming_management_links = NewIncomingMgmtLinks,
outgoing_management_links = NewOutgoingMgmtLinks,
cfg = #cfg{writer_pid = WriterPid,
channel_num = Ch}},
#state{incoming_links = OldIncomingLinks,
outgoing_links = OldOutgoingLinks,
incoming_management_links = OldIncomingMgmtLinks,
outgoing_management_links = OldOutgoingMgmtLinks})
detach_reply(Detach,
#state{incoming_links = NewIncomingLinks,
outgoing_links = NewOutgoingLinks,
incoming_management_links = NewIncomingMgmtLinks,
outgoing_management_links = NewOutgoingMgmtLinks},
#state{incoming_links = OldIncomingLinks,
outgoing_links = OldOutgoingLinks,
incoming_management_links = OldIncomingMgmtLinks,
outgoing_management_links = OldOutgoingMgmtLinks})
when map_size(NewIncomingLinks) < map_size(OldIncomingLinks) orelse
map_size(NewOutgoingLinks) < map_size(OldOutgoingLinks) orelse
map_size(NewIncomingMgmtLinks) < map_size(OldIncomingMgmtLinks) orelse
map_size(NewOutgoingMgmtLinks) < map_size(OldOutgoingMgmtLinks) ->
Reply = Detach#'v1_0.detach'{error = undefined},
rabbit_amqp_writer:send_command(WriterPid, Ch, Reply);
maybe_detach_reply(_, _, _) ->
ok.
[Detach#'v1_0.detach'{error = undefined}];
detach_reply(_, _, _) ->
[].

-spec maybe_detach_mgmt_link(link_handle(), state()) -> state().
maybe_detach_mgmt_link(
Expand Down

0 comments on commit 7baff37

Please sign in to comment.