importccl: prototype support for IMPORT INTO
This adds a prototype of incremental IMPORT, allowing importing CSV data
into an existing table, as opposed to only into a new table as with the
current IMPORT.

Unlike traditional IMPORT, which takes a specification of the table to
create, this takes a reference to an existing table into which it will
import data. Initially only CSV data, importing into a single table, is
supported (SQL dump files are typically dumps of an entire table, so it
seems less likely that we need to support them here for now).

Since the actual bulk ingestion is done via non-transactional AddSSTable
commands, the table must be taken offline during ingestion. The IMPORT
job begins by schema-changing the table to an offline 'IMPORTING' state
that should prevent leasing it, and moves it back to public when it
finishes (on success or failure, unlike a newly created table, which is
usually rolled back via a drop on failure).

Release note: none.
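
As a rough illustration of the lifecycle described above (flip the table to an
offline IMPORTING state, ingest, then bring it back to public), here is a
minimal Go sketch. The tableDesc and tableState types are simplified stand-ins
for illustration only, not the sqlbase descriptors or jobs machinery the commit
actually uses.

// Illustrative sketch only: simplified stand-in types, not the real
// sqlbase/jobs machinery touched by this commit.
package main

import "fmt"

type tableState int

const (
	statePublic tableState = iota
	stateImporting
)

type tableDesc struct {
	name    string
	version int
	state   tableState
}

// takeOffline marks the table IMPORTING and bumps its version so that, once
// every node has observed the new version, no lease permits reads or writes
// while non-transactional AddSSTable ingestion is in flight.
func takeOffline(d *tableDesc) {
	d.state = stateImporting
	d.version++
}

// bringOnline returns the table to public use. Unlike a table created by the
// IMPORT itself, an existing table is brought back online on both success and
// failure rather than being dropped.
func bringOnline(d *tableDesc) {
	d.state = statePublic
	d.version++
}

func main() {
	d := &tableDesc{name: "kv", version: 1, state: statePublic}
	takeOffline(d)
	// ...wait until every node sees the IMPORTING version, then ingest...
	fmt.Printf("ingesting into %s at version %d\n", d.name, d.version)
	bringOnline(d)
	fmt.Printf("%s back online at version %d\n", d.name, d.version)
}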
dt committed May 11, 2019
1 parent e0d3084 commit d074ad6
Showing 2 changed files with 169 additions and 124 deletions.
290 changes: 166 additions & 124 deletions pkg/ccl/importccl/import_stmt.go
@@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
@@ -129,7 +130,7 @@ func importPlanHook(
}

var createFileFn func() (string, error)
if !importStmt.Bundle && importStmt.CreateDefs == nil {
if !importStmt.Bundle && !importStmt.Into && importStmt.CreateDefs == nil {
createFileFn, err = p.TypeAsString(importStmt.CreateFile, "IMPORT")
if err != nil {
return nil, nil, nil, false, err
@@ -383,163 +384,191 @@ func importPlanHook(
}
}

var tableDescs []*sqlbase.TableDescriptor
var jobDesc string
seqVals := make(map[sqlbase.ID]int64)
if importStmt.Bundle {
store, err := storageccl.ExportStorageFromURI(ctx, files[0], p.ExecCfg().Settings)
if err != nil {
return err
}
defer store.Close()
raw, err := store.ReadFile(ctx, "")
if err != nil {
return err
}
defer raw.Close()
reader, err := decompressingReader(raw, files[0], format.Compression)
if err != nil {
return err
}
defer reader.Close()
var tableDetails []jobspb.ImportDetails_Table
jobDesc, err := importJobDescription(importStmt, nil, files, opts)
if err != nil {
return err
}

var match string
if table != nil {
match = table.TableName.String()
}
fks := fkHandler{skip: skipFKs, allowed: true, resolver: make(fkResolver)}
switch format.Format {
case roachpb.IOFileFormat_Mysqldump:
evalCtx := &p.ExtendedEvalContext().EvalContext
tableDescs, err = readMysqlCreateTable(ctx, reader, evalCtx, defaultCSVTableID, parentID, match, fks, seqVals)
case roachpb.IOFileFormat_PgDump:
evalCtx := &p.ExtendedEvalContext().EvalContext
tableDescs, err = readPostgresCreateTable(reader, evalCtx, p.ExecCfg().Settings, match, parentID, walltime, fks, int(format.PgDump.MaxRowSize))
default:
return errors.Errorf("non-bundle format %q does not support reading schemas", format.Format.String())
}
if importStmt.Into {
found, err := p.ResolveMutableTableDescriptor(ctx, table, true, sql.ResolveRequireTableDesc)
if err != nil {
return err
}
if tableDescs == nil && table != nil {
return errors.Errorf("table definition not found for %q", table.TableName.String())
}

descStr, err := importJobDescription(importStmt, nil, files, opts)
if err != nil {
// Take the table offline for import.
importing := found.TableDescriptor
importing.State = sqlbase.TableDescriptor_IMPORTING
importing.Version++
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
return errors.Wrap(
txn.CPut(ctx, sqlbase.MakeDescMetadataKey(found.TableDescriptor.ID),
sqlbase.WrapDescriptor(&importing), sqlbase.WrapDescriptor(&found.TableDescriptor),
), "another operation is currently operating on the table")
}); err != nil {
return err
}
jobDesc = descStr
// NB: we need to wait for the schema change to show up before it is safe
// to ingest, but rather than do that here, we'll wait for this schema
// change in the job's Resume hook, before running the ingest phase. That
// will hopefully let it get a head start on propagating, plus the more we
// do in the job, the more that has automatic cleanup on rollback.

// TODO(dt): configure target cols from ImportStmt.IntoCols
tableDetails = []jobspb.ImportDetails_Table{{Desc: &importing, IsNew: false}}
} else {
if table == nil {
return errors.Errorf("non-bundle format %q should always have a table name", importStmt.FileFormat)
}
var create *tree.CreateTable
if importStmt.CreateDefs != nil {
create = &tree.CreateTable{
Table: *importStmt.Table,
Defs: importStmt.CreateDefs,
var tableDescs []*sqlbase.TableDescriptor
seqVals := make(map[sqlbase.ID]int64)

if importStmt.Bundle {
store, err := storageccl.ExportStorageFromURI(ctx, files[0], p.ExecCfg().Settings)
if err != nil {
return err
}
} else {
filename, err := createFileFn()
defer store.Close()
raw, err := store.ReadFile(ctx, "")
if err != nil {
return err
}
create, err = readCreateTableFromStore(ctx, filename, p.ExecCfg().Settings)
defer raw.Close()
reader, err := decompressingReader(raw, files[0], format.Compression)
if err != nil {
return err
}
defer reader.Close()

if table.TableName != create.Table.TableName {
return errors.Errorf(
"importing table %s, but file specifies a schema for table %s",
table.TableName, create.Table.TableName,
)
var match string
if table != nil {
match = table.TableName.String()
}
}

tbl, err := MakeSimpleTableDescriptor(
ctx, p.ExecCfg().Settings, create, parentID, defaultCSVTableID, NoFKs, walltime)
if err != nil {
return err
}
tableDescs = []*sqlbase.TableDescriptor{tbl.TableDesc()}
descStr, err := importJobDescription(importStmt, create.Defs, files, opts)
if err != nil {
return err
}
jobDesc = descStr
}

if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
for _, tableDesc := range tableDescs {
if err := backupccl.CheckTableExists(ctx, txn, parentID, tableDesc.Name); err != nil {
return err
fks := fkHandler{skip: skipFKs, allowed: true, resolver: make(fkResolver)}
switch format.Format {
case roachpb.IOFileFormat_Mysqldump:
evalCtx := &p.ExtendedEvalContext().EvalContext
tableDescs, err = readMysqlCreateTable(ctx, reader, evalCtx, defaultCSVTableID, parentID, match, fks, seqVals)
case roachpb.IOFileFormat_PgDump:
evalCtx := &p.ExtendedEvalContext().EvalContext
tableDescs, err = readPostgresCreateTable(reader, evalCtx, p.ExecCfg().Settings, match, parentID, walltime, fks, int(format.PgDump.MaxRowSize))
default:
return errors.Errorf("non-bundle format %q does not support reading schemas", format.Format.String())
}
}
// Verification steps have passed, generate a new table ID if we're
// restoring. We do this last because we want to avoid calling
// GenerateUniqueDescID if there's any kind of error above.
// Reserving a table ID now means we can avoid the rekey work during restore.
tableRewrites := make(backupccl.TableRewriteMap)
newSeqVals := make(map[sqlbase.ID]int64, len(seqVals))
for _, tableDesc := range tableDescs {
id, err := sql.GenerateUniqueDescID(ctx, p.ExecCfg().DB)
if err != nil {
return err
}
tableRewrites[tableDesc.ID] = &jobspb.RestoreDetails_TableRewrite{
TableID: id,
ParentID: parentID,
if tableDescs == nil && table != nil {
return errors.Errorf("table definition not found for %q", table.TableName.String())
}
if v, ok := seqVals[tableDesc.ID]; ok {
newSeqVals[id] = v
} else {
if table == nil {
return errors.Errorf("non-bundle format %q should always have a table name", importStmt.FileFormat)
}
}
seqVals = newSeqVals
if err := backupccl.RewriteTableDescs(tableDescs, tableRewrites, ""); err != nil {
return err
}
var create *tree.CreateTable
if importStmt.CreateDefs != nil {
create = &tree.CreateTable{
Table: *importStmt.Table,
Defs: importStmt.CreateDefs,
}
} else {
filename, err := createFileFn()
if err != nil {
return err
}
create, err = readCreateTableFromStore(ctx, filename, p.ExecCfg().Settings)
if err != nil {
return err
}

for i := range tableDescs {
tableDescs[i].State = sqlbase.TableDescriptor_IMPORTING
if table.TableName != create.Table.TableName {
return errors.Errorf(
"importing table %s, but file specifies a schema for table %s",
table.TableName, create.Table.TableName,
)
}
}

tbl, err := MakeSimpleTableDescriptor(
ctx, p.ExecCfg().Settings, create, parentID, defaultCSVTableID, NoFKs, walltime)
if err != nil {
return err
}
tableDescs = []*sqlbase.TableDescriptor{tbl.TableDesc()}
descStr, err := importJobDescription(importStmt, create.Defs, files, opts)
if err != nil {
return err
}
jobDesc = descStr
}

seqValKVs := make([]roachpb.KeyValue, 0, len(seqVals))
for i := range tableDescs {
if v, ok := seqVals[tableDescs[i].ID]; ok && v != 0 {
key, val, err := sql.MakeSequenceKeyVal(tableDescs[i], v, false)
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
for _, tableDesc := range tableDescs {
if err := backupccl.CheckTableExists(ctx, txn, parentID, tableDesc.Name); err != nil {
return err
}
}
// Verification steps have passed, generate a new table ID if we're
// restoring. We do this last because we want to avoid calling
// GenerateUniqueDescID if there's any kind of error above.
// Reserving a table ID now means we can avoid the rekey work during restore.
tableRewrites := make(backupccl.TableRewriteMap)
newSeqVals := make(map[sqlbase.ID]int64, len(seqVals))
for _, tableDesc := range tableDescs {
id, err := sql.GenerateUniqueDescID(ctx, p.ExecCfg().DB)
if err != nil {
return err
}
kv := roachpb.KeyValue{Key: key}
kv.Value.SetInt(val)
seqValKVs = append(seqValKVs, kv)
tableRewrites[tableDesc.ID] = &jobspb.RestoreDetails_TableRewrite{
TableID: id,
ParentID: parentID,
}
if v, ok := seqVals[tableDesc.ID]; ok {
newSeqVals[id] = v
}
}
seqVals = newSeqVals
if err := backupccl.RewriteTableDescs(tableDescs, tableRewrites, ""); err != nil {
return err
}
}

// Write the new TableDescriptors and flip the namespace entries over to
// them. After this call, any queries on a table will be served by the newly
// imported data.
if err := backupccl.WriteTableDescs(ctx, txn, nil, tableDescs, p.User(), p.ExecCfg().Settings, seqValKVs); err != nil {
return pgerror.Wrapf(err, pgerror.CodeDataExceptionError, "creating tables")
}
for i := range tableDescs {
tableDescs[i].State = sqlbase.TableDescriptor_IMPORTING
}

// TODO(dt): we should be creating the job with this txn too. Once a job
// is created, the contract is it does its own, explicit cleanup on
// failure (i.e. not just txn rollback) but everything up to and including
// the creation of the job *should* be a single atomic txn. As-is, if we
// fail to create the job after committing this txn, we're leaving broken
// descs and namespace records.
seqValKVs := make([]roachpb.KeyValue, 0, len(seqVals))
for i := range tableDescs {
if v, ok := seqVals[tableDescs[i].ID]; ok && v != 0 {
key, val, err := sql.MakeSequenceKeyVal(tableDescs[i], v, false)
if err != nil {
return err
}
kv := roachpb.KeyValue{Key: key}
kv.Value.SetInt(val)
seqValKVs = append(seqValKVs, kv)
}
}

return nil
}); err != nil {
return err
}
// Write the new TableDescriptors and flip the namespace entries over to
// them. After this call, any queries on a table will be served by the newly
// imported data.
if err := backupccl.WriteTableDescs(ctx, txn, nil, tableDescs, p.User(), p.ExecCfg().Settings, seqValKVs); err != nil {
return pgerror.Wrapf(err, pgerror.CodeDataExceptionError, "creating tables")
}

// TODO(dt): we should be creating the job with this txn too. Once a job
// is created, the contract is it does its own, explicit cleanup on
// failure (i.e. not just txn rollback) but everything up to and including
// the creation of the job *should* be a single atomic txn. As-is, if we
// fail to create the job after committing this txn, we're leaving broken
// descs and namespace records.

tableDetails := make([]jobspb.ImportDetails_Table, 0, len(tableDescs))
for _, tbl := range tableDescs {
tableDetails = append(tableDetails, jobspb.ImportDetails_Table{Desc: tbl, SeqVal: seqVals[tbl.ID], IsNew: true})
return nil
}); err != nil {
return err
}

tableDetails = make([]jobspb.ImportDetails_Table, len(tableDescs))
for i := range tableDescs {
tableDetails[i] = jobspb.ImportDetails_Table{Desc: tableDescs[i], SeqVal: seqVals[tableDescs[i].ID], IsNew: true}
}
}

telemetry.CountBucketed("import.files", int64(len(files)))
@@ -694,6 +723,7 @@ func (r *importResumer) Resume(
}

tables := make(map[string]*sqlbase.TableDescriptor, len(details.Tables))
requiresSchemaChangeDelay := false
if details.Tables != nil {
for _, i := range details.Tables {
if i.Name != "" {
@@ -703,6 +733,18 @@ } else {
} else {
return errors.Errorf("invalid table specification")
}
if !i.IsNew {
requiresSchemaChangeDelay = true
}
}
}

if requiresSchemaChangeDelay {
// TODO(dt): update job status to mention waiting for tables to go offline.
for _, i := range details.Tables {
if _, err := p.ExecCfg().LeaseManager.WaitForOneVersion(ctx, i.Desc.ID, retry.Options{}); err != nil {
return err
}
}
}

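The key mechanism in the plan-hook change above is the conditional descriptor
write that takes the table offline: the write only succeeds if the stored
descriptor still matches the version that was read, so a concurrent schema
change makes the IMPORT fail fast instead of being clobbered. Below is a
minimal sketch of that compare-and-swap pattern; the toy in-memory kvStore,
keys, and values are assumptions for illustration, standing in for the real
client.Txn.CPut against the descriptor key.

// Illustrative sketch only: toy in-memory store, not the real client/sqlbase APIs.
package main

import (
	"errors"
	"fmt"
)

type kvStore struct{ m map[string]string }

// cput writes newVal under key only if the current value equals expVal,
// mirroring the conditional-put semantics used in the diff.
func (s *kvStore) cput(key, newVal, expVal string) error {
	if s.m[key] != expVal {
		return errors.New("another operation is currently operating on the table")
	}
	s.m[key] = newVal
	return nil
}

func main() {
	s := &kvStore{m: map[string]string{"desc/52": "kv@v1/PUBLIC"}}

	// Flip the descriptor to the offline IMPORTING state.
	if err := s.cput("desc/52", "kv@v2/IMPORTING", "kv@v1/PUBLIC"); err != nil {
		fmt.Println("import aborted:", err)
		return
	}
	fmt.Println("table offline:", s.m["desc/52"])

	// A second operation racing on the same table loses the swap.
	if err := s.cput("desc/52", "kv@v2/IMPORTING", "kv@v1/PUBLIC"); err != nil {
		fmt.Println("concurrent attempt rejected:", err)
	}
}
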
3 changes: 3 additions & 0 deletions pkg/sql/planhook.go
@@ -94,6 +94,9 @@ type PlanHookState interface {
EvalAsOfTimestamp(asOf tree.AsOfClause) (hlc.Timestamp, error)
ResolveUncachedDatabaseByName(
ctx context.Context, dbName string, required bool) (*UncachedDatabaseDescriptor, error)
ResolveMutableTableDescriptor(
ctx context.Context, tn *ObjectName, required bool, requiredType ResolveRequiredType,
) (table *MutableTableDescriptor, err error)
}

// AddPlanHook adds a hook used to short-circuit creating a planNode from a
