Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rangedesciter: carve out library for range desc iteration #89988

Merged
merged 1 commit into from
Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,7 @@ ALL_TESTS = [
"//pkg/util/quantile:quantile_test",
"//pkg/util/quotapool:quotapool_test",
"//pkg/util/randutil:randutil_test",
"//pkg/util/rangedesciter:rangedesciter_test",
"//pkg/util/retry:retry_test",
"//pkg/util/ring:ring_test",
"//pkg/util/schedulerlatency:schedulerlatency_test",
Expand Down Expand Up @@ -2020,6 +2021,8 @@ GO_TARGETS = [
"//pkg/util/quotapool:quotapool_test",
"//pkg/util/randutil:randutil",
"//pkg/util/randutil:randutil_test",
"//pkg/util/rangedesciter:rangedesciter",
"//pkg/util/rangedesciter:rangedesciter_test",
"//pkg/util/retry:retry",
"//pkg/util/retry:retry_test",
"//pkg/util/ring:ring",
Expand Down Expand Up @@ -2994,6 +2997,7 @@ GET_X_DATA_TARGETS = [
"//pkg/util/quantile:get_x_data",
"//pkg/util/quotapool:get_x_data",
"//pkg/util/randutil:get_x_data",
"//pkg/util/rangedesciter:get_x_data",
"//pkg/util/retry:get_x_data",
"//pkg/util/ring:get_x_data",
"//pkg/util/schedulerlatency:get_x_data",
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ go_library(
"//pkg/util/pprofutil",
"//pkg/util/protoutil",
"//pkg/util/quotapool",
"//pkg/util/rangedesciter",
"//pkg/util/retry",
"//pkg/util/schedulerlatency",
"//pkg/util/stop",
Expand Down
8 changes: 5 additions & 3 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/cockroach/pkg/util/netutil/addr"
"github.com/cockroachdb/cockroach/pkg/util/rangedesciter"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -1004,9 +1005,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
var systemDeps upgrade.SystemDeps
if codec.ForSystemTenant() {
c = upgradecluster.New(upgradecluster.ClusterConfig{
NodeLiveness: nodeLiveness,
Dialer: cfg.nodeDialer,
DB: cfg.db,
NodeLiveness: nodeLiveness,
Dialer: cfg.nodeDialer,
RangeDescIterator: rangedesciter.New(cfg.db),
DB: cfg.db,
})
systemDeps = upgrade.SystemDeps{
Cluster: c,
Expand Down
24 changes: 0 additions & 24 deletions pkg/upgrade/system_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,30 +94,6 @@ type Cluster interface {
// just be the `Migrate` request, with code added within [1] to do the
// specific things intended for the specified version.
//
// It's important to note that the closure is being executed in the context of
// a distributed transaction that may be automatically retried. So something
// like the following is an anti-pattern:
//
// processed := 0
// _ = h.IterateRangeDescriptors(...,
// func(descriptors ...roachpb.RangeDescriptor) error {
// processed += len(descriptors) // we'll over count if retried
// log.Infof(ctx, "processed %d ranges", processed)
// },
// )
//
// Instead we allow callers to pass in a callback to signal on every attempt
// (including the first). This lets us salvage the example above:
//
// var processed int
// init := func() { processed = 0 }
// _ = h.IterateRangeDescriptors(..., init,
// func(descriptors ...roachpb.RangeDescriptor) error {
// processed += len(descriptors)
// log.Infof(ctx, "processed %d ranges", processed)
// },
// )
//
// [1]: pkg/kv/kvserver/batch_eval/cmd_migrate.go
IterateRangeDescriptors(
ctx context.Context,
Expand Down
6 changes: 1 addition & 5 deletions pkg/upgrade/upgradecluster/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/upgrade/upgradecluster",
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
Expand All @@ -20,6 +19,7 @@ go_library(
"//pkg/util/ctxgroup",
"//pkg/util/log",
"//pkg/util/quotapool",
"//pkg/util/rangedesciter",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@org_golang_google_grpc//:go_default_library",
Expand All @@ -30,23 +30,19 @@ go_test(
name = "upgradecluster_test",
size = "small",
srcs = [
"client_test.go",
"helper_test.go",
"main_test.go",
"nodes_test.go",
],
args = ["-test.timeout=55s"],
embed = [":upgradecluster"],
deps = [
"//pkg/keys",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/server/serverpb",
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
Expand Down
53 changes: 5 additions & 48 deletions pkg/upgrade/upgradecluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ package upgradecluster
import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -23,7 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/cockroach/pkg/util/rangedesciter"
"github.com/cockroachdb/redact"
"google.golang.org/grpc"
)
Expand All @@ -42,6 +41,9 @@ type ClusterConfig struct {
// Dialer constructs connections to other nodes.
Dialer NodeDialer

// RangeDescIterator iterates through all range descriptors.
RangeDescIterator rangedesciter.Iterator

// DB provides access the kv.DB instance backing the cluster.
//
// TODO(irfansharif): We could hide the kv.DB instance behind an interface
Expand Down Expand Up @@ -143,50 +145,5 @@ func (c *Cluster) ForEveryNode(
func (c *Cluster) IterateRangeDescriptors(
ctx context.Context, blockSize int, init func(), fn func(...roachpb.RangeDescriptor) error,
) error {
return c.c.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Inform the caller that we're starting a fresh attempt to page in
// range descriptors.
init()

// Iterate through meta{1,2} to pull out all the range descriptors.
var lastRangeIDInMeta1 roachpb.RangeID
return txn.Iterate(ctx, keys.MetaMin, keys.MetaMax, blockSize,
func(rows []kv.KeyValue) error {
descriptors := make([]roachpb.RangeDescriptor, 0, len(rows))
var desc roachpb.RangeDescriptor
for _, row := range rows {
err := row.ValueProto(&desc)
if err != nil {
return errors.Wrapf(err, "unable to unmarshal range descriptor from %s", row.Key)
}

// In small enough clusters it's possible for the same range
// descriptor to be stored in both meta1 and meta2. This
// happens when some range spans both the meta and the user
// keyspace. Consider when r1 is [/Min,
// /System/NodeLiveness); we'll store the range descriptor
// in both /Meta2/<r1.EndKey> and in /Meta1/KeyMax[1].
//
// As part of iterator we'll de-duplicate this descriptor
// away by checking whether we've seen it before in meta1.
// Since we're scanning over the meta range in sorted
// order, it's enough to check against the last range
// descriptor we've seen in meta1.
//
// [1]: See kvserver.rangeAddressing.
if desc.RangeID == lastRangeIDInMeta1 {
continue
}

descriptors = append(descriptors, desc)
if keys.InMeta1(keys.RangeMetaKey(desc.StartKey)) {
lastRangeIDInMeta1 = desc.RangeID
}
}

// Invoke fn with the current chunk (of size ~blockSize) of
// range descriptors.
return fn(descriptors...)
})
})
return c.c.RangeDescIterator.Iterate(ctx, blockSize, init, fn)
}
39 changes: 39 additions & 0 deletions pkg/util/rangedesciter/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "rangedesciter",
srcs = ["rangedesciter.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/util/rangedesciter",
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
"//pkg/kv",
"//pkg/roachpb",
"@com_github_cockroachdb_errors//:errors",
],
)

go_test(
name = "rangedesciter_test",
srcs = [
"main_test.go",
"rangedesciter_test.go",
],
args = ["-test.timeout=295s"],
deps = [
":rangedesciter",
"//pkg/keys",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql/tests",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
],
)

get_x_data(name = "get_x_data")
31 changes: 31 additions & 0 deletions pkg/util/rangedesciter/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangedesciter_test

import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/security/securityassets"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

func TestMain(m *testing.M) {
securityassets.SetLoader(securitytest.EmbeddedAssets)
serverutils.InitTestServerFactory(server.TestServerFactory)
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
os.Exit(m.Run())
}

//go:generate ../../util/leaktest/add-leaktest.sh *_test.go
Loading