Skip to content

Commit

Permalink
Merge #71910 #74188
Browse files Browse the repository at this point in the history
71910: sql: add a cluster setting to avoid system config triggers r=ajwerner a=ajwerner

This is intended as a short-term workaround to improve performance in
situations of repeated schema changes, like ORM tests.

We have a plan to disable the system config trigger altogether in 22.1 with
#70560. 

This PR provides a cluster setting which allows schema change transactions
to bypass triggerring an update to the system config span. These updates
currently drive only the propagation of zone configs to KV and cluster
settings. The cluster setting behavior is retained until we address #70566.

We have a history of these sorts of unsafe settings in
`kv.raft_log.disable_synchronization_unsafe`.

Release note: None

74188: sql: fix InternalExecutor bug r=ajwerner a=ajwerner

Any time we use WithSyntheticDescriptors, it has to be on an unshared internal
executor. The move in #71246 to not have an internal executor hanging around
for the current session hurts here.

Fixes #73788

Release note: None

Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
craig[bot] and ajwerner committed Jan 3, 2022
3 parents 7a2781f + 754db5d + 79c4c86 commit c1f1185
Show file tree
Hide file tree
Showing 13 changed files with 98 additions and 49 deletions.
15 changes: 11 additions & 4 deletions pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,8 @@ func (n *alterTableNode) startExec(params runParams) error {
"constraint %q in the middle of being added, try again later", t.Constraint)
}
if err := validateCheckInTxn(
params.ctx, &params.p.semaCtx, params.ExecCfg().InternalExecutor, params.SessionData(), n.tableDesc, params.EvalContext().Txn, ck.Expr,
params.ctx, &params.p.semaCtx, params.ExecCfg().InternalExecutorFactory,
params.SessionData(), n.tableDesc, params.EvalContext().Txn, ck.Expr,
); err != nil {
return err
}
Expand All @@ -822,8 +823,12 @@ func (n *alterTableNode) startExec(params runParams) error {
"constraint %q in the middle of being added, try again later", t.Constraint)
}
if err := validateFkInTxn(
params.ctx, params.p.LeaseMgr(), params.ExecCfg().InternalExecutor,
n.tableDesc, params.EvalContext().Txn, name, params.EvalContext().Codec,
params.ctx, params.p.LeaseMgr(),
params.ExecCfg().InternalExecutorFactory,
params.p.SessionData(),
n.tableDesc,
params.EvalContext().Txn,
name, params.EvalContext().Codec,
); err != nil {
return err
}
Expand All @@ -846,7 +851,9 @@ func (n *alterTableNode) startExec(params runParams) error {
"constraint %q in the middle of being added, try again later", t.Constraint)
}
if err := validateUniqueWithoutIndexConstraintInTxn(
params.ctx, params.ExecCfg().InternalExecutor, n.tableDesc, params.EvalContext().Txn, name,
params.ctx, params.ExecCfg().InternalExecutorFactory(
params.ctx, params.SessionData(),
), n.tableDesc, params.EvalContext().Txn, name,
); err != nil {
return err
}
Expand Down
31 changes: 16 additions & 15 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (sc *SchemaChanger) makeFixedTimestampRunner(readAsOf hlc.Timestamp) histor
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
// We need to re-create the evalCtx since the txn may retry.
evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, readAsOf, sc.ieFactory, descriptors)
evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, readAsOf, descriptors)
return retryable(ctx, txn, &evalCtx)
})
}
Expand All @@ -159,9 +159,7 @@ func (sc *SchemaChanger) makeFixedTimestampInternalExecRunner(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
// We need to re-create the evalCtx since the txn may retry.
ie := createSchemaChangeEvalCtx(
ctx, sc.execCfg, readAsOf, sc.ieFactory, descriptors,
).SchemaChangeInternalExecutor
ie := sc.ieFactory(ctx, NewFakeSessionData(sc.execCfg.SV()))
return retryable(ctx, txn, ie)
})
}
Expand Down Expand Up @@ -704,21 +702,21 @@ func (sc *SchemaChanger) validateConstraints(
defer func() { collection.ReleaseAll(ctx) }()
if c.IsCheck() {
if err := validateCheckInTxn(
ctx, &semaCtx, evalCtx.SchemaChangeInternalExecutor, evalCtx.SessionData(), desc, txn, c.Check().Expr,
ctx, &semaCtx, sc.ieFactory, evalCtx.SessionData(), desc, txn, c.Check().Expr,
); err != nil {
return err
}
} else if c.IsForeignKey() {
if err := validateFkInTxn(ctx, sc.leaseMgr, evalCtx.SchemaChangeInternalExecutor, desc, txn, c.GetName(), evalCtx.Codec); err != nil {
if err := validateFkInTxn(ctx, sc.leaseMgr, sc.ieFactory, evalCtx.SessionData(), desc, txn, c.GetName(), evalCtx.Codec); err != nil {
return err
}
} else if c.IsUniqueWithoutIndex() {
if err := validateUniqueWithoutIndexConstraintInTxn(ctx, evalCtx.SchemaChangeInternalExecutor, desc, txn, c.GetName()); err != nil {
if err := validateUniqueWithoutIndexConstraintInTxn(ctx, sc.ieFactory(ctx, evalCtx.SessionData()), desc, txn, c.GetName()); err != nil {
return err
}
} else if c.IsNotNull() {
if err := validateCheckInTxn(
ctx, &semaCtx, evalCtx.SchemaChangeInternalExecutor, evalCtx.SessionData(), desc, txn, c.Check().Expr,
ctx, &semaCtx, sc.ieFactory, evalCtx.SessionData(), desc, txn, c.Check().Expr,
); err != nil {
// TODO (lucy): This should distinguish between constraint
// validation errors and other types of unexpected errors, and
Expand Down Expand Up @@ -1018,7 +1016,7 @@ func (sc *SchemaChanger) distIndexBackfill(
if err != nil {
return err
}
evalCtx = createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), sc.ieFactory, descriptors)
evalCtx = createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), descriptors)
planCtx = sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn,
true /* distribute */)
indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV)
Expand Down Expand Up @@ -1292,7 +1290,7 @@ func (sc *SchemaChanger) distColumnBackfill(
return nil
}
cbw := MetadataCallbackWriter{rowResultWriter: &errOnlyResultWriter{}, fn: metaFn}
evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), sc.ieFactory, descriptors)
evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), descriptors)
recv := MakeDistSQLReceiver(
ctx,
&cbw,
Expand Down Expand Up @@ -2117,7 +2115,8 @@ func runSchemaChangesInTxn(
check := &c.ConstraintToUpdateDesc().Check
if check.Validity == descpb.ConstraintValidity_Validating {
if err := validateCheckInTxn(
ctx, &planner.semaCtx, planner.ExecCfg().InternalExecutor, planner.SessionData(), tableDesc, planner.txn, check.Expr,
ctx, &planner.semaCtx, planner.ExecCfg().InternalExecutorFactory,
planner.SessionData(), tableDesc, planner.txn, check.Expr,
); err != nil {
return err
}
Expand Down Expand Up @@ -2214,7 +2213,7 @@ func runSchemaChangesInTxn(
func validateCheckInTxn(
ctx context.Context,
semaCtx *tree.SemaContext,
ie *InternalExecutor,
ief sqlutil.SessionBoundInternalExecutorFactory,
sessionData *sessiondata.SessionData,
tableDesc *tabledesc.Mutable,
txn *kv.Txn,
Expand All @@ -2224,6 +2223,7 @@ func validateCheckInTxn(
if tableDesc.Version > tableDesc.ClusterVersion.Version {
syntheticDescs = append(syntheticDescs, tableDesc)
}
ie := ief(ctx, sessionData)
return ie.WithSyntheticDescriptors(syntheticDescs, func() error {
return validateCheckExpr(ctx, semaCtx, sessionData, checkExpr, tableDesc, ie, txn)
})
Expand All @@ -2244,7 +2244,8 @@ func validateCheckInTxn(
func validateFkInTxn(
ctx context.Context,
leaseMgr *lease.Manager,
ie *InternalExecutor,
ief sqlutil.SessionBoundInternalExecutorFactory,
sd *sessiondata.SessionData,
tableDesc *tabledesc.Mutable,
txn *kv.Txn,
fkName string,
Expand All @@ -2266,7 +2267,7 @@ func validateFkInTxn(
if fk == nil {
return errors.AssertionFailedf("foreign key %s does not exist", fkName)
}

ie := ief(ctx, sd)
return ie.WithSyntheticDescriptors(syntheticDescs, func() error {
return validateForeignKey(ctx, tableDesc, fk, ie, txn, codec)
})
Expand All @@ -2286,7 +2287,7 @@ func validateFkInTxn(
// reuse an existing kv.Txn safely.
func validateUniqueWithoutIndexConstraintInTxn(
ctx context.Context,
ie *InternalExecutor,
ie sqlutil.InternalExecutor,
tableDesc *tabledesc.Mutable,
txn *kv.Txn,
constraintName string,
Expand Down
22 changes: 20 additions & 2 deletions pkg/sql/catalog/descs/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
Expand All @@ -26,6 +27,21 @@ import (

var errTwoVersionInvariantViolated = errors.Errorf("two version invariant violated")

// UnsafeSkipSystemConfigTrigger will prevent setting the system config
// trigger for transactions which write to tables in the system config. The
// implication of setting this to true is that various subsystems which
// rely on that trigger, such as zone configs and replication reports, will
// not work. This can be used to accelerate high-frequency schema changes
// like during an ORM test suite.
var UnsafeSkipSystemConfigTrigger = settings.RegisterBoolSetting(
settings.SystemOnly,
"sql.catalog.unsafe_skip_system_config_trigger.enabled",
"avoid setting the system config trigger in transactions which write to "+
"the system config. This will unlock performance at the cost of breaking "+
"table splits, zone configuration propagation, and cluster settings",
false,
)

// Txn enables callers to run transactions with a *Collection such that all
// retrieved immutable descriptors are properly leased and all mutable
// descriptors are handled. The function deals with verifying the two version
Expand Down Expand Up @@ -78,8 +94,10 @@ func (cf *CollectionFactory) Txn(
deletedDescs = nil
descsCol = cf.MakeCollection(nil)
defer descsCol.ReleaseAll(ctx)
if err := txn.SetSystemConfigTrigger(cf.leaseMgr.Codec().ForSystemTenant()); err != nil {
return err
if !UnsafeSkipSystemConfigTrigger.Get(&cf.settings.SV) {
if err := txn.SetSystemConfigTrigger(cf.leaseMgr.Codec().ForSystemTenant()); err != nil {
return err
}
}
if err := f(ctx, txn, &descsCol); err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func validateCheckExpr(
sessionData *sessiondata.SessionData,
exprStr string,
tableDesc *tabledesc.Mutable,
ie *InternalExecutor,
ie sqlutil.InternalExecutor,
txn *kv.Txn,
) error {
expr, err := schemaexpr.FormatExprForDisplay(ctx, tableDesc, exprStr, semaCtx, sessionData, tree.FmtParsable)
Expand Down Expand Up @@ -237,7 +237,7 @@ func validateForeignKey(
ctx context.Context,
srcTable *tabledesc.Mutable,
fk *descpb.ForeignKeyConstraint,
ie *InternalExecutor,
ie sqlutil.InternalExecutor,
txn *kv.Txn,
codec keys.SQLCodec,
) error {
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/gcjob/descriptor_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ func deleteDatabaseZoneConfig(
return nil
}
return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetSystemConfigTrigger(codec.ForSystemTenant()); err != nil {
return err
if !descs.UnsafeSkipSystemConfigTrigger.Get(&settings.SV) {
if err := txn.SetSystemConfigTrigger(codec.ForSystemTenant()); err != nil {
return err
}
}
b := &kv.Batch{}

Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/gcjob/table_garbage_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ func gcTables(
}

// Finished deleting all the table data, now delete the table meta data.
if err := sql.DeleteTableDescAndZoneConfig(ctx, execCfg.DB, execCfg.Codec, table); err != nil {
if err := sql.DeleteTableDescAndZoneConfig(
ctx, execCfg.DB, execCfg.Settings, execCfg.Codec, table,
); err != nil {
return errors.Wrapf(err, "dropping table descriptor for table %d", table.GetID())
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/index_backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (ib *IndexBackfillPlanner) plan(
if err := DescsTxn(ctx, ib.execCfg, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
evalCtx = createSchemaChangeEvalCtx(ctx, ib.execCfg, nowTimestamp, ib.ieFactory, descriptors)
evalCtx = createSchemaChangeEvalCtx(ctx, ib.execCfg, nowTimestamp, descriptors)
planCtx = ib.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn,
true /* distribute */)
// TODO(ajwerner): Adopt util.ConstantWithMetamorphicTestRange for the
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/opt/exec/execbuilder/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//pkg/server/telemetry",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/lexbase",
"//pkg/sql/mutations",
"//pkg/sql/opt",
Expand Down
14 changes: 9 additions & 5 deletions pkg/sql/opt/exec/execbuilder/relational.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/opt"
"github.com/cockroachdb/cockroach/pkg/sql/opt/cat"
"github.com/cockroachdb/cockroach/pkg/sql/opt/constraint"
Expand Down Expand Up @@ -157,12 +158,15 @@ func (b *Builder) buildRelational(e memo.RelExpr) (execPlan, error) {
// `BEGIN; INSERT INTO ...; CREATE TABLE IF NOT EXISTS ...; COMMIT;`
// where the table already exists. This will generate some false schema
// cache refreshes, but that's expected to be quite rare in practice.
if err := b.evalCtx.Txn.SetSystemConfigTrigger(b.evalCtx.Codec.ForSystemTenant()); err != nil {
return execPlan{}, errors.WithSecondaryError(
unimplemented.NewWithIssuef(26508,
"the first schema change statement in a transaction must precede any writes"),
err)
if !descs.UnsafeSkipSystemConfigTrigger.Get(&b.evalCtx.Settings.SV) {
if err := b.evalCtx.Txn.SetSystemConfigTrigger(b.evalCtx.Codec.ForSystemTenant()); err != nil {
return execPlan{}, errors.WithSecondaryError(
unimplemented.NewWithIssuef(26508,
"the first schema change statement in a transaction must precede any writes"),
err)
}
}

}

if opt.IsMutationOp(e) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec"
Expand Down Expand Up @@ -540,7 +541,8 @@ func (p *planner) maybePlanHook(ctx context.Context, stmt tree.Statement) (planN
// Mark transaction as operating on the system DB if the descriptor id
// is within the SystemConfig range.
func (p *planner) maybeSetSystemConfig(id descpb.ID) error {
if !descpb.IsSystemConfigID(id) {
if !descpb.IsSystemConfigID(id) ||
descs.UnsafeSkipSystemConfigTrigger.Get(&p.EvalContext().Settings.SV) {
return nil
}
// Mark transaction as operating on the system DB.
Expand Down
2 changes: 0 additions & 2 deletions pkg/sql/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ type extendedEvalContext struct {
indexUsageStats *idxusage.LocalIndexUsageStats

SchemaChangerState *SchemaChangerState

SchemaChangeInternalExecutor *InternalExecutor
}

// copyFromExecCfg copies relevant fields from an ExecutorConfig.
Expand Down
30 changes: 16 additions & 14 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,9 @@ func (sc *SchemaChanger) exec(ctx context.Context) error {
} else {
// We've dropped a non-physical table, no need for a GC job, let's delete
// its descriptor and zone config immediately.
if err := DeleteTableDescAndZoneConfig(ctx, sc.db, sc.execCfg.Codec, tableDesc); err != nil {
if err := DeleteTableDescAndZoneConfig(
ctx, sc.db, sc.settings, sc.execCfg.Codec, tableDesc,
); err != nil {
return err
}
}
Expand Down Expand Up @@ -1936,24 +1938,18 @@ func (sc *SchemaChanger) txn(
// used in the surrounding SQL session, so session tracing is unable
// to capture schema change activity.
func createSchemaChangeEvalCtx(
ctx context.Context,
execCfg *ExecutorConfig,
ts hlc.Timestamp,
ieFactory sqlutil.SessionBoundInternalExecutorFactory,
descriptors *descs.Collection,
ctx context.Context, execCfg *ExecutorConfig, ts hlc.Timestamp, descriptors *descs.Collection,
) extendedEvalContext {

sd := NewFakeSessionData(execCfg.SV())
ie := ieFactory(ctx, sd)

evalCtx := extendedEvalContext{
// Make a session tracing object on-the-fly. This is OK
// because it sets "enabled: false" and thus none of the
// other fields are used.
Tracing: &SessionTracing{},
ExecCfg: execCfg,
Descs: descriptors,
SchemaChangeInternalExecutor: ie.(*InternalExecutor),
Tracing: &SessionTracing{},
ExecCfg: execCfg,
Descs: descriptors,
EvalContext: tree.EvalContext{
SessionDataStack: sessiondata.NewStack(sd),
// TODO(andrei): This is wrong (just like on the main code path on
Expand Down Expand Up @@ -2457,12 +2453,18 @@ func (sc *SchemaChanger) applyZoneConfigChangeForMutation(

// DeleteTableDescAndZoneConfig removes a table's descriptor and zone config from the KV database.
func DeleteTableDescAndZoneConfig(
ctx context.Context, db *kv.DB, codec keys.SQLCodec, tableDesc catalog.TableDescriptor,
ctx context.Context,
db *kv.DB,
settings *cluster.Settings,
codec keys.SQLCodec,
tableDesc catalog.TableDescriptor,
) error {
log.Infof(ctx, "removing table descriptor and zone config for table %d", tableDesc.GetID())
return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetSystemConfigTrigger(codec.ForSystemTenant()); err != nil {
return err
if !descs.UnsafeSkipSystemConfigTrigger.Get(&settings.SV) {
if err := txn.SetSystemConfigTrigger(codec.ForSystemTenant()); err != nil {
return err
}
}
b := &kv.Batch{}

Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/set_cluster_setting.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,18 @@ func (n *setClusterSettingNode) startExec(params runParams) error {
if !params.p.ExtendedEvalContext().TxnImplicit {
return errors.Errorf("SET CLUSTER SETTING cannot be used inside a transaction")
}

// Set the system config trigger explicitly here as it might not happen
// implicitly due to the setting of the
// sql.catalog.unsafe_skip_system_config_trigger.enabled cluster setting.
// The usage of gossip to propagate cluster settings in the system tenant
// will be fixed in an upcoming PR with #70566.
if err := params.p.EvalContext().Txn.SetSystemConfigTrigger(
params.EvalContext().Codec.ForSystemTenant(),
); err != nil {
return err
}

execCfg := params.extendedEvalCtx.ExecCfg
var expectedEncodedValue string
if err := execCfg.DB.Txn(params.ctx, func(ctx context.Context, txn *kv.Txn) error {
Expand Down

0 comments on commit c1f1185

Please sign in to comment.