From 6bbb8236495d967f72dc7923984329c9ec4d6c18 Mon Sep 17 00:00:00 2001 From: Jeffrey Xiao Date: Fri, 24 May 2019 14:02:09 -0400 Subject: [PATCH 1/4] workload/ycsb: fix key selection for reading and inserting Previously, the keys for insertion were chosen based on the iMaxHead of the zipfian distribution generator. As suggested in the ycsb paper, the zipfian distribution generator was later changed to use a much larger key space, but the key selection for insertion was never changed. This commit adds an index for the next row to be inserted and a row counter that keeps track of the largest contiguous sequence of keys able to be read. Since IMaxHead is no longer used after these changes, it was removed from the randGenerator interface and all implementors. Additionally, we perform the convertion from random number to key in a different manner than the yscb paper. The yscb paper performs a hash then mods the number, while we mod the number then hash it. This causes the hotspot of keys when using the zipfian to be record 0 (I.E. the oldest record). Another hash was added to ensure that the hotspot of keys changes. Hashing twice is also consistent with the official yscb implementation. Finally, there was no logic to handle reading keys that were inserted since the key selection for reading only picked from the initial rows. Instead of modding by the initial rows, we now mod by the total number of rows recorded by the row counter. Release note: None --- pkg/workload/ycsb/acknowledged_counter.go | 71 +++++++++++++++++++++++ pkg/workload/ycsb/uniform_generator.go | 8 --- pkg/workload/ycsb/ycsb.go | 46 +++++++++------ pkg/workload/ycsb/zipfgenerator.go | 23 ++------ 4 files changed, 103 insertions(+), 45 deletions(-) create mode 100644 pkg/workload/ycsb/acknowledged_counter.go diff --git a/pkg/workload/ycsb/acknowledged_counter.go b/pkg/workload/ycsb/acknowledged_counter.go new file mode 100644 index 000000000000..27d261e6964a --- /dev/null +++ b/pkg/workload/ycsb/acknowledged_counter.go @@ -0,0 +1,71 @@ +// Copyright 2019 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. See the AUTHORS file +// for names of contributors. + +package ycsb + +import ( + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/pkg/errors" +) + +const ( + // windowSize is the size of the window of pending acknowledgements. + windowSize = 1 << 20 + // windowMask is the mask the apply to obtain an index in the window. + windowMask = windowSize - 1 +) + +// AcknowledgedCounter keeps track of the largest value v such that all values +// in [initialCount, v) are acknowledged. +type AcknowledgedCounter struct { + mu struct { + syncutil.Mutex + count uint64 + window []bool + } +} + +// NewAcknowledgedCounter constructs a new AcknowledgedCounter with the given +// parameters. +func NewAcknowledgedCounter(initialCount uint64) *AcknowledgedCounter { + c := &AcknowledgedCounter{} + c.mu.count = initialCount + c.mu.window = make([]bool, windowSize) + return c +} + +// Last returns the largest value v such that all values in [initialCount, v) are ackowledged. +func (c *AcknowledgedCounter) Last() uint64 { + c.mu.Lock() + defer c.mu.Unlock() + return c.mu.count +} + +// Acknowledge marks v as being acknowledged. +func (c *AcknowledgedCounter) Acknowledge(v uint64) error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.mu.window[v&windowMask] { + return errors.Errorf("Number of pending acknowledgements exceeded window size: %d has been acknowledged, but %d is not acknowledged", v, c.mu.count) + } + + c.mu.window[v&windowMask] = true + for c.mu.window[c.mu.count&windowMask] { + c.mu.window[c.mu.count&windowMask] = false + c.mu.count++ + } + return nil +} diff --git a/pkg/workload/ycsb/uniform_generator.go b/pkg/workload/ycsb/uniform_generator.go index 9be702715d05..01a73f78fb84 100644 --- a/pkg/workload/ycsb/uniform_generator.go +++ b/pkg/workload/ycsb/uniform_generator.go @@ -42,14 +42,6 @@ func NewUniformGenerator(rng *rand.Rand, minInsertRow uint64) (*UniformGenerator return &z, nil } -// IMaxHead returns the current value of IMaxHead, without incrementing. -func (z *UniformGenerator) IMaxHead() uint64 { - z.mu.Lock() - max := z.mu.sequence - z.mu.Unlock() - return max -} - // IncrementIMax increments the sequence number. func (z *UniformGenerator) IncrementIMax() error { z.mu.Lock() diff --git a/pkg/workload/ycsb/ycsb.go b/pkg/workload/ycsb/ycsb.go index 9837464831fa..47a9998113d9 100644 --- a/pkg/workload/ycsb/ycsb.go +++ b/pkg/workload/ycsb/ycsb.go @@ -280,6 +280,8 @@ func (g *ycsb) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad, zipfRng := rand.New(rand.NewSource(g.seed)) var randGen randGenerator + var rowIndex = new(uint64) + *rowIndex = uint64(g.initialRows) switch strings.ToLower(g.distribution) { case "zipfian": @@ -291,6 +293,7 @@ func (g *ycsb) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad, return workload.QueryLoad{}, errors.Errorf("Unknown distribution: %s", g.distribution) } + rowCounter := NewAcknowledgedCounter((uint64)(g.initialRows)) ql := workload.QueryLoad{SQLDatabase: sqlDatabase} for i := 0; i < g.connFlags.Concurrency; i++ { rng := rand.New(rand.NewSource(g.seed + int64(i))) @@ -304,6 +307,8 @@ func (g *ycsb) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad, readStmt: readStmt, insertStmt: insertStmt, updateStmts: updateStmts, + rowIndex: rowIndex, + rowCounter: rowCounter, randGen: randGen, rng: rng, hashFunc: fnv.New64(), @@ -315,7 +320,6 @@ func (g *ycsb) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad, type randGenerator interface { Uint64() uint64 - IMaxHead() uint64 IncrementIMax() error } @@ -329,6 +333,11 @@ type ycsbWorker struct { // be parametrized. In JSON mode it's a single statement. updateStmts []*gosql.Stmt + // The next row index to insert. + rowIndex *uint64 + // Counter to keep track of which rows have been inserted. + rowCounter *AcknowledgedCounter + randGen randGenerator // used to generate random keys rng *rand.Rand // used to generate random strings for the values hashFunc hash.Hash64 @@ -346,7 +355,7 @@ func (yw *ycsbWorker) run(ctx context.Context) error { case readOp: err = yw.readRow(ctx) case insertOp: - err = yw.insertRow(ctx, yw.nextInsertKey(), true) + err = yw.insertRow(ctx, yw.nextInsertKeyIndex(), true) case scanOp: err = yw.scanRows(ctx) default: @@ -392,22 +401,13 @@ func (yw *ycsbWorker) buildKeyName(keynum uint64) string { // close together. // See YCSB paper section 5.3 for a complete description of how keys are chosen. func (yw *ycsbWorker) nextReadKey() string { - // TODO: In order to support workloads with INSERT, this would need to account - // for the number of rows growing over time. See the YCSB paper/code for how - // this should work. (Basically repeatedly drawing from the distribution until - // a sufficiently low value is chosen, but with some complications.) - - // TODO(arjun): Look into why this was being hashed twice before. - rownum := yw.randGen.Uint64() % uint64(yw.config.initialRows) - return yw.buildKeyName(rownum) + rowCount := yw.rowCounter.Last() + rowIndex := yw.hashKey(yw.randGen.Uint64()) % rowCount + return yw.buildKeyName(rowIndex) } -func (yw *ycsbWorker) nextInsertKey() string { - // TODO: This logic is no longer valid now that we are using a large YCSB - // distribution and modding the samples. To properly support INSERTS, we need - // to maintain a separate rownum counter. - rownum := yw.randGen.IMaxHead() - return yw.buildKeyName(rownum) +func (yw *ycsbWorker) nextInsertKeyIndex() uint64 { + return atomic.AddUint64(yw.rowIndex, 1) - 1 } var letters = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") @@ -421,9 +421,9 @@ func (yw *ycsbWorker) randString(length int) string { return string(str) } -func (yw *ycsbWorker) insertRow(ctx context.Context, key string, increment bool) error { +func (yw *ycsbWorker) insertRow(ctx context.Context, keyIndex uint64, increment bool) error { args := make([]interface{}, numTableFields+1) - args[0] = key + args[0] = yw.buildKeyName(keyIndex) for i := 1; i <= numTableFields; i++ { args[i] = yw.randString(fieldLength) } @@ -432,9 +432,17 @@ func (yw *ycsbWorker) insertRow(ctx context.Context, key string, increment bool) } if increment { - if err := yw.randGen.IncrementIMax(); err != nil { + prevRowCount := yw.rowCounter.Last() + if err := yw.rowCounter.Acknowledge(keyIndex); err != nil { return err } + currRowCount := yw.rowCounter.Last() + for prevRowCount < currRowCount { + if err := yw.randGen.IncrementIMax(); err != nil { + return err + } + prevRowCount++ + } } return nil } diff --git a/pkg/workload/ycsb/zipfgenerator.go b/pkg/workload/ycsb/zipfgenerator.go index 06d6f70e4ec3..caa69c846e52 100644 --- a/pkg/workload/ycsb/zipfgenerator.go +++ b/pkg/workload/ycsb/zipfgenerator.go @@ -54,12 +54,11 @@ type ZipfGenerator struct { // ZipfGeneratorMu holds variables which must be globally synced. type ZipfGeneratorMu struct { - mu syncutil.Mutex - r *rand.Rand - iMax uint64 - iMaxHead uint64 - eta float64 - zetaN float64 + mu syncutil.Mutex + r *rand.Rand + iMax uint64 + eta float64 + zetaN float64 } // NewZipfGenerator constructs a new ZipfGenerator with the given parameters. @@ -169,15 +168,3 @@ func (z *ZipfGenerator) IncrementIMax() error { z.zipfGenMu.mu.Unlock() return nil } - -// IMaxHead returns the current value of IMaxHead, and increments it after. -func (z *ZipfGenerator) IMaxHead() uint64 { - z.zipfGenMu.mu.Lock() - if z.zipfGenMu.iMaxHead < z.zipfGenMu.iMax { - z.zipfGenMu.iMaxHead = z.zipfGenMu.iMax - } - iMaxHead := z.zipfGenMu.iMaxHead - z.zipfGenMu.iMaxHead++ - z.zipfGenMu.mu.Unlock() - return iMaxHead -} From adb94981626ebf1f1f93350ef198aafa5aac864b Mon Sep 17 00:00:00 2001 From: Jeffrey Xiao Date: Tue, 28 May 2019 11:36:56 -0400 Subject: [PATCH 2/4] workload/ycsb: add skewed latest generator and support for workload D The skewed latest generator is used for workload D and skews to the most recently inserted keys. Additionally, changed the API for zipfGenerator to generate an inclusive bound in [0, defaultIMax - 1], instead of [1, defaultIMax]. Release note: None --- pkg/workload/ycsb/skewed_latest_generator.go | 67 ++++++++++++++++++++ pkg/workload/ycsb/ycsb.go | 20 ++++-- 2 files changed, 80 insertions(+), 7 deletions(-) create mode 100644 pkg/workload/ycsb/skewed_latest_generator.go diff --git a/pkg/workload/ycsb/skewed_latest_generator.go b/pkg/workload/ycsb/skewed_latest_generator.go new file mode 100644 index 000000000000..37aa93533069 --- /dev/null +++ b/pkg/workload/ycsb/skewed_latest_generator.go @@ -0,0 +1,67 @@ +// Copyright 2019 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. See the AUTHORS file +// for names of contributors. + +package ycsb + +import ( + "math/rand" + + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// SkewedLatestGenerator is a random number generator that generates numbers in +// the range [iMin, iMax], but skews it towards iMax using a zipfian +// distribution. +type SkewedLatestGenerator struct { + mu struct { + syncutil.Mutex + iMax uint64 + zipfGen *ZipfGenerator + } +} + +// NewSkewedLatestGenerator constructs a new SkewedLatestGenerator with the +// given parameters. It returns an error if the parameters are outside the +// accepted range. +func NewSkewedLatestGenerator( + rng *rand.Rand, iMin, iMax uint64, theta float64, verbose bool, +) (*SkewedLatestGenerator, error) { + + z := SkewedLatestGenerator{} + z.mu.iMax = iMax + zipfGen, err := NewZipfGenerator(rng, 0, iMax-iMin, theta, verbose) + if err != nil { + return nil, err + } + z.mu.zipfGen = zipfGen + + return &z, nil +} + +// IncrementIMax increments iMax. +func (z *SkewedLatestGenerator) IncrementIMax() error { + z.mu.Lock() + defer z.mu.Unlock() + z.mu.iMax++ + return z.mu.zipfGen.IncrementIMax() +} + +// Uint64 returns a random Uint64 between iMin and iMax, where keys near iMax +// are most likely to be drawn. +func (z *SkewedLatestGenerator) Uint64() uint64 { + z.mu.Lock() + defer z.mu.Unlock() + return z.mu.iMax - z.mu.zipfGen.Uint64() +} diff --git a/pkg/workload/ycsb/ycsb.go b/pkg/workload/ycsb/ycsb.go index 47a9998113d9..17318f1b332d 100644 --- a/pkg/workload/ycsb/ycsb.go +++ b/pkg/workload/ycsb/ycsb.go @@ -38,7 +38,7 @@ import ( const ( numTableFields = 10 fieldLength = 100 // In characters - zipfIMin = 1 + zipfIMin = 0 usertableSchemaRelational = `( ycsb_key VARCHAR(255) PRIMARY KEY NOT NULL, @@ -120,7 +120,7 @@ var ycsbMeta = workload.Meta{ g.flags.BoolVar(&g.families, `families`, true, `Place each column in its own column family`) g.flags.IntVar(&g.splits, `splits`, 0, `Number of splits to perform before starting normal operations`) g.flags.StringVar(&g.workload, `workload`, `B`, `Workload type. Choose from A-F.`) - g.flags.StringVar(&g.distribution, `request-distribution`, `zipfian`, `Distribution for random number generator [zipfian, uniform].`) + g.flags.StringVar(&g.distribution, `request-distribution`, ``, `Distribution for request key generation [zipfian, uniform, latest]. The default for workloads A, B, C, E, and F is zipfian, and the default for workload D is latest.`) // TODO(dan): g.flags.Uint64Var(&g.maxWrites, `max-writes`, // 7*24*3600*1500, // 7 days at 5% writes and 30k ops/s @@ -145,23 +145,26 @@ func (g *ycsb) Hooks() workload.Hooks { case "A", "a": g.readFreq = 0.5 g.updateFreq = 0.5 + g.distribution = "zipfian" case "B", "b": g.readFreq = 0.95 g.updateFreq = 0.05 + g.distribution = "zipfian" case "C", "c": g.readFreq = 1.0 + g.distribution = "zipfian" case "D", "d": g.readFreq = 0.95 - g.insertFreq = 0.95 - return errors.New("Workload D (read latest) not implemented yet") - // TODO(arjun): workload D (read latest) requires modifying the - // RNG to skew to the latest keys, so not done yet. + g.insertFreq = 0.05 + g.distribution = "latest" case "E", "e": g.scanFreq = 0.95 g.insertFreq = 0.05 + g.distribution = "zipfian" return errors.New("Workload E (scans) not implemented yet") case "F", "f": g.insertFreq = 1.0 + g.distribution = "zipfian" default: return errors.Errorf("Unknown workload: %q", g.workload) } @@ -286,9 +289,12 @@ func (g *ycsb) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad, switch strings.ToLower(g.distribution) { case "zipfian": randGen, err = NewZipfGenerator( - zipfRng, zipfIMin, defaultIMax, defaultTheta, false /* verbose */) + zipfRng, zipfIMin, defaultIMax-1, defaultTheta, false /* verbose */) case "uniform": randGen, err = NewUniformGenerator(zipfRng, uint64(g.initialRows)) + case "latest": + randGen, err = NewSkewedLatestGenerator( + zipfRng, zipfIMin, uint64(g.initialRows)-1, defaultTheta, false /* verbose */) default: return workload.QueryLoad{}, errors.Errorf("Unknown distribution: %s", g.distribution) } From 36a4a7fd41805bc597bd8df60f32a09710cba663 Mon Sep 17 00:00:00 2001 From: Jeffrey Xiao Date: Tue, 28 May 2019 11:38:58 -0400 Subject: [PATCH 3/4] workload/ycsb: implement scans and add support for workload E As described in the ycsb paper, the key should be chosen by a zipfian distribution and the number of keys to scan should be chosen by a uniform distribution. Add a command-line flag for choosing the scan length distribution and also changed the API for the uniform generate to accept a lower bound and upper bound. This makes it more consistent with the zipfian generator and the latest generator and is needed for generating a scan length. Release note: None --- pkg/workload/ycsb/uniform_generator.go | 25 +++--- pkg/workload/ycsb/ycsb.go | 114 ++++++++++++++++--------- 2 files changed, 87 insertions(+), 52 deletions(-) diff --git a/pkg/workload/ycsb/uniform_generator.go b/pkg/workload/ycsb/uniform_generator.go index 01a73f78fb84..268bbff35dc3 100644 --- a/pkg/workload/ycsb/uniform_generator.go +++ b/pkg/workload/ycsb/uniform_generator.go @@ -24,37 +24,38 @@ import ( // UniformGenerator is a random number generator that generates draws from a // uniform distribution. type UniformGenerator struct { - mu struct { + iMin uint64 + mu struct { syncutil.Mutex - r *rand.Rand - sequence uint64 + r *rand.Rand + iMax uint64 } } // NewUniformGenerator constructs a new UniformGenerator with the given parameters. // It returns an error if the parameters are outside the accepted range. -func NewUniformGenerator(rng *rand.Rand, minInsertRow uint64) (*UniformGenerator, error) { +func NewUniformGenerator(rng *rand.Rand, iMin, iMax uint64) (*UniformGenerator, error) { z := UniformGenerator{} + z.iMin = iMin z.mu.r = rng - z.mu.sequence = minInsertRow + z.mu.iMax = iMax return &z, nil } -// IncrementIMax increments the sequence number. +// IncrementIMax increments iMax. func (z *UniformGenerator) IncrementIMax() error { z.mu.Lock() - z.mu.sequence++ - z.mu.Unlock() + defer z.mu.Unlock() + z.mu.iMax++ return nil } -// Uint64 returns a random Uint64 between min and sequence, drawn from a uniform +// Uint64 returns a random Uint64 between iMin and iMax, drawn from a uniform // distribution. func (z *UniformGenerator) Uint64() uint64 { z.mu.Lock() - result := rand.Uint64() % z.mu.sequence - z.mu.Unlock() - return result + defer z.mu.Unlock() + return (uint64)(z.mu.r.Int63n((int64)(z.mu.iMax-z.iMin+1))) + z.iMin } diff --git a/pkg/workload/ycsb/ycsb.go b/pkg/workload/ycsb/ycsb.go index 17318f1b332d..6c4e980738f7 100644 --- a/pkg/workload/ycsb/ycsb.go +++ b/pkg/workload/ycsb/ycsb.go @@ -94,7 +94,9 @@ type ycsb struct { splits int workload string - distribution string + requestDistribution string + scanLengthDistribution string + minScanLength, maxScanLength uint64 readFreq, insertFreq, updateFreq, scanFreq float32 } @@ -120,7 +122,10 @@ var ycsbMeta = workload.Meta{ g.flags.BoolVar(&g.families, `families`, true, `Place each column in its own column family`) g.flags.IntVar(&g.splits, `splits`, 0, `Number of splits to perform before starting normal operations`) g.flags.StringVar(&g.workload, `workload`, `B`, `Workload type. Choose from A-F.`) - g.flags.StringVar(&g.distribution, `request-distribution`, ``, `Distribution for request key generation [zipfian, uniform, latest]. The default for workloads A, B, C, E, and F is zipfian, and the default for workload D is latest.`) + g.flags.StringVar(&g.requestDistribution, `request-distribution`, ``, `Distribution for request key generation [zipfian, uniform, latest]. The default for workloads A, B, C, E, and F is zipfian, and the default for workload D is latest.`) + g.flags.StringVar(&g.scanLengthDistribution, `scan-length-distribution`, `uniform`, `Distribution for scan length generation [zipfian, uniform]. Primarily used for workload E.`) + g.flags.Uint64Var(&g.minScanLength, `min-scan-length`, 1, `The minimum length for scan operations. Primarily used for workload E.`) + g.flags.Uint64Var(&g.maxScanLength, `max-scan-length`, 1000, `The maximum length for scan operations. Primarily used for workload E.`) // TODO(dan): g.flags.Uint64Var(&g.maxWrites, `max-writes`, // 7*24*3600*1500, // 7 days at 5% writes and 30k ops/s @@ -145,26 +150,25 @@ func (g *ycsb) Hooks() workload.Hooks { case "A", "a": g.readFreq = 0.5 g.updateFreq = 0.5 - g.distribution = "zipfian" + g.requestDistribution = "zipfian" case "B", "b": g.readFreq = 0.95 g.updateFreq = 0.05 - g.distribution = "zipfian" + g.requestDistribution = "zipfian" case "C", "c": g.readFreq = 1.0 - g.distribution = "zipfian" + g.requestDistribution = "zipfian" case "D", "d": g.readFreq = 0.95 g.insertFreq = 0.05 - g.distribution = "latest" + g.requestDistribution = "latest" case "E", "e": g.scanFreq = 0.95 g.insertFreq = 0.05 - g.distribution = "zipfian" - return errors.New("Workload E (scans) not implemented yet") + g.requestDistribution = "zipfian" case "F", "f": g.insertFreq = 1.0 - g.distribution = "zipfian" + g.requestDistribution = "zipfian" default: return errors.Errorf("Unknown workload: %q", g.workload) } @@ -240,6 +244,11 @@ func (g *ycsb) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad, return workload.QueryLoad{}, err } + scanStmt, err := db.Prepare(`SELECT * FROM ycsb.usertable WHERE ycsb_key >= $1 LIMIT $2`) + if err != nil { + return workload.QueryLoad{}, err + } + var insertStmt *gosql.Stmt if g.json { insertStmt, err = db.Prepare(`INSERT INTO ycsb.usertable VALUES ($1, json_build_object( @@ -282,42 +291,57 @@ func (g *ycsb) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad, } zipfRng := rand.New(rand.NewSource(g.seed)) - var randGen randGenerator + var requestGen randGenerator + var scanLengthGen randGenerator var rowIndex = new(uint64) *rowIndex = uint64(g.initialRows) - switch strings.ToLower(g.distribution) { + switch strings.ToLower(g.requestDistribution) { case "zipfian": - randGen, err = NewZipfGenerator( + requestGen, err = NewZipfGenerator( zipfRng, zipfIMin, defaultIMax-1, defaultTheta, false /* verbose */) case "uniform": - randGen, err = NewUniformGenerator(zipfRng, uint64(g.initialRows)) + requestGen, err = NewUniformGenerator(zipfRng, 0, uint64(g.initialRows)-1) case "latest": - randGen, err = NewSkewedLatestGenerator( + requestGen, err = NewSkewedLatestGenerator( zipfRng, zipfIMin, uint64(g.initialRows)-1, defaultTheta, false /* verbose */) default: - return workload.QueryLoad{}, errors.Errorf("Unknown distribution: %s", g.distribution) + return workload.QueryLoad{}, errors.Errorf("Unknown request distribution: %s", g.requestDistribution) + } + if err != nil { + return workload.QueryLoad{}, err + } + + switch strings.ToLower(g.scanLengthDistribution) { + case "zipfian": + scanLengthGen, err = NewZipfGenerator(zipfRng, g.minScanLength, g.maxScanLength, defaultTheta, false /* verbose */) + case "uniform": + scanLengthGen, err = NewUniformGenerator(zipfRng, g.minScanLength, g.maxScanLength) + default: + return workload.QueryLoad{}, errors.Errorf("Unknown scan length distribution: %s", g.scanLengthDistribution) + } + if err != nil { + return workload.QueryLoad{}, err } rowCounter := NewAcknowledgedCounter((uint64)(g.initialRows)) ql := workload.QueryLoad{SQLDatabase: sqlDatabase} for i := 0; i < g.connFlags.Concurrency; i++ { rng := rand.New(rand.NewSource(g.seed + int64(i))) - if err != nil { - return workload.QueryLoad{}, err - } w := &ycsbWorker{ - config: g, - hists: reg.GetHandle(), - db: db, - readStmt: readStmt, - insertStmt: insertStmt, - updateStmts: updateStmts, - rowIndex: rowIndex, - rowCounter: rowCounter, - randGen: randGen, - rng: rng, - hashFunc: fnv.New64(), + config: g, + hists: reg.GetHandle(), + db: db, + readStmt: readStmt, + scanStmt: scanStmt, + insertStmt: insertStmt, + updateStmts: updateStmts, + rowIndex: rowIndex, + rowCounter: rowCounter, + requestGen: requestGen, + scanLengthGen: scanLengthGen, + rng: rng, + hashFunc: fnv.New64(), } ql.WorkerFns = append(ql.WorkerFns, w.run) } @@ -330,10 +354,10 @@ type randGenerator interface { } type ycsbWorker struct { - config *ycsb - hists *histogram.Histograms - db *gosql.DB - readStmt, insertStmt *gosql.Stmt + config *ycsb + hists *histogram.Histograms + db *gosql.DB + readStmt, scanStmt, insertStmt *gosql.Stmt // In normal mode this is one statement per field, since the field name cannot // be parametrized. In JSON mode it's a single statement. @@ -344,10 +368,11 @@ type ycsbWorker struct { // Counter to keep track of which rows have been inserted. rowCounter *AcknowledgedCounter - randGen randGenerator // used to generate random keys - rng *rand.Rand // used to generate random strings for the values - hashFunc hash.Hash64 - hashBuf [8]byte + requestGen randGenerator // used to generate random keys for requests + scanLengthGen randGenerator // used to generate length of scan operations + rng *rand.Rand // used to generate random strings for the values + hashFunc hash.Hash64 + hashBuf [8]byte } func (yw *ycsbWorker) run(ctx context.Context) error { @@ -408,7 +433,7 @@ func (yw *ycsbWorker) buildKeyName(keynum uint64) string { // See YCSB paper section 5.3 for a complete description of how keys are chosen. func (yw *ycsbWorker) nextReadKey() string { rowCount := yw.rowCounter.Last() - rowIndex := yw.hashKey(yw.randGen.Uint64()) % rowCount + rowIndex := yw.hashKey(yw.requestGen.Uint64()) % rowCount return yw.buildKeyName(rowIndex) } @@ -444,7 +469,7 @@ func (yw *ycsbWorker) insertRow(ctx context.Context, keyIndex uint64, increment } currRowCount := yw.rowCounter.Last() for prevRowCount < currRowCount { - if err := yw.randGen.IncrementIMax(); err != nil { + if err := yw.requestGen.IncrementIMax(); err != nil { return err } prevRowCount++ @@ -485,7 +510,16 @@ func (yw *ycsbWorker) readRow(ctx context.Context) error { } func (yw *ycsbWorker) scanRows(ctx context.Context) error { - return errors.New("not implemented yet") + key := yw.nextReadKey() + scanLength := yw.scanLengthGen.Uint64() + res, err := yw.scanStmt.QueryContext(ctx, key, scanLength) + if err != nil { + return err + } + defer res.Close() + for res.Next() { + } + return res.Err() } // Choose an operation in proportion to the frequencies. From e5e1116b181d9ed3110b1f8090745990d78c8b54 Mon Sep 17 00:00:00 2001 From: Jeffrey Xiao Date: Fri, 24 May 2019 16:32:50 -0400 Subject: [PATCH 4/4] roachtest/ycsb: enable workloads D and E Release note: None --- pkg/cmd/roachtest/ycsb.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pkg/cmd/roachtest/ycsb.go b/pkg/cmd/roachtest/ycsb.go index 62748000ee2f..7a450b7405d0 100644 --- a/pkg/cmd/roachtest/ycsb.go +++ b/pkg/cmd/roachtest/ycsb.go @@ -45,11 +45,6 @@ func registerYCSB(r *registry) { } for _, wl := range []string{"A", "B", "C", "D", "E", "F"} { - if wl == "D" || wl == "E" { - // These workloads are currently unsupported by workload. - // See TODOs in workload/ycsb/ycsb.go. - continue - } for _, cpus := range []int{8, 32} { var name string if cpus == 8 { // support legacy test name which didn't include cpu