Skip to content

Commit

Permalink
[fix](move-memtable) fix move memtable core when use multi table load (
Browse files Browse the repository at this point in the history
…apache#35458)

## Proposed changes

Move memtable core when use multi table load:
```
0x51f000c73860 is located 3040 bytes inside of 3456-byte region [0x51f000c72c80,0x51f000c73a00)
freed by thread T4867 (FragmentMgrThre) here:
    #0 0x558f6ad7f43d in operator delete(void*) (/mnt/hdd01/STRESS_ENV/be/lib/doris_be+0x22eec43d) (BuildId: b46f73d1f76dfcd6)
    #1 0x558f6e6cea2c in std::__new_allocator<doris::PTabletID>::deallocate(doris::PTabletID*, unsigned long) /mnt/disk2/xujianxu/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/new_allocator.h:168:2
    #2 0x558f6e6ce9e7 in std::allocator<doris::PTabletID>::deallocate(doris::PTabletID*, unsigned long) /mnt/disk2/xujianxu/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/allocator.h:210:25
    #3 0x558f6e6ce9e7 in std::allocator_traits<std::allocator<doris::PTabletID>>::deallocate(std::allocator<doris::PTabletID>&, doris::PTabletID*, unsigned long) /mnt/disk2/xujianxu/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/alloc_traits.h:516:13
    #4 0x558f6e6ce9e7 in std::_Vector_base<doris::PTabletID, std::allocator<doris::PTabletID>>::_M_deallocate(doris::PTabletID*, unsigned long) /mnt/disk2/xujianxu/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/stl_vector.h:387:4
    #5 0x558f6e6d0780 in void std::vector<doris::PTabletID, std::allocator<doris::PTabletID>>::_M_range_insert<__gnu_cxx::__normal_iterator<doris::PTabletID const*, std::vector<doris::PTabletID, std::allocator<doris::PTabletID>>>>(__gnu_cxx::__normal_iterator<doris::PTabletID*, std::vector<doris::PTabletID, std::allocator<doris::PTabletID>>>, __gnu_cxx::__normal_iterator<doris::PTabletID const*, std::vector<doris::PTabletID, std::allocator<doris::PTabletID>>>, __gnu_cxx::__normal_iterator<doris::PTabletID const*, std::vector<doris::PTabletID, std::allocator<doris::PTabletID>>>, std::forward_iterator_tag) /mnt/disk2/xujianxu/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/vector.tcc:832:3
    #6 0x558f6e6c54c5 in __gnu_cxx::__normal_iterator<doris::PTabletID*, std::vector<doris::PTabletID, std::allocator<doris::PTabletID>>> std::vector<doris::PTabletID, std::allocator<doris::PTabletID>>::insert<__gnu_cxx::__normal_iterator<doris::PTabletID const*, std::vector<doris::PTabletID, std::allocator<doris::PTabletID>>>, void>(__gnu_cxx::__normal_iterator<doris::PTabletID const*, std::vector<doris::PTabletID, std::allocator<doris::PTabletID>>>, __gnu_cxx::__normal_iterator<doris::PTabletID const*, std::vector<doris::PTabletID, std::allocator<doris::PTabletID>>>, __gnu_cxx::__normal_iterator<doris::PTabletID const*, std::vector<doris::PTabletID, std::allocator<doris::PTabletID>>>) /mnt/disk2/xujianxu/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/stl_vector.h:1483:4
    apache#7 0x558f9b4b214f in doris::LoadStreamMap::save_tablets_to_commit(long, std::vector<doris::PTabletID, std::allocator<doris::PTabletID>> const&) /mnt/disk2/xujianxu/doris/be/src/vec/sink/load_stream_map_pool.cpp:90:13
    apache#8 0x558f9b7258dd in doris::vectorized::VTabletWriterV2::_calc_tablets_to_commit() /mnt/disk2/xujianxu/doris/be/src/vec/sink/writer/vtablet_writer_v2.cpp:650:27
    apache#9 0x558f9b7229f1 in doris::vectorized::VTabletWriterV2::close(doris::Status) /mnt/disk2/xujianxu/doris/be/src/vec/sink/writer/vtablet_writer_v2.cpp:547:9
```

Multiple sinks with different table loads use the load id, causing
confusion in the use of shared data structures between sinks.
  • Loading branch information
sollhui authored and dataroaring committed Jun 7, 2024
1 parent fbb32e7 commit eb68dd6
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 151 deletions.
20 changes: 0 additions & 20 deletions be/src/io/file_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,26 +214,6 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
*file_reader = stream_load_ctx->pipe;
}

if (file_reader->get() == nullptr) {
return Status::OK();
}

auto multi_table_pipe = std::dynamic_pointer_cast<io::MultiTablePipe>(*file_reader);
if (multi_table_pipe == nullptr || runtime_state == nullptr) {
return Status::OK();
}

TUniqueId pipe_id;
if (runtime_state->enable_pipeline_x_exec()) {
pipe_id = io::StreamLoadPipe::calculate_pipe_id(runtime_state->query_id(),
runtime_state->fragment_id());
} else {
pipe_id = runtime_state->fragment_instance_id();
}
*file_reader = multi_table_pipe->get_pipe(pipe_id);
LOG(INFO) << "create pipe reader for fragment instance: " << pipe_id
<< " pipe: " << (*file_reader).get();

return Status::OK();
}

Expand Down
219 changes: 97 additions & 122 deletions be/src/io/fs/multi_table_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ Status MultiTablePipe::append_json(const char* data, size_t size) {
}

KafkaConsumerPipePtr MultiTablePipe::get_pipe_by_table(const std::string& table) {
auto pipe = _planned_pipes.find(table);
DCHECK(pipe != _planned_pipes.end());
return pipe->second;
auto pair = _planned_tables.find(table);
DCHECK(pair != _planned_tables.end());
return std::static_pointer_cast<io::KafkaConsumerPipe>(pair->second->pipe);
}

static std::string_view get_first_part(const char* dat, char delimiter) {
Expand All @@ -78,15 +78,15 @@ static std::string_view get_first_part(const char* dat, char delimiter) {
}

Status MultiTablePipe::finish() {
for (auto& pair : _planned_pipes) {
RETURN_IF_ERROR(pair.second->finish());
for (auto& pair : _planned_tables) {
RETURN_IF_ERROR(pair.second->pipe->finish());
}
return Status::OK();
}

void MultiTablePipe::cancel(const std::string& reason) {
for (auto& pair : _planned_pipes) {
pair.second->cancel(reason);
for (auto& pair : _planned_tables) {
pair.second->pipe->cancel(reason);
}
}

Expand All @@ -101,19 +101,29 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
return Status::InternalError("empty data");
}
KafkaConsumerPipePtr pipe = nullptr;
auto iter = _planned_pipes.find(table);
if (iter != _planned_pipes.end()) {
pipe = iter->second;
auto iter = _planned_tables.find(table);
if (iter != _planned_tables.end()) {
pipe = std::static_pointer_cast<io::KafkaConsumerPipe>(iter->second->pipe);
RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size),
"append failed in planned kafka pipe");
} else {
iter = _unplanned_pipes.find(table);
if (iter == _unplanned_pipes.end()) {
iter = _unplanned_tables.find(table);
if (iter == _unplanned_tables.end()) {
std::shared_ptr<StreamLoadContext> ctx =
std::make_shared<StreamLoadContext>(doris::ExecEnv::GetInstance());
ctx->id = UniqueId::gen_uid();
pipe = std::make_shared<io::KafkaConsumerPipe>();
LOG(INFO) << "create new unplanned pipe: " << pipe.get() << ", ctx: " << _ctx->brief();
_unplanned_pipes.emplace(table, pipe);
ctx->pipe = pipe;
#ifndef BE_TEST
RETURN_NOT_OK_STATUS_WITH_WARN(
doris::ExecEnv::GetInstance()->new_load_stream_mgr()->put(ctx->id, ctx),
"put stream load ctx error");
#endif
_unplanned_tables.emplace(table, ctx);
LOG(INFO) << "create new unplanned table ctx, table: " << table
<< "load id: " << ctx->id << ", txn id: " << _ctx->txn_id;
} else {
pipe = iter->second;
pipe = std::static_pointer_cast<io::KafkaConsumerPipe>(iter->second->pipe);
}

// It is necessary to determine whether the sum of pipe_current_capacity and size is greater than pipe_max_capacity,
Expand All @@ -124,7 +134,7 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
auto pipe_current_capacity = pipe->current_capacity();
auto pipe_max_capacity = pipe->max_capacity();
if (_unplanned_row_cnt >= _row_threshold ||
_unplanned_pipes.size() >= _wait_tables_threshold ||
_unplanned_tables.size() >= _wait_tables_threshold ||
pipe_current_capacity + size > pipe_max_capacity) {
LOG(INFO) << fmt::format(
"unplanned row cnt={} reach row_threshold={} or "
Expand All @@ -151,112 +161,106 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size

#ifndef BE_TEST
Status MultiTablePipe::request_and_exec_plans() {
if (_unplanned_pipes.empty()) {
if (_unplanned_tables.empty()) {
return Status::OK();
}

// get list of table names in unplanned pipes
std::vector<std::string> tables;
fmt::memory_buffer log_buffer;
log_buffer.clear();
fmt::format_to(log_buffer, "request plans for {} tables: [ ", _unplanned_pipes.size());
for (auto& pair : _unplanned_pipes) {
tables.push_back(pair.first);
fmt::format_to(log_buffer, "request plans for {} tables: [ ", _unplanned_tables.size());
for (auto& pair : _unplanned_tables) {
fmt::format_to(log_buffer, "{} ", pair.first);
}
fmt::format_to(log_buffer, "]");
LOG(INFO) << fmt::to_string(log_buffer);

TStreamLoadPutRequest request;
set_request_auth(&request, _ctx->auth);
request.db = _ctx->db;
request.table_names = tables;
request.__isset.table_names = true;
request.txnId = _ctx->txn_id;
request.formatType = _ctx->format;
request.__set_compress_type(_ctx->compress_type);
request.__set_header_type(_ctx->header_type);
request.__set_loadId(_ctx->id.to_thrift());
request.fileType = TFileType::FILE_STREAM;
request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms);
request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node);
request.__set_user(_ctx->qualified_user);
request.__set_cloud_cluster(_ctx->cloud_cluster);
// no need to register new_load_stream_mgr coz it is already done in routineload submit task

// plan this load
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
TNetworkAddress master_addr = exec_env->master_info()->network_address;
int64_t stream_load_put_start_time = MonotonicNanos();
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, this](FrontendServiceConnection& client) {
client->streamLoadMultiTablePut(_ctx->multi_table_put_result, request);
}));
_ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time;

Status plan_status(Status::create(_ctx->multi_table_put_result.status));
if (!plan_status.ok()) {
LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << _ctx->brief();
return plan_status;
}

Status st;
if (_ctx->multi_table_put_result.__isset.params &&
!_ctx->multi_table_put_result.__isset.pipeline_params) {
st = exec_plans(exec_env, _ctx->multi_table_put_result.params);
} else if (!_ctx->multi_table_put_result.__isset.params &&
_ctx->multi_table_put_result.__isset.pipeline_params) {
st = exec_plans(exec_env, _ctx->multi_table_put_result.pipeline_params);
} else {
return Status::Aborted("too many or too few params are set in multi_table_put_result.");
}
for (auto& pair : _unplanned_tables) {
TStreamLoadPutRequest request;
set_request_auth(&request, _ctx->auth);
std::vector<std::string> tables;
tables.push_back(pair.first);
request.db = _ctx->db;
request.table_names = tables;
request.__isset.table_names = true;
request.txnId = _ctx->txn_id;
request.formatType = _ctx->format;
request.__set_compress_type(_ctx->compress_type);
request.__set_header_type(_ctx->header_type);
request.__set_loadId((pair.second->id).to_thrift());
request.fileType = TFileType::FILE_STREAM;
request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms);
request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node);
request.__set_user(_ctx->qualified_user);
request.__set_cloud_cluster(_ctx->cloud_cluster);
// no need to register new_load_stream_mgr coz it is already done in routineload submit task

// plan this load
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
TNetworkAddress master_addr = exec_env->master_info()->network_address;
int64_t stream_load_put_start_time = MonotonicNanos();
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, this](FrontendServiceConnection& client) {
client->streamLoadMultiTablePut(_ctx->multi_table_put_result, request);
}));
_ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time;

Status plan_status(Status::create(_ctx->multi_table_put_result.status));
if (!plan_status.ok()) {
LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << _ctx->brief();
return plan_status;
}

if (_ctx->multi_table_put_result.__isset.params &&
!_ctx->multi_table_put_result.__isset.pipeline_params) {
st = exec_plans(exec_env, _ctx->multi_table_put_result.params);
} else if (!_ctx->multi_table_put_result.__isset.params &&
_ctx->multi_table_put_result.__isset.pipeline_params) {
st = exec_plans(exec_env, _ctx->multi_table_put_result.pipeline_params);
} else {
return Status::Aborted("too many or too few params are set in multi_table_put_result.");
}
if (!st.ok()) {
return st;
}
}
_unplanned_tables.clear();
return st;
}

template <typename ExecParam>
Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> params) {
// put unplanned pipes into planned pipes and clear unplanned pipes
for (auto& pipe : _unplanned_pipes) {
_ctx->table_list.push_back(pipe.first);
_planned_pipes.emplace(pipe.first, pipe.second);
for (auto& pair : _unplanned_tables) {
_ctx->table_list.push_back(pair.first);
_planned_tables.emplace(pair.first, pair.second);
}
LOG(INFO) << fmt::format("{} tables plan complete, planned table cnt={}, returned plan cnt={}",
_unplanned_pipes.size(), _planned_pipes.size(), params.size())
_unplanned_tables.size(), _planned_tables.size(), params.size())
<< ", ctx: " << _ctx->brief();
_unplanned_pipes.clear();

for (auto& plan : params) {
DBUG_EXECUTE_IF("MultiTablePipe.exec_plans.failed",
{ return Status::Aborted("MultiTablePipe.exec_plans.failed"); });
if (!plan.__isset.table_name ||
_planned_pipes.find(plan.table_name) == _planned_pipes.end()) {
_unplanned_tables.find(plan.table_name) == _unplanned_tables.end()) {
return Status::Aborted("Missing vital param: table_name");
}

if constexpr (std::is_same_v<ExecParam, TExecPlanFragmentParams>) {
RETURN_IF_ERROR(
put_pipe(plan.params.fragment_instance_id, _planned_pipes[plan.table_name]));
LOG(INFO) << "fragment_instance_id=" << print_id(plan.params.fragment_instance_id)
<< " table=" << plan.table_name << ", ctx: " << _ctx->brief();
} else if constexpr (std::is_same_v<ExecParam, TPipelineFragmentParams>) {
auto pipe_id = calculate_pipe_id(plan.query_id, plan.fragment_id);
RETURN_IF_ERROR(put_pipe(pipe_id, _planned_pipes[plan.table_name]));
LOG(INFO) << "pipe_id=" << pipe_id << ", table=" << plan.table_name
<< ", ctx: " << _ctx->brief();
} else {
LOG(WARNING) << "illegal exec param type, need `TExecPlanFragmentParams` or "
"`TPipelineFragmentParams`, will crash"
<< ", ctx: " << _ctx->brief();
CHECK(false);
}

_inflight_cnt++;

RETURN_IF_ERROR(exec_env->fragment_mgr()->exec_plan_fragment(
plan, [this](RuntimeState* state, Status* status) {
plan, [this, plan](RuntimeState* state, Status* status) {
DCHECK(state);
auto pair = _planned_tables.find(plan.table_name);
if (pair == _planned_tables.end()) {
LOG(WARNING) << "failed to get ctx, table: " << plan.table_name;
} else {
doris::ExecEnv::GetInstance()->new_load_stream_mgr()->remove(
pair->second->id);
}

{
std::lock_guard<std::mutex> l(_tablet_commit_infos_lock);
_tablet_commit_infos.insert(_tablet_commit_infos.end(),
Expand Down Expand Up @@ -300,12 +304,12 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
#else
Status MultiTablePipe::request_and_exec_plans() {
// put unplanned pipes into planned pipes
for (auto& pipe : _unplanned_pipes) {
_planned_pipes.emplace(pipe.first, pipe.second);
for (auto& pipe : _unplanned_tables) {
_planned_tables.emplace(pipe.first, pipe.second);
}
LOG(INFO) << fmt::format("{} tables plan complete, planned table cnt={}",
_unplanned_pipes.size(), _planned_pipes.size());
_unplanned_pipes.clear();
_unplanned_tables.size(), _planned_tables.size());
_unplanned_tables.clear();
return Status::OK();
}

Expand All @@ -330,35 +334,6 @@ void MultiTablePipe::_handle_consumer_finished() {
_ctx->promise.set_value(_status); // when all done, finish the routine load task
}

Status MultiTablePipe::put_pipe(const TUniqueId& pipe_id,
std::shared_ptr<io::StreamLoadPipe> pipe) {
std::lock_guard<std::mutex> l(_pipe_map_lock);
auto it = _pipe_map.find(pipe_id);
if (it != std::end(_pipe_map)) {
return Status::InternalError("id already exist");
}
_pipe_map.emplace(pipe_id, pipe);
return Status::OK();
}

std::shared_ptr<io::StreamLoadPipe> MultiTablePipe::get_pipe(const TUniqueId& pipe_id) {
std::lock_guard<std::mutex> l(_pipe_map_lock);
auto it = _pipe_map.find(pipe_id);
if (it == std::end(_pipe_map)) {
return {};
}
return it->second;
}

void MultiTablePipe::remove_pipe(const TUniqueId& pipe_id) {
std::lock_guard<std::mutex> l(_pipe_map_lock);
auto it = _pipe_map.find(pipe_id);
if (it != std::end(_pipe_map)) {
_pipe_map.erase(it);
VLOG_NOTICE << "remove stream load pipe: " << pipe_id;
}
}

template Status MultiTablePipe::exec_plans(ExecEnv* exec_env,
std::vector<TExecPlanFragmentParams> params);
template Status MultiTablePipe::exec_plans(ExecEnv* exec_env,
Expand Down
11 changes: 2 additions & 9 deletions be/src/io/fs/multi_table_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,6 @@ class MultiTablePipe : public KafkaConsumerPipe {

void cancel(const std::string& reason) override;

// register <instance id, pipe> pair
Status put_pipe(const TUniqueId& pipe_id, std::shared_ptr<io::StreamLoadPipe> pipe);

std::shared_ptr<io::StreamLoadPipe> get_pipe(const TUniqueId& pipe_id);

void remove_pipe(const TUniqueId& pipe_id);

private:
// parse table name from data
std::string parse_dst_table(const char* data, size_t size);
Expand All @@ -82,8 +75,8 @@ class MultiTablePipe : public KafkaConsumerPipe {
void _handle_consumer_finished();

private:
std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> _planned_pipes;
std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> _unplanned_pipes;
std::unordered_map<std::string /*table*/, std::shared_ptr<StreamLoadContext>> _planned_tables;
std::unordered_map<std::string /*table*/, std::shared_ptr<StreamLoadContext>> _unplanned_tables;
std::atomic<uint64_t> _unplanned_row_cnt {0}; // trigger plan request when exceed threshold
// inflight count, when it is zero, means consume and all plans is finished
std::atomic<uint64_t> _inflight_cnt {1};
Expand Down

0 comments on commit eb68dd6

Please sign in to comment.