diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index c983af0fb3ea71..606df9fe980571 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -975,11 +975,10 @@ class RuntimePredicateWrapper { Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, const TQueryOptions* query_options, const RuntimeFilterRole role, - int node_id, std::shared_ptr* res, - bool build_bf_exactly) { + int node_id, std::shared_ptr* res) { *res = std::make_shared(state, desc); (*res)->set_role(role); - return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly); + return (*res)->init_with_desc(desc, query_options, node_id); } RuntimeFilterContextSPtr& IRuntimeFilter::get_shared_context_ref() { @@ -1348,7 +1347,7 @@ std::string IRuntimeFilter::formatted_state() const { } Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, - int node_id, bool build_bf_exactly) { + int node_id) { // if node_id == -1 , it shouldn't be a consumer DCHECK(node_id >= 0 || (node_id == -1 && !is_consumer())); @@ -1370,21 +1369,10 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue params.runtime_bloom_filter_max_size = options->__isset.runtime_bloom_filter_max_size ? options->runtime_bloom_filter_max_size : 0; - auto sync_filter_size = desc->__isset.sync_filter_size && desc->sync_filter_size; - // We build runtime filter by exact distinct count if all of 3 conditions are met: - // 1. Only 1 join key - // 2. Bloom filter - // 3. Size of all bloom filters will be same (size will be sync or this is a broadcast join). - params.build_bf_exactly = - build_bf_exactly && (_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER || - _runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER); + params.build_bf_exactly = desc->__isset.build_bf_exactly && desc->build_bf_exactly; params.bloom_filter_size_calculated_by_ndv = desc->bloom_filter_size_calculated_by_ndv; - if (!sync_filter_size) { - params.build_bf_exactly &= !_is_broadcast_join; - } - if (desc->__isset.bloom_filter_size_bytes) { params.bloom_filter_size = desc->bloom_filter_size_bytes; } diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 63ef3e2dbd4653..49232236179669 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -213,8 +213,7 @@ class IRuntimeFilter { static Status create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, const TQueryOptions* query_options, const RuntimeFilterRole role, - int node_id, std::shared_ptr* res, - bool build_bf_exactly = false); + int node_id, std::shared_ptr* res); RuntimeFilterContextSPtr& get_shared_context_ref(); @@ -260,7 +259,7 @@ class IRuntimeFilter { // init filter with desc Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, - int node_id = -1, bool build_bf_exactly = false); + int node_id = -1); // serialize _wrapper to protobuf Status serialize(PMergeFilterRequest* request, void** data, int* len); diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 864927e1721923..e4c67b24193593 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -91,8 +91,8 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo RETURN_IF_ERROR(_hash_table_init(state)); _runtime_filters.resize(p._runtime_filter_descs.size()); for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) { - RETURN_IF_ERROR(state->register_producer_runtime_filter( - p._runtime_filter_descs[i], &_runtime_filters[i], _build_expr_ctxs.size() == 1)); + RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i], + &_runtime_filters[i])); } _runtime_filter_slots = diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp index 9e3e8a08ca83a5..35b9de619f393d 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp @@ -67,7 +67,7 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta _runtime_filters.resize(p._runtime_filter_descs.size()); for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) { RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i], - &_runtime_filters[i], false)); + &_runtime_filters[i])); } return Status::OK(); } diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 1e72fa756d3dd3..f96e4152500808 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -269,8 +269,11 @@ void FragmentMgr::stop() { // Only me can delete { - std::lock_guard lock(_lock); + std::unique_lock lock(_query_ctx_map_mutex); _query_ctx_map.clear(); + } + { + std::unique_lock lock(_pipeline_map_mutex); _pipeline_map.clear(); } _thread_pool->shutdown(); @@ -583,11 +586,7 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r TUniqueId query_id; query_id.__set_hi(request->query_id().hi()); query_id.__set_lo(request->query_id().lo()); - std::shared_ptr q_ctx = nullptr; - { - std::lock_guard lock(_lock); - q_ctx = _get_or_erase_query_ctx(query_id); - } + auto q_ctx = get_query_ctx(query_id); if (q_ctx) { q_ctx->set_ready_to_execute(Status::OK()); LOG_INFO("Query {} start execution", print_id(query_id)); @@ -602,114 +601,107 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r void FragmentMgr::remove_pipeline_context( std::shared_ptr f_context) { - { - std::lock_guard lock(_lock); - auto query_id = f_context->get_query_id(); - int64 now = duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - g_fragment_executing_count << -1; - g_fragment_last_active_time.set_value(now); - _pipeline_map.erase({query_id, f_context->get_fragment_id()}); - } + auto query_id = f_context->get_query_id(); + int64 now = duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + g_fragment_executing_count << -1; + g_fragment_last_active_time.set_value(now); + + std::unique_lock lock(_pipeline_map_mutex); + _pipeline_map.erase({query_id, f_context->get_fragment_id()}); } -std::shared_ptr FragmentMgr::_get_or_erase_query_ctx(const TUniqueId& query_id) { +std::shared_ptr FragmentMgr::get_query_ctx(const TUniqueId& query_id) { + std::shared_lock lock(_query_ctx_map_mutex); auto search = _query_ctx_map.find(query_id); if (search != _query_ctx_map.end()) { if (auto q_ctx = search->second.lock()) { return q_ctx; - } else { - LOG(WARNING) << "Query context (query id = " << print_id(query_id) - << ") has been released."; - _query_ctx_map.erase(search); - return nullptr; } } return nullptr; } -std::shared_ptr FragmentMgr::get_or_erase_query_ctx_with_lock( - const TUniqueId& query_id) { - std::unique_lock lock(_lock); - return _get_or_erase_query_ctx(query_id); -} - -template -Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline, - QuerySource query_source, - std::shared_ptr& query_ctx) { +Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& params, + TUniqueId query_id, bool pipeline, + QuerySource query_source, + std::shared_ptr& query_ctx) { DBUG_EXECUTE_IF("FragmentMgr._get_query_ctx.failed", { return Status::InternalError("FragmentMgr._get_query_ctx.failed, query id {}", print_id(query_id)); }); + + // Find _query_ctx_map, in case some other request has already + // create the query fragments context. + query_ctx = get_query_ctx(query_id); if (params.is_simplified_param) { // Get common components from _query_ctx_map - std::lock_guard lock(_lock); - if (auto q_ctx = _get_or_erase_query_ctx(query_id)) { - query_ctx = q_ctx; - } else { + if (!query_ctx) { return Status::InternalError( "Failed to get query fragments context. Query {} may be timeout or be " "cancelled. host: {}", print_id(query_id), BackendOptions::get_localhost()); } } else { - // Find _query_ctx_map, in case some other request has already - // create the query fragments context. - std::lock_guard lock(_lock); - if (auto q_ctx = _get_or_erase_query_ctx(query_id)) { - query_ctx = q_ctx; - return Status::OK(); - } + if (!query_ctx) { + std::unique_lock lock(_query_ctx_map_mutex); + // Only one thread need create query ctx. other thread just get query_ctx in _query_ctx_map. + auto search = _query_ctx_map.find(query_id); + if (search != _query_ctx_map.end()) { + query_ctx = search->second.lock(); + } - // First time a fragment of a query arrived. print logs. - LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " << params.coord - << ", total fragment num on current host: " << params.fragment_num_on_host - << ", fe process uuid: " << params.query_options.fe_process_uuid - << ", query type: " << params.query_options.query_type - << ", report audit fe:" << params.current_connect_fe; - - // This may be a first fragment request of the query. - // Create the query fragments context. - query_ctx = QueryContext::create_shared(query_id, _exec_env, params.query_options, - params.coord, params.is_nereids, - params.current_connect_fe, query_source); - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker); - RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl, - &(query_ctx->desc_tbl))); - // set file scan range params - if (params.__isset.file_scan_params) { - query_ctx->file_scan_range_params_map = params.file_scan_params; - } + if (!query_ctx) { + // First time a fragment of a query arrived. print logs. + LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " << params.coord + << ", total fragment num on current host: " << params.fragment_num_on_host + << ", fe process uuid: " << params.query_options.fe_process_uuid + << ", query type: " << params.query_options.query_type + << ", report audit fe:" << params.current_connect_fe; + + // This may be a first fragment request of the query. + // Create the query fragments context. + query_ctx = QueryContext::create_shared(query_id, _exec_env, params.query_options, + params.coord, params.is_nereids, + params.current_connect_fe, query_source); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker); + RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl, + &(query_ctx->desc_tbl))); + // set file scan range params + if (params.__isset.file_scan_params) { + query_ctx->file_scan_range_params_map = params.file_scan_params; + } - query_ctx->query_globals = params.query_globals; + query_ctx->query_globals = params.query_globals; - if (params.__isset.resource_info) { - query_ctx->user = params.resource_info.user; - query_ctx->group = params.resource_info.group; - query_ctx->set_rsc_info = true; - } + if (params.__isset.resource_info) { + query_ctx->user = params.resource_info.user; + query_ctx->group = params.resource_info.group; + query_ctx->set_rsc_info = true; + } - _set_scan_concurrency(params, query_ctx.get()); - - if (params.__isset.workload_groups && !params.workload_groups.empty()) { - uint64_t tg_id = params.workload_groups[0].id; - WorkloadGroupPtr workload_group_ptr = - _exec_env->workload_group_mgr()->get_task_group_by_id(tg_id); - if (workload_group_ptr != nullptr) { - RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx)); - RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr)); - _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id), - tg_id); - } else { - LOG(WARNING) << "Query/load id: " << print_id(query_ctx->query_id()) - << "can't find its workload group " << tg_id; + _set_scan_concurrency(params, query_ctx.get()); + + if (params.__isset.workload_groups && !params.workload_groups.empty()) { + uint64_t tg_id = params.workload_groups[0].id; + WorkloadGroupPtr workload_group_ptr = + _exec_env->workload_group_mgr()->get_task_group_by_id(tg_id); + if (workload_group_ptr != nullptr) { + RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx)); + RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr)); + _exec_env->runtime_query_statistics_mgr()->set_workload_group_id( + print_id(query_id), tg_id); + } else { + LOG(WARNING) << "Query/load id: " << print_id(query_ctx->query_id()) + << "can't find its workload group " << tg_id; + } + } + // There is some logic in query ctx's dctor, we could not check if exists and delete the + // temp query ctx now. For example, the query id maybe removed from workload group's queryset. + _query_ctx_map.insert({query_id, query_ctx}); } } - // There is some logic in query ctx's dctor, we could not check if exists and delete the - // temp query ctx now. For example, the query id maybe removed from workload group's queryset. - _query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx)); } return Status::OK(); } @@ -723,13 +715,13 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) { fmt::memory_buffer debug_string_buffer; size_t i = 0; { - std::lock_guard lock(_lock); fmt::format_to(debug_string_buffer, "{} pipeline fragment contexts are still running! duration_limit={}\n", _pipeline_map.size(), duration); - timespec now; clock_gettime(CLOCK_MONOTONIC, &now); + + std::shared_lock lock(_pipeline_map_mutex); for (auto& it : _pipeline_map) { auto elapsed = it.second->elapsed_time() / 1000000000.0; if (elapsed < duration) { @@ -748,7 +740,7 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) { } std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) { - if (auto q_ctx = _get_or_erase_query_ctx(query_id)) { + if (auto q_ctx = get_query_ctx(query_id)) { return q_ctx->print_all_pipeline_context(); } else { return fmt::format( @@ -767,7 +759,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, << apache::thrift::ThriftDebugString(params.query_options).c_str(); std::shared_ptr query_ctx; - RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_source, query_ctx)); + RETURN_IF_ERROR( + _get_or_create_query_ctx(params, params.query_id, true, query_source, query_ctx)); SCOPED_ATTACH_TASK(query_ctx.get()); int64_t duration_ns = 0; std::shared_ptr context = @@ -800,16 +793,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, } { - // (query_id, fragment_id) is executed only on one BE, locks _pipeline_map. - std::lock_guard lock(_lock); for (const auto& local_param : params.local_params) { const TUniqueId& fragment_instance_id = local_param.fragment_instance_id; - auto iter = _pipeline_map.find({params.query_id, params.fragment_id}); - if (iter != _pipeline_map.end()) { - return Status::InternalError( - "exec_plan_fragment query_id({}) input duplicated fragment_id({})", - print_id(params.query_id), params.fragment_id); - } query_ctx->fragment_instance_ids.push_back(fragment_instance_id); } @@ -818,7 +803,15 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, .count(); g_fragment_executing_count << 1; g_fragment_last_active_time.set_value(now); - // TODO: simplify this mapping + + // (query_id, fragment_id) is executed only on one BE, locks _pipeline_map. + std::unique_lock lock(_pipeline_map_mutex); + auto iter = _pipeline_map.find({params.query_id, params.fragment_id}); + if (iter != _pipeline_map.end()) { + return Status::InternalError( + "exec_plan_fragment query_id({}) input duplicated fragment_id({})", + print_id(params.query_id), params.fragment_id); + } _pipeline_map.insert({{params.query_id, params.fragment_id}, context}); } @@ -848,8 +841,7 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) { std::shared_ptr query_ctx = nullptr; std::vector all_instance_ids; { - std::lock_guard state_lock(_lock); - if (auto q_ctx = _get_or_erase_query_ctx(query_id)) { + if (auto q_ctx = get_query_ctx(query_id)) { query_ctx = q_ctx; // Copy instanceids to avoid concurrent modification. // And to reduce the scope of lock. @@ -862,7 +854,7 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) { } query_ctx->cancel(reason); { - std::lock_guard state_lock(_lock); + std::unique_lock l(_query_ctx_map_mutex); _query_ctx_map.erase(query_id); } LOG(INFO) << "Query " << print_id(query_id) @@ -898,7 +890,7 @@ void FragmentMgr::cancel_worker() { std::vector> ctx; { - std::lock_guard lock(_lock); + std::shared_lock lock(_pipeline_map_mutex); ctx.reserve(_pipeline_map.size()); for (auto& pipeline_itr : _pipeline_map) { ctx.push_back(pipeline_itr.second); @@ -910,29 +902,34 @@ void FragmentMgr::cancel_worker() { std::unordered_map, BrpcItem> brpc_stub_with_queries; { - std::lock_guard lock(_lock); - for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) { - if (auto q_ctx = it->second.lock()) { - if (q_ctx->is_timeout(now)) { - LOG_WARNING("Query {} is timeout", print_id(it->first)); - queries_timeout.push_back(it->first); - } else if (config::enable_brpc_connection_check) { - auto brpc_stubs = q_ctx->get_using_brpc_stubs(); - for (auto& item : brpc_stubs) { - if (!brpc_stub_with_queries.contains(item.second)) { - brpc_stub_with_queries.emplace(item.second, - BrpcItem {item.first, {q_ctx}}); - } else { - brpc_stub_with_queries[item.second].queries.emplace_back(q_ctx); + { + // TODO: Now only the cancel worker do the GC the _query_ctx_map. each query must + // do erase the finish query unless in _query_ctx_map. Rethink the logic is ok + std::unique_lock lock(_query_ctx_map_mutex); + for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) { + if (auto q_ctx = it->second.lock()) { + if (q_ctx->is_timeout(now)) { + LOG_WARNING("Query {} is timeout", print_id(it->first)); + queries_timeout.push_back(it->first); + } else if (config::enable_brpc_connection_check) { + auto brpc_stubs = q_ctx->get_using_brpc_stubs(); + for (auto& item : brpc_stubs) { + if (!brpc_stub_with_queries.contains(item.second)) { + brpc_stub_with_queries.emplace(item.second, + BrpcItem {item.first, {q_ctx}}); + } else { + brpc_stub_with_queries[item.second].queries.emplace_back(q_ctx); + } } } + ++it; + } else { + it = _query_ctx_map.erase(it); } - ++it; - } else { - it = _query_ctx_map.erase(it); } } + std::shared_lock lock(_query_ctx_map_mutex); // We use a very conservative cancel strategy. // 0. If there are no running frontends, do not cancel any queries. // 1. If query's process uuid is zero, do not cancel @@ -1215,7 +1212,7 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, const auto& fragment_ids = request->fragment_ids(); { - std::unique_lock lock(_lock); + std::shared_lock lock(_pipeline_map_mutex); for (auto fragment_id : fragment_ids) { auto iter = _pipeline_map.find({UniqueId(request->query_id()).to_thrift(), fragment_id}); @@ -1267,8 +1264,7 @@ Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) { TUniqueId query_id; query_id.__set_hi(queryid.hi); query_id.__set_lo(queryid.lo); - std::lock_guard lock(_lock); - if (auto q_ctx = _get_or_erase_query_ctx(query_id)) { + if (auto q_ctx = get_query_ctx(query_id)) { query_ctx = q_ctx; } else { return Status::EndOfFile( @@ -1291,8 +1287,7 @@ Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) { TUniqueId query_id; query_id.__set_hi(queryid.hi); query_id.__set_lo(queryid.lo); - std::lock_guard lock(_lock); - if (auto q_ctx = _get_or_erase_query_ctx(query_id)) { + if (auto q_ctx = get_query_ctx(query_id)) { query_ctx = q_ctx; } else { return Status::EndOfFile( @@ -1312,8 +1307,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, TUniqueId query_id; query_id.__set_hi(queryid.hi); query_id.__set_lo(queryid.lo); - std::lock_guard lock(_lock); - if (auto q_ctx = _get_or_erase_query_ctx(query_id)) { + if (auto q_ctx = get_query_ctx(query_id)) { query_ctx = q_ctx; } else { return Status::EndOfFile( @@ -1330,7 +1324,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, void FragmentMgr::get_runtime_query_info(std::vector* query_info_list) { { - std::lock_guard lock(_lock); + std::unique_lock lock(_query_ctx_map_mutex); for (auto iter = _query_ctx_map.begin(); iter != _query_ctx_map.end();) { if (auto q_ctx = iter->second.lock()) { WorkloadQueryInfo workload_query_info; @@ -1353,19 +1347,9 @@ Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id, return Status::InvalidArgument("exes_status is nullptr"); } - std::shared_ptr query_context = nullptr; - - { - std::lock_guard lock(_lock); - if (auto q_ctx = _get_or_erase_query_ctx(query_id)) { - query_context = q_ctx; - } else { - return Status::NotFound("Query {} has been released", print_id(query_id)); - } - } - + std::shared_ptr query_context = get_query_ctx(query_id); if (query_context == nullptr) { - return Status::NotFound("Query {} not found", print_id(query_id)); + return Status::NotFound("Query {} not found or released", print_id(query_id)); } *exec_status = query_context->get_realtime_exec_status(); diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 0eac0469683961..63d666788d0a5f 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -133,7 +133,7 @@ class FragmentMgr : public RestMonitorIface { ThreadPool* get_thread_pool() { return _thread_pool.get(); } int32_t running_query_num() { - std::unique_lock ctx_lock(_lock); + std::shared_lock lock(_query_ctx_map_mutex); return _query_ctx_map.size(); } @@ -145,7 +145,7 @@ class FragmentMgr : public RestMonitorIface { Status get_realtime_exec_status(const TUniqueId& query_id, TReportExecStatusParams* exec_status); - std::shared_ptr get_or_erase_query_ctx_with_lock(const TUniqueId& query_id); + std::shared_ptr get_query_ctx(const TUniqueId& query_id); private: struct BrpcItem { @@ -153,14 +153,12 @@ class FragmentMgr : public RestMonitorIface { std::vector> queries; }; - std::shared_ptr _get_or_erase_query_ctx(const TUniqueId& query_id); - template void _set_scan_concurrency(const Param& params, QueryContext* query_ctx); - template - Status _get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline, - QuerySource query_type, std::shared_ptr& query_ctx); + Status _get_or_create_query_ctx(const TPipelineFragmentParams& params, TUniqueId query_id, + bool pipeline, QuerySource query_type, + std::shared_ptr& query_ctx); void _check_brpc_available(const std::shared_ptr& brpc_stub, const BrpcItem& brpc_item); @@ -168,20 +166,21 @@ class FragmentMgr : public RestMonitorIface { // This is input params ExecEnv* _exec_env = nullptr; + // The lock protect the `_pipeline_map` + std::shared_mutex _pipeline_map_mutex; + // (QueryID, FragmentID) -> PipelineFragmentContext + phmap::flat_hash_map, + std::shared_ptr> + _pipeline_map; + // The lock should only be used to protect the structures in fragment manager. Has to be // used in a very small scope because it may dead lock. For example, if the _lock is used // in prepare stage, the call path is prepare --> expr prepare --> may call allocator // when allocate failed, allocator may call query_is_cancelled, query is callced will also // call _lock, so that there is dead lock. - std::mutex _lock; - - // (QueryID, FragmentID) -> PipelineFragmentContext - std::unordered_map, - std::shared_ptr> - _pipeline_map; - + std::shared_mutex _query_ctx_map_mutex; // query id -> QueryContext - std::unordered_map> _query_ctx_map; + phmap::flat_hash_map> _query_ctx_map; std::unordered_map> _bf_size_map; CountDownLatch _stop_background_threads_latch; diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index 0cb313747b0373..dd426f1ab81d3e 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -45,8 +45,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig _backend_id(backend_id), _enable_profile(enable_profile) { std::shared_ptr query_context = - ExecEnv::GetInstance()->fragment_mgr()->get_or_erase_query_ctx_with_lock( - _load_id.to_thrift()); + ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(_load_id.to_thrift()); std::shared_ptr mem_tracker = nullptr; WorkloadGroupPtr wg_ptr = nullptr; diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 752e2ff95b2917..60da45fa685fbf 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -428,7 +428,7 @@ LoadStream::LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool e TUniqueId load_tid = ((UniqueId)load_id).to_thrift(); #ifndef BE_TEST std::shared_ptr query_context = - ExecEnv::GetInstance()->fragment_mgr()->get_or_erase_query_ctx_with_lock(load_tid); + ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(load_tid); if (query_context != nullptr) { _query_thread_context = {load_tid, query_context->query_mem_tracker, query_context->workload_group()}; diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index bb100fcbb42ec5..c16db7c67d3420 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -90,7 +90,7 @@ std::vector> RuntimeFilterMgr::get_consume_filte Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, int node_id, std::shared_ptr* consumer_filter, - bool build_bf_exactly, bool need_local_merge) { + bool need_local_merge) { SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; bool has_exist = false; @@ -110,7 +110,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc if (!has_exist) { std::shared_ptr filter; RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options, RuntimeFilterRole::CONSUMER, - node_id, &filter, build_bf_exactly)); + node_id, &filter)); _consumer_map[key].emplace_back(node_id, filter); *consumer_filter = filter; } else if (!need_local_merge) { @@ -122,7 +122,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc Status RuntimeFilterMgr::register_local_merge_producer_filter( const doris::TRuntimeFilterDesc& desc, const doris::TQueryOptions& options, - std::shared_ptr producer_filter, bool build_bf_exactly) { + std::shared_ptr producer_filter) { DCHECK(_is_global); SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; @@ -143,8 +143,7 @@ Status RuntimeFilterMgr::register_local_merge_producer_filter( if (iter->second.filters.empty()) { std::shared_ptr merge_filter; RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options, - RuntimeFilterRole::PRODUCER, -1, &merge_filter, - build_bf_exactly)); + RuntimeFilterRole::PRODUCER, -1, &merge_filter)); merge_filter->set_ignored(); iter->second.filters.emplace_back(merge_filter); } @@ -181,10 +180,9 @@ doris::LocalMergeFilters* RuntimeFilterMgr::get_local_merge_producer_filters(int return &iter->second; } -Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc, - const TQueryOptions& options, - std::shared_ptr* producer_filter, - bool build_bf_exactly) { +Status RuntimeFilterMgr::register_producer_filter( + const TRuntimeFilterDesc& desc, const TQueryOptions& options, + std::shared_ptr* producer_filter) { DCHECK(!_is_global); SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; @@ -196,7 +194,7 @@ Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc return Status::InvalidArgument("filter has registed"); } RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options, RuntimeFilterRole::PRODUCER, -1, - producer_filter, build_bf_exactly)); + producer_filter)); _producer_map.emplace(key, *producer_filter); return Status::OK(); } @@ -233,8 +231,8 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( cnt_val->filter = cnt_val->pool->add(new IRuntimeFilter(_state, runtime_filter_desc)); auto filter_id = runtime_filter_desc->filter_id; - RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options, - -1, false)); + RETURN_IF_ERROR( + cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options, -1)); cnt_val->filter->set_ignored(); _filter_map.emplace(filter_id, cnt_val); return Status::OK(); diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index 0a6f8318feaba0..9f4cf5f4e22a07 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -100,19 +100,17 @@ class RuntimeFilterMgr { // register filter Status register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, int node_id, std::shared_ptr* consumer_filter, - bool build_bf_exactly = false, bool need_local_merge = false); + bool need_local_merge = false); Status register_local_merge_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, - std::shared_ptr producer_filter, - bool build_bf_exactly = false); + std::shared_ptr producer_filter); Status get_local_merge_producer_filters(int filter_id, LocalMergeFilters** local_merge_filters); LocalMergeFilters* get_local_merge_producer_filters(int filter_id); Status register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, - std::shared_ptr* producer_filter, - bool build_bf_exactly = false); + std::shared_ptr* producer_filter); // update filter by remote void set_runtime_filter_params(const TRuntimeFilterParams& runtime_filter_params); diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 344180bad771ac..579329c1082633 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -295,7 +295,7 @@ Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOpt } std::weak_ptr RuntimeState::get_query_ctx_weak() { - return _exec_env->fragment_mgr()->get_or_erase_query_ctx_with_lock(_query_ctx->query_id()); + return _exec_env->fragment_mgr()->get_query_ctx(_query_ctx->query_id()); } void RuntimeState::init_mem_trackers(const std::string& name, const TUniqueId& id) { @@ -516,14 +516,13 @@ RuntimeFilterMgr* RuntimeState::global_runtime_filter_mgr() { } Status RuntimeState::register_producer_runtime_filter( - const TRuntimeFilterDesc& desc, std::shared_ptr* producer_filter, - bool build_bf_exactly) { + const TRuntimeFilterDesc& desc, std::shared_ptr* producer_filter) { // Producers are created by local runtime filter mgr and shared by global runtime filter manager. // When RF is published, consumers in both global and local RF mgr will be found. - RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter( - desc, query_options(), producer_filter, build_bf_exactly)); + RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(desc, query_options(), + producer_filter)); RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merge_producer_filter( - desc, query_options(), *producer_filter, build_bf_exactly)); + desc, query_options(), *producer_filter)); return Status::OK(); } @@ -532,10 +531,10 @@ Status RuntimeState::register_consumer_runtime_filter( std::shared_ptr* consumer_filter) { if (desc.has_remote_targets || need_local_merge) { return global_runtime_filter_mgr()->register_consumer_filter(desc, query_options(), node_id, - consumer_filter, false, true); + consumer_filter, true); } else { return local_runtime_filter_mgr()->register_consumer_filter(desc, query_options(), node_id, - consumer_filter, false, false); + consumer_filter, false); } } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 0bc81bca4d99a1..ad63510e2af82c 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -561,8 +561,7 @@ class RuntimeState { } Status register_producer_runtime_filter(const doris::TRuntimeFilterDesc& desc, - std::shared_ptr* producer_filter, - bool build_bf_exactly); + std::shared_ptr* producer_filter); Status register_consumer_runtime_filter(const doris::TRuntimeFilterDesc& desc, bool need_local_merge, int node_id, diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h index 32fc9d5efce771..e0ec2bef62fc2a 100644 --- a/be/src/vec/aggregate_functions/aggregate_function.h +++ b/be/src/vec/aggregate_functions/aggregate_function.h @@ -36,6 +36,7 @@ #include "vec/data_types/data_type_string.h" namespace doris::vectorized { +#include "common/compile_check_begin.h" class Arena; class IColumn; @@ -598,3 +599,5 @@ class AggregateFunctionGuard { }; } // namespace doris::vectorized + +#include "common/compile_check_end.h" diff --git a/be/src/vec/aggregate_functions/aggregate_function_approx_count_distinct.cpp b/be/src/vec/aggregate_functions/aggregate_function_approx_count_distinct.cpp index 18662bf66cf38c..8bf6c32c0872de 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_approx_count_distinct.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_approx_count_distinct.cpp @@ -29,6 +29,7 @@ #include "vec/functions/function.h" namespace doris::vectorized { +#include "common/compile_check_begin.h" AggregateFunctionPtr create_aggregate_function_approx_count_distinct( const std::string& name, const DataTypes& argument_types, const bool result_is_nullable, diff --git a/be/src/vec/aggregate_functions/aggregate_function_approx_count_distinct.h b/be/src/vec/aggregate_functions/aggregate_function_approx_count_distinct.h index d267499e059818..3ef22be9fca74c 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_approx_count_distinct.h +++ b/be/src/vec/aggregate_functions/aggregate_function_approx_count_distinct.h @@ -38,6 +38,7 @@ #include "vec/io/io_helper.h" namespace doris { +#include "common/compile_check_begin.h" namespace vectorized { class Arena; class BufferReadable; @@ -64,8 +65,7 @@ struct AggregateFunctionApproxCountDistinctData { void write(BufferWritable& buf) const { std::string result; result.resize(hll_data.max_serialized_size()); - int size = hll_data.serialize((uint8_t*)result.data()); - result.resize(size); + result.resize(hll_data.serialize((uint8_t*)result.data())); write_binary(result, buf); } @@ -136,3 +136,5 @@ class AggregateFunctionApproxCountDistinct final }; } // namespace doris::vectorized + +#include "common/compile_check_end.h" diff --git a/be/src/vec/aggregate_functions/aggregate_function_avg.cpp b/be/src/vec/aggregate_functions/aggregate_function_avg.cpp index 6a6711f90f983e..6109f0b0c601cd 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_avg.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_avg.cpp @@ -25,6 +25,7 @@ #include "vec/core/field.h" namespace doris::vectorized { +#include "common/compile_check_begin.h" template struct Avg { diff --git a/be/src/vec/aggregate_functions/aggregate_function_avg.h b/be/src/vec/aggregate_functions/aggregate_function_avg.h index 62fbb8078ea949..8b24db692aef05 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_avg.h +++ b/be/src/vec/aggregate_functions/aggregate_function_avg.h @@ -41,6 +41,7 @@ #include "vec/io/io_helper.h" namespace doris { +#include "common/compile_check_begin.h" namespace vectorized { class Arena; class BufferReadable; @@ -72,7 +73,8 @@ struct AggregateFunctionAvgData { ResultT result() const { if constexpr (std::is_floating_point_v) { if constexpr (std::numeric_limits::is_iec559) { - return static_cast(sum) / count; /// allow division by zero + return static_cast(sum) / + static_cast(count); /// allow division by zero } } @@ -91,7 +93,7 @@ struct AggregateFunctionAvgData { if constexpr (IsDecimal256) { return static_cast(sum / T(count)); } else { - return static_cast(sum) / count; + return static_cast(sum) / static_cast(count); } } } @@ -124,7 +126,11 @@ class AggregateFunctionAvg final IsDecimalV2, ColumnDecimal, std::conditional_t, ColumnDecimal, ColumnFloat64>>; + // The result calculated by PercentileApprox is an approximate value, + // so the underlying storage uses float. The following calls will involve + // an implicit cast to float. + using DataType = typename Data::ResultType; /// ctor for native types AggregateFunctionAvg(const DataTypes& argument_types_) : IAggregateFunctionDataHelper>(argument_types_), @@ -148,9 +154,9 @@ class AggregateFunctionAvg final const auto& column = assert_cast(*columns[0]); if constexpr (IsDecimalNumber) { - this->data(place).sum += column.get_data()[row_num].value; + this->data(place).sum += (DataType)column.get_data()[row_num].value; } else { - this->data(place).sum += column.get_data()[row_num]; + this->data(place).sum += (DataType)column.get_data()[row_num]; } ++this->data(place).count; } @@ -282,3 +288,5 @@ class AggregateFunctionAvg final }; } // namespace doris::vectorized + +#include "common/compile_check_end.h" diff --git a/be/src/vec/aggregate_functions/aggregate_function_avg_weighted.cpp b/be/src/vec/aggregate_functions/aggregate_function_avg_weighted.cpp index fc5df5303fd15d..70a707b02e992b 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_avg_weighted.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_avg_weighted.cpp @@ -21,6 +21,7 @@ #include "vec/aggregate_functions/helpers.h" namespace doris::vectorized { +#include "common/compile_check_begin.h" void register_aggregate_function_avg_weighted(AggregateFunctionSimpleFactory& factory) { factory.register_function_both("avg_weighted", creator_with_type::creator); diff --git a/be/src/vec/aggregate_functions/aggregate_function_avg_weighted.h b/be/src/vec/aggregate_functions/aggregate_function_avg_weighted.h index b59a3dccf0cea8..d1a5921b45039f 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_avg_weighted.h +++ b/be/src/vec/aggregate_functions/aggregate_function_avg_weighted.h @@ -35,6 +35,7 @@ #include "vec/io/io_helper.h" namespace doris { +#include "common/compile_check_begin.h" namespace vectorized { class Arena; class BufferReadable; @@ -57,7 +58,7 @@ struct AggregateFunctionAvgWeightedData { DecimalV2Value value = binary_cast(data_val); data_sum = data_sum + (double(value) * weight_val); } else { - data_sum = data_sum + (data_val * weight_val); + data_sum = data_sum + (double(data_val) * weight_val); } weight_sum = weight_sum + weight_val; } @@ -138,3 +139,5 @@ class AggregateFunctionAvgWeight final }; } // namespace doris::vectorized + +#include "common/compile_check_end.h" diff --git a/be/src/vec/aggregate_functions/aggregate_function_binary.h b/be/src/vec/aggregate_functions/aggregate_function_binary.h index 9fba9d11a1013a..fd5fc55d253661 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_binary.h +++ b/be/src/vec/aggregate_functions/aggregate_function_binary.h @@ -36,6 +36,7 @@ #include "vec/io/io_helper.h" namespace doris::vectorized { +#include "common/compile_check_begin.h" template typename Moments> struct StatFunc { @@ -127,3 +128,5 @@ AggregateFunctionPtr create_with_two_basic_numeric_types(const DataTypePtr& firs } } // namespace doris::vectorized + +#include "common/compile_check_end.h" diff --git a/be/src/vec/aggregate_functions/aggregate_function_bit.cpp b/be/src/vec/aggregate_functions/aggregate_function_bit.cpp index 97a6c0e92fa723..981ced1fbd5a46 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_bit.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_bit.cpp @@ -24,6 +24,7 @@ #include "vec/aggregate_functions/helpers.h" namespace doris::vectorized { +#include "common/compile_check_begin.h" void register_aggregate_function_bit(AggregateFunctionSimpleFactory& factory) { factory.register_function_both( diff --git a/be/src/vec/aggregate_functions/aggregate_function_bit.h b/be/src/vec/aggregate_functions/aggregate_function_bit.h index 1ab01b03ceea38..d9760fdd30080b 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_bit.h +++ b/be/src/vec/aggregate_functions/aggregate_function_bit.h @@ -30,6 +30,7 @@ #include "vec/io/io_helper.h" namespace doris { +#include "common/compile_check_begin.h" namespace vectorized { class Arena; class BufferReadable; @@ -142,4 +143,5 @@ class AggregateFunctionBitwise final } }; -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized +#include "common/compile_check_end.h" diff --git a/be/src/vec/aggregate_functions/aggregate_function_bitmap.cpp b/be/src/vec/aggregate_functions/aggregate_function_bitmap.cpp index e9c86d4b9556da..47ddf2d81b6a71 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_bitmap.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_bitmap.cpp @@ -23,6 +23,7 @@ #include "vec/data_types/data_type_nullable.h" namespace doris::vectorized { +#include "common/compile_check_begin.h" template class AggregateFunctionTemplate> AggregateFunctionPtr create_with_int_data_type(const DataTypes& argument_type) { @@ -33,7 +34,11 @@ AggregateFunctionPtr create_with_int_data_type(const DataTypes& argument_type) { return std::make_shared>>( \ argument_type); \ } - FOR_INTEGER_TYPES(DISPATCH) + // Keep consistent with the FE definition; the function does not have an int128 type. + DISPATCH(Int8) + DISPATCH(Int16) + DISPATCH(Int32) + DISPATCH(Int64) #undef DISPATCH LOG(WARNING) << "with unknowed type, failed in create_with_int_data_type bitmap_union_int" << " and type is: " << argument_type[0]->get_name(); diff --git a/be/src/vec/aggregate_functions/aggregate_function_bitmap.h b/be/src/vec/aggregate_functions/aggregate_function_bitmap.h index b0619a63e1ffe8..fb17b0a80be092 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_bitmap.h +++ b/be/src/vec/aggregate_functions/aggregate_function_bitmap.h @@ -38,6 +38,7 @@ #include "vec/data_types/data_type_number.h" namespace doris { +#include "common/compile_check_begin.h" namespace vectorized { class Arena; class BufferReadable; @@ -432,4 +433,5 @@ AggregateFunctionPtr create_aggregate_function_bitmap_union(const std::string& n const DataTypes& argument_types, const bool result_is_nullable); -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized +#include "common/compile_check_end.h" diff --git a/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.cpp b/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.cpp index 0b95ddfd46f0d5..2a2c86303f3000 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.cpp @@ -23,6 +23,7 @@ #include "vec/data_types/data_type_nullable.h" namespace doris::vectorized { +#include "common/compile_check_begin.h" template AggregateFunctionPtr create_with_int_data_type(const DataTypes& argument_types) { @@ -32,7 +33,11 @@ AggregateFunctionPtr create_with_int_data_type(const DataTypes& argument_types) if (which.idx == TypeIndex::TYPE) { \ return std::make_shared>(argument_types); \ } - FOR_INTEGER_TYPES(DISPATCH) + // Keep consistent with the FE definition; the function does not have an int128 type. + DISPATCH(Int8) + DISPATCH(Int16) + DISPATCH(Int32) + DISPATCH(Int64) #undef DISPATCH LOG(WARNING) << "with unknown type, failed in create_with_int_data_type bitmap_union_int" << " and type is: " << argument_types[0]->get_name(); diff --git a/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h b/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h index 5747faf1b8e8c1..bff32aa606ccd2 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h +++ b/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h @@ -31,6 +31,7 @@ #include "vec/data_types/data_type_bitmap.h" namespace doris { +#include "common/compile_check_begin.h" namespace vectorized { class Arena; class BufferReadable; @@ -226,4 +227,5 @@ class AggregateFunctionBitmapAgg final } }; -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized +#include "common/compile_check_end.h" diff --git a/be/src/vec/aggregate_functions/aggregate_function_collect.cpp b/be/src/vec/aggregate_functions/aggregate_function_collect.cpp index d726b7c6355318..a4853ff98659a0 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_collect.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_collect.cpp @@ -26,6 +26,7 @@ #include "vec/aggregate_functions/helpers.h" namespace doris::vectorized { +#include "common/compile_check_begin.h" template AggregateFunctionPtr do_create_agg_function_collect(bool distinct, const DataTypes& argument_types, diff --git a/be/src/vec/aggregate_functions/aggregate_function_collect.h b/be/src/vec/aggregate_functions/aggregate_function_collect.h index da310c6e0cc4c2..2d18a56313f3f9 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_collect.h +++ b/be/src/vec/aggregate_functions/aggregate_function_collect.h @@ -46,6 +46,7 @@ #include "vec/io/var_int.h" namespace doris { +#include "common/compile_check_begin.h" namespace vectorized { class Arena; } // namespace vectorized @@ -836,3 +837,5 @@ class AggregateFunctionCollect }; } // namespace doris::vectorized + +#include "common/compile_check_end.h" diff --git a/be/src/vec/aggregate_functions/aggregate_function_combinator.h b/be/src/vec/aggregate_functions/aggregate_function_combinator.h index 1593d74ed4e59d..0908ac8d0278f1 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_combinator.h +++ b/be/src/vec/aggregate_functions/aggregate_function_combinator.h @@ -26,6 +26,7 @@ #include "vec/data_types/data_type.h" namespace doris::vectorized { +#include "common/compile_check_begin.h" /** Aggregate function combinator allows to take one aggregate function * and transform it to another aggregate function. @@ -69,3 +70,5 @@ class IAggregateFunctionCombinator { }; } // namespace doris::vectorized + +#include "common/compile_check_end.h" diff --git a/be/src/vec/aggregate_functions/aggregate_function_corr.cpp b/be/src/vec/aggregate_functions/aggregate_function_corr.cpp index cdaab6e086f4a5..e0a51ca6629a06 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_corr.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_corr.cpp @@ -21,6 +21,7 @@ #include "vec/core/types.h" namespace doris::vectorized { +#include "common/compile_check_begin.h" template struct CorrMoment { diff --git a/be/src/vec/aggregate_functions/aggregate_function_count.cpp b/be/src/vec/aggregate_functions/aggregate_function_count.cpp index 5cfe5af41982f6..72d12cf65fe9d0 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_count.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_count.cpp @@ -26,6 +26,7 @@ #include "vec/aggregate_functions/factory_helpers.h" namespace doris::vectorized { +#include "common/compile_check_begin.h" AggregateFunctionPtr create_aggregate_function_count(const std::string& name, const DataTypes& argument_types, diff --git a/be/src/vec/aggregate_functions/aggregate_function_count.h b/be/src/vec/aggregate_functions/aggregate_function_count.h index 7b54d074683b04..630994a7967957 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_count.h +++ b/be/src/vec/aggregate_functions/aggregate_function_count.h @@ -41,6 +41,7 @@ #include "vec/io/var_int.h" namespace doris { +#include "common/compile_check_begin.h" namespace vectorized { class Arena; class BufferReadable; @@ -321,3 +322,5 @@ class AggregateFunctionCountNotNullUnary final }; } // namespace doris::vectorized + +#include "common/compile_check_end.h" diff --git a/be/src/vec/aggregate_functions/aggregate_function_count_by_enum.cpp b/be/src/vec/aggregate_functions/aggregate_function_count_by_enum.cpp index 093b31d57db554..20235d9e2ef2e9 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_count_by_enum.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_count_by_enum.cpp @@ -26,6 +26,7 @@ #include "vec/core/types.h" namespace doris::vectorized { +#include "common/compile_check_begin.h" AggregateFunctionPtr create_aggregate_function_count_by_enum(const std::string& name, const DataTypes& argument_types, diff --git a/be/src/vec/aggregate_functions/aggregate_function_count_by_enum.h b/be/src/vec/aggregate_functions/aggregate_function_count_by_enum.h index 1f5093de68263e..543ae55f872da6 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_count_by_enum.h +++ b/be/src/vec/aggregate_functions/aggregate_function_count_by_enum.h @@ -32,6 +32,7 @@ #include "vec/io/io_helper.h" namespace doris::vectorized { +#include "common/compile_check_begin.h" struct CountByEnumData { std::unordered_map cbe; @@ -46,8 +47,7 @@ void build_json_from_vec(rapidjson::StringBuffer& buffer, doc.SetArray(); rapidjson::Document::AllocatorType& allocator = doc.GetAllocator(); - int vec_size_number = data_vec.size(); - for (int idx = 0; idx < vec_size_number; ++idx) { + for (size_t idx = 0; idx < data_vec.size(); ++idx) { rapidjson::Value obj(rapidjson::kObjectType); rapidjson::Value obj_cbe(rapidjson::kObjectType); @@ -239,4 +239,5 @@ class AggregateFunctionCountByEnum final size_t arg_count; }; -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized +#include "common/compile_check_end.h" diff --git a/be/src/vec/aggregate_functions/aggregate_function_covar.cpp b/be/src/vec/aggregate_functions/aggregate_function_covar.cpp index 4c5fe1321952d6..d9c091fb601868 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_covar.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_covar.cpp @@ -28,6 +28,7 @@ #include "vec/data_types/data_type_nullable.h" namespace doris::vectorized { +#include "common/compile_check_begin.h" template