Skip to content

Commit

Permalink
fix for multiple nested fields + add python tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jorisvandenbossche committed Dec 5, 2023
1 parent b1115bf commit 56476cf
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 10 deletions.
23 changes: 13 additions & 10 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ bool IsNan(const Scalar& value) {
}

std::optional<compute::Expression> ColumnChunkStatisticsAsExpression(
const FieldRef& field_ref, SchemaField& schema_field,
const FieldRef& field_ref, const SchemaField& schema_field,
const parquet::RowGroupMetaData& metadata) {
// For the remainder of this function, failures to extract/parse statistics
// are ignored by returning nullptr. The goal is twofold. First
Expand Down Expand Up @@ -810,7 +810,7 @@ Status ParquetFileFragment::SetMetadata(
manifest_ = std::move(manifest);

statistics_expressions_.resize(row_groups_->size(), compute::literal(true));
statistics_expressions_complete_.resize(physical_schema_->num_fields(), false);
statistics_expressions_complete_.resize(manifest_->descr->num_columns(), false);

for (int row_group : *row_groups_) {
// Ensure RowGroups are indexing valid RowGroups before augmenting.
Expand Down Expand Up @@ -896,26 +896,29 @@ Result<std::vector<compute::Expression>> ParquetFileFragment::TestRowGroups(
return std::vector<compute::Expression>{};
}

const SchemaField* schema_field = nullptr;
for (const FieldRef& ref : FieldsInExpression(predicate)) {
ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(*physical_schema_));

if (match.empty()) continue;
if (statistics_expressions_complete_[match[0]]) continue;
statistics_expressions_complete_[match[0]] = true;
schema_field = &manifest_->schema_fields[match[0]];

SchemaField& schema_field = manifest_->schema_fields[match[0]];
for (size_t i = 1; i < match.indices().size(); ++i) {
if (schema_field.field->type()->id() != Type::STRUCT) {
if (schema_field->field->type()->id() != Type::STRUCT) {
return Status::Invalid("nested paths only supported for structs");
}
schema_field = schema_field.children[match[i]];
schema_field = &schema_field->children[match[i]];
}

if (!schema_field->is_leaf()) continue;
if (statistics_expressions_complete_[schema_field->column_index]) continue;
statistics_expressions_complete_[schema_field->column_index] = true;

int i = 0;
for (int row_group : *row_groups_) {
auto row_group_metadata = metadata_->RowGroup(row_group);

if (auto minmax =
ColumnChunkStatisticsAsExpression(ref, schema_field, *row_group_metadata)) {
if (auto minmax = ColumnChunkStatisticsAsExpression(ref, *schema_field,
*row_group_metadata)) {
FoldingAnd(&statistics_expressions_[i], std::move(*minmax));
ARROW_ASSIGN_OR_RAISE(statistics_expressions_[i],
statistics_expressions_[i].Bind(*physical_schema_));
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,11 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
/// or std::nullopt if all row groups are selected.
std::optional<std::vector<int>> row_groups_;

// the expressions (combined for all columns for which statistics have been
// processed) are stored per row group
std::vector<compute::Expression> statistics_expressions_;
// statistics completion status is tracked by Parquet schema column index
// (i.e. not by Arrow schema field index)
std::vector<bool> statistics_expressions_complete_;
std::shared_ptr<parquet::FileMetaData> metadata_;
std::shared_ptr<parquet::arrow::SchemaManifest> manifest_;
Expand Down
36 changes: 36 additions & 0 deletions python/pyarrow/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1616,6 +1616,42 @@ def test_fragments_parquet_subset_invalid(tempdir):
fragment.subset()


@pytest.mark.parquet
def test_fragments_parquet_subset_with_nested_fields(tempdir):
    # Row-group filtering must work when the predicate references fields
    # nested inside a struct column (one and two levels deep).
    leaf_f1 = pa.array([0, 1, 2, 3])
    inner_f21 = pa.array([0.1, 0.2, 0.3, 0.4])
    inner_f22 = pa.array([1, 2, 3, 4])
    nested = pa.StructArray.from_arrays([inner_f21, inner_f22],
                                        names=["f21", "f22"])
    outer = pa.StructArray.from_arrays([leaf_f1, nested], names=["f1", "f2"])
    path = tempdir / "data_struct.parquet"
    # two row groups of two rows each
    pq.write_table(pa.table({"col": outer}), path, row_group_size=2)

    dataset = ds.dataset(path, format="parquet")
    fragment = list(dataset.get_fragments())[0]
    assert fragment.num_row_groups == 2

    # one-level nested field: statistics prune row groups as expected
    assert fragment.subset(ds.field("col", "f1") > 2).num_row_groups == 1
    assert fragment.subset(ds.field("col", "f1") > 5).num_row_groups == 0

    # two-level nested fields
    assert fragment.subset(ds.field("col", "f2", "f21") > 0).num_row_groups == 2
    assert fragment.subset(ds.field("col", "f2", "f22") <= 2).num_row_groups == 1

    # nonexisting field ref
    with pytest.raises(pa.ArrowInvalid, match="No match for FieldRef.Nested"):
        fragment.subset(ds.field("col", "f3") > 0)

    # comparison with struct field is not implemented
    with pytest.raises(
        NotImplementedError, match="Function 'greater' has no kernel matching"
    ):
        fragment.subset(ds.field("col", "f2") > 0)


@pytest.mark.pandas
@pytest.mark.parquet
def test_fragments_repr(tempdir, dataset):
Expand Down

0 comments on commit 56476cf

Please sign in to comment.