diff --git a/deps/rabbit/include/rabbit_global_counters.hrl b/deps/rabbit/include/rabbit_global_counters.hrl new file mode 100644 index 000000000000..f4eac1268eb7 --- /dev/null +++ b/deps/rabbit/include/rabbit_global_counters.hrl @@ -0,0 +1,2 @@ +-define(NUM_PROTOCOL_COUNTERS, 8). +-define(NUM_PROTOCOL_QUEUE_TYPE, 8). diff --git a/deps/rabbit/src/rabbit_global_counters.erl b/deps/rabbit/src/rabbit_global_counters.erl index 70b82c88f638..2689cca0e79e 100644 --- a/deps/rabbit/src/rabbit_global_counters.erl +++ b/deps/rabbit/src/rabbit_global_counters.erl @@ -10,8 +10,10 @@ -export([ boot_step/0, init/1, + init/2, overview/0, prometheus_format/0, + increase_protocol_counter/3, messages_received/2, messages_received_confirm/2, messages_routed/2, @@ -41,6 +43,7 @@ -define(MESSAGES_CONFIRMED, 6). -define(PUBLISHERS, 7). -define(CONSUMERS, 8). +%% Note: ?NUM_PROTOCOL_COUNTERS needs to be up-to-date. See include/rabbit_global_counters.hrl -define(PROTOCOL_COUNTERS, [ { @@ -86,6 +89,7 @@ -define(MESSAGES_GET_EMPTY, 6). -define(MESSAGES_REDELIVERED, 7). -define(MESSAGES_ACKNOWLEDGED, 8). +%% Note: ?NUM_PROTOCOL_QUEUE_TYPE_COUNTERS needs to be up-to-date. See include/rabbit_global_counters.hrl -define(PROTOCOL_QUEUE_TYPE_COUNTERS, [ { @@ -128,14 +132,17 @@ boot_step() -> init([{protocol, amqp091}, {queue_type, rabbit_quorum_queue}]), init([{protocol, amqp091}, {queue_type, rabbit_stream_queue}]). -init(Labels = [{protocol, Protocol}, {queue_type, QueueType}]) -> +init(Labels) -> + init(Labels, []). + +init(Labels = [{protocol, Protocol}, {queue_type, QueueType}], Extra) -> _ = seshat_counters:new_group(?MODULE), - Counters = seshat_counters:new(?MODULE, Labels, ?PROTOCOL_QUEUE_TYPE_COUNTERS), + Counters = seshat_counters:new(?MODULE, Labels, ?PROTOCOL_QUEUE_TYPE_COUNTERS ++ Extra), persistent_term:put({?MODULE, Protocol, QueueType}, Counters), ok; -init(Labels = [{protocol, Protocol}]) -> +init(Labels = [{protocol, Protocol}], Extra) -> _ = seshat_counters:new_group(?MODULE), - Counters = seshat_counters:new(?MODULE, Labels, ?PROTOCOL_COUNTERS), + Counters = seshat_counters:new(?MODULE, Labels, ?PROTOCOL_COUNTERS ++ Extra), persistent_term:put({?MODULE, Protocol}, Counters), ok. @@ -145,6 +152,9 @@ overview() -> prometheus_format() -> seshat_counters:prometheus_format(?MODULE). +increase_protocol_counter(Protocol, Counter, Num) -> + counters:add(fetch(Protocol), Counter, Num). + messages_received(Protocol, Num) -> counters:add(fetch(Protocol), ?MESSAGES_RECEIVED, Num). diff --git a/deps/rabbitmq_prometheus/metrics.md b/deps/rabbitmq_prometheus/metrics.md index f3063628148b..8a8a64390964 100644 --- a/deps/rabbitmq_prometheus/metrics.md +++ b/deps/rabbitmq_prometheus/metrics.md @@ -67,6 +67,31 @@ To generate these: | rabbitmq_global_publishers | Publishers currently connected | | rabbitmq_global_consumers | Consumers currently connected | +#### Stream global counters + +These metrics are specific to the stream protocol. + +| Metric | Description | +| --- | --- | +| stream_error_stream_does_not_exist_total | Total number of commands rejected with stream does not exist error | +| stream_error_subscription_id_already_exists_total | Total number of commands failed with subscription id already exists | +| stream_error_subscription_id_does_not_exist_total | Total number of commands failed with subscription id does not exist | +| stream_error_stream_already_exists_total | Total number of commands failed with stream already exists | +| stream_error_stream_not_available_total | Total number of commands failed with stream not available | +| stream_error_sasl_mechanism_not_supported_total | Total number of commands failed with sasl mechanism not supported | +| stream_error_authentication_failure_total | Total number of commands failed with authentication failure | +| stream_error_sasl_error_total | Total number of commands failed with sasl error | +| stream_error_sasl_challenge_total | Total number of commands failed with sasl challenge | +| stream_error_sasl_authentication_failure_loopback_total | Total number of commands failed with sasl authentication failure loopback | +| stream_error_vhost_access_failure_total | Total number of commands failed with vhost access failure | +| stream_error_unknown_frame_total | Total number of commands failed with unknown frame | +| stream_error_frame_too_large_total | Total number of commands failed with frame too large | +| stream_error_internal_error_total | Total number of commands failed with internal error | +| stream_error_access_refused_total | Total number of commands failed with access refused | +| stream_error_precondition_failed_total | Total number of commands failed with precondition failed | +| stream_error_publisher_does_not_exist_total | Total number of commands failed with publisher does not exist | + + ### Generic | Metric | Description | diff --git a/deps/rabbitmq_stream/BUILD.bazel b/deps/rabbitmq_stream/BUILD.bazel index 4eee9deaf1c8..37af0e87fc59 100644 --- a/deps/rabbitmq_stream/BUILD.bazel +++ b/deps/rabbitmq_stream/BUILD.bazel @@ -41,9 +41,6 @@ BUILD_DEPS = [ DEPS = [ "//deps/rabbitmq_stream_common:bazel_erlang_lib", -] - -RUNTIME_DEPS = [ "//deps/rabbit:bazel_erlang_lib", ] @@ -53,7 +50,6 @@ rabbitmq_lib( app_module = APP_MODULE, app_name = APP_NAME, build_deps = BUILD_DEPS, - runtime_deps = RUNTIME_DEPS, deps = DEPS, ) @@ -105,6 +101,7 @@ rabbitmq_integration_suite( }, }, deps = [ + "//deps/rabbit:bazel_erlang_lib", "//deps/rabbitmq_stream_common:bazel_erlang_lib", ], ) diff --git a/deps/rabbitmq_stream/include/rabbit_stream_metrics.hrl b/deps/rabbitmq_stream/include/rabbit_stream_metrics.hrl index 8fe3ac7c0eae..066962c4def3 100644 --- a/deps/rabbitmq_stream/include/rabbit_stream_metrics.hrl +++ b/deps/rabbitmq_stream/include/rabbit_stream_metrics.hrl @@ -1,2 +1,94 @@ +-include_lib("rabbit/include/rabbit_global_counters.hrl"). + -define(TABLE_CONSUMER, rabbit_stream_consumer_created). -define(TABLE_PUBLISHER, rabbit_stream_publisher_created). + +-define(STREAM_DOES_NOT_EXIST, ?NUM_PROTOCOL_COUNTERS + 1). +-define(SUBSCRIPTION_ID_ALREADY_EXISTS, ?NUM_PROTOCOL_COUNTERS + 2). +-define(SUBSCRIPTION_ID_DOES_NOT_EXIST, ?NUM_PROTOCOL_COUNTERS + 3). +-define(STREAM_ALREADY_EXISTS, ?NUM_PROTOCOL_COUNTERS + 4). +-define(STREAM_NOT_AVAILABLE, ?NUM_PROTOCOL_COUNTERS + 5). +-define(SASL_MECHANISM_NOT_SUPPORTED, ?NUM_PROTOCOL_COUNTERS + 6). +-define(AUTHENTICATION_FAILURE, ?NUM_PROTOCOL_COUNTERS + 7). +-define(SASL_ERROR, ?NUM_PROTOCOL_COUNTERS + 8). +-define(SASL_CHALLENGE, ?NUM_PROTOCOL_COUNTERS + 9). +-define(SASL_AUTHENTICATION_FAILURE_LOOPBACK, ?NUM_PROTOCOL_COUNTERS + 10). +-define(VHOST_ACCESS_FAILURE, ?NUM_PROTOCOL_COUNTERS + 11). +-define(UNKNOWN_FRAME, ?NUM_PROTOCOL_COUNTERS + 12). +-define(FRAME_TOO_LARGE, ?NUM_PROTOCOL_COUNTERS + 13). +-define(INTERNAL_ERROR, ?NUM_PROTOCOL_COUNTERS + 14). +-define(ACCESS_REFUSED, ?NUM_PROTOCOL_COUNTERS + 15). +-define(PRECONDITION_FAILED, ?NUM_PROTOCOL_COUNTERS + 16). +-define(PUBLISHER_DOES_NOT_EXIST, ?NUM_PROTOCOL_COUNTERS + 17). + +-define(PROTOCOL_COUNTERS, + [ + { + stream_error_stream_does_not_exist_total, ?STREAM_DOES_NOT_EXIST, counter, + "Total number of commands rejected with stream does not exist error" + }, + { + stream_error_subscription_id_already_exists_total, ?SUBSCRIPTION_ID_ALREADY_EXISTS, counter, + "Total number of commands failed with subscription id already exists" + }, + { + stream_error_subscription_id_does_not_exist_total, ?SUBSCRIPTION_ID_DOES_NOT_EXIST, counter, + "Total number of commands failed with subscription id does not exist" + }, + { + stream_error_stream_already_exists_total, ?STREAM_ALREADY_EXISTS, counter, + "Total number of commands failed with stream already exists" + }, + { + stream_error_stream_not_available_total, ?STREAM_NOT_AVAILABLE, counter, + "Total number of commands failed with stream not available" + }, + { + stream_error_sasl_mechanism_not_supported_total, ?SASL_MECHANISM_NOT_SUPPORTED, counter, + "Total number of commands failed with sasl mechanism not supported" + }, + { + stream_error_authentication_failure_total, ?AUTHENTICATION_FAILURE, counter, + "Total number of commands failed with authentication failure" + }, + { + stream_error_sasl_error_total, ?SASL_ERROR, counter, + "Total number of commands failed with sasl error" + }, + { + stream_error_sasl_challenge_total, ?SASL_CHALLENGE, counter, + "Total number of commands failed with sasl challenge" + }, + { + stream_error_sasl_authentication_failure_loopback_total, ?SASL_AUTHENTICATION_FAILURE_LOOPBACK, counter, + "Total number of commands failed with sasl authentication failure loopback" + }, + { + stream_error_vhost_access_failure_total, ?VHOST_ACCESS_FAILURE, counter, + "Total number of commands failed with vhost access failure" + }, + { + stream_error_unknown_frame_total, ?UNKNOWN_FRAME, counter, + "Total number of commands failed with unknown frame" + }, + { + stream_error_frame_too_large_total, ?FRAME_TOO_LARGE, counter, + "Total number of commands failed with frame too large" + }, + { + stream_error_internal_error_total, ?INTERNAL_ERROR, counter, + "Total number of commands failed with internal error" + }, + { + stream_error_access_refused_total, ?ACCESS_REFUSED, counter, + "Total number of commands failed with access refused" + }, + { + stream_error_precondition_failed_total, ?PRECONDITION_FAILED, counter, + "Total number of commands failed with precondition failed" + }, + { + stream_error_publisher_does_not_exist_total, ?PUBLISHER_DOES_NOT_EXIST, counter, + "Total number of commands failed with publisher does not exist" + } + ]). diff --git a/deps/rabbitmq_stream/src/rabbit_stream.erl b/deps/rabbitmq_stream/src/rabbit_stream.erl index e52aca57509a..c50ad318cfe1 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream.erl @@ -34,10 +34,11 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl"). +-include("rabbit_stream_metrics.hrl"). start(_Type, _Args) -> rabbit_stream_metrics:init(), - rabbit_global_counters:init([{protocol, stream}]), + rabbit_global_counters:init([{protocol, stream}], ?PROTOCOL_COUNTERS), rabbit_global_counters:init([{protocol, stream}, {queue_type, ?STREAM_QUEUE_TYPE}]), rabbit_stream_sup:start_link(). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 47c0ece7ab1a..2a61bbcdb79f 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -20,6 +20,7 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl"). +-include("rabbit_stream_metrics.hrl"). -type stream() :: binary(). -type publisher_id() :: byte(). @@ -679,6 +680,7 @@ open(info, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE}, Frame = rabbit_stream_core:frame(Command), send(Transport, S, Frame), + rabbit_global_counters:increase_protocol_counter(stream, ?STREAM_NOT_AVAILABLE, 1), {NewConnection, NewState}; {not_cleaned, SameConnection, SameState} -> {SameConnection, SameState} @@ -1351,6 +1353,7 @@ handle_frame_post_auth(Transport, declare_publisher, CorrelationId, ?RESPONSE_CODE_PRECONDITION_FAILED), + rabbit_global_counters:increase_protocol_counter(stream, ?PRECONDITION_FAILED, 1), {Connection0, State}; handle_frame_post_auth(Transport, #stream_connection{user = User, @@ -1377,6 +1380,7 @@ handle_frame_post_auth(Transport, declare_publisher, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST), + rabbit_global_counters:increase_protocol_counter(stream, ?STREAM_DOES_NOT_EXIST, 1), {Connection0, State}; {ClusterLeader, #stream_connection{publishers = Publishers0, @@ -1423,6 +1427,7 @@ handle_frame_post_auth(Transport, declare_publisher, CorrelationId, ?RESPONSE_CODE_PRECONDITION_FAILED), + rabbit_global_counters:increase_protocol_counter(stream, ?PRECONDITION_FAILED, 1), {Connection0, State} end; error -> @@ -1431,6 +1436,7 @@ handle_frame_post_auth(Transport, declare_publisher, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED), + rabbit_global_counters:increase_protocol_counter(stream, ?ACCESS_REFUSED, 1), {Connection0, State} end; handle_frame_post_auth(Transport, @@ -1475,6 +1481,7 @@ handle_frame_post_auth(Transport, PublishingIds}, Frame = rabbit_stream_core:frame(Command), send(Transport, S, Frame), + rabbit_global_counters:increase_protocol_counter(stream, ?ACCESS_REFUSED, 1), increase_messages_errored(Counters, MessageCount), {Connection, State} end; @@ -1487,6 +1494,7 @@ handle_frame_post_auth(Transport, PublishingIds}, Frame = rabbit_stream_core:frame(Command), send(Transport, S, Frame), + rabbit_global_counters:increase_protocol_counter(stream, ?PUBLISHER_DOES_NOT_EXIST, 1), {Connection, State} end; handle_frame_post_auth(Transport, @@ -1510,6 +1518,7 @@ handle_frame_post_auth(Transport, Stream) of {error, not_found} -> + rabbit_global_counters:increase_protocol_counter(stream, ?STREAM_DOES_NOT_EXIST, 1), {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0}; {ok, LocalMemberPid} -> {?RESPONSE_CODE_OK, @@ -1522,6 +1531,7 @@ handle_frame_post_auth(Transport, end} end; error -> + rabbit_global_counters:increase_protocol_counter(stream, ?ACCESS_REFUSED, 1), {?RESPONSE_CODE_ACCESS_REFUSED, 0} end, Frame = @@ -1564,6 +1574,7 @@ handle_frame_post_auth(Transport, delete_publisher, CorrelationId, ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST), + rabbit_global_counters:increase_protocol_counter(stream, ?PUBLISHER_DOES_NOT_EXIST, 1), {Connection0, State} end; handle_frame_post_auth(Transport, @@ -1600,6 +1611,7 @@ handle_frame_post_auth(Transport, subscribe, CorrelationId, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE), + rabbit_global_counters:increase_protocol_counter(stream, ?STREAM_NOT_AVAILABLE, 1), {Connection, State}; {error, not_found} -> response(Transport, @@ -1607,6 +1619,7 @@ handle_frame_post_auth(Transport, subscribe, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST), + rabbit_global_counters:increase_protocol_counter(stream, ?STREAM_DOES_NOT_EXIST, 1), {Connection, State}; {ok, LocalMemberPid} -> case subscription_exists(StreamSubscriptions, @@ -1618,6 +1631,7 @@ handle_frame_post_auth(Transport, subscribe, CorrelationId, ?RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS), + rabbit_global_counters:increase_protocol_counter(stream, ?SUBSCRIPTION_ID_ALREADY_EXISTS, 1), {Connection, State}; false -> rabbit_log:info("Creating subscription ~p to ~p, with offset specificat" @@ -1722,6 +1736,7 @@ handle_frame_post_auth(Transport, subscribe, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED), + rabbit_global_counters:increase_protocol_counter(stream, ?ACCESS_REFUSED, 1), {Connection, State} end; handle_frame_post_auth(Transport, @@ -1751,6 +1766,7 @@ handle_frame_post_auth(Transport, {credit, Code, SubscriptionId}}), send(Transport, S, Frame), + rabbit_global_counters:increase_protocol_counter(stream, ?SUBSCRIPTION_ID_DOES_NOT_EXIST, 1), {Connection1, State1}; {{segment, Segment1}, {credit, Credit1}} -> Consumer1 = @@ -1770,6 +1786,7 @@ handle_frame_post_auth(Transport, rabbit_stream_core:frame({response, 1, {credit, Code, SubscriptionId}}), send(Transport, S, Frame), + rabbit_global_counters:increase_protocol_counter(stream, ?SUBSCRIPTION_ID_DOES_NOT_EXIST, 1), {Connection, State} end; handle_frame_post_auth(_Transport, @@ -1819,6 +1836,7 @@ handle_frame_post_auth(Transport, ok -> case lookup_leader(Stream, Connection0) of cluster_not_found -> + rabbit_global_counters:increase_protocol_counter(stream, ?STREAM_DOES_NOT_EXIST, 1), {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0, Connection0}; {LeaderPid, C} -> {?RESPONSE_CODE_OK, @@ -1831,6 +1849,7 @@ handle_frame_post_auth(Transport, C} end; error -> + rabbit_global_counters:increase_protocol_counter(stream, ?ACCESS_REFUSED, 1), {?RESPONSE_CODE_ACCESS_REFUSED, 0, Connection0} end, Frame = @@ -1852,6 +1871,7 @@ handle_frame_post_auth(Transport, unsubscribe, CorrelationId, ?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST), + rabbit_global_counters:increase_protocol_counter(stream, ?SUBSCRIPTION_ID_DOES_NOT_EXIST, 1), {Connection, State}; true -> {Connection1, State1} = @@ -1902,6 +1922,7 @@ handle_frame_post_auth(Transport, create_stream, CorrelationId, ?RESPONSE_CODE_PRECONDITION_FAILED), + rabbit_global_counters:increase_protocol_counter(stream, ?PRECONDITION_FAILED, 1), {Connection, State}; {error, reference_already_exists} -> response(Transport, @@ -1909,6 +1930,7 @@ handle_frame_post_auth(Transport, create_stream, CorrelationId, ?RESPONSE_CODE_STREAM_ALREADY_EXISTS), + rabbit_global_counters:increase_protocol_counter(stream, ?STREAM_ALREADY_EXISTS, 1), {Connection, State}; {error, _} -> response(Transport, @@ -1916,6 +1938,7 @@ handle_frame_post_auth(Transport, create_stream, CorrelationId, ?RESPONSE_CODE_INTERNAL_ERROR), + rabbit_global_counters:increase_protocol_counter(stream, ?INTERNAL_ERROR, 1), {Connection, State} end; error -> @@ -1924,6 +1947,7 @@ handle_frame_post_auth(Transport, create_stream, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED), + rabbit_global_counters:increase_protocol_counter(stream, ?ACCESS_REFUSED, 1), {Connection, State} end; _ -> @@ -1932,6 +1956,7 @@ handle_frame_post_auth(Transport, create_stream, CorrelationId, ?RESPONSE_CODE_PRECONDITION_FAILED), + rabbit_global_counters:increase_protocol_counter(stream, ?PRECONDITION_FAILED, 1), {Connection, State} end; handle_frame_post_auth(Transport, @@ -1969,6 +1994,7 @@ handle_frame_post_auth(Transport, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE}, Frame = rabbit_stream_core:frame(Command), send(Transport, S, Frame), + rabbit_global_counters:increase_protocol_counter(stream, ?STREAM_NOT_AVAILABLE, 1), {NewConnection, NewState}; {not_cleaned, SameConnection, SameState} -> {SameConnection, SameState} @@ -1980,6 +2006,7 @@ handle_frame_post_auth(Transport, delete_stream, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST), + rabbit_global_counters:increase_protocol_counter(stream, ?STREAM_DOES_NOT_EXIST, 1), {Connection, State} end; error -> @@ -1988,6 +2015,7 @@ handle_frame_post_auth(Transport, delete_stream, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED), + rabbit_global_counters:increase_protocol_counter(stream, ?ACCESS_REFUSED, 1), {Connection, State} end; handle_frame_post_auth(Transport, @@ -2103,6 +2131,7 @@ handle_frame_post_auth(Transport, {?RESPONSE_CODE_OK, <>}; {error, _} -> + rabbit_global_counters:increase_protocol_counter(stream, ?STREAM_DOES_NOT_EXIST, 1), {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, <<(-1):16>>} end, @@ -2135,6 +2164,7 @@ handle_frame_post_auth(Transport, <>, Streams), {?RESPONSE_CODE_OK, Bin}; {error, _} -> + rabbit_global_counters:increase_protocol_counter(stream, ?STREAM_DOES_NOT_EXIST, 1), {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, <<0:32>>} end, @@ -2175,6 +2205,7 @@ handle_frame_post_auth(Transport, {close, ?RESPONSE_CODE_UNKNOWN_FRAME, CloseReason}}), send(Transport, S, Frame), + rabbit_global_counters:increase_protocol_counter(stream, ?UNKNOWN_FRAME, 1), {Connection#stream_connection{connection_step = close_sent}, State}. notify_connection_closed(#stream_connection{name = Name, diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index cce921ddc743..7d1c6b576ef2 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -41,6 +41,9 @@ groups() -> unauthenticated_client_rejected_tcp_connected, unauthenticated_client_rejected_peer_properties_exchanged, unauthenticated_client_rejected_authenticating]}, + %% Run `test_global_counters` on its own so the global metrics are + %% initialised to 0 for each testcase + {single_node, [], [test_global_counters]}, {cluster, [], [test_stream, test_stream_tls, java]}]. init_per_suite(Config) -> @@ -103,6 +106,38 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(_Test, _Config) -> ok. +test_global_counters(Config) -> + Port = get_stream_port(Config), + test_server(gen_tcp, Port), + ?assertEqual(#{ + publishers => 0, + consumers => 0, + messages_confirmed_total => 2, + messages_received_confirm_total => 2, + messages_received_total => 2, + messages_routed_total => 0, + messages_unroutable_dropped_total => 0, + messages_unroutable_returned_total => 0, + stream_error_access_refused_total => 0, + stream_error_authentication_failure_total => 0, + stream_error_frame_too_large_total => 0, + stream_error_internal_error_total => 0, + stream_error_precondition_failed_total => 0, + stream_error_publisher_does_not_exist_total => 0, + stream_error_sasl_authentication_failure_loopback_total => 0, + stream_error_sasl_challenge_total => 0, + stream_error_sasl_error_total => 0, + stream_error_sasl_mechanism_not_supported_total => 0, + stream_error_stream_already_exists_total => 0, + stream_error_stream_does_not_exist_total => 0, + stream_error_stream_not_available_total => 1, + stream_error_subscription_id_already_exists_total => 0, + stream_error_subscription_id_does_not_exist_total => 0, + stream_error_unknown_frame_total => 0, + stream_error_vhost_access_failure_total => 0 + }, get_global_counters(Config)), + ok. + test_stream(Config) -> Port = get_stream_port(Config), test_server(gen_tcp, Port), @@ -422,3 +457,7 @@ receive_commands(Transport, S, C0) -> Res -> Res end. + +get_global_counters(Config) -> + maps:get([{protocol, stream}], + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_global_counters, overview, [])). diff --git a/deps/rabbitmq_stream_management/BUILD.bazel b/deps/rabbitmq_stream_management/BUILD.bazel index 36563a549d24..87b421eae967 100644 --- a/deps/rabbitmq_stream_management/BUILD.bazel +++ b/deps/rabbitmq_stream_management/BUILD.bazel @@ -25,9 +25,6 @@ BUILD_DEPS = [ DEPS = [ "//deps/rabbitmq_management:bazel_erlang_lib", "//deps/rabbitmq_stream:bazel_erlang_lib", -] - -RUNTIME_DEPS = [ "//deps/rabbit:bazel_erlang_lib", ] @@ -36,7 +33,6 @@ rabbitmq_lib( app_module = APP_MODULE, app_name = APP_NAME, build_deps = BUILD_DEPS, - runtime_deps = RUNTIME_DEPS, deps = DEPS, )