Skip to content

Commit

Permalink
[BEAM-12277] Add flink 1.13 build target.
Browse files Browse the repository at this point in the history
  • Loading branch information
dmvk authored and iemejia committed May 28, 2021
1 parent 31988c8 commit b342805
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 1 deletion.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ javaVersion=1.8
docker_image_default_repo_root=apache
docker_image_default_repo_prefix=beam_

flink_versions=1.10,1.11,1.12
flink_versions=1.10,1.11,1.12,1.13

33 changes: 33 additions & 0 deletions runners/flink/1.13/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '..'
/* All properties required for loading the Flink build script */
project.ext {
// Set the version of all Flink-related dependencies here.
flink_version = '1.13.0'
// Version specific code overrides.
main_source_overrides = ["${basePath}/1.10/src/main/java", "${basePath}/1.11/src/main/java", "${basePath}/1.12/src/main/java", './src/main/java']
test_source_overrides = ["${basePath}/1.10/src/test/java", "${basePath}/1.11/src/test/java", "${basePath}/1.12/src/main/java", './src/test/java']
main_resources_overrides = []
test_resources_overrides = []
archives_base_name = 'beam-runners-flink-1.13'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_runner.gradle"
26 changes: 26 additions & 0 deletions runners/flink/1.13/job-server-container/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '../../job-server-container'

project.ext {
resource_path = basePath
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_job_server_container.gradle"
31 changes: 31 additions & 0 deletions runners/flink/1.13/job-server/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '../../job-server'

project.ext {
// Look for the source code in the parent module
main_source_dirs = ["$basePath/src/main/java"]
test_source_dirs = ["$basePath/src/test/java"]
main_resources_dirs = ["$basePath/src/main/resources"]
test_resources_dirs = ["$basePath/src/test/resources"]
archives_base_name = 'beam-runners-flink-1.13-job-server'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_job_server.gradle"
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.streaming;

import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamTask;

/**
* {@link StreamSource} utilities, that bridge incompatibilities between Flink releases.
*
* <p>This change is becauses RecordWriter is wrapped in RecordWriterDelegate in 1.10, please refer
* to https://github.com/apache/flink/commit/2c8b4ef572f05bf4740b7e204af1e5e709cd945c for more
* details.
*/
public class StreamSources {

/**
* Backward compatibility helper for {@link OneInputTransformation} `getInput` method, that has
* been removed in Flink 1.12.
*
* @param source Source to get single input from.
* @return Input transformation.
*/
public static Transformation<?> getOnlyInput(OneInputTransformation<?, ?> source) {
return Iterables.getOnlyElement(source.getInputs());
}

public static <OutT, SrcT extends SourceFunction<OutT>> void run(
StreamSource<OutT, SrcT> streamSource,
Object lockingObject,
StreamStatusMaintainer streamStatusMaintainer,
Output<StreamRecord<OutT>> collector)
throws Exception {
streamSource.run(
lockingObject, streamStatusMaintainer, collector, createOperatorChain(streamSource));
}

private static OperatorChain<?, ?> createOperatorChain(AbstractStreamOperator<?> operator) {
return new OperatorChain<>(
operator.getContainingTask(),
StreamTask.createRecordWriterDelegate(
operator.getOperatorConfig(), new MockEnvironmentBuilder().build()));
}
}
4 changes: 4 additions & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ include(":runners:flink:1.11:job-server-container")
include(":runners:flink:1.12")
include(":runners:flink:1.12:job-server")
include(":runners:flink:1.12:job-server-container")
// Flink 1.13
include(":runners:flink:1.13")
include(":runners:flink:1.13:job-server")
include(":runners:flink:1.13:job-server-container")
/* End Flink Runner related settings */
include(":runners:twister2")
include(":runners:google-cloud-dataflow-java")
Expand Down

0 comments on commit b342805

Please sign in to comment.