55511: importccl: moved bundle import logic from planning to execution phase r=adityamaru,miretskiy a=mokaixu

Previously, the logic for parsing schemas of bundle formats such as PostgreSQL and
MySQL dumps lived in the planning phase of the IMPORT statement.

This was inadequate: for large bundle imports, schema parsing can take up to an
hour, and the user received no signal that anything was happening.

To address this, schema parsing for bundle formats was moved from the planning
phase to the job execution phase, where a RunningStatus is now set to indicate
to the user that the schema is being parsed.

Release note: None

Resolves: cockroachdb#48598 
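
For readers skimming the diff below, the shape of the change is: planning records a ParseBundleSchema flag in the job details, and the resumer performs the parse under a user-visible RunningStatus, then clears the flag so resumptions do not repeat the work. The following is a minimal, self-contained sketch of that pattern — the Job, Details, SetRunningStatus, SetDetails, and parseSchema names are hypothetical stand-ins, not the actual CockroachDB jobs API.

package main

import (
	"context"
	"fmt"
)

// Details is a hypothetical stand-in for jobspb.ImportDetails.
type Details struct {
	ParseBundleSchema bool     // set during planning; cleared once parsing is done
	Tables            []string // populated by the execution-phase parse
}

// Job is a hypothetical stand-in for the jobs framework's job handle.
type Job struct {
	status  string
	details Details
}

func (j *Job) SetRunningStatus(s string) { j.status = s }

func (j *Job) SetDetails(d Details) { j.details = d }

// parseSchema stands in for the expensive dump-file parse that previously
// ran during planning.
func parseSchema(ctx context.Context) ([]string, error) {
	return []string{"t1", "t2"}, nil
}

// resume mirrors the shape of parseBundleSchemaIfNeeded: do the slow work
// during execution, under a user-visible status, exactly once.
func (j *Job) resume(ctx context.Context) error {
	if !j.details.ParseBundleSchema {
		return nil // a previous resumption already parsed the schema
	}
	// Surface progress to the user before starting the slow parse.
	j.SetRunningStatus("parsing schema on Import Bundle")

	tables, err := parseSchema(ctx)
	if err != nil {
		return err
	}
	d := j.details
	d.Tables = tables
	// Clear the flag so a job resumption does not redo the parse.
	d.ParseBundleSchema = false
	j.SetDetails(d)
	return nil
}

func main() {
	j := &Job{details: Details{ParseBundleSchema: true}}
	if err := j.resume(context.Background()); err != nil {
		fmt.Println(err)
	}
	fmt.Println(j.status, j.details.Tables)
}

Clearing the flag in the persisted details before returning is what makes the parse idempotent across job resumptions.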

Co-authored-by: Monica Xu <[email protected]>
craig[bot] and mokaixu committed Oct 25, 2020
2 parents 8aceac3 + 28117d0 commit 9fd5351
Showing 3 changed files with 504 additions and 373 deletions.
229 changes: 161 additions & 68 deletions pkg/ccl/importccl/import_stmt.go
@@ -94,6 +94,10 @@ const (
// as either an inline JSON schema, or an external schema URI.
avroSchema = "schema"
avroSchemaURI = "schema_uri"

// runningStatusImportBundleParseSchema indicates to the user that a
// bundle-format schema is being parsed.
runningStatusImportBundleParseSchema jobs.RunningStatus = "parsing schema on Import Bundle"
)

var importOptionExpectValues = map[string]sql.KVStringOptValidate{
@@ -343,7 +347,6 @@ func importPlanHook(
}

table := importStmt.Table
var parentID, parentSchemaID descpb.ID
if table != nil {
// TODO: As part of work for #34240, we should be operating on
@@ -689,44 +692,13 @@ func importPlanHook(
seqVals := make(map[descpb.ID]int64)

if importStmt.Bundle {
// If we target a single table, populate details with one entry of tableName.
if table != nil {
tableDetails = make([]jobspb.ImportDetails_Table, 1)
tableDetails[0] = jobspb.ImportDetails_Table{
Name: table.ObjectName.String(),
IsNew: true,
}
}
} else {
if table == nil {
@@ -761,20 +733,29 @@ func importPlanHook(
if err != nil {
return err
}
descStr, err := importJobDescription(p, importStmt, create.Defs, filenamePatterns, opts)
if err != nil {
return err
}
jobDesc = descStr

tableDescs = []*tabledesc.Mutable{tbl}
for _, tbl := range tableDescs {
// For reasons relating to #37691, we disallow user defined types in
// the standard IMPORT case.
for _, col := range tbl.Columns {
if col.Type.UserDefined() {
return errors.Newf("IMPORT cannot be used with user defined types; use IMPORT INTO instead")
}
}
}

tableDetails = make([]jobspb.ImportDetails_Table, len(tableDescs))
for i := range tableDescs {
tableDetails[i] = jobspb.ImportDetails_Table{
Desc: tableDescs[i].TableDesc(),
SeqVal: seqVals[tableDescs[i].ID],
IsNew: true,
}
}
}
@@ -786,15 +767,6 @@ func importPlanHook(
hint := errors.WithHint(err, "create the table with CREATE TABLE and use IMPORT INTO instead")
return hint
}
}

telemetry.CountBucketed("import.files", int64(len(files)))
@@ -823,13 +795,14 @@ func importPlanHook(
// connExecutor somehow.

importDetails := jobspb.ImportDetails{
URIs: files,
Format: format,
ParentID: parentID,
Tables: tableDetails,
SSTSize: sstSize,
Oversample: oversample,
SkipFKs: skipFKs,
ParseBundleSchema: importStmt.Bundle,
}

// Prepare the protected timestamp record.
@@ -1192,12 +1165,136 @@ func (r *importResumer) prepareTableDescsForIngestion(
return err
}

// parseAndCreateBundleTableDescs parses and creates the table
// descriptors for bundle formats.
func parseAndCreateBundleTableDescs(
ctx context.Context,
p sql.PlanHookState,
details jobspb.ImportDetails,
seqVals map[descpb.ID]int64,
skipFKs bool,
parentID descpb.ID,
files []string,
format roachpb.IOFileFormat,
walltime int64,
) ([]*tabledesc.Mutable, error) {

var tableDescs []*tabledesc.Mutable
var tableName string

// A single table entry in the import job details, populated during the
// planning phase, indicates that we are performing a single-table import.
if len(details.Tables) > 0 {
tableName = details.Tables[0].Name
}

store, err := p.ExecCfg().DistSQLSrv.ExternalStorageFromURI(ctx, files[0], p.User())
if err != nil {
return tableDescs, err
}
defer store.Close()

raw, err := store.ReadFile(ctx, "")
if err != nil {
return tableDescs, err
}
defer raw.Close()
reader, err := decompressingReader(raw, files[0], format.Compression)
if err != nil {
return tableDescs, err
}
defer reader.Close()

fks := fkHandler{skip: skipFKs, allowed: true, resolver: make(fkResolver)}
switch format.Format {
case roachpb.IOFileFormat_Mysqldump:
evalCtx := &p.ExtendedEvalContext().EvalContext
tableDescs, err = readMysqlCreateTable(ctx, reader, evalCtx, p, defaultCSVTableID, parentID, tableName, fks, seqVals)
case roachpb.IOFileFormat_PgDump:
evalCtx := &p.ExtendedEvalContext().EvalContext
tableDescs, err = readPostgresCreateTable(ctx, reader, evalCtx, p, tableName, parentID, walltime, fks, int(format.PgDump.MaxRowSize))
default:
return tableDescs, errors.Errorf("non-bundle format %q does not support reading schemas", format.Format.String())
}

if err != nil {
return tableDescs, err
}

if tableDescs == nil && len(details.Tables) > 0 {
return tableDescs, errors.Errorf("table definition not found for %q", tableName)
}

return tableDescs, err
}

func (r *importResumer) parseBundleSchemaIfNeeded(ctx context.Context, phs interface{}) error {
p := phs.(sql.PlanHookState)
seqVals := make(map[descpb.ID]int64)
details := r.job.Details().(jobspb.ImportDetails)
skipFKs := details.SkipFKs
parentID := details.ParentID
files := details.URIs
format := details.Format

if details.ParseBundleSchema {
if err := r.job.RunningStatus(ctx, func(_ context.Context, _ jobspb.Details) (jobs.RunningStatus, error) {
return runningStatusImportBundleParseSchema, nil
}); err != nil {
return errors.Wrapf(err, "failed to update running status of job %d", errors.Safe(*r.job.ID()))
}

var tableDescs []*tabledesc.Mutable
var err error
walltime := p.ExecCfg().Clock.Now().WallTime

if tableDescs, err = parseAndCreateBundleTableDescs(
ctx, p, details, seqVals, skipFKs, parentID, files, format, walltime); err != nil {
return err
}

tableDetails := make([]jobspb.ImportDetails_Table, len(tableDescs))
for i := range tableDescs {
tableDetails[i] = jobspb.ImportDetails_Table{
Desc: tableDescs[i].TableDesc(),
SeqVal: seqVals[tableDescs[i].ID],
IsNew: true,
}
}
details.Tables = tableDetails

for _, tbl := range tableDescs {
// For reasons relating to #37691, we disallow user defined types in
// the standard IMPORT case.
for _, col := range tbl.Columns {
if col.Type.UserDefined() {
return errors.Newf("IMPORT cannot be used with user defined types; use IMPORT INTO instead")
}
}
}
// Prevent job from redoing schema parsing and table desc creation
// on subsequent resumptions.
details.ParseBundleSchema = false
if err := r.job.WithTxn(nil).SetDetails(ctx, details); err != nil {
return err
}
}
return nil
}

// Resume is part of the jobs.Resumer interface.
func (r *importResumer) Resume(
ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums,
) error {
p := phs.(sql.PlanHookState)
if err := r.parseBundleSchemaIfNeeded(ctx, p); err != nil {
return err
}

details := r.job.Details().(jobspb.ImportDetails)
files := details.URIs
format := details.Format
ptsID := details.ProtectedTimestampRecord
if ptsID != nil && !r.testingKnobs.ignoreProtectedTimestamps {
if err := p.ExecCfg().ProtectedTimestampProvider.Verify(ctx, *ptsID); err != nil {
@@ -1269,11 +1366,7 @@ func (r *importResumer) Resume(
}
}

res, err := sql.DistIngest(ctx, p, r.job, tables, files, format, details.Walltime, r.testingKnobs.alwaysFlushJobProgress)
if err != nil {
return err
}