From 6da5fa6c295e10e44b2f046a9faa5612cca38af0 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 17 Mar 2022 12:17:03 -0700 Subject: [PATCH] Populate environment capabilities in v1beta3 protos. (#17042) --- .../apache_beam/runners/dataflow/internal/apiclient.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 9a9a4f9692e7..c63572e50fcd 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -288,8 +288,6 @@ def __init__( # Dataflow workers. environments_to_use = self._get_environments_from_tranforms() if _use_unified_worker(options): - python_sdk_container_image = get_container_image_from_options(options) - # Adding container images for other SDKs that may be needed for # cross-language pipelines. for id, environment in environments_to_use: @@ -303,11 +301,12 @@ def __init__( container_image = dataflow.SdkHarnessContainerImage() container_image.containerImage = container_image_url - # Currently we only set following to True for Python SDK. - # TODO: set this correctly for remote environments that might be Python. container_image.useSingleCorePerContainer = ( - container_image_url == python_sdk_container_image) + common_urns.protocols.MULTI_CORE_BUNDLE_PROCESSING in + environment.capabilities) container_image.environmentId = id + for capability in environment.capabilities: + container_image.capabilities.append(capability) pool.sdkHarnessContainerImages.append(container_image) if self.debug_options.number_of_worker_harness_threads: