Skip to content

Commit

Permalink
workload: add json workload
Browse files Browse the repository at this point in the history
Not sure if there's a better way to architect this - it's largely just
modified from the kv workload.

Release note: None.
  • Loading branch information
Justin Jaffray committed Mar 19, 2018
1 parent 4921949 commit 0b41ec3
Show file tree
Hide file tree
Showing 2 changed files with 349 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/workloadccl/allccl/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl/workloadccl/roachmartccl"
_ "github.com/cockroachdb/cockroach/pkg/workload/bank"
_ "github.com/cockroachdb/cockroach/pkg/workload/examples"
_ "github.com/cockroachdb/cockroach/pkg/workload/jsonload"
_ "github.com/cockroachdb/cockroach/pkg/workload/kv"
_ "github.com/cockroachdb/cockroach/pkg/workload/tpcc"
_ "github.com/cockroachdb/cockroach/pkg/workload/tpch"
Expand Down
348 changes: 348 additions & 0 deletions pkg/workload/jsonload/json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,348 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.

package jsonload

import (
"bytes"
"context"
"crypto/sha1"
gosql "database/sql"
"encoding/binary"
"fmt"
"hash"
"math"
"math/rand"
"strings"
"sync/atomic"

"github.com/spf13/pflag"

"github.com/cockroachdb/cockroach/pkg/util/json"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/pkg/errors"
)

const (
jsonSchema = `(k BIGINT NOT NULL PRIMARY KEY, v JSONB NOT NULL)`
jsonSchemaWithInvertedIndex = `(k BIGINT NOT NULL PRIMARY KEY, v JSONB NOT NULL, INVERTED INDEX (v))`
jsonSchemaWithComputed = `(k BIGINT AS (v->>'key')::BIGINT STORED PRIMARY KEY, v JSONB NOT NULL)`
)

type jsonLoad struct {
flags workload.Flags
connFlags *workload.ConnFlags

batchSize int
cycleLength int64
readPercent int
writeSeq, seed int64
sequential bool
splits int
complexity int
inverted bool
computed bool
}

func init() {
workload.Register(jsonLoadMeta)
}

var jsonLoadMeta = workload.Meta{
Name: `json`,
Description: `JSON reads and writes to keys spread (by default, uniformly` +
` at random) across the cluster`,
Version: `1.0.0`,
New: func() workload.Generator {
g := &jsonLoad{}
g.flags.FlagSet = pflag.NewFlagSet(`json`, pflag.ContinueOnError)
g.flags.Meta = map[string]workload.FlagMeta{
`batch`: {RuntimeOnly: true},
}
g.flags.IntVar(&g.batchSize, `batch`, 1, `Number of blocks to insert in a single SQL statement`)
g.flags.Int64Var(&g.cycleLength, `cycle-length`, math.MaxInt64, `Number of keys repeatedly accessed by each writer`)
g.flags.IntVar(&g.readPercent, `read-percent`, 0, `Percent (0-100) of operations that are reads of existing keys`)
g.flags.Int64Var(&g.writeSeq, `write-seq`, 0, `Initial write sequence value.`)
g.flags.Int64Var(&g.seed, `seed`, 1, `Key hash seed.`)
g.flags.BoolVar(&g.sequential, `sequential`, false, `Pick keys sequentially instead of randomly`)
g.flags.IntVar(&g.splits, `splits`, 0, `Number of splits to perform before starting normal operations`)
g.flags.IntVar(&g.complexity, `complexity`, 20, `Complexity of generated JSON data`)
g.flags.BoolVar(&g.inverted, `inverted`, false, `Whether to include an inverted index`)
g.flags.BoolVar(&g.computed, `computed`, false, `Whether to use a computed primary key`)
g.connFlags = workload.NewConnFlags(&g.flags)
return g
},
}

// Meta implements the Generator interface.
func (*jsonLoad) Meta() workload.Meta { return jsonLoadMeta }

// Flags implements the Flagser interface.
func (w *jsonLoad) Flags() workload.Flags { return w.flags }

// Hooks implements the Hookser interface.
func (w *jsonLoad) Hooks() workload.Hooks {
return workload.Hooks{
Validate: func() error {
if w.computed && w.inverted {
return errors.Errorf("computed and inverted cannot be used together")
}
return nil
},
}
}

// Tables implements the Generator interface.
func (w *jsonLoad) Tables() []workload.Table {
schema := jsonSchema
if w.inverted {
schema = jsonSchemaWithInvertedIndex
} else if w.computed {
schema = jsonSchemaWithComputed
}
table := workload.Table{
Name: `j`,
Schema: schema,
InitialRowCount: 0,
SplitCount: w.splits,
SplitFn: func(splitIdx int) []interface{} {
rng := rand.New(rand.NewSource(w.seed + int64(splitIdx)))
g := newHashGenerator(&sequence{config: w, val: w.writeSeq})
return []interface{}{
int(g.hash(rng.Int63())),
}
},
}
return []workload.Table{table}
}

// Ops implements the Opser interface.
func (w *jsonLoad) Ops(urls []string, reg *workload.HistogramRegistry) (workload.QueryLoad, error) {
sqlDatabase, err := workload.SanitizeUrls(w, w.connFlags.DBOverride, urls)
if err != nil {
return workload.QueryLoad{}, err
}
db, err := gosql.Open(`cockroach`, strings.Join(urls, ` `))
if err != nil {
return workload.QueryLoad{}, err
}
// Allow a maximum of concurrency+1 connections to the database.
db.SetMaxOpenConns(w.connFlags.Concurrency + 1)
db.SetMaxIdleConns(w.connFlags.Concurrency + 1)

var buf bytes.Buffer
buf.WriteString(`SELECT k, v FROM j WHERE k IN (`)
for i := 0; i < w.batchSize; i++ {
if i > 0 {
buf.WriteString(", ")
}
fmt.Fprintf(&buf, `$%d`, i+1)
}
buf.WriteString(`)`)
readStmt, err := db.Prepare(buf.String())
if err != nil {
return workload.QueryLoad{}, err
}

buf.Reset()
if w.computed {
buf.WriteString(`UPSERT INTO j (v) VALUES`)
} else {
buf.WriteString(`UPSERT INTO j (k, v) VALUES`)
}

for i := 0; i < w.batchSize; i++ {
j := i * 2
if i > 0 {
buf.WriteString(", ")
}
if w.computed {
fmt.Fprintf(&buf, ` ($%d)`, i+1)
} else {
fmt.Fprintf(&buf, ` ($%d, $%d)`, j+1, j+2)
}
}

writeStmt, err := db.Prepare(buf.String())
if err != nil {
return workload.QueryLoad{}, err
}

ql := workload.QueryLoad{SQLDatabase: sqlDatabase}
for i := 0; i < w.connFlags.Concurrency; i++ {
op := jsonOp{
config: w,
hists: reg.GetHandle(),
db: db,
readStmt: readStmt,
writeStmt: writeStmt,
}
seq := &sequence{config: w, val: w.writeSeq}
if w.sequential {
op.g = newSequentialGenerator(seq)
} else {
op.g = newHashGenerator(seq)
}
ql.WorkerFns = append(ql.WorkerFns, op.run)
}
return ql, nil
}

type jsonOp struct {
config *jsonLoad
hists *workload.Histograms
db *gosql.DB
readStmt *gosql.Stmt
writeStmt *gosql.Stmt
g keyGenerator
}

func (o *jsonOp) run(ctx context.Context) error {
if o.g.rand().Intn(100) < o.config.readPercent {
args := make([]interface{}, o.config.batchSize)
for i := 0; i < o.config.batchSize; i++ {
args[i] = o.g.readKey()
}
start := timeutil.Now()
rows, err := o.readStmt.Query(args...)
if err != nil {
return err
}
for rows.Next() {
}
o.hists.Get(`read`).Record(timeutil.Since(start))
return rows.Err()
}
argCount := 2
if o.config.computed {
argCount = 1
}
args := make([]interface{}, argCount*o.config.batchSize)
for i := 0; i < o.config.batchSize*argCount; i += argCount {
j := i
if !o.config.computed {
args[j] = o.g.writeKey()
j++
}
js, err := json.Random(o.config.complexity, o.g.rand())
if err != nil {
return err
}
if o.config.computed {
builder := json.NewObjectBuilder(2)
builder.Add("key", json.FromInt64(o.g.writeKey()))
builder.Add("data", js)
js = builder.Build()
}
args[j] = js.String()
}
start := timeutil.Now()
_, err := o.writeStmt.Exec(args...)
o.hists.Get(`write`).Record(timeutil.Since(start))
return err
}

type sequence struct {
config *jsonLoad
val int64
}

func (s *sequence) write() int64 {
return (atomic.AddInt64(&s.val, 1) - 1) % s.config.cycleLength
}

// read returns the last key index that has been written. Note that the returned
// index might not actually have been written yet, so a read operation cannot
// require that the key is present.
func (s *sequence) read() int64 {
return atomic.LoadInt64(&s.val) % s.config.cycleLength
}

// keyGenerator generates read and write keys. Read keys may not yet exist and
// write keys may already exist.
type keyGenerator interface {
writeKey() int64
readKey() int64
rand() *rand.Rand
}

type hashGenerator struct {
seq *sequence
random *rand.Rand
hasher hash.Hash
buf [sha1.Size]byte
}

func newHashGenerator(seq *sequence) *hashGenerator {
return &hashGenerator{
seq: seq,
random: rand.New(rand.NewSource(seq.config.seed)),
hasher: sha1.New(),
}
}

func (g *hashGenerator) hash(v int64) int64 {
binary.BigEndian.PutUint64(g.buf[:8], uint64(v))
binary.BigEndian.PutUint64(g.buf[8:16], uint64(g.seq.config.seed))
g.hasher.Reset()
_, _ = g.hasher.Write(g.buf[:16])
g.hasher.Sum(g.buf[:0])
return int64(binary.BigEndian.Uint64(g.buf[:8]))
}

func (g *hashGenerator) writeKey() int64 {
return g.hash(g.seq.write())
}

func (g *hashGenerator) readKey() int64 {
v := g.seq.read()
if v == 0 {
return 0
}
return g.hash(g.random.Int63n(v))
}

func (g *hashGenerator) rand() *rand.Rand {
return g.random
}

type sequentialGenerator struct {
seq *sequence
random *rand.Rand
}

func newSequentialGenerator(seq *sequence) *sequentialGenerator {
return &sequentialGenerator{
seq: seq,
random: rand.New(rand.NewSource(seq.config.seed)),
}
}

func (g *sequentialGenerator) writeKey() int64 {
return g.seq.write()
}

func (g *sequentialGenerator) readKey() int64 {
v := g.seq.read()
if v == 0 {
return 0
}
return g.random.Int63n(v)
}

func (g *sequentialGenerator) rand() *rand.Rand {
return g.random
}

0 comments on commit 0b41ec3

Please sign in to comment.