Skip to content

Commit

Permalink
migration: introduce the IterateRangeDescriptors primitive
Browse files Browse the repository at this point in the history
It's not currently wired up to anything. We'll use it in future PRs to
send out `Migrate` requests to the entire keyspace. This was originally
prototyped in cockroachdb#57445. See the inline comments and the RFC (cockroachdb#48843) for
the motivation here.

Release note: None
  • Loading branch information
irfansharif committed Dec 18, 2020
1 parent b367516 commit 458f398
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 0 deletions.
10 changes: 10 additions & 0 deletions pkg/migration/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
Expand All @@ -33,17 +34,26 @@ go_library(
go_test(
name = "migration_test",
srcs = [
"client_test.go",
"helper_test.go",
"main_test.go",
"util_test.go",
],
embed = [":migration"],
deps = [
"//pkg/clusterversion",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/server/serverpb",
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/syncutil",
"//vendor/google.golang.org/grpc",
Expand Down
61 changes: 61 additions & 0 deletions pkg/migration/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package migration_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/migration"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
)

func TestHelperIterateRangeDescriptors(t *testing.T) {
defer leaktest.AfterTest(t)

cv := clusterversion.ClusterVersion{}
ctx := context.Background()
const numNodes = 1

params, _ := tests.CreateTestServerParams()
server, _, kvDB := serverutils.StartServer(t, params)
defer server.Stopper().Stop(context.Background())

var numRanges int
if err := server.GetStores().(*kvserver.Stores).VisitStores(func(s *kvserver.Store) error {
numRanges = s.ReplicaCount()
return nil
}); err != nil {
t.Fatal(err)
}

c := migration.TestingNewCluster(numNodes, migration.TestingWithKV(kvDB))
h := migration.TestingNewHelper(c, cv)

for _, blockSize := range []int{1, 5, 10, 50} {
var numDescs int
init := func() { numDescs = 0 }
if err := h.IterateRangeDescriptors(ctx, blockSize, init, func(descriptors ...roachpb.RangeDescriptor) error {
numDescs += len(descriptors)
return nil
}); err != nil {
t.Fatal(err)
}

if numDescs != numRanges {
t.Fatalf("expected to find %d ranges, found %d", numRanges+1, numDescs)
}
}
}
68 changes: 68 additions & 0 deletions pkg/migration/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
Expand Down Expand Up @@ -157,6 +158,73 @@ func (h *Helper) EveryNode(
return nil
}

// IterateRangeDescriptors provides a handle on every range descriptor in the
// system, which callers can then use to send out arbitrary KV requests to in
// order to run arbitrary KV-level migrations. These requests will typically
// just be the `Migrate` request, with code added within [1] to do the specific
// things intended for the specified version.
//
// It's important to note that the closure is being executed in the context of a
// distributed transaction that may be automatically retried. So something like
// the following is an anti-pattern:
//
// processed := 0
// _ = h.IterateRangeDescriptors(...,
// func(descriptors ...roachpb.RangeDescriptor) error {
// processed += len(descriptors) // we'll over count if retried
// log.Infof(ctx, "processed %d ranges", processed)
// },
// )
//
// Instead we allow callers to pass in a callback to signal on every attempt
// (including the first). This lets us salvage the example above:
//
// var processed int
// init := func() { processed = 0 }
// _ = h.IterateRangeDescriptors(..., init,
// func(descriptors ...roachpb.RangeDescriptor) error {
// processed += len(descriptors)
// log.Infof(ctx, "processed %d ranges", processed)
// },
// )
//
// [1]: pkg/kv/kvserver/batch_eval/cmd_migrate.go
func (h *Helper) IterateRangeDescriptors(
ctx context.Context, blockSize int, init func(), fn func(...roachpb.RangeDescriptor) error,
) error {
if err := h.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Inform the caller that we're starting a fresh attempt to page in
// range descriptors.
init()

// Iterate through meta2 to pull out all the range descriptors.
return txn.Iterate(ctx, keys.Meta2Prefix, keys.MetaMax, blockSize,
func(rows []kv.KeyValue) error {
descriptors := make([]roachpb.RangeDescriptor, len(rows))
for i, row := range rows {
if err := row.ValueProto(&descriptors[i]); err != nil {
return errors.Wrapf(err,
"unable to unmarshal range descriptor from %s",
row.Key,
)
}
}

// Invoke fn with the current chunk (of size ~blockSize) of
// range descriptors.
if err := fn(descriptors...); err != nil {
return err
}

return nil
})
}); err != nil {
return err
}

return nil
}

// DB provides exposes the underlying *kv.DB instance.
func (h *Helper) DB() *kv.DB {
return h.c.db()
Expand Down
29 changes: 29 additions & 0 deletions pkg/migration/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package migration_test

import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

func TestMain(m *testing.M) {
security.SetAssetLoader(securitytest.EmbeddedAssets)
serverutils.InitTestServerFactory(server.TestServerFactory)
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
os.Exit(m.Run())
}

0 comments on commit 458f398

Please sign in to comment.