Skip to content

Commit

Permalink
Refactor table_factory into MutableCFOptions (#13077)
Browse files Browse the repository at this point in the history
Summary:
This is setting up for a fix to a data race in SetOptions on BlockBasedTableOptions (BBTO), #10079
The race will be fixed by replacing `table_factory` with a modified copy whenever we want to modify a BBTO field.

An argument could be made that this change creates more entaglement between features (e.g. BlobSource <-> MutableCFOptions), rather than (conceptually) minimizing the dependencies of each feature, but
* Most of these things already depended on ImmutableOptions
* Historically there has been a lot of plumbing (and possible small CPU overhead) involved in adding features that need to reach a lot of places, like `block_protection_bytes_per_key`. Keeping those wrapped up in options simplifies that.
* SuperVersion management generally takes care of lifetime management of MutableCFOptions, so is not that difficult. (Crash test agrees so far.)

There are some FIXME places where it is known to be unsafe to replace `block_cache` unless/until we handle shared_ptr tracking properly. HOWEVER, replacing `block_cache` is generally dubious, at least while existing users of the old block cache (e.g. table readers) can continue indefinitely.

The change to cf_options.cc is essentially just moving code (not changing).

I'm not concerned about the performance of copying another shared_ptr with MutableCFOptions, but I left a note about considering an improvement if more shared_ptr are added to it.

Pull Request resolved: #13077

Test Plan:
existing tests, crash test.

Unit test DBOptionsTest.GetLatestCFOptions updated with some temporary logic. MemoryTest required some refactoring (simplification) for the change.

Reviewed By: cbi42

Differential Revision: D64546903

Pulled By: pdillinger

fbshipit-source-id: 69ae97ce5cf4c01b58edc4c5d4687eb1e5bf5855
  • Loading branch information
pdillinger authored and facebook-github-bot committed Oct 17, 2024
1 parent a44f4b8 commit ac24f15
Show file tree
Hide file tree
Showing 36 changed files with 399 additions and 450 deletions.
13 changes: 7 additions & 6 deletions db/blob/blob_source.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,24 @@

namespace ROCKSDB_NAMESPACE {

BlobSource::BlobSource(const ImmutableOptions* immutable_options,
BlobSource::BlobSource(const ImmutableOptions& immutable_options,
const MutableCFOptions& mutable_cf_options,
const std::string& db_id,
const std::string& db_session_id,
BlobFileCache* blob_file_cache)
: db_id_(db_id),
db_session_id_(db_session_id),
statistics_(immutable_options->statistics.get()),
statistics_(immutable_options.statistics.get()),
blob_file_cache_(blob_file_cache),
blob_cache_(immutable_options->blob_cache),
lowest_used_cache_tier_(immutable_options->lowest_used_cache_tier) {
blob_cache_(immutable_options.blob_cache),
lowest_used_cache_tier_(immutable_options.lowest_used_cache_tier) {
auto bbto =
immutable_options->table_factory->GetOptions<BlockBasedTableOptions>();
mutable_cf_options.table_factory->GetOptions<BlockBasedTableOptions>();
if (bbto &&
bbto->cache_usage_options.options_overrides.at(CacheEntryRole::kBlobCache)
.charged == CacheEntryRoleOptions::Decision::kEnabled) {
blob_cache_ = SharedCacheInterface{std::make_shared<ChargedCache>(
immutable_options->blob_cache, bbto->block_cache)};
immutable_options.blob_cache, bbto->block_cache)};
}
}

Expand Down
6 changes: 5 additions & 1 deletion db/blob/blob_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
namespace ROCKSDB_NAMESPACE {

struct ImmutableOptions;
struct MutableCFOptions;
class Status;
class FilePrefetchBuffer;
class Slice;
Expand All @@ -31,7 +32,10 @@ class Slice;
// storage with minimal cost.
class BlobSource {
public:
BlobSource(const ImmutableOptions* immutable_options,
// NOTE: db_id, db_session_id, and blob_file_cache are saved by reference or
// pointer.
BlobSource(const ImmutableOptions& immutable_options,
const MutableCFOptions& mutable_cf_options,
const std::string& db_id, const std::string& db_session_id,
BlobFileCache* blob_file_cache);

Expand Down
36 changes: 22 additions & 14 deletions db/blob/blob_source_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
DestroyAndReopen(options_);

ImmutableOptions immutable_options(options_);
MutableCFOptions mutable_cf_options(options_);

constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
Expand Down Expand Up @@ -193,8 +194,8 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
backing_cache.get(), &immutable_options, &file_options,
column_family_id, blob_file_read_hist, nullptr /*IOTracer*/);

BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
BlobSource blob_source(immutable_options, mutable_cf_options, db_id_,
db_session_id_, blob_file_cache.get());

ReadOptions read_options;
read_options.verify_checksums = true;
Expand Down Expand Up @@ -464,6 +465,7 @@ TEST_F(BlobSourceTest, GetCompressedBlobs) {
DestroyAndReopen(options_);

ImmutableOptions immutable_options(options_);
MutableCFOptions mutable_cf_options(options_);

constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
Expand Down Expand Up @@ -498,8 +500,8 @@ TEST_F(BlobSourceTest, GetCompressedBlobs) {
backing_cache.get(), &immutable_options, &file_options,
column_family_id, nullptr /*HistogramImpl*/, nullptr /*IOTracer*/);

BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
BlobSource blob_source(immutable_options, mutable_cf_options, db_id_,
db_session_id_, blob_file_cache.get());

ReadOptions read_options;
read_options.verify_checksums = true;
Expand Down Expand Up @@ -589,6 +591,7 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromMultiFiles) {
DestroyAndReopen(options_);

ImmutableOptions immutable_options(options_);
MutableCFOptions mutable_cf_options(options_);

constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
Expand Down Expand Up @@ -644,8 +647,8 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromMultiFiles) {
backing_cache.get(), &immutable_options, &file_options,
column_family_id, blob_file_read_hist, nullptr /*IOTracer*/);

BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
BlobSource blob_source(immutable_options, mutable_cf_options, db_id_,
db_session_id_, blob_file_cache.get());

ReadOptions read_options;
read_options.verify_checksums = true;
Expand Down Expand Up @@ -782,6 +785,7 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
DestroyAndReopen(options_);

ImmutableOptions immutable_options(options_);
MutableCFOptions mutable_cf_options(options_);

constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
Expand Down Expand Up @@ -827,8 +831,8 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
backing_cache.get(), &immutable_options, &file_options,
column_family_id, blob_file_read_hist, nullptr /*IOTracer*/);

BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
BlobSource blob_source(immutable_options, mutable_cf_options, db_id_,
db_session_id_, blob_file_cache.get());

ReadOptions read_options;
read_options.verify_checksums = true;
Expand Down Expand Up @@ -1105,6 +1109,7 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) {
DestroyAndReopen(options_);

ImmutableOptions immutable_options(options_);
MutableCFOptions mutable_cf_options(options_);

constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
Expand Down Expand Up @@ -1137,8 +1142,8 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) {
backing_cache.get(), &immutable_options, &file_options, column_family_id,
blob_file_read_hist, nullptr /*IOTracer*/));

BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
BlobSource blob_source(immutable_options, mutable_cf_options, db_id_,
db_session_id_, blob_file_cache.get());

CacheHandleGuard<BlobFileReader> file_reader;
ReadOptions read_options;
Expand Down Expand Up @@ -1405,6 +1410,7 @@ TEST_F(BlobSourceCacheReservationTest, SimpleCacheReservation) {
DestroyAndReopen(options_);

ImmutableOptions immutable_options(options_);
MutableCFOptions mutable_cf_options(options_);

constexpr ExpirationRange expiration_range;

Expand All @@ -1426,8 +1432,8 @@ TEST_F(BlobSourceCacheReservationTest, SimpleCacheReservation) {
backing_cache.get(), &immutable_options, &file_options,
kColumnFamilyId, blob_file_read_hist, nullptr /*IOTracer*/);

BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
BlobSource blob_source(immutable_options, mutable_cf_options, db_id_,
db_session_id_, blob_file_cache.get());

ConcurrentCacheReservationManager* cache_res_mgr =
static_cast<ChargedCache*>(blob_source.GetBlobCache())
Expand Down Expand Up @@ -1519,6 +1525,8 @@ TEST_F(BlobSourceCacheReservationTest, IncreaseCacheReservation) {
DestroyAndReopen(options_);

ImmutableOptions immutable_options(options_);
MutableCFOptions mutable_cf_options(options_);

constexpr size_t blob_size = 24 << 10; // 24KB
for (size_t i = 0; i < kNumBlobs; ++i) {
blob_file_size_ -= blobs_[i].size(); // old blob size
Expand Down Expand Up @@ -1546,8 +1554,8 @@ TEST_F(BlobSourceCacheReservationTest, IncreaseCacheReservation) {
backing_cache.get(), &immutable_options, &file_options,
kColumnFamilyId, blob_file_read_hist, nullptr /*IOTracer*/);

BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
BlobSource blob_source(immutable_options, mutable_cf_options, db_id_,
db_session_id_, blob_file_cache.get());

ConcurrentCacheReservationManager* cache_res_mgr =
static_cast<ChargedCache*>(blob_source.GetBlobCache())
Expand Down
8 changes: 3 additions & 5 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ TableBuilder* NewTableBuilder(const TableBuilderOptions& tboptions,
assert((tboptions.column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
tboptions.column_family_name.empty());
return tboptions.ioptions.table_factory->NewTableBuilder(tboptions, file);
return tboptions.moptions.table_factory->NewTableBuilder(tboptions, file);
}

Status BuildTable(
Expand Down Expand Up @@ -420,17 +420,15 @@ Status BuildTable(
// the goal is to cache it here for further user reads.
std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
tboptions.read_options, file_options, tboptions.internal_comparator,
*meta, nullptr /* range_del_agg */,
mutable_cf_options.prefix_extractor, nullptr,
*meta, nullptr /* range_del_agg */, mutable_cf_options, nullptr,
(internal_stats == nullptr) ? nullptr
: internal_stats->GetFileReadHist(0),
TableReaderCaller::kFlush, /*arena=*/nullptr,
/*skip_filter=*/false, tboptions.level_at_creation,
MaxFileSizeForL0MetaPin(mutable_cf_options),
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key*/ nullptr,
/*allow_unprepared_value*/ false,
mutable_cf_options.block_protection_bytes_per_key));
/*allow_unprepared_value*/ false));
s = it->status();
if (s.ok() && paranoid_file_checks) {
OutputValidator file_validator(tboptions.internal_comparator,
Expand Down
4 changes: 2 additions & 2 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -595,8 +595,8 @@ ColumnFamilyData::ColumnFamilyData(
blob_file_cache_.reset(
new BlobFileCache(_table_cache, ioptions(), soptions(), id_,
internal_stats_->GetBlobFileReadHist(), io_tracer));
blob_source_.reset(new BlobSource(ioptions(), db_id, db_session_id,
blob_file_cache_.get()));
blob_source_.reset(new BlobSource(ioptions_, mutable_cf_options_, db_id,
db_session_id, blob_file_cache_.get()));

if (ioptions_.compaction_style == kCompactionStyleLevel) {
compaction_picker_.reset(
Expand Down
2 changes: 1 addition & 1 deletion db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ bool Compaction::ShouldFormSubcompactions() const {
return false;
}

if (cfd_->ioptions()->table_factory->Name() ==
if (mutable_cf_options_.table_factory->Name() ==
TableFactory::kPlainTableName()) {
return false;
}
Expand Down
15 changes: 5 additions & 10 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ void CompactionJob::GenSubcompactionBoundaries() {
ReadOptions read_options(Env::IOActivity::kCompaction);
read_options.rate_limiter_priority = GetRateLimiterPriority();
auto* c = compact_->compaction;
if (c->immutable_options()->table_factory->Name() ==
if (c->mutable_cf_options()->table_factory->Name() ==
TableFactory::kPlainTableName()) {
return;
}
Expand Down Expand Up @@ -506,9 +506,7 @@ void CompactionJob::GenSubcompactionBoundaries() {
FileMetaData* f = flevel->files[i].file_metadata;
std::vector<TableReader::Anchor> my_anchors;
Status s = cfd->table_cache()->ApproximateKeyAnchors(
read_options, icomp, *f,
c->mutable_cf_options()->block_protection_bytes_per_key,
my_anchors);
read_options, icomp, *f, *c->mutable_cf_options(), my_anchors);
if (!s.ok() || my_anchors.empty()) {
my_anchors.emplace_back(f->largest.user_key(), f->fd.GetFileSize());
}
Expand Down Expand Up @@ -711,8 +709,6 @@ Status CompactionJob::Run() {
}
}
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
auto& prefix_extractor =
compact_->compaction->mutable_cf_options()->prefix_extractor;
std::atomic<size_t> next_file_idx(0);
auto verify_table = [&](Status& output_status) {
while (true) {
Expand All @@ -733,7 +729,8 @@ Status CompactionJob::Run() {
InternalIterator* iter = cfd->table_cache()->NewIterator(
verify_table_read_options, file_options_,
cfd->internal_comparator(), files_output[file_idx]->meta,
/*range_del_agg=*/nullptr, prefix_extractor,
/*range_del_agg=*/nullptr,
*compact_->compaction->mutable_cf_options(),
/*table_reader_ptr=*/nullptr,
cfd->internal_stats()->GetFileReadHist(
compact_->compaction->output_level()),
Expand All @@ -743,9 +740,7 @@ Status CompactionJob::Run() {
*compact_->compaction->mutable_cf_options()),
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr,
/*allow_unprepared_value=*/false,
compact_->compaction->mutable_cf_options()
->block_protection_bytes_per_key);
/*allow_unprepared_value=*/false);
auto s = iter->status();

if (s.ok() && paranoid_file_checks_) {
Expand Down
1 change: 1 addition & 0 deletions db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ class CompactionJobTestBase : public testing::Test {
} else {
assert(false);
}
mutable_cf_options_.table_factory = cf_options_.table_factory;
}

std::string GenerateFileName(uint64_t file_number) {
Expand Down
2 changes: 1 addition & 1 deletion db/convenience.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ Status VerifySstFileChecksumInternal(const Options& options,
options.block_protection_bytes_per_key, false /* skip_filters */,
!kImmortal, false /* force_direct_prefetch */, -1 /* level */);
reader_options.largest_seqno = largest_seqno;
s = ioptions.table_factory->NewTableReader(
s = options.table_factory->NewTableReader(
read_options, reader_options, std::move(file_reader), file_size,
&table_reader, false /* prefetch_index_and_filter_in_cache */);
if (!s.ok()) {
Expand Down
11 changes: 7 additions & 4 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1153,6 +1153,13 @@ void DBImpl::DumpStats() {
continue;
}

auto* table_factory =
cfd->GetCurrentMutableCFOptions()->table_factory.get();
assert(table_factory != nullptr);
// FIXME: need to a shared_ptr if/when block_cache is going to be mutable
Cache* cache =
table_factory->GetOptions<Cache>(TableFactory::kBlockCacheOpts());

// Release DB mutex for gathering cache entry stats. Pass over all
// column families for this first so that other stats are dumped
// near-atomically.
Expand All @@ -1161,10 +1168,6 @@ void DBImpl::DumpStats() {

// Probe block cache for problems (if not already via another CF)
if (immutable_db_options_.info_log) {
auto* table_factory = cfd->ioptions()->table_factory.get();
assert(table_factory != nullptr);
Cache* cache =
table_factory->GetOptions<Cache>(TableFactory::kBlockCacheOpts());
if (cache && probed_caches.insert(cache).second) {
cache->ReportProblems(immutable_db_options_.info_log);
}
Expand Down
4 changes: 1 addition & 3 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1199,9 +1199,7 @@ class DBImpl : public DB {

uint64_t TEST_total_log_size() const { return total_log_size_; }

// Returns column family name to ImmutableCFOptions map.
Status TEST_GetAllImmutableCFOptions(
std::unordered_map<std::string, const ImmutableCFOptions*>* iopts_map);
void TEST_GetAllBlockCaches(std::unordered_set<const Cache*>* cache_set);

// Return the lastest MutableCFOptions of a column family
Status TEST_GetLatestMutableCFOptions(ColumnFamilyHandle* column_family,
Expand Down
23 changes: 8 additions & 15 deletions db/db_impl/db_impl_debug.cc
Original file line number Diff line number Diff line change
Expand Up @@ -232,23 +232,16 @@ uint64_t DBImpl::TEST_LogfileNumber() {
return logfile_number_;
}

Status DBImpl::TEST_GetAllImmutableCFOptions(
std::unordered_map<std::string, const ImmutableCFOptions*>* iopts_map) {
std::vector<std::string> cf_names;
std::vector<const ImmutableCFOptions*> iopts;
{
InstrumentedMutexLock l(&mutex_);
for (auto cfd : *versions_->GetColumnFamilySet()) {
cf_names.push_back(cfd->GetName());
iopts.push_back(cfd->ioptions());
void DBImpl::TEST_GetAllBlockCaches(
std::unordered_set<const Cache*>* cache_set) {
InstrumentedMutexLock l(&mutex_);
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (const auto bbto =
cfd->GetCurrentMutableCFOptions()
->table_factory->GetOptions<BlockBasedTableOptions>()) {
cache_set->insert(bbto->block_cache.get());
}
}
iopts_map->clear();
for (size_t i = 0; i < cf_names.size(); ++i) {
iopts_map->insert({cf_names[i], iopts[i]});
}

return Status::OK();
}

uint64_t DBImpl::TEST_FindMinLogContainingOutstandingPrep() {
Expand Down
Loading

0 comments on commit ac24f15

Please sign in to comment.