Skip to content

Commit

Permalink
Remove Hadoop dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
clairemcginty committed Nov 25, 2024
1 parent ed811a3 commit 661a826
Show file tree
Hide file tree
Showing 6 changed files with 2 additions and 95 deletions.
5 changes: 1 addition & 4 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,7 @@
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [Managed Iceberg] Support creating tables if needed ([#32686](https://github.com/apache/beam/pull/32686))
* [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688))
* gcs-connector config options can be set via GcsOptions; otherwise will be automatically loaded from default Configuration (Java)

* gcs-connector config options can be set via GcsOptions (Java)

## New Features / Improvements

Expand Down
2 changes: 0 additions & 2 deletions sdks/java/extensions/google-cloud-platform-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ dependencies {
testImplementation library.java.mockito_core
testRuntimeOnly library.java.slf4j_jdk14
provided library.java.bigdataoss_gcs_connector
provided library.java.hadoop_common
testRuntimeOnly library.java.bigdataoss_gcs_connector
testImplementation library.java.hadoop_common
}

// Note that no runner is specified here, so tests running under this task should not be running
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import com.google.api.services.storage.model.StorageObject;
import com.google.auth.Credentials;
import com.google.auto.value.AutoValue;
import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration;
import com.google.cloud.hadoop.gcsio.CreateObjectOptions;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorage;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl;
Expand Down Expand Up @@ -97,7 +96,6 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.conf.Configuration;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.slf4j.Logger;
Expand Down Expand Up @@ -129,51 +127,7 @@ public static class GcsReadOptionsFactory
implements DefaultValueFactory<GoogleCloudStorageReadOptions> {
/**
 * Produces the default {@link GoogleCloudStorageReadOptions} value for a pipeline.
 *
 * <p>The try block checks by name whether the gcs-connector Hadoop configuration class can be
 * loaded. If it can, every read option is populated from a freshly constructed Hadoop {@code
 * Configuration} through the matching {@code GoogleHadoopFileSystemConfiguration} key. If the
 * class is missing, {@link GoogleCloudStorageReadOptions#DEFAULT} is returned instead.
 *
 * <p>NOTE(review): both the try and the catch branch return, so the trailing {@code return} is
 * unreachable as written. This looks like the old (try/catch) and new (single return) sides of
 * the diff rendered together -- confirm against the actual file before relying on either form.
 *
 * @param options the pipeline options being defaulted; not read by this method
 * @return the read options built from the Hadoop configuration, or the library default
 */
@Override
public GoogleCloudStorageReadOptions create(PipelineOptions options) {
try {
// Check if gcs-connector-hadoop is loaded into classpath
Class.forName("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration");
// Each setter below reads one connector setting from this Configuration instance.
Configuration config = new Configuration();
return GoogleCloudStorageReadOptions.builder()
.setFastFailOnNotFound(
GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_FAST_FAIL_ON_NOT_FOUND_ENABLE
.get(config, config::getBoolean))
.setSupportGzipEncoding(
GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_SUPPORT_GZIP_ENCODING_ENABLE
.get(config, config::getBoolean))
.setInplaceSeekLimit(
GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT.get(
config, config::getLong))
.setFadvise(
GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_FADVISE.get(
config, config::getEnum))
.setMinRangeRequestSize(
GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_MIN_RANGE_REQUEST_SIZE.get(
config, config::getInt))
.setGrpcChecksumsEnabled(
GoogleHadoopFileSystemConfiguration.GCS_GRPC_CHECKSUMS_ENABLE.get(
config, config::getBoolean))
.setGrpcReadTimeoutMillis(
GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_TIMEOUT_MS.get(
config, config::getLong))
.setGrpcReadMessageTimeoutMillis(
GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_MESSAGE_TIMEOUT_MS.get(
config, config::getLong))
.setGrpcReadMetadataTimeoutMillis(
GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_METADATA_TIMEOUT_MS.get(
config, config::getLong))
.setGrpcReadZeroCopyEnabled(
GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_ZEROCOPY_ENABLE.get(
config, config::getBoolean))
.setTraceLogEnabled(
GoogleHadoopFileSystemConfiguration.GCS_TRACE_LOG_ENABLE.get(
config, config::getBoolean))
.setTraceLogTimeThreshold(
GoogleHadoopFileSystemConfiguration.GCS_TRACE_LOG_TIME_THRESHOLD_MS.get(
config, config::getLong))
.build();
} catch (ClassNotFoundException e) {
// The connector's Hadoop configuration class is not on the classpath: use library defaults.
return GoogleCloudStorageReadOptions.DEFAULT;
}
// Hadoop-free form: always use the library defaults.
return GoogleCloudStorageReadOptions.DEFAULT;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public void testGcpCoreApiSurface() throws Exception {
classesInPackage("com.google.auth"),
classesInPackage("com.fasterxml.jackson.annotation"),
classesInPackage("com.google.cloud.hadoop.gcsio"),
classesInPackage("com.google.common.collect"), // via GoogleCloudStorageReadOptions
classesInPackage("java"),
classesInPackage("javax"),
classesInPackage("org.apache.beam.sdk"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -178,32 +177,6 @@ public void testCreationWithGcsUtilProvided() {
assertSame(gcsUtil, pipelineOptions.getGcsUtil());
}

/**
 * Checks that, when no read options are set explicitly, {@code GcsOptions} yields the expected
 * default {@link GoogleCloudStorageReadOptions} and that {@code GcsUtil#open} passes those
 * options on to the underlying {@link GoogleCloudStorage}.
 *
 * <p>NOTE(review): the expected values (fadvise AUTO, gzip support on, fast-fail off) presumably
 * mirror the contents of {@code test-hadoop-conf.xml}; that resource is not visible here --
 * confirm.
 */
@Test
public void testCreationWithDefaultGoogleCloudStorageReadOptions() throws Exception {
// Register the test resource as a Hadoop default before the options are created.
Configuration.addDefaultResource("test-hadoop-conf.xml");
GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);

// Swap the storage client for a spy whose open(...) returns a mock channel.
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
GoogleCloudStorage googleCloudStorageMock = Mockito.spy(GoogleCloudStorage.class);
Mockito.when(googleCloudStorageMock.open(Mockito.any(), Mockito.any()))
.thenReturn(Mockito.mock(SeekableByteChannel.class));
gcsUtil.setCloudStorageImpl(googleCloudStorageMock);

// Only these three settings are overridden; all other builder fields keep their defaults.
GoogleCloudStorageReadOptions expectedOptions =
GoogleCloudStorageReadOptions.builder()
.setFadvise(GoogleCloudStorageReadOptions.Fadvise.AUTO)
.setSupportGzipEncoding(true)
.setFastFailOnNotFound(false)
.build();

assertEquals(expectedOptions, pipelineOptions.getGoogleCloudStorageReadOptions());

// Assert read options are passed to GCS calls
pipelineOptions.getGcsUtil().open(GcsPath.fromUri("gs://bucket/path"));
Mockito.verify(googleCloudStorageMock, Mockito.times(1))
.open(StorageResourceId.fromStringPath("gs://bucket/path"), expectedOptions);
}

@Test
public void testCreationWithExplicitGoogleCloudStorageReadOptions() throws Exception {
GoogleCloudStorageReadOptions readOptions =
Expand Down

This file was deleted.

0 comments on commit 661a826

Please sign in to comment.