Skip to content

Commit

Permalink
control grpc write path via flag (#1188) (#1220) (#1223)
Browse files Browse the repository at this point in the history
  • Loading branch information
singhravidutt authored Jul 30, 2024
1 parent d23a77d commit ac23990
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,11 @@ public class GoogleHadoopFileSystemConfiguration {
new HadoopConfigurationProperty<>(
"fs.gs.client.type", GoogleCloudStorageFileSystemOptions.DEFAULT.getClientType());

/** Configuration key to configure client to use for GCS access. */
public static final HadoopConfigurationProperty<Boolean> GCS_GRPC_WRITE_ENABLE =
new HadoopConfigurationProperty<>(
"fs.gs.grpc.write.enable", GoogleCloudStorageOptions.DEFAULT.isGrpcWriteEnabled());

/**
* Configuration key to configure the properties to optimize gcs-write. This config will be
* effective only if fs.gs.client.type is set to STORAGE_CLIENT.
Expand Down Expand Up @@ -581,6 +586,7 @@ static GoogleCloudStorageOptions.Builder getGcsOptionsBuilder(Configuration conf
String projectId = GCS_PROJECT_ID.get(config, config::get);
return GoogleCloudStorageOptions.builder()
.setAppName(getApplicationName(config))
.setGrpcWriteEnabled(GCS_GRPC_WRITE_ENABLE.get(config, config::getBoolean))
.setAutoRepairImplicitDirectoriesEnabled(
GCS_REPAIR_IMPLICIT_DIRECTORIES_ENABLE.get(config, config::getBoolean))
.setBatchThreads(GCS_BATCH_THREADS.get(config, config::getInt))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public class GoogleHadoopFileSystemConfigurationTest {
put("fs.gs.grpc.trafficdirector.enable", true);
put("fs.gs.grpc.write.buffered.requests", 20);
put("fs.gs.grpc.write.message.timeout", 3_000L);
put("fs.gs.grpc.write.enable", false);
put("fs.gs.hierarchical.namespace.folders.enable", false);
put("fs.gs.grpc.write.timeout", 600_000L);
put("fs.gs.http.connect-timeout", 5_000L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ private static String encodeMetadataValues(byte[] bytes) {
@Override
public WritableByteChannel create(StorageResourceId resourceId, CreateObjectOptions options)
throws IOException {
if (!storageOptions.isGrpcWriteEnabled()) {
return super.create(resourceId, options);
}

logger.atFiner().log("create(%s)", resourceId);
checkArgument(
resourceId.isStorageObject(), "Expected full StorageObject id, got %s", resourceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public static Builder builder() {
.setOperationTraceLogEnabled(false)
.setTrafficDirectorEnabled(true)
.setWriteChannelOptions(AsyncWriteChannelOptions.DEFAULT)
.setHnBucketRenameEnabled(false);
.setHnBucketRenameEnabled(false)
.setGrpcWriteEnabled(false);
}

public abstract Builder toBuilder();
Expand All @@ -92,6 +93,8 @@ public static Builder builder() {

public abstract String getStorageServicePath();

public abstract boolean isGrpcWriteEnabled();

@Nullable
public abstract String getProjectId();

Expand Down Expand Up @@ -239,6 +242,8 @@ public abstract Builder setGrpcMessageTimeoutCheckInterval(

public abstract Builder setHnBucketRenameEnabled(boolean enabled);

public abstract Builder setGrpcWriteEnabled(boolean grpcWriteEnabled);

abstract GoogleCloudStorageOptions autoBuild();

public GoogleCloudStorageOptions build() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.hadoop.gcsio.TrackingHttpRequestInitializer.OBJECT_FIELDS;
import static com.google.cloud.hadoop.gcsio.TrackingHttpRequestInitializer.getBucketRequestString;
import static com.google.cloud.hadoop.gcsio.TrackingHttpRequestInitializer.getRequestString;
import static com.google.cloud.hadoop.gcsio.TrackingHttpRequestInitializer.rewriteRequestString;
import static com.google.cloud.hadoop.gcsio.TrackingHttpRequestInitializer.uploadRequestString;
import static com.google.cloud.hadoop.gcsio.integration.GoogleCloudStorageTestHelper.assertObjectContent;
Expand Down Expand Up @@ -149,6 +150,58 @@ public void open_lazyInit_whenFastFailOnNotFound_isFalse() throws IOException {
trackingGcs.delegate.close();
}

/**
* Even when java-storage client in use, write path get short-circuited via {@link
* GoogleCloudStorageOptions} to use the http implementation.
*/
@Test
public void writeObject_withGrpcWriteDisabled() throws IOException {
StorageResourceId resourceId = new StorageResourceId(testBucket, name.getMethodName());

int uploadChunkSize = 2 * 1024 * 1024;
TrackingStorageWrapper<GoogleCloudStorage> trackingGcs =
newTrackingGoogleCloudStorage(
getOptionsWithUploadChunk(uploadChunkSize).toBuilder()
.setGrpcWriteEnabled(false)
.build());

int partitionsCount = 1;
byte[] partition =
writeObject(
trackingGcs.delegate,
resourceId,
/* partitionSize= */ uploadChunkSize,
partitionsCount);

assertObjectContent(helperGcs, resourceId, partition, partitionsCount);
assertThat(trackingGcs.requestsTracker.getAllRequestInvocationIds().size())
.isEqualTo(trackingGcs.requestsTracker.getAllRequests().size());

assertThat(trackingGcs.getAllRequestStrings())
.containsExactlyElementsIn(
ImmutableList.builder()
.add(getRequestString(resourceId.getBucketName(), resourceId.getObjectName()))
.add(
TrackingHttpRequestInitializer.resumableUploadRequestString(
resourceId.getBucketName(),
resourceId.getObjectName(),
/* generationId= */ 1,
/* replaceGenerationId= */ true))
.addAll(
ImmutableList.of(
TrackingHttpRequestInitializer.resumableUploadChunkRequestString(
resourceId.getBucketName(),
resourceId.getObjectName(),
/* generationId= */ 2,
/* uploadId= */ 1)))
.build()
.toArray())
.inOrder();

assertThat(trackingGcs.grpcRequestInterceptor.getAllRequestStrings().size()).isEqualTo(0);
trackingGcs.delegate.close();
}

@Test
public void open_withItemInfo() throws IOException {
int expectedSize = 5 * 1024 * 1024;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public static GoogleCloudStorageOptions.Builder getStandardOptionBuilder() {
return GoogleCloudStorageOptions.builder()
.setAppName(GoogleCloudStorageTestHelper.APP_NAME)
.setDirectPathPreferred(TestConfiguration.getInstance().isDirectPathPreferred())
.setGrpcWriteEnabled(true)
.setProjectId(checkNotNull(TestConfiguration.getInstance().getProjectId()));
}

Expand Down

0 comments on commit ac23990

Please sign in to comment.