diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 95c7ca204ca6..9b60019c801e 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -434,7 +434,7 @@ class BeamModulePlugin implements Plugin { def classgraph_version = "4.8.104" def errorprone_version = "2.3.4" def google_clients_version = "1.31.0" - def google_cloud_bigdataoss_version = "2.1.6" + def google_cloud_bigdataoss_version = "2.2.2" def google_cloud_pubsublite_version = "0.13.2" def google_code_gson_version = "2.8.6" def google_oauth_clients_version = "1.31.0" diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java index 82fc3ae1e5df..f593e2f5c9d8 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java @@ -406,7 +406,7 @@ private static long getProjectNumber( try { Project project = ResilientOperation.retry( - ResilientOperation.getGoogleRequestCallable(getProject), + getProject::execute, backoff, RetryDeterminer.SOCKET_ERRORS, IOException.class, diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index 9d63d2f8eb9f..d183d1647e11 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -18,7 +18,7 @@ package org.apache.beam.sdk.extensions.gcp.options; import com.fasterxml.jackson.annotation.JsonIgnore; -import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel; +import com.google.cloud.hadoop.util.AsyncWriteChannelOptions; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -78,15 +78,15 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline /** * The buffer size (in bytes) to use when uploading files to GCS. Please see the documentation for - * {@link AbstractGoogleAsyncWriteChannel#setUploadBufferSize} for more information on the - * restrictions and performance implications of this value. + * {@link AsyncWriteChannelOptions#getUploadChunkSize} for more information on the restrictions + * and performance implications of this value. */ @Description( "The buffer size (in bytes) to use when uploading files to GCS. 
Please see the " - + "documentation for AbstractGoogleAsyncWriteChannel.setUploadBufferSize for more " + + "documentation for AsyncWriteChannelOptions.getUploadChunkSize for more " + "information on the restrictions and performance implications of this value.\n\n" + "https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/" - + "com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java") + + "com/google/cloud/hadoop/util/AsyncWriteChannelOptions.java") @Nullable Integer getGcsUploadBufferSizeBytes(); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java index c3a64629516c..1fdb871605c8 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java @@ -18,7 +18,7 @@ package org.apache.beam.sdk.extensions.gcp.storage; import com.google.auto.value.AutoValue; -import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel; +import com.google.cloud.hadoop.util.AsyncWriteChannelOptions; import org.apache.beam.sdk.io.fs.CreateOptions; import org.checkerframework.checker.nullness.qual.Nullable; @@ -28,8 +28,8 @@ public abstract class GcsCreateOptions extends CreateOptions { /** * The buffer size (in bytes) to use when uploading files to GCS. Please see the documentation for - * {@link AbstractGoogleAsyncWriteChannel#setUploadBufferSize} for more information on the - * restrictions and performance implications of this value. + * {@link AsyncWriteChannelOptions#getUploadChunkSize} for more information on the restrictions + * and performance implications of this value. 
*/ public abstract @Nullable Integer gcsUploadBufferSizeBytes(); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index 103c23414954..97974f95ca00 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -28,6 +28,7 @@ import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.http.HttpHeaders; import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.client.http.HttpStatusCodes; import com.google.api.client.util.BackOff; import com.google.api.client.util.Sleeper; import com.google.api.services.storage.Storage; @@ -35,11 +36,13 @@ import com.google.api.services.storage.model.Objects; import com.google.api.services.storage.model.RewriteResponse; import com.google.api.services.storage.model.StorageObject; +import com.google.auth.Credentials; import com.google.auto.value.AutoValue; import com.google.cloud.hadoop.gcsio.CreateObjectOptions; import com.google.cloud.hadoop.gcsio.GoogleCloudStorage; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions; +import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions; import com.google.cloud.hadoop.gcsio.StorageResourceId; import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.cloud.hadoop.util.AsyncWriteChannelOptions; @@ -107,6 +110,7 @@ public GcsUtil create(PipelineOptions options) { storageBuilder.getHttpRequestInitializer(), gcsOptions.getExecutorService(), hasExperiment(options, "use_grpc_for_gcs"), + gcsOptions.getGcpCredential(), gcsOptions.getGcsUploadBufferSizeBytes()); } 
@@ -116,12 +120,14 @@ public static GcsUtil create( Storage storageClient, HttpRequestInitializer httpRequestInitializer, ExecutorService executorService, + Credentials credentials, @Nullable Integer uploadBufferSizeBytes) { return new GcsUtil( storageClient, httpRequestInitializer, executorService, hasExperiment(options, "use_grpc_for_gcs"), + credentials, uploadBufferSizeBytes); } } @@ -159,9 +165,10 @@ public static GcsUtil create( // Exposed for testing. final ExecutorService executorService; + private Credentials credentials; + private GoogleCloudStorage googleCloudStorage; private GoogleCloudStorageOptions googleCloudStorageOptions; - private final boolean shouldUseGrpc; /** Rewrite operation setting. For testing purposes only. */ @VisibleForTesting @Nullable Long maxBytesRewrittenPerCall; @@ -185,20 +192,22 @@ private GcsUtil( HttpRequestInitializer httpRequestInitializer, ExecutorService executorService, Boolean shouldUseGrpc, + Credentials credentials, @Nullable Integer uploadBufferSizeBytes) { this.storageClient = storageClient; this.httpRequestInitializer = httpRequestInitializer; this.uploadBufferSizeBytes = uploadBufferSizeBytes; this.executorService = executorService; + this.credentials = credentials; this.maxBytesRewrittenPerCall = null; this.numRewriteTokensUsed = null; - this.shouldUseGrpc = shouldUseGrpc; googleCloudStorageOptions = - GoogleCloudStorageOptions.newBuilder() + GoogleCloudStorageOptions.builder() .setAppName("Beam") .setGrpcEnabled(shouldUseGrpc) .build(); - googleCloudStorage = new GoogleCloudStorageImpl(googleCloudStorageOptions, storageClient); + googleCloudStorage = + new GoogleCloudStorageImpl(googleCloudStorageOptions, storageClient, credentials); } // Use this only for testing purposes. 
@@ -288,11 +297,7 @@ StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) throw storageClient.objects().get(gcsPath.getBucket(), gcsPath.getObject()); try { return ResilientOperation.retry( - ResilientOperation.getGoogleRequestCallable(getObject), - backoff, - RetryDeterminer.SOCKET_ERRORS, - IOException.class, - sleeper); + getObject::execute, backoff, RetryDeterminer.SOCKET_ERRORS, IOException.class, sleeper); } catch (IOException | InterruptedException e) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); @@ -344,10 +349,7 @@ public Objects listObjects( try { return ResilientOperation.retry( - ResilientOperation.getGoogleRequestCallable(listObject), - createBackOff(), - RetryDeterminer.SOCKET_ERRORS, - IOException.class); + listObject::execute, createBackOff(), RetryDeterminer.SOCKET_ERRORS, IOException.class); } catch (Exception e) { throw new IOException( String.format("Unable to match files in bucket %s, prefix %s.", bucket, prefix), e); @@ -400,6 +402,22 @@ public SeekableByteChannel open(GcsPath path) throws IOException { return googleCloudStorage.open(new StorageResourceId(path.getBucket(), path.getObject())); } + /** + * Opens an object in GCS. + * + * <p>Returns a SeekableByteChannel that provides access to data in the bucket. + * + * @param path the GCS filename to read from + * @param readOptions Fine-grained options for behaviors of retries, buffering, etc. + * @return a SeekableByteChannel that can read the object data + */ + @VisibleForTesting + SeekableByteChannel open(GcsPath path, GoogleCloudStorageReadOptions readOptions) + throws IOException { + return googleCloudStorage.open( + new StorageResourceId(path.getBucket(), path.getObject()), readOptions); + } + /** * Creates an object in GCS. * @@ -419,30 +437,19 @@ public WritableByteChannel create(GcsPath path, String type) throws IOException */ public WritableByteChannel create(GcsPath path, String type, Integer uploadBufferSizeBytes) throws IOException { - // When AsyncWriteChannelOptions has toBuilder() method, the following can be changed to: - // AsyncWriteChannelOptions newOptions = - // wcOptions.toBuilder().setUploadChunkSize(uploadBufferSizeBytes).build(); AsyncWriteChannelOptions wcOptions = googleCloudStorageOptions.getWriteChannelOptions(); int uploadChunkSize = (uploadBufferSizeBytes == null) ? 
wcOptions.getUploadChunkSize() : uploadBufferSizeBytes; AsyncWriteChannelOptions newOptions = - AsyncWriteChannelOptions.builder() - .setBufferSize(wcOptions.getBufferSize()) - .setPipeBufferSize(wcOptions.getPipeBufferSize()) - .setUploadChunkSize(uploadChunkSize) - .setDirectUploadEnabled(wcOptions.isDirectUploadEnabled()) - .build(); + wcOptions.toBuilder().setUploadChunkSize(uploadChunkSize).build(); GoogleCloudStorageOptions newGoogleCloudStorageOptions = - googleCloudStorageOptions - .toBuilder() - .setWriteChannelOptions(newOptions) - .setGrpcEnabled(this.shouldUseGrpc) - .build(); + googleCloudStorageOptions.toBuilder().setWriteChannelOptions(newOptions).build(); GoogleCloudStorage gcpStorage = - new GoogleCloudStorageImpl(newGoogleCloudStorageOptions, this.storageClient); + new GoogleCloudStorageImpl( + newGoogleCloudStorageOptions, this.storageClient, this.credentials); return gcpStorage.create( new StorageResourceId(path.getBucket(), path.getObject()), - new CreateObjectOptions(true, type, CreateObjectOptions.EMPTY_METADATA)); + CreateObjectOptions.builder().setOverwriteExisting(true).setContentType(type).build()); } /** Returns whether the GCS bucket exists and is accessible. 
*/ @@ -487,7 +494,7 @@ Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOExcept try { return ResilientOperation.retry( - ResilientOperation.getGoogleRequestCallable(getBucket), + getBucket::execute, backoff, new RetryDeterminer() { @Override @@ -526,7 +533,7 @@ void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper slee try { ResilientOperation.retry( - ResilientOperation.getGoogleRequestCallable(insertBucket), + insertBucket::execute, backoff, new RetryDeterminer() { @Override @@ -763,7 +770,7 @@ public void onSuccess(StorageObject response, HttpHeaders httpHeaders) @Override public void onFailure(GoogleJsonError e, HttpHeaders httpHeaders) throws IOException { IOException ioException; - if (errorExtractor.itemNotFound(e)) { + if (e.getCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) { ioException = new FileNotFoundException(path.toString()); } else { ioException = new IOException(String.format("Error trying to get %s: %s", path, e)); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java index ae027e9b6aeb..0923ca37ae29 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java @@ -52,9 +52,7 @@ import com.google.api.services.storage.model.Bucket; import com.google.api.services.storage.model.Objects; import com.google.api.services.storage.model.StorageObject; -import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions; -import com.google.cloud.hadoop.util.ClientRequestHelper; import java.io.ByteArrayInputStream; import java.io.FileNotFoundException; import 
java.io.IOException; @@ -768,11 +766,12 @@ public void testGetBucketNotExists() throws IOException { @Test public void testGCSChannelCloseIdempotent() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); GoogleCloudStorageReadOptions readOptions = GoogleCloudStorageReadOptions.builder().setFastFailOnNotFound(false).build(); SeekableByteChannel channel = - new GoogleCloudStorageReadChannel( - null, "dummybucket", "dummyobject", null, new ClientRequestHelper<>(), readOptions); + gcsUtil.open(GcsPath.fromComponents("testbucket", "testobject"), readOptions); channel.close(); channel.close(); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java index 52543dbaa019..ae788d76ccc9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java @@ -1164,7 +1164,7 @@ public void testCreateTableSucceeds() throws IOException { bigquery, null, PipelineOptionsFactory.create()); Table ret = services.tryCreateTable( - testTable, new RetryBoundedBackOff(0, BackOff.ZERO_BACKOFF), Sleeper.DEFAULT); + testTable, new RetryBoundedBackOff(BackOff.ZERO_BACKOFF, 0), Sleeper.DEFAULT); assertEquals(testTable, ret); verifyAllResponsesAreRead(); } @@ -1198,7 +1198,7 @@ public void testCreateTableDoesNotRetry() throws IOException { bigquery, null, PipelineOptionsFactory.create()); try { services.tryCreateTable( - testTable, new RetryBoundedBackOff(3, BackOff.ZERO_BACKOFF), Sleeper.DEFAULT); + testTable, new RetryBoundedBackOff(BackOff.ZERO_BACKOFF, 3), Sleeper.DEFAULT); fail(); } catch (IOException e) { verify(responses[0], 
atLeastOnce()).getStatusCode(); @@ -1235,7 +1235,7 @@ public void testCreateTableSucceedsAlreadyExists() throws IOException { bigquery, null, PipelineOptionsFactory.create()); Table ret = services.tryCreateTable( - testTable, new RetryBoundedBackOff(0, BackOff.ZERO_BACKOFF), Sleeper.DEFAULT); + testTable, new RetryBoundedBackOff(BackOff.ZERO_BACKOFF, 0), Sleeper.DEFAULT); assertNull(ret); verifyAllResponsesAreRead(); @@ -1267,7 +1267,7 @@ public void testCreateTableRetry() throws IOException { bigquery, null, PipelineOptionsFactory.create()); Table ret = services.tryCreateTable( - testTable, new RetryBoundedBackOff(3, BackOff.ZERO_BACKOFF), Sleeper.DEFAULT); + testTable, new RetryBoundedBackOff(BackOff.ZERO_BACKOFF, 3), Sleeper.DEFAULT); assertEquals(testTable, ret); verifyAllResponsesAreRead();