Adding multiple outputs into sdk with sample (#1667)
* Added support for multiple outputs

* Added test for multiple output

* Adding sample for multiple outputs

* func_signature now shorter form

* Added parameters tag

* Fixed func_signature mistake
zanedurante authored and k8s-ci-robot committed Aug 1, 2019
1 parent 8866747 commit f18b7fd
Showing 3 changed files with 284 additions and 11 deletions.
210 changes: 210 additions & 0 deletions samples/notebooks/Multiple outputs - basics.ipynb
@@ -0,0 +1,210 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Multiple outputs example\n",
"This notebook is a simple example of how to make a component with multiple outputs using the Pipelines SDK.\n",
"\n",
"## Before running notebook:\n",
"\n",
"### Setup notebook server\n",
"This pipeline requires you to [setup a notebook server](https://www.kubeflow.org/docs/notebooks/setup/) in the Kubeflow UI. After you are setup, *upload the notebook in the Kubeflow UI* and then run it in the notebook server.\n",
"\n",
"### Create a GCS bucket\n",
"This pipeline requires a GCS bucket. If you haven't already, [create a GCS bucket](https://cloud.google.com/storage/docs/creating-buckets) to run the notebook. Make sure to create the storage bucket in the same project that you are running Kubeflow on to have the proper permissions by default. You can also create a GCS bucket by running `gsutil mb -p <project_name> gs://<bucket_name>`.\n",
"\n",
"### Upload the notebook in the Kubeflow UI\n",
"In order to run this pipeline, make sure to upload the notebook to your notebook server in the Kubeflow UI. You can clone this repo in the Jupyter notebook server by connecting to the notebook server and then selecting New > Terminal. In the terminal type `git clone https://github.com/kubeflow/pipelines.git`.\n",
"\n",
"### Install Kubeflow pipelines\n",
"Install the `kfp` package if you haven't already."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"!pip install kfp --upgrade"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Setup project info and imports"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"parameters"
]
},
"outputs": [],
"source": [
"GCS_BUCKET = 'gs://[BUCKET-NAME]' # GCS bucket name\n",
"PROJECT_NAME = '[PROJECT-NAME]' # GCP project name\n",
"\n",
"STAGING_GCS_PATH = GCS_BUCKET + '/multiple-output-sample'\n",
"TARGET_IMAGE = 'gcr.io/%s/multi-output:latest' % PROJECT_NAME\n",
"\n",
"BASE_IMAGE = 'tensorflow/tensorflow:1.11.0-py3'\n",
"EXPERIMENT_NAME = 'Multiple Outputs Sample'"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import kfp \n",
"import kfp.dsl as dsl\n",
"from kfp import compiler\n",
"from typing import NamedTuple"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create component\n",
"In order to create a component with multiple outputs, use `NamedTuple` with the same syntax as below."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Returns a*b and a+b\n",
"@dsl.python_component(\n",
" name='product_sum',\n",
" description='Calculates the product and the sum',\n",
" base_image=BASE_IMAGE\n",
")\n",
"def product_sum(a: float, b: float) -> NamedTuple(\n",
" 'output', [('product', float), ('sum', float)]):\n",
" '''Returns the product and sum of two numbers'''\n",
" from collections import namedtuple\n",
" \n",
" product_sum_output = namedtuple('output', ['product', 'sum'])\n",
" return product_sum_output(a*b, a+b)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"product_sum_op = compiler.build_python_component(\n",
" component_func=product_sum,\n",
" staging_gcs_path=STAGING_GCS_PATH,\n",
" base_image=BASE_IMAGE,\n",
" target_image=TARGET_IMAGE)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create and run pipeline\n",
"### Create pipeline\n",
"The pipeline parameters are specified in the `pipeline` function signature."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"@dsl.pipeline(\n",
" name='Multiple Outputs Pipeline',\n",
" description='Sample pipeline to showcase multiple outputs'\n",
")\n",
"def pipeline(a=2.0, b=2.5, c=3.0):\n",
" prod_sum_task = product_sum_op(a, b)\n",
" prod_sum_task2 = product_sum_op(b, c)\n",
" prod_sum_task3 = product_sum_op(prod_sum_task.outputs['product'],\n",
" prod_sum_task2.outputs['sum'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pipeline_filename = 'multiple-outputs.pipelines.zip'\n",
"compiler.Compiler().compile(pipeline, pipeline_filename)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Run pipeline"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client = kfp.Client()\n",
"experiment = client.create_experiment(EXPERIMENT_NAME)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"arguments = {\n",
" 'a': 2.0,\n",
" 'b': 2.5,\n",
" 'c': 3.0,\n",
"}\n",
"run_name = 'multiple output run'\n",
"run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename,\n",
" params=arguments)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
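The `NamedTuple` return annotation used by the notebook can be tried in plain Python, without `kfp` or a cluster. The following is a minimal sketch, not the SDK's own code: it reuses the `product_sum` function from the notebook minus the `@dsl.python_component` decorator, and the names `return_annotation`, `output_names` and `result` are only for illustration.

```python
from collections import namedtuple
from typing import NamedTuple


def product_sum(a: float, b: float) -> NamedTuple(
        'output', [('product', float), ('sum', float)]):
    """Return the product and the sum of two numbers."""
    product_sum_output = namedtuple('output', ['product', 'sum'])
    return product_sum_output(a * b, a + b)


# The return annotation is itself a namedtuple class, so the declared
# field names can be read without calling the function.
return_annotation = product_sum.__annotations__['return']
output_names = return_annotation._fields

result = product_sum(2.0, 2.5)
print(output_names)                # field names declared in the annotation
print(result.product, result.sum)  # values addressed by field name
```

The field names declared in the annotation are the same keys the pipeline uses when it reads `prod_sum_task.outputs['product']` and `prod_sum_task2.outputs['sum']`.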
47 changes: 38 additions & 9 deletions sdk/python/kfp/compiler/_component_builder.py
@@ -170,7 +170,6 @@ def end(self):
line_sep = '\n'
return line_sep.join(self._code) + line_sep

#TODO: currently it supports single output, future support for multiple return values
def _func_to_entrypoint(component_func, python_version='python3'):
'''
args:
@@ -183,15 +182,27 @@ def _func_to_entrypoint(component_func, python_version='python3'):
annotations = fullargspec[6]
input_args = fullargspec[0]
inputs = {}
output = None
if 'return' in annotations.keys():
output = annotations['return']
output_is_named_tuple = hasattr(output, '_fields')

for key, value in annotations.items():
if key != 'return':
inputs[key] = value
if len(input_args) != len(inputs):
raise Exception('Some input arguments do not contain annotations.')
if 'return' in annotations and annotations['return'] not in [int, float, str, bool]:
if 'return' in annotations and annotations['return'] not in [int,
float, str, bool] and not output_is_named_tuple:
raise Exception('Output type not supported and supported types are [int, float, str, bool]')
if output_is_named_tuple:
types = output._field_types
for field in output._fields: #Make sure all elements are supported
if types[field] not in [int, float, str, bool]:
raise Exception('Output type not supported and supported types are [int, float, str, bool]')

# inputs is a dictionary with key of argument name and value of type class
# output is a type class, e.g. str and int.
# output is a type class, e.g. int, str, bool, float, NamedTuple.

# Follow the same indentation with the component source codes.
component_src = inspect.getsource(component_func)
@@ -205,12 +216,15 @@ def _func_to_entrypoint(component_func, python_version='python3'):
func_signature = 'def ' + new_func_name + '('
for input_arg in input_args:
func_signature += input_arg + ','
func_signature += '_output_file):'
func_signature = func_signature + '_output_files' if output_is_named_tuple else func_signature + '_output_file'
func_signature += '):'
codegen.writeline(func_signature)

# Call user function
codegen.indent()
call_component_func = 'output = ' + component_func.__name__ + '('
if output_is_named_tuple:
call_component_func = call_component_func.replace('output', 'outputs')
for input_arg in input_args:
call_component_func += inputs[input_arg].__name__ + '(' + input_arg + '),'
call_component_func = call_component_func.rstrip(',')
@@ -219,6 +233,9 @@ def _func_to_entrypoint(component_func, python_version='python3'):

# Serialize output
codegen.writeline('import os')
if output_is_named_tuple:
codegen.writeline('for _output_file, output in zip(_output_files, outputs):')
codegen.indent()
codegen.writeline('os.makedirs(os.path.dirname(_output_file))')
codegen.writeline('with open(_output_file, "w") as data:')
codegen.indent()
@@ -231,7 +248,10 @@ def _func_to_entrypoint(component_func, python_version='python3'):
codegen.writeline('parser = argparse.ArgumentParser(description="Parsing arguments")')
for input_arg in input_args:
codegen.writeline('parser.add_argument("' + input_arg + '", type=' + inputs[input_arg].__name__ + ')')
codegen.writeline('parser.add_argument("_output_file", type=str)')
if output_is_named_tuple:
codegen.writeline('parser.add_argument("_output_files", type=str, nargs=' + str(len(annotations['return']._fields)) + ')')
else:
codegen.writeline('parser.add_argument("_output_file", type=str)')
codegen.writeline('args = vars(parser.parse_args())')
codegen.writeline('')
codegen.writeline('if __name__ == "__main__":')
@@ -248,6 +268,8 @@ def _func_to_entrypoint(component_func, python_version='python3'):
if python_version == 'python2':
src_lines[start_line_num] = 'def ' + component_func.__name__ + '(' + ', '.join((inspect.getfullargspec(component_func).args)) + '):'
dedecorated_component_src = '\n'.join(src_lines[start_line_num:])
if output_is_named_tuple:
dedecorated_component_src = 'from typing import NamedTuple\n' + dedecorated_component_src

complete_component_code = dedecorated_component_src + '\n' + wrapper_code + '\n' + codegen.end()
return complete_component_code
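For a `NamedTuple` return type, the generated entrypoint declares a single positional `_output_files` argument with `nargs` set to the number of fields. A minimal sketch of how `argparse` handles that, assuming a component with two float inputs and two outputs (the argument values and the `/tmp/out/...` paths are hypothetical, and `output_field_count` stands in for `len(annotations['return']._fields)`):

```python
import argparse

# Stand-in for len(annotations['return']._fields) in the builder.
output_field_count = 2

parser = argparse.ArgumentParser(description="Parsing arguments")
parser.add_argument("a", type=float)
parser.add_argument("b", type=float)
# nargs=N collects exactly N trailing positional values into one list.
parser.add_argument("_output_files", type=str, nargs=output_field_count)

# Hypothetical command line: two inputs followed by one path per output.
args = vars(parser.parse_args(
    ["2.0", "2.5", "/tmp/out/product", "/tmp/out/sum"]))
print(args)
```

Because the parsed value is a list, the wrapper can accept it as one `_output_files` parameter and pair it with the returned tuple by position.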
@@ -345,17 +367,24 @@ def _generate_pythonop(component_func, target_image, target_component_file=None)
#TODO: Humanize the input/output names
input_names = inspect.getfullargspec(component_func)[0]

output_name = 'output'
return_ann = inspect.signature(component_func).return_annotation
output_is_named_tuple = hasattr(return_ann, '_fields')

output_names = ['output']
if output_is_named_tuple:
output_names = return_ann._fields

component_spec = ComponentSpec(
name=component_name,
description=component_description,
inputs=[InputSpec(name=input_name, type='str') for input_name in input_names], #TODO: Chnage type to actual type
outputs=[OutputSpec(name=output_name)],
inputs=[InputSpec(name=input_name, type='str') for input_name in input_names], #TODO: Change type to actual type
outputs=[OutputSpec(name=output_name, type='str') for output_name in output_names],
implementation=ContainerImplementation(
container=ContainerSpec(
image=target_image,
#command=['python3', program_file], #TODO: Include the command line
args=[InputValuePlaceholder(input_name) for input_name in input_names] + [OutputPathPlaceholder(output_name)],
args=[InputValuePlaceholder(input_name) for input_name in input_names] +
[OutputPathPlaceholder(output_name) for output_name in output_names],
)
)
)
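The return-type check in `_func_to_entrypoint` and the output-name lookup in `_generate_pythonop` can be combined into one standalone helper. This is a sketch under assumptions rather than the SDK's code: `output_names_for` and `SUPPORTED_TYPES` are hypothetical names, and it reads the field types with `typing.get_type_hints` instead of `_field_types`, since that attribute was removed from `NamedTuple` classes in Python 3.9.

```python
from typing import NamedTuple, get_type_hints

SUPPORTED_TYPES = (int, float, str, bool)


def output_names_for(return_annotation):
    """Validate a return annotation and list the output names it implies."""
    if hasattr(return_annotation, '_fields'):
        # NamedTuple class: every field must have a supported type.
        field_types = get_type_hints(return_annotation)
        for field in return_annotation._fields:
            if field_types[field] not in SUPPORTED_TYPES:
                raise TypeError('Unsupported type for output %r' % field)
        return list(return_annotation._fields)
    if return_annotation not in SUPPORTED_TYPES:
        raise TypeError('Unsupported output type: %r' % (return_annotation,))
    # Single-valued components keep the default output name.
    return ['output']


ProductSum = NamedTuple('output', [('product', float), ('sum', float)])
single_names = output_names_for(float)
multi_names = output_names_for(ProductSum)
print(single_names, multi_names)
```

Each returned name corresponds to one `OutputSpec` and one `OutputPathPlaceholder` in the component spec above.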
38 changes: 36 additions & 2 deletions sdk/python/tests/compiler/component_builder_test.py
@@ -24,6 +24,7 @@
from pathlib import Path
import inspect
from collections import OrderedDict
from typing import NamedTuple

class TestVersionedDependency(unittest.TestCase):

@@ -128,6 +129,12 @@ def sample_component_func_two(a: str, b: int) -> float:
def sample_component_func_three() -> float:
return 1.0

def sample_component_func_four() -> NamedTuple(
'output', [('a', float), ('b', str)]):
from collections import namedtuple
output = namedtuple('output', ['a', 'b'])
return output(1.0, 'test')

class TestGenerator(unittest.TestCase):
def test_generate_dockerfile(self):
""" Test generate dockerfile """
@@ -198,7 +205,7 @@ def test_generate_requirement(self):
self.assertEqual(target_payload, golden_payload)
os.remove(temp_file)

def test_generate_entrypoint(self):
def test_func_to_entrypoint(self):
""" Test entrypoint generation """

# prepare
@@ -281,7 +288,34 @@ def wrapper_sample_component_func_three(_output_file):
'''
self.assertEqual(golden, generated_codes)

def test_generate_entrypoint_python2(self):
generated_codes = _func_to_entrypoint(component_func=sample_component_func_four)
golden = '''\
from typing import NamedTuple
def sample_component_func_four() -> NamedTuple(
'output', [('a', float), ('b', str)]):
from collections import namedtuple
output = namedtuple('output', ['a', 'b'])
return output(1.0, 'test')
def wrapper_sample_component_func_four(_output_files):
outputs = sample_component_func_four()
import os
for _output_file, output in zip(_output_files, outputs):
os.makedirs(os.path.dirname(_output_file))
with open(_output_file, "w") as data:
data.write(str(output))
import argparse
parser = argparse.ArgumentParser(description="Parsing arguments")
parser.add_argument("_output_files", type=str, nargs=2)
args = vars(parser.parse_args())
if __name__ == "__main__":
wrapper_sample_component_func_four(**args)
'''
self.assertEqual(golden, generated_codes)

def test_func_to_entrypoint_python2(self):
""" Test entrypoint generation for python2"""

# prepare
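The golden wrapper for `sample_component_func_four` writes one file per tuple field. The loop can be run on its own against a temporary directory, as in the sketch below. It mirrors the generated code with one labeled deviation: `exist_ok=True` is added to `os.makedirs` so the example can be re-run. The names `sample_func`, `wrapper_sample_func`, `base_dir`, `paths` and `written` are illustrative only.

```python
import os
import tempfile
from collections import namedtuple


def sample_func():
    output = namedtuple('output', ['a', 'b'])
    return output(1.0, 'test')


def wrapper_sample_func(_output_files):
    outputs = sample_func()
    # One file per tuple field, paired by position.
    for _output_file, output in zip(_output_files, outputs):
        # exist_ok=True is a deviation from the generated wrapper,
        # added here only so the sketch can be re-run safely.
        os.makedirs(os.path.dirname(_output_file), exist_ok=True)
        with open(_output_file, 'w') as data:
            data.write(str(output))


base_dir = tempfile.mkdtemp()
paths = [os.path.join(base_dir, 'a', 'data'),
         os.path.join(base_dir, 'b', 'data')]
wrapper_sample_func(paths)

# Read the files back to see what each output path contains.
written = []
for path in paths:
    with open(path) as f:
        written.append(f.read())
print(written)
```

Every value is serialized with `str()`, so a downstream component receives each output as text regardless of the declared field type.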