Merge 'Detect and report slow IO requests' from Pavel Emelyanov
The scheduler's goal is to make sure that dispatched requests complete no later than the io-latency-goal duration (which defaults to 1.5 times the reactor CPU latency goal). When the disk or kernel slows down, a flow-ratio guard slows down the dispatch rate as well [1]. However, sometimes that is not enough, and requests get delayed even longer. Slowly executing requests are reported indirectly via the total-disk-delay metric [2], but that metric accumulates several requests into one counter, potentially letting good requests smooth out spikes. This detector aims at catching individual slow requests and logging them along with some related statistics that should help understand what is going on.
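
For illustration, a minimal self-contained sketch of the detection scheme just described — hypothetical class and names, not the actual Seastar code (that lives in io_queue::complete_request() and io_queue::lower_stall_threshold() in the diff below):

#include <algorithm>
#include <chrono>
#include <cstdio>

using namespace std::chrono_literals;

// Toy model of the detector: every completion is checked against an adaptive
// threshold; each report doubles the threshold so one slow burst does not
// flood the log, and a periodic decay timer walks it back down, 1ms per tick.
class slow_io_detector {
    std::chrono::milliseconds _min;        // the configured reporting threshold
    std::chrono::milliseconds _threshold;  // adaptive: doubles on report, decays over time
public:
    explicit slow_io_detector(std::chrono::milliseconds min)
        : _min(std::max(min, 1ms)), _threshold(_min) {}

    // Called on every request completion with the measured execution delay.
    void on_complete(std::chrono::duration<double> delay) {
        if (delay > _threshold) {
            _threshold *= 2;
            std::printf("Request took %.3fms to execute\n",
                        std::chrono::duration<double, std::milli>(delay).count());
        }
    }

    // Called from the periodic averaging-decay timer.
    void lower_threshold() {
        _threshold = std::max(_min, _threshold - 1ms);
    }
};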

refs: #1766 [1] (eefa837) [2] (0238d25)
fixes: #1311
closes: #1609

Closes #2371

* github.com:scylladb/seastar:
  reactor: Add --io-completion-notify-ms option
  io_queue: Stall detector
  io_queue: Keep local variable with request execution delay
  io_queue: Rename flow ratio timer to be more generic
  reactor: Export _polls counter (internally)
avikivity committed Aug 8, 2024
2 parents 48e75f2 + 7911006 commit 144aa53
Showing 5 changed files with 51 additions and 9 deletions.
12 changes: 9 additions & 3 deletions include/seastar/core/io_queue.hh
@@ -116,9 +116,14 @@ private:
     uint64_t _prev_dispatched = 0;
     uint64_t _prev_completed = 0;
     double _flow_ratio = 1.0;
-    timer<lowres_clock> _flow_ratio_update;
+
+    timer<lowres_clock> _averaging_decay_timer;
+
+    const std::chrono::milliseconds _stall_threshold_min;
+    std::chrono::milliseconds _stall_threshold;
 
     void update_flow_ratio() noexcept;
+    void lower_stall_threshold() noexcept;
 
     metrics::metric_groups _metric_groups;
 public:
@@ -148,9 +153,10 @@ public:
         bool duplex = false;
         std::chrono::duration<double> rate_limit_duration = std::chrono::milliseconds(1);
         size_t block_count_limit_min = 1;
-        unsigned flow_ratio_ticks = 100;
+        unsigned averaging_decay_ticks = 100;
         double flow_ratio_ema_factor = 0.95;
         double flow_ratio_backpressure_threshold = 1.1;
+        std::chrono::milliseconds stall_threshold = std::chrono::milliseconds(100);
     };
 
     io_queue(io_group_ptr group, internal::io_sink& sink);
@@ -166,7 +172,7 @@ public:
     void submit_request(io_desc_read_write* desc, internal::io_request req) noexcept;
     void cancel_request(queued_io_request& req) noexcept;
     void complete_cancelled_request(queued_io_request& req) noexcept;
-    void complete_request(io_desc_read_write& desc) noexcept;
+    void complete_request(io_desc_read_write& desc, std::chrono::duration<double> delay) noexcept;
 
     [[deprecated("I/O queue users should not track individual requests, but resources (weight, size) passing through the queue")]]
     size_t queued_requests() const {
2 changes: 2 additions & 0 deletions include/seastar/core/reactor.hh
@@ -358,6 +358,8 @@ public:
     void wakeup();
     /// @private
     bool stopped() const noexcept { return _stopped; }
+    /// @private
+    uint64_t polls() const noexcept { return _polls; }
 
 private:
     class signals {
5 changes: 5 additions & 0 deletions include/seastar/core/reactor_config.hh
@@ -85,6 +85,11 @@ struct reactor_options : public program_options::option_group {
     ///
     /// Default: 1.1
     program_options::value<double> io_flow_ratio_threshold;
+    /// \brief If an IO request is executed longer than that, this is printed to
+    /// logs with extra debugging
+    ///
+    /// Default: infinite (detection is OFF)
+    program_options::value<unsigned> io_completion_notify_ms;
     /// \brief Maximum number of task backlog to allow.
     ///
     /// When the number of tasks grow above this, we stop polling (e.g. I/O)
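Usage note (not part of the diff): the detector is off by default; passing the new option on the command line, e.g. `--io-completion-notify-ms 20`, enables it and sets the initial (and minimum) reporting threshold to 20ms.
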
33 changes: 27 additions & 6 deletions src/core/io_queue.cc
@@ -218,6 +218,7 @@ class io_desc_read_write final : public io_completion {
     const fair_queue_entry::capacity_t _fq_capacity;
     promise<size_t> _pr;
     iovec_keeper _iovs;
+    uint64_t _dispatched_polls;
 
 public:
     io_desc_read_write(io_queue& ioq, io_queue::priority_class_data& pc, stream_id stream, io_direction_and_length dnl, fair_queue_entry::capacity_t cap, iovec_keeper iovs)
@@ -235,16 +236,17 @@
     virtual void set_exception(std::exception_ptr eptr) noexcept override {
         io_log.trace("dev {} : req {} error", _ioq.dev_id(), fmt::ptr(this));
         _pclass.on_error();
-        _ioq.complete_request(*this);
+        _ioq.complete_request(*this, std::chrono::duration<double>(0.0));
         _pr.set_exception(eptr);
         delete this;
     }
 
     virtual void complete(size_t res) noexcept override {
         io_log.trace("dev {} : req {} complete", _ioq.dev_id(), fmt::ptr(this));
         auto now = io_queue::clock_type::now();
-        _pclass.on_complete(std::chrono::duration_cast<std::chrono::duration<double>>(now - _ts));
-        _ioq.complete_request(*this);
+        auto delay = std::chrono::duration_cast<std::chrono::duration<double>>(now - _ts);
+        _pclass.on_complete(delay);
+        _ioq.complete_request(*this, delay);
         _pr.set_value(res);
         delete this;
     }
@@ -260,6 +262,7 @@ class io_desc_read_write final : public io_completion {
         auto now = io_queue::clock_type::now();
         _pclass.on_dispatch(_dnl, std::chrono::duration_cast<std::chrono::duration<double>>(now - _ts));
         _ts = now;
+        _dispatched_polls = engine().polls();
     }
 
     future<size_t> get_future() {
@@ -268,6 +271,7 @@
 
     fair_queue_entry::capacity_t capacity() const noexcept { return _fq_capacity; }
     stream_id stream() const noexcept { return _stream; }
+    uint64_t polls() const noexcept { return _dispatched_polls; }
 };
 
 class queued_io_request : private internal::io_request {
@@ -545,11 +549,23 @@ void io_queue::update_flow_ratio() noexcept {
     }
 }
 
+void io_queue::lower_stall_threshold() noexcept {
+    auto new_threshold = _stall_threshold - std::chrono::milliseconds(1);
+    _stall_threshold = std::max(_stall_threshold_min, new_threshold);
+}
+
 void
-io_queue::complete_request(io_desc_read_write& desc) noexcept {
+io_queue::complete_request(io_desc_read_write& desc, std::chrono::duration<double> delay) noexcept {
     _requests_executing--;
     _requests_completed++;
     _streams[desc.stream()].notify_request_finished(desc.capacity());
+
+    if (delay > _stall_threshold) {
+        _stall_threshold *= 2;
+        io_log.warn("Request took {:.3f}ms ({} polls) to execute, queued {} executing {}",
+            std::chrono::duration_cast<std::chrono::duration<double, std::milli>>(delay).count(),
+            engine().polls() - desc.polls(), _queued_requests, _requests_executing);
+    }
 }
 
 fair_queue::config io_queue::make_fair_queue_config(const config& iocfg, sstring label) {
@@ -562,7 +578,12 @@
 io_queue::io_queue(io_group_ptr group, internal::io_sink& sink)
     : _priority_classes()
     , _group(std::move(group))
     , _sink(sink)
-    , _flow_ratio_update([this] { update_flow_ratio(); })
+    , _averaging_decay_timer([this] {
+        update_flow_ratio();
+        lower_stall_threshold();
+    })
+    , _stall_threshold_min(std::max(get_config().stall_threshold, 1ms))
+    , _stall_threshold(_stall_threshold_min)
 {
     auto& cfg = get_config();
     if (cfg.duplex) {
@@ -573,7 +594,7 @@ io_queue::io_queue(io_group_ptr group, internal::io_sink& sink)
     } else {
         _streams.emplace_back(_group->_fgs[0], make_fair_queue_config(cfg, "rw"));
     }
-    _flow_ratio_update.arm_periodic(std::chrono::duration_cast<std::chrono::milliseconds>(_group->io_latency_goal() * cfg.flow_ratio_ticks));
+    _averaging_decay_timer.arm_periodic(std::chrono::duration_cast<std::chrono::milliseconds>(_group->io_latency_goal() * cfg.averaging_decay_ticks));
 
     namespace sm = seastar::metrics;
     auto owner_l = sm::shard_label(this_shard_id());
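To make the threshold dynamics concrete: assuming the default averaging_decay_ticks of 100 and an io-latency-goal of 1.5ms, the decay timer fires every 150ms. With `--io-completion-notify-ms 100`, a request that takes 250ms to execute is reported (together with the number of reactor polls that elapsed between its dispatch and completion, and the queue's current queued/executing counts) and doubles the threshold to 200ms; the threshold then relaxes back towards its 100ms floor at 1ms per tick, i.e. over 100 ticks, or roughly 15 seconds.
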
8 changes: 8 additions & 0 deletions src/core/reactor.cc
@@ -3813,6 +3813,7 @@ reactor_options::reactor_options(program_options::option_group* parent_group)
     , task_quota_ms(*this, "task-quota-ms", 0.5, "Max time (ms) between polls")
     , io_latency_goal_ms(*this, "io-latency-goal-ms", {}, "Max time (ms) io operations must take (1.5 * task-quota-ms if not set)")
     , io_flow_ratio_threshold(*this, "io-flow-rate-threshold", 1.1, "Dispatch rate to completion rate threshold")
+    , io_completion_notify_ms(*this, "io-completion-notify-ms", {}, "Threshold in milliseconds over which IO request completion is reported to logs")
     , max_task_backlog(*this, "max-task-backlog", 1000, "Maximum number of task backlog to allow; above this we ignore I/O")
     , blocked_reactor_notify_ms(*this, "blocked-reactor-notify-ms", 25, "threshold in miliseconds over which the reactor is considered blocked if no progress is made")
     , blocked_reactor_reports_per_minute(*this, "blocked-reactor-reports-per-minute", 5, "Maximum number of backtraces reported by stall detector per minute")
@@ -4046,6 +4047,7 @@ class disk_config_params {
     unsigned _num_io_groups = 0;
     std::unordered_map<dev_t, mountpoint_params> _mountpoints;
     std::chrono::duration<double> _latency_goal;
+    std::chrono::milliseconds _stall_threshold;
     double _flow_ratio_backpressure_threshold;
 
 public:
@@ -4063,6 +4065,10 @@ class disk_config_params {
         return _latency_goal;
     }
 
+    std::chrono::milliseconds stall_threshold() const {
+        return _stall_threshold;
+    }
+
     double latency_goal_opt(const reactor_options& opts) const {
         return opts.io_latency_goal_ms ?
             opts.io_latency_goal_ms.get_value() :
@@ -4075,6 +4081,7 @@ class disk_config_params {
         seastar_logger.debug("latency_goal: {}", latency_goal().count());
         _flow_ratio_backpressure_threshold = reactor_opts.io_flow_ratio_threshold.get_value();
         seastar_logger.debug("flow-ratio threshold: {}", _flow_ratio_backpressure_threshold);
+        _stall_threshold = reactor_opts.io_completion_notify_ms.defaulted() ? std::chrono::milliseconds::max() : reactor_opts.io_completion_notify_ms.get_value() * 1ms;
 
         if (smp_opts.num_io_groups) {
             _num_io_groups = smp_opts.num_io_groups.get_value();
@@ -4170,6 +4177,7 @@ class disk_config_params {
         // scheduler will self-tune to allow for the single 64k request, while it would
         // be better to sacrifice some IO latency, but allow for larger concurrency
         cfg.block_count_limit_min = (64 << 10) >> io_queue::block_size_shift;
+        cfg.stall_threshold = stall_threshold();
 
         return cfg;
     }
}
Expand Down
