diff --git a/pyiceberg/io/__init__.py b/pyiceberg/io/__init__.py
index 1b0bc71af0..f03048c8cd 100644
--- a/pyiceberg/io/__init__.py
+++ b/pyiceberg/io/__init__.py
@@ -275,7 +275,7 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
     "s3a": [ARROW_FILE_IO, FSSPEC_FILE_IO],
     "s3n": [ARROW_FILE_IO, FSSPEC_FILE_IO],
     "gs": [ARROW_FILE_IO],
-    "file": [ARROW_FILE_IO],
+    "file": [ARROW_FILE_IO, FSSPEC_FILE_IO],
     "hdfs": [ARROW_FILE_IO],
     "abfs": [FSSPEC_FILE_IO],
     "abfss": [FSSPEC_FILE_IO],
diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py
index 6887007c7f..957cac66f2 100644
--- a/pyiceberg/io/fsspec.py
+++ b/pyiceberg/io/fsspec.py
@@ -99,7 +99,7 @@ def s3v4_rest_signer(properties: Properties, request: AWSRequest, **_: Any) -> A
 
 
 def _file(_: Properties) -> LocalFileSystem:
-    return LocalFileSystem()
+    return LocalFileSystem(auto_mkdir=True)
 
 
 def _s3(properties: Properties) -> AbstractFileSystem:
@@ -173,6 +173,7 @@ def _adlfs(properties: Properties) -> AbstractFileSystem:
 
 
 SCHEME_TO_FS = {
+    "": _file,
     "file": _file,
     "s3": _s3,
     "s3a": _s3,
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 035f5e8031..8291ce385d 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -172,6 +172,13 @@ T = TypeVar("T")
 
 
+class PyArrowLocalFileSystem(pyarrow.fs.LocalFileSystem):
+    def open_output_stream(self, path: str, *args: Any, **kwargs: Any) -> pyarrow.NativeFile:
+        # In LocalFileSystem, parent directories must be first created before opening an output stream
+        self.create_dir(os.path.dirname(path), recursive=True)
+        return super().open_output_stream(path, *args, **kwargs)
+
+
 class PyArrowFile(InputFile, OutputFile):
     """A combined InputFile and OutputFile implementation that uses a pyarrow filesystem to generate pyarrow.lib.NativeFile instances.
@@ -378,9 +385,7 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste
             return GcsFileSystem(**gcs_kwargs)
         elif scheme == "file":
-            from pyarrow.fs import LocalFileSystem
-
-            return LocalFileSystem()
+            return PyArrowLocalFileSystem()
         else:
             raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py
index 217ea8f535..9dbcf8f84e 100644
--- a/tests/catalog/test_sql.py
+++ b/tests/catalog/test_sql.py
@@ -19,6 +19,7 @@
 from pathlib import Path
 from typing import Generator, List
 
+import pyarrow as pa
 import pytest
 from pytest import TempPathFactory
 from pytest_lazyfixture import lazy_fixture
@@ -35,7 +36,10 @@
     NoSuchTableError,
     TableAlreadyExistsError,
 )
+from pyiceberg.io import FSSPEC_FILE_IO, PY_IO_IMPL
+from pyiceberg.io.pyarrow import schema_to_pyarrow
 from pyiceberg.schema import Schema
+from pyiceberg.table.snapshots import Operation
 from pyiceberg.table.sorting import (
     NullOrder,
     SortDirection,
@@ -80,7 +84,7 @@ def catalog_memory(warehouse: Path) -> Generator[SqlCatalog, None, None]:
 @pytest.fixture(scope="module")
 def catalog_sqlite(warehouse: Path) -> Generator[SqlCatalog, None, None]:
     props = {
-        "uri": "sqlite:////tmp/sql-catalog.db",
+        "uri": f"sqlite:////{warehouse}/sql-catalog.db",
         "warehouse": f"file://{warehouse}",
     }
     catalog = SqlCatalog("test_sql_catalog", **props)
@@ -92,7 +96,7 @@ def catalog_sqlite(warehouse: Path) -> Generator[SqlCatalog, None, None]:
 @pytest.fixture(scope="module")
 def catalog_sqlite_without_rowcount(warehouse: Path) -> Generator[SqlCatalog, None, None]:
     props = {
-        "uri": "sqlite:////tmp/sql-catalog.db",
+        "uri": f"sqlite:////{warehouse}/sql-catalog.db",
         "warehouse": f"file://{warehouse}",
     }
     catalog = SqlCatalog("test_sql_catalog", **props)
@@ -102,6 +106,19 @@ def catalog_sqlite_without_rowcount(warehouse: Path) -> Generator[SqlCatalog, No
     catalog.destroy_tables()
 
 
+@pytest.fixture(scope="module")
+def catalog_sqlite_fsspec(warehouse: Path) -> Generator[SqlCatalog, None, None]:
+    props = {
+        "uri": f"sqlite:////{warehouse}/sql-catalog.db",
+        "warehouse": f"file://{warehouse}",
+        PY_IO_IMPL: FSSPEC_FILE_IO,
+    }
+    catalog = SqlCatalog("test_sql_catalog", **props)
+    catalog.create_tables()
+    yield catalog
+    catalog.destroy_tables()
+
+
 def test_creation_with_no_uri() -> None:
     with pytest.raises(NoSuchPropertyException):
         SqlCatalog("test_ddb_catalog", not_uri="unused")
@@ -722,6 +739,47 @@ def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema, random_i
     assert new_schema.find_field("b").field_type == IntegerType()
 
 
+@pytest.mark.parametrize(
+    'catalog',
+    [
+        lazy_fixture('catalog_memory'),
+        lazy_fixture('catalog_sqlite'),
+        lazy_fixture('catalog_sqlite_without_rowcount'),
+        lazy_fixture('catalog_sqlite_fsspec'),
+    ],
+)
+def test_append_table(catalog: SqlCatalog, table_schema_simple: Schema, random_identifier: Identifier) -> None:
+    database_name, _table_name = random_identifier
+    catalog.create_namespace(database_name)
+    table = catalog.create_table(random_identifier, table_schema_simple)
+
+    df = pa.Table.from_pydict(
+        {
+            "foo": ["a"],
+            "bar": [1],
+            "baz": [True],
+        },
+        schema=schema_to_pyarrow(table_schema_simple),
+    )
+
+    table.append(df)
+
+    # new snapshot is written in APPEND mode
+    assert len(table.metadata.snapshots) == 1
+    assert table.metadata.snapshots[0].snapshot_id == table.metadata.current_snapshot_id
+    assert table.metadata.snapshots[0].parent_snapshot_id is None
+    assert table.metadata.snapshots[0].sequence_number == 1
+    assert table.metadata.snapshots[0].summary is not None
+    assert table.metadata.snapshots[0].summary.operation == Operation.APPEND
+    assert table.metadata.snapshots[0].summary['added-data-files'] == '1'
+    assert table.metadata.snapshots[0].summary['added-records'] == '1'
+    assert table.metadata.snapshots[0].summary['total-data-files'] == '1'
+    assert table.metadata.snapshots[0].summary['total-records'] == '1'
+
+    # read back the data
+    assert df == table.scan().to_arrow()
+
+
 @pytest.mark.parametrize(
     'catalog',
     [
diff --git a/tests/io/test_fsspec.py b/tests/io/test_fsspec.py
index f83268b56f..9f044454b4 100644
--- a/tests/io/test_fsspec.py
+++ b/tests/io/test_fsspec.py
@@ -15,10 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import os
+import tempfile
 import uuid
 
 import pytest
 from botocore.awsrequest import AWSRequest
+from fsspec.implementations.local import LocalFileSystem
 from requests_mock import Mocker
 
 from pyiceberg.exceptions import SignError
@@ -27,6 +30,26 @@
 from pyiceberg.io.pyarrow import PyArrowFileIO
 
 
+def test_fsspec_infer_local_fs_from_path(fsspec_fileio: FsspecFileIO) -> None:
+    """Test path with `file` scheme and no scheme both use LocalFileSystem"""
+    assert isinstance(fsspec_fileio.new_output("file://tmp/warehouse")._fs, LocalFileSystem)
+    assert isinstance(fsspec_fileio.new_output("/tmp/warehouse")._fs, LocalFileSystem)
+
+
+def test_fsspec_local_fs_can_create_path_without_parent_dir(fsspec_fileio: FsspecFileIO) -> None:
+    """Test LocalFileSystem can create path without first creating the parent directories"""
+    with tempfile.TemporaryDirectory() as tmpdirname:
+        file_path = f"{tmpdirname}/foo/bar/baz.txt"
+        output_file = fsspec_fileio.new_output(file_path)
+        parent_path = os.path.dirname(file_path)
+        assert output_file._fs.exists(parent_path) is False
+        try:
+            with output_file.create() as f:
+                f.write(b"foo")
+        except Exception:
+            pytest.fail("Failed to write to file without parent directory")
+
+
 @pytest.mark.s3
 def test_fsspec_new_input_file(fsspec_fileio: FsspecFileIO) -> None:
     """Test creating a new input file from a fsspec file-io"""
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index 5efeb42ed8..b7869ac3fd 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -92,6 +92,26 @@
 )
 
 
+def test_pyarrow_infer_local_fs_from_path() -> None:
+    """Test path with `file` scheme and no scheme both use LocalFileSystem"""
+    assert isinstance(PyArrowFileIO().new_output("file://tmp/warehouse")._filesystem, LocalFileSystem)
+    assert isinstance(PyArrowFileIO().new_output("/tmp/warehouse")._filesystem, LocalFileSystem)
+
+
+def test_pyarrow_local_fs_can_create_path_without_parent_dir() -> None:
+    """Test LocalFileSystem can create path without first creating the parent directories"""
+    with tempfile.TemporaryDirectory() as tmpdirname:
+        file_path = f"{tmpdirname}/foo/bar/baz.txt"
+        output_file = PyArrowFileIO().new_output(file_path)
+        parent_path = os.path.dirname(file_path)
+        assert output_file._filesystem.get_file_info(parent_path).type == FileType.NotFound
+        try:
+            with output_file.create() as f:
+                f.write(b"foo")
+        except Exception:
+            pytest.fail("Failed to write to file without parent directory")
+
+
 def test_pyarrow_input_file() -> None:
     """Test reading a file using PyArrowFile"""
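For reference, a rough end-to-end sketch (not part of the patch) of the usage these changes enable: a SqlCatalog backed by a local file:// warehouse can append data without pre-creating the table's metadata/data directories, because the fsspec path now builds LocalFileSystem(auto_mkdir=True) and the pyarrow path goes through PyArrowLocalFileSystem. The catalog name, warehouse path, and table identifier below are illustrative, and the snippet assumes a pyiceberg version that has Table.append (as exercised by test_append_table above).

import os

import pyarrow as pa

from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType

# Illustrative local warehouse; only the warehouse root itself is created up front.
warehouse_path = "/tmp/iceberg-warehouse"
os.makedirs(warehouse_path, exist_ok=True)

catalog = SqlCatalog(
    "local",  # hypothetical catalog name
    **{
        "uri": f"sqlite:///{warehouse_path}/sql-catalog.db",
        "warehouse": f"file://{warehouse_path}",
    },
)
catalog.create_tables()  # create the catalog's backing tables, as the fixtures above do

catalog.create_namespace("default")
table = catalog.create_table(
    "default.example",  # hypothetical table identifier
    schema=Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=False)),
)

# Parent directories for the written files are created automatically by the
# patched local filesystem implementations; no manual mkdir of data/ or metadata/ needed.
df = pa.Table.from_pydict({"foo": ["a", "b"]}, schema=schema_to_pyarrow(table.schema()))
table.append(df)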