From bca61c9e331286820122e9fc3d612277bf8506ef Mon Sep 17 00:00:00 2001
From: Alvaro Alda
Date: Thu, 11 Apr 2019 17:31:44 +0200
Subject: [PATCH] Remove all code references to Badger

---
 go.mod                              |   1 -
 go.sum                              |   2 -
 storage/badger/badger_store.go      | 391 ----------------------------
 storage/badger/badger_store_test.go | 330 -----------------------
 testutils/storage/storage.go        |  17 --
 5 files changed, 741 deletions(-)
 delete mode 100644 storage/badger/badger_store.go
 delete mode 100644 storage/badger/badger_store_test.go

diff --git a/go.mod b/go.mod
index da1d31012..08bf26c5e 100644
--- a/go.mod
+++ b/go.mod
@@ -7,7 +7,6 @@ require (
 	github.com/BurntSushi/toml v0.3.1 // indirect
 	github.com/cespare/xxhash v1.1.0 // indirect
 	github.com/coocood/freecache v1.1.0
-	github.com/dgraph-io/badger v1.5.4
 	github.com/dgryski/go-farm v0.0.0-20180109070241-2de33835d102 // indirect
 	github.com/golang/protobuf v1.2.0
 	github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c
diff --git a/go.sum b/go.sum
index ad5c5ec54..d48324f26 100644
--- a/go.sum
+++ b/go.sum
@@ -20,8 +20,6 @@ github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazu
 github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/dgraph-io/badger v1.5.4 h1:gVTrpUTbbr/T24uvoCaqY2KSHfNLVGm0w+hbee2HMeg=
-github.com/dgraph-io/badger v1.5.4/go.mod h1:VZxzAIRPHRVNRKRo6AXrX9BJegn6il06VMTZVJYCIjQ=
 github.com/dgryski/go-farm v0.0.0-20180109070241-2de33835d102 h1:afESQBXJEnj3fu+34X//E8Wg3nEbMJxJkwSc0tPePK0=
 github.com/dgryski/go-farm v0.0.0-20180109070241-2de33835d102/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
 github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
diff --git a/storage/badger/badger_store.go b/storage/badger/badger_store.go
deleted file mode 100644
index 6b2afbbb7..000000000
--- a/storage/badger/badger_store.go
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
-	Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A.
-
-	Licensed under the Apache License, Version 2.0 (the "License");
-	you may not use this file except in compliance with the License.
-	You may obtain a copy of the License at
-
-		http://www.apache.org/licenses/LICENSE-2.0
-
-	Unless required by applicable law or agreed to in writing, software
-	distributed under the License is distributed on an "AS IS" BASIS,
-	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-	See the License for the specific language governing permissions and
-	limitations under the License.
-*/
-
-package badger
-
-import (
-	"bytes"
-	"encoding/binary"
-	"io"
-
-	"time"
-
-	b "github.com/dgraph-io/badger"
-	bo "github.com/dgraph-io/badger/options"
-	"github.com/dgraph-io/badger/protos"
-	"github.com/dgraph-io/badger/y"
-
-	"github.com/bbva/qed/log"
-	"github.com/bbva/qed/metrics"
-	"github.com/bbva/qed/storage"
-)
-
-type BadgerStore struct {
-	db                  *b.DB
-	vlogTicker          *time.Ticker // runs every 1m, checks the size of the vlog and runs GC conditionally.
-	mandatoryVlogTicker *time.Ticker // runs every 10m, always runs vlog GC.
-}
-
-// Options contains all the configuration used to open the Badger db
-type Options struct {
-	// Path is the directory path to the Badger db to use.
-	Path string
-
-	// BadgerOptions contains any specific Badger options you might
-	// want to specify.
-	BadgerOptions *b.Options
-
-	// NoSync causes the database to skip fsync calls after each
-	// write to the log. This is unsafe, so it should be used
-	// with caution.
-	NoSync bool
-
-	// ValueLogGC enables a periodic goroutine that does a garbage
-	// collection of the value log while the underlying Badger is online.
-	ValueLogGC bool
-
-	// GCInterval is the interval between conditional runs of the garbage
-	// collection process, based on the size of the vlog. By default, it runs every 1m.
-	GCInterval time.Duration
-
-	// MandatoryGCInterval is the interval between mandatory runs of the
-	// garbage collection process. By default, it runs every 10m.
-	MandatoryGCInterval time.Duration
-
-	// GCThreshold sets the threshold in bytes for the vlog size to be included in the
-	// garbage collection cycle. By default, 1GB.
-	GCThreshold int64
-}
-
-func NewBadgerStore(path string) (*BadgerStore, error) {
-	return NewBadgerStoreOpts(&Options{Path: path})
-}
-
-func NewBadgerStoreOpts(opts *Options) (*BadgerStore, error) {
-
-	var bOpts b.Options
-	if bOpts = b.DefaultOptions; opts.BadgerOptions != nil {
-		bOpts = *opts.BadgerOptions
-	}
-
-	bOpts.TableLoadingMode = bo.MemoryMap
-	bOpts.ValueLogLoadingMode = bo.FileIO
-	bOpts.Dir = opts.Path
-	bOpts.ValueDir = opts.Path
-	bOpts.SyncWrites = false
-	bOpts.ValueThreshold = 1 << 11   // LSM mode
-	bOpts.ValueLogFileSize = 64 << 20 // Allow easy space reclamation.
-
-	db, err := b.Open(bOpts)
-	if err != nil {
-		return nil, err
-	}
-
-	store := &BadgerStore{db: db}
-	// Start GC routine
-	if opts.ValueLogGC {
-
-		var gcInterval time.Duration
-		var mandatoryGCInterval time.Duration
-		var threshold int64
-
-		if gcInterval = 1 * time.Minute; opts.GCInterval != 0 {
-			gcInterval = opts.GCInterval
-		}
-		if mandatoryGCInterval = 10 * time.Minute; opts.MandatoryGCInterval != 0 {
-			mandatoryGCInterval = opts.MandatoryGCInterval
-		}
-		if threshold = int64(1 << 30); opts.GCThreshold != 0 {
-			threshold = opts.GCThreshold
-		}
-
-		store.vlogTicker = time.NewTicker(gcInterval)
-		store.mandatoryVlogTicker = time.NewTicker(mandatoryGCInterval)
-		go store.runVlogGC(db, threshold)
-	}
-
-	return store, nil
-}
-
-func (s BadgerStore) Mutate(mutations []*storage.Mutation) error {
-	return s.db.Update(func(txn *b.Txn) error {
-		for _, m := range mutations {
-			key := append([]byte{m.Table.Prefix()}, m.Key...)
-			err := txn.Set(key, m.Value)
-			if err != nil {
-				return err
-			}
-		}
-		return nil
-	})
-}
-
-func (s BadgerStore) GetRange(table storage.Table, start, end []byte) (storage.KVRange, error) {
-	result := make(storage.KVRange, 0)
-	prefix := table.Prefix()
-	startKey := append([]byte{prefix}, start...)
-	endKey := append([]byte{prefix}, end...)
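-	// Scan from startKey and stop once a key sorts past endKey (endKey itself is included).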
-	err := s.db.View(func(txn *b.Txn) error {
-		opts := b.DefaultIteratorOptions
-		opts.PrefetchValues = false
-		it := txn.NewIterator(opts)
-		defer it.Close()
-		for it.Seek(startKey); it.Valid(); it.Next() {
-			item := it.Item()
-			var key []byte
-			key = item.KeyCopy(key)
-			if bytes.Compare(key, endKey) > 0 {
-				break
-			}
-			var value []byte
-			value, err := item.ValueCopy(value)
-			if err != nil {
-				return err
-			}
-			result = append(result, storage.KVPair{Key: key[1:], Value: value})
-		}
-		return nil
-	})
-	if err != nil {
-		return nil, err
-	}
-	return result, nil
-}
-
-func (s BadgerStore) Get(table storage.Table, key []byte) (*storage.KVPair, error) {
-	result := new(storage.KVPair)
-	result.Key = key
-	err := s.db.View(func(txn *b.Txn) error {
-		k := append([]byte{table.Prefix()}, key...)
-		item, err := txn.Get(k)
-		if err != nil {
-			return err
-		}
-		value, err := item.ValueCopy(nil)
-		if err != nil {
-			return err
-		}
-		result.Value = value
-		return nil
-	})
-	switch err {
-	case nil:
-		return result, nil
-	case b.ErrKeyNotFound:
-		return nil, storage.ErrKeyNotFound
-	default:
-		return nil, err
-	}
-}
-
-func (s BadgerStore) GetLast(table storage.Table) (*storage.KVPair, error) {
-	result := new(storage.KVPair)
-	err := s.db.View(func(txn *b.Txn) error {
-		var err error
-		opts := b.DefaultIteratorOptions
-		opts.PrefetchValues = false
-		opts.Reverse = true
-		it := txn.NewIterator(opts)
-		defer it.Close()
-		// we are using a reversed iterator so we need to seek to
-		// the last possible key for the given table prefix
-		prefix := table.Prefix()
-		it.Seek([]byte{prefix, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff})
-		if it.ValidForPrefix([]byte{prefix}) {
-			item := it.Item()
-			key := item.KeyCopy(nil)
-			result.Key = key[1:]
-			result.Value, err = item.ValueCopy(nil)
-		} else {
-			err = b.ErrKeyNotFound
-		}
-		return err
-	})
-	switch err {
-	case nil:
-		return result, nil
-	case b.ErrKeyNotFound:
-		return nil, storage.ErrKeyNotFound
-	default:
-		return nil, err
-	}
-}
-
-type BadgerKVPairReader struct {
-	prefix byte
-	txn    *b.Txn
-	it     *b.Iterator
-}
-
-func NewBadgerKVPairReader(table storage.Table, txn *b.Txn) *BadgerKVPairReader {
-	opts := b.DefaultIteratorOptions
-	opts.PrefetchSize = 10
-	it := txn.NewIterator(opts)
-	it.Seek([]byte{table.Prefix()})
-	return &BadgerKVPairReader{table.Prefix(), txn, it}
-}
-
-func (r *BadgerKVPairReader) Read(buffer []*storage.KVPair) (n int, err error) {
-	for n = 0; r.it.ValidForPrefix([]byte{r.prefix}) && n < len(buffer); r.it.Next() {
-		item := r.it.Item()
-		var key, value []byte
-		key = item.KeyCopy(key)
-		value, err = item.ValueCopy(value)
-		if err != nil {
-			break
-		}
-		buffer[n] = &storage.KVPair{Key: key[1:], Value: value}
-		n++
-	}
-
-	// TODO: should I close the iterator and transaction?
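-	// Note: Close() handles both; it closes the iterator and discards the transaction.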
-	return n, err
-}
-
-func (r *BadgerKVPairReader) Close() {
-	r.it.Close()
-	r.txn.Discard()
-}
-
-func (s BadgerStore) GetAll(table storage.Table) storage.KVPairReader {
-	return NewBadgerKVPairReader(table, s.db.NewTransaction(false))
-}
-
-func (s BadgerStore) Close() error {
-	if s.vlogTicker != nil {
-		s.vlogTicker.Stop()
-	}
-	if s.mandatoryVlogTicker != nil {
-		s.mandatoryVlogTicker.Stop()
-	}
-	return s.db.Close()
-}
-
-// Borrowed from github.com/dgraph-io/badger/backup.go
-func writeTo(entry *protos.KVPair, w io.Writer) error {
-	if err := binary.Write(w, binary.LittleEndian, uint64(entry.Size())); err != nil {
-		return err
-	}
-	buf, err := entry.Marshal()
-	if err != nil {
-		return err
-	}
-	_, err = w.Write(buf)
-	return err
-}
-
-// Backup dumps a protobuf-encoded list of all entries in the database whose
-// version is lower than or equal to the specified one into the given writer.
-//
-// Borrowed from github.com/dgraph-io/badger/backup.go
-func (s *BadgerStore) Backup(w io.Writer, until uint64) error {
-	err := s.db.View(func(txn *b.Txn) error {
-		opts := b.DefaultIteratorOptions
-		it := txn.NewIterator(opts)
-		defer it.Close()
-
-		for it.Rewind(); it.Valid(); it.Next() {
-			item := it.Item()
-			if item.Version() > until {
-				// Stop at versions greater than the given one
-				break
-			}
-			val, err := item.Value()
-			if err != nil {
-				log.Infof("Key [%x]. Error while fetching value [%v]\n", item.Key(), err)
-				continue
-			}
-
-			entry := &protos.KVPair{
-				Key:       y.Copy(item.Key()),
-				Value:     y.Copy(val),
-				UserMeta:  []byte{item.UserMeta()},
-				Version:   item.Version(),
-				ExpiresAt: item.ExpiresAt(),
-			}
-
-			// Write entries to disk
-			if err := writeTo(entry, w); err != nil {
-				return err
-			}
-		}
-		return nil
-	})
-	return err
-}
-
-func (s *BadgerStore) Load(r io.Reader) error {
-	return s.db.Load(r)
-}
-
-// Snapshot takes a snapshot of the store, and returns an id
-// to be used in the backup process. The state of the
-// snapshot is stored in the store instance.
-// In badger the id corresponds to the last version stored.
-func (s *BadgerStore) Snapshot() (uint64, error) {
-	var version uint64
-	err := s.db.View(func(txn *b.Txn) error {
-		opts := b.DefaultIteratorOptions
-		opts.PrefetchValues = false
-		opts.Reverse = true
-		it := txn.NewIterator(opts)
-		defer it.Close()
-		// we are using a reversed iterator, so rewinding
-		// takes us to the last possible key
-		it.Rewind()
-		if it.Valid() {
-			item := it.Item()
-			version = item.Version()
-		}
-		return nil
-	})
-	return version, err
-}
-
-func (s *BadgerStore) runVlogGC(db *b.DB, threshold int64) {
-	// Get initial size on start.
-	_, lastVlogSize := db.Size()
-
-	runGC := func() {
-		var err error
-		for err == nil {
-			// If a GC is successful, immediately run it again.
-			log.Debug("VlogGC task: running...")
-			err = db.RunValueLogGC(0.7)
-		}
-		log.Debug("VlogGC task: done.")
-		_, lastVlogSize = db.Size()
-	}
-
-	for {
-		select {
-		case <-s.vlogTicker.C:
-			_, currentVlogSize := db.Size()
-			if currentVlogSize < lastVlogSize+threshold {
-				continue
-			}
-			runGC()
-		case <-s.mandatoryVlogTicker.C:
-			runGC()
-		}
-	}
-}
-
-func (s *BadgerStore) RegisterMetrics(registry metrics.Registry) {
-	// not implemented
-}
diff --git a/storage/badger/badger_store_test.go b/storage/badger/badger_store_test.go
deleted file mode 100644
index 13650de7c..000000000
--- a/storage/badger/badger_store_test.go
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
-	Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A.
-
-	Licensed under the Apache License, Version 2.0 (the "License");
-	you may not use this file except in compliance with the License.
-	You may obtain a copy of the License at
-
-		http://www.apache.org/licenses/LICENSE-2.0
-
-	Unless required by applicable law or agreed to in writing, software
-	distributed under the License is distributed on an "AS IS" BASIS,
-	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-	See the License for the specific language governing permissions and
-	limitations under the License.
-*/
-
-package badger
-
-import (
-	"fmt"
-	"io/ioutil"
-	"os"
-	"path/filepath"
-	"testing"
-
-	"github.com/bbva/qed/storage"
-	"github.com/bbva/qed/testutils/rand"
-	"github.com/bbva/qed/util"
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
-)
-
-func TestMutate(t *testing.T) {
-	store, closeF := openBadgerStore(t)
-	defer closeF()
-
-	tests := []struct {
-		testname      string
-		table         storage.Table
-		key, value    []byte
-		expectedError error
-	}{
-		{"Mutate Key=Value", storage.HistoryCacheTable, []byte("Key"), []byte("Value"), nil},
-	}
-
-	for _, test := range tests {
-		err := store.Mutate([]*storage.Mutation{
-			{test.table, test.key, test.value},
-		})
-		require.Equalf(t, test.expectedError, err, "Error mutating in test: %s", test.testname)
-		_, err = store.Get(test.table, test.key)
-		require.Equalf(t, test.expectedError, err, "Error getting key in test: %s", test.testname)
-	}
-}
-
-func TestGetExistentKey(t *testing.T) {
-
-	store, closeF := openBadgerStore(t)
-	defer closeF()
-
-	testCases := []struct {
-		table         storage.Table
-		key, value    []byte
-		expectedError error
-	}{
-		{storage.HistoryCacheTable, []byte("Key1"), []byte("Value1"), nil},
-		{storage.HistoryCacheTable, []byte("Key2"), []byte("Value2"), nil},
-		{storage.HyperCacheTable, []byte("Key3"), []byte("Value3"), nil},
-		{storage.HyperCacheTable, []byte("Key4"), []byte("Value4"), storage.ErrKeyNotFound},
-	}
-
-	for _, test := range testCases {
-		if test.expectedError == nil {
-			err := store.Mutate([]*storage.Mutation{
-				{test.table, test.key, test.value},
-			})
-			require.NoError(t, err)
-		}
-
-		stored, err := store.Get(test.table, test.key)
-		if test.expectedError == nil {
-			require.NoError(t, err)
-			require.Equalf(t, test.key, stored.Key, "The stored key does not match the original: expected %d, actual %d", test.key, stored.Key)
-			require.Equalf(t, test.value, stored.Value, "The stored value does not match the original: expected %d, actual %d", test.value, stored.Value)
-		} else {
-			require.Equal(t, test.expectedError, err)
-		}
-
-	}
-
-}
-
-func TestGetRange(t *testing.T) {
-	store, closeF := openBadgerStore(t)
-	defer closeF()
-
-	var testCases = []struct {
-		size       int
-		start, end byte
-	}{
-		{40, 10, 50},
-		{0, 1, 9},
-		{11, 1, 20},
-		{10, 40, 60},
-		{0, 60, 100},
-		{0, 20, 10},
-	}
-
-	table := storage.HistoryCacheTable
-	for i := 10; i < 50; i++ {
-		store.Mutate([]*storage.Mutation{
-			{table, []byte{byte(i)}, []byte("Value")},
-		})
-	}
-
-	for _, test := range testCases {
-		slice, err := store.GetRange(table, []byte{test.start}, []byte{test.end})
-		require.NoError(t, err)
-		require.Equalf(t, test.size, len(slice), "Slice length invalid: expected %d, actual %d", test.size, len(slice))
-	}
-
-}
-
-func TestGetAll(t *testing.T) {
-
-	table := storage.HyperCacheTable
-	numElems := uint16(1000)
-	testCases := []struct {
-		batchSize    int
-		numBatches   int
-		lastBatchLen int
-	}{
-		{10, 100, 10},
-		{20, 50, 20},
-		{17, 59, 14},
-	}
-
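-	// e.g. a batchSize of 17 over 1000 elements yields ceil(1000/17) = 59 batches,
-	// the last one holding 1000 - 58*17 = 14 entries.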
-	store, closeF := openBadgerStore(t)
-	defer closeF()
-
-	// insert
-	for i := uint16(0); i < numElems; i++ {
-		key := util.Uint16AsBytes(i)
-		store.Mutate([]*storage.Mutation{
-			{table, key, key},
-		})
-	}
-
-	for i, c := range testCases {
-		reader := store.GetAll(table)
-		numBatches := 0
-		var lastBatchLen int
-		for {
-			entries := make([]*storage.KVPair, c.batchSize)
-			n, _ := reader.Read(entries)
-			if n == 0 {
-				break
-			}
-			numBatches++
-			lastBatchLen = n
-		}
-		reader.Close()
-		assert.Equalf(t, c.numBatches, numBatches, "The number of batches should match for test case %d", i)
-		assert.Equalf(t, c.lastBatchLen, lastBatchLen, "The length of the last batch should match for test case %d", i)
-	}
-
-}
-
-func TestGetLast(t *testing.T) {
-	store, closeF := openBadgerStore(t)
-	defer closeF()
-
-	// insert
-	numElems := uint64(20)
-	tables := []storage.Table{storage.HistoryCacheTable, storage.HyperCacheTable}
-	for _, table := range tables {
-		for i := uint64(0); i < numElems; i++ {
-			key := util.Uint64AsBytes(i)
-			store.Mutate([]*storage.Mutation{
-				{table, key, key},
-			})
-		}
-	}
-
-	// get last element for history table
-	kv, err := store.GetLast(storage.HistoryCacheTable)
-	require.NoError(t, err)
-	require.Equalf(t, util.Uint64AsBytes(numElems-1), kv.Key, "The key should match the last inserted element")
-	require.Equalf(t, util.Uint64AsBytes(numElems-1), kv.Value, "The value should match the last inserted element")
-}
-
-func TestBackupLoad(t *testing.T) {
-	store, closeF := openBadgerStore(t)
-	defer closeF()
-
-	// insert
-	numElems := uint64(20)
-	tables := []storage.Table{storage.HistoryCacheTable, storage.HyperCacheTable}
-	for _, table := range tables {
-		for i := uint64(0); i < numElems; i++ {
-			key := util.Uint64AsBytes(i)
-			store.Mutate([]*storage.Mutation{
-				{table, key, key},
-			})
-		}
-	}
-
-	version, err := store.Snapshot()
-	require.NoError(t, err)
-
-	backupDir := mustTempDir()
-	defer os.RemoveAll(backupDir)
-	backupFile, err := os.Create(filepath.Join(backupDir, "backup"))
-	require.NoError(t, err)
-
-	require.NoError(t, store.Backup(backupFile, version))
-
-	restore, recloseF := openBadgerStore(t)
-	defer recloseF()
-	backupFile.Seek(0, 0) // rewind: Backup left the offset at the end of the file
-	require.NoError(t, restore.Load(backupFile))
-	reversion, err := restore.Snapshot()
-
-	require.NoError(t, err)
-	require.Equal(t, version, reversion, "Error in restored version")
-}
-
-func BenchmarkMutate(b *testing.B) {
-	store, closeF := openBadgerStore(b)
-	defer closeF()
-	b.N = 10000
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		store.Mutate([]*storage.Mutation{
-			{storage.HistoryCacheTable, rand.Bytes(128), []byte("Value")},
-		})
-	}
-
-}
-
-func BenchmarkGet(b *testing.B) {
-	store, closeF := openBadgerStore(b)
-	defer closeF()
-	N := 10000
-	b.N = N
-	var key []byte
-
-	// populate storage
-	for i := 0; i < N; i++ {
-		if i == 10 {
-			key = rand.Bytes(128)
-			store.Mutate([]*storage.Mutation{
-				{storage.HistoryCacheTable, key, []byte("Value")},
-			})
-		} else {
-			store.Mutate([]*storage.Mutation{
-				{storage.HistoryCacheTable, rand.Bytes(128), []byte("Value")},
-			})
-		}
-	}
-
-	b.ResetTimer()
-
-	for i := 0; i < b.N; i++ {
-		store.Get(storage.HistoryCacheTable, key)
-	}
-
-}
-
-func BenchmarkGetRangeInLargeTree(b *testing.B) {
-	store, closeF := openBadgerStore(b)
-	defer closeF()
-	N := 1000000
-
-	// populate storage
-	for i := 0; i < N; i++ {
-		store.Mutate([]*storage.Mutation{
-			{storage.HistoryCacheTable, []byte{byte(i)}, []byte("Value")},
-		})
-	}
-
-	b.ResetTimer()
-
-	b.Run("Small range", func(b *testing.B) {
-		b.N = 10000
-		for i := 0; i < b.N; i++ {
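-			// a range where start == end fetches exactly one existing key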
-			store.GetRange(storage.HistoryCacheTable, []byte{10}, []byte{10})
-		}
-	})
-
-	b.Run("Large range", func(b *testing.B) {
-		b.N = 10000
-		for i := 0; i < b.N; i++ {
-			store.GetRange(storage.HistoryCacheTable, []byte{10}, []byte{35})
-		}
-	})
-
-}
-
-func openBadgerStore(t require.TestingT) (*BadgerStore, func()) {
-	path := mustTempDir()
-
-	store, err := NewBadgerStore(filepath.Join(path, "backupbadger_store_test.db"))
-	if err != nil {
-		t.Errorf("Error opening badger store: %v", err)
-		t.FailNow()
-	}
-	return store, func() {
-		store.Close()
-		os.RemoveAll(path)
-	}
-}
-
-func mustTempDir() string {
-	var err error
-	path, err := ioutil.TempDir("", "backup-test-")
-	if err != nil {
-		panic("failed to create temp dir")
-	}
-	return path
-}
-
-func deleteFile(path string) {
-	err := os.RemoveAll(path)
-	if err != nil {
-		fmt.Printf("Unable to remove db file %s\n", err)
-	}
-}
diff --git a/testutils/storage/storage.go b/testutils/storage/storage.go
index d2c28529a..2a063ec84 100644
--- a/testutils/storage/storage.go
+++ b/testutils/storage/storage.go
@@ -20,7 +20,6 @@ import (
 	"fmt"
 	"os"
 
-	"github.com/bbva/qed/storage/badger"
 	"github.com/bbva/qed/storage/bplus"
 	"github.com/bbva/qed/storage/rocks"
 	"github.com/stretchr/testify/require"
@@ -33,22 +32,6 @@ func OpenBPlusTreeStore() (*bplus.BPlusTreeStore, func()) {
 	}
 }
 
-func OpenBadgerStore(t require.TestingT, path string) (*badger.BadgerStore, func()) {
-	opts := &badger.Options{
-		Path:       path,
-		ValueLogGC: true,
-	}
-	store, err := badger.NewBadgerStoreOpts(opts)
-	if err != nil {
-		t.Errorf("Error opening badger store: %v", err)
-		t.FailNow()
-	}
-	return store, func() {
-		store.Close()
-		deleteFile(path)
-	}
-}
-
 func OpenRocksDBStore(t require.TestingT, path string) (*rocks.RocksDBStore, func()) {
 	store, err := rocks.NewRocksDBStore(path)
 	if err != nil {