Skip to content

Commit

Permalink
Merge pull request #129 from yananhao12/flink_option_config_map
Browse files Browse the repository at this point in the history
add config map to FlinkPipelineOptions
yananhao12 authored Oct 10, 2024
2 parents 165193a + 9b29cb2 commit ccfc023
Showing 4 changed files with 25 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -398,7 +398,7 @@ class BeamModulePlugin implements Plugin<Project> {

// Automatically use the official release version if we are performing a release
// otherwise append '-SNAPSHOT'
project.version = '2.45.28'
project.version = '2.45.29'
if (isLinkedin(project)) {
project.ext.mavenGroupId = 'com.linkedin.beam'
}
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true
# buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy.
# To build a custom Beam version make sure you change it in both places, see
# https://github.com/apache/beam/issues/21302.
version=2.45.28
sdk_version=2.45.28
version=2.45.29
sdk_version=2.45.29

javaVersion=1.8

Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
@@ -77,7 +78,7 @@ static ExecutionEnvironment createBatchExecutionEnvironment(

// Although Flink uses Rest, it expects the address not to contain a http scheme
String flinkMasterHostPort = stripHttpSchema(options.getFlinkMaster());
Configuration flinkConfiguration = getFlinkConfiguration(confDir);
Configuration flinkConfiguration = getFlinkConfiguration(confDir, options.getFlinkConfMap());
ExecutionEnvironment flinkBatchEnv;

// depending on the master, create the right environment.
@@ -163,7 +164,7 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment(

// Although Flink uses Rest, it expects the address not to contain a http scheme
String masterUrl = stripHttpSchema(options.getFlinkMaster());
Configuration flinkConfiguration = getFlinkConfiguration(confDir);
Configuration flinkConfiguration = getFlinkConfiguration(confDir, options.getFlinkConfMap());
StreamExecutionEnvironment flinkStreamEnv;

// depending on the master, create the right environment.
@@ -376,10 +377,19 @@ private static int determineParallelism(
return 1;
}

private static Configuration getFlinkConfiguration(@Nullable String flinkConfDir) {
return flinkConfDir == null || flinkConfDir.isEmpty()
? GlobalConfiguration.loadConfiguration()
: GlobalConfiguration.loadConfiguration(flinkConfDir);
private static Configuration getFlinkConfiguration(
@Nullable String flinkConfDir, @Nullable Map<String, String> flinkConfMap) {
Configuration dynamicProperties = null;
if (flinkConfMap != null && !flinkConfMap.isEmpty()) {
dynamicProperties = Configuration.fromMap(flinkConfMap);
}
if (flinkConfDir != null && !flinkConfDir.isEmpty()) {
return GlobalConfiguration.loadConfiguration(flinkConfDir, dynamicProperties);
} else if (dynamicProperties != null) {
return GlobalConfiguration.loadConfiguration(dynamicProperties);
} else {
return GlobalConfiguration.loadConfiguration();
}
}

private static void applyLatencyTrackingInterval(
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink;

import java.util.Map;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
@@ -305,6 +306,11 @@ public interface FlinkPipelineOptions

void setFlinkConfDir(String confDir);

@Description("Map containing Flink configurations")
Map<String, String> getFlinkConfMap();

void setFlinkConfMap(Map<String, String> flinkConfMap);

static FlinkPipelineOptions defaults() {
return PipelineOptionsFactory.as(FlinkPipelineOptions.class);
}

0 comments on commit ccfc023

Please sign in to comment.