Skip to content

Commit

Permalink
kvcoord: add DistSender circuit breakers
Browse files Browse the repository at this point in the history
This patch adds an initial implementation of DistSender replica circuit
breakers. Their primary purpose is to prevent the DistSender getting
stuck on non-functional replicas. In particular, the DistSender relies
on receiving a NLHE from the replica to update its range cache and try
other replicas, otherwise it will keep sending requests to the same
broken replica which will continue to get stuck, giving the appearance
of an unavailable range. This can happen if:

- The replica stalls, e.g. with a disk stall or mutex deadlock.

- Clients time out before the replica lease acquisition attempt times out,
  e.g. if the replica is partitioned away from the leader.

If a replica has returned only errors in the past few seconds, or hasn't
returned any responses at all, the circuit breaker will probe the
replica by sending a `LeaseInfo` request. This must either return
success or a NLHE pointing to a leaseholder.  Otherwise, the circuit
breaker trips, and the DistSender will skip it for future requests,
optionally also cancelling in-flight requests.

Currently, only replica-level circuit breakers are implemented. If a
range is unavailable, the DistSender will continue to retry replicas as
today. Range-level circuit breakers can be added later if needed, but
are considered out of scope here.

The circuit breakers are disabled by default for now. Some follow-up
work is likely needed before they can be enabled by default:

* Improve probe scalability. Currently, a goroutine is spawned per
  replica probe, which is likely too expensive at large scales.
  We should consider batching probes to nodes/stores, and using
  a bounded worker pool.

* Consider follower read handling, e.g. by tracking the replica's
  closed timestamp and allowing requests that may still be served
  by it even if it's partitioned away from the leaseholder.

* Improve observability, with metrics, tracing, and logging.

* Comprehensive testing and benchmarking.

This will be addressed separately.

Epic: none
Release note (general change): gateways will now detect faulty or
stalled replicas and use other replicas instead, which can prevent them
getting stuck in certain cases (e.g. with disk stalls). This behavior
can be disabled via the cluster setting
`kv.dist_sender.circuit_breaker.enabled`.
  • Loading branch information
erikgrinaker committed Mar 4, 2024
1 parent 51bbfff commit 68a90a6
Show file tree
Hide file tree
Showing 6 changed files with 1,138 additions and 11 deletions.
5 changes: 5 additions & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ kv.closed_timestamp.follower_reads.enabled boolean true allow (all) replicas to
kv.closed_timestamp.lead_for_global_reads_override duration 0s if nonzero, overrides the lead time that global_read ranges use to publish closed timestamps system-visible
kv.closed_timestamp.side_transport_interval duration 200ms the interval at which the closed timestamp side-transport attempts to advance each range's closed timestamp; set to 0 to disable the side-transport system-visible
kv.closed_timestamp.target_duration duration 3s if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration system-visible
kv.dist_sender.circuit_breaker.cancellation.enabled boolean true when enabled, in-flight requests will be cancelled when the circuit breaker trips application
kv.dist_sender.circuit_breaker.enabled boolean true enable circuit breakers for failing or stalled replicas application
kv.dist_sender.circuit_breaker.probe.interval duration 3s interval between replica probes application
kv.dist_sender.circuit_breaker.probe.threshold duration 3s duration of errors or stalls after which a replica will be probed application
kv.dist_sender.circuit_breaker.probe.timeout duration 3s timeout for replica probes application
kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconciling jobs with protected timestamp records system-visible
kv.rangefeed.client.stream_startup_rate integer 100 controls the rate per second the client will initiate new rangefeed stream for a single range; 0 implies unlimited application
kv.rangefeed.closed_timestamp_refresh_interval duration 3s the interval at which closed-timestamp updatesare delivered to rangefeeds; set to 0 to use kv.closed_timestamp.side_transport_interval system-visible
Expand Down
5 changes: 5 additions & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@
<tr><td><div id="setting-kv-closed-timestamp-lead-for-global-reads-override" class="anchored"><code>kv.closed_timestamp.lead_for_global_reads_override</code></div></td><td>duration</td><td><code>0s</code></td><td>if nonzero, overrides the lead time that global_read ranges use to publish closed timestamps</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
<tr><td><div id="setting-kv-closed-timestamp-side-transport-interval" class="anchored"><code>kv.closed_timestamp.side_transport_interval</code></div></td><td>duration</td><td><code>200ms</code></td><td>the interval at which the closed timestamp side-transport attempts to advance each range&#39;s closed timestamp; set to 0 to disable the side-transport</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
<tr><td><div id="setting-kv-closed-timestamp-target-duration" class="anchored"><code>kv.closed_timestamp.target_duration</code></div></td><td>duration</td><td><code>3s</code></td><td>if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
<tr><td><div id="setting-kv-dist-sender-circuit-breaker-cancellation-enabled" class="anchored"><code>kv.dist_sender.circuit_breaker.cancellation.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>when enabled, in-flight requests will be cancelled when the circuit breaker trips</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-dist-sender-circuit-breaker-enabled" class="anchored"><code>kv.dist_sender.circuit_breaker.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>enable circuit breakers for failing or stalled replicas</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-dist-sender-circuit-breaker-probe-interval" class="anchored"><code>kv.dist_sender.circuit_breaker.probe.interval</code></div></td><td>duration</td><td><code>3s</code></td><td>interval between replica probes</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-dist-sender-circuit-breaker-probe-threshold" class="anchored"><code>kv.dist_sender.circuit_breaker.probe.threshold</code></div></td><td>duration</td><td><code>3s</code></td><td>duration of errors or stalls after which a replica will be probed</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-dist-sender-circuit-breaker-probe-timeout" class="anchored"><code>kv.dist_sender.circuit_breaker.probe.timeout</code></div></td><td>duration</td><td><code>3s</code></td><td>timeout for replica probes</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-lease-transfer-read-summary-global-budget" class="anchored"><code>kv.lease_transfer_read_summary.global_budget</code></div></td><td>byte size</td><td><code>0 B</code></td><td>controls the maximum number of bytes that will be used to summarize the global segment of the timestamp cache during lease transfers and range merges. A smaller budget will result in loss of precision.</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-lease-transfer-read-summary-local-budget" class="anchored"><code>kv.lease_transfer_read_summary.local_budget</code></div></td><td>byte size</td><td><code>4.0 MiB</code></td><td>controls the maximum number of bytes that will be used to summarize the local segment of the timestamp cache during lease transfers and range merges. A smaller budget will result in loss of precision.</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-log-range-and-node-events-enabled" class="anchored"><code>kv.log_range_and_node_events.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelogand node join and restart events into system.eventolog</td><td>Dedicated/Self-Hosted</td></tr>
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"batch.go",
"condensable_span_set.go",
"dist_sender.go",
"dist_sender_circuit_breaker.go",
"dist_sender_mux_rangefeed.go",
"dist_sender_rangefeed.go",
"dist_sender_rangefeed_canceler.go",
Expand Down Expand Up @@ -66,6 +67,7 @@ go_library(
"//pkg/util",
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/circuit",
"//pkg/util/ctxgroup",
"//pkg/util/envutil",
"//pkg/util/errorutil/unimplemented",
Expand Down Expand Up @@ -122,6 +124,7 @@ go_test(
"batch_test.go",
"condensable_span_set_test.go",
"dist_sender_ambiguous_test.go",
"dist_sender_circuit_breaker_test.go",
"dist_sender_rangefeed_canceler_test.go",
"dist_sender_rangefeed_mock_test.go",
"dist_sender_rangefeed_test.go",
Expand Down
45 changes: 34 additions & 11 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@ type DistSender struct {
transportFactory TransportFactory
rpcRetryOptions retry.Options
asyncSenderSem *quotapool.IntPool
circuitBreakers *DistSenderCircuitBreakers

// batchInterceptor is set for tenants; when set, information about all
// BatchRequests and BatchResponses are passed through this interceptor, which
Expand Down Expand Up @@ -726,6 +727,13 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
})
}

// Set up circuit breakers and spawn the manager goroutine, which runs until
// the stopper stops. This can only error if the server is shutting down, so
// ignore the returned error.
ds.circuitBreakers = NewDistSenderCircuitBreakers(
ds.stopper, ds.st, ds.transportFactory, ds.metrics)
_ = ds.circuitBreakers.Start()

if cfg.TestingKnobs.LatencyFunc != nil {
ds.latencyFunc = cfg.TestingKnobs.LatencyFunc
}
Expand Down Expand Up @@ -2454,23 +2462,38 @@ func (ds *DistSender) sendToReplicas(
ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(comparisonResult, int64(ba.Size()))

tBegin := timeutil.Now() // for slow log message
br, err = transport.SendNext(ctx, ba)
if dur := timeutil.Since(tBegin); dur > slowDistSenderReplicaThreshold {
var s redact.StringBuilder
slowReplicaRPCWarningStr(&s, ba, dur, attempts, err, br)
if admissionpb.WorkPriority(ba.AdmissionHeader.Priority) >= admissionpb.NormalPri {
// Note that these RPC may or may not have succeeded. Errors are counted separately below.
ds.metrics.SlowReplicaRPCs.Inc(1)
log.Warningf(ctx, "slow replica RPC: %v", &s)
} else {
log.Eventf(ctx, "slow replica RPC: %v", &s)
sendCtx, cbToken, cbErr := ds.circuitBreakers.ForReplica(desc, &curReplica).
Track(ctx, ba, tBegin.UnixNano())
if cbErr != nil {
// Circuit breaker is tripped. err will be handled below.
err = cbErr
transport.SkipReplica()
} else {
br, err = transport.SendNext(sendCtx, ba)
tEnd := timeutil.Now()
cbToken.Done(br, err, tEnd.UnixNano())

if dur := tEnd.Sub(tBegin); dur > slowDistSenderReplicaThreshold {
var s redact.StringBuilder
slowReplicaRPCWarningStr(&s, ba, dur, attempts, err, br)
if admissionpb.WorkPriority(ba.AdmissionHeader.Priority) >= admissionpb.NormalPri {
// Note that these RPC may or may not have succeeded. Errors are counted separately below.
ds.metrics.SlowReplicaRPCs.Inc(1)
log.Warningf(ctx, "slow replica RPC: %v", &s)
} else {
log.Eventf(ctx, "slow replica RPC: %v", &s)
}
}
}

ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size()))
ds.maybeIncrementErrCounters(br, err)

if err != nil {
if cbErr != nil {
log.VErrEventf(ctx, 2, "circuit breaker error: %s", cbErr)
// We know the request did not start, so the error is not ambiguous.

} else if err != nil {
log.VErrEventf(ctx, 2, "RPC error: %s", err)

if grpcutil.IsAuthError(err) {
Expand Down
Loading

0 comments on commit 68a90a6

Please sign in to comment.