Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

importccl: add oversample option to configure oversampling #27341

Merged
merged 1 commit into from
Jul 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ const (
importOptionTransform = "transform"
importOptionSSTSize = "sstsize"
importOptionDecompress = "decompress"
importOptionOversample = "oversample"

pgCopyDelimiter = "delimiter"
pgCopyNull = "nullif"
Expand All @@ -77,6 +78,7 @@ var importOptionExpectValues = map[string]bool{
importOptionTransform: true,
importOptionSSTSize: true,
importOptionDecompress: true,
importOptionOversample: true,

pgMaxRowSize: true,
}
Expand Down Expand Up @@ -537,6 +539,14 @@ func importPlanHook(
}
sstSize = sz
}
var oversample int64
if override, ok := opts[importOptionOversample]; ok {
os, err := strconv.ParseInt(override, 10, 64)
if err != nil {
return err
}
oversample = os
}

if override, ok := opts[importOptionDecompress]; ok {
found := false
Expand Down Expand Up @@ -675,6 +685,7 @@ func importPlanHook(
Tables: tableDetails,
BackupPath: transform,
SSTSize: sstSize,
Oversample: oversample,
Walltime: walltime,
},
Progress: jobspb.ImportProgress{},
Expand All @@ -698,6 +709,7 @@ func doDistributedCSVTransform(
format roachpb.IOFileFormat,
walltime int64,
sstSize int64,
oversample int64,
) error {
evalCtx := p.ExtendedEvalContext()

Expand Down Expand Up @@ -726,6 +738,7 @@ func doDistributedCSVTransform(
format,
walltime,
sstSize,
oversample,
func(descs map[sqlbase.ID]*sqlbase.TableDescriptor) (sql.KeyRewriter, error) {
return storageccl.MakeKeyRewriter(descs)
},
Expand Down Expand Up @@ -837,6 +850,7 @@ func (r *importResumer) Resume(
parentID := details.ParentID
sstSize := details.SSTSize
format := details.Format
oversample := details.Oversample

if sstSize == 0 {
// The distributed importer will correctly chunk up large ranges into
Expand Down Expand Up @@ -864,7 +878,7 @@ func (r *importResumer) Resume(
}

return doDistributedCSVTransform(
ctx, job, files, p, parentID, tables, transform, format, walltime, sstSize,
ctx, job, files, p, parentID, tables, transform, format, walltime, sstSize, oversample,
)
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,13 @@ d
data: `\x0`,
err: "odd length hex string",
},
{
name: "oversample",
create: `i int`,
with: `WITH oversample = '100'`,
typ: "CSV",
data: "1",
},

// MySQL OUTFILE
{
Expand Down
10 changes: 7 additions & 3 deletions pkg/sql/distsql_plan_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func LoadCSV(
format roachpb.IOFileFormat,
walltime int64,
splitSize int64,
oversample int64,
makeRewriter func(map[sqlbase.ID]*sqlbase.TableDescriptor) (KeyRewriter, error),
) error {
ctx = log.WithLogTag(ctx, "import-distsql", nil)
Expand Down Expand Up @@ -250,7 +251,7 @@ func LoadCSV(
var parsedTables map[sqlbase.ID]*sqlbase.TableDescriptor
if samples == nil {
var err error
samples, parsedTables, err = dsp.loadCSVSamplingPlan(ctx, job, db, evalCtx, thisNode, nodes, from, splitSize, &planCtx, inputSpecs, sstSpecs)
samples, parsedTables, err = dsp.loadCSVSamplingPlan(ctx, job, db, evalCtx, thisNode, nodes, from, splitSize, oversample, &planCtx, inputSpecs, sstSpecs)
if err != nil {
return err
}
Expand Down Expand Up @@ -478,6 +479,7 @@ func (dsp *DistSQLPlanner) loadCSVSamplingPlan(
nodes []roachpb.NodeID,
from []string,
splitSize int64,
oversample int64,
planCtx *planningCtx,
csvSpecs []*distsqlrun.ReadImportDataSpec,
sstSpecs []distsqlrun.SSTWriterSpec,
Expand All @@ -494,7 +496,9 @@ func (dsp *DistSQLPlanner) loadCSVSamplingPlan(
// factor of 3 would sample the KV with probability 100/10000000 since we are
// sampling at 3x. Since we're now getting back 3x more samples than needed,
// we only use every 1/(oversample), or 1/3 here, in our final sampling.
const oversample = 3
if oversample < 1 {
oversample = 3
}
sampleSize := splitSize / oversample
if sampleSize > math.MaxInt32 {
return nil, nil, errors.Errorf("SST size must fit in an int32: %d", splitSize)
Expand Down Expand Up @@ -572,7 +576,7 @@ func (dsp *DistSQLPlanner) loadCSVSamplingPlan(
}

sampleCount++
sampleCount = sampleCount % oversample
sampleCount = sampleCount % int(oversample)
if sampleCount == 0 {
k, err := keys.EnsureSafeSplitKey(key)
if err != nil {
Expand Down
Loading