Skip to content

Commit

Permalink
Merge #76673
Browse files Browse the repository at this point in the history
76673: kvserver: exempt certain requests from circuit breakers r=erikgrinaker a=tbg

While running the nightly roachtest suite with circuit breakers enabled
(#76146) I observed undesirable interactions between circuit breakers
and bulk operations.

Bulk operations tend to overload the cluster. When this happens, circuit
breakers may fire "spuriously" (at the very least if liveness is lost
across multiple nodes for some time), which returns hard errors to the
bulk job. The job will then roll back any work it has done, which may
also fail due to the circuit breaker (leaving the job in limbo).

For long-running bulk jobs, it seems desirable to bypass the circuit
breaker entirely. When unavailability occurs, the job will not be able
to make progress, but it should be left to the operator whether to
cancel it or not; after all, once the outage resolves the job may
be able to resume normally.

This PR adds machinery that allows request types to bypass circuit
breakers. Concretely, any batch that contains at least one of the following
request types bypasses the circuit breaker:

- Export
- AddSSTable
- RevertRange
- ClearRange
- GC
- Probe

Touches #33007.

Release note: None

Co-authored-by: Tobias Grieger <[email protected]>
  • Loading branch information
craig[bot] and tbg committed Feb 21, 2022
2 parents cc1e630 + 957c6a7 commit 5b3067f
Show file tree
Hide file tree
Showing 9 changed files with 231 additions and 56 deletions.
137 changes: 116 additions & 21 deletions pkg/kv/kvserver/client_replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvserver_test

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"
Expand All @@ -24,6 +25,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
Expand All @@ -32,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -461,6 +465,89 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) {
readOneVal(t, stream2)
}

// TestReplicaCircuitBreaker_ExemptRequests checks that Export, AddSSTable,
// RevertRange, GC, ClearRange and Probe requests are not rejected by a tripped
// replica circuit breaker. Three situations are exercised in order: the lease
// is already held, the lease has to be acquired first, and the lease cannot be
// acquired at all (in which case the request must end with the caller's
// context rather than with the harness timeout).
func TestReplicaCircuitBreaker_ExemptRequests(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	tc := setupCircuitBreakerTest(t)
	defer tc.Stopper().Stop(context.Background())

	// A write places the lease on n1. The breaker is then tripped while its
	// probe is disabled, so it remains tripped for the rest of the test.
	require.NoError(t, tc.Write(n1))
	tc.SetProbeEnabled(n1, false)
	tc.Report(n1, errors.New("boom"))

	// newAddSSTable builds an AddSSTable request around a small in-memory SST
	// that is ingested as regular writes.
	newAddSSTable := func() roachpb.Request {
		memFile := &storage.MemFile{}
		w := storage.MakeIngestionSSTWriter(
			context.Background(), cluster.MakeTestingClusterSettings(), memFile,
		)
		defer w.Close()
		require.NoError(t, w.LogData([]byte("hello")))
		require.NoError(t, w.Finish())
		return &roachpb.AddSSTableRequest{
			Data:           memFile.Data(),
			IngestAsWrites: true,
		}
	}

	// Each builder returns a fresh request, so every subtest gets its own
	// instance.
	builders := []func() roachpb.Request{
		func() roachpb.Request { return &roachpb.ExportRequest{ReturnSST: true} },
		newAddSSTable,
		func() roachpb.Request {
			return &roachpb.RevertRangeRequest{TargetTime: tc.Servers[0].Clock().Now()}
		},
		func() roachpb.Request { return &roachpb.GCRequest{} },
		func() roachpb.Request { return &roachpb.ClearRangeRequest{} },
		func() roachpb.Request { return &roachpb.ProbeRequest{} },
	}

	// Case 1: n1 holds a valid lease; the tripped breaker must be bypassed.
	for _, build := range builders {
		req := build()
		name := fmt.Sprintf("with-existing-lease/%s", req.Method())
		t.Run(name, func(t *testing.T) {
			require.NoError(t, tc.Send(n1, req))
		})
	}

	// Case 2: leases are expired, but heartbeats resume right away so that the
	// request can acquire a new lease on its way through.
	for _, build := range builders {
		req := build()
		name := fmt.Sprintf("with-acquire-lease/%s", req.Method())
		t.Run(name, func(t *testing.T) {
			resume := tc.ExpireAllLeases(t, pauseHeartbeats)
			resume() // intentionally resumed immediately so the lease can be acquired
			require.NoError(t, tc.Send(n1, req))
		})
	}

	// Case 3: leases are expired and heartbeats stay paused until the test
	// ends, so no lease can be acquired.
	resumeHeartbeats := tc.ExpireAllLeases(t, pauseHeartbeats)
	defer resumeHeartbeats()

	const maxWait = 5 * time.Second
	for _, build := range builders {
		req := build()
		method := req.Method()
		if method == roachpb.Probe {
			// Probe needs no lease and is already the best-covered request of
			// the set; including it here would require extra special-casing.
			continue
		}
		t.Run(fmt.Sprintf("with-unavailable-lease/%s", method), func(t *testing.T) {
			ctx, cancel := context.WithTimeout(context.Background(), 3*time.Millisecond)
			defer cancel()
			start := timeutil.Now()
			err := tc.SendCtx(ctx, n1, req)
			t.Log(err) // usually: [NotLeaseHolderError] lease acquisition canceled because context canceled
			require.Error(t, err)
			require.Error(t, ctx.Err())
			// The request must have ended because our short-lived ctx expired,
			// not because the much longer timeout inside SendCtx fired.
			require.Less(t, timeutil.Since(start), maxWait)
		})
	}
}

// Test infrastructure below.

func makeBreakerToggleable(b *circuit.Breaker) (setProbeEnabled func(bool)) {
Expand Down Expand Up @@ -629,23 +716,37 @@ func (cbt *circuitBreakerTest) ExpireAllLeases(t *testing.T, pauseHeartbeats boo
}
}

func (*circuitBreakerTest) sendViaRepl(repl *kvserver.Replica, req roachpb.Request) error {
// Send issues req against the replica at index idx using a background
// context. It is a convenience wrapper around SendCtx.
func (cbt *circuitBreakerTest) Send(idx int, req roachpb.Request) error {
	ctx := context.Background()
	return cbt.SendCtx(ctx, idx, req)
}

func (cbt *circuitBreakerTest) SendCtx(ctx context.Context, idx int, req roachpb.Request) error {
var ba roachpb.BatchRequest
repl := cbt.repls[idx]
ba.RangeID = repl.Desc().RangeID
ba.Timestamp = repl.Clock().Now()
ba.Add(req)
ctx, cancel := context.WithTimeout(context.Background(), testutils.DefaultSucceedsSoonDuration)
if h := req.Header(); len(h.Key) == 0 {
h.Key = repl.Desc().StartKey.AsRawKey()
if roachpb.IsRange(req) {
h.EndKey = repl.Desc().EndKey.AsRawKey()
}
req.SetHeader(h)
}
parCtx := ctx
ctx, cancel := context.WithTimeout(ctx, testutils.DefaultSucceedsSoonDuration)
defer cancel()
// Tag the breaker with the request. Once Send returns, we'll check that it's
// no longer tracked by the breaker. This gives good coverage that we're not
// going to leak memory.
ctx = context.WithValue(ctx, req, struct{}{})

defer cancel()
_, pErr := repl.Send(ctx, ba)
// If our context got canceled, return an opaque error regardless of presence or
// absence of actual error. This makes sure we don't accidentally pass tests as
// a result of our context cancellation.
if err := ctx.Err(); err != nil {
if err := ctx.Err(); err != nil && parCtx.Err() == nil {
pErr = roachpb.NewErrorf("timed out waiting for batch response: %v", pErr)
}
{
Expand All @@ -665,6 +766,11 @@ func (*circuitBreakerTest) sendViaRepl(repl *kvserver.Replica, req roachpb.Reque
return pErr.GoError()
}

// WriteDS puts the value "hello" at the start key of the range of the replica
// at index idx, sending the Put through the DistSender of the server at the
// same index.
func (cbt *circuitBreakerTest) WriteDS(idx int) error {
	startKey := cbt.repls[idx].Desc().StartKey.AsRawKey()
	put := roachpb.NewPut(startKey, roachpb.MakeValueFromString("hello"))
	ds := cbt.Servers[idx].DistSender()
	return cbt.sendViaDistSender(ds, put)
}

func (*circuitBreakerTest) sendViaDistSender(ds *kvcoord.DistSender, req roachpb.Request) error {
var ba roachpb.BatchRequest
ba.Add(req)
Expand Down Expand Up @@ -696,32 +802,21 @@ func (*circuitBreakerTest) RequireIsNotLeaseholderError(t *testing.T, err error)
require.True(t, ok, "%+v", err)
}

// Write performs a write against the replica at index idx by handing that
// replica to writeViaRepl.
func (cbt *circuitBreakerTest) Write(idx int) error {
	repl := cbt.repls[idx].Replica
	return cbt.writeViaRepl(repl)
}

// WriteDS puts the value "hello" at the start key of the range of the replica
// at index idx, sending the Put through the DistSender of the server at the
// same index.
func (cbt *circuitBreakerTest) WriteDS(idx int) error {
	key := cbt.repls[idx].Desc().StartKey.AsRawKey()
	req := roachpb.NewPut(key, roachpb.MakeValueFromString("hello"))
	sender := cbt.Servers[idx].DistSender()
	return cbt.sendViaDistSender(sender, req)
}

// SetSlowThreshold sets the SlowReplicationThreshold that applies to requests
// sent through the test harness (for example via Write) to dur. Passing the
// zero value restores the default threshold.
func (cbt *circuitBreakerTest) SetSlowThreshold(dur time.Duration) {
	// The value is stored in slowThresh; how and when it is read back is not
	// visible from this method.
	cbt.slowThresh.Store(dur)
}

// Read performs a read against the replica at index idx by handing that
// replica to readViaRepl.
func (cbt *circuitBreakerTest) Read(idx int) error {
	repl := cbt.repls[idx].Replica
	return cbt.readViaRepl(repl)
}

func (cbt *circuitBreakerTest) writeViaRepl(repl *kvserver.Replica) error {
func (cbt *circuitBreakerTest) Write(idx int) error {
repl := cbt.repls[idx]
put := roachpb.NewPut(repl.Desc().StartKey.AsRawKey(), roachpb.MakeValueFromString("hello"))
return cbt.sendViaRepl(repl, put)
return cbt.Send(idx, put)
}

func (cbt *circuitBreakerTest) readViaRepl(repl *kvserver.Replica) error {
func (cbt *circuitBreakerTest) Read(idx int) error {
repl := cbt.repls[idx]
get := roachpb.NewGet(repl.Desc().StartKey.AsRawKey(), false /* forUpdate */)
return cbt.sendViaRepl(repl, get)
return cbt.Send(idx, get)
}
13 changes: 13 additions & 0 deletions pkg/kv/kvserver/replica_backpressure.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,19 @@ func canBackpressureBatch(ba *roachpb.BatchRequest) bool {
return false
}

// bypassReplicaCircuitBreakerForBatch reports whether ba is exempt from the
// per-Replica circuit breaker. A batch is exempt as soon as one of its
// requests asks to bypass the breaker; a batch without requests is never
// exempt.
func bypassReplicaCircuitBreakerForBatch(ba *roachpb.BatchRequest) bool {
	for i := range ba.Requests {
		if inner := ba.Requests[i].GetInner(); roachpb.BypassesReplicaCircuitBreaker(inner) {
			// One exempt request is enough to exempt the whole batch.
			return true
		}
	}
	return false
}

// shouldBackpressureWrites returns whether writes to the range should be
// subject to backpressure. This is based on the size of the range in
// relation to the split size. The method returns true if the range is more
Expand Down
17 changes: 12 additions & 5 deletions pkg/kv/kvserver/replica_circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ func (br *replicaCircuitBreaker) Register(

// TODO(tbg): we may want to exclude more requests from this check, or allow
// requests to exclude themselves from the check (via their header). This
// latter mechanism could also replace isCircuitBreakerProbe.
if isCircuitBreakerProbe(ctx) {
// latter mechanism could also replace hasBypassCircuitBreakerMarker.
if hasBypassCircuitBreakerMarker(ctx) {
// NB: brSig.C() == nil.
brSig = neverTripSignaller{}
}
Expand Down Expand Up @@ -184,6 +184,10 @@ func (br *replicaCircuitBreaker) UnregisterAndAdjustError(
return pErr
}

// HasMark returns the result of calling HasMark on the wrapped circuit
// breaker with err. The lease acquisition path uses it to recognize errors
// that were produced by this replica's breaker.
func (br *replicaCircuitBreaker) HasMark(err error) bool {
	return br.wrapped.HasMark(err)
}

func (br *replicaCircuitBreaker) cancelAllTrackedContexts() {
br.cancels.Visit(func(ctx context.Context, cancel func()) (remove bool) {
cancel()
Expand Down Expand Up @@ -297,11 +301,11 @@ func (r replicaCircuitBreakerLogger) OnReset(br *circuit.Breaker) {

type probeKey struct{}

func isCircuitBreakerProbe(ctx context.Context) bool {
func hasBypassCircuitBreakerMarker(ctx context.Context) bool {
return ctx.Value(probeKey{}) != nil
}

func withCircuitBreakerProbeMarker(ctx context.Context) context.Context {
func withBypassCircuitBreakerMarker(ctx context.Context) context.Context {
return context.WithValue(ctx, probeKey{}, probeKey{})
}

Expand Down Expand Up @@ -330,7 +334,10 @@ func (br *replicaCircuitBreaker) asyncProbe(report func(error), done func()) {
}

func sendProbe(ctx context.Context, r replicaInCircuitBreaker) error {
ctx = withCircuitBreakerProbeMarker(ctx)
// NB: we don't need to put this marker since ProbeRequest has the
// canBypassReplicaCircuitBreaker flag, but if in the future we do
// additional work in this method we may need it.
ctx = withBypassCircuitBreakerMarker(ctx)
desc := r.Desc()
if !desc.IsInitialized() {
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func TestReplicaCircuitBreaker_Register(t *testing.T) {
defer leaktest.AfterTest(t)()
br, stopper := setupCircuitBreakerTest(t, "mutexmap-1")
defer stopper.Stop(context.Background())
ctx := withCircuitBreakerProbeMarker(context.Background())
ctx := withBypassCircuitBreakerMarker(context.Background())
tok, sig, err := br.Register(ctx, func() {})
require.NoError(t, err)
defer br.UnregisterAndAdjustError(tok, sig, nil /* pErr */)
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1153,6 +1153,7 @@ func (r *Replica) refreshProposalsLocked(
}

var maxSlowProposalDurationRequest *roachpb.BatchRequest
// TODO(tbg): don't track exempt requests for tripping the breaker?
var maxSlowProposalDuration time.Duration
var slowProposalCount int64
var reproposals pendingCmdSlice
Expand Down
26 changes: 26 additions & 0 deletions pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,20 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
// coalesced requests timeout/cancel. p.cancelLocked (defined below) is the
// cancel function that must be called; calling just cancel is insufficient.
ctx := p.repl.AnnotateCtx(context.Background())
if hasBypassCircuitBreakerMarker(parentCtx) {
	// The lease request runs under a fresh context derived from
	// context.Background(), which never carries the bypass marker. Inspect
	// the caller's context (parentCtx) instead: if the caller bypasses the
	// circuit breaker, allow the lease request to do the same. Otherwise,
	// the lease request would be refused by the circuit breaker as well.
	//
	// Note that there is a tiny race: if a lease request is already in
	// flight and was triggered by a caller that does *not* bypass the
	// breaker, a caller that *does* want to bypass it may join that request
	// before the breaker rejects it, and will then receive the circuit
	// breaker error too. This is special-cased in
	// `redirectOnOrAcquireLease`, where such a caller needs to retry instead
	// of propagating the error.
	ctx = withBypassCircuitBreakerMarker(ctx)
}
const opName = "request range lease"
tr := p.repl.AmbientContext.Tracer
tagsOpt := tracing.WithLogTags(logtags.FromContext(parentCtx))
Expand Down Expand Up @@ -1112,6 +1126,11 @@ func (r *Replica) TestingAcquireLease(ctx context.Context) (kvserverpb.LeaseStat
func (r *Replica) redirectOnOrAcquireLeaseForRequest(
ctx context.Context, reqTS hlc.Timestamp,
) (kvserverpb.LeaseStatus, *roachpb.Error) {
if hasBypassCircuitBreakerMarker(ctx) {
defer func() {
log.Infof(ctx, "hello")
}()
}
// Try fast-path.
now := r.store.Clock().NowAsClockTimestamp()
{
Expand Down Expand Up @@ -1209,6 +1228,7 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest(
}
if llHandle == nil {
// We own a valid lease.
log.Eventf(ctx, "valid lease %+v", status)
return status, nil
}

Expand Down Expand Up @@ -1244,6 +1264,12 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest(
// cannot be reproposed so we get this ambiguity.
// We'll just loop around.
return nil
case r.breaker.HasMark(goErr) && hasBypassCircuitBreakerMarker(ctx):
// If this request wanted to bypass the circuit breaker but still got a
// breaker error back, it joined a lease request started by an operation
// that did not bypass circuit breaker errors. Loop around and try again.
// See requestLeaseAsync for details.
return nil
case errors.HasType(goErr, (*roachpb.LeaseRejectedError)(nil)):
var tErr *roachpb.LeaseRejectedError
errors.As(goErr, &tErr)
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ func (r *Replica) sendWithoutRangeID(

// Circuit breaker handling.
ctx, cancel := context.WithCancel(ctx)
if bypassReplicaCircuitBreakerForBatch(ba) {
ctx = withBypassCircuitBreakerMarker(ctx)
}
tok, brSig, err := r.breaker.Register(ctx, cancel)
if err != nil {
return nil, roachpb.NewError(err)
Expand Down
Loading

0 comments on commit 5b3067f

Please sign in to comment.