[fix](memory) Refactor memory allocated failure processing #36090

Merged · 3 commits · Jun 12, 2024
17 changes: 17 additions & 0 deletions be/src/common/exception.h
@@ -122,3 +122,20 @@ inline const std::string& Exception::to_string() const {
return Status::Error<false>(e.code(), e.to_string()); \
} \
} while (0)

+#define ASSIGN_STATUS_IF_CATCH_EXCEPTION(stmt, status_)                                     \
+    do {                                                                                    \
+        try {                                                                               \
+            doris::enable_thread_catch_bad_alloc++;                                         \
+            Defer defer {[&]() { doris::enable_thread_catch_bad_alloc--; }};                \
+            { stmt; }                                                                       \
+        } catch (const doris::Exception& e) {                                               \
+            if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {                           \
+                status_ = Status::MemoryLimitExceeded(fmt::format(                          \
+                        "PreCatch error code:{}, {}, __FILE__:{}, __LINE__:{}, __FUNCTION__:{}", \
+                        e.code(), e.to_string(), __FILE__, __LINE__, __PRETTY_FUNCTION__)); \
+            } else {                                                                        \
+                status_ = e.to_status();                                                    \
+            }                                                                               \
+        }                                                                                   \
+    } while (0);
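For context, here is a minimal, self-contained sketch of how this new macro is meant to be used. The stand-ins for `doris::enable_thread_catch_bad_alloc`, `Defer`, `Status`, and the exception type are simplified assumptions for illustration, not the real Doris definitions:

```cpp
#include <exception>
#include <functional>
#include <iostream>
#include <string>
#include <utility>

// Simplified stand-ins (illustrative only) for the thread-local allocator flag,
// util/defer_op.h's Defer, and common/status.h's Status.
thread_local int enable_thread_catch_bad_alloc = 0;

struct Defer {
    std::function<void()> fn;
    ~Defer() { fn(); }  // runs on scope exit, including during exception unwinding
};

struct Status {
    bool is_ok = true;
    std::string msg;
    static Status OK() { return {}; }
    static Status MemoryLimitExceeded(std::string m) { return {false, std::move(m)}; }
    bool ok() const { return is_ok; }
};

// Same shape as the macro added in exception.h: raise the thread-local flag for
// the duration of stmt, then turn any escaping exception into an assignment to
// status_ rather than an early return (so it also works inside void functions).
#define ASSIGN_STATUS_IF_CATCH_EXCEPTION(stmt, status_)               \
    do {                                                              \
        try {                                                         \
            enable_thread_catch_bad_alloc++;                          \
            Defer defer {[&]() { enable_thread_catch_bad_alloc--; }}; \
            { stmt; }                                                 \
        } catch (const std::exception& e) {                           \
            status_ = Status::MemoryLimitExceeded(e.what());          \
        }                                                             \
    } while (0)

// Stand-in for a call like PipelineFragmentContext::prepare(), which may throw
// when an allocation fails while the flag above is raised.
Status prepare() { throw std::bad_alloc(); }

int main() {
    // Mirrors the fragment_mgr.cpp call site in this PR: initialize the status,
    // run the throwing call, and inspect the result instead of letting the
    // exception propagate into the thread pool.
    Status prepare_st = Status::OK();
    ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = prepare(), prepare_st);
    if (!prepare_st.ok()) {
        std::cout << "prepare failed: " << prepare_st.msg << std::endl;
    }
}
```

The key difference from the `RETURN_IF_CATCH_EXCEPTION` family removed throughout this PR is visible at the fragment_mgr.cpp and task_scheduler.cpp call sites below: the result is assigned, not returned, so failures are captured once at the top level, including in code paths that cannot return a Status.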
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -151,7 +151,7 @@ Status AggSinkLocalState::open(RuntimeState* state) {
// this could cause unable to get JVM
if (Base::_shared_state->probe_expr_ctxs.empty()) {
// _create_agg_status may acquire a lot of memory, may allocate failed when memory is very few
-        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_agg_status(_agg_data->without_key));
+        RETURN_IF_ERROR(_create_agg_status(_agg_data->without_key));
_shared_state->agg_data_created_without_key = true;
}
return Status::OK();
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -203,8 +203,7 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
_distinct_row.reserve(rows);

if (!_stop_emplace_flag) {
-        RETURN_IF_CATCH_EXCEPTION(
-                _emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows));
+        _emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows);
}

bool mem_reuse = _parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() &&
58 changes: 28 additions & 30 deletions be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -294,37 +294,35 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
Status st;
if (local_state._probe_index < local_state._probe_block.rows()) {
DCHECK(local_state._has_set_need_null_map_for_probe);
-    RETURN_IF_CATCH_EXCEPTION({
-        std::visit(
-                [&](auto&& arg, auto&& process_hashtable_ctx, auto need_null_map_for_probe,
-                    auto ignore_null) {
-                    using HashTableProbeType = std::decay_t<decltype(process_hashtable_ctx)>;
-                    if constexpr (!std::is_same_v<HashTableProbeType, std::monostate>) {
-                        using HashTableCtxType = std::decay_t<decltype(arg)>;
-                        if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
-                            st = process_hashtable_ctx.template process<need_null_map_for_probe,
-                                                                        ignore_null>(
-                                    arg,
-                                    need_null_map_for_probe
-                                            ? &local_state._null_map_column->get_data()
-                                            : nullptr,
-                                    mutable_join_block, &temp_block,
-                                    local_state._probe_block.rows(), _is_mark_join,
-                                    _have_other_join_conjunct);
-                            local_state._mem_tracker->set_consumption(
-                                    arg.serialized_keys_size(false));
-                        } else {
-                            st = Status::InternalError("uninited hash table");
-                        }
-                    } else {
-                        st = Status::InternalError("uninited hash table probe");
-                    }
-                },
-                *local_state._shared_state->hash_table_variants,
-                *local_state._process_hashtable_ctx_variants,
-                vectorized::make_bool_variant(local_state._need_null_map_for_probe),
-                vectorized::make_bool_variant(local_state._shared_state->probe_ignore_null));
-    });
+    std::visit(
+            [&](auto&& arg, auto&& process_hashtable_ctx, auto need_null_map_for_probe,
+                auto ignore_null) {
+                using HashTableProbeType = std::decay_t<decltype(process_hashtable_ctx)>;
+                if constexpr (!std::is_same_v<HashTableProbeType, std::monostate>) {
+                    using HashTableCtxType = std::decay_t<decltype(arg)>;
+                    if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
+                        st = process_hashtable_ctx
+                                     .template process<need_null_map_for_probe, ignore_null>(
+                                             arg,
+                                             need_null_map_for_probe
+                                                     ? &local_state._null_map_column->get_data()
+                                                     : nullptr,
+                                             mutable_join_block, &temp_block,
+                                             local_state._probe_block.rows(), _is_mark_join,
+                                             _have_other_join_conjunct);
+                        local_state._mem_tracker->set_consumption(
+                                arg.serialized_keys_size(false));
+                    } else {
+                        st = Status::InternalError("uninited hash table");
+                    }
+                } else {
+                    st = Status::InternalError("uninited hash table probe");
+                }
+            },
+            *local_state._shared_state->hash_table_variants,
+            *local_state._process_hashtable_ctx_variants,
+            vectorized::make_bool_variant(local_state._need_null_map_for_probe),
+            vectorized::make_bool_variant(local_state._shared_state->probe_ignore_null));
} else if (local_state._probe_eos) {
if (_is_right_semi_anti || (_is_outer_join && _join_op != TJoinOp::LEFT_OUTER_JOIN)) {
std::visit(
@@ -457,7 +455,7 @@ Status HashJoinProbeLocalState::filter_data_and_build_output(RuntimeState* state
temp_block->columns()));
}

-    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_output_block(temp_block, output_block, false));
+    RETURN_IF_ERROR(_build_output_block(temp_block, output_block, false));
_reset_tuple_is_null_column();
reached_limit(output_block, eos);
return Status::OK();
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -283,10 +283,10 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
return true;
};

-        RETURN_IF_CATCH_EXCEPTION(probe_side_output_column(
+        probe_side_output_column(
mcol, *_left_output_slot_flags, current_offset, last_probe_index,
check_all_match_one(_probe_indexs, last_probe_index, current_offset),
-                with_other_conjuncts));
+                with_other_conjuncts);
}

output_block->swap(mutable_block.to_block());
15 changes: 6 additions & 9 deletions be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -165,7 +165,7 @@ Status NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* sta
}

if constexpr (set_probe_side_flag) {
-            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
+            RETURN_IF_ERROR(
(_do_filtering_and_update_visited_flags<set_build_side_flag,
set_probe_side_flag, ignore_null>(
&_join_block, !p._is_left_semi_anti)));
@@ -185,10 +185,9 @@
}

if constexpr (!set_probe_side_flag) {
-            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
-                    (_do_filtering_and_update_visited_flags<set_build_side_flag, set_probe_side_flag,
-                                                            ignore_null>(&_join_block,
-                                                                         !p._is_right_semi_anti)));
+            RETURN_IF_ERROR((_do_filtering_and_update_visited_flags<set_build_side_flag,
+                                                                    set_probe_side_flag, ignore_null>(
+                    &_join_block, !p._is_right_semi_anti)));
_update_additional_flags(&_join_block);
}

@@ -499,8 +498,7 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block
bool* eos) const {
auto& local_state = get_local_state(state);
if (_is_output_left_side_only) {
-        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
-                local_state._build_output_block(local_state._child_block.get(), block));
+        RETURN_IF_ERROR(local_state._build_output_block(local_state._child_block.get(), block));
*eos = local_state._shared_state->left_side_eos;
local_state._need_more_input_data = !local_state._shared_state->left_side_eos;
} else {
@@ -522,8 +520,7 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
local_state._conjuncts, &tmp_block, tmp_block.columns()));
}
-            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
-                    local_state._build_output_block(&tmp_block, block, false));
+            RETURN_IF_ERROR(local_state._build_output_block(&tmp_block, block, false));
local_state._reset_tuple_is_null_column();
}
local_state._join_block.clear_column_data();
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/sort_source_operator.cpp
@@ -61,8 +61,7 @@ Status SortSourceOperatorX::open(RuntimeState* state) {
Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
-    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
-            local_state._shared_state->sorter->get_next(state, block, eos));
+    RETURN_IF_ERROR(local_state._shared_state->sorter->get_next(state, block, eos));
local_state.reached_limit(block, eos);
return Status::OK();
}
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -716,7 +716,7 @@ Status StreamingAggLocalState::_pre_agg_with_serialized_key(doris::vectorized::B
_agg_data->method_variant));

if (!ret_flag) {
-        RETURN_IF_CATCH_EXCEPTION(_emplace_into_hash_table(_places.data(), key_columns, rows));
+        _emplace_into_hash_table(_places.data(), key_columns, rows);

for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add(
13 changes: 6 additions & 7 deletions be/src/pipeline/pipeline_fragment_context.cpp
@@ -306,18 +306,17 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
SCOPED_TIMER(_build_pipelines_timer);
// 2. Build pipelines with operators in this fragment.
auto root_pipeline = add_pipeline();
-    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines(_runtime_state->obj_pool(), request,
-                                                        *_query_ctx->desc_tbl, &_root_op,
-                                                        root_pipeline));
+    RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), request, *_query_ctx->desc_tbl,
+                                     &_root_op, root_pipeline));

// 3. Create sink operator
if (!request.fragment.__isset.output_sink) {
return Status::InternalError("No output sink in this fragment!");
}
-    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
-            _runtime_state->obj_pool(), request.fragment.output_sink,
-            request.fragment.output_exprs, request, root_pipeline->output_row_desc(),
-            _runtime_state.get(), *_desc_tbl, root_pipeline->id()));
+    RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), request.fragment.output_sink,
+                                      request.fragment.output_exprs, request,
+                                      root_pipeline->output_row_desc(), _runtime_state.get(),
+                                      *_desc_tbl, root_pipeline->id()));
RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
RETURN_IF_ERROR(root_pipeline->set_sink(_sink));

4 changes: 2 additions & 2 deletions be/src/pipeline/pipeline_task.cpp
@@ -343,7 +343,7 @@ Status PipelineTask::execute(bool* eos) {
} else {
SCOPED_TIMER(_get_block_timer);
_get_block_counter->update(1);
-        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_root->get_block_after_projects(_state, block, eos));
+        RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos));
}

if (_block->rows() != 0 || *eos) {
@@ -353,7 +353,7 @@
// return error status with EOF, it is special, could not return directly.
auto sink_function = [&]() -> Status {
Status internal_st;
-            RETURN_IF_CATCH_EXCEPTION(internal_st = _sink->sink(_state, block, *eos));
+            internal_st = _sink->sink(_state, block, *eos);
return internal_st;
};
status = sink_function();
41 changes: 19 additions & 22 deletions be/src/pipeline/task_scheduler.cpp
@@ -127,31 +127,28 @@ void TaskScheduler::_do_work(size_t index) {
bool eos = false;
auto status = Status::OK();

-    try {
-        //TODO: use a better enclose to abstracting these
-        if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
-            TUniqueId query_id = task->query_context()->query_id();
-            std::string task_name = task->task_name();
+    ASSIGN_STATUS_IF_CATCH_EXCEPTION(
+            //TODO: use a better enclose to abstracting these
+            if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
+                TUniqueId query_id = task->query_context()->query_id();
+                std::string task_name = task->task_name();
#ifdef __APPLE__
-            uint32_t core_id = 0;
+                uint32_t core_id = 0;
#else
-            uint32_t core_id = sched_getcpu();
+                uint32_t core_id = sched_getcpu();
#endif
-            std::thread::id tid = std::this_thread::get_id();
-            uint64_t thread_id = *reinterpret_cast<uint64_t*>(&tid);
-            uint64_t start_time = MonotonicMicros();
-
-            status = task->execute(&eos);
-
-            uint64_t end_time = MonotonicMicros();
-            ExecEnv::GetInstance()->pipeline_tracer_context()->record(
-                    {query_id, task_name, core_id, thread_id, start_time, end_time});
-        } else {
-            status = task->execute(&eos);
-        }
-    } catch (const Exception& e) {
-        status = e.to_status();
-    }
+
+                std::thread::id tid = std::this_thread::get_id();
+                uint64_t thread_id = *reinterpret_cast<uint64_t*>(&tid);
+                uint64_t start_time = MonotonicMicros();
+
+                status = task->execute(&eos);
+
+                uint64_t end_time = MonotonicMicros();
+                ExecEnv::GetInstance()->pipeline_tracer_context()->record(
+                        {query_id, task_name, core_id, thread_id, start_time, end_time});
+            } else { status = task->execute(&eos); },
+            status);

task->set_previous_core_id(index);

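Note the behavioral change here beyond restructuring: the old hand-written block only converted a thrown Exception into a Status, while ASSIGN_STATUS_IF_CATCH_EXCEPTION additionally raises doris::enable_thread_catch_bad_alloc around the wrapped statements, so allocation failures during task->execute() can surface as a catchable doris::Exception and land in `status`. Mechanically expanding the new macro (defined in exception.h above) at this call site gives roughly the following; the tracing body is elided and the formatting is mine:

```cpp
try {
    doris::enable_thread_catch_bad_alloc++;
    Defer defer {[&]() { doris::enable_thread_catch_bad_alloc--; }};
    {
        // tracing setup ... status = task->execute(&eos); ... tracing record
    }
} catch (const doris::Exception& e) {
    if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {
        status = Status::MemoryLimitExceeded(/* "PreCatch error code:..." message with file/line */);
    } else {
        status = e.to_status();
    }
}
```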
3 changes: 2 additions & 1 deletion be/src/runtime/fragment_mgr.cpp
@@ -718,7 +718,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
this, std::placeholders::_1, std::placeholders::_2));
{
SCOPED_RAW_TIMER(&duration_ns);
-        auto prepare_st = context->prepare(params);
+        Status prepare_st = Status::OK();
+        ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = context->prepare(params), prepare_st);
if (!prepare_st.ok()) {
query_ctx->cancel(prepare_st, params.fragment_id);
query_ctx->set_execution_dependency_ready();
25 changes: 18 additions & 7 deletions be/src/service/internal_service.cpp
@@ -1463,7 +1463,22 @@ void PInternalService::fold_constant_expr(google::protobuf::RpcController* contr
google::protobuf::Closure* done) {
bool ret = _light_work_pool.try_offer([this, request, response, done]() {
brpc::ClosureGuard closure_guard(done);
-        Status st = _fold_constant_expr(request->request(), response);
+        TFoldConstantParams t_request;
+        Status st = Status::OK();
+        {
+            const uint8_t* buf = (const uint8_t*)request->request().data();
+            uint32_t len = request->request().size();
+            st = deserialize_thrift_msg(buf, &len, false, &t_request);
+        }
+        if (!st.ok()) {
+            LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st
+                         << " .and query_id_is: " << t_request.query_id;
+        }
+        st = _fold_constant_expr(request->request(), response);
+        if (!st.ok()) {
+            LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st
+                         << " .and query_id_is: " << t_request.query_id;
+        }
st.to_protobuf(response->mutable_status());
});
if (!ret) {
@@ -1481,12 +1496,8 @@ Status PInternalService::_fold_constant_expr(const std::string& ser_request,
RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, false, &t_request));
}
std::unique_ptr<FoldConstantExecutor> fold_executor = std::make_unique<FoldConstantExecutor>();
-    Status st = fold_executor->fold_constant_vexpr(t_request, response);
-    if (!st.ok()) {
-        LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st
-                     << " .and query_id_is: " << fold_executor->query_id_string();
-    }
-    return st;
+    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(fold_executor->fold_constant_vexpr(t_request, response));
+    return Status::OK();
}

void PInternalService::transmit_block(google::protobuf::RpcController* controller,