Skip to content

Commit

Permalink
[BugFix] Revert [Enhancement] make connector scan operator profile less skew (backport #53465) (#53592)
Browse files Browse the repository at this point in the history
  • Loading branch information
before-Sunrise authored and meegoo committed Dec 10, 2024
1 parent 479b908 commit c07f7b6
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 24 deletions.
4 changes: 2 additions & 2 deletions be/src/connector/lake_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ Status LakeDataSource::init_column_access_paths(Schema* schema) {
_params.column_access_paths = &_column_access_paths;

// update counter
COUNTER_UPDATE(_pushdown_access_paths_counter, leaf_size);
COUNTER_SET(_pushdown_access_paths_counter, leaf_size);
return Status::OK();
}

Expand Down Expand Up @@ -647,7 +647,7 @@ void LakeDataSource::update_counter() {
COUNTER_UPDATE(_segments_read_count, _reader->stats().segments_read_count);
COUNTER_UPDATE(_total_columns_data_page_count, _reader->stats().total_columns_data_page_count);

COUNTER_UPDATE(_pushdown_predicates_counter, (int64_t)_params.pred_tree.size());
COUNTER_SET(_pushdown_predicates_counter, (int64_t)_params.pred_tree.size());

StarRocksMetrics::instance()->query_scan_bytes.increment(_bytes_read);
StarRocksMetrics::instance()->query_scan_rows.increment(_raw_rows_read);
Expand Down
8 changes: 3 additions & 5 deletions be/src/exec/pipeline/scan/connector_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,9 @@ ChunkSourcePtr ConnectorScanOperator::create_chunk_source(MorselPtr morsel, int3
}
}

// Only use one chunk source profile, so we can see metrics on scan operator level.
// Since there is adaptive io tasks feature, chunk sources will be used unevenly,
// which leads to sort of "skewed" profile and makes harder to analysis.
return std::make_shared<ConnectorChunkSource>(this, _chunk_source_profiles[0].get(), std::move(morsel), scan_node,
factory->get_chunk_buffer(), _enable_adaptive_io_tasks);
return std::make_shared<ConnectorChunkSource>(this, _chunk_source_profiles[chunk_source_index].get(),
std::move(morsel), scan_node, factory->get_chunk_buffer(),
_enable_adaptive_io_tasks);
}

void ConnectorScanOperator::attach_chunk_source(int32_t source_index) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/pipeline/scan/olap_chunk_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ Status OlapChunkSource::_init_column_access_paths(Schema* schema) {
_params.column_access_paths = &_column_access_paths;

// update counter
COUNTER_UPDATE(_pushdown_access_paths_counter, leaf_size);
COUNTER_SET(_pushdown_access_paths_counter, leaf_size);
return Status::OK();
}

Expand Down Expand Up @@ -626,7 +626,7 @@ void OlapChunkSource::_update_counter() {
COUNTER_UPDATE(_segments_read_count, _reader->stats().segments_read_count);
COUNTER_UPDATE(_total_columns_data_page_count, _reader->stats().total_columns_data_page_count);

COUNTER_UPDATE(_pushdown_predicates_counter, (int64_t)_params.pred_tree.size());
COUNTER_SET(_pushdown_predicates_counter, (int64_t)_params.pred_tree.size());

StarRocksMetrics::instance()->query_scan_bytes.increment(_scan_bytes);
StarRocksMetrics::instance()->query_scan_rows.increment(_scan_rows_num);
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/pipeline/scan/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ void OlapScanOperator::do_close(RuntimeState* state) {}

ChunkSourcePtr OlapScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) {
auto* olap_scan_node = down_cast<OlapScanNode*>(_scan_node);
return std::make_shared<OlapChunkSource>(this, _chunk_source_profiles[0].get(), std::move(morsel), olap_scan_node,
_ctx.get());
return std::make_shared<OlapChunkSource>(this, _chunk_source_profiles[chunk_source_index].get(), std::move(morsel),
olap_scan_node, _ctx.get());
}

int64_t OlapScanOperator::get_scan_table_id() const {
Expand Down
18 changes: 12 additions & 6 deletions be/src/exec/pipeline/scan/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,14 @@ void ScanOperator::close(RuntimeState* state) {

_default_buffer_capacity_counter = ADD_COUNTER_SKIP_MERGE(_unique_metrics, "DefaultChunkBufferCapacity",
TUnit::UNIT, TCounterMergeType::SKIP_ALL);
COUNTER_UPDATE(_default_buffer_capacity_counter, static_cast<int64_t>(default_buffer_capacity()));
COUNTER_SET(_default_buffer_capacity_counter, static_cast<int64_t>(default_buffer_capacity()));
_buffer_capacity_counter =
ADD_COUNTER_SKIP_MERGE(_unique_metrics, "ChunkBufferCapacity", TUnit::UNIT, TCounterMergeType::SKIP_ALL);
COUNTER_UPDATE(_buffer_capacity_counter, static_cast<int64_t>(buffer_capacity()));
COUNTER_SET(_buffer_capacity_counter, static_cast<int64_t>(buffer_capacity()));

_tablets_counter =
ADD_COUNTER_SKIP_MERGE(_unique_metrics, "TabletCount", TUnit::UNIT, TCounterMergeType::SKIP_FIRST_MERGE);
COUNTER_UPDATE(_tablets_counter, static_cast<int64_t>(_source_factory()->num_total_original_morsels()));
COUNTER_SET(_tablets_counter, static_cast<int64_t>(_source_factory()->num_total_original_morsels()));

_merge_chunk_source_profiles(state);

Expand Down Expand Up @@ -420,8 +420,8 @@ Status ScanOperator::_trigger_next_scan(RuntimeState* state, int chunk_source_in
#endif

DeferOp timer_defer([chunk_source]() {
COUNTER_UPDATE(chunk_source->scan_timer(), chunk_source->io_task_wait_timer()->value() +
chunk_source->io_task_exec_timer()->value());
COUNTER_SET(chunk_source->scan_timer(),
chunk_source->io_task_wait_timer()->value() + chunk_source->io_task_exec_timer()->value());
});
COUNTER_UPDATE(chunk_source->io_task_wait_timer(), MonotonicNanos() - io_task_start_nano);
SCOPED_TIMER(chunk_source->io_task_exec_timer());
Expand Down Expand Up @@ -582,8 +582,14 @@ void ScanOperator::_merge_chunk_source_profiles(RuntimeState* state) {
if (!query_ctx->enable_profile()) {
return;
}
std::vector<RuntimeProfile*> profiles(_chunk_source_profiles.size());
for (auto i = 0; i < _chunk_source_profiles.size(); i++) {
profiles[i] = _chunk_source_profiles[i].get();
}

ObjectPool obj_pool;
RuntimeProfile* merged_profile = RuntimeProfile::merge_isomorphic_profiles(&obj_pool, profiles, false);

RuntimeProfile* merged_profile = _chunk_source_profiles[0].get();
_unique_metrics->copy_all_info_strings_from(merged_profile);
_unique_metrics->copy_all_counters_from(merged_profile);

Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/pipeline/scan/schema_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ Status SchemaScanOperator::do_prepare(RuntimeState* state) {
void SchemaScanOperator::do_close(RuntimeState* state) {}

ChunkSourcePtr SchemaScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) {
return std::make_shared<SchemaChunkSource>(this, _chunk_source_profiles[0].get(), std::move(morsel), _ctx);
return std::make_shared<SchemaChunkSource>(this, _chunk_source_profiles[chunk_source_index].get(),
std::move(morsel), _ctx);
}

ChunkPtr SchemaScanOperator::get_chunk_from_buffer() {
Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/stream/scan/stream_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ Status StreamScanOperator::_pickup_morsel(RuntimeState* state, int chunk_source_
ChunkSourcePtr StreamScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) {
auto* scan_node = down_cast<ConnectorScanNode*>(_scan_node);
auto* factory = down_cast<StreamScanOperatorFactory*>(_factory);
return std::make_shared<StreamChunkSource>(this, _chunk_source_profiles[0].get(), std::move(morsel), scan_node,
factory->get_chunk_buffer(), enable_adaptive_io_tasks());
return std::make_shared<StreamChunkSource>(this, _chunk_source_profiles[chunk_source_index].get(),
std::move(morsel), scan_node, factory->get_chunk_buffer(),
enable_adaptive_io_tasks());
}

bool StreamScanOperator::is_finished() const {
Expand Down
8 changes: 4 additions & 4 deletions be/src/exec/tablet_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ void TabletScanner::update_counter() {
COUNTER_UPDATE(_parent->_segments_read_count, _reader->stats().segments_read_count);
COUNTER_UPDATE(_parent->_total_columns_data_page_count, _reader->stats().total_columns_data_page_count);

COUNTER_UPDATE(_parent->_pushdown_predicates_counter, (int64_t)_params.pred_tree.size());
COUNTER_SET(_parent->_pushdown_predicates_counter, (int64_t)_params.pred_tree.size());

StarRocksMetrics::instance()->query_scan_bytes.increment(_compressed_bytes_read);
StarRocksMetrics::instance()->query_scan_rows.increment(_raw_rows_read);
Expand All @@ -409,21 +409,21 @@ void TabletScanner::update_counter() {

for (auto& [k, v] : _reader->stats().flat_json_hits) {
RuntimeProfile::Counter* path_counter = ADD_COUNTER(path_profile, k, TUnit::UNIT);
COUNTER_UPDATE(path_counter, v);
COUNTER_SET(path_counter, v);
}
}
if (_reader->stats().dynamic_json_hits.size() > 0) {
auto path_profile = _parent->_scan_profile->create_child("FlatJsonUnhits");
for (auto& [k, v] : _reader->stats().dynamic_json_hits) {
RuntimeProfile::Counter* path_counter = ADD_COUNTER(path_profile, k, TUnit::UNIT);
COUNTER_UPDATE(path_counter, v);
COUNTER_SET(path_counter, v);
}
}
if (_reader->stats().merge_json_hits.size() > 0) {
auto path_profile = _parent->_scan_profile->create_child("MergeJsonUnhits");
for (auto& [k, v] : _reader->stats().merge_json_hits) {
RuntimeProfile::Counter* path_counter = ADD_COUNTER(path_profile, k, TUnit::UNIT);
COUNTER_UPDATE(path_counter, v);
COUNTER_SET(path_counter, v);
}
}
if (_reader->stats().json_init_ns > 0) {
Expand Down

0 comments on commit c07f7b6

Please sign in to comment.