Components - De-hardcoded local output paths.
Ark-kun committed Dec 21, 2018
1 parent 379d4d7 commit 8434bd7
Showing 13 changed files with 161 additions and 56 deletions.
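
Every file below applies the same fix: a hardcoded container-local output path ('/output.txt' or '/mlpipeline-ui-metadata.json') becomes a command-line argument whose default preserves the old location, and the bare open() call becomes a pathlib write that first creates any missing parent directories. A minimal sketch of the before/after pattern, with a hypothetical --result-output-path flag standing in for the per-component flags below:

import argparse
from pathlib import Path

# Before: the destination was baked in, so a pipeline runner could not remap it.
#   with open('/output.txt', 'w') as f:
#       f.write(result)

# After: the caller picks the destination; the default keeps the old behavior.
parser = argparse.ArgumentParser()
parser.add_argument('--result-output-path', type=str, default='/output.txt',
                    help='Local output path for the result file (hypothetical flag).')
args = parser.parse_args()

result = 'gs://example-bucket/output/dir'  # hypothetical payload
output_path = Path(args.result_output_path)
# Create intermediate directories so non-root paths like /tmp/outputs/x.txt work.
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(result)

In the hunks that follow, lines prefixed with - were removed and lines prefixed with + were added; unprefixed lines are context.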
22 changes: 16 additions & 6 deletions components/dataflow/predict/src/predict.py
@@ -22,6 +22,7 @@
import json
import logging
import os
+from pathlib import Path
from tensorflow.python.lib.io import file_io


@@ -60,6 +61,14 @@ def parse_arguments():
type=int,
default=32,
help='Batch size used in prediction.')
+parser.add_argument('--prediction-results-uri-pattern-output-path',
+type=str,
+default='/output.txt',
+help='Local output path for the file containing the prediction results URI pattern.')
+parser.add_argument('--ui-metadata-output-path',
+type=str,
+default='/mlpipeline-ui-metadata.json',
+help='Local output path for the file containing the UI metadata JSON structure.')

args = parser.parse_args()
return args
@@ -236,9 +245,10 @@ def main():

run_predict(args.output, args.data, schema, args.target, model_export_dir,
args.project, args.mode, args.batchsize)
-prediction_results = os.path.join(args.output, 'prediction_results-*')
-with open('/output.txt', 'w') as f:
-f.write(prediction_results)
+prediction_results_uri_pattern = os.path.join(args.output, 'prediction_results-*')
+Path(args.prediction_results_uri_pattern_output_path).parent.mkdir(parents=True, exist_ok=True)
+Path(args.prediction_results_uri_pattern_output_path).write_text(prediction_results_uri_pattern)


with file_io.FileIO(os.path.join(args.output, 'schema.json'), 'r') as f:
schema = json.load(f)
@@ -249,11 +259,11 @@
'storage': 'gcs',
'format': 'csv',
'header': [x['name'] for x in schema],
-'source': prediction_results
+'source': prediction_results_uri_pattern
}]
}
-with open('/mlpipeline-ui-metadata.json', 'w') as f:
-json.dump(metadata, f)
+Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True)
+Path(args.ui_metadata_output_path).write_text(json.dumps(metadata))


if __name__== "__main__":
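For reference, the UI metadata file this component now writes at args.ui_metadata_output_path is shaped like the following Python literal once deserialized; the Kubeflow Pipelines UI reads the JSON form to render a table. Bucket name and column headers here are hypothetical:

metadata = {
    'outputs': [{
        'type': 'table',
        'storage': 'gcs',
        'format': 'csv',
        'header': ['trip_id', 'fare', 'predicted_fare'],  # [x['name'] for x in schema]
        'source': 'gs://example-bucket/predict-output/prediction_results-*',
    }]
}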
41 changes: 27 additions & 14 deletions components/dataflow/tfma/src/model_analysis.py
@@ -21,6 +21,7 @@
import datetime
import json
import os
+from pathlib import Path

import apache_beam as beam
from ipywidgets.embed import embed_data
@@ -117,6 +118,14 @@ def parse_arguments():
action='append',
required=True,
help='one or more columns on which to slice for analysis.')
+parser.add_argument('--ui-metadata-output-path',
+type=str,
+default='/mlpipeline-ui-metadata.json',
+help='Local output path for the file containing the UI metadata JSON structure.')
+parser.add_argument('--output-dir-uri-output-path',
+type=str,
+default='/output.txt',
+help='Local output path for the file containing the output dir URI.')

return parser.parse_args()

@@ -207,7 +216,19 @@ def generate_static_html_output(output_dir, slicing_columns):
views_html += _SINGLE_WIDGET_TEMPLATE.format(idx, view)
rendered_template = _STATIC_HTML_TEMPLATE.format(
manager_state=manager_state, widget_views=views_html)
-static_html_path = os.path.join(output_dir, _OUTPUT_HTML_FILE)
+return rendered_template

+def main():
+tf.logging.set_verbosity(tf.logging.INFO)
+args = parse_arguments()
+schema = json.loads(file_io.read_file_to_string(args.schema))
+eval_model_parent_dir = os.path.join(args.model, 'tfma_eval_model_dir')
+model_export_dir = os.path.join(eval_model_parent_dir, file_io.list_directory(eval_model_parent_dir)[0])
+run_analysis(args.output, model_export_dir, args.eval, schema,
+args.project, args.mode, args.slice_columns)
+rendered_template = generate_static_html_output(args.output, args.slice_columns)

+static_html_path = os.path.join(args.output, _OUTPUT_HTML_FILE)
file_io.write_string_to_file(static_html_path, rendered_template)

metadata = {
@@ -217,21 +238,13 @@ def generate_static_html_output(output_dir, slicing_columns):
'source': static_html_path,
}]
}
-with file_io.FileIO('/mlpipeline-ui-metadata.json', 'w') as f:
-json.dump(metadata, f)
+ui_metadata_text = json.dumps(metadata)

+Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True)
+Path(args.ui_metadata_output_path).write_text(ui_metadata_text)

-def main():
-tf.logging.set_verbosity(tf.logging.INFO)
-args = parse_arguments()
-schema = json.loads(file_io.read_file_to_string(args.schema))
-eval_model_parent_dir = os.path.join(args.model, 'tfma_eval_model_dir')
-model_export_dir = os.path.join(eval_model_parent_dir, file_io.list_directory(eval_model_parent_dir)[0])
-run_analysis(args.output, model_export_dir, args.eval, schema,
-args.project, args.mode, args.slice_columns)
-generate_static_html_output(args.output, args.slice_columns)
-with open('/output.txt', 'w') as f:
-f.write(args.output)
+Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True)
+Path(args.output_dir_uri_output_path).write_text(args.output)

if __name__== "__main__":
main()
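
A practical payoff of the restructuring above: generate_static_html_output no longer touches the filesystem, so rendering can be exercised without GCS and the write target stays injectable. A runnable sketch of that shape; the template string and paths here are hypothetical stand-ins, not the module's real _STATIC_HTML_TEMPLATE:

from pathlib import Path

_TEMPLATE = '<html><body>{widget_views}</body></html>'  # stand-in template

def render(views_html):
    # Pure function: builds the page and returns it instead of writing it.
    return _TEMPLATE.format(widget_views=views_html)

def write_output(path_str, text):
    # The mkdir-then-write helper pattern used throughout this commit.
    path = Path(path_str)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(text)

html = render('<div>slice: country</div>')
write_output('/tmp/outputs/analysis.html', html)
assert Path('/tmp/outputs/analysis.html').read_text() == html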
9 changes: 7 additions & 2 deletions components/dataflow/tft/src/transform.py
@@ -20,6 +20,7 @@
import json
import logging
import os
+from pathlib import Path
import tensorflow as tf
import tensorflow_transform as tft

@@ -80,6 +81,10 @@ def parse_arguments():
required=False,
help=('GCS path to a python file defining '
'"preprocess" and "get_feature_columns" functions.'))
+parser.add_argument('--output-dir-uri-output-path',
+type=str,
+default='/output.txt',
+help='Local output path for the file containing the output dir URI.')

args = parser.parse_args()
return args
@@ -296,8 +301,8 @@ def wrapped_preprocessing_fn(inputs):

run_transform(args.output, schema, args.train, args.eval,
args.project, args.mode, preprocessing_fn=preprocessing_fn)
-with open('/output.txt', 'w') as f:
-f.write(args.output)
+Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True)
+Path(args.output_dir_uri_output_path).write_text(args.output)

if __name__== "__main__":
main()
9 changes: 7 additions & 2 deletions components/dataproc/analyze/src/analyze.py
@@ -25,6 +25,7 @@

import argparse
import os
+from pathlib import Path

from common import _utils

@@ -37,6 +38,10 @@ def main(argv=None):
parser.add_argument('--output', type=str, help='GCS path to use for output.')
parser.add_argument('--train', type=str, help='GCS path of the training csv file.')
parser.add_argument('--schema', type=str, help='GCS path of the json schema file.')
+parser.add_argument('--output-dir-uri-output-path',
+type=str,
+default='/output.txt',
+help='Local output path for the file containing the output dir URI.')
args = parser.parse_args()

code_path = os.path.dirname(os.path.realpath(__file__))
@@ -50,8 +55,8 @@ def main(argv=None):
api, args.project, args.region, args.cluster, dest_files[0], spark_args)
print('Job request submitted. Waiting for completion...')
_utils.wait_for_job(api, args.project, args.region, job_id)
-with open('/output.txt', 'w') as f:
-f.write(args.output)
+Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True)
+Path(args.output_dir_uri_output_path).write_text(args.output)

print('Job completed.')
finally:
9 changes: 7 additions & 2 deletions components/dataproc/create_cluster/src/create_cluster.py
@@ -22,6 +22,7 @@

import argparse
import os
+from pathlib import Path

from common import _utils

@@ -32,6 +33,10 @@ def main(argv=None):
parser.add_argument('--region', type=str, help='Which zone for GCE VMs.')
parser.add_argument('--name', type=str, help='The name of the cluster to create.')
parser.add_argument('--staging', type=str, help='GCS path to use for staging.')
+parser.add_argument('--output-dir-uri-output-path',
+type=str,
+default='/output.txt',
+help='Local output path for the file containing the created cluster name.')
args = parser.parse_args()

code_path = os.path.dirname(os.path.realpath(__file__))
@@ -44,8 +49,8 @@ def main(argv=None):
create_response = _utils.create_cluster(api, args.project, args.region, args.name, dest_files[0])
print('Cluster creation request submitted. Waiting for completion...')
_utils.wait_for_operation(api, create_response['name'])
-with open('/output.txt', 'w') as f:
-f.write(args.name)
+Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True)
+Path(args.output_dir_uri_output_path).write_text(args.name)
print('Cluster created.')
finally:
_utils.remove_resources_from_gcs(dest_files)
22 changes: 16 additions & 6 deletions components/dataproc/predict/src/predict.py
@@ -32,6 +32,7 @@
import argparse
import json
import os
+from pathlib import Path

from common import _utils
import logging
@@ -50,6 +51,15 @@ def main(argv=None):
parser.add_argument('--predict', type=str, help='GCS path of prediction libsvm file.')
parser.add_argument('--analysis', type=str, help='GCS path of the analysis input.')
parser.add_argument('--target', type=str, help='Target column name.')
+parser.add_argument('--prediction-results-uri-pattern-output-path',
+type=str,
+default='/output.txt',
+help='Local output path for the file containing the prediction results URI pattern.')
+parser.add_argument('--ui-metadata-output-path',
+type=str,
+default='/mlpipeline-ui-metadata.json',
+help='Local output path for the file containing the UI metadata JSON structure.')

args = parser.parse_args()

logging.getLogger().setLevel(logging.INFO)
@@ -61,9 +71,9 @@ def main(argv=None):
'ml.dmlc.xgboost4j.scala.example.spark.XGBoostPredictor', spark_args)
logging.info('Job request submitted. Waiting for completion...')
_utils.wait_for_job(api, args.project, args.region, job_id)
-prediction_results = os.path.join(args.output, 'part-*.csv')
-with open('/output.txt', 'w') as f:
-f.write(prediction_results)
+prediction_results_uri_pattern = os.path.join(args.output, 'part-*.csv')
+Path(args.prediction_results_uri_pattern_output_path).parent.mkdir(parents=True, exist_ok=True)
+Path(args.prediction_results_uri_pattern_output_path).write_text(prediction_results_uri_pattern)

with file_io.FileIO(os.path.join(args.output, 'schema.json'), 'r') as f:
schema = json.load(f)
@@ -74,11 +84,11 @@ def main(argv=None):
'storage': 'gcs',
'format': 'csv',
'header': [x['name'] for x in schema],
-'source': prediction_results
+'source': prediction_results_uri_pattern
}]
}
-with open('/mlpipeline-ui-metadata.json', 'w') as f:
-json.dump(metadata, f)
+Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True)
+Path(args.ui_metadata_output_path).write_text(json.dumps(metadata))
logging.info('Job completed.')


10 changes: 8 additions & 2 deletions components/dataproc/train/src/train.py
@@ -32,6 +32,7 @@

import argparse
import logging
+from pathlib import Path

from common import _utils

@@ -51,6 +52,11 @@ def main(argv=None):
parser.add_argument('--eval', type=str, help='GCS path of the eval libsvm file pattern.')
parser.add_argument('--analysis', type=str, help='GCS path of the analysis input.')
parser.add_argument('--target', type=str, help='Target column name.')
+parser.add_argument('--output-dir-uri-output-path',
+type=str,
+default='/output.txt',
+help='Local output path for the file containing the output dir URI.')

args = parser.parse_args()

logging.getLogger().setLevel(logging.INFO)
@@ -63,8 +69,8 @@ def main(argv=None):
'ml.dmlc.xgboost4j.scala.example.spark.XGBoostTrainer', spark_args)
logging.info('Job request submitted. Waiting for completion...')
_utils.wait_for_job(api, args.project, args.region, job_id)
-with open('/output.txt', 'w') as f:
-f.write(args.output)
+Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True)
+Path(args.output_dir_uri_output_path).write_text(args.output)

logging.info('Job completed.')

17 changes: 13 additions & 4 deletions components/kubeflow/dnntrainer/src/trainer/task.py
@@ -16,6 +16,7 @@
import argparse
import json
import os
+from pathlib import Path
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_model_analysis as tfma
@@ -80,6 +81,14 @@ def parse_arguments():
required=False,
help=('GCS path to a python file defining '
'"preprocess" and "get_feature_columns" functions.'))
+parser.add_argument('--exported-model-dir-uri-output-path',
+type=str,
+default='/output.txt',
+help='Local output path for the file containing the exported model directory URI.')
+parser.add_argument('--ui-metadata-output-path',
+type=str,
+default='/mlpipeline-ui-metadata.json',
+help='Local output path for the file containing the UI metadata JSON structure.')

args = parser.parse_args()
args.hidden_layer_size = [int(x.strip()) for x in args.hidden_layer_size.split(',')]
@@ -341,11 +350,11 @@ def main():
'source': args.job_dir,
}]
}
-with open('/mlpipeline-ui-metadata.json', 'w') as f:
-json.dump(metadata, f)
+Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True)
+Path(args.ui_metadata_output_path).write_text(json.dumps(metadata))

-with open('/output.txt', 'w') as f:
-f.write(args.job_dir)
+Path(args.exported_model_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True)
+Path(args.exported_model_dir_uri_output_path).write_text(args.job_dir)

if __name__ == '__main__':
main()
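
Note how argparse maps each new flag to its attribute: dashes become underscores, so --ui-metadata-output-path is read back as args.ui_metadata_output_path. A self-contained check that the defaults shown above reproduce the old hardcoded locations when a caller passes nothing:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--exported-model-dir-uri-output-path', type=str, default='/output.txt')
parser.add_argument('--ui-metadata-output-path', type=str, default='/mlpipeline-ui-metadata.json')
args = parser.parse_args([])  # empty argv: every flag falls back to its default

assert args.exported_model_dir_uri_output_path == '/output.txt'
assert args.ui_metadata_output_path == '/mlpipeline-ui-metadata.json'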
17 changes: 13 additions & 4 deletions components/kubeflow/launcher/src/launch_tf_job.py
@@ -36,6 +36,7 @@
import json
import os
import logging
+from pathlib import Path
import requests
import subprocess
import six
@@ -100,7 +101,15 @@ def main(argv=None):
default=10,
help='Time in minutes to wait for the TFJob to complete')
parser.add_argument('--output-dir', type=str)
+parser.add_argument('--output-dir-uri-output-path',
+type=str,
+default='/output.txt',
+help='Local output path for the file containing the output dir URI.')
parser.add_argument('--ui-metadata-type', type=str, default='tensorboard')
+parser.add_argument('--ui-metadata-output-path',
+type=str,
+default='/mlpipeline-ui-metadata.json',
+help='Local output path for the file containing the UI metadata JSON structure.')
import sys
all_args = sys.argv[1:]
separator_idx = all_args.index('--')
@@ -153,8 +162,8 @@ def main(argv=None):
'source': args.output_dir,
}]
}
-with open('/mlpipeline-ui-metadata.json', 'w') as f:
-json.dump(metadata, f)
+Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True)
+Path(args.ui_metadata_output_path).write_text(json.dumps(metadata))

wait_response = tf_job_client.wait_for_job(
api_client, tfjob_ns, job_name, kf_version,
@@ -190,8 +199,8 @@ def main(argv=None):
logging.info('Training success.')

tf_job_client.delete_tf_job(api_client, tfjob_ns, job_name, version=kf_version)
-with open('/output.txt', 'w') as f:
-f.write(args.output_dir)
+Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True)
+Path(args.output_dir_uri_output_path).write_text(args.output_dir)

if __name__== "__main__":
main()
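
The point of these flags is that a pipeline author can route each output file wherever the orchestrator expects to collect it. A hedged sketch of how a step might wire them up with the kfp SDK's ContainerOp; the image tag, paths, and pipeline shape are hypothetical, and file_outputs is what tells Kubeflow Pipelines which container-local files to surface as step outputs:

import kfp.dsl as dsl

@dsl.pipeline(name='tfjob-example',
              description='Hypothetical wiring of the new output-path flags.')
def tfjob_pipeline(output_dir='gs://example-bucket/tfjob-output'):
    train = dsl.ContainerOp(
        name='launch-tf-job',
        image='gcr.io/ml-pipeline/ml-pipeline-kubeflow-tf:latest',  # hypothetical tag
        arguments=[
            '--output-dir', output_dir,
            '--output-dir-uri-output-path', '/tmp/outputs/output_dir_uri.txt',
            '--ui-metadata-output-path', '/mlpipeline-ui-metadata.json',
        ],
        # Read these container-local files back as the step's named outputs.
        file_outputs={'output-dir-uri': '/tmp/outputs/output_dir_uri.txt'},
    )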
