From 8418b316f5b4d096fc15109ab90061984217d3ed Mon Sep 17 00:00:00 2001 From: Tommy Li Date: Fri, 10 May 2019 01:09:57 -0700 Subject: [PATCH] Update new Watson OpenScale components and pipeline (#1287) * update new openscale components and pipeline * cleanup unused library * fix readme and naming as requested and minor cleanup * condense Dockerfile with new base image, load PipelineModel from spark.stage * retrigger github checks from Travis --- .../spark/data_preprocess_spark/Dockerfile | 6 +- .../data_preprocess_spark/component.yaml | 12 +- .../src/data_preprocess_spark.py | 47 +++--- .../spark/store_spark_model/Dockerfile | 14 ++ .../spark/store_spark_model/component.yaml | 43 ++++++ .../src/store_spark_model.py | 139 ++++++++++++++++++ .../spark/train_spark/Dockerfile | 2 - .../spark/train_spark/component.yaml | 18 ++- .../spark/train_spark/src/spark-submit.sh | 2 + .../spark/train_spark/src/train_spark.py | 32 ++-- .../spark/train_spark/src/wrapper.py | 34 ++--- .../watson/deploy/component.yaml | 18 ++- .../watson/deploy/src/wml-deploy.py | 88 ++++++----- .../watson/manage/monitor_fairness/Dockerfile | 2 +- .../manage/monitor_fairness/component.yaml | 4 +- .../monitor_fairness/src/monitor_fairness.py | 38 +++-- .../manage/monitor_quality/component.yaml | 2 - .../monitor_quality/src/monitor_quality.py | 12 +- .../watson/manage/subscribe/Dockerfile | 2 +- .../watson/manage/subscribe/component.yaml | 20 ++- .../watson/manage/subscribe/src/subscribe.py | 75 ++++++++-- samples/ibm-samples/openscale/README.md | 116 +++++++++++++++ .../openscale/credentials/creds.ini | 19 +++ samples/ibm-samples/openscale/openscale.py | 83 +++++++++++ .../ibm-samples/openscale/source/aios.json | 45 ++++++ samples/ibm-samples/openscale/source/model.py | 75 ++++++++++ 26 files changed, 781 insertions(+), 167 deletions(-) create mode 100644 components/ibm-components/spark/store_spark_model/Dockerfile create mode 100644 components/ibm-components/spark/store_spark_model/component.yaml create mode 100644 components/ibm-components/spark/store_spark_model/src/store_spark_model.py create mode 100644 samples/ibm-samples/openscale/README.md create mode 100644 samples/ibm-samples/openscale/credentials/creds.ini create mode 100644 samples/ibm-samples/openscale/openscale.py create mode 100644 samples/ibm-samples/openscale/source/aios.json create mode 100644 samples/ibm-samples/openscale/source/model.py diff --git a/components/ibm-components/spark/data_preprocess_spark/Dockerfile b/components/ibm-components/spark/data_preprocess_spark/Dockerfile index 39b721fbc4c..104bd3444ca 100644 --- a/components/ibm-components/spark/data_preprocess_spark/Dockerfile +++ b/components/ibm-components/spark/data_preprocess_spark/Dockerfile @@ -1,7 +1,7 @@ -FROM python:3.6.8-stretch +FROM aipipeline/pyspark:spark-2.1 RUN pip install --upgrade pip -RUN pip install --upgrade watson-machine-learning-client ibm-ai-openscale --no-cache | tail -n 1 +RUN pip install --upgrade Minio --no-cache | tail -n 1 RUN pip install psycopg2-binary | tail -n 1 ENV APP_HOME /app @@ -11,4 +11,4 @@ WORKDIR $APP_HOME USER root ENTRYPOINT ["python"] -CMD ["spark_data_preparation.py"] +CMD ["data_preprocess_spark.py"] diff --git a/components/ibm-components/spark/data_preprocess_spark/component.yaml b/components/ibm-components/spark/data_preprocess_spark/component.yaml index c1a66244433..234ae01d383 100644 --- a/components/ibm-components/spark/data_preprocess_spark/component.yaml +++ b/components/ibm-components/spark/data_preprocess_spark/component.yaml @@ -17,17 
+17,17 @@ metadata: annotations: {platform: 'IBM Cloud Spark Service'} inputs: - {name: bucket_name, description: 'Required. Object storage bucket name'} - - {name: data_url, description: 'Required. URL of the data source'} + - {name: data_url, description: 'Required. URL of the data source'} outputs: - - {name: output, description: 'Data Filename'} + - {name: output, description: 'Data Filename'} implementation: container: - image: docker.io/aipipeline/training_with_spark_service:latest - command: ['python3'] + image: docker.io/aipipeline/data_preprocess_spark:latest + command: ['python'] args: [ - /app/data_preprocess_spark.py, + -u, data_preprocess_spark.py, --bucket_name, {inputValue: bucket_name}, --data_url, {inputValue: data_url} ] fileOutputs: - output: /tmp/output.txt + output: /tmp/filename diff --git a/components/ibm-components/spark/data_preprocess_spark/src/data_preprocess_spark.py b/components/ibm-components/spark/data_preprocess_spark/src/data_preprocess_spark.py index 1a67bd539bc..50aab2177dc 100644 --- a/components/ibm-components/spark/data_preprocess_spark/src/data_preprocess_spark.py +++ b/components/ibm-components/spark/data_preprocess_spark/src/data_preprocess_spark.py @@ -1,10 +1,12 @@ import argparse -import ibm_boto3 import requests -from ibm_botocore.client import Config from pyspark.sql import SparkSession +from minio import Minio +from minio.error import ResponseError +import re -def get_secret(path): + +def get_secret_creds(path): with open(path, 'r') as f: cred = f.readline().strip('\'') f.close() @@ -13,15 +15,19 @@ def get_secret(path): if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('--bucket_name', type=str, help='Object storage bucket name', default="dummy-bucket-name") - parser.add_argument('--data_url', type=str, help='URL of the data source', default="https://raw.githubusercontent.com/emartensibm/german-credit/binary/credit_risk_training.csv") + parser.add_argument('--data_url', type=str, help='URL of the data source', required=True) args = parser.parse_args() cos_bucket_name = args.bucket_name data_url = args.data_url - cos_url = get_secret("/app/secrets/cos_url") - cos_apikey = get_secret("/app/secrets/cos_resource_id") - cos_resource_instance_id = get_secret("/app/secrets/cos_resource_id") + cos_endpoint = get_secret_creds("/app/secrets/cos_endpoint") + cos_access_key = get_secret_creds("/app/secrets/cos_access_key") + cos_secret_key = get_secret_creds("/app/secrets/cos_secret_key") + + ''' Remove possible http scheme for Minio ''' + url = re.compile(r"https?://") + cos_endpoint = url.sub('', cos_endpoint) ''' Download data from data source ''' filename = data_url @@ -36,25 +42,22 @@ def get_secret(path): df_data = spark.read.csv(path=filename, sep=",", header=True, inferSchema=True) df_data.head() - ''' Upload data to IBM Cloud object storage ''' - cos = ibm_boto3.resource('s3', - ibm_api_key_id=cos_apikey, - ibm_service_instance_id=cos_resource_instance_id, - ibm_auth_endpoint='https://iam.bluemix.net/oidc/token', - config=Config(signature_version='oauth'), - endpoint_url=cos_url) - - buckets = [] - for bucket in cos.buckets.all(): - buckets.append(bucket.name) + ''' Upload data to Cloud object storage ''' + cos = Minio(cos_endpoint, + access_key=cos_access_key, + secret_key=cos_secret_key, + secure=True) - if cos_bucket_name not in buckets: - cos.create_bucket(Bucket=cos_bucket_name) + if not cos.bucket_exists(cos_bucket_name): + try: + cos.make_bucket(cos_bucket_name) + except ResponseError as err: + print(err) - 
cos.Bucket(cos_bucket_name).upload_file(filename, filename) + cos.fput_object(cos_bucket_name, filename, filename) print('Data ' + filename + ' is uploaded to bucket at ' + cos_bucket_name) - with open("/tmp/filename.txt", "w") as report: + with open("/tmp/filename", "w") as report: report.write(filename) df_data.printSchema() diff --git a/components/ibm-components/spark/store_spark_model/Dockerfile b/components/ibm-components/spark/store_spark_model/Dockerfile new file mode 100644 index 00000000000..981e2623751 --- /dev/null +++ b/components/ibm-components/spark/store_spark_model/Dockerfile @@ -0,0 +1,14 @@ +FROM aipipeline/pyspark:spark-2.1 + +RUN pip install --upgrade pip +RUN pip install --upgrade watson-machine-learning-client ibm-ai-openscale Minio --no-cache | tail -n 1 +RUN pip install psycopg2-binary | tail -n 1 + +ENV APP_HOME /app +COPY src $APP_HOME +WORKDIR $APP_HOME + +USER root + +ENTRYPOINT ["python"] +CMD ["store_spark_model.py"] diff --git a/components/ibm-components/spark/store_spark_model/component.yaml b/components/ibm-components/spark/store_spark_model/component.yaml new file mode 100644 index 00000000000..dba6eb14f24 --- /dev/null +++ b/components/ibm-components/spark/store_spark_model/component.yaml @@ -0,0 +1,43 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: 'Store Spark Model - Watson Machine Learning' +description: | + Store any trained Spark Model using IBM Watson Machine Learning Service +metadata: + annotations: {platform: 'IBM Watson Machine Learning Service'} +inputs: + - {name: bucket_name, description: 'Required. Object storage bucket name'} + - {name: aios_manifest_path, description: 'Required. Object storage file path for the aios manifest file'} + - {name: problem_type, description: 'Required. Model problem type'} + - {name: model_name, description: 'Required. Model name for the trained model'} + - {name: deployment_name, description: 'Required. Deployment name for the trained model'} + - {name: model_filepath, description: 'Required. Name of the trained model zip'} + - {name: train_data_filepath, description: 'Required. 
Name of the training data zip'} +outputs: + - {name: model_uid, description: 'Stored model UID'} +implementation: + container: + image: docker.io/aipipeline/store_spark_model:latest + command: ['python'] + args: [ + -u, store_spark_model.py, + --bucket_name, {inputValue: bucket_name}, + --aios_manifest_path, {inputValue: aios_manifest_path}, + --problem_type, {inputValue: problem_type}, + --model_name, {inputValue: model_name}, + --deployment_name, {inputValue: deployment_name}, + --model_filepath, {inputValue: model_filepath}, + --train_data_filepath, {inputValue: train_data_filepath} + ] + fileOutputs: + model_uid: /tmp/model_uid diff --git a/components/ibm-components/spark/store_spark_model/src/store_spark_model.py b/components/ibm-components/spark/store_spark_model/src/store_spark_model.py new file mode 100644 index 00000000000..2bc359fcdd7 --- /dev/null +++ b/components/ibm-components/spark/store_spark_model/src/store_spark_model.py @@ -0,0 +1,139 @@ +import argparse +import json +import os +import re +from pyspark.sql import SparkSession +from pyspark.ml.pipeline import PipelineModel +from pyspark import SparkConf, SparkContext +from pyspark.ml import Pipeline, Model +from watson_machine_learning_client import WatsonMachineLearningAPIClient +from minio import Minio + + +def get_secret_creds(path): + with open(path, 'r') as f: + cred = f.readline().strip('\'') + f.close() + return cred + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument('--bucket_name', type=str, help='Object storage bucket name', default="dummy-bucket-name") + parser.add_argument('--model_filepath', type=str, help='Name of the trained spark model packaged as zip', default="model.zip") + parser.add_argument('--train_data_filepath', type=str, help='Name of the train_data zip', default="train_data.zip") + parser.add_argument('--aios_manifest_path', type=str, help='Object storage file path for the aios manifest file', default="") + parser.add_argument('--problem_type', type=str, help='Model problem type', default="BINARY_CLASSIFICATION") + parser.add_argument('--model_name', type=str, help='model name for the trained model', default="Spark German Risk Model - Final") + parser.add_argument('--deployment_name', type=str, help='deployment name for the trained model', default="Spark German Risk Deployment - Final") + args = parser.parse_args() + + cos_bucket_name = args.bucket_name + model_filepath = args.model_filepath + aios_manifest_path = args.aios_manifest_path + train_data_filepath = args.train_data_filepath + problem_type = args.problem_type + MODEL_NAME = args.model_name + DEPLOYMENT_NAME = args.deployment_name + + wml_url = get_secret_creds("/app/secrets/wml_url") + wml_username = get_secret_creds("/app/secrets/wml_username") + wml_password = get_secret_creds("/app/secrets/wml_password") + wml_instance_id = get_secret_creds("/app/secrets/wml_instance_id") + cos_endpoint = get_secret_creds("/app/secrets/cos_endpoint") + cos_access_key = get_secret_creds("/app/secrets/cos_access_key") + cos_secret_key = get_secret_creds("/app/secrets/cos_secret_key") + + ''' Remove possible http scheme for Minio ''' + url = re.compile(r"https?://") + cos_endpoint = url.sub('', cos_endpoint) + + WML_CREDENTIALS = { + "url": wml_url, + "username": wml_username, + "password": wml_password, + "instance_id": wml_instance_id + } + ''' Load Spark model ''' + cos = Minio(cos_endpoint, + access_key=cos_access_key, + secret_key=cos_secret_key, + secure=True) + + cos.fget_object(cos_bucket_name, 
model_filepath, model_filepath) + cos.fget_object(cos_bucket_name, train_data_filepath, train_data_filepath) + cos.fget_object(cos_bucket_name, 'evaluation.json', 'evaluation.json') + if aios_manifest_path: + cos.fget_object(cos_bucket_name, aios_manifest_path, aios_manifest_path) + + os.system('unzip %s' % model_filepath) + print('model ' + model_filepath + ' is downloaded') + os.system('unzip %s' % train_data_filepath) + print('train_data ' + train_data_filepath + ' is downloaded') + + sc = SparkContext() + model = PipelineModel.load(model_filepath.split('.')[0]) + pipeline = Pipeline(stages=model.stages) + spark = SparkSession.builder.getOrCreate() + train_data = spark.read.csv(path=train_data_filepath.split('.')[0], sep=",", header=True, inferSchema=True) + + ''' Remove previous deployed model ''' + wml_client = WatsonMachineLearningAPIClient(WML_CREDENTIALS) + model_deployment_ids = wml_client.deployments.get_uids() + deleted_model_id = None + for deployment_id in model_deployment_ids: + deployment = wml_client.deployments.get_details(deployment_id) + model_id = deployment['entity']['deployable_asset']['guid'] + if deployment['entity']['name'] == DEPLOYMENT_NAME: + print('Deleting deployment id', deployment_id) + wml_client.deployments.delete(deployment_id) + print('Deleting model id', model_id) + wml_client.repository.delete(model_id) + deleted_model_id = model_id + wml_client.repository.list_models() + + ''' Save and Deploy model ''' + if aios_manifest_path: + with open(aios_manifest_path) as f: + aios_manifest = json.load(f) + OUTPUT_DATA_SCHEMA = {'fields': aios_manifest['model_schema'], 'type': 'struct'} + f.close() + else: + OUTPUT_DATA_SCHEMA = None + + with open('evaluation.json') as f: + evaluation = json.load(f) + f.close() + + if problem_type == 'BINARY_CLASSIFICATION': + EVALUATION_METHOD = 'binary' + else: + EVALUATION_METHOD = 'multiclass' + + ''' Define evaluation threshold ''' + model_props = { + wml_client.repository.ModelMetaNames.NAME: "{}".format(MODEL_NAME), + wml_client.repository.ModelMetaNames.EVALUATION_METHOD: EVALUATION_METHOD, + wml_client.repository.ModelMetaNames.EVALUATION_METRICS: evaluation['metrics'] + } + if aios_manifest_path: + model_props[wml_client.repository.ModelMetaNames.OUTPUT_DATA_SCHEMA] = OUTPUT_DATA_SCHEMA + + wml_models = wml_client.repository.get_details() + model_uid = None + for model_in in wml_models['models']['resources']: + if MODEL_NAME == model_in['entity']['name']: + model_uid = model_in['metadata']['guid'] + break + + if model_uid is None: + print("Storing model ...") + + published_model_details = wml_client.repository.store_model(model=model, meta_props=model_props, training_data=train_data, pipeline=pipeline) + model_uid = wml_client.repository.get_model_uid(published_model_details) + print("Done") + else: + print("Model already exist") + + with open("/tmp/model_uid", "w") as report: + report.write(model_uid) diff --git a/components/ibm-components/spark/train_spark/Dockerfile b/components/ibm-components/spark/train_spark/Dockerfile index 578ebf71dd3..0b627733261 100644 --- a/components/ibm-components/spark/train_spark/Dockerfile +++ b/components/ibm-components/spark/train_spark/Dockerfile @@ -1,6 +1,4 @@ FROM python:3.6.8-stretch -RUN apt-get update -RUN apt-get install -y vim wget curl ENV APP_HOME /app COPY src $APP_HOME diff --git a/components/ibm-components/spark/train_spark/component.yaml b/components/ibm-components/spark/train_spark/component.yaml index e887e2fe279..dc9c0af745c 100644 --- 
a/components/ibm-components/spark/train_spark/component.yaml +++ b/components/ibm-components/spark/train_spark/component.yaml @@ -16,22 +16,24 @@ description: | metadata: annotations: {platform: 'IBM Cloud Spark Service'} inputs: - - {name: bucket_name, description: 'Required. Object storage bucket name'} - - {name: data_filename, description: 'Required. Name of the data binary'} - - {name: model_filename, description: 'Required. Name of the training model file'} + - {name: bucket_name, description: 'Required. Object storage bucket name'} + - {name: data_filename, description: 'Required. Name of the data binary'} + - {name: model_filename, description: 'Required. Name of the training model file'} - {name: spark_entrypoint, description: 'Required. Entrypoint command for training the spark model'} outputs: - - {name: output, description: 'Spark Model Filename'} + - {name: model_filepath, description: 'Spark Model binary filepath'} + - {name: train_data_filepath, description: 'Spark training data filepath'} implementation: container: - image: docker.io/aipipeline/training_with_spark_service:latest - command: ['python3'] + image: docker.io/aipipeline/train_spark:latest + command: ['python'] args: [ - /app/train_spark.py, + -u, train_spark.py, --bucket_name, {inputValue: bucket_name}, --data_filename, {inputValue: data_filename}, --model_filename, {inputValue: model_filename}, --spark_entrypoint, {inputValue: spark_entrypoint} ] fileOutputs: - output: /tmp/spark_model.txt + model_filepath: /tmp/model_filepath + train_data_filepath: /tmp/train_data_filepath diff --git a/components/ibm-components/spark/train_spark/src/spark-submit.sh b/components/ibm-components/spark/train_spark/src/spark-submit.sh index 2ec8c268c29..5fd435054a1 100644 --- a/components/ibm-components/spark/train_spark/src/spark-submit.sh +++ b/components/ibm-components/spark/train_spark/src/spark-submit.sh @@ -1,3 +1,5 @@ +#!/usr/bin/env bash + # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at diff --git a/components/ibm-components/spark/train_spark/src/train_spark.py b/components/ibm-components/spark/train_spark/src/train_spark.py index 8f51ef0d523..84ef9ae182b 100644 --- a/components/ibm-components/spark/train_spark/src/train_spark.py +++ b/components/ibm-components/spark/train_spark/src/train_spark.py @@ -1,11 +1,9 @@ import os import argparse import json -import subprocess -import time -def get_secret(path): +def get_secret_creds(path): with open(path, 'r') as f: cred = f.readline().strip('\'') f.close() @@ -15,7 +13,7 @@ def get_secret(path): if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('--bucket_name', type=str, help='Object storage bucket name', default="dummy-bucket-name") - parser.add_argument('--data_filename', type=str, help='Name of the data binary', default="credit_risk_training.csv") + parser.add_argument('--data_filename', type=str, help='Name of the data binary', default="") parser.add_argument('--model_filename', type=str, help='Name of the training model file', default="model.py") parser.add_argument('--spark_entrypoint', type=str, help='Entrypoint command for training the spark model', default="python model.py") args = parser.parse_args() @@ -25,19 +23,19 @@ def get_secret(path): model_filename = args.model_filename spark_entrypoint = args.spark_entrypoint - cos_url = get_secret("/app/secrets/cos_url") - cos_apikey = get_secret("/app/secrets/cos_apikey") - cos_resource_instance_id = get_secret("/app/secrets/cos_resource_id") - tenant_id = get_secret("/app/secrets/spark_tenant_id") - cluster_master_url = get_secret("/app/secrets/spark_cluster_master_url") - tenant_secret = get_secret("/app/secrets/spark_tenant_secret") - instance_id = get_secret("/app/secrets/spark_instance_id") + cos_endpoint = get_secret_creds("/app/secrets/cos_endpoint") + cos_access_key = get_secret_creds("/app/secrets/cos_access_key") + cos_secret_key = get_secret_creds("/app/secrets/cos_secret_key") + tenant_id = get_secret_creds("/app/secrets/spark_tenant_id") + cluster_master_url = get_secret_creds("/app/secrets/spark_cluster_master_url") + tenant_secret = get_secret_creds("/app/secrets/spark_tenant_secret") + instance_id = get_secret_creds("/app/secrets/spark_instance_id") ''' Create credentials and vcap files for spark submit''' creds = { - "cos_url": cos_url, - "cos_apikey": cos_apikey, - "cos_resource_id": cos_resource_instance_id, + "cos_endpoint": cos_endpoint, + "cos_access_key": cos_access_key, + "cos_secret_key": cos_secret_key, "bucket_name": cos_bucket_name, "data_filename": data_filename, "model_filename": model_filename, @@ -63,5 +61,7 @@ def get_secret(path): os.system('./spark-submit.sh --vcap ./vcap.json --deploy-mode cluster --conf spark.service.spark_version=2.1 --files creds.json wrapper.py') os.system('cat stdout') - with open("/tmp/spark_model.txt", "w") as report: - report.write(model_filename) + with open("/tmp/model_filepath", "w") as report: + report.write("model.zip") + with open("/tmp/train_data_filepath", "w") as report: + report.write("train_data.zip") diff --git a/components/ibm-components/spark/train_spark/src/wrapper.py b/components/ibm-components/spark/train_spark/src/wrapper.py index 755426db407..4105cc4d212 100644 --- a/components/ibm-components/spark/train_spark/src/wrapper.py +++ b/components/ibm-components/spark/train_spark/src/wrapper.py @@ -4,11 +4,10 @@ import json import re -# Install ibm cos package if the spark kernel is not running on IBM service. 
-# call_command('pip install ibm-cos-sdk --user') -import ibm_boto3 -from ibm_botocore.client import Config +os.system('pip install Minio --user') + +from minio import Minio # Load Credential file @@ -17,27 +16,26 @@ creds = json.load(f) f.close() +# Remove possible http scheme for Minio +url = re.compile(r"https?://") +cos_endpoint = url.sub('', creds['cos_endpoint']) # Download the data and model file from the object storage. -cos = ibm_boto3.resource('s3', - ibm_api_key_id=creds['cos_apikey'], - ibm_service_instance_id=creds['cos_resource_id'], - ibm_auth_endpoint='https://iam.bluemix.net/oidc/token', - config=Config(signature_version='oauth'), - endpoint_url=creds['cos_url']) +cos = Minio(cos_endpoint, + access_key=creds['cos_access_key'], + secret_key=creds['cos_secret_key'], + secure=True) -cos.Bucket(creds['bucket_name']).download_file(creds['data_filename'], creds['data_filename']) -cos.Bucket(creds['bucket_name']).download_file(creds['model_filename'], creds['model_filename']) +cos.fget_object(creds['bucket_name'], creds['data_filename'], creds['data_filename']) +cos.fget_object(creds['bucket_name'], creds['model_filename'], creds['model_filename']) os.system('chmod 755 %s' % creds['model_filename']) os.system(creds['spark_entrypoint']) os.system('zip -r model.zip model') -os.system('zip -r pipeline.zip pipeline') os.system('zip -r train_data.zip train_data') -cos.Bucket(creds['bucket_name']).upload_file('model.zip', 'model.zip') -cos.Bucket(creds['bucket_name']).upload_file('pipeline.zip', 'pipeline.zip') -cos.Bucket(creds['bucket_name']).upload_file('train_data.zip', 'train_data.zip') -cos.Bucket(creds['bucket_name']).upload_file('evaluation.json', 'evaluation.json') +cos.fput_object(creds['bucket_name'], 'model.zip', 'model.zip') +cos.fput_object(creds['bucket_name'], 'train_data.zip', 'train_data.zip') +cos.fput_object(creds['bucket_name'], 'evaluation.json', 'evaluation.json') -print('Trained model, pipeline, and train_data are uploaded.') +print('Trained model and train_data are uploaded.') diff --git a/components/ibm-components/watson/deploy/component.yaml b/components/ibm-components/watson/deploy/component.yaml index 883fb3aacb8..82d96ce82ef 100644 --- a/components/ibm-components/watson/deploy/component.yaml +++ b/components/ibm-components/watson/deploy/component.yaml @@ -16,20 +16,24 @@ description: | metadata: annotations: {platform: 'IBM Watson Machine Learning'} inputs: - - {name: model_uid, description: 'Required. UID for the stored model on Watson Machine Learning'} - - {name: model_name, description: 'Required. Model Name on Watson Machine Learning'} - - {name: scoring_payload, description: 'Required. Sample Payload file name in the object storage'} + - {name: model_uid, description: 'Required. UID for the stored model on Watson Machine Learning'} + - {name: model_name, description: 'Required. 
Model Name on Watson Machine Learning'} + - {name: scoring_payload, description: 'Sample Payload file name in the object storage', default: ''} + - {name: deployment_name, description: 'Deployment Name on Watson Machine Learning', default: ''} outputs: - - {name: output, description: 'Link to the deployed model web service'} + - {name: scoring_endpoint, description: 'Link to the deployed model web service'} + - {name: model_uid, description: 'UID for the stored model on Watson Machine Learning'} implementation: container: image: docker.io/aipipeline/wml-deploy:latest - command: ['python3'] + command: ['python'] args: [ /app/wml-deploy.py, --model-uid, {inputValue: model_uid}, --model-name, {inputValue: model_name}, - --scoring-payload, {inputValue: scoring_payload} + --scoring-payload, {inputValue: scoring_payload}, + --deployment-name, {inputValue: deployment_name} ] fileOutputs: - output: /tmp/output + scoring_endpoint: /tmp/scoring_endpoint + model_uid: /tmp/model_uid diff --git a/components/ibm-components/watson/deploy/src/wml-deploy.py b/components/ibm-components/watson/deploy/src/wml-deploy.py index afdd728ddfb..165eeb3a994 100644 --- a/components/ibm-components/watson/deploy/src/wml-deploy.py +++ b/components/ibm-components/watson/deploy/src/wml-deploy.py @@ -9,7 +9,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# + # define the function to deploy the model def getSecret(secret): @@ -22,10 +22,12 @@ def deploy(args): from watson_machine_learning_client import WatsonMachineLearningAPIClient from minio import Minio import os - + import re + wml_model_name = args.model_name - wml_scoring_payload = args.scoring_payload model_uid = args.model_uid + wml_scoring_payload = args.scoring_payload if args.scoring_payload else '' + deployment_name = args.deployment_name if args.deployment_name else wml_model_name # retrieve credentials wml_url = getSecret("/app/secrets/wml_url") @@ -33,12 +35,6 @@ def deploy(args): wml_password = getSecret("/app/secrets/wml_password") wml_instance_id = getSecret("/app/secrets/wml_instance_id") - cos_endpoint = getSecret("/app/secrets/cos_endpoint") - cos_access_key = getSecret("/app/secrets/cos_access_key") - cos_secret_key = getSecret("/app/secrets/cos_secret_key") - - cos_input_bucket = getSecret("/app/secrets/cos_input_bucket") - # set up the WML client wml_credentials = { "url": wml_url, @@ -46,43 +42,59 @@ def deploy(args): "password": wml_password, "instance_id": wml_instance_id } - client = WatsonMachineLearningAPIClient( wml_credentials ) - + client = WatsonMachineLearningAPIClient(wml_credentials) + # deploy the model - deployment_name = wml_model_name - deployment_desc = "deployment of %s" %wml_model_name - deployment = client.deployments.create( model_uid, deployment_name, deployment_desc ) - scoring_endpoint = client.deployments.get_scoring_url( deployment ) - print( "scoring_endpoint: ", scoring_endpoint ) - - # download scoring payload - payload_file = os.path.join('/app', wml_scoring_payload) - - cos = Minio(cos_endpoint, - access_key = cos_access_key, - secret_key = cos_secret_key) - cos.fget_object(cos_input_bucket, wml_scoring_payload, payload_file) - - # scoring the deployment - import json - with open( payload_file ) as data_file: - test_data = json.load( data_file ) - payload = test_data[ 'payload' ] - data_file.close() - - print("Scoring result: ") - result = client.deployments.score( scoring_endpoint, 
payload ) - print(result) + deployment_desc = "deployment of %s" % wml_model_name + deployment = client.deployments.create(model_uid, deployment_name, deployment_desc) + scoring_endpoint = client.deployments.get_scoring_url(deployment) + print("scoring_endpoint: ", scoring_endpoint) + + if wml_scoring_payload: + # download scoring payload if exist + cos_endpoint = getSecret("/app/secrets/cos_endpoint") + cos_access_key = getSecret("/app/secrets/cos_access_key") + cos_secret_key = getSecret("/app/secrets/cos_secret_key") + cos_input_bucket = getSecret("/app/secrets/cos_input_bucket") + + # Make sure http scheme is not exist for Minio + url = re.compile(r"https?://") + cos_endpoint = url.sub('', cos_endpoint) - with open("/tmp/output", "w") as f: - print(result, file=f) + payload_file = os.path.join('/app', wml_scoring_payload) + + cos = Minio(cos_endpoint, + access_key=cos_access_key, + secret_key=cos_secret_key) + cos.fget_object(cos_input_bucket, wml_scoring_payload, payload_file) + + # scoring the deployment + import json + with open(payload_file) as data_file: + test_data = json.load(data_file) + payload = test_data['payload'] + data_file.close() + + print("Scoring result: ") + result = client.deployments.score(scoring_endpoint, payload) + else: + result = 'Scoring payload is not provided' + + print(result) + with open("/tmp/scoring_endpoint", "w") as f: + print(scoring_endpoint, file=f) + f.close() + with open("/tmp/model_uid", "w") as f: + print(model_uid, file=f) f.close() + if __name__ == "__main__": import argparse parser = argparse.ArgumentParser() parser.add_argument('--model-name', type=str, required=True) - parser.add_argument('--scoring-payload', type=str, required=True) parser.add_argument('--model-uid', type=str, required=True) + parser.add_argument('--deployment-name', type=str) + parser.add_argument('--scoring-payload', type=str) args = parser.parse_args() deploy(args) diff --git a/components/ibm-components/watson/manage/monitor_fairness/Dockerfile b/components/ibm-components/watson/manage/monitor_fairness/Dockerfile index 365f8552058..bde9a7427a3 100644 --- a/components/ibm-components/watson/manage/monitor_fairness/Dockerfile +++ b/components/ibm-components/watson/manage/monitor_fairness/Dockerfile @@ -1,7 +1,7 @@ FROM python:3.6.8-stretch RUN pip install --upgrade pip -RUN pip install --upgrade watson-machine-learning-client ibm-ai-openscale --no-cache | tail -n 1 +RUN pip install --upgrade watson-machine-learning-client ibm-ai-openscale Minio pandas --no-cache | tail -n 1 RUN pip install psycopg2-binary | tail -n 1 ENV APP_HOME /app diff --git a/components/ibm-components/watson/manage/monitor_fairness/component.yaml b/components/ibm-components/watson/manage/monitor_fairness/component.yaml index 53f04230fca..8ac82db8ac4 100644 --- a/components/ibm-components/watson/manage/monitor_fairness/component.yaml +++ b/components/ibm-components/watson/manage/monitor_fairness/component.yaml @@ -21,6 +21,7 @@ inputs: - {name: fairness_min_records, description: 'Minimum amount of records for performing a fairness monitor.', default: '5'} - {name: aios_manifest_path, description: 'Object storage file path for the aios manifest file.', default: 'aios.json'} - {name: cos_bucket_name, description: 'Object storage bucket name.', default: 'bucket-name'} + - {name: data_filename, description: 'Name of the data binary', default: ''} implementation: container: image: docker.io/aipipeline/monitor_fairness:latest @@ -31,5 +32,6 @@ implementation: --fairness_threshold, {inputValue: 
fairness_threshold}, --fairness_min_records, {inputValue: fairness_min_records}, --aios_manifest_path, {inputValue: aios_manifest_path}, - --cos_bucket_name, {inputValue: cos_bucket_name} + --cos_bucket_name, {inputValue: cos_bucket_name}, + --data_filename, {inputValue: data_filename} ] diff --git a/components/ibm-components/watson/manage/monitor_fairness/src/monitor_fairness.py b/components/ibm-components/watson/manage/monitor_fairness/src/monitor_fairness.py index 214b1911621..763661945ac 100644 --- a/components/ibm-components/watson/manage/monitor_fairness/src/monitor_fairness.py +++ b/components/ibm-components/watson/manage/monitor_fairness/src/monitor_fairness.py @@ -1,12 +1,13 @@ import json import argparse -import ibm_boto3 -from ibm_botocore.client import Config +import re from ibm_ai_openscale import APIClient from ibm_ai_openscale.engines import * from ibm_ai_openscale.utils import * from ibm_ai_openscale.supporting_classes import PayloadRecord, Feature from ibm_ai_openscale.supporting_classes.enums import * +from minio import Minio +import pandas as pd def get_secret_creds(path): with open(path, 'r') as f: @@ -21,6 +22,7 @@ def get_secret_creds(path): parser.add_argument('--fairness_min_records', type=int, help='Minimum amount of records for performing a fairness monitor', default=5) parser.add_argument('--aios_manifest_path', type=str, help='Object storage file path for the aios manifest file', default='aios.json') parser.add_argument('--cos_bucket_name', type=str, help='Object storage bucket name', default='bucket-name') + parser.add_argument('--data_filename', type=str, help='Name of the data binary', default="") args = parser.parse_args() model_name = args.model_name @@ -28,25 +30,31 @@ def get_secret_creds(path): fairness_min_records = args.fairness_min_records cos_bucket_name = args.cos_bucket_name aios_manifest_path = args.aios_manifest_path + data_filename = args.data_filename aios_guid = get_secret_creds("/app/secrets/aios_guid") cloud_api_key = get_secret_creds("/app/secrets/cloud_api_key") - cos_url = get_secret_creds("/app/secrets/cos_url") - cos_apikey = get_secret_creds("/app/secrets/cos_apikey") - cos_resource_instance_id = get_secret_creds("/app/secrets/cos_resource_id") + cos_endpoint = get_secret_creds("/app/secrets/cos_endpoint") + cos_access_key = get_secret_creds("/app/secrets/cos_access_key") + cos_secret_key = get_secret_creds("/app/secrets/cos_secret_key") - ''' Upload data to IBM Cloud object storage ''' - cos = ibm_boto3.resource('s3', - ibm_api_key_id=cos_apikey, - ibm_service_instance_id=cos_resource_instance_id, - ibm_auth_endpoint='https://iam.bluemix.net/oidc/token', - config=Config(signature_version='oauth'), - endpoint_url=cos_url) + ''' Remove possible http scheme for Minio ''' + url = re.compile(r"https?://") + cos_endpoint = url.sub('', cos_endpoint) - cos.Bucket(cos_bucket_name).download_file(aios_manifest_path, 'aios.json') + ''' Upload data to Cloud object storage ''' + cos = Minio(cos_endpoint, + access_key=cos_access_key, + secret_key=cos_secret_key, + secure=True) + cos.fget_object(cos_bucket_name, aios_manifest_path, 'aios.json') print('Fairness definition file ' + aios_manifest_path + ' is downloaded') + cos.fget_object(cos_bucket_name, data_filename, data_filename) + pd_data = pd.read_csv(data_filename, sep=",", header=0, engine='python') + print('training data ' + data_filename + ' is downloaded and loaded') + """ Load manifest JSON file """ with open('aios.json') as f: aios_manifest = json.load(f) @@ -74,10 +82,10 @@ def 
get_secret_creds(path): subscription.fairness_monitoring.enable( features=feature_list, - prediction_column='predictedLabel', favourable_classes=aios_manifest['fairness_favourable_classes'], unfavourable_classes=aios_manifest['fairness_unfavourable_classes'], - min_records=fairness_min_records + min_records=fairness_min_records, + training_data=pd_data ) run_details = subscription.fairness_monitoring.run() diff --git a/components/ibm-components/watson/manage/monitor_quality/component.yaml b/components/ibm-components/watson/manage/monitor_quality/component.yaml index 8e077e7f52e..9f5177cfbcd 100644 --- a/components/ibm-components/watson/manage/monitor_quality/component.yaml +++ b/components/ibm-components/watson/manage/monitor_quality/component.yaml @@ -17,7 +17,6 @@ metadata: annotations: {platform: 'IBM Watson OpenScale'} inputs: - {name: model_name, description: 'Deployed model name on OpenScale.', default: 'AIOS Spark German Risk Model - Final'} - - {name: problem_type, description: 'Model problem type.', default: 'BINARY_CLASSIFICATION'} - {name: quality_threshold, description: 'Amount of threshold for quality monitoring', default: '0.7'} - {name: quality_min_records, description: 'Minimum amount of records for performing a quality monitor.', default: '5'} implementation: @@ -27,7 +26,6 @@ implementation: args: [ -u, monitor_quality.py, --model_name, {inputValue: model_name}, - --problem_type, {inputValue: problem_type}, --quality_threshold, {inputValue: quality_threshold}, --quality_min_records, {inputValue: quality_min_records} ] diff --git a/components/ibm-components/watson/manage/monitor_quality/src/monitor_quality.py b/components/ibm-components/watson/manage/monitor_quality/src/monitor_quality.py index 52c74ce3672..15e539b8d59 100644 --- a/components/ibm-components/watson/manage/monitor_quality/src/monitor_quality.py +++ b/components/ibm-components/watson/manage/monitor_quality/src/monitor_quality.py @@ -15,13 +15,11 @@ def get_secret_creds(path): if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('--model_name', type=str, help='Deployed model name', default="AIOS Spark German Risk Model - Final") - parser.add_argument('--problem_type', type=str, help='Model problem type', default="BINARY_CLASSIFICATION") parser.add_argument('--quality_threshold', type=float, help='Amount of threshold for quality monitoring', default=0.7) parser.add_argument('--quality_min_records', type=int, help='Minimum amount of records for performing a quality monitor', default=5) args = parser.parse_args() model_name = args.model_name - problem_type = args.problem_type quality_threshold = args.quality_threshold quality_min_records = args.quality_min_records @@ -43,15 +41,7 @@ def get_secret_creds(path): if ai_client.data_mart.subscriptions.get_details(sub)['entity']['asset']['name'] == model_name: subscription = ai_client.data_mart.subscriptions.get(sub) - PROBLEMTYPE = ProblemType.BINARY_CLASSIFICATION - if problem_type == 'BINARY_CLASSIFICATION': - PROBLEMTYPE = ProblemType.BINARY_CLASSIFICATION - elif problem_type == 'MULTICLASS_CLASSIFICATION': - PROBLEMTYPE = ProblemType.MULTICLASS_CLASSIFICATION - elif problem_type == 'REGRESSION': - PROBLEMTYPE = ProblemType.REGRESSION - - subscription.quality_monitoring.enable(problem_type=PROBLEMTYPE, threshold=quality_threshold, min_records=quality_min_records) + subscription.quality_monitoring.enable(threshold=quality_threshold, min_records=quality_min_records) # Runs need to post the minial payload records in order to trigger the 
monitoring run. # run_details = subscription.quality_monitoring.run() diff --git a/components/ibm-components/watson/manage/subscribe/Dockerfile b/components/ibm-components/watson/manage/subscribe/Dockerfile index 2c9f66930f5..9b0238eb0f5 100644 --- a/components/ibm-components/watson/manage/subscribe/Dockerfile +++ b/components/ibm-components/watson/manage/subscribe/Dockerfile @@ -1,7 +1,7 @@ FROM python:3.6.8-stretch RUN pip install --upgrade pip -RUN pip install --upgrade watson-machine-learning-client ibm-ai-openscale --no-cache | tail -n 1 +RUN pip install --upgrade watson-machine-learning-client ibm-ai-openscale Minio --no-cache | tail -n 1 RUN pip install psycopg2-binary | tail -n 1 ENV APP_HOME /app diff --git a/components/ibm-components/watson/manage/subscribe/component.yaml b/components/ibm-components/watson/manage/subscribe/component.yaml index c3c60c1d45d..485c1ee2caa 100644 --- a/components/ibm-components/watson/manage/subscribe/component.yaml +++ b/components/ibm-components/watson/manage/subscribe/component.yaml @@ -16,12 +16,15 @@ description: | metadata: annotations: {platform: 'IBM Watson OpenScale'} inputs: - - {name: model_name, description: 'Deployed model name.', default: 'AIOS Spark German Risk Model - Final'} - - {name: model_uid, description: 'Deployed model uid.', default: 'dummy uid'} - - {name: aios_schema, description: 'OpenScale Schema Name', default: 'data_mart_credit_risk'} - - {name: label_column, description: 'Model label column name.', default: 'Risk'} + - {name: model_name, description: 'Deployed model name.', default: 'AIOS Spark German Risk Model - Final'} + - {name: model_uid, description: 'Deployed model uid.', default: 'dummy uid'} + - {name: aios_schema, description: 'OpenScale Schema Name', default: 'data_mart_credit_risk'} + - {name: label_column, description: 'Model label column name.', default: 'Risk'} + - {name: aios_manifest_path, description: 'Object storage file path for the aios manifest file', default: ''} + - {name: bucket_name, description: 'Object storage bucket name', default: 'dummy-bucket-name'} + - {name: problem_type, description: 'Model problem type', default: 'BINARY_CLASSIFICATION'} outputs: - - {name: model_name, description: 'Deployed model name.'} + - {name: model_name, description: 'Deployed model name.'} implementation: container: image: docker.io/aipipeline/subscribe:latest @@ -31,7 +34,10 @@ implementation: --model_name, {inputValue: model_name}, --model_uid, {inputValue: model_uid}, --aios_schema, {inputValue: aios_schema}, - --label_column, {inputValue: label_column} + --label_column, {inputValue: label_column}, + --aios_manifest_path, {inputValue: aios_manifest_path}, + --bucket_name, {inputValue: bucket_name}, + --problem_type, {inputValue: problem_type} ] fileOutputs: - model_name: /tmp/model_name.txt + model_name: /tmp/model_name diff --git a/components/ibm-components/watson/manage/subscribe/src/subscribe.py b/components/ibm-components/watson/manage/subscribe/src/subscribe.py index ea59d5dbc29..47896172ed2 100644 --- a/components/ibm-components/watson/manage/subscribe/src/subscribe.py +++ b/components/ibm-components/watson/manage/subscribe/src/subscribe.py @@ -1,11 +1,13 @@ import json import argparse +import re from ibm_ai_openscale import APIClient from ibm_ai_openscale.engines import * from ibm_ai_openscale.utils import * from ibm_ai_openscale.supporting_classes import PayloadRecord, Feature from ibm_ai_openscale.supporting_classes.enums import * from watson_machine_learning_client import 
WatsonMachineLearningAPIClient +from minio import Minio def get_secret_creds(path): with open(path, 'r') as f: @@ -19,19 +21,42 @@ def get_secret_creds(path): parser.add_argument('--model_name', type=str, help='Deployed model name', default="AIOS Spark German Risk Model - Final") parser.add_argument('--model_uid', type=str, help='Deployed model uid', default="dummy uid") parser.add_argument('--label_column', type=str, help='Model label column name', default="Risk") + parser.add_argument('--aios_manifest_path', type=str, help='Object storage file path for the aios manifest file', default="") + parser.add_argument('--bucket_name', type=str, help='Object storage bucket name', default="dummy-bucket-name") + parser.add_argument('--problem_type', type=str, help='Model problem type', default="BINARY_CLASSIFICATION") args = parser.parse_args() aios_schema = args.aios_schema model_name = args.model_name model_uid = args.model_uid label_column = args.label_column - - wml_creds = get_secret_creds("/app/secrets/wml_credentials") + aios_manifest_path = args.aios_manifest_path + cos_bucket_name = args.bucket_name + problem_type = args.problem_type + + wml_url = get_secret_creds("/app/secrets/wml_url") + wml_username = get_secret_creds("/app/secrets/wml_username") + wml_password = get_secret_creds("/app/secrets/wml_password") + wml_instance_id = get_secret_creds("/app/secrets/wml_instance_id") + wml_apikey = get_secret_creds("/app/secrets/wml_apikey") aios_guid = get_secret_creds("/app/secrets/aios_guid") cloud_api_key = get_secret_creds("/app/secrets/cloud_api_key") postgres_uri = get_secret_creds("/app/secrets/postgres_uri") - - WML_CREDENTIALS = json.loads(wml_creds) + cos_endpoint = get_secret_creds("/app/secrets/cos_endpoint") + cos_access_key = get_secret_creds("/app/secrets/cos_access_key") + cos_secret_key = get_secret_creds("/app/secrets/cos_secret_key") + + ''' Make sure http scheme is not exist for Minio ''' + url = re.compile(r"https?://") + cos_endpoint = url.sub('', cos_endpoint) + + WML_CREDENTIALS = { + "url": wml_url, + "username": wml_username, + "password": wml_password, + "instance_id": wml_instance_id, + "apikey": wml_apikey + } AIOS_CREDENTIALS = { "instance_guid": aios_guid, @@ -55,7 +80,7 @@ def get_secret_creds(path): try: data_mart_details = ai_client.data_mart.get_details() if 'internal_database' in data_mart_details['database_configuration'] and data_mart_details['database_configuration']['internal_database']: - if POSTGRES_CREDENTIALS is None: + if POSTGRES_CREDENTIALS: print('Using existing internal datamart') else: print('Switching to external datamart') @@ -65,7 +90,7 @@ def get_secret_creds(path): else: print('Using existing external datamart') except: - if POSTGRES_CREDENTIALS is None: + if POSTGRES_CREDENTIALS: print('Setting up internal datamart') ai_client.data_mart.setup(internal_db=True) else: @@ -74,7 +99,6 @@ def get_secret_creds(path): ai_client.data_mart.setup(db_credentials=POSTGRES_CREDENTIALS, schema=SCHEMA_NAME) data_mart_details = ai_client.data_mart.get_details() - print(data_mart_details) binding_uid = ai_client.data_mart.bindings.add('WML instance', WatsonMachineLearningInstance(WML_CREDENTIALS)) if binding_uid is None: @@ -91,11 +115,44 @@ def get_secret_creds(path): ai_client.data_mart.subscriptions.delete(subscription) print('Deleted existing subscription for', model_name) + ''' Obtain feature and categorical columns ''' + # Download aios manifest file + cos = Minio(cos_endpoint, + access_key=cos_access_key, + secret_key=cos_secret_key, + secure=True) 
+ cos.fget_object(cos_bucket_name, aios_manifest_path, aios_manifest_path) + + # Extract necessary column names + feature_columns = [] + categorical_columns = [] + with open(aios_manifest_path) as f: + aios_manifest = json.load(f) + OUTPUT_DATA_SCHEMA = {'fields': aios_manifest['model_schema'], 'type': 'struct'} + for column in aios_manifest['model_schema']: + if column['metadata'].get('modeling_role', '') == 'feature': + feature_columns.append(column['name']) + if column['metadata'].get('measure', '') == 'discrete': + categorical_columns.append(column['name']) + f.close() + + PROBLEMTYPE = ProblemType.BINARY_CLASSIFICATION + if problem_type == 'BINARY_CLASSIFICATION': + PROBLEMTYPE = ProblemType.BINARY_CLASSIFICATION + elif problem_type == 'MULTICLASS_CLASSIFICATION': + PROBLEMTYPE = ProblemType.MULTICLASS_CLASSIFICATION + elif problem_type == 'REGRESSION': + PROBLEMTYPE = ProblemType.REGRESSION + subscription = ai_client.data_mart.subscriptions.add(WatsonMachineLearningAsset( model_uid, label_column=label_column, + input_data_type=InputDataType.STRUCTURED, + problem_type=PROBLEMTYPE, prediction_column='predictedLabel', - probability_column='probability' + probability_column='probability', + feature_columns=feature_columns, + categorical_columns=categorical_columns )) if subscription is None: print('Exists already') @@ -120,5 +177,5 @@ def get_secret_creds(path): print('Scoring endpoint is: ' + credit_risk_scoring_endpoint + '\n') - with open("/tmp/model_name.txt", "w") as report: + with open("/tmp/model_name", "w") as report: report.write(model_name) diff --git a/samples/ibm-samples/openscale/README.md b/samples/ibm-samples/openscale/README.md new file mode 100644 index 00000000000..1cf73b92bc6 --- /dev/null +++ b/samples/ibm-samples/openscale/README.md @@ -0,0 +1,116 @@ +# Watson OpenScale Example + +This simple OpenScale pipeline demonstrates how to train a model using the IBM Spark service, store and deploy it with Watson Machine Learning, and then use Watson OpenScale for fairness and quality monitoring. + +## Prerequisites +This pipeline requires the user to have provisioned the OpenScale, Spark, and Machine Learning services on IBM Cloud, to have set up a cloud object store, and to have configured the service credentials in the creds.ini file. + +To provision your own OpenScale, Spark, and Watson Machine Learning services and cloud object store, follow the steps below. + +1. IBM Watson Machine Learning service instance + +To create a Watson Machine Learning service, go to [IBM Cloud](https://cloud.ibm.com/) and log in with your IBM account ID. From the `Catalog` page, click on the `AI` tab on the left side to go to this [page](https://cloud.ibm.com/catalog?category=ai). Then click on the [`Machine Learning`](https://cloud.ibm.com/catalog/services/machine-learning) link and follow the instructions to create the service. + +Once the service is created, go to the service's `Dashboard` and follow the instructions to generate `service credentials`. Refer to the IBM Cloud [documentation](https://cloud.ibm.com/docs) for help if needed. Collect the `url`, `username`, `password`, `apikey`, and `instance_id` fields from the service credentials, as these will be required to access the service. + +2. IBM Watson OpenScale service instance + +The IBM Watson OpenScale service will help us monitor the quality and fairness of the deployed models. From the `Catalog` page, click on the `AI` tab on the left side to go to this [page](https://cloud.ibm.com/catalog?category=ai).
Then click on the [`Watson OpenScale`](https://cloud.ibm.com/catalog/services/watson-openscale) link and follow the instructions to create the service. + +Once the service is created, click the service's `Launch Application` button and open the configuration page (the 4th icon on the left side) in the application that opens. Collect the `Datamart ID`, which is the GUID for Watson OpenScale. + +In addition, collect the IBM Cloud API Key from this [page](https://cloud.ibm.com/iam#/apikeys) to enable service binding for the OpenScale service. + +3. IBM Spark service instance + +The IBM Spark service provides several Spark executors to help train our example model. From the `Catalog` page, click on the `Web and Application` tab on the left side to go to this [page](https://cloud.ibm.com/catalog?category=app_services). Then click on the [`Apache Spark`](https://cloud.ibm.com/catalog/services/apache-spark) link and follow the instructions to create the service. + +Once the service is created, go to the service's `Service credentials` page on the left side and follow the instructions to generate `service credentials`. Refer to the IBM Cloud [documentation](https://cloud.ibm.com/docs) for help if needed. +Collect the `tenant_secret`, `tenant_id`, `cluster_master_url`, and `instance_id` fields from the service credentials, as these will be required to access the service. + +4. A cloud object store + +The Watson Machine Learning service loads datasets from cloud object storage and stores model outputs and other artifacts to cloud object storage. Users can use any cloud object store they already have, or create one with the `IBM Cloud Object Storage` service by following this [link](https://console.bluemix.net/catalog/services/cloud-object-storage). + +Collect the `endpoint`, `access_key_id`, and `secret_access_key` fields from the service credentials for the cloud object store. Create the service credentials first if they do not exist. To ensure HMAC credentials are generated, specify the following in the `Add Inline Configuration Parameters` field: `{"HMAC":true}`. + +Create a bucket for storing the training datasets and model source code. + +Then, upload all the files in the `source` folder to the created bucket. + +5. Set up access credentials + +This pipeline sample reads the credentials from a file hosted in a GitHub repo. Refer to the `creds.ini` file, fill in your specific credentials, and upload the file to a GitHub repo you have access to. + +To access the credentials file, provide a GitHub access token and the link to the raw content of the file. Modify the `GITHUB_TOKEN` and `CONFIG_FILE_URL` variables in the code block below and run the Python code to create a Kubernetes secret using the Kubeflow pipeline.
+ +```python +import kfp.dsl as dsl +import kfp.components as components +from kfp import compiler +import kfp +secret_name = 'aios-creds' +configuration_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/ibm-components/commons/config/component.yaml') +client = kfp.Client() +EXPERIMENT_NAME = 'create secret' +exp = client.create_experiment(name=EXPERIMENT_NAME) +@dsl.pipeline( + name='create secret', + description='' +) +def secret_pipeline( + GITHUB_TOKEN='', + CONFIG_FILE_URL='https://raw.githubusercontent.com/user/repository/branch/creds.ini', +): + get_configuration = configuration_op( + token=GITHUB_TOKEN, + url=CONFIG_FILE_URL, + name=secret_name + ) +compiler.Compiler().compile(secret_pipeline, 'secret_pipeline.tar.gz') +run = client.run_pipeline(exp.id, 'secret_pipeline', 'secret_pipeline.tar.gz') +``` + +## Instructions + +First, install the necessary Python packages: +```shell +pip3 install ai_pipeline_params +``` + +In this repository, run the following command to compile the pipeline into an Argo package using the Kubeflow Pipelines SDK. +```shell +dsl-compile --py openscale.py --output openscale.tar.gz +``` + +Then, submit `openscale.tar.gz` to the Kubeflow Pipelines UI. From there you can create different experiments and runs with the OpenScale pipeline. + +## Pipeline Parameters +- **bucket-name**: Object Storage bucket that has the Spark training files and OpenScale manifest. +- **training-data-link**: Link to a public data source if the data is not being preprocessed. +- **postgres-schema-name**: PostgreSQL schema name for storing model payload metrics. +- **label-name**: Model label name in the dataset. +- **problem-type**: Model output type. Possible options are `BINARY_CLASSIFICATION`, `MULTICLASS_CLASSIFICATION`, and `REGRESSION`. +- **threshold**: Model threshold that is recommended for the OpenScale service to monitor. +- **aios-manifest-path**: Manifest file path in the object storage that defines the fairness definition and model schema. +- **model-file-path**: Model file path in the object storage for the Spark service to execute. +- **spark-entrypoint**: Entrypoint command to execute the model training using the Spark service. +- **model-name**: Model name for storing the trained model in the Watson Machine Learning service. +- **deployment-name**: Deployment name for deploying the stored model in the Watson Machine Learning service. + +## Credentials to be stored in creds.ini +- **aios_guid**: GUID of the OpenScale service. +- **cloud_api_key**: IBM Cloud API Key. +- **postgres_uri**: PostgreSQL URI for storing model payload. Leave it as the empty string `""` if you wish to use the default database that comes with the OpenScale service. +- **spark_tenant_id**: Spark tenant ID from the IBM Apache Spark service. +- **spark_tenant_secret**: Spark tenant secret from the IBM Apache Spark service. +- **spark_cluster_master_url**: Spark cluster master URL from the IBM Apache Spark service. +- **spark_instance_id**: Spark instance ID from the IBM Apache Spark service. +- **cos_endpoint**: Object Storage endpoint. +- **cos_access_key**: Object Storage access key ID. +- **cos_secret_key**: Object Storage secret access key. +- **wml_url**: URL endpoint from the Watson Machine Learning service. +- **wml_username**: Username from the Watson Machine Learning service. +- **wml_password**: Password from the Watson Machine Learning service. +- **wml_instance_id**: Instance ID from the Watson Machine Learning service.
+- **wml_apikey**: API Key from the Watson Machine Learning service. diff --git a/samples/ibm-samples/openscale/credentials/creds.ini b/samples/ibm-samples/openscale/credentials/creds.ini new file mode 100644 index 00000000000..d8a4f6ecc1f --- /dev/null +++ b/samples/ibm-samples/openscale/credentials/creds.ini @@ -0,0 +1,19 @@ +[CREDENTIALS] +aios_guid = OpenscaleGuid +cloud_api_key = IBMCloudAPIKey +postgres_uri = postgreSQLURI + +spark_tenant_id = SparkTenantId +spark_tenant_secret = SparkTenantSecret +spark_cluster_master_url = https://spark.bluemix.net +spark_instance_id = SparkInstanceId + +cos_endpoint = ObjectStoreEndpointUrl +cos_access_key = ObjectStoreAccessKeyID +cos_secret_key = ObjectStoreSecretAccessKey + +wml_url = https://us-south.ml.cloud.ibm.com +wml_username = WMLUserName +wml_password = WMLPassword +wml_instance_id = WMLInstanceId +wml_apikey = WMLAPIKey diff --git a/samples/ibm-samples/openscale/openscale.py b/samples/ibm-samples/openscale/openscale.py new file mode 100644 index 00000000000..da451cec0b4 --- /dev/null +++ b/samples/ibm-samples/openscale/openscale.py @@ -0,0 +1,83 @@ +import kfp.dsl as dsl +import kfp.components as components +import ai_pipeline_params as params + +secret_name = 'aios-creds' + +preprocess_spark_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/ibm-components/spark/data_preprocess_spark/component.yaml') +train_spark_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/ibm-components/spark/train_spark/component.yaml') +store_spark_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/ibm-components/spark/store_spark_model/component.yaml') +deploy_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/ibm-components/watson/deploy/component.yaml') +subscribe_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/ibm-components/watson/manage/subscribe/component.yaml') +fairness_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/ibm-components/watson/manage/monitor_fairness/component.yaml') +quality_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/ibm-components/watson/manage/monitor_quality/component.yaml') + + +@dsl.pipeline( + name='Watson OpenScale Pipeline', + description='A pipeline for end to end Spark machine learning workflow and model monitoring.' 
+) +def aiosPipeline( + BUCKET_NAME='', + TRAINING_DATA_LINK='https://raw.githubusercontent.com/emartensibm/german-credit/master/german_credit_data_biased_training.csv', + POSTGRES_SCHEMA_NAME='data_mart_credit', + LABEL_NAME='Risk', + PROBLEM_TYPE='BINARY_CLASSIFICATION', + THRESHOLD='0.7', + AIOS_MANIFEST_PATH='aios.json', + MODEL_FILE_PATH='model.py', + SPARK_ENTRYPOINT='python model.py', + MODEL_NAME='Spark German Risk Model - Final', + DEPLOYMENT_NAME='Spark German Risk Deployment - Final' +): + + """A pipeline for Spark machine learning workflow with OpenScale.""" + + data_preprocess_spark = preprocess_spark_op( + bucket_name=BUCKET_NAME, + data_url=TRAINING_DATA_LINK + ).apply(params.use_ai_pipeline_params(secret_name)) + train_spark = train_spark_op( + bucket_name=BUCKET_NAME, + data_filename=data_preprocess_spark.output, + model_filename=MODEL_FILE_PATH, + spark_entrypoint=SPARK_ENTRYPOINT + ).apply(params.use_ai_pipeline_params(secret_name)) + store_spark_model = store_spark_op( + bucket_name=BUCKET_NAME, + aios_manifest_path=AIOS_MANIFEST_PATH, + problem_type=PROBLEM_TYPE, + model_name=MODEL_NAME, + deployment_name=DEPLOYMENT_NAME, + model_filepath=train_spark.outputs['model_filepath'], + train_data_filepath=train_spark.outputs['train_data_filepath'] + ).apply(params.use_ai_pipeline_params(secret_name)) + deploy = deploy_op( + model_uid=store_spark_model.output, + model_name=MODEL_NAME, + deployment_name=DEPLOYMENT_NAME + ).apply(params.use_ai_pipeline_params(secret_name)) + subscribe = subscribe_op( + model_uid=deploy.outputs['model_uid'], + model_name=MODEL_NAME, + aios_schema=POSTGRES_SCHEMA_NAME, + label_column=LABEL_NAME, + aios_manifest_path=AIOS_MANIFEST_PATH, + bucket_name=BUCKET_NAME, + problem_type=PROBLEM_TYPE + ).apply(params.use_ai_pipeline_params(secret_name)) + monitor_quality = quality_op( + model_name=subscribe.output, + quality_threshold=THRESHOLD + ).apply(params.use_ai_pipeline_params(secret_name)) + monitor_fairness = fairness_op( + model_name=subscribe.output, + aios_manifest_path=AIOS_MANIFEST_PATH, + cos_bucket_name=BUCKET_NAME, + data_filename=data_preprocess_spark.output + ).apply(params.use_ai_pipeline_params(secret_name)) + + +if __name__ == '__main__': + import kfp.compiler as compiler + compiler.Compiler().compile(aiosPipeline, __file__ + '.tar.gz') diff --git a/samples/ibm-samples/openscale/source/aios.json b/samples/ibm-samples/openscale/source/aios.json new file mode 100644 index 00000000000..6113f246eaa --- /dev/null +++ b/samples/ibm-samples/openscale/source/aios.json @@ -0,0 +1,45 @@ +{ + "fairness_favourable_classes": ["No Risk"], + "fairness_unfavourable_classes": ["Risk"], + "fairness_features": [ + { + "feature_name": "Sex", + "majority": ["male"], + "minority": ["female"], + "threshold": 0.95 + }, + { + "feature_name": "Age", + "majority": [[26,75]], + "minority": [[18,25]], + "threshold": 0.95 + } + ], + "model_schema": [ + {"metadata": {"measure": "discrete","modeling_role": "feature"}, "name": "CheckingStatus", "nullable": true, "type": "string"}, + {"metadata": {"modeling_role": "feature"}, "name": "LoanDuration", "nullable": true, "type": "integer"}, + {"metadata": {"measure": "discrete", "modeling_role": "feature"}, "name": "CreditHistory", "nullable": true, "type": "string"}, + {"metadata": {"measure": "discrete", "modeling_role": "feature"}, "name": "LoanPurpose", "nullable": true, "type": "string"}, + {"metadata": {"modeling_role": "feature"}, "name": "LoanAmount", "nullable": true, "type": "integer"}, + {"metadata": {"measure": 
"discrete", "modeling_role": "feature"}, "name": "ExistingSavings", "nullable": true, "type": "string"}, + {"metadata": {"measure": "discrete", "modeling_role": "feature"}, "name": "EmploymentDuration", "nullable": true, "type": "string"}, + {"metadata": {"modeling_role": "feature"}, "name": "InstallmentPercent", "nullable": true, "type": "integer"}, + {"metadata": {"measure": "discrete", "modeling_role": "feature"}, "name": "Sex","nullable": true,"type": "string"}, + {"metadata": {"measure": "discrete", "modeling_role": "feature"},"name": "OthersOnLoan","nullable": true,"type": "string"}, + {"metadata": {"modeling_role": "feature"},"name": "CurrentResidenceDuration","nullable": true,"type": "integer"}, + {"metadata": {"measure": "discrete", "modeling_role": "feature"},"name": "OwnsProperty","nullable": true,"type": "string"}, + {"metadata": {"modeling_role": "feature"},"name": "Age","nullable": true,"type": "integer"}, + {"metadata": {"measure": "discrete", "modeling_role": "feature"},"name": "InstallmentPlans","nullable": true,"type": "string"}, + {"metadata": {"measure": "discrete", "modeling_role": "feature"},"name": "Housing","nullable": true,"type": "string"}, + {"metadata": {"modeling_role": "feature"},"name": "ExistingCreditsCount","nullable": true,"type": "integer"}, + {"metadata": {"measure": "discrete", "modeling_role": "feature"},"name": "Job","nullable": true,"type": "string"}, + {"metadata": {"modeling_role": "feature"},"name": "Dependents","nullable": true,"type": "integer"}, + {"metadata": {"measure": "discrete", "modeling_role": "feature"},"name": "Telephone","nullable": true,"type": "string"}, + {"metadata": {"measure": "discrete", "modeling_role": "feature"},"name": "ForeignWorker","nullable": true,"type": "string"}, + {"metadata": {"modeling_role": "probability"},"name": "probability","nullable": true,"type": {"containsNull": true, "elementType": "double", "type": "array"}}, + {"metadata": {"modeling_role": "prediction"},"name": "prediction","nullable": true,"type": "double"}, + {"metadata": {"modeling_role": "decoded-target"},"name": "predictedLabel","nullable": true,"type": "string"}, + {"metadata": {"modeling_role": "debiased-prediction"},"name": "debiased_prediction","nullable": true,"type": "double"}, + {"metadata": {"modeling_role": "debiased-probability"},"name": "debiased_probability","nullable": true,"type": {"containsNull": true,"elementType": "double","type": "array"}} + ] +} diff --git a/samples/ibm-samples/openscale/source/model.py b/samples/ibm-samples/openscale/source/model.py new file mode 100644 index 00000000000..be46c8077f5 --- /dev/null +++ b/samples/ibm-samples/openscale/source/model.py @@ -0,0 +1,75 @@ +import pyspark +from pyspark.sql import SparkSession +from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler +from pyspark.ml.evaluation import BinaryClassificationEvaluator +from pyspark.ml import Pipeline, Model +from pyspark.ml.classification import RandomForestClassifier +import json + +''' Read data with Spark SQL ''' +spark = SparkSession.builder.getOrCreate() +df_data = spark.read.csv(path="german_credit_data_biased_training.csv", sep=",", header=True, inferSchema=True) +df_data.head() + +spark_df = df_data +(train_data, test_data) = spark_df.randomSplit([0.8, 0.2], 24) + +print("Number of records for training: " + str(train_data.count())) +print("Number of records for evaluation: " + str(test_data.count())) + +spark_df.printSchema() + +si_CheckingStatus = StringIndexer(inputCol = 'CheckingStatus', 
outputCol = 'CheckingStatus_IX')
+si_CreditHistory = StringIndexer(inputCol = 'CreditHistory', outputCol = 'CreditHistory_IX')
+si_LoanPurpose = StringIndexer(inputCol = 'LoanPurpose', outputCol = 'LoanPurpose_IX')
+si_ExistingSavings = StringIndexer(inputCol = 'ExistingSavings', outputCol = 'ExistingSavings_IX')
+si_EmploymentDuration = StringIndexer(inputCol = 'EmploymentDuration', outputCol = 'EmploymentDuration_IX')
+si_Sex = StringIndexer(inputCol = 'Sex', outputCol = 'Sex_IX')
+si_OthersOnLoan = StringIndexer(inputCol = 'OthersOnLoan', outputCol = 'OthersOnLoan_IX')
+si_OwnsProperty = StringIndexer(inputCol = 'OwnsProperty', outputCol = 'OwnsProperty_IX')
+si_InstallmentPlans = StringIndexer(inputCol = 'InstallmentPlans', outputCol = 'InstallmentPlans_IX')
+si_Housing = StringIndexer(inputCol = 'Housing', outputCol = 'Housing_IX')
+si_Job = StringIndexer(inputCol = 'Job', outputCol = 'Job_IX')
+si_Telephone = StringIndexer(inputCol = 'Telephone', outputCol = 'Telephone_IX')
+si_ForeignWorker = StringIndexer(inputCol = 'ForeignWorker', outputCol = 'ForeignWorker_IX')
+
+si_Label = StringIndexer(inputCol="Risk", outputCol="label").fit(spark_df)
+label_converter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=si_Label.labels)
+va_features = VectorAssembler(inputCols=["CheckingStatus_IX", "CreditHistory_IX", "LoanPurpose_IX", "ExistingSavings_IX", "EmploymentDuration_IX", "Sex_IX", \
+                              "OthersOnLoan_IX", "OwnsProperty_IX", "InstallmentPlans_IX", "Housing_IX", "Job_IX", "Telephone_IX", "ForeignWorker_IX", \
+                              "LoanDuration", "LoanAmount", "InstallmentPercent", "CurrentResidenceDuration", "Age", "ExistingCreditsCount", \
+                              "Dependents"], outputCol="features")
+
+''' Train the model with a Random Forest classifier '''
+classifier = RandomForestClassifier(featuresCol="features")
+
+pipeline = Pipeline(stages=[si_CheckingStatus, si_CreditHistory, si_EmploymentDuration, si_ExistingSavings, si_ForeignWorker, si_Housing, si_InstallmentPlans, si_Job, si_LoanPurpose, si_OthersOnLoan,\
+                            si_OwnsProperty, si_Sex, si_Telephone, si_Label, va_features, classifier, label_converter])
+model = pipeline.fit(train_data)
+
+predictions = model.transform(test_data)
+evaluatorDT = BinaryClassificationEvaluator(rawPredictionCol="prediction")
+area_under_curve = evaluatorDT.evaluate(predictions)
+
+# default evaluation metric is areaUnderROC
+print("areaUnderROC = %g" % area_under_curve)
+print(model)
+print(predictions)
+
+# Persist the trained pipeline model and the training data
+model.write().overwrite().save('model')
+train_data.write.option("header", "true").mode("overwrite").csv('train_data')
+
+evaluation_metrics = {
+    'metrics': [
+        {
+            "name": "areaUnderROC",
+            "value": area_under_curve,
+            "threshold": 0.7
+        }
+    ]
+}
+
+with open('evaluation.json', 'w') as f:
+    json.dump(evaluation_metrics, f, indent=2)
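
The training script persists three artifacts in its working directory: the fitted `PipelineModel` under `model/`, the training split under `train_data/`, and the metrics file `evaluation.json`. As a minimal sketch of how these artifacts can be reloaded with standard PySpark APIs for downstream scoring or inspection, assuming the same working directory and the file names that `model.py` writes (variable names here are illustrative only):

```python
import json

from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Reload the persisted pipeline model and the training split written by model.py.
loaded_model = PipelineModel.load('model')
train_df = spark.read.csv(path='train_data', sep=",", header=True, inferSchema=True)

# Score a few rows; the final stages add the 'prediction', 'probability', and
# decoded 'predictedLabel' columns referenced in the OpenScale model schema.
scored = loaded_model.transform(train_df.limit(5))
scored.select('predictedLabel', 'probability').show(truncate=False)

# The evaluation metrics are plain JSON and can be inspected directly.
with open('evaluation.json') as f:
    print(json.load(f))
```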