Skip to content

Commit

Permalink
Support canceling running RemoteCompaction on remote side (#9725)
Browse files Browse the repository at this point in the history
Summary:
Add the ability to cancel remote compaction on the remote side by
setting `OpenAndCompactOptions.canceled` to true.

Pull Request resolved: #9725

Test Plan: added unittest

Reviewed By: ajkr

Differential Revision: D35018800

Pulled By: jay-zhuang

fbshipit-source-id: be3652f9645e0347df429e42a5614d5a9b3a1ec4
  • Loading branch information
jay-zhuang authored and facebook-github-bot committed Apr 13, 2022
1 parent 9454e74 commit dc1c90c
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 29 deletions.
3 changes: 2 additions & 1 deletion db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2487,6 +2487,7 @@ CompactionServiceCompactionJob::CompactionServiceCompactionJob(
std::vector<SequenceNumber> existing_snapshots,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
const std::string& dbname, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<bool>* manual_compaction_canceled,
const std::string& db_id, const std::string& db_session_id,
const std::string& output_path,
const CompactionServiceInput& compaction_service_input,
Expand All @@ -2499,7 +2500,7 @@ CompactionServiceCompactionJob::CompactionServiceCompactionJob(
compaction->mutable_cf_options()->paranoid_file_checks,
compaction->mutable_cf_options()->report_bg_io_stats, dbname,
&(compaction_service_result->stats), Env::Priority::USER, io_tracer,
nullptr, nullptr, db_id, db_session_id,
nullptr, manual_compaction_canceled, db_id, db_session_id,
compaction->column_family_data()->GetFullHistoryTsLow()),
output_path_(output_path),
compaction_input_(compaction_service_input),
Expand Down
1 change: 1 addition & 0 deletions db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ class CompactionServiceCompactionJob : private CompactionJob {
std::vector<SequenceNumber> existing_snapshots,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
const std::string& dbname, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<bool>* manual_compaction_canceled,
const std::string& db_id, const std::string& db_session_id,
const std::string& output_path,
const CompactionServiceInput& compaction_service_input,
Expand Down
54 changes: 53 additions & 1 deletion db/compaction/compaction_service_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,12 @@ class MyTestCompactionService : public CompactionService {
options_override.listeners = listeners_;
}

OpenAndCompactOptions options;
options.canceled = &canceled_;

Status s = DB::OpenAndCompact(
db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(info.job_id),
options, db_path_,
db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(info.job_id),
compaction_input, compaction_service_result, options_override);
if (is_override_wait_result_) {
*compaction_service_result = override_wait_result_;
Expand Down Expand Up @@ -118,6 +122,8 @@ class MyTestCompactionService : public CompactionService {
is_override_wait_status_ = false;
}

void SetCanceled(bool canceled) { canceled_ = canceled; }

private:
InstrumentedMutex mutex_;
std::atomic_int compaction_num_{0};
Expand All @@ -136,6 +142,7 @@ class MyTestCompactionService : public CompactionService {
bool is_override_wait_result_ = false;
std::string override_wait_result_;
std::vector<std::shared_ptr<EventListener>> listeners_;
std::atomic_bool canceled_{false};
};

class CompactionServiceTest : public DBTestBase {
Expand Down Expand Up @@ -331,6 +338,51 @@ TEST_F(CompactionServiceTest, ManualCompaction) {
VerifyTestData();
}

TEST_F(CompactionServiceTest, CancelCompactionOnRemoteSide) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
ReopenWithCompactionService(&options);
GenerateTestData();

auto my_cs = GetCompactionService();

std::string start_str = Key(15);
std::string end_str = Key(45);
Slice start(start_str);
Slice end(end_str);
uint64_t comp_num = my_cs->GetCompactionNum();

// Test cancel compaction at the beginning
my_cs->SetCanceled(true);
auto s = db_->CompactRange(CompactRangeOptions(), &start, &end);
ASSERT_TRUE(s.IsIncomplete());
// compaction number is not increased
ASSERT_GE(my_cs->GetCompactionNum(), comp_num);
VerifyTestData();

// Test cancel compaction in progress
ReopenWithCompactionService(&options);
GenerateTestData();
my_cs = GetCompactionService();
my_cs->SetCanceled(false);

std::atomic_bool cancel_issued{false};
SyncPoint::GetInstance()->SetCallBack("CompactionJob::Run():Inprogress",
[&](void* /*arg*/) {
cancel_issued = true;
my_cs->SetCanceled(true);
});

SyncPoint::GetInstance()->EnableProcessing();

s = db_->CompactRange(CompactRangeOptions(), &start, &end);
ASSERT_TRUE(s.IsIncomplete());
ASSERT_TRUE(cancel_issued);
// compaction number is not increased
ASSERT_GE(my_cs->GetCompactionNum(), comp_num);
VerifyTestData();
}

TEST_F(CompactionServiceTest, FailedToStart) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
Expand Down
31 changes: 23 additions & 8 deletions db/db_impl/db_impl_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -711,8 +711,11 @@ Status DB::OpenAsSecondary(
}

Status DBImplSecondary::CompactWithoutInstallation(
ColumnFamilyHandle* cfh, const CompactionServiceInput& input,
CompactionServiceResult* result) {
const OpenAndCompactOptions& options, ColumnFamilyHandle* cfh,
const CompactionServiceInput& input, CompactionServiceResult* result) {
if (options.canceled && options.canceled->load(std::memory_order_acquire)) {
return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
InstrumentedMutexLock l(&mutex_);
auto cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
if (!cfd) {
Expand Down Expand Up @@ -774,7 +777,7 @@ Status DBImplSecondary::CompactWithoutInstallation(
file_options_for_compaction_, versions_.get(), &shutting_down_,
&log_buffer, output_dir.get(), stats_, &mutex_, &error_handler_,
input.snapshots, table_cache_, &event_logger_, dbname_, io_tracer_,
db_id_, db_session_id_, secondary_path_, input, result);
options.canceled, db_id_, db_session_id_, secondary_path_, input, result);

mutex_.Unlock();
s = compaction_job.Run();
Expand All @@ -793,9 +796,13 @@ Status DBImplSecondary::CompactWithoutInstallation(
}

Status DB::OpenAndCompact(
const std::string& name, const std::string& output_directory,
const std::string& input, std::string* result,
const OpenAndCompactOptions& options, const std::string& name,
const std::string& output_directory, const std::string& input,
std::string* output,
const CompactionServiceOptionsOverride& override_options) {
if (options.canceled && options.canceled->load(std::memory_order_acquire)) {
return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
CompactionServiceInput compaction_input;
Status s = CompactionServiceInput::Read(input, &compaction_input);
if (!s.ok()) {
Expand Down Expand Up @@ -849,10 +856,10 @@ Status DB::OpenAndCompact(
CompactionServiceResult compaction_result;
DBImplSecondary* db_secondary = static_cast_with_check<DBImplSecondary>(db);
assert(handles.size() > 0);
s = db_secondary->CompactWithoutInstallation(handles[0], compaction_input,
&compaction_result);
s = db_secondary->CompactWithoutInstallation(
options, handles[0], compaction_input, &compaction_result);

Status serialization_status = compaction_result.Write(result);
Status serialization_status = compaction_result.Write(output);

for (auto& handle : handles) {
delete handle;
Expand All @@ -864,6 +871,14 @@ Status DB::OpenAndCompact(
return s;
}

Status DB::OpenAndCompact(
const std::string& name, const std::string& output_directory,
const std::string& input, std::string* output,
const CompactionServiceOptionsOverride& override_options) {
return OpenAndCompact(OpenAndCompactOptions(), name, output_directory, input,
output, override_options);
}

#else // !ROCKSDB_LITE

Status DB::OpenAsSecondary(const Options& /*options*/,
Expand Down
8 changes: 5 additions & 3 deletions db/db_impl/db_impl_secondary.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,11 @@ class DBImplSecondary : public DBImpl {
Status CheckConsistency() override;

#ifndef NDEBUG
Status TEST_CompactWithoutInstallation(ColumnFamilyHandle* cfh,
Status TEST_CompactWithoutInstallation(const OpenAndCompactOptions& options,
ColumnFamilyHandle* cfh,
const CompactionServiceInput& input,
CompactionServiceResult* result) {
return CompactWithoutInstallation(cfh, input, result);
return CompactWithoutInstallation(options, cfh, input, result);
}
#endif // NDEBUG

Expand Down Expand Up @@ -346,7 +347,8 @@ class DBImplSecondary : public DBImpl {
// Run compaction without installation, the output files will be placed in the
// secondary DB path. The LSM tree won't be changed, the secondary DB is still
// in read-only mode.
Status CompactWithoutInstallation(ColumnFamilyHandle* cfh,
Status CompactWithoutInstallation(const OpenAndCompactOptions& options,
ColumnFamilyHandle* cfh,
const CompactionServiceInput& input,
CompactionServiceResult* result);

Expand Down
32 changes: 16 additions & 16 deletions db/db_secondary_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ TEST_F(DBSecondaryTest, SimpleInternalCompaction) {
auto cfh = db_secondary_->DefaultColumnFamily();

CompactionServiceResult result;
ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input,
&result));
ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(
OpenAndCompactOptions(), cfh, input, &result));

ASSERT_EQ(result.output_files.size(), 1);
InternalKey smallest, largest;
Expand Down Expand Up @@ -248,8 +248,8 @@ TEST_F(DBSecondaryTest, InternalCompactionMultiLevels) {
OpenSecondary(options);
auto cfh = db_secondary_->DefaultColumnFamily();
CompactionServiceResult result;
ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input1,
&result));
ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(
OpenAndCompactOptions(), cfh, input1, &result));
ASSERT_OK(result.status);

// pick 2 files on level 1 for compaction, which has 6 overlap files on L2
Expand All @@ -261,8 +261,8 @@ TEST_F(DBSecondaryTest, InternalCompactionMultiLevels) {
}

input2.output_level = 2;
ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input2,
&result));
ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(
OpenAndCompactOptions(), cfh, input2, &result));
ASSERT_OK(result.status);

CloseSecondary();
Expand All @@ -273,15 +273,15 @@ TEST_F(DBSecondaryTest, InternalCompactionMultiLevels) {
}
OpenSecondary(options);
cfh = db_secondary_->DefaultColumnFamily();
Status s = db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input2,
&result);
Status s = db_secondary_full()->TEST_CompactWithoutInstallation(
OpenAndCompactOptions(), cfh, input2, &result);
ASSERT_TRUE(s.IsInvalidArgument());
ASSERT_OK(result.status);

// TODO: L0 -> L1 compaction should success, currently version is not built
// if files is missing.
// ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh,
// input1, &result));
// ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(OpenAndCompactOptions(),
// cfh, input1, &result));
}

TEST_F(DBSecondaryTest, InternalCompactionCompactedFiles) {
Expand Down Expand Up @@ -319,8 +319,8 @@ TEST_F(DBSecondaryTest, InternalCompactionCompactedFiles) {
auto cfh = db_secondary_->DefaultColumnFamily();

CompactionServiceResult result;
Status s =
db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input, &result);
Status s = db_secondary_full()->TEST_CompactWithoutInstallation(
OpenAndCompactOptions(), cfh, input, &result);
ASSERT_TRUE(s.IsInvalidArgument());
ASSERT_OK(result.status);
}
Expand Down Expand Up @@ -356,15 +356,15 @@ TEST_F(DBSecondaryTest, InternalCompactionMissingFiles) {
auto cfh = db_secondary_->DefaultColumnFamily();

CompactionServiceResult result;
Status s =
db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input, &result);
Status s = db_secondary_full()->TEST_CompactWithoutInstallation(
OpenAndCompactOptions(), cfh, input, &result);
ASSERT_TRUE(s.IsInvalidArgument());
ASSERT_OK(result.status);

input.input_files.erase(input.input_files.begin());

ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input,
&result));
ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(
OpenAndCompactOptions(), cfh, input, &result));
ASSERT_OK(result.status);
}

Expand Down
6 changes: 6 additions & 0 deletions include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,12 @@ class DB {
const std::string& input, std::string* output,
const CompactionServiceOptionsOverride& override_options);

static Status OpenAndCompact(
const OpenAndCompactOptions& options, const std::string& name,
const std::string& output_directory, const std::string& input,
std::string* output,
const CompactionServiceOptionsOverride& override_options);

// Experimental and subject to change
// Open DB and trim data newer than specified timestamp.
// The trim_ts specified the user-defined timestamp trim bound.
Expand Down
5 changes: 5 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -1946,6 +1946,11 @@ struct CompactionServiceOptionsOverride {
std::shared_ptr<Statistics> statistics = nullptr;
};

struct OpenAndCompactOptions {
// Allows cancellation of an in-progress compaction.
std::atomic<bool>* canceled = nullptr;
};

#ifndef ROCKSDB_LITE
struct LiveFilesStorageInfoOptions {
// Whether to populate FileStorageInfo::file_checksum* or leave blank
Expand Down

0 comments on commit dc1c90c

Please sign in to comment.