Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-39064: [C++][Parquet] Support row group filtering for nested paths for struct fields #39065

39 changes: 29 additions & 10 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ bool IsNan(const Scalar& value) {
}

std::optional<compute::Expression> ColumnChunkStatisticsAsExpression(
const SchemaField& schema_field, const parquet::RowGroupMetaData& metadata) {
const FieldRef& field_ref, const SchemaField& schema_field,
const parquet::RowGroupMetaData& metadata) {
// For the remaining of this function, failure to extract/parse statistics
// are ignored by returning nullptr. The goal is two fold. First
// avoid an optimization which breaks the computation. Second, allow the
Expand All @@ -180,7 +181,8 @@ std::optional<compute::Expression> ColumnChunkStatisticsAsExpression(
return std::nullopt;
}

return ParquetFileFragment::EvaluateStatisticsAsExpression(*field, *statistics);
return ParquetFileFragment::EvaluateStatisticsAsExpression(*field, field_ref,
*statistics);
}

void AddColumnIndices(const SchemaField& schema_field,
Expand Down Expand Up @@ -360,8 +362,9 @@ Result<bool> IsSupportedParquetFile(const ParquetFileFormat& format,
} // namespace

std::optional<compute::Expression> ParquetFileFragment::EvaluateStatisticsAsExpression(
const Field& field, const parquet::Statistics& statistics) {
auto field_expr = compute::field_ref(field.name());
const Field& field, const FieldRef& field_ref,
const parquet::Statistics& statistics) {
auto field_expr = compute::field_ref(field_ref);

// Optimize for corner case where all values are nulls
if (statistics.num_values() == 0 && statistics.null_count() > 0) {
Expand Down Expand Up @@ -418,6 +421,13 @@ std::optional<compute::Expression> ParquetFileFragment::EvaluateStatisticsAsExpr
return std::nullopt;
}

std::optional<compute::Expression> ParquetFileFragment::EvaluateStatisticsAsExpression(
const Field& field, const parquet::Statistics& statistics) {
const auto field_name = field.name();
return EvaluateStatisticsAsExpression(field, FieldRef(std::move(field_name)),
statistics);
}

ParquetFileFormat::ParquetFileFormat()
: FileFormat(std::make_shared<ParquetFragmentScanOptions>()) {}

Expand Down Expand Up @@ -810,7 +820,7 @@ Status ParquetFileFragment::SetMetadata(
manifest_ = std::move(manifest);

statistics_expressions_.resize(row_groups_->size(), compute::literal(true));
statistics_expressions_complete_.resize(physical_schema_->num_fields(), false);
statistics_expressions_complete_.resize(manifest_->descr->num_columns(), false);

for (int row_group : *row_groups_) {
// Ensure RowGroups are indexing valid RowGroups before augmenting.
Expand Down Expand Up @@ -900,16 +910,25 @@ Result<std::vector<compute::Expression>> ParquetFileFragment::TestRowGroups(
ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(*physical_schema_));

if (match.empty()) continue;
if (statistics_expressions_complete_[match[0]]) continue;
statistics_expressions_complete_[match[0]] = true;
const SchemaField* schema_field = &manifest_->schema_fields[match[0]];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is the same logic as FieldPath::Get, would you mind extracting it as a separate function? It would be nice to have a clear single entry point for future work on nested field references in parquet

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean including the for loop below, right?

I think a method on the SchemaManifest might be a logical place to have this. It already has a GetColumnField to return a SchemaField based on a single integer index. There could be a variant which accepts a FieldPath

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to merge this before the 15.0 branch cut-off, so going to merge as is, but will look into factoring it out as a helper function in a follow-up!


for (size_t i = 1; i < match.indices().size(); ++i) {
if (schema_field->field->type()->id() != Type::STRUCT) {
return Status::Invalid("nested paths only supported for structs");
}
Comment on lines +916 to +918
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So does this limit users from passing a filter on Map/List?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but AFAIK we also don't currently support any predicate kernels for those data types.

For example, for a list column you can't do something like "list_field > 1" because 1) such a kernel isn't implemented, and 2) it also doesn't really make sense, as a list scalar contains multiple values and so doesn't evaluate to a simple True/False — you need some kind of aggregation like "elementwise_all(list_field > 1)" (i.e. are "all" (or any) values in a list scalar larger than 1).
And even then simplifying such more complex expression based on the parquet statistics would also need to be implemented.

(I would like to see this work at some point, but that's certainly future work)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind adding a C++ test showing that the "List/Map" filter doesn't work?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that list/map filtering is hard and might need extra predicates. Let's disable it for now, but maybe we can test some more complex structs?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind adding a C++ test showing that the "List/Map" filter doesn't work?

Filtering with a list or map field actually already fails in an earlier step, when binding the filter expression to the schema. (Binding isn't done in FilterRowGroups — it's expected to already be done; in the test for this it is done up front in the test setup code.)

schema_field = &schema_field->children[match[i]];
}

if (!schema_field->is_leaf()) continue;
if (statistics_expressions_complete_[schema_field->column_index]) continue;
statistics_expressions_complete_[schema_field->column_index] = true;

const SchemaField& schema_field = manifest_->schema_fields[match[0]];
int i = 0;
for (int row_group : *row_groups_) {
auto row_group_metadata = metadata_->RowGroup(row_group);

if (auto minmax =
ColumnChunkStatisticsAsExpression(schema_field, *row_group_metadata)) {
if (auto minmax = ColumnChunkStatisticsAsExpression(ref, *schema_field,
*row_group_metadata)) {
FoldingAnd(&statistics_expressions_[i], std::move(*minmax));
ARROW_ASSIGN_OR_RAISE(statistics_expressions_[i],
statistics_expressions_[i].Bind(*physical_schema_));
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
static std::optional<compute::Expression> EvaluateStatisticsAsExpression(
const Field& field, const parquet::Statistics& statistics);

static std::optional<compute::Expression> EvaluateStatisticsAsExpression(
const Field& field, const FieldRef& field_ref,
const parquet::Statistics& statistics);

private:
ParquetFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
compute::Expression partition_expression,
Expand Down Expand Up @@ -207,7 +211,11 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
/// or std::nullopt if all row groups are selected.
std::optional<std::vector<int>> row_groups_;

// the expressions (combined for all columns for which statistics have been
// processed) are stored per column group
std::vector<compute::Expression> statistics_expressions_;
// statistics status are kept track of by Parquet Schema column indices
// (i.e. not Arrow schema field index)
std::vector<bool> statistics_expressions_complete_;
std::shared_ptr<parquet::FileMetaData> metadata_;
std::shared_ptr<parquet::arrow::SchemaManifest> manifest_;
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,12 @@ TEST_P(TestParquetFileFormatScan, PredicatePushdownRowGroupFragments) {
CountRowGroupsInFragment(fragment, {5, 6},
and_(greater_equal(field_ref("i64"), literal(6)),
less(field_ref("i64"), literal(8))));

// nested field reference
CountRowGroupsInFragment(fragment, {0, 1, 2, 3, 4},
less(field_ref(FieldRef("struct", "i32")), literal(6)));
CountRowGroupsInFragment(fragment, {1},
equal(field_ref(FieldRef("struct", "str")), literal("2")));
}

TEST_P(TestParquetFileFormatScan, ExplicitRowGroupSelection) {
Expand Down
36 changes: 36 additions & 0 deletions python/pyarrow/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1648,6 +1648,42 @@ def test_fragments_parquet_subset_invalid(tempdir):
fragment.subset()


@pytest.mark.parquet
def test_fragments_parquet_subset_with_nested_fields(tempdir):
# ensure row group filtering with nested field works
f1 = pa.array([0, 1, 2, 3])
f21 = pa.array([0.1, 0.2, 0.3, 0.4])
f22 = pa.array([1, 2, 3, 4])
f2 = pa.StructArray.from_arrays([f21, f22], names=["f21", "f22"])
struct_col = pa.StructArray.from_arrays([f1, f2], names=["f1", "f2"])
table = pa.table({"col": struct_col})
pq.write_table(table, tempdir / "data_struct.parquet", row_group_size=2)

dataset = ds.dataset(tempdir / "data_struct.parquet", format="parquet")
fragment = list(dataset.get_fragments())[0]
assert fragment.num_row_groups == 2

subfrag = fragment.subset(ds.field("col", "f1") > 2)
assert subfrag.num_row_groups == 1
subfrag = fragment.subset(ds.field("col", "f1") > 5)
assert subfrag.num_row_groups == 0

subfrag = fragment.subset(ds.field("col", "f2", "f21") > 0)
assert subfrag.num_row_groups == 2
subfrag = fragment.subset(ds.field("col", "f2", "f22") <= 2)
assert subfrag.num_row_groups == 1

# nonexisting field ref
with pytest.raises(pa.ArrowInvalid, match="No match for FieldRef.Nested"):
fragment.subset(ds.field("col", "f3") > 0)

# comparison with struct field is not implemented
with pytest.raises(
NotImplementedError, match="Function 'greater' has no kernel matching"
):
fragment.subset(ds.field("col", "f2") > 0)


@pytest.mark.pandas
@pytest.mark.parquet
def test_fragments_repr(tempdir, dataset):
Expand Down
Loading