From ee9240c3a658dcaea90a838f39bf330fe5888d32 Mon Sep 17 00:00:00 2001
From: Gabriel
Date: Wed, 23 Oct 2024 10:10:57 +0800
Subject: [PATCH] [refactor](fragment) Use fragment ID to manage fragment context (#42048) (#42258)

pick #42048

Use fragment ID to manage fragment context
---
 be/src/pipeline/pipeline_fragment_context.cpp |  9 +--
 be/src/pipeline/pipeline_fragment_context.h   | 14 ----
 be/src/runtime/fragment_mgr.cpp               | 68 +++++----------
 be/src/runtime/fragment_mgr.h                 |  8 +--
 be/src/runtime/query_context.h                |  2 -
 be/src/runtime/runtime_filter_mgr.cpp         | 16 +++--
 be/src/runtime/runtime_filter_mgr.h           |  1 -
 be/src/service/backend_service.cpp            |  7 --
 be/src/service/backend_service.h              |  2 +-
 .../java/org/apache/doris/qe/Coordinator.java | 24 ++++---
 gensrc/proto/internal_service.proto           |  1 +
 gensrc/thrift/PaloInternalService.thrift      |  1 +
 12 files changed, 51 insertions(+), 102 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 8c998ab8c2fe68..b1ee5933d27984 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -585,10 +585,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
 
 void PipelineFragmentContext::_init_next_report_time() {
     auto interval_s = config::pipeline_status_report_interval;
     if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
-        std::vector<std::string> ins_ids;
-        instance_ids(ins_ids);
-        VLOG_FILE << "enable period report: instance_id="
-                  << fmt::format("{}", fmt::join(ins_ids, ", "));
+        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
         uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
         // We don't want to wait longer than it takes to run the entire fragment.
         _previous_report_time =
@@ -626,11 +623,9 @@ void PipelineFragmentContext::trigger_report_if_necessary() {
         return;
     }
     if (VLOG_FILE_IS_ON) {
-        std::vector<std::string> ins_ids;
-        instance_ids(ins_ids);
         VLOG_FILE << "Reporting "
                   << "profile for query_id " << print_id(_query_id)
-                  << ", instance ids: " << fmt::format("{}", fmt::join(ins_ids, ", "));
+                  << ", fragment id: " << _fragment_id;
 
         std::stringstream ss;
         _runtime_state->runtime_profile()->compute_time_in_profile();
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index b20c324756c095..822a23c54bda4e 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -117,20 +117,6 @@ class PipelineFragmentContext : public TaskExecutionContext {
 
     [[nodiscard]] int next_sink_operator_id() { return _sink_operator_id--; }
 
-    void instance_ids(std::vector<TUniqueId>& ins_ids) const {
-        ins_ids.resize(_fragment_instance_ids.size());
-        for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
-            ins_ids[i] = _fragment_instance_ids[i];
-        }
-    }
-
-    void instance_ids(std::vector<std::string>& ins_ids) const {
-        ins_ids.resize(_fragment_instance_ids.size());
-        for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
-            ins_ids[i] = print_id(_fragment_instance_ids[i]);
-        }
-    }
-
     void clear_finished_tasks() {
         for (size_t j = 0; j < _tasks.size(); j++) {
             for (size_t i = 0; i < _tasks[j].size(); i++) {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 577b7ffa157f3c..7ba73442c90168 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -106,7 +106,6 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_instance_count, MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size, MetricUnit::NOUNIT);
 bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr", "prepare");
-bvar::Adder<uint64_t> g_pipeline_fragment_instances_count("doris_pipeline_fragment_instances_count");
 
 bvar::Adder<uint64_t> g_fragment_executing_count("fragment_executing_count");
 bvar::Status<int64_t> g_fragment_last_active_time(
@@ -637,18 +636,13 @@ void FragmentMgr::remove_pipeline_context(
     {
         std::lock_guard<std::mutex> lock(_lock);
         auto query_id = f_context->get_query_id();
-        std::vector<TUniqueId> ins_ids;
-        f_context->instance_ids(ins_ids);
         int64 now = duration_cast<std::chrono::milliseconds>(
                             std::chrono::system_clock::now().time_since_epoch())
                             .count();
         g_fragment_executing_count << -1;
         g_fragment_last_active_time.set_value(now);
-        for (const auto& ins_id : ins_ids) {
-            LOG_INFO("Removing query {} instance {}", print_id(query_id), print_id(ins_id));
-            _pipeline_map.erase(ins_id);
-            g_pipeline_fragment_instances_count << -1;
-        }
+        LOG_INFO("Removing query {} fragment {}", print_id(query_id), f_context->get_fragment_id());
+        _pipeline_map.erase({query_id, f_context->get_fragment_id()});
     }
 }
@@ -782,11 +776,10 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
                 continue;
             }
             auto timeout_second = it.second->timeout_second();
-            fmt::format_to(debug_string_buffer,
-                           "No.{} (elapse_second={}s, query_timeout_second={}s, instance_id="
-                           "{}, is_timeout={}) : {}\n",
-                           i, elapsed, timeout_second, print_id(it.first),
-                           it.second->is_timeout(now), it.second->debug_string());
+            fmt::format_to(
+                    debug_string_buffer,
+                    "No.{} (elapse_second={}s, query_timeout_second={}s, is_timeout={}) : {}\n", i,
+                    elapsed, timeout_second, it.second->is_timeout(now), it.second->debug_string());
             i++;
         }
     }
@@ -846,11 +839,10 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
     for (const auto& local_param : params.local_params) {
         const TUniqueId& fragment_instance_id = local_param.fragment_instance_id;
         std::lock_guard<std::mutex> lock(_lock);
-        auto iter = _pipeline_map.find(fragment_instance_id);
+        auto iter = _pipeline_map.find({params.query_id, params.fragment_id});
         if (iter != _pipeline_map.end()) {
-            return Status::InternalError(
-                    "exec_plan_fragment input duplicated fragment_instance_id({})",
-                    UniqueId(fragment_instance_id).to_string());
+            return Status::InternalError("exec_plan_fragment input duplicated fragment_id({})",
+                                         params.fragment_id);
         }
         query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
     }
@@ -866,12 +858,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
         g_fragment_executing_count << 1;
         g_fragment_last_active_time.set_value(now);
         std::lock_guard<std::mutex> lock(_lock);
-        std::vector<TUniqueId> ins_ids;
-        context->instance_ids(ins_ids);
         // TODO: simplify this mapping
-        for (const auto& ins_id : ins_ids) {
-            _pipeline_map.insert({ins_id, context});
-        }
+        _pipeline_map.insert({{params.query_id, params.fragment_id}, context});
     }
 
     query_ctx->set_pipeline_context(params.fragment_id, context);
@@ -916,31 +904,6 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
               << " is cancelled and removed. Reason: " << reason.to_string();
 }
 
-void FragmentMgr::cancel_instance(const TUniqueId instance_id, const Status reason) {
-    std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_ctx;
-    {
-        std::lock_guard<std::mutex> state_lock(_lock);
-        DCHECK(!_pipeline_map.contains(instance_id))
-                << " Pipeline tasks should be canceled by query instead of instance! Query ID: "
-                << print_id(_pipeline_map[instance_id]->get_query_id());
-        const bool is_pipeline_instance = _pipeline_map.contains(instance_id);
-        if (is_pipeline_instance) {
-            auto itr = _pipeline_map.find(instance_id);
-            if (itr != _pipeline_map.end()) {
-                pipeline_ctx = itr->second;
-            } else {
-                LOG(WARNING) << "Could not find the pipeline instance id:" << print_id(instance_id)
-                             << " to cancel";
-                return;
-            }
-        }
-    }
-
-    if (pipeline_ctx != nullptr) {
-        pipeline_ctx->cancel(reason);
-    }
-}
-
 void FragmentMgr::cancel_worker() {
     LOG(INFO) << "FragmentMgr cancel worker start working.";
@@ -1206,15 +1169,16 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
 
     RuntimeFilterMgr* runtime_filter_mgr = nullptr;
 
-    const auto& fragment_instance_ids = request->fragment_instance_ids();
+    const auto& fragment_ids = request->fragment_ids();
     {
         std::unique_lock<std::mutex> lock(_lock);
-        for (UniqueId fragment_instance_id : fragment_instance_ids) {
-            TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
-
+        for (auto fragment_id : fragment_ids) {
             if (is_pipeline) {
-                auto iter = _pipeline_map.find(tfragment_instance_id);
+                auto iter = _pipeline_map.find(
+                        {UniqueId(request->query_id()).to_thrift(), fragment_id});
                 if (iter == _pipeline_map.end()) {
+                    LOG(WARNING) << "No pipeline fragment is found: Query-ID = "
+                                 << request->query_id() << " fragment_id = " << fragment_id;
                     continue;
                 }
                 pip_context = iter->second;
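A note on the new composite key used by the hunks above and declared in fragment_mgr.h below: `_pipeline_map` is now keyed by {query_id, fragment_id} instead of a per-instance TUniqueId, so find/insert/erase each address one fragment rather than one entry per instance. A std::unordered_map with a std::pair key does not compile without a hash functor for the pair; the following is a minimal, self-contained sketch of that pattern only. TUniqueId, PipelineFragmentContext, and the hash mixing here are illustrative stand-ins, not the real Doris definitions.

#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <unordered_map>
#include <utility>

// Stand-in for the Thrift-generated TUniqueId (a hi/lo 128-bit id).
struct TUniqueId {
    int64_t hi = 0;
    int64_t lo = 0;
    bool operator==(const TUniqueId& other) const { return hi == other.hi && lo == other.lo; }
};

struct PipelineFragmentContext {}; // stand-in for the real context class

// Combine the query id and fragment id into one hash value; any reasonable
// mixing scheme works since this is only an in-process lookup key.
struct PipelineMapKeyHash {
    std::size_t operator()(const std::pair<TUniqueId, int>& key) const {
        std::size_t h = std::hash<int64_t>()(key.first.hi);
        h = h * 31 + std::hash<int64_t>()(key.first.lo);
        h = h * 31 + std::hash<int>()(key.second);
        return h;
    }
};

// (QueryID, FragmentID) -> PipelineFragmentContext, mirroring fragment_mgr.h below.
using PipelineMap = std::unordered_map<std::pair<TUniqueId, int>,
                                       std::shared_ptr<PipelineFragmentContext>,
                                       PipelineMapKeyHash>;

int main() {
    PipelineMap pipeline_map;
    TUniqueId query_id{0x1234, 0x5678};
    int fragment_id = 0;
    // One entry per fragment; previously the map held one entry per instance.
    pipeline_map.insert({{query_id, fragment_id}, std::make_shared<PipelineFragmentContext>()});
    pipeline_map.erase({query_id, fragment_id});
    return pipeline_map.empty() ? 0 : 1;
}

Holding one entry per fragment is what lets remove_pipeline_context and exec_plan_fragment above drop their per-instance loops.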
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index bc066066f7b6a6..41b63db0b23ad9 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -100,9 +100,6 @@ class FragmentMgr : public RestMonitorIface {
     Status trigger_pipeline_context_report(const ReportStatusRequest,
                                            std::shared_ptr<pipeline::PipelineFragmentContext>&&);
 
-    // Cancel instance (pipeline or nonpipeline).
-    void cancel_instance(const TUniqueId instance_id, const Status reason);
-
     // Can be used in both version.
     void cancel_query(const TUniqueId query_id, const Status reason);
 
@@ -169,7 +166,10 @@ class FragmentMgr : public RestMonitorIface {
     // call _lock, so that there is dead lock.
     std::mutex _lock;
 
-    std::unordered_map<TUniqueId, std::shared_ptr<pipeline::PipelineFragmentContext>> _pipeline_map;
+    // (QueryID, FragmentID) -> PipelineFragmentContext
+    std::unordered_map<std::pair<TUniqueId, int>,
+                       std::shared_ptr<pipeline::PipelineFragmentContext>>
+            _pipeline_map;
 
     // query id -> QueryContext
     std::unordered_map<TUniqueId, std::shared_ptr<QueryContext>> _query_ctx_map;
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 7a6d6d3c53d49e..afc86f404cb8e0 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -195,8 +195,6 @@ class QueryContext {
 
     ThreadPool* get_memtable_flush_pool();
 
-    std::vector<TUniqueId> get_fragment_instance_ids() const { return fragment_instance_ids; }
-
     int64_t mem_limit() const { return _bytes_limit; }
 
     void set_merge_controller_handler(
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index fddf81b434bea7..a4631cfaba7929 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -228,7 +228,6 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
     // so we need to copy to cnt_val
     cnt_val->producer_size = producer_size;
     cnt_val->runtime_filter_desc = *runtime_filter_desc;
-    cnt_val->target_info = *target_info;
     cnt_val->pool.reset(new ObjectPool());
     cnt_val->filter = cnt_val->pool->add(new IRuntimeFilter(_state, runtime_filter_desc));
 
@@ -458,10 +457,17 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
         }
         closure->cntl_->set_timeout_ms(std::min(3600, _state->execution_timeout) * 1000);
         // set fragment-id
-        for (auto& target_fragment_instance_id : target.target_fragment_instance_ids) {
-            PUniqueId* cur_id = closure->request_->add_fragment_instance_ids();
-            cur_id->set_hi(target_fragment_instance_id.hi);
-            cur_id->set_lo(target_fragment_instance_id.lo);
+        if (target.__isset.target_fragment_ids) {
+            for (auto& target_fragment_id : target.target_fragment_ids) {
+                closure->request_->add_fragment_ids(target_fragment_id);
+            }
+        } else {
+            // FE not upgraded yet.
+            for (auto& target_fragment_instance_id : target.target_fragment_instance_ids) {
+                PUniqueId* cur_id = closure->request_->add_fragment_instance_ids();
+                cur_id->set_hi(target_fragment_instance_id.hi);
+                cur_id->set_lo(target_fragment_instance_id.lo);
+            }
         }
 
         std::shared_ptr<PBackendService_Stub> stub(
diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h
index d89a3b9f1b1768..b0aea7568cff65 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -168,7 +168,6 @@ class RuntimeFilterMergeControllerEntity {
         int producer_size;
         uint64_t global_size;
         TRuntimeFilterDesc runtime_filter_desc;
-        std::vector<doris::TRuntimeFilterTargetParams> target_info;
         std::vector<doris::TRuntimeFilterTargetParamsV2> targetv2_info;
         IRuntimeFilter* filter = nullptr;
         std::unordered_set<UniqueId> arrive_id;
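The __isset branch added to merge() above is the rolling-upgrade path: an upgraded BE can still be handed targets planned by an older FE that fills only the required target_fragment_instance_ids list, so the sender falls back to per-instance addressing. Below is a minimal sketch of that dispatch using simplified stand-ins for the generated Thrift/protobuf types; the *Lite structs are illustrative, not real Doris classes.

#include <cstdint>
#include <iostream>
#include <vector>

struct TUniqueIdLite {
    int64_t hi = 0;
    int64_t lo = 0;
};

// Simplified stand-in for the Thrift TRuntimeFilterTargetParamsV2 struct.
struct TargetParamsV2Lite {
    std::vector<TUniqueIdLite> target_fragment_instance_ids; // required, always set
    std::vector<int32_t> target_fragment_ids;                // optional, new in this patch
    struct {
        bool target_fragment_ids = false;
    } __isset; // Thrift generates presence flags like this for optional fields
};

// Simplified stand-in for the PPublishFilterRequestV2 protobuf message.
struct PublishRequestLite {
    std::vector<int32_t> fragment_ids;
    std::vector<TUniqueIdLite> fragment_instance_ids;
};

// Prefer fragment-id addressing; fall back to instance ids so an upgraded
// BE keeps publishing filters to targets planned by an older FE.
void fill_publish_request(const TargetParamsV2Lite& target, PublishRequestLite* request) {
    if (target.__isset.target_fragment_ids) {
        for (int32_t fragment_id : target.target_fragment_ids) {
            request->fragment_ids.push_back(fragment_id);
        }
    } else {
        // FE not upgraded yet: keep the legacy per-instance addressing.
        for (const auto& instance_id : target.target_fragment_instance_ids) {
            request->fragment_instance_ids.push_back(instance_id);
        }
    }
}

int main() {
    TargetParamsV2Lite target;
    target.__isset.target_fragment_ids = true;
    target.target_fragment_ids = {0, 1};

    PublishRequestLite request;
    fill_publish_request(target, &request);
    std::cout << "fragment ids sent: " << request.fragment_ids.size() << "\n";
    return 0;
}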
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index aa29661da02208..d56aa49b19b1cf 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -657,13 +657,6 @@ Status BaseBackendService::start_plan_fragment_execution(
                                                QuerySource::INTERNAL_FRONTEND);
 }
 
-void BaseBackendService::cancel_plan_fragment(TCancelPlanFragmentResult& return_val,
-                                              const TCancelPlanFragmentParams& params) {
-    LOG(INFO) << "cancel_plan_fragment(): instance_id=" << print_id(params.fragment_instance_id);
-    _exec_env->fragment_mgr()->cancel_instance(
-            params.fragment_instance_id, Status::InternalError("cancel message received from FE"));
-}
-
 void BaseBackendService::transmit_data(TTransmitDataResult& return_val,
                                        const TTransmitDataParams& params) {
     VLOG_ROW << "transmit_data(): instance_id=" << params.dest_fragment_instance_id
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index 4d01107ba8a832..1d4219e21917b8 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -90,7 +90,7 @@ class BaseBackendService : public BackendServiceIf {
                                              const TExecPlanFragmentParams& params) override;
 
     void cancel_plan_fragment(TCancelPlanFragmentResult& return_val,
-                              const TCancelPlanFragmentParams& params) override;
+                              const TCancelPlanFragmentParams& params) override {};
 
     void transmit_data(TTransmitDataResult& return_val, const TTransmitDataParams& params) override;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 8f558726bffe96..b9f90242a29bc8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1995,7 +1995,8 @@ private void assignRuntimeFilterAddr() throws Exception {
                 List<FRuntimeFilterTargetParam> targetFragments = ridToTargetParam.computeIfAbsent(rid, k -> new ArrayList<>());
                 for (final FInstanceExecParam instance : params.instanceExecParams) {
-                    targetFragments.add(new FRuntimeFilterTargetParam(instance.instanceId, toBrpcHost(instance.host)));
+                    targetFragments.add(new FRuntimeFilterTargetParam(instance.fragment().getFragmentId().asInt(),
+                            toBrpcHost(instance.host)));
                 }
             }
@@ -3179,8 +3180,8 @@ Map<TNetworkAddress, TPipelineFragmentParams> toThrift(int backendNum) {
                     for (FRuntimeFilterTargetParam targetParam : fParams) {
                         if (targetParamsV2.containsKey(targetParam.targetFragmentInstanceAddr)) {
                             targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
-                                    .target_fragment_instance_ids
-                                    .add(targetParam.targetFragmentInstanceId);
+                                    .target_fragment_ids
+                                    .add(targetParam.targetFragmentId);
                         } else {
                             targetParamsV2.put(targetParam.targetFragmentInstanceAddr,
                                     new TRuntimeFilterTargetParamsV2());
@@ -3188,11 +3189,15 @@ Map<TNetworkAddress, TPipelineFragmentParams> toThrift(int backendNum) {
                                     .target_fragment_instance_addr = targetParam.targetFragmentInstanceAddr;
                             targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
-                                    .target_fragment_instance_ids
+                                    .target_fragment_ids = new ArrayList<>();
+                            targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
+                                    .target_fragment_ids
+                                    .add(targetParam.targetFragmentId);
+                            // `target_fragment_instance_ids` is a required field
                             targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
                                     .target_fragment_instance_ids
-                                    .add(targetParam.targetFragmentInstanceId);
+                                    = new ArrayList<>();
                         }
                     }
@@ -3201,7 +3206,8 @@ Map<TNetworkAddress, TPipelineFragmentParams> toThrift(int backendNum) {
                 } else {
                     List<TRuntimeFilterTargetParams> targetParams = Lists.newArrayList();
                     for (FRuntimeFilterTargetParam targetParam : fParams) {
-                        targetParams.add(new TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId,
+                        // Instance id make no sense if this runtime filter doesn't have remote targets.
+                        targetParams.add(new TRuntimeFilterTargetParams(new TUniqueId(),
                                 targetParam.targetFragmentInstanceAddr));
                     }
                     localParams.runtime_filter_params.putToRidToTargetParam(rf.getFilterId().asInt(),
@@ -3371,12 +3377,12 @@ private void updateProfileIfPresent(Consumer<SummaryProfile> profileAction) {
 
     // Runtime filter target fragment instance param
     static class FRuntimeFilterTargetParam {
-        public TUniqueId targetFragmentInstanceId;
+        public int targetFragmentId;
 
         public TNetworkAddress targetFragmentInstanceAddr;
 
-        public FRuntimeFilterTargetParam(TUniqueId id, TNetworkAddress host) {
-            this.targetFragmentInstanceId = id;
+        public FRuntimeFilterTargetParam(int id, TNetworkAddress host) {
+            this.targetFragmentId = id;
             this.targetFragmentInstanceAddr = host;
         }
     }
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 9abf9d7ea65036..f3764cea233806 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -613,6 +613,7 @@ message PPublishFilterRequestV2 {
     optional int64 merge_time = 9;
     optional bool contain_null = 10;
     optional bool ignored = 11;
+    repeated int32 fragment_ids = 12;
 };
 
 message PPublishFilterResponse {
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 0a3496ca434e11..5570019ee2b4ba 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -368,6 +368,7 @@ struct TRuntimeFilterTargetParamsV2 {
   1: required list<Types.TUniqueId> target_fragment_instance_ids
   // The address of the instance where the fragment is expected to run
   2: required Types.TNetworkAddress target_fragment_instance_addr
+  3: optional list<i32> target_fragment_ids
 }
 
 struct TRuntimeFilterParams {
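Both wire changes are additive, which is what keeps the fallback above safe during a rolling upgrade: the new protobuf field takes a previously unused tag (12) and the new Thrift field is optional, so a peer built before this patch never sets them and ignores them when received. One way a receiver can tell an old sender from a new one is to check which repeated field is populated; the sketch below shows only that idea, with illustrative stand-in types rather than the generated classes (the real apply_filterv2 shown earlier iterates request->fragment_ids() directly).

#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

// Stand-in for the generated PPublishFilterRequestV2; the real class exposes
// its repeated fields through fragment_ids() / fragment_instance_ids() accessors.
struct PublishRequestLite {
    std::vector<int32_t> fragment_ids;                              // new field 12
    std::vector<std::pair<int64_t, int64_t>> fragment_instance_ids; // legacy field
};

void apply_filter(const PublishRequestLite& request) {
    if (!request.fragment_ids.empty()) {
        // New sender: look contexts up by {query_id, fragment_id}.
        for (int32_t fragment_id : request.fragment_ids) {
            std::cout << "lookup fragment " << fragment_id << "\n";
        }
    } else {
        // Old sender: only the per-instance list is filled.
        std::cout << "legacy sender: " << request.fragment_instance_ids.size()
                  << " instance ids\n";
    }
}

int main() {
    PublishRequestLite request;
    request.fragment_ids = {0, 3};
    apply_filter(request);
    return 0;
}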