diff --git a/CHANGES.md b/CHANGES.md index b6e9ac833c85..01b81bc5d6a6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -63,10 +63,7 @@ ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). -* [Managed Iceberg] Support creating tables if needed ([#32686](https://github.com/apache/beam/pull/32686)) -* [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688)) -* gcs-connector config options can be set via GcsOptions; otherwise will be automatically loaded from default Configuration (Java) - +* gcs-connector config options can be set via GcsOptions (Java) ## New Features / Improvements diff --git a/sdks/java/extensions/google-cloud-platform-core/build.gradle b/sdks/java/extensions/google-cloud-platform-core/build.gradle index aff8239cdb9f..493bb6b05018 100644 --- a/sdks/java/extensions/google-cloud-platform-core/build.gradle +++ b/sdks/java/extensions/google-cloud-platform-core/build.gradle @@ -59,9 +59,7 @@ dependencies { testImplementation library.java.mockito_core testRuntimeOnly library.java.slf4j_jdk14 provided library.java.bigdataoss_gcs_connector - provided library.java.hadoop_common testRuntimeOnly library.java.bigdataoss_gcs_connector - testImplementation library.java.hadoop_common } // Note that no runner is specified here, so tests running under this task should not be running diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index efeb1dce1b68..5f3a7a89e8e6 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -39,7 +39,6 @@ import com.google.api.services.storage.model.StorageObject; import com.google.auth.Credentials; import com.google.auto.value.AutoValue; -import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration; import com.google.cloud.hadoop.gcsio.CreateObjectOptions; import com.google.cloud.hadoop.gcsio.GoogleCloudStorage; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl; @@ -97,7 +96,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors; -import org.apache.hadoop.conf.Configuration; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.slf4j.Logger; @@ -129,51 +127,7 @@ public static class GcsReadOptionsFactory implements DefaultValueFactory { @Override public GoogleCloudStorageReadOptions create(PipelineOptions options) { - try { - // Check if gcs-connector-hadoop is loaded into classpath - Class.forName("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration"); - Configuration config = new Configuration(); - return GoogleCloudStorageReadOptions.builder() - .setFastFailOnNotFound( - GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_FAST_FAIL_ON_NOT_FOUND_ENABLE - .get(config, config::getBoolean)) - .setSupportGzipEncoding( - GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_SUPPORT_GZIP_ENCODING_ENABLE - .get(config, config::getBoolean)) - .setInplaceSeekLimit( - GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT.get( - config, config::getLong)) - .setFadvise( - GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_FADVISE.get( - config, config::getEnum)) - .setMinRangeRequestSize( - GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_MIN_RANGE_REQUEST_SIZE.get( - config, config::getInt)) - .setGrpcChecksumsEnabled( - GoogleHadoopFileSystemConfiguration.GCS_GRPC_CHECKSUMS_ENABLE.get( - config, config::getBoolean)) - .setGrpcReadTimeoutMillis( - GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_TIMEOUT_MS.get( - config, config::getLong)) - .setGrpcReadMessageTimeoutMillis( - GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_MESSAGE_TIMEOUT_MS.get( - config, config::getLong)) - .setGrpcReadMetadataTimeoutMillis( - GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_METADATA_TIMEOUT_MS.get( - config, config::getLong)) - .setGrpcReadZeroCopyEnabled( - GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_ZEROCOPY_ENABLE.get( - config, config::getBoolean)) - .setTraceLogEnabled( - GoogleHadoopFileSystemConfiguration.GCS_TRACE_LOG_ENABLE.get( - config, config::getBoolean)) - .setTraceLogTimeThreshold( - GoogleHadoopFileSystemConfiguration.GCS_TRACE_LOG_TIME_THRESHOLD_MS.get( - config, config::getLong)) - .build(); - } catch (ClassNotFoundException e) { - return GoogleCloudStorageReadOptions.DEFAULT; - } + return GoogleCloudStorageReadOptions.DEFAULT; } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java index 2a1c17f5602f..9415d566b8fd 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java @@ -56,7 +56,6 @@ public void testGcpCoreApiSurface() throws Exception { classesInPackage("com.google.auth"), classesInPackage("com.fasterxml.jackson.annotation"), classesInPackage("com.google.cloud.hadoop.gcsio"), - classesInPackage("com.google.common.collect"), // via GoogleCloudStorageReadOptions classesInPackage("java"), classesInPackage("javax"), classesInPackage("org.apache.beam.sdk"), diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java index d8fafc56cc7b..97082572ce41 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java @@ -112,7 +112,6 @@ import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; -import org.apache.hadoop.conf.Configuration; import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.Before; import org.junit.Rule; @@ -178,32 +177,6 @@ public void testCreationWithGcsUtilProvided() { assertSame(gcsUtil, pipelineOptions.getGcsUtil()); } - @Test - public void testCreationWithDefaultGoogleCloudStorageReadOptions() throws Exception { - Configuration.addDefaultResource("test-hadoop-conf.xml"); - GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); - - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - GoogleCloudStorage googleCloudStorageMock = Mockito.spy(GoogleCloudStorage.class); - Mockito.when(googleCloudStorageMock.open(Mockito.any(), Mockito.any())) - .thenReturn(Mockito.mock(SeekableByteChannel.class)); - gcsUtil.setCloudStorageImpl(googleCloudStorageMock); - - GoogleCloudStorageReadOptions expectedOptions = - GoogleCloudStorageReadOptions.builder() - .setFadvise(GoogleCloudStorageReadOptions.Fadvise.AUTO) - .setSupportGzipEncoding(true) - .setFastFailOnNotFound(false) - .build(); - - assertEquals(expectedOptions, pipelineOptions.getGoogleCloudStorageReadOptions()); - - // Assert read options are passed to GCS calls - pipelineOptions.getGcsUtil().open(GcsPath.fromUri("gs://bucket/path")); - Mockito.verify(googleCloudStorageMock, Mockito.times(1)) - .open(StorageResourceId.fromStringPath("gs://bucket/path"), expectedOptions); - } - @Test public void testCreationWithExplicitGoogleCloudStorageReadOptions() throws Exception { GoogleCloudStorageReadOptions readOptions = diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/resources/test-hadoop-conf.xml b/sdks/java/extensions/google-cloud-platform-core/src/test/resources/test-hadoop-conf.xml deleted file mode 100644 index 9bb55c8d678d..000000000000 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/resources/test-hadoop-conf.xml +++ /dev/null @@ -1,14 +0,0 @@ - - - fs.gs.inputstream.fast.fail.on.not.found.enable - false - - - fs.gs.inputstream.support.gzip.encoding.enable - true - - - fs.gs.inputstream.fadvise - AUTO - -