Skip to content

Commit

Permalink
spanconfig: introduce the span config manager
Browse files Browse the repository at this point in the history
This patch introduces the spanconfigmanager.Manager. This struct is
responsible for providing a hook to idempootently set up an auto span
config reconciliation job which is responsible for reconciling SQL zone
configs to KV span configs. This job is intended to be a per tenant
singleton job that is non-cancellable. The hook the manager exposes
ensures these semantics.

Every SQL pod on startup creates a background task that periodically
calls into this hook. We don't expect these calls to actually be
instantiating a job after the first SQL pod has won the race, yet we do
this periodic thing in the case that there has been an error in the job
that has transitioned it to the failed state. The interval at which
this happens is controlled by the
`sql.span_config_reconciliation_job.idempotent_start_interval` cluster
setting. It defaults to 10 minutes and is private for now.

The manager will also provide the job access to components it needs to
perform reconciliation. The skeleton to plumb these components has been
added in this patch but the components themselves will come
subsequently.

References cockroachdb#67679

Release note: None
  • Loading branch information
arulajmani committed Aug 16, 2021
1 parent 3e7ddb3 commit f5d1d74
Show file tree
Hide file tree
Showing 32 changed files with 778 additions and 33 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@
/pkg/scheduledjobs/ @cockroachdb/bulk-prs
/pkg/security/ @cockroachdb/server-prs
/pkg/settings/ @cockroachdb/server-prs
/pkg/spanconfig/ @cockroachdb/multiregion
/pkg/startupmigrations/ @cockroachdb/server-prs @cockroachdb/sql-schema
/pkg/streaming/ @cockroachdb/bulk-prs
/pkg/testutils/ @cockroachdb/test-eng
Expand Down
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,4 @@ trace.datadog.project string CockroachDB the project under which traces will be
trace.debug.enable boolean false if set, traces for recent requests can be seen at https://<ui>/debug/requests
trace.lightstep.token string if set, traces go to Lightstep using this token
trace.zipkin.collector string if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time.
version version 21.1-132 set the active cluster version in the format '<major>.<minor>'
version version 21.1-134 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen at https://<ui>/debug/requests</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.1-132</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.1-134</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ ALL_TESTS = [
"//pkg/server/tracedumper:tracedumper_test",
"//pkg/server:server_test",
"//pkg/settings:settings_test",
"//pkg/spanconfig/spanconfigmanager:spanconfigmanager_test",
"//pkg/sql/catalog/catalogkeys:catalogkeys_test",
"//pkg/sql/catalog/catalogkv:catalogkv_test",
"//pkg/sql/catalog/catformat:catformat_test",
Expand Down
1 change: 1 addition & 0 deletions pkg/base/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ type TestingKnobs struct {
MigrationManager ModuleTestingKnobs
IndexUsageStatsKnobs ModuleTestingKnobs
SQLStatsKnobs ModuleTestingKnobs
SpanConfig ModuleTestingKnobs
}
2 changes: 1 addition & 1 deletion pkg/cli/testdata/doctor/test_examine_cluster
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ debug doctor examine cluster
debug doctor examine cluster
Examining 40 descriptors and 41 namespace entries...
ParentID 50, ParentSchemaID 29: relation "foo" (53): expected matching namespace entry, found none
Examining 3 jobs...
Examining 4 jobs...
ERROR: validation failed
7 changes: 7 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,9 @@ const (
PostSeparatedIntentsMigration
// RetryJobsWithExponentialBackoff retries failed jobs with exponential delays.
RetryJobsWithExponentialBackoff
// AutoSpanConfigReconciliationJob adds the AutoSpanConfigReconciliationJob
// type.
AutoSpanConfigReconciliationJob

// Step (1): Add new versions here.
)
Expand Down Expand Up @@ -467,6 +470,10 @@ var versionsSingleton = keyedVersions{
Key: RetryJobsWithExponentialBackoff,
Version: roachpb.Version{Major: 21, Minor: 1, Internal: 132},
},
{
Key: AutoSpanConfigReconciliationJob,
Version: roachpb.Version{Major: 21, Minor: 1, Internal: 134},
},

// Step (2): Add new versions here.
}
Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ go_test(
"//pkg/server",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/jobspb/wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (p *Payload) Type() Type {
const AutoStatsName = "__auto__"

// AutomaticJobTypes is a list of automatic job types that currently exist.
var AutomaticJobTypes = [2]Type{TypeAutoCreateStats, TypeAutoSpanConfigReconciliation}
var AutomaticJobTypes = [...]Type{TypeAutoCreateStats, TypeAutoSpanConfigReconciliation}

// DetailsType returns the type for a payload detail.
func DetailsType(d isPayload_Details) Type {
Expand Down
21 changes: 19 additions & 2 deletions pkg/jobs/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
Expand Down Expand Up @@ -130,7 +131,15 @@ func TestRegistryGC(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
SpanConfig: &spanconfig.TestingKnobs{
// disable the automatic span config job creation to reduce the noise
// when querying system.jobs.
ManagerDisableJobCreation: true,
},
},
})
defer s.Stopper().Stop(ctx)

db := sqlutils.MakeSQLRunner(sqlDB)
Expand Down Expand Up @@ -269,7 +278,15 @@ func TestRegistryGCPagination(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
SpanConfig: &spanconfig.TestingKnobs{
// disable the automatic span config job creation to reduce the noise
// when querying system.jobs.
ManagerDisableJobCreation: true,
},
},
})
db := sqlutils.MakeSQLRunner(sqlDB)
defer s.Stopper().Stop(ctx)

Expand Down
3 changes: 3 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ go_library(
"//pkg/server/tracedumper",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigjob",
"//pkg/spanconfig/spanconfigmanager",
"//pkg/sql",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/catalogkeys",
Expand Down
76 changes: 61 additions & 15 deletions pkg/server/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1410,9 +1410,14 @@ func TestHealthAPI(t *testing.T) {
}
}

// getSystemJobIDs queries the jobs table for all jobs IDs. Sorted by decreasing creation time.
func getSystemJobIDs(t testing.TB, db *sqlutils.SQLRunner) []int64 {
rows := db.Query(t, `SELECT job_id FROM crdb_internal.jobs ORDER BY created DESC;`)
// getSystemJobIDs queries the jobs table for all job IDs that have
// the given status. Sorted by decreasing creation time.
func getSystemJobIDs(t testing.TB, db *sqlutils.SQLRunner, status jobs.Status) []int64 {
rows := db.Query(
t,
`SELECT job_id FROM crdb_internal.jobs WHERE status=$1 ORDER BY created DESC`,
status,
)
defer rows.Close()

res := []int64{}
Expand Down Expand Up @@ -1443,8 +1448,9 @@ func TestAdminAPIJobs(t *testing.T) {
}
})

// Get list of existing jobs (migrations). Assumed to all have succeeded.
existingIDs := getSystemJobIDs(t, sqlDB)
existingSucceededIDs := getSystemJobIDs(t, sqlDB, jobs.StatusSucceeded)
existingRunningIDs := getSystemJobIDs(t, sqlDB, jobs.StatusRunning)
existingIDs := append(existingSucceededIDs, existingRunningIDs...)

testJobs := []struct {
id int64
Expand Down Expand Up @@ -1496,16 +1502,56 @@ func TestAdminAPIJobs(t *testing.T) {
expectedIDsViaAdmin []int64
expectedIDsViaNonAdmin []int64
}{
{"jobs", append([]int64{5, 4, 3, 2, 1}, existingIDs...), []int64{5}},
{"jobs?limit=1", []int64{5}, []int64{5}},
{"jobs?status=running", []int64{4, 2, 1}, []int64{}},
{"jobs?status=succeeded", append([]int64{5, 3}, existingIDs...), []int64{5}},
{"jobs?status=pending", []int64{}, []int64{}},
{"jobs?status=garbage", []int64{}, []int64{}},
{fmt.Sprintf("jobs?type=%d", jobspb.TypeBackup), []int64{5, 3, 2}, []int64{5}},
{fmt.Sprintf("jobs?type=%d", jobspb.TypeRestore), []int64{1}, []int64{}},
{fmt.Sprintf("jobs?type=%d", invalidJobType), []int64{}, []int64{}},
{fmt.Sprintf("jobs?status=running&type=%d", jobspb.TypeBackup), []int64{2}, []int64{}},
{
"jobs",
append([]int64{5, 4, 3, 2, 1}, existingIDs...),
[]int64{5},
},
{
"jobs?limit=1",
[]int64{5},
[]int64{5},
},
{
"jobs?status=running",
append([]int64{4, 2, 1}, existingRunningIDs...),
[]int64{},
},
{
"jobs?status=succeeded",
append([]int64{5, 3}, existingSucceededIDs...),
[]int64{5},
},
{
"jobs?status=pending",
[]int64{},
[]int64{},
},
{
"jobs?status=garbage",
[]int64{},
[]int64{},
},
{
fmt.Sprintf("jobs?type=%d", jobspb.TypeBackup),
[]int64{5, 3, 2},
[]int64{5},
},
{
fmt.Sprintf("jobs?type=%d", jobspb.TypeRestore),
[]int64{1},
[]int64{},
},
{
fmt.Sprintf("jobs?type=%d", invalidJobType),
[]int64{},
[]int64{},
},
{
fmt.Sprintf("jobs?status=running&type=%d", jobspb.TypeBackup),
[]int64{2},
[]int64{},
},
}

testutils.RunTrueAndFalse(t, "isAdmin", func(t *testing.T, isAdmin bool) {
Expand Down
1 change: 1 addition & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
_ "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigjob" // register jobs declared outside of pkg/sql
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/contention"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
Expand Down
51 changes: 51 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/status"
"github.com/cockroachdb/cockroach/pkg/server/tracedumper"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigmanager"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
Expand Down Expand Up @@ -811,6 +813,21 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
execCfg.MigrationTestingKnobs = knobs
}

{
// Instantiate a span config manager; it exposes a hook to idempotently
// create the span config reconciliation job and captures all relevant job
// dependencies.
knobs, _ := cfg.TestingKnobs.SpanConfig.(*spanconfig.TestingKnobs)
reconciliationMgr := spanconfigmanager.New(
cfg.db,
jobRegistry,
cfg.circularInternalExecutor,
knobs,
)
execCfg.SpanConfigReconciliationJobDeps = reconciliationMgr
execCfg.StartSpanConfigReconciliationJobHook = reconciliationMgr.StartJobIfNoneExists
}

temporaryObjectCleaner := sql.NewTemporaryObjectCleaner(
cfg.Settings,
cfg.db,
Expand Down Expand Up @@ -1009,6 +1026,40 @@ func (s *SQLServer) preStart(
return err
}

// Start a background task that starts the auto span config reconciliation job.
// The background task also periodically (as dictated by a cluster setting)
// checks to ensure that the job exists. We don't expect this to happen, but
// if does, we want to start it again.
if err := s.stopper.RunAsyncTask(ctx, "start-span-config-reconciliation-job",
func(ctx context.Context) {
nextTick := timeutil.Now()
for {
nextTickCh := time.After(nextTick.Sub(timeutil.Now()))
select {
case <-nextTickCh:
// Idempotently start the span config reconciliation job if the
// cluster version allows for it.
if s.execCfg.Settings.Version.IsActive(ctx, clusterversion.AutoSpanConfigReconciliationJob) {
_ = s.execCfg.StartSpanConfigReconciliationJobHook(ctx)
}
case <-s.stopper.ShouldQuiesce():
return
case <-ctx.Done():
return
}
nextTick = nextTick.Add(
spanconfig.CheckAndStartReconciliationJobInterval.Get(&s.execCfg.Settings.SV),
)
log.Infof(
ctx,
"next check to ensure span config reconciliation job exists scheduled %s",
nextTick.String(),
)
}
}); err != nil {
return err
}

var bootstrapVersion roachpb.Version
if s.execCfg.Codec.ForSystemTenant() {
if err := s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
Expand Down
15 changes: 15 additions & 0 deletions pkg/spanconfig/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "spanconfig",
srcs = [
"spanconfig.go",
"testing_knobs.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/settings",
],
)
43 changes: 43 additions & 0 deletions pkg/spanconfig/spanconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package spanconfig

import (
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
)

// CheckAndStartReconciliationJobInterval is a cluster setting to control how
// often the existence of the automatic span config reconciliation job will be
// checked. If the check concludes that the job doesn't exist it will be started.
var CheckAndStartReconciliationJobInterval = settings.RegisterDurationSetting(
"sql.span_config_reconciliation_job.idempotent_start_interval",
"how often to check for the span config reconciliation job exists and start it if it doesn't",
10*time.Minute,
)

// ReconciliationDependencies captures what's needed by the span config
// reconciliation job to perform its task. The job is responsible for
// reconciling a tenant's zone configurations with the clusters span
// configurations.
type ReconciliationDependencies interface {
// TODO(zcfgs-pod): Placeholder comment until subsequent PRs add useful
// interfaces here.
// The job will want access to two interfaces to reconcile.
// 1. spanconfig.KVAccessor -- this will expose RPCs the job can use to fetch
// span configs from KV and update them. It'll be implemented by Node for the
// host tenant and the Connector for secondary tenants.
// 2. spanconfig.SQLWatcher -- this will maintain a rangefeed over
// system.{descriptors, zones} and be responsible for generating span config
// updates. The job will respond to these updates by issuing RPCs using the
// KVAccessor.
}
15 changes: 15 additions & 0 deletions pkg/spanconfig/spanconfigjob/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "spanconfigjob",
srcs = ["job.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigjob",
visibility = ["//visibility:public"],
deps = [
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/settings/cluster",
"//pkg/sql",
"@com_github_cockroachdb_errors//:errors",
],
)
Loading

0 comments on commit f5d1d74

Please sign in to comment.