Skip to content

Commit

Permalink
WIP connection metrics
Browse files Browse the repository at this point in the history
TODO: maybe_emit_stats/1 as done in rabbit_reader
  • Loading branch information
ansd committed Oct 31, 2024
1 parent 1969ae4 commit 91df2f1
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 24 deletions.
50 changes: 45 additions & 5 deletions deps/rabbit/src/rabbit_amqp_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
-include_lib("kernel/include/logger.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("amqp10_common/include/amqp10_types.hrl").
-include("rabbit_reader.hrl").
-include("rabbit_amqp.hrl").

-export([init/1,
Expand Down Expand Up @@ -79,7 +80,8 @@
pending_recv :: boolean(),
buf :: list(),
buf_len :: non_neg_integer(),
tracked_channels :: #{channel_number() => Session :: pid()}
tracked_channels :: #{channel_number() => Session :: pid()},
stats_timer :: rabbit_event:state()
}).

-type state() :: #v1{}.
Expand All @@ -90,7 +92,7 @@

unpack_from_0_9_1(
{Sock, PendingRecv, SupPid, Buf, BufLen, ProxySocket,
ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt},
ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer},
Parent) ->
logger:update_process_metadata(#{connection => ConnectionName}),
#v1{parent = Parent,
Expand All @@ -106,6 +108,7 @@ unpack_from_0_9_1(
tracked_channels = maps:new(),
writer = none,
connection_state = received_amqp3100,
stats_timer = StatsTimer,
connection = #v1_connection{
name = ConnectionName,
container_id = none,
Expand Down Expand Up @@ -201,6 +204,10 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
end
end.

handle_other(emit_stats, State) ->
emit_stats(State);
handle_other(ensure_stats_timer, State) ->
ensure_stats_timer(State);
handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
ReasonString = rabbit_misc:format("broker forced connection closure with reason '~w'",
[Reason]),
Expand Down Expand Up @@ -247,8 +254,17 @@ handle_other({'$gen_call', From, {info, Items}}, State) ->
end,
gen_server:reply(From, Reply),
State;
handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) ->
State;
handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) ->
case ?IS_RUNNING(State) of
true ->
%%TODO test case
Infos = infos(?CONNECTION_EVENT_KEYS, State),
rabbit_event:notify(connection_created, Infos, Ref),
rabbit_event:init_stats_timer(State, #v1.stats_timer);
false ->
%% Ignore, we will emit a connection_created event once we start running.
State
end;
handle_other(terminate_connection, _State) ->
stop;
handle_other({set_credential, Cred}, State) ->
Expand Down Expand Up @@ -646,7 +662,8 @@ handle_input({frame_header, Mode},
"frame size (~b bytes) > maximum frame size (~b bytes)",
[Size, MaxFrameSize]));
true ->
switch_callback(State, {frame_body, Mode, DOff, Channel}, Size - 8)
State1 = switch_callback(State, {frame_body, Mode, DOff, Channel}, Size - 8),
ensure_stats_timer(State1)
end;
handle_input({frame_header, _Mode}, Malformed, _State) ->
throw({bad_1_0_header, Malformed});
Expand Down Expand Up @@ -1045,6 +1062,11 @@ i(channels, #v1{tracked_channels = Channels}) ->
maps:size(Channels);
i(channel_max, #v1{connection = #v1_connection{channel_max = Max}}) ->
Max;
i(reductions = Item, _State) ->
{Item, Reductions} = erlang:process_info(self(), Item),
Reductions;
i(garbage_collection, _State) ->
rabbit_misc:get_gc_info(self());
i(Item, #v1{}) ->
throw({bad_argument, Item}).

Expand All @@ -1055,6 +1077,24 @@ socket_info(Get, Select, #v1{sock = Sock}) ->
{error, _} -> ''
end.

emit_stats(State) ->
[{_, Pid},
{_, RecvOct},
{_, SendOct},
{_, Reductions}] = infos(?SIMPLE_METRICS, State),
Infos = infos(?OTHER_METRICS, State),
rabbit_core_metrics:connection_stats(Pid, Infos),
rabbit_core_metrics:connection_stats(Pid, RecvOct, SendOct, Reductions),
%% NB: Don't call ensure_stats_timer because it becomes expensive
%% if all idle connections emit stats.
rabbit_event:reset_stats_timer(State, #v1.stats_timer).

ensure_stats_timer(State)
when ?IS_RUNNING(State) ->
rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats);
ensure_stats_timer(State) ->
State.

ignore_maintenance({map, Properties}) ->
lists:member(
{{symbol, <<"ignore-maintenance">>}, true},
Expand Down
18 changes: 12 additions & 6 deletions deps/rabbit/src/rabbit_amqp_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
pending :: iolist(),
%% This field is just an optimisation to minimize the cost of erlang:iolist_size/1
pending_size :: non_neg_integer(),
monitored_sessions :: #{pid() => true}
monitored_sessions :: #{pid() => true},
stats_timer :: rabbit_event:state()
}).

-define(HIBERNATE_AFTER, 6_000).
Expand Down Expand Up @@ -100,7 +101,8 @@ init({Sock, ReaderPid}) ->
reader = ReaderPid,
pending = [],
pending_size = 0,
monitored_sessions = #{}},
monitored_sessions = #{},
stats_timer = rabbit_event:init_stats_timer()},
process_flag(message_queue_data, off_heap),
{ok, State}.

Expand All @@ -123,6 +125,9 @@ handle_call({send_command, ChannelNum, Performative}, _From, State0) ->
State = flush(State1),
{reply, ok, State}.

handle_info(emit_stats, State = #state{reader = ReaderPid}) ->
ReaderPid ! ensure_stats_timer,
rabbit_event:reset_stats_timer(State, #state.stats_timer);
handle_info(timeout, State0) ->
State = flush(State0),
{noreply, State};
Expand Down Expand Up @@ -229,12 +234,13 @@ maybe_flush(State = #state{pending_size = PendingSize}) ->

flush(State = #state{pending = []}) ->
State;
flush(State = #state{sock = Sock,
pending = Pending}) ->
flush(State0 = #state{sock = Sock,
pending = Pending}) ->
case rabbit_net:send(Sock, lists:reverse(Pending)) of
ok ->
State#state{pending = [],
pending_size = 0};
State = State0#state{pending = [],
pending_size = 0},
rabbit_event:ensure_stats_timer(State, #state.stats_timer, emit_stats);
{error, Reason} ->
exit({writer, send_failed, Reason})
end.
18 changes: 8 additions & 10 deletions deps/rabbit/src/rabbit_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

-include_lib("rabbit_common/include/rabbit_framing.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include("rabbit_reader.hrl").

-export([start_link/2, info/2, force_event_refresh/2,
shutdown/2]).
Expand Down Expand Up @@ -116,10 +117,6 @@
connection_blocked_message_sent
}).

-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]).
-define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, state, channels,
garbage_collection]).

-define(CREATION_EVENT_KEYS,
[pid, name, port, peer_port, host,
peer_host, ssl, peer_cert_subject, peer_cert_issuer,
Expand Down Expand Up @@ -1582,8 +1579,8 @@ i(state, #v1{connection_state = ConnectionState,
end;
i(garbage_collection, _State) ->
rabbit_misc:get_gc_info(self());
i(reductions, _State) ->
{reductions, Reductions} = erlang:process_info(self(), reductions),
i(reductions = Item, _State) ->
{Item, Reductions} = erlang:process_info(self(), Item),
Reductions;
i(Item, #v1{connection = Conn}) -> ic(Item, Conn).

Expand Down Expand Up @@ -1623,12 +1620,12 @@ maybe_emit_stats(State) ->

emit_stats(State) ->
[{_, Pid},
{_, Recv_oct},
{_, Send_oct},
{_, RecvOct},
{_, SendOct},
{_, Reductions}] = infos(?SIMPLE_METRICS, State),
Infos = infos(?OTHER_METRICS, State),
rabbit_core_metrics:connection_stats(Pid, Infos),
rabbit_core_metrics:connection_stats(Pid, Recv_oct, Send_oct, Reductions),
rabbit_core_metrics:connection_stats(Pid, RecvOct, SendOct, Reductions),
State1 = rabbit_event:reset_stats_timer(State, #v1.stats_timer),
ensure_stats_timer(State1).

Expand All @@ -1643,6 +1640,7 @@ pack_for_1_0(Buf, BufLen, #v1{sock = Sock,
pending_recv = PendingRecv,
helper_sup = {_HelperSup091, HelperSup10},
proxy_socket = ProxySocket,
stats_timer = StatsTimer,
connection = #connection{
name = Name,
host = Host,
Expand All @@ -1651,7 +1649,7 @@ pack_for_1_0(Buf, BufLen, #v1{sock = Sock,
peer_port = PeerPort,
connected_at = ConnectedAt}}) ->
{Sock, PendingRecv, HelperSup10, Buf, BufLen, ProxySocket,
Name, Host, PeerHost, Port, PeerPort, ConnectedAt}.
Name, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer}.

respond_and_close(State, Channel, Protocol, Reason, LogErr) ->
log_hard_error(State, Channel, LogErr),
Expand Down
11 changes: 11 additions & 0 deletions deps/rabbit/src/rabbit_reader.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@

%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

-define(SIMPLE_METRICS,
[pid, recv_oct, send_oct, reductions]).
-define(OTHER_METRICS,
[recv_cnt, send_cnt, send_pend, state, channels, garbage_collection]).
4 changes: 2 additions & 2 deletions deps/rabbit_common/src/rabbit_core_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,9 @@ connection_stats(Pid, Infos) ->
ets:insert(connection_metrics, {Pid, Infos}),
ok.

connection_stats(Pid, Recv_oct, Send_oct, Reductions) ->
connection_stats(Pid, RecvOct, SendOct, Reductions) ->
%% Includes delete marker
ets:insert(connection_coarse_metrics, {Pid, Recv_oct, Send_oct, Reductions, 0}),
ets:insert(connection_coarse_metrics, {Pid, RecvOct, SendOct, Reductions, 0}),
ok.

-spec session_begun(rabbit_types:infos()) -> ok.
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit_common/src/rabbit_event.erl
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ ensure_stats_timer(C, P, Msg) ->
when Level =/= none ->
TRef = erlang:send_after(Interval, self(), Msg),
setelement(P, C, State#state{timer = TRef});
#state{} ->
_State ->
C
end.

Expand Down

0 comments on commit 91df2f1

Please sign in to comment.