From 957c6a7c2ab13227aa8bc467712a26b644c235b2 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 14 Feb 2022 14:17:51 +0100 Subject: [PATCH] kvserver: exempt certain requests from circuit breakers While running the nightly roachtest suite with circuit breakers enabled (#76146) I observed undesirable interactions between circuit breakers and bulk operations. Bulk operations tend to overload the cluster. When this happens, circuit breakers may fire "spuriously" (at the very least if liveness is lost across multiple nodes for some time), which returns hard errors to the bulk job. The job will then roll back any work it has done, which may also fail due to the circuit breaker (leaving the job in limbo). For long-running bulk jobs, it seems desirable to bypass the circuit breaker entirely. When unavailability occurs, the job will not be able to make progress, but it should be left to the operator whether to cancel it or not; after all, once the outage resolves the job may be able to resume normally. This PR adds machinery that allows request types to bypass circuit breakers. Concretely, any batch that contains either of the following request types: - Export - AddSSTable - RevertRange - ClearRange - GC - Probe Touches #33007. Release note: None --- .../client_replica_circuit_breaker_test.go | 137 +++++++++++++++--- pkg/kv/kvserver/replica_backpressure.go | 13 ++ pkg/kv/kvserver/replica_circuit_breaker.go | 17 ++- .../kvserver/replica_circuit_breaker_test.go | 2 +- pkg/kv/kvserver/replica_raft.go | 1 + pkg/kv/kvserver/replica_range_lease.go | 26 ++++ pkg/kv/kvserver/replica_send.go | 3 + pkg/roachpb/api.go | 69 +++++---- pkg/util/circuit/circuitbreaker.go | 19 ++- 9 files changed, 231 insertions(+), 56 deletions(-) diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index 178d48339b32..d7d171cd2be8 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -12,6 +12,7 @@ package kvserver_test import ( "context" + "fmt" "sync/atomic" "testing" "time" @@ -24,6 +25,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" @@ -32,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -461,6 +465,89 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) { readOneVal(t, stream2) } +func TestReplicaCircuitBreaker_ExemptRequests(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + tc := setupCircuitBreakerTest(t) + defer tc.Stopper().Stop(context.Background()) + + // Put the lease on n1 but then trip the breaker with the probe + // disabled, i.e. it will stay tripped. + require.NoError(t, tc.Write(n1)) + tc.SetProbeEnabled(n1, false) + tc.Report(n1, errors.New("boom")) + + exemptRequests := []func() roachpb.Request{ + func() roachpb.Request { return &roachpb.ExportRequest{ReturnSST: true} }, + func() roachpb.Request { + sstFile := &storage.MemFile{} + sst := storage.MakeIngestionSSTWriter(context.Background(), cluster.MakeTestingClusterSettings(), sstFile) + defer sst.Close() + require.NoError(t, sst.LogData([]byte("hello"))) + require.NoError(t, sst.Finish()) + + addReq := &roachpb.AddSSTableRequest{ + Data: sstFile.Data(), + IngestAsWrites: true, + } + return addReq + }, + func() roachpb.Request { + return &roachpb.RevertRangeRequest{TargetTime: tc.Servers[0].Clock().Now()} + }, + func() roachpb.Request { + return &roachpb.GCRequest{} + }, + func() roachpb.Request { + return &roachpb.ClearRangeRequest{} + }, + func() roachpb.Request { + return &roachpb.ProbeRequest{} + }, + } + + for _, reqFn := range exemptRequests { + req := reqFn() + t.Run(fmt.Sprintf("with-existing-lease/%s", req.Method()), func(t *testing.T) { + require.NoError(t, tc.Send(n1, req)) + }) + } + for _, reqFn := range exemptRequests { + req := reqFn() + t.Run(fmt.Sprintf("with-acquire-lease/%s", req.Method()), func(t *testing.T) { + resumeHeartbeats := tc.ExpireAllLeases(t, pauseHeartbeats) + resumeHeartbeats() // intentionally resume right now so that lease can be acquired + require.NoError(t, tc.Send(n1, req)) + }) + } + + resumeHeartbeats := tc.ExpireAllLeases(t, pauseHeartbeats) + defer resumeHeartbeats() // can't acquire leases until test ends + + for _, reqFn := range exemptRequests { + req := reqFn() + if req.Method() == roachpb.Probe { + // Probe does not require the lease, and is the most-tested of the bunch + // already. We don't have to test it again here, which would require undue + // amounts of special-casing below. + continue + } + t.Run(fmt.Sprintf("with-unavailable-lease/%s", req.Method()), func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Millisecond) + defer cancel() + const maxWait = 5 * time.Second + tBegin := timeutil.Now() + err := tc.SendCtx(ctx, n1, req) + t.Log(err) // usually: [NotLeaseHolderError] lease acquisition canceled because context canceled + require.Error(t, err) + require.Error(t, ctx.Err()) + // Make sure we didn't run into the "long" timeout inside of SendCtx but + // actually terminated as a result of our ctx cancelling. + require.Less(t, timeutil.Since(tBegin), maxWait) + }) + } +} + // Test infrastructure below. func makeBreakerToggleable(b *circuit.Breaker) (setProbeEnabled func(bool)) { @@ -629,23 +716,37 @@ func (cbt *circuitBreakerTest) ExpireAllLeases(t *testing.T, pauseHeartbeats boo } } -func (*circuitBreakerTest) sendViaRepl(repl *kvserver.Replica, req roachpb.Request) error { +func (cbt *circuitBreakerTest) Send(idx int, req roachpb.Request) error { + return cbt.SendCtx(context.Background(), idx, req) + +} + +func (cbt *circuitBreakerTest) SendCtx(ctx context.Context, idx int, req roachpb.Request) error { var ba roachpb.BatchRequest + repl := cbt.repls[idx] ba.RangeID = repl.Desc().RangeID ba.Timestamp = repl.Clock().Now() ba.Add(req) - ctx, cancel := context.WithTimeout(context.Background(), testutils.DefaultSucceedsSoonDuration) + if h := req.Header(); len(h.Key) == 0 { + h.Key = repl.Desc().StartKey.AsRawKey() + if roachpb.IsRange(req) { + h.EndKey = repl.Desc().EndKey.AsRawKey() + } + req.SetHeader(h) + } + parCtx := ctx + ctx, cancel := context.WithTimeout(ctx, testutils.DefaultSucceedsSoonDuration) + defer cancel() // Tag the breaker with the request. Once Send returns, we'll check that it's // no longer tracked by the breaker. This gives good coverage that we're not // going to leak memory. ctx = context.WithValue(ctx, req, struct{}{}) - defer cancel() _, pErr := repl.Send(ctx, ba) // If our context got canceled, return an opaque error regardless of presence or // absence of actual error. This makes sure we don't accidentally pass tests as // a result of our context cancellation. - if err := ctx.Err(); err != nil { + if err := ctx.Err(); err != nil && parCtx.Err() == nil { pErr = roachpb.NewErrorf("timed out waiting for batch response: %v", pErr) } { @@ -665,6 +766,11 @@ func (*circuitBreakerTest) sendViaRepl(repl *kvserver.Replica, req roachpb.Reque return pErr.GoError() } +func (cbt *circuitBreakerTest) WriteDS(idx int) error { + put := roachpb.NewPut(cbt.repls[idx].Desc().StartKey.AsRawKey(), roachpb.MakeValueFromString("hello")) + return cbt.sendViaDistSender(cbt.Servers[idx].DistSender(), put) +} + func (*circuitBreakerTest) sendViaDistSender(ds *kvcoord.DistSender, req roachpb.Request) error { var ba roachpb.BatchRequest ba.Add(req) @@ -696,15 +802,6 @@ func (*circuitBreakerTest) RequireIsNotLeaseholderError(t *testing.T, err error) require.True(t, ok, "%+v", err) } -func (cbt *circuitBreakerTest) Write(idx int) error { - return cbt.writeViaRepl(cbt.repls[idx].Replica) -} - -func (cbt *circuitBreakerTest) WriteDS(idx int) error { - put := roachpb.NewPut(cbt.repls[idx].Desc().StartKey.AsRawKey(), roachpb.MakeValueFromString("hello")) - return cbt.sendViaDistSender(cbt.Servers[idx].DistSender(), put) -} - // SetSlowThreshold sets the SlowReplicationThreshold for requests sent through the // test harness (i.e. via Write) to the provided duration. The zero value restores // the default. @@ -712,16 +809,14 @@ func (cbt *circuitBreakerTest) SetSlowThreshold(dur time.Duration) { cbt.slowThresh.Store(dur) } -func (cbt *circuitBreakerTest) Read(idx int) error { - return cbt.readViaRepl(cbt.repls[idx].Replica) -} - -func (cbt *circuitBreakerTest) writeViaRepl(repl *kvserver.Replica) error { +func (cbt *circuitBreakerTest) Write(idx int) error { + repl := cbt.repls[idx] put := roachpb.NewPut(repl.Desc().StartKey.AsRawKey(), roachpb.MakeValueFromString("hello")) - return cbt.sendViaRepl(repl, put) + return cbt.Send(idx, put) } -func (cbt *circuitBreakerTest) readViaRepl(repl *kvserver.Replica) error { +func (cbt *circuitBreakerTest) Read(idx int) error { + repl := cbt.repls[idx] get := roachpb.NewGet(repl.Desc().StartKey.AsRawKey(), false /* forUpdate */) - return cbt.sendViaRepl(repl, get) + return cbt.Send(idx, get) } diff --git a/pkg/kv/kvserver/replica_backpressure.go b/pkg/kv/kvserver/replica_backpressure.go index 1c98cccdd008..3ae46e823c95 100644 --- a/pkg/kv/kvserver/replica_backpressure.go +++ b/pkg/kv/kvserver/replica_backpressure.go @@ -108,6 +108,19 @@ func canBackpressureBatch(ba *roachpb.BatchRequest) bool { return false } +// bypassReplicaCircuitBreakerForBatch returns whether the provided +// BatchRequest bypasses the per-Replica circuit breaker. This is the +// case if any request in the batch is requesting to do so. +func bypassReplicaCircuitBreakerForBatch(ba *roachpb.BatchRequest) bool { + for _, ru := range ba.Requests { + req := ru.GetInner() + if roachpb.BypassesReplicaCircuitBreaker(req) { + return true + } + } + return false +} + // shouldBackpressureWrites returns whether writes to the range should be // subject to backpressure. This is based on the size of the range in // relation to the split size. The method returns true if the range is more diff --git a/pkg/kv/kvserver/replica_circuit_breaker.go b/pkg/kv/kvserver/replica_circuit_breaker.go index 795ebb581558..e64ddb75f3a9 100644 --- a/pkg/kv/kvserver/replica_circuit_breaker.go +++ b/pkg/kv/kvserver/replica_circuit_breaker.go @@ -96,8 +96,8 @@ func (br *replicaCircuitBreaker) Register( // TODO(tbg): we may want to exclude more requests from this check, or allow // requests to exclude themselves from the check (via their header). This - // latter mechanism could also replace isCircuitBreakerProbe. - if isCircuitBreakerProbe(ctx) { + // latter mechanism could also replace hasBypassCircuitBreakerMarker. + if hasBypassCircuitBreakerMarker(ctx) { // NB: brSig.C() == nil. brSig = neverTripSignaller{} } @@ -184,6 +184,10 @@ func (br *replicaCircuitBreaker) UnregisterAndAdjustError( return pErr } +func (br *replicaCircuitBreaker) HasMark(err error) bool { + return br.wrapped.HasMark(err) +} + func (br *replicaCircuitBreaker) cancelAllTrackedContexts() { br.cancels.Visit(func(ctx context.Context, cancel func()) (remove bool) { cancel() @@ -297,11 +301,11 @@ func (r replicaCircuitBreakerLogger) OnReset(br *circuit.Breaker) { type probeKey struct{} -func isCircuitBreakerProbe(ctx context.Context) bool { +func hasBypassCircuitBreakerMarker(ctx context.Context) bool { return ctx.Value(probeKey{}) != nil } -func withCircuitBreakerProbeMarker(ctx context.Context) context.Context { +func withBypassCircuitBreakerMarker(ctx context.Context) context.Context { return context.WithValue(ctx, probeKey{}, probeKey{}) } @@ -330,7 +334,10 @@ func (br *replicaCircuitBreaker) asyncProbe(report func(error), done func()) { } func sendProbe(ctx context.Context, r replicaInCircuitBreaker) error { - ctx = withCircuitBreakerProbeMarker(ctx) + // NB: we don't need to put this marker since ProbeRequest has the + // canBypassReplicaCircuitBreaker flag, but if in the future we do + // additional work in this method we may need it. + ctx = withBypassCircuitBreakerMarker(ctx) desc := r.Desc() if !desc.IsInitialized() { return nil diff --git a/pkg/kv/kvserver/replica_circuit_breaker_test.go b/pkg/kv/kvserver/replica_circuit_breaker_test.go index e76db6e507ce..c7b6029b7d70 100644 --- a/pkg/kv/kvserver/replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/replica_circuit_breaker_test.go @@ -127,7 +127,7 @@ func TestReplicaCircuitBreaker_Register(t *testing.T) { defer leaktest.AfterTest(t)() br, stopper := setupCircuitBreakerTest(t, "mutexmap-1") defer stopper.Stop(context.Background()) - ctx := withCircuitBreakerProbeMarker(context.Background()) + ctx := withBypassCircuitBreakerMarker(context.Background()) tok, sig, err := br.Register(ctx, func() {}) require.NoError(t, err) defer br.UnregisterAndAdjustError(tok, sig, nil /* pErr */) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 4d9271f2d90f..1cb46178ec4b 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -1153,6 +1153,7 @@ func (r *Replica) refreshProposalsLocked( } var maxSlowProposalDurationRequest *roachpb.BatchRequest + // TODO(tbg): don't track exempt requests for tripping the breaker? var maxSlowProposalDuration time.Duration var slowProposalCount int64 var reproposals pendingCmdSlice diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 3efb87618a38..46dab0b90d50 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -318,6 +318,20 @@ func (p *pendingLeaseRequest) requestLeaseAsync( // coalesced requests timeout/cancel. p.cancelLocked (defined below) is the // cancel function that must be called; calling just cancel is insufficient. ctx := p.repl.AnnotateCtx(context.Background()) + if hasBypassCircuitBreakerMarker(ctx) { + // If the caller bypasses the circuit breaker, allow the lease to do the + // same. Otherwise, the lease will be refused by the circuit breaker as + // well. + // + // Note that there is a tiny race: if a request is in flight, but the + // request that triggered it (i.e. parentCtx here) does *not* bypass the + // probe, and before the circuit breaker rejects the inflight lease another + // request that *does* want to bypass the probe joins the request, it too + // will receive the circuit breaker error. This is special-cased in + // `redirectOnOrAcquireLease`, where such a caller needs to retry instead of + // propagating the error. + ctx = withBypassCircuitBreakerMarker(ctx) + } const opName = "request range lease" tr := p.repl.AmbientContext.Tracer tagsOpt := tracing.WithLogTags(logtags.FromContext(parentCtx)) @@ -1112,6 +1126,11 @@ func (r *Replica) TestingAcquireLease(ctx context.Context) (kvserverpb.LeaseStat func (r *Replica) redirectOnOrAcquireLeaseForRequest( ctx context.Context, reqTS hlc.Timestamp, ) (kvserverpb.LeaseStatus, *roachpb.Error) { + if hasBypassCircuitBreakerMarker(ctx) { + defer func() { + log.Infof(ctx, "hello") + }() + } // Try fast-path. now := r.store.Clock().NowAsClockTimestamp() { @@ -1209,6 +1228,7 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest( } if llHandle == nil { // We own a valid lease. + log.Eventf(ctx, "valid lease %+v", status) return status, nil } @@ -1244,6 +1264,12 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest( // cannot be reproposed so we get this ambiguity. // We'll just loop around. return nil + case r.breaker.HasMark(goErr) && hasBypassCircuitBreakerMarker(ctx): + // If this request wanted to bypass the circuit breaker but still got a + // breaker error back, it joined a lease request started by an operation + // that did not bypass circuit breaker errors. Loop around and try again. + // See requestLeaseAsync for details. + return nil case errors.HasType(goErr, (*roachpb.LeaseRejectedError)(nil)): var tErr *roachpb.LeaseRejectedError errors.As(goErr, &tErr) diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 537a641bd3a8..8554c35a663b 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -140,6 +140,9 @@ func (r *Replica) sendWithoutRangeID( // Circuit breaker handling. ctx, cancel := context.WithCancel(ctx) + if bypassReplicaCircuitBreakerForBatch(ba) { + ctx = withBypassCircuitBreakerMarker(ctx) + } tok, brSig, err := r.breaker.Register(ctx, cancel) if err != nil { return nil, roachpb.NewError(err) diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 67d6c649e334..1feab34d8fbf 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -75,23 +75,24 @@ func (rc ReadConsistencyType) SupportsBatch(ba BatchRequest) error { type flag int const ( - isAdmin flag = 1 << iota // admin cmds don't go through raft, but run on lease holder - isRead // read-only cmds don't go through raft, but may run on lease holder - isWrite // write cmds go through raft and must be proposed on lease holder - isTxn // txn commands may be part of a transaction - isLocking // locking cmds acquire locks for their transaction - isIntentWrite // intent write cmds leave intents when they succeed - isRange // range commands may span multiple keys - isReverse // reverse commands traverse ranges in descending direction - isAlone // requests which must be alone in a batch - isPrefix // requests which, in a batch, must not be split from the following request - isUnsplittable // range command that must not be split during sending - skipsLeaseCheck // commands which skip the check that the evaluating replica has a valid lease - appliesTSCache // commands which apply the timestamp cache and closed timestamp - updatesTSCache // commands which update the timestamp cache - updatesTSCacheOnErr // commands which make read data available on errors - needsRefresh // commands which require refreshes to avoid serializable retries - canBackpressure // commands which deserve backpressure when a Range grows too large + isAdmin flag = 1 << iota // admin cmds don't go through raft, but run on lease holder + isRead // read-only cmds don't go through raft, but may run on lease holder + isWrite // write cmds go through raft and must be proposed on lease holder + isTxn // txn commands may be part of a transaction + isLocking // locking cmds acquire locks for their transaction + isIntentWrite // intent write cmds leave intents when they succeed + isRange // range commands may span multiple keys + isReverse // reverse commands traverse ranges in descending direction + isAlone // requests which must be alone in a batch + isPrefix // requests which, in a batch, must not be split from the following request + isUnsplittable // range command that must not be split during sending + skipsLeaseCheck // commands which skip the check that the evaluating replica has a valid lease + appliesTSCache // commands which apply the timestamp cache and closed timestamp + updatesTSCache // commands which update the timestamp cache + updatesTSCacheOnErr // commands which make read data available on errors + needsRefresh // commands which require refreshes to avoid serializable retries + canBackpressure // commands which deserve backpressure when a Range grows too large + bypassesReplicaCircuitBreaker // commands which bypass the replica circuit breaker, i.e. opt out of fail-fast ) // flagDependencies specifies flag dependencies, asserted by TestFlagCombinations. @@ -201,6 +202,13 @@ func CanBackpressure(args Request) bool { return (args.flags() & canBackpressure) != 0 } +// BypassesReplicaCircuitBreaker returns whether the command bypasses +// the per-Replica circuit breakers. These requests will thus hang when +// addressed to an unavailable range (instead of failing fast). +func BypassesReplicaCircuitBreaker(args Request) bool { + return (args.flags() & bypassesReplicaCircuitBreaker) != 0 +} + // Request is an interface for RPC requests. type Request interface { protoutil.Message @@ -1254,11 +1262,13 @@ func (drr *DeleteRangeRequest) flags() flag { // Note that ClearRange commands cannot be part of a transaction as // they clear all MVCC versions. -func (*ClearRangeRequest) flags() flag { return isWrite | isRange | isAlone } +func (*ClearRangeRequest) flags() flag { + return isWrite | isRange | isAlone | bypassesReplicaCircuitBreaker +} // Note that RevertRange commands cannot be part of a transaction as // they clear all MVCC versions above their target time. -func (*RevertRangeRequest) flags() flag { return isWrite | isRange } +func (*RevertRangeRequest) flags() flag { return isWrite | isRange | bypassesReplicaCircuitBreaker } func (sr *ScanRequest) flags() flag { maybeLocking := flagForLockStrength(sr.KeyLocking) @@ -1280,7 +1290,12 @@ func (*AdminMergeRequest) flags() flag { return isAdmin | isAlone } func (*AdminTransferLeaseRequest) flags() flag { return isAdmin | isAlone } func (*AdminChangeReplicasRequest) flags() flag { return isAdmin | isAlone } func (*AdminRelocateRangeRequest) flags() flag { return isAdmin | isAlone } -func (*GCRequest) flags() flag { return isWrite | isRange } + +func (*GCRequest) flags() flag { + // We let GCRequest bypass the circuit breaker because otherwise, the GC queue may + // busy loop on an unavailable range, doing lots of work but never making progress. + return isWrite | isRange | bypassesReplicaCircuitBreaker +} // HeartbeatTxn updates the timestamp cache with transaction records, // to avoid checking for them on disk when considering 1PC evaluation. @@ -1325,16 +1340,18 @@ func (*TransferLeaseRequest) flags() flag { return isWrite | isAlone | skipsLeaseCheck } func (*ProbeRequest) flags() flag { - return isWrite | isAlone | skipsLeaseCheck + return isWrite | isAlone | skipsLeaseCheck | bypassesReplicaCircuitBreaker +} +func (*RecomputeStatsRequest) flags() flag { return isWrite | isAlone } +func (*ComputeChecksumRequest) flags() flag { return isWrite } +func (*CheckConsistencyRequest) flags() flag { return isAdmin | isRange | isAlone } +func (*ExportRequest) flags() flag { + return isRead | isRange | updatesTSCache | bypassesReplicaCircuitBreaker } -func (*RecomputeStatsRequest) flags() flag { return isWrite | isAlone } -func (*ComputeChecksumRequest) flags() flag { return isWrite } -func (*CheckConsistencyRequest) flags() flag { return isAdmin | isRange | isAlone } -func (*ExportRequest) flags() flag { return isRead | isRange | updatesTSCache } func (*AdminScatterRequest) flags() flag { return isAdmin | isRange | isAlone } func (*AdminVerifyProtectedTimestampRequest) flags() flag { return isAdmin | isRange | isAlone } func (r *AddSSTableRequest) flags() flag { - flags := isWrite | isRange | isAlone | isUnsplittable | canBackpressure + flags := isWrite | isRange | isAlone | isUnsplittable | canBackpressure | bypassesReplicaCircuitBreaker if r.SSTTimestampToRequestTimestamp.IsSet() { flags |= appliesTSCache } diff --git a/pkg/util/circuit/circuitbreaker.go b/pkg/util/circuit/circuitbreaker.go index bb6d8a75c23d..6c7b093910b2 100644 --- a/pkg/util/circuit/circuitbreaker.go +++ b/pkg/util/circuit/circuitbreaker.go @@ -84,6 +84,20 @@ func (b *Breaker) Signal() interface { return b.mu.errAndCh } +// HasMark returns whether the error has an error mark that is unique to this +// breaker. In other words, the error originated at this Breaker. +// +// TODO(tbg): I think this doesn't work as advertised. Two breakers on different +// systems might produce wire-identical `b.errMark()`s. We really want a wrapping +// error which we can then retrieve & check for pointer equality with `b`. +func (b *Breaker) HasMark(err error) bool { + return errors.Is(err, b.errMark()) +} + +func (b *Breaker) errMark() error { + return (*breakerErrorMark)(b) +} + // Report reports a (non-nil) error to the breaker. This will trip the Breaker. func (b *Breaker) Report(err error) { if err == nil { @@ -92,8 +106,7 @@ func (b *Breaker) Report(err error) { return } // Give shouldTrip a chance to massage the error. - markErr := (*breakerErrorMark)(b) - if errors.Is(err, markErr) { + if b.HasMark(err) { // The input error originated from this breaker. This shouldn't // happen but since it is happening, we want to avoid creating // longer and longer error chains below. @@ -102,7 +115,7 @@ func (b *Breaker) Report(err error) { // Update the error. This may overwrite an earlier error, which is fine: // We want the breaker to reflect a recent error as this is more helpful. - storeErr := errors.Mark(errors.Mark(err, ErrBreakerOpen), markErr) + storeErr := errors.Mark(errors.Mark(err, ErrBreakerOpen), b.errMark()) // When the Breaker first trips, we populate the error and close the channel. // When the error changes, we have to replace errAndCh wholesale (that's the