Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AVRO] Prioritise Avro providers from "extensions/core" #25611

Merged
merged 1 commit into from
Feb 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
@Internal
@Experimental(Kind.SCHEMAS)
public final class Providers {

public interface Identifyable {
/**
* Returns an id that uniquely represents this among others implementing its derived interface.
Expand All @@ -44,16 +45,12 @@ public static <T extends Identifyable> Map<String, T> loadProviders(Class<T> kla
for (T provider : ServiceLoader.load(klass)) {
// Avro provider is treated as a special case since two Avro providers may want to be loaded -
// from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
// TODO: this check should be removed once once AvroPayloadSerializerProvider from "core" is
// TODO: we won't need this check once all Avro providers from "core" are
// removed
if (provider.identifier().equals("avro")) {
// Avro provider from "extensions/avro" must have a priority.
if (provider
.getClass()
.getName()
.equals(
"org.apache.beam.sdk.extensions.avro.schemas.io.payloads.AvroPayloadSerializerProvider")) {
// Use AvroPayloadSerializerProvider from extensions/avro by any case.
if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
// Load Avro provider from "extensions/avro" in any case.
providers.put(provider.identifier(), provider);
} else {
// Load Avro provider from "core" if it was not loaded from Avro extension before.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
*/
package org.apache.beam.sdk.extensions.schemaio.expansion;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;

import com.google.auto.service.AutoService;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -50,22 +53,57 @@ public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegi

@Override
public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
ImmutableMap.Builder builder = ImmutableMap.<String, ExternalTransformRegistrar>builder();
Map<String, ExternalTransformBuilder<?, ?, ?>> providers = new HashMap<>();
try {
for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
builder.put(
"beam:transform:org.apache.beam:schemaio_" + schemaIOProvider.identifier() + "_read:v1",
new ReaderBuilder(schemaIOProvider));
builder.put(
"beam:transform:org.apache.beam:schemaio_"
+ schemaIOProvider.identifier()
+ "_write:v1",
new WriterBuilder(schemaIOProvider));
for (SchemaIOProvider provider : ServiceLoader.load(SchemaIOProvider.class)) {
// Avro provider is treated as a special case since two Avro providers may want to be loaded
// from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
// TODO: we won't need this check once all Avro providers from "core" will be
// removed
if (provider.identifier().equals("avro")) {
// Avro provider from "extensions/avro" must have a priority.
if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
// Load Avro provider from "extensions/avro" by any case.
registerProvider(providers, provider);
} else {
// Load Avro provider from "core" if it was not loaded from Avro extension before.
registerProviderOptionally(providers, provider);
}
} else {
aromanenko-dev marked this conversation as resolved.
Show resolved Hide resolved
final String identifier =
"beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_read:v1";
checkState(
!providers.containsKey(identifier),
"Duplicate providers exist with identifier `%s` for class %s.",
identifier,
SchemaIOProvider.class);
registerProvider(providers, provider);
}
}
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
return builder.build();
return ImmutableMap.copyOf(providers);
}

/**
 * Registers a reader and a writer builder for {@code provider}, replacing any builders already
 * stored under the same URNs.
 *
 * @param providers mutable URN-to-builder map to update
 * @param provider SchemaIO provider whose identifier is embedded in the URNs
 */
private void registerProvider(
    Map<String, ExternalTransformBuilder<?, ?, ?>> providers, SchemaIOProvider provider) {
  final String readUrn =
      "beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_read:v1";
  providers.put(readUrn, new ReaderBuilder(provider));

  final String writeUrn =
      "beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_write:v1";
  providers.put(writeUrn, new WriterBuilder(provider));
}

/**
 * Registers a reader and a writer builder for {@code provider}, but only under URNs that have no
 * builder yet; existing entries are left untouched.
 *
 * @param providers mutable URN-to-builder map to update
 * @param provider SchemaIO provider whose identifier is embedded in the URNs
 */
private void registerProviderOptionally(
    Map<String, ExternalTransformBuilder<?, ?, ?>> providers, SchemaIOProvider provider) {
  final String readUrn =
      "beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_read:v1";
  providers.putIfAbsent(readUrn, new ReaderBuilder(provider));

  final String writeUrn =
      "beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_write:v1";
  providers.putIfAbsent(writeUrn, new WriterBuilder(provider));
}

public static class Configuration {
Expand Down