diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java index 8406dce46ab7..b449db8d0e21 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.flink.translation.wrappers.streaming; import static org.apache.flink.util.Preconditions.checkState; @@ -25,6 +42,7 @@ import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors; import org.apache.beam.runners.fnexecution.control.SdkHarnessClient; import org.apache.beam.runners.fnexecution.data.RemoteInputDestination; +import org.apache.beam.runners.fnexecution.state.StateRequestHandler; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.fn.data.FnDataReceiver; import org.apache.beam.sdk.options.PipelineOptions; @@ -38,7 +56,6 @@ import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.Instant; - /** * ExecutableStageDoFnOperator. * SDK harness interaction code adopted from @@ -107,7 +124,6 @@ public void open() throws Exception { String id = new BigInteger(32, ThreadLocalRandom.current()).toString(36); processBundleDescriptor = ProcessBundleDescriptors.fromExecutableStage( id, stage, components, dataEndpoint, stateEndpoint); - // TODO: we need to wire in a StateRequestHandler when creating the bundle processor below logger.info(String.format("Process bundle descriptor: %s", processBundleDescriptor)); } @@ -125,7 +141,9 @@ private void processElementWithSdkHarness(WindowedValue element) throws (RemoteInputDestination) processBundleDescriptor.getRemoteInputDestination(); SdkHarnessClient.BundleProcessor processor = client.getProcessor( - processBundleDescriptor.getProcessBundleDescriptor(), destination); + processBundleDescriptor.getProcessBundleDescriptor(), + destination, + session.getStateDelegator()); processor.getRegistrationFuture().toCompletableFuture().get(); Map>> outputCoders = processBundleDescriptor.getOutputTargetCoders(); @@ -168,7 +186,12 @@ public void accept(WindowedValue input) throws Exception { SdkHarnessClient.RemoteOutputReceiver> receiverMap = receiverBuilder.build(); - try (SdkHarnessClient.ActiveBundle bundle = processor.newBundle(receiverMap)) { + // TODO: wire with side input state + StateRequestHandler stateRequestHandler = + new FlinkStreamingStateRequestHandler(payload, components, getRuntimeContext()); + + try (SdkHarnessClient.ActiveBundle bundle = + processor.newBundle(receiverMap, stateRequestHandler)) { FnDataReceiver> inputReceiver = bundle.getInputReceiver(); logger.finer(String.format("Sending value: %s", element)); inputReceiver.accept(element); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStreamingStateRequestHandler.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStreamingStateRequestHandler.java new file mode 100644 index 000000000000..87151f65943c --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStreamingStateRequestHandler.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming; + +import java.util.concurrent.CompletionStage; +import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.fnexecution.state.StateRequestHandler; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.runtime.state.OperatorStateBackend; + +/** + * {@link StateRequestHandler} that uses a Flink {@link OperatorStateBackend} to manage state. + */ +class FlinkStreamingStateRequestHandler implements StateRequestHandler { + + private final RunnerApi.ExecutableStagePayload payload; + private final RunnerApi.Components components; + private final RuntimeContext runtimeContext; + + public FlinkStreamingStateRequestHandler( + RunnerApi.ExecutableStagePayload payload, + RunnerApi.Components components, RuntimeContext runtimeContext) { + this.payload = payload; + this.components = components; + this.runtimeContext = runtimeContext; + } + + @Override + public CompletionStage handle( + BeamFnApi.StateRequest request) throws Exception { + throw new UnsupportedOperationException(); + } + +}