Merge #28533
28533: storage: handle snapshots that span merges r=tschottdorf a=benesch

This needs a fair bit of cleanup, but I wanted to get it out for early review. Does this general approach seem reasonable? I'm actually kind of shocked by how little code is involved.

Co-authored-by: Nikhil Benesch <[email protected]>
craig[bot] and benesch committed Aug 17, 2018
2 parents d78dd7a + b728fcd commit 9f5ea5b
Showing 5 changed files with 261 additions and 36 deletions.
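For orientation before the per-file diffs: the heart of the change is a new merge lock acquired while applying a Raft snapshot. The condensed sketch below is assembled from the replica.go hunks further down (surrounding Ready handling and method context elided); it is a reading aid, not code beyond what the diff itself adds.

	// In handleRaftReadyRaftMuLocked, just before applying an incoming snapshot:
	subsumedRepls, releaseMergeLock := r.maybeAcquireSnapshotMergeLock(ctx, inSnap)
	defer releaseMergeLock()

	// applySnapshot now also takes the subsumed replicas. It destroys their data
	// in the same engine batch that writes the snapshot, then removes them from
	// the store's in-memory maps.
	if err := r.applySnapshot(ctx, inSnap, rd.Snapshot, rd.HardState, subsumedRepls); err != nil {
		const expl = "while applying snapshot"
		return stats, expl, errors.Wrap(err, expl)
	}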
153 changes: 138 additions & 15 deletions pkg/storage/client_merge_test.go
@@ -115,6 +115,19 @@ func TestStoreRangeMergeTwoEmptyRanges(t *testing.T) {
}
}

func getEngineKeySet(t *testing.T, e engine.Engine) map[string]struct{} {
t.Helper()
kvs, err := engine.Scan(e, engine.NilKey, engine.MVCCKeyMax, 0 /* max */)
if err != nil {
t.Fatal(err)
}
out := map[string]struct{}{}
for _, kv := range kvs {
out[string(kv.Key.Key)] = struct{}{}
}
return out
}

// TestStoreRangeMergeMetadataCleanup tests that all metadata of a
// subsumed range is cleaned up on merge.
func TestStoreRangeMergeMetadataCleanup(t *testing.T) {
@@ -126,19 +139,6 @@ func TestStoreRangeMergeMetadataCleanup(t *testing.T) {
defer mtc.Stop()
store := mtc.Store(0)

scan := func() map[string]struct{} {
t.Helper()
kvs, err := engine.Scan(store.Engine(), engine.NilKey, engine.MVCCKeyMax, 0 /* max */)
if err != nil {
t.Fatal(err)
}
out := map[string]struct{}{}
for _, kv := range kvs {
out[string(kv.Key.Key)] = struct{}{}
}
return out
}

content := roachpb.Key("testing!")

// Write some values left of the proposed split key.
@@ -148,7 +148,7 @@ func TestStoreRangeMergeMetadataCleanup(t *testing.T) {
}

// Collect all the keys.
preKeys := scan()
preKeys := getEngineKeySet(t, store.Engine())

// Split the range.
lhsDesc, rhsDesc, err := createSplitRanges(ctx, store)
@@ -171,7 +171,7 @@ func TestStoreRangeMergeMetadataCleanup(t *testing.T) {
}

// Collect all the keys again.
postKeys := scan()
postKeys := getEngineKeySet(t, store.Engine())

// Compute the new keys.
for k := range preKeys {
@@ -1609,6 +1609,129 @@ func TestStoreRangeMergeReadoptedLHSFollower(t *testing.T) {
mtc.transferLease(ctx, lhsDesc.RangeID, 0, 2)
}

// unreliableRaftHandler drops all Raft messages that are addressed to the
// specified rangeID, but lets all other messages through.
type unreliableRaftHandler struct {
rangeID roachpb.RangeID
storage.RaftMessageHandler
}

func (h *unreliableRaftHandler) HandleRaftRequest(
ctx context.Context,
req *storage.RaftMessageRequest,
respStream storage.RaftMessageResponseStream,
) *roachpb.Error {
if req.RangeID == h.rangeID {
return nil
}
return h.RaftMessageHandler.HandleRaftRequest(ctx, req, respStream)
}

func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
storeCfg := storage.TestStoreConfig(nil)
storeCfg.TestingKnobs.DisableReplicateQueue = true
storeCfg.TestingKnobs.DisableReplicaGCQueue = true
mtc := &multiTestContext{storeConfig: &storeCfg}
mtc.Start(t, 3)
defer mtc.Stop()
store0, store2 := mtc.Store(0), mtc.Store(2)
distSender := mtc.distSenders[0]

// Create three fully-caught-up, adjacent ranges on all three stores.
mtc.replicateRange(roachpb.RangeID(1), 1, 2)
splitKeys := []roachpb.Key{roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")}
for _, key := range splitKeys {
if _, pErr := client.SendWrapped(ctx, distSender, adminSplitArgs(key)); pErr != nil {
t.Fatal(pErr)
}
if _, pErr := client.SendWrapped(ctx, distSender, incrementArgs(key, 1)); pErr != nil {
t.Fatal(pErr)
}
mtc.waitForValues(key, []int64{1, 1, 1})
}

lhsRepl0 := store0.LookupReplica(roachpb.RKey("a"))

// Start dropping all Raft traffic to the range containing key "a" on store2.
mtc.transport.Listen(store2.Ident.StoreID, &unreliableRaftHandler{
rangeID: lhsRepl0.RangeID,
RaftMessageHandler: store2,
})

// Merge [b, c) into [a, b), and then [c, /Max) into the resulting [a, c).
for i := 0; i < 2; i++ {
if _, pErr := client.SendWrapped(ctx, distSender, adminMergeArgs(roachpb.Key("a"))); pErr != nil {
t.Fatal(pErr)
}
}

// Truncate the logs of the LHS.
{
repl := store0.LookupReplica(roachpb.RKey("a"))
index, err := repl.GetLastIndex()
if err != nil {
t.Fatal(err)
}
// Truncate the log at the last index (log entries below it are removed, so
// this discards the merge commits).
truncArgs := &roachpb.TruncateLogRequest{
RequestHeader: roachpb.RequestHeader{Key: roachpb.Key("a")},
Index: index,
RangeID: repl.RangeID,
}
if _, err := client.SendWrapped(ctx, mtc.distSenders[0], truncArgs); err != nil {
t.Fatal(err)
}
}

beforeRaftSnaps := store2.Metrics().RangeSnapshotsNormalApplied.Count()

// Restore Raft traffic to the LHS on store2.
mtc.transport.Listen(store2.Ident.StoreID, store2)

// Wait for all replicas to catch up to the same point. Because we truncated
// the log while store2 was unavailable, this will require a Raft snapshot.
for _, key := range splitKeys {
if _, pErr := client.SendWrapped(ctx, distSender, incrementArgs(key, 1)); pErr != nil {
t.Fatal(pErr)
}
mtc.waitForValues(key, []int64{2, 2, 2})
}

afterRaftSnaps := store2.Metrics().RangeSnapshotsNormalApplied.Count()
if afterRaftSnaps == beforeRaftSnaps {
t.Fatal("expected store2 to apply at least 1 additional raft snapshot")
}

// Verify that the sets of keys in store0 and store2 are identical.
storeKeys0 := getEngineKeySet(t, store0.Engine())
storeKeys2 := getEngineKeySet(t, store2.Engine())
ignoreKey := func(k string) bool {
// Unreplicated keys for the two remaining ranges are allowed to differ.
return strings.HasPrefix(k, string(keys.MakeRangeIDUnreplicatedPrefix(roachpb.RangeID(1)))) ||
strings.HasPrefix(k, string(keys.MakeRangeIDUnreplicatedPrefix(lhsRepl0.RangeID)))
}
for k := range storeKeys0 {
if ignoreKey(k) {
continue
}
if _, ok := storeKeys2[k]; !ok {
t.Errorf("store2 missing key %s", roachpb.Key(k))
}
}
for k := range storeKeys2 {
if ignoreKey(k) {
continue
}
if _, ok := storeKeys0[k]; !ok {
t.Errorf("store2 has extra key %s", roachpb.Key(k))
}
}
}

// TestStoreRangeMergeDuringShutdown verifies that a shutdown of a store
// containing the RHS of a merge can occur cleanly. This previously triggered
// a fatal error (#27552).
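The final assertion in TestStoreRangeMergeRaftSnapshot above compares the two stores' engine contents key by key, skipping unreplicated range-ID-local keys. A toy, standalone version of that comparison is sketched below; the helper name diffKeySets and the plain string keys are made up for illustration and are not part of the change.

	package main

	import (
		"fmt"
		"strings"
	)

	// diffKeySets reports keys present in exactly one of the two sets, ignoring
	// any key matched by the ignore predicate.
	func diffKeySets(a, b map[string]struct{}, ignore func(string) bool) (onlyA, onlyB []string) {
		for k := range a {
			if _, ok := b[k]; !ok && !ignore(k) {
				onlyA = append(onlyA, k)
			}
		}
		for k := range b {
			if _, ok := a[k]; !ok && !ignore(k) {
				onlyB = append(onlyB, k)
			}
		}
		return onlyA, onlyB
	}

	func main() {
		store0 := map[string]struct{}{"/user/a": {}, "/rangeid/1/u/x": {}}
		store2 := map[string]struct{}{"/user/a": {}, "/rangeid/1/u/y": {}}
		ignore := func(k string) bool { return strings.HasPrefix(k, "/rangeid/1/u/") }
		onlyA, onlyB := diffKeySets(store0, store2, ignore)
		fmt.Println(onlyA, onlyB) // [] [] -- the unreplicated keys are allowed to differ
	}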
52 changes: 51 additions & 1 deletion pkg/storage/replica.go
@@ -4050,7 +4050,15 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
log.Fatalf(ctx, "incoming snapshot id doesn't match raft snapshot id: %s != %s", snapUUID, inSnap.SnapUUID)
}

if err := r.applySnapshot(ctx, inSnap, rd.Snapshot, rd.HardState); err != nil {
// Applying this snapshot may require us to subsume one or more of our right
// neighbors. This occurs if this replica is informed about the merges via a
// Raft snapshot instead of a MsgApp containing the merge commits, e.g.,
// because it went offline before the merge commits applied and did not come
// back online until after the merge commits were truncated away.
subsumedRepls, releaseMergeLock := r.maybeAcquireSnapshotMergeLock(ctx, inSnap)
defer releaseMergeLock()

if err := r.applySnapshot(ctx, inSnap, rd.Snapshot, rd.HardState, subsumedRepls); err != nil {
const expl = "while applying snapshot"
return stats, expl, errors.Wrap(err, expl)
}
@@ -5502,6 +5510,48 @@ func (r *Replica) processRaftCommand(
return raftCmd.ReplicatedEvalResult.ChangeReplicas != nil
}

// maybeAcquireSnapshotMergeLock checks whether the incoming snapshot subsumes
// any replicas and, if so, locks them for subsumption. See acquireMergeLock
// for details about the lock itself.
func (r *Replica) maybeAcquireSnapshotMergeLock(
ctx context.Context, inSnap IncomingSnapshot,
) (subsumedRepls []*Replica, releaseMergeLock func()) {
// Any replicas that overlap with the bounds of the incoming snapshot are ours
// to subsume; further, the end of the last overlapping replica will exactly
// align with the end of the snapshot. How are we guaranteed this? Each merge
// could not have committed unless this store had an up-to-date replica of the
// RHS at the time of the merge. Nothing could have removed that RHS replica,
// as the replica GC queue cannot GC a replica unless it can prove its
// left-hand neighbor has no pending merges to apply. And that RHS replica
// could not have been further split or merged, as it never processes another
// command after the merge commits.
endKey := r.Desc().EndKey
if endKey == nil || !endKey.Less(inSnap.State.Desc.EndKey) {
// The existing replica is uninitialized, in which case we've already
// installed a placeholder for the snapshot's keyspace, or this snapshot
// does not widen the existing replica. No merge lock needed.
return nil, func() {}
}
for endKey.Less(inSnap.State.Desc.EndKey) {
sRepl := r.store.LookupReplica(endKey)
if sRepl == nil || !endKey.Equal(sRepl.Desc().StartKey) {
log.Fatalf(ctx, "snapshot widens existing replica, but no replica exists for subsumed key %s", endKey)
}
sRepl.raftMu.Lock()
subsumedRepls = append(subsumedRepls, sRepl)
endKey = sRepl.Desc().EndKey
}
if !endKey.Equal(inSnap.State.Desc.EndKey) {
log.Fatalf(ctx, "subsumed replicas %v extend past snapshot end key %s",
subsumedRepls, inSnap.State.Desc.EndKey)
}
return subsumedRepls, func() {
for _, sr := range subsumedRepls {
sr.raftMu.Unlock()
}
}
}

// maybeAcquireSplitMergeLock examines the given raftCmd (which need
// not be evaluated yet) and acquires the split or merge lock if
// necessary (in addition to other preparation). It returns a function
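To make the invariant in maybeAcquireSnapshotMergeLock above concrete: the replicas that must be subsumed tile the keyspace exactly from the existing replica's end key to the snapshot's end key. The toy program below models that walk with plain strings; the type span and the function subsumedSpans are hypothetical illustrations, not store API.

	package main

	import "fmt"

	// span is a [start, end) interval over string keys.
	type span struct{ start, end string }

	// subsumedSpans walks right from the existing replica's end key, collecting
	// the neighbor that starts at each successive end key until the snapshot's
	// end key is reached. A gap means the invariant is violated.
	func subsumedSpans(existingEnd, snapEnd string, byStart map[string]span) []span {
		var subsumed []span
		for endKey := existingEnd; endKey < snapEnd; {
			next, ok := byStart[endKey]
			if !ok {
				panic(fmt.Sprintf("no replica starts at %q", endKey))
			}
			subsumed = append(subsumed, next)
			endKey = next.end
		}
		return subsumed
	}

	func main() {
		// The store holds replicas for [a,b), [b,c), and [c,d); the incoming
		// snapshot describes [a,d), so [b,c) and [c,d) must be subsumed.
		byStart := map[string]span{
			"b": {"b", "c"},
			"c": {"c", "d"},
		}
		fmt.Println(subsumedSpans("b", "d", byStart)) // [{b c} {c d}]
	}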
56 changes: 52 additions & 4 deletions pkg/storage/replica_raftstorage.go
@@ -17,6 +17,7 @@ package storage
import (
"context"
"fmt"
"math"
"time"

"github.com/coreos/etcd/raft"
@@ -708,11 +709,18 @@ func clearRangeData(

// applySnapshot updates the replica based on the given snapshot and associated
// HardState. All snapshots must pass through Raft for correctness, i.e. the
// parameters to this method must be taken from a raft.Ready. It is the caller's
// responsibility to call r.store.processRangeDescriptorUpdate(r) after a
// successful applySnapshot. This method requires that r.raftMu is held.
// parameters to this method must be taken from a raft.Ready. Any replicas
// specified in subsumedRepls will be destroyed atomically with the application
// of the snapshot. It is the caller's responsibility to call
// r.store.processRangeDescriptorUpdate(r) after a successful applySnapshot.
// This method requires that r.raftMu is held, as well as the raftMus of any
// replicas in subsumedRepls.
func (r *Replica) applySnapshot(
ctx context.Context, inSnap IncomingSnapshot, snap raftpb.Snapshot, hs raftpb.HardState,
ctx context.Context,
inSnap IncomingSnapshot,
snap raftpb.Snapshot,
hs raftpb.HardState,
subsumedRepls []*Replica,
) (err error) {
s := *inSnap.State
if s.Desc.RangeID != r.RangeID {
@@ -781,6 +789,26 @@ func (r *Replica) applySnapshot(
batch := r.store.Engine().NewWriteOnlyBatch()
defer batch.Close()

// If we're subsuming a replica below, we don't have its last NextReplicaID,
// nor can we obtain it. That's OK: we can just be conservative and use the
// maximum possible replica ID. preDestroyRaftMuLocked will write a replica
// tombstone using this maximum possible replica ID, which would normally be
// problematic, as it would prevent this store from ever having a new replica
// of the removed range. In this case, however, it's copacetic, as subsumed
// ranges _can't_ have new replicas.
const subsumedNextReplicaID = math.MaxInt32

// As part of applying the snapshot, we may need to subsume replicas that have
// been merged into this range. Destroy their data in the same batch in which
// we apply the snapshot.
for _, sr := range subsumedRepls {
if err := sr.preDestroyRaftMuLocked(
ctx, r.store.Engine(), batch, subsumedNextReplicaID, true, /* destroyData */
); err != nil {
return err
}
}

// Delete everything in the range and recreate it from the snapshot.
// We need to delete any old Raft log entries here because any log entries
// that predate the snapshot will be orphaned and never truncated or GC'd.
@@ -869,6 +897,26 @@ func (r *Replica) applySnapshot(
}
stats.commit = timeutil.Now()

for _, sr := range subsumedRepls {
// We removed sr's data when we committed the batch. Finish subsumption by
// updating the in-memory bookkeeping.
if err := sr.postDestroyRaftMuLocked(ctx, sr.GetMVCCStats()); err != nil {
return err
}
// We already hold sr's raftMu, so we must call removeReplicaImpl directly.
// Note that it's safe to update the store's metadata for sr's removal
// separately from updating the store's metadata for r's new descriptor
// (i.e., under a different store.mu acquisition). Each store.mu acquisition
// leaves the store in a consistent state, and access to the replicas
// themselves is protected by their raftMus, which are held from start to
// finish.
if err := r.store.removeReplicaImpl(ctx, sr, subsumedNextReplicaID, RemoveOptions{
DestroyData: false, // data is already destroyed
}); err != nil {
return err
}
}

r.mu.Lock()
// We set the persisted last index to the last applied index. This is
// not a correctness issue, but means that we may have just transferred
26 changes: 15 additions & 11 deletions pkg/storage/store.go
@@ -459,6 +459,9 @@ type Store struct {
// Replicas in the Store is being changed (which may happen outside of raft
// via the replica GC queue).
//
// If holding raftMus for multiple different replicas simultaneously,
// acquire the locks in the order that the replicas appear in replicasByKey.
//
// * Replica.readOnlyCmdMu (RWMutex): Held in read mode while any
// read-only command is in progress on the replica; held in write
// mode while executing a commit trigger. This is necessary
@@ -489,12 +492,12 @@ type Store struct {
// state. Callbacks from the scheduler are performed while not holding this
// mutex in order to observe the above ordering constraints.
//
// Splits (and merges, but they're not finished and so will not be discussed
// here) deserve special consideration: they operate on two ranges. Naively,
// this is fine because the right-hand range is brand new, but an
// uninitialized version may have been created by a raft message before we
// process the split (see commentary on Replica.splitTrigger). We make this
// safe by locking the right-hand range for the duration of the Raft command
// Splits and merges deserve special consideration: they operate on two
// ranges. For splits, this might seem fine because the right-hand range is
// brand new, but an uninitialized version may have been created by a raft
// message before we process the split (see commentary on
// Replica.splitTrigger). We make this safe, for both splits and merges, by
// locking the right-hand range for the duration of the Raft command
// containing the split/merge trigger.
//
// Note that because we acquire and release Store.mu and Replica.mu
@@ -3393,9 +3396,6 @@ func (s *Store) processRaftRequestWithReplica(
// processRaftSnapshotRequest processes the incoming snapshot Raft request on
// the request's specified replica. This snapshot can be preemptive or not. If
// not, the function makes sure to handle any updated Raft Ready state.
//
// TODO(benesch): handle snapshots that widen EndKey. These can occur if this
// replica was behind when the range committed a merge.
func (s *Store) processRaftSnapshotRequest(
ctx context.Context, snapHeader *SnapshotRequest_Header, inSnap IncomingSnapshot,
) *roachpb.Error {
@@ -3579,8 +3579,12 @@ func (s *Store) processRaftSnapshotRequest(
r.mu.Unlock()
}

// Apply the snapshot, as Raft told us to.
if err := r.applySnapshot(ctx, inSnap, ready.Snapshot, ready.HardState); err != nil {
// Apply the snapshot, as Raft told us to. Preemptive snapshots never
// subsume replicas (this is guaranteed by Store.canApplySnapshot), so
// we can simply pass nil for the subsumedRepls parameter.
if err := r.applySnapshot(
ctx, inSnap, ready.Snapshot, ready.HardState, nil, /* subsumedRepls */
); err != nil {
return roachpb.NewError(err)
}
