Skip to content

Commit

Permalink
Feat: add topic metrics to on_deliver and on_delivery_complete events (
Browse files Browse the repository at this point in the history
…#42)

* feat: add topic metrics for on deliver and complete events

* chore: add test cases

* fix: topic metric increment for undefined label value

* ref: add acl name to vmq_msg for metrics increment

* chore: add spec to incr_matched_topic

* chore: lint and code formatting

* ref: move incr_matched_topic fn to vmq_metrics

* ref: fn call for incr_topic_counter from same module

* ref: unnecessary return value
  • Loading branch information
VivekPipaliya23 authored Feb 16, 2024
1 parent 247824b commit 4dbb030
Show file tree
Hide file tree
Showing 22 changed files with 2,052 additions and 284 deletions.
3 changes: 2 additions & 1 deletion apps/vmq_commons/src/vmq_types_common.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
undefined
| msg_expiry_ts(),
non_retry = false :: flag(),
non_persistence = false :: flag()
non_persistence = false :: flag(),
acl_name :: binary() | 'undefined'
}).
-type msg() :: #vmq_msg{}.
-record(matched_acl, {
Expand Down
4 changes: 2 additions & 2 deletions apps/vmq_diversity/src/vmq_diversity.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
{vmq_diversity_plugin, on_publish, 7, []},
{vmq_diversity_plugin, on_subscribe, 3, []},
{vmq_diversity_plugin, on_unsubscribe, 3, []},
{vmq_diversity_plugin, on_deliver, 6, []},
{vmq_diversity_plugin, on_delivery_complete, 6, []},
{vmq_diversity_plugin, on_deliver, 7, []},
{vmq_diversity_plugin, on_delivery_complete, 7, []},

{vmq_diversity_plugin, auth_on_register_m5, 6, []},
{vmq_diversity_plugin, auth_on_publish_m5, 7, []},
Expand Down
8 changes: 4 additions & 4 deletions apps/vmq_diversity/src/vmq_diversity_plugin.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@
on_publish/7,
on_subscribe/3,
on_unsubscribe/3,
on_deliver/6,
on_deliver/7,
on_offline_message/5,
on_client_wakeup/1,
on_client_offline/2,
on_client_gone/2,
on_session_expired/1,
on_delivery_complete/6,
on_delivery_complete/7,
auth_on_register_m5/6,
on_register_m5/4,
auth_on_publish_m5/7,
Expand Down Expand Up @@ -582,7 +582,7 @@ on_unsubscribe(UserName, SubscriberId, Topics) ->
])
end.

on_deliver(UserName, SubscriberId, QoS, Topic, Payload, IsRetain) ->
on_deliver(UserName, SubscriberId, QoS, Topic, Payload, IsRetain, _) ->
{MP, ClientId} = subscriber_id(SubscriberId),
all_till_ok(on_deliver, [
{username, nilify(UserName)},
Expand All @@ -594,7 +594,7 @@ on_deliver(UserName, SubscriberId, QoS, Topic, Payload, IsRetain) ->
{retain, IsRetain}
]).

on_delivery_complete(UserName, SubscriberId, QoS, Topic, Payload, IsRetain) ->
on_delivery_complete(UserName, SubscriberId, QoS, Topic, Payload, IsRetain, _) ->
{MP, ClientId} = subscriber_id(SubscriberId),
all(on_delivery_complete, [
{username, nilify(UserName)},
Expand Down
2 changes: 1 addition & 1 deletion apps/vmq_diversity/test/vmq_diversity_plugin_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ on_unsubscribe_test(_) ->

on_deliver_test(_) ->
ok = vmq_plugin:all_till_ok(on_deliver,
[username(), allowed_subscriber_id(), 1, topic(), payload(), false]).
[username(), allowed_subscriber_id(), 1, topic(), payload(), false, matched_acl()]).

on_offline_message_test(_) ->
[next] = vmq_plugin:all(on_offline_message, [allowed_subscriber_id(), 2,
Expand Down
18 changes: 1 addition & 17 deletions apps/vmq_enhanced_auth/src/vmq_enhanced_auth.erl
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ match(TIn, T, Tbl, Type, Key, Qos) ->
},
{true, MatchedAcl};
[{_, _, Label}] ->
incr_matched_topic(Label, Type, Qos),
vmq_metrics:incr_matched_topic(Label, Type, Qos),
MatchedAcl = #matched_acl{
name = Label, pattern = iolist_to_binary(vmq_topic:unword(T))
},
Expand All @@ -381,22 +381,6 @@ match(TIn, T, Tbl, Type, Key, Qos) ->
false
end.

incr_matched_topic(<<>>, _Type, _Qos) ->
ok;
incr_matched_topic(undefined, _Type, _Qos) ->
ok;
incr_matched_topic(Label, Type, Qos) ->
OperationName =
case Type of
read -> subscribe;
write -> publish
end,
_ = vmq_metrics:incr_topic_counter(
{topic_matches, OperationName, [
{acl_matched, Label}, {qos, integer_to_list(Qos)}
]}
).

topic(User, {MP, ClientId}, Topic) ->
subst(list_to_binary(MP), User, ClientId, Topic, []).

Expand Down
4 changes: 2 additions & 2 deletions apps/vmq_events_sidecar/src/vmq_events_sidecar.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
{vmq_events_sidecar_plugin, on_publish, 7, []},
{vmq_events_sidecar_plugin, on_subscribe, 3, []},
{vmq_events_sidecar_plugin, on_unsubscribe, 3, []},
{vmq_events_sidecar_plugin, on_deliver, 6, []},
{vmq_events_sidecar_plugin, on_deliver, 7, []},
{vmq_events_sidecar_plugin, on_offline_message, 5, []},
{vmq_events_sidecar_plugin, on_client_wakeup, 1, []},
{vmq_events_sidecar_plugin, on_client_offline, 2, []},
{vmq_events_sidecar_plugin, on_session_expired, 1, []},
{vmq_events_sidecar_plugin, on_delivery_complete, 6, []},
{vmq_events_sidecar_plugin, on_delivery_complete, 7, []},
{vmq_events_sidecar_plugin, on_client_gone, 2, []}
]}
]},
Expand Down
24 changes: 20 additions & 4 deletions apps/vmq_events_sidecar/src/vmq_events_sidecar_format.erl
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,12 @@ encode({on_unsubscribe, Timestamp, {MP, ClientId, UserName, Topics}}) ->
timestamp = convert_timestamp(Timestamp)
})
);
encode({on_deliver, Timestamp, {MP, ClientId, UserName, QoS, Topic, Payload, IsRetain}}) ->
encode(
{on_deliver, Timestamp,
{MP, ClientId, UserName, QoS, Topic, Payload, IsRetain, #matched_acl{
name = Name, pattern = Pattern
}}}
) ->
encode_envelope(
"OnDeliver",
on_deliver_pb:encode_msg(#'eventssidecar.v1.OnDeliver'{
Expand All @@ -111,10 +116,18 @@ encode({on_deliver, Timestamp, {MP, ClientId, UserName, QoS, Topic, Payload, IsR
qos = QoS,
is_retain = IsRetain,
payload = Payload,
timestamp = convert_timestamp(Timestamp)
timestamp = convert_timestamp(Timestamp),
matched_acl = #'eventssidecar.v1.MatchedACL'{
name = Name, pattern = Pattern
}
})
);
encode({on_delivery_complete, Timestamp, {MP, ClientId, UserName, QoS, Topic, Payload, IsRetain}}) ->
encode(
{on_delivery_complete, Timestamp,
{MP, ClientId, UserName, QoS, Topic, Payload, IsRetain, #matched_acl{
name = Name, pattern = Pattern
}}}
) ->
encode_envelope(
"OnDeliveryComplete",
on_delivery_complete_pb:encode_msg(#'eventssidecar.v1.OnDeliveryComplete'{
Expand All @@ -125,7 +138,10 @@ encode({on_delivery_complete, Timestamp, {MP, ClientId, UserName, QoS, Topic, Pa
qos = QoS,
is_retain = IsRetain,
payload = Payload,
timestamp = convert_timestamp(Timestamp)
timestamp = convert_timestamp(Timestamp),
matched_acl = #'eventssidecar.v1.MatchedACL'{
name = Name, pattern = Pattern
}
})
);
encode({on_offline_message, Timestamp, {MP, ClientId, QoS, Topic, Payload, IsRetain}}) ->
Expand Down
18 changes: 10 additions & 8 deletions apps/vmq_events_sidecar/src/vmq_events_sidecar_plugin.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
on_publish/7,
on_subscribe/3,
on_unsubscribe/3,
on_deliver/6,
on_deliver/7,
on_offline_message/5,
on_client_wakeup/1,
on_client_offline/2,
on_client_gone/2,
on_session_expired/1,
on_delivery_complete/6
on_delivery_complete/7
]).

%% API
Expand Down Expand Up @@ -290,23 +290,25 @@ on_unsubscribe(UserName, SubscriberId, Topics) ->
{MP, ClientId} = subscriber_id(SubscriberId),
send_event(on_unsubscribe, {MP, ClientId, normalise(UserName), [unword(T) || T <- Topics]}).

-spec on_deliver(username(), subscriber_id(), qos(), topic(), payload(), flag()) ->
-spec on_deliver(username(), subscriber_id(), qos(), topic(), payload(), flag(), matched_acl()) ->
'next' | 'ok' | {'ok', payload() | [on_deliver_hook:msg_modifier()]}.
on_deliver(UserName, SubscriberId, QoS, Topic, Payload, IsRetain) ->
on_deliver(UserName, SubscriberId, QoS, Topic, Payload, IsRetain, MatchedAcl) ->
{MP, ClientId} = subscriber_id(SubscriberId),
send_event(
on_deliver,
{MP, ClientId, normalise(UserName), QoS, unword(Topic), Payload, IsRetain},
{MP, ClientId, normalise(UserName), QoS, unword(Topic), Payload, IsRetain, MatchedAcl},
UserName
).

-spec on_delivery_complete(username(), subscriber_id(), qos(), topic(), payload(), flag()) ->
-spec on_delivery_complete(
username(), subscriber_id(), qos(), topic(), payload(), flag(), matched_acl()
) ->
'next'.
on_delivery_complete(UserName, SubscriberId, QoS, Topic, Payload, IsRetain) ->
on_delivery_complete(UserName, SubscriberId, QoS, Topic, Payload, IsRetain, MatchedAcl) ->
{MP, ClientId} = subscriber_id(SubscriberId),
send_event(
on_delivery_complete,
{MP, ClientId, normalise(UserName), QoS, unword(Topic), Payload, IsRetain}
{MP, ClientId, normalise(UserName), QoS, unword(Topic), Payload, IsRetain, MatchedAcl}
).

-spec on_offline_message(subscriber_id(), qos(), topic(), payload(), flag()) -> 'next'.
Expand Down
7 changes: 5 additions & 2 deletions apps/vmq_events_sidecar/test/events_sidecar_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ on_deliver(#'eventssidecar.v1.OnDeliver'{username = BinPid,
qos = 1,
topic = ?TOPIC,
payload = ?PAYLOAD,
is_retain = false}) ->
is_retain = false,
matched_acl = #'eventssidecar.v1.MatchedACL'{name = ?LABEL, pattern = ?PATTERN}
}) ->
Pid = list_to_pid(binary_to_list(BinPid)),
Pid ! on_deliver_ok.

Expand All @@ -149,7 +151,8 @@ on_delivery_complete(#'eventssidecar.v1.OnDeliveryComplete'{username = BinPid,
qos = 1,
topic = ?TOPIC,
payload = ?PAYLOAD,
is_retain = false}) ->
is_retain = false,
matched_acl = #'eventssidecar.v1.MatchedACL'{name = ?LABEL, pattern = ?PATTERN}}) ->
Pid = list_to_pid(binary_to_list(BinPid)),
Pid ! on_delivery_complete_ok.

Expand Down
4 changes: 2 additions & 2 deletions apps/vmq_events_sidecar/test/vmq_events_sidecar_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,14 @@ on_deliver_test(_) ->
enable_hook(on_deliver),
Self = pid_to_bin(self()),
ok = vmq_plugin:all_till_ok(on_deliver,
[Self, {?MOUNTPOINT, ?ALLOWED_CLIENT_ID}, 1, ?TOPIC, ?PAYLOAD, false]),
[Self, {?MOUNTPOINT, ?ALLOWED_CLIENT_ID}, 1, ?TOPIC, ?PAYLOAD, false, #matched_acl{name = ?LABEL, pattern = ?PATTERN}]),
ok = exp_response(on_deliver_ok),
disable_hook(on_deliver).

on_delivery_complete_test(_) ->
enable_hook(on_delivery_complete),
Self = pid_to_bin(self()),
[ok] = vmq_plugin:all(on_delivery_complete,[Self, {?MOUNTPOINT, ?ALLOWED_CLIENT_ID}, 1, ?TOPIC, ?PAYLOAD, false]),
[ok] = vmq_plugin:all(on_delivery_complete,[Self, {?MOUNTPOINT, ?ALLOWED_CLIENT_ID}, 1, ?TOPIC, ?PAYLOAD, false, #matched_acl{name = ?LABEL, pattern = ?PATTERN}]),
ok = exp_response(on_delivery_complete_ok),
disable_hook(on_delivery_complete).

Expand Down
20 changes: 17 additions & 3 deletions apps/vmq_proto/include/on_deliver_pb.hrl
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
%% -*- coding: utf-8 -*-
%% Automatically generated, do not edit
%% Generated by gpb_compile version 4.19.1
%% Generated by gpb_compile version 4.20.0

-ifndef(on_deliver_pb).
-define(on_deliver_pb, true).

-define(on_deliver_pb_gpb_version, "4.19.1").
-define(on_deliver_pb_gpb_version, "4.20.0").

-ifndef('EVENTSSIDECAR.V1.ONDELIVER_PB_H').
-define('EVENTSSIDECAR.V1.ONDELIVER_PB_H', true).
Expand All @@ -26,7 +26,9 @@
% = 7, optional
is_retain = false :: boolean() | 0 | 1 | undefined,
% = 8, optional
payload = <<>> :: iodata() | undefined
payload = <<>> :: iodata() | undefined,
% = 9, optional
matched_acl = undefined :: on_deliver_pb:'eventssidecar.v1.MatchedACL'() | undefined
}
).
-endif.
Expand All @@ -43,4 +45,16 @@
).
-endif.

-ifndef('EVENTSSIDECAR.V1.MATCHEDACL_PB_H').
-define('EVENTSSIDECAR.V1.MATCHEDACL_PB_H', true).
-record('eventssidecar.v1.MatchedACL',
% = 1, optional
{
name = <<>> :: unicode:chardata() | undefined,
% = 2, optional
pattern = <<>> :: unicode:chardata() | undefined
}
).
-endif.

-endif.
21 changes: 18 additions & 3 deletions apps/vmq_proto/include/on_delivery_complete_pb.hrl
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
%% -*- coding: utf-8 -*-
%% Automatically generated, do not edit
%% Generated by gpb_compile version 4.19.1
%% Generated by gpb_compile version 4.20.0

-ifndef(on_delivery_complete_pb).
-define(on_delivery_complete_pb, true).

-define(on_delivery_complete_pb_gpb_version, "4.19.1").
-define(on_delivery_complete_pb_gpb_version, "4.20.0").

-ifndef('EVENTSSIDECAR.V1.ONDELIVERYCOMPLETE_PB_H').
-define('EVENTSSIDECAR.V1.ONDELIVERYCOMPLETE_PB_H', true).
Expand All @@ -26,7 +26,10 @@
% = 7, optional
is_retain = false :: boolean() | 0 | 1 | undefined,
% = 8, optional
payload = <<>> :: iodata() | undefined
payload = <<>> :: iodata() | undefined,
% = 9, optional
matched_acl = undefined ::
on_delivery_complete_pb:'eventssidecar.v1.MatchedACL'() | undefined
}
).
-endif.
Expand All @@ -43,4 +46,16 @@
).
-endif.

-ifndef('EVENTSSIDECAR.V1.MATCHEDACL_PB_H').
-define('EVENTSSIDECAR.V1.MATCHEDACL_PB_H', true).
-record('eventssidecar.v1.MatchedACL',
% = 1, optional
{
name = <<>> :: unicode:chardata() | undefined,
% = 2, optional
pattern = <<>> :: unicode:chardata() | undefined
}
).
-endif.

-endif.
2 changes: 2 additions & 0 deletions apps/vmq_proto/proto/on_deliver.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
syntax = "proto3";

import "google/protobuf/timestamp.proto";
import "matched_acl.proto";

option go_package = "source.golabs.io/courier/apis-go/eventssidecar/v1";

Expand All @@ -15,4 +16,5 @@ message OnDeliver {
int32 qos = 6;
bool is_retain = 7;
bytes payload = 8;
MatchedACL matched_acl = 9;
}
2 changes: 2 additions & 0 deletions apps/vmq_proto/proto/on_delivery_complete.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
syntax = "proto3";

import "google/protobuf/timestamp.proto";
import "matched_acl.proto";

option go_package = "source.golabs.io/courier/apis-go/eventssidecar/v1";

Expand All @@ -15,4 +16,5 @@ message OnDeliveryComplete {
int32 qos = 6;
bool is_retain = 7;
bytes payload = 8;
MatchedACL matched_acl = 9;
}
Loading

0 comments on commit 4dbb030

Please sign in to comment.