Skip to content

Commit

Permalink
jobs: introduce job_type column to system.jobs
Browse files Browse the repository at this point in the history
This change introduces an upgrade to add the `job_type` column to
`system.jobs` as a nullable string. This upgrade also creates an
index for the column and backfills it.

With this change, new jobs being inserted into the jobs table by
the jobs registry will populate the type column.

To make migrations easier, this change adds a new builtin,
`crdb_internal.job_payload_type`, which can be used to determine
the type of a jobs payload.

Release note: None
  • Loading branch information
jayshrivastava committed Dec 12, 2022
1 parent 4b3f582 commit c5d1f7a
Show file tree
Hide file tree
Showing 19 changed files with 520 additions and 27 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -297,4 +297,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 1000022.2-14 set the active cluster version in the format '<major>.<minor>'
version version 1000022.2-16 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,6 @@
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.span_registry.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://<ui>/#/debug/tracez</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>1000022.2-14</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>1000022.2-16</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
2 changes: 2 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,8 @@ available replica will error.</p>
</span></td><td>Stable</td></tr>
<tr><td><a name="array_to_json"></a><code>array_to_json(array: anyelement[], pretty_bool: <a href="bool.html">bool</a>) &rarr; jsonb</code></td><td><span class="funcdesc"><p>Returns the array as JSON or JSONB.</p>
</span></td><td>Stable</td></tr>
<tr><td><a name="crdb_internal.job_payload_type"></a><code>crdb_internal.job_payload_type(data: <a href="bytes.html">bytes</a>) &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>Reads the type from the jobspb.Payload protocol message.</p>
</span></td><td>Immutable</td></tr>
<tr><td><a name="crdb_internal.json_to_pb"></a><code>crdb_internal.json_to_pb(pbname: <a href="string.html">string</a>, json: jsonb) &rarr; <a href="bytes.html">bytes</a></code></td><td><span class="funcdesc"><p>Convert JSONB data to protocol message bytes</p>
</span></td><td>Immutable</td></tr>
<tr><td><a name="crdb_internal.pb_to_json"></a><code>crdb_internal.pb_to_json(pbname: <a href="string.html">string</a>, data: <a href="bytes.html">bytes</a>) &rarr; jsonb</code></td><td><span class="funcdesc"><p>Converts protocol message to its JSONB representation.</p>
Expand Down
8 changes: 8 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,10 @@ const (
// ids in the role_members system table have been backfilled.
V23_1RoleMembersIDColumnsBackfilled

// V23_1AddTypeColumnToJobsTable adds the type column to the
// system.jobs table and backfills it.
V23_1AddTypeColumnToJobsTable

// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -604,6 +608,10 @@ var rawVersionsSingleton = keyedVersions{
Key: V23_1RoleMembersIDColumnsBackfilled,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 14},
},
{
Key: V23_1AddTypeColumnToJobsTable,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 16},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/jobs/jobspb",
"//pkg/kv",
"//pkg/multitenant",
Expand Down
54 changes: 54 additions & 0 deletions pkg/jobs/jobspb/wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,41 @@ func DetailsType(d isPayload_Details) Type {
}
}

// ForEachType executes f for each job Type.
func ForEachType(f func(typ Type), includeTypeUnspecified bool) {
start := TypeBackup
if includeTypeUnspecified {
start = TypeUnspecified
}
for typ := start; typ < NumJobTypes; typ++ {
f(typ)
}
}

// JobDetailsForEveryJobType is an array of Details keyed by every job type,
// except for jobspb.TypeUnspecified.
var JobDetailsForEveryJobType = map[Type]Details{
TypeBackup: BackupDetails{},
TypeRestore: RestoreDetails{},
TypeSchemaChange: SchemaChangeDetails{},
TypeImport: ImportDetails{},
TypeChangefeed: ChangefeedDetails{},
TypeCreateStats: CreateStatsDetails{},
TypeAutoCreateStats: CreateStatsDetails{
Name: AutoStatsName,
},
TypeSchemaChangeGC: SchemaChangeGCDetails{},
TypeTypeSchemaChange: TypeSchemaChangeDetails{},
TypeStreamIngestion: StreamIngestionDetails{},
TypeNewSchemaChange: NewSchemaChangeDetails{},
TypeMigration: MigrationDetails{},
TypeAutoSpanConfigReconciliation: AutoSpanConfigReconciliationDetails{},
TypeAutoSQLStatsCompaction: AutoSQLStatsCompactionDetails{},
TypeStreamReplication: StreamReplicationDetails{},
TypeRowLevelTTL: RowLevelTTLDetails{},
TypeAutoSchemaTelemetry: SchemaTelemetryDetails{},
}

// WrapProgressDetails wraps a ProgressDetails object in the protobuf wrapper
// struct necessary to make it usable as the Details field of a Progress.
//
Expand Down Expand Up @@ -375,12 +410,31 @@ func (m *ChangefeedDetails) MarshalJSONPB(marshaller *jsonpb.Marshaler) ([]byte,
// DescRewriteMap maps old descriptor IDs to new descriptor and parent IDs.
type DescRewriteMap map[descpb.ID]*DescriptorRewrite

// assertDetailsMap asserts that the entries in JobDetailsForEveryJobType are correct.
func assertDetailsMap() {
if len(JobDetailsForEveryJobType) != NumJobTypes-1 {
panic("JobDetailsForEveryJobType does not have an entry for each Type")
}
ForEachType(
func(typ Type) {
payload := Payload{
Details: WrapPayloadDetails(JobDetailsForEveryJobType[typ]),
}
if typ != payload.Type() {
panic(fmt.Errorf("JobDetailsForEveryJobType has the incorrect entry for type %s", typ))
}
}, false,
)
}

func init() {
if len(Type_name) != NumJobTypes {
panic(fmt.Errorf("NumJobTypes (%d) does not match generated job type name map length (%d)",
NumJobTypes, len(Type_name)))
}

assertDetailsMap()

protoreflect.RegisterShorthands((*Progress)(nil), "progress")
protoreflect.RegisterShorthands((*Payload)(nil), "payload")
protoreflect.RegisterShorthands((*ScheduleDetails)(nil), "schedule", "schedule_details")
Expand Down
46 changes: 39 additions & 7 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/multitenant"
Expand Down Expand Up @@ -378,8 +379,8 @@ func (r *Registry) batchJobInsertStmt(
ctx context.Context, sessionID sqlliveness.SessionID, records []*Record, modifiedMicros int64,
) (string, []interface{}, []jobspb.JobID, error) {
instanceID := r.ID()
const numColumns = 7
columns := [numColumns]string{`id`, `created`, `status`, `payload`, `progress`, `claim_session_id`, `claim_instance_id`}
columns := []string{`id`, `created`, `status`, `payload`, `progress`, `claim_session_id`, `claim_instance_id`, `job_type`}
numColumns := len(columns)
marshalPanic := func(m protoutil.Message) []byte {
data, err := protoutil.Marshal(m)
if err != nil {
Expand Down Expand Up @@ -408,7 +409,17 @@ func (r *Registry) batchJobInsertStmt(
progress.ModifiedMicros = modifiedMicros
return marshalPanic(&progress)
},
`job_type`: func(rec *Record) interface{} {
return (&jobspb.Payload{Details: jobspb.WrapPayloadDetails(rec.Details)}).Type().String()
},
}

// To run the upgrade below, migration and schema change jobs will need to be
// created using the old schema, which does not have the job_type column.
if !r.settings.Version.IsActive(ctx, clusterversion.V23_1AddTypeColumnToJobsTable) {
numColumns -= 1
}

appendValues := func(rec *Record, vals *[]interface{}) (err error) {
defer func() {
switch r := recover(); r.(type) {
Expand All @@ -419,7 +430,8 @@ func (r *Registry) batchJobInsertStmt(
panic(r)
}
}()
for _, c := range columns {
for j := 0; j < numColumns; j++ {
c := columns[j]
*vals = append(*vals, valueFns[c](rec))
}
return nil
Expand All @@ -436,7 +448,7 @@ func (r *Registry) batchJobInsertStmt(
buf.WriteString(", ")
}
buf.WriteString("(")
for j := range columns {
for j := 0; j < numColumns; j++ {
if j > 0 {
buf.WriteString(", ")
}
Expand Down Expand Up @@ -473,6 +485,7 @@ func (r *Registry) CreateJobWithTxn(
if txn != nil {
start = txn.ReadTimestamp().GoTime()
}
jobType := j.mu.payload.Type()
j.mu.progress.ModifiedMicros = timeutil.ToUnixMicros(start)
payloadBytes, err := protoutil.Marshal(&j.mu.payload)
if err != nil {
Expand All @@ -482,9 +495,28 @@ func (r *Registry) CreateJobWithTxn(
if err != nil {
return nil, err
}
if _, err = j.registry.ex.Exec(ctx, "job-row-insert", txn, `
INSERT INTO system.jobs (id, status, payload, progress, claim_session_id, claim_instance_id)
VALUES ($1, $2, $3, $4, $5, $6)`, jobID, StatusRunning, payloadBytes, progressBytes, s.ID().UnsafeBytes(), r.ID(),

cols := [7]string{"id", "status", "payload", "progress", "claim_session_id", "claim_instance_id", "job_type"}
numCols := len(cols)
vals := [7]interface{}{jobID, StatusRunning, payloadBytes, progressBytes, s.ID().UnsafeBytes(), r.ID(), jobType.String()}
placeholders := func() string {
var p strings.Builder
for i := 0; i < numCols; i++ {
if i > 0 {
p.WriteByte(',')
}
p.WriteByte('$')
p.WriteString(strconv.Itoa(i + 1))
}
return p.String()
}
// To run the upgrade below, migration and schema change jobs will need
// to be created using the old schema of the jobs table.
if !r.settings.Version.IsActive(ctx, clusterversion.V23_1AddTypeColumnToJobsTable) {
numCols -= 1
}
insertStmt := fmt.Sprintf(`INSERT INTO system.jobs (%s) VALUES (%s)`, strings.Join(cols[:numCols], ","), placeholders())
if _, err = j.registry.ex.Exec(ctx, "job-row-insert", txn, insertStmt, vals[:numCols]...,
); err != nil {
return nil, err
}
Expand Down
24 changes: 19 additions & 5 deletions pkg/sql/catalog/systemschema/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ CREATE TABLE system.jobs (
claim_instance_id INT8,
num_runs INT8,
last_run TIMESTAMP,
job_type STRING,
CONSTRAINT "primary" PRIMARY KEY (id),
INDEX (status, created),
INDEX (created_by_type, created_by_id) STORING (status),
Expand All @@ -216,7 +217,8 @@ CREATE TABLE system.jobs (
created
) STORING(last_run, num_runs, claim_instance_id)
WHERE ` + JobsRunStatsIdxPredicate + `,
FAMILY fam_0_id_status_created_payload (id, status, created, payload, created_by_type, created_by_id),
INDEX jobs_job_type_idx (job_type),
FAMILY fam_0_id_status_created_payload (id, status, created, payload, created_by_type, created_by_id, job_type),
FAMILY progress (progress),
FAMILY claim (claim_session_id, claim_instance_id, num_runs, last_run)
);`
Expand Down Expand Up @@ -1401,16 +1403,17 @@ var (
{Name: "claim_instance_id", ID: 9, Type: types.Int, Nullable: true},
{Name: "num_runs", ID: 10, Type: types.Int, Nullable: true},
{Name: "last_run", ID: 11, Type: types.Timestamp, Nullable: true},
{Name: "job_type", ID: 12, Type: types.String, Nullable: true},
},
[]descpb.ColumnFamilyDescriptor{
{
// NB: We are using family name that existed prior to adding created_by_type and
// created_by_id columns. This is done to minimize and simplify migration work
// NB: We are using family name that existed prior to adding created_by_type,
// created_by_id, and job_type columns. This is done to minimize and simplify migration work
// that needed to be done.
Name: "fam_0_id_status_created_payload",
ID: 0,
ColumnNames: []string{"id", "status", "created", "payload", "created_by_type", "created_by_id"},
ColumnIDs: []descpb.ColumnID{1, 2, 3, 4, 6, 7},
ColumnNames: []string{"id", "status", "created", "payload", "created_by_type", "created_by_id", "job_type"},
ColumnIDs: []descpb.ColumnID{1, 2, 3, 4, 6, 7, 12},
},
{
Name: "progress",
Expand Down Expand Up @@ -1462,6 +1465,17 @@ var (
Version: descpb.StrictIndexColumnIDGuaranteesVersion,
Predicate: JobsRunStatsIdxPredicate,
},
descpb.IndexDescriptor{
Name: "jobs_job_type_idx",
ID: 5,
Unique: false,
KeyColumnNames: []string{"job_type"},
KeyColumnDirections: []catpb.IndexColumn_Direction{catpb.IndexColumn_ASC},
KeyColumnIDs: []descpb.ColumnID{12},
KeySuffixColumnIDs: []descpb.ColumnID{1},
Version: descpb.StrictIndexColumnIDGuaranteesVersion,
},
// TODO add an index here?
))

// WebSessions table to authenticate sessions over stateless connections.
Expand Down
Loading

0 comments on commit c5d1f7a

Please sign in to comment.