Skip to content

Commit

Permalink
jobs: introduce type column to system.jobs
Browse files Browse the repository at this point in the history
This change introduces two upgrades. The first upgrade introduces
the type column to system.jobs as a nullable string column, along
with an index.

With this change, new jobs being inserted into the jobs table by
the jobs registry will populate the type column. The 2nd upgrade
backfills the type of every previous job in the jobs table.

To make migrations easier, this change adds a new builtin,
`crdb_internal.payload_type`, which can be used to determine
the type of a jobs payload.

Release note: None
  • Loading branch information
jayshrivastava committed Dec 6, 2022
1 parent 916ca31 commit e627b28
Show file tree
Hide file tree
Showing 19 changed files with 527 additions and 22 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -297,4 +297,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 1000022.2-10 set the active cluster version in the format '<major>.<minor>'
version version 1000022.2-14 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,6 @@
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.span_registry.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://<ui>/#/debug/tracez</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>1000022.2-10</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>1000022.2-14</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
2 changes: 2 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,8 @@ available replica will error.</p>
</span></td><td>Stable</td></tr>
<tr><td><a name="crdb_internal.json_to_pb"></a><code>crdb_internal.json_to_pb(pbname: <a href="string.html">string</a>, json: jsonb) &rarr; <a href="bytes.html">bytes</a></code></td><td><span class="funcdesc"><p>Convert JSONB data to protocol message bytes</p>
</span></td><td>Immutable</td></tr>
<tr><td><a name="crdb_internal.payload_type"></a><code>crdb_internal.payload_type(data: <a href="bytes.html">bytes</a>) &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>Reads the type from the jobspb.Payload protocol message.</p>
</span></td><td>Immutable</td></tr>
<tr><td><a name="crdb_internal.pb_to_json"></a><code>crdb_internal.pb_to_json(pbname: <a href="string.html">string</a>, data: <a href="bytes.html">bytes</a>) &rarr; jsonb</code></td><td><span class="funcdesc"><p>Converts protocol message to its JSONB representation.</p>
</span></td><td>Immutable</td></tr>
<tr><td><a name="crdb_internal.pb_to_json"></a><code>crdb_internal.pb_to_json(pbname: <a href="string.html">string</a>, data: <a href="bytes.html">bytes</a>, emit_defaults: <a href="bool.html">bool</a>) &rarr; jsonb</code></td><td><span class="funcdesc"><p>Converts protocol message to its JSONB representation.</p>
Expand Down
17 changes: 17 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,15 @@ const (
// V23_1_CreateSystemJobInfoTable creates the system.job_info table.
V23_1CreateSystemJobInfoTable

// V23_1AddTypeColumnToJobsTable adds the type column to the
// system.jobs table.
V23_1AddTypeColumnToJobsTable

// V23_1BackfillTypeColumnInJobsTable backfills values for the
// type column in the system.jobs table. This is relevant for
// job rows created prior to V23_1AddTypeColumnToJobsTable.
V23_1BackfillTypeColumnInJobsTable

// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -584,6 +593,14 @@ var rawVersionsSingleton = keyedVersions{
Key: V23_1CreateSystemJobInfoTable,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 10},
},
{
Key: V23_1AddTypeColumnToJobsTable,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 12},
},
{
Key: V23_1BackfillTypeColumnInJobsTable,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 14},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/jobs/jobspb",
"//pkg/kv",
"//pkg/multitenant",
Expand Down
54 changes: 54 additions & 0 deletions pkg/jobs/jobspb/wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,41 @@ func DetailsType(d isPayload_Details) Type {
}
}

// ForEachType executes f for each job Type.
func ForEachType(f func(typ Type), includeTypeUnspecified bool) {
start := TypeBackup
if includeTypeUnspecified {
start = TypeUnspecified
}
for typ := start; typ < NumJobTypes; typ++ {
f(typ)
}
}

// JobDetailsForEveryJobType is an array of Details keyed by every job type,
// except for jobspb.TypeUnspecified.
var JobDetailsForEveryJobType = map[Type]Details{
TypeBackup: BackupDetails{},
TypeRestore: RestoreDetails{},
TypeSchemaChange: SchemaChangeDetails{},
TypeImport: ImportDetails{},
TypeChangefeed: ChangefeedDetails{},
TypeCreateStats: CreateStatsDetails{},
TypeAutoCreateStats: CreateStatsDetails{
Name: AutoStatsName,
},
TypeSchemaChangeGC: SchemaChangeGCDetails{},
TypeTypeSchemaChange: TypeSchemaChangeDetails{},
TypeStreamIngestion: StreamIngestionDetails{},
TypeNewSchemaChange: NewSchemaChangeDetails{},
TypeMigration: MigrationDetails{},
TypeAutoSpanConfigReconciliation: AutoSpanConfigReconciliationDetails{},
TypeAutoSQLStatsCompaction: AutoSQLStatsCompactionDetails{},
TypeStreamReplication: StreamReplicationDetails{},
TypeRowLevelTTL: RowLevelTTLDetails{},
TypeAutoSchemaTelemetry: SchemaTelemetryDetails{},
}

// WrapProgressDetails wraps a ProgressDetails object in the protobuf wrapper
// struct necessary to make it usable as the Details field of a Progress.
//
Expand Down Expand Up @@ -375,12 +410,31 @@ func (m *ChangefeedDetails) MarshalJSONPB(marshaller *jsonpb.Marshaler) ([]byte,
// DescRewriteMap maps old descriptor IDs to new descriptor and parent IDs.
type DescRewriteMap map[descpb.ID]*DescriptorRewrite

// assertDetailsMap asserts that the entries in JobDetailsForEveryJobType are correct.
func assertDetailsMap() {
if len(JobDetailsForEveryJobType) != NumJobTypes-1 {
panic("JobDetailsForEveryJobType does not have an entry for each Type")
}
ForEachType(
func(typ Type) {
payload := Payload{
Details: WrapPayloadDetails(JobDetailsForEveryJobType[typ]),
}
if typ != payload.Type() {
panic(fmt.Errorf("JobDetailsForEveryJobType has the incorrect entry for type %s", typ))
}
}, false,
)
}

func init() {
if len(Type_name) != NumJobTypes {
panic(fmt.Errorf("NumJobTypes (%d) does not match generated job type name map length (%d)",
NumJobTypes, len(Type_name)))
}

assertDetailsMap()

protoreflect.RegisterShorthands((*Progress)(nil), "progress")
protoreflect.RegisterShorthands((*Payload)(nil), "payload")
protoreflect.RegisterShorthands((*ScheduleDetails)(nil), "schedule", "schedule_details")
Expand Down
36 changes: 33 additions & 3 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/multitenant"
Expand Down Expand Up @@ -378,8 +379,10 @@ func (r *Registry) batchJobInsertStmt(
ctx context.Context, sessionID sqlliveness.SessionID, records []*Record, modifiedMicros int64,
) (string, []interface{}, []jobspb.JobID, error) {
instanceID := r.ID()
const numColumns = 7
columns := [numColumns]string{`id`, `created`, `status`, `payload`, `progress`, `claim_session_id`, `claim_instance_id`}
var numColumns = 7
columns := make([]string, 7, 8)
columns[0], columns[1], columns[2], columns[3], columns[4], columns[5], columns[6] =
`id`, `created`, `status`, `payload`, `progress`, `claim_session_id`, `claim_instance_id`
marshalPanic := func(m protoutil.Message) []byte {
data, err := protoutil.Marshal(m)
if err != nil {
Expand Down Expand Up @@ -409,6 +412,18 @@ func (r *Registry) batchJobInsertStmt(
return marshalPanic(&progress)
},
}

// To run the upgrade below migration and schema change jobs will need
// to be created using the old schema of the jobs table.
if !r.settings.Version.ActiveVersion(ctx).Less(clusterversion.ByKey(
clusterversion.V23_1AddTypeColumnToJobsTable)) {
numColumns += 1
columns = append(columns, `type`)
valueFns[`type`] = func(rec *Record) interface{} {
return (&jobspb.Payload{Details: jobspb.WrapPayloadDetails(rec.Details)}).Type().String()
}
}

appendValues := func(rec *Record, vals *[]interface{}) (err error) {
defer func() {
switch r := recover(); r.(type) {
Expand Down Expand Up @@ -473,6 +488,7 @@ func (r *Registry) CreateJobWithTxn(
if txn != nil {
start = txn.ReadTimestamp().GoTime()
}
jobType := j.mu.payload.Type()
j.mu.progress.ModifiedMicros = timeutil.ToUnixMicros(start)
payloadBytes, err := protoutil.Marshal(&j.mu.payload)
if err != nil {
Expand All @@ -482,9 +498,23 @@ func (r *Registry) CreateJobWithTxn(
if err != nil {
return nil, err
}
if _, err = j.registry.ex.Exec(ctx, "job-row-insert", txn, `

// To run the upgrade below migration and schema change jobs will need
// to be created using the old schema of the jobs table.
if r.settings.Version.ActiveVersion(ctx).Less(clusterversion.ByKey(
clusterversion.V23_1AddTypeColumnToJobsTable)) {
if _, err = j.registry.ex.Exec(ctx, "job-row-insert", txn, `
INSERT INTO system.jobs (id, status, payload, progress, claim_session_id, claim_instance_id)
VALUES ($1, $2, $3, $4, $5, $6)`, jobID, StatusRunning, payloadBytes, progressBytes, s.ID().UnsafeBytes(), r.ID(),
); err != nil {
return nil, err
}
return j, nil
}

if _, err = j.registry.ex.Exec(ctx, "job-row-insert", txn, `
INSERT INTO system.jobs (id, status, payload, progress, claim_session_id, claim_instance_id, type)
VALUES ($1, $2, $3, $4, $5, $6, $7)`, jobID, StatusRunning, payloadBytes, progressBytes, s.ID().UnsafeBytes(), r.ID(), jobType.String(),
); err != nil {
return nil, err
}
Expand Down
26 changes: 21 additions & 5 deletions pkg/sql/catalog/systemschema/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ CREATE TABLE system.jobs (
claim_instance_id INT8,
num_runs INT8,
last_run TIMESTAMP,
type STRING,
CONSTRAINT "primary" PRIMARY KEY (id),
INDEX (status, created),
INDEX (created_by_type, created_by_id) STORING (status),
Expand All @@ -206,7 +207,8 @@ CREATE TABLE system.jobs (
created
) STORING(last_run, num_runs, claim_instance_id)
WHERE ` + JobsRunStatsIdxPredicate + `,
FAMILY fam_0_id_status_created_payload (id, status, created, payload, created_by_type, created_by_id),
INDEX jobs_type_idx (type),
FAMILY fam_0_id_status_created_payload (id, status, created, payload, created_by_type, created_by_id, type),
FAMILY progress (progress),
FAMILY claim (claim_session_id, claim_instance_id, num_runs, last_run)
);`
Expand Down Expand Up @@ -1349,16 +1351,17 @@ var (
{Name: "claim_instance_id", ID: 9, Type: types.Int, Nullable: true},
{Name: "num_runs", ID: 10, Type: types.Int, Nullable: true},
{Name: "last_run", ID: 11, Type: types.Timestamp, Nullable: true},
{Name: "type", ID: 12, Type: types.String, Nullable: true},
},
[]descpb.ColumnFamilyDescriptor{
{
// NB: We are using family name that existed prior to adding created_by_type and
// created_by_id columns. This is done to minimize and simplify migration work
// NB: We are using family name that existed prior to adding created_by_type,
// created_by_id, and type columns. This is done to minimize and simplify migration work
// that needed to be done.
Name: "fam_0_id_status_created_payload",
ID: 0,
ColumnNames: []string{"id", "status", "created", "payload", "created_by_type", "created_by_id"},
ColumnIDs: []descpb.ColumnID{1, 2, 3, 4, 6, 7},
ColumnNames: []string{"id", "status", "created", "payload", "created_by_type", "created_by_id", "type"},
ColumnIDs: []descpb.ColumnID{1, 2, 3, 4, 6, 7, 12},
},
{
Name: "progress",
Expand Down Expand Up @@ -1410,6 +1413,19 @@ var (
Version: descpb.StrictIndexColumnIDGuaranteesVersion,
Predicate: JobsRunStatsIdxPredicate,
},
descpb.IndexDescriptor{
Name: "jobs_type_idx",
ID: 5,
Unique: false,
KeyColumnNames: []string{"type"},
KeyColumnDirections: []catpb.IndexColumn_Direction{catpb.IndexColumn_ASC},
KeyColumnIDs: []descpb.ColumnID{12},
StoreColumnNames: []string{},
StoreColumnIDs: []descpb.ColumnID{},
KeySuffixColumnIDs: []descpb.ColumnID{1},
Version: descpb.StrictIndexColumnIDGuaranteesVersion,
},
// TODO add an index here?
))

// WebSessions table to authenticate sessions over stateless connections.
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/catalog/systemschema_test/testdata/bootstrap
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ CREATE TABLE public.jobs (
INDEX jobs_status_created_idx (status ASC, created ASC),
INDEX jobs_created_by_type_created_by_id_idx (created_by_type ASC, created_by_id ASC) STORING (status),
INDEX jobs_run_stats_idx (claim_session_id ASC, status ASC, created ASC) STORING (last_run, num_runs, claim_instance_id) WHERE status IN ('running':::STRING, 'reverting':::STRING, 'pending':::STRING, 'pause-requested':::STRING, 'cancel-requested':::STRING),
FAMILY fam_0_id_status_created_payload (id, status, created, payload, created_by_type, created_by_id),
INDEX jobs_type_idx (type ASC),
FAMILY fam_0_id_status_created_payload (id, status, created, payload, created_by_type, created_by_id, type),
FAMILY progress (progress),
FAMILY claim (claim_session_id, claim_instance_id, num_runs, last_run)
);
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/information_schema
Original file line number Diff line number Diff line change
Expand Up @@ -2111,6 +2111,7 @@ system public jobs num_runs
system public jobs payload 4
system public jobs progress 5
system public jobs status 2
system public jobs type 12
system public join_tokens expiration 3
system public join_tokens id 1
system public join_tokens secret 2
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
Original file line number Diff line number Diff line change
Expand Up @@ -1149,6 +1149,7 @@ indexrelid indrelid indnatts indisunique indisprimary indisexclusion indim
2621181441 15 3 false false false false false true false false true false 6 7 2 3403232968 0 0 0 2 2 NULL NULL 2
2621181443 15 1 true true false true false true false false true false 1 0 0 2 NULL NULL 1
2621181446 15 6 false false false false false true false false true false 8 2 3 11 10 9 0 3403232968 0 0 0 0 2 2 2 NULL status IN ('running'::STRING, 'reverting'::STRING, 'pending'::STRING, 'pause-requested'::STRING, 'cancel-requested'::STRING) 3
2621181447 15 1 false false false false false true false false true false 12 3403232968 0 2 NULL NULL 1
2667577107 31 1 true true false true false true false false true false 1 0 0 2 NULL NULL 1
2834522046 34 1 true true false true false true false false true false 1 0 0 2 NULL NULL 1
2880917710 50 2 true true false true false true false false true false 1 2 0 3403232968 0 0 2 2 NULL NULL 2
Expand Down Expand Up @@ -1246,6 +1247,7 @@ indexrelid operator_argument_type_oid operator_argument_position
2621181446 0 1
2621181446 0 2
2621181446 0 3
2621181447 0 1
2667577107 0 1
2834522046 0 1
2880917710 0 1
Expand Down
Loading

0 comments on commit e627b28

Please sign in to comment.