From 0ffdd54e897f8c9da98aedc68e063417efad119b Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 21 Dec 2024 00:03:52 -0500 Subject: [PATCH 1/6] Add stage_file_with_retry and replace stage_file with it in apiclient. --- .../runners/dataflow/internal/apiclient.py | 35 ++++++++-- .../dataflow/internal/apiclient_test.py | 66 ++++++++++++++++++- 2 files changed, 91 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 97996bd6cbb2..068cef986f3b 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -557,13 +557,11 @@ def _cached_gcs_file_copy(self, from_path, to_path, sha256): source_file_names=[cached_path], destination_file_names=[to_path]) _LOGGER.info('Copied cached artifact from %s to %s', from_path, to_path) - @retry.with_exponential_backoff( - retry_filter=retry.retry_on_server_errors_and_timeout_filter) def _uncached_gcs_file_copy(self, from_path, to_path): to_folder, to_name = os.path.split(to_path) total_size = os.path.getsize(from_path) - with open(from_path, 'rb') as f: - self.stage_file(to_folder, to_name, f, total_size=total_size) + self.stage_file_with_retry( + to_folder, to_name, from_path, total_size=total_size) def _stage_resources(self, pipeline, options): google_cloud_options = options.view_as(GoogleCloudOptions) @@ -692,6 +690,29 @@ def stage_file( (gcs_or_local_path, e)) raise + @retry.with_exponential_backoff( + retry_filter=retry.retry_on_server_errors_and_timeout_filter) + def stage_file_with_retry( + self, + gcs_or_local_path, + file_name, + stream_or_path, + mime_type='application/octet-stream', + total_size=None): + + if isinstance(stream_or_path, str): + path = stream_or_path + with open(path, 'rb') as stream: + self.stage_file( + gcs_or_local_path, file_name, stream, mime_type, total_size) + elif isinstance(stream_or_path, io.BufferedIOBase): + stream = stream_or_path + assert stream.seekable(), "stream must be seekable" + if stream.tell() > 0: + stream.seek(0) + self.stage_file( + gcs_or_local_path, file_name, stream, mime_type, total_size) + @retry.no_retries # Using no_retries marks this as an integration point. def create_job(self, job): """Creates job description. May stage and/or submit for remote execution.""" @@ -703,7 +724,7 @@ def create_job(self, job): job.options.view_as(GoogleCloudOptions).template_location) if job.options.view_as(DebugOptions).lookup_experiment('upload_graph'): - self.stage_file( + self.stage_file_with_retry( job.options.view_as(GoogleCloudOptions).staging_location, "dataflow_graph.json", io.BytesIO(job.json().encode('utf-8'))) @@ -718,7 +739,7 @@ def create_job(self, job): if job_location: gcs_or_local_path = os.path.dirname(job_location) file_name = os.path.basename(job_location) - self.stage_file( + self.stage_file_with_retry( gcs_or_local_path, file_name, io.BytesIO(job.json().encode('utf-8'))) if not template_location: @@ -790,7 +811,7 @@ def create_job_description(self, job): resources = self._stage_resources(job.proto_pipeline, job.options) # Stage proto pipeline. - self.stage_file( + self.stage_file_with_retry( job.google_cloud_options.staging_location, shared_names.STAGED_PIPELINE_FILENAME, io.BytesIO(job.proto_pipeline.SerializeToString())) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index 6587e619a500..4b32dc567b06 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -19,11 +19,13 @@ # pytype: skip-file +import io import itertools import json import logging import os import sys +import time import unittest import mock @@ -1064,7 +1066,11 @@ def test_graph_is_uploaded(self): side_effect=None): client.create_job(job) client.stage_file.assert_called_once_with( - mock.ANY, "dataflow_graph.json", mock.ANY) + mock.ANY, + "dataflow_graph.json", + mock.ANY, + 'application/octet-stream', + None) client.create_job_description.assert_called_once() def test_create_job_returns_existing_job(self): @@ -1174,8 +1180,18 @@ def test_template_file_generation_with_upload_graph(self): client.create_job(job) client.stage_file.assert_has_calls([ - mock.call(mock.ANY, 'dataflow_graph.json', mock.ANY), - mock.call(mock.ANY, 'template', mock.ANY) + mock.call( + mock.ANY, + 'dataflow_graph.json', + mock.ANY, + 'application/octet-stream', + None), + mock.call( + mock.ANY, + 'template', + mock.ANY, + 'application/octet-stream', + None) ]) client.create_job_description.assert_called_once() # template is generated, but job should not be submitted to the @@ -1653,6 +1669,50 @@ def exists_return_value(*args): })) self.assertEqual(pipeline, pipeline_expected) + def test_stage_file_with_retry(self): + count = 0 + + def effect(self, *args, **kwargs): + nonlocal count + count += 1 + if count > 1: + return + raise Exception("This exception is raised for testing purpose.") + + pipeline_options = PipelineOptions([ + '--project', + 'test_project', + '--job_name', + 'test_job_name', + '--temp_location', + 'gs://test-location/temp', + ]) + pipeline_options.view_as(GoogleCloudOptions).no_auth = True + client = apiclient.DataflowApplicationClient(pipeline_options) + + with mock.patch.object(time, 'sleep'): + count = 0 + with mock.patch("builtins.open", + mock.mock_open(read_data="data")) as mock_file_open: + with mock.patch.object(client, 'stage_file') as mock_stage_file: + mock_stage_file.side_effect = effect + # call with a file name + client.stage_file_with_retry( + "/to", "new_name", "/from/old_name", total_size=1024) + self.assertEqual(mock_file_open.call_count, 2) + self.assertEqual(mock_stage_file.call_count, 2) + + count = 0 + with mock.patch("builtins.open", + mock.mock_open(read_data="data")) as mock_file_open: + with mock.patch.object(client, 'stage_file') as mock_stage_file: + mock_stage_file.side_effect = effect + # call with a seekable stream + client.stage_file_with_retry( + "/to", "new_name", io.BytesIO(b'test'), total_size=4) + mock_file_open.assert_not_called() + self.assertEqual(mock_stage_file.call_count, 2) + if __name__ == '__main__': unittest.main() From e891af866888e42a634fb8857338f7266ae56975 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 21 Dec 2024 00:31:29 -0500 Subject: [PATCH 2/6] Minor change on assertion. --- sdks/python/apache_beam/runners/dataflow/internal/apiclient.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 068cef986f3b..bc0b6fe14e08 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -707,8 +707,8 @@ def stage_file_with_retry( gcs_or_local_path, file_name, stream, mime_type, total_size) elif isinstance(stream_or_path, io.BufferedIOBase): stream = stream_or_path - assert stream.seekable(), "stream must be seekable" if stream.tell() > 0: + assert stream.seekable(), "stream must be seekable" stream.seek(0) self.stage_file( gcs_or_local_path, file_name, stream, mime_type, total_size) From da834fdee214f60b839d425b21cfb562bf1ad759 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 21 Dec 2024 10:02:08 -0500 Subject: [PATCH 3/6] Improve the behavior of an unseekable stream input. --- .../runners/dataflow/internal/apiclient.py | 17 +++-- .../dataflow/internal/apiclient_test.py | 71 +++++++++++++------ 2 files changed, 60 insertions(+), 28 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index bc0b6fe14e08..f9b84e16906d 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -42,6 +42,7 @@ import re import sys import time +import traceback import warnings from copy import copy from datetime import datetime @@ -707,11 +708,17 @@ def stage_file_with_retry( gcs_or_local_path, file_name, stream, mime_type, total_size) elif isinstance(stream_or_path, io.BufferedIOBase): stream = stream_or_path - if stream.tell() > 0: - assert stream.seekable(), "stream must be seekable" - stream.seek(0) - self.stage_file( - gcs_or_local_path, file_name, stream, mime_type, total_size) + try: + self.stage_file( + gcs_or_local_path, file_name, stream, mime_type, total_size) + except Exception as exn: + if stream.seekable(): + stream.seek(0) + raise exn + else: + raise retry.PermanentException( + "Failed to tell or seek in stream because we caught exception:", + ''.join(traceback.format_exception_only(exn.__class__, exn))) @retry.no_retries # Using no_retries marks this as an integration point. def create_job(self, job): diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index 4b32dc567b06..9d3804b0ce32 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -44,6 +44,7 @@ from apache_beam.transforms import DoFn from apache_beam.transforms import ParDo from apache_beam.transforms.environments import DockerEnvironment +from apache_beam.utils import retry # Protect against environments where apitools library is not available. # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports @@ -1670,14 +1671,16 @@ def exists_return_value(*args): self.assertEqual(pipeline, pipeline_expected) def test_stage_file_with_retry(self): - count = 0 - def effect(self, *args, **kwargs): nonlocal count count += 1 - if count > 1: - return - raise Exception("This exception is raised for testing purpose.") + # Fail the first two calls and succeed afterward + if count <= 2: + raise Exception("This exception is raised for testing purpose.") + + class Unseekable(io.BufferedIOBase): + def seekable(self): + return False pipeline_options = PipelineOptions([ '--project', @@ -1690,28 +1693,50 @@ def effect(self, *args, **kwargs): pipeline_options.view_as(GoogleCloudOptions).no_auth = True client = apiclient.DataflowApplicationClient(pipeline_options) - with mock.patch.object(time, 'sleep'): - count = 0 - with mock.patch("builtins.open", - mock.mock_open(read_data="data")) as mock_file_open: - with mock.patch.object(client, 'stage_file') as mock_stage_file: - mock_stage_file.side_effect = effect - # call with a file name + with mock.patch.object(client, 'stage_file') as mock_stage_file: + mock_stage_file.side_effect = effect + + with mock.patch.object(time, 'sleep') as mock_sleep: + with mock.patch("builtins.open", + mock.mock_open(read_data="data")) as mock_file_open: + count = 0 + # calling with a file name client.stage_file_with_retry( - "/to", "new_name", "/from/old_name", total_size=1024) - self.assertEqual(mock_file_open.call_count, 2) - self.assertEqual(mock_stage_file.call_count, 2) - - count = 0 - with mock.patch("builtins.open", - mock.mock_open(read_data="data")) as mock_file_open: - with mock.patch.object(client, 'stage_file') as mock_stage_file: - mock_stage_file.side_effect = effect - # call with a seekable stream + "/to", "new_name", "/from/old_name", total_size=4) + self.assertEqual(mock_stage_file.call_count, 3) + self.assertEqual(mock_sleep.call_count, 2) + self.assertEqual(mock_file_open.call_count, 3) + + count = 0 + mock_stage_file.reset_mock() + mock_sleep.reset_mock() + mock_file_open.reset_mock() + + # calling with a seekable stream client.stage_file_with_retry( "/to", "new_name", io.BytesIO(b'test'), total_size=4) + self.assertEqual(mock_stage_file.call_count, 3) + self.assertEqual(mock_sleep.call_count, 2) + # no open() is called if a stream is provided + mock_file_open.assert_not_called() + + count = 0 + mock_sleep.reset_mock() + mock_file_open.reset_mock() + mock_stage_file.reset_mock() + + # calling with an unseekable stream + self.assertRaises( + retry.PermanentException, + client.stage_file_with_retry, + "/to", + "new_name", + Unseekable(), + total_size=4) + # Unseekable is only staged once. There won't be any retry if it fails + self.assertEqual(mock_stage_file.call_count, 1) + mock_sleep.assert_not_called() mock_file_open.assert_not_called() - self.assertEqual(mock_stage_file.call_count, 2) if __name__ == '__main__': From 6d105e57110d7d933334eb9c215b9d0261dc531f Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 21 Dec 2024 10:08:12 -0500 Subject: [PATCH 4/6] Minor changes on exception message. --- .../apache_beam/runners/dataflow/internal/apiclient.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index f9b84e16906d..530b2e8efd78 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -717,8 +717,9 @@ def stage_file_with_retry( raise exn else: raise retry.PermanentException( - "Failed to tell or seek in stream because we caught exception:", - ''.join(traceback.format_exception_only(exn.__class__, exn))) + "Skip retrying because we caught exception:", + ''.join(traceback.format_exception_only(exn.__class__, exn)), + ', but the stream is not seekable') @retry.no_retries # Using no_retries marks this as an integration point. def create_job(self, job): From bbaf64018dfc4a6daa585ab491386730f4fa4672 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 21 Dec 2024 10:24:09 -0500 Subject: [PATCH 5/6] Add some error handling on unsupported arg type. --- .../runners/dataflow/internal/apiclient.py | 11 +++++++---- .../runners/dataflow/internal/apiclient_test.py | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 530b2e8efd78..3df37db8220f 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -706,7 +706,7 @@ def stage_file_with_retry( with open(path, 'rb') as stream: self.stage_file( gcs_or_local_path, file_name, stream, mime_type, total_size) - elif isinstance(stream_or_path, io.BufferedIOBase): + elif isinstance(stream_or_path, io.IOBase): stream = stream_or_path try: self.stage_file( @@ -717,9 +717,12 @@ def stage_file_with_retry( raise exn else: raise retry.PermanentException( - "Skip retrying because we caught exception:", - ''.join(traceback.format_exception_only(exn.__class__, exn)), - ', but the stream is not seekable') + "Skip retrying because we caught exception:" + + ''.join(traceback.format_exception_only(exn.__class__, exn)) + + ', but the stream is not seekable.') + else: + raise retry.PermanentException( + f"Unsupported type {type(stream_or_path)} in stream_or_path") @retry.no_retries # Using no_retries marks this as an integration point. def create_job(self, job): diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index 9d3804b0ce32..f2f4fd2d2be9 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -1678,7 +1678,7 @@ def effect(self, *args, **kwargs): if count <= 2: raise Exception("This exception is raised for testing purpose.") - class Unseekable(io.BufferedIOBase): + class Unseekable(io.IOBase): def seekable(self): return False From 03eef0e85db518281fd1c87e39edbe86a9aaaece Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 21 Dec 2024 15:37:39 -0500 Subject: [PATCH 6/6] Add one more test case and minor revision on comments. --- .../runners/dataflow/internal/apiclient.py | 4 +++- .../dataflow/internal/apiclient_test.py | 21 ++++++++++++++++++- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 3df37db8220f..4e65156f3bc7 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -713,6 +713,7 @@ def stage_file_with_retry( gcs_or_local_path, file_name, stream, mime_type, total_size) except Exception as exn: if stream.seekable(): + # reset cursor for possible retrying stream.seek(0) raise exn else: @@ -722,7 +723,8 @@ def stage_file_with_retry( ', but the stream is not seekable.') else: raise retry.PermanentException( - f"Unsupported type {type(stream_or_path)} in stream_or_path") + "Skip retrying because type " + str(type(stream_or_path)) + + "stream_or_path is unsupported.") @retry.no_retries # Using no_retries marks this as an integration point. def create_job(self, job): diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index f2f4fd2d2be9..d055065cb9d9 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -1733,11 +1733,30 @@ def seekable(self): "new_name", Unseekable(), total_size=4) - # Unseekable is only staged once. There won't be any retry if it fails + # Unseekable streams are staged once. If staging fails, no retries are + # attempted. self.assertEqual(mock_stage_file.call_count, 1) mock_sleep.assert_not_called() mock_file_open.assert_not_called() + count = 0 + mock_sleep.reset_mock() + mock_file_open.reset_mock() + mock_stage_file.reset_mock() + + # calling with something else + self.assertRaises( + retry.PermanentException, + client.stage_file_with_retry, + "/to", + "new_name", + object(), + total_size=4) + # No staging will be called for wrong arg type + mock_stage_file.assert_not_called() + mock_sleep.assert_not_called() + mock_file_open.assert_not_called() + if __name__ == '__main__': unittest.main()