diff --git a/.test-infra/jenkins/CommonTestProperties.groovy b/.test-infra/jenkins/CommonTestProperties.groovy index 01ad3b513b97..d7be46453011 100644 --- a/.test-infra/jenkins/CommonTestProperties.groovy +++ b/.test-infra/jenkins/CommonTestProperties.groovy @@ -26,7 +26,7 @@ class CommonTestProperties { } static String getFlinkVersion() { - return "1.12" + return "1.13" } enum Runner { diff --git a/CHANGES.md b/CHANGES.md index 343d0f050e53..2b0781e74a91 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -64,6 +64,7 @@ ## New Features / Improvements * `CREATE FUNCTION` DDL statement added to Calcite SQL syntax. `JAR` and `AGGREGATE` are now reserved keywords. ([BEAM-12339](https://issues.apache.org/jira/browse/BEAM-12339)). +* Flink 1.13 is now supported by the Flink runner ([BEAM-12277](https://issues.apache.org/jira/browse/BEAM-12277)). * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). ## Breaking Changes diff --git a/gradle.properties b/gradle.properties index a5c5195bb377..953086f0c5f8 100644 --- a/gradle.properties +++ b/gradle.properties @@ -32,5 +32,5 @@ javaVersion=1.8 docker_image_default_repo_root=apache docker_image_default_repo_prefix=beam_ -flink_versions=1.10,1.11,1.12 +flink_versions=1.10,1.11,1.12,1.13 diff --git a/runners/flink/1.13/build.gradle b/runners/flink/1.13/build.gradle new file mode 100644 index 000000000000..ec865d7c5efb --- /dev/null +++ b/runners/flink/1.13/build.gradle @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '..' +/* All properties required for loading the Flink build script */ +project.ext { + // Set the version of all Flink-related dependencies here. + flink_version = '1.13.0' + // Version specific code overrides. + main_source_overrides = ["${basePath}/1.10/src/main/java", "${basePath}/1.11/src/main/java", "${basePath}/1.12/src/main/java", './src/main/java'] + test_source_overrides = ["${basePath}/1.10/src/test/java", "${basePath}/1.11/src/test/java", "${basePath}/1.12/src/test/java", './src/test/java'] + main_resources_overrides = [] + test_resources_overrides = [] + archives_base_name = 'beam-runners-flink-1.13' +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_runner.gradle" diff --git a/runners/flink/1.13/job-server-container/build.gradle b/runners/flink/1.13/job-server-container/build.gradle new file mode 100644 index 000000000000..afdb68a0fc91 --- /dev/null +++ b/runners/flink/1.13/job-server-container/build.gradle @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '../../job-server-container' + +project.ext { + resource_path = basePath +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_job_server_container.gradle" diff --git a/runners/flink/1.13/job-server/build.gradle b/runners/flink/1.13/job-server/build.gradle new file mode 100644 index 000000000000..a7e6fd6eb599 --- /dev/null +++ b/runners/flink/1.13/job-server/build.gradle @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '../../job-server' + +project.ext { + // Look for the source code in the parent module + main_source_dirs = ["$basePath/src/main/java"] + test_source_dirs = ["$basePath/src/test/java"] + main_resources_dirs = ["$basePath/src/main/resources"] + test_resources_dirs = ["$basePath/src/test/resources"] + archives_base_name = 'beam-runners-flink-1.13-job-server' +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_job_server.gradle" diff --git a/runners/flink/1.13/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java b/runners/flink/1.13/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java new file mode 100644 index 000000000000..2707395b194c --- /dev/null +++ b/runners/flink/1.13/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.streaming; + +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; +import org.apache.flink.streaming.runtime.tasks.OperatorChain; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +/** + * {@link StreamSource} utilities, that bridge incompatibilities between Flink releases. + * + *
This change is becauses RecordWriter is wrapped in RecordWriterDelegate in 1.10, please refer
+ * to https://github.com/apache/flink/commit/2c8b4ef572f05bf4740b7e204af1e5e709cd945c for more
+ * details.
+ */
+public class StreamSources {
+
+ /**
+ * Backward compatibility helper for {@link OneInputTransformation} `getInput` method, that has
+ * been removed in Flink 1.12.
+ *
+ * @param source Source to get single input from.
+ * @return Input transformation.
+ */
+ public static Transformation> getOnlyInput(OneInputTransformation, ?> source) {
+ return Iterables.getOnlyElement(source.getInputs());
+ }
+
+ public static Artifact Id
-
+≥ 2.30.0
+ ≥ 2.31.0
+ 1.13.x *
+ beam-runners-flink-1.13
+
+
+1.12.x *
+ beam-runners-flink-1.12
+
+
+1.11.x *
+ beam-runners-flink-1.11
+
+
+1.10.x
+ beam-runners-flink-1.10
+
+
2.30.0
1.12.x *
beam-runners-flink-1.12