Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New stable, fixed-length cache keys #9126

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
dfdd23c
New stable, fixed-length cache keys
pdillinger Oct 22, 2021
95113f8
Fix ReverseBits test in hash_test
pdillinger Nov 4, 2021
ede7375
Improve file prefix handling
pdillinger Nov 4, 2021
02f4c3b
Remove debug output
pdillinger Nov 4, 2021
24ebdf6
Some cleanup
pdillinger Nov 4, 2021
d5ef9fc
Merge remote-tracking branch 'origin/main' into new_cache_key
pdillinger Nov 4, 2021
f90a406
Readability improvements
pdillinger Nov 4, 2021
a8d78f1
Update CacheEntryStatsCollector
pdillinger Nov 4, 2021
c9feb5a
More comments, readability
pdillinger Nov 4, 2021
a7405eb
Fix^2 hash_test
pdillinger Nov 4, 2021
1fa1a95
An old Windows fix
pdillinger Nov 4, 2021
acf3c8c
Merge remote-tracking branch 'origin/main' into new_cache_key
pdillinger Nov 4, 2021
c049a27
Detailed comments
pdillinger Nov 5, 2021
d527296
Merge remote-tracking branch 'origin/main' into new_cache_key
pdillinger Nov 5, 2021
9a5ed4a
TODO
pdillinger Nov 5, 2021
1c45ab8
More analysis in comments
pdillinger Nov 5, 2021
882ed7c
Merge remote-tracking branch 'origin/main' into new_cache_key
pdillinger Nov 18, 2021
6851d96
Merge remote-tracking branch 'origin/main' into new_cache_key
pdillinger Dec 15, 2021
7286fa2
Simulation tool
pdillinger Dec 16, 2021
7020e4a
Missed 'make format' on part of change
pdillinger Dec 16, 2021
1b241d7
Hopefully fix clang-analyze issue; small simulator improvements
pdillinger Dec 16, 2021
f8755b6
Merge remote-tracking branch 'origin/main' into new_cache_key
pdillinger Dec 16, 2021
ba25f2d
Update HISTORY.md
pdillinger Dec 16, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ find_package(Threads REQUIRED)
set(SOURCES
cache/cache.cc
cache/cache_entry_roles.cc
cache/cache_key.cc
cache/cache_reservation_manager.cc
cache/clock_cache.cc
cache/lru_cache.cc
Expand Down
2 changes: 2 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ cpp_library(
srcs = [
"cache/cache.cc",
"cache/cache_entry_roles.cc",
"cache/cache_key.cc",
"cache/cache_reservation_manager.cc",
"cache/clock_cache.cc",
"cache/lru_cache.cc",
Expand Down Expand Up @@ -472,6 +473,7 @@ cpp_library(
srcs = [
"cache/cache.cc",
"cache/cache_entry_roles.cc",
"cache/cache_key.cc",
"cache/cache_reservation_manager.cc",
"cache/clock_cache.cc",
"cache/lru_cache.cc",
Expand Down
205 changes: 205 additions & 0 deletions cache/cache_bench_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@

#ifdef GFLAGS
#include <cinttypes>
#include <cstddef>
#include <cstdio>
#include <limits>
#include <memory>
#include <set>
#include <sstream>

#include "db/db_impl/db_impl.h"
#include "monitoring/histogram.h"
#include "port/port.h"
#include "rocksdb/cache.h"
Expand All @@ -18,6 +21,8 @@
#include "rocksdb/env.h"
#include "rocksdb/secondary_cache.h"
#include "rocksdb/system_clock.h"
#include "rocksdb/table_properties.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/cachable_entry.h"
#include "util/coding.h"
#include "util/gflags_compat.h"
Expand Down Expand Up @@ -73,6 +78,35 @@ static class std::shared_ptr<ROCKSDB_NAMESPACE::SecondaryCache> secondary_cache;

DEFINE_bool(use_clock_cache, false, "");

// ## BEGIN stress_cache_key sub-tool options ##
DEFINE_bool(stress_cache_key, false, "If true, run cache key stress test instead");
DEFINE_uint32(sck_files_per_day, 2500000,
"(-stress_cache_key) Simulated files generated per day");
DEFINE_uint32(sck_duration, 90,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you call this something other than "duration"? Duration in db_bench means how long to run the tests whereas here it means how many days to simulate. "sck_days" or something like that might be a better name.

"(-stress_cache_key) Number of days to simulate in each run");
DEFINE_uint32(
sck_min_collision, 15,
"(-stress_cache_key) Keep running until this many collisions seen");
DEFINE_uint32(
sck_file_size_mb, 32,
"(-stress_cache_key) Simulated file size in MiB, for accounting purposes");
DEFINE_uint32(sck_reopen_nfiles, 100,
"(-stress_cache_key) Re-opens DB average every n files");
DEFINE_uint32(
sck_restarts_per_day, 24,
"(-stress_cache_key) Simulated process restarts per day (across DBs)");
DEFINE_uint32(
sck_db_count, 100,
"(-stress_cache_key) Parallel DBs in operation");
DEFINE_uint32(sck_table_bits, 20,
"(-stress_cache_key) Log2 number of tracked files");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide a better description. I am not sure what "table_bits" has to do with "number of tracked files".

DEFINE_uint32(
sck_keep_bits, 50,
"(-stress_cache_key) Number of cache key bits to keep");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide a better description. I am not sure what "number of bits to keep" is supposed to mean.

DEFINE_bool(sck_randomize, false,
"(-stress_cache_key) Randomize (hash) cache key");
// ## END stress_cache_key sub-tool options ##

namespace ROCKSDB_NAMESPACE {

class CacheBench;
Expand Down Expand Up @@ -548,9 +582,180 @@ class CacheBench {
}
};

// TODO: better description (see PR #9126 for some info)
class StressCacheKey {
public:
void Run() {
uint64_t mb_per_day =
uint64_t{FLAGS_sck_files_per_day} * FLAGS_sck_file_size_mb;
printf("Total cache or DBs size: %gTiB Writing %g MiB/s or %gTiB/day\n",
FLAGS_sck_file_size_mb / 1024.0 / 1024.0 *
std::pow(2.0, FLAGS_sck_table_bits),
mb_per_day / 86400.0, mb_per_day / 1024.0 / 1024.0);
multiplier_ = std::pow(2.0, 128 - FLAGS_sck_keep_bits) /
(FLAGS_sck_file_size_mb * 1024.0 * 1024.0);
printf(
"Multiply by %g to correct for simulation losses (but still assume "
"whole file cached)\n",
multiplier_);
restart_nfiles_ = FLAGS_sck_files_per_day / FLAGS_sck_restarts_per_day;
double without_ejection =
std::pow(1.414214, FLAGS_sck_keep_bits) / FLAGS_sck_files_per_day;
printf(
"Without ejection, expect random collision after %g days (%g "
"corrected)\n",
without_ejection, without_ejection * multiplier_);
double with_full_table =
std::pow(2.0, FLAGS_sck_keep_bits - FLAGS_sck_table_bits) /
FLAGS_sck_files_per_day;
printf(
"With ejection and full table, expect random collision after %g "
"days (%g corrected)\n",
with_full_table, with_full_table * multiplier_);
collisions_ = 0;

for (int i = 1; collisions_ < FLAGS_sck_min_collision; i++) {
RunOnce();
if (collisions_ == 0) {
printf(
"No collisions after %d x %u days "
" \n",
i, FLAGS_sck_duration);
} else {
double est = 1.0 * i * FLAGS_sck_duration / collisions_;
printf("%" PRIu64
" collisions after %d x %u days, est %g days between (%g "
"corrected) \n",
collisions_, i, FLAGS_sck_duration, est, est * multiplier_);
}
}
}

void RunOnce() {
const size_t db_count = FLAGS_sck_db_count;
dbs_.reset(new TableProperties[db_count]);
const size_t table_mask = (size_t{1} << FLAGS_sck_table_bits) - 1;
table_.reset(new uint64_t[table_mask + 1]{});
if (FLAGS_sck_keep_bits > 64) {
FLAGS_sck_keep_bits = 64;
}
uint32_t shift_away = 64 - FLAGS_sck_keep_bits;
uint32_t shift_away_b = shift_away / 3;
uint32_t shift_away_a = shift_away - shift_away_b;
Comment on lines +647 to +649
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment here to explain the math. Not sure what the shift_away / 3 is for.


process_count_ = 0;
session_count_ = 0;
ResetProcess();

Random64 r{std::random_device{}()};

uint64_t max_file_count =
uint64_t{FLAGS_sck_files_per_day} * FLAGS_sck_duration;
uint64_t file_count = 0;
uint32_t report_count = 0;
uint32_t collisions_this_run = 0;
// Round robin through DBs
for (size_t db_i = 0;; ++db_i) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to not also have this for conditional/update file_count?

if (db_i >= db_count) { db_i = 0; }
if (file_count >= max_file_count) {
break;
}
if (r.OneIn(FLAGS_sck_reopen_nfiles)) {
ResetSession(db_i);
} else if (r.OneIn(restart_nfiles_)) {
ResetProcess();
}
OffsetableCacheKey ock;
dbs_[db_i].orig_file_number += 1;
// skip some file numbers, unless 1 DB so that that can simulate
// better (DB-independent) unique IDs
if (db_count > 1) {
dbs_[db_i].orig_file_number += (r.Next() & 3);
}
BlockBasedTable::SetupBaseCacheKey(&dbs_[db_i], "", 42, 42, &ock);
CacheKey ck = ock.WithOffset(0);
uint64_t stripped;
if (FLAGS_sck_randomize) {
stripped = GetSliceHash64(ck.AsSlice()) >> shift_away;
} else {
uint32_t a = DecodeFixed32(ck.AsSlice().data()) << shift_away_a;
uint32_t b = DecodeFixed32(ck.AsSlice().data() + 12) >> shift_away_b;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to have a Slice cks = ck.AsSlice()? Wondering the overhead of creating/using extra slices.

stripped = (uint64_t{a} << 32) + b;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a hashing method that combines two values we should be using instead?

}
if (stripped == 0) {
// Unlikely, but we need to exclude tracking this value
printf("Hit Zero! \n");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be to stderr instead? Wondering if it will get lost if it is hit...

continue;
}
file_count++;
uint64_t h = NPHash64(reinterpret_cast<char*>(&stripped), 8);
// Skew lifetimes
size_t pos = std::min(Lower32of64(h) & table_mask, Upper32of64(h) & table_mask);
if (table_[pos] == stripped) {
collisions_this_run++;
// To predict probability of no collisions, we have to get rid of
// correlated collisions, which this takes care of:
ResetProcess();
} else {
// Replace
table_[pos] = stripped;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this just guarantee that two successive hashes to the same pos are not the same? Is it possible that there is an intervening one that would cause something like [A, B, A] where the collision was not detected?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is simulation of various files and their cache entries kept for various lifetimes. There is no collision in practice if there's no overlap in lifetime (on disk UNION in cache).

}

if (++report_count == FLAGS_sck_files_per_day) {
report_count = 0;
// Estimate fill %
size_t incr = table_mask / 1000;
size_t sampled_count = 0;
for (size_t i = 0; i <= table_mask; i += incr) {
if (table_[i] != 0) {
sampled_count++;
}
}
// Report
printf(
"%" PRIu64 " days, %" PRIu64 " proc, %" PRIu64
" sess, %u coll, occ %g%%, ejected %g%% \r",
file_count / FLAGS_sck_files_per_day, process_count_,
session_count_, collisions_this_run, 100.0 * sampled_count / 1000.0,
100.0 * (1.0 - sampled_count / 1000.0 * table_mask / file_count));
}
}
collisions_ += collisions_this_run;
}

void ResetSession(size_t i) {
dbs_[i].db_session_id = DBImpl::GenerateDbSessionId(nullptr);
session_count_++;
}

void ResetProcess() {
process_count_++;
DBImpl::TEST_ResetDbSessionIdGen();
for (size_t i = 0; i < FLAGS_sck_db_count; ++i) {
ResetSession(i);
}
}

private:
// Use db_session_id and orig_file_number from TableProperties
std::unique_ptr<TableProperties[]> dbs_;
std::unique_ptr<uint64_t[]> table_;
uint64_t process_count_ = 0;
uint64_t session_count_ = 0;
uint64_t collisions_ = 0;
uint32_t restart_nfiles_ = 0;
double multiplier_ = 0.0;
};

int cache_bench_tool(int argc, char** argv) {
ParseCommandLineFlags(&argc, &argv, true);

if (FLAGS_stress_cache_key) {
// Alternate tool
StressCacheKey().Run();
return 0;
}

if (FLAGS_threads <= 0) {
fprintf(stderr, "threads number <= 0\n");
exit(1);
Expand Down
16 changes: 9 additions & 7 deletions cache/cache_entry_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <mutex>

#include "cache/cache_helpers.h"
#include "cache/cache_key.h"
#include "port/lang.h"
#include "rocksdb/cache.h"
#include "rocksdb/status.h"
Expand Down Expand Up @@ -112,13 +113,7 @@ class CacheEntryStatsCollector {
// entry in cache until all refs are destroyed.
static Status GetShared(Cache *cache, SystemClock *clock,
std::shared_ptr<CacheEntryStatsCollector> *ptr) {
std::array<uint64_t, 3> cache_key_data{
{// First 16 bytes == md5 of class name
0x7eba5a8fb5437c90U, 0x8ca68c9b11655855U,
// Last 8 bytes based on a function pointer to make unique for each
// template instantiation
reinterpret_cast<uint64_t>(&CacheEntryStatsCollector::GetShared)}};
Slice cache_key = GetSlice(&cache_key_data);
const Slice &cache_key = GetCacheKey();

Cache::Handle *h = cache->Lookup(cache_key);
if (h == nullptr) {
Expand Down Expand Up @@ -166,6 +161,13 @@ class CacheEntryStatsCollector {
delete static_cast<CacheEntryStatsCollector *>(value);
}

static const Slice &GetCacheKey() {
// For each template instantiation
static CacheKey ckey = CacheKey::CreateUniqueForProcessLifetime();
static Slice ckey_slice = ckey.AsSlice();
return ckey_slice;
}

std::mutex saved_mutex_;
Stats saved_stats_;

Expand Down
Loading