Skip to content

Commit

Permalink
Update dataflow python to use external containers (#26383)
Browse files Browse the repository at this point in the history
* Update dataflow python to use external containers

* Lint

* Update tests to new container format

* Lint

* remove version_suffix
  • Loading branch information
damccorm authored Apr 24, 2023
1 parent 44a17cb commit 85d4276
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 18 deletions.
30 changes: 17 additions & 13 deletions sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -1207,19 +1207,23 @@ def get_container_image_from_options(pipeline_options):
if worker_options.sdk_container_image:
return worker_options.sdk_container_image

# TODO(tvalentyn): Use enumerated type instead of strings for job types.
if _is_runner_v2(pipeline_options):
fnapi_suffix = '-fnapi'
else:
fnapi_suffix = ''

version_suffix = '%s%s' % (sys.version_info[0:2])
image_name = '{repository}/python{version_suffix}{fnapi_suffix}'.format(
repository=names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY,
version_suffix=version_suffix,
fnapi_suffix=fnapi_suffix)

image_tag = _get_required_container_version(_is_runner_v2(pipeline_options))
is_runner_v2 = _is_runner_v2(pipeline_options)

# Legacy and runner v2 exist in different repositories.
# Set to legacy format, override if runner v2
container_repo = names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY
image_name = '{repository}/python{major}{minor}'.format(
repository=container_repo,
major=sys.version_info[0],
minor=sys.version_info[1])

if is_runner_v2:
image_name = '{repository}/beam_python{major}.{minor}_sdk'.format(
repository=container_repo,
major=sys.version_info[0],
minor=sys.version_info[1])

image_tag = _get_required_container_version(is_runner_v2)
return image_name + ':' + image_tag


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -631,8 +631,8 @@ def test_pinned_worker_harness_image_tag_used_in_dev_sdk(self):
self.assertEqual(
env.proto.workerPools[0].workerHarnessContainerImage,
(
names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python%d%d-fnapi:%s' %
(
names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
'/beam_python%d.%d_sdk:%s' % (
sys.version_info[0],
sys.version_info[1],
names.BEAM_FNAPI_CONTAINER_VERSION)))
Expand Down Expand Up @@ -670,7 +670,7 @@ def test_worker_harness_image_tag_matches_released_sdk_version(self):
env.proto.workerPools[0].workerHarnessContainerImage,
(
names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
'/python%d%d-fnapi:2.2.0' %
'/beam_python%d.%d_sdk:2.2.0' %
(sys.version_info[0], sys.version_info[1])))

# batch, legacy pipeline.
Expand Down Expand Up @@ -704,7 +704,7 @@ def test_worker_harness_image_tag_matches_base_sdk_version_of_an_rc(self):
env.proto.workerPools[0].workerHarnessContainerImage,
(
names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
'/python%d%d-fnapi:2.2.0' %
'/beam_python%d.%d_sdk:2.2.0' %
(sys.version_info[0], sys.version_info[1])))

# batch, legacy pipeline.
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/runners/dataflow/internal/names.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
BEAM_CONTAINER_VERSION = 'beam-master-20230412'
# Update this version to the next version whenever there is a change that
# requires changes to SDK harness container or SDK harness launcher.
BEAM_FNAPI_CONTAINER_VERSION = 'beam-master-20230412'
BEAM_FNAPI_CONTAINER_VERSION = 'beam-master-20230422'

DATAFLOW_CONTAINER_IMAGE_REPOSITORY = 'gcr.io/cloud-dataflow/v1beta3'

Expand Down

0 comments on commit 85d4276

Please sign in to comment.