From 3312381bbd31cf012350dbeafac46b146fd6ddac Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Mon, 4 Dec 2023 17:36:23 +0100 Subject: [PATCH 1/7] GH-39064: [C++][Parquet] Support row group filtering for nested paths for struct fields --- cpp/src/arrow/dataset/file_parquet.cc | 18 ++++++++++++------ cpp/src/arrow/dataset/file_parquet.h | 2 +- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 3afe4ec85cf49..27fb0ae214df2 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -158,7 +158,7 @@ bool IsNan(const Scalar& value) { } std::optional ColumnChunkStatisticsAsExpression( - const SchemaField& schema_field, const parquet::RowGroupMetaData& metadata) { + const FieldRef& field_ref, SchemaField& schema_field, const parquet::RowGroupMetaData& metadata) { // For the remaining of this function, failure to extract/parse statistics // are ignored by returning nullptr. The goal is two fold. First // avoid an optimization which breaks the computation. Second, allow the @@ -177,7 +177,7 @@ std::optional ColumnChunkStatisticsAsExpression( return std::nullopt; } - return ParquetFileFragment::EvaluateStatisticsAsExpression(*field, *statistics); + return ParquetFileFragment::EvaluateStatisticsAsExpression(*field, field_ref, *statistics); } void AddColumnIndices(const SchemaField& schema_field, @@ -357,8 +357,8 @@ Result IsSupportedParquetFile(const ParquetFileFormat& format, } // namespace std::optional ParquetFileFragment::EvaluateStatisticsAsExpression( - const Field& field, const parquet::Statistics& statistics) { - auto field_expr = compute::field_ref(field.name()); + const Field& field, const FieldRef& field_ref, const parquet::Statistics& statistics) { + auto field_expr = compute::field_ref(field_ref); // Optimize for corner case where all values are nulls if (statistics.num_values() == 0 && statistics.null_count() > 0) { @@ -900,13 +900,19 @@ Result> ParquetFileFragment::TestRowGroups( if (statistics_expressions_complete_[match[0]]) continue; statistics_expressions_complete_[match[0]] = true; - const SchemaField& schema_field = manifest_->schema_fields[match[0]]; + SchemaField& schema_field = manifest_->schema_fields[match[0]]; + for (size_t i = 1; i < match.indices().size(); ++i) { + if (schema_field.field->type()->id() != Type::STRUCT ) { + return Status::Invalid("nested paths only supported for structs"); + } + schema_field = schema_field.children[match[i]]; + } int i = 0; for (int row_group : *row_groups_) { auto row_group_metadata = metadata_->RowGroup(row_group); if (auto minmax = - ColumnChunkStatisticsAsExpression(schema_field, *row_group_metadata)) { + ColumnChunkStatisticsAsExpression(ref, schema_field, *row_group_metadata)) { FoldingAnd(&statistics_expressions_[i], std::move(*minmax)); ARROW_ASSIGN_OR_RAISE(statistics_expressions_[i], statistics_expressions_[i].Bind(*physical_schema_)); diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index f527ce5d70ae0..c09101e58f74b 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -175,7 +175,7 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment { Result> Subset(std::vector row_group_ids); static std::optional EvaluateStatisticsAsExpression( - const Field& field, const parquet::Statistics& statistics); + const Field& field, const FieldRef& field_ref, const parquet::Statistics& statistics); private: ParquetFileFragment(FileSource source, std::shared_ptr format, From b1115bf6a2f086f12ffe5a4b61a3521d6a6e15f7 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 5 Dec 2023 11:24:35 +0100 Subject: [PATCH 2/7] format --- cpp/src/arrow/dataset/file_parquet.cc | 13 ++++++++----- cpp/src/arrow/dataset/file_parquet.h | 3 ++- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 27fb0ae214df2..2b34669031d18 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -158,7 +158,8 @@ bool IsNan(const Scalar& value) { } std::optional ColumnChunkStatisticsAsExpression( - const FieldRef& field_ref, SchemaField& schema_field, const parquet::RowGroupMetaData& metadata) { + const FieldRef& field_ref, SchemaField& schema_field, + const parquet::RowGroupMetaData& metadata) { // For the remaining of this function, failure to extract/parse statistics // are ignored by returning nullptr. The goal is two fold. First // avoid an optimization which breaks the computation. Second, allow the @@ -177,7 +178,8 @@ std::optional ColumnChunkStatisticsAsExpression( return std::nullopt; } - return ParquetFileFragment::EvaluateStatisticsAsExpression(*field, field_ref, *statistics); + return ParquetFileFragment::EvaluateStatisticsAsExpression(*field, field_ref, + *statistics); } void AddColumnIndices(const SchemaField& schema_field, @@ -357,7 +359,8 @@ Result IsSupportedParquetFile(const ParquetFileFormat& format, } // namespace std::optional ParquetFileFragment::EvaluateStatisticsAsExpression( - const Field& field, const FieldRef& field_ref, const parquet::Statistics& statistics) { + const Field& field, const FieldRef& field_ref, + const parquet::Statistics& statistics) { auto field_expr = compute::field_ref(field_ref); // Optimize for corner case where all values are nulls @@ -902,8 +905,8 @@ Result> ParquetFileFragment::TestRowGroups( SchemaField& schema_field = manifest_->schema_fields[match[0]]; for (size_t i = 1; i < match.indices().size(); ++i) { - if (schema_field.field->type()->id() != Type::STRUCT ) { - return Status::Invalid("nested paths only supported for structs"); + if (schema_field.field->type()->id() != Type::STRUCT) { + return Status::Invalid("nested paths only supported for structs"); } schema_field = schema_field.children[match[i]]; } diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index c09101e58f74b..aa5943bdbcdbb 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -175,7 +175,8 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment { Result> Subset(std::vector row_group_ids); static std::optional EvaluateStatisticsAsExpression( - const Field& field, const FieldRef& field_ref, const parquet::Statistics& statistics); + const Field& field, const FieldRef& field_ref, + const parquet::Statistics& statistics); private: ParquetFileFragment(FileSource source, std::shared_ptr format, From 56476cf969f56fa59122b1f16a06dba7ab5cd015 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 5 Dec 2023 16:54:12 +0100 Subject: [PATCH 3/7] fix for multiple nested fields + add python tests --- cpp/src/arrow/dataset/file_parquet.cc | 23 +++++++++-------- cpp/src/arrow/dataset/file_parquet.h | 4 +++ python/pyarrow/tests/test_dataset.py | 36 +++++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 10 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 2b34669031d18..e88fe0e3e3444 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -158,7 +158,7 @@ bool IsNan(const Scalar& value) { } std::optional ColumnChunkStatisticsAsExpression( - const FieldRef& field_ref, SchemaField& schema_field, + const FieldRef& field_ref, const SchemaField& schema_field, const parquet::RowGroupMetaData& metadata) { // For the remaining of this function, failure to extract/parse statistics // are ignored by returning nullptr. The goal is two fold. First @@ -810,7 +810,7 @@ Status ParquetFileFragment::SetMetadata( manifest_ = std::move(manifest); statistics_expressions_.resize(row_groups_->size(), compute::literal(true)); - statistics_expressions_complete_.resize(physical_schema_->num_fields(), false); + statistics_expressions_complete_.resize(manifest_->descr->num_columns(), false); for (int row_group : *row_groups_) { // Ensure RowGroups are indexing valid RowGroups before augmenting. @@ -896,26 +896,29 @@ Result> ParquetFileFragment::TestRowGroups( return std::vector{}; } + const SchemaField* schema_field = nullptr; for (const FieldRef& ref : FieldsInExpression(predicate)) { ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(*physical_schema_)); - if (match.empty()) continue; - if (statistics_expressions_complete_[match[0]]) continue; - statistics_expressions_complete_[match[0]] = true; + schema_field = &manifest_->schema_fields[match[0]]; - SchemaField& schema_field = manifest_->schema_fields[match[0]]; for (size_t i = 1; i < match.indices().size(); ++i) { - if (schema_field.field->type()->id() != Type::STRUCT) { + if (schema_field->field->type()->id() != Type::STRUCT) { return Status::Invalid("nested paths only supported for structs"); } - schema_field = schema_field.children[match[i]]; + schema_field = &schema_field->children[match[i]]; } + + if (!schema_field->is_leaf()) continue; + if (statistics_expressions_complete_[schema_field->column_index]) continue; + statistics_expressions_complete_[schema_field->column_index] = true; + int i = 0; for (int row_group : *row_groups_) { auto row_group_metadata = metadata_->RowGroup(row_group); - if (auto minmax = - ColumnChunkStatisticsAsExpression(ref, schema_field, *row_group_metadata)) { + if (auto minmax = ColumnChunkStatisticsAsExpression(ref, *schema_field, + *row_group_metadata)) { FoldingAnd(&statistics_expressions_[i], std::move(*minmax)); ARROW_ASSIGN_OR_RAISE(statistics_expressions_[i], statistics_expressions_[i].Bind(*physical_schema_)); diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index aa5943bdbcdbb..75cefb4ec99fb 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -208,7 +208,11 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment { /// or std::nullopt if all row groups are selected. std::optional> row_groups_; + // the expressions (combined for all columns for which statistics have been + // processed) are stored per column group std::vector statistics_expressions_; + // statistics status are kept track of by Parquet Schema column indices + // (i.e. not Arrow schema field index) std::vector statistics_expressions_complete_; std::shared_ptr metadata_; std::shared_ptr manifest_; diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index d5e7015a5d5b9..0a0f3157e89db 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -1616,6 +1616,42 @@ def test_fragments_parquet_subset_invalid(tempdir): fragment.subset() +@pytest.mark.parquet +def test_fragments_parquet_subset_with_nested_fields(tempdir): + # ensure row group filtering with nested field works + f1 = pa.array([0, 1, 2, 3]) + f21 = pa.array([0.1, 0.2, 0.3, 0.4]) + f22 = pa.array([1, 2, 3, 4]) + f2 = pa.StructArray.from_arrays([f21, f22], names=["f21", "f22"]) + struct_col = pa.StructArray.from_arrays([f1, f2], names=["f1", "f2"]) + table = pa.table({"col": struct_col}) + pq.write_table(table, tempdir / "data_struct.parquet", row_group_size=2) + + dataset = ds.dataset(tempdir / "data_struct.parquet", format="parquet") + fragment = list(dataset.get_fragments())[0] + assert fragment.num_row_groups == 2 + + subfrag = fragment.subset(ds.field("col", "f1") > 2) + assert subfrag.num_row_groups == 1 + subfrag = fragment.subset(ds.field("col", "f1") > 5) + assert subfrag.num_row_groups == 0 + + subfrag = fragment.subset(ds.field("col", "f2", "f21") > 0) + assert subfrag.num_row_groups == 2 + subfrag = fragment.subset(ds.field("col", "f2", "f22") <= 2) + assert subfrag.num_row_groups == 1 + + # nonexisting field ref + with pytest.raises(pa.ArrowInvalid, match="No match for FieldRef.Nested"): + fragment.subset(ds.field("col", "f3") > 0) + + # comparison with struct field is not implemented + with pytest.raises( + NotImplementedError, match="Function 'greater' has no kernel matching" + ): + fragment.subset(ds.field("col", "f2") > 0) + + @pytest.mark.pandas @pytest.mark.parquet def test_fragments_repr(tempdir, dataset): From b87c82975dad1a330d1784f75060b68865d7681e Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 5 Dec 2023 17:35:31 +0100 Subject: [PATCH 4/7] add back compat shim for EvaluateStatisticsAsExpression --- cpp/src/arrow/dataset/file_parquet.cc | 6 ++++++ cpp/src/arrow/dataset/file_parquet.h | 3 +++ 2 files changed, 9 insertions(+) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index e88fe0e3e3444..a55588431588e 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -418,6 +418,12 @@ std::optional ParquetFileFragment::EvaluateStatisticsAsExpr return std::nullopt; } +std::optional ParquetFileFragment::EvaluateStatisticsAsExpression( + const Field& field, const parquet::Statistics& statistics) { + const auto field_name = field.name(); + EvaluateStatisticsAsExpression(field, FieldRef(field_name), statistics); +} + ParquetFileFormat::ParquetFileFormat() : FileFormat(std::make_shared()) {} diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index 75cefb4ec99fb..1e81a34fb3cf0 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -174,6 +174,9 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment { Result> Subset(compute::Expression predicate); Result> Subset(std::vector row_group_ids); + static std::optional EvaluateStatisticsAsExpression( + const Field& field, const parquet::Statistics& statistics); + static std::optional EvaluateStatisticsAsExpression( const Field& field, const FieldRef& field_ref, const parquet::Statistics& statistics); From 0d3da995134b4cb8b2fb41e8e1c32d9c085b961b Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 5 Dec 2023 18:09:47 +0100 Subject: [PATCH 5/7] fixup --- cpp/src/arrow/dataset/file_parquet.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index a55588431588e..a3a08e12931c6 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -421,7 +421,7 @@ std::optional ParquetFileFragment::EvaluateStatisticsAsExpr std::optional ParquetFileFragment::EvaluateStatisticsAsExpression( const Field& field, const parquet::Statistics& statistics) { const auto field_name = field.name(); - EvaluateStatisticsAsExpression(field, FieldRef(field_name), statistics); + return EvaluateStatisticsAsExpression(field, FieldRef(field_name), statistics); } ParquetFileFormat::ParquetFileFormat() From bd8f1276ca557607466ebc515194bd101d0bcb67 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Wed, 6 Dec 2023 08:43:08 +0100 Subject: [PATCH 6/7] add small C++ test --- cpp/src/arrow/dataset/file_parquet_test.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index 84d4342a25e20..76cd0af3b835f 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -655,6 +655,12 @@ TEST_P(TestParquetFileFormatScan, PredicatePushdownRowGroupFragments) { CountRowGroupsInFragment(fragment, {5, 6}, and_(greater_equal(field_ref("i64"), literal(6)), less(field_ref("i64"), literal(8)))); + + // nested field reference + CountRowGroupsInFragment(fragment, {0, 1, 2, 3, 4}, + less(field_ref(FieldRef("struct", "i32")), literal(6))); + CountRowGroupsInFragment(fragment, {1}, + equal(field_ref(FieldRef("struct", "str")), literal("2"))); } TEST_P(TestParquetFileFormatScan, ExplicitRowGroupSelection) { From 42058376eed67019e6bea5715ae3740711266fd1 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Wed, 6 Dec 2023 10:01:36 +0100 Subject: [PATCH 7/7] address feedback --- cpp/src/arrow/dataset/file_parquet.cc | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index a3a08e12931c6..e52a6b4882313 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -421,7 +421,8 @@ std::optional ParquetFileFragment::EvaluateStatisticsAsExpr std::optional ParquetFileFragment::EvaluateStatisticsAsExpression( const Field& field, const parquet::Statistics& statistics) { const auto field_name = field.name(); - return EvaluateStatisticsAsExpression(field, FieldRef(field_name), statistics); + return EvaluateStatisticsAsExpression(field, FieldRef(std::move(field_name)), + statistics); } ParquetFileFormat::ParquetFileFormat() @@ -902,11 +903,11 @@ Result> ParquetFileFragment::TestRowGroups( return std::vector{}; } - const SchemaField* schema_field = nullptr; for (const FieldRef& ref : FieldsInExpression(predicate)) { ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(*physical_schema_)); + if (match.empty()) continue; - schema_field = &manifest_->schema_fields[match[0]]; + const SchemaField* schema_field = &manifest_->schema_fields[match[0]]; for (size_t i = 1; i < match.indices().size(); ++i) { if (schema_field->field->type()->id() != Type::STRUCT) {