From 458f3984183930ecfd296994011d079d01d242b0 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Mon, 7 Dec 2020 11:58:09 -0500 Subject: [PATCH] migration: introduce the IterateRangeDescriptors primitive It's not currently wired up to anything. We'll use it in future PRs to send out `Migrate` requests to the entire keyspace. This was originally prototyped in #57445. See the inline comments and the RFC (#48843) for the motivation here. Release note: None --- pkg/migration/BUILD.bazel | 10 ++++++ pkg/migration/client_test.go | 61 ++++++++++++++++++++++++++++++++ pkg/migration/helper.go | 68 ++++++++++++++++++++++++++++++++++++ pkg/migration/main_test.go | 29 +++++++++++++++ 4 files changed, 168 insertions(+) create mode 100644 pkg/migration/client_test.go create mode 100644 pkg/migration/main_test.go diff --git a/pkg/migration/BUILD.bazel b/pkg/migration/BUILD.bazel index e674c65e55a4..23a1b35826fc 100644 --- a/pkg/migration/BUILD.bazel +++ b/pkg/migration/BUILD.bazel @@ -12,6 +12,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/clusterversion", + "//pkg/keys", "//pkg/kv", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/roachpb", @@ -33,17 +34,26 @@ go_library( go_test( name = "migration_test", srcs = [ + "client_test.go", "helper_test.go", + "main_test.go", "util_test.go", ], embed = [":migration"], deps = [ "//pkg/clusterversion", "//pkg/kv", + "//pkg/kv/kvserver", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/roachpb", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", "//pkg/server/serverpb", + "//pkg/sql/tests", "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", "//pkg/util/leaktest", "//pkg/util/syncutil", "//vendor/google.golang.org/grpc", diff --git a/pkg/migration/client_test.go b/pkg/migration/client_test.go new file mode 100644 index 000000000000..a56a1c490a18 --- /dev/null +++ b/pkg/migration/client_test.go @@ -0,0 +1,61 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +package migration_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/migration" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/tests" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" +) + +func TestHelperIterateRangeDescriptors(t *testing.T) { + defer leaktest.AfterTest(t) + + cv := clusterversion.ClusterVersion{} + ctx := context.Background() + const numNodes = 1 + + params, _ := tests.CreateTestServerParams() + server, _, kvDB := serverutils.StartServer(t, params) + defer server.Stopper().Stop(context.Background()) + + var numRanges int + if err := server.GetStores().(*kvserver.Stores).VisitStores(func(s *kvserver.Store) error { + numRanges = s.ReplicaCount() + return nil + }); err != nil { + t.Fatal(err) + } + + c := migration.TestingNewCluster(numNodes, migration.TestingWithKV(kvDB)) + h := migration.TestingNewHelper(c, cv) + + for _, blockSize := range []int{1, 5, 10, 50} { + var numDescs int + init := func() { numDescs = 0 } + if err := h.IterateRangeDescriptors(ctx, blockSize, init, func(descriptors ...roachpb.RangeDescriptor) error { + numDescs += len(descriptors) + return nil + }); err != nil { + t.Fatal(err) + } + + if numDescs != numRanges { + t.Fatalf("expected to find %d ranges, found %d", numRanges+1, numDescs) + } + } +} diff --git a/pkg/migration/helper.go b/pkg/migration/helper.go index b7884b6b24e6..21feddb07917 100644 --- a/pkg/migration/helper.go +++ b/pkg/migration/helper.go @@ -14,6 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" @@ -157,6 +158,73 @@ func (h *Helper) EveryNode( return nil } +// IterateRangeDescriptors provides a handle on every range descriptor in the +// system, which callers can then use to send out arbitrary KV requests to in +// order to run arbitrary KV-level migrations. These requests will typically +// just be the `Migrate` request, with code added within [1] to do the specific +// things intended for the specified version. +// +// It's important to note that the closure is being executed in the context of a +// distributed transaction that may be automatically retried. So something like +// the following is an anti-pattern: +// +// processed := 0 +// _ = h.IterateRangeDescriptors(..., +// func(descriptors ...roachpb.RangeDescriptor) error { +// processed += len(descriptors) // we'll over count if retried +// log.Infof(ctx, "processed %d ranges", processed) +// }, +// ) +// +// Instead we allow callers to pass in a callback to signal on every attempt +// (including the first). This lets us salvage the example above: +// +// var processed int +// init := func() { processed = 0 } +// _ = h.IterateRangeDescriptors(..., init, +// func(descriptors ...roachpb.RangeDescriptor) error { +// processed += len(descriptors) +// log.Infof(ctx, "processed %d ranges", processed) +// }, +// ) +// +// [1]: pkg/kv/kvserver/batch_eval/cmd_migrate.go +func (h *Helper) IterateRangeDescriptors( + ctx context.Context, blockSize int, init func(), fn func(...roachpb.RangeDescriptor) error, +) error { + if err := h.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + // Inform the caller that we're starting a fresh attempt to page in + // range descriptors. + init() + + // Iterate through meta2 to pull out all the range descriptors. + return txn.Iterate(ctx, keys.Meta2Prefix, keys.MetaMax, blockSize, + func(rows []kv.KeyValue) error { + descriptors := make([]roachpb.RangeDescriptor, len(rows)) + for i, row := range rows { + if err := row.ValueProto(&descriptors[i]); err != nil { + return errors.Wrapf(err, + "unable to unmarshal range descriptor from %s", + row.Key, + ) + } + } + + // Invoke fn with the current chunk (of size ~blockSize) of + // range descriptors. + if err := fn(descriptors...); err != nil { + return err + } + + return nil + }) + }); err != nil { + return err + } + + return nil +} + // DB provides exposes the underlying *kv.DB instance. func (h *Helper) DB() *kv.DB { return h.c.db() diff --git a/pkg/migration/main_test.go b/pkg/migration/main_test.go new file mode 100644 index 000000000000..cdc3f7742a5a --- /dev/null +++ b/pkg/migration/main_test.go @@ -0,0 +1,29 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migration_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" +) + +func TestMain(m *testing.M) { + security.SetAssetLoader(securitytest.EmbeddedAssets) + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +}