Skip to content

Commit

Permalink
workload/ycsb: implement scans and add support for workload E
Browse files Browse the repository at this point in the history
As described in the ycsb paper, the key should be chosen by a zipfian
distribution and the number of keys to scan should be chosen by a
uniform distribution.

Add a command-line flag for choosing the scan length distribution and
also changed the API for the uniform generate to accept a lower bound
and upper bound. This makes it more consistent with the zipfian
generator and the latest generator and is needed for generating a scan
length.

Release note: None
  • Loading branch information
jeffrey-xiao committed May 24, 2019
1 parent 993bb9c commit 7f1a6f1
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 48 deletions.
28 changes: 14 additions & 14 deletions pkg/workload/ycsb/uniform_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,45 +24,45 @@ import (
// UniformGenerator is a random number generator that generates draws from a
// uniform distribution.
type UniformGenerator struct {
mu struct {
iMin uint64
mu struct {
syncutil.Mutex
r *rand.Rand
sequence uint64
r *rand.Rand
iMax uint64
}
}

// NewUniformGenerator constructs a new UniformGenerator with the given parameters.
// It returns an error if the parameters are outside the accepted range.
func NewUniformGenerator(rng *rand.Rand, minInsertRow uint64) (*UniformGenerator, error) {
func NewUniformGenerator(rng *rand.Rand, iMin, iMax uint64) (*UniformGenerator, error) {

z := UniformGenerator{}
z.iMin = iMin
z.mu.r = rng
z.mu.sequence = minInsertRow
z.mu.iMax = iMax

return &z, nil
}

// IMaxHead returns the current value of IMaxHead, without incrementing.
func (z *UniformGenerator) IMaxHead() uint64 {
z.mu.Lock()
max := z.mu.sequence
z.mu.Unlock()
return max
defer z.mu.Unlock()
return z.mu.iMax
}

// IncrementIMax increments the sequence number.
func (z *UniformGenerator) IncrementIMax() error {
z.mu.Lock()
z.mu.sequence++
z.mu.Unlock()
defer z.mu.Unlock()
z.mu.iMax++
return nil
}

// Uint64 returns a random Uint64 between min and sequence, drawn from a uniform
// Uint64 returns a random Uint64 between iMin and iMax, drawn from a uniform
// distribution.
func (z *UniformGenerator) Uint64() uint64 {
z.mu.Lock()
result := rand.Uint64() % z.mu.sequence
z.mu.Unlock()
return result
defer z.mu.Unlock()
return rand.Uint64() % (z.mu.iMax + 1)
}
103 changes: 69 additions & 34 deletions pkg/workload/ycsb/ycsb.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ type ycsb struct {
splits int

workload string
distribution string
requestDistribution string
scanLengthDistribution string
minScanLength, maxScanLength uint64
readFreq, insertFreq, updateFreq, scanFreq float32
}

Expand All @@ -120,7 +122,10 @@ var ycsbMeta = workload.Meta{
g.flags.BoolVar(&g.families, `families`, true, `Place each column in its own column family`)
g.flags.IntVar(&g.splits, `splits`, 0, `Number of splits to perform before starting normal operations`)
g.flags.StringVar(&g.workload, `workload`, `B`, `Workload type. Choose from A-F.`)
g.flags.StringVar(&g.distribution, `request-distribution`, `zipfian`, `Distribution for random number generator [zipfian, uniform, latest].`)
g.flags.StringVar(&g.requestDistribution, `request-distribution`, `zipfian`, `Distribution for request key generation [zipfian, uniform, latest].`)
g.flags.StringVar(&g.scanLengthDistribution, `scan-length-distribution`, `uniform`, `Distribution for scan length generation [zipfian, uniform].`)
g.flags.Uint64Var(&g.minScanLength, `min-scan-length`, 1, `The minimum length for scan operations.`)
g.flags.Uint64Var(&g.maxScanLength, `max-scan-length`, 1000, `The maximum length for scan operations.`)

// TODO(dan): g.flags.Uint64Var(&g.maxWrites, `max-writes`,
// 7*24*3600*1500, // 7 days at 5% writes and 30k ops/s
Expand Down Expand Up @@ -156,7 +161,6 @@ func (g *ycsb) Hooks() workload.Hooks {
case "E", "e":
g.scanFreq = 0.95
g.insertFreq = 0.05
return errors.New("Workload E (scans) not implemented yet")
case "F", "f":
g.insertFreq = 1.0
default:
Expand Down Expand Up @@ -234,6 +238,11 @@ func (g *ycsb) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad,
return workload.QueryLoad{}, err
}

scanStmt, err := db.Prepare(`SELECT * FROM ycsb.usertable WHERE ycsb_key >= $1 LIMIT $2`)
if err != nil {
return workload.QueryLoad{}, err
}

var insertStmt *gosql.Stmt
if g.json {
insertStmt, err = db.Prepare(`INSERT INTO ycsb.usertable VALUES ($1, json_build_object(
Expand Down Expand Up @@ -276,43 +285,58 @@ func (g *ycsb) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad,
}

zipfRng := rand.New(rand.NewSource(g.seed))
var randGen randGenerator
var requestGen randGenerator
var scanLengthGen randGenerator
var rowIndex = new(uint64)
var rowCount = new(uint64)
*rowIndex = uint64(g.initialRows)
*rowCount = uint64(g.initialRows)

switch strings.ToLower(g.distribution) {
switch strings.ToLower(g.requestDistribution) {
case "zipfian":
randGen, err = NewZipfGenerator(
requestGen, err = NewZipfGenerator(
zipfRng, zipfIMin, defaultIMax-1, defaultTheta, false /* verbose */)
case "uniform":
randGen, err = NewUniformGenerator(zipfRng, uint64(g.initialRows))
requestGen, err = NewUniformGenerator(zipfRng, 0, uint64(g.initialRows)-1)
case "latest":
randGen, err = NewLatestGenerator(
requestGen, err = NewLatestGenerator(
zipfRng, zipfIMin, uint64(g.initialRows)-1, defaultTheta, false /* verbose */)
default:
return workload.QueryLoad{}, errors.Errorf("Unknown distribution: %s", g.distribution)
return workload.QueryLoad{}, errors.Errorf("Unknown request distribution: %s", g.requestDistribution)
}
if err != nil {
return workload.QueryLoad{}, err
}

switch strings.ToLower(g.scanLengthDistribution) {
case "zipfian":
scanLengthGen, err = NewZipfGenerator(zipfRng, g.minScanLength, g.maxScanLength, defaultTheta, false /* verbose */)
case "uniform":
scanLengthGen, err = NewUniformGenerator(zipfRng, g.minScanLength, g.maxScanLength)
default:
return workload.QueryLoad{}, errors.Errorf("Unknown scan length distribution: %s", g.scanLengthDistribution)
}
if err != nil {
return workload.QueryLoad{}, err
}

ql := workload.QueryLoad{SQLDatabase: sqlDatabase}
for i := 0; i < g.connFlags.Concurrency; i++ {
rng := rand.New(rand.NewSource(g.seed + int64(i)))
if err != nil {
return workload.QueryLoad{}, err
}
w := &ycsbWorker{
config: g,
hists: reg.GetHandle(),
db: db,
readStmt: readStmt,
insertStmt: insertStmt,
updateStmts: updateStmts,
rowIndex: rowIndex,
rowCount: rowCount,
randGen: randGen,
rng: rng,
hashFunc: fnv.New64(),
config: g,
hists: reg.GetHandle(),
db: db,
readStmt: readStmt,
scanStmt: scanStmt,
insertStmt: insertStmt,
updateStmts: updateStmts,
rowIndex: rowIndex,
rowCount: rowCount,
requestGen: requestGen,
scanLengthGen: scanLengthGen,
rng: rng,
hashFunc: fnv.New64(),
}
ql.WorkerFns = append(ql.WorkerFns, w.run)
}
Expand All @@ -326,10 +350,10 @@ type randGenerator interface {
}

type ycsbWorker struct {
config *ycsb
hists *histogram.Histograms
db *gosql.DB
readStmt, insertStmt *gosql.Stmt
config *ycsb
hists *histogram.Histograms
db *gosql.DB
readStmt, scanStmt, insertStmt *gosql.Stmt

// In normal mode this is one statement per field, since the field name cannot
// be parametrized. In JSON mode it's a single statement.
Expand All @@ -340,10 +364,11 @@ type ycsbWorker struct {
// The total number of rows inserted.
rowCount *uint64

randGen randGenerator // used to generate random keys
rng *rand.Rand // used to generate random strings for the values
hashFunc hash.Hash64
hashBuf [8]byte
requestGen randGenerator // used to generate random keys for requests
scanLengthGen randGenerator // used to generate length of scan operations
rng *rand.Rand // used to generate random strings for the values
hashFunc hash.Hash64
hashBuf [8]byte
}

func (yw *ycsbWorker) run(ctx context.Context) error {
Expand Down Expand Up @@ -404,7 +429,7 @@ func (yw *ycsbWorker) buildKeyName(keynum uint64) string {
// See YCSB paper section 5.3 for a complete description of how keys are chosen.
func (yw *ycsbWorker) nextReadKey() string {
rowCount := atomic.LoadUint64(yw.rowCount)
rowIndex := yw.hashKey(yw.randGen.Uint64()) % rowCount
rowIndex := yw.hashKey(yw.requestGen.Uint64()) % rowCount
return yw.buildKeyName(rowIndex)
}

Expand Down Expand Up @@ -435,7 +460,7 @@ func (yw *ycsbWorker) insertRow(ctx context.Context, key string, increment bool)
}

if increment {
if err := yw.randGen.IncrementIMax(); err != nil {
if err := yw.requestGen.IncrementIMax(); err != nil {
return err
}
}
Expand Down Expand Up @@ -474,7 +499,17 @@ func (yw *ycsbWorker) readRow(ctx context.Context) error {
}

func (yw *ycsbWorker) scanRows(ctx context.Context) error {
return errors.New("not implemented yet")
key := yw.nextReadKey()
scanLength := yw.scanLengthGen.Uint64()
res, err := yw.scanStmt.QueryContext(ctx, key, scanLength)
if err != nil {
return err
}
defer res.Close()
for res.Next() {

}
return res.Err()
}

// Choose an operation in proportion to the frequencies.
Expand Down

0 comments on commit 7f1a6f1

Please sign in to comment.