From b6a6bf751a30222210b96335dc4640132af239ac Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Mon, 2 Sep 2024 23:29:32 +0200 Subject: [PATCH] sets default concurrency for blob upload for adlfs to 1 --- dlt/common/storages/fsspec_filesystem.py | 11 +++++++++++ .../docs/dlt-ecosystem/destinations/filesystem.md | 9 +++++++++ 2 files changed, 20 insertions(+) diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index 7da5ebabef..76dbd98e6e 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -84,6 +84,16 @@ class FileItem(TypedDict, total=False): CREDENTIALS_DISPATCH["abfss"] = CREDENTIALS_DISPATCH["az"] CREDENTIALS_DISPATCH["gcs"] = CREDENTIALS_DISPATCH["gs"] +# Default kwargs for protocol +DEFAULT_KWARGS = { + # disable concurrent + "az": {"max_concurrency": 1} +} +DEFAULT_KWARGS["adl"] = DEFAULT_KWARGS["az"] +DEFAULT_KWARGS["abfs"] = DEFAULT_KWARGS["az"] +DEFAULT_KWARGS["azure"] = DEFAULT_KWARGS["az"] +DEFAULT_KWARGS["abfss"] = DEFAULT_KWARGS["az"] + def fsspec_filesystem( protocol: str, @@ -125,6 +135,7 @@ def prepare_fsspec_args(config: FilesystemConfiguration) -> DictStrAny: register_implementation("gdrive", GoogleDriveFileSystem, "GoogleDriveFileSystem") + fs_kwargs.update(DEFAULT_KWARGS.get(protocol, {})) if config.kwargs is not None: fs_kwargs.update(config.kwargs) if config.client_kwargs is not None: diff --git a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md index 018b838363..4d60f3d0ad 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md +++ b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md @@ -186,6 +186,15 @@ azure_client_secret = "client_secret" azure_tenant_id = "tenant_id" # please set me up! ``` +:::caution +**Concurrent blob uploads** +`dlt` limits the number of concurrent connections for a single uploaded blob to 1. 
By default, `adlfs` (which `dlt` uses) splits blobs into 4 MB chunks and uploads them concurrently, which leads to gigabytes of used memory and thousands of connections for larger load packages. You can increase the maximum concurrency as follows: +```toml +[destination.filesystem.kwargs] +max_concurrency=3 +``` +::: + ### Local file system If for any reason you want to have those files in a local folder, set up the `bucket_url` as follows (you are free to use `config.toml` for that as there are no secrets required)