From 8434bd70fd833b289bebcce8bb9af1fbeab30c45 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Thu, 20 Dec 2018 19:13:54 -0800 Subject: [PATCH] Components - De-hardcoded local output paths. --- components/dataflow/predict/src/predict.py | 22 +++++++--- .../dataflow/tfma/src/model_analysis.py | 41 ++++++++++++------- components/dataflow/tft/src/transform.py | 9 +++- components/dataproc/analyze/src/analyze.py | 9 +++- .../create_cluster/src/create_cluster.py | 9 +++- components/dataproc/predict/src/predict.py | 22 +++++++--- components/dataproc/train/src/train.py | 10 ++++- .../kubeflow/dnntrainer/src/trainer/task.py | 17 ++++++-- .../kubeflow/launcher/src/launch_tf_job.py | 17 ++++++-- .../confusion_matrix/src/confusion_matrix.py | 18 ++++++-- components/local/roc/src/roc.py | 17 ++++++-- components/resnet-cmle/resnet/preprocess.py | 9 +++- components/resnet-cmle/resnet/train.py | 17 ++++++-- 13 files changed, 161 insertions(+), 56 deletions(-) diff --git a/components/dataflow/predict/src/predict.py b/components/dataflow/predict/src/predict.py index f68e89031a3e..ce9ac79dce0d 100644 --- a/components/dataflow/predict/src/predict.py +++ b/components/dataflow/predict/src/predict.py @@ -22,6 +22,7 @@ import json import logging import os +from pathlib import Path from tensorflow.python.lib.io import file_io @@ -60,6 +61,14 @@ def parse_arguments(): type=int, default=32, help='Batch size used in prediction.') + parser.add_argument('--prediction-results-uri-pattern-output-path', + type=str, + default='/output.txt', + help='Local output path for the file containing prediction results URI pattern.') + parser.add_argument('--ui-metadata-output-path', + type=str, + default='/mlpipeline-ui-metadata.json', + help='Local output path for the file containing UI metadata JSON structure.') args = parser.parse_args() return args @@ -236,9 +245,10 @@ def main(): run_predict(args.output, args.data, schema, args.target, model_export_dir, args.project, args.mode, args.batchsize) - prediction_results = os.path.join(args.output, 'prediction_results-*') - with open('/output.txt', 'w') as f: - f.write(prediction_results) + prediction_results_uri_pattern = os.path.join(args.output, 'prediction_results-*') + Path(args.prediction_results_uri_pattern_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.prediction_results_uri_pattern_output_path).write_text(prediction_results_uri_pattern) + with file_io.FileIO(os.path.join(args.output, 'schema.json'), 'r') as f: schema = json.load(f) @@ -249,11 +259,11 @@ def main(): 'storage': 'gcs', 'format': 'csv', 'header': [x['name'] for x in schema], - 'source': prediction_results + 'source': prediction_results_uri_pattern }] } - with open('/mlpipeline-ui-metadata.json', 'w') as f: - json.dump(metadata, f) + Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.ui_metadata_output_path).write_text(json.dumps(metadata)) if __name__== "__main__": diff --git a/components/dataflow/tfma/src/model_analysis.py b/components/dataflow/tfma/src/model_analysis.py index 1a885bd6224d..e6bee02c2253 100644 --- a/components/dataflow/tfma/src/model_analysis.py +++ b/components/dataflow/tfma/src/model_analysis.py @@ -21,6 +21,7 @@ import datetime import json import os +from pathlib import Path import apache_beam as beam from ipywidgets.embed import embed_data @@ -117,6 +118,14 @@ def parse_arguments(): action='append', required=True, help='one or more columns on which to slice for analysis.') + parser.add_argument('--ui-metadata-output-path', + 
type=str, + default='/mlpipeline-ui-metadata.json', + help='Local output path for the file containing UI metadata JSON structure.') + parser.add_argument('--output-dir-uri-output-path', + type=str, + default='/output.txt', + help='Local output path for the file containing output dir URI.') return parser.parse_args() @@ -207,7 +216,19 @@ def generate_static_html_output(output_dir, slicing_columns): views_html += _SINGLE_WIDGET_TEMPLATE.format(idx, view) rendered_template = _STATIC_HTML_TEMPLATE.format( manager_state=manager_state, widget_views=views_html) - static_html_path = os.path.join(output_dir, _OUTPUT_HTML_FILE) + return rendered_template + +def main(): + tf.logging.set_verbosity(tf.logging.INFO) + args = parse_arguments() + schema = json.loads(file_io.read_file_to_string(args.schema)) + eval_model_parent_dir = os.path.join(args.model, 'tfma_eval_model_dir') + model_export_dir = os.path.join(eval_model_parent_dir, file_io.list_directory(eval_model_parent_dir)[0]) + run_analysis(args.output, model_export_dir, args.eval, schema, + args.project, args.mode, args.slice_columns) + rendered_template = generate_static_html_output(args.output, args.slice_columns) + + static_html_path = os.path.join(args.output, _OUTPUT_HTML_FILE) file_io.write_string_to_file(static_html_path, rendered_template) metadata = { @@ -217,21 +238,13 @@ def generate_static_html_output(output_dir, slicing_columns): 'source': static_html_path, }] } - with file_io.FileIO('/mlpipeline-ui-metadata.json', 'w') as f: - json.dump(metadata, f) + ui_metadata_text = json.dumps(metadata) + Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.ui_metadata_output_path).write_text(ui_metadata_text) -def main(): - tf.logging.set_verbosity(tf.logging.INFO) - args = parse_arguments() - schema = json.loads(file_io.read_file_to_string(args.schema)) - eval_model_parent_dir = os.path.join(args.model, 'tfma_eval_model_dir') - model_export_dir = os.path.join(eval_model_parent_dir, file_io.list_directory(eval_model_parent_dir)[0]) - run_analysis(args.output, model_export_dir, args.eval, schema, - args.project, args.mode, args.slice_columns) - generate_static_html_output(args.output, args.slice_columns) - with open('/output.txt', 'w') as f: - f.write(args.output) + Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.output_dir_uri_output_path).write_text(args.output) if __name__== "__main__": main() diff --git a/components/dataflow/tft/src/transform.py b/components/dataflow/tft/src/transform.py index cde706e0c08b..f2ace8d27019 100644 --- a/components/dataflow/tft/src/transform.py +++ b/components/dataflow/tft/src/transform.py @@ -20,6 +20,7 @@ import json import logging import os +from pathlib import Path import tensorflow as tf import tensorflow_transform as tft @@ -80,6 +81,10 @@ def parse_arguments(): required=False, help=('GCS path to a python file defining ' '"preprocess" and "get_feature_columns" functions.')) + parser.add_argument('--output-dir-uri-output-path', + type=str, + default='/output.txt', + help='Local output path for the file containing the output dir URI.') args = parser.parse_args() return args @@ -296,8 +301,8 @@ def wrapped_preprocessing_fn(inputs): run_transform(args.output, schema, args.train, args.eval, args.project, args.mode, preprocessing_fn=preprocessing_fn) - with open('/output.txt', 'w') as f: - f.write(args.output) + Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True) + 
Path(args.output_dir_uri_output_path).write_text(args.output)
 
 
 if __name__== "__main__":
   main()
diff --git a/components/dataproc/analyze/src/analyze.py b/components/dataproc/analyze/src/analyze.py
index 6e19f33fff66..229d20eb41a9 100644
--- a/components/dataproc/analyze/src/analyze.py
+++ b/components/dataproc/analyze/src/analyze.py
@@ -25,6 +25,7 @@
 
 import argparse
 import os
+from pathlib import Path
 from common import _utils
 
 
@@ -37,6 +38,10 @@ def main(argv=None):
   parser.add_argument('--output', type=str, help='GCS path to use for output.')
   parser.add_argument('--train', type=str, help='GCS path of the training csv file.')
   parser.add_argument('--schema', type=str, help='GCS path of the json schema file.')
+  parser.add_argument('--output-dir-uri-output-path',
+                      type=str,
+                      default='/output.txt',
+                      help='Local output path for the file containing the output dir URI.')
   args = parser.parse_args()
 
   code_path = os.path.dirname(os.path.realpath(__file__))
@@ -50,8 +55,8 @@ def main(argv=None):
       api, args.project, args.region, args.cluster, dest_files[0], spark_args)
     print('Job request submitted. Waiting for completion...')
     _utils.wait_for_job(api, args.project, args.region, job_id)
-    with open('/output.txt', 'w') as f:
-      f.write(args.output)
+    Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True)
+    Path(args.output_dir_uri_output_path).write_text(args.output)
 
     print('Job completed.')
   finally:
diff --git a/components/dataproc/create_cluster/src/create_cluster.py b/components/dataproc/create_cluster/src/create_cluster.py
index 11e508ebf10b..e5318c20193e 100644
--- a/components/dataproc/create_cluster/src/create_cluster.py
+++ b/components/dataproc/create_cluster/src/create_cluster.py
@@ -22,6 +22,7 @@
 
 import argparse
 import os
+from pathlib import Path
 from common import _utils
 
 
@@ -32,6 +33,10 @@ def main(argv=None):
   parser.add_argument('--region', type=str, help='Which zone for GCE VMs.')
   parser.add_argument('--name', type=str, help='The name of the cluster to create.')
   parser.add_argument('--staging', type=str, help='GCS path to use for staging.')
+  parser.add_argument('--output-cluster-name-output-path',
+                      type=str,
+                      default='/output.txt',
+                      help='Local output path for the file containing the cluster name.')
   args = parser.parse_args()
 
   code_path = os.path.dirname(os.path.realpath(__file__))
@@ -44,8 +49,8 @@ def main(argv=None):
     create_response = _utils.create_cluster(api, args.project, args.region, args.name, dest_files[0])
     print('Cluster creation request submitted. Waiting for completion...')
     _utils.wait_for_operation(api, create_response['name'])
-    with open('/output.txt', 'w') as f:
-      f.write(args.name)
+    Path(args.output_cluster_name_output_path).parent.mkdir(parents=True, exist_ok=True)
+    Path(args.output_cluster_name_output_path).write_text(args.name)
     print('Cluster created.')
   finally:
     _utils.remove_resources_from_gcs(dest_files)
diff --git a/components/dataproc/predict/src/predict.py b/components/dataproc/predict/src/predict.py
index 8b41433bc812..83d6f0fa6c2b 100644
--- a/components/dataproc/predict/src/predict.py
+++ b/components/dataproc/predict/src/predict.py
@@ -32,6 +32,7 @@
 import argparse
 import json
 import os
+from pathlib import Path
 from common import _utils
 import logging
 
@@ -50,6 +51,15 @@ def main(argv=None):
   parser.add_argument('--predict', type=str, help='GCS path of prediction libsvm file.')
   parser.add_argument('--analysis', type=str, help='GCS path of the analysis input.')
   parser.add_argument('--target', type=str, help='Target column name.')
+  parser.add_argument('--prediction-results-uri-pattern-output-path',
+                      type=str,
+                      default='/output.txt',
+                      help='Local output path for the file containing prediction results URI pattern.')
+  parser.add_argument('--ui-metadata-output-path',
+                      type=str,
+                      default='/mlpipeline-ui-metadata.json',
+                      help='Local output path for the file containing UI metadata JSON structure.')
+
   args = parser.parse_args()
 
   logging.getLogger().setLevel(logging.INFO)
@@ -61,9 +71,9 @@ def main(argv=None):
       'ml.dmlc.xgboost4j.scala.example.spark.XGBoostPredictor', spark_args)
     logging.info('Job request submitted. Waiting for completion...')
     _utils.wait_for_job(api, args.project, args.region, job_id)
-    prediction_results = os.path.join(args.output, 'part-*.csv')
-    with open('/output.txt', 'w') as f:
-      f.write(prediction_results)
+    prediction_results_uri_pattern = os.path.join(args.output, 'part-*.csv')
+    Path(args.prediction_results_uri_pattern_output_path).parent.mkdir(parents=True, exist_ok=True)
+    Path(args.prediction_results_uri_pattern_output_path).write_text(prediction_results_uri_pattern)
 
     with file_io.FileIO(os.path.join(args.output, 'schema.json'), 'r') as f:
       schema = json.load(f)
@@ -74,11 +84,11 @@ def main(argv=None):
         'storage': 'gcs',
         'format': 'csv',
         'header': [x['name'] for x in schema],
-        'source': prediction_results
+        'source': prediction_results_uri_pattern
       }]
     }
-    with open('/mlpipeline-ui-metadata.json', 'w') as f:
-      json.dump(metadata, f)
+    Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True)
+    Path(args.ui_metadata_output_path).write_text(json.dumps(metadata))
 
     logging.info('Job completed.')
 
diff --git a/components/dataproc/train/src/train.py b/components/dataproc/train/src/train.py
index 31a59909f9ed..274688116e34 100644
--- a/components/dataproc/train/src/train.py
+++ b/components/dataproc/train/src/train.py
@@ -32,6 +32,7 @@
 
 import argparse
 import logging
+from pathlib import Path
 
 from common import _utils
 
@@ -51,6 +52,11 @@ def main(argv=None):
   parser.add_argument('--eval', type=str, help='GCS path of the eval libsvm file pattern.')
   parser.add_argument('--analysis', type=str, help='GCS path of the analysis input.')
   parser.add_argument('--target', type=str, help='Target column name.')
+  parser.add_argument('--output-dir-uri-output-path',
+                      type=str,
+                      default='/output.txt',
+                      help='Local output path for the file containing the output dir URI.')
+
   args = parser.parse_args()
 
   logging.getLogger().setLevel(logging.INFO)
@@ -63,8 +69,8 @@ def main(argv=None):
'ml.dmlc.xgboost4j.scala.example.spark.XGBoostTrainer', spark_args) logging.info('Job request submitted. Waiting for completion...') _utils.wait_for_job(api, args.project, args.region, job_id) - with open('/output.txt', 'w') as f: - f.write(args.output) + Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.output_dir_uri_output_path).write_text(args.output) logging.info('Job completed.') diff --git a/components/kubeflow/dnntrainer/src/trainer/task.py b/components/kubeflow/dnntrainer/src/trainer/task.py index aca2eb276989..4f6733bbfecb 100644 --- a/components/kubeflow/dnntrainer/src/trainer/task.py +++ b/components/kubeflow/dnntrainer/src/trainer/task.py @@ -16,6 +16,7 @@ import argparse import json import os +from pathlib import Path import tensorflow as tf import tensorflow_transform as tft import tensorflow_model_analysis as tfma @@ -80,6 +81,14 @@ def parse_arguments(): required=False, help=('GCS path to a python file defining ' '"preprocess" and "get_feature_columns" functions.')) + parser.add_argument('--exported-model-dir-uri-output-path', + type=str, + default='/output.txt', + help='Local output path for the file containing exported model directory URI.') + parser.add_argument('--ui-metadata-output-path', + type=str, + default='/mlpipeline-ui-metadata.json', + help='Local output path for the file containing UI metadata JSON structure.') args = parser.parse_args() args.hidden_layer_size = [int(x.strip()) for x in args.hidden_layer_size.split(',')] @@ -341,11 +350,11 @@ def main(): 'source': args.job_dir, }] } - with open('/mlpipeline-ui-metadata.json', 'w') as f: - json.dump(metadata, f) + Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.ui_metadata_output_path).write_text(json.dumps(metadata)) - with open('/output.txt', 'w') as f: - f.write(args.job_dir) + Path(args.exported_model_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.exported_model_dir_uri_output_path).write_text(args.job_dir) if __name__ == '__main__': main() diff --git a/components/kubeflow/launcher/src/launch_tf_job.py b/components/kubeflow/launcher/src/launch_tf_job.py index a728ae76df9b..48d6817a85c8 100644 --- a/components/kubeflow/launcher/src/launch_tf_job.py +++ b/components/kubeflow/launcher/src/launch_tf_job.py @@ -36,6 +36,7 @@ import json import os import logging +from pathlib import Path import requests import subprocess import six @@ -100,7 +101,15 @@ def main(argv=None): default=10, help='Time in minutes to wait for the TFJob to complete') parser.add_argument('--output-dir', type=str) + parser.add_argument('--output-dir-uri-output-path', + type=str, + default='/output.txt', + help='Local output path for the file containing the output dir URI.') parser.add_argument('--ui-metadata-type', type=str, default='tensorboard') + parser.add_argument('--ui-metadata-output-path', + type=str, + default='/mlpipeline-ui-metadata.json', + help='Local output path for the file containing UI metadata JSON structure.') import sys all_args = sys.argv[1:] separator_idx = all_args.index('--') @@ -153,8 +162,8 @@ def main(argv=None): 'source': args.output_dir, }] } - with open('/mlpipeline-ui-metadata.json', 'w') as f: - json.dump(metadata, f) + Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.ui_metadata_output_path).write_text(json.dumps(metadata)) wait_response = tf_job_client.wait_for_job( api_client, tfjob_ns, job_name, kf_version, @@ -190,8 +199,8 @@ def main(argv=None): 
logging.info('Training success.') tf_job_client.delete_tf_job(api_client, tfjob_ns, job_name, version=kf_version) - with open('/output.txt', 'w') as f: - f.write(args.output_dir) + Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.output_dir_uri_output_path).write_text(args.output_dir) if __name__== "__main__": main() diff --git a/components/local/confusion_matrix/src/confusion_matrix.py b/components/local/confusion_matrix/src/confusion_matrix.py index f7d2701e727f..8c95d41c587c 100644 --- a/components/local/confusion_matrix/src/confusion_matrix.py +++ b/components/local/confusion_matrix/src/confusion_matrix.py @@ -26,6 +26,7 @@ import json import os import pandas as pd +from pathlib import Path from sklearn.metrics import confusion_matrix, accuracy_score from tensorflow.python.lib.io import file_io @@ -38,6 +39,15 @@ def main(argv=None): help='a lambda function as a string to compute target.' + 'For example, "lambda x: x[\'a\'] + x[\'b\']"' + 'If not set, the input must include a "target" column.') + parser.add_argument('--ui-metadata-output-path', + type=str, + default='/mlpipeline-ui-metadata.json', + help='Local output path for the file containing UI metadata JSON structure.') + parser.add_argument('--metrics-output-path', + type=str, + default='/mlpipeline-metrics.json', + help='Local output path for the file containing metrics JSON structure.') + args = parser.parse_args() schema_file = os.path.join(os.path.dirname(args.predictions), 'schema.json') @@ -80,8 +90,8 @@ def main(argv=None): 'labels': list(map(str, vocab)), }] } - with file_io.FileIO('/mlpipeline-ui-metadata.json', 'w') as f: - json.dump(metadata, f) + Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.ui_metadata_output_path).write_text(json.dumps(metadata)) accuracy = accuracy_score(df['target'], df['predicted']) metrics = { @@ -91,8 +101,8 @@ def main(argv=None): 'format': "PERCENTAGE", }] } - with file_io.FileIO('/mlpipeline-metrics.json', 'w') as f: - json.dump(metrics, f) + Path(args.metrics_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.metrics_output_path).write_text(json.dumps(metrics)) if __name__== "__main__": main() diff --git a/components/local/roc/src/roc.py b/components/local/roc/src/roc.py index 9de1edf244d2..b57822acd96e 100644 --- a/components/local/roc/src/roc.py +++ b/components/local/roc/src/roc.py @@ -25,6 +25,7 @@ import json import os import pandas as pd +from pathlib import Path from sklearn.metrics import roc_curve, roc_auc_score from tensorflow.python.lib.io import file_io @@ -43,6 +44,14 @@ def main(argv=None): 'For example, "lambda x: x[\'a\'] and x[\'b\']". 
If missing, ' + 'input must have a "target" column.') parser.add_argument('--output', type=str, help='GCS path of the output directory.') + parser.add_argument('--ui-metadata-output-path', + type=str, + default='/mlpipeline-ui-metadata.json', + help='Local output path for the file containing UI metadata JSON structure.') + parser.add_argument('--metrics-output-path', + type=str, + default='/mlpipeline-metrics.json', + help='Local output path for the file containing metrics JSON structure.') args = parser.parse_args() schema_file = os.path.join(os.path.dirname(args.predictions), 'schema.json') @@ -86,8 +95,8 @@ def main(argv=None): 'source': roc_file }] } - with file_io.FileIO('/mlpipeline-ui-metadata.json', 'w') as f: - json.dump(metadata, f) + Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.ui_metadata_output_path).write_text(json.dumps(metadata)) metrics = { 'metrics': [{ @@ -95,8 +104,8 @@ def main(argv=None): 'numberValue': roc_auc, }] } - with file_io.FileIO('/mlpipeline-metrics.json', 'w') as f: - json.dump(metrics, f) + Path(args.metrics_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.metrics_output_path).write_text(json.dumps(metrics)) if __name__== "__main__": main() diff --git a/components/resnet-cmle/resnet/preprocess.py b/components/resnet-cmle/resnet/preprocess.py index 8e015f0cf5e6..b2f436738041 100644 --- a/components/resnet-cmle/resnet/preprocess.py +++ b/components/resnet-cmle/resnet/preprocess.py @@ -21,6 +21,7 @@ import os import subprocess import logging +from pathlib import Path logging.getLogger().setLevel(logging.INFO) @@ -47,6 +48,10 @@ def parse_arguments(): type = str, default = 'gs://flowers_resnet/labels.txt', help = 'Path to labels.txt.') + parser.add_argument('--output-dir-uri-output-path', + type=str, + default='/output.txt', + help='Local output path for the file containing the output dir URI.') args = parser.parse_args() return args @@ -56,8 +61,8 @@ def parse_arguments(): output_dir = args.output + '/tpu/preprocessed' - with open("/output.txt", "w") as output_file: - output_file.write(output_dir) + Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.output_dir_uri_output_path).write_text(output_dir) logging.info('Removing old data from ' + output_dir) subprocess.call('gsutil -m rm -rf ' + output_dir, shell=True) diff --git a/components/resnet-cmle/resnet/train.py b/components/resnet-cmle/resnet/train.py index a8fa35953c3c..61138daa4687 100644 --- a/components/resnet-cmle/resnet/train.py +++ b/components/resnet-cmle/resnet/train.py @@ -20,6 +20,7 @@ import argparse import json import os +from pathlib import Path from time import gmtime, strftime import subprocess import logging @@ -76,6 +77,14 @@ def parse_arguments(): type = str, default = '1.9', help = 'Version of TensorFlow to use.') + parser.add_argument('--output-dir-uri-output-path', + type=str, + default='/output.txt', + help='Local output path for the file containing the output dir URI.') + parser.add_argument('--ui-metadata-output-path', + type=str, + default='/mlpipeline-ui-metadata.json', + help='Local output path for the file containing UI metadata JSON structure.') args = parser.parse_args() return args @@ -104,8 +113,8 @@ def parse_arguments(): --num_train_images=' + str(args.num_train_images) + ' --num_eval_images=' + str(args.num_eval_images) + ' --num_label_classes=' + str(args.num_label_classes) + ' \ --export_dir=' + output_dir + '/export', shell=True) - with open("/output.txt", "w") as 
output_file: - output_file.write(output_dir) + Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.output_dir_uri_output_path).write_text(output_dir) metadata = { 'outputs' : [{ @@ -113,5 +122,5 @@ def parse_arguments(): 'source': output_dir, }] } - with open('/mlpipeline-ui-metadata.json', 'w') as f: - json.dump(metadata, f) + Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.ui_metadata_output_path).write_text(json.dumps(metadata))
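--
Reviewer note: all 13 files apply the same mechanical change. The hardcoded
'/output.txt', '/mlpipeline-ui-metadata.json', and '/mlpipeline-metrics.json'
writes become argparse parameters (with the old paths kept as defaults for
backward compatibility), and each write is preceded by a mkdir so the component
no longer assumes the parent directory already exists. A minimal standalone
sketch of the pattern, with a hypothetical component and an invented result
value for illustration:

    import argparse
    from pathlib import Path

    def parse_arguments():
      parser = argparse.ArgumentParser()
      # Flag name follows the '<output name>-output-path' scheme used in this patch.
      parser.add_argument('--output-dir-uri-output-path',
                          type=str,
                          default='/output.txt',
                          help='Local output path for the file containing the output dir URI.')
      return parser.parse_args()

    def main():
      args = parse_arguments()
      # Stand-in for the component's real result; invented for illustration.
      output_dir_uri = 'gs://example-bucket/run-0'
      # The orchestrator may mount an arbitrary directory for the output file,
      # so create the parent directory before writing instead of assuming it exists.
      Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True)
      Path(args.output_dir_uri_output_path).write_text(output_dir_uri)

    if __name__ == '__main__':
      main()

With the paths parameterized this way, the corresponding component.yaml files
can pass {outputPath: ...} placeholders for these flags, letting the pipeline
backend choose where outputs land instead of baking the paths into the image.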