From c909a36e176c3309d9fe37c6276ea4794f6a8a4b Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Tue, 11 Apr 2023 10:35:18 -0400 Subject: [PATCH 01/12] handle service name for profiler --- .../runners/worker/sdk_worker_main.py | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index b643034899d0..3c74c726b98c 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -176,19 +176,24 @@ def create_harness(environment, dry_run=False): def main(unused_argv): """Main entry point for SDK Fn Harness.""" fn_log_handler, sdk_harness, sdk_pipeline_options = create_harness(os.environ) - experiments = sdk_pipeline_options.view_as(DebugOptions).experiments or [] dataflow_service_options = ( sdk_pipeline_options.view_as(GoogleCloudOptions).dataflow_service_options or []) - if (_ENABLE_GOOGLE_CLOUD_PROFILER in experiments) or ( - _ENABLE_GOOGLE_CLOUD_PROFILER in dataflow_service_options): + + exp = sdk_pipeline_options.view_as(DebugOptions).lookup_experiments( + _ENABLE_GOOGLE_CLOUD_PROFILER) + if exp or (_ENABLE_GOOGLE_CLOUD_PROFILER in dataflow_service_options): + if not isinstance(exp, bool): + # case of user passed profiler service name + service_name = exp + else: + service_name = os.environ["JOB_NAME"] + service_version = os.environ["JOB_ID"] try: import googlecloudprofiler - job_id = os.environ["JOB_ID"] - job_name = os.environ["JOB_NAME"] - if job_id and job_name: + if service_version and service_name: googlecloudprofiler.start( - service=job_name, service_version=job_id, verbose=1) + service=service_name, service_version=service_version, verbose=1) _LOGGER.info('Turning on Google Cloud Profiler.') else: raise RuntimeError('Unable to find the job id or job name from envvar.') @@ -199,6 +204,7 @@ def main(unused_argv): 'https://cloud.google.com/dataflow/docs/guides/profiling-a-pipeline.' 'For troubleshooting tips with Cloud Profiler see ' 'https://cloud.google.com/profiler/docs/troubleshooting.' % e) + try: _LOGGER.info('Python sdk harness starting.') sdk_harness.run() From 378bee482f7df50740cd72d97dde09f3649fefd0 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Wed, 12 Apr 2023 11:17:50 -0400 Subject: [PATCH 02/12] add debug messages --- .../runners/worker/sdk_worker_main.py | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 3c74c726b98c..df92e9b7a1d9 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -28,6 +28,7 @@ import traceback from google.protobuf import text_format # type: ignore # not in typeshed +import googlecloudprofiler from apache_beam.internal import pickler from apache_beam.io import filesystems @@ -175,23 +176,39 @@ def create_harness(environment, dry_run=False): def main(unused_argv): """Main entry point for SDK Fn Harness.""" - fn_log_handler, sdk_harness, sdk_pipeline_options = create_harness(os.environ) + _LOGGER.info("DEBUG: entered main") + (fn_log_handler, sdk_harness, + sdk_pipeline_options) = create_harness(os.environ) dataflow_service_options = ( sdk_pipeline_options.view_as(GoogleCloudOptions).dataflow_service_options or []) + _LOGGER.info("DEBUG: got dataflow service options") + # exp = sdk_pipeline_options.view_as(DebugOptions).lookup_experiments() + experiments = (sdk_pipeline_options.view_as(DebugOptions).experiments or []) + _LOGGER.info("DEBUG: got experiments") + exp = None + if not exp and ((_ENABLE_GOOGLE_CLOUD_PROFILER in experiments) or + (_ENABLE_GOOGLE_CLOUD_PROFILER in dataflow_service_options)): + exp = True + for experiment in experiments: + if experiment.startswith(_ENABLE_GOOGLE_CLOUD_PROFILER + '='): + exp = experiment.split('=', 1)[1] - exp = sdk_pipeline_options.view_as(DebugOptions).lookup_experiments( - _ENABLE_GOOGLE_CLOUD_PROFILER) - if exp or (_ENABLE_GOOGLE_CLOUD_PROFILER in dataflow_service_options): + _LOGGER.info("DEBUG: exp populated") + if exp: if not isinstance(exp, bool): # case of user passed profiler service name + _LOGGER.info("DEBUG: not a bool, assigning service name") service_name = exp else: service_name = os.environ["JOB_NAME"] service_version = os.environ["JOB_ID"] try: - import googlecloudprofiler + _LOGGER.info("DEBUG: gcprofiler imported") if service_version and service_name: + # _LOGGER.info( + # "PROFILER ENABLED: service_name=%s, service_version=%s".format( + # service_name, service_version)) googlecloudprofiler.start( service=service_name, service_version=service_version, verbose=1) _LOGGER.info('Turning on Google Cloud Profiler.') From 543e53b32e4eafc41d1afb6b65228e4bd15fe323 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Thu, 13 Apr 2023 16:04:04 -0400 Subject: [PATCH 03/12] remove logs --- .../apache_beam/runners/worker/sdk_worker_main.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index df92e9b7a1d9..6ad49bf485e6 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -176,16 +176,12 @@ def create_harness(environment, dry_run=False): def main(unused_argv): """Main entry point for SDK Fn Harness.""" - _LOGGER.info("DEBUG: entered main") (fn_log_handler, sdk_harness, sdk_pipeline_options) = create_harness(os.environ) dataflow_service_options = ( sdk_pipeline_options.view_as(GoogleCloudOptions).dataflow_service_options or []) - _LOGGER.info("DEBUG: got dataflow service options") - # exp = sdk_pipeline_options.view_as(DebugOptions).lookup_experiments() experiments = (sdk_pipeline_options.view_as(DebugOptions).experiments or []) - _LOGGER.info("DEBUG: got experiments") exp = None if not exp and ((_ENABLE_GOOGLE_CLOUD_PROFILER in experiments) or (_ENABLE_GOOGLE_CLOUD_PROFILER in dataflow_service_options)): @@ -194,21 +190,15 @@ def main(unused_argv): if experiment.startswith(_ENABLE_GOOGLE_CLOUD_PROFILER + '='): exp = experiment.split('=', 1)[1] - _LOGGER.info("DEBUG: exp populated") if exp: if not isinstance(exp, bool): # case of user passed profiler service name - _LOGGER.info("DEBUG: not a bool, assigning service name") service_name = exp else: service_name = os.environ["JOB_NAME"] service_version = os.environ["JOB_ID"] try: - _LOGGER.info("DEBUG: gcprofiler imported") if service_version and service_name: - # _LOGGER.info( - # "PROFILER ENABLED: service_name=%s, service_version=%s".format( - # service_name, service_version)) googlecloudprofiler.start( service=service_name, service_version=service_version, verbose=1) _LOGGER.info('Turning on Google Cloud Profiler.') From 76a5a7011052c46e75516fda9913903270128f76 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Thu, 13 Apr 2023 16:04:04 -0400 Subject: [PATCH 04/12] move import statement and logs --- .../apache_beam/runners/worker/sdk_worker_main.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index df92e9b7a1d9..cab5c1aa057a 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -28,7 +28,6 @@ import traceback from google.protobuf import text_format # type: ignore # not in typeshed -import googlecloudprofiler from apache_beam.internal import pickler from apache_beam.io import filesystems @@ -176,16 +175,12 @@ def create_harness(environment, dry_run=False): def main(unused_argv): """Main entry point for SDK Fn Harness.""" - _LOGGER.info("DEBUG: entered main") (fn_log_handler, sdk_harness, sdk_pipeline_options) = create_harness(os.environ) dataflow_service_options = ( sdk_pipeline_options.view_as(GoogleCloudOptions).dataflow_service_options or []) - _LOGGER.info("DEBUG: got dataflow service options") - # exp = sdk_pipeline_options.view_as(DebugOptions).lookup_experiments() experiments = (sdk_pipeline_options.view_as(DebugOptions).experiments or []) - _LOGGER.info("DEBUG: got experiments") exp = None if not exp and ((_ENABLE_GOOGLE_CLOUD_PROFILER in experiments) or (_ENABLE_GOOGLE_CLOUD_PROFILER in dataflow_service_options)): @@ -194,21 +189,16 @@ def main(unused_argv): if experiment.startswith(_ENABLE_GOOGLE_CLOUD_PROFILER + '='): exp = experiment.split('=', 1)[1] - _LOGGER.info("DEBUG: exp populated") if exp: if not isinstance(exp, bool): # case of user passed profiler service name - _LOGGER.info("DEBUG: not a bool, assigning service name") service_name = exp else: service_name = os.environ["JOB_NAME"] service_version = os.environ["JOB_ID"] try: - _LOGGER.info("DEBUG: gcprofiler imported") + import googlecloudprofiler if service_version and service_name: - # _LOGGER.info( - # "PROFILER ENABLED: service_name=%s, service_version=%s".format( - # service_name, service_version)) googlecloudprofiler.start( service=service_name, service_version=service_version, verbose=1) _LOGGER.info('Turning on Google Cloud Profiler.') From 9430fd2ef35642a394c6b7216601ae3b0d651eb6 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Thu, 13 Apr 2023 16:23:37 -0400 Subject: [PATCH 05/12] rm unnecessary check --- sdks/python/apache_beam/runners/worker/sdk_worker_main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 8f5fdfaa9df1..0d054c333fa0 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -182,8 +182,8 @@ def main(unused_argv): or []) experiments = (sdk_pipeline_options.view_as(DebugOptions).experiments or []) exp = None - if not exp and ((_ENABLE_GOOGLE_CLOUD_PROFILER in experiments) or - (_ENABLE_GOOGLE_CLOUD_PROFILER in dataflow_service_options)): + if ((_ENABLE_GOOGLE_CLOUD_PROFILER in experiments) or + (_ENABLE_GOOGLE_CLOUD_PROFILER in dataflow_service_options)): exp = True for experiment in experiments: if experiment.startswith(_ENABLE_GOOGLE_CLOUD_PROFILER + '='): From 46f910ed83e71739642c935b526445b33494fb10 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Fri, 14 Apr 2023 11:39:55 -0400 Subject: [PATCH 06/12] add helper to GCloudOption --- CHANGES.md | 1 + .../apache_beam/options/pipeline_options.py | 10 ++++++++++ .../runners/worker/sdk_worker_main.py | 20 ++++--------------- 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 3113c8750054..2123e195ce4d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -65,6 +65,7 @@ ## New Features / Improvements * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Allow passing service name for google-cloud-profiler (Python) ([#26280](https://github.com/apache/beam/issues/26280)). ## Breaking Changes diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index a602912cd97f..5cf7650417d4 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -876,6 +876,16 @@ def validate(self, validator): return errors + def lookup_dataflow_service_option(self, key, default=None): + if not self.dataflow_service_options: + return None + elif key in self.dataflow_service_options: + return default + for service_name in self.dataflow_service_options: + if service_name.startswith(key + '='): + return service_name.split('=', 1)[1] + return None + class AzureOptions(PipelineOptions): """Azure Blob Storage options.""" diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 0d054c333fa0..80f696fb37d4 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -177,24 +177,12 @@ def main(unused_argv): """Main entry point for SDK Fn Harness.""" (fn_log_handler, sdk_harness, sdk_pipeline_options) = create_harness(os.environ) - dataflow_service_options = ( - sdk_pipeline_options.view_as(GoogleCloudOptions).dataflow_service_options - or []) experiments = (sdk_pipeline_options.view_as(DebugOptions).experiments or []) - exp = None - if ((_ENABLE_GOOGLE_CLOUD_PROFILER in experiments) or - (_ENABLE_GOOGLE_CLOUD_PROFILER in dataflow_service_options)): - exp = True - for experiment in experiments: - if experiment.startswith(_ENABLE_GOOGLE_CLOUD_PROFILER + '='): - exp = experiment.split('=', 1)[1] + service_name = sdk_pipeline_options.view_as( + GoogleCloudOptions).lookup_dataflow_service_option( + _ENABLE_GOOGLE_CLOUD_PROFILER, default=os.environ["JOB_NAME"]) - if exp: - if not isinstance(exp, bool): - # case of user passed profiler service name - service_name = exp - else: - service_name = os.environ["JOB_NAME"] + if ((_ENABLE_GOOGLE_CLOUD_PROFILER in experiments) or service_name): try: import googlecloudprofiler service_version = os.environ["JOB_ID"] From 901c6036875bb8a5149981ea2b83f6d2ab6cf559 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Fri, 14 Apr 2023 12:38:39 -0400 Subject: [PATCH 07/12] rm lint --- sdks/python/apache_beam/runners/worker/data_sampler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/worker/data_sampler.py b/sdks/python/apache_beam/runners/worker/data_sampler.py index 2b37d0080405..7cc8152693da 100644 --- a/sdks/python/apache_beam/runners/worker/data_sampler.py +++ b/sdks/python/apache_beam/runners/worker/data_sampler.py @@ -29,7 +29,6 @@ from typing import Iterable from typing import List from typing import Optional -from typing import Tuple from typing import Union from apache_beam.coders.coder_impl import CoderImpl From c9ba755053406d8efc4e7501f6998ac9f848dcbe Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Mon, 17 Apr 2023 13:09:15 -0400 Subject: [PATCH 08/12] added new helper and unit test --- .../apache_beam/options/pipeline_options.py | 11 +++---- .../options/pipeline_options_test.py | 8 +++++ .../runners/worker/sdk_worker_main.py | 29 ++++++++++++------- 3 files changed, 32 insertions(+), 16 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 5cf7650417d4..4c9ab7b6b2f9 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -22,6 +22,7 @@ import argparse import json import logging +import os from typing import Any from typing import Callable from typing import Dict @@ -876,14 +877,14 @@ def validate(self, validator): return errors - def lookup_dataflow_service_option(self, key, default=None): + def get_cloud_profiler_service_name(self, key): if not self.dataflow_service_options: return None elif key in self.dataflow_service_options: - return default - for service_name in self.dataflow_service_options: - if service_name.startswith(key + '='): - return service_name.split('=', 1)[1] + return os.environ["JOB_NAME"] + for option_name in self.dataflow_service_options: + if option_name.startswith(key + '='): + return option_name.split('=', 1)[1] return None diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py index f83f703e33ba..12c4061f2882 100644 --- a/sdks/python/apache_beam/options/pipeline_options_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_test.py @@ -626,6 +626,14 @@ def test_lookup_experiments(self): self.assertEqual( True, debug_options.lookup_experiment('existing_experiment')) + def test_get_cloud_profiler_service_name(self): + options = PipelineOptions( + ['--dataflow_service_options=enable_google_cloud_profiler=sample']) + self.assertEqual( + 'sample', + options.view_as(GoogleCloudOptions).get_cloud_profiler_service_name( + 'enable_google_cloud_profiler')) + def test_transform_name_mapping(self): options = PipelineOptions(['--transform_name_mapping={\"from\":\"to\"}']) mapping = options.view_as(GoogleCloudOptions).transform_name_mapping diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 80f696fb37d4..274752f0f122 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -173,22 +173,21 @@ def create_harness(environment, dry_run=False): return fn_log_handler, sdk_harness, sdk_pipeline_options -def main(unused_argv): - """Main entry point for SDK Fn Harness.""" - (fn_log_handler, sdk_harness, - sdk_pipeline_options) = create_harness(os.environ) +def _start_profiler_if_enabled(sdk_pipeline_options): experiments = (sdk_pipeline_options.view_as(DebugOptions).experiments or []) - service_name = sdk_pipeline_options.view_as( - GoogleCloudOptions).lookup_dataflow_service_option( - _ENABLE_GOOGLE_CLOUD_PROFILER, default=os.environ["JOB_NAME"]) - - if ((_ENABLE_GOOGLE_CLOUD_PROFILER in experiments) or service_name): + gcp_profiler_service_name = sdk_pipeline_options.view_as( + GoogleCloudOptions).get_cloud_profiler_service_name( + _ENABLE_GOOGLE_CLOUD_PROFILER) + if (_ENABLE_GOOGLE_CLOUD_PROFILER in experiments + ) or gcp_profiler_service_name: try: import googlecloudprofiler service_version = os.environ["JOB_ID"] - if service_version and service_name: + if service_version and gcp_profiler_service_name: googlecloudprofiler.start( - service=service_name, service_version=service_version, verbose=1) + service=gcp_profiler_service_name, + service_version=service_version, + verbose=1) _LOGGER.info('Turning on Google Cloud Profiler.') else: raise RuntimeError('Unable to find the job id or job name from envvar.') @@ -200,6 +199,14 @@ def main(unused_argv): 'For troubleshooting tips with Cloud Profiler see ' 'https://cloud.google.com/profiler/docs/troubleshooting.' % e) + +def main(unused_argv): + """Main entry point for SDK Fn Harness.""" + (fn_log_handler, sdk_harness, + sdk_pipeline_options) = create_harness(os.environ) + + _start_profiler_if_enabled(sdk_pipeline_options) + try: _LOGGER.info('Python sdk harness starting.') sdk_harness.run() From 149636dca65fc0ddffbf78ed23f8d1d54eee81b3 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Fri, 21 Apr 2023 11:23:09 -0400 Subject: [PATCH 09/12] separate functions --- .../apache_beam/options/pipeline_options.py | 6 +-- .../runners/worker/sdk_worker_main.py | 47 +++++++++++-------- 2 files changed, 30 insertions(+), 23 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 4c9ab7b6b2f9..e221558583c0 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -877,13 +877,13 @@ def validate(self, validator): return errors - def get_cloud_profiler_service_name(self, key): + def get_cloud_profiler_service_name(self): if not self.dataflow_service_options: return None - elif key in self.dataflow_service_options: + elif 'enable_google_cloud_profiler' in self.dataflow_service_options: return os.environ["JOB_NAME"] for option_name in self.dataflow_service_options: - if option_name.startswith(key + '='): + if option_name.startswith('enable_google_cloud_profiler='): return option_name.split('=', 1)[1] return None diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 274752f0f122..9daf6ef703d9 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -173,31 +173,38 @@ def create_harness(environment, dry_run=False): return fn_log_handler, sdk_harness, sdk_pipeline_options +def _start_profiler(gcp_profiler_service_name, gcp_profiler_service_version): + try: + import googlecloudprofiler + if gcp_profiler_service_version: + googlecloudprofiler.start( + service=gcp_profiler_service_name, + service_version=gcp_profiler_service_version, + verbose=1) + _LOGGER.info('Turning on Google Cloud Profiler.') + else: + raise RuntimeError('Unable to find the job id from envvar.') + except Exception as e: # pylint: disable=broad-except + _LOGGER.warning( + 'Unable to start google cloud profiler due to error: %s. For how to ' + 'enable Cloud Profiler with Dataflow see ' + 'https://cloud.google.com/dataflow/docs/guides/profiling-a-pipeline.' + 'For troubleshooting tips with Cloud Profiler see ' + 'https://cloud.google.com/profiler/docs/troubleshooting.' % e) + + def _start_profiler_if_enabled(sdk_pipeline_options): experiments = (sdk_pipeline_options.view_as(DebugOptions).experiments or []) gcp_profiler_service_name = sdk_pipeline_options.view_as( GoogleCloudOptions).get_cloud_profiler_service_name( _ENABLE_GOOGLE_CLOUD_PROFILER) - if (_ENABLE_GOOGLE_CLOUD_PROFILER in experiments - ) or gcp_profiler_service_name: - try: - import googlecloudprofiler - service_version = os.environ["JOB_ID"] - if service_version and gcp_profiler_service_name: - googlecloudprofiler.start( - service=gcp_profiler_service_name, - service_version=service_version, - verbose=1) - _LOGGER.info('Turning on Google Cloud Profiler.') - else: - raise RuntimeError('Unable to find the job id or job name from envvar.') - except Exception as e: # pylint: disable=broad-except - _LOGGER.warning( - 'Unable to start google cloud profiler due to error: %s. For how to ' - 'enable Cloud Profiler with Dataflow see ' - 'https://cloud.google.com/dataflow/docs/guides/profiling-a-pipeline.' - 'For troubleshooting tips with Cloud Profiler see ' - 'https://cloud.google.com/profiler/docs/troubleshooting.' % e) + + if _ENABLE_GOOGLE_CLOUD_PROFILER in experiments and \ + not gcp_profiler_service_name: + gcp_profiler_service_name = os.environ["JOB_NAME"] + + if gcp_profiler_service_name: + _start_profiler(gcp_profiler_service_name, os.environ["JOB_ID"]) def main(unused_argv): From f4ad1a092f719ca9593371aa3e8043ffa1bfebc5 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Fri, 21 Apr 2023 15:49:57 -0400 Subject: [PATCH 10/12] updated unit tests --- .../options/pipeline_options_test.py | 8 ----- .../runners/worker/sdk_worker_main.py | 12 ++++---- .../runners/worker/sdk_worker_main_test.py | 29 +++++++++++++++++++ 3 files changed, 35 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py index 12c4061f2882..f83f703e33ba 100644 --- a/sdks/python/apache_beam/options/pipeline_options_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_test.py @@ -626,14 +626,6 @@ def test_lookup_experiments(self): self.assertEqual( True, debug_options.lookup_experiment('existing_experiment')) - def test_get_cloud_profiler_service_name(self): - options = PipelineOptions( - ['--dataflow_service_options=enable_google_cloud_profiler=sample']) - self.assertEqual( - 'sample', - options.view_as(GoogleCloudOptions).get_cloud_profiler_service_name( - 'enable_google_cloud_profiler')) - def test_transform_name_mapping(self): options = PipelineOptions(['--transform_name_mapping={\"from\":\"to\"}']) mapping = options.view_as(GoogleCloudOptions).transform_name_mapping diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 9daf6ef703d9..e6d90fbb9047 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -193,18 +193,16 @@ def _start_profiler(gcp_profiler_service_name, gcp_profiler_service_version): 'https://cloud.google.com/profiler/docs/troubleshooting.' % e) -def _start_profiler_if_enabled(sdk_pipeline_options): +def _get_gcp_profiler_name_if_enabled(sdk_pipeline_options): experiments = (sdk_pipeline_options.view_as(DebugOptions).experiments or []) gcp_profiler_service_name = sdk_pipeline_options.view_as( - GoogleCloudOptions).get_cloud_profiler_service_name( - _ENABLE_GOOGLE_CLOUD_PROFILER) + GoogleCloudOptions).get_cloud_profiler_service_name() if _ENABLE_GOOGLE_CLOUD_PROFILER in experiments and \ not gcp_profiler_service_name: gcp_profiler_service_name = os.environ["JOB_NAME"] - if gcp_profiler_service_name: - _start_profiler(gcp_profiler_service_name, os.environ["JOB_ID"]) + return gcp_profiler_service_name def main(unused_argv): @@ -212,7 +210,9 @@ def main(unused_argv): (fn_log_handler, sdk_harness, sdk_pipeline_options) = create_harness(os.environ) - _start_profiler_if_enabled(sdk_pipeline_options) + gcp_profiler_name = _get_gcp_profiler_name_if_enabled(sdk_pipeline_options) + if gcp_profiler_name: + _start_profiler(gcp_profiler_name, os.environ["JOB_ID"]) try: _LOGGER.info('Python sdk harness starting.') diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py index c7ec4220850f..2cba8d7be181 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py @@ -22,6 +22,7 @@ import io import logging import unittest +import os from hamcrest import all_of from hamcrest import assert_that @@ -205,6 +206,34 @@ def test__set_log_level_overrides_error(self): sdk_worker_main._set_log_level_overrides(overrides) self.assertIn(expected, cm.output[0]) + def test_gcp_profiler_uses_provided_service_name_when_specified(self): + options = PipelineOptions( + ['--dataflow_service_options=enable_google_cloud_profiler=sample']) + gcp_profiler_name = sdk_worker_main._get_gcp_profiler_name_if_enabled( + options) + sdk_worker_main._start_profiler = unittest.mock.MagicMock() + sdk_worker_main._start_profiler(gcp_profiler_name, "version") + sdk_worker_main._start_profiler.assert_called_with("sample", "version") + + @unittest.mock.patch.dict(os.environ, {"JOB_NAME": "sample_job"}, clear=True) + def test_gcp_profiler_uses_job_name_when_service_name_not_specified(self): + options = PipelineOptions( + ['--dataflow_service_options=enable_google_cloud_profiler']) + gcp_profiler_name = sdk_worker_main._get_gcp_profiler_name_if_enabled( + options) + sdk_worker_main._start_profiler = unittest.mock.MagicMock() + sdk_worker_main._start_profiler(gcp_profiler_name, "version") + sdk_worker_main._start_profiler.assert_called_with("sample_job", "version") + + @unittest.mock.patch.dict(os.environ, {"JOB_NAME": "sample_job"}, clear=True) + def test_gcp_profiler_uses_job_name_when_enabled_as_experiment(self): + options = PipelineOptions(['--experiment=enable_google_cloud_profiler']) + gcp_profiler_name = sdk_worker_main._get_gcp_profiler_name_if_enabled( + options) + sdk_worker_main._start_profiler = unittest.mock.MagicMock() + sdk_worker_main._start_profiler(gcp_profiler_name, "version") + sdk_worker_main._start_profiler.assert_called_with("sample_job", "version") + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) From bd979057fdaa67889d2ab58d67ebe1eb8df6cb95 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Mon, 24 Apr 2023 11:37:50 -0400 Subject: [PATCH 11/12] refactored plus check for envvar --- .../apache_beam/options/pipeline_options.py | 17 +++++++++++------ .../runners/worker/sdk_worker_main.py | 9 ++------- .../runners/worker/sdk_worker_main_test.py | 2 +- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index e221558583c0..283c2caa49da 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -878,13 +878,18 @@ def validate(self, validator): return errors def get_cloud_profiler_service_name(self): - if not self.dataflow_service_options: - return None - elif 'enable_google_cloud_profiler' in self.dataflow_service_options: + _ENABLE_GOOGLE_CLOUD_PROFILER = 'enable_google_cloud_profiler' + if self.dataflow_service_options: + if _ENABLE_GOOGLE_CLOUD_PROFILER in self.dataflow_service_options: + return os.environ["JOB_NAME"] + for option_name in self.dataflow_service_options: + if option_name.startswith(_ENABLE_GOOGLE_CLOUD_PROFILER + '='): + return option_name.split('=', 1)[1] + + experiments = self.view_as(DebugOptions).experiments or [] + if _ENABLE_GOOGLE_CLOUD_PROFILER in experiments: return os.environ["JOB_NAME"] - for option_name in self.dataflow_service_options: - if option_name.startswith('enable_google_cloud_profiler='): - return option_name.split('=', 1)[1] + return None diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index e6d90fbb9047..e0545e1f0077 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -176,14 +176,14 @@ def create_harness(environment, dry_run=False): def _start_profiler(gcp_profiler_service_name, gcp_profiler_service_version): try: import googlecloudprofiler - if gcp_profiler_service_version: + if gcp_profiler_service_name and gcp_profiler_service_version: googlecloudprofiler.start( service=gcp_profiler_service_name, service_version=gcp_profiler_service_version, verbose=1) _LOGGER.info('Turning on Google Cloud Profiler.') else: - raise RuntimeError('Unable to find the job id from envvar.') + raise RuntimeError('Unable to find the job id or job name from envvar.') except Exception as e: # pylint: disable=broad-except _LOGGER.warning( 'Unable to start google cloud profiler due to error: %s. For how to ' @@ -194,14 +194,9 @@ def _start_profiler(gcp_profiler_service_name, gcp_profiler_service_version): def _get_gcp_profiler_name_if_enabled(sdk_pipeline_options): - experiments = (sdk_pipeline_options.view_as(DebugOptions).experiments or []) gcp_profiler_service_name = sdk_pipeline_options.view_as( GoogleCloudOptions).get_cloud_profiler_service_name() - if _ENABLE_GOOGLE_CLOUD_PROFILER in experiments and \ - not gcp_profiler_service_name: - gcp_profiler_service_name = os.environ["JOB_NAME"] - return gcp_profiler_service_name diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py index 2cba8d7be181..00e09840787f 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py @@ -21,8 +21,8 @@ import io import logging -import unittest import os +import unittest from hamcrest import all_of from hamcrest import assert_that From 1706a5562a55e66245dc13c780e2cc6ec7644564 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Mon, 24 Apr 2023 11:40:35 -0400 Subject: [PATCH 12/12] updated changes.md --- CHANGES.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 1b068be47408..6b7fbc5f5fe4 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -64,7 +64,6 @@ ## New Features / Improvements -* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * Allow passing service name for google-cloud-profiler (Python) ([#26280](https://github.com/apache/beam/issues/26280)). * Dead letter queue support added to RunInference in Python ([#24209](https://github.com/apache/beam/issues/24209)).