From 7d4e2fcedbd3b5e98dae6715d4b54d5f33db2016 Mon Sep 17 00:00:00 2001
From: Peter Mattis
Date: Fri, 24 Apr 2015 15:18:37 -0400
Subject: [PATCH] Replace the Go Batch implementation with one based on
 RocksDB WriteBatch.

Somewhat more involved than expected because the built-in RocksDB
WriteBatchWithIndex iterators do not handle merge records.

benchmark                        old ns/op    new ns/op    delta
BenchmarkMVCCBatch1Put10         6198         5223         -15.73%
BenchmarkMVCCBatch100Put10       6019         5586         -7.19%
BenchmarkMVCCBatch10000Put10     9324         5539         -40.59%
BenchmarkMVCCBatch100000Put10    13419        6242         -53.48%

benchmark                        old MB/s    new MB/s    speedup
BenchmarkMVCCBatch1Put10         1.61        1.91        1.19x
BenchmarkMVCCBatch100Put10       1.66        1.79        1.08x
BenchmarkMVCCBatch10000Put10     1.07        1.81        1.69x
BenchmarkMVCCBatch100000Put10    0.75        1.60        2.13x
---
 storage/engine/batch.go        | 396 --------------------------------
 storage/engine/batch_test.go   |  19 +-
 storage/engine/db.cc           | 402 ++++++++++++++++++++++++++++++++-
 storage/engine/db.h            |  10 +
 storage/engine/engine.go       |  28 +--
 storage/engine/engine_test.go  | 141 ++++++++----
 storage/engine/mvcc_test.go    |  86 ++++++-
 storage/engine/rocksdb.go      | 199 ++++++++++++----
 storage/engine/rocksdb_test.go |   3 +
 storage/raftstorage.go         |   5 +-
 storage/range.go               |   9 +-
 11 files changed, 785 insertions(+), 513 deletions(-)
 delete mode 100644 storage/engine/batch.go

diff --git a/storage/engine/batch.go b/storage/engine/batch.go
deleted file mode 100644
index addec5c6a6e2..000000000000
--- a/storage/engine/batch.go
+++ /dev/null
@@ -1,396 +0,0 @@
-// Copyright 2014 The Cockroach Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-// implied. See the License for the specific language governing
-// permissions and limitations under the License. See the AUTHORS file
-// for names of contributors.
-//
-// Author: Spencer Kimball (spencer.kimball@gmail.com)
-
-// TODO(pmattis): Move this into C++.
-
-package engine
-
-import (
-	"bytes"
-	"runtime/debug"
-
-	"github.com/biogo/store/llrb"
-	"github.com/cockroachdb/cockroach/proto"
-	"github.com/cockroachdb/cockroach/util"
-	gogoproto "github.com/gogo/protobuf/proto"
-)
-
-// Batch wrap an instance of Engine and provides a limited subset of
-// Engine functionality. Mutations are added to a write batch
-// transparently and only applied to the wrapped engine on invocation
-// of Commit(). Reads are passed through to the wrapped engine. In the
-// event that reads access keys for which there are already-batched
-// updates, reads from the wrapped engine are combined on the fly with
-// pending write, delete, and merge updates.
-//
-// This struct is not thread safe.
-type Batch struct {
-	engine    Engine
-	updates   llrb.Tree
-	committed bool
-}
-
-// NewBatch returns a new instance of Batch which wraps engine.
-func NewBatch(engine Engine) *Batch {
-	return &Batch{engine: engine}
-}
-
-// Put stores the key / value as a BatchPut in the updates tree.
-func (b *Batch) Put(key proto.EncodedKey, value []byte) error {
-	if len(key) == 0 {
-		return emptyKeyError()
-	}
-	// Need to make a copy of key and value as the caller may reuse
-	// them.
- key = append(proto.EncodedKey(nil), key...) - value = append([]byte(nil), value...) - b.updates.Insert(BatchPut{proto.RawKeyValue{Key: key, Value: value}}) - return nil -} - -// Get reads first from the updates tree. If the key is found there -// and is deleted, then a nil value is returned. If the key is found, -// and is a Put, returns the value from the tree. If a merge, then -// merge is performed on the fly to combine with the value from the -// underlying engine. Otherwise, the Get is simply passed through to -// the wrapped engine. -func (b *Batch) Get(key proto.EncodedKey) ([]byte, error) { - if len(key) == 0 { - return nil, emptyKeyError() - } - val := b.updates.Get(proto.RawKeyValue{Key: key}) - if val != nil { - switch t := val.(type) { - case BatchDelete: - return nil, nil - case BatchPut: - return t.Value, nil - case BatchMerge: - existingVal, err := b.engine.Get(key) - if err != nil { - return nil, err - } - return goMerge(existingVal, t.Value) - } - } - return b.engine.Get(key) -} - -// GetProto fetches the value at the specified key and unmarshals it. -func (b *Batch) GetProto(key proto.EncodedKey, msg gogoproto.Message) ( - ok bool, keyBytes, valBytes int64, err error) { - var data []byte - if data, err = b.Get(key); err != nil || data == nil { - return - } - ok = true - if msg != nil { - if err = gogoproto.Unmarshal(data, msg); err != nil { - return - } - } - keyBytes = int64(len(key)) - valBytes = int64(len(data)) - return -} - -// Iterate invokes f on key/value pairs merged from the underlying -// engine and pending batch updates. If f returns done or an error, -// the iteration ends and propagates the error. -func (b *Batch) Iterate(start, end proto.EncodedKey, f func(proto.RawKeyValue) (bool, error)) error { - it := b.NewIterator() - defer it.Close() - - it.Seek(start) - for ; it.Valid(); it.Next() { - k := it.Key() - if bytes.Compare(it.Key(), end) >= 0 { - break - } - if done, err := f(proto.RawKeyValue{Key: k, Value: it.Value()}); done || err != nil { - return err - } - } - // Check for any errors during iteration. - return it.Error() -} - -// Scan scans from both the updates tree and the underlying engine -// and combines the results, up to max. -func (b *Batch) Scan(start, end proto.EncodedKey, max int64) ([]proto.RawKeyValue, error) { - var kvs []proto.RawKeyValue - err := b.Iterate(start, end, func(kv proto.RawKeyValue) (bool, error) { - if max != 0 && int64(len(kvs)) >= max { - return true, nil - } - kvs = append(kvs, kv) - return false, nil - }) - return kvs, err -} - -// Clear stores the key as a BatchDelete in the updates tree. -func (b *Batch) Clear(key proto.EncodedKey) error { - if len(key) == 0 { - return emptyKeyError() - } - // Need to make a copy of key as the caller may reuse it. - key = append(proto.EncodedKey(nil), key...) - b.updates.Insert(BatchDelete{proto.RawKeyValue{Key: key}}) - return nil -} - -// Merge stores the key / value as a BatchMerge in the updates tree. -// If the updates map already contains a BatchPut, then this value is -// merged with the Put and kept as a BatchPut. If the updates map -// already contains a BatchMerge, then this value is merged with the -// existing BatchMerge and kept as a BatchMerge. If the updates map -// contains a BatchDelete, then this value is merged with a nil byte -// slice and stored as a BatchPut. -func (b *Batch) Merge(key proto.EncodedKey, value []byte) error { - if len(key) == 0 { - return emptyKeyError() - } - // Need to make a copy of key as the caller may reuse it. 
- key = append(proto.EncodedKey(nil), key...) - val := b.updates.Get(proto.RawKeyValue{Key: key}) - if val != nil { - switch t := val.(type) { - case BatchDelete: - mergedBytes, err := goMerge(nil, value) - if err != nil { - return err - } - b.updates.Insert(BatchPut{proto.RawKeyValue{Key: key, Value: mergedBytes}}) - case BatchPut: - mergedBytes, err := goMerge(t.Value, value) - if err != nil { - return err - } - b.updates.Insert(BatchPut{proto.RawKeyValue{Key: key, Value: mergedBytes}}) - case BatchMerge: - mergedBytes, err := goMerge(t.Value, value) - if err != nil { - return err - } - b.updates.Insert(BatchMerge{proto.RawKeyValue{Key: key, Value: mergedBytes}}) - } - } else { - // Need to make a copy of value as the caller may reuse it. - value = append([]byte(nil), value...) - b.updates.Insert(BatchMerge{proto.RawKeyValue{Key: key, Value: value}}) - } - return nil -} - -// Commit writes all pending updates to the underlying engine in -// an atomic write batch. -func (b *Batch) Commit() error { - if b.committed { - panic("this batch was already committed") - } - var batch []interface{} - b.updates.DoRange(func(n llrb.Comparable) (done bool) { - batch = append(batch, n) - return false - }, proto.RawKeyValue{Key: proto.EncodedKey(KeyMin)}, proto.RawKeyValue{Key: proto.EncodedKey(KeyMax)}) - b.committed = true - return b.engine.WriteBatch(batch) -} - -// Open returns an error if called on a Batch. -func (b *Batch) Open() error { - return util.Errorf("cannot open a batch") -} - -// Close is a noop for Batch. -func (b *Batch) Close() { -} - -// Attrs is a noop for Batch. -func (b *Batch) Attrs() proto.Attributes { - return proto.Attributes{} -} - -// WriteBatch returns an error if called on a Batch. -func (b *Batch) WriteBatch([]interface{}) error { - return util.Errorf("cannot write batch from a Batch") -} - -// Capacity returns an error if called on a Batch. -func (b *Batch) Capacity() (StoreCapacity, error) { - return StoreCapacity{}, util.Errorf("cannot report capacity from a Batch") -} - -// SetGCTimeouts is a noop for Batch. -func (b *Batch) SetGCTimeouts(minTxnTS, minRCacheTS int64) { -} - -// ApproximateSize returns an error if called on a Batch. -func (b *Batch) ApproximateSize(start, end proto.EncodedKey) (uint64, error) { - return 0, util.Errorf("cannot get approximate size from a Batch") -} - -// Flush returns an error if called on a Batch. -func (b *Batch) Flush() error { - return util.Errorf("cannot flush a Batch") -} - -// NewIterator returns an iterator over Batch. Batch iterators are -// not thread safe. -func (b *Batch) NewIterator() Iterator { - return newBatchIterator(b.engine, &b.updates) -} - -// NewSnapshot returns nil if called on a Batch. -func (b *Batch) NewSnapshot() Engine { - return nil -} - -// NewBatch returns a new Batch instance wrapping same underlying engine. -func (b *Batch) NewBatch() Engine { - return &Batch{engine: b.engine} -} - -type batchIterator struct { - iter Iterator - updates *llrb.Tree - pending []proto.RawKeyValue - err error -} - -// newBatchIterator returns a new iterator over the supplied Batch instance. -func newBatchIterator(engine Engine, updates *llrb.Tree) *batchIterator { - return &batchIterator{ - iter: engine.NewIterator(), - updates: updates, - } -} - -// The following methods implement the Iterator interface. 
-func (bi *batchIterator) Close() { - bi.iter.Close() -} - -func (bi *batchIterator) Seek(key []byte) { - bi.pending = []proto.RawKeyValue{} - bi.err = nil - bi.iter.Seek(key) - bi.mergeUpdates(key) -} - -func (bi *batchIterator) Valid() bool { - return bi.err == nil && len(bi.pending) > 0 -} - -func (bi *batchIterator) Next() { - if !bi.Valid() { - bi.err = util.Errorf("next called with invalid iterator") - return - } - last := bi.pending[0].Key.Next() - if len(bi.pending) > 0 { - bi.pending = bi.pending[1:] - } - if len(bi.pending) == 0 { - bi.mergeUpdates(last) - } -} - -func (bi *batchIterator) Key() proto.EncodedKey { - if !bi.Valid() { - debug.PrintStack() - bi.err = util.Errorf("access to invalid key") - return nil - } - return bi.pending[0].Key -} - -func (bi *batchIterator) Value() []byte { - if !bi.Valid() { - bi.err = util.Errorf("access to invalid value") - return nil - } - return bi.pending[0].Value -} - -func (bi *batchIterator) ValueProto(msg gogoproto.Message) error { - return gogoproto.Unmarshal(bi.Value(), msg) -} - -func (bi *batchIterator) Error() error { - return bi.err -} - -// mergeUpdates combines the next key/value from the engine iterator -// with all batch updates which preceed it. The final batch update -// which might overlap the next key/value is merged. The start -// parameter indicates the first possible key to merge from either -// iterator. -func (bi *batchIterator) mergeUpdates(start proto.EncodedKey) { - // Use a for-loop because deleted entries might cause nothing - // to be added to bi.pending; in this case, we loop to next key. - for len(bi.pending) == 0 && bi.iter.Valid() { - kv := proto.RawKeyValue{Key: bi.iter.Key(), Value: bi.iter.Value()} - bi.iter.Next() - - // Get updates up to the engine iterator's current key. - bi.getUpdates(start, kv.Key) - - // Possibly merge an update with engine iterator's current key. - if val := bi.updates.Get(kv); val != nil { - switch t := val.(type) { - case BatchDelete: - case BatchPut: - bi.pending = append(bi.pending, t.RawKeyValue) - case BatchMerge: - mergedKV := proto.RawKeyValue{Key: t.Key} - mergedKV.Value, bi.err = goMerge(kv.Value, t.Value) - if bi.err == nil { - bi.pending = append(bi.pending, mergedKV) - } - } - } else { - bi.pending = append(bi.pending, kv) - } - start = kv.Key.Next() - } - - if len(bi.pending) == 0 { - bi.getUpdates(start, proto.EncodedKey(KeyMax)) - } -} - -// getUpdates scans the updates tree from start to end, adding -// each value to bi.pending. -func (bi *batchIterator) getUpdates(start, end proto.EncodedKey) { - // Scan the updates tree for the key range, merging as we go. - bi.updates.DoRange(func(n llrb.Comparable) bool { - switch t := n.(type) { - case BatchDelete: // On delete, skip. - case BatchPut: // On put, override the corresponding engine entry. - bi.pending = append(bi.pending, t.RawKeyValue) - case BatchMerge: // On merge, merge with empty value. 
- kv := proto.RawKeyValue{Key: t.Key} - kv.Value, bi.err = goMerge([]byte(nil), t.Value) - if bi.err == nil { - bi.pending = append(bi.pending, kv) - } - } - return bi.err != nil - }, proto.RawKeyValue{Key: start}, proto.RawKeyValue{Key: end}) -} diff --git a/storage/engine/batch_test.go b/storage/engine/batch_test.go index bdc1254a16e9..f722ba40a8e9 100644 --- a/storage/engine/batch_test.go +++ b/storage/engine/batch_test.go @@ -35,6 +35,8 @@ func TestBatchBasics(t *testing.T) { defer e.Close() b := e.NewBatch() + defer b.Close() + if err := b.Put(proto.EncodedKey("a"), []byte("value")); err != nil { t.Fatal(err) } @@ -100,6 +102,8 @@ func TestBatchGet(t *testing.T) { defer e.Close() b := e.NewBatch() + defer b.Close() + // Write initial values, then write to batch. if err := e.Put(proto.EncodedKey("b"), []byte("value")); err != nil { t.Fatal(err) @@ -143,7 +147,10 @@ func compareMergedValues(result, expected []byte) bool { func TestBatchMerge(t *testing.T) { defer leaktest.AfterTest(t) - b := NewInMem(proto.Attributes{}, 1<<20).NewBatch() + e := NewInMem(proto.Attributes{}, 1<<20) + defer e.Close() + + b := e.NewBatch() defer b.Close() // Write batch put, delete & merge. @@ -200,6 +207,8 @@ func TestBatchProto(t *testing.T) { defer e.Close() b := e.NewBatch() + defer b.Close() + kv := &proto.RawKeyValue{Key: proto.EncodedKey("a"), Value: []byte("value")} PutProto(b, proto.EncodedKey("proto"), kv) getKV := &proto.RawKeyValue{} @@ -242,6 +251,8 @@ func TestBatchScan(t *testing.T) { defer e.Close() b := e.NewBatch() + defer b.Close() + existingVals := []proto.RawKeyValue{ {Key: proto.EncodedKey("a"), Value: []byte("1")}, {Key: proto.EncodedKey("b"), Value: []byte("2")}, @@ -332,6 +343,8 @@ func TestBatchScanWithDelete(t *testing.T) { defer e.Close() b := e.NewBatch() + defer b.Close() + // Write initial value, then delete via batch. if err := e.Put(proto.EncodedKey("a"), []byte("value")); err != nil { t.Fatal(err) @@ -357,6 +370,8 @@ func TestBatchScanMaxWithDeleted(t *testing.T) { defer e.Close() b := e.NewBatch() + defer b.Close() + // Write two values. if err := e.Put(proto.EncodedKey("a"), []byte("value1")); err != nil { t.Fatal(err) @@ -388,6 +403,8 @@ func TestBatchConcurrency(t *testing.T) { defer e.Close() b := e.NewBatch() + defer b.Close() + // Write a merge to the batch. 
 	if err := b.Merge(proto.EncodedKey("a"), appender("bar")); err != nil {
 		t.Fatal(err)
 	}
diff --git a/storage/engine/db.cc b/storage/engine/db.cc
index 361088626e85..5dc29e554b75 100644
--- a/storage/engine/db.cc
+++ b/storage/engine/db.cc
@@ -26,6 +26,7 @@
 #include "rocksdb/merge_operator.h"
 #include "rocksdb/options.h"
 #include "rocksdb/table.h"
+#include "rocksdb/utilities/write_batch_with_index.h"
 #include "cockroach/proto/api.pb.h"
 #include "cockroach/proto/data.pb.h"
 #include "cockroach/proto/internal.pb.h"
@@ -39,7 +40,12 @@ extern "C" {
 extern "C" {
 
 struct DBBatch {
-  rocksdb::WriteBatch rep;
+  int updates;
+  rocksdb::WriteBatchWithIndex rep;
+
+  DBBatch()
+      : updates(0) {
+  }
 };
 
 struct DBEngine {
@@ -79,6 +85,10 @@ rocksdb::Slice ToSlice(DBSlice s) {
   return rocksdb::Slice(s.data, s.len);
 }
 
+rocksdb::Slice ToSlice(DBString s) {
+  return rocksdb::Slice(s.data, s.len);
+}
+
 DBSlice ToDBSlice(const rocksdb::Slice& s) {
   DBSlice result;
   result.data = const_cast<char*>(s.data());
@@ -86,6 +96,13 @@ DBSlice ToDBSlice(const rocksdb::Slice& s) {
   return result;
 }
 
+DBSlice ToDBSlice(const DBString& s) {
+  DBSlice result;
+  result.data = s.data;
+  result.len = s.len;
+  return result;
+}
+
 DBString ToDBString(const rocksdb::Slice& s) {
   DBString result;
   result.len = s.size();
@@ -676,7 +693,7 @@
       return false;
     }
     return MergeValues(meta->mutable_value(), operand_meta.value(),
-                       full_merge, logger);
+                        full_merge, logger);
   }
 };
 
@@ -739,6 +756,353 @@
   const bool enabled_;
 };
 
+// Getter defines an interface for retrieving a value from either an
+// iterator or an engine. It is used by ProcessDeltaKey to abstract
+// whether the "base" layer is an iterator or an engine.
+struct Getter {
+  virtual DBStatus Get(DBString* value) = 0;
+};
+
+// IteratorGetter is an implementation of the Getter interface which
+// retrieves the value currently pointed to by the supplied
+// iterator. It is ok for the supplied iterator to be NULL in which
+// case no value will be retrieved.
+struct IteratorGetter : public Getter {
+  IteratorGetter(rocksdb::Iterator* iter)
+      : base_(iter) {
+  }
+
+  virtual DBStatus Get(DBString* value) {
+    if (base_ == NULL) {
+      value->data = NULL;
+      value->len = 0;
+    } else {
+      *value = ToDBString(base_->value());
+    }
+    return kSuccess;
+  }
+
+  rocksdb::Iterator* const base_;
+};
+
+// EngineGetter is an implementation of the Getter interface which
+// retrieves the value for the supplied key from a DBEngine.
+struct EngineGetter : public Getter {
+  EngineGetter(DBEngine* db, DBSlice key)
+      : db_(db),
+        key_(key) {
+  }
+
+  virtual DBStatus Get(DBString* value) {
+    return DBGet(db_, NULL, key_, value);
+  }
+
+  DBEngine* const db_;
+  DBSlice const key_;
+};
+
+// ProcessDeltaKey performs the heavy lifting of processing the deltas
+// for "key" contained in a batch and determining what the resulting
+// value is. "delta" should have been seeked to "key", but may not be
+// pointing to "key" if no updates exist for that key in the batch.
+//
+// Note that RocksDB WriteBatches append updates
+// internally. WBWIIterator maintains an index for these updates on
+// <key, seq-num>. Looping over the entries in WBWIIterator will
+// return the keys in sorted order and, for each key, the updates as
+// they were added to the batch.
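+//
+// As a worked example (hypothetical values, not drawn from the
+// tests): with a base value "a" and batch entries for a key of
+// Merge("b"), Delete, Put("c"), Merge("d"), the loop below first
+// merges "a" with "b", the delete then discards that accumulated
+// value, the put resets it to "c", and the final merge folds "d"
+// into "c".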
+DBStatus ProcessDeltaKey(Getter* base, rocksdb::WBWIIterator* delta, + rocksdb::Slice key, DBString* value) { + if (value->data != NULL) { + free(value->data); + } + value->data = NULL; + value->len = 0; + + int count = 0; + for (; delta->Valid() && delta->Entry().key == key; + ++count, delta->Next()) { + rocksdb::WriteEntry entry = delta->Entry(); + switch (entry.type) { + case rocksdb::kPutRecord: + if (value->data != NULL) { + free(value->data); + } + *value = ToDBString(entry.value); + break; + case rocksdb::kMergeRecord: { + DBString existing; + if (count == 0) { + // If this is the first record for the key, then we need to + // merge with the record in base. + DBStatus status = base->Get(&existing); + if (status.data != NULL) { + if (value->data != NULL) { + free(value->data); + value->data = NULL; + value->len = 0; + } + return status; + } + } else { + // Merge with the value we've built up so far. + existing = *value; + value->data = NULL; + value->len = 0; + } + if (existing.data != NULL) { + DBStatus status = DBMergeOne( + ToDBSlice(existing), ToDBSlice(entry.value), value); + free(existing.data); + if (status.data != NULL) { + return status; + } + } else { + *value = ToDBString(entry.value); + } + break; + } + case rocksdb::kDeleteRecord: + if (value->data != NULL) { + free(value->data); + } + // This mirrors the logic in DBGet(): a deleted entry is + // indicated by a value with NULL data. + value->data = NULL; + value->len = 0; + break; + default: + break; + } + } + + if (count > 0) { + return kSuccess; + } + return base->Get(value); +} + +// This was cribbed from RocksDB and modified to support merge +// records. +class BaseDeltaIterator : public rocksdb::Iterator { + public: + BaseDeltaIterator(rocksdb::Iterator* base_iterator, rocksdb::WBWIIterator* delta_iterator) + : current_at_base_(true), + equal_keys_(false), + status_(rocksdb::Status::OK()), + base_iterator_(base_iterator), + delta_iterator_(delta_iterator), + comparator_(rocksdb::BytewiseComparator()) { + merged_.data = NULL; + } + + virtual ~BaseDeltaIterator() { + ClearMerged(); + } + + bool Valid() const override { + return current_at_base_ ? BaseValid() : DeltaValid(); + } + + void SeekToFirst() override { + base_iterator_->SeekToFirst(); + delta_iterator_->SeekToFirst(); + UpdateCurrent(); + } + + void SeekToLast() override { + base_iterator_->SeekToLast(); + delta_iterator_->SeekToLast(); + UpdateCurrent(); + } + + void Seek(const rocksdb::Slice& k) override { + base_iterator_->Seek(k); + delta_iterator_->Seek(k); + UpdateCurrent(); + } + + void Next() override { + if (!Valid()) { + status_ = rocksdb::Status::NotSupported("Next() on invalid iterator"); + } + Advance(); + } + + void Prev() override { + status_ = rocksdb::Status::NotSupported("Prev() not supported"); + } + + rocksdb::Slice key() const override { + return current_at_base_ ? 
+        base_iterator_->key() : delta_iterator_->Entry().key;
+  }
+
+  rocksdb::Slice value() const override {
+    if (current_at_base_) {
+      return base_iterator_->value();
+    }
+    return ToSlice(merged_);
+  }
+
+  rocksdb::Status status() const override {
+    if (!status_.ok()) {
+      return status_;
+    }
+    if (!base_iterator_->status().ok()) {
+      return base_iterator_->status();
+    }
+    return delta_iterator_->status();
+  }
+
+ private:
+  // -1 -- delta less advanced than base
+  // 0 -- delta == base
+  // 1 -- delta more advanced than base
+  int Compare() const {
+    assert(delta_iterator_->Valid() && base_iterator_->Valid());
+    return comparator_->Compare(delta_iterator_->Entry().key,
+                                base_iterator_->key());
+  }
+  void AssertInvariants() {
+#ifndef NDEBUG
+    if (!Valid()) {
+      return;
+    }
+    if (!BaseValid()) {
+      assert(!current_at_base_ && delta_iterator_->Valid());
+      return;
+    }
+    if (!DeltaValid()) {
+      assert(current_at_base_ && base_iterator_->Valid());
+      return;
+    }
+    // we don't support those yet
+    assert(delta_iterator_->Entry().type != rocksdb::kLogDataRecord);
+    int compare = comparator_->Compare(delta_iterator_->Entry().key,
+                                       base_iterator_->key());
+    // current_at_base -> compare < 0
+    assert(!current_at_base_ || compare < 0);
+    // !current_at_base -> compare <= 0
+    assert(current_at_base_ || compare <= 0);
+    // equal_keys_ <=> compare == 0
+    assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0));
+#endif
+  }
+
+  void Advance() {
+    if (equal_keys_) {
+      assert(BaseValid() && DeltaValid());
+      AdvanceBase();
+      AdvanceDelta();
+    } else {
+      if (current_at_base_) {
+        assert(BaseValid());
+        AdvanceBase();
+      } else {
+        assert(DeltaValid());
+        AdvanceDelta();
+      }
+    }
+    UpdateCurrent();
+  }
+
+  void AdvanceDelta() {
+    delta_iterator_->Next();
+    ClearMerged();
+  }
+  bool ProcessDelta() {
+    IteratorGetter base(equal_keys_ ? base_iterator_.get() : NULL);
+    DBStatus status = ProcessDeltaKey(&base, delta_iterator_.get(),
+                                      delta_iterator_->Entry().key.ToString(),
+                                      &merged_);
+    if (status.data != NULL) {
+      status_ = rocksdb::Status::Corruption("unable to merge records");
+      free(status.data);
+      return false;
+    }
+
+    // We advanced past the last entry for key and want to back up the
+    // delta iterator, but we can only back up if the iterator is
+    // valid.
+    if (delta_iterator_->Valid()) {
+      delta_iterator_->Prev();
+    } else {
+      delta_iterator_->SeekToLast();
+    }
+
+    return merged_.data == NULL;
+  }
+  void AdvanceBase() {
+    base_iterator_->Next();
+  }
+  bool BaseValid() const { return base_iterator_->Valid(); }
+  bool DeltaValid() const { return delta_iterator_->Valid(); }
+  void UpdateCurrent() {
+    ClearMerged();
+
+    for (;;) {
+      equal_keys_ = false;
+      if (!BaseValid()) {
+        // Base has finished.
+        if (!DeltaValid()) {
+          // Finished
+          return;
+        }
+        if (!ProcessDelta()) {
+          current_at_base_ = false;
+          return;
+        }
+        AdvanceDelta();
+        continue;
+      }
+
+      if (!DeltaValid()) {
+        // Delta has finished.
+        current_at_base_ = true;
+        return;
+      }
+
+      int compare = Compare();
+      if (compare > 0) {  // delta more advanced than base
+        current_at_base_ = true;
+        return;
+      }
+      if (compare == 0) {
+        equal_keys_ = true;
+      }
+      if (!ProcessDelta()) {
+        current_at_base_ = false;
+        return;
+      }
+
+      // Delta is less advanced and is a delete.
+      AdvanceDelta();
+      if (equal_keys_) {
+        AdvanceBase();
+      }
+    }
+
+    AssertInvariants();
+  }
+
+  void ClearMerged() const {
+    if (merged_.data != NULL) {
+      free(merged_.data);
+      merged_.data = NULL;
+      merged_.len = 0;
+    }
+  }
+
+  bool current_at_base_;
+  bool equal_keys_;
+  mutable rocksdb::Status status_;
+  mutable DBString merged_;
+  std::unique_ptr<rocksdb::Iterator> base_iterator_;
+  std::unique_ptr<rocksdb::WBWIIterator> delta_iterator_;
+  const rocksdb::Comparator* comparator_;  // not owned
+};
+
 } // namespace
 
 DBStatus DBOpen(DBEngine **db, DBSlice dir, DBOptions db_opts) {
@@ -854,8 +1218,11 @@ DBStatus DBDelete(DBEngine* db, DBSlice key) {
 }
 
 DBStatus DBWrite(DBEngine* db, DBBatch *batch) {
+  if (batch->updates == 0) {
+    return kSuccess;
+  }
   rocksdb::WriteOptions options;
-  return ToDBStatus(db->rep->Write(options, &batch->rep));
+  return ToDBStatus(db->rep->Write(options, batch->rep.GetWriteBatch()));
 }
 
 DBSnapshot* DBNewSnapshot(DBEngine* db) {
@@ -922,17 +1289,46 @@ void DBBatchDestroy(DBBatch* batch) {
 }
 
 void DBBatchPut(DBBatch* batch, DBSlice key, DBSlice value) {
+  ++batch->updates;
   batch->rep.Put(ToSlice(key), ToSlice(value));
 }
 
 void DBBatchMerge(DBBatch* batch, DBSlice key, DBSlice value) {
+  ++batch->updates;
   batch->rep.Merge(ToSlice(key), ToSlice(value));
 }
 
+DBStatus DBBatchGet(DBEngine* db, DBBatch* batch, DBSlice key, DBString* value) {
+  if (batch->updates == 0) {
+    return DBGet(db, NULL, key, value);
+  }
+
+  std::unique_ptr<rocksdb::WBWIIterator> iter(batch->rep.NewIterator());
+  rocksdb::Slice rkey(ToSlice(key));
+  iter->Seek(rkey);
+  EngineGetter base(db, key);
+  return ProcessDeltaKey(&base, iter.get(), rkey, value);
+}
+
 void DBBatchDelete(DBBatch* batch, DBSlice key) {
+  ++batch->updates;
   batch->rep.Delete(ToSlice(key));
 }
 
+DBIterator* DBBatchNewIter(DBEngine* db, DBBatch* batch) {
+  if (batch->updates == 0) {
+    // Don't bother to create a batch iterator if the batch contains
+    // no updates.
+    return DBNewIter(db, NULL);
+  }
+
+  DBIterator* iter = new DBIterator;
+  rocksdb::Iterator* base = db->rep->NewIterator(MakeReadOptions(NULL));
+  rocksdb::WBWIIterator *delta = batch->rep.NewIterator();
+  iter->rep = new BaseDeltaIterator(base, delta);
+  return iter;
+}
+
 DBStatus DBMergeOne(DBSlice existing, DBSlice update, DBString* new_value) {
   new_value->len = 0;
diff --git a/storage/engine/db.h b/storage/engine/db.h
index 6c00e03df1ac..57290e6615f5 100644
--- a/storage/engine/db.h
+++ b/storage/engine/db.h
@@ -161,9 +161,19 @@ void DBBatchPut(DBBatch* batch, DBSlice key, DBSlice value);
 // Merge the database entry (if any) for "key" with "value".
 void DBBatchMerge(DBBatch* batch, DBSlice key, DBSlice value);
 
+// Retrieves the database entry for "key" within the specified
+// batch. If the key has not been put or deleted within the batch it
+// is retrieved from the base db.
+DBStatus DBBatchGet(DBEngine* db, DBBatch* batch, DBSlice key, DBString* value);
+
 // Deletes the database entry for "key".
 void DBBatchDelete(DBBatch* batch, DBSlice key);
 
+// Creates a new database iterator that iterates over both the
+// underlying engine and the updates that have been made to batch. It
+// is the caller's responsibility to call DBIterDestroy().
+DBIterator* DBBatchNewIter(DBEngine* db, DBBatch* batch);
+
 // Implements the merge operator on a single pair of values. update is
 // merged with existing. This method is provided for invocation from
 // Go code.
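Taken together, the db.cc and db.h changes above give Go-side batches
read-your-writes semantics: gets and iterators see the batch's pending
updates, including merge records that the stock WriteBatchWithIndex
iterator cannot handle, layered over the base engine. A minimal Go
sketch of the intended usage, assuming only the engine package API
visible in this patch; the function name is hypothetical:

	func batchSketch() error {
		e := NewInMem(proto.Attributes{}, 1<<20)
		defer e.Close()

		b := e.NewBatch() // backed by a C++ DBBatch
		defer b.Close()

		if err := b.Put(proto.EncodedKey("a"), []byte("x")); err != nil {
			return err
		}
		// The batch sees its own pending write via DBBatchGet...
		if _, err := b.Get(proto.EncodedKey("a")); err != nil {
			return err
		}
		// ...while the engine sees nothing until Commit applies the
		// batch atomically through DBWrite.
		return b.Commit()
	}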
diff --git a/storage/engine/engine.go b/storage/engine/engine.go index e5d3ae424d3b..e4bbe0c0aefd 100644 --- a/storage/engine/engine.go +++ b/storage/engine/engine.go @@ -95,10 +95,6 @@ type Engine interface { // Note that clear actually removes entries from the storage // engine, rather than inserting tombstones. Clear(key proto.EncodedKey) error - // WriteBatch atomically applies the specified writes, deletions and - // merges. The list passed to WriteBatch must only contain elements - // of type Batch{Put,Merge,Delete}. - WriteBatch([]interface{}) error // Merge is a high-performance write operation used for values which are // accumulated over several writes. Multiple values can be merged // sequentially into a single key; a subsequent read will return a "merged" @@ -147,21 +143,6 @@ type Engine interface { Commit() error } -// A BatchDelete is a delete operation executed as part of an atomic batch. -type BatchDelete struct { - proto.RawKeyValue -} - -// A BatchPut is a put operation executed as part of an atomic batch. -type BatchPut struct { - proto.RawKeyValue -} - -// A BatchMerge is a merge operation executed as part of an atomic batch. -type BatchMerge struct { - proto.RawKeyValue -} - var bufferPool = sync.Pool{ New: func() interface{} { return gogoproto.NewBuffer(nil) @@ -255,12 +236,15 @@ func Scan(engine Engine, start, end proto.EncodedKey, max int64) ([]proto.RawKey // actually removes entries from the storage engine, rather than // inserting tombstones, as with deletion through the MVCC. func ClearRange(engine Engine, start, end proto.EncodedKey) (int, error) { - var deletes []interface{} + b := engine.NewBatch() + defer b.Close() + count := 0 if err := engine.Iterate(start, end, func(kv proto.RawKeyValue) (bool, error) { - deletes = append(deletes, BatchDelete{proto.RawKeyValue{Key: kv.Key}}) + b.Clear(kv.Key) + count++ return false, nil }); err != nil { return 0, err } - return len(deletes), engine.WriteBatch(deletes) + return count, b.Commit() } diff --git a/storage/engine/engine_test.go b/storage/engine/engine_test.go index c4506ebf458b..0d74de5e5346 100644 --- a/storage/engine/engine_test.go +++ b/storage/engine/engine_test.go @@ -62,11 +62,11 @@ func runWithAllEngines(test func(e Engine, t *testing.T), t *testing.T) { test(inMem, t) } -// TestEngineWriteBatch writes a batch containing 10K rows (all the +// TestEngineBatchCommit writes a batch containing 10K rows (all the // same key) and concurrently attempts to read the value in a tight // loop. The test verifies that either there is no value for the key // or it contains the final value, but never a value in between. -func TestEngineWriteBatch(t *testing.T) { +func TestEngineBatchCommit(t *testing.T) { defer leaktest.AfterTest(t) numWrites := 10000 key := proto.EncodedKey("a") @@ -102,11 +102,12 @@ func TestEngineWriteBatch(t *testing.T) { <-readsBegun // Create key/values and put them in a batch to engine. - puts := make([]interface{}, numWrites, numWrites) + batch := e.NewBatch() + defer batch.Close() for i := 0; i < numWrites; i++ { - puts[i] = BatchPut{proto.RawKeyValue{Key: key, Value: []byte(strconv.Itoa(i))}} + batch.Put(key, []byte(strconv.Itoa(i))) } - if err := e.WriteBatch(puts); err != nil { + if err := batch.Commit(); err != nil { t.Fatal(err) } close(writesDone) @@ -120,36 +121,65 @@ func TestEngineBatch(t *testing.T) { numShuffles := 100 key := proto.EncodedKey("a") // Those are randomized below. 
- batch := []interface{}{ - BatchPut{proto.RawKeyValue{Key: key, Value: appender("~ockroachDB")}}, - BatchPut{proto.RawKeyValue{Key: key, Value: appender("C~ckroachDB")}}, - BatchPut{proto.RawKeyValue{Key: key, Value: appender("Co~kroachDB")}}, - BatchPut{proto.RawKeyValue{Key: key, Value: appender("Coc~roachDB")}}, - BatchPut{proto.RawKeyValue{Key: key, Value: appender("Cock~oachDB")}}, - BatchPut{proto.RawKeyValue{Key: key, Value: appender("Cockr~achDB")}}, - BatchPut{proto.RawKeyValue{Key: key, Value: appender("Cockro~chDB")}}, - BatchPut{proto.RawKeyValue{Key: key, Value: appender("Cockroa~hDB")}}, - BatchPut{proto.RawKeyValue{Key: key, Value: appender("Cockroac~DB")}}, - BatchPut{proto.RawKeyValue{Key: key, Value: appender("Cockroach~B")}}, - BatchPut{proto.RawKeyValue{Key: key, Value: appender("CockroachD~")}}, - BatchDelete{proto.RawKeyValue{Key: key}}, - BatchMerge{proto.RawKeyValue{Key: key, Value: appender("C")}}, - BatchMerge{proto.RawKeyValue{Key: key, Value: appender(" o")}}, - BatchMerge{proto.RawKeyValue{Key: key, Value: appender(" c")}}, - BatchMerge{proto.RawKeyValue{Key: key, Value: appender(" k")}}, - BatchMerge{proto.RawKeyValue{Key: key, Value: appender("r")}}, - BatchMerge{proto.RawKeyValue{Key: key, Value: appender(" o")}}, - BatchMerge{proto.RawKeyValue{Key: key, Value: appender(" a")}}, - BatchMerge{proto.RawKeyValue{Key: key, Value: appender(" c")}}, - BatchMerge{proto.RawKeyValue{Key: key, Value: appender("h")}}, - BatchMerge{proto.RawKeyValue{Key: key, Value: appender(" D")}}, - BatchMerge{proto.RawKeyValue{Key: key, Value: appender(" B")}}, + type data struct { + key proto.EncodedKey + value []byte + merge bool + } + batch := []data{ + {key, appender("~ockroachDB"), false}, + {key, appender("C~ckroachDB"), false}, + {key, appender("Co~kroachDB"), false}, + {key, appender("Coc~roachDB"), false}, + {key, appender("Cock~oachDB"), false}, + {key, appender("Cockr~achDB"), false}, + {key, appender("Cockro~chDB"), false}, + {key, appender("Cockroa~hDB"), false}, + {key, appender("Cockroac~DB"), false}, + {key, appender("Cockroach~B"), false}, + {key, appender("CockroachD~"), false}, + {key, nil, false}, + {key, appender("C"), true}, + {key, appender(" o"), true}, + {key, appender(" c"), true}, + {key, appender(" k"), true}, + {key, appender("r"), true}, + {key, appender(" o"), true}, + {key, appender(" a"), true}, + {key, appender(" c"), true}, + {key, appender("h"), true}, + {key, appender(" D"), true}, + {key, appender(" B"), true}, + } + + apply := func(eng Engine, d data) error { + if d.value == nil { + return eng.Clear(d.key) + } else if d.merge { + return eng.Merge(d.key, d.value) + } + return eng.Put(d.key, d.value) + } + + get := func(eng Engine, key proto.EncodedKey) []byte { + b, err := eng.Get(key) + if err != nil { + t.Fatal(err) + } + m := &proto.MVCCMetadata{} + if err := gogoproto.Unmarshal(b, m); err != nil { + t.Fatal(err) + } + if m.Value == nil { + return nil + } + return m.Value.Bytes } for i := 0; i < numShuffles; i++ { // In each run, create an array of shuffled operations. shuffledIndices := rand.Perm(len(batch)) - currentBatch := make([]interface{}, len(batch)) + currentBatch := make([]data, len(batch)) for k := range currentBatch { currentBatch[k] = batch[shuffledIndices[k]] } @@ -157,20 +187,50 @@ func TestEngineBatch(t *testing.T) { engine.Clear(key) // Run it once with individual operations and remember the result. 
for i, op := range currentBatch { - if err := engine.WriteBatch([]interface{}{op}); err != nil { - t.Errorf("batch test: %d: op %v: %v", i, op, err) + if err := apply(engine, op); err != nil { + t.Errorf("%d: op %v: %v", i, op, err) continue } } - correctValue, _ := engine.Get(key) + expectedValue := get(engine, key) // Run the whole thing as a batch and compare. - if err := engine.WriteBatch(currentBatch); err != nil { - t.Errorf("batch test: %d: %v", i, err) + b := engine.NewBatch() + b.Clear(key) + for _, op := range currentBatch { + apply(b, op) + } + // Try getting the value from the batch. + actualValue := get(b, key) + if !bytes.Equal(actualValue, expectedValue) { + t.Errorf("%d: expected %s, but got %s", i, expectedValue, actualValue) + } + // Try using an iterator to get the value from the batch. + iter := b.NewIterator() + iter.Seek(key) + if !iter.Valid() { + if currentBatch[len(currentBatch)-1].value != nil { + t.Errorf("%d: batch seek invalid", i) + } + } else if !bytes.Equal(iter.Key(), key) { + t.Errorf("%d: batch seek expected key %s, but got %s", i, key, iter.Key()) + } else { + m := &proto.MVCCMetadata{} + if err := iter.ValueProto(m); err != nil { + t.Fatal(err) + } + if !bytes.Equal(m.Value.Bytes, expectedValue) { + t.Errorf("%d: expected %s, but got %s", i, expectedValue, m.Value.Bytes) + } + } + iter.Close() + // Commit the batch and try getting the value from the engine. + if err := b.Commit(); err != nil { + t.Errorf("%d: %v", i, err) continue } - actualValue, _ := engine.Get(key) - if !bytes.Equal(actualValue, correctValue) { - t.Errorf("batch test: %d: result inconsistent", i) + actualValue = get(engine, key) + if !bytes.Equal(actualValue, expectedValue) { + t.Errorf("%d: expected %s, but got %s", i, expectedValue, actualValue) } } }, t) @@ -606,11 +666,6 @@ func TestSnapshotMethods(t *testing.T) { t.Error("expected error on Clear to snapshot") } - // Verify WriteBatch is error. - if err := snap.WriteBatch([]interface{}{BatchDelete{proto.RawKeyValue{Key: keys[0]}}}); err == nil { - t.Error("expected error on WriteBatch to snapshot") - } - // Verify Merge is error. if err := snap.Merge([]byte("merge-key"), appender("x")); err == nil { t.Error("expected error on Merge to snapshot") diff --git a/storage/engine/mvcc_test.go b/storage/engine/mvcc_test.go index 7582718c3254..7b24509bcdd3 100644 --- a/storage/engine/mvcc_test.go +++ b/storage/engine/mvcc_test.go @@ -55,11 +55,10 @@ var ( valueEmpty = proto.Value{} ) -// createTestEngine returns a new Batch wrapping an in-memory engine -// with 1MB of storage capacity. +// createTestEngine returns a new in-memory engine with 1MB of storage +// capacity. 
func createTestEngine() Engine { - engine := NewInMem(proto.Attributes{}, 1<<20) - return engine.NewBatch() + return NewInMem(proto.Attributes{}, 1<<20) } // makeTxn creates a new transaction using the specified base @@ -115,6 +114,8 @@ func TestMVCCKeys(t *testing.T) { func TestMVCCEmptyKey(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + if _, err := MVCCGet(engine, proto.Key{}, makeTS(0, 1), true, nil); err == nil { t.Error("expected empty key error") } @@ -135,6 +136,8 @@ func TestMVCCEmptyKey(t *testing.T) { func TestMVCCGetNotExist(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + value, err := MVCCGet(engine, testKey1, makeTS(0, 1), true, nil) if err != nil { t.Fatal(err) @@ -147,6 +150,8 @@ func TestMVCCGetNotExist(t *testing.T) { func TestMVCCPutWithBadValue(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + badValue := proto.Value{Bytes: []byte("a"), Integer: gogoproto.Int64(1)} err := MVCCPut(engine, nil, testKey1, makeTS(0, 1), badValue, nil) if err == nil { @@ -157,6 +162,8 @@ func TestMVCCPutWithBadValue(t *testing.T) { func TestMVCCPutWithTxn(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + err := MVCCPut(engine, nil, testKey1, makeTS(0, 1), value1, txn1) if err != nil { t.Fatal(err) @@ -177,6 +184,8 @@ func TestMVCCPutWithTxn(t *testing.T) { func TestMVCCPutWithoutTxn(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + err := MVCCPut(engine, nil, testKey1, makeTS(0, 1), value1, nil) if err != nil { t.Fatal(err) @@ -199,6 +208,8 @@ func TestMVCCPutWithoutTxn(t *testing.T) { func TestMVCCIncrement(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + newVal, err := MVCCIncrement(engine, nil, testKey1, makeTS(0, 1), nil, 0) if err != nil { t.Fatal(err) @@ -226,6 +237,8 @@ func TestMVCCIncrement(t *testing.T) { func TestMVCCUpdateExistingKey(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + err := MVCCPut(engine, nil, testKey1, makeTS(0, 1), value1, nil) if err != nil { t.Fatal(err) @@ -269,6 +282,8 @@ func TestMVCCUpdateExistingKey(t *testing.T) { func TestMVCCUpdateExistingKeyOldVersion(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + err := MVCCPut(engine, nil, testKey1, makeTS(1, 1), value1, nil) if err != nil { t.Fatal(err) @@ -288,6 +303,8 @@ func TestMVCCUpdateExistingKeyOldVersion(t *testing.T) { func TestMVCCUpdateExistingKeyInTxn(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + err := MVCCPut(engine, nil, testKey1, makeTS(0, 1), value1, txn1) if err != nil { t.Fatal(err) @@ -302,6 +319,8 @@ func TestMVCCUpdateExistingKeyInTxn(t *testing.T) { func TestMVCCUpdateExistingKeyDiffTxn(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + err := MVCCPut(engine, nil, testKey1, makeTS(0, 1), value1, txn1) if err != nil { t.Fatal(err) @@ -328,6 +347,8 @@ func TestMVCCGetNoMoreOldVersion(t *testing.T) { // If we search for a, the scan should not return "b". 
engine := createTestEngine() + defer engine.Close() + err := MVCCPut(engine, nil, testKey1, makeTS(3, 0), value1, nil) err = MVCCPut(engine, nil, testKey2, makeTS(1, 0), value2, nil) @@ -346,6 +367,8 @@ func TestMVCCGetNoMoreOldVersion(t *testing.T) { func TestMVCCGetUncertainty(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + txn := &proto.Transaction{ID: []byte("txn"), Timestamp: makeTS(5, 0), MaxTimestamp: makeTS(10, 0)} // Put a value from the past. if err := MVCCPut(engine, nil, testKey1, makeTS(1, 0), value1, nil); err != nil { @@ -409,6 +432,8 @@ func TestMVCCGetUncertainty(t *testing.T) { func TestMVCCGetAndDelete(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + err := MVCCPut(engine, nil, testKey1, makeTS(1, 0), value1, nil) value, err := MVCCGet(engine, testKey1, makeTS(2, 0), true, nil) if err != nil { @@ -461,6 +486,8 @@ func TestMVCCDeleteMissingKey(t *testing.T) { func TestMVCCGetAndDeleteInTxn(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + err := MVCCPut(engine, nil, testKey1, makeTS(1, 0), value1, txn1) value, err := MVCCGet(engine, testKey1, makeTS(2, 0), true, txn1) if err != nil { @@ -495,6 +522,8 @@ func TestMVCCGetAndDeleteInTxn(t *testing.T) { func TestMVCCGetWriteIntentError(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + err := MVCCPut(engine, nil, testKey1, makeTS(0, 1), value1, txn1) if err != nil { t.Fatal(err) @@ -516,6 +545,7 @@ func TestMVCCGetWriteIntentError(t *testing.T) { func TestMVCCGetInconsistent(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() // Put two values to key 1, the latest with a txn. err := MVCCPut(engine, nil, testKey1, makeTS(1, 0), value1, nil) @@ -560,6 +590,8 @@ func TestMVCCGetInconsistent(t *testing.T) { func TestMVCCScan(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + err := MVCCPut(engine, nil, testKey1, makeTS(1, 0), value1, nil) err = MVCCPut(engine, nil, testKey1, makeTS(2, 0), value4, nil) err = MVCCPut(engine, nil, testKey2, makeTS(1, 0), value2, nil) @@ -618,6 +650,8 @@ func TestMVCCScan(t *testing.T) { func TestMVCCScanMaxNum(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + err := MVCCPut(engine, nil, testKey1, makeTS(1, 0), value1, nil) err = MVCCPut(engine, nil, testKey2, makeTS(1, 0), value2, nil) err = MVCCPut(engine, nil, testKey3, makeTS(1, 0), value3, nil) @@ -637,6 +671,8 @@ func TestMVCCScanMaxNum(t *testing.T) { func TestMVCCScanWithKeyPrefix(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + // Let's say you have: // a // a @@ -670,6 +706,8 @@ func TestMVCCScanWithKeyPrefix(t *testing.T) { func TestMVCCScanInTxn(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + err := MVCCPut(engine, nil, testKey1, makeTS(1, 0), value1, nil) err = MVCCPut(engine, nil, testKey2, makeTS(1, 0), value2, nil) err = MVCCPut(engine, nil, testKey3, makeTS(1, 0), value3, txn1) @@ -698,6 +736,7 @@ func TestMVCCScanInTxn(t *testing.T) { func TestMVCCScanInconsistent(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() // A scan with consistent=false should fail in a txn. 
if _, err := MVCCScan(engine, KeyMin, KeyMax, 0, makeTS(1, 0), false, txn1); err == nil { @@ -760,6 +799,8 @@ func TestMVCCScanInconsistent(t *testing.T) { func TestMVCCDeleteRange(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + err := MVCCPut(engine, nil, testKey1, makeTS(1, 0), value1, nil) err = MVCCPut(engine, nil, testKey2, makeTS(1, 0), value2, nil) err = MVCCPut(engine, nil, testKey3, makeTS(1, 0), value3, nil) @@ -811,6 +852,8 @@ func TestMVCCDeleteRange(t *testing.T) { func TestMVCCDeleteRangeFailed(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + err := MVCCPut(engine, nil, testKey1, makeTS(1, 0), value1, nil) err = MVCCPut(engine, nil, testKey2, makeTS(1, 0), value2, txn1) err = MVCCPut(engine, nil, testKey3, makeTS(1, 0), value3, txn1) @@ -830,6 +873,8 @@ func TestMVCCDeleteRangeFailed(t *testing.T) { func TestMVCCDeleteRangeConcurrentTxn(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + err := MVCCPut(engine, nil, testKey1, makeTS(1, 0), value1, nil) err = MVCCPut(engine, nil, testKey2, makeTS(1, 0), value2, txn1) err = MVCCPut(engine, nil, testKey3, makeTS(2, 0), value3, txn2) @@ -844,6 +889,8 @@ func TestMVCCDeleteRangeConcurrentTxn(t *testing.T) { func TestMVCCConditionalPut(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + err := MVCCConditionalPut(engine, nil, testKey1, makeTS(0, 1), value1, &value2, nil) if err == nil { t.Fatal("expected error on key not exists") @@ -928,6 +975,8 @@ func TestMVCCConditionalPut(t *testing.T) { func TestMVCCResolveTxn(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + err := MVCCPut(engine, nil, testKey1, makeTS(0, 1), value1, txn1) value, err := MVCCGet(engine, testKey1, makeTS(0, 1), true, txn1) if !bytes.Equal(value1.Bytes, value.Bytes) { @@ -951,6 +1000,8 @@ func TestMVCCResolveTxn(t *testing.T) { func TestMVCCAbortTxn(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + err := MVCCPut(engine, nil, testKey1, makeTS(0, 1), value1, txn1) err = MVCCResolveWriteIntent(engine, nil, testKey1, makeTS(0, 1), txn1Abort) if err != nil { @@ -976,6 +1027,8 @@ func TestMVCCAbortTxn(t *testing.T) { func TestMVCCAbortTxnWithPreviousVersion(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + err := MVCCPut(engine, nil, testKey1, makeTS(0, 1), value1, nil) err = MVCCPut(engine, nil, testKey1, makeTS(1, 0), value2, nil) err = MVCCPut(engine, nil, testKey1, makeTS(2, 0), value3, txn1) @@ -1008,6 +1061,8 @@ func TestMVCCAbortTxnWithPreviousVersion(t *testing.T) { func TestMVCCWriteWithDiffTimestampsAndEpochs(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + // Start with epoch 1. if err := MVCCPut(engine, nil, testKey1, makeTS(0, 1), value1, txn1); err != nil { t.Fatal(err) @@ -1061,6 +1116,8 @@ func TestMVCCWriteWithDiffTimestampsAndEpochs(t *testing.T) { func TestMVCCReadWithDiffEpochs(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + // Write initial value wihtout a txn. 
if err := MVCCPut(engine, nil, testKey1, makeTS(0, 1), value1, nil); err != nil { t.Fatal(err) @@ -1107,6 +1164,8 @@ func TestMVCCReadWithDiffEpochs(t *testing.T) { func TestMVCCReadWithPushedTimestamp(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + // Start with epoch 1. if err := MVCCPut(engine, nil, testKey1, makeTS(0, 1), value1, txn1); err != nil { t.Fatal(err) @@ -1125,6 +1184,8 @@ func TestMVCCReadWithPushedTimestamp(t *testing.T) { func TestMVCCResolveWithDiffEpochs(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + err := MVCCPut(engine, nil, testKey1, makeTS(0, 1), value1, txn1) err = MVCCPut(engine, nil, testKey2, makeTS(0, 1), value2, txn1e2) num, err := MVCCResolveWriteIntentRange(engine, nil, testKey1, testKey2.Next(), 2, makeTS(0, 1), txn1e2Commit) @@ -1150,6 +1211,8 @@ func TestMVCCResolveWithDiffEpochs(t *testing.T) { func TestMVCCResolveWithUpdatedTimestamp(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + err := MVCCPut(engine, nil, testKey1, makeTS(0, 1), value1, txn1) value, err := MVCCGet(engine, testKey1, makeTS(1, 0), true, txn1) if !bytes.Equal(value1.Bytes, value.Bytes) { @@ -1181,6 +1244,8 @@ func TestMVCCResolveWithUpdatedTimestamp(t *testing.T) { func TestMVCCResolveWithPushedTimestamp(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + err := MVCCPut(engine, nil, testKey1, makeTS(0, 1), value1, txn1) value, err := MVCCGet(engine, testKey1, makeTS(1, 0), true, txn1) if !bytes.Equal(value1.Bytes, value.Bytes) { @@ -1213,6 +1278,7 @@ func TestMVCCResolveWithPushedTimestamp(t *testing.T) { func TestMVCCResolveTxnNoOps(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() // Resolve a non existent key; noop. err := MVCCResolveWriteIntent(engine, nil, testKey1, makeTS(0, 1), txn1Commit) @@ -1238,6 +1304,8 @@ func TestMVCCResolveTxnNoOps(t *testing.T) { func TestMVCCResolveTxnRange(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + err := MVCCPut(engine, nil, testKey1, makeTS(0, 1), value1, txn1) err = MVCCPut(engine, nil, testKey2, makeTS(0, 1), value2, nil) err = MVCCPut(engine, nil, testKey3, makeTS(0, 1), value3, txn2) @@ -1591,6 +1659,8 @@ func verifyStats(debug string, ms *proto.MVCCStats, expMS *proto.MVCCStats, t *t func TestMVCCStatsBasic(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + ms := &proto.MVCCStats{} // Verify size of mvccVersionTimestampSize. @@ -1729,6 +1799,8 @@ func TestMVCCStatsWithRandomRuns(t *testing.T) { rng, seed := util.NewPseudoRand() log.Infof("using pseudo random number generator with seed %d", seed) engine := createTestEngine() + defer engine.Close() + ms := &proto.MVCCStats{} // Now, generate a random sequence of puts, deletes and resolves. 
@@ -1808,6 +1880,8 @@ func TestMVCCStatsWithRandomRuns(t *testing.T) { func TestMVCCGarbageCollect(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + ms := &proto.MVCCStats{} bytes := []byte("value") @@ -1906,6 +1980,8 @@ func TestMVCCGarbageCollect(t *testing.T) { func TestMVCCGarbageCollectNonDeleted(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + bytes := []byte("value") ts1 := makeTS(1E9, 0) ts2 := makeTS(2E9, 0) @@ -1930,6 +2006,8 @@ func TestMVCCGarbageCollectNonDeleted(t *testing.T) { func TestMVCCGarbageCollectIntent(t *testing.T) { defer leaktest.AfterTest(t) engine := createTestEngine() + defer engine.Close() + bytes := []byte("value") ts1 := makeTS(1E9, 0) ts2 := makeTS(2E9, 0) diff --git a/storage/engine/rocksdb.go b/storage/engine/rocksdb.go index 3c427af7f1b2..83e43def78b8 100644 --- a/storage/engine/rocksdb.go +++ b/storage/engine/rocksdb.go @@ -249,37 +249,6 @@ func (r *RocksDB) iterateInternal(start, end proto.EncodedKey, f func(proto.RawK return it.Error() } -// WriteBatch applies the puts, merges and deletes atomically via -// the RocksDB write batch facility. The list must only contain -// elements of type Batch{Put,Merge,Delete}. -func (r *RocksDB) WriteBatch(cmds []interface{}) error { - if len(cmds) == 0 { - return nil - } - batch := C.DBNewBatch() - defer C.DBBatchDestroy(batch) - - for i, e := range cmds { - switch v := e.(type) { - case BatchDelete: - if len(v.Key) == 0 { - return emptyKeyError() - } - C.DBBatchDelete(batch, goToCSlice(v.Key)) - case BatchPut: - // We write the batch before returning from this method, so we - // don't need to worry about the GC reclaiming the data stored. - C.DBBatchPut(batch, goToCSlice(v.Key), goToCSlice(v.Value)) - case BatchMerge: - C.DBBatchMerge(batch, goToCSlice(v.Key), goToCSlice(v.Value)) - default: - panic(fmt.Sprintf("illegal operation #%d passed to writeBatch: %T", i, v)) - } - } - - return statusToError(C.DBWrite(r.rdb, batch)) -} - // Capacity queries the underlying file system for disk capacity // information. func (r *RocksDB) Capacity() (StoreCapacity, error) { @@ -434,9 +403,9 @@ func (r *RocksDB) NewSnapshot() Engine { } } -// NewBatch returns a new Batch wrapping this rocksdb engine. +// NewBatch returns a new batch wrapping this rocksdb engine. func (r *RocksDB) NewBatch() Engine { - return &Batch{engine: r} + return newRocksDBBatch(r) } // Commit is a noop for RocksDB engine. @@ -492,11 +461,6 @@ func (r *rocksDBSnapshot) Clear(key proto.EncodedKey) error { return util.Errorf("cannot Clear from a snapshot") } -// WriteBatch is illegal for snapshot and returns an error. -func (r *rocksDBSnapshot) WriteBatch([]interface{}) error { - return util.Errorf("cannot WriteBatch to a snapshot") -} - // Merge is illegal for snapshot and returns an error. func (r *rocksDBSnapshot) Merge(key proto.EncodedKey, value []byte) error { return util.Errorf("cannot Merge to a snapshot") @@ -528,12 +492,12 @@ func (r *rocksDBSnapshot) NewIterator() Iterator { return newRocksDBIterator(r.parent.rdb, r.handle) } -// NewSnapshot is illegal for snapshot and returns nil. +// NewSnapshot is illegal for snapshot. func (r *rocksDBSnapshot) NewSnapshot() Engine { panic("cannot create a NewSnapshot from a snapshot") } -// NewBatch is illegal for snapshot and returns nil. +// NewBatch is illegal for snapshot. 
func (r *rocksDBSnapshot) NewBatch() Engine { panic("cannot create a NewBatch from a snapshot") } @@ -543,6 +507,161 @@ func (r *rocksDBSnapshot) Commit() error { return util.Errorf("cannot Commit to a snapshot") } +type rocksDBBatch struct { + parent *RocksDB + batch *C.DBBatch +} + +func newRocksDBBatch(r *RocksDB) *rocksDBBatch { + return &rocksDBBatch{ + parent: r, + batch: C.DBNewBatch(), + } +} + +func (r *rocksDBBatch) Open() error { + return util.Errorf("cannot open a batch") +} + +func (r *rocksDBBatch) Close() { + if r.batch != nil { + C.DBBatchDestroy(r.batch) + } +} + +// Attrs returns the engine/store attributes. +func (r *rocksDBBatch) Attrs() proto.Attributes { + return r.parent.Attrs() +} + +func (r *rocksDBBatch) Put(key proto.EncodedKey, value []byte) error { + if len(key) == 0 { + return emptyKeyError() + } + C.DBBatchPut(r.batch, goToCSlice(key), goToCSlice(value)) + return nil +} + +func (r *rocksDBBatch) Merge(key proto.EncodedKey, value []byte) error { + if len(key) == 0 { + return emptyKeyError() + } + C.DBBatchMerge(r.batch, goToCSlice(key), goToCSlice(value)) + return nil +} + +func (r *rocksDBBatch) Get(key proto.EncodedKey) ([]byte, error) { + if len(key) == 0 { + return nil, emptyKeyError() + } + var result C.DBString + err := statusToError(C.DBBatchGet(r.parent.rdb, r.batch, goToCSlice(key), &result)) + if err != nil { + return nil, err + } + return cStringToGoBytes(result), nil +} + +func (r *rocksDBBatch) GetProto(key proto.EncodedKey, msg gogoproto.Message) ( + ok bool, keyBytes, valBytes int64, err error) { + if len(key) == 0 { + err = emptyKeyError() + return + } + var result C.DBString + if err = statusToError(C.DBBatchGet(r.parent.rdb, r.batch, goToCSlice(key), &result)); err != nil { + return + } + if result.len <= 0 { + return + } + ok = true + if msg != nil { + // Make a byte slice that is backed by result.data. This slice + // cannot live past the lifetime of this method, but we're only + // using it to unmarshal the proto. + data := cSliceToUnsafeGoBytes(C.DBSlice(result)) + err = gogoproto.Unmarshal(data, msg) + } + C.free(unsafe.Pointer(result.data)) + keyBytes = int64(len(key)) + valBytes = int64(result.len) + return +} + +func (r *rocksDBBatch) Iterate(start, end proto.EncodedKey, f func(proto.RawKeyValue) (bool, error)) error { + if bytes.Compare(start, end) >= 0 { + return nil + } + it := &rocksDBIterator{ + iter: C.DBBatchNewIter(r.parent.rdb, r.batch), + } + defer it.Close() + + it.Seek(start) + for ; it.Valid(); it.Next() { + k := it.Key() + if !it.Key().Less(end) { + break + } + if done, err := f(proto.RawKeyValue{Key: k, Value: it.Value()}); done || err != nil { + return err + } + } + // Check for any errors during iteration. 
+ return it.Error() +} + +func (r *rocksDBBatch) Clear(key proto.EncodedKey) error { + if len(key) == 0 { + return emptyKeyError() + } + C.DBBatchDelete(r.batch, goToCSlice(key)) + return nil +} + +func (r *rocksDBBatch) Capacity() (StoreCapacity, error) { + return r.parent.Capacity() +} + +func (r *rocksDBBatch) SetGCTimeouts(minTxnTS, minRCacheTS int64) { + // no-op +} + +func (r *rocksDBBatch) ApproximateSize(start, end proto.EncodedKey) (uint64, error) { + return r.parent.ApproximateSize(start, end) +} + +func (r *rocksDBBatch) Flush() error { + return util.Errorf("cannot flush a batch") +} + +func (r *rocksDBBatch) NewIterator() Iterator { + return &rocksDBIterator{ + iter: C.DBBatchNewIter(r.parent.rdb, r.batch), + } +} + +func (r *rocksDBBatch) NewSnapshot() Engine { + panic("cannot create a NewSnapshot from a batch") +} + +func (r *rocksDBBatch) NewBatch() Engine { + return newRocksDBBatch(r.parent) +} + +func (r *rocksDBBatch) Commit() error { + if r.batch == nil { + panic("this batch was already committed") + } + if err := statusToError(C.DBWrite(r.parent.rdb, r.batch)); err != nil { + return err + } + C.DBBatchDestroy(r.batch) + r.batch = nil + return nil +} + type rocksDBIterator struct { iter *C.DBIterator } diff --git a/storage/engine/rocksdb_test.go b/storage/engine/rocksdb_test.go index 9ec6b2f24a46..5b1764b34cb6 100644 --- a/storage/engine/rocksdb_test.go +++ b/storage/engine/rocksdb_test.go @@ -187,6 +187,7 @@ func setupMVCCScanData(numVersions, numKeys int, b *testing.B) *RocksDB { if err := batch.Commit(); err != nil { b.Fatal(err) } + batch.Close() } rocksdb.CompactRange(nil, nil) @@ -411,6 +412,8 @@ func runMVCCBatchPut(valueSize, batchSize int, b *testing.B) { if err := batch.Commit(); err != nil { b.Fatal(err) } + + batch.Close() } b.StopTimer() diff --git a/storage/raftstorage.go b/storage/raftstorage.go index 4ffa98764dfe..036fbcd8189c 100644 --- a/storage/raftstorage.go +++ b/storage/raftstorage.go @@ -280,6 +280,8 @@ func (r *Range) Snapshot() (raftpb.Snapshot, error) { // Append implements the multiraft.WriteableGroupStorage interface. func (r *Range) Append(entries []raftpb.Entry) error { batch := r.rm.Engine().NewBatch() + defer batch.Close() + for _, ent := range entries { err := engine.MVCCPutProto(batch, nil, engine.RaftLogKey(r.Desc().RaftID, ent.Index), proto.ZeroTimestamp, nil, &ent) @@ -312,7 +314,8 @@ func (r *Range) ApplySnapshot(snap raftpb.Snapshot) error { return nil } - batch := engine.NewBatch(r.rm.Engine()) + batch := r.rm.Engine().NewBatch() + defer batch.Close() // Delete everything in the range and recreate it from the snapshot. for iter := newRangeDataIterator(r, r.rm.Engine()); iter.Valid(); iter.Next() { diff --git a/storage/range.go b/storage/range.go index e6c026e8e680..1ba1caa8e2b6 100644 --- a/storage/range.go +++ b/storage/range.go @@ -248,13 +248,14 @@ func (r *Range) start() { // Destroy cleans up all data associated with this range. func (r *Range) Destroy() error { - var deletes []interface{} iter := newRangeDataIterator(r, r.rm.Engine()) defer iter.Close() + batch := r.rm.Engine().NewBatch() + defer batch.Close() for ; iter.Valid(); iter.Next() { - deletes = append(deletes, engine.BatchDelete{RawKeyValue: proto.RawKeyValue{Key: iter.Key()}}) + _ = batch.Clear(iter.Key()) } - return r.rm.Engine().WriteBatch(deletes) + return batch.Commit() } // GetMaxBytes atomically gets the range maximum byte limit. 
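The Destroy rewrite above follows the same pattern as the new
ClearRange in engine.go: accumulate deletions in a batch and apply
them in one atomic write, rather than building up a []interface{} of
BatchDelete values. A condensed sketch of that pattern, assuming only
the Engine interface from this patch; the helper name is hypothetical:

	func clearAll(eng Engine, start, end proto.EncodedKey) error {
		b := eng.NewBatch()
		defer b.Close()
		if err := eng.Iterate(start, end, func(kv proto.RawKeyValue) (bool, error) {
			// A Clear error ends the iteration and is propagated.
			return false, b.Clear(kv.Key)
		}); err != nil {
			return err
		}
		return b.Commit() // all deletions apply atomically, or none do
	}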
@@ -750,6 +751,8 @@ func (r *Range) applyRaftCommand(index uint64, originNodeID multiraft.NodeID, ar // Create a new batch for the command to ensure all or nothing semantics. batch := r.rm.Engine().NewBatch() + defer batch.Close() + // Create an proto.MVCCStats instance. ms := proto.MVCCStats{}
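One lifecycle detail of the batch-per-command pattern used here and in
Append and ApplySnapshot above: a successful Commit destroys the
underlying C++ DBBatch and nils the Go handle, so the deferred Close
becomes a no-op rather than a double free, while an early error return
still releases the batch through Close. A short sketch of the idiom,
with a hypothetical command function:

	func applyCommand(eng Engine) error {
		batch := eng.NewBatch()
		defer batch.Close() // no-op once Commit has destroyed the batch

		if err := batch.Put(proto.EncodedKey("k"), []byte("v")); err != nil {
			return err // the deferred Close frees the uncommitted batch
		}
		// All or nothing: every update in the batch is applied by
		// Commit, or none are.
		return batch.Commit()
	}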