Skip to content

Commit

Permalink
storage/engine: migrate sstIterator to Pebble
Browse files Browse the repository at this point in the history
This allows us to drop the golang/leveldb dependency.

It also includes a version upgrade for Pebble to b852555.

Fixes #40998.

Release note: None
  • Loading branch information
ajkr committed Oct 8, 2019
1 parent 53ae9e5 commit 2beab58
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 187 deletions.
18 changes: 2 additions & 16 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

218 changes: 60 additions & 158 deletions pkg/storage/engine/sst_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,26 @@ package engine

import (
"bytes"
"encoding/binary"
"fmt"
"os"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/golang/leveldb/db"
"github.com/golang/leveldb/table"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
"github.com/pkg/errors"
)

var readerOpts = &db.Options{
Comparer: cockroachComparer{},
var readerOpts = &sstable.Options{
Comparer: MVCCComparer,
}

type sstIterator struct {
sst *table.Reader
iter db.Iterator
sst *sstable.Reader
iter sstable.Iterator

valid bool
err error
mvccKey MVCCKey
mvccKey MVCCKey
value []byte
iterValid bool
err error

// For allocation avoidance in NextKey.
nextKeyStart []byte
Expand All @@ -43,81 +40,32 @@ type sstIterator struct {
verify bool
}

var _ SimpleIterator = &sstIterator{}

// NewSSTIterator returns a SimpleIterator for a leveldb formatted sstable on
// disk. It's compatible with sstables output by RocksDBSstFileWriter,
// which means the keys are CockroachDB mvcc keys and they each have the RocksDB
// trailer (of seqno & value type).
// NewSSTIterator returns a `SimpleIterator` for an in-memory sstable.
// It's compatible with sstables written by `RocksDBSstFileWriter` and
// Pebble's `sstable.Writer`, and assumes the keys use Cockroach's MVCC
// format.
func NewSSTIterator(path string) (SimpleIterator, error) {
file, err := db.DefaultFileSystem.Open(path)
file, err := vfs.Default.Open(path)
if err != nil {
return nil, err
}
return &sstIterator{sst: table.NewReader(file, readerOpts)}, nil
}

type memFileInfo int64

var _ os.FileInfo = memFileInfo(0)

func (i memFileInfo) Size() int64 {
return int64(i)
}

func (memFileInfo) IsDir() bool {
return false
}

func (memFileInfo) Name() string {
panic("Name unsupported")
}

func (memFileInfo) Mode() os.FileMode {
panic("Mode unsupported")
}

func (memFileInfo) ModTime() time.Time {
panic("ModTime unsupported")
}

func (memFileInfo) Sys() interface{} {
panic("Sys unsupported")
}

type memFile struct {
*bytes.Reader
size memFileInfo
}

var _ db.File = &memFile{}

func newMemFile(content []byte) *memFile {
return &memFile{Reader: bytes.NewReader(content), size: memFileInfo(len(content))}
}

func (*memFile) Close() error {
return nil
}

func (*memFile) Write(_ []byte) (int, error) {
panic("write unsupported")
}

func (f *memFile) Stat() (os.FileInfo, error) {
return f.size, nil
}

func (*memFile) Sync() error {
return nil
sst, err := sstable.NewReader(file, readerOpts)
if err != nil {
return nil, err
}
return &sstIterator{sst: sst}, nil
}

// NewMemSSTIterator returns a SimpleIterator for a leveldb format sstable in
// memory. It's compatible with sstables output by RocksDBSstFileWriter,
// which means the keys are CockroachDB mvcc keys and they each have the RocksDB
// trailer (of seqno & value type).
// NewMemSSTIterator returns a `SimpleIterator` for an in-memory sstable.
// It's compatible with sstables written by `RocksDBSstFileWriter` and
// Pebble's `sstable.Writer`, and assumes the keys use Cockroach's MVCC
// format.
func NewMemSSTIterator(data []byte, verify bool) (SimpleIterator, error) {
return &sstIterator{sst: table.NewReader(newMemFile(data), readerOpts), verify: verify}, nil
sst, err := sstable.NewReader(vfs.NewMemFile(data), readerOpts)
if err != nil {
return nil, err
}
return &sstIterator{sst: sst, verify: verify}, nil
}

// Close implements the SimpleIterator interface.
Expand All @@ -130,75 +78,59 @@ func (r *sstIterator) Close() {
}
}

// encodeInternalSeekKey encodes an engine.MVCCKey into the RocksDB
// representation and adds padding to the end such that it compares correctly
// with rocksdb "internal" keys which have an 8b suffix, which appear in SSTs
// created by rocks when read directly with a reader like LevelDB's Reader.
func encodeInternalSeekKey(key MVCCKey) []byte {
return append(EncodeKey(key), []byte{0, 0, 0, 0, 0, 0, 0, 0}...)
}

// Seek implements the SimpleIterator interface.
func (r *sstIterator) Seek(key MVCCKey) {
if r.iter != nil {
if r.err = errors.Wrap(r.iter.Close(), "resetting sstable iterator"); r.err != nil {
return
}
if r.err != nil {
return
}
if r.iter == nil {
// Iterator creation happens on the first Seek as it involves I/O.
r.iter = r.sst.NewIter(nil /* lower */, nil /* upper */)
}
var iKey *sstable.InternalKey
iKey, r.value = r.iter.SeekGE(EncodeKey(key))
if iKey != nil {
r.iterValid = true
r.mvccKey, r.err = DecodeMVCCKey(iKey.UserKey)
} else {
r.iterValid = false
r.err = r.iter.Error()
}
if r.iterValid && r.err == nil && r.verify {
r.err = roachpb.Value{RawBytes: r.value}.Verify(r.mvccKey.Key)
}
r.iter = r.sst.Find(encodeInternalSeekKey(key), nil)
r.Next()
}

// Valid implements the SimpleIterator interface.
func (r *sstIterator) Valid() (bool, error) {
return r.valid && r.err == nil, r.err
return r.iterValid && r.err == nil, r.err
}

// Next implements the SimpleIterator interface.
func (r *sstIterator) Next() {
if r.valid = r.iter.Next(); !r.valid {
r.err = errors.Wrap(r.iter.Close(), "closing sstable iterator")
r.iter = nil
return
}

// RocksDB uses the last 8 bytes to pack the sequence number and value type
// into a little-endian encoded uint64. The value type is stored in the
// low byte and the sequence number is in the high 7 bytes. See dbformat.h.
rocksdbInternalKey := r.iter.Key()
if len(rocksdbInternalKey) < 8 {
r.err = errors.Errorf("invalid rocksdb InternalKey: %x", rocksdbInternalKey)
return
}
seqAndValueType := binary.LittleEndian.Uint64(rocksdbInternalKey[len(rocksdbInternalKey)-8:])
if valueType := BatchType(seqAndValueType & 0xff); valueType != BatchTypeValue {
r.err = errors.Errorf("value type not supported: %d", valueType)
if !r.iterValid || r.err != nil {
return
}

key := rocksdbInternalKey[:len(rocksdbInternalKey)-8]

if k, ts, err := enginepb.DecodeKey(key); err == nil {
r.mvccKey.Key = k
r.mvccKey.Timestamp = ts
r.err = nil
var iKey *sstable.InternalKey
iKey, r.value = r.iter.Next()
if iKey != nil {
r.mvccKey, r.err = DecodeMVCCKey(iKey.UserKey)
} else {
r.err = errors.Wrapf(err, "decoding key: %s", key)
return
r.iterValid = false
r.err = r.iter.Error()
}

if r.verify {
r.err = roachpb.Value{RawBytes: r.iter.Value()}.Verify(r.mvccKey.Key)
if r.iterValid && r.err == nil && r.verify {
r.err = roachpb.Value{RawBytes: r.value}.Verify(r.mvccKey.Key)
}
}

// NextKey implements the SimpleIterator interface.
func (r *sstIterator) NextKey() {
if !r.valid {
if !r.iterValid || r.err != nil {
return
}
r.nextKeyStart = append(r.nextKeyStart[:0], r.mvccKey.Key...)
for r.Next(); r.valid && r.err == nil && bytes.Equal(r.nextKeyStart, r.mvccKey.Key); r.Next() {
for r.Next(); r.iterValid && r.err == nil && bytes.Equal(r.nextKeyStart, r.mvccKey.Key); r.Next() {
}
}

Expand All @@ -209,28 +141,7 @@ func (r *sstIterator) UnsafeKey() MVCCKey {

// UnsafeValue implements the SimpleIterator interface.
func (r *sstIterator) UnsafeValue() []byte {
return r.iter.Value()
}

type cockroachComparer struct{}

var _ db.Comparer = cockroachComparer{}

// Compare implements the db.Comparer interface. This is used to compare raw SST
// keys in the sstIterator, and assumes that all keys compared are MVCC encoded
// and then wrapped by rocksdb into Internal keys.
func (cockroachComparer) Compare(a, b []byte) int {
// We assume every key in these SSTs is a rocksdb "internal" key with an 8b
// suffix and need to remove those to compare user keys below.
if len(a) < 8 || len(b) < 8 {
// Special case: either key is empty, so bytes.Compare should work.
if len(a) == 0 || len(b) == 0 {
return bytes.Compare(a, b)
}
panic(fmt.Sprintf("invalid keys: compare expects internal keys with 8b suffix: a: %v b: %v", a, b))
}

return MVCCKeyCompare(a[:len(a)-8], b[:len(b)-8])
return r.value
}

// MVCCKeyCompare compares cockroach keys, including the MVCC timestamps.
Expand Down Expand Up @@ -264,12 +175,3 @@ func MVCCKeyCompare(a, b []byte) int {
// in case the internal suffix differentiates them.
return bytes.Compare(a, b)
}

func (cockroachComparer) Name() string {
// This must match the name in engine/db.cc:DBComparator::Name.
return "cockroach_comparator"
}

func (cockroachComparer) AppendSeparator(dst, a, b []byte) []byte {
panic("unimplemented")
}
23 changes: 11 additions & 12 deletions pkg/storage/engine/sst_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,30 +126,29 @@ func TestSSTIterator(t *testing.T) {
func TestCockroachComparer(t *testing.T) {
defer leaktest.AfterTest(t)()

keyAMetadata := encodeInternalSeekKey(MVCCKey{
keyAMetadata := MVCCKey{
Key: []byte("a"),
})
keyA2 := encodeInternalSeekKey(MVCCKey{
}
keyA2 := MVCCKey{
Key: []byte("a"),
Timestamp: hlc.Timestamp{WallTime: 2},
})
keyA1 := encodeInternalSeekKey(MVCCKey{
}
keyA1 := MVCCKey{
Key: []byte("a"),
Timestamp: hlc.Timestamp{WallTime: 1},
})
keyB2 := encodeInternalSeekKey(MVCCKey{
}
keyB2 := MVCCKey{
Key: []byte("b"),
Timestamp: hlc.Timestamp{WallTime: 2},
})
}

c := cockroachComparer{}
if x := c.Compare(keyAMetadata, keyA1); x != -1 {
if x := MVCCComparer.Compare(EncodeKey(keyAMetadata), EncodeKey(keyA1)); x != -1 {
t.Errorf("expected key metadata to sort first got: %d", x)
}
if x := c.Compare(keyA2, keyA1); x != -1 {
if x := MVCCComparer.Compare(EncodeKey(keyA2), EncodeKey(keyA1)); x != -1 {
t.Errorf("expected higher timestamp to sort first got: %d", x)
}
if x := c.Compare(keyA2, keyB2); x != -1 {
if x := MVCCComparer.Compare(EncodeKey(keyA2), EncodeKey(keyB2)); x != -1 {
t.Errorf("expected lower key to sort first got: %d", x)
}
}

0 comments on commit 2beab58

Please sign in to comment.