[improvement] Optimize send fragment logic to reduce send fragment timeout error #9720

Merged
8 commits merged on Jun 3, 2022
131 changes: 79 additions & 52 deletions be/src/runtime/fragment_mgr.cpp
@@ -140,6 +140,8 @@ class FragmentExecState {
void set_pipe(std::shared_ptr<StreamLoadPipe> pipe) { _pipe = pipe; }
std::shared_ptr<StreamLoadPipe> get_pipe() const { return _pipe; }

void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; }

private:
void coordinator_callback(const Status& status, RuntimeProfile* profile, bool done);

@@ -171,6 +173,9 @@ class FragmentExecState {
std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
// The pipe for data transferring, such as insert.
std::shared_ptr<StreamLoadPipe> _pipe;

// If set to true, this plan fragment will be executed only after FE sends the execution start RPC.
bool _need_wait_execution_trigger = false;
};

FragmentExecState::FragmentExecState(const TUniqueId& query_id,
@@ -225,6 +230,11 @@ Status FragmentExecState::prepare(const TExecPlanFragmentParams& params) {
}

Status FragmentExecState::execute() {
if (_need_wait_execution_trigger) {
// if _need_wait_execution_trigger is true, this instance
// is prepared but needs to wait for the start signal before doing the rest of the execution.
_fragments_ctx->wait_for_start();
Contributor
Add a timeout here to avoid occupying the running thread for too long during exceptions.

Contributor Author
No need, there is a "timeout checker" on the BE side that will check and notify this.
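For illustration only (not part of this change): a bounded wait along the lines the reviewer suggests could use std::condition_variable::wait_for. The class name, timeout parameter, and boolean return below are assumptions; the members mirror the new fields in QueryFragmentsCtx.

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical timed variant of QueryFragmentsCtx::wait_for_start().
class StartGate {
public:
    void set_ready_to_execute() {
        {
            std::lock_guard<std::mutex> l(_start_lock);
            _ready_to_execute = true;
        }
        _start_cond.notify_all();
    }

    // Returns true if the start signal arrived before the timeout expired;
    // on a false return the caller could cancel the fragment instead of blocking.
    bool wait_for_start(std::chrono::seconds timeout) {
        std::unique_lock<std::mutex> l(_start_lock);
        return _start_cond.wait_for(l, timeout,
                                    [this] { return _ready_to_execute.load(); });
    }

private:
    std::mutex _start_lock;
    std::condition_variable _start_cond;
    std::atomic<bool> _ready_to_execute{false};
};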
}
int64_t duration_ns = 0;
{
SCOPED_RAW_TIMER(&duration_ns);
@@ -527,6 +537,22 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
}
}

Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* request) {
std::lock_guard<std::mutex> lock(_lock);
TUniqueId query_id;
query_id.__set_hi(request->query_id().hi());
query_id.__set_lo(request->query_id().lo());
auto search = _fragments_ctx_map.find(query_id);
if (search == _fragments_ctx_map.end()) {
return Status::InternalError(
strings::Substitute("Failed to get query fragments context. Query may be "
"timeout or be cancelled. host: ",
BackendOptions::get_localhost()));
}
search->second->set_ready_to_execute();
return Status::OK();
}

void FragmentMgr::set_pipe(const TUniqueId& fragment_instance_id,
std::shared_ptr<StreamLoadPipe> pipe) {
{
@@ -562,65 +588,62 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
}

std::shared_ptr<FragmentExecState> exec_state;
if (!params.__isset.is_simplified_param) {
// This is an old version params, all @Common components is set in TExecPlanFragmentParams.
exec_state.reset(new FragmentExecState(params.params.query_id,
params.params.fragment_instance_id,
params.backend_num, _exec_env, params.coord));
std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
if (params.is_simplified_param) {
// Get common components from _fragments_ctx_map
std::lock_guard<std::mutex> lock(_lock);
auto search = _fragments_ctx_map.find(params.params.query_id);
if (search == _fragments_ctx_map.end()) {
return Status::InternalError(
strings::Substitute("Failed to get query fragments context. Query may be "
"timeout or be cancelled. host: ",
BackendOptions::get_localhost()));
}
fragments_ctx = search->second;
} else {
std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
if (params.is_simplified_param) {
// Get common components from _fragments_ctx_map
std::lock_guard<std::mutex> lock(_lock);
auto search = _fragments_ctx_map.find(params.params.query_id);
if (search == _fragments_ctx_map.end()) {
return Status::InternalError(
strings::Substitute("Failed to get query fragments context. Query may be "
"timeout or be cancelled. host: ",
BackendOptions::get_localhost()));
}
fragments_ctx = search->second;
} else {
// This may be a first fragment request of the query.
// Create the query fragments context.
fragments_ctx.reset(new QueryFragmentsCtx(params.fragment_num_on_host, _exec_env));
fragments_ctx->query_id = params.params.query_id;
RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), params.desc_tbl,
&(fragments_ctx->desc_tbl)));
fragments_ctx->coord_addr = params.coord;
fragments_ctx->query_globals = params.query_globals;

if (params.__isset.resource_info) {
fragments_ctx->user = params.resource_info.user;
fragments_ctx->group = params.resource_info.group;
fragments_ctx->set_rsc_info = true;
}
// This may be a first fragment request of the query.
// Create the query fragments context.
fragments_ctx.reset(new QueryFragmentsCtx(params.fragment_num_on_host, _exec_env));
fragments_ctx->query_id = params.params.query_id;
RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), params.desc_tbl,
&(fragments_ctx->desc_tbl)));
fragments_ctx->coord_addr = params.coord;
fragments_ctx->query_globals = params.query_globals;

if (params.__isset.query_options) {
fragments_ctx->timeout_second = params.query_options.query_timeout;
if (params.query_options.__isset.resource_limit) {
fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit);
}
if (params.__isset.resource_info) {
fragments_ctx->user = params.resource_info.user;
fragments_ctx->group = params.resource_info.group;
fragments_ctx->set_rsc_info = true;
}

if (params.__isset.query_options) {
fragments_ctx->timeout_second = params.query_options.query_timeout;
if (params.query_options.__isset.resource_limit) {
fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit);
}
}

{
// Find _fragments_ctx_map again, in case some other request has already
// created the query fragments context.
std::lock_guard<std::mutex> lock(_lock);
auto search = _fragments_ctx_map.find(params.params.query_id);
if (search == _fragments_ctx_map.end()) {
_fragments_ctx_map.insert(
std::make_pair(fragments_ctx->query_id, fragments_ctx));
} else {
// Already has a query fragments context, use it
fragments_ctx = search->second;
}
{
// Find _fragments_ctx_map again, in case some other request has already
// created the query fragments context.
std::lock_guard<std::mutex> lock(_lock);
auto search = _fragments_ctx_map.find(params.params.query_id);
if (search == _fragments_ctx_map.end()) {
_fragments_ctx_map.insert(std::make_pair(fragments_ctx->query_id, fragments_ctx));
} else {
// Already has a query fragments context, use it
fragments_ctx = search->second;
}
}
}

exec_state.reset(new FragmentExecState(fragments_ctx->query_id,
params.params.fragment_instance_id,
params.backend_num, _exec_env, fragments_ctx));
exec_state.reset(new FragmentExecState(fragments_ctx->query_id,
params.params.fragment_instance_id, params.backend_num,
_exec_env, fragments_ctx));
if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
// set need_wait_execution_trigger means this instance will not actually be executed
// until the execPlanFragmentStart RPC triggers it to start.
exec_state->set_need_wait_execution_trigger();
}

std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
@@ -672,6 +695,7 @@ void FragmentMgr::cancel_worker() {
LOG(INFO) << "FragmentMgr cancel worker start working.";
do {
std::vector<TUniqueId> to_cancel;
std::vector<TUniqueId> to_cancel_queries;
DateTimeValue now = DateTimeValue::local_time();
{
std::lock_guard<std::mutex> lock(_lock);
@@ -682,6 +706,9 @@
}
for (auto it = _fragments_ctx_map.begin(); it != _fragments_ctx_map.end();) {
if (it->second->is_timeout(now)) {
// The execution logic of the instance needs to be notified,
// and it will eventually cancel the execution plan.
it->second->set_ready_to_execute();
it = _fragments_ctx_map.erase(it);
} else {
++it;
2 changes: 2 additions & 0 deletions be/src/runtime/fragment_mgr.h
@@ -65,6 +65,8 @@ class FragmentMgr : public RestMonitorIface {
// TODO(zc): report this is over
Status exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb);

Status start_query_execution(const PExecPlanFragmentStartRequest* request);

Status cancel(const TUniqueId& fragment_id) {
return cancel(fragment_id, PPlanFragmentCancelReason::INTERNAL_ERROR);
}
21 changes: 21 additions & 0 deletions be/src/runtime/query_fragments_ctx.h
@@ -62,6 +62,21 @@ class QueryFragmentsCtx {

ThreadPoolToken* get_token() { return _thread_token.get(); }

void set_ready_to_execute() {
{
std::lock_guard<std::mutex> l(_start_lock);
_ready_to_execute = true;
}
_start_cond.notify_all();
}

void wait_for_start() {
std::unique_lock<std::mutex> l(_start_lock);
while (!_ready_to_execute.load()) {
_start_cond.wait(l);
}
}

public:
TUniqueId query_id;
DescriptorTbl* desc_tbl;
@@ -92,6 +107,12 @@ class QueryFragmentsCtx {
// So that we can control the max thread that a query can be used to execute.
// If this token is not set, the scanner will be executed in "_scan_thread_pool" in exec env.
std::unique_ptr<ThreadPoolToken> _thread_token;

std::mutex _start_lock;
std::condition_variable _start_cond;
// Only valid when _need_wait_execution_trigger is set to true in FragmentExecState.
// And all fragments of this query will start execution when this is set to true.
std::atomic<bool> _ready_to_execute {false};
};

} // namespace doris
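The pieces above form a simple start gate: exec_plan_fragment() prepares the instance, execute() blocks in wait_for_start() when _need_wait_execution_trigger is set, and start_query_execution() (driven by the new exec_plan_fragment_start RPC below) releases it via set_ready_to_execute(). A minimal, self-contained sketch of that handshake, using a hypothetical Gate struct standing in for QueryFragmentsCtx:

#include <atomic>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

// Gate mimics the new QueryFragmentsCtx members added by this PR.
struct Gate {
    std::mutex start_lock;
    std::condition_variable start_cond;
    std::atomic<bool> ready_to_execute{false};

    void set_ready_to_execute() {
        {
            std::lock_guard<std::mutex> l(start_lock);
            ready_to_execute = true;
        }
        start_cond.notify_all();
    }

    void wait_for_start() {
        std::unique_lock<std::mutex> l(start_lock);
        start_cond.wait(l, [this] { return ready_to_execute.load(); });
    }
};

int main() {
    Gate gate;

    // Plays the prepared fragment: execute() with _need_wait_execution_trigger set
    // blocks here until the start signal arrives.
    std::thread fragment([&] {
        gate.wait_for_start();
        std::cout << "fragment starts executing" << std::endl;
    });

    // Plays FragmentMgr::start_query_execution(), invoked by the FE's
    // exec_plan_fragment_start RPC.
    std::thread start_rpc([&] { gate.set_ready_to_execute(); });

    fragment.join();
    start_rpc.join();
    return 0;
}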
53 changes: 45 additions & 8 deletions be/src/service/internal_service.cpp
@@ -111,13 +111,32 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c
brpc::ClosureGuard closure_guard(done);
auto st = Status::OK();
bool compact = request->has_compact() ? request->compact() : false;
st = _exec_plan_fragment(request->request(), compact);
PFragmentRequestVersion version =
request->has_version() ? request->version() : PFragmentRequestVersion::VERSION_1;
st = _exec_plan_fragment(request->request(), version, compact);
if (!st.ok()) {
LOG(WARNING) << "exec plan fragment failed, errmsg=" << st.get_error_msg();
}
st.to_protobuf(response->mutable_status());
}

void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController* cntl_base,
const PExecPlanFragmentRequest* request,
PExecPlanFragmentResult* response,
google::protobuf::Closure* done) {
exec_plan_fragment(cntl_base, request, response, done);
}

void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcController* controller,
const PExecPlanFragmentStartRequest* request,
PExecPlanFragmentResult* result,
google::protobuf::Closure* done) {
SCOPED_SWITCH_BTHREAD();
brpc::ClosureGuard closure_guard(done);
auto st = _exec_env->fragment_mgr()->start_query_execution(request);
st.to_protobuf(result->mutable_status());
}

void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* cntl_base,
const PTabletWriterAddBlockRequest* request,
PTabletWriterAddBlockResult* response,
@@ -201,14 +220,32 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController*
}
}

Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request, bool compact) {
TExecPlanFragmentParams t_request;
{
const uint8_t* buf = (const uint8_t*)ser_request.data();
uint32_t len = ser_request.size();
RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request));
Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request,
PFragmentRequestVersion version, bool compact) {
if (version == PFragmentRequestVersion::VERSION_1) {
// VERSION_1 should be removed in v1.2
TExecPlanFragmentParams t_request;
{
const uint8_t* buf = (const uint8_t*)ser_request.data();
uint32_t len = ser_request.size();
RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request));
}
return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
} else if (version == PFragmentRequestVersion::VERSION_2) {
TExecPlanFragmentParamsList t_request;
{
const uint8_t* buf = (const uint8_t*)ser_request.data();
uint32_t len = ser_request.size();
RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request));
}

for (const TExecPlanFragmentParams& params : t_request.paramsList) {
RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params));
}
return Status::OK();
} else {
return Status::InternalError("invalid version");
}
return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
}

void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* cntl_base,
13 changes: 12 additions & 1 deletion be/src/service/internal_service.h
@@ -45,6 +45,16 @@ class PInternalServiceImpl : public PBackendService {
PExecPlanFragmentResult* result,
google::protobuf::Closure* done) override;

void exec_plan_fragment_prepare(google::protobuf::RpcController* controller,
const PExecPlanFragmentRequest* request,
PExecPlanFragmentResult* result,
google::protobuf::Closure* done) override;

void exec_plan_fragment_start(google::protobuf::RpcController* controller,
const PExecPlanFragmentStartRequest* request,
PExecPlanFragmentResult* result,
google::protobuf::Closure* done) override;

void cancel_plan_fragment(google::protobuf::RpcController* controller,
const PCancelPlanFragmentRequest* request,
PCancelPlanFragmentResult* result,
@@ -121,7 +131,8 @@ class PInternalServiceImpl : public PBackendService {
PHandShakeResponse* response, google::protobuf::Closure* done) override;

private:
Status _exec_plan_fragment(const std::string& s_request, bool compact);
Status _exec_plan_fragment(const std::string& s_request, PFragmentRequestVersion version,
bool compact);

Status _fold_constant_expr(const std::string& ser_request, PConstantExprResult* response);

@@ -71,7 +71,7 @@ protected void runAfterCatalogReady() {
}
}
});
LOG.info("finished to get tablet stat of all backends. cost: {} ms",
LOG.debug("finished to get tablet stat of all backends. cost: {} ms",
(System.currentTimeMillis() - start));

// after update replica in all backends, update index row num
@@ -81,7 +81,7 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs, medium);

if (lowBEs.isEmpty() && highBEs.isEmpty()) {
LOG.info("cluster is balance: {} with medium: {}. skip", clusterName, medium);
LOG.debug("cluster is balance: {} with medium: {}. skip", clusterName, medium);
return alternativeTablets;
}

Expand Down Expand Up @@ -185,9 +185,11 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
}
} // end for high backends

LOG.info("select alternative tablets for cluster: {}, medium: {}, num: {}, detail: {}",
clusterName, medium, alternativeTablets.size(),
alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray());
if (!alternativeTablets.isEmpty()) {
LOG.info("select alternative tablets for cluster: {}, medium: {}, num: {}, detail: {}",
clusterName, medium, alternativeTablets.size(),
alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray());
}
return alternativeTablets;
}

@@ -182,7 +182,7 @@ private void classifyBackendByLoad(TStorageMedium medium) {
}
}

LOG.info("classify backend by load. medium: {} avg load score: {}. low/mid/high: {}/{}/{}",
LOG.debug("classify backend by load. medium: {} avg load score: {}. low/mid/high: {}/{}/{}",
medium, avgLoadScore, lowCounter, midCounter, highCounter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
// first we should check if mid backends is available.
// if all mid backends is not available, we should not start balance
if (midBEs.stream().noneMatch(BackendLoadStatistic::isAvailable)) {
LOG.info("all mid load backends is dead: {} with medium: {}. skip",
LOG.debug("all mid load backends is dead: {} with medium: {}. skip",
lowBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium);
return alternativeTablets;
}
Expand Down Expand Up @@ -231,9 +231,11 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(

// remove balanced BEs from prio backends
prioBackends.keySet().removeIf(id -> !unbalancedBEs.contains(id));
LOG.info("select alternative tablets for cluster: {}, medium: {}, num: {}, detail: {}",
clusterName, medium, alternativeTablets.size(),
alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray());
if (!alternativeTablets.isEmpty()) {
LOG.info("select alternative tablets for cluster: {}, medium: {}, num: {}, detail: {}",
clusterName, medium, alternativeTablets.size(),
alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray());
}
return alternativeTablets;
}

Expand Down
Loading