From bb8bf979a8ba42a5140800da6f3a0c05114d207e Mon Sep 17 00:00:00 2001 From: Bilal Akhtar Date: Fri, 17 Jan 2020 11:02:03 -0500 Subject: [PATCH] storage/engine: MVCC Metamorphic test suite, first phase This PR adds a new test-only sub-package to engine, metamorphic, which has one test, TestMeta, that generates and runs random MVCC operations on rocksdb and pebble instances with default settings. Future additions to this test suite could include: - A "check" mode that takes an output file as input, parses it, runs the operations in that sequence, and compares output strings. - Diffing test output between rocksdb and pebble and failing if there's a difference - Adding support for more operations - Adding a "restart" operation that closes the engine and restarts a different kind of engine in the store directory, then confirming operations after that point generate the same output. First-but-biggest part of #43762 . Release note: None --- pkg/storage/engine/metamorphic/deck.go | 67 +++ pkg/storage/engine/metamorphic/generator.go | 181 +++++++ pkg/storage/engine/metamorphic/meta_test.go | 139 +++++ pkg/storage/engine/metamorphic/operands.go | 510 +++++++++++++++++++ pkg/storage/engine/metamorphic/operations.go | 420 +++++++++++++++ 5 files changed, 1317 insertions(+) create mode 100644 pkg/storage/engine/metamorphic/deck.go create mode 100644 pkg/storage/engine/metamorphic/generator.go create mode 100644 pkg/storage/engine/metamorphic/meta_test.go create mode 100644 pkg/storage/engine/metamorphic/operands.go create mode 100644 pkg/storage/engine/metamorphic/operations.go diff --git a/pkg/storage/engine/metamorphic/deck.go b/pkg/storage/engine/metamorphic/deck.go new file mode 100644 index 000000000000..6ad6b1517370 --- /dev/null +++ b/pkg/storage/engine/metamorphic/deck.go @@ -0,0 +1,67 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package metamorphic
+
+import (
+	"math/rand"
+	"sync"
+)
+
+// Deck is a random number generator that generates numbers in the range
+// [0,len(weights)-1] where the probability of i is
+// weights(i)/sum(weights). The weights are specified as integers and used
+// in a deck-of-cards style random number selection which ensures that each
+// element is returned with a desired frequency within the size of the deck.
+// Weights must be non-negative with a positive sum; Int panics on an empty deck.
+type Deck struct {
+	rng *rand.Rand
+	mu  struct {
+		sync.Mutex
+		index int
+		deck  []int
+	}
+}
+
+// NewDeck returns a new deck random number generator. The deck holds
+// weights[i] copies of each index i and is shuffled lazily on the first call
+// to Int.
+func NewDeck(rng *rand.Rand, weights ...int) *Deck {
+	var sum int
+	for _, w := range weights {
+		sum += w
+	}
+	deck := make([]int, 0, sum)
+	for i, w := range weights {
+		for j := 0; j < w; j++ {
+			deck = append(deck, i)
+		}
+	}
+	d := &Deck{
+		rng: rng,
+	}
+	// Starting at the end of the deck forces a shuffle on the first Int call.
+	d.mu.index = len(deck)
+	d.mu.deck = deck
+	return d
+}
+
+// Int returns a random number in the range [0,len(weights)-1] where the
+// probability of i is weights(i)/sum(weights).
+func (d *Deck) Int() int {
+	d.mu.Lock()
+	// Deferred so the mutex is released even if the shuffle or the index
+	// below panics.
+	defer d.mu.Unlock()
+	if d.mu.index == len(d.mu.deck) {
+		// The whole deck has been dealt; reshuffle and start over.
+		d.rng.Shuffle(len(d.mu.deck), func(i, j int) {
+			d.mu.deck[i], d.mu.deck[j] = d.mu.deck[j], d.mu.deck[i]
+		})
+		d.mu.index = 0
+	}
+	result := d.mu.deck[d.mu.index]
+	d.mu.index++
+	return result
+}
diff --git a/pkg/storage/engine/metamorphic/generator.go b/pkg/storage/engine/metamorphic/generator.go
new file mode 100644
index 000000000000..fb835c52ec61
--- /dev/null
+++ b/pkg/storage/engine/metamorphic/generator.go
@@ -0,0 +1,181 @@
+// Copyright 2020 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package metamorphic
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"math/rand"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/storage/engine"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+)
+
+// metaTestRunner stores info corresponding to one metamorphic test run. It is
+// responsible for generating and executing operations.
+type metaTestRunner struct {
+	ctx         context.Context
+	w           io.Writer
+	t           *testing.T
+	rng         *rand.Rand
+	seed        int64
+	engine      engine.Engine
+	tsGenerator tsGenerator
+	managers    map[operandType]operandManager
+	nameToOp    map[string]*mvccOp
+	weights     []int
+	ops         []*mvccOp
+}
+
+// init seeds the random number generator, constructs the operand managers and
+// builds the name -> operation lookup table and the per-operation weights.
+// generateAndRun relies on all of these having been set up.
+func (m *metaTestRunner) init() {
+	// Use a passed-in seed. Using the same seed for two consecutive metamorphic
+	// test runs should guarantee the same operations being generated.
+	m.rng = rand.New(rand.NewSource(m.seed))
+
+	m.managers = map[operandType]operandManager{
+		OPERAND_TRANSACTION: &txnManager{
+			rng:             m.rng,
+			tsGenerator:     &m.tsGenerator,
+			txnIdMap:        make(map[string]*roachpb.Transaction),
+			inFlightBatches: make(map[*roachpb.Transaction][]engine.Batch),
+			testRunner:      m,
+		},
+		OPERAND_READWRITER: &readWriterManager{
+			rng:          m.rng,
+			eng:          m.engine,
+			batchToIdMap: make(map[engine.Batch]int),
+		},
+		OPERAND_MVCC_KEY: &keyManager{
+			rng:         m.rng,
+			tsGenerator: &m.tsGenerator,
+		},
+		OPERAND_VALUE:       &valueManager{m.rng},
+		OPERAND_TEST_RUNNER: &testRunnerManager{m},
+		OPERAND_ITERATOR: &iteratorManager{
+			rng:          m.rng,
+			readerToIter: make(map[engine.Reader][]engine.Iterator),
+			iterToId:     make(map[engine.Iterator]uint64),
+			iterCounter:  0,
+		},
+	}
+	m.nameToOp = make(map[string]*mvccOp)
+
+	m.weights = make([]int, len(operations))
+	for i := range operations {
+		m.weights[i] = operations[i].weight
+		m.nameToOp[operations[i].name] = &operations[i]
+	}
+	m.ops = nil
+}
+
+// generateAndRun generates n operations using a TPCC-style deck shuffle with
+// weighted probabilities of each operation appearing, runs each of them (plus
+// any openers and dependent operations they require), and finally closes all
+// operands that are still open.
+func (m *metaTestRunner) generateAndRun(n uint64) {
+	// Start empty with capacity n: runOp appends every executed operation, so
+	// a slice of length n would leave n leading nil entries in m.ops.
+	m.ops = make([]*mvccOp, 0, n)
+	deck := NewDeck(m.rng, m.weights...)
+
+	for i := uint64(0); i < n; i++ {
+		opToAdd := &operations[deck.Int()]
+
+		m.resolveAndRunOp(opToAdd)
+	}
+
+	// Close all open objects. This should let the engine close cleanly.
+	closingOrder := []operandType{
+		OPERAND_ITERATOR,
+		OPERAND_READWRITER,
+		OPERAND_TRANSACTION,
+	}
+	for _, typ := range closingOrder {
+		m.managers[typ].closeAll()
+	}
+}
+
+// parseFileAndRun is a placeholder for replaying operations from a previously
+// written output file. It currently does nothing.
+func (m *metaTestRunner) parseFileAndRun(f io.Reader) {
+	// TODO(itsbilal): Implement this.
+}
+
+// runOp runs the operations that run.op depends on (if any), then run.op
+// itself with the bound arguments. It records the operation in m.ops, prints
+// it along with its output to m.w, and returns the output string.
+func (m *metaTestRunner) runOp(run opRun) string {
+	op := run.op
+
+	// This operation might require other operations to run before it runs. Call
+	// the dependentOps method to resolve these dependencies.
+	if op.dependentOps != nil {
+		for _, dep := range op.dependentOps(m, run.args...) {
+			m.runOp(dep)
+		}
+	}
+
+	// Running the operation could cause this operand to not exist. Build strings
+	// for arguments beforehand.
+	argStrings := make([]string, len(op.operands))
+	for i, arg := range run.args {
+		argStrings[i] = m.managers[op.operands[i]].toString(arg)
+	}
+
+	m.ops = append(m.ops, op)
+	output := op.run(m.ctx, m, run.args...)
+	m.printOp(op, argStrings, output)
+	return output
+}
+
+// resolveAndRunOp resolves all operands (including recursively running openers
+// for operands as necessary) and runs the specified operation.
+func (m *metaTestRunner) resolveAndRunOp(op *mvccOp) {
+	operandInstances := make([]operand, len(op.operands))
+
+	// Operation op depends on some operands to exist in an open state.
+	// If those operands' managers report a zero count for that object's open
+	// instances, recursively call resolveAndRunOp with that operand type's
+	// opener.
+	for i, typ := range op.operands {
+		opManager := m.managers[typ]
+		if opManager.count() == 0 {
+			// Run the opener first, so that it creates the dependency.
+			m.resolveAndRunOp(m.nameToOp[opManager.opener()])
+		}
+		operandInstances[i] = opManager.get()
+	}
+
+	m.runOp(opRun{
+		op:   op,
+		args: operandInstances,
+	})
+}
+
+// printOp prints the passed-in operation, arguments and output string to the
+// output file, in the form "name(arg1, arg2) -> output".
+func (m *metaTestRunner) printOp(op *mvccOp, argStrings []string, output string) {
+	fmt.Fprintf(m.w, "%s(", op.name)
+	for i, arg := range argStrings {
+		if i > 0 {
+			fmt.Fprintf(m.w, ", ")
+		}
+		fmt.Fprintf(m.w, "%s", arg)
+	}
+	fmt.Fprintf(m.w, ") -> %s\n", output)
+}
+
+// Monotonically increasing timestamp generator.
+type tsGenerator struct {
+	lastTS hlc.Timestamp
+}
+
+// generate advances the last timestamp's wall time by one and returns the new
+// timestamp.
+func (t *tsGenerator) generate() hlc.Timestamp {
+	t.lastTS.WallTime++
+	return t.lastTS
+}
+
+// randomPastTimestamp returns a timestamp whose wall time is drawn using rng
+// from the range [0, lastTS.WallTime]. It does not advance the generator.
+func (t *tsGenerator) randomPastTimestamp(rng *rand.Rand) hlc.Timestamp {
+	var result hlc.Timestamp
+	result.WallTime = int64(float64(t.lastTS.WallTime+1) * rng.Float64())
+	return result
+}
diff --git a/pkg/storage/engine/metamorphic/meta_test.go b/pkg/storage/engine/metamorphic/meta_test.go
new file mode 100644
index 000000000000..27207f3c654c
--- /dev/null
+++ b/pkg/storage/engine/metamorphic/meta_test.go
@@ -0,0 +1,139 @@
+// Copyright 2020 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package metamorphic
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"io"
+	"os"
+	"path"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/storage/engine"
+	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+)
+
+// createTestRocksDBEngine returns a new RocksDB engine whose store directory
+// is path.
+func createTestRocksDBEngine(path string) (engine.Engine, error) {
+	return engine.NewEngine(enginepb.EngineTypeRocksDB, 1<<20, base.StorageConfig{
+		Attrs:           roachpb.Attributes{},
+		Dir:             path,
+		MustExist:       false,
+		MaxSize:         0,
+		Settings:        cluster.MakeTestingClusterSettings(),
+		UseFileRegistry: false,
+		ExtraOptions:    nil,
+	})
+}
+
+// createTestPebbleEngine returns a new Pebble engine whose store directory is path.
+func createTestPebbleEngine(path string) (engine.Engine, error) {
+	return engine.NewEngine(enginepb.EngineTypePebble, 1<<20, base.StorageConfig{
+		Attrs:           roachpb.Attributes{},
+		Dir:             path,
+		MustExist:       false,
+		MaxSize:         0,
+		Settings:        cluster.MakeTestingClusterSettings(),
+		UseFileRegistry: false,
+		ExtraOptions:    nil,
+	})
+}
+
+var mvccEngineImpls = []struct {
+	name   string
+	create func(path string) (engine.Engine, error)
+}{
+	{"rocksdb", createTestRocksDBEngine},
+	{"pebble", createTestPebbleEngine},
+}
+
+var (
+	keep  = flag.Bool("keep", false, "keep temp directories after test")
+	check = flag.String("check", "", "run operations in specified file and check output for equality")
+)
+
+func runMetaTest(ctx context.Context, t *testing.T, seed int64, checkFile io.Reader) {
+	for _, engineImpl := range mvccEngineImpls {
+		t.Run(engineImpl.name, func(t *testing.T) {
+			tempDir, cleanup := testutils.TempDir(t)
+			defer func() {
+				if !*keep {
+					cleanup()
+				}
+			}()
+
+			eng, err := engineImpl.create(path.Join(tempDir, engineImpl.name))
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer eng.Close()
+
+			outputFilePath := path.Join(tempDir, fmt.Sprintf("%s.meta", engineImpl.name))
+			fmt.Printf("output file path: %s\n", outputFilePath)
+
+			outputFile, err := os.Create(outputFilePath)
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer outputFile.Close()
+
+			testRunner := metaTestRunner{
+				ctx:    ctx,
+				t:      t,
+				w:      outputFile,
+				seed:   seed,
+				engine: eng,
+			}
+
+			testRunner.init()
+			if checkFile != nil {
+				testRunner.parseFileAndRun(checkFile)
+			} else {
+				testRunner.generateAndRun(10000)
+			}
+		})
+	}
+}
+
+// TestMeta runs the MVCC Metamorphic test suite.
+func TestMeta(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	ctx := context.Background()
+	seeds := []int64{123}
+
+	if *check != "" {
+		t.Run("check", func(t *testing.T) {
+			if _, err := os.Stat(*check); os.IsNotExist(err) {
+				t.Fatal(err)
+			}
+			checkFile, err := os.Open(*check)
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer checkFile.Close()
+
+			runMetaTest(ctx, t, 0, checkFile)
+		})
+	}
+	for _, seed := range seeds {
+		t.Run(fmt.Sprintf("seed=%d", seed), func(t *testing.T) {
+			runMetaTest(ctx, t, seed, nil)
+		})
+	}
+}
diff --git a/pkg/storage/engine/metamorphic/operands.go b/pkg/storage/engine/metamorphic/operands.go
new file mode 100644
index 000000000000..4f08cbed567a
--- /dev/null
+++ b/pkg/storage/engine/metamorphic/operands.go
@@ -0,0 +1,510 @@
+// Copyright 2020 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package metamorphic
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"strconv"
+
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/storage/engine"
+	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/uint128"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
+)
+
+type operandType int
+
+const (
+	OPERAND_TRANSACTION operandType = iota
+	OPERAND_READWRITER
+	OPERAND_MVCC_KEY
+	OPERAND_PAST_TS
+	OPERAND_VALUE
+	OPERAND_TEST_RUNNER
+	OPERAND_ITERATOR
+)
+
+const (
+	maxValueSize = 16
+)
+
+// operandManager represents an object to manage instances of a type of
+// object that can be passed as an "operand" to an operation.
For simplicity,
+// we create operandManagers for each type of argument, even primitive ones like
+// MVCCKeys and values. All state about open objects (iterators, transactions,
+// writers, etc) should be stored in an operandManager.
+type operandManager interface {
+	// get returns an instance of the operand to pass to an operation.
+	get() operand
+	// opener returns the name of the operation that opens an instance of this
+	// operand, or "" if there is none.
+	opener() string
+	// count returns the number of available instances; the runner runs the
+	// opener when this is zero.
+	count() int
+	close(operand)
+	closeAll()
+	// toString and parse convert an operand to and from the string used in
+	// the output file.
+	toString(operand) string
+	parse(string) operand
+}
+
+type operand interface{}
+
+// generateBytes returns a random byte slice whose length is drawn from
+// [min, max) (exactly min when max <= min+1).
+func generateBytes(rng *rand.Rand, min int, max int) []byte {
+	// For better readability, stick to lowercase alphabet characters. Note
+	// that the scaling below yields 'a' through 'y'; 'z' is never produced.
+	iterations := min + int(float64(max-min)*rng.Float64())
+	result := make([]byte, 0, iterations)
+
+	for i := 0; i < iterations; i++ {
+		result = append(result, byte(rng.Float64()*float64('z'-'a')+'a'))
+	}
+	return result
+}
+
+// keyManager generates MVCC keys and remembers them so that later operations
+// can reuse previously generated keys.
+type keyManager struct {
+	liveKeys    []engine.MVCCKey
+	rng         *rand.Rand
+	tsGenerator *tsGenerator
+}
+
+var _ operandManager = &keyManager{}
+
+func (k *keyManager) opener() string {
+	return ""
+}
+
+func (k *keyManager) count() int {
+	// Always return a nonzero value so opener() is never called directly.
+	return len(k.liveKeys) + 1
+}
+
+// open generates a new random key with a freshly generated timestamp and adds
+// it to the list of live keys.
+func (k *keyManager) open() engine.MVCCKey {
+	var key engine.MVCCKey
+	key.Key = generateBytes(k.rng, 8, maxValueSize)
+	key.Timestamp = k.tsGenerator.generate()
+	k.liveKeys = append(k.liveKeys, key)
+
+	return key
+}
+
+func (k *keyManager) close(operand) {
+	// No-op.
+}
+
+func (k *keyManager) toString(key operand) string {
+	mvccKey := key.(engine.MVCCKey)
+	return fmt.Sprintf("%s/%d", mvccKey.Key, mvccKey.Timestamp.WallTime)
+}
+
+func (k *keyManager) get() operand {
+	// 15% chance of returning a new key even if some exist.
+	if len(k.liveKeys) == 0 || k.rng.Float64() < 0.15 {
+		return k.open()
+	}
+
+	return k.liveKeys[int(k.rng.Float64()*float64(len(k.liveKeys)))]
+}
+
+func (k *keyManager) closeAll() {
+	// No-op.
+}
+
+// parse converts a "<key>/<walltime>" string into an MVCCKey. It panics if
+// the input has no '/' separator or the wall time is not an integer.
+func (k *keyManager) parse(input string) operand {
+	// Split on the last '/'. Sscanf with "%s/%d" cannot be used here: %s reads
+	// up to the next space, swallowing the separator and the timestamp.
+	sep := len(input) - 1
+	for sep >= 0 && input[sep] != '/' {
+		sep--
+	}
+	if sep < 0 {
+		panic(fmt.Sprintf("invalid key operand %q", input))
+	}
+	wallTime, err := strconv.ParseInt(input[sep+1:], 10, 64)
+	if err != nil {
+		panic(err)
+	}
+
+	var key engine.MVCCKey
+	key.Key = []byte(input[:sep])
+	key.Timestamp.WallTime = wallTime
+	return key
+}
+
+// valueManager generates random values. It keeps no state about them.
+type valueManager struct {
+	rng *rand.Rand
+}
+
+var _ operandManager = &valueManager{}
+
+func (v *valueManager) opener() string {
+	return ""
+}
+
+func (v *valueManager) count() int {
+	return 1
+}
+
+func (v *valueManager) open() []byte {
+	return v.get().([]byte)
+}
+
+func (v *valueManager) get() operand {
+	return generateBytes(v.rng, 4, maxValueSize)
+}
+
+func (v *valueManager) close(operand) {
+	// No-op.
+}
+
+func (v *valueManager) closeAll() {
+	// No-op.
+}
+
+func (v *valueManager) toString(value operand) string {
+	return fmt.Sprintf("%s", value.([]byte))
+}
+
+// parse returns the input as a []byte, the same dynamic type that get
+// returns and that toString expects.
+func (v *valueManager) parse(input string) operand {
+	return []byte(input)
+}
+
+// txnManager tracks open transactions, and the batches that each transaction
+// has written to.
+type txnManager struct {
+	rng             *rand.Rand
+	testRunner      *metaTestRunner
+	tsGenerator     *tsGenerator
+	liveTxns        []*roachpb.Transaction
+	txnIdMap        map[string]*roachpb.Transaction
+	inFlightBatches map[*roachpb.Transaction][]engine.Batch
+	txnCounter      uint64
+}
+
+var _ operandManager = &txnManager{}
+
+func (t *txnManager) opener() string {
+	return "txn_open"
+}
+
+func (t *txnManager) count() int {
+	return len(t.liveTxns)
+}
+
+func (t *txnManager) get() operand {
+	if len(t.liveTxns) == 0 {
+		panic("no open txns")
+	}
+	return t.liveTxns[int(t.rng.Float64()*float64(len(t.liveTxns)))]
+}
+
+// open creates a new pending transaction named "t<counter>" at a freshly
+// generated timestamp and registers it as live.
+func (t *txnManager) open() *roachpb.Transaction {
+	t.txnCounter++
+	ts := t.tsGenerator.generate()
+	txn := &roachpb.Transaction{
+		TxnMeta: enginepb.TxnMeta{
+			ID:             uuid.FromUint128(uint128.FromInts(0, t.txnCounter)),
+			Key:            roachpb.KeyMin,
+			WriteTimestamp: ts,
+			Sequence:       0,
+		},
+		Name:                    fmt.Sprintf("t%d", t.txnCounter),
+		DeprecatedOrigTimestamp: ts,
+		ReadTimestamp:           ts,
+		Status:                  roachpb.PENDING,
+	}
+	t.liveTxns = append(t.liveTxns, txn)
+	t.txnIdMap[txn.Name] = txn
+
+	return txn
+}
+
+// close resolves all of the transaction's intent spans as committed on the
+// engine and removes the transaction from the manager's bookkeeping. It
+// panics if resolving an intent fails.
+func (t *txnManager) close(op operand) {
+	txn := op.(*roachpb.Transaction)
+	for _, span := range txn.IntentSpans {
+		intent := roachpb.MakeIntent(txn, span)
+		intent.Status = roachpb.COMMITTED
+		_, err := engine.MVCCResolveWriteIntent(context.TODO(), t.testRunner.engine, nil, intent)
+		if err != nil {
+			panic(err)
+		}
+	}
+
+	delete(t.txnIdMap, txn.Name)
+	delete(t.inFlightBatches, txn)
+	// Swap-remove the transaction from liveTxns. Nothing is removed if the
+	// transaction is not live, rather than indexing past the end of the slice.
+	for i := range t.liveTxns {
+		if t.liveTxns[i] == txn {
+			last := len(t.liveTxns) - 1
+			t.liveTxns[i] = t.liveTxns[last]
+			t.liveTxns = t.liveTxns[:last]
+			break
+		}
+	}
+}
+
+// closeAll closes every live transaction, in the order they appear in
+// liveTxns at the time of the call.
+func (t *txnManager) closeAll() {
+	// close() swap-removes from liveTxns, so iterate over a snapshot instead
+	// of the slice being mutated.
+	txns := append([]*roachpb.Transaction(nil), t.liveTxns...)
+	for _, txn := range txns {
+		t.close(txn)
+	}
+}
+
+func (t *txnManager) toString(op operand) string {
+	txn := op.(*roachpb.Transaction)
+	return txn.Name
+}
+
+func (t *txnManager) parse(input string) operand {
+	return t.txnIdMap[input]
+}
+
+// TODO(itsbilal): Use this in an inconsistent version of MVCCScan.
+type tsManager struct {
+	rng         *rand.Rand
+	tsGenerator *tsGenerator
+}
+
+var _ operandManager = &tsManager{}
+
+func (t *tsManager) opener() string {
+	return ""
+}
+
+func (t *tsManager) count() int {
+	// Always return a non-zero count so opener() is never called.
+	return int(t.tsGenerator.lastTS.WallTime) + 1
+}
+
+func (t *tsManager) close(operand) {
+	// No-op.
+}
+
+func (t *tsManager) closeAll() {
+	// No-op.
+}
+
+func (t *tsManager) toString(op operand) string {
+	ts := op.(hlc.Timestamp)
+	return fmt.Sprintf("%d", ts.WallTime)
+}
+
+func (t *tsManager) parse(input string) operand {
+	var ts hlc.Timestamp
+	// WallTime is an int64, so parse with an explicit 64-bit size rather than
+	// the platform-dependent int size.
+	wallTime, err := strconv.ParseInt(input, 10, 64)
+	if err != nil {
+		panic(err)
+	}
+	ts.WallTime = wallTime
+	return ts
+}
+
+func (t *tsManager) get() operand {
+	return t.tsGenerator.randomPastTimestamp(t.rng)
+}
+
+// testRunnerManager hands out the test runner itself as an operand.
+type testRunnerManager struct {
+	t *metaTestRunner
+}
+
+var _ operandManager = &testRunnerManager{}
+
+func (t *testRunnerManager) opener() string {
+	return ""
+}
+
+func (t *testRunnerManager) count() int {
+	return 1
+}
+
+func (t *testRunnerManager) close(operand) {
+	// No-op.
+}
+
+func (t *testRunnerManager) closeAll() {
+	// No-op.
+}
+
+func (t *testRunnerManager) toString(operand) string {
+	return "t"
+}
+
+func (t *testRunnerManager) parse(string) operand {
+	return t.t
+}
+
+func (t *testRunnerManager) get() operand {
+	return t.t
+}
+
+// readWriterManager hands out either the engine or one of the open batches,
+// and tracks the open batches along with their numeric ids.
+type readWriterManager struct {
+	rng          *rand.Rand
+	eng          engine.Engine
+	liveBatches  []engine.Batch
+	batchToIdMap map[engine.Batch]int
+	batchCounter int
+}
+
+var _ operandManager = &readWriterManager{}
+
+func (w *readWriterManager) get() operand {
+	// 25% chance of returning the engine, even if there are live batches.
+	if len(w.liveBatches) == 0 || w.rng.Float64() < 0.25 {
+		return w.eng
+	}
+
+	return w.liveBatches[int(w.rng.Float64()*float64(len(w.liveBatches)))]
+}
+
+// open creates a new batch on the engine and registers it under the next
+// batch id.
+func (w *readWriterManager) open() engine.ReadWriter {
+	batch := w.eng.NewBatch()
+	w.batchCounter++
+	w.liveBatches = append(w.liveBatches, batch)
+	w.batchToIdMap[batch] = w.batchCounter
+	return batch
+}
+
+func (w *readWriterManager) opener() string {
+	return "batch_open"
+}
+
+func (w *readWriterManager) count() int {
+	// The engine is always available, hence the +1.
+	return len(w.liveBatches) + 1
+}
+
+// close removes the batch from the manager's bookkeeping and closes it.
+// Closing the engine through this method is a no-op.
+func (w *readWriterManager) close(op operand) {
+	// No-op if engine.
+	if op == w.eng {
+		return
+	}
+
+	batch := op.(engine.Batch)
+	for i, batch2 := range w.liveBatches {
+		if batch2 == batch {
+			w.liveBatches[i] = w.liveBatches[len(w.liveBatches)-1]
+			w.liveBatches = w.liveBatches[:len(w.liveBatches)-1]
+			break
+		}
+	}
+	delete(w.batchToIdMap, batch)
+	batch.Close()
+}
+
+func (w *readWriterManager) closeAll() {
+	for _, batch := range w.liveBatches {
+		batch.Close()
+	}
+	w.liveBatches = w.liveBatches[:0]
+	w.batchToIdMap = make(map[engine.Batch]int)
+}
+
+func (w *readWriterManager) toString(op operand) string {
+	if w.eng == op {
+		return "engine"
+	}
+	return fmt.Sprintf("batch%d", w.batchToIdMap[op.(engine.Batch)])
+}
+
+func (w *readWriterManager) parse(input string) operand {
+	if input == "engine" {
+		return w.eng
+	}
+
+	var id int
+	_, err := fmt.Sscanf(input, "batch%d", &id)
+	if err != nil {
+		panic(err)
+	}
+
+	for batch, id2 := range w.batchToIdMap {
+		if id == id2 {
+			return batch
+		}
+	}
+	return nil
+}
+
+// iteratorManager tracks open iterators, their numeric ids, and which reader
+// each iterator was opened on.
+type iteratorManager struct {
+	rng          *rand.Rand
+	testRunner   *metaTestRunner
+	readerToIter map[engine.Reader][]engine.Iterator
+	iterToId     map[engine.Iterator]uint64
+	liveIters    []engine.Iterator
+	iterCounter  uint64
+}
+
+var _ operandManager = &iteratorManager{}
+
+func (i *iteratorManager) get() operand {
+	if len(i.liveIters) == 0 {
+		panic("no open iterators")
+	}
+
+	return i.liveIters[int(i.rng.Float64()*float64(len(i.liveIters)))]
+}
+
+// open creates an iterator on rw with the given options and registers it
+// under the next iterator id.
+func (i *iteratorManager) open(rw engine.ReadWriter, options engine.IterOptions) engine.Iterator {
+	i.iterCounter++
+	iter := rw.NewIterator(options)
+	i.readerToIter[rw] = append(i.readerToIter[rw], iter)
+	i.iterToId[iter] = i.iterCounter
+	i.liveIters = append(i.liveIters, iter)
+	return iter
+}
+
+func (i *iteratorManager) opener() string {
+	return "iterator_open"
+}
+
+func (i *iteratorManager) count() int {
+	return len(i.iterToId)
+}
+
+// close closes the iterator and removes it from iterToId, liveIters and
+// readerToIter.
+func (i *iteratorManager) close(op operand) {
+	iter := op.(engine.Iterator)
+	iter.Close()
+
+	delete(i.iterToId, iter)
+	// Clear iter from liveIters
+	for j, iter2 := range i.liveIters {
+		if iter == iter2 {
+			i.liveIters[j] = i.liveIters[len(i.liveIters)-1]
+			i.liveIters = i.liveIters[:len(i.liveIters)-1]
+			break
+		}
+	}
+	// Clear iter from readerToIter
+	for reader, iters := range i.readerToIter {
+		for j, iter2 := range iters {
+			if iter == iter2 {
+				// Delete iters[j]
+				iters[j] = iters[len(iters)-1]
+				i.readerToIter[reader] = iters[:len(iters)-1]
+
+				return
+			}
+		}
+	}
+}
+
+// closeAll closes every live iterator and resets the manager's bookkeeping so
+// that no closed iterator can be handed out afterwards.
+func (i *iteratorManager) closeAll() {
+	// Iterate over the slice rather than the iterToId map so that iterators
+	// are closed in a deterministic order.
+	for _, iter := range i.liveIters {
+		iter.Close()
+	}
+	i.liveIters = nil
+	i.iterToId = make(map[engine.Iterator]uint64)
+	i.readerToIter = make(map[engine.Reader][]engine.Iterator)
+}
+
+func (i *iteratorManager) toString(op operand) string {
+	return fmt.Sprintf("iter%d", i.iterToId[op.(engine.Iterator)])
+}
+
+// parse looks up a live iterator by the "iter<id>" string produced by
+// toString. It returns nil if no live iterator has that id, and panics if the
+// input is malformed.
+func (i *iteratorManager) parse(input string) operand {
+	var id uint64
+	_, err := fmt.Sscanf(input, "iter%d", &id)
+	if err != nil {
+		panic(err)
+	}
+
+	for iter, id2 := range i.iterToId {
+		if id == id2 {
+			return iter
+		}
+	}
+	return nil
+}
diff --git a/pkg/storage/engine/metamorphic/operations.go b/pkg/storage/engine/metamorphic/operations.go
new file mode 100644
index 000000000000..b1a304827c9a
--- /dev/null
+++ b/pkg/storage/engine/metamorphic/operations.go
@@ -0,0 +1,420 @@
+// Copyright 2020 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package metamorphic
+
+import (
+	"context"
+	"fmt"
+	"math"
+
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/storage/engine"
+)
+
+// opRun represents one operation run; an mvccOp reference as well as bound
+// arguments.
+type opRun struct {
+	op   *mvccOp
+	args []operand
+}
+
+// An mvccOp instance represents one type of an operation.
The run and
+// dependentOps commands should be stateless, with all state stored in the
+// passed-in test runner or its operand managers.
+type mvccOp struct {
+	// Name of the operation. Used in file output and parsing.
+	name string
+	// Function to call to run this operation.
+	run func(ctx context.Context, m *metaTestRunner, args ...operand) string
+	// Returns a list of operation runs that must happen before this operation.
+	// Note that openers for non-existent operands are handled separately and
+	// don't need to be handled here.
+	dependentOps func(m *metaTestRunner, args ...operand) []opRun
+	// Operands this operation expects. Passed in the same order to run and
+	// dependentOps.
+	operands []operandType
+	// weight is used to denote frequency of this operation to the TPCC-style
+	// deck.
+	//
+	// Note that the generator tends to bias towards opener operations; since
+	// an opener can be generated outside of the deck shuffle, in resolveAndRunOp
+	// to create an instance of an operand that does not exist. To counter this
+	// bias, we try to keep the sum of opener operations to be less than half
+	// the sum of "closer" operations for an operand type, to prevent too many
+	// of that type of object from accumulating throughout the run.
+	weight int
+}
+
+// closeItersOnBatch generates iterator_close opRuns for all iterators open on
+// the passed-in reader if it is a Batch. It returns nil for any other reader.
+func closeItersOnBatch(m *metaTestRunner, reader engine.Reader) (results []opRun) {
+	// No need to close iters on non-batches (i.e. engines).
+	batch, ok := reader.(engine.Batch)
+	if !ok {
+		return nil
+	}
+	// Close all iterators for this batch first.
+	iterManager := m.managers[OPERAND_ITERATOR].(*iteratorManager)
+	for _, iter := range iterManager.readerToIter[batch] {
+		results = append(results, opRun{
+			op:   m.nameToOp["iterator_close"],
+			args: []operand{iter},
+		})
+	}
+	return results
+}
+
+// runMvccScan runs MVCCScan (reversed if reverse is set) on the runner's
+// engine. args holds two MVCC keys bounding the range, in either order,
+// followed by the transaction to scan with. It returns the scan's result or
+// error formatted as a string.
+func runMvccScan(ctx context.Context, m *metaTestRunner, reverse bool, args []operand) string {
+	key := args[0].(engine.MVCCKey)
+	endKey := args[1].(engine.MVCCKey)
+	txn := args[2].(*roachpb.Transaction)
+	// Order the bounds so that key sorts before endKey.
+	if endKey.Less(key) {
+		key, endKey = endKey, key
+	}
+	// While MVCCScanning on a batch works in Pebble, it does not in rocksdb.
+	// This is due to batch iterators not supporting SeekForPrev. For now, use
+	// m.engine instead of a readWriterManager-generated engine.Reader, otherwise
+	// we will try MVCCScanning on batches and produce diffs between runs on
+	// different engines that don't point to an actual issue.
+	kvs, _, intent, err := engine.MVCCScan(ctx, m.engine, key.Key, endKey.Key, math.MaxInt64, txn.ReadTimestamp, engine.MVCCScanOptions{
+		Inconsistent: false,
+		Tombstones:   true,
+		Reverse:      reverse,
+		Txn:          txn,
+	})
+	if err != nil {
+		return fmt.Sprintf("error: %s", err)
+	}
+	return fmt.Sprintf("kvs = %v, intent = %v", kvs, intent)
+}
+
+// printIterState returns the key where an iterator is positioned, or
+// "valid = false" (with the error, if there is one) if it is invalid.
+func printIterState(iter engine.Iterator) string {
+	ok, err := iter.Valid()
+	if err != nil {
+		return fmt.Sprintf("valid = %v, err = %s", ok, err.Error())
+	}
+	if !ok {
+		return "valid = false"
+	}
+	return fmt.Sprintf("key = %s", iter.UnsafeKey().String())
+}
+
+// List of operations, where each operation is defined as one instance of mvccOp.
+var operations = []mvccOp{ + { + name: "mvcc_get", + run: func(ctx context.Context, m *metaTestRunner, args ...operand) string { + reader := args[0].(engine.Reader) + key := args[1].(engine.MVCCKey) + val, intent, err := engine.MVCCGet(ctx, reader, key.Key, key.Timestamp, engine.MVCCGetOptions{ + Inconsistent: true, + Tombstones: true, + Txn: nil, + }) + if err != nil { + return fmt.Sprintf("error: %s", err) + } + return fmt.Sprintf("val = %v, intent = %v", val, intent) + }, + operands: []operandType{ + OPERAND_READWRITER, + OPERAND_MVCC_KEY, + }, + weight: 10, + }, + { + name: "mvcc_put", + run: func(ctx context.Context, m *metaTestRunner, args ...operand) string { + writer := args[0].(engine.ReadWriter) + key := args[1].(engine.MVCCKey) + value := roachpb.MakeValueFromBytes(args[2].([]byte)) + txn := args[3].(*roachpb.Transaction) + txn.Sequence++ + + err := engine.MVCCPut(ctx, writer, nil, key.Key, txn.WriteTimestamp, value, txn) + if err != nil { + return fmt.Sprintf("error: %s", err) + } + + // Update the txn's intent spans to account for this intent being written. + txn.IntentSpans = append(txn.IntentSpans, roachpb.Span{ + Key: key.Key, + }) + // If this write happened on a batch, track that in the txn manager so + // that the batch is committed before the transaction is aborted or + // committed. 
+ if batch, ok := writer.(engine.Batch); ok { + txnManager := m.managers[OPERAND_TRANSACTION].(*txnManager) + txnManager.inFlightBatches[txn] = append(txnManager.inFlightBatches[txn], batch) + } + return "ok" + }, + operands: []operandType{ + OPERAND_READWRITER, + OPERAND_MVCC_KEY, + OPERAND_VALUE, + OPERAND_TRANSACTION, + }, + weight: 30, + }, + { + name: "mvcc_scan", + run: func(ctx context.Context, m *metaTestRunner, args ...operand) string { + return runMvccScan(ctx, m, false, args) + }, + operands: []operandType{ + OPERAND_MVCC_KEY, + OPERAND_MVCC_KEY, + OPERAND_TRANSACTION, + }, + weight: 10, + }, + { + name: "mvcc_reverse_scan", + run: func(ctx context.Context, m *metaTestRunner, args ...operand) string { + return runMvccScan(ctx, m, true, args) + }, + operands: []operandType{ + OPERAND_MVCC_KEY, + OPERAND_MVCC_KEY, + OPERAND_TRANSACTION, + }, + weight: 10, + }, + { + name: "txn_open", + run: func(ctx context.Context, m *metaTestRunner, args ...operand) string { + txn := m.managers[OPERAND_TRANSACTION].(*txnManager).open() + return m.managers[OPERAND_TRANSACTION].toString(txn) + }, + operands: []operandType{}, + weight: 4, + }, + { + name: "txn_commit", + run: func(ctx context.Context, m *metaTestRunner, args ...operand) string { + txnManager := m.managers[OPERAND_TRANSACTION].(*txnManager) + txn := args[0].(*roachpb.Transaction) + txn.Status = roachpb.COMMITTED + txnManager.close(txn) + + return "ok" + }, + dependentOps: func(m *metaTestRunner, args ...operand) (result []opRun) { + txnManager := m.managers[OPERAND_TRANSACTION].(*txnManager) + rwManager := m.managers[OPERAND_READWRITER].(*readWriterManager) + txn := args[0].(*roachpb.Transaction) + closedBatches := make(map[engine.Batch]struct{}) + + // A transaction could have in-flight writes in some batches. Get a list + // of all those batches, and dispatch batch_commit operations for them. 
+ for _, batch := range txnManager.inFlightBatches[txn] { + if _, ok := closedBatches[batch]; ok { + continue + } + closedBatches[batch] = struct{}{} + + // Check if this batch is live. There's a chance it could have been + // closed by a previous batch_commit. + found := false + for _, liveBatch := range rwManager.liveBatches { + if batch == liveBatch { + found = true + break + } + } + if !found { + continue + } + + result = append(result, opRun{ + op: m.nameToOp["batch_commit"], + args: []operand{batch}, + }) + } + return + }, + operands: []operandType{ + OPERAND_TRANSACTION, + }, + weight: 10, + }, + { + name: "batch_open", + run: func(ctx context.Context, m *metaTestRunner, args ...operand) string { + batch := m.managers[OPERAND_READWRITER].(*readWriterManager).open() + return m.managers[OPERAND_READWRITER].toString(batch) + }, + operands: []operandType{}, + weight: 4, + }, + { + name: "batch_commit", + run: func(ctx context.Context, m *metaTestRunner, args ...operand) string { + if batch, ok := args[0].(engine.Batch); ok { + err := batch.Commit(false) + m.managers[OPERAND_READWRITER].close(batch) + if err != nil { + return err.Error() + } + return "ok" + } + return "nop" + }, + dependentOps: func(m *metaTestRunner, args ...operand) (results []opRun) { + return closeItersOnBatch(m, args[0].(engine.Reader)) + }, + operands: []operandType{ + OPERAND_READWRITER, + }, + weight: 10, + }, + { + name: "iterator_open", + run: func(ctx context.Context, m *metaTestRunner, args ...operand) string { + iterManager := m.managers[OPERAND_ITERATOR].(*iteratorManager) + key := args[1].(engine.MVCCKey) + endKey := args[2].(engine.MVCCKey) + if endKey.Less(key) { + tmpKey := key + key = endKey + endKey = tmpKey + } + iter := iterManager.open(args[0].(engine.ReadWriter), engine.IterOptions{ + Prefix: false, + LowerBound: key.Key, + UpperBound: endKey.Key.Next(), + }) + + return iterManager.toString(iter) + }, + dependentOps: func(m *metaTestRunner, args ...operand) (results 
[]opRun) { + return closeItersOnBatch(m, args[0].(engine.Reader)) + }, + operands: []operandType{ + OPERAND_READWRITER, + OPERAND_MVCC_KEY, + OPERAND_MVCC_KEY, + }, + weight: 2, + }, + { + name: "iterator_close", + run: func(ctx context.Context, m *metaTestRunner, args ...operand) string { + iterManager := m.managers[OPERAND_ITERATOR].(*iteratorManager) + iterManager.close(args[0].(engine.Iterator)) + + return "ok" + }, + operands: []operandType{ + OPERAND_ITERATOR, + }, + weight: 5, + }, + { + name: "iterator_seekge", + run: func(ctx context.Context, m *metaTestRunner, args ...operand) string { + iter := args[0].(engine.Iterator) + key := args[1].(engine.MVCCKey) + iter.SeekGE(key) + + return printIterState(iter) + }, + operands: []operandType{ + OPERAND_ITERATOR, + OPERAND_MVCC_KEY, + }, + weight: 5, + }, + { + name: "iterator_seeklt", + run: func(ctx context.Context, m *metaTestRunner, args ...operand) string { + iter := args[0].(engine.Iterator) + key := args[1].(engine.MVCCKey) + iter.SeekLT(key) + + return printIterState(iter) + }, + operands: []operandType{ + OPERAND_ITERATOR, + OPERAND_MVCC_KEY, + }, + weight: 5, + }, + { + name: "iterator_next", + run: func(ctx context.Context, m *metaTestRunner, args ...operand) string { + iter := args[0].(engine.Iterator) + // The rocksdb iterator does not treat kindly to a Next() if it is already + // invalid. Don't run next if that is the case. + if ok, err := iter.Valid(); !ok || err != nil { + if err != nil { + return fmt.Sprintf("valid = %v, err = %s", ok, err.Error()) + } else { + return "valid = false" + } + } + iter.Next() + + return printIterState(iter) + }, + operands: []operandType{ + OPERAND_ITERATOR, + }, + weight: 10, + }, + { + name: "iterator_nextkey", + run: func(ctx context.Context, m *metaTestRunner, args ...operand) string { + iter := args[0].(engine.Iterator) + // The rocksdb iterator does not treat kindly to a NextKey() if it is + // already invalid. Don't run NextKey if that is the case. 
+ if ok, err := iter.Valid(); !ok || err != nil { + if err != nil { + return fmt.Sprintf("valid = %v, err = %s", ok, err.Error()) + } else { + return "valid = false" + } + } + iter.NextKey() + + return printIterState(iter) + }, + operands: []operandType{ + OPERAND_ITERATOR, + }, + weight: 10, + }, + { + name: "iterator_prev", + run: func(ctx context.Context, m *metaTestRunner, args ...operand) string { + iter := args[0].(engine.Iterator) + // The rocksdb iterator does not treat kindly to a Prev() if it is already + // invalid. Don't run prev if that is the case. + if ok, err := iter.Valid(); !ok || err != nil { + if err != nil { + return fmt.Sprintf("valid = %v, err = %s", ok, err.Error()) + } else { + return "valid = false" + } + } + iter.Prev() + + return printIterState(iter) + }, + operands: []operandType{ + OPERAND_ITERATOR, + }, + weight: 10, + }, +}