From 08aa50323af00ae4e4681aab34973172e81045ad Mon Sep 17 00:00:00 2001 From: Xinye Tao Date: Fri, 23 Dec 2022 09:20:00 +0800 Subject: [PATCH] support post write callback (#326) A callback that is called after write succeeds and changes have been applied to memtable. Titan change: tikv/titan#270 Signed-off-by: tabokie --- db/db_impl/compacted_db_impl.h | 2 +- db/db_impl/db_impl.h | 16 ++-- db/db_impl/db_impl_readonly.h | 2 +- db/db_impl/db_impl_secondary.h | 2 +- db/db_impl/db_impl_write.cc | 65 +++++++++------ db/db_test.cc | 2 +- db/db_write_test.cc | 81 +++++++++++++++++++ db/write_batch.cc | 6 ++ db/write_thread.cc | 2 + db/write_thread.h | 11 ++- include/rocksdb/db.h | 12 ++- include/rocksdb/statistics.h | 2 +- include/rocksdb/utilities/stackable_db.h | 4 +- utilities/blob_db/blob_db.h | 2 +- utilities/blob_db/blob_db_impl.cc | 4 +- utilities/blob_db/blob_db_impl.h | 2 +- .../optimistic_transaction_db_impl.h | 4 +- utilities/ttl/db_ttl_impl.cc | 4 +- utilities/ttl/db_ttl_impl.h | 2 +- 19 files changed, 175 insertions(+), 50 deletions(-) diff --git a/db/db_impl/compacted_db_impl.h b/db/db_impl/compacted_db_impl.h index 9afea4e4b8c..01ac99a50a9 100644 --- a/db/db_impl/compacted_db_impl.h +++ b/db/db_impl/compacted_db_impl.h @@ -55,7 +55,7 @@ class CompactedDBImpl : public DBImpl { } using DBImpl::Write; virtual Status Write(const WriteOptions& /*options*/, WriteBatch* /*updates*/, - uint64_t* /*seq*/) override { + PostWriteCallback* /*callback*/) override { return Status::NotSupported("Not supported in compacted db mode."); } using DBImpl::CompactRange; diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 09f7d36362e..b9187f79a0e 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -178,12 +178,12 @@ class DBImpl : public DB { using DB::Write; virtual Status Write(const WriteOptions& options, WriteBatch* updates, - uint64_t* seq) override; + PostWriteCallback* callback) override; using DB::MultiBatchWrite; virtual Status MultiBatchWrite(const WriteOptions& options, std::vector&& updates, - uint64_t* seq) override; + PostWriteCallback* callback) override; using DB::Get; virtual Status Get(const ReadOptions& options, @@ -1322,26 +1322,30 @@ class DBImpl : public DB { uint64_t* log_used = nullptr, uint64_t log_ref = 0, bool disable_memtable = false, uint64_t* seq_used = nullptr, size_t batch_cnt = 0, - PreReleaseCallback* pre_release_callback = nullptr); + PreReleaseCallback* pre_release_callback = nullptr, + PostWriteCallback* post_callback = nullptr); Status MultiBatchWriteImpl(const WriteOptions& write_options, std::vector&& my_batch, WriteCallback* callback = nullptr, uint64_t* log_used = nullptr, uint64_t log_ref = 0, - uint64_t* seq_used = nullptr); + uint64_t* seq_used = nullptr, + PostWriteCallback* post_callback = nullptr); void MultiBatchWriteCommit(CommitRequest* request); Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates, WriteCallback* callback = nullptr, uint64_t* log_used = nullptr, uint64_t log_ref = 0, bool disable_memtable = false, - uint64_t* seq_used = nullptr); + uint64_t* seq_used = nullptr, + PostWriteCallback* post_callback = nullptr); // Write only to memtables without joining any write queue Status UnorderedWriteMemtable(const WriteOptions& write_options, WriteBatch* my_batch, WriteCallback* callback, uint64_t log_ref, SequenceNumber seq, - const size_t sub_batch_cnt); + const size_t sub_batch_cnt, + PostWriteCallback* post_callback = nullptr); // Whether the batch requires to be assigned with an order enum AssignOrder : bool { kDontAssignOrder, kDoAssignOrder }; diff --git a/db/db_impl/db_impl_readonly.h b/db/db_impl/db_impl_readonly.h index 5f077b856d3..2b8e57737cf 100644 --- a/db/db_impl/db_impl_readonly.h +++ b/db/db_impl/db_impl_readonly.h @@ -65,7 +65,7 @@ class DBImplReadOnly : public DBImpl { } using DBImpl::Write; virtual Status Write(const WriteOptions& /*options*/, WriteBatch* /*updates*/, - uint64_t* /*seq*/) override { + PostWriteCallback* /*callback*/) override { return Status::NotSupported("Not supported operation in read only mode."); } using DBImpl::CompactRange; diff --git a/db/db_impl/db_impl_secondary.h b/db/db_impl/db_impl_secondary.h index 7aebcc1161e..c6413287c2b 100644 --- a/db/db_impl/db_impl_secondary.h +++ b/db/db_impl/db_impl_secondary.h @@ -137,7 +137,7 @@ class DBImplSecondary : public DBImpl { using DBImpl::Write; Status Write(const WriteOptions& /*options*/, WriteBatch* /*updates*/, - uint64_t* /*seq*/) override { + PostWriteCallback* /*callback*/) override { return Status::NotSupported("Not supported operation in secondary mode."); } diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 08d6b584c62..cdf5dcc998b 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -106,17 +106,18 @@ void DBImpl::SetRecoverableStatePreReleaseCallback( } Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch, - uint64_t* seq) { + PostWriteCallback* callback) { return WriteImpl(write_options, my_batch, /*callback=*/nullptr, /*log_used=*/nullptr, /*log_ref=*/0, - /*disable_memtable=*/false, seq); + /*disable_memtable=*/false, /*seq=*/nullptr, /*batch_cnt=*/0, + /*pre_release_callback=*/nullptr, callback); } #ifndef ROCKSDB_LITE Status DBImpl::WriteWithCallback(const WriteOptions& write_options, WriteBatch* my_batch, WriteCallback* callback) { - return WriteImpl(write_options, my_batch, callback, nullptr); + return WriteImpl(write_options, my_batch, callback); } #endif // ROCKSDB_LITE @@ -135,10 +136,11 @@ void DBImpl::MultiBatchWriteCommit(CommitRequest* request) { Status DBImpl::MultiBatchWrite(const WriteOptions& options, std::vector&& updates, - uint64_t* seq) { + PostWriteCallback* callback) { if (immutable_db_options_.enable_multi_batch_write) { - return MultiBatchWriteImpl(options, std::move(updates), nullptr, nullptr, 0, - seq); + return MultiBatchWriteImpl(options, std::move(updates), + /*callback=*/nullptr, /*log_used=*/nullptr, + /*log_ref=*/0, /*seq=*/nullptr, callback); } else { return Status::NotSupported(); } @@ -187,12 +189,14 @@ Status DBImpl::MultiBatchWrite(const WriteOptions& options, Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options, std::vector&& my_batch, WriteCallback* callback, uint64_t* log_used, - uint64_t log_ref, uint64_t* seq_used) { + uint64_t log_ref, uint64_t* seq_used, + PostWriteCallback* post_callback) { PERF_TIMER_GUARD(write_pre_and_post_process_time); StopWatch write_sw(immutable_db_options_.clock, immutable_db_options_.statistics.get(), DB_WRITE); WriteThread::Writer writer(write_options, std::move(my_batch), callback, - log_ref, false /*disable_memtable*/); + post_callback, log_ref, + false /*disable_memtable*/); CommitRequest request(&writer); writer.request = &request; write_thread_.JoinBatchGroup(&writer); @@ -346,7 +350,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, uint64_t* log_used, uint64_t log_ref, bool disable_memtable, uint64_t* seq_used, size_t batch_cnt, - PreReleaseCallback* pre_release_callback) { + PreReleaseCallback* pre_release_callback, + PostWriteCallback* post_callback) { assert(!seq_per_batch_ || batch_cnt != 0); if (my_batch == nullptr) { return Status::Corruption("Batch is nullptr!"); @@ -376,6 +381,14 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, return Status::NotSupported( "pipelined_writes is not compatible with concurrent prepares"); } + if (two_write_queues_ && post_callback) { + return Status::NotSupported( + "post write callback is not compatible with concurrent prepares"); + } + if (disable_memtable && post_callback) { + return Status::NotSupported( + "post write callback is not compatible with disabling memtable"); + } if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) { // TODO(yiwu): update pipeline write with seq_per_batch and batch_cnt return Status::NotSupported( @@ -429,8 +442,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } if (!disable_memtable) { TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeUnorderedWriteMemtable"); - status = UnorderedWriteMemtable(write_options, my_batch, callback, - log_ref, seq, sub_batch_cnt); + status = + UnorderedWriteMemtable(write_options, my_batch, callback, log_ref, + seq, sub_batch_cnt, post_callback); } return status; } @@ -439,17 +453,19 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, std::vector updates(1); updates[0] = my_batch; return MultiBatchWriteImpl(write_options, std::move(updates), callback, - log_used, log_ref, seq_used); + log_used, log_ref, seq_used, post_callback); } if (immutable_db_options_.enable_pipelined_write) { return PipelinedWriteImpl(write_options, my_batch, callback, log_used, - log_ref, disable_memtable, seq_used); + log_ref, disable_memtable, seq_used, + post_callback); } PERF_TIMER_GUARD(write_pre_and_post_process_time); - WriteThread::Writer w(write_options, my_batch, callback, log_ref, - disable_memtable, batch_cnt, pre_release_callback); + WriteThread::Writer w(write_options, my_batch, callback, post_callback, + log_ref, disable_memtable, batch_cnt, + pre_release_callback); StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE); write_thread_.JoinBatchGroup(&w); @@ -779,14 +795,15 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, WriteBatch* my_batch, WriteCallback* callback, uint64_t* log_used, uint64_t log_ref, - bool disable_memtable, uint64_t* seq_used) { + bool disable_memtable, uint64_t* seq_used, + PostWriteCallback* post_callback) { PERF_TIMER_GUARD(write_pre_and_post_process_time); StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE); WriteContext write_context; - WriteThread::Writer w(write_options, my_batch, callback, log_ref, - disable_memtable, /*_batch_cnt=*/0, + WriteThread::Writer w(write_options, my_batch, callback, post_callback, + log_ref, disable_memtable, /*_batch_cnt=*/0, /*_pre_release_callback=*/nullptr); write_thread_.JoinBatchGroup(&w); TEST_SYNC_POINT("DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup"); @@ -956,12 +973,13 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options, WriteBatch* my_batch, WriteCallback* callback, uint64_t log_ref, SequenceNumber seq, - const size_t sub_batch_cnt) { + const size_t sub_batch_cnt, + PostWriteCallback* post_callback) { PERF_TIMER_GUARD(write_pre_and_post_process_time); StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE); - WriteThread::Writer w(write_options, my_batch, callback, log_ref, - false /*disable_memtable*/); + WriteThread::Writer w(write_options, my_batch, callback, post_callback, + log_ref, false /*disable_memtable*/); if (w.CheckCallback(this) && w.ShouldWriteToMemtable()) { w.sequence = seq; @@ -1010,8 +1028,9 @@ Status DBImpl::WriteImplWALOnly( PreReleaseCallback* pre_release_callback, const AssignOrder assign_order, const PublishLastSeq publish_last_seq, const bool disable_memtable) { PERF_TIMER_GUARD(write_pre_and_post_process_time); - WriteThread::Writer w(write_options, my_batch, callback, log_ref, - disable_memtable, sub_batch_cnt, pre_release_callback); + WriteThread::Writer w(write_options, my_batch, callback, + /*post_callback=*/nullptr, log_ref, disable_memtable, + sub_batch_cnt, pre_release_callback); StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE); write_thread->JoinBatchGroup(&w); diff --git a/db/db_test.cc b/db/db_test.cc index e42704a84c7..f2373ad1459 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3022,7 +3022,7 @@ class ModelDB : public DB { using DB::Write; Status Write(const WriteOptions& /*options*/, WriteBatch* batch, - uint64_t* /*seq*/) override { + PostWriteCallback* /*callback*/) override { class Handler : public WriteBatch::Handler { public: KVMap* map_; diff --git a/db/db_write_test.cc b/db/db_write_test.cc index 3f53fe597e5..d126aacf9c4 100644 --- a/db/db_write_test.cc +++ b/db/db_write_test.cc @@ -563,6 +563,87 @@ TEST_P(DBWriteTest, MultiThreadWrite) { Close(); } +class SimpleCallback : public PostWriteCallback { + std::function f_; + + public: + SimpleCallback(std::function&& f) : f_(f) {} + + void Callback(SequenceNumber seq) override { f_(seq); } +}; + +TEST_P(DBWriteTest, PostWriteCallback) { + Options options = GetOptions(); + if (options.two_write_queues) { + // Not compatible. + return; + } + Reopen(options); + + std::vector threads; + + port::Mutex the_first_can_exit_write_mutex; + the_first_can_exit_write_mutex.Lock(); + port::Mutex can_flush_mutex; + can_flush_mutex.Lock(); + port::Mutex the_second_can_exit_write_mutex; + the_second_can_exit_write_mutex.Lock(); + + std::atomic written(0); + std::atomic flushed(false); + + threads.push_back(port::Thread([&] { + WriteBatch batch; + WriteOptions opts; + opts.sync = false; + opts.disableWAL = true; + SimpleCallback callback([&](SequenceNumber seq) { + ASSERT_NE(seq, 0); + can_flush_mutex.Unlock(); + the_first_can_exit_write_mutex.Lock(); + the_second_can_exit_write_mutex.Unlock(); + }); + batch.Put("key", "value"); + ASSERT_OK(dbfull()->Write(opts, &batch, &callback)); + written.fetch_add(1, std::memory_order_relaxed); + })); + threads.push_back(port::Thread([&] { + WriteBatch batch; + WriteOptions opts; + opts.sync = false; + opts.disableWAL = true; + SimpleCallback callback([&](SequenceNumber seq) { + ASSERT_NE(seq, 0); + the_second_can_exit_write_mutex.Lock(); + }); + batch.Put("key", "value"); + ASSERT_OK(dbfull()->Write(opts, &batch, &callback)); + written.fetch_add(1, std::memory_order_relaxed); + })); + // Flush will enter write thread and wait for pending writes. + threads.push_back(port::Thread([&] { + FlushOptions opts; + opts.wait = false; + can_flush_mutex.Lock(); + ASSERT_OK(dbfull()->Flush(opts)); + can_flush_mutex.Unlock(); + flushed.store(true, std::memory_order_relaxed); + })); + + std::this_thread::sleep_for(std::chrono::milliseconds{100}); + ASSERT_EQ(written.load(std::memory_order_relaxed), 0); + ASSERT_EQ(flushed.load(std::memory_order_relaxed), false); + + the_first_can_exit_write_mutex.Unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds{10}); + ASSERT_EQ(written.load(std::memory_order_relaxed), 2); + ASSERT_EQ(flushed.load(std::memory_order_relaxed), true); + + for (auto& t : threads) { + t.join(); + } +} + INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest, testing::Values(DBTestBase::kDefault, DBTestBase::kConcurrentWALWrites, diff --git a/db/write_batch.cc b/db/write_batch.cc index e8987030c4a..82c3b4436f4 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -2552,6 +2552,9 @@ Status WriteBatchInternal::InsertInto( if (!w->status.ok()) { return w->status; } + if (w->post_callback) { + w->post_callback->Callback(w->sequence); + } assert(!seq_per_batch || w->batch_cnt != 0); assert(!seq_per_batch || inserter.sequence() - w->sequence == w->batch_cnt); } @@ -2579,6 +2582,9 @@ Status WriteBatchInternal::InsertInto( inserter.set_log_number_ref(writer->log_ref); inserter.set_prot_info(writer->multi_batch.batches[0]->prot_info_.get()); Status s = writer->multi_batch.batches[0]->Iterate(&inserter); + if (writer->post_callback && s.ok()) { + writer->post_callback->Callback(sequence); + } assert(!seq_per_batch || batch_cnt != 0); assert(!seq_per_batch || inserter.sequence() - sequence == batch_cnt); if (concurrent_memtable_writes) { diff --git a/db/write_thread.cc b/db/write_thread.cc index 77ecaeb5a00..18161e09552 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -863,6 +863,8 @@ void WriteThread::Writer::ConsumeOne(size_t claimed) { if (!s.ok()) { std::lock_guard guard(this->status_lock); this->status = s; + } else if (post_callback) { + post_callback->Callback(sequence); } multi_batch.pending_wb_cnt.fetch_sub(1, std::memory_order_acq_rel); } diff --git a/db/write_thread.h b/db/write_thread.h index 596415814fa..db764ccfc2a 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -186,6 +186,7 @@ class WriteThread { uint64_t log_used; // log number that this batch was inserted into uint64_t log_ref; // log number that memtable insert should reference WriteCallback* callback; + PostWriteCallback* post_callback; bool made_waitable; // records lazy construction of mutex and cv std::atomic state; // write under StateMutex() or pre-link WriteGroup* write_group; @@ -213,6 +214,7 @@ class WriteThread { log_used(0), log_ref(0), callback(nullptr), + post_callback(nullptr), made_waitable(false), state(STATE_INIT), write_group(nullptr), @@ -222,8 +224,8 @@ class WriteThread { link_newer(nullptr) {} Writer(const WriteOptions& write_options, WriteBatch* _batch, - WriteCallback* _callback, uint64_t _log_ref, bool _disable_memtable, - size_t _batch_cnt = 0, + WriteCallback* _callback, PostWriteCallback* _post_callback, + uint64_t _log_ref, bool _disable_memtable, size_t _batch_cnt = 0, PreReleaseCallback* _pre_release_callback = nullptr) : sync(write_options.sync), no_slowdown(write_options.no_slowdown), @@ -235,6 +237,7 @@ class WriteThread { log_used(0), log_ref(_log_ref), callback(_callback), + post_callback(_post_callback), made_waitable(false), state(STATE_INIT), write_group(nullptr), @@ -247,7 +250,8 @@ class WriteThread { } Writer(const WriteOptions& write_options, std::vector&& _batch, - WriteCallback* _callback, uint64_t _log_ref, bool _disable_memtable, + WriteCallback* _callback, PostWriteCallback* _post_callback, + uint64_t _log_ref, bool _disable_memtable, PreReleaseCallback* _pre_release_callback = nullptr) : sync(write_options.sync), no_slowdown(write_options.no_slowdown), @@ -258,6 +262,7 @@ class WriteThread { log_used(0), log_ref(_log_ref), callback(_callback), + post_callback(_post_callback), made_waitable(false), state(STATE_INIT), write_group(nullptr), diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 89bdbb89ec4..ddd29763086 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -144,6 +144,14 @@ struct GetMergeOperandsOptions { using TablePropertiesCollection = std::unordered_map>; +class PostWriteCallback { + public: + virtual ~PostWriteCallback() {} + + // Will be called while on the write thread after the write executes. + virtual void Callback(SequenceNumber seq) = 0; +}; + // A DB is a persistent, versioned ordered map from keys to values. // A DB is safe for concurrent access from multiple threads without // any external synchronization. @@ -472,14 +480,14 @@ class DB { // Returns OK on success, non-OK on failure. // Note: consider setting options.sync = true. virtual Status Write(const WriteOptions& options, WriteBatch* updates, - uint64_t* seq) = 0; + PostWriteCallback* callback) = 0; virtual Status Write(const WriteOptions& options, WriteBatch* updates) { return Write(options, updates, nullptr); } virtual Status MultiBatchWrite(const WriteOptions& /*options*/, std::vector&& /*updates*/, - uint64_t* /*seq*/) { + PostWriteCallback* /*callback*/) { return Status::NotSupported(); } diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 36cc6d560b1..17749610f35 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -425,7 +425,7 @@ enum Tickers : uint32_t { WARM_FILE_READ_COUNT, COLD_FILE_READ_COUNT, - TICKER_ENUM_MAX + TICKER_ENUM_MAX, }; // The order of items listed in Tickers should be the same as diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 921433ff7cd..935f346e9b6 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -203,8 +203,8 @@ class StackableDB : public DB { using DB::Write; virtual Status Write(const WriteOptions& opts, WriteBatch* updates, - uint64_t* seq) override { - return db_->Write(opts, updates, seq); + PostWriteCallback* callback) override { + return db_->Write(opts, updates, callback); } using DB::NewIterator; diff --git a/utilities/blob_db/blob_db.h b/utilities/blob_db/blob_db.h index 2db6b2a4f64..c7d1f0a06a3 100644 --- a/utilities/blob_db/blob_db.h +++ b/utilities/blob_db/blob_db.h @@ -200,7 +200,7 @@ class BlobDB : public StackableDB { using rocksdb::StackableDB::Write; virtual Status Write(const WriteOptions& opts, WriteBatch* updates, - uint64_t* seq) override = 0; + PostWriteCallback* callback) override = 0; using ROCKSDB_NAMESPACE::StackableDB::NewIterator; virtual Iterator* NewIterator(const ReadOptions& options) override = 0; diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 63ff8e46f32..43589d33a18 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -998,7 +998,7 @@ class BlobDBImpl::BlobInserter : public WriteBatch::Handler { }; Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates, - uint64_t* seq) { + PostWriteCallback* callback) { StopWatch write_sw(clock_, statistics_, BLOB_DB_WRITE_MICROS); RecordTick(statistics_, BLOB_DB_NUM_WRITE); uint32_t default_cf_id = @@ -1016,7 +1016,7 @@ Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates, if (!s.ok()) { return s; } - return db_->Write(options, blob_inserter.batch(), seq); + return db_->Write(options, blob_inserter.batch(), callback); } Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key, diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index eabd3a83a0c..c91f3c6ad0c 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -130,7 +130,7 @@ class BlobDBImpl : public BlobDB { using BlobDB::Write; virtual Status Write(const WriteOptions& opts, WriteBatch* updates, - uint64_t* seq) override; + PostWriteCallback* callback) override; virtual Status Close() override; diff --git a/utilities/transactions/optimistic_transaction_db_impl.h b/utilities/transactions/optimistic_transaction_db_impl.h index ed652e8f7ed..5927ed2173a 100644 --- a/utilities/transactions/optimistic_transaction_db_impl.h +++ b/utilities/transactions/optimistic_transaction_db_impl.h @@ -57,11 +57,11 @@ class OptimisticTransactionDBImpl : public OptimisticTransactionDB { // incompatible with `OptimisticTransactionDB`. using OptimisticTransactionDB::Write; virtual Status Write(const WriteOptions& write_opts, WriteBatch* batch, - uint64_t* seq) override { + PostWriteCallback* callback) override { if (batch->HasDeleteRange()) { return Status::NotSupported(); } - return OptimisticTransactionDB::Write(write_opts, batch, seq); + return OptimisticTransactionDB::Write(write_opts, batch, callback); } size_t GetLockBucketsSize() const { return bucketed_locks_.size(); } diff --git a/utilities/ttl/db_ttl_impl.cc b/utilities/ttl/db_ttl_impl.cc index 6852b5cc352..ec54e7c2d7f 100644 --- a/utilities/ttl/db_ttl_impl.cc +++ b/utilities/ttl/db_ttl_impl.cc @@ -556,7 +556,7 @@ Status DBWithTTLImpl::Merge(const WriteOptions& options, } Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates, - uint64_t* seq) { + PostWriteCallback* callback) { class Handler : public WriteBatch::Handler { public: explicit Handler(SystemClock* clock) : clock_(clock) {} @@ -599,7 +599,7 @@ Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates, if (!st.ok()) { return st; } else { - return db_->Write(opts, &(handler.updates_ttl), seq); + return db_->Write(opts, &(handler.updates_ttl), callback); } } diff --git a/utilities/ttl/db_ttl_impl.h b/utilities/ttl/db_ttl_impl.h index 17a3ddae46d..f003de3740f 100644 --- a/utilities/ttl/db_ttl_impl.h +++ b/utilities/ttl/db_ttl_impl.h @@ -79,7 +79,7 @@ class DBWithTTLImpl : public DBWithTTL { using StackableDB::Write; virtual Status Write(const WriteOptions& opts, WriteBatch* updates, - uint64_t* seq) override; + PostWriteCallback* callback) override; using StackableDB::NewIterator; virtual Iterator* NewIterator(const ReadOptions& opts,