importccl: Modify row_id generation for IMPORT from CSV.
This change was motivated by a bug in which consecutive IMPORT INTO
queries from unique data sources, into a table without an explicit PK,
would overwrite data instead of appending it. This happened because
row_id generation was based only on fileIndex and rowNum, which
produced colliding PKs across query runs.

To fix this, we add the timestamp at which IMPORT INTO is run, at a
10-microsecond granularity, to the rowNum.

Release note: None
adityamaru27 committed Jul 29, 2019
1 parent 1b934a6 commit 318d98d
Showing 5 changed files with 59 additions and 337 deletions.
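In essence, each converted row's index is shifted by a timestamp derived
from the import job's walltime, so that separate runs generate disjoint
rowid ranges. A minimal, runnable sketch of that arithmetic, condensed from
the read_import_csv.go hunks below (the conv.Row plumbing and the fileIndex
component of the final rowid are omitted):

package main

import (
	"fmt"
	"time"
)

// rowIndex mirrors the shift added in read_import_csv.go: the job's
// walltime (nanoseconds) is bucketed at 10-microsecond granularity and
// added to the per-file row number.
func rowIndex(walltimeNanos, rowNum int64) int64 {
	const precision = uint64(10 * time.Microsecond) // 10,000ns per bucket
	timestamp := uint64(walltimeNanos) / precision
	return int64(timestamp) + rowNum
}

func main() {
	run1 := time.Date(2019, 7, 29, 0, 0, 0, 0, time.UTC).UnixNano()
	run2 := run1 + int64(time.Second) // a second import, one second later
	// The later run's base is 100,000 higher, so its rowids cannot
	// collide with the earlier run's first 100,000 rows per file.
	fmt.Println(rowIndex(run2, 1) - rowIndex(run1, 1)) // prints 100000
}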
45 changes: 0 additions & 45 deletions pkg/ccl/importccl/import_stmt.go
@@ -400,7 +400,6 @@ func importPlanHook(
 if err != nil {
 return err
 }
-<<<<<<< HEAD
 
 // Validate target columns.
 var intoCols []string
@@ -429,10 +428,6 @@ func importPlanHook(
 }
 
 tableDetails = []jobspb.ImportDetails_Table{{Desc: &found.TableDescriptor, IsNew: false, TargetCols: intoCols}}
-=======
-// TODO(dt): configure target cols from ImportStmt.IntoCols
-tableDetails = []jobspb.ImportDetails_Table{{Desc: &found.TableDescriptor, IsNew: false}}
->>>>>>> 91221dc63d... importccl: Move the desc preparation from planHook to IMPORT job.
 } else {
 var tableDescs []*sqlbase.TableDescriptor
 seqVals := make(map[sqlbase.ID]int64)
@@ -702,16 +697,6 @@ func prepareNewTableDescsForIngestion(
 return nil, errors.Wrapf(err, "creating tables")
 }
 
-<<<<<<< HEAD
-=======
-// TODO(dt): we should be creating the job with this txn too. Once a job
-// is created, the contract is it does its own, explicit cleanup on
-// failure (i.e. not just txn rollback) but everything up to and including
-// the creation of the job *should* be a single atomic txn. As-is, if we
-// fail to creat the job after committing this txn, we've leaving broken
-// descs and namespace records.
-
->>>>>>> 91221dc63d... importccl: Move the desc preparation from planHook to IMPORT job.
 return tableDescs, nil
 }
 
@@ -766,11 +751,7 @@ func (r *importResumer) prepareTableDescsForIngestion(
 ) error {
 importDetails := details
 var hasExistingTables bool
-<<<<<<< HEAD
 err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
-=======
-if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
->>>>>>> 91221dc63d... importccl: Move the desc preparation from planHook to IMPORT job.
 var err error
 newTableDescToIdx := make(map[*sqlbase.TableDescriptor]int, len(importDetails.Tables))
 var newTableDescs []jobspb.ImportDetails_Table
@@ -817,24 +798,11 @@ func (r *importResumer) prepareTableDescsForIngestion(
 }
 
 // Update the job once all descs have been prepared for ingestion.
-<<<<<<< HEAD
 err = r.job.WithTxn(txn).SetDetails(ctx, importDetails)
 return err
 })
 
 return err
-=======
-if err = r.job.WithTxn(txn).SetDetails(ctx, importDetails); err != nil {
-return err
-}
-
-return nil
-}); err != nil {
-return err
-}
-
-return nil
->>>>>>> 91221dc63d... importccl: Move the desc preparation from planHook to IMPORT job.
 }
 
 // Resume is part of the jobs.Resumer interface.
@@ -863,12 +831,7 @@ func (r *importResumer) Resume(
 sstSize = storageccl.MaxImportBatchSize(r.settings) * 5
 }
 
-<<<<<<< HEAD
 tables := make(map[string]*distsqlpb.ReadImportDataSpec_ImportTable, len(details.Tables))
-=======
-tables := make(map[string]*sqlbase.TableDescriptor, len(details.Tables))
-
->>>>>>> 91221dc63d... importccl: Move the desc preparation from planHook to IMPORT job.
 if details.Tables != nil {
 // Skip prepare stage on job resumption, if it has already been completed.
 if !details.PrepareComplete {
@@ -880,19 +843,11 @@ func (r *importResumer) Resume(
 details = r.job.Details().(jobspb.ImportDetails)
 }
 
-<<<<<<< HEAD
 for _, i := range details.Tables {
 if i.Name != "" {
 tables[i.Name] = &distsqlpb.ReadImportDataSpec_ImportTable{Desc: i.Desc, TargetCols: i.TargetCols}
 } else if i.Desc != nil {
 tables[i.Desc.Name] = &distsqlpb.ReadImportDataSpec_ImportTable{Desc: i.Desc, TargetCols: i.TargetCols}
-=======
-for _, tbl := range details.Tables {
-if tbl.Name != "" {
-tables[tbl.Name] = tbl.Desc
-} else if tbl.Desc != nil {
-tables[tbl.Desc.Name] = tbl.Desc
->>>>>>> 91221dc63d... importccl: Move the desc preparation from planHook to IMPORT job.
 } else {
 return errors.Errorf("invalid table specification")
 }
78 changes: 48 additions & 30 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -31,7 +31,6 @@ import (
 "github.com/cockroachdb/cockroach/pkg/settings/cluster"
 "github.com/cockroachdb/cockroach/pkg/sql"
 "github.com/cockroachdb/cockroach/pkg/sql/parser"
-"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
 "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 "github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
 "github.com/cockroachdb/cockroach/pkg/sql/stats"
@@ -1095,14 +1094,14 @@ func TestImportCSVStmt(t *testing.T) {
 sqlDB.Exec(t, fmt.Sprintf(`IMPORT TABLE pk.t (a INT8, b STRING) CSV DATA (%s)`, strings.Join(testFiles.files, ", ")))
 // Verify the rowids are being generated as expected.
 sqlDB.CheckQueryResults(t,
-`SELECT count(*), sum(rowid) FROM pk.t`,
+`SELECT count(*) FROM pk.t`,
 sqlDB.QueryStr(t, `
-SELECT count(*), sum(rowid) FROM
-(SELECT file + (rownum << $3) as rowid FROM
+SELECT count(*) FROM
+(SELECT * FROM
 (SELECT generate_series(0, $1 - 1) file),
 (SELECT generate_series(1, $2) rownum)
 )
-`, numFiles, rowsPerFile, builtins.NodeIDBits),
+`, numFiles, rowsPerFile),
 )
 })
 
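Note: the dropped sum(rowid) assertions (here and in the IMPORT INTO test
below) relied on every rowid being exactly reconstructible as
file + (rownum << NodeIDBits); with an import-start timestamp now folded
into the row number, that closed form no longer holds, so the tests assert
row counts instead.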
@@ -1505,20 +1504,17 @@ func TestImportIntoCSV(t *testing.T) {
 }
 }
 
-var existingRowsRowIDSum int
-sqlDB.QueryRow(t, `SELECT sum(rowid) FROM pk.t`).Scan(&existingRowsRowIDSum)
-
 sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO pk.t (a, b) CSV DATA (%s)`, strings.Join(testFiles.files, ", ")))
 // Verify the rowids are being generated as expected.
 sqlDB.CheckQueryResults(t,
-`SELECT count(*), sum(rowid) FROM pk.t`,
+`SELECT count(*) FROM pk.t`,
 sqlDB.QueryStr(t, `
-SELECT count(*) + $5, sum(rowid) + $4 FROM
-(SELECT file + (rownum << $3) as rowid FROM
-(SELECT generate_series(0, $1 - 1) file),
-(SELECT generate_series(1, $2) rownum)
-)
-`, numFiles, rowsPerFile, builtins.NodeIDBits, existingRowsRowIDSum, numExistingRows),
+SELECT count(*) + $3 FROM
+(SELECT * FROM
+(SELECT generate_series(0, $1 - 1) file),
+(SELECT generate_series(1, $2) rownum)
+)
+`, numFiles, rowsPerFile, numExistingRows),
 )
 })
 
@@ -1658,21 +1654,6 @@ func TestImportIntoCSV(t *testing.T) {
 sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols")
 sqlDB.Exec(t, `CREATE TABLE t (a INT PRIMARY KEY, b STRING)`)
 
-// Insert the test data
-insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}
-
-if tx, err := db.Begin(); err != nil {
-t.Fatal(err)
-} else {
-for i, v := range insert {
-sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i, v))
-}
-
-if err := tx.Commit(); err != nil {
-t.Fatal(err)
-}
-}
-
 // Expect an error if attempting to IMPORT INTO a target list which does
 // not include all the PKs of the table.
 sqlDB.ExpectErr(
@@ -1773,6 +1754,43 @@ func TestImportIntoCSV(t *testing.T) {
 })
 // TODO(adityamaru): Add test for IMPORT INTO without target columns specified
 // once grammar has been added.
+
+// This tests that consecutive imports from unique data sources into an
+// existing table without an explicit PK, do not overwrite each other. It
+// exercises the row_id generation in IMPORT.
+t.Run("multiple-import-into-without-pk", func(t *testing.T) {
+sqlDB.Exec(t, "CREATE DATABASE multiple; USE multiple")
+sqlDB.Exec(t, `CREATE TABLE t (a INT, b STRING)`)
+
+// Insert the test data
+insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}
+numExistingRows := len(insert)
+insertedRows := 3000
+
+if tx, err := db.Begin(); err != nil {
+t.Fatal(err)
+} else {
+for i, v := range insert {
+sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i, v))
+}
+
+if err := tx.Commit(); err != nil {
+t.Fatal(err)
+}
+}
+
+// Expect it to succeed with correct columns.
+sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.files[0]))
+sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.files[1]))
+sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.files[2]))
+
+// Verify correct number of rows via COUNT.
+var result int
+sqlDB.QueryRow(t, `SELECT count(*) FROM t`).Scan(&result)
+if expect := numExistingRows + insertedRows; result != expect {
+t.Fatalf("expected %d rows, got %d", expect, result)
+}
+})
 }
 
 func BenchmarkImport(b *testing.B) {
11 changes: 10 additions & 1 deletion pkg/ccl/importccl/read_import_csv.go
@@ -12,6 +12,7 @@ import (
 "context"
 "io"
 "runtime"
+"time"
 
 "github.com/cockroachdb/cockroach/pkg/roachpb"
 "github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -32,6 +33,7 @@ type csvInputReader struct {
 batchSize int
 batch csvRecord
 opts roachpb.CSVOptions
+walltime int64
 tableDesc *sqlbase.TableDescriptor
 targetCols tree.NameList
 expectedCols int
@@ -42,13 +44,15 @@ var _ inputConverter = &csvInputReader{}
 func newCSVInputReader(
 kvCh chan []roachpb.KeyValue,
 opts roachpb.CSVOptions,
+walltime int64,
 tableDesc *sqlbase.TableDescriptor,
 targetCols tree.NameList,
 evalCtx *tree.EvalContext,
 ) *csvInputReader {
 return &csvInputReader{
 evalCtx: evalCtx,
 opts: opts,
+walltime: walltime,
 kvCh: kvCh,
 expectedCols: len(tableDesc.VisibleColumns()),
 tableDesc: tableDesc,
@@ -173,6 +177,9 @@ func (c *csvInputReader) convertRecordWorker(ctx context.Context) error {
 panic("uninitialized session data")
 }
 
+const precision = uint64(10 * time.Microsecond)
+timestamp := uint64(c.walltime) / precision
+
 for batch := range c.recordCh {
 for batchIdx, record := range batch.r {
 rowNum := int64(batch.rowOffset + batchIdx)
@@ -196,7 +203,9 @@ func (c *csvInputReader) convertRecordWorker(ctx context.Context) error {
 }
 datumIdx++
 }
-if err := conv.Row(ctx, batch.fileIndex, rowNum); err != nil {
+
+rowIndex := int64(timestamp) + rowNum
+if err := conv.Row(ctx, batch.fileIndex, rowIndex); err != nil {
 return wrapRowErr(err, batch.file, rowNum, pgcode.Uncategorized, "")
 }
 }
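The 10-microsecond bucketing means a later import's rowid base rises by one
for every 10µs elapsed between the two jobs' walltimes. A hedged
back-of-the-envelope check of when that is enough (assuming, per the
arithmetic above, that a file's rowids occupy a contiguous range starting
at its base index):

package main

import (
	"fmt"
	"time"
)

// couldCollide reports whether an import whose walltime is gapNanos after
// an earlier import's can still produce rowids overlapping the earlier
// import's, given the 10-microsecond bucketing above.
func couldCollide(gapNanos, rowsPerFileEarlier int64) bool {
	const precision = int64(10 * time.Microsecond) // 10,000ns
	return rowsPerFileEarlier >= gapNanos/precision
}

func main() {
	fmt.Println(couldCollide(int64(10*time.Millisecond), 3000)) // true: shift of 1,000 < 3,000 rows
	fmt.Println(couldCollide(int64(time.Second), 3000))         // false: shift of 100,000 > 3,000 rows
}

In practice consecutive IMPORT statements start milliseconds or more apart,
so the 10µs bucket leaves ample headroom between runs.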
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_proc.go
@@ -297,7 +297,7 @@ func (cp *readImportDataProcessor) doRun(ctx context.Context) error {
 if isWorkload {
 conv = newWorkloadReader(kvCh, singleTable, evalCtx)
 } else {
-conv = newCSVInputReader(kvCh, cp.spec.Format.Csv, singleTable, singleTableTargetCols, evalCtx)
+conv = newCSVInputReader(kvCh, cp.spec.Format.Csv, cp.spec.WalltimeNanos, singleTable, singleTableTargetCols, evalCtx)
 }
 case roachpb.IOFileFormat_MysqlOutfile:
 conv, err = newMysqloutfileReader(kvCh, cp.spec.Format.MysqlOut, singleTable, evalCtx)
