docs: Add snippets for upload_chunks_concurrently and add chunk_size #1135

Merged: 6 commits, Oct 9, 2023
Changes from 2 commits
53 changes: 43 additions & 10 deletions samples/snippets/snippets_test.py
@@ -75,6 +75,7 @@
import storage_transfer_manager_download_bucket
import storage_transfer_manager_download_chunks_concurrently
import storage_transfer_manager_download_many
import storage_transfer_manager_upload_chunks_concurrently
import storage_transfer_manager_upload_directory
import storage_transfer_manager_upload_many
import storage_upload_file
@@ -243,7 +244,10 @@ def test_upload_blob_with_kms(test_bucket):
with tempfile.NamedTemporaryFile() as source_file:
source_file.write(b"test")
storage_upload_with_kms_key.upload_blob_with_kms(
test_bucket.name, source_file.name, blob_name, KMS_KEY,
test_bucket.name,
source_file.name,
blob_name,
KMS_KEY,
)
bucket = storage.Client().bucket(test_bucket.name)
kms_blob = bucket.get_blob(blob_name)
@@ -396,7 +400,10 @@ def test_move_blob(test_bucket_create, test_blob):
print(f"test_move_blob not found in bucket {test_bucket_create.name}")

storage_move_file.move_blob(
bucket.name, test_blob.name, test_bucket_create.name, "test_move_blob",
bucket.name,
test_blob.name,
test_bucket_create.name,
"test_move_blob",
)

assert test_bucket_create.get_blob("test_move_blob") is not None
@@ -412,7 +419,10 @@ def test_copy_blob(test_blob):
pass

storage_copy_file.copy_blob(
bucket.name, test_blob.name, bucket.name, "test_copy_blob",
bucket.name,
test_blob.name,
bucket.name,
"test_copy_blob",
)

assert bucket.get_blob("test_copy_blob") is not None
@@ -551,7 +561,10 @@ def test_define_bucket_website_configuration(test_bucket):
def test_object_get_kms_key(test_bucket):
with tempfile.NamedTemporaryFile() as source_file:
storage_upload_with_kms_key.upload_blob_with_kms(
test_bucket.name, source_file.name, "test_upload_blob_encrypted", KMS_KEY,
test_bucket.name,
source_file.name,
"test_upload_blob_encrypted",
KMS_KEY,
)
kms_key = storage_object_get_kms_key.object_get_kms_key(
test_bucket.name, "test_upload_blob_encrypted"
@@ -568,7 +581,10 @@ def test_storage_compose_file(test_bucket):

with tempfile.NamedTemporaryFile() as dest_file:
destination = storage_compose_file.compose_file(
test_bucket.name, source_files[0], source_files[1], dest_file.name,
test_bucket.name,
source_files[0],
source_files[1],
dest_file.name,
)
composed = destination.download_as_string()

@@ -608,7 +624,8 @@ def test_change_default_storage_class(test_bucket, capsys):

def test_change_file_storage_class(test_blob, capsys):
blob = storage_change_file_storage_class.change_file_storage_class(
test_blob.bucket.name, test_blob.name,
test_blob.bucket.name,
test_blob.name,
)
out, _ = capsys.readouterr()
assert f"Blob {blob.name} in bucket {blob.bucket.name}" in out
@@ -763,9 +780,7 @@ def test_transfer_manager_download_chunks_concurrently(test_bucket, capsys):
with tempfile.NamedTemporaryFile() as file:
file.write(b"test")

storage_upload_file.upload_blob(
test_bucket.name, file.name, BLOB_NAME
)
storage_upload_file.upload_blob(test_bucket.name, file.name, BLOB_NAME)

with tempfile.TemporaryDirectory() as downloads:
# Download the file.
@@ -777,4 +792,22 @@ def test_transfer_manager_download_chunks_concurrently(test_bucket, capsys):
)
out, _ = capsys.readouterr()

assert "Downloaded {} to {}".format(BLOB_NAME, os.path.join(downloads, BLOB_NAME)) in out
assert (
"Downloaded {} to {}".format(BLOB_NAME, os.path.join(downloads, BLOB_NAME))
in out
)


def test_transfer_manager_upload_chunks_concurrently(test_bucket, capsys):
BLOB_NAME = "test_file.txt"

with tempfile.NamedTemporaryFile() as file:
file.write(b"test")
Contributor: this doesn't exercise multiple chunks; recommend increasing the object size to test chunk_size

Contributor Author: It's sufficient to test the snippet. The feature itself is not under test here; it is fully covered in the integration tests, with appropriately-sized test files.

file.flush()

storage_transfer_manager_upload_chunks_concurrently.upload_chunks_concurrently(
test_bucket.name, file.name, BLOB_NAME
)

out, _ = capsys.readouterr()
assert "File {} uploaded to {}".format(file.name, BLOB_NAME) in out
@@ -13,7 +13,9 @@
# limitations under the License.

# [START storage_transfer_manager_download_chunks_concurrently]
def download_chunks_concurrently(bucket_name, blob_name, filename, processes=8):
def download_chunks_concurrently(
bucket_name, blob_name, filename, chunk_size=32 * 1024 * 1024, processes=8
):
"""Download a single file in chunks, concurrently in a process pool."""

# The ID of your GCS bucket
@@ -25,6 +27,11 @@ def download_chunks_concurrently(bucket_name, blob_name, filename, processes=8):
# The destination filename or path
# filename = ""

# The size of each chunk. The performance impact of this value depends on
# the use case. The remote service has a minimum of 5 MiB and a maximum of
# 5 GiB.
# chunk_size = 32 * 1024 * 1024 (32 MiB)

# The maximum number of processes to use for the operation. The performance
Contributor: Is the rule of thumb here: "number of cores your CPU has"?

Contributor Author: No, workloads with small files benefit from many times that number and workloads with large files max out the NIC below that, so the number of cores is not a good starting place.

Contributor: Is this something that would go into docs instead of sample comments?

# impact of this value depends on the use case, but smaller files usually
# benefit from a higher number of processes. Each additional process occupies
@@ -37,7 +44,11 @@ def download_chunks_concurrently(bucket_name, blob_name, filename, processes=8):
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(blob_name)

transfer_manager.download_chunks_concurrently(blob, filename, max_workers=processes)
transfer_manager.download_chunks_concurrently(
blob, filename, chunk_size=chunk_size, max_workers=processes
)

print("Downloaded {} to {}.".format(blob_name, filename))


# [END storage_transfer_manager_download_chunks_concurrently]
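For reference, a minimal usage sketch of the snippet above, assuming an existing bucket and object; the names are placeholders, and the 64 MiB chunk size is only an example within the 5 MiB to 5 GiB service bounds noted in the comments. Per the review discussion, processes is a tuning knob for the workload rather than a per-core rule of thumb.

```python
# Illustrative invocation only; bucket, object, and path are placeholder names.
download_chunks_concurrently(
    bucket_name="my-bucket",
    blob_name="large-object.bin",
    filename="/tmp/large-object.bin",
    chunk_size=64 * 1024 * 1024,  # 64 MiB per chunk (service allows 5 MiB to 5 GiB)
    processes=8,
)
```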
@@ -0,0 +1,56 @@
# Copyright 2022 Google LLC
Contributor: 2023

Contributor Author: Will fix

#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START storage_transfer_manager_upload_chunks_concurrently]
def upload_chunks_concurrently(
bucket_name,
source_filename,
destination_blob_name,
chunk_size=32 * 1024 * 1024,
processes=8,

Contributor: workers?

Contributor Author: Yeah, it might be easier to understand as "workers" for this and the other samples; I'll change it.

):
"""Upload a single file, in chunks, concurrently in a process pool."""
# The ID of your GCS bucket
# bucket_name = "your-bucket-name"

# The path to your file to upload
# source_filename = "local/path/to/file"

# The ID of your GCS object
# destination_blob_name = "storage-object-name"

# The size of each chunk. The performance impact of this value depends on
# the use case. The remote service has a minimum of 5 MiB and a maximum of
# 5 GiB.
# chunk_size = 32 * 1024 * 1024 (32 MiB)

# The maximum number of processes to use for the operation. The performance
# impact of this value depends on the use case. Each additional process
# occupies some CPU and memory resources until finished.
# processes=8

from google.cloud.storage import Client, transfer_manager

storage_client = Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)

transfer_manager.upload_chunks_concurrently(
source_filename, blob, chunk_size=chunk_size, max_workers=processes
)

print(f"File {source_filename} uploaded to {destination_blob_name}.")


# [END storage_transfer_manager_upload_chunks_concurrently]
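Likewise, a minimal usage sketch for the new upload snippet; all names are placeholders, and the explicit chunk_size simply restates the default shown in the sample.

```python
# Illustrative invocation only; bucket, file, and object names are placeholders.
upload_chunks_concurrently(
    bucket_name="my-bucket",
    source_filename="/tmp/large-file.bin",
    destination_blob_name="large-file.bin",
    chunk_size=32 * 1024 * 1024,  # default 32 MiB, shown explicitly
    processes=8,
)
```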