Skip to content

Commit

Permalink
Merge pull request #30252: Fail later when duplicate transform transl…
Browse files Browse the repository at this point in the history
…ators are on the classpath
  • Loading branch information
kennknowles authored Feb 8, 2024
2 parents a15dd7e + 9b4b509 commit 382c6dc
Show file tree
Hide file tree
Showing 8 changed files with 541 additions and 391 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.beam.runners.core.construction.ExternalTranslation.ExternalTranslator;
import org.apache.beam.runners.core.construction.ParDoTranslation.ParDoTranslator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptions;
Expand All @@ -58,7 +59,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSortedSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -399,48 +400,78 @@ public RunnerApi.PTransform translate(
}
}

/**
* Translates a set of registered transforms whose content only differs based by differences in
* their {@link FunctionSpec}s and URNs.
*/
private static class KnownTransformPayloadTranslator<T extends PTransform<?, ?>>
implements TransformTranslator<T> {
private static final Map<Class<? extends PTransform>, TransformPayloadTranslator>
KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
private static @MonotonicNonNull Map<Class<? extends PTransform>, TransformPayloadTranslator>
knownPayloadTranslators;

private static Map<Class<? extends PTransform>, TransformPayloadTranslator>
loadTransformPayloadTranslators() {
HashMap<Class<? extends PTransform>, TransformPayloadTranslator> translators =
new HashMap<>();
@Internal
public static Map<Class<? extends PTransform>, TransformPayloadTranslator>
getKnownPayloadTranslators() {
if (knownPayloadTranslators == null) {
knownPayloadTranslators = loadTransformPayloadTranslators();
}
return knownPayloadTranslators;
}

for (TransformPayloadTranslatorRegistrar registrar :
ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
private static Map<Class<? extends PTransform>, TransformPayloadTranslator>
loadTransformPayloadTranslators() {

Map<Class<? extends PTransform>, TransformPayloadTranslator> newTranslators =
(Map) registrar.getTransformPayloadTranslators();
HashMap<Class<? extends PTransform>, TransformPayloadTranslator> translators = new HashMap<>();

Set<Class<? extends PTransform>> alreadyRegistered =
Sets.intersection(translators.keySet(), newTranslators.keySet());
ImmutableSet.Builder<Class<? extends PTransform>> conflictingRegistrations =
new ImmutableSet.Builder<>();

if (!alreadyRegistered.isEmpty()) {
throw new IllegalArgumentException(
String.format(
"Classes already registered: %s", Joiner.on(", ").join(alreadyRegistered)));
}
for (TransformPayloadTranslatorRegistrar registrar :
ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {

translators.putAll(newTranslators);
Map<Class<? extends PTransform>, TransformPayloadTranslator> newTranslators =
(Map) registrar.getTransformPayloadTranslators();

for (Map.Entry<Class<? extends PTransform>, TransformPayloadTranslator> entry :
newTranslators.entrySet()) {
// spotbugs enforces using entrySet() and then getKey() for micro-optimization
Class<? extends PTransform> ptransformClass = entry.getKey();
TransformPayloadTranslator newTranslator = entry.getValue();
@Nullable TransformPayloadTranslator existingTranslator = translators.get(ptransformClass);

if (existingTranslator == null) {
translators.put(ptransformClass, newTranslator);
} else {
LOG.error(
"Conflicting registrations for {}: {} and {}",
ptransformClass,
existingTranslator,
newTranslator);
conflictingRegistrations.add(ptransformClass);
}
}
return ImmutableMap.copyOf(translators);
}
Set<Class<? extends PTransform>> conflictingRegistrationSet = conflictingRegistrations.build();

if (!conflictingRegistrationSet.isEmpty()) {
throw new IllegalArgumentException(
String.format(
"Conflicting registrations for: %s",
Joiner.on(", ").join(conflictingRegistrationSet)));
}

return ImmutableMap.copyOf(translators);
}

/**
* Translates a set of registered transforms whose content only differs based by differences in
* their {@link FunctionSpec}s and URNs.
*/
private static class KnownTransformPayloadTranslator<T extends PTransform<?, ?>>
implements TransformTranslator<T> {

@Override
public boolean canTranslate(PTransform pTransform) {
return KNOWN_PAYLOAD_TRANSLATORS.containsKey(pTransform.getClass());
return getKnownPayloadTranslators().containsKey(pTransform.getClass());
}

@Override
public String getUrn(PTransform transform) {
return KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass()).getUrn(transform);
return getKnownPayloadTranslators().get(transform.getClass()).getUrn(transform);
}

@Override
Expand All @@ -453,7 +484,7 @@ public RunnerApi.PTransform translate(
translateAppliedPTransform(appliedPTransform, subtransforms, components);

TransformPayloadTranslator payloadTranslator =
KNOWN_PAYLOAD_TRANSLATORS.get(appliedPTransform.getTransform().getClass());
getKnownPayloadTranslators().get(appliedPTransform.getTransform().getClass());
FunctionSpec spec = payloadTranslator.translate(appliedPTransform, components);
if (spec != null) {
transformBuilder.setSpec(spec);
Expand Down Expand Up @@ -578,7 +609,6 @@ default String getUrn(T transform) {
return getUrn();
}

/** */
/**
* Translates the given transform represented by the provided {@code AppliedPTransform} to a
* {@code FunctionSpec} with a URN and a payload.
Expand Down
Loading

0 comments on commit 382c6dc

Please sign in to comment.