From 05f4483208e2f0f4ab7086d7736c8a2d1e61d4e3 Mon Sep 17 00:00:00 2001 From: Josh Imhoff Date: Fri, 18 Mar 2022 12:51:01 -0400 Subject: [PATCH] kvserver: crash once and cordon replica in case of an apply fatal error This commit is a POC of an approach to reducing blast radius of panics or fatal errors that happen during application. Without this commit, kvserver repeatedly restarts. Repeateldy restarting means a cluster wide impact on reliability. It also makes debugging hard, as tools like the admin UI are affected by the restarts. We especially note the impact on serverless. Repeatedly restarting means that an apply fatal error results in an outage that affects multiple tenants, even if the range with the apply fatal error is in a single tenant's keyspace (which is likely). With this commit, kvserver crashes a single time and cordons the replica at start up time. Cordoning means the raft scheduler doesn't schedule the replica. Since cordoning happens post restart, and since a node can't resume its leases post restart, the node will shed its lease of the cordoned range. As a result, if only one replica of 3+ experiences an apply fatal error, the range will stay available. If a quorum experience an apply fatal error on the other hand, the range will become unavailable. Reads and writes to the range will fail fast, rather than hang, to improve the user experience. There has been some discussion about removing the replica in case a single replica experiences an apply fatal error. This commit doesn't chew that off. Even without that, this commit is an improvement over the status quo. This commit introduces an integration test that tests the behavior of a one node CRDB cluster in face of an apply fatal error. I have yet to test the behavor of a multi-node cluster. Release note: None. Release note (): --- pkg/keys/constants.go | 3 + pkg/keys/doc.go | 1 + pkg/keys/keys.go | 6 ++ pkg/kv/kvserver/cordon_integration_test.go | 94 +++++++++++++++++++ .../replica_application_state_machine.go | 5 + pkg/kv/kvserver/replica_send.go | 12 +++ pkg/kv/kvserver/scheduler.go | 20 +++- pkg/kv/kvserver/scheduler_test.go | 4 +- pkg/kv/kvserver/store.go | 32 ++++++- pkg/kv/kvserver/store_raft.go | 49 ++++++++++ pkg/kv/kvserver/testing_knobs.go | 3 +- 11 files changed, 224 insertions(+), 5 deletions(-) create mode 100644 pkg/kv/kvserver/cordon_integration_test.go diff --git a/pkg/keys/constants.go b/pkg/keys/constants.go index 4df92b77c917..2731c7a5d20d 100644 --- a/pkg/keys/constants.go +++ b/pkg/keys/constants.go @@ -160,6 +160,9 @@ var ( // localStoreIdentSuffix stores an immutable identifier for this // store, created when the store is first bootstrapped. localStoreIdentSuffix = []byte("iden") + // localStoreCordonRangeSuffix stores the range ID of a range that should + // be cordoned off on this store. + localStoreCordonRangeSuffix = []byte("cran") // localStoreNodeTombstoneSuffix stores key value pairs that map // nodeIDs to time of removal from cluster. localStoreNodeTombstoneSuffix = []byte("ntmb") diff --git a/pkg/keys/doc.go b/pkg/keys/doc.go index 7d1718b9c54d..1561e1ff86c6 100644 --- a/pkg/keys/doc.go +++ b/pkg/keys/doc.go @@ -218,6 +218,7 @@ var _ = [...]interface{}{ StoreGossipKey, // "goss" StoreHLCUpperBoundKey, // "hlcu" StoreIdentKey, // "iden" + StoreCordonRangeKey, // "cran" StoreNodeTombstoneKey, // "ntmb" StoreLastUpKey, // "uptm" StoreCachedSettingsKey, // "stng" diff --git a/pkg/keys/keys.go b/pkg/keys/keys.go index dea29eaa0fdc..38159326e22e 100644 --- a/pkg/keys/keys.go +++ b/pkg/keys/keys.go @@ -55,6 +55,12 @@ func StoreIdentKey() roachpb.Key { return MakeStoreKey(localStoreIdentSuffix, nil) } +// StoreCordonRangeKey returns a store-local key that records the +// range ID of a range that should be cordoned off on this store. +func StoreCordonRangeKey() roachpb.Key { + return MakeStoreKey(localStoreCordonRangeSuffix, nil) +} + // StoreGossipKey returns a store-local key for the gossip bootstrap metadata. func StoreGossipKey() roachpb.Key { return MakeStoreKey(localStoreGossipSuffix, nil) diff --git a/pkg/kv/kvserver/cordon_integration_test.go b/pkg/kv/kvserver/cordon_integration_test.go new file mode 100644 index 000000000000..0085996e3b41 --- /dev/null +++ b/pkg/kv/kvserver/cordon_integration_test.go @@ -0,0 +1,94 @@ +package kvserver_test + +import ( + "bytes" + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +func TestCordonIfPanicDuringApply(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderShort(t) + + ctx := context.Background() + + dir, cleanupFn := testutils.TempDir(t) + defer cleanupFn() + + testArgs := func(dontPanicOnApplyPanicOrFatalError bool) base.TestServerArgs { + return base.TestServerArgs{ + Settings: cluster.MakeClusterSettings(), + // We will start up a second server after stopping the first one to + // simulate a server restart, since cordoning in response to a panic + // during apply only takes effect after a server restart. So we use + // the same store for the two servers (they are the same node). This + // way, the should cordon write to storage done by the first server is + // seen by the second server at startup. + StoreSpecs: []base.StoreSpec{{Path: dir + "/store-1"}}, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DontPanicOnApplyPanicOrFatalError: dontPanicOnApplyPanicOrFatalError, + // Simulate a panic during apply! + TestingApplyFilter: func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) { + for _, ru := range args.Req.Requests { + key := ru.GetInner().Header().Key + // The time-series range is continuously written to. + if bytes.HasPrefix(key, keys.TimeseriesPrefix) { + panic("boom") + } + } + return 0, nil + }, + }, + }, + } + } + + s, _, kvDB := serverutils.StartServer(t, + // In production, the first time the panic at apply time is experienced, we expect the + // server to mark the replica as to be cordoned and crash after re-throwing the panic. + // In this test, we expect the server to mark the replicas as to be cordoned but also + // to NOT re-throw the panic. Else the test would fail due to a uncaught panic. + testArgs(true /* dontPanicOnApplyPanicOrFatalError */)) + + // TODO(josh): This is gnarly. Can we force the scheduler to run on the range with the + // apply time panic, instead of sleeping here? + time.Sleep(10 * time.Second) + s.Stopper().Stop(ctx) + + s, _, kvDB = serverutils.StartServer(t, + // On the second run of the server, we don't expect a panic, as on startup, the replica + // should be cordoned. All raft machinery should stop, so the TestingApplyFilter up above + // shouldn't run. + testArgs(false /* dontPanicOnApplyPanicOrFatalError */)) + defer s.Stopper().Stop(ctx) + + time.Sleep(10 * time.Second) + + // On the second run of the server, we expect requests to the cordoned range to fail fast. + // + // Note that if this was a three node cluster, and if only one replica was failing to apply + // some entry, we'd expect that one replica to cordon on restart, which would imply shedding + // the lease. As a result, we'd expect reads & writes to the range to succeed, rather than + // fail fast. + // TODO(josh): Test behavior on a three node cluster. + _, err := kvDB.Get(ctx, keys.TimeseriesPrefix) + require.Error(t, err) + require.Regexp(t, "cordoned", err.Error()) +} diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index 4a9fbcc52283..6fddc2172dde 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -870,6 +870,11 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error { } } + // TODO(josh): Remove. + if b.r.RangeID == 54 { + panic("josh second boom") + } + // Apply the write batch to RockDB. Entry application is done without // syncing to disk. The atomicity guarantees of the batch and the fact that // the applied state is stored in this batch, ensure that if the batch ends diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 86956fb884ea..557fb51512f2 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -115,6 +115,18 @@ func (r *Replica) sendWithRangeID( r.leaseholderStats.record(ba.Header.GatewayNodeID) } + // Fail fast in case the replica is cordoned. Note that cordoning only takes + // effect post server restart. Since cordoning keeps the raft scheduler from + // scheduling the replica, and since post restart the node can't resume its + // existing leases, this code will only run if so many replicas are cordoned + // that no node can get a valid lease. + // TODO(josh): Should we be using the circuit breaker that Tobias introduced + // instead? It allow follower reads when possible. See below: + // https://github.com/cockroachdb/cockroach/pull/76858 + if r.store.cordoned[_forStacks] { + return nil, roachpb.NewError(errors.Newf("range %v is cordoned", _forStacks)) + } + // Add the range log tag. ctx = r.AnnotateCtx(ctx) diff --git a/pkg/kv/kvserver/scheduler.go b/pkg/kv/kvserver/scheduler.go index 9a2712c0e6c5..2a6c054836d5 100644 --- a/pkg/kv/kvserver/scheduler.go +++ b/pkg/kv/kvserver/scheduler.go @@ -168,6 +168,7 @@ type raftScheduler struct { processor raftProcessor latency *metric.Histogram numWorkers int + cordoned map[roachpb.RangeID]bool mu struct { syncutil.Mutex @@ -181,16 +182,22 @@ type raftScheduler struct { } func newRaftScheduler( - ambient log.AmbientContext, metrics *StoreMetrics, processor raftProcessor, numWorkers int, + ambient log.AmbientContext, + metrics *StoreMetrics, + processor raftProcessor, + numWorkers int, + cordoned map[roachpb.RangeID]bool, ) *raftScheduler { s := &raftScheduler{ ambientContext: ambient, processor: processor, latency: metrics.RaftSchedulerLatency, numWorkers: numWorkers, + cordoned: cordoned, } s.mu.cond = sync.NewCond(&s.mu.Mutex) s.mu.state = make(map[roachpb.RangeID]raftScheduleState) + return s } @@ -272,6 +279,17 @@ func (s *raftScheduler) worker(ctx context.Context) { s.mu.cond.Wait() } + // TODO(josh): I have no idea if this is a reasonable way to actually + // implement the cordoning!!! + // TODO(josh): Let's say a single replica of three is cordoned. Will the + // per-replica proposal quota hold back writes to the range? That is not + // desirable! Some relevant discussion: + // https://github.com/cockroachdb/cockroach/issues/77251 + if s.cordoned[id] { + log.Warningf(ctx, "range %v is cordoned", id) + continue + } + // Grab and clear the existing state for the range ID. Note that we leave // the range ID marked as "queued" so that a concurrent Enqueue* will not // queue the range ID again. diff --git a/pkg/kv/kvserver/scheduler_test.go b/pkg/kv/kvserver/scheduler_test.go index a8f861c98436..9161d03ab454 100644 --- a/pkg/kv/kvserver/scheduler_test.go +++ b/pkg/kv/kvserver/scheduler_test.go @@ -236,7 +236,7 @@ func TestSchedulerLoop(t *testing.T) { m := newStoreMetrics(metric.TestSampleInterval) p := newTestProcessor() - s := newRaftScheduler(log.MakeTestingAmbientContext(stopper.Tracer()), m, p, 1) + s := newRaftScheduler(log.MakeTestingAmbientContext(stopper.Tracer()), m, p, 1, nil) s.Start(stopper) s.EnqueueRaftTicks(1, 2, 3) @@ -264,7 +264,7 @@ func TestSchedulerBuffering(t *testing.T) { m := newStoreMetrics(metric.TestSampleInterval) p := newTestProcessor() - s := newRaftScheduler(log.MakeTestingAmbientContext(stopper.Tracer()), m, p, 1) + s := newRaftScheduler(log.MakeTestingAmbientContext(stopper.Tracer()), m, p, 1, nil) s.Start(stopper) testCases := []struct { diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 518ded66944a..8b5a90e3ed29 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -80,6 +80,7 @@ import ( "github.com/cockroachdb/redact" raft "go.etcd.io/etcd/raft/v3" "golang.org/x/time/rate" + "gopkg.in/yaml.v2" ) const ( @@ -903,6 +904,7 @@ type Store struct { // Store.mu.replicas. replicaQueues syncutil.IntMap // map[roachpb.RangeID]*raftRequestQueue + cordoned map[roachpb.RangeID]bool scheduler *raftScheduler // livenessMap is a map from nodeID to a bool indicating @@ -1159,8 +1161,15 @@ func NewStore( } s.replRankings = newReplicaRankings() + var err error + s.cordoned, err = replicasToCordon(ctx, eng) + if err != nil { + // TODO(josh): Is it okay to fatal here in case the read to storage fails? + log.Fatalf(ctx, "couldn't determine which replicas if any to cordon: %v", err) + } + s.draining.Store(false) - s.scheduler = newRaftScheduler(cfg.AmbientCtx, s.metrics, s, storeSchedulerConcurrency) + s.scheduler = newRaftScheduler(cfg.AmbientCtx, s.metrics, s, storeSchedulerConcurrency, s.cordoned) s.raftEntryCache = raftentry.NewCache(cfg.RaftEntryCacheSize) s.metrics.registry.AddMetricStruct(s.raftEntryCache.Metrics()) @@ -1338,6 +1347,27 @@ func NewStore( return s } +func replicasToCordon(ctx context.Context, eng storage.Engine) (map[roachpb.RangeID]bool, error) { + iter := eng.NewEngineIterator(storage.IterOptions{ + LowerBound: keys.StoreCordonRangeKey(), + UpperBound: keys.StoreCordonRangeKey().Next(), + }) + defer iter.Close() + _, err := iter.SeekEngineKeyGE(storage.EngineKey{Key: keys.StoreCordonRangeKey()}) + if err != nil { + return nil, errors.Wrapf(err, "couldn't seek to cordon range key: %v", err) + } + val := iter.Value() + // TODO(josh): For the POC, we only allow cordoning a single range at a time. + var c cordonRecord + // TODO(josh): Use proto instead of yaml. + err = yaml.Unmarshal(val, &c) + if err != nil { + return nil, errors.Wrapf(err, "couldn't unmarshall cordon range record: %v", err) + } + return map[roachpb.RangeID]bool{c.RangeID: true}, nil +} + // String formats a store for debug output. func (s *Store) String() string { return redact.StringWithoutMarkers(s) diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 89a70537990a..a815ab6ea7ec 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -15,6 +15,7 @@ import ( "time" "unsafe" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -25,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "go.etcd.io/etcd/raft/v3/raftpb" + "gopkg.in/yaml.v2" ) type raftRequestInfo struct { @@ -503,6 +505,18 @@ func (s *Store) processReady(rangeID roachpb.RangeID) { } ctx := r.raftCtx + // If panic (or fatal error since maybeFatalOnRaftReadyErr converts + // these into panics), we mark the replica for cordoning and crash. + // On startup, the raft scheduler won't schedule the replica. + // Crashing will also shed the lease if the node happens to have it (the node + // will have a new liveness epoch so it won't be able to resume any existing + // leases). If enough replicas for some range are cordoned, the range will + // clearly be unavailable, but the nodes holding those replicas will be + // healthy otherwise, rather than crashing continuously, which is an improvement + // over the status quo in terms of the blast radius of the outage. If a single + // replica is cordoned OTOH, the range should remain available. + defer s.markReplicaForCordoningIfPanic(ctx, rangeID) + start := timeutil.Now() stats, expl, err := r.handleRaftReady(ctx, noSnap) maybeFatalOnRaftReadyErr(ctx, expl, err) @@ -519,6 +533,41 @@ func (s *Store) processReady(rangeID roachpb.RangeID) { } } +type cordonRecord struct { + RangeID roachpb.RangeID +} + +func (s *Store) markReplicaForCordoningIfPanic(ctx context.Context, rangeID roachpb.RangeID) { + if re := recover(); re != nil { + func() { + log.Errorf(ctx, "cordoning range %v since panic and/or fatal error during application: %v", + rangeID, re) + + // TODO(josh): For the POC, we only allow cordoning a single range at a time. + c := cordonRecord{RangeID: rangeID} + // TODO(josh): Use proto instead of yaml. + mc, err := yaml.Marshal(c) + if err != nil { + log.Errorf(ctx, "couldn't marshall cordon record: %v", err) + return + } + // TODO(josh): I don't think we need an fsync here? The process will restart, + // but OS is going to stay up. I bring this up because there has been some + // discussion about the possibility of a read being served after a panic / + // error but before the process restarts. This code increases that time + // window, but perhaps we can skip the fsync to reduce it again. + if err := s.engine.PutUnversioned(keys.StoreCordonRangeKey(), mc); err != nil { + log.Errorf(ctx, "couldn't write to cordon range key: %v", err) + return + } + }() + if s.cfg.TestingKnobs.DontPanicOnApplyPanicOrFatalError { + return + } + panic(re) + } +} + func (s *Store) processTick(_ context.Context, rangeID roachpb.RangeID) bool { r, ok := s.mu.replicasByRangeID.Load(rangeID) if !ok { diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 9ec3168eecb3..9060b76837d8 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -64,7 +64,8 @@ type StoreTestingKnobs struct { // a forced error and the command will not be applied. If it returns an error // on some replicas but not others, the behavior is poorly defined. The // returned int is interpreted as a proposalReevaluationReason. - TestingApplyFilter kvserverbase.ReplicaApplyFilter + TestingApplyFilter kvserverbase.ReplicaApplyFilter + DontPanicOnApplyPanicOrFatalError bool // TestingApplyForcedErrFilter is like TestingApplyFilter, but it is only // invoked when there is a pre-existing forced error. The returned int and // *Error replace the existing proposalReevaluationReason (if initially zero