From c3d6d6c1de6243851fc8ea4db0751f3120ea8e6a Mon Sep 17 00:00:00 2001 From: Zane Durante Date: Mon, 29 Jul 2019 15:34:06 -0700 Subject: [PATCH 1/6] Added support for mulitple outputs --- sdk/python/kfp/compiler/_component_builder.py | 50 +++++++++++++++---- 1 file changed, 41 insertions(+), 9 deletions(-) diff --git a/sdk/python/kfp/compiler/_component_builder.py b/sdk/python/kfp/compiler/_component_builder.py index 7bd9866d89d..c2b7bd197f0 100644 --- a/sdk/python/kfp/compiler/_component_builder.py +++ b/sdk/python/kfp/compiler/_component_builder.py @@ -169,7 +169,6 @@ def end(self): line_sep = '\n' return line_sep.join(self._code) + line_sep -#TODO: currently it supports single output, future support for multiple return values def _func_to_entrypoint(component_func, python_version='python3'): ''' args: @@ -182,15 +181,27 @@ def _func_to_entrypoint(component_func, python_version='python3'): annotations = fullargspec[6] input_args = fullargspec[0] inputs = {} + output = None + if 'return' in annotations.keys(): + output = annotations['return'] + output_is_named_tuple = hasattr(output, '_fields') + for key, value in annotations.items(): if key != 'return': inputs[key] = value if len(input_args) != len(inputs): raise Exception('Some input arguments do not contain annotations.') - if 'return' in annotations and annotations['return'] not in [int, float, str, bool]: + if 'return' in annotations and annotations['return'] not in [int, + float, str, bool] and not output_is_named_tuple: raise Exception('Output type not supported and supported types are [int, float, str, bool]') + if output_is_named_tuple: + types = output._field_types + for field in output._fields: #Make sure all elements are supported + if types[field] not in [int, float, str, bool]: + raise Exception('Output type not supported and supported types are [int, float, str, bool]') + # inputs is a dictionary with key of argument name and value of type class - # output is a type class, e.g. str and int. + # output is a type class, e.g. int, str, bool, float, NamedTuple. # Follow the same indentation with the component source codes. component_src = inspect.getsource(component_func) @@ -204,12 +215,18 @@ def _func_to_entrypoint(component_func, python_version='python3'): func_signature = 'def ' + new_func_name + '(' for input_arg in input_args: func_signature += input_arg + ',' - func_signature += '_output_file):' + if output_is_named_tuple: + func_signature += '_output_files' + else: + func_signature += '_output_file' + func_signature += '):' codegen.writeline(func_signature) # Call user function codegen.indent() call_component_func = 'output = ' + component_func.__name__ + '(' + if output_is_named_tuple: + call_component_func = call_component_func.replace('output', 'outputs') for input_arg in input_args: call_component_func += inputs[input_arg].__name__ + '(' + input_arg + '),' call_component_func = call_component_func.rstrip(',') @@ -218,6 +235,9 @@ def _func_to_entrypoint(component_func, python_version='python3'): # Serialize output codegen.writeline('import os') + if output_is_named_tuple: + codegen.writeline('for _output_file, output in zip(_output_files, outputs):') + codegen.indent() codegen.writeline('os.makedirs(os.path.dirname(_output_file))') codegen.writeline('with open(_output_file, "w") as data:') codegen.indent() @@ -230,7 +250,10 @@ def _func_to_entrypoint(component_func, python_version='python3'): codegen.writeline('parser = argparse.ArgumentParser(description="Parsing arguments")') for input_arg in input_args: codegen.writeline('parser.add_argument("' + input_arg + '", type=' + inputs[input_arg].__name__ + ')') - codegen.writeline('parser.add_argument("_output_file", type=str)') + if output_is_named_tuple: + codegen.writeline('parser.add_argument("_output_files", type=str, nargs=' + str(len(annotations['return']._fields)) + ')') + else: + codegen.writeline('parser.add_argument("_output_file", type=str)') codegen.writeline('args = vars(parser.parse_args())') codegen.writeline('') codegen.writeline('if __name__ == "__main__":') @@ -247,6 +270,8 @@ def _func_to_entrypoint(component_func, python_version='python3'): if python_version == 'python2': src_lines[start_line_num] = 'def ' + component_func.__name__ + '(' + ', '.join((inspect.getfullargspec(component_func).args)) + '):' dedecorated_component_src = '\n'.join(src_lines[start_line_num:]) + if output_is_named_tuple: + dedecorated_component_src = 'from typing import NamedTuple\n' + dedecorated_component_src complete_component_code = dedecorated_component_src + '\n' + wrapper_code + '\n' + codegen.end() return complete_component_code @@ -422,17 +447,24 @@ def _generate_pythonop(component_func, target_image, target_component_file=None) #TODO: Humanize the input/output names input_names = inspect.getfullargspec(component_func)[0] - output_name = 'output' + return_ann = inspect.signature(component_func).return_annotation + output_is_named_tuple = hasattr(return_ann, '_fields') + + output_names = ['output'] + if output_is_named_tuple: + output_names = return_ann._fields + component_spec = ComponentSpec( name=component_name, description=component_description, - inputs=[InputSpec(name=input_name, type='str') for input_name in input_names], #TODO: Chnage type to actual type - outputs=[OutputSpec(name=output_name)], + inputs=[InputSpec(name=input_name, type='str') for input_name in input_names], #TODO: Change type to actual type + outputs=[OutputSpec(name=output_name, type='str') for output_name in output_names], implementation=ContainerImplementation( container=ContainerSpec( image=target_image, #command=['python3', program_file], #TODO: Include the command line - args=[InputValuePlaceholder(input_name) for input_name in input_names] + [OutputPathPlaceholder(output_name)], + args=[InputValuePlaceholder(input_name) for input_name in input_names] + + [OutputPathPlaceholder(output_name) for output_name in output_names], ) ) ) From d2fb102c43d646f254907542739f1955ca3728e0 Mon Sep 17 00:00:00 2001 From: Zane Durante Date: Mon, 29 Jul 2019 15:34:43 -0700 Subject: [PATCH 2/6] Added test for multiple output --- .../tests/compiler/component_builder_test.py | 38 ++++++++++++++++++- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/sdk/python/tests/compiler/component_builder_test.py b/sdk/python/tests/compiler/component_builder_test.py index c5b31348c6a..d4c8985405f 100644 --- a/sdk/python/tests/compiler/component_builder_test.py +++ b/sdk/python/tests/compiler/component_builder_test.py @@ -25,6 +25,7 @@ from pathlib import Path import inspect from collections import OrderedDict +from typing import NamedTuple GCS_BASE = 'gs://kfp-testing/' @@ -131,6 +132,12 @@ def sample_component_func_two(a: str, b: int) -> float: def sample_component_func_three() -> float: return 1.0 +def sample_component_func_four() -> NamedTuple( + 'output', [('a', float), ('b', str)]): + from collections import namedtuple + output = namedtuple('output', ['a', 'b']) + return output(1.0, 'test') + class TestGenerator(unittest.TestCase): def test_generate_dockerfile(self): """ Test generate dockerfile """ @@ -201,7 +208,7 @@ def test_generate_requirement(self): self.assertEqual(target_payload, golden_payload) os.remove(temp_file) - def test_generate_entrypoint(self): + def test_func_to_entrypoint(self): """ Test entrypoint generation """ # prepare @@ -284,7 +291,34 @@ def wrapper_sample_component_func_three(_output_file): ''' self.assertEqual(golden, generated_codes) - def test_generate_entrypoint_python2(self): + generated_codes = _func_to_entrypoint(component_func=sample_component_func_four) + golden = '''\ +from typing import NamedTuple +def sample_component_func_four() -> NamedTuple( + 'output', [('a', float), ('b', str)]): + from collections import namedtuple + output = namedtuple('output', ['a', 'b']) + return output(1.0, 'test') + +def wrapper_sample_component_func_four(_output_files): + outputs = sample_component_func_four() + import os + for _output_file, output in zip(_output_files, outputs): + os.makedirs(os.path.dirname(_output_file)) + with open(_output_file, "w") as data: + data.write(str(output)) + +import argparse +parser = argparse.ArgumentParser(description="Parsing arguments") +parser.add_argument("_output_files", type=str, nargs=2) +args = vars(parser.parse_args()) + +if __name__ == "__main__": + wrapper_sample_component_func_four(**args) +''' + self.assertEqual(golden, generated_codes) + + def test_func_to_entrypoint_python2(self): """ Test entrypoint generation for python2""" # prepare From a1d5e43ee046e726fb34ed257545d0f5c37d77f7 Mon Sep 17 00:00:00 2001 From: Zane Durante Date: Mon, 29 Jul 2019 15:35:12 -0700 Subject: [PATCH 3/6] Adding sample for multiple outputs --- .../notebooks/Multiple outputs - basics.ipynb | 206 ++++++++++++++++++ 1 file changed, 206 insertions(+) create mode 100644 samples/notebooks/Multiple outputs - basics.ipynb diff --git a/samples/notebooks/Multiple outputs - basics.ipynb b/samples/notebooks/Multiple outputs - basics.ipynb new file mode 100644 index 00000000000..4bee406f714 --- /dev/null +++ b/samples/notebooks/Multiple outputs - basics.ipynb @@ -0,0 +1,206 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Multiple outputs example\n", + "This notebook is a simple example of how to make a component with multiple outputs using the Pipelines SDK.\n", + "\n", + "## Before running notebook:\n", + "\n", + "### Setup notebook server\n", + "This pipeline requires you to [setup a notebook server](https://www.kubeflow.org/docs/notebooks/setup/) in the Kubeflow UI. After you are setup, *upload the notebook in the Kubeflow UI* and then run it in the notebook server.\n", + "\n", + "### Create a GCS bucket\n", + "This pipeline requires a GCS bucket. If you haven't already, [create a GCS bucket](https://cloud.google.com/storage/docs/creating-buckets) to run the notebook. Make sure to create the storage bucket in the same project that you are running Kubeflow on to have the proper permissions by default. You can also create a GCS bucket by running `gsutil mb -p gs://`.\n", + "\n", + "### Upload the notebook in the Kubeflow UI\n", + "In order to run this pipeline, make sure to upload the notebook to your notebook server in the Kubeflow UI. You can clone this repo in the Jupyter notebook server by connecting to the notebook server and then selecting New > Terminal. In the terminal type `git clone https://github.com/kubeflow/pipelines.git`.\n", + "\n", + "### Install Kubeflow pipelines\n", + "Install the `kfp` package if you haven't already." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "!pip install kfp --upgrade" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Setup project info and imports" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "GCS_BUCKET = 'gs://[BUCKET-NAME]' # GCS bucket name\n", + "PROJECT_NAME = '[PROJECT-NAME]' # GCP project name\n", + "\n", + "STAGING_GCS_PATH = GCS_BUCKET + '/multiple-output-sample'\n", + "TARGET_IMAGE = 'gcr.io/%s/multi-output:latest' % PROJECT_NAME\n", + "\n", + "BASE_IMAGE = 'tensorflow/tensorflow:1.11.0-py3'\n", + "EXPERIMENT_NAME = 'Multiple Outputs Sample'" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import kfp \n", + "import kfp.dsl as dsl\n", + "from kfp import compiler\n", + "from typing import NamedTuple" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Create component\n", + "In order to create a component with multiple outputs, use `NamedTuple` with the same syntax as below." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Returns a*b and a+b\n", + "@dsl.python_component(\n", + " name='product_sum',\n", + " description='Calculates the product and the sum',\n", + " base_image=BASE_IMAGE\n", + ")\n", + "def product_sum(a: float, b: float) -> NamedTuple(\n", + " 'output', [('product', float), ('sum', float)]):\n", + " '''Returns the product and sum of two numbers'''\n", + " from collections import namedtuple\n", + " \n", + " product_sum_output = namedtuple('output', ['product', 'sum'])\n", + " return product_sum_output(a*b, a+b)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "product_sum_op = compiler.build_python_component(\n", + " component_func=product_sum,\n", + " staging_gcs_path=STAGING_GCS_PATH,\n", + " base_image=BASE_IMAGE,\n", + " target_image=TARGET_IMAGE)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Create and run pipeline\n", + "### Create pipeline\n", + "The pipeline parameters are specified in the `pipeline` function signature." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "@dsl.pipeline(\n", + " name='Multiple Outputs Pipeline',\n", + " description='Sample pipeline to showcase multiple outputs'\n", + ")\n", + "def pipeline(a=2.0, b=2.5, c=3.0):\n", + " prod_sum_task = product_sum_op(a, b)\n", + " prod_sum_task2 = product_sum_op(b, c)\n", + " prod_sum_task3 = product_sum_op(prod_sum_task.outputs['product'],\n", + " prod_sum_task2.outputs['sum'])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline_filename = 'multiple-outputs.pipelines.zip'\n", + "compiler.Compiler().compile(pipeline, pipeline_filename)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Run pipeline" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "client = kfp.Client()\n", + "experiment = client.create_experiment(EXPERIMENT_NAME)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "arguments = {\n", + " 'a': 2.0,\n", + " 'b': 2.5,\n", + " 'c': 3.0,\n", + "}\n", + "run_name = 'multiple output run'\n", + "run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename,\n", + " params=arguments)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} From 5b890733f9a4389a2091ec3fb415144aebc0a735 Mon Sep 17 00:00:00 2001 From: Zane Durante Date: Wed, 31 Jul 2019 11:29:26 -0700 Subject: [PATCH 4/6] func_signature now shorter form --- sdk/python/kfp/compiler/_component_builder.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sdk/python/kfp/compiler/_component_builder.py b/sdk/python/kfp/compiler/_component_builder.py index c2b7bd197f0..fd3f4a676e5 100644 --- a/sdk/python/kfp/compiler/_component_builder.py +++ b/sdk/python/kfp/compiler/_component_builder.py @@ -215,10 +215,7 @@ def _func_to_entrypoint(component_func, python_version='python3'): func_signature = 'def ' + new_func_name + '(' for input_arg in input_args: func_signature += input_arg + ',' - if output_is_named_tuple: - func_signature += '_output_files' - else: - func_signature += '_output_file' + func_signature = func_signature + '_output_files' if output_is_named_tuple else '_output_file' func_signature += '):' codegen.writeline(func_signature) From 48ee48fe9b9dfa7dbac0542bd65ad73e32b0c209 Mon Sep 17 00:00:00 2001 From: Zane Durante Date: Wed, 31 Jul 2019 11:47:18 -0700 Subject: [PATCH 5/6] Added parameters tag --- samples/notebooks/Multiple outputs - basics.ipynb | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/samples/notebooks/Multiple outputs - basics.ipynb b/samples/notebooks/Multiple outputs - basics.ipynb index 4bee406f714..d12137c4022 100644 --- a/samples/notebooks/Multiple outputs - basics.ipynb +++ b/samples/notebooks/Multiple outputs - basics.ipynb @@ -43,7 +43,11 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "tags": [ + "parameters" + ] + }, "outputs": [], "source": [ "GCS_BUCKET = 'gs://[BUCKET-NAME]' # GCS bucket name\n", From c38238a279c9b0ed51e669a5eab53262aa4f77b0 Mon Sep 17 00:00:00 2001 From: Zane Durante Date: Wed, 31 Jul 2019 12:34:09 -0700 Subject: [PATCH 6/6] Fixed func_signature mistake --- sdk/python/kfp/compiler/_component_builder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/kfp/compiler/_component_builder.py b/sdk/python/kfp/compiler/_component_builder.py index fd3f4a676e5..f4065dda7ea 100644 --- a/sdk/python/kfp/compiler/_component_builder.py +++ b/sdk/python/kfp/compiler/_component_builder.py @@ -215,7 +215,7 @@ def _func_to_entrypoint(component_func, python_version='python3'): func_signature = 'def ' + new_func_name + '(' for input_arg in input_args: func_signature += input_arg + ',' - func_signature = func_signature + '_output_files' if output_is_named_tuple else '_output_file' + func_signature = func_signature + '_output_files' if output_is_named_tuple else func_signature + '_output_file' func_signature += '):' codegen.writeline(func_signature)