-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
sql: wait for one version lease if no job created for a descriptor #95620
sql: wait for one version lease if no job created for a descriptor #95620
Conversation
It looks like your PR touches production code but doesn't add or edit any test code. Did you consider adding tests to your PR? 🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf. |
I promise there'll be tests failing like round trips and some tests relevant to DROP.... |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @chengxiong-ruan)
pkg/sql/conn_executor_exec.go
line 1079 at r1 (raw file):
return err } }
I don't think this is where it belongs. I think it belongs around here:
cockroach/pkg/sql/conn_executor.go
Lines 3081 to 3085 in 4d3c36a
if err := ex.server.cfg.JobRegistry.Run( | |
ex.ctxHolder.connCtx, *ex.extraTxnState.jobs, | |
); err != nil { | |
handleErr(err) | |
} |
Code quote:
//If there is any descriptor has new version. We want to make sure there is
//only one version of the descriptor in all nodes. In schema changer jobs,
//`WaitForOneVersion` has been called for the descriptors included in jobs. So
//we just need to do this for descriptors not in jobs.
var descIDsInJobs catalog.DescriptorIDSet
for _, jobID := range *ex.extraTxnState.jobs {
job, err := ex.server.cfg.JobRegistry.LoadJob(ctx, jobID)
if err != nil {
return err
}
payload := job.Payload()
for _, descID := range payload.DescriptorIDs {
descIDsInJobs.Add(descID)
}
}
for _, idVersion := range withNewVersion {
if descIDsInJobs.Contains(idVersion.ID) {
continue
}
if _, err := WaitToUpdateLeases(ctx, ex.planner.LeaseMgr(), idVersion.ID); err != nil {
return err
}
}
7e09c3c
to
3fb41f8
Compare
Previously, ajwerner wrote…
thanks for pointing that out! Moved it there. |
977380b
to
829421d
Compare
I didn't come up with a good way of testing this, but given the amount of test failed and fixed, plus I tried this within the |
829421d
to
bb49416
Compare
pkg/sql/conn_executor.go
Outdated
return nil | ||
} | ||
|
||
func (ex *connExecutor) descIDsInJobs() (catalog.DescriptorIDSet, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know about this -- this is a very expensive thing to do.
I think we can do better. Namely, we buffer jobs we create with schema changes for a while. We can extract the desc IDs from that set and we can also extract a set from the declarative schema changer state if we create one, maybe. I'd like it to be the case that we don't go to the kv-store to decide what to wait on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I agree. There is sometimes other jobs like query statistics. I think 2 options:
(1) How about we only do this if there is any new version descriptors. This would only affect schema changes with jobs.
(2) Cache job records in the extraTxnState
as well in addition to the job ids. In legacy schema changer it seems obvious that how job records are passed around. It's not clear to me how we can plumb the job record out of schema changer yet, but I can take a look.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the declarative schema changer, all you need to know is whether a job was created or not.
bb49416
to
33791b4
Compare
33791b4
to
55cf28f
Compare
55cf28f
to
6760adb
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is close. I want to tighten up the code around waiting. The stuff around jobs code is superficial.
pkg/sql/conn_executor.go
Outdated
j.uniqueJobsToCreate = make(map[descpb.ID]*jobs.Record) | ||
j.nonUniqueJobsToCreate = nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clear the map rather than making a new one. There's a compiler intrinsic, and nobody else has the map.
for id := range j.uniqueJobsToCreate {
delete(j.uniqueJobsToCreate, id)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
pkg/sql/conn_executor.go
Outdated
} | ||
|
||
func (j *jobsInfo) releaseJobsToRecreate() { | ||
j.uniqueJobsToCreate = make(map[descpb.ID]*jobs.Record) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here about clearing the map as opposed to allocating a new one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
pkg/sql/conn_executor.go
Outdated
// Get descriptor IDs from legacy schema changer jobs. | ||
var descIDsInJobs catalog.DescriptorIDSet | ||
if err := ex.extraTxnState.jobsInfo.forEachJobToCreate(func(jobRecord *jobs.Record) error { | ||
for _, descID := range jobRecord.DescriptorIDs { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this cause problems for other types of jobs than schema changes like gc jobs or changefeeds? can we make sure the job has a relevant type where we know there will be waiting? Maybe rename this function to reference schema change jobs, and describe on this function why those schema change jobs are special?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it'd be good to describe in commentary here the semantics of this field for the different types of schema changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
jobs like gc and changefeeds are fine. I renamed it to descIDsInSchemaChangeJobs
as suggested and add more comments. I also move the iteration on DescriptorIDs
into the switch
block so that we only use these ids when it's actually a schema change job.
pkg/sql/conn_executor.go
Outdated
@@ -3687,3 +3759,56 @@ func init() { | |||
return planGistFromCtx(ctx) | |||
}) | |||
} | |||
|
|||
type jobsInfo struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can come up with a better name for this structure. txnJobsCollection
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, done.
pkg/sql/conn_executor.go
Outdated
// queued up for the given ID. The cache remains valid only for the current | ||
// transaction and it is cleared after the transaction is committed. | ||
schemaChangeJobRecords map[descpb.ID]*jobs.Record | ||
jobsInfo *jobsInfo |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like jobs
is a good name for this field
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
pkg/sql/conn_executor.go
Outdated
@@ -3687,3 +3759,56 @@ func init() { | |||
return planGistFromCtx(ctx) | |||
}) | |||
} | |||
|
|||
type jobsInfo struct { | |||
jobsCreated jobsCollection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: name this created
. It's obviously about jobs in this context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
pkg/sql/conn_executor.go
Outdated
|
||
type jobsInfo struct { | ||
jobsCreated jobsCollection | ||
uniqueJobsToCreate map[descpb.ID]*jobs.Record |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: name this uniqueToCreate
and the other one nonUniqueToCreate
and use comments to explain what makes them different.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
pkg/sql/conn_executor.go
Outdated
@@ -3113,6 +3112,79 @@ func (ex *connExecutor) handleWaitingForDescriptorIDGeneratorMigration(ctx conte | |||
return ex.resetTransactionOnSchemaChangeRetry(ctx) | |||
} | |||
|
|||
func (ex *connExecutor) waitOneVersionForNewVersionDescriptorsWithoutJobs( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this file is too big already. What do you think about moving all this job related code to a new file conn_executor_jobs.go
and moving the jobsCollection
and its friends to a file jobs_collection.go
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point. moved them to new files.
pkg/sql/conn_executor.go
Outdated
func (j *jobsInfo) numJobsToCreate() int { | ||
return len(j.uniqueJobsToCreate) + len(j.nonUniqueJobsToCreate) | ||
} | ||
|
||
func (j *jobsInfo) hasJobsToCreate() bool { | ||
return j.numJobsToCreate() > 0 | ||
} | ||
|
||
func (j *jobsInfo) forEachJobToCreate(fn func(jobRecord *jobs.Record) error) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: i think you can remove the word jobs
from all these methods and improve readability. x.jobs.forEachToCreate
, x.jobs.numToCreate
, x.jobs.hasAnyToCreate
? That's just my taste.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
6760adb
to
0eda6bd
Compare
pkg/sql/jobs_collection.go
Outdated
nonUniqueToCreate []*jobs.Record | ||
} | ||
|
||
func newJobsInfo() *txnJobsCollection { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: rename this to match the type
return catalog.DescriptorIDSet{}, err | ||
} | ||
|
||
// Get descriptor IDs with declarative schema changer jobs. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the best way to do this? I think it's fine, but would it be better if we were more explicit about it? For one, I don't know if HasConcurrentSchemaChanges
is what you want. I think it works, but it's brute force and relies on the loop above. Could we instead check on the declarative schema changer state and see if it has a job, and, if it does not, go and collect the descriptor IDs which have been modified?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call. My mistake that I didn't realize HasConcurrentSchemaChanges
also contains checks of legacy mutation jobs on tables. I added a check to see if there is a declarative schema change job. If there is a job, then see which descriptors are affected, but only check descriptors not included in the legacy jobs above.
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" | ||
) | ||
|
||
type txnJobsCollection struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: comment what this is used for
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/sql/internal.go
Outdated
@@ -1007,9 +1004,7 @@ func (ie *InternalExecutor) execInternal( | |||
|
|||
// ReleaseSchemaChangeJobRecords is to release the schema change job records. | |||
func (ie *InternalExecutor) releaseSchemaChangeJobRecords() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kill this function altogether. Look at the one place it was called. That thing can just invoke this function.
pkg/sql/internal.go
Outdated
schemaChangerState *SchemaChangerState | ||
txn *kv.Txn | ||
descCollection *descs.Collection | ||
jobsInfo *txnJobsCollection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: rename this to jobs
pkg/sql/internal.go
Outdated
ie.extraTxnState.jobsInfo.reset() | ||
ie.releaseSchemaChangeJobRecords() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This relates to my above comment. IIUC, the reset
already makes the second function a no-op. This can now just be: defer ie.extraTxnState.jobs.reset()
pkg/sql/planner.go
Outdated
// in sql.connExecutor. sql.connExecutor.createJobs() enqueues jobs with these | ||
// records when transaction is committed. | ||
SchemaChangeJobRecords map[descpb.ID]*jobs.Record | ||
JobsInfo *txnJobsCollection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
call this jobs
. it does not need to be exported.
28ad804
to
53b2e83
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM after addressing the comments
"github.com/cockroachdb/errors" | ||
) | ||
|
||
func (ex *connExecutor) waitOneVersionForNewVersionDescriptorsWithoutJobs( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Give this a comment on why we want to do it / why it matters
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
pkg/sql/jobs_collection.go
Outdated
return nil | ||
} | ||
|
||
func (j *txnJobsCollection) releaseToRecreate() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nothing uses this, remove it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed.
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb" | ||
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also move jobsCollection
to this file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
53b2e83
to
48252e8
Compare
48252e8
to
8731301
Compare
Previously we only wait for one version in jobs for descriptors performed with schema changes. The problem is that we don't always create jobs for all descriptors modified. We need to wait for one version for these descriptors too. This pr adds logic to wait if there is no jobs created for a descriptor has new version. Fixes: cockroachdb#90600 Release note: None
8731301
to
e0e57b5
Compare
TFTR! |
Build succeeded: |
Previously we only wait for one version in jobs for descriptors performed with schema changes. The problem is that we don't always create jobs for all descriptors modified. We need to wait for one version for these descriptors too. This pr adds logic to wait if there is no jobs created for a descriptor has new version.
Fixes: #90600
Release note: None