Update new Watson OpenScale components and pipeline (#1287)
* update new openscale components and pipeline

* cleanup unused library

* fix readme and naming as requested and minor cleanup

* condense Dockerfile with new base image, load PipelineModel from spark.stage

* retrigger github checks from Travis
Tomcli authored and k8s-ci-robot committed May 10, 2019
1 parent b675e02 commit 8418b31
Showing 26 changed files with 781 additions and 167 deletions.
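
For orientation, the sketch below shows how the updated components could be wired together in a Kubeflow Pipelines definition. It is illustrative only: the component.yaml paths, parameter values, and pipeline name are assumptions, not the pipeline shipped in this commit.

import kfp.dsl as dsl
import kfp.components as components

# Load the component definitions touched by this commit (paths are assumed).
preprocess_op = components.load_component_from_file(
    'components/ibm-components/spark/data_preprocess_spark/component.yaml')
train_op = components.load_component_from_file(
    'components/ibm-components/spark/train_spark/component.yaml')
store_op = components.load_component_from_file(
    'components/ibm-components/spark/store_spark_model/component.yaml')

@dsl.pipeline(name='Watson OpenScale sketch', description='Illustrative wiring only')
def openscale_pipeline(bucket_name='my-bucket',
                       data_url='https://example.com/credit_risk_training.csv'):
    # Download the CSV and push it to object storage; emits the data filename.
    preprocess = preprocess_op(bucket_name=bucket_name, data_url=data_url)

    # Train the Spark model; emits the model and training-data filepaths.
    train = train_op(bucket_name=bucket_name,
                     data_filename=preprocess.outputs['output'],
                     model_filename='model.py',
                     spark_entrypoint='python model.py')

    # Store the trained model with Watson Machine Learning; emits the model UID.
    store_op(bucket_name=bucket_name,
             aios_manifest_path='aios.json',
             problem_type='BINARY_CLASSIFICATION',
             model_name='Spark German Risk Model - Final',
             deployment_name='Spark German Risk Deployment - Final',
             model_filepath=train.outputs['model_filepath'],
             train_data_filepath=train.outputs['train_data_filepath'])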
@@ -1,7 +1,7 @@
FROM python:3.6.8-stretch
FROM aipipeline/pyspark:spark-2.1

RUN pip install --upgrade pip
RUN pip install --upgrade watson-machine-learning-client ibm-ai-openscale --no-cache | tail -n 1
RUN pip install --upgrade Minio --no-cache | tail -n 1
RUN pip install psycopg2-binary | tail -n 1

ENV APP_HOME /app
@@ -11,4 +11,4 @@ WORKDIR $APP_HOME
USER root

ENTRYPOINT ["python"]
CMD ["spark_data_preparation.py"]
CMD ["data_preprocess_spark.py"]
@@ -17,17 +17,17 @@ metadata:
  annotations: {platform: 'IBM Cloud Spark Service'}
inputs:
  - {name: bucket_name, description: 'Required. Object storage bucket name'}
  - {name: data_url, description: 'Required. URL of the data source'}
  - {name: data_url, description: 'Required. URL of the data source'}
outputs:
  - {name: output, description: 'Data Filename'}
  - {name: output, description: 'Data Filename'}
implementation:
  container:
    image: docker.io/aipipeline/training_with_spark_service:latest
    command: ['python3']
    image: docker.io/aipipeline/data_preprocess_spark:latest
    command: ['python']
    args: [
      /app/data_preprocess_spark.py,
      -u, data_preprocess_spark.py,
      --bucket_name, {inputValue: bucket_name},
      --data_url, {inputValue: data_url}
    ]
    fileOutputs:
      output: /tmp/output.txt
      output: /tmp/filename
@@ -1,10 +1,12 @@
import argparse
import ibm_boto3
import requests
from ibm_botocore.client import Config
from pyspark.sql import SparkSession
from minio import Minio
from minio.error import ResponseError
import re

def get_secret(path):

def get_secret_creds(path):
    with open(path, 'r') as f:
        cred = f.readline().strip('\'')
        f.close()
@@ -13,15 +15,19 @@ def get_secret(path):
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--bucket_name', type=str, help='Object storage bucket name', default="dummy-bucket-name")
    parser.add_argument('--data_url', type=str, help='URL of the data source', default="https://raw.githubusercontent.com/emartensibm/german-credit/binary/credit_risk_training.csv")
    parser.add_argument('--data_url', type=str, help='URL of the data source', required=True)
    args = parser.parse_args()

    cos_bucket_name = args.bucket_name
    data_url = args.data_url

    cos_url = get_secret("/app/secrets/cos_url")
    cos_apikey = get_secret("/app/secrets/cos_resource_id")
    cos_resource_instance_id = get_secret("/app/secrets/cos_resource_id")
    cos_endpoint = get_secret_creds("/app/secrets/cos_endpoint")
    cos_access_key = get_secret_creds("/app/secrets/cos_access_key")
    cos_secret_key = get_secret_creds("/app/secrets/cos_secret_key")

    ''' Remove possible http scheme for Minio '''
    url = re.compile(r"https?://")
    cos_endpoint = url.sub('', cos_endpoint)

    ''' Download data from data source '''
    filename = data_url
@@ -36,25 +42,22 @@ def get_secret(path):
    df_data = spark.read.csv(path=filename, sep=",", header=True, inferSchema=True)
    df_data.head()

    ''' Upload data to IBM Cloud object storage '''
    cos = ibm_boto3.resource('s3',
                             ibm_api_key_id=cos_apikey,
                             ibm_service_instance_id=cos_resource_instance_id,
                             ibm_auth_endpoint='https://iam.bluemix.net/oidc/token',
                             config=Config(signature_version='oauth'),
                             endpoint_url=cos_url)

    buckets = []
    for bucket in cos.buckets.all():
        buckets.append(bucket.name)
    ''' Upload data to Cloud object storage '''
    cos = Minio(cos_endpoint,
                access_key=cos_access_key,
                secret_key=cos_secret_key,
                secure=True)

    if cos_bucket_name not in buckets:
        cos.create_bucket(Bucket=cos_bucket_name)
    if not cos.bucket_exists(cos_bucket_name):
        try:
            cos.make_bucket(cos_bucket_name)
        except ResponseError as err:
            print(err)

    cos.Bucket(cos_bucket_name).upload_file(filename, filename)
    cos.fput_object(cos_bucket_name, filename, filename)

    print('Data ' + filename + ' is uploaded to bucket at ' + cos_bucket_name)
    with open("/tmp/filename.txt", "w") as report:
    with open("/tmp/filename", "w") as report:
        report.write(filename)

    df_data.printSchema()
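
The refactored preprocessing script reads its object-storage credentials from files under /app/secrets (cos_endpoint, cos_access_key, cos_secret_key). One common way such files end up there is a Kubernetes secret mounted as a volume on the pipeline step; the snippet below is a sketch of that pattern, with the secret name and wiring assumed rather than taken from this commit.

from kubernetes import client as k8s_client
import kfp.dsl as dsl

def mount_cos_secret(task: dsl.ContainerOp, secret_name: str = 'aios-creds') -> dsl.ContainerOp:
    """Mount a Kubernetes secret so each key appears as a file under /app/secrets."""
    task.add_volume(k8s_client.V1Volume(
        name='secret-volume',
        secret=k8s_client.V1SecretVolumeSource(secret_name=secret_name)))
    # Keys such as cos_endpoint, cos_access_key and cos_secret_key then show up as
    # /app/secrets/cos_endpoint, /app/secrets/cos_access_key, and so on.
    task.add_volume_mount(k8s_client.V1VolumeMount(
        name='secret-volume', mount_path='/app/secrets'))
    return task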
14 changes: 14 additions & 0 deletions components/ibm-components/spark/store_spark_model/Dockerfile
@@ -0,0 +1,14 @@
FROM aipipeline/pyspark:spark-2.1

RUN pip install --upgrade pip
RUN pip install --upgrade watson-machine-learning-client ibm-ai-openscale Minio --no-cache | tail -n 1
RUN pip install psycopg2-binary | tail -n 1

ENV APP_HOME /app
COPY src $APP_HOME
WORKDIR $APP_HOME

USER root

ENTRYPOINT ["python"]
CMD ["store_spark_model.py"]
43 changes: 43 additions & 0 deletions components/ibm-components/spark/store_spark_model/component.yaml
@@ -0,0 +1,43 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: 'Store Spark Model - Watson Machine Learning'
description: |
  Store any trained Spark Model using IBM Watson Machine Learning Service
metadata:
  annotations: {platform: 'IBM Watson Machine Learning Service'}
inputs:
  - {name: bucket_name, description: 'Required. Object storage bucket name'}
  - {name: aios_manifest_path, description: 'Required. Object storage file path for the aios manifest file'}
  - {name: problem_type, description: 'Required. Model problem type'}
  - {name: model_name, description: 'Required. Model name for the trained model'}
  - {name: deployment_name, description: 'Required. Deployment name for the trained model'}
  - {name: model_filepath, description: 'Required. Name of the trained model zip'}
  - {name: train_data_filepath, description: 'Required. Name of the training data zip'}
outputs:
  - {name: model_uid, description: 'Stored model UID'}
implementation:
  container:
    image: docker.io/aipipeline/store_spark_model:latest
    command: ['python']
    args: [
      -u, store_spark_model.py,
      --bucket_name, {inputValue: bucket_name},
      --aios_manifest_path, {inputValue: aios_manifest_path},
      --problem_type, {inputValue: problem_type},
      --model_name, {inputValue: model_name},
      --deployment_name, {inputValue: deployment_name},
      --model_filepath, {inputValue: model_filepath},
      --train_data_filepath, {inputValue: train_data_filepath}
    ]
    fileOutputs:
      model_uid: /tmp/model_uid
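
The store step reads only one field from the OpenScale (aios) manifest: model_schema, which it wraps into the OUTPUT_DATA_SCHEMA handed to Watson Machine Learning (see store_spark_model.py below). A minimal, assumed shape of that manifest expressed in Python; the field names are illustrative placeholders, not the real schema:

# Assumed structure; only 'model_schema' is consumed by store_spark_model.py.
# Entries follow the Spark StructType-style field layout expected by WML.
aios_manifest = {
    "model_schema": [
        {"name": "CheckingStatus", "type": "string", "nullable": True, "metadata": {}},
        {"name": "LoanAmount", "type": "double", "nullable": True, "metadata": {}},
        # ... one entry per model input/output column
    ]
}

OUTPUT_DATA_SCHEMA = {"fields": aios_manifest["model_schema"], "type": "struct"}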
@@ -0,0 +1,139 @@
import argparse
import json
import os
import re
from pyspark.sql import SparkSession
from pyspark.ml.pipeline import PipelineModel
from pyspark import SparkConf, SparkContext
from pyspark.ml import Pipeline, Model
from watson_machine_learning_client import WatsonMachineLearningAPIClient
from minio import Minio


def get_secret_creds(path):
    with open(path, 'r') as f:
        cred = f.readline().strip('\'')
        f.close()
    return cred


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--bucket_name', type=str, help='Object storage bucket name', default="dummy-bucket-name")
    parser.add_argument('--model_filepath', type=str, help='Name of the trained spark model packaged as zip', default="model.zip")
    parser.add_argument('--train_data_filepath', type=str, help='Name of the train_data zip', default="train_data.zip")
    parser.add_argument('--aios_manifest_path', type=str, help='Object storage file path for the aios manifest file', default="")
    parser.add_argument('--problem_type', type=str, help='Model problem type', default="BINARY_CLASSIFICATION")
    parser.add_argument('--model_name', type=str, help='model name for the trained model', default="Spark German Risk Model - Final")
    parser.add_argument('--deployment_name', type=str, help='deployment name for the trained model', default="Spark German Risk Deployment - Final")
    args = parser.parse_args()

    cos_bucket_name = args.bucket_name
    model_filepath = args.model_filepath
    aios_manifest_path = args.aios_manifest_path
    train_data_filepath = args.train_data_filepath
    problem_type = args.problem_type
    MODEL_NAME = args.model_name
    DEPLOYMENT_NAME = args.deployment_name

    wml_url = get_secret_creds("/app/secrets/wml_url")
    wml_username = get_secret_creds("/app/secrets/wml_username")
    wml_password = get_secret_creds("/app/secrets/wml_password")
    wml_instance_id = get_secret_creds("/app/secrets/wml_instance_id")
    cos_endpoint = get_secret_creds("/app/secrets/cos_endpoint")
    cos_access_key = get_secret_creds("/app/secrets/cos_access_key")
    cos_secret_key = get_secret_creds("/app/secrets/cos_secret_key")

    ''' Remove possible http scheme for Minio '''
    url = re.compile(r"https?://")
    cos_endpoint = url.sub('', cos_endpoint)

    WML_CREDENTIALS = {
        "url": wml_url,
        "username": wml_username,
        "password": wml_password,
        "instance_id": wml_instance_id
    }
    ''' Load Spark model '''
    cos = Minio(cos_endpoint,
                access_key=cos_access_key,
                secret_key=cos_secret_key,
                secure=True)

    cos.fget_object(cos_bucket_name, model_filepath, model_filepath)
    cos.fget_object(cos_bucket_name, train_data_filepath, train_data_filepath)
    cos.fget_object(cos_bucket_name, 'evaluation.json', 'evaluation.json')
    if aios_manifest_path:
        cos.fget_object(cos_bucket_name, aios_manifest_path, aios_manifest_path)

    os.system('unzip %s' % model_filepath)
    print('model ' + model_filepath + ' is downloaded')
    os.system('unzip %s' % train_data_filepath)
    print('train_data ' + train_data_filepath + ' is downloaded')

    sc = SparkContext()
    model = PipelineModel.load(model_filepath.split('.')[0])
    pipeline = Pipeline(stages=model.stages)
    spark = SparkSession.builder.getOrCreate()
    train_data = spark.read.csv(path=train_data_filepath.split('.')[0], sep=",", header=True, inferSchema=True)

    ''' Remove previous deployed model '''
    wml_client = WatsonMachineLearningAPIClient(WML_CREDENTIALS)
    model_deployment_ids = wml_client.deployments.get_uids()
    deleted_model_id = None
    for deployment_id in model_deployment_ids:
        deployment = wml_client.deployments.get_details(deployment_id)
        model_id = deployment['entity']['deployable_asset']['guid']
        if deployment['entity']['name'] == DEPLOYMENT_NAME:
            print('Deleting deployment id', deployment_id)
            wml_client.deployments.delete(deployment_id)
            print('Deleting model id', model_id)
            wml_client.repository.delete(model_id)
            deleted_model_id = model_id
    wml_client.repository.list_models()

    ''' Save and Deploy model '''
    if aios_manifest_path:
        with open(aios_manifest_path) as f:
            aios_manifest = json.load(f)
            OUTPUT_DATA_SCHEMA = {'fields': aios_manifest['model_schema'], 'type': 'struct'}
            f.close()
    else:
        OUTPUT_DATA_SCHEMA = None

    with open('evaluation.json') as f:
        evaluation = json.load(f)
        f.close()

    if problem_type == 'BINARY_CLASSIFICATION':
        EVALUATION_METHOD = 'binary'
    else:
        EVALUATION_METHOD = 'multiclass'

    ''' Define evaluation threshold '''
    model_props = {
        wml_client.repository.ModelMetaNames.NAME: "{}".format(MODEL_NAME),
        wml_client.repository.ModelMetaNames.EVALUATION_METHOD: EVALUATION_METHOD,
        wml_client.repository.ModelMetaNames.EVALUATION_METRICS: evaluation['metrics']
    }
    if aios_manifest_path:
        model_props[wml_client.repository.ModelMetaNames.OUTPUT_DATA_SCHEMA] = OUTPUT_DATA_SCHEMA

    wml_models = wml_client.repository.get_details()
    model_uid = None
    for model_in in wml_models['models']['resources']:
        if MODEL_NAME == model_in['entity']['name']:
            model_uid = model_in['metadata']['guid']
            break

    if model_uid is None:
        print("Storing model ...")

        published_model_details = wml_client.repository.store_model(model=model, meta_props=model_props, training_data=train_data, pipeline=pipeline)
        model_uid = wml_client.repository.get_model_uid(published_model_details)
        print("Done")
    else:
        print("Model already exist")

    with open("/tmp/model_uid", "w") as report:
        report.write(model_uid)
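
store_spark_model.py ends by writing the stored model UID to /tmp/model_uid; creating the online deployment itself would happen in a downstream step that is not part of the excerpt shown here. A hedged sketch of what such a step might look like with the same watson-machine-learning-client API:

from watson_machine_learning_client import WatsonMachineLearningAPIClient

def deploy_model(wml_credentials, model_uid, deployment_name):
    """Create an online deployment for a stored model UID and return its scoring URL."""
    client = WatsonMachineLearningAPIClient(wml_credentials)
    deployment = client.deployments.create(artifact_uid=model_uid, name=deployment_name)
    scoring_url = client.deployments.get_scoring_url(deployment)
    print('Scoring endpoint:', scoring_url)
    return scoring_url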
2 changes: 0 additions & 2 deletions components/ibm-components/spark/train_spark/Dockerfile
@@ -1,6 +1,4 @@
FROM python:3.6.8-stretch
RUN apt-get update
RUN apt-get install -y vim wget curl

ENV APP_HOME /app
COPY src $APP_HOME
18 changes: 10 additions & 8 deletions components/ibm-components/spark/train_spark/component.yaml
@@ -16,22 +16,24 @@ description: |
metadata:
  annotations: {platform: 'IBM Cloud Spark Service'}
inputs:
  - {name: bucket_name, description: 'Required. Object storage bucket name'}
  - {name: data_filename, description: 'Required. Name of the data binary'}
  - {name: model_filename, description: 'Required. Name of the training model file'}
  - {name: bucket_name, description: 'Required. Object storage bucket name'}
  - {name: data_filename, description: 'Required. Name of the data binary'}
  - {name: model_filename, description: 'Required. Name of the training model file'}
  - {name: spark_entrypoint, description: 'Required. Entrypoint command for training the spark model'}
outputs:
  - {name: output, description: 'Spark Model Filename'}
  - {name: model_filepath, description: 'Spark Model binary filepath'}
  - {name: train_data_filepath, description: 'Spark training data filepath'}
implementation:
  container:
    image: docker.io/aipipeline/training_with_spark_service:latest
    command: ['python3']
    image: docker.io/aipipeline/train_spark:latest
    command: ['python']
    args: [
      /app/train_spark.py,
      -u, train_spark.py,
      --bucket_name, {inputValue: bucket_name},
      --data_filename, {inputValue: data_filename},
      --model_filename, {inputValue: model_filename},
      --spark_entrypoint, {inputValue: spark_entrypoint}
    ]
    fileOutputs:
      output: /tmp/spark_model.txt
      model_filepath: /tmp/model_filepath
      train_data_filepath: /tmp/train_data_filepath
@@ -1,3 +1,5 @@
#!/usr/bin/env bash

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at