Skip to content

Commit

Permalink
Merge pull request #138 from MabelYC/revert
Browse files Browse the repository at this point in the history
revert changes and bump version
  • Loading branch information
yananhao12 authored Jan 16, 2025
2 parents eae8179 + eec6468 commit 47669f1
Show file tree
Hide file tree
Showing 12 changed files with 29 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
},
"JavaTestProperties": {
"SUPPORTED_VERSIONS": ["8", "11", "17", "21"],
"FLINK_VERSIONS": ["1.16", "1.17", "1.18"],
"FLINK_VERSIONS": ["1.15", "1.16", "1.17", "1.18"],
"SPARK_VERSIONS": ["2", "3"]
},
"GoTestProperties": {
Expand Down
5 changes: 2 additions & 3 deletions auto-elr/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ dependencies {
implementation project(":runners:core-java")
implementation project(":runners:java-fn-execution")
implementation project(":runners:samza")
implementation project(":runners:flink:1.15")
implementation project(":runners:flink:1.16")
implementation project(":runners:flink:1.17")
implementation project(":runners:flink:1.18")
implementation project(":runners:spark:2")
implementation project(":runners:spark:3")
Expand Down Expand Up @@ -60,9 +60,8 @@ tasks.all { task ->
task.mustRunAfter(":runners:core-java:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository")
task.mustRunAfter(":runners:java-fn-execution:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository")
task.mustRunAfter(":runners:samza:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository")
task.mustRunAfter(":runners:flink:1.15:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository")
task.mustRunAfter(":runners:flink:1.16:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository")
task.mustRunAfter(":runners:flink:1.17:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository")
task.mustRunAfter(":runners:flink:1.18:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository")
task.mustRunAfter(":runners:spark:2:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository")
task.mustRunAfter(":runners:spark:3:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository")
task.mustRunAfter(":runners:portability:java:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository")
Expand Down
18 changes: 12 additions & 6 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,16 @@ tasks.register("javaPreCommit") {
dependsOn(":runners:core-java:build")
dependsOn(":runners:direct-java:build")
dependsOn(":runners:extensions-java:metrics:build")
dependsOn(":runners:flink:1.12:build")
dependsOn(":runners:flink:1.12:job-server:build")
dependsOn(":runners:flink:1.13:build")
dependsOn(":runners:flink:1.13:job-server:build")
dependsOn(":runners:flink:1.14:build")
dependsOn(":runners:flink:1.14:job-server:build")
dependsOn(":runners:flink:1.15:build")
dependsOn(":runners:flink:1.15:job-server:build")
dependsOn(":runners:flink:1.16:build")
dependsOn(":runners:flink:1.16:job-server:build")
dependsOn(":runners:flink:1.17:build")
dependsOn(":runners:flink:1.17:job-server:build")
dependsOn(":runners:flink:1.18:build")
dependsOn(":runners:flink:1.18:job-server:build")
dependsOn(":runners:google-cloud-dataflow-java:build")
dependsOn(":runners:google-cloud-dataflow-java:examples-streaming:build")
dependsOn(":runners:google-cloud-dataflow-java:examples:build")
Expand Down Expand Up @@ -335,9 +339,11 @@ tasks.register("javaPostCommit") {

tasks.register("javaPostCommitSickbay") {
dependsOn(":runners:samza:validatesRunnerSickbay")
dependsOn(":runners:flink:1.12:validatesRunnerSickbay")
dependsOn(":runners:flink:1.13:validatesRunnerSickbay")
dependsOn(":runners:flink:1.14:validatesRunnerSickbay")
dependsOn(":runners:flink:1.15:validatesRunnerSickbay")
dependsOn(":runners:flink:1.16:validatesRunnerSickbay")
dependsOn(":runners:flink:1.17:validatesRunnerSickbay")
dependsOn(":runners:flink:1.18:validatesRunnerSickbay")
dependsOn(":runners:spark:2:job-server:validatesRunnerSickbay")
dependsOn(":runners:spark:3:job-server:validatesRunnerSickbay")
dependsOn(":runners:direct-java:validatesRunnerSickbay")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ class BeamModulePlugin implements Plugin<Project> {

// Automatically use the official release version if we are performing a release
// otherwise append '-SNAPSHOT'
project.version = '2.45.34'
project.version = '2.45.35'
if (isLinkedin(project)) {
project.ext.mavenGroupId = 'com.linkedin.beam'
}
Expand Down
6 changes: 3 additions & 3 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ signing.gnupg.useLegacyGpg=true
# buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy.
# To build a custom Beam version make sure you change it in both places, see
# https://github.com/apache/beam/issues/21302.
version=2.45.34
sdk_version=2.45.34
version=2.45.35
sdk_version=2.45.35

javaVersion=1.8

docker_image_default_repo_root=apache
docker_image_default_repo_prefix=beam_

# supported flink versions
flink_versions=1.16,1.17,1.18
flink_versions=1.15,1.16,1.17,1.18
# supported python versions
python_versions=3.8,3.9,3.10,3.11
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ Additionally, you can read [here](https://beam.apache.org/documentation/runners/
#### Run example

##### Portable
1. Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available at Docker Hub: `Flink 1.16`, `Flink 1.17`, `Flink 1.18`.
1. Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available at Docker Hub: `Flink 1.15`, `Flink 1.16`, `Flink 1.17`, `Flink 1.18`.
2. Start the JobService endpoint: `docker run --net=host apache/beam_flink1.15_job_server:latest` (substitute the image tag for your Flink version, e.g. `beam_flink1.16_job_server`)
3. Submit the pipeline to the above endpoint by using the PortableRunner, job_endpoint set to localhost:8099 (this is the default address of the JobService). Optionally set environment_type set to LOOPBACK. For example:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.expansion.ExternalConfigRegistrar;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -79,7 +78,7 @@ static ExecutionEnvironment createBatchExecutionEnvironment(

// Although Flink uses Rest, it expects the address not to contain a http scheme
String flinkMasterHostPort = stripHttpSchema(options.getFlinkMaster());
Configuration flinkConfiguration = getFlinkConfiguration(confDir, options);
Configuration flinkConfiguration = getFlinkConfiguration(confDir, options.getFlinkConfMap());
ExecutionEnvironment flinkBatchEnv;

// depending on the master, create the right environment.
Expand Down Expand Up @@ -165,7 +164,7 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment(

// Although Flink uses Rest, it expects the address not to contain a http scheme
String masterUrl = stripHttpSchema(options.getFlinkMaster());
Configuration flinkConfiguration = getFlinkConfiguration(confDir, options);
Configuration flinkConfiguration = getFlinkConfiguration(confDir, options.getFlinkConfMap());
StreamExecutionEnvironment flinkStreamEnv;

// depending on the master, create the right environment.
Expand Down Expand Up @@ -378,13 +377,10 @@ private static int determineParallelism(
return 1;
}

@VisibleForTesting
static Configuration getFlinkConfiguration(
@Nullable String flinkConfDir, FlinkPipelineOptions flinkPipelineOptions) {
private static Configuration getFlinkConfiguration(
@Nullable String flinkConfDir, @Nullable Map<String, String> flinkConfMap) {
Configuration dynamicProperties = null;
final Map<String, String> flinkConfMap = flinkPipelineOptions.getFlinkConfMap();
flinkConfMap.putAll(ExternalConfigRegistrar.getConfig(flinkPipelineOptions));
if (!flinkConfMap.isEmpty()) {
if (flinkConfMap != null && !flinkConfMap.isEmpty()) {
dynamicProperties = Configuration.fromMap(flinkConfMap);
}
if (flinkConfDir != null && !flinkConfDir.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@
*/
package org.apache.beam.runners.flink;

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.FileStagingOptions;
import org.apache.beam.sdk.options.PipelineOptions;
Expand Down Expand Up @@ -309,19 +307,10 @@ public interface FlinkPipelineOptions
void setFlinkConfDir(String confDir);

@Description("Map containing Flink configurations")
@Default.InstanceFactory(FlinkConfMapFactory.class)
Map<String, String> getFlinkConfMap();

void setFlinkConfMap(Map<String, String> flinkConfMap);

/** Returns an empty map, to avoid handling null. */
class FlinkConfMapFactory implements DefaultValueFactory<Map<String, String>> {
@Override
public Map<String, String> create(PipelineOptions options) {
return new HashMap<>();
}
}

static FlinkPipelineOptions defaults() {
return PipelineOptionsFactory.as(FlinkPipelineOptions.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@
*/
package org.apache.beam.runners.flink;

import static org.apache.beam.runners.flink.FlinkExecutionEnvironments.getFlinkConfiguration;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
Expand All @@ -34,8 +31,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.RemoteEnvironment;
Expand Down Expand Up @@ -498,22 +493,6 @@ public void shouldCreateRocksDbStateBackend() {
assertThat(sev.getStateBackend(), instanceOf(RocksDBStateBackend.class));
}

@Test
public void testGetFlinkConfiguration() {
Configuration configuration = getFlinkConfiguration(null, getDefaultPipelineOptions());
assertNotNull(configuration);
}

@Test
public void testGetFlinkConfigurationWithConfigMap() {
FlinkPipelineOptions options = getDefaultPipelineOptions();
options.setFlinkConfMap(
new HashMap<>(ImmutableMap.<String, String>builder().put("mapKey", "mapValue").build()));
Configuration configuration = getFlinkConfiguration(null, options);
assertTrue(configuration.containsKey("mapKey"));
assertEquals("mapValue", configuration.getString("mapKey", ""));
}

private void checkHostAndPort(Object env, String expectedHost, int expectedPort) {
String host =
((Configuration) Whitebox.getInternalState(env, "configuration"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.beam.runners.samza.container.BeamJobCoordinatorRunner;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
import org.apache.beam.runners.samza.util.ConfigUtils;
import org.apache.beam.sdk.expansion.ExternalConfigRegistrar;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.samza.config.ApplicationConfig;
Expand Down Expand Up @@ -175,7 +174,6 @@ private static Map<String, String> createUserConfig(SamzaPipelineOptions options
if (options.getConfigOverride() != null) {
config.putAll(options.getConfigOverride());
}
config.putAll(ExternalConfigRegistrar.getConfig(options));

return config;
}
Expand Down

This file was deleted.

4 changes: 4 additions & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ include(":runners:extensions-java:metrics")
* verify versions in website/www/site/content/en/documentation/runners/flink.md
* verify version in sdks/python/apache_beam/runners/interactive/interactive_beam.py
*/
// Flink 1.15
include(":runners:flink:1.15")
include(":runners:flink:1.15:job-server")
include(":runners:flink:1.15:job-server-container")
// Flink 1.16
include(":runners:flink:1.16")
include(":runners:flink:1.16:job-server")
Expand Down

0 comments on commit 47669f1

Please sign in to comment.