Skip to content

Commit

Permalink
Support compression in Badger (hypermodeinc#1013)
Browse files Browse the repository at this point in the history
This commit adds support for compression in Badger. Two compression
algorithms, Snappy and ZSTD, are supported for now. The compression algorithm used by each table
is recorded in the manifest file. We compress the blocks (typically 4KB in size) stored in the SST.

The compression algorithm can be specified via the CompressionType option to badger.
  • Loading branch information
Ibrahim Jarif authored Sep 30, 2019
1 parent e3b5652 commit e7d0a7b
Show file tree
Hide file tree
Showing 20 changed files with 394 additions and 206 deletions.
3 changes: 2 additions & 1 deletion appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ environment:

# scripts that run after cloning repository
install:
- set PATH=%GOPATH%\bin;c:\go\bin;%PATH%
- set PATH=%GOPATH%\bin;c:\go\bin;c:\msys64\mingw64\bin;%PATH%
- go version
- go env
- python --version
- gcc --version

# To run your custom scripts instead of automatic MSBuild
build_script:
Expand Down
19 changes: 5 additions & 14 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,7 @@ type flushTask struct {

// handleFlushTask must be run serially.
func (db *DB) handleFlushTask(ft flushTask) error {
// There can be a scnerio, when empty memtable is flushed. For example, memtable is empty and
// There can be a scenario, when empty memtable is flushed. For example, memtable is empty and
// after writing request to value log, rotation count exceeds db.LogRotatesToFlush.
if ft.mt.Empty() {
return nil
Expand All @@ -911,16 +911,13 @@ func (db *DB) handleFlushTask(ft flushTask) error {
if err != nil {
return y.Wrapf(err, "failed to get datakey in db.handleFlushTask")
}
bopts := table.Options{
BlockSize: db.opt.BlockSize,
BloomFalsePositive: db.opt.BloomFalsePositive,
DataKey: dk,
}
bopts := buildTableOptions(db.opt)
bopts.DataKey = dk
tableData := buildL0Table(ft, bopts)

fileID := db.lc.reserveFileID()
if db.opt.KeepL0InMemory {
tbl, err := table.OpenInMemoryTable(tableData, fileID, dk)
tbl, err := table.OpenInMemoryTable(tableData, fileID, &bopts)
if err != nil {
return errors.Wrapf(err, "failed to open table in memory")
}
Expand All @@ -945,13 +942,7 @@ func (db *DB) handleFlushTask(ft flushTask) error {
// Do dir sync as best effort. No need to return due to an error there.
db.elog.Errorf("ERROR while syncing level directory: %v", dirSyncErr)
}

opts := table.Options{
LoadingMode: db.opt.TableLoadingMode,
ChkMode: db.opt.ChecksumVerificationMode,
DataKey: dk,
}
tbl, err := table.OpenTable(fd, opts)
tbl, err := table.OpenTable(fd, bopts)
if err != nil {
db.elog.Printf("ERROR while opening table: %v", err)
return err
Expand Down
15 changes: 6 additions & 9 deletions db2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,9 +505,10 @@ func TestCompactionFilePicking(t *testing.T) {
// addToManifest function is used in TestCompactionFilePicking. It adds table to db manifest.
func addToManifest(t *testing.T, db *DB, tab *table.Table, level uint32) {
change := &pb.ManifestChange{
Id: tab.ID(),
Op: pb.ManifestChange_CREATE,
Level: level,
Id: tab.ID(),
Op: pb.ManifestChange_CREATE,
Level: level,
Compression: uint32(tab.CompressionType()),
}
require.NoError(t, db.manifest.addChanges([]*pb.ManifestChange{change}),
"unable to add to manifest")
Expand All @@ -516,10 +517,7 @@ func addToManifest(t *testing.T, db *DB, tab *table.Table, level uint32) {
// createTableWithRange function is used in TestCompactionFilePicking. It creates
// a table with key starting from start and ending with end.
func createTableWithRange(t *testing.T, db *DB, start, end int) *table.Table {
bopts := table.Options{
BlockSize: db.opt.BlockSize,
BloomFalsePositive: db.opt.BloomFalsePositive,
}
bopts := buildTableOptions(db.opt)
b := table.NewTableBuilder(bopts)
nums := []int{start, end}
for _, i := range nums {
Expand All @@ -537,8 +535,7 @@ func createTableWithRange(t *testing.T, db *DB, start, end int) *table.Table {
_, err = fd.Write(b.Finish())
require.NoError(t, err, "unable to write to file")

opts := table.Options{LoadingMode: options.LoadToRAM, ChkMode: options.NoVerification}
tab, err := table.OpenTable(fd, opts)
tab, err := table.OpenTable(fd, bopts)
require.NoError(t, err)
return tab
}
Expand Down
29 changes: 24 additions & 5 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ func TestIterate2Basic(t *testing.T) {
})
}

func TestLoadAndEncryption(t *testing.T) {
func TestLoad(t *testing.T) {
testLoad := func(t *testing.T, opt Options) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
Expand Down Expand Up @@ -642,14 +642,33 @@ func TestLoadAndEncryption(t *testing.T) {
sort.Slice(fileIDs, func(i, j int) bool { return fileIDs[i] < fileIDs[j] })
fmt.Printf("FileIDs: %v\n", fileIDs)
}
t.Run("TestLoad Without Encryption", func(t *testing.T) {
testLoad(t, getTestOptions(""))
t.Run("TestLoad Without Encryption/Compression", func(t *testing.T) {
opt := getTestOptions("")
opt.Compression = options.None
testLoad(t, opt)
})
t.Run("TestLoad With Encryption", func(t *testing.T) {
t.Run("TestLoad With Encryption and no compression", func(t *testing.T) {
key := make([]byte, 32)
_, err := rand.Read(key)
require.NoError(t, err)
testLoad(t, getTestOptions("").WithEncryptionKey(key))
opt := getTestOptions("")
opt.EncryptionKey = key
opt.Compression = options.None
testLoad(t, opt)
})
t.Run("TestLoad With Encryption and compression", func(t *testing.T) {
key := make([]byte, 32)
_, err := rand.Read(key)
require.NoError(t, err)
opt := getTestOptions("")
opt.EncryptionKey = key
opt.Compression = options.ZSTD
testLoad(t, opt)
})
t.Run("TestLoad without Encryption and with compression", func(t *testing.T) {
opt := getTestOptions("")
opt.Compression = options.ZSTD
testLoad(t, opt)
})
}

Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ module github.com/dgraph-io/badger
go 1.12

require (
github.com/DataDog/zstd v1.4.1
github.com/cespare/xxhash v1.1.0
github.com/cespare/xxhash/v2 v2.1.0 // indirect
github.com/dgraph-io/ristretto v0.0.0-20190916120426-cd2835491e0e
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2
github.com/dustin/go-humanize v1.0.0
github.com/golang/protobuf v1.3.1
github.com/golang/snappy v0.0.1
github.com/pkg/errors v0.8.1
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/cobra v0.0.5
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM=
github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/OneOfOne/xxhash v1.2.5/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/VictoriaMetrics/fastcache v1.5.1/go.mod h1:+jv9Ckb+za/P1ZRg/sulP5Ni1v49daAVERr0H3CuscE=
Expand Down Expand Up @@ -31,6 +33,7 @@ github.com/goburrow/cache v0.1.0/go.mod h1:8oxkfud4hvjO4tNjEKZfEd+LrpDVDlBIauGYs
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
Expand Down
2 changes: 1 addition & 1 deletion iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ func TestPickSortTables(t *testing.T) {
genTables := func(mks ...MockKeys) []*table.Table {
out := make([]*table.Table, 0)
for _, mk := range mks {
f := buildTable(t, [][]string{{mk.small, "some value"}, {mk.large, "some value"}})
opts := table.Options{LoadingMode: options.MemoryMap,
ChkMode: options.OnTableAndBlockRead}
f := buildTable(t, [][]string{{mk.small, "some value"}, {mk.large, "some value"}}, opts)
tbl, err := table.OpenTable(f, opts)
require.NoError(t, err)
out = append(out, tbl)
Expand Down
30 changes: 10 additions & 20 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,11 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
rerr = errors.Wrapf(err, "Error while reading datakey")
return
}
opts := table.Options{
LoadingMode: db.opt.TableLoadingMode,
ChkMode: db.opt.ChecksumVerificationMode,
DataKey: dk,
}
t, err := table.OpenTable(fd, opts)
topt := buildTableOptions(db.opt)
// Set compression from table manifest.
topt.Compression = tf.Compression
topt.DataKey = dk
t, err := table.OpenTable(fd, topt)
if err != nil {
if strings.HasPrefix(err.Error(), "CHECKSUM_MISMATCH:") {
db.opt.Errorf(err.Error())
Expand Down Expand Up @@ -508,11 +507,8 @@ func (s *levelsController) compactBuildTables(
return nil, nil,
y.Wrapf(err, "Error while retrieving datakey in levelsController.compactBuildTables")
}
bopts := table.Options{
BlockSize: s.kv.opt.BlockSize,
BloomFalsePositive: s.kv.opt.BloomFalsePositive,
DataKey: dk,
}
bopts := buildTableOptions(s.kv.opt)
bopts.DataKey = dk
builder := table.NewTableBuilder(bopts)
var numKeys, numSkips uint64
for ; it.Valid(); it.Next() {
Expand Down Expand Up @@ -593,13 +589,7 @@ func (s *levelsController) compactBuildTables(
if _, err := fd.Write(builder.Finish()); err != nil {
return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)
}

opts := table.Options{
LoadingMode: s.kv.opt.TableLoadingMode,
ChkMode: s.kv.opt.ChecksumVerificationMode,
DataKey: builder.DataKey(),
}
tbl, err := table.OpenTable(fd, opts)
tbl, err := table.OpenTable(fd, bopts)
// decrRef is added below.
return tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())
}
Expand Down Expand Up @@ -659,7 +649,7 @@ func buildChangeSet(cd *compactDef, newTables []*table.Table) pb.ManifestChangeS
changes := []*pb.ManifestChange{}
for _, table := range newTables {
changes = append(changes,
newCreateChange(table.ID(), cd.nextLevel.level, table.KeyID()))
newCreateChange(table.ID(), cd.nextLevel.level, table.KeyID(), table.CompressionType()))
}
for _, table := range cd.top {
// Add a delete change only if the table is not in memory.
Expand Down Expand Up @@ -895,7 +885,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error {
// the proper order. (That means this update happens before that of some compaction which
// deletes the table.)
err := s.kv.manifest.addChanges([]*pb.ManifestChange{
newCreateChange(t.ID(), 0, t.KeyID()),
newCreateChange(t.ID(), 0, t.KeyID(), t.CompressionType()),
})
if err != nil {
return err
Expand Down
29 changes: 17 additions & 12 deletions manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"path/filepath"
"sync"

"github.com/dgraph-io/badger/options"
"github.com/dgraph-io/badger/pb"
"github.com/dgraph-io/badger/y"
"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -65,11 +66,12 @@ type levelManifest struct {
Tables map[uint64]struct{} // Set of table id's
}

// TableManifest contains information about a specific level
// TableManifest contains information about a specific table
// in the LSM tree.
type TableManifest struct {
Level uint8
KeyID uint64
Level uint8
KeyID uint64
Compression options.CompressionType
}

// manifestFile holds the file pointer (and other info) about the manifest file, which is a log
Expand Down Expand Up @@ -100,7 +102,7 @@ const (
func (m *Manifest) asChanges() []*pb.ManifestChange {
changes := make([]*pb.ManifestChange, 0, len(m.Tables))
for id, tm := range m.Tables {
changes = append(changes, newCreateChange(id, int(tm.Level), tm.KeyID))
changes = append(changes, newCreateChange(id, int(tm.Level), tm.KeyID, tm.Compression))
}
return changes
}
Expand Down Expand Up @@ -395,8 +397,9 @@ func applyManifestChange(build *Manifest, tc *pb.ManifestChange) error {
return fmt.Errorf("MANIFEST invalid, table %d exists", tc.Id)
}
build.Tables[tc.Id] = TableManifest{
Level: uint8(tc.Level),
KeyID: tc.KeyId,
Level: uint8(tc.Level),
KeyID: tc.KeyId,
Compression: options.CompressionType(tc.Compression),
}
for len(build.Levels) <= int(tc.Level) {
build.Levels = append(build.Levels, levelManifest{make(map[uint64]struct{})})
Expand Down Expand Up @@ -428,14 +431,16 @@ func applyChangeSet(build *Manifest, changeSet *pb.ManifestChangeSet) error {
return nil
}

func newCreateChange(id uint64, level int, keyID uint64) *pb.ManifestChange {
func newCreateChange(
id uint64, level int, keyID uint64, c options.CompressionType) *pb.ManifestChange {
return &pb.ManifestChange{
Id: id,
Op: pb.ManifestChange_CREATE,
Level: uint32(level),
KeyId: keyID,
Id: id,
Op: pb.ManifestChange_CREATE,
Level: uint32(level),
KeyId: keyID,
// Hard coding it, since we're supporting only AES for now.
EncryptionAlgo: pb.EncryptionAlgo_aes,
// Hardcoding it, since we're supporting only AES for now.
Compression: uint32(c),
}
}

Expand Down
21 changes: 13 additions & 8 deletions manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,26 @@ func key(prefix string, i int) string {
return prefix + fmt.Sprintf("%04d", i)
}

func buildTestTable(t *testing.T, prefix string, n int) *os.File {
func buildTestTable(t *testing.T, prefix string, n int, opts table.Options) *os.File {
y.AssertTrue(n <= 10000)
keyValues := make([][]string, n)
for i := 0; i < n; i++ {
k := key(prefix, i)
v := fmt.Sprintf("%d", i)
keyValues[i] = []string{k, v}
}
return buildTable(t, keyValues)
return buildTable(t, keyValues, opts)
}

// TODO - Move these to a place where the table package can also use them.
// keyValues is n by 2 where n is number of pairs.
func buildTable(t *testing.T, keyValues [][]string) *os.File {
bopts := table.Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01}
func buildTable(t *testing.T, keyValues [][]string, bopts table.Options) *os.File {
if bopts.BloomFalsePositive == 0 {
bopts.BloomFalsePositive = 0.01
}
if bopts.BlockSize == 0 {
bopts.BlockSize = 4 * 1024
}
b := table.NewTableBuilder(bopts)
defer b.Close()
// TODO: Add test for file garbage collection here. No files should be left after the tests here.
Expand Down Expand Up @@ -166,8 +171,8 @@ func TestOverlappingKeyRangeError(t *testing.T) {

lh0 := newLevelHandler(kv, 0)
lh1 := newLevelHandler(kv, 1)
f := buildTestTable(t, "k", 2)
opts := table.Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead}
f := buildTestTable(t, "k", 2, opts)
t1, err := table.OpenTable(f, opts)
require.NoError(t, err)
defer t1.DecrRef()
Expand All @@ -188,7 +193,7 @@ func TestOverlappingKeyRangeError(t *testing.T) {
require.Equal(t, true, done)
lc.runCompactDef(0, cd)

f = buildTestTable(t, "l", 2)
f = buildTestTable(t, "l", 2, opts)
t2, err := table.OpenTable(f, opts)
require.NoError(t, err)
defer t2.DecrRef()
Expand Down Expand Up @@ -220,13 +225,13 @@ func TestManifestRewrite(t *testing.T) {
require.Equal(t, 0, m.Deletions)

err = mf.addChanges([]*pb.ManifestChange{
newCreateChange(0, 0, 0),
newCreateChange(0, 0, 0, 0),
})
require.NoError(t, err)

for i := uint64(0); i < uint64(deletionsThreshold*3); i++ {
ch := []*pb.ManifestChange{
newCreateChange(i+1, 0, 0),
newCreateChange(i+1, 0, 0, 0),
newDeleteChange(i),
}
err := mf.addChanges(ch)
Expand Down
Loading

0 comments on commit e7d0a7b

Please sign in to comment.