DataProc commands implementation #862

Merged 4 commits on Mar 9, 2019
2 changes: 1 addition & 1 deletion component_sdk/python/kfp_component/google/__init__.py
@@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

-from . import ml_engine, dataflow
+from . import ml_engine, dataflow, dataproc
3 changes: 2 additions & 1 deletion component_sdk/python/kfp_component/google/common/__init__.py
@@ -12,4 +12,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.

-from ._utils import normalize_name, dump_file, check_resource_changed
+from ._utils import (normalize_name, dump_file,
+                     check_resource_changed, wait_operation_done)
31 changes: 31 additions & 0 deletions component_sdk/python/kfp_component/google/common/_utils.py
@@ -15,6 +15,7 @@
import logging
import re
import os
import time

def normalize_name(name,
        valid_first_char_pattern='a-zA-Z',
@@ -90,3 +91,33 @@ def check_resource_changed(requested_resource,
            return True
    return False

def wait_operation_done(get_operation, wait_interval):
    """Waits for an operation to be done.

    Args:
        get_operation: a zero-argument callable that fetches and
            returns the current operation resource.
        wait_interval: the number of seconds to wait between polls
            of the operation status.

    Returns:
        The completed operation.
    """
    operation = None
    while True:
        operation = get_operation()
        operation_name = operation.get('name')
        done = operation.get('done', False)
        if done:
            break
        logging.info('Operation {} is not done. Wait for {}s.'.format(
            operation_name, wait_interval))
        time.sleep(wait_interval)
    error = operation.get('error', None)
    if error:
        raise RuntimeError('Failed to complete operation {}: {} {}'.format(
            operation_name,
            error.get('code', 'Unknown code'),
            error.get('message', 'Unknown message'),
        ))
    return operation
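
For reference, a minimal sketch of how this helper is driven; the `get_status` stub below is hypothetical and simply reports the operation as done on its third poll:

state = {'polls': 0}

def get_status():
    # Hypothetical stub: pretends the operation finishes on the third poll.
    state['polls'] += 1
    return {'name': 'op-123', 'done': state['polls'] >= 3}

operation = wait_operation_done(get_status, wait_interval=1)
assert operation.get('done')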

23 changes: 23 additions & 0 deletions component_sdk/python/kfp_component/google/dataproc/__init__.py
@@ -0,0 +1,23 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ._create_cluster import create_cluster
from ._delete_cluster import delete_cluster
from ._submit_job import submit_job
from ._submit_pyspark_job import submit_pyspark_job
from ._submit_spark_job import submit_spark_job
from ._submit_sparksql_job import submit_sparksql_job
from ._submit_hadoop_job import submit_hadoop_job
from ._submit_hive_job import submit_hive_job
from ._submit_pig_job import submit_pig_job
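
These re-exports define the package's public surface. A hedged sketch of how a caller is expected to use it (argument values are placeholders):

from kfp_component.google import dataproc

# Placeholder project and region; blocks until the create operation finishes.
cluster = dataproc.create_cluster('my-project', 'us-central1',
    name_prefix='demo', wait_interval=30)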
109 changes: 109 additions & 0 deletions component_sdk/python/kfp_component/google/dataproc/_client.py
@@ -0,0 +1,109 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import time

import googleapiclient.discovery as discovery
from ..common import wait_operation_done

class DataprocClient:
    """Internal client for calling Dataproc APIs."""

    def __init__(self):
        self._dataproc = discovery.build('dataproc', 'v1')

    def create_cluster(self, project_id, region, cluster, request_id):
        """Creates a new Dataproc cluster."""
        return self._dataproc.projects().regions().clusters().create(
            projectId=project_id,
            region=region,
            requestId=request_id,
            body=cluster
        ).execute()

    def get_cluster(self, project_id, region, name):
        """Gets the resource representation for a cluster in a project."""
        return self._dataproc.projects().regions().clusters().get(
            projectId=project_id,
            region=region,
            clusterName=name
        ).execute()

    def delete_cluster(self, project_id, region, name, request_id):
        """Deletes a cluster in a project."""
        return self._dataproc.projects().regions().clusters().delete(
            projectId=project_id,
            region=region,
            clusterName=name,
            requestId=request_id
        ).execute()

    def submit_job(self, project_id, region, job, request_id):
        """Submits a job to a cluster."""
        return self._dataproc.projects().regions().jobs().submit(
            projectId=project_id,
            region=region,
            body={
                'job': job,
                'requestId': request_id
            }
        ).execute()

    def get_job(self, project_id, region, job_id):
        """Gets the details of a job."""
        return self._dataproc.projects().regions().jobs().get(
            projectId=project_id,
            region=region,
            jobId=job_id
        ).execute()

    def get_operation(self, operation_name):
        """Gets an operation by name."""
        return self._dataproc.projects().regions().operations().get(
            name=operation_name
        ).execute()

    def wait_for_operation_done(self, operation_name, wait_interval):
        """Waits for an operation to be done.

        Args:
            operation_name: the name of the operation.
            wait_interval: the number of seconds to wait between polls
                of the operation status.

        Returns:
            The completed operation.
        """
        return wait_operation_done(
            lambda: self.get_operation(operation_name), wait_interval)

    def cancel_operation(self, operation_name):
        """Cancels an operation.

        Args:
            operation_name: the name of the operation.
        """
        if not operation_name:
            return

        self._dataproc.projects().regions().operations().cancel(
            name=operation_name
        ).execute()
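
A hedged end-to-end sketch of the client (assumes application default credentials are available to `googleapiclient`; all identifiers are placeholders):

client = DataprocClient()
operation = client.create_cluster('my-project', 'us-central1',
    cluster={'clusterName': 'demo-cluster'}, request_id='request-001')
# Poll the long-running operation every 30s until it is done.
operation = client.wait_for_operation_done(operation.get('name'), 30)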
109 changes: 109 additions & 0 deletions component_sdk/python/kfp_component/google/dataproc/_create_cluster.py
@@ -0,0 +1,109 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json

from fire import decorators
from ._client import DataprocClient
from kfp_component.core import KfpExecutionContext, display
from .. import common as gcp_common

@decorators.SetParseFns(image_version=str)
def create_cluster(project_id, region, name=None, name_prefix=None,
    initialization_actions=None, config_bucket=None, image_version=None,
    cluster=None, wait_interval=30):
    """Creates a DataProc cluster under a project.

    Args:
        project_id (str): Required. The ID of the Google Cloud Platform project
            that the cluster belongs to.
        region (str): Required. The Cloud Dataproc region in which to handle the
            request.
        name (str): Optional. The cluster name. Cluster names within a project
            must be unique. Names of deleted clusters can be reused.
        name_prefix (str): Optional. The prefix of the cluster name.
        initialization_actions (list): Optional. List of GCS URIs of executables
            to execute on each node after config is completed. By default,
            executables are run on master and all worker nodes.
        config_bucket (str): Optional. A Google Cloud Storage bucket used to
            stage job dependencies, config files, and job driver console output.
        image_version (str): Optional. The version of software inside the cluster.
        cluster (dict): Optional. The full cluster config. See [full details](
            https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#Cluster)
        wait_interval (int): The number of seconds to wait between polls of the
            operation. Defaults to 30.

    Returns:
        The created cluster object.

    Output Files:
        $KFP_OUTPUT_PATH/dataproc/cluster_name.txt: The cluster name of the
            created cluster.
    """
    if not cluster:
        cluster = {}
    cluster['projectId'] = project_id
    if 'config' not in cluster:
        cluster['config'] = {}
    if name:
        cluster['clusterName'] = name
    if initialization_actions:
        cluster['config']['initializationActions'] = list(
            map(lambda file: {
                'executableFile': file
            }, initialization_actions)
        )
    if config_bucket:
        cluster['config']['configBucket'] = config_bucket
    if image_version:
        if 'softwareConfig' not in cluster['config']:
            cluster['config']['softwareConfig'] = {}
        cluster['config']['softwareConfig']['imageVersion'] = image_version

    return _create_cluster_internal(project_id, region, cluster, name_prefix,
        wait_interval)

def _create_cluster_internal(project_id, region, cluster, name_prefix,
    wait_interval):
    client = DataprocClient()
    operation_name = None
    with KfpExecutionContext(
        on_cancel=lambda: client.cancel_operation(operation_name)) as ctx:
        _set_cluster_name(cluster, ctx.context_id(), name_prefix)
        _dump_metadata(cluster, region)
        operation = client.create_cluster(project_id, region, cluster,
            request_id=ctx.context_id())
        operation_name = operation.get('name')
        operation = client.wait_for_operation_done(operation_name,
            wait_interval)
        return _dump_cluster(operation.get('response'))

def _set_cluster_name(cluster, context_id, name_prefix):
    if 'clusterName' in cluster:
        return
    if not name_prefix:
        name_prefix = 'cluster'
    cluster['clusterName'] = name_prefix + '-' + context_id

def _dump_metadata(cluster, region):
    display.display(display.Link(
        'https://console.cloud.google.com/dataproc/clusters/{}?project={}&region={}'.format(
            cluster.get('clusterName'), cluster.get('projectId'), region),
        'Cluster Details'
    ))
def _dump_cluster(cluster):
    gcp_common.dump_file('/tmp/kfp/output/dataproc/cluster.json',
        json.dumps(cluster))
    gcp_common.dump_file('/tmp/kfp/output/dataproc/cluster_name.txt',
        cluster.get('clusterName'))
    return cluster

Review comments on the hard-coded output paths above:

Contributor: Can we avoid hard-coding any paths inside the library/package? If you want to hard-code them, can you please hard-code them at the component level (in component.yaml) so that they can be fixed or modified without having to change the source code or the container?

Contributor (author): Yes, I have another PR that makes the KFP output dir configurable. Do you mind if it is sent later than this one?

Contributor: Sure. Thank you for doing this. I know that you're not a fan of configurable paths, so I really appreciate what you're doing.
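
To make the request shape concrete, here is a sketch of the cluster body that `create_cluster` assembles from the simple arguments before handing it to the API (all values are placeholders; `clusterName` comes from `_set_cluster_name` when `name` is omitted):

# create_cluster('my-project', 'us-central1',
#     initialization_actions=['gs://my-bucket/init.sh'],
#     config_bucket='my-staging-bucket', image_version='1.2')
# builds a body equivalent to:
cluster = {
    'projectId': 'my-project',
    'clusterName': 'cluster-<context-id>',
    'config': {
        'initializationActions': [{'executableFile': 'gs://my-bucket/init.sh'}],
        'configBucket': 'my-staging-bucket',
        'softwareConfig': {'imageVersion': '1.2'},
    },
}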
39 changes: 39 additions & 0 deletions component_sdk/python/kfp_component/google/dataproc/_delete_cluster.py
@@ -0,0 +1,39 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ._client import DataprocClient
from kfp_component.core import KfpExecutionContext

def delete_cluster(project_id, region, name, wait_interval=30):
    """Deletes a DataProc cluster.

    Args:
        project_id (str): Required. The ID of the Google Cloud Platform project
            that the cluster belongs to.
        region (str): Required. The Cloud Dataproc region in which to handle the
            request.
        name (str): Required. The cluster name to delete.
        wait_interval (int): The number of seconds to wait between polls of the
            operation. Defaults to 30.
    """
    client = DataprocClient()
    operation_name = None
    with KfpExecutionContext(
        on_cancel=lambda: client.cancel_operation(operation_name)) as ctx:
        operation = client.delete_cluster(project_id, region, name,
            request_id=ctx.context_id())
        operation_name = operation.get('name')
        return client.wait_for_operation_done(operation_name,
            wait_interval)
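
A usage sketch with placeholder values; like `create_cluster`, the call blocks until the operation resolves, and the `on_cancel` hook cancels it if the pipeline step is cancelled:

from kfp_component.google.dataproc import delete_cluster

# Placeholder identifiers; polls the delete operation every 60s.
delete_cluster('my-project', 'us-central1', 'demo-cluster', wait_interval=60)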