Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spam control #1123

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ ct: deps quick_compile
> $(LOG_SILENCE_COVER)

# This compiles and runs one test suite. For quick feedback/TDD.
# (strip _SUITE from test name, it is added automatically!)
# Example:
# $ make qct SUITE=amp_resolver_SUITE
# $ make qct SUITE=amp_resolver
qct:
mkdir -p /tmp/ct_log
@if [ "$(SUITE)" ]; then ct_run -pa apps/*/ebin -pa deps/*/ebin -pa ebin -dir apps/*/test\
Expand Down
5 changes: 3 additions & 2 deletions apps/ejabberd/src/ejabberd.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,11 @@

%% Incoming event from XML stream. Used everywhere in xmlstream fsm modules
-type xml_stream_item() :: 'closed'
| 'stop'
| 'timeout'
| {'xmlstreamelement', jlib:xmlel()}
| {'xmlstreamend',_}
| {'xmlstreamerror',_}
| {'xmlstreamend', _}
| {'xmlstreamerror', _}
| {'xmlstreamstart', Name :: any(), Attrs :: list()}.

-export_type([lang/0,
Expand Down
39 changes: 33 additions & 6 deletions apps/ejabberd/src/ejabberd_c2s.erl
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ init([{SockMod, Socket}, Opts]) ->
streamid = new_id(),
access = Access,
shaper = Shaper,
spamctl_state = undefined,
ip = IP},
?C2S_OPEN_TIMEOUT}
end.
Expand Down Expand Up @@ -249,7 +250,12 @@ handle_stream_start({xmlstreamstart, _Name, Attrs}, #state{} = S0) ->
{?NS_STREAM, true} ->
change_shaper(S, jid:make(<<>>, Server, <<>>)),
Version = xml:get_attr_s(<<"version">>, Attrs),
stream_start_by_protocol_version(Version, S);
SpamCtlState = ejabberd_hooks:run_fold(spamctl_initialise,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These spamctl_* set of hooks are quite powerful. How about passing whole c2s state as an arg to the initialise hook handler so the implementation may choose what to do with it? Other solution would be passing the state to all the hooks. I can imagine that some clever logic (rewording or stopping the user) may need some of the fields which are in c2s state.

Server,
#{host => Server},
[]),
S1 = S#state{spamctl_state = SpamCtlState},
stream_start_by_protocol_version(Version, S1);
{?NS_STREAM, false} ->
stream_start_error(?HOST_UNKNOWN_ERR, S);
{_InvalidNS, _} ->
Expand Down Expand Up @@ -907,8 +913,11 @@ session_established({xmlstreamerror, _}, StateData) ->
send_element(StateData, ?INVALID_XML_ERR),
send_trailer(StateData),
{stop, normal, StateData};
session_established(stop, StateData) ->
send_trailer(StateData),
{stop, normal, StateData};
session_established(closed, StateData) ->
?DEBUG("Session established closed - trying to enter resume_session",[]),
?DEBUG("Session established closed - trying to enter resume_session", []),
maybe_enter_resume_session(StateData#state.stream_mgmt_id, StateData).

%% @doc Process packets sent by user (coming from user on c2s XMPP
Expand Down Expand Up @@ -937,9 +946,26 @@ process_outgoing_stanza(El, StateData) ->
_ ->
NewEl1
end,
NewState = process_outgoing_stanza(ToJID, Name, {Attrs, NewEl, FromJID, StateData, Server, User}),
ejabberd_hooks:run(c2s_loop_debug, [{xmlstreamelement, El}]),
fsm_next_state(session_established, NewState).
NewSpamCtlState = ejabberd_hooks:run_fold(spamctl_control, Server,
StateData#state.spamctl_state,
[Name, NewEl]),
StateData1 = StateData#state{spamctl_state = NewSpamCtlState},
Dec = maps:get(decision, NewSpamCtlState, ok),
case Dec of
% if decision is anything other then ok we stop the message and call hook to
% take appropriate action
ok ->
NewState = process_outgoing_stanza(ToJID, Name, {Attrs, NewEl, FromJID,
StateData1, Server, User}),
ejabberd_hooks:run(c2s_loop_debug, [{xmlstreamelement, El}]),
fsm_next_state(session_established, NewState);
Dec ->
ReactSpamState = ejabberd_hooks:run_fold(spamctl_react,
Server,
NewSpamCtlState,
[Dec, FromJID, ToJID, NewEl]),
fsm_next_state(session_established, StateData1#state{spamctl_state = ReactSpamState})
end.


process_outgoing_stanza(error, _Name, Args) ->
Expand Down Expand Up @@ -1193,6 +1219,8 @@ handle_info(check_buffer_full, StateName, StateData) ->
fsm_next_state(StateName,
StateData#state{stream_mgmt_constraint_check_tref = undefined})
end;
handle_info({stop, Reason}, _StateName, StateData) ->
{stop, Reason, StateData};
handle_info(Info, StateName, StateData) ->
?ERROR_MSG("Unexpected info: ~p", [Info]),
fsm_next_state(StateName, StateData).
Expand Down Expand Up @@ -1594,7 +1622,6 @@ send_trailer(StateData) when StateData#state.xml_socket ->
send_trailer(StateData) ->
send_text(StateData, ?STREAM_TRAILER).


send_and_maybe_buffer_stanza({J1, J2, El}, State, StateName)->
{SendResult, BufferedStateData} =
send_and_maybe_buffer_stanza({J1, J2, mod_amp:strip_amp_el_from_request(El)}, State),
Expand Down
1 change: 1 addition & 0 deletions apps/ejabberd/src/ejabberd_c2s.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
sasl_state,
access,
shaper,
spamctl_state,
zlib = {false, 0} :: {boolean(), integer()},
tls = false :: boolean(),
tls_required = false :: boolean(),
Expand Down
2 changes: 2 additions & 0 deletions apps/ejabberd/src/ejabberd_receiver.erl
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,10 @@ replace_too_big_elems_with_stream_error(Elems, MaxSize) ->
maybe_pause(_, #state{c2s_pid = undefined}) ->
ok;
maybe_pause(Pause, _State) when Pause > 0 ->
%% ?ERROR_MSG("Pause: ~p~n", [Pause]),
erlang:start_timer(Pause, self(), activate);
maybe_pause(_, State) ->
%% ?ERROR_MSG("KeepOn~n", []),
activate_socket(State).

maybe_run_keep_alive_hook(Size, #state{c2s_pid = C2SPid})
Expand Down
1 change: 1 addition & 0 deletions apps/ejabberd/src/mod_blocking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

-xep([{xep, 191}, {version, "1.2"}]).
-behaviour(gen_mod).

-export([start/2,
process_iq_get/5,
process_iq_set/4,
Expand Down
152 changes: 152 additions & 0 deletions apps/ejabberd/src/mod_spamctl.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
%%%-------------------------------------------------------------------
%%% @author bartek
%%% @copyright (C) 2016, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 23. Oct 2016 11:19
%%%-------------------------------------------------------------------
-module(mod_spamctl).
-author("bartek").
-behaviour(gen_mod).

%% API
-export([start/2, stop/1, initialise/1, control/3]).
-export([cutoff/5, notify_offender/5, notify_admin/5]).

-include("ejabberd.hrl").
-include("jlib.hrl").

-define(DEFAULT_POOL_NAME, http_pool).
-define(DEFAULT_PATH, "").

-type state() :: map(). % whatever you want it to be
-type decision() :: atom().

start(Host, _Opts) ->
ejabberd_hooks:add(spamctl_initialise, Host,
?MODULE, initialise, 50),
ejabberd_hooks:add(spamctl_control, Host,
?MODULE, control, 50),
ejabberd_hooks:add(spamctl_react, Host,
?MODULE, notify_offender, 30),
ejabberd_hooks:add(spamctl_react, Host,
?MODULE, notify_admin, 40),
ejabberd_hooks:add(spamctl_react, Host,
?MODULE, cutoff, 50),
ok.

stop(Host) ->
ejabberd_hooks:delete(spamctl_initialise, Host,
?MODULE, initialise, 50),
ejabberd_hooks:delete(spamctl_control, Host,
?MODULE, control, 50),
ejabberd_hooks:delete(spamctl_react, Host,
?MODULE, notify_offender, 30),
ejabberd_hooks:delete(spamctl_react, Host,
?MODULE, notify_admin, 40),
ejabberd_hooks:delete(spamctl_react, Host,
?MODULE, cutoff, 50),
ok.

%% @doc Triggered by `spamctl_initialise` when stream is started, returns initial spamcontrol state.
-spec initialise(map()) -> state().
initialise(#{host := Host} = State) ->
MaxRate = gen_mod:get_module_opt(Host, ?MODULE, maxrate, 10),
Span = gen_mod:get_module_opt(Host, ?MODULE, span, 2),
ModOpt = #{maxrate => MaxRate, span => Span},
% merge config options into accumulator
NState = maps:merge(State, ModOpt),
% add working values used by this module
ModState = #{rate => 0,
decision => ok,
lasttime => usec:from_now(os:timestamp())},
maps:merge(NState, ModState).

%% @doc Triggered by `spamctl_control` every time the user sends a stanza; returns a
%% modified spamcontrol state. The state MUST contain key `decision` which normally is
%% `ok` - anything else means that the user violated spamcontrol rules and tells c2s
%% to run `spamctl_react` hook.
-spec control(state(), binary(), xmlel()) -> state().
control(State, Name, M) ->
NState = check_msg(Name, M, State),
case NState of
#{decision := excess} ->
{stop, NState};
_ ->
NState
end.

check_msg(<<"message">>, M, State) ->
Now = usec:from_now(os:timestamp()),
Span = maps:get(span, State) * 1000000,
Lasttime = maps:get(lasttime, State),
Cycled = Now - Lasttime > Span,
check_msg(M, State, Now, Cycled);
check_msg(_, _, State) ->
State.

check_msg(_M, State, Now, true) ->
set_decision(ok, Now, State);
check_msg(M, State, Now, false) ->
NRate = maps:get(rate, State) + 1,
check_msg_rate(M, Now, State#{rate => NRate}).

check_msg_rate(_M, Now, #{maxrate := Max, rate := Rate} = State) when Rate > Max ->
set_decision(excess, Now, State);
check_msg_rate(_M, _Now, State) ->
State.

set_decision(Dec, Now, State) ->
State#{decision => Dec, lasttime => Now, rate => 0}.

%% @doc if a msg is determined to be spam, for any reason, we call this
%% to terminate the user connection
-spec cutoff(state(), decision(), jid(), jid(), xmlel()) -> state().
cutoff(State, excess, _From, _To, _Msg) ->
p1_fsm_old:send_event(self(), stop),
State;
cutoff(State, _, _, _, _) ->
State.

%% @doc if a msg is determined to be spam, for any reason, we call this to notify
%% the user he is not welcome
-spec notify_offender(state(), decision(), jid(), jid(), xmlel()) -> state().
notify_offender(State, excess, From, To, Msg) ->
send_back_error(?ERR_NOT_ACCEPTABLE, From, To, Msg),
State;
notify_offender(State, _, _, _, _) ->
State.

send_back_error(Etype, From, To, Packet) ->
Err = jlib:make_error_reply(Packet, Etype),
ejabberd_router:route(To, From, Err).

%% @doc if a msg is determined to be spam, for any reason, we call this to notify
%% some external service about what happened
-spec notify_admin(state(), decision(), jid(), jid(), xmlel()) -> state().
notify_admin(State, excess, From, _To, _Msg) ->
C = jid:to_binary({From#jid.user, From#jid.server}),
send_http_notification(maps:get(host, State), C, <<"exceeded message limit">>),
State.

send_http_notification(Host, Culprit, Body) ->
Path = fix_path(list_to_binary(gen_mod:get_module_opt(Host, ?MODULE, path, ?DEFAULT_PATH))),
PoolName = gen_mod:get_module_opt(Host, ?MODULE, pool_name, ?DEFAULT_POOL_NAME),
Pool = mongoose_http_client:get_pool(PoolName),
Query = <<"culprit=", Culprit/binary, "&message=", Body/binary>>,
?INFO_MSG("Making request '~p' for user ~s@~s...", [Path, Culprit, Host]),
Headers = [{<<"Content-Type">>, <<"application/x-www-form-urlencoded">>}],
case mongoose_http_client:post(Pool, Path, Headers, Query) of
{ok, _} ->
ok;
{error, E} ->
?ERROR_MSG("Failed to record spam policy violation (~p):~nOffender: ~p, message: ~p~n",
[E, Culprit, Body])
end.


fix_path(<<"/", R/binary>>) ->
R;
fix_path(R) ->
R.
3 changes: 2 additions & 1 deletion apps/ejabberd/src/shaper_srv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ worker_count(_Host) ->

-spec worker_names(ejabberd:server()) -> [atom()].
worker_names(Host) ->
[worker_name(Host, N) || N <- lists:seq(0, worker_count(Host) - 1)].
Wn = [worker_name(Host, N) || N <- lists:seq(0, worker_count(Host) - 1)],
Wn.


-spec worker_name(ejabberd:server(), integer()) -> atom().
Expand Down
56 changes: 56 additions & 0 deletions apps/ejabberd/test/spamctl_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
-module(spamctl_SUITE).
-compile([export_all]).

-include_lib("exml/include/exml.hrl").
-include_lib("eunit/include/eunit.hrl").


all() ->
[ burst_ctl ].

init_per_suite(C) ->
application:ensure_all_started(lager),
C.

end_per_suite(_C) ->
ok.

burst_ctl(_C) ->
% initialise with 10 msgs over 2 seconds
State = #{maxrate => 10,
span => 2,
rate => 0,
decision => ok,
lasttime => now_to_usec()},
State1 = proc_msgs(State, 8),
State2 = proc_msgs(State1, 1),
#{decision := ok} = State2,
State3 = proc_msgs(State2, 1),
#{decision := ok} = State3,
State4 = proc_msgs(State3, 1),
#{decision := excess} = State4,
State5 = proc_msgs(State4, 1),
#{decision := excess} = State5,
timer:sleep(2100),
State7 = proc_msgs(State5, 8),
#{decision := ok} = State7,
ok.

proc_msgs(State, 0) ->
State;
proc_msgs(State, Y) ->
M = #xmlel{name = <<"message">>,
attrs = [{<<"type">>, <<"chat">>}, {<<"to">>, <<"bob37.76184@localhost">>}],
children = [#xmlel{name = <<"body">>,
attrs = [],
children = [{xmlcdata, <<Y/integer>>}]}]},
NState = case mod_spamctl:control(State, <<"message">>, [M]) of
{stop, S} -> S;
S -> S
end,
proc_msgs(NState, Y - 1).


now_to_usec() ->
{MSec, Sec, USec} = os:timestamp(),
(MSec * 1000000 + Sec) * 1000000 + USec.
5 changes: 5 additions & 0 deletions rel/files/ejabberd.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,11 @@
{mod_bosh, []},
{mod_carboncopy, []}

%% anti-spam subsystem (requires http_connections activated)
%% {mod_spamctl, [{maxrate, 10},
%% {span, 2}
%% {pool_name, conn1}]},

%%
%% Message Archive Management (MAM, XEP-0313) for registered users and
%% Multi-User chats (MUCs).
Expand Down
Loading