Skip to content

Commit

Permalink
Merge #82866
Browse files Browse the repository at this point in the history
82866: workload/schemachange: FK validation does not check inserted rows r=fqazi a=fqazi

Fixes: #63064

Previously, the validation of foreign keys during inserts,
did not take into account self referential scenarios properly,
where rows inside the insert would allow the FK constraint to
be met. To address this, this patch updates the validation to
scan the set of inserted rows for self referential foreign keys.

Release note: None

Co-authored-by: Faizan Qazi <[email protected]>
  • Loading branch information
craig[bot] and fqazi committed Jun 15, 2022
2 parents aadbaf9 + 957ec7b commit c030b8b
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 31 deletions.
1 change: 0 additions & 1 deletion pkg/ccl/testccl/workload/schemachange/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl"
"github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl/multiregionccltestutils"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/workload"
Expand All @@ -33,7 +32,6 @@ import (

func TestWorkload(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 63064, "flaky test")
defer utilccl.TestingEnableEnterprise()()

dir := t.TempDir()
Expand Down
1 change: 0 additions & 1 deletion pkg/cmd/roachtest/tests/schemachange_random_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func registerSchemaChangeRandomLoad(r registry.Registry) {
}
runSchemaChangeRandomLoad(ctx, t, c, maxOps, concurrency)
},
Skip: "flaky: https://github.com/cockroachdb/cockroach/issues/82133",
})

// Run a few representative scbench specs in CI.
Expand Down
125 changes: 100 additions & 25 deletions pkg/workload/schemachange/error_screening.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,17 @@ GROUP BY name;
}
// Next validate the uniqueness of both constraints and index expressions.
for constraintIdx, constraint := range constraints {
nonTupleConstraint := constraint[0]
if len(nonTupleConstraint) > 2 &&
nonTupleConstraint[0] == '(' &&
nonTupleConstraint[len(nonTupleConstraint)-1] == ')' {
nonTupleConstraint = nonTupleConstraint[1 : len(nonTupleConstraint)-1]
}
hasNullsQuery := strings.Builder{}
hasNullsQuery.WriteString("SELECT num_nulls(")
hasNullsQuery.WriteString(nonTupleConstraint)
hasNullsQuery.WriteString(") > 0 FROM (VALUES(")

tupleSelectQuery := strings.Builder{}
tupleSelectQuery.WriteString("SELECT array[(")
tupleSelectQuery.WriteString(constraint[0])
Expand All @@ -361,7 +372,7 @@ GROUP BY name;
}
collector := newExprColumnCollector(colInfo)
t.Walk(collector)
query.WriteString("SELECT EXISTS ( SELECT * FROM ")
query.WriteString("SELECT COUNT (*) > 0 FROM (SELECT * FROM ")
query.WriteString(tableName.String())
query.WriteString(" WHERE ")
query.WriteString(constraint[0])
Expand All @@ -370,30 +381,23 @@ GROUP BY name;
query.WriteString(constraint[0])
query.WriteString(" FROM (VALUES( ")
colIdx := 0
nullValueEncountered := false
for col := range collector.columnsObserved {
value := columnsToValues[col]
if colIdx != 0 {
query.WriteString(",")
columns.WriteString(",")
tupleSelectQuery.WriteString(",")
}
if value == "NULL" {
nullValueEncountered = true
break
hasNullsQuery.WriteString(",")
}
query.WriteString(value)
columns.WriteString(col)
hasNullsQuery.WriteString(value)
tupleSelectQuery.WriteString(value)
colIdx++
}
// Row is not comparable to others for unique constraints, since it has a
// NULL value.
// TODO (fqazi): In the future for check constraints we should evaluate
// things for them.
if nullValueEncountered {
continue
}
hasNullsQuery.WriteString(") ) AS T(")
hasNullsQuery.WriteString(columns.String())
hasNullsQuery.WriteString(")")
tupleSelectQuery.WriteString(") ) AS T(")
tupleSelectQuery.WriteString(columns.String())
tupleSelectQuery.WriteString(")")
Expand All @@ -404,24 +408,53 @@ GROUP BY name;
if err != nil {
return false, nil, err
}
exists, err := og.scanBool(ctx, evalTxn, query.String())
if err != nil {
// Detect if any null values exist.
handleEvalTxnError := func(err error) (bool, error) {
// No choice but to rollback, expression is malformed.
rollbackErr := evalTxn.Rollback(ctx)
if rollbackErr != nil {
return false, err
}
var pgErr *pgconn.PgError
if !errors.As(err, &pgErr) {
return false, nil, err
return false, err
}
// Only accept known error types for generated expressions.
if !isValidGenerationError(pgErr.Code) {
return false, nil, err
return false, err
}
generatedCodes = append(generatedCodes,
codesWithConditions{
{code: pgcode.MakeCode(pgErr.Code), condition: true},
}...,
)
return true, nil
}
hasNullValues, err := og.scanBool(ctx, evalTxn, hasNullsQuery.String())
if err != nil {
skipConstraint, err := handleEvalTxnError(err)
if err != nil {
return false, generatedCodes, err
}
if skipConstraint {
continue
}
}
// Skip if any null values exist in the expression
if hasNullValues {
continue
}
err = evalTxn.Rollback(ctx)
exists, err := og.scanBool(ctx, evalTxn, query.String())
if err != nil {
skipConstraint, err := handleEvalTxnError(err)
if err != nil {
return false, generatedCodes, err
}
if skipConstraint {
continue
}
}
err = evalTxn.Commit(ctx)
if err != nil {
return false, nil, err
}
Expand Down Expand Up @@ -1079,12 +1112,13 @@ func (og *operationGenerator) violatesFkConstraints(
childColumnName := constraint[3]

// If self referential, there cannot be a violation.
if parentTableSchema == tableName.Schema() && parentTableName == tableName.Object() && parentColumnName == childColumnName {
parentAndChildAreSame := parentTableSchema == tableName.Schema() && parentTableName == tableName.Object()
if parentAndChildAreSame && parentColumnName == childColumnName {
continue
}

violation, err := og.violatesFkConstraintsHelper(
ctx, tx, columnNameToIndexMap, parentTableSchema, parentTableName, parentColumnName, childColumnName, row,
ctx, tx, columnNameToIndexMap, parentTableSchema, parentTableName, parentColumnName, tableName.String(), childColumnName, parentAndChildAreSame, row, rows,
)
if err != nil {
return false, err
Expand All @@ -1104,20 +1138,61 @@ func (og *operationGenerator) violatesFkConstraintsHelper(
ctx context.Context,
tx pgx.Tx,
columnNameToIndexMap map[string]int,
parentTableSchema, parentTableName, parentColumn, childColumn string,
parentTableSchema, parentTableName, parentColumn, childTableName, childColumn string,
parentAndChildAreSameTable bool,
row []string,
allRows [][]string,
) (bool, error) {

// If the value to insert in the child column is NULL and the column default is NULL, then it is not possible to have a fk violation.
childValue := row[columnNameToIndexMap[childColumn]]
if childValue == "NULL" {
return false, nil
}

// If the parent and child are the same table, then any rows in an existing
// insert may satisfy the same constraint.
var parentAndChildSameQueryColumns []string
if parentAndChildAreSameTable {
colsInfo, err := og.getTableColumns(ctx, tx, childTableName, false)
if err != nil {
return false, err
}
// Put values to be inserted into a column name to value map to simplify lookups.
columnsToValues := map[string]string{}
for name, idx := range columnNameToIndexMap {
columnsToValues[name] = row[idx]
}
colIdx := 0
for idx, colInfo := range colsInfo {
if colInfo.name == parentColumn {
colIdx = idx
break
}
}
for _, otherRow := range allRows {
parentValueInSameInsert := otherRow[columnNameToIndexMap[parentColumn]]
// If the parent column is generated, spend time to generate the value.
if colsInfo[colIdx].generated {
var err error
parentValueInSameInsert, err = og.generateColumn(ctx, tx, colsInfo[colIdx], columnsToValues)
if err != nil {
return false, err
}
}
parentAndChildSameQueryColumns = append(parentAndChildSameQueryColumns,
fmt.Sprintf("%s = %s", parentValueInSameInsert, childValue))
}
}
checkSharedParentChildRows := ""
if len(parentAndChildSameQueryColumns) > 0 {
checkSharedParentChildRows = fmt.Sprintf("true = ANY (ARRAY [%s]) OR",
strings.Join(parentAndChildSameQueryColumns, ","))
}
return og.scanBool(ctx, tx, fmt.Sprintf(`
SELECT count(*) = 0 from %s.%s
WHERE %s = %s
`, parentTableSchema, parentTableName, parentColumn, childValue))
SELECT %s count(*) = 0 from %s.%s
WHERE %s = (%s)
`,
checkSharedParentChildRows, parentTableSchema, parentTableName, parentColumn, childValue))
}

func (og *operationGenerator) columnIsInDroppingIndex(
Expand Down
4 changes: 2 additions & 2 deletions pkg/workload/schemachange/operation_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@ var opWeights = []int{
setColumnNotNull: 1,
setColumnType: 0, // Disabled and tracked with #66662.
survive: 1,
insertRow: 1, // Temporarily reduced because of #80820
validate: 2, // validate twice more often
insertRow: 10, // Temporarily reduced because of #80820
validate: 2, // validate twice more often
}

// adjustOpWeightsForActiveVersion adjusts the weights for the active cockroach
Expand Down

0 comments on commit c030b8b

Please sign in to comment.