Update new Watson OpenScale components and pipeline #1287

Merged · 5 commits · May 10, 2019
@@ -1,7 +1,7 @@
FROM python:3.6.8-stretch
FROM aipipeline/pyspark:spark-2.1

RUN pip install --upgrade pip
RUN pip install --upgrade watson-machine-learning-client ibm-ai-openscale --no-cache | tail -n 1
RUN pip install --upgrade Minio --no-cache | tail -n 1
RUN pip install psycopg2-binary | tail -n 1

ENV APP_HOME /app
@@ -11,4 +11,4 @@ WORKDIR $APP_HOME
USER root

ENTRYPOINT ["python"]
CMD ["spark_data_preparation.py"]
CMD ["data_preprocess_spark.py"]
@@ -17,17 +17,17 @@ metadata:
  annotations: {platform: 'IBM Cloud Spark Service'}
inputs:
  - {name: bucket_name, description: 'Required. Object storage bucket name'}
  - {name: data_url, description: 'Required. URL of the data source'}
outputs:
  - {name: output, description: 'Data Filename'}
implementation:
  container:
    image: docker.io/aipipeline/training_with_spark_service:latest
    command: ['python3']
    image: docker.io/aipipeline/data_preprocess_spark:latest
    command: ['python']
    args: [
      /app/data_preprocess_spark.py,
      -u, data_preprocess_spark.py,
      --bucket_name, {inputValue: bucket_name},
      --data_url, {inputValue: data_url}
    ]
    fileOutputs:
      output: /tmp/output.txt
      output: /tmp/filename
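
For orientation, a pipeline would normally consume this component through the KFP SDK rather than invoking the container directly. Below is a minimal sketch; the component file path, pipeline name, and argument values are illustrative assumptions, not part of this PR:

import kfp.dsl as dsl
from kfp import components

# Hypothetical path to the component definition shown above.
preprocess_op = components.load_component_from_file(
    'components/ibm-components/spark/data_preprocess_spark/component.yaml')

@dsl.pipeline(name='openscale-preprocess-demo')
def demo_pipeline(bucket_name='my-cos-bucket',
                  data_url='https://example.com/credit_risk_training.csv'):
    preprocess = preprocess_op(bucket_name=bucket_name, data_url=data_url)
    # preprocess.outputs['output'] carries the uploaded data filename,
    # read from /tmp/filename per the fileOutputs mapping above.
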
@@ -1,10 +1,12 @@
import argparse
import ibm_boto3
import requests
from ibm_botocore.client import Config
from pyspark.sql import SparkSession
from minio import Minio
from minio.error import ResponseError
import re


def get_secret(path):
def get_secret_creds(path):
    with open(path, 'r') as f:
        cred = f.readline().strip('\'')
        f.close()
@@ -13,15 +15,19 @@ def get_secret(path):
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--bucket_name', type=str, help='Object storage bucket name', default="dummy-bucket-name")
    parser.add_argument('--data_url', type=str, help='URL of the data source', default="https://raw.githubusercontent.com/emartensibm/german-credit/binary/credit_risk_training.csv")
    parser.add_argument('--data_url', type=str, help='URL of the data source', required=True)
    args = parser.parse_args()

    cos_bucket_name = args.bucket_name
    data_url = args.data_url

    cos_url = get_secret("/app/secrets/cos_url")
    cos_apikey = get_secret("/app/secrets/cos_resource_id")
    cos_resource_instance_id = get_secret("/app/secrets/cos_resource_id")
    cos_endpoint = get_secret_creds("/app/secrets/cos_endpoint")
    cos_access_key = get_secret_creds("/app/secrets/cos_access_key")
    cos_secret_key = get_secret_creds("/app/secrets/cos_secret_key")

    ''' Remove possible http scheme for Minio '''
    url = re.compile(r"https?://")
    cos_endpoint = url.sub('', cos_endpoint)

    ''' Download data from data source '''
    filename = data_url
@@ -36,25 +42,22 @@ def get_secret(path):
    df_data = spark.read.csv(path=filename, sep=",", header=True, inferSchema=True)
    df_data.head()

    ''' Upload data to IBM Cloud object storage '''
    cos = ibm_boto3.resource('s3',
                             ibm_api_key_id=cos_apikey,
                             ibm_service_instance_id=cos_resource_instance_id,
                             ibm_auth_endpoint='https://iam.bluemix.net/oidc/token',
                             config=Config(signature_version='oauth'),
                             endpoint_url=cos_url)

    buckets = []
    for bucket in cos.buckets.all():
        buckets.append(bucket.name)
    ''' Upload data to Cloud object storage '''
    cos = Minio(cos_endpoint,
                access_key=cos_access_key,
                secret_key=cos_secret_key,
                secure=True)

    if cos_bucket_name not in buckets:
        cos.create_bucket(Bucket=cos_bucket_name)
    if not cos.bucket_exists(cos_bucket_name):
        try:
            cos.make_bucket(cos_bucket_name)
        except ResponseError as err:
            print(err)

    cos.Bucket(cos_bucket_name).upload_file(filename, filename)
    cos.fput_object(cos_bucket_name, filename, filename)

    print('Data ' + filename + ' is uploaded to bucket at ' + cos_bucket_name)
    with open("/tmp/filename.txt", "w") as report:
    with open("/tmp/filename", "w") as report:
        report.write(filename)

    df_data.printSchema()
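
The script expects its object storage credentials as files under /app/secrets (cos_endpoint, cos_access_key, cos_secret_key). In a pipeline those files would typically come from a Kubernetes secret mounted into the step's container. Continuing the pipeline sketch above, one possible wiring is shown below; the secret name aios-creds and its keys are assumptions, not defined in this PR:

from kubernetes import client as k8s_client

# 'preprocess' is the ContainerOp created from the component above.
# Each key of the secret becomes a file under /app/secrets.
preprocess.add_volume(
    k8s_client.V1Volume(
        name='secrets-volume',
        secret=k8s_client.V1SecretVolumeSource(secret_name='aios-creds')))
preprocess.add_volume_mount(
    k8s_client.V1VolumeMount(
        name='secrets-volume',
        mount_path='/app/secrets'))
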
14 changes: 14 additions & 0 deletions components/ibm-components/spark/store_spark_model/Dockerfile
@@ -0,0 +1,14 @@
FROM aipipeline/pyspark:spark-2.1

RUN pip install --upgrade pip
RUN pip install --upgrade watson-machine-learning-client ibm-ai-openscale Minio --no-cache | tail -n 1
RUN pip install psycopg2-binary | tail -n 1

ENV APP_HOME /app
COPY src $APP_HOME
WORKDIR $APP_HOME

USER root

ENTRYPOINT ["python"]
CMD ["store_spark_model.py"]
43 changes: 43 additions & 0 deletions components/ibm-components/spark/store_spark_model/component.yaml
@@ -0,0 +1,43 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: 'Store Spark Model - Watson Machine Learning'
description: |
  Store any trained Spark Model using IBM Watson Machine Learning Service
metadata:
  annotations: {platform: 'IBM Watson Machine Learning Service'}
inputs:
  - {name: bucket_name, description: 'Required. Object storage bucket name'}
  - {name: aios_manifest_path, description: 'Required. Object storage file path for the aios manifest file'}
  - {name: problem_type, description: 'Required. Model problem type'}
  - {name: model_name, description: 'Required. Model name for the trained model'}
  - {name: deployment_name, description: 'Required. Deployment name for the trained model'}
  - {name: model_filepath, description: 'Required. Name of the trained model zip'}
  - {name: train_data_filepath, description: 'Required. Name of the training data zip'}
outputs:
  - {name: model_uid, description: 'Stored model UID'}
implementation:
  container:
    image: docker.io/aipipeline/store_spark_model:latest
    command: ['python']
    args: [
      -u, store_spark_model.py,
      --bucket_name, {inputValue: bucket_name},
      --aios_manifest_path, {inputValue: aios_manifest_path},
      --problem_type, {inputValue: problem_type},
      --model_name, {inputValue: model_name},
      --deployment_name, {inputValue: deployment_name},
      --model_filepath, {inputValue: model_filepath},
      --train_data_filepath, {inputValue: train_data_filepath}
    ]
    fileOutputs:
      model_uid: /tmp/model_uid
@@ -0,0 +1,139 @@
import argparse
import json
import os
import re
from pyspark.sql import SparkSession
from pyspark.ml.pipeline import PipelineModel
from pyspark import SparkConf, SparkContext
from pyspark.ml import Pipeline, Model
from watson_machine_learning_client import WatsonMachineLearningAPIClient
from minio import Minio


def get_secret_creds(path):
    with open(path, 'r') as f:
        cred = f.readline().strip('\'')
        f.close()
    return cred


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--bucket_name', type=str, help='Object storage bucket name', default="dummy-bucket-name")
    parser.add_argument('--model_filepath', type=str, help='Name of the trained spark model packaged as zip', default="model.zip")
    parser.add_argument('--train_data_filepath', type=str, help='Name of the train_data zip', default="train_data.zip")
    parser.add_argument('--aios_manifest_path', type=str, help='Object storage file path for the aios manifest file', default="")
    parser.add_argument('--problem_type', type=str, help='Model problem type', default="BINARY_CLASSIFICATION")
    parser.add_argument('--model_name', type=str, help='model name for the trained model', default="Spark German Risk Model - Final")
    parser.add_argument('--deployment_name', type=str, help='deployment name for the trained model', default="Spark German Risk Deployment - Final")
    args = parser.parse_args()

    cos_bucket_name = args.bucket_name
    model_filepath = args.model_filepath
    aios_manifest_path = args.aios_manifest_path
    train_data_filepath = args.train_data_filepath
    problem_type = args.problem_type
    MODEL_NAME = args.model_name
    DEPLOYMENT_NAME = args.deployment_name

    wml_url = get_secret_creds("/app/secrets/wml_url")
    wml_username = get_secret_creds("/app/secrets/wml_username")
    wml_password = get_secret_creds("/app/secrets/wml_password")
    wml_instance_id = get_secret_creds("/app/secrets/wml_instance_id")
    cos_endpoint = get_secret_creds("/app/secrets/cos_endpoint")
    cos_access_key = get_secret_creds("/app/secrets/cos_access_key")
    cos_secret_key = get_secret_creds("/app/secrets/cos_secret_key")

    ''' Remove possible http scheme for Minio '''
    url = re.compile(r"https?://")
    cos_endpoint = url.sub('', cos_endpoint)

    WML_CREDENTIALS = {
        "url": wml_url,
        "username": wml_username,
        "password": wml_password,
        "instance_id": wml_instance_id
    }
    ''' Load Spark model '''
    cos = Minio(cos_endpoint,
                access_key=cos_access_key,
                secret_key=cos_secret_key,
                secure=True)

    cos.fget_object(cos_bucket_name, model_filepath, model_filepath)
    cos.fget_object(cos_bucket_name, train_data_filepath, train_data_filepath)
    cos.fget_object(cos_bucket_name, 'evaluation.json', 'evaluation.json')
    if aios_manifest_path:
        cos.fget_object(cos_bucket_name, aios_manifest_path, aios_manifest_path)

    os.system('unzip %s' % model_filepath)
    print('model ' + model_filepath + ' is downloaded')
    os.system('unzip %s' % train_data_filepath)
    print('train_data ' + train_data_filepath + ' is downloaded')

    sc = SparkContext()
    model = PipelineModel.load(model_filepath.split('.')[0])
    pipeline = Pipeline(stages=model.stages)
    spark = SparkSession.builder.getOrCreate()
    train_data = spark.read.csv(path=train_data_filepath.split('.')[0], sep=",", header=True, inferSchema=True)

    ''' Remove previous deployed model '''
    wml_client = WatsonMachineLearningAPIClient(WML_CREDENTIALS)
    model_deployment_ids = wml_client.deployments.get_uids()
    deleted_model_id = None
    for deployment_id in model_deployment_ids:
        deployment = wml_client.deployments.get_details(deployment_id)
        model_id = deployment['entity']['deployable_asset']['guid']
        if deployment['entity']['name'] == DEPLOYMENT_NAME:
            print('Deleting deployment id', deployment_id)
            wml_client.deployments.delete(deployment_id)
            print('Deleting model id', model_id)
            wml_client.repository.delete(model_id)
            deleted_model_id = model_id
    wml_client.repository.list_models()

    ''' Save and Deploy model '''
    if aios_manifest_path:
        with open(aios_manifest_path) as f:
            aios_manifest = json.load(f)
            OUTPUT_DATA_SCHEMA = {'fields': aios_manifest['model_schema'], 'type': 'struct'}
            f.close()
    else:
        OUTPUT_DATA_SCHEMA = None

    with open('evaluation.json') as f:
        evaluation = json.load(f)
        f.close()

    if problem_type == 'BINARY_CLASSIFICATION':
        EVALUATION_METHOD = 'binary'
    else:
        EVALUATION_METHOD = 'multiclass'

    ''' Define evaluation threshold '''
    model_props = {
        wml_client.repository.ModelMetaNames.NAME: "{}".format(MODEL_NAME),
        wml_client.repository.ModelMetaNames.EVALUATION_METHOD: EVALUATION_METHOD,
        wml_client.repository.ModelMetaNames.EVALUATION_METRICS: evaluation['metrics']
    }
    if aios_manifest_path:
        model_props[wml_client.repository.ModelMetaNames.OUTPUT_DATA_SCHEMA] = OUTPUT_DATA_SCHEMA

    wml_models = wml_client.repository.get_details()
    model_uid = None
    for model_in in wml_models['models']['resources']:
        if MODEL_NAME == model_in['entity']['name']:
            model_uid = model_in['metadata']['guid']
            break

    if model_uid is None:
        print("Storing model ...")

        published_model_details = wml_client.repository.store_model(model=model, meta_props=model_props, training_data=train_data, pipeline=pipeline)
        model_uid = wml_client.repository.get_model_uid(published_model_details)
        print("Done")
    else:
        print("Model already exist")

    with open("/tmp/model_uid", "w") as report:
        report.write(model_uid)
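
Note that this component only stores the model: the deployment_name input is used to clean up any previous deployment of the same name, and the actual (re)deployment is left to a downstream step that consumes the model_uid file output. A rough sketch of such a step against the watson_machine_learning_client API of that era is below; the call and names are assumptions, not part of this PR:

from watson_machine_learning_client import WatsonMachineLearningAPIClient

wml_client = WatsonMachineLearningAPIClient(WML_CREDENTIALS)
# model_uid is the value this component wrote to /tmp/model_uid.
deployment_details = wml_client.deployments.create(
    artifact_uid=model_uid, name=DEPLOYMENT_NAME)
scoring_url = wml_client.deployments.get_scoring_url(deployment_details)
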
2 changes: 0 additions & 2 deletions components/ibm-components/spark/train_spark/Dockerfile
@@ -1,6 +1,4 @@
FROM python:3.6.8-stretch
RUN apt-get update
RUN apt-get install -y vim wget curl

ENV APP_HOME /app
COPY src $APP_HOME
18 changes: 10 additions & 8 deletions components/ibm-components/spark/train_spark/component.yaml
@@ -16,22 +16,24 @@ description: |
metadata:
  annotations: {platform: 'IBM Cloud Spark Service'}
inputs:
  - {name: bucket_name, description: 'Required. Object storage bucket name'}
  - {name: data_filename, description: 'Required. Name of the data binary'}
  - {name: model_filename, description: 'Required. Name of the training model file'}
  - {name: spark_entrypoint, description: 'Required. Entrypoint command for training the spark model'}
outputs:
  - {name: output, description: 'Spark Model Filename'}
  - {name: model_filepath, description: 'Spark Model binary filepath'}
  - {name: train_data_filepath, description: 'Spark training data filepath'}
implementation:
  container:
    image: docker.io/aipipeline/training_with_spark_service:latest
    command: ['python3']
    image: docker.io/aipipeline/train_spark:latest
    command: ['python']
    args: [
      /app/train_spark.py,
      -u, train_spark.py,
      --bucket_name, {inputValue: bucket_name},
      --data_filename, {inputValue: data_filename},
      --model_filename, {inputValue: model_filename},
      --spark_entrypoint, {inputValue: spark_entrypoint}
    ]
    fileOutputs:
      output: /tmp/spark_model.txt
      model_filepath: /tmp/model_filepath
      train_data_filepath: /tmp/train_data_filepath
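
Taken together, the three components chain through their declared inputs and outputs. Continuing the earlier pipeline-function sketch, the wiring could look roughly like this; the component paths are taken from the file headers above, while model_filename, spark_entrypoint, and the manifest value are illustrative assumptions:

from kfp import components

train_op = components.load_component_from_file(
    'components/ibm-components/spark/train_spark/component.yaml')
store_op = components.load_component_from_file(
    'components/ibm-components/spark/store_spark_model/component.yaml')

# Inside the same pipeline function as the preprocess step above.
train = train_op(
    bucket_name=bucket_name,
    data_filename=preprocess.outputs['output'],
    model_filename='model.py',
    spark_entrypoint='python model.py')
store = store_op(
    bucket_name=bucket_name,
    aios_manifest_path='aios.json',
    problem_type='BINARY_CLASSIFICATION',
    model_name='Spark German Risk Model - Final',
    deployment_name='Spark German Risk Deployment - Final',
    model_filepath=train.outputs['model_filepath'],
    train_data_filepath=train.outputs['train_data_filepath'])
# store.outputs['model_uid'] is then available to downstream deploy/monitor steps.
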
@@ -1,3 +1,5 @@
#!/usr/bin/env bash

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at