Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support canceling running RemoteCompaction on remote side #9725

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2487,6 +2487,7 @@ CompactionServiceCompactionJob::CompactionServiceCompactionJob(
std::vector<SequenceNumber> existing_snapshots,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
const std::string& dbname, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<bool>* manual_compaction_canceled,
const std::string& db_id, const std::string& db_session_id,
const std::string& output_path,
const CompactionServiceInput& compaction_service_input,
Expand All @@ -2499,7 +2500,7 @@ CompactionServiceCompactionJob::CompactionServiceCompactionJob(
compaction->mutable_cf_options()->paranoid_file_checks,
compaction->mutable_cf_options()->report_bg_io_stats, dbname,
&(compaction_service_result->stats), Env::Priority::USER, io_tracer,
nullptr, nullptr, db_id, db_session_id,
nullptr, manual_compaction_canceled, db_id, db_session_id,
compaction->column_family_data()->GetFullHistoryTsLow()),
output_path_(output_path),
compaction_input_(compaction_service_input),
Expand Down
1 change: 1 addition & 0 deletions db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ class CompactionServiceCompactionJob : private CompactionJob {
std::vector<SequenceNumber> existing_snapshots,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
const std::string& dbname, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<bool>* manual_compaction_canceled,
const std::string& db_id, const std::string& db_session_id,
const std::string& output_path,
const CompactionServiceInput& compaction_service_input,
Expand Down
54 changes: 53 additions & 1 deletion db/compaction/compaction_service_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,12 @@ class MyTestCompactionService : public CompactionService {
options_override.listeners = listeners_;
}

OpenAndCompactOptions options;
options.canceled = &canceled_;

Status s = DB::OpenAndCompact(
db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(info.job_id),
options, db_path_,
db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(info.job_id),
compaction_input, compaction_service_result, options_override);
if (is_override_wait_result_) {
*compaction_service_result = override_wait_result_;
Expand Down Expand Up @@ -118,6 +122,8 @@ class MyTestCompactionService : public CompactionService {
is_override_wait_status_ = false;
}

void SetCanceled(bool canceled) { canceled_ = canceled; }

private:
InstrumentedMutex mutex_;
std::atomic_int compaction_num_{0};
Expand All @@ -136,6 +142,7 @@ class MyTestCompactionService : public CompactionService {
bool is_override_wait_result_ = false;
std::string override_wait_result_;
std::vector<std::shared_ptr<EventListener>> listeners_;
std::atomic_bool canceled_{false};
};

class CompactionServiceTest : public DBTestBase {
Expand Down Expand Up @@ -331,6 +338,51 @@ TEST_F(CompactionServiceTest, ManualCompaction) {
VerifyTestData();
}

TEST_F(CompactionServiceTest, CancelCompactionOnRemoteSide) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
ReopenWithCompactionService(&options);
GenerateTestData();

auto my_cs = GetCompactionService();

std::string start_str = Key(15);
std::string end_str = Key(45);
Slice start(start_str);
Slice end(end_str);
uint64_t comp_num = my_cs->GetCompactionNum();

// Test cancel compaction at the beginning
my_cs->SetCanceled(true);
auto s = db_->CompactRange(CompactRangeOptions(), &start, &end);
ASSERT_TRUE(s.IsIncomplete());
// compaction number is not increased
ASSERT_GE(my_cs->GetCompactionNum(), comp_num);
VerifyTestData();

// Test cancel compaction in progress
ReopenWithCompactionService(&options);
GenerateTestData();
my_cs = GetCompactionService();
my_cs->SetCanceled(false);

std::atomic_bool cancel_issued{false};
SyncPoint::GetInstance()->SetCallBack("CompactionJob::Run():Inprogress",
[&](void* /*arg*/) {
cancel_issued = true;
my_cs->SetCanceled(true);
});

SyncPoint::GetInstance()->EnableProcessing();

s = db_->CompactRange(CompactRangeOptions(), &start, &end);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is coming from a saved background error actually. Maybe assert my_cs became true again after the CompactRange().

Does it still work if you make this CompactRange() in a fresh DB or separate test case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch, I updated the test to reopen and then run CompactRange() again, as the compaction failure is a background failure which is not recoverable.

ASSERT_TRUE(s.IsIncomplete());
ASSERT_TRUE(cancel_issued);
// compaction number is not increased
ASSERT_GE(my_cs->GetCompactionNum(), comp_num);
VerifyTestData();
}

TEST_F(CompactionServiceTest, FailedToStart) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
Expand Down
31 changes: 23 additions & 8 deletions db/db_impl/db_impl_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -711,8 +711,11 @@ Status DB::OpenAsSecondary(
}

Status DBImplSecondary::CompactWithoutInstallation(
ColumnFamilyHandle* cfh, const CompactionServiceInput& input,
CompactionServiceResult* result) {
const OpenAndCompactOptions& options, ColumnFamilyHandle* cfh,
const CompactionServiceInput& input, CompactionServiceResult* result) {
if (options.canceled && options.canceled->load(std::memory_order_acquire)) {
return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
InstrumentedMutexLock l(&mutex_);
auto cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
if (!cfd) {
Expand Down Expand Up @@ -774,7 +777,7 @@ Status DBImplSecondary::CompactWithoutInstallation(
file_options_for_compaction_, versions_.get(), &shutting_down_,
&log_buffer, output_dir.get(), stats_, &mutex_, &error_handler_,
input.snapshots, table_cache_, &event_logger_, dbname_, io_tracer_,
db_id_, db_session_id_, secondary_path_, input, result);
options.canceled, db_id_, db_session_id_, secondary_path_, input, result);

mutex_.Unlock();
s = compaction_job.Run();
Expand All @@ -793,9 +796,13 @@ Status DBImplSecondary::CompactWithoutInstallation(
}

Status DB::OpenAndCompact(
const std::string& name, const std::string& output_directory,
const std::string& input, std::string* result,
const OpenAndCompactOptions& options, const std::string& name,
const std::string& output_directory, const std::string& input,
std::string* output,
const CompactionServiceOptionsOverride& override_options) {
if (options.canceled && options.canceled->load(std::memory_order_acquire)) {
return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
CompactionServiceInput compaction_input;
Status s = CompactionServiceInput::Read(input, &compaction_input);
if (!s.ok()) {
Expand Down Expand Up @@ -849,10 +856,10 @@ Status DB::OpenAndCompact(
CompactionServiceResult compaction_result;
DBImplSecondary* db_secondary = static_cast_with_check<DBImplSecondary>(db);
assert(handles.size() > 0);
s = db_secondary->CompactWithoutInstallation(handles[0], compaction_input,
&compaction_result);
s = db_secondary->CompactWithoutInstallation(
options, handles[0], compaction_input, &compaction_result);

Status serialization_status = compaction_result.Write(result);
Status serialization_status = compaction_result.Write(output);

for (auto& handle : handles) {
delete handle;
Expand All @@ -864,6 +871,14 @@ Status DB::OpenAndCompact(
return s;
}

Status DB::OpenAndCompact(
const std::string& name, const std::string& output_directory,
const std::string& input, std::string* output,
const CompactionServiceOptionsOverride& override_options) {
return OpenAndCompact(OpenAndCompactOptions(), name, output_directory, input,
output, override_options);
}

#else // !ROCKSDB_LITE

Status DB::OpenAsSecondary(const Options& /*options*/,
Expand Down
8 changes: 5 additions & 3 deletions db/db_impl/db_impl_secondary.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,11 @@ class DBImplSecondary : public DBImpl {
Status CheckConsistency() override;

#ifndef NDEBUG
Status TEST_CompactWithoutInstallation(ColumnFamilyHandle* cfh,
Status TEST_CompactWithoutInstallation(const OpenAndCompactOptions& options,
ColumnFamilyHandle* cfh,
const CompactionServiceInput& input,
CompactionServiceResult* result) {
return CompactWithoutInstallation(cfh, input, result);
return CompactWithoutInstallation(options, cfh, input, result);
}
#endif // NDEBUG

Expand Down Expand Up @@ -346,7 +347,8 @@ class DBImplSecondary : public DBImpl {
// Run compaction without installation, the output files will be placed in the
// secondary DB path. The LSM tree won't be changed, the secondary DB is still
// in read-only mode.
Status CompactWithoutInstallation(ColumnFamilyHandle* cfh,
Status CompactWithoutInstallation(const OpenAndCompactOptions& options,
ColumnFamilyHandle* cfh,
const CompactionServiceInput& input,
CompactionServiceResult* result);

Expand Down
32 changes: 16 additions & 16 deletions db/db_secondary_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ TEST_F(DBSecondaryTest, SimpleInternalCompaction) {
auto cfh = db_secondary_->DefaultColumnFamily();

CompactionServiceResult result;
ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input,
&result));
ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(
OpenAndCompactOptions(), cfh, input, &result));

ASSERT_EQ(result.output_files.size(), 1);
InternalKey smallest, largest;
Expand Down Expand Up @@ -248,8 +248,8 @@ TEST_F(DBSecondaryTest, InternalCompactionMultiLevels) {
OpenSecondary(options);
auto cfh = db_secondary_->DefaultColumnFamily();
CompactionServiceResult result;
ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input1,
&result));
ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(
OpenAndCompactOptions(), cfh, input1, &result));
ASSERT_OK(result.status);

// pick 2 files on level 1 for compaction, which has 6 overlap files on L2
Expand All @@ -261,8 +261,8 @@ TEST_F(DBSecondaryTest, InternalCompactionMultiLevels) {
}

input2.output_level = 2;
ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input2,
&result));
ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(
OpenAndCompactOptions(), cfh, input2, &result));
ASSERT_OK(result.status);

CloseSecondary();
Expand All @@ -273,15 +273,15 @@ TEST_F(DBSecondaryTest, InternalCompactionMultiLevels) {
}
OpenSecondary(options);
cfh = db_secondary_->DefaultColumnFamily();
Status s = db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input2,
&result);
Status s = db_secondary_full()->TEST_CompactWithoutInstallation(
OpenAndCompactOptions(), cfh, input2, &result);
ASSERT_TRUE(s.IsInvalidArgument());
ASSERT_OK(result.status);

// TODO: L0 -> L1 compaction should success, currently version is not built
// if files is missing.
// ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh,
// input1, &result));
// ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(OpenAndCompactOptions(),
// cfh, input1, &result));
}

TEST_F(DBSecondaryTest, InternalCompactionCompactedFiles) {
Expand Down Expand Up @@ -319,8 +319,8 @@ TEST_F(DBSecondaryTest, InternalCompactionCompactedFiles) {
auto cfh = db_secondary_->DefaultColumnFamily();

CompactionServiceResult result;
Status s =
db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input, &result);
Status s = db_secondary_full()->TEST_CompactWithoutInstallation(
OpenAndCompactOptions(), cfh, input, &result);
ASSERT_TRUE(s.IsInvalidArgument());
ASSERT_OK(result.status);
}
Expand Down Expand Up @@ -356,15 +356,15 @@ TEST_F(DBSecondaryTest, InternalCompactionMissingFiles) {
auto cfh = db_secondary_->DefaultColumnFamily();

CompactionServiceResult result;
Status s =
db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input, &result);
Status s = db_secondary_full()->TEST_CompactWithoutInstallation(
OpenAndCompactOptions(), cfh, input, &result);
ASSERT_TRUE(s.IsInvalidArgument());
ASSERT_OK(result.status);

input.input_files.erase(input.input_files.begin());

ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input,
&result));
ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(
OpenAndCompactOptions(), cfh, input, &result));
ASSERT_OK(result.status);
}

Expand Down
6 changes: 6 additions & 0 deletions include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,12 @@ class DB {
const std::string& input, std::string* output,
const CompactionServiceOptionsOverride& override_options);

static Status OpenAndCompact(
const OpenAndCompactOptions& options, const std::string& name,
const std::string& output_directory, const std::string& input,
std::string* output,
const CompactionServiceOptionsOverride& override_options);

// Experimental and subject to change
// Open DB and trim data newer than specified timestamp.
// The trim_ts specified the user-defined timestamp trim bound.
Expand Down
5 changes: 5 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -1946,6 +1946,11 @@ struct CompactionServiceOptionsOverride {
std::shared_ptr<Statistics> statistics = nullptr;
};

struct OpenAndCompactOptions {
// Allows cancellation of an in-progress compaction.
std::atomic<bool>* canceled = nullptr;
};

#ifndef ROCKSDB_LITE
struct LiveFilesStorageInfoOptions {
// Whether to populate FileStorageInfo::file_checksum* or leave blank
Expand Down