From e7d0a7b722ef7351aad3dd5b5ba70f147497e309 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Mon, 30 Sep 2019 17:00:22 +0530 Subject: [PATCH] Support compression in Badger (#1013) This commit adds support for compression in badger. Two compression algorithms - Snappy and ZSTD are supported for now. The compression algorithm information is stored in the manifest file. We compress blocks (typically of size 4KB) stored in the SST. The compression algorithm can be specified via the CompressionType option to badger. --- appveyor.yml | 3 +- db.go | 19 ++---- db2_test.go | 15 ++--- db_test.go | 29 ++++++++-- go.mod | 2 + go.sum | 3 + iterator_test.go | 2 +- levels.go | 30 ++++------ manifest.go | 29 ++++++---- manifest_test.go | 21 ++++--- options.go | 22 +++++++ options/options.go | 12 ++++ pb/pb.pb.go | 120 ++++++++++++++++++++++++-------------- pb/pb.proto | 23 ++++---- stream_writer.go | 29 ++++------ stream_writer_test.go | 1 - table/builder.go | 29 ++++++++++ table/builder_test.go | 29 ++++++++-- table/table.go | 50 ++++++++++++++-- table/table_test.go | 132 +++++++++++++++++++++++++----------------- 20 files changed, 394 insertions(+), 206 deletions(-) diff --git a/appveyor.yml b/appveyor.yml index 36853e9d1..ac3a9505c 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -17,10 +17,11 @@ environment: # scripts that run after cloning repository install: - - set PATH=%GOPATH%\bin;c:\go\bin;%PATH% + - set PATH=%GOPATH%\bin;c:\go\bin;c:\msys64\mingw64\bin;%PATH% - go version - go env - python --version + - gcc --version # To run your custom scripts instead of automatic MSBuild build_script: diff --git a/db.go b/db.go index 493edf0a6..39b12dd98 100644 --- a/db.go +++ b/db.go @@ -891,7 +891,7 @@ type flushTask struct { // handleFlushTask must be run serially. func (db *DB) handleFlushTask(ft flushTask) error { - // There can be a scnerio, when empty memtable is flushed. For example, memtable is empty and + // There can be a scenario, when empty memtable is flushed. For example, memtable is empty and // after writing request to value log, rotation count exceeds db.LogRotatesToFlush. if ft.mt.Empty() { return nil @@ -911,16 +911,13 @@ func (db *DB) handleFlushTask(ft flushTask) error { if err != nil { return y.Wrapf(err, "failed to get datakey in db.handleFlushTask") } - bopts := table.Options{ - BlockSize: db.opt.BlockSize, - BloomFalsePositive: db.opt.BloomFalsePositive, - DataKey: dk, - } + bopts := buildTableOptions(db.opt) + bopts.DataKey = dk tableData := buildL0Table(ft, bopts) fileID := db.lc.reserveFileID() if db.opt.KeepL0InMemory { - tbl, err := table.OpenInMemoryTable(tableData, fileID, dk) + tbl, err := table.OpenInMemoryTable(tableData, fileID, &bopts) if err != nil { return errors.Wrapf(err, "failed to open table in memory") } @@ -945,13 +942,7 @@ func (db *DB) handleFlushTask(ft flushTask) error { // Do dir sync as best effort. No need to return due to an error there. db.elog.Errorf("ERROR while syncing level directory: %v", dirSyncErr) } - - opts := table.Options{ - LoadingMode: db.opt.TableLoadingMode, - ChkMode: db.opt.ChecksumVerificationMode, - DataKey: dk, - } - tbl, err := table.OpenTable(fd, opts) + tbl, err := table.OpenTable(fd, bopts) if err != nil { db.elog.Printf("ERROR while opening table: %v", err) return err diff --git a/db2_test.go b/db2_test.go index 5b19aba03..b9ad5abbd 100644 --- a/db2_test.go +++ b/db2_test.go @@ -505,9 +505,10 @@ func TestCompactionFilePicking(t *testing.T) { // addToManifest function is used in TestCompactionFilePicking. It adds table to db manifest. func addToManifest(t *testing.T, db *DB, tab *table.Table, level uint32) { change := &pb.ManifestChange{ - Id: tab.ID(), - Op: pb.ManifestChange_CREATE, - Level: level, + Id: tab.ID(), + Op: pb.ManifestChange_CREATE, + Level: level, + Compression: uint32(tab.CompressionType()), } require.NoError(t, db.manifest.addChanges([]*pb.ManifestChange{change}), "unable to add to manifest") @@ -516,10 +517,7 @@ func addToManifest(t *testing.T, db *DB, tab *table.Table, level uint32) { // createTableWithRange function is used in TestCompactionFilePicking. It creates // a table with key starting from start and ending with end. func createTableWithRange(t *testing.T, db *DB, start, end int) *table.Table { - bopts := table.Options{ - BlockSize: db.opt.BlockSize, - BloomFalsePositive: db.opt.BloomFalsePositive, - } + bopts := buildTableOptions(db.opt) b := table.NewTableBuilder(bopts) nums := []int{start, end} for _, i := range nums { @@ -537,8 +535,7 @@ func createTableWithRange(t *testing.T, db *DB, start, end int) *table.Table { _, err = fd.Write(b.Finish()) require.NoError(t, err, "unable to write to file") - opts := table.Options{LoadingMode: options.LoadToRAM, ChkMode: options.NoVerification} - tab, err := table.OpenTable(fd, opts) + tab, err := table.OpenTable(fd, bopts) require.NoError(t, err) return tab } diff --git a/db_test.go b/db_test.go index c51dec4dd..c4d534499 100644 --- a/db_test.go +++ b/db_test.go @@ -588,7 +588,7 @@ func TestIterate2Basic(t *testing.T) { }) } -func TestLoadAndEncryption(t *testing.T) { +func TestLoad(t *testing.T) { testLoad := func(t *testing.T, opt Options) { dir, err := ioutil.TempDir("", "badger-test") require.NoError(t, err) @@ -642,14 +642,33 @@ func TestLoadAndEncryption(t *testing.T) { sort.Slice(fileIDs, func(i, j int) bool { return fileIDs[i] < fileIDs[j] }) fmt.Printf("FileIDs: %v\n", fileIDs) } - t.Run("TestLoad Without Encryption", func(t *testing.T) { - testLoad(t, getTestOptions("")) + t.Run("TestLoad Without Encryption/Compression", func(t *testing.T) { + opt := getTestOptions("") + opt.Compression = options.None + testLoad(t, opt) }) - t.Run("TestLoad With Encryption", func(t *testing.T) { + t.Run("TestLoad With Encryption and no compression", func(t *testing.T) { key := make([]byte, 32) _, err := rand.Read(key) require.NoError(t, err) - testLoad(t, getTestOptions("").WithEncryptionKey(key)) + opt := getTestOptions("") + opt.EncryptionKey = key + opt.Compression = options.None + testLoad(t, opt) + }) + t.Run("TestLoad With Encryption and compression", func(t *testing.T) { + key := make([]byte, 32) + _, err := rand.Read(key) + require.NoError(t, err) + opt := getTestOptions("") + opt.EncryptionKey = key + opt.Compression = options.ZSTD + testLoad(t, opt) + }) + t.Run("TestLoad without Encryption and with compression", func(t *testing.T) { + opt := getTestOptions("") + opt.Compression = options.ZSTD + testLoad(t, opt) }) } diff --git a/go.mod b/go.mod index ed247de91..c52fb3ace 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,14 @@ module github.com/dgraph-io/badger go 1.12 require ( + github.com/DataDog/zstd v1.4.1 github.com/cespare/xxhash v1.1.0 github.com/cespare/xxhash/v2 v2.1.0 // indirect github.com/dgraph-io/ristretto v0.0.0-20190916120426-cd2835491e0e github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 github.com/dustin/go-humanize v1.0.0 github.com/golang/protobuf v1.3.1 + github.com/golang/snappy v0.0.1 github.com/pkg/errors v0.8.1 github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/cobra v0.0.5 diff --git a/go.sum b/go.sum index 8f75b6eca..4a485aafa 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM= +github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/OneOfOne/xxhash v1.2.5/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= github.com/VictoriaMetrics/fastcache v1.5.1/go.mod h1:+jv9Ckb+za/P1ZRg/sulP5Ni1v49daAVERr0H3CuscE= @@ -31,6 +33,7 @@ github.com/goburrow/cache v0.1.0/go.mod h1:8oxkfud4hvjO4tNjEKZfEd+LrpDVDlBIauGYs github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= diff --git a/iterator_test.go b/iterator_test.go index 9c4461ecc..548b6242c 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -79,9 +79,9 @@ func TestPickSortTables(t *testing.T) { genTables := func(mks ...MockKeys) []*table.Table { out := make([]*table.Table, 0) for _, mk := range mks { - f := buildTable(t, [][]string{{mk.small, "some value"}, {mk.large, "some value"}}) opts := table.Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + f := buildTable(t, [][]string{{mk.small, "some value"}, {mk.large, "some value"}}, opts) tbl, err := table.OpenTable(f, opts) require.NoError(t, err) out = append(out, tbl) diff --git a/levels.go b/levels.go index ed2144f34..e5083f562 100644 --- a/levels.go +++ b/levels.go @@ -154,12 +154,11 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) { rerr = errors.Wrapf(err, "Error while reading datakey") return } - opts := table.Options{ - LoadingMode: db.opt.TableLoadingMode, - ChkMode: db.opt.ChecksumVerificationMode, - DataKey: dk, - } - t, err := table.OpenTable(fd, opts) + topt := buildTableOptions(db.opt) + // Set compression from table manifest. + topt.Compression = tf.Compression + topt.DataKey = dk + t, err := table.OpenTable(fd, topt) if err != nil { if strings.HasPrefix(err.Error(), "CHECKSUM_MISMATCH:") { db.opt.Errorf(err.Error()) @@ -508,11 +507,8 @@ func (s *levelsController) compactBuildTables( return nil, nil, y.Wrapf(err, "Error while retrieving datakey in levelsController.compactBuildTables") } - bopts := table.Options{ - BlockSize: s.kv.opt.BlockSize, - BloomFalsePositive: s.kv.opt.BloomFalsePositive, - DataKey: dk, - } + bopts := buildTableOptions(s.kv.opt) + bopts.DataKey = dk builder := table.NewTableBuilder(bopts) var numKeys, numSkips uint64 for ; it.Valid(); it.Next() { @@ -593,13 +589,7 @@ func (s *levelsController) compactBuildTables( if _, err := fd.Write(builder.Finish()); err != nil { return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID) } - - opts := table.Options{ - LoadingMode: s.kv.opt.TableLoadingMode, - ChkMode: s.kv.opt.ChecksumVerificationMode, - DataKey: builder.DataKey(), - } - tbl, err := table.OpenTable(fd, opts) + tbl, err := table.OpenTable(fd, bopts) // decrRef is added below. return tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name()) } @@ -659,7 +649,7 @@ func buildChangeSet(cd *compactDef, newTables []*table.Table) pb.ManifestChangeS changes := []*pb.ManifestChange{} for _, table := range newTables { changes = append(changes, - newCreateChange(table.ID(), cd.nextLevel.level, table.KeyID())) + newCreateChange(table.ID(), cd.nextLevel.level, table.KeyID(), table.CompressionType())) } for _, table := range cd.top { // Add a delete change only if the table is not in memory. @@ -895,7 +885,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error { // the proper order. (That means this update happens before that of some compaction which // deletes the table.) err := s.kv.manifest.addChanges([]*pb.ManifestChange{ - newCreateChange(t.ID(), 0, t.KeyID()), + newCreateChange(t.ID(), 0, t.KeyID(), t.CompressionType()), }) if err != nil { return err diff --git a/manifest.go b/manifest.go index f54508171..f7a73294e 100644 --- a/manifest.go +++ b/manifest.go @@ -27,6 +27,7 @@ import ( "path/filepath" "sync" + "github.com/dgraph-io/badger/options" "github.com/dgraph-io/badger/pb" "github.com/dgraph-io/badger/y" "github.com/golang/protobuf/proto" @@ -65,11 +66,12 @@ type levelManifest struct { Tables map[uint64]struct{} // Set of table id's } -// TableManifest contains information about a specific level +// TableManifest contains information about a specific table // in the LSM tree. type TableManifest struct { - Level uint8 - KeyID uint64 + Level uint8 + KeyID uint64 + Compression options.CompressionType } // manifestFile holds the file pointer (and other info) about the manifest file, which is a log @@ -100,7 +102,7 @@ const ( func (m *Manifest) asChanges() []*pb.ManifestChange { changes := make([]*pb.ManifestChange, 0, len(m.Tables)) for id, tm := range m.Tables { - changes = append(changes, newCreateChange(id, int(tm.Level), tm.KeyID)) + changes = append(changes, newCreateChange(id, int(tm.Level), tm.KeyID, tm.Compression)) } return changes } @@ -395,8 +397,9 @@ func applyManifestChange(build *Manifest, tc *pb.ManifestChange) error { return fmt.Errorf("MANIFEST invalid, table %d exists", tc.Id) } build.Tables[tc.Id] = TableManifest{ - Level: uint8(tc.Level), - KeyID: tc.KeyId, + Level: uint8(tc.Level), + KeyID: tc.KeyId, + Compression: options.CompressionType(tc.Compression), } for len(build.Levels) <= int(tc.Level) { build.Levels = append(build.Levels, levelManifest{make(map[uint64]struct{})}) @@ -428,14 +431,16 @@ func applyChangeSet(build *Manifest, changeSet *pb.ManifestChangeSet) error { return nil } -func newCreateChange(id uint64, level int, keyID uint64) *pb.ManifestChange { +func newCreateChange( + id uint64, level int, keyID uint64, c options.CompressionType) *pb.ManifestChange { return &pb.ManifestChange{ - Id: id, - Op: pb.ManifestChange_CREATE, - Level: uint32(level), - KeyId: keyID, + Id: id, + Op: pb.ManifestChange_CREATE, + Level: uint32(level), + KeyId: keyID, + // Hard coding it, since we're supporting only AES for now. EncryptionAlgo: pb.EncryptionAlgo_aes, - // Hardcoding it, since we're supporting only AES for now. + Compression: uint32(c), } } diff --git a/manifest_test.go b/manifest_test.go index 2c9eada08..a0a0c01ae 100644 --- a/manifest_test.go +++ b/manifest_test.go @@ -112,7 +112,7 @@ func key(prefix string, i int) string { return prefix + fmt.Sprintf("%04d", i) } -func buildTestTable(t *testing.T, prefix string, n int) *os.File { +func buildTestTable(t *testing.T, prefix string, n int, opts table.Options) *os.File { y.AssertTrue(n <= 10000) keyValues := make([][]string, n) for i := 0; i < n; i++ { @@ -120,13 +120,18 @@ func buildTestTable(t *testing.T, prefix string, n int) *os.File { v := fmt.Sprintf("%d", i) keyValues[i] = []string{k, v} } - return buildTable(t, keyValues) + return buildTable(t, keyValues, opts) } // TODO - Move these to somewhere where table package can also use it. // keyValues is n by 2 where n is number of pairs. -func buildTable(t *testing.T, keyValues [][]string) *os.File { - bopts := table.Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01} +func buildTable(t *testing.T, keyValues [][]string, bopts table.Options) *os.File { + if bopts.BloomFalsePositive == 0 { + bopts.BloomFalsePositive = 0.01 + } + if bopts.BlockSize == 0 { + bopts.BlockSize = 4 * 1024 + } b := table.NewTableBuilder(bopts) defer b.Close() // TODO: Add test for file garbage collection here. No files should be left after the tests here. @@ -166,8 +171,8 @@ func TestOverlappingKeyRangeError(t *testing.T) { lh0 := newLevelHandler(kv, 0) lh1 := newLevelHandler(kv, 1) - f := buildTestTable(t, "k", 2) opts := table.Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + f := buildTestTable(t, "k", 2, opts) t1, err := table.OpenTable(f, opts) require.NoError(t, err) defer t1.DecrRef() @@ -188,7 +193,7 @@ func TestOverlappingKeyRangeError(t *testing.T) { require.Equal(t, true, done) lc.runCompactDef(0, cd) - f = buildTestTable(t, "l", 2) + f = buildTestTable(t, "l", 2, opts) t2, err := table.OpenTable(f, opts) require.NoError(t, err) defer t2.DecrRef() @@ -220,13 +225,13 @@ func TestManifestRewrite(t *testing.T) { require.Equal(t, 0, m.Deletions) err = mf.addChanges([]*pb.ManifestChange{ - newCreateChange(0, 0, 0), + newCreateChange(0, 0, 0, 0), }) require.NoError(t, err) for i := uint64(0); i < uint64(deletionsThreshold*3); i++ { ch := []*pb.ManifestChange{ - newCreateChange(i+1, 0, 0), + newCreateChange(i+1, 0, 0, 0), newDeleteChange(i), } err := mf.addChanges(ch) diff --git a/options.go b/options.go index 0068d541f..293e09f4c 100644 --- a/options.go +++ b/options.go @@ -20,6 +20,7 @@ import ( "time" "github.com/dgraph-io/badger/options" + "github.com/dgraph-io/badger/table" ) // Note: If you add a new option X make sure you also add a WithX method on Options. @@ -46,6 +47,7 @@ type Options struct { ReadOnly bool Truncate bool Logger Logger + Compression options.CompressionType EventLogging bool // Fine tuning options. @@ -112,6 +114,7 @@ func DefaultOptions(path string) Options { NumVersionsToKeep: 1, CompactL0OnClose: true, KeepL0InMemory: true, + Compression: options.ZSTD, // Nothing to read/write value log using standard File I/O // MemoryMap to mmap() the value log files // (2^30 - 1)*2 when mmapping < 2^31 - 1, max int32. @@ -129,6 +132,16 @@ func DefaultOptions(path string) Options { } } +func buildTableOptions(opt Options) table.Options { + return table.Options{ + BlockSize: opt.BlockSize, + BloomFalsePositive: opt.BloomFalsePositive, + LoadingMode: opt.TableLoadingMode, + ChkMode: opt.ChecksumVerificationMode, + Compression: opt.Compression, + } +} + const ( maxValueThreshold = (1 << 20) // 1 MB ) @@ -461,3 +474,12 @@ func (opt Options) WithKeepL0InMemory(val bool) Options { opt.KeepL0InMemory = val return opt } + +// WithCompressionType returns a new Options value with CompressionType set to the given value. +// +// When compression type is set, every block will be compressed using the specified algorithm. +// This option doesn't affect existing tables. Only the newly created tables will be compressed. +func (opt Options) WithCompressionType(cType options.CompressionType) Options { + opt.Compression = cType + return opt +} diff --git a/options/options.go b/options/options.go index f73553ab5..564f780f1 100644 --- a/options/options.go +++ b/options/options.go @@ -43,3 +43,15 @@ const ( // on SSTable opening and on every block read. OnTableAndBlockRead ) + +// CompressionType specifies how a block should be compressed. +type CompressionType uint32 + +const ( + // None mode indicates that a block is not compressed. + None CompressionType = 0 + // Snappy mode indicates that a block is compressed using Snappy algorithm. + Snappy CompressionType = 1 + // ZSTD mode indicates that a block is compressed using ZSTD algorithm. + ZSTD CompressionType = 2 +) diff --git a/pb/pb.pb.go b/pb/pb.pb.go index 7a989a6c5..bfff39b7b 100644 --- a/pb/pb.pb.go +++ b/pb/pb.pb.go @@ -285,11 +285,12 @@ func (m *ManifestChangeSet) GetChanges() []*ManifestChange { } type ManifestChange struct { - Id uint64 `protobuf:"varint,1,opt,name=Id,json=id,proto3" json:"Id,omitempty"` - Op ManifestChange_Operation `protobuf:"varint,2,opt,name=Op,json=op,proto3,enum=pb.ManifestChange_Operation" json:"Op,omitempty"` - Level uint32 `protobuf:"varint,3,opt,name=Level,json=level,proto3" json:"Level,omitempty"` + Id uint64 `protobuf:"varint,1,opt,name=Id,proto3" json:"Id,omitempty"` + Op ManifestChange_Operation `protobuf:"varint,2,opt,name=Op,proto3,enum=pb.ManifestChange_Operation" json:"Op,omitempty"` + Level uint32 `protobuf:"varint,3,opt,name=Level,proto3" json:"Level,omitempty"` KeyId uint64 `protobuf:"varint,4,opt,name=key_id,json=keyId,proto3" json:"key_id,omitempty"` EncryptionAlgo EncryptionAlgo `protobuf:"varint,5,opt,name=encryption_algo,json=encryptionAlgo,proto3,enum=pb.EncryptionAlgo" json:"encryption_algo,omitempty"` + Compression uint32 `protobuf:"varint,6,opt,name=compression,proto3" json:"compression,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -363,6 +364,13 @@ func (m *ManifestChange) GetEncryptionAlgo() EncryptionAlgo { return EncryptionAlgo_aes } +func (m *ManifestChange) GetCompression() uint32 { + if m != nil { + return m.Compression + } + return 0 +} + type BlockOffset struct { Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` Offset uint32 `protobuf:"varint,2,opt,name=offset,proto3" json:"offset,omitempty"` @@ -624,45 +632,46 @@ func init() { func init() { proto.RegisterFile("pb.proto", fileDescriptor_f80abaa17e25ccc8) } var fileDescriptor_f80abaa17e25ccc8 = []byte{ - // 599 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0xcd, 0x6e, 0xd3, 0x40, - 0x10, 0xce, 0x3a, 0x8e, 0x93, 0x4c, 0xdb, 0xd4, 0xac, 0xa0, 0x32, 0x02, 0xa2, 0x60, 0x84, 0x14, - 0xaa, 0x2a, 0x87, 0x16, 0x71, 0xe1, 0x94, 0xa6, 0x41, 0x44, 0x69, 0x55, 0x69, 0xa9, 0xaa, 0x8a, - 0x4b, 0xb4, 0x89, 0x27, 0x8d, 0x65, 0xc7, 0x6b, 0x79, 0x37, 0x56, 0xf3, 0x26, 0xbc, 0x07, 0x2f, - 0xc1, 0x91, 0x17, 0x40, 0x42, 0xe5, 0x45, 0xd0, 0xae, 0x9d, 0xaa, 0x11, 0xdc, 0x66, 0xbe, 0x6f, - 0x76, 0x7e, 0xbe, 0x99, 0x85, 0x46, 0x3a, 0xed, 0xa5, 0x99, 0x50, 0x82, 0x5a, 0xe9, 0xd4, 0xff, - 0x4e, 0xc0, 0x1a, 0x5f, 0x53, 0x17, 0xaa, 0x11, 0xae, 0x3d, 0xd2, 0x21, 0xdd, 0x5d, 0xa6, 0x4d, - 0xfa, 0x14, 0x6a, 0x39, 0x8f, 0x57, 0xe8, 0x59, 0x06, 0x2b, 0x1c, 0xfa, 0x02, 0x9a, 0x2b, 0x89, - 0xd9, 0x64, 0x89, 0x8a, 0x7b, 0x55, 0xc3, 0x34, 0x34, 0x70, 0x81, 0x8a, 0x53, 0x0f, 0xea, 0x39, - 0x66, 0x32, 0x14, 0x89, 0x67, 0x77, 0x48, 0xd7, 0x66, 0x1b, 0x97, 0xbe, 0x02, 0xc0, 0xbb, 0x34, - 0xcc, 0x50, 0x4e, 0xb8, 0xf2, 0x6a, 0x86, 0x6c, 0x96, 0x48, 0x5f, 0x51, 0x0a, 0xb6, 0x49, 0xe8, - 0x98, 0x84, 0xc6, 0xd6, 0x95, 0xa4, 0xca, 0x90, 0x2f, 0x27, 0x61, 0xe0, 0x41, 0x87, 0x74, 0xf7, - 0x58, 0xa3, 0x00, 0x46, 0x81, 0xdf, 0x01, 0x67, 0x7c, 0x7d, 0x1e, 0x4a, 0x45, 0x0f, 0xc0, 0x8a, - 0x72, 0x8f, 0x74, 0xaa, 0xdd, 0x9d, 0x63, 0xa7, 0x97, 0x4e, 0x7b, 0xe3, 0x6b, 0x66, 0x45, 0xb9, - 0xdf, 0x87, 0x27, 0x17, 0x3c, 0x09, 0xe7, 0x28, 0xd5, 0x60, 0xc1, 0x93, 0x5b, 0xfc, 0x82, 0x8a, - 0x1e, 0x41, 0x7d, 0x66, 0x1c, 0x59, 0xbe, 0xa0, 0xfa, 0xc5, 0x76, 0x1c, 0xdb, 0x84, 0xf8, 0xbf, - 0x08, 0xb4, 0xb6, 0x39, 0xda, 0x02, 0x6b, 0x14, 0x18, 0x95, 0x6c, 0x66, 0x85, 0x01, 0x3d, 0x02, - 0xeb, 0x32, 0x35, 0x0a, 0xb5, 0x8e, 0x5f, 0xfe, 0x9b, 0xab, 0x77, 0x99, 0x62, 0xc6, 0x55, 0x28, - 0x12, 0x66, 0x89, 0x54, 0x4b, 0x7a, 0x8e, 0x39, 0xc6, 0x46, 0xb8, 0x3d, 0x56, 0x8b, 0xb5, 0x43, - 0x9f, 0x81, 0x13, 0xe1, 0x5a, 0x4f, 0x59, 0x88, 0x56, 0x8b, 0x70, 0x3d, 0x0a, 0xe8, 0x47, 0xd8, - 0xc7, 0x64, 0x96, 0xad, 0x53, 0xfd, 0x7c, 0xc2, 0xe3, 0x5b, 0x61, 0x74, 0x6b, 0x15, 0x3d, 0x0f, - 0x1f, 0xa8, 0x7e, 0x7c, 0x2b, 0x58, 0x0b, 0xb7, 0x7c, 0xff, 0x0d, 0x34, 0x1f, 0x4a, 0x53, 0x00, - 0x67, 0xc0, 0x86, 0xfd, 0xab, 0xa1, 0x5b, 0xd1, 0xf6, 0xd9, 0xf0, 0x7c, 0x78, 0x35, 0x74, 0x89, - 0x3f, 0x82, 0x9d, 0xd3, 0x58, 0xcc, 0xa2, 0xcb, 0xf9, 0x5c, 0xa2, 0xfa, 0xcf, 0x09, 0x1c, 0x80, - 0x23, 0x0c, 0x67, 0x26, 0xdc, 0x63, 0xa5, 0xa7, 0x23, 0x63, 0x4c, 0xca, 0x29, 0xb4, 0xe9, 0x7f, - 0x05, 0xb8, 0xe2, 0xd3, 0x18, 0x47, 0x49, 0x80, 0x77, 0xf4, 0x1d, 0xd4, 0x8b, 0xc8, 0x8d, 0xcc, - 0xfb, 0xba, 0xe5, 0x47, 0xb5, 0xd8, 0x86, 0xa7, 0xaf, 0x61, 0x77, 0x1a, 0x0b, 0xb1, 0x9c, 0xcc, - 0xc3, 0x58, 0x61, 0x56, 0x1e, 0xdb, 0x8e, 0xc1, 0x3e, 0x19, 0xc8, 0x17, 0xd0, 0x18, 0x2c, 0x70, - 0x16, 0xc9, 0xd5, 0x92, 0x1e, 0x82, 0x6d, 0x94, 0x20, 0x46, 0x89, 0x03, 0x9d, 0x76, 0xc3, 0xf5, - 0xf4, 0xe0, 0x59, 0xa8, 0x16, 0x4b, 0x66, 0x62, 0x74, 0x97, 0x72, 0xb5, 0x34, 0x19, 0x6d, 0xa6, - 0x4d, 0xff, 0x2d, 0x34, 0x1f, 0x82, 0x0a, 0x55, 0x06, 0x27, 0xc7, 0x03, 0xb7, 0x42, 0x77, 0xa1, - 0x71, 0x73, 0xf3, 0x99, 0xcb, 0xc5, 0x87, 0xf7, 0x2e, 0xf1, 0x67, 0x50, 0x3f, 0xe3, 0x8a, 0x8f, - 0x71, 0xfd, 0x68, 0x37, 0xe4, 0xf1, 0x6e, 0x28, 0xd8, 0x01, 0x57, 0xbc, 0xec, 0xd6, 0xd8, 0xfa, - 0x34, 0xc2, 0xbc, 0xfc, 0x12, 0x56, 0x98, 0xeb, 0x93, 0x9f, 0x65, 0xc8, 0x15, 0x06, 0xfa, 0xe4, - 0xf5, 0x6a, 0xab, 0xac, 0x59, 0x22, 0x7d, 0x75, 0xf8, 0x1c, 0x5a, 0xdb, 0x3b, 0xa4, 0x75, 0xa8, - 0x72, 0x94, 0x6e, 0xe5, 0xd4, 0xfd, 0x71, 0xdf, 0x26, 0x3f, 0xef, 0xdb, 0xe4, 0xf7, 0x7d, 0x9b, - 0x7c, 0xfb, 0xd3, 0xae, 0x4c, 0x1d, 0xf3, 0x5f, 0x4f, 0xfe, 0x06, 0x00, 0x00, 0xff, 0xff, 0xf2, - 0x7a, 0x56, 0x42, 0xbb, 0x03, 0x00, 0x00, + // 611 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0xdd, 0x6e, 0x12, 0x41, + 0x14, 0x66, 0x16, 0xba, 0xc0, 0xa1, 0xa5, 0xeb, 0x44, 0x9b, 0x35, 0x2a, 0xc1, 0x35, 0x26, 0xd8, + 0x34, 0x5c, 0xb4, 0xc6, 0x1b, 0xaf, 0x28, 0xc5, 0x48, 0x68, 0x43, 0x32, 0x36, 0x4d, 0xe3, 0x0d, + 0x19, 0x76, 0x0f, 0x65, 0xb3, 0xbf, 0xd9, 0x19, 0x36, 0xe5, 0x4d, 0x7c, 0x0f, 0x5f, 0xc2, 0x4b, + 0x1f, 0xc1, 0xd4, 0x07, 0xd1, 0xcc, 0xec, 0xd2, 0x40, 0xf4, 0xee, 0x9c, 0xef, 0x3b, 0x73, 0xe6, + 0xcc, 0xf7, 0xcd, 0x81, 0x46, 0x3a, 0xef, 0xa7, 0x59, 0x22, 0x13, 0x6a, 0xa4, 0x73, 0xe7, 0x3b, + 0x01, 0x63, 0x72, 0x43, 0x2d, 0xa8, 0x06, 0xb8, 0xb6, 0x49, 0x97, 0xf4, 0xf6, 0x99, 0x0a, 0xe9, + 0x53, 0xd8, 0xcb, 0x79, 0xb8, 0x42, 0xdb, 0xd0, 0x58, 0x91, 0xd0, 0x17, 0xd0, 0x5c, 0x09, 0xcc, + 0x66, 0x11, 0x4a, 0x6e, 0x57, 0x35, 0xd3, 0x50, 0xc0, 0x15, 0x4a, 0x4e, 0x6d, 0xa8, 0xe7, 0x98, + 0x09, 0x3f, 0x89, 0xed, 0x5a, 0x97, 0xf4, 0x6a, 0x6c, 0x93, 0xd2, 0x57, 0x00, 0x78, 0x9f, 0xfa, + 0x19, 0x8a, 0x19, 0x97, 0xf6, 0x9e, 0x26, 0x9b, 0x25, 0x32, 0x90, 0x94, 0x42, 0x4d, 0x37, 0x34, + 0x75, 0x43, 0x1d, 0xab, 0x9b, 0x84, 0xcc, 0x90, 0x47, 0x33, 0xdf, 0xb3, 0xa1, 0x4b, 0x7a, 0x07, + 0xac, 0x51, 0x00, 0x63, 0xcf, 0xe9, 0x82, 0x39, 0xb9, 0xb9, 0xf4, 0x85, 0xa4, 0x47, 0x60, 0x04, + 0xb9, 0x4d, 0xba, 0xd5, 0x5e, 0xeb, 0xd4, 0xec, 0xa7, 0xf3, 0xfe, 0xe4, 0x86, 0x19, 0x41, 0xee, + 0x0c, 0xe0, 0xc9, 0x15, 0x8f, 0xfd, 0x05, 0x0a, 0x39, 0x5c, 0xf2, 0xf8, 0x0e, 0xbf, 0xa0, 0xa4, + 0x27, 0x50, 0x77, 0x75, 0x22, 0xca, 0x13, 0x54, 0x9d, 0xd8, 0xad, 0x63, 0x9b, 0x12, 0xe7, 0x0f, + 0x81, 0xf6, 0x2e, 0x47, 0xdb, 0x60, 0x8c, 0x3d, 0xad, 0x52, 0x8d, 0x19, 0x63, 0x8f, 0x9e, 0x80, + 0x31, 0x4d, 0xb5, 0x42, 0xed, 0xd3, 0x97, 0xff, 0xf6, 0xea, 0x4f, 0x53, 0xcc, 0xb8, 0xf4, 0x93, + 0x98, 0x19, 0xd3, 0x54, 0x49, 0x7a, 0x89, 0x39, 0x86, 0x5a, 0xb8, 0x03, 0x56, 0x24, 0xf4, 0x19, + 0x98, 0x01, 0xae, 0xd5, 0x2b, 0x0b, 0xd1, 0xf6, 0x02, 0x5c, 0x8f, 0x3d, 0xfa, 0x11, 0x0e, 0x31, + 0x76, 0xb3, 0x75, 0xaa, 0x8e, 0xcf, 0x78, 0x78, 0x97, 0x68, 0xdd, 0xda, 0xc5, 0xcc, 0xa3, 0x47, + 0x6a, 0x10, 0xde, 0x25, 0xac, 0x8d, 0x3b, 0x39, 0xed, 0x42, 0xcb, 0x4d, 0xa2, 0x34, 0x43, 0xa1, + 0xdd, 0x30, 0xf5, 0x7d, 0xdb, 0x90, 0xf3, 0x06, 0x9a, 0x8f, 0xc3, 0x51, 0x00, 0x73, 0xc8, 0x46, + 0x83, 0xeb, 0x91, 0x55, 0x51, 0xf1, 0xc5, 0xe8, 0x72, 0x74, 0x3d, 0xb2, 0x88, 0x33, 0x86, 0xd6, + 0x79, 0x98, 0xb8, 0xc1, 0x74, 0xb1, 0x10, 0x28, 0xff, 0xf3, 0x49, 0x8e, 0xc0, 0x4c, 0x34, 0xa7, + 0x35, 0x38, 0x60, 0x65, 0xa6, 0x2a, 0x43, 0x8c, 0xcb, 0x77, 0xaa, 0xd0, 0xf9, 0x0a, 0x70, 0xcd, + 0xe7, 0x21, 0x8e, 0x63, 0x0f, 0xef, 0xe9, 0x3b, 0xa8, 0x17, 0x95, 0x1b, 0x23, 0x0e, 0xd5, 0xa3, + 0xb6, 0xee, 0x62, 0x1b, 0x9e, 0xbe, 0x86, 0xfd, 0x79, 0x98, 0x24, 0xd1, 0x6c, 0xe1, 0x87, 0x12, + 0xb3, 0xf2, 0x3b, 0xb6, 0x34, 0xf6, 0x49, 0x43, 0x4e, 0x02, 0x8d, 0xe1, 0x12, 0xdd, 0x40, 0xac, + 0x22, 0x7a, 0x0c, 0x35, 0xad, 0x15, 0xd1, 0x5a, 0x1d, 0xa9, 0xb6, 0x1b, 0xae, 0xaf, 0xa4, 0xc9, + 0x7c, 0xb9, 0x8c, 0x98, 0xae, 0x51, 0x53, 0x8a, 0x55, 0xa4, 0x3b, 0xd6, 0x98, 0x0a, 0x9d, 0xb7, + 0xd0, 0x7c, 0x2c, 0x2a, 0x54, 0x19, 0x9e, 0x9d, 0x0e, 0xad, 0x0a, 0xdd, 0x87, 0xc6, 0xed, 0xed, + 0x67, 0x2e, 0x96, 0x1f, 0xde, 0x5b, 0xc4, 0x71, 0xa1, 0x7e, 0xc1, 0x25, 0x9f, 0xe0, 0x7a, 0xcb, + 0x3d, 0xb2, 0xed, 0x1e, 0x85, 0x9a, 0xc7, 0x25, 0x2f, 0xa7, 0xd5, 0xb1, 0xfa, 0x3c, 0x7e, 0x5e, + 0x2e, 0x8d, 0xe1, 0xe7, 0x6a, 0x29, 0xdc, 0x0c, 0xb9, 0x44, 0x4f, 0x2d, 0x85, 0x32, 0xbf, 0xca, + 0x9a, 0x25, 0x32, 0x90, 0xc7, 0xcf, 0xa1, 0xbd, 0xeb, 0x32, 0xad, 0x43, 0x95, 0xa3, 0xb0, 0x2a, + 0xe7, 0xd6, 0x8f, 0x87, 0x0e, 0xf9, 0xf9, 0xd0, 0x21, 0xbf, 0x1e, 0x3a, 0xe4, 0xdb, 0xef, 0x4e, + 0x65, 0x6e, 0xea, 0x8d, 0x3e, 0xfb, 0x1b, 0x00, 0x00, 0xff, 0xff, 0x1d, 0x2f, 0x21, 0x79, 0xdd, + 0x03, 0x00, 0x00, } func (m *KV) Marshal() (dAtA []byte, err error) { @@ -831,6 +840,11 @@ func (m *ManifestChange) MarshalTo(dAtA []byte) (int, error) { i++ i = encodeVarintPb(dAtA, i, uint64(m.EncryptionAlgo)) } + if m.Compression != 0 { + dAtA[i] = 0x30 + i++ + i = encodeVarintPb(dAtA, i, uint64(m.Compression)) + } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) } @@ -1090,6 +1104,9 @@ func (m *ManifestChange) Size() (n int) { if m.EncryptionAlgo != 0 { n += 1 + sovPb(uint64(m.EncryptionAlgo)) } + if m.Compression != 0 { + n += 1 + sovPb(uint64(m.Compression)) + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -1744,6 +1761,25 @@ func (m *ManifestChange) Unmarshal(dAtA []byte) error { break } } + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Compression", wireType) + } + m.Compression = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPb + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Compression |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipPb(dAtA[iNdEx:]) diff --git a/pb/pb.proto b/pb/pb.proto index 1c8d9c587..1edb0edc4 100644 --- a/pb/pb.proto +++ b/pb/pb.proto @@ -36,8 +36,8 @@ message KVList { } message ManifestChangeSet { - // A set of changes that are applied atomically. - repeated ManifestChange changes = 1; + // A set of changes that are applied atomically. + repeated ManifestChange changes = 1; } enum EncryptionAlgo { @@ -45,15 +45,16 @@ enum EncryptionAlgo { } message ManifestChange { - uint64 Id = 1; - enum Operation { - CREATE = 0; - DELETE = 1; - } - Operation Op = 2; - uint32 Level = 3; // Only used for CREATE - uint64 key_id = 4; - EncryptionAlgo encryption_algo = 5; + uint64 Id = 1; // Table ID. + enum Operation { + CREATE = 0; + DELETE = 1; + } + Operation Op = 2; + uint32 Level = 3; // Only used for CREATE. + uint64 key_id = 4; + EncryptionAlgo encryption_algo = 5; + uint32 compression = 6; // Only used for CREATE Op. } message BlockOffset { diff --git a/stream_writer.go b/stream_writer.go index f239e3f8b..b3ab8f141 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -219,11 +219,8 @@ func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) { return nil, err } - bopts := table.Options{ - BlockSize: sw.db.opt.BlockSize, - BloomFalsePositive: sw.db.opt.BloomFalsePositive, - DataKey: dk, - } + bopts := buildTableOptions(sw.db.opt) + bopts.DataKey = dk w := &sortedWriter{ db: sw.db, streamID: streamID, @@ -317,11 +314,8 @@ func (w *sortedWriter) send() error { if err != nil { return y.Wrapf(err, "Error while retriving datakey in sortedWriter.send") } - bopts := table.Options{ - BlockSize: w.db.opt.BlockSize, - BloomFalsePositive: w.db.opt.BloomFalsePositive, - DataKey: dk, - } + bopts := buildTableOptions(w.db.opt) + bopts.DataKey = dk w.builder = table.NewTableBuilder(bopts) return nil } @@ -348,12 +342,8 @@ func (w *sortedWriter) createTable(builder *table.Builder) error { if _, err := fd.Write(data); err != nil { return err } - - opts := table.Options{ - LoadingMode: w.db.opt.TableLoadingMode, - ChkMode: w.db.opt.ChecksumVerificationMode, - DataKey: builder.DataKey(), - } + opts := buildTableOptions(w.db.opt) + opts.DataKey = builder.DataKey() tbl, err := table.OpenTable(fd, opts) if err != nil { return err @@ -384,9 +374,10 @@ func (w *sortedWriter) createTable(builder *table.Builder) error { } // Now that table can be opened successfully, let's add this to the MANIFEST. change := &pb.ManifestChange{ - Id: tbl.ID(), - Op: pb.ManifestChange_CREATE, - Level: uint32(lhandler.level), + Id: tbl.ID(), + Op: pb.ManifestChange_CREATE, + Level: uint32(lhandler.level), + Compression: uint32(tbl.CompressionType()), } if err := w.db.manifest.addChanges([]*pb.ManifestChange{change}); err != nil { return err diff --git a/stream_writer_test.go b/stream_writer_test.go index 5e252ea32..f361f4b1b 100644 --- a/stream_writer_test.go +++ b/stream_writer_test.go @@ -323,7 +323,6 @@ func TestStreamWriter6(t *testing.T) { } } require.NoError(t, db.Close()) - db, err := Open(db.opt) require.NoError(t, err) require.NoError(t, db.Close()) diff --git a/table/builder.go b/table/builder.go index 0f8c06f77..bd22e4318 100644 --- a/table/builder.go +++ b/table/builder.go @@ -22,9 +22,13 @@ import ( "math" "unsafe" + "github.com/DataDog/zstd" "github.com/dgryski/go-farm" "github.com/golang/protobuf/proto" + "github.com/golang/snappy" + "github.com/pkg/errors" + "github.com/dgraph-io/badger/options" "github.com/dgraph-io/badger/pb" "github.com/dgraph-io/badger/y" "github.com/dgraph-io/ristretto/z" @@ -150,6 +154,18 @@ func (b *Builder) finishBlock() { blockBuf := b.buf.Bytes()[b.baseOffset:] // Store checksum for current block. b.writeChecksum(blockBuf) + // Compress the block. + if b.opt.Compression != options.None { + var err error + // TODO: Find a way to reuse buffers. Current implementation creates a + // new buffer for each compressData call. + blockBuf, err = b.compressData(b.buf.Bytes()[b.baseOffset:]) + y.Check(err) + // Truncate already written data. + b.buf.Truncate(int(b.baseOffset)) + // Write compressed data. + b.buf.Write(blockBuf) + } if b.shouldEncrypt() { block := b.buf.Bytes()[b.baseOffset:] eBlock, err := b.encrypt(block) @@ -322,3 +338,16 @@ func (b *Builder) encrypt(data []byte) ([]byte, error) { func (b *Builder) shouldEncrypt() bool { return b.opt.DataKey != nil } + +// compressData compresses the given data. +func (b *Builder) compressData(data []byte) ([]byte, error) { + switch b.opt.Compression { + case options.None: + return data, nil + case options.Snappy: + return snappy.Encode(nil, data), nil + case options.ZSTD: + return zstd.Compress(nil, data) + } + return nil, errors.New("Unsupported compression type") +} diff --git a/table/builder_test.go b/table/builder_test.go index 420967af6..6fcc6dc59 100644 --- a/table/builder_test.go +++ b/table/builder_test.go @@ -34,8 +34,8 @@ func TestTableIndex(t *testing.T) { rand.Seed(time.Now().Unix()) keyPrefix := "key" t.Run("single key", func(t *testing.T) { - f := buildTestTable(t, keyPrefix, 1) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + opts := Options{Compression: options.ZSTD} + f := buildTestTable(t, keyPrefix, 1, opts) tbl, err := OpenTable(f, opts) require.NoError(t, err) require.Len(t, tbl.blockIndex, 1) @@ -51,6 +51,9 @@ func TestTableIndex(t *testing.T) { require.NoError(t, err) opts = append(opts, Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, DataKey: &pb.DataKey{Data: key}}) + // Compression mode. + opts = append(opts, Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, + Compression: options.ZSTD}) keysCount := 10000 for _, opt := range opts { builder := NewTableBuilder(opt) @@ -76,10 +79,8 @@ func TestTableIndex(t *testing.T) { _, err = f.Write(builder.Finish()) require.NoError(t, err, "unable to write to file") - topt := Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead, - DataKey: opt.DataKey} - tbl, err := OpenTable(f, topt) - if topt.DataKey == nil { + tbl, err := OpenTable(f, opt) + if opt.DataKey == nil { // key id is zero if thre is no datakey. require.Equal(t, tbl.KeyID(), uint64(0)) } @@ -96,6 +97,22 @@ func TestTableIndex(t *testing.T) { }) } +func TestInvalidCompression(t *testing.T) { + keyPrefix := "key" + opts := Options{Compression: options.ZSTD} + f := buildTestTable(t, keyPrefix, 1000, opts) + t.Run("with correct decompression algo", func(t *testing.T) { + _, err := OpenTable(f, opts) + require.NoError(t, err) + }) + t.Run("with incorrect decompression algo", func(t *testing.T) { + // Set incorrect compression algorithm. + opts.Compression = options.Snappy + _, err := OpenTable(f, opts) + require.Error(t, err) + }) +} + func BenchmarkBuilder(b *testing.B) { rand.Seed(time.Now().Unix()) key := func(i int) []byte { diff --git a/table/table.go b/table/table.go index f09f5bd9c..7c0b89118 100644 --- a/table/table.go +++ b/table/table.go @@ -28,7 +28,9 @@ import ( "sync" "sync/atomic" + "github.com/DataDog/zstd" "github.com/golang/protobuf/proto" + "github.com/golang/snappy" "github.com/pkg/errors" "github.com/dgraph-io/badger/options" @@ -59,6 +61,9 @@ type Options struct { // DataKey is the key used to decrypt the encrypted text. DataKey *pb.DataKey + + // Compression indicates the compression algorithm used for block compression. + Compression options.CompressionType } // TableInterface is useful for testing. @@ -91,6 +96,11 @@ type Table struct { opt *Options } +// CompressionType returns the compression algorithm used for block compression. +func (t *Table) CompressionType() options.CompressionType { + return t.opt.Compression +} + // IncrRef increments the refcount (having to do with whether the file should be deleted) func (t *Table) IncrRef() { atomic.AddInt32(&t.ref, 1) @@ -219,10 +229,11 @@ func OpenTable(fd *os.File, opts Options) (*Table, error) { // OpenInMemoryTable is similar to OpenTable but it opens a new table from the provided data. // OpenInMemoryTable is used for L0 tables. -func OpenInMemoryTable(data []byte, id uint64, dk *pb.DataKey) (*Table, error) { +func OpenInMemoryTable(data []byte, id uint64, opt *Options) (*Table, error) { + opt.LoadingMode = options.LoadToRAM t := &Table{ ref: 1, // Caller is given one reference. - opt: &Options{LoadingMode: options.LoadToRAM, DataKey: dk}, + opt: opt, mmap: data, tableSize: len(data), IsInmemory: true, @@ -245,9 +256,10 @@ func (t *Table) initBiggestAndSmallest() error { it2 := t.NewIterator(true) defer it2.Close() it2.Rewind() - if it2.Valid() { - t.biggest = it2.Key() + if !it2.Valid() { + return errors.Wrapf(it2.err, "failed to initialize biggest for table %s", t.Filename()) } + t.biggest = it2.Key() return nil } @@ -343,7 +355,8 @@ func (t *Table) block(idx int) (*block, error) { } var err error if blk.data, err = t.read(blk.offset, int(ko.Len)); err != nil { - return nil, err + return nil, errors.Wrapf(err, + "failed to read from file: %s at offset: %d, len: %d", t.fd.Name(), blk.offset, ko.Len) } if t.shouldDecrypt() { @@ -353,10 +366,24 @@ func (t *Table) block(idx int) (*block, error) { } } + blk.data, err = t.decompressData(blk.data) + if err != nil { + return nil, errors.Wrapf(err, + "failed to decode compressed data in file: %s at offset: %d, len: %d", + t.fd.Name(), blk.offset, ko.Len) + } + // Read meta data related to block. readPos := len(blk.data) - 4 // First read checksum length. blk.chkLen = int(y.BytesToU32(blk.data[readPos : readPos+4])) + // Checksum length greater than block size could happen if the table was compressed and + // it was opened with an incorrect compression algorithm (or the data was corrupted). + if blk.chkLen > len(blk.data) { + return nil, errors.New("invalid checksum length. Either the data is" + + "corrupted or the table options are incorrectly set") + } + // Read checksum and store it readPos -= blk.chkLen blk.checksum = blk.data[readPos : readPos+blk.chkLen] @@ -476,3 +503,16 @@ func IDToFilename(id uint64) string { func NewFilename(id uint64, dir string) string { return filepath.Join(dir, IDToFilename(id)) } + +// decompressData decompresses the given data. +func (t *Table) decompressData(data []byte) ([]byte, error) { + switch t.opt.Compression { + case options.None: + return data, nil + case options.Snappy: + return snappy.Decode(nil, data) + case options.ZSTD: + return zstd.Decompress(nil, data) + } + return nil, errors.New("Unsupported compression type") +} diff --git a/table/table_test.go b/table/table_test.go index c78855474..98c90fec4 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -43,7 +43,23 @@ func key(prefix string, i int) string { return prefix + fmt.Sprintf("%04d", i) } -func buildTestTable(t *testing.T, prefix string, n int) *os.File { +func getTestTableOptions() Options { + return Options{ + Compression: options.ZSTD, + LoadingMode: options.LoadToRAM, + ChkMode: options.OnTableAndBlockRead, + BlockSize: 4 * 1024, + BloomFalsePositive: 0.01, + } + +} +func buildTestTable(t *testing.T, prefix string, n int, opts Options) *os.File { + if opts.BloomFalsePositive == 0 { + opts.BloomFalsePositive = 0.01 + } + if opts.BlockSize == 0 { + opts.BlockSize = 4 * 1024 + } y.AssertTrue(n <= 10000) keyValues := make([][]string, n) for i := 0; i < n; i++ { @@ -51,12 +67,11 @@ func buildTestTable(t *testing.T, prefix string, n int) *os.File { v := fmt.Sprintf("%d", i) keyValues[i] = []string{k, v} } - return buildTable(t, keyValues) + return buildTable(t, keyValues, opts) } // keyValues is n by 2 where n is number of pairs. -func buildTable(t *testing.T, keyValues [][]string) *os.File { - opts := Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01} +func buildTable(t *testing.T, keyValues [][]string, opts Options) *os.File { b := NewTableBuilder(opts) defer b.Close() // TODO: Add test for file garbage collection here. No files should be left after the tests here. @@ -86,8 +101,8 @@ func buildTable(t *testing.T, keyValues [][]string) *os.File { func TestTableIterator(t *testing.T) { for _, n := range []int{99, 100, 101} { t.Run(fmt.Sprintf("n=%d", n), func(t *testing.T) { - f := buildTestTable(t, "key", n) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + opts := getTestTableOptions() + f := buildTestTable(t, "key", n, opts) table, err := OpenTable(f, opts) require.NoError(t, err) defer table.DecrRef() @@ -109,8 +124,8 @@ func TestTableIterator(t *testing.T) { func TestSeekToFirst(t *testing.T) { for _, n := range []int{99, 100, 101, 199, 200, 250, 9999, 10000} { t.Run(fmt.Sprintf("n=%d", n), func(t *testing.T) { - f := buildTestTable(t, "key", n) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + opts := getTestTableOptions() + f := buildTestTable(t, "key", n, opts) table, err := OpenTable(f, opts) require.NoError(t, err) defer table.DecrRef() @@ -128,8 +143,8 @@ func TestSeekToFirst(t *testing.T) { func TestSeekToLast(t *testing.T) { for _, n := range []int{99, 100, 101, 199, 200, 250, 9999, 10000} { t.Run(fmt.Sprintf("n=%d", n), func(t *testing.T) { - f := buildTestTable(t, "key", n) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + opts := getTestTableOptions() + f := buildTestTable(t, "key", n, opts) table, err := OpenTable(f, opts) require.NoError(t, err) defer table.DecrRef() @@ -150,8 +165,8 @@ func TestSeekToLast(t *testing.T) { } func TestSeek(t *testing.T) { - f := buildTestTable(t, "k", 10000) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + opts := getTestTableOptions() + f := buildTestTable(t, "k", 10000, opts) table, err := OpenTable(f, opts) require.NoError(t, err) defer table.DecrRef() @@ -186,8 +201,8 @@ func TestSeek(t *testing.T) { } func TestSeekForPrev(t *testing.T) { - f := buildTestTable(t, "k", 10000) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + opts := getTestTableOptions() + f := buildTestTable(t, "k", 10000, opts) table, err := OpenTable(f, opts) require.NoError(t, err) defer table.DecrRef() @@ -225,8 +240,8 @@ func TestIterateFromStart(t *testing.T) { // Vary the number of elements added. for _, n := range []int{99, 100, 101, 199, 200, 250, 9999, 10000} { t.Run(fmt.Sprintf("n=%d", n), func(t *testing.T) { - f := buildTestTable(t, "key", n) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + opts := getTestTableOptions() + f := buildTestTable(t, "key", n, opts) table, err := OpenTable(f, opts) require.NoError(t, err) defer table.DecrRef() @@ -253,8 +268,8 @@ func TestIterateFromEnd(t *testing.T) { // Vary the number of elements added. for _, n := range []int{99, 100, 101, 199, 200, 250, 9999, 10000} { t.Run(fmt.Sprintf("n=%d", n), func(t *testing.T) { - f := buildTestTable(t, "key", n) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + opts := getTestTableOptions() + f := buildTestTable(t, "key", n, opts) table, err := OpenTable(f, opts) require.NoError(t, err) defer table.DecrRef() @@ -277,8 +292,9 @@ func TestIterateFromEnd(t *testing.T) { } func TestTable(t *testing.T) { - f := buildTestTable(t, "key", 10000) - opts := Options{LoadingMode: options.FileIO, ChkMode: options.OnTableAndBlockRead} + opts := getTestTableOptions() + opts.LoadingMode = options.FileIO + f := buildTestTable(t, "key", 10000, opts) table, err := OpenTable(f, opts) require.NoError(t, err) defer table.DecrRef() @@ -305,8 +321,8 @@ func TestTable(t *testing.T) { } func TestIterateBackAndForth(t *testing.T) { - f := buildTestTable(t, "key", 10000) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + opts := getTestTableOptions() + f := buildTestTable(t, "key", 10000, opts) table, err := OpenTable(f, opts) require.NoError(t, err) defer table.DecrRef() @@ -347,8 +363,8 @@ func TestIterateBackAndForth(t *testing.T) { } func TestUniIterator(t *testing.T) { - f := buildTestTable(t, "key", 10000) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + opts := getTestTableOptions() + f := buildTestTable(t, "key", 10000, opts) table, err := OpenTable(f, opts) require.NoError(t, err) defer table.DecrRef() @@ -380,12 +396,12 @@ func TestUniIterator(t *testing.T) { // Try having only one table. func TestConcatIteratorOneTable(t *testing.T) { + opts := getTestTableOptions() f := buildTable(t, [][]string{ {"k1", "a1"}, {"k2", "a2"}, - }) + }, opts) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} tbl, err := OpenTable(f, opts) require.NoError(t, err) defer tbl.DecrRef() @@ -403,14 +419,13 @@ func TestConcatIteratorOneTable(t *testing.T) { } func TestConcatIterator(t *testing.T) { - f := buildTestTable(t, "keya", 10000) - f2 := buildTestTable(t, "keyb", 10000) - f3 := buildTestTable(t, "keyc", 10000) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + opts := getTestTableOptions() + f := buildTestTable(t, "keya", 10000, opts) + f2 := buildTestTable(t, "keyb", 10000, opts) + f3 := buildTestTable(t, "keyc", 10000, opts) tbl, err := OpenTable(f, opts) require.NoError(t, err) defer tbl.DecrRef() - opts.LoadingMode = options.LoadToRAM tbl2, err := OpenTable(f2, opts) require.NoError(t, err) defer tbl2.DecrRef() @@ -485,15 +500,15 @@ func TestConcatIterator(t *testing.T) { } func TestMergingIterator(t *testing.T) { + opts := getTestTableOptions() f1 := buildTable(t, [][]string{ {"k1", "a1"}, {"k2", "a2"}, - }) + }, opts) f2 := buildTable(t, [][]string{ {"k1", "b1"}, {"k2", "b2"}, - }) - opts := Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead} + }, opts) tbl1, err := OpenTable(f1, opts) require.NoError(t, err) defer tbl1.DecrRef() @@ -526,15 +541,15 @@ func TestMergingIterator(t *testing.T) { } func TestMergingIteratorReversed(t *testing.T) { + opts := getTestTableOptions() f1 := buildTable(t, [][]string{ {"k1", "a1"}, {"k2", "a2"}, - }) + }, opts) f2 := buildTable(t, [][]string{ {"k1", "b1"}, {"k2", "b2"}, - }) - opts := Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead} + }, opts) tbl1, err := OpenTable(f1, opts) require.NoError(t, err) defer tbl1.DecrRef() @@ -568,13 +583,13 @@ func TestMergingIteratorReversed(t *testing.T) { // Take only the first iterator. func TestMergingIteratorTakeOne(t *testing.T) { + opts := getTestTableOptions() f1 := buildTable(t, [][]string{ {"k1", "a1"}, {"k2", "a2"}, - }) - f2 := buildTable(t, [][]string{}) + }, opts) + f2 := buildTable(t, [][]string{{"l1", "b1"}}, opts) - opts := Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead} t1, err := OpenTable(f1, opts) require.NoError(t, err) defer t1.DecrRef() @@ -604,18 +619,25 @@ func TestMergingIteratorTakeOne(t *testing.T) { require.EqualValues(t, 'A', vs.Meta) it.Next() + k = it.Key() + require.EqualValues(t, "l1", string(y.ParseKey(k))) + vs = it.Value() + require.EqualValues(t, "b1", string(vs.Value)) + require.EqualValues(t, 'A', vs.Meta) + it.Next() + require.False(t, it.Valid()) } // Take only the second iterator. func TestMergingIteratorTakeTwo(t *testing.T) { - f1 := buildTable(t, [][]string{}) + opts := getTestTableOptions() + f1 := buildTable(t, [][]string{{"l1", "b1"}}, opts) f2 := buildTable(t, [][]string{ {"k1", "a1"}, {"k2", "a2"}, - }) + }, opts) - opts := Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead} t1, err := OpenTable(f1, opts) require.NoError(t, err) defer t1.DecrRef() @@ -644,6 +666,15 @@ func TestMergingIteratorTakeTwo(t *testing.T) { require.EqualValues(t, "a2", string(vs.Value)) require.EqualValues(t, 'A', vs.Meta) it.Next() + require.True(t, it.Valid()) + + k = it.Key() + require.EqualValues(t, "l1", string(y.ParseKey(k))) + vs = it.Value() + require.EqualValues(t, "b1", string(vs.Value)) + require.EqualValues(t, 'A', vs.Meta) + it.Next() + require.False(t, it.Valid()) } @@ -658,7 +689,7 @@ func TestTableBigValues(t *testing.T) { require.NoError(t, err, "unable to create file") n := 100 // Insert 100 keys. - opts := Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01} + opts := Options{Compression: options.ZSTD, BlockSize: 4 * 1024, BloomFalsePositive: 0.01} builder := NewTableBuilder(opts) for i := 0; i < n; i++ { key := y.KeyWithTs([]byte(key("", i)), 0) @@ -668,7 +699,6 @@ func TestTableBigValues(t *testing.T) { _, err = f.Write(builder.Finish()) require.NoError(t, err, "unable to write to file") - opts = Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead} tbl, err := OpenTable(f, opts) require.NoError(t, err, "unable to open table") defer tbl.DecrRef() @@ -692,12 +722,12 @@ func TestTableChecksum(t *testing.T) { // we are going to write random byte at random location in table file. rb := make([]byte, 1) rand.Read(rb) - f := buildTestTable(t, "k", 10000) + opts := getTestTableOptions() + f := buildTestTable(t, "k", 10000, opts) fi, err := f.Stat() require.NoError(t, err, "unable to get file information") f.WriteAt(rb, rand.Int63n(fi.Size())) - opts := Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead} _, err = OpenTable(f, opts) if err == nil || !strings.Contains(err.Error(), "checksum") { t.Fatal("Test should have been failed with checksum mismatch error") @@ -730,7 +760,7 @@ func BenchmarkReadAndBuild(b *testing.B) { // Iterate b.N times over the entire table. for i := 0; i < b.N; i++ { func() { - opts := Options{BlockSize: 4 * 0124, BloomFalsePositive: 0.01} + opts := Options{Compression: options.ZSTD, BlockSize: 4 * 0124, BloomFalsePositive: 0.01} newBuilder := NewTableBuilder(opts) it := tbl.NewIterator(false) defer it.Close() @@ -751,7 +781,7 @@ func BenchmarkReadMerged(b *testing.B) { var tables []*Table for i := 0; i < m; i++ { filename := fmt.Sprintf("%s%s%d.sst", os.TempDir(), string(os.PathSeparator), rand.Int63()) - opts := Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01} + opts := Options{Compression: options.ZSTD, BlockSize: 4 * 1024, BloomFalsePositive: 0.01} builder := NewTableBuilder(opts) f, err := y.OpenSyncedFile(filename, true) y.Check(err) @@ -764,7 +794,6 @@ func BenchmarkReadMerged(b *testing.B) { } _, err = f.Write(builder.Finish()) require.NoError(b, err, "unable to write to file") - opts = Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead} tbl, err := OpenTable(f, opts) y.Check(err) tables = append(tables, tbl) @@ -838,7 +867,7 @@ func BenchmarkRandomRead(b *testing.B) { func getTableForBenchmarks(b *testing.B, count int) *Table { rand.Seed(time.Now().Unix()) - opts := Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01} + opts := Options{Compression: options.ZSTD, BlockSize: 4 * 1024, BloomFalsePositive: 0.01} builder := NewTableBuilder(opts) filename := fmt.Sprintf("%s%s%d.sst", os.TempDir(), string(os.PathSeparator), rand.Int63()) f, err := y.OpenSyncedFile(filename, true) @@ -851,7 +880,6 @@ func getTableForBenchmarks(b *testing.B, count int) *Table { _, err = f.Write(builder.Finish()) require.NoError(b, err, "unable to write to file") - opts = Options{LoadingMode: options.LoadToRAM, ChkMode: options.NoVerification} tbl, err := OpenTable(f, opts) require.NoError(b, err, "unable to open table") return tbl