From 7f1a6f19b46d16e07b2a4985d4cf73bef5d63948 Mon Sep 17 00:00:00 2001 From: Jeffrey Xiao Date: Fri, 24 May 2019 14:46:25 -0400 Subject: [PATCH] workload/ycsb: implement scans and add support for workload E As described in the ycsb paper, the key should be chosen by a zipfian distribution and the number of keys to scan should be chosen by a uniform distribution. Add a command-line flag for choosing the scan length distribution and also changed the API for the uniform generate to accept a lower bound and upper bound. This makes it more consistent with the zipfian generator and the latest generator and is needed for generating a scan length. Release note: None --- pkg/workload/ycsb/uniform_generator.go | 28 +++---- pkg/workload/ycsb/ycsb.go | 103 +++++++++++++++++-------- 2 files changed, 83 insertions(+), 48 deletions(-) diff --git a/pkg/workload/ycsb/uniform_generator.go b/pkg/workload/ycsb/uniform_generator.go index 9be702715d05..ff117952b446 100644 --- a/pkg/workload/ycsb/uniform_generator.go +++ b/pkg/workload/ycsb/uniform_generator.go @@ -24,20 +24,22 @@ import ( // UniformGenerator is a random number generator that generates draws from a // uniform distribution. type UniformGenerator struct { - mu struct { + iMin uint64 + mu struct { syncutil.Mutex - r *rand.Rand - sequence uint64 + r *rand.Rand + iMax uint64 } } // NewUniformGenerator constructs a new UniformGenerator with the given parameters. // It returns an error if the parameters are outside the accepted range. -func NewUniformGenerator(rng *rand.Rand, minInsertRow uint64) (*UniformGenerator, error) { +func NewUniformGenerator(rng *rand.Rand, iMin, iMax uint64) (*UniformGenerator, error) { z := UniformGenerator{} + z.iMin = iMin z.mu.r = rng - z.mu.sequence = minInsertRow + z.mu.iMax = iMax return &z, nil } @@ -45,24 +47,22 @@ func NewUniformGenerator(rng *rand.Rand, minInsertRow uint64) (*UniformGenerator // IMaxHead returns the current value of IMaxHead, without incrementing. func (z *UniformGenerator) IMaxHead() uint64 { z.mu.Lock() - max := z.mu.sequence - z.mu.Unlock() - return max + defer z.mu.Unlock() + return z.mu.iMax } // IncrementIMax increments the sequence number. func (z *UniformGenerator) IncrementIMax() error { z.mu.Lock() - z.mu.sequence++ - z.mu.Unlock() + defer z.mu.Unlock() + z.mu.iMax++ return nil } -// Uint64 returns a random Uint64 between min and sequence, drawn from a uniform +// Uint64 returns a random Uint64 between iMin and iMax, drawn from a uniform // distribution. func (z *UniformGenerator) Uint64() uint64 { z.mu.Lock() - result := rand.Uint64() % z.mu.sequence - z.mu.Unlock() - return result + defer z.mu.Unlock() + return rand.Uint64() % (z.mu.iMax + 1) } diff --git a/pkg/workload/ycsb/ycsb.go b/pkg/workload/ycsb/ycsb.go index 9289b7bd3ff0..3fbb073f9e18 100644 --- a/pkg/workload/ycsb/ycsb.go +++ b/pkg/workload/ycsb/ycsb.go @@ -94,7 +94,9 @@ type ycsb struct { splits int workload string - distribution string + requestDistribution string + scanLengthDistribution string + minScanLength, maxScanLength uint64 readFreq, insertFreq, updateFreq, scanFreq float32 } @@ -120,7 +122,10 @@ var ycsbMeta = workload.Meta{ g.flags.BoolVar(&g.families, `families`, true, `Place each column in its own column family`) g.flags.IntVar(&g.splits, `splits`, 0, `Number of splits to perform before starting normal operations`) g.flags.StringVar(&g.workload, `workload`, `B`, `Workload type. Choose from A-F.`) - g.flags.StringVar(&g.distribution, `request-distribution`, `zipfian`, `Distribution for random number generator [zipfian, uniform, latest].`) + g.flags.StringVar(&g.requestDistribution, `request-distribution`, `zipfian`, `Distribution for request key generation [zipfian, uniform, latest].`) + g.flags.StringVar(&g.scanLengthDistribution, `scan-length-distribution`, `uniform`, `Distribution for scan length generation [zipfian, uniform].`) + g.flags.Uint64Var(&g.minScanLength, `min-scan-length`, 1, `The minimum length for scan operations.`) + g.flags.Uint64Var(&g.maxScanLength, `max-scan-length`, 1000, `The maximum length for scan operations.`) // TODO(dan): g.flags.Uint64Var(&g.maxWrites, `max-writes`, // 7*24*3600*1500, // 7 days at 5% writes and 30k ops/s @@ -156,7 +161,6 @@ func (g *ycsb) Hooks() workload.Hooks { case "E", "e": g.scanFreq = 0.95 g.insertFreq = 0.05 - return errors.New("Workload E (scans) not implemented yet") case "F", "f": g.insertFreq = 1.0 default: @@ -234,6 +238,11 @@ func (g *ycsb) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad, return workload.QueryLoad{}, err } + scanStmt, err := db.Prepare(`SELECT * FROM ycsb.usertable WHERE ycsb_key >= $1 LIMIT $2`) + if err != nil { + return workload.QueryLoad{}, err + } + var insertStmt *gosql.Stmt if g.json { insertStmt, err = db.Prepare(`INSERT INTO ycsb.usertable VALUES ($1, json_build_object( @@ -276,43 +285,58 @@ func (g *ycsb) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad, } zipfRng := rand.New(rand.NewSource(g.seed)) - var randGen randGenerator + var requestGen randGenerator + var scanLengthGen randGenerator var rowIndex = new(uint64) var rowCount = new(uint64) *rowIndex = uint64(g.initialRows) *rowCount = uint64(g.initialRows) - switch strings.ToLower(g.distribution) { + switch strings.ToLower(g.requestDistribution) { case "zipfian": - randGen, err = NewZipfGenerator( + requestGen, err = NewZipfGenerator( zipfRng, zipfIMin, defaultIMax-1, defaultTheta, false /* verbose */) case "uniform": - randGen, err = NewUniformGenerator(zipfRng, uint64(g.initialRows)) + requestGen, err = NewUniformGenerator(zipfRng, 0, uint64(g.initialRows)-1) case "latest": - randGen, err = NewLatestGenerator( + requestGen, err = NewLatestGenerator( zipfRng, zipfIMin, uint64(g.initialRows)-1, defaultTheta, false /* verbose */) default: - return workload.QueryLoad{}, errors.Errorf("Unknown distribution: %s", g.distribution) + return workload.QueryLoad{}, errors.Errorf("Unknown request distribution: %s", g.requestDistribution) + } + if err != nil { + return workload.QueryLoad{}, err + } + + switch strings.ToLower(g.scanLengthDistribution) { + case "zipfian": + scanLengthGen, err = NewZipfGenerator(zipfRng, g.minScanLength, g.maxScanLength, defaultTheta, false /* verbose */) + case "uniform": + scanLengthGen, err = NewUniformGenerator(zipfRng, g.minScanLength, g.maxScanLength) + default: + return workload.QueryLoad{}, errors.Errorf("Unknown scan length distribution: %s", g.scanLengthDistribution) + } + if err != nil { + return workload.QueryLoad{}, err } ql := workload.QueryLoad{SQLDatabase: sqlDatabase} for i := 0; i < g.connFlags.Concurrency; i++ { rng := rand.New(rand.NewSource(g.seed + int64(i))) - if err != nil { - return workload.QueryLoad{}, err - } w := &ycsbWorker{ - config: g, - hists: reg.GetHandle(), - db: db, - readStmt: readStmt, - insertStmt: insertStmt, - updateStmts: updateStmts, - rowIndex: rowIndex, - rowCount: rowCount, - randGen: randGen, - rng: rng, - hashFunc: fnv.New64(), + config: g, + hists: reg.GetHandle(), + db: db, + readStmt: readStmt, + scanStmt: scanStmt, + insertStmt: insertStmt, + updateStmts: updateStmts, + rowIndex: rowIndex, + rowCount: rowCount, + requestGen: requestGen, + scanLengthGen: scanLengthGen, + rng: rng, + hashFunc: fnv.New64(), } ql.WorkerFns = append(ql.WorkerFns, w.run) } @@ -326,10 +350,10 @@ type randGenerator interface { } type ycsbWorker struct { - config *ycsb - hists *histogram.Histograms - db *gosql.DB - readStmt, insertStmt *gosql.Stmt + config *ycsb + hists *histogram.Histograms + db *gosql.DB + readStmt, scanStmt, insertStmt *gosql.Stmt // In normal mode this is one statement per field, since the field name cannot // be parametrized. In JSON mode it's a single statement. @@ -340,10 +364,11 @@ type ycsbWorker struct { // The total number of rows inserted. rowCount *uint64 - randGen randGenerator // used to generate random keys - rng *rand.Rand // used to generate random strings for the values - hashFunc hash.Hash64 - hashBuf [8]byte + requestGen randGenerator // used to generate random keys for requests + scanLengthGen randGenerator // used to generate length of scan operations + rng *rand.Rand // used to generate random strings for the values + hashFunc hash.Hash64 + hashBuf [8]byte } func (yw *ycsbWorker) run(ctx context.Context) error { @@ -404,7 +429,7 @@ func (yw *ycsbWorker) buildKeyName(keynum uint64) string { // See YCSB paper section 5.3 for a complete description of how keys are chosen. func (yw *ycsbWorker) nextReadKey() string { rowCount := atomic.LoadUint64(yw.rowCount) - rowIndex := yw.hashKey(yw.randGen.Uint64()) % rowCount + rowIndex := yw.hashKey(yw.requestGen.Uint64()) % rowCount return yw.buildKeyName(rowIndex) } @@ -435,7 +460,7 @@ func (yw *ycsbWorker) insertRow(ctx context.Context, key string, increment bool) } if increment { - if err := yw.randGen.IncrementIMax(); err != nil { + if err := yw.requestGen.IncrementIMax(); err != nil { return err } } @@ -474,7 +499,17 @@ func (yw *ycsbWorker) readRow(ctx context.Context) error { } func (yw *ycsbWorker) scanRows(ctx context.Context) error { - return errors.New("not implemented yet") + key := yw.nextReadKey() + scanLength := yw.scanLengthGen.Uint64() + res, err := yw.scanStmt.QueryContext(ctx, key, scanLength) + if err != nil { + return err + } + defer res.Close() + for res.Next() { + + } + return res.Err() } // Choose an operation in proportion to the frequencies.