workload: use IMPORT then BACKUP in 'fixtures make'
Previously we used the 'transform' option on IMPORT.

However, that option will soon go away as IMPORT becomes increasingly focused on ingestion.
Making fixtures should hopefully be rare enough that we can do a little extra work here
rather than maintain the special 'transform' mode.

Release note: none.
dt committed May 7, 2019
1 parent 29bf8a1 commit f4a0350
Showing 2 changed files with 23 additions and 170 deletions.
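
In short, the new MakeFixture path in pkg/ccl/workloadccl/fixture.go (diffed below) is: create the workload's database, run the normal IMPORT-based load into it via ImportFixture, then issue one BACKUP per table into the fixture's GCS folder. The sketch below illustrates that shape directly against a cluster using database/sql and errgroup; it is an illustration only, not code from this commit — the connection string, database name, table list, and destination URI are placeholders, and the per-table IMPORT statements that ImportFixture generates are elided.

// fixture_sketch.go: a minimal illustration of the IMPORT-then-BACKUP flow.
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the Postgres wire protocol.
	"golang.org/x/sync/errgroup"
)

func main() {
	ctx := context.Background()

	// Placeholder connection string; point it at a running cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	const dbName = "bank"                          // hypothetical workload database
	tables := []string{"accounts"}                 // hypothetical table list
	const fixtureFolder = "gs://my-bucket/fixture" // hypothetical destination

	// Step 1: create the database; the workload's IMPORT statements
	// (generated per table by ImportFixture) would then load it.
	if _, err := db.ExecContext(ctx, "CREATE DATABASE IF NOT EXISTS "+dbName); err != nil {
		log.Fatal(err)
	}

	// Step 2: BACKUP each table into its own subfolder of the fixture
	// folder, one statement per table, issued concurrently.
	g, gCtx := errgroup.WithContext(ctx)
	for _, t := range tables {
		t := t // rebind the loop variable captured by the goroutine
		g.Go(func() error {
			q := fmt.Sprintf(`BACKUP "%s"."%s" TO $1`, dbName, t)
			_, err := db.ExecContext(gCtx, q, fixtureFolder+"/"+t)
			return err
		})
	}
	if err := g.Wait(); err != nil {
		log.Fatal(err)
	}
}

Backing up each table to its own subfolder keeps the per-table layout the fixture folder already used, so the existing 'fixtures load' path is unaffected by this change.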
2 changes: 1 addition & 1 deletion pkg/ccl/workloadccl/cliccl/fixtures.go
@@ -67,7 +67,7 @@ var fixturesListCmd = workloadcli.SetCmdDefaults(&cobra.Command{
})
var fixturesMakeCmd = workloadcli.SetCmdDefaults(&cobra.Command{
Use: `make`,
Short: `regenerate and store a fixture on GCS`,
Short: `IMPORT a fixture and then store a BACKUP of it on GCS`,
})
var fixturesLoadCmd = workloadcli.SetCmdDefaults(&cobra.Command{
Use: `load`,
191 changes: 22 additions & 169 deletions pkg/ccl/workloadccl/fixture.go
@@ -16,9 +16,7 @@ import (
"encoding/json"
"fmt"
"net/url"
"path"
"path/filepath"
"runtime"
"strconv"
"sync/atomic"
"time"
@@ -171,122 +169,6 @@ func GetFixture(
return fixture, err
}

type groupCSVWriter struct {
sem chan struct{}
gcs *storage.Client
config FixtureConfig
folder string
chunkSizeBytes int64

start time.Time
csvBytesWritten int64 // Only access via atomic
}

// defaultRetryOptions was copied from base because base was bringing in a lot
// of other deps and this shaves ~0.5s off the ~2s pkg/cmd/workload build time.
func defaultRetryOptions() retry.Options {
return retry.Options{
InitialBackoff: 50 * time.Millisecond,
MaxBackoff: 1 * time.Second,
Multiplier: 2,
}
}

// groupWriteCSVs creates files on GCS in the specified folder that contain the
// data for the given table and rows.
//
// Files are chunked into ~c.chunkSizeBytes or smaller. Concurrency is limited
// by c.sem. The GCS object paths to the written files are returned on
// c.pathsCh.
func (c *groupCSVWriter) groupWriteCSVs(
ctx context.Context, pathsCh chan<- string, table workload.Table, rowStart, rowEnd int,
) error {
if rowStart == rowEnd {
return nil
}

// For each table, first write out a chunk of ~c.chunkSizeBytes. If the
// table fits in one chunk, we're done, otherwise this gives an estimate for
// how many rows are needed.
var rowIdx int
if err := func() error {
select {
case <-ctx.Done():
return ctx.Err()
case c.sem <- struct{}{}:
}
defer func() { <-c.sem }()

path := path.Join(c.folder, table.Name, fmt.Sprintf(`%09d.csv`, rowStart))
const maxAttempts = 3
err := retry.WithMaxAttempts(ctx, defaultRetryOptions(), maxAttempts, func() error {
b := c.gcs.Bucket(c.config.GCSBucket)
if c.config.BillingProject != `` {
b = b.UserProject(c.config.BillingProject)
}
w := b.Object(path).NewWriter(ctx)
var err error
rowIdx, err = workload.WriteCSVRows(ctx, w, table, rowStart, rowEnd, c.chunkSizeBytes)
closeErr := w.Close()
if err != nil {
return err
}
if closeErr != nil {
return closeErr
}

pathsCh <- c.config.objectPathToURI(path)
newBytesWritten := atomic.AddInt64(&c.csvBytesWritten, w.Attrs().Size)
d := timeutil.Since(c.start)
throughput := float64(newBytesWritten) / (d.Seconds() * float64(1<<20) /* 1MiB */)
log.Infof(ctx, `wrote csv %s [%d,%d] of %d row batches (%.2f%% (%s) in %s: %.1f MB/s)`,
table.Name, rowStart, rowIdx, table.InitialRows.NumBatches,
float64(100*rowIdx)/float64(table.InitialRows.NumBatches),
humanizeutil.IBytes(newBytesWritten), d, throughput)

return nil
})
return err
}(); err != nil {
return err
}
if rowIdx >= rowEnd {
return nil
}

// If rowIdx < rowEnd, then the rows didn't all fit in one chunk. Use the
// number of rows that did fit to estimate how many chunks are needed to
// finish the table. Then break up the remaining rows into that many chunks,
// running this whole process recursively in case the distribution of row
// size is not uniform. Something like `(rowIdx - rowStart) * fudge` would
// be simpler, but this will make the chunks a more even size.
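// For example (an illustrative calculation): if the first chunk covered rows
// [0, 1000) of rowEnd = 10000, then with fudge = 0.9 we get
// additionalChunks = int(9000 / (1000 * 0.9)) = 10 and rowStep = 9000 / 10 = 900,
// i.e. the remaining rows are split into ten roughly 900-row chunks.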
var rowStep int
{
const fudge = 0.9
additionalChunks := int(float64(rowEnd-rowIdx) / (float64(rowIdx-rowStart) * fudge))
if additionalChunks <= 0 {
additionalChunks = 1
}
rowStep = (rowEnd - rowIdx) / additionalChunks
if rowStep <= 0 {
rowStep = 1
}
}

g, gCtx := errgroup.WithContext(ctx)
for rowIdx < rowEnd {
chunkRowStart, chunkRowEnd := rowIdx, rowIdx+rowStep
if chunkRowEnd > rowEnd {
chunkRowEnd = rowEnd
}
g.Go(func() error {
return c.groupWriteCSVs(gCtx, pathsCh, table, chunkRowStart, chunkRowEnd)
})
rowIdx = chunkRowEnd
}
return g.Wait()
}

func csvServerPaths(
csvServerURL string, gen workload.Generator, table workload.Table, numNodes int,
) []string {
@@ -360,72 +242,43 @@ func MakeFixture(
gen workload.Generator,
filesPerNode int,
) (Fixture, error) {
const writeCSVChunkSize = 64 * 1 << 20 // 64 MB
for _, t := range gen.Tables() {
if t.InitialRows.Batch == nil {
return Fixture{}, errors.Errorf(
`make fixture is not supported for workload %s`, gen.Meta().Name,
)
}
}

fixtureFolder := generatorToGCSFolder(config, gen)
if _, err := GetFixture(ctx, gcs, config, gen); err == nil {
return Fixture{}, errors.Errorf(
`fixture %s already exists`, config.objectPathToURI(fixtureFolder))
}

writeCSVConcurrency := runtime.NumCPU()
c := &groupCSVWriter{
sem: make(chan struct{}, writeCSVConcurrency),
gcs: gcs,
config: config,
folder: fixtureFolder,
chunkSizeBytes: writeCSVChunkSize,
start: timeutil.Now(),
dbName := gen.Meta().Name
if _, err := sqlDB.Exec(`CREATE DATABASE IF NOT EXISTS ` + dbName); err != nil {
return Fixture{}, err
}
const direct, stats, csvServer = false, false, ""
if _, err := ImportFixture(ctx, sqlDB, gen, dbName, direct, filesPerNode, stats, csvServer); err != nil {
return Fixture{}, err
}

g := ctxgroup.WithContext(ctx)
for _, t := range gen.Tables() {
table := t
if t.InitialRows.Batch == nil {
return Fixture{}, errors.Errorf(
`make fixture is not supported for workload %s`, gen.Meta().Name)
}

tableCSVPathsCh := make(chan string)
g.GoCtx(func(ctx context.Context) error {
defer close(tableCSVPathsCh)
if len(config.CSVServerURL) == 0 {
startRow, endRow := 0, table.InitialRows.NumBatches
return c.groupWriteCSVs(ctx, tableCSVPathsCh, table, startRow, endRow)
}

var numNodes int
if err := sqlDB.QueryRow(numNodesQuery).Scan(&numNodes); err != nil {
return err
}
numPaths := numNodes * filesPerNode
paths := csvServerPaths(config.CSVServerURL, gen, table, numPaths)
for _, path := range paths {
tableCSVPathsCh <- path
}
return nil
})
g.GoCtx(func(ctx context.Context) error {
// NB: it's fine to loop over this channel without selecting
// ctx.Done because a context cancel will cause the above goroutine
// to finish and close tableCSVPathsCh.
var paths []string
for tableCSVPath := range tableCSVPathsCh {
paths = append(paths, tableCSVPath)
}
output := config.objectPathToURI(filepath.Join(fixtureFolder, table.Name))
const directIngestion = false
_, err := importFixtureTable(
ctx, sqlDB, gen.Meta().Name, table, paths, directIngestion, output, false, /* injectStats */
)
return errors.Wrapf(err, `creating backup for table %s`, table.Name)
for _, t := range gen.Tables() {
g.Go(func() error {
q := fmt.Sprintf(`BACKUP "%s"."%s" TO $1`, dbName, t.Name)
output := config.objectPathToURI(filepath.Join(fixtureFolder, t.Name))
log.Infof(ctx, "Backing %s up to %q...", t.Name, output)
_, err := sqlDB.Exec(q, output)
return err
})
}

if err := g.Wait(); err != nil {
return Fixture{}, err
}

// TODO(dan): Clean up the CSVs.
return GetFixture(ctx, gcs, config, gen)
}
