revert(components): refactor - De-hardcoded local output paths #4478

Merged
33 changes: 33 additions & 0 deletions components/deprecated/dataflow/predict/component.yaml
@@ -0,0 +1,33 @@
name: Predict using TF on Dataflow
description: |
Runs TensorFlow prediction on Google Cloud Dataflow
Input and output data is in GCS
inputs:
- {name: Data file pattern, type: GCSPath, description: 'GCS or local path of test file patterns.'} # type: {GCSPath: {data_type: CSV}}
- {name: Schema, type: GCSPath, description: 'GCS json schema file path.'} # type: {GCSPath: {data_type: TFDV schema JSON}}
- {name: Target column, type: String, description: 'Name of the column for prediction target.'}
- {name: Model, type: GCSPath, description: 'GCS or local path of model trained with tft preprocessed data.'} # Models trained with estimator are exported to base/export/export/123456781 directory. # Our trainer export only one model. #TODO: Output single model from trainer # type: {GCSPath: {path_type: Directory, data_type: Exported TensorFlow models dir}}
- {name: Batch size, type: Integer, default: '32', description: 'Batch size used in prediction.'}
- {name: Run mode, type: String, default: local, description: 'Whether to run the job locally or in Cloud Dataflow. Valid values are "local" and "cloud".'}
- {name: GCP project, type: GCPProjectID, description: 'The GCP project to run the dataflow job.'}
- {name: Predictions dir, type: GCSPath, description: 'GCS or local directory.'} #Will contain prediction_results-* and schema.json files; TODO: Split outputs and replace dir with single file # type: {GCSPath: {path_type: Directory}}
outputs:
- {name: Predictions dir, type: GCSPath, description: 'GCS or local directory.'} #Will contain prediction_results-* and schema.json files; TODO: Split outputs and replace dir with single file # type: {GCSPath: {path_type: Directory}}
- {name: MLPipeline UI metadata, type: UI metadata}
implementation:
container:
image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tf-predict:57d9f7f1cfd458e945d297957621716062d89a49
command: [python2, /ml/predict.py]
args: [
--data, {inputValue: Data file pattern},
--schema, {inputValue: Schema},
--target, {inputValue: Target column},
--model, {inputValue: Model},
--mode, {inputValue: Run mode},
--project, {inputValue: GCP project},
--batchsize, {inputValue: Batch size},
--output, {inputValue: Predictions dir},
]
fileOutputs:
Predictions dir: /output.txt
MLPipeline UI metadata: /mlpipeline-ui-metadata.json
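For reference, a component spec like the one above is normally consumed by loading it into the pipelines SDK. The sketch below is illustrative only and not part of this PR: it assumes the KFP v1 SDK (`kfp.components.load_component_from_file`), a local checkout of the repo for the file path, and hypothetical GCS URIs and parameter values.

```python
import kfp
from kfp import dsl

# Loading the spec yields a task factory whose parameters mirror the declared
# inputs (names are exposed in snake_case, e.g. "Data file pattern" -> data_file_pattern).
dataflow_tf_predict_op = kfp.components.load_component_from_file(
    'components/deprecated/dataflow/predict/component.yaml')

@dsl.pipeline(name='tf-predict-example')
def predict_pipeline(project: str, output_dir: str):
    dataflow_tf_predict_op(
        data_file_pattern='gs://my-bucket/test-*.csv',  # hypothetical paths
        schema='gs://my-bucket/schema.json',
        target_column='target',
        model='gs://my-bucket/model/export',
        batch_size=32,
        run_mode='local',
        gcp_project=project,
        predictions_dir=output_dir,
    )

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(predict_pipeline, 'predict_pipeline.yaml')
```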
34 changes: 34 additions & 0 deletions components/deprecated/dataflow/tfdv/component.yaml
@@ -0,0 +1,34 @@
name: TFX - Data Validation
description: |
Runs Tensorflow Data Validation. https://www.tensorflow.org/tfx/data_validation/get_started
Tensorflow Data Validation (TFDV) can analyze training and serving data to:
* compute descriptive statistics,
* infer a schema,
* detect data anomalies.
inputs:
- {name: Inference data, type: GCSPath, description: GCS path of the CSV file from which to infer the schema.} # type: {GCSPath: {data_type: CSV}}
- {name: Validation data, type: GCSPath, description: GCS path of the CSV file whose contents should be validated.} # type: {GCSPath: {data_type: CSV}}
- {name: Column names, type: GCSPath, description: GCS json file containing a list of column names.} # type: {GCSPath: {data_type: JSON}}
- {name: Key columns, type: String, description: Comma separated list of columns to treat as keys.}
- {name: GCP project, type: GCPProjectID, default: '', description: The GCP project to run the dataflow job.}
- {name: Run mode, type: String, default: local, description: Whether to run the job locally or in Cloud Dataflow. Valid values are "local" and "cloud". }
- {name: Validation output, type: GCSPath, description: GCS or local directory.} # type: {GCSPath: {path_type: Directory}}
outputs:
- {name: Schema, type: GCSPath, description: GCS path of the inferred schema JSON.} # type: {GCSPath: {data_type: TFDV schema JSON}}
- {name: Validation result, type: String, description: Indicates whether anomalies were detected or not.}
implementation:
container:
image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:57d9f7f1cfd458e945d297957621716062d89a49
command: [python2, /ml/validate.py]
args: [
--csv-data-for-inference, {inputValue: Inference data},
--csv-data-to-validate, {inputValue: Validation data},
--column-names, {inputValue: Column names},
--key-columns, {inputValue: Key columns},
--project, {inputValue: GCP project},
--mode, {inputValue: Run mode},
--output, {inputValue: Validation output},
]
fileOutputs:
Schema: /schema.txt
Validation result: /output_validation_result.txt
34 changes: 34 additions & 0 deletions components/deprecated/dataflow/tfma/component.yaml
@@ -0,0 +1,34 @@
name: TFX - Analyze model
description: |
Runs Tensorflow Model Analysis. https://www.tensorflow.org/tfx/model_analysis/get_started
TensorFlow Model Analysis allows you to perform model evaluations in the TFX pipeline, and view resultant metrics and plots in a Jupyter notebook. Specifically, it can provide:
* metrics computed on entire training and holdout dataset, as well as next-day evaluations
* tracking metrics over time
* model quality performance on different feature slices
inputs:
- {name: Model, type: GCSPath, description: GCS path to the model which will be evaluated.} # type: {GCSPath: {path_type: Directory, data_type: Exported TensorFlow models dir}}
- {name: Evaluation data, type: GCSPath, description: GCS path of eval files.} # type: {GCSPath: {data_type: CSV}}
- {name: Schema, type: GCSPath, description: GCS json schema file path.} # type: {GCSPath: {data_type: TFDV schema JSON}}
- {name: Run mode, type: String, default: local, description: whether to run the job locally or in Cloud Dataflow.}
- {name: GCP project, type: GCPProjectID, default: '', description: 'The GCP project to run the dataflow job, if running in the `cloud` mode.'}
- {name: Slice columns, type: String, description: Comma-separated list of columns on which to slice for analysis.}
- {name: Analysis results dir, type: GCSPath, description: GCS or local directory where the analysis results should be written.} # type: {GCSPath: {path_type: Directory}}
outputs:
- {name: Analysis results dir, type: GCSPath, description: GCS or local directory where the analysis results were written.} # type: {GCSPath: {path_type: Directory}}
- {name: MLPipeline UI metadata, type: UI metadata}
implementation:
container:
image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tfma:57d9f7f1cfd458e945d297957621716062d89a49
command: [python2, /ml/model_analysis.py]
args: [
--model, {inputValue: Model},
--eval, {inputValue: Evaluation data},
--schema, {inputValue: Schema},
--mode, {inputValue: Run mode},
--project, {inputValue: GCP project},
--slice-columns, {inputValue: Slice columns},
--output, {inputValue: Analysis results dir},
]
fileOutputs:
Analysis results dir: /output.txt
MLPipeline UI metadata: /mlpipeline-ui-metadata.json
27 changes: 27 additions & 0 deletions components/deprecated/dataflow/tft/component.yaml
@@ -0,0 +1,27 @@
name: Transform using TF on Dataflow
description: Runs TensorFlow Transform on Google Cloud Dataflow
inputs:
- {name: Training data file pattern, type: GCSPath, description: 'GCS path of train file patterns.'} #Also supports local CSV # type: {GCSPath: {data_type: CSV}}
- {name: Evaluation data file pattern, type: GCSPath, description: 'GCS path of eval file patterns.'} #Also supports local CSV # type: {GCSPath: {data_type: CSV}}
- {name: Schema, type: GCSPath, description: 'GCS json schema file path.'} # type: {GCSPath: {data_type: JSON}}
- {name: GCP project, type: GCPProjectID, description: 'The GCP project to run the dataflow job.'}
- {name: Run mode, type: String, default: local, description: 'Whether to run the job locally or in Cloud Dataflow. Valid values are "local" and "cloud".' }
- {name: Preprocessing module, type: GCSPath, default: '', description: 'GCS path to a python file defining "preprocess" and "get_feature_columns" functions.'} # type: {GCSPath: {data_type: Python}}
- {name: Transformed data dir, type: GCSPath, description: 'GCS or local directory'} #Also supports local paths # type: {GCSPath: {path_type: Directory}}
outputs:
- {name: Transformed data dir, type: GCSPath} # type: {GCSPath: {path_type: Directory}}
implementation:
container:
image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:57d9f7f1cfd458e945d297957621716062d89a49
command: [python2, /ml/transform.py]
args: [
--train, {inputValue: Training data file pattern},
--eval, {inputValue: Evaluation data file pattern},
--schema, {inputValue: Schema},
--project, {inputValue: GCP project},
--mode, {inputValue: Run mode},
--preprocessing-module, {inputValue: Preprocessing module},
--output, {inputValue: Transformed data dir},
]
fileOutputs:
Transformed data dir: /output.txt
9 changes: 2 additions & 7 deletions components/deprecated/dataproc/analyze/src/analyze.py
@@ -25,7 +25,6 @@

import argparse
import os
from pathlib import Path

from common import _utils

@@ -38,10 +37,6 @@ def main(argv=None):
parser.add_argument('--output', type=str, help='GCS path to use for output.')
parser.add_argument('--train', type=str, help='GCS path of the training csv file.')
parser.add_argument('--schema', type=str, help='GCS path of the json schema file.')
parser.add_argument('--output-dir-uri-output-path',
type=str,
default='/output.txt',
help='Local output path for the file containing the output dir URI.')
args = parser.parse_args()

code_path = os.path.dirname(os.path.realpath(__file__))
@@ -55,8 +50,8 @@ def main(argv=None):
api, args.project, args.region, args.cluster, dest_files[0], spark_args)
print('Job request submitted. Waiting for completion...')
_utils.wait_for_job(api, args.project, args.region, job_id)
Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.output_dir_uri_output_path).write_text(args.output)
with open('/output.txt', 'w') as f:
f.write(args.output)

print('Job completed.')
finally:
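The analyze.py hunk above is representative of the change applied to each of the Dataproc launcher scripts in this PR: the configurable --output-dir-uri-output-path flag introduced by the refactor is dropped, and the script goes back to writing its single output value to a fixed, container-local file that the corresponding component.yaml maps to a named output via fileOutputs. A minimal sketch of the restored pattern; the helper name and the URI are hypothetical:

```python
# Hypothetical helper illustrating the restored pattern; '/output.txt' matches the
# fileOutputs mapping in the corresponding component.yaml.
def write_component_output(value, path='/output.txt'):
    # The path is fixed inside the container; the pipeline backend collects the
    # file after the step finishes and exposes its contents as the named output.
    with open(path, 'w') as f:
        f.write(value)

write_component_output('gs://my-bucket/analysis')  # hypothetical output URI
```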
2 changes: 1 addition & 1 deletion components/deprecated/dataproc/base/Dockerfile
@@ -21,7 +21,7 @@ RUN easy_install pip

RUN pip install google-api-python-client==1.6.2

RUN pip install tensorflow==1.6.0 pathlib2
RUN pip install tensorflow==1.6.0

RUN wget -nv https://dl.google.com/dl/cloudsdk/release/google-cloud-sdk.zip && \
unzip -qq google-cloud-sdk.zip -d tools && \
Original file line number Diff line number Diff line change
@@ -22,7 +22,6 @@

import argparse
import os
from pathlib import Path

from common import _utils

@@ -33,10 +32,6 @@ def main(argv=None):
parser.add_argument('--region', type=str, help='Which zone for GCE VMs.')
parser.add_argument('--name', type=str, help='The name of the cluster to create.')
parser.add_argument('--staging', type=str, help='GCS path to use for staging.')
parser.add_argument('--output-dir-uri-output-path',
type=str,
default='/output.txt',
help='Local output path for the file containing the output dir URI.')
args = parser.parse_args()

code_path = os.path.dirname(os.path.realpath(__file__))
@@ -49,8 +44,8 @@ def main(argv=None):
create_response = _utils.create_cluster(api, args.project, args.region, args.name, dest_files[0])
print('Cluster creation request submitted. Waiting for completion...')
_utils.wait_for_operation(api, create_response['name'])
Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.output_dir_uri_output_path).write_text(args.output)
with open('/output.txt', 'w') as f:
f.write(args.name)
print('Cluster created.')
finally:
_utils.remove_resources_from_gcs(dest_files)
22 changes: 6 additions & 16 deletions components/deprecated/dataproc/predict/src/predict.py
@@ -32,7 +32,6 @@
import argparse
import json
import os
from pathlib import Path

from common import _utils
import logging
@@ -51,15 +50,6 @@ def main(argv=None):
parser.add_argument('--predict', type=str, help='GCS path of prediction libsvm file.')
parser.add_argument('--analysis', type=str, help='GCS path of the analysis input.')
parser.add_argument('--target', type=str, help='Target column name.')
parser.add_argument('--prediction-results-uri-pattern-output-path',
type=str,
default='/output.txt',
help='Local output path for the file containing prediction results URI pattern.')
parser.add_argument('--ui-metadata-output-path',
type=str,
default='/mlpipeline-ui-metadata.json',
help='Local output path for the file containing UI metadata JSON structure.')

args = parser.parse_args()

logging.getLogger().setLevel(logging.INFO)
@@ -71,9 +61,9 @@ def main(argv=None):
'ml.dmlc.xgboost4j.scala.example.spark.XGBoostPredictor', spark_args)
logging.info('Job request submitted. Waiting for completion...')
_utils.wait_for_job(api, args.project, args.region, job_id)
prediction_results_uri_pattern = os.path.join(args.output, 'part-*.csv')
Path(args.prediction_results_uri_pattern_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.prediction_results_uri_pattern_output_path).write_text(prediction_results_uri_pattern)
prediction_results = os.path.join(args.output, 'part-*.csv')
with open('/output.txt', 'w') as f:
f.write(prediction_results)

with file_io.FileIO(os.path.join(args.output, 'schema.json'), 'r') as f:
schema = json.load(f)
@@ -84,11 +74,11 @@ def main(argv=None):
'storage': 'gcs',
'format': 'csv',
'header': [x['name'] for x in schema],
'source': prediction_results_uri_pattern
'source': prediction_results
}]
}
Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.ui_metadata_output_path).write_text(json.dumps(metadata))
with open('/mlpipeline-ui-metadata.json', 'w') as f:
json.dump(metadata, f)
logging.info('Job completed.')


10 changes: 2 additions & 8 deletions components/deprecated/dataproc/train/src/train.py
@@ -32,7 +32,6 @@

import argparse
import logging
from pathlib import Path

from common import _utils

@@ -52,11 +51,6 @@ def main(argv=None):
parser.add_argument('--eval', type=str, help='GCS path of the eval libsvm file pattern.')
parser.add_argument('--analysis', type=str, help='GCS path of the analysis input.')
parser.add_argument('--target', type=str, help='Target column name.')
parser.add_argument('--output-dir-uri-output-path',
type=str,
default='/output.txt',
help='Local output path for the file containing the output dir URI.')

args = parser.parse_args()

logging.getLogger().setLevel(logging.INFO)
@@ -69,8 +63,8 @@ def main(argv=None):
'ml.dmlc.xgboost4j.scala.example.spark.XGBoostTrainer', spark_args)
logging.info('Job request submitted. Waiting for completion...')
_utils.wait_for_job(api, args.project, args.region, job_id)
Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.output_dir_uri_output_path).write_text(args.output)
with open('/output.txt', 'w') as f:
f.write(args.output)

logging.info('Job completed.')

5 changes: 3 additions & 2 deletions components/kubeflow/dnntrainer/component.yaml
@@ -29,6 +29,7 @@ implementation:
--target, {inputValue: Target},
--preprocessing-module, {inputValue: Preprocessing module},
--job-dir, {inputValue: Training output dir},
--exported-model-dir-uri-output-path, {outputPath: Training output dir},
--ui-metadata-output-path, {outputPath: MLPipeline UI metadata},
]
fileOutputs:
Training output dir: /output.txt
MLPipeline UI metadata: /mlpipeline-ui-metadata.json
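The dnntrainer spec above shows both styles at once: the reverted `{outputPath: Training output dir}` argument and the restored `fileOutputs` mapping to a hardcoded /output.txt. The self-contained sketch below contrasts the two, assuming the KFP v1 SDK; both toy specs and the alpine image are illustrative and not part of this PR. From the pipeline author's side, downstream consumption is identical either way.

```python
import kfp
from kfp import dsl

# Restored style: the program writes to a hardcoded path; fileOutputs maps that
# path to the declared output.
file_outputs_op = kfp.components.load_component_from_text("""
name: Emit value (fileOutputs style)
outputs:
- {name: Value}
implementation:
  container:
    image: alpine
    command: [sh, -c, 'echo hello > /output.txt']
    fileOutputs:
      Value: /output.txt
""")

# Reverted style: the system chooses the path and injects it via {outputPath: ...}.
output_path_op = kfp.components.load_component_from_text("""
name: Emit value (outputPath style)
outputs:
- {name: Value}
implementation:
  container:
    image: alpine
    command: [sh, -c, 'mkdir -p "$(dirname "$0")" && echo hello > "$0"', {outputPath: Value}]
""")

@dsl.pipeline(name='output-style-demo')
def demo():
    a = file_outputs_op()
    b = output_path_op()
    # Either way, a downstream step reads the value the same way (.output is the
    # single-output shortcut on the task object).
    dsl.ContainerOp(name='consume', image='alpine',
                    command=['echo', a.output, b.output])
```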
17 changes: 4 additions & 13 deletions components/kubeflow/dnntrainer/src/trainer/task.py
@@ -16,7 +16,6 @@
import argparse
import json
import os
from pathlib import Path
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_model_analysis as tfma
@@ -81,14 +80,6 @@ def parse_arguments():
required=False,
help=('GCS path to a python file defining '
'"preprocess" and "get_feature_columns" functions.'))
parser.add_argument('--exported-model-dir-uri-output-path',
type=str,
default='/output.txt',
help='Local output path for the file containing exported model directory URI.')
parser.add_argument('--ui-metadata-output-path',
type=str,
default='/mlpipeline-ui-metadata.json',
help='Local output path for the file containing UI metadata JSON structure.')

args = parser.parse_args()
args.hidden_layer_size = [int(x.strip()) for x in args.hidden_layer_size.split(',')]
@@ -350,11 +341,11 @@ def main():
'source': args.job_dir,
}]
}
Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.ui_metadata_output_path).write_text(json.dumps(metadata))
with open('/mlpipeline-ui-metadata.json', 'w') as f:
json.dump(metadata, f)

Path(args.exported_model_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.exported_model_dir_uri_output_path).write_text(args.job_dir)
with open('/output.txt', 'w') as f:
f.write(args.job_dir)

if __name__ == '__main__':
main()
5 changes: 3 additions & 2 deletions components/local/confusion_matrix/component.yaml
@@ -15,6 +15,7 @@ implementation:
--predictions, {inputValue: Predictions},
--target_lambda, {inputValue: Target lambda},
--output, {inputValue: Output dir},
--ui-metadata-output-path, {outputPath: MLPipeline UI metadata},
--metrics-output-path, {outputPath: MLPipeline Metrics},
]
fileOutputs:
MLPipeline UI metadata: /mlpipeline-ui-metadata.json
MLPipeline Metrics: /mlpipeline-metrics.json
18 changes: 4 additions & 14 deletions components/local/confusion_matrix/src/confusion_matrix.py
@@ -27,7 +27,6 @@
import os
import urlparse
import pandas as pd
from pathlib import Path
from sklearn.metrics import confusion_matrix, accuracy_score
from tensorflow.python.lib.io import file_io

@@ -40,15 +39,6 @@ def main(argv=None):
help='a lambda function as a string to compute target.' +
'For example, "lambda x: x[\'a\'] + x[\'b\']"' +
'If not set, the input must include a "target" column.')
parser.add_argument('--ui-metadata-output-path',
type=str,
default='/mlpipeline-ui-metadata.json',
help='Local output path for the file containing UI metadata JSON structure.')
parser.add_argument('--metrics-output-path',
type=str,
default='/mlpipeline-metrics.json',
help='Local output path for the file containing metrics JSON structure.')

args = parser.parse_args()

storage_service_scheme = urlparse.urlparse(args.output).scheme
@@ -95,8 +85,8 @@ def main(argv=None):
'labels': list(map(str, vocab)),
}]
}
Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.ui_metadata_output_path).write_text(json.dumps(metadata))
with file_io.FileIO('/mlpipeline-ui-metadata.json', 'w') as f:
json.dump(metadata, f)

accuracy = accuracy_score(df['target'], df['predicted'])
metrics = {
@@ -106,8 +96,8 @@ def main(argv=None):
'format': "PERCENTAGE",
}]
}
Path(args.metrics_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.metrics_output_path).write_text(json.dumps(metrics))
with file_io.FileIO('/mlpipeline-metrics.json', 'w') as f:
json.dump(metrics, f)

if __name__== "__main__":
main()