Skip to content

Commit

Permalink
Tune some rocksdb options per column family
Browse files Browse the repository at this point in the history
  • Loading branch information
aalda committed Mar 27, 2019
1 parent 248a2d7 commit 6e31394
Show file tree
Hide file tree
Showing 6 changed files with 304 additions and 79 deletions.
4 changes: 2 additions & 2 deletions raftwal/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type BalloonFSM struct {

func loadState(s storage.ManagedStore) (*fsmState, error) {
var state fsmState
kvstate, err := s.Get(storage.FSMStateTable, []byte{0xab})
kvstate, err := s.Get(storage.FSMStateTable, storage.FSMStateTableKey)
if err == storage.ErrKeyNotFound {
log.Infof("Unable to find previous state: assuming a clean instance")
return &fsmState{0, 0, 0}, nil
Expand Down Expand Up @@ -255,7 +255,7 @@ func (fsm *BalloonFSM) applyAdd(event []byte, state *fsmState) *fsmAddResponse {
return &fsmAddResponse{error: err}
}

mutations = append(mutations, storage.NewMutation(storage.FSMStateTable, []byte{0xab}, stateBuff.Bytes()))
mutations = append(mutations, storage.NewMutation(storage.FSMStateTable, storage.FSMStateTableKey, stateBuff.Bytes()))
err = fsm.store.Mutate(mutations)
if err != nil {
return &fsmAddResponse{error: err}
Expand Down
65 changes: 65 additions & 0 deletions rocksdb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,34 @@ func (o *Options) SetMinWriteBufferNumberToMerge(value int) {
C.rocksdb_options_set_min_write_buffer_number_to_merge(o.c, C.int(value))
}

// SetMaxOpenFiles sets the number of open files that can be used by the DB.
//
// You may need to increase this if your database has a large working set
// (budget one open file per 2MB of working set). A value of -1 makes the
// DB open all files on db.Open() instead of lazily through a table cache.
// Default: 1000
func (o *Options) SetMaxOpenFiles(value int) {
	C.rocksdb_options_set_max_open_files(o.c, C.int(value))
}

// SetMaxFileOpeningThreads sets the maximum number of file opening threads.
//
// If max_open_files is -1, the DB will open all files on db.Open(); this
// option controls how many threads are used to open those files in
// parallel. It has no effect otherwise.
// Default: 16
func (o *Options) SetMaxFileOpeningThreads(value int) {
	C.rocksdb_options_set_max_file_opening_threads(o.c, C.int(value))
}

// SetMaxTotalWalSize sets the maximum total WAL size, in bytes.
//
// Once write-ahead logs exceed this size, RocksDB starts forcing the flush
// of column families whose memtables are backed by the oldest live WAL
// file (i.e. the ones causing all the space amplification). If set to 0
// (the default), the WAL size limit is chosen dynamically as
// [sum of all write_buffer_size * max_write_buffer_number] * 4.
// Default: 0
func (o *Options) SetMaxTotalWalSize(value uint64) {
	C.rocksdb_options_set_max_total_wal_size(o.c, C.uint64_t(value))
}

// SetBlockBasedTableFactory sets the block based table factory.
func (o *Options) SetBlockBasedTableFactory(value *BlockBasedTableOptions) {
o.bbto = value
Expand Down Expand Up @@ -316,6 +344,43 @@ func (o *Options) SetMaxBackgroundFlushes(value int) {
C.rocksdb_options_set_max_background_flushes(o.c, C.int(value))
}

// SetMaxLogFileSize sets the maximal size of the info log file.
//
// If the log file is larger than `max_log_file_size`, a new info log
// file will be created. If max_log_file_size == 0, all logs are written
// to a single log file.
// NOTE(review): value is converted to C.size_t, so callers should pass a
// non-negative size — a negative int would wrap around.
// Default: 0
func (o *Options) SetMaxLogFileSize(value int) {
	C.rocksdb_options_set_max_log_file_size(o.c, C.size_t(value))
}

// SetLogFileTimeToRoll sets the time for the info log file to roll,
// in seconds.
//
// If specified with a non-zero value, the log file will be rolled once it
// has been active longer than `log_file_time_to_roll`.
// Default: 0 (disabled)
func (o *Options) SetLogFileTimeToRoll(value int) {
	C.rocksdb_options_set_log_file_time_to_roll(o.c, C.size_t(value))
}

// SetKeepLogFileNum sets the maximal number of info log files to be kept
// before old ones are deleted.
// Default: 1000
func (o *Options) SetKeepLogFileNum(value int) {
	C.rocksdb_options_set_keep_log_file_num(o.c, C.size_t(value))
}

// SetAllowMmapReads enables/disables memory-mapped (mmap) reads for
// reading SST tables.
// Default: false
func (o *Options) SetAllowMmapReads(value bool) {
	C.rocksdb_options_set_allow_mmap_reads(o.c, boolToUchar(value))
}

// SetAllowMmapWrites enables/disables memory-mapped (mmap) writes for
// writing SST tables.
// Default: false
func (o *Options) SetAllowMmapWrites(value bool) {
	C.rocksdb_options_set_allow_mmap_writes(o.c, boolToUchar(value))
}

// SetStatistics sets a statistics object to pass to the DB.
func (o *Options) SetStatistics(s *Statistics) {
C.rocksdb_options_set_statistics(o.c, s.c)
Expand Down
121 changes: 72 additions & 49 deletions storage/rocks/rocksdb_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,9 @@ type RocksDBStore struct {
cfHandles rocksdb.ColumnFamilyHandles

// global options
globalOpts *rocksdb.Options
defaultTableOpts *rocksdb.Options
indexTableOpts *rocksdb.Options
hyperCacheTableOpts *rocksdb.Options
historyCacheTableOpts *rocksdb.Options
fsmStateTableOpts *rocksdb.Options
globalOpts *rocksdb.Options
// per column family options
cfOpts []*rocksdb.Options

// read/write options
ro *rocksdb.ReadOptions
Expand Down Expand Up @@ -81,34 +78,22 @@ func NewRocksDBStoreOpts(opts *Options) (*RocksDBStore, error) {
globalOpts := rocksdb.NewDefaultOptions()
globalOpts.SetCreateIfMissing(true)
globalOpts.SetCreateIfMissingColumnFamilies(true)
globalOpts.IncreaseParallelism(4)
var stats *rocksdb.Statistics
if opts.EnableStatistics {
stats = rocksdb.NewStatistics()
globalOpts.SetStatistics(stats)
}

defaultTableOpts := rocksdb.NewDefaultOptions()
indexTableOpts := getIndexTableOpts()
hyperCacheTableOpts := getHyperCacheTableOpts()
historyCacheTableOpts := getHistoryCacheTableOpts()
fsmStateTableOpts := getFsmStateTableOpts()

// Per column family options
cfOpts := []*rocksdb.Options{
defaultTableOpts,
indexTableOpts,
hyperCacheTableOpts,
historyCacheTableOpts,
fsmStateTableOpts,
rocksdb.NewDefaultOptions(),
getIndexTableOpts(),
getHyperCacheTableOpts(),
getHistoryCacheTableOpts(),
getFsmStateTableOpts(),
}

// rocksdbOpts.IncreaseParallelism(4)
// rocksdbOpts.SetMaxWriteBufferNumber(5)
// rocksdbOpts.SetMinWriteBufferNumberToMerge(2)

// blockOpts := rocksdb.NewDefaultBlockBasedTableOptions()
// blockOpts.SetFilterPolicy(rocksdb.NewBloomFilterPolicy(10))
// rocksdbOpts.SetBlockBasedTableFactory(blockOpts)

db, cfHandles, err := rocksdb.OpenDBColumnFamilies(opts.Path, globalOpts, cfNames, cfOpts)
if err != nil {
return nil, err
Expand All @@ -121,19 +106,15 @@ func NewRocksDBStoreOpts(opts *Options) (*RocksDBStore, error) {
}

store := &RocksDBStore{
db: db,
stats: stats,
cfHandles: cfHandles,
checkPointPath: checkPointPath,
checkpoints: make(map[uint64]string),
globalOpts: globalOpts,
defaultTableOpts: defaultTableOpts,
indexTableOpts: indexTableOpts,
hyperCacheTableOpts: hyperCacheTableOpts,
historyCacheTableOpts: historyCacheTableOpts,
fsmStateTableOpts: fsmStateTableOpts,
wo: rocksdb.NewDefaultWriteOptions(),
ro: rocksdb.NewDefaultReadOptions(),
db: db,
stats: stats,
cfHandles: cfHandles,
checkPointPath: checkPointPath,
checkpoints: make(map[uint64]string),
globalOpts: globalOpts,
cfOpts: cfOpts,
wo: rocksdb.NewDefaultWriteOptions(),
ro: rocksdb.NewDefaultReadOptions(),
}

if rms == nil && stats != nil {
Expand All @@ -144,30 +125,79 @@ func NewRocksDBStoreOpts(opts *Options) (*RocksDBStore, error) {
}

// getIndexTableOpts builds the column family options for the index table.
//
// The sizing below aims to minimize space amplification.
// TODO(review): the original note says the index table is append-only and
// should instead be optimized for read amplification ("TODO change
// this!!!") — this tuning is provisional and should be revisited.
func getIndexTableOpts() *rocksdb.Options {
	// Block-based tables with a 10-bits-per-key Bloom filter to speed up
	// point lookups; SST files are Snappy-compressed.
	bbto := rocksdb.NewDefaultBlockBasedTableOptions()
	bbto.SetFilterPolicy(rocksdb.NewBloomFilterPolicy(10))
	opts := rocksdb.NewDefaultOptions()
	opts.SetBlockBasedTableFactory(bbto)
	opts.SetCompression(rocksdb.SnappyCompression)
	// Level sizing rationale:
	//
	//   L0 size = 64MB (write_buffer_size)
	//           * 2    (min_write_buffer_number_to_merge)
	//           * 8    (level0_file_num_compaction_trigger)
	//           = 1GB
	//
	//   L1 is kept close to L0: max_bytes_for_level_base = 1GB with
	//   max_bytes_for_level_multiplier = 2, so L2 is 2GB, L3 is 4GB,
	//   L4 is 8GB, L5 is 16GB, ...
	opts.SetWriteBufferSize(64 * 1024 * 1024)
	opts.SetMaxWriteBufferNumber(5)
	opts.SetMinWriteBufferNumberToMerge(2)
	opts.SetLevel0FileNumCompactionTrigger(8)
	// MaxBytesForLevelBase is the total size of L1; it should stay close
	// to the size of L0 (see rationale above).
	opts.SetMaxBytesForLevelBase(1 * 1024 * 1024 * 1024)
	opts.SetMaxBytesForLevelMultiplier(2)
	// Files in L1 will have TargetFileSizeBase bytes; each deeper level
	// multiplies the target file size by 10.
	opts.SetTargetFileSizeBase(64 * 1024 * 1024)
	opts.SetTargetFileSizeMultiplier(10)
	// Background I/O parallelism for this column family.
	opts.SetMaxBackgroundCompactions(2)
	opts.SetMaxBackgroundFlushes(2)
	return opts
}

// getHyperCacheTableOpts builds the column family options for the hyper
// cache table: a block-based table format with a 10-bits-per-key Bloom
// filter to cheapen point lookups, and Snappy compression for SST files.
func getHyperCacheTableOpts() *rocksdb.Options {
	opts := rocksdb.NewDefaultOptions()
	tableOpts := rocksdb.NewDefaultBlockBasedTableOptions()
	tableOpts.SetFilterPolicy(rocksdb.NewBloomFilterPolicy(10))
	opts.SetBlockBasedTableFactory(tableOpts)
	opts.SetCompression(rocksdb.SnappyCompression)
	return opts
}

// getHistoryCacheTableOpts builds the column family options for the
// history cache table. The configuration mirrors the hyper cache table:
// block-based tables carrying a 10-bits-per-key Bloom filter, with
// Snappy-compressed SST files.
func getHistoryCacheTableOpts() *rocksdb.Options {
	tbl := rocksdb.NewDefaultBlockBasedTableOptions()
	tbl.SetFilterPolicy(rocksdb.NewBloomFilterPolicy(10))

	cfOpts := rocksdb.NewDefaultOptions()
	cfOpts.SetCompression(rocksdb.SnappyCompression)
	cfOpts.SetBlockBasedTableFactory(tbl)
	return cfOpts
}

// getFsmStateTableOpts builds the column family options for the FSM state
// table.
//
// The FSM state table contains only one key, which is overwritten on
// every add-event operation, so the workload is overwrite-heavy on a tiny
// keyspace. To reduce write and space amplification we:
//   - set a small in-memory write buffer (4MB)
//   - keep a low number of write buffers (3)
//   - merge write buffers before flushing, so repeated overwrites of the
//     same key collapse into a single flushed entry
//   - limit background compaction and flush parallelism to 1 thread each
//
// NOTE(review): the original comment also mentions "keeping a lower
// number of levels", but no level-count option is set here — confirm
// whether that tuning was intended.
func getFsmStateTableOpts() *rocksdb.Options {
	bbto := rocksdb.NewDefaultBlockBasedTableOptions()
	opts := rocksdb.NewDefaultOptions()
	opts.SetBlockBasedTableFactory(bbto)
	opts.SetCompression(rocksdb.SnappyCompression)
	opts.SetWriteBufferSize(4 * 1024 * 1024)
	opts.SetMaxWriteBufferNumber(3)
	opts.SetMinWriteBufferNumberToMerge(2)
	opts.SetMaxBackgroundCompactions(1)
	opts.SetMaxBackgroundFlushes(1)
	return opts
}

Expand Down Expand Up @@ -290,18 +320,11 @@ func (s *RocksDBStore) Close() error {
if s.globalOpts != nil {
s.globalOpts.Destroy()
}
if s.defaultTableOpts != nil {
s.defaultTableOpts.Destroy()
}
if s.indexTableOpts != nil {
s.indexTableOpts.Destroy()
}
if s.hyperCacheTableOpts != nil {
s.hyperCacheTableOpts.Destroy()
}
if s.fsmStateTableOpts != nil {
s.fsmStateTableOpts.Destroy()

for _, opt := range s.cfOpts {
opt.Destroy()
}

if s.ro != nil {
s.ro.Destroy()
}
Expand Down
Loading

0 comments on commit 6e31394

Please sign in to comment.