Skip to content

Commit

Permalink
kvserver: Add migration for interleaved intents to separated
Browse files Browse the repository at this point in the history
Adds a long-running migration that relies on a ranged-read
Raft command to leak an engine snapshot, on which we do a
scan and push any txns whose intents were observed to still
be interleaved.

Part of #41720.

Release note: None.
  • Loading branch information
itsbilal committed Jun 14, 2021
1 parent 66e227d commit 9dce6ed
Show file tree
Hide file tree
Showing 24 changed files with 1,844 additions and 539 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,4 @@ trace.datadog.project string CockroachDB the project under which traces will be
trace.debug.enable boolean false if set, traces for recent requests can be seen at https://<ui>/debug/requests
trace.lightstep.token string if set, traces go to Lightstep using this token
trace.zipkin.collector string if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time.
version version 21.1-12 set the active cluster version in the format '<major>.<minor>'
version version 21.1-14 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen at https://<ui>/debug/requests</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.1-12</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.1-14</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
66 changes: 66 additions & 0 deletions pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -1287,6 +1287,71 @@ func runDebugMergeLogs(cmd *cobra.Command, args []string) error {
return writeLogStream(s, cmd.OutOrStdout(), o.filter, o.prefix, o.keepRedactable)
}

// debugIntentCount is the `intent-count` debug command. It takes a store
// directory as its positional argument and is executed by
// runDebugIntentCount.
//
// NOTE(review): Args is cobra.MinimumNArgs(1), but runDebugIntentCount only
// reads args[0], so any additional arguments are accepted and ignored —
// confirm this is intended.
var debugIntentCount = &cobra.Command{
Use: "intent-count <store directory>",
Short: "return a count of intents in directory",
Long: `
Returns a count of interleaved and separated intents in the store directory.
Used to investigate stores with lots of unresolved intents, or to confirm
if the migration away from interleaved intents was successful.
`,
Args: cobra.MinimumNArgs(1),
RunE: runDebugIntentCount,
}

// runDebugIntentCount opens the store at args[0] in read-only mode, walks
// every engine key between roachpb.KeyMin and roachpb.KeyMax, and prints how
// many interleaved and separated intents it found.
//
// Classification of each key:
//   - lock table keys are counted as separated intents;
//   - MVCC keys with an empty timestamp whose value decodes to a non-inline
//     MVCCMetadata are counted as interleaved intents;
//   - everything else is ignored.
//
// Any error from opening the store, iterating, or decoding a key or value is
// returned unchanged.
func runDebugIntentCount(cmd *cobra.Command, args []string) error {
	stopper := stop.NewStopper()
	defer stopper.Stop(context.Background())

	db, err := OpenExistingStore(args[0], stopper, true /* readOnly */)
	if err != nil {
		return err
	}

	var numInterleaved, numSeparated int

	iter := db.NewEngineIterator(storage.IterOptions{
		LowerBound: roachpb.KeyMin,
		UpperBound: roachpb.KeyMax,
	})
	defer iter.Close()

	ok, iterErr := iter.SeekEngineKeyGE(storage.EngineKey{Key: roachpb.KeyMin})
	for ; ok && iterErr == nil; ok, iterErr = iter.NextEngineKey() {
		engineKey, keyErr := iter.EngineKey()
		if keyErr != nil {
			return keyErr
		}

		switch {
		case engineKey.IsLockTableKey():
			numSeparated++

		case engineKey.IsMVCCKey():
			mvccKey, convErr := engineKey.ToMVCCKey()
			if convErr != nil {
				return convErr
			}
			// Only timestamp-less keys carry MVCCMetadata; versioned values
			// are skipped without being decoded.
			if mvccKey.Timestamp.IsEmpty() {
				var meta enginepb.MVCCMetadata
				if decodeErr := meta.Unmarshal(iter.Value()); decodeErr != nil {
					return decodeErr
				}
				// Inline metadata is not an intent.
				if !meta.IsInline() {
					numInterleaved++
				}
			}
		}
	}
	// The loop also exits when the iterator itself reports an error.
	if iterErr != nil {
		return iterErr
	}

	fmt.Printf("interleaved intents: %d\nseparated intents: %d\n",
		numInterleaved, numSeparated)
	return nil
}

// DebugCmdsForRocksDB lists debug commands that access rocksdb through the engine
// and need encryption flags (injected by CCL code).
// Note: do NOT include commands that just call rocksdb code without setting up an engine.
Expand All @@ -1298,6 +1363,7 @@ var DebugCmdsForRocksDB = []*cobra.Command{
debugRaftLogCmd,
debugRangeDataCmd,
debugRangeDescriptorsCmd,
debugIntentCount,
}

// All other debug commands go here.
Expand Down
7 changes: 7 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,9 @@ const (
ExpressionBasedIndexes
// DeleteDeprecatedNamespaceTableDescriptorMigration deletes the descriptor at ID=2.
DeleteDeprecatedNamespaceTableDescriptorMigration
// SeparatedIntentsMigration adds the migration to move over all remaining
// intents to the separated lock table space.
SeparatedIntentsMigration

// Step (1): Add new versions here.
)
Expand Down Expand Up @@ -544,6 +547,10 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Key: DeleteDeprecatedNamespaceTableDescriptorMigration,
Version: roachpb.Version{Major: 21, Minor: 1, Internal: 12},
},
{
Key: SeparatedIntentsMigration,
Version: roachpb.Version{Major: 21, Minor: 1, Internal: 14},
},

// Step (2): Add new versions here.
})
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ func (b *Batch) fillResults(ctx context.Context) {
case *roachpb.AdminScatterRequest:
case *roachpb.AddSSTableRequest:
case *roachpb.MigrateRequest:
case *roachpb.MigrateLockTableRequest:
default:
if result.Err == nil {
result.Err = errors.Errorf("unsupported reply: %T for %T",
Expand Down Expand Up @@ -828,3 +829,24 @@ func (b *Batch) migrate(s, e interface{}, version roachpb.Version) {
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}

// migrateLockTable adds a MigrateLockTableRequest to the batch whose request
// header carries the marshaled forms of s and e as Key and EndKey.
//
// If either key fails to marshal, no request is appended; the marshaling
// error is recorded via initResult instead.
func (b *Batch) migrateLockTable(s, e interface{}) {
	var (
		header roachpb.RequestHeader
		err    error
	)
	if header.Key, err = marshalKey(s); err != nil {
		b.initResult(0, 0, notRaw, err)
		return
	}
	if header.EndKey, err = marshalKey(e); err != nil {
		b.initResult(0, 0, notRaw, err)
		return
	}

	b.appendReqs(&roachpb.MigrateLockTableRequest{RequestHeader: header})
	b.initResult(1, 0, notRaw, nil)
}
10 changes: 10 additions & 0 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,16 @@ func (db *DB) Migrate(ctx context.Context, begin, end interface{}, version roach
return getOneErr(db.Run(ctx, b), b)
}

// MigrateLockTable issues a MigrateLockTable request over the span described
// by begin and end. It is a special command: evaluating it leaks an engine
// snapshot into a local map, and at send time the replica(s) use it to
// convert all interleaved intents to separated intents. Callers must only
// invoke it once no more interleaved intents will be written.
//
// The returned error is whatever getOneErr extracts from running the
// single-request batch.
func (db *DB) MigrateLockTable(ctx context.Context, begin, end interface{}) error {
	batch := new(Batch)
	batch.migrateLockTable(begin, end)
	runErr := db.Run(ctx, batch)
	return getOneErr(runErr, batch)
}

// sendAndFill is a helper which sends the given batch and fills its results,
// returning the appropriate error which is either from the first failing call,
// or an "internal" error.
Expand Down
75 changes: 75 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package batcheval

import (
"context"
"math/rand"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand All @@ -21,11 +22,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

// init registers the two commands implemented in this file, each paired with
// its key-declaration function: Migrate as a read-write command and
// MigrateLockTable as a read-only command.
func init() {
RegisterReadWriteCommand(roachpb.Migrate, declareKeysMigrate, Migrate)
RegisterReadOnlyCommand(roachpb.MigrateLockTable, declareKeysMigrateLockTable, MigrateLockTable)
}

func declareKeysMigrate(
Expand All @@ -46,15 +49,49 @@ func declareKeysMigrate(
lockSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.RaftTruncatedStateLegacyKey(rs.GetRangeID())})
}

// declareKeysMigrateLockTable declares the latches taken by the
// MigrateLockTable command: a single non-MVCC, read-only latch on the range
// descriptor key derived from the range's start key. The header and request
// are ignored, and nothing is added to lockSpans.
func declareKeysMigrateLockTable(
	rs ImmutableRangeState,
	_ roachpb.Header,
	_ roachpb.Request,
	latchSpans, lockSpans *spanset.SpanSet,
) {
	descKey := keys.RangeDescriptorKey(rs.GetStartKey())
	descSpan := roachpb.Span{Key: descKey}
	latchSpans.AddNonMVCC(spanset.SpanReadOnly, descSpan)
}

// migrationRegistry is a global registry of all KV-level migrations, keyed by
// the roachpb.Version each migration is associated with. See pkg/migration
// for details around how the migrations defined here are wired up.
var migrationRegistry = make(map[roachpb.Version]migration)

// lockTableSnapshotRegistryType is a node-local registry of all snapshots
// created as part of the MigrateLockTable command. A singleton instance
// of this type is stored as lockTableSnapshotRegistry.
//
// The code in this file holds the embedded mutex whenever it reads or
// modifies the registry map.
type lockTableSnapshotRegistryType struct {
syncutil.Mutex

// registry maps the randomly generated ID reported in
// MigrateLockTableResponse.SnapshotId to the snapshot stored under it.
registry map[uint64]storage.Reader
}

// lockTableSnapshotRegistry is the package-level singleton registry of
// snapshots leaked by MigrateLockTable. It starts out with an empty,
// non-nil map.
var lockTableSnapshotRegistry = lockTableSnapshotRegistryType{
registry: make(map[uint64]storage.Reader),
}

// GetAndRemoveLockTableSnapshot looks up the snapshot registered under id in
// lockTableSnapshotRegistry. If one exists, it is removed from the registry
// and returned, so a given id can be claimed at most once; otherwise nil is
// returned. The registry mutex is held for the duration of the call.
//
// The returned reader is not closed here.
// NOTE(review): presumably the caller is responsible for closing it — confirm.
func GetAndRemoveLockTableSnapshot(id uint64) storage.Reader {
	reg := &lockTableSnapshotRegistry
	reg.Lock()
	defer reg.Unlock()

	snap, found := reg.registry[id]
	if !found {
		return nil
	}
	delete(reg.registry, id)
	return snap
}

// migration is the function signature shared by the KV-level migrations
// stored in migrationRegistry. It receives the evaluation context, a
// storage.ReadWriter and the command arguments, and returns the evaluation
// result or an error.
type migration func(context.Context, storage.ReadWriter, CommandArgs) (result.Result, error)

// init registers the migrations implemented in this file under their
// cluster version keys.
func init() {
registerMigration(clusterversion.TruncatedAndRangeAppliedStateMigration, truncatedAndAppliedStateMigration)
registerMigration(clusterversion.SeparatedIntentsMigration, separatedIntentsMigration)
}

func registerMigration(key clusterversion.Key, migration migration) {
Expand Down Expand Up @@ -95,6 +132,35 @@ func Migrate(
return pd, nil
}

// MigrateLockTable leaks an engine snapshot for an eventual lock-table
// migration. The actual migration is implemented in
// Replica.migrateLockTable.
//
// A new snapshot is taken from the evaluation context's engine (the reader
// argument is not used) and stored in lockTableSnapshotRegistry under a
// random ID that is not currently in use. That ID is reported back in the
// response's SnapshotId field, and the snapshot can be claimed with
// GetAndRemoveLockTableSnapshot.
//
// If range stats are available, contain no estimates and show an intent
// count of zero, the response's NoIntents field is set as well.
// NOTE(review): the snapshot is still created and registered in that case —
// confirm that the caller always reclaims it.
//
// The returned result is always the zero value and the error is always nil.
func MigrateLockTable(
	ctx context.Context, reader storage.Reader, cArgs CommandArgs, response roachpb.Response,
) (result.Result, error) {
	engineSnapshot := cArgs.EvalCtx.Engine().NewSnapshot()
	reply := response.(*roachpb.MigrateLockTableResponse)

	// Exact stats with no intents let the caller know there is nothing to
	// migrate in this range.
	if stats := cArgs.Stats; stats != nil && stats.ContainsEstimates == 0 && stats.IntentCount == 0 {
		reply.NoIntents = true
	}

	snapshotID := rand.Uint64()
	reg := &lockTableSnapshotRegistry
	reg.Lock()
	defer reg.Unlock()
	// Re-draw until the ID does not collide with an already registered
	// snapshot.
	for _, taken := reg.registry[snapshotID]; taken; _, taken = reg.registry[snapshotID] {
		snapshotID = rand.Uint64()
	}
	reg.registry[snapshotID] = engineSnapshot
	reply.SnapshotId = snapshotID

	var noResult result.Result
	return noResult, nil
}

// truncatedAndRangeAppliedStateMigration lets us stop using the legacy
// replicated truncated state and start using the new RangeAppliedState for this
// specific range.
Expand Down Expand Up @@ -130,6 +196,15 @@ func truncatedAndAppliedStateMigration(
return pd, nil
}

// separatedIntentsMigration is the below-raft part of the migration from
// interleaved to separated intents. It is deliberately a no-op: it ignores
// all of its arguments and always returns the zero result with a nil error.
// The Migrate command is run for this version only for the effect that
// running the command itself has on replicas, not for any work done here.
func separatedIntentsMigration(
	ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs,
) (result.Result, error) {
	var noop result.Result
	return noop, nil
}

// TestingRegisterMigrationInterceptor is used in tests to register an
// interceptor for a below-raft migration.
//
Expand Down
Loading

0 comments on commit 9dce6ed

Please sign in to comment.