Skip to content

Commit

Permalink
Add specific stream protocol counters to track protocol errors
Browse files Browse the repository at this point in the history
  • Loading branch information
dcorbacho committed Jun 29, 2021
1 parent a3e98c2 commit 58e36b6
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 5 deletions.
2 changes: 2 additions & 0 deletions deps/rabbit/include/rabbit_global_counters.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-define(NUM_PROTOCOL_COUNTERS, 8).
-define(NUM_PROTOCOL_QUEUE_TYPE, 8).
18 changes: 14 additions & 4 deletions deps/rabbit/src/rabbit_global_counters.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
-export([
boot_step/0,
init/1,
init/2,
overview/0,
prometheus_format/0,
increase_protocol_counter/3,
messages_received/2,
messages_received_confirm/2,
messages_routed/2,
Expand Down Expand Up @@ -41,6 +43,7 @@
-define(MESSAGES_CONFIRMED, 6).
-define(PUBLISHERS, 7).
-define(CONSUMERS, 8).
%% Note: ?NUM_PROTOCOL_COUNTERS needs to be up-to-date. See include/rabbit_global_counters.hrl
-define(PROTOCOL_COUNTERS,
[
{
Expand Down Expand Up @@ -86,6 +89,7 @@
-define(MESSAGES_GET_EMPTY, 6).
-define(MESSAGES_REDELIVERED, 7).
-define(MESSAGES_ACKNOWLEDGED, 8).
%% Note: ?NUM_PROTOCOL_QUEUE_TYPE_COUNTERS needs to be up-to-date. See include/rabbit_global_counters.hrl
-define(PROTOCOL_QUEUE_TYPE_COUNTERS,
[
{
Expand Down Expand Up @@ -128,14 +132,17 @@ boot_step() ->
init([{protocol, amqp091}, {queue_type, rabbit_quorum_queue}]),
init([{protocol, amqp091}, {queue_type, rabbit_stream_queue}]).

init(Labels = [{protocol, Protocol}, {queue_type, QueueType}]) ->
init(Labels) ->
init(Labels, []).

init(Labels = [{protocol, Protocol}, {queue_type, QueueType}], Extra) ->
_ = seshat_counters:new_group(?MODULE),
Counters = seshat_counters:new(?MODULE, Labels, ?PROTOCOL_QUEUE_TYPE_COUNTERS),
Counters = seshat_counters:new(?MODULE, Labels, ?PROTOCOL_QUEUE_TYPE_COUNTERS ++ Extra),
persistent_term:put({?MODULE, Protocol, QueueType}, Counters),
ok;
init(Labels = [{protocol, Protocol}]) ->
init(Labels = [{protocol, Protocol}], Extra) ->
_ = seshat_counters:new_group(?MODULE),
Counters = seshat_counters:new(?MODULE, Labels, ?PROTOCOL_COUNTERS),
Counters = seshat_counters:new(?MODULE, Labels, ?PROTOCOL_COUNTERS ++ Extra),
persistent_term:put({?MODULE, Protocol}, Counters),
ok.

Expand All @@ -145,6 +152,9 @@ overview() ->
prometheus_format() ->
seshat_counters:prometheus_format(?MODULE).

increase_protocol_counter(Protocol, Counter, Num) ->
counters:add(fetch(Protocol), Counter, Num).

messages_received(Protocol, Num) ->
counters:add(fetch(Protocol), ?MESSAGES_RECEIVED, Num).

Expand Down
92 changes: 92 additions & 0 deletions deps/rabbitmq_stream/include/rabbit_stream_metrics.hrl
Original file line number Diff line number Diff line change
@@ -1,2 +1,94 @@
-include_lib("rabbit/include/rabbit_global_counters.hrl").

-define(TABLE_CONSUMER, rabbit_stream_consumer_created).
-define(TABLE_PUBLISHER, rabbit_stream_publisher_created).

-define(STREAM_DOES_NOT_EXIST, ?NUM_PROTOCOL_COUNTERS + 1).
-define(SUBSCRIPTION_ID_ALREADY_EXISTS, ?NUM_PROTOCOL_COUNTERS + 2).
-define(SUBSCRIPTION_ID_DOES_NOT_EXIST, ?NUM_PROTOCOL_COUNTERS + 3).
-define(STREAM_ALREADY_EXISTS, ?NUM_PROTOCOL_COUNTERS + 4).
-define(STREAM_NOT_AVAILABLE, ?NUM_PROTOCOL_COUNTERS + 5).
-define(SASL_MECHANISM_NOT_SUPPORTED, ?NUM_PROTOCOL_COUNTERS + 6).
-define(AUTHENTICATION_FAILURE, ?NUM_PROTOCOL_COUNTERS + 7).
-define(SASL_ERROR, ?NUM_PROTOCOL_COUNTERS + 8).
-define(SASL_CHALLENGE, ?NUM_PROTOCOL_COUNTERS + 9).
-define(SASL_AUTHENTICATION_FAILURE_LOOPBACK, ?NUM_PROTOCOL_COUNTERS + 10).
-define(VHOST_ACCESS_FAILURE, ?NUM_PROTOCOL_COUNTERS + 11).
-define(UNKNOWN_FRAME, ?NUM_PROTOCOL_COUNTERS + 12).
-define(FRAME_TOO_LARGE, ?NUM_PROTOCOL_COUNTERS + 13).
-define(INTERNAL_ERROR, ?NUM_PROTOCOL_COUNTERS + 14).
-define(ACCESS_REFUSED, ?NUM_PROTOCOL_COUNTERS + 15).
-define(PRECONDITION_FAILED, ?NUM_PROTOCOL_COUNTERS + 16).
-define(PUBLISHER_DOES_NOT_EXIST, ?NUM_PROTOCOL_COUNTERS + 17).

-define(PROTOCOL_COUNTERS,
[
{
stream_error_stream_does_not_exist, ?STREAM_DOES_NOT_EXIST, counter,
""
},
{
stream_error_subscription_id_already_exists, ?SUBSCRIPTION_ID_ALREADY_EXISTS, counter,
""
},
{
stream_error_subscription_id_does_not_exist, ?SUBSCRIPTION_ID_DOES_NOT_EXIST, counter,
""
},
{
stream_error_stream_already_exists, ?STREAM_ALREADY_EXISTS, counter,
""
},
{
stream_error_stream_not_available, ?STREAM_NOT_AVAILABLE, counter,
""
},
{
stream_error_sasl_mechanism_not_supported, ?SASL_MECHANISM_NOT_SUPPORTED, counter,
""
},
{
stream_error_authentication_failure, ?AUTHENTICATION_FAILURE, counter,
""
},
{
stream_error_sasl_error, ?SASL_ERROR, counter,
""
},
{
stream_error_sasl_challenge, ?SASL_CHALLENGE, counter,
""
},
{
stream_error_sasl_authentication_failure_loopback, ?SASL_AUTHENTICATION_FAILURE_LOOPBACK, counter,
""
},
{
stream_error_vhost_access_failure, ?VHOST_ACCESS_FAILURE, counter,
""
},
{
stream_error_unknown_frame, ?UNKNOWN_FRAME, counter,
""
},
{
stream_error_frame_too_large, ?FRAME_TOO_LARGE, counter,
""
},
{
stream_error_internal_error, ?INTERNAL_ERROR, counter,
""
},
{
stream_error_access_refused, ?ACCESS_REFUSED, counter,
""
},
{
stream_error_precondition_failed, ?PRECONDITION_FAILED, counter,
""
},
{
stream_error_publisher_does_not_exist, ?PUBLISHER_DOES_NOT_EXIST, counter,
""
}
]).
3 changes: 2 additions & 1 deletion deps/rabbitmq_stream/src/rabbit_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@

-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
-include("rabbit_stream_metrics.hrl").

start(_Type, _Args) ->
rabbit_stream_metrics:init(),
rabbit_global_counters:init([{protocol, stream}]),
rabbit_global_counters:init([{protocol, stream}], ?PROTOCOL_COUNTERS),
rabbit_global_counters:init([{protocol, stream}, {queue_type, ?STREAM_QUEUE_TYPE}]),
rabbit_stream_sup:start_link().

Expand Down
Loading

0 comments on commit 58e36b6

Please sign in to comment.