Skip to content

Commit

Permalink
support post write callback (#326)
Browse files Browse the repository at this point in the history
Adds a callback that is invoked after a write succeeds and its changes have been applied to the memtable.

Titan change: tikv/titan#270

Signed-off-by: tabokie <[email protected]>
  • Loading branch information
tabokie authored Dec 23, 2022
1 parent 8899a36 commit 08aa503
Show file tree
Hide file tree
Showing 19 changed files with 175 additions and 50 deletions.
2 changes: 1 addition & 1 deletion db/db_impl/compacted_db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class CompactedDBImpl : public DBImpl {
}
using DBImpl::Write;
virtual Status Write(const WriteOptions& /*options*/, WriteBatch* /*updates*/,
uint64_t* /*seq*/) override {
PostWriteCallback* /*callback*/) override {
return Status::NotSupported("Not supported in compacted db mode.");
}
using DBImpl::CompactRange;
Expand Down
16 changes: 10 additions & 6 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,12 @@ class DBImpl : public DB {

using DB::Write;
virtual Status Write(const WriteOptions& options, WriteBatch* updates,
uint64_t* seq) override;
PostWriteCallback* callback) override;

using DB::MultiBatchWrite;
virtual Status MultiBatchWrite(const WriteOptions& options,
std::vector<WriteBatch*>&& updates,
uint64_t* seq) override;
PostWriteCallback* callback) override;

using DB::Get;
virtual Status Get(const ReadOptions& options,
Expand Down Expand Up @@ -1322,26 +1322,30 @@ class DBImpl : public DB {
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
bool disable_memtable = false, uint64_t* seq_used = nullptr,
size_t batch_cnt = 0,
PreReleaseCallback* pre_release_callback = nullptr);
PreReleaseCallback* pre_release_callback = nullptr,
PostWriteCallback* post_callback = nullptr);

Status MultiBatchWriteImpl(const WriteOptions& write_options,
std::vector<WriteBatch*>&& my_batch,
WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
uint64_t* seq_used = nullptr);
uint64_t* seq_used = nullptr,
PostWriteCallback* post_callback = nullptr);
void MultiBatchWriteCommit(CommitRequest* request);

Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
bool disable_memtable = false,
uint64_t* seq_used = nullptr);
uint64_t* seq_used = nullptr,
PostWriteCallback* post_callback = nullptr);

// Write only to memtables without joining any write queue
Status UnorderedWriteMemtable(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
uint64_t log_ref, SequenceNumber seq,
const size_t sub_batch_cnt);
const size_t sub_batch_cnt,
PostWriteCallback* post_callback = nullptr);

// Whether the batch requires to be assigned with an order
enum AssignOrder : bool { kDontAssignOrder, kDoAssignOrder };
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_readonly.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class DBImplReadOnly : public DBImpl {
}
using DBImpl::Write;
virtual Status Write(const WriteOptions& /*options*/, WriteBatch* /*updates*/,
uint64_t* /*seq*/) override {
PostWriteCallback* /*callback*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
}
using DBImpl::CompactRange;
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_secondary.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class DBImplSecondary : public DBImpl {

using DBImpl::Write;
Status Write(const WriteOptions& /*options*/, WriteBatch* /*updates*/,
uint64_t* /*seq*/) override {
PostWriteCallback* /*callback*/) override {
return Status::NotSupported("Not supported operation in secondary mode.");
}

Expand Down
65 changes: 42 additions & 23 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,18 @@ void DBImpl::SetRecoverableStatePreReleaseCallback(
}

Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch,
uint64_t* seq) {
PostWriteCallback* callback) {
return WriteImpl(write_options, my_batch, /*callback=*/nullptr,
/*log_used=*/nullptr, /*log_ref=*/0,
/*disable_memtable=*/false, seq);
/*disable_memtable=*/false, /*seq=*/nullptr, /*batch_cnt=*/0,
/*pre_release_callback=*/nullptr, callback);
}

#ifndef ROCKSDB_LITE
Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
WriteBatch* my_batch,
WriteCallback* callback) {
return WriteImpl(write_options, my_batch, callback, nullptr);
return WriteImpl(write_options, my_batch, callback);
}
#endif // ROCKSDB_LITE

Expand All @@ -135,10 +136,11 @@ void DBImpl::MultiBatchWriteCommit(CommitRequest* request) {

Status DBImpl::MultiBatchWrite(const WriteOptions& options,
std::vector<WriteBatch*>&& updates,
uint64_t* seq) {
PostWriteCallback* callback) {
if (immutable_db_options_.enable_multi_batch_write) {
return MultiBatchWriteImpl(options, std::move(updates), nullptr, nullptr, 0,
seq);
return MultiBatchWriteImpl(options, std::move(updates),
/*callback=*/nullptr, /*log_used=*/nullptr,
/*log_ref=*/0, /*seq=*/nullptr, callback);
} else {
return Status::NotSupported();
}
Expand Down Expand Up @@ -187,12 +189,14 @@ Status DBImpl::MultiBatchWrite(const WriteOptions& options,
Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options,
std::vector<WriteBatch*>&& my_batch,
WriteCallback* callback, uint64_t* log_used,
uint64_t log_ref, uint64_t* seq_used) {
uint64_t log_ref, uint64_t* seq_used,
PostWriteCallback* post_callback) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
StopWatch write_sw(immutable_db_options_.clock,
immutable_db_options_.statistics.get(), DB_WRITE);
WriteThread::Writer writer(write_options, std::move(my_batch), callback,
log_ref, false /*disable_memtable*/);
post_callback, log_ref,
false /*disable_memtable*/);
CommitRequest request(&writer);
writer.request = &request;
write_thread_.JoinBatchGroup(&writer);
Expand Down Expand Up @@ -346,7 +350,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
uint64_t* log_used, uint64_t log_ref,
bool disable_memtable, uint64_t* seq_used,
size_t batch_cnt,
PreReleaseCallback* pre_release_callback) {
PreReleaseCallback* pre_release_callback,
PostWriteCallback* post_callback) {
assert(!seq_per_batch_ || batch_cnt != 0);
if (my_batch == nullptr) {
return Status::Corruption("Batch is nullptr!");
Expand Down Expand Up @@ -376,6 +381,14 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
return Status::NotSupported(
"pipelined_writes is not compatible with concurrent prepares");
}
if (two_write_queues_ && post_callback) {
return Status::NotSupported(
"post write callback is not compatible with concurrent prepares");
}
if (disable_memtable && post_callback) {
return Status::NotSupported(
"post write callback is not compatible with disabling memtable");
}
if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) {
// TODO(yiwu): update pipeline write with seq_per_batch and batch_cnt
return Status::NotSupported(
Expand Down Expand Up @@ -429,8 +442,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}
if (!disable_memtable) {
TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeUnorderedWriteMemtable");
status = UnorderedWriteMemtable(write_options, my_batch, callback,
log_ref, seq, sub_batch_cnt);
status =
UnorderedWriteMemtable(write_options, my_batch, callback, log_ref,
seq, sub_batch_cnt, post_callback);
}
return status;
}
Expand All @@ -439,17 +453,19 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
std::vector<WriteBatch*> updates(1);
updates[0] = my_batch;
return MultiBatchWriteImpl(write_options, std::move(updates), callback,
log_used, log_ref, seq_used);
log_used, log_ref, seq_used, post_callback);
}

if (immutable_db_options_.enable_pipelined_write) {
return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
log_ref, disable_memtable, seq_used);
log_ref, disable_memtable, seq_used,
post_callback);
}

PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
disable_memtable, batch_cnt, pre_release_callback);
WriteThread::Writer w(write_options, my_batch, callback, post_callback,
log_ref, disable_memtable, batch_cnt,
pre_release_callback);
StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE);

write_thread_.JoinBatchGroup(&w);
Expand Down Expand Up @@ -779,14 +795,15 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
uint64_t* log_used, uint64_t log_ref,
bool disable_memtable, uint64_t* seq_used) {
bool disable_memtable, uint64_t* seq_used,
PostWriteCallback* post_callback) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE);

WriteContext write_context;

WriteThread::Writer w(write_options, my_batch, callback, log_ref,
disable_memtable, /*_batch_cnt=*/0,
WriteThread::Writer w(write_options, my_batch, callback, post_callback,
log_ref, disable_memtable, /*_batch_cnt=*/0,
/*_pre_release_callback=*/nullptr);
write_thread_.JoinBatchGroup(&w);
TEST_SYNC_POINT("DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup");
Expand Down Expand Up @@ -956,12 +973,13 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
WriteBatch* my_batch,
WriteCallback* callback, uint64_t log_ref,
SequenceNumber seq,
const size_t sub_batch_cnt) {
const size_t sub_batch_cnt,
PostWriteCallback* post_callback) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE);

WriteThread::Writer w(write_options, my_batch, callback, log_ref,
false /*disable_memtable*/);
WriteThread::Writer w(write_options, my_batch, callback, post_callback,
log_ref, false /*disable_memtable*/);

if (w.CheckCallback(this) && w.ShouldWriteToMemtable()) {
w.sequence = seq;
Expand Down Expand Up @@ -1010,8 +1028,9 @@ Status DBImpl::WriteImplWALOnly(
PreReleaseCallback* pre_release_callback, const AssignOrder assign_order,
const PublishLastSeq publish_last_seq, const bool disable_memtable) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
disable_memtable, sub_batch_cnt, pre_release_callback);
WriteThread::Writer w(write_options, my_batch, callback,
/*post_callback=*/nullptr, log_ref, disable_memtable,
sub_batch_cnt, pre_release_callback);
StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE);

write_thread->JoinBatchGroup(&w);
Expand Down
2 changes: 1 addition & 1 deletion db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3022,7 +3022,7 @@ class ModelDB : public DB {

using DB::Write;
Status Write(const WriteOptions& /*options*/, WriteBatch* batch,
uint64_t* /*seq*/) override {
PostWriteCallback* /*callback*/) override {
class Handler : public WriteBatch::Handler {
public:
KVMap* map_;
Expand Down
81 changes: 81 additions & 0 deletions db/db_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,87 @@ TEST_P(DBWriteTest, MultiThreadWrite) {
Close();
}

// Test helper that adapts a std::function to the PostWriteCallback interface
// so that a test can supply its post-write hook as a lambda.
class SimpleCallback : public PostWriteCallback {
  std::function<void(SequenceNumber)> f_;

 public:
  // Takes ownership of `f`. A named rvalue-reference parameter is an lvalue
  // inside the constructor, so it has to be moved explicitly; initialising
  // with plain `f` would copy the callable together with its captured state.
  // `explicit` prevents a std::function from silently converting into a
  // SimpleCallback.
  explicit SimpleCallback(std::function<void(SequenceNumber)>&& f)
      : f_(std::move(f)) {}

  // Forwards `seq` to the wrapped callable.
  void Callback(SequenceNumber seq) override { f_(seq); }
};

// Checks that a PostWriteCallback runs as part of the write itself: while the
// callback is blocked, neither of the two writers returns and a concurrent
// Flush() does not complete; once the callback is released, all three finish.
TEST_P(DBWriteTest, PostWriteCallback) {
  Options options = GetOptions();
  if (options.two_write_queues) {
    // Not compatible.
    return;
  }
  Reopen(options);

  std::vector<port::Thread> threads;

  // The three mutexes are used as one-shot gates: each one is locked here and
  // released later by whichever thread wants to let the waiter through.
  // NOTE(review): every gate is unlocked by a different thread than the one
  // that locked it, and two of them are still locked when they go out of
  // scope. If port::Mutex is a thin wrapper over the platform mutex this is
  // outside the mutex contract -- confirm, or switch to a condition
  // variable / atomic flag.
  port::Mutex the_first_can_exit_write_mutex;
  the_first_can_exit_write_mutex.Lock();
  port::Mutex can_flush_mutex;
  can_flush_mutex.Lock();
  port::Mutex the_second_can_exit_write_mutex;
  the_second_can_exit_write_mutex.Lock();

  std::atomic<uint64_t> written(0);
  std::atomic<bool> flushed(false);

  // First writer: its callback lets the flush thread start, then blocks until
  // the main thread opens the first gate, and finally releases the second
  // writer's callback.
  // NOTE(review): this presumably relies on the first writer's callback being
  // invoked before the second writer's; verify that the ordering holds when
  // both writers end up in the same write group.
  threads.push_back(port::Thread([&] {
    WriteBatch batch;
    WriteOptions opts;
    opts.sync = false;
    opts.disableWAL = true;
    SimpleCallback callback([&](SequenceNumber seq) {
      ASSERT_NE(seq, 0);
      can_flush_mutex.Unlock();
      the_first_can_exit_write_mutex.Lock();
      the_second_can_exit_write_mutex.Unlock();
    });
    batch.Put("key", "value");
    ASSERT_OK(dbfull()->Write(opts, &batch, &callback));
    written.fetch_add(1, std::memory_order_relaxed);
  }));
  // Second writer: its callback blocks until the first writer's callback has
  // opened the second gate.
  threads.push_back(port::Thread([&] {
    WriteBatch batch;
    WriteOptions opts;
    opts.sync = false;
    opts.disableWAL = true;
    SimpleCallback callback([&](SequenceNumber seq) {
      ASSERT_NE(seq, 0);
      the_second_can_exit_write_mutex.Lock();
    });
    batch.Put("key", "value");
    ASSERT_OK(dbfull()->Write(opts, &batch, &callback));
    written.fetch_add(1, std::memory_order_relaxed);
  }));
  // Flush will enter write thread and wait for pending writes.
  threads.push_back(port::Thread([&] {
    FlushOptions opts;
    opts.wait = false;
    can_flush_mutex.Lock();
    ASSERT_OK(dbfull()->Flush(opts));
    can_flush_mutex.Unlock();
    flushed.store(true, std::memory_order_relaxed);
  }));

  // Give the threads time to reach their blocking points. Nothing may have
  // completed yet because the first callback is still parked on its gate.
  // These checks are non-fatal on purpose: a fatal ASSERT_* would return from
  // the test body while the threads are still joinable and still hold
  // references to the locals above.
  std::this_thread::sleep_for(std::chrono::milliseconds{100});
  EXPECT_EQ(written.load(std::memory_order_relaxed), 0);
  EXPECT_EQ(flushed.load(std::memory_order_relaxed), false);

  // Open the first gate and wait for every thread to finish. Joining (rather
  // than sleeping for a fixed interval) makes the final checks independent of
  // scheduling delays.
  the_first_can_exit_write_mutex.Unlock();
  for (auto& t : threads) {
    t.join();
  }

  ASSERT_EQ(written.load(std::memory_order_relaxed), 2);
  ASSERT_EQ(flushed.load(std::memory_order_relaxed), true);
}

INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
testing::Values(DBTestBase::kDefault,
DBTestBase::kConcurrentWALWrites,
Expand Down
6 changes: 6 additions & 0 deletions db/write_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2552,6 +2552,9 @@ Status WriteBatchInternal::InsertInto(
if (!w->status.ok()) {
return w->status;
}
if (w->post_callback) {
w->post_callback->Callback(w->sequence);
}
assert(!seq_per_batch || w->batch_cnt != 0);
assert(!seq_per_batch || inserter.sequence() - w->sequence == w->batch_cnt);
}
Expand Down Expand Up @@ -2579,6 +2582,9 @@ Status WriteBatchInternal::InsertInto(
inserter.set_log_number_ref(writer->log_ref);
inserter.set_prot_info(writer->multi_batch.batches[0]->prot_info_.get());
Status s = writer->multi_batch.batches[0]->Iterate(&inserter);
if (writer->post_callback && s.ok()) {
writer->post_callback->Callback(sequence);
}
assert(!seq_per_batch || batch_cnt != 0);
assert(!seq_per_batch || inserter.sequence() - sequence == batch_cnt);
if (concurrent_memtable_writes) {
Expand Down
2 changes: 2 additions & 0 deletions db/write_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,8 @@ void WriteThread::Writer::ConsumeOne(size_t claimed) {
if (!s.ok()) {
std::lock_guard<SpinMutex> guard(this->status_lock);
this->status = s;
} else if (post_callback) {
post_callback->Callback(sequence);
}
multi_batch.pending_wb_cnt.fetch_sub(1, std::memory_order_acq_rel);
}
Expand Down
Loading

0 comments on commit 08aa503

Please sign in to comment.