diff --git a/Makefile b/Makefile index ab6864dc0e1..02d5dd817b5 100644 --- a/Makefile +++ b/Makefile @@ -46,8 +46,9 @@ ct: deps quick_compile > $(LOG_SILENCE_COVER) # This compiles and runs one test suite. For quick feedback/TDD. +# (strip _SUITE from test name, it is added automatically!) # Example: -# $ make qct SUITE=amp_resolver_SUITE +# $ make qct SUITE=amp_resolver qct: mkdir -p /tmp/ct_log @if [ "$(SUITE)" ]; then ct_run -pa apps/*/ebin -pa deps/*/ebin -pa ebin -dir apps/*/test\ diff --git a/apps/ejabberd/src/ejabberd.erl b/apps/ejabberd/src/ejabberd.erl index fe3ce33a3e3..8faa9f8e3ff 100644 --- a/apps/ejabberd/src/ejabberd.erl +++ b/apps/ejabberd/src/ejabberd.erl @@ -60,10 +60,11 @@ %% Incoming event from XML stream. Used everywhere in xmlstream fsm modules -type xml_stream_item() :: 'closed' + | 'stop' | 'timeout' | {'xmlstreamelement', jlib:xmlel()} - | {'xmlstreamend',_} - | {'xmlstreamerror',_} + | {'xmlstreamend', _} + | {'xmlstreamerror', _} | {'xmlstreamstart', Name :: any(), Attrs :: list()}. -export_type([lang/0, diff --git a/apps/ejabberd/src/ejabberd_c2s.erl b/apps/ejabberd/src/ejabberd_c2s.erl index cd99f8f9cf8..0cf41e364d7 100644 --- a/apps/ejabberd/src/ejabberd_c2s.erl +++ b/apps/ejabberd/src/ejabberd_c2s.erl @@ -209,6 +209,7 @@ init([{SockMod, Socket}, Opts]) -> streamid = new_id(), access = Access, shaper = Shaper, + spamctl_state = undefined, ip = IP}, ?C2S_OPEN_TIMEOUT} end. @@ -249,7 +250,12 @@ handle_stream_start({xmlstreamstart, _Name, Attrs}, #state{} = S0) -> {?NS_STREAM, true} -> change_shaper(S, jid:make(<<>>, Server, <<>>)), Version = xml:get_attr_s(<<"version">>, Attrs), - stream_start_by_protocol_version(Version, S); + SpamCtlState = ejabberd_hooks:run_fold(spamctl_initialise, + Server, + #{host => Server}, + []), + S1 = S#state{spamctl_state = SpamCtlState}, + stream_start_by_protocol_version(Version, S1); {?NS_STREAM, false} -> stream_start_error(?HOST_UNKNOWN_ERR, S); {_InvalidNS, _} -> @@ -907,8 +913,11 @@ session_established({xmlstreamerror, _}, StateData) -> send_element(StateData, ?INVALID_XML_ERR), send_trailer(StateData), {stop, normal, StateData}; +session_established(stop, StateData) -> + send_trailer(StateData), + {stop, normal, StateData}; session_established(closed, StateData) -> - ?DEBUG("Session established closed - trying to enter resume_session",[]), + ?DEBUG("Session established closed - trying to enter resume_session", []), maybe_enter_resume_session(StateData#state.stream_mgmt_id, StateData). %% @doc Process packets sent by user (coming from user on c2s XMPP @@ -937,9 +946,26 @@ process_outgoing_stanza(El, StateData) -> _ -> NewEl1 end, - NewState = process_outgoing_stanza(ToJID, Name, {Attrs, NewEl, FromJID, StateData, Server, User}), - ejabberd_hooks:run(c2s_loop_debug, [{xmlstreamelement, El}]), - fsm_next_state(session_established, NewState). + NewSpamCtlState = ejabberd_hooks:run_fold(spamctl_control, Server, + StateData#state.spamctl_state, + [Name, NewEl]), + StateData1 = StateData#state{spamctl_state = NewSpamCtlState}, + Dec = maps:get(decision, NewSpamCtlState, ok), + case Dec of + % if decision is anything other then ok we stop the message and call hook to + % take appropriate action + ok -> + NewState = process_outgoing_stanza(ToJID, Name, {Attrs, NewEl, FromJID, + StateData1, Server, User}), + ejabberd_hooks:run(c2s_loop_debug, [{xmlstreamelement, El}]), + fsm_next_state(session_established, NewState); + Dec -> + ReactSpamState = ejabberd_hooks:run_fold(spamctl_react, + Server, + NewSpamCtlState, + [Dec, FromJID, ToJID, NewEl]), + fsm_next_state(session_established, StateData1#state{spamctl_state = ReactSpamState}) + end. process_outgoing_stanza(error, _Name, Args) -> @@ -1193,6 +1219,8 @@ handle_info(check_buffer_full, StateName, StateData) -> fsm_next_state(StateName, StateData#state{stream_mgmt_constraint_check_tref = undefined}) end; +handle_info({stop, Reason}, _StateName, StateData) -> + {stop, Reason, StateData}; handle_info(Info, StateName, StateData) -> ?ERROR_MSG("Unexpected info: ~p", [Info]), fsm_next_state(StateName, StateData). @@ -1594,7 +1622,6 @@ send_trailer(StateData) when StateData#state.xml_socket -> send_trailer(StateData) -> send_text(StateData, ?STREAM_TRAILER). - send_and_maybe_buffer_stanza({J1, J2, El}, State, StateName)-> {SendResult, BufferedStateData} = send_and_maybe_buffer_stanza({J1, J2, mod_amp:strip_amp_el_from_request(El)}, State), diff --git a/apps/ejabberd/src/ejabberd_c2s.hrl b/apps/ejabberd/src/ejabberd_c2s.hrl index b107a6b2244..e81c55f079d 100644 --- a/apps/ejabberd/src/ejabberd_c2s.hrl +++ b/apps/ejabberd/src/ejabberd_c2s.hrl @@ -26,6 +26,7 @@ sasl_state, access, shaper, + spamctl_state, zlib = {false, 0} :: {boolean(), integer()}, tls = false :: boolean(), tls_required = false :: boolean(), diff --git a/apps/ejabberd/src/ejabberd_receiver.erl b/apps/ejabberd/src/ejabberd_receiver.erl index e49732ef09d..e36d125d02d 100644 --- a/apps/ejabberd/src/ejabberd_receiver.erl +++ b/apps/ejabberd/src/ejabberd_receiver.erl @@ -358,8 +358,10 @@ replace_too_big_elems_with_stream_error(Elems, MaxSize) -> maybe_pause(_, #state{c2s_pid = undefined}) -> ok; maybe_pause(Pause, _State) when Pause > 0 -> +%% ?ERROR_MSG("Pause: ~p~n", [Pause]), erlang:start_timer(Pause, self(), activate); maybe_pause(_, State) -> +%% ?ERROR_MSG("KeepOn~n", []), activate_socket(State). maybe_run_keep_alive_hook(Size, #state{c2s_pid = C2SPid}) diff --git a/apps/ejabberd/src/mod_blocking.erl b/apps/ejabberd/src/mod_blocking.erl index 6a755be3b66..896012d8eef 100644 --- a/apps/ejabberd/src/mod_blocking.erl +++ b/apps/ejabberd/src/mod_blocking.erl @@ -10,6 +10,7 @@ -xep([{xep, 191}, {version, "1.2"}]). -behaviour(gen_mod). + -export([start/2, process_iq_get/5, process_iq_set/4, diff --git a/apps/ejabberd/src/mod_spamctl.erl b/apps/ejabberd/src/mod_spamctl.erl new file mode 100644 index 00000000000..ad99cd7cb2f --- /dev/null +++ b/apps/ejabberd/src/mod_spamctl.erl @@ -0,0 +1,152 @@ +%%%------------------------------------------------------------------- +%%% @author bartek +%%% @copyright (C) 2016, +%%% @doc +%%% +%%% @end +%%% Created : 23. Oct 2016 11:19 +%%%------------------------------------------------------------------- +-module(mod_spamctl). +-author("bartek"). +-behaviour(gen_mod). + +%% API +-export([start/2, stop/1, initialise/1, control/3]). +-export([cutoff/5, notify_offender/5, notify_admin/5]). + +-include("ejabberd.hrl"). +-include("jlib.hrl"). + +-define(DEFAULT_POOL_NAME, http_pool). +-define(DEFAULT_PATH, ""). + +-type state() :: map(). % whatever you want it to be +-type decision() :: atom(). + +start(Host, _Opts) -> + ejabberd_hooks:add(spamctl_initialise, Host, + ?MODULE, initialise, 50), + ejabberd_hooks:add(spamctl_control, Host, + ?MODULE, control, 50), + ejabberd_hooks:add(spamctl_react, Host, + ?MODULE, notify_offender, 30), + ejabberd_hooks:add(spamctl_react, Host, + ?MODULE, notify_admin, 40), + ejabberd_hooks:add(spamctl_react, Host, + ?MODULE, cutoff, 50), + ok. + +stop(Host) -> + ejabberd_hooks:delete(spamctl_initialise, Host, + ?MODULE, initialise, 50), + ejabberd_hooks:delete(spamctl_control, Host, + ?MODULE, control, 50), + ejabberd_hooks:delete(spamctl_react, Host, + ?MODULE, notify_offender, 30), + ejabberd_hooks:delete(spamctl_react, Host, + ?MODULE, notify_admin, 40), + ejabberd_hooks:delete(spamctl_react, Host, + ?MODULE, cutoff, 50), + ok. + +%% @doc Triggered by `spamctl_initialise` when stream is started, returns initial spamcontrol state. +-spec initialise(map()) -> state(). +initialise(#{host := Host} = State) -> + MaxRate = gen_mod:get_module_opt(Host, ?MODULE, maxrate, 10), + Span = gen_mod:get_module_opt(Host, ?MODULE, span, 2), + ModOpt = #{maxrate => MaxRate, span => Span}, + % merge config options into accumulator + NState = maps:merge(State, ModOpt), + % add working values used by this module + ModState = #{rate => 0, + decision => ok, + lasttime => usec:from_now(os:timestamp())}, + maps:merge(NState, ModState). + +%% @doc Triggered by `spamctl_control` every time the user sends a stanza; returns a +%% modified spamcontrol state. The state MUST contain key `decision` which normally is +%% `ok` - anything else means that the user violated spamcontrol rules and tells c2s +%% to run `spamctl_react` hook. +-spec control(state(), binary(), xmlel()) -> state(). +control(State, Name, M) -> + NState = check_msg(Name, M, State), + case NState of + #{decision := excess} -> + {stop, NState}; + _ -> + NState + end. + +check_msg(<<"message">>, M, State) -> + Now = usec:from_now(os:timestamp()), + Span = maps:get(span, State) * 1000000, + Lasttime = maps:get(lasttime, State), + Cycled = Now - Lasttime > Span, + check_msg(M, State, Now, Cycled); +check_msg(_, _, State) -> + State. + +check_msg(_M, State, Now, true) -> + set_decision(ok, Now, State); +check_msg(M, State, Now, false) -> + NRate = maps:get(rate, State) + 1, + check_msg_rate(M, Now, State#{rate => NRate}). + +check_msg_rate(_M, Now, #{maxrate := Max, rate := Rate} = State) when Rate > Max -> + set_decision(excess, Now, State); +check_msg_rate(_M, _Now, State) -> + State. + +set_decision(Dec, Now, State) -> + State#{decision => Dec, lasttime => Now, rate => 0}. + +%% @doc if a msg is determined to be spam, for any reason, we call this +%% to terminate the user connection +-spec cutoff(state(), decision(), jid(), jid(), xmlel()) -> state(). +cutoff(State, excess, _From, _To, _Msg) -> + p1_fsm_old:send_event(self(), stop), + State; +cutoff(State, _, _, _, _) -> + State. + +%% @doc if a msg is determined to be spam, for any reason, we call this to notify +%% the user he is not welcome +-spec notify_offender(state(), decision(), jid(), jid(), xmlel()) -> state(). +notify_offender(State, excess, From, To, Msg) -> + send_back_error(?ERR_NOT_ACCEPTABLE, From, To, Msg), + State; +notify_offender(State, _, _, _, _) -> + State. + +send_back_error(Etype, From, To, Packet) -> + Err = jlib:make_error_reply(Packet, Etype), + ejabberd_router:route(To, From, Err). + +%% @doc if a msg is determined to be spam, for any reason, we call this to notify +%% some external service about what happened +-spec notify_admin(state(), decision(), jid(), jid(), xmlel()) -> state(). +notify_admin(State, excess, From, _To, _Msg) -> + C = jid:to_binary({From#jid.user, From#jid.server}), + send_http_notification(maps:get(host, State), C, <<"exceeded message limit">>), + State. + +send_http_notification(Host, Culprit, Body) -> + Path = fix_path(list_to_binary(gen_mod:get_module_opt(Host, ?MODULE, path, ?DEFAULT_PATH))), + PoolName = gen_mod:get_module_opt(Host, ?MODULE, pool_name, ?DEFAULT_POOL_NAME), + Pool = mongoose_http_client:get_pool(PoolName), + Query = <<"culprit=", Culprit/binary, "&message=", Body/binary>>, + ?INFO_MSG("Making request '~p' for user ~s@~s...", [Path, Culprit, Host]), + Headers = [{<<"Content-Type">>, <<"application/x-www-form-urlencoded">>}], + case mongoose_http_client:post(Pool, Path, Headers, Query) of + {ok, _} -> + ok; + {error, E} -> + ?ERROR_MSG("Failed to record spam policy violation (~p):~nOffender: ~p, message: ~p~n", + [E, Culprit, Body]) + end. + + +fix_path(<<"/", R/binary>>) -> + R; +fix_path(R) -> + R. diff --git a/apps/ejabberd/src/shaper_srv.erl b/apps/ejabberd/src/shaper_srv.erl index 4f4b10f470e..ca78e70993d 100644 --- a/apps/ejabberd/src/shaper_srv.erl +++ b/apps/ejabberd/src/shaper_srv.erl @@ -74,7 +74,8 @@ worker_count(_Host) -> -spec worker_names(ejabberd:server()) -> [atom()]. worker_names(Host) -> - [worker_name(Host, N) || N <- lists:seq(0, worker_count(Host) - 1)]. + Wn = [worker_name(Host, N) || N <- lists:seq(0, worker_count(Host) - 1)], + Wn. -spec worker_name(ejabberd:server(), integer()) -> atom(). diff --git a/apps/ejabberd/test/spamctl_SUITE.erl b/apps/ejabberd/test/spamctl_SUITE.erl new file mode 100644 index 00000000000..cf2d04a8198 --- /dev/null +++ b/apps/ejabberd/test/spamctl_SUITE.erl @@ -0,0 +1,56 @@ +-module(spamctl_SUITE). +-compile([export_all]). + +-include_lib("exml/include/exml.hrl"). +-include_lib("eunit/include/eunit.hrl"). + + +all() -> + [ burst_ctl ]. + +init_per_suite(C) -> + application:ensure_all_started(lager), + C. + +end_per_suite(_C) -> + ok. + +burst_ctl(_C) -> + % initialise with 10 msgs over 2 seconds + State = #{maxrate => 10, + span => 2, + rate => 0, + decision => ok, + lasttime => now_to_usec()}, + State1 = proc_msgs(State, 8), + State2 = proc_msgs(State1, 1), + #{decision := ok} = State2, + State3 = proc_msgs(State2, 1), + #{decision := ok} = State3, + State4 = proc_msgs(State3, 1), + #{decision := excess} = State4, + State5 = proc_msgs(State4, 1), + #{decision := excess} = State5, + timer:sleep(2100), + State7 = proc_msgs(State5, 8), + #{decision := ok} = State7, + ok. + +proc_msgs(State, 0) -> + State; +proc_msgs(State, Y) -> + M = #xmlel{name = <<"message">>, + attrs = [{<<"type">>, <<"chat">>}, {<<"to">>, <<"bob37.76184@localhost">>}], + children = [#xmlel{name = <<"body">>, + attrs = [], + children = [{xmlcdata, <>}]}]}, + NState = case mod_spamctl:control(State, <<"message">>, [M]) of + {stop, S} -> S; + S -> S + end, + proc_msgs(NState, Y - 1). + + +now_to_usec() -> + {MSec, Sec, USec} = os:timestamp(), + (MSec * 1000000 + Sec) * 1000000 + USec. diff --git a/rel/files/ejabberd.cfg b/rel/files/ejabberd.cfg index 20e7b0e1714..4480d797bf7 100755 --- a/rel/files/ejabberd.cfg +++ b/rel/files/ejabberd.cfg @@ -764,6 +764,11 @@ {mod_bosh, []}, {mod_carboncopy, []} + %% anti-spam subsystem (requires http_connections activated) + %% {mod_spamctl, [{maxrate, 10}, + %% {span, 2} + %% {pool_name, conn1}]}, + %% %% Message Archive Management (MAM, XEP-0313) for registered users and %% Multi-User chats (MUCs). diff --git a/test/ejabberd_tests/tests/spam_throttle_SUITE.erl b/test/ejabberd_tests/tests/spam_throttle_SUITE.erl new file mode 100644 index 00000000000..dbdb520f766 --- /dev/null +++ b/test/ejabberd_tests/tests/spam_throttle_SUITE.erl @@ -0,0 +1,138 @@ +%%============================================================================== +%% Copyright 2010 Erlang Solutions Ltd. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%============================================================================== + +-module(spam_throttle_SUITE). +-compile(export_all). + +-include_lib("exml/include/exml.hrl"). +-include_lib("escalus/include/escalus.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-define(NS_BLOCKING, <<"urn:xmpp:blocking">>). + +-define(SLEEP_TIME, 50). + +%%-------------------------------------------------------------------- +%% Suite configuration +%%-------------------------------------------------------------------- + +all() -> + [ + {group, throttle} + ]. + +groups() -> + [ + {throttle, [sequence], throttle_test_cases()} + ]. + +throttle_test_cases() -> + [ + sendtoomuch + ]. + +suite() -> + escalus:suite(). + +%%-------------------------------------------------------------------- +%% Init & teardown +%%-------------------------------------------------------------------- + +init_per_suite(Config) -> + escalus:init_per_suite(Config). +%% [{escalus_no_stanzas_after_story, true} | +%% escalus:init_per_suite(Config)]. + +end_per_suite(Config) -> + escalus_fresh:clean(), + escalus:end_per_suite(Config). + +init_per_group(_GroupName, Config) -> + set_modules([{maxrate, 10}, {span, 2}]), + escalus:create_users(Config, escalus:get_users([alice, bob])). + +end_per_group(_GroupName, Config) -> + dynamic_modules:stop(host(), mod_http_notification), + ejabberd_node_utils:call_fun(mongoose_http_client, stop_pool, [http_pool]), + ejabberd_node_utils:call_fun(mongoose_http_client, stop, []), + escalus:delete_users(Config, escalus:get_users([alice, bob])). + +init_per_testcase(CaseName, Config) -> + escalus:init_per_testcase(CaseName, Config). + +end_per_testcase(CaseName, Config) -> + escalus:end_per_testcase(CaseName, Config). + +set_modules(Opts) -> + ejabberd_node_utils:call_fun(mongoose_http_client, start, [[]]), + ejabberd_node_utils:call_fun(mongoose_http_client, + start_pool, + [http_pool, [{server, "http://localhost:8000"}]]), + dynamic_modules:start(host(), mod_spamctl, Opts), + ok. + +host() -> <<"localhost">>. + +%%-------------------------------------------------------------------- +%% Tests +%%-------------------------------------------------------------------- + +sendtoomuch(Config) -> + escalus:story( + Config, [{alice, 1}, {bob, 1}], + fun(Alice, Bob) -> + Send = fun(I) -> message_is_delivered(Alice, + Bob, + list_to_binary(integer_to_list(I))) + end, + lists:map(Send, lists:seq(1, 10)), + % this msg is not delivered because it exceeded spamctl limit + message_is_not_delivered(Alice, [Bob], <<"11">>), + Res = escalus:wait_for_stanza(Alice), + escalus_assert:is_error(Res, <<"modify">>, <<"not-acceptable">>), + % and nothing more would be delivered because spamctl terminated connection + timer:sleep(1000), + message_is_not_delivered(Alice, [Bob], <<"12">>), + timer:sleep(2000), + message_is_not_delivered(Alice, [Bob], <<"13">>), + ok + end). + +%%-------------------------------------------------------------------- +%% Utils +%%-------------------------------------------------------------------- + +message_is_delivered(From, [To|_] = Tos, MessageText) -> + BareTo = escalus_utils:jid_to_lower(escalus_client:short_jid(To)), + escalus:send(From, escalus_stanza:chat_to(BareTo, MessageText)), + [ escalus:assert(is_chat_message, [MessageText], escalus:wait_for_stanza(C)) || + C <- Tos ]; +message_is_delivered(From, To, MessageText) -> + BareTo = escalus_utils:jid_to_lower(escalus_client:short_jid(To)), + escalus:send(From, escalus_stanza:chat_to(BareTo, MessageText)), + escalus:assert(is_chat_message, [MessageText], escalus:wait_for_stanza(To)). + + +message_is_not_delivered(From, [To|_] = Tos, MessageText) -> + BareTo = escalus_utils:jid_to_lower(escalus_client:short_jid(To)), + escalus:send(From, escalus_stanza:chat_to(BareTo, MessageText)), + timer:sleep(300), + clients_have_no_messages(Tos). + +clients_have_no_messages(Cs) when is_list (Cs) -> [ client_has_no_messages(C) || C <- Cs ]. + +client_has_no_messages(C) -> escalus_assert:has_no_stanzas(C). +