diff --git a/kvrocks.conf b/kvrocks.conf index e7009e44617..47822244c02 100644 --- a/kvrocks.conf +++ b/kvrocks.conf @@ -329,6 +329,13 @@ json-max-nesting-depth 1024 # Default: json json-storage-format json +# The write buffer size of minor column families those infrequently be used +# including pubsub, propagte, zset_score, stream and search. +# It will reduce the memory usage in some scenarios. +# +# Default: 64 MB (same with rocksdb.write_buffer_size) +minor-columns-write-buffer-size 64 + ################################## TLS ################################### # By default, TLS/SSL is disabled, i.e. `tls-port` is set to 0. diff --git a/src/config/config.cc b/src/config/config.cc index 500ac717008..d3f16fda328 100644 --- a/src/config/config.cc +++ b/src/config/config.cc @@ -41,6 +41,7 @@ #include "server/server.h" #include "status.h" #include "storage/redis_metadata.h" +#include "storage/storage.h" constexpr const char *kDefaultDir = "/tmp/kvrocks"; constexpr const char *kDefaultBackupDir = "/tmp/kvrocks/backup"; @@ -189,6 +190,7 @@ Config::Config() { {"json-max-nesting-depth", false, new IntField(&json_max_nesting_depth, 1024, 0, INT_MAX)}, {"json-storage-format", false, new EnumField(&json_storage_format, json_storage_formats, JsonStorageFormat::JSON)}, + {"minor-columns-write-buffer-size", false, new IntField(&minor_columns_write_buffer_size, 64, 0, 4096)}, /* rocksdb options */ {"rocksdb.compression", false, @@ -575,6 +577,19 @@ void Config::initFieldCallback() { if (!srv) return Status::OK(); return srv->GetNamespace()->LoadAndRewrite(); }}, + {"minor-columns-write-buffer-size", + [this](Server *srv, const std::string &k, const std::string &v) -> Status { + if (!srv) return Status::OK(); + const std::vector column_families = {kColumnFamilyIDZSetScore, kColumnFamilyIDPubSub, + kColumnFamilyIDPropagate, kColumnFamilyIDStream, + kColumnFamilyIDSearch}; + for (const auto &cf : column_families) { + auto s = srv->storage->SetOptionForColumnFamily(cf, "write_buffer_size", + std::to_string(minor_columns_write_buffer_size * MiB)); + if (!s.IsOK()) return s; + } + return Status::OK(); + }}, {"rocksdb.target_file_size_base", [this](Server *srv, const std::string &k, const std::string &v) -> Status { diff --git a/src/config/config.h b/src/config/config.h index c4bc705b006..5dd40b8cf8b 100644 --- a/src/config/config.h +++ b/src/config/config.h @@ -119,6 +119,7 @@ struct Config { bool auto_resize_block_and_sst = true; int fullsync_recv_file_delay = 0; bool use_rsid_psync = false; + int minor_columns_write_buffer_size; std::vector binds; std::string dir; std::string db_dir; diff --git a/src/storage/storage.cc b/src/storage/storage.cc index 2133645c40c..6245dd569ad 100644 --- a/src/storage/storage.cc +++ b/src/storage/storage.cc @@ -37,6 +37,7 @@ #include #include "compact_filter.h" +#include "config/config.h" #include "db_util.h" #include "event_listener.h" #include "event_util.h" @@ -226,6 +227,12 @@ Status Storage::SetOptionForAllColumnFamilies(const std::string &key, const std: return Status::OK(); } +Status Storage::SetOptionForColumnFamily(ColumnFamilyID id, const std::string &key, const std::string &value) { + auto s = db_->SetOptions(GetCFHandle(id), {{key, value}}); + if (!s.ok()) return {Status::NotOK, s.ToString()}; + return Status::OK(); +} + Status Storage::SetDBOption(const std::string &key, const std::string &value) { auto s = db_->SetDBOptions({{key, value}}); if (!s.ok()) return {Status::NotOK, s.ToString()}; @@ -337,15 +344,20 @@ Status Storage::Open(DBOpenMode mode) { propagate_opts.disable_auto_compactions = config_->rocks_db.disable_auto_compactions; SetBlobDB(&propagate_opts); + rocksdb::ColumnFamilyOptions minor_opts(subkey_opts); + minor_opts.write_buffer_size = config_->minor_columns_write_buffer_size * MiB; + pubsub_opts.write_buffer_size = config_->minor_columns_write_buffer_size * MiB; + propagate_opts.write_buffer_size = config_->minor_columns_write_buffer_size * MiB; + std::vector column_families; // Caution: don't change the order of column family, or the handle will be mismatched column_families.emplace_back(rocksdb::kDefaultColumnFamilyName, subkey_opts); column_families.emplace_back(kMetadataColumnFamilyName, metadata_opts); - column_families.emplace_back(kZSetScoreColumnFamilyName, subkey_opts); + column_families.emplace_back(kZSetScoreColumnFamilyName, minor_opts); column_families.emplace_back(kPubSubColumnFamilyName, pubsub_opts); column_families.emplace_back(kPropagateColumnFamilyName, propagate_opts); - column_families.emplace_back(kStreamColumnFamilyName, subkey_opts); - column_families.emplace_back(kSearchColumnFamilyName, subkey_opts); + column_families.emplace_back(kStreamColumnFamilyName, minor_opts); + column_families.emplace_back(kSearchColumnFamilyName, minor_opts); std::vector old_column_families; auto s = rocksdb::DB::ListColumnFamilies(options, config_->db_dir, &old_column_families); diff --git a/src/storage/storage.h b/src/storage/storage.h index 44888f1ca4a..a7354f62c85 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -136,6 +136,7 @@ class Storage { void SetBlobDB(rocksdb::ColumnFamilyOptions *cf_options); rocksdb::Options InitRocksDBOptions(); Status SetOptionForAllColumnFamilies(const std::string &key, const std::string &value); + Status SetOptionForColumnFamily(ColumnFamilyID id, const std::string &key, const std::string &value); Status SetDBOption(const std::string &key, const std::string &value); Status CreateColumnFamilies(const rocksdb::Options &options); // The sequence_number will be pointed to the value of the sequence number in range of DB,