92378: opt: add rule to replace outer cols with equivalent non-outer cols r=DrewKimball a=DrewKimball

This commit adds three decorrelation rules, `TryRemapJoinOuterColsRight`, `TryRemapJoinOuterColsLeft`, and `TryRemapSelectOuterCols`. These rules match joins and selects that have a correlated input and an equality filter between an outer column and a non-outer column produced by that input. Upon matching, the rules traverse the input as far as the equality filter could validly be pushed by the normal push-down rules, replacing any references to the outer column they encounter with the equivalent non-outer column. This approach avoids interactions with rules like `TryDecorrelateSelect`, which attempt to pull filters *up* the operator tree when correlation is present.
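As a rough illustration of the remapping step, here is a toy sketch in Go over a simplified expression tree; the `ColID`/`Expr` types are invented for the example, and the real rules operate on memo expressions in `pkg/sql/opt` and first verify how far the filter could legally be pushed down:

```go
package main

import "fmt"

// ColID identifies a column; Expr is a toy stand-in for a scalar
// expression tree (the real optimizer works on memo expressions).
type ColID int

type Expr struct {
	Op       string // "Var", "Gt", "Const", ...
	Col      ColID  // set when Op == "Var"
	Children []*Expr
}

// remapOuterCol rewrites references to outerCol into eqCol, which is
// valid wherever an equality filter outerCol = eqCol holds.
func remapOuterCol(e *Expr, outerCol, eqCol ColID) {
	if e == nil {
		return
	}
	if e.Op == "Var" && e.Col == outerCol {
		e.Col = eqCol
	}
	for _, c := range e.Children {
		remapOuterCol(c, outerCol, eqCol)
	}
}

func main() {
	// Filter "outer.a > 5", where column 1 is the outer column and
	// column 2 is the non-outer column equated with it.
	filter := &Expr{Op: "Gt", Children: []*Expr{
		{Op: "Var", Col: 1},
		{Op: "Const"},
	}}
	remapOuterCol(filter, 1, 2)
	fmt.Println(filter.Children[0].Col) // prints 2: the reference is no longer outer
}
```

Once every reference is remapped this way, the expression no longer mentions the outer column, so it is decorrelated without ever pulling the filter up the tree.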

Fixes #88885

Release note: None

95620: sql: wait for one version lease if no job created for a descriptor r=chengxiong-ruan a=chengxiong-ruan

Previously, we only waited for one version, via schema change jobs, for descriptors modified by schema changes. The problem is that we don't always create jobs for all modified descriptors, and we need to wait for one version for those descriptors too. This PR adds logic to wait when no job was created for a descriptor that has a new version.
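The shape of the fix, as a minimal self-contained sketch (names and types here are invented for illustration; the real logic is in `waitOneVersionForNewVersionDescriptorsWithoutJobs`, added in this change):

```go
package main

import "fmt"

type DescID int

// waitForOneVersion stands in for the lease manager's WaitToUpdateLeases.
func waitForOneVersion(id DescID) {
	fmt.Printf("waiting for one version of descriptor %d\n", id)
}

func main() {
	// Descriptors that gained a new version in this transaction, and the
	// subset that schema change jobs will already wait on.
	newVersions := []DescID{10, 11, 12}
	coveredByJobs := map[DescID]bool{10: true}

	for _, id := range newVersions {
		if coveredByJobs[id] {
			continue // the job's own WaitForOneVersion covers this one
		}
		waitForOneVersion(id) // descriptors with no job are waited on here
	}
}
```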

Fixes: #90600

Release note: None

95800: instancestorage: read instances in a transaction r=ajwerner a=healthy-pod

This code change adds `(instancestorage.Reader).GetAllInstancesUsingTxn` to read all instances using the given `*kv.Txn`.
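A hedged usage sketch; the signature and return type below are assumed from the commit message rather than verified against the package:

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage"
)

// readInstancesInTxn reads all SQL instances at the snapshot of the given
// transaction, so the result is consistent with the txn's other reads.
func readInstancesInTxn(
	ctx context.Context, db *kv.DB, reader *instancestorage.Reader,
) error {
	return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
		instances, err := reader.GetAllInstancesUsingTxn(ctx, txn)
		if err != nil {
			return err
		}
		_ = instances // e.g. plan DistSQL flows against live instances
		return nil
	})
}
```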

Release note: None
Epic: CRDB-18735

96004: sql/schemachanger: limit which op/dep rule helpers are shared r=fqazi a=fqazi

Previously, the rules helpers were generally shared between releases inside the declarative schema changer. This could be problematic, since some helpers may get modified as new elements get added between releases. To address this, this patch moves most helpers back into individual rules, leaving a small subset as shared.

Additionally, this patch version-gates `TypeFilter` so that only elements for a given release are visible. When comparing 22.2 against the compatibility release, the majority of the remaining differences are tied to sorting changes and some constraint-related adjustments (element renames and minor rule adjustments for ops).
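A loose sketch of what version-gating an element filter can look like; every name here is hypothetical, standing in for the real `TypeFilter` in the declarative schema changer's rules package:

```go
package example

// Version is a hypothetical stand-in for a cluster release version.
type Version int

// Element pairs a schema element name with the release that introduced it.
type Element struct {
	Name    string
	AddedIn Version
}

// typeFilter keeps only the elements visible in the given release, so rules
// built for an older release never observe newer elements.
func typeFilter(active Version, elems []Element) []Element {
	var out []Element
	for _, e := range elems {
		if e.AddedIn <= active {
			out = append(out, e)
		}
	}
	return out
}
```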

Epic: none
Release note: None

96021: storage: Make logging event listener async for DiskSlow r=sumeerbhola a=itsbilal

The Pebble logger could block if we're experiencing a slow or stalling disk. If the call to the Pebble logger is synchronous from the EventListener passed into Pebble, it could end up slowing down Pebble's internal disk health checks, as those rely on EventListener methods being quick to run.

This change updates the logging event listener to asynchronously call the logger on a DiskSlow event.
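A minimal sketch of the pattern using Pebble's public `EventListener` hook; the actual wrapper in this change lives in the storage package and handles context and log redaction more carefully:

```go
package example

import (
	"log"

	"github.com/cockroachdb/pebble"
)

// asyncDiskSlowListener returns an EventListener whose DiskSlow hook hands
// the event to a background goroutine, so the callback never blocks
// Pebble's disk health checks even when the logger is stuck on a bad disk.
func asyncDiskSlowListener() pebble.EventListener {
	events := make(chan pebble.DiskSlowInfo, 16)
	go func() {
		for info := range events {
			log.Printf("disk slowness detected: %+v", info)
		}
	}()
	return pebble.EventListener{
		DiskSlow: func(info pebble.DiskSlowInfo) {
			select {
			case events <- info:
			default: // drop rather than block if the log goroutine lags
			}
		},
	}
}
```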

Related to #94373.

Epic: none

Release note: None.

Co-authored-by: Drew Kimball <[email protected]>
Co-authored-by: Chengxiong Ruan <[email protected]>
Co-authored-by: healthy-pod <[email protected]>
Co-authored-by: Faizan Qazi <[email protected]>
Co-authored-by: Bilal Akhtar <[email protected]>
6 people committed Jan 27, 2023
6 parents af4e192 + eb935d9 + e0e57b5 + b88b17c + 61305b5 + 922238d commit efb4481
Showing 73 changed files with 2,953 additions and 1,932 deletions.
2 changes: 0 additions & 2 deletions pkg/bench/rttanalysis/orm_queries_bench_test.go
@@ -361,8 +361,6 @@ LEFT JOIN LATERAL
) columns ON true;`,
},

- // Once https://github.com/cockroachdb/cockroach/issues/88885 is resolved,
- // the previous test case should be identical to this one.
{
Name: "hasura column descriptions modified",
Setup: "CREATE TABLE t(a INT PRIMARY KEY)",
12 changes: 6 additions & 6 deletions pkg/jobs/jobs_test.go
@@ -2324,7 +2324,7 @@ func TestJobInTxn(t *testing.T) {

// Accessed atomically.
var hasRun int32
- var job *jobs.Job
+ var jobID jobspb.JobID

defer sql.ClearPlanHooks()
// Piggy back on BACKUP to be able to create a succeeding test job.
@@ -2338,7 +2338,7 @@ func TestJobInTxn(t *testing.T) {
}
fn := func(ctx context.Context, _ []sql.PlanNode, _ chan<- tree.Datums) error {
var err error
- job, err = execCtx.ExtendedEvalContext().QueueJob(ctx, execCtx.InternalSQLTxn(), jobs.Record{
+ jobID = execCtx.ExtendedEvalContext().QueueJob(&jobs.Record{
Description: st.String(),
Details: jobspb.BackupDetails{},
Progress: jobspb.BackupProgress{},
@@ -2378,7 +2378,7 @@ func TestJobInTxn(t *testing.T) {
}
fn := func(ctx context.Context, _ []sql.PlanNode, _ chan<- tree.Datums) error {
var err error
- job, err = execCtx.ExtendedEvalContext().QueueJob(ctx, execCtx.InternalSQLTxn(), jobs.Record{
+ jobID = execCtx.ExtendedEvalContext().QueueJob(&jobs.Record{
Description: "RESTORE",
Details: jobspb.RestoreDetails{},
Progress: jobspb.RestoreProgress{},
@@ -2413,14 +2413,14 @@ func TestJobInTxn(t *testing.T) {
// If we rollback then the job should not run
require.NoError(t, txn.Rollback())
registry := s.JobRegistry().(*jobs.Registry)
- _, err = registry.LoadJob(ctx, job.ID())
+ _, err = registry.LoadJob(ctx, jobID)
require.Error(t, err, "the job should not exist after the txn is rolled back")
require.True(t, jobs.HasJobNotFoundError(err))

sqlRunner := sqlutils.MakeSQLRunner(sqlDB)
// Just in case the job was scheduled let's wait for it to finish
// to avoid a race.
- sqlRunner.Exec(t, "SHOW JOB WHEN COMPLETE $1", job.ID())
+ sqlRunner.Exec(t, "SHOW JOB WHEN COMPLETE $1", jobID)
require.Equal(t, int32(0), atomic.LoadInt32(&hasRun),
"job has run in transaction before txn commit")
require.True(t, timeutil.Since(start) < jobs.DefaultAdoptInterval, "job should have been adopted immediately")
@@ -2437,7 +2437,7 @@ func TestJobInTxn(t *testing.T) {
// Committing will block and wait for all jobs to run.
require.NoError(t, txn.Commit())
registry := s.JobRegistry().(*jobs.Registry)
- j, err := registry.LoadJob(ctx, job.ID())
+ j, err := registry.LoadJob(ctx, jobID)
require.NoError(t, err, "queued job not found")
require.NotEqual(t, int32(0), atomic.LoadInt32(&hasRun),
"job scheduled in transaction did not run")
2 changes: 2 additions & 0 deletions pkg/sql/BUILD.bazel
@@ -44,6 +44,7 @@ go_library(
"completions.go",
"conn_executor.go",
"conn_executor_exec.go",
"conn_executor_jobs.go",
"conn_executor_prepare.go",
"conn_executor_savepoints.go",
"conn_executor_show_commit_timestamp.go",
@@ -134,6 +135,7 @@ go_library(
"inverted_join.go",
"job_exec_context.go",
"job_exec_context_test_util.go",
"jobs_collection.go",
"join.go",
"join_predicate.go",
"join_token.go",
55 changes: 26 additions & 29 deletions pkg/sql/conn_executor.go
@@ -21,7 +21,6 @@ import (
"time"
"unicode/utf8"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/multitenant"
@@ -1018,9 +1017,8 @@ func (s *Server) newConnExecutor(
ex.extraTxnState.descCollection = s.cfg.CollectionFactory.NewCollection(
ctx, descs.WithDescriptorSessionDataProvider(dsdp), descs.WithMonitor(ex.sessionMon),
)
- ex.extraTxnState.jobs = new(jobsCollection)
+ ex.extraTxnState.jobs = newTxnJobsCollection()
ex.extraTxnState.txnRewindPos = -1
- ex.extraTxnState.schemaChangeJobRecords = make(map[descpb.ID]*jobs.Record)
ex.extraTxnState.schemaChangerState = &SchemaChangerState{
mode: ex.sessionData().NewSchemaChangerMode,
}
@@ -1275,18 +1273,7 @@ type connExecutor struct {
// descCollection collects descriptors used by the current transaction.
descCollection *descs.Collection

- // jobs accumulates jobs staged for execution inside the transaction.
- // Staging happens when executing statements that are implemented with a
- // job. The jobs are staged via the function QueueJob in
- // pkg/sql/planner.go. The staged jobs are executed once the transaction
- // that staged them commits.
- jobs *jobsCollection
-
- // schemaChangeJobRecords is a map of descriptor IDs to job Records.
- // Used in createOrUpdateSchemaChangeJob so we can check if a job has been
- // queued up for the given ID. The cache remains valid only for the current
- // transaction and it is cleared after the transaction is committed.
- schemaChangeJobRecords map[descpb.ID]*jobs.Record
+ jobs *txnJobsCollection

// firstStmtExecuted indicates that the first statement inside this
// transaction has been executed.
@@ -1714,9 +1701,6 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) {
}
} else {
ex.extraTxnState.descCollection.ReleaseAll(ctx)
- for k := range ex.extraTxnState.schemaChangeJobRecords {
- delete(ex.extraTxnState.schemaChangeJobRecords, k)
- }
ex.extraTxnState.jobs.reset()
ex.extraTxnState.schemaChangerState = &SchemaChangerState{
mode: ex.sessionData().NewSchemaChangerMode,
@@ -2824,15 +2808,14 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo
DescIDGenerator: ex.getDescIDGenerator(),
RangeStatsFetcher: p.execCfg.RangeStatsFetcher,
},
- Tracing: &ex.sessionTracing,
- MemMetrics: &ex.memMetrics,
- Descs: ex.extraTxnState.descCollection,
- TxnModesSetter: ex,
- Jobs: ex.extraTxnState.jobs,
- SchemaChangeJobRecords: ex.extraTxnState.schemaChangeJobRecords,
- statsProvider: ex.server.sqlStats,
- indexUsageStats: ex.indexUsageStats,
- statementPreparer: ex,
+ Tracing: &ex.sessionTracing,
+ MemMetrics: &ex.memMetrics,
+ Descs: ex.extraTxnState.descCollection,
+ TxnModesSetter: ex,
+ jobs: ex.extraTxnState.jobs,
+ statsProvider: ex.server.sqlStats,
+ indexUsageStats: ex.indexUsageStats,
+ statementPreparer: ex,
}
evalCtx.copyFromExecCfg(ex.server.cfg)
}
@@ -3077,13 +3060,27 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
}
ex.notifyStatsRefresherOfNewTables(ex.Ctx())

+ // If any descriptor has a new version, we want to make sure there is only
+ // one version of it across all nodes. In schema changer jobs,
+ // `WaitForOneVersion` has been called for the descriptors included in jobs,
+ // so we just need to do this for descriptors not included in any job.
+ // We need to collect the descriptor IDs in jobs before the jobs run, because
+ // the declarative schema changer removes descriptor IDs from the job
+ // payload as it finishes with those descriptors.
+ descIDsInJobs, err := ex.descIDsInSchemaChangeJobs()
+ if err != nil {
+ return advanceInfo{}, err
+ }
ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.SessionStartPostCommitJob, timeutil.Now())
if err := ex.server.cfg.JobRegistry.Run(
- ex.ctxHolder.connCtx, *ex.extraTxnState.jobs,
+ ex.ctxHolder.connCtx, ex.extraTxnState.jobs.created,
); err != nil {
handleErr(err)
}
ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.SessionEndPostCommitJob, timeutil.Now())
+ if err := ex.waitOneVersionForNewVersionDescriptorsWithoutJobs(descIDsInJobs); err != nil {
+ return advanceInfo{}, err
+ }

fallthrough
case txnRestart, txnRollback:
@@ -3395,7 +3392,7 @@ func (ex *connExecutor) runPreCommitStages(ctx context.Context) error {
scs.state = after
scs.jobID = jobID
if jobID != jobspb.InvalidJobID {
- ex.extraTxnState.jobs.add(jobID)
+ ex.extraTxnState.jobs.addCreatedJobID(jobID)
log.Infof(ctx, "queued new schema change job %d using the new schema changer", jobID)
}
return nil
11 changes: 7 additions & 4 deletions pkg/sql/conn_executor_exec.go
@@ -1059,20 +1059,23 @@ func (ex *connExecutor) commitSQLTransactionInternal(ctx context.Context) error
// createJobs creates jobs for the records cached in schemaChangeJobRecords
// during this transaction.
func (ex *connExecutor) createJobs(ctx context.Context) error {
- if len(ex.extraTxnState.schemaChangeJobRecords) == 0 {
+ if !ex.extraTxnState.jobs.hasAnyToCreate() {
return nil
}
var records []*jobs.Record
- for _, record := range ex.extraTxnState.schemaChangeJobRecords {
- records = append(records, record)
+ if err := ex.extraTxnState.jobs.forEachToCreate(func(jobRecord *jobs.Record) error {
+ records = append(records, jobRecord)
+ return nil
+ }); err != nil {
+ return err
}
jobIDs, err := ex.server.cfg.JobRegistry.CreateJobsWithTxn(
ctx, ex.planner.InternalSQLTxn(), records,
)
if err != nil {
return err
}
- ex.planner.extendedEvalCtx.Jobs.add(jobIDs...)
+ ex.planner.extendedEvalCtx.jobs.addCreatedJobID(jobIDs...)
return nil
}

124 changes: 124 additions & 0 deletions pkg/sql/conn_executor_jobs.go
@@ -0,0 +1,124 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sql

import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/errors"
)

// waitOneVersionForNewVersionDescriptorsWithoutJobs waits until all
// descriptors with new versions have converged to one version in the
// cluster. `descIDsInJobs` is collected with `descIDsInSchemaChangeJobs`.
// This is needed to make sure all mutated descriptors are at one version
// when the schema change finishes in the user transaction. Schema change
// jobs do a similar thing for the descriptors they cover, but in some
// scenarios no job is created for a mutated descriptor.
func (ex *connExecutor) waitOneVersionForNewVersionDescriptorsWithoutJobs(
descIDsInJobs catalog.DescriptorIDSet,
) error {
withNewVersion, err := ex.extraTxnState.descCollection.GetOriginalPreviousIDVersionsForUncommitted()
if err != nil {
return err
}
for _, idVersion := range withNewVersion {
if descIDsInJobs.Contains(idVersion.ID) {
continue
}
if _, err := WaitToUpdateLeases(ex.Ctx(), ex.planner.LeaseMgr(), idVersion.ID); err != nil {
// In most cases (normal schema changes), a deleted descriptor will have
// been handled by a job, so normally we won't find ourselves waiting for
// one version of a deleted descriptor. However, we need to catch
// ErrDescriptorNotFound here because of a special case of descriptor
// repair where we delete descriptors directly and never record their IDs
// in a job's payload or details.
if errors.Is(err, catalog.ErrDescriptorNotFound) {
continue
}
return err
}
}
return nil
}

// descIDsInSchemaChangeJobs returns all descriptor IDs that schema change
// jobs in this transaction will operate on. Schema change jobs wait until
// the whole cluster only has leases on the latest versions of those
// descriptors; we would like to also "wait for one version" for descriptors
// with new versions that are not included in any schema change job.
func (ex *connExecutor) descIDsInSchemaChangeJobs() (catalog.DescriptorIDSet, error) {
// Get descriptor IDs from legacy schema changer jobs.
var descIDsInJobs catalog.DescriptorIDSet
if err := ex.extraTxnState.jobs.forEachToCreate(func(jobRecord *jobs.Record) error {
switch t := jobRecord.Details.(type) {
case jobspb.SchemaChangeDetails:
// In most cases, the DescriptorIDs field contains the descriptor IDs the
// schema change directly affects: a table ID if an index is created, or a
// list of schema IDs when dropping a group of schemas.
// But it can be confusing in two types of schema change jobs:
// (1) dropping a database:
// In this scenario, DescriptorIDs holds the IDs of the tables in the
// database that will be dropped together with it, and the DroppedDatabaseID
// field is the actual ID of the database being dropped.
// (2) any other change to a database (but not a drop):
// For example, when renaming a schema, the database's list of all schemas
// needs to be updated, and we create a job for this kind of database
// change. DescriptorIDs is empty in this case, and the DescID field in the
// job details is the actual database ID.
for _, descID := range jobRecord.DescriptorIDs {
descIDsInJobs.Add(descID)
}
for _, tbl := range t.DroppedTables {
descIDsInJobs.Add(tbl.ID)
}
for _, id := range t.DroppedTypes {
descIDsInJobs.Add(id)
}
for _, id := range t.DroppedSchemas {
descIDsInJobs.Add(id)
}
descIDsInJobs.Add(t.DroppedDatabaseID)
descIDsInJobs.Add(t.DescID)
}
return nil
}); err != nil {
return catalog.DescriptorIDSet{}, err
}

// If there is no declarative schema changer job, then we are done. Otherwise,
// we need to check which descriptor has the jobID in its schema change state.
if ex.extraTxnState.schemaChangerState.jobID == jobspb.InvalidJobID {
return descIDsInJobs, nil
}
// Get descriptor IDs with declarative schema changer jobs.
withNewVersion, err := ex.extraTxnState.descCollection.GetOriginalPreviousIDVersionsForUncommitted()
if err != nil {
return catalog.DescriptorIDSet{}, err
}
for _, idVersion := range withNewVersion {
if descIDsInJobs.Contains(idVersion.ID) {
continue
}
desc, err := ex.extraTxnState.descCollection.ByID(ex.state.mu.txn).Get().Desc(ex.Ctx(), idVersion.ID)
if err != nil {
return catalog.DescriptorIDSet{}, err
}
state := desc.GetDeclarativeSchemaChangerState()
if state != nil && state.JobID != jobspb.InvalidJobID {
descIDsInJobs.Add(idVersion.ID)
}
}
return descIDsInJobs, nil
}
16 changes: 11 additions & 5 deletions pkg/sql/crdb_internal.go
@@ -1111,14 +1111,17 @@ func makeJobsTableRows(
}
defer cleanup(ctx)

- sessionJobs := make([]*jobs.Record, 0, len(p.extendedEvalCtx.SchemaChangeJobRecords))
+ sessionJobs := make([]*jobs.Record, 0, p.extendedEvalCtx.jobs.numToCreate())
uniqueJobs := make(map[*jobs.Record]struct{})
- for _, job := range p.extendedEvalCtx.SchemaChangeJobRecords {
+ if err := p.extendedEvalCtx.jobs.forEachToCreate(func(job *jobs.Record) error {
if _, ok := uniqueJobs[job]; ok {
- continue
+ return nil
}
sessionJobs = append(sessionJobs, job)
uniqueJobs[job] = struct{}{}
+ return nil
+ }); err != nil {
+ return matched, err
}

// Loop while we need to skip a row.
@@ -5300,17 +5303,20 @@ func collectMarshaledJobMetadataMap(
if err := it.Close(); err != nil {
return nil, err
}
- for _, record := range p.ExtendedEvalContext().SchemaChangeJobRecords {
+ if err := p.ExtendedEvalContext().jobs.forEachToCreate(func(record *jobs.Record) error {
progressBytes, payloadBytes, err := getPayloadAndProgressFromJobsRecord(p, record)
if err != nil {
- return nil, err
+ return err
}
mj := marshaledJobMetadata{
status: tree.NewDString(string(record.RunningStatus)),
payloadBytes: payloadBytes,
progressBytes: progressBytes,
}
m[record.JobID] = mj
+ return nil
+ }); err != nil {
+ return nil, err
}
return m, nil
}
4 changes: 1 addition & 3 deletions pkg/sql/database.go
@@ -73,9 +73,7 @@ func (p *planner) renameDatabase(
func (p *planner) writeNonDropDatabaseChange(
ctx context.Context, desc *dbdesc.Mutable, jobDesc string,
) error {
- if err := p.createNonDropDatabaseChangeJob(ctx, desc.ID, jobDesc); err != nil {
- return err
- }
+ p.createNonDropDatabaseChangeJob(ctx, desc.ID, jobDesc)
b := p.Txn().NewBatch()
if err := p.writeDatabaseChangeToBatch(ctx, desc, b); err != nil {
return err
