From d22a7e783abf445ae7bef2ce26075f9d93b409a7 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> Date: Mon, 11 Mar 2024 17:26:54 -0400 Subject: [PATCH] Make defaults for optional SchemaTransformProvider methods (#30560) * simplify schematransformprovider --- ...am_PostCommit_Python_Xlang_Gcp_Direct.json | 1 + .../transforms/SchemaTransformProvider.java | 9 +++++-- .../TypedSchemaTransformProvider.java | 20 +++++++++++++++- .../TypedSchemaTransformProviderTest.java | 24 +++++++++++++++++-- ...torageWriteApiSchemaTransformProvider.java | 5 ---- .../BigtableReadSchemaTransformProvider.java | 10 -------- .../BigtableWriteSchemaTransformProvider.java | 10 -------- ...yDirectReadSchemaTransformProviderIT.java} | 2 +- ...ageWriteApiSchemaTransformProviderIT.java} | 2 +- 9 files changed, 51 insertions(+), 32 deletions(-) rename sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/{BigQueryDirectReadSchemaTransformProviderTest.java => BigQueryDirectReadSchemaTransformProviderIT.java} (99%) rename sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/{BigQueryStorageWriteApiSchemaTransformProviderTest.java => BigQueryStorageWriteApiSchemaTransformProviderIT.java} (99%) diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json index e69de29bb2d1..8b137891791f 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json @@ -0,0 +1 @@ + diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java index c76d7a25e69b..9d0ad61b7a6c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.schemas.transforms; +import java.util.Collections; import java.util.List; import java.util.Optional; import org.apache.beam.sdk.annotations.Internal; @@ -58,10 +59,14 @@ default String description() { SchemaTransform from(Row configuration); /** Returns the input collection names of this transform. */ - List inputCollectionNames(); + default List inputCollectionNames() { + return Collections.emptyList(); + } /** Returns the output collection names of this transform. */ - List outputCollectionNames(); + default List outputCollectionNames() { + return Collections.emptyList(); + } /** * List the dependencies needed for this transform. Jars from classpath are used by default when diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java index 1117f59a7481..e75fa27d2d16 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java @@ -17,8 +17,13 @@ */ package org.apache.beam.sdk.schemas.transforms; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import java.lang.reflect.ParameterizedType; import java.util.List; import java.util.Optional; +import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.NoSuchSchemaException; @@ -39,7 +44,20 @@ @Internal public abstract class TypedSchemaTransformProvider implements SchemaTransformProvider { - protected abstract Class configurationClass(); + @SuppressWarnings("unchecked") + protected Class configurationClass() { + @Nullable + ParameterizedType parameterizedType = (ParameterizedType) getClass().getGenericSuperclass(); + checkStateNotNull( + parameterizedType, "Could not get the TypedSchemaTransformProvider's parameterized type."); + checkArgument( + parameterizedType.getActualTypeArguments().length == 1, + String.format( + "Expected one parameterized type, but got %s.", + parameterizedType.getActualTypeArguments().length)); + + return (Class) parameterizedType.getActualTypeArguments()[0]; + } /** * Produce a SchemaTransform from ConfigT. Can throw a {@link InvalidConfigurationException} or a diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java index 2b698f4f67bb..6b5ccbff4e42 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java @@ -94,6 +94,19 @@ public Optional> dependencies( } } + private static class FakeMinimalTypedProvider + extends TypedSchemaTransformProvider { + @Override + public String identifier() { + return "fake:v1"; + } + + @Override + public SchemaTransform from(Configuration config) { + return new FakeSchemaTransform(config); + } + } + public static class FakeSchemaTransform extends SchemaTransform { public Configuration config; @@ -111,6 +124,8 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { @Test public void testFrom() { SchemaTransformProvider provider = new FakeTypedSchemaIOProvider(); + SchemaTransformProvider minimalProvider = new FakeMinimalTypedProvider(); + Row inputConfig = Row.withSchema(provider.configurationSchema()) .withFieldValue("field1", "field1") @@ -118,8 +133,13 @@ public void testFrom() { .build(); Configuration outputConfig = ((FakeSchemaTransform) provider.from(inputConfig)).config; - assertEquals("field1", outputConfig.getField1()); - assertEquals(13, outputConfig.getField2().intValue()); + Configuration minimalOutputConfig = + ((FakeSchemaTransform) minimalProvider.from(inputConfig)).config; + + for (Configuration config : Arrays.asList(outputConfig, minimalOutputConfig)) { + assertEquals("field1", config.getField1()); + assertEquals(13, config.getField2().intValue()); + } assertEquals("Description of fake provider", provider.description()); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java index ca0d0a2784bd..980d783ec43c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -88,11 +88,6 @@ public class BigQueryStorageWriteApiSchemaTransformProvider // magic string that tells us to write to dynamic destinations protected static final String DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS"; - @Override - protected Class configurationClass() { - return BigQueryStorageWriteApiSchemaTransformConfiguration.class; - } - @Override protected SchemaTransform from( BigQueryStorageWriteApiSchemaTransformConfiguration configuration) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java index a9f113f1ce97..f48a23559141 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java @@ -70,11 +70,6 @@ public class BigtableReadSchemaTransformProvider Schema.FieldType.array(Schema.FieldType.row(CELL_SCHEMA)))) .build(); - @Override - protected Class configurationClass() { - return BigtableReadSchemaTransformConfiguration.class; - } - @Override protected SchemaTransform from(BigtableReadSchemaTransformConfiguration configuration) { return new BigtableReadSchemaTransform(configuration); @@ -85,11 +80,6 @@ public String identifier() { return "beam:schematransform:org.apache.beam:bigtable_read:v1"; } - @Override - public List inputCollectionNames() { - return Collections.emptyList(); - } - @Override public List outputCollectionNames() { return Collections.singletonList(OUTPUT_TAG); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java index b99b69621a84..cc480be6aa7e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java @@ -60,11 +60,6 @@ public class BigtableWriteSchemaTransformProvider private static final String INPUT_TAG = "input"; - @Override - protected Class configurationClass() { - return BigtableWriteSchemaTransformConfiguration.class; - } - @Override protected SchemaTransform from(BigtableWriteSchemaTransformConfiguration configuration) { return new BigtableWriteSchemaTransform(configuration); @@ -80,11 +75,6 @@ public List inputCollectionNames() { return Collections.singletonList(INPUT_TAG); } - @Override - public List outputCollectionNames() { - return Collections.emptyList(); - } - /** Configuration for writing to Bigtable. */ @DefaultSchema(AutoValueSchema.class) @AutoValue diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderIT.java similarity index 99% rename from sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderTest.java rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderIT.java index 2363a870bbd7..958409eb5e3c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderIT.java @@ -74,7 +74,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class BigQueryDirectReadSchemaTransformProviderTest { +public class BigQueryDirectReadSchemaTransformProviderIT { private static PipelineOptions testOptions = TestPipeline.testingPipelineOptions(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderIT.java similarity index 99% rename from sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderIT.java index 64ea0b11d1b9..8b8a3b759497 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderIT.java @@ -61,7 +61,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class BigQueryStorageWriteApiSchemaTransformProviderTest { +public class BigQueryStorageWriteApiSchemaTransformProviderIT { private FakeDatasetService fakeDatasetService = new FakeDatasetService(); private FakeJobService fakeJobService = new FakeJobService();