Skip to content

Commit

Permalink
spanconfig: introduce the span config manager
Browse files Browse the repository at this point in the history
This patch introduces the span config subsystem and its orchestrator,
the spanconfig.Manager. The span config manager owns the process of
reconciling SQL zone configs to KV span configs. This includes managing
the span config reconciliation job and providing it access to
dependencies it needs to perform reconciliation.

The span config reconciliation job is intended to be a per tenant,
forever running job that is non-cancellable. At any point there should
be one (and only one) of these jobs running. The manager helps ensure
these semantics.

Even though we expect this job to be forever running (and therefore
never transition to a failed state), the manager on every sql pod
will periodically ensure that the job is indeed running. It will
start a new job in case it finds that the job is not running. The
interval at which this happens is dictated by the
`sql.span_config_reconciliation_job.idempotent_start_interval` cluster
setting. It defaults to 10 minutes and is private for now.

This patch only adds the skeleton to encapsulate and plumb dependencies
to the job. The specific components themselves will come in subsequent
patches.

References cockroachdb#67679

Release note: None
  • Loading branch information
arulajmani committed Aug 18, 2021
1 parent 38463fd commit 5e72733
Show file tree
Hide file tree
Showing 33 changed files with 821 additions and 35 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@
/pkg/scheduledjobs/ @cockroachdb/bulk-prs
/pkg/security/ @cockroachdb/server-prs
/pkg/settings/ @cockroachdb/server-prs
/pkg/spanconfig/ @cockroachdb/multiregion
/pkg/startupmigrations/ @cockroachdb/server-prs @cockroachdb/sql-schema
/pkg/streaming/ @cockroachdb/bulk-prs
/pkg/testutils/ @cockroachdb/test-eng
Expand Down
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,4 @@ trace.datadog.project string CockroachDB the project under which traces will be
trace.debug.enable boolean false if set, traces for recent requests can be seen at https://<ui>/debug/requests
trace.lightstep.token string if set, traces go to Lightstep using this token
trace.zipkin.collector string if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time.
version version 21.1-132 set the active cluster version in the format '<major>.<minor>'
version version 21.1-134 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen at https://<ui>/debug/requests</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.1-132</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.1-134</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ ALL_TESTS = [
"//pkg/server/tracedumper:tracedumper_test",
"//pkg/server:server_test",
"//pkg/settings:settings_test",
"//pkg/spanconfig/spanconfigmanager:spanconfigmanager_test",
"//pkg/sql/catalog/catalogkeys:catalogkeys_test",
"//pkg/sql/catalog/catalogkv:catalogkv_test",
"//pkg/sql/catalog/catformat:catformat_test",
Expand Down
1 change: 1 addition & 0 deletions pkg/base/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ type TestingKnobs struct {
MigrationManager ModuleTestingKnobs
IndexUsageStatsKnobs ModuleTestingKnobs
SQLStatsKnobs ModuleTestingKnobs
SpanConfig ModuleTestingKnobs
}
2 changes: 1 addition & 1 deletion pkg/cli/testdata/doctor/test_examine_cluster
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ debug doctor examine cluster
debug doctor examine cluster
Examining 40 descriptors and 41 namespace entries...
ParentID 50, ParentSchemaID 29: relation "foo" (53): expected matching namespace entry, found none
Examining 3 jobs...
Examining 4 jobs...
ERROR: validation failed
7 changes: 7 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,9 @@ const (
PostSeparatedIntentsMigration
// RetryJobsWithExponentialBackoff retries failed jobs with exponential delays.
RetryJobsWithExponentialBackoff
// AutoSpanConfigReconciliationJob adds the AutoSpanConfigReconciliationJob
// type.
AutoSpanConfigReconciliationJob

// Step (1): Add new versions here.
)
Expand Down Expand Up @@ -467,6 +470,10 @@ var versionsSingleton = keyedVersions{
Key: RetryJobsWithExponentialBackoff,
Version: roachpb.Version{Major: 21, Minor: 1, Internal: 132},
},
{
Key: AutoSpanConfigReconciliationJob,
Version: roachpb.Version{Major: 21, Minor: 1, Internal: 134},
},

// Step (2): Add new versions here.
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ go_test(
"//pkg/server",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/jobspb/wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (p *Payload) Type() Type {
const AutoStatsName = "__auto__"

// AutomaticJobTypes is a list of automatic job types that currently exist.
var AutomaticJobTypes = [2]Type{TypeAutoCreateStats, TypeAutoSpanConfigReconciliation}
var AutomaticJobTypes = [...]Type{TypeAutoCreateStats, TypeAutoSpanConfigReconciliation}

// DetailsType returns the type for a payload detail.
func DetailsType(d isPayload_Details) Type {
Expand Down
32 changes: 30 additions & 2 deletions pkg/jobs/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
Expand Down Expand Up @@ -130,7 +131,17 @@ func TestRegistryGC(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
SpanConfig: &spanconfig.TestingKnobs{
// This test directly modifies `system.jobs` and makes assertions over its
// contents by querying it. We disable the auto span config reconciliation job
// from getting created so that we don't have to special case it in the
// test itself.
ManagerDisableJobCreation: true,
},
},
})
defer s.Stopper().Stop(ctx)

db := sqlutils.MakeSQLRunner(sqlDB)
Expand Down Expand Up @@ -269,7 +280,17 @@ func TestRegistryGCPagination(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
SpanConfig: &spanconfig.TestingKnobs{
// This test directly modifies `system.jobs` and makes assertions over its
// contents by querying it. We disable the auto span config reconciliation job
// from getting created so that we don't have to special case it in the
// test itself.
ManagerDisableJobCreation: true,
},
},
})
db := sqlutils.MakeSQLRunner(sqlDB)
defer s.Stopper().Stop(ctx)

Expand Down Expand Up @@ -478,6 +499,13 @@ func TestRetriesWithExponentialBackoff(t *testing.T) {
Settings: cs,
Knobs: base.TestingKnobs{
JobsTestingKnobs: knobs,
SpanConfig: &spanconfig.TestingKnobs{
// This test directly modifies `system.jobs` and makes assertions over its
// contents by querying it. We disable the auto span config reconciliation job
// from getting created so that we don't have to special case it in the
// test itself.
ManagerDisableJobCreation: true,
},
},
}
var sqlDB *gosql.DB
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ go_library(
"//pkg/server/tracedumper",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigjob",
"//pkg/spanconfig/spanconfigmanager",
"//pkg/sql",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/catalogkeys",
Expand Down
76 changes: 61 additions & 15 deletions pkg/server/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1410,9 +1410,14 @@ func TestHealthAPI(t *testing.T) {
}
}

// getSystemJobIDs queries the jobs table for all jobs IDs. Sorted by decreasing creation time.
func getSystemJobIDs(t testing.TB, db *sqlutils.SQLRunner) []int64 {
rows := db.Query(t, `SELECT job_id FROM crdb_internal.jobs ORDER BY created DESC;`)
// getSystemJobIDs queries the jobs table for all job IDs that have
// the given status. Sorted by decreasing creation time.
func getSystemJobIDs(t testing.TB, db *sqlutils.SQLRunner, status jobs.Status) []int64 {
rows := db.Query(
t,
`SELECT job_id FROM crdb_internal.jobs WHERE status=$1 ORDER BY created DESC`,
status,
)
defer rows.Close()

res := []int64{}
Expand Down Expand Up @@ -1443,8 +1448,9 @@ func TestAdminAPIJobs(t *testing.T) {
}
})

// Get list of existing jobs (migrations). Assumed to all have succeeded.
existingIDs := getSystemJobIDs(t, sqlDB)
existingSucceededIDs := getSystemJobIDs(t, sqlDB, jobs.StatusSucceeded)
existingRunningIDs := getSystemJobIDs(t, sqlDB, jobs.StatusRunning)
existingIDs := append(existingSucceededIDs, existingRunningIDs...)

testJobs := []struct {
id int64
Expand Down Expand Up @@ -1496,16 +1502,56 @@ func TestAdminAPIJobs(t *testing.T) {
expectedIDsViaAdmin []int64
expectedIDsViaNonAdmin []int64
}{
{"jobs", append([]int64{5, 4, 3, 2, 1}, existingIDs...), []int64{5}},
{"jobs?limit=1", []int64{5}, []int64{5}},
{"jobs?status=running", []int64{4, 2, 1}, []int64{}},
{"jobs?status=succeeded", append([]int64{5, 3}, existingIDs...), []int64{5}},
{"jobs?status=pending", []int64{}, []int64{}},
{"jobs?status=garbage", []int64{}, []int64{}},
{fmt.Sprintf("jobs?type=%d", jobspb.TypeBackup), []int64{5, 3, 2}, []int64{5}},
{fmt.Sprintf("jobs?type=%d", jobspb.TypeRestore), []int64{1}, []int64{}},
{fmt.Sprintf("jobs?type=%d", invalidJobType), []int64{}, []int64{}},
{fmt.Sprintf("jobs?status=running&type=%d", jobspb.TypeBackup), []int64{2}, []int64{}},
{
"jobs",
append([]int64{5, 4, 3, 2, 1}, existingIDs...),
[]int64{5},
},
{
"jobs?limit=1",
[]int64{5},
[]int64{5},
},
{
"jobs?status=running",
append([]int64{4, 2, 1}, existingRunningIDs...),
[]int64{},
},
{
"jobs?status=succeeded",
append([]int64{5, 3}, existingSucceededIDs...),
[]int64{5},
},
{
"jobs?status=pending",
[]int64{},
[]int64{},
},
{
"jobs?status=garbage",
[]int64{},
[]int64{},
},
{
fmt.Sprintf("jobs?type=%d", jobspb.TypeBackup),
[]int64{5, 3, 2},
[]int64{5},
},
{
fmt.Sprintf("jobs?type=%d", jobspb.TypeRestore),
[]int64{1},
[]int64{},
},
{
fmt.Sprintf("jobs?type=%d", invalidJobType),
[]int64{},
[]int64{},
},
{
fmt.Sprintf("jobs?status=running&type=%d", jobspb.TypeBackup),
[]int64{2},
[]int64{},
},
}

testutils.RunTrueAndFalse(t, "isAdmin", func(t *testing.T, isAdmin bool) {
Expand Down
1 change: 1 addition & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
_ "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigjob" // register jobs declared outside of pkg/sql
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/contention"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
Expand Down
22 changes: 22 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/status"
"github.com/cockroachdb/cockroach/pkg/server/tracedumper"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigmanager"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
Expand Down Expand Up @@ -133,6 +135,7 @@ type SQLServer struct {
sqlInstanceProvider sqlinstance.Provider
metricsRegistry *metric.Registry
diagnosticsReporter *diagnostics.Reporter
spanconfigMgr spanconfig.Manager

// settingsWatcher is utilized by secondary tenants to watch for settings
// changes. It is nil on the system tenant.
Expand Down Expand Up @@ -811,6 +814,20 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
execCfg.MigrationTestingKnobs = knobs
}

// Instantiate a span config manager; it exposes a hook to idempotently
// create the span config reconciliation job and captures all relevant job
// dependencies.
knobs, _ := cfg.TestingKnobs.SpanConfig.(*spanconfig.TestingKnobs)
spanconfigMgr := spanconfigmanager.New(
cfg.db,
jobRegistry,
cfg.circularInternalExecutor,
cfg.stopper,
cfg.Settings,
knobs,
)
execCfg.SpanConfigReconciliationJobDeps = spanconfigMgr

temporaryObjectCleaner := sql.NewTemporaryObjectCleaner(
cfg.Settings,
cfg.db,
Expand Down Expand Up @@ -871,6 +888,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
sqlInstanceProvider: cfg.sqlInstanceProvider,
metricsRegistry: cfg.registry,
diagnosticsReporter: reporter,
spanconfigMgr: spanconfigMgr,
settingsWatcher: settingsWatcher,
}, nil
}
Expand Down Expand Up @@ -1009,6 +1027,10 @@ func (s *SQLServer) preStart(
return err
}

if err := s.spanconfigMgr.Start(ctx); err != nil {
return err
}

var bootstrapVersion roachpb.Version
if s.execCfg.Codec.ForSystemTenant() {
if err := s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
Expand Down
15 changes: 15 additions & 0 deletions pkg/spanconfig/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "spanconfig",
srcs = [
"spanconfig.go",
"testing_knobs.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/settings",
],
)
Loading

0 comments on commit 5e72733

Please sign in to comment.