storage: smarten logic to wait for split trigger
We previously "waited" for the split trigger quite literally, by
dropping an outgoing rejecting MsgAppResp for a few seconds. The
downside of doing so was that it would also artificially delay Raft
snapshots that were actually required. With the introduction of a
mechanism that delays splits that would cause Raft snapshots, this is
less of a problem than before, but it's still ugly.

The revamped mechanism in this commit goes the extra mile to make
an informed decision on whether a split trigger is expected to apply:

1. when sending MsgApp to a follower that is being probed, send the
start key along with the message
2. when receiving said MsgApp on an uninitialized replica, check whether
the store has a replica that currently owns the start key. That replica
is the one in charge of applying the split trigger,
3. unless it's waiting for GC, in which case a replica GC check is
triggered instead.

There's also a time-based escape hatch to avoid delaying snapshots
indefinitely should there be a flaw in the above logic.
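
To make the decision above concrete, here is a minimal, self-contained
sketch of the receive-side check. The types and helper names are
hypothetical, not the commit's actual code; the real implementation
lives in the store's Raft message handling:

package main

import (
	"bytes"
	"fmt"
)

// RKey stands in for roachpb.RKey.
type RKey []byte

// Replica is a stand-in holding just what the check needs.
type Replica struct {
	startKey, endKey RKey
	pendingGC        bool // hypothetical: replica is waiting for GC
}

// Store is a stand-in for the store holding the local replicas.
type Store struct {
	replicas []*Replica
}

// replicaContaining returns the initialized local replica whose span
// contains key, if any.
func (s *Store) replicaContaining(key RKey) *Replica {
	for _, r := range s.replicas {
		if bytes.Compare(r.startKey, key) <= 0 && bytes.Compare(key, r.endKey) < 0 {
			return r
		}
	}
	return nil
}

// shouldDropMsgApp decides whether a MsgApp addressed to an uninitialized
// replica should be dropped because the split trigger is expected to
// initialize it shortly. startKeyHint is the sender's start key (step 1).
func shouldDropMsgApp(s *Store, startKeyHint RKey, ticks, maxTicks int) bool {
	if startKeyHint == nil {
		return false // no hint attached to the message
	}
	if ticks > maxTicks {
		return false // escape hatch: don't delay a snapshot indefinitely
	}
	lhs := s.replicaContaining(startKeyHint) // step 2
	if lhs == nil {
		return false // no local replica will apply the split trigger
	}
	if lhs.pendingGC {
		// Step 3: the pre-split replica will never apply the trigger;
		// the real code would also queue a replica GC check here.
		return false
	}
	return true // wait for the split trigger instead of a snapshot
}

func main() {
	s := &Store{replicas: []*Replica{{startKey: RKey("b"), endKey: RKey("d")}}}
	// A MsgApp for an uninitialized replica whose range starts at "c":
	fmt.Println(shouldDropMsgApp(s, RKey("c"), 5, 100)) // true
}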

Release note: None
tbg committed Dec 14, 2018
1 parent 46de6b8 commit 6eec1ff
Showing 10 changed files with 442 additions and 180 deletions.
29 changes: 7 additions & 22 deletions pkg/base/config.go
@@ -117,9 +117,6 @@ var (
// will send to a follower without hearing a response.
defaultRaftMaxInflightMsgs = envutil.EnvOrDefaultInt(
"COCKROACH_RAFT_MAX_INFLIGHT_MSGS", 64)

defaultRaftPostSplitSuppressSnapshotTicks = envutil.EnvOrDefaultInt(
"COCKROACH_RAFT_POST_SPLIT_SUPPRESS_SNAPSHOT_TICKS", 20)
)

type lazyHTTPClient struct {
@@ -490,12 +487,6 @@ type RaftConfig struct {
// single raft.Ready operation.
RaftMaxInflightMsgs int

// When a Replica with an empty log (i.e. last index zero), drop rejecting
// MsgAppResp for the first few ticks to allow the split trigger to perform
// the split.
//
// -1 to disable.
RaftPostSplitSuppressSnapshotTicks int
// Splitting a range which has a replica needing a snapshot results in two
// ranges in that state. The delay configured here slows down splits when in
// that situation (limiting to those splits not run through the split
@@ -542,20 +533,14 @@ func (cfg *RaftConfig) SetDefaults() {
cfg.RaftMaxInflightMsgs = defaultRaftMaxInflightMsgs
}

if cfg.RaftPostSplitSuppressSnapshotTicks == 0 {
cfg.RaftPostSplitSuppressSnapshotTicks = defaultRaftPostSplitSuppressSnapshotTicks
}

if cfg.RaftDelaySplitToSuppressSnapshotTicks == 0 {
// The Raft Ticks interval defaults to 200ms, and
// RaftPostSplitSuppressSnapshotTicks to 20 ticks. A total of 120 ticks is
// ~24s which experimentally has been shown to allow the small pile (<100)
// of Raft snapshots observed at the beginning of an import/restore to be
// resolved.
cfg.RaftDelaySplitToSuppressSnapshotTicks = 100
if cfg.RaftPostSplitSuppressSnapshotTicks > 0 {
cfg.RaftDelaySplitToSuppressSnapshotTicks += cfg.RaftPostSplitSuppressSnapshotTicks
}
// The Raft Ticks interval defaults to 200ms, and an election is 15
// ticks. Add a generous amount of ticks to make sure even a backed up
// Raft snapshot queue is going to make progress when a (not overly
// concurrent) amount of splits happens.
// The resulting delay configured here is north of 20s by default, which
// experimentally has shown to be enough.
cfg.RaftDelaySplitToSuppressSnapshotTicks = 3*cfg.RaftElectionTimeoutTicks + 60
}
}
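
For reference: the comment above puts a tick at 200ms and an election at
15 ticks, so the default works out to 3*15 + 60 = 105 ticks, i.e. roughly
21 seconds, consistent with the "north of 20s" figure.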

4 changes: 3 additions & 1 deletion pkg/storage/client_split_test.go
@@ -517,7 +517,9 @@ func TestSplitTriggerRaftSnapshotRace(t *testing.T) {
ctx := context.Background()
const numNodes = 3
var args base.TestClusterArgs
args.ServerArgs.Knobs.Store = &storage.StoreTestingKnobs{DisableMergeQueue: true}
// NB: the merge queue is enabled for additional "chaos". Note that the test
// uses three nodes and so there is no replica movement, which would other-
// wise tickle Raft snapshots for unrelated reasons.
tc := testcluster.StartTestCluster(t, numNodes, args)
defer tc.Stopper().Stop(ctx)

196 changes: 121 additions & 75 deletions pkg/storage/raft.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pkg/storage/raft.proto
@@ -50,6 +50,9 @@ message RaftMessageRequest {
optional uint64 range_id = 1 [(gogoproto.nullable) = false,
(gogoproto.customname) = "RangeID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeID"];
// Optionally, the start key of the sending replica. This is only populated
// as a "hint" under certain conditions.
optional bytes range_start_key = 8 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RKey"];

optional roachpb.ReplicaDescriptor from_replica = 2 [(gogoproto.nullable) = false];
optional roachpb.ReplicaDescriptor to_replica = 3 [(gogoproto.nullable) = false];
5 changes: 3 additions & 2 deletions pkg/storage/raft_transport_test.go
@@ -18,6 +18,7 @@ import (
"context"
"math/rand"
"net"
"reflect"
"testing"
"time"

@@ -36,7 +37,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft/raftpb"
)
@@ -350,7 +350,8 @@ func TestSendAndReceive(t *testing.T) {
if !transports[storeNodes[fromStoreID]].SendAsync(expReq) {
t.Errorf("unable to send message from %d to %d", fromStoreID, toStoreID)
}
if req := <-channels[toStoreID].ch; !proto.Equal(req, expReq) {
// NB: proto.Equal will panic here since it doesn't know about `gogoproto.casttype`.
if req := <-channels[toStoreID].ch; !reflect.DeepEqual(req, expReq) {
t.Errorf("got unexpected message %v on channel %d", req, toStoreID)
}

93 changes: 23 additions & 70 deletions pkg/storage/replica.go
@@ -4995,11 +4995,8 @@ func (r *Replica) sendRaftMessages(ctx context.Context, messages []raftpb.Messag
lastAppResp = message
drop = true
}

if r.maybeDropMsgAppResp(ctx, message) {
drop = true
}
}

if !drop {
r.sendRaftMessage(ctx, message)
}
@@ -5009,73 +5006,12 @@ func (r *Replica) sendRaftMessages(ctx context.Context, messages []raftpb.Messag
}
}

// maybeDropMsgAppResp returns true if the outgoing Raft message should be
// dropped. It does so if sending the message would likely result in an errant
// Raft snapshot after a split.
func (r *Replica) maybeDropMsgAppResp(ctx context.Context, msg raftpb.Message) bool {
if !msg.Reject {
return false
}

r.mu.RLock()
ticks := r.mu.ticks
initialized := r.isInitializedRLocked()
r.mu.RUnlock()

if initialized {
return false
}

if ticks > r.store.cfg.RaftPostSplitSuppressSnapshotTicks {
log.Infof(
ctx,
"allowing MsgAppResp for uninitialized replica (%d > %d ticks)",
ticks,
r.store.cfg.RaftPostSplitSuppressSnapshotTicks,
)
return false
}

if msg.RejectHint != 0 {
log.Fatalf(ctx, "received reject hint %d from supposedly uninitialized replica", msg.RejectHint)
}

// This replica has a blank state, i.e. its last index is zero (because we
// start our Raft log at index 10). In particular, it's not a preemptive
// snapshot. This happens in two cases:
//
// 1. a rebalance operation is adding a new replica of the range to this
// node. We always send a preemptive snapshot before attempting to do so, so
// we wouldn't enter this branch as the replica would be initialized. We
// would however enter this branch if the preemptive snapshot got GC'ed
// before the actual replica change came through.
//
// 2. a split executed that created this replica as its right hand side, but
// this node's pre-split replica hasn't executed the split trigger (yet).
// The expectation is that it will do so momentarily, however if we don't
// drop this rejection, the Raft leader will try to catch us up via a
// snapshot. In 99.9% of cases this is a wasted effort since the pre-split
// replica already contains the data this replica will hold. The remaining
// 0.01% constitute the case in which our local replica of the pre-split
// range requires a snapshot which catches it up "past" the split trigger,
// in which case the trigger will never be executed (the snapshot instead
// wipes out the data the split trigger would've tried to put into this
// range). A similar scenario occurs if there's a rebalance operation that
// rapidly removes the pre-split replica, so that it never catches up (nor
// via log nor via snapshot); in that case too, the Raft snapshot is
// required to materialize the split's right hand side replica (i.e. this
// one). We're delaying the snapshot for a short amount of time only, so
// this seems tolerable.
log.VEventf(ctx, 2, "dropping rejection from index %d to index %d", msg.Index, msg.RejectHint)

return true
}

// sendRaftMessage sends a Raft message.
func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) {
r.mu.Lock()
fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.From), r.mu.lastToReplica)
toReplica, toErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.To), r.mu.lastFromReplica)
var startKey roachpb.RKey
if msg.Type == raftpb.MsgHeartbeat {
if r.mu.replicaID == 0 {
log.Fatalf(ctx, "preemptive snapshot attempted to send a heartbeat: %+v", msg)
@@ -5085,6 +5021,22 @@ func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) {
// update it whenever it sends a heartbeat. In effect, this makes sure
// it always sees itself as alive.
r.mu.lastUpdateTimes.update(r.mu.replicaID, timeutil.Now())
} else if msg.Type == raftpb.MsgApp && r.mu.internalRaftGroup != nil {
// When the follower is potentially an uninitialized replica waiting for
// a split trigger, send the replica's StartKey along. See the method
// below for more context:
_ = maybeDropMsgApp
// NB: this code is allocation free.
r.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, pr raft.Progress) {
if id == msg.To && pr.State == raft.ProgressStateProbe {
// It is moderately expensive to attach a full key to the message, but note that
// a probing follower will only be appended to once per heartbeat interval (i.e.
// on the order of seconds). See:
//
// https://github.com/etcd-io/etcd/blob/7f450bf6967638673dd88fd4e730b01d1303d5ff/raft/progress.go#L41
startKey = r.descRLocked().StartKey
}
})
}
r.mu.Unlock()

@@ -5112,10 +5064,11 @@ func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) {
}

if !r.sendRaftMessageRequest(ctx, &RaftMessageRequest{
RangeID: r.RangeID,
ToReplica: toReplica,
FromReplica: fromReplica,
Message: msg,
RangeID: r.RangeID,
ToReplica: toReplica,
FromReplica: fromReplica,
Message: msg,
RangeStartKey: startKey, // usually nil
}) {
if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) {
r.mu.droppedMessages++
21 changes: 13 additions & 8 deletions pkg/storage/split_delay_helper.go
@@ -60,6 +60,19 @@ func (sdh *splitDelayHelper) ProposeEmptyCommand(ctx context.Context) {
}

func (sdh *splitDelayHelper) NumAttempts() int {
// There is a related mechanism regarding snapshots and splits that is worth
// pointing out here: Incoming MsgApp (see the _ assignment below) are
// dropped if they are addressed to uninitialized replicas likely to become
// initialized via a split trigger. These MsgApp are sent approximately once
// per heartbeat interval, but sometimes there's an additional delay thanks
// to having to wait for a GC run. In effect, it shouldn't take more than a
// small number of heartbeats until the follower leaves probing status, so
// NumAttempts should at least match that.
_ = maybeDropMsgApp // guru assignment
// Snapshots can come up for other reasons and at the end of the day, the
// delay introduced here needs to make sure that the snapshot queue
// processes at a higher rate than splits happen, so the number of attempts
// will typically be much higher than what's suggested by maybeDropMsgApp.
return (*Replica)(sdh).store.cfg.RaftDelaySplitToSuppressSnapshotTicks
}
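
For orientation, NumAttempts bounds the retry loop in
maybeDelaySplitToAvoidSnapshot below. A simplified sketch of that loop's
shape, written against an assumed subset of the splitDelayHelperI
interface (not the exact code):

package splitsketch

import (
	"context"
	"time"
)

// Assumed subset of splitDelayHelperI; these three methods appear in the
// surrounding diff, the loop body itself is a simplification.
type splitDelayHelperI interface {
	ProposeEmptyCommand(ctx context.Context)
	NumAttempts() int
	Sleep(ctx context.Context) time.Duration
}

// delaySplit sketches the delay loop; the real code also inspects the
// Raft status and returns early once no follower is left probing.
func delaySplit(ctx context.Context, sdh splitDelayHelperI) time.Duration {
	var slept time.Duration
	for i := 0; i < sdh.NumAttempts(); i++ {
		// An empty command forces a Raft append, re-probing any follower
		// that is behind and refreshing its progress state.
		sdh.ProposeEmptyCommand(ctx)
		slept += sdh.Sleep(ctx)
	}
	return slept
}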

@@ -76,14 +89,6 @@ func (sdh *splitDelayHelper) Sleep(ctx context.Context) time.Duration {
}

func maybeDelaySplitToAvoidSnapshot(ctx context.Context, sdh splitDelayHelperI) string {
// We have an "optimization" to avoid Raft snapshots by dropping some
// outgoing MsgAppResp (see the _ assignment below) which takes effect for
// RaftPostSplitSuppressSnapshotTicks ticks after an uninitialized replica
// is created. This check can err, in which case the snapshot will be
// delayed for that many ticks, and so we want to delay by at least as much
// plus a bit of padding to give a snapshot a chance to catch the follower
// up. If we run out of time, we'll resume the split no matter what.
_ = (*Replica)(nil).maybeDropMsgAppResp // guru assignment
maxDelaySplitToAvoidSnapshotTicks := sdh.NumAttempts()

var slept time.Duration
