Skip to content

Commit

Permalink
sql: support the ADD COLUMN ... REFERENCES syntax
Browse files Browse the repository at this point in the history
Fixes #32917.

This PR adds support for the add column references
statement by allowing the foreign key building code
to use columns and indexes added in the current txn.
The schema changer already understands how to add
the combination of the three in the same transaction.

Release note (sql change): This PR adds support for the
`ALTER TABLE ... ADD COLUMN ... REFERENCES ...` syntax
for tables that are empty.
  • Loading branch information
rohany committed Apr 27, 2020
1 parent ef2b897 commit 6ccf57e
Show file tree
Hide file tree
Showing 8 changed files with 247 additions and 54 deletions.
10 changes: 1 addition & 9 deletions pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,6 @@ func (n *alterTableNode) startExec(params runParams) error {
switch t := cmd.(type) {
case *tree.AlterTableAddColumn:
d := t.ColumnDef
if d.HasFKConstraint() {
return unimplemented.NewWithIssue(32917,
"adding a REFERENCES constraint while also adding a column via ALTER not supported")
}
version := params.ExecCfg().Settings.Version.ActiveVersionOrEmpty(params.ctx)
if supported, err := isTypeSupportedInVersion(version, d.Type); err != nil {
return err
Expand Down Expand Up @@ -317,12 +313,8 @@ func (n *alterTableNode) startExec(params runParams) error {

case *tree.ForeignKeyConstraintTableDef:
for _, colName := range d.FromCols {
col, err := n.tableDesc.FindActiveColumnByName(string(colName))
col, err := n.tableDesc.FindActiveOrNewColumnByName(colName)
if err != nil {
if _, dropped, inactiveErr := n.tableDesc.FindColumnByName(colName); inactiveErr == nil && !dropped {
return unimplemented.NewWithIssue(32917,
"adding a REFERENCES constraint while the column is being added not supported")
}
return err
}

Expand Down
36 changes: 18 additions & 18 deletions pkg/sql/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,16 +608,16 @@ func ResolveFK(
validationBehavior tree.ValidationBehavior,
settings *cluster.Settings,
) error {
originColumnIDs := make(sqlbase.ColumnIDs, len(d.FromCols))
originColumns := make([]*sqlbase.ColumnDescriptor, len(d.FromCols))
for i, col := range d.FromCols {
col, _, err := tbl.FindColumnByName(col)
col, err := tbl.FindActiveOrNewColumnByName(col)
if err != nil {
return err
}
if err := col.CheckCanBeFKRef(); err != nil {
return err
}
originColumnIDs[i] = col.ID
originColumns[i] = col
}

target, err := ResolveMutableExistingObject(ctx, sc, &d.Table, true /*required*/, ResolveRequireTableDesc)
Expand Down Expand Up @@ -658,11 +658,6 @@ func ResolveFK(
}
}

srcCols, err := tbl.FindActiveColumnsByNames(d.FromCols)
if err != nil {
return err
}

targetColNames := d.ToCols
// If no columns are specified, attempt to default to PK.
if len(targetColNames) == 0 {
Expand All @@ -677,14 +672,14 @@ func ResolveFK(
return err
}

if len(targetCols) != len(srcCols) {
if len(targetCols) != len(originColumns) {
return pgerror.Newf(pgcode.Syntax,
"%d columns must reference exactly %d columns in referenced table (found %d)",
len(srcCols), len(srcCols), len(targetCols))
len(originColumns), len(originColumns), len(targetCols))
}

for i := range srcCols {
if s, t := srcCols[i], targetCols[i]; !s.Type.Equivalent(&t.Type) {
for i := range originColumns {
if s, t := originColumns[i], targetCols[i]; !s.Type.Equivalent(&t.Type) {
return pgerror.Newf(pgcode.DatatypeMismatch,
"type of %q (%s) does not match foreign key %q.%q (%s)",
s.Name, s.Type.String(), target.Name, t.Name, t.Type.String())
Expand Down Expand Up @@ -723,7 +718,7 @@ func ResolveFK(
// Don't add a SET NULL action on an index that has any column that is NOT
// NULL.
if d.Actions.Delete == tree.SetNull || d.Actions.Update == tree.SetNull {
for _, sourceColumn := range srcCols {
for _, sourceColumn := range originColumns {
if !sourceColumn.Nullable {
col := qualifyFKColErrorWithDB(ctx, txn, tbl.TableDesc(), sourceColumn.Name)
return pgerror.Newf(pgcode.InvalidForeignKey,
Expand All @@ -736,7 +731,7 @@ func ResolveFK(
// Don't add a SET DEFAULT action on an index that has any column that has
// a DEFAULT expression of NULL and a NOT NULL constraint.
if d.Actions.Delete == tree.SetDefault || d.Actions.Update == tree.SetDefault {
for _, sourceColumn := range srcCols {
for _, sourceColumn := range originColumns {
// Having a default expression of NULL, and a constraint of NOT NULL is a
// contradiction and should never be allowed.
if sourceColumn.DefaultExpr == nil && !sourceColumn.Nullable {
Expand All @@ -749,10 +744,15 @@ func ResolveFK(
}
}

originColumnIDs := make(sqlbase.ColumnIDs, len(originColumns))
for i, col := range originColumns {
originColumnIDs[i] = col.ID
}
var legacyOriginIndexID sqlbase.IndexID
// Search for an index on the origin table that matches. If one doesn't exist,
// we create one automatically if the table to alter is new or empty.
originIdx, err := sqlbase.FindFKOriginIndex(tbl.TableDesc(), originColumnIDs)
// we create one automatically if the table to alter is new or empty. We also
// search if an index for the set of columns was created in this transaction.
originIdx, err := sqlbase.FindFKOriginIndexInTxn(tbl, originColumnIDs)
if err == nil {
// If there was no error, we found a suitable index.
legacyOriginIndexID = originIdx.ID
Expand All @@ -775,7 +775,7 @@ func ResolveFK(
return pgerror.Newf(pgcode.ForeignKeyViolation,
"foreign key requires an existing index on columns %s", colNames.String())
}
id, err := addIndexForFK(tbl, srcCols, constraintName, ts)
id, err := addIndexForFK(tbl, originColumns, constraintName, ts)
if err != nil {
return err
}
Expand Down Expand Up @@ -825,7 +825,7 @@ func ResolveFK(
// that will support using `srcCols` as the referencing (src) side of an FK.
func addIndexForFK(
tbl *sqlbase.MutableTableDescriptor,
srcCols []sqlbase.ColumnDescriptor,
srcCols []*sqlbase.ColumnDescriptor,
constraintName string,
ts FKTableState,
) (sqlbase.IndexID, error) {
Expand Down
162 changes: 147 additions & 15 deletions pkg/sql/logictest/testdata/logic_test/alter_table
Original file line number Diff line number Diff line change
Expand Up @@ -381,9 +381,6 @@ ALTER TABLE t ADD i INT DEFAULT 1 CHECK (i < g)
statement error pq: validation of CHECK "i > 0" failed on row:.* g=1.* i=0
ALTER TABLE t ADD i INT AS (g - 1) STORED CHECK (i > 0)

statement error adding a REFERENCES constraint while also adding a column via ALTER not supported
ALTER TABLE t ADD f INT UNIQUE REFERENCES other

query TTTTB
SHOW CONSTRAINTS FROM t
----
Expand Down Expand Up @@ -926,18 +923,6 @@ CREATE TABLE t32917_2 (b INT PRIMARY KEY)
statement ok
INSERT INTO t32917_2 VALUES (1), (2), (3)

statement ok
BEGIN

statement ok
ALTER TABLE t32917_2 ADD c INT UNIQUE DEFAULT 4

statement error adding a REFERENCES constraint while the column is being added not supported
ALTER TABLE t32917_2 ADD CONSTRAINT fk_c_a FOREIGN KEY (c) references t32917 (a)

statement ok
ROLLBACK

# Test SET NOT NULL
statement ok
CREATE TABLE t (a INT)
Expand Down Expand Up @@ -1135,3 +1120,150 @@ CREATE TABLE t25045 (x INT, y INT AS (x+1) STORED)

statement error pq: column \"x\" is referenced by computed column \"y\"
ALTER TABLE t25045 DROP COLUMN x

subtest add_col_references

statement ok
DROP TABLE IF EXISTS t1, t2;
CREATE TABLE t1 (x INT PRIMARY KEY);
CREATE TABLE t2 (y INT)

statement ok
ALTER TABLE t2 ADD COLUMN x INT REFERENCES t1 (x)

statement ok
INSERT INTO t1 VALUES (1)

statement error pq: insert on table "t2" violates foreign key constraint "fk_x_ref_t1"
INSERT INTO t2 VALUES (2, 2)

statement ok
INSERT INTO t2 VALUES (1, 1)

# Error out trying to add a column with a foreign key on a non-empty table.
statement error pq: foreign key requires an existing index on columns \("z"\)
ALTER TABLE t2 ADD COLUMN z INT REFERENCES t1 (x)

# Check that the foreign key was indeed added.
query TT
SHOW CREATE t2
----
t2 CREATE TABLE t2 (
y INT8 NULL,
x INT8 NULL,
CONSTRAINT fk_x_ref_t1 FOREIGN KEY (x) REFERENCES t1(x),
INDEX t2_auto_index_fk_x_ref_t1 (x ASC),
FAMILY "primary" (y, rowid, x)
)

# Test that only one index gets created when adding a column
# with references and unique.
statement ok
CREATE TABLE t3 (y INT)

statement ok
ALTER TABLE t3 ADD COLUMN x INT UNIQUE REFERENCES t1 (x)

query TT
SHOW CREATE t3
----
t3 CREATE TABLE t3 (
y INT8 NULL,
x INT8 NULL,
CONSTRAINT fk_x_ref_t1 FOREIGN KEY (x) REFERENCES t1(x),
UNIQUE INDEX t3_x_key (x ASC),
FAMILY "primary" (y, rowid, x)
)

# We allowed the foreign key validation code to look into the mutations
# list to validate what columns / indexes can be used for foreign keys.
# Ensure that we still have the correct restrictions.
statement ok
DROP TABLE t1, t2 CASCADE;
CREATE TABLE t1 (x INT PRIMARY KEY);
CREATE TABLE t2 (x INT, y INT, INDEX i (x))

statement error pq: column \"x\" does not exist
BEGIN;
ALTER TABLE t2 DROP COLUMN x;
ALTER TABLE t2 ADD FOREIGN KEY (x) REFERENCES t1(x);

statement ok
ROLLBACK

statement ok
INSERT INTO t2 VALUES (1, 2)

statement error pq: foreign key requires an existing index on columns \("x"\)
BEGIN;
DROP INDEX t2@i;
ALTER TABLE t2 ADD FOREIGN KEY (x) REFERENCES t1(x)

statement ok
ROLLBACK

# Test using ADD COL REFERENCES in a self referencing constraint.
statement ok
DROP TABLE t1 CASCADE;
CREATE TABLE t1 (x INT PRIMARY KEY);
ALTER TABLE t1 ADD COLUMN x2 INT REFERENCES t1 (x)

query TT
SHOW CREATE t1
----
t1 CREATE TABLE t1 (
x INT8 NOT NULL,
x2 INT8 NULL,
CONSTRAINT "primary" PRIMARY KEY (x ASC),
CONSTRAINT fk_x2_ref_t1 FOREIGN KEY (x2) REFERENCES t1(x),
INDEX t1_auto_index_fk_x2_ref_t1 (x2 ASC),
FAMILY "primary" (x, x2)
)

statement error pq: insert on table "t1" violates foreign key constraint "fk_x2_ref_t1"
INSERT INTO t1 VALUES (1, 2)

# Test ADD COL REFERENCES on a new table in the same txn.
statement ok
DROP TABLE t1, t2 CASCADE

statement ok
BEGIN;
CREATE TABLE t1 (x INT PRIMARY KEY);
CREATE TABLE t2 (y INT);
ALTER TABLE t2 ADD COLUMN x INT REFERENCES t1 (x);
COMMIT

query TT
SHOW CREATE t2
----
t2 CREATE TABLE t2 (
y INT8 NULL,
x INT8 NULL,
CONSTRAINT fk_x_ref_t1 FOREIGN KEY (x) REFERENCES t1(x),
INDEX t2_auto_index_fk_x_ref_t1 (x ASC),
FAMILY "primary" (y, rowid, x)
)

# Test that we can also add a column and then an FK in the same txn.
statement ok
DROP TABLE t1, t2 CASCADE;

statement ok
BEGIN;
CREATE TABLE t1 (x INT PRIMARY KEY);
CREATE TABLE t2 (y INT);
ALTER TABLE t2 ADD COLUMN x INT;
ALTER TABLE t2 ADD FOREIGN KEY (x) REFERENCES t1 (x);
COMMIT

query TT
SHOW CREATE t2
----
t2 CREATE TABLE t2 (
y INT8 NULL,
x INT8 NULL,
CONSTRAINT fk_x_ref_t1 FOREIGN KEY (x) REFERENCES t1(x),
INDEX t2_auto_index_fk_x_ref_t1 (x ASC),
FAMILY "primary" (y, rowid, x)
)
6 changes: 0 additions & 6 deletions pkg/sql/logictest/testdata/logic_test/fk
Original file line number Diff line number Diff line change
Expand Up @@ -2525,12 +2525,6 @@ BEGIN; ALTER TABLE parentid ADD id INT NOT NULL AS (k + 2) STORED; ALTER TABLE c
statement ok
ROLLBACK;

statement error adding a REFERENCES constraint while the column is being added not supported
BEGIN; ALTER TABLE childid ADD id2 INT UNIQUE NOT NULL DEFAULT 0; ALTER TABLE childid ADD CONSTRAINT fk_id FOREIGN KEY (id2) REFERENCES parentid (k);

statement ok
ROLLBACK;

subtest dont_check_nulls
# Make sure that nulls are never checked while executing FK constraints.

Expand Down
6 changes: 0 additions & 6 deletions pkg/sql/logictest/testdata/logic_test/fk_opt
Original file line number Diff line number Diff line change
Expand Up @@ -2769,12 +2769,6 @@ BEGIN; ALTER TABLE parentid ADD id INT NOT NULL AS (k + 2) STORED; ALTER TABLE c
statement ok
ROLLBACK;

statement error adding a REFERENCES constraint while the column is being added not supported
BEGIN; ALTER TABLE childid ADD id2 INT UNIQUE NOT NULL DEFAULT 0; ALTER TABLE childid ADD CONSTRAINT fk_id FOREIGN KEY (id2) REFERENCES parentid (k);

statement ok
ROLLBACK;

subtest dont_check_nulls
# Make sure that nulls are never checked while executing FK constraints.

Expand Down
20 changes: 20 additions & 0 deletions pkg/sql/sem/tree/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,26 @@ func (node *AlterTable) HoistAddColumnConstraints() {
)
}
d.CheckExprs = nil
if d.HasFKConstraint() {
var targetCol NameList
if d.References.Col != "" {
targetCol = append(targetCol, d.References.Col)
}
fk := &ForeignKeyConstraintTableDef{
Table: *d.References.Table,
FromCols: NameList{d.Name},
ToCols: targetCol,
Name: d.References.ConstraintName,
Actions: d.References.Actions,
Match: d.References.Match,
}
constraint := &AlterTableAddConstraint{
ConstraintDef: fk,
ValidationBehavior: ValidationDefault,
}
normalizedCmds = append(normalizedCmds, constraint)
d.References.Table = nil
}
}
}
node.Cmds = normalizedCmds
Expand Down
24 changes: 24 additions & 0 deletions pkg/sql/sqlbase/structured.go
Original file line number Diff line number Diff line change
Expand Up @@ -2526,6 +2526,30 @@ func (desc *TableDescriptor) FindColumnByName(name tree.Name) (*ColumnDescriptor
return nil, false, NewUndefinedColumnError(string(name))
}

// FindActiveOrNewColumnByName finds the column with the specified name.
// It returns either an active column or a column that was added in the
// same transaction that is currently running.
func (desc *MutableTableDescriptor) FindActiveOrNewColumnByName(
name tree.Name,
) (*ColumnDescriptor, error) {
for i := range desc.Columns {
c := &desc.Columns[i]
if c.Name == string(name) {
return c, nil
}
}
currentMutationID := desc.ClusterVersion.NextMutationID
for i := range desc.Mutations {
mut := &desc.Mutations[i]
if col := mut.GetColumn(); col != nil &&
mut.MutationID == currentMutationID &&
mut.Direction == DescriptorMutation_ADD {
return col, nil
}
}
return nil, NewUndefinedColumnError(string(name))
}

// ColumnIdxMap returns a map from Column ID to the ordinal position of that
// column.
func (desc *TableDescriptor) ColumnIdxMap() map[ColumnID]int {
Expand Down
Loading

0 comments on commit 6ccf57e

Please sign in to comment.