Skip to content

Commit

Permalink
Merge pull request #24393: [Spark Dataset runner] Fix support for Jav…
Browse files Browse the repository at this point in the history
…a 11
  • Loading branch information
aromanenko-dev authored Nov 29, 2022
2 parents b9088ba + 36b8994 commit db54c84
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -169,7 +168,7 @@ default <T> void putDataset(PCollection<T> pCollection, Dataset<WindowedValue<T>
*/
private class TranslatingVisitor extends PTransformVisitor implements TranslationState {
private final Map<PCollection<?>, TranslationResult<?>> translationResults;
private final Map<Coder<?>, ExpressionEncoder<?>> encoders;
private final Map<Coder<?>, Encoder<?>> encoders;
private final SparkSession sparkSession;
private final SerializablePipelineOptions serializableOptions;
private final StorageLevel storageLevel;
Expand Down Expand Up @@ -209,7 +208,13 @@ <InT extends PInput, OutT extends POutput> void visit(

@Override
public <T> Encoder<T> encoderOf(Coder<T> coder, Factory<T> factory) {
return (Encoder<T>) encoders.computeIfAbsent(coder, (Factory) factory);
// computeIfAbsent fails with Java 11 on recursive factory
Encoder<T> enc = (Encoder<T>) encoders.get(coder);
if (enc == null) {
enc = factory.apply(coder);
encoders.put(coder, enc);
}
return enc;
}

private <T> TranslationResult<T> getResult(PCollection<T> pCollection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
Expand Down Expand Up @@ -107,7 +108,7 @@ public class EncoderHelpers {
Double.class);

// Default encoders by class
private static final Map<Class<?>, Encoder<?>> DEFAULT_ENCODERS = new HashMap<>();
private static final Map<Class<?>, Encoder<?>> DEFAULT_ENCODERS = new ConcurrentHashMap<>();

// Factory for default encoders by class
private static final Function<Class<?>, @Nullable Encoder<?>> ENCODER_FACTORY =
Expand Down

0 comments on commit db54c84

Please sign in to comment.