diff --git a/components/deprecated/dataflow/predict/component.yaml b/components/deprecated/dataflow/predict/component.yaml deleted file mode 100644 index e861e95a14e2..000000000000 --- a/components/deprecated/dataflow/predict/component.yaml +++ /dev/null @@ -1,33 +0,0 @@ -name: Predict using TF on Dataflow -description: | - Runs TensorFlow prediction on Google Cloud Dataflow - Input and output data is in GCS -inputs: - - {name: Data file pattern, type: GCSPath, description: 'GCS or local path of test file patterns.'} # type: {GCSPath: {data_type: CSV}} - - {name: Schema, type: GCSPath, description: 'GCS json schema file path.'} # type: {GCSPath: {data_type: TFDV schema JSON}} - - {name: Target column, type: String, description: 'Name of the column for prediction target.'} - - {name: Model, type: GCSPath, description: 'GCS or local path of model trained with tft preprocessed data.'} # Models trained with estimator are exported to base/export/export/123456781 directory. # Our trainer export only one model. #TODO: Output single model from trainer # type: {GCSPath: {path_type: Directory, data_type: Exported TensorFlow models dir}} - - {name: Batch size, type: Integer, default: '32', description: 'Batch size used in prediction.'} - - {name: Run mode, type: String, default: local, description: 'Whether to run the job locally or in Cloud Dataflow. Valid values are "local" and "cloud".'} - - {name: GCP project, type: GCPProjectID, description: 'The GCP project to run the dataflow job.'} - - {name: Predictions dir, type: GCSPath, description: 'GCS or local directory.'} #Will contain prediction_results-* and schema.json files; TODO: Split outputs and replace dir with single file # type: {GCSPath: {path_type: Directory}} -outputs: - - {name: Predictions dir, type: GCSPath, description: 'GCS or local directory.'} #Will contain prediction_results-* and schema.json files; TODO: Split outputs and replace dir with single file # type: {GCSPath: {path_type: Directory}} - - {name: MLPipeline UI metadata, type: UI metadata} -implementation: - container: - image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tf-predict:57d9f7f1cfd458e945d297957621716062d89a49 - command: [python2, /ml/predict.py] - args: [ - --data, {inputValue: Data file pattern}, - --schema, {inputValue: Schema}, - --target, {inputValue: Target column}, - --model, {inputValue: Model}, - --mode, {inputValue: Run mode}, - --project, {inputValue: GCP project}, - --batchsize, {inputValue: Batch size}, - --output, {inputValue: Predictions dir}, - ] - fileOutputs: - Predictions dir: /output.txt - MLPipeline UI metadata: /mlpipeline-ui-metadata.json diff --git a/components/deprecated/dataflow/tfdv/component.yaml b/components/deprecated/dataflow/tfdv/component.yaml deleted file mode 100644 index 869d3fb5cbb3..000000000000 --- a/components/deprecated/dataflow/tfdv/component.yaml +++ /dev/null @@ -1,34 +0,0 @@ -name: TFX - Data Validation -description: | - Runs Tensorflow Data Validation. https://www.tensorflow.org/tfx/data_validation/get_started - Tensorflow Data Validation (TFDV) can analyze training and serving data to: - * compute descriptive statistics, - * infer a schema, - * detect data anomalies. -inputs: -- {name: Inference data, type: GCSPath, description: GCS path of the CSV file from which to infer the schema.} # type: {GCSPath: {data_type: CSV}} -- {name: Validation data, type: GCSPath, description: GCS path of the CSV file whose contents should be validated.} # type: {GCSPath: {data_type: CSV}} -- {name: Column names, type: GCSPath, description: GCS json file containing a list of column names.} # type: {GCSPath: {data_type: JSON}} -- {name: Key columns, type: String, description: Comma separated list of columns to treat as keys.} -- {name: GCP project, type: GCPProjectID, default: '', description: The GCP project to run the dataflow job.} -- {name: Run mode, type: String, default: local, description: Whether to run the job locally or in Cloud Dataflow. Valid values are "local" and "cloud". } -- {name: Validation output, type: GCSPath, description: GCS or local directory.} # type: {GCSPath: {path_type: Directory}} -outputs: -- {name: Schema, type: GCSPath, description: GCS path of the inferred schema JSON.} # type: {GCSPath: {data_type: TFDV schema JSON}} -- {name: Validation result, type: String, description: Indicates whether anomalies were detected or not.} -implementation: - container: - image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:57d9f7f1cfd458e945d297957621716062d89a49 - command: [python2, /ml/validate.py] - args: [ - --csv-data-for-inference, {inputValue: Inference data}, - --csv-data-to-validate, {inputValue: Validation data}, - --column-names, {inputValue: Column names}, - --key-columns, {inputValue: Key columns}, - --project, {inputValue: GCP project}, - --mode, {inputValue: Run mode}, - --output, {inputValue: Validation output}, - ] - fileOutputs: - Schema: /schema.txt - Validation result: /output_validation_result.txt \ No newline at end of file diff --git a/components/deprecated/dataflow/tfma/component.yaml b/components/deprecated/dataflow/tfma/component.yaml deleted file mode 100644 index c764b5a6c59c..000000000000 --- a/components/deprecated/dataflow/tfma/component.yaml +++ /dev/null @@ -1,34 +0,0 @@ -name: TFX - Analyze model -description: | - Runs Tensorflow Model Analysis. https://www.tensorflow.org/tfx/model_analysis/get_started - TensorFlow Model Analysis allows you to perform model evaluations in the TFX pipeline, and view resultant metrics and plots in a Jupyter notebook. Specifically, it can provide: - * metrics computed on entire training and holdout dataset, as well as next-day evaluations - * tracking metrics over time - * model quality performance on different feature slices -inputs: -- {name: Model, type: GCSPath, description: GCS path to the model which will be evaluated.} # type: {GCSPath: {path_type: Directory, data_type: Exported TensorFlow models dir}} -- {name: Evaluation data, type: GCSPath, description: GCS path of eval files.} # type: {GCSPath: {data_type: CSV}} -- {name: Schema, type: GCSPath, description: GCS json schema file path.} # type: {GCSPath: {data_type: TFDV schema JSON}} -- {name: Run mode, type: String, default: local, description: whether to run the job locally or in Cloud Dataflow.} -- {name: GCP project, type: GCPProjectID, default: '', description: 'The GCP project to run the dataflow job, if running in the `cloud` mode.'} -- {name: Slice columns, type: String, description: Comma-separated list of columns on which to slice for analysis.} -- {name: Analysis results dir, type: GCSPath, description: GCS or local directory where the analysis results should be written.} # type: {GCSPath: {path_type: Directory}} -outputs: -- {name: Analysis results dir, type: GCSPath, description: GCS or local directory where the analysis results should were written.} # type: {GCSPath: {path_type: Directory}} -- {name: MLPipeline UI metadata, type: UI metadata} -implementation: - container: - image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tfma:57d9f7f1cfd458e945d297957621716062d89a49 - command: [python2, /ml/model_analysis.py] - args: [ - --model, {inputValue: Model}, - --eval, {inputValue: Evaluation data}, - --schema, {inputValue: Schema}, - --mode, {inputValue: Run mode}, - --project, {inputValue: GCP project}, - --slice-columns, {inputValue: Slice columns}, - --output, {inputValue: Analysis results dir}, - ] - fileOutputs: - Analysis results dir: /output.txt - MLPipeline UI metadata: /mlpipeline-ui-metadata.json diff --git a/components/deprecated/dataflow/tft/component.yaml b/components/deprecated/dataflow/tft/component.yaml deleted file mode 100644 index b777b966d960..000000000000 --- a/components/deprecated/dataflow/tft/component.yaml +++ /dev/null @@ -1,27 +0,0 @@ -name: Transform using TF on Dataflow -description: Runs TensorFlow Transform on Google Cloud Dataflow -inputs: - - {name: Training data file pattern, type: GCSPath, description: 'GCS path of train file patterns.'} #Also supports local CSV # type: {GCSPath: {data_type: CSV}} - - {name: Evaluation data file pattern, type: GCSPath, description: 'GCS path of eval file patterns.'} #Also supports local CSV # type: {GCSPath: {data_type: CSV}} - - {name: Schema, type: GCSPath, description: 'GCS json schema file path.'} # type: {GCSPath: {data_type: JSON}} - - {name: GCP project, type: GCPProjectID, description: 'The GCP project to run the dataflow job.'} - - {name: Run mode, type: String, default: local, description: 'Whether to run the job locally or in Cloud Dataflow. Valid values are "local" and "cloud".' } - - {name: Preprocessing module, type: GCSPath, default: '', description: 'GCS path to a python file defining "preprocess" and "get_feature_columns" functions.'} # type: {GCSPath: {data_type: Python}} - - {name: Transformed data dir, type: GCSPath, description: 'GCS or local directory'} #Also supports local paths # type: {GCSPath: {path_type: Directory}} -outputs: - - {name: Transformed data dir, type: GCSPath} # type: {GCSPath: {path_type: Directory}} -implementation: - container: - image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:57d9f7f1cfd458e945d297957621716062d89a49 - command: [python2, /ml/transform.py] - args: [ - --train, {inputValue: Training data file pattern}, - --eval, {inputValue: Evaluation data file pattern}, - --schema, {inputValue: Schema}, - --project, {inputValue: GCP project}, - --mode, {inputValue: Run mode}, - --preprocessing-module, {inputValue: Preprocessing module}, - --output, {inputValue: Transformed data dir}, - ] - fileOutputs: - Transformed data dir: /output.txt diff --git a/components/deprecated/dataproc/analyze/src/analyze.py b/components/deprecated/dataproc/analyze/src/analyze.py index 6e19f33fff66..229d20eb41a9 100644 --- a/components/deprecated/dataproc/analyze/src/analyze.py +++ b/components/deprecated/dataproc/analyze/src/analyze.py @@ -25,6 +25,7 @@ import argparse import os +from pathlib import Path from common import _utils @@ -37,6 +38,10 @@ def main(argv=None): parser.add_argument('--output', type=str, help='GCS path to use for output.') parser.add_argument('--train', type=str, help='GCS path of the training csv file.') parser.add_argument('--schema', type=str, help='GCS path of the json schema file.') + parser.add_argument('--output-dir-uri-output-path', + type=str, + default='/output.txt', + help='Local output path for the file containing the output dir URI.') args = parser.parse_args() code_path = os.path.dirname(os.path.realpath(__file__)) @@ -50,8 +55,8 @@ def main(argv=None): api, args.project, args.region, args.cluster, dest_files[0], spark_args) print('Job request submitted. Waiting for completion...') _utils.wait_for_job(api, args.project, args.region, job_id) - with open('/output.txt', 'w') as f: - f.write(args.output) + Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.output_dir_uri_output_path).write_text(args.output) print('Job completed.') finally: diff --git a/components/deprecated/dataproc/base/Dockerfile b/components/deprecated/dataproc/base/Dockerfile index 3a7576e6cda4..6bca2476aaaf 100644 --- a/components/deprecated/dataproc/base/Dockerfile +++ b/components/deprecated/dataproc/base/Dockerfile @@ -21,7 +21,7 @@ RUN easy_install pip RUN pip install google-api-python-client==1.6.2 -RUN pip install tensorflow==1.6.0 +RUN pip install tensorflow==1.6.0 pathlib2 RUN wget -nv https://dl.google.com/dl/cloudsdk/release/google-cloud-sdk.zip && \ unzip -qq google-cloud-sdk.zip -d tools && \ diff --git a/components/deprecated/dataproc/create_cluster/src/create_cluster.py b/components/deprecated/dataproc/create_cluster/src/create_cluster.py index 11e508ebf10b..e5318c20193e 100644 --- a/components/deprecated/dataproc/create_cluster/src/create_cluster.py +++ b/components/deprecated/dataproc/create_cluster/src/create_cluster.py @@ -22,6 +22,7 @@ import argparse import os +from pathlib import Path from common import _utils @@ -32,6 +33,10 @@ def main(argv=None): parser.add_argument('--region', type=str, help='Which zone for GCE VMs.') parser.add_argument('--name', type=str, help='The name of the cluster to create.') parser.add_argument('--staging', type=str, help='GCS path to use for staging.') + parser.add_argument('--output-dir-uri-output-path', + type=str, + default='/output.txt', + help='Local output path for the file containing the output dir URI.') args = parser.parse_args() code_path = os.path.dirname(os.path.realpath(__file__)) @@ -44,8 +49,8 @@ def main(argv=None): create_response = _utils.create_cluster(api, args.project, args.region, args.name, dest_files[0]) print('Cluster creation request submitted. Waiting for completion...') _utils.wait_for_operation(api, create_response['name']) - with open('/output.txt', 'w') as f: - f.write(args.name) + Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.output_dir_uri_output_path).write_text(args.output) print('Cluster created.') finally: _utils.remove_resources_from_gcs(dest_files) diff --git a/components/deprecated/dataproc/predict/src/predict.py b/components/deprecated/dataproc/predict/src/predict.py index 8b41433bc812..83d6f0fa6c2b 100644 --- a/components/deprecated/dataproc/predict/src/predict.py +++ b/components/deprecated/dataproc/predict/src/predict.py @@ -32,6 +32,7 @@ import argparse import json import os +from pathlib import Path from common import _utils import logging @@ -50,6 +51,15 @@ def main(argv=None): parser.add_argument('--predict', type=str, help='GCS path of prediction libsvm file.') parser.add_argument('--analysis', type=str, help='GCS path of the analysis input.') parser.add_argument('--target', type=str, help='Target column name.') + parser.add_argument('--prediction-results-uri-pattern-output-path', + type=str, + default='/output.txt', + help='Local output path for the file containing prediction results URI pattern.') + parser.add_argument('--ui-metadata-output-path', + type=str, + default='/mlpipeline-ui-metadata.json', + help='Local output path for the file containing UI metadata JSON structure.') + args = parser.parse_args() logging.getLogger().setLevel(logging.INFO) @@ -61,9 +71,9 @@ def main(argv=None): 'ml.dmlc.xgboost4j.scala.example.spark.XGBoostPredictor', spark_args) logging.info('Job request submitted. Waiting for completion...') _utils.wait_for_job(api, args.project, args.region, job_id) - prediction_results = os.path.join(args.output, 'part-*.csv') - with open('/output.txt', 'w') as f: - f.write(prediction_results) + prediction_results_uri_pattern = os.path.join(args.output, 'part-*.csv') + Path(args.prediction_results_uri_pattern_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.prediction_results_uri_pattern_output_path).write_text(prediction_results_uri_pattern) with file_io.FileIO(os.path.join(args.output, 'schema.json'), 'r') as f: schema = json.load(f) @@ -74,11 +84,11 @@ def main(argv=None): 'storage': 'gcs', 'format': 'csv', 'header': [x['name'] for x in schema], - 'source': prediction_results + 'source': prediction_results_uri_pattern }] } - with open('/mlpipeline-ui-metadata.json', 'w') as f: - json.dump(metadata, f) + Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.ui_metadata_output_path).write_text(json.dumps(metadata)) logging.info('Job completed.') diff --git a/components/deprecated/dataproc/train/src/train.py b/components/deprecated/dataproc/train/src/train.py index 31a59909f9ed..274688116e34 100644 --- a/components/deprecated/dataproc/train/src/train.py +++ b/components/deprecated/dataproc/train/src/train.py @@ -32,6 +32,7 @@ import argparse import logging +from pathlib import Path from common import _utils @@ -51,6 +52,11 @@ def main(argv=None): parser.add_argument('--eval', type=str, help='GCS path of the eval libsvm file pattern.') parser.add_argument('--analysis', type=str, help='GCS path of the analysis input.') parser.add_argument('--target', type=str, help='Target column name.') + parser.add_argument('--output-dir-uri-output-path', + type=str, + default='/output.txt', + help='Local output path for the file containing the output dir URI.') + args = parser.parse_args() logging.getLogger().setLevel(logging.INFO) @@ -63,8 +69,8 @@ def main(argv=None): 'ml.dmlc.xgboost4j.scala.example.spark.XGBoostTrainer', spark_args) logging.info('Job request submitted. Waiting for completion...') _utils.wait_for_job(api, args.project, args.region, job_id) - with open('/output.txt', 'w') as f: - f.write(args.output) + Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.output_dir_uri_output_path).write_text(args.output) logging.info('Job completed.') diff --git a/components/kubeflow/dnntrainer/component.yaml b/components/kubeflow/dnntrainer/component.yaml index 255c96ebebb2..42966647037a 100644 --- a/components/kubeflow/dnntrainer/component.yaml +++ b/components/kubeflow/dnntrainer/component.yaml @@ -29,7 +29,6 @@ implementation: --target, {inputValue: Target}, --preprocessing-module, {inputValue: Preprocessing module}, --job-dir, {inputValue: Training output dir}, + --exported-model-dir-uri-output-path, {outputPath: Training output dir}, + --ui-metadata-output-path, {outputPath: MLPipeline UI metadata}, ] - fileOutputs: - Training output dir: /output.txt - MLPipeline UI metadata: /mlpipeline-ui-metadata.json diff --git a/components/kubeflow/dnntrainer/src/trainer/task.py b/components/kubeflow/dnntrainer/src/trainer/task.py index aca2eb276989..4f6733bbfecb 100644 --- a/components/kubeflow/dnntrainer/src/trainer/task.py +++ b/components/kubeflow/dnntrainer/src/trainer/task.py @@ -16,6 +16,7 @@ import argparse import json import os +from pathlib import Path import tensorflow as tf import tensorflow_transform as tft import tensorflow_model_analysis as tfma @@ -80,6 +81,14 @@ def parse_arguments(): required=False, help=('GCS path to a python file defining ' '"preprocess" and "get_feature_columns" functions.')) + parser.add_argument('--exported-model-dir-uri-output-path', + type=str, + default='/output.txt', + help='Local output path for the file containing exported model directory URI.') + parser.add_argument('--ui-metadata-output-path', + type=str, + default='/mlpipeline-ui-metadata.json', + help='Local output path for the file containing UI metadata JSON structure.') args = parser.parse_args() args.hidden_layer_size = [int(x.strip()) for x in args.hidden_layer_size.split(',')] @@ -341,11 +350,11 @@ def main(): 'source': args.job_dir, }] } - with open('/mlpipeline-ui-metadata.json', 'w') as f: - json.dump(metadata, f) + Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.ui_metadata_output_path).write_text(json.dumps(metadata)) - with open('/output.txt', 'w') as f: - f.write(args.job_dir) + Path(args.exported_model_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.exported_model_dir_uri_output_path).write_text(args.job_dir) if __name__ == '__main__': main() diff --git a/components/local/confusion_matrix/component.yaml b/components/local/confusion_matrix/component.yaml index 441f9620e68d..896b4a3a8579 100644 --- a/components/local/confusion_matrix/component.yaml +++ b/components/local/confusion_matrix/component.yaml @@ -15,7 +15,6 @@ implementation: --predictions, {inputValue: Predictions}, --target_lambda, {inputValue: Target lambda}, --output, {inputValue: Output dir}, + --ui-metadata-output-path, {outputPath: MLPipeline UI metadata}, + --metrics-output-path, {outputPath: MLPipeline Metrics}, ] - fileOutputs: - MLPipeline UI metadata: /mlpipeline-ui-metadata.json - MLPipeline Metrics: /mlpipeline-metrics.json diff --git a/components/local/confusion_matrix/src/confusion_matrix.py b/components/local/confusion_matrix/src/confusion_matrix.py index 636c1db1a914..b9bd33d67ecb 100644 --- a/components/local/confusion_matrix/src/confusion_matrix.py +++ b/components/local/confusion_matrix/src/confusion_matrix.py @@ -27,6 +27,7 @@ import os import urlparse import pandas as pd +from pathlib import Path from sklearn.metrics import confusion_matrix, accuracy_score from tensorflow.python.lib.io import file_io @@ -39,6 +40,15 @@ def main(argv=None): help='a lambda function as a string to compute target.' + 'For example, "lambda x: x[\'a\'] + x[\'b\']"' + 'If not set, the input must include a "target" column.') + parser.add_argument('--ui-metadata-output-path', + type=str, + default='/mlpipeline-ui-metadata.json', + help='Local output path for the file containing UI metadata JSON structure.') + parser.add_argument('--metrics-output-path', + type=str, + default='/mlpipeline-metrics.json', + help='Local output path for the file containing metrics JSON structure.') + args = parser.parse_args() storage_service_scheme = urlparse.urlparse(args.output).scheme @@ -85,8 +95,8 @@ def main(argv=None): 'labels': list(map(str, vocab)), }] } - with file_io.FileIO('/mlpipeline-ui-metadata.json', 'w') as f: - json.dump(metadata, f) + Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.ui_metadata_output_path).write_text(json.dumps(metadata)) accuracy = accuracy_score(df['target'], df['predicted']) metrics = { @@ -96,8 +106,8 @@ def main(argv=None): 'format': "PERCENTAGE", }] } - with file_io.FileIO('/mlpipeline-metrics.json', 'w') as f: - json.dump(metrics, f) + Path(args.metrics_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.metrics_output_path).write_text(json.dumps(metrics)) if __name__== "__main__": main() diff --git a/components/local/roc/component.yaml b/components/local/roc/component.yaml index 2bed42d3dc8d..c04ce019de1a 100644 --- a/components/local/roc/component.yaml +++ b/components/local/roc/component.yaml @@ -19,7 +19,6 @@ implementation: --true_score_column, {inputValue: True score column}, --target_lambda, {inputValue: Target lambda}, --output, {inputValue: Output dir}, + --ui-metadata-output-path, {outputPath: MLPipeline UI metadata}, + --metrics-output-path, {outputPath: MLPipeline Metrics}, ] - fileOutputs: - MLPipeline UI metadata: /mlpipeline-ui-metadata.json - MLPipeline Metrics: /mlpipeline-metrics.json diff --git a/components/local/roc/src/roc.py b/components/local/roc/src/roc.py index b67f25e52646..1d5626490935 100644 --- a/components/local/roc/src/roc.py +++ b/components/local/roc/src/roc.py @@ -26,6 +26,7 @@ import os import urlparse import pandas as pd +from pathlib import Path from sklearn.metrics import roc_curve, roc_auc_score from tensorflow.python.lib.io import file_io @@ -44,6 +45,14 @@ def main(argv=None): 'For example, "lambda x: x[\'a\'] and x[\'b\']". If missing, ' + 'input must have a "target" column.') parser.add_argument('--output', type=str, help='GCS path of the output directory.') + parser.add_argument('--ui-metadata-output-path', + type=str, + default='/mlpipeline-ui-metadata.json', + help='Local output path for the file containing UI metadata JSON structure.') + parser.add_argument('--metrics-output-path', + type=str, + default='/mlpipeline-metrics.json', + help='Local output path for the file containing metrics JSON structure.') args = parser.parse_args() storage_service_scheme = urlparse.urlparse(args.output).scheme @@ -91,8 +100,8 @@ def main(argv=None): 'source': roc_file }] } - with file_io.FileIO('/mlpipeline-ui-metadata.json', 'w') as f: - json.dump(metadata, f) + Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.ui_metadata_output_path).write_text(json.dumps(metadata)) metrics = { 'metrics': [{ @@ -100,8 +109,8 @@ def main(argv=None): 'numberValue': roc_auc, }] } - with file_io.FileIO('/mlpipeline-metrics.json', 'w') as f: - json.dump(metrics, f) + Path(args.metrics_output_path).parent.mkdir(parents=True, exist_ok=True) + Path(args.metrics_output_path).write_text(json.dumps(metrics)) if __name__== "__main__": main()