diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java index db4f72dd7e..9d2c97f71e 100644 --- a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java +++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java @@ -497,6 +497,11 @@ public class GoogleHadoopFileSystemConfiguration { new HadoopConfigurationProperty<>( "fs.gs.client.type", GoogleCloudStorageFileSystemOptions.DEFAULT.getClientType()); + /** Configuration key to enable gRPC writes; if false, writes use the HTTP implementation. */ + public static final HadoopConfigurationProperty<Boolean> GCS_GRPC_WRITE_ENABLE = + new HadoopConfigurationProperty<>( + "fs.gs.grpc.write.enable", GoogleCloudStorageOptions.DEFAULT.isGrpcWriteEnabled()); + /** * Configuration key to configure the properties to optimize gcs-write. This config will be * effective only if fs.gs.client.type is set to STORAGE_CLIENT. 
@@ -581,6 +586,7 @@ static GoogleCloudStorageOptions.Builder getGcsOptionsBuilder(Configuration conf String projectId = GCS_PROJECT_ID.get(config, config::get); return GoogleCloudStorageOptions.builder() .setAppName(getApplicationName(config)) + .setGrpcWriteEnabled(GCS_GRPC_WRITE_ENABLE.get(config, config::getBoolean)) .setAutoRepairImplicitDirectoriesEnabled( GCS_REPAIR_IMPLICIT_DIRECTORIES_ENABLE.get(config, config::getBoolean)) .setBatchThreads(GCS_BATCH_THREADS.get(config, config::getInt)) diff --git a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfigurationTest.java b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfigurationTest.java index 0cc910f967..9012b5715f 100644 --- a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfigurationTest.java +++ b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfigurationTest.java @@ -81,6 +81,7 @@ public class GoogleHadoopFileSystemConfigurationTest { put("fs.gs.grpc.trafficdirector.enable", true); put("fs.gs.grpc.write.buffered.requests", 20); put("fs.gs.grpc.write.message.timeout", 3_000L); + put("fs.gs.grpc.write.enable", false); put("fs.gs.hierarchical.namespace.folders.enable", false); put("fs.gs.grpc.write.timeout", 600_000L); put("fs.gs.http.connect-timeout", 5_000L); diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientImpl.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientImpl.java index cffdb20f95..d86622c479 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientImpl.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientImpl.java @@ -184,6 +184,10 @@ private static String encodeMetadataValues(byte[] bytes) { @Override public WritableByteChannel create(StorageResourceId resourceId, CreateObjectOptions options) throws IOException { + if (!storageOptions.isGrpcWriteEnabled()) { + return 
super.create(resourceId, options); + } + logger.atFiner().log("create(%s)", resourceId); checkArgument( resourceId.isStorageObject(), "Expected full StorageObject id, got %s", resourceId); diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageOptions.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageOptions.java index 5311877c8e..9a7a30bcf0 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageOptions.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageOptions.java @@ -73,7 +73,8 @@ public static Builder builder() { .setOperationTraceLogEnabled(false) .setTrafficDirectorEnabled(true) .setWriteChannelOptions(AsyncWriteChannelOptions.DEFAULT) - .setHnBucketRenameEnabled(false); + .setHnBucketRenameEnabled(false) + .setGrpcWriteEnabled(false); } public abstract Builder toBuilder(); @@ -92,6 +93,8 @@ public static Builder builder() { public abstract String getStorageServicePath(); + public abstract boolean isGrpcWriteEnabled(); + @Nullable public abstract String getProjectId(); @@ -239,6 +242,8 @@ public abstract Builder setGrpcMessageTimeoutCheckInterval( public abstract Builder setHnBucketRenameEnabled(boolean enabled); + public abstract Builder setGrpcWriteEnabled(boolean grpcWriteEnabled); + abstract GoogleCloudStorageOptions autoBuild(); public GoogleCloudStorageOptions build() { diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageImplTest.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageImplTest.java index b35c409193..f104e06f0f 100644 --- a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageImplTest.java +++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageImplTest.java @@ -18,6 +18,7 @@ import static com.google.cloud.hadoop.gcsio.TrackingHttpRequestInitializer.OBJECT_FIELDS; import static 
com.google.cloud.hadoop.gcsio.TrackingHttpRequestInitializer.getBucketRequestString; +import static com.google.cloud.hadoop.gcsio.TrackingHttpRequestInitializer.getRequestString; import static com.google.cloud.hadoop.gcsio.TrackingHttpRequestInitializer.rewriteRequestString; import static com.google.cloud.hadoop.gcsio.TrackingHttpRequestInitializer.uploadRequestString; import static com.google.cloud.hadoop.gcsio.integration.GoogleCloudStorageTestHelper.assertObjectContent; @@ -149,6 +150,58 @@ public void open_lazyInit_whenFastFailOnNotFound_isFalse() throws IOException { trackingGcs.delegate.close(); } + /** + * Even when the java-storage client is in use, the write path is short-circuited via {@link + * GoogleCloudStorageOptions} to use the HTTP implementation. + */ + @Test + public void writeObject_withGrpcWriteDisabled() throws IOException { + StorageResourceId resourceId = new StorageResourceId(testBucket, name.getMethodName()); + + int uploadChunkSize = 2 * 1024 * 1024; + TrackingStorageWrapper trackingGcs = + newTrackingGoogleCloudStorage( + getOptionsWithUploadChunk(uploadChunkSize).toBuilder() + .setGrpcWriteEnabled(false) + .build()); + + int partitionsCount = 1; + byte[] partition = + writeObject( + trackingGcs.delegate, + resourceId, + /* partitionSize= */ uploadChunkSize, + partitionsCount); + + assertObjectContent(helperGcs, resourceId, partition, partitionsCount); + assertThat(trackingGcs.requestsTracker.getAllRequestInvocationIds().size()) + .isEqualTo(trackingGcs.requestsTracker.getAllRequests().size()); + + assertThat(trackingGcs.getAllRequestStrings()) + .containsExactlyElementsIn( + ImmutableList.builder() + .add(getRequestString(resourceId.getBucketName(), resourceId.getObjectName())) + .add( + TrackingHttpRequestInitializer.resumableUploadRequestString( + resourceId.getBucketName(), + resourceId.getObjectName(), + /* generationId= */ 1, + /* replaceGenerationId= */ true)) + .addAll( + ImmutableList.of( + 
TrackingHttpRequestInitializer.resumableUploadChunkRequestString( + resourceId.getBucketName(), + resourceId.getObjectName(), + /* generationId= */ 2, + /* uploadId= */ 1))) + .build() + .toArray()) + .inOrder(); + + assertThat(trackingGcs.grpcRequestInterceptor.getAllRequestStrings().size()).isEqualTo(0); + trackingGcs.delegate.close(); + } + @Test public void open_withItemInfo() throws IOException { int expectedSize = 5 * 1024 * 1024; diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageTestHelper.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageTestHelper.java index c06ada81af..e5296a05ed 100644 --- a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageTestHelper.java +++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageTestHelper.java @@ -112,6 +112,7 @@ public static GoogleCloudStorageOptions.Builder getStandardOptionBuilder() { return GoogleCloudStorageOptions.builder() .setAppName(GoogleCloudStorageTestHelper.APP_NAME) .setDirectPathPreferred(TestConfiguration.getInstance().isDirectPathPreferred()) + .setGrpcWriteEnabled(true) .setProjectId(checkNotNull(TestConfiguration.getInstance().getProjectId())); }