Skip to content

Commit

Permalink
WIP Add AMQP observability
Browse files Browse the repository at this point in the history
Add session -> exchange metrics for unroutable_returned and unroutable_dropped

Add session -> exchange metrics for accepted

WIP Add session <-> queue stats

Single ETS session table

Expose per session metrics in Prometheus

Add 'rabbitmqctl list_sessions'

CLI command list_channels queries into each channel proc.

This commit decides that list_sessions reads the local ETS table
instead. Each node sends its session infos directly to the CLI node to avoid
large amounts of data being transferred across RabbitMQ nodes.

Advantages:
* Same code path is used for Prometheus, Management UI, and CLI because they
  all query the same single source of truth: ETS table `session_metrics`.
* Avoid waking up potentially hundreds of thousands of processes at the
  same time.

Disadvantages:
* Data is slightly old because the session emits stats every interval (5
  seconds by default). But this shouldn't matter for this CLI command.
  • Loading branch information
ansd committed Oct 31, 2024
1 parent dbd9ede commit 1969ae4
Show file tree
Hide file tree
Showing 9 changed files with 712 additions and 146 deletions.
537 changes: 440 additions & 97 deletions deps/rabbit/src/rabbit_amqp_session.erl

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion deps/rabbit_common/include/rabbit_core_metrics.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@

-define(CORE_EXTRA_TABLES, [{gen_server2_metrics, set},
{auth_attempt_metrics, set},
{auth_attempt_detailed_metrics, set}]).
{auth_attempt_detailed_metrics, set},
{session_metrics, set}]).

% `CORE_NON_CHANNEL_TABLES` are tables that store counters representing the
% same info as some of the channel_queue_metrics, channel_exchange_metrics and
Expand Down
147 changes: 128 additions & 19 deletions deps/rabbit_common/src/rabbit_core_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@
connection_stats/2,
connection_stats/4]).

-export([session_begun/1,
session_ended/0,
session_pids/0,
session_infos/1,
session_stats/1
% session_stats/4
]).

-export([channel_created/2,
channel_closed/1,
channel_stats/2,
Expand All @@ -38,6 +46,8 @@
queue_deleted/1,
queues_deleted/1]).

-export([exchange_stats/3]).

-export([node_stats/2]).

-export([node_node_stats/2]).
Expand All @@ -54,18 +64,36 @@
get_auth_attempts/0,
get_auth_attempts_by_source/0]).

% -define(QUEUE_METRICS_DEFAULT(QName),
% %% Last field is delete marker.
% {QName, 0, 0, 0, 0, 0, 0, 0, 0}).

% -define(SESSION_EXCHANGE_METRICS_DEFAULT(Key),
% %% Last field is delete marker.
% {Key, 0, 0, 0, 0, 0}).

% -define(SESSION_QUEUE_METRICS_DEFAULT(Key),
% %% Last field is delete marker.
% {Key, 0, 0, 0, 0, 0, 0, 0, 0}).

%%----------------------------------------------------------------------------
%% Types
%%----------------------------------------------------------------------------
-type(channel_stats_id() :: pid() |
{pid(),
{rabbit_types:rabbit_amqqueue_name(), rabbit_types:exchange_name()}} |
{pid(), rabbit_types:rabbit_amqqueue_name()} |
{pid(), rabbit_types:exchange_name()}).

% -type(session_stats_key() :: {pid(), rabbit_types:r(exchange | queue)}).
% -type(session_stats_type() :: exchange_stats | queue_stats).

-type(channel_stats_id() ::
pid() |
{pid(),
{rabbit_types:rabbit_amqqueue_name(), rabbit_types:exchange_name()} |
rabbit_types:r(exchange | queue)}).

-type(channel_stats_type() :: queue_exchange_stats | queue_stats |
exchange_stats | reductions).

-type(exchange_operation() :: publish | confirm | return_unroutable | drop_unroutable).

-type(activity_status() :: up | single_active | waiting | suspected_down).
%%----------------------------------------------------------------------------
%% Specs
Expand Down Expand Up @@ -107,8 +135,7 @@
%%----------------------------------------------------------------------------

create_table({Table, Type}) ->
ets:new(Table, [Type, public, named_table, {write_concurrency, true},
{read_concurrency, true}]).
ets:new(Table, [Type, public, named_table, {write_concurrency, true}]).

init() ->
Tables = ?CORE_TABLES ++ ?CORE_EXTRA_TABLES ++ ?CORE_NON_CHANNEL_TABLES,
Expand Down Expand Up @@ -146,6 +173,92 @@ connection_stats(Pid, Recv_oct, Send_oct, Reductions) ->
ets:insert(connection_coarse_metrics, {Pid, Recv_oct, Send_oct, Reductions, 0}),
ok.

%% Registers the calling process as a session in the `session_metrics' ETS
%% table. The row is keyed by self() and holds the given immutable infos plus
%% an initially empty list of mutable infos (see session_stats/1).
-spec session_begun(rabbit_types:infos()) -> ok.
session_begun(ImmutableInfos) ->
    Row = {self(), ImmutableInfos, []},
    true = ets:insert(session_metrics, Row),
    ok.

%% Removes the row of the calling process from the `session_metrics' ETS table.
%% Returns ok regardless of whether a row existed.
-spec session_ended() -> ok.
session_ended() ->
    SessionPid = self(),
    true = ets:delete(session_metrics, SessionPid),
    ok.

%% Returns the key (session pid) of every row in the `session_metrics' table.
-spec session_pids() -> [pid()].
session_pids() ->
    %% Select only the first element of each 3-tuple row.
    MatchSpec = [{{'$1', '_', '_'}, [], ['$1']}],
    ets:select(session_metrics, MatchSpec).

%% Returns, for every row in the `session_metrics' ETS table, the requested
%% info items as a list of {Item, Value} pairs in the order given by Items.
%%
%% Each row contributes the key `pid' plus its immutable and mutable infos.
%% When the same key occurs more than once, the last occurrence wins
%% (maps:from_list/1 semantics), so mutable infos take precedence.
%%
%% An item that is absent from a row is reported as '' instead of raising
%% {badkey, Item}: session_begun/1 stores an empty mutable info list, so a
%% session that has not yet called session_stats/1 has no mutable items, and
%% a single such row must not abort the listing of all other sessions.
-spec session_infos(rabbit_types:info_keys()) -> [rabbit_types:infos()].
session_infos(Items) ->
    lists:map(fun({Pid, ImmutableInfos, MutableInfos}) ->
                      Infos = maps:from_list(
                                [{pid, Pid} | ImmutableInfos ++ MutableInfos]),
                      [{Item, maps:get(Item, Infos, '')} || Item <- Items]
              end, ets:tab2list(session_metrics)).

%% Replaces the mutable infos (third element) of the calling process's row in
%% the `session_metrics' ETS table. The result of the update is discarded, so
%% ok is returned even if no row exists for self().
-spec session_stats(rabbit_types:infos()) -> ok.
session_stats(MutableInfos) ->
    Key = self(),
    Update = {3, MutableInfos},
    _ = ets:update_element(session_metrics, Key, Update),
    ok.

% -spec session_stats(session_stats_type(), atom(), session_stats_key(), pos_integer()) -> ok.
% session_stats(exchange_stats, publish, {_SessionPid, XName} = Key, Value) ->
% _ = ets:update_counter(session_exchange_metrics, Key, {2, Value}, ?SESSION_EXCHANGE_METRICS_DEFAULT(Key)),
% _ = ets:update_counter(exchange_metrics, XName, {2, Value}, ?EXCHANGE_METRICS_DEFAULT(XName)),
% ok;
% session_stats(exchange_stats, accept, {_SessionPid, XName} = Key, Value) ->
% _ = ets:update_counter(session_exchange_metrics, Key, {3, Value}, ?SESSION_EXCHANGE_METRICS_DEFAULT(Key)),
% _ = ets:update_counter(exchange_metrics, XName, {3, Value}, ?EXCHANGE_METRICS_DEFAULT(XName)),
% ok;
% session_stats(exchange_stats, return_unroutable, {_SessionPid, XName} = Key, Value) ->
% _ = ets:update_counter(session_exchange_metrics, Key, {4, Value}, ?SESSION_EXCHANGE_METRICS_DEFAULT(Key)),
% _ = ets:update_counter(exchange_metrics, XName, {4, Value}, ?EXCHANGE_METRICS_DEFAULT(XName)),
% ok;
% session_stats(exchange_stats, drop_unroutable, {_SessionPid, XName} = Key, Value) ->
% _ = ets:update_counter(session_exchange_metrics, Key, {5, Value}, ?SESSION_EXCHANGE_METRICS_DEFAULT(Key)),
% _ = ets:update_counter(exchange_metrics, XName, {5, Value}, ?EXCHANGE_METRICS_DEFAULT(XName)),
% ok;

% session_stats(queue_stats, deliver_unsettled, {_SessionPid, QName} = Key, Value) ->
% _ = ets:update_counter(session_queue_metrics, Key, {2, Value}, ?SESSION_QUEUE_METRICS_DEFAULT(Key)),
% _ = ets:update_counter(queue_delivery_metrics, QName, {4, Value}, ?QUEUE_METRICS_DEFAULT(QName)),
% ok;
% session_stats(queue_stats, deliver_settled, {_SessionPid, QName} = Key, Value) ->
% _ = ets:update_counter(session_queue_metrics, Key, {3, Value}, ?SESSION_QUEUE_METRICS_DEFAULT(Key)),
% _ = ets:update_counter(queue_delivery_metrics, QName, {5, Value}, ?QUEUE_METRICS_DEFAULT(QName)),
% ok;
% session_stats(queue_stats, redeliver, {_SessionPid, QName} = Key, Value) ->
% _ = ets:update_counter(session_queue_metrics, Key, {4, Value}, ?SESSION_QUEUE_METRICS_DEFAULT(Key)),
% _ = ets:update_counter(queue_delivery_metrics, QName, {6, Value}, ?QUEUE_METRICS_DEFAULT(QName)),
% ok;
% session_stats(queue_stats, accept, {_SessionPid, QName} = Key, Value) ->
% _ = ets:update_counter(session_queue_metrics, Key, {5, Value}, ?SESSION_QUEUE_METRICS_DEFAULT(Key)),
% _ = ets:update_counter(queue_delivery_metrics, QName, {7, Value}, ?QUEUE_METRICS_DEFAULT(QName)),
% ok;
% session_stats(queue_stats, reject, {_SessionPid, _QName} = Key, Value) ->
% _ = ets:update_counter(session_queue_metrics, Key, {6, Value}, ?SESSION_QUEUE_METRICS_DEFAULT(Key)),
% ok;
% session_stats(queue_stats, release, {_SessionPid, _QName} = Key, Value) ->
% _ = ets:update_counter(session_queue_metrics, Key, {7, Value}, ?SESSION_QUEUE_METRICS_DEFAULT(Key)),
% ok;
% session_stats(queue_stats, modify, {_SessionPid, _QName} = Key, Value) ->
% _ = ets:update_counter(session_queue_metrics, Key, {8, Value}, ?SESSION_QUEUE_METRICS_DEFAULT(Key)),
% ok.

%% Increments one counter of the row keyed by XName in the `exchange_metrics'
%% ETS table by Incr. The counter position depends on Operation:
%% publish = 2, confirm = 3, return_unroutable = 4, drop_unroutable = 5.
%% If no row exists yet, an all-zero row is used as the starting point.
-spec exchange_stats(exchange_operation(), rabbit_types:exchange_name(), pos_integer()) -> ok.
exchange_stats(Operation, XName, Incr) ->
    UpdateOp = case Operation of
                   publish           -> {2, Incr};
                   confirm           -> {3, Incr};
                   return_unroutable -> {4, Incr};
                   drop_unroutable   -> {5, Incr}
               end,
    %% Last field is delete marker.
    Default = {XName, 0, 0, 0, 0, 0},
    _ = ets:update_counter(exchange_metrics, XName, UpdateOp, Default),
    ok.

channel_created(Pid, Infos) ->
ets:insert(channel_created, {Pid, Infos}),
_ = ets:update_counter(connection_churn_metrics, node(), {4, 1},
Expand All @@ -168,26 +281,22 @@ channel_stats(reductions, Id, Value) ->
ets:insert(channel_process_metrics, {Id, Value}),
ok.

channel_stats(exchange_stats, publish, {_ChannelPid, XName} = Id, Value) ->
channel_stats(exchange_stats, Op = publish, {_ChannelPid, XName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {2, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {2, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
channel_stats(exchange_stats, confirm, {_ChannelPid, XName} = Id, Value) ->
exchange_stats(Op, XName, Value);
channel_stats(exchange_stats, Op = confirm, {_ChannelPid, XName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {3, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {3, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
channel_stats(exchange_stats, return_unroutable, {_ChannelPid, XName} = Id, Value) ->
exchange_stats(Op, XName, Value);
channel_stats(exchange_stats, Op = return_unroutable, {_ChannelPid, XName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {4, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {4, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
channel_stats(exchange_stats, drop_unroutable, {_ChannelPid, XName} = Id, Value) ->
exchange_stats(Op, XName, Value);
channel_stats(exchange_stats, Op = drop_unroutable, {_ChannelPid, XName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {5, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {5, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
exchange_stats(Op, XName, Value);
channel_stats(queue_exchange_stats, publish, {_ChannelPid, QueueExchange} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_exchange_metrics, Id, Value, {Id, 0, 0}),
Expand Down
29 changes: 19 additions & 10 deletions deps/rabbit_common/src/rabbit_event.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
-include("rabbit.hrl").

-export([start_link/0]).
-export([init_stats_timer/2, init_disabled_stats_timer/2,
-export([init_stats_timer/0, init_stats_timer/2, init_disabled_stats_timer/2,
ensure_stats_timer/3, stop_stats_timer/2, reset_stats_timer/2]).
-export([stats_level/2, if_enabled/3]).
-export([stats_level/1, stats_level/2, if_enabled/3]).
-export([notify/2, notify/3, notify_if/3]).
-export([sync_notify/2, sync_notify/3]).

Expand Down Expand Up @@ -49,7 +49,6 @@
-spec ensure_stats_timer(container(), pos(), term()) -> container().
-spec stop_stats_timer(container(), pos()) -> container().
-spec reset_stats_timer(container(), pos()) -> container().
-spec stats_level(container(), pos()) -> level().
-spec if_enabled(container(), pos(), timer_fun()) -> 'ok'.
-spec notify(event_type(), event_props()) -> 'ok'.
-spec notify(event_type(), event_props(), reference() | 'none') -> 'ok'.
Expand Down Expand Up @@ -89,12 +88,18 @@ start_link() ->
%% Nowadays, instead of sending a message to rabbit_event via notify(stats),
%% some stat-emitting objects update ETS tables directly via module rabbit_core_metrics.

init_stats_timer(C, P) ->
-spec init_stats_timer() -> state().
init_stats_timer() ->
%% If the rabbit app is not loaded - use default none:5000
StatsLevel = application:get_env(rabbit, collect_statistics, none),
Interval = application:get_env(rabbit, collect_statistics_interval, 5000),
setelement(P, C, #state{level = StatsLevel, interval = Interval,
timer = undefined}).
Interval = application:get_env(rabbit, collect_statistics_interval, 5000),
#state{level = StatsLevel,
interval = Interval,
timer = undefined}.

init_stats_timer(C, P) ->
State = init_stats_timer(),
setelement(P, C, State).

%% Stores a stats timer state with level `none', interval 0 and no timer at
%% position P of the container tuple C, and returns the updated container.
init_disabled_stats_timer(C, P) ->
    Disabled = #state{level = none,
                      interval = 0,
                      timer = undefined},
    setelement(P, C, Disabled).
Expand Down Expand Up @@ -128,10 +133,14 @@ reset_stats_timer(C, P) ->
C
end.

stats_level(C, P) ->
#state{level = Level} = element(P, C),
-spec stats_level(state()) -> level().
stats_level(#state{level = Level}) ->
Level.

-spec stats_level(container(), pos()) -> level().
stats_level(C, P) ->
stats_level(element(P, C)).

if_enabled(C, P, Fun) ->
case element(P, C) of
#state{level = none} -> ok;
Expand All @@ -156,5 +165,5 @@ event_cons(Type, Props, Ref) ->
#event{type = Type,
props = Props,
reference = Ref,
timestamp = os:system_time(milli_seconds)}.
timestamp = os:system_time(millisecond)}.

1 change: 1 addition & 0 deletions deps/rabbitmq_cli/lib/rabbitmq/cli/core/doc_guide.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ defmodule RabbitMQ.CLI.Core.DocGuide do
Macros.defguide("alarms")
Macros.defguide("disk_alarms")
Macros.defguide("alternate_exchange", path_segment: "ae")
Macros.defguide("amqp")
Macros.defguide("channels")
Macros.defguide("cli")
Macros.defguide("clustering")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
##

defmodule RabbitMQ.CLI.Ctl.Commands.ListSessionsCommand do
  # CLI command `list_sessions`: requests the selected info items of AMQP 1.0
  # sessions via :rabbit_amqp_session.emit_info_all and renders them as a table.
  alias RabbitMQ.CLI.Core.{DocGuide, Helpers}
  alias RabbitMQ.CLI.Ctl.{InfoKeys, RpcStream}

  @behaviour RabbitMQ.CLI.CommandBehaviour

  # Every column a user may request.
  @info_keys ~w(pid name connection_pid channel_number user vhost handle_max
                in_attach in_flow in_transfer in_disposition in_detach)a

  # Columns used when none are given on the command line.
  @default_columns ~w(pid name user)

  def scopes(), do: [:ctl, :diagnostics]

  def info_keys(), do: @info_keys

  # No columns requested: fall back to the default columns.
  def merge_defaults([], opts), do: merge_defaults(@default_columns, opts)

  # Table headers are on unless the caller's options say otherwise.
  def merge_defaults(args, opts) do
    {args, Map.put_new(opts, :table_headers, true)}
  end

  # Returns :ok when all requested columns are known; otherwise passes the
  # result of InfoKeys.validate_info_keys/2 through unchanged.
  def validate(args, _opts) do
    with {:ok, _} <- InfoKeys.validate_info_keys(args, @info_keys), do: :ok
  end

  use RabbitMQ.CLI.Core.RequiresRabbitAppRunning

  def run([], opts) do
    @default_columns
    |> Enum.map(&to_charlist/1)
    |> run(opts)
  end

  def run([_ | _] = args, %{node: node_name, timeout: timeout}) do
    requested_keys = InfoKeys.prepare_info_keys(args)
    broker_keys = InfoKeys.broker_keys(requested_keys)

    Helpers.with_nodes_in_cluster(node_name, fn nodes ->
      # The node count is passed as the final argument; presumably the number
      # of result streams to wait for - confirm against RpcStream.
      node_count = length(nodes)

      RpcStream.receive_list_items(
        node_name,
        :rabbit_amqp_session,
        :emit_info_all,
        [nodes, broker_keys],
        timeout,
        requested_keys,
        node_count
      )
    end)
  end

  use RabbitMQ.CLI.DefaultOutput

  def formatter(), do: RabbitMQ.CLI.Formatters.Table

  def banner(_, _), do: "Listing AMQP 1.0 sessions ..."

  def usage(), do: "list_sessions [--no-table-headers] [<column> ...]"

  def usage_additional() do
    columns =
      @info_keys
      |> Enum.sort()
      |> Enum.join(", ")

    [
      ["<column>", "must be one of " <> columns]
    ]
  end

  def usage_doc_guides(), do: [DocGuide.amqp()]

  def help_section(), do: :observability_and_health_checks

  def description(), do: "Lists all AMQP 1.0 sessions"
end
8 changes: 4 additions & 4 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
rabbit_mqtt_processor:state(),
connection_state :: running | blocked,
conserve :: boolean(),
stats_timer :: option(rabbit_event:state()),
stats_timer :: rabbit_event:state(),
keepalive = rabbit_mqtt_keepalive:init() :: rabbit_mqtt_keepalive:state(),
conn_name :: binary()
}).
Expand Down Expand Up @@ -87,9 +87,9 @@ init(Ref) ->
await_recv = false,
connection_state = running,
conserve = false,
parse_state = rabbit_mqtt_packet:init_state()},
State1 = control_throttle(State0),
State = rabbit_event:init_stats_timer(State1, #state.stats_timer),
parse_state = rabbit_mqtt_packet:init_state(),
stats_timer = rabbit_event:init_stats_timer()},
State = control_throttle(State0),
gen_server:enter_loop(?MODULE, [], State);
{error, Reason = enotconn} ->
?LOG_INFO("MQTT could not get connection string: ~s", [Reason]),
Expand Down
Loading

0 comments on commit 1969ae4

Please sign in to comment.