diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py index d1c2734a8c7..4a9c688f2f9 100644 --- a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py +++ b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py @@ -18,10 +18,26 @@ # pytype: skip-file import logging +import re +import time from dataclasses import dataclass from typing import Optional +from typing import Tuple +from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.runners.interactive import interactive_environment as ie +from apache_beam.runners.interactive.utils import progress_indicated +from apache_beam.version import __version__ as beam_version + +try: + from google.cloud import dataproc_v1 + from apache_beam.io.gcp import gcsfilesystem #pylint: disable=ungrouped-imports +except ImportError: + + class UnimportedDataproc: + Cluster = None + + dataproc_v1 = UnimportedDataproc() _LOGGER = logging.getLogger(__name__) @@ -51,6 +67,9 @@ class DataprocClusterManager: required for creating and deleting Dataproc clusters for use under Interactive Beam. """ + IMAGE_VERSION = '2.0.31-debian10' + STAGING_LOG_NAME = 'dataproc-startup-script_output' + def __init__(self, cluster_metadata: MasterURLIdentifier) -> None: """Initializes the DataprocClusterManager with properties required to interface with the Dataproc ClusterControllerClient. @@ -69,7 +88,6 @@ def __init__(self, cluster_metadata: MasterURLIdentifier) -> None: self.cluster_metadata.cluster_name = ie.current_env( ).clusters.default_cluster_name - from google.cloud import dataproc_v1 self._cluster_client = dataproc_v1.ClusterControllerClient( client_options={ 'api_endpoint': \ @@ -79,9 +97,16 @@ def __init__(self, cluster_metadata: MasterURLIdentifier) -> None: if self.cluster_metadata in ie.current_env().clusters.master_urls.inverse: self.master_url = ie.current_env().clusters.master_urls.inverse[ self.cluster_metadata] + self.dashboard = ie.current_env().clusters.master_urls_to_dashboards[ + self.master_url] else: self.master_url = None + self.dashboard = None + self._fs = gcsfilesystem.GCSFileSystem(PipelineOptions()) + self._staging_directory = None + + @progress_indicated def create_cluster(self, cluster: dict) -> None: """Attempts to create a cluster using attributes that were initialized with the DataprocClusterManager instance. 
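The hunks above wire the manager together: guarded GCP imports, a pinned IMAGE_VERSION, and an __init__ that resolves any known master_url and dashboard from the interactive environment. As a rough usage sketch (assuming the [gcp] extras are installed; the project, region, and cluster names below are placeholders, not values from this patch):

```python
# Hypothetical usage sketch, not part of the patch.
from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import (
    DataprocClusterManager, MasterURLIdentifier)

metadata = MasterURLIdentifier(
    project_id='my-project',  # placeholder GCP project
    region='us-central1',  # placeholder Dataproc region
    cluster_name='interactive-beam-cluster')  # placeholder cluster name
manager = DataprocClusterManager(metadata)
manager.create_flink_cluster()  # provisions a cluster with DOCKER and FLINK
print(manager.describe())  # cluster_metadata, master_url, dashboard
```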
@@ -103,7 +128,10 @@ def create_cluster(self, cluster: dict) -> None: _LOGGER.info( 'Cluster created successfully: %s', self.cluster_metadata.cluster_name) - self.master_url = self.get_master_url(self.cluster_metadata) + self._staging_directory = self.get_staging_location(self.cluster_metadata) + self.master_url, self.dashboard = self.get_master_url_and_dashboard( + self.cluster_metadata, + self._staging_directory) except Exception as e: if e.code == 409: _LOGGER.info( @@ -127,7 +155,6 @@ def create_cluster(self, cluster: dict) -> None: 'Unable to create cluster: %s', self.cluster_metadata.cluster_name) raise e - # TODO(victorhc): Add support for user-specified pip packages def create_flink_cluster(self) -> None: """Calls _create_cluster with a configuration that enables FlinkRunner.""" cluster = { @@ -135,11 +162,13 @@ def create_flink_cluster(self) -> None: 'cluster_name': self.cluster_metadata.cluster_name, 'config': { 'software_config': { + 'image_version': self.IMAGE_VERSION, 'optional_components': ['DOCKER', 'FLINK'] }, 'gce_cluster_config': { 'metadata': { - 'flink-start-yarn-session': 'true' + 'flink-start-yarn-session': 'true', + 'PIP_PACKAGES': 'apache-beam[gcp]=={}'.format(beam_version) }, 'service_account_scopes': [ 'https://www.googleapis.com/auth/cloud-platform' @@ -156,6 +185,8 @@ def cleanup(self) -> None: """Deletes the cluster that uses the attributes initialized with the DataprocClusterManager instance.""" try: + if self._staging_directory: + self.cleanup_staging_files(self._staging_directory) self._cluster_client.delete_cluster( request={ 'project_id': self.cluster_metadata.project_id, @@ -186,15 +217,111 @@ def describe(self) -> None: """Returns a dictionary describing the cluster.""" return { 'cluster_metadata': self.cluster_metadata, - 'master_url': self.master_url + 'master_url': self.master_url, + 'dashboard': self.dashboard } - def get_master_url(self, identifier) -> None: + def get_cluster_details( + self, cluster_metadata: MasterURLIdentifier) -> dataproc_v1.Cluster: + """Gets the Dataproc_v1 Cluster object for the current cluster manager.""" + try: + return self._cluster_client.get_cluster( + request={ + 'project_id': cluster_metadata.project_id, + 'region': cluster_metadata.region, + 'cluster_name': cluster_metadata.cluster_name + }) + except Exception as e: + if e.code == 403: + _LOGGER.error( + 'Due to insufficient project permissions, ' + 'unable to retrieve information for cluster: %s', + cluster_metadata.cluster_name) + raise ValueError( + 'You cannot view clusters in project: {}'.format( + cluster_metadata.project_id)) + elif e.code == 404: + _LOGGER.error( + 'Cluster does not exist: %s', cluster_metadata.cluster_name) + raise ValueError( + 'Cluster was not found: {}'.format(cluster_metadata.cluster_name)) + else: + _LOGGER.error( + 'Failed to get information for cluster: %s', + cluster_metadata.cluster_name) + raise e + + def wait_for_cluster_to_provision( + self, cluster_metadata: MasterURLIdentifier) -> None: + while self.get_cluster_details( + cluster_metadata).status.state.name == 'CREATING': + time.sleep(15) + + def get_staging_location(self, cluster_metadata: MasterURLIdentifier) -> str: + """Gets the staging bucket of an existing Dataproc cluster.""" + try: + self.wait_for_cluster_to_provision(cluster_metadata) + cluster_details = self.get_cluster_details(cluster_metadata) + bucket_name = cluster_details.config.config_bucket + gcs_path = 'gs://' + bucket_name + '/google-cloud-dataproc-metainfo/' + for file in self._fs._list(gcs_path): + if 
cluster_metadata.cluster_name in file.path:
+          # this file path split will look something like:
+          # ['gs://.../google-cloud-dataproc-metainfo/{staging_dir}/',
+          # '-{node-type}/dataproc-startup-script_output']
+          return file.path.split(cluster_metadata.cluster_name)[0]
+    except Exception as e:
+      _LOGGER.error(
+          'Failed to get %s cluster staging bucket.',
+          cluster_metadata.cluster_name)
+      raise e
+
+  def parse_master_url_and_dashboard(
+      self, cluster_metadata: MasterURLIdentifier,
+      line: str) -> Tuple[str, str]:
+    """Parses the master_url and YARN application_id of the Flink process from
+    an input line. The line containing both the master_url and application id
+    is always formatted as such:
+    {text} Found Web Interface {master_url} of application
+    '{application_id}'.\\n
+
+    Truncated example where '...' represents additional text between segments:
+    ... google-dataproc-startup[000]: ... activate-component-flink[0000]:
+    ...org.apache.flink.yarn.YarnClusterDescriptor... [] -
+    Found Web Interface example-master-url:50000 of application
+    'application_123456789000_0001'.
+
+    Returns the flink_master_url and dashboard link as a tuple."""
+    cluster_details = self.get_cluster_details(cluster_metadata)
+    yarn_endpoint = cluster_details.config.endpoint_config.http_ports[
+        'YARN ResourceManager']
+    segment = line.split('Found Web Interface ')[1].split(' of application ')
+    master_url = segment[0]
+    application_id = re.sub('\'|.\n', '', segment[1])
+    dashboard = re.sub(
+        '/yarn/',
+        '/gateway/default/yarn/proxy/' + application_id + '/',
+        yarn_endpoint)
+    return master_url, dashboard
+
+  def get_master_url_and_dashboard(
+      self, cluster_metadata: MasterURLIdentifier,
+      staging_bucket: str) -> Tuple[Optional[str], Optional[str]]:
-    """Returns the master_url of the current cluster."""
+    """Returns the master_url and dashboard of the current cluster."""
-    # TODO(victorhc): Implement the following method to fetch the cluster
-    # master_url from Dataproc.
-    return '.'.join([
-        self.cluster_metadata.project_id,
-        self.cluster_metadata.region,
-        self.cluster_metadata.cluster_name
-    ])
+    startup_logs = []
+    for file in self._fs._list(staging_bucket):
+      if self.STAGING_LOG_NAME in file.path:
+        startup_logs.append(file.path)
+
+    for log in startup_logs:
+      content = self._fs.open(log)
+      for line in content.readlines():
+        decoded_line = line.decode()
+        if 'Found Web Interface' in decoded_line:
+          return self.parse_master_url_and_dashboard(
+              cluster_metadata, decoded_line)
+    return None, None
+
+  def cleanup_staging_files(self, staging_directory: str) -> None:
+    staging_files = [file.path for file in self._fs._list(staging_directory)]
+    self._fs.delete(staging_files)
diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py
index b34641d1e53..ba59cf6c4e6 100644
--- a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py
@@ -15,25 +15,57 @@
 # limitations under the License.
 #
 
+"""Tests for apache_beam.runners.interactive.dataproc.
+dataproc_cluster_manager.""" # pytype: skip-file import unittest from unittest.mock import patch +from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import DataprocClusterManager +from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import MasterURLIdentifier + try: from google.cloud import dataproc_v1 # pylint: disable=unused-import - from apache_beam.runners.interactive.dataproc import dataproc_cluster_manager except ImportError: _dataproc_imported = False else: _dataproc_imported = True +class MockProperty: + def __init__(self, property, value): + object.__setattr__(self, property, value) + + class MockException(Exception): def __init__(self, code=-1): self.code = code +class MockCluster: + def __init__(self, config_bucket=None): + self.config = MockProperty('config_bucket', config_bucket) + self.status = MockProperty('state', MockProperty('name', None)) + + +class MockFileSystem: + def _list(self, dir=None): + return [MockProperty('path', 'test-path/dataproc-startup-script_output')] + + def open(self, dir=None): + return MockFileIO('test-line Found Web Interface test-master-url' \ + ' of application \'test-app-id\'.\n') + + +class MockFileIO: + def __init__(self, contents): + self.contents = contents + + def readlines(self): + return [self.contents.encode('utf-8')] + + @unittest.skipIf(not _dataproc_imported, 'dataproc package was not imported.') class DataprocClusterManagerTest(unittest.TestCase): """Unit test for DataprocClusterManager""" @@ -45,10 +77,9 @@ def test_create_cluster_default_already_exists(self, mock_cluster_client): Tests that no exception is thrown when a cluster already exists, but is using ie.current_env().clusters.default_cluster_name. """ - cluster_metadata = dataproc_cluster_manager.MasterURLIdentifier( + cluster_metadata = MasterURLIdentifier( project_id='test-project', region='test-region') - cluster_manager = dataproc_cluster_manager.DataprocClusterManager( - cluster_metadata) + cluster_manager = DataprocClusterManager(cluster_metadata) from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER with self.assertLogs(_LOGGER, level='INFO') as context_manager: cluster_manager.create_cluster({}) @@ -62,10 +93,9 @@ def test_create_cluster_permission_denied(self, mock_cluster_client): Tests that an exception is thrown when a user is trying to write to a project while having insufficient permissions. """ - cluster_metadata = dataproc_cluster_manager.MasterURLIdentifier( + cluster_metadata = MasterURLIdentifier( project_id='test-project', region='test-region') - cluster_manager = dataproc_cluster_manager.DataprocClusterManager( - cluster_metadata) + cluster_manager = DataprocClusterManager(cluster_metadata) from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER with self.assertLogs(_LOGGER, level='ERROR') as context_manager: self.assertRaises(ValueError, cluster_manager.create_cluster, {}) @@ -81,10 +111,9 @@ def test_create_cluster_region_does_not_exist(self, mock_cluster_client): Tests that an exception is thrown when a user specifies a region that does not exist. 
""" - cluster_metadata = dataproc_cluster_manager.MasterURLIdentifier( + cluster_metadata = MasterURLIdentifier( project_id='test-project', region='test-region') - cluster_manager = dataproc_cluster_manager.DataprocClusterManager( - cluster_metadata) + cluster_manager = DataprocClusterManager(cluster_metadata) from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER with self.assertLogs(_LOGGER, level='ERROR') as context_manager: self.assertRaises(ValueError, cluster_manager.create_cluster, {}) @@ -98,27 +127,29 @@ def test_create_cluster_other_exception(self, mock_cluster_client): Tests that an exception is thrown when the exception is not handled by any other case under _create_cluster. """ - cluster_metadata = dataproc_cluster_manager.MasterURLIdentifier( + cluster_metadata = MasterURLIdentifier( project_id='test-project', region='test-region') - cluster_manager = dataproc_cluster_manager.DataprocClusterManager( - cluster_metadata) + cluster_manager = DataprocClusterManager(cluster_metadata) from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER with self.assertLogs(_LOGGER, level='ERROR') as context_manager: self.assertRaises(MockException, cluster_manager.create_cluster, {}) self.assertTrue('Unable to create cluster' in context_manager.output[0]) + @patch( + 'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.' + 'DataprocClusterManager.cleanup_staging_files', + return_value=None) @patch( 'google.cloud.dataproc_v1.ClusterControllerClient.delete_cluster', side_effect=MockException(403)) - def test_cleanup_permission_denied(self, mock_cluster_client): + def test_cleanup_permission_denied(self, mock_cluster_client, mock_cleanup): """ Tests that an exception is thrown when a user is trying to delete a project that they have insufficient permissions for. """ - cluster_metadata = dataproc_cluster_manager.MasterURLIdentifier( + cluster_metadata = MasterURLIdentifier( project_id='test-project', region='test-region') - cluster_manager = dataproc_cluster_manager.DataprocClusterManager( - cluster_metadata) + cluster_manager = DataprocClusterManager(cluster_metadata) from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER with self.assertLogs(_LOGGER, level='ERROR') as context_manager: self.assertRaises(ValueError, cluster_manager.cleanup) @@ -126,40 +157,191 @@ def test_cleanup_permission_denied(self, mock_cluster_client): 'Due to insufficient project permissions' in context_manager.output[0]) + @patch( + 'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.' + 'DataprocClusterManager.cleanup_staging_files', + return_value=None) @patch( 'google.cloud.dataproc_v1.ClusterControllerClient.delete_cluster', side_effect=MockException(404)) - def test_cleanup_does_not_exist(self, mock_cluster_client): + def test_cleanup_does_not_exist(self, mock_cluster_client, mock_cleanup): """ Tests that an exception is thrown when cleanup attempts to delete a cluster that does not exist. 
""" - cluster_metadata = dataproc_cluster_manager.MasterURLIdentifier( + cluster_metadata = MasterURLIdentifier( project_id='test-project', region='test-region') - cluster_manager = dataproc_cluster_manager.DataprocClusterManager( - cluster_metadata) + cluster_manager = DataprocClusterManager(cluster_metadata) from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER with self.assertLogs(_LOGGER, level='ERROR') as context_manager: self.assertRaises(ValueError, cluster_manager.cleanup) self.assertTrue('Cluster does not exist' in context_manager.output[0]) + @patch( + 'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.' + 'DataprocClusterManager.cleanup_staging_files', + return_value=None) @patch( 'google.cloud.dataproc_v1.ClusterControllerClient.delete_cluster', side_effect=MockException()) - def test_cleanup_other_exception(self, mock_cluster_client): + def test_cleanup_other_exception(self, mock_cluster_client, mock_cleanup): """ Tests that an exception is thrown when the exception is not handled by any other case under cleanup. """ - cluster_metadata = dataproc_cluster_manager.MasterURLIdentifier( + cluster_metadata = MasterURLIdentifier( project_id='test-project', region='test-region') - cluster_manager = dataproc_cluster_manager.DataprocClusterManager( - cluster_metadata) + cluster_manager = DataprocClusterManager(cluster_metadata) from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER with self.assertLogs(_LOGGER, level='ERROR') as context_manager: self.assertRaises(MockException, cluster_manager.cleanup) self.assertTrue('Failed to delete cluster' in context_manager.output[0]) + @patch( + 'apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._list', + return_value=[ + MockProperty( + 'path', + 'gs://test-bucket/google-cloud-dataproc-metainfo' + '/test-cluster/item') + ]) + @patch( + 'google.cloud.dataproc_v1.ClusterControllerClient.get_cluster', + return_value=MockCluster('test-bucket')) + def test_get_staging_location(self, mock_cluster_client, mock_list): + """ + Test to receive a mock staging location successfully under + get_staging_location. + """ + cluster_metadata = MasterURLIdentifier( + project_id='test-project', + region='test-region', + cluster_name='test-cluster') + cluster_manager = DataprocClusterManager(cluster_metadata) + self.assertEqual( + cluster_manager.get_staging_location(cluster_metadata), + 'gs://test-bucket/google-cloud-dataproc-metainfo/') + + @patch( + 'google.cloud.dataproc_v1.ClusterControllerClient.get_cluster', + side_effect=MockException()) + def test_get_staging_location_exception(self, mock_cluster_client): + """ + Test to catch when an error is raised inside get_staging_location. + """ + cluster_metadata = MasterURLIdentifier( + project_id='test-project', + region='test-region', + cluster_name='test-cluster') + cluster_manager = DataprocClusterManager(cluster_metadata) + with self.assertRaises(MockException): + cluster_manager.get_staging_location(cluster_metadata) + + @patch( + 'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.' + 'DataprocClusterManager.get_cluster_details', + return_value=MockProperty( + 'config', + MockProperty( + 'endpoint_config', + MockProperty( + 'http_ports', + {'YARN ResourceManager': 'test-resource-manager/yarn/'})))) + def test_parse_master_url_and_dashboard(self, mock_cluster_details): + """ + Tests that parse_master_url_and_dashboard properly parses the input + string and produces a mock master_url and mock dashboard link. 
+ """ + cluster_metadata = MasterURLIdentifier( + project_id='test-project', region='test-region') + cluster_manager = DataprocClusterManager(cluster_metadata) + line = 'test-line Found Web Interface test-master-url' \ + ' of application \'test-app-id\'.\n' + master_url, dashboard = cluster_manager.parse_master_url_and_dashboard( + cluster_metadata, line) + self.assertEqual('test-master-url', master_url) + self.assertEqual( + 'test-resource-manager/gateway/default/yarn/proxy/test-app-id/', + dashboard) + + @patch( + 'google.cloud.dataproc_v1.ClusterControllerClient.get_cluster', + side_effect=MockException(403)) + def test_get_cluster_details_permission_denied(self, mock_cluster_client): + """ + Tests that an exception is thrown when a user is trying to get information + for a project without sufficient permissions to do so. + """ + cluster_metadata = MasterURLIdentifier( + project_id='test-project', region='test-region') + cluster_manager = DataprocClusterManager(cluster_metadata) + from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER + with self.assertLogs( + _LOGGER, + level='ERROR') as context_manager, self.assertRaises(ValueError): + cluster_manager.get_cluster_details(cluster_metadata) + self.assertTrue( + 'Due to insufficient project permissions' in + context_manager.output[0]) + + @patch( + 'google.cloud.dataproc_v1.ClusterControllerClient.get_cluster', + side_effect=MockException(404)) + def test_get_cluster_details_does_not_exist(self, mock_cluster_client): + """ + Tests that an exception is thrown when cleanup attempts to get information + for a cluster that does not exist. + """ + cluster_metadata = MasterURLIdentifier( + project_id='test-project', region='test-region') + cluster_manager = DataprocClusterManager(cluster_metadata) + from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER + with self.assertLogs( + _LOGGER, + level='ERROR') as context_manager, self.assertRaises(ValueError): + cluster_manager.get_cluster_details(cluster_metadata) + self.assertTrue('Cluster does not exist' in context_manager.output[0]) + + @patch( + 'google.cloud.dataproc_v1.ClusterControllerClient.get_cluster', + side_effect=MockException()) + def test_get_cluster_details_other_exception(self, mock_cluster_client): + """ + Tests that an exception is thrown when the exception is not handled by + any other case under get_cluster_details. + """ + cluster_metadata = MasterURLIdentifier( + project_id='test-project', region='test-region') + cluster_manager = DataprocClusterManager(cluster_metadata) + from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER + with self.assertLogs( + _LOGGER, + level='ERROR') as context_manager, self.assertRaises(MockException): + cluster_manager.get_cluster_details(cluster_metadata) + self.assertTrue( + 'Failed to get information for cluster' in context_manager.output[0]) + + @patch( + 'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.' + 'DataprocClusterManager.parse_master_url_and_dashboard', + return_value=('test-master-url', 'test-dashboard-link')) + def test_get_master_url_and_dashboard(self, mock_parse_method): + """ + Tests that get_master_url_and_dashboard detect the line containing the + unique substring which identifies the location of the master_url and + application id of the Flink master. 
+ """ + cluster_metadata = MasterURLIdentifier( + project_id='test-project', region='test-region') + cluster_manager = DataprocClusterManager(cluster_metadata) + cluster_manager._fs = MockFileSystem() + master_url, dashboard = cluster_manager.get_master_url_and_dashboard( + cluster_metadata, + 'test-staging-bucket' + ) + self.assertEqual(master_url, 'test-master-url') + self.assertEqual(dashboard, 'test-dashboard-link') + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py b/sdks/python/apache_beam/runners/interactive/interactive_beam.py index 5eab4893985..6098b07ab09 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py @@ -364,20 +364,23 @@ def __init__(self) -> None: # pipelines that use the corresponding master_url. self.master_urls_to_pipelines: DefaultDict[ str, List[beam.Pipeline]] = defaultdict(list) + # self.master_urls_to_dashboards map string master_urls to the + # corresponding Apache Flink dashboards. + self.master_urls_to_dashboards: Dict[str, str] = {} def describe(self, pipeline: Optional[beam.Pipeline] = None) -> dict: """Returns a description of the cluster associated to the given pipeline. If no pipeline is given then this returns a dictionary of descriptions for - all pipelines. + all pipelines, mapped to by id. """ description = { - ie.current_env().pipeline_id_to_pipeline(pid): dcm.describe() + pid: dcm.describe() for pid, dcm in self.dataproc_cluster_managers.items() } if pipeline: - return description.get(pipeline, None) + return description.get(str(id(pipeline)), None) return description def cleanup( @@ -419,6 +422,7 @@ def cleanup( cluster_manager.cleanup() self.master_urls.pop(master_url, None) self.master_urls_to_pipelines.pop(master_url, None) + self.master_urls_to_dashboards.pop(master_url, None) self.dataproc_cluster_managers.pop(str(id(pipeline)), None) else: cluster_manager_identifiers = set() @@ -429,6 +433,7 @@ def cleanup( self.dataproc_cluster_managers.clear() self.master_urls.clear() self.master_urls_to_pipelines.clear() + self.master_urls_to_dashboards.clear() # Users can set options to guide how Interactive Beam works. @@ -452,7 +457,7 @@ def cleanup( # Examples: # ib.clusters.describe(p) # Check the docstrings for detailed usages. -# TODO(victorhc): Implement all functionality for Clusters() +# TODO(victorhc): Resolve connection issue and add a working example # clusters = Clusters() diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py index b14848c0afc..4541463c1d9 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py @@ -306,15 +306,16 @@ def test_clusters_describe(self): region=region, )) cluster_metadata = MasterURLIdentifier(project_id=project, region=region) - clusters.dataproc_cluster_managers[p] = DataprocClusterManager( - cluster_metadata) - self.assertEqual('test-project', clusters.describe()[None] \ - ['cluster_metadata'].project_id) + clusters.dataproc_cluster_managers[str( + id(p))] = DataprocClusterManager(cluster_metadata) + self.assertEqual( + 'test-project', + clusters.describe()[str(id(p))]['cluster_metadata'].project_id) @patch( 'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.' 
- 'DataprocClusterManager.get_master_url', - return_value='test-master-url') + 'DataprocClusterManager.get_master_url_and_dashboard', + return_value=('test-master-url', None)) @patch( 'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.' 'DataprocClusterManager.cleanup', @@ -350,8 +351,8 @@ def test_clusters_cleanup_forcefully(self, mock_cleanup, mock_master_url): @patch( 'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.' - 'DataprocClusterManager.get_master_url', - return_value='test-master-url') + 'DataprocClusterManager.get_master_url_and_dashboard', + return_value=('test-master-url', None)) def test_clusters_cleanup_skip_on_duplicate(self, mock_master_url): clusters = ib.Clusters() project = 'test-project' diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py index d76d68961af..a384be9bdeb 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py @@ -27,6 +27,7 @@ import apache_beam as beam from apache_beam import runners +from apache_beam.options.pipeline_options import FlinkRunnerOptions from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.pipeline import PipelineVisitor from apache_beam.runners.direct import direct_runner @@ -137,6 +138,14 @@ def run_pipeline(self, pipeline, options): watch_sources(pipeline) user_pipeline = ie.current_env().user_pipeline(pipeline) + if user_pipeline: + # When the underlying_runner is a FlinkRunner instance, create a + # corresponding DataprocClusterManager for it if no flink_master_url + # is provided. + master_url = self._get_dataproc_cluster_master_url_if_applicable( + user_pipeline) + if master_url: + options.view_as(FlinkRunnerOptions).flink_master = master_url pipeline_instrument = inst.build_pipeline_instrument(pipeline, options) # The user_pipeline analyzed might be None if the pipeline given has nothing @@ -169,11 +178,6 @@ def exception_handler(e): ie.current_env().set_test_stream_service_controller( user_pipeline, test_stream_service) - # When the underlying_runner is a FlinkRunner instance, create a - # corresponding DataprocClusterManager for it if no flink_master_url - # is provided. - self._create_dataproc_cluster_if_applicable(user_pipeline) - pipeline_to_execute = beam.pipeline.Pipeline.from_runner_api( pipeline_instrument.instrumented_pipeline_proto(), self._underlying_runner, @@ -220,7 +224,8 @@ def visit_transform(self, transform_node): # TODO(victorhc): Move this method somewhere else if performance is impacted # by generating a cluster during runtime. - def _create_dataproc_cluster_if_applicable(self, user_pipeline): + def _get_dataproc_cluster_master_url_if_applicable( + self, user_pipeline: beam.Pipeline) -> str: """ Creates a Dataproc cluster if the provided user_pipeline is running FlinkRunner and no flink_master_url was provided as an option. A cluster is not created when a flink_master_url is detected. 
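In rough outline, the interactive_runner.py hunks above change run_pipeline so that, when the underlying runner is a FlinkRunner and the user supplied no flink_master, the resolved Dataproc master URL is injected into the pipeline options before instrumentation. A sketch of the intended entry point (mirroring the tests in this patch; the project and region values are assumed placeholders):

```python
# Hypothetical sketch of the flow these hunks implement.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners.portability.flink_runner import FlinkRunner

p = beam.Pipeline(
    InteractiveRunner(underlying_runner=FlinkRunner()),
    options=PipelineOptions(project='my-project', region='us-central1'))
# On p.run(), run_pipeline calls
# _get_dataproc_cluster_master_url_if_applicable(p); if a master_url is
# returned, it is written into
# options.view_as(FlinkRunnerOptions).flink_master.
```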
@@ -241,7 +246,6 @@ def _create_dataproc_cluster_if_applicable(self, user_pipeline): ]) """ from apache_beam.runners.portability.flink_runner import FlinkRunner - from apache_beam.options.pipeline_options import FlinkRunnerOptions flink_master = user_pipeline.options.view_as( FlinkRunnerOptions).flink_master clusters = ie.current_env().clusters @@ -264,7 +268,7 @@ def _create_dataproc_cluster_if_applicable(self, user_pipeline): cluster_metadata = MasterURLIdentifier( project_id=project_id, region=region, cluster_name=cluster_name) else: - cluster_metadata = clusters.master_urls.inverse.get(flink_master, None) + cluster_metadata = clusters.master_urls.get(flink_master, None) # else noop, no need to log anything because we allow a master_url # (not managed by us) provided by the user. if cluster_metadata: @@ -278,6 +282,9 @@ def _create_dataproc_cluster_if_applicable(self, user_pipeline): id(user_pipeline))] = cluster_manager clusters.master_urls_to_pipelines[cluster_manager.master_url].append( str(id(user_pipeline))) + clusters.master_urls_to_dashboards[ + cluster_manager.master_url] = cluster_manager.dashboard + return cluster_manager.master_url class PipelineResult(beam.runners.runner.PipelineResult): diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py index de5f1a5464d..47dedd6578c 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py @@ -37,6 +37,7 @@ from apache_beam.runners.interactive import interactive_beam as ib from apache_beam.runners.interactive import interactive_environment as ie from apache_beam.runners.interactive import interactive_runner +from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import MasterURLIdentifier from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython from apache_beam.testing.test_stream import TestStream from apache_beam.transforms.window import GlobalWindow @@ -491,7 +492,7 @@ def enter_composite_transform( 'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.' 
'DataprocClusterManager.create_flink_cluster', return_value=None) - def test_create_dataproc_cluster_no_flink_master_or_master_url( + def test_get_master_url_no_flink_master_or_provided_master_url( self, mock_create_cluster): from apache_beam.runners.portability.flink_runner import FlinkRunner runner = interactive_runner.InteractiveRunner( @@ -501,8 +502,7 @@ def test_create_dataproc_cluster_no_flink_master_or_master_url( project='test-project', region='test-region', )) - runner._create_dataproc_cluster_if_applicable(p) - ie.current_env()._tracked_user_pipelines.add_user_pipeline(p) + runner._get_dataproc_cluster_master_url_if_applicable(p) self.assertEqual( ie.current_env().clusters.describe(p)['cluster_metadata'].project_id, 'test-project') @@ -511,14 +511,41 @@ def test_create_dataproc_cluster_no_flink_master_or_master_url( @unittest.skipIf( not ie.current_env().is_interactive_ready, '[interactive] dependency is not installed.') - def test_create_dataproc_cluster_flink_master_provided(self): + def test_get_master_url_no_flink_master_and_master_url_exists(self): + from apache_beam.runners.portability.flink_runner import FlinkRunner + runner = interactive_runner.InteractiveRunner( + underlying_runner=FlinkRunner()) + p = beam.Pipeline( + options=PipelineOptions( + project='test-project', + region='test-region', + )) + cluster_name = ie.current_env().clusters.default_cluster_name + cluster_metadata = MasterURLIdentifier( + project_id='test-project', + region='test-region', + cluster_name=cluster_name) + ie.current_env().clusters.master_urls['test-url'] = cluster_metadata + ie.current_env( + ).clusters.master_urls_to_dashboards['test-url'] = 'test-dashboard' + flink_master = runner._get_dataproc_cluster_master_url_if_applicable(p) + self.assertEqual( + ie.current_env().clusters.describe(p)['cluster_metadata'].project_id, + 'test-project') + self.assertEqual( + flink_master, ie.current_env().clusters.describe(p)['master_url']) + + @unittest.skipIf( + not ie.current_env().is_interactive_ready, + '[interactive] dependency is not installed.') + def test_get_master_url_flink_master_provided(self): runner = interactive_runner.InteractiveRunner() from apache_beam.runners.portability.flink_runner import FlinkRunner p = beam.Pipeline( interactive_runner.InteractiveRunner(underlying_runner=FlinkRunner()), options=PipelineOptions( flink_master='--flink_master=example.internal:1')) - runner._create_dataproc_cluster_if_applicable(p) + runner._get_dataproc_cluster_master_url_if_applicable(p) self.assertEqual(ie.current_env().clusters.describe(), {}) ie.current_env().clusters = ib.Clusters() diff --git a/sdks/python/apache_beam/runners/interactive/utils_test.py b/sdks/python/apache_beam/runners/interactive/utils_test.py index 26734d40981..99847914ad9 100644 --- a/sdks/python/apache_beam/runners/interactive/utils_test.py +++ b/sdks/python/apache_beam/runners/interactive/utils_test.py @@ -55,9 +55,6 @@ class MockBuckets(): - def __init__(self): - pass - def Get(self, path): if path == 'test-bucket-not-found': raise HttpNotFoundError({'status': 404}, {}, '')
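Taken together, these changes let the Clusters bookkeeping back a notebook-style workflow. A sketch, assuming p is a pipeline created with an InteractiveRunner over FlinkRunner as in the tests above:

```python
# Hypothetical notebook usage; names mirror the patch.
from apache_beam.runners.interactive import interactive_beam as ib

ib.clusters.describe(p)  # {'cluster_metadata': ..., 'master_url': ..., 'dashboard': ...}
ib.clusters.describe()   # all descriptions, keyed by str(id(pipeline))
ib.clusters.cleanup(p)   # delete the cluster; drop its url and dashboard mappings
```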