From bd16c531b6c307a69ff5261e03deae90313f767b Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Mon, 29 Apr 2024 10:50:38 +0200 Subject: [PATCH 01/19] Add sessions_cleanup hook --- src/ejabberd_sm.erl | 11 ++++++++--- src/ejabberd_sm_cets.erl | 9 +++++---- src/ejabberd_sm_mnesia.erl | 3 ++- src/ejabberd_sm_redis.erl | 10 ++++++---- src/hooks/mongoose_hooks.erl | 6 ++++++ 5 files changed, 27 insertions(+), 12 deletions(-) diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index 00446b0a88f..8816616f643 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -60,7 +60,8 @@ is_offline/1, get_user_present_pids/2, sync/0, - run_session_cleanup_hook/1, + session_cleanup/1, + sessions_cleanup/1, terminate_session/2, sm_backend/0 ]). @@ -354,8 +355,8 @@ unregister_iq_handler(Host, XMLNS) -> ejabberd_sm ! {unregister_iq_handler, Host, XMLNS}, ok. --spec run_session_cleanup_hook(#session{}) -> mongoose_acc:t(). -run_session_cleanup_hook(#session{usr = {U, S, R}, sid = SID}) -> +-spec session_cleanup(#session{}) -> mongoose_acc:t(). +session_cleanup(#session{usr = {U, S, R}, sid = SID}) -> {ok, HostType} = mongoose_domain_api:get_domain_host_type(S), Acc = mongoose_acc:new( #{location => ?LOCATION, @@ -364,6 +365,10 @@ run_session_cleanup_hook(#session{usr = {U, S, R}, sid = SID}) -> element => undefined}), mongoose_hooks:session_cleanup(S, Acc, U, R, SID). +-spec sessions_cleanup(#session{}) -> mongoose_acc:t(). +sessions_cleanup(Sessions) -> + mongoose_hooks:sessions_cleanup(Sessions). + -spec terminate_session(jid:jid() | pid(), binary()) -> ok | no_session. terminate_session(#jid{} = Jid, Reason) -> case get_session_pid(Jid) of diff --git a/src/ejabberd_sm_cets.erl b/src/ejabberd_sm_cets.erl index 79993429f92..6837fa095d1 100644 --- a/src/ejabberd_sm_cets.erl +++ b/src/ejabberd_sm_cets.erl @@ -80,10 +80,11 @@ cleanup(Node) -> %% This is a full table scan, but cleanup is rare. Tuples = ets:select(?TABLE, [{R, [Guard], ['$_']}]), cets:delete_many(?TABLE, [Key || {Key, _, _} <- Tuples]), - lists:foreach(fun(Tuple) -> - Session = tuple_to_session(Tuple), - ejabberd_sm:run_session_cleanup_hook(Session) - end, Tuples). + Sessions = tuples_to_sessions(Tuples), + ejabberd_sm:sessions_cleanup(Sessions), + lists:foreach(fun(Session) -> + ejabberd_sm:session_cleanup(Session) + end, Sessions). -spec total_count() -> integer(). total_count() -> diff --git a/src/ejabberd_sm_mnesia.erl b/src/ejabberd_sm_mnesia.erl index 6debecfe128..d890d8b26a7 100644 --- a/src/ejabberd_sm_mnesia.erl +++ b/src/ejabberd_sm_mnesia.erl @@ -79,9 +79,10 @@ cleanup(Node) -> [{#session{sid = {'_', '$1'}, _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}]), + ejabberd_sm:sessions_cleanup(Es), lists:foreach(fun(#session{sid = SID} = Session) -> mnesia:delete({session, SID}), - ejabberd_sm:run_session_cleanup_hook(Session) + ejabberd_sm:session_cleanup(Session) end, Es) end, mnesia:async_dirty(F). diff --git a/src/ejabberd_sm_redis.erl b/src/ejabberd_sm_redis.erl index c6626c065db..096fb4155a0 100644 --- a/src/ejabberd_sm_redis.erl +++ b/src/ejabberd_sm_redis.erl @@ -116,7 +116,7 @@ cleanup(Node) -> maybe_initial_cleanup(Node, Initial) -> Hashes = mongoose_redis:cmd(["SMEMBERS", n(Node)]), mongoose_redis:cmd(["DEL", n(Node)]), - lists:foreach(fun(H) -> + Sessions = lists:map(fun(H) -> [_, U, S, R | SIDEncoded] = re:split(H, ":"), %% Add possible removed ":" from encoded SID SID = binary_to_term(mongoose_bin:join(SIDEncoded, <<":">>)), @@ -125,10 +125,12 @@ maybe_initial_cleanup(Node, Initial) -> true -> ok; false -> - ejabberd_sm:run_session_cleanup_hook(#session{usr = {U, S, R}, - sid = SID}) + Session = #session{usr = {U, S, R}, sid = SID}, + ejabberd_sm:session_cleanup(Session), + Session end - end, Hashes). + end, Hashes), + ejabberd_sm:sessions_cleanup(Sessions). -spec total_count() -> integer(). total_count() -> diff --git a/src/hooks/mongoose_hooks.erl b/src/hooks/mongoose_hooks.erl index 5776621a288..95ff0f3b70f 100644 --- a/src/hooks/mongoose_hooks.erl +++ b/src/hooks/mongoose_hooks.erl @@ -27,6 +27,7 @@ remove_user/3, resend_offline_messages_hook/2, session_cleanup/5, + sessions_cleanup/1, set_vcard/3, unacknowledged_message/2, filter_unacknowledged_messages/3, @@ -369,6 +370,11 @@ session_cleanup(Server, Acc, User, Resource, SID) -> HostType = mongoose_acc:host_type(Acc), run_hook_for_host_type(session_cleanup, HostType, Acc, Params). +-spec sessions_cleanup(ejabberd_sm:session()) -> map(). +sessions_cleanup(Sessions) -> + Params = #{sessions => Sessions}, + run_global_hook(sessions_cleanup, #{}, Params). + %%% @doc The `set_vcard' hook is called when the caller wants to set the VCard. -spec set_vcard(HostType, UserJID, VCard) -> Result when HostType :: mongooseim:host_type(), From 685ab9e82e7aa6a2ffb0569ff0e622a3557f4edd Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Mon, 29 Apr 2024 12:02:45 +0200 Subject: [PATCH 02/19] Add a separate function for session_cleanup --- src/ejabberd_sm.erl | 30 ++++++++++++++++++++++- src/hooks/mongoose_hooks.erl | 8 +++---- src/mod_last.erl | 42 +++++++++++++++++++++++++++++---- src/mod_last_backend.erl | 27 ++++++++++++++++++++- src/mod_last_mnesia.erl | 13 +++++++++- src/mod_last_rdbms.erl | 16 ++++++++++--- test/mongoose_cleanup_SUITE.erl | 2 +- 7 files changed, 122 insertions(+), 16 deletions(-) diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index 8816616f643..b125892c4ba 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -367,7 +367,35 @@ session_cleanup(#session{usr = {U, S, R}, sid = SID}) -> -spec sessions_cleanup(#session{}) -> mongoose_acc:t(). sessions_cleanup(Sessions) -> - mongoose_hooks:sessions_cleanup(Sessions). + SerSess = [{Server, Session} || Session = #session{usr = {_, Server, _}} <- Sessions], + Servers = lists:usort([Server || {Server, _Session} <- SerSess]), + Map = maps:from_list([{Server, server_to_host_type(Server)} || Server <- Servers]), + HTSession = [{maps:get(Server, Map), Session} || {Server, Session} <- SerSess], + HT2Session = group_sessions(lists:sort(HTSession)), + [mongoose_hooks:sessions_cleanup(HostType, HTSessions) + || {HostType, HTSessions} <- HT2Session, HostType =/= undefined], + ok. + +%% Group sessions by HostType. +%% Sessions should be sorted. +group_sessions([{HostType, Session} | Sessions]) -> + {Acc, Sessions2} = group_sessions(HostType, [Session], Sessions), + [{HostType, Acc} | group_sessions(Sessions2)]; +group_sessions([]) -> + []. + +group_sessions(HostType, Acc, [{HostType, Session} | Sessions]) -> + group_sessions(HostType, [Session | Acc], Sessions); +group_sessions(_HostType, Acc, Sessions) -> + {lists:reverse(Acc), Sessions}. + +server_to_host_type(Server) -> + case mongoose_domain_api:get_domain_host_type(Server) of + {ok, HostType} -> + HostType; + _ -> + undefined + end. -spec terminate_session(jid:jid() | pid(), binary()) -> ok | no_session. terminate_session(#jid{} = Jid, Reason) -> diff --git a/src/hooks/mongoose_hooks.erl b/src/hooks/mongoose_hooks.erl index 95ff0f3b70f..cda71f9fd80 100644 --- a/src/hooks/mongoose_hooks.erl +++ b/src/hooks/mongoose_hooks.erl @@ -27,7 +27,7 @@ remove_user/3, resend_offline_messages_hook/2, session_cleanup/5, - sessions_cleanup/1, + sessions_cleanup/2, set_vcard/3, unacknowledged_message/2, filter_unacknowledged_messages/3, @@ -370,10 +370,10 @@ session_cleanup(Server, Acc, User, Resource, SID) -> HostType = mongoose_acc:host_type(Acc), run_hook_for_host_type(session_cleanup, HostType, Acc, Params). --spec sessions_cleanup(ejabberd_sm:session()) -> map(). -sessions_cleanup(Sessions) -> +-spec sessions_cleanup(mongooseim:host_type(), ejabberd_sm:session()) -> map(). +sessions_cleanup(HostType, Sessions) -> Params = #{sessions => Sessions}, - run_global_hook(sessions_cleanup, #{}, Params). + run_hook_for_host_type(sessions_cleanup, HostType, #{}, Params). %%% @doc The `set_vcard' hook is called when the caller wants to set the VCard. -spec set_vcard(HostType, UserJID, VCard) -> Result when diff --git a/src/mod_last.erl b/src/mod_last.erl index 6ebcae99685..6f2d4a9964f 100644 --- a/src/mod_last.erl +++ b/src/mod_last.erl @@ -44,8 +44,9 @@ process_local_iq/5, process_sm_iq/5, remove_user/3, - on_presence_update/3, + unset_presence_hook/3, session_cleanup/3, + sessions_cleanup/3, remove_domain/3]). %% API @@ -89,8 +90,9 @@ iq_handlers() -> hooks(HostType) -> [{remove_user, HostType, fun ?MODULE:remove_user/3, #{}, 50}, {anonymous_purge_hook, HostType, fun ?MODULE:remove_user/3, #{}, 50}, - {unset_presence_hook, HostType, fun ?MODULE:on_presence_update/3, #{}, 50}, + {unset_presence_hook, HostType, fun ?MODULE:unset_presence_hook/3, #{}, 50}, {session_cleanup, HostType, fun ?MODULE:session_cleanup/3, #{}, 50}, + {sessions_cleanup, HostType, fun ?MODULE:sessions_cleanup/3, #{}, 50}, {remove_domain, HostType, fun ?MODULE:remove_domain/3, #{}, 50} | c2s_hooks(HostType) ]. @@ -262,11 +264,11 @@ maybe_forward_last(Acc) -> {stop, Acc} end. --spec on_presence_update(Acc, Params, Extra) -> {ok, Acc} when +-spec unset_presence_hook(Acc, Params, Extra) -> {ok, Acc} when Acc :: mongoose_acc:t(), Params :: #{jid := jid:jid(), status := status()}, Extra :: gen_hook:extra(). -on_presence_update(Acc, #{jid := #jid{luser = LUser, lserver = LServer}, status := Status}, _) -> +unset_presence_hook(Acc, #{jid := #jid{luser = LUser, lserver = LServer}, status := Status}, _) -> {ok, store_last_info(Acc, LUser, LServer, Status)}. -spec session_cleanup(Acc, Params, Extra) -> {ok, Acc} when @@ -274,7 +276,16 @@ on_presence_update(Acc, #{jid := #jid{luser = LUser, lserver = LServer}, status Params :: #{jid := jid:jid()}, Extra :: gen_hook:extra(). session_cleanup(Acc, #{jid := #jid{luser = LUser, lserver = LServer}}, _) -> - {ok, store_last_info(Acc, LUser, LServer, <<>>)}. + {ok, session_cleanup(Acc, LUser, LServer, <<>>)}. + +-spec sessions_cleanup(Acc, Params, Extra) -> {ok, Acc} when + Acc :: mongoose_acc:t(), + Params :: #{sessions := ejabberd_sm:session()}, + Extra :: gen_hook:extra(). +sessions_cleanup(Acc, #{sessions := Sessions}, _) -> + HostType = mongoose_acc:host_type(Acc), + mod_last_backend:sessions_cleanup(HostType, Sessions), + {ok, Acc}. -spec store_last_info(mongoose_acc:t(), jid:luser(), jid:lserver(), status()) -> mongoose_acc:t(). store_last_info(Acc, LUser, LServer, Status) -> @@ -283,6 +294,13 @@ store_last_info(Acc, LUser, LServer, Status) -> store_last_info(HostType, LUser, LServer, TimeStamp, Status), Acc. +-spec session_cleanup(mongoose_acc:t(), jid:luser(), jid:lserver(), status()) -> mongoose_acc:t(). +session_cleanup(Acc, LUser, LServer, Status) -> + HostType = mongoose_acc:host_type(Acc), + TimeStamp = erlang:system_time(second), + session_cleanup(HostType, LUser, LServer, TimeStamp, Status), + Acc. + -spec store_last_info(mongooseim:host_type(), jid:luser(), jid:lserver(), timestamp(), status()) -> ok. store_last_info(HostType, LUser, LServer, TimeStamp, Status) -> @@ -297,6 +315,20 @@ store_last_info(HostType, LUser, LServer, TimeStamp, Status) -> ok end. +-spec session_cleanup(mongooseim:host_type(), jid:luser(), jid:lserver(), + timestamp(), status()) -> ok. +session_cleanup(HostType, LUser, LServer, TimeStamp, Status) -> + case mod_last_backend:session_cleanup(HostType, LUser, LServer, TimeStamp, Status) of + {error, Reason} -> + ?LOG_ERROR(#{what => session_cleanup_failed, + text => <<"Unexpected error while storing mod_last information">>, + user => LUser, server => LServer, + timestamp => TimeStamp, status => Status, + reason => Reason}); + ok -> + ok + end. + -spec get_last(mongooseim:host_type(), jid:luser(), jid:lserver()) -> {ok, timestamp(), status()} | {error, term()} | not_found. get_last(HostType, LUser, LServer) -> diff --git a/src/mod_last_backend.erl b/src/mod_last_backend.erl index 9a3f2f33740..d7ddd1b88cc 100644 --- a/src/mod_last_backend.erl +++ b/src/mod_last_backend.erl @@ -6,8 +6,10 @@ get_last/3, count_active_users/3, set_last_info/5, + session_cleanup/5, remove_user/3, - remove_domain/2]). + remove_domain/2, + sessions_cleanup/2]). -define(MAIN_MODULE, mod_last). @@ -26,6 +28,13 @@ mod_last:timestamp(), mod_last:status()) -> ok | {error, term()}. +-callback session_cleanup( + mongooseim:host_type(), + jid:luser(), + jid:lserver(), + mod_last:timestamp(), + mod_last:status()) -> ok | {error, term()}. + -callback remove_user(mongooseim:host_type(), jid:luser(), jid:lserver()) -> ok | {error, term()}. @@ -61,6 +70,16 @@ set_last_info(HostType, LUser, LServer, Timestamp, Status) -> Args = [HostType, LUser, LServer, Timestamp, Status], mongoose_backend:call_tracked(HostType, ?MAIN_MODULE, ?FUNCTION_NAME, Args). +-spec session_cleanup( + mongooseim:host_type(), + jid:luser(), + jid:lserver(), + mod_last:timestamp(), + mod_last:status()) -> ok | {error, term()}. +session_cleanup(HostType, LUser, LServer, Timestamp, Status) -> + Args = [HostType, LUser, LServer, Timestamp, Status], + mongoose_backend:call_tracked(HostType, ?MAIN_MODULE, ?FUNCTION_NAME, Args). + -spec remove_user(mongooseim:host_type(), jid:luser(), jid:lserver()) -> ok | {error, term()}. remove_user(HostType, LUser, LServer) -> @@ -72,3 +91,9 @@ remove_user(HostType, LUser, LServer) -> remove_domain(HostType, LServer) -> Args = [HostType, LServer], mongoose_backend:call(HostType, ?MAIN_MODULE, ?FUNCTION_NAME, Args). + +-spec sessions_cleanup(mongooseim:host_type(), [ejabberd_sm:session()]) -> + ok | {error, term()}. +sessions_cleanup(HostType, Sessions) -> + Args = [HostType, Sessions], + mongoose_backend:call(HostType, ?MAIN_MODULE, ?FUNCTION_NAME, Args). diff --git a/src/mod_last_mnesia.erl b/src/mod_last_mnesia.erl index 27555a67bab..6aac910ac18 100644 --- a/src/mod_last_mnesia.erl +++ b/src/mod_last_mnesia.erl @@ -22,8 +22,10 @@ get_last/3, count_active_users/3, set_last_info/5, + session_cleanup/5, remove_user/3, - remove_domain/2]). + remove_domain/2, + sessions_cleanup/2]). -type host_type() :: mongooseim:host_type(). @@ -63,6 +65,11 @@ set_last_info(_HostType, LUser, LServer, TimeStamp, Status) -> end, wrap_transaction_result(mnesia:transaction(F)). +-spec session_cleanup(host_type(), jid:luser(), jid:lserver(), + mod_last:timestamp(), mod_last:status()) -> ok | {error, term()}. +session_cleanup(HostType, LUser, LServer, TimeStamp, Status) -> + set_last_info(HostType, LUser, LServer, TimeStamp, Status). + -spec remove_user(host_type(), jid:luser(), jid:lserver()) -> ok. remove_user(_HostType, LUser, LServer) -> US = {LUser, LServer}, @@ -74,6 +81,10 @@ remove_user(_HostType, LUser, LServer) -> remove_domain(_HostType, _Domain) -> ok. +-spec sessions_cleanup(mongooseim:host_type(), [ejabberd_sm:session()]) -> ok. +sessions_cleanup(_HostType, _Sessions) -> + ok. + -spec wrap_transaction_result({atomic, ok | term()} | term()) -> ok | {error, term()}. wrap_transaction_result({atomic, ok}) -> ok; wrap_transaction_result({atomic, Error}) -> {error, Error}; diff --git a/src/mod_last_rdbms.erl b/src/mod_last_rdbms.erl index fd7d6c69b95..7f7c318641d 100644 --- a/src/mod_last_rdbms.erl +++ b/src/mod_last_rdbms.erl @@ -21,8 +21,10 @@ get_last/3, count_active_users/3, set_last_info/5, + session_cleanup/5, remove_user/3, - remove_domain/2]). + remove_domain/2, + sessions_cleanup/2]). -type host_type() :: mongooseim:host_type(). @@ -81,8 +83,12 @@ count_active_users(HostType, LServer, Seconds) -> Result = execute_count_active_users(HostType, LServer, Seconds), mongoose_rdbms:selected_to_integer(Result). --spec set_last_info(host_type(), jid:luser(), jid:lserver(), - mod_last:timestamp(), mod_last:status()) -> +-spec session_cleanup(host_type(), jid:luser(), jid:lserver(), mod_last:timestamp(), mod_last:status()) -> + ok | {error, term()}. +session_cleanup(HostType, LUser, LServer, Seconds, State) -> + wrap_rdbms_result(execute_upsert_last(HostType, LServer, LUser, Seconds, State)). + +-spec set_last_info(host_type(), jid:luser(), jid:lserver(), mod_last:timestamp(), mod_last:status()) -> ok | {error, term()}. set_last_info(HostType, LUser, LServer, Seconds, State) -> wrap_rdbms_result(execute_upsert_last(HostType, LServer, LUser, Seconds, State)). @@ -105,3 +111,7 @@ decode_last_result({selected, [{DbSeconds, State}]}) -> -spec wrap_rdbms_result({error, term()} | any()) -> ok | {error, term()}. wrap_rdbms_result({error, _} = Error) -> Error; wrap_rdbms_result(_) -> ok. + +-spec sessions_cleanup(mongooseim:host_type(), [ejabberd_sm:session()]) -> ok. +sessions_cleanup(_HostType, _Sessions) -> + ok. diff --git a/test/mongoose_cleanup_SUITE.erl b/test/mongoose_cleanup_SUITE.erl index f250b29cfc7..21b6b93ab14 100644 --- a/test/mongoose_cleanup_SUITE.erl +++ b/test/mongoose_cleanup_SUITE.erl @@ -184,7 +184,7 @@ last(_Config) -> config_parser_helper:mod_config(mod_last, #{iqdisc => no_queue})), not_found = mod_last:get_last_info(HostType, U, S), Status1 = <<"status1">>, - {ok, #{}} = mod_last:on_presence_update(new_acc(S), #{jid => JID, status => Status1}, #{}), + {ok, #{}} = mod_last:unset_presence_hook(new_acc(S), #{jid => JID, status => Status1}, #{}), {ok, TS1, Status1} = mod_last:get_last_info(HostType, U, S), async_helper:wait_until( fun() -> From c96c0993876fc756b103f176c81565a5c4741af9 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Mon, 29 Apr 2024 20:27:46 +0200 Subject: [PATCH 03/19] Add upsert_many function for rdbms_queries --- big_tests/tests/rdbms_SUITE.erl | 18 +++++++- src/rdbms/rdbms_queries.erl | 77 +++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 1 deletion(-) diff --git a/big_tests/tests/rdbms_SUITE.erl b/big_tests/tests/rdbms_SUITE.erl index bff75958852..8873fa81854 100644 --- a/big_tests/tests/rdbms_SUITE.erl +++ b/big_tests/tests/rdbms_SUITE.erl @@ -85,7 +85,8 @@ rdbms_queries_cases() -> test_failed_transaction_with_execute_wrapped, test_failed_wrapper_transaction, test_incremental_upsert, - arguments_from_two_tables]. + arguments_from_two_tables, + test_upsert_many]. suite() -> escalus:suite(). @@ -602,6 +603,18 @@ do_test_incremental_upsert(Config) -> SelectResult = sql_query(Config, <<"SELECT timestamp FROM inbox">>), ?assertEqual({selected, [{<<"43">>}]}, selected_to_binary(SelectResult)). +test_upsert_many(Config) -> + sql_prepare_upsert_many(Config, 2, upsert_many_last, last, + [<<"server">>, <<"username">>, <<"seconds">>, <<"state">>], + [<<"seconds">>, <<"state">>], + [<<"server">>, <<"username">>]), + Insert1 = [<<"localhost">>, <<"alice">>, 0, <<>>], + Insert2 = [<<"localhost">>, <<"bob">>, 0, <<>>], + Update = [0, <<>>], + Key = [], + %% Records keys must be unique (i.e. we cannot insert alice twice) + {updated, 2} = sql_execute_upsert(Config, upsert_many_last, Insert1 ++ Insert2, Update, Key). + %%-------------------------------------------------------------------- %% Text searching %%-------------------------------------------------------------------- @@ -635,6 +648,9 @@ sql_prepare(_Config, Name, Table, Fields, Query) -> sql_prepare_upsert(_Config, Name, Table, Insert, Update, Unique, Incr) -> escalus_ejabberd:rpc(rdbms_queries, prepare_upsert, [host_type(), Name, Table, Insert, Update, Unique, Incr]). +sql_prepare_upsert_many(_Config, RecordCount, Name, Table, Insert, Update, Unique) -> + escalus_ejabberd:rpc(rdbms_queries, prepare_upsert_many, [host_type(), RecordCount, Name, Table, Insert, Update, Unique]). + sql_execute(Config, Name, Parameters) -> ScopeAndTag = scope_and_tag(Config), slow_rpc(mongoose_rdbms, execute, ScopeAndTag ++ [Name, Parameters]). diff --git a/src/rdbms/rdbms_queries.erl b/src/rdbms/rdbms_queries.erl index 22146793288..93dd43d5830 100644 --- a/src/rdbms/rdbms_queries.erl +++ b/src/rdbms/rdbms_queries.erl @@ -43,6 +43,7 @@ -export([join/2, prepare_upsert/6, prepare_upsert/7, + prepare_upsert_many/7, execute_upsert/5, execute_upsert/6, request_upsert/5]). @@ -160,6 +161,30 @@ prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields) -> _ -> InsertFields ++ UpdateFields end. +prepared_upsert_many_fields(RecordCount, InsertFields, Updates, UniqueKeyFields) -> + InsertFieldsMany = lists:append(lists:duplicate(RecordCount, InsertFields)), + UpdateFields = lists:filtermap(fun get_field_name/1, Updates), + case mongoose_rdbms:db_type() of + mssql -> + UniqueKeyFields ++ InsertFieldsMany ++ UpdateFields; + _ -> InsertFieldsMany ++ UpdateFields + end. + +-spec prepare_upsert_many(HostType :: mongooseim:host_type_or_global(), + RecordCount :: pos_integer(), + QueryName :: mongoose_rdbms:query_name(), + TableName :: atom(), + InsertFields :: [ColumnName :: binary()], + Updates :: [binary() | {assignment | expression, binary(), binary()}], + UniqueKeyFields :: [binary()]) -> + {ok, QueryName :: mongoose_rdbms:query_name()} | {error, already_exists}. +prepare_upsert_many(HostType, RecordCount, Name, Table, InsertFields, Updates, UniqueKeyFields) -> + SQL = upsert_query_many(HostType, RecordCount, Table, InsertFields, Updates, UniqueKeyFields), + Query = iolist_to_binary(SQL), + ?LOG_DEBUG(#{what => rdbms_upsert_query, name => Name, query => Query}), + Fields = prepared_upsert_many_fields(RecordCount, InsertFields, Updates, UniqueKeyFields), + mongoose_rdbms:prepare(Name, Table, Fields, Query). + upsert_query(HostType, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) -> case {mongoose_rdbms:db_engine(HostType), mongoose_rdbms:db_type()} of {mysql, _} -> @@ -171,6 +196,24 @@ upsert_query(HostType, Table, InsertFields, Updates, UniqueKeyFields, Incrementa NotSupported -> erlang:error({rdbms_not_supported, NotSupported}) end. +-spec upsert_query_many(HostType :: mongooseim:host_type_or_global(), + RecordCount :: pos_integer(), + TableName :: atom(), + InsertFields :: [binary()], + Updates :: [binary() | {assignment | expression, binary(), binary()}], + UniqueKeyFields :: [binary()]) -> + {ok, QueryName :: mongoose_rdbms:query_name()} | {error, already_exists}. +upsert_query_many(HostType, RecordCount, Table, InsertFields, Updates, UniqueKeyFields) -> + case {mongoose_rdbms:db_engine(HostType), mongoose_rdbms:db_type()} of + {mysql, _} -> + upsert_many_mysql_query(RecordCount, Table, InsertFields, Updates, UniqueKeyFields); + {pgsql, _} -> + upsert_many_pgsql_query(RecordCount, Table, InsertFields, Updates, UniqueKeyFields); + {odbc, mssql} -> + upsert_many_mssql_query(RecordCount, Table, InsertFields, Updates, UniqueKeyFields); + NotSupported -> erlang:error({rdbms_not_supported, NotSupported}) + end. + mysql_and_pgsql_insert(Table, Columns) -> JoinedFields = join(Columns, <<", ">>), Placeholders = lists:duplicate(length(Columns), $?), @@ -180,6 +223,14 @@ mysql_and_pgsql_insert(Table, Columns) -> join(Placeholders, ", "), ")"]. +mysql_and_pgsql_insert_many(RecordCount, Table, Columns) -> + JoinedFields = join(Columns, <<", ">>), + Placeholders = lists:duplicate(length(Columns), $?), + Values = ["(", join(Placeholders, ", "), ")"], + ManyValues = join(lists:duplicate(RecordCount, Values), ", "), + ["INSERT INTO ", atom_to_binary(Table, utf8), " (", + JoinedFields, ") VALUES ", ManyValues]. + upsert_mysql_query(Table, InsertFields, Updates, [Key | _], IncrementalField) -> Insert = mysql_and_pgsql_insert(Table, InsertFields), OnConflict = mysql_on_conflict(Table, Updates, Key, IncrementalField), @@ -191,6 +242,16 @@ upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalFie WhereIncrements = pgsql_ensure_increments(Table, IncrementalField), [Insert, OnConflict, WhereIncrements]. +upsert_many_mysql_query(RecordCount, Table, InsertFields, Updates, [Key | _]) -> + Insert = mysql_and_pgsql_insert_many(RecordCount, Table, InsertFields), + OnConflict = mysql_on_conflict(Table, Updates, Key, none), + [Insert, OnConflict]. + +upsert_many_pgsql_query(RecordCount, Table, InsertFields, Updates, UniqueKeyFields) -> + Insert = mysql_and_pgsql_insert_many(RecordCount, Table, InsertFields), + OnConflict = pgsql_on_conflict(Updates, UniqueKeyFields), + [Insert, OnConflict]. + mysql_on_conflict(_Table, [], Key, _) -> %% Update field to itself (no-op), there is no 'DO NOTHING' in MySQL [" ON DUPLICATE KEY UPDATE ", Key, " = ", Key]; @@ -247,6 +308,22 @@ upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields) -> " (", JoinedInsertFields, ")" " VALUES (", join(Placeholders, ", "), ")" | mssql_on_conflict(Updates)]. +upsert_many_mssql_query(RecordCount, Table, InsertFields, Updates, UniqueKeyFields) -> + UniqueKeysInSelect = [[" ? AS ", Key] || Key <- UniqueKeyFields], + BinTab = atom_to_binary(Table, utf8), + UniqueConstraint = [[BinTab, ".", Key, " = source.", Key] || Key <- UniqueKeyFields], + JoinedInsertFields = join(InsertFields, ", "), + Placeholders = lists:duplicate(length(InsertFields), $?), + Values = ["(", join(Placeholders, ", "), ")"], + ManyValues = join(lists:duplicate(RecordCount, Values), ", "), + ["MERGE INTO ", BinTab, " WITH (SERIALIZABLE)" + " USING (SELECT ", join(UniqueKeysInSelect, ", "), ")" + " AS source (", join(UniqueKeyFields, ", "), ")" + " ON (", join(UniqueConstraint, " AND "), ")" + " WHEN NOT MATCHED THEN INSERT" + " (", JoinedInsertFields, ")" + " VALUES ", ManyValues | mssql_on_conflict(Updates)]. + mssql_on_conflict([]) -> ";"; mssql_on_conflict(Updates) -> [" WHEN MATCHED THEN UPDATE SET ", update_fields_on_conflict(Updates), ";"]. From ef6f55c7e1c5d53ba389be8917ce747fbfc87b48 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Mon, 29 Apr 2024 20:28:13 +0200 Subject: [PATCH 04/19] Use parallel batched upserts in mod_last:sessions_cleanup --- big_tests/tests/last_SUITE.erl | 37 ++++++++++++++++- src/hooks/mongoose_hooks.erl | 2 +- src/mod_last.erl | 3 +- src/mod_last_rdbms.erl | 75 ++++++++++++++++++++++++++++++---- 4 files changed, 106 insertions(+), 11 deletions(-) diff --git a/big_tests/tests/last_SUITE.erl b/big_tests/tests/last_SUITE.erl index 55487fea8c7..ccc8cb16433 100644 --- a/big_tests/tests/last_SUITE.erl +++ b/big_tests/tests/last_SUITE.erl @@ -19,6 +19,7 @@ -include_lib("escalus/include/escalus.hrl"). -include_lib("escalus/include/escalus_xmlns.hrl"). -include_lib("exml/include/exml.hrl"). +-include_lib("eunit/include/eunit.hrl"). -import(config_parser_helper, [mod_config_with_auto_backend/2]). @@ -37,7 +38,7 @@ groups() -> valid_test_cases() -> [online_user_query, last_online_user, last_offline_user, - last_server]. + last_server, sessions_cleanup]. invalid_test_cases() -> [user_not_subscribed_receives_error]. @@ -169,6 +170,35 @@ user_not_subscribed_receives_error(Config) -> ok end). +sessions_cleanup(Config) -> + N = distributed_helper:mim(), + HostType = domain_helper:host_type(), + Server = domain_helper:domain(), + CreateUser = fun(Name) -> + SID = {erlang:system_time(microsecond), spawn(fun() -> ok end)}, + JID = mongoose_helper:make_jid(Name, Server, <<"res">>), + Priority = 0, + Info = #{}, + distributed_helper:rpc(N, ejabberd_sm, open_session, [HostType, SID, JID, Priority, Info]) + end, + Names = [<<"user", (list_to_binary((integer_to_list(X))))/binary>> || X <- lists:seq(1, 345)], + measure("create users", fun() -> + lists:foreach(CreateUser, Names) + end), + %% Check that user3 is properly updated + %% User should be registered if we want to use mod_last_api + {ok, _} = distributed_helper:rpc(N, mongoose_account_api, register_user, [<<"user3">>, Server, <<"secret123">>]), + Jid3 = mongoose_helper:make_jid(<<"user3">>, Server, <<>>), + {ok, _} = distributed_helper:rpc(N, mod_last_api, set_last, [Jid3, 1714000000, <<"old status">>]), + {ok, #{timestamp := 1714000000}} = distributed_helper:rpc(N, mod_last_api, get_last, [Jid3]), + measure("node cleanup", fun() -> + distributed_helper:rpc(N#{timeout => timer:minutes(1)}, mongoose_hooks, node_cleanup, [node()]) + end), + {ok, #{timestamp := TS, status := Status} = Data} = distributed_helper:rpc(N, mod_last_api, get_last, [Jid3]), + ?assertNotEqual(TS, 1714000000, Data), + ?assertEqual(Status, <<>>, Data), + ok. + %%----------------------------------------------------------------- %% Helpers @@ -193,3 +223,8 @@ answer_last_activity(IQ = #xmlel{name = <<"iq">>}) -> required_modules() -> [{mod_last, mod_config_with_auto_backend(mod_last, #{iqdisc => one_queue})}]. + +measure(Text, F) -> + {Time, _} = timer:tc(F), + ct:pal("Time ~ts = ~p", [Text, Time]), + ok. diff --git a/src/hooks/mongoose_hooks.erl b/src/hooks/mongoose_hooks.erl index cda71f9fd80..97e459be7f3 100644 --- a/src/hooks/mongoose_hooks.erl +++ b/src/hooks/mongoose_hooks.erl @@ -373,7 +373,7 @@ session_cleanup(Server, Acc, User, Resource, SID) -> -spec sessions_cleanup(mongooseim:host_type(), ejabberd_sm:session()) -> map(). sessions_cleanup(HostType, Sessions) -> Params = #{sessions => Sessions}, - run_hook_for_host_type(sessions_cleanup, HostType, #{}, Params). + run_hook_for_host_type(sessions_cleanup, HostType, #{host_type => HostType}, Params). %%% @doc The `set_vcard' hook is called when the caller wants to set the VCard. -spec set_vcard(HostType, UserJID, VCard) -> Result when diff --git a/src/mod_last.erl b/src/mod_last.erl index 6f2d4a9964f..c0d679d0479 100644 --- a/src/mod_last.erl +++ b/src/mod_last.erl @@ -282,8 +282,7 @@ session_cleanup(Acc, #{jid := #jid{luser = LUser, lserver = LServer}}, _) -> Acc :: mongoose_acc:t(), Params :: #{sessions := ejabberd_sm:session()}, Extra :: gen_hook:extra(). -sessions_cleanup(Acc, #{sessions := Sessions}, _) -> - HostType = mongoose_acc:host_type(Acc), +sessions_cleanup(Acc = #{host_type := HostType}, #{sessions := Sessions}, _) -> mod_last_backend:sessions_cleanup(HostType, Sessions), {ok, Acc}. diff --git a/src/mod_last_rdbms.erl b/src/mod_last_rdbms.erl index 7f7c318641d..69ebb8cd76e 100644 --- a/src/mod_last_rdbms.erl +++ b/src/mod_last_rdbms.erl @@ -15,6 +15,8 @@ -behaviour(mod_last_backend). -include("mongoose.hrl"). +-include("session.hrl"). +-include("mongoose_logger.hrl"). %% API -export([init/2, @@ -43,10 +45,13 @@ prepare_queries(HostType) -> <<"DELETE FROM last WHERE server = ? AND username = ?">>), mongoose_rdbms:prepare(last_remove_domain, last, [server], <<"DELETE FROM last WHERE server = ?">>), - rdbms_queries:prepare_upsert(HostType, last_upsert, last, - [<<"server">>, <<"username">>, <<"seconds">>, <<"state">>], - [<<"seconds">>, <<"state">>], - [<<"server">>, <<"username">>]). + Ins = [<<"server">>, <<"username">>, <<"seconds">>, <<"state">>], + Upd = [<<"seconds">>, <<"state">>], + Key = [<<"server">>, <<"username">>], + rdbms_queries:prepare_upsert(HostType, last_upsert, last, Ins, Upd, Key), + rdbms_queries:prepare_upsert_many(HostType, 10, last_upsert_many10, last, Ins, Upd, Key), + rdbms_queries:prepare_upsert_many(HostType, 100, last_upsert_many100, last, Ins, Upd, Key), + ok. -spec execute_get_last(host_type(), jid:lserver(), jid:luser()) -> mongoose_rdbms:query_result(). execute_get_last(HostType, LServer, LUser) -> @@ -85,8 +90,9 @@ count_active_users(HostType, LServer, Seconds) -> -spec session_cleanup(host_type(), jid:luser(), jid:lserver(), mod_last:timestamp(), mod_last:status()) -> ok | {error, term()}. -session_cleanup(HostType, LUser, LServer, Seconds, State) -> - wrap_rdbms_result(execute_upsert_last(HostType, LServer, LUser, Seconds, State)). +session_cleanup(_HostType, _LUser, _LServer, _Seconds, _State) -> + %% Cleaning is done in sessions_cleanup + ok. -spec set_last_info(host_type(), jid:luser(), jid:lserver(), mod_last:timestamp(), mod_last:status()) -> ok | {error, term()}. @@ -113,5 +119,60 @@ wrap_rdbms_result({error, _} = Error) -> Error; wrap_rdbms_result(_) -> ok. -spec sessions_cleanup(mongooseim:host_type(), [ejabberd_sm:session()]) -> ok. -sessions_cleanup(_HostType, _Sessions) -> +sessions_cleanup(HostType, Sessions) -> + Seconds = erlang:system_time(second), + %% server, username, seconds, state + Records = [[S, U, Seconds, <<>>] || #session{usr = {U, S, _}} <- Sessions], + %% PgSQL would complain if there are duplicates (i.e. when there are two sessions + %% with the same name but different resources) + Records2 = lists:usort(Records), + UpdateParams = [Seconds, <<>>], + UniqueKeyValues = [], + {Singles, Many100} = bucketize(100, Records2, []), + {Singles2, Many10} = bucketize(10, Singles, []), + %% Prepare data for queries + Tasks = + [{100, last_upsert_many100, lists:append(Batch)} || Batch <- Many100] ++ + [{10, last_upsert_many10, lists:append(Batch)} || Batch <- Many10] ++ + [{1, last_upsert, Rec} || Rec <- Singles2], + RunTask = fun({Count, QueryName, InsertParams}) -> + {updated, Count} = rdbms_queries:execute_upsert(HostType, QueryName, + InsertParams, UpdateParams, UniqueKeyValues) + end, + %% Run tasks in parallel + RunTasks = fun(TasksList) -> lists:map(RunTask, TasksList) end, + Workers = 8, + BatchSize = length(Tasks) div Workers, + Batches = smear(BatchSize, Tasks), + Results = mongoose_lib:pmap(RunTasks, Batches, timer:minutes(1)), + [check_result(Res) || Res <- Results], ok. + +check_result({ok, Results}) -> + lists:foreach(fun({updated, _}) -> ok end, Results), + ok; +check_result({error, Reason}) -> + ?LOG_ERROR(#{what => session_cleanup_failed, reason => Reason}). + +%% Create chunks of size N +bucketize(N, Records, Acc) -> + try + lists:split(N, Records) + of + {Batch, Records2} -> + bucketize(N, Records2, [Batch | Acc]) + catch error:badarg -> + {Records, Acc} + end. + +%% Spread elements into buckets one element at a time before moving to the next bucket +smear(N, Tasks) -> + Buckets = lists:duplicate(N, []), + smear(Tasks, Buckets, []). + +smear([Task | Tasks], [Bucket | Buckets], Acc) -> + smear(Tasks, Buckets, [[Task | Bucket] | Acc]); +smear([], Buckets, Acc) -> + Buckets ++ Acc; +smear(Tasks, [], Acc) -> + smear(Tasks, lists:reverse(Acc), []). From ec0bc0b55fc79a643a9995e95561869367ad91bc Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Tue, 30 Apr 2024 13:38:29 +0200 Subject: [PATCH 05/19] Add unit tests for bucketize and spread --- src/mod_last_rdbms.erl | 40 +++++++++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/src/mod_last_rdbms.erl b/src/mod_last_rdbms.erl index 69ebb8cd76e..6909e8989a4 100644 --- a/src/mod_last_rdbms.erl +++ b/src/mod_last_rdbms.erl @@ -128,8 +128,8 @@ sessions_cleanup(HostType, Sessions) -> Records2 = lists:usort(Records), UpdateParams = [Seconds, <<>>], UniqueKeyValues = [], - {Singles, Many100} = bucketize(100, Records2, []), - {Singles2, Many10} = bucketize(10, Singles, []), + {Singles, Many100} = bucketize(100, Records2), + {Singles2, Many10} = bucketize(10, Singles), %% Prepare data for queries Tasks = [{100, last_upsert_many100, lists:append(Batch)} || Batch <- Many100] ++ @@ -143,7 +143,7 @@ sessions_cleanup(HostType, Sessions) -> RunTasks = fun(TasksList) -> lists:map(RunTask, TasksList) end, Workers = 8, BatchSize = length(Tasks) div Workers, - Batches = smear(BatchSize, Tasks), + Batches = spread(BatchSize, Tasks), Results = mongoose_lib:pmap(RunTasks, Batches, timer:minutes(1)), [check_result(Res) || Res <- Results], ok. @@ -155,6 +155,9 @@ check_result({error, Reason}) -> ?LOG_ERROR(#{what => session_cleanup_failed, reason => Reason}). %% Create chunks of size N +bucketize(N, Records) -> + bucketize(N, Records, []). + bucketize(N, Records, Acc) -> try lists:split(N, Records) @@ -162,17 +165,28 @@ bucketize(N, Records, Acc) -> {Batch, Records2} -> bucketize(N, Records2, [Batch | Acc]) catch error:badarg -> - {Records, Acc} + {Records, lists:reverse(Acc)} end. %% Spread elements into buckets one element at a time before moving to the next bucket -smear(N, Tasks) -> +spread(N, Tasks) -> Buckets = lists:duplicate(N, []), - smear(Tasks, Buckets, []). - -smear([Task | Tasks], [Bucket | Buckets], Acc) -> - smear(Tasks, Buckets, [[Task | Bucket] | Acc]); -smear([], Buckets, Acc) -> - Buckets ++ Acc; -smear(Tasks, [], Acc) -> - smear(Tasks, lists:reverse(Acc), []). + spread(lists:reverse(Tasks), Buckets, []). + +spread([Task | Tasks], [Bucket | Buckets], Acc) -> + spread(Tasks, Buckets, [[Task | Bucket] | Acc]); +spread([], Buckets, Acc) -> + Acc ++ lists:reverse(Buckets); +spread(Tasks, [], Acc) -> + spread(Tasks, lists:reverse(Acc), []). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +bucketize_test_() -> + [?_assertEqual({[10], [[1, 2, 3], [4, 5, 6], [7, 8, 9]]}, bucketize(3, lists:seq(1, 10)))]. + +spread_test_() -> + [?_assertEqual([[1, 4, 7, 10], [2, 5, 8], [3, 6, 9]], spread(3, lists:seq(1, 10))), + ?_assertEqual([[1, 6], [2, 7], [3, 8], [4, 9], [5, 10]], spread(5, lists:seq(1, 10))), + ?_assertEqual([[1, 3, 5, 7, 9], [2, 4, 6, 8, 10]], spread(2, lists:seq(1, 10)))]. +-endif. From 201c575b93de0ce72e4afab125bbf5ed4674c121 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Tue, 30 Apr 2024 13:58:59 +0200 Subject: [PATCH 06/19] Fix dialyzer --- src/ejabberd_sm.erl | 2 +- src/hooks/mongoose_hooks.erl | 2 +- src/mod_last.erl | 4 ++-- src/mod_last_backend.erl | 3 +++ src/rdbms/rdbms_queries.erl | 7 ------- 5 files changed, 7 insertions(+), 11 deletions(-) diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index b125892c4ba..2d739aea075 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -365,7 +365,7 @@ session_cleanup(#session{usr = {U, S, R}, sid = SID}) -> element => undefined}), mongoose_hooks:session_cleanup(S, Acc, U, R, SID). --spec sessions_cleanup(#session{}) -> mongoose_acc:t(). +-spec sessions_cleanup([#session{}]) -> ok. sessions_cleanup(Sessions) -> SerSess = [{Server, Session} || Session = #session{usr = {_, Server, _}} <- Sessions], Servers = lists:usort([Server || {Server, _Session} <- SerSess]), diff --git a/src/hooks/mongoose_hooks.erl b/src/hooks/mongoose_hooks.erl index 97e459be7f3..dc461743e14 100644 --- a/src/hooks/mongoose_hooks.erl +++ b/src/hooks/mongoose_hooks.erl @@ -370,7 +370,7 @@ session_cleanup(Server, Acc, User, Resource, SID) -> HostType = mongoose_acc:host_type(Acc), run_hook_for_host_type(session_cleanup, HostType, Acc, Params). --spec sessions_cleanup(mongooseim:host_type(), ejabberd_sm:session()) -> map(). +-spec sessions_cleanup(mongooseim:host_type(), [ejabberd_sm:session()]) -> map(). sessions_cleanup(HostType, Sessions) -> Params = #{sessions => Sessions}, run_hook_for_host_type(sessions_cleanup, HostType, #{host_type => HostType}, Params). diff --git a/src/mod_last.erl b/src/mod_last.erl index c0d679d0479..f4daf3a4a95 100644 --- a/src/mod_last.erl +++ b/src/mod_last.erl @@ -279,8 +279,8 @@ session_cleanup(Acc, #{jid := #jid{luser = LUser, lserver = LServer}}, _) -> {ok, session_cleanup(Acc, LUser, LServer, <<>>)}. -spec sessions_cleanup(Acc, Params, Extra) -> {ok, Acc} when - Acc :: mongoose_acc:t(), - Params :: #{sessions := ejabberd_sm:session()}, + Acc :: map(), + Params :: #{sessions := [ejabberd_sm:session()]}, Extra :: gen_hook:extra(). sessions_cleanup(Acc = #{host_type := HostType}, #{sessions := Sessions}, _) -> mod_last_backend:sessions_cleanup(HostType, Sessions), diff --git a/src/mod_last_backend.erl b/src/mod_last_backend.erl index d7ddd1b88cc..6b37e8ac1f5 100644 --- a/src/mod_last_backend.erl +++ b/src/mod_last_backend.erl @@ -35,6 +35,9 @@ mod_last:timestamp(), mod_last:status()) -> ok | {error, term()}. +-callback sessions_cleanup(mongooseim:host_type(), [ejabberd_sm:session()]) -> + ok | {error, term()}. + -callback remove_user(mongooseim:host_type(), jid:luser(), jid:lserver()) -> ok | {error, term()}. diff --git a/src/rdbms/rdbms_queries.erl b/src/rdbms/rdbms_queries.erl index 93dd43d5830..2fdacb96a95 100644 --- a/src/rdbms/rdbms_queries.erl +++ b/src/rdbms/rdbms_queries.erl @@ -196,13 +196,6 @@ upsert_query(HostType, Table, InsertFields, Updates, UniqueKeyFields, Incrementa NotSupported -> erlang:error({rdbms_not_supported, NotSupported}) end. --spec upsert_query_many(HostType :: mongooseim:host_type_or_global(), - RecordCount :: pos_integer(), - TableName :: atom(), - InsertFields :: [binary()], - Updates :: [binary() | {assignment | expression, binary(), binary()}], - UniqueKeyFields :: [binary()]) -> - {ok, QueryName :: mongoose_rdbms:query_name()} | {error, already_exists}. upsert_query_many(HostType, RecordCount, Table, InsertFields, Updates, UniqueKeyFields) -> case {mongoose_rdbms:db_engine(HostType), mongoose_rdbms:db_type()} of {mysql, _} -> From 7e3b964593152383590e325030d5efb23ad1ecfc Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Wed, 8 May 2024 22:36:37 +0200 Subject: [PATCH 07/19] Fix multi upsert for MSSQL --- big_tests/tests/rdbms_SUITE.erl | 24 +++++++++++++--- src/mod_last_rdbms.erl | 7 +++-- src/rdbms/rdbms_queries.erl | 51 ++++++++++++++++++++++++++------- 3 files changed, 66 insertions(+), 16 deletions(-) diff --git a/big_tests/tests/rdbms_SUITE.erl b/big_tests/tests/rdbms_SUITE.erl index 8873fa81854..aff7b8ef7f4 100644 --- a/big_tests/tests/rdbms_SUITE.erl +++ b/big_tests/tests/rdbms_SUITE.erl @@ -86,7 +86,8 @@ rdbms_queries_cases() -> test_failed_wrapper_transaction, test_incremental_upsert, arguments_from_two_tables, - test_upsert_many]. + test_upsert_many1, + test_upsert_many2]. suite() -> escalus:suite(). @@ -603,8 +604,19 @@ do_test_incremental_upsert(Config) -> SelectResult = sql_query(Config, <<"SELECT timestamp FROM inbox">>), ?assertEqual({selected, [{<<"43">>}]}, selected_to_binary(SelectResult)). -test_upsert_many(Config) -> - sql_prepare_upsert_many(Config, 2, upsert_many_last, last, +test_upsert_many1(Config) -> + sql_prepare_upsert_many(Config, 1, upsert_many_last1, last, + [<<"server">>, <<"username">>, <<"seconds">>, <<"state">>], + [<<"seconds">>, <<"state">>], + [<<"server">>, <<"username">>]), + Insert1 = [<<"localhost">>, <<"kate">>, 0, <<>>], + Update = [0, <<>>], + Key = [], + %% Records keys must be unique (i.e. we cannot insert alice twice) + {updated, 1} = sql_execute_upsert_many(Config, upsert_many_last1, Insert1, Update, Key). + +test_upsert_many2(Config) -> + sql_prepare_upsert_many(Config, 2, upsert_many_last2, last, [<<"server">>, <<"username">>, <<"seconds">>, <<"state">>], [<<"seconds">>, <<"state">>], [<<"server">>, <<"username">>]), @@ -613,7 +625,7 @@ test_upsert_many(Config) -> Update = [0, <<>>], Key = [], %% Records keys must be unique (i.e. we cannot insert alice twice) - {updated, 2} = sql_execute_upsert(Config, upsert_many_last, Insert1 ++ Insert2, Update, Key). + {updated, 2} = sql_execute_upsert_many(Config, upsert_many_last2, Insert1 ++ Insert2, Update, Key). %%-------------------------------------------------------------------- %% Text searching @@ -686,6 +698,10 @@ sql_execute_upsert(Config, Name, Insert, Update, Unique) -> ScopeAndTag = scope_and_tag(Config), slow_rpc(rdbms_queries, execute_upsert, ScopeAndTag ++ [Name, Insert, Update, Unique]). +sql_execute_upsert_many(Config, Name, Insert, Update, Unique) -> + ScopeAndTag = scope_and_tag(Config), + slow_rpc(rdbms_queries, execute_upsert_many, ScopeAndTag ++ [Name, Insert, Update, Unique]). + sql_query_request(Config, Query) -> ScopeAndTag = scope_and_tag(Config), slow_rpc(mongoose_rdbms, sql_query_request, ScopeAndTag ++ [Query]). diff --git a/src/mod_last_rdbms.erl b/src/mod_last_rdbms.erl index 6909e8989a4..a3ae43f4484 100644 --- a/src/mod_last_rdbms.erl +++ b/src/mod_last_rdbms.erl @@ -135,8 +135,11 @@ sessions_cleanup(HostType, Sessions) -> [{100, last_upsert_many100, lists:append(Batch)} || Batch <- Many100] ++ [{10, last_upsert_many10, lists:append(Batch)} || Batch <- Many10] ++ [{1, last_upsert, Rec} || Rec <- Singles2], - RunTask = fun({Count, QueryName, InsertParams}) -> - {updated, Count} = rdbms_queries:execute_upsert(HostType, QueryName, + RunTask = fun({1, QueryName, InsertParams}) -> + {updated, 1} = rdbms_queries:execute_upsert(HostType, QueryName, + InsertParams, UpdateParams, UniqueKeyValues); + ({Count, QueryName, InsertParams}) -> + {updated, Count} = rdbms_queries:execute_upsert_many(HostType, QueryName, InsertParams, UpdateParams, UniqueKeyValues) end, %% Run tasks in parallel diff --git a/src/rdbms/rdbms_queries.erl b/src/rdbms/rdbms_queries.erl index 2fdacb96a95..60efb16fe0e 100644 --- a/src/rdbms/rdbms_queries.erl +++ b/src/rdbms/rdbms_queries.erl @@ -45,6 +45,7 @@ prepare_upsert/7, prepare_upsert_many/7, execute_upsert/5, execute_upsert/6, + execute_upsert_many/5, execute_upsert_many/6, request_upsert/5]). -ignore_xref([ @@ -109,6 +110,31 @@ execute_upsert(HostType, PoolTag, Name, InsertParams, UpdateParams, UniqueKeyVal NotSupported -> erlang:error({rdbms_not_supported, NotSupported}) end. +-spec execute_upsert_many(HostType :: mongooseim:host_type_or_global(), + Name :: atom(), + InsertParams :: [any()], + UpdateParams :: [any()], + UniqueKeyValues :: [any()]) -> mongoose_rdbms:query_result(). +execute_upsert_many(HostType, Name, InsertParams, UpdateParams, UniqueKeyValues) -> + execute_upsert_many(HostType, default, Name, InsertParams, UpdateParams, UniqueKeyValues). + +-spec execute_upsert_many(HostType :: mongooseim:host_type_or_global(), + PoolTag :: mongoose_wpool:tag(), + Name :: atom(), + InsertParams :: [any()], + UpdateParams :: [any()], + UniqueKeyValues :: [any()]) -> mongoose_rdbms:query_result(). +execute_upsert_many(HostType, PoolTag, Name, InsertParams, UpdateParams, UniqueKeyValues) -> + case {mongoose_rdbms:db_engine(HostType), mongoose_rdbms:db_type()} of + {mysql, _} -> + mongoose_rdbms:execute(HostType, PoolTag, Name, InsertParams ++ UpdateParams); + {pgsql, _} -> + mongoose_rdbms:execute(HostType, PoolTag, Name, InsertParams ++ UpdateParams); + {odbc, mssql} -> + mongoose_rdbms:execute(HostType, PoolTag, Name, InsertParams); + NotSupported -> erlang:error({rdbms_not_supported, NotSupported}) + end. + -spec request_upsert(HostType :: mongooseim:host_type_or_global(), Name :: atom(), InsertParams :: [any()], @@ -166,7 +192,7 @@ prepared_upsert_many_fields(RecordCount, InsertFields, Updates, UniqueKeyFields) UpdateFields = lists:filtermap(fun get_field_name/1, Updates), case mongoose_rdbms:db_type() of mssql -> - UniqueKeyFields ++ InsertFieldsMany ++ UpdateFields; + InsertFieldsMany; _ -> InsertFieldsMany ++ UpdateFields end. @@ -301,21 +327,26 @@ upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields) -> " (", JoinedInsertFields, ")" " VALUES (", join(Placeholders, ", "), ")" | mssql_on_conflict(Updates)]. +%% see prepared_upsert_many_fields upsert_many_mssql_query(RecordCount, Table, InsertFields, Updates, UniqueKeyFields) -> - UniqueKeysInSelect = [[" ? AS ", Key] || Key <- UniqueKeyFields], BinTab = atom_to_binary(Table, utf8), - UniqueConstraint = [[BinTab, ".", Key, " = source.", Key] || Key <- UniqueKeyFields], + UniqueConstraint = [["tgt.", Key, " = new.", Key] || Key <- UniqueKeyFields], + NewJoinedInsertFields = join([["new.", Column] || Column <- InsertFields], ", "), JoinedInsertFields = join(InsertFields, ", "), Placeholders = lists:duplicate(length(InsertFields), $?), Values = ["(", join(Placeholders, ", "), ")"], ManyValues = join(lists:duplicate(RecordCount, Values), ", "), - ["MERGE INTO ", BinTab, " WITH (SERIALIZABLE)" - " USING (SELECT ", join(UniqueKeysInSelect, ", "), ")" - " AS source (", join(UniqueKeyFields, ", "), ")" - " ON (", join(UniqueConstraint, " AND "), ")" - " WHEN NOT MATCHED THEN INSERT" - " (", JoinedInsertFields, ")" - " VALUES ", ManyValues | mssql_on_conflict(Updates)]. + ["MERGE ", BinTab, " AS tgt", + " USING (VALUES", ManyValues, ")" + " AS new (", JoinedInsertFields, ")" + " ON (", join(UniqueConstraint, " AND "), ")" + " WHEN MATCHED THEN UPDATE SET ", tgt_equals_new(Updates), + " WHEN NOT MATCHED THEN INSERT (", JoinedInsertFields, ")", + " VALUES(", NewJoinedInsertFields, ");"]. + +tgt_equals_new(Updates) -> + join([["tgt.", ColumnName, "=new.", ColumnName] || ColumnName <- Updates], ", "). + mssql_on_conflict([]) -> ";"; mssql_on_conflict(Updates) -> From a493396a3bcd4d87e8deb54487f76475193c8a5c Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Thu, 9 May 2024 21:41:11 +0200 Subject: [PATCH 08/19] Fix cleaning in Redis backend Properly calculate the node name using pids --- src/ejabberd_sm_redis.erl | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/ejabberd_sm_redis.erl b/src/ejabberd_sm_redis.erl index 096fb4155a0..39f7e7f52f9 100644 --- a/src/ejabberd_sm_redis.erl +++ b/src/ejabberd_sm_redis.erl @@ -76,18 +76,19 @@ get_sessions(User, Server, Resource) -> Session :: ejabberd_sm:session()) -> ok | {error, term()}. set_session(User, Server, Resource, Session) -> OldSessions = get_sessions(User, Server, Resource), + Node = sid_to_node(Session#session.sid), case lists:keysearch(Session#session.sid, #session.sid, OldSessions) of {value, OldSession} -> BOldSession = term_to_binary(OldSession), BSession = term_to_binary(Session), - mongoose_redis:cmds([["SADD", n(node()), hash(User, Server, Resource, Session#session.sid)], + mongoose_redis:cmds([["SADD", n(Node), hash(User, Server, Resource, Session#session.sid)], ["SREM", hash(User, Server), BOldSession], ["SREM", hash(User, Server, Resource), BOldSession], ["SADD", hash(User, Server), BSession], ["SADD", hash(User, Server, Resource), BSession]]); false -> BSession = term_to_binary(Session), - mongoose_redis:cmds([["SADD", n(node()), hash(User, Server, Resource, Session#session.sid)], + mongoose_redis:cmds([["SADD", n(Node), hash(User, Server, Resource, Session#session.sid)], ["SADD", hash(User, Server), BSession], ["SADD", hash(User, Server, Resource), BSession]]) end. @@ -103,7 +104,7 @@ delete_session(SID, User, Server, Resource) -> BSession = term_to_binary(Session), mongoose_redis:cmds([["SREM", hash(User, Server), BSession], ["SREM", hash(User, Server, Resource), BSession], - ["SREM", n(node()), hash(User, Server, Resource, SID)]]); + ["SREM", n(sid_to_node(SID)), hash(User, Server, Resource, SID)]]); false -> ok end. @@ -162,3 +163,7 @@ hash(Val1, Val2, Val3, Val4) -> -spec n(atom()) -> iolist(). n(Node) -> ["n:", atom_to_list(Node)]. + +sid_to_node(SID) -> + {_, Pid} = SID, + node(Pid). From 1c0d9de6986c3047f4164e72ef4fd5002449955d Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Thu, 9 May 2024 21:41:31 +0200 Subject: [PATCH 09/19] Use replace for MySQL For multiinserts --- src/rdbms/rdbms_queries.erl | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/rdbms/rdbms_queries.erl b/src/rdbms/rdbms_queries.erl index 60efb16fe0e..24acbc29221 100644 --- a/src/rdbms/rdbms_queries.erl +++ b/src/rdbms/rdbms_queries.erl @@ -127,7 +127,7 @@ execute_upsert_many(HostType, Name, InsertParams, UpdateParams, UniqueKeyValues) execute_upsert_many(HostType, PoolTag, Name, InsertParams, UpdateParams, UniqueKeyValues) -> case {mongoose_rdbms:db_engine(HostType), mongoose_rdbms:db_type()} of {mysql, _} -> - mongoose_rdbms:execute(HostType, PoolTag, Name, InsertParams ++ UpdateParams); + mongoose_rdbms:execute(HostType, PoolTag, Name, InsertParams); {pgsql, _} -> mongoose_rdbms:execute(HostType, PoolTag, Name, InsertParams ++ UpdateParams); {odbc, mssql} -> @@ -225,7 +225,7 @@ upsert_query(HostType, Table, InsertFields, Updates, UniqueKeyFields, Incrementa upsert_query_many(HostType, RecordCount, Table, InsertFields, Updates, UniqueKeyFields) -> case {mongoose_rdbms:db_engine(HostType), mongoose_rdbms:db_type()} of {mysql, _} -> - upsert_many_mysql_query(RecordCount, Table, InsertFields, Updates, UniqueKeyFields); + upsert_many_mysql_query(RecordCount, Table, InsertFields); {pgsql, _} -> upsert_many_pgsql_query(RecordCount, Table, InsertFields, Updates, UniqueKeyFields); {odbc, mssql} -> @@ -242,7 +242,7 @@ mysql_and_pgsql_insert(Table, Columns) -> join(Placeholders, ", "), ")"]. -mysql_and_pgsql_insert_many(RecordCount, Table, Columns) -> +pgsql_insert_many(RecordCount, Table, Columns) -> JoinedFields = join(Columns, <<", ">>), Placeholders = lists:duplicate(length(Columns), $?), Values = ["(", join(Placeholders, ", "), ")"], @@ -261,13 +261,17 @@ upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalFie WhereIncrements = pgsql_ensure_increments(Table, IncrementalField), [Insert, OnConflict, WhereIncrements]. -upsert_many_mysql_query(RecordCount, Table, InsertFields, Updates, [Key | _]) -> - Insert = mysql_and_pgsql_insert_many(RecordCount, Table, InsertFields), - OnConflict = mysql_on_conflict(Table, Updates, Key, none), - [Insert, OnConflict]. +upsert_many_mysql_query(RecordCount, Table, InsertFields) -> + TableName = atom_to_list(Table), + Columns = join(InsertFields, ", "), + Placeholders = lists:duplicate(length(InsertFields), $?), + Values = ["(", join(Placeholders, ", "), ")"], + ManyValues = join(lists:duplicate(RecordCount, Values), ", "), + ["REPLACE INTO ", TableName, " (", Columns, ") ", + " VALUES ", ManyValues]. upsert_many_pgsql_query(RecordCount, Table, InsertFields, Updates, UniqueKeyFields) -> - Insert = mysql_and_pgsql_insert_many(RecordCount, Table, InsertFields), + Insert = pgsql_insert_many(RecordCount, Table, InsertFields), OnConflict = pgsql_on_conflict(Updates, UniqueKeyFields), [Insert, OnConflict]. From 383050b3465262c59b0cc237563eae64de062d8e Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Thu, 9 May 2024 21:41:49 +0200 Subject: [PATCH 10/19] Do not check the result of updates --- src/mod_last_rdbms.erl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/mod_last_rdbms.erl b/src/mod_last_rdbms.erl index a3ae43f4484..ef04cf2f176 100644 --- a/src/mod_last_rdbms.erl +++ b/src/mod_last_rdbms.erl @@ -136,10 +136,11 @@ sessions_cleanup(HostType, Sessions) -> [{10, last_upsert_many10, lists:append(Batch)} || Batch <- Many10] ++ [{1, last_upsert, Rec} || Rec <- Singles2], RunTask = fun({1, QueryName, InsertParams}) -> - {updated, 1} = rdbms_queries:execute_upsert(HostType, QueryName, + {updated, _} = rdbms_queries:execute_upsert(HostType, QueryName, InsertParams, UpdateParams, UniqueKeyValues); - ({Count, QueryName, InsertParams}) -> - {updated, Count} = rdbms_queries:execute_upsert_many(HostType, QueryName, + ({_Count1, QueryName, InsertParams}) -> + %% MySQL replace returns wrong numbers + {updated, _Count2} = rdbms_queries:execute_upsert_many(HostType, QueryName, InsertParams, UpdateParams, UniqueKeyValues) end, %% Run tasks in parallel From 6ca204a4d735e2da09233b250f3eb0dc8a5e494c Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Thu, 9 May 2024 21:42:05 +0200 Subject: [PATCH 11/19] Add more tests for multi-inserts --- big_tests/tests/rdbms_SUITE.erl | 52 +++++++++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 3 deletions(-) diff --git a/big_tests/tests/rdbms_SUITE.erl b/big_tests/tests/rdbms_SUITE.erl index aff7b8ef7f4..9ff335f4012 100644 --- a/big_tests/tests/rdbms_SUITE.erl +++ b/big_tests/tests/rdbms_SUITE.erl @@ -87,7 +87,9 @@ rdbms_queries_cases() -> test_incremental_upsert, arguments_from_two_tables, test_upsert_many1, - test_upsert_many2]. + test_upsert_many2, + test_upsert_many1_replaces_existing, + test_upsert_many2_replaces_existing]. suite() -> escalus:suite(). @@ -605,6 +607,7 @@ do_test_incremental_upsert(Config) -> ?assertEqual({selected, [{<<"43">>}]}, selected_to_binary(SelectResult)). test_upsert_many1(Config) -> + erase_last(Config), sql_prepare_upsert_many(Config, 1, upsert_many_last1, last, [<<"server">>, <<"username">>, <<"seconds">>, <<"state">>], [<<"seconds">>, <<"state">>], @@ -616,6 +619,7 @@ test_upsert_many1(Config) -> {updated, 1} = sql_execute_upsert_many(Config, upsert_many_last1, Insert1, Update, Key). test_upsert_many2(Config) -> + erase_last(Config), sql_prepare_upsert_many(Config, 2, upsert_many_last2, last, [<<"server">>, <<"username">>, <<"seconds">>, <<"state">>], [<<"seconds">>, <<"state">>], @@ -627,6 +631,42 @@ test_upsert_many2(Config) -> %% Records keys must be unique (i.e. we cannot insert alice twice) {updated, 2} = sql_execute_upsert_many(Config, upsert_many_last2, Insert1 ++ Insert2, Update, Key). +test_upsert_many1_replaces_existing(Config) -> + erase_last(Config), + sql_prepare_upsert_many(Config, 1, upsert_many_last1, last, + [<<"server">>, <<"username">>, <<"seconds">>, <<"state">>], + [<<"seconds">>, <<"state">>], + [<<"server">>, <<"username">>]), + Insert1 = [<<"localhost">>, <<"kate">>, 0, <<>>], + Update1 = [0, <<>>], + Insert2 = [<<"localhost">>, <<"kate">>, 10, <<>>], + Update2 = [10, <<>>], + Key = [], + %% Replace returns wrong numbers with MySQL (2 instead of 1, 4 instead of 2) + {updated, _} = sql_execute_upsert_many(Config, upsert_many_last1, Insert1, Update1, Key), + {updated, _} = sql_execute_upsert_many(Config, upsert_many_last1, Insert2, Update2, Key), + SelectResult = sql_query(Config, <<"SELECT seconds FROM last">>), + ?assertEqual({selected, [{<<"10">>}]}, selected_to_binary(SelectResult)). + +test_upsert_many2_replaces_existing(Config) -> + erase_last(Config), + sql_prepare_upsert_many(Config, 2, upsert_many_last2, last, + [<<"server">>, <<"username">>, <<"seconds">>, <<"state">>], + [<<"seconds">>, <<"state">>], + [<<"server">>, <<"username">>]), + Insert1 = [<<"localhost">>, <<"alice">>, 0, <<>>], + Insert3 = [<<"localhost">>, <<"alice">>, 10, <<>>], + Insert2 = [<<"localhost">>, <<"bob">>, 0, <<>>], + Insert4 = [<<"localhost">>, <<"bob">>, 10, <<>>], + Update1 = [0, <<>>], + Update3 = [10, <<>>], + Key = [], + %% Records keys must be unique (i.e. we cannot insert alice twice) + {updated, _} = sql_execute_upsert_many(Config, upsert_many_last2, Insert1 ++ Insert2, Update1, Key), + {updated, _} = sql_execute_upsert_many(Config, upsert_many_last2, Insert3 ++ Insert4, Update3, Key), + SelectResult = sql_query(Config, <<"SELECT seconds FROM last">>), + ?assertEqual({selected, [{<<"10">>}, {<<"10">>}]}, selected_to_binary(SelectResult)). + %%-------------------------------------------------------------------- %% Text searching %%-------------------------------------------------------------------- @@ -757,6 +797,9 @@ decode_boolean(_Config, Value) -> erase_table(Config) -> {updated, _} = sql_query(Config, <<"DELETE FROM test_types">>). +erase_last(Config) -> + {updated, _} = sql_query(Config, <<"DELETE FROM last">>). + erase_users(Config) -> {updated, _} = sql_query(Config, <<"DELETE FROM users">>), {updated, _} = sql_query(Config, <<"DELETE FROM last">>). @@ -797,11 +840,14 @@ integer_to_binary_or_null(X) -> integer_to_binary(X). %% Helper function to transform values to an uniform format. %% Single tuple, single element case. %% In ODBC int32 is integer, but int64 is binary. -selected_to_binary({selected, [{Value}]}) when is_integer(Value) -> - {selected, [{integer_to_binary(Value)}]}; +selected_to_binary({selected, Rows}) -> + {selected, [row_to_binary(Row) || Row <- Rows]}; selected_to_binary(Other) -> Other. +row_to_binary(Row) -> + list_to_tuple([value_to_binary(Value) || Value <- tuple_to_list(Row)]). + selected_to_sorted({selected, Rows}) -> {selected, lists:sort(Rows)}; selected_to_sorted(Other) -> From cfb7a63340fcce272fb03bbfc3f63a1c1556d36a Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Thu, 9 May 2024 22:13:12 +0200 Subject: [PATCH 12/19] Fix error in metrics_session_SUITE:session:login_one suite Clean sessionCount properly --- big_tests/tests/last_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/big_tests/tests/last_SUITE.erl b/big_tests/tests/last_SUITE.erl index ccc8cb16433..f69ec16bad4 100644 --- a/big_tests/tests/last_SUITE.erl +++ b/big_tests/tests/last_SUITE.erl @@ -197,7 +197,7 @@ sessions_cleanup(Config) -> {ok, #{timestamp := TS, status := Status} = Data} = distributed_helper:rpc(N, mod_last_api, get_last, [Jid3]), ?assertNotEqual(TS, 1714000000, Data), ?assertEqual(Status, <<>>, Data), - ok. + distributed_helper:rpc(N, mongoose_metrics, update, [HostType, sessionCount, -345]). %%----------------------------------------------------------------- From 1757f5d57b9318444e8191a862e1c8889a8cc694 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Fri, 10 May 2024 12:49:20 +0200 Subject: [PATCH 13/19] Remove Key argument from multiinsert Key does not make sense with more records Fix xref --- big_tests/tests/rdbms_SUITE.erl | 20 ++++++++------------ src/mod_last_rdbms.erl | 5 ++--- src/rdbms/rdbms_queries.erl | 17 ++++++++--------- 3 files changed, 18 insertions(+), 24 deletions(-) diff --git a/big_tests/tests/rdbms_SUITE.erl b/big_tests/tests/rdbms_SUITE.erl index 9ff335f4012..e690cfd57fc 100644 --- a/big_tests/tests/rdbms_SUITE.erl +++ b/big_tests/tests/rdbms_SUITE.erl @@ -614,9 +614,8 @@ test_upsert_many1(Config) -> [<<"server">>, <<"username">>]), Insert1 = [<<"localhost">>, <<"kate">>, 0, <<>>], Update = [0, <<>>], - Key = [], %% Records keys must be unique (i.e. we cannot insert alice twice) - {updated, 1} = sql_execute_upsert_many(Config, upsert_many_last1, Insert1, Update, Key). + {updated, 1} = sql_execute_upsert_many(Config, upsert_many_last1, Insert1, Update). test_upsert_many2(Config) -> erase_last(Config), @@ -627,9 +626,8 @@ test_upsert_many2(Config) -> Insert1 = [<<"localhost">>, <<"alice">>, 0, <<>>], Insert2 = [<<"localhost">>, <<"bob">>, 0, <<>>], Update = [0, <<>>], - Key = [], %% Records keys must be unique (i.e. we cannot insert alice twice) - {updated, 2} = sql_execute_upsert_many(Config, upsert_many_last2, Insert1 ++ Insert2, Update, Key). + {updated, 2} = sql_execute_upsert_many(Config, upsert_many_last2, Insert1 ++ Insert2, Update). test_upsert_many1_replaces_existing(Config) -> erase_last(Config), @@ -641,10 +639,9 @@ test_upsert_many1_replaces_existing(Config) -> Update1 = [0, <<>>], Insert2 = [<<"localhost">>, <<"kate">>, 10, <<>>], Update2 = [10, <<>>], - Key = [], %% Replace returns wrong numbers with MySQL (2 instead of 1, 4 instead of 2) - {updated, _} = sql_execute_upsert_many(Config, upsert_many_last1, Insert1, Update1, Key), - {updated, _} = sql_execute_upsert_many(Config, upsert_many_last1, Insert2, Update2, Key), + {updated, _} = sql_execute_upsert_many(Config, upsert_many_last1, Insert1, Update1), + {updated, _} = sql_execute_upsert_many(Config, upsert_many_last1, Insert2, Update2), SelectResult = sql_query(Config, <<"SELECT seconds FROM last">>), ?assertEqual({selected, [{<<"10">>}]}, selected_to_binary(SelectResult)). @@ -660,10 +657,9 @@ test_upsert_many2_replaces_existing(Config) -> Insert4 = [<<"localhost">>, <<"bob">>, 10, <<>>], Update1 = [0, <<>>], Update3 = [10, <<>>], - Key = [], %% Records keys must be unique (i.e. we cannot insert alice twice) - {updated, _} = sql_execute_upsert_many(Config, upsert_many_last2, Insert1 ++ Insert2, Update1, Key), - {updated, _} = sql_execute_upsert_many(Config, upsert_many_last2, Insert3 ++ Insert4, Update3, Key), + {updated, _} = sql_execute_upsert_many(Config, upsert_many_last2, Insert1 ++ Insert2, Update1), + {updated, _} = sql_execute_upsert_many(Config, upsert_many_last2, Insert3 ++ Insert4, Update3), SelectResult = sql_query(Config, <<"SELECT seconds FROM last">>), ?assertEqual({selected, [{<<"10">>}, {<<"10">>}]}, selected_to_binary(SelectResult)). @@ -738,9 +734,9 @@ sql_execute_upsert(Config, Name, Insert, Update, Unique) -> ScopeAndTag = scope_and_tag(Config), slow_rpc(rdbms_queries, execute_upsert, ScopeAndTag ++ [Name, Insert, Update, Unique]). -sql_execute_upsert_many(Config, Name, Insert, Update, Unique) -> +sql_execute_upsert_many(Config, Name, Insert, Update) -> ScopeAndTag = scope_and_tag(Config), - slow_rpc(rdbms_queries, execute_upsert_many, ScopeAndTag ++ [Name, Insert, Update, Unique]). + slow_rpc(rdbms_queries, execute_upsert_many, ScopeAndTag ++ [Name, Insert, Update]). sql_query_request(Config, Query) -> ScopeAndTag = scope_and_tag(Config), diff --git a/src/mod_last_rdbms.erl b/src/mod_last_rdbms.erl index ef04cf2f176..bcbddeefcc5 100644 --- a/src/mod_last_rdbms.erl +++ b/src/mod_last_rdbms.erl @@ -127,7 +127,6 @@ sessions_cleanup(HostType, Sessions) -> %% with the same name but different resources) Records2 = lists:usort(Records), UpdateParams = [Seconds, <<>>], - UniqueKeyValues = [], {Singles, Many100} = bucketize(100, Records2), {Singles2, Many10} = bucketize(10, Singles), %% Prepare data for queries @@ -137,11 +136,11 @@ sessions_cleanup(HostType, Sessions) -> [{1, last_upsert, Rec} || Rec <- Singles2], RunTask = fun({1, QueryName, InsertParams}) -> {updated, _} = rdbms_queries:execute_upsert(HostType, QueryName, - InsertParams, UpdateParams, UniqueKeyValues); + InsertParams, UpdateParams, []); ({_Count1, QueryName, InsertParams}) -> %% MySQL replace returns wrong numbers {updated, _Count2} = rdbms_queries:execute_upsert_many(HostType, QueryName, - InsertParams, UpdateParams, UniqueKeyValues) + InsertParams, UpdateParams) end, %% Run tasks in parallel RunTasks = fun(TasksList) -> lists:map(RunTask, TasksList) end, diff --git a/src/rdbms/rdbms_queries.erl b/src/rdbms/rdbms_queries.erl index 24acbc29221..55daee6718d 100644 --- a/src/rdbms/rdbms_queries.erl +++ b/src/rdbms/rdbms_queries.erl @@ -45,11 +45,12 @@ prepare_upsert/7, prepare_upsert_many/7, execute_upsert/5, execute_upsert/6, - execute_upsert_many/5, execute_upsert_many/6, + execute_upsert_many/4, execute_upsert_many/5, request_upsert/5]). -ignore_xref([ execute_upsert/6, count_records_where/3, + execute_upsert_many/5, get_db_specific_limits/1, get_db_specific_offset/2, get_db_type/0 ]). @@ -113,18 +114,16 @@ execute_upsert(HostType, PoolTag, Name, InsertParams, UpdateParams, UniqueKeyVal -spec execute_upsert_many(HostType :: mongooseim:host_type_or_global(), Name :: atom(), InsertParams :: [any()], - UpdateParams :: [any()], - UniqueKeyValues :: [any()]) -> mongoose_rdbms:query_result(). -execute_upsert_many(HostType, Name, InsertParams, UpdateParams, UniqueKeyValues) -> - execute_upsert_many(HostType, default, Name, InsertParams, UpdateParams, UniqueKeyValues). + UpdateParams :: [any()]) -> mongoose_rdbms:query_result(). +execute_upsert_many(HostType, Name, InsertParams, UpdateParams) -> + execute_upsert_many(HostType, default, Name, InsertParams, UpdateParams). -spec execute_upsert_many(HostType :: mongooseim:host_type_or_global(), PoolTag :: mongoose_wpool:tag(), Name :: atom(), InsertParams :: [any()], - UpdateParams :: [any()], - UniqueKeyValues :: [any()]) -> mongoose_rdbms:query_result(). -execute_upsert_many(HostType, PoolTag, Name, InsertParams, UpdateParams, UniqueKeyValues) -> + UpdateParams :: [any()]) -> mongoose_rdbms:query_result(). +execute_upsert_many(HostType, PoolTag, Name, InsertParams, UpdateParams) -> case {mongoose_rdbms:db_engine(HostType), mongoose_rdbms:db_type()} of {mysql, _} -> mongoose_rdbms:execute(HostType, PoolTag, Name, InsertParams); @@ -187,7 +186,7 @@ prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields) -> _ -> InsertFields ++ UpdateFields end. -prepared_upsert_many_fields(RecordCount, InsertFields, Updates, UniqueKeyFields) -> +prepared_upsert_many_fields(RecordCount, InsertFields, Updates, _UniqueKeyFields) -> InsertFieldsMany = lists:append(lists:duplicate(RecordCount, InsertFields)), UpdateFields = lists:filtermap(fun get_field_name/1, Updates), case mongoose_rdbms:db_type() of From 80fa4f49705fb3494c3886ea654b40928ef5067b Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Fri, 10 May 2024 13:52:55 +0200 Subject: [PATCH 14/19] Cleanup user3 user in sessions_cleanup testcase --- big_tests/tests/last_SUITE.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/big_tests/tests/last_SUITE.erl b/big_tests/tests/last_SUITE.erl index f69ec16bad4..745376a0962 100644 --- a/big_tests/tests/last_SUITE.erl +++ b/big_tests/tests/last_SUITE.erl @@ -197,7 +197,8 @@ sessions_cleanup(Config) -> {ok, #{timestamp := TS, status := Status} = Data} = distributed_helper:rpc(N, mod_last_api, get_last, [Jid3]), ?assertNotEqual(TS, 1714000000, Data), ?assertEqual(Status, <<>>, Data), - distributed_helper:rpc(N, mongoose_metrics, update, [HostType, sessionCount, -345]). + distributed_helper:rpc(N, mongoose_metrics, update, [HostType, sessionCount, -345]), + {ok, _} = distributed_helper:rpc(N, mongoose_account_api, unregister_user, [<<"user3">>, Server]). %%----------------------------------------------------------------- From 29775e9974de78fb2ebbba293cceaeb4efb0d885 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Tue, 14 May 2024 16:51:35 +0200 Subject: [PATCH 15/19] Simplify sessions_cleanup Add run_upsert function --- src/mod_last_rdbms.erl | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/src/mod_last_rdbms.erl b/src/mod_last_rdbms.erl index bcbddeefcc5..ea8fe8d3fe6 100644 --- a/src/mod_last_rdbms.erl +++ b/src/mod_last_rdbms.erl @@ -130,27 +130,29 @@ sessions_cleanup(HostType, Sessions) -> {Singles, Many100} = bucketize(100, Records2), {Singles2, Many10} = bucketize(10, Singles), %% Prepare data for queries - Tasks = + AllTasks = [{100, last_upsert_many100, lists:append(Batch)} || Batch <- Many100] ++ [{10, last_upsert_many10, lists:append(Batch)} || Batch <- Many10] ++ [{1, last_upsert, Rec} || Rec <- Singles2], - RunTask = fun({1, QueryName, InsertParams}) -> - {updated, _} = rdbms_queries:execute_upsert(HostType, QueryName, - InsertParams, UpdateParams, []); - ({_Count1, QueryName, InsertParams}) -> - %% MySQL replace returns wrong numbers - {updated, _Count2} = rdbms_queries:execute_upsert_many(HostType, QueryName, - InsertParams, UpdateParams) - end, + RunTask = fun({Count, QueryName, InsertParams}) -> + run_upsert(HostType, Count, QueryName, InsertParams, UpdateParams) + end, %% Run tasks in parallel - RunTasks = fun(TasksList) -> lists:map(RunTask, TasksList) end, + RunTasks = fun(Tasks) -> lists:map(RunTask, Tasks) end, Workers = 8, - BatchSize = length(Tasks) div Workers, - Batches = spread(BatchSize, Tasks), - Results = mongoose_lib:pmap(RunTasks, Batches, timer:minutes(1)), + TasksForWorkers = spread(Workers, AllTasks), + Results = mongoose_lib:pmap(RunTasks, TasksForWorkers, timer:minutes(1)), [check_result(Res) || Res <- Results], ok. +run_upsert(HostType, 1, QueryName, InsertParams, UpdateParams) -> + {updated, _} = rdbms_queries:execute_upsert(HostType, QueryName, + InsertParams, UpdateParams, []); +run_upsert(HostType, _Count, QueryName, InsertParams, UpdateParams) -> + %% MySQL replace returns wrong numbers + {updated, _} = rdbms_queries:execute_upsert_many(HostType, QueryName, + InsertParams, UpdateParams). + check_result({ok, Results}) -> lists:foreach(fun({updated, _}) -> ok end, Results), ok; @@ -171,6 +173,7 @@ bucketize(N, Records, Acc) -> {Records, lists:reverse(Acc)} end. +%% Create N chunks %% Spread elements into buckets one element at a time before moving to the next bucket spread(N, Tasks) -> Buckets = lists:duplicate(N, []), From e25fee6cf1eedd725f4ea21176c452c069447ae5 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Tue, 14 May 2024 16:57:48 +0200 Subject: [PATCH 16/19] Extract code into run_tasks_in_parallel helper --- src/mod_last_rdbms.erl | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/src/mod_last_rdbms.erl b/src/mod_last_rdbms.erl index ea8fe8d3fe6..2323b283dac 100644 --- a/src/mod_last_rdbms.erl +++ b/src/mod_last_rdbms.erl @@ -123,25 +123,28 @@ sessions_cleanup(HostType, Sessions) -> Seconds = erlang:system_time(second), %% server, username, seconds, state Records = [[S, U, Seconds, <<>>] || #session{usr = {U, S, _}} <- Sessions], + UpdateParams = [Seconds, <<>>], + AllTasks = prepare_cleanup_tasks(Records), + RunTaskF = fun({Count, QueryName, InsertParams}) -> + run_upsert(HostType, Count, QueryName, InsertParams, UpdateParams) + end, + run_tasks_in_parallel(RunTaskF, AllTasks). + +prepare_cleanup_tasks(Records) -> %% PgSQL would complain if there are duplicates (i.e. when there are two sessions %% with the same name but different resources) Records2 = lists:usort(Records), - UpdateParams = [Seconds, <<>>], {Singles, Many100} = bucketize(100, Records2), {Singles2, Many10} = bucketize(10, Singles), - %% Prepare data for queries - AllTasks = - [{100, last_upsert_many100, lists:append(Batch)} || Batch <- Many100] ++ - [{10, last_upsert_many10, lists:append(Batch)} || Batch <- Many10] ++ - [{1, last_upsert, Rec} || Rec <- Singles2], - RunTask = fun({Count, QueryName, InsertParams}) -> - run_upsert(HostType, Count, QueryName, InsertParams, UpdateParams) - end, - %% Run tasks in parallel - RunTasks = fun(Tasks) -> lists:map(RunTask, Tasks) end, + [{100, last_upsert_many100, lists:append(Batch)} || Batch <- Many100] ++ + [{10, last_upsert_many10, lists:append(Batch)} || Batch <- Many10] ++ + [{1, last_upsert, Rec} || Rec <- Singles2]. + +run_tasks_in_parallel(RunTaskF, AllTasks) -> Workers = 8, TasksForWorkers = spread(Workers, AllTasks), - Results = mongoose_lib:pmap(RunTasks, TasksForWorkers, timer:minutes(1)), + RunTasksF = fun(Tasks) -> lists:map(RunTaskF, Tasks) end, + Results = mongoose_lib:pmap(RunTasksF, TasksForWorkers, timer:minutes(1)), [check_result(Res) || Res <- Results], ok. From 1c145670b10ecf81bd1ff6ba7d305e688e6fdf74 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Tue, 14 May 2024 16:59:52 +0200 Subject: [PATCH 17/19] Apply review comments to mod_last_rdbms --- src/mod_last_rdbms.erl | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/mod_last_rdbms.erl b/src/mod_last_rdbms.erl index 2323b283dac..e614c14daee 100644 --- a/src/mod_last_rdbms.erl +++ b/src/mod_last_rdbms.erl @@ -157,8 +157,7 @@ run_upsert(HostType, _Count, QueryName, InsertParams, UpdateParams) -> InsertParams, UpdateParams). check_result({ok, Results}) -> - lists:foreach(fun({updated, _}) -> ok end, Results), - ok; + lists:foreach(fun({updated, _}) -> ok end, Results); check_result({error, Reason}) -> ?LOG_ERROR(#{what => session_cleanup_failed, reason => Reason}). @@ -173,7 +172,7 @@ bucketize(N, Records, Acc) -> {Batch, Records2} -> bucketize(N, Records2, [Batch | Acc]) catch error:badarg -> - {Records, lists:reverse(Acc)} + {Records, lists:reverse(Acc)} end. %% Create N chunks @@ -185,7 +184,7 @@ spread(N, Tasks) -> spread([Task | Tasks], [Bucket | Buckets], Acc) -> spread(Tasks, Buckets, [[Task | Bucket] | Acc]); spread([], Buckets, Acc) -> - Acc ++ lists:reverse(Buckets); + Acc ++ lists:reverse(Buckets); spread(Tasks, [], Acc) -> spread(Tasks, lists:reverse(Acc), []). From f8f26af1fd2d5c9ad64a7f46fdaf27ff6ad0f80b Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Tue, 14 May 2024 17:06:39 +0200 Subject: [PATCH 18/19] Reuse mysql_and_pgsql_insert_many function --- src/rdbms/rdbms_queries.erl | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/src/rdbms/rdbms_queries.erl b/src/rdbms/rdbms_queries.erl index 55daee6718d..7affbb86b9a 100644 --- a/src/rdbms/rdbms_queries.erl +++ b/src/rdbms/rdbms_queries.erl @@ -183,7 +183,8 @@ prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields) -> case mongoose_rdbms:db_type() of mssql -> UniqueKeyFields ++ InsertFields ++ UpdateFields; - _ -> InsertFields ++ UpdateFields + _ -> + InsertFields ++ UpdateFields end. prepared_upsert_many_fields(RecordCount, InsertFields, Updates, _UniqueKeyFields) -> @@ -192,7 +193,8 @@ prepared_upsert_many_fields(RecordCount, InsertFields, Updates, _UniqueKeyFields case mongoose_rdbms:db_type() of mssql -> InsertFieldsMany; - _ -> InsertFieldsMany ++ UpdateFields + _ -> + InsertFieldsMany ++ UpdateFields end. -spec prepare_upsert_many(HostType :: mongooseim:host_type_or_global(), @@ -232,16 +234,7 @@ upsert_query_many(HostType, RecordCount, Table, InsertFields, Updates, UniqueKey NotSupported -> erlang:error({rdbms_not_supported, NotSupported}) end. -mysql_and_pgsql_insert(Table, Columns) -> - JoinedFields = join(Columns, <<", ">>), - Placeholders = lists:duplicate(length(Columns), $?), - ["INSERT INTO ", atom_to_binary(Table, utf8), " (", - JoinedFields, - ") VALUES (", - join(Placeholders, ", "), - ")"]. - -pgsql_insert_many(RecordCount, Table, Columns) -> +mysql_and_pgsql_insert_many(RecordCount, Table, Columns) -> JoinedFields = join(Columns, <<", ">>), Placeholders = lists:duplicate(length(Columns), $?), Values = ["(", join(Placeholders, ", "), ")"], @@ -250,12 +243,12 @@ pgsql_insert_many(RecordCount, Table, Columns) -> JoinedFields, ") VALUES ", ManyValues]. upsert_mysql_query(Table, InsertFields, Updates, [Key | _], IncrementalField) -> - Insert = mysql_and_pgsql_insert(Table, InsertFields), + Insert = mysql_and_pgsql_insert_many(1, Table, InsertFields), OnConflict = mysql_on_conflict(Table, Updates, Key, IncrementalField), [Insert, OnConflict]. upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) -> - Insert = mysql_and_pgsql_insert(Table, InsertFields), + Insert = mysql_and_pgsql_insert_many(1, Table, InsertFields), OnConflict = pgsql_on_conflict(Updates, UniqueKeyFields), WhereIncrements = pgsql_ensure_increments(Table, IncrementalField), [Insert, OnConflict, WhereIncrements]. @@ -270,7 +263,7 @@ upsert_many_mysql_query(RecordCount, Table, InsertFields) -> " VALUES ", ManyValues]. upsert_many_pgsql_query(RecordCount, Table, InsertFields, Updates, UniqueKeyFields) -> - Insert = pgsql_insert_many(RecordCount, Table, InsertFields), + Insert = mysql_and_pgsql_insert_many(RecordCount, Table, InsertFields), OnConflict = pgsql_on_conflict(Updates, UniqueKeyFields), [Insert, OnConflict]. @@ -350,7 +343,6 @@ upsert_many_mssql_query(RecordCount, Table, InsertFields, Updates, UniqueKeyFiel tgt_equals_new(Updates) -> join([["tgt.", ColumnName, "=new.", ColumnName] || ColumnName <- Updates], ", "). - mssql_on_conflict([]) -> ";"; mssql_on_conflict(Updates) -> [" WHEN MATCHED THEN UPDATE SET ", update_fields_on_conflict(Updates), ";"]. From 3ca7a60b6c822910f74dd0c92e9e4b296c0f6114 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Tue, 14 May 2024 17:54:45 +0200 Subject: [PATCH 19/19] Apply changes for MSSQL --- src/mod_last_rdbms.erl | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/mod_last_rdbms.erl b/src/mod_last_rdbms.erl index e614c14daee..3e5ebc8912b 100644 --- a/src/mod_last_rdbms.erl +++ b/src/mod_last_rdbms.erl @@ -141,16 +141,18 @@ prepare_cleanup_tasks(Records) -> [{1, last_upsert, Rec} || Rec <- Singles2]. run_tasks_in_parallel(RunTaskF, AllTasks) -> - Workers = 8, + %% MSSQL fails with "Transaction (Process ID 52) was deadlocked on lock resources with + %% another process and has been chosen as the deadlock victim. Rerun the transaction" + Workers = case mongoose_rdbms:db_type() of mssql -> 1; _ -> 8 end, TasksForWorkers = spread(Workers, AllTasks), RunTasksF = fun(Tasks) -> lists:map(RunTaskF, Tasks) end, Results = mongoose_lib:pmap(RunTasksF, TasksForWorkers, timer:minutes(1)), [check_result(Res) || Res <- Results], ok. -run_upsert(HostType, 1, QueryName, InsertParams, UpdateParams) -> +run_upsert(HostType, 1, QueryName, InsertParams = [S, U|_], UpdateParams) -> {updated, _} = rdbms_queries:execute_upsert(HostType, QueryName, - InsertParams, UpdateParams, []); + InsertParams, UpdateParams, [S, U]); run_upsert(HostType, _Count, QueryName, InsertParams, UpdateParams) -> %% MySQL replace returns wrong numbers {updated, _} = rdbms_queries:execute_upsert_many(HostType, QueryName,