feature(sdk): Add custom task definition support (kubeflow#526)
* add custom task template support

* fix typo

* fix lint

* update cel example to use official custom task

* add troubleshooting for example
Tomcli authored Apr 7, 2021
1 parent 8d5d584 commit 48ac3bc
Showing 10 changed files with 353 additions and 44 deletions.
@@ -9,7 +9,7 @@ data:
artifact_image: "minio/mc:RELEASE.2020-11-25T23-04-07Z"
archive_logs: "false"
track_artifacts: "true"
strip_eof: "false"
strip_eof: "true"
inject_default_script: "true"
artifact_script: |-
#!/usr/bin/env sh
1 change: 1 addition & 0 deletions samples/README.md
@@ -27,3 +27,4 @@ If you are interested more in the larger list of pipelines samples we are testin
+ [The flip-coin pipeline](/samples/flip-coin)
+ [Nested pipeline example](/samples/nested-pipeline)
+ [Pipeline with Nested loops](/samples/nested-loops)
+ [Using Tekton Custom Task on KFP](/samples/tekton-custom-task)
30 changes: 30 additions & 0 deletions samples/tekton-custom-task/README.md
@@ -0,0 +1,30 @@
# Using Tekton Custom Task on KFP

This example shows how to define any Tekton Custom Task on KFP-Tekton and handle the custom task inputs and outputs.

## Prerequisites
- Install [KFP Tekton prerequisites](/samples/README.md)

## Instructions
1. Install the Condition custom task controller, which computes runtime conditions for this example. Make sure to set up GOPATH and [ko](https://github.com/google/ko) before running the commands below.
```shell
git clone https://github.com/tektoncd/experimental/
cd experimental/cel
ko apply -f config/
```
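   Note: `ko apply -f config/` builds the controller image from source and deploys its manifests, so `KO_DOCKER_REPO` must point at a registry the cluster can pull from (a usage note added here; see the [ko](https://github.com/google/ko) documentation).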

2. Compile the custom task pipeline using the compiler inside the Python code. The kfp-tekton SDK will produce a Tekton pipeline YAML definition in the same directory, named `tekton-custom-task.yaml`.
```shell
# Compile the python code
python tekton-custom-task.py
```
Then, upload the `tekton-custom-task.yaml` file to the Kubeflow Pipelines dashboard with the Tekton backend to run this pipeline.

## Troubleshooting
Make sure the Tekton deployment has custom tasks enabled, as instructed in the KFP-Tekton deployment instructions:
```shell
kubectl patch cm feature-flags -n tekton-pipelines \
-p '{"data":{"disable-home-env-overwrite":"true","disable-working-directory-overwrite":"true", "enable-custom-tasks": "true"}}'
```
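If the patch has been applied, `kubectl get cm feature-flags -n tekton-pipelines -o yaml` should list `enable-custom-tasks: "true"` under `data` (a quick verification step suggested here, not part of the original instructions).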
54 changes: 54 additions & 0 deletions samples/tekton-custom-task/tekton-custom-task.py
@@ -0,0 +1,54 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import kfp
from kfp import dsl
from kfp_tekton.tekton import CEL_ConditionOp


def flip_coin_op():
    """Flip a coin and output heads or tails randomly."""
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \'heads\' if random.randint(0,1) == 0 '
                   'else \'tails\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )


def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )


@dsl.pipeline(
    name='Tekton custom task on Kubeflow Pipeline',
    description='Shows how to use Tekton custom task with KFP'
)
def custom_task_pipeline():
    flip = flip_coin_op()
    cel_condition = CEL_ConditionOp("'%s' == 'heads'" % flip.output)
    print_op('Condition output is %s' % cel_condition.output)


if __name__ == '__main__':
    from kfp_tekton.compiler import TektonCompiler
    TektonCompiler().compile(custom_task_pipeline, __file__.replace('.py', '.yaml'))
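As an alternative to the manual upload described in the README, the compiled pipeline could be submitted programmatically. A hedged sketch, assuming the `kfp_tekton` package's `TektonClient`, a reachable KFP-Tekton API endpoint (the host below is a placeholder), and `custom_task_pipeline` as defined above:

```python
# Hedged sketch: submit the pipeline without a manual YAML upload.
# Assumes kfp-tekton is installed and KFP_HOST is replaced with a real
# KFP-Tekton API endpoint; custom_task_pipeline is the function above.
from kfp_tekton import TektonClient

KFP_HOST = 'http://localhost:8888'  # placeholder endpoint

client = TektonClient(host=KFP_HOST)
client.create_run_from_pipeline_func(custom_task_pipeline, arguments={})
```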

22 changes: 11 additions & 11 deletions sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py
@@ -76,7 +76,7 @@ def fix_big_data_passing(workflow: dict) -> dict:
resource_templates = []
for task in tasks:
resource_params = [
param.get('name') for param in task["taskSpec"].get('params', [])
param.get('name') for param in task.get("taskSpec", {}).get('params', [])
if param.get('name') == 'action'
or param.get('name') == 'success-condition'
]
@@ -174,7 +174,7 @@ def fix_big_data_passing(workflow: dict) -> dict:
# Searching for artifact input consumers in container template inputs
for template in container_templates:
template_name = template['name']
for input_artifact in template['taskSpec'].get('artifacts', {}):
for input_artifact in template.get("taskSpec", {}).get('artifacts', {}):
raw_data = input_artifact['raw'][
'data'] # The structure must exist
# The raw data must be a single input parameter reference. Otherwise (e.g. it's a string
@@ -485,33 +485,33 @@ def big_data_passing_tasks(task: dict, pipelinerun_template: dict,
workspaces_parameter = '$(workspaces.%s.path)/%s-%s' % (
task_name, task_name, task_output.get('name'))
task['taskSpec'] = replace_big_data_placeholder(
task['taskSpec'], placeholder, workspaces_parameter)
task.get("taskSpec", {}), placeholder, workspaces_parameter)
pipelinerun_template['metadata']['annotations'] = replace_big_data_placeholder(
pipelinerun_template['metadata']['annotations'], placeholder, workspaces_parameter)

# Remove artifacts outputs from results
task['taskSpec']['results'] = [
task.get("taskSpec", {})['results'] = [
result for result in task_outputs
if (task_name, result.get('name')) not in outputs_tasks
]

# Data passing for task inputs
task_spec = task.get('taskSpec', {})
task_parmas = task_spec.get('params', [])
task_params = task_spec.get('params', [])
task_artifacts = task_spec.get('artifacts', [])
for task_parma in task_parmas:
if (task_name, task_parma.get('name')) in inputs_tasks:
for task_param in task_params:
if (task_name, task_param.get('name')) in inputs_tasks:
if not task_spec.setdefault('workspaces', []):
task_spec['workspaces'].append({"name": task_name})
# Replace the args for the inputs in the task_spec
# /tmp/inputs/text/data ---->
# $(workspaces.task_name.path)/task_parma.get('name')
# $(workspaces.task_name.path)/task_param.get('name')
placeholder = '/tmp/inputs/text/data'
for task_artifact in task_artifacts:
if task_artifact.get('name') == task_parma.get('name'):
if task_artifact.get('name') == task_param.get('name'):
placeholder = task_artifact.get('path')
workspaces_parameter = '$(workspaces.%s.path)/%s' % (
task_name, task_parma.get('name'))
task_name, task_param.get('name'))
task['taskSpec'] = replace_big_data_placeholder(
task_spec, placeholder, workspaces_parameter)
task_spec = task.get('taskSpec', {})
@@ -522,7 +522,7 @@ def big_data_passing_tasks(task: dict, pipelinerun_template: dict,
task = input_artifacts_tasks(task, task_artifact)

# Remove artifacts parameter from params
task['taskSpec']['params'] = [
task.get("taskSpec", {})['params'] = [
param for param in task_spec.get('params', [])
if (task_name, param.get('name')) not in inputs_tasks
]
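The common thread in the hunks above is swapping `task["taskSpec"]` for `task.get("taskSpec", {})`. A minimal sketch of why: a rewritten custom task carries a `taskRef` instead of an inline `taskSpec`, so direct indexing would raise `KeyError` (the task dicts below are illustrative):

```python
# Illustrative task entries: a regular task with an inline taskSpec and a
# custom task that only carries a taskRef.
tasks = [
    {'name': 'print', 'taskSpec': {'params': [{'name': 'msg'}]}},
    {'name': 'condition-cel', 'taskRef': {'apiVersion': 'cel.tekton.dev/v1alpha1',
                                          'kind': 'CEL'}},
]

for task in tasks:
    # task['taskSpec'] would raise KeyError on the custom task entry;
    # .get() degrades gracefully to an empty spec instead.
    params = task.get('taskSpec', {}).get('params', [])
    print(task['name'], [p['name'] for p in params])
# print ['msg']
# condition-cel []
```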
114 changes: 84 additions & 30 deletions sdk/python/kfp_tekton/compiler/compiler.py
@@ -42,6 +42,7 @@
from kfp_tekton.compiler.yaml_utils import dump_yaml
from kfp_tekton.compiler.pipeline_utils import TektonPipelineConf
from kfp_tekton.compiler._tekton_handler import _handle_tekton_pipeline_variables, _handle_tekton_custom_task
from kfp_tekton.tekton import TEKTON_CUSTOM_TASK_IMAGES

DEFAULT_ARTIFACT_BUCKET = env.get('DEFAULT_ARTIFACT_BUCKET', 'mlpipeline')
DEFAULT_ARTIFACT_ENDPOINT = env.get('DEFAULT_ARTIFACT_ENDPOINT', 'minio-service.kubeflow:9000')
@@ -535,17 +536,48 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli
'taskSpec': template['spec'],
}

task_ref['taskSpec']['metadata'] = task_ref['taskSpec'].get('metadata', {})
task_labels = template['metadata'].get('labels', {})
task_ref['taskSpec']['metadata']['labels'] = task_labels
task_labels['pipelines.kubeflow.org/pipelinename'] = task_labels.get('pipelines.kubeflow.org/pipelinename', '')
task_labels['pipelines.kubeflow.org/generation'] = task_labels.get('pipelines.kubeflow.org/generation', '')
cache_default = self.pipeline_labels.get('pipelines.kubeflow.org/cache_enabled', 'true')
task_labels['pipelines.kubeflow.org/cache_enabled'] = task_labels.get('pipelines.kubeflow.org/cache_enabled', cache_default)

task_annotations = template['metadata'].get('annotations', {})
task_ref['taskSpec']['metadata']['annotations'] = task_annotations
task_annotations['tekton.dev/template'] = task_annotations.get('tekton.dev/template', '')
for i in template['spec'].get('steps', []):
# TODO: change the below conditions to map with a label
# or a list of images with optimized actions
if i.get('image', '') in TEKTON_CUSTOM_TASK_IMAGES:
custom_task_args = {}
container_args = i.get('args', [])
for index, item in enumerate(container_args):
if item.startswith('--'):
custom_task_args[item[2:]] = container_args[index + 1]
non_param_keys = ['name', 'apiVersion', 'kind']
task_params = []
for key, value in custom_task_args.items():
if key not in non_param_keys:
task_params.append({'name': key, 'value': value})
task_ref = {
'name': template['metadata']['name'],
'params': task_params,
# For processing Tekton parameter mapping later on.
'orig_params': task_ref['params'],
'taskRef': {
'name': custom_task_args['name'],
'apiVersion': custom_task_args['apiVersion'],
'kind': custom_task_args['kind']
}
}
# Pop custom task artifacts since we have no control of how
# custom task controller is handling the container/task execution.
self.artifact_items.pop(template['metadata']['name'], None)
self.output_artifacts.pop(template['metadata']['name'], None)
break
if task_ref.get('taskSpec', ''):
task_ref['taskSpec']['metadata'] = task_ref['taskSpec'].get('metadata', {})
task_labels = template['metadata'].get('labels', {})
task_ref['taskSpec']['metadata']['labels'] = task_labels
task_labels['pipelines.kubeflow.org/pipelinename'] = task_labels.get('pipelines.kubeflow.org/pipelinename', '')
task_labels['pipelines.kubeflow.org/generation'] = task_labels.get('pipelines.kubeflow.org/generation', '')
cache_default = self.pipeline_labels.get('pipelines.kubeflow.org/cache_enabled', 'true')
task_labels['pipelines.kubeflow.org/cache_enabled'] = task_labels.get('pipelines.kubeflow.org/cache_enabled', cache_default)

task_annotations = template['metadata'].get('annotations', {})
task_ref['taskSpec']['metadata']['annotations'] = task_annotations
task_annotations['tekton.dev/template'] = task_annotations.get('tekton.dev/template', '')

task_refs.append(task_ref)
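The hunk above detects a custom-task marker image and converts the container's CLI-style args into a Tekton `taskRef`. A standalone sketch of that conversion, fed with the CEL task's arguments (the literals mirror the diff; the dicts are illustrative, not the compiler's actual objects):

```python
# CLI-style args as emitted by CEL_ConditionOp (see sdk/python/kfp_tekton/tekton.py).
container_args = ["--apiVersion", "cel.tekton.dev/v1alpha1",
                  "--kind", "CEL",
                  "--name", "cel_condition",
                  "--status", "'heads' == 'heads'"]

# Pair every --flag with the value that follows it.
custom_task_args = {}
for index, item in enumerate(container_args):
    if item.startswith('--'):
        custom_task_args[item[2:]] = container_args[index + 1]

# name/apiVersion/kind identify the custom task CRD; everything else
# becomes a Tekton task parameter.
non_param_keys = ['name', 'apiVersion', 'kind']
task_params = [{'name': key, 'value': value}
               for key, value in custom_task_args.items()
               if key not in non_param_keys]

task_ref = {
    'name': 'condition-cel',  # illustrative template name
    'params': task_params,
    'taskRef': {'name': custom_task_args['name'],
                'apiVersion': custom_task_args['apiVersion'],
                'kind': custom_task_args['kind']},
}
print(task_ref['params'])   # [{'name': 'status', 'value': "'heads' == 'heads'"}]
print(task_ref['taskRef'])  # {'name': 'cel_condition', 'apiVersion': 'cel.tekton.dev/v1alpha1', 'kind': 'CEL'}
```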

@@ -613,23 +645,43 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli
loop_args.extend(self.loops_pipeline[key]['loop_sub_args'])
for task in task_refs:
op = pipeline.ops.get(task['name'])
for tp in task.get('params', []):
if tp['name'] in pipeline_param_names + loop_args:
tp['value'] = '$(params.%s)' % tp['name']
else:
for pp in op.inputs:
if tp['name'] == pp.full_name:
tp['value'] = '$(tasks.%s.results.%s)' % (pp.op_name, pp.name)
# Create input artifact tracking annotation
input_annotation = self.input_artifacts.get(task['name'], [])
input_annotation.append(
{
'name': tp['name'],
'parent_task': pp.op_name
}
)
self.input_artifacts[task['name']] = input_annotation
break
# Substitute custom task parameters with the correct mapping.
if task.get('orig_params', []):
orig_params = [p['name'] for p in task.get('orig_params', [])]
for tp in task.get('params', []):
pipeline_params = re.findall('\$\(inputs.params.([^ \t\n.:,;{}]+)\)', tp.get('value', ''))
pipeline_param = pipeline_params[0] if pipeline_params else ""
if pipeline_param in orig_params:
if pipeline_param in pipeline_param_names + loop_args:
substitute_param = '$(params.%s)' % pipeline_param
tp['value'] = re.sub('\$\(inputs.params.([^ \t\n.:,;{}]+)\)', substitute_param, tp.get('value', ''))
else:
for pp in op.inputs:
if pipeline_param == pp.full_name:
substitute_param = '$(tasks.%s.results.%s)' % (pp.op_name, pp.name)
tp['value'] = re.sub('\$\(inputs.params.([^ \t\n.:,;{}]+)\)', substitute_param, tp.get('value', ''))
break
# Not necessary for Tekton execution
task.pop('orig_params', None)
else:
op = pipeline.ops.get(task['name'])
for tp in task.get('params', []):
if tp['name'] in pipeline_param_names + loop_args:
tp['value'] = '$(params.%s)' % tp['name']
else:
for pp in op.inputs:
if tp['name'] == pp.full_name:
tp['value'] = '$(tasks.%s.results.%s)' % (pp.op_name, pp.name)
# Create input artifact tracking annotation
input_annotation = self.input_artifacts.get(task['name'], [])
input_annotation.append(
{
'name': tp['name'],
'parent_task': pp.op_name
}
)
self.input_artifacts[task['name']] = input_annotation
break

# add retries params
for task in task_refs:
@@ -640,8 +692,10 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli
# add timeout params to task_refs, instead of task.
for task in task_refs:
op = pipeline.ops.get(task['name'])
if op != None and (not TEKTON_GLOBAL_DEFAULT_TIMEOUT or op.timeout):
task['timeout'] = '%ds' % op.timeout
# Custom task doesn't support timeout feature
if task.get('taskSpec', ''):
if op != None and (not TEKTON_GLOBAL_DEFAULT_TIMEOUT or op.timeout):
task['timeout'] = '%ds' % op.timeout

# handle resourceOp cases in pipeline
self._process_resourceOp(task_refs, pipeline)
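The parameter-mapping hunk above rewrites `$(inputs.params.<name>)` placeholders inside custom-task parameter values into either pipeline-level or task-result references. A minimal sketch of that substitution with illustrative names (`flip-coin` standing in for the upstream op):

```python
import re

# Pattern copied from the diff above: captures the name inside a
# $(inputs.params.<name>) placeholder.
PATTERN = r'\$\(inputs.params.([^ \t\n.:,;{}]+)\)'

value = "$(inputs.params.flip-coin-output) == 'heads'"   # illustrative value
pipeline_param_names = []                                # not a top-level pipeline param here
producer_task, result_name = 'flip-coin', 'output'       # assumed upstream op and result

pipeline_params = re.findall(PATTERN, value)
pipeline_param = pipeline_params[0] if pipeline_params else ""
if pipeline_param in pipeline_param_names:
    substitute_param = '$(params.%s)' % pipeline_param
else:
    substitute_param = '$(tasks.%s.results.%s)' % (producer_task, result_name)

print(re.sub(PATTERN, substitute_param, value))
# $(tasks.flip-coin.results.output) == 'heads'
```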
28 changes: 26 additions & 2 deletions sdk/python/kfp_tekton/tekton.py
@@ -13,12 +13,15 @@
# limitations under the License.

from typing import List

from kfp import dsl
from kfp.dsl._container_op import ContainerOp
from kfp_tekton.compiler._k8s_helper import sanitize_k8s_name


CEL_EVAL_IMAGE = "aipipeline/cel-eval:latest"
TEKTON_CUSTOM_TASK_IMAGES = [CEL_EVAL_IMAGE]


class AnySequencer(ContainerOp):
"""A containerOp that will proceed when any of the dependent containerOps completed
successfully
@@ -60,8 +63,29 @@ def after_any(any: List[dsl.ContainerOp], name: str = None):
    The function adds a new AnySequencer and connects the given op to it
    '''
    seq = AnySequencer(any, name)

    def _after_components(cop):
        cop.after(seq)
        return cop
    return _after_components


def CEL_ConditionOp(condition_statement):
    '''A ContainerOp template for CEL that is converted into a Tekton custom task
    during Tekton compilation.
    Args:
        condition_statement: CEL expression statement using string and/or pipeline params.
    '''
    ConditionOp = dsl.ContainerOp(
        name="condition-cel",
        image=CEL_EVAL_IMAGE,
        command=["sh", "-c"],
        arguments=["--apiVersion", "cel.tekton.dev/v1alpha1",
                   "--kind", "CEL",
                   "--name", "cel_condition",
                   "--status", condition_statement],
        file_outputs={'status': '/tmp/tekton'}
    )
    ConditionOp.add_pod_annotation("valid_container", "false")
    return ConditionOp
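`TEKTON_CUSTOM_TASK_IMAGES` is the marker list the compiler checks before rewriting a `ContainerOp` into a `taskRef`, so the same pattern could wrap other custom task controllers. A hedged sketch; every name below (image, API group, kind) is hypothetical, not an existing controller:

```python
from kfp import dsl
from kfp_tekton.tekton import TEKTON_CUSTOM_TASK_IMAGES

# Hypothetical marker image; registering it lets the compiler rewrite
# the ContainerOp into a taskRef instead of running the container.
EXPR_EVAL_IMAGE = "example.com/expr-eval:latest"
TEKTON_CUSTOM_TASK_IMAGES.append(EXPR_EVAL_IMAGE)


def Expr_ConditionOp(expression_statement):
    '''Hypothetical helper mirroring CEL_ConditionOp for another controller.'''
    op = dsl.ContainerOp(
        name="condition-expr",
        image=EXPR_EVAL_IMAGE,
        command=["sh", "-c"],
        arguments=["--apiVersion", "example.dev/v1alpha1",  # hypothetical CRD group
                   "--kind", "Expression",                  # hypothetical kind
                   "--name", "expr_condition",
                   "--status", expression_statement],
        file_outputs={'status': '/tmp/tekton'}
    )
    op.add_pod_annotation("valid_container", "false")
    return op
```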
7 changes: 7 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
@@ -155,6 +155,13 @@ def test_recursion_while_workflow(self):
        from .testdata.recursion_while import flipcoin
        self._test_pipeline_workflow(flipcoin, 'recursion_while.yaml')

    def test_tekton_custom_task_workflow(self):
        """
        Test Tekton custom task workflow.
        """
        from .testdata.tekton_custom_task import custom_task_pipeline
        self._test_pipeline_workflow(custom_task_pipeline, 'tekton_custom_task.yaml')

    def test_long_pipeline_name_workflow(self):
        """
        Test long pipeline name workflow.