Skip to content

Commit

Permalink
Fix compatibility
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Zhang <[email protected]>
Signed-off-by: v01dstar <[email protected]>
  • Loading branch information
v01dstar committed May 31, 2024
1 parent 5a9e751 commit 3ea895b
Show file tree
Hide file tree
Showing 14 changed files with 98 additions and 83 deletions.
4 changes: 2 additions & 2 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1260,7 +1260,7 @@ Status ColumnFamilyData::GetMemtablesUserKeyRange(PinnableSlice* smallest,
read_opts.total_order_seek = true;
MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
merge_iter_builder.AddIterator(mem_->NewIterator(read_opts, &arena));
imm_.current()->AddIterators(read_opts, &merge_iter_builder);
imm_.current()->AddIterators(read_opts, &merge_iter_builder, false);
ScopedArenaIterator mem_iter(merge_iter_builder.Finish());
mem_iter->SeekToFirst();
if (mem_iter->Valid()) {
Expand All @@ -1282,7 +1282,7 @@ Status ColumnFamilyData::GetMemtablesUserKeyRange(PinnableSlice* smallest,
imm_.ExportMemtables(&memtables);
for (auto* mem : memtables) {
auto* iter =
mem->NewRangeTombstoneIterator(read_opts, kMaxSequenceNumber);
mem->NewRangeTombstoneIterator(read_opts, kMaxSequenceNumber, false);
if (iter != nullptr) {
iter->SeekToFirst();
if (iter->Valid()) {
Expand Down
2 changes: 1 addition & 1 deletion db/compact_files_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ TEST_F(CompactFilesTest, IsWriteStalled) {
for (int j = 0; j < 100; ++j) {
char key[16];
bzero(key, 16);
snprintf(key, 5, "foo%.2d", j);
snprintf(key, sizeof(key), "foo%.2d", j);
ASSERT_OK(wb.Put(handles[0], key, "bar"));
}

Expand Down
2 changes: 1 addition & 1 deletion db/compaction/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
existing_col = &existing_columns;
}

decision = compaction_filter_->UnsafeFilter(
decision = compaction_filter_->FilterV4(
level_, filter_key, ikey_.sequence, value_type, existing_val,
existing_col, &compaction_filter_value_, &new_columns,
compaction_filter_skip_until_.rep());
Expand Down
11 changes: 5 additions & 6 deletions db/compaction/compaction_iterator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -721,12 +721,11 @@ TEST_P(CompactionIteratorTest, SingleMergeOperand) {

TEST_P(CompactionIteratorTest, RemoveAllSingleDeletes) {
struct Filter : public CompactionFilter {
Decision UnsafeFilter(
int /*level*/, const Slice& key, SequenceNumber /*seq*/, ValueType t,
const Slice* /*existing_value*/, const WideColumns*,
std::string* /*new_value*/,
std::vector<std::pair<std::string, std::string>>* /*new_columns*/,
std::string* skip_until) const override {
Decision UnsafeFilter(int /*level*/, const Slice& key,
SequenceNumber /*seq*/, ValueType t,
const Slice* /*existing_value*/,
std::string* /*new_value*/,
std::string* skip_until) const override {
if (t == ValueType::kDeletion) {
*skip_until = key.ToString();
skip_until->back() += 1;
Expand Down
30 changes: 17 additions & 13 deletions db/db_impl/db_impl_merge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -290,15 +290,18 @@ Status DBImpl::MergeDisjointInstances(const MergeInstanceOptions& merge_options,
if (!s.ok()) {
return s;
}
edit.AddFile(
level, target_file_number, target_path_id, f->fd.GetFileSize(),
f->smallest, f->largest, f->fd.smallest_seqno,
f->fd.largest_seqno, f->marked_for_compaction, f->temperature,
f->oldest_blob_file_number, f->oldest_ancester_time,
f->file_creation_time, f->file_checksum,
f->file_checksum_func_name, f->min_timestamp, f->max_timestamp);
edit.AddFile(level, target_file_number, target_path_id,
f->fd.GetFileSize(), f->smallest, f->largest,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction, f->temperature,
f->oldest_blob_file_number, f->oldest_ancester_time,
f->file_creation_time, f->epoch_number, f->file_checksum,
f->file_checksum_func_name, f->unique_id,
f->compensated_range_deletion_size, f->tail_size,
f->user_defined_timestamps_persisted);
}
}
vsi.RecoverEpochNumbers(this_cfd);
}
}

Expand All @@ -321,8 +324,8 @@ Status DBImpl::MergeDisjointInstances(const MergeInstanceOptions& merge_options,
}

InstrumentedMutexLock lock(&mutex_);
s = versions_->LogAndApply(this_cfds, cf_mopts, edit_ptrs, &mutex_,
directories_.GetDbDir(), false);
s = versions_->LogAndApply(this_cfds, cf_mopts, ReadOptions(), edit_ptrs,
&mutex_, directories_.GetDbDir(), false);
if (!s.ok()) {
return s;
}
Expand All @@ -340,14 +343,15 @@ Status DBImpl::MergeDisjointInstances(const MergeInstanceOptions& merge_options,
cfd->imm()->FlushRequested();
if (!immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest({cfd}, &flush_req);
SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
GenerateFlushRequest({cfd}, FlushReason::kWriteBufferFull, &flush_req);
SchedulePendingFlush(flush_req);
}
}
if (immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest(this_cfds, &flush_req);
SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
GenerateFlushRequest(this_cfds, FlushReason::kWriteBufferFull,
&flush_req);
SchedulePendingFlush(flush_req);
}
for (auto cfd : this_cfds) {
SchedulePendingCompaction(cfd);
Expand Down
6 changes: 3 additions & 3 deletions db/db_merge_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
namespace ROCKSDB_NAMESPACE {

const uint32_t default_cf = 0;
uint32_t operator"" _db(unsigned long long int i) { return i; }
uint32_t operator"" _db(unsigned long long int i) { return uint32_t(i); }
uint32_t operator"" _cf(unsigned long long int i) {
assert(i > 0);
return i;
return uint32_t(i);
}

class DBMergeTest : public testing::Test {
Expand Down Expand Up @@ -85,7 +85,7 @@ class DBMergeTest : public testing::Test {
for (auto* handle : cf_handles) {
uint32_t id = 0;
if (handle->GetName() != "default") {
id = stoul(handle->GetName());
id = uint32_t(stoul(handle->GetName()));
}
db_handles.cfs[id] = handle;
}
Expand Down
15 changes: 10 additions & 5 deletions db/db_write_buffer_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ TEST_P(DBWriteBufferManagerTest, SharedWriteBufferAcrossCFs3) {
std::unordered_set<WriteThread::Writer*> w_set;
std::vector<port::Thread> threads;
int wait_count_db = 0;
int num_writers1 = 4; // default, cf1-cf3
int num_writers1 = 6; // default, cf1-cf3
InstrumentedMutex mutex;
InstrumentedCondVar cv(&mutex);
std::atomic<int> thread_num(0);
Expand Down Expand Up @@ -497,7 +497,7 @@ TEST_P(DBWriteBufferManagerTest, DynamicFlushSize) {
{
WriteOptions wo;
wo.disableWAL = true;
ASSERT_OK(db2->Put(wo, Key(1), DummyString(60000)));
ASSERT_OK(db2->Put(wo, Key(1), DummyString(50000)));
ASSERT_OK(Put(1, Key(1), DummyString(30000), wo));
ASSERT_OK(Put(0, Key(1), DummyString(30000), wo));
// Write to DB.
Expand Down Expand Up @@ -1148,6 +1148,9 @@ TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsMultipleDB) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

// Disabled, since the multi-instance RocksDB project has a different stall
// strategy.
//
// Tests a `WriteBufferManager` constructed with `allow_stall == false` does not
// thrash memtable switching when full and a CF receives multiple writes.
// Instead, we expect to switch a CF's memtable for flush only when that CF does
Expand All @@ -1158,7 +1161,7 @@ TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsMultipleDB) {
// by writing to that CF's DB.
//
// Not supported in LITE mode due to `GetProperty()` unavailable.
TEST_P(DBWriteBufferManagerTest, StopSwitchingMemTablesOnceFlushing) {
TEST_P(DBWriteBufferManagerTest, DISABLED_StopSwitchingMemTablesOnceFlushing) {
Options options = CurrentOptions();
options.arena_block_size = 4 << 10; // 4KB
options.write_buffer_size = 1 << 20; // 1MB
Expand Down Expand Up @@ -1213,12 +1216,14 @@ TEST_P(DBWriteBufferManagerTest, StopSwitchingMemTablesOnceFlushing) {
delete shared_wbm_db;
}

TEST_F(DBWriteBufferManagerTest, RuntimeChangeableAllowStall) {
// Disabled, since the multi-instance RocksDB project has a different stall
// strategy.
TEST_F(DBWriteBufferManagerTest, DISABLED_RuntimeChangeableAllowStall) {
constexpr int kBigValue = 10000;

Options options = CurrentOptions();
options.write_buffer_manager.reset(
new WriteBufferManager(1, nullptr /* cache */, true /* allow_stall */));
new WriteBufferManager(1, nullptr /* cache */, 1.0 /* stall_ratio */));
DestroyAndReopen(options);

// Pause flush thread so that
Expand Down
38 changes: 18 additions & 20 deletions db/version_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -410,26 +410,23 @@ class VersionBuilder::Rep {
} else if (epoch_number_requirement ==
EpochNumberRequirement::kMustPresent) {
if (lhs->epoch_number == rhs->epoch_number) {
// The following check is disabled due to instance merge.
// bool range_overlapped =
// icmp->Compare(lhs->smallest, rhs->largest) <= 0 &&
// icmp->Compare(lhs->largest, rhs->smallest) >= 0;

// if (range_overlapped) {
// std::ostringstream oss;
// oss << "L0 files of same epoch number but overlapping range
// #"
// << lhs->fd.GetNumber()
// << " , smallest key: " << lhs->smallest.DebugString(true)
// << " , largest key: " << lhs->largest.DebugString(true)
// << " , epoch number: " << lhs->epoch_number << " vs. file
// #"
// << rhs->fd.GetNumber()
// << " , smallest key: " << rhs->smallest.DebugString(true)
// << " , largest key: " << rhs->largest.DebugString(true)
// << " , epoch number: " << rhs->epoch_number;
// return Status::Corruption("VersionBuilder", oss.str());
// }
bool range_overlapped =
icmp->Compare(lhs->smallest, rhs->largest) <= 0 &&
icmp->Compare(lhs->largest, rhs->smallest) >= 0;

if (range_overlapped) {
std::ostringstream oss;
oss << "L0 files of same epoch number but overlapping range#"
<< lhs->fd.GetNumber()
<< " , smallest key: " << lhs->smallest.DebugString(true)
<< " , largest key: " << lhs->largest.DebugString(true)
<< " , epoch number: " << lhs->epoch_number << " vs. file#"
<< rhs->fd.GetNumber()
<< " , smallest key: " << rhs->smallest.DebugString(true)
<< " , largest key: " << rhs->largest.DebugString(true)
<< " , epoch number: " << rhs->epoch_number;
return Status::Corruption("VersionBuilder", oss.str());
}
}

if (!level_zero_cmp_by_epochno_(lhs, rhs)) {
Expand Down Expand Up @@ -536,6 +533,7 @@ class VersionBuilder::Rep {
}
#endif
Status s = CheckConsistencyDetails(vstorage);

if (s.IsCorruption() && s.getState()) {
// Make it clear the error is due to force_consistency_checks = 1 or
// debug build
Expand Down
40 changes: 24 additions & 16 deletions include/rocksdb/compaction_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ class CompactionFilter : public Customizable {
kValue,
// Merge operand
kMergeOperand,
<<<<<<< HEAD
// Used internally by the old stacked BlobDB implementation; this value type
// is never passed to application code. Note that when using the new
// integrated BlobDB, values stored separately as blobs are retrieved and
Expand All @@ -72,10 +71,6 @@ class CompactionFilter : public Customizable {
kWideColumnEntity,
// Only used by TiKV's region range filter.
kDeletion,
=======
kBlobIndex, // used internally by BlobDB.
kDeletion, // used only by TiKV's region range filter.
>>>>>>> de47e8ece (filter deletion in compaction filter (#344))
};

// Potential decisions that can be returned by the compaction filter's
Expand Down Expand Up @@ -321,27 +316,40 @@ class CompactionFilter : public Customizable {
skip_until);
}

// This interface supports TiKV's region range filter, meanwhile keeping the
// default FilterV3 implementation for compatibility.
virtual Decision FilterV4(
int level, const Slice& key, SequenceNumber, ValueType value_type,
int level, const Slice& key, SequenceNumber seqno, ValueType value_type,
const Slice* existing_value, const WideColumns* existing_columns,
std::string* new_value,
std::vector<std::pair<std::string, std::string>>* new_columns,
std::string* skip_until) const {
return FilterV3(level, key, value_type, existing_value, existing_columns,
new_value, new_columns, skip_until);
#ifdef NDEBUG
(void)existing_columns;
#endif

assert(!existing_value || !existing_columns);
assert(value_type == ValueType::kWideColumnEntity || existing_value);
assert(value_type != ValueType::kWideColumnEntity || existing_columns);

if (value_type == ValueType::kWideColumnEntity) {
return Decision::kKeep;
}

return UnsafeFilter(level, key, seqno, value_type, *existing_value,
new_value, skip_until);
}

// This interface is reserved for TiKV's region range filter. Only this
// interface can accept `value_type=kTypeDeletion`.
virtual Decision UnsafeFilter(
int level, const Slice& key, SequenceNumber seqno, ValueType value_type,
const Slice* existing_value, const WideColumns* existing_columns,
std::string* new_value,
std::vector<std::pair<std::string, std::string>>* new_columns,
std::string* skip_until) const {
virtual Decision UnsafeFilter(int level, const Slice& key,
SequenceNumber seqno, ValueType value_type,
const Slice& existing_value,
std::string* new_value,
std::string* skip_until) const {
if (value_type != ValueType::kDeletion) {
return FilterV4(level, key, seqno, value_type, existing_value,
existing_columns, new_value, new_columns, skip_until);
return FilterV2(level, key, value_type, existing_value, new_value,
skip_until);
} else {
return Decision::kKeep;
}
Expand Down
13 changes: 7 additions & 6 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,13 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
// Default: nullptr
std::shared_ptr<SstPartitionerFactory> sst_partitioner_factory = nullptr;

// Column family based write buffer manager. If this is set, this column
// family will not report memtable memory usage to the write buffer manager
// in DBImpl.
//
// Default: null
std::shared_ptr<WriteBufferManager> cf_write_buffer_manager = nullptr;

// RocksDB will try to flush the current memtable after the number of range
// deletions is >= this limit. For workloads with many range
// deletions, limiting the number of range deletions in memtable can help
Expand All @@ -348,12 +355,6 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
//
// Dynamically changeable through SetOptions() API
uint32_t memtable_max_range_deletions = 0;
// Column family based write buffer manager. If this is set, this column
// family will not report memtable memory usage to the write buffer manager
// in DBImpl.
//
// Default: null
std::shared_ptr<WriteBufferManager> cf_write_buffer_manager = nullptr;

// Create ColumnFamilyOptions with default values for all fields
ColumnFamilyOptions();
Expand Down
8 changes: 0 additions & 8 deletions monitoring/statistics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -343,14 +343,6 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
"rocksdb.table.open.prefetch.tail.read.bytes"},
};

template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
std::shared_ptr<Statistics> CreateDBStatistics() {
return std::make_shared<StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>>(nullptr);
}

template std::shared_ptr<Statistics>
CreateDBStatistics<TICKER_ENUM_MAX, HISTOGRAM_ENUM_MAX>();

static int RegisterBuiltinStatistics(ObjectLibrary& library,
const std::string& /*arg*/) {
library.AddFactory<Statistics>(
Expand Down
8 changes: 8 additions & 0 deletions monitoring/statistics_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,4 +347,12 @@ bool StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::HistEnabledForType(
return type < HISTOGRAM_ENUM_MAX;
}

template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
std::shared_ptr<Statistics> CreateDBStatistics() {
return std::make_shared<StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>>(nullptr);
}

template std::shared_ptr<Statistics>
CreateDBStatistics<TICKER_ENUM_MAX, HISTOGRAM_ENUM_MAX>();

} // namespace ROCKSDB_NAMESPACE
2 changes: 1 addition & 1 deletion options/options_settable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
sizeof(std::shared_ptr<ConcurrentTaskLimiter>)},
{offsetof(struct ColumnFamilyOptions, sst_partitioner_factory),
sizeof(std::shared_ptr<SstPartitionerFactory>)},
{offset_of(&ColumnFamilyOptions::cf_write_buffer_manager),
{offsetof(struct ColumnFamilyOptions, cf_write_buffer_manager),
sizeof(std::shared_ptr<WriteBufferManager>)},
};

Expand Down
2 changes: 1 addition & 1 deletion tools/db_bench_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5527,7 +5527,7 @@ class Benchmark {
}
if (use_multi_write_) {
s = db_with_cfh->db->MultiBatchWrite(write_options_,
batches.GetWriteBatch());
batches.GetWriteBatch(), nullptr);
} else if (!use_blob_db_) {
// Not stacked BlobDB
s = db_with_cfh->db->Write(write_options_, &batch);
Expand Down

0 comments on commit 3ea895b

Please sign in to comment.