diff --git a/gradle.properties b/gradle.properties index a5c5195bb377..953086f0c5f8 100644 --- a/gradle.properties +++ b/gradle.properties @@ -32,5 +32,5 @@ javaVersion=1.8 docker_image_default_repo_root=apache docker_image_default_repo_prefix=beam_ -flink_versions=1.10,1.11,1.12 +flink_versions=1.10,1.11,1.12,1.13 diff --git a/runners/flink/1.13/build.gradle b/runners/flink/1.13/build.gradle new file mode 100644 index 000000000000..3fce89bc470e --- /dev/null +++ b/runners/flink/1.13/build.gradle @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '..' +/* All properties required for loading the Flink build script */ +project.ext { + // Set the version of all Flink-related dependencies here. + flink_version = '1.13.0' + // Version specific code overrides. + main_source_overrides = ["${basePath}/1.10/src/main/java", "${basePath}/1.11/src/main/java", "${basePath}/1.12/src/main/java", './src/main/java'] + test_source_overrides = ["${basePath}/1.10/src/test/java", "${basePath}/1.11/src/test/java", "${basePath}/1.12/src/main/java", './src/test/java'] + main_resources_overrides = [] + test_resources_overrides = [] + archives_base_name = 'beam-runners-flink-1.13' +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_runner.gradle" diff --git a/runners/flink/1.13/job-server-container/build.gradle b/runners/flink/1.13/job-server-container/build.gradle new file mode 100644 index 000000000000..afdb68a0fc91 --- /dev/null +++ b/runners/flink/1.13/job-server-container/build.gradle @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '../../job-server-container' + +project.ext { + resource_path = basePath +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_job_server_container.gradle" diff --git a/runners/flink/1.13/job-server/build.gradle b/runners/flink/1.13/job-server/build.gradle new file mode 100644 index 000000000000..a7e6fd6eb599 --- /dev/null +++ b/runners/flink/1.13/job-server/build.gradle @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '../../job-server' + +project.ext { + // Look for the source code in the parent module + main_source_dirs = ["$basePath/src/main/java"] + test_source_dirs = ["$basePath/src/test/java"] + main_resources_dirs = ["$basePath/src/main/resources"] + test_resources_dirs = ["$basePath/src/test/resources"] + archives_base_name = 'beam-runners-flink-1.13-job-server' +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_job_server.gradle" diff --git a/runners/flink/1.13/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java b/runners/flink/1.13/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java new file mode 100644 index 000000000000..2707395b194c --- /dev/null +++ b/runners/flink/1.13/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.streaming; + +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; +import org.apache.flink.streaming.runtime.tasks.OperatorChain; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +/** + * {@link StreamSource} utilities, that bridge incompatibilities between Flink releases. + * + *

This change is becauses RecordWriter is wrapped in RecordWriterDelegate in 1.10, please refer + * to https://github.com/apache/flink/commit/2c8b4ef572f05bf4740b7e204af1e5e709cd945c for more + * details. + */ +public class StreamSources { + + /** + * Backward compatibility helper for {@link OneInputTransformation} `getInput` method, that has + * been removed in Flink 1.12. + * + * @param source Source to get single input from. + * @return Input transformation. + */ + public static Transformation getOnlyInput(OneInputTransformation source) { + return Iterables.getOnlyElement(source.getInputs()); + } + + public static > void run( + StreamSource streamSource, + Object lockingObject, + StreamStatusMaintainer streamStatusMaintainer, + Output> collector) + throws Exception { + streamSource.run( + lockingObject, streamStatusMaintainer, collector, createOperatorChain(streamSource)); + } + + private static OperatorChain createOperatorChain(AbstractStreamOperator operator) { + return new OperatorChain<>( + operator.getContainingTask(), + StreamTask.createRecordWriterDelegate( + operator.getOperatorConfig(), new MockEnvironmentBuilder().build())); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 64444556565e..a6605d95a052 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -72,6 +72,10 @@ include(":runners:flink:1.11:job-server-container") include(":runners:flink:1.12") include(":runners:flink:1.12:job-server") include(":runners:flink:1.12:job-server-container") +// Flink 1.13 +include(":runners:flink:1.13") +include(":runners:flink:1.13:job-server") +include(":runners:flink:1.13:job-server-container") /* End Flink Runner related settings */ include(":runners:twister2") include(":runners:google-cloud-dataflow-java")