Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] Allow users to pass service name for profiler #26220

Merged
merged 15 commits into from
May 3, 2023
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
* Allow passing service name for google-cloud-profiler (Python) ([#26280](https://github.com/apache/beam/issues/26280)).

## Breaking Changes

Expand Down
10 changes: 10 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,16 @@ def validate(self, validator):

return errors

def lookup_dataflow_service_option(self, key, default=None):
    """Look up a single entry in ``self.dataflow_service_options``.

    Each entry of ``self.dataflow_service_options`` is treated as either a
    bare flag (``"key"``) or a ``"key=value"`` pair.

    Args:
      key: Name of the service option to look for.
      default: Value to return when ``key`` is present as a bare flag,
        i.e. without an ``=value`` suffix.

    Returns:
      ``default`` if ``key`` appears as a bare flag; otherwise the text
      following the first ``=`` of the first ``key=value`` entry. ``None``
      if no service options are set or ``key`` is not among them. Note that
      ``default`` is NOT returned when the option is absent, so callers can
      use the truthiness of the result to tell whether the option was given.
    """
    options = self.dataflow_service_options
    if not options:
        return None
    # A bare flag takes precedence over any ``key=value`` entry.
    if key in options:
        return default
    prefix = key + '='
    for option in options:
        if option.startswith(prefix):
            # Split only once so values may themselves contain '='.
            return option.split('=', 1)[1]
    return None


class AzureOptions(PipelineOptions):
"""Azure Blob Storage options."""
Expand Down
1 change: 0 additions & 1 deletion sdks/python/apache_beam/runners/worker/data_sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from typing import Iterable
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union

from apache_beam.coders.coder_impl import CoderImpl
Expand Down
23 changes: 12 additions & 11 deletions sdks/python/apache_beam/runners/worker/sdk_worker_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,20 +175,20 @@ def create_harness(environment, dry_run=False):

def main(unused_argv):
"""Main entry point for SDK Fn Harness."""
fn_log_handler, sdk_harness, sdk_pipeline_options = create_harness(os.environ)
experiments = sdk_pipeline_options.view_as(DebugOptions).experiments or []
dataflow_service_options = (
sdk_pipeline_options.view_as(GoogleCloudOptions).dataflow_service_options
or [])
if (_ENABLE_GOOGLE_CLOUD_PROFILER in experiments) or (
_ENABLE_GOOGLE_CLOUD_PROFILER in dataflow_service_options):
(fn_log_handler, sdk_harness,
sdk_pipeline_options) = create_harness(os.environ)
experiments = (sdk_pipeline_options.view_as(DebugOptions).experiments or [])
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
service_name = sdk_pipeline_options.view_as(
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
GoogleCloudOptions).lookup_dataflow_service_option(
_ENABLE_GOOGLE_CLOUD_PROFILER, default=os.environ["JOB_NAME"])

if ((_ENABLE_GOOGLE_CLOUD_PROFILER in experiments) or service_name):
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
try:
import googlecloudprofiler
job_id = os.environ["JOB_ID"]
job_name = os.environ["JOB_NAME"]
if job_id and job_name:
service_version = os.environ["JOB_ID"]
if service_version and service_name:
googlecloudprofiler.start(
service=job_name, service_version=job_id, verbose=1)
service=service_name, service_version=service_version, verbose=1)
_LOGGER.info('Turning on Google Cloud Profiler.')
else:
raise RuntimeError('Unable to find the job id or job name from envvar.')
Expand All @@ -199,6 +199,7 @@ def main(unused_argv):
'https://cloud.google.com/dataflow/docs/guides/profiling-a-pipeline.'
'For troubleshooting tips with Cloud Profiler see '
'https://cloud.google.com/profiler/docs/troubleshooting.' % e)

try:
_LOGGER.info('Python sdk harness starting.')
sdk_harness.run()
Expand Down