Skip to content

Commit

Permalink
Tune rocksdb options for every column family
Browse files Browse the repository at this point in the history
  • Loading branch information
aalda committed Mar 29, 2019
1 parent 4bd229f commit 43ba855
Show file tree
Hide file tree
Showing 11 changed files with 600 additions and 78 deletions.
4 changes: 2 additions & 2 deletions balloon/hyper/tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func BenchmarkAdd(b *testing.B) {
defer closeF()

hasher := hashing.NewSha256Hasher()
freeCache := cache.NewFreeCache(2000 * (1 << 20))
freeCache := cache.NewFreeCache(CacheSize)

tree := NewHyperTree(hashing.NewSha256Hasher, store, freeCache)

Expand All @@ -303,7 +303,7 @@ func BenchmarkAdd(b *testing.B) {
go http.ListenAndServe(":2112", nil)

b.ResetTimer()
b.N = 200000000
b.N = 10000000
for i := 0; i < b.N; i++ {
index := make([]byte, 8)
binary.LittleEndian.PutUint64(index, uint64(i))
Expand Down
2 changes: 1 addition & 1 deletion raftwal/raftrocks/rocksdb_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func New(options Options) (*RocksDBStore, error) {
// log store options
logBbto := rocksdb.NewDefaultBlockBasedTableOptions()
logBbto.SetBlockSize(32 * 1024)
logCache := rocksdb.NewLRUCache(512 * 1024 * 1024)
logCache := rocksdb.NewDefaultLRUCache(512 * 1024 * 1024)
logBbto.SetBlockCache(logCache)
logOpts := rocksdb.NewDefaultOptions()
logOpts.SetUseFsync(!options.NoSync)
Expand Down
29 changes: 26 additions & 3 deletions rocksdb/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,44 @@

package rocksdb

// #include <rocksdb/c.h>
// #include "rocksdb/c.h"
// #include "extended.h"
import "C"

// Cache wraps a native RocksDB cache handle obtained from the C API.
type Cache struct {
// c is the underlying C handle.
c *C.rocksdb_cache_t
}

// NewLRUCache creates a new LRU Cache object with the given capacity.
func NewLRUCache(capacity int) *Cache {
// NewDefaultLRUCache creates a new LRU cache with a fixed size capacity,
// leaving every other LRU parameter at its default:
//   - num_shard_bits = -1, meaning it is determined automatically: every
//     shard will be at least 512KB and the number of shard bits will not
//     exceed 6.
//   - strict_capacity_limit = false
//   - high_pri_pool_ratio = 0.0
func NewDefaultLRUCache(capacity int) *Cache {
	handle := C.rocksdb_cache_create_lru(C.size_t(capacity))
	return &Cache{c: handle}
}

// NewLRUCache creates a new LRU cache with a fixed size capacity and the
// given high priority pool ratio, i.e. the share of the cache reserved for
// high priority entries.
//
// The cache is sharded by hash of the key and the total capacity is divided
// evenly among the shards. The native constructor used here always requests
// num_shard_bits = -1, which means the shard count is determined
// automatically: every shard will be at least 512KB and the number of shard
// bits will not exceed 6. strict_capacity_limit is always false, so inserts
// do not fail when the cache is full.
func NewLRUCache(capacity int, highPriorityPoolRatio float64) *Cache {
	size := C.size_t(capacity)
	ratio := C.double(highPriorityPoolRatio)
	return &Cache{c: C.rocksdb_cache_create_lru_with_ratio(size, ratio)}
}

// GetUsage returns the Cache memory usage.
func (c *Cache) GetUsage() int {
return int(C.rocksdb_cache_get_usage(c.c))
Expand Down
52 changes: 52 additions & 0 deletions rocksdb/env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package rocksdb

// #include "rocksdb/c.h"
import "C"

// Env is an environment used to route through all file operations and
// some system calls. It wraps the native environment handle of the C API.
type Env struct {
// c is the underlying C handle; Destroy resets it to nil.
c *C.rocksdb_env_t
}

// NewDefaultEnv returns an Env wrapping the default environment created by
// the C API.
func NewDefaultEnv() *Env {
	handle := C.rocksdb_create_default_env()
	return &Env{c: handle}
}

// SetBackgroundThreads sets the number of background worker threads
// of a specific thread pool for this environment.
// 'LOW' is the default pool.
// Default: 1
func (e *Env) SetBackgroundThreads(n int) {
	threads := C.int(n)
	C.rocksdb_env_set_background_threads(e.c, threads)
}

// SetHighPriorityBackgroundThreads sets the size of the high priority
// thread pool, which can be used to keep compactions from stalling
// memtable flushes.
func (e *Env) SetHighPriorityBackgroundThreads(n int) {
	threads := C.int(n)
	C.rocksdb_env_set_high_priority_background_threads(e.c, threads)
}

// Destroy deallocates the Env object.
//
// Destroy is safe to call more than once and on a nil receiver: once the
// native handle has been released, later calls are no-ops. Options.Destroy
// also destroys the Env attached to it, so an Env shared between several
// Options (or destroyed by its creator as well) would otherwise hand a nil
// handle to the C API on the second call.
func (e *Env) Destroy() {
	if e == nil || e.c == nil {
		return
	}
	C.rocksdb_env_destroy(e.c)
	e.c = nil
}
10 changes: 10 additions & 0 deletions rocksdb/extended.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,29 @@ using rocksdb::Statistics;
using rocksdb::HistogramData;
using rocksdb::StatsLevel;
using rocksdb::Options;
using rocksdb::Cache;
using rocksdb::NewLRUCache;
using std::shared_ptr;

extern "C" {

// Handle types exposed through the C interface. Each one stores the wrapped
// C++ object in its `rep` member.
// NOTE(review): these presumably have to match the definitions used by
// RocksDB's own C bindings — confirm before changing a member.

// Statistics handle: holds a shared_ptr to the Statistics object.
struct rocksdb_statistics_t { std::shared_ptr<Statistics> rep; };
// Histogram data handle: holds a raw pointer to a HistogramData.
struct rocksdb_histogram_data_t { rocksdb::HistogramData* rep; };
// Options handle: holds the Options object by value.
struct rocksdb_options_t { Options rep; };
// Cache handle: holds a shared_ptr to the Cache object.
struct rocksdb_cache_t { std::shared_ptr<Cache> rep; };

// Sets the atomic_flush option on the wrapped Options object.
// Any non-zero `value` enables it; zero disables it.
void rocksdb_options_set_atomic_flush(
    rocksdb_options_t* opts, unsigned char value) {
  const bool enabled = (value != 0);
  opts->rep.atomic_flush = enabled;
}

// Creates an LRU cache handle with the given capacity and high priority pool
// ratio. The shard-bits and strict-capacity arguments are fixed to -1 and
// false respectively. The returned handle is heap-allocated with `new`.
rocksdb_cache_t* rocksdb_cache_create_lru_with_ratio(
    size_t capacity, double hi_pri_pool_ratio) {
  const int num_shard_bits = -1;
  const bool strict_capacity_limit = false;

  rocksdb_cache_t* cache = new rocksdb_cache_t;
  cache->rep = NewLRUCache(capacity, num_shard_bits, strict_capacity_limit,
                           hi_pri_pool_ratio);
  return cache;
}

rocksdb_statistics_t* rocksdb_create_statistics() {
rocksdb_statistics_t* result = new rocksdb_statistics_t;
result->rep = rocksdb::CreateDBStatistics();
Expand Down
5 changes: 5 additions & 0 deletions rocksdb/extended.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ extern void rocksdb_options_set_statistics(
rocksdb_options_t* opts,
rocksdb_statistics_t* stats);

/* Cache */

/* Creates an LRU cache with the given capacity (size_t) and high priority
 * pool ratio (double), returning a newly created cache handle. */
extern rocksdb_cache_t* rocksdb_cache_create_lru_with_ratio(
size_t capacity, double hi_pri_pool_ratio);

/* Statistics */

typedef enum {
Expand Down
22 changes: 19 additions & 3 deletions rocksdb/filter_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ type FilterPolicy struct {
policy *C.rocksdb_filterpolicy_t
}

// NewBloomFilterPolicy returns a new filter policy that uses a bloom filter
// with approximately the specified number of bits per key. A good value for
// bits_per_key is 10, which yields a filter with ~1% false positive rate.
// NewBloomFilterPolicy returns a new filter policy that uses a block-based
// bloom filter with approximately the specified number of bits per key.
// A good value for bits_per_key is 10, which yields a filter with ~1% false
// positive rate.
//
// Note: if you are using a custom comparator that ignores some parts
// of the keys being compared, you must not use NewBloomFilterPolicy()
Expand All @@ -37,3 +38,18 @@ type FilterPolicy struct {
func NewBloomFilterPolicy(bitsPerKey int) *FilterPolicy {
	// Ask the C API for a bloom filter policy with the requested bits per key.
	handle := C.rocksdb_filterpolicy_create_bloom(C.int(bitsPerKey))
	return &FilterPolicy{policy: handle}
}

// NewFullBloomFilterPolicy returns a new filter policy that uses a full
// bloom filter for the entire SST file, with approximately the specified
// number of bits per key. A good value for bits_per_key is 10, which yields
// a filter with ~1% false positive rate.
//
// Note: if you are using a custom comparator that ignores some parts
// of the keys being compared, you must not use the bloom filter policies
// provided here (NewBloomFilterPolicy, NewFullBloomFilterPolicy) and must
// provide your own FilterPolicy that also ignores the corresponding parts
// of the keys. For example, if the comparator ignores trailing spaces, it
// would be incorrect to use a FilterPolicy that does not ignore trailing
// spaces in keys.
func NewFullBloomFilterPolicy(bitsPerKey int) *FilterPolicy {
	handle := C.rocksdb_filterpolicy_create_bloom_full(C.int(bitsPerKey))
	return &FilterPolicy{policy: handle}
}
13 changes: 13 additions & 0 deletions rocksdb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Options struct {
c *C.rocksdb_options_t

// Hold references for GC.
env *Env
bbto *BlockBasedTableOptions
}

Expand All @@ -55,6 +56,14 @@ func (o *Options) SetCreateIfMissing(value bool) {
C.rocksdb_options_set_create_if_missing(o.c, boolToUchar(value))
}

// SetEnv sets the environment object used to interact with the system,
// e.g. to read/write files, schedule background work, etc.
// The Env is also retained on the Options value.
// Default: DefaultEnv
func (o *Options) SetEnv(value *Env) {
	// Keep the Go wrapper referenced for as long as the options use it.
	o.env = value
	envHandle := value.c
	C.rocksdb_options_set_env(o.c, envHandle)
}

// IncreaseParallelism sets the level of parallelism.
//
// By default, RocksDB uses only one background thread for flush and
Expand Down Expand Up @@ -395,9 +404,13 @@ func (o *Options) SetStatistics(s *Statistics) {
// Destroy deallocates the Options object. The native options are released
// first, followed by the attached Env and BlockBasedTableOptions, if any.
// All retained references are cleared afterwards.
func (o *Options) Destroy() {
	C.rocksdb_options_destroy(o.c)
	o.c = nil

	if env := o.env; env != nil {
		env.Destroy()
	}
	o.env = nil

	if tableOpts := o.bbto; tableOpts != nil {
		tableOpts.Destroy()
	}
	o.bbto = nil
}
107 changes: 106 additions & 1 deletion rocksdb/options_block_based_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,28 @@ package rocksdb
// #include <rocksdb/c.h>
import "C"

// IndexType selects the kind of index built for a block-based table.
type IndexType uint

// Supported index types. The numeric values are spelled out explicitly so
// that reordering the declarations cannot silently change them.
const (
	// KBinarySearchIndexType is a space efficient index block that is
	// optimized for binary-search-based index.
	KBinarySearchIndexType IndexType = 0
	// KHashSearchIndexType is the hash index; if enabled, it will do the
	// hash lookup when `Options.prefix_extractor` is provided.
	KHashSearchIndexType IndexType = 1
	// KTwoLevelIndexSearchIndexType is a two-level index implementation.
	// Both levels are binary search indexes.
	KTwoLevelIndexSearchIndexType IndexType = 2
)

// BlockBasedTableOptions represents block-based table options.
type BlockBasedTableOptions struct {
c *C.rocksdb_block_based_table_options_t

// Hold references for GC.
cache *Cache
cache *Cache
cacheComp *Cache

// We keep these so we can free their memory in Destroy.
fp *C.rocksdb_filterpolicy_t
Expand All @@ -40,6 +56,8 @@ func (o *BlockBasedTableOptions) Destroy() {
//C.rocksdb_filterpolicy_destroy(o.fp)
C.rocksdb_block_based_options_destroy(o.c)
o.c = nil
o.cache = nil
o.cacheComp = nil
o.fp = nil
}

Expand All @@ -51,6 +69,42 @@ func (o *BlockBasedTableOptions) SetCacheIndexAndFilterBlocks(value bool) {
C.rocksdb_block_based_options_set_cache_index_and_filter_blocks(o.c, boolToUchar(value))
}

// SetPinL0FilterAndIndexBlocksInCache sets pin_l0_filter_and_index_blocks_in_cache.
// When this option and cache_index_and_filter_blocks are both true, filter
// and index blocks are stored in the cache, but a reference is held in the
// "table reader" object so the blocks are pinned and only evicted from the
// cache when the table reader is freed.
// Default: false
func (o *BlockBasedTableOptions) SetPinL0FilterAndIndexBlocksInCache(value bool) {
	flag := boolToUchar(value)
	C.rocksdb_block_based_options_set_pin_l0_filter_and_index_blocks_in_cache(o.c, flag)
}

// SetPinTopLevelIndexAndFilterInCache controls whether top-level indexes
// are pinned.
// Default: false
func (o *BlockBasedTableOptions) SetPinTopLevelIndexAndFilterInCache(value bool) {
	flag := boolToUchar(value)
	C.rocksdb_block_based_options_set_pin_top_level_index_and_filter(o.c, flag)
}

// SetCacheIndexAndFilterBlocksWithHighPriority sets the priority to high for
// index and filter blocks in the block cache. It only affects LRUCache so
// far, and needs to be used together with high_pri_pool_ratio when calling
// NewLRUCache().
//
// If the feature is enabled, the LRU-list in the LRU cache is split into two
// parts, one for high-pri blocks and one for low-pri blocks. Data blocks are
// inserted at the head of the low-pri pool; index and filter blocks are
// inserted at the head of the high-pri pool. If the total usage in the
// high-pri pool exceeds capacity * high_pri_pool_ratio, the block at the tail
// of the high-pri pool overflows to the head of the low-pri pool, after which
// it competes against data blocks to stay in cache. Eviction starts from the
// tail of the low-pri pool.
func (o *BlockBasedTableOptions) SetCacheIndexAndFilterBlocksWithHighPriority(value bool) {
	flag := boolToUchar(value)
	C.rocksdb_block_based_options_set_cache_index_and_filter_blocks_with_high_priority(o.c, flag)
}

// SetHashIndexAllowCollision controls hash collisions in the prefix hash
// index. When enabled, the prefix hash index for a block-based table will
// not store the prefix and will allow hash collisions, reducing memory
// consumption.
// Default: false
func (o *BlockBasedTableOptions) SetHashIndexAllowCollision(value bool) {
	flag := boolToUchar(value)
	C.rocksdb_block_based_options_set_hash_index_allow_collision(o.c, flag)
}

// SetBlockSize sets the approximate size of user data packed per block.
// Note that the block size specified here corresponds to uncompressed data.
// The actual size of the unit read from disk may be smaller if
Expand All @@ -60,6 +114,25 @@ func (o *BlockBasedTableOptions) SetBlockSize(blockSize int) {
C.rocksdb_block_based_options_set_block_size(o.c, C.size_t(blockSize))
}

// SetPartitionFilters enables partitioned index filters.
//
// With partitioning, the index/filter of an SST file is split into smaller
// blocks with an additional top-level index on them. When reading an
// index/filter, only the top-level index is loaded into memory. The
// partitioned index/filter then uses the top-level index to load on demand
// into the block cache the partitions that are required to perform the
// index/filter query. The top-level index, which has a much smaller memory
// footprint, can be stored in heap or block cache depending on the
// SetCacheIndexAndFilterBlocks setting.
// Default: false
func (o *BlockBasedTableOptions) SetPartitionFilters(value bool) {
	flag := boolToUchar(value)
	C.rocksdb_block_based_options_set_partition_filters(o.c, flag)
}

// SetMetadataBlockSize sets the approximate size of the blocks used for
// index partitions.
// Default: 4K
func (o *BlockBasedTableOptions) SetMetadataBlockSize(blockSize uint64) {
	size := C.uint64_t(blockSize)
	C.rocksdb_block_based_options_set_metadata_block_size(o.c, size)
}

// SetBlockSizeDeviation sets the block size deviation.
// This is used to close a block before it reaches the configured
// 'block_size'. If the percentage of free space in the current block is less
Expand Down Expand Up @@ -96,3 +169,35 @@ func (o *BlockBasedTableOptions) SetBlockCache(cache *Cache) {
o.cache = cache
C.rocksdb_block_based_options_set_block_cache(o.c, cache.c)
}

// SetBlockCacheCompressed sets the cache for compressed blocks.
// If no compressed cache is configured, rocksdb will not use a compressed
// block cache.
//
// A nil cache is ignored and leaves the current setting untouched; without
// this guard a nil argument would be dereferenced and panic.
// Default: nil
func (o *BlockBasedTableOptions) SetBlockCacheCompressed(cache *Cache) {
	if cache == nil {
		return
	}
	// Retain the Go wrapper so it stays referenced while the options use it.
	o.cacheComp = cache
	C.rocksdb_block_based_options_set_block_cache_compressed(o.c, cache.c)
}

// SetWholeKeyFiltering specifies whether whole keys (not just prefixes)
// should be placed in the filter.
// This must generally be true for gets to be efficient.
// Default: true
func (o *BlockBasedTableOptions) SetWholeKeyFiltering(value bool) {
	flag := boolToUchar(value)
	C.rocksdb_block_based_options_set_whole_key_filtering(o.c, flag)
}

// SetIndexType sets the index type used for this table. The value is
// converted to a C int and handed to the C API.
//
// kBinarySearch:
// A space efficient index block that is optimized for
// binary-search-based index.
//
// kHashSearch:
// The hash index, if enabled, will do the hash lookup when
// `Options.prefix_extractor` is provided.
//
// kTwoLevelIndexSearch:
// A two-level index implementation. Both levels are binary search indexes.
//
// Default: kBinarySearch
func (o *BlockBasedTableOptions) SetIndexType(value IndexType) {
	indexType := C.int(value)
	C.rocksdb_block_based_options_set_index_type(o.c, indexType)
}
Loading

0 comments on commit 43ba855

Please sign in to comment.