Skip to content

Commit

Permalink
docs: update cluster sample (#218)
Browse files Browse the repository at this point in the history
* first steps in adding sample

* consistent formatting with create_cluster.py

* test first draft

* update_cluster sample complete

* 🦉 Updates from OwlBot

See https://github.com/googleapis/repo-automation-bots/blob/master/packages/owl-bot/README.md

* docs: add update cluster sample - fixing formatting

* Update samples/snippets/update_cluster.py

Co-authored-by: Bu Sun Kim <[email protected]>

* Update samples/snippets/update_cluster.py

Co-authored-by: Bu Sun Kim <[email protected]>

* updated test, still fine-tuning

* added get_cluster to test

* 🦉 Updates from OwlBot

See https://github.com/googleapis/repo-automation-bots/blob/master/packages/owl-bot/README.md

* another attempt at writing test

* new test pattern

* 🦉 Updates from OwlBot

See https://github.com/googleapis/repo-automation-bots/blob/master/packages/owl-bot/README.md

* updated static for new_num_instances and fixed linting error

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
Co-authored-by: Bu Sun Kim <[email protected]>
  • Loading branch information
3 people authored Aug 12, 2021
1 parent 0f29c12 commit b11a14b
Show file tree
Hide file tree
Showing 8 changed files with 212 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@

def create_cluster(project_id, region, cluster_name):
"""This sample walks a user through creating a Cloud Dataproc cluster
using the Python client library.
using the Python client library.
Args:
project_id (string): Project to use for creating resources.
region (string): Region where the resources should live.
cluster_name (string): Name to use for creating a cluster.
Args:
project_id (string): Project to use for creating resources.
region (string): Region where the resources should live.
cluster_name (string): Name to use for creating a cluster.
"""

# Create a client with the endpoint set to the desired cluster region.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@

def instantiate_inline_workflow_template(project_id, region):
"""This sample walks a user through submitting a workflow
for a Cloud Dataproc using the Python client library.
for a Cloud Dataproc using the Python client library.
Args:
project_id (string): Project to use for running the workflow.
region (string): Region where the workflow resources should live.
Args:
project_id (string): Project to use for running the workflow.
region (string): Region where the workflow resources should live.
"""

# Create a client with the endpoint set to the desired region.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ def main(project_id, region):
else:
# Use a regional gRPC endpoint. See:
# https://cloud.google.com/dataproc/docs/concepts/regional-endpoints
client_transport = cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
address="{}-dataproc.googleapis.com:443".format(region)
client_transport = (
cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
address="{}-dataproc.googleapis.com:443".format(region)
)
)
dataproc_cluster_client = dataproc_v1.ClusterControllerClient(client_transport)

Expand Down
24 changes: 13 additions & 11 deletions packages/google-cloud-dataproc/samples/snippets/submit_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@

# [START dataproc_submit_job]
import re

# [END dataproc_submit_job]
import sys

# [START dataproc_submit_job]

from google.cloud import dataproc_v1 as dataproc
Expand All @@ -33,21 +35,19 @@

def submit_job(project_id, region, cluster_name):
# Create the job client.
job_client = dataproc.JobControllerClient(client_options={
'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
})
job_client = dataproc.JobControllerClient(
client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)}
)

# Create the job config. 'main_jar_file_uri' can also be a
# Google Cloud Storage URL.
job = {
'placement': {
'cluster_name': cluster_name
"placement": {"cluster_name": cluster_name},
"spark_job": {
"main_class": "org.apache.spark.examples.SparkPi",
"jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
"args": ["1000"],
},
'spark_job': {
'main_class': 'org.apache.spark.examples.SparkPi',
'jar_file_uris': ['file:///usr/lib/spark/examples/jars/spark-examples.jar'],
'args': ['1000']
}
}

operation = job_client.submit_job_as_operation(
Expand All @@ -67,12 +67,14 @@ def submit_job(project_id, region, cluster_name):
)

print(f"Job finished successfully: {output}")


# [END dataproc_submit_job]


if __name__ == "__main__":
if len(sys.argv) < 3:
sys.exit('python submit_job.py project_id region cluster_name')
sys.exit("python submit_job.py project_id region cluster_name")

project_id = sys.argv[1]
region = sys.argv[2]
Expand Down
42 changes: 20 additions & 22 deletions packages/google-cloud-dataproc/samples/snippets/submit_job_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,24 @@
import submit_job


PROJECT_ID = os.environ['GOOGLE_CLOUD_PROJECT']
REGION = 'us-central1'
CLUSTER_NAME = 'py-sj-test-{}'.format(str(uuid.uuid4()))
PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
REGION = "us-central1"
CLUSTER_NAME = "py-sj-test-{}".format(str(uuid.uuid4()))
CLUSTER = {
'project_id': PROJECT_ID,
'cluster_name': CLUSTER_NAME,
'config': {
'master_config': {
'num_instances': 1,
'machine_type_uri': 'n1-standard-2'
},
'worker_config': {
'num_instances': 2,
'machine_type_uri': 'n1-standard-2'
}
}
"project_id": PROJECT_ID,
"cluster_name": CLUSTER_NAME,
"config": {
"master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
"worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
},
}


@pytest.fixture(autouse=True)
def setup_teardown():
cluster_client = dataproc.ClusterControllerClient(client_options={
'api_endpoint': '{}-dataproc.googleapis.com:443'.format(REGION)
})
cluster_client = dataproc.ClusterControllerClient(
client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(REGION)}
)

# Create the cluster.
operation = cluster_client.create_cluster(
Expand All @@ -54,13 +48,17 @@ def setup_teardown():

yield

cluster_client.delete_cluster(request={
"project_id": PROJECT_ID, "region": REGION, "cluster_name": CLUSTER_NAME
})
cluster_client.delete_cluster(
request={
"project_id": PROJECT_ID,
"region": REGION,
"cluster_name": CLUSTER_NAME,
}
)


def test_submit_job(capsys):
submit_job.submit_job(PROJECT_ID, REGION, CLUSTER_NAME)
out, _ = capsys.readouterr()

assert 'Job finished successfully' in out
assert "Job finished successfully" in out
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@ def download_output(project, cluster_id, output_bucket, job_id):
print("Downloading output file.")
client = storage.Client(project=project)
bucket = client.get_bucket(output_bucket)
output_blob = "google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000".format(
cluster_id, job_id
output_blob = (
"google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000".format(
cluster_id, job_id
)
)
return bucket.blob(output_blob).download_as_string()

Expand Down Expand Up @@ -230,8 +232,10 @@ def main(
region = get_region_from_zone(zone)
# Use a regional gRPC endpoint. See:
# https://cloud.google.com/dataproc/docs/concepts/regional-endpoints
client_transport = cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
address="{}-dataproc.googleapis.com:443".format(region)
client_transport = (
cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
address="{}-dataproc.googleapis.com:443".format(region)
)
)
job_transport = job_controller_grpc_transport.JobControllerGrpcTransport(
address="{}-dataproc.googleapis.com:443".format(region)
Expand Down
78 changes: 78 additions & 0 deletions packages/google-cloud-dataproc/samples/snippets/update_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This sample walks a user through updating the number of clusters using the Dataproc
# client library.

# Usage:
# python update_cluster.py --project_id <PROJECT_ID> --region <REGION> --cluster_name <CLUSTER_NAME>

import sys

# [START dataproc_update_cluster]
from google.cloud import dataproc_v1 as dataproc


def update_cluster(project_id, region, cluster_name, new_num_instances):
"""This sample walks a user through updating a Cloud Dataproc cluster
using the Python client library.
Args:
project_id (str): Project to use for creating resources.
region (str): Region where the resources should live.
cluster_name (str): Name to use for creating a cluster.
"""

# Create a client with the endpoint set to the desired cluster region.
client = dataproc.ClusterControllerClient(
client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Get cluster you wish to update.
cluster = client.get_cluster(
project_id=project_id, region=region, cluster_name=cluster_name
)

# Update number of clusters
mask = {"paths": {"config.worker_config.num_instances": str(new_num_instances)}}

# Update cluster config
cluster.config.worker_config.num_instances = new_num_instances

# Update cluster
operation = client.update_cluster(
project_id=project_id,
region=region,
cluster=cluster,
cluster_name=cluster_name,
update_mask=mask,
)

# Output a success message.
updated_cluster = operation.result()
print(f"Cluster was updated successfully: {updated_cluster.cluster_name}")


# [END dataproc_update_cluster]


if __name__ == "__main__":
if len(sys.argv) < 5:
sys.exit("python update_cluster.py project_id region cluster_name")

project_id = sys.argv[1]
region = sys.argv[2]
cluster_name = sys.argv[3]
new_num_instances = sys.argv[4]
update_cluster(project_id, region, cluster_name)
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This sample walks a user through updating the number of clusters using the Dataproc
# client library.


import os
import uuid

from google.cloud.dataproc_v1.services.cluster_controller.client import (
ClusterControllerClient,
)
import pytest

import update_cluster


PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
REGION = "us-central1"
CLUSTER_NAME = f"py-cc-test-{str(uuid.uuid4())}"
NEW_NUM_INSTANCES = 5
CLUSTER = {
"project_id": PROJECT_ID,
"cluster_name": CLUSTER_NAME,
"config": {
"master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
"worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
},
}


@pytest.fixture(autouse=True)
def setup_teardown(cluster_client):
# Create the cluster.
operation = cluster_client.create_cluster(
request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
)
operation.result()

yield

cluster_client.delete_cluster(
request={
"project_id": PROJECT_ID,
"region": REGION,
"cluster_name": CLUSTER_NAME,
}
)


@pytest.fixture
def cluster_client():
cluster_client = ClusterControllerClient(
client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(REGION)}
)
return cluster_client


def test_update_cluster(capsys, cluster_client: ClusterControllerClient):
# Wrapper function for client library function
update_cluster.update_cluster(PROJECT_ID, REGION, CLUSTER_NAME, NEW_NUM_INSTANCES)
new_num_cluster = cluster_client.get_cluster(
project_id=PROJECT_ID, region=REGION, cluster_name=CLUSTER_NAME
)

out, _ = capsys.readouterr()
assert CLUSTER_NAME in out
assert new_num_cluster.config.worker_config.num_instances == NEW_NUM_INSTANCES

0 comments on commit b11a14b

Please sign in to comment.