diff --git a/be/src/connector/lake_connector.cpp b/be/src/connector/lake_connector.cpp index 28e4ec55d26c9..cd119a755904b 100644 --- a/be/src/connector/lake_connector.cpp +++ b/be/src/connector/lake_connector.cpp @@ -396,7 +396,7 @@ Status LakeDataSource::init_column_access_paths(Schema* schema) { _params.column_access_paths = &_column_access_paths; // update counter - COUNTER_UPDATE(_pushdown_access_paths_counter, leaf_size); + COUNTER_SET(_pushdown_access_paths_counter, leaf_size); return Status::OK(); } @@ -647,7 +647,7 @@ void LakeDataSource::update_counter() { COUNTER_UPDATE(_segments_read_count, _reader->stats().segments_read_count); COUNTER_UPDATE(_total_columns_data_page_count, _reader->stats().total_columns_data_page_count); - COUNTER_UPDATE(_pushdown_predicates_counter, (int64_t)_params.pred_tree.size()); + COUNTER_SET(_pushdown_predicates_counter, (int64_t)_params.pred_tree.size()); StarRocksMetrics::instance()->query_scan_bytes.increment(_bytes_read); StarRocksMetrics::instance()->query_scan_rows.increment(_raw_rows_read); diff --git a/be/src/exec/pipeline/scan/connector_scan_operator.cpp b/be/src/exec/pipeline/scan/connector_scan_operator.cpp index ef50bb44624c5..0fe2522bf7f0a 100644 --- a/be/src/exec/pipeline/scan/connector_scan_operator.cpp +++ b/be/src/exec/pipeline/scan/connector_scan_operator.cpp @@ -302,11 +302,9 @@ ChunkSourcePtr ConnectorScanOperator::create_chunk_source(MorselPtr morsel, int3 } } - // Only use one chunk source profile, so we can see metrics on scan operator level. - // Since there is adaptive io tasks feature, chunk sources will be used unevenly, - // which leads to sort of "skewed" profile and makes harder to analysis. - return std::make_shared(this, _chunk_source_profiles[0].get(), std::move(morsel), scan_node, - factory->get_chunk_buffer(), _enable_adaptive_io_tasks); + return std::make_shared(this, _chunk_source_profiles[chunk_source_index].get(), + std::move(morsel), scan_node, factory->get_chunk_buffer(), + _enable_adaptive_io_tasks); } void ConnectorScanOperator::attach_chunk_source(int32_t source_index) { diff --git a/be/src/exec/pipeline/scan/olap_chunk_source.cpp b/be/src/exec/pipeline/scan/olap_chunk_source.cpp index a9f6aa9fe125f..119834707b24f 100644 --- a/be/src/exec/pipeline/scan/olap_chunk_source.cpp +++ b/be/src/exec/pipeline/scan/olap_chunk_source.cpp @@ -341,7 +341,7 @@ Status OlapChunkSource::_init_column_access_paths(Schema* schema) { _params.column_access_paths = &_column_access_paths; // update counter - COUNTER_UPDATE(_pushdown_access_paths_counter, leaf_size); + COUNTER_SET(_pushdown_access_paths_counter, leaf_size); return Status::OK(); } @@ -626,7 +626,7 @@ void OlapChunkSource::_update_counter() { COUNTER_UPDATE(_segments_read_count, _reader->stats().segments_read_count); COUNTER_UPDATE(_total_columns_data_page_count, _reader->stats().total_columns_data_page_count); - COUNTER_UPDATE(_pushdown_predicates_counter, (int64_t)_params.pred_tree.size()); + COUNTER_SET(_pushdown_predicates_counter, (int64_t)_params.pred_tree.size()); StarRocksMetrics::instance()->query_scan_bytes.increment(_scan_bytes); StarRocksMetrics::instance()->query_scan_rows.increment(_scan_rows_num); diff --git a/be/src/exec/pipeline/scan/olap_scan_operator.cpp b/be/src/exec/pipeline/scan/olap_scan_operator.cpp index 1ccaade51ea97..ca6654cf0de6e 100644 --- a/be/src/exec/pipeline/scan/olap_scan_operator.cpp +++ b/be/src/exec/pipeline/scan/olap_scan_operator.cpp @@ -102,8 +102,8 @@ void OlapScanOperator::do_close(RuntimeState* state) {} ChunkSourcePtr OlapScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) { auto* olap_scan_node = down_cast(_scan_node); - return std::make_shared(this, _chunk_source_profiles[0].get(), std::move(morsel), olap_scan_node, - _ctx.get()); + return std::make_shared(this, _chunk_source_profiles[chunk_source_index].get(), std::move(morsel), + olap_scan_node, _ctx.get()); } int64_t OlapScanOperator::get_scan_table_id() const { diff --git a/be/src/exec/pipeline/scan/scan_operator.cpp b/be/src/exec/pipeline/scan/scan_operator.cpp index 6a29fdfd16708..b6829190b39c3 100644 --- a/be/src/exec/pipeline/scan/scan_operator.cpp +++ b/be/src/exec/pipeline/scan/scan_operator.cpp @@ -103,14 +103,14 @@ void ScanOperator::close(RuntimeState* state) { _default_buffer_capacity_counter = ADD_COUNTER_SKIP_MERGE(_unique_metrics, "DefaultChunkBufferCapacity", TUnit::UNIT, TCounterMergeType::SKIP_ALL); - COUNTER_UPDATE(_default_buffer_capacity_counter, static_cast(default_buffer_capacity())); + COUNTER_SET(_default_buffer_capacity_counter, static_cast(default_buffer_capacity())); _buffer_capacity_counter = ADD_COUNTER_SKIP_MERGE(_unique_metrics, "ChunkBufferCapacity", TUnit::UNIT, TCounterMergeType::SKIP_ALL); - COUNTER_UPDATE(_buffer_capacity_counter, static_cast(buffer_capacity())); + COUNTER_SET(_buffer_capacity_counter, static_cast(buffer_capacity())); _tablets_counter = ADD_COUNTER_SKIP_MERGE(_unique_metrics, "TabletCount", TUnit::UNIT, TCounterMergeType::SKIP_FIRST_MERGE); - COUNTER_UPDATE(_tablets_counter, static_cast(_source_factory()->num_total_original_morsels())); + COUNTER_SET(_tablets_counter, static_cast(_source_factory()->num_total_original_morsels())); _merge_chunk_source_profiles(state); @@ -420,8 +420,8 @@ Status ScanOperator::_trigger_next_scan(RuntimeState* state, int chunk_source_in #endif DeferOp timer_defer([chunk_source]() { - COUNTER_UPDATE(chunk_source->scan_timer(), chunk_source->io_task_wait_timer()->value() + - chunk_source->io_task_exec_timer()->value()); + COUNTER_SET(chunk_source->scan_timer(), + chunk_source->io_task_wait_timer()->value() + chunk_source->io_task_exec_timer()->value()); }); COUNTER_UPDATE(chunk_source->io_task_wait_timer(), MonotonicNanos() - io_task_start_nano); SCOPED_TIMER(chunk_source->io_task_exec_timer()); @@ -582,8 +582,14 @@ void ScanOperator::_merge_chunk_source_profiles(RuntimeState* state) { if (!query_ctx->enable_profile()) { return; } + std::vector profiles(_chunk_source_profiles.size()); + for (auto i = 0; i < _chunk_source_profiles.size(); i++) { + profiles[i] = _chunk_source_profiles[i].get(); + } + + ObjectPool obj_pool; + RuntimeProfile* merged_profile = RuntimeProfile::merge_isomorphic_profiles(&obj_pool, profiles, false); - RuntimeProfile* merged_profile = _chunk_source_profiles[0].get(); _unique_metrics->copy_all_info_strings_from(merged_profile); _unique_metrics->copy_all_counters_from(merged_profile); diff --git a/be/src/exec/pipeline/scan/schema_scan_operator.cpp b/be/src/exec/pipeline/scan/schema_scan_operator.cpp index f740b4d9733c2..a5490b06ceb16 100644 --- a/be/src/exec/pipeline/scan/schema_scan_operator.cpp +++ b/be/src/exec/pipeline/scan/schema_scan_operator.cpp @@ -54,7 +54,8 @@ Status SchemaScanOperator::do_prepare(RuntimeState* state) { void SchemaScanOperator::do_close(RuntimeState* state) {} ChunkSourcePtr SchemaScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) { - return std::make_shared(this, _chunk_source_profiles[0].get(), std::move(morsel), _ctx); + return std::make_shared(this, _chunk_source_profiles[chunk_source_index].get(), + std::move(morsel), _ctx); } ChunkPtr SchemaScanOperator::get_chunk_from_buffer() { diff --git a/be/src/exec/stream/scan/stream_scan_operator.cpp b/be/src/exec/stream/scan/stream_scan_operator.cpp index 5c7780e18f73a..aa7fa32691fe1 100644 --- a/be/src/exec/stream/scan/stream_scan_operator.cpp +++ b/be/src/exec/stream/scan/stream_scan_operator.cpp @@ -118,8 +118,9 @@ Status StreamScanOperator::_pickup_morsel(RuntimeState* state, int chunk_source_ ChunkSourcePtr StreamScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) { auto* scan_node = down_cast(_scan_node); auto* factory = down_cast(_factory); - return std::make_shared(this, _chunk_source_profiles[0].get(), std::move(morsel), scan_node, - factory->get_chunk_buffer(), enable_adaptive_io_tasks()); + return std::make_shared(this, _chunk_source_profiles[chunk_source_index].get(), + std::move(morsel), scan_node, factory->get_chunk_buffer(), + enable_adaptive_io_tasks()); } bool StreamScanOperator::is_finished() const { diff --git a/be/src/exec/tablet_scanner.cpp b/be/src/exec/tablet_scanner.cpp index 2e6074c564ab6..1a72a87d6e8ae 100644 --- a/be/src/exec/tablet_scanner.cpp +++ b/be/src/exec/tablet_scanner.cpp @@ -385,7 +385,7 @@ void TabletScanner::update_counter() { COUNTER_UPDATE(_parent->_segments_read_count, _reader->stats().segments_read_count); COUNTER_UPDATE(_parent->_total_columns_data_page_count, _reader->stats().total_columns_data_page_count); - COUNTER_UPDATE(_parent->_pushdown_predicates_counter, (int64_t)_params.pred_tree.size()); + COUNTER_SET(_parent->_pushdown_predicates_counter, (int64_t)_params.pred_tree.size()); StarRocksMetrics::instance()->query_scan_bytes.increment(_compressed_bytes_read); StarRocksMetrics::instance()->query_scan_rows.increment(_raw_rows_read); @@ -409,21 +409,21 @@ void TabletScanner::update_counter() { for (auto& [k, v] : _reader->stats().flat_json_hits) { RuntimeProfile::Counter* path_counter = ADD_COUNTER(path_profile, k, TUnit::UNIT); - COUNTER_UPDATE(path_counter, v); + COUNTER_SET(path_counter, v); } } if (_reader->stats().dynamic_json_hits.size() > 0) { auto path_profile = _parent->_scan_profile->create_child("FlatJsonUnhits"); for (auto& [k, v] : _reader->stats().dynamic_json_hits) { RuntimeProfile::Counter* path_counter = ADD_COUNTER(path_profile, k, TUnit::UNIT); - COUNTER_UPDATE(path_counter, v); + COUNTER_SET(path_counter, v); } } if (_reader->stats().merge_json_hits.size() > 0) { auto path_profile = _parent->_scan_profile->create_child("MergeJsonUnhits"); for (auto& [k, v] : _reader->stats().merge_json_hits) { RuntimeProfile::Counter* path_counter = ADD_COUNTER(path_profile, k, TUnit::UNIT); - COUNTER_UPDATE(path_counter, v); + COUNTER_SET(path_counter, v); } } if (_reader->stats().json_init_ns > 0) {