27341: importccl: add oversample option to configure oversampling r=mjibson a=mjibson

On clusters with many nodes and smallish disks, an IMPORT within an order of
magnitude of the disks' free space can lead to disk fullness. This happens
because the sampling algorithm (by design) has a relatively high standard
deviation in its error. We currently target split points a few hundred
megabytes apart, but that sampling variance means a single node can easily
end up several split widths away from the target mean, overscheduling data
to that node during the shuffle.
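
For intuition, here is a small standalone Go simulation (not part of this
patch; the row count and baseline sampling probability are arbitrary) showing
how raising the sampling rate shrinks the relative spread of the sample count:

    package main

    import (
        "fmt"
        "math"
        "math/rand"
    )

    // relStddev samples `rows` keys, keeping each with probability p, and
    // returns stddev/mean of the kept count across `trials` runs.
    func relStddev(rows, trials int, p float64, rng *rand.Rand) float64 {
        var sum, sumSq float64
        for t := 0; t < trials; t++ {
            kept := 0
            for i := 0; i < rows; i++ {
                if rng.Float64() < p {
                    kept++
                }
            }
            f := float64(kept)
            sum += f
            sumSq += f * f
        }
        mean := sum / float64(trials)
        return math.Sqrt(sumSq/float64(trials)-mean*mean) / mean
    }

    func main() {
        rng := rand.New(rand.NewSource(1))
        const rows, trials = 1000000, 100
        const baseP = 1e-4 // baseline sampling probability (illustrative)
        for _, k := range []float64{1, 3, 100} {
            // Oversampling by k multiplies the sampling probability by k;
            // the relative spread shrinks by roughly sqrt(k).
            fmt.Printf("oversample=%3.0f: relative stddev %.3f\n",
                k, relStddev(rows, trials, baseP*k, rng))
        }
    }

Since split points are derived from the samples, a tighter spread means each
node is scheduled a more even share of the data.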

Introduce an oversample option that can be set to some higher number. This
reduces the standard deviation of the error, resulting in each node receiving
a more similar portion of the data, without a major impact on the rest of the
import's performance.

Release note (sql change): Add an `oversample` WITH option to IMPORT to
decrease variance in data distribution during processing.
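
A minimal usage sketch (the connection string, database, table schema, and
file URL are all hypothetical; only the `WITH oversample = '...'` clause
comes from this change):

    package main

    import (
        "database/sql"
        "log"

        _ "github.com/lib/pq" // CockroachDB speaks the Postgres wire protocol
    )

    func main() {
        // Hypothetical single-node cluster and import source.
        db, err := sql.Open("postgres",
            "postgresql://root@localhost:26257/d?sslmode=disable")
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        // oversample is passed as a string and parsed server-side as an int64.
        if _, err := db.Exec(`IMPORT TABLE t (i INT PRIMARY KEY)
            CSV DATA ('nodelocal:///data.csv')
            WITH oversample = '100'`); err != nil {
            log.Fatal(err)
        }
    }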

27345: importccl: verify number of columns during IMPORT PGDUMP r=mjibson a=mjibson

Also make error messages more consistent between PGDUMP and PGCOPY.

Release note (bug fix): Correctly verify number of COPY columns during
IMPORT PGDUMP.
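
A standalone sketch of the added validation (the helper name is made up; the
error text matches the patch):

    package main

    import "fmt"

    // checkRowWidth mirrors the added validation: each COPY row must supply
    // exactly one value per target column.
    func checkRowWidth(cols, row []string) error {
        if len(row) != len(cols) {
            return fmt.Errorf("expected %d values, got %d", len(cols), len(row))
        }
        return nil
    }

    func main() {
        cols := []string{"a", "b"}
        fmt.Println(checkRowWidth(cols, []string{"1"}))           // expected 2 values, got 1
        fmt.Println(checkRowWidth(cols, []string{"1", "2", "3"})) // expected 2 values, got 3
        fmt.Println(checkRowWidth(cols, []string{"1", "2"}))      // <nil>
    }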

27438: mkrelease: statically link windows release binaries r=mberhault a=benesch

This got lost in 38899a8. Static linking is necessary to bundle MinGW-only
libraries into the Windows binary. The binary is otherwise only
executable from within a MinGW environment.

Fix #27435.

Release note: None

---

At this point I deserve to win an award for most broken refactor.

Co-authored-by: Matt Jibson <[email protected]>
Co-authored-by: Nikhil Benesch <[email protected]>
3 people committed Jul 12, 2018
4 parents 6140fd4 + 6bf632d + 749a7b2 + c60f229 commit da43ca5
Showing 8 changed files with 222 additions and 113 deletions.
1 change: 1 addition & 0 deletions build/builder/mkrelease.sh
@@ -83,6 +83,7 @@ case "${1-}" in
 XGOARCH=amd64
 XCMAKE_SYSTEM_NAME=Windows
 TARGET_TRIPLE=x86_64-w64-mingw32
+LDFLAGS=-static
 SUFFIX=-windows-6.2-amd64
 ) ;;

16 changes: 15 additions & 1 deletion pkg/ccl/importccl/import_stmt.go
@@ -56,6 +56,7 @@ const (
 importOptionTransform = "transform"
 importOptionSSTSize = "sstsize"
 importOptionDecompress = "decompress"
+importOptionOversample = "oversample"

 pgCopyDelimiter = "delimiter"
 pgCopyNull = "nullif"
@@ -77,6 +78,7 @@ var importOptionExpectValues = map[string]bool{
 importOptionTransform: true,
 importOptionSSTSize: true,
 importOptionDecompress: true,
+importOptionOversample: true,

 pgMaxRowSize: true,
 }
@@ -537,6 +539,14 @@ func importPlanHook(
 }
 sstSize = sz
 }
+var oversample int64
+if override, ok := opts[importOptionOversample]; ok {
+os, err := strconv.ParseInt(override, 10, 64)
+if err != nil {
+return err
+}
+oversample = os
+}

 if override, ok := opts[importOptionDecompress]; ok {
 found := false
@@ -675,6 +685,7 @@ func importPlanHook(
 Tables: tableDetails,
 BackupPath: transform,
 SSTSize: sstSize,
+Oversample: oversample,
 Walltime: walltime,
 },
 Progress: jobspb.ImportProgress{},
@@ -698,6 +709,7 @@ func doDistributedCSVTransform(
 format roachpb.IOFileFormat,
 walltime int64,
 sstSize int64,
+oversample int64,
 ) error {
 evalCtx := p.ExtendedEvalContext()

@@ -726,6 +738,7 @@ func doDistributedCSVTransform(
 format,
 walltime,
 sstSize,
+oversample,
 func(descs map[sqlbase.ID]*sqlbase.TableDescriptor) (sql.KeyRewriter, error) {
 return storageccl.MakeKeyRewriter(descs)
 },
@@ -837,6 +850,7 @@ func (r *importResumer) Resume(
 parentID := details.ParentID
 sstSize := details.SSTSize
 format := details.Format
+oversample := details.Oversample

 if sstSize == 0 {
 // The distributed importer will correctly chunk up large ranges into
@@ -864,7 +878,7 @@ func (r *importResumer) Resume(
 }

 return doDistributedCSVTransform(
-ctx, job, files, p, parentID, tables, transform, format, walltime, sstSize,
+ctx, job, files, p, parentID, tables, transform, format, walltime, sstSize, oversample,
 )
 }

59 changes: 58 additions & 1 deletion pkg/ccl/importccl/import_stmt_test.go
@@ -59,7 +59,7 @@ func TestImportData(t *testing.T) {
 defer s.Stopper().Stop(ctx)
 sqlDB := sqlutils.MakeSQLRunner(db)

-sqlDB.Exec(t, `CREATE DATABASE d`)
+sqlDB.Exec(t, `CREATE DATABASE d; USE d`)

 tests := []struct {
 name string
@@ -167,6 +167,13 @@ d
 data: `\x0`,
 err: "odd length hex string",
 },
+{
+name: "oversample",
+create: `i int`,
+with: `WITH oversample = '100'`,
+typ: "CSV",
+data: "1",
+},

 // MySQL OUTFILE
 {
@@ -305,6 +312,20 @@ d
 with: `WITH max_row_size = '5B'`,
 err: "line too long",
 },
+{
+name: "not enough values",
+typ: "PGCOPY",
+create: "a INT, b INT",
+data: `1`,
+err: "expected 2 values, got 1",
+},
+{
+name: "too many values",
+typ: "PGCOPY",
+create: "a INT, b INT",
+data: "1\t2\t3",
+err: "expected 2 values, got 3",
+},

 // Postgres DUMP
 {
@@ -355,6 +376,42 @@ d
 with: `WITH max_row_size = '5B'`,
 err: "line too long",
 },
+{
+name: "not enough values",
+typ: "PGDUMP",
+data: `
+CREATE TABLE d.t (a INT, b INT);
+COPY t (a, b) FROM stdin;
+1
+\.
+`,
+err: "expected 2 values, got 1",
+},
+{
+name: "too many values",
+typ: "PGDUMP",
+data: `
+CREATE TABLE d.t (a INT, b INT);
+COPY t (a, b) FROM stdin;
+1	2	3
+\.
+`,
+err: "expected 2 values, got 3",
+},
+{
+name: "too many cols",
+typ: "PGDUMP",
+data: `
+CREATE TABLE d.t (a INT, b INT);
+COPY t (a, b, c) FROM stdin;
+1	2	3
+\.
+`,
+err: "expected 2 columns, got 3",
+},

 // Error
 {

2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_pgcopy.go
@@ -265,7 +265,7 @@ func (d *pgCopyReader) readFile(
 return makeRowErr(inputName, count, "%s", err)
 }
 if len(row) != len(d.conv.visibleColTypes) {
-return makeRowErr(inputName, count, "expected %d columns, found %d", len(d.conv.visibleColTypes), len(row))
+return makeRowErr(inputName, count, "expected %d values, got %d", len(d.conv.visibleColTypes), len(row))
 }
 for i, s := range row {
 if s == nil {

5 changes: 4 additions & 1 deletion pkg/ccl/importccl/read_import_pgdump.go
@@ -372,7 +372,7 @@ func (m *pgDumpReader) readFile(
 }
 if conv != nil {
 if expected, got := len(conv.visibleCols), len(i.Columns); expected != got {
-return errors.Errorf("expected %d values, got %d", expected, got)
+return errors.Errorf("expected %d columns, got %d", expected, got)
 }
 for colI, col := range i.Columns {
 if string(col) != conv.visibleCols[colI].Name {
@@ -398,6 +398,9 @@ func (m *pgDumpReader) readFile(
 }
 switch row := row.(type) {
 case copyData:
+if expected, got := len(conv.visibleCols), len(row); expected != got {
+return errors.Errorf("expected %d values, got %d", expected, got)
+}
 for i, s := range row {
 if s == nil {
 conv.datums[i] = tree.DNull

10 changes: 7 additions & 3 deletions pkg/sql/distsql_plan_csv.go
@@ -172,6 +172,7 @@ func LoadCSV(
 format roachpb.IOFileFormat,
 walltime int64,
 splitSize int64,
+oversample int64,
 makeRewriter func(map[sqlbase.ID]*sqlbase.TableDescriptor) (KeyRewriter, error),
 ) error {
 ctx = log.WithLogTag(ctx, "import-distsql", nil)
@@ -250,7 +251,7 @@ func LoadCSV(
 var parsedTables map[sqlbase.ID]*sqlbase.TableDescriptor
 if samples == nil {
 var err error
-samples, parsedTables, err = dsp.loadCSVSamplingPlan(ctx, job, db, evalCtx, thisNode, nodes, from, splitSize, &planCtx, inputSpecs, sstSpecs)
+samples, parsedTables, err = dsp.loadCSVSamplingPlan(ctx, job, db, evalCtx, thisNode, nodes, from, splitSize, oversample, &planCtx, inputSpecs, sstSpecs)
 if err != nil {
 return err
 }
@@ -478,6 +479,7 @@ func (dsp *DistSQLPlanner) loadCSVSamplingPlan(
 nodes []roachpb.NodeID,
 from []string,
 splitSize int64,
+oversample int64,
 planCtx *planningCtx,
 csvSpecs []*distsqlrun.ReadImportDataSpec,
 sstSpecs []distsqlrun.SSTWriterSpec,
@@ -494,7 +496,9 @@ func (dsp *DistSQLPlanner) loadCSVSamplingPlan(
 // factor of 3 would sample the KV with probability 100/10000000 since we are
 // sampling at 3x. Since we're now getting back 3x more samples than needed,
 // we only use every 1/(oversample), or 1/3 here, in our final sampling.
-const oversample = 3
+if oversample < 1 {
+oversample = 3
+}
 sampleSize := splitSize / oversample
 if sampleSize > math.MaxInt32 {
 return nil, nil, errors.Errorf("SST size must fit in an int32: %d", splitSize)
@@ -572,7 +576,7 @@ func (dsp *DistSQLPlanner) loadCSVSamplingPlan(
 }

 sampleCount++
-sampleCount = sampleCount % oversample
+sampleCount = sampleCount % int(oversample)
 if sampleCount == 0 {
 k, err := keys.EnsureSafeSplitKey(key)
 if err != nil {