From 6eec1ffad56918f81bc57507f66d7eda17dfd607 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Tue, 4 Dec 2018 15:28:10 +0100 Subject: [PATCH] storage: smarten logic to wait for split trigger We previously "waited" for the split trigger quite literally, by dropping an outgoing rejecting MsgAppResp for a few seconds. The downside of doing so was that it would also artificially delay Raft snapshots that were actually required. With the introduction of a mechanism that delays splits that would cause Raft snapshots, this is a lesser problem than before, but it's still ugly. The revamped mechanism in this commit goes the extra mile to make an informed decision on whether a split trigger is expected to apply: 1. when sending MsgApp to a follower that is being probed, send the start key along with the message 2. when receiving said MsgApp on an uninitialized replica, check whether the store has a replica that currently owns the start key. This replica is the one in charge of applying the split trigger, 3. unless it's waiting for GC (so trigger a check for that). There's also a time-based escape hatch to avoid delaying snapshots indefinitely should there be a flaw in the above logic. 
Release note: None --- pkg/base/config.go | 29 +--- pkg/storage/client_split_test.go | 4 +- pkg/storage/raft.pb.go | 196 ++++++++++++++--------- pkg/storage/raft.proto | 3 + pkg/storage/raft_transport_test.go | 5 +- pkg/storage/replica.go | 93 +++-------- pkg/storage/split_delay_helper.go | 21 ++- pkg/storage/split_trigger_helper.go | 161 +++++++++++++++++++ pkg/storage/split_trigger_helper_test.go | 103 ++++++++++++ pkg/storage/store.go | 7 +- 10 files changed, 442 insertions(+), 180 deletions(-) create mode 100644 pkg/storage/split_trigger_helper.go create mode 100644 pkg/storage/split_trigger_helper_test.go diff --git a/pkg/base/config.go b/pkg/base/config.go index c645fe6208b1..333b4a1195f3 100644 --- a/pkg/base/config.go +++ b/pkg/base/config.go @@ -117,9 +117,6 @@ var ( // will send to a follower without hearing a response. defaultRaftMaxInflightMsgs = envutil.EnvOrDefaultInt( "COCKROACH_RAFT_MAX_INFLIGHT_MSGS", 64) - - defaultRaftPostSplitSuppressSnapshotTicks = envutil.EnvOrDefaultInt( - "COCKROACH_RAFT_POST_SPLIT_SUPPRESS_SNAPSHOT_TICKS", 20) ) type lazyHTTPClient struct { @@ -490,12 +487,6 @@ type RaftConfig struct { // single raft.Ready operation. RaftMaxInflightMsgs int - // When a Replica with an empty log (i.e. last index zero), drop rejecting - // MsgAppResp for the first few ticks to allow the split trigger to perform - // the split. - // - // -1 to disable. - RaftPostSplitSuppressSnapshotTicks int // Splitting a range which has a replica needing a snapshot results in two // ranges in that state. 
The delay configured here slows down splits when in // that situation (limiting to those splits not run through the split @@ -542,20 +533,14 @@ func (cfg *RaftConfig) SetDefaults() { cfg.RaftMaxInflightMsgs = defaultRaftMaxInflightMsgs } - if cfg.RaftPostSplitSuppressSnapshotTicks == 0 { - cfg.RaftPostSplitSuppressSnapshotTicks = defaultRaftPostSplitSuppressSnapshotTicks - } - if cfg.RaftDelaySplitToSuppressSnapshotTicks == 0 { - // The Raft Ticks interval defaults to 200ms, and - // RaftPostSplitSuppressSnapshotTicks to 20 ticks. A total of 120 ticks is - // ~24s which experimentally has been shown to allow the small pile (<100) - // of Raft snapshots observed at the beginning of an import/restore to be - // resolved. - cfg.RaftDelaySplitToSuppressSnapshotTicks = 100 - if cfg.RaftPostSplitSuppressSnapshotTicks > 0 { - cfg.RaftDelaySplitToSuppressSnapshotTicks += cfg.RaftPostSplitSuppressSnapshotTicks - } + // The Raft Ticks interval defaults to 200ms, and an election is 15 + // ticks. Add a generous amount of ticks to make sure even a backed up + // Raft snapshot queue is going to make progress when a (not overly + // concurrent) amount of splits happens. + // The resulting delay configured here is north of 20s by default, which + // experimentally has shown to be enough. + cfg.RaftDelaySplitToSuppressSnapshotTicks = 3*cfg.RaftElectionTimeoutTicks + 60 } } diff --git a/pkg/storage/client_split_test.go b/pkg/storage/client_split_test.go index 89e04a5fecbc..23122054492f 100644 --- a/pkg/storage/client_split_test.go +++ b/pkg/storage/client_split_test.go @@ -517,7 +517,9 @@ func TestSplitTriggerRaftSnapshotRace(t *testing.T) { ctx := context.Background() const numNodes = 3 var args base.TestClusterArgs - args.ServerArgs.Knobs.Store = &storage.StoreTestingKnobs{DisableMergeQueue: true} + // NB: the merge queue is enabled for additional "chaos". 
Note that the test + // uses three nodes and so there is no replica movement, which would other- + // wise tickle Raft snapshots for unrelated reasons. tc := testcluster.StartTestCluster(t, numNodes, args) defer tc.Stopper().Stop(ctx) diff --git a/pkg/storage/raft.pb.go b/pkg/storage/raft.pb.go index 5a44ff148dc6..dc2433e98bcf 100644 --- a/pkg/storage/raft.pb.go +++ b/pkg/storage/raft.pb.go @@ -172,10 +172,13 @@ func (*RaftHeartbeat) Descriptor() ([]byte, []int) { return fileDescriptorRaft, // as a dummy message and discarded. A coalesced heartbeat request's replica // descriptor's range ID must be zero. type RaftMessageRequest struct { - RangeID github_com_cockroachdb_cockroach_pkg_roachpb.RangeID `protobuf:"varint,1,opt,name=range_id,json=rangeId,casttype=github.com/cockroachdb/cockroach/pkg/roachpb.RangeID" json:"range_id"` - FromReplica cockroach_roachpb.ReplicaDescriptor `protobuf:"bytes,2,opt,name=from_replica,json=fromReplica" json:"from_replica"` - ToReplica cockroach_roachpb.ReplicaDescriptor `protobuf:"bytes,3,opt,name=to_replica,json=toReplica" json:"to_replica"` - Message raftpb.Message `protobuf:"bytes,4,opt,name=message" json:"message"` + RangeID github_com_cockroachdb_cockroach_pkg_roachpb.RangeID `protobuf:"varint,1,opt,name=range_id,json=rangeId,casttype=github.com/cockroachdb/cockroach/pkg/roachpb.RangeID" json:"range_id"` + // Optionally, the start key of the sending replica. This is only populated + // as a "hint" under certain conditions. 
+ RangeStartKey github_com_cockroachdb_cockroach_pkg_roachpb.RKey `protobuf:"bytes,8,opt,name=range_start_key,json=rangeStartKey,casttype=github.com/cockroachdb/cockroach/pkg/roachpb.RKey" json:"range_start_key,omitempty"` + FromReplica cockroach_roachpb.ReplicaDescriptor `protobuf:"bytes,2,opt,name=from_replica,json=fromReplica" json:"from_replica"` + ToReplica cockroach_roachpb.ReplicaDescriptor `protobuf:"bytes,3,opt,name=to_replica,json=toReplica" json:"to_replica"` + Message raftpb.Message `protobuf:"bytes,4,opt,name=message" json:"message"` // Is this a quiesce request? A quiesce request is a MsgHeartbeat // which is requesting the recipient to stop ticking its local // replica as long as the current Raft state matches the heartbeat @@ -599,6 +602,12 @@ func (m *RaftMessageRequest) MarshalTo(dAtA []byte) (int, error) { i += n } } + if m.RangeStartKey != nil { + dAtA[i] = 0x42 + i++ + i = encodeVarintRaft(dAtA, i, uint64(len(m.RangeStartKey))) + i += copy(dAtA[i:], m.RangeStartKey) + } return i, nil } @@ -911,6 +920,10 @@ func (m *RaftMessageRequest) Size() (n int) { n += 1 + l + sovRaft(uint64(l)) } } + if m.RangeStartKey != nil { + l = len(m.RangeStartKey) + n += 1 + l + sovRaft(uint64(l)) + } return n } @@ -1421,6 +1434,37 @@ func (m *RaftMessageRequest) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 8: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field RangeStartKey", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthRaft + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.RangeStartKey = append(m.RangeStartKey[:0], dAtA[iNdEx:postIndex]...) 
+ if m.RangeStartKey == nil { + m.RangeStartKey = []byte{} + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRaft(dAtA[iNdEx:]) @@ -2461,75 +2505,77 @@ var ( func init() { proto.RegisterFile("storage/raft.proto", fileDescriptorRaft) } var fileDescriptorRaft = []byte{ - // 1106 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe4, 0x56, 0xdd, 0x6e, 0x1b, 0x45, - 0x14, 0xf6, 0xc6, 0xff, 0xc7, 0x76, 0xb3, 0x1d, 0x2a, 0x58, 0x19, 0x70, 0xcc, 0x96, 0x56, 0xa6, - 0x48, 0xeb, 0xca, 0xaa, 0xb8, 0xe0, 0xce, 0x3f, 0xdb, 0xc4, 0xcd, 0xaf, 0x36, 0x69, 0x10, 0x48, - 0x95, 0x35, 0x5e, 0x8f, 0xed, 0x55, 0xec, 0x9d, 0xcd, 0xee, 0xb8, 0x90, 0x3e, 0x05, 0x4f, 0x80, - 0xb8, 0xe1, 0x86, 0x17, 0xe0, 0x15, 0x72, 0x83, 0xc4, 0x65, 0x2f, 0x50, 0x04, 0xe1, 0x2d, 0x10, - 0x17, 0x68, 0x66, 0x67, 0x1c, 0x27, 0x31, 0x34, 0x41, 0x88, 0x1b, 0x6e, 0x92, 0xdd, 0x33, 0xf3, - 0x9d, 0x6f, 0xe7, 0x7c, 0xdf, 0x39, 0x63, 0x40, 0x11, 0xa3, 0x21, 0x1e, 0x91, 0x7a, 0x88, 0x87, - 0xcc, 0x0a, 0x42, 0xca, 0x28, 0xba, 0xeb, 0x52, 0xf7, 0x28, 0xa4, 0xd8, 0x1d, 0x5b, 0x72, 0xb5, - 0x7c, 0x4f, 0xbc, 0x06, 0xfd, 0x3a, 0x09, 0x43, 0x1a, 0x46, 0xf1, 0xc6, 0xf2, 0xdb, 0x2a, 0x3a, - 0x25, 0x0c, 0x0f, 0x30, 0xc3, 0x32, 0xfe, 0xbe, 0x4a, 0x2a, 0xff, 0x07, 0xfd, 0x7a, 0xc4, 0x30, - 0x23, 0x72, 0xf9, 0x5d, 0xc2, 0xdc, 0x81, 0x20, 0x14, 0x7f, 0x82, 0xfe, 0x02, 0x79, 0xf9, 0xde, - 0x88, 0x8e, 0xa8, 0x78, 0xac, 0xf3, 0xa7, 0x38, 0x6a, 0xfe, 0x90, 0x84, 0x92, 0x83, 0x87, 0x6c, - 0x83, 0xe0, 0x90, 0xf5, 0x09, 0x66, 0xa8, 0x0f, 0xb9, 0x10, 0xfb, 0x23, 0xd2, 0xf3, 0x06, 0x86, - 0x56, 0xd5, 0x6a, 0xa9, 0xd6, 0xfa, 0xe9, 0xd9, 0x5a, 0xe2, 0xfc, 0x6c, 0x2d, 0xeb, 0xf0, 0x78, - 0xb7, 0xf3, 0xfb, 0xd9, 0xda, 0x93, 0x91, 0xc7, 0xc6, 0xb3, 0xbe, 0xe5, 0xd2, 0x69, 0x7d, 0x7e, - 0xa8, 0x41, 0xff, 0xe2, 0xb9, 0x1e, 0x1c, 0x8d, 0xea, 0xf2, 0x14, 0x96, 0xc4, 0x39, 0x59, 0x91, - 0xb8, 0x3b, 0x40, 0x5f, 0xc2, 0xea, 0x30, 0xa4, 0xd3, 0x5e, 0x48, 0x82, 0x89, 0xe7, 0x62, 
0x4e, - 0xb5, 0x52, 0xd5, 0x6a, 0xa5, 0xd6, 0xae, 0xa4, 0x2a, 0x3d, 0x0d, 0xe9, 0xd4, 0x89, 0x57, 0x05, - 0xe1, 0x27, 0xb7, 0x23, 0x54, 0x48, 0xa7, 0x34, 0x5c, 0x48, 0x34, 0x40, 0xc7, 0x50, 0x62, 0x74, - 0x91, 0x36, 0x29, 0x68, 0xb7, 0x25, 0x6d, 0xe1, 0x80, 0xfe, 0x1b, 0xa4, 0x05, 0x46, 0x2f, 0x28, - 0x0d, 0x48, 0x31, 0x12, 0x4e, 0x8d, 0x94, 0xa8, 0x65, 0x8a, 0x33, 0x39, 0x22, 0x82, 0xde, 0x83, - 0x8c, 0x4b, 0xa7, 0x53, 0x8f, 0x19, 0xe9, 0x85, 0x35, 0x19, 0x43, 0x15, 0xc8, 0x1e, 0xcf, 0x3c, - 0x12, 0xb9, 0xc4, 0xc8, 0x54, 0xb5, 0x5a, 0x4e, 0x2e, 0xab, 0xa0, 0xf9, 0x47, 0x12, 0x10, 0x57, - 0x6e, 0x9b, 0x44, 0x11, 0x1e, 0x11, 0x87, 0x1c, 0xcf, 0x48, 0xf4, 0xdf, 0xc8, 0xb7, 0x0d, 0xc5, - 0x45, 0xf9, 0x84, 0x76, 0x85, 0xc6, 0x87, 0xd6, 0x85, 0xbd, 0xaf, 0xd4, 0xa4, 0x43, 0x22, 0x37, - 0xf4, 0x02, 0x46, 0x43, 0x79, 0x8a, 0xc2, 0x82, 0x2c, 0xa8, 0x0b, 0x70, 0x21, 0x8a, 0x50, 0xe4, - 0x76, 0xc9, 0xf2, 0xf3, 0x72, 0xa3, 0x3a, 0x64, 0xa7, 0x71, 0x3d, 0x44, 0xbd, 0x0b, 0x8d, 0x55, - 0x2b, 0xee, 0x04, 0x4b, 0x96, 0x49, 0x55, 0x51, 0xee, 0x5a, 0xac, 0x72, 0x7a, 0x49, 0x95, 0xd1, - 0x53, 0x80, 0xb1, 0x6a, 0x8d, 0xc8, 0xc8, 0x54, 0x93, 0xb5, 0x42, 0xa3, 0x6a, 0x5d, 0xeb, 0x63, - 0xeb, 0x52, 0x0f, 0xc9, 0x24, 0x0b, 0x48, 0xb4, 0x0b, 0xab, 0xf3, 0xb7, 0x5e, 0x48, 0xa2, 0x20, - 0x32, 0xb2, 0xb7, 0x4a, 0x76, 0x67, 0x0e, 0x77, 0x38, 0xda, 0xec, 0xc3, 0x3b, 0xd7, 0xd5, 0x6f, - 0x61, 0xe6, 0x8e, 0xd1, 0x3a, 0xe4, 0xc2, 0xf8, 0x3d, 0x32, 0x34, 0x41, 0xf2, 0xe0, 0x2f, 0x48, - 0xae, 0xa0, 0x63, 0xa6, 0x39, 0xd8, 0xdc, 0x03, 0xe3, 0xd2, 0xae, 0x28, 0xa0, 0x7e, 0x44, 0x9e, - 0xfb, 0x1e, 0xf5, 0x91, 0x05, 0x69, 0x31, 0xb2, 0x84, 0xc9, 0x0a, 0x0d, 0x63, 0x89, 0x5e, 0x36, - 0x5f, 0x77, 0xe2, 0x6d, 0x9f, 0xa6, 0x4e, 0xbf, 0x5d, 0xd3, 0xcc, 0x9f, 0x57, 0xe0, 0xad, 0x25, - 0x29, 0xff, 0xe7, 0xae, 0x5d, 0x87, 0xf4, 0x8c, 0x17, 0x55, 0x7a, 0xf6, 0xe3, 0x37, 0xa9, 0xb5, - 0xa0, 0x83, 0x4c, 0x16, 0xe3, 0xcd, 0xef, 0xd3, 0xb0, 0xba, 0xef, 0xe3, 0x20, 0x1a, 0x53, 0xa6, - 0x06, 0x42, 0x13, 0x32, 0x63, 
0x82, 0x07, 0x44, 0x29, 0xf5, 0xd1, 0x92, 0xec, 0x57, 0x30, 0xd6, - 0x86, 0x00, 0x38, 0x12, 0x88, 0x1e, 0x42, 0xee, 0xe8, 0x65, 0xaf, 0xcf, 0xcd, 0x25, 0xaa, 0x56, - 0x6c, 0x15, 0xb8, 0x32, 0x9b, 0x87, 0xc2, 0x6f, 0x4e, 0xf6, 0xe8, 0x65, 0x6c, 0xbc, 0x35, 0x28, - 0x4c, 0xe8, 0xa8, 0x47, 0x7c, 0x16, 0x7a, 0x24, 0x32, 0x92, 0xd5, 0x64, 0xad, 0xe8, 0xc0, 0x84, - 0x8e, 0xec, 0x38, 0x82, 0xca, 0x90, 0x1e, 0x7a, 0x3e, 0x9e, 0x88, 0x83, 0xaa, 0x5e, 0x8b, 0x43, - 0xe5, 0x6f, 0x92, 0x90, 0x89, 0x79, 0xd1, 0x0b, 0xb8, 0xc7, 0xbb, 0xb6, 0x27, 0x9b, 0xb4, 0x27, - 0x0d, 0x29, 0x15, 0xbb, 0x95, 0x99, 0x51, 0x78, 0x7d, 0x44, 0xde, 0x07, 0x88, 0xcd, 0x16, 0x79, - 0xaf, 0x88, 0x50, 0x2e, 0xa9, 0x34, 0x11, 0xf1, 0x7d, 0xef, 0x15, 0x41, 0x0f, 0xa0, 0xe0, 0x62, - 0xbf, 0x37, 0x20, 0xee, 0xc4, 0xf3, 0xc9, 0xa5, 0x0f, 0x06, 0x17, 0xfb, 0x9d, 0x38, 0x8e, 0x6c, - 0x48, 0x8b, 0x1b, 0x58, 0x4c, 0x8f, 0xe5, 0xc5, 0x9d, 0xdf, 0xd5, 0xca, 0x0a, 0xfb, 0x1c, 0xa0, - 0x0e, 0x2f, 0xd0, 0x68, 0x1b, 0x72, 0x41, 0xe8, 0xd1, 0xd0, 0x63, 0x27, 0x62, 0xda, 0xdf, 0x59, - 0x6a, 0x82, 0xab, 0x32, 0xed, 0x49, 0x88, 0x6a, 0x5c, 0x95, 0x82, 0xa7, 0x8b, 0x58, 0x88, 0x19, - 0x19, 0x9d, 0x18, 0xd9, 0x1b, 0xa7, 0xdb, 0x97, 0x10, 0x95, 0x4e, 0xa5, 0x78, 0x96, 0xca, 0x69, - 0xfa, 0x8a, 0xf9, 0x04, 0x72, 0x8a, 0x10, 0x15, 0x20, 0xfb, 0x7c, 0x67, 0x73, 0x67, 0xf7, 0xb3, - 0x1d, 0x3d, 0x81, 0x8a, 0x90, 0x73, 0xec, 0xf6, 0xee, 0xa1, 0xed, 0x7c, 0xae, 0x6b, 0xa8, 0x04, - 0x79, 0xc7, 0x6e, 0x35, 0xb7, 0x9a, 0x3b, 0x6d, 0x5b, 0x5f, 0x31, 0x0d, 0xc8, 0xa9, 0xbc, 0x7c, - 0xe3, 0xe6, 0x61, 0xaf, 0xd5, 0x3c, 0x68, 0x6f, 0xe8, 0x09, 0xf3, 0x47, 0x0d, 0xf4, 0x8b, 0x4f, - 0x90, 0x83, 0x60, 0x03, 0x32, 0xbc, 0x22, 0xb3, 0x48, 0xb8, 0xf5, 0x4e, 0xe3, 0xd1, 0xdf, 0x7e, - 0x77, 0x0c, 0xb2, 0xf6, 0x05, 0x42, 0xdd, 0x9f, 0x31, 0x9e, 0x4f, 0x76, 0x75, 0x15, 0x70, 0xdf, - 0xe4, 0xaf, 0x4c, 0x7e, 0xb3, 0x0b, 0x99, 0x18, 0x77, 0xed, 0x30, 0xcd, 0x76, 0xdb, 0xde, 0x3b, - 0xb0, 0x3b, 0xba, 0xc6, 0x97, 0x9a, 0x7b, 0x7b, 0x5b, 0x5d, 0xbb, 
0xa3, 0xaf, 0xa0, 0x3c, 0xa4, - 0x6d, 0xc7, 0xd9, 0x75, 0xf4, 0x24, 0xdf, 0xd5, 0xb1, 0xdb, 0x5b, 0xdd, 0x1d, 0xbb, 0xa3, 0xa7, - 0x9e, 0xa5, 0x72, 0x49, 0x3d, 0x65, 0x7e, 0xa7, 0xc1, 0xdd, 0x36, 0xf5, 0x87, 0xed, 0x31, 0x37, - 0x51, 0x9b, 0xfa, 0x8c, 0x7c, 0xc5, 0xd0, 0x63, 0x00, 0x7e, 0xa1, 0x63, 0x7f, 0xa0, 0x66, 0x5b, - 0xbe, 0x75, 0x57, 0xce, 0xb6, 0x7c, 0x3b, 0x5e, 0xe9, 0x76, 0x9c, 0xbc, 0xdc, 0x24, 0x7e, 0x30, - 0x64, 0x03, 0x7c, 0x32, 0xa1, 0x38, 0xfe, 0x51, 0x54, 0x74, 0xd4, 0x2b, 0xea, 0x40, 0xf6, 0x9f, - 0xcf, 0x1b, 0x05, 0x6d, 0xbc, 0xd6, 0x20, 0xbf, 0x3d, 0x9b, 0x30, 0x8f, 0x37, 0x0d, 0x9a, 0x80, - 0xbe, 0xd0, 0x3c, 0x71, 0x1f, 0x3f, 0xba, 0x59, 0x87, 0xf1, 0xbd, 0xe5, 0x87, 0x37, 0x1b, 0x56, - 0x66, 0xa2, 0xa6, 0x3d, 0xd6, 0xd0, 0x0b, 0x28, 0xf2, 0x45, 0xa5, 0x20, 0x32, 0xdf, 0x6c, 0xcb, - 0xf2, 0xfd, 0x1b, 0x58, 0x20, 0x4e, 0xdf, 0xfa, 0xe0, 0xf4, 0xd7, 0x4a, 0xe2, 0xf4, 0xbc, 0xa2, - 0xfd, 0x74, 0x5e, 0xd1, 0x5e, 0x9f, 0x57, 0xb4, 0x5f, 0xce, 0x2b, 0xda, 0xd7, 0xbf, 0x55, 0x12, - 0x5f, 0x64, 0x25, 0xf2, 0xcf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x57, 0x2f, 0x26, 0xf8, 0x98, 0x0b, - 0x00, 0x00, + // 1147 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe4, 0x56, 0xdd, 0x6e, 0x1b, 0xc5, + 0x17, 0xf7, 0xc6, 0xdf, 0xc7, 0x76, 0xb3, 0x9d, 0x7f, 0xf5, 0x67, 0x65, 0xc0, 0x36, 0x5b, 0x5a, + 0x99, 0x22, 0xad, 0x8b, 0x55, 0xb8, 0xe0, 0xce, 0x1f, 0xdb, 0xc6, 0x4d, 0xf3, 0xa1, 0x4d, 0x5b, + 0x04, 0x52, 0x65, 0x8d, 0xd7, 0x63, 0x7b, 0x15, 0x7b, 0x67, 0xbb, 0x3b, 0x2e, 0xb8, 0x4f, 0xc1, + 0x13, 0x20, 0x6e, 0xb8, 0xe1, 0x05, 0x78, 0x85, 0xdc, 0x20, 0x71, 0x59, 0x09, 0x14, 0x41, 0x78, + 0x8b, 0x5e, 0xa1, 0x99, 0x9d, 0x71, 0x9c, 0xc4, 0xd0, 0x04, 0x21, 0x6e, 0xb8, 0x49, 0xbc, 0xe7, + 0xcc, 0xef, 0x77, 0xf6, 0x9c, 0xdf, 0x39, 0x67, 0x16, 0x50, 0xc4, 0x68, 0x88, 0xc7, 0xa4, 0x11, + 0xe2, 0x11, 0xb3, 0x82, 0x90, 0x32, 0x8a, 0xae, 0xbb, 0xd4, 0x3d, 0x0c, 0x29, 0x76, 0x27, 0x96, + 0xf4, 0x96, 0x6f, 0x88, 0xc7, 0x60, 0xd0, 
0x20, 0x61, 0x48, 0xc3, 0x28, 0x3e, 0x58, 0xfe, 0xbf, + 0xb2, 0xce, 0x08, 0xc3, 0x43, 0xcc, 0xb0, 0xb4, 0xbf, 0xab, 0x48, 0xe5, 0xff, 0x60, 0xd0, 0x88, + 0x18, 0x66, 0x44, 0xba, 0xdf, 0x26, 0xcc, 0x1d, 0x8a, 0x80, 0xe2, 0x4f, 0x30, 0x58, 0x09, 0x5e, + 0xbe, 0x31, 0xa6, 0x63, 0x2a, 0x7e, 0x36, 0xf8, 0xaf, 0xd8, 0x6a, 0xfe, 0x90, 0x84, 0x92, 0x83, + 0x47, 0x6c, 0x8b, 0xe0, 0x90, 0x0d, 0x08, 0x66, 0x68, 0x00, 0xb9, 0x10, 0xfb, 0x63, 0xd2, 0xf7, + 0x86, 0x86, 0x56, 0xd3, 0xea, 0xa9, 0xf6, 0x83, 0xa3, 0xe3, 0x6a, 0xe2, 0xe4, 0xb8, 0x9a, 0x75, + 0xb8, 0xbd, 0xd7, 0x7d, 0x7d, 0x5c, 0xbd, 0x37, 0xf6, 0xd8, 0x64, 0x3e, 0xb0, 0x5c, 0x3a, 0x6b, + 0x2c, 0x93, 0x1a, 0x0e, 0x4e, 0x7f, 0x37, 0x82, 0xc3, 0x71, 0x43, 0x66, 0x61, 0x49, 0x9c, 0x93, + 0x15, 0xc4, 0xbd, 0x21, 0xfa, 0x12, 0x36, 0x47, 0x21, 0x9d, 0xf5, 0x43, 0x12, 0x4c, 0x3d, 0x17, + 0xf3, 0x50, 0x1b, 0x35, 0xad, 0x5e, 0x6a, 0xef, 0xc9, 0x50, 0xa5, 0xfb, 0x21, 0x9d, 0x39, 0xb1, + 0x57, 0x04, 0xfc, 0xe4, 0x6a, 0x01, 0x15, 0xd2, 0x29, 0x8d, 0x56, 0x88, 0x86, 0xe8, 0x39, 0x94, + 0x18, 0x5d, 0x0d, 0x9b, 0x14, 0x61, 0x77, 0x64, 0xd8, 0xc2, 0x63, 0xfa, 0x4f, 0x04, 0x2d, 0x30, + 0x7a, 0x1a, 0xd2, 0x80, 0x14, 0x23, 0xe1, 0xcc, 0x48, 0x89, 0x5a, 0xa6, 0x78, 0x24, 0x47, 0x58, + 0xd0, 0x3b, 0x90, 0x71, 0xe9, 0x6c, 0xe6, 0x31, 0x23, 0xbd, 0xe2, 0x93, 0x36, 0x54, 0x81, 0xec, + 0xf3, 0xb9, 0x47, 0x22, 0x97, 0x18, 0x99, 0x9a, 0x56, 0xcf, 0x49, 0xb7, 0x32, 0x9a, 0x3f, 0xa7, + 0x00, 0x71, 0xe5, 0x76, 0x48, 0x14, 0xe1, 0x31, 0x71, 0xc8, 0xf3, 0x39, 0x89, 0xfe, 0x1d, 0xf9, + 0x76, 0xa0, 0xb8, 0x2a, 0x9f, 0xd0, 0xae, 0xd0, 0x7c, 0xdf, 0x3a, 0x6d, 0xef, 0x73, 0x35, 0xe9, + 0x92, 0xc8, 0x0d, 0xbd, 0x80, 0xd1, 0x50, 0x66, 0x51, 0x58, 0x91, 0x05, 0xf5, 0x00, 0x4e, 0x45, + 0x11, 0x8a, 0x5c, 0x8d, 0x2c, 0xbf, 0x2c, 0x37, 0x6a, 0x40, 0x76, 0x16, 0xd7, 0x43, 0xd4, 0xbb, + 0xd0, 0xdc, 0xb4, 0xe2, 0x49, 0xb0, 0x64, 0x99, 0x54, 0x15, 0xe5, 0xa9, 0xd5, 0x2a, 0xa7, 0xd7, + 0x54, 0x19, 0xdd, 0x07, 0x98, 0xa8, 0xd1, 0x88, 0x8c, 0x4c, 0x2d, 0x59, 0x2f, 
0x34, 0x6b, 0xd6, + 0x85, 0x39, 0xb6, 0xce, 0xcc, 0x90, 0x24, 0x59, 0x41, 0xa2, 0x3d, 0xd8, 0x5c, 0x3e, 0xf5, 0x43, + 0x12, 0x05, 0x91, 0x91, 0xbd, 0x12, 0xd9, 0xb5, 0x25, 0xdc, 0xe1, 0x68, 0xf4, 0x0c, 0x36, 0x63, + 0x9d, 0x23, 0x86, 0x43, 0xd6, 0x3f, 0x24, 0x0b, 0x23, 0x57, 0xd3, 0xea, 0xc5, 0xf6, 0xc7, 0xaf, + 0x8f, 0xab, 0x1f, 0x5d, 0x4d, 0xdf, 0x6d, 0xb2, 0x70, 0x4a, 0x82, 0xed, 0x80, 0x93, 0x6d, 0x93, + 0x85, 0x39, 0x80, 0xb7, 0x2e, 0x36, 0x57, 0x1b, 0x33, 0x77, 0x82, 0x1e, 0x40, 0x2e, 0x8c, 0x9f, + 0x23, 0x43, 0x13, 0x39, 0xdc, 0xfa, 0x93, 0x1c, 0xce, 0xa1, 0xe3, 0x44, 0x96, 0x60, 0x73, 0x1f, + 0x8c, 0x33, 0xa7, 0xa2, 0x80, 0xfa, 0x11, 0x79, 0xe2, 0x7b, 0xd4, 0x47, 0x16, 0xa4, 0xc5, 0x46, + 0x14, 0x3d, 0x5c, 0x68, 0x1a, 0x6b, 0xda, 0xc1, 0xe6, 0x7e, 0x27, 0x3e, 0xf6, 0x69, 0xea, 0xe8, + 0xdb, 0xaa, 0x66, 0xfe, 0xb2, 0x01, 0xff, 0x5b, 0x43, 0xf9, 0x1f, 0x1f, 0x8a, 0x07, 0x90, 0x9e, + 0xf3, 0xa2, 0xca, 0x91, 0xf8, 0xf0, 0x4d, 0x6a, 0xad, 0xe8, 0x20, 0xc9, 0x62, 0xbc, 0xf9, 0x7d, + 0x1a, 0x36, 0x0f, 0x7c, 0x1c, 0x44, 0x13, 0xca, 0xd4, 0xbe, 0x69, 0x41, 0x66, 0x42, 0xf0, 0x90, + 0x28, 0xa5, 0x3e, 0x58, 0xc3, 0x7e, 0x0e, 0x63, 0x6d, 0x09, 0x80, 0x23, 0x81, 0xe8, 0x36, 0xe4, + 0x0e, 0x5f, 0xf4, 0x07, 0xbc, 0xb9, 0x44, 0xd5, 0x8a, 0xed, 0x02, 0x57, 0x66, 0xfb, 0xa9, 0xe8, + 0x37, 0x27, 0x7b, 0xf8, 0x22, 0x6e, 0xbc, 0x2a, 0x14, 0xa6, 0x74, 0xdc, 0x27, 0x3e, 0x0b, 0x3d, + 0x12, 0x19, 0xc9, 0x5a, 0xb2, 0x5e, 0x74, 0x60, 0x4a, 0xc7, 0x76, 0x6c, 0x41, 0x65, 0x48, 0x8f, + 0x3c, 0x1f, 0x4f, 0x45, 0xa2, 0x6a, 0x94, 0x63, 0x53, 0xf9, 0x9b, 0x24, 0x64, 0xe2, 0xb8, 0xe8, + 0x19, 0xdc, 0xe0, 0x4b, 0xa1, 0x2f, 0x77, 0x40, 0x5f, 0x36, 0xa4, 0x54, 0xec, 0x4a, 0xcd, 0x8c, + 0xc2, 0x8b, 0x1b, 0xf8, 0x26, 0x80, 0x9c, 0x4c, 0xef, 0x25, 0x11, 0xca, 0x25, 0x95, 0x26, 0xf1, + 0x8c, 0x79, 0x2f, 0x09, 0xba, 0x05, 0x05, 0x17, 0xfb, 0xfd, 0x21, 0x71, 0xa7, 0x9e, 0x4f, 0xce, + 0xbc, 0x30, 0xb8, 0xd8, 0xef, 0xc6, 0x76, 0x64, 0x43, 0x5a, 0x5c, 0xf0, 0x62, 0x39, 0xad, 0x2f, + 0xee, 0xf2, 0x53, 
0x40, 0xb5, 0xc2, 0x01, 0x07, 0xa8, 0xe4, 0x05, 0x1a, 0xed, 0x40, 0x2e, 0x08, + 0x3d, 0x1a, 0x7a, 0x6c, 0x21, 0x2e, 0x93, 0x6b, 0x6b, 0x9b, 0xe0, 0xbc, 0x4c, 0xfb, 0x12, 0xa2, + 0x06, 0x57, 0x51, 0x70, 0xba, 0x88, 0x85, 0x98, 0x91, 0xf1, 0xc2, 0xc8, 0x5e, 0x9a, 0xee, 0x40, + 0x42, 0x14, 0x9d, 0xa2, 0x78, 0x98, 0xca, 0x69, 0xfa, 0x86, 0x79, 0x0f, 0x72, 0x2a, 0x20, 0x2a, + 0x40, 0xf6, 0xc9, 0xee, 0xf6, 0xee, 0xde, 0x67, 0xbb, 0x7a, 0x02, 0x15, 0x21, 0xe7, 0xd8, 0x9d, + 0xbd, 0xa7, 0xb6, 0xf3, 0xb9, 0xae, 0xa1, 0x12, 0xe4, 0x1d, 0xbb, 0xdd, 0x7a, 0xd4, 0xda, 0xed, + 0xd8, 0xfa, 0x86, 0x69, 0x40, 0x4e, 0xf1, 0xf2, 0x83, 0xdb, 0x4f, 0xfb, 0xed, 0xd6, 0xe3, 0xce, + 0x96, 0x9e, 0x30, 0x7f, 0xd4, 0x40, 0x3f, 0x7d, 0x05, 0xb9, 0x08, 0xb6, 0x20, 0xc3, 0x2b, 0x32, + 0x8f, 0x44, 0xb7, 0x5e, 0x6b, 0xde, 0xf9, 0xcb, 0xf7, 0x8e, 0x41, 0xd6, 0x81, 0x40, 0xa8, 0xeb, + 0x39, 0xc6, 0xf3, 0x8b, 0x43, 0xdd, 0x34, 0xbc, 0x6f, 0xf2, 0xe7, 0x2e, 0x16, 0xb3, 0x07, 0x99, + 0x18, 0x77, 0x21, 0x99, 0x56, 0xa7, 0x63, 0xef, 0x3f, 0xb6, 0xbb, 0xba, 0xc6, 0x5d, 0xad, 0xfd, + 0xfd, 0x47, 0x3d, 0xbb, 0xab, 0x6f, 0xa0, 0x3c, 0xa4, 0x6d, 0xc7, 0xd9, 0x73, 0xf4, 0x24, 0x3f, + 0xd5, 0xb5, 0x3b, 0x8f, 0x7a, 0xbb, 0x76, 0x57, 0x4f, 0x3d, 0x4c, 0xe5, 0x92, 0x7a, 0xca, 0xfc, + 0x4e, 0x83, 0xeb, 0x1d, 0xea, 0x8f, 0x3a, 0x13, 0xde, 0x44, 0x1d, 0xea, 0x33, 0xf2, 0x15, 0x43, + 0x77, 0x01, 0xf8, 0xf7, 0x02, 0xf6, 0x87, 0x6a, 0xb7, 0xe5, 0xdb, 0xd7, 0xe5, 0x6e, 0xcb, 0x77, + 0x62, 0x4f, 0xaf, 0xeb, 0xe4, 0xe5, 0x21, 0xf1, 0x3d, 0x92, 0x0d, 0xf0, 0x62, 0x4a, 0x71, 0xfc, + 0xcd, 0x55, 0x74, 0xd4, 0x23, 0xea, 0x42, 0xf6, 0xef, 0xef, 0x1b, 0x05, 0x6d, 0xbe, 0xd2, 0x20, + 0xbf, 0x33, 0x9f, 0x32, 0x8f, 0x0f, 0x0d, 0x9a, 0x82, 0xbe, 0x32, 0x3c, 0xf1, 0x1c, 0xdf, 0xb9, + 0xdc, 0x84, 0xf1, 0xb3, 0xe5, 0xdb, 0x97, 0x5b, 0x56, 0x66, 0xa2, 0xae, 0xdd, 0xd5, 0xd0, 0x33, + 0x28, 0x72, 0xa7, 0x52, 0x10, 0x99, 0x6f, 0x6e, 0xcb, 0xf2, 0xcd, 0x4b, 0xb4, 0x40, 0x4c, 0xdf, + 0x7e, 0xef, 0xe8, 0xb7, 0x4a, 0xe2, 0xe8, 0xa4, 0xa2, 
0xfd, 0x74, 0x52, 0xd1, 0x5e, 0x9d, 0x54, + 0xb4, 0x5f, 0x4f, 0x2a, 0xda, 0xd7, 0xbf, 0x57, 0x12, 0x5f, 0x64, 0x25, 0xf2, 0x8f, 0x00, 0x00, + 0x00, 0xff, 0xff, 0xd4, 0x72, 0xff, 0x08, 0xf7, 0x0b, 0x00, 0x00, } diff --git a/pkg/storage/raft.proto b/pkg/storage/raft.proto index 7a79e5faa3c3..6a9faa355c01 100644 --- a/pkg/storage/raft.proto +++ b/pkg/storage/raft.proto @@ -50,6 +50,9 @@ message RaftMessageRequest { optional uint64 range_id = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "RangeID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeID"]; + // Optionally, the start key of the sending replica. This is only populated + // as a "hint" under certain conditions. + optional bytes range_start_key = 8 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RKey"]; optional roachpb.ReplicaDescriptor from_replica = 2 [(gogoproto.nullable) = false]; optional roachpb.ReplicaDescriptor to_replica = 3 [(gogoproto.nullable) = false]; diff --git a/pkg/storage/raft_transport_test.go b/pkg/storage/raft_transport_test.go index 6fc3c9d094da..70c462fc80d0 100644 --- a/pkg/storage/raft_transport_test.go +++ b/pkg/storage/raft_transport_test.go @@ -18,6 +18,7 @@ import ( "context" "math/rand" "net" + "reflect" "testing" "time" @@ -36,7 +37,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/netutil" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/tracing" - "github.com/gogo/protobuf/proto" "github.com/pkg/errors" "go.etcd.io/etcd/raft/raftpb" ) @@ -350,7 +350,8 @@ func TestSendAndReceive(t *testing.T) { if !transports[storeNodes[fromStoreID]].SendAsync(expReq) { t.Errorf("unable to send message from %d to %d", fromStoreID, toStoreID) } - if req := <-channels[toStoreID].ch; !proto.Equal(req, expReq) { + // NB: proto.Equal will panic here since it doesn't know about `gogoproto.casttype`. 
+ if req := <-channels[toStoreID].ch; !reflect.DeepEqual(req, expReq) { t.Errorf("got unexpected message %v on channel %d", req, toStoreID) } diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 1a36f34305d7..11fc24b53d34 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -4995,11 +4995,8 @@ func (r *Replica) sendRaftMessages(ctx context.Context, messages []raftpb.Messag lastAppResp = message drop = true } - - if r.maybeDropMsgAppResp(ctx, message) { - drop = true - } } + if !drop { r.sendRaftMessage(ctx, message) } @@ -5009,73 +5006,12 @@ func (r *Replica) sendRaftMessages(ctx context.Context, messages []raftpb.Messag } } -// maybeDropMsgAppResp returns true if the outgoing Raft message should be -// dropped. It does so if sending the message would likely result in an errant -// Raft snapshot after a split. -func (r *Replica) maybeDropMsgAppResp(ctx context.Context, msg raftpb.Message) bool { - if !msg.Reject { - return false - } - - r.mu.RLock() - ticks := r.mu.ticks - initialized := r.isInitializedRLocked() - r.mu.RUnlock() - - if initialized { - return false - } - - if ticks > r.store.cfg.RaftPostSplitSuppressSnapshotTicks { - log.Infof( - ctx, - "allowing MsgAppResp for uninitialized replica (%d > %d ticks)", - ticks, - r.store.cfg.RaftPostSplitSuppressSnapshotTicks, - ) - return false - } - - if msg.RejectHint != 0 { - log.Fatalf(ctx, "received reject hint %d from supposedly uninitialized replica", msg.RejectHint) - } - - // This replica has a blank state, i.e. its last index is zero (because we - // start our Raft log at index 10). In particular, it's not a preemptive - // snapshot. This happens in two cases: - // - // 1. a rebalance operation is adding a new replica of the range to this - // node. We always send a preemptive snapshot before attempting to do so, so - // we wouldn't enter this branch as the replica would be initialized. 
We - // would however enter this branch if the preemptive snapshot got GC'ed - // before the actual replica change came through. - // - // 2. a split executed that created this replica as its right hand side, but - // this node's pre-split replica hasn't executed the split trigger (yet). - // The expectation is that it will do so momentarily, however if we don't - // drop this rejection, the Raft leader will try to catch us up via a - // snapshot. In 99.9% of cases this is a wasted effort since the pre-split - // replica already contains the data this replica will hold. The remaining - // 0.01% constitute the case in which our local replica of the pre-split - // range requires a snapshot which catches it up "past" the split trigger, - // in which case the trigger will never be executed (the snapshot instead - // wipes out the data the split trigger would've tried to put into this - // range). A similar scenario occurs if there's a rebalance operation that - // rapidly removes the pre-split replica, so that it never catches up (nor - // via log nor via snapshot); in that case too, the Raft snapshot is - // required to materialize the split's right hand side replica (i.e. this - // one). We're delaying the snapshot for a short amount of time only, so - // this seems tolerable. - log.VEventf(ctx, 2, "dropping rejection from index %d to index %d", msg.Index, msg.RejectHint) - - return true -} - // sendRaftMessage sends a Raft message. 
func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) { r.mu.Lock() fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.From), r.mu.lastToReplica) toReplica, toErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.To), r.mu.lastFromReplica) + var startKey roachpb.RKey if msg.Type == raftpb.MsgHeartbeat { if r.mu.replicaID == 0 { log.Fatalf(ctx, "preemptive snapshot attempted to send a heartbeat: %+v", msg) @@ -5085,6 +5021,22 @@ func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) { // update it whenever it sends a heartbeat. In effect, this makes sure // it always sees itself as alive. r.mu.lastUpdateTimes.update(r.mu.replicaID, timeutil.Now()) + } else if msg.Type == raftpb.MsgApp && r.mu.internalRaftGroup != nil { + // When the follower is potentially an uninitialized replica waiting for + // a split trigger, send the replica's StartKey along. See the method + // below for more context: + _ = maybeDropMsgApp + // NB: this code is allocation free. + r.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, pr raft.Progress) { + if id == msg.To && pr.State == raft.ProgressStateProbe { + // It is moderately expensive to attach a full key to the message, but note that + // a probing follower will only be appended to once per heartbeat interval (i.e. + // on the order of seconds). 
See: + // + // https://github.com/etcd-io/etcd/blob/7f450bf6967638673dd88fd4e730b01d1303d5ff/raft/progress.go#L41 + startKey = r.descRLocked().StartKey + } + }) } r.mu.Unlock() @@ -5112,10 +5064,11 @@ func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) { } if !r.sendRaftMessageRequest(ctx, &RaftMessageRequest{ - RangeID: r.RangeID, - ToReplica: toReplica, - FromReplica: fromReplica, - Message: msg, + RangeID: r.RangeID, + ToReplica: toReplica, + FromReplica: fromReplica, + Message: msg, + RangeStartKey: startKey, // usually nil }) { if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { r.mu.droppedMessages++ diff --git a/pkg/storage/split_delay_helper.go b/pkg/storage/split_delay_helper.go index 7930c1c497b9..f8b59f31b951 100644 --- a/pkg/storage/split_delay_helper.go +++ b/pkg/storage/split_delay_helper.go @@ -60,6 +60,19 @@ func (sdh *splitDelayHelper) ProposeEmptyCommand(ctx context.Context) { } func (sdh *splitDelayHelper) NumAttempts() int { + // There is a related mechanism regarding snapshots and splits that is worth + // pointing out here: Incoming MsgApp (see the _ assignment below) are + // dropped if they are addressed to uninitialized replicas likely to become + // initialized via a split trigger. These MsgApp are sent approximately once + // per heartbeat interval, but sometimes there's an additional delay thanks + // to having to wait for a GC run. In effect, it shouldn't take more than a + // small number of heartbeats until the follower leaves probing status, so + // NumAttempts should at least match that. + _ = maybeDropMsgApp // guru assignment + // Snapshots can come up for other reasons and at the end of the day, the + // delay introduced here needs to make sure that the snapshot queue + // processes at a higher rate than splits happen, so the number of attempts + // will typically be much higher than what's suggested by maybeDropMsgApp. 
return (*Replica)(sdh).store.cfg.RaftDelaySplitToSuppressSnapshotTicks } @@ -76,14 +89,6 @@ func (sdh *splitDelayHelper) Sleep(ctx context.Context) time.Duration { } func maybeDelaySplitToAvoidSnapshot(ctx context.Context, sdh splitDelayHelperI) string { - // We have an "optimization" to avoid Raft snapshots by dropping some - // outgoing MsgAppResp (see the _ assignment below) which takes effect for - // RaftPostSplitSuppressSnapshotTicks ticks after an uninitialized replica - // is created. This check can err, in which case the snapshot will be - // delayed for that many ticks, and so we want to delay by at least as much - // plus a bit of padding to give a snapshot a chance to catch the follower - // up. If we run out of time, we'll resume the split no matter what. - _ = (*Replica)(nil).maybeDropMsgAppResp // guru assignment maxDelaySplitToAvoidSnapshotTicks := sdh.NumAttempts() var slept time.Duration diff --git a/pkg/storage/split_trigger_helper.go b/pkg/storage/split_trigger_helper.go new file mode 100644 index 000000000000..f17d98bba7c8 --- /dev/null +++ b/pkg/storage/split_trigger_helper.go @@ -0,0 +1,161 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +package storage + +import ( + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/log" + "go.etcd.io/etcd/raft/raftpb" +) + +const maxDelaySplitTriggerTicks = 100 + +type replicaMsgAppDropper Replica + +func (rd *replicaMsgAppDropper) Args() (initialized bool, ticks int) { + r := (*Replica)(rd) + r.mu.RLock() + initialized = r.isInitializedRLocked() + ticks = r.mu.ticks + r.mu.RUnlock() + return initialized, ticks +} + +func (rd *replicaMsgAppDropper) ShouldDrop(startKey roachpb.RKey) (fmt.Stringer, bool) { + lhsRepl := (*Replica)(rd).store.LookupReplica(startKey) + if lhsRepl == nil { + return nil, false + } + _, _ = lhsRepl.store.gcQueue.Add(lhsRepl, replicaGCPriorityDefault) + return lhsRepl, true +} + +type msgAppDropper interface { + Args() (initialized bool, ticks int) + ShouldDrop(key roachpb.RKey) (fmt.Stringer, bool) +} + +// maybeDropMsgApp returns true if the incoming Raft message should be dropped. +// It does so if the recipient replica is uninitialized (i.e. has no state) and +// is waiting for a split trigger to apply, in which case delivering the message +// in this situation would result in an unnecessary Raft snapshot: the MsgApp +// would be rejected and the rejection would prompt the leader to send a +// snapshot, while the split trigger would likely populate the replica "for +// free". However, there are some situations in which this is not the case (all +// taken into account by this method by allowing the MsgApp through). +func maybeDropMsgApp( + ctx context.Context, r msgAppDropper, msg *raftpb.Message, startKey roachpb.RKey, +) (drop bool) { + // Run the cheapest check first. If the leader doesn't think this replica is + // probing, it won't attach a start key to the request (the common case). + // Note that startKey could be of length zero (not nil) if the sender is a + // replica of the first range. 
+ if msg.Type != raftpb.MsgApp || startKey == nil { + return false + } + + // The leader doesn't know our state, so it attached its start key to the + // request (RangeStartKey). Check if this replica might be waiting for a + // split trigger. The first condition for that is not knowing the key + // bounds, i.e. not being initialized. + initialized, ticks := r.Args() + + if initialized { + return false + } + + // The goal is to find out if this replica is waiting for a split trigger. + // We do so by looking up the start key in the local store. If we find a + // replica for the start key, we know that that replica is in theory going + // to apply the split trigger and populate the right hand side (i.e. this + // replica): + // + // sender (leader) [a--lhs--b)[b---rhs----c) + // \ + // \ + // (1)\ MsgApp (startKey='b') + // \ + // v + // recipient [a----------lhs--------c) (this uninitialized replica) + // ʌ / + // \______________/ (2) + // 'b' + // + // However, it's also possible that the left hand side has been rebalanced + // away and is going to be GC'ed soon; queue a check to make sure this would + // happen ASAP. (The leader will probe this replica only once per heartbeat + // interval, so we're not going to queue these checks at some high rate). + // + // New replicas only get created through splits or rebalances, so if we + // don't find a left hand side, it was either garbage collected after having + // been removed from the store (see the above comment), or there wasn't a + // split in the first place and this replica was instead created through an + // up-replication for which the preemptive snapshot had been lost (i.e. + // accidentally GC'ed before the replication change succeeded). + // + // Note that there's a subtle case in which the left hand side is caught up + // across the split trigger via a snapshot. 
In that case, since we're looking + // up the start key of the right-hand side, we have the following picture: + // + // sender (leader) [a--lhs--b)[b---rhs----c) + // \ + // \ + // (1)\ MsgApp (startKey='b') + // \ + // v + // recipient [a--lhs--b) (this uninitialized replica) + // + // Trying to look up the replica for 'b', we'd come up empty and deliver the + // message, resulting in a snapshot, as intended. + // + // Note that the invariant that the start key points at a replica that will + // definitely apply the split trigger holds even if the left-hand range + // carries out splits (as that doesn't change its start key) or gets merged + // away (as this entails either a removal of the follower's replica during + // colocation, or waiting for the follower to have caught up which implies + // executing all pending split triggers). + + verbose := verboseRaftLoggingEnabled() + + // NB: the caller is likely holding r.raftMu, but that's OK according to + // the lock order. We're not allowed to hold r.mu, but we don't. + lhsRepl, drop := r.ShouldDrop(startKey) + if !drop { + return false + } + + if verbose { + log.Infof(ctx, "start key is contained in replica %v", lhsRepl) + } + if ticks > maxDelaySplitTriggerTicks { + // This is an escape hatch in case there are other scenarios (missed in + // the above analysis) in which a split trigger just isn't coming. If + // there are, the idea is that we notice this log message and improve + // the heuristics. 
+ log.Warningf( + ctx, + "would have dropped incoming MsgApp to wait for split trigger, "+ + "but allowing due to %d (>%d) ticks", + ticks, maxDelaySplitTriggerTicks) + return false + } + if verbose { + log.Infof(ctx, "dropping MsgApp at index %d to wait for split trigger", msg.Index) + } + return true +} diff --git a/pkg/storage/split_trigger_helper_test.go b/pkg/storage/split_trigger_helper_test.go new file mode 100644 index 000000000000..3523538a4705 --- /dev/null +++ b/pkg/storage/split_trigger_helper_test.go @@ -0,0 +1,103 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +package storage + +import ( + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/raft/raftpb" +) + +type testMsgAppDropper struct { + initialized bool + ticks int + lhs bool + + startKey string // set by ShouldDrop +} + +func (td *testMsgAppDropper) Args() (initialized bool, ticks int) { + return td.initialized, td.ticks +} + +func (td *testMsgAppDropper) ShouldDrop(startKey roachpb.RKey) (fmt.Stringer, bool) { + if len(startKey) == 0 { + panic("empty startKey") + } + td.startKey = string(startKey) + return &Replica{}, td.lhs +} + +func TestMaybeDropMsgApp(t *testing.T) { + defer leaktest.AfterTest(t)() + + testCases := map[testMsgAppDropper]bool{ + // Already init'ed. + {initialized: true}: false, + // Left hand side not found. + {initialized: false}: false, + // Drop message to wait for trigger. + {initialized: false, lhs: true}: true, + // Drop message to wait for trigger. + {initialized: false, lhs: true, ticks: maxDelaySplitTriggerTicks}: true, + // Escape hatch fires. 
+ {initialized: false, lhs: true, ticks: maxDelaySplitTriggerTicks + 1}: false, + } + + msgHeartbeat := &raftpb.Message{ + Type: raftpb.MsgHeartbeat, + } + msgApp := &raftpb.Message{ + Type: raftpb.MsgApp, + } + ctx := context.Background() + for dropper, exp := range testCases { + t.Run(fmt.Sprintf("%v", dropper), func(t *testing.T) { + assert.Equal(t, false, maybeDropMsgApp(ctx, &dropper, msgHeartbeat, nil)) + assert.Equal(t, false, maybeDropMsgApp(ctx, &dropper, msgApp, nil)) + assert.Equal(t, "", dropper.startKey) + startKey := roachpb.RKey("foo") + assert.Equal(t, exp, maybeDropMsgApp(ctx, &dropper, msgApp, startKey)) + if exp { + assert.Equal(t, string(startKey), dropper.startKey) + } + }) + } +} + +// TestProtoZeroNilSlice verifies that the proto encoding round-trips empty and +// nil byte slices correctly. +func TestProtoZeroNilSlice(t *testing.T) { + defer leaktest.AfterTest(t)() + + testutils.RunTrueAndFalse(t, "isNil", func(t *testing.T, isNil bool) { + msg := &RaftMessageRequest{} + if !isNil { + msg.RangeStartKey = roachpb.RKey("foo") + } + b, err := protoutil.Marshal(msg) + assert.NoError(t, err) + out := &RaftMessageRequest{} + assert.NoError(t, protoutil.Unmarshal(b, out)) + assert.Equal(t, isNil, out.RangeStartKey == nil) + }) +} diff --git a/pkg/storage/store.go b/pkg/storage/store.go index d2551a381db9..4231a97c9ee4 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -3376,8 +3376,11 @@ func (s *Store) processRaftRequestWithReplica( ) } - if err := r.stepRaftGroup(req); err != nil { - return roachpb.NewError(err) + drop := maybeDropMsgApp(r.AnnotateCtx(context.Background()), (*replicaMsgAppDropper)(r), &req.Message, req.RangeStartKey) + if !drop { + if err := r.stepRaftGroup(req); err != nil { + return roachpb.NewError(err) + } } return nil }