-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
jobs: introduce job_type column to system.jobs #93144
Conversation
2c95cb9
to
e627b28
Compare
fmt.Println("expected") | ||
|
||
fmt.Println(expected) | ||
fmt.Println("found") | ||
|
||
fmt.Println(found) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: delete these print statements
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 12 of 19 files at r1, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @andyyang890, @dt, and @jayshrivastava)
pkg/clusterversion/cockroach_versions.go
line 349 at r1 (raw file):
// job rows created prior to V23_1AddTypeColumnToJobsTable. V23_1BackfillTypeColumnInJobsTable
Just leaving this as I read the code: I'm not sure why we need to have 2 versions;
can't this be done with alter table/update records?
pkg/jobs/registry.go
line 383 at r1 (raw file):
instanceID := r.ID() var numColumns = 7 columns := make([]string, 7, 8)
it's a bit sad that we have to allocate dynamic slice whereas before we had static array.
You could have
columns := [8]{seven elements are initialized only}
numColums = 7
.. if version Okay {
numColumns = 8
columns[7] = job type
}
```
pkg/jobs/registry.go
line 423 at r1 (raw file):
columns = append(columns, `type`) valueFns[`type`] = func(rec *Record) interface{} { return (&jobspb.Payload{Details: jobspb.WrapPayloadDetails(rec.Details)}).Type().String()
Have you considered having job_type column be non-null, with DEFAULT crdb_internal.payload_type(payload)
? Basically, have this column computed?
I understand there might be some concern that calling this funciton is "more work" (i.e. we'd need to unmarshal proto we just marshaled) -- and maybe it's a real concern -- but I think it probably won't matter much and might make the code simpler/smaller?
@ajwerner , wdyt?
pkg/jobs/registry.go
line 517 at r1 (raw file):
if _, err = j.registry.ex.Exec(ctx, "job-row-insert", txn, ` INSERT INTO system.jobs (id, status, payload, progress, claim_session_id, claim_instance_id, type) VALUES ($1, $2, $3, $4, $5, $6, $7)`, jobID, StatusRunning, payloadBytes, progressBytes, s.ID().UnsafeBytes(), r.ID(), jobType.String(),
wdyt about:
cols = []string{id, status, payload, progress, claim_session_id, claim_instance_id}
vals = []interface{}{jobID, StatusRunnnng, payloadBytes, progressBytes, s.ID().UnsafeBytes(), r.ID()}
placeholders := func() string {
var p strings.Builder
for i := range vals {
if i > 0 {
p.WriteByte(',')
}
p.WriteByte('$')
p.WriteString(strconv.Itoa(i+1))
}
return p.String()
}
if r.settings.Version.IsActive(ctx, clusterversion.V23_1AddTypeColumnToJobsTable) {
cols = append(cols, "type")
vals = append(vals, jobType.String())
}
insertQuery = fmt.Sprintf(`INSERT INTO system.jobs (%s) VALUES (%s)`, strings.Join(cols, ","), placeholders())
// Execute query ...
pkg/sql/sem/builtins/builtins.go
line 4033 at r1 (raw file):
}()...), "crdb_internal.payload_type": makeBuiltin(
nit: perhaps be explicit and name the function job_payload_type()?
pkg/sql/sem/builtins/builtins.go
line 4044 at r1 (raw file):
return nil, pgerror.Wrap(err, pgcode.InvalidParameterValue, "invalid protocol message") } typ := msg.(*jobspb.Payload).Type().String()
you might be better off not using protoreflect package at all (which really exists so that you can create
protocol message of appropriate type, given type name). You don't need it, because you know exactly the type of the message you expect. So, something like this:
var msg jobspb.Payload
if if err := protoutil.Unmarshal(data, &msg); err != nil {
return nil, err...
}
return tree.NewDString(msg.Type().String()), nil
pkg/sql/sem/builtins/builtins.go
line 4051 at r1 (raw file):
{ Info: info, Volatility: volatility,
honestly, I'd just inline info, volatility, and return type here. It's
pretty idiomatic in this file; also, why have variables when these are not
variables?
pkg/upgrade/upgrades/alter_jobs_add_job_type.go
line 25 at r1 (raw file):
Previously, dt (David Taylor) wrote…
drive-by: what would you think about calling it
job_type
since type is a reserved word in some/many places?that's what the revamp RFC proposed here: https://github.com/cockroachdb/cockroach/pull/82638/files#diff-d65357c15cffdb009c1b3fd0f6456265d25b94758f326207c531b8f734b59164R126
Seconded.
pkg/upgrade/upgrades/alter_jobs_add_job_type_test.go
line 93 at r1 (raw file):
record := jobs.Record{ Details: jobspb.JobDetailsForEveryJobType[typ], Progress: jobspb.ImportProgress{},
is this safe to do? Can this job start running (and likely crash)?
Or is it safe to do because the jobs system is not running yet?
pkg/upgrade/upgrades/alter_jobs_add_job_type_test.go
line 154 at r1 (raw file):
} func getDeprecatedJobsTableDescriptor() *descpb.TableDescriptor {
nit: it's not really deprecated, but I see what you mean. Maybe call it getJobsTableDescriptorPrioToV222_....
-- quite a bit of a mouth full, but very descriptive.
Up to you.
7996c25
to
6963965
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @andyyang890, @dt, and @miretskiy)
pkg/clusterversion/cockroach_versions.go
line 349 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Just leaving this as I read the code: I'm not sure why we need to have 2 versions;
can't this be done with alter table/update records?
Sure. We can do everything in one upgrade.
pkg/jobs/registry.go
line 383 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
it's a bit sad that we have to allocate dynamic slice whereas before we had static array.
You could havecolumns := [8]{seven elements are initialized only} numColums = 7 .. if version Okay { numColumns = 8 columns[7] = job type } ```
Done. How do you feel about defaulting to numColumns=8
and subtracting 1 if the version is old? The most common path is the one where the version is above the one with my upgrade. In the most common path, we can avoid the if branch.
pkg/jobs/registry.go
line 423 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Have you considered having job_type column be non-null, with
DEFAULT crdb_internal.payload_type(payload)
? Basically, have this column computed?I understand there might be some concern that calling this funciton is "more work" (i.e. we'd need to unmarshal proto we just marshaled) -- and maybe it's a real concern -- but I think it probably won't matter much and might make the code simpler/smaller?
@ajwerner , wdyt?
I considered both options and thought my way was better. I'm open to changing it though.
In createJobsInBatchWithTxn
, we do (&jobspb.Payload{Details: jobspb.WrapPayloadDetails(rec.Details)}).Type().String()
, which just allocates some nested structs. In CreateJobWithTxn
, we do even less work - the job type is in scope and we insert jobType.String()
directly.
If we use, DEFAULT crdb_internal.payload_type(payload)
, then we would have to decode payload bytes (which we just encoded when inserting the row) every time we insert into the jobs table.
pkg/jobs/registry.go
line 517 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
wdyt about:
cols = []string{id, status, payload, progress, claim_session_id, claim_instance_id} vals = []interface{}{jobID, StatusRunnnng, payloadBytes, progressBytes, s.ID().UnsafeBytes(), r.ID()} placeholders := func() string { var p strings.Builder for i := range vals { if i > 0 { p.WriteByte(',') } p.WriteByte('$') p.WriteString(strconv.Itoa(i+1)) } return p.String() } if r.settings.Version.IsActive(ctx, clusterversion.V23_1AddTypeColumnToJobsTable) { cols = append(cols, "type") vals = append(vals, jobType.String()) } insertQuery = fmt.Sprintf(`INSERT INTO system.jobs (%s) VALUES (%s)`, strings.Join(cols, ","), placeholders()) // Execute query ...
Done. I like this. Again, I went with defaulting to more columns and doing numCols -= 1
in the if condition.
pkg/sql/sem/builtins/builtins.go
line 4044 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
you might be better off not using protoreflect package at all (which really exists so that you can create
protocol message of appropriate type, given type name). You don't need it, because you know exactly the type of the message you expect. So, something like this:var msg jobspb.Payload if if err := protoutil.Unmarshal(data, &msg); err != nil { return nil, err... } return tree.NewDString(msg.Type().String()), nil
Done.
pkg/sql/sem/builtins/builtins.go
line 4051 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
honestly, I'd just inline info, volatility, and return type here. It's
pretty idiomatic in this file; also, why have variables when these are not
variables?
Done.
pkg/upgrade/upgrades/alter_jobs_add_job_type.go
line 55 at r1 (raw file):
Previously, andyyang890 (Andy Yang) wrote…
You could put these into a for loop so that you only need to write the code calling
migrateTable
once.
Done.
pkg/upgrade/upgrades/alter_jobs_add_job_type.go
line 70 at r1 (raw file):
Previously, andyyang890 (Andy Yang) wrote…
ie := d.InternalExecutorFactory.MakeInternalExecutorWithoutTxn() _, err := ie.Exec(ctx, "backfill-jobs-type-column", nil /* txn */, backfillTypeColumnStmt, username.RootUser)
Done.
pkg/upgrade/upgrades/alter_jobs_add_job_type_test.go
line 93 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
is this safe to do? Can this job start running (and likely crash)?
Or is it safe to do because the jobs system is not running yet?
It doesn't run. CreateJobWithTxn
will not start the job. That's probably why this code doesn't cause issues. I'm okay with leaving this as is.
3a730b5
to
7a71887
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @andyyang890, @dt, and @miretskiy)
pkg/upgrade/upgrades/alter_jobs_add_job_type.go
line 65 at r2 (raw file):
ie := d.InternalExecutorFactory.MakeInternalExecutorWithoutTxn() _, err := ie.Exec(ctx, "backfill-jobs-type-column", nil /* txn */, backfillTypeColumnStmt, username.RootUser)
@andyyang890 Brought up a good point about putting both the backfill and schema change in the same migration. What happens in between the backfill ending and setting new cluster version? If we allow writes to the jobs table during this time (which I'm not sure we do), something could insert a job with a NULL
job_type
which would never be backfilled. Using two separate upgrades avoids this. cc @ajwerner @miretskiy
7a71887
to
0bed397
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 15 files at r2.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @andyyang890, @dt, and @jayshrivastava)
pkg/jobs/registry.go
line 383 at r1 (raw file):
Previously, jayshrivastava (Jayant Shrivastava) wrote…
Done. How do you feel about defaulting to
numColumns=8
and subtracting 1 if the version is old? The most common path is the one where the version is above the one with my upgrade. In the most common path, we can avoid the if branch.
fine.
pkg/jobs/registry.go
line 382 at r3 (raw file):
) (string, []interface{}, []jobspb.JobID, error) { instanceID := r.ID() numColumns := 8
perhaps init columns, and then set numColumns := len(columns)
pkg/jobs/registry.go
line 418 at r3 (raw file):
// To run the upgrade below migration and schema change jobs will need // to be created using the old schema of the jobs table.
perhaps be specify and say ... "old schema, which does not have job_type column, ..."
pkg/jobs/registry.go
line 499 at r3 (raw file):
} numCols := 7
dito -- set numCols := len(cols)
pkg/upgrade/upgrades/alter_jobs_add_job_type.go
line 65 at r2 (raw file):
If we allow writes to the jobs table during this time (which I'm not sure we do), something could insert a job with a NULL job_type which would never be backfilled. Using two separate upgrades avoids this
hmmm... I don't think migration precludes another node from creating new jobs; Perhaps the code should assume that job_type can be null -- which, tbh, is not a big deal, I think. Possible impact to some of the "show" jobs.
@ajwerner what say you?
pkg/upgrade/upgrades/alter_jobs_add_job_type_test.go
line 93 at r1 (raw file):
Previously, jayshrivastava (Jayant Shrivastava) wrote…
It doesn't run.
CreateJobWithTxn
will not start the job. That's probably why this code doesn't cause issues. I'm okay with leaving this as is.
Right, I understand CreateJobWithTxn does not start the job -- but that does not mean
that job registry is not running, and would not pick up this job immediately upon creation --
which will make this test flaky as it might crash the process.
You almost certainly want to get access to this node jobs registry, and set TestingResumerCreationKnobs
for all job types to be your own resumer (that does nothing, or close to nothing).
0bed397
to
c5d1f7a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @andyyang890, @dt, and @miretskiy)
pkg/upgrade/upgrades/alter_jobs_add_job_type_test.go
line 93 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Right, I understand CreateJobWithTxn does not start the job -- but that does not mean
that job registry is not running, and would not pick up this job immediately upon creation --
which will make this test flaky as it might crash the process.You almost certainly want to get access to this node jobs registry, and set TestingResumerCreationKnobs
for all job types to be your own resumer (that does nothing, or close to nothing).
Done.
0ab1740
to
39e38d9
Compare
1837c54
to
7a30681
Compare
bors r+ |
Build failed (retrying...): |
Build failed (retrying...): |
Build failed (retrying...): |
93144: jobs: introduce job_type column to system.jobs r=jayshrivastava a=jayshrivastava This change introduces an upgrade to add the `job_type` column to `system.jobs` as a nullable string. This upgrade also creates an index for the column and backfills it. With this change, new jobs being inserted into the jobs table by the jobs registry will populate the type column. To make migrations easier, this change adds a new builtin, `crdb_internal.job_payload_type`, which can be used to determine the type of a jobs payload. Release note: None Epic: https://cockroachlabs.atlassian.net/browse/CRDB-19709 Informs: #92261 Co-authored-by: Jayant Shrivastava <[email protected]>
Build failed: |
bors r- |
c574a76
to
c91672c
Compare
bors r+ |
This PR was included in a batch that was canceled, it will be automatically retried |
Build failed (retrying...): |
Build failed (retrying...): |
Build failed (retrying...): |
bors r- Breaking CI |
Canceled. |
This change introduces an upgrade to add the `job_type` column to `system.jobs` as a nullable string. This upgrade also creates an index for the column and backfills it. With this change, new jobs being inserted into the jobs table by the jobs registry will populate the `job_type` column. To make migrations easier, this change adds a new builtin, `crdb_internal.job_payload_type`, which can be used to determine the type of a jobs payload. Release note: None
c91672c
to
819d0aa
Compare
bors r+ |
Build succeeded: |
This change introduces an upgrade to add the
job_type
column tosystem.jobs
as a nullable string. This upgrade also creates anindex for the column and backfills it.
With this change, new jobs being inserted into the jobs table by
the jobs registry will populate the type column.
To make migrations easier, this change adds a new builtin,
crdb_internal.job_payload_type
, which can be used to determinethe type of a jobs payload.
Release note: None
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-19709
Informs: #92261