From 8421e5ff228d0755a9d9cc922273cc068462c99d Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Fri, 29 Sep 2023 17:02:47 -0700 Subject: [PATCH 1/4] docs: Add snippets for upload_chunks_concurrently and add chunk_size --- samples/snippets/snippets_test.py | 53 ++++++++++++++---- ...er_manager_download_chunks_concurrently.py | 15 ++++- ...sfer_manager_upload_chunks_concurrently.py | 56 +++++++++++++++++++ 3 files changed, 112 insertions(+), 12 deletions(-) create mode 100644 samples/snippets/storage_transfer_manager_upload_chunks_concurrently.py diff --git a/samples/snippets/snippets_test.py b/samples/snippets/snippets_test.py index 2da7bb94c..afe803de5 100644 --- a/samples/snippets/snippets_test.py +++ b/samples/snippets/snippets_test.py @@ -75,6 +75,7 @@ import storage_transfer_manager_download_bucket import storage_transfer_manager_download_chunks_concurrently import storage_transfer_manager_download_many +import storage_transfer_manager_upload_chunks_concurrently import storage_transfer_manager_upload_directory import storage_transfer_manager_upload_many import storage_upload_file @@ -243,7 +244,10 @@ def test_upload_blob_with_kms(test_bucket): with tempfile.NamedTemporaryFile() as source_file: source_file.write(b"test") storage_upload_with_kms_key.upload_blob_with_kms( - test_bucket.name, source_file.name, blob_name, KMS_KEY, + test_bucket.name, + source_file.name, + blob_name, + KMS_KEY, ) bucket = storage.Client().bucket(test_bucket.name) kms_blob = bucket.get_blob(blob_name) @@ -396,7 +400,10 @@ def test_move_blob(test_bucket_create, test_blob): print(f"test_move_blob not found in bucket {test_bucket_create.name}") storage_move_file.move_blob( - bucket.name, test_blob.name, test_bucket_create.name, "test_move_blob", + bucket.name, + test_blob.name, + test_bucket_create.name, + "test_move_blob", ) assert test_bucket_create.get_blob("test_move_blob") is not None @@ -412,7 +419,10 @@ def test_copy_blob(test_blob): pass storage_copy_file.copy_blob( - bucket.name, test_blob.name, bucket.name, "test_copy_blob", + bucket.name, + test_blob.name, + bucket.name, + "test_copy_blob", ) assert bucket.get_blob("test_copy_blob") is not None @@ -551,7 +561,10 @@ def test_define_bucket_website_configuration(test_bucket): def test_object_get_kms_key(test_bucket): with tempfile.NamedTemporaryFile() as source_file: storage_upload_with_kms_key.upload_blob_with_kms( - test_bucket.name, source_file.name, "test_upload_blob_encrypted", KMS_KEY, + test_bucket.name, + source_file.name, + "test_upload_blob_encrypted", + KMS_KEY, ) kms_key = storage_object_get_kms_key.object_get_kms_key( test_bucket.name, "test_upload_blob_encrypted" @@ -568,7 +581,10 @@ def test_storage_compose_file(test_bucket): with tempfile.NamedTemporaryFile() as dest_file: destination = storage_compose_file.compose_file( - test_bucket.name, source_files[0], source_files[1], dest_file.name, + test_bucket.name, + source_files[0], + source_files[1], + dest_file.name, ) composed = destination.download_as_string() @@ -608,7 +624,8 @@ def test_change_default_storage_class(test_bucket, capsys): def test_change_file_storage_class(test_blob, capsys): blob = storage_change_file_storage_class.change_file_storage_class( - test_blob.bucket.name, test_blob.name, + test_blob.bucket.name, + test_blob.name, ) out, _ = capsys.readouterr() assert f"Blob {blob.name} in bucket {blob.bucket.name}" in out @@ -763,9 +780,7 @@ def test_transfer_manager_download_chunks_concurrently(test_bucket, capsys): with tempfile.NamedTemporaryFile() as file: 
file.write(b"test") - storage_upload_file.upload_blob( - test_bucket.name, file.name, BLOB_NAME - ) + storage_upload_file.upload_blob(test_bucket.name, file.name, BLOB_NAME) with tempfile.TemporaryDirectory() as downloads: # Download the file. @@ -777,4 +792,22 @@ def test_transfer_manager_download_chunks_concurrently(test_bucket, capsys): ) out, _ = capsys.readouterr() - assert "Downloaded {} to {}".format(BLOB_NAME, os.path.join(downloads, BLOB_NAME)) in out + assert ( + "Downloaded {} to {}".format(BLOB_NAME, os.path.join(downloads, BLOB_NAME)) + in out + ) + + +def test_transfer_manager_upload_chunks_concurrently(test_bucket, capsys): + BLOB_NAME = "test_file.txt" + + with tempfile.NamedTemporaryFile() as file: + file.write(b"test") + file.flush() + + storage_transfer_manager_upload_chunks_concurrently.upload_chunks_concurrently( + test_bucket.name, file.name, BLOB_NAME + ) + + out, _ = capsys.readouterr() + assert "File {} uploaded to {}".format(file.name, BLOB_NAME) in out diff --git a/samples/snippets/storage_transfer_manager_download_chunks_concurrently.py b/samples/snippets/storage_transfer_manager_download_chunks_concurrently.py index 9ddec094e..f2c120be0 100644 --- a/samples/snippets/storage_transfer_manager_download_chunks_concurrently.py +++ b/samples/snippets/storage_transfer_manager_download_chunks_concurrently.py @@ -13,7 +13,9 @@ # limitations under the License. # [START storage_transfer_manager_download_chunks_concurrently] -def download_chunks_concurrently(bucket_name, blob_name, filename, processes=8): +def download_chunks_concurrently( + bucket_name, blob_name, filename, chunk_size=32 * 1024 * 1024, processes=8 +): """Download a single file in chunks, concurrently in a process pool.""" # The ID of your GCS bucket @@ -25,6 +27,11 @@ def download_chunks_concurrently(bucket_name, blob_name, filename, processes=8): # The destination filename or path # filename = "" + # The size of each chunk. The performance impact of this value depends on + # the use case. The remote service has a minimum of 5 MiB and a maximum of + # 5 GiB. + # chunk_size = 32 * 1024 * 1024 (32 MiB) + # The maximum number of processes to use for the operation. The performance # impact of this value depends on the use case, but smaller files usually # benefit from a higher number of processes. Each additional process occupies @@ -37,7 +44,11 @@ def download_chunks_concurrently(bucket_name, blob_name, filename, processes=8): bucket = storage_client.bucket(bucket_name) blob = bucket.blob(blob_name) - transfer_manager.download_chunks_concurrently(blob, filename, max_workers=processes) + transfer_manager.download_chunks_concurrently( + blob, filename, chunk_size=chunk_size, max_workers=processes + ) print("Downloaded {} to {}.".format(blob_name, filename)) + + # [END storage_transfer_manager_download_chunks_concurrently] diff --git a/samples/snippets/storage_transfer_manager_upload_chunks_concurrently.py b/samples/snippets/storage_transfer_manager_upload_chunks_concurrently.py new file mode 100644 index 000000000..2315e67f7 --- /dev/null +++ b/samples/snippets/storage_transfer_manager_upload_chunks_concurrently.py @@ -0,0 +1,56 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START storage_transfer_manager_upload_chunks_concurrently] +def upload_chunks_concurrently( + bucket_name, + source_filename, + destination_blob_name, + chunk_size=32 * 1024 * 1024, + processes=8, +): + """Upload a single file, in chunks, concurrently in a process pool.""" + # The ID of your GCS bucket + # bucket_name = "your-bucket-name" + + # The path to your file to upload + # source_filename = "local/path/to/file" + + # The ID of your GCS object + # destination_blob_name = "storage-object-name" + + # The size of each chunk. The performance impact of this value depends on + # the use case. The remote service has a minimum of 5 MiB and a maximum of + # 5 GiB. + # chunk_size = 32 * 1024 * 1024 (32 MiB) + + # The maximum number of processes to use for the operation. The performance + # impact of this value depends on the use case. Each additional process + # occupies some CPU and memory resources until finished. + # processes=8 + + from google.cloud.storage import Client, transfer_manager + + storage_client = Client() + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(destination_blob_name) + + transfer_manager.upload_chunks_concurrently( + source_filename, blob, chunk_size=chunk_size, max_workers=processes + ) + + print(f"File {source_filename} uploaded to {destination_blob_name}.") + + +# [END storage_transfer_manager_upload_chunks_concurrently] From 89dbd113224ec6aa65ce0d6905f5a8b31111547f Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Mon, 9 Oct 2023 10:02:16 -0700 Subject: [PATCH 2/4] switch from 'processes' to 'workers' in sample nomenclature --- .../snippets/storage_transfer_manager_download_bucket.py | 9 +++++---- ...rage_transfer_manager_download_chunks_concurrently.py | 9 +++++---- .../snippets/storage_transfer_manager_download_many.py | 9 +++++---- ...torage_transfer_manager_upload_chunks_concurrently.py | 9 +++++---- .../storage_transfer_manager_upload_directory.py | 9 +++++---- samples/snippets/storage_transfer_manager_upload_many.py | 9 +++++---- 6 files changed, 30 insertions(+), 24 deletions(-) diff --git a/samples/snippets/storage_transfer_manager_download_bucket.py b/samples/snippets/storage_transfer_manager_download_bucket.py index 4f21ee6e9..5d94a67ae 100644 --- a/samples/snippets/storage_transfer_manager_download_bucket.py +++ b/samples/snippets/storage_transfer_manager_download_bucket.py @@ -14,7 +14,7 @@ # [START storage_transfer_manager_download_bucket] def download_bucket_with_transfer_manager( - bucket_name, destination_directory="", processes=8, max_results=1000 + bucket_name, destination_directory="", workers=8, max_results=1000 ): """Download all of the blobs in a bucket, concurrently in a process pool. @@ -40,8 +40,9 @@ def download_bucket_with_transfer_manager( # The maximum number of processes to use for the operation. The performance # impact of this value depends on the use case, but smaller files usually # benefit from a higher number of processes. Each additional process occupies - # some CPU and memory resources until finished. - # processes=8 + # some CPU and memory resources until finished. 
Threads can be used instead + # of processes by passing `worker_type=transfer_manager.THREAD`. + # workers=8 # The maximum number of results to fetch from bucket.list_blobs(). This # sample code fetches all of the blobs up to max_results and queues them all @@ -60,7 +61,7 @@ def download_bucket_with_transfer_manager( blob_names = [blob.name for blob in bucket.list_blobs(max_results=max_results)] results = transfer_manager.download_many_to_path( - bucket, blob_names, destination_directory=destination_directory, max_workers=processes + bucket, blob_names, destination_directory=destination_directory, max_workers=workers ) for name, result in zip(blob_names, results): diff --git a/samples/snippets/storage_transfer_manager_download_chunks_concurrently.py b/samples/snippets/storage_transfer_manager_download_chunks_concurrently.py index f2c120be0..b6ac9982d 100644 --- a/samples/snippets/storage_transfer_manager_download_chunks_concurrently.py +++ b/samples/snippets/storage_transfer_manager_download_chunks_concurrently.py @@ -14,7 +14,7 @@ # [START storage_transfer_manager_download_chunks_concurrently] def download_chunks_concurrently( - bucket_name, blob_name, filename, chunk_size=32 * 1024 * 1024, processes=8 + bucket_name, blob_name, filename, chunk_size=32 * 1024 * 1024, workers=8 ): """Download a single file in chunks, concurrently in a process pool.""" @@ -35,8 +35,9 @@ def download_chunks_concurrently( # The maximum number of processes to use for the operation. The performance # impact of this value depends on the use case, but smaller files usually # benefit from a higher number of processes. Each additional process occupies - # some CPU and memory resources until finished. - # processes=8 + # some CPU and memory resources until finished. Threads can be used instead + # of processes by passing `worker_type=transfer_manager.THREAD`. + # workers=8 from google.cloud.storage import Client, transfer_manager @@ -45,7 +46,7 @@ def download_chunks_concurrently( blob = bucket.blob(blob_name) transfer_manager.download_chunks_concurrently( - blob, filename, chunk_size=chunk_size, max_workers=processes + blob, filename, chunk_size=chunk_size, max_workers=workers ) print("Downloaded {} to {}.".format(blob_name, filename)) diff --git a/samples/snippets/storage_transfer_manager_download_many.py b/samples/snippets/storage_transfer_manager_download_many.py index 500eea1ce..02cb9b887 100644 --- a/samples/snippets/storage_transfer_manager_download_many.py +++ b/samples/snippets/storage_transfer_manager_download_many.py @@ -14,7 +14,7 @@ # [START storage_transfer_manager_download_many] def download_many_blobs_with_transfer_manager( - bucket_name, blob_names, destination_directory="", processes=8 + bucket_name, blob_names, destination_directory="", workers=8 ): """Download blobs in a list by name, concurrently in a process pool. @@ -46,8 +46,9 @@ def download_many_blobs_with_transfer_manager( # The maximum number of processes to use for the operation. The performance # impact of this value depends on the use case, but smaller files usually # benefit from a higher number of processes. Each additional process occupies - # some CPU and memory resources until finished. - # processes=8 + # some CPU and memory resources until finished. Threads can be used instead + # of processes by passing `worker_type=transfer_manager.THREAD`. 
+ # workers=8 from google.cloud.storage import Client, transfer_manager @@ -55,7 +56,7 @@ def download_many_blobs_with_transfer_manager( bucket = storage_client.bucket(bucket_name) results = transfer_manager.download_many_to_path( - bucket, blob_names, destination_directory=destination_directory, max_workers=processes + bucket, blob_names, destination_directory=destination_directory, max_workers=workers ) for name, result in zip(blob_names, results): diff --git a/samples/snippets/storage_transfer_manager_upload_chunks_concurrently.py b/samples/snippets/storage_transfer_manager_upload_chunks_concurrently.py index 2315e67f7..40b6982b3 100644 --- a/samples/snippets/storage_transfer_manager_upload_chunks_concurrently.py +++ b/samples/snippets/storage_transfer_manager_upload_chunks_concurrently.py @@ -18,7 +18,7 @@ def upload_chunks_concurrently( source_filename, destination_blob_name, chunk_size=32 * 1024 * 1024, - processes=8, + workers=8, ): """Upload a single file, in chunks, concurrently in a process pool.""" # The ID of your GCS bucket @@ -37,8 +37,9 @@ def upload_chunks_concurrently( # The maximum number of processes to use for the operation. The performance # impact of this value depends on the use case. Each additional process - # occupies some CPU and memory resources until finished. - # processes=8 + # occupies some CPU and memory resources until finished. Threads can be used + # instead of processes by passing `worker_type=transfer_manager.THREAD`. + # workers=8 from google.cloud.storage import Client, transfer_manager @@ -47,7 +48,7 @@ def upload_chunks_concurrently( blob = bucket.blob(destination_blob_name) transfer_manager.upload_chunks_concurrently( - source_filename, blob, chunk_size=chunk_size, max_workers=processes + source_filename, blob, chunk_size=chunk_size, max_workers=workers ) print(f"File {source_filename} uploaded to {destination_blob_name}.") diff --git a/samples/snippets/storage_transfer_manager_upload_directory.py b/samples/snippets/storage_transfer_manager_upload_directory.py index c0dbb9c9c..329ca1081 100644 --- a/samples/snippets/storage_transfer_manager_upload_directory.py +++ b/samples/snippets/storage_transfer_manager_upload_directory.py @@ -13,7 +13,7 @@ # limitations under the License. # [START storage_transfer_manager_upload_directory] -def upload_directory_with_transfer_manager(bucket_name, source_directory, processes=8): +def upload_directory_with_transfer_manager(bucket_name, source_directory, workers=8): """Upload every file in a directory, including all files in subdirectories. Each blob name is derived from the filename, not including the `directory` @@ -33,8 +33,9 @@ def upload_directory_with_transfer_manager(bucket_name, source_directory, proces # The maximum number of processes to use for the operation. The performance # impact of this value depends on the use case, but smaller files usually # benefit from a higher number of processes. Each additional process occupies - # some CPU and memory resources until finished. - # processes=8 + # some CPU and memory resources until finished. Threads can be used instead + # of processes by passing `worker_type=transfer_manager.THREAD`. + # workers=8 from pathlib import Path @@ -65,7 +66,7 @@ def upload_directory_with_transfer_manager(bucket_name, source_directory, proces # Start the upload. 
results = transfer_manager.upload_many_from_filenames( - bucket, string_paths, source_directory=source_directory, max_workers=processes + bucket, string_paths, source_directory=source_directory, max_workers=workers ) for name, result in zip(string_paths, results): diff --git a/samples/snippets/storage_transfer_manager_upload_many.py b/samples/snippets/storage_transfer_manager_upload_many.py index 2ed647650..1b9b9fc89 100644 --- a/samples/snippets/storage_transfer_manager_upload_many.py +++ b/samples/snippets/storage_transfer_manager_upload_many.py @@ -14,7 +14,7 @@ # [START storage_transfer_manager_upload_many] def upload_many_blobs_with_transfer_manager( - bucket_name, filenames, source_directory="", processes=8 + bucket_name, filenames, source_directory="", workers=8 ): """Upload every file in a list to a bucket, concurrently in a process pool. @@ -43,8 +43,9 @@ def upload_many_blobs_with_transfer_manager( # The maximum number of processes to use for the operation. The performance # impact of this value depends on the use case, but smaller files usually # benefit from a higher number of processes. Each additional process occupies - # some CPU and memory resources until finished. - # processes=8 + # some CPU and memory resources until finished. Threads can be used instead + # of processes by passing `worker_type=transfer_manager.THREAD`. + # workers=8 from google.cloud.storage import Client, transfer_manager @@ -52,7 +53,7 @@ def upload_many_blobs_with_transfer_manager( bucket = storage_client.bucket(bucket_name) results = transfer_manager.upload_many_from_filenames( - bucket, filenames, source_directory=source_directory, max_workers=processes + bucket, filenames, source_directory=source_directory, max_workers=workers ) for name, result in zip(filenames, results): From aa35cfd0877642a7e8fd65089120540bcba5e441 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Mon, 9 Oct 2023 10:35:05 -0700 Subject: [PATCH 3/4] copyright --- .../storage_transfer_manager_upload_chunks_concurrently.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/snippets/storage_transfer_manager_upload_chunks_concurrently.py b/samples/snippets/storage_transfer_manager_upload_chunks_concurrently.py index 40b6982b3..009f09648 100644 --- a/samples/snippets/storage_transfer_manager_upload_chunks_concurrently.py +++ b/samples/snippets/storage_transfer_manager_upload_chunks_concurrently.py @@ -1,4 +1,4 @@ -# Copyright 2022 Google LLC +# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the 'License'); # you may not use this file except in compliance with the License. 
From 18faaf11ad258c341079844355ce0266766a6f75 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Mon, 9 Oct 2023 14:14:56 -0700 Subject: [PATCH 4/4] tests --- samples/snippets/snippets_test.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/samples/snippets/snippets_test.py b/samples/snippets/snippets_test.py index afe803de5..8014411e8 100644 --- a/samples/snippets/snippets_test.py +++ b/samples/snippets/snippets_test.py @@ -711,7 +711,7 @@ def test_transfer_manager_snippets(test_bucket, capsys): test_bucket.name, BLOB_NAMES, source_directory="{}/".format(uploads), - processes=8, + workers=8, ) out, _ = capsys.readouterr() @@ -723,7 +723,7 @@ def test_transfer_manager_snippets(test_bucket, capsys): storage_transfer_manager_download_bucket.download_bucket_with_transfer_manager( test_bucket.name, destination_directory=os.path.join(downloads, ""), - processes=8, + workers=8, max_results=10000, ) out, _ = capsys.readouterr() @@ -737,7 +737,7 @@ def test_transfer_manager_snippets(test_bucket, capsys): test_bucket.name, blob_names=BLOB_NAMES, destination_directory=os.path.join(downloads, ""), - processes=8, + workers=8, ) out, _ = capsys.readouterr() @@ -788,7 +788,7 @@ def test_transfer_manager_download_chunks_concurrently(test_bucket, capsys): test_bucket.name, BLOB_NAME, os.path.join(downloads, BLOB_NAME), - processes=8, + workers=8, ) out, _ = capsys.readouterr()
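The comments added in PATCH 2 say that threads can be used instead of
processes by passing worker_type=transfer_manager.THREAD, though none of the
samples in this series pass that keyword themselves. Here is a minimal sketch
of where the option goes, alongside the chunk_size and max_workers arguments
used in the new upload sample; the bucket name, object name, and local path
are hypothetical placeholders, and chunk_size should stay within the 5 MiB to
5 GiB bounds noted in the sample comments.

    from google.cloud.storage import Client, transfer_manager

    storage_client = Client()
    bucket = storage_client.bucket("my-bucket")  # hypothetical bucket name
    blob = bucket.blob("storage-object-name")  # hypothetical object name

    # Same call as in storage_transfer_manager_upload_chunks_concurrently.py,
    # but with worker_type=transfer_manager.THREAD so chunks are uploaded from
    # a thread pool instead of the default process pool.
    transfer_manager.upload_chunks_concurrently(
        "local/path/to/file",  # hypothetical source path
        blob,
        chunk_size=32 * 1024 * 1024,  # 32 MiB per chunk
        max_workers=8,
        worker_type=transfer_manager.THREAD,
    )

Threads avoid the pickling and process-startup overhead of a process pool and
are often adequate for I/O-bound transfers; the download and directory
samples touched by PATCH 2 accept the same worker_type keyword.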