diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index 070205fa0b64..b53fa4d0cf0b 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -10,6 +10,7 @@ -include_lib("kernel/include/logger.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("amqp10_common/include/amqp10_types.hrl"). +-include("rabbit_reader.hrl"). -include("rabbit_amqp.hrl"). -export([init/1, @@ -79,7 +80,8 @@ pending_recv :: boolean(), buf :: list(), buf_len :: non_neg_integer(), - tracked_channels :: #{channel_number() => Session :: pid()} + tracked_channels :: #{channel_number() => Session :: pid()}, + stats_timer :: rabbit_event:state() }). -type state() :: #v1{}. @@ -90,7 +92,7 @@ unpack_from_0_9_1( {Sock, PendingRecv, SupPid, Buf, BufLen, ProxySocket, - ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt}, + ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer}, Parent) -> logger:update_process_metadata(#{connection => ConnectionName}), #v1{parent = Parent, @@ -106,6 +108,7 @@ unpack_from_0_9_1( tracked_channels = maps:new(), writer = none, connection_state = received_amqp3100, + stats_timer = StatsTimer, connection = #v1_connection{ name = ConnectionName, container_id = none, @@ -201,6 +204,10 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> end end. +handle_other(emit_stats, State) -> + emit_stats(State); +handle_other(ensure_stats_timer, State) -> + ensure_stats_timer(State); handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) -> ReasonString = rabbit_misc:format("broker forced connection closure with reason '~w'", [Reason]), @@ -247,8 +254,17 @@ handle_other({'$gen_call', From, {info, Items}}, State) -> end, gen_server:reply(From, Reply), State; -handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) -> - State; +handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) -> + case ?IS_RUNNING(State) of + true -> + %%TODO test case + Infos = infos(?CONNECTION_EVENT_KEYS, State), + rabbit_event:notify(connection_created, Infos, Ref), + rabbit_event:init_stats_timer(State, #v1.stats_timer); + false -> + %% Ignore, we will emit a connection_created event once we start running. + State + end; handle_other(terminate_connection, _State) -> stop; handle_other({set_credential, Cred}, State) -> @@ -646,7 +662,8 @@ handle_input({frame_header, Mode}, "frame size (~b bytes) > maximum frame size (~b bytes)", [Size, MaxFrameSize])); true -> - switch_callback(State, {frame_body, Mode, DOff, Channel}, Size - 8) + State1 = switch_callback(State, {frame_body, Mode, DOff, Channel}, Size - 8), + ensure_stats_timer(State1) end; handle_input({frame_header, _Mode}, Malformed, _State) -> throw({bad_1_0_header, Malformed}); @@ -1045,6 +1062,11 @@ i(channels, #v1{tracked_channels = Channels}) -> maps:size(Channels); i(channel_max, #v1{connection = #v1_connection{channel_max = Max}}) -> Max; +i(reductions = Item, _State) -> + {Item, Reductions} = erlang:process_info(self(), Item), + Reductions; +i(garbage_collection, _State) -> + rabbit_misc:get_gc_info(self()); i(Item, #v1{}) -> throw({bad_argument, Item}). @@ -1055,6 +1077,24 @@ socket_info(Get, Select, #v1{sock = Sock}) -> {error, _} -> '' end. +emit_stats(State) -> + [{_, Pid}, + {_, RecvOct}, + {_, SendOct}, + {_, Reductions}] = infos(?SIMPLE_METRICS, State), + Infos = infos(?OTHER_METRICS, State), + rabbit_core_metrics:connection_stats(Pid, Infos), + rabbit_core_metrics:connection_stats(Pid, RecvOct, SendOct, Reductions), + %% NB: Don't call ensure_stats_timer because it becomes expensive + %% if all idle connections emit stats. + rabbit_event:reset_stats_timer(State, #v1.stats_timer). + +ensure_stats_timer(State) + when ?IS_RUNNING(State) -> + rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats); +ensure_stats_timer(State) -> + State. + ignore_maintenance({map, Properties}) -> lists:member( {{symbol, <<"ignore-maintenance">>}, true}, diff --git a/deps/rabbit/src/rabbit_amqp_writer.erl b/deps/rabbit/src/rabbit_amqp_writer.erl index 7b239a10a107..0c3ef07a59e6 100644 --- a/deps/rabbit/src/rabbit_amqp_writer.erl +++ b/deps/rabbit/src/rabbit_amqp_writer.erl @@ -31,7 +31,8 @@ pending :: iolist(), %% This field is just an optimisation to minimize the cost of erlang:iolist_size/1 pending_size :: non_neg_integer(), - monitored_sessions :: #{pid() => true} + monitored_sessions :: #{pid() => true}, + stats_timer :: rabbit_event:state() }). -define(HIBERNATE_AFTER, 6_000). @@ -100,7 +101,8 @@ init({Sock, ReaderPid}) -> reader = ReaderPid, pending = [], pending_size = 0, - monitored_sessions = #{}}, + monitored_sessions = #{}, + stats_timer = rabbit_event:init_stats_timer()}, process_flag(message_queue_data, off_heap), {ok, State}. @@ -123,6 +125,9 @@ handle_call({send_command, ChannelNum, Performative}, _From, State0) -> State = flush(State1), {reply, ok, State}. +handle_info(emit_stats, State = #state{reader = ReaderPid}) -> + ReaderPid ! ensure_stats_timer, + rabbit_event:reset_stats_timer(State, #state.stats_timer); handle_info(timeout, State0) -> State = flush(State0), {noreply, State}; @@ -229,12 +234,13 @@ maybe_flush(State = #state{pending_size = PendingSize}) -> flush(State = #state{pending = []}) -> State; -flush(State = #state{sock = Sock, - pending = Pending}) -> +flush(State0 = #state{sock = Sock, + pending = Pending}) -> case rabbit_net:send(Sock, lists:reverse(Pending)) of ok -> - State#state{pending = [], - pending_size = 0}; + State = State0#state{pending = [], + pending_size = 0}, + rabbit_event:ensure_stats_timer(State, #state.stats_timer, emit_stats); {error, Reason} -> exit({writer, send_failed, Reason}) end. diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index 42e7e70a75fe..2e5723ebfb24 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -42,6 +42,7 @@ -include_lib("rabbit_common/include/rabbit_framing.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). +-include("rabbit_reader.hrl"). -export([start_link/2, info/2, force_event_refresh/2, shutdown/2]). @@ -116,10 +117,6 @@ connection_blocked_message_sent }). --define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]). --define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, state, channels, - garbage_collection]). - -define(CREATION_EVENT_KEYS, [pid, name, port, peer_port, host, peer_host, ssl, peer_cert_subject, peer_cert_issuer, @@ -1582,8 +1579,8 @@ i(state, #v1{connection_state = ConnectionState, end; i(garbage_collection, _State) -> rabbit_misc:get_gc_info(self()); -i(reductions, _State) -> - {reductions, Reductions} = erlang:process_info(self(), reductions), +i(reductions = Item, _State) -> + {Item, Reductions} = erlang:process_info(self(), Item), Reductions; i(Item, #v1{connection = Conn}) -> ic(Item, Conn). @@ -1623,12 +1620,12 @@ maybe_emit_stats(State) -> emit_stats(State) -> [{_, Pid}, - {_, Recv_oct}, - {_, Send_oct}, + {_, RecvOct}, + {_, SendOct}, {_, Reductions}] = infos(?SIMPLE_METRICS, State), Infos = infos(?OTHER_METRICS, State), rabbit_core_metrics:connection_stats(Pid, Infos), - rabbit_core_metrics:connection_stats(Pid, Recv_oct, Send_oct, Reductions), + rabbit_core_metrics:connection_stats(Pid, RecvOct, SendOct, Reductions), State1 = rabbit_event:reset_stats_timer(State, #v1.stats_timer), ensure_stats_timer(State1). @@ -1643,6 +1640,7 @@ pack_for_1_0(Buf, BufLen, #v1{sock = Sock, pending_recv = PendingRecv, helper_sup = {_HelperSup091, HelperSup10}, proxy_socket = ProxySocket, + stats_timer = StatsTimer, connection = #connection{ name = Name, host = Host, @@ -1651,7 +1649,7 @@ pack_for_1_0(Buf, BufLen, #v1{sock = Sock, peer_port = PeerPort, connected_at = ConnectedAt}}) -> {Sock, PendingRecv, HelperSup10, Buf, BufLen, ProxySocket, - Name, Host, PeerHost, Port, PeerPort, ConnectedAt}. + Name, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer}. respond_and_close(State, Channel, Protocol, Reason, LogErr) -> log_hard_error(State, Channel, LogErr), diff --git a/deps/rabbit/src/rabbit_reader.hrl b/deps/rabbit/src/rabbit_reader.hrl new file mode 100644 index 000000000000..ead31e9c59ed --- /dev/null +++ b/deps/rabbit/src/rabbit_reader.hrl @@ -0,0 +1,11 @@ + +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +-define(SIMPLE_METRICS, + [pid, recv_oct, send_oct, reductions]). +-define(OTHER_METRICS, + [recv_cnt, send_cnt, send_pend, state, channels, garbage_collection]). diff --git a/deps/rabbit_common/src/rabbit_core_metrics.erl b/deps/rabbit_common/src/rabbit_core_metrics.erl index 7be3c89e9b8a..2a381ffb07fa 100644 --- a/deps/rabbit_common/src/rabbit_core_metrics.erl +++ b/deps/rabbit_common/src/rabbit_core_metrics.erl @@ -168,9 +168,9 @@ connection_stats(Pid, Infos) -> ets:insert(connection_metrics, {Pid, Infos}), ok. -connection_stats(Pid, Recv_oct, Send_oct, Reductions) -> +connection_stats(Pid, RecvOct, SendOct, Reductions) -> %% Includes delete marker - ets:insert(connection_coarse_metrics, {Pid, Recv_oct, Send_oct, Reductions, 0}), + ets:insert(connection_coarse_metrics, {Pid, RecvOct, SendOct, Reductions, 0}), ok. -spec session_begun(rabbit_types:infos()) -> ok. diff --git a/deps/rabbit_common/src/rabbit_event.erl b/deps/rabbit_common/src/rabbit_event.erl index c390b52f555e..c0d60cf9e433 100644 --- a/deps/rabbit_common/src/rabbit_event.erl +++ b/deps/rabbit_common/src/rabbit_event.erl @@ -110,7 +110,7 @@ ensure_stats_timer(C, P, Msg) -> when Level =/= none -> TRef = erlang:send_after(Interval, self(), Msg), setelement(P, C, State#state{timer = TRef}); - #state{} -> + _State -> C end.