Skip to content

Commit

Permalink
[flink] add support for Flink 1.17 (apache#29939)
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik authored and minxhe committed Nov 22, 2024
1 parent 7c97428 commit 75715f9
Show file tree
Hide file tree
Showing 19 changed files with 461 additions and 60 deletions.
Empty file.
6 changes: 4 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,7 @@ javaVersion=1.8
docker_image_default_repo_root=apache
docker_image_default_repo_prefix=beam_

flink_versions=1.12,1.13,1.14,1.15,1.16

# supported flink versions
flink_versions=1.12,1.13,1.14,1.15,1.16,1.17
# supported python versions
python_versions=3.8,3.9,3.10,3.11
12 changes: 2 additions & 10 deletions runners/flink/1.12/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,10 @@
* limitations under the License.
*/

def basePath = '..'
/* All properties required for loading the Flink build script */
project.ext {
// Set the version of all Flink-related dependencies here.
flink_major = '1.12'
flink_version = '1.12.7'
// Version specific code overrides.
main_source_overrides = ['./src/main/java']
test_source_overrides = ['./src/test/java']
main_resources_overrides = []
test_resources_overrides = []
archives_base_name = 'beam-runners-flink-1.12'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_runner.gradle"
apply from: "../flink_runner.gradle"
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
})
public class CoderTypeSerializer<T> extends TypeSerializer<T> {

private static final long serialVersionUID = 7247319138941746449L;

private final Coder<T> coder;

/**
Expand Down Expand Up @@ -155,10 +157,13 @@ public int hashCode() {

@Override
public TypeSerializerSnapshot<T> snapshotConfiguration() {
return new LegacySnapshot<>(this);
return new UnversionedTypeSerializerSnapshot<>(this);
}

/** A legacy snapshot which does not care about schema compatibility. */
/**
* A legacy snapshot which does not care about schema compatibility. This is used only for state
* restore of state created by Beam 2.54.0 and below for Flink 1.16 and below.
*/
public static class LegacySnapshot<T> extends TypeSerializerConfigSnapshot<T> {

/** Needs to be public to work with {@link VersionedIOReadableWritable}. */
Expand All @@ -177,6 +182,7 @@ public int getVersion() {
@Override
public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
TypeSerializer<T> newSerializer) {

// We assume compatibility because we don't have a way of checking schema compatibility
return TypeSerializerSchemaCompatibility.compatibleAsIs();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private void testWriteAndReadConfigSnapshot(Coder<String> coder) throws IOExcept
ComparatorTestBase.TestOutputView outView = new ComparatorTestBase.TestOutputView();
writtenSnapshot.writeSnapshot(outView);

TypeSerializerSnapshot readSnapshot = new CoderTypeSerializer.LegacySnapshot();
TypeSerializerSnapshot readSnapshot = new UnversionedTypeSerializerSnapshot();
readSnapshot.readSnapshot(
writtenSnapshot.getCurrentVersion(), outView.getInputView(), getClass().getClassLoader());

Expand Down
12 changes: 2 additions & 10 deletions runners/flink/1.13/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,10 @@
* limitations under the License.
*/

def basePath = '..'
/* All properties required for loading the Flink build script */
project.ext {
// Set the version of all Flink-related dependencies here.
flink_major = '1.13'
flink_version = '1.13.5'
// Version specific code overrides.
main_source_overrides = ["${basePath}/1.12/src/main/java", './src/main/java']
test_source_overrides = ["${basePath}/1.12/src/test/java", './src/test/java']
main_resources_overrides = []
test_resources_overrides = []
archives_base_name = 'beam-runners-flink-1.13'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_runner.gradle"
apply from: "../flink_runner.gradle"
13 changes: 2 additions & 11 deletions runners/flink/1.14/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,10 @@
* limitations under the License.
*/

def basePath = '..'

/* All properties required for loading the Flink build script */
project.ext {
// Set the version of all Flink-related dependencies here.
flink_major = '1.14'
flink_version = '1.14.3'
// Version specific code overrides.
main_source_overrides = ["${basePath}/1.12/src/main/java", "${basePath}/1.13/src/main/java", './src/main/java']
test_source_overrides = ["${basePath}/1.12/src/test/java", "${basePath}/1.13/src/test/java", './src/test/java']
main_resources_overrides = []
test_resources_overrides = []
archives_base_name = 'beam-runners-flink-1.14'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_runner.gradle"
apply from: "../flink_runner.gradle"
13 changes: 2 additions & 11 deletions runners/flink/1.15/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,10 @@
* limitations under the License.
*/

def basePath = '..'

/* All properties required for loading the Flink build script */
project.ext {
// Set the version of all Flink-related dependencies here.
flink_major = '1.15'
flink_version = '1.15.0'
// Version specific code overrides.
main_source_overrides = ["${basePath}/1.12/src/main/java", "${basePath}/1.13/src/main/java", "${basePath}/1.14/src/main/java", './src/main/java']
test_source_overrides = ["${basePath}/1.12/src/test/java", "${basePath}/1.13/src/test/java", "${basePath}/1.14/src/test/java", './src/test/java']
main_resources_overrides = []
test_resources_overrides = ["${basePath}/1.12/src/test/resources", "${basePath}/1.13/src/test/resources", "${basePath}/1.14/src/test/resources", './src/test/resources']
archives_base_name = 'beam-runners-flink-1.15'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_runner.gradle"
apply from: "../flink_runner.gradle"
15 changes: 3 additions & 12 deletions runners/flink/1.16/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,10 @@
* limitations under the License.
*/

def basePath = '..'

/* All properties required for loading the Flink build script */
project.ext {
// Set the version of all Flink-related dependencies here.
flink_version = '11600.3'
// Version specific code overrides.
main_source_overrides = ["${basePath}/1.12/src/main/java", "${basePath}/1.13/src/main/java", "${basePath}/1.14/src/main/java", "${basePath}/1.15/src/main/java", './src/main/java']
test_source_overrides = ["${basePath}/1.12/src/test/java", "${basePath}/1.13/src/test/java", "${basePath}/1.14/src/test/java", "${basePath}/1.15/src/test/java", './src/test/java']
main_resources_overrides = []
test_resources_overrides = ["${basePath}/1.12/src/test/resources", "${basePath}/1.13/src/test/resources", "${basePath}/1.14/src/test/resources", "${basePath}/1.15/src/test/resources", './src/test/resources']
archives_base_name = 'beam-runners-flink-1.16'
flink_major = '1.16'
flink_version = '1.16.0'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_runner.gradle"
apply from: "../flink_runner.gradle"
25 changes: 25 additions & 0 deletions runners/flink/1.17/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

project.ext {
flink_major = '1.17'
flink_version = '1.17.0'
}

// Load the main build script which contains all build logic.
apply from: "../flink_runner.gradle"
26 changes: 26 additions & 0 deletions runners/flink/1.17/job-server-container/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '../../job-server-container'

project.ext {
resource_path = basePath
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_job_server_container.gradle"
31 changes: 31 additions & 0 deletions runners/flink/1.17/job-server/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '../../job-server'

project.ext {
// Look for the source code in the parent module
main_source_dirs = ["$basePath/src/main/java"]
test_source_dirs = ["$basePath/src/test/java"]
main_resources_dirs = ["$basePath/src/main/resources"]
test_resources_dirs = ["$basePath/src/test/resources"]
archives_base_name = 'beam-runners-flink-1.17-job-server'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_job_server.gradle"
Loading

0 comments on commit 75715f9

Please sign in to comment.