From 75688c2fe965812794a2b454b07d706e84b2c3be Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Fri, 14 Oct 2022 11:04:42 -0400 Subject: [PATCH] rangedesciter: carve out library for range desc iteration Informs #87503; pure code-movement. Going to use it in future commits as part of multi-tenant replication reports (#89987) where we'll need to iterate over the set of range descriptors. Release note: None --- pkg/BUILD.bazel | 4 + pkg/server/BUILD.bazel | 1 + pkg/server/server_sql.go | 8 +- pkg/upgrade/system_upgrade.go | 24 ---- pkg/upgrade/upgradecluster/BUILD.bazel | 6 +- pkg/upgrade/upgradecluster/cluster.go | 53 +------- pkg/util/rangedesciter/BUILD.bazel | 39 ++++++ pkg/util/rangedesciter/main_test.go | 31 +++++ pkg/util/rangedesciter/rangedesciter.go | 125 ++++++++++++++++++ .../rangedesciter/rangedesciter_test.go} | 21 +-- 10 files changed, 217 insertions(+), 95 deletions(-) create mode 100644 pkg/util/rangedesciter/BUILD.bazel create mode 100644 pkg/util/rangedesciter/main_test.go create mode 100644 pkg/util/rangedesciter/rangedesciter.go rename pkg/{upgrade/upgradecluster/client_test.go => util/rangedesciter/rangedesciter_test.go} (78%) diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 4f4a6cdc8d44..531556511cb9 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -585,6 +585,7 @@ ALL_TESTS = [ "//pkg/util/quantile:quantile_test", "//pkg/util/quotapool:quotapool_test", "//pkg/util/randutil:randutil_test", + "//pkg/util/rangedesciter:rangedesciter_test", "//pkg/util/retry:retry_test", "//pkg/util/ring:ring_test", "//pkg/util/schedulerlatency:schedulerlatency_test", @@ -2020,6 +2021,8 @@ GO_TARGETS = [ "//pkg/util/quotapool:quotapool_test", "//pkg/util/randutil:randutil", "//pkg/util/randutil:randutil_test", + "//pkg/util/rangedesciter:rangedesciter", + "//pkg/util/rangedesciter:rangedesciter_test", "//pkg/util/retry:retry", "//pkg/util/retry:retry_test", "//pkg/util/ring:ring", @@ -2994,6 +2997,7 @@ GET_X_DATA_TARGETS = [ "//pkg/util/quantile:get_x_data", "//pkg/util/quotapool:get_x_data", "//pkg/util/randutil:get_x_data", + "//pkg/util/rangedesciter:get_x_data", "//pkg/util/retry:get_x_data", "//pkg/util/ring:get_x_data", "//pkg/util/schedulerlatency:get_x_data", diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 94aac3f80288..9bbfbdb2a58f 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -257,6 +257,7 @@ go_library( "//pkg/util/pprofutil", "//pkg/util/protoutil", "//pkg/util/quotapool", + "//pkg/util/rangedesciter", "//pkg/util/retry", "//pkg/util/schedulerlatency", "//pkg/util/stop", diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 2a3d5edd51d1..f787fc18cd6f 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -111,6 +111,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/netutil" "github.com/cockroachdb/cockroach/pkg/util/netutil/addr" + "github.com/cockroachdb/cockroach/pkg/util/rangedesciter" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -1004,9 +1005,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { var systemDeps upgrade.SystemDeps if codec.ForSystemTenant() { c = upgradecluster.New(upgradecluster.ClusterConfig{ - NodeLiveness: nodeLiveness, - Dialer: cfg.nodeDialer, - DB: cfg.db, + NodeLiveness: nodeLiveness, + Dialer: cfg.nodeDialer, + RangeDescIterator: rangedesciter.New(cfg.db), + DB: cfg.db, }) systemDeps = upgrade.SystemDeps{ Cluster: c, diff --git a/pkg/upgrade/system_upgrade.go b/pkg/upgrade/system_upgrade.go index 1e50e1310d87..c6cde37755f4 100644 --- a/pkg/upgrade/system_upgrade.go +++ b/pkg/upgrade/system_upgrade.go @@ -94,30 +94,6 @@ type Cluster interface { // just be the `Migrate` request, with code added within [1] to do the // specific things intended for the specified version. // - // It's important to note that the closure is being executed in the context of - // a distributed transaction that may be automatically retried. So something - // like the following is an anti-pattern: - // - // processed := 0 - // _ = h.IterateRangeDescriptors(..., - // func(descriptors ...roachpb.RangeDescriptor) error { - // processed += len(descriptors) // we'll over count if retried - // log.Infof(ctx, "processed %d ranges", processed) - // }, - // ) - // - // Instead we allow callers to pass in a callback to signal on every attempt - // (including the first). This lets us salvage the example above: - // - // var processed int - // init := func() { processed = 0 } - // _ = h.IterateRangeDescriptors(..., init, - // func(descriptors ...roachpb.RangeDescriptor) error { - // processed += len(descriptors) - // log.Infof(ctx, "processed %d ranges", processed) - // }, - // ) - // // [1]: pkg/kv/kvserver/batch_eval/cmd_migrate.go IterateRangeDescriptors( ctx context.Context, diff --git a/pkg/upgrade/upgradecluster/BUILD.bazel b/pkg/upgrade/upgradecluster/BUILD.bazel index aa230dc64ab4..ed9ddb606aee 100644 --- a/pkg/upgrade/upgradecluster/BUILD.bazel +++ b/pkg/upgrade/upgradecluster/BUILD.bazel @@ -11,7 +11,6 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/upgrade/upgradecluster", visibility = ["//visibility:public"], deps = [ - "//pkg/keys", "//pkg/kv", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/roachpb", @@ -20,6 +19,7 @@ go_library( "//pkg/util/ctxgroup", "//pkg/util/log", "//pkg/util/quotapool", + "//pkg/util/rangedesciter", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_redact//:redact", "@org_golang_google_grpc//:go_default_library", @@ -30,7 +30,6 @@ go_test( name = "upgradecluster_test", size = "small", srcs = [ - "client_test.go", "helper_test.go", "main_test.go", "nodes_test.go", @@ -38,15 +37,12 @@ go_test( args = ["-test.timeout=55s"], embed = [":upgradecluster"], deps = [ - "//pkg/keys", - "//pkg/kv/kvserver", "//pkg/roachpb", "//pkg/rpc", "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/server", "//pkg/server/serverpb", - "//pkg/sql/tests", "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/testcluster", diff --git a/pkg/upgrade/upgradecluster/cluster.go b/pkg/upgrade/upgradecluster/cluster.go index 7edc8a3039ba..d1630a92b197 100644 --- a/pkg/upgrade/upgradecluster/cluster.go +++ b/pkg/upgrade/upgradecluster/cluster.go @@ -14,7 +14,6 @@ package upgradecluster import ( "context" - "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -23,7 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/quotapool" - "github.com/cockroachdb/errors" + "github.com/cockroachdb/cockroach/pkg/util/rangedesciter" "github.com/cockroachdb/redact" "google.golang.org/grpc" ) @@ -42,6 +41,9 @@ type ClusterConfig struct { // Dialer constructs connections to other nodes. Dialer NodeDialer + // RangeDescIterator iterates through all range descriptors. + RangeDescIterator rangedesciter.Iterator + // DB provides access the kv.DB instance backing the cluster. // // TODO(irfansharif): We could hide the kv.DB instance behind an interface @@ -143,50 +145,5 @@ func (c *Cluster) ForEveryNode( func (c *Cluster) IterateRangeDescriptors( ctx context.Context, blockSize int, init func(), fn func(...roachpb.RangeDescriptor) error, ) error { - return c.c.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - // Inform the caller that we're starting a fresh attempt to page in - // range descriptors. - init() - - // Iterate through meta{1,2} to pull out all the range descriptors. - var lastRangeIDInMeta1 roachpb.RangeID - return txn.Iterate(ctx, keys.MetaMin, keys.MetaMax, blockSize, - func(rows []kv.KeyValue) error { - descriptors := make([]roachpb.RangeDescriptor, 0, len(rows)) - var desc roachpb.RangeDescriptor - for _, row := range rows { - err := row.ValueProto(&desc) - if err != nil { - return errors.Wrapf(err, "unable to unmarshal range descriptor from %s", row.Key) - } - - // In small enough clusters it's possible for the same range - // descriptor to be stored in both meta1 and meta2. This - // happens when some range spans both the meta and the user - // keyspace. Consider when r1 is [/Min, - // /System/NodeLiveness); we'll store the range descriptor - // in both /Meta2/ and in /Meta1/KeyMax[1]. - // - // As part of iterator we'll de-duplicate this descriptor - // away by checking whether we've seen it before in meta1. - // Since we're scanning over the meta range in sorted - // order, it's enough to check against the last range - // descriptor we've seen in meta1. - // - // [1]: See kvserver.rangeAddressing. - if desc.RangeID == lastRangeIDInMeta1 { - continue - } - - descriptors = append(descriptors, desc) - if keys.InMeta1(keys.RangeMetaKey(desc.StartKey)) { - lastRangeIDInMeta1 = desc.RangeID - } - } - - // Invoke fn with the current chunk (of size ~blockSize) of - // range descriptors. - return fn(descriptors...) - }) - }) + return c.c.RangeDescIterator.Iterate(ctx, blockSize, init, fn) } diff --git a/pkg/util/rangedesciter/BUILD.bazel b/pkg/util/rangedesciter/BUILD.bazel new file mode 100644 index 000000000000..aed75899a35f --- /dev/null +++ b/pkg/util/rangedesciter/BUILD.bazel @@ -0,0 +1,39 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "rangedesciter", + srcs = ["rangedesciter.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/util/rangedesciter", + visibility = ["//visibility:public"], + deps = [ + "//pkg/keys", + "//pkg/kv", + "//pkg/roachpb", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "rangedesciter_test", + srcs = [ + "main_test.go", + "rangedesciter_test.go", + ], + args = ["-test.timeout=295s"], + deps = [ + ":rangedesciter", + "//pkg/keys", + "//pkg/kv/kvserver", + "//pkg/roachpb", + "//pkg/security/securityassets", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/sql/tests", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/util/rangedesciter/main_test.go b/pkg/util/rangedesciter/main_test.go new file mode 100644 index 000000000000..9770c518a095 --- /dev/null +++ b/pkg/util/rangedesciter/main_test.go @@ -0,0 +1,31 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangedesciter_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security/securityassets" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" +) + +func TestMain(m *testing.M) { + securityassets.SetLoader(securitytest.EmbeddedAssets) + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../util/leaktest/add-leaktest.sh *_test.go diff --git a/pkg/util/rangedesciter/rangedesciter.go b/pkg/util/rangedesciter/rangedesciter.go new file mode 100644 index 000000000000..315516f04d72 --- /dev/null +++ b/pkg/util/rangedesciter/rangedesciter.go @@ -0,0 +1,125 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangedesciter + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/errors" +) + +// Iterator paginates through every range descriptor in the system. +type Iterator interface { + // Iterate paginates through range descriptors in the system using the given + // page size. It's important to note that the closure is being executed in + // the context of a distributed transaction that may be automatically + // retried. So something like the following is an anti-pattern: + // + // processed := 0 + // _ = rdi.Iterate(..., + // func(descriptors ...roachpb.RangeDescriptor) error { + // processed += len(descriptors) // we'll over count if retried + // log.Infof(ctx, "processed %d ranges", processed) + // }, + // ) + // + // Instead we allow callers to pass in a callback to signal on every attempt + // (including the first). This lets us salvage the example above: + // + // var processed int + // init := func() { processed = 0 } + // _ = rdi.Iterate(..., init, + // func(descriptors ...roachpb.RangeDescriptor) error { + // processed += len(descriptors) + // log.Infof(ctx, "processed %d ranges", processed) + // }, + // ) + Iterate( + ctx context.Context, pageSize int, init func(), + fn func(descriptors ...roachpb.RangeDescriptor) error, + ) error +} + +// DB is a database handle to a CRDB cluster. +type DB interface { + Txn(ctx context.Context, retryable func(context.Context, *kv.Txn) error) error +} + +// iteratorImpl is a concrete (private) implementation of the Iterator +// interface. +type iteratorImpl struct { + db DB +} + +// New returns an Iterator. +func New(db DB) Iterator { + return &iteratorImpl{db: db} +} + +var _ Iterator = &iteratorImpl{} + +// Iterate implements the Iterator interface. +func (i *iteratorImpl) Iterate( + ctx context.Context, + pageSize int, + init func(), + fn func(descriptors ...roachpb.RangeDescriptor) error, +) error { + return i.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + // Inform the caller that we're starting a fresh attempt to page in + // range descriptors. + init() + + // Iterate through meta{1,2} to pull out all the range descriptors. + var lastRangeIDInMeta1 roachpb.RangeID + return txn.Iterate(ctx, keys.MetaMin, keys.MetaMax, pageSize, + func(rows []kv.KeyValue) error { + descriptors := make([]roachpb.RangeDescriptor, 0, len(rows)) + var desc roachpb.RangeDescriptor + for _, row := range rows { + err := row.ValueProto(&desc) + if err != nil { + return errors.Wrapf(err, "unable to unmarshal range descriptor from %s", row.Key) + } + + // In small enough clusters it's possible for the same range + // descriptor to be stored in both meta1 and meta2. This + // happens when some range spans both the meta and the user + // keyspace. Consider when r1 is [/Min, + // /System/NodeLiveness); we'll store the range descriptor + // in both /Meta2/ and in /Meta1/KeyMax[1]. + // + // As part of iterator we'll de-duplicate this descriptor + // away by checking whether we've seen it before in meta1. + // Since we're scanning over the meta range in sorted + // order, it's enough to check against the last range + // descriptor we've seen in meta1. + // + // [1]: See kvserver.rangeAddressing. + if desc.RangeID == lastRangeIDInMeta1 { + continue + } + + descriptors = append(descriptors, desc) + if keys.InMeta1(keys.RangeMetaKey(desc.StartKey)) { + lastRangeIDInMeta1 = desc.RangeID + } + } + + // Invoke fn with the current chunk (of size ~blockSize) of + // range descriptors. + return fn(descriptors...) + }) + }) +} diff --git a/pkg/upgrade/upgradecluster/client_test.go b/pkg/util/rangedesciter/rangedesciter_test.go similarity index 78% rename from pkg/upgrade/upgradecluster/client_test.go rename to pkg/util/rangedesciter/rangedesciter_test.go index 69346044364f..67f79259c109 100644 --- a/pkg/upgrade/upgradecluster/client_test.go +++ b/pkg/util/rangedesciter/rangedesciter_test.go @@ -1,4 +1,4 @@ -// Copyright 2020 The Cockroach Authors. +// Copyright 2022 The Cockroach Authors. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt. @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package upgradecluster_test +package rangedesciter_test import ( "context" @@ -20,17 +20,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" - "github.com/cockroachdb/cockroach/pkg/upgrade/nodelivenesstest" - "github.com/cockroachdb/cockroach/pkg/upgrade/upgradecluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/rangedesciter" ) -func TestClusterIterateRangeDescriptors(t *testing.T) { +func TestIterator(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - const numNodes = 1 - for _, splits := range [][]roachpb.Key{ {}, // no splits {keys.Meta2Prefix}, // split between meta1 and meta2 @@ -58,17 +55,11 @@ func TestClusterIterateRangeDescriptors(t *testing.T) { t.Fatal(err) } - c := nodelivenesstest.New(numNodes) - h := upgradecluster.New(upgradecluster.ClusterConfig{ - NodeLiveness: c, - Dialer: upgradecluster.NoopDialer{}, - DB: kvDB, - }) - + iter := rangedesciter.New(kvDB) for _, blockSize := range []int{1, 5, 10, 50} { var numDescs int init := func() { numDescs = 0 } - if err := h.IterateRangeDescriptors(ctx, blockSize, init, func(descriptors ...roachpb.RangeDescriptor) error { + if err := iter.Iterate(ctx, blockSize, init, func(descriptors ...roachpb.RangeDescriptor) error { numDescs += len(descriptors) return nil }); err != nil {