Merge pull request apache#17662: [BEAM-14334] Remove remaining forkEvery 1 from all Spark tests and stop mixing unit tests with runner validations.
aromanenko-dev authored May 17, 2022
2 parents e65b4a3 + 9e1da7d commit 8fb55ef
Showing 6 changed files with 54 additions and 65 deletions.
(File 1 of 6) runners/spark/spark_runner.gradle: 84 changes, 22 additions & 62 deletions
@@ -48,6 +48,17 @@ configurations {
examplesJavaIntegrationTest
}

def sparkTestProperties(overrides = [:]) {
  def defaults = ["--runner": "TestSparkRunner"]
  [
      "spark.sql.shuffle.partitions": "4",
      "spark.ui.enabled"            : "false",
      "spark.ui.showConsoleProgress": "false",
      "beamTestPipelineOptions"     :
          JsonOutput.toJson((defaults + overrides).collect { k, v -> "$k=$v" })
  ]
}
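For reference: the helper merges its overrides into the default pipeline options and returns a single map of Spark test system properties, so each Test task below configures itself in one line. A sketch of what the validatesRunnerBatch call further down evaluates to (derived from the helper code above):

// sparkTestProperties(["--enableSparkMetricSinks": "false"]) returns:
[
    "spark.sql.shuffle.partitions": "4",
    "spark.ui.enabled"            : "false",
    "spark.ui.showConsoleProgress": "false",
    "beamTestPipelineOptions"     : '["--runner=TestSparkRunner","--enableSparkMetricSinks=false"]'
]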

def hadoopVersions = [
"285" : "2.8.5",
"292" : "2.9.2",
@@ -94,14 +105,7 @@ if (copySourceBase) {
}

test {
systemProperty "spark.sql.shuffle.partitions", "4"
systemProperty "spark.ui.enabled", "false"
systemProperty "spark.ui.showConsoleProgress", "false"
systemProperty "beamTestPipelineOptions", """[
"--runner=TestSparkRunner",
"--streaming=false",
"--enableSparkMetricSinks=true"
]"""
systemProperties sparkTestProperties()
systemProperty "log4j.configuration", "log4j-test.properties"
// Change log level to debug:
// systemProperty "org.slf4j.simpleLogger.defaultLogLevel", "debug"
@@ -113,10 +117,6 @@
}

maxParallelForks 4
useJUnit {
excludeCategories "org.apache.beam.runners.spark.StreamingTest"
excludeCategories "org.apache.beam.runners.spark.UsesCheckpointRecovery"
}

// easily re-run all tests (to deal with flaky tests / SparkContext leaks)
if(project.hasProperty("rerun-tests")) { outputs.upToDateWhen {false} }
@@ -218,29 +218,17 @@ def validatesRunnerBatch = tasks.register("validatesRunnerBatch", Test) {
group = "Verification"
// Disable gradle cache
outputs.upToDateWhen { false }
def pipelineOptions = JsonOutput.toJson([
"--runner=TestSparkRunner",
"--streaming=false",
"--enableSparkMetricSinks=false",
])
systemProperty "beamTestPipelineOptions", pipelineOptions
systemProperty "beam.spark.test.reuseSparkContext", "true"
systemProperty "spark.ui.enabled", "false"
systemProperty "spark.ui.showConsoleProgress", "false"
systemProperties sparkTestProperties(["--enableSparkMetricSinks":"false"])

classpath = configurations.validatesRunner
testClassesDirs = files(
project(":sdks:java:core").sourceSets.test.output.classesDirs,
project(":runners:core-java").sourceSets.test.output.classesDirs,
)
testClassesDirs += files(project.sourceSets.test.output.classesDirs)

// Only one SparkContext may be running in a JVM (SPARK-2243)
forkEvery 1
maxParallelForks 4
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
includeCategories 'org.apache.beam.runners.spark.UsesCheckpointRecovery'
// Should be run only in a properly configured SDK harness environment
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
@@ -293,15 +281,7 @@ tasks.register("validatesStructuredStreamingRunnerBatch", Test) {
group = "Verification"
// Disable gradle cache
outputs.upToDateWhen { false }
def pipelineOptions = JsonOutput.toJson([
"--runner=SparkStructuredStreamingRunner",
"--testMode=true",
"--streaming=false",
])
systemProperty "beamTestPipelineOptions", pipelineOptions
systemProperty "spark.sql.shuffle.partitions", "4"
systemProperty "spark.ui.enabled", "false"
systemProperty "spark.ui.showConsoleProgress", "false"
systemProperties sparkTestProperties(["--runner":"SparkStructuredStreamingRunner", "--testMode":"true"])

classpath = configurations.validatesRunner
testClassesDirs = files(
@@ -310,8 +290,6 @@ tasks.register("validatesStructuredStreamingRunnerBatch", Test) {
)
testClassesDirs += files(project.sourceSets.test.output.classesDirs)

// Only one SparkContext may be running in a JVM (SPARK-2243)
forkEvery 1
maxParallelForks 4
// Increase memory heap in order to avoid OOM errors
jvmArgs '-Xmx7g'
@@ -373,27 +351,18 @@ createJavaExamplesArchetypeValidationTask(type: 'Quickstart', runner: 'Spark')

tasks.register("hadoopVersionsTest") {
group = "Verification"
def taskNames = hadoopVersions.keySet().stream()
.map { num -> "hadoopVersion${num}Test" }
.collect(Collectors.toList())
dependsOn taskNames
dependsOn hadoopVersions.collect{k,v -> "hadoopVersion${k}Test"}
}
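With the two versions visible above (the rest of the map is collapsed in this view), the rewritten dependsOn expands to task names such as:

// hadoopVersions.collect { k, v -> "hadoopVersion${k}Test" } yields, e.g.:
["hadoopVersion285Test", "hadoopVersion292Test", …]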

tasks.register("examplesIntegrationTest", Test) {
group = "Verification"
// Disable gradle cache
outputs.upToDateWhen { false }
def pipelineOptions = JsonOutput.toJson([
"--runner=TestSparkRunner",
"--enableSparkMetricSinks=true",
"--tempLocation=${tempLocation}",
"--tempRoot=${tempLocation}",
"--project=${gcpProject}",
systemProperties sparkTestProperties([
"--tempLocation": "${tempLocation}",
"--tempRoot" : "${tempLocation}",
"--project" : "${gcpProject}"
])
systemProperty "beamTestPipelineOptions", pipelineOptions
systemProperty "beam.spark.test.reuseSparkContext", "true"
systemProperty "spark.ui.enabled", "false"
systemProperty "spark.ui.showConsoleProgress", "false"

include '**/*IT.class'
maxParallelForks 4
@@ -414,19 +383,10 @@ hadoopVersions.each { kv ->
group = "Verification"
description = "Runs Spark tests with Hadoop version $kv.value"
classpath = configurations."hadoopVersion$kv.key" + sourceSets.test.runtimeClasspath
systemProperty "beam.spark.test.reuseSparkContext", "true"
systemProperty "spark.sql.shuffle.partitions", "4"
systemProperty "spark.ui.enabled", "false"
systemProperty "spark.ui.showConsoleProgress", "false"
systemProperty "beamTestPipelineOptions", """[
"--runner=TestSparkRunner",
"--streaming=false",
"--enableSparkMetricSinks=true"
]"""
// Only one SparkContext may be running in a JVM (SPARK-2243)
forkEvery 1
maxParallelForks 4
systemProperties sparkTestProperties()

include "**/*Test.class"
maxParallelForks 4
useJUnit {
excludeCategories "org.apache.beam.runners.spark.StreamingTest"
excludeCategories "org.apache.beam.runners.spark.UsesCheckpointRecovery"
(File 2 of 6)
@@ -24,6 +24,7 @@
import org.apache.beam.runners.spark.SparkContextRule;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.StreamingTest;
import org.apache.beam.runners.spark.TestSparkPipelineOptions;
import org.apache.beam.runners.spark.examples.WordCount;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -81,6 +82,7 @@ public void testInBatchMode() throws Exception {
@Category(StreamingTest.class)
@Test
public void testInStreamingMode() throws Exception {
pipeline.getOptions().as(TestSparkPipelineOptions.class).setForceStreaming(true);
assertThat(InMemoryMetrics.valueOf("emptyLines"), is(nullValue()));

Instant instant = new Instant(0);
(File 3 of 6)
@@ -23,6 +23,7 @@
import org.apache.beam.runners.core.metrics.TestMetricsSink;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.StreamingTest;
import org.apache.beam.runners.spark.TestSparkPipelineOptions;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.metrics.Counter;
@@ -68,6 +69,8 @@ public void init() {
@Category(StreamingTest.class)
@Test
public void testInStreamingMode() throws Exception {
pipeline.getOptions().as(TestSparkPipelineOptions.class).setForceStreaming(true);

Instant instant = new Instant(0);
CreateStream<Integer> source =
CreateStream.of(VarIntCoder.of(), batchDuration())
(File 4 of 6: CreateStreamTest)
@@ -31,11 +31,13 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.StreamingTest;
import org.apache.beam.runners.spark.TestSparkPipelineOptions;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
@@ -84,7 +86,7 @@
@Category(StreamingTest.class)
public class CreateStreamTest implements Serializable {

@Rule public final transient TestPipeline p = TestPipeline.create();
@Rule public final transient TestPipeline p = TestPipeline.fromOptions(streamingOptions());
@Rule public final transient ExpectedException thrown = ExpectedException.none();

@Test
@@ -524,4 +526,10 @@ public void process(ProcessContext context) {
context.output(element);
}
}

private static PipelineOptions streamingOptions() {
PipelineOptions options = TestPipeline.testingPipelineOptions();
options.as(TestSparkPipelineOptions.class).setForceStreaming(true);
return options;
}
}
(File 5 of 6: SparkCoGroupByKeyStreamingTest)
@@ -24,9 +24,11 @@

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.StreamingTest;
import org.apache.beam.runners.spark.TestSparkPipelineOptions;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -52,7 +54,7 @@ public class SparkCoGroupByKeyStreamingTest {
private static final TupleTag<Integer> INPUT1_TAG = new TupleTag<>("input1");
private static final TupleTag<Integer> INPUT2_TAG = new TupleTag<>("input2");

@Rule public final TestPipeline pipeline = TestPipeline.create();
@Rule public final TestPipeline pipeline = TestPipeline.fromOptions(streamingOptions());

private Duration batchDuration() {
return Duration.millis(
@@ -165,4 +167,10 @@ public void testInStreamingMode() throws Exception {
});
pipeline.run();
}

private static PipelineOptions streamingOptions() {
PipelineOptions options = TestPipeline.testingPipelineOptions();
options.as(TestSparkPipelineOptions.class).setForceStreaming(true);
return options;
}
}
(File 6 of 6: StreamingSourceMetricsTest)
@@ -25,6 +25,7 @@

import java.io.Serializable;
import org.apache.beam.runners.spark.StreamingTest;
import org.apache.beam.runners.spark.TestSparkPipelineOptions;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.Source;
@@ -33,6 +34,7 @@
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollection;
@@ -47,7 +49,7 @@ public class StreamingSourceMetricsTest implements Serializable {
private static final MetricName ELEMENTS_READ = SourceMetrics.elementsRead().getName();

// Force streaming pipeline using pipeline rule.
@Rule public final transient TestPipeline pipeline = TestPipeline.create();
@Rule public final transient TestPipeline pipeline = TestPipeline.fromOptions(streamingOptions());

@Test
@Category(StreamingTest.class)
@@ -87,4 +89,10 @@ public void testUnboundedSourceMetrics() {
greaterThanOrEqualTo(minElements),
false)));
}

private static PipelineOptions streamingOptions() {
PipelineOptions options = TestPipeline.testingPipelineOptions();
options.as(TestSparkPipelineOptions.class).setForceStreaming(true);
return options;
}
}
