spanconfig: introduce the span config manager
This patch introduces the span config subsystem and its orchestrator,
the spanconfig.Manager. The span config manager owns the process of
reconciling SQL zone configs to KV span configs. This includes managing
the span config reconciliation job and providing it access to
dependencies it needs to perform reconciliation.

The span config reconciliation job is intended to be a per tenant,
forever running job that is non-cancellable. At any point there should
be one (and only one) of these jobs running. The manager helps ensure
these semantics.

Even though we expect this job to be forever running (and therefore
never transition to a failed state), the manager on every SQL pod
periodically ensures that the job is indeed running. It starts a new
job if it finds that none is running. The interval at which this
happens is dictated by the
`sql.span_config_reconciliation_job.idempotent_start_interval` cluster
setting. It defaults to 10 minutes and is private for now.
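
The periodic check described above can be sketched as follows. This is a
minimal illustration under stated assumptions, not the manager's actual
implementation: `jobStore`, `ensureRunning`, and `runChecks` are hypothetical
names, the jobs table is replaced by an in-memory struct, and the ticks that
would normally come from a timer driven by the cluster setting are fed through
a channel so the example stays deterministic.

```go
package main

import (
	"fmt"
	"sync"
)

// jobStore is a hypothetical in-memory stand-in for the jobs table. It is
// not part of CockroachDB.
type jobStore struct {
	mu      sync.Mutex
	running bool
	created int
}

// ensureRunning creates the job only if none is running and reports whether
// it created one. Checking and creating under the same lock is what makes
// repeated calls idempotent in this sketch.
func (s *jobStore) ensureRunning() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.running {
		return false
	}
	s.running = true
	s.created++
	return true
}

// fail simulates the job unexpectedly leaving the running state.
func (s *jobStore) fail() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.running = false
}

// runChecks calls ensureRunning once per tick until ticks is closed and
// returns how many jobs it started.
func runChecks(s *jobStore, ticks <-chan struct{}) int {
	started := 0
	for range ticks {
		if s.ensureRunning() {
			started++
		}
	}
	return started
}

func main() {
	s := &jobStore{}
	ticks := make(chan struct{}, 3)
	ticks <- struct{}{} // first check: no job yet, so one is created
	ticks <- struct{}{} // second check: job already running, no-op
	ticks <- struct{}{} // third check: still running, no-op
	close(ticks)
	fmt.Println(runChecks(s, ticks), s.created) // prints "1 1"
}
```

In a real deployment several SQL pods run this check concurrently, so the
check-and-create step would need to be atomic across pods (for example inside
a transaction) rather than guarded by a process-local mutex.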

This patch only adds the skeleton to encapsulate and plumb dependencies
to the job. The specific components themselves will come in subsequent
patches.

References cockroachdb#67679

Release note: None
arulajmani committed Aug 18, 2021
1 parent 38463fd commit fd0af6e
Showing 32 changed files with 818 additions and 33 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
@@ -208,6 +208,7 @@
/pkg/scheduledjobs/ @cockroachdb/bulk-prs
/pkg/security/ @cockroachdb/server-prs
/pkg/settings/ @cockroachdb/server-prs
/pkg/spanconfig/ @cockroachdb/multiregion
/pkg/startupmigrations/ @cockroachdb/server-prs @cockroachdb/sql-schema
/pkg/streaming/ @cockroachdb/bulk-prs
/pkg/testutils/ @cockroachdb/test-eng
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -152,4 +152,4 @@ trace.datadog.project string CockroachDB the project under which traces will be
trace.debug.enable boolean false if set, traces for recent requests can be seen at https://<ui>/debug/requests
trace.lightstep.token string if set, traces go to Lightstep using this token
trace.zipkin.collector string if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time.
-version version 21.1-132 set the active cluster version in the format '<major>.<minor>'
+version version 21.1-134 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -156,6 +156,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen at https://<ui>/debug/requests</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time.</td></tr>
-<tr><td><code>version</code></td><td>version</td><td><code>21.1-132</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
+<tr><td><code>version</code></td><td>version</td><td><code>21.1-134</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
@@ -181,6 +181,7 @@ ALL_TESTS = [
"//pkg/server/tracedumper:tracedumper_test",
"//pkg/server:server_test",
"//pkg/settings:settings_test",
"//pkg/spanconfig/spanconfigmanager:spanconfigmanager_test",
"//pkg/sql/catalog/catalogkeys:catalogkeys_test",
"//pkg/sql/catalog/catalogkv:catalogkv_test",
"//pkg/sql/catalog/catformat:catformat_test",
1 change: 1 addition & 0 deletions pkg/base/testing_knobs.go
@@ -40,4 +40,5 @@ type TestingKnobs struct {
MigrationManager ModuleTestingKnobs
IndexUsageStatsKnobs ModuleTestingKnobs
SQLStatsKnobs ModuleTestingKnobs
SpanConfig ModuleTestingKnobs
}
2 changes: 1 addition & 1 deletion pkg/cli/testdata/doctor/test_examine_cluster
@@ -3,5 +3,5 @@ debug doctor examine cluster
debug doctor examine cluster
Examining 40 descriptors and 41 namespace entries...
ParentID 50, ParentSchemaID 29: relation "foo" (53): expected matching namespace entry, found none
-Examining 3 jobs...
+Examining 4 jobs...
ERROR: validation failed
7 changes: 7 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
@@ -282,6 +282,9 @@ const (
PostSeparatedIntentsMigration
// RetryJobsWithExponentialBackoff retries failed jobs with exponential delays.
RetryJobsWithExponentialBackoff
// AutoSpanConfigReconciliationJob adds the AutoSpanConfigReconciliationJob
// type.
AutoSpanConfigReconciliationJob

// Step (1): Add new versions here.
)
@@ -467,6 +470,10 @@ var versionsSingleton = keyedVersions{
Key: RetryJobsWithExponentialBackoff,
Version: roachpb.Version{Major: 21, Minor: 1, Internal: 132},
},
{
Key: AutoSpanConfigReconciliationJob,
Version: roachpb.Version{Major: 21, Minor: 1, Internal: 134},
},

// Step (2): Add new versions here.
}
1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
@@ -96,6 +96,7 @@ go_test(
"//pkg/server",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
2 changes: 1 addition & 1 deletion pkg/jobs/jobspb/wrap.go
@@ -66,7 +66,7 @@ func (p *Payload) Type() Type {
const AutoStatsName = "__auto__"

// AutomaticJobTypes is a list of automatic job types that currently exist.
-var AutomaticJobTypes = [2]Type{TypeAutoCreateStats, TypeAutoSpanConfigReconciliation}
+var AutomaticJobTypes = [...]Type{TypeAutoCreateStats, TypeAutoSpanConfigReconciliation}

// DetailsType returns the type for a payload detail.
func DetailsType(d isPayload_Details) Type {
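
The `[...]Type{...}` form used for `AutomaticJobTypes` asks the Go compiler to
infer the array length from the literal, so adding a job type later does not
require updating a hard-coded length. A small standalone sketch of the
difference (the `Type` values and variable names here are hypothetical
placeholders, not the real `jobspb` definitions):

```go
package main

import "fmt"

// Type is a hypothetical stand-in for a job type enum.
type Type int

const (
	TypeA Type = iota
	TypeB
	TypeC
)

// inferred has its length computed by the compiler from the literal.
var inferred = [...]Type{TypeA, TypeB, TypeC}

// fixed declares room for three elements but lists only two; the third
// slot is silently filled with the zero value (TypeA here).
var fixed = [3]Type{TypeB, TypeC}

func main() {
	fmt.Println(len(inferred), len(fixed)) // prints "3 3"
	fmt.Println(fixed[2] == TypeA)         // prints "true"
}
```

With a fixed length, listing too many elements is a compile error, while
listing too few pads the array with zero values. The inferred form avoids both
failure modes.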
32 changes: 30 additions & 2 deletions pkg/jobs/registry_test.go
@@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
@@ -130,7 +131,17 @@ func TestRegistryGC(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
-s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
+s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{
+Knobs: base.TestingKnobs{
+SpanConfig: &spanconfig.TestingKnobs{
+// This test directly modifies `system.jobs` and makes assertions over its
+// contents by querying it. We disable the auto span config reconciliation
+// job from getting created so that we don't have to special case it in the
+// test itself.
+ManagerDisableJobCreation: true,
+},
+},
+})
defer s.Stopper().Stop(ctx)

db := sqlutils.MakeSQLRunner(sqlDB)
@@ -269,7 +280,17 @@ func TestRegistryGCPagination(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
-s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
+s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
+Knobs: base.TestingKnobs{
+SpanConfig: &spanconfig.TestingKnobs{
+// This test directly modifies `system.jobs` and makes assertions over its
+// contents by querying it. We disable the auto span config reconciliation
+// job from getting created so that we don't have to special case it in the
+// test itself.
+ManagerDisableJobCreation: true,
+},
+},
+})
db := sqlutils.MakeSQLRunner(sqlDB)
defer s.Stopper().Stop(ctx)

@@ -478,6 +499,13 @@ func TestRetriesWithExponentialBackoff(t *testing.T) {
Settings: cs,
Knobs: base.TestingKnobs{
JobsTestingKnobs: knobs,
SpanConfig: &spanconfig.TestingKnobs{
// This test directly modifies `system.jobs` and makes assertions over its
// contents by querying it. We disable the auto span config reconciliation job
// from getting created so that we don't have to special case it in the
// test itself.
ManagerDisableJobCreation: true,
},
},
}
var sqlDB *gosql.DB
3 changes: 3 additions & 0 deletions pkg/server/BUILD.bazel
@@ -110,6 +110,9 @@ go_library(
"//pkg/server/tracedumper",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigjob",
"//pkg/spanconfig/spanconfigmanager",
"//pkg/sql",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/catalogkeys",
76 changes: 61 additions & 15 deletions pkg/server/admin_test.go
@@ -1410,9 +1410,14 @@ func TestHealthAPI(t *testing.T) {
}
}

-// getSystemJobIDs queries the jobs table for all jobs IDs. Sorted by decreasing creation time.
-func getSystemJobIDs(t testing.TB, db *sqlutils.SQLRunner) []int64 {
-rows := db.Query(t, `SELECT job_id FROM crdb_internal.jobs ORDER BY created DESC;`)
+// getSystemJobIDs queries the jobs table for all job IDs that have
+// the given status. Sorted by decreasing creation time.
+func getSystemJobIDs(t testing.TB, db *sqlutils.SQLRunner, status jobs.Status) []int64 {
+rows := db.Query(
+t,
+`SELECT job_id FROM crdb_internal.jobs WHERE status=$1 ORDER BY created DESC`,
+status,
+)
defer rows.Close()

res := []int64{}
@@ -1443,8 +1448,9 @@ func TestAdminAPIJobs(t *testing.T) {
}
})

-// Get list of existing jobs (migrations). Assumed to all have succeeded.
-existingIDs := getSystemJobIDs(t, sqlDB)
+existingSucceededIDs := getSystemJobIDs(t, sqlDB, jobs.StatusSucceeded)
+existingRunningIDs := getSystemJobIDs(t, sqlDB, jobs.StatusRunning)
+existingIDs := append(existingSucceededIDs, existingRunningIDs...)

testJobs := []struct {
id int64
@@ -1496,16 +1502,56 @@ func TestAdminAPIJobs(t *testing.T) {
expectedIDsViaAdmin []int64
expectedIDsViaNonAdmin []int64
}{
-{"jobs", append([]int64{5, 4, 3, 2, 1}, existingIDs...), []int64{5}},
-{"jobs?limit=1", []int64{5}, []int64{5}},
-{"jobs?status=running", []int64{4, 2, 1}, []int64{}},
-{"jobs?status=succeeded", append([]int64{5, 3}, existingIDs...), []int64{5}},
-{"jobs?status=pending", []int64{}, []int64{}},
-{"jobs?status=garbage", []int64{}, []int64{}},
-{fmt.Sprintf("jobs?type=%d", jobspb.TypeBackup), []int64{5, 3, 2}, []int64{5}},
-{fmt.Sprintf("jobs?type=%d", jobspb.TypeRestore), []int64{1}, []int64{}},
-{fmt.Sprintf("jobs?type=%d", invalidJobType), []int64{}, []int64{}},
-{fmt.Sprintf("jobs?status=running&type=%d", jobspb.TypeBackup), []int64{2}, []int64{}},
+{
+"jobs",
+append([]int64{5, 4, 3, 2, 1}, existingIDs...),
+[]int64{5},
+},
+{
+"jobs?limit=1",
+[]int64{5},
+[]int64{5},
+},
+{
+"jobs?status=running",
+append([]int64{4, 2, 1}, existingRunningIDs...),
+[]int64{},
+},
+{
+"jobs?status=succeeded",
+append([]int64{5, 3}, existingSucceededIDs...),
+[]int64{5},
+},
+{
+"jobs?status=pending",
+[]int64{},
+[]int64{},
+},
+{
+"jobs?status=garbage",
+[]int64{},
+[]int64{},
+},
+{
+fmt.Sprintf("jobs?type=%d", jobspb.TypeBackup),
+[]int64{5, 3, 2},
+[]int64{5},
+},
+{
+fmt.Sprintf("jobs?type=%d", jobspb.TypeRestore),
+[]int64{1},
+[]int64{},
+},
+{
+fmt.Sprintf("jobs?type=%d", invalidJobType),
+[]int64{},
+[]int64{},
+},
+{
+fmt.Sprintf("jobs?status=running&type=%d", jobspb.TypeBackup),
+[]int64{2},
+[]int64{},
+},
}

testutils.RunTrueAndFalse(t, "isAdmin", func(t *testing.T, isAdmin bool) {
1 change: 1 addition & 0 deletions pkg/server/server.go
@@ -68,6 +68,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
_ "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigjob" // register jobs declared outside of pkg/sql
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/contention"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
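
The blank import of `spanconfigjob` above exists only for its side effect:
importing a package runs its `init` functions, which is how a job type can
register itself without the importer referencing any of its symbols. The
sketch below collapses that pattern into a single file for illustration. The
registry, the `register` helper, and the job name string are all hypothetical
and do not reflect the actual jobs registry API.

```go
package main

import "fmt"

// constructors is a hypothetical registry mapping a job type name to a
// factory. Package-level variables are initialized before init runs.
var constructors = map[string]func() string{}

// register records a factory under the given name.
func register(name string, ctor func() string) {
	constructors[name] = ctor
}

// In a real codebase this init would live in the job's own package, and the
// server binary would pull it in with a blank import such as
// `_ "example.com/project/myjob"` (a made-up path).
func init() {
	register("auto_span_config_reconciliation", func() string {
		return "resumer"
	})
}

func main() {
	ctor, ok := constructors["auto_span_config_reconciliation"]
	fmt.Println(ok, ctor()) // prints "true resumer"
}
```

Without the blank import, the package would not be linked into the binary and
its `init` would never run, leaving the job type unregistered.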
22 changes: 22 additions & 0 deletions pkg/server/server_sql.go
@@ -51,6 +51,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/status"
"github.com/cockroachdb/cockroach/pkg/server/tracedumper"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigmanager"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
@@ -133,6 +135,7 @@ type SQLServer struct {
sqlInstanceProvider sqlinstance.Provider
metricsRegistry *metric.Registry
diagnosticsReporter *diagnostics.Reporter
spanconfigMgr spanconfig.Manager

// settingsWatcher is utilized by secondary tenants to watch for settings
// changes. It is nil on the system tenant.
@@ -811,6 +814,20 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
execCfg.MigrationTestingKnobs = knobs
}

// Instantiate a span config manager; it exposes a hook to idempotently
// create the span config reconciliation job and captures all relevant job
// dependencies.
knobs, _ := cfg.TestingKnobs.SpanConfig.(*spanconfig.TestingKnobs)
spanconfigMgr := spanconfigmanager.New(
cfg.db,
jobRegistry,
cfg.circularInternalExecutor,
cfg.stopper,
cfg.Settings,
knobs,
)
execCfg.SpanConfigReconciliationJobDeps = spanconfigMgr

temporaryObjectCleaner := sql.NewTemporaryObjectCleaner(
cfg.Settings,
cfg.db,
@@ -871,6 +888,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
sqlInstanceProvider: cfg.sqlInstanceProvider,
metricsRegistry: cfg.registry,
diagnosticsReporter: reporter,
spanconfigMgr: spanconfigMgr,
settingsWatcher: settingsWatcher,
}, nil
}
@@ -1009,6 +1027,10 @@ func (s *SQLServer) preStart(
return err
}

if err := s.spanconfigMgr.Start(ctx); err != nil {
return err
}

var bootstrapVersion roachpb.Version
if s.execCfg.Codec.ForSystemTenant() {
if err := s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
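
The `knobs, _ := cfg.TestingKnobs.SpanConfig.(*spanconfig.TestingKnobs)` line
in `newSQLServer` relies on Go's comma-ok type assertion: when the `SpanConfig`
knob is unset (a nil interface value), the assertion does not panic and simply
yields a nil pointer. A small sketch of that behavior follows. The types are
simplified stand-ins (the real definitions live in `pkg/base` and
`pkg/spanconfig` and may differ), and `extract` and `jobCreationDisabled` are
hypothetical helpers.

```go
package main

import "fmt"

// ModuleTestingKnobs is a simplified stand-in for the marker interface that
// testing knob structs implement.
type ModuleTestingKnobs interface {
	ModuleTestingKnobs()
}

// TestingKnobs is a simplified stand-in for spanconfig.TestingKnobs.
type TestingKnobs struct {
	ManagerDisableJobCreation bool
}

// ModuleTestingKnobs marks *TestingKnobs as a knobs implementation.
func (*TestingKnobs) ModuleTestingKnobs() {}

// extract uses the comma-ok form, so a nil or mismatched interface value
// produces a nil *TestingKnobs instead of a panic.
func extract(k ModuleTestingKnobs) *TestingKnobs {
	knobs, _ := k.(*TestingKnobs)
	return knobs
}

// jobCreationDisabled tolerates nil knobs, which is the common case outside
// of tests.
func jobCreationDisabled(knobs *TestingKnobs) bool {
	return knobs != nil && knobs.ManagerDisableJobCreation
}

func main() {
	fmt.Println(jobCreationDisabled(extract(nil))) // prints "false"
	fmt.Println(jobCreationDisabled(extract(&TestingKnobs{
		ManagerDisableJobCreation: true,
	}))) // prints "true"
}
```

Because the result may be nil, any consumer of the knobs has to check for nil
before reading a field, as `jobCreationDisabled` does here.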
15 changes: 15 additions & 0 deletions pkg/spanconfig/BUILD.bazel
@@ -0,0 +1,15 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "spanconfig",
srcs = [
"spanconfig.go",
"testing_knobs.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/settings",
],
)
57 changes: 57 additions & 0 deletions pkg/spanconfig/spanconfig.go
@@ -0,0 +1,57 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package spanconfig

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
)

// CheckAndStartReconciliationJobInterval is a cluster setting to control how
// often the existence of the automatic span config reconciliation job will be
// checked. If the check concludes that the job doesn't exist it will be started.
var CheckAndStartReconciliationJobInterval = settings.RegisterDurationSetting(
"sql.span_config_reconciliation_job.idempotent_start_interval",
"how often to check that the span config reconciliation job exists and start it if it doesn't",
10*time.Minute,
settings.NonNegativeDuration,
)

// Manager is the coordinator of the span config subsystem. It is responsible
// for the following tasks:
//
// 1. Ensuring that one (and only one) span config reconciliation job exists for
// every tenant.
// 2. Encapsulating all dependencies required by the span config reconciliation
// job to perform its task.
type Manager interface {
Start(ctx context.Context) error
ReconciliationDependencies
}

// ReconciliationDependencies captures what's needed by the span config
// reconciliation job to perform its task. The job is responsible for
// reconciling a tenant's zone configurations with the cluster's span
// configurations.
type ReconciliationDependencies interface {
// TODO(zcfgs-pod): Placeholder comment until subsequent PRs add useful
// interfaces here.
// The job will want access to two interfaces to reconcile.
// 1. spanconfig.KVAccessor -- this will expose RPCs the job can use to fetch
// span configs from KV and update them. It'll be implemented by Node for the
// host tenant and the Connector for secondary tenants.
// 2. spanconfig.SQLWatcher -- this will maintain a rangefeed over
// system.{descriptors, zones} and be responsible for generating span config
// updates. The job will respond to these updates by issuing RPCs using the
// KVAccessor.
}