diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index d8f8c6ba6121..914d334755a5 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -95,6 +95,6 @@ trace.debug.enablebooleanfalseif set, traces for recent requests can be seen in the /debug page trace.lightstep.tokenstringif set, traces go to Lightstep using this token trace.zipkin.collectorstringif set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set. -versioncustom validation2.1-5set the active cluster version in the format '.'. +versioncustom validation2.1-6set the active cluster version in the format '.'. diff --git a/pkg/keys/constants.go b/pkg/keys/constants.go index ebee2fe9ea8a..6bd5e0bcfcf3 100644 --- a/pkg/keys/constants.go +++ b/pkg/keys/constants.go @@ -121,7 +121,8 @@ var ( LocalRaftAppliedIndexLegacySuffix = []byte("rfta") // LocalRaftTombstoneSuffix is the suffix for the raft tombstone. LocalRaftTombstoneSuffix = []byte("rftb") - // LocalRaftTruncatedStateLegacySuffix is the suffix for the RaftTruncatedState. + // LocalRaftTruncatedStateLegacySuffix is the suffix for the legacy RaftTruncatedState. + // See VersionUnreplicatedRaftTruncatedState. LocalRaftTruncatedStateLegacySuffix = []byte("rftt") // LocalRangeLeaseSuffix is the suffix for a range lease. LocalRangeLeaseSuffix = []byte("rll-") diff --git a/pkg/keys/keys.go b/pkg/keys/keys.go index b9af3cb223fe..8d0d1e91f8f0 100644 --- a/pkg/keys/keys.go +++ b/pkg/keys/keys.go @@ -253,6 +253,7 @@ func LeaseAppliedIndexLegacyKey(rangeID roachpb.RangeID) roachpb.Key { } // RaftTruncatedStateLegacyKey returns a system-local key for a RaftTruncatedState. +// See VersionUnreplicatedRaftTruncatedState. func RaftTruncatedStateLegacyKey(rangeID roachpb.RangeID) roachpb.Key { return MakeRangeIDPrefixBuf(rangeID).RaftTruncatedStateLegacyKey() } @@ -314,6 +315,11 @@ func RaftTombstoneKey(rangeID roachpb.RangeID) roachpb.Key { return MakeRangeIDPrefixBuf(rangeID).RaftTombstoneKey() } +// RaftTruncatedStateKey returns a system-local key for a RaftTruncatedState. +func RaftTruncatedStateKey(rangeID roachpb.RangeID) roachpb.Key { + return MakeRangeIDPrefixBuf(rangeID).RaftTruncatedStateKey() +} + // RaftHardStateKey returns a system-local key for a Raft HardState. func RaftHardStateKey(rangeID roachpb.RangeID) roachpb.Key { return MakeRangeIDPrefixBuf(rangeID).RaftHardStateKey() @@ -935,6 +941,11 @@ func (b RangeIDPrefixBuf) RaftTombstoneKey() roachpb.Key { return append(b.unreplicatedPrefix(), LocalRaftTombstoneSuffix...) } +// RaftTruncatedStateKey returns a system-local key for a RaftTruncatedState. +func (b RangeIDPrefixBuf) RaftTruncatedStateKey() roachpb.Key { + return append(b.unreplicatedPrefix(), LocalRaftTruncatedStateLegacySuffix...) +} + // RaftHardStateKey returns a system-local key for a Raft HardState. func (b RangeIDPrefixBuf) RaftHardStateKey() roachpb.Key { return append(b.unreplicatedPrefix(), LocalRaftHardStateSuffix...) diff --git a/pkg/keys/printer.go b/pkg/keys/printer.go index 33210ead7897..3fe3a46e1c15 100644 --- a/pkg/keys/printer.go +++ b/pkg/keys/printer.go @@ -150,7 +150,7 @@ var ( ppFunc: raftLogKeyPrint, psFunc: raftLogKeyParse, }, - {name: "LegacyRaftTruncatedState", suffix: LocalRaftTruncatedStateLegacySuffix}, + {name: "RaftTruncatedState", suffix: LocalRaftTruncatedStateLegacySuffix}, {name: "RaftLastIndex", suffix: LocalRaftLastIndexSuffix}, {name: "RangeLastReplicaGCTimestamp", suffix: LocalRangeLastReplicaGCTimestampSuffix}, {name: "RangeLastVerificationTimestamp", suffix: LocalRangeLastVerificationTimestampSuffixDeprecated}, diff --git a/pkg/keys/printer_test.go b/pkg/keys/printer_test.go index 4e3790be87cb..f3b679766d2e 100644 --- a/pkg/keys/printer_test.go +++ b/pkg/keys/printer_test.go @@ -58,7 +58,8 @@ func TestPrettyPrint(t *testing.T) { {RangeAppliedStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeAppliedState"}, {RaftAppliedIndexLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RaftAppliedIndex"}, {LeaseAppliedIndexLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/LeaseAppliedIndex"}, - {RaftTruncatedStateLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/LegacyRaftTruncatedState"}, + {RaftTruncatedStateLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RaftTruncatedState"}, + {RaftTruncatedStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/u/RaftTruncatedState"}, {RangeLeaseKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeLease"}, {RangeStatsLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeStats"}, {RangeTxnSpanGCThresholdKey(roachpb.RangeID(1000001)), `/Local/RangeID/1000001/r/RangeTxnSpanGCThreshold`}, diff --git a/pkg/server/version_cluster_test.go b/pkg/server/version_cluster_test.go index afca6ccfd360..bc00d92ef0cb 100644 --- a/pkg/server/version_cluster_test.go +++ b/pkg/server/version_cluster_test.go @@ -24,16 +24,20 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/pkg/errors" + "github.com/stretchr/testify/assert" ) type testClusterWithHelpers struct { @@ -191,6 +195,174 @@ func TestClusterVersionPersistedOnJoin(t *testing.T) { } } +// TestClusterVersionUnreplicatedRaftTruncatedState exercises the +// VersionUnreplicatedRaftTruncatedState migration in as much detail as possible +// in a unit test. +// +// It starts a four node cluster with a pre-migration version and upgrades into +// the new version while traffic and scattering are active, verifying that the +// truncated states are rewritten. +func TestClusterVersionUnreplicatedRaftTruncatedState(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + dir, finish := testutils.TempDir(t) + defer finish() + + oldVersion := cluster.VersionByKey(cluster.VersionUnreplicatedRaftTruncatedState - 1) + oldVersionS := oldVersion.String() + newVersionS := cluster.VersionByKey(cluster.VersionUnreplicatedRaftTruncatedState).String() + + // Four node cluster in which all versions support newVersion (i.e. would in + // principle upgrade to it) but are bootstrapped at oldVersion. + versions := [][2]string{ + {oldVersionS, newVersionS}, + {oldVersionS, newVersionS}, + {oldVersionS, newVersionS}, + {oldVersionS, newVersionS}, + } + + bootstrapVersion := cluster.ClusterVersion{Version: oldVersion} + + knobs := base.TestingKnobs{ + Store: &storage.StoreTestingKnobs{ + BootstrapVersion: &bootstrapVersion, + }, + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: 1, + }, + } + + tc := setupMixedCluster(t, knobs, versions, dir) + defer tc.TestCluster.Stopper().Stop(ctx) + + if _, err := tc.ServerConn(0).Exec(` +CREATE TABLE kv (id INT PRIMARY KEY, v INT); +ALTER TABLE kv SPLIT AT SELECT i FROM generate_series(1, 9) AS g(i); +`); err != nil { + t.Fatal(err) + } + + scatter := func() { + t.Helper() + if _, err := tc.ServerConn(0).Exec( + `ALTER TABLE kv EXPERIMENTAL_RELOCATE SELECT ARRAY[i%$1+1], i FROM generate_series(0, 9) AS g(i)`, len(versions), + ); err != nil { + t.Log(err) + } + } + + var n int + insert := func() { + t.Helper() + n++ + // Write only to a subset of our ranges to guarantee log truncations there. + _, err := tc.ServerConn(0).Exec(`UPSERT INTO kv VALUES($1, $2)`, n%2, n) + if err != nil { + t.Fatal(err) + } + } + + for i := 0; i < 500; i++ { + insert() + } + scatter() + + for _, server := range tc.Servers { + assert.NoError(t, server.GetStores().(*storage.Stores).VisitStores(func(s *storage.Store) error { + s.VisitReplicas(func(r *storage.Replica) bool { + key := keys.RaftTruncatedStateKey(r.RangeID) + var truncState roachpb.RaftTruncatedState + found, err := engine.MVCCGetProto( + context.Background(), s.Engine(), key, + hlc.Timestamp{}, &truncState, engine.MVCCGetOptions{}, + ) + if err != nil { + t.Fatal(err) + } + if found { + t.Errorf("unexpectedly found unreplicated TruncatedState at %s", key) + } + return true // want more + }) + return nil + })) + } + + if v := tc.getVersionFromSelect(0); v != oldVersionS { + t.Fatalf("running %s, wanted %s", v, oldVersionS) + } + + assert.NoError(t, tc.setVersion(0, newVersionS)) + for i := 0; i < 500; i++ { + insert() + } + scatter() + + for _, server := range tc.Servers { + testutils.SucceedsSoon(t, func() error { + err := server.GetStores().(*storage.Stores).VisitStores(func(s *storage.Store) error { + // We scattered and so old copies of replicas may be laying around. + // If we're not proactive about removing them, the test gets pretty + // slow because those replicas aren't caught up any more. + s.MustForceReplicaGCScanAndProcess() + var err error + s.VisitReplicas(func(r *storage.Replica) bool { + snap := s.Engine().NewSnapshot() + defer snap.Close() + + keyLegacy := keys.RaftTruncatedStateLegacyKey(r.RangeID) + keyUnreplicated := keys.RaftTruncatedStateKey(r.RangeID) + + if found, innerErr := engine.MVCCGetProto( + context.Background(), snap, keyLegacy, + hlc.Timestamp{}, nil, engine.MVCCGetOptions{}, + ); innerErr != nil { + t.Fatal(innerErr) + } else if found { + if err == nil { + err = errors.New("found legacy TruncatedState") + } + err = errors.Wrap(err, r.String()) + + // Force a log truncation to prove that this rectifies + // the situation. + status := r.RaftStatus() + if status != nil { + desc := r.Desc() + truncate := &roachpb.TruncateLogRequest{} + truncate.Key = desc.StartKey.AsRawKey() + truncate.RangeID = desc.RangeID + truncate.Index = status.HardState.Commit + var ba roachpb.BatchRequest + ba.RangeID = r.RangeID + ba.Add(truncate) + if _, err := s.DB().NonTransactionalSender().Send(ctx, ba); err != nil { + t.Fatal(err) + } + } + return true // want more + } + + if found, err := engine.MVCCGetProto( + context.Background(), snap, keyUnreplicated, + hlc.Timestamp{}, nil, engine.MVCCGetOptions{}, + ); err != nil { + t.Fatal(err) + } else if !found { + // We can't have neither of the keys present. + t.Fatalf("%s: unexpectedly did not find unreplicated TruncatedState at %s", r, keyUnreplicated) + } + + return true // want more + }) + return err + }) + return err + }) + } +} + func TestClusterVersionUpgrade(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() diff --git a/pkg/settings/cluster/cockroach_versions.go b/pkg/settings/cluster/cockroach_versions.go index cd151f156b2a..5601ca32d40e 100644 --- a/pkg/settings/cluster/cockroach_versions.go +++ b/pkg/settings/cluster/cockroach_versions.go @@ -71,6 +71,7 @@ const ( VersionExportStorageWorkload VersionLazyTxnRecord VersionSequencedReads + VersionUnreplicatedRaftTruncatedState // see versionsSingleton for details // Add new versions here (step one of two). @@ -315,6 +316,98 @@ var versionsSingleton = keyedVersions([]keyedVersion{ Key: VersionSequencedReads, Version: roachpb.Version{Major: 2, Minor: 1, Unstable: 5}, }, + { + // VersionLazyTxnRecord is https://github.com/cockroachdb/cockroach/pull/34660. + // When active, it moves the truncated state into unreplicated keyspace + // on log truncations. + // + // The migration works as follows: + // + // 1. at any log position, the replicas of a Range either use the new + // (unreplicated) key or the old one, and exactly one of them exists. + // + // 2. When a log truncation evaluates under the new cluster version, + // it initiates the migration by deleting the old key. Under the old cluster + // version, it behaves like today, updating the replicated truncated state. + // + // 3. The deletion signals new code downstream of Raft and triggers a write + // to the new, unreplicated, key (atomic with the deletion of the old key). + // + // 4. Future log truncations don't write any replicated data any more, but + // (like before) send along the TruncatedState which is written downstream + // of Raft atomically with the deletion of the log entries. This actually + // uses the same code as 3. + // What's new is that the truncated state needs to be verified before + // replacing a previous one. If replicas disagree about their truncated + // state, it's possible for replica X at FirstIndex=100 to apply a + // truncated state update that sets FirstIndex to, say, 50 (proposed by a + // replica with a "longer" historical log). In that case, the truncated + // state update must be ignored (this is straightforward downstream-of-Raft + // code). + // + // 5. When a split trigger evaluates, it seeds the RHS with the legacy + // key iff the LHS uses the legacy key, and the unreplicated key otherwise. + // This makes sure that the invariant that all replicas agree on the + // state of the migration is upheld. + // + // 6. When a snapshot is applied, the receiver is told whether the snapshot + // contains a legacy key. If not, it writes the truncated state (which is + // part of the snapshot metadata) in its unreplicated version. Otherwise + // it doesn't have to do anything (the range will migrate later). + // + // The following diagram visualizes the above. Note that it abuses sequence + // diagrams to get a nice layout; the vertical lines belonging to NewState + // and OldState don't imply any particular ordering of operations. + // + // ┌────────┐ ┌────────┐ + // │OldState│ │NewState│ + // └───┬────┘ └───┬────┘ + // │ Bootstrap under old version + // │ <─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ + // │ │ + // │ │ Bootstrap under new version + // │ │ <─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ + // │ │ + // │─ ─ ┐ + // │ | Log truncation under old version + // │< ─ ┘ + // │ │ + // │─ ─ ┐ │ + // │ | Snapshot │ + // │< ─ ┘ │ + // │ │ + // │ │─ ─ ┐ + // │ │ | Snapshot + // │ │< ─ ┘ + // │ │ + // │ Log truncation under new version │ + // │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─>│ + // │ │ + // │ │─ ─ ┐ + // │ │ | Log truncation under new version + // │ │< ─ ┘ + // │ │ + // │ │─ ─ ┐ + // │ │ | Log truncation under old version + // │ │< ─ ┘ (necessarily running new binary) + // + // Source: http://www.plantuml.com/plantuml/uml/ and the following input: + // + // @startuml + // scale 600 width + // + // OldState <--] : Bootstrap under old version + // NewState <--] : Bootstrap under new version + // OldState --> OldState : Log truncation under old version + // OldState --> OldState : Snapshot + // NewState --> NewState : Snapshot + // OldState --> NewState : Log truncation under new version + // NewState --> NewState : Log truncation under new version + // NewState --> NewState : Log truncation under old version\n(necessarily running new binary) + // @enduml + Key: VersionUnreplicatedRaftTruncatedState, + Version: roachpb.Version{Major: 2, Minor: 1, Unstable: 6}, + }, // Add new versions here (step two of two). diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal b/pkg/sql/logictest/testdata/logic_test/crdb_internal index c035e310449e..7f956f1f4ea5 100644 --- a/pkg/sql/logictest/testdata/logic_test/crdb_internal +++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal @@ -268,7 +268,7 @@ select crdb_internal.set_vmodule('') query T select crdb_internal.node_executable_version() ---- -2.1-5 +2.1-6 query ITTT colnames select node_id, component, field, regexp_replace(regexp_replace(value, '^\d+$', ''), e':\\d+', ':') as value from crdb_internal.node_runtime_info @@ -365,7 +365,7 @@ select * from crdb_internal.gossip_alerts query T select crdb_internal.node_executable_version() ---- -2.1-5 +2.1-6 user root diff --git a/pkg/storage/batcheval/cmd_end_transaction.go b/pkg/storage/batcheval/cmd_end_transaction.go index 82a432c4884d..49fb8970c5c3 100644 --- a/pkg/storage/batcheval/cmd_end_transaction.go +++ b/pkg/storage/batcheval/cmd_end_transaction.go @@ -911,6 +911,29 @@ func splitTrigger( log.VEventf(ctx, 1, "LHS's TxnSpanGCThreshold of split is not set") } + // We're about to write the initial state for the replica. We migrated + // the formerly replicated truncated state into unreplicated keyspace + // in 2.2., but this range may still be using the replicated version + // and we need to make a decision about what to use for the RHS that + // is consistent across the followers: do for the RHS what the LHS + // does: if the LHS has the legacy key, initialize the RHS with a + // legacy key as well. + // + // See VersionUnreplicatedRaftTruncatedState. + truncStateType := stateloader.TruncatedStateUnreplicated + if found, err := engine.MVCCGetProto( + ctx, + batch, + keys.RaftTruncatedStateLegacyKey(rec.GetRangeID()), + hlc.Timestamp{}, + nil, + engine.MVCCGetOptions{}, + ); err != nil { + return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to load legacy truncated state") + } else if found { + truncStateType = stateloader.TruncatedStateLegacyReplicated + } + // Writing the initial state is subtle since this also seeds the Raft // group. It becomes more subtle due to proposer-evaluated Raft. // @@ -944,6 +967,7 @@ func splitTrigger( ctx, batch, rightMS, split.RightDesc, rightLease, *gcThreshold, *txnSpanGCThreshold, rec.ClusterSettings().Version.Version().Version, + truncStateType, ) if err != nil { return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to write initial Replica state") diff --git a/pkg/storage/batcheval/cmd_resolve_intent_test.go b/pkg/storage/batcheval/cmd_resolve_intent_test.go index 505eafe10119..d4d935fb5525 100644 --- a/pkg/storage/batcheval/cmd_resolve_intent_test.go +++ b/pkg/storage/batcheval/cmd_resolve_intent_test.go @@ -36,13 +36,14 @@ import ( ) type mockEvalCtx struct { - clusterSettings *cluster.Settings - desc *roachpb.RangeDescriptor - clock *hlc.Clock - stats enginepb.MVCCStats - qps float64 - abortSpan *abortspan.AbortSpan - gcThreshold hlc.Timestamp + clusterSettings *cluster.Settings + desc *roachpb.RangeDescriptor + clock *hlc.Clock + stats enginepb.MVCCStats + qps float64 + abortSpan *abortspan.AbortSpan + gcThreshold hlc.Timestamp + term, firstIndex uint64 } func (m *mockEvalCtx) String() string { @@ -85,10 +86,10 @@ func (m *mockEvalCtx) IsFirstRange() bool { panic("unimplemented") } func (m *mockEvalCtx) GetFirstIndex() (uint64, error) { - panic("unimplemented") + return m.firstIndex, nil } func (m *mockEvalCtx) GetTerm(uint64) (uint64, error) { - panic("unimplemented") + return m.term, nil } func (m *mockEvalCtx) GetLeaseAppliedIndex() uint64 { panic("unimplemented") diff --git a/pkg/storage/batcheval/cmd_truncate_log.go b/pkg/storage/batcheval/cmd_truncate_log.go index ae61e3d53eb4..1564f849baa4 100644 --- a/pkg/storage/batcheval/cmd_truncate_log.go +++ b/pkg/storage/batcheval/cmd_truncate_log.go @@ -19,10 +19,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/pkg/errors" ) @@ -57,13 +59,33 @@ func TruncateLog( return result.Result{}, nil } - // Have we already truncated this log? If so, just return without an error. - // TODO(tbg): remove this once followers can have divergent truncated states. - firstIndex, err := cArgs.EvalCtx.GetFirstIndex() + var legacyTruncatedState roachpb.RaftTruncatedState + legacyKeyFound, err := engine.MVCCGetProto( + ctx, batch, keys.RaftTruncatedStateLegacyKey(cArgs.EvalCtx.GetRangeID()), + hlc.Timestamp{}, &legacyTruncatedState, engine.MVCCGetOptions{}, + ) if err != nil { return result.Result{}, err } + // See the comment on the cluster version for all the moving parts involved + // in migrating into this cluster version. Note that if the legacy key is + // missing, the cluster version has been bumped (though we may not know it + // yet) and we keep using the unreplicated key. + useNewUnreplicatedTruncatedStateKey := cArgs.EvalCtx.ClusterSettings().Version.IsActive(cluster.VersionUnreplicatedRaftTruncatedState) || !legacyKeyFound + + firstIndex, err := cArgs.EvalCtx.GetFirstIndex() + if err != nil { + return result.Result{}, errors.Wrap(err, "getting first index") + } + // Have we already truncated this log? If so, just return without an error. + // Note that there may in principle be followers whose Raft log is longer + // than this node's, but to issue a truncation we also need the *term* for + // the new truncated state, which we can't obtain if we don't have the log + // entry ourselves. + // + // TODO(tbg): think about synthesizing a valid term. Can we use the next + // existing entry's term? if firstIndex >= args.Index { if log.V(3) { log.Infof(ctx, "attempting to truncate previously truncated raft log. FirstIndex:%d, TruncateFrom:%d", @@ -75,9 +97,16 @@ func TruncateLog( // args.Index is the first index to keep. term, err := cArgs.EvalCtx.GetTerm(args.Index - 1) if err != nil { - return result.Result{}, err + return result.Result{}, errors.Wrap(err, "getting term") } + // Compute the number of bytes freed by this truncation. Note that this will + // only make sense for the leaseholder as we base this off its own first + // index (other replicas may have other first indexes assuming we're not + // still using the legacy truncated state key). In principle, this could be + // off either way, though in practice we don't expect followers to have + // a first index smaller than the leaseholder's (see #34287), and most of + // the time everyone's first index should be the same. start := engine.MakeMVCCMetadataKey(keys.RaftLogKey(rangeID, firstIndex)) end := engine.MakeMVCCMetadataKey(keys.RaftLogKey(rangeID, args.Index)) @@ -110,7 +139,23 @@ func TruncateLog( pd.Replicated.State = &storagepb.ReplicaState{ TruncatedState: tState, } + pd.Replicated.RaftLogDelta = ms.SysBytes - return pd, MakeStateLoader(cArgs.EvalCtx).SetLegacyRaftTruncatedState(ctx, batch, cArgs.Stats, tState) + if !useNewUnreplicatedTruncatedStateKey { + return pd, MakeStateLoader(cArgs.EvalCtx).SetLegacyRaftTruncatedState(ctx, batch, cArgs.Stats, tState) + } + if legacyKeyFound { + // Time to migrate by deleting the legacy key. The downstream-of-Raft + // code will atomically rewrite the truncated state (supplied via the + // side effect) into the new unreplicated key. + if err := engine.MVCCDelete( + ctx, batch, cArgs.Stats, keys.RaftTruncatedStateLegacyKey(cArgs.EvalCtx.GetRangeID()), + hlc.Timestamp{}, nil, /* txn */ + ); err != nil { + return result.Result{}, err + } + } + + return pd, nil } diff --git a/pkg/storage/batcheval/truncate_log_test.go b/pkg/storage/batcheval/truncate_log_test.go new file mode 100644 index 000000000000..e9915697d89d --- /dev/null +++ b/pkg/storage/batcheval/truncate_log_test.go @@ -0,0 +1,200 @@ +// Copyright 2019 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package batcheval + +import ( + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/assert" +) + +func putTruncatedState( + t *testing.T, + eng engine.Engine, + rangeID roachpb.RangeID, + truncState roachpb.RaftTruncatedState, + legacy bool, +) { + key := keys.RaftTruncatedStateKey(rangeID) + if legacy { + key = keys.RaftTruncatedStateLegacyKey(rangeID) + } + if err := engine.MVCCPutProto( + context.Background(), eng, nil, key, + hlc.Timestamp{}, nil /* txn */, &truncState, + ); err != nil { + t.Fatal(err) + } +} + +func readTruncStates( + t *testing.T, eng engine.Engine, rangeID roachpb.RangeID, +) (legacy roachpb.RaftTruncatedState, unreplicated roachpb.RaftTruncatedState) { + t.Helper() + legacyFound, err := engine.MVCCGetProto( + context.Background(), eng, keys.RaftTruncatedStateLegacyKey(rangeID), + hlc.Timestamp{}, &legacy, engine.MVCCGetOptions{}, + ) + if err != nil { + t.Fatal(err) + } + if legacyFound != (legacy != roachpb.RaftTruncatedState{}) { + t.Fatalf("legacy key found=%t but state is %+v", legacyFound, legacy) + } + + unreplicatedFound, err := engine.MVCCGetProto( + context.Background(), eng, keys.RaftTruncatedStateKey(rangeID), + hlc.Timestamp{}, &unreplicated, engine.MVCCGetOptions{}, + ) + if err != nil { + t.Fatal(err) + } + if unreplicatedFound != (unreplicated != roachpb.RaftTruncatedState{}) { + t.Fatalf("unreplicated key found=%t but state is %+v", unreplicatedFound, unreplicated) + } + return +} + +const ( + expectationNeither = iota + expectationLegacy + expectationUnreplicated +) + +type unreplicatedTruncStateTest struct { + startsWithLegacy bool + hasVersionBumped bool + exp int // see consts above +} + +func TestTruncateLogUnreplicatedTruncatedState(t *testing.T) { + defer leaktest.AfterTest(t)() + + // Follow the reference below for more information on what's being tested. + _ = cluster.VersionUnreplicatedRaftTruncatedState + + const ( + startsLegacy = true + startsUnreplicated = false + newVersion = true + oldVersion = false + ) + + testCases := []unreplicatedTruncStateTest{ + // Steady states: we have one type of TruncatedState and will end up with + // the same type: either we've already migrated, or we haven't but aren't + // allowed to migrate yet. + {startsUnreplicated, oldVersion, expectationUnreplicated}, + {startsUnreplicated, newVersion, expectationUnreplicated}, + {startsLegacy, oldVersion, expectationLegacy}, + // This is the case in which the migration is triggered. As a result, + // we see neither of the keys written. The new key will be written + // atomically as a side effect (outside of the scope of this test). + {startsLegacy, newVersion, expectationNeither}, + } + + for _, tc := range testCases { + t.Run(fmt.Sprintf("%v", tc), func(t *testing.T) { + runUnreplicatedTruncatedState(t, tc) + }) + } +} + +func runUnreplicatedTruncatedState(t *testing.T, tc unreplicatedTruncStateTest) { + ctx := context.Background() + versionOff := cluster.VersionByKey(cluster.VersionUnreplicatedRaftTruncatedState - 1) + versionOn := cluster.VersionByKey(cluster.VersionUnreplicatedRaftTruncatedState) + st := cluster.MakeClusterSettings(versionOff, versionOn) + + if tc.hasVersionBumped { + assert.NoError(t, st.InitializeVersion(cluster.ClusterVersion{Version: versionOn})) + } else { + assert.NoError(t, st.InitializeVersion(cluster.ClusterVersion{Version: versionOff})) + } + + const ( + rangeID = 12 + term = 10 + firstIndex = 100 + ) + + evalCtx := mockEvalCtx{ + clusterSettings: st, + desc: &roachpb.RangeDescriptor{RangeID: rangeID}, + term: term, + firstIndex: firstIndex, + } + + eng := engine.NewInMem(roachpb.Attributes{}, 1<<20) + defer eng.Close() + + truncState := roachpb.RaftTruncatedState{ + Index: firstIndex + 1, + Term: term, + } + + // Put down the TruncatedState specified by the test case. + putTruncatedState(t, eng, rangeID, truncState, tc.startsWithLegacy) + + // Send a truncation request. + req := roachpb.TruncateLogRequest{ + RangeID: rangeID, + Index: firstIndex + 7, + } + cArgs := CommandArgs{ + EvalCtx: &evalCtx, + Args: &req, + } + resp := &roachpb.TruncateLogResponse{} + res, err := TruncateLog(ctx, eng, cArgs, resp) + if err != nil { + t.Fatal(err) + } + + expTruncState := roachpb.RaftTruncatedState{ + Index: req.Index - 1, + Term: term, + } + + legacy, unreplicated := readTruncStates(t, eng, rangeID) + + switch tc.exp { + case expectationLegacy: + assert.Equal(t, expTruncState, legacy) + assert.Zero(t, unreplicated) + case expectationUnreplicated: + // The unreplicated key that we see should be the initial truncated + // state (it's only updated below Raft). + assert.Equal(t, truncState, unreplicated) + assert.Zero(t, legacy) + case expectationNeither: + assert.Zero(t, unreplicated) + assert.Zero(t, legacy) + default: + t.Fatalf("unknown expectation %d", tc.exp) + } + + assert.NotNil(t, res.Replicated.State) + assert.NotNil(t, res.Replicated.State.TruncatedState) + assert.Equal(t, expTruncState, *res.Replicated.State.TruncatedState) +} diff --git a/pkg/storage/below_raft_protos_test.go b/pkg/storage/below_raft_protos_test.go index c2e8f4833d54..69a56db7e817 100644 --- a/pkg/storage/below_raft_protos_test.go +++ b/pkg/storage/below_raft_protos_test.go @@ -118,6 +118,17 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{ emptySum: 892800390935990883, populatedSum: 16231745342114354146, }, + // This is used downstream of Raft only to write it into unreplicated keyspace + // as part of VersionUnreplicatedRaftTruncatedState. + // However, it has been sent through Raft for a long time, as part of + // ReplicatedEvalResult. + reflect.TypeOf(&roachpb.RaftTruncatedState{}): { + populatedConstructor: func(r *rand.Rand) protoutil.Message { + return roachpb.NewPopulatedRaftTruncatedState(r, false) + }, + emptySum: 5531676819244041709, + populatedSum: 14781226418259198098, + }, } func TestBelowRaftProtos(t *testing.T) { diff --git a/pkg/storage/engine/engine.go b/pkg/storage/engine/engine.go index 00932fe2668e..b0ffa5e33bcb 100644 --- a/pkg/storage/engine/engine.go +++ b/pkg/storage/engine/engine.go @@ -352,11 +352,19 @@ type Batch interface { // Distinct returns a view of the existing batch which only sees writes that // were performed before the Distinct batch was created. That is, the // returned batch will not read its own writes, but it will read writes to - // the parent batch performed before the call to Distinct(). The returned + // the parent batch performed before the call to Distinct(), except if the + // parent batch is a WriteOnlyBatch, in which case the Distinct() batch will + // read from the underlying engine. + // + // The returned // batch needs to be closed before using the parent batch again. This is used // as an optimization to avoid flushing mutations buffered by the batch in // situations where we know all of the batched operations are for distinct // keys. + // + // TODO(tbg): it seems insane that you cannot read from a WriteOnlyBatch but + // you can read from a Distinct on top of a WriteOnlyBatch but randomly don't + // see the batch at all. I was personally just bitten by this. Distinct() ReadWriter // Empty returns whether the batch has been written to or not. Empty() bool diff --git a/pkg/storage/main_test.go b/pkg/storage/main_test.go index f333f5f48d67..551d87a8616b 100644 --- a/pkg/storage/main_test.go +++ b/pkg/storage/main_test.go @@ -68,7 +68,7 @@ func TestMain(m *testing.M) { delete(notBelowRaftProtos, typ) } else { failed = true - fmt.Printf("%s: missing fixture!\n", typ) + fmt.Printf("%s: missing fixture! Please adjust belowRaftGoldenProtos if necessary\n", typ) } } diff --git a/pkg/storage/raft.pb.go b/pkg/storage/raft.pb.go index d231257348f1..80b533dc71f9 100644 --- a/pkg/storage/raft.pb.go +++ b/pkg/storage/raft.pb.go @@ -70,7 +70,7 @@ func (x *SnapshotRequest_Priority) UnmarshalJSON(data []byte) error { return nil } func (SnapshotRequest_Priority) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, []int{5, 0} + return fileDescriptor_raft_06448cf81da2fcd5, []int{5, 0} } type SnapshotRequest_Strategy int32 @@ -107,7 +107,7 @@ func (x *SnapshotRequest_Strategy) UnmarshalJSON(data []byte) error { return nil } func (SnapshotRequest_Strategy) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, []int{5, 1} + return fileDescriptor_raft_06448cf81da2fcd5, []int{5, 1} } type SnapshotResponse_Status int32 @@ -152,7 +152,7 @@ func (x *SnapshotResponse_Status) UnmarshalJSON(data []byte) error { return nil } func (SnapshotResponse_Status) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, []int{6, 0} + return fileDescriptor_raft_06448cf81da2fcd5, []int{6, 0} } // RaftHeartbeat is a request that contains the barebones information for a @@ -174,7 +174,7 @@ func (m *RaftHeartbeat) Reset() { *m = RaftHeartbeat{} } func (m *RaftHeartbeat) String() string { return proto.CompactTextString(m) } func (*RaftHeartbeat) ProtoMessage() {} func (*RaftHeartbeat) Descriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, []int{0} + return fileDescriptor_raft_06448cf81da2fcd5, []int{0} } func (m *RaftHeartbeat) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -232,7 +232,7 @@ func (m *RaftMessageRequest) Reset() { *m = RaftMessageRequest{} } func (m *RaftMessageRequest) String() string { return proto.CompactTextString(m) } func (*RaftMessageRequest) ProtoMessage() {} func (*RaftMessageRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, []int{1} + return fileDescriptor_raft_06448cf81da2fcd5, []int{1} } func (m *RaftMessageRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -267,7 +267,7 @@ func (m *RaftMessageRequestBatch) Reset() { *m = RaftMessageRequestBatch func (m *RaftMessageRequestBatch) String() string { return proto.CompactTextString(m) } func (*RaftMessageRequestBatch) ProtoMessage() {} func (*RaftMessageRequestBatch) Descriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, []int{2} + return fileDescriptor_raft_06448cf81da2fcd5, []int{2} } func (m *RaftMessageRequestBatch) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -302,7 +302,7 @@ func (m *RaftMessageResponseUnion) Reset() { *m = RaftMessageResponseUni func (m *RaftMessageResponseUnion) String() string { return proto.CompactTextString(m) } func (*RaftMessageResponseUnion) ProtoMessage() {} func (*RaftMessageResponseUnion) Descriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, []int{3} + return fileDescriptor_raft_06448cf81da2fcd5, []int{3} } func (m *RaftMessageResponseUnion) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -347,7 +347,7 @@ func (m *RaftMessageResponse) Reset() { *m = RaftMessageResponse{} } func (m *RaftMessageResponse) String() string { return proto.CompactTextString(m) } func (*RaftMessageResponse) ProtoMessage() {} func (*RaftMessageResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, []int{4} + return fileDescriptor_raft_06448cf81da2fcd5, []int{4} } func (m *RaftMessageResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -390,7 +390,7 @@ func (m *SnapshotRequest) Reset() { *m = SnapshotRequest{} } func (m *SnapshotRequest) String() string { return proto.CompactTextString(m) } func (*SnapshotRequest) ProtoMessage() {} func (*SnapshotRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, []int{5} + return fileDescriptor_raft_06448cf81da2fcd5, []int{5} } func (m *SnapshotRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -433,16 +433,26 @@ type SnapshotRequest_Header struct { // The priority of the snapshot. Priority SnapshotRequest_Priority `protobuf:"varint,6,opt,name=priority,enum=cockroach.storage.SnapshotRequest_Priority" json:"priority"` // The strategy of the snapshot. - Strategy SnapshotRequest_Strategy `protobuf:"varint,7,opt,name=strategy,enum=cockroach.storage.SnapshotRequest_Strategy" json:"strategy"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_sizecache int32 `json:"-"` + Strategy SnapshotRequest_Strategy `protobuf:"varint,7,opt,name=strategy,enum=cockroach.storage.SnapshotRequest_Strategy" json:"strategy"` + // Whether the snapshot uses the unreplicated RaftTruncatedState or not. + // This is generally always true at 2.2 and above outside of the migration + // phase, though theoretically it could take a long time for all ranges + // to update to the new mechanism. This bool is true iff the Raft log at + // the snapshot's applied index is using the new key. In particular, it + // is true if the index itself carries out the migration (in which case + // the data in the snapshot contains neither key). + // + // See VersionUnreplicatedRaftTruncatedState. + UnreplicatedTruncatedState bool `protobuf:"varint,8,opt,name=unreplicated_truncated_state,json=unreplicatedTruncatedState" json:"unreplicated_truncated_state"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *SnapshotRequest_Header) Reset() { *m = SnapshotRequest_Header{} } func (m *SnapshotRequest_Header) String() string { return proto.CompactTextString(m) } func (*SnapshotRequest_Header) ProtoMessage() {} func (*SnapshotRequest_Header) Descriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, []int{5, 0} + return fileDescriptor_raft_06448cf81da2fcd5, []int{5, 0} } func (m *SnapshotRequest_Header) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -478,7 +488,7 @@ func (m *SnapshotResponse) Reset() { *m = SnapshotResponse{} } func (m *SnapshotResponse) String() string { return proto.CompactTextString(m) } func (*SnapshotResponse) ProtoMessage() {} func (*SnapshotResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, []int{6} + return fileDescriptor_raft_06448cf81da2fcd5, []int{6} } func (m *SnapshotResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -519,7 +529,7 @@ func (m *ConfChangeContext) Reset() { *m = ConfChangeContext{} } func (m *ConfChangeContext) String() string { return proto.CompactTextString(m) } func (*ConfChangeContext) ProtoMessage() {} func (*ConfChangeContext) Descriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, []int{7} + return fileDescriptor_raft_06448cf81da2fcd5, []int{7} } func (m *ConfChangeContext) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1053,6 +1063,14 @@ func (m *SnapshotRequest_Header) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x38 i++ i = encodeVarintRaft(dAtA, i, uint64(m.Strategy)) + dAtA[i] = 0x40 + i++ + if m.UnreplicatedTruncatedState { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i++ return i, nil } @@ -1256,6 +1274,7 @@ func (m *SnapshotRequest_Header) Size() (n int) { n += 1 + l + sovRaft(uint64(l)) n += 1 + sovRaft(uint64(m.Priority)) n += 1 + sovRaft(uint64(m.Strategy)) + n += 2 return n } @@ -2406,6 +2425,26 @@ func (m *SnapshotRequest_Header) Unmarshal(dAtA []byte) error { break } } + case 8: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field UnreplicatedTruncatedState", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.UnreplicatedTruncatedState = bool(v != 0) default: iNdEx = preIndex skippy, err := skipRaft(dAtA[iNdEx:]) @@ -2770,80 +2809,82 @@ var ( ErrIntOverflowRaft = fmt.Errorf("proto: integer overflow") ) -func init() { proto.RegisterFile("storage/raft.proto", fileDescriptor_raft_460a63b017d715a3) } - -var fileDescriptor_raft_460a63b017d715a3 = []byte{ - // 1147 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe4, 0x56, 0xdd, 0x6e, 0x1b, 0xc5, - 0x17, 0xf7, 0xc6, 0xdf, 0xc7, 0x76, 0xb3, 0x9d, 0x7f, 0xf5, 0x67, 0x65, 0xc0, 0x36, 0x5b, 0x5a, - 0x99, 0x22, 0xad, 0x8b, 0x55, 0xb8, 0xe0, 0xce, 0x1f, 0xdb, 0xc6, 0x4d, 0xf3, 0xa1, 0x4d, 0x5b, - 0x04, 0x52, 0x65, 0x8d, 0xd7, 0x63, 0x7b, 0x15, 0x7b, 0x67, 0xbb, 0x3b, 0x2e, 0xb8, 0x4f, 0xc1, - 0x13, 0x20, 0x6e, 0xb8, 0xe1, 0x05, 0x78, 0x85, 0xdc, 0x20, 0x71, 0x59, 0x09, 0x14, 0x41, 0x78, - 0x8b, 0x5e, 0xa1, 0x99, 0x9d, 0x71, 0x9c, 0xc4, 0xd0, 0x04, 0x21, 0x6e, 0xb8, 0x49, 0xbc, 0xe7, - 0xcc, 0xef, 0x77, 0xf6, 0x9c, 0xdf, 0x39, 0x67, 0x16, 0x50, 0xc4, 0x68, 0x88, 0xc7, 0xa4, 0x11, - 0xe2, 0x11, 0xb3, 0x82, 0x90, 0x32, 0x8a, 0xae, 0xbb, 0xd4, 0x3d, 0x0c, 0x29, 0x76, 0x27, 0x96, - 0xf4, 0x96, 0x6f, 0x88, 0xc7, 0x60, 0xd0, 0x20, 0x61, 0x48, 0xc3, 0x28, 0x3e, 0x58, 0xfe, 0xbf, - 0xb2, 0xce, 0x08, 0xc3, 0x43, 0xcc, 0xb0, 0xb4, 0xbf, 0xab, 0x48, 0xe5, 0xff, 0x60, 0xd0, 0x88, - 0x18, 0x66, 0x44, 0xba, 0xdf, 0x26, 0xcc, 0x1d, 0x8a, 0x80, 0xe2, 0x4f, 0x30, 0x58, 0x09, 0x5e, - 0xbe, 0x31, 0xa6, 0x63, 0x2a, 0x7e, 0x36, 0xf8, 0xaf, 0xd8, 0x6a, 0xfe, 0x90, 0x84, 0x92, 0x83, - 0x47, 0x6c, 0x8b, 0xe0, 0x90, 0x0d, 0x08, 0x66, 0x68, 0x00, 0xb9, 0x10, 0xfb, 0x63, 0xd2, 0xf7, - 0x86, 0x86, 0x56, 0xd3, 0xea, 0xa9, 0xf6, 0x83, 0xa3, 0xe3, 0x6a, 0xe2, 0xe4, 0xb8, 0x9a, 0x75, - 0xb8, 0xbd, 0xd7, 0x7d, 0x7d, 0x5c, 0xbd, 0x37, 0xf6, 0xd8, 0x64, 0x3e, 0xb0, 0x5c, 0x3a, 0x6b, - 0x2c, 0x93, 0x1a, 0x0e, 0x4e, 0x7f, 0x37, 0x82, 0xc3, 0x71, 0x43, 0x66, 0x61, 0x49, 0x9c, 0x93, - 0x15, 0xc4, 0xbd, 0x21, 0xfa, 0x12, 0x36, 0x47, 0x21, 0x9d, 0xf5, 0x43, 0x12, 0x4c, 0x3d, 0x17, - 0xf3, 0x50, 0x1b, 0x35, 0xad, 0x5e, 0x6a, 0xef, 0xc9, 0x50, 0xa5, 0xfb, 0x21, 0x9d, 0x39, 0xb1, - 0x57, 0x04, 0xfc, 0xe4, 0x6a, 0x01, 0x15, 0xd2, 0x29, 0x8d, 0x56, 0x88, 0x86, 0xe8, 0x39, 0x94, - 0x18, 0x5d, 0x0d, 0x9b, 0x14, 0x61, 0x77, 0x64, 0xd8, 0xc2, 0x63, 0xfa, 0x4f, 0x04, 0x2d, 0x30, - 0x7a, 0x1a, 0xd2, 0x80, 0x14, 0x23, 0xe1, 0xcc, 0x48, 0x89, 0x5a, 0xa6, 0x78, 0x24, 0x47, 0x58, - 0xd0, 0x3b, 0x90, 0x71, 0xe9, 0x6c, 0xe6, 0x31, 0x23, 0xbd, 0xe2, 0x93, 0x36, 0x54, 0x81, 0xec, - 0xf3, 0xb9, 0x47, 0x22, 0x97, 0x18, 0x99, 0x9a, 0x56, 0xcf, 0x49, 0xb7, 0x32, 0x9a, 0x3f, 0xa7, - 0x00, 0x71, 0xe5, 0x76, 0x48, 0x14, 0xe1, 0x31, 0x71, 0xc8, 0xf3, 0x39, 0x89, 0xfe, 0x1d, 0xf9, - 0x76, 0xa0, 0xb8, 0x2a, 0x9f, 0xd0, 0xae, 0xd0, 0x7c, 0xdf, 0x3a, 0x6d, 0xef, 0x73, 0x35, 0xe9, - 0x92, 0xc8, 0x0d, 0xbd, 0x80, 0xd1, 0x50, 0x66, 0x51, 0x58, 0x91, 0x05, 0xf5, 0x00, 0x4e, 0x45, - 0x11, 0x8a, 0x5c, 0x8d, 0x2c, 0xbf, 0x2c, 0x37, 0x6a, 0x40, 0x76, 0x16, 0xd7, 0x43, 0xd4, 0xbb, - 0xd0, 0xdc, 0xb4, 0xe2, 0x49, 0xb0, 0x64, 0x99, 0x54, 0x15, 0xe5, 0xa9, 0xd5, 0x2a, 0xa7, 0xd7, - 0x54, 0x19, 0xdd, 0x07, 0x98, 0xa8, 0xd1, 0x88, 0x8c, 0x4c, 0x2d, 0x59, 0x2f, 0x34, 0x6b, 0xd6, - 0x85, 0x39, 0xb6, 0xce, 0xcc, 0x90, 0x24, 0x59, 0x41, 0xa2, 0x3d, 0xd8, 0x5c, 0x3e, 0xf5, 0x43, - 0x12, 0x05, 0x91, 0x91, 0xbd, 0x12, 0xd9, 0xb5, 0x25, 0xdc, 0xe1, 0x68, 0xf4, 0x0c, 0x36, 0x63, - 0x9d, 0x23, 0x86, 0x43, 0xd6, 0x3f, 0x24, 0x0b, 0x23, 0x57, 0xd3, 0xea, 0xc5, 0xf6, 0xc7, 0xaf, - 0x8f, 0xab, 0x1f, 0x5d, 0x4d, 0xdf, 0x6d, 0xb2, 0x70, 0x4a, 0x82, 0xed, 0x80, 0x93, 0x6d, 0x93, - 0x85, 0x39, 0x80, 0xb7, 0x2e, 0x36, 0x57, 0x1b, 0x33, 0x77, 0x82, 0x1e, 0x40, 0x2e, 0x8c, 0x9f, - 0x23, 0x43, 0x13, 0x39, 0xdc, 0xfa, 0x93, 0x1c, 0xce, 0xa1, 0xe3, 0x44, 0x96, 0x60, 0x73, 0x1f, - 0x8c, 0x33, 0xa7, 0xa2, 0x80, 0xfa, 0x11, 0x79, 0xe2, 0x7b, 0xd4, 0x47, 0x16, 0xa4, 0xc5, 0x46, - 0x14, 0x3d, 0x5c, 0x68, 0x1a, 0x6b, 0xda, 0xc1, 0xe6, 0x7e, 0x27, 0x3e, 0xf6, 0x69, 0xea, 0xe8, - 0xdb, 0xaa, 0x66, 0xfe, 0xb2, 0x01, 0xff, 0x5b, 0x43, 0xf9, 0x1f, 0x1f, 0x8a, 0x07, 0x90, 0x9e, - 0xf3, 0xa2, 0xca, 0x91, 0xf8, 0xf0, 0x4d, 0x6a, 0xad, 0xe8, 0x20, 0xc9, 0x62, 0xbc, 0xf9, 0x7d, - 0x1a, 0x36, 0x0f, 0x7c, 0x1c, 0x44, 0x13, 0xca, 0xd4, 0xbe, 0x69, 0x41, 0x66, 0x42, 0xf0, 0x90, - 0x28, 0xa5, 0x3e, 0x58, 0xc3, 0x7e, 0x0e, 0x63, 0x6d, 0x09, 0x80, 0x23, 0x81, 0xe8, 0x36, 0xe4, - 0x0e, 0x5f, 0xf4, 0x07, 0xbc, 0xb9, 0x44, 0xd5, 0x8a, 0xed, 0x02, 0x57, 0x66, 0xfb, 0xa9, 0xe8, - 0x37, 0x27, 0x7b, 0xf8, 0x22, 0x6e, 0xbc, 0x2a, 0x14, 0xa6, 0x74, 0xdc, 0x27, 0x3e, 0x0b, 0x3d, - 0x12, 0x19, 0xc9, 0x5a, 0xb2, 0x5e, 0x74, 0x60, 0x4a, 0xc7, 0x76, 0x6c, 0x41, 0x65, 0x48, 0x8f, - 0x3c, 0x1f, 0x4f, 0x45, 0xa2, 0x6a, 0x94, 0x63, 0x53, 0xf9, 0x9b, 0x24, 0x64, 0xe2, 0xb8, 0xe8, - 0x19, 0xdc, 0xe0, 0x4b, 0xa1, 0x2f, 0x77, 0x40, 0x5f, 0x36, 0xa4, 0x54, 0xec, 0x4a, 0xcd, 0x8c, - 0xc2, 0x8b, 0x1b, 0xf8, 0x26, 0x80, 0x9c, 0x4c, 0xef, 0x25, 0x11, 0xca, 0x25, 0x95, 0x26, 0xf1, - 0x8c, 0x79, 0x2f, 0x09, 0xba, 0x05, 0x05, 0x17, 0xfb, 0xfd, 0x21, 0x71, 0xa7, 0x9e, 0x4f, 0xce, - 0xbc, 0x30, 0xb8, 0xd8, 0xef, 0xc6, 0x76, 0x64, 0x43, 0x5a, 0x5c, 0xf0, 0x62, 0x39, 0xad, 0x2f, - 0xee, 0xf2, 0x53, 0x40, 0xb5, 0xc2, 0x01, 0x07, 0xa8, 0xe4, 0x05, 0x1a, 0xed, 0x40, 0x2e, 0x08, - 0x3d, 0x1a, 0x7a, 0x6c, 0x21, 0x2e, 0x93, 0x6b, 0x6b, 0x9b, 0xe0, 0xbc, 0x4c, 0xfb, 0x12, 0xa2, - 0x06, 0x57, 0x51, 0x70, 0xba, 0x88, 0x85, 0x98, 0x91, 0xf1, 0xc2, 0xc8, 0x5e, 0x9a, 0xee, 0x40, - 0x42, 0x14, 0x9d, 0xa2, 0x78, 0x98, 0xca, 0x69, 0xfa, 0x86, 0x79, 0x0f, 0x72, 0x2a, 0x20, 0x2a, - 0x40, 0xf6, 0xc9, 0xee, 0xf6, 0xee, 0xde, 0x67, 0xbb, 0x7a, 0x02, 0x15, 0x21, 0xe7, 0xd8, 0x9d, - 0xbd, 0xa7, 0xb6, 0xf3, 0xb9, 0xae, 0xa1, 0x12, 0xe4, 0x1d, 0xbb, 0xdd, 0x7a, 0xd4, 0xda, 0xed, - 0xd8, 0xfa, 0x86, 0x69, 0x40, 0x4e, 0xf1, 0xf2, 0x83, 0xdb, 0x4f, 0xfb, 0xed, 0xd6, 0xe3, 0xce, - 0x96, 0x9e, 0x30, 0x7f, 0xd4, 0x40, 0x3f, 0x7d, 0x05, 0xb9, 0x08, 0xb6, 0x20, 0xc3, 0x2b, 0x32, - 0x8f, 0x44, 0xb7, 0x5e, 0x6b, 0xde, 0xf9, 0xcb, 0xf7, 0x8e, 0x41, 0xd6, 0x81, 0x40, 0xa8, 0xeb, - 0x39, 0xc6, 0xf3, 0x8b, 0x43, 0xdd, 0x34, 0xbc, 0x6f, 0xf2, 0xe7, 0x2e, 0x16, 0xb3, 0x07, 0x99, - 0x18, 0x77, 0x21, 0x99, 0x56, 0xa7, 0x63, 0xef, 0x3f, 0xb6, 0xbb, 0xba, 0xc6, 0x5d, 0xad, 0xfd, - 0xfd, 0x47, 0x3d, 0xbb, 0xab, 0x6f, 0xa0, 0x3c, 0xa4, 0x6d, 0xc7, 0xd9, 0x73, 0xf4, 0x24, 0x3f, - 0xd5, 0xb5, 0x3b, 0x8f, 0x7a, 0xbb, 0x76, 0x57, 0x4f, 0x3d, 0x4c, 0xe5, 0x92, 0x7a, 0xca, 0xfc, - 0x4e, 0x83, 0xeb, 0x1d, 0xea, 0x8f, 0x3a, 0x13, 0xde, 0x44, 0x1d, 0xea, 0x33, 0xf2, 0x15, 0x43, - 0x77, 0x01, 0xf8, 0xf7, 0x02, 0xf6, 0x87, 0x6a, 0xb7, 0xe5, 0xdb, 0xd7, 0xe5, 0x6e, 0xcb, 0x77, - 0x62, 0x4f, 0xaf, 0xeb, 0xe4, 0xe5, 0x21, 0xf1, 0x3d, 0x92, 0x0d, 0xf0, 0x62, 0x4a, 0x71, 0xfc, - 0xcd, 0x55, 0x74, 0xd4, 0x23, 0xea, 0x42, 0xf6, 0xef, 0xef, 0x1b, 0x05, 0x6d, 0xbe, 0xd2, 0x20, - 0xbf, 0x33, 0x9f, 0x32, 0x8f, 0x0f, 0x0d, 0x9a, 0x82, 0xbe, 0x32, 0x3c, 0xf1, 0x1c, 0xdf, 0xb9, - 0xdc, 0x84, 0xf1, 0xb3, 0xe5, 0xdb, 0x97, 0x5b, 0x56, 0x66, 0xa2, 0xae, 0xdd, 0xd5, 0xd0, 0x33, - 0x28, 0x72, 0xa7, 0x52, 0x10, 0x99, 0x6f, 0x6e, 0xcb, 0xf2, 0xcd, 0x4b, 0xb4, 0x40, 0x4c, 0xdf, - 0x7e, 0xef, 0xe8, 0xb7, 0x4a, 0xe2, 0xe8, 0xa4, 0xa2, 0xfd, 0x74, 0x52, 0xd1, 0x5e, 0x9d, 0x54, - 0xb4, 0x5f, 0x4f, 0x2a, 0xda, 0xd7, 0xbf, 0x57, 0x12, 0x5f, 0x64, 0x25, 0xf2, 0x8f, 0x00, 0x00, - 0x00, 0xff, 0xff, 0xd4, 0x72, 0xff, 0x08, 0xf7, 0x0b, 0x00, 0x00, +func init() { proto.RegisterFile("storage/raft.proto", fileDescriptor_raft_06448cf81da2fcd5) } + +var fileDescriptor_raft_06448cf81da2fcd5 = []byte{ + // 1172 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe4, 0x56, 0xdd, 0x6e, 0x1b, 0x55, + 0x10, 0xce, 0xc6, 0xff, 0x63, 0xbb, 0x71, 0x0f, 0x15, 0xac, 0x4c, 0x71, 0xcc, 0x96, 0x56, 0xa6, + 0x48, 0x76, 0x89, 0x0a, 0x17, 0xdc, 0xc5, 0xf6, 0xb6, 0x71, 0xd3, 0xfc, 0x68, 0x93, 0x16, 0x81, + 0x54, 0x59, 0xc7, 0xeb, 0x13, 0x7b, 0x15, 0x7b, 0xcf, 0xf6, 0xec, 0x71, 0x21, 0x7d, 0x0a, 0x1e, + 0x81, 0x1b, 0x9e, 0x81, 0x57, 0xc8, 0x0d, 0x12, 0x97, 0x95, 0x40, 0x11, 0x04, 0x89, 0x07, 0xe0, + 0xb2, 0x57, 0xe8, 0xfc, 0x25, 0x9b, 0xc4, 0xd0, 0x04, 0x21, 0x6e, 0xb8, 0xb1, 0x77, 0x67, 0xe6, + 0x9b, 0xd9, 0x99, 0x6f, 0x66, 0xce, 0x01, 0x14, 0x73, 0xca, 0xf0, 0x88, 0xb4, 0x18, 0xde, 0xe3, + 0xcd, 0x88, 0x51, 0x4e, 0xd1, 0x75, 0x9f, 0xfa, 0xfb, 0x8c, 0x62, 0x7f, 0xdc, 0xd4, 0xda, 0xea, + 0x0d, 0xf9, 0x1a, 0x0d, 0x5a, 0x84, 0x31, 0xca, 0x62, 0x65, 0x58, 0x7d, 0xdb, 0x48, 0xa7, 0x84, + 0xe3, 0x21, 0xe6, 0x58, 0xcb, 0xdf, 0x33, 0x4e, 0xf5, 0x7f, 0x34, 0x68, 0xc5, 0x1c, 0x73, 0xa2, + 0xd5, 0xef, 0x12, 0xee, 0x0f, 0x65, 0x40, 0xf9, 0x13, 0x0d, 0x12, 0xc1, 0xab, 0x37, 0x46, 0x74, + 0x44, 0xe5, 0x63, 0x4b, 0x3c, 0x29, 0xa9, 0xf3, 0x7d, 0x0a, 0xca, 0x1e, 0xde, 0xe3, 0x6b, 0x04, + 0x33, 0x3e, 0x20, 0x98, 0xa3, 0x01, 0xe4, 0x19, 0x0e, 0x47, 0xa4, 0x1f, 0x0c, 0x6d, 0xab, 0x6e, + 0x35, 0xd2, 0xed, 0x87, 0x87, 0x47, 0xcb, 0x0b, 0xc7, 0x47, 0xcb, 0x39, 0x4f, 0xc8, 0x7b, 0xdd, + 0xd7, 0x47, 0xcb, 0xf7, 0x47, 0x01, 0x1f, 0xcf, 0x06, 0x4d, 0x9f, 0x4e, 0x5b, 0x27, 0x49, 0x0d, + 0x07, 0xa7, 0xcf, 0xad, 0x68, 0x7f, 0xd4, 0xd2, 0x59, 0x34, 0x35, 0xce, 0xcb, 0x49, 0xc7, 0xbd, + 0x21, 0xfa, 0x0a, 0x96, 0xf6, 0x18, 0x9d, 0xf6, 0x19, 0x89, 0x26, 0x81, 0x8f, 0x45, 0xa8, 0xc5, + 0xba, 0xd5, 0x28, 0xb7, 0xb7, 0x74, 0xa8, 0xf2, 0x03, 0x46, 0xa7, 0x9e, 0xd2, 0xca, 0x80, 0x9f, + 0x5e, 0x2d, 0xa0, 0x41, 0x7a, 0xe5, 0xbd, 0x84, 0xa3, 0x21, 0x7a, 0x0e, 0x65, 0x4e, 0x93, 0x61, + 0x53, 0x32, 0xec, 0x86, 0x0e, 0x5b, 0xdc, 0xa5, 0xff, 0x46, 0xd0, 0x22, 0xa7, 0xa7, 0x21, 0x6d, + 0x48, 0x73, 0xc2, 0xa6, 0x76, 0x5a, 0xd6, 0x32, 0x2d, 0x22, 0x79, 0x52, 0x82, 0x6e, 0x42, 0xd6, + 0xa7, 0xd3, 0x69, 0xc0, 0xed, 0x4c, 0x42, 0xa7, 0x65, 0xa8, 0x06, 0xb9, 0xe7, 0xb3, 0x80, 0xc4, + 0x3e, 0xb1, 0xb3, 0x75, 0xab, 0x91, 0xd7, 0x6a, 0x23, 0x74, 0x7e, 0x4a, 0x03, 0x12, 0xcc, 0x6d, + 0x90, 0x38, 0xc6, 0x23, 0xe2, 0x91, 0xe7, 0x33, 0x12, 0xff, 0x37, 0xf4, 0x6d, 0x40, 0x29, 0x49, + 0x9f, 0xe4, 0xae, 0xb8, 0xf2, 0x41, 0xf3, 0xb4, 0xbd, 0xcf, 0xd5, 0xa4, 0x4b, 0x62, 0x9f, 0x05, + 0x11, 0xa7, 0x4c, 0x67, 0x51, 0x4c, 0xd0, 0x82, 0x7a, 0x00, 0xa7, 0xa4, 0x48, 0x46, 0xae, 0xe6, + 0xac, 0x70, 0x52, 0x6e, 0xd4, 0x82, 0xdc, 0x54, 0xd5, 0x43, 0xd6, 0xbb, 0xb8, 0xb2, 0xd4, 0x54, + 0x93, 0xd0, 0xd4, 0x65, 0x32, 0x55, 0xd4, 0x56, 0xc9, 0x2a, 0x67, 0xe6, 0x54, 0x19, 0x3d, 0x00, + 0x18, 0x9b, 0xd1, 0x88, 0xed, 0x6c, 0x3d, 0xd5, 0x28, 0xae, 0xd4, 0x9b, 0x17, 0xe6, 0xb8, 0x79, + 0x66, 0x86, 0xb4, 0x93, 0x04, 0x12, 0x6d, 0xc1, 0xd2, 0xc9, 0x5b, 0x9f, 0x91, 0x38, 0x8a, 0xed, + 0xdc, 0x95, 0x9c, 0x5d, 0x3b, 0x81, 0x7b, 0x02, 0x8d, 0x9e, 0xc1, 0x92, 0xe2, 0x39, 0xe6, 0x98, + 0xf1, 0xfe, 0x3e, 0x39, 0xb0, 0xf3, 0x75, 0xab, 0x51, 0x6a, 0x7f, 0xf2, 0xfa, 0x68, 0xf9, 0xe3, + 0xab, 0xf1, 0xbb, 0x4e, 0x0e, 0xbc, 0xb2, 0xf4, 0xb6, 0x23, 0x9c, 0xad, 0x93, 0x03, 0x67, 0x00, + 0xef, 0x5c, 0x6c, 0xae, 0x36, 0xe6, 0xfe, 0x18, 0x3d, 0x84, 0x3c, 0x53, 0xef, 0xb1, 0x6d, 0xc9, + 0x1c, 0x6e, 0xff, 0x45, 0x0e, 0xe7, 0xd0, 0x2a, 0x91, 0x13, 0xb0, 0xb3, 0x0d, 0xf6, 0x19, 0xab, + 0x38, 0xa2, 0x61, 0x4c, 0x9e, 0x84, 0x01, 0x0d, 0x51, 0x13, 0x32, 0x72, 0x23, 0xca, 0x1e, 0x2e, + 0xae, 0xd8, 0x73, 0xda, 0xc1, 0x15, 0x7a, 0x4f, 0x99, 0x7d, 0x96, 0x3e, 0xfc, 0x76, 0xd9, 0x72, + 0x7e, 0x5e, 0x84, 0xb7, 0xe6, 0xb8, 0xfc, 0x9f, 0x0f, 0xc5, 0x43, 0xc8, 0xcc, 0x44, 0x51, 0xf5, + 0x48, 0x7c, 0xf4, 0x26, 0xb6, 0x12, 0x3c, 0x68, 0x67, 0x0a, 0xef, 0xfc, 0x91, 0x81, 0xa5, 0x9d, + 0x10, 0x47, 0xf1, 0x98, 0x72, 0xb3, 0x6f, 0x56, 0x21, 0x3b, 0x26, 0x78, 0x48, 0x0c, 0x53, 0x1f, + 0xce, 0xf1, 0x7e, 0x0e, 0xd3, 0x5c, 0x93, 0x00, 0x4f, 0x03, 0xd1, 0x1d, 0xc8, 0xef, 0xbf, 0xe8, + 0x0f, 0x44, 0x73, 0xc9, 0xaa, 0x95, 0xda, 0x45, 0xc1, 0xcc, 0xfa, 0x53, 0xd9, 0x6f, 0x5e, 0x6e, + 0xff, 0x85, 0x6a, 0xbc, 0x65, 0x28, 0x4e, 0xe8, 0xa8, 0x4f, 0x42, 0xce, 0x02, 0x12, 0xdb, 0xa9, + 0x7a, 0xaa, 0x51, 0xf2, 0x60, 0x42, 0x47, 0xae, 0x92, 0xa0, 0x2a, 0x64, 0xf6, 0x82, 0x10, 0x4f, + 0x64, 0xa2, 0x66, 0x94, 0x95, 0xa8, 0xfa, 0x7b, 0x0a, 0xb2, 0x2a, 0x2e, 0x7a, 0x06, 0x37, 0xc4, + 0x52, 0xe8, 0xeb, 0x1d, 0xd0, 0xd7, 0x0d, 0xa9, 0x19, 0xbb, 0x52, 0x33, 0x23, 0x76, 0x71, 0x03, + 0xdf, 0x02, 0xd0, 0x93, 0x19, 0xbc, 0x24, 0x92, 0xb9, 0x94, 0xe1, 0x44, 0xcd, 0x58, 0xf0, 0x92, + 0xa0, 0xdb, 0x50, 0xf4, 0x71, 0xd8, 0x1f, 0x12, 0x7f, 0x12, 0x84, 0xe4, 0xcc, 0x07, 0x83, 0x8f, + 0xc3, 0xae, 0x92, 0x23, 0x17, 0x32, 0xf2, 0x80, 0x97, 0xcb, 0x69, 0x7e, 0x71, 0x4f, 0xae, 0x02, + 0xa6, 0x15, 0x76, 0x04, 0xc0, 0x24, 0x2f, 0xd1, 0x68, 0x03, 0xf2, 0x11, 0x0b, 0x28, 0x0b, 0xf8, + 0x81, 0x3c, 0x4c, 0xae, 0xcd, 0x6d, 0x82, 0xf3, 0x34, 0x6d, 0x6b, 0x88, 0x19, 0x5c, 0xe3, 0x42, + 0xb8, 0x8b, 0x39, 0xc3, 0x9c, 0x8c, 0x0e, 0xec, 0xdc, 0xa5, 0xdd, 0xed, 0x68, 0x88, 0x71, 0x67, + 0x5c, 0xa0, 0x07, 0x70, 0x73, 0x16, 0xea, 0x4e, 0xe7, 0x64, 0xd8, 0xe7, 0x6c, 0x16, 0xaa, 0x27, + 0x95, 0x7b, 0x3e, 0x51, 0x9c, 0x6a, 0xd2, 0x72, 0xd7, 0x18, 0xca, 0x94, 0x1f, 0xa5, 0xf3, 0x56, + 0x65, 0xd1, 0xb9, 0x0f, 0x79, 0xf3, 0xe1, 0xa8, 0x08, 0xb9, 0x27, 0x9b, 0xeb, 0x9b, 0x5b, 0x9f, + 0x6f, 0x56, 0x16, 0x50, 0x09, 0xf2, 0x9e, 0xdb, 0xd9, 0x7a, 0xea, 0x7a, 0x5f, 0x54, 0x2c, 0x54, + 0x86, 0x82, 0xe7, 0xb6, 0x57, 0x1f, 0xaf, 0x6e, 0x76, 0xdc, 0xca, 0xa2, 0x63, 0x43, 0xde, 0x7c, + 0x9f, 0x30, 0x5c, 0x7f, 0xda, 0x6f, 0xaf, 0xee, 0x76, 0xd6, 0x2a, 0x0b, 0xce, 0x0f, 0x16, 0x54, + 0x4e, 0x53, 0xd1, 0x0b, 0x65, 0x0d, 0xb2, 0xe2, 0xdb, 0x66, 0xb1, 0xec, 0xfa, 0x6b, 0x2b, 0x77, + 0xff, 0x36, 0x7f, 0x05, 0x6a, 0xee, 0x48, 0x84, 0x39, 0xe6, 0x15, 0x5e, 0x1c, 0x40, 0xe6, 0xc4, + 0x12, 0xfd, 0x57, 0x38, 0x77, 0x40, 0x39, 0x3d, 0xc8, 0x2a, 0xdc, 0x85, 0x64, 0x56, 0x3b, 0x1d, + 0x77, 0x7b, 0xd7, 0xed, 0x56, 0x2c, 0xa1, 0x5a, 0xdd, 0xde, 0x7e, 0xdc, 0x73, 0xbb, 0x95, 0x45, + 0x54, 0x80, 0x8c, 0xeb, 0x79, 0x5b, 0x5e, 0x25, 0x25, 0xac, 0xba, 0x6e, 0xe7, 0x71, 0x6f, 0xd3, + 0xed, 0x56, 0xd2, 0x8f, 0xd2, 0xf9, 0x54, 0x25, 0xed, 0x7c, 0x67, 0xc1, 0xf5, 0x0e, 0x0d, 0xf7, + 0x3a, 0x63, 0xd1, 0x8c, 0x1d, 0x1a, 0x72, 0xf2, 0x35, 0x47, 0xf7, 0x00, 0xc4, 0xbd, 0x03, 0x87, + 0x43, 0xb3, 0x23, 0x0b, 0xed, 0xeb, 0x7a, 0x47, 0x16, 0x3a, 0x4a, 0xd3, 0xeb, 0x7a, 0x05, 0x6d, + 0x24, 0xef, 0x35, 0xb9, 0x08, 0x1f, 0x4c, 0x28, 0x56, 0x77, 0xb7, 0x92, 0x67, 0x5e, 0x51, 0x17, + 0x72, 0xff, 0x7c, 0x6f, 0x19, 0xe8, 0xca, 0x2b, 0x0b, 0x0a, 0x1b, 0xb3, 0x09, 0x0f, 0xc4, 0xf0, + 0xa1, 0x09, 0x54, 0x12, 0x43, 0xa8, 0xf6, 0xc1, 0xdd, 0xcb, 0x4d, 0xaa, 0xb0, 0xad, 0xde, 0xb9, + 0xdc, 0xd2, 0x73, 0x16, 0x1a, 0xd6, 0x3d, 0x0b, 0x3d, 0x83, 0x92, 0x50, 0x1a, 0x06, 0x91, 0xf3, + 0xe6, 0xf6, 0xae, 0xde, 0xba, 0x44, 0x0b, 0x28, 0xf7, 0xed, 0xf7, 0x0f, 0x7f, 0xad, 0x2d, 0x1c, + 0x1e, 0xd7, 0xac, 0x1f, 0x8f, 0x6b, 0xd6, 0xab, 0xe3, 0x9a, 0xf5, 0xcb, 0x71, 0xcd, 0xfa, 0xe6, + 0xb7, 0xda, 0xc2, 0x97, 0x39, 0x8d, 0xfc, 0x33, 0x00, 0x00, 0xff, 0xff, 0xde, 0x5c, 0x11, 0x4b, + 0x3f, 0x0c, 0x00, 0x00, } diff --git a/pkg/storage/raft.proto b/pkg/storage/raft.proto index 6a9faa355c01..9457f95a67c0 100644 --- a/pkg/storage/raft.proto +++ b/pkg/storage/raft.proto @@ -148,6 +148,17 @@ message SnapshotRequest { // The strategy of the snapshot. optional Strategy strategy = 7 [(gogoproto.nullable) = false]; + + // Whether the snapshot uses the unreplicated RaftTruncatedState or not. + // This is generally always true at 2.2 and above outside of the migration + // phase, though theoretically it could take a long time for all ranges + // to update to the new mechanism. This bool is true iff the Raft log at + // the snapshot's applied index is using the new key. In particular, it + // is true if the index itself carries out the migration (in which case + // the data in the snapshot contains neither key). + // + // See VersionUnreplicatedRaftTruncatedState. + optional bool unreplicated_truncated_state = 8 [(gogoproto.nullable) = false]; } optional Header header = 1; diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index 22996086e46e..c25de5761740 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/storagepb" "github.com/cockroachdb/cockroach/pkg/util/causer" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" @@ -898,8 +899,29 @@ func (r *Replica) sendSnapshot( return &benignError{errors.New("raft status not initialized")} } + // TODO(tbg): send snapshots without the past raft log. This means replacing + // the state's truncated state with one whose index and term equal that of + // the RaftAppliedIndex of the snapshot. It looks like the code sending out + // the actual entries will do the right thing from then on (see anchor + // below). + _ = (*kvBatchSnapshotStrategy)(nil).Send + usesReplicatedTruncatedState, err := engine.MVCCGetProto( + ctx, snap.EngineSnap, keys.RaftTruncatedStateLegacyKey(r.RangeID), hlc.Timestamp{}, nil, engine.MVCCGetOptions{}, + ) + if err != nil { + return errors.Wrap(err, "loading legacy truncated state") + } + req := SnapshotRequest_Header{ State: snap.State, + // Tell the recipient whether it needs to synthesize the new + // unreplicated TruncatedState. It could tell by itself by peeking into + // the data, but it uses a write only batch for performance which + // doesn't support that; this is easier. Notably, this is true if the + // snap index itself is the one at which the migration happens. + // + // See VersionUnreplicatedRaftTruncatedState. + UnreplicatedTruncatedState: !usesReplicatedTruncatedState, RaftMessageRequest: RaftMessageRequest{ RangeID: r.RangeID, FromReplica: fromRepDesc, diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index dd2541ee8ab6..13f34b45c9b9 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -2241,7 +2241,16 @@ func (r *Replica) applyRaftCommand( oldRaftAppliedIndex, raftAppliedIndex) } - batch := r.store.Engine().NewWriteOnlyBatch() + haveTruncatedState := rResult.State != nil && rResult.State.TruncatedState != nil + var batch engine.Batch + if !haveTruncatedState { + batch = r.store.Engine().NewWriteOnlyBatch() + } else { + // When we update the truncated state, we may need to read the batch + // and can't use a WriteOnlyBatch. This is fine since log truncations + // are tiny batches. + batch = r.store.Engine().NewBatch() + } defer batch.Close() if writeBatch != nil { @@ -2251,8 +2260,7 @@ func (r *Replica) applyRaftCommand( } // The only remaining use of the batch is for range-local keys which we know - // have not been previously written within this batch. Currently the only - // remaining writes are the raft applied index and the updated MVCC stats. + // have not been previously written within this batch. writer := batch.Distinct() // Special-cased MVCC stats handling to exploit commutativity of stats delta @@ -2304,30 +2312,27 @@ func (r *Replica) applyRaftCommand( } } - if rResult.State != nil && rResult.State.TruncatedState != nil { - newTruncatedState := rResult.State.TruncatedState - - // Truncate the Raft log from the entry after the previous - // truncation index to the new truncation index. This is performed - // atomically with the raft command application so that the - // TruncatedState index is always consistent with the state of the - // Raft log itself. We can use the distinct writer because we know - // all writes will be to distinct keys. - // - // Intentionally don't use range deletion tombstones (ClearRange()) - // due to performance concerns connected to having many range - // deletion tombstones. There is a chance that ClearRange will - // perform well here because the tombstones could be "collapsed", - // but it is hardly worth the risk at this point. - prefixBuf := &r.raftMu.stateLoader.RangeIDPrefixBuf - for idx := oldTruncatedState.Index + 1; idx <= newTruncatedState.Index; idx++ { - // NB: RangeIDPrefixBufs have sufficient capacity (32 bytes) to - // avoid allocating when constructing Raft log keys (16 bytes). - unsafeKey := prefixBuf.RaftLogKey(idx) - if err := writer.Clear(engine.MakeMVCCMetadataKey(unsafeKey)); err != nil { - err = errors.Wrapf(err, "unable to clear truncated Raft entries for %+v", newTruncatedState) - return enginepb.MVCCStats{}, err - } + if haveTruncatedState { + apply, err := handleTruncatedStateBelowRaft(ctx, oldTruncatedState, rResult.State.TruncatedState, r.raftMu.stateLoader, writer) + if err != nil { + return enginepb.MVCCStats{}, err + } + if !apply { + // TODO(tbg): As written, there is low confidence that nil'ing out + // the truncated state has the desired effect as our caller actually + // applies the side effects. It may have taken a copy and won't + // observe what we did. + // + // It's very difficult to test this functionality because of how + // difficult it is to test (*Replica).processRaftCommand (and this + // method). Instead of adding yet another terrible that that bends + // reality to its will in some clunky way, assert that we're never + // hitting this branch, which is supposed to be true until we stop + // sending the raft log in snapshots (#34287). + // Morally we would want to drop the command in checkForcedErrLocked, + // but that may be difficult to achieve. + log.Fatal(ctx, log.Safe(fmt.Sprintf("TruncatedState regressed:\nold: %+v\nnew: %+v", oldTruncatedState, rResult.State.TruncatedState))) + rResult.State.TruncatedState = nil } } @@ -2370,3 +2375,79 @@ func (r *Replica) applyRaftCommand( r.store.metrics.RaftCommandCommitLatency.RecordValue(elapsed.Nanoseconds()) return deltaStats, nil } + +func handleTruncatedStateBelowRaft( + ctx context.Context, + oldTruncatedState, newTruncatedState *roachpb.RaftTruncatedState, + loader stateloader.StateLoader, + distinctEng engine.ReadWriter, +) (_apply bool, _ error) { + // If this is a log truncation, load the resulting unreplicated or legacy + // replicated truncated state (in that order). If the migration is happening + // in this command, the result will be an empty message. In steady state + // after the migration, it's the unreplicated truncated state not taking + // into account the current truncation (since the key is unreplicated). + // Either way, we'll update it below. + // + // See VersionUnreplicatedRaftTruncatedState for details. + truncStatePostApply, truncStateIsLegacy, err := loader.LoadRaftTruncatedState(ctx, distinctEng) + if err != nil { + return false, errors.Wrap(err, "loading truncated state") + } + + // Truncate the Raft log from the entry after the previous + // truncation index to the new truncation index. This is performed + // atomically with the raft command application so that the + // TruncatedState index is always consistent with the state of the + // Raft log itself. We can use the distinct writer because we know + // all writes will be to distinct keys. + // + // Intentionally don't use range deletion tombstones (ClearRange()) + // due to performance concerns connected to having many range + // deletion tombstones. There is a chance that ClearRange will + // perform well here because the tombstones could be "collapsed", + // but it is hardly worth the risk at this point. + prefixBuf := &loader.RangeIDPrefixBuf + for idx := oldTruncatedState.Index + 1; idx <= newTruncatedState.Index; idx++ { + // NB: RangeIDPrefixBufs have sufficient capacity (32 bytes) to + // avoid allocating when constructing Raft log keys (16 bytes). + unsafeKey := prefixBuf.RaftLogKey(idx) + if err := distinctEng.Clear(engine.MakeMVCCMetadataKey(unsafeKey)); err != nil { + return false, errors.Wrapf(err, "unable to clear truncated Raft entries for %+v", newTruncatedState) + } + } + + if !truncStateIsLegacy { + if truncStatePostApply.Index < newTruncatedState.Index { + // There are two cases here (though handled just the same). In the + // first case, the Raft command has just deleted the legacy + // replicated truncated state key as part of the migration (so + // truncStateIsLegacy is now false for the first time and + // truncStatePostApply is zero) and we need to atomically write the + // new, unreplicated, key. Or we've already migrated earlier, in + // which case truncStatePostApply equals the current value of the + // new key (which wasn't touched by the batch), and we need to + // overwrite it if this truncation "moves it forward". + + // NB: before the first log truncation evaluated under the + // cluster version which activates this code (see anchor below) this + // block is never reached as truncStatePostApply will equal newTruncatedState. + _ = cluster.VersionUnreplicatedRaftTruncatedState + + if err := engine.MVCCPutProto( + ctx, distinctEng, nil /* ms */, prefixBuf.RaftTruncatedStateKey(), + hlc.Timestamp{}, nil /* txn */, newTruncatedState, + ); err != nil { + return false, errors.Wrap(err, "unable to migrate RaftTruncatedState") + } + // Have migrated and this new truncated state is moving us forward. + // Tell caller that we applied it and that so should they. + return true, nil + } + // Have migrated, but this truncated state moves the existing one + // backwards, so instruct caller to not update in-memory state. + return false, nil + } + // Haven't migrated yet, don't ever discard the update. + return true, nil +} diff --git a/pkg/storage/replica_raft_truncation_test.go b/pkg/storage/replica_raft_truncation_test.go new file mode 100644 index 000000000000..81caed41acc4 --- /dev/null +++ b/pkg/storage/replica_raft_truncation_test.go @@ -0,0 +1,119 @@ +// Copyright 2019 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package storage + +import ( + "bytes" + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/stateloader" + "github.com/cockroachdb/cockroach/pkg/testutils/datadriven" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/assert" +) + +func TestHandleTruncatedStateBelowRaft(t *testing.T) { + defer leaktest.AfterTest(t)() + + // This test verifies the expected behavior of the downstream-of-Raft log + // truncation code, in particular regarding the migration below: + _ = cluster.VersionUnreplicatedRaftTruncatedState + + ctx := context.Background() + + // neither exists (migration) + // old one exists (no migration) + // new one exists (migrated already) + // truncstate regresses + + var prevTruncatedState roachpb.RaftTruncatedState + datadriven.Walk(t, "testdata/truncated_state_migration", func(t *testing.T, path string) { + const rangeID = 12 + loader := stateloader.Make(rangeID) + eng := engine.NewInMem(roachpb.Attributes{}, 1<<20) + defer eng.Close() + + datadriven.RunTest(t, path, func(d *datadriven.TestData) string { + switch d.Cmd { + case "prev": + d.ScanArgs(t, "index", &prevTruncatedState.Index) + d.ScanArgs(t, "term", &prevTruncatedState.Term) + return "" + case "put": + var index uint64 + var term uint64 + var legacy bool + d.ScanArgs(t, "index", &index) + d.ScanArgs(t, "term", &term) + d.ScanArgs(t, "legacy", &legacy) + + truncState := &roachpb.RaftTruncatedState{ + Index: index, + Term: term, + } + + if legacy { + assert.NoError(t, loader.SetLegacyRaftTruncatedState(ctx, eng, nil, truncState)) + } else { + assert.NoError(t, loader.SetRaftTruncatedState(ctx, eng, truncState)) + } + return "" + case "handle": + var buf bytes.Buffer + + var index uint64 + var term uint64 + d.ScanArgs(t, "index", &index) + d.ScanArgs(t, "term", &term) + + newTruncatedState := &roachpb.RaftTruncatedState{ + Index: index, + Term: term, + } + + apply, err := handleTruncatedStateBelowRaft(ctx, &prevTruncatedState, newTruncatedState, loader, eng) + if err != nil { + return err.Error() + } + fmt.Fprintf(&buf, "apply: %t\n", apply) + + for _, key := range []roachpb.Key{ + keys.RaftTruncatedStateLegacyKey(rangeID), + keys.RaftTruncatedStateKey(rangeID), + } { + var truncatedState roachpb.RaftTruncatedState + ok, err := engine.MVCCGetProto(ctx, eng, key, hlc.Timestamp{}, &truncatedState, engine.MVCCGetOptions{}) + if err != nil { + t.Fatal(err) + } + if !ok { + continue + } + fmt.Fprintf(&buf, "%s -> index=%d term=%d\n", key, truncatedState.Index, truncatedState.Term) + } + return buf.String() + default: + } + return fmt.Sprintf("unsupported: %s", d.Cmd) + }) + }) +} diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 4e1a6b88eb7c..695942ae56b3 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -212,7 +212,7 @@ func entries( } // No results, was it due to unavailability or truncation? - ts, err := rsl.LoadLegacyRaftTruncatedState(ctx, e) + ts, _, err := rsl.LoadRaftTruncatedState(ctx, e) if err != nil { return nil, err } @@ -281,7 +281,7 @@ func term( // sideloaded entries. We only need the term, so this is what we do. ents, err := entries(ctx, rsl, eng, rangeID, eCache, nil /* sideloaded */, i, i+1, math.MaxUint64 /* maxBytes */) if err == raft.ErrCompacted { - ts, err := rsl.LoadLegacyRaftTruncatedState(ctx, eng) + ts, _, err := rsl.LoadRaftTruncatedState(ctx, eng) if err != nil { return 0, err } @@ -318,7 +318,7 @@ func (r *Replica) raftTruncatedStateLocked( if r.mu.state.TruncatedState != nil { return *r.mu.state.TruncatedState, nil } - ts, err := r.mu.stateLoader.LoadLegacyRaftTruncatedState(ctx, r.store.Engine()) + ts, _, err := r.mu.stateLoader.LoadRaftTruncatedState(ctx, r.store.Engine()) if err != nil { return ts, err } @@ -488,8 +488,17 @@ type IncomingSnapshot struct { // The Raft log entries for this snapshot. LogEntries [][]byte // The replica state at the time the snapshot was generated (never nil). - State *storagepb.ReplicaState - snapType string + State *storagepb.ReplicaState + // + // When true, this snapshot contains an unreplicated TruncatedState. When + // false, the TruncatedState is replicated (see the reference below) and the + // recipient must avoid also writing the unreplicated TruncatedState. The + // migration to an unreplicated TruncatedState will be carried out during + // the next log truncation (assuming cluster version is bumped at that + // point). + // See the comment on VersionUnreplicatedRaftTruncatedState for details. + UsesUnreplicatedTruncatedState bool + snapType string } // snapshot creates an OutgoingSnapshot containing a rocksdb snapshot for the @@ -861,6 +870,19 @@ func (r *Replica) applySnapshot( distinctBatch := batch.Distinct() stats.batch = timeutil.Now() + if inSnap.UsesUnreplicatedTruncatedState { + // We're using the unreplicated truncated state, which we need to + // manually persist to disk. If we're not taking this branch, the + // snapshot contains a legacy TruncatedState and we don't need to do + // anything (in fact, must not -- the invariant is that exactly one of + // them exists at any given point in the state machine). + if err := stateloader.Make(s.Desc.RangeID).SetRaftTruncatedState( + ctx, distinctBatch, s.TruncatedState, + ); err != nil { + return err + } + } + logEntries := make([]raftpb.Entry, len(inSnap.LogEntries)) for i, bytes := range inSnap.LogEntries { if err := protoutil.Unmarshal(bytes, &logEntries[i]); err != nil { diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 1467bdc3d855..66752beac9bb 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -257,6 +257,7 @@ func (tc *testContext) StartWithStoreConfig(t testing.TB, stopper *stop.Stopper, hlc.Timestamp{}, hlc.Timestamp{}, bootstrapVersion.Version, + stateloader.TruncatedStateUnreplicated, ); err != nil { t.Fatal(err) } @@ -9829,7 +9830,10 @@ func TestReplicaBootstrapRangeAppliedStateKey(t *testing.T) { // Save the ReplicaState and perform persistent assertions again. repl.raftMu.Lock() repl.mu.Lock() - if _, err := repl.mu.stateLoader.Save(ctx, tc.engine, repl.mu.state); err != nil { + if _, err := repl.mu.stateLoader.Save( + ctx, tc.engine, repl.mu.state, + stateloader.TruncatedStateUnreplicated, + ); err != nil { t.Fatalf("could not save ReplicaState: %v", err) } repl.mu.Unlock() diff --git a/pkg/storage/stateloader/initial.go b/pkg/storage/stateloader/initial.go index 3027c5a481b5..aaaa58764b71 100644 --- a/pkg/storage/stateloader/initial.go +++ b/pkg/storage/stateloader/initial.go @@ -55,8 +55,14 @@ func WriteInitialReplicaState( gcThreshold hlc.Timestamp, txnSpanGCThreshold hlc.Timestamp, activeVersion roachpb.Version, + truncStateType TruncatedStateType, ) (enginepb.MVCCStats, error) { rsl := Make(desc.RangeID) + // NB: be careful using activeVersion here. One caller of this code is the + // split trigger, and the version with which the split trigger is called can + // vary across followers. Thus, actions which require coordination cannot + // use the version as a trigger (this is why this method takes a + // truncStateType argument). var s storagepb.ReplicaState s.TruncatedState = &roachpb.RaftTruncatedState{ @@ -101,7 +107,7 @@ func WriteInitialReplicaState( log.Fatalf(ctx, "expected trivial TxnSpanGCThreshold, but found %+v", existingTxnSpanGCThreshold) } - newMS, err := rsl.Save(ctx, eng, s) + newMS, err := rsl.Save(ctx, eng, s, truncStateType) if err != nil { return enginepb.MVCCStats{}, err } @@ -125,9 +131,10 @@ func WriteInitialState( gcThreshold hlc.Timestamp, txnSpanGCThreshold hlc.Timestamp, bootstrapVersion roachpb.Version, + truncStateType TruncatedStateType, ) (enginepb.MVCCStats, error) { newMS, err := WriteInitialReplicaState( - ctx, eng, ms, desc, lease, gcThreshold, txnSpanGCThreshold, bootstrapVersion) + ctx, eng, ms, desc, lease, gcThreshold, txnSpanGCThreshold, bootstrapVersion, truncStateType) if err != nil { return enginepb.MVCCStats{}, err } diff --git a/pkg/storage/stateloader/stateloader.go b/pkg/storage/stateloader/stateloader.go index 995b19311136..14f43cc36070 100644 --- a/pkg/storage/stateloader/stateloader.go +++ b/pkg/storage/stateloader/stateloader.go @@ -105,7 +105,7 @@ func (rsl StateLoader) Load( // The truncated state should not be optional (i.e. the pointer is // pointless), but it is and the migration is not worth it. - truncState, err := rsl.LoadLegacyRaftTruncatedState(ctx, reader) + truncState, _, err := rsl.LoadRaftTruncatedState(ctx, reader) if err != nil { return storagepb.ReplicaState{}, err } @@ -114,6 +114,17 @@ func (rsl StateLoader) Load( return s, nil } +// TruncatedStateType determines whether to use a replicated (legacy) or an +// unreplicated TruncatedState. See VersionUnreplicatedRaftTruncatedStateKey. +type TruncatedStateType int + +const ( + // TruncatedStateLegacyReplicated means use the legacy (replicated) key. + TruncatedStateLegacyReplicated TruncatedStateType = iota + // TruncatedStateUnreplicated means use the new (unreplicated) key. + TruncatedStateUnreplicated +) + // Save persists the given ReplicaState to disk. It assumes that the contained // Stats are up-to-date and returns the stats which result from writing the // updated State. @@ -126,7 +137,10 @@ func (rsl StateLoader) Load( // missing whenever save is called. Optional values should be reserved // strictly for use in Result. Do before merge. func (rsl StateLoader) Save( - ctx context.Context, eng engine.ReadWriter, state storagepb.ReplicaState, + ctx context.Context, + eng engine.ReadWriter, + state storagepb.ReplicaState, + truncStateType TruncatedStateType, ) (enginepb.MVCCStats, error) { ms := state.Stats if err := rsl.SetLease(ctx, eng, ms, *state.Lease); err != nil { @@ -138,8 +152,14 @@ func (rsl StateLoader) Save( if err := rsl.SetTxnSpanGCThreshold(ctx, eng, ms, state.TxnSpanGCThreshold); err != nil { return enginepb.MVCCStats{}, err } - if err := rsl.SetLegacyRaftTruncatedState(ctx, eng, ms, state.TruncatedState); err != nil { - return enginepb.MVCCStats{}, err + if truncStateType == TruncatedStateLegacyReplicated { + if err := rsl.SetLegacyRaftTruncatedState(ctx, eng, ms, state.TruncatedState); err != nil { + return enginepb.MVCCStats{}, err + } + } else { + if err := rsl.SetRaftTruncatedState(ctx, eng, state.TruncatedState); err != nil { + return enginepb.MVCCStats{}, err + } } if state.UsingAppliedStateKey { rai, lai := state.RaftAppliedIndex, state.LeaseAppliedIndex @@ -416,19 +436,6 @@ func (rsl StateLoader) SetMVCCStats( return rsl.writeLegacyMVCCStatsInternal(ctx, eng, newMS) } -// LoadLegacyRaftTruncatedState loads the truncated state. -func (rsl StateLoader) LoadLegacyRaftTruncatedState( - ctx context.Context, reader engine.Reader, -) (roachpb.RaftTruncatedState, error) { - var truncState roachpb.RaftTruncatedState - if _, err := engine.MVCCGetProto( - ctx, reader, rsl.RaftTruncatedStateLegacyKey(), hlc.Timestamp{}, &truncState, engine.MVCCGetOptions{}, - ); err != nil { - return roachpb.RaftTruncatedState{}, err - } - return truncState, nil -} - // SetLegacyRaftTruncatedState overwrites the truncated state. func (rsl StateLoader) SetLegacyRaftTruncatedState( ctx context.Context, @@ -508,7 +515,7 @@ func (rsl StateLoader) LoadLastIndex(ctx context.Context, reader engine.Reader) if lastIndex == 0 { // The log is empty, which means we are either starting from scratch // or the entire log has been truncated away. - lastEnt, err := rsl.LoadLegacyRaftTruncatedState(ctx, reader) + lastEnt, _, err := rsl.LoadRaftTruncatedState(ctx, reader) if err != nil { return 0, err } @@ -517,6 +524,48 @@ func (rsl StateLoader) LoadLastIndex(ctx context.Context, reader engine.Reader) return lastIndex, nil } +// LoadRaftTruncatedState loads the truncated state. The returned boolean returns +// whether the result was read from the TruncatedStateLegacyKey. If both keys +// are missing, it is false which is used to migrate into the unreplicated key. +// +// See VersionUnreplicatedRaftTruncatedState. +func (rsl StateLoader) LoadRaftTruncatedState( + ctx context.Context, reader engine.Reader, +) (_ roachpb.RaftTruncatedState, isLegacy bool, _ error) { + var truncState roachpb.RaftTruncatedState + if found, err := engine.MVCCGetProto( + ctx, reader, rsl.RaftTruncatedStateKey(), hlc.Timestamp{}, &truncState, engine.MVCCGetOptions{}, + ); err != nil { + return roachpb.RaftTruncatedState{}, false, err + } else if found { + return truncState, false, nil + } + + // If the "new" truncated state isn't there (yet), fall back to the legacy + // truncated state. The next log truncation will atomically rewrite them + // assuming the cluster version has advanced sufficiently. + // + // See VersionUnreplicatedRaftTruncatedState. + legacyFound, err := engine.MVCCGetProto( + ctx, reader, rsl.RaftTruncatedStateLegacyKey(), hlc.Timestamp{}, &truncState, engine.MVCCGetOptions{}, + ) + if err != nil { + return roachpb.RaftTruncatedState{}, false, err + } + return truncState, legacyFound, nil +} + +// SetRaftTruncatedState overwrites the truncated state. +func (rsl StateLoader) SetRaftTruncatedState( + ctx context.Context, eng engine.ReadWriter, truncState *roachpb.RaftTruncatedState, +) error { + if (*truncState == roachpb.RaftTruncatedState{}) { + return errors.New("cannot persist empty RaftTruncatedState") + } + return engine.MVCCPutProto(ctx, eng, nil, /* ms */ + rsl.RaftTruncatedStateKey(), hlc.Timestamp{}, nil, truncState) +} + // LoadReplicaDestroyedError loads the replica destroyed error for the specified // range. If there is no error, nil is returned. func (rsl StateLoader) LoadReplicaDestroyedError( @@ -575,7 +624,7 @@ func (rsl StateLoader) SynthesizeRaftState(ctx context.Context, eng engine.ReadW if err != nil { return err } - truncState, err := rsl.LoadLegacyRaftTruncatedState(ctx, eng) + truncState, _, err := rsl.LoadRaftTruncatedState(ctx, eng) if err != nil { return err } diff --git a/pkg/storage/stats_test.go b/pkg/storage/stats_test.go index 050170c1593a..fd1bc16af872 100644 --- a/pkg/storage/stats_test.go +++ b/pkg/storage/stats_test.go @@ -31,8 +31,8 @@ import ( // writeInitialState(). func initialStats() enginepb.MVCCStats { return enginepb.MVCCStats{ - SysBytes: 130, - SysCount: 4, + SysBytes: 98, + SysCount: 3, } } func TestRangeStatsEmpty(t *testing.T) { diff --git a/pkg/storage/store.go b/pkg/storage/store.go index c5497e144017..79268f0e121a 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -2174,12 +2174,25 @@ func (s *Store) WriteInitialData( } } + // See the cluster version for more details. We're basically saying that if the cluster + // is bootstrapped at a version that uses the unreplicated truncated state, initialize + // it with such a truncated state. + truncStateType := stateloader.TruncatedStateUnreplicated + if bootstrapVersion.Less(cluster.VersionByKey(cluster.VersionUnreplicatedRaftTruncatedState)) { + truncStateType = stateloader.TruncatedStateLegacyReplicated + } + lease := roachpb.BootstrapLease() _, err := stateloader.WriteInitialState( ctx, batch, enginepb.MVCCStats{}, *desc, - lease, hlc.Timestamp{}, hlc.Timestamp{}, bootstrapVersion) + lease, + hlc.Timestamp{}, + hlc.Timestamp{}, + bootstrapVersion, + truncStateType, + ) if err != nil { return err } diff --git a/pkg/storage/store_snapshot.go b/pkg/storage/store_snapshot.go index e906b82edec2..b14202e65520 100644 --- a/pkg/storage/store_snapshot.go +++ b/pkg/storage/store_snapshot.go @@ -140,11 +140,12 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( } inSnap := IncomingSnapshot{ - SnapUUID: snapUUID, - Batches: batches, - LogEntries: logEntries, - State: &header.State, - snapType: snapTypeRaft, + UsesUnreplicatedTruncatedState: header.UnreplicatedTruncatedState, + SnapUUID: snapUUID, + Batches: batches, + LogEntries: logEntries, + State: &header.State, + snapType: snapTypeRaft, } if header.RaftMessageRequest.ToReplica.ReplicaID == 0 { inSnap.snapType = snapTypePreemptive diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index b63285db8b7d..d8c81ebdd8b7 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -1348,6 +1348,7 @@ func splitTestRange(store *Store, key, splitKey roachpb.RKey, t *testing.T) *Rep context.Background(), store.engine, enginepb.MVCCStats{}, *desc, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{}, store.ClusterSettings().Version.Version().Version, + stateloader.TruncatedStateUnreplicated, ); err != nil { t.Fatal(err) } @@ -2778,6 +2779,7 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) { ctx, s.Engine(), enginepb.MVCCStats{}, *repl1.Desc(), roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{}, s.ClusterSettings().Version.Version().Version, + stateloader.TruncatedStateUnreplicated, ); err != nil { t.Fatal(err) } diff --git a/pkg/storage/testdata/truncated_state_migration/migration b/pkg/storage/testdata/truncated_state_migration/migration new file mode 100644 index 000000000000..d2c6ea1e6d83 --- /dev/null +++ b/pkg/storage/testdata/truncated_state_migration/migration @@ -0,0 +1,25 @@ +# Migrating after VersionUnreplicatedRaftTruncatedState was enabled. During the +# migration, no TruncatedState is on disk, but we write the new, unreplicated, +# one (note the /u/ in the key) + +prev index=100 term=9 +---- + +handle index=150 term=9 +---- +apply: true +/Local/RangeID/12/u/RaftTruncatedState -> index=150 term=9 + +# Simulate another truncation that moves forward. + +handle index=170 term=9 +---- +apply: true +/Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9 + +# ... and one that moves backwards and should not take effect. + +handle index=150 term=9 +---- +apply: false +/Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9 diff --git a/pkg/storage/testdata/truncated_state_migration/pre_migration b/pkg/storage/testdata/truncated_state_migration/pre_migration new file mode 100644 index 000000000000..e84177bd0b51 --- /dev/null +++ b/pkg/storage/testdata/truncated_state_migration/pre_migration @@ -0,0 +1,26 @@ +# Mode of operation below VersionUnreplicatedRaftTruncatedState. +# We don't mess with the on-disk state nor do we ever drop updates. + +prev index=100 term=9 +---- + +put legacy=true index=100 term=9 +---- + +handle index=100 term=9 +---- +apply: true +/Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9 + +# Note that the below aren't actually possible in practice +# as a divergence won't happen before the migration. + +handle index=150 term=9 +---- +apply: true +/Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9 + +handle index=60 term=9 +---- +apply: true +/Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9