Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Implement sliced downloads in GSClient #389

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Unreleased
- Fix `S3Client` cleanup via `Client.__del__` when `S3Client` encounters an exception during initialization. (Issue [#372](https://github.com/drivendataorg/cloudpathlib/issues/372), PR [#373](https://github.com/drivendataorg/cloudpathlib/pull/373))
- Skip mtime checks during upload when force_overwrite_to_cloud is set to improve upload performance. (Issue [#379](https://github.com/drivendataorg/cloudpathlib/issues/379), PR [#380](https://github.com/drivendataorg/cloudpathlib/pull/380), thanks to [@Gilthans](https://github.com/Gilthans))
- Implement sliced downloads in GSClient. (Issue [#387](https://github.com/drivendataorg/cloudpathlib/issues/387), PR [#389](https://github.com/drivendataorg/cloudpathlib/pull/389))

## v0.16.0 (2023-10-09)
- Add "CloudPath" as return type on `__init__` for mypy issues. ([Issue #179](https://github.com/drivendataorg/cloudpathlib/issues/179), [PR #342](https://github.com/drivendataorg/cloudpathlib/pull/342))
Expand Down
15 changes: 14 additions & 1 deletion cloudpathlib/gs/gsclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from google.api_core.exceptions import NotFound
from google.auth.exceptions import DefaultCredentialsError
from google.cloud.storage import Client as StorageClient
from google.cloud.storage import transfer_manager


except ModuleNotFoundError:
Expand All @@ -39,6 +40,7 @@ def __init__(
file_cache_mode: Optional[Union[str, FileCacheMode]] = None,
local_cache_dir: Optional[Union[str, os.PathLike]] = None,
content_type_method: Optional[Callable] = mimetypes.guess_type,
download_chunks_concurrently_kwargs: Optional[Dict[str, Any]] = None,
):
"""Class constructor. Sets up a [`Storage
Client`](https://googleapis.dev/python/storage/latest/client.html).
Expand Down Expand Up @@ -76,6 +78,9 @@ def __init__(
the `CLOUDPATHLIB_LOCAL_CACHE_DIR` environment variable.
content_type_method (Optional[Callable]): Function to call to guess media type (mimetype) when
writing a file to the cloud. Defaults to `mimetypes.guess_type`. Must return a tuple (content type, content encoding).
download_chunks_concurrently_kwargs (Optional[Dict[str, Any]]): Keyword arguments to pass to
[`download_chunks_concurrently`](https://cloud.google.com/python/docs/reference/storage/latest/google.cloud.storage.transfer_manager#google_cloud_storage_transfer_manager_download_chunks_concurrently)
for sliced parallel downloads.
"""
if application_credentials is None:
application_credentials = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
Expand All @@ -92,6 +97,8 @@ def __init__(
except DefaultCredentialsError:
self.client = StorageClient.create_anonymous_client()

self.download_chunks_concurrently_kwargs = download_chunks_concurrently_kwargs

super().__init__(
local_cache_dir=local_cache_dir,
content_type_method=content_type_method,
Expand All @@ -118,7 +125,13 @@ def _download_file(self, cloud_path: GSPath, local_path: Union[str, os.PathLike]

local_path = Path(local_path)

blob.download_to_filename(local_path)
if self.download_chunks_concurrently_kwargs is not None:
transfer_manager.download_chunks_concurrently(
blob, local_path, **self.download_chunks_concurrently_kwargs
)
else:
blob.download_to_filename(local_path)

return local_path

def _is_file_or_dir(self, cloud_path: GSPath) -> Optional[str]:
Expand Down
6 changes: 6 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from .mock_clients.mock_gs import (
mocked_client_class_factory as mocked_gsclient_class_factory,
DEFAULT_GS_BUCKET_NAME,
MockTransferManager,
)
from .mock_clients.mock_s3 import mocked_session_class_factory, DEFAULT_S3_BUCKET_NAME

Expand Down Expand Up @@ -184,6 +185,11 @@ def gs_rig(request, monkeypatch, assets_dir):
"StorageClient",
mocked_gsclient_class_factory(test_dir),
)
monkeypatch.setattr(
cloudpathlib.gs.gsclient,
"transfer_manager",
MockTransferManager,
)

rig = CloudProviderTestRig(
path_class=GSPath,
Expand Down
31 changes: 31 additions & 0 deletions tests/mock_clients/mock_gs.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@ def patch(self):
if "updated" in self.metadata:
(self.bucket / self.name).touch()

def reload(
self,
client=None,
projection="noAcl",
if_etag_match=None,
if_etag_not_match=None,
if_generation_match=None,
if_generation_not_match=None,
if_metageneration_match=None,
if_metageneration_not_match=None,
timeout=None,
retry=None,
):
pass

def upload_from_filename(self, filename, content_type=None):
data = Path(filename).read_bytes()
path = self.bucket / self.name
Expand Down Expand Up @@ -153,3 +168,19 @@ def __next__(self):
@property
def prefixes(self):
return self.sub_directories


class MockTransferManager:
@staticmethod
def download_chunks_concurrently(
blob,
filename,
chunk_size=32 * 1024 * 1024,
download_kwargs=None,
deadline=None,
worker_type="process",
max_workers=8,
*,
crc32c_checksum=True,
):
blob.download_to_filename(filename)
10 changes: 10 additions & 0 deletions tests/test_gs_specific.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,13 @@ def test_gspath_properties(path_class):
p2 = path_class("gs://mybucket/")
assert p2.blob == ""
assert p2.bucket == "mybucket"


@pytest.mark.parametrize("worker_type", ["process", "thread"])
def test_concurrent_download(gs_rig, tmp_path, worker_type):
client = gs_rig.client_class(download_chunks_concurrently_kwargs={"worker_type": worker_type})
p = gs_rig.create_cloud_path("dir_0/file0_0.txt", client=client)
dl_dir = tmp_path
assert not (dl_dir / p.name).exists()
p.download_to(dl_dir)
assert (dl_dir / p.name).is_file()
Loading