Skip to content

Commit

Permalink
apacheGH-39064: [C++][Parquet] Support row group filtering for nested…
Browse files Browse the repository at this point in the history
… paths for struct fields (apache#39065)

### Rationale for this change

Previously, when filtering with a nested field reference, we took the Parquet SchemaField corresponding to only the first index of the nested path, i.e. the parent node in the Parquet schema. But logically, filtering on statistics only works for a primitive leaf node.

This PR changes that logic to iterate over all indices of the FieldPath, if nested, so that the statistics are read from the actual corresponding child leaf node of the Parquet schema.

### Are there any user-facing changes?

No, this only improves performance by doing the filtering at the row group stage instead of afterwards on the data that has been read.

* Closes: apache#39064

Authored-by: Joris Van den Bossche <[email protected]>
Signed-off-by: Joris Van den Bossche <[email protected]>
  • Loading branch information
jorisvandenbossche authored and zanmato1984 committed Feb 28, 2024
1 parent 89aae31 commit 28387ed
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 10 deletions.
39 changes: 29 additions & 10 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ bool IsNan(const Scalar& value) {
}

std::optional<compute::Expression> ColumnChunkStatisticsAsExpression(
const SchemaField& schema_field, const parquet::RowGroupMetaData& metadata) {
const FieldRef& field_ref, const SchemaField& schema_field,
const parquet::RowGroupMetaData& metadata) {
// For the remainder of this function, failures to extract/parse statistics
// are ignored by returning std::nullopt. The goal is twofold. First,
// avoid an optimization which breaks the computation. Second, allow the
Expand All @@ -180,7 +181,8 @@ std::optional<compute::Expression> ColumnChunkStatisticsAsExpression(
return std::nullopt;
}

return ParquetFileFragment::EvaluateStatisticsAsExpression(*field, *statistics);
return ParquetFileFragment::EvaluateStatisticsAsExpression(*field, field_ref,
*statistics);
}

void AddColumnIndices(const SchemaField& schema_field,
Expand Down Expand Up @@ -360,8 +362,9 @@ Result<bool> IsSupportedParquetFile(const ParquetFileFormat& format,
} // namespace

std::optional<compute::Expression> ParquetFileFragment::EvaluateStatisticsAsExpression(
const Field& field, const parquet::Statistics& statistics) {
auto field_expr = compute::field_ref(field.name());
const Field& field, const FieldRef& field_ref,
const parquet::Statistics& statistics) {
auto field_expr = compute::field_ref(field_ref);

// Optimize for corner case where all values are nulls
if (statistics.num_values() == 0 && statistics.null_count() > 0) {
Expand Down Expand Up @@ -418,6 +421,13 @@ std::optional<compute::Expression> ParquetFileFragment::EvaluateStatisticsAsExpr
return std::nullopt;
}

// Convenience overload that refers to the column by the field's own name.
//
// Wraps field.name() in a FieldRef and forwards to the
// (field, field_ref, statistics) overload, returning its result unchanged.
std::optional<compute::Expression> ParquetFileFragment::EvaluateStatisticsAsExpression(
    const Field& field, const parquet::Statistics& statistics) {
  // Build the FieldRef straight from the name. Routing the name through a
  // const local and std::move would not move anything (a const object cannot
  // be moved from) and would only add an extra string copy.
  return EvaluateStatisticsAsExpression(field, FieldRef(field.name()), statistics);
}

// Default constructor: initializes FileFormat with a newly allocated,
// default-constructed ParquetFragmentScanOptions held by shared_ptr.
ParquetFileFormat::ParquetFileFormat()
: FileFormat(std::make_shared<ParquetFragmentScanOptions>()) {}

Expand Down Expand Up @@ -810,7 +820,7 @@ Status ParquetFileFragment::SetMetadata(
manifest_ = std::move(manifest);

statistics_expressions_.resize(row_groups_->size(), compute::literal(true));
statistics_expressions_complete_.resize(physical_schema_->num_fields(), false);
statistics_expressions_complete_.resize(manifest_->descr->num_columns(), false);

for (int row_group : *row_groups_) {
// Ensure RowGroups are indexing valid RowGroups before augmenting.
Expand Down Expand Up @@ -900,16 +910,25 @@ Result<std::vector<compute::Expression>> ParquetFileFragment::TestRowGroups(
ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(*physical_schema_));

if (match.empty()) continue;
if (statistics_expressions_complete_[match[0]]) continue;
statistics_expressions_complete_[match[0]] = true;
const SchemaField* schema_field = &manifest_->schema_fields[match[0]];

for (size_t i = 1; i < match.indices().size(); ++i) {
if (schema_field->field->type()->id() != Type::STRUCT) {
return Status::Invalid("nested paths only supported for structs");
}
schema_field = &schema_field->children[match[i]];
}

if (!schema_field->is_leaf()) continue;
if (statistics_expressions_complete_[schema_field->column_index]) continue;
statistics_expressions_complete_[schema_field->column_index] = true;

const SchemaField& schema_field = manifest_->schema_fields[match[0]];
int i = 0;
for (int row_group : *row_groups_) {
auto row_group_metadata = metadata_->RowGroup(row_group);

if (auto minmax =
ColumnChunkStatisticsAsExpression(schema_field, *row_group_metadata)) {
if (auto minmax = ColumnChunkStatisticsAsExpression(ref, *schema_field,
*row_group_metadata)) {
FoldingAnd(&statistics_expressions_[i], std::move(*minmax));
ARROW_ASSIGN_OR_RAISE(statistics_expressions_[i],
statistics_expressions_[i].Bind(*physical_schema_));
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
static std::optional<compute::Expression> EvaluateStatisticsAsExpression(
const Field& field, const parquet::Statistics& statistics);

static std::optional<compute::Expression> EvaluateStatisticsAsExpression(
const Field& field, const FieldRef& field_ref,
const parquet::Statistics& statistics);

private:
ParquetFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
compute::Expression partition_expression,
Expand Down Expand Up @@ -207,7 +211,11 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
/// or std::nullopt if all row groups are selected.
std::optional<std::vector<int>> row_groups_;

// the expressions (combined for all columns for which statistics have been
// processed) are stored per row group
std::vector<compute::Expression> statistics_expressions_;
// statistics status is tracked per Parquet schema column index
// (i.e. not per Arrow schema field index)
std::vector<bool> statistics_expressions_complete_;
std::shared_ptr<parquet::FileMetaData> metadata_;
std::shared_ptr<parquet::arrow::SchemaManifest> manifest_;
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,12 @@ TEST_P(TestParquetFileFormatScan, PredicatePushdownRowGroupFragments) {
CountRowGroupsInFragment(fragment, {5, 6},
and_(greater_equal(field_ref("i64"), literal(6)),
less(field_ref("i64"), literal(8))));

// nested field reference
CountRowGroupsInFragment(fragment, {0, 1, 2, 3, 4},
less(field_ref(FieldRef("struct", "i32")), literal(6)));
CountRowGroupsInFragment(fragment, {1},
equal(field_ref(FieldRef("struct", "str")), literal("2")));
}

TEST_P(TestParquetFileFormatScan, ExplicitRowGroupSelection) {
Expand Down
36 changes: 36 additions & 0 deletions python/pyarrow/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1648,6 +1648,42 @@ def test_fragments_parquet_subset_invalid(tempdir):
fragment.subset()


@pytest.mark.parquet
def test_fragments_parquet_subset_with_nested_fields(tempdir):
    # ensure row group filtering with nested field works
    # Build one struct column "col" with a primitive child "f1" and a nested
    # struct child "f2", which itself holds the primitives "f21" and "f22".
    f1 = pa.array([0, 1, 2, 3])
    f21 = pa.array([0.1, 0.2, 0.3, 0.4])
    f22 = pa.array([1, 2, 3, 4])
    f2 = pa.StructArray.from_arrays([f21, f22], names=["f21", "f22"])
    struct_col = pa.StructArray.from_arrays([f1, f2], names=["f1", "f2"])
    table = pa.table({"col": struct_col})
    # Four rows written with row_group_size=2; the assertion below checks that
    # this produces two row groups.
    pq.write_table(table, tempdir / "data_struct.parquet", row_group_size=2)

    dataset = ds.dataset(tempdir / "data_struct.parquet", format="parquet")
    fragment = list(dataset.get_fragments())[0]
    assert fragment.num_row_groups == 2

    # One level of nesting (col.f1): only the value 3 exceeds 2, so a single
    # row group is expected to remain; no value exceeds 5, so none remain.
    subfrag = fragment.subset(ds.field("col", "f1") > 2)
    assert subfrag.num_row_groups == 1
    subfrag = fragment.subset(ds.field("col", "f1") > 5)
    assert subfrag.num_row_groups == 0

    # Two levels of nesting (col.f2.f21 / col.f2.f22): every f21 value is
    # positive, so both row groups remain; only the f22 values 1 and 2 satisfy
    # <= 2, so one row group is expected to remain.
    subfrag = fragment.subset(ds.field("col", "f2", "f21") > 0)
    assert subfrag.num_row_groups == 2
    subfrag = fragment.subset(ds.field("col", "f2", "f22") <= 2)
    assert subfrag.num_row_groups == 1

    # nonexisting field ref
    with pytest.raises(pa.ArrowInvalid, match="No match for FieldRef.Nested"):
        fragment.subset(ds.field("col", "f3") > 0)

    # comparison with struct field is not implemented
    with pytest.raises(
        NotImplementedError, match="Function 'greater' has no kernel matching"
    ):
        fragment.subset(ds.field("col", "f2") > 0)


@pytest.mark.pandas
@pytest.mark.parquet
def test_fragments_repr(tempdir, dataset):
Expand Down

0 comments on commit 28387ed

Please sign in to comment.