Skip to content

Commit

Permalink
Merge pull request #32705: fix schema inference for parameterized types
Browse files Browse the repository at this point in the history
  • Loading branch information
reuvenlax authored Oct 9, 2024
1 parent 20d0f6e commit c243491
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ public PipelineResult run(PipelineOptions options) {
/**
 * Returns the {@link CoderRegistry} that this {@link Pipeline} uses.
 *
 * <p>The registry is created on first access, using the registry returned by {@link
 * #getSchemaRegistry()}, and the same instance is returned on subsequent calls.
 */
public CoderRegistry getCoderRegistry() {
  if (coderRegistry == null) {
    // Hand the pipeline's schema registry to the coder registry at creation time.
    coderRegistry = CoderRegistry.createDefault(getSchemaRegistry());
  }
  return coderRegistry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.beam.sdk.io.fs.MetadataCoder;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.fs.ResourceIdCoder;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.CoderUtils;
Expand Down Expand Up @@ -195,11 +197,17 @@ public <T> Coder<T> coderFor(
* the lexicographically smallest {@link Class#getName() class name} being used.
* </ul>
*/
public static CoderRegistry createDefault(@Nullable SchemaRegistry schemaRegistry) {
  // A null schemaRegistry is allowed; it is passed through to the constructor unchanged.
  final CoderRegistry registry = new CoderRegistry(schemaRegistry);
  return registry;
}

/**
 * Backwards-compatible version of {@link #createDefault(SchemaRegistry)} that creates a {@link
 * CoderRegistry} without a {@link SchemaRegistry}.
 */
public static CoderRegistry createDefault() {
  // Pass null: no schema registry is associated with the returned registry.
  return new CoderRegistry(null);
}

/**
 * Creates a registry that keeps the given schema registry (possibly {@code null}) and starts with
 * the coder providers contained in {@code REGISTERED_CODER_FACTORIES}.
 *
 * @param schemaRegistry the schema registry to store, or {@code null} for none
 */
private CoderRegistry(@Nullable SchemaRegistry schemaRegistry) {
  this.schemaRegistry = schemaRegistry;
  // Copy the registered factories into this instance's own deque.
  coderProviders = new ArrayDeque<>(REGISTERED_CODER_FACTORIES);
}

Expand Down Expand Up @@ -590,6 +598,8 @@ private static boolean isNullOrEmpty(Collection<?> c) {
/** The list of {@link CoderProvider coder providers} to use to provide Coders. */
private ArrayDeque<CoderProvider> coderProviders;

/**
 * Optional schema registry that is asked for schema coders of type arguments when a coder for a
 * parameterized type is resolved; {@code null} when this registry was created without one.
 */
private final @Nullable SchemaRegistry schemaRegistry;

/**
* Returns a {@link Coder} to use for values of the given type, in a context where the given types
* use the given coders.
Expand Down Expand Up @@ -650,16 +660,28 @@ private Coder<?> getCoderFromParameterizedType(

List<Coder<?>> typeArgumentCoders = new ArrayList<>();
for (Type typeArgument : type.getActualTypeArguments()) {
  // Build the descriptor once; both the schema lookup and the coder lookup use it.
  TypeDescriptor<?> typeDescriptor = TypeDescriptor.of(typeArgument);

  // Prefer a schema coder for the type argument when a schema registry is available.
  Coder<?> typeArgumentCoder = null;
  if (schemaRegistry != null) {
    try {
      typeArgumentCoder = schemaRegistry.getSchemaCoder(typeDescriptor);
    } catch (NoSuchSchemaException ignored) {
      // No schema for this type argument; fall through to regular coder lookup below.
    }
  }

  if (typeArgumentCoder == null) {
    try {
      typeArgumentCoder = getCoderFromTypeDescriptor(typeDescriptor, typeCoderBindings);
    } catch (CannotProvideCoderException exc) {
      // Re-throw with the enclosing parameterized type in the message, keeping the cause.
      throw new CannotProvideCoderException(
          String.format(
              "Cannot provide coder for parameterized type %s: %s", type, exc.getMessage()),
          exc);
    }
  }
  typeArgumentCoders.add(typeArgumentCoder);
}
return getCoderFromFactories(TypeDescriptor.of(type), typeArgumentCoders);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.util.List;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBean;
import org.apache.beam.sdk.schemas.utils.TestPOJOs.SimplePOJO;
Expand Down Expand Up @@ -223,6 +227,22 @@ public void testRegisterPojo() throws NoSuchSchemaException {
assertTrue(SIMPLE_POJO_SCHEMA.equivalent(schema));
}

/**
 * Verifies that a coder registry created with a schema registry yields an {@link IterableCoder}
 * whose single element coder is a {@link SchemaCoder} carrying the registered POJO's schema.
 */
@Test
public void testSchemaTypeParameterInsideCoder() throws CannotProvideCoderException {
  SchemaRegistry registry = SchemaRegistry.createDefault();
  registry.registerPOJO(SimplePOJO.class);
  CoderRegistry coders = CoderRegistry.createDefault(registry);

  Coder<Iterable<SimplePOJO>> coder =
      coders.getCoder(TypeDescriptors.iterables(TypeDescriptor.of(SimplePOJO.class)));

  // The outer coder is the iterable coder with exactly one component coder.
  assertTrue(coder instanceof IterableCoder);
  assertEquals(1, coder.getCoderArguments().size());

  // The component coder is schema-based and its schema matches the expected POJO schema.
  Coder<?> elementCoder = coder.getCoderArguments().get(0);
  assertTrue(elementCoder instanceof SchemaCoder);
  assertTrue(SIMPLE_POJO_SCHEMA.equivalent(((SchemaCoder<?>) elementCoder).getSchema()));
}

@Test
public void testRegisterJavaBean() throws NoSuchSchemaException {
SchemaRegistry registry = SchemaRegistry.createDefault();
Expand Down

0 comments on commit c243491

Please sign in to comment.