Skip to content

Commit

Permalink
refactor(components): De-hardcoded local output paths. (kubeflow#580)
Browse files Browse the repository at this point in the history
* Components - De-hardcoded local output paths.

* pip install pathlib2

* Added component.yaml changes

* The Dataflow components have been deleted
  • Loading branch information
Ark-kun authored and Jeffwan committed Dec 9, 2020
1 parent f47a8fa commit af63227
Show file tree
Hide file tree
Showing 15 changed files with 85 additions and 162 deletions.
33 changes: 0 additions & 33 deletions components/deprecated/dataflow/predict/component.yaml

This file was deleted.

34 changes: 0 additions & 34 deletions components/deprecated/dataflow/tfdv/component.yaml

This file was deleted.

34 changes: 0 additions & 34 deletions components/deprecated/dataflow/tfma/component.yaml

This file was deleted.

27 changes: 0 additions & 27 deletions components/deprecated/dataflow/tft/component.yaml

This file was deleted.

9 changes: 7 additions & 2 deletions components/deprecated/dataproc/analyze/src/analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import argparse
import os
from pathlib import Path

from common import _utils

Expand All @@ -37,6 +38,10 @@ def main(argv=None):
parser.add_argument('--output', type=str, help='GCS path to use for output.')
parser.add_argument('--train', type=str, help='GCS path of the training csv file.')
parser.add_argument('--schema', type=str, help='GCS path of the json schema file.')
parser.add_argument('--output-dir-uri-output-path',
type=str,
default='/output.txt',
help='Local output path for the file containing the output dir URI.')
args = parser.parse_args()

code_path = os.path.dirname(os.path.realpath(__file__))
Expand All @@ -50,8 +55,8 @@ def main(argv=None):
api, args.project, args.region, args.cluster, dest_files[0], spark_args)
print('Job request submitted. Waiting for completion...')
_utils.wait_for_job(api, args.project, args.region, job_id)
with open('/output.txt', 'w') as f:
f.write(args.output)
Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.output_dir_uri_output_path).write_text(args.output)

print('Job completed.')
finally:
Expand Down
2 changes: 1 addition & 1 deletion components/deprecated/dataproc/base/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ RUN easy_install pip

RUN pip install google-api-python-client==1.6.2

RUN pip install tensorflow==1.6.0
RUN pip install tensorflow==1.6.0 pathlib2

RUN wget -nv https://dl.google.com/dl/cloudsdk/release/google-cloud-sdk.zip && \
unzip -qq google-cloud-sdk.zip -d tools && \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import argparse
import os
from pathlib import Path

from common import _utils

Expand All @@ -32,6 +33,10 @@ def main(argv=None):
parser.add_argument('--region', type=str, help='Which zone for GCE VMs.')
parser.add_argument('--name', type=str, help='The name of the cluster to create.')
parser.add_argument('--staging', type=str, help='GCS path to use for staging.')
parser.add_argument('--output-dir-uri-output-path',
type=str,
default='/output.txt',
help='Local output path for the file containing the output dir URI.')
args = parser.parse_args()

code_path = os.path.dirname(os.path.realpath(__file__))
Expand All @@ -44,8 +49,8 @@ def main(argv=None):
create_response = _utils.create_cluster(api, args.project, args.region, args.name, dest_files[0])
print('Cluster creation request submitted. Waiting for completion...')
_utils.wait_for_operation(api, create_response['name'])
with open('/output.txt', 'w') as f:
f.write(args.name)
Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.output_dir_uri_output_path).write_text(args.output)
print('Cluster created.')
finally:
_utils.remove_resources_from_gcs(dest_files)
Expand Down
22 changes: 16 additions & 6 deletions components/deprecated/dataproc/predict/src/predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import argparse
import json
import os
from pathlib import Path

from common import _utils
import logging
Expand All @@ -50,6 +51,15 @@ def main(argv=None):
parser.add_argument('--predict', type=str, help='GCS path of prediction libsvm file.')
parser.add_argument('--analysis', type=str, help='GCS path of the analysis input.')
parser.add_argument('--target', type=str, help='Target column name.')
parser.add_argument('--prediction-results-uri-pattern-output-path',
type=str,
default='/output.txt',
help='Local output path for the file containing prediction results URI pattern.')
parser.add_argument('--ui-metadata-output-path',
type=str,
default='/mlpipeline-ui-metadata.json',
help='Local output path for the file containing UI metadata JSON structure.')

args = parser.parse_args()

logging.getLogger().setLevel(logging.INFO)
Expand All @@ -61,9 +71,9 @@ def main(argv=None):
'ml.dmlc.xgboost4j.scala.example.spark.XGBoostPredictor', spark_args)
logging.info('Job request submitted. Waiting for completion...')
_utils.wait_for_job(api, args.project, args.region, job_id)
prediction_results = os.path.join(args.output, 'part-*.csv')
with open('/output.txt', 'w') as f:
f.write(prediction_results)
prediction_results_uri_pattern = os.path.join(args.output, 'part-*.csv')
Path(args.prediction_results_uri_pattern_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.prediction_results_uri_pattern_output_path).write_text(prediction_results_uri_pattern)

with file_io.FileIO(os.path.join(args.output, 'schema.json'), 'r') as f:
schema = json.load(f)
Expand All @@ -74,11 +84,11 @@ def main(argv=None):
'storage': 'gcs',
'format': 'csv',
'header': [x['name'] for x in schema],
'source': prediction_results
'source': prediction_results_uri_pattern
}]
}
with open('/mlpipeline-ui-metadata.json', 'w') as f:
json.dump(metadata, f)
Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.ui_metadata_output_path).write_text(json.dumps(metadata))
logging.info('Job completed.')


Expand Down
10 changes: 8 additions & 2 deletions components/deprecated/dataproc/train/src/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import argparse
import logging
from pathlib import Path

from common import _utils

Expand All @@ -51,6 +52,11 @@ def main(argv=None):
parser.add_argument('--eval', type=str, help='GCS path of the eval libsvm file pattern.')
parser.add_argument('--analysis', type=str, help='GCS path of the analysis input.')
parser.add_argument('--target', type=str, help='Target column name.')
parser.add_argument('--output-dir-uri-output-path',
type=str,
default='/output.txt',
help='Local output path for the file containing the output dir URI.')

args = parser.parse_args()

logging.getLogger().setLevel(logging.INFO)
Expand All @@ -63,8 +69,8 @@ def main(argv=None):
'ml.dmlc.xgboost4j.scala.example.spark.XGBoostTrainer', spark_args)
logging.info('Job request submitted. Waiting for completion...')
_utils.wait_for_job(api, args.project, args.region, job_id)
with open('/output.txt', 'w') as f:
f.write(args.output)
Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.output_dir_uri_output_path).write_text(args.output)

logging.info('Job completed.')

Expand Down
5 changes: 2 additions & 3 deletions components/kubeflow/dnntrainer/component.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ implementation:
--target, {inputValue: Target},
--preprocessing-module, {inputValue: Preprocessing module},
--job-dir, {inputValue: Training output dir},
--exported-model-dir-uri-output-path, {outputPath: Training output dir},
--ui-metadata-output-path, {outputPath: MLPipeline UI metadata},
]
fileOutputs:
Training output dir: /output.txt
MLPipeline UI metadata: /mlpipeline-ui-metadata.json
17 changes: 13 additions & 4 deletions components/kubeflow/dnntrainer/src/trainer/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import argparse
import json
import os
from pathlib import Path
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_model_analysis as tfma
Expand Down Expand Up @@ -80,6 +81,14 @@ def parse_arguments():
required=False,
help=('GCS path to a python file defining '
'"preprocess" and "get_feature_columns" functions.'))
parser.add_argument('--exported-model-dir-uri-output-path',
type=str,
default='/output.txt',
help='Local output path for the file containing exported model directory URI.')
parser.add_argument('--ui-metadata-output-path',
type=str,
default='/mlpipeline-ui-metadata.json',
help='Local output path for the file containing UI metadata JSON structure.')

args = parser.parse_args()
args.hidden_layer_size = [int(x.strip()) for x in args.hidden_layer_size.split(',')]
Expand Down Expand Up @@ -341,11 +350,11 @@ def main():
'source': args.job_dir,
}]
}
with open('/mlpipeline-ui-metadata.json', 'w') as f:
json.dump(metadata, f)
Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.ui_metadata_output_path).write_text(json.dumps(metadata))

with open('/output.txt', 'w') as f:
f.write(args.job_dir)
Path(args.exported_model_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.exported_model_dir_uri_output_path).write_text(args.job_dir)

if __name__ == '__main__':
main()
5 changes: 2 additions & 3 deletions components/local/confusion_matrix/component.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ implementation:
--predictions, {inputValue: Predictions},
--target_lambda, {inputValue: Target lambda},
--output, {inputValue: Output dir},
--ui-metadata-output-path, {outputPath: MLPipeline UI metadata},
--metrics-output-path, {outputPath: MLPipeline Metrics},
]
fileOutputs:
MLPipeline UI metadata: /mlpipeline-ui-metadata.json
MLPipeline Metrics: /mlpipeline-metrics.json
18 changes: 14 additions & 4 deletions components/local/confusion_matrix/src/confusion_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import os
import urlparse
import pandas as pd
from pathlib import Path
from sklearn.metrics import confusion_matrix, accuracy_score
from tensorflow.python.lib.io import file_io

Expand All @@ -39,6 +40,15 @@ def main(argv=None):
help='a lambda function as a string to compute target.' +
'For example, "lambda x: x[\'a\'] + x[\'b\']"' +
'If not set, the input must include a "target" column.')
parser.add_argument('--ui-metadata-output-path',
type=str,
default='/mlpipeline-ui-metadata.json',
help='Local output path for the file containing UI metadata JSON structure.')
parser.add_argument('--metrics-output-path',
type=str,
default='/mlpipeline-metrics.json',
help='Local output path for the file containing metrics JSON structure.')

args = parser.parse_args()

storage_service_scheme = urlparse.urlparse(args.output).scheme
Expand Down Expand Up @@ -85,8 +95,8 @@ def main(argv=None):
'labels': list(map(str, vocab)),
}]
}
with file_io.FileIO('/mlpipeline-ui-metadata.json', 'w') as f:
json.dump(metadata, f)
Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.ui_metadata_output_path).write_text(json.dumps(metadata))

accuracy = accuracy_score(df['target'], df['predicted'])
metrics = {
Expand All @@ -96,8 +106,8 @@ def main(argv=None):
'format': "PERCENTAGE",
}]
}
with file_io.FileIO('/mlpipeline-metrics.json', 'w') as f:
json.dump(metrics, f)
Path(args.metrics_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.metrics_output_path).write_text(json.dumps(metrics))

if __name__== "__main__":
main()
Loading

0 comments on commit af63227

Please sign in to comment.