Use Spark 3 job-server as default Spark job-server for PortableRunner (addresses #23728)
Moritz Mack committed Oct 20, 2022
1 parent 3ab9507 commit 7669a0b
Showing 9 changed files with 31 additions and 30 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
@@ -76,6 +76,8 @@
 ## Breaking Changes
 
 * Python SDK CoGroupByKey outputs an iterable allowing for arbitrarily large results. [#21556](https://github.com/apache/beam/issues/21556) Beam users may see an error on transforms downstream from CoGroupByKey. Users must change methods expecting a List to expect an Iterable going forward. See [document](https://docs.google.com/document/d/1RIzm8-g-0CyVsPb6yasjwokJQFoKHG4NjRUcKHKINu0) for information and fixes.
+* The PortableRunner for Spark assumes Spark 3 as default Spark major version unless configured otherwise using `--spark_version`.
+  Spark 2 support is deprecated and will be removed soon ([#23728](https://github.com/apache/beam/issues/23728)).
 
 ## Deprecations
 
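As a quick illustration of the entry above: after this commit a portable Spark pipeline targets the Spark 3 job server unless told otherwise. A minimal sketch, assuming only the `--spark_version` flag defined in `pipeline_options.py` below (not part of this commit):

```python
# Sketch: pin the deprecated Spark 2 job server explicitly.
# Omitting --spark_version now selects Spark 3 by default.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=SparkRunner',
    '--spark_version=2',  # deprecated; slated for removal (#23728)
])
```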
@@ -1919,9 +1919,7 @@ class BeamModulePlugin implements Plugin<Project> {
       }
 
       if (runner?.equalsIgnoreCase('spark')) {
-        testRuntimeOnly it.project(path: ":runners:spark:2", configuration: "testRuntimeMigration")
-        testRuntimeOnly project.library.java.spark_core
-        testRuntimeOnly project.library.java.spark_streaming
+        testRuntimeOnly it.project(path: ":runners:spark:3", configuration: "testRuntimeMigration")
 
         // Testing the Spark runner causes a StackOverflowError if slf4j-jdk14 is on the classpath
         project.configurations.testRuntimeClasspath {
@@ -2679,7 +2677,7 @@ class BeamModulePlugin implements Plugin<Project> {
       dependsOn = [installGcpTest]
       mustRunAfter = [
         ":runners:flink:${project.ext.latestFlinkVersion}:job-server:shadowJar",
-        ':runners:spark:2:job-server:shadowJar',
+        ':runners:spark:3:job-server:shadowJar',
         ':sdks:python:container:py37:docker',
         ':sdks:python:container:py38:docker',
         ':sdks:python:container:py39:docker',
@@ -2695,7 +2693,7 @@
"--parallelism=2",
"--sdk_worker_parallelism=1",
"--flink_job_server_jar=${project.project(flinkJobServerProject).shadowJar.archivePath}",
"--spark_job_server_jar=${project.project(':runners:spark:2:job-server').shadowJar.archivePath}",
"--spark_job_server_jar=${project.project(':runners:spark:3:job-server').shadowJar.archivePath}",
]
if (isStreaming)
options += [
4 changes: 2 additions & 2 deletions sdks/go/test/build.gradle
@@ -104,15 +104,15 @@ task sparkValidatesRunner {

dependsOn ":sdks:go:test:goBuild"
dependsOn ":sdks:java:container:java8:docker"
dependsOn ":runners:spark:2:job-server:shadowJar"
dependsOn ":runners:spark:3:job-server:shadowJar"
dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar"
doLast {
def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags.
"--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
]
def options = [
"--runner spark",
"--spark_job_server_jar ${project(":runners:spark:2:job-server").shadowJar.archivePath}",
"--spark_job_server_jar ${project(":runners:spark:3:job-server").shadowJar.archivePath}",
"--pipeline_opts \"${pipelineOptions.join(' ')}\"",
]
exec {
7 changes: 4 additions & 3 deletions sdks/python/apache_beam/options/pipeline_options.py
@@ -1497,9 +1497,10 @@ def _add_argparse_args(cls, parser):
         'For example, http://hostname:6066')
     parser.add_argument(
         '--spark_version',
-        default='2',
-        choices=['2', '3'],
-        help='Spark major version to use.')
+        default='3',
+        choices=['3', '2'],
+        help='Spark major version to use. '
+        'Note, Spark 2 support is deprecated')
 
 
 class TestOptions(PipelineOptions):
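To see the new default from user code, a hedged sketch using the `SparkRunnerOptions` view this parser belongs to (not part of this commit):

```python
# Sketch: with no --spark_version flag, the parser above now yields '3'.
from apache_beam.options.pipeline_options import (
    PipelineOptions, SparkRunnerOptions)

spark_options = PipelineOptions([]).view_as(SparkRunnerOptions)
print(spark_options.spark_version)  # '3' after this commit (was '2')
```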
12 changes: 6 additions & 6 deletions sdks/python/apache_beam/runners/portability/spark_runner.py
@@ -88,15 +88,15 @@ def path_to_jar(self):
           'Unable to parse jar URL "%s". If using a full URL, make sure '
           'the scheme is specified. If using a local file path, make sure '
           'the file exists; you may have to first build the job server '
-          'using `./gradlew runners:spark:2:job-server:shadowJar`.' %
+          'using `./gradlew runners:spark:3:job-server:shadowJar`.' %
           self._jar)
       return self._jar
     else:
-      if self._spark_version == '3':
-        return self.path_to_beam_jar(':runners:spark:3:job-server:shadowJar')
-      return self.path_to_beam_jar(
-          ':runners:spark:2:job-server:shadowJar',
-          artifact_id='beam-runners-spark-job-server')
+      if self._spark_version == '2':
+        return self.path_to_beam_jar(
+            ':runners:spark:2:job-server:shadowJar',
+            artifact_id='beam-runners-spark-job-server')
+      return self.path_to_beam_jar(':runners:spark:3:job-server:shadowJar')
 
   def java_arguments(
       self, job_port, artifact_port, expansion_port, artifacts_dir):
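Note that version-based resolution only kicks in when no jar is given; a prebuilt job-server jar still wins. A hedged sketch (the jar path is a placeholder):

```python
# Sketch: an explicit --spark_job_server_jar bypasses the
# spark_version branch in path_to_jar entirely.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=SparkRunner',
    '--spark_job_server_jar=/opt/jars/beam-runners-spark-3-job-server.jar',
])
```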
@@ -84,7 +84,7 @@ def parse_options(self, request):
     self.set_spark_job_server_jar(
         known_args.spark_job_server_jar or
         job_server.JavaJarJobServer.path_to_beam_jar(
-            ':runners:spark:2:job-server:shadowJar'))
+            ':runners:spark:3:job-server:shadowJar'))
     self.environment_type = known_args.environment_type
     self.environment_options = known_args.environment_options
 
@@ -69,17 +69,17 @@ def executable_jar(self):
             'Unable to parse jar URL "%s". If using a full URL, make sure '
             'the scheme is specified. If using a local file path, make sure '
             'the file exists; you may have to first build the job server '
-            'using `./gradlew runners:spark:2:job-server:shadowJar`.' %
+            'using `./gradlew runners:spark:3:job-server:shadowJar`.' %
             self._executable_jar)
       url = self._executable_jar
     else:
-      if self._spark_version == '3':
-        url = job_server.JavaJarJobServer.path_to_beam_jar(
-            ':runners:spark:3:job-server:shadowJar')
-      else:
+      if self._spark_version == '2':
         url = job_server.JavaJarJobServer.path_to_beam_jar(
             ':runners:spark:2:job-server:shadowJar',
             artifact_id='beam-runners-spark-job-server')
+      else:
+        url = job_server.JavaJarJobServer.path_to_beam_jar(
+            ':runners:spark:3:job-server:shadowJar')
     return job_server.JavaJarJobServer.local_jar(url)
 
   def create_beam_job(self, job_id, job_name, pipeline, options):
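For reference, a hedged sketch of the default path through the two helpers used above when no explicit jar is supplied (not part of this commit):

```python
# Sketch: resolve and cache the default Spark 3 job-server jar the way
# executable_jar now does when spark_version is left at its default.
from apache_beam.runners.portability import job_server

url = job_server.JavaJarJobServer.path_to_beam_jar(
    ':runners:spark:3:job-server:shadowJar')  # new default Gradle target
local_path = job_server.JavaJarJobServer.local_jar(url)  # downloaded/cached copy
```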
14 changes: 7 additions & 7 deletions sdks/python/test-suites/portable/common.gradle
@@ -172,15 +172,15 @@ task samzaValidatesRunner() {

 def createSparkRunnerTestTask(String workerType) {
   def taskName = "sparkCompatibilityMatrix${workerType}"
-  // `project(':runners:spark:2:job-server').shadowJar.archivePath` is not resolvable until runtime, so hard-code it here.
-  def jobServerJar = "${rootDir}/runners/spark/2/job-server/build/libs/beam-runners-spark-job-server-${version}.jar"
+  // `project(':runners:spark:3:job-server').shadowJar.archivePath` is not resolvable until runtime, so hard-code it here.
+  def jobServerJar = "${rootDir}/runners/spark/3/job-server/build/libs/beam-runners-spark-3-job-server-${version}.jar"
   def options = "--spark_job_server_jar=${jobServerJar} --environment_type=${workerType}"
   if (workerType == 'PROCESS') {
     options += " --environment_options=process_command=${buildDir.absolutePath}/sdk_worker.sh"
   }
   def task = toxTask(taskName, 'spark-runner-test', options)
   task.configure {
-    dependsOn ':runners:spark:2:job-server:shadowJar'
+    dependsOn ':runners:spark:3:job-server:shadowJar'
     if (workerType == 'DOCKER') {
       dependsOn pythonContainerTask
     } else if (workerType == 'PROCESS') {
@@ -208,7 +208,7 @@ project.tasks.register("preCommitPy${pythonVersionSuffix}") {
project.tasks.register("postCommitPy${pythonVersionSuffix}") {
dependsOn = ['setupVirtualenv',
"postCommitPy${pythonVersionSuffix}IT",
':runners:spark:2:job-server:shadowJar',
':runners:spark:3:job-server:shadowJar',
'portableLocalRunnerJuliaSetWithSetupPy',
'portableWordCountSparkRunnerBatch',
'portableLocalRunnerTestWithRequirementsFile']
@@ -248,13 +248,13 @@ project.tasks.register("sparkExamples") {
   dependsOn = [
     'setupVirtualenv',
     'installGcpTest',
-    ':runners:spark:2:job-server:shadowJar'
+    ':runners:spark:3:job-server:shadowJar'
   ]
   doLast {
     def testOpts = [
       "--log-cli-level=INFO",
     ]
-    def jobServerJar = "${rootDir}/runners/spark/2/job-server/build/libs/beam-runners-spark-job-server-${version}.jar"
+    def jobServerJar = "${rootDir}/runners/spark/3/job-server/build/libs/beam-runners-spark-3-job-server-${version}.jar"
     def pipelineOpts = [
       "--runner=SparkRunner",
       "--project=apache-beam-testing",
@@ -388,7 +388,7 @@ def addTestJavaJarCreator(String runner, Task jobServerJarTask) {

 // TODO(BEAM-11333) Update and test multiple Flink versions.
 addTestJavaJarCreator("FlinkRunner", tasks.getByPath(":runners:flink:${latestFlinkVersion}:job-server:shadowJar"))
-addTestJavaJarCreator("SparkRunner", tasks.getByPath(":runners:spark:2:job-server:shadowJar"))
+addTestJavaJarCreator("SparkRunner", tasks.getByPath(":runners:spark:3:job-server:shadowJar"))
 
 def addTestFlinkUberJar(boolean saveMainSession) {
   project.tasks.register("testUberJarFlinkRunner${saveMainSession ? 'SaveMainSession' : ''}") {
2 changes: 1 addition & 1 deletion website/www/site/content/en/documentation/runners/spark.md
@@ -293,7 +293,7 @@ python -m apache_beam.examples.wordcount \
 - `--runner`(required): `SparkRunner`.
 - `--output_executable_path`(required): path for the bundle jar to be created.
 - `--output`(required): where output shall be written.
-- `--spark_version`(optional): select spark version 2 (default) or 3.
+- `--spark_version`(optional): select spark version 3 (default) or 2 (deprecated!).
 
 5. Submit spark job to Dataproc cluster's master node.

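Putting the documented flags together, a hedged end-to-end sketch (input, output, and jar paths are placeholders; not part of this commit):

```python
# Sketch: build a wordcount bundle jar for Spark, mirroring the flags
# documented above. --spark_version=3 is now redundant (it is the default).
import subprocess

subprocess.run([
    'python', '-m', 'apache_beam.examples.wordcount',
    '--input=/tmp/input.txt',
    '--output=/tmp/counts',
    '--runner=SparkRunner',
    '--output_executable_path=/tmp/wordcount-spark3-bundle.jar',
    '--spark_version=3',
], check=True)
```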
