Skip to content

Commit

Permalink
Merge pull request #12820 from rabbitmq/rabbitmq-server-12814
Browse files Browse the repository at this point in the history
v4.0.x: Expose max offset lag of stream consumers via Prometheus
  • Loading branch information
michaelklishin authored Nov 26, 2024
2 parents 09a9865 + 2846f39 commit 5d8a80d
Show file tree
Hide file tree
Showing 11 changed files with 277 additions and 114 deletions.
3 changes: 1 addition & 2 deletions deps/amqp10_common/src/serial_number.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
diff/2,
foldl/4]).

-ifdef(TEST).
%% For tests.
-export([usort/1]).
-endif.

-type serial_number() :: sequence_no().
-export_type([serial_number/0]).
Expand Down
2 changes: 2 additions & 0 deletions deps/rabbitmq_ct_helpers/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ rabbitmq_app(
license_files = [":license_files"],
priv = [":priv"],
deps = [
"//deps/amqp10_common:erlang_app",
"//deps/rabbit_common:erlang_app",
"//deps/rabbitmq_stream_common:erlang_app",
"@meck//:erlang_app",
"@proper//:erlang_app",
"@ra//:erlang_app",
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_ct_helpers/Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PROJECT = rabbitmq_ct_helpers
PROJECT_DESCRIPTION = Common Test helpers for RabbitMQ

DEPS = rabbit_common proper inet_tcp_proxy meck
DEPS = rabbit_common amqp10_common rabbitmq_stream_common proper inet_tcp_proxy meck
TEST_DEPS = rabbit

XREF_IGNORE = [ \
Expand Down
7 changes: 5 additions & 2 deletions deps/rabbitmq_ct_helpers/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_ct_proper_helpers.erl",
"src/rabbit_ct_vm_helpers.erl",
"src/rabbit_mgmt_test_util.erl",
"src/stream_test_utils.erl",
],
hdrs = [":public_and_private_hdrs"],
app_name = "rabbitmq_ct_helpers",
dest = "ebin",
erlc_opts = "//:erlc_opts",
deps = ["//deps/rabbit_common:erlang_app", "@proper//:erlang_app"],
deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbit_common:erlang_app", "@proper//:erlang_app"],
)

def all_test_beam_files(name = "all_test_beam_files"):
Expand All @@ -45,12 +46,13 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_ct_proper_helpers.erl",
"src/rabbit_ct_vm_helpers.erl",
"src/rabbit_mgmt_test_util.erl",
"src/stream_test_utils.erl",
],
hdrs = [":public_and_private_hdrs"],
app_name = "rabbitmq_ct_helpers",
dest = "test",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/rabbit_common:erlang_app", "@proper//:erlang_app"],
deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbit_common:erlang_app", "@proper//:erlang_app"],
)

def all_srcs(name = "all_srcs"):
Expand Down Expand Up @@ -107,6 +109,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_ct_proper_helpers.erl",
"src/rabbit_ct_vm_helpers.erl",
"src/rabbit_mgmt_test_util.erl",
"src/stream_test_utils.erl",
],
)

Expand Down
137 changes: 137 additions & 0 deletions deps/rabbitmq_ct_helpers/src/stream_test_utils.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

%% There is no open source Erlang RabbitMQ Stream client.
%% Therefore, we have to build the Stream protocol commands manually.

-module(stream_test_utils).

-compile([export_all, nowarn_export_all]).

-include_lib("amqp10_common/include/amqp10_framing.hrl").

-define(RESPONSE_CODE_OK, 1).

connect(Config, Node) ->
StreamPort = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_stream),
{ok, Sock} = gen_tcp:connect("localhost", StreamPort, [{active, false}, {mode, binary}]),

C0 = rabbit_stream_core:init(0),
PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties, #{}}}),
ok = gen_tcp:send(Sock, PeerPropertiesFrame),
{{response, 1, {peer_properties, _, _}}, C1} = receive_stream_commands(Sock, C0),

ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 1, sasl_handshake})),
{{response, _, {sasl_handshake, _, _}}, C2} = receive_stream_commands(Sock, C1),
Username = <<"guest">>,
Password = <<"guest">>,
Null = 0,
PlainSasl = <<Null:8, Username/binary, Null:8, Password/binary>>,
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 2, {sasl_authenticate, <<"PLAIN">>, PlainSasl}})),
{{response, 2, {sasl_authenticate, _}}, C3} = receive_stream_commands(Sock, C2),
{{tune, DefaultFrameMax, _}, C4} = receive_stream_commands(Sock, C3),

ok = gen_tcp:send(Sock, rabbit_stream_core:frame({response, 0, {tune, DefaultFrameMax, 0}})),
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 3, {open, <<"/">>}})),
{{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(Sock, C4),
{ok, Sock, C5}.

create_stream(Sock, C0, Stream) ->
CreateStreamFrame = rabbit_stream_core:frame({request, 1, {create_stream, Stream, #{}}}),
ok = gen_tcp:send(Sock, CreateStreamFrame),
{{response, 1, {create_stream, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
{ok, C1}.

declare_publisher(Sock, C0, Stream, PublisherId) ->
DeclarePublisherFrame = rabbit_stream_core:frame({request, 1, {declare_publisher, PublisherId, <<>>, Stream}}),
ok = gen_tcp:send(Sock, DeclarePublisherFrame),
{{response, 1, {declare_publisher, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
{ok, C1}.

subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit) ->
SubscribeFrame = rabbit_stream_core:frame({request, 1, {subscribe, SubscriptionId, Stream, _OffsetSpec = first, InitialCredit, _Props = #{}}}),
ok = gen_tcp:send(Sock, SubscribeFrame),
{{response, 1, {subscribe, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
{ok, C1}.

publish(Sock, C0, PublisherId, Sequence0, Payloads) ->
SeqIds = lists:seq(Sequence0, Sequence0 + length(Payloads) - 1),
Messages = [simple_entry(Seq, P)
|| {Seq, P} <- lists:zip(SeqIds, Payloads)],
{ok, SeqIds, C1} = publish_entries(Sock, C0, PublisherId, length(Messages), Messages),
{ok, C1}.

publish_entries(Sock, C0, PublisherId, MsgCount, Messages) ->
PublishFrame1 = rabbit_stream_core:frame({publish, PublisherId, MsgCount, Messages}),
ok = gen_tcp:send(Sock, PublishFrame1),
{{publish_confirm, PublisherId, SeqIds}, C1} = receive_stream_commands(Sock, C0),
{ok, SeqIds, C1}.

%% Streams contain AMQP 1.0 encoded messages.
%% In this case, the AMQP 1.0 encoded message contains a single data section.
simple_entry(Sequence, Body)
when is_binary(Body) ->
DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
DataSectSize = byte_size(DataSect),
<<Sequence:64, 0:1, DataSectSize:31, DataSect:DataSectSize/binary>>.

%% Streams contain AMQP 1.0 encoded messages.
%% In this case, the AMQP 1.0 encoded message consists of an application-properties section and a data section.
simple_entry(Sequence, Body, AppProps)
when is_binary(Body) ->
AppPropsSect = iolist_to_binary(amqp10_framing:encode_bin(AppProps)),
DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
Sects = <<AppPropsSect/binary, DataSect/binary>>,
SectSize = byte_size(Sects),
<<Sequence:64, 0:1, SectSize:31, Sects:SectSize/binary>>.

%% Here, each AMQP 1.0 encoded message consists of an application-properties section and a data section.
%% All data sections are delivered uncompressed in 1 batch.
sub_batch_entry_uncompressed(Sequence, Bodies) ->
Batch = lists:foldl(fun(Body, Acc) ->
AppProps = #'v1_0.application_properties'{
content = [{{utf8, <<"my key">>}, {utf8, <<"my value">>}}]},
Sect0 = iolist_to_binary(amqp10_framing:encode_bin(AppProps)),
Sect1 = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
Sect = <<Sect0/binary, Sect1/binary>>,
<<Acc/binary, 0:1, (byte_size(Sect)):31, Sect/binary>>
end, <<>>, Bodies),
Size = byte_size(Batch),
<<Sequence:64, 1:1, 0:3, 0:4, (length(Bodies)):16, Size:32, Size:32, Batch:Size/binary>>.

%% Here, each AMQP 1.0 encoded message contains a single data section.
%% All data sections are delivered in 1 gzip compressed batch.
sub_batch_entry_compressed(Sequence, Bodies) ->
Uncompressed = lists:foldl(fun(Body, Acc) ->
Bin = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
<<Acc/binary, Bin/binary>>
end, <<>>, Bodies),
Compressed = zlib:gzip(Uncompressed),
CompressedLen = byte_size(Compressed),
<<Sequence:64, 1:1, 1:3, 0:4, (length(Bodies)):16, (byte_size(Uncompressed)):32,
CompressedLen:32, Compressed:CompressedLen/binary>>.

receive_stream_commands(Sock, C0) ->
case rabbit_stream_core:next_command(C0) of
empty ->
case gen_tcp:recv(Sock, 0, 5000) of
{ok, Data} ->
C1 = rabbit_stream_core:incoming_data(Data, C0),
case rabbit_stream_core:next_command(C1) of
empty ->
{ok, Data2} = gen_tcp:recv(Sock, 0, 5000),
rabbit_stream_core:next_command(
rabbit_stream_core:incoming_data(Data2, C1));
Res ->
Res
end;
{error, Err} ->
ct:fail("error receiving stream data ~w", [Err])
end;
Res ->
Res
end.
21 changes: 19 additions & 2 deletions deps/rabbitmq_prometheus/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
load("@rules_erlang//:eunit2.bzl", "eunit")
load("@rules_erlang//:xref2.bzl", "xref")
load("@rules_erlang//:dialyze.bzl", "dialyze", "plt")
load("//:rabbitmq_home.bzl", "rabbitmq_home")
load("//:rabbitmq_run.bzl", "rabbitmq_run")
load(
"//:rabbitmq.bzl",
"RABBITMQ_DIALYZER_OPTS",
"assert_suites",
"broker_for_integration_suites",
"rabbitmq_app",
"rabbitmq_integration_suite",
)
Expand Down Expand Up @@ -85,7 +86,19 @@ eunit(
target = ":test_erlang_app",
)

broker_for_integration_suites()
rabbitmq_home(
name = "broker-for-tests-home",
plugins = [
"//deps/rabbit:erlang_app",
"//deps/rabbitmq_stream:erlang_app",
":erlang_app",
],
)

rabbitmq_run(
name = "rabbitmq-for-tests-run",
home = ":broker-for-tests-home",
)

rabbitmq_integration_suite(
name = "config_schema_SUITE",
Expand All @@ -96,6 +109,10 @@ rabbitmq_integration_suite(
name = "rabbit_prometheus_http_SUITE",
size = "medium",
flaky = True,
runtime_deps = [
"//deps/rabbitmq_stream_common:erlang_app",
"//deps/rabbitmq_stream:erlang_app",
],
)

assert_suites()
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_prometheus/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ PROJECT_DESCRIPTION = Prometheus metrics for RabbitMQ
PROJECT_MOD := rabbit_prometheus_app
DEPS = accept cowboy rabbit rabbitmq_management_agent prometheus rabbitmq_web_dispatch
BUILD_DEPS = amqp_client rabbit_common rabbitmq_management
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers eunit_formatters
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers eunit_formatters rabbitmq_stream

EUNIT_OPTS = no_tty, {report, {eunit_progress, [colored, profile]}}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@
{4, undefined, connection_process_reductions_total, counter, "Total number of connection process reductions"}
]},

%% the family name for this metric is stream_consumer_metrics but the real table used for data is rabbit_stream_consumer_created.
{stream_consumer_metrics, [
{2, undefined, stream_consumer_max_offset_lag, gauge, "Current maximum of offset lag of consumers"}
]},

{connection_metrics, [
{2, undefined, connection_incoming_packets_total, counter, "Total number of packets received on a connection", recv_cnt},
{2, undefined, connection_outgoing_packets_total, counter, "Total number of packets sent on a connection", send_cnt},
Expand Down Expand Up @@ -578,6 +583,17 @@ get_data(channel_metrics = Table, false, _) ->
[{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3},
{messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6},
{global_prefetch_count, A7}]}];
get_data(stream_consumer_metrics = MF, false, _) ->
Table = rabbit_stream_consumer_created, %% real table name
try ets:foldl(fun({_, Props}, OldMax) ->
erlang:max(proplists:get_value(offset_lag, Props, 0), OldMax)
end, 0, Table) of
MaxOffsetLag ->
[{MF, MaxOffsetLag}]
catch error:badarg ->
%% rabbitmq_stream plugin is not enabled
[]
end;
get_data(queue_consumer_count = MF, false, VHostsFilter) ->
Table = queue_metrics, %% Real table name
{_, A1} = ets:foldl(fun
Expand Down Expand Up @@ -708,6 +724,22 @@ get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics
end, [], Table);
get_data(queue_consumer_count, true, _) ->
ets:tab2list(queue_metrics);
get_data(stream_consumer_metrics, true, _) ->
Table = rabbit_stream_consumer_created, %% real table name
try ets:foldl(fun({{QueueName, _Pid, _SubId}, Props}, Map0) ->
Value = proplists:get_value(offset_lag, Props, 0),
maps:update_with(
QueueName,
fun(OldMax) -> erlang:max(Value, OldMax) end,
Value,
Map0)
end, #{}, Table) of
Map1 ->
maps:to_list(Map1)
catch error:badarg ->
%% rabbitmq_stream plugin is not enabled
[]
end;
get_data(vhost_status, _, _) ->
[ { #{<<"vhost">> => VHost},
case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
Expand Down
Loading

0 comments on commit 5d8a80d

Please sign in to comment.