From a9df149283464f3bd7988e505af555860f5109d2 Mon Sep 17 00:00:00 2001 From: ddelange <14880945+ddelange@users.noreply.github.com> Date: Wed, 24 Jul 2024 08:59:27 +0200 Subject: [PATCH] fix: cancel upload when BlobWriter exits with exception Apply suggestions from code review --- README.rst | 6 ++++ docs/storage/exceptions.rst | 2 +- google/cloud/storage/fileio.py | 13 ++++++++ tests/system/test_fileio.py | 40 ++++++++++++++++++++++ tests/unit/test_fileio.py | 61 ++++++++++++++++++++++++++++++---- 5 files changed, 114 insertions(+), 8 deletions(-) diff --git a/README.rst b/README.rst index 569fde177..21a7aaad4 100644 --- a/README.rst +++ b/README.rst @@ -72,6 +72,12 @@ setup.py file. Applications which do not import directly from `google-resumable-media` can safely disregard this dependency. This backwards compatibility feature will be removed in a future major version update. +Miscellaneous +~~~~~~~~~~~~~ + +- The BlobWriter class now attempts to terminate an ongoing resumable upload if + the writer exits with an exception. + Quick Start ----------- diff --git a/docs/storage/exceptions.rst b/docs/storage/exceptions.rst index 81285394e..4b4995ca7 100644 --- a/docs/storage/exceptions.rst +++ b/docs/storage/exceptions.rst @@ -1,5 +1,5 @@ Exceptions -~~~~~~~~~ +~~~~~~~~~~ .. automodule:: google.cloud.storage.exceptions :members: diff --git a/google/cloud/storage/fileio.py b/google/cloud/storage/fileio.py index 97d234983..985dd0012 100644 --- a/google/cloud/storage/fileio.py +++ b/google/cloud/storage/fileio.py @@ -437,6 +437,19 @@ def close(self): self._upload_chunks_from_buffer(1) self._buffer.close() + def terminate(self): + """Cancel the ResumableUpload.""" + if self._upload_and_transport: + upload, transport = self._upload_and_transport + transport.delete(upload.upload_url) + self._buffer.close() + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is not None: + self.terminate() + else: + self.close() + @property def closed(self): return self._buffer.closed diff --git a/tests/system/test_fileio.py b/tests/system/test_fileio.py index 79e0ab7da..be3943e97 100644 --- a/tests/system/test_fileio.py +++ b/tests/system/test_fileio.py @@ -14,6 +14,9 @@ # limitations under the License. +import pytest + +from google.cloud.storage.fileio import CHUNK_SIZE_MULTIPLE from .test_blob import _check_blob_hash @@ -76,3 +79,40 @@ def test_blobwriter_and_blobreader_text_mode( assert text_data[:100] == reader.read(100) assert 0 == reader.seek(0) assert reader.read() == text_data + + +def test_blobwriter_exit( + shared_bucket, + blobs_to_delete, + service_account, +): + blob = shared_bucket.blob("NeverUploaded") + + # no-op when nothing was uploaded yet + with pytest.raises(ValueError, match="SIGTERM received"): + with blob.open("wb") as writer: + writer.write(b"first chunk") # not yet uploaded + raise ValueError("SIGTERM received") # no upload to cancel in __exit__ + # blob should not exist + assert not blob.exists() + + # unhandled exceptions should cancel the upload + with pytest.raises(ValueError, match="SIGTERM received"): + with blob.open("wb", chunk_size=CHUNK_SIZE_MULTIPLE) as writer: + writer.write(b"first chunk") # not yet uploaded + writer.write(bytes(CHUNK_SIZE_MULTIPLE)) # uploaded + raise ValueError("SIGTERM received") # upload is cancelled in __exit__ + # blob should not exist + assert not blob.exists() + + # handled exceptions should not cancel the upload + with blob.open("wb", chunk_size=CHUNK_SIZE_MULTIPLE) as writer: + writer.write(b"first chunk") # not yet uploaded + writer.write(bytes(CHUNK_SIZE_MULTIPLE)) # uploaded + try: + raise ValueError("This is fine") + except ValueError: + pass # no exception context passed to __exit__ + blobs_to_delete.append(blob) + # blob should have been uploaded + assert blob.exists() diff --git a/tests/unit/test_fileio.py b/tests/unit/test_fileio.py index cafc65e49..b2cb4b2a9 100644 --- a/tests/unit/test_fileio.py +++ b/tests/unit/test_fileio.py @@ -21,6 +21,7 @@ import mock from google.api_core.exceptions import RequestRangeNotSatisfiable +from google.cloud.storage.fileio import CHUNK_SIZE_MULTIPLE from google.cloud.storage.retry import DEFAULT_RETRY TEST_TEXT_DATA = string.ascii_lowercase + "\n" + string.ascii_uppercase + "\n" @@ -377,7 +378,7 @@ def test_write(self, mock_warn): # Write under chunk_size. This should be buffered and the upload not # initiated. writer.write(TEST_BINARY_DATA[0:4]) - blob.initiate_resumable_upload.assert_not_called() + blob._initiate_resumable_upload.assert_not_called() # Write over chunk_size. This should result in upload initialization # and multiple chunks uploaded. @@ -426,6 +427,52 @@ def test_close_errors(self): with self.assertRaises(ValueError): writer.write(TEST_BINARY_DATA) + def test_terminate_after_initiate(self): + blob = mock.Mock() + + upload = mock.Mock(upload_url="dummy") + transport = mock.Mock() + + blob._initiate_resumable_upload.return_value = (upload, transport) + + with self.assertRaises(RuntimeError): + with self._make_blob_writer(blob, chunk_size=CHUNK_SIZE_MULTIPLE) as writer: + writer.write(bytes(CHUNK_SIZE_MULTIPLE + 1)) # initiate upload + raise RuntimeError # should terminate the upload + blob._initiate_resumable_upload.assert_called_once() # upload initiated + self.assertTrue(writer.closed) # terminate called + transport.delete.assert_called_with("dummy") # resumable upload terminated + + def test_terminate_before_initiate(self): + blob = mock.Mock() + + upload = mock.Mock() + transport = mock.Mock() + + blob._initiate_resumable_upload.return_value = (upload, transport) + + with self.assertRaises(RuntimeError): + with self._make_blob_writer(blob, chunk_size=CHUNK_SIZE_MULTIPLE) as writer: + writer.write(bytes(CHUNK_SIZE_MULTIPLE - 1)) # upload not yet initiated + raise RuntimeError # there is no resumable upload to terminate + blob._initiate_resumable_upload.assert_not_called() # upload not yet initiated + self.assertTrue(writer.closed) # terminate called + transport.delete.assert_not_called() # there's no resumable upload to terminate + + def test_terminate_skipped(self): + blob = mock.Mock() + + upload = mock.Mock() + transport = mock.Mock() + + blob._initiate_resumable_upload.return_value = (upload, transport) + + with self._make_blob_writer(blob, chunk_size=CHUNK_SIZE_MULTIPLE) as writer: + writer.write(bytes(CHUNK_SIZE_MULTIPLE + 1)) # upload initiated + blob._initiate_resumable_upload.assert_called() # upload initiated + self.assertTrue(writer.closed) # close called + transport.delete.assert_not_called() # terminate not called + def test_flush_fails(self): blob = mock.Mock(chunk_size=None) writer = self._make_blob_writer(blob) @@ -468,7 +515,7 @@ def test_conditional_retry_failure(self): # Write under chunk_size. This should be buffered and the upload not # initiated. writer.write(TEST_BINARY_DATA[0:4]) - blob.initiate_resumable_upload.assert_not_called() + blob._initiate_resumable_upload.assert_not_called() # Write over chunk_size. This should result in upload initialization # and multiple chunks uploaded. @@ -520,7 +567,7 @@ def test_conditional_retry_pass(self): # Write under chunk_size. This should be buffered and the upload not # initiated. writer.write(TEST_BINARY_DATA[0:4]) - blob.initiate_resumable_upload.assert_not_called() + blob._initiate_resumable_upload.assert_not_called() # Write over chunk_size. This should result in upload initialization # and multiple chunks uploaded. @@ -573,7 +620,7 @@ def test_forced_default_retry(self): # Write under chunk_size. This should be buffered and the upload not # initiated. writer.write(TEST_BINARY_DATA[0:4]) - blob.initiate_resumable_upload.assert_not_called() + blob._initiate_resumable_upload.assert_not_called() # Write over chunk_size. This should result in upload initialization # and multiple chunks uploaded. @@ -619,7 +666,7 @@ def test_num_retries_and_retry_conflict(self, mock_warn): # Write under chunk_size. This should be buffered and the upload not # initiated. writer.write(TEST_BINARY_DATA[0:4]) - blob.initiate_resumable_upload.assert_not_called() + blob._initiate_resumable_upload.assert_not_called() # Write over chunk_size. The mock will raise a ValueError, simulating # actual behavior when num_retries and retry are both specified. @@ -673,7 +720,7 @@ def test_num_retries_only(self, mock_warn): # Write under chunk_size. This should be buffered and the upload not # initiated. writer.write(TEST_BINARY_DATA[0:4]) - blob.initiate_resumable_upload.assert_not_called() + blob._initiate_resumable_upload.assert_not_called() # Write over chunk_size. This should result in upload initialization # and multiple chunks uploaded. @@ -965,7 +1012,7 @@ def test_write(self, mock_warn): # Write under chunk_size. This should be buffered and the upload not # initiated. writer.write(TEST_MULTIBYTE_TEXT_DATA[0:2]) - blob.initiate_resumable_upload.assert_not_called() + blob._initiate_resumable_upload.assert_not_called() # Write all data and close. writer.write(TEST_MULTIBYTE_TEXT_DATA[2:])