Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
jay-zhuang committed Jan 29, 2022
1 parent 2562306 commit 66bc370
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 44 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* Remove ReadOptions::iter_start_seqnum which has been deprecated.
* Remove DBOptions::preserved_deletes and DB::SetPreserveDeletesSequenceNumber().
* Remove deprecated API AdvancedColumnFamilyOptions::rate_limit_delay_max_milliseconds.
* Add subcompaction callback APIs: `OnSubcompactionBegin()` and `OnSubcompactionCompleted()`.

### Behavior Changes
* Disallow the combination of DBOptions.use_direct_io_for_flush_and_compaction == true and DBOptions.writable_file_max_buffer_size == 0. This combination can cause WritableFileWriter::Append() to loop forever, and it does not make much sense in direct IO.
Expand Down
29 changes: 21 additions & 8 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,23 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
}
#endif // !ROCKSDB_LITE

void CompactionJob::BuildSubcompactionJobInfo(
SubcompactionState* sub_compact,
SubcompactionJobInfo* subcompaction_job_info) const {
Compaction* c = compact_->compaction;
ColumnFamilyData* cfd = c->column_family_data();

subcompaction_job_info->cf_id = cfd->GetID();
subcompaction_job_info->cf_name = cfd->GetName();
subcompaction_job_info->status = sub_compact->status;
subcompaction_job_info->thread_id = env_->GetThreadID();
subcompaction_job_info->job_id = job_id_;
subcompaction_job_info->subcompaction_job_id = sub_compact->sub_job_id;
subcompaction_job_info->base_input_level = c->start_level();
subcompaction_job_info->output_level = c->output_level();
subcompaction_job_info->stats = sub_compact->compaction_job_stats;
}

void CompactionJob::NotifyOnSubcompactionBegin(
SubcompactionState* sub_compact) {
#ifndef ROCKSDB_LITE
Expand All @@ -1234,10 +1251,8 @@ void CompactionJob::NotifyOnSubcompactionBegin(

sub_compact->notify_on_subcompaction_completion = true;

CompactionJobInfo info{};
BuildCompactionJobInfo(cfd, c, sub_compact->status,
sub_compact->compaction_job_stats, job_id_,
info.subcompaction_job_id, nullptr, env_, &info);
SubcompactionJobInfo info{};
BuildSubcompactionJobInfo(sub_compact, &info);

for (auto listener : db_options_.listeners) {
listener->OnSubcompactionBegin(info);
Expand Down Expand Up @@ -1266,10 +1281,8 @@ void CompactionJob::NotifyOnSubcompactionCompleted(
return;
}

CompactionJobInfo info{};
BuildCompactionJobInfo(cfd, c, sub_compact->status,
sub_compact->compaction_job_stats, job_id_,
info.subcompaction_job_id, nullptr, env_, &info);
SubcompactionJobInfo info{};
BuildSubcompactionJobInfo(sub_compact, &info);

for (auto listener : db_options_.listeners) {
listener->OnSubcompactionCompleted(info);
Expand Down
4 changes: 4 additions & 0 deletions db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ class CompactionJob {
void UpdateCompactionInputStatsHelper(
int* num_files, uint64_t* bytes_read, int input_level);

void BuildSubcompactionJobInfo(
SubcompactionState* sub_compact,
SubcompactionJobInfo* subcompaction_job_info) const;

void NotifyOnSubcompactionBegin(SubcompactionState* sub_compact);

void NotifyOnSubcompactionCompleted(SubcompactionState* sub_compact);
Expand Down
19 changes: 10 additions & 9 deletions db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4679,7 +4679,6 @@ TEST_F(DBCompactionTest, SubcompactionEvent) {
InstrumentedMutexLock l(&mutex_);
ASSERT_EQ(running_compactions_.find(ci.job_id),
running_compactions_.end());
ASSERT_EQ(ci.subcompaction_job_id, -1);
running_compactions_.emplace(ci.job_id, 0);
}

Expand All @@ -4688,23 +4687,25 @@ TEST_F(DBCompactionTest, SubcompactionEvent) {
InstrumentedMutexLock l(&mutex_);
auto it = running_compactions_.find(ci.job_id);
ASSERT_NE(it, running_compactions_.end());
ASSERT_EQ(it->second, 0);
ASSERT_EQ(it->second.size(), 0);
running_compactions_.erase(it);
}

void OnSubcompactionBegin(const CompactionJobInfo& ci) override {
void OnSubcompactionBegin(const SubcompactionJobInfo& si) override {
InstrumentedMutexLock l(&mutex_);
auto it = running_compactions_.find(ci.job_id);
auto it = running_compactions_.find(si.job_id);
ASSERT_NE(it, running_compactions_.end());
it->second++;
auto r = it->second.insert(si.subcompaction_job_id);
ASSERT_TRUE(r.second); // each subcompaction_job_id should be different
total_subcompaction_cnt_++;
}

void OnSubcompactionCompleted(const CompactionJobInfo& ci) override {
void OnSubcompactionCompleted(const SubcompactionJobInfo& si) override {
InstrumentedMutexLock l(&mutex_);
auto it = running_compactions_.find(ci.job_id);
auto it = running_compactions_.find(si.job_id);
ASSERT_NE(it, running_compactions_.end());
it->second--;
auto r = it->second.erase(si.subcompaction_job_id);
ASSERT_EQ(r, 1);
}

size_t GetRunningCompactionCount() {
Expand All @@ -4719,7 +4720,7 @@ TEST_F(DBCompactionTest, SubcompactionEvent) {

private:
InstrumentedMutex mutex_;
std::unordered_map<int, int> running_compactions_;
std::unordered_map<int, std::unordered_set<int>> running_compactions_;
size_t total_subcompaction_cnt_ = 0;
};

Expand Down
11 changes: 5 additions & 6 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1889,6 +1889,11 @@ class DBImpl : public DB {
bool HaveManualCompaction(ColumnFamilyData* cfd);
bool MCOverlap(ManualCompactionState* m, ManualCompactionState* m1);
#ifndef ROCKSDB_LITE
void BuildCompactionJobInfo(const ColumnFamilyData* cfd, Compaction* c,
const Status& st,
const CompactionJobStats& compaction_job_stats,
const int job_id, const Version* current,
CompactionJobInfo* compaction_job_info) const;
// Reserve the next 'num' file numbers for to-be-ingested external SST files,
// and return the current file_number in 'next_file_number'.
// Write a version edit to the MANIFEST.
Expand Down Expand Up @@ -2379,12 +2384,6 @@ extern uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
const autovector<const autovector<MemTable*>*>& memtables_to_flush);

extern void BuildCompactionJobInfo(
const ColumnFamilyData* cfd, Compaction* c, const Status& st,
const CompactionJobStats& compaction_job_stats, const int job_id,
const int subcompaction_job_id, const Version* current, Env* env,
CompactionJobInfo* compaction_job_info);

// Fix user-supplied options to be reasonable
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
Expand Down
25 changes: 10 additions & 15 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1403,8 +1403,7 @@ Status DBImpl::CompactFilesImpl(

if (compaction_job_info != nullptr) {
BuildCompactionJobInfo(cfd, c.get(), s, compaction_job_stats,
job_context->job_id, -1, version, env_,
compaction_job_info);
job_context->job_id, version, compaction_job_info);
}

if (status.ok()) {
Expand Down Expand Up @@ -1510,8 +1509,7 @@ void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
TEST_SYNC_POINT("DBImpl::NotifyOnCompactionBegin::UnlockMutex");
{
CompactionJobInfo info{};
BuildCompactionJobInfo(cfd, c, st, job_stats, job_id, -1, current, env_,
&info);
BuildCompactionJobInfo(cfd, c, st, job_stats, job_id, current, &info);
for (auto listener : immutable_db_options_.listeners) {
listener->OnCompactionBegin(this, info);
}
Expand Down Expand Up @@ -1551,8 +1549,8 @@ void DBImpl::NotifyOnCompactionCompleted(
TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");
{
CompactionJobInfo info{};
BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, -1,
current, env_, &info);
BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, current,
&info);
for (auto listener : immutable_db_options_.listeners) {
listener->OnCompactionCompleted(this, info);
}
Expand Down Expand Up @@ -3548,19 +3546,16 @@ bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
}

#ifndef ROCKSDB_LITE
void BuildCompactionJobInfo(const ColumnFamilyData* cfd, Compaction* c,
const Status& st,
const CompactionJobStats& compaction_job_stats,
const int job_id, const int subcompaction_job_id,
const Version* current, Env* env,
CompactionJobInfo* compaction_job_info) {
void DBImpl::BuildCompactionJobInfo(
const ColumnFamilyData* cfd, Compaction* c, const Status& st,
const CompactionJobStats& compaction_job_stats, const int job_id,
const Version* current, CompactionJobInfo* compaction_job_info) const {
assert(compaction_job_info != nullptr);
compaction_job_info->cf_id = cfd->GetID();
compaction_job_info->cf_name = cfd->GetName();
compaction_job_info->status = st;
compaction_job_info->thread_id = env->GetThreadID();
compaction_job_info->thread_id = env_->GetThreadID();
compaction_job_info->job_id = job_id;
compaction_job_info->subcompaction_job_id = subcompaction_job_id;
compaction_job_info->base_input_level = c->start_level();
compaction_job_info->output_level = c->output_level();
compaction_job_info->stats = compaction_job_stats;
Expand All @@ -3576,7 +3571,7 @@ void BuildCompactionJobInfo(const ColumnFamilyData* cfd, Compaction* c,
compaction_job_info->input_files.push_back(fn);
compaction_job_info->input_file_infos.push_back(CompactionFileInfo{
static_cast<int>(i), file_number, fmd->oldest_blob_file_number});
if (current && compaction_job_info->table_properties.count(fn) == 0) {
if (compaction_job_info->table_properties.count(fn) == 0) {
std::shared_ptr<const TableProperties> tp;
auto s = current->GetTableProperties(&tp, fmd, &fn);
if (s.ok()) {
Expand Down
53 changes: 47 additions & 6 deletions include/rocksdb/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,42 @@ struct CompactionFileInfo {
uint64_t oldest_blob_file_number;
};

struct SubcompactionJobInfo {
~SubcompactionJobInfo() { status.PermitUncheckedError(); }
// the id of the column family where the compaction happened.
uint32_t cf_id;
// the name of the column family where the compaction happened.
std::string cf_name;
// the status indicating whether the compaction was successful or not.
Status status;
// the id of the thread that completed this compaction job.
uint64_t thread_id;
// the job id, which is unique in the same thread.
int job_id;

// sub-compaction job id, which is only unique within the same compaction, so
// use both 'job_id' and 'subcompaction_job_id' to identify a subcompaction
// within an instance.
// For non subcompaction job, it's set to -1.
int subcompaction_job_id;
// the smallest input level of the compaction.
int base_input_level;
// the output level of the compaction.
int output_level;

// Reason to run the compaction
CompactionReason compaction_reason;

// Compression algorithm used for output files
CompressionType compression;

// Statistics and other additional details on the compaction
CompactionJobStats stats;

// Compression algorithm used for blob output files.
CompressionType blob_compression_type;
};

struct CompactionJobInfo {
~CompactionJobInfo() { status.PermitUncheckedError(); }
// the id of the column family where the compaction happened.
Expand All @@ -376,8 +412,6 @@ struct CompactionJobInfo {
// the job id, which is unique in the same thread.
int job_id;

// sub-compaction job id, which is only unique within the same compaction.
int subcompaction_job_id;
// the smallest input level of the compaction.
int base_input_level;
// the output level of the compaction.
Expand Down Expand Up @@ -583,7 +617,11 @@ class EventListener : public Customizable {
const CompactionJobInfo& /*ci*/) {}

// A callback function to RocksDB which will be called before a sub-compaction
// begins. The default implementation is a no-op.
// begins. If a compaction is split to 2 sub-compactions, it will trigger one
// `OnCompactionBegin()` first, then two `OnSubcompactionBegin()`.
// If compaction is not split, it will still trigger one
// `OnSubcompactionBegin()`, as internally, compaction is always handled by
// sub-compaction. The default implementation is a no-op.
//
// Note that this function must be implemented in a way such that
// it should not run for an extended period of time before the function
Expand All @@ -595,10 +633,13 @@ class EventListener : public Customizable {
// returned, and must be copied if it's needed outside this function.
// Note: `table_properties` is not set for sub-compaction, the information
// could be got from `OnCompactionBegin()`.
virtual void OnSubcompactionBegin(const CompactionJobInfo& /*ci*/) {}
virtual void OnSubcompactionBegin(const SubcompactionJobInfo& /*si*/) {}

// A callback function to RocksDB which will be called whenever a
// sub-compaction completed. The default implementation is a no-op.
// sub-compaction completed. The same as `OnSubcompactionBegin()`, if a
// compaction is split to 2 sub-compactions, it will be triggered twice. If
// a compaction is not split, it will still be triggered once.
// The default implementation is a no-op.
//
// Note that this function must be implemented in a way such that
// it should not run for an extended period of time before the function
Expand All @@ -610,7 +651,7 @@ class EventListener : public Customizable {
// returned, and must be copied if it's needed outside this function.
// Note: `table_properties` is not set for sub-compaction, the information
// could be got from `OnCompactionCompleted()`.
virtual void OnSubcompactionCompleted(const CompactionJobInfo& /*ci*/) {}
virtual void OnSubcompactionCompleted(const SubcompactionJobInfo& /*si*/) {}

// A callback function for RocksDB which will be called whenever
// a SST file is created. Different from OnCompactionCompleted and
Expand Down

0 comments on commit 66bc370

Please sign in to comment.