Skip to content

Commit

Permalink
Merge pull request #12617 from rabbitmq/amqp-log-macro
Browse files Browse the repository at this point in the history
Use log macros for AMQP
  • Loading branch information
michaelklishin authored Oct 31, 2024
2 parents df0b767 + dbd9ede commit 67bc950
Show file tree
Hide file tree
Showing 8 changed files with 440 additions and 84 deletions.
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_access_control.erl
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ check_user_id0(ClaimedUserName, #user{username = ActualUserName,
end.

-spec update_state(User :: rabbit_types:user(), NewState :: term()) ->
{'ok', rabbit_types:auth_user()} |
{'ok', rabbit_types:user()} |
{'refused', string()} |
{'error', any()}.

Expand Down
14 changes: 13 additions & 1 deletion deps/rabbit/src/rabbit_amqp_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,19 @@ handle_http_req(<<"GET">>,
Bindings0 = rabbit_binding:list_for_source_and_destination(SrcXName, DstName),
Bindings = [B || B = #binding{key = K} <- Bindings0, K =:= Key],
RespPayload = encode_bindings(Bindings),
{<<"200">>, RespPayload, PermCaches}.
{<<"200">>, RespPayload, PermCaches};

handle_http_req(<<"PUT">>,
[<<"auth">>, <<"tokens">>],
_Query,
ReqPayload,
_Vhost,
_User,
ConnPid,
PermCaches) ->
{binary, Token} = ReqPayload,
ok = rabbit_amqp_reader:set_credential(ConnPid, Token),
{<<"204">>, null, PermCaches}.

decode_queue({map, KVList}) ->
M = lists:foldl(
Expand Down
121 changes: 75 additions & 46 deletions deps/rabbit/src/rabbit_amqp_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@

-module(rabbit_amqp_reader).

-include_lib("kernel/include/logger.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("amqp10_common/include/amqp10_types.hrl").
-include("rabbit_amqp.hrl").

-export([init/1,
info/2,
mainloop/2]).
mainloop/2,
set_credential/2]).

-export([system_continue/3,
system_terminate/4,
Expand Down Expand Up @@ -53,6 +55,7 @@
channel_max :: non_neg_integer(),
auth_mechanism :: sasl_init_unprocessed | {binary(), module()},
auth_state :: term(),
credential_timer :: undefined | reference(),
properties :: undefined | {map, list(tuple())}
}).

Expand Down Expand Up @@ -139,6 +142,11 @@ server_properties() ->
Props = [{{symbol, <<"node">>}, {utf8, atom_to_binary(node())}} | Props1],
{map, Props}.

-spec set_credential(pid(), binary()) -> ok.
set_credential(Pid, Credential) ->
Pid ! {set_credential, Credential},
ok.

%%--------------------------------------------------------------------------

inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
Expand Down Expand Up @@ -243,6 +251,8 @@ handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) ->
State;
handle_other(terminate_connection, _State) ->
stop;
handle_other({set_credential, Cred}, State) ->
set_credential0(Cred, State);
handle_other(credential_expired, State) ->
Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, "credential expired", []),
handle_exception(State, 0, Error);
Expand Down Expand Up @@ -320,16 +330,14 @@ error_frame(Condition, Fmt, Args) ->

handle_exception(State = #v1{connection_state = closed}, Channel,
#'v1_0.error'{description = {utf8, Desc}}) ->
rabbit_log_connection:error(
"Error on AMQP 1.0 connection ~tp (~tp), channel number ~b:~n~tp",
[self(), closed, Channel, Desc]),
?LOG_ERROR("Error on AMQP 1.0 connection ~tp (~tp), channel number ~b:~n~tp",
[self(), closed, Channel, Desc]),
State;
handle_exception(State = #v1{connection_state = CS}, Channel,
Error = #'v1_0.error'{description = {utf8, Desc}})
when ?IS_RUNNING(State) orelse CS =:= closing ->
rabbit_log_connection:error(
"Error on AMQP 1.0 connection ~tp (~tp), channel number ~b:~n~tp",
[self(), CS, Channel, Desc]),
?LOG_ERROR("Error on AMQP 1.0 connection ~tp (~tp), channel number ~b:~n~tp",
[self(), CS, Channel, Desc]),
close(Error, State);
handle_exception(State, _Channel, Error) ->
silent_close_delay(),
Expand Down Expand Up @@ -416,21 +424,23 @@ handle_connection_frame(
},
helper_sup = HelperSupPid,
sock = Sock} = State0) ->
logger:update_process_metadata(#{amqp_container => ContainerId}),
Vhost = vhost(Hostname),
logger:update_process_metadata(#{amqp_container => ContainerId,
vhost => Vhost,
user => Username}),
ok = check_user_loopback(State0),
ok = check_vhost_exists(Vhost, State0),
ok = check_vhost_alive(Vhost),
ok = rabbit_access_control:check_vhost_access(User, Vhost, {socket, Sock}, #{}),
ok = check_vhost_connection_limit(Vhost, Username),
ok = check_user_connection_limit(Username),
ok = ensure_credential_expiry_timer(User),
Timer = maybe_start_credential_expiry_timer(User),
rabbit_core_metrics:auth_attempt_succeeded(<<>>, Username, amqp10),
notify_auth(user_authentication_success, Username, State0),
rabbit_log_connection:info(
"Connection from AMQP 1.0 container '~ts': user '~ts' authenticated "
"using SASL mechanism ~s and granted access to vhost '~ts'",
[ContainerId, Username, Mechanism, Vhost]),
?LOG_INFO(
"Connection from AMQP 1.0 container '~ts': user '~ts' authenticated "
"using SASL mechanism ~s and granted access to vhost '~ts'",
[ContainerId, Username, Mechanism, Vhost]),

OutgoingMaxFrameSize = case ClientMaxFrame of
undefined ->
Expand Down Expand Up @@ -499,17 +509,18 @@ handle_connection_frame(
outgoing_max_frame_size = OutgoingMaxFrameSize,
channel_max = EffectiveChannelMax,
properties = Properties,
timeout = ReceiveTimeoutMillis},
timeout = ReceiveTimeoutMillis,
credential_timer = Timer},
heartbeater = Heartbeater},
State = start_writer(State1),
HostnameVal = case Hostname of
undefined -> undefined;
null -> undefined;
{utf8, Val} -> Val
end,
rabbit_log:debug(
"AMQP 1.0 connection.open frame: hostname = ~ts, extracted vhost = ~ts, idle-time-out = ~p",
[HostnameVal, Vhost, IdleTimeout]),
?LOG_DEBUG(
"AMQP 1.0 connection.open frame: hostname = ~ts, extracted vhost = ~ts, idle-time-out = ~p",
[HostnameVal, Vhost, IdleTimeout]),

Infos = infos(?CONNECTION_EVENT_KEYS, State),
ok = rabbit_core_metrics:connection_created(
Expand Down Expand Up @@ -768,16 +779,16 @@ notify_auth(EventType, Username, State) ->
rabbit_event:notify(EventType, EventProps).

track_channel(ChannelNum, SessionPid, #v1{tracked_channels = Channels} = State) ->
rabbit_log:debug("AMQP 1.0 created session process ~p for channel number ~b",
[SessionPid, ChannelNum]),
?LOG_DEBUG("AMQP 1.0 created session process ~p for channel number ~b",
[SessionPid, ChannelNum]),
_Ref = erlang:monitor(process, SessionPid, [{tag, {'DOWN', ChannelNum}}]),
State#v1{tracked_channels = maps:put(ChannelNum, SessionPid, Channels)}.

untrack_channel(ChannelNum, SessionPid, #v1{tracked_channels = Channels0} = State) ->
case maps:take(ChannelNum, Channels0) of
{SessionPid, Channels} ->
rabbit_log:debug("AMQP 1.0 closed session process ~p with channel number ~b",
[SessionPid, ChannelNum]),
?LOG_DEBUG("AMQP 1.0 closed session process ~p with channel number ~b",
[SessionPid, ChannelNum]),
State#v1{tracked_channels = Channels};
_ ->
State
Expand Down Expand Up @@ -871,39 +882,57 @@ check_user_connection_limit(Username) ->
end.


%% TODO Provide a means for the client to refresh the credential.
%% This could be either via:
%% 1. SASL (if multiple authentications are allowed on the same AMQP 1.0 connection), see
%% https://datatracker.ietf.org/doc/html/rfc4422#section-3.8 , or
%% 2. Claims Based Security (CBS) extension, see https://docs.oasis-open.org/amqp/amqp-cbs/v1.0/csd01/amqp-cbs-v1.0-csd01.html
%% and https://github.com/rabbitmq/rabbitmq-server/issues/9259
%% 3. Simpler variation of 2. where a token is put to a special /token node.
%%
%% If the user does not refresh their credential on time (the only implementation currently),
%% close the entire connection as we must assume that vhost access could have been revoked.
%%
%% If the user refreshes their credential on time (to be implemented), the AMQP reader should
%% 1. rabbit_access_control:check_vhost_access/4
%% 2. send a message to all its sessions which should then erase the permission caches and
%% re-check all link permissions (i.e. whether reading / writing to exchanges / queues is still allowed).
%% 3. cancel the current timer, and set a new timer
%% similary as done for Stream connections, see https://github.com/rabbitmq/rabbitmq-server/issues/10292
ensure_credential_expiry_timer(User) ->
set_credential0(Cred,
State = #v1{connection = #v1_connection{
user = User0,
vhost = Vhost,
credential_timer = OldTimer} = Conn,
tracked_channels = Chans,
sock = Sock}) ->
?LOG_INFO("updating credential", []),
case rabbit_access_control:update_state(User0, Cred) of
{ok, User} ->
try rabbit_access_control:check_vhost_access(User, Vhost, {socket, Sock}, #{}) of
ok ->
maps:foreach(fun(_ChanNum, Pid) ->
rabbit_amqp_session:reset_authz(Pid, User)
end, Chans),
case OldTimer of
undefined -> ok;
Ref -> ok = erlang:cancel_timer(Ref, [{info, false}])
end,
NewTimer = maybe_start_credential_expiry_timer(User),
State#v1{connection = Conn#v1_connection{
user = User,
credential_timer = NewTimer}}
catch _:Reason ->
Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
"access to vhost ~s failed for new credential: ~p",
[Vhost, Reason]),
handle_exception(State, 0, Error)
end;
Err ->
Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
"credential update failed: ~p",
[Err]),
handle_exception(State, 0, Error)
end.

maybe_start_credential_expiry_timer(User) ->
case rabbit_access_control:expiry_timestamp(User) of
never ->
ok;
undefined;
Ts when is_integer(Ts) ->
Time = (Ts - os:system_time(second)) * 1000,
rabbit_log:debug(
"Credential expires in ~b ms frow now (absolute timestamp = ~b seconds since epoch)",
[Time, Ts]),
?LOG_DEBUG(
"credential expires in ~b ms frow now (absolute timestamp = ~b seconds since epoch)",
[Time, Ts]),
case Time > 0 of
true ->
_TimerRef = erlang:send_after(Time, self(), credential_expired),
ok;
erlang:send_after(Time, self(), credential_expired);
false ->
protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
"Credential expired ~b ms ago", [abs(Time)])
"credential expired ~b ms ago", [abs(Time)])
end
end.

Expand Down
68 changes: 56 additions & 12 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

-behaviour(gen_server).

-include_lib("kernel/include/logger.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("amqp10_common/include/amqp10_types.hrl").
-include("rabbit_amqp.hrl").
Expand Down Expand Up @@ -90,7 +91,8 @@
list_local/0,
conserve_resources/3,
check_resource_access/4,
check_read_permitted_on_topic/4
check_read_permitted_on_topic/4,
reset_authz/2
]).

-export([init/1,
Expand Down Expand Up @@ -393,6 +395,10 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
handle_max = ClientHandleMax}}) ->
process_flag(trap_exit, true),
rabbit_process_flag:adjust_for_message_handling_proc(),
logger:update_process_metadata(#{channel_number => ChannelNum,
connection => ConnName,
vhost => Vhost,
user => User#user.username}),

ok = pg:join(pg_scope(), self(), self()),
Alarms0 = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
Expand Down Expand Up @@ -480,6 +486,10 @@ list_local() ->
conserve_resources(Pid, Source, {_, Conserve, _}) ->
gen_server:cast(Pid, {conserve_resources, Source, Conserve}).

-spec reset_authz(pid(), rabbit_types:user()) -> ok.
reset_authz(Pid, User) ->
gen_server:cast(Pid, {reset_authz, User}).

handle_call(Msg, _From, State) ->
Reply = {error, {not_understood, Msg}},
reply(Reply, State).
Expand Down Expand Up @@ -574,15 +584,26 @@ handle_cast({conserve_resources, Alarm, Conserve},
noreply(State);
handle_cast(refresh_config, #state{cfg = #cfg{vhost = Vhost} = Cfg} = State0) ->
State = State0#state{cfg = Cfg#cfg{trace_state = rabbit_trace:init(Vhost)}},
noreply(State).
noreply(State);
handle_cast({reset_authz, User}, #state{cfg = Cfg} = State0) ->
State1 = State0#state{
permission_cache = [],
topic_permission_cache = [],
cfg = Cfg#cfg{user = User}},
try recheck_authz(State1) of
State ->
noreply(State)
catch exit:#'v1_0.error'{} = Error ->
log_error_and_close_session(Error, State1)
end.

log_error_and_close_session(
Error, State = #state{cfg = #cfg{reader_pid = ReaderPid,
writer_pid = WriterPid,
channel_num = Ch}}) ->
End = #'v1_0.end'{error = Error},
rabbit_log:warning("Closing session for connection ~p: ~tp",
[ReaderPid, Error]),
?LOG_WARNING("Closing session for connection ~p: ~tp",
[ReaderPid, Error]),
ok = rabbit_amqp_writer:send_command_sync(WriterPid, Ch, End),
{stop, {shutdown, Error}, State}.

Expand Down Expand Up @@ -869,8 +890,8 @@ destroy_outgoing_link(_, _, _, Acc) ->
Acc.

detach(Handle, Link, Error = #'v1_0.error'{}) ->
rabbit_log:warning("Detaching link handle ~b due to error: ~tp",
[Handle, Error]),
?LOG_WARNING("Detaching link handle ~b due to error: ~tp",
[Handle, Error]),
publisher_or_consumer_deleted(Link),
#'v1_0.detach'{handle = ?UINT(Handle),
closed = true,
Expand Down Expand Up @@ -961,8 +982,8 @@ handle_frame(#'v1_0.flow'{handle = Handle} = Flow,
%% "If set to a handle that is not currently associated with
%% an attached link, the recipient MUST respond by ending the
%% session with an unattached-handle session error." [2.7.4]
rabbit_log:warning(
"Received Flow frame for unknown link handle: ~tp", [Flow]),
?LOG_WARNING("Received Flow frame for unknown link handle: ~tp",
[Flow]),
protocol_error(
?V_1_0_SESSION_ERROR_UNATTACHED_HANDLE,
"Unattached link handle: ~b", [HandleInt])
Expand Down Expand Up @@ -2141,9 +2162,9 @@ handle_deliver(ConsumerTag, AckRequired,
outgoing_links = OutgoingLinks};
_ ->
%% TODO handle missing link -- why does the queue think it's there?
rabbit_log:warning(
"No link handle ~b exists for delivery with consumer tag ~p from queue ~tp",
[Handle, ConsumerTag, QName]),
?LOG_WARNING(
"No link handle ~b exists for delivery with consumer tag ~p from queue ~tp",
[Handle, ConsumerTag, QName]),
State
end.

Expand Down Expand Up @@ -2988,7 +3009,7 @@ credit_reply_timeout(QType, QName) ->
Fmt = "Timed out waiting for credit reply from ~s ~s. "
"Hint: Enable feature flag rabbitmq_4.0.0",
Args = [QType, rabbit_misc:rs(QName)],
rabbit_log:error(Fmt, Args),
?LOG_ERROR(Fmt, Args),
protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, Fmt, Args).

default(undefined, Default) -> Default;
Expand Down Expand Up @@ -3522,6 +3543,29 @@ check_topic_authorisation(#exchange{type = topic,
check_topic_authorisation(_, _, _, _, Cache) ->
Cache.

recheck_authz(#state{incoming_links = IncomingLinks,
outgoing_links = OutgoingLinks,
permission_cache = Cache0,
cfg = #cfg{user = User}
} = State) ->
?LOG_DEBUG("rechecking link authorizations", []),
Cache1 = maps:fold(
fun(_Handle, #incoming_link{exchange = X}, Cache) ->
case X of
#exchange{name = XName} ->
check_resource_access(XName, write, User, Cache);
#resource{} = XName ->
check_resource_access(XName, write, User, Cache);
to ->
Cache
end
end, Cache0, IncomingLinks),
Cache2 = maps:fold(
fun(_Handle, #outgoing_link{queue_name = QName}, Cache) ->
check_resource_access(QName, read, User, Cache)
end, Cache1, OutgoingLinks),
State#state{permission_cache = Cache2}.

check_user_id(Mc, User) ->
case rabbit_access_control:check_user_id(Mc, User) of
ok ->
Expand Down
Loading

0 comments on commit 67bc950

Please sign in to comment.