From 28387edc0a769b80bff3d278b4036d2297419b6c Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Mon, 8 Jan 2024 16:06:59 +0100 Subject: [PATCH] GH-39064: [C++][Parquet] Support row group filtering for nested paths for struct fields (#39065) ### Rationale for this change Currently when filtering with a nested field reference, we were taking the corresponding parquet SchemaField for just the first index of the nested path, i.e. the parent node in the Parquet schema. But logically, filtering on statistics only works for a primitive leaf node. This PR changes that logic to iterate over all indices of the FieldPath, if nested, to ensure we use the actual corresponding child leaf node of the ParquetSchema to get the statistics from. ### Are there any user-facing changes? No, only improving performance by doing the filtering at the row group stage, instead of afterwards on the read data * Closes: #39064 Authored-by: Joris Van den Bossche Signed-off-by: Joris Van den Bossche --- cpp/src/arrow/dataset/file_parquet.cc | 39 ++++++++++++++++------ cpp/src/arrow/dataset/file_parquet.h | 8 +++++ cpp/src/arrow/dataset/file_parquet_test.cc | 6 ++++ python/pyarrow/tests/test_dataset.py | 36 ++++++++++++++++++++ 4 files changed, 79 insertions(+), 10 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 1c2fd2dea6307..0ce08502921f3 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -161,7 +161,8 @@ bool IsNan(const Scalar& value) { } std::optional ColumnChunkStatisticsAsExpression( - const SchemaField& schema_field, const parquet::RowGroupMetaData& metadata) { + const FieldRef& field_ref, const SchemaField& schema_field, + const parquet::RowGroupMetaData& metadata) { // For the remaining of this function, failure to extract/parse statistics // are ignored by returning nullptr. The goal is two fold. First // avoid an optimization which breaks the computation. 
Second, allow the @@ -180,7 +181,8 @@ std::optional ColumnChunkStatisticsAsExpression( return std::nullopt; } - return ParquetFileFragment::EvaluateStatisticsAsExpression(*field, *statistics); + return ParquetFileFragment::EvaluateStatisticsAsExpression(*field, field_ref, + *statistics); } void AddColumnIndices(const SchemaField& schema_field, @@ -360,8 +362,9 @@ Result IsSupportedParquetFile(const ParquetFileFormat& format, } // namespace std::optional ParquetFileFragment::EvaluateStatisticsAsExpression( - const Field& field, const parquet::Statistics& statistics) { - auto field_expr = compute::field_ref(field.name()); + const Field& field, const FieldRef& field_ref, + const parquet::Statistics& statistics) { + auto field_expr = compute::field_ref(field_ref); // Optimize for corner case where all values are nulls if (statistics.num_values() == 0 && statistics.null_count() > 0) { @@ -418,6 +421,13 @@ std::optional ParquetFileFragment::EvaluateStatisticsAsExpr return std::nullopt; } +std::optional ParquetFileFragment::EvaluateStatisticsAsExpression( + const Field& field, const parquet::Statistics& statistics) { + const auto field_name = field.name(); + return EvaluateStatisticsAsExpression(field, FieldRef(std::move(field_name)), + statistics); +} + ParquetFileFormat::ParquetFileFormat() : FileFormat(std::make_shared()) {} @@ -810,7 +820,7 @@ Status ParquetFileFragment::SetMetadata( manifest_ = std::move(manifest); statistics_expressions_.resize(row_groups_->size(), compute::literal(true)); - statistics_expressions_complete_.resize(physical_schema_->num_fields(), false); + statistics_expressions_complete_.resize(manifest_->descr->num_columns(), false); for (int row_group : *row_groups_) { // Ensure RowGroups are indexing valid RowGroups before augmenting. 
@@ -900,16 +910,25 @@ Result> ParquetFileFragment::TestRowGroups( ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(*physical_schema_)); if (match.empty()) continue; - if (statistics_expressions_complete_[match[0]]) continue; - statistics_expressions_complete_[match[0]] = true; + const SchemaField* schema_field = &manifest_->schema_fields[match[0]]; + + for (size_t i = 1; i < match.indices().size(); ++i) { + if (schema_field->field->type()->id() != Type::STRUCT) { + return Status::Invalid("nested paths only supported for structs"); + } + schema_field = &schema_field->children[match[i]]; + } + + if (!schema_field->is_leaf()) continue; + if (statistics_expressions_complete_[schema_field->column_index]) continue; + statistics_expressions_complete_[schema_field->column_index] = true; - const SchemaField& schema_field = manifest_->schema_fields[match[0]]; int i = 0; for (int row_group : *row_groups_) { auto row_group_metadata = metadata_->RowGroup(row_group); - if (auto minmax = - ColumnChunkStatisticsAsExpression(schema_field, *row_group_metadata)) { + if (auto minmax = ColumnChunkStatisticsAsExpression(ref, *schema_field, + *row_group_metadata)) { FoldingAnd(&statistics_expressions_[i], std::move(*minmax)); ARROW_ASSIGN_OR_RAISE(statistics_expressions_[i], statistics_expressions_[i].Bind(*physical_schema_)); diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index f527ce5d70ae0..1e81a34fb3cf0 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -177,6 +177,10 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment { static std::optional EvaluateStatisticsAsExpression( const Field& field, const parquet::Statistics& statistics); + static std::optional EvaluateStatisticsAsExpression( + const Field& field, const FieldRef& field_ref, + const parquet::Statistics& statistics); + private: ParquetFileFragment(FileSource source, std::shared_ptr format, compute::Expression 
partition_expression, @@ -207,7 +211,11 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment { /// or std::nullopt if all row groups are selected. std::optional> row_groups_; + // the expressions (combined for all columns for which statistics have been + // processed) are stored per row group std::vector statistics_expressions_; + // statistics status is tracked by Parquet Schema column indices + // (i.e. not Arrow schema field index) std::vector statistics_expressions_complete_; std::shared_ptr metadata_; std::shared_ptr manifest_; diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index 84d4342a25e20..76cd0af3b835f 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -655,6 +655,12 @@ TEST_P(TestParquetFileFormatScan, PredicatePushdownRowGroupFragments) { CountRowGroupsInFragment(fragment, {5, 6}, and_(greater_equal(field_ref("i64"), literal(6)), less(field_ref("i64"), literal(8)))); + + // nested field reference + CountRowGroupsInFragment(fragment, {0, 1, 2, 3, 4}, + less(field_ref(FieldRef("struct", "i32")), literal(6))); + CountRowGroupsInFragment(fragment, {1}, + equal(field_ref(FieldRef("struct", "str")), literal("2"))); } TEST_P(TestParquetFileFormatScan, ExplicitRowGroupSelection) { diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index e2bb4400c8bde..ae2146c0bdaee 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -1648,6 +1648,42 @@ def test_fragments_parquet_subset_invalid(tempdir): fragment.subset() +@pytest.mark.parquet +def test_fragments_parquet_subset_with_nested_fields(tempdir): + # ensure row group filtering with nested field works + f1 = pa.array([0, 1, 2, 3]) + f21 = pa.array([0.1, 0.2, 0.3, 0.4]) + f22 = pa.array([1, 2, 3, 4]) + f2 = pa.StructArray.from_arrays([f21, f22], names=["f21", "f22"]) + struct_col = 
pa.StructArray.from_arrays([f1, f2], names=["f1", "f2"]) + table = pa.table({"col": struct_col}) + pq.write_table(table, tempdir / "data_struct.parquet", row_group_size=2) + + dataset = ds.dataset(tempdir / "data_struct.parquet", format="parquet") + fragment = list(dataset.get_fragments())[0] + assert fragment.num_row_groups == 2 + + subfrag = fragment.subset(ds.field("col", "f1") > 2) + assert subfrag.num_row_groups == 1 + subfrag = fragment.subset(ds.field("col", "f1") > 5) + assert subfrag.num_row_groups == 0 + + subfrag = fragment.subset(ds.field("col", "f2", "f21") > 0) + assert subfrag.num_row_groups == 2 + subfrag = fragment.subset(ds.field("col", "f2", "f22") <= 2) + assert subfrag.num_row_groups == 1 + + # nonexisting field ref + with pytest.raises(pa.ArrowInvalid, match="No match for FieldRef.Nested"): + fragment.subset(ds.field("col", "f3") > 0) + + # comparison with struct field is not implemented + with pytest.raises( + NotImplementedError, match="Function 'greater' has no kernel matching" + ): + fragment.subset(ds.field("col", "f2") > 0) + + @pytest.mark.pandas @pytest.mark.parquet def test_fragments_repr(tempdir, dataset):