Skip to content

Commit

Permalink
Implement multi batches write
Browse files Browse the repository at this point in the history
Implement multi batches write

Signed-off-by: v01dstar <[email protected]>

Fix SIGABRT caused by uninitialized mutex (#296) (#298)

* Fix SIGABRT caused by uninitialized mutex

Signed-off-by: Wenbo Zhang <[email protected]>

* Use spinlock instead of mutex to reduce writer ctor cost

Signed-off-by: Wenbo Zhang <[email protected]>

* Update db/write_thread.h

Co-authored-by: Xinye Tao <[email protected]>
Signed-off-by: Wenbo Zhang <[email protected]>

Co-authored-by: Xinye Tao <[email protected]>
Signed-off-by: Wenbo Zhang <[email protected]>

Co-authored-by: Xinye Tao <[email protected]>
  • Loading branch information
v01dstar and tabokie committed May 19, 2024
1 parent 6b53a73 commit 05caaee
Show file tree
Hide file tree
Showing 23 changed files with 802 additions and 84 deletions.
7 changes: 7 additions & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1515,6 +1515,13 @@ Status ColumnFamilyData::ValidateOptions(
}
}
}

if (db_options.enable_multi_batch_write &&
cf_options.max_successive_merges > 0) {
return Status::NotSupported(
"Multi thread write is only supported with no successive merges");
}

return s;
}

Expand Down
14 changes: 13 additions & 1 deletion db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ class DBImpl : public DB {
virtual Status Write(const WriteOptions& options,
WriteBatch* updates) override;

using DB::MultiBatchWrite;
virtual Status MultiBatchWrite(const WriteOptions& options,
std::vector<WriteBatch*>&& updates) override;

using DB::Get;
virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
Expand Down Expand Up @@ -1473,6 +1477,13 @@ class DBImpl : public DB {
PreReleaseCallback* pre_release_callback = nullptr,
PostMemTableCallback* post_memtable_callback = nullptr);

Status MultiBatchWriteImpl(const WriteOptions& write_options,
std::vector<WriteBatch*>&& my_batch,
WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
uint64_t* seq_used = nullptr);
void MultiBatchWriteCommit(CommitRequest* request);

Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
Expand Down Expand Up @@ -2003,7 +2014,8 @@ class DBImpl : public DB {
mutex_.Lock();
}

if (!immutable_db_options_.unordered_write) {
if (!immutable_db_options_.unordered_write &&
!immutable_db_options_.enable_multi_batch_write) {
// Then the writes are finished before the next write group starts
return;
}
Expand Down
12 changes: 11 additions & 1 deletion db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,16 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
result.avoid_flush_during_recovery = false;
}

// multi thread write do not support two-write-que or write in 2PC
if (result.two_write_queues || result.allow_2pc) {
result.enable_multi_batch_write = false;
}

if (result.enable_multi_batch_write) {
result.enable_pipelined_write = false;
result.allow_concurrent_memtable_write = true;
}

ImmutableDBOptions immutable_db_options(result);
if (!immutable_db_options.IsWalDirSameAsDBPath()) {
// Either the WAL dir and db_paths[0]/db_name are not the same, or we
Expand Down Expand Up @@ -1289,7 +1299,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
bool has_valid_writes = false;
status = WriteBatchInternal::InsertInto(
batch_to_use, column_family_memtables_.get(), &flush_scheduler_,
&trim_history_scheduler_, true, wal_number, this,
&trim_history_scheduler_, true, wal_number, 0, this,
false /* concurrent_memtable_writes */, next_sequence,
&has_valid_writes, seq_per_batch_, batch_per_txn_);
MaybeIgnoreError(&status);
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ Status DBImplSecondary::RecoverLogFiles(
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(),
nullptr /* flush_scheduler */, nullptr /* trim_history_scheduler*/,
true, log_number, this, false /* concurrent_memtable_writes */,
true, log_number, 0, this, false /* concurrent_memtable_writes */,
next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_);
}
// If column family was not found, it might mean that the WAL write
Expand Down
Loading

0 comments on commit 05caaee

Please sign in to comment.