Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support read_parquet for backend with no native support #9744

Open
wants to merge 31 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
ab2ad16
support read_parquet for backend with no native support
jitingxu1 Aug 1, 2024
661f50d
fix unit tests
jitingxu1 Aug 2, 2024
e16f1bb
resolve Xpass and clickhouse tests
jitingxu1 Aug 2, 2024
eaec7a2
handle different inputs
jitingxu1 Aug 5, 2024
9106ad8
Merge branch 'main' into extend-read-parquet
jitingxu1 Aug 6, 2024
27d7a08
pandas not suporting glob pattern
jitingxu1 Aug 6, 2024
ac6117f
Merge branch 'main' into extend-read-parquet
gforsyth Aug 6, 2024
3ce9674
tests for url and fssepc url
jitingxu1 Aug 18, 2024
24530ca
resolve pandas use pyarrow as default
jitingxu1 Aug 19, 2024
bb238af
add test for is_url and is_fsspec_url
jitingxu1 Aug 21, 2024
12cfc7d
change to fssepc and add examples
jitingxu1 Aug 23, 2024
2cf597a
add reason for mark.never
jitingxu1 Aug 23, 2024
b4cf0ea
re run workflow
jitingxu1 Aug 23, 2024
2ba5002
Merge branch 'main' into extend-read-parquet
jitingxu1 Aug 23, 2024
6f2c754
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 11, 2024
24bfe38
lint
jitingxu1 Sep 15, 2024
6a50c46
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 15, 2024
4579bff
remove pandas
jitingxu1 Sep 15, 2024
d1ed444
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 16, 2024
b01bc6a
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 18, 2024
e70de2f
Merge branch 'ibis-project:main' into extend-read-parquet
jitingxu1 Sep 18, 2024
413ada7
Merge branch 'ibis-project:main' into extend-read-parquet
jitingxu1 Sep 19, 2024
c3fba44
reconcile coe
jitingxu1 Sep 19, 2024
8b6b3c6
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 20, 2024
0d55190
skip trino and impala
jitingxu1 Sep 20, 2024
fda5493
Trigger CI
jitingxu1 Sep 20, 2024
71ebb8e
chore: trigger CI
jitingxu1 Sep 21, 2024
2473c02
chore(test): skip test for backends with own parquet readers
gforsyth Sep 23, 2024
3ab60a8
chore: simplify the logic
jitingxu1 Sep 25, 2024
59c03e0
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 25, 2024
c0c1fd1
chore: lint
jitingxu1 Sep 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions ibis/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
import abc
import collections.abc
import functools
import glob
import importlib.metadata
import keyword
import re
import urllib.parse
import urllib.request
from io import BytesIO
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar

Expand Down Expand Up @@ -1199,6 +1202,61 @@
f"{cls.name} backend has not implemented `has_operation` API"
)

def read_parquet(
self, path: str | Path, table_name: str | None = None, **kwargs: Any
) -> ir.Table:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of BytesIO, I could pass the fsspec object, It could be HTTPFile if we pass an HTTP url. Not sure what is the best way to handle the type of path

@gforsyth any suggestion?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think fsspec is a good option.

"""Register a parquet file as a table in the current backend.

Parameters
----------
path
The data source. May be a path to a file, an iterable of files,
or directory of parquet files.
table_name
An optional name to use for the created table. This defaults to
a sequentially generated name.
**kwargs
Additional keyword arguments passed to the pyarrow loading function.
See https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html
for more information.

Returns
-------
ir.Table
The just-registered table

"""

table = self._get_pyarrow_table_from_path(path, **kwargs)
table_name = table_name or util.gen_name("read_parquet")
self.create_table(table_name, table)
return self.table(table_name)

def _get_pyarrow_table_from_path(self, path: str | Path, **kwargs) -> pa.Table:
Copy link
Member

@cpcloud cpcloud Aug 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't the implementation of this just be:

return pq.read_table(path, **kwargs)

Did you try that already?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried that in my first commit, it cannot handle all the cases:

such as glob pattern and Parquet files hosted on some uri: i.e HTTPS SFTP

Pyarrow implements natively the following filesystem subclasses:

Local FS (LocalFileSystem)

S3 (S3FileSystem)

Google Cloud Storage File System (GcsFileSystem)

Hadoop Distributed File System (HDFS) (HadoopFileSystem)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cpcloud does this make sense to you?

pq = util.import_object("pyarrow.parquet")

path = str(path)
# handle url
if util.is_url(path):
headers = kwargs.pop("headers", {})
req_info = urllib.request.Request(path, headers=headers) # noqa: S310

Check warning on line 1242 in ibis/backends/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/__init__.py#L1241-L1242

Added lines #L1241 - L1242 were not covered by tests
cpcloud marked this conversation as resolved.
Show resolved Hide resolved
jitingxu1 marked this conversation as resolved.
Show resolved Hide resolved
with urllib.request.urlopen(req_info) as req: # noqa: S310
with BytesIO(req.read()) as reader:
return pq.read_table(reader)

Check warning on line 1245 in ibis/backends/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/__init__.py#L1245

Added line #L1245 was not covered by tests

# handle fsspec compatible url
if util.is_fsspec_url(path):
return pq.read_table(path, **kwargs)

Check warning on line 1249 in ibis/backends/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/__init__.py#L1249

Added line #L1249 was not covered by tests

# Handle local file paths or patterns
paths = glob.glob(path)
if not paths:
raise ValueError(f"No files found matching pattern: {path!r}")

Check warning on line 1254 in ibis/backends/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/__init__.py#L1254

Added line #L1254 was not covered by tests
elif len(paths) == 1:
paths = paths[0]

return pq.read_table(paths, **kwargs)

def _cached(self, expr: ir.Table):
jitingxu1 marked this conversation as resolved.
Show resolved Hide resolved
"""Cache the provided expression.

Expand Down
39 changes: 18 additions & 21 deletions ibis/backends/tests/test_register.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

import pyarrow as pa

pytestmark = pytest.mark.notimpl(["druid", "exasol", "oracle"])


@contextlib.contextmanager
def pushd(new_dir):
Expand Down Expand Up @@ -98,6 +96,7 @@ def gzip_csv(data_dir, tmp_path):
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
ncclementi marked this conversation as resolved.
Show resolved Hide resolved
def test_register_csv(con, data_dir, fname, in_table_name, out_table_name):
with pushd(data_dir / "csv"):
with pytest.warns(FutureWarning, match="v9.1"):
Expand All @@ -109,7 +108,7 @@ def test_register_csv(con, data_dir, fname, in_table_name, out_table_name):


# TODO: rewrite or delete test when register api is removed
@pytest.mark.notimpl(["datafusion"])
@pytest.mark.notimpl(["datafusion", "druid", "exasol", "oracle"])
ncclementi marked this conversation as resolved.
Show resolved Hide resolved
@pytest.mark.notyet(
[
"bigquery",
Expand Down Expand Up @@ -153,6 +152,7 @@ def test_register_csv_gz(con, data_dir, gzip_csv):
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_register_with_dotted_name(con, data_dir, tmp_path):
basename = "foo.bar.baz/diamonds.csv"
f = tmp_path.joinpath(basename)
Expand Down Expand Up @@ -212,6 +212,7 @@ def read_table(path: Path) -> Iterator[tuple[str, pa.Table]]:
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_register_parquet(
con, tmp_path, data_dir, fname, in_table_name, out_table_name
):
Expand Down Expand Up @@ -252,6 +253,7 @@ def test_register_parquet(
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_register_iterator_parquet(
con,
tmp_path,
Expand Down Expand Up @@ -280,7 +282,7 @@ def test_register_iterator_parquet(
# TODO: remove entirely when `register` is removed
# This same functionality is implemented across all backends
# via `create_table` and tested in `test_client.py`
@pytest.mark.notimpl(["datafusion"])
@pytest.mark.notimpl(["datafusion", "druid", "exasol", "oracle"])
@pytest.mark.notyet(
[
"bigquery",
Expand Down Expand Up @@ -316,7 +318,7 @@ def test_register_pandas(con):
# TODO: remove entirely when `register` is removed
# This same functionality is implemented across all backends
# via `create_table` and tested in `test_client.py`
@pytest.mark.notimpl(["datafusion", "polars"])
@pytest.mark.notimpl(["datafusion", "polars", "druid", "exasol", "oracle"])
@pytest.mark.notyet(
[
"bigquery",
Expand Down Expand Up @@ -361,6 +363,7 @@ def test_register_pyarrow_tables(con):
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_csv_reregister_schema(con, tmp_path):
foo = tmp_path.joinpath("foo.csv")
with foo.open("w", newline="") as csvfile:
Expand Down Expand Up @@ -390,10 +393,13 @@ def test_csv_reregister_schema(con, tmp_path):
"clickhouse",
"dask",
"datafusion",
"druid",
"exasol",
"flink",
"impala",
"mysql",
"mssql",
"oracle",
"pandas",
"polars",
"postgres",
Expand Down Expand Up @@ -425,9 +431,8 @@ def test_register_garbage(con, monkeypatch):
("functional_alltypes.parquet", "funk_all"),
],
)
@pytest.mark.notyet(
["flink", "impala", "mssql", "mysql", "postgres", "risingwave", "sqlite", "trino"]
)
@pytest.mark.notyet(["flink"])
@pytest.mark.notimpl(["druid"])
def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name):
pq = pytest.importorskip("pyarrow.parquet")

Expand Down Expand Up @@ -456,19 +461,8 @@ def ft_data(data_dir):
return table.slice(0, nrows)


@pytest.mark.notyet(
[
"flink",
"impala",
"mssql",
"mysql",
"pandas",
"postgres",
"risingwave",
"sqlite",
"trino",
]
)
@pytest.mark.notyet(["flink", "pandas"])
@pytest.mark.notimpl(["druid"])
def test_read_parquet_glob(con, tmp_path, ft_data):
pq = pytest.importorskip("pyarrow.parquet")

Expand Down Expand Up @@ -498,6 +492,7 @@ def test_read_parquet_glob(con, tmp_path, ft_data):
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_read_csv_glob(con, tmp_path, ft_data):
pc = pytest.importorskip("pyarrow.csv")

Expand Down Expand Up @@ -534,6 +529,7 @@ def test_read_csv_glob(con, tmp_path, ft_data):
raises=ValueError,
reason="read_json() missing required argument: 'schema'",
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_read_json_glob(con, tmp_path, ft_data):
nrows = len(ft_data)
ntables = 2
Expand Down Expand Up @@ -580,6 +576,7 @@ def num_diamonds(data_dir):
@pytest.mark.notyet(
["flink", "impala", "mssql", "mysql", "postgres", "risingwave", "sqlite", "trino"]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_read_csv(con, data_dir, in_table_name, num_diamonds):
fname = "diamonds.csv"
with pushd(data_dir / "csv"):
Expand Down
46 changes: 46 additions & 0 deletions ibis/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
import warnings
from types import ModuleType
from typing import TYPE_CHECKING, Any, Generic, TypeVar
from urllib.parse import (
urlparse,
uses_netloc,
uses_params,
uses_relative,
)
from uuid import uuid4

import toolz
Expand All @@ -42,6 +48,10 @@
# https://www.compart.com/en/unicode/U+2026
HORIZONTAL_ELLIPSIS = "\u2026"

_VALID_URLS = set(uses_relative + uses_netloc + uses_params)
_VALID_URLS.discard("")
_RFC_3986_PATTERN = re.compile(r"^[A-Za-z][A-Za-z0-9+\-+.]*://")


def guid() -> str:
"""Return a uuid4 hexadecimal value."""
Expand Down Expand Up @@ -485,6 +495,42 @@ def import_object(qualname: str) -> Any:
raise ImportError(f"cannot import name {name!r} from {mod_name!r}") from None


def is_url(url: str) -> bool:
"""Check to see if a URL has a valid protocol.

Parameters
----------
url : str
The URL to be checked.

Returns
-------
bool
True if the URL has a valid protocol, False otherwise

"""

return urlparse(url).scheme in _VALID_URLS


def is_fsspec_url(url: str) -> bool:
"""Check if the given URL looks like something fsspec can handle.

Parameters
----------
url : str
The URL string to be checked.

Returns
-------
bool
True if the URL is likely compatible with fsspec, False otherwise.
"""
return bool(_RFC_3986_PATTERN.match(url)) and not url.startswith(
("http://", "https://")
)


def normalize_filename(source: str | Path) -> str:
source = str(source)
for prefix in (
Expand Down
Loading