Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-use serializable pipeline options when already available. #24192

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@ public class CoderTypeInformation<T> extends TypeInformation<T> implements Atomi
private final SerializablePipelineOptions pipelineOptions;

public CoderTypeInformation(Coder<T> coder, PipelineOptions pipelineOptions) {
this(coder, new SerializablePipelineOptions(pipelineOptions));
}

public CoderTypeInformation(Coder<T> coder, SerializablePipelineOptions pipelineOptions) {
checkNotNull(coder);
checkNotNull(pipelineOptions);
this.coder = coder;
this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions);
this.pipelineOptions = pipelineOptions;
}

public Coder<T> getCoder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,6 @@ public ByteBuffer getKey(WindowedValue<KV<K, V>> value) {

@Override
public TypeInformation<ByteBuffer> getProducedType() {
return new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of(), pipelineOptions.get());
return new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of(), pipelineOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,6 @@ public ByteBuffer getKey(WindowedValue<KV<KV<K, V>, Double>> value) {

@Override
public TypeInformation<ByteBuffer> getProducedType() {
return new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of(), pipelineOptions.get());
return new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of(), pipelineOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,6 @@ public ByteBuffer getKey(WindowedValue<KeyedWorkItem<K, V>> value) throws Except

@Override
public TypeInformation<ByteBuffer> getProducedType() {
return new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of(), pipelineOptions.get());
return new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of(), pipelineOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,7 @@ public void initializeState(FunctionInitializationContext context) throws Except
@SuppressWarnings("unchecked")
CoderTypeInformation<KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>>
typeInformation =
(CoderTypeInformation)
new CoderTypeInformation<>(checkpointCoder, serializedOptions.get());
(CoderTypeInformation) new CoderTypeInformation<>(checkpointCoder, serializedOptions);
stateForCheckpoint =
stateStore.getListState(
new ListStateDescriptor<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
Expand Down Expand Up @@ -102,14 +101,14 @@ public <T extends State> T state(
public <T2> ValueState<T2> bindValue(StateTag<ValueState<T2>> address, Coder<T2> coder) {

return new FlinkBroadcastValueState<>(
stateBackend, address, namespace, coder, pipelineOptions.get());
stateBackend, address, namespace, coder, pipelineOptions);
}

@Override
public <T2> BagState<T2> bindBag(StateTag<BagState<T2>> address, Coder<T2> elemCoder) {

return new FlinkBroadcastBagState<>(
stateBackend, address, namespace, elemCoder, pipelineOptions.get());
stateBackend, address, namespace, elemCoder, pipelineOptions);
}

@Override
Expand Down Expand Up @@ -142,7 +141,7 @@ CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {

return new FlinkCombiningState<>(
stateBackend, address, combineFn, namespace, accumCoder, pipelineOptions.get());
stateBackend, address, combineFn, namespace, accumCoder, pipelineOptions);
}

@Override
Expand Down Expand Up @@ -187,7 +186,7 @@ private abstract class AbstractBroadcastState<T> {
String name,
StateNamespace namespace,
Coder<T> coder,
PipelineOptions pipelineOptions) {
SerializablePipelineOptions pipelineOptions) {
this.name = name;

this.namespace = namespace;
Expand Down Expand Up @@ -303,7 +302,7 @@ private class FlinkBroadcastValueState<T> extends AbstractBroadcastState<T>
StateTag<ValueState<T>> address,
StateNamespace namespace,
Coder<T> coder,
PipelineOptions pipelineOptions) {
SerializablePipelineOptions pipelineOptions) {
super(flinkStateBackend, address.getId(), namespace, coder, pipelineOptions);

this.namespace = namespace;
Expand Down Expand Up @@ -363,7 +362,7 @@ private class FlinkBroadcastBagState<T> extends AbstractBroadcastState<List<T>>
StateTag<BagState<T>> address,
StateNamespace namespace,
Coder<T> coder,
PipelineOptions pipelineOptions) {
SerializablePipelineOptions pipelineOptions) {
super(flinkStateBackend, address.getId(), namespace, ListCoder.of(coder), pipelineOptions);

this.namespace = namespace;
Expand Down Expand Up @@ -451,7 +450,7 @@ private class FlinkCombiningState<InputT, AccumT, OutputT> extends AbstractBroad
Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder,
PipelineOptions pipelineOptions) {
SerializablePipelineOptions pipelineOptions) {
super(flinkStateBackend, address.getId(), namespace, accumCoder, pipelineOptions);

this.namespace = namespace;
Expand Down Expand Up @@ -568,7 +567,7 @@ private class FlinkCombiningStateWithContext<K2, InputT, AccumT, OutputT>
StateNamespace namespace,
Coder<AccumT> accumCoder,
CombineWithContext.Context context) {
super(flinkStateBackend, address.getId(), namespace, accumCoder, pipelineOptions.get());
super(flinkStateBackend, address.getId(), namespace, accumCoder, pipelineOptions);

this.namespace = namespace;
this.address = address;
Expand Down