Skip to content

Commit

Permalink
[BEAM-12094] Add Spark 3 to Python.
Browse files: browse the repository at this point in the history
  • Loading branch information
ibzib committed Jun 22, 2021
1 parent c0b8e65 commit 37e3fdf
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 7 deletions.
8 changes: 7 additions & 1 deletion sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -1314,7 +1314,8 @@ def _add_argparse_args(cls, parser):
'the execution.')
parser.add_argument(
'--spark_job_server_jar',
help='Path or URL to a Beam Spark jobserver jar.')
help='Path or URL to a Beam Spark job server jar. '
'Overrides --spark_version.')
parser.add_argument(
'--spark_submit_uber_jar',
default=False,
Expand All @@ -1327,6 +1328,11 @@ def _add_argparse_args(cls, parser):
help='URL for the Spark REST endpoint. '
'Only required when using spark_submit_uber_jar. '
'For example, http://hostname:6066')
parser.add_argument(
'--spark_version',
default='2',
choices=['2', '3'],
help='Spark major version to use.')


class TestOptions(PipelineOptions):
Expand Down
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/runners/portability/spark_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def __init__(self, options):
options = options.view_as(pipeline_options.SparkRunnerOptions)
self._jar = options.spark_job_server_jar
self._master_url = options.spark_master_url
self._spark_version = options.spark_version

def path_to_jar(self):
if self._jar:
Expand All @@ -91,6 +92,8 @@ def path_to_jar(self):
self._jar)
return self._jar
else:
if self._spark_version == '3':
return self.path_to_beam_jar(':runners:spark:3:job-server:shadowJar')
return self.path_to_beam_jar(
':runners:spark:2:job-server:shadowJar',
artifact_id='beam-runners-spark-job-server')
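Expand Down
16 changes: 10 additions & 6 deletions sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py
Original file line number Diff line number Diff line change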
Expand Up @@ -47,12 +47,12 @@ class SparkUberJarJobServer(abstract_job_service.AbstractJobServiceServicer):
def __init__(self, rest_url, options):
super(SparkUberJarJobServer, self).__init__()
self._rest_url = rest_url
self._executable_jar = (
options.view_as(
pipeline_options.SparkRunnerOptions).spark_job_server_jar)
self._artifact_port = (
options.view_as(pipeline_options.JobServerOptions).artifact_port)
self._temp_dir = tempfile.mkdtemp(prefix='apache-beam-spark')
spark_options = options.view_as(pipeline_options.SparkRunnerOptions)
self._executable_jar = spark_options.spark_job_server_jar
self._spark_version = spark_options.spark_version

def start(self):
return self
Expand All @@ -73,9 +73,13 @@ def executable_jar(self):
self._executable_jar)
url = self._executable_jar
else:
url = job_server.JavaJarJobServer.path_to_beam_jar(
':runners:spark:2:job-server:shadowJar',
artifact_id='beam-runners-spark-job-server')
if self._spark_version == '3':
url = job_server.JavaJarJobServer.path_to_beam_jar(
':runners:spark:3:job-server:shadowJar')
else:
url = job_server.JavaJarJobServer.path_to_beam_jar(
':runners:spark:2:job-server:shadowJar',
artifact_id='beam-runners-spark-job-server')
return job_server.JavaJarJobServer.local_jar(url)

def create_beam_job(self, job_id, job_name, pipeline, options):
Expand Down

0 comments on commit 37e3fdf

Please sign in to comment.