Skip to content

Commit

Permalink
adds tests cases for windows extended paths + docs
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed May 7, 2024
1 parent 0b806f2 commit fc718d5
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 15 deletions.
1 change: 0 additions & 1 deletion dlt/destinations/impl/filesystem/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ def __init__(self, schema: Schema, config: FilesystemDestinationClientConfigurat
super().__init__(schema, config)
self.fs_client, fs_path = fsspec_from_config(config)
self.is_local_filesystem = config.protocol == "file"
#
self.bucket_path = (
config.make_local_path(config.bucket_url) if self.is_local_filesystem else fs_path
)
Expand Down
14 changes: 14 additions & 0 deletions docs/website/docs/dlt-ecosystem/destinations/filesystem.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,20 @@ bucket_url="file://localhost/c$/a/b/c"
bucket_url="file:////localhost/c$/a/b/c"
```

:::caution
By default, Windows limits full paths to `MAX_PATH` (260 characters). When you access a longer path, you'll see a `FileNotFoundError` exception.

To exceed this limit, you can use [extended paths](https://learn.microsoft.com/en-us/windows/win32/fileio/maximum-file-path-limitation?tabs=registry). `dlt` recognizes both regular and UNC extended paths:

```toml
[destination.regular_extended]
bucket_url = '\\?\C:\a\b\c'

[destination.unc_extended]
bucket_url='\\?\UNC\localhost\c$\a\b\c'
```
:::

## Write disposition
The filesystem destination handles the write dispositions as follows:
- `append` - files belonging to such tables are added to the dataset folder
Expand Down
12 changes: 12 additions & 0 deletions docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,18 @@ You can use both native local file system paths and in form of `file:` uri. Abso
You can find relevant examples in [filesystem destination documentation](../destinations/filesystem.md#local-file-system) which follows
the same rules to specify the `bucket_url`.

:::caution
By default, Windows limits full paths to `MAX_PATH` (260 characters). When you access a longer path, you'll see a `FileNotFoundError` exception.

To exceed this limit, you can use [extended paths](https://learn.microsoft.com/en-us/windows/win32/fileio/maximum-file-path-limitation?tabs=registry).
**Note that Python glob does not work with extended UNC paths**, so you will not be able to use extended UNC paths with the filesystem source. Regular extended paths work:

```toml
[sources.filesystem]
bucket_url = '\\?\C:\a\b\c'
```
:::

## Run the pipeline

1. Before running the pipeline, ensure that you have installed all the necessary dependencies by
Expand Down
13 changes: 11 additions & 2 deletions tests/common/storages/test_local_filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from tests.utils import skipifnotwindows, skipifwindows

# Plain UNC path pointing at the `c$` share on `localhost`.
UNC_LOCAL_PATH = r"\\localhost\c$\tests\common\test.csv"
# The same location written as a Windows extended-length UNC path (`\\?\UNC\` prefix).
UNC_LOCAL_EXT_PATH = r"\\?\UNC\localhost\c$\tests\common\test.csv"
# UNC path under the `wsl.localhost` host; the last segment starts with a space (" .dlt").
# NOTE(review): the leading space looks deliberate (to exercise URL quoting) - confirm.
UNC_WSL_PATH = r"\\wsl.localhost\Ubuntu-18.04\home\rudolfix\ .dlt"


Expand All @@ -20,9 +21,10 @@
"bucket_url,file_url",
(
(UNC_LOCAL_PATH, pathlib.PureWindowsPath(UNC_LOCAL_PATH).as_uri()),
(UNC_LOCAL_EXT_PATH, pathlib.PureWindowsPath(UNC_LOCAL_EXT_PATH).as_uri()),
(UNC_WSL_PATH, pathlib.PureWindowsPath(UNC_WSL_PATH).as_uri()),
(r"C:\hello", "file:///C:/hello"),
# (r"\\?\C:\hello", "file:///C:/hello"),
(r"\\?\C:\hello", "file://%3F/C%3A/hello"),
(r"a\b $\b", "file:///" + pathlib.Path(r"a\\" + quote("b $") + r"\b").resolve().as_posix()),
# same paths but with POSIX separators
(
Expand Down Expand Up @@ -232,7 +234,9 @@ def test_filesystem_decompress() -> None:

# create windows UNC paths, on POSIX systems they are not used
# absolute form of TEST_SAMPLE_FILES
WIN_ABS_PATH = os.path.abspath(TEST_SAMPLE_FILES)
# the same absolute path with the extended-length `\\?\` prefix prepended
WIN_ABS_EXT_PATH = "\\\\?\\" + os.path.abspath(TEST_SAMPLE_FILES)
# UNC form: `\\localhost\` followed by the absolute path with ":" replaced by "$", lowercased
WIN_UNC_PATH = "\\\\localhost\\" + WIN_ABS_PATH.replace(":", "$").lower()
# extended-length UNC form: same transformation, but with the `\\?\UNC\localhost\` prefix
WIN_UNC_EXT_PATH = "\\\\?\\UNC\\localhost\\" + WIN_ABS_PATH.replace(":", "$").lower()


@skipifnotwindows
Expand All @@ -242,8 +246,13 @@ def test_filesystem_decompress() -> None:
WIN_UNC_PATH,
"file:///" + pathlib.Path(WIN_UNC_PATH).as_posix(),
"file://localhost/" + pathlib.Path(WIN_ABS_PATH).as_posix().replace(":", "$"),
# WIN_UNC_EXT_PATH,
# "file:///" + pathlib.Path(WIN_UNC_EXT_PATH).as_posix(),
# "file://localhost/" + pathlib.Path(WIN_UNC_EXT_PATH).as_posix().replace(":", "$"),
WIN_ABS_PATH,
"file:///" + pathlib.Path(WIN_ABS_PATH).as_posix(),
WIN_ABS_EXT_PATH,
pathlib.Path(WIN_ABS_PATH).as_uri(),
pathlib.Path(WIN_ABS_EXT_PATH).as_uri(),
# r"\\wsl.localhost\Ubuntu-18.04\home\rudolfix\src\dlt\tests\common\storages\samples"
),
)
Expand Down
10 changes: 5 additions & 5 deletions tests/load/pipeline/test_filesystem_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def test_pipeline_csv_filesystem_destination(item_type: TestDataItemFormat) -> N
os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "True"
os.environ["RESTORE_FROM_DESTINATION"] = "False"
# store locally
os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "file://_storage"
os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "_storage"

pipeline = dlt.pipeline(
pipeline_name="parquet_test_" + uniq_id(),
Expand All @@ -110,7 +110,7 @@ def test_csv_options(item_type: TestDataItemFormat) -> None:
os.environ["NORMALIZE__DATA_WRITER__DELIMITER"] = "|"
os.environ["NORMALIZE__DATA_WRITER__INCLUDE_HEADER"] = "False"
# store locally
os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "file://_storage"
os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "_storage"
pipeline = dlt.pipeline(
pipeline_name="parquet_test_" + uniq_id(),
destination="filesystem",
Expand Down Expand Up @@ -139,7 +139,7 @@ def test_csv_quoting_style(item_type: TestDataItemFormat) -> None:
os.environ["NORMALIZE__DATA_WRITER__QUOTING"] = "quote_all"
os.environ["NORMALIZE__DATA_WRITER__INCLUDE_HEADER"] = "False"
# store locally
os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "file://_storage"
os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "_storage"
pipeline = dlt.pipeline(
pipeline_name="parquet_test_" + uniq_id(),
destination="filesystem",
Expand Down Expand Up @@ -170,7 +170,7 @@ def test_pipeline_parquet_filesystem_destination() -> None:
import pyarrow.parquet as pq # Module is evaluated by other tests

# store locally
os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "file://_storage"
os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "_storage"
pipeline = dlt.pipeline(
pipeline_name="parquet_test_" + uniq_id(),
destination="filesystem",
Expand Down Expand Up @@ -264,7 +264,7 @@ def count(*args, **kwargs) -> Any:
"hiphip": counter("Hurraaaa"),
}
now = pendulum.now()
os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "file://_storage"
os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "_storage"
os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "TRUE"

# the reason why we are patching pendulum.from_timestamp is that
Expand Down
20 changes: 13 additions & 7 deletions tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2165,12 +2165,18 @@ def test_yielding_empty_list_creates_table() -> None:
assert rows[0] == (1, None)


@skipifnotwindows
def test_local_filesystem_destination() -> None:
# make it unc path
unc_path = "\\\\localhost\\" + os.path.abspath("_storage").replace(":", "$")
print(unc_path)
local_paths = [os.path.abspath("_storage"), "_storage"]
if os.name == "nt":
local_paths += [
# UNC extended path
"\\\\?\\UNC\\localhost\\" + os.path.abspath("_storage").replace(":", "$"),
# UNC path
"\\\\localhost\\" + os.path.abspath("_storage").replace(":", "$"),
]


@pytest.mark.parametrize("local_path", local_paths)
def test_local_filesystem_destination(local_path: str) -> None:
dataset_name = "mydata_" + uniq_id()

@dlt.resource
Expand All @@ -2180,7 +2186,7 @@ def stateful_resource():

pipeline = dlt.pipeline(
pipeline_name="local_files",
destination=dlt.destinations.filesystem(unc_path),
destination=dlt.destinations.filesystem(local_path),
dataset_name=dataset_name,
)
info = pipeline.run(stateful_resource(), table_name="numbers", write_disposition="replace")
Expand Down Expand Up @@ -2217,7 +2223,7 @@ def stateful_resource():
# all path formats we use must lead to "_storage" relative to tests
assert (
pathlib.Path(fs_client.dataset_path).resolve()
== pathlib.Path(unc_path).joinpath(dataset_name).resolve()
== pathlib.Path(local_path).joinpath(dataset_name).resolve()
)
# same for client
assert len(fs_client.list_table_files("numbers")) == 1
Expand Down

0 comments on commit fc718d5

Please sign in to comment.