Skip to content

Commit

Permalink
Put shared subs local caching behind config (#27)
Browse files Browse the repository at this point in the history
* Put shared subs local caching behind config

* Fix run test with retry script to run all failed tests and prevent false positive

* Fix ct tests
  • Loading branch information
dhruvjain99 authored Jul 20, 2023
1 parent 92bc947 commit 98497e6
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 20 deletions.
10 changes: 10 additions & 0 deletions apps/vmq_server/priv/vmq_server.schema
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@
{datatype, integer}
]}.

%% @doc Cache shared subscriptions in a node-local table; default is 'off'.
%% !!NOTE!! Local shared-subscription caching currently does not honor shared
%% subscription groups properly. It is recommended to use shared-subscription
%% topics only for shared-subscription use cases: do not expect the broker to
%% route messages for the same topic to both shared and normal subscribers.
%% With this option enabled, messages are routed only to shared subscribers,
%% irrespective of their shared-subscription group.
{mapping, "cache_shared_subscriptions_locally", "vmq_server.cache_shared_subscriptions_locally", [
    {default, off},
    {datatype, flag}
]}.

%% @doc Allow anonymous users to connect, default is 'off'. !!NOTE!!
%% Enabling this completely disables authentication of the clients and
%% should only be used for testing/development purposes or in case
Expand Down
3 changes: 2 additions & 1 deletion apps/vmq_server/src/vmq_config_cli.erl
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ register_config_() ->
"coordinate_registrations",
"mqtt_connect_timeout",
"queue_sup_sup_max_t",
"queue_sup_sup_max_r"
"queue_sup_sup_max_r",
"cache_shared_subscriptions_locally"
],
_ = [
clique:register_config([Key], fun register_config_callback/3)
Expand Down
30 changes: 18 additions & 12 deletions apps/vmq_server/src/vmq_reg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -142,18 +142,24 @@ subscribe_op({MP, ClientId} = SubscriberId, Topics) ->
{error, _} = ErrRes ->
ErrRes;
_ ->
lists:foreach(
fun
({[<<"$share">>, _Group | Topic], QoS}) ->
Key = {MP, Topic},
Value = {ClientId, QoS},
ets:insert(?SHARED_SUBS_ETS_TABLE, {{Key, Value}}),
vmq_metrics:incr_cache_insert(?LOCAL_SHARED_SUBS);
(_) ->
ok
end,
Topics
),
CacheLocally = vmq_config:get_env(cache_shared_subscriptions_locally, false),
if
CacheLocally ->
lists:foreach(
fun
({[<<"$share">>, _Group | Topic], QoS}) ->
Key = {MP, Topic},
Value = {ClientId, QoS},
ets:insert(?SHARED_SUBS_ETS_TABLE, {{Key, Value}}),
vmq_metrics:incr_cache_insert(?LOCAL_SHARED_SUBS);
(_) ->
ok
end,
Topics
);
true ->
skip_caching
end,
Existing = subscriptions_exist(OldSubs, Topics),
QoSTable =
lists:foldl(
Expand Down
4 changes: 3 additions & 1 deletion apps/vmq_server/src/vmq_server.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,16 @@
%{graphite_prefix, ""}

% Systree
{systree_enabled, true}
{systree_enabled, true},
%{systree_interval, 20000},
%{systree_prefix, [<<"$SYS">>, NodenameBinary]},
%{systree_mountpoint, ""},
%{systree_qos, 0},
%{systree_retain, false},
%{systree_reg_view, vmq_reg_trie},

{cache_shared_subscriptions_locally, false}

% end env
]}
]}.
41 changes: 40 additions & 1 deletion apps/vmq_server/test/vmq_cluster_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@
cluster_dead_node_subscriber_reaper_test/1,
cluster_dead_node_message_reaper_test/1,
shared_subs_random_policy_test/1,
shared_subs_random_policy_test_with_local_caching/1,
shared_subs_random_policy_online_first_test/1,
shared_subs_random_policy_online_first_test_with_local_caching/1,
shared_subs_random_policy_all_offline_test/1,
shared_subs_random_policy_all_offline_test_with_local_caching/1,
shared_subs_prefer_local_policy_test/1,
shared_subs_prefer_local_policy_test_with_local_caching/1,
shared_subs_local_only_policy_test/1,
shared_subs_local_only_policy_test_with_local_caching/1,
cross_node_publish_subscribe/1,
routing_table_survives_node_restart/1
]).
Expand Down Expand Up @@ -116,11 +121,16 @@ all() ->
cluster_dead_node_subscriber_reaper_test,
cluster_dead_node_message_reaper_test,
shared_subs_random_policy_test,
shared_subs_random_policy_test_with_local_caching,
shared_subs_random_policy_online_first_test,
%% This test has been skipped because, with the introduction of the in-memory shared subscriptions cache, it is no longer valid
%% shared_subs_random_policy_online_first_test,
%% shared_subs_random_policy_online_first_test_with_local_caching,
shared_subs_random_policy_all_offline_test,
shared_subs_random_policy_all_offline_test_with_local_caching,
shared_subs_prefer_local_policy_test,
shared_subs_prefer_local_policy_test_with_local_caching,
shared_subs_local_only_policy_test,
shared_subs_local_only_policy_test_with_local_caching,
cross_node_publish_subscribe,
routing_table_survives_node_restart
].
Expand Down Expand Up @@ -739,10 +749,13 @@ cluster_dead_node_message_reaper_test(Config) ->
{ToMigrate, ToMigrate},
RestNodesWithPorts).

%% Variant of shared_subs_prefer_local_policy_test/1 that runs with the
%% node-local shared-subscription cache enabled.
shared_subs_prefer_local_policy_test_with_local_caching(Config) ->
    CachingConfig = [{cache_shared_subscriptions_locally, true} | Config],
    shared_subs_prefer_local_policy_test(CachingConfig).
shared_subs_prefer_local_policy_test(Config) ->
ensure_cluster(Config),
[LocalNode | OtherNodes] = _Nodes = nodes_(Config),
set_shared_subs_policy(prefer_local, nodenames(Config)),
set_shared_subs_local_caching(Config),

LocalSubscriberSockets = connect_subscribers(<<"$share/share/sharedtopic">>, 5, [LocalNode]),
RemoteSubscriberSockets = connect_subscribers(<<"$share/share/sharedtopic">>, 5, OtherNodes),
Expand Down Expand Up @@ -771,10 +784,13 @@ shared_subs_prefer_local_policy_test(Config) ->
[ok = gen_tcp:close(S) || S <- LocalSubscriberSockets ++ RemoteSubscriberSockets],
ok.

%% Variant of shared_subs_local_only_policy_test/1 that runs with the
%% node-local shared-subscription cache enabled.
shared_subs_local_only_policy_test_with_local_caching(Config) ->
    CachingConfig = [{cache_shared_subscriptions_locally, true} | Config],
    shared_subs_local_only_policy_test(CachingConfig).
shared_subs_local_only_policy_test(Config) ->
ensure_cluster(Config),
[LocalNode | OtherNodes] = _Nodes = nodes_(Config),
set_shared_subs_policy(local_only, nodenames(Config)),
set_shared_subs_local_caching(Config),

LocalSubscriberSockets = connect_subscribers(<<"$share/share/sharedtopic">>, 5, [LocalNode]),
RemoteSubscriberSockets = connect_subscribers(<<"$share/share/sharedtopic">>, 5, OtherNodes),
Expand Down Expand Up @@ -817,10 +833,14 @@ shared_subs_local_only_policy_test(Config) ->
[ok = gen_tcp:close(S) || S <- RemoteSubscriberSockets],
ok.

%% Variant of shared_subs_random_policy_test/1 that runs with the
%% node-local shared-subscription cache enabled.
shared_subs_random_policy_test_with_local_caching(Config) ->
    CachingConfig = [{cache_shared_subscriptions_locally, true} | Config],
    shared_subs_random_policy_test(CachingConfig).
shared_subs_random_policy_test(Config) ->
ensure_cluster(Config),
Nodes = nodes_(Config),

set_shared_subs_policy(random, nodenames(Config)),
set_shared_subs_local_caching(Config),

SubscriberSockets = connect_subscribers(<<"$share/share/sharedtopic">>, 10, Nodes),

Expand All @@ -847,10 +867,13 @@ shared_subs_random_policy_test(Config) ->
ok.

%% TODO: enable this after groups are honoured in shared subscriptions
%% Variant of shared_subs_random_policy_online_first_test/1 that runs with
%% the node-local shared-subscription cache enabled.
shared_subs_random_policy_online_first_test_with_local_caching(Config) ->
    CachingConfig = [{cache_shared_subscriptions_locally, true} | Config],
    shared_subs_random_policy_online_first_test(CachingConfig).
shared_subs_random_policy_online_first_test(Config) ->
ensure_cluster(Config),
Nodes = nodes_(Config),
set_shared_subs_policy(random, nodenames(Config)),
set_shared_subs_local_caching(Config),

[OnlineSubNode | RestNodes] = Nodes,
create_offline_subscribers(<<"$share/share/sharedtopic">>, 10, RestNodes),
Expand Down Expand Up @@ -878,10 +901,13 @@ shared_subs_random_policy_online_first_test(Config) ->
[ok = gen_tcp:close(S) || S <- SubscriberSocketsOnline],
ok.

%% Variant of shared_subs_random_policy_all_offline_test/1 that runs with
%% the node-local shared-subscription cache enabled.
shared_subs_random_policy_all_offline_test_with_local_caching(Config) ->
    CachingConfig = [{cache_shared_subscriptions_locally, true} | Config],
    shared_subs_random_policy_all_offline_test(CachingConfig).
shared_subs_random_policy_all_offline_test(Config) ->
ensure_cluster(Config),
Nodes = nodes_(Config),
set_shared_subs_policy(random, nodenames(Config)),
set_shared_subs_local_caching(Config),

OfflineClients = create_offline_subscribers(<<"$share/share/sharedtopic">>, 10, Nodes),

Expand Down Expand Up @@ -1091,6 +1117,19 @@ set_shared_subs_policy(Policy, Nodes) ->
Nodes
).

%% Apply the cache_shared_subscriptions_locally setting from the test Config
%% to every node in the cluster (defaults to false when the key is absent).
%% Crashes (badmatch) if any node rejects the config change.
set_shared_subs_local_caching(Config) ->
    CacheLocally = proplists:get_value(cache_shared_subscriptions_locally, Config, false),
    lists:foreach(
        fun(Node) ->
            {ok, []} = rpc:call(
                Node,
                vmq_server_cmd,
                set_config,
                [cache_shared_subscriptions_locally, CacheLocally]
            )
        end,
        nodenames(Config)
    ).

%% Return only the node names from the {Name, Port} pairs held in Config.
nodenames(Config) ->
    [Name || {Name, _Port} <- nodes_(Config)].
Expand Down
5 changes: 3 additions & 2 deletions apps/vmq_server/test/vmq_in_order_delivery_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ groups() ->
qos1_online,
qos2_online,
qos1_offline,
qos2_offline,
qos1_offline_node_restart
qos2_offline
%% This test is no longer valid: a single-node restart fails with a reaping_in_progress error on startup
%% qos1_offline_node_restart
],
[
{mqttv4, [shuffle], Tests},
Expand Down
4 changes: 4 additions & 0 deletions apps/vmq_server/test/vmq_publish_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1034,9 +1034,13 @@ shared_subscription_online_first(Cfg) ->
disable_on_publish(),
disable_on_subscribe().

%% This test only passes when local caching of shared subscriptions is enabled
shared_subscription_does_not_honor_grouping(Cfg) ->
enable_on_publish(),
enable_on_subscribe(),

vmq_server_cmd:set_config(cache_shared_subscriptions_locally, true),

Connack = mqtt5_v4compat:gen_connack(success, Cfg),
Prefix = vmq_cth:ustr(Cfg),
PubConnect = mqtt5_v4compat:gen_connect(Prefix ++ "shared-sub-pub", [{keepalive, 60}], Cfg),
Expand Down
5 changes: 2 additions & 3 deletions run-tests-with-retry.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#!/bin/sh


DIR=${1:-.}
FULLPATH=$DIR/_build/all_tests+test/logs/retry.spec

Expand All @@ -10,6 +9,6 @@ echo $FULLPATH
(echo -e "\nContents of retry.spec\n" &&
cat $FULLPATH &&
echo -e "\nRetry suites:" &&
echo $(pcregrep -o2 -o3 --om-separator="/" -M "^{(cases),\"(.+)\",[^\w]*(\w+),(.|\n)*?\.$" $FULLPATH | uniq | paste -s -d, -) &&
echo $(pcregrep -o2 -o3 --om-separator="/" -M "^{(cases|groups),\"(.+)\",[^\w]*(\w+),(.|\n)*?\.$" $FULLPATH | uniq | paste -s -d, -) &&
make db-reset &&
./rebar3 ct --suite=$(pcregrep -o2 -o3 --om-separator="/" -M "^{(cases),\"(.+)\",[^\w]*(\w+),(.|\n)*?\.$" $FULLPATH | uniq | paste -s -d, -))
./rebar3 ct --suite=$(pcregrep -o2 -o3 --om-separator="/" -M "^{(cases|groups),\"(.+)\",[^\w]*(\w+),(.|\n)*?\.$" $FULLPATH | uniq | paste -s -d, -))

0 comments on commit 98497e6

Please sign in to comment.