Skip to content

Commit

Permalink
storage: make RaftTruncatedState unreplicated
Browse files Browse the repository at this point in the history
See cockroachdb#34287.

Today, Raft (or preemptive) snapshots include the past Raft log, that
is, log entries which are already reflected in the state of the
snapshot. Fundamentally, this is because we have historically used
a replicated TruncatedState.

TruncatedState essentially tells us what the first index in the log is
(though it also includes a Term).
If the TruncatedState cannot diverge across replicas, we *must* send the
whole log in snapshots, as the first log index must match what the
TruncatedState claims it is.

The Raft log is typically, but not necessarily small. Log truncations
are driven by a queue and use a complex decision process. That decision
process can be faulty and even if it isn't, the queue could be held up.
Besides, even when the Raft log contains only very few entries, these
entries may be quite large (see SSTable ingestion during RESTORE).

All this motivates that we don't want to (be forced to) send the Raft
log as part of snapshots, and in turn we need the TruncatedState to
be unreplicated.

This change migrates the TruncatedState into unreplicated keyspace.
It does not yet allow snapshots to avoid sending the past Raft log,
but that is a relatively straightforward follow-up change.

VersionUnreplicatedRaftTruncatedState, when active, moves the truncated
state into unreplicated keyspace on log truncations.

The migration works as follows:

1. at any log position, the replicas of a Range either use the new
(unreplicated) key or the old one, and exactly one of them exists.

2. When a log truncation evaluates under the new cluster version,
it initiates the migration by deleting the old key. Under the old cluster
version, it behaves like today, updating the replicated truncated state.

3. The deletion signals new code downstream of Raft and triggers a write
to the new, unreplicated, key (atomic with the deletion of the old key).

4. Future log truncations don't write any replicated data any more, but
(like before) send along the TruncatedState which is written downstream
of Raft atomically with the deletion of the log entries. This actually
uses the same code as 3.
What's new is that the truncated state needs to be verified before
replacing a previous one. If replicas disagree about their truncated
state, it's possible for replica X at FirstIndex=100 to apply a
truncated state update that sets FirstIndex to, say, 50 (proposed by a
replica with a "longer" historical log). In that case, the truncated
state update must be ignored (this is straightforward downstream-of-Raft
code).

5. When a split trigger evaluates, it seeds the RHS with the legacy
key iff the LHS uses the legacy key, and the unreplicated key otherwise.
This makes sure that the invariant that all replicas agree on the
state of the migration is upheld.

6. When a snapshot is applied, the receiver is told whether the snapshot
contains a legacy key. If not, it writes the truncated state (which is
part of the snapshot metadata) in its unreplicated version. Otherwise
it doesn't have to do anything (the range will migrate later).

The following diagram visualizes the above. Note that it abuses sequence
diagrams to get a nice layout; the vertical lines belonging to NewState
and OldState don't imply any particular ordering of operations.

```
┌────────┐                            ┌────────┐
│OldState│                            │NewState│
└───┬────┘                            └───┬────┘
    │                        Bootstrap under old version
    │ <─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
    │                                     │
    │                                     │     Bootstrap under new version
    │                                     │ <─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
    │                                     │
    │─ ─ ┐
    │    | Log truncation under old version
    │< ─ ┘
    │                                     │
    │─ ─ ┐                                │
    │    | Snapshot                       │
    │< ─ ┘                                │
    │                                     │
    │                                     │─ ─ ┐
    │                                     │    | Snapshot
    │                                     │< ─ ┘
    │                                     │
    │   Log truncation under new version  │
    │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─>│
    │                                     │
    │                                     │─ ─ ┐
    │                                     │    | Log truncation under new version
    │                                     │< ─ ┘
    │                                     │
    │                                     │─ ─ ┐
    │                                     │    | Log truncation under old version
    │                                     │< ─ ┘ (necessarily running new binary)
```

Source: http://www.plantuml.com/plantuml/uml/ and the following input:

@startuml
scale 600 width

OldState <--] : Bootstrap under old version
NewState <--] : Bootstrap under new version
OldState --> OldState : Log truncation under old version
OldState --> OldState : Snapshot
NewState --> NewState : Snapshot
OldState --> NewState : Log truncation under new version
NewState --> NewState : Log truncation under new version
NewState --> NewState : Log truncation under old version\n(necessarily running new binary)
@enduml

Release note: None
  • Loading branch information
tbg committed Feb 11, 2019
1 parent 3878b12 commit d0aa09e
Show file tree
Hide file tree
Showing 30 changed files with 1,165 additions and 175 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen in the /debug page</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.1-5</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.1-6</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
</tbody>
</table>
3 changes: 2 additions & 1 deletion pkg/keys/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ var (
LocalRaftAppliedIndexLegacySuffix = []byte("rfta")
// LocalRaftTombstoneSuffix is the suffix for the raft tombstone.
LocalRaftTombstoneSuffix = []byte("rftb")
// LocalRaftTruncatedStateLegacySuffix is the suffix for the RaftTruncatedState.
// LocalRaftTruncatedStateLegacySuffix is the suffix for the legacy RaftTruncatedState.
// See VersionUnreplicatedRaftTruncatedState.
LocalRaftTruncatedStateLegacySuffix = []byte("rftt")
// LocalRangeLeaseSuffix is the suffix for a range lease.
LocalRangeLeaseSuffix = []byte("rll-")
Expand Down
11 changes: 11 additions & 0 deletions pkg/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ func LeaseAppliedIndexLegacyKey(rangeID roachpb.RangeID) roachpb.Key {
}

// RaftTruncatedStateLegacyKey returns a system-local key for a RaftTruncatedState.
// See VersionUnreplicatedRaftTruncatedState.
func RaftTruncatedStateLegacyKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RaftTruncatedStateLegacyKey()
}
Expand Down Expand Up @@ -314,6 +315,11 @@ func RaftTombstoneKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RaftTombstoneKey()
}

// RaftTruncatedStateKey returns a system-local key for a RaftTruncatedState.
func RaftTruncatedStateKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RaftTruncatedStateKey()
}

// RaftHardStateKey returns a system-local key for a Raft HardState.
func RaftHardStateKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RaftHardStateKey()
Expand Down Expand Up @@ -935,6 +941,11 @@ func (b RangeIDPrefixBuf) RaftTombstoneKey() roachpb.Key {
return append(b.unreplicatedPrefix(), LocalRaftTombstoneSuffix...)
}

// RaftTruncatedStateKey returns a system-local key for a RaftTruncatedState.
func (b RangeIDPrefixBuf) RaftTruncatedStateKey() roachpb.Key {
return append(b.unreplicatedPrefix(), LocalRaftTruncatedStateLegacySuffix...)
}

// RaftHardStateKey returns a system-local key for a Raft HardState.
func (b RangeIDPrefixBuf) RaftHardStateKey() roachpb.Key {
return append(b.unreplicatedPrefix(), LocalRaftHardStateSuffix...)
Expand Down
2 changes: 1 addition & 1 deletion pkg/keys/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ var (
ppFunc: raftLogKeyPrint,
psFunc: raftLogKeyParse,
},
{name: "LegacyRaftTruncatedState", suffix: LocalRaftTruncatedStateLegacySuffix},
{name: "RaftTruncatedState", suffix: LocalRaftTruncatedStateLegacySuffix},
{name: "RaftLastIndex", suffix: LocalRaftLastIndexSuffix},
{name: "RangeLastReplicaGCTimestamp", suffix: LocalRangeLastReplicaGCTimestampSuffix},
{name: "RangeLastVerificationTimestamp", suffix: LocalRangeLastVerificationTimestampSuffixDeprecated},
Expand Down
3 changes: 2 additions & 1 deletion pkg/keys/printer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ func TestPrettyPrint(t *testing.T) {
{RangeAppliedStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeAppliedState"},
{RaftAppliedIndexLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RaftAppliedIndex"},
{LeaseAppliedIndexLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/LeaseAppliedIndex"},
{RaftTruncatedStateLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/LegacyRaftTruncatedState"},
{RaftTruncatedStateLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RaftTruncatedState"},
{RaftTruncatedStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/u/RaftTruncatedState"},
{RangeLeaseKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeLease"},
{RangeStatsLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeStats"},
{RangeTxnSpanGCThresholdKey(roachpb.RangeID(1000001)), `/Local/RangeID/1000001/r/RangeTxnSpanGCThreshold`},
Expand Down
172 changes: 172 additions & 0 deletions pkg/server/version_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,20 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)

type testClusterWithHelpers struct {
Expand Down Expand Up @@ -191,6 +195,174 @@ func TestClusterVersionPersistedOnJoin(t *testing.T) {
}
}

// TestClusterVersionUnreplicatedRaftTruncatedState exercises the
// VersionUnreplicatedRaftTruncatedState migration in as much detail as possible
// in a unit test.
//
// It starts a four node cluster with a pre-migration version and upgrades into
// the new version while traffic and scattering are active, verifying that the
// truncated states are rewritten.
func TestClusterVersionUnreplicatedRaftTruncatedState(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

dir, finish := testutils.TempDir(t)
defer finish()

oldVersion := cluster.VersionByKey(cluster.VersionUnreplicatedRaftTruncatedState - 1)
oldVersionS := oldVersion.String()
newVersionS := cluster.VersionByKey(cluster.VersionUnreplicatedRaftTruncatedState).String()

// Four node cluster in which all versions support newVersion (i.e. would in
// principle upgrade to it) but are bootstrapped at oldVersion.
versions := [][2]string{
{oldVersionS, newVersionS},
{oldVersionS, newVersionS},
{oldVersionS, newVersionS},
{oldVersionS, newVersionS},
}

bootstrapVersion := cluster.ClusterVersion{Version: oldVersion}

knobs := base.TestingKnobs{
Store: &storage.StoreTestingKnobs{
BootstrapVersion: &bootstrapVersion,
},
Server: &server.TestingKnobs{
DisableAutomaticVersionUpgrade: 1,
},
}

tc := setupMixedCluster(t, knobs, versions, dir)
defer tc.TestCluster.Stopper().Stop(ctx)

if _, err := tc.ServerConn(0).Exec(`
CREATE TABLE kv (id INT PRIMARY KEY, v INT);
ALTER TABLE kv SPLIT AT SELECT i FROM generate_series(1, 9) AS g(i);
`); err != nil {
t.Fatal(err)
}

scatter := func() {
t.Helper()
if _, err := tc.ServerConn(0).Exec(
`ALTER TABLE kv EXPERIMENTAL_RELOCATE SELECT ARRAY[i%$1+1], i FROM generate_series(0, 9) AS g(i)`, len(versions),
); err != nil {
t.Log(err)
}
}

var n int
insert := func() {
t.Helper()
n++
// Write only to a subset of our ranges to guarantee log truncations there.
_, err := tc.ServerConn(0).Exec(`UPSERT INTO kv VALUES($1, $2)`, n%2, n)
if err != nil {
t.Fatal(err)
}
}

for i := 0; i < 500; i++ {
insert()
}
scatter()

for _, server := range tc.Servers {
assert.NoError(t, server.GetStores().(*storage.Stores).VisitStores(func(s *storage.Store) error {
s.VisitReplicas(func(r *storage.Replica) bool {
key := keys.RaftTruncatedStateKey(r.RangeID)
var truncState roachpb.RaftTruncatedState
found, err := engine.MVCCGetProto(
context.Background(), s.Engine(), key,
hlc.Timestamp{}, &truncState, engine.MVCCGetOptions{},
)
if err != nil {
t.Fatal(err)
}
if found {
t.Errorf("unexpectedly found unreplicated TruncatedState at %s", key)
}
return true // want more
})
return nil
}))
}

if v := tc.getVersionFromSelect(0); v != oldVersionS {
t.Fatalf("running %s, wanted %s", v, oldVersionS)
}

assert.NoError(t, tc.setVersion(0, newVersionS))
for i := 0; i < 500; i++ {
insert()
}
scatter()

for _, server := range tc.Servers {
testutils.SucceedsSoon(t, func() error {
err := server.GetStores().(*storage.Stores).VisitStores(func(s *storage.Store) error {
// We scattered and so old copies of replicas may be laying around.
// If we're not proactive about removing them, the test gets pretty
// slow because those replicas aren't caught up any more.
s.MustForceReplicaGCScanAndProcess()
var err error
s.VisitReplicas(func(r *storage.Replica) bool {
snap := s.Engine().NewSnapshot()
defer snap.Close()

keyLegacy := keys.RaftTruncatedStateLegacyKey(r.RangeID)
keyUnreplicated := keys.RaftTruncatedStateKey(r.RangeID)

if found, innerErr := engine.MVCCGetProto(
context.Background(), snap, keyLegacy,
hlc.Timestamp{}, nil, engine.MVCCGetOptions{},
); innerErr != nil {
t.Fatal(innerErr)
} else if found {
if err == nil {
err = errors.New("found legacy TruncatedState")
}
err = errors.Wrap(err, r.String())

// Force a log truncation to prove that this rectifies
// the situation.
status := r.RaftStatus()
if status != nil {
desc := r.Desc()
truncate := &roachpb.TruncateLogRequest{}
truncate.Key = desc.StartKey.AsRawKey()
truncate.RangeID = desc.RangeID
truncate.Index = status.HardState.Commit
var ba roachpb.BatchRequest
ba.RangeID = r.RangeID
ba.Add(truncate)
if _, err := s.DB().NonTransactionalSender().Send(ctx, ba); err != nil {
t.Fatal(err)
}
}
return true // want more
}

if found, err := engine.MVCCGetProto(
context.Background(), snap, keyUnreplicated,
hlc.Timestamp{}, nil, engine.MVCCGetOptions{},
); err != nil {
t.Fatal(err)
} else if !found {
// We can't have neither of the keys present.
t.Fatalf("%s: unexpectedly did not find unreplicated TruncatedState at %s", r, keyUnreplicated)
}

return true // want more
})
return err
})
return err
})
}
}

func TestClusterVersionUpgrade(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
Expand Down
93 changes: 93 additions & 0 deletions pkg/settings/cluster/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const (
VersionExportStorageWorkload
VersionLazyTxnRecord
VersionSequencedReads
VersionUnreplicatedRaftTruncatedState // see versionsSingleton for details

// Add new versions here (step one of two).

Expand Down Expand Up @@ -315,6 +316,98 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Key: VersionSequencedReads,
Version: roachpb.Version{Major: 2, Minor: 1, Unstable: 5},
},
{
// VersionLazyTxnRecord is https://github.com/cockroachdb/cockroach/pull/34660.
// When active, it moves the truncated state into unreplicated keyspace
// on log truncations.
//
// The migration works as follows:
//
// 1. at any log position, the replicas of a Range either use the new
// (unreplicated) key or the old one, and exactly one of them exists.
//
// 2. When a log truncation evaluates under the new cluster version,
// it initiates the migration by deleting the old key. Under the old cluster
// version, it behaves like today, updating the replicated truncated state.
//
// 3. The deletion signals new code downstream of Raft and triggers a write
// to the new, unreplicated, key (atomic with the deletion of the old key).
//
// 4. Future log truncations don't write any replicated data any more, but
// (like before) send along the TruncatedState which is written downstream
// of Raft atomically with the deletion of the log entries. This actually
// uses the same code as 3.
// What's new is that the truncated state needs to be verified before
// replacing a previous one. If replicas disagree about their truncated
// state, it's possible for replica X at FirstIndex=100 to apply a
// truncated state update that sets FirstIndex to, say, 50 (proposed by a
// replica with a "longer" historical log). In that case, the truncated
// state update must be ignored (this is straightforward downstream-of-Raft
// code).
//
// 5. When a split trigger evaluates, it seeds the RHS with the legacy
// key iff the LHS uses the legacy key, and the unreplicated key otherwise.
// This makes sure that the invariant that all replicas agree on the
// state of the migration is upheld.
//
// 6. When a snapshot is applied, the receiver is told whether the snapshot
// contains a legacy key. If not, it writes the truncated state (which is
// part of the snapshot metadata) in its unreplicated version. Otherwise
// it doesn't have to do anything (the range will migrate later).
//
// The following diagram visualizes the above. Note that it abuses sequence
// diagrams to get a nice layout; the vertical lines belonging to NewState
// and OldState don't imply any particular ordering of operations.
//
// ┌────────┐ ┌────────┐
// │OldState│ │NewState│
// └───┬────┘ └───┬────┘
// │ Bootstrap under old version
// │ <─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
// │ │
// │ │ Bootstrap under new version
// │ │ <─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
// │ │
// │─ ─ ┐
// │ | Log truncation under old version
// │< ─ ┘
// │ │
// │─ ─ ┐ │
// │ | Snapshot │
// │< ─ ┘ │
// │ │
// │ │─ ─ ┐
// │ │ | Snapshot
// │ │< ─ ┘
// │ │
// │ Log truncation under new version │
// │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─>│
// │ │
// │ │─ ─ ┐
// │ │ | Log truncation under new version
// │ │< ─ ┘
// │ │
// │ │─ ─ ┐
// │ │ | Log truncation under old version
// │ │< ─ ┘ (necessarily running new binary)
//
// Source: http://www.plantuml.com/plantuml/uml/ and the following input:
//
// @startuml
// scale 600 width
//
// OldState <--] : Bootstrap under old version
// NewState <--] : Bootstrap under new version
// OldState --> OldState : Log truncation under old version
// OldState --> OldState : Snapshot
// NewState --> NewState : Snapshot
// OldState --> NewState : Log truncation under new version
// NewState --> NewState : Log truncation under new version
// NewState --> NewState : Log truncation under old version\n(necessarily running new binary)
// @enduml
Key: VersionUnreplicatedRaftTruncatedState,
Version: roachpb.Version{Major: 2, Minor: 1, Unstable: 6},
},

// Add new versions here (step two of two).

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/crdb_internal
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ select crdb_internal.set_vmodule('')
query T
select crdb_internal.node_executable_version()
----
2.1-5
2.1-6

query ITTT colnames
select node_id, component, field, regexp_replace(regexp_replace(value, '^\d+$', '<port>'), e':\\d+', ':<port>') as value from crdb_internal.node_runtime_info
Expand Down Expand Up @@ -365,7 +365,7 @@ select * from crdb_internal.gossip_alerts
query T
select crdb_internal.node_executable_version()
----
2.1-5
2.1-6

user root

Expand Down
Loading

0 comments on commit d0aa09e

Please sign in to comment.