Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Cache into a Customizable Object #8998

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 58 additions & 32 deletions cache/cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,49 @@

#include "rocksdb/cache.h"

#include "cache/clock_cache.h"
#include "cache/fast_lru_cache.h"
#include "cache/lru_cache.h"
#include "rocksdb/configurable.h"
#include "rocksdb/secondary_cache.h"
#include "rocksdb/utilities/customizable_util.h"
#include "rocksdb/utilities/options_type.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {
namespace {
#ifndef ROCKSDB_LITE
static std::unordered_map<std::string, OptionTypeInfo>
lru_cache_options_type_info = {
{"capacity",
{offsetof(struct LRUCacheOptions, capacity), OptionType::kSizeT,
OptionVerificationType::kNormal, OptionTypeFlags::kMutable}},
{"num_shard_bits",
{offsetof(struct LRUCacheOptions, num_shard_bits), OptionType::kInt,
OptionVerificationType::kNormal, OptionTypeFlags::kMutable}},
{"strict_capacity_limit",
{offsetof(struct LRUCacheOptions, strict_capacity_limit),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"high_pri_pool_ratio",
{offsetof(struct LRUCacheOptions, high_pri_pool_ratio),
OptionType::kDouble, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
};
static int RegisterBuiltinCache(ObjectLibrary& library,
const std::string& /*arg*/) {
library.AddFactory<Cache>(
lru_cache::LRUCache::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<Cache>* guard,
std::string* /* errmsg */) {
guard->reset(new lru_cache::LRUCache());
return guard->get();
});
library.AddFactory<Cache>(
fast_lru_cache::LRUCache::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<Cache>* guard,
std::string* /* errmsg */) {
guard->reset(new fast_lru_cache::LRUCache());
return guard->get();
});
library.AddFactory<Cache>(
ClockCache::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<Cache>* guard,
std::string* errmsg) {
std::unique_ptr<ClockCache> clock;
Status s = ClockCache::CreateClockCache(&clock);
if (!s.ok()) {
*errmsg = s.ToString();
}
guard->reset(clock.release());
return guard->get();
});
return 1;
}
#endif // ROCKSDB_LITE
} // namespace

Status SecondaryCache::CreateFromString(
const ConfigOptions& config_options, const std::string& value,
Expand All @@ -46,23 +63,32 @@ Status SecondaryCache::CreateFromString(
Status Cache::CreateFromString(const ConfigOptions& config_options,
const std::string& value,
std::shared_ptr<Cache>* result) {
#ifndef ROCKSDB_LITE
static std::once_flag once;
std::call_once(once, [&]() {
RegisterBuiltinCache(*(ObjectLibrary::Default().get()), "");
});
#endif // ROCKSDB_LITE
Status status;
std::shared_ptr<Cache> cache;
if (value.find('=') == std::string::npos) {
cache = NewLRUCache(ParseSizeT(value));
} else {
#ifndef ROCKSDB_LITE
LRUCacheOptions cache_opts;
status = OptionTypeInfo::ParseStruct(config_options, "",
&lru_cache_options_type_info, "",
value, &cache_opts);
if (status.ok()) {
cache = NewLRUCache(cache_opts);
if (!value.empty()) {
std::string id;
std::unordered_map<std::string, std::string> opt_map;
status = Configurable::GetOptionsMap(value, LRUCache::kClassName(), &id,
&opt_map);
if (!status.ok()) {
return status;
} else if (opt_map.empty() && !id.empty() && isdigit(id.at(0))) {
// If there are no name=value options and the id is a digit, assume
// it is an old-style LRUCache created by capacity only
cache = NewLRUCache(ParseSizeT(id));
} else {
status = NewSharedObject<Cache>(config_options, id, opt_map, &cache);
if (status.ok() && !config_options.invoke_prepare_options) {
// Always invoke PrepareOptions for a cache...
status = cache->PrepareOptions(config_options);
}
}
#else
(void)config_options;
status = Status::NotSupported("Cannot load cache in LITE mode ", value);
#endif //! ROCKSDB_LITE
}
if (status.ok()) {
result->swap(cache);
Expand Down
27 changes: 21 additions & 6 deletions cache/cache_bench_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ DEFINE_string(secondary_cache_uri, "",
"Full URI for creating a custom secondary cache object");
static class std::shared_ptr<ROCKSDB_NAMESPACE::SecondaryCache> secondary_cache;
#endif // ROCKSDB_LITE

DEFINE_string(cache_uri, "", "URI for creating a cache");
DEFINE_bool(use_clock_cache, false, "");

// ## BEGIN stress_cache_key sub-tool options ##
Expand Down Expand Up @@ -279,7 +279,19 @@ class CacheBench {
if (max_key > (static_cast<uint64_t>(1) << max_log_)) max_log_++;
}

if (FLAGS_use_clock_cache) {
ConfigOptions config_options;
config_options.ignore_unknown_options = false;
config_options.ignore_unsupported_options = false;

if (!FLAGS_cache_uri.empty()) {
Status s =
Cache::CreateFromString(config_options, FLAGS_cache_uri, &cache_);
if (!s.ok() || !cache_) {
fprintf(stderr, "No cache registered matching string: %s status=%s\n",
FLAGS_cache_uri.c_str(), s.ToString().c_str());
exit(1);
}
} else if (FLAGS_use_clock_cache) {
cache_ = NewClockCache(FLAGS_cache_size, FLAGS_num_shard_bits);
if (!cache_) {
fprintf(stderr, "Clock cache not supported.\n");
Expand All @@ -290,7 +302,7 @@ class CacheBench {
#ifndef ROCKSDB_LITE
if (!FLAGS_secondary_cache_uri.empty()) {
Status s = SecondaryCache::CreateFromString(
ConfigOptions(), FLAGS_secondary_cache_uri, &secondary_cache);
config_options, FLAGS_secondary_cache_uri, &secondary_cache);
if (secondary_cache == nullptr) {
fprintf(
stderr,
Expand Down Expand Up @@ -582,9 +594,12 @@ class CacheBench {
printf("RocksDB version : %d.%d\n", kMajorVersion, kMinorVersion);
printf("Number of threads : %u\n", FLAGS_threads);
printf("Ops per thread : %" PRIu64 "\n", FLAGS_ops_per_thread);
printf("Cache size : %s\n",
BytesToHumanString(FLAGS_cache_size).c_str());
printf("Num shard bits : %u\n", FLAGS_num_shard_bits);
printf("Cache : %s\n", cache_->Name());
std::string cache_opts;
Status s = cache_->GetOptionString(ConfigOptions(), &cache_opts);
if (s.ok()) {
printf("Cache Options : %s\n", cache_opts.c_str());
}
printf("Max key : %" PRIu64 "\n", max_key_);
printf("Resident ratio : %g\n", FLAGS_resident_ratio);
printf("Skew degree : %u\n", FLAGS_skew);
Expand Down
146 changes: 139 additions & 7 deletions cache/cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "cache/clock_cache.h"
#include "cache/fast_lru_cache.h"
#include "cache/lru_cache.h"
#include "rocksdb/convenience.h"
#include "test_util/testharness.h"
#include "util/coding.h"
#include "util/string_util.h"
Expand Down Expand Up @@ -844,14 +845,145 @@ TEST_P(CacheTest, GetChargeAndDeleter) {
cache_->Release(h1);
}

#ifdef SUPPORT_CLOCK_CACHE
std::shared_ptr<Cache> (*new_clock_cache_func)(
size_t, int, bool, CacheMetadataChargePolicy) = NewClockCache;
INSTANTIATE_TEST_CASE_P(CacheTestInstance, CacheTest,
testing::Values(kLRU, kClock, kFast));
class CreateCacheTest : public testing::Test {
public:
CreateCacheTest() {
config_options_.ignore_unsupported_options = false;
config_options_.invoke_prepare_options = false;
}

protected:
ConfigOptions config_options_;
};

TEST_F(CreateCacheTest, LRUFromSize) {
std::shared_ptr<Cache> cache;
Options options;
ASSERT_OK(Cache::CreateFromString(config_options_, "1M", &cache));
ASSERT_NE(cache, nullptr);
ASSERT_STREQ(cache->Name(), LRUCache::kClassName());
ASSERT_EQ(cache->GetCapacity(), 1024U * 1024U);
ASSERT_OK(cache->ValidateOptions(options, options));
}

TEST_F(CreateCacheTest, LRUFromOptions) {
std::shared_ptr<Cache> cache, copy;
Options options;
Status s =
Cache::CreateFromString(config_options_,
"capacity=1M;num_shard_bits=5;strict_capacity_"
"limit=true; metadata_charge_policy=kDontCharge",
&cache);
#ifndef ROCKSDB_LITE
ASSERT_OK(s);
ASSERT_NE(cache, nullptr);
ASSERT_STREQ(cache->Name(), LRUCache::kClassName());
ASSERT_EQ(cache->GetCapacity(), 1024U * 1024U);
ASSERT_OK(cache->ValidateOptions(options, options));
config_options_.invoke_prepare_options = true;
ASSERT_OK(Cache::CreateFromString(config_options_,
cache->ToString(config_options_), &copy));
ASSERT_NE(copy, nullptr);
ASSERT_STREQ(copy->Name(), LRUCache::kClassName());
std::string mismatch;
ASSERT_TRUE(cache->AreEquivalent(config_options_, copy.get(), &mismatch));
ASSERT_OK(copy->ValidateOptions(options, options));
#else
INSTANTIATE_TEST_CASE_P(CacheTestInstance, CacheTest,
testing::Values(kLRU, kFast));
ASSERT_NOK(s);
#endif // ROCKSDB_LITE
}

TEST_F(CreateCacheTest, ClockCache) {
std::shared_ptr<Cache> cache, copy;
Options options;
Status s = Cache::CreateFromString(config_options_, ClockCache::kClassName(),
&cache);
if (ClockCache::IsClockCacheSupported()) {
ASSERT_OK(s);
ASSERT_NE(cache, nullptr);
ASSERT_STREQ(cache->Name(), ClockCache::kClassName());
ASSERT_OK(cache->PrepareOptions(config_options_));
ASSERT_OK(cache->ValidateOptions(options, options));
cache->SetCapacity(1024U * 1024U);
ASSERT_EQ(cache->GetCapacity(), 1024U * 1024U);
cache->SetStrictCapacityLimit(true);
ASSERT_EQ(cache->HasStrictCapacityLimit(), true);
#ifndef ROCKSDB_LITE
config_options_.invoke_prepare_options = true;
ASSERT_OK(Cache::CreateFromString(config_options_,
cache->ToString(config_options_), &copy));
ASSERT_NE(copy, nullptr);
ASSERT_STREQ(copy->Name(), ClockCache::kClassName());
std::string mismatch;
ASSERT_TRUE(cache->AreEquivalent(config_options_, copy.get(), &mismatch));
ASSERT_OK(copy->ValidateOptions(options, options));
#endif // ROCKSDB_LITE
} else {
ASSERT_NOK(s);
}
}

#ifndef ROCKSDB_LITE
TEST_F(CreateCacheTest, LRUFromBadOptions) {
std::shared_ptr<Cache> cache;

ASSERT_NOK(Cache::CreateFromString(
config_options_, "capacityXX=1M; metadata_charge_policy=kDontCharge",
&cache));
ASSERT_NOK(Cache::CreateFromString(config_options_,
"metadata_charge_policy=kXX", &cache));
ASSERT_NOK(
Cache::CreateFromString(config_options_, "num_shard_bits=21", &cache));
}

TEST_F(CreateCacheTest, ManagedCache) {
std::shared_ptr<Cache> cache1 = NewLRUCache(1024 * 1024);
std::shared_ptr<Cache> cache2 = NewLRUCache(1024 * 1024);
std::shared_ptr<Cache> cache3;
ASSERT_NE(cache1, nullptr);
ASSERT_NE(cache2, nullptr);
ASSERT_NE(cache1, cache2);

auto cache_id = cache1->GetId();
ASSERT_OK(Cache::CreateFromString(config_options_, cache_id, &cache3));
ASSERT_EQ(cache1, cache3);
cache1.reset();
cache2.reset();
cache3.reset();
ASSERT_NOK(Cache::CreateFromString(config_options_, cache_id, &cache3));

cache1 = NewClockCache(1024 * 1024);
if (cache1) {
cache_id = cache1->GetId();
ASSERT_OK(
Cache::CreateFromString(config_options_, cache1->GetId(), &cache3));
ASSERT_EQ(cache1, cache3);
cache1.reset();
cache3.reset();
ASSERT_NOK(Cache::CreateFromString(config_options_, cache_id, &cache3));
}
}

#ifdef SUPPORT_CLOCK_CACHE
TEST_F(CreateCacheTest, ClockCacheFromBadOptions) {
std::shared_ptr<Cache> cache;
std::string id = ClockCache::kClassName();
ASSERT_NOK(Cache::CreateFromString(
config_options_,
"capacityXX=1M; metadata_charge_policy=kDontCharge;id=" + id, &cache));
ASSERT_NOK(Cache::CreateFromString(
config_options_, "metadata_charge_policy=kXX; id=" + id, &cache));

ASSERT_NOK(Cache::CreateFromString(config_options_,
"num_shard_bits=21;id=" + id, &cache));
}
#endif // SUPPORT_CLOCK_CACHE

#endif // ROCKSDB_LITE

INSTANTIATE_TEST_CASE_P(LRUCache, CacheTest, testing::Values(kLRU));
#ifdef SUPPORT_CLOCK_CACHE
INSTANTIATE_TEST_CASE_P(ClockCache, CacheTest, testing::Values(kClock));
#endif // SUPPORT_CLOCK_CACHE
INSTANTIATE_TEST_CASE_P(CacheTestInstance, LRUCacheTest,
testing::Values(kLRU, kFast));
Expand Down
Loading