diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index 178d48339b32..d7d171cd2be8 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -12,6 +12,7 @@ package kvserver_test import ( "context" + "fmt" "sync/atomic" "testing" "time" @@ -24,6 +25,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" @@ -32,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -461,6 +465,89 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) { readOneVal(t, stream2) } +func TestReplicaCircuitBreaker_ExemptRequests(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + tc := setupCircuitBreakerTest(t) + defer tc.Stopper().Stop(context.Background()) + + // Put the lease on n1 but then trip the breaker with the probe + // disabled, i.e. it will stay tripped. + require.NoError(t, tc.Write(n1)) + tc.SetProbeEnabled(n1, false) + tc.Report(n1, errors.New("boom")) + + exemptRequests := []func() roachpb.Request{ + func() roachpb.Request { return &roachpb.ExportRequest{ReturnSST: true} }, + func() roachpb.Request { + sstFile := &storage.MemFile{} + sst := storage.MakeIngestionSSTWriter(context.Background(), cluster.MakeTestingClusterSettings(), sstFile) + defer sst.Close() + require.NoError(t, sst.LogData([]byte("hello"))) + require.NoError(t, sst.Finish()) + + addReq := &roachpb.AddSSTableRequest{ + Data: sstFile.Data(), + IngestAsWrites: true, + } + return addReq + }, + func() roachpb.Request { + return &roachpb.RevertRangeRequest{TargetTime: tc.Servers[0].Clock().Now()} + }, + func() roachpb.Request { + return &roachpb.GCRequest{} + }, + func() roachpb.Request { + return &roachpb.ClearRangeRequest{} + }, + func() roachpb.Request { + return &roachpb.ProbeRequest{} + }, + } + + for _, reqFn := range exemptRequests { + req := reqFn() + t.Run(fmt.Sprintf("with-existing-lease/%s", req.Method()), func(t *testing.T) { + require.NoError(t, tc.Send(n1, req)) + }) + } + for _, reqFn := range exemptRequests { + req := reqFn() + t.Run(fmt.Sprintf("with-acquire-lease/%s", req.Method()), func(t *testing.T) { + resumeHeartbeats := tc.ExpireAllLeases(t, pauseHeartbeats) + resumeHeartbeats() // intentionally resume right now so that lease can be acquired + require.NoError(t, tc.Send(n1, req)) + }) + } + + resumeHeartbeats := tc.ExpireAllLeases(t, pauseHeartbeats) + defer resumeHeartbeats() // can't acquire leases until test ends + + for _, reqFn := range exemptRequests { + req := reqFn() + if req.Method() == roachpb.Probe { + // Probe does not require the lease, and is the most-tested of the bunch + // already. We don't have to test it again here, which would require undue + // amounts of special-casing below. + continue + } + t.Run(fmt.Sprintf("with-unavailable-lease/%s", req.Method()), func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Millisecond) + defer cancel() + const maxWait = 5 * time.Second + tBegin := timeutil.Now() + err := tc.SendCtx(ctx, n1, req) + t.Log(err) // usually: [NotLeaseHolderError] lease acquisition canceled because context canceled + require.Error(t, err) + require.Error(t, ctx.Err()) + // Make sure we didn't run into the "long" timeout inside of SendCtx but + // actually terminated as a result of our ctx cancelling. + require.Less(t, timeutil.Since(tBegin), maxWait) + }) + } +} + // Test infrastructure below. func makeBreakerToggleable(b *circuit.Breaker) (setProbeEnabled func(bool)) { @@ -629,23 +716,37 @@ func (cbt *circuitBreakerTest) ExpireAllLeases(t *testing.T, pauseHeartbeats boo } } -func (*circuitBreakerTest) sendViaRepl(repl *kvserver.Replica, req roachpb.Request) error { +func (cbt *circuitBreakerTest) Send(idx int, req roachpb.Request) error { + return cbt.SendCtx(context.Background(), idx, req) + +} + +func (cbt *circuitBreakerTest) SendCtx(ctx context.Context, idx int, req roachpb.Request) error { var ba roachpb.BatchRequest + repl := cbt.repls[idx] ba.RangeID = repl.Desc().RangeID ba.Timestamp = repl.Clock().Now() ba.Add(req) - ctx, cancel := context.WithTimeout(context.Background(), testutils.DefaultSucceedsSoonDuration) + if h := req.Header(); len(h.Key) == 0 { + h.Key = repl.Desc().StartKey.AsRawKey() + if roachpb.IsRange(req) { + h.EndKey = repl.Desc().EndKey.AsRawKey() + } + req.SetHeader(h) + } + parCtx := ctx + ctx, cancel := context.WithTimeout(ctx, testutils.DefaultSucceedsSoonDuration) + defer cancel() // Tag the breaker with the request. Once Send returns, we'll check that it's // no longer tracked by the breaker. This gives good coverage that we're not // going to leak memory. ctx = context.WithValue(ctx, req, struct{}{}) - defer cancel() _, pErr := repl.Send(ctx, ba) // If our context got canceled, return an opaque error regardless of presence or // absence of actual error. This makes sure we don't accidentally pass tests as // a result of our context cancellation. - if err := ctx.Err(); err != nil { + if err := ctx.Err(); err != nil && parCtx.Err() == nil { pErr = roachpb.NewErrorf("timed out waiting for batch response: %v", pErr) } { @@ -665,6 +766,11 @@ func (*circuitBreakerTest) sendViaRepl(repl *kvserver.Replica, req roachpb.Reque return pErr.GoError() } +func (cbt *circuitBreakerTest) WriteDS(idx int) error { + put := roachpb.NewPut(cbt.repls[idx].Desc().StartKey.AsRawKey(), roachpb.MakeValueFromString("hello")) + return cbt.sendViaDistSender(cbt.Servers[idx].DistSender(), put) +} + func (*circuitBreakerTest) sendViaDistSender(ds *kvcoord.DistSender, req roachpb.Request) error { var ba roachpb.BatchRequest ba.Add(req) @@ -696,15 +802,6 @@ func (*circuitBreakerTest) RequireIsNotLeaseholderError(t *testing.T, err error) require.True(t, ok, "%+v", err) } -func (cbt *circuitBreakerTest) Write(idx int) error { - return cbt.writeViaRepl(cbt.repls[idx].Replica) -} - -func (cbt *circuitBreakerTest) WriteDS(idx int) error { - put := roachpb.NewPut(cbt.repls[idx].Desc().StartKey.AsRawKey(), roachpb.MakeValueFromString("hello")) - return cbt.sendViaDistSender(cbt.Servers[idx].DistSender(), put) -} - // SetSlowThreshold sets the SlowReplicationThreshold for requests sent through the // test harness (i.e. via Write) to the provided duration. The zero value restores // the default. @@ -712,16 +809,14 @@ func (cbt *circuitBreakerTest) SetSlowThreshold(dur time.Duration) { cbt.slowThresh.Store(dur) } -func (cbt *circuitBreakerTest) Read(idx int) error { - return cbt.readViaRepl(cbt.repls[idx].Replica) -} - -func (cbt *circuitBreakerTest) writeViaRepl(repl *kvserver.Replica) error { +func (cbt *circuitBreakerTest) Write(idx int) error { + repl := cbt.repls[idx] put := roachpb.NewPut(repl.Desc().StartKey.AsRawKey(), roachpb.MakeValueFromString("hello")) - return cbt.sendViaRepl(repl, put) + return cbt.Send(idx, put) } -func (cbt *circuitBreakerTest) readViaRepl(repl *kvserver.Replica) error { +func (cbt *circuitBreakerTest) Read(idx int) error { + repl := cbt.repls[idx] get := roachpb.NewGet(repl.Desc().StartKey.AsRawKey(), false /* forUpdate */) - return cbt.sendViaRepl(repl, get) + return cbt.Send(idx, get) } diff --git a/pkg/kv/kvserver/replica_backpressure.go b/pkg/kv/kvserver/replica_backpressure.go index 1c98cccdd008..3ae46e823c95 100644 --- a/pkg/kv/kvserver/replica_backpressure.go +++ b/pkg/kv/kvserver/replica_backpressure.go @@ -108,6 +108,19 @@ func canBackpressureBatch(ba *roachpb.BatchRequest) bool { return false } +// bypassReplicaCircuitBreakerForBatch returns whether the provided +// BatchRequest bypasses the per-Replica circuit breaker. This is the +// case if any request in the batch is requesting to do so. +func bypassReplicaCircuitBreakerForBatch(ba *roachpb.BatchRequest) bool { + for _, ru := range ba.Requests { + req := ru.GetInner() + if roachpb.BypassesReplicaCircuitBreaker(req) { + return true + } + } + return false +} + // shouldBackpressureWrites returns whether writes to the range should be // subject to backpressure. This is based on the size of the range in // relation to the split size. The method returns true if the range is more diff --git a/pkg/kv/kvserver/replica_circuit_breaker.go b/pkg/kv/kvserver/replica_circuit_breaker.go index 795ebb581558..e64ddb75f3a9 100644 --- a/pkg/kv/kvserver/replica_circuit_breaker.go +++ b/pkg/kv/kvserver/replica_circuit_breaker.go @@ -96,8 +96,8 @@ func (br *replicaCircuitBreaker) Register( // TODO(tbg): we may want to exclude more requests from this check, or allow // requests to exclude themselves from the check (via their header). This - // latter mechanism could also replace isCircuitBreakerProbe. - if isCircuitBreakerProbe(ctx) { + // latter mechanism could also replace hasBypassCircuitBreakerMarker. + if hasBypassCircuitBreakerMarker(ctx) { // NB: brSig.C() == nil. brSig = neverTripSignaller{} } @@ -184,6 +184,10 @@ func (br *replicaCircuitBreaker) UnregisterAndAdjustError( return pErr } +func (br *replicaCircuitBreaker) HasMark(err error) bool { + return br.wrapped.HasMark(err) +} + func (br *replicaCircuitBreaker) cancelAllTrackedContexts() { br.cancels.Visit(func(ctx context.Context, cancel func()) (remove bool) { cancel() @@ -297,11 +301,11 @@ func (r replicaCircuitBreakerLogger) OnReset(br *circuit.Breaker) { type probeKey struct{} -func isCircuitBreakerProbe(ctx context.Context) bool { +func hasBypassCircuitBreakerMarker(ctx context.Context) bool { return ctx.Value(probeKey{}) != nil } -func withCircuitBreakerProbeMarker(ctx context.Context) context.Context { +func withBypassCircuitBreakerMarker(ctx context.Context) context.Context { return context.WithValue(ctx, probeKey{}, probeKey{}) } @@ -330,7 +334,10 @@ func (br *replicaCircuitBreaker) asyncProbe(report func(error), done func()) { } func sendProbe(ctx context.Context, r replicaInCircuitBreaker) error { - ctx = withCircuitBreakerProbeMarker(ctx) + // NB: we don't need to put this marker since ProbeRequest has the + // canBypassReplicaCircuitBreaker flag, but if in the future we do + // additional work in this method we may need it. + ctx = withBypassCircuitBreakerMarker(ctx) desc := r.Desc() if !desc.IsInitialized() { return nil diff --git a/pkg/kv/kvserver/replica_circuit_breaker_test.go b/pkg/kv/kvserver/replica_circuit_breaker_test.go index e76db6e507ce..c7b6029b7d70 100644 --- a/pkg/kv/kvserver/replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/replica_circuit_breaker_test.go @@ -127,7 +127,7 @@ func TestReplicaCircuitBreaker_Register(t *testing.T) { defer leaktest.AfterTest(t)() br, stopper := setupCircuitBreakerTest(t, "mutexmap-1") defer stopper.Stop(context.Background()) - ctx := withCircuitBreakerProbeMarker(context.Background()) + ctx := withBypassCircuitBreakerMarker(context.Background()) tok, sig, err := br.Register(ctx, func() {}) require.NoError(t, err) defer br.UnregisterAndAdjustError(tok, sig, nil /* pErr */) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 4d9271f2d90f..1cb46178ec4b 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -1153,6 +1153,7 @@ func (r *Replica) refreshProposalsLocked( } var maxSlowProposalDurationRequest *roachpb.BatchRequest + // TODO(tbg): don't track exempt requests for tripping the breaker? var maxSlowProposalDuration time.Duration var slowProposalCount int64 var reproposals pendingCmdSlice diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 3efb87618a38..46dab0b90d50 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -318,6 +318,20 @@ func (p *pendingLeaseRequest) requestLeaseAsync( // coalesced requests timeout/cancel. p.cancelLocked (defined below) is the // cancel function that must be called; calling just cancel is insufficient. ctx := p.repl.AnnotateCtx(context.Background()) + if hasBypassCircuitBreakerMarker(ctx) { + // If the caller bypasses the circuit breaker, allow the lease to do the + // same. Otherwise, the lease will be refused by the circuit breaker as + // well. + // + // Note that there is a tiny race: if a request is in flight, but the + // request that triggered it (i.e. parentCtx here) does *not* bypass the + // probe, and before the circuit breaker rejects the inflight lease another + // request that *does* want to bypass the probe joins the request, it too + // will receive the circuit breaker error. This is special-cased in + // `redirectOnOrAcquireLease`, where such a caller needs to retry instead of + // propagating the error. + ctx = withBypassCircuitBreakerMarker(ctx) + } const opName = "request range lease" tr := p.repl.AmbientContext.Tracer tagsOpt := tracing.WithLogTags(logtags.FromContext(parentCtx)) @@ -1112,6 +1126,11 @@ func (r *Replica) TestingAcquireLease(ctx context.Context) (kvserverpb.LeaseStat func (r *Replica) redirectOnOrAcquireLeaseForRequest( ctx context.Context, reqTS hlc.Timestamp, ) (kvserverpb.LeaseStatus, *roachpb.Error) { + if hasBypassCircuitBreakerMarker(ctx) { + defer func() { + log.Infof(ctx, "hello") + }() + } // Try fast-path. now := r.store.Clock().NowAsClockTimestamp() { @@ -1209,6 +1228,7 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest( } if llHandle == nil { // We own a valid lease. + log.Eventf(ctx, "valid lease %+v", status) return status, nil } @@ -1244,6 +1264,12 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest( // cannot be reproposed so we get this ambiguity. // We'll just loop around. return nil + case r.breaker.HasMark(goErr) && hasBypassCircuitBreakerMarker(ctx): + // If this request wanted to bypass the circuit breaker but still got a + // breaker error back, it joined a lease request started by an operation + // that did not bypass circuit breaker errors. Loop around and try again. + // See requestLeaseAsync for details. + return nil case errors.HasType(goErr, (*roachpb.LeaseRejectedError)(nil)): var tErr *roachpb.LeaseRejectedError errors.As(goErr, &tErr) diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 537a641bd3a8..8554c35a663b 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -140,6 +140,9 @@ func (r *Replica) sendWithoutRangeID( // Circuit breaker handling. ctx, cancel := context.WithCancel(ctx) + if bypassReplicaCircuitBreakerForBatch(ba) { + ctx = withBypassCircuitBreakerMarker(ctx) + } tok, brSig, err := r.breaker.Register(ctx, cancel) if err != nil { return nil, roachpb.NewError(err) diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 67d6c649e334..1feab34d8fbf 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -75,23 +75,24 @@ func (rc ReadConsistencyType) SupportsBatch(ba BatchRequest) error { type flag int const ( - isAdmin flag = 1 << iota // admin cmds don't go through raft, but run on lease holder - isRead // read-only cmds don't go through raft, but may run on lease holder - isWrite // write cmds go through raft and must be proposed on lease holder - isTxn // txn commands may be part of a transaction - isLocking // locking cmds acquire locks for their transaction - isIntentWrite // intent write cmds leave intents when they succeed - isRange // range commands may span multiple keys - isReverse // reverse commands traverse ranges in descending direction - isAlone // requests which must be alone in a batch - isPrefix // requests which, in a batch, must not be split from the following request - isUnsplittable // range command that must not be split during sending - skipsLeaseCheck // commands which skip the check that the evaluating replica has a valid lease - appliesTSCache // commands which apply the timestamp cache and closed timestamp - updatesTSCache // commands which update the timestamp cache - updatesTSCacheOnErr // commands which make read data available on errors - needsRefresh // commands which require refreshes to avoid serializable retries - canBackpressure // commands which deserve backpressure when a Range grows too large + isAdmin flag = 1 << iota // admin cmds don't go through raft, but run on lease holder + isRead // read-only cmds don't go through raft, but may run on lease holder + isWrite // write cmds go through raft and must be proposed on lease holder + isTxn // txn commands may be part of a transaction + isLocking // locking cmds acquire locks for their transaction + isIntentWrite // intent write cmds leave intents when they succeed + isRange // range commands may span multiple keys + isReverse // reverse commands traverse ranges in descending direction + isAlone // requests which must be alone in a batch + isPrefix // requests which, in a batch, must not be split from the following request + isUnsplittable // range command that must not be split during sending + skipsLeaseCheck // commands which skip the check that the evaluating replica has a valid lease + appliesTSCache // commands which apply the timestamp cache and closed timestamp + updatesTSCache // commands which update the timestamp cache + updatesTSCacheOnErr // commands which make read data available on errors + needsRefresh // commands which require refreshes to avoid serializable retries + canBackpressure // commands which deserve backpressure when a Range grows too large + bypassesReplicaCircuitBreaker // commands which bypass the replica circuit breaker, i.e. opt out of fail-fast ) // flagDependencies specifies flag dependencies, asserted by TestFlagCombinations. @@ -201,6 +202,13 @@ func CanBackpressure(args Request) bool { return (args.flags() & canBackpressure) != 0 } +// BypassesReplicaCircuitBreaker returns whether the command bypasses +// the per-Replica circuit breakers. These requests will thus hang when +// addressed to an unavailable range (instead of failing fast). +func BypassesReplicaCircuitBreaker(args Request) bool { + return (args.flags() & bypassesReplicaCircuitBreaker) != 0 +} + // Request is an interface for RPC requests. type Request interface { protoutil.Message @@ -1254,11 +1262,13 @@ func (drr *DeleteRangeRequest) flags() flag { // Note that ClearRange commands cannot be part of a transaction as // they clear all MVCC versions. -func (*ClearRangeRequest) flags() flag { return isWrite | isRange | isAlone } +func (*ClearRangeRequest) flags() flag { + return isWrite | isRange | isAlone | bypassesReplicaCircuitBreaker +} // Note that RevertRange commands cannot be part of a transaction as // they clear all MVCC versions above their target time. -func (*RevertRangeRequest) flags() flag { return isWrite | isRange } +func (*RevertRangeRequest) flags() flag { return isWrite | isRange | bypassesReplicaCircuitBreaker } func (sr *ScanRequest) flags() flag { maybeLocking := flagForLockStrength(sr.KeyLocking) @@ -1280,7 +1290,12 @@ func (*AdminMergeRequest) flags() flag { return isAdmin | isAlone } func (*AdminTransferLeaseRequest) flags() flag { return isAdmin | isAlone } func (*AdminChangeReplicasRequest) flags() flag { return isAdmin | isAlone } func (*AdminRelocateRangeRequest) flags() flag { return isAdmin | isAlone } -func (*GCRequest) flags() flag { return isWrite | isRange } + +func (*GCRequest) flags() flag { + // We let GCRequest bypass the circuit breaker because otherwise, the GC queue may + // busy loop on an unavailable range, doing lots of work but never making progress. + return isWrite | isRange | bypassesReplicaCircuitBreaker +} // HeartbeatTxn updates the timestamp cache with transaction records, // to avoid checking for them on disk when considering 1PC evaluation. @@ -1325,16 +1340,18 @@ func (*TransferLeaseRequest) flags() flag { return isWrite | isAlone | skipsLeaseCheck } func (*ProbeRequest) flags() flag { - return isWrite | isAlone | skipsLeaseCheck + return isWrite | isAlone | skipsLeaseCheck | bypassesReplicaCircuitBreaker +} +func (*RecomputeStatsRequest) flags() flag { return isWrite | isAlone } +func (*ComputeChecksumRequest) flags() flag { return isWrite } +func (*CheckConsistencyRequest) flags() flag { return isAdmin | isRange | isAlone } +func (*ExportRequest) flags() flag { + return isRead | isRange | updatesTSCache | bypassesReplicaCircuitBreaker } -func (*RecomputeStatsRequest) flags() flag { return isWrite | isAlone } -func (*ComputeChecksumRequest) flags() flag { return isWrite } -func (*CheckConsistencyRequest) flags() flag { return isAdmin | isRange | isAlone } -func (*ExportRequest) flags() flag { return isRead | isRange | updatesTSCache } func (*AdminScatterRequest) flags() flag { return isAdmin | isRange | isAlone } func (*AdminVerifyProtectedTimestampRequest) flags() flag { return isAdmin | isRange | isAlone } func (r *AddSSTableRequest) flags() flag { - flags := isWrite | isRange | isAlone | isUnsplittable | canBackpressure + flags := isWrite | isRange | isAlone | isUnsplittable | canBackpressure | bypassesReplicaCircuitBreaker if r.SSTTimestampToRequestTimestamp.IsSet() { flags |= appliesTSCache } diff --git a/pkg/util/circuit/circuitbreaker.go b/pkg/util/circuit/circuitbreaker.go index bb6d8a75c23d..6c7b093910b2 100644 --- a/pkg/util/circuit/circuitbreaker.go +++ b/pkg/util/circuit/circuitbreaker.go @@ -84,6 +84,20 @@ func (b *Breaker) Signal() interface { return b.mu.errAndCh } +// HasMark returns whether the error has an error mark that is unique to this +// breaker. In other words, the error originated at this Breaker. +// +// TODO(tbg): I think this doesn't work as advertised. Two breakers on different +// systems might produce wire-identical `b.errMark()`s. We really want a wrapping +// error which we can then retrieve & check for pointer equality with `b`. +func (b *Breaker) HasMark(err error) bool { + return errors.Is(err, b.errMark()) +} + +func (b *Breaker) errMark() error { + return (*breakerErrorMark)(b) +} + // Report reports a (non-nil) error to the breaker. This will trip the Breaker. func (b *Breaker) Report(err error) { if err == nil { @@ -92,8 +106,7 @@ func (b *Breaker) Report(err error) { return } // Give shouldTrip a chance to massage the error. - markErr := (*breakerErrorMark)(b) - if errors.Is(err, markErr) { + if b.HasMark(err) { // The input error originated from this breaker. This shouldn't // happen but since it is happening, we want to avoid creating // longer and longer error chains below. @@ -102,7 +115,7 @@ func (b *Breaker) Report(err error) { // Update the error. This may overwrite an earlier error, which is fine: // We want the breaker to reflect a recent error as this is more helpful. - storeErr := errors.Mark(errors.Mark(err, ErrBreakerOpen), markErr) + storeErr := errors.Mark(errors.Mark(err, ErrBreakerOpen), b.errMark()) // When the Breaker first trips, we populate the error and close the channel. // When the error changes, we have to replace errAndCh wholesale (that's the