Make defaults for optional SchemaTransformProvider methods #30560

Merged (6 commits) on Mar 11, 2024
Changes from 4 commits
SchemaTransformProvider.java

@@ -17,6 +17,7 @@
 */
package org.apache.beam.sdk.schemas.transforms;

+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.beam.sdk.annotations.Internal;

@@ -58,10 +59,14 @@ default String description() {
  SchemaTransform from(Row configuration);

  /** Returns the input collection names of this transform. */
-  List<String> inputCollectionNames();
+  default List<String> inputCollectionNames() {
Member:

IntelliJ thinks these methods are unused. Is IntelliJ wrong, or could they be removed?

Contributor (author):

In reality, developers build their providers by extending TypedSchemaTransformProvider, which implements SchemaTransformProvider. I guess IntelliJ greys them out because they're not used directly from SchemaTransformProvider.

Member:
So if they override these methods but we never call them, does it matter?

Contributor (author):
Ahh, these methods are eventually called by the expansion service to create the discover response. Reference here:

schemaTransformConfigBuilder.addAllInputPcollectionNames(provider.inputCollectionNames());
schemaTransformConfigBuilder.addAllOutputPcollectionNames(provider.outputCollectionNames());

+    return Collections.emptyList();
+  }

  /** Returns the output collection names of this transform. */
-  List<String> outputCollectionNames();
+  default List<String> outputCollectionNames() {
+    return Collections.emptyList();
+  }

  /**
   * List the dependencies needed for this transform. Jars from classpath are used by default when
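For context, here is a minimal sketch (not part of this PR; all names are illustrative) of the authoring pattern described in the thread above. With the new defaults, a provider that takes no named inputs and produces no named outputs only has to supply identifier() and from():

  // Hypothetical provider relying on the new defaults; MyConfig and
  // MySchemaTransform are assumed to exist elsewhere.
  public class MySchemaTransformProvider
      extends TypedSchemaTransformProvider<MyConfig> {

    @Override
    public String identifier() {
      return "my:transform:v1"; // illustrative URN
    }

    @Override
    public SchemaTransform from(MyConfig config) {
      return new MySchemaTransform(config);
    }

    // No inputCollectionNames()/outputCollectionNames() overrides needed:
    // both now default to Collections.emptyList().
  }

The expansion service still reads whatever these methods return when building its discover response, so providers with named collections keep overriding them (see the Bigtable providers below).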
TypedSchemaTransformProvider.java

@@ -17,6 +17,9 @@
 */
package org.apache.beam.sdk.schemas.transforms;

+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.lang.reflect.ParameterizedType;
import java.util.List;
import java.util.Optional;
import org.apache.beam.sdk.annotations.Internal;
@@ -39,7 +42,16 @@
@Internal
public abstract class TypedSchemaTransformProvider<ConfigT> implements SchemaTransformProvider {

-  protected abstract Class<ConfigT> configurationClass();
+  @SuppressWarnings("unchecked")
+  protected Class<ConfigT> configurationClass() {
+    Optional<ParameterizedType> parameterizedType =
Member:

Since you just put it in an Optional and then take it out again, it might be simpler to go ahead like:

@Nullable ParameterizedType parameterizedType = (ParameterizedType) getClass().getGenericSuperclass();
checkStateNotNull(parameterizedType, "Could not ...");
return (Class<ConfigT>) parameterizedType.getActualTypeArguments()[0];

FWIW I am not sure if getActualTypeArguments()[0] could still be a type variable in some cases. You might want to check that it is a usefully defined type in some more ways... but also this is probably just always going to work, because of how these things are authored. Very nice!

Contributor (author):
Thanks, applied the suggestion.

+        Optional.ofNullable((ParameterizedType) getClass().getGenericSuperclass());
+    checkArgument(
+        parameterizedType.isPresent(),
+        "Could not get the TypedSchemaTransformProvider's parameterized type.");
+
+    return (Class<ConfigT>) parameterizedType.get().getActualTypeArguments()[0];
+  }

/**
* Produce a SchemaTransform from ConfigT. Can throw a {@link InvalidConfigurationException} or a
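For reference, a self-contained sketch (not from this PR; the helper name is hypothetical) of the stricter validation the reviewer alludes to above: getActualTypeArguments()[0] is only a concrete Class when the immediate subclass pins the type parameter. A generic subclass leaves a TypeVariable behind, and a raw subclass yields no ParameterizedType at all.

  import java.lang.reflect.ParameterizedType;
  import java.lang.reflect.Type;

  class ConfigClassResolver {
    // Resolves the ConfigT of a direct TypedSchemaTransformProvider subclass,
    // rejecting the cases where reflection cannot recover a concrete class.
    @SuppressWarnings("unchecked")
    static <ConfigT> Class<ConfigT> resolve(Class<?> providerClass) {
      Type superclass = providerClass.getGenericSuperclass();
      if (!(superclass instanceof ParameterizedType)) {
        // e.g. `class RawProvider extends TypedSchemaTransformProvider` (raw type)
        throw new IllegalStateException("No parameterized supertype on " + providerClass);
      }
      Type typeArg = ((ParameterizedType) superclass).getActualTypeArguments()[0];
      if (!(typeArg instanceof Class)) {
        // e.g. the TypeVariable T in `class Generic<T> extends TypedSchemaTransformProvider<T>`
        throw new IllegalStateException("Config type is not a concrete class: " + typeArg);
      }
      return (Class<ConfigT>) typeArg;
    }
  }

As the reviewer notes, direct subclasses with a concrete type argument (the way these providers are actually authored) always hit the happy path.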
TypedSchemaTransformProviderTest.java

@@ -94,6 +94,19 @@ public Optional<List<String>> dependencies(
}
}

+  private static class FakeMinimalTypedProvider
+      extends TypedSchemaTransformProvider<Configuration> {
+    @Override
+    public String identifier() {
+      return "fake:v1";
+    }
+
+    @Override
+    public SchemaTransform from(Configuration config) {
+      return new FakeSchemaTransform(config);
+    }
+  }

public static class FakeSchemaTransform extends SchemaTransform {

public Configuration config;
@@ -111,15 +124,22 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
@Test
public void testFrom() {
SchemaTransformProvider provider = new FakeTypedSchemaIOProvider();
+    SchemaTransformProvider minimalProvider = new FakeMinimalTypedProvider();

Row inputConfig =
Row.withSchema(provider.configurationSchema())
.withFieldValue("field1", "field1")
.withFieldValue("field2", Integer.valueOf(13))
.build();

Configuration outputConfig = ((FakeSchemaTransform) provider.from(inputConfig)).config;
assertEquals("field1", outputConfig.getField1());
assertEquals(13, outputConfig.getField2().intValue());
Configuration minimalOutputConfig =
((FakeSchemaTransform) minimalProvider.from(inputConfig)).config;

for (Configuration config : Arrays.asList(outputConfig, minimalOutputConfig)) {
assertEquals("field1", config.getField1());
assertEquals(13, config.getField2().intValue());
}
assertEquals("Description of fake provider", provider.description());
}

BigQueryStorageWriteApiSchemaTransformProvider.java

@@ -88,11 +88,6 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
// magic string that tells us to write to dynamic destinations
protected static final String DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS";

-  @Override
-  protected Class<BigQueryStorageWriteApiSchemaTransformConfiguration> configurationClass() {
-    return BigQueryStorageWriteApiSchemaTransformConfiguration.class;
-  }

@Override
protected SchemaTransform from(
BigQueryStorageWriteApiSchemaTransformConfiguration configuration) {
BigtableReadSchemaTransformProvider.java

@@ -70,11 +70,6 @@ public class BigtableReadSchemaTransformProvider
Schema.FieldType.array(Schema.FieldType.row(CELL_SCHEMA))))
.build();

-  @Override
-  protected Class<BigtableReadSchemaTransformConfiguration> configurationClass() {
-    return BigtableReadSchemaTransformConfiguration.class;
-  }

@Override
protected SchemaTransform from(BigtableReadSchemaTransformConfiguration configuration) {
return new BigtableReadSchemaTransform(configuration);
@@ -85,11 +80,6 @@ public String identifier() {
return "beam:schematransform:org.apache.beam:bigtable_read:v1";
}

-  @Override
-  public List<String> inputCollectionNames() {
-    return Collections.emptyList();
-  }

@Override
public List<String> outputCollectionNames() {
return Collections.singletonList(OUTPUT_TAG);
BigtableWriteSchemaTransformProvider.java

@@ -60,11 +60,6 @@ public class BigtableWriteSchemaTransformProvider

private static final String INPUT_TAG = "input";

-  @Override
-  protected Class<BigtableWriteSchemaTransformConfiguration> configurationClass() {
-    return BigtableWriteSchemaTransformConfiguration.class;
-  }

@Override
protected SchemaTransform from(BigtableWriteSchemaTransformConfiguration configuration) {
return new BigtableWriteSchemaTransform(configuration);
@@ -80,11 +75,6 @@ public List<String> inputCollectionNames() {
return Collections.singletonList(INPUT_TAG);
}

-  @Override
-  public List<String> outputCollectionNames() {
-    return Collections.emptyList();
-  }

/** Configuration for writing to Bigtable. */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
BigQueryDirectReadSchemaTransformProviderIT.java

@@ -74,7 +74,7 @@
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
-public class BigQueryDirectReadSchemaTransformProviderTest {
+public class BigQueryDirectReadSchemaTransformProviderIT {

private static PipelineOptions testOptions = TestPipeline.testingPipelineOptions();

BigQueryStorageWriteApiSchemaTransformProviderIT.java

@@ -61,7 +61,7 @@
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
-public class BigQueryStorageWriteApiSchemaTransformProviderTest {
Contributor:
This breaks the Dataflow V1 and V2 tests. Any reason for moving them from unit tests to integration tests?

Contributor:
CC: @ahmedabu98 @kennknowles @damondouglas

They use a fake BigQuery service, so they have to be executed locally. Either exclude them from the Dataflow test suites or move them back to unit tests?

Contributor (author):
Ahh, no reason, bad mistake. I'll open a PR to revert this part.

Contributor (author):
Thanks for catching this, please take a look at #30623.

+public class BigQueryStorageWriteApiSchemaTransformProviderIT {

private FakeDatasetService fakeDatasetService = new FakeDatasetService();
private FakeJobService fakeJobService = new FakeJobService();