Skip to content

Commit

Permalink
feat: allow sampling of hook events based on acl_name for on_publish …
Browse files Browse the repository at this point in the history
…& user for on_deliver (#40)

* feat: add sampler for sampling on_publish based on ACL names

* feat: add sampler for sampling on_deliver based on user

* fix: code formatting & dialyzer error

* fix: local tests fail due to econnrefused
  • Loading branch information
dhruvjain99 authored Feb 5, 2024
1 parent ef1ce39 commit 88909af
Show file tree
Hide file tree
Showing 10 changed files with 509 additions and 73 deletions.
4 changes: 3 additions & 1 deletion apps/vmq_commons/src/vmq_types_common.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,7 @@
non_persistence = false :: flag()
}).
-type msg() :: #vmq_msg{}.
-record(matched_acl, {name = <<>> :: binary(), pattern = <<>> :: binary()}).
-record(matched_acl, {
name = undefined :: binary() | undefined, pattern = undefined :: binary() | undefined
}).
-endif.
68 changes: 40 additions & 28 deletions apps/vmq_enhanced_auth/src/vmq_enhanced_auth.erl
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,13 @@ match(TIn, T, Tbl, Type, Key, Qos) ->
case match(TIn, T) of
true ->
case ets:lookup(Tbl, Key) of
[{_, _, <<>>}] ->
MatchedAcl = #matched_acl{
pattern = iolist_to_binary(vmq_topic:unword(T))
},
{true, MatchedAcl};
[{_, _, Label}] ->
check_label_and_incr_metrics(Label, Type, Qos),
incr_matched_topic(Label, Type, Qos),
MatchedAcl = #matched_acl{
name = Label, pattern = iolist_to_binary(vmq_topic:unword(T))
},
Expand All @@ -376,22 +381,21 @@ match(TIn, T, Tbl, Type, Key, Qos) ->
false
end.

check_label_and_incr_metrics(Label, Type, Qos) ->
case Label of
<<>> ->
ok;
_ ->
OperationName =
case Type of
read -> subscribe;
write -> publish
end,
_ = vmq_metrics:incr_topic_counter(
{topic_matches, OperationName, [
{acl_matched, Label}, {qos, integer_to_list(Qos)}
]}
)
end.
incr_matched_topic(<<>>, _Type, _Qos) ->
ok;
incr_matched_topic(undefined, _Type, _Qos) ->
ok;
incr_matched_topic(Label, Type, Qos) ->
OperationName =
case Type of
read -> subscribe;
write -> publish
end,
_ = vmq_metrics:incr_topic_counter(
{topic_matches, OperationName, [
{acl_matched, Label}, {qos, integer_to_list(Qos)}
]}
).

topic(User, {MP, ClientId}, Topic) ->
subst(list_to_binary(MP), User, ClientId, Topic, []).
Expand Down Expand Up @@ -760,8 +764,8 @@ simple_acl(_) ->
),
?_assertEqual(
{ok, [
{[<<"a">>, <<"b">>, <<"c">>], 0, {matched_acl, <<>>, <<"a/b/c">>}},
{[<<"x">>, <<"y">>, <<"z">>, <<"#">>], 0, {matched_acl, <<>>, <<"x/y/z/#">>}},
{[<<"a">>, <<"b">>, <<"c">>], 0, {matched_acl, undefined, <<"a/b/c">>}},
{[<<"x">>, <<"y">>, <<"z">>, <<"#">>], 0, {matched_acl, undefined, <<"x/y/z/#">>}},
{
[<<>>, <<"test">>, <<"my-client-id">>],
0,
Expand All @@ -780,8 +784,8 @@ simple_acl(_) ->
),
?_assertEqual(
{ok, [
{[<<"a">>, <<"b">>, <<"c">>], 0, {matched_acl, <<>>, <<"a/b/c">>}},
{[<<"x">>, <<"y">>, <<"z">>, <<"#">>], 0, {matched_acl, <<>>, <<"x/y/z/#">>}},
{[<<"a">>, <<"b">>, <<"c">>], 0, {matched_acl, undefined, <<"a/b/c">>}},
{[<<"x">>, <<"y">>, <<"z">>, <<"#">>], 0, {matched_acl, undefined, <<"x/y/z/#">>}},
{
[<<"example">>, <<"profile-id">>],
0,
Expand All @@ -802,8 +806,8 @@ simple_acl(_) ->
%% colon separated username
?_assertEqual(
{ok, [
{[<<"a">>, <<"b">>, <<"c">>], 0, {matched_acl, <<>>, <<"a/b/c">>}},
{[<<"x">>, <<"y">>, <<"z">>, <<"#">>], 0, {matched_acl, <<>>, <<"x/y/z/#">>}},
{[<<"a">>, <<"b">>, <<"c">>], 0, {matched_acl, undefined, <<"a/b/c">>}},
{[<<"x">>, <<"y">>, <<"z">>, <<"#">>], 0, {matched_acl, undefined, <<"x/y/z/#">>}},
{
[<<>>, <<"test">>, <<"my-client-id">>],
0,
Expand All @@ -822,9 +826,17 @@ simple_acl(_) ->
),
?_assertEqual(
{ok, [
{[<<"a">>, <<"b">>, <<"c">>], 0, {matched_acl, <<>>, <<"a/b/c">>}},
{[<<"x">>, <<"y">>, <<"z">>, <<"#">>], not_allowed, {matched_acl, <<>>, <<>>}},
{[<<>>, <<"test">>, <<"my-client-id">>], not_allowed, {matched_acl, <<>>, <<>>}}
{[<<"a">>, <<"b">>, <<"c">>], 0, {matched_acl, undefined, <<"a/b/c">>}},
{
[<<"x">>, <<"y">>, <<"z">>, <<"#">>],
not_allowed,
{matched_acl, undefined, undefined}
},
{
[<<>>, <<"test">>, <<"my-client-id">>],
not_allowed,
{matched_acl, undefined, undefined}
}
]},
auth_on_subscribe(
<<"invalid-user">>,
Expand All @@ -837,7 +849,7 @@ simple_acl(_) ->
)
),
?_assertEqual(
{ok, [{matched_acl, {matched_acl, <<>>, <<"a/b/c">>}}]},
{ok, [{matched_acl, {matched_acl, undefined, <<"a/b/c">>}}]},
auth_on_publish(
<<"test">>,
{"", <<"my-client-id">>},
Expand All @@ -859,7 +871,7 @@ simple_acl(_) ->
)
),
?_assertEqual(
{ok, [{matched_acl, {matched_acl, <<>>, <<"x/y/z/#">>}}]},
{ok, [{matched_acl, {matched_acl, undefined, <<"x/y/z/#">>}}]},
auth_on_publish(
<<"test">>,
{"", <<"my-client-id">>},
Expand Down
20 changes: 20 additions & 0 deletions apps/vmq_events_sidecar/priv/vmq_events_sidecar.schema
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,23 @@
[{datatype, integer},
hidden,
{default, 4096}]}.

%% @doc Configure the sampling for on_publish events based on ACL names
{mapping, "sample.on_publish.$acl.percentage", "vmq_events_sidecar.sampler.on_publish",
[{datatype, integer}]}.

{translation, "vmq_events_sidecar.sampler.on_publish",
fun(Conf) ->
vmq_events_sidecar_schema:translate_sampling("on_publish", Conf)
end
}.

%% @doc Configure the sampling for on_deliver events based on user names
{mapping, "sample.on_deliver.$user.percentage", "vmq_events_sidecar.sampler.on_deliver",
[{datatype, integer}]}.

{translation, "vmq_events_sidecar.sampler.on_deliver",
fun(Conf) ->
vmq_events_sidecar_schema:translate_sampling("on_deliver", Conf)
end
}.
185 changes: 183 additions & 2 deletions apps/vmq_events_sidecar/src/vmq_events_sidecar_cli.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ register_cli() ->
register_cli_usage(),
status_cmd(),
enable_cmd(),
disable_cmd().
disable_cmd(),
show_sampling_cmd(),
enable_sampling_cmd(),
disable_sampling_cmd().

register_config() ->
ConfigKeys =
Expand Down Expand Up @@ -90,6 +93,90 @@ disable_cmd() ->
end,
clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback).

show_sampling_cmd() ->
Cmd = ["vmq-admin", "events", "sampling", "show"],
KeySpecs = [hook_sampling_keyspec()],
FlagSpecs = [],
Callback =
fun
(_, [{hook, Hook}], []) ->
CriterionName =
case Hook of
on_publish -> acl_name;
on_deliver -> user
end,
Table =
[
[{CriterionName, binary_to_atom(C)}, {'Percentage', P}]
|| [C, P] <- vmq_events_sidecar_plugin:list_sampling_conf(Hook)
],
[clique_status:table(Table)];
(_, _, _) ->
Text = clique_status:text(show_sampling_usage()),
[clique_status:alert([Text])]
end,
clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback).

enable_sampling_cmd() ->
Cmd = ["vmq-admin", "events", "sampling", "enable"],
KeySpecs = [hook_sampling_keyspec(), sampling_percentage_keyspec()],
FlagSpecs = [sampling_user_flagspec(), sampling_acl_name_flagspec()],
Callback =
fun
(_, [{hook, on_publish}, {percentage, P}], [{acl_name, ACL}]) ->
vmq_events_sidecar_plugin:enable_sampling(on_publish, ACL, P),
[clique_status:text("Done")];
(_, [{percentage, P}, {hook, on_publish}], [{acl_name, ACL}]) ->
vmq_events_sidecar_plugin:enable_sampling(on_publish, ACL, P),
[clique_status:text("Done")];
(_, [{hook, on_deliver}, {percentage, P}], [{user, User}]) ->
vmq_events_sidecar_plugin:enable_sampling(on_deliver, User, P),
[clique_status:text("Done")];
(_, [{percentage, P}, {hook, on_deliver}], [{user, User}]) ->
vmq_events_sidecar_plugin:enable_sampling(on_deliver, User, P),
[clique_status:text("Done")];
(_, _, _) ->
Text = clique_status:text(enable_sampling_usage()),
[clique_status:alert([Text])]
end,
clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback).

disable_sampling_cmd() ->
Cmd = ["vmq-admin", "events", "sampling", "disable"],
KeySpecs = [hook_sampling_keyspec()],
FlagSpecs = [sampling_user_flagspec(), sampling_acl_name_flagspec()],
Callback =
fun
(_, [{hook, on_publish}], [{acl_name, ACL}]) ->
case vmq_events_sidecar_plugin:disable_sampling(on_publish, ACL) of
ok ->
[clique_status:text("Done")];
{error, Reason} ->
Text = io_lib:format(
"can't disable sampling for hook: ~p criterion: ~p due to '~p'", [
on_publish, ACL, Reason
]
),
[clique_status:alert([clique_status:text(Text)])]
end;
(_, [{hook, on_deliver}], [{user, User}]) ->
case vmq_events_sidecar_plugin:disable_sampling(on_deliver, User) of
ok ->
[clique_status:text("Done")];
{error, Reason} ->
Text = io_lib:format(
"can't disable sampling for hook: ~p criterion: ~p due to '~p'", [
on_deliver, User, Reason
]
),
[clique_status:alert([clique_status:text(Text)])]
end;
(_, _, _) ->
Text = clique_status:text(disable_sampling_usage()),
[clique_status:alert([Text])]
end,
clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback).

hook_keyspec() ->
{hook, [
{typecast, fun
Expand Down Expand Up @@ -122,11 +209,72 @@ hook_keyspec() ->
end}
]}.

hook_sampling_keyspec() ->
{hook, [
{typecast, fun
(Hook) when is_list(Hook) ->
case
lists:member(
Hook,
[
"on_publish",
"on_deliver"
]
)
of
true ->
binary_to_atom(list_to_binary(Hook), utf8);
_ ->
{error, {invalid_value, Hook}}
end;
(Hook) ->
{error, {invalid_value, Hook}}
end}
]}.

sampling_user_flagspec() ->
sampling_criterion_flagspec(user).

sampling_acl_name_flagspec() ->
sampling_criterion_flagspec(acl_name).

sampling_criterion_flagspec(CName) ->
{CName, [
{longname, atom_to_list(CName)},
{typecast, fun
(C) when is_list(C) ->
list_to_binary(C);
(C) ->
{error, {invalid_flag, C}}
end}
]}.

sampling_percentage_keyspec() ->
{percentage, [
{typecast, fun(StrP) ->
case catch list_to_integer(StrP) of
P when (P >= 0) and (P =< 100) -> P;
_ -> {error, {invalid_args, [{percentage, StrP}]}}
end
end}
]}.

register_cli_usage() ->
clique:register_usage(["vmq-admin", "events"], events_usage()),
clique:register_usage(["vmq-admin", "events", "enable"], enable_usage()),
clique:register_usage(["vmq-admin", "events", "disable"], disable_usage()),
clique:register_usage(["vmq-admin", "events", "show"], show_usage()).
clique:register_usage(["vmq-admin", "events", "show"], show_usage()),
clique:register_usage(["vmq-admin", "events", "sampling"], events_sampling_usage()),
clique:register_usage(
["vmq-admin", "events", "sampling", "enable"], enable_sampling_usage()
),
clique:register_usage(
["vmq-admin", "events", "sampling", "disable"],
disable_sampling_usage()
),
clique:register_usage(
["vmq-admin", "events", "sampling", "show"], show_sampling_usage()
).

events_usage() ->
[
Expand All @@ -136,6 +284,7 @@ events_usage() ->
" show Show all registered events\n",
" enable Enable an event\n",
" disable Disable an event\n",
" sampling Allows sampling of enabled events\n"
" Use --help after a sub-command for more details.\n"
].

Expand All @@ -159,3 +308,35 @@ show_usage() ->
" Shows the information of the registered events.",
"\n\n"
].

events_sampling_usage() ->
[
"vmq-admin events sampling \n\n",
" Allows sampling of hook specific events based on acl_name/user\n\n",
" Sub-commands:\n",
" show Shows all the hook specific sampling configurations\n",
" enable Enables sampling for events based on acl/user\n",
" disable Disables sampling\n",
" Use --help after a sub-command for more details.\n"
].

enable_sampling_usage() ->
[
"vmq-admin events sampling enable hook=<Hook> percentage=<Percentage> --acl_name=<ACL> --user=<User>\n\n",
" Enables sampling based on acl_name/label for on_publish & username for on_deliver.",
"\n\n"
].

disable_sampling_usage() ->
[
"vmq-admin events sampling disable hook=<Hook> --acl_name=<ACL> --user=<User>\n\n",
" Disables sampling for specified hook based on the flag.",
"\n\n"
].

show_sampling_usage() ->
[
"vmq-admin events sampling show hook=<Hook>\n\n",
" Shows all the hook specific sampling configurations.",
"\n\n"
].
Loading

0 comments on commit 88909af

Please sign in to comment.