From 9e1da7d8f9f1c6783c3474a28a9ddcf17dfc9fac Mon Sep 17 00:00:00 2001
From: Moritz Mack
Date: Fri, 13 May 2022 16:16:56 +0200
Subject: [PATCH] [BEAM-14334] Remove remaining forkEvery 1 from all Spark
 tests and stop mixing unit tests with runner validations.

---
 runners/spark/spark_runner.gradle             | 84 +++++--------------
 .../metrics/sink/SparkMetricsSinkTest.java    |  2 +
 .../spark/metrics/SparkMetricsPusherTest.java |  3 +
 .../streaming/CreateStreamTest.java           | 10 ++-
 .../SparkCoGroupByKeyStreamingTest.java       | 10 ++-
 .../streaming/StreamingSourceMetricsTest.java | 10 ++-
 6 files changed, 54 insertions(+), 65 deletions(-)
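Reviewer note (not part of the commit message): the duplicated systemProperty
blocks are replaced by a single sparkTestProperties(overrides) helper, and the
streaming tests now force streaming via TestSparkPipelineOptions#setForceStreaming
instead of a global "--streaming" pipeline option. As a quick sketch of how the
helper composes options, the standalone Groovy script below copies the helper
body from the diff; the assert lines and their expected strings are my own
illustration, not part of the change:

    import groovy.json.JsonOutput

    // Copied from the spark_runner.gradle diff below: returns a map of
    // system properties for a Test task; pipeline options are rendered
    // into the beamTestPipelineOptions JSON array.
    def sparkTestProperties(overrides = [:]) {
      def defaults = ["--runner": "TestSparkRunner"]
      [
        "spark.sql.shuffle.partitions": "4",
        "spark.ui.enabled"            : "false",
        "spark.ui.showConsoleProgress": "false",
        "beamTestPipelineOptions"     :
          JsonOutput.toJson((defaults + overrides).collect { k, v -> "$k=$v" })
      ]
    }

    // With no overrides, only the default runner is rendered.
    assert sparkTestProperties()["beamTestPipelineOptions"] ==
        '["--runner=TestSparkRunner"]'

    // Overrides are merged last, so they win over defaults for the same
    // key (as used by validatesStructuredStreamingRunnerBatch).
    assert sparkTestProperties(["--runner": "SparkStructuredStreamingRunner",
                                "--testMode": "true"])["beamTestPipelineOptions"] ==
        '["--runner=SparkStructuredStreamingRunner","--testMode=true"]'

Because the helper returns a plain map, each Test task applies it in one line
via systemProperties, as the hunks below show.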
tasks.register("validatesStructuredStreamingRunnerBatch", Test) { group = "Verification" // Disable gradle cache outputs.upToDateWhen { false } - def pipelineOptions = JsonOutput.toJson([ - "--runner=SparkStructuredStreamingRunner", - "--testMode=true", - "--streaming=false", - ]) - systemProperty "beamTestPipelineOptions", pipelineOptions - systemProperty "spark.sql.shuffle.partitions", "4" - systemProperty "spark.ui.enabled", "false" - systemProperty "spark.ui.showConsoleProgress", "false" + systemProperties sparkTestProperties(["--runner":"SparkStructuredStreamingRunner", "--testMode":"true"]) classpath = configurations.validatesRunner testClassesDirs = files( @@ -310,8 +290,6 @@ tasks.register("validatesStructuredStreamingRunnerBatch", Test) { ) testClassesDirs += files(project.sourceSets.test.output.classesDirs) - // Only one SparkContext may be running in a JVM (SPARK-2243) - forkEvery 1 maxParallelForks 4 // Increase memory heap in order to avoid OOM errors jvmArgs '-Xmx7g' @@ -373,27 +351,18 @@ createJavaExamplesArchetypeValidationTask(type: 'Quickstart', runner: 'Spark') tasks.register("hadoopVersionsTest") { group = "Verification" - def taskNames = hadoopVersions.keySet().stream() - .map { num -> "hadoopVersion${num}Test" } - .collect(Collectors.toList()) - dependsOn taskNames + dependsOn hadoopVersions.collect{k,v -> "hadoopVersion${k}Test"} } tasks.register("examplesIntegrationTest", Test) { group = "Verification" // Disable gradle cache outputs.upToDateWhen { false } - def pipelineOptions = JsonOutput.toJson([ - "--runner=TestSparkRunner", - "--enableSparkMetricSinks=true", - "--tempLocation=${tempLocation}", - "--tempRoot=${tempLocation}", - "--project=${gcpProject}", + systemProperties sparkTestProperties([ + "--tempLocation": "${tempLocation}", + "--tempRoot" : "${tempLocation}", + "--project" : "${gcpProject}" ]) - systemProperty "beamTestPipelineOptions", pipelineOptions - systemProperty "beam.spark.test.reuseSparkContext", "true" - systemProperty "spark.ui.enabled", "false" - systemProperty "spark.ui.showConsoleProgress", "false" include '**/*IT.class' maxParallelForks 4 @@ -414,19 +383,10 @@ hadoopVersions.each { kv -> group = "Verification" description = "Runs Spark tests with Hadoop version $kv.value" classpath = configurations."hadoopVersion$kv.key" + sourceSets.test.runtimeClasspath - systemProperty "beam.spark.test.reuseSparkContext", "true" - systemProperty "spark.sql.shuffle.partitions", "4" - systemProperty "spark.ui.enabled", "false" - systemProperty "spark.ui.showConsoleProgress", "false" - systemProperty "beamTestPipelineOptions", """[ - "--runner=TestSparkRunner", - "--streaming=false", - "--enableSparkMetricSinks=true" - ]""" - // Only one SparkContext may be running in a JVM (SPARK-2243) - forkEvery 1 - maxParallelForks 4 + systemProperties sparkTestProperties() + include "**/*Test.class" + maxParallelForks 4 useJUnit { excludeCategories "org.apache.beam.runners.spark.StreamingTest" excludeCategories "org.apache.beam.runners.spark.UsesCheckpointRecovery" diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java index f21168336d02..0d067b53eb24 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java @@ -24,6 
 import org.apache.beam.runners.spark.SparkContextRule;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.StreamingTest;
+import org.apache.beam.runners.spark.TestSparkPipelineOptions;
 import org.apache.beam.runners.spark.examples.WordCount;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -81,6 +82,7 @@ public void testInBatchMode() throws Exception {
   @Category(StreamingTest.class)
   @Test
   public void testInStreamingMode() throws Exception {
+    pipeline.getOptions().as(TestSparkPipelineOptions.class).setForceStreaming(true);
     assertThat(InMemoryMetrics.valueOf("emptyLines"), is(nullValue()));
 
     Instant instant = new Instant(0);
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkMetricsPusherTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkMetricsPusherTest.java
index aa7ab616ecd8..deb7e0cd5dcc 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkMetricsPusherTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkMetricsPusherTest.java
@@ -23,6 +23,7 @@
 import org.apache.beam.runners.core.metrics.TestMetricsSink;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.StreamingTest;
+import org.apache.beam.runners.spark.TestSparkPipelineOptions;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.metrics.Counter;
@@ -68,6 +69,8 @@ public void init() {
   @Category(StreamingTest.class)
   @Test
   public void testInStreamingMode() throws Exception {
+    pipeline.getOptions().as(TestSparkPipelineOptions.class).setForceStreaming(true);
+
     Instant instant = new Instant(0);
     CreateStream<Integer> source =
         CreateStream.of(VarIntCoder.of(), batchDuration())
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
index 8fde97456227..d4721cd3d823 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
@@ -31,11 +31,13 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.StreamingTest;
+import org.apache.beam.runners.spark.TestSparkPipelineOptions;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine;
@@ -84,7 +86,7 @@
 @Category(StreamingTest.class)
 public class CreateStreamTest implements Serializable {
 
-  @Rule public final transient TestPipeline p = TestPipeline.create();
+  @Rule public final transient TestPipeline p = TestPipeline.fromOptions(streamingOptions());
   @Rule public final transient ExpectedException thrown = ExpectedException.none();
 
   @Test
@@ -524,4 +526,10 @@ public void process(ProcessContext context) {
       context.output(element);
     }
   }
+
+  private static PipelineOptions streamingOptions() {
+    PipelineOptions options = TestPipeline.testingPipelineOptions();
+    options.as(TestSparkPipelineOptions.class).setForceStreaming(true);
+    return options;
+  }
 }
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SparkCoGroupByKeyStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SparkCoGroupByKeyStreamingTest.java
index fc4e427e2f30..39203d602645 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SparkCoGroupByKeyStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SparkCoGroupByKeyStreamingTest.java
@@ -24,9 +24,11 @@
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.StreamingTest;
+import org.apache.beam.runners.spark.TestSparkPipelineOptions;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -52,7 +54,7 @@ public class SparkCoGroupByKeyStreamingTest {
   private static final TupleTag<Integer> INPUT1_TAG = new TupleTag<>("input1");
   private static final TupleTag<Integer> INPUT2_TAG = new TupleTag<>("input2");
 
-  @Rule public final TestPipeline pipeline = TestPipeline.create();
+  @Rule public final TestPipeline pipeline = TestPipeline.fromOptions(streamingOptions());
 
   private Duration batchDuration() {
     return Duration.millis(
@@ -165,4 +167,10 @@ public void testInStreamingMode() throws Exception {
         });
     pipeline.run();
   }
+
+  private static PipelineOptions streamingOptions() {
+    PipelineOptions options = TestPipeline.testingPipelineOptions();
+    options.as(TestSparkPipelineOptions.class).setForceStreaming(true);
+    return options;
+  }
 }
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
index 75dccadca451..ac377cf596b0 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
@@ -25,6 +25,7 @@
 import java.io.Serializable;
 import org.apache.beam.runners.spark.StreamingTest;
+import org.apache.beam.runners.spark.TestSparkPipelineOptions;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Source;
@@ -33,6 +34,7 @@
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.metrics.SourceMetrics;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollection;
@@ -47,7 +49,7 @@ public class StreamingSourceMetricsTest implements Serializable {
   private static final MetricName ELEMENTS_READ = SourceMetrics.elementsRead().getName();
 
   // Force streaming pipeline using pipeline rule.
-  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public final transient TestPipeline pipeline = TestPipeline.fromOptions(streamingOptions());
 
   @Test
   @Category(StreamingTest.class)
@@ -87,4 +89,10 @@ public void testUnboundedSourceMetrics() {
             greaterThanOrEqualTo(minElements), false)));
   }
+
+  private static PipelineOptions streamingOptions() {
+    PipelineOptions options = TestPipeline.testingPipelineOptions();
+    options.as(TestSparkPipelineOptions.class).setForceStreaming(true);
+    return options;
+  }
 }