diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 47b39af8192f..f9140aed3566 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -1207,19 +1207,23 @@ def get_container_image_from_options(pipeline_options): if worker_options.sdk_container_image: return worker_options.sdk_container_image - # TODO(tvalentyn): Use enumerated type instead of strings for job types. - if _is_runner_v2(pipeline_options): - fnapi_suffix = '-fnapi' - else: - fnapi_suffix = '' - - version_suffix = '%s%s' % (sys.version_info[0:2]) - image_name = '{repository}/python{version_suffix}{fnapi_suffix}'.format( - repository=names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY, - version_suffix=version_suffix, - fnapi_suffix=fnapi_suffix) - - image_tag = _get_required_container_version(_is_runner_v2(pipeline_options)) + is_runner_v2 = _is_runner_v2(pipeline_options) + + # Legacy and runner v2 exist in different repositories. + # Set to legacy format, override if runner v2 + container_repo = names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + image_name = '{repository}/python{major}{minor}'.format( + repository=container_repo, + major=sys.version_info[0], + minor=sys.version_info[1]) + + if is_runner_v2: + image_name = '{repository}/beam_python{major}.{minor}_sdk'.format( + repository=container_repo, + major=sys.version_info[0], + minor=sys.version_info[1]) + + image_tag = _get_required_container_version(is_runner_v2) return image_name + ':' + image_tag diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index 8dfe5555bd27..21863d5f8805 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -631,8 +631,8 @@ def test_pinned_worker_harness_image_tag_used_in_dev_sdk(self): self.assertEqual( env.proto.workerPools[0].workerHarnessContainerImage, ( - names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python%d%d-fnapi:%s' % - ( + names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + + '/beam_python%d.%d_sdk:%s' % ( sys.version_info[0], sys.version_info[1], names.BEAM_FNAPI_CONTAINER_VERSION))) @@ -670,7 +670,7 @@ def test_worker_harness_image_tag_matches_released_sdk_version(self): env.proto.workerPools[0].workerHarnessContainerImage, ( names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + - '/python%d%d-fnapi:2.2.0' % + '/beam_python%d.%d_sdk:2.2.0' % (sys.version_info[0], sys.version_info[1]))) # batch, legacy pipeline. @@ -704,7 +704,7 @@ def test_worker_harness_image_tag_matches_base_sdk_version_of_an_rc(self): env.proto.workerPools[0].workerHarnessContainerImage, ( names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + - '/python%d%d-fnapi:2.2.0' % + '/beam_python%d.%d_sdk:2.2.0' % (sys.version_info[0], sys.version_info[1]))) # batch, legacy pipeline. diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py index 91fdca9ab21a..e4e03173077f 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/names.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py @@ -39,7 +39,7 @@ BEAM_CONTAINER_VERSION = 'beam-master-20230412' # Update this version to the next version whenever there is a change that # requires changes to SDK harness container or SDK harness launcher. -BEAM_FNAPI_CONTAINER_VERSION = 'beam-master-20230412' +BEAM_FNAPI_CONTAINER_VERSION = 'beam-master-20230422' DATAFLOW_CONTAINER_IMAGE_REPOSITORY = 'gcr.io/cloud-dataflow/v1beta3'