kvcoord: reap DistSender circuit breaker probes during GC #120946

Closed
pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go (272 changes: 174 additions, 98 deletions)
@@ -263,16 +263,25 @@ func (d *DistSenderCircuitBreakers) gcLoop(ctx context.Context) {
		cbs++

		if idleDuration := cb.lastRequestDuration(nowNanos); idleDuration >= cbGCThreshold {
-			// Check if we raced with a concurrent delete. We don't expect to, since
-			// only this loop removes circuit breakers.
-			if _, ok := d.replicas.LoadAndDelete(key); ok {
-				// TODO(erikgrinaker): this needs to remove tripped circuit breakers
-				// from the metrics, otherwise they'll appear as tripped forever.
-				// However, there are race conditions with concurrent probes that can
-				// lead to metrics gauge leaks (both positive and negative), so we'll
-				// have to make sure we reap the probes here first.
+			// Reload the circuit breaker, in case we raced with a concurrent
+			// replace. We don't expect this, since only this loop removes them.
+			if v, ok := d.replicas.LoadAndDelete(key); ok {
+				cb = v.(*ReplicaCircuitBreaker)
				d.metrics.CircuitBreaker.Replicas.Dec(1)
				gced++
+
+				// Reap a currently running probe, if any. This shouldn't commonly
+				// happen, since probes shut down when idle, and we only GC idle
+				// replicas. But it can happen if we race with a probe launch, or if
+				// probes run longer than the GC threshold. The map removal above will
+				// prevent new probes being launched for this ReplicaCircuitBreaker
+				// (checked by launchProbe).
+				cb.maybeReapProbe()
+
+				// Remove the GCed breaker from tripped metrics.
+				if cb.isTripped() {
+					d.metrics.CircuitBreaker.ReplicasTripped.Dec(1)
+				}
			}
		}
		return true
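For intuition, the ordering above can be distilled into a minimal sketch. This is illustrative only, not CockroachDB code: Breaker, breakers, replicasGauge, and trippedGauge are hypothetical stand-ins for ReplicaCircuitBreaker, the replicas map, and the Replicas/ReplicasTripped gauges. Removal from the map happens first, so no new probe can start; the running probe is then reaped; only then is the tripped gauge adjusted, which is what closes the gauge-leak race described in the removed TODO.

package sketch

import "sync"

// Breaker is a hypothetical stand-in for ReplicaCircuitBreaker.
type Breaker struct {
	mu struct {
		sync.Mutex
		reapProbe func() // installed by the probe launcher, nil when no probe runs
	}
	tripped bool
}

var (
	breakers      sync.Map // key -> *Breaker
	replicasGauge int64    // stand-ins for the Replicas/ReplicasTripped gauges
	trippedGauge  int64
)

func gcBreaker(key string) {
	// 1. Remove the breaker from the map. The launcher re-checks membership
	//    after installing reapProbe, so no new probe can start from here on.
	v, ok := breakers.LoadAndDelete(key)
	if !ok {
		return
	}
	b := v.(*Breaker)
	replicasGauge--

	// 2. Synchronously stop any probe that is still running.
	b.mu.Lock()
	reap := b.mu.reapProbe
	b.mu.Unlock()
	if reap != nil {
		reap() // cancels the probe and waits for it to exit
	}

	// 3. With no probe left to trip or untrip the breaker, the tripped gauge
	//    can be decremented without racing.
	if b.tripped {
		trippedGauge--
	}
}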
@@ -341,6 +350,10 @@ type ReplicaCircuitBreaker struct {
		// cancelFns contains context cancellation functions for all in-flight
		// requests. Only tracked if cancellation is enabled.
		cancelFns map[*kvpb.BatchRequest]context.CancelCauseFunc
+
+		// reapProbe, if non-nil, can be called to synchronously cancel a running
+		// probe and wait for it to exit.
+		reapProbe func()
	}
}

@@ -437,6 +450,18 @@ func (r *ReplicaCircuitBreaker) isTripped() bool {
	return r.breaker.Signal().IsTripped()
}

+// isGCed returns true if this circuit breaker has been GCed, i.e. it is no
+// longer present in the replica circuit breaker map, or has since been
+// replaced by a different circuit breaker for the same replica.
+func (r *ReplicaCircuitBreaker) isGCed() bool {
+	if r == nil {
+		return true // circuit breakers disabled
+	}
+	key := cbKey{rangeID: r.rangeID, replicaID: r.desc.ReplicaID}
+	v, ok := r.d.replicas.Load(key)
+	return !ok || v.(*ReplicaCircuitBreaker) != r
+}

// Track attempts to start tracking a request with the circuit breaker. If the
// breaker is tripped, returns an error. Otherwise, returns the context to use
// for the send and a token which the caller must call Done() on with the result
@@ -604,6 +629,57 @@ func (r *ReplicaCircuitBreaker) done(

// launchProbe spawns an async replica probe that sends LeaseInfo requests to
// the replica and trips/untrips the breaker as appropriate.
+func (r *ReplicaCircuitBreaker) launchProbe(report func(error), done func()) {
+	// Install reapProbe to allow GC to synchronously cancel this probe, and
+	// extend done() to clean it up. This won't commonly happen, since the probe
+	// shuts down when the replica is inactive and we only GC after a period of
+	// inactivity. But it can happen if GC races with a probe launch, or if the
+	// probe interval or timeout has been increased beyond the GC threshold.
+	ctx, cancel := r.d.stopper.WithCancelOnQuiesce(r.d.ambientCtx.AnnotateCtx(context.Background()))
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+
+	reapProbe := func() {
+		cancel()
+		wg.Wait()
+	}
+	r.mu.Lock()
+	r.mu.reapProbe = reapProbe
+	r.mu.Unlock()
+
+	prevDone := done
+	done = func() {
+		cancel()
+		wg.Done()
+		prevDone()
+		r.mu.Lock()
+		r.mu.reapProbe = nil
+		r.mu.Unlock()
+	}
+
+	// If the replica has been GCed, prevent launching new probes. This acts as a
+	// synchronization point with garbage collection.
+	//
+	// NB: this check must happen after reapProbe has been installed above, to
+	// avoid GC racing with us and not finding a reapProbe we install later.
+	if r.isGCed() {
+		done()
+		return
+	}
+
+	// Spawn the probe goroutine.
+	err := r.d.stopper.RunAsyncTask(ctx, fmt.Sprintf("distsender-replica-probe-%s", r.id()),
+		func(ctx context.Context) {
+			r.probeLoop(ctx, report, done)
+		})
+	if err != nil {
+		done()
+	}
+}
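At its core, the reap mechanism is a generic cancel-and-wait handshake: a cancellable context paired with a sync.WaitGroup, so the reaper can both stop the goroutine and block until it has actually exited. A standalone sketch using only the standard library (plain context instead of the stopper; names are hypothetical, extending the sketch package above with "context" imported):

// startWorker runs fn in a goroutine and returns a reap function that
// cancels it and waits for it to fully exit.
func startWorker(fn func(ctx context.Context)) (reap func()) {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		fn(ctx)
	}()
	return func() {
		cancel()  // signal the worker to stop
		wg.Wait() // block until it has returned
	}
}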
+
+// probeLoop sends LeaseInfo requests to the replica and trips/untrips the
+// breaker as appropriate.
//
// While the breaker is tripped, the probe keeps running as long as there have
// been requests to the replica in the past few probe intervals. Otherwise, the
@@ -617,106 +693,95 @@ func (r *ReplicaCircuitBreaker) done(
// replicas on the same node/store. However, this needs server-side timeout
// handling such that if 1 out of 1000 replicas are stalled we won't fail the
// entire batch.
-func (r *ReplicaCircuitBreaker) launchProbe(report func(error), done func()) {
-	ctx := r.d.ambientCtx.AnnotateCtx(context.Background())
-
-	name := fmt.Sprintf("distsender-replica-probe-%s", r.id())
-	err := r.d.stopper.RunAsyncTask(ctx, name, func(ctx context.Context) {
-		defer done()
-
-		ctx, cancel := r.d.stopper.WithCancelOnQuiesce(ctx)
-		defer cancel()
-
-		// Prepare the probe transport, using SystemClass to avoid RPC latency.
-		//
-		// We construct a bare replica slice without any locality information, since
-		// we're only going to contact this replica.
-		replicas := ReplicaSlice{{ReplicaDescriptor: r.desc}}
-		opts := SendOptions{
-			class:                  rpc.SystemClass,
-			metrics:                &r.d.metrics,
-			dontConsiderConnHealth: true,
-		}
-		transport, err := r.d.transportFactory(opts, replicas)
-		if err != nil {
-			log.Errorf(ctx, "failed to launch probe: %s", err)
-			return
-		}
-		defer transport.Release()
-
-		// Continually probe the replica until it succeeds or the replica stops
-		// seeing traffic. We probe immediately since we only trip the breaker on
-		// probe failure.
-		var timer timeutil.Timer
-		defer timer.Stop()
-		timer.Reset(CircuitBreakerProbeInterval.Get(&r.d.settings.SV))
-
-		for {
-			// Untrip the breaker and stop probing if circuit breakers are disabled.
-			if !CircuitBreakerEnabled.Get(&r.d.settings.SV) {
-				report(nil)
-				return
-			}
-
-			// Probe the replica.
-			err := r.sendProbe(ctx, transport)
-
-			// If the context (with no timeout) failed, we're shutting down. Just exit
-			// the probe without reporting the result (which could trip the breaker).
-			if ctx.Err() != nil {
-				return
-			}
-
-			// Report the probe result.
-			report(err)
-			if err == nil {
-				// On a successful probe, record the success and stop probing.
-				r.stallSince.Store(timeutil.Now().UnixNano())
-				r.errorSince.Store(0)
-				return
-			}
-
-			// Cancel in-flight requests on failure. We do this on every failure, and
-			// also remove the cancel functions from the map (even though done() will
-			// also clean them up), in case another request makes it in after the
-			// breaker trips. There should typically never be any contention here.
-			func() {
-				r.mu.Lock()
-				defer r.mu.Unlock()
-				for ba, cancel := range r.mu.cancelFns {
-					delete(r.mu.cancelFns, ba)
-					cancel(errors.Wrapf(err, "%s is unavailable (circuit breaker tripped)", r.id()))
-					r.d.metrics.CircuitBreaker.ReplicasRequestsCancelled.Inc(1)
-				}
-			}()
-
-			select {
-			case <-timer.C:
-				timer.Read = true
-			case <-r.d.stopper.ShouldQuiesce():
-				return
-			case <-ctx.Done():
-				return
-			}
-
-			probeInterval := CircuitBreakerProbeInterval.Get(&r.d.settings.SV)
-			timer.Reset(probeInterval)
-
-			// If there haven't been any requests in the past few probe intervals,
-			// stop probing but keep the breaker tripped. A new probe will be launched
-			// on the next request.
-			//
-			// NB: we check this after waiting out the probe interval above, to avoid
-			// frequently spawning new probe goroutines, instead waiting to see if any
-			// requests come in.
-			idleThreshold := cbProbeIdleIntervals * probeInterval
-			if r.lastRequestDuration(timeutil.Now().UnixNano()) >= idleThreshold {
-				return
-			}
-		}
-	})
-	if err != nil {
-		done()
-	}
-}
+func (r *ReplicaCircuitBreaker) probeLoop(ctx context.Context, report func(error), done func()) {
+	defer done()
+
+	// Prepare the probe transport, using SystemClass to avoid RPC latency.
+	//
+	// We construct a bare replica slice without any locality information, since
+	// we're only going to contact this replica.
+	replicas := ReplicaSlice{{ReplicaDescriptor: r.desc}}
+	opts := SendOptions{
+		class:                  rpc.SystemClass,
+		metrics:                &r.d.metrics,
+		dontConsiderConnHealth: true,
+	}
+	transport, err := r.d.transportFactory(opts, replicas)
+	if err != nil {
+		log.Errorf(ctx, "failed to launch probe: %s", err)
+		return
+	}
+	defer transport.Release()
+
+	// Continually probe the replica until it succeeds or the replica stops
+	// seeing traffic. We probe immediately since we only trip the breaker on
+	// probe failure.
+	var timer timeutil.Timer
+	defer timer.Stop()
+	timer.Reset(CircuitBreakerProbeInterval.Get(&r.d.settings.SV))
+
+	for {
+		// Untrip the breaker and stop probing if circuit breakers are disabled.
+		if !CircuitBreakerEnabled.Get(&r.d.settings.SV) {
+			report(nil)
+			return
+		}
+
+		// Probe the replica.
+		err := r.sendProbe(ctx, transport)
+
+		// If the context (with no timeout) failed, we're shutting down. Just exit
+		// the probe without reporting the result (which could trip the breaker).
+		if ctx.Err() != nil {
+			return
+		}
+
+		// Report the probe result.
+		report(err)
+		if err == nil {
+			// On a successful probe, record the success and stop probing.
+			r.stallSince.Store(timeutil.Now().UnixNano())
+			r.errorSince.Store(0)
+			return
+		}
+
+		// Cancel in-flight requests on failure. We do this on every failure, and
+		// also remove the cancel functions from the map (even though done() will
+		// also clean them up), in case another request makes it in after the
+		// breaker trips. There should typically never be any contention here.
+		func() {
+			r.mu.Lock()
+			defer r.mu.Unlock()
+			for ba, cancel := range r.mu.cancelFns {
+				delete(r.mu.cancelFns, ba)
+				cancel(errors.Wrapf(err, "%s is unavailable (circuit breaker tripped)", r.id()))
+				r.d.metrics.CircuitBreaker.ReplicasRequestsCancelled.Inc(1)
+			}
+		}()
+
+		select {
+		case <-timer.C:
+			timer.Read = true
+		case <-r.d.stopper.ShouldQuiesce():
+			return
+		case <-ctx.Done():
+			return
+		}
+
+		probeInterval := CircuitBreakerProbeInterval.Get(&r.d.settings.SV)
+		timer.Reset(probeInterval)
+
+		// If there haven't been any requests in the past few probe intervals,
+		// stop probing but keep the breaker tripped. A new probe will be launched
+		// on the next request.
+		//
+		// NB: we check this after waiting out the probe interval above, to avoid
+		// frequently spawning new probe goroutines, instead waiting to see if any
+		// requests come in.
+		idleThreshold := cbProbeIdleIntervals * probeInterval
+		if r.lastRequestDuration(timeutil.Now().UnixNano()) >= idleThreshold {
+			return
+		}
+	}
+}
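The loop's lifecycle — probe immediately, back off on the timer, exit once the replica has been idle for cbProbeIdleIntervals probe intervals — can be distilled to a small skeleton. This sketch uses the standard library time.Timer rather than CockroachDB's timeutil.Timer, and all names are hypothetical ("context" and "time" assumed imported):

// probeUntilHealthyOrIdle repeatedly calls probe until it succeeds, the
// context is cancelled, or the target has seen no requests for idleThreshold.
func probeUntilHealthyOrIdle(
	ctx context.Context,
	interval, idleThreshold time.Duration,
	probe func(context.Context) error,
	sinceLastRequest func() time.Duration,
) {
	timer := time.NewTimer(interval)
	defer timer.Stop()
	for {
		if err := probe(ctx); err == nil || ctx.Err() != nil {
			return // healthy again, or shutting down
		}
		select {
		case <-timer.C:
		case <-ctx.Done():
			return
		}
		timer.Reset(interval)
		// Stop probing (leaving the breaker tripped) if traffic has dried up;
		// the next request will launch a fresh probe.
		if sinceLastRequest() >= idleThreshold {
			return
		}
	}
}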

@@ -769,9 +834,10 @@ func (r *ReplicaCircuitBreaker) sendProbe(ctx context.Context, transport Transpo
	}

	// Any other local error is likely a networking/gRPC issue. This includes if
-	// either the remote node or the local node has been decommissioned. We
-	// rely on RPC circuit breakers to fail fast for these, so there's no point
-	// in us probing individual replicas. Stop probing.
+	// either the remote node or the local node has been decommissioned. We rely
+	// on RPC circuit breakers to fail fast for these, so there's no point in us
+	// probing individual replicas. Stop probing and untrip the breaker if it
+	// was tripped.
	return nil // nolint:returnerrcheck
}

@@ -801,6 +867,16 @@ func (r *ReplicaCircuitBreaker) sendProbe(ctx context.Context, transport Transpo
	return errors.Wrapf(err, "probe failed")
}

+// maybeReapProbe synchronously reaps the currently running probe, if any.
+func (r *ReplicaCircuitBreaker) maybeReapProbe() {
+	r.mu.Lock()
+	reapProbe := r.mu.reapProbe
+	r.mu.Unlock()
+	if reapProbe != nil {
+		reapProbe()
+	}
+}
+
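One subtlety in maybeReapProbe: the reap function is copied out under r.mu and invoked only after unlocking. The probe goroutine itself acquires r.mu (for example when cancelling in-flight requests in probeLoop), so waiting for it to exit while holding the mutex could deadlock. A sketch of the hazard, again against the hypothetical Breaker from the earlier sketch:

// Wrong: waits for the probe while holding mu. If the probe is currently
// blocked acquiring mu (e.g. to cancel in-flight requests), neither side
// can make progress.
func (b *Breaker) reapDeadlocks() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.mu.reapProbe != nil {
		b.mu.reapProbe() // blocks in wg.Wait() with mu held
	}
}

// Right: copy the function out under the lock, call it after unlocking.
func (b *Breaker) reapSafely() {
	b.mu.Lock()
	reap := b.mu.reapProbe
	b.mu.Unlock()
	if reap != nil {
		reap()
	}
}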
// OnTrip implements circuit.EventHandler.
func (r *ReplicaCircuitBreaker) OnTrip(b *circuit.Breaker, prev, cur error) {
if cur == nil {