Skip to content

Commit

Permalink
[enhancement](memtable) use shared ptr for flush token since it is sh…
Browse files Browse the repository at this point in the history
…ared between memtable write thread and flush thread (apache#38023)

## Proposed changes

Issue Number: close #xxx

<!--Describe your changes.-->

---------

Co-authored-by: yiguolei <[email protected]>
  • Loading branch information
yiguolei and Doris-Extras authored Jul 18, 2024
1 parent 1e09c39 commit eaabc2f
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 30 deletions.
30 changes: 19 additions & 11 deletions be/src/olap/memtable_flush_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_thread_num, MetricUnit::NOU
bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num");

class MemtableFlushTask final : public Runnable {
ENABLE_FACTORY_CREATOR(MemtableFlushTask);

public:
MemtableFlushTask(FlushToken* flush_token, std::unique_ptr<MemTable> memtable,
MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, std::unique_ptr<MemTable> memtable,
int32_t segment_id, int64_t submit_task_time)
: _flush_token(flush_token),
_memtable(std::move(memtable)),
Expand All @@ -56,11 +58,16 @@ class MemtableFlushTask final : public Runnable {
~MemtableFlushTask() override { g_flush_task_num << -1; }

void run() override {
_flush_token->_flush_memtable(std::move(_memtable), _segment_id, _submit_task_time);
auto token = _flush_token.lock();
if (token) {
token->_flush_memtable(std::move(_memtable), _segment_id, _submit_task_time);
} else {
LOG(WARNING) << "flush token is deconstructed, ignore the flush task";
}
}

private:
FlushToken* _flush_token;
std::weak_ptr<FlushToken> _flush_token;
std::unique_ptr<MemTable> _memtable;
int32_t _segment_id;
int64_t _submit_task_time;
Expand Down Expand Up @@ -91,8 +98,9 @@ Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) {
return Status::OK();
}
int64_t submit_task_time = MonotonicNanos();
auto task = std::make_shared<MemtableFlushTask>(
this, std::move(mem_table), _rowset_writer->allocate_segment_id(), submit_task_time);
auto task = MemtableFlushTask::create_shared(shared_from_this(), std::move(mem_table),
_rowset_writer->allocate_segment_id(),
submit_task_time);
Status ret = _thread_pool->submit(std::move(task));
if (ret.ok()) {
// _wait_running_task_finish was executed after this function, so no need to notify _cond here
Expand Down Expand Up @@ -224,8 +232,8 @@ void MemTableFlushExecutor::init(int num_disk) {
}

// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order.
Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& flush_token,
RowsetWriter* rowset_writer,
Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& flush_token,
std::shared_ptr<RowsetWriter> rowset_writer,
bool is_high_priority) {
switch (rowset_writer->type()) {
case ALPHA_ROWSET:
Expand All @@ -234,7 +242,7 @@ Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& fl
case BETA_ROWSET: {
// beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer.
ThreadPool* pool = is_high_priority ? _high_prio_flush_pool.get() : _flush_pool.get();
flush_token = std::make_unique<FlushToken>(pool);
flush_token = FlushToken::create_shared(pool);
flush_token->set_rowset_writer(rowset_writer);
return Status::OK();
}
Expand All @@ -243,11 +251,11 @@ Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& fl
}
}

Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& flush_token,
RowsetWriter* rowset_writer,
Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& flush_token,
std::shared_ptr<RowsetWriter> rowset_writer,
ThreadPool* wg_flush_pool_ptr) {
if (rowset_writer->type() == BETA_ROWSET) {
flush_token = std::make_unique<FlushToken>(wg_flush_pool_ptr);
flush_token = FlushToken::create_shared(wg_flush_pool_ptr);
} else {
return Status::InternalError<false>("not support alpha rowset load now.");
}
Expand Down
20 changes: 12 additions & 8 deletions be/src/olap/memtable_flush_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat);
// 1. Immediately disallow submission of any subsequent memtable
// 2. For the memtables that have already been submitted, there is no need to flush,
// because the entire job will definitely fail;
class FlushToken {
class FlushToken : public std::enable_shared_from_this<FlushToken> {
ENABLE_FACTORY_CREATOR(FlushToken);

public:
explicit FlushToken(ThreadPool* thread_pool)
: _flush_status(Status::OK()), _thread_pool(thread_pool) {}
FlushToken(ThreadPool* thread_pool) : _flush_status(Status::OK()), _thread_pool(thread_pool) {}

Status submit(std::unique_ptr<MemTable> mem_table);

Expand All @@ -72,7 +73,9 @@ class FlushToken {
// get flush operations' statistics
const FlushStatistic& get_stats() const { return _stats; }

void set_rowset_writer(RowsetWriter* rowset_writer) { _rowset_writer = rowset_writer; }
void set_rowset_writer(std::shared_ptr<RowsetWriter> rowset_writer) {
_rowset_writer = rowset_writer;
}

const MemTableStat& memtable_stat() { return _memtable_stat; }

Expand All @@ -96,7 +99,7 @@ class FlushToken {

FlushStatistic _stats;

RowsetWriter* _rowset_writer = nullptr;
std::shared_ptr<RowsetWriter> _rowset_writer = nullptr;

MemTableStat _memtable_stat;

Expand Down Expand Up @@ -129,10 +132,11 @@ class MemTableFlushExecutor {
// because it needs path hash of each data dir.
void init(int num_disk);

Status create_flush_token(std::unique_ptr<FlushToken>& flush_token, RowsetWriter* rowset_writer,
bool is_high_priority);
Status create_flush_token(std::shared_ptr<FlushToken>& flush_token,
std::shared_ptr<RowsetWriter> rowset_writer, bool is_high_priority);

Status create_flush_token(std::unique_ptr<FlushToken>& flush_token, RowsetWriter* rowset_writer,
Status create_flush_token(std::shared_ptr<FlushToken>& flush_token,
std::shared_ptr<RowsetWriter> rowset_writer,
ThreadPool* wg_flush_pool_ptr);

private:
Expand Down
20 changes: 10 additions & 10 deletions be/src/olap/memtable_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,17 @@ Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
// by assigning segment_id to memtable before submiting to flush executor,
// we can make sure same keys sort in the same order in all replicas.
if (wg_flush_pool_ptr) {
RETURN_IF_ERROR(ExecEnv::GetInstance()
->storage_engine()
.memtable_flush_executor()
->create_flush_token(_flush_token, _rowset_writer.get(),
wg_flush_pool_ptr));
RETURN_IF_ERROR(
ExecEnv::GetInstance()
->storage_engine()
.memtable_flush_executor()
->create_flush_token(_flush_token, _rowset_writer, wg_flush_pool_ptr));
} else {
RETURN_IF_ERROR(ExecEnv::GetInstance()
->storage_engine()
.memtable_flush_executor()
->create_flush_token(_flush_token, _rowset_writer.get(),
_req.is_high_priority));
RETURN_IF_ERROR(
ExecEnv::GetInstance()
->storage_engine()
.memtable_flush_executor()
->create_flush_token(_flush_token, _rowset_writer, _req.is_high_priority));
}

_is_init = true;
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/memtable_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ class MemTableWriter {
TabletSchemaSPtr _tablet_schema;
bool _unique_key_mow = false;

std::unique_ptr<FlushToken> _flush_token;
// This variable is accessed from writer thread and token flush thread
// use a shared ptr to avoid use after free problem.
std::shared_ptr<FlushToken> _flush_token;
std::vector<std::shared_ptr<MemTracker>> _mem_table_insert_trackers;
std::vector<std::shared_ptr<MemTracker>> _mem_table_flush_trackers;
SpinLock _mem_table_tracker_lock;
Expand Down

0 comments on commit eaabc2f

Please sign in to comment.