Skip to content

Commit

Permalink
db: add format major version for range keys
Browse files Browse the repository at this point in the history
Introduce a new format major version for range keys, with associated
table format `Pebble,v2`. When the DB is opened at this version, it is
free to make use of range key features.

Add various validations and assertions based on the new format major
version. For example, if the DB version does not support range keys,
ingesting tables with range keys will fail, etc.

Related to cockroachdb#1339.
  • Loading branch information
nicktrav committed Feb 8, 2022
1 parent db46dab commit 55452af
Show file tree
Hide file tree
Showing 15 changed files with 202 additions and 32 deletions.
23 changes: 17 additions & 6 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,11 +721,14 @@ func (d *DB) Apply(batch *Batch, opts *WriteOptions) error {
if d.opts.Experimental.RangeKeys == nil {
panic("pebble: range keys require the Experimental.RangeKeys option")
}
// TODO(jackson): Assert that all range key operands are suffixless.
if d.FormatMajorVersion() < FormatRangeKeys {
panic(fmt.Sprintf(
"pebble: range keys require at least format major version %d (current: %d)",
FormatRangeKeys, d.FormatMajorVersion(),
))
}

// TODO(jackson): Once the format major version for range keys is
// introduced, error if the batch includes range keys but the active
// format major version doesn't enable them.
// TODO(jackson): Assert that all range key operands are suffixless.
}

if batch.db == nil {
Expand Down Expand Up @@ -869,8 +872,16 @@ func (d *DB) newIterInternal(batch *Batch, s *Snapshot, o *IterOptions) *Iterato
if err := d.closed.Load(); err != nil {
panic(err)
}
if o.rangeKeys() && d.opts.Experimental.RangeKeys == nil {
panic("pebble: range keys require the Experimental.RangeKeys option")
if o.rangeKeys() {
if d.opts.Experimental.RangeKeys == nil {
panic("pebble: range keys require the Experimental.RangeKeys option")
}
if d.FormatMajorVersion() < FormatRangeKeys {
panic(fmt.Sprintf(
"pebble: range keys require at least format major version %d (current: %d)",
FormatRangeKeys, d.FormatMajorVersion(),
))
}
}
if o != nil && o.RangeKeyMasking.Suffix != nil && o.KeyTypes != IterKeyTypePointsAndRanges {
panic("pebble: range key masking requires IterKeyTypePointsAndRanges")
Expand Down
9 changes: 8 additions & 1 deletion format_major_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,13 @@ const (
// FormatBlockPropertyCollector is a format major version that introduces
// BlockPropertyCollectors.
FormatBlockPropertyCollector
// FormatRangeKeys is a format major version that introduces range keys.
FormatRangeKeys
// FormatNewest always contains the most recent format major version.
// NB: When adding new versions, the MaxTableFormat method should also be
// updated to return the maximum allowable version for the new
// FormatMajorVersion.
FormatNewest FormatMajorVersion = FormatBlockPropertyCollector
FormatNewest FormatMajorVersion = FormatRangeKeys
)

// MaxTableFormat returns the maximum sstable.TableFormat that can be used at
Expand All @@ -86,6 +88,8 @@ func (v FormatMajorVersion) MaxTableFormat() sstable.TableFormat {
return sstable.TableFormatRocksDBv2
case FormatBlockPropertyCollector:
return sstable.TableFormatPebblev1
case FormatRangeKeys:
return sstable.TableFormatPebblev2
default:
panic(fmt.Sprintf("pebble: unsupported format major version: %s", v))
}
Expand Down Expand Up @@ -167,6 +171,9 @@ var formatMajorVersionMigrations = map[FormatMajorVersion]func(*DB) error{
FormatBlockPropertyCollector: func(d *DB) error {
return d.finalizeFormatVersUpgrade(FormatBlockPropertyCollector)
},
FormatRangeKeys: func(d *DB) error {
return d.finalizeFormatVersUpgrade(FormatRangeKeys)
},
}

const formatVersionMarkerName = `format-version`
Expand Down
3 changes: 3 additions & 0 deletions format_major_version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ func TestRatchetFormat(t *testing.T) {
require.Equal(t, FormatSetWithDelete, d.FormatMajorVersion())
require.NoError(t, d.RatchetFormatMajorVersion(FormatBlockPropertyCollector))
require.Equal(t, FormatBlockPropertyCollector, d.FormatMajorVersion())
require.NoError(t, d.RatchetFormatMajorVersion(FormatRangeKeys))
require.Equal(t, FormatRangeKeys, d.FormatMajorVersion())
require.NoError(t, d.Close())

// If we Open the database again, leaving the default format, the
Expand Down Expand Up @@ -189,6 +191,7 @@ func TestFormatMajorVersions_TableFormat(t *testing.T) {
FormatVersioned: sstable.TableFormatRocksDBv2,
FormatSetWithDelete: sstable.TableFormatRocksDBv2,
FormatBlockPropertyCollector: sstable.TableFormatPebblev1,
FormatRangeKeys: sstable.TableFormatPebblev2,
}

// Valid versions.
Expand Down
28 changes: 24 additions & 4 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ func ingestValidateKey(opts *Options, key *InternalKey) error {
}

func ingestLoad1(
opts *Options, path string, cacheID uint64, fileNum FileNum,
opts *Options,
fmv FormatMajorVersion,
path string,
cacheID uint64,
fileNum FileNum,
) (*fileMetadata, error) {
stat, err := opts.FS.Stat(path)
if err != nil {
Expand All @@ -64,6 +68,18 @@ func ingestLoad1(
}
defer r.Close()

// Avoid ingesting tables with format versions this DB doesn't support.
tf, err := r.TableFormat()
if err != nil {
return nil, err
}
if tf > fmv.MaxTableFormat() {
return nil, errors.Newf(
"pebble: table with format %s unsupported at DB format major version %d, %s",
tf, fmv, fmv.MaxTableFormat(),
)
}

meta := &fileMetadata{}
meta.FileNum = fileNum
meta.Size = uint64(stat.Size())
Expand Down Expand Up @@ -154,12 +170,16 @@ func ingestLoad1(
}

func ingestLoad(
opts *Options, paths []string, cacheID uint64, pending []FileNum,
opts *Options,
fmv FormatMajorVersion,
paths []string,
cacheID uint64,
pending []FileNum,
) ([]*fileMetadata, []string, error) {
meta := make([]*fileMetadata, 0, len(paths))
newPaths := make([]string, 0, len(paths))
for i := range paths {
m, err := ingestLoad1(opts, paths[i], cacheID, pending[i])
m, err := ingestLoad1(opts, fmv, paths[i], cacheID, pending[i])
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -525,7 +545,7 @@ func (d *DB) Ingest(paths []string) error {

// Load the metadata for all of the files being ingested. This step detects
// and elides empty sstables.
meta, paths, err := ingestLoad(d.opts, paths, d.cacheID, pendingOutputs)
meta, paths, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, d.cacheID, pendingOutputs)
if err != nil {
return err
}
Expand Down
25 changes: 21 additions & 4 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,28 @@ func TestIngestLoad(t *testing.T) {
datadriven.RunTest(t, "testdata/ingest_load", func(td *datadriven.TestData) string {
switch td.Cmd {
case "load":
writerOpts := sstable.WriterOptions{}
var dbVersion FormatMajorVersion
for _, cmdArgs := range td.CmdArgs {
v, err := strconv.Atoi(cmdArgs.Vals[0])
if err != nil {
return err.Error()
}
switch k := cmdArgs.Key; k {
case "writer-version":
fmv := FormatMajorVersion(v)
writerOpts.TableFormat = fmv.MaxTableFormat()
case "db-version":
dbVersion = FormatMajorVersion(v)
default:
return fmt.Sprintf("unknown cmd %s\n", k)
}
}
f, err := mem.Create("ext")
if err != nil {
return err.Error()
}
w := sstable.NewWriter(f, sstable.WriterOptions{})
w := sstable.NewWriter(f, writerOpts)
for _, data := range strings.Split(td.Input, "\n") {
j := strings.Index(data, ":")
if j < 0 {
Expand All @@ -57,7 +74,7 @@ func TestIngestLoad(t *testing.T) {
Comparer: DefaultComparer,
FS: mem,
}
meta, _, err := ingestLoad(opts, []string{"ext"}, 0, []FileNum{1})
meta, _, err := ingestLoad(opts, dbVersion, []string{"ext"}, 0, []FileNum{1})
if err != nil {
return err.Error()
}
Expand Down Expand Up @@ -141,7 +158,7 @@ func TestIngestLoadRand(t *testing.T) {
Comparer: DefaultComparer,
FS: mem,
}
meta, _, err := ingestLoad(opts, paths, 0, pending)
meta, _, err := ingestLoad(opts, FormatNewest, paths, 0, pending)
require.NoError(t, err)

for _, m := range meta {
Expand All @@ -162,7 +179,7 @@ func TestIngestLoadInvalid(t *testing.T) {
Comparer: DefaultComparer,
FS: mem,
}
if _, _, err := ingestLoad(opts, []string{"invalid"}, 0, []FileNum{1}); err == nil {
if _, _, err := ingestLoad(opts, FormatNewest, []string{"invalid"}, 0, []FileNum{1}); err == nil {
t.Fatalf("expected error, but found success")
}
}
Expand Down
2 changes: 1 addition & 1 deletion open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestNewDBFilenames(t *testing.T) {
"LOCK",
"MANIFEST-000001",
"OPTIONS-000003",
"marker.format-version.000004.005",
"marker.format-version.000005.006",
"marker.manifest.000001.MANIFEST-000001",
},
}
Expand Down
47 changes: 43 additions & 4 deletions range_keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ package pebble
import (
"bytes"
"fmt"
"strconv"
"testing"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/datadriven"
"github.com/cockroachdb/pebble/internal/testkeys"
"github.com/cockroachdb/pebble/vfs"
Expand Down Expand Up @@ -35,10 +37,24 @@ func TestRangeKeys(t *testing.T) {
require.NoError(t, d.Close())
}
opts := &Options{
FS: vfs.NewMem(),
Comparer: testkeys.Comparer,
FS: vfs.NewMem(),
Comparer: testkeys.Comparer,
FormatMajorVersion: FormatRangeKeys,
}
opts.Experimental.RangeKeys = new(RangeKeysArena)

for _, cmdArgs := range td.CmdArgs {
if cmdArgs.Key != "format-major-version" {
return fmt.Sprintf("unknown command %s\n", cmdArgs.Key)
}
v, err := strconv.Atoi(cmdArgs.Vals[0])
if err != nil {
return err.Error()
}
// Override the DB version.
opts.FormatMajorVersion = FormatMajorVersion(v)
}

var err error
d, err = Open("", opts)
require.NoError(t, err)
Expand All @@ -52,8 +68,19 @@ func TestRangeKeys(t *testing.T) {
case "batch":
b := d.NewBatch()
require.NoError(t, runBatchDefineCmd(td, b))
var err error
func() {
defer func() {
if r := recover(); r != nil {
err = errors.New(r.(string))
}
}()
err = b.Commit(nil)
}()
if err != nil {
return err.Error()
}
count := b.Count()
require.NoError(t, b.Commit(nil))
return fmt.Sprintf("wrote %d keys\n", count)
case "indexed-batch":
b = d.NewIndexedBatch()
Expand All @@ -76,7 +103,19 @@ func TestRangeKeys(t *testing.T) {
}
o.RangeKeyMasking.Suffix = []byte(arg.Vals[0])
}
iter := newIter(o)
var iter *Iterator
var err error
func() {
defer func() {
if r := recover(); r != nil {
err = errors.New(r.(string))
}
}()
iter = newIter(o)
}()
if err != nil {
return err.Error()
}
return runIterCmd(td, iter, true /* close iter */)
case "rangekey-iter":
iter := newIter(&IterOptions{KeyTypes: IterKeyTypeRangesOnly})
Expand Down
10 changes: 10 additions & 0 deletions sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2066,6 +2066,7 @@ type Reader struct {
mergerOK bool
checksumType ChecksumType
tableFilter *tableFilterReader
tableFormat TableFormat
Properties Properties
}

Expand Down Expand Up @@ -2690,6 +2691,14 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
return endBH.Offset + endBH.Length + blockTrailerLen - startBH.Offset, nil
}

// TableFormat returns the format version for the table.
func (r *Reader) TableFormat() (TableFormat, error) {
if r.err != nil {
return TableFormatUnspecified, r.err
}
return r.tableFormat, nil
}

// ReadableFile describes subset of vfs.File required for reading SSTs.
type ReadableFile interface {
io.ReaderAt
Expand Down Expand Up @@ -2735,6 +2744,7 @@ func NewReader(f ReadableFile, o ReaderOptions, extraOpts ...ReaderOption) (*Rea
return nil, r.Close()
}
r.checksumType = footer.checksum
r.tableFormat = footer.format
// Read the metaindex.
if err := r.readMetaindex(footer.metaindexBH); err != nil {
r.err = err
Expand Down
29 changes: 29 additions & 0 deletions sstable/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,35 @@ func TestValidateBlockChecksums(t *testing.T) {
}
}

func TestReader_TableFormat(t *testing.T) {
test := func(t *testing.T, want TableFormat) {
fs := vfs.NewMem()
f, err := fs.Create("test")
require.NoError(t, err)

opts := WriterOptions{TableFormat: want}
w := NewWriter(f, opts)
err = w.Close()
require.NoError(t, err)

f, err = fs.Open("test")
require.NoError(t, err)
r, err := NewReader(f, ReaderOptions{})
require.NoError(t, err)
defer r.Close()

got, err := r.TableFormat()
require.NoError(t, err)
require.Equal(t, want, got)
}

for tf := TableFormatLevelDB; tf <= TableFormatMax; tf++ {
t.Run(tf.String(), func(t *testing.T) {
test(t, tf)
})
}
}

func buildTestTable(
t *testing.T, numEntries uint64, blockSize, indexBlockSize int, compression Compression,
) *Reader {
Expand Down
Loading

0 comments on commit 55452af

Please sign in to comment.