Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate gcs-connector options to GcsUtil #32769

Merged
merged 10 commits into from
Dec 3, 2024
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@

## I/Os

* gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)).
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## New Features / Improvements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.extensions.gcp.options;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
import java.util.concurrent.ExecutorService;
import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator;
Expand All @@ -44,6 +45,15 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline

void setGcsUtil(GcsUtil value);

@JsonIgnore
@Description(
"The GoogleCloudStorageReadOptions instance that should be used to read from Google Cloud Storage.")
@Default.InstanceFactory(GcsUtil.GcsReadOptionsFactory.class)
@Hidden
GoogleCloudStorageReadOptions getGoogleCloudStorageReadOptions();

void setGoogleCloudStorageReadOptions(GoogleCloudStorageReadOptions value);

/**
* The ExecutorService instance to use to create threads, can be overridden to specify an
* ExecutorService that is compatible with the user's environment. If unset, the default is to use
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ public static GcsCountersOptions create(
}
}

public static class GcsReadOptionsFactory
implements DefaultValueFactory<GoogleCloudStorageReadOptions> {
@Override
public GoogleCloudStorageReadOptions create(PipelineOptions options) {
return GoogleCloudStorageReadOptions.DEFAULT;
}
}

/**
* This is a {@link DefaultValueFactory} able to create a {@link GcsUtil} using any transport
* flags specified on the {@link PipelineOptions}.
Expand Down Expand Up @@ -153,7 +161,8 @@ public GcsUtil create(PipelineOptions options) {
: null,
gcsOptions.getEnableBucketWriteMetricCounter()
? gcsOptions.getGcsWriteCounterPrefix()
: null));
: null),
gcsOptions.getGoogleCloudStorageReadOptions());
}

/** Returns an instance of {@link GcsUtil} based on the given parameters. */
Expand All @@ -164,7 +173,8 @@ public static GcsUtil create(
ExecutorService executorService,
Credentials credentials,
@Nullable Integer uploadBufferSizeBytes,
GcsCountersOptions gcsCountersOptions) {
GcsCountersOptions gcsCountersOptions,
GoogleCloudStorageReadOptions gcsReadOptions) {
return new GcsUtil(
storageClient,
httpRequestInitializer,
Expand All @@ -173,7 +183,8 @@ public static GcsUtil create(
credentials,
uploadBufferSizeBytes,
null,
gcsCountersOptions);
gcsCountersOptions,
gcsReadOptions);
}
}

Expand Down Expand Up @@ -249,7 +260,8 @@ public static boolean isWildcard(GcsPath spec) {
Credentials credentials,
@Nullable Integer uploadBufferSizeBytes,
@Nullable Integer rewriteDataOpBatchLimit,
GcsCountersOptions gcsCountersOptions) {
GcsCountersOptions gcsCountersOptions,
GoogleCloudStorageReadOptions gcsReadOptions) {
this.storageClient = storageClient;
this.httpRequestInitializer = httpRequestInitializer;
this.uploadBufferSizeBytes = uploadBufferSizeBytes;
Expand All @@ -260,6 +272,7 @@ public static boolean isWildcard(GcsPath spec) {
googleCloudStorageOptions =
GoogleCloudStorageOptions.builder()
.setAppName("Beam")
.setReadChannelOptions(gcsReadOptions)
.setGrpcEnabled(shouldUseGrpc)
.build();
googleCloudStorage =
Expand Down Expand Up @@ -565,7 +578,9 @@ private SeekableByteChannel wrapInCounting(
public SeekableByteChannel open(GcsPath path) throws IOException {
String bucket = path.getBucket();
SeekableByteChannel channel =
googleCloudStorage.open(new StorageResourceId(path.getBucket(), path.getObject()));
googleCloudStorage.open(
new StorageResourceId(path.getBucket(), path.getObject()),
this.googleCloudStorageOptions.getReadChannelOptions());
return wrapInCounting(channel, bucket);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public void testGcpCoreApiSurface() throws Exception {
classesInPackage("com.google.api.services.storage"),
classesInPackage("com.google.auth"),
classesInPackage("com.fasterxml.jackson.annotation"),
classesInPackage("com.google.cloud.hadoop.gcsio"),
Copy link
Contributor

@shunping shunping Nov 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for tracking that down! Pushed a fix. one of the precommit tests is still failing though:

org.apache.beam.runners.dataflow.worker.streaming.harness.FanOutStreamingEngineWorkerHarnessTest > testOnNewWorkerMetadata_correctlyRemovesStaleWindmillServers FAILED
    java.lang.AssertionError at GrpcCleanupRule.java:201

Not sure if/how this could be related to my PR

classesInPackage("com.google.common.collect"), // Via gcs-connector ReadOptions builder
classesInPackage("java"),
classesInPackage("javax"),
classesInPackage("org.apache.beam.sdk"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,32 @@ public void testCreationWithGcsUtilProvided() {
assertSame(gcsUtil, pipelineOptions.getGcsUtil());
}

@Test
public void testCreationWithExplicitGoogleCloudStorageReadOptions() throws Exception {
GoogleCloudStorageReadOptions readOptions =
GoogleCloudStorageReadOptions.builder()
.setFadvise(GoogleCloudStorageReadOptions.Fadvise.AUTO)
.setSupportGzipEncoding(true)
.setFastFailOnNotFound(false)
.build();

GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
pipelineOptions.setGoogleCloudStorageReadOptions(readOptions);

GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
GoogleCloudStorage googleCloudStorageMock = Mockito.spy(GoogleCloudStorage.class);
Mockito.when(googleCloudStorageMock.open(Mockito.any(), Mockito.any()))
.thenReturn(Mockito.mock(SeekableByteChannel.class));
gcsUtil.setCloudStorageImpl(googleCloudStorageMock);

assertEquals(readOptions, pipelineOptions.getGoogleCloudStorageReadOptions());

// Assert read options are passed to GCS calls
pipelineOptions.getGcsUtil().open(GcsPath.fromUri("gs://bucket/path"));
Mockito.verify(googleCloudStorageMock, Mockito.times(1))
.open(StorageResourceId.fromStringPath("gs://bucket/path"), readOptions);
}

@Test
public void testMultipleThreadsCanCompleteOutOfOrderWithDefaultThreadPool() throws Exception {
GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
Expand Down Expand Up @@ -1630,7 +1656,8 @@ public static GcsUtilMock createMock(PipelineOptions options) {
: null,
gcsOptions.getEnableBucketWriteMetricCounter()
? gcsOptions.getGcsWriteCounterPrefix()
: null));
: null),
gcsOptions.getGoogleCloudStorageReadOptions());
}

private GcsUtilMock(
Expand All @@ -1641,7 +1668,8 @@ private GcsUtilMock(
Credentials credentials,
@Nullable Integer uploadBufferSizeBytes,
@Nullable Integer rewriteDataOpBatchLimit,
GcsCountersOptions gcsCountersOptions) {
GcsCountersOptions gcsCountersOptions,
GoogleCloudStorageReadOptions gcsReadOptions) {
super(
storageClient,
httpRequestInitializer,
Expand All @@ -1650,7 +1678,8 @@ private GcsUtilMock(
credentials,
uploadBufferSizeBytes,
rewriteDataOpBatchLimit,
gcsCountersOptions);
gcsCountersOptions,
gcsReadOptions);
}

@Override
Expand Down
Loading