Skip to content

Commit

Permalink
rpc: Calculate round-trip time and export it via metrics
Browse files Browse the repository at this point in the history
Now the client has two durations using which it can estimate the
rount-trip time of a packet -- the duration between sending request and
receiving response and the time it too handler to execute on the server
side. Difference is the RTT value.

Those values from different calls are accumulated in a client counter as
well as the total number of RTT-s received (errors and old servers don't
send it). Metrics export both as counters, so one can rate it over time
and monitor average RTT.

Signed-off-by: Pavel Emelyanov <[email protected]>
  • Loading branch information
xemul committed Jun 12, 2024
1 parent 3854bd2 commit 9bbe38c
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 3 deletions.
1 change: 1 addition & 0 deletions include/seastar/rpc/rpc.hh
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ class client : public rpc::connection, public weakly_referencable<client> {
struct reply_handler_base {
timer<rpc_clock_type> t;
cancellable* pcancel = nullptr;
rpc_clock_type::time_point start;
virtual void operator()(client&, id_type, rcv_buf data) = 0;
virtual void timeout() {}
virtual void cancel() {}
Expand Down
8 changes: 5 additions & 3 deletions include/seastar/rpc/rpc_impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ template<typename Serializer>
struct rcv_reply<Serializer, future<>> : rcv_reply<Serializer, void> {};

template <typename Serializer, typename Ret, typename... InArgs>
inline auto wait_for_reply(wait_type, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel, rpc::client& dst, id_type msg_id,
inline auto wait_for_reply(wait_type, std::optional<rpc_clock_type::time_point> timeout, rpc_clock_type::time_point start, cancellable* cancel, rpc::client& dst, id_type msg_id,
signature<Ret (InArgs...)>) {
using reply_type = rcv_reply<Serializer, Ret>;
auto lambda = [] (reply_type& r, rpc::client& dst, id_type msg_id, rcv_buf data) mutable {
Expand All @@ -429,13 +429,14 @@ inline auto wait_for_reply(wait_type, std::optional<rpc_clock_type::time_point>
};
using handler_type = typename rpc::client::template reply_handler<reply_type, decltype(lambda)>;
auto r = std::make_unique<handler_type>(std::move(lambda));
r->start = start;
auto fut = r->reply.p.get_future();
dst.wait_for_reply(msg_id, std::move(r), timeout, cancel);
return fut;
}

template<typename Serializer, typename... InArgs>
inline auto wait_for_reply(no_wait_type, std::optional<rpc_clock_type::time_point>, cancellable*, rpc::client&, id_type,
inline auto wait_for_reply(no_wait_type, std::optional<rpc_clock_type::time_point>, rpc_clock_type::time_point start, cancellable*, rpc::client&, id_type,
signature<no_wait_type (InArgs...)>) { // no_wait overload
return make_ready_future<>();
}
Expand Down Expand Up @@ -473,13 +474,14 @@ auto send_helper(MsgType xt, signature<Ret (InArgs...)> xsig) {
return futurize<cleaned_ret_type>::make_exception_future(closed_error());
}

auto start = rpc_clock_type::now();
// send message
auto msg_id = dst.next_message_id();
snd_buf data = marshall(dst.template serializer<Serializer>(), request_frame_headroom, args...);

// prepare reply handler, if return type is now_wait_type this does nothing, since no reply will be sent
using wait = wait_signature_t<Ret>;
return when_all(dst.request(uint64_t(t), msg_id, std::move(data), timeout, cancel), wait_for_reply<Serializer>(wait(), timeout, cancel, dst, msg_id, sig)).then([] (auto r) {
return when_all(dst.request(uint64_t(t), msg_id, std::move(data), timeout, cancel), wait_for_reply<Serializer>(wait(), timeout, start, cancel, dst, msg_id, sig)).then([] (auto r) {
std::get<0>(r).ignore_ready_future();
return std::move(std::get<1>(r)); // return future of wait_for_reply
});
Expand Down
2 changes: 2 additions & 0 deletions include/seastar/rpc/rpc_types.hh
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ struct stats {
counter_type sent_messages = 0;
counter_type wait_reply = 0;
counter_type timeout = 0;
counter_type rtt_samples = 0;
std::chrono::duration<double> rtt_total = std::chrono::duration<double>(0);
};

class connection_id {
Expand Down
16 changes: 16 additions & 0 deletions src/rpc/rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,15 @@ namespace rpc {
sm::description("Total number of exceptional responses received"), { domain_l }).set_skip_when_empty(),
sm::make_counter("timeout", std::bind(&domain::count_all, this, &stats::timeout),
sm::description("Total number of timeout responses"), { domain_l }).set_skip_when_empty(),
sm::make_counter("rtt_samples", std::bind(&domain::count_all, this, &stats::rtt_samples),
sm::description("Total number of RTT samples"), { domain_l }),
sm::make_counter("rtt_total", [this] () -> double {
std::chrono::duration<double> res(0);
for (const auto& m : list) {
res += m._c._stats.rtt_total;
}
return res.count();
}, sm::description("Total RTT time in seconds"), { domain_l }),
sm::make_gauge("pending", std::bind(&domain::count_all_fn, this, &client::outgoing_queue_length),
sm::description("Number of queued outbound messages"), { domain_l }),
sm::make_gauge("wait_reply", std::bind(&domain::count_all_fn, this, &client::incoming_queue_length),
Expand Down Expand Up @@ -904,6 +913,8 @@ namespace rpc {
_domain.dead.exception_received += _c._stats.exception_received;
_domain.dead.sent_messages += _c._stats.sent_messages;
_domain.dead.timeout += _c._stats.timeout;
_domain.dead.rtt_samples += _c._stats.rtt_samples;
_domain.dead.rtt_total += _c._stats.rtt_total;
}

client::client(const logger& l, void* s, client_options ops, socket socket, const socket_address& addr, const socket_address& local)
Expand Down Expand Up @@ -953,8 +964,13 @@ namespace rpc {
_error = true;
} else if (it != _outstanding.end()) {
auto handler = std::move(it->second);
auto ht = std::get<1>(msg_id_and_data);
_outstanding.erase(it);
(*handler)(*this, msg_id, std::move(data.value()));
if (ht) {
_stats.rtt_samples++;
_stats.rtt_total += (rpc_clock_type::now() - handler->start) - std::chrono::microseconds(*ht);
}
} else if (msg_id < 0) {
try {
std::rethrow_exception(unmarshal_exception(data.value()));
Expand Down

0 comments on commit 9bbe38c

Please sign in to comment.