[Enhancement] Support PredicateTree for parquet page index (#53848)
Signed-off-by: Smith Cruise <[email protected]>
Smith-Cruise authored Dec 13, 2024
1 parent f766cc2 commit ab8abca
Showing 10 changed files with 348 additions and 164 deletions.
9 changes: 9 additions & 0 deletions be/src/formats/parquet/column_reader.h
@@ -161,6 +161,15 @@ class ColumnReader {
return true;
}

    // Returns true if page index filtering happened (row_ranges is filled with the surviving rows).
    // Returns false if no page index filtering happened.
virtual StatusOr<bool> page_index_zone_map_filter(const std::vector<const ColumnPredicate*>& predicates,
SparseRange<uint64_t>* row_ranges, CompoundNodeType pred_relation,
const uint64_t rg_first_row, const uint64_t rg_num_rows) {
DCHECK(row_ranges->empty());
return false;
}

private:
    // _parquet_field is generated by the parquet format, so ParquetField's children order may differ from ColumnReader's children.
const ParquetField* _parquet_field = nullptr;
1 change: 1 addition & 0 deletions be/src/formats/parquet/file_reader.cpp
@@ -508,6 +508,7 @@ Status FileReader::_init_group_readers() {
_group_reader_param.not_existed_slots = &fd_scanner_ctx.not_existed_slots;
// for pageIndex
_group_reader_param.min_max_conjunct_ctxs = fd_scanner_ctx.min_max_conjunct_ctxs;
_group_reader_param.predicate_tree = &fd_scanner_ctx.predicate_tree;

int64_t row_group_first_row = 0;
// select and create row group readers.
30 changes: 24 additions & 6 deletions be/src/formats/parquet/group_reader.cpp
@@ -32,6 +32,7 @@
#include "formats/parquet/page_index_reader.h"
#include "formats/parquet/scalar_column_reader.h"
#include "formats/parquet/schema.h"
#include "formats/parquet/zone_map_filter_evaluator.h"
#include "gutil/strings/substitute.h"
#include "runtime/types.h"
#include "simd/simd.h"
@@ -79,12 +80,29 @@ Status GroupReader::_deal_with_pageindex() {
if (config::parquet_page_index_enable) {
SCOPED_RAW_TIMER(&_param.stats->page_index_ns);
_param.stats->rows_before_page_index += _row_group_metadata->num_rows;
-       auto page_index_reader =
-               std::make_unique<PageIndexReader>(this, _param.file, _column_readers, _row_group_metadata,
-                                                 _param.min_max_conjunct_ctxs, _param.conjunct_ctxs_by_slot);
-       ASSIGN_OR_RETURN(bool flag, page_index_reader->generate_read_range(_range));
-       if (flag && !_is_group_filtered) {
-           page_index_reader->select_column_offset_index();
if (config::parquet_advance_zonemap_filter) {
ASSIGN_OR_RETURN(auto sparse_range, _param.predicate_tree->visit(ZoneMapEvaluator<FilterLevel::PAGE_INDEX>{
*_param.predicate_tree, this}));
if (sparse_range.has_value()) {
if (sparse_range.value().empty()) {
// the whole row group has been filtered
_is_group_filtered = true;
} else if (sparse_range->span_size() < _row_group_metadata->num_rows) {
// some pages have been filtered
_range = sparse_range.value();
for (const auto& pair : _column_readers) {
pair.second->select_offset_index(_range, _row_group_first_row);

shaeqahmed commented on Dec 26, 2024:
@Smith-Cruise Maybe I don't understand correctly, but won't using the combined row ranges from the compound predicate-tree zone-map evaluation (which unions and intersects the ranges across columns based on AND/OR) to select the offset index for each individual column result in more IO than necessary? The pages to read for a column should only be narrowed by conjunctions from other columns' predicates, never widened by a disjunction on a different column. For example, for the compound predicate column_A = "rare" OR column_B = "common", the range/pages to read for A should not be increased because of the inclusion of a non-selective predicate on column B. (A small sketch after this file's hunk illustrates the concern.)
}
}
}
} else {
auto page_index_reader =
std::make_unique<PageIndexReader>(this, _param.file, _column_readers, _row_group_metadata,
_param.min_max_conjunct_ctxs, _param.conjunct_ctxs_by_slot);
ASSIGN_OR_RETURN(bool flag, page_index_reader->generate_read_range(_range));
if (flag && !_is_group_filtered) {
page_index_reader->select_column_offset_index();
}
}
}

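To make the inline review comment above concrete, here is a minimal standalone sketch of how an OR across columns widens the row range that each individual column ends up reading. RowSet, merge, and the literal row numbers are illustrative stand-ins, not the SparseRange<uint64_t> and merge_row_ranges used by this commit.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <iterator>
#include <set>

using RowSet = std::set<uint64_t>;  // stand-in for SparseRange<uint64_t>

RowSet merge(const RowSet& a, const RowSet& b, bool is_and) {
    RowSet out;
    if (is_and) {
        // AND: a row survives only if every column's predicate keeps it.
        std::set_intersection(a.begin(), a.end(), b.begin(), b.end(),
                              std::inserter(out, out.end()));
    } else {
        // OR: a row survives if any column's predicate keeps it.
        std::set_union(a.begin(), a.end(), b.begin(), b.end(),
                       std::inserter(out, out.end()));
    }
    return out;
}

int main() {
    RowSet a = {0, 1, 2, 3};                    // rows surviving column_A = "rare"
    RowSet b = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};  // rows surviving column_B = "common"

    RowSet or_merged = merge(a, b, /*is_and=*/false);
    // Selecting column A's pages with the merged range now covers rows 4..9 too,
    // which is the extra IO the review comment is concerned about.
    std::cout << "rows read for A under OR: " << or_merged.size() << "\n";  // 10
    std::cout << "rows A's own predicate needs: " << a.size() << "\n";      // 4
    return 0;
}

Under AND the merged range can only shrink, so per-column offset-index selection stays tight; under OR it is the union of the per-column ranges, which is what the comment flags as potential extra IO.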
1 change: 1 addition & 0 deletions be/src/formats/parquet/group_reader.h
@@ -94,6 +94,7 @@ struct GroupReaderParam {

// used for pageIndex
std::vector<ExprContext*> min_max_conjunct_ctxs;
const PredicateTree* predicate_tree = nullptr;

// partition column
const std::vector<HdfsScannerContext::ColumnInfo>* partition_columns = nullptr;
119 changes: 102 additions & 17 deletions be/src/formats/parquet/scalar_column_reader.cpp
@@ -15,11 +15,12 @@
#include "formats/parquet/scalar_column_reader.h"

#include "formats/parquet/stored_column_reader_with_index.h"
#include "formats/parquet/utils.h"
#include "formats/parquet/zone_map_filter_evaluator.h"
#include "gutil/casts.h"
#include "io/shared_buffered_input_stream.h"
#include "simd/simd.h"
#include "statistics_helper.h"
#include "utils.h"

namespace starrocks::parquet {

@@ -28,16 +29,20 @@ StatusOr<bool> FixedValueColumnReader::row_group_zone_map_filter(const std::vect
const uint64_t rg_first_row,
const uint64_t rg_num_rows) const {
ZoneMapDetail zone_map{_fixed_value, _fixed_value, _fixed_value.is_null()};
-   auto is_satisfy = [&](const ZoneMapDetail& detail) {
-       if (pred_relation == CompoundNodeType::AND) {
-           return std::ranges::all_of(predicates, [&](const auto* pred) { return pred->zone_map_filter(detail); });
-       } else {
-           return predicates.empty() ||
-                  std::ranges::any_of(predicates, [&](const auto* pred) { return pred->zone_map_filter(detail); });
-       }
-   };
return ZoneMapEvaluatorUtils::is_satisfy(predicates, zone_map, pred_relation);
}

StatusOr<bool> FixedValueColumnReader::page_index_zone_map_filter(const std::vector<const ColumnPredicate*>& predicates,
SparseRange<uint64_t>* row_ranges,
CompoundNodeType pred_relation,
const uint64_t rg_first_row,
const uint64_t rg_num_rows) {
DCHECK(row_ranges->empty());
ZoneMapDetail zone_map{_fixed_value, _fixed_value, _fixed_value.is_null()};

-   return is_satisfy(zone_map);
// is_satisfy = true means no filter happened, return false
// is_satisfy = false means entire row group can be filtered, filter happened, return true
return !ZoneMapEvaluatorUtils::is_satisfy(predicates, zone_map, pred_relation);
}

Status ScalarColumnReader::read_range(const Range<uint64_t>& range, const Filter* filter, ColumnPtr& dst) {
@@ -320,16 +325,96 @@ StatusOr<bool> ScalarColumnReader::row_group_zone_map_filter(const std::vector<c
return true;
}

-   auto is_satisfy = [&](const ZoneMapDetail& detail) {
-       if (pred_relation == CompoundNodeType::AND) {
-           return std::ranges::all_of(predicates, [&](const auto* pred) { return pred->zone_map_filter(detail); });
return ZoneMapEvaluatorUtils::is_satisfy(predicates, zone_map_detail.value(), pred_relation);
}

StatusOr<bool> ScalarColumnReader::page_index_zone_map_filter(const std::vector<const ColumnPredicate*>& predicates,
SparseRange<uint64_t>* row_ranges,
CompoundNodeType pred_relation,
const uint64_t rg_first_row, const uint64_t rg_num_rows) {
DCHECK(row_ranges->empty());
const tparquet::ColumnChunk* chunk_meta = get_chunk_metadata();
if (!chunk_meta->__isset.column_index_offset || !chunk_meta->__isset.offset_index_offset ||
!chunk_meta->__isset.meta_data) {
        // no page index, don't filter
return false;
}

// get column index
int64_t column_index_offset = chunk_meta->column_index_offset;
uint32_t column_index_length = chunk_meta->column_index_length;

std::vector<uint8_t> page_index_data;
page_index_data.reserve(column_index_length);
RETURN_IF_ERROR(_opts.file->read_at_fully(column_index_offset, page_index_data.data(), column_index_length));

tparquet::ColumnIndex column_index;
RETURN_IF_ERROR(deserialize_thrift_msg(page_index_data.data(), &column_index_length, TProtocolType::COMPACT,
&column_index));

ASSIGN_OR_RETURN(const tparquet::OffsetIndex* offset_index, get_offset_index(rg_first_row));

const size_t page_num = column_index.min_values.size();

ColumnPtr min_column = ColumnHelper::create_column(*_col_type, true);
ColumnPtr max_column = ColumnHelper::create_column(*_col_type, true);
// deal with min_values
auto st = StatisticsHelper::decode_value_into_column(min_column, column_index.min_values, *_col_type,
get_column_parquet_field(), _opts.timezone);
if (!st.ok()) {
// swallow error status
LOG(INFO) << "Error when decode min/max statistics, type " << _col_type->debug_string();
return false;
}
// deal with max_values
st = StatisticsHelper::decode_value_into_column(max_column, column_index.max_values, *_col_type,
get_column_parquet_field(), _opts.timezone);
if (!st.ok()) {
// swallow error status
LOG(INFO) << "Error when decode min/max statistics, type " << _col_type->debug_string();
return false;
}

DCHECK_EQ(page_num, min_column->size());
DCHECK_EQ(page_num, max_column->size());

// fill ZoneMapDetail
const std::vector<bool> null_pages = column_index.null_pages;
std::vector<ZoneMapDetail> zone_map_details{};
for (size_t i = 0; i < page_num; i++) {
if (null_pages[i]) {
// all null
zone_map_details.emplace_back(Datum{}, Datum{}, true);
} else {
-           return predicates.empty() ||
-                  std::ranges::any_of(predicates, [&](const auto* pred) { return pred->zone_map_filter(detail); });
bool has_null = column_index.null_counts[i] > 0;
zone_map_details.emplace_back(min_column->get(i), max_column->get(i), has_null);
}
-   };
}

// select all pages by default
Filter page_filter(page_num, 1);
for (size_t i = 0; i < page_num; i++) {
page_filter[i] = ZoneMapEvaluatorUtils::is_satisfy(predicates, zone_map_details[i], pred_relation);
}

-   return is_satisfy(zone_map_detail.value());
if (!SIMD::contain_zero(page_filter)) {
// no page has been filtered
return false;
}

for (int i = 0; i < page_num; i++) {
if (page_filter[i]) {
int64_t first_row = offset_index->page_locations[i].first_row_index + rg_first_row;
int64_t end_row = first_row;
if (i != page_num - 1) {
end_row = offset_index->page_locations[i + 1].first_row_index + rg_first_row;
} else {
end_row = rg_first_row + rg_num_rows;
}
row_ranges->add(Range<uint64_t>(first_row, end_row));
}
}
return true;
}

} // namespace starrocks::parquet
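The tail of page_index_zone_map_filter above converts the per-page keep/drop decisions into row ranges via the offset index. A condensed sketch of that mapping follows; PageLocation and pages_to_row_ranges are hypothetical stand-ins rather than the tparquet::OffsetIndex and SparseRange types used in the commit.

#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

struct PageLocation {
    int64_t first_row_index;  // first row of the page, relative to the row group
};

// Turn per-page keep/drop flags into half-open row ranges, mirroring the loop above:
// each surviving page i covers [first_row_index[i], first_row_index[i + 1]), and the
// last page runs to the end of the row group.
std::vector<std::pair<uint64_t, uint64_t>> pages_to_row_ranges(
        const std::vector<bool>& page_filter,             // true = page survives the zone-map check
        const std::vector<PageLocation>& page_locations,  // one entry per page
        uint64_t rg_first_row, uint64_t rg_num_rows) {
    std::vector<std::pair<uint64_t, uint64_t>> ranges;
    const size_t page_num = page_filter.size();
    for (size_t i = 0; i < page_num; i++) {
        if (!page_filter[i]) continue;
        uint64_t first_row = page_locations[i].first_row_index + rg_first_row;
        uint64_t end_row = (i + 1 < page_num)
                                   ? page_locations[i + 1].first_row_index + rg_first_row
                                   : rg_first_row + rg_num_rows;
        ranges.emplace_back(first_row, end_row);  // [first_row, end_row)
    }
    return ranges;
}

int main() {
    // Three pages starting at rows 0, 100, 200 of a 300-row row group; the middle page is pruned.
    auto ranges = pages_to_row_ranges({true, false, true}, {{0}, {100}, {200}}, 0, 300);
    for (const auto& [begin, end] : ranges) {
        std::cout << "[" << begin << ", " << end << ")\n";  // prints [0, 100) and [200, 300)
    }
    return 0;
}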
8 changes: 8 additions & 0 deletions be/src/formats/parquet/scalar_column_reader.h
@@ -47,6 +47,10 @@ class FixedValueColumnReader final : public ColumnReader {
CompoundNodeType pred_relation, const uint64_t rg_first_row,
const uint64_t rg_num_rows) const override;

StatusOr<bool> page_index_zone_map_filter(const std::vector<const ColumnPredicate*>& predicates,
SparseRange<uint64_t>* row_ranges, CompoundNodeType pred_relation,
const uint64_t rg_first_row, const uint64_t rg_num_rows) override;

private:
const Datum _fixed_value;
};
@@ -121,6 +125,10 @@ class ScalarColumnReader final : public ColumnReader {
CompoundNodeType pred_relation, uint64_t rg_first_row,
uint64_t rg_num_rows) const override;

StatusOr<bool> page_index_zone_map_filter(const std::vector<const ColumnPredicate*>& predicates,
SparseRange<uint64_t>* row_ranges, CompoundNodeType pred_relation,
const uint64_t rg_first_row, const uint64_t rg_num_rows) override;

private:
// Returns true if all of the data pages in the column chunk are dict encoded
bool _column_all_pages_dict_encoded();
24 changes: 22 additions & 2 deletions be/src/formats/parquet/zone_map_filter_evaluator.h
@@ -21,6 +21,19 @@ namespace starrocks::parquet {

enum class FilterLevel { ROW_GROUP = 0, PAGE_INDEX };

class ZoneMapEvaluatorUtils {
public:
static bool is_satisfy(const std::vector<const ColumnPredicate*>& predicates, const ZoneMapDetail& detail,
const CompoundNodeType pred_relation) {
if (pred_relation == CompoundNodeType::AND) {
return std::ranges::all_of(predicates, [&](const auto* pred) { return pred->zone_map_filter(detail); });
} else {
return predicates.empty() ||
std::ranges::any_of(predicates, [&](const auto* pred) { return pred->zone_map_filter(detail); });
}
}
};

template <FilterLevel level>
struct ZoneMapEvaluator {
template <CompoundNodeType Type>
@@ -35,7 +48,7 @@ struct ZoneMapEvaluator {
for (const auto& [cid, col_preds] : cid_to_col_preds) {
SparseRange<uint64_t> cur_row_ranges;

-           const auto* column_reader = group_reader->get_column_reader(cid);
auto* column_reader = group_reader->get_column_reader(cid);

if (column_reader == nullptr) {
// ColumnReader not found, select all by default
@@ -47,7 +60,14 @@
cur_row_ranges.add({rg_first_row, rg_first_row + rg_num_rows});
}
} else {
-               return Status::InternalError("not supported yet");
ASSIGN_OR_RETURN(bool has_filtered,
column_reader->page_index_zone_map_filter(
col_preds, &cur_row_ranges, Type, group_reader->get_row_group_first_row(),
group_reader->get_row_group_metadata()->num_rows));
if (!has_filtered) {
// no filter happened, select the whole row group by default
cur_row_ranges.add({rg_first_row, rg_first_row + rg_num_rows});
}
}

merge_row_ranges<Type>(row_ranges, cur_row_ranges);
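For reference, the AND/OR semantics that the new ZoneMapEvaluatorUtils::is_satisfy helper factors out can be read as a small self-contained example. Pred, ZoneMap, and the bool is_and flag below are simplified stand-ins for ColumnPredicate, ZoneMapDetail, and CompoundNodeType, not the real StarRocks types.

#include <algorithm>
#include <functional>
#include <iostream>
#include <vector>

struct ZoneMap {
    int min;
    int max;
};
// A predicate returns true when the zone map may contain matching rows.
using Pred = std::function<bool(const ZoneMap&)>;

bool is_satisfy(const std::vector<Pred>& preds, const ZoneMap& zm, bool is_and) {
    if (is_and) {
        // AND: every predicate must admit the zone map (vacuously true for an empty list).
        return std::all_of(preds.begin(), preds.end(), [&](const Pred& p) { return p(zm); });
    }
    // OR: at least one predicate must admit the zone map; an empty list cannot
    // prove the zone map irrelevant, so the page/row group is kept.
    return preds.empty() ||
           std::any_of(preds.begin(), preds.end(), [&](const Pred& p) { return p(zm); });
}

int main() {
    ZoneMap page{/*min=*/0, /*max=*/100};
    Pred gt_200 = [](const ZoneMap& z) { return z.max > 200; };  // x > 200
    Pred lt_50 = [](const ZoneMap& z) { return z.min < 50; };    // x < 50

    std::cout << is_satisfy({gt_200, lt_50}, page, /*is_and=*/true) << "\n";   // 0: page pruned under AND
    std::cout << is_satisfy({gt_200, lt_50}, page, /*is_and=*/false) << "\n";  // 1: page kept under OR
    return 0;
}

The conservative behaviour matters at both filter levels: a zone map is only discarded when the predicate combination proves it cannot contain matches.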
1 change: 1 addition & 0 deletions be/src/storage/rowset/segment_iterator.cpp
@@ -45,6 +45,7 @@
#include "storage/index/vector/vector_index_reader_factory.h"
#include "storage/index/vector/vector_search_option.h"
#include "storage/lake/update_manager.h"
#include "storage/olap_runtime_range_pruner.h"
#include "storage/olap_runtime_range_pruner.hpp"
#include "storage/projection_iterator.h"
#include "storage/range.h"
2 changes: 2 additions & 0 deletions be/test/exec/hdfs_scanner_test.cpp
@@ -2720,6 +2720,7 @@ TEST_F(HdfsScannerTest, TestMinMaxFilterWhenContainsComplexTypes) {
ExprContext* ctx = create_expr_context(&_pool, nodes);
param->min_max_conjunct_ctxs.push_back(ctx);
param->scanner_conjunct_ctxs.push_back(ctx);
param->all_conjunct_ctxs.push_back(ctx);
}
{
std::vector<TExprNode> nodes;
@@ -2729,6 +2730,7 @@
ExprContext* ctx = create_expr_context(&_pool, nodes);
param->min_max_conjunct_ctxs.push_back(ctx);
param->scanner_conjunct_ctxs.push_back(ctx);
param->all_conjunct_ctxs.push_back(ctx);
}

ASSERT_OK(Expr::prepare(param->min_max_conjunct_ctxs, _runtime_state));