Skip to content

Commit

Permalink
[refactor](fragment) Use fragment ID to manage fragment context (apache#42048) (apache#42258)
Browse files Browse the repository at this point in the history

pick apache#42048

Use fragment ID to manage fragment context
  • Loading branch information
Gabriel39 authored Oct 23, 2024
1 parent 1a30b13 commit ee9240c
Show file tree
Hide file tree
Showing 12 changed files with 51 additions and 102 deletions.
9 changes: 2 additions & 7 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -585,10 +585,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
void PipelineFragmentContext::_init_next_report_time() {
auto interval_s = config::pipeline_status_report_interval;
if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
std::vector<string> ins_ids;
instance_ids(ins_ids);
VLOG_FILE << "enable period report: instance_id="
<< fmt::format("{}", fmt::join(ins_ids, ", "));
VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
// We don't want to wait longer than it takes to run the entire fragment.
_previous_report_time =
Expand Down Expand Up @@ -626,11 +623,9 @@ void PipelineFragmentContext::trigger_report_if_necessary() {
return;
}
if (VLOG_FILE_IS_ON) {
std::vector<string> ins_ids;
instance_ids(ins_ids);
VLOG_FILE << "Reporting "
<< "profile for query_id " << print_id(_query_id)
<< ", instance ids: " << fmt::format("{}", fmt::join(ins_ids, ", "));
<< ", fragment id: " << _fragment_id;

std::stringstream ss;
_runtime_state->runtime_profile()->compute_time_in_profile();
Expand Down
14 changes: 0 additions & 14 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,6 @@ class PipelineFragmentContext : public TaskExecutionContext {

[[nodiscard]] int next_sink_operator_id() { return _sink_operator_id--; }

void instance_ids(std::vector<TUniqueId>& ins_ids) const {
ins_ids.resize(_fragment_instance_ids.size());
for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
ins_ids[i] = _fragment_instance_ids[i];
}
}

void instance_ids(std::vector<string>& ins_ids) const {
ins_ids.resize(_fragment_instance_ids.size());
for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
ins_ids[i] = print_id(_fragment_instance_ids[i]);
}
}

void clear_finished_tasks() {
for (size_t j = 0; j < _tasks.size(); j++) {
for (size_t i = 0; i < _tasks[j].size(); i++) {
Expand Down
68 changes: 16 additions & 52 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_instance_count, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size, MetricUnit::NOUNIT);
bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr", "prepare");
bvar::Adder<int64_t> g_pipeline_fragment_instances_count("doris_pipeline_fragment_instances_count");

bvar::Adder<uint64_t> g_fragment_executing_count("fragment_executing_count");
bvar::Status<uint64_t> g_fragment_last_active_time(
Expand Down Expand Up @@ -637,18 +636,13 @@ void FragmentMgr::remove_pipeline_context(
{
std::lock_guard<std::mutex> lock(_lock);
auto query_id = f_context->get_query_id();
std::vector<TUniqueId> ins_ids;
f_context->instance_ids(ins_ids);
int64 now = duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
g_fragment_executing_count << -1;
g_fragment_last_active_time.set_value(now);
for (const auto& ins_id : ins_ids) {
LOG_INFO("Removing query {} instance {}", print_id(query_id), print_id(ins_id));
_pipeline_map.erase(ins_id);
g_pipeline_fragment_instances_count << -1;
}
LOG_INFO("Removing query {} fragment {}", print_id(query_id), f_context->get_fragment_id());
_pipeline_map.erase({query_id, f_context->get_fragment_id()});
}
}

Expand Down Expand Up @@ -782,11 +776,10 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
continue;
}
auto timeout_second = it.second->timeout_second();
fmt::format_to(debug_string_buffer,
"No.{} (elapse_second={}s, query_timeout_second={}s, instance_id="
"{}, is_timeout={}) : {}\n",
i, elapsed, timeout_second, print_id(it.first),
it.second->is_timeout(now), it.second->debug_string());
fmt::format_to(
debug_string_buffer,
"No.{} (elapse_second={}s, query_timeout_second={}s, is_timeout={}) : {}\n", i,
elapsed, timeout_second, it.second->is_timeout(now), it.second->debug_string());
i++;
}
}
Expand Down Expand Up @@ -846,11 +839,10 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
for (const auto& local_param : params.local_params) {
const TUniqueId& fragment_instance_id = local_param.fragment_instance_id;
std::lock_guard<std::mutex> lock(_lock);
auto iter = _pipeline_map.find(fragment_instance_id);
auto iter = _pipeline_map.find({params.query_id, params.fragment_id});
if (iter != _pipeline_map.end()) {
return Status::InternalError(
"exec_plan_fragment input duplicated fragment_instance_id({})",
UniqueId(fragment_instance_id).to_string());
return Status::InternalError("exec_plan_fragment input duplicated fragment_id({})",
params.fragment_id);
}
query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
}
Expand All @@ -866,12 +858,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
g_fragment_executing_count << 1;
g_fragment_last_active_time.set_value(now);
std::lock_guard<std::mutex> lock(_lock);
std::vector<TUniqueId> ins_ids;
context->instance_ids(ins_ids);
// TODO: simplify this mapping
for (const auto& ins_id : ins_ids) {
_pipeline_map.insert({ins_id, context});
}
_pipeline_map.insert({{params.query_id, params.fragment_id}, context});
}
query_ctx->set_pipeline_context(params.fragment_id, context);

Expand Down Expand Up @@ -916,31 +904,6 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
<< " is cancelled and removed. Reason: " << reason.to_string();
}

// Cancels the pipeline fragment context registered under `instance_id`, if any.
// NOTE(review): with pipeline execution, cancellation is expected at query
// granularity (cancel_query); finding a registered instance here means some
// caller still cancels per-instance, which the DCHECK flags in debug builds.
void FragmentMgr::cancel_instance(const TUniqueId instance_id, const Status reason) {
    std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_ctx;
    {
        std::lock_guard<std::mutex> state_lock(_lock);
        // A single find() replaces the previous contains()/contains()/find()
        // triple lookup, and avoids operator[], which would default-insert a
        // null context when the id is absent.
        auto itr = _pipeline_map.find(instance_id);
        if (itr == _pipeline_map.end()) {
            // Previously this warning sat in an unreachable branch (it was
            // guarded by a successful contains() check); emit it here instead.
            LOG(WARNING) << "Could not find the pipeline instance id:" << print_id(instance_id)
                         << " to cancel";
            return;
        }
        DCHECK(false)
                << " Pipeline tasks should be canceled by query instead of instance! Query ID: "
                << print_id(itr->second->get_query_id());
        pipeline_ctx = itr->second;
    }

    // Cancel outside the lock so the (potentially slow) cancellation does not
    // hold up other FragmentMgr operations.
    if (pipeline_ctx != nullptr) {
        pipeline_ctx->cancel(reason);
    }
}

void FragmentMgr::cancel_worker() {
LOG(INFO) << "FragmentMgr cancel worker start working.";

Expand Down Expand Up @@ -1206,15 +1169,16 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,

RuntimeFilterMgr* runtime_filter_mgr = nullptr;

const auto& fragment_instance_ids = request->fragment_instance_ids();
const auto& fragment_ids = request->fragment_ids();
{
std::unique_lock<std::mutex> lock(_lock);
for (UniqueId fragment_instance_id : fragment_instance_ids) {
TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();

for (auto fragment_id : fragment_ids) {
if (is_pipeline) {
auto iter = _pipeline_map.find(tfragment_instance_id);
auto iter = _pipeline_map.find(
{UniqueId(request->query_id()).to_thrift(), fragment_id});
if (iter == _pipeline_map.end()) {
LOG(WARNING) << "No pipeline fragment is found: Query-ID = "
<< request->query_id() << " fragment_id = " << fragment_id;
continue;
}
pip_context = iter->second;
Expand Down
8 changes: 4 additions & 4 deletions be/src/runtime/fragment_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,6 @@ class FragmentMgr : public RestMonitorIface {
Status trigger_pipeline_context_report(const ReportStatusRequest,
std::shared_ptr<pipeline::PipelineFragmentContext>&&);

// Cancel instance (pipeline or nonpipeline).
void cancel_instance(const TUniqueId instance_id, const Status reason);

// Can be used in both version.
void cancel_query(const TUniqueId query_id, const Status reason);

Expand Down Expand Up @@ -169,7 +166,10 @@ class FragmentMgr : public RestMonitorIface {
// call _lock, so that there is dead lock.
std::mutex _lock;

std::unordered_map<TUniqueId, std::shared_ptr<pipeline::PipelineFragmentContext>> _pipeline_map;
// (QueryID, FragmentID) -> PipelineFragmentContext
std::unordered_map<std::pair<TUniqueId, int>,
std::shared_ptr<pipeline::PipelineFragmentContext>>
_pipeline_map;

// query id -> QueryContext
std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctx_map;
Expand Down
2 changes: 0 additions & 2 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,6 @@ class QueryContext {

ThreadPool* get_memtable_flush_pool();

std::vector<TUniqueId> get_fragment_instance_ids() const { return fragment_instance_ids; }

int64_t mem_limit() const { return _bytes_limit; }

void set_merge_controller_handler(
Expand Down
16 changes: 11 additions & 5 deletions be/src/runtime/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
// so we need to copy to cnt_val
cnt_val->producer_size = producer_size;
cnt_val->runtime_filter_desc = *runtime_filter_desc;
cnt_val->target_info = *target_info;
cnt_val->pool.reset(new ObjectPool());
cnt_val->filter = cnt_val->pool->add(new IRuntimeFilter(_state, runtime_filter_desc));

Expand Down Expand Up @@ -458,10 +457,17 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
}
closure->cntl_->set_timeout_ms(std::min(3600, _state->execution_timeout) * 1000);
// set fragment-id
for (auto& target_fragment_instance_id : target.target_fragment_instance_ids) {
PUniqueId* cur_id = closure->request_->add_fragment_instance_ids();
cur_id->set_hi(target_fragment_instance_id.hi);
cur_id->set_lo(target_fragment_instance_id.lo);
if (target.__isset.target_fragment_ids) {
for (auto& target_fragment_id : target.target_fragment_ids) {
closure->request_->add_fragment_ids(target_fragment_id);
}
} else {
// FE not upgraded yet.
for (auto& target_fragment_instance_id : target.target_fragment_instance_ids) {
PUniqueId* cur_id = closure->request_->add_fragment_instance_ids();
cur_id->set_hi(target_fragment_instance_id.hi);
cur_id->set_lo(target_fragment_instance_id.lo);
}
}

std::shared_ptr<PBackendService_Stub> stub(
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/runtime_filter_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ class RuntimeFilterMergeControllerEntity {
int producer_size;
uint64_t global_size;
TRuntimeFilterDesc runtime_filter_desc;
std::vector<doris::TRuntimeFilterTargetParams> target_info;
std::vector<doris::TRuntimeFilterTargetParamsV2> targetv2_info;
IRuntimeFilter* filter = nullptr;
std::unordered_set<UniqueId> arrive_id;
Expand Down
7 changes: 0 additions & 7 deletions be/src/service/backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -657,13 +657,6 @@ Status BaseBackendService::start_plan_fragment_execution(
QuerySource::INTERNAL_FRONTEND);
}

// Thrift endpoint: the FE asks this BE to cancel one plan-fragment instance.
// The actual cancellation is delegated to FragmentMgr.
void BaseBackendService::cancel_plan_fragment(TCancelPlanFragmentResult& return_val,
                                              const TCancelPlanFragmentParams& params) {
    const auto& instance_id = params.fragment_instance_id;
    LOG(INFO) << "cancel_plan_fragment(): instance_id=" << print_id(instance_id);
    const Status cancel_reason = Status::InternalError("cancel message received from FE");
    _exec_env->fragment_mgr()->cancel_instance(instance_id, cancel_reason);
}

void BaseBackendService::transmit_data(TTransmitDataResult& return_val,
const TTransmitDataParams& params) {
VLOG_ROW << "transmit_data(): instance_id=" << params.dest_fragment_instance_id
Expand Down
2 changes: 1 addition & 1 deletion be/src/service/backend_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class BaseBackendService : public BackendServiceIf {
const TExecPlanFragmentParams& params) override;

void cancel_plan_fragment(TCancelPlanFragmentResult& return_val,
const TCancelPlanFragmentParams& params) override;
const TCancelPlanFragmentParams& params) override {};

void transmit_data(TTransmitDataResult& return_val, const TTransmitDataParams& params) override;

Expand Down
24 changes: 15 additions & 9 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1995,7 +1995,8 @@ private void assignRuntimeFilterAddr() throws Exception {
List<FRuntimeFilterTargetParam> targetFragments = ridToTargetParam.computeIfAbsent(rid,
k -> new ArrayList<>());
for (final FInstanceExecParam instance : params.instanceExecParams) {
targetFragments.add(new FRuntimeFilterTargetParam(instance.instanceId, toBrpcHost(instance.host)));
targetFragments.add(new FRuntimeFilterTargetParam(instance.fragment().getFragmentId().asInt(),
toBrpcHost(instance.host)));
}
}

Expand Down Expand Up @@ -3179,20 +3180,24 @@ Map<TNetworkAddress, TPipelineFragmentParams> toThrift(int backendNum) {
for (FRuntimeFilterTargetParam targetParam : fParams) {
if (targetParamsV2.containsKey(targetParam.targetFragmentInstanceAddr)) {
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
.target_fragment_instance_ids
.add(targetParam.targetFragmentInstanceId);
.target_fragment_ids
.add(targetParam.targetFragmentId);
} else {
targetParamsV2.put(targetParam.targetFragmentInstanceAddr,
new TRuntimeFilterTargetParamsV2());
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
.target_fragment_instance_addr
= targetParam.targetFragmentInstanceAddr;
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
.target_fragment_instance_ids
.target_fragment_ids
= new ArrayList<>();
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
.target_fragment_ids
.add(targetParam.targetFragmentId);
// `target_fragment_instance_ids` is a required field
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
.target_fragment_instance_ids
.add(targetParam.targetFragmentInstanceId);
= new ArrayList<>();
}
}

Expand All @@ -3201,7 +3206,8 @@ Map<TNetworkAddress, TPipelineFragmentParams> toThrift(int backendNum) {
} else {
List<TRuntimeFilterTargetParams> targetParams = Lists.newArrayList();
for (FRuntimeFilterTargetParam targetParam : fParams) {
targetParams.add(new TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId,
// Instance id make no sense if this runtime filter doesn't have remote targets.
targetParams.add(new TRuntimeFilterTargetParams(new TUniqueId(),
targetParam.targetFragmentInstanceAddr));
}
localParams.runtime_filter_params.putToRidToTargetParam(rf.getFilterId().asInt(),
Expand Down Expand Up @@ -3371,12 +3377,12 @@ private void updateProfileIfPresent(Consumer<SummaryProfile> profileAction) {

// Runtime filter target fragment instance param
static class FRuntimeFilterTargetParam {
public TUniqueId targetFragmentInstanceId;
public int targetFragmentId;

public TNetworkAddress targetFragmentInstanceAddr;

public FRuntimeFilterTargetParam(TUniqueId id, TNetworkAddress host) {
this.targetFragmentInstanceId = id;
public FRuntimeFilterTargetParam(int id, TNetworkAddress host) {
this.targetFragmentId = id;
this.targetFragmentInstanceAddr = host;
}
}
Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/internal_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ message PPublishFilterRequestV2 {
optional int64 merge_time = 9;
optional bool contain_null = 10;
optional bool ignored = 11;
repeated int32 fragment_ids = 12;
};

message PPublishFilterResponse {
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ struct TRuntimeFilterTargetParamsV2 {
1: required list<Types.TUniqueId> target_fragment_instance_ids
// The address of the instance where the fragment is expected to run
2: required Types.TNetworkAddress target_fragment_instance_addr
3: optional list<i32> target_fragment_ids
}

struct TRuntimeFilterParams {
Expand Down

0 comments on commit ee9240c

Please sign in to comment.