Skip to content

Commit

Permalink
Make defaults for optional SchemaTransformProvider methods (#30560)
Browse files Browse the repository at this point in the history
* simplify schematransformprovider
  • Loading branch information
ahmedabu98 authored Mar 11, 2024
1 parent fedca3c commit d22a7e7
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.schemas.transforms;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.beam.sdk.annotations.Internal;
Expand Down Expand Up @@ -58,10 +59,14 @@ default String description() {
SchemaTransform from(Row configuration);

/** Returns the input collection names of this transform. */
List<String> inputCollectionNames();
// Default: no named input collections (suits source-style transforms that take no tagged input).
default List<String> inputCollectionNames() {
return Collections.emptyList();
}

/** Returns the output collection names of this transform. */
List<String> outputCollectionNames();
// Default: no named output collections (suits sink-style transforms that emit no tagged output).
default List<String> outputCollectionNames() {
return Collections.emptyList();
}

/**
* List the dependencies needed for this transform. Jars from classpath are used by default when
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@
*/
package org.apache.beam.sdk.schemas.transforms;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import java.lang.reflect.ParameterizedType;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
Expand All @@ -39,7 +44,20 @@
@Internal
public abstract class TypedSchemaTransformProvider<ConfigT> implements SchemaTransformProvider {

protected abstract Class<ConfigT> configurationClass();
// Default implementation: infers ConfigT by reflecting on the provider's generic superclass,
// so concrete subclasses of TypedSchemaTransformProvider<ConfigT> no longer need to override
// this method. Subclasses may still override to supply the class explicitly.
//
// NOTE(review): getGenericSuperclass() is cast to ParameterizedType BEFORE the null/state
// check runs; if a subclass extends this type rawly (or through an intermediate non-generic
// class), the cast throws ClassCastException rather than reaching checkStateNotNull — confirm
// this failure mode is acceptable.
// NOTE(review): this inspects only the DIRECT superclass; providers that are not immediate
// subclasses of TypedSchemaTransformProvider presumably must override this method — verify.
@SuppressWarnings("unchecked")
protected Class<ConfigT> configurationClass() {
@Nullable
ParameterizedType parameterizedType = (ParameterizedType) getClass().getGenericSuperclass();
checkStateNotNull(
parameterizedType, "Could not get the TypedSchemaTransformProvider's parameterized type.");
// Exactly one type argument is expected: the ConfigT of TypedSchemaTransformProvider<ConfigT>.
checkArgument(
parameterizedType.getActualTypeArguments().length == 1,
String.format(
"Expected one parameterized type, but got %s.",
parameterizedType.getActualTypeArguments().length));

// Unchecked but guarded by the arity check above; the single argument is ConfigT by contract.
return (Class<ConfigT>) parameterizedType.getActualTypeArguments()[0];
}

/**
* Produce a SchemaTransform from ConfigT. Can throw a {@link InvalidConfigurationException} or a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,19 @@ public Optional<List<String>> dependencies(
}
}

// Minimal provider overriding only the two abstract members (identifier/from). It exists to
// exercise the new default configurationClass() reflection path: ConfigT (= Configuration)
// must be inferred from the generic superclass rather than supplied by an override.
private static class FakeMinimalTypedProvider
extends TypedSchemaTransformProvider<Configuration> {
@Override
public String identifier() {
return "fake:v1";
}

@Override
public SchemaTransform from(Configuration config) {
return new FakeSchemaTransform(config);
}
}

public static class FakeSchemaTransform extends SchemaTransform {

public Configuration config;
Expand All @@ -111,15 +124,22 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
@Test
public void testFrom() {
// Full provider (explicit overrides) vs. minimal provider (relies on defaults): both must
// materialize the same Configuration from the same input Row.
SchemaTransformProvider provider = new FakeTypedSchemaIOProvider();
SchemaTransformProvider minimalProvider = new FakeMinimalTypedProvider();

Row inputConfig =
Row.withSchema(provider.configurationSchema())
.withFieldValue("field1", "field1")
.withFieldValue("field2", Integer.valueOf(13))
.build();

Configuration outputConfig = ((FakeSchemaTransform) provider.from(inputConfig)).config;
assertEquals("field1", outputConfig.getField1());
assertEquals(13, outputConfig.getField2().intValue());
Configuration minimalOutputConfig =
((FakeSchemaTransform) minimalProvider.from(inputConfig)).config;

// Both providers must decode the Row into an identical Configuration.
for (Configuration config : Arrays.asList(outputConfig, minimalOutputConfig)) {
assertEquals("field1", config.getField1());
assertEquals(13, config.getField2().intValue());
}
assertEquals("Description of fake provider", provider.description());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,6 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
// magic string that tells us to write to dynamic destinations
protected static final String DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS";

@Override
protected Class<BigQueryStorageWriteApiSchemaTransformConfiguration> configurationClass() {
return BigQueryStorageWriteApiSchemaTransformConfiguration.class;
}

@Override
protected SchemaTransform from(
BigQueryStorageWriteApiSchemaTransformConfiguration configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,6 @@ public class BigtableReadSchemaTransformProvider
Schema.FieldType.array(Schema.FieldType.row(CELL_SCHEMA))))
.build();

@Override
protected Class<BigtableReadSchemaTransformConfiguration> configurationClass() {
return BigtableReadSchemaTransformConfiguration.class;
}

@Override
protected SchemaTransform from(BigtableReadSchemaTransformConfiguration configuration) {
return new BigtableReadSchemaTransform(configuration);
Expand All @@ -85,11 +80,6 @@ public String identifier() {
return "beam:schematransform:org.apache.beam:bigtable_read:v1";
}

@Override
public List<String> inputCollectionNames() {
return Collections.emptyList();
}

@Override
public List<String> outputCollectionNames() {
return Collections.singletonList(OUTPUT_TAG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,6 @@ public class BigtableWriteSchemaTransformProvider

private static final String INPUT_TAG = "input";

@Override
protected Class<BigtableWriteSchemaTransformConfiguration> configurationClass() {
return BigtableWriteSchemaTransformConfiguration.class;
}

@Override
protected SchemaTransform from(BigtableWriteSchemaTransformConfiguration configuration) {
return new BigtableWriteSchemaTransform(configuration);
Expand All @@ -80,11 +75,6 @@ public List<String> inputCollectionNames() {
return Collections.singletonList(INPUT_TAG);
}

@Override
public List<String> outputCollectionNames() {
return Collections.emptyList();
}

/** Configuration for writing to Bigtable. */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class BigQueryDirectReadSchemaTransformProviderTest {
public class BigQueryDirectReadSchemaTransformProviderIT {

private static PipelineOptions testOptions = TestPipeline.testingPipelineOptions();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class BigQueryStorageWriteApiSchemaTransformProviderTest {
public class BigQueryStorageWriteApiSchemaTransformProviderIT {

private FakeDatasetService fakeDatasetService = new FakeDatasetService();
private FakeJobService fakeJobService = new FakeJobService();
Expand Down

0 comments on commit d22a7e7

Please sign in to comment.