From d352d6086b64bf4390fe8fb551a80892090f6be0 Mon Sep 17 00:00:00 2001 From: Victor Date: Mon, 21 Mar 2022 16:44:11 -0400 Subject: [PATCH] [BEAM-14071] Enabling Flink on Dataproc for Interactive Beam (#17044) --- .../runners/interactive/cache_manager.py | 9 +++++- .../dataproc/dataproc_cluster_manager.py | 12 +++---- .../runners/interactive/interactive_beam.py | 19 +++++++---- .../interactive/interactive_environment.py | 6 ++-- .../runners/interactive/interactive_runner.py | 5 ++- .../interactive/interactive_runner_test.py | 32 +++++++++++++++++-- 6 files changed, 60 insertions(+), 23 deletions(-) diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py index 300f3a3b5efc..1960733ba38d 100644 --- a/sdks/python/apache_beam/runners/interactive/cache_manager.py +++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py @@ -190,7 +190,14 @@ def __init__(self, cache_dir=None, cache_format='text'): def size(self, *labels): if self.exists(*labels): - return sum(os.path.getsize(path) for path in self._match(*labels)) + matched_path = self._match(*labels) + # if any matched path has a gs:// prefix, it must be cached on GCS + if 'gs://' in matched_path[0]: + from apache_beam.io.gcp import gcsio + return sum( + sum(gcsio.GcsIO().list_prefix(path).values()) + for path in matched_path) + return sum(os.path.getsize(path) for path in matched_path) return 0 def exists(self, *labels): diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py index 4a9c688f2f97..8bb8e2486022 100644 --- a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py +++ b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py @@ -27,7 +27,6 @@ from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.runners.interactive import interactive_environment as ie from apache_beam.runners.interactive.utils import progress_indicated -from apache_beam.version import __version__ as beam_version try: from google.cloud import dataproc_v1 @@ -67,9 +66,6 @@ class DataprocClusterManager: required for creating and deleting Dataproc clusters for use under Interactive Beam. """ - IMAGE_VERSION = '2.0.31-debian10' - STAGING_LOG_NAME = 'dataproc-startup-script_output' - def __init__(self, cluster_metadata: MasterURLIdentifier) -> None: """Initializes the DataprocClusterManager with properties required to interface with the Dataproc ClusterControllerClient. @@ -162,13 +158,13 @@ def create_flink_cluster(self) -> None: 'cluster_name': self.cluster_metadata.cluster_name, 'config': { 'software_config': { - 'image_version': self.IMAGE_VERSION, + 'image_version': ie.current_env().clusters. + DATAPROC_IMAGE_VERSION, 'optional_components': ['DOCKER', 'FLINK'] }, 'gce_cluster_config': { 'metadata': { - 'flink-start-yarn-session': 'true', - 'PIP_PACKAGES': 'apache-beam[gcp]=={}'.format(beam_version) + 'flink-start-yarn-session': 'true' }, 'service_account_scopes': [ 'https://www.googleapis.com/auth/cloud-platform' @@ -310,7 +306,7 @@ def get_master_url_and_dashboard( """Returns the master_url of the current cluster.""" startup_logs = [] for file in self._fs._list(staging_bucket): - if self.STAGING_LOG_NAME in file.path: + if ie.current_env().clusters.DATAPROC_STAGING_LOG_NAME in file.path: startup_logs.append(file.path) for log in startup_logs: diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py b/sdks/python/apache_beam/runners/interactive/interactive_beam.py index 6098b07ab09c..6fa1f8a3d5ec 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py @@ -37,7 +37,6 @@ from typing import DefaultDict from typing import Dict from typing import List -from typing import Mapping from typing import Optional import pandas as pd @@ -45,8 +44,6 @@ import apache_beam as beam from apache_beam.dataframe.frame_base import DeferredBase from apache_beam.runners.interactive import interactive_environment as ie -from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import DataprocClusterManager -from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import MasterURLIdentifier from apache_beam.runners.interactive.display import pipeline_graph from apache_beam.runners.interactive.display.pcoll_visualization import visualize from apache_beam.runners.interactive.display.pcoll_visualization import visualize_computed_pcoll @@ -349,17 +346,26 @@ class Clusters: Example of calling the Interactive Beam clusters describe method:: ib.clusters.describe() """ + # Explicitly set the Flink version here to ensure compatibility with 2.0 + # Dataproc images: + # https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0 + DATAPROC_FLINK_VERSION = '1.12' + DATAPROC_IMAGE_VERSION = '2.0.31-debian10' + DATAPROC_STAGING_LOG_NAME = 'dataproc-startup-script_output' + def __init__(self) -> None: """Instantiates default values for Dataproc cluster interactions. """ + # Set the default_cluster_name that will be used when creating Dataproc + # clusters. self.default_cluster_name = 'interactive-beam-cluster' # Bidirectional 1-1 mapping between master_urls (str) to cluster metadata # (MasterURLIdentifier), where self.master_urls.inverse is a mapping from # MasterURLIdentifier -> str. - self.master_urls: Mapping[str, MasterURLIdentifier] = bidict() + self.master_urls = bidict() # self.dataproc_cluster_managers map string pipeline ids to instances of # DataprocClusterManager. - self.dataproc_cluster_managers: Dict[str, DataprocClusterManager] = {} + self.dataproc_cluster_managers = {} # self.master_urls_to_pipelines map string master_urls to lists of # pipelines that use the corresponding master_url. self.master_urls_to_pipelines: DefaultDict[ @@ -457,8 +463,7 @@ def cleanup( # Examples: # ib.clusters.describe(p) # Check the docstrings for detailed usages. -# TODO(victorhc): Resolve connection issue and add a working example -# clusters = Clusters() +clusters = Clusters() def watch(watchable): diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment.py b/sdks/python/apache_beam/runners/interactive/interactive_environment.py index 325423dc88fe..5fc638cfedc6 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_environment.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_environment.py @@ -170,10 +170,8 @@ def __init__(self): self._test_stream_service_controllers = {} self._cached_source_signature = {} self._tracked_user_pipelines = UserPipelineTracker() - # TODO(victorhc): remove the cluster instantiation after the - # interactive_beam.clusters class has been enabled. - from apache_beam.runners.interactive.interactive_beam import Clusters - self.clusters = Clusters() + from apache_beam.runners.interactive.interactive_beam import clusters + self.clusters = clusters # Tracks the computation completeness of PCollections. PCollections tracked # here don't need to be re-computed when data introspection is needed. diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py index a384be9bdebe..b2da72cadf29 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py @@ -145,7 +145,10 @@ def run_pipeline(self, pipeline, options): master_url = self._get_dataproc_cluster_master_url_if_applicable( user_pipeline) if master_url: - options.view_as(FlinkRunnerOptions).flink_master = master_url + flink_options = options.view_as(FlinkRunnerOptions) + flink_options.flink_master = master_url + flink_options.flink_version = ie.current_env( + ).clusters.DATAPROC_FLINK_VERSION pipeline_instrument = inst.build_pipeline_instrument(pipeline, options) # The user_pipeline analyzed might be None if the pipeline given has nothing diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py index 47dedd6578c6..d11cf32bdf62 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py @@ -31,6 +31,7 @@ import apache_beam as beam from apache_beam.dataframe.convert import to_dataframe +from apache_beam.options.pipeline_options import FlinkRunnerOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.runners.direct import direct_runner @@ -543,12 +544,39 @@ def test_get_master_url_flink_master_provided(self): from apache_beam.runners.portability.flink_runner import FlinkRunner p = beam.Pipeline( interactive_runner.InteractiveRunner(underlying_runner=FlinkRunner()), - options=PipelineOptions( - flink_master='--flink_master=example.internal:1')) + options=PipelineOptions(flink_master='--flink_master=test.internal:1')) runner._get_dataproc_cluster_master_url_if_applicable(p) self.assertEqual(ie.current_env().clusters.describe(), {}) ie.current_env().clusters = ib.Clusters() + @unittest.skipIf( + not ie.current_env().is_interactive_ready, + '[interactive] dependency is not installed.') + @patch( + 'apache_beam.runners.interactive.interactive_runner.' + 'InteractiveRunner._get_dataproc_cluster_master_url_if_applicable', + return_value='test.internal:1') + def test_set_flink_dataproc_version(self, mock_get_master_url): + runner = interactive_runner.InteractiveRunner() + options = PipelineOptions() + p = beam.Pipeline(interactive_runner.InteractiveRunner()) + + # Watch the local scope for Interactive Beam so that values will be cached. + ib.watch(locals()) + + # This is normally done in the interactive_utils when a transform is + # applied but needs an IPython environment. So we manually run this here. + ie.current_env().track_user_pipelines() + + # Run the pipeline + runner.run_pipeline(p, options) + + # Check that the Flink version is set to the Dataproc image Flink version + # inside ib.clusters. + self.assertEqual( + options.view_as(FlinkRunnerOptions).flink_version, + ib.clusters.DATAPROC_FLINK_VERSION) + if __name__ == '__main__': unittest.main()