Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix writing to local filesystem #301

Merged
merged 5 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyiceberg/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
"s3a": [ARROW_FILE_IO, FSSPEC_FILE_IO],
"s3n": [ARROW_FILE_IO, FSSPEC_FILE_IO],
"gs": [ARROW_FILE_IO],
"file": [ARROW_FILE_IO],
"file": [ARROW_FILE_IO, FSSPEC_FILE_IO],
"hdfs": [ARROW_FILE_IO],
"abfs": [FSSPEC_FILE_IO],
"abfss": [FSSPEC_FILE_IO],
Expand Down
3 changes: 2 additions & 1 deletion pyiceberg/io/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def s3v4_rest_signer(properties: Properties, request: AWSRequest, **_: Any) -> A


def _file(_: Properties) -> LocalFileSystem:
    """Return a local filesystem for `file://` and scheme-less locations.

    The sole argument is accepted only for signature parity with the other
    scheme factories registered in SCHEME_TO_FS; it is not used.
    """
    # auto_mkdir=True makes fsspec create missing parent directories when an
    # output stream is opened, matching the PyArrow-based local filesystem.
    # (The diff artifact that duplicated the pre-fix `return LocalFileSystem()`
    # line is removed; only the merged behavior remains.)
    return LocalFileSystem(auto_mkdir=True)


def _s3(properties: Properties) -> AbstractFileSystem:
Expand Down Expand Up @@ -173,6 +173,7 @@ def _adlfs(properties: Properties) -> AbstractFileSystem:


SCHEME_TO_FS = {
"": _file,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PyArrow defaults the scheme to `file` when no scheme is present.

return "file", uri.netloc, os.path.abspath(location)

We essentially do the same here, mapping the empty scheme to the local filesystem.

"file": _file,
"s3": _s3,
"s3a": _s3,
Expand Down
11 changes: 8 additions & 3 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,13 @@
T = TypeVar("T")


class PyArrowLocalFileSystem(pyarrow.fs.LocalFileSystem):
    """Local filesystem that transparently creates parent directories on write."""

    def open_output_stream(self, path: str, *args: Any, **kwargs: Any) -> pyarrow.NativeFile:
        """Open *path* for writing, creating any missing parent directories first.

        pyarrow's LocalFileSystem requires a parent directory to exist before an
        output stream can be opened there, so it is created up front.
        """
        parent_dir = os.path.dirname(path)
        self.create_dir(parent_dir, recursive=True)
        return super().open_output_stream(path, *args, **kwargs)


class PyArrowFile(InputFile, OutputFile):
"""A combined InputFile and OutputFile implementation that uses a pyarrow filesystem to generate pyarrow.lib.NativeFile instances.

Expand Down Expand Up @@ -378,9 +385,7 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste

return GcsFileSystem(**gcs_kwargs)
elif scheme == "file":
from pyarrow.fs import LocalFileSystem

return LocalFileSystem()
return PyArrowLocalFileSystem()
else:
raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")

Expand Down
62 changes: 60 additions & 2 deletions tests/catalog/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from pathlib import Path
from typing import Generator, List

import pyarrow as pa
import pytest
from pytest import TempPathFactory
from pytest_lazyfixture import lazy_fixture
Expand All @@ -35,7 +36,10 @@
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io import FSSPEC_FILE_IO, PY_IO_IMPL
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.table.snapshots import Operation
from pyiceberg.table.sorting import (
NullOrder,
SortDirection,
Expand Down Expand Up @@ -80,7 +84,7 @@ def catalog_memory(warehouse: Path) -> Generator[SqlCatalog, None, None]:
@pytest.fixture(scope="module")
def catalog_sqlite(warehouse: Path) -> Generator[SqlCatalog, None, None]:
props = {
"uri": "sqlite:////tmp/sql-catalog.db",
"uri": f"sqlite:////{warehouse}/sql-catalog.db",
"warehouse": f"file://{warehouse}",
}
catalog = SqlCatalog("test_sql_catalog", **props)
Expand All @@ -92,7 +96,7 @@ def catalog_sqlite(warehouse: Path) -> Generator[SqlCatalog, None, None]:
@pytest.fixture(scope="module")
def catalog_sqlite_without_rowcount(warehouse: Path) -> Generator[SqlCatalog, None, None]:
props = {
"uri": "sqlite:////tmp/sql-catalog.db",
"uri": f"sqlite:////{warehouse}/sql-catalog.db",
"warehouse": f"file://{warehouse}",
}
catalog = SqlCatalog("test_sql_catalog", **props)
Expand All @@ -102,6 +106,19 @@ def catalog_sqlite_without_rowcount(warehouse: Path) -> Generator[SqlCatalog, No
catalog.destroy_tables()


@pytest.fixture(scope="module")
def catalog_sqlite_fsspec(warehouse: Path) -> Generator[SqlCatalog, None, None]:
    """Yield a sqlite-backed SqlCatalog whose file IO goes through fsspec."""
    catalog = SqlCatalog(
        "test_sql_catalog",
        **{
            "uri": f"sqlite:////{warehouse}/sql-catalog.db",
            "warehouse": f"file://{warehouse}",
            PY_IO_IMPL: FSSPEC_FILE_IO,
        },
    )
    catalog.create_tables()
    yield catalog
    # Module-scope teardown: drop the catalog tables after all tests ran.
    catalog.destroy_tables()


def test_creation_with_no_uri() -> None:
    """Constructing a SqlCatalog without a `uri` property must raise NoSuchPropertyException."""
    with pytest.raises(NoSuchPropertyException):
        SqlCatalog("test_ddb_catalog", not_uri="unused")
Expand Down Expand Up @@ -722,6 +739,47 @@ def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema, random_i
assert new_schema.find_field("b").field_type == IntegerType()


@pytest.mark.parametrize(
    'catalog',
    [
        lazy_fixture('catalog_memory'),
        lazy_fixture('catalog_sqlite'),
        lazy_fixture('catalog_sqlite_without_rowcount'),
        lazy_fixture('catalog_sqlite_fsspec'),
    ],
)
def test_append_table(catalog: SqlCatalog, table_schema_simple: Schema, random_identifier: Identifier) -> None:
    """Appending one arrow row writes a single APPEND snapshot and reads back intact."""
    database_name, _table_name = random_identifier
    catalog.create_namespace(database_name)
    table = catalog.create_table(random_identifier, table_schema_simple)

    df = pa.Table.from_pydict(
        {"foo": ["a"], "bar": [1], "baz": [True]},
        schema=schema_to_pyarrow(table_schema_simple),
    )

    table.append(df)

    # new snapshot is written in APPEND mode
    snapshots = table.metadata.snapshots
    assert len(snapshots) == 1
    snapshot = snapshots[0]
    assert snapshot.snapshot_id == table.metadata.current_snapshot_id
    assert snapshot.parent_snapshot_id is None
    assert snapshot.sequence_number == 1
    summary = snapshot.summary
    assert summary is not None
    assert summary.operation == Operation.APPEND
    # one file / one record added, and the totals agree with it
    for key in ('added-data-files', 'added-records', 'total-data-files', 'total-records'):
        assert summary[key] == '1'

    # read back the data
    assert df == table.scan().to_arrow()


@pytest.mark.parametrize(
'catalog',
[
Expand Down
23 changes: 23 additions & 0 deletions tests/io/test_fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
# specific language governing permissions and limitations
# under the License.

import os
import tempfile
import uuid

import pytest
from botocore.awsrequest import AWSRequest
from fsspec.implementations.local import LocalFileSystem
from requests_mock import Mocker

from pyiceberg.exceptions import SignError
Expand All @@ -27,6 +30,26 @@
from pyiceberg.io.pyarrow import PyArrowFileIO


def test_fsspec_infer_local_fs_from_path(fsspec_fileio: FsspecFileIO) -> None:
    """Both `file://` URIs and bare scheme-less paths must resolve to LocalFileSystem."""
    for location in ("file://tmp/warehouse", "/tmp/warehouse"):
        assert isinstance(fsspec_fileio.new_output(location)._fs, LocalFileSystem)


def test_fsspec_local_fs_can_create_path_without_parent_dir(fsspec_fileio: FsspecFileIO) -> None:
    """Writing through fsspec's local filesystem must not require pre-existing parent dirs."""
    with tempfile.TemporaryDirectory() as tmpdirname:
        target = f"{tmpdirname}/foo/bar/baz.txt"
        parent = os.path.dirname(target)
        output_file = fsspec_fileio.new_output(target)
        # The intermediate directories do not exist yet; the write must create them.
        assert output_file._fs.exists(parent) is False
        try:
            with output_file.create() as stream:
                stream.write(b"foo")
        except Exception:
            pytest.fail("Failed to write to file without parent directory")


@pytest.mark.s3
def test_fsspec_new_input_file(fsspec_fileio: FsspecFileIO) -> None:
"""Test creating a new input file from a fsspec file-io"""
Expand Down
20 changes: 20 additions & 0 deletions tests/io/test_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,26 @@
)


def test_pyarrow_infer_local_fs_from_path() -> None:
    """Both `file://` URIs and bare scheme-less paths must resolve to LocalFileSystem."""
    for location in ("file://tmp/warehouse", "/tmp/warehouse"):
        assert isinstance(PyArrowFileIO().new_output(location)._filesystem, LocalFileSystem)


def test_pyarrow_local_fs_can_create_path_without_parent_dir() -> None:
    """Writing through the PyArrow local filesystem must not require pre-existing parent dirs."""
    with tempfile.TemporaryDirectory() as tmpdirname:
        target = f"{tmpdirname}/foo/bar/baz.txt"
        parent = os.path.dirname(target)
        output_file = PyArrowFileIO().new_output(target)
        # The intermediate directories do not exist yet; the write must create them.
        assert output_file._filesystem.get_file_info(parent).type == FileType.NotFound
        try:
            with output_file.create() as stream:
                stream.write(b"foo")
        except Exception:
            pytest.fail("Failed to write to file without parent directory")


def test_pyarrow_input_file() -> None:
"""Test reading a file using PyArrowFile"""

Expand Down