Skip to content

Commit

Permalink
sql: wait for one version lease if no job created for a descriptor
Browse files Browse the repository at this point in the history
Previously we only wait for one version in jobs for descriptors
performed with schema changes. The problem is that we don't always
create jobs for all descriptors modified. We need to wait for one
version for these descriptors too. This pr adds logic to wait if
there is no jobs created for a descriptor has new version.

Fixes: #90600

Release note: None
  • Loading branch information
chengxiong-ruan committed Jan 25, 2023
1 parent b84f0eb commit 0eda6bd
Show file tree
Hide file tree
Showing 18 changed files with 293 additions and 139 deletions.
12 changes: 6 additions & 6 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2324,7 +2324,7 @@ func TestJobInTxn(t *testing.T) {

// Accessed atomically.
var hasRun int32
var job *jobs.Job
var jobID jobspb.JobID

defer sql.ClearPlanHooks()
// Piggy back on BACKUP to be able to create a succeeding test job.
Expand All @@ -2338,7 +2338,7 @@ func TestJobInTxn(t *testing.T) {
}
fn := func(ctx context.Context, _ []sql.PlanNode, _ chan<- tree.Datums) error {
var err error
job, err = execCtx.ExtendedEvalContext().QueueJob(ctx, execCtx.InternalSQLTxn(), jobs.Record{
jobID = execCtx.ExtendedEvalContext().QueueJob(&jobs.Record{
Description: st.String(),
Details: jobspb.BackupDetails{},
Progress: jobspb.BackupProgress{},
Expand Down Expand Up @@ -2378,7 +2378,7 @@ func TestJobInTxn(t *testing.T) {
}
fn := func(ctx context.Context, _ []sql.PlanNode, _ chan<- tree.Datums) error {
var err error
job, err = execCtx.ExtendedEvalContext().QueueJob(ctx, execCtx.InternalSQLTxn(), jobs.Record{
jobID = execCtx.ExtendedEvalContext().QueueJob(&jobs.Record{
Description: "RESTORE",
Details: jobspb.RestoreDetails{},
Progress: jobspb.RestoreProgress{},
Expand Down Expand Up @@ -2413,14 +2413,14 @@ func TestJobInTxn(t *testing.T) {
// If we rollback then the job should not run
require.NoError(t, txn.Rollback())
registry := s.JobRegistry().(*jobs.Registry)
_, err = registry.LoadJob(ctx, job.ID())
_, err = registry.LoadJob(ctx, jobID)
require.Error(t, err, "the job should not exist after the txn is rolled back")
require.True(t, jobs.HasJobNotFoundError(err))

sqlRunner := sqlutils.MakeSQLRunner(sqlDB)
// Just in case the job was scheduled let's wait for it to finish
// to avoid a race.
sqlRunner.Exec(t, "SHOW JOB WHEN COMPLETE $1", job.ID())
sqlRunner.Exec(t, "SHOW JOB WHEN COMPLETE $1", jobID)
require.Equal(t, int32(0), atomic.LoadInt32(&hasRun),
"job has run in transaction before txn commit")
require.True(t, timeutil.Since(start) < jobs.DefaultAdoptInterval, "job should have been adopted immediately")
Expand All @@ -2437,7 +2437,7 @@ func TestJobInTxn(t *testing.T) {
// Committing will block and wait for all jobs to run.
require.NoError(t, txn.Commit())
registry := s.JobRegistry().(*jobs.Registry)
j, err := registry.LoadJob(ctx, job.ID())
j, err := registry.LoadJob(ctx, jobID)
require.NoError(t, err, "queued job not found")
require.NotEqual(t, int32(0), atomic.LoadInt32(&hasRun),
"job scheduled in transaction did not run")
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ go_library(
"completions.go",
"conn_executor.go",
"conn_executor_exec.go",
"conn_executor_jobs.go",
"conn_executor_prepare.go",
"conn_executor_savepoints.go",
"conn_executor_show_commit_timestamp.go",
Expand Down Expand Up @@ -134,6 +135,7 @@ go_library(
"inverted_join.go",
"job_exec_context.go",
"job_exec_context_test_util.go",
"jobs_collection.go",
"join.go",
"join_predicate.go",
"join_token.go",
Expand Down
55 changes: 26 additions & 29 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"time"
"unicode/utf8"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/multitenant"
Expand Down Expand Up @@ -1018,9 +1017,8 @@ func (s *Server) newConnExecutor(
ex.extraTxnState.descCollection = s.cfg.CollectionFactory.NewCollection(
ctx, descs.WithDescriptorSessionDataProvider(dsdp), descs.WithMonitor(ex.sessionMon),
)
ex.extraTxnState.jobs = new(jobsCollection)
ex.extraTxnState.jobs = newJobsInfo()
ex.extraTxnState.txnRewindPos = -1
ex.extraTxnState.schemaChangeJobRecords = make(map[descpb.ID]*jobs.Record)
ex.extraTxnState.schemaChangerState = &SchemaChangerState{
mode: ex.sessionData().NewSchemaChangerMode,
}
Expand Down Expand Up @@ -1275,18 +1273,7 @@ type connExecutor struct {
// descCollection collects descriptors used by the current transaction.
descCollection *descs.Collection

// jobs accumulates jobs staged for execution inside the transaction.
// Staging happens when executing statements that are implemented with a
// job. The jobs are staged via the function QueueJob in
// pkg/sql/planner.go. The staged jobs are executed once the transaction
// that staged them commits.
jobs *jobsCollection

// schemaChangeJobRecords is a map of descriptor IDs to job Records.
// Used in createOrUpdateSchemaChangeJob so we can check if a job has been
// queued up for the given ID. The cache remains valid only for the current
// transaction and it is cleared after the transaction is committed.
schemaChangeJobRecords map[descpb.ID]*jobs.Record
jobs *txnJobsCollection

// firstStmtExecuted indicates that the first statement inside this
// transaction has been executed.
Expand Down Expand Up @@ -1714,9 +1701,6 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) {
}
} else {
ex.extraTxnState.descCollection.ReleaseAll(ctx)
for k := range ex.extraTxnState.schemaChangeJobRecords {
delete(ex.extraTxnState.schemaChangeJobRecords, k)
}
ex.extraTxnState.jobs.reset()
ex.extraTxnState.schemaChangerState = &SchemaChangerState{
mode: ex.sessionData().NewSchemaChangerMode,
Expand Down Expand Up @@ -2824,15 +2808,14 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo
DescIDGenerator: ex.getDescIDGenerator(),
RangeStatsFetcher: p.execCfg.RangeStatsFetcher,
},
Tracing: &ex.sessionTracing,
MemMetrics: &ex.memMetrics,
Descs: ex.extraTxnState.descCollection,
TxnModesSetter: ex,
Jobs: ex.extraTxnState.jobs,
SchemaChangeJobRecords: ex.extraTxnState.schemaChangeJobRecords,
statsProvider: ex.server.sqlStats,
indexUsageStats: ex.indexUsageStats,
statementPreparer: ex,
Tracing: &ex.sessionTracing,
MemMetrics: &ex.memMetrics,
Descs: ex.extraTxnState.descCollection,
TxnModesSetter: ex,
JobsInfo: ex.extraTxnState.jobs,
statsProvider: ex.server.sqlStats,
indexUsageStats: ex.indexUsageStats,
statementPreparer: ex,
}
evalCtx.copyFromExecCfg(ex.server.cfg)
}
Expand Down Expand Up @@ -3077,13 +3060,27 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
}
ex.notifyStatsRefresherOfNewTables(ex.Ctx())

// If there is any descriptor has new version. We want to make sure there is
// only one version of the descriptor in all nodes. In schema changer jobs,
// `WaitForOneVersion` has been called for the descriptors included in jobs.
// So we just need to do this for descriptors not in jobs.
// We need to get descriptor IDs in jobs before jobs are run because we have
// operations in declarative schema changer removing descriptor IDs from job
// payload as it's done with the descriptors.
descIDsInJobs, err := ex.descIDsInSchemaChangeJobs()
if err != nil {
return advanceInfo{}, err
}
ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.SessionStartPostCommitJob, timeutil.Now())
if err := ex.server.cfg.JobRegistry.Run(
ex.ctxHolder.connCtx, *ex.extraTxnState.jobs,
ex.ctxHolder.connCtx, ex.extraTxnState.jobs.created,
); err != nil {
handleErr(err)
}
ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.SessionEndPostCommitJob, timeutil.Now())
if err := ex.waitOneVersionForNewVersionDescriptorsWithoutJobs(descIDsInJobs); err != nil {
return advanceInfo{}, err
}

fallthrough
case txnRestart, txnRollback:
Expand Down Expand Up @@ -3395,7 +3392,7 @@ func (ex *connExecutor) runPreCommitStages(ctx context.Context) error {
scs.state = after
scs.jobID = jobID
if jobID != jobspb.InvalidJobID {
ex.extraTxnState.jobs.add(jobID)
ex.extraTxnState.jobs.addCreatedJobID(jobID)
log.Infof(ctx, "queued new schema change job %d using the new schema changer", jobID)
}
return nil
Expand Down
11 changes: 7 additions & 4 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,20 +1059,23 @@ func (ex *connExecutor) commitSQLTransactionInternal(ctx context.Context) error
// createJobs creates jobs for the records cached in schemaChangeJobRecords
// during this transaction.
func (ex *connExecutor) createJobs(ctx context.Context) error {
if len(ex.extraTxnState.schemaChangeJobRecords) == 0 {
if !ex.extraTxnState.jobs.hasAnyToCreate() {
return nil
}
var records []*jobs.Record
for _, record := range ex.extraTxnState.schemaChangeJobRecords {
records = append(records, record)
if err := ex.extraTxnState.jobs.forEachToCreate(func(jobRecord *jobs.Record) error {
records = append(records, jobRecord)
return nil
}); err != nil {
return err
}
jobIDs, err := ex.server.cfg.JobRegistry.CreateJobsWithTxn(
ctx, ex.planner.InternalSQLTxn(), records,
)
if err != nil {
return err
}
ex.planner.extendedEvalCtx.Jobs.add(jobIDs...)
ex.planner.extendedEvalCtx.JobsInfo.addCreatedJobID(jobIDs...)
return nil
}

Expand Down
109 changes: 109 additions & 0 deletions pkg/sql/conn_executor_jobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sql

import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/errors"
)

func (ex *connExecutor) waitOneVersionForNewVersionDescriptorsWithoutJobs(
descIDsInJobs catalog.DescriptorIDSet,
) error {
withNewVersion, err := ex.extraTxnState.descCollection.GetOriginalPreviousIDVersionsForUncommitted()
if err != nil {
return err
}
for _, idVersion := range withNewVersion {
if descIDsInJobs.Contains(idVersion.ID) {
continue
}
if _, err := WaitToUpdateLeases(ex.Ctx(), ex.planner.LeaseMgr(), idVersion.ID); err != nil {
// In most cases (normal schema changes), deleted descriptor should have
// been handled by jobs. So, normally we won't hit into the situation of
// wait for one version of a deleted descriptor. However, we need catch
// ErrDescriptorNotFound here because we have a special case of descriptor
// repairing where we delete descriptors directly and never record the ids
// in jobs payload or details.
if errors.Is(err, catalog.ErrDescriptorNotFound) {
continue
}
return err
}
}
return nil
}

// descIDsInSchemaChangeJobs returns all descriptor IDs with which schema change
// jobs in this transaction will perform. Within schema change jobs, we also
// wait until the whole cluster only has leases on the latest version of these
// descriptors, and we would like to also "wait for one version" for descriptors
// with new versions but not included in any schema change jobs.
func (ex *connExecutor) descIDsInSchemaChangeJobs() (catalog.DescriptorIDSet, error) {
// Get descriptor IDs from legacy schema changer jobs.
var descIDsInJobs catalog.DescriptorIDSet
if err := ex.extraTxnState.jobs.forEachToCreate(func(jobRecord *jobs.Record) error {
switch t := jobRecord.Details.(type) {
case *jobspb.Payload_SchemaChange:
// In most cases, the field DescriptorIDs contains descriptor IDs the
// schema change directly affects. Like it could be a table ID if an index
// is created, or it could be a list of schema IDs when dropping a group
// of schemas.
// But it can be confusing sometimes in two types of schema change jobs:
// (1) dropping a database:
// In this scenario, the DescriptorIDs is the ids of tables in this
// database that will be dropped together. And the DroppedDatabaseID field
// is the actual ID of the database that will be dropped.
// (2) any other changes on a database (but not drop):
// For example, when renaming a schema, database's list of all schemas
// need to be updated, and we create a job for this kind of database
// changes. DescriptorIDs is empty in this case and the DescID field in
// the job
// detail is the actual database ID.
for _, descID := range jobRecord.DescriptorIDs {
descIDsInJobs.Add(descID)
}
for _, tbl := range t.SchemaChange.DroppedTables {
descIDsInJobs.Add(tbl.ID)
}
for _, id := range t.SchemaChange.DroppedTypes {
descIDsInJobs.Add(id)
}
for _, id := range t.SchemaChange.DroppedSchemas {
descIDsInJobs.Add(id)
}
descIDsInJobs.Add(t.SchemaChange.DroppedDatabaseID)
descIDsInJobs.Add(t.SchemaChange.DescID)
}
return nil
}); err != nil {
return catalog.DescriptorIDSet{}, err
}

// Get descriptor IDs with declarative schema changer jobs.
withNewVersion, err := ex.extraTxnState.descCollection.GetOriginalPreviousIDVersionsForUncommitted()
if err != nil {
return catalog.DescriptorIDSet{}, err
}
for _, idVersion := range withNewVersion {
desc, err := ex.extraTxnState.descCollection.ByID(ex.state.mu.txn).Get().Desc(ex.Ctx(), idVersion.ID)
if err != nil {
return catalog.DescriptorIDSet{}, err
}
if desc.HasConcurrentSchemaChanges() {
continue
}
descIDsInJobs.Add(idVersion.ID)
}
return descIDsInJobs, nil
}
16 changes: 11 additions & 5 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1111,14 +1111,17 @@ func makeJobsTableRows(
}
defer cleanup(ctx)

sessionJobs := make([]*jobs.Record, 0, len(p.extendedEvalCtx.SchemaChangeJobRecords))
sessionJobs := make([]*jobs.Record, 0, p.extendedEvalCtx.JobsInfo.numToCreate())
uniqueJobs := make(map[*jobs.Record]struct{})
for _, job := range p.extendedEvalCtx.SchemaChangeJobRecords {
if err := p.extendedEvalCtx.JobsInfo.forEachToCreate(func(job *jobs.Record) error {
if _, ok := uniqueJobs[job]; ok {
continue
return nil
}
sessionJobs = append(sessionJobs, job)
uniqueJobs[job] = struct{}{}
return nil
}); err != nil {
return matched, err
}

// Loop while we need to skip a row.
Expand Down Expand Up @@ -5300,17 +5303,20 @@ func collectMarshaledJobMetadataMap(
if err := it.Close(); err != nil {
return nil, err
}
for _, record := range p.ExtendedEvalContext().SchemaChangeJobRecords {
if err := p.ExtendedEvalContext().JobsInfo.forEachToCreate(func(record *jobs.Record) error {
progressBytes, payloadBytes, err := getPayloadAndProgressFromJobsRecord(p, record)
if err != nil {
return nil, err
return err
}
mj := marshaledJobMetadata{
status: tree.NewDString(string(record.RunningStatus)),
payloadBytes: payloadBytes,
progressBytes: progressBytes,
}
m[record.JobID] = mj
return nil
}); err != nil {
return nil, err
}
return m, nil
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@ func (p *planner) renameDatabase(
func (p *planner) writeNonDropDatabaseChange(
ctx context.Context, desc *dbdesc.Mutable, jobDesc string,
) error {
if err := p.createNonDropDatabaseChangeJob(ctx, desc.ID, jobDesc); err != nil {
return err
}
p.createNonDropDatabaseChangeJob(ctx, desc.ID, jobDesc)
b := p.Txn().NewBatch()
if err := p.writeDatabaseChangeToBatch(ctx, desc, b); err != nil {
return err
Expand Down
6 changes: 2 additions & 4 deletions pkg/sql/drop_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,16 +173,14 @@ func (n *dropDatabaseNode) startExec(params runParams) error {
}
}

if err := p.createDropDatabaseJob(
p.createDropDatabaseJob(
ctx,
n.dbDesc.GetID(),
schemasIDsToDelete,
n.d.getDroppedTableDetails(),
n.d.typesToDelete,
tree.AsStringWithFQNames(n.n, params.Ann()),
); err != nil {
return err
}
)

n.dbDesc.SetDropped()
b := p.txn.NewBatch()
Expand Down
Loading

0 comments on commit 0eda6bd

Please sign in to comment.