Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

importccl: support unchecked foreign keys in IMPORT PGDUMP #27425

Merged
merged 1 commit into from
Jul 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/ccl/importccl/csv_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ func TestMakeSimpleTableDescriptorErrors(t *testing.T) {
},
{
stmt: "create table a (i int references b (id))",
error: `foreign keys not supported: FOREIGN KEY \(i\) REFERENCES b \(id\)`,
error: `table "b" not found`,
},
{
stmt: "create table a (i int, constraint a foreign key (i) references c (id))",
error: `foreign keys not supported: CONSTRAINT a FOREIGN KEY \(i\) REFERENCES c \(id\)`,
error: `table "c" not found`,
},
{
stmt: `create table a (
Expand All @@ -71,7 +71,7 @@ func TestMakeSimpleTableDescriptorErrors(t *testing.T) {
if !ok {
t.Fatal("expected CREATE TABLE statement in table file")
}
_, err = MakeSimpleTableDescriptor(ctx, st, create, defaultCSVParentID, defaultCSVTableID, 0)
_, err = MakeSimpleTableDescriptor(ctx, st, create, defaultCSVParentID, defaultCSVTableID, nil, 0)
if !testutils.IsError(err, tc.error) {
t.Fatalf("expected %v, got %+v", tc.error, err)
}
Expand Down
97 changes: 91 additions & 6 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -129,6 +130,7 @@ func MakeSimpleTableDescriptor(
create *tree.CreateTable,
parentID,
tableID sqlbase.ID,
otherTables fkResolver,
walltime int64,
) (*sqlbase.TableDescriptor, error) {
sql.HoistConstraints(create)
Expand All @@ -153,7 +155,8 @@ func MakeSimpleTableDescriptor(
return nil, errors.Errorf("computed columns not supported: %s", tree.AsString(def))
}
case *tree.ForeignKeyConstraintTableDef:
return nil, errors.Errorf("foreign keys not supported: %s", tree.AsString(def))
n := tree.MakeTableName("", tree.Name(def.Table.TableNameReference.String()))
def.Table.TableNameReference = &n
default:
return nil, errors.Errorf("unsupported table definition: %s", tree.AsString(def))
}
Expand All @@ -163,28 +166,42 @@ func MakeSimpleTableDescriptor(
CtxProvider: ctxProvider{ctx},
Sequence: &importSequenceOperators{},
}
affected := make(map[sqlbase.ID]*sqlbase.TableDescriptor)
tableDesc, err := sql.MakeTableDesc(
ctx,
nil, /* txn */
nil, /* vt */
otherTables,
st,
create,
parentID,
tableID,
hlc.Timestamp{WallTime: walltime},
sqlbase.NewDefaultPrivilegeDescriptor(),
nil, /* affected */
affected,
&semaCtx,
&evalCtx,
)
if err != nil {
return nil, err
}
// If the table had a FK, it was put into the ADD state and its references were marked as validated. We need to undo those changes.
tableDesc.State = sqlbase.TableDescriptor_PUBLIC
if err := tableDesc.ForeachNonDropIndex(func(idx *sqlbase.IndexDescriptor) error {
if idx.ForeignKey.IsSet() {
idx.ForeignKey.Validity = sqlbase.ConstraintValidity_Unvalidated
}
return nil
}); err != nil {
return nil, err
}

return &tableDesc, nil
}

var errSequenceOperators = errors.New("sequence operations unsupported")
// Sentinel errors returned by IMPORT's stub implementations of the
// sequence-operator and schema-resolution interfaces, neither of which is
// supported during IMPORT.
var (
	errSequenceOperators = errors.New("sequence operations unsupported")
	errSchemaResolver    = errors.New("schema resolver unsupported")
)

// Implements the tree.SequenceOperators interface.
type importSequenceOperators struct {
Expand Down Expand Up @@ -230,6 +247,58 @@ func (so *importSequenceOperators) SetSequenceValue(
return errSequenceOperators
}

// fkResolver resolves foreign-key references against tables created earlier
// in the same IMPORT: it maps a table's name to the in-memory descriptor
// already built for it, with no transaction or catalog access involved.
type fkResolver map[string]*sqlbase.TableDescriptor

// Compile-time check that fkResolver satisfies sql.SchemaResolver.
var _ sql.SchemaResolver = fkResolver{}

// Implements the sql.SchemaResolver interface.
func (r fkResolver) Txn() *client.Txn {
return nil
}

// Implements the sql.SchemaResolver interface.
func (r fkResolver) LogicalSchemaAccessor() sql.SchemaAccessor {
return nil
}

// Implements the sql.SchemaResolver interface.
func (r fkResolver) CurrentDatabase() string {
return ""
}

// Implements the sql.SchemaResolver interface.
func (r fkResolver) CurrentSearchPath() sessiondata.SearchPath {
return sessiondata.SearchPath{}
}

// Implements the sql.SchemaResolver interface.
func (r fkResolver) CommonLookupFlags(ctx context.Context, required bool) sql.CommonLookupFlags {
return sql.CommonLookupFlags{}
}

// Implements the sql.SchemaResolver interface.
func (r fkResolver) ObjectLookupFlags(ctx context.Context, required bool) sql.ObjectLookupFlags {
return sql.ObjectLookupFlags{}
}

// Implements the tree.TableNameExistingResolver interface.
func (r fkResolver) LookupObject(
ctx context.Context, dbName, scName, obName string,
) (found bool, objMeta tree.NameResolutionResult, err error) {
tbl, ok := r[obName]
if ok {
return true, tbl, nil
}
return false, nil, errors.Errorf("table %q not found in tables previously defined in the same IMPORT", obName)
}

// Implements the tree.TableNameTargetResolver interface.
func (r fkResolver) LookupSchema(
ctx context.Context, dbName, scName string,
) (found bool, scMeta tree.SchemaMeta, err error) {
return false, nil, errSchemaResolver
}

const csvDatabaseName = "csv"

func finalizeCSVBackup(
Expand Down Expand Up @@ -626,7 +695,7 @@ func importPlanHook(
}

tbl, err := MakeSimpleTableDescriptor(
ctx, p.ExecCfg().Settings, create, parentID, defaultCSVTableID, walltime)
ctx, p.ExecCfg().Settings, create, parentID, defaultCSVTableID, nil, walltime)
if err != nil {
return err
}
Expand Down Expand Up @@ -659,12 +728,28 @@ func importPlanHook(
// restoring. We do this last because we want to avoid calling
// GenerateUniqueDescID if there's any kind of error above.
// Reserving a table ID now means we can avoid the rekey work during restore.
tableRewrites := make(map[sqlbase.ID]sqlbase.ID)
for _, tableDesc := range tableDescs {
tableDesc.ID, err = sql.GenerateUniqueDescID(ctx, p.ExecCfg().DB)
tableRewrites[tableDesc.ID], err = sql.GenerateUniqueDescID(ctx, p.ExecCfg().DB)
if err != nil {
return err
}
}
// Now that we have all the new table IDs rewrite them along with FKs.
for _, tableDesc := range tableDescs {
tableDesc.ID = tableRewrites[tableDesc.ID]
if err := tableDesc.ForeachNonDropIndex(func(idx *sqlbase.IndexDescriptor) error {
if idx.ForeignKey.IsSet() {
idx.ForeignKey.Table = tableRewrites[idx.ForeignKey.Table]
}
for i, fk := range idx.ReferencedBy {
idx.ReferencedBy[i].Table = tableRewrites[fk.Table]
}
return nil
}); err != nil {
return err
}
}
}

tableDetails := make([]jobspb.ImportDetails_Table, 0, len(tableDescs))
Expand Down
102 changes: 93 additions & 9 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,14 @@ func TestImportData(t *testing.T) {
sqlDB.Exec(t, `CREATE DATABASE d; USE d`)

tests := []struct {
name string
create string
with string
typ string
data string
err string
query map[string][][]string
name string
create string
with string
typ string
data string
err string
cleanup string
query map[string][][]string
}{
{
name: "duplicate unique index key",
Expand Down Expand Up @@ -412,6 +413,37 @@ COPY t (a, b, c) FROM stdin;
`,
err: "expected 2 columns, got 3",
},
{
name: "fk",
typ: "PGDUMP",
data: testPgdumpFk,
query: map[string][][]string{
`SHOW TABLES`: {{"cities"}, {"weather"}},
`SELECT city FROM cities`: {{"Berkeley"}},
`SELECT city FROM weather`: {{"Berkeley"}},

`SELECT dependson_name
FROM crdb_internal.backward_dependencies
`: {{"weather_city_fkey"}},

`SELECT create_statement
FROM crdb_internal.create_statements
WHERE descriptor_name in ('cities', 'weather')
ORDER BY descriptor_name
`: {{testPgdumpCreateCities}, {testPgdumpCreateWeather}},

// Verify the constraint is unvalidated.
`SHOW CONSTRAINTS FROM weather
`: {{"weather", "weather_city_fkey", "FOREIGN KEY", "FOREIGN KEY (city) REFERENCES cities (city)", "false"}},
},
cleanup: `DROP TABLE cities, weather`,
},
{
name: "fk unreferenced",
typ: "TABLE weather FROM PGDUMP",
data: testPgdumpFk,
err: `table "cities" not found`,
},

// Error
{
Expand All @@ -438,7 +470,7 @@ COPY t (a, b, c) FROM stdin;

for _, tc := range tests {
t.Run(fmt.Sprintf("%s: %s", tc.typ, tc.name), func(t *testing.T) {
sqlDB.Exec(t, `DROP TABLE IF EXISTS d.t`)
sqlDB.Exec(t, `DROP TABLE IF EXISTS d.t, t`)
var q string
if tc.create != "" {
q = fmt.Sprintf(`IMPORT TABLE d.t (%s) %s DATA ($1) %s`, tc.create, tc.typ, tc.with)
Expand All @@ -454,6 +486,9 @@ COPY t (a, b, c) FROM stdin;
for query, res := range tc.query {
sqlDB.CheckQueryResults(t, query, res)
}
if tc.cleanup != "" {
sqlDB.Exec(t, tc.cleanup)
}
})
}

Expand All @@ -465,6 +500,55 @@ COPY t (a, b, c) FROM stdin;
})
}

// Fixtures for the PGDUMP foreign-key IMPORT tests.
const (
	// testPgdumpCreateCities is the expected create_statement (from
	// crdb_internal.create_statements) for the cities table after importing
	// testPgdumpFk.
	testPgdumpCreateCities = `CREATE TABLE cities (
city STRING(80) NOT NULL,
CONSTRAINT cities_pkey PRIMARY KEY (city ASC),
FAMILY "primary" (city)
)`
	// testPgdumpCreateWeather is the expected create_statement for the
	// weather table, including the FK constraint back to cities and the
	// index auto-created for it.
	testPgdumpCreateWeather = `CREATE TABLE weather (
city STRING(80) NULL,
temp_lo INTEGER NULL,
temp_hi INTEGER NULL,
prcp REAL NULL,
date DATE NULL,
CONSTRAINT weather_city_fkey FOREIGN KEY (city) REFERENCES cities (city),
INDEX weather_auto_index_weather_city_fkey (city ASC),
FAMILY "primary" (city, temp_lo, temp_hi, prcp, date, rowid)
)`
	// testPgdumpFk is a pg_dump-style script: two tables linked by a foreign
	// key, COPY data for each, and ALTER TABLE statements adding the PK and
	// FK constraints after the data.
	// NOTE(review): real pg_dump COPY rows are tab-delimited — confirm the
	// whitespace in the weather row survived copy/paste intact.
	testPgdumpFk = `
CREATE TABLE cities (
city character varying(80) NOT NULL
);

ALTER TABLE cities OWNER TO postgres;

CREATE TABLE weather (
city character varying(80),
temp_lo integer,
temp_hi integer,
prcp real,
date date
);

ALTER TABLE weather OWNER TO postgres;

COPY cities (city) FROM stdin;
Berkeley
\.

COPY weather (city, temp_lo, temp_hi, prcp, date) FROM stdin;
Berkeley 45 53 0 1994-11-28
\.

ALTER TABLE ONLY cities
ADD CONSTRAINT cities_pkey PRIMARY KEY (city);

ALTER TABLE ONLY weather
ADD CONSTRAINT weather_city_fkey FOREIGN KEY (city) REFERENCES cities(city);
`
)

// TODO(dt): switch to a helper in sampledataccl.
func makeCSVData(
t testing.TB, in string, numFiles, rowsPerFile int,
Expand Down Expand Up @@ -1136,7 +1220,7 @@ func BenchmarkConvertRecord(b *testing.B) {
create := stmt.(*tree.CreateTable)
st := cluster.MakeTestingClusterSettings()

tableDesc, err := MakeSimpleTableDescriptor(ctx, st, create, sqlbase.ID(100), sqlbase.ID(100), 1)
tableDesc, err := MakeSimpleTableDescriptor(ctx, st, create, sqlbase.ID(100), sqlbase.ID(100), nil, 1)
if err != nil {
b.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func descForTable(t *testing.T, create string, parent, id sqlbase.ID) *sqlbase.T
t.Fatal(err)
}
stmt := parsed.(*tree.CreateTable)
table, err := MakeSimpleTableDescriptor(context.TODO(), nil, stmt, parent, id, testEvalCtx.StmtTimestamp.UnixNano())
table, err := MakeSimpleTableDescriptor(context.TODO(), nil, stmt, parent, id, nil, testEvalCtx.StmtTimestamp.UnixNano())
if err != nil {
t.Fatal(err)
}
Expand Down
12 changes: 10 additions & 2 deletions pkg/ccl/importccl/read_import_pgdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,18 +169,25 @@ func readPostgresCreateTable(
// we'd have to delete the index and row and modify the column family. This
// is much easier and probably safer too.
createTbl := make(map[string]*tree.CreateTable)
// We need to run MakeSimpleTableDescriptor on tables in the same order as
// seen in the SQL file to guarantee that dependencies exist before being used
// (for FKs and sequences).
var tableOrder []string
ps := newPostgreStream(input, max)
for {
stmt, err := ps.Next()
if err == io.EOF {
ret := make([]*sqlbase.TableDescriptor, 0, len(createTbl))
for _, create := range createTbl {
seenDescs := make(fkResolver)
for _, name := range tableOrder {
create := createTbl[name]
if create != nil {
id := sqlbase.ID(int(defaultCSVTableID) + len(ret))
desc, err := MakeSimpleTableDescriptor(evalCtx.Ctx(), settings, create, parentID, id, walltime)
desc, err := MakeSimpleTableDescriptor(evalCtx.Ctx(), settings, create, parentID, id, seenDescs, walltime)
if err != nil {
return nil, err
}
seenDescs[desc.Name] = desc
ret = append(ret, desc)
}
}
Expand Down Expand Up @@ -210,6 +217,7 @@ func readPostgresCreateTable(
} else {
createTbl[name] = stmt
}
tableOrder = append(tableOrder, name)
case *tree.CreateIndex:
name, err := getTableName(stmt.Table)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/partitionccl/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (t *partitioningTest) parse() error {
st := cluster.MakeTestingClusterSettings()
const parentID, tableID = keys.MinUserDescID, keys.MinUserDescID + 1
t.parsed.tableDesc, err = importccl.MakeSimpleTableDescriptor(
ctx, st, createTable, parentID, tableID, hlc.UnixNano())
ctx, st, createTable, parentID, tableID, nil, hlc.UnixNano())
if err != nil {
return err
}
Expand Down
Loading