Skip to content

Commit

Permalink
ARROW-17483: [Python] Support Expression filters in non-legacy Parque…
Browse files Browse the repository at this point in the history
…tDataset/read_table (#14011)

Authored-by: Miles Granger <[email protected]>
Signed-off-by: Joris Van den Bossche <[email protected]>
  • Loading branch information
milesgranger authored Sep 13, 2022
1 parent ef8cb09 commit 8ceb3b8
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 19 deletions.
9 changes: 6 additions & 3 deletions python/pyarrow/parquet/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1803,6 +1803,9 @@ def __init__(self, path_or_paths, filesystem=None, schema=None,
raise NotImplementedError("split_row_groups not yet implemented")

if filters is not None:
if hasattr(filters, "cast"):
raise TypeError(
"Expressions as filter not supported for legacy dataset")
filters = _check_filters(filters)
self._filter(filters)

Expand Down Expand Up @@ -2338,9 +2341,9 @@ def __init__(self, path_or_paths, filesystem=None, *, filters=None,
if decryption_properties is not None:
read_options.update(decryption_properties=decryption_properties)

# map filters to Expressions
self._filters = filters
self._filter_expression = filters and _filters_to_expression(filters)
self._filter_expression = None
if filters is not None:
self._filter_expression = _filters_to_expression(filters)

# map old filesystems to new one
if filesystem is not None:
Expand Down
38 changes: 22 additions & 16 deletions python/pyarrow/tests/parquet/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import pytest

import pyarrow as pa
import pyarrow.compute as pc
from pyarrow import fs
from pyarrow.filesystem import LocalFileSystem
from pyarrow.tests import util
Expand Down Expand Up @@ -556,7 +557,14 @@ def test_filters_invalid_column(tempdir, use_legacy_dataset):

@pytest.mark.pandas
@parametrize_legacy_dataset
def test_filters_read_table(tempdir, use_legacy_dataset):
@pytest.mark.parametrize("filters",
([('integers', '<', 3)],
[[('integers', '<', 3)]],
pc.field('integers') < 3,
pc.field('nested', 'a') < 3,
pc.field('nested', 'b').cast(pa.int64()) < 3))
@pytest.mark.parametrize("read", (pq.read_table, pq.read_pandas))
def test_filters_read_table(tempdir, use_legacy_dataset, filters, read):
# test that filters keyword is passed through in read_table
fs = LocalFileSystem._get_instance()
base_path = tempdir
Expand All @@ -565,29 +573,27 @@ def test_filters_read_table(tempdir, use_legacy_dataset):
partition_spec = [
['integers', integer_keys],
]
N = 5
N = len(integer_keys)

df = pd.DataFrame({
'index': np.arange(N),
'integers': np.array(integer_keys, dtype='i4'),
}, columns=['index', 'integers'])
'nested': np.array([{'a': i, 'b': str(i)} for i in range(N)])
})

_generate_partition_directories(fs, base_path, partition_spec, df)

table = pq.read_table(
base_path, filesystem=fs, filters=[('integers', '<', 3)],
use_legacy_dataset=use_legacy_dataset)
assert table.num_rows == 3

table = pq.read_table(
base_path, filesystem=fs, filters=[[('integers', '<', 3)]],
use_legacy_dataset=use_legacy_dataset)
assert table.num_rows == 3
kwargs = dict(filesystem=fs, filters=filters,
use_legacy_dataset=use_legacy_dataset)

table = pq.read_pandas(
base_path, filters=[('integers', '<', 3)],
use_legacy_dataset=use_legacy_dataset)
assert table.num_rows == 3
# Using Expression in legacy dataset not supported
if use_legacy_dataset and isinstance(filters, pc.Expression):
msg = "Expressions as filter not supported for legacy dataset"
with pytest.raises(TypeError, match=msg):
read(base_path, **kwargs)
else:
table = read(base_path, **kwargs)
assert table.num_rows == 3


@pytest.mark.pandas
Expand Down

0 comments on commit 8ceb3b8

Please sign in to comment.