Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: add topic metrics to on_deliver and on_delivery_complete events #42

Merged
3 changes: 2 additions & 1 deletion apps/vmq_commons/src/vmq_types_common.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
undefined
| msg_expiry_ts(),
non_retry = false :: flag(),
non_persistence = false :: flag()
non_persistence = false :: flag(),
acl_name :: binary() | 'undefined'
}).
-type msg() :: #vmq_msg{}.
-record(matched_acl, {
Expand Down
4 changes: 2 additions & 2 deletions apps/vmq_diversity/src/vmq_diversity.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
{vmq_diversity_plugin, on_publish, 7, []},
{vmq_diversity_plugin, on_subscribe, 3, []},
{vmq_diversity_plugin, on_unsubscribe, 3, []},
{vmq_diversity_plugin, on_deliver, 6, []},
{vmq_diversity_plugin, on_delivery_complete, 6, []},
{vmq_diversity_plugin, on_deliver, 7, []},
{vmq_diversity_plugin, on_delivery_complete, 7, []},

{vmq_diversity_plugin, auth_on_register_m5, 6, []},
{vmq_diversity_plugin, auth_on_publish_m5, 7, []},
Expand Down
8 changes: 4 additions & 4 deletions apps/vmq_diversity/src/vmq_diversity_plugin.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@
on_publish/7,
on_subscribe/3,
on_unsubscribe/3,
on_deliver/6,
on_deliver/7,
on_offline_message/5,
on_client_wakeup/1,
on_client_offline/2,
on_client_gone/2,
on_session_expired/1,
on_delivery_complete/6,
on_delivery_complete/7,
auth_on_register_m5/6,
on_register_m5/4,
auth_on_publish_m5/7,
Expand Down Expand Up @@ -582,7 +582,7 @@ on_unsubscribe(UserName, SubscriberId, Topics) ->
])
end.

on_deliver(UserName, SubscriberId, QoS, Topic, Payload, IsRetain) ->
on_deliver(UserName, SubscriberId, QoS, Topic, Payload, IsRetain, _) ->
{MP, ClientId} = subscriber_id(SubscriberId),
all_till_ok(on_deliver, [
{username, nilify(UserName)},
Expand All @@ -594,7 +594,7 @@ on_deliver(UserName, SubscriberId, QoS, Topic, Payload, IsRetain) ->
{retain, IsRetain}
]).

on_delivery_complete(UserName, SubscriberId, QoS, Topic, Payload, IsRetain) ->
on_delivery_complete(UserName, SubscriberId, QoS, Topic, Payload, IsRetain, _) ->
{MP, ClientId} = subscriber_id(SubscriberId),
all(on_delivery_complete, [
{username, nilify(UserName)},
Expand Down
2 changes: 1 addition & 1 deletion apps/vmq_diversity/test/vmq_diversity_plugin_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ on_unsubscribe_test(_) ->

on_deliver_test(_) ->
ok = vmq_plugin:all_till_ok(on_deliver,
[username(), allowed_subscriber_id(), 1, topic(), payload(), false]).
[username(), allowed_subscriber_id(), 1, topic(), payload(), false, matched_acl()]).

on_offline_message_test(_) ->
[next] = vmq_plugin:all(on_offline_message, [allowed_subscriber_id(), 2,
Expand Down
18 changes: 1 addition & 17 deletions apps/vmq_enhanced_auth/src/vmq_enhanced_auth.erl
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ match(TIn, T, Tbl, Type, Key, Qos) ->
},
{true, MatchedAcl};
[{_, _, Label}] ->
incr_matched_topic(Label, Type, Qos),
vmq_mqtt_fsm:incr_matched_topic(Label, Type, Qos),
dhruvjain99 marked this conversation as resolved.
Show resolved Hide resolved
MatchedAcl = #matched_acl{
name = Label, pattern = iolist_to_binary(vmq_topic:unword(T))
},
Expand All @@ -381,22 +381,6 @@ match(TIn, T, Tbl, Type, Key, Qos) ->
false
end.

incr_matched_topic(<<>>, _Type, _Qos) ->
ok;
incr_matched_topic(undefined, _Type, _Qos) ->
ok;
incr_matched_topic(Label, Type, Qos) ->
OperationName =
case Type of
read -> subscribe;
write -> publish
end,
_ = vmq_metrics:incr_topic_counter(
{topic_matches, OperationName, [
{acl_matched, Label}, {qos, integer_to_list(Qos)}
]}
).

topic(User, {MP, ClientId}, Topic) ->
subst(list_to_binary(MP), User, ClientId, Topic, []).

Expand Down
4 changes: 2 additions & 2 deletions apps/vmq_events_sidecar/src/vmq_events_sidecar.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
{vmq_events_sidecar_plugin, on_publish, 7, []},
{vmq_events_sidecar_plugin, on_subscribe, 3, []},
{vmq_events_sidecar_plugin, on_unsubscribe, 3, []},
{vmq_events_sidecar_plugin, on_deliver, 6, []},
{vmq_events_sidecar_plugin, on_deliver, 7, []},
{vmq_events_sidecar_plugin, on_offline_message, 5, []},
{vmq_events_sidecar_plugin, on_client_wakeup, 1, []},
{vmq_events_sidecar_plugin, on_client_offline, 2, []},
{vmq_events_sidecar_plugin, on_session_expired, 1, []},
{vmq_events_sidecar_plugin, on_delivery_complete, 6, []},
{vmq_events_sidecar_plugin, on_delivery_complete, 7, []},
{vmq_events_sidecar_plugin, on_client_gone, 2, []}
]}
]},
Expand Down
24 changes: 20 additions & 4 deletions apps/vmq_events_sidecar/src/vmq_events_sidecar_format.erl
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,12 @@ encode({on_unsubscribe, Timestamp, {MP, ClientId, UserName, Topics}}) ->
timestamp = convert_timestamp(Timestamp)
})
);
encode({on_deliver, Timestamp, {MP, ClientId, UserName, QoS, Topic, Payload, IsRetain}}) ->
encode(
{on_deliver, Timestamp,
{MP, ClientId, UserName, QoS, Topic, Payload, IsRetain, #matched_acl{
name = Name, pattern = Pattern
}}}
) ->
encode_envelope(
"OnDeliver",
on_deliver_pb:encode_msg(#'eventssidecar.v1.OnDeliver'{
Expand All @@ -111,10 +116,18 @@ encode({on_deliver, Timestamp, {MP, ClientId, UserName, QoS, Topic, Payload, IsR
qos = QoS,
is_retain = IsRetain,
payload = Payload,
timestamp = convert_timestamp(Timestamp)
timestamp = convert_timestamp(Timestamp),
matched_acl = #'eventssidecar.v1.MatchedACL'{
name = Name, pattern = Pattern
}
})
);
encode({on_delivery_complete, Timestamp, {MP, ClientId, UserName, QoS, Topic, Payload, IsRetain}}) ->
encode(
{on_delivery_complete, Timestamp,
{MP, ClientId, UserName, QoS, Topic, Payload, IsRetain, #matched_acl{
name = Name, pattern = Pattern
}}}
) ->
encode_envelope(
"OnDeliveryComplete",
on_delivery_complete_pb:encode_msg(#'eventssidecar.v1.OnDeliveryComplete'{
Expand All @@ -125,7 +138,10 @@ encode({on_delivery_complete, Timestamp, {MP, ClientId, UserName, QoS, Topic, Pa
qos = QoS,
is_retain = IsRetain,
payload = Payload,
timestamp = convert_timestamp(Timestamp)
timestamp = convert_timestamp(Timestamp),
matched_acl = #'eventssidecar.v1.MatchedACL'{
name = Name, pattern = Pattern
}
})
);
encode({on_offline_message, Timestamp, {MP, ClientId, QoS, Topic, Payload, IsRetain}}) ->
Expand Down
18 changes: 10 additions & 8 deletions apps/vmq_events_sidecar/src/vmq_events_sidecar_plugin.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
on_publish/7,
on_subscribe/3,
on_unsubscribe/3,
on_deliver/6,
on_deliver/7,
on_offline_message/5,
on_client_wakeup/1,
on_client_offline/2,
on_client_gone/2,
on_session_expired/1,
on_delivery_complete/6
on_delivery_complete/7
]).

%% API
Expand Down Expand Up @@ -290,23 +290,25 @@ on_unsubscribe(UserName, SubscriberId, Topics) ->
{MP, ClientId} = subscriber_id(SubscriberId),
send_event(on_unsubscribe, {MP, ClientId, normalise(UserName), [unword(T) || T <- Topics]}).

-spec on_deliver(username(), subscriber_id(), qos(), topic(), payload(), flag()) ->
-spec on_deliver(username(), subscriber_id(), qos(), topic(), payload(), flag(), matched_acl()) ->
'next' | 'ok' | {'ok', payload() | [on_deliver_hook:msg_modifier()]}.
on_deliver(UserName, SubscriberId, QoS, Topic, Payload, IsRetain) ->
on_deliver(UserName, SubscriberId, QoS, Topic, Payload, IsRetain, MatchedAcl) ->
{MP, ClientId} = subscriber_id(SubscriberId),
send_event(
on_deliver,
{MP, ClientId, normalise(UserName), QoS, unword(Topic), Payload, IsRetain},
{MP, ClientId, normalise(UserName), QoS, unword(Topic), Payload, IsRetain, MatchedAcl},
UserName
).

-spec on_delivery_complete(username(), subscriber_id(), qos(), topic(), payload(), flag()) ->
-spec on_delivery_complete(
username(), subscriber_id(), qos(), topic(), payload(), flag(), matched_acl()
) ->
'next'.
on_delivery_complete(UserName, SubscriberId, QoS, Topic, Payload, IsRetain) ->
on_delivery_complete(UserName, SubscriberId, QoS, Topic, Payload, IsRetain, MatchedAcl) ->
{MP, ClientId} = subscriber_id(SubscriberId),
send_event(
on_delivery_complete,
{MP, ClientId, normalise(UserName), QoS, unword(Topic), Payload, IsRetain}
{MP, ClientId, normalise(UserName), QoS, unword(Topic), Payload, IsRetain, MatchedAcl}
).

-spec on_offline_message(subscriber_id(), qos(), topic(), payload(), flag()) -> 'next'.
Expand Down
7 changes: 5 additions & 2 deletions apps/vmq_events_sidecar/test/events_sidecar_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ on_deliver(#'eventssidecar.v1.OnDeliver'{username = BinPid,
qos = 1,
topic = ?TOPIC,
payload = ?PAYLOAD,
is_retain = false}) ->
is_retain = false,
matched_acl = #'eventssidecar.v1.MatchedACL'{name = ?LABEL, pattern = ?PATTERN}
}) ->
Pid = list_to_pid(binary_to_list(BinPid)),
Pid ! on_deliver_ok.

Expand All @@ -149,7 +151,8 @@ on_delivery_complete(#'eventssidecar.v1.OnDeliveryComplete'{username = BinPid,
qos = 1,
topic = ?TOPIC,
payload = ?PAYLOAD,
is_retain = false}) ->
is_retain = false,
matched_acl = #'eventssidecar.v1.MatchedACL'{name = ?LABEL, pattern = ?PATTERN}}) ->
Pid = list_to_pid(binary_to_list(BinPid)),
Pid ! on_delivery_complete_ok.

Expand Down
4 changes: 2 additions & 2 deletions apps/vmq_events_sidecar/test/vmq_events_sidecar_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,14 @@ on_deliver_test(_) ->
enable_hook(on_deliver),
Self = pid_to_bin(self()),
ok = vmq_plugin:all_till_ok(on_deliver,
[Self, {?MOUNTPOINT, ?ALLOWED_CLIENT_ID}, 1, ?TOPIC, ?PAYLOAD, false]),
[Self, {?MOUNTPOINT, ?ALLOWED_CLIENT_ID}, 1, ?TOPIC, ?PAYLOAD, false, #matched_acl{name = ?LABEL, pattern = ?PATTERN}]),
ok = exp_response(on_deliver_ok),
disable_hook(on_deliver).

on_delivery_complete_test(_) ->
enable_hook(on_delivery_complete),
Self = pid_to_bin(self()),
[ok] = vmq_plugin:all(on_delivery_complete,[Self, {?MOUNTPOINT, ?ALLOWED_CLIENT_ID}, 1, ?TOPIC, ?PAYLOAD, false]),
[ok] = vmq_plugin:all(on_delivery_complete,[Self, {?MOUNTPOINT, ?ALLOWED_CLIENT_ID}, 1, ?TOPIC, ?PAYLOAD, false, #matched_acl{name = ?LABEL, pattern = ?PATTERN}]),
ok = exp_response(on_delivery_complete_ok),
disable_hook(on_delivery_complete).

Expand Down
20 changes: 17 additions & 3 deletions apps/vmq_proto/include/on_deliver_pb.hrl
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
%% -*- coding: utf-8 -*-
%% Automatically generated, do not edit
%% Generated by gpb_compile version 4.19.1
%% Generated by gpb_compile version 4.20.0

-ifndef(on_deliver_pb).
-define(on_deliver_pb, true).

-define(on_deliver_pb_gpb_version, "4.19.1").
-define(on_deliver_pb_gpb_version, "4.20.0").

-ifndef('EVENTSSIDECAR.V1.ONDELIVER_PB_H').
-define('EVENTSSIDECAR.V1.ONDELIVER_PB_H', true).
Expand All @@ -26,7 +26,9 @@
% = 7, optional
is_retain = false :: boolean() | 0 | 1 | undefined,
% = 8, optional
payload = <<>> :: iodata() | undefined
payload = <<>> :: iodata() | undefined,
% = 9, optional
matched_acl = undefined :: on_deliver_pb:'eventssidecar.v1.MatchedACL'() | undefined
}
).
-endif.
Expand All @@ -43,4 +45,16 @@
).
-endif.

-ifndef('EVENTSSIDECAR.V1.MATCHEDACL_PB_H').
-define('EVENTSSIDECAR.V1.MATCHEDACL_PB_H', true).
-record('eventssidecar.v1.MatchedACL',
% = 1, optional
{
name = <<>> :: unicode:chardata() | undefined,
% = 2, optional
pattern = <<>> :: unicode:chardata() | undefined
}
).
-endif.

-endif.
21 changes: 18 additions & 3 deletions apps/vmq_proto/include/on_delivery_complete_pb.hrl
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
%% -*- coding: utf-8 -*-
%% Automatically generated, do not edit
%% Generated by gpb_compile version 4.19.1
%% Generated by gpb_compile version 4.20.0

-ifndef(on_delivery_complete_pb).
-define(on_delivery_complete_pb, true).

-define(on_delivery_complete_pb_gpb_version, "4.19.1").
-define(on_delivery_complete_pb_gpb_version, "4.20.0").

-ifndef('EVENTSSIDECAR.V1.ONDELIVERYCOMPLETE_PB_H').
-define('EVENTSSIDECAR.V1.ONDELIVERYCOMPLETE_PB_H', true).
Expand All @@ -26,7 +26,10 @@
% = 7, optional
is_retain = false :: boolean() | 0 | 1 | undefined,
% = 8, optional
payload = <<>> :: iodata() | undefined
payload = <<>> :: iodata() | undefined,
% = 9, optional
matched_acl = undefined ::
on_delivery_complete_pb:'eventssidecar.v1.MatchedACL'() | undefined
}
).
-endif.
Expand All @@ -43,4 +46,16 @@
).
-endif.

-ifndef('EVENTSSIDECAR.V1.MATCHEDACL_PB_H').
-define('EVENTSSIDECAR.V1.MATCHEDACL_PB_H', true).
-record('eventssidecar.v1.MatchedACL',
% = 1, optional
{
name = <<>> :: unicode:chardata() | undefined,
% = 2, optional
pattern = <<>> :: unicode:chardata() | undefined
}
).
-endif.

-endif.
2 changes: 2 additions & 0 deletions apps/vmq_proto/proto/on_deliver.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
syntax = "proto3";

import "google/protobuf/timestamp.proto";
import "matched_acl.proto";

option go_package = "source.golabs.io/courier/apis-go/eventssidecar/v1";

Expand All @@ -15,4 +16,5 @@ message OnDeliver {
int32 qos = 6;
bool is_retain = 7;
bytes payload = 8;
MatchedACL matched_acl = 9;
}
2 changes: 2 additions & 0 deletions apps/vmq_proto/proto/on_delivery_complete.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
syntax = "proto3";

import "google/protobuf/timestamp.proto";
import "matched_acl.proto";

option go_package = "source.golabs.io/courier/apis-go/eventssidecar/v1";

Expand All @@ -15,4 +16,5 @@ message OnDeliveryComplete {
int32 qos = 6;
bool is_retain = 7;
bytes payload = 8;
MatchedACL matched_acl = 9;
}
Loading
Loading