[POC] kvserver: crash once and cordon replica in case of an apply fatal error #78092

Closed · wants to merge 1 commit
3 changes: 3 additions & 0 deletions pkg/keys/constants.go
@@ -160,6 +160,9 @@ var (
// localStoreIdentSuffix stores an immutable identifier for this
// store, created when the store is first bootstrapped.
localStoreIdentSuffix = []byte("iden")
// localStoreCordonRangeSuffix stores the range ID of a range that should
// be cordoned off on this store.
localStoreCordonRangeSuffix = []byte("cran")
// localStoreNodeTombstoneSuffix stores key value pairs that map
// nodeIDs to time of removal from cluster.
localStoreNodeTombstoneSuffix = []byte("ntmb")
1 change: 1 addition & 0 deletions pkg/keys/doc.go
@@ -218,6 +218,7 @@ var _ = [...]interface{}{
StoreGossipKey, // "goss"
StoreHLCUpperBoundKey, // "hlcu"
StoreIdentKey, // "iden"
StoreCordonRangeKey, // "cran"
StoreNodeTombstoneKey, // "ntmb"
StoreLastUpKey, // "uptm"
StoreCachedSettingsKey, // "stng"
6 changes: 6 additions & 0 deletions pkg/keys/keys.go
@@ -55,6 +55,12 @@ func StoreIdentKey() roachpb.Key {
return MakeStoreKey(localStoreIdentSuffix, nil)
}

// StoreCordonRangeKey returns a store-local key that records the
// range ID of a range that should be cordoned off on this store.
func StoreCordonRangeKey() roachpb.Key {
return MakeStoreKey(localStoreCordonRangeSuffix, nil)
}

// StoreGossipKey returns a store-local key for the gossip bootstrap metadata.
func StoreGossipKey() roachpb.Key {
return MakeStoreKey(localStoreGossipSuffix, nil)
94 changes: 94 additions & 0 deletions pkg/kv/kvserver/cordon_integration_test.go
@@ -0,0 +1,94 @@
package kvserver_test

import (
"bytes"
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

func TestCordonIfPanicDuringApply(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderShort(t)

ctx := context.Background()

dir, cleanupFn := testutils.TempDir(t)

Member:

If you use this Sticky stuff, you don't even need to touch disk (which makes this test a lot faster especially when stressing):

stickyEngineRegistry := server.NewStickyInMemEnginesRegistry()
defer stickyEngineRegistry.CloseAllStickyInMemEngines()
const numServers int = 3
stickyServerArgs := make(map[int]base.TestServerArgs)
for i := 0; i < numServers; i++ {
    stickyServerArgs[i] = base.TestServerArgs{
        CacheSize: 1 << 20, /* 1 MiB */
        StoreSpecs: []base.StoreSpec{
            {
                InMemory:               true,
                StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10),
            },
        },
    }
}

Collaborator (author):

Oooh cool!
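
For the restart to reattach to the same in-memory engine, the registry would also need to be plumbed through the server testing knobs — roughly the usual sticky-engine test pattern (a sketch, not code from this PR):

Knobs: base.TestingKnobs{
    Server: &server.TestingKnobs{
        StickyEngineRegistry: stickyEngineRegistry,
    },
},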

defer cleanupFn()

testArgs := func(dontPanicOnApplyPanicOrFatalError bool) base.TestServerArgs {
return base.TestServerArgs{
Settings: cluster.MakeClusterSettings(),
// We will start up a second server after stopping the first one to
// simulate a server restart, since cordoning in response to a panic
// during apply only takes effect after a server restart. We use the
// same store for the two servers (they are the same node), so that the
// "should cordon" record written to storage by the first server is
// seen by the second server at startup.
StoreSpecs: []base.StoreSpec{{Path: dir + "/store-1"}},
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DontPanicOnApplyPanicOrFatalError: dontPanicOnApplyPanicOrFatalError,

Member:

The need for this awkward testing knob would disappear once we trigger the panic using a special request as suggested above.

// Simulate a panic during apply!
TestingApplyFilter: func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) {
for _, ru := range args.Req.Requests {
key := ru.GetInner().Header().Key
// The time-series range is continuously written to.
if bytes.HasPrefix(key, keys.TimeseriesPrefix) {

Member:

Explicitly trigger the problem from a test by special-casing a request here. For example,

if bytes.HasSuffix(key, "boom") { ... }

then you should be able to inject panics on any range you like by doing a put on append(desc.StartKey.AsRawKey(), "boom"...).

Collaborator (author):

Will do!
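
A rough sketch of that suggestion (the sentinel suffix and the surrounding test wiring are illustrative, not code from this PR):

// In the testing apply filter: only trip on the sentinel suffix.
TestingApplyFilter: func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) {
    for _, ru := range args.Req.Requests {
        if bytes.HasSuffix(ru.GetInner().Header().Key, []byte("boom")) {
            panic("boom")
        }
    }
    return 0, nil
},

// In the test body: trigger the panic on a chosen range (desc is that
// range's descriptor) instead of relying on continuous time-series writes.
boomKey := append(desc.StartKey.AsRawKey(), []byte("boom")...)
_ = kvDB.Put(ctx, boomKey, "x") // not expected to succeed once apply panics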

panic("boom")
}
}
return 0, nil
},
},
},
}
}

s, _, kvDB := serverutils.StartServer(t,
// In production, the first time the panic at apply time is experienced, we expect the
// server to mark the replica as to-be-cordoned and crash after re-throwing the panic.
// In this test, we expect the server to mark the replica as to-be-cordoned but
// NOT to re-throw the panic; otherwise the test would fail due to an uncaught panic.
testArgs(true /* dontPanicOnApplyPanicOrFatalError */))

// TODO(josh): This is gnarly. Can we force the scheduler to run on the range with the
// apply time panic, instead of sleeping here?
time.Sleep(10 * time.Second)
s.Stopper().Stop(ctx)

s, _, kvDB = serverutils.StartServer(t,
// On the second run of the server, we don't expect a panic, as on startup, the replica
// should be cordoned. All raft machinery should stop, so the TestingApplyFilter up above
// shouldn't run.
testArgs(false /* dontPanicOnApplyPanicOrFatalError */))
defer s.Stopper().Stop(ctx)

time.Sleep(10 * time.Second)

// On the second run of the server, we expect requests to the cordoned range to fail fast.
//
// Note that if this was a three node cluster, and if only one replica was failing to apply
// some entry, we'd expect that one replica to cordon on restart, which would imply shedding
// the lease. As a result, we'd expect reads & writes to the range to succeed, rather than
// fail fast.
// TODO(josh): Test behavior on a three node cluster.
_, err := kvDB.Get(ctx, keys.TimeseriesPrefix)
require.Error(t, err)
require.Regexp(t, "cordoned", err.Error())
}
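
On the TODO about forcing the scheduler to run instead of sleeping: the second 10-second wait, at least, could be replaced by polling for the expected fail-fast error. A sketch (not part of the PR; assumes the strings package and cockroachdb/errors are imported):

testutils.SucceedsSoon(t, func() error {
    if _, err := kvDB.Get(ctx, keys.TimeseriesPrefix); err != nil &&
        strings.Contains(err.Error(), "cordoned") {
        return nil
    }
    return errors.New("range not cordoned yet")
})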
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/replica_application_state_machine.go
@@ -870,6 +870,11 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
}
}

// TODO(josh): Remove.
if b.r.RangeID == 54 {
panic("josh second boom")
}

// Apply the write batch to RockDB. Entry application is done without
// syncing to disk. The atomicity guarantees of the batch and the fact that
// the applied state is stored in this batch, ensure that if the batch ends
12 changes: 12 additions & 0 deletions pkg/kv/kvserver/replica_send.go
@@ -115,6 +115,18 @@ func (r *Replica) sendWithRangeID(
r.leaseholderStats.record(ba.Header.GatewayNodeID)
}

// Fail fast in case the replica is cordoned. Note that cordoning only takes
// effect post server restart. Since cordoning keeps the raft scheduler from
// scheduling the replica, and since post restart the node can't resume its
// existing leases, this code will only run if so many replicas are cordoned
// that no node can get a valid lease.
// TODO(josh): Should we be using the circuit breaker that Tobias introduced
// instead? It allows follower reads when possible. See below:
// https://github.com/cockroachdb/cockroach/pull/76858
if r.store.cordoned[_forStacks] {
return nil, roachpb.NewError(errors.Newf("range %v is cordoned", _forStacks))

Member:

The final code will probably want to call r.replicaUnavailableError(...) here to make a nice error.

Collaborator (author):

Nice. Should we be using your circuit breaker here? See comment up above about rationale. TLDR: I guess even with two replicas cordoned, we could still serve follower reads.

}

// Add the range log tag.
ctx = r.AnnotateCtx(ctx)

20 changes: 19 additions & 1 deletion pkg/kv/kvserver/scheduler.go
@@ -168,6 +168,7 @@ type raftScheduler struct {
processor raftProcessor
latency *metric.Histogram
numWorkers int
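// cordoned holds the ranges this scheduler must skip; it is populated at
// startup from the store-local cordon record.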
cordoned map[roachpb.RangeID]bool

Member:

This might be better as a new state for replica.mu.destroyStatus:

const (
// The replica is alive.
destroyReasonAlive DestroyReason = iota
// The replica has been GCed or is in the process of being synchronously
// removed.
destroyReasonRemoved
// The replica has been merged into its left-hand neighbor, but its left-hand
// neighbor hasn't yet subsumed it.
destroyReasonMergePending
)

Contributor:

I agree, this state should be on the replica. Don't know all the implications of destroyStatus off the bat.

Collaborator (author):

I'll check this out!


mu struct {
syncutil.Mutex
@@ -181,16 +182,22 @@
}

func newRaftScheduler(
ambient log.AmbientContext, metrics *StoreMetrics, processor raftProcessor, numWorkers int,
ambient log.AmbientContext,
metrics *StoreMetrics,
processor raftProcessor,
numWorkers int,
cordoned map[roachpb.RangeID]bool,
) *raftScheduler {
s := &raftScheduler{
ambientContext: ambient,
processor: processor,
latency: metrics.RaftSchedulerLatency,
numWorkers: numWorkers,
cordoned: cordoned,
}
s.mu.cond = sync.NewCond(&s.mu.Mutex)
s.mu.state = make(map[roachpb.RangeID]raftScheduleState)

return s
}

@@ -272,6 +279,17 @@ func (s *raftScheduler) worker(ctx context.Context) {
s.mu.cond.Wait()
}

// TODO(josh): I have no idea if this is a reasonable way to actually
// implement the cordoning!!!
// TODO(josh): Let's say a single replica of three is cordoned. Will the
// per-replica proposal quota hold back writes to the range? That is not
// desirable! Some relevant discussion:
// https://github.com/cockroachdb/cockroach/issues/77251
if s.cordoned[id] {

Member:

I think the destroyStatus is a good way to streamline this, as "destroyed" replicas (for a suitable definition, which will need to be expanded) are never raft handled:

func (r *Replica) withRaftGroupLocked(
mayCampaignOnWake bool, f func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error),
) error {
if r.mu.destroyStatus.Removed() {
// Callers know to detect errRemoved as non-fatal.
return errRemoved
}

Instead of Removed() we'd want a method Usable() error or something, which would either return nil, errRemoved, or errCordoned.
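
A rough sketch of that direction (the names destroyReasonCordoned, errCordoned, and Usable are illustrative, not part of this PR):

const (
    destroyReasonAlive DestroyReason = iota
    destroyReasonRemoved
    destroyReasonMergePending
    // destroyReasonCordoned would mark a replica that hit a panic or fatal
    // error during apply on a previous process run and must not be raft handled.
    destroyReasonCordoned
)

// errCordoned would be a non-fatal sentinel, analogous to errRemoved.
var errCordoned = errors.New("replica is cordoned")

func (s destroyStatus) Usable() error {
    if s.reason == destroyReasonCordoned {
        return errCordoned
    }
    if s.Removed() {
        return errRemoved
    }
    return nil
}

// withRaftGroupLocked (and the other raft paths) would then check:
if err := r.mu.destroyStatus.Usable(); err != nil {
    return err // callers treat errRemoved / errCordoned as non-fatal
}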

log.Warningf(ctx, "range %v is cordoned", id)
continue
}

// Grab and clear the existing state for the range ID. Note that we leave
// the range ID marked as "queued" so that a concurrent Enqueue* will not
// queue the range ID again.
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/scheduler_test.go
@@ -236,7 +236,7 @@ func TestSchedulerLoop(t *testing.T) {

m := newStoreMetrics(metric.TestSampleInterval)
p := newTestProcessor()
s := newRaftScheduler(log.MakeTestingAmbientContext(stopper.Tracer()), m, p, 1)
s := newRaftScheduler(log.MakeTestingAmbientContext(stopper.Tracer()), m, p, 1, nil)

s.Start(stopper)
s.EnqueueRaftTicks(1, 2, 3)
@@ -264,7 +264,7 @@ func TestSchedulerBuffering(t *testing.T) {

m := newStoreMetrics(metric.TestSampleInterval)
p := newTestProcessor()
s := newRaftScheduler(log.MakeTestingAmbientContext(stopper.Tracer()), m, p, 1)
s := newRaftScheduler(log.MakeTestingAmbientContext(stopper.Tracer()), m, p, 1, nil)
s.Start(stopper)

testCases := []struct {
32 changes: 31 additions & 1 deletion pkg/kv/kvserver/store.go
@@ -80,6 +80,7 @@ import (
"github.com/cockroachdb/redact"
raft "go.etcd.io/etcd/raft/v3"
"golang.org/x/time/rate"
"gopkg.in/yaml.v2"
)

const (
@@ -903,6 +904,7 @@ type Store struct {
// Store.mu.replicas.
replicaQueues syncutil.IntMap // map[roachpb.RangeID]*raftRequestQueue

cordoned map[roachpb.RangeID]bool
scheduler *raftScheduler

// livenessMap is a map from nodeID to a bool indicating
@@ -1159,8 +1161,15 @@ func NewStore(
}
s.replRankings = newReplicaRankings()

var err error
s.cordoned, err = replicasToCordon(ctx, eng)
if err != nil {
// TODO(josh): Is it okay to fatal here in case the read to storage fails?

Member:

I think in the final code the cordoning will be decided on a per-replica basis when it is instantiated, so you wouldn't need this code.
If we needed this code, we'd want proper error handling.
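
If cordoning is decided per replica when it is instantiated, the startup hook could look roughly like this (placement and the destroyReasonCordoned / errCordoned names from the sketch above are hypothetical):

// Wherever the store instantiates its replicas at startup, consult the
// cordon record and flip the replica's destroyStatus.
if cordoned[repl.RangeID] {
    repl.mu.Lock()
    repl.mu.destroyStatus.Set(errCordoned, destroyReasonCordoned)
    repl.mu.Unlock()
}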

log.Fatalf(ctx, "couldn't determine which replicas if any to cordon: %v", err)
}

s.draining.Store(false)
s.scheduler = newRaftScheduler(cfg.AmbientCtx, s.metrics, s, storeSchedulerConcurrency)
s.scheduler = newRaftScheduler(cfg.AmbientCtx, s.metrics, s, storeSchedulerConcurrency, s.cordoned)

Member:

With destroyStatus in place the raft sched should not need to know about a list of cordoned ranges in the end.


s.raftEntryCache = raftentry.NewCache(cfg.RaftEntryCacheSize)
s.metrics.registry.AddMetricStruct(s.raftEntryCache.Metrics())
@@ -1338,6 +1347,27 @@ func NewStore(
return s
}

func replicasToCordon(ctx context.Context, eng storage.Engine) (map[roachpb.RangeID]bool, error) {
iter := eng.NewEngineIterator(storage.IterOptions{
LowerBound: keys.StoreCordonRangeKey(),
UpperBound: keys.StoreCordonRangeKey().Next(),
})
defer iter.Close()
valid, err := iter.SeekEngineKeyGE(storage.EngineKey{Key: keys.StoreCordonRangeKey()})
if err != nil {
return nil, errors.Wrap(err, "couldn't seek to cordon range key")
}
if !valid {
// No cordon record on this store; nothing to cordon.
return nil, nil
}
val := iter.Value()
// TODO(josh): For the POC, we only allow cordoning a single range at a time.
var c cordonRecord
// TODO(josh): Use proto instead of yaml.
err = yaml.Unmarshal(val, &c)
if err != nil {
return nil, errors.Wrap(err, "couldn't unmarshal cordon range record")
}
return map[roachpb.RangeID]bool{c.RangeID: true}, nil
}

// String formats a store for debug output.
func (s *Store) String() string {
return redact.StringWithoutMarkers(s)
49 changes: 49 additions & 0 deletions pkg/kv/kvserver/store_raft.go
@@ -15,6 +15,7 @@ import (
"time"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -25,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"go.etcd.io/etcd/raft/v3/raftpb"
"gopkg.in/yaml.v2"
)

type raftRequestInfo struct {
@@ -503,6 +505,18 @@ func (s *Store) processReady(rangeID roachpb.RangeID) {
}

ctx := r.raftCtx
// If we hit a panic (or a fatal error, since maybeFatalOnRaftReadyErr converts
// these into panics), we mark the replica for cordoning and crash.

Member:

This is actually a really good approach. When you recover from panics, you always have to worry about what state you're leaving the system in. Since raft processing may also acquire store mutexes and change store data structures (splits/merges), there could be cases in which we recover the panic and cordon the replica, but damage has really been done store-wide. Crashing once elegantly avoids this problem.
I think this is worth calling out in a prominent comment somewhere in the final version, as it highlights how avoiding the crash is a bigger undertaking.

Collaborator (author):

💯

// On startup, the raft scheduler won't schedule the replica.
// Crashing will also shed the lease if the node happens to have it (the node
// will have a new liveness epoch so it won't be able to resume any existing
// leases). If enough replicas for some range are cordoned, the range will
// clearly be unavailable, but the nodes holding those replicas will be
// healthy otherwise, rather than crashing continuously, which is an improvement
// over the status quo in terms of the blast radius of the outage. If a single
// replica is cordoned OTOH, the range should remain available.
defer s.markReplicaForCordoningIfPanic(ctx, rangeID)

start := timeutil.Now()
stats, expl, err := r.handleRaftReady(ctx, noSnap)
maybeFatalOnRaftReadyErr(ctx, expl, err)
@@ -519,6 +533,41 @@ func (s *Store) processReady(rangeID roachpb.RangeID) {
}
}

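// cordonRecord is the value persisted under StoreCordonRangeKey. For this
// POC it names the single range to cordon on this store.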
type cordonRecord struct {
RangeID roachpb.RangeID
}

func (s *Store) markReplicaForCordoningIfPanic(ctx context.Context, rangeID roachpb.RangeID) {
if re := recover(); re != nil {

Member:

nit: it's nice to use early-returns to avoid nested ifs, so you could

r := recover()
if r == nil { return }
// rest of code

func() {
log.Errorf(ctx, "cordoning range %v since panic and/or fatal error during application: %v",
rangeID, re)

// TODO(josh): For the POC, we only allow cordoning a single range at a time.
c := cordonRecord{RangeID: rangeID}
// TODO(josh): Use proto instead of yaml.

Contributor:

We should put the range ID into the key as a suffix. In that case I don't think we need a serialized value at all, nil should do just fine.

Collaborator (author):

If we do it the way you suggest, then wouldn't we need to GC a cordonRecord in case a range is removed? If we have a single cordonRecord per store, maybe we can avoid this complexity? Not sure how desirable that is, but I think it's worth mentioning at least!

Also, if we stick with how I have this now, we can expand that cordonRecord structure to enable N=5 cordons per store max, by making the structure a list of range IDs.
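
A sketch of that keying scheme (helper name and encoding are illustrative, not part of this PR):

// One store-local key per cordoned range; the value can then be nil.
func StoreCordonedReplicaKey(rangeID roachpb.RangeID) roachpb.Key {
    return MakeStoreKey(localStoreCordonRangeSuffix,
        encoding.EncodeUint64Ascending(nil, uint64(rangeID)))
}

// Writing a marker then needs no serialized value:
// err := s.engine.PutUnversioned(keys.StoreCordonedReplicaKey(rangeID), nil)

Reading the markers back becomes a prefix scan over that suffix, and, per the GC question above, a removed range's marker would eventually need to be cleaned up.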

mc, err := yaml.Marshal(c)
if err != nil {
log.Errorf(ctx, "couldn't marshal cordon record: %v", err)
return
}
// TODO(josh): I don't think we need an fsync here? The process will restart,

Contributor:

I agree, an fsync seems unnecessary here.

Collaborator (author):

💯

Is there an easy way to tell storage to NOT fsync? I can also look around myself.

// but OS is going to stay up. I bring this up because there has been some
// discussion about the possibility of a read being served after a panic /
// error but before the process restarts. This code increases that time
// window, but perhaps we can skip the fsync to reduce it again.
if err := s.engine.PutUnversioned(keys.StoreCordonRangeKey(), mc); err != nil {

Contributor:

I'm a bit worried about this hanging if there is a disk-related issue that caused the apply failure. Since we don't use contexts for disk IO, we should likely spawn off a goroutine with a small timeout and continue to crash if it doesn't make it in time.

Collaborator (author):

Ooo good point! Ack re: context.
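
A sketch of that guard (the timeout value is arbitrary): do the write on a goroutine and give up, still crashing, if it doesn't finish in time.

done := make(chan error, 1)
go func() {
    done <- s.engine.PutUnversioned(keys.StoreCordonRangeKey(), mc)
}()
select {
case err := <-done:
    if err != nil {
        log.Errorf(ctx, "couldn't write to cordon range key: %v", err)
    }
case <-time.After(5 * time.Second):
    log.Errorf(ctx, "timed out writing cordon range key; crashing without it")
}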

log.Errorf(ctx, "couldn't write to cordon range key: %v", err)
return
}
}()
if s.cfg.TestingKnobs.DontPanicOnApplyPanicOrFatalError {
return
}
panic(re)
}
}

func (s *Store) processTick(_ context.Context, rangeID roachpb.RangeID) bool {
r, ok := s.mu.replicasByRangeID.Load(rangeID)
if !ok {
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/testing_knobs.go
@@ -64,7 +64,8 @@ type StoreTestingKnobs struct {
// a forced error and the command will not be applied. If it returns an error
// on some replicas but not others, the behavior is poorly defined. The
// returned int is interpreted as a proposalReevaluationReason.
TestingApplyFilter kvserverbase.ReplicaApplyFilter
TestingApplyFilter kvserverbase.ReplicaApplyFilter
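// DontPanicOnApplyPanicOrFatalError, if set, suppresses the re-panic after a
// replica has been marked for cordoning, so tests can observe cordoning
// without crashing the process.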
DontPanicOnApplyPanicOrFatalError bool
// TestingApplyForcedErrFilter is like TestingApplyFilter, but it is only
// invoked when there is a pre-existing forced error. The returned int and
// *Error replace the existing proposalReevaluationReason (if initially zero