diff --git a/gradle.properties b/gradle.properties index 6bad220e641b..810baacfe0e9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -39,6 +39,6 @@ docker_image_default_repo_root=apache docker_image_default_repo_prefix=beam_ # supported flink versions -flink_versions=1.12,1.13,1.14,1.15,1.16 +flink_versions=1.12,1.13,1.14,1.15,1.16,1.17 # supported python versions python_versions=3.8,3.9,3.10,3.11 diff --git a/runners/flink/1.17/build.gradle b/runners/flink/1.17/build.gradle new file mode 100644 index 000000000000..2dea7b056053 --- /dev/null +++ b/runners/flink/1.17/build.gradle @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '..' + +/* All properties required for loading the Flink build script */ +project.ext { + // Set the version of all Flink-related dependencies here. + flink_version = '1.17.0' + // Version specific code overrides. + main_source_overrides = ["${basePath}/1.12/src/main/java", "${basePath}/1.13/src/main/java", "${basePath}/1.14/src/main/java", "${basePath}/1.15/src/main/java", "${basePath}/1.16/src/main/java", './src/main/java'] + test_source_overrides = ["${basePath}/1.12/src/test/java", "${basePath}/1.13/src/test/java", "${basePath}/1.14/src/test/java", "${basePath}/1.15/src/test/java", "${basePath}/1.16/src/test/java", './src/test/java'] + main_resources_overrides = [] + test_resources_overrides = [] + archives_base_name = 'beam-runners-flink-1.17' +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_runner.gradle" diff --git a/runners/flink/1.17/job-server-container/build.gradle b/runners/flink/1.17/job-server-container/build.gradle new file mode 100644 index 000000000000..afdb68a0fc91 --- /dev/null +++ b/runners/flink/1.17/job-server-container/build.gradle @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '../../job-server-container' + +project.ext { + resource_path = basePath +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_job_server_container.gradle" diff --git a/runners/flink/1.17/job-server/build.gradle b/runners/flink/1.17/job-server/build.gradle new file mode 100644 index 000000000000..89915349ae9a --- /dev/null +++ b/runners/flink/1.17/job-server/build.gradle @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '../../job-server' + +project.ext { + // Look for the source code in the parent module + main_source_dirs = ["$basePath/src/main/java"] + test_source_dirs = ["$basePath/src/test/java"] + main_resources_dirs = ["$basePath/src/main/resources"] + test_resources_dirs = ["$basePath/src/test/resources"] + archives_base_name = 'beam-runners-flink-1.17-job-server' +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_job_server.gradle" diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java index 2195ecdf1ab7..526dc936d8da 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java @@ -17,8 +17,6 @@ */ package org.apache.beam.runners.flink.translation.types; -import java.io.EOFException; -import java.io.IOException; import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper; @@ -28,14 +26,17 @@ import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; -import org.apache.flink.core.io.VersionedIOReadableWritable; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.InstantiationUtil; import org.checkerframework.checker.nullness.qual.Nullable; +import java.io.EOFException; +import java.io.IOException; + /** * Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for Beam {@link * org.apache.beam.sdk.coders.Coder Coders}. @@ -158,26 +159,81 @@ public TypeSerializerSnapshot snapshotConfiguration() { return new LegacySnapshot<>(this); } - /** A legacy snapshot which does not care about schema compatibility. */ - public static class LegacySnapshot extends TypeSerializerConfigSnapshot { + public static class LegacySnapshot implements TypeSerializerSnapshot { + + int CURRENT_VERSION = 2; + CoderTypeSerializer serializer; - /** Needs to be public to work with {@link VersionedIOReadableWritable}. */ - public LegacySnapshot() {} + public LegacySnapshot() { + } public LegacySnapshot(CoderTypeSerializer serializer) { - setPriorSerializer(serializer); + this.serializer = serializer; } @Override - public int getVersion() { - // We always return the same version - return 1; + public int getCurrentVersion() { + return CURRENT_VERSION; + } + + @Override + public void writeSnapshot(DataOutputView out) throws IOException { + ByteArrayOutputStreamWithPos streamWithPos = new ByteArrayOutputStreamWithPos(); + InstantiationUtil.serializeObject(streamWithPos, this.serializer); + out.writeInt(streamWithPos.getPosition()); + out.write(streamWithPos.getBuf(), 0, streamWithPos.getPosition()); + } + + @Override + public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException { + switch (readVersion) { + case 1: + throw new UnsupportedOperationException( + String.format("No longer supported version [%d].", readVersion)); + case 2: + try { + int serializerBytes = in.readInt(); + byte[] buffer = new byte[serializerBytes]; + in.readFully(buffer); + this.serializer = InstantiationUtil.deserializeObject(buffer, userCodeClassLoader); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + break; + default: + throw new IllegalArgumentException("Unrecognized version: " + readVersion); + } } @Override - public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( - TypeSerializer newSerializer) { - // We assume compatibility because we don't have a way of checking schema compatibility + public TypeSerializer restoreSerializer() { + if (serializer == null) { + throw new IllegalStateException( + "Trying to restore the prior serializer but the prior serializer has not been set."); + } + return this.serializer; + } + + @Override + public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(TypeSerializer newSerializer) { + if (newSerializer.getClass() != this.getClass().getDeclaringClass()) { + return TypeSerializerSchemaCompatibility.incompatible(); + } + + CoderTypeSerializer coderTypeSerializer = (CoderTypeSerializer) newSerializer; + + if (this.serializer == null) { + return TypeSerializerSchemaCompatibility.incompatible(); + } + + if (!this.serializer.coder.getEncodedTypeDescriptor().getType().getTypeName().equals(coderTypeSerializer.coder.getEncodedTypeDescriptor().getType().getTypeName())) { + return TypeSerializerSchemaCompatibility.incompatible(); + } + + if (!this.serializer.pipelineOptions.toString().equals(coderTypeSerializer.pipelineOptions.toString())) { + return TypeSerializerSchemaCompatibility.incompatible(); + } + return TypeSerializerSchemaCompatibility.compatibleAsIs(); } } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java index 3c4e43bd339f..1e3ef97c775a 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java @@ -74,7 +74,9 @@ private void testWriteAndReadConfigSnapshot(Coder coder) throws IOExcept TypeSerializerSnapshot readSnapshot = new CoderTypeSerializer.LegacySnapshot(); readSnapshot.readSnapshot( writtenSnapshot.getCurrentVersion(), outView.getInputView(), getClass().getClassLoader()); + CoderTypeSerializer restoreSerializer = (CoderTypeSerializer) readSnapshot.restoreSerializer(); - assertThat(readSnapshot.restoreSerializer(), is(serializer)); + assertThat(restoreSerializer, is(serializer)); + assertThat("TypeSerializerSchemaCompatibility should be compatible", writtenSnapshot.resolveSchemaCompatibility(restoreSerializer).isCompatibleAsIs() == true); } } diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 3fbf7eff7dd6..414cda47c4f5 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1508,7 +1508,7 @@ def _add_argparse_args(cls, parser): class FlinkRunnerOptions(PipelineOptions): # These should stay in sync with gradle.properties. - PUBLISHED_FLINK_VERSIONS = ['1.12', '1.13', '1.14', '1.15', '1.16'] + PUBLISHED_FLINK_VERSIONS = ['1.12', '1.13', '1.14', '1.15', '1.16', '1.17'] @classmethod def _add_argparse_args(cls, parser): diff --git a/settings.gradle.kts b/settings.gradle.kts index f4901d7df92b..05479f046edb 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -146,6 +146,10 @@ include(":runners:flink:1.15:job-server-container") include(":runners:flink:1.16") include(":runners:flink:1.16:job-server") include(":runners:flink:1.16:job-server-container") +// Flink 1.17 +include(":runners:flink:1.17") +include(":runners:flink:1.17:job-server") +include(":runners:flink:1.17:job-server-container") /* End Flink Runner related settings */ include(":runners:twister2") include(":runners:google-cloud-dataflow-java")