Skip to content

Commit

Permalink
spanconfig: introduce the span config manager
Browse files Browse the repository at this point in the history
This patch introduces the spanconfigmanager.Manager. This struct is
responsible for setting up the auto span config reconciliation job. The
auto span config reconciliation job is a per-tenant job responsible for
reconciling SQL zone configs to KV span configs. It is intended to be a
per-tenant, singleton, non-cancellable job. The manager ensures
these semantics.

The manager will also provide the job access to components it needs to
perform reconciliation. The skeleton to plumb these components has been
added in this patch but the components themselves will come
subsequently.

References cockroachdb#67679

Release note: None
  • Loading branch information
arulajmani committed Aug 12, 2021
1 parent 796b827 commit 0db6ece
Show file tree
Hide file tree
Showing 27 changed files with 667 additions and 27 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@
/pkg/scheduledjobs/ @cockroachdb/bulk-prs
/pkg/security/ @cockroachdb/server-prs
/pkg/settings/ @cockroachdb/server-prs
/pkg/spanconfig/ @cockroachdb/multiregion
/pkg/sql/ @cockroachdb/sql-queries-noreview
/pkg/startupmigrations/ @cockroachdb/sql-queries-noreview
/pkg/streaming/ @cockroachdb/bulk-prs
Expand Down
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ ALL_TESTS = [
"//pkg/server/tracedumper:tracedumper_test",
"//pkg/server:server_test",
"//pkg/settings:settings_test",
"//pkg/spanconfig/spanconfigmanager:spanconfigmanager_test",
"//pkg/sql/catalog/catalogkeys:catalogkeys_test",
"//pkg/sql/catalog/catalogkv:catalogkv_test",
"//pkg/sql/catalog/catformat:catformat_test",
Expand Down
1 change: 1 addition & 0 deletions pkg/base/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ type TestingKnobs struct {
MigrationManager ModuleTestingKnobs
IndexUsageStatsKnobs ModuleTestingKnobs
SQLStatsKnobs ModuleTestingKnobs
SpanConfig ModuleTestingKnobs
}
2 changes: 1 addition & 1 deletion pkg/cli/testdata/doctor/test_examine_cluster
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ debug doctor examine cluster
debug doctor examine cluster
Examining 40 descriptors and 41 namespace entries...
ParentID 50, ParentSchemaID 29: relation "foo" (53): expected matching namespace entry, found none
Examining 3 jobs...
Examining 4 jobs...
ERROR: validation failed
1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ go_test(
"//pkg/server",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
Expand Down
21 changes: 19 additions & 2 deletions pkg/jobs/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
Expand Down Expand Up @@ -121,7 +122,15 @@ func TestRegistryGC(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
SpanConfig: &spanconfig.TestingKnobs{
// disable the automatic span config job creation to reduce the noise
// when querying system.jobs.
ManagerDisableJobCreation: true,
},
},
})
defer s.Stopper().Stop(ctx)

db := sqlutils.MakeSQLRunner(sqlDB)
Expand Down Expand Up @@ -260,7 +269,15 @@ func TestRegistryGCPagination(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
SpanConfig: &spanconfig.TestingKnobs{
// disable the automatic span config job creation to reduce the noise
// when querying system.jobs.
ManagerDisableJobCreation: true,
},
},
})
db := sqlutils.MakeSQLRunner(sqlDB)
defer s.Stopper().Stop(ctx)

Expand Down
3 changes: 3 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ go_library(
"//pkg/server/tracedumper",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigjob",
"//pkg/spanconfig/spanconfigmanager",
"//pkg/sql",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/catalogkeys",
Expand Down
77 changes: 62 additions & 15 deletions pkg/server/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1410,9 +1410,16 @@ func TestHealthAPI(t *testing.T) {
}
}

// getSystemJobIDs queries the jobs table for all jobs IDs. Sorted by decreasing creation time.
func getSystemJobIDs(t testing.TB, db *sqlutils.SQLRunner) []int64 {
rows := db.Query(t, `SELECT job_id FROM crdb_internal.jobs ORDER BY created DESC;`)
// getSystemJobIDs queries the jobs table for all job IDs that have
// the given status. Sorted by decreasing creation time.
func getSystemJobIDs(t testing.TB, db *sqlutils.SQLRunner, status jobs.Status) []int64 {
rows := db.Query(
t,
fmt.Sprintf(
`SELECT job_id FROM crdb_internal.jobs WHERE status='%s' ORDER BY created DESC;`,
status,
),
)
defer rows.Close()

res := []int64{}
Expand Down Expand Up @@ -1443,8 +1450,8 @@ func TestAdminAPIJobs(t *testing.T) {
}
})

// Get list of existing jobs (migrations). Assumed to all have succeeded.
existingIDs := getSystemJobIDs(t, sqlDB)
existingSucceededIDs := getSystemJobIDs(t, sqlDB, jobs.StatusSucceeded)
existingRunningIDs := getSystemJobIDs(t, sqlDB, jobs.StatusRunning)

testJobs := []struct {
id int64
Expand Down Expand Up @@ -1496,16 +1503,56 @@ func TestAdminAPIJobs(t *testing.T) {
expectedIDsViaAdmin []int64
expectedIDsViaNonAdmin []int64
}{
{"jobs", append([]int64{5, 4, 3, 2, 1}, existingIDs...), []int64{5}},
{"jobs?limit=1", []int64{5}, []int64{5}},
{"jobs?status=running", []int64{4, 2, 1}, []int64{}},
{"jobs?status=succeeded", append([]int64{5, 3}, existingIDs...), []int64{5}},
{"jobs?status=pending", []int64{}, []int64{}},
{"jobs?status=garbage", []int64{}, []int64{}},
{fmt.Sprintf("jobs?type=%d", jobspb.TypeBackup), []int64{5, 3, 2}, []int64{5}},
{fmt.Sprintf("jobs?type=%d", jobspb.TypeRestore), []int64{1}, []int64{}},
{fmt.Sprintf("jobs?type=%d", invalidJobType), []int64{}, []int64{}},
{fmt.Sprintf("jobs?status=running&type=%d", jobspb.TypeBackup), []int64{2}, []int64{}},
{
"jobs",
append(append([]int64{5, 4, 3, 2, 1}, existingSucceededIDs...), existingRunningIDs...),
[]int64{5},
},
{
"jobs?limit=1",
[]int64{5},
[]int64{5},
},
{
"jobs?status=running",
append([]int64{4, 2, 1}, existingRunningIDs...),
[]int64{},
},
{
"jobs?status=succeeded",
append([]int64{5, 3}, existingSucceededIDs...),
[]int64{5},
},
{
"jobs?status=pending",
[]int64{},
[]int64{},
},
{
"jobs?status=garbage",
[]int64{},
[]int64{},
},
{
fmt.Sprintf("jobs?type=%d", jobspb.TypeBackup),
[]int64{5, 3, 2},
[]int64{5},
},
{
fmt.Sprintf("jobs?type=%d", jobspb.TypeRestore),
[]int64{1},
[]int64{},
},
{
fmt.Sprintf("jobs?type=%d", invalidJobType),
[]int64{},
[]int64{},
},
{
fmt.Sprintf("jobs?status=running&type=%d", jobspb.TypeBackup),
[]int64{2},
[]int64{},
},
}

testutils.RunTrueAndFalse(t, "isAdmin", func(t *testing.T, isAdmin bool) {
Expand Down
1 change: 1 addition & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
_ "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigjob" // register jobs declared outside of pkg/sql
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/contention"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
Expand Down
22 changes: 22 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/status"
"github.com/cockroachdb/cockroach/pkg/server/tracedumper"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigmanager"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
Expand Down Expand Up @@ -798,6 +800,23 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
execCfg.VersionUpgradeHook = migrationMgr.Migrate
}

{
// Instantiate a span config manager which exposes a hook to start the auto
// span config reconciliation during server startup. The span config manager
// provides the reconciliation job with access to dependencies it needs to
// perform its task. The job does so through the executor config.
knobs, _ := cfg.TestingKnobs.SpanConfig.(*spanconfig.TestingKnobs)
reconciliationMgr := spanconfigmanager.New(
cfg.db,
jobRegistry,
cfg.circularInternalExecutor,
cfg.stopper,
knobs,
)
execCfg.SpanConfigReconciliationJobDeps = reconciliationMgr
execCfg.StartSpanConfigReconciliationJobHook = reconciliationMgr.StartJobIfNoneExists
}

temporaryObjectCleaner := sql.NewTemporaryObjectCleaner(
cfg.Settings,
cfg.db,
Expand Down Expand Up @@ -996,6 +1015,9 @@ func (s *SQLServer) preStart(
return err
}

// Create and start the span config reconciliation job if none exists.
_ = s.execCfg.StartSpanConfigReconciliationJobHook(ctx)

var bootstrapVersion roachpb.Version
if s.execCfg.Codec.ForSystemTenant() {
if err := s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
Expand Down
12 changes: 12 additions & 0 deletions pkg/spanconfig/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "spanconfig",
srcs = [
"spanconfig.go",
"testing_knobs.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig",
visibility = ["//visibility:public"],
deps = ["//pkg/base"],
)
28 changes: 28 additions & 0 deletions pkg/spanconfig/spanconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package spanconfig

// ReconciliationDependencies captures what's needed by the span config
// reconciliation job to perform its task. The job is responsible for
// reconciling a tenant's zone configurations with the cluster's span
// configurations.
//
// The interface currently declares no methods; see the TODO below for the
// components that are expected to be added.
type ReconciliationDependencies interface {
// TODO(zcfgs-pod): Placeholder comment until subsequent PRs add useful
// interfaces here.
// The job will want access to two interfaces to reconcile.
// 1. spanconfig.KVAccessor -- this will expose RPCs the job can use to fetch
// span configs from KV and update them. It'll be implemented by Node for the
// host tenant and the Connector for secondary tenants.
// 2. spanconfig.SQLWatcher -- this will maintain a rangefeed over
// system.{descriptors, zones} and be responsible for generating span config
// updates. The job will respond to these updates by issuing RPCs using the
// KVAccessor.
}
15 changes: 15 additions & 0 deletions pkg/spanconfig/spanconfigjob/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "spanconfigjob",
srcs = ["job.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigjob",
visibility = ["//visibility:public"],
deps = [
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/settings/cluster",
"//pkg/sql",
"@com_github_cockroachdb_errors//:errors",
],
)
51 changes: 51 additions & 0 deletions pkg/spanconfig/spanconfigjob/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package spanconfigjob

import (
"context"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/errors"
)

// resumer is the jobs.Resumer implementation that this package registers for
// jobspb.TypeAutoSpanConfigReconciliation (see init).
type resumer struct {
// job is the job passed to the registered constructor.
job *jobs.Job
}

var _ jobs.Resumer = (*resumer)(nil)

// Resume implements the jobs.Resumer interface. It fetches the span config
// reconciliation dependencies from the job execution context and then blocks
// until ctx is done, returning the context's error.
func (r *resumer) Resume(ctx context.Context, execCtxI interface{}) error {
	// TODO(zcfg-pod): Upcoming PRs will actually make use of these
	// reconciliation dependencies; for now they are fetched and discarded.
	_ = execCtxI.(sql.JobExecContext).SpanConfigReconciliationJobDeps()

	// Wait until the context is done and hand its error back to the caller.
	done := ctx.Done()
	<-done
	return ctx.Err()
}

// OnFailOrCancel implements the jobs.Resumer interface. It ignores its
// arguments and always returns an assertion failure.
func (r *resumer) OnFailOrCancel(_ context.Context, _ interface{}) error {
	return errors.AssertionFailedf("span config reconciliation job can never fail or be canceled")
}

// init registers the constructor that builds a resumer for jobs of type
// jobspb.TypeAutoSpanConfigReconciliation.
func init() {
	// The cluster settings argument is not needed to build the resumer.
	newResumer := func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
		return &resumer{job: j}
	}
	jobs.RegisterConstructor(jobspb.TypeAutoSpanConfigReconciliation, newResumer)
}
43 changes: 43 additions & 0 deletions pkg/spanconfig/spanconfigmanager/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "spanconfigmanager",
srcs = ["manager.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigmanager",
visibility = ["//visibility:public"],
deps = [
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/kv",
"//pkg/security",
"//pkg/spanconfig",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/util/log",
"//pkg/util/stop",
],
)

go_test(
name = "spanconfigmanager_test",
srcs = [
"main_test.go",
"manager_test.go",
],
deps = [
":spanconfigmanager",
"//pkg/base",
"//pkg/jobs",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/spanconfig",
"//pkg/sql",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/syncutil",
"@com_github_stretchr_testify//require",
],
)
Loading

0 comments on commit 0db6ece

Please sign in to comment.