Skip to content

Commit

Permalink
Merge #89757
Browse files Browse the repository at this point in the history
89757: *: Implement check constraint validation for new schema changer r=Xiang-Gu a=Xiang-Gu

See each commit message for details.

Commit 1: preparation work where we augmented an existing type definition.

Commit 2: add a `ValidateCheckConstraint` method to interface `Validator`
with empty implementations.

Commit 3: actually implement the logic for validating check constraint.

Informs #89665

Co-authored-by: Xiang Gu <[email protected]>
  • Loading branch information
craig[bot] and Xiang-Gu committed Oct 11, 2022
2 parents 1df5d28 + 397d5da commit b31edd7
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 32 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1785,10 +1785,10 @@ func revalidateIndexes(

// We don't actually need the 'historical' read the way the schema change does
// since our table is offline.
var runner sqlutil.HistoricalInternalExecTxnRunner = func(ctx context.Context, fn sqlutil.InternalExecFn) error {
var runner descs.HistoricalInternalExecTxnRunner = func(ctx context.Context, fn descs.InternalExecFn) error {
return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
ie := job.MakeSessionBoundInternalExecutor(sql.NewFakeSessionData(execCfg.SV())).(*sql.InternalExecutor)
return fn(ctx, txn, ie)
return fn(ctx, txn, ie, nil /* descriptors */)
})
}

Expand Down
1 change: 1 addition & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
ieFactory,
sql.ValidateForwardIndexes,
sql.ValidateInvertedIndexes,
sql.ValidateCheckConstraint,
sql.NewFakeSessionData,
)
execCfg.InternalExecutorFactory = ieFactory
Expand Down
68 changes: 56 additions & 12 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,14 @@ func (sc *SchemaChanger) makeFixedTimestampRunner(readAsOf hlc.Timestamp) histor
// makeFixedTimestampRunner creates a HistoricalTxnRunner suitable for use by the helpers.
func (sc *SchemaChanger) makeFixedTimestampInternalExecRunner(
readAsOf hlc.Timestamp,
) sqlutil.HistoricalInternalExecTxnRunner {
runner := func(ctx context.Context, retryable sqlutil.InternalExecFn) error {
) descs.HistoricalInternalExecTxnRunner {
runner := func(ctx context.Context, retryable descs.InternalExecFn) error {
return sc.fixedTimestampTxn(ctx, readAsOf, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
// We need to re-create the evalCtx since the txn may retry.
ie := sc.ieFactory.NewInternalExecutor(NewFakeSessionData(sc.execCfg.SV()))
return retryable(ctx, txn, ie)
return retryable(ctx, txn, ie, nil /* descriptors */)
})
}
return runner
Expand Down Expand Up @@ -1515,6 +1515,42 @@ func (e InvalidIndexesError) Error() string {
return fmt.Sprintf("found %d invalid indexes", len(e.Indexes))
}

// ValidateCheckConstraint validates the check constraint against all rows
// in the table.
func ValidateCheckConstraint(
ctx context.Context,
tableDesc catalog.TableDescriptor,
constraint *descpb.ConstraintDetail,
sessionData *sessiondata.SessionData,
runHistoricalTxn descs.HistoricalInternalExecTxnRunner,
execOverride sessiondata.InternalExecutorOverride,
) (err error) {
if constraint.CheckConstraint == nil {
return errors.AssertionFailedf("%v is not a check constraint", constraint.GetConstraintName())
}

tableDesc, err = tableDesc.MakeFirstMutationPublic(catalog.IgnoreConstraints)
if err != nil {
return err
}

// The check operates at the historical timestamp.
return runHistoricalTxn(ctx, func(
ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor, descriptors *descs.Collection,
) error {
// Use the DistSQLTypeResolver because we need to resolve types by ID.
resolver := descs.NewDistSQLTypeResolver(descriptors, txn)
semaCtx := tree.MakeSemaContext()
semaCtx.TypeResolver = &resolver
defer func() { descriptors.ReleaseAll(ctx) }()

return ie.WithSyntheticDescriptors([]catalog.Descriptor{tableDesc}, func() error {
return validateCheckExpr(ctx, &semaCtx, txn, sessionData, constraint.CheckConstraint.Expr,
tableDesc.(*tabledesc.Mutable), ie)
})
})
}

// ValidateInvertedIndexes checks that the indexes have entries for
// all the items of data in rows.
//
Expand All @@ -1527,7 +1563,7 @@ func ValidateInvertedIndexes(
codec keys.SQLCodec,
tableDesc catalog.TableDescriptor,
indexes []catalog.Index,
runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner,
runHistoricalTxn descs.HistoricalInternalExecTxnRunner,
withFirstMutationPublic bool,
gatherAllInvalid bool,
execOverride sessiondata.InternalExecutorOverride,
Expand All @@ -1553,7 +1589,9 @@ func ValidateInvertedIndexes(
span := tableDesc.IndexSpan(codec, idx.GetID())
key := span.Key
endKey := span.EndKey
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, _ sqlutil.InternalExecutor) error {
if err := runHistoricalTxn(ctx, func(
ctx context.Context, txn *kv.Txn, _ sqlutil.InternalExecutor, _ *descs.Collection,
) error {
for {
kvs, err := txn.Scan(ctx, key, endKey, 1000000)
if err != nil {
Expand Down Expand Up @@ -1620,7 +1658,7 @@ func countExpectedRowsForInvertedIndex(
ctx context.Context,
tableDesc catalog.TableDescriptor,
idx catalog.Index,
runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner,
runHistoricalTxn descs.HistoricalInternalExecTxnRunner,
withFirstMutationPublic bool,
execOverride sessiondata.InternalExecutorOverride,
) (int64, error) {
Expand Down Expand Up @@ -1659,7 +1697,9 @@ func countExpectedRowsForInvertedIndex(
}

var expectedCount int64
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
if err := runHistoricalTxn(ctx, func(
ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor, _ *descs.Collection,
) error {
var stmt string
geoConfig := idx.GetGeoConfig()
if geoConfig.IsEmpty() {
Expand Down Expand Up @@ -1714,7 +1754,7 @@ func ValidateForwardIndexes(
ctx context.Context,
tableDesc catalog.TableDescriptor,
indexes []catalog.Index,
runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner,
runHistoricalTxn descs.HistoricalInternalExecTxnRunner,
withFirstMutationPublic bool,
gatherAllInvalid bool,
execOverride sessiondata.InternalExecutorOverride,
Expand Down Expand Up @@ -1816,7 +1856,7 @@ func populateExpectedCounts(
indexes []catalog.Index,
partialIndexExpectedCounts map[descpb.IndexID]int64,
withFirstMutationPublic bool,
runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner,
runHistoricalTxn descs.HistoricalInternalExecTxnRunner,
execOverride sessiondata.InternalExecutorOverride,
) (int64, error) {
desc := tableDesc
Expand All @@ -1834,7 +1874,9 @@ func populateExpectedCounts(
desc = fakeDesc
}
var tableRowCount int64
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
if err := runHistoricalTxn(ctx, func(
ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor, _ *descs.Collection,
) error {
var s strings.Builder
for _, idx := range indexes {
// For partial indexes, count the number of rows in the table
Expand Down Expand Up @@ -1880,7 +1922,7 @@ func countIndexRowsAndMaybeCheckUniqueness(
tableDesc catalog.TableDescriptor,
idx catalog.Index,
withFirstMutationPublic bool,
runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner,
runHistoricalTxn descs.HistoricalInternalExecTxnRunner,
execOverride sessiondata.InternalExecutorOverride,
) (int64, error) {
// If we are doing a REGIONAL BY ROW locality change, we can
Expand Down Expand Up @@ -1945,7 +1987,9 @@ func countIndexRowsAndMaybeCheckUniqueness(

// Retrieve the row count in the index.
var idxLen int64
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
if err := runHistoricalTxn(ctx, func(
ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor, _ *descs.Collection,
) error {
query := fmt.Sprintf(`SELECT count(1) FROM [%d AS t]@[%d]`, desc.GetID(), idx.GetID())
// If the index is a partial index the predicate must be added
// as a filter to the query to force scanning the index.
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/catalog/descs/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -611,3 +612,11 @@ func MakeTestCollection(ctx context.Context, leaseManager *lease.Manager) Collec
leased: makeLeasedDescriptors(leaseManager),
}
}

// InternalExecFn is the type of functions that operates using an internalExecutor.
type InternalExecFn func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor, descriptors *Collection) error

// HistoricalInternalExecTxnRunner is like historicalTxnRunner except it only
// passes the fn the exported InternalExecutor and *Collection, instead of
// the whole unexported extendedEvalContenxt, so it can be implemented outside pkg/sql.
type HistoricalInternalExecTxnRunner func(ctx context.Context, fn InternalExecFn) error
11 changes: 11 additions & 0 deletions pkg/sql/schemachanger/scdeps/sctestdeps/test_deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,17 @@ func (s *TestState) Validator() scexec.Validator {
return s
}

// ValidateCheckConstraint implements the validator interface.
func (s *TestState) ValidateCheckConstraint(
ctx context.Context,
tbl catalog.TableDescriptor,
constraint *descpb.ConstraintDetail,
override sessiondata.InternalExecutorOverride,
) error {
s.LogSideEffectf("validate check constraint %v in table #%d", constraint.GetConstraintName(), tbl.GetID())
return nil
}

// LogEvent implements scexec.EventLogger.
func (s *TestState) LogEvent(
_ context.Context, details eventpb.CommonSQLEventDetails, event logpb.EventPayload,
Expand Down
47 changes: 37 additions & 10 deletions pkg/sql/schemachanger/scdeps/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
Expand All @@ -28,7 +30,7 @@ type ValidateForwardIndexesFn func(
ctx context.Context,
tbl catalog.TableDescriptor,
indexes []catalog.Index,
runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner,
runHistoricalTxn descs.HistoricalInternalExecTxnRunner,
withFirstMutationPublic bool,
gatherAllInvalid bool,
execOverride sessiondata.InternalExecutorOverride,
Expand All @@ -40,12 +42,22 @@ type ValidateInvertedIndexesFn func(
codec keys.SQLCodec,
tbl catalog.TableDescriptor,
indexes []catalog.Index,
runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner,
runHistoricalTxn descs.HistoricalInternalExecTxnRunner,
withFirstMutationPublic bool,
gatherAllInvalid bool,
execOverride sessiondata.InternalExecutorOverride,
) error

// ValidateCheckConstraintFn callback function for validting check constraints.
type ValidateCheckConstraintFn func(
ctx context.Context,
tbl catalog.TableDescriptor,
constraint *descpb.ConstraintDetail,
sessionData *sessiondata.SessionData,
runHistoricalTxn descs.HistoricalInternalExecTxnRunner,
execOverride sessiondata.InternalExecutorOverride,
) error

// NewFakeSessionDataFn callback function used to create session data
// for the internal executor.
type NewFakeSessionDataFn func(sv *settings.Values) *sessiondata.SessionData
Expand All @@ -57,6 +69,7 @@ type validator struct {
ieFactory sqlutil.InternalExecutorFactory
validateForwardIndexes ValidateForwardIndexesFn
validateInvertedIndexes ValidateInvertedIndexesFn
validateCheckConstraint ValidateCheckConstraintFn
newFakeSessionData NewFakeSessionDataFn
}

Expand Down Expand Up @@ -92,18 +105,30 @@ func (vd validator) ValidateInvertedIndexes(
)
}

func (vd validator) ValidateCheckConstraint(
ctx context.Context,
tbl catalog.TableDescriptor,
constraint *descpb.ConstraintDetail,
override sessiondata.InternalExecutorOverride,
) error {
return vd.validateCheckConstraint(ctx, tbl, constraint, vd.newFakeSessionData(&vd.settings.SV),
vd.makeHistoricalInternalExecTxnRunner(), override)
}

// makeHistoricalInternalExecTxnRunner creates a new transaction runner which
// always runs at the same time and that time is the current time as of when
// this constructor was called.
func (vd validator) makeHistoricalInternalExecTxnRunner() sqlutil.HistoricalInternalExecTxnRunner {
func (vd validator) makeHistoricalInternalExecTxnRunner() descs.HistoricalInternalExecTxnRunner {
now := vd.db.Clock().Now()
return func(ctx context.Context, fn sqlutil.InternalExecFn) error {
validationTxn := vd.db.NewTxn(ctx, "validation")
err := validationTxn.SetFixedTimestamp(ctx, now)
if err != nil {
return err
}
return fn(ctx, validationTxn, vd.ieFactory.NewInternalExecutor(vd.newFakeSessionData(&vd.settings.SV)))
return func(ctx context.Context, fn descs.InternalExecFn) error {
return vd.ieFactory.(descs.TxnManager).DescsTxnWithExecutor(ctx, vd.db, vd.newFakeSessionData(&vd.settings.SV), func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ie sqlutil.InternalExecutor,
) error {
if err := txn.SetFixedTimestamp(ctx, now); err != nil {
return err
}
return fn(ctx, txn, ie, descriptors)
})
}
}

Expand All @@ -116,6 +141,7 @@ func NewValidator(
ieFactory sqlutil.InternalExecutorFactory,
validateForwardIndexes ValidateForwardIndexesFn,
validateInvertedIndexes ValidateInvertedIndexesFn,
validateCheckConstraint ValidateCheckConstraintFn,
newFakeSessionData NewFakeSessionDataFn,
) scexec.Validator {
return validator{
Expand All @@ -125,6 +151,7 @@ func NewValidator(
ieFactory: ieFactory,
validateForwardIndexes: validateForwardIndexes,
validateInvertedIndexes: validateInvertedIndexes,
validateCheckConstraint: validateCheckConstraint,
newFakeSessionData: newFakeSessionData,
}
}
7 changes: 7 additions & 0 deletions pkg/sql/schemachanger/scexec/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,13 @@ type Validator interface {
indexes []catalog.Index,
override sessiondata.InternalExecutorOverride,
) error

ValidateCheckConstraint(
ctx context.Context,
tbl catalog.TableDescriptor,
constraint *descpb.ConstraintDetail,
override sessiondata.InternalExecutorOverride,
) error
}

// IndexSpanSplitter can try to split an index span in the current transaction
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/schemachanger/scexec/executor_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,15 @@ func (noopValidator) ValidateInvertedIndexes(
return nil
}

func (noopValidator) ValidateCheckConstraint(
ctx context.Context,
tbl catalog.TableDescriptor,
constraint *descpb.ConstraintDetail,
override sessiondata.InternalExecutorOverride,
) error {
return nil
}

type noopEventLogger struct{}

var _ scexec.EventLogger = noopEventLogger{}
Expand Down
8 changes: 0 additions & 8 deletions pkg/sql/sqlutil/internal_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,14 +232,6 @@ type InternalExecutorFactory interface {
TxnWithExecutor(context.Context, *kv.DB, *sessiondata.SessionData, func(context.Context, *kv.Txn, InternalExecutor) error, ...TxnOption) error
}

// InternalExecFn is the type of functions that operates using an internalExecutor.
type InternalExecFn func(ctx context.Context, txn *kv.Txn, ie InternalExecutor) error

// HistoricalInternalExecTxnRunner is like historicalTxnRunner except it only
// passes the fn the exported InternalExecutor instead of the whole unexported
// extendedEvalContenxt, so it can be implemented outside pkg/sql.
type HistoricalInternalExecTxnRunner func(ctx context.Context, fn InternalExecFn) error

// TxnOption is used to configure a Txn or TxnWithExecutor.
type TxnOption interface {
Apply(*TxnConfig)
Expand Down

0 comments on commit b31edd7

Please sign in to comment.