storage: remove preemptive snapshots and replica ID 0
This commit adds a migration to remove any remaining on-disk preemptive
snapshots and then excises the concept from the codebase. With this commit
we gain the invariant that a Replica always has a non-zero replicaID that
never changes.

There's some cruft in replica_init that I'd love to clean up, but I don't
yet have a clear vision of how.

Release note: None
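
To make the invariant concrete, here is a minimal sketch of the kind of check it enables. The helper name and signature below are hypothetical, for illustration only; they are not code added by this commit.

package storage

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/errors"
)

// checkReplicaID sketches the invariant this commit establishes: a Replica
// is only ever instantiated with a concrete, non-zero replica ID, and that
// ID never changes for the lifetime of the in-memory object. Replica ID 0
// previously marked a preemptive snapshot waiting to be upgraded into a
// real replica; after the migration that state can no longer exist on disk
// or in memory.
func checkReplicaID(rangeID roachpb.RangeID, replicaID roachpb.ReplicaID) error {
	if replicaID == 0 {
		return errors.AssertionFailedf(
			"r%d: refusing to instantiate a replica with replica ID 0", rangeID)
	}
	return nil
}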
ajwerner committed Feb 4, 2020
1 parent 4c38ba1 commit b2850e1
Showing 28 changed files with 659 additions and 1,439 deletions.
101 changes: 13 additions & 88 deletions pkg/storage/client_raft_test.go
@@ -1042,13 +1042,22 @@ func TestFailedSnapshotFillsReservation(t *testing.T) {
 	mtc.Start(t, 3)
 
 	rep, err := mtc.stores[0].GetReplica(1)
-	if err != nil {
-		t.Fatal(err)
-	}
+	require.NoError(t, err)
+	repDesc, err := rep.GetReplicaDescriptor()
+	require.NoError(t, err)
+	desc := protoutil.Clone(rep.Desc()).(*roachpb.RangeDescriptor)
+	desc.AddReplica(2, 2, roachpb.LEARNER)
+	rep2Desc, found := desc.GetReplicaDescriptor(2)
+	require.True(t, found)
 	header := storage.SnapshotRequest_Header{
 		CanDecline: true,
 		RangeSize:  100,
-		State:      storagepb.ReplicaState{Desc: rep.Desc()},
+		State:      storagepb.ReplicaState{Desc: desc},
+		RaftMessageRequest: storage.RaftMessageRequest{
+			RangeID:     rep.RangeID,
+			FromReplica: repDesc,
+			ToReplica:   rep2Desc,
+		},
 	}
 	header.RaftMessageRequest.Message.Snapshot.Data = uuid.UUID{}.GetBytes()
 	// Cause this stream to return an error as soon as we ask it for something.
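
The hunk above reflects the new rule that a snapshot can no longer be applied preemptively at replica ID 0: the test must add a learner to the cloned descriptor and address the snapshot to that concrete replica. A hypothetical sketch of the recipient-side rule this implies; the helper below is illustrative, not code from this commit.

package storage

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/errors"
)

// shouldAcceptSnapshot sketches the addressing rule implied by the diff
// above: an incoming snapshot must name a real, non-zero replica ID that
// the descriptor it carries actually contains.
func shouldAcceptSnapshot(
	desc *roachpb.RangeDescriptor, to roachpb.ReplicaDescriptor,
) error {
	if to.ReplicaID == 0 {
		// Replica ID 0 was the hallmark of a preemptive snapshot.
		return errors.Errorf("r%d: snapshot addressed to replica ID 0", desc.RangeID)
	}
	if _, ok := desc.GetReplicaDescriptorByID(to.ReplicaID); !ok {
		return errors.Errorf("r%d: snapshot target replica %d not in descriptor %s",
			desc.RangeID, to.ReplicaID, desc)
	}
	return nil
}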
@@ -3643,90 +3652,6 @@ func TestRemovedReplicaError(t *testing.T) {
 	})
 }
 
-// TestRemoveRangeWithoutGC ensures that we do not panic when a
-// replica has been removed but not yet GC'd (and therefore
-// does not have an active raft group).
-func TestRemoveRangeWithoutGC(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-
-	sc := storage.TestStoreConfig(nil)
-	sc.TestingKnobs.DisableReplicaGCQueue = true
-	sc.TestingKnobs.DisableEagerReplicaRemoval = true
-	mtc := &multiTestContext{storeConfig: &sc}
-	defer mtc.Stop()
-	mtc.Start(t, 2)
-	const rangeID roachpb.RangeID = 1
-	mtc.replicateRange(rangeID, 1)
-	mtc.transferLease(context.TODO(), rangeID, 0, 1)
-	mtc.unreplicateRange(rangeID, 0)
-
-	// Wait for store 0 to process the removal. The in-memory replica
-	// object still exists but store 0 is no longer present in the
-	// configuration.
-	testutils.SucceedsSoon(t, func() error {
-		rep, err := mtc.stores[0].GetReplica(rangeID)
-		if err != nil {
-			return err
-		}
-		desc := rep.Desc()
-		if len(desc.InternalReplicas) != 1 {
-			return errors.Errorf("range has %d replicas", len(desc.InternalReplicas))
-		}
-		return nil
-	})
-
-	// The replica's data is still on disk.
-	var desc roachpb.RangeDescriptor
-	descKey := keys.RangeDescriptorKey(roachpb.RKeyMin)
-	if ok, err := engine.MVCCGetProto(context.Background(), mtc.stores[0].Engine(), descKey,
-		mtc.stores[0].Clock().Now(), &desc, engine.MVCCGetOptions{}); err != nil {
-		t.Fatal(err)
-	} else if !ok {
-		t.Fatal("expected range descriptor to be present")
-	}
-
-	// Stop and restart the store. The primary motivation for this test
-	// is to ensure that the store does not panic on restart (as was
-	// previously the case).
-	mtc.stopStore(0)
-	mtc.restartStore(0)
-
-	// Initially, the in-memory Replica object is recreated from the
-	// on-disk state.
-	if _, err := mtc.stores[0].GetReplica(rangeID); err != nil {
-		t.Fatal(err)
-	}
-
-	// Re-enable the GC queue to allow the replica to be destroyed
-	// (after the simulated passage of time).
-	mtc.advanceClock(context.TODO())
-	mtc.manualClock.Increment(int64(storage.ReplicaGCQueueInactivityThreshold + 1))
-	mtc.stores[0].SetReplicaGCQueueActive(true)
-	// There's a fun flake where between when the queue detects that this replica
-	// needs to be removed and when it actually gets processed whereby an older
-	// replica will send this replica a raft message which will give it an ID.
-	// When our replica ID changes the queue will ignore the previous addition and
-	// we won't be removed.
-	testutils.SucceedsSoon(t, func() error {
-		mtc.stores[0].MustForceReplicaGCScanAndProcess()
-
-		// The Replica object should be removed.
-		const msg = "r[0-9]+ was not found"
-		if _, err := mtc.stores[0].GetReplica(rangeID); !testutils.IsError(err, msg) {
-			return errors.Errorf("expected %s, got %v", msg, err)
-		}
-		return nil
-	})
-
-	// And the data should no longer be on disk.
-	if ok, err := engine.MVCCGetProto(context.Background(), mtc.stores[0].Engine(), descKey,
-		mtc.stores[0].Clock().Now(), &desc, engine.MVCCGetOptions{}); err != nil {
-		t.Fatal(err)
-	} else if ok {
-		t.Fatalf("expected range descriptor to be absent")
-	}
-}
-
 func TestTransferRaftLeadership(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
40 changes: 16 additions & 24 deletions pkg/storage/metrics.go
@@ -412,12 +412,6 @@ var (
 		Measurement: "Snapshots",
 		Unit:        metric.Unit_COUNT,
 	}
-	metaRangeSnapshotsPreemptiveApplied = metric.Metadata{
-		Name:        "range.snapshots.preemptive-applied",
-		Help:        "Number of applied pre-emptive snapshots",
-		Measurement: "Snapshots",
-		Unit:        metric.Unit_COUNT,
-	}
 	metaRangeRaftLeaderTransfers = metric.Metadata{
 		Name:        "range.raftleadertransfers",
 		Help:        "Number of raft leader transfers",
@@ -1060,15 +1054,14 @@ type StoreMetrics struct {
 	// accordingly.
 
 	// Range event metrics.
-	RangeSplits                     *metric.Counter
-	RangeMerges                     *metric.Counter
-	RangeAdds                       *metric.Counter
-	RangeRemoves                    *metric.Counter
-	RangeSnapshotsGenerated         *metric.Counter
-	RangeSnapshotsNormalApplied     *metric.Counter
-	RangeSnapshotsLearnerApplied    *metric.Counter
-	RangeSnapshotsPreemptiveApplied *metric.Counter
-	RangeRaftLeaderTransfers        *metric.Counter
+	RangeSplits                  *metric.Counter
+	RangeMerges                  *metric.Counter
+	RangeAdds                    *metric.Counter
+	RangeRemoves                 *metric.Counter
+	RangeSnapshotsGenerated      *metric.Counter
+	RangeSnapshotsNormalApplied  *metric.Counter
+	RangeSnapshotsLearnerApplied *metric.Counter
+	RangeRaftLeaderTransfers     *metric.Counter
 
 	// Raft processing metrics.
 	RaftTicks *metric.Counter
@@ -1267,15 +1260,14 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
 		RdbPendingCompaction: metric.NewGauge(metaRdbPendingCompaction),
 
 		// Range event metrics.
-		RangeSplits:                     metric.NewCounter(metaRangeSplits),
-		RangeMerges:                     metric.NewCounter(metaRangeMerges),
-		RangeAdds:                       metric.NewCounter(metaRangeAdds),
-		RangeRemoves:                    metric.NewCounter(metaRangeRemoves),
-		RangeSnapshotsGenerated:         metric.NewCounter(metaRangeSnapshotsGenerated),
-		RangeSnapshotsNormalApplied:     metric.NewCounter(metaRangeSnapshotsNormalApplied),
-		RangeSnapshotsLearnerApplied:    metric.NewCounter(metaRangeSnapshotsLearnerApplied),
-		RangeSnapshotsPreemptiveApplied: metric.NewCounter(metaRangeSnapshotsPreemptiveApplied),
-		RangeRaftLeaderTransfers:        metric.NewCounter(metaRangeRaftLeaderTransfers),
+		RangeSplits:                  metric.NewCounter(metaRangeSplits),
+		RangeMerges:                  metric.NewCounter(metaRangeMerges),
+		RangeAdds:                    metric.NewCounter(metaRangeAdds),
+		RangeRemoves:                 metric.NewCounter(metaRangeRemoves),
+		RangeSnapshotsGenerated:      metric.NewCounter(metaRangeSnapshotsGenerated),
+		RangeSnapshotsNormalApplied:  metric.NewCounter(metaRangeSnapshotsNormalApplied),
+		RangeSnapshotsLearnerApplied: metric.NewCounter(metaRangeSnapshotsLearnerApplied),
+		RangeRaftLeaderTransfers:     metric.NewCounter(metaRangeRaftLeaderTransfers),
 
 		// Raft processing metrics.
 		RaftTicks: metric.NewCounter(metaRaftTicks),