diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 3df37db8220..4e65156f3bc 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -713,6 +713,7 @@ def stage_file_with_retry( gcs_or_local_path, file_name, stream, mime_type, total_size) except Exception as exn: if stream.seekable(): + # Rewind the stream so that a retry re-reads it from the start. stream.seek(0) raise exn else: @@ -722,7 +723,8 @@ def stage_file_with_retry( ', but the stream is not seekable.') else: raise retry.PermanentException( - f"Unsupported type {type(stream_or_path)} in stream_or_path") + "Skip retrying because type " + str(type(stream_or_path)) + + " of stream_or_path is unsupported.") @retry.no_retries # Using no_retries marks this as an integration point. def create_job(self, job): diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index f2f4fd2d2be..d055065cb9d 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -1733,11 +1733,30 @@ def seekable(self): "new_name", Unseekable(), total_size=4) - # Unseekable is only staged once. There won't be any retry if it fails + # Unseekable streams are staged once. If staging fails, no retries are + # attempted. 
self.assertEqual(mock_stage_file.call_count, 1) mock_sleep.assert_not_called() mock_file_open.assert_not_called() + count = 0 + mock_sleep.reset_mock() + mock_file_open.reset_mock() + mock_stage_file.reset_mock() + + # Call with an argument of an unsupported type. + self.assertRaises( + retry.PermanentException, + client.stage_file_with_retry, + "/to", + "new_name", + object(), + total_size=4) + # Staging is never attempted for an unsupported argument type. + mock_stage_file.assert_not_called() + mock_sleep.assert_not_called() + mock_file_open.assert_not_called() + if __name__ == '__main__': unittest.main()