Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DNF, SQL, Compute expression support to dataset filters #39128

Open
davlee1972 opened this issue Dec 7, 2023 · 0 comments
Open

Add DNF, SQL, Compute expression support to dataset filters #39128

davlee1972 opened this issue Dec 7, 2023 · 0 comments

Comments

@davlee1972
Copy link

davlee1972 commented Dec 7, 2023

Describe the enhancement requested

pyarrow.parquet supports DNF filtering. Add enhanced DNF filtering to pyarrow.dataset.
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html

The samples below demonstrate how you can mix pyarrow compute expressions with DNF expressions.

Additional question: Is there an easy way to map pyarrow.compute functions as DNF operators using pyarrow.compute.get_function()?

>>> exp = (pc.field('a') >= 100) & (pc.field('a') <= 1000)
>>> exp
<pyarrow.compute.Expression ((a >= 100) and (a <= 1000))>
>>> _filters_to_expression(exp)
<pyarrow.compute.Expression ((a >= 100) and (a <= 1000))>
>>>
>>> exp = [('a', 'between', [100,1000])]
>>> exp
[('a', 'between', [100, 1000])]
>>> _filters_to_expression(exp)
<pyarrow.compute.Expression ((a >= 100) and (a <= 1000))>
>>>
>>> exp = [(pc.field('a') >= 100), ('a', "<=", 1000)]
>>> exp
[<pyarrow.compute.Expression (a >= 100)>, ('a', '<=', 1000)]
>>> _filters_to_expression(exp)
<pyarrow.compute.Expression ((a >= 100) and (a <= 1000))>
>>>
>>> exp = (pc.starts_with(pc.field('a'), "abc") | (pc.ends_with(pc.field('b'), 'xyz')))
>>> exp
<pyarrow.compute.Expression (starts_with(a, {pattern="abc", ignore_case=false}) or ends_with(b, {pattern="xyz", ignore_case=false}))>
>>> _filters_to_expression(exp)
<pyarrow.compute.Expression (starts_with(a, {pattern="abc", ignore_case=false}) or ends_with(b, {pattern="xyz", ignore_case=false}))>
>>>
>>> exp = [[('a', 'starts_with', 'abc')], [('b', 'ends_with', 'xyz')]]
>>> exp
[[('a', 'starts_with', 'abc')], [('b', 'ends_with', 'xyz')]]
>>> _filters_to_expression(exp)
<pyarrow.compute.Expression (starts_with(a, {pattern="abc", ignore_case=false}) or ends_with(b, {pattern="xyz", ignore_case=false}))>
>>>
>>> exp = [[pc.starts_with(pc.field('a'), "abc")], [('b', 'ends_with', 'xyz')]]
>>> exp
[[<pyarrow.compute.Expression starts_with(a, {pattern="abc", ignore_case=false})>], [('b', 'ends_with', 'xyz')]]
>>> _filters_to_expression(exp)
<pyarrow.compute.Expression (starts_with(a, {pattern="abc", ignore_case=false}) or ends_with(b, {pattern="xyz", ignore_case=false}))>
import functools
import operator
from typing import Union, List, Optional, Tuple
import pyarrow.compute as pc
import pyarrow as pa


def _filters_to_expression(
    filters: Optional[Union[pc.Expression, List[Tuple], List[List[Tuple]]]] = None
) -> Optional[pc.Expression]:
    """Build a pyarrow filter expression from DNF filters and/or expressions.

    Accepted inputs:

    * a ``pc.Expression`` -- returned unchanged;
    * a ``(column, op, value)`` tuple -- converted to a single predicate;
    * a list whose items are all lists -- every inner list is converted
      recursively and the results are OR-ed together;
    * any other list (tuples, expressions, or a mix) -- every item is
      converted recursively and the results are AND-ed together;
    * ``None`` or any other empty value -- means "no filter".

    See `pyarrow_read_table <https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html>`_
    for the DNF notation.

    :param filters: expression, DNF predicate tuple, or (nested) list of them.
        (Default value = None)
    :returns: the combined pyarrow expression, or ``None`` when there is
        nothing to filter on
    :raises ValueError:
        raised when a predicate is not a 3-tuple or the operation requested
        is not valid or supported

    """
    # Expression must be tested first: it cannot be evaluated for truthiness.
    if isinstance(filters, pc.Expression):
        return filters

    if isinstance(filters, list):
        if not filters:
            # reduce() on an empty sequence would raise TypeError.
            return None

        members = [_filters_to_expression(item) for item in filters]

        if all(isinstance(item, list) for item in filters):
            # Disjunction of conjunctions. An empty conjunction places no
            # restriction on the rows, so OR-ing it with anything else is
            # also unrestricted.
            if any(member is None for member in members):
                return None
            return functools.reduce(operator.or_, members)

        # Conjunction: a "no filter" member does not restrict the result.
        members = [member for member in members if member is not None]
        if not members:
            return None
        return functools.reduce(operator.and_, members)

    if not filters:
        return None

    return _predicate_to_expression(filters)


def _predicate_to_expression(predicate: Tuple) -> pc.Expression:
    """Convert one ``(column, op, value)`` DNF predicate to an expression.

    :param predicate: 3-tuple of column name, operator string and operand.
    :returns: pyarrow expression equivalent to the predicate
    :raises ValueError:
        raised when ``predicate`` is not a 3-tuple, when a range operator is
        given no bounds, or when the operator is not supported

    """
    # Explicit check instead of ``assert`` so it survives ``python -O``.
    if not isinstance(predicate, tuple) or len(predicate) != 3:
        raise ValueError(
            f"Filter predicate must be a (column, op, value) tuple, "
            f"got {predicate!r}"
        )

    column, op, val = predicate
    f = pc.field(column)

    if op in ("=", "==", "is"):
        # Comparing against None never matches, so map it to a null test.
        return pc.is_null(f) if val is None else f == val
    if op in ("!=", "is not"):
        return pc.invert(pc.is_null(f)) if val is None else f != val
    if op == "in":
        return pc.is_in(f, pa.array(val))
    if op == "not in":
        return pc.invert(pc.is_in(f, pa.array(val)))
    if op == "<":
        return f < val
    if op == "<=":
        return f <= val
    if op == ">":
        return f > val
    if op == ">=":
        return f >= val
    if op in ("between", "not between"):
        # Sorting makes the bounds order-insensitive: smallest is the lower
        # bound, largest is the upper bound.
        bounds = pa.array(val).sort()
        if len(bounds) == 0:
            raise ValueError(f"Operator {op} requires at least one bound")
        low, high = bounds[0], bounds[-1]
        if op == "between":
            return (f >= low) & (f <= high)
        return (f < low) | (f > high)
    if op == "like":
        return pc.match_like(f, val)
    if op == "not like":
        return pc.invert(pc.match_like(f, val))
    if op in ("match_like", "starts_with", "ends_with"):
        # These operator names are also pyarrow.compute function names.
        return getattr(pc, op)(f, val)
    if op in ("not match_like", "not starts_with", "not ends_with"):
        # Strip the leading "not " to get the compute function, then invert.
        func = getattr(pc, op[len("not "):])
        return pc.invert(func(f, val))

    raise ValueError(f"Not supported operator {op}")

Component(s)

C++, Python

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant