Skip to content

Commit

Permalink
Merge pull request #14817: [BEAM-8889] Upgrade GCSIO to 2.2.2
Browse files Browse the repository at this point in the history
  • Loading branch information
chamikaramj authored Jul 11, 2021
2 parents 403ad51 + b6e2281 commit 2f57eec
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ class BeamModulePlugin implements Plugin<Project> {
def classgraph_version = "4.8.104"
def errorprone_version = "2.3.4"
def google_clients_version = "1.31.0"
def google_cloud_bigdataoss_version = "2.1.6"
def google_cloud_bigdataoss_version = "2.2.2"
def google_cloud_pubsublite_version = "0.13.2"
def google_code_gson_version = "2.8.6"
def google_oauth_clients_version = "1.31.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ private static long getProjectNumber(
try {
Project project =
ResilientOperation.retry(
ResilientOperation.getGoogleRequestCallable(getProject),
getProject::execute,
backoff,
RetryDeterminer.SOCKET_ERRORS,
IOException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.beam.sdk.extensions.gcp.options;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -78,15 +78,15 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline

/**
* The buffer size (in bytes) to use when uploading files to GCS. Please see the documentation for
* {@link AbstractGoogleAsyncWriteChannel#setUploadBufferSize} for more information on the
* restrictions and performance implications of this value.
* {@link AsyncWriteChannelOptions#getUploadChunkSize} for more information on the restrictions
* and performance implications of this value.
*/
@Description(
"The buffer size (in bytes) to use when uploading files to GCS. Please see the "
+ "documentation for AbstractGoogleAsyncWriteChannel.setUploadBufferSize for more "
+ "documentation for AsyncWriteChannelOptions.getUploadChunkSize for more "
+ "information on the restrictions and performance implications of this value.\n\n"
+ "https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/"
+ "com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java")
+ "com/google/cloud/hadoop/util/AsyncWriteChannelOptions.java")
@Nullable
Integer getGcsUploadBufferSizeBytes();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.beam.sdk.extensions.gcp.storage;

import com.google.auto.value.AutoValue;
import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.checkerframework.checker.nullness.qual.Nullable;

Expand All @@ -28,8 +28,8 @@ public abstract class GcsCreateOptions extends CreateOptions {

/**
* The buffer size (in bytes) to use when uploading files to GCS. Please see the documentation for
* {@link AbstractGoogleAsyncWriteChannel#setUploadBufferSize} for more information on the
* restrictions and performance implications of this value.
* {@link AsyncWriteChannelOptions#getUploadChunkSize} for more information on the restrictions
* and performance implications of this value.
*/
public abstract @Nullable Integer gcsUploadBufferSizeBytes();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,21 @@
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpStatusCodes;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.Bucket;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.RewriteResponse;
import com.google.api.services.storage.model.StorageObject;
import com.google.auth.Credentials;
import com.google.auto.value.AutoValue;
import com.google.cloud.hadoop.gcsio.CreateObjectOptions;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorage;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions;
import com.google.cloud.hadoop.gcsio.StorageResourceId;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
Expand Down Expand Up @@ -107,6 +110,7 @@ public GcsUtil create(PipelineOptions options) {
storageBuilder.getHttpRequestInitializer(),
gcsOptions.getExecutorService(),
hasExperiment(options, "use_grpc_for_gcs"),
gcsOptions.getGcpCredential(),
gcsOptions.getGcsUploadBufferSizeBytes());
}

Expand All @@ -116,12 +120,14 @@ public static GcsUtil create(
Storage storageClient,
HttpRequestInitializer httpRequestInitializer,
ExecutorService executorService,
Credentials credentials,
@Nullable Integer uploadBufferSizeBytes) {
return new GcsUtil(
storageClient,
httpRequestInitializer,
executorService,
hasExperiment(options, "use_grpc_for_gcs"),
credentials,
uploadBufferSizeBytes);
}
}
Expand Down Expand Up @@ -159,9 +165,10 @@ public static GcsUtil create(
// Exposed for testing.
final ExecutorService executorService;

private Credentials credentials;

private GoogleCloudStorage googleCloudStorage;
private GoogleCloudStorageOptions googleCloudStorageOptions;
private final boolean shouldUseGrpc;

/** Rewrite operation setting. For testing purposes only. */
@VisibleForTesting @Nullable Long maxBytesRewrittenPerCall;
Expand All @@ -185,20 +192,22 @@ private GcsUtil(
HttpRequestInitializer httpRequestInitializer,
ExecutorService executorService,
Boolean shouldUseGrpc,
Credentials credentials,
@Nullable Integer uploadBufferSizeBytes) {
this.storageClient = storageClient;
this.httpRequestInitializer = httpRequestInitializer;
this.uploadBufferSizeBytes = uploadBufferSizeBytes;
this.executorService = executorService;
this.credentials = credentials;
this.maxBytesRewrittenPerCall = null;
this.numRewriteTokensUsed = null;
this.shouldUseGrpc = shouldUseGrpc;
googleCloudStorageOptions =
GoogleCloudStorageOptions.newBuilder()
GoogleCloudStorageOptions.builder()
.setAppName("Beam")
.setGrpcEnabled(shouldUseGrpc)
.build();
googleCloudStorage = new GoogleCloudStorageImpl(googleCloudStorageOptions, storageClient);
googleCloudStorage =
new GoogleCloudStorageImpl(googleCloudStorageOptions, storageClient, credentials);
}

// Use this only for testing purposes.
Expand Down Expand Up @@ -288,11 +297,7 @@ StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) throw
storageClient.objects().get(gcsPath.getBucket(), gcsPath.getObject());
try {
return ResilientOperation.retry(
ResilientOperation.getGoogleRequestCallable(getObject),
backoff,
RetryDeterminer.SOCKET_ERRORS,
IOException.class,
sleeper);
getObject::execute, backoff, RetryDeterminer.SOCKET_ERRORS, IOException.class, sleeper);
} catch (IOException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -344,10 +349,7 @@ public Objects listObjects(

try {
return ResilientOperation.retry(
ResilientOperation.getGoogleRequestCallable(listObject),
createBackOff(),
RetryDeterminer.SOCKET_ERRORS,
IOException.class);
listObject::execute, createBackOff(), RetryDeterminer.SOCKET_ERRORS, IOException.class);
} catch (Exception e) {
throw new IOException(
String.format("Unable to match files in bucket %s, prefix %s.", bucket, prefix), e);
Expand Down Expand Up @@ -400,6 +402,22 @@ public SeekableByteChannel open(GcsPath path) throws IOException {
return googleCloudStorage.open(new StorageResourceId(path.getBucket(), path.getObject()));
}

/**
* Opens an object in GCS.
*
* <p>Returns a SeekableByteChannel that provides access to data in the bucket.
*
* @param path the GCS filename to read from
* @param readOptions Fine-grained options for behaviors of retries, buffering, etc.
* @return a SeekableByteChannel that can read the object data
*/
@VisibleForTesting
SeekableByteChannel open(GcsPath path, GoogleCloudStorageReadOptions readOptions)
throws IOException {
return googleCloudStorage.open(
new StorageResourceId(path.getBucket(), path.getObject()), readOptions);
}

/**
* Creates an object in GCS.
*
Expand All @@ -419,30 +437,19 @@ public WritableByteChannel create(GcsPath path, String type) throws IOException
*/
public WritableByteChannel create(GcsPath path, String type, Integer uploadBufferSizeBytes)
throws IOException {
// When AsyncWriteChannelOptions has toBuilder() method, the following can be changed to:
// AsyncWriteChannelOptions newOptions =
// wcOptions.toBuilder().setUploadChunkSize(uploadBufferSizeBytes).build();
AsyncWriteChannelOptions wcOptions = googleCloudStorageOptions.getWriteChannelOptions();
int uploadChunkSize =
(uploadBufferSizeBytes == null) ? wcOptions.getUploadChunkSize() : uploadBufferSizeBytes;
AsyncWriteChannelOptions newOptions =
AsyncWriteChannelOptions.builder()
.setBufferSize(wcOptions.getBufferSize())
.setPipeBufferSize(wcOptions.getPipeBufferSize())
.setUploadChunkSize(uploadChunkSize)
.setDirectUploadEnabled(wcOptions.isDirectUploadEnabled())
.build();
wcOptions.toBuilder().setUploadChunkSize(uploadChunkSize).build();
GoogleCloudStorageOptions newGoogleCloudStorageOptions =
googleCloudStorageOptions
.toBuilder()
.setWriteChannelOptions(newOptions)
.setGrpcEnabled(this.shouldUseGrpc)
.build();
googleCloudStorageOptions.toBuilder().setWriteChannelOptions(newOptions).build();
GoogleCloudStorage gcpStorage =
new GoogleCloudStorageImpl(newGoogleCloudStorageOptions, this.storageClient);
new GoogleCloudStorageImpl(
newGoogleCloudStorageOptions, this.storageClient, this.credentials);
return gcpStorage.create(
new StorageResourceId(path.getBucket(), path.getObject()),
new CreateObjectOptions(true, type, CreateObjectOptions.EMPTY_METADATA));
CreateObjectOptions.builder().setOverwriteExisting(true).setContentType(type).build());
}

/** Returns whether the GCS bucket exists and is accessible. */
Expand Down Expand Up @@ -487,7 +494,7 @@ Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOExcept

try {
return ResilientOperation.retry(
ResilientOperation.getGoogleRequestCallable(getBucket),
getBucket::execute,
backoff,
new RetryDeterminer<IOException>() {
@Override
Expand Down Expand Up @@ -526,7 +533,7 @@ void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper slee

try {
ResilientOperation.retry(
ResilientOperation.getGoogleRequestCallable(insertBucket),
insertBucket::execute,
backoff,
new RetryDeterminer<IOException>() {
@Override
Expand Down Expand Up @@ -763,7 +770,7 @@ public void onSuccess(StorageObject response, HttpHeaders httpHeaders)
@Override
public void onFailure(GoogleJsonError e, HttpHeaders httpHeaders) throws IOException {
IOException ioException;
if (errorExtractor.itemNotFound(e)) {
if (e.getCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) {
ioException = new FileNotFoundException(path.toString());
} else {
ioException = new IOException(String.format("Error trying to get %s: %s", path, e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@
import com.google.api.services.storage.model.Bucket;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions;
import com.google.cloud.hadoop.util.ClientRequestHelper;
import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
Expand Down Expand Up @@ -768,11 +766,12 @@ public void testGetBucketNotExists() throws IOException {

@Test
public void testGCSChannelCloseIdempotent() throws IOException {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
GoogleCloudStorageReadOptions readOptions =
GoogleCloudStorageReadOptions.builder().setFastFailOnNotFound(false).build();
SeekableByteChannel channel =
new GoogleCloudStorageReadChannel(
null, "dummybucket", "dummyobject", null, new ClientRequestHelper<>(), readOptions);
gcsUtil.open(GcsPath.fromComponents("testbucket", "testobject"), readOptions);
channel.close();
channel.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1164,7 +1164,7 @@ public void testCreateTableSucceeds() throws IOException {
bigquery, null, PipelineOptionsFactory.create());
Table ret =
services.tryCreateTable(
testTable, new RetryBoundedBackOff(0, BackOff.ZERO_BACKOFF), Sleeper.DEFAULT);
testTable, new RetryBoundedBackOff(BackOff.ZERO_BACKOFF, 0), Sleeper.DEFAULT);
assertEquals(testTable, ret);
verifyAllResponsesAreRead();
}
Expand Down Expand Up @@ -1198,7 +1198,7 @@ public void testCreateTableDoesNotRetry() throws IOException {
bigquery, null, PipelineOptionsFactory.create());
try {
services.tryCreateTable(
testTable, new RetryBoundedBackOff(3, BackOff.ZERO_BACKOFF), Sleeper.DEFAULT);
testTable, new RetryBoundedBackOff(BackOff.ZERO_BACKOFF, 3), Sleeper.DEFAULT);
fail();
} catch (IOException e) {
verify(responses[0], atLeastOnce()).getStatusCode();
Expand Down Expand Up @@ -1235,7 +1235,7 @@ public void testCreateTableSucceedsAlreadyExists() throws IOException {
bigquery, null, PipelineOptionsFactory.create());
Table ret =
services.tryCreateTable(
testTable, new RetryBoundedBackOff(0, BackOff.ZERO_BACKOFF), Sleeper.DEFAULT);
testTable, new RetryBoundedBackOff(BackOff.ZERO_BACKOFF, 0), Sleeper.DEFAULT);

assertNull(ret);
verifyAllResponsesAreRead();
Expand Down Expand Up @@ -1267,7 +1267,7 @@ public void testCreateTableRetry() throws IOException {
bigquery, null, PipelineOptionsFactory.create());
Table ret =
services.tryCreateTable(
testTable, new RetryBoundedBackOff(3, BackOff.ZERO_BACKOFF), Sleeper.DEFAULT);
testTable, new RetryBoundedBackOff(BackOff.ZERO_BACKOFF, 3), Sleeper.DEFAULT);
assertEquals(testTable, ret);
verifyAllResponsesAreRead();

Expand Down

0 comments on commit 2f57eec

Please sign in to comment.