From 9f95368192e960be380077a475e3b2345649cd84 Mon Sep 17 00:00:00 2001 From: Alvaro Alda Date: Tue, 19 Mar 2019 11:42:36 +0100 Subject: [PATCH 1/3] Implement a new raft store using RocksDB Former-commit-id: f8719f3e2125425ce2f64d17d1e53a9ffe6e667f --- raftwal/raftrocks/bench_test.go | 104 ++++++++ raftwal/raftrocks/rocksdb_store.go | 334 ++++++++++++++++++++++++ raftwal/raftrocks/rocksdb_store_test.go | 268 +++++++++++++++++++ rocksdb/cache.go | 48 ++++ rocksdb/cf_handle.go | 62 +++++ rocksdb/cf_test.go | 190 ++++++++++++++ rocksdb/db.go | 256 ++++++++++++++++-- rocksdb/options.go | 220 +++++++++++++++- rocksdb/options_block_based_table.go | 42 ++- rocksdb/options_flush.go | 8 +- rocksdb/options_read.go | 18 +- rocksdb/options_write.go | 16 +- rocksdb/test_util.go | 25 +- rocksdb/write_batch.go | 56 +++- rocksdb/write_batch_test.go | 59 +++++ 15 files changed, 1632 insertions(+), 74 deletions(-) create mode 100644 raftwal/raftrocks/bench_test.go create mode 100644 raftwal/raftrocks/rocksdb_store.go create mode 100644 raftwal/raftrocks/rocksdb_store_test.go create mode 100644 rocksdb/cache.go create mode 100644 rocksdb/cf_handle.go create mode 100644 rocksdb/cf_test.go diff --git a/raftwal/raftrocks/bench_test.go b/raftwal/raftrocks/bench_test.go new file mode 100644 index 000000000..9df6c41a0 --- /dev/null +++ b/raftwal/raftrocks/bench_test.go @@ -0,0 +1,104 @@ +/* + Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package raftrocks + +import ( + "os" + "testing" + + "github.com/hashicorp/raft/bench" +) + +func BenchmarkRocksDBStore_FirstIndex(b *testing.B) { + store, path := testRocksDBStore(b) + defer store.Close() + defer os.Remove(path) + + raftbench.FirstIndex(b, store) +} + +func BenchmarkRocksDBStore_LastIndex(b *testing.B) { + store, path := testRocksDBStore(b) + defer store.Close() + defer os.Remove(path) + + raftbench.LastIndex(b, store) +} + +func BenchmarkRocksDBStore_GetLog(b *testing.B) { + store, path := testRocksDBStore(b) + defer store.Close() + defer os.Remove(path) + + raftbench.GetLog(b, store) +} + +func BenchmarkRocksDBStore_StoreLog(b *testing.B) { + store, path := testRocksDBStore(b) + defer store.Close() + defer os.Remove(path) + + raftbench.StoreLog(b, store) +} + +func BenchmarkRocksDBStore_StoreLogs(b *testing.B) { + store, path := testRocksDBStore(b) + defer store.Close() + defer os.Remove(path) + + raftbench.StoreLogs(b, store) +} + +func BenchmarkRocksDBStore_DeleteRange(b *testing.B) { + store, path := testRocksDBStore(b) + defer store.Close() + defer os.Remove(path) + + raftbench.DeleteRange(b, store) +} + +func BenchmarkRocksDBStore_Set(b *testing.B) { + store, path := testRocksDBStore(b) + defer store.Close() + defer os.Remove(path) + + raftbench.Set(b, store) +} + +func BenchmarkRocksDBStore_Get(b *testing.B) { + store, path := testRocksDBStore(b) + defer store.Close() + defer os.Remove(path) + + raftbench.Get(b, store) +} + +func BenchmarkRocksDBStore_SetUint64(b *testing.B) { + store, path := testRocksDBStore(b) + defer store.Close() + defer os.Remove(path) + + raftbench.SetUint64(b, store) +} + +func BenchmarkRocksDBStore_GetUint64(b *testing.B) { + store, path := testRocksDBStore(b) + defer store.Close() + defer os.Remove(path) + + raftbench.GetUint64(b, store) +} diff --git a/raftwal/raftrocks/rocksdb_store.go b/raftwal/raftrocks/rocksdb_store.go new file mode 100644 index 000000000..7b4a5013a --- /dev/null +++ b/raftwal/raftrocks/rocksdb_store.go @@ -0,0 +1,334 @@ +/* + Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package raftrocks + +import ( + "bytes" + "errors" + + "github.com/bbva/qed/rocksdb" + "github.com/bbva/qed/util" + "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/raft" +) + +var ( + // ErrKeyNotFound is an error indicating a given key does not exist + ErrKeyNotFound = errors.New("key not found") +) + +const ( + stableCF string = "stable" + logCF string = "log" +) + +// RocksDBStore provides access to RocksDB for Raft to store and retrieve +// log entries. It also provides key/value storage, and can be used as +// a LogStore and StableStore. +type RocksDBStore struct { + // db is the underlying handle to the db. + db *rocksdb.DB + + // The path to the RocksDB database directory. 
+ path string + ro *rocksdb.ReadOptions + wo *rocksdb.WriteOptions + stableCFHandle *rocksdb.ColumnFamilyHandle + logCFHandle *rocksdb.ColumnFamilyHandle + cfHandles rocksdb.ColumnFamilyHandles + + // global options + globalOpts *rocksdb.Options + // stable options + stableBbto *rocksdb.BlockBasedTableOptions + stableOpts *rocksdb.Options + // log options + logBbto *rocksdb.BlockBasedTableOptions + logOpts *rocksdb.Options + logCache *rocksdb.Cache +} + +// Options contains all the configuration used to open the RocksDB instance. +type Options struct { + // Path is the directory path to the RocksDB instance to use. + Path string + // TODO decide if we should use a diferent directory for the Rocks WAl + + // BadgerOptions contains any specific RocksDB options you might + // want to specify. + + // NoSync causes the database to skip fsync calls after each + // write to the log. This is unsafe, so it should be used + // with caution. + NoSync bool +} + +// NewRocksDBStore takes a file path and returns a connected Raft backend. +func NewRocksDBStore(path string) (*RocksDBStore, error) { + return New(Options{Path: path, NoSync: true}) +} + +// New uses the supplied options to open the RocksDB instance and prepare it for +// use as a raft backend. +func New(options Options) (*RocksDBStore, error) { + + // we need two column families, one for stable store and one for log store: + // stable : used for storing key configurations. + // log : used for storing logs in a durable fashion. + cfNames := []string{stableCF, logCF, "default"} + + defaultOpts := rocksdb.NewDefaultOptions() + + // global options + globalOpts := rocksdb.NewDefaultOptions() + globalOpts.SetCreateIfMissing(true) + globalOpts.SetCreateIfMissingColumnFamilies(true) + + // stable store options + stableBbto := rocksdb.NewDefaultBlockBasedTableOptions() + stableOpts := rocksdb.NewDefaultOptions() + stableOpts.SetBlockBasedTableFactory(stableBbto) + + // log store options + logBbto := rocksdb.NewDefaultBlockBasedTableOptions() + logBbto.SetBlockSize(32 * 1024) + logCache := rocksdb.NewLRUCache(512 * 1024 * 1024) + logBbto.SetBlockCache(logCache) + logOpts := rocksdb.NewDefaultOptions() + logOpts.SetUseFsync(!options.NoSync) + // dio := directIOSupported(options.Path) + // if dio { + // logOpts.SetUseDirectIOForFlushAndCompaction(true) + // } + logOpts.SetCompression(rocksdb.NoCompression) + + // in normal mode, by default, we try to minimize write amplification, + // so we set: + // + // L0 size = 256MBytes * 2 (min_write_buffer_number_to_merge) * \ + // 8 (level0_file_num_compaction_trigger) + // = 4GBytes + // L1 size close to L0, 4GBytes, max_bytes_for_level_base = 4GBytes, + // max_bytes_for_level_multiplier = 2 + // L2 size is 8G, L3 is 16G, L4 is 32G, L5 64G... + // + // note this is the size of a shard, and the content of the store is expected + // to be compacted by raft. + // + logOpts.SetMaxSubCompactions(2) // TODO what's this? 
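	// (Sketch answer to the TODO above, not verified for this workload:
	// max_subcompactions caps how many threads RocksDB may use to split a
	// single compaction job, typically the L0->L1 compaction, into parallel
	// subcompactions; 2 keeps that parallelism modest.)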
+ logOpts.SetEnablePipelinedWrite(true) + logOpts.SetWriteBufferSize(256 * 1024 * 1024) + logOpts.SetMinWriteBufferNumberToMerge(2) + logOpts.SetLevel0FileNumCompactionTrigger(8) + logOpts.SetLevel0SlowdownWritesTrigger(17) + logOpts.SetLevel0StopWritesTrigger(24) + logOpts.SetMaxWriteBufferNumber(25) + logOpts.SetNumLevels(7) + // MaxBytesForLevelBase is the total size of L1, should be close to + // the size of L0 + logOpts.SetMaxBytesForLevelBase(4 * 1024 * 1024 * 1024) + logOpts.SetMaxBytesForLevelMultiplier(2) + // files in L1 will have TargetFileSizeBase bytes + logOpts.SetTargetFileSizeBase(256 * 1024 * 1024) + logOpts.SetTargetFileSizeMultiplier(1) + // IO parallelism + logOpts.SetMaxBackgroundCompactions(2) + logOpts.SetMaxBackgroundFlushes(2) + + cfOpts := []*rocksdb.Options{stableOpts, logOpts, defaultOpts} + db, cfHandles, err := rocksdb.OpenDBColumnFamilies(options.Path, globalOpts, cfNames, cfOpts) + if err != nil { + return nil, err + } + + // read/write options + wo := rocksdb.NewDefaultWriteOptions() + wo.SetSync(!options.NoSync) + ro := rocksdb.NewDefaultReadOptions() + ro.SetFillCache(false) + + return &RocksDBStore{ + db: db, + path: options.Path, + stableCFHandle: cfHandles[0], + logCFHandle: cfHandles[1], + stableBbto: stableBbto, + stableOpts: stableOpts, + logBbto: logBbto, + logOpts: logOpts, + logCache: logCache, + globalOpts: globalOpts, + ro: ro, + wo: wo, + }, nil +} + +// Close is used to gracefully close the DB connection. +func (s *RocksDBStore) Close() error { + if s.db != nil { + s.db.Close() + } + for _, cfh := range s.cfHandles { + cfh.Destroy() + } + if s.stableBbto != nil { + s.stableBbto.Destroy() + } + if s.stableOpts != nil { + s.stableOpts.Destroy() + } + if s.logCache != nil { + s.logCache.Destroy() + } + if s.logBbto != nil { + s.logBbto.Destroy() + } + if s.logOpts != nil { + s.logOpts.Destroy() + } + if s.wo != nil { + s.wo.Destroy() + } + if s.ro != nil { + s.ro.Destroy() + } + s.db = nil + return nil +} + +// FirstIndex returns the first known index from the Raft log. +func (s *RocksDBStore) FirstIndex() (uint64, error) { + it := s.db.NewIteratorCF(rocksdb.NewDefaultReadOptions(), s.logCFHandle) + defer it.Close() + it.SeekToFirst() + if it.Valid() { + slice := it.Key() + defer slice.Free() + key := make([]byte, slice.Size()) + copy(key, slice.Data()) + return util.BytesAsUint64(key), nil + } + return 0, nil +} + +// LastIndex returns the last known index from the Raft log. +func (s *RocksDBStore) LastIndex() (uint64, error) { + it := s.db.NewIteratorCF(rocksdb.NewDefaultReadOptions(), s.logCFHandle) + defer it.Close() + it.SeekToLast() + if it.Valid() { + slice := it.Key() + defer slice.Free() + key := make([]byte, slice.Size()) + copy(key, slice.Data()) + return util.BytesAsUint64(key), nil + } + return 0, nil +} + +// GetLog gets a log entry at a given index. +func (s *RocksDBStore) GetLog(index uint64, log *raft.Log) error { + val, err := s.db.GetBytesCF(s.ro, s.logCFHandle, util.Uint64AsBytes(index)) + if err != nil { + return err + } + if val == nil { + return raft.ErrLogNotFound + } + return decodeMsgPack(val, log) +} + +// StoreLog stores a single raft log. +func (s *RocksDBStore) StoreLog(log *raft.Log) error { + val, err := encodeMsgPack(log) + if err != nil { + return err + } + return s.db.PutCF(s.wo, s.logCFHandle, util.Uint64AsBytes(log.Index), val.Bytes()) +} + +// StoreLogs stores a set of raft logs. 
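// A minimal usage sketch (indices and payloads are illustrative, mirroring the
// tests in rocksdb_store_test.go):
//
//	logs := []*raft.Log{
//		{Index: 1, Data: []byte("op1")},
//		{Index: 2, Data: []byte("op2")},
//	}
//	if err := store.StoreLogs(logs); err != nil {
//		// handle error
//	}
//	var out raft.Log
//	_ = store.GetLog(2, &out) // out now holds the second entry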
+func (s *RocksDBStore) StoreLogs(logs []*raft.Log) error { + batch := rocksdb.NewWriteBatch() + for _, log := range logs { + key := util.Uint64AsBytes(log.Index) + val, err := encodeMsgPack(log) + if err != nil { + return err + } + batch.PutCF(s.logCFHandle, key, val.Bytes()) + } + return s.db.Write(s.wo, batch) +} + +// DeleteRange deletes logs within a given range inclusively. +func (s *RocksDBStore) DeleteRange(min, max uint64) error { + batch := rocksdb.NewWriteBatch() + batch.DeleteRangeCF(s.logCFHandle, util.Uint64AsBytes(min), util.Uint64AsBytes(max+1)) + return s.db.Write(s.wo, batch) +} + +// Set is used to set a key/value set outside of the raft log. +func (s *RocksDBStore) Set(key []byte, val []byte) error { + if err := s.db.PutCF(s.wo, s.stableCFHandle, key, val); err != nil { + return err + } + return nil +} + +// Get is used to retrieve a value from the k/v store by key +func (s *RocksDBStore) Get(key []byte) ([]byte, error) { + val, err := s.db.GetBytesCF(s.ro, s.stableCFHandle, key) + if err != nil { + return nil, err + } + if val == nil { + return nil, ErrKeyNotFound + } + return val, nil +} + +// SetUint64 is like Set, but handles uint64 values +func (s *RocksDBStore) SetUint64(key []byte, val uint64) error { + return s.Set(key, util.Uint64AsBytes(val)) +} + +// GetUint64 is like Get, but handles uint64 values +func (s *RocksDBStore) GetUint64(key []byte) (uint64, error) { // TODO use GEt instead getBytes + val, err := s.Get(key) + if err != nil { + return 0, err + } + return util.BytesAsUint64(val), nil +} + +// Decode reverses the encode operation on a byte slice input +func decodeMsgPack(buf []byte, out interface{}) error { + r := bytes.NewBuffer(buf) + hd := codec.MsgpackHandle{} + dec := codec.NewDecoder(r, &hd) + return dec.Decode(out) +} + +// Encode writes an encoded object to a new bytes buffer +func encodeMsgPack(in interface{}) (*bytes.Buffer, error) { + buf := bytes.NewBuffer(nil) + hd := codec.MsgpackHandle{} + enc := codec.NewEncoder(buf, &hd) + err := enc.Encode(in) + return buf, err +} diff --git a/raftwal/raftrocks/rocksdb_store_test.go b/raftwal/raftrocks/rocksdb_store_test.go new file mode 100644 index 000000000..19c7ad369 --- /dev/null +++ b/raftwal/raftrocks/rocksdb_store_test.go @@ -0,0 +1,268 @@ +/* + Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ +package raftrocks + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/bbva/qed/rocksdb" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/raft" +) + +func testRocksDBStore(t testing.TB) (*RocksDBStore, string) { + path, err := ioutil.TempDir("", "raftrocks") + require.NoError(t, err) + os.RemoveAll(path) + + // Successfully creates and returns a store + store, err := NewRocksDBStore(path) + require.NoError(t, err) + + return store, path +} + +func testRaftLog(idx uint64, data string) *raft.Log { + return &raft.Log{ + Data: []byte(data), + Index: idx, + } +} + +func TestRocksDBStore_Implements(t *testing.T) { + var store interface{} = &RocksDBStore{} + if _, ok := store.(raft.StableStore); !ok { + t.Fatalf("RocksDBStore does not implement raft.StableStore") + } + if _, ok := store.(raft.LogStore); !ok { + t.Fatalf("RocksDBStore does not implement raft.LogStore") + } +} + +func TestNewRocksDBStore(t *testing.T) { + + store, path := testRocksDBStore(t) + + // Ensure the directory was created + require.Equal(t, path, store.path) + if _, err := os.Stat(path); os.IsNotExist(err) { + t.Fatalf("err: %s", err) + } + + // Close the store so we can open again + require.NoError(t, store.Close()) + + // Ensure our files were created + opts := rocksdb.NewDefaultOptions() + opts.SetCreateIfMissing(false) + _, err := rocksdb.OpenDBForReadOnly(path, opts, true) + require.NoError(t, err) + +} + +func TestRocksDBStore_FirstIndex(t *testing.T) { + store, path := testRocksDBStore(t) + defer store.Close() + defer os.RemoveAll(path) + + // Should get 0 index on empty log + idx, err := store.FirstIndex() + require.NoError(t, err) + require.Equal(t, uint64(0), idx) + + // Set a mock raft log + logs := []*raft.Log{ + testRaftLog(1, "log1"), + testRaftLog(2, "log2"), + testRaftLog(3, "log3"), + } + require.NoError(t, store.StoreLogs(logs)) + + // Fetch the first Raft index + idx, err = store.FirstIndex() + require.NoError(t, err) + require.Equal(t, uint64(1), idx) +} + +func TestRocksDBStore_LastIndex(t *testing.T) { + store, path := testRocksDBStore(t) + defer store.Close() + defer os.RemoveAll(path) + + // Should get 0 index on empty log + idx, err := store.LastIndex() + require.NoError(t, err) + require.Equal(t, uint64(0), idx) + + // Set a mock raft log + logs := []*raft.Log{ + testRaftLog(1, "log1"), + testRaftLog(2, "log2"), + testRaftLog(3, "log3"), + } + require.NoError(t, store.StoreLogs(logs)) + + // Fetch the last Raft index + idx, err = store.LastIndex() + if err != nil { + t.Fatalf("err: %s", err) + } + require.NoError(t, err) + require.Equal(t, uint64(3), idx) +} + +func TestRocksDBStore_GetLog(t *testing.T) { + store, path := testRocksDBStore(t) + defer store.Close() + defer os.RemoveAll(path) + + log := new(raft.Log) + + // Should return an error on non-existent log + err := store.GetLog(1, log) + require.Equalf(t, err, raft.ErrLogNotFound, "Expected raft log not found") + + // Set a mock raft log + logs := []*raft.Log{ + testRaftLog(1, "log1"), + testRaftLog(2, "log2"), + testRaftLog(3, "log3"), + } + require.NoError(t, store.StoreLogs(logs)) + + // Should return the proper log + require.NoError(t, store.GetLog(2, log)) + require.Equal(t, log, logs[1]) +} + +func TestRocksDBStore_SetLog(t *testing.T) { + store, path := testRocksDBStore(t) + defer store.Close() + defer os.Remove(path) + + // Create the log + log := &raft.Log{ + Data: []byte("log1"), + Index: 1, + } + + // Attempt to store the log + require.NoError(t, store.StoreLog(log)) + + // 
Retrieve the log again + result := new(raft.Log) + require.NoError(t, store.GetLog(1, result)) + + // Ensure the log comes back the same + require.Equal(t, log, result) +} + +func TestRocksDBStore_SetLogs(t *testing.T) { + store, path := testRocksDBStore(t) + defer store.Close() + defer os.Remove(path) + + // Create a set of logs + logs := []*raft.Log{ + testRaftLog(1, "log1"), + testRaftLog(2, "log2"), + } + + // Attempt to store the logs + require.NoError(t, store.StoreLogs(logs)) + + // Ensure we stored them all + result1, result2 := new(raft.Log), new(raft.Log) + require.NoError(t, store.GetLog(1, result1)) + require.Equal(t, logs[0], result1) + + require.NoError(t, store.GetLog(2, result2)) + require.Equal(t, logs[1], result2) +} + +func TestRocksDBStore_DeleteRange(t *testing.T) { + store, path := testRocksDBStore(t) + defer store.Close() + defer os.Remove(path) + + // Create a set of logs + log1 := testRaftLog(1, "log1") + log2 := testRaftLog(2, "log2") + log3 := testRaftLog(3, "log3") + logs := []*raft.Log{log1, log2, log3} + + // Attempt to store the logs + require.NoError(t, store.StoreLogs(logs)) + + // Attempt to delete a range of logs + require.NoError(t, store.DeleteRange(1, 2)) + + // Ensure the logs were deleted + err := store.GetLog(1, new(raft.Log)) + require.Error(t, err) + require.Equal(t, raft.ErrLogNotFound, err) + + err = store.GetLog(2, new(raft.Log)) + require.Error(t, err) + require.Equal(t, raft.ErrLogNotFound, err) + +} + +func TestRocksDBStore_Set_Get(t *testing.T) { + store, path := testRocksDBStore(t) + defer store.Close() + defer os.Remove(path) + + // Returns error on non-existent key + _, err := store.Get([]byte("bad")) + require.Error(t, err) + require.Equal(t, err, ErrKeyNotFound) + + k, v := []byte("hello"), []byte("world") + + // Try to set a k/v pair + require.NoError(t, store.Set(k, v)) + + // Try to read it back + val, err := store.Get(k) + require.NoError(t, err) + require.Equal(t, v, val) +} + +func TestRocksDBStore_SetUint64_GetUint64(t *testing.T) { + store, path := testRocksDBStore(t) + defer store.Close() + defer os.Remove(path) + + // Returns error on non-existent key + _, err := store.GetUint64([]byte("bad")) + require.Error(t, err) + require.Equal(t, err, ErrKeyNotFound) + + k, v := []byte("abc"), uint64(123) + + // Attempt to set the k/v pair + require.NoError(t, store.SetUint64(k, v)) + + // Read back the value + val, err := store.GetUint64(k) + require.NoError(t, err) + require.Equal(t, v, val) +} diff --git a/rocksdb/cache.go b/rocksdb/cache.go new file mode 100644 index 000000000..ea38a6b83 --- /dev/null +++ b/rocksdb/cache.go @@ -0,0 +1,48 @@ +/* + Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package rocksdb + +// #include +import "C" + +// Cache is a cache used to store data read from data in memory. +type Cache struct { + c *C.rocksdb_cache_t +} + +// NewLRUCache creates a new LRU Cache object with the given capacity. 
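// A minimal usage sketch (the 512 MiB size is illustrative; this mirrors how
// the raft log store wires a cache into its block-based table options):
//
//	cache := NewLRUCache(512 * 1024 * 1024)
//	bbto := NewDefaultBlockBasedTableOptions()
//	bbto.SetBlockCache(cache)
//	// attach bbto via Options.SetBlockBasedTableFactory; Destroy when done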
+func NewLRUCache(capacity int) *Cache { + return &Cache{ + c: C.rocksdb_cache_create_lru(C.size_t(capacity)), + } +} + +// GetUsage returns the Cache memory usage. +func (c *Cache) GetUsage() int { + return int(C.rocksdb_cache_get_usage(c.c)) +} + +// GetPinnedUsage returns the Cache pinned memory usage. +func (c *Cache) GetPinnedUsage() int { + return int(C.rocksdb_cache_get_pinned_usage(c.c)) +} + +// Destroy deallocates the Cache object. +func (c *Cache) Destroy() { + C.rocksdb_cache_destroy(c.c) + c.c = nil +} diff --git a/rocksdb/cf_handle.go b/rocksdb/cf_handle.go new file mode 100644 index 000000000..9b2f676b7 --- /dev/null +++ b/rocksdb/cf_handle.go @@ -0,0 +1,62 @@ +/* + Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package rocksdb + +// #include +// #include +import "C" +import ( + "reflect" + "unsafe" +) + +// ColumnFamilyHandle represents a handle to a ColumnFamily. +type ColumnFamilyHandle struct { + c *C.rocksdb_column_family_handle_t +} + +// NewColumnFamilyHandle creates a ColumnFamilyHandle object. +func NewColumnFamilyHandle(c *C.rocksdb_column_family_handle_t) *ColumnFamilyHandle { + return &ColumnFamilyHandle{c} +} + +// UnsafeGetCFHandler returns the underlying c column family handle. +func (h *ColumnFamilyHandle) UnsafeGetCFHandler() unsafe.Pointer { + return unsafe.Pointer(h.c) +} + +// Destroy calls the destructor of the underlying column family handle. +func (h *ColumnFamilyHandle) Destroy() { + C.rocksdb_column_family_handle_destroy(h.c) +} + +type ColumnFamilyHandles []*ColumnFamilyHandle + +func (cfs ColumnFamilyHandles) toCSlice() columnFamilySlice { + cCFs := make(columnFamilySlice, len(cfs)) + for i, cf := range cfs { + cCFs[i] = cf.c + } + return cCFs +} + +type columnFamilySlice []*C.rocksdb_column_family_handle_t + +func (s columnFamilySlice) c() **C.rocksdb_column_family_handle_t { + sH := (*reflect.SliceHeader)(unsafe.Pointer(&s)) + return (**C.rocksdb_column_family_handle_t)(unsafe.Pointer(sH.Data)) +} diff --git a/rocksdb/cf_test.go b/rocksdb/cf_test.go new file mode 100644 index 000000000..24be5a108 --- /dev/null +++ b/rocksdb/cf_test.go @@ -0,0 +1,190 @@ +/* + Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package rocksdb + +import ( + "io/ioutil" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestOpenDBColumnFamilies(t *testing.T) { + + dir, err := ioutil.TempDir("", "rocksdb-TestOpenDBColumnFamilies") + require.NoError(t, err) + + givenNames := []string{"default", "other"} + opts := NewDefaultOptions() + opts.SetCreateIfMissingColumnFamilies(true) + opts.SetCreateIfMissing(true) + + db, cfh, err := OpenDBColumnFamilies( + dir, opts, givenNames, []*Options{opts, opts}, + ) + require.NoError(t, err) + defer db.Close() + + require.Equal(t, len(cfh), 2) + cfh[0].Destroy() + cfh[1].Destroy() + + actualNames, err := ListColumnFamilies(dir, opts) + require.NoError(t, err) + require.ElementsMatch(t, actualNames, givenNames) + +} + +func TestColumnFamilyBatchPutGet(t *testing.T) { + db, cfh, closeF := newTestDBCF(t, "TestColumnFamilyBatchPutGet") + defer closeF() + + wo := NewDefaultWriteOptions() + defer wo.Destroy() + ro := NewDefaultReadOptions() + defer ro.Destroy() + + key0 := []byte("hello0") + value0 := []byte("world0") + key1 := []byte("hello1") + value1 := []byte("world1") + + batch0 := NewWriteBatch() + defer batch0.Destroy() + batch0.PutCF(cfh[0], key0, value0) + require.NoError(t, db.Write(wo, batch0)) + actualValue0, err := db.GetCF(ro, cfh[0], key0) + defer actualValue0.Free() + require.NoError(t, err) + require.Equal(t, actualValue0.Data(), value0) + + batch1 := NewWriteBatch() + defer batch1.Destroy() + batch1.PutCF(cfh[1], key1, value1) + require.NoError(t, db.Write(wo, batch1)) + actualValue1, err := db.GetCF(ro, cfh[1], key1) + defer actualValue1.Free() + require.NoError(t, err) + require.Equal(t, actualValue1.Data(), value1) + + // check the keys are not inserted in different CF + actualValue, err := db.GetCF(ro, cfh[0], key1) + require.NoError(t, err) + require.Equal(t, actualValue.Size(), 0) + actualValue, err = db.GetCF(ro, cfh[1], key0) + require.NoError(t, err) + require.Equal(t, actualValue.Size(), 0) + +} + +func TestColumnFamilyPutGetDelete(t *testing.T) { + db, cfh, closeF := newTestDBCF(t, "TestColumnFamilyPutGetDelete") + defer closeF() + + wo := NewDefaultWriteOptions() + defer wo.Destroy() + ro := NewDefaultReadOptions() + defer ro.Destroy() + + key0 := []byte("hello0") + value0 := []byte("world0") + key1 := []byte("hello1") + value1 := []byte("world1") + + require.NoError(t, db.PutCF(wo, cfh[0], key0, value0)) + actualValue0, err := db.GetCF(ro, cfh[0], key0) + defer actualValue0.Free() + require.NoError(t, err) + require.Equal(t, actualValue0.Data(), value0) + + require.NoError(t, db.PutCF(wo, cfh[1], key1, value1)) + actualValue1, err := db.GetCF(ro, cfh[1], key1) + defer actualValue1.Free() + require.NoError(t, err) + require.Equal(t, actualValue1.Data(), value1) + + actualValue, err := db.GetCF(ro, cfh[0], key1) + require.NoError(t, err) + require.Equal(t, actualValue.Size(), 0) + actualValue, err = db.GetCF(ro, cfh[1], key0) + require.NoError(t, err) + require.Equal(t, actualValue.Size(), 0) + + require.NoError(t, db.DeleteCF(wo, cfh[0], key0)) + actualValue, err = db.GetCF(ro, cfh[0], key0) + require.NoError(t, err) + +} + +func TestDeleteRangeCF(t *testing.T) { + + db, cfh, closeF := newTestDBCF(t, "TestColumnFamilyPutGetDelete") + defer closeF() + + wo := NewDefaultWriteOptions() + defer wo.Destroy() + ro := NewDefaultReadOptions() + defer ro.Destroy() + + var ( + key1 = []byte("key1") + key2 = []byte("key2") + key3 = []byte("key3") + key4 = []byte("key4") + val1 = []byte("value") + val2 = []byte("12345678") + val3 = 
[]byte("abcdefg") + val4 = []byte("xyz") + ) + + require.NoError(t, db.PutCF(wo, cfh[0], key1, val1)) + require.NoError(t, db.PutCF(wo, cfh[0], key2, val2)) + require.NoError(t, db.PutCF(wo, cfh[1], key3, val3)) + require.NoError(t, db.PutCF(wo, cfh[1], key4, val4)) + + actualVal1, err := db.GetBytesCF(ro, cfh[0], key1) + require.NoError(t, err) + require.Equal(t, actualVal1, val1) + actualVal2, err := db.GetBytesCF(ro, cfh[0], key2) + require.NoError(t, err) + require.Equal(t, actualVal2, val2) + actualVal3, err := db.GetBytesCF(ro, cfh[1], key3) + require.NoError(t, err) + require.Equal(t, actualVal3, val3) + actualVal4, err := db.GetBytesCF(ro, cfh[1], key4) + require.NoError(t, err) + require.Equal(t, actualVal4, val4) + + batch := NewWriteBatch() + defer batch.Destroy() + batch.DeleteRangeCF(cfh[0], key2, key4) // only keys from "defaul" cf + db.Write(wo, batch) + + actualVal1, err = db.GetBytesCF(ro, cfh[0], key1) + require.NoError(t, err) + require.Equal(t, actualVal1, val1) + actualVal2, err = db.GetBytesCF(ro, cfh[0], key2) + require.NoError(t, err) + require.Nil(t, actualVal2) // <- the only one deleted + actualVal3, err = db.GetBytesCF(ro, cfh[1], key3) + require.NoError(t, err) + require.Equal(t, actualVal3, val3) + actualVal4, err = db.GetBytesCF(ro, cfh[1], key4) + require.NoError(t, err) + require.Equal(t, actualVal4, val4) + +} diff --git a/rocksdb/db.go b/rocksdb/db.go index f440d702c..63eef90dd 100644 --- a/rocksdb/db.go +++ b/rocksdb/db.go @@ -26,7 +26,7 @@ import ( // DB is a reusable handler to a RocksDB database on disk, created by OpenDB. type DB struct { - db *C.rocksdb_t + c *C.rocksdb_t opts *Options } @@ -36,41 +36,186 @@ func OpenDB(path string, opts *Options) (*DB, error) { cPath := C.CString(path) defer C.free(unsafe.Pointer(cPath)) - db := C.rocksdb_open(opts.opts, cPath, &cErr) + db := C.rocksdb_open(opts.c, cPath, &cErr) if cErr != nil { defer C.free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return &DB{ - db: db, + c: db, opts: opts, }, nil } -// OpenDBForReadOnly opens a database with the specified options for readonly usage. +// OpenDBForReadOnly opens a database with the specified options for read-only usage. func OpenDBForReadOnly(path string, opts *Options, errorIfLogFileExist bool) (*DB, error) { var cErr *C.char cPath := C.CString(path) defer C.free(unsafe.Pointer(cPath)) - db := C.rocksdb_open_for_read_only(opts.opts, cPath, boolToUchar(errorIfLogFileExist), &cErr) + db := C.rocksdb_open_for_read_only(opts.c, cPath, boolToUchar(errorIfLogFileExist), &cErr) if cErr != nil { defer C.free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return &DB{ - db: db, + c: db, opts: opts, }, nil } +// OpenDBColumnFamilies opens a database with the specified column families. 
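// A minimal sketch of opening a DB with two column families (the path and
// names are illustrative; see also cf_test.go above):
//
//	opts := NewDefaultOptions()
//	opts.SetCreateIfMissing(true)
//	opts.SetCreateIfMissingColumnFamilies(true)
//	db, cfHandles, err := OpenDBColumnFamilies("/tmp/db", opts,
//		[]string{"default", "other"}, []*Options{opts, opts})
//	// each handle must be Destroy()ed and the db Close()d when done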
+func OpenDBColumnFamilies( + path string, + opts *Options, + cfNames []string, + cfOpts []*Options, +) (*DB, ColumnFamilyHandles, error) { + + numColumnFamilies := len(cfNames) + if numColumnFamilies != len(cfOpts) { + return nil, nil, errors.New("must provide the same number of column family names and options") + } + + cPath := C.CString(path) + defer C.free(unsafe.Pointer(cPath)) + + cNames := make([]*C.char, numColumnFamilies) + for i, s := range cfNames { + cNames[i] = C.CString(s) + } + defer func() { + for _, s := range cNames { + C.free(unsafe.Pointer(s)) + } + }() + + cOpts := make([]*C.rocksdb_options_t, numColumnFamilies) + for i, o := range cfOpts { + cOpts[i] = o.c + } + + cHandles := make([]*C.rocksdb_column_family_handle_t, numColumnFamilies) + + var cErr *C.char + db := C.rocksdb_open_column_families( + opts.c, + cPath, + C.int(numColumnFamilies), + &cNames[0], + &cOpts[0], + &cHandles[0], + &cErr, + ) + if cErr != nil { + defer C.free(unsafe.Pointer(cErr)) + return nil, nil, errors.New(C.GoString(cErr)) + } + + cfHandles := make([]*ColumnFamilyHandle, numColumnFamilies) + for i, c := range cHandles { + cfHandles[i] = NewColumnFamilyHandle(c) + } + + return &DB{ + c: db, + opts: opts, + }, cfHandles, nil +} + +// OpenDBForReadOnlyColumnFamilies opens a database with the specified column +// families in read-only mode. +func OpenDBForReadOnlyColumnFamilies( + path string, + opts *Options, + cfNames []string, + cfOpts []*Options, + errorIfLogFileExist bool, +) (*DB, ColumnFamilyHandles, error) { + + numColumnFamilies := len(cfNames) + if numColumnFamilies != len(cfOpts) { + return nil, nil, errors.New("must provide the same number of column family names and options") + } + + cPath := C.CString(path) + defer C.free(unsafe.Pointer(cPath)) + + cNames := make([]*C.char, numColumnFamilies) + for i, s := range cfNames { + cNames[i] = C.CString(s) + } + defer func() { + for _, s := range cNames { + C.free(unsafe.Pointer(s)) + } + }() + + cOpts := make([]*C.rocksdb_options_t, numColumnFamilies) + for i, o := range cfOpts { + cOpts[i] = o.c + } + + cHandles := make([]*C.rocksdb_column_family_handle_t, numColumnFamilies) + + var cErr *C.char + db := C.rocksdb_open_for_read_only_column_families( + opts.c, + cPath, + C.int(numColumnFamilies), + &cNames[0], + &cOpts[0], + &cHandles[0], + boolToUchar(errorIfLogFileExist), + &cErr, + ) + if cErr != nil { + defer C.free(unsafe.Pointer(cErr)) + return nil, nil, errors.New(C.GoString(cErr)) + } + + cfHandles := make([]*ColumnFamilyHandle, numColumnFamilies) + for i, c := range cHandles { + cfHandles[i] = NewColumnFamilyHandle(c) + } + + return &DB{ + c: db, + opts: opts, + }, cfHandles, nil +} + +// ListColumnFamilies lists the names of the column families in the DB. +func ListColumnFamilies(path string, opts *Options) ([]string, error) { + var cErr *C.char + var cLen C.size_t + var cPath = C.CString(path) + defer C.free(unsafe.Pointer(cPath)) + + cNames := C.rocksdb_list_column_families(opts.c, cPath, &cLen, &cErr) + if cErr != nil { + defer C.free(unsafe.Pointer(cErr)) + return nil, errors.New(C.GoString(cErr)) + } + + namesLen := int(cLen) + names := make([]string, namesLen) + cNamesArr := (*[1 << 30]*C.char)(unsafe.Pointer(cNames))[:namesLen:namesLen] + for i, n := range cNamesArr { + names[i] = C.GoString(n) + } + + C.rocksdb_list_column_families_destroy(cNames, cLen) + return names, nil +} + // Close closes the database. 
func (db *DB) Close() error { - if db.db != nil { - C.rocksdb_close(db.db) - db.db = nil + if db.c != nil { + C.rocksdb_close(db.c) + db.c = nil } db.opts.Destroy() return nil @@ -79,7 +224,7 @@ func (db *DB) Close() error { // NewCheckpoint creates a new Checkpoint for this db. func (db *DB) NewCheckpoint() (*Checkpoint, error) { var cErr *C.char - cCheckpoint := C.rocksdb_checkpoint_object_create(db.db, &cErr) + cCheckpoint := C.rocksdb_checkpoint_object_create(db.c, &cErr) if cErr != nil { defer C.free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) @@ -88,11 +233,24 @@ func (db *DB) NewCheckpoint() (*Checkpoint, error) { } // Put writes data associated with a key to the database. -func (db *DB) Put(opts *WriteOptions, key, value []byte) error { +func (db *DB) Put(wo *WriteOptions, key, value []byte) error { + cKey := bytesToChar(key) + cValue := bytesToChar(value) + var cErr *C.char + C.rocksdb_put(db.c, wo.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr) + if cErr != nil { + defer C.free(unsafe.Pointer(cErr)) + return errors.New(C.GoString(cErr)) + } + return nil +} + +// PutCF writes data associated with a key to the database and a column family. +func (db *DB) PutCF(wo *WriteOptions, cf *ColumnFamilyHandle, key, value []byte) error { cKey := bytesToChar(key) cValue := bytesToChar(value) var cErr *C.char - C.rocksdb_put(db.db, opts.opts, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr) + C.rocksdb_put_cf(db.c, wo.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr) if cErr != nil { defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) @@ -101,11 +259,11 @@ func (db *DB) Put(opts *WriteOptions, key, value []byte) error { } // Get returns the data associated with the key from the database. -func (db *DB) Get(opts *ReadOptions, key []byte) (*Slice, error) { +func (db *DB) Get(ro *ReadOptions, key []byte) (*Slice, error) { var cErr *C.char var cValueLen C.size_t cKey := bytesToChar(key) - cValue := C.rocksdb_get(db.db, opts.opts, cKey, C.size_t(len(key)), &cValueLen, &cErr) + cValue := C.rocksdb_get(db.c, ro.c, cKey, C.size_t(len(key)), &cValueLen, &cErr) if cErr != nil { defer C.free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) @@ -114,11 +272,42 @@ func (db *DB) Get(opts *ReadOptions, key []byte) (*Slice, error) { } // GetBytes is like Get but returns a copy of the data instead of a Slice. -func (db *DB) GetBytes(opts *ReadOptions, key []byte) ([]byte, error) { +func (db *DB) GetBytes(ro *ReadOptions, key []byte) ([]byte, error) { var cErr *C.char var cValueLen C.size_t cKey := bytesToChar(key) - cValue := C.rocksdb_get(db.db, opts.opts, cKey, C.size_t(len(key)), &cValueLen, &cErr) + cValue := C.rocksdb_get(db.c, ro.c, cKey, C.size_t(len(key)), &cValueLen, &cErr) + if cErr != nil { + defer C.free(unsafe.Pointer(cErr)) + return nil, errors.New(C.GoString(cErr)) + } + if cValue == nil { + return nil, nil + } + defer C.free(unsafe.Pointer(cValue)) + return C.GoBytes(unsafe.Pointer(cValue), C.int(cValueLen)), nil +} + +// GetCF returns the data associated with the key from the database +// and column family. 
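// Usage sketch (cf is a handle from OpenDBColumnFamilies; wo and ro are
// Write/ReadOptions; mirrors TestColumnFamilyPutGetDelete above):
//
//	_ = db.PutCF(wo, cf, []byte("hello"), []byte("world"))
//	val, err := db.GetCF(ro, cf, []byte("hello"))
//	if err == nil {
//		defer val.Free()
//		_ = val.Data() // []byte("world")
//	}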
+func (db *DB) GetCF(ro *ReadOptions, cf *ColumnFamilyHandle, key []byte) (*Slice, error) { + var cErr *C.char + var cValueLen C.size_t + cKey := bytesToChar(key) + cValue := C.rocksdb_get_cf(db.c, ro.c, cf.c, cKey, C.size_t(len(key)), &cValueLen, &cErr) + if cErr != nil { + defer C.free(unsafe.Pointer(cErr)) + return nil, errors.New(C.GoString(cErr)) + } + return NewSlice(cValue, cValueLen), nil +} + +// GetBytesCF is like GetCF but returns a copy of the data instead of a Slice. +func (db *DB) GetBytesCF(ro *ReadOptions, cf *ColumnFamilyHandle, key []byte) ([]byte, error) { + var cErr *C.char + var cValueLen C.size_t + cKey := bytesToChar(key) + cValue := C.rocksdb_get_cf(db.c, ro.c, cf.c, cKey, C.size_t(len(key)), &cValueLen, &cErr) if cErr != nil { defer C.free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) @@ -131,10 +320,22 @@ func (db *DB) GetBytes(opts *ReadOptions, key []byte) ([]byte, error) { } // Delete removes the data associated with the key from the database. -func (db *DB) Delete(opts *WriteOptions, key []byte) error { +func (db *DB) Delete(wo *WriteOptions, key []byte) error { var cErr *C.char cKey := bytesToChar(key) - C.rocksdb_delete(db.db, opts.opts, cKey, C.size_t(len(key)), &cErr) + C.rocksdb_delete(db.c, wo.c, cKey, C.size_t(len(key)), &cErr) + if cErr != nil { + defer C.free(unsafe.Pointer(cErr)) + return errors.New(C.GoString(cErr)) + } + return nil +} + +// DeleteCF removes the data associated with the key from the database and column family. +func (db *DB) DeleteCF(wo *WriteOptions, cf *ColumnFamilyHandle, key []byte) error { + var cErr *C.char + cKey := bytesToChar(key) + C.rocksdb_delete_cf(db.c, wo.c, cf.c, cKey, C.size_t(len(key)), &cErr) if cErr != nil { defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) @@ -143,9 +344,9 @@ func (db *DB) Delete(opts *WriteOptions, key []byte) error { } // Write writes a WriteBatch to the database -func (db *DB) Write(opts *WriteOptions, batch *WriteBatch) error { +func (db *DB) Write(wo *WriteOptions, batch *WriteBatch) error { var cErr *C.char - C.rocksdb_write(db.db, opts.opts, batch.batch, &cErr) + C.rocksdb_write(db.c, wo.c, batch.c, &cErr) if cErr != nil { defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) @@ -155,15 +356,22 @@ func (db *DB) Write(opts *WriteOptions, batch *WriteBatch) error { // NewIterator returns an Iterator over the the database that uses the // ReadOptions given. -func (db *DB) NewIterator(opts *ReadOptions) *Iterator { - cIter := C.rocksdb_create_iterator(db.db, opts.opts) +func (db *DB) NewIterator(ro *ReadOptions) *Iterator { + cIter := C.rocksdb_create_iterator(db.c, ro.c) + return NewNativeIterator(unsafe.Pointer(cIter)) +} + +// NewIteratorCF returns an Iterator over the the database and column family +// that uses the ReadOptions given. +func (db *DB) NewIteratorCF(ro *ReadOptions, cf *ColumnFamilyHandle) *Iterator { + cIter := C.rocksdb_create_iterator_cf(db.c, ro.c, cf.c) return NewNativeIterator(unsafe.Pointer(cIter)) } // Flush triggers a manuel flush for the database. 
-func (db *DB) Flush(opts *FlushOptions) error { +func (db *DB) Flush(fo *FlushOptions) error { var cErr *C.char - C.rocksdb_flush(db.db, opts.opts, &cErr) + C.rocksdb_flush(db.c, fo.c, &cErr) if cErr != nil { defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) diff --git a/rocksdb/options.go b/rocksdb/options.go index 4955daba7..8ef7add0a 100644 --- a/rocksdb/options.go +++ b/rocksdb/options.go @@ -36,7 +36,7 @@ const ( // Options represent all of the available options when opening a database with Open. type Options struct { - opts *C.rocksdb_options_t + c *C.rocksdb_options_t // Hold references for GC. bbto *BlockBasedTableOptions @@ -44,14 +44,14 @@ type Options struct { // NewDefaultOptions creates the default Options. func NewDefaultOptions() *Options { - return &Options{opts: C.rocksdb_options_create()} + return &Options{c: C.rocksdb_options_create()} } // SetCreateIfMissing specifies whether the database // should be created if it is missing. // Default: false func (o *Options) SetCreateIfMissing(value bool) { - C.rocksdb_options_set_create_if_missing(o.opts, boolToUchar(value)) + C.rocksdb_options_set_create_if_missing(o.c, boolToUchar(value)) } // IncreaseParallelism sets the level of parallelism. @@ -62,7 +62,7 @@ func (o *Options) SetCreateIfMissing(value bool) { // cores. You almost definitely want to call this function if your system is // bottlenecked by RocksDB. func (o *Options) IncreaseParallelism(totalThreads int) { - C.rocksdb_options_increase_parallelism(o.opts, C.int(totalThreads)) + C.rocksdb_options_increase_parallelism(o.c, C.int(totalThreads)) } // SetMaxWriteBufferNumber sets the maximum number of write buffers (memtables) @@ -72,7 +72,7 @@ func (o *Options) IncreaseParallelism(totalThreads int) { // storage, new writes can continue to the other write buffer. // Default: 2 func (o *Options) SetMaxWriteBufferNumber(value int) { - C.rocksdb_options_set_max_write_buffer_number(o.opts, C.int(value)) + C.rocksdb_options_set_max_write_buffer_number(o.c, C.int(value)) } // SetMinWriteBufferNumberToMerge sets the minimum number of write buffers @@ -85,13 +85,185 @@ func (o *Options) SetMaxWriteBufferNumber(value int) { // individual write buffers. // Default: 1 func (o *Options) SetMinWriteBufferNumberToMerge(value int) { - C.rocksdb_options_set_min_write_buffer_number_to_merge(o.opts, C.int(value)) + C.rocksdb_options_set_min_write_buffer_number_to_merge(o.c, C.int(value)) } // SetBlockBasedTableFactory sets the block based table factory. func (o *Options) SetBlockBasedTableFactory(value *BlockBasedTableOptions) { o.bbto = value - C.rocksdb_options_set_block_based_table_factory(o.opts, value.opts) + C.rocksdb_options_set_block_based_table_factory(o.c, value.c) +} + +// SetCreateIfMissingColumnFamilies specifies whether the column families +// should be created if they are missing. +func (o *Options) SetCreateIfMissingColumnFamilies(value bool) { + C.rocksdb_options_set_create_missing_column_families(o.c, boolToUchar(value)) +} + +// SetCompression sets the compression algorithm. +// Default: SnappyCompression, which gives lightweight but fast +// compression. +func (o *Options) SetCompression(value CompressionType) { + C.rocksdb_options_set_compression(o.c, C.int(value)) +} + +// SetNumLevels sets the number of levels for this database. +// Default: 7 +func (o *Options) SetNumLevels(value int) { + C.rocksdb_options_set_num_levels(o.c, C.int(value)) +} + +// SetLevel0FileNumCompactionTrigger sets the number of files +// to trigger level-0 compaction. 
+// +// A value <0 means that level-0 compaction will not be +// triggered by number of files at all. +// Default: 4 +func (o *Options) SetLevel0FileNumCompactionTrigger(value int) { + C.rocksdb_options_set_level0_file_num_compaction_trigger(o.c, C.int(value)) +} + +// SetLevel0SlowdownWritesTrigger sets the soft limit on number of level-0 files. +// +// We start slowing down writes at this point. +// A value <0 means that no writing slow down will be triggered by +// number of files in level-0. +// Default: 8 +func (o *Options) SetLevel0SlowdownWritesTrigger(value int) { + C.rocksdb_options_set_level0_slowdown_writes_trigger(o.c, C.int(value)) +} + +// SetLevel0StopWritesTrigger sets the maximum number of level-0 files. +// We stop writes at this point. +// Default: 12 +func (o *Options) SetLevel0StopWritesTrigger(value int) { + C.rocksdb_options_set_level0_stop_writes_trigger(o.c, C.int(value)) +} + +// SetMaxBytesForLevelBase sets the maximum total data size for a level. +// +// It is the max total for level-1. +// Maximum number of bytes for level L can be calculated as +// (max_bytes_for_level_base) * (max_bytes_for_level_multiplier ^ (L-1)) +// +// For example, if max_bytes_for_level_base is 20MB, and if +// max_bytes_for_level_multiplier is 10, total data size for level-1 +// will be 20MB, total file size for level-2 will be 200MB, +// and total file size for level-3 will be 2GB. +// Default: 10MB +func (o *Options) SetMaxBytesForLevelBase(value uint64) { + C.rocksdb_options_set_max_bytes_for_level_base(o.c, C.uint64_t(value)) +} + +// SetMaxBytesForLevelMultiplier sets the max Bytes for level multiplier. +// Default: 10 +func (o *Options) SetMaxBytesForLevelMultiplier(value float64) { + C.rocksdb_options_set_max_bytes_for_level_multiplier(o.c, C.double(value)) +} + +// SetTargetFileSizeBase sets the target file size for compaction. +// +// Target file size is per-file size for level-1. +// Target file size for level L can be calculated by +// target_file_size_base * (target_file_size_multiplier ^ (L-1)) +// +// For example, if target_file_size_base is 2MB and +// target_file_size_multiplier is 10, then each file on level-1 will +// be 2MB, and each file on level 2 will be 20MB, +// and each file on level-3 will be 200MB. +// Default: 2MB +func (o *Options) SetTargetFileSizeBase(value uint64) { + C.rocksdb_options_set_target_file_size_base(o.c, C.uint64_t(value)) +} + +// SetTargetFileSizeMultiplier sets the target file size multiplier for compaction. +// Default: 1 +func (o *Options) SetTargetFileSizeMultiplier(value int) { + C.rocksdb_options_set_target_file_size_multiplier(o.c, C.int(value)) +} + +// SetWriteBufferSize sets the amount of data to build up in memory +// (backed by an unsorted log on disk) before converting to a sorted on-disk file. +// +// Larger values increase performance, especially during bulk loads. +// Up to max_write_buffer_number write buffers may be held in memory +// at the same time, so you may wish to adjust this parameter to control +// memory usage. +// Also, a larger write buffer will result in a longer recovery time +// the next time the database is opened. +// Default: 4MB +func (o *Options) SetWriteBufferSize(value int) { + C.rocksdb_options_set_write_buffer_size(o.c, C.size_t(value)) +} + +// SetDbWriteBufferSize sets the amount of data to build up +// in memtables across all column families before writing to disk. +// +// This is distinct from write_buffer_size, which enforces a limit +// for a single memtable. 
+// +// This feature is disabled by default. Specify a non-zero value +// to enable it. +// +// Default: 0 (disabled) +func (o *Options) SetDbWriteBufferSize(value int) { + C.rocksdb_options_set_db_write_buffer_size(o.c, C.size_t(value)) +} + +func (o *Options) SetMaxSubCompactions(value int) { + C.rocksdb_options_set_max_subcompactions(o.c, C.uint(value)) +} + +// SetEnablePipelinedWrite improves concurrent write throughput in +// case WAL is enabled. By default, a single write thread queue is +// maintained for concurrent writers. The thread gets to the head +// of the queue becomes write batch group leader and responsible +// for writing to WAL and memtable for the batch group. +// One observation is that WAL writes and memtable writes are sequential +// and by making them run in parallel we can increase throughput. +// For one single writer WAL writes and memtable writes has to run +// sequentially. With concurrent writers, once the previous writer +// finish WAL write, the next writer waiting in the write queue can +// start to write WAL while the previous writer still have memtable +// write ongoing. +func (o *Options) SetEnablePipelinedWrite(value bool) { + C.rocksdb_options_set_enable_pipelined_write(o.c, boolToUchar(value)) +} + +// SetUseFsync enable/disable fsync. +// +// If true, then every store to stable storage will issue a fsync. +// If false, then every store to stable storage will issue a fdatasync. +// This parameter should be set to true while storing data to +// filesystem like ext3 that can lose files after a reboot. +// Default: false +func (o *Options) SetUseFsync(value bool) { + C.rocksdb_options_set_use_fsync(o.c, C.int(btoi(value))) +} + +// SetUseDirectReads enable/disable direct I/O mode (O_DIRECT) for reads. +// Default: false +func (o *Options) SetUseDirectReads(value bool) { + C.rocksdb_options_set_use_direct_reads(o.c, boolToUchar(value)) +} + +// SetUseDirectIOForFlushAndCompaction enable/disable direct I/O mode (O_DIRECT) +// for both reads and writes in background flush and compactions. +// When true, new_table_reader_for_compaction_inputs is forced to true. +// Default: false +func (o *Options) SetUseDirectIOForFlushAndCompaction(value bool) { + C.rocksdb_options_set_use_direct_io_for_flush_and_compaction(o.c, boolToUchar(value)) +} + +// SetMaxTotalWalSize sets the maximum total wal size in bytes. +// Once write-ahead logs exceed this size, we will start forcing the flush of +// column families whose memtables are backed by the oldest live WAL file +// (i.e. the ones that are causing all the space amplification). If set to 0 +// (default), we will dynamically choose the WAL size limit to be +// [sum of all write_buffer_size * max_write_buffer_number] * 4 +// Default: 0 +func (o *Options) SetMaxTotalWalSize(value uint64) { + C.rocksdb_options_set_max_total_wal_size(o.c, C.uint64_t(value)) } // SetDBLogDir specifies the absolute info LOG dir. @@ -104,27 +276,51 @@ func (o *Options) SetBlockBasedTableFactory(value *BlockBasedTableOptions) { func (o *Options) SetDBLogDir(value string) { cValue := C.CString(value) defer C.free(unsafe.Pointer(cValue)) - C.rocksdb_options_set_db_log_dir(o.opts, cValue) + C.rocksdb_options_set_db_log_dir(o.c, cValue) } // SetWalDir specifies the absolute dir path for write-ahead logs (WAL). // // If it is empty, the log files will be in the same dir as data. // If it is non empty, the log files will be in the specified dir, -// When destroying the db, all log files and the dir itopts is deleted. 
+// When destroying the db, all log files and the dir are deleted. // Default: empty func (o *Options) SetWalDir(value string) { cValue := C.CString(value) defer C.free(unsafe.Pointer(cValue)) - C.rocksdb_options_set_wal_dir(o.opts, cValue) + C.rocksdb_options_set_wal_dir(o.c, cValue) +} + +// SetMaxBackgroundCompactions sets the maximum number of +// concurrent background jobs, submitted to +// the default LOW priority thread pool +// Default: 1 +func (o *Options) SetMaxBackgroundCompactions(value int) { + C.rocksdb_options_set_max_background_compactions(o.c, C.int(value)) +} + +// SetMaxBackgroundFlushes sets the maximum number of +// concurrent background memtable flush jobs, submitted to +// the HIGH priority thread pool. +// +// By default, all background jobs (major compaction and memtable flush) go +// to the LOW priority pool. If this option is set to a positive number, +// memtable flush jobs will be submitted to the HIGH priority pool. +// It is important when the same Env is shared by multiple db instances. +// Without a separate pool, long running major compaction jobs could +// potentially block memtable flush jobs of other db instances, leading to +// unnecessary Put stalls. +// Default: 0 +func (o *Options) SetMaxBackgroundFlushes(value int) { + C.rocksdb_options_set_max_background_flushes(o.c, C.int(value)) } // Destroy deallocates the Options object. func (o *Options) Destroy() { - C.rocksdb_options_destroy(o.opts) + C.rocksdb_options_destroy(o.c) if o.bbto != nil { o.bbto.Destroy() } - o.opts = nil + o.c = nil o.bbto = nil } diff --git a/rocksdb/options_block_based_table.go b/rocksdb/options_block_based_table.go index 44371f7dd..f9fab1b6f 100644 --- a/rocksdb/options_block_based_table.go +++ b/rocksdb/options_block_based_table.go @@ -21,7 +21,10 @@ import "C" // BlockBasedTableOptions represents block-based table options. type BlockBasedTableOptions struct { - opts *C.rocksdb_block_based_table_options_t + c *C.rocksdb_block_based_table_options_t + + // Hold references for GC. + cache *Cache // We keep these so we can free their memory in Destroy. fp *C.rocksdb_filterpolicy_t @@ -29,14 +32,14 @@ type BlockBasedTableOptions struct { // NewDefaultBlockBasedTableOptions creates a default BlockBasedTableOptions object. func NewDefaultBlockBasedTableOptions() *BlockBasedTableOptions { - return &BlockBasedTableOptions{opts: C.rocksdb_block_based_options_create()} + return &BlockBasedTableOptions{c: C.rocksdb_block_based_options_create()} } // Destroy deallocates the BlockBasedTableOptions object. func (o *BlockBasedTableOptions) Destroy() { //C.rocksdb_filterpolicy_destroy(o.fp) - C.rocksdb_block_based_options_destroy(o.opts) - o.opts = nil + C.rocksdb_block_based_options_destroy(o.c) + o.c = nil o.fp = nil } @@ -45,40 +48,51 @@ func (o *BlockBasedTableOptions) Destroy() { // block during table initialization. // Default: false func (o *BlockBasedTableOptions) SetCacheIndexAndFilterBlocks(value bool) { - C.rocksdb_block_based_options_set_cache_index_and_filter_blocks(o.opts, boolToUchar(value)) + C.rocksdb_block_based_options_set_cache_index_and_filter_blocks(o.c, boolToUchar(value)) } // SetBlockSize sets the approximate size of user data packed per block. -// Note that the block size specified here corresponds to opts uncompressed data. +// Note that the block size specified here corresponds to uncompressed data. // The actual size of the unit read from disk may be smaller if // compression is enabled. This parameter can be changed dynamically. 
// Default: 4K func (o *BlockBasedTableOptions) SetBlockSize(blockSize int) { - C.rocksdb_block_based_options_set_block_size(o.opts, C.size_t(blockSize)) + C.rocksdb_block_based_options_set_block_size(o.c, C.size_t(blockSize)) } // SetBlockSizeDeviation sets the block size deviation. -// This is used opts close a block before it reaches the configured +// This is used to close a block before it reaches the configured // 'block_size'. If the percentage of free space in the current block is less -// than this specified number and adding a new record opts the block will +// than this specified number and adding a new record to the block will // exceed the configured block size, then this block will be closed and the -// new record will be written opts the next block. +// new record will be written to the next block. // Default: 10 func (o *BlockBasedTableOptions) SetBlockSizeDeviation(blockSizeDeviation int) { - C.rocksdb_block_based_options_set_block_size_deviation(o.opts, C.int(blockSizeDeviation)) + C.rocksdb_block_based_options_set_block_size_deviation(o.c, C.int(blockSizeDeviation)) } -// SetFilterPolicy sets the filter policy opts reduce disk reads. +// SetFilterPolicy sets the filter policy to reduce disk reads. // Many applications will benefit from passing the result of // NewBloomFilterPolicy() here. // Default: nil func (o *BlockBasedTableOptions) SetFilterPolicy(fp *FilterPolicy) { - C.rocksdb_block_based_options_set_filter_policy(o.opts, fp.policy) + C.rocksdb_block_based_options_set_filter_policy(o.c, fp.policy) o.fp = fp.policy } // SetNoBlockCache specify whether block cache should be used or not. // Default: false func (o *BlockBasedTableOptions) SetNoBlockCache(value bool) { - C.rocksdb_block_based_options_set_no_block_cache(o.opts, boolToUchar(value)) + C.rocksdb_block_based_options_set_no_block_cache(o.c, boolToUchar(value)) +} + +// SetBlockCache sets the control over blocks (user data is stored in a set of blocks, and +// a block is the unit of reading from disk). +// +// If set, use the specified cache for blocks. +// If nil, rocksdb will automatically create and use an 8MB internal cache. +// Default: nil +func (o *BlockBasedTableOptions) SetBlockCache(cache *Cache) { + o.cache = cache + C.rocksdb_block_based_options_set_block_cache(o.c, cache.c) } diff --git a/rocksdb/options_flush.go b/rocksdb/options_flush.go index 81a95816d..4515bf552 100644 --- a/rocksdb/options_flush.go +++ b/rocksdb/options_flush.go @@ -22,7 +22,7 @@ import "C" // FlushOptions represent all of the available options when manual flushing the // database. type FlushOptions struct { - opts *C.rocksdb_flushoptions_t + c *C.rocksdb_flushoptions_t } // NewDefaultFlushOptions creates a default FlushOptions object. @@ -33,11 +33,11 @@ func NewDefaultFlushOptions() *FlushOptions { // SetWait specify if the flush will wait until the flush is done. // Default: true func (o *FlushOptions) SetWait(value bool) { - C.rocksdb_flushoptions_set_wait(o.opts, boolToUchar(value)) + C.rocksdb_flushoptions_set_wait(o.c, boolToUchar(value)) } // Destroy deallocates the FlushOptions object. 
func (o *FlushOptions) Destroy() { - C.rocksdb_flushoptions_destroy(o.opts) - o.opts = nil + C.rocksdb_flushoptions_destroy(o.c) + o.c = nil } diff --git a/rocksdb/options_read.go b/rocksdb/options_read.go index 1b215eb21..33fa16c49 100644 --- a/rocksdb/options_read.go +++ b/rocksdb/options_read.go @@ -20,12 +20,12 @@ package rocksdb import "C" type ReadOptions struct { - opts *C.rocksdb_readoptions_t + c *C.rocksdb_readoptions_t } // NewDefaultReadOptions creates a default ReadOptions object. func NewDefaultReadOptions() *ReadOptions { - return &ReadOptions{opts: C.rocksdb_readoptions_create()} + return &ReadOptions{c: C.rocksdb_readoptions_create()} } // SetFillCache specify whether the "data block"/"index block"/"filter block" @@ -33,11 +33,19 @@ func NewDefaultReadOptions() *ReadOptions { // Callers may wish to set this field to false for bulk scans. // Default: true func (o *ReadOptions) SetFillCache(value bool) { - C.rocksdb_readoptions_set_fill_cache(o.opts, boolToUchar(value)) + C.rocksdb_readoptions_set_fill_cache(o.c, boolToUchar(value)) +} + +// SetIgnoreRangeDeletions specify whether keys deleted using the DeleteRange() +// API will be visible to readers until they are naturally deleted during compaction. +// This improves read performance in DBs with many range deletions. +// Default: false +func (o *ReadOptions) SetIgnoreRangeDeletions(value bool) { + C.rocksdb_readoptions_set_ignore_range_deletions(o.c, boolToUchar(value)) } // Destroy deallocates the ReadOptions object. func (o *ReadOptions) Destroy() { - C.rocksdb_readoptions_destroy(o.opts) - o.opts = nil + C.rocksdb_readoptions_destroy(o.c) + o.c = nil } diff --git a/rocksdb/options_write.go b/rocksdb/options_write.go index 1a8aecd9f..fdbe46500 100644 --- a/rocksdb/options_write.go +++ b/rocksdb/options_write.go @@ -21,7 +21,7 @@ import "C" // WriteOptions represent all options available when writing to a database. type WriteOptions struct { - opts *C.rocksdb_writeoptions_t + c *C.rocksdb_writeoptions_t } // NewDefaultWriteOptions creates a default WriteOptions object. @@ -34,11 +34,19 @@ func NewDefaultWriteOptions() *WriteOptions { // and the write may got lost after a crash. // Default: false func (o *WriteOptions) SetDisableWAL(value bool) { - C.rocksdb_writeoptions_disable_WAL(o.opts, C.int(btoi(value))) + C.rocksdb_writeoptions_disable_WAL(o.c, C.int(btoi(value))) +} + +// SetSync sets the sync mode. If true, the write will be flushed +// from the operating system buffer cache before the write is considered complete. +// If this flag is true, writes will be slower. +// Default: false +func (o *WriteOptions) SetSync(value bool) { + C.rocksdb_writeoptions_set_sync(o.c, boolToUchar(value)) } // Destroy deallocates the WriteOptions object. 
func (o *WriteOptions) Destroy() { - C.rocksdb_writeoptions_destroy(o.opts) - o.opts = nil + C.rocksdb_writeoptions_destroy(o.c) + o.c = nil } diff --git a/rocksdb/test_util.go b/rocksdb/test_util.go index 7f164f8c7..48b37528d 100644 --- a/rocksdb/test_util.go +++ b/rocksdb/test_util.go @@ -24,7 +24,7 @@ import ( ) func newTestDB(t *testing.T, name string, applyOpts func(opts *Options)) *DB { - dir, err := ioutil.TempDir("", "rocksdb-"+name) + path, err := ioutil.TempDir("", "rocksdb-"+name) require.NoError(t, err) opts := NewDefaultOptions() @@ -33,8 +33,29 @@ func newTestDB(t *testing.T, name string, applyOpts func(opts *Options)) *DB { applyOpts(opts) } - db, err := OpenDB(dir, opts) + db, err := OpenDB(path, opts) require.NoError(t, err) return db } + +func newTestDBCF(t *testing.T, name string) (db *DB, cfh []*ColumnFamilyHandle, cleanup func()) { + path, err := ioutil.TempDir("", "rocksdb-"+name) + require.NoError(t, err) + + givenNames := []string{"default", "other"} + opts := NewDefaultOptions() + opts.SetCreateIfMissingColumnFamilies(true) + opts.SetCreateIfMissing(true) + + db, cfh, err = OpenDBColumnFamilies(path, opts, givenNames, []*Options{opts, opts}) + require.NoError(t, err) + + cleanup = func() { + for _, cf := range cfh { + cf.Destroy() + } + db.Close() + } + return db, cfh, cleanup +} diff --git a/rocksdb/write_batch.go b/rocksdb/write_batch.go index 081bec54c..56ac8e711 100644 --- a/rocksdb/write_batch.go +++ b/rocksdb/write_batch.go @@ -31,25 +31,55 @@ import "C" // batch.Put("key", "v3"); // type WriteBatch struct { - batch *C.rocksdb_writebatch_t + c *C.rocksdb_writebatch_t } // NewWriteBatch create a WriteBatch object. func NewWriteBatch() *WriteBatch { - return &WriteBatch{batch: C.rocksdb_writebatch_create()} + return &WriteBatch{c: C.rocksdb_writebatch_create()} } // Put stores the mapping "key->value" in the database. func (wb *WriteBatch) Put(key, value []byte) { cKey := bytesToChar(key) cValue := bytesToChar(value) - C.rocksdb_writebatch_put(wb.batch, cKey, C.size_t(len(key)), cValue, C.size_t(len(value))) + C.rocksdb_writebatch_put(wb.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value))) +} + +// PutCF stores a mapping "key->value" in a column family. +func (wb *WriteBatch) PutCF(cf *ColumnFamilyHandle, key, value []byte) { + cKey := bytesToChar(key) + cValue := bytesToChar(value) + C.rocksdb_writebatch_put_cf(wb.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value))) } // Delete erases the mapping for "key" if it exists. Else, do nothing. func (wb *WriteBatch) Delete(key []byte) { cKey := bytesToChar(key) - C.rocksdb_writebatch_delete(wb.batch, cKey, C.size_t(len(key))) + C.rocksdb_writebatch_delete(wb.c, cKey, C.size_t(len(key))) +} + +// DeleteCF erases the mapping for "key", in a column family, if it exists. +// Else, do nothing. +func (wb *WriteBatch) DeleteCF(cf *ColumnFamilyHandle, key []byte) { + cKey := bytesToChar(key) + C.rocksdb_writebatch_delete_cf(wb.c, cf.c, cKey, C.size_t(len(key))) +} + +// DeleteRange erases all mappings in the range ["beginKey", "endKey") +// if the database contains them. Else do nothing. +func (wb *WriteBatch) DeleteRange(beginKey, endKey []byte) { + cBeginKey := bytesToChar(beginKey) + cEndKey := bytesToChar(endKey) + C.rocksdb_writebatch_delete_range(wb.c, cBeginKey, C.size_t(len(beginKey)), cEndKey, C.size_t(len(endKey))) +} + +// DeleteRangeCF erases all mappings in the range ["beginKey", "endKey") +// on the given column family if the database contains them. Else do nothing. 
+func (wb *WriteBatch) DeleteRangeCF(cf *ColumnFamilyHandle, beginKey, endKey []byte) { + cBeginKey := bytesToChar(beginKey) + cEndKey := bytesToChar(endKey) + C.rocksdb_writebatch_delete_range_cf(wb.c, cf.c, cBeginKey, C.size_t(len(beginKey)), cEndKey, C.size_t(len(endKey))) } // WriteBatch implementation of DeleteRange() // TODO @@ -59,21 +89,29 @@ func (wb *WriteBatch) Delete(key []byte) { func (wb *WriteBatch) Merge(key, value []byte) { cKey := bytesToChar(key) cValue := bytesToChar(value) - C.rocksdb_writebatch_merge(wb.batch, cKey, C.size_t(len(key)), cValue, C.size_t(len(value))) + C.rocksdb_writebatch_merge(wb.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value))) +} + +// MergeCF "value" with the existing value of "key" in a column family. +// "key->merge(existing, value)" +func (wb *WriteBatch) MergeCF(cf *ColumnFamilyHandle, key, value []byte) { + cKey := bytesToChar(key) + cValue := bytesToChar(value) + C.rocksdb_writebatch_merge_cf(wb.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value))) } // Clear all updates buffered in this batch. func (wb *WriteBatch) Clear() { - C.rocksdb_writebatch_clear(wb.batch) + C.rocksdb_writebatch_clear(wb.c) } // Count returns the number of updates in the batch. func (wb *WriteBatch) Count() int { - return int(C.rocksdb_writebatch_count(wb.batch)) + return int(C.rocksdb_writebatch_count(wb.c)) } // Destroy deallocates the WriteBatch object. func (wb *WriteBatch) Destroy() { - C.rocksdb_writebatch_destroy(wb.batch) - wb.batch = nil + C.rocksdb_writebatch_destroy(wb.c) + wb.c = nil } diff --git a/rocksdb/write_batch_test.go b/rocksdb/write_batch_test.go index d9d4e3b99..32ae088c3 100644 --- a/rocksdb/write_batch_test.go +++ b/rocksdb/write_batch_test.go @@ -57,3 +57,62 @@ func TestWriteBatch(t *testing.T) { require.Nil(t, v2.Data()) } + +func TestDeleteRange(t *testing.T) { + + db := newTestDB(t, "TestDeleteRange", nil) + defer db.Close() + + wo := NewDefaultWriteOptions() + defer wo.Destroy() + ro := NewDefaultReadOptions() + defer ro.Destroy() + + var ( + key1 = []byte("key1") + key2 = []byte("key2") + key3 = []byte("key3") + key4 = []byte("key4") + val1 = []byte("value") + val2 = []byte("12345678") + val3 = []byte("abcdefg") + val4 = []byte("xyz") + ) + + require.NoError(t, db.Put(wo, key1, val1)) + require.NoError(t, db.Put(wo, key2, val2)) + require.NoError(t, db.Put(wo, key3, val3)) + require.NoError(t, db.Put(wo, key4, val4)) + + actualVal1, err := db.GetBytes(ro, key1) + require.NoError(t, err) + require.Equal(t, actualVal1, val1) + actualVal2, err := db.GetBytes(ro, key2) + require.NoError(t, err) + require.Equal(t, actualVal2, val2) + actualVal3, err := db.GetBytes(ro, key3) + require.NoError(t, err) + require.Equal(t, actualVal3, val3) + actualVal4, err := db.GetBytes(ro, key4) + require.NoError(t, err) + require.Equal(t, actualVal4, val4) + + batch := NewWriteBatch() + defer batch.Destroy() + batch.DeleteRange(key2, key4) + db.Write(wo, batch) + + actualVal1, err = db.GetBytes(ro, key1) + require.NoError(t, err) + require.Equal(t, actualVal1, val1) + actualVal2, err = db.GetBytes(ro, key2) + require.NoError(t, err) + require.Nil(t, actualVal2) + actualVal3, err = db.GetBytes(ro, key3) + require.NoError(t, err) + require.Nil(t, actualVal3) + actualVal4, err = db.GetBytes(ro, key4) + require.NoError(t, err) + require.Equal(t, actualVal4, val4) + +} From 53bd75192618c340f52ba25871101b11d460d7f9 Mon Sep 17 00:00:00 2001 From: Alvaro Alda Date: Tue, 19 Mar 2019 13:13:20 +0100 Subject: [PATCH 2/3] Integrate new raft-rocksdb 
store Former-commit-id: ca8fdad159d4ca3bcfae05033f26118b2efd7a84 --- api/apihttp/apihttp_test.go | 6 ++--- raftwal/fsm_test.go | 10 ++++---- raftwal/raft.go | 40 ++++++++++++++---------------- raftwal/raft_test.go | 19 +++++++------- raftwal/raftrocks/rocksdb_store.go | 9 +++---- server/server.go | 2 +- 6 files changed, 39 insertions(+), 47 deletions(-) diff --git a/api/apihttp/apihttp_test.go b/api/apihttp/apihttp_test.go index 7f828bea0..a4b94cf96 100644 --- a/api/apihttp/apihttp_test.go +++ b/api/apihttp/apihttp_test.go @@ -368,10 +368,10 @@ func BenchmarkAuth(b *testing.B) { } func newNodeBench(b *testing.B, id int) (*raftwal.RaftBalloon, func()) { - badgerPath := fmt.Sprintf("/var/tmp/raft-test/node%d/badger", id) + rocksdbPath := fmt.Sprintf("/var/tmp/raft-test/node%d/rocksdb", id) - os.MkdirAll(badgerPath, os.FileMode(0755)) - rocks, closeF := storage_utils.OpenRocksDBStore(b, badgerPath) + os.MkdirAll(rocksdbPath, os.FileMode(0755)) + rocks, closeF := storage_utils.OpenRocksDBStore(b, rocksdbPath) raftPath := fmt.Sprintf("/var/tmp/raft-test/node%d/raft", id) os.MkdirAll(raftPath, os.FileMode(0755)) diff --git a/raftwal/fsm_test.go b/raftwal/fsm_test.go index ff621831d..b3e6407d5 100644 --- a/raftwal/fsm_test.go +++ b/raftwal/fsm_test.go @@ -14,7 +14,7 @@ import ( ) func TestApply(t *testing.T) { - store, closeF := storage_utils.OpenBadgerStore(t, "/var/tmp/balloon.test.db") + store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db") defer closeF() fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100)) @@ -39,7 +39,7 @@ func TestApply(t *testing.T) { } func TestSnapshot(t *testing.T) { - store, closeF := storage_utils.OpenBadgerStore(t, "/var/tmp/balloon.test.db") + store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db") defer closeF() fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100)) @@ -63,7 +63,7 @@ func (f *fakeRC) Close() error { } func TestRestore(t *testing.T) { - store, closeF := storage_utils.OpenBadgerStore(t, "/var/tmp/balloon.test.db") + store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db") defer closeF() fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100)) @@ -73,7 +73,7 @@ func TestRestore(t *testing.T) { } func TestAddAndRestoreSnapshot(t *testing.T) { - store, closeF := storage_utils.OpenBadgerStore(t, "/var/tmp/balloon.test.db") + store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db") defer closeF() fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100)) @@ -104,7 +104,7 @@ func TestAddAndRestoreSnapshot(t *testing.T) { snaps, _ := snap.List() _, r, _ := snap.Open(snaps[0].ID) - store2, close2F := storage_utils.OpenBadgerStore(t, "/var/tmp/balloon.test.2.db") + store2, close2F := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.2.db") defer close2F() // New FSMStore diff --git a/raftwal/raft.go b/raftwal/raft.go index 0d942939e..37e8f0040 100644 --- a/raftwal/raft.go +++ b/raftwal/raft.go @@ -28,8 +28,8 @@ import ( "github.com/bbva/qed/log" "github.com/bbva/qed/protocol" "github.com/bbva/qed/raftwal/commands" + "github.com/bbva/qed/raftwal/raftrocks" "github.com/bbva/qed/storage" - raftbadger "github.com/bbva/raft-badger" "github.com/hashicorp/raft" ) @@ -75,10 +75,10 @@ type RaftBalloon struct { } store struct { - db storage.ManagedStore // Persistent 
database - log raft.LogStore // Persistent log store - badgerLog *raftbadger.BadgerStore // Underlying badger-backed persistent log store - stable *raftbadger.BadgerStore // Persistent k-v store + db storage.ManagedStore // Persistent database + log raft.LogStore // Persistent log store + rocksStore *raftrocks.RocksDBStore // Underlying rocksdb-backed persistent log store + //stable *raftrocks.RocksDBStore // Persistent k-v store snapshots *raft.FileSnapshotStore // Persistent snapstop store } @@ -91,23 +91,23 @@ type RaftBalloon struct { } -// New returns a new RaftBalloon. +// NewRaftBalloon returns a new RaftBalloon. func NewRaftBalloon(path, addr, id string, store storage.ManagedStore, agentsQueue chan *protocol.Snapshot) (*RaftBalloon, error) { // Create the log store and stable store - badgerLogStore, err := raftbadger.New(raftbadger.Options{Path: path + "/logs", NoSync: true, ValueLogGC: true}) // raftbadger.NewBadgerStore(path + "/logs") + rocksStore, err := raftrocks.New(raftrocks.Options{Path: path + "/wal", NoSync: true}) if err != nil { - return nil, fmt.Errorf("new badger store: %s", err) + return nil, fmt.Errorf("cannot create a new rocksdb log store: %s", err) } - logStore, err := raft.NewLogCache(raftLogCacheSize, badgerLogStore) + logStore, err := raft.NewLogCache(raftLogCacheSize, rocksStore) if err != nil { - return nil, fmt.Errorf("new cached store: %s", err) + return nil, fmt.Errorf("cannot create a new cached store: %s", err) } - stableStore, err := raftbadger.New(raftbadger.Options{Path: path + "/config", NoSync: true, ValueLogGC: true}) // raftbadger.NewBadgerStore(path + "/config") - if err != nil { - return nil, fmt.Errorf("new badger store: %s", err) - } + // stableStore, err := raftrocks.New(raftrocks.Options{Path: path + "/config", NoSync: true}) + // if err != nil { + // return nil, fmt.Errorf("cannot create a new rocksdb stable store: %s", err) + // } // Instantiate balloon FSM fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, agentsQueue) @@ -125,8 +125,7 @@ func NewRaftBalloon(path, addr, id string, store storage.ManagedStore, agentsQue rb.store.db = store rb.store.log = logStore - rb.store.stable = stableStore - rb.store.badgerLog = badgerLogStore + rb.store.rocksStore = rocksStore return rb, nil } @@ -168,7 +167,7 @@ func (b *RaftBalloon) Open(bootstrap bool, metadata map[string]string) error { } // Instantiate the Raft system - b.raft.api, err = raft.NewRaft(b.raft.config, b.fsm, b.store.log, b.store.stable, b.store.snapshots, b.raft.transport) + b.raft.api, err = raft.NewRaft(b.raft.config, b.fsm, b.store.log, b.store.rocksStore, b.store.snapshots, b.raft.transport) if err != nil { return fmt.Errorf("new raft: %s", err) } @@ -226,15 +225,12 @@ func (b *RaftBalloon) Close(wait bool) error { } // close raft store - if err := b.store.badgerLog.Close(); err != nil { - return err - } - if err := b.store.stable.Close(); err != nil { + if err := b.store.rocksStore.Close(); err != nil { return err } + b.store.rocksStore = nil b.store.log = nil - b.store.stable = nil // Close FSM b.fsm.Close() diff --git a/raftwal/raft_test.go b/raftwal/raft_test.go index f3e1b6828..f3cd4b91e 100644 --- a/raftwal/raft_test.go +++ b/raftwal/raft_test.go @@ -30,7 +30,7 @@ import ( "github.com/bbva/qed/protocol" "github.com/bbva/qed/log" - "github.com/bbva/qed/storage/badger" + "github.com/bbva/qed/storage/rocks" utilrand "github.com/bbva/qed/testutils/rand" "github.com/hashicorp/raft" 
"github.com/stretchr/testify/require" @@ -46,21 +46,20 @@ func raftAddr(id int) string { } func newNode(t *testing.T, id int) (*RaftBalloon, func()) { - badgerPath := fmt.Sprintf("/var/tmp/raft-test/node%d/badger", id) + dbPath := fmt.Sprintf("/var/tmp/raft-test/node%d/db", id) - err := os.MkdirAll(badgerPath, os.FileMode(0755)) + err := os.MkdirAll(dbPath, os.FileMode(0755)) require.NoError(t, err) - badger, err := badger.NewBadgerStore(badgerPath) + db, err := rocks.NewRocksDBStore(dbPath) require.NoError(t, err) raftPath := fmt.Sprintf("/var/tmp/raft-test/node%d/raft", id) err = os.MkdirAll(raftPath, os.FileMode(0755)) require.NoError(t, err) - r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), badger, make(chan *protocol.Snapshot, 25000)) + r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), db, make(chan *protocol.Snapshot, 25000)) require.NoError(t, err) return r, func() { - fmt.Println("Removing node folder") os.RemoveAll(fmt.Sprintf("/var/tmp/raft-test/node%d", id)) } } @@ -450,17 +449,17 @@ func mustTempDir() string { } func newNodeBench(b *testing.B, id int) (*RaftBalloon, func()) { - badgerPath := fmt.Sprintf("/var/tmp/raft-test/node%d/badger", id) + rocksdbPath := fmt.Sprintf("/var/tmp/raft-test/node%d/rocksdb", id) - err := os.MkdirAll(badgerPath, os.FileMode(0755)) + err := os.MkdirAll(rocksdbPath, os.FileMode(0755)) require.NoError(b, err) - badger, err := badger.NewBadgerStore(badgerPath) + rocksdb, err := rocks.NewRocksDBStore(rocksdbPath) require.NoError(b, err) raftPath := fmt.Sprintf("/var/tmp/raft-test/node%d/raft", id) err = os.MkdirAll(raftPath, os.FileMode(0755)) require.NoError(b, err) - r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), badger, make(chan *protocol.Snapshot, 100)) + r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), rocksdb, make(chan *protocol.Snapshot, 100)) require.NoError(b, err) return r, func() { diff --git a/raftwal/raftrocks/rocksdb_store.go b/raftwal/raftrocks/rocksdb_store.go index 7b4a5013a..c51e97151 100644 --- a/raftwal/raftrocks/rocksdb_store.go +++ b/raftwal/raftrocks/rocksdb_store.go @@ -28,7 +28,7 @@ import ( var ( // ErrKeyNotFound is an error indicating a given key does not exist - ErrKeyNotFound = errors.New("key not found") + ErrKeyNotFound = errors.New("not found") ) const ( @@ -66,10 +66,7 @@ type RocksDBStore struct { type Options struct { // Path is the directory path to the RocksDB instance to use. Path string - // TODO decide if we should use a diferent directory for the Rocks WAl - - // BadgerOptions contains any specific RocksDB options you might - // want to specify. + // TODO decide if we should use a diferent directory for the Rocks WAL // NoSync causes the database to skip fsync calls after each // write to the log. 
This is unsafe, so it should be used @@ -308,7 +305,7 @@ func (s *RocksDBStore) SetUint64(key []byte, val uint64) error { } // GetUint64 is like Get, but handles uint64 values -func (s *RocksDBStore) GetUint64(key []byte) (uint64, error) { // TODO use GEt instead getBytes +func (s *RocksDBStore) GetUint64(key []byte) (uint64, error) { val, err := s.Get(key) if err != nil { return 0, err diff --git a/server/server.go b/server/server.go index f5d30824a..e864fb68f 100644 --- a/server/server.go +++ b/server/server.go @@ -111,7 +111,7 @@ func NewServer(conf *Config) (*Server, error) { return nil, err } - // Open badger store + // Open RocksDB store store, err := rocks.NewRocksDBStore(conf.DBPath) if err != nil { return nil, err From 9ce6063fb3cd6f5d1c65086480ecd3bb68e9b536 Mon Sep 17 00:00:00 2001 From: Alvaro Alda Date: Tue, 19 Mar 2019 13:16:06 +0100 Subject: [PATCH 3/3] Fix copyright year in headers Former-commit-id: 73caf94199236277cc447454a214b849d54e637a --- rocksdb/cache.go | 2 +- rocksdb/cf_handle.go | 2 +- rocksdb/cf_test.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rocksdb/cache.go b/rocksdb/cache.go index ea38a6b83..02f48949a 100644 --- a/rocksdb/cache.go +++ b/rocksdb/cache.go @@ -1,5 +1,5 @@ /* - Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A. + Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/rocksdb/cf_handle.go b/rocksdb/cf_handle.go index 9b2f676b7..7af48606e 100644 --- a/rocksdb/cf_handle.go +++ b/rocksdb/cf_handle.go @@ -1,5 +1,5 @@ /* - Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A. + Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/rocksdb/cf_test.go b/rocksdb/cf_test.go index 24be5a108..627acb3e1 100644 --- a/rocksdb/cf_test.go +++ b/rocksdb/cf_test.go @@ -1,5 +1,5 @@ /* - Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A. + Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
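---

A few usage sketches follow for reviewers who want to exercise the new APIs without digging through the tests. They are illustrative only: paths, sizes and any helper names flagged as assumptions are not part of this series, and every snippet assumes the imports "github.com/bbva/qed/rocksdb", "github.com/bbva/qed/raftwal/raftrocks" and "github.com/hashicorp/raft".

The new Options setters (SetWalDir, SetMaxBackgroundCompactions, SetMaxBackgroundFlushes) compose with the existing ones as below; the directories are hypothetical.

    // openTunedDB is a minimal sketch of tuning the background thread pools
    // and keeping the WAL in a separate directory. Paths are hypothetical.
    func openTunedDB() (*rocksdb.DB, error) {
        opts := rocksdb.NewDefaultOptions()
        opts.SetCreateIfMissing(true)
        opts.SetWalDir("/var/tmp/qed/wal")  // empty (the default) keeps the WAL next to the data files
        opts.SetMaxBackgroundCompactions(4) // compactions run in the LOW priority pool
        opts.SetMaxBackgroundFlushes(1)     // a dedicated HIGH priority flush pool avoids Put stalls
        return rocksdb.OpenDB("/var/tmp/qed/db", opts)
    }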
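Block-based table tuning works the same way. Two names below are assumptions, flagged in the comments: the LRU cache constructor exported by the new cache.go, and the Options hook that stores the table options in o.bbto; neither appears in the hunks above.

    // blockTableOpts sketches a 16 KiB block size backed by a shared block
    // cache, with index and filter blocks kept in that cache.
    func blockTableOpts() *rocksdb.BlockBasedTableOptions {
        bbto := rocksdb.NewDefaultBlockBasedTableOptions()
        bbto.SetBlockSize(16 * 1024)            // default is 4K
        bbto.SetBlockSizeDeviation(10)          // close a block once it is ~90% full
        bbto.SetCacheIndexAndFilterBlocks(true) // charge index/filter blocks to the block cache
        // ASSUMPTION: cache.go exposes an LRU constructor with this name and signature.
        bbto.SetBlockCache(rocksdb.NewLRUCache(512 * 1024 * 1024))
        return bbto
    }

    // Attaching the table options to the db options; the setter name is an
    // ASSUMPTION (it would be the counterpart of the o.bbto field freed in Destroy):
    // opts.SetBlockBasedTableFactory(blockTableOpts())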
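The read and write option setters are thin wrappers over the C API. Two common profiles, a bulk scan that should not pollute the block cache and a fully durable write, look like this:

    // scanAndSyncOptions shows the two typical profiles enabled by this patch.
    func scanAndSyncOptions() (*rocksdb.ReadOptions, *rocksdb.WriteOptions) {
        // Bulk scans: do not fill the block cache and skip range-deletion checks.
        ro := rocksdb.NewDefaultReadOptions()
        ro.SetFillCache(false)
        ro.SetIgnoreRangeDeletions(true)

        // Durable writes: keep the WAL and fsync before acknowledging the write.
        wo := rocksdb.NewDefaultWriteOptions()
        wo.SetDisableWAL(false)
        wo.SetSync(true)
        return ro, wo
    }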
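Column-family support combines OpenDBColumnFamilies with the new *CF write-batch operations. The sketch mirrors newTestDBCF from test_util.go; the path, family names and keys are illustrative.

    // writeToColumnFamily opens a db with a "default" and an "other" family and
    // applies a batched PutCF plus a DeleteRangeCF to the second one.
    func writeToColumnFamily() error {
        opts := rocksdb.NewDefaultOptions()
        opts.SetCreateIfMissing(true)
        opts.SetCreateIfMissingColumnFamilies(true)

        db, cfh, err := rocksdb.OpenDBColumnFamilies(
            "/var/tmp/qed/cf-db", // hypothetical path
            opts,
            []string{"default", "other"},
            []*rocksdb.Options{opts, opts},
        )
        if err != nil {
            return err
        }
        defer db.Close()
        for _, h := range cfh {
            defer h.Destroy() // destroy handles before closing the db, as in newTestDBCF
        }

        wb := rocksdb.NewWriteBatch()
        defer wb.Destroy()
        wb.PutCF(cfh[1], []byte("key1"), []byte("value1"))
        wb.DeleteRangeCF(cfh[1], []byte("a"), []byte("k")) // drops ["a", "k") in "other"

        wo := rocksdb.NewDefaultWriteOptions()
        defer wo.Destroy()
        db.Write(wo, wb) // error handling elided, as in TestDeleteRange
        return nil
    }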
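On the raft side, the wiring now done by NewRaftBalloon boils down to a single RocksDBStore serving both roles: as StableStore directly, and as LogStore behind raft's LogCache. A sketch, with an illustrative cache size since raftLogCacheSize is defined elsewhere:

    // newRaftStores builds the log and stable stores backed by the new raftrocks
    // package, mirroring what NewRaftBalloon does.
    func newRaftStores(path string) (raft.LogStore, raft.StableStore, error) {
        rocksStore, err := raftrocks.New(raftrocks.Options{Path: path + "/wal", NoSync: true})
        if err != nil {
            return nil, nil, err
        }
        // Keep the most recent entries in memory; 512 is illustrative,
        // the real capacity is the raftLogCacheSize constant.
        logStore, err := raft.NewLogCache(512, rocksStore)
        if err != nil {
            return nil, nil, err
        }
        // The same store also satisfies raft.StableStore, so it is handed to
        // raft.NewRaft twice, as in RaftBalloon.Open.
        return logStore, rocksStore, nil
    }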
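Finally, the StableStore half of the interface is plain key/value access backed by the stable column family; absent keys surface as ErrKeyNotFound.

    // stableRoundTrip stores a raft config value and reads it back, using only
    // methods visible in the diff. The key name is illustrative.
    func stableRoundTrip(store *raftrocks.RocksDBStore) error {
        if err := store.SetUint64([]byte("CurrentTerm"), 7); err != nil {
            return err
        }
        term, err := store.GetUint64([]byte("CurrentTerm"))
        if err != nil {
            return err
        }
        _ = term

        // A key that was never written is reported with ErrKeyNotFound.
        if _, err := store.Get([]byte("missing")); err != nil && err != raftrocks.ErrKeyNotFound {
            return err // only ErrKeyNotFound is expected for absent keys
        }
        return nil
    }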