From 33ed2a5aaa03d7641b05e4e713ee437f5b56b984 Mon Sep 17 00:00:00 2001 From: eterna2 Date: Thu, 28 Feb 2019 14:58:03 +0800 Subject: [PATCH 01/12] Feature: sidecar for ContainerOp --- sdk/python/kfp/compiler/compiler.py | 30 ++++ sdk/python/kfp/dsl/__init__.py | 2 +- sdk/python/kfp/dsl/_container_op.py | 229 +++++++++++++++++----------- 3 files changed, 173 insertions(+), 88 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index ba83896043d..1ec1a281749 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -332,6 +332,36 @@ def _build_conventional_artifact(name, path): if op.num_retries: template['retryStrategy'] = {'limit': op.num_retries} + if op.sidecars: + def _sidecar_to_template(tsidecar): + index, sidecar = tsidecar + sidecar_template = { + 'name': f'{op.name}-{sidecar.name}-{index}', + 'image': sidecar.image + } + sidecar_processed_arguments = self._process_args(op.arguments, op.argument_inputs) + sidecar_processed_command = self._process_args(op.command, op.argument_inputs) + if sidecar_processed_arguments: + sidecar_template['args'] = sidecar_processed_arguments + if sidecar_processed_command: + sidecar_template['command'] = sidecar_processed_command + # Set resources. + if sidecar.resource_limits or sidecar.resource_requests: + sidecar_template['resources'] = {} + if sidecar.resource_limits: + sidecar_template['resources']['limits'] = sidecar.resource_limits + if sidecar.resource_requests: + sidecar_template['resources']['requests'] = sidecar.resource_requests + # env variables + if sidecar.env_variables: + sidecar_template['env'] = list(map(K8sHelper.convert_k8s_obj_to_json, sidecar.env_variables)) + # volume mounts + if sidecar.volume_mounts: + sidecar_template['volumeMounts'] = list(map(K8sHelper.convert_k8s_obj_to_json, sidecar.volume_mounts)) + return sidecar_template + + template['sidecars'] = list(map(_sidecar_to_template, enumerate(op.sidecars))) + return template def _group_to_template(self, group, inputs, outputs, dependencies): diff --git a/sdk/python/kfp/dsl/__init__.py b/sdk/python/kfp/dsl/__init__.py index 06fae50887b..fd4953642f7 100644 --- a/sdk/python/kfp/dsl/__init__.py +++ b/sdk/python/kfp/dsl/__init__.py @@ -15,7 +15,7 @@ from ._pipeline_param import PipelineParam from ._pipeline import Pipeline, pipeline, get_pipeline_conf -from ._container_op import ContainerOp +from ._container_op import ContainerOp, SideCar from ._ops_group import OpsGroup, ExitHandler, Condition from ._component import python_component #TODO: expose the component decorator when ready \ No newline at end of file diff --git a/sdk/python/kfp/dsl/_container_op.py b/sdk/python/kfp/dsl/_container_op.py index 4fb7d6aee59..7fecbb0babc 100644 --- a/sdk/python/kfp/dsl/_container_op.py +++ b/sdk/python/kfp/dsl/_container_op.py @@ -20,89 +20,22 @@ import re from typing import Dict -class ContainerOp(object): - """Represents an op implemented by a docker container image.""" - def __init__(self, name: str, image: str, command: str=None, arguments: str=None, - file_outputs : Dict[str, str]=None, is_exit_handler=False): - """Create a new instance of ContainerOp. +class ContainerBase(object): + """Represents a container template in a pod""" + def __init__(self, name: str, image: str): + """ Args: - name: the name of the op. It does not have to be unique within a pipeline - because the pipeline will generates a unique new name in case of conflicts. 
- image: the container image name, such as 'python:3.5-jessie' - command: the command to run in the container. - If None, uses default CMD in defined in container. - arguments: the arguments of the command. The command can include "%s" and supply - a PipelineParam as the string replacement. For example, ('echo %s' % input_param). - At container run time the argument will be 'echo param_value'. - file_outputs: Maps output labels to local file paths. At pipeline run time, - the value of a PipelineParam is saved to its corresponding local file. It's - one way for outside world to receive outputs of the container. - is_exit_handler: Whether it is used as an exit handler. + name: the name of the container specified as a DNS_LABEL. + image: the sidecar container image name, such as 'rabbitmq:latest'. """ - - if not _pipeline.Pipeline.get_default_pipeline(): - raise ValueError('Default pipeline not defined.') - - valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$' - if not re.match(valid_name_regex, name): - raise ValueError('Only letters, numbers, spaces, "_", and "-" are allowed in name. Must begin with letter: %s' % (name)) - - self.human_name = name - self.name = _pipeline.Pipeline.get_default_pipeline().add_op(self, is_exit_handler) + self.name = name self.image = image - self.command = command - self.arguments = arguments - self.is_exit_handler = is_exit_handler self.resource_limits = {} self.resource_requests = {} - self.node_selector = {} - self.volumes = [] self.volume_mounts = [] self.env_variables = [] - self.pod_annotations = {} - self.pod_labels = {} - self.num_retries = 0 - self._metadata = None - - self.argument_inputs = _extract_pipelineparams([str(arg) for arg in (command or []) + (arguments or [])]) - - self.file_outputs = file_outputs - self.dependent_op_names = [] - - self.inputs = [] - if self.argument_inputs: - self.inputs += self.argument_inputs - - self.outputs = {} - if file_outputs: - self.outputs = {name: _pipeline_param.PipelineParam(name, op_name=self.name) - for name in file_outputs.keys()} - - self.output=None - if len(self.outputs) == 1: - self.output = list(self.outputs.values())[0] - - def apply(self, mod_func): - """Applies a modifier function to self. The function should return the passed object. - This is needed to chain "extention methods" to this class. - - Example: - from kfp.gcp import use_gcp_secret - task = ( - train_op(...) 
- .set_memory_request('1GB') - .apply(use_gcp_secret('user-gcp-sa')) - .set_memory_limit('2GB') - ) - """ - return mod_func(self) - - def after(self, op): - """Specify explicit dependency on another op.""" - self.dependent_op_names.append(op.name) - return self def _validate_memory_string(self, memory_string): """Validate a given string is valid for memory request or limit.""" @@ -212,19 +145,7 @@ def set_gpu_limit(self, gpu, vendor = "nvidia"): if vendor != 'nvidia' and vendor != 'amd': raise ValueError('vendor can only be nvidia or amd.') - return self.add_resource_limit("%s.com/gpu" % vendor, gpu) - - def add_volume(self, volume): - """Add K8s volume to the container - - Args: - volume: Kubernetes volumes - For detailed spec, check volume definition - https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume.py - """ - - self.volumes.append(volume) - return self + return self.add_resource_limit("%s.com/gpu" % vendor, gpu) def add_volume_mount(self, volume_mount): """Add volume to the container @@ -250,6 +171,132 @@ def add_env_variable(self, env_variable): self.env_variables.append(env_variable) return self + +class SideCar(ContainerBase): + """Represents a sidecar container to be provisioned together with a docker container image for ContainerOp.""" + + def __init__(self, name: str, image: str, command: str=None, arguments: str=None): + """ + Args: + name: the name of the sidecar. + image: the sidecar container image name, such as 'rabbitmq:latest' + command: the command to run in the container. + If None, uses default CMD in defined in container. + arguments: the arguments of the command. The command can include "%s" and supply + a PipelineParam as the string replacement. For example, ('echo %s' % input_param). + At container run time the argument will be 'echo param_value'. + """ + # name will be generated when attached to ContainerOps + super().__init__(name, image) + self.command = command + self.arguments = arguments + + matches = [] + for arg in (command or []) + (arguments or []): + match = re.findall(r'{{pipelineparam:op=([\w\s_-]*);name=([\w\s_-]+);value=(.*?)}}', str(arg)) + matches += match + + self.argument_inputs = [_pipeline_param.PipelineParam(x[1], x[0], x[2]) + for x in list(set(matches))] + + +class ContainerOp(ContainerBase): + """Represents an op implemented by a docker container image.""" + + def __init__(self, name: str, image: str, command: str=None, arguments: str=None, + file_outputs : Dict[str, str]=None, is_exit_handler=False): + """Create a new instance of ContainerOp. + + Args: + name: the name of the op. It does not have to be unique within a pipeline + because the pipeline will generates a unique new name in case of conflicts. + image: the container image name, such as 'python:3.5-jessie' + command: the command to run in the container. + If None, uses default CMD in defined in container. + arguments: the arguments of the command. The command can include "%s" and supply + a PipelineParam as the string replacement. For example, ('echo %s' % input_param). + At container run time the argument will be 'echo param_value'. + file_outputs: Maps output labels to local file paths. At pipeline run time, + the value of a PipelineParam is saved to its corresponding local file. It's + one way for outside world to receive outputs of the container. + is_exit_handler: Whether it is used as an exit handler. 
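+
+    Example (an illustrative sketch; assumes this runs inside a pipeline function):
+
+      op = ContainerOp(name='echo', image='busybox',
+                       command=['sh', '-c'], arguments=['echo hello'])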
+ """ + if not _pipeline.Pipeline.get_default_pipeline(): + raise ValueError('Default pipeline not defined.') + + valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$' + if not re.match(valid_name_regex, name): + raise ValueError('Only letters, numbers, spaces, "_", and "-" are allowed in name. Must begin with letter: %s' % (name)) + + # human_name must exist to construct containerOps name + self.human_name = name + super().__init__(_pipeline.Pipeline.get_default_pipeline().add_op(self, is_exit_handler), image) + + self.command = command + self.arguments = arguments + self.is_exit_handler = is_exit_handler + self.node_selector = {} + self.volumes = [] + self.pod_annotations = {} + self.pod_labels = {} + self.num_retries = 0 + self.sidecars = [] + + matches = [] + for arg in (command or []) + (arguments or []): + match = re.findall(r'{{pipelineparam:op=([\w\s_-]*);name=([\w\s_-]+);value=(.*?)}}', str(arg)) + matches += match + + self.argument_inputs = [_pipeline_param.PipelineParam(x[1], x[0], x[2]) + for x in list(set(matches))] + self.file_outputs = file_outputs + self.dependent_op_names = [] + + self.inputs = [] + if self.argument_inputs: + self.inputs += self.argument_inputs + + self.outputs = {} + if file_outputs: + self.outputs = {name: _pipeline_param.PipelineParam(name, op_name=self.name) + for name in file_outputs.keys()} + + self.output=None + if len(self.outputs) == 1: + self.output = list(self.outputs.values())[0] + + def apply(self, mod_func): + """Applies a modifier function to self. The function should return the passed object. + This is needed to chain "extention methods" to this class. + + Example: + from kfp.gcp import use_gcp_secret + task = ( + train_op(...) + .set_memory_request('1GB') + .apply(use_gcp_secret('user-gcp-sa')) + .set_memory_limit('2GB') + ) + """ + return mod_func(self) + + def after(self, op): + """Specify explicit dependency on another op.""" + self.dependent_op_names.append(op.name) + return self + + def add_volume(self, volume): + """Add K8s volume to the container + + Args: + volume: Kubernetes volumes + For detailed spec, check volume definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume.py + """ + + self.volumes.append(volume) + return self + def add_node_selector_constraint(self, label_name, value): """Add a constraint for nodeSelector. Each constraint is a key-value pair label. For the container to be eligible to run on a node, the node must have each of the constraints appeared @@ -295,6 +342,14 @@ def set_retry(self, num_retries: int): self.num_retries = num_retries return self + def add_sidecar(self, sidecar: SideCar): + """Add a sidecar to the ContainerOps. + + Args: + sidecar: SideCar object. 
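+
+    Example (an illustrative sketch using the SideCar class defined above):
+
+      op = ContainerOp(name='train', image='python:3.5-jessie')
+      op.add_sidecar(SideCar(name='rabbitmq', image='rabbitmq:latest'))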
+ """ + self.sidecars.append(sidecar) + def __repr__(self): return str({self.__class__.__name__: self.__dict__}) From c99a46d62a2445e75c8774791448d33f15831092 Mon Sep 17 00:00:00 2001 From: eterna2 Date: Fri, 1 Mar 2019 19:16:03 +0800 Subject: [PATCH 02/12] replace f-string with string format for compatibility with py3.5 --- sdk/python/kfp/compiler/compiler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index 1ec1a281749..7647f14bf81 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -336,7 +336,7 @@ def _build_conventional_artifact(name, path): def _sidecar_to_template(tsidecar): index, sidecar = tsidecar sidecar_template = { - 'name': f'{op.name}-{sidecar.name}-{index}', + 'name': '{}-{}-{}'.format(op.name, sidecar.name, index), 'image': sidecar.image } sidecar_processed_arguments = self._process_args(op.arguments, op.argument_inputs) From 383a4f5a38857066470e898a9c6f69b0c672ae3c Mon Sep 17 00:00:00 2001 From: eterna2 Date: Fri, 8 Mar 2019 20:45:38 +0800 Subject: [PATCH 03/12] ContainerOp now can be updated with any k8s V1Container attributes as well as sidecars with Sidecar class. ContainerOp accepts PipelineParam in any valid k8 properties. --- sdk/python/kfp/compiler/_k8s_helper.py | 9 +- sdk/python/kfp/compiler/_op_to_template.py | 277 +++++ sdk/python/kfp/compiler/compiler.py | 171 +-- sdk/python/kfp/dsl/__init__.py | 2 +- sdk/python/kfp/dsl/_container_op.py | 1105 ++++++++++++----- sdk/python/kfp/dsl/_pipeline_param.py | 69 +- sdk/python/tests/compiler/compiler_tests.py | 5 +- .../with_sidecars_and_pipelineparams.py | 111 ++ .../with_sidecars_and_pipelineparams.yaml | 227 ++++ sdk/python/tests/dsl/container_op_tests.py | 9 +- sdk/python/tests/dsl/pipeline_param_tests.py | 25 +- 11 files changed, 1514 insertions(+), 496 deletions(-) create mode 100644 sdk/python/kfp/compiler/_op_to_template.py create mode 100644 sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.py create mode 100644 sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.yaml diff --git a/sdk/python/kfp/compiler/_k8s_helper.py b/sdk/python/kfp/compiler/_k8s_helper.py index bc54c307b0c..14ffe8024a9 100644 --- a/sdk/python/kfp/compiler/_k8s_helper.py +++ b/sdk/python/kfp/compiler/_k8s_helper.py @@ -19,6 +19,9 @@ import logging import re +from .. import dsl + + class K8sHelper(object): """ Kubernetes Helper """ @@ -159,7 +162,11 @@ def convert_k8s_obj_to_json(k8s_obj): for sub_obj in obj) elif isinstance(k8s_obj, (datetime, date)): return k8s_obj.isoformat() - + elif isinstance(k8s_obj, dsl.PipelineParam): + if isinstance(k8s_obj.value, str): + return k8s_obj.value + return '{{inputs.parameters.%s}}' % k8s_obj.full_name + if isinstance(k8s_obj, dict): obj_dict = k8s_obj else: diff --git a/sdk/python/kfp/compiler/_op_to_template.py b/sdk/python/kfp/compiler/_op_to_template.py new file mode 100644 index 00000000000..dfbb002c7da --- /dev/null +++ b/sdk/python/kfp/compiler/_op_to_template.py @@ -0,0 +1,277 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import re +from typing import Union, List, Any, Callable, TypeVar, Dict + +from ._k8s_helper import K8sHelper +from .. import dsl + +# generics +T = TypeVar('T') + + +def _get_pipelineparam(payload: str) -> List[str]: + """Get a list of `PipelineParam` from a string. + + Args: + payload {str}: string + """ + + matches = re.findall( + r'{{pipelineparam:op=([\w\s_-]*);name=([\w\s_-]+);value=(.*?)}}', + payload) + return [ + dsl.PipelineParam(x[1], x[0], x[2]) for x in list(set(matches)) + ] + + +def _sanitize_pipelineparam(param: dsl.PipelineParam, in_place=True): + """Sanitize the name and op_name of a PipelineParam. + + Args: + params: a PipelineParam to sanitize + in_place: if set, do an in-place update to PipelineParm, otherwise return a + new instance of PipelineParam. + """ + if in_place: + param.name = K8sHelper.sanitize_k8s_name(param.name) + param.op_name = K8sHelper.sanitize_k8s_name( + param.op_name) if param.op_name else param.op_name + return param + + return dsl.PipelineParam( + K8sHelper.sanitize_k8s_name(param.name), + K8sHelper.sanitize_k8s_name(param.op_name), param.value) + + +def _sanitize_pipelineparams( + params: Union[dsl.PipelineParam, List[dsl.PipelineParam]], + in_place=True): + """Sanitize the name(s) of a PipelineParam (or a list of PipelineParam) and + return a list of sanitized PipelineParam. + + Args: + params: a PipelineParam or a list of PipelineParam to sanitize + in_place: if set, do an in-place update to the PipelineParm, otherwise return + new instances of PipelineParam. + """ + params = params if isinstance(params, list) else [params] + return [_sanitize_pipelineparam(param, in_place) for param in params] + + +def _process_obj(obj: Any, map_to_tmpl_var: dict): + """Recursively sanitize and replace any PipelineParam (instances and serialized strings) + in the object with the corresponding template variables + (i.e. '{{inputs.parameters.}}'). 
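+
+    Example (illustrative): a serialized string such as
+    '{{pipelineparam:op=train op;name=msg;value=}}' is replaced with the
+    template variable '{{inputs.parameters.train-op-msg}}' after sanitization.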
+ + Args: + obj: any obj that may have PipelineParam + map_to_tmpl_var: a dict that maps an unsanitized pipeline + params signature into a template var + """ + # serialized str might be unsanitized + if isinstance(obj, str): + # get signature + pipeline_params = _get_pipelineparam(obj) + if not pipeline_params: + return obj + # replace all unsanitized signature with template var + for param in pipeline_params: + pattern = str(param) + sanitized = str(_sanitize_pipelineparam(param)) + obj = re.sub(pattern, map_to_tmpl_var[sanitized], obj) + + # list + if isinstance(obj, list): + return [_process_obj(item, map_to_tmpl_var) for item in obj] + + # tuple + if isinstance(obj, tuple): + return tuple((_process_obj(item, map_to_tmpl_var) for item in obj)) + + # dict + if isinstance(obj, dict): + return { + key: _process_obj(value, map_to_tmpl_var) + for key, value in obj.items() + } + + # pipelineparam + if isinstance(obj, dsl.PipelineParam): + # if not found in unsanitized map, then likely to be sanitized + return map_to_tmpl_var.get( + str(obj), '{{inputs.parameters.%s}}' % obj.full_name) + + # k8s_obj + if hasattr(obj, 'swagger_types') and isinstance(obj.swagger_types, dict): + # process everything inside recursively + for key in obj.swagger_types.keys(): + setattr(obj, key, _process_obj(getattr(obj, key), map_to_tmpl_var)) + # return json representation of the k8s obj + return K8sHelper.convert_k8s_obj_to_json(obj) + + # do nothing + return obj + + +def _process_container_ops(op: dsl.ContainerOp): + """Recursively go through the attrs listed in `attrs_with_pipelineparams` + and sanitize and replace pipeline params with template var string. + + Returns a processed `ContainerOp`. + + NOTE this is an in-place update to `ContainerOp`'s attributes (i.e. other than + `file_outputs`, and `outputs`, all `PipelineParam` are replaced with the + corresponding template variable strings). + + Args: + op {dsl.ContainerOp}: class that inherits from ds.ContainerOp + + Returns: + dsl.ContainerOp + """ + + # tmp map: unsanitized rpr -> sanitized PipelineParam + # in-place sanitize of all PipelineParam (except outputs and file_outputs) + _map = { + str(param): _sanitize_pipelineparam(param, in_place=True) + for param in op.inputs + } + + # map: unsanitized pipeline param rpr -> template var string + # used to replace unsanitized pipeline param strings with the corresponding + # template var strings + map_to_tmpl_var = { + key: '{{inputs.parameters.%s}}' % param.full_name + for key, param in _map.items() + } + + # process all attr with pipelineParams except inputs and outputs parameters + for key in op.attrs_with_pipelineparams: + setattr(op, key, _process_obj(getattr(op, key), map_to_tmpl_var)) + + return op + + +def _parameters_to_json(params: List[dsl.PipelineParam]): + """Converts a list of PipelineParam into an argo `parameter` JSON obj.""" + _to_json = (lambda param: dict(name=param.full_name, value=param.value) + if param.value else dict(name=param.full_name)) + params = [_to_json(param) for param in params] + # Sort to make the results deterministic. + params.sort(key=lambda x: x['name']) + return params + + +# TODO: artifacts? 
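+# Example (illustrative): for a PipelineParam named 'msg' produced by op
+# 'train-op', _inputs_to_json([param]) returns
+# {'parameters': [{'name': 'train-op-msg'}]}; a 'value' key is included only
+# when the param carries a literal value.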
+def _inputs_to_json(inputs_params: List[dsl.PipelineParam], _artifacts=None): + """Converts a list of PipelineParam into an argo `inputs` JSON obj.""" + parameters = _parameters_to_json(inputs_params) + return {'parameters': parameters } if parameters else None + + +def _outputs_to_json(outputs: Dict[str, dsl.PipelineParam], + file_outputs: Dict[str, str], + output_artifacts: List[dict]): + """Creates an argo `outputs` JSON obj.""" + output_parameters = [] + for param in outputs.values(): + output_parameters.append({ + 'name': param.full_name, + 'valueFrom': { + 'path': file_outputs[param.name] + } + }) + output_parameters.sort(key=lambda x: x['name']) + ret = {} + if output_parameters: + ret['parameters'] = output_parameters + if output_artifacts: + ret['artifacts'] = output_artifacts + + return ret + + +def _build_conventional_artifact(name): + return { + 'name': name, + 'path': '/' + name + '.json', + 's3': { + # TODO: parameterize namespace for minio service + 'endpoint': 'minio-service.kubeflow:9000', + 'bucket': 'mlpipeline', + 'key': 'runs/{{workflow.uid}}/{{pod.name}}/' + name + '.tgz', + 'insecure': True, + 'accessKeySecret': { + 'name': 'mlpipeline-minio-artifact', + 'key': 'accesskey', + }, + 'secretKeySecret': { + 'name': 'mlpipeline-minio-artifact', + 'key': 'secretkey' + } + }, + } + +# TODO: generate argo python classes from swagger and use convert_k8s_obj_to_json?? +def _op_to_template(op: dsl.ContainerOp): + """Generate template given an operator inherited from dsl.ContainerOp.""" + + # NOTE in-place update to ContainerOp + # replace all PipelineParams (except in `file_outputs`, `outputs`, `inputs`) + # with template var strings + processed_op = _process_container_ops(op) + + # default output artifacts + output_artifacts = [ + _build_conventional_artifact(name) + for name in ['mlpipeline-ui-metadata', 'mlpipeline-metrics'] + ] + + # workflow template + template = { + 'name': op.name, + 'container': K8sHelper.convert_k8s_obj_to_json(op.container) + } + + # inputs + inputs = _inputs_to_json(op.inputs) + if inputs: + template['inputs'] = inputs + + # outputs + template['outputs'] = _outputs_to_json(op.outputs, op.file_outputs, + output_artifacts) + + # node selector + if processed_op.node_selector: + template['nodeSelector'] = processed_op.node_selector + + # metadata + if processed_op.pod_annotations or processed_op.pod_labels: + template['metadata'] = {} + if processed_op.pod_annotations: + template['metadata']['annotations'] = processed_op.pod_annotations + if processed_op.pod_labels: + template['metadata']['labels'] = processed_op.pod_labels + # retries + if processed_op.num_retries: + template['retryStrategy'] = {'limit': processed_op.num_retries} + + # sidecars + if processed_op.sidecars: + template['sidecars'] = processed_op.sidecars + + return template diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index 7647f14bf81..600cb433bce 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -21,6 +21,8 @@ from .. import dsl from ._k8s_helper import K8sHelper +from ._op_to_template import _op_to_template + class Compiler(object): """DSL Compiler. @@ -117,7 +119,6 @@ def _get_inputs_outputs(self, pipeline, root_group, op_groups): # it as input for its parent groups. 
if param.value: continue - full_name = self._pipelineparam_full_name(param) if param.op_name: upstream_op = pipeline.ops[param.op_name] @@ -211,158 +212,8 @@ def _resolve_value_or_reference(self, value_or_reference, potential_references): else: return str(value_or_reference) - def _process_args(self, raw_args, argument_inputs): - if not raw_args: - return [] - processed_args = list(map(str, raw_args)) - for i, _ in enumerate(processed_args): - # unsanitized_argument_inputs stores a dict: string of sanitized param -> string of unsanitized param - matches = [] - match = re.findall(r'{{pipelineparam:op=([\w\s\_-]*);name=([\w\s\_-]+);value=(.*?)}}', str(processed_args[i])) - matches += match - unsanitized_argument_inputs = {} - for x in list(set(matches)): - sanitized_str = str(dsl.PipelineParam(K8sHelper.sanitize_k8s_name(x[1]), K8sHelper.sanitize_k8s_name(x[0]), x[2])) - unsanitized_argument_inputs[sanitized_str] = str(dsl.PipelineParam(x[1], x[0], x[2])) - - if argument_inputs: - for param in argument_inputs: - if str(param) in unsanitized_argument_inputs: - full_name = self._pipelineparam_full_name(param) - processed_args[i] = re.sub(unsanitized_argument_inputs[str(param)], '{{inputs.parameters.%s}}' % full_name, - processed_args[i]) - return processed_args - def _op_to_template(self, op): - """Generate template given an operator inherited from dsl.ContainerOp.""" - - def _build_conventional_artifact(name, path): - return { - 'name': name, - 'path': path, - 's3': { - # TODO: parameterize namespace for minio service - 'endpoint': 'minio-service.kubeflow:9000', - 'bucket': 'mlpipeline', - 'key': 'runs/{{workflow.uid}}/{{pod.name}}/' + name + '.tgz', - 'insecure': True, - 'accessKeySecret': { - 'name': 'mlpipeline-minio-artifact', - 'key': 'accesskey', - }, - 'secretKeySecret': { - 'name': 'mlpipeline-minio-artifact', - 'key': 'secretkey' - } - }, - } - - processed_arguments = self._process_args(op.arguments, op.argument_inputs) - processed_command = self._process_args(op.command, op.argument_inputs) - - input_parameters = [] - for param in op.inputs: - one_parameter = {'name': self._pipelineparam_full_name(param)} - if param.value: - one_parameter['value'] = str(param.value) - input_parameters.append(one_parameter) - # Sort to make the results deterministic. - input_parameters.sort(key=lambda x: x['name']) - - output_parameters = [] - for param in op.outputs.values(): - output_parameters.append({ - 'name': self._pipelineparam_full_name(param), - 'valueFrom': {'path': op.file_outputs[param.name]} - }) - output_parameters.sort(key=lambda x: x['name']) - - template = { - 'name': op.name, - 'container': { - 'image': op.image, - } - } - if processed_arguments: - template['container']['args'] = processed_arguments - if processed_command: - template['container']['command'] = processed_command - if input_parameters: - template['inputs'] = {'parameters': input_parameters} - - template['outputs'] = {} - if output_parameters: - template['outputs'] = {'parameters': output_parameters} - - # Generate artifact for metadata output - # The motivation of appending the minio info in the yaml - # is to specify a unique path for the metadata. - # TODO: after argo addresses the issue that configures a unique path - # for the artifact output when default artifact repository is configured, - # this part needs to be updated to use the default artifact repository. 
- output_artifacts = [] - output_artifacts.append(_build_conventional_artifact('mlpipeline-ui-metadata', '/mlpipeline-ui-metadata.json')) - output_artifacts.append(_build_conventional_artifact('mlpipeline-metrics', '/mlpipeline-metrics.json')) - template['outputs']['artifacts'] = output_artifacts - - # Set resources. - if op.resource_limits or op.resource_requests: - template['container']['resources'] = {} - if op.resource_limits: - template['container']['resources']['limits'] = op.resource_limits - if op.resource_requests: - template['container']['resources']['requests'] = op.resource_requests - - # Set nodeSelector. - if op.node_selector: - template['nodeSelector'] = op.node_selector - - if op.env_variables: - template['container']['env'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.env_variables)) - if op.volume_mounts: - template['container']['volumeMounts'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.volume_mounts)) - - if op.pod_annotations or op.pod_labels: - template['metadata'] = {} - if op.pod_annotations: - template['metadata']['annotations'] = op.pod_annotations - if op.pod_labels: - template['metadata']['labels'] = op.pod_labels - - if op.num_retries: - template['retryStrategy'] = {'limit': op.num_retries} - - if op.sidecars: - def _sidecar_to_template(tsidecar): - index, sidecar = tsidecar - sidecar_template = { - 'name': '{}-{}-{}'.format(op.name, sidecar.name, index), - 'image': sidecar.image - } - sidecar_processed_arguments = self._process_args(op.arguments, op.argument_inputs) - sidecar_processed_command = self._process_args(op.command, op.argument_inputs) - if sidecar_processed_arguments: - sidecar_template['args'] = sidecar_processed_arguments - if sidecar_processed_command: - sidecar_template['command'] = sidecar_processed_command - # Set resources. - if sidecar.resource_limits or sidecar.resource_requests: - sidecar_template['resources'] = {} - if sidecar.resource_limits: - sidecar_template['resources']['limits'] = sidecar.resource_limits - if sidecar.resource_requests: - sidecar_template['resources']['requests'] = sidecar.resource_requests - # env variables - if sidecar.env_variables: - sidecar_template['env'] = list(map(K8sHelper.convert_k8s_obj_to_json, sidecar.env_variables)) - # volume mounts - if sidecar.volume_mounts: - sidecar_template['volumeMounts'] = list(map(K8sHelper.convert_k8s_obj_to_json, sidecar.volume_mounts)) - return sidecar_template - - template['sidecars'] = list(map(_sidecar_to_template, enumerate(op.sidecars))) - - return template + return _op_to_template(op) def _group_to_template(self, group, inputs, outputs, dependencies): """Generate template given an OpsGroup. @@ -378,15 +229,14 @@ def _group_to_template(self, group, inputs, outputs, dependencies): template['inputs'] = { 'parameters': template_inputs } - # Generate outputs section. 
if outputs.get(group.name, None): template_outputs = [] - for param_name, depentent_name in outputs[group.name]: + for param_name, dependent_name in outputs[group.name]: template_outputs.append({ 'name': param_name, 'valueFrom': { - 'parameter': '{{tasks.%s.outputs.parameters.%s}}' % (depentent_name, param_name) + 'parameter': '{{tasks.%s.outputs.parameters.%s}}' % (dependent_name, param_name) } }) template_outputs.sort(key=lambda x: x['name']) @@ -451,7 +301,7 @@ def _create_templates(self, pipeline): templates.append(self._group_to_template(g, inputs, outputs, dependencies)) for op in pipeline.ops.values(): - templates.append(self._op_to_template(op)) + templates.append(_op_to_template(op)) return templates def _create_volumes(self, pipeline): @@ -463,9 +313,9 @@ def _create_volumes(self, pipeline): for v in op.volumes: # Remove volume duplicates which have the same name #TODO: check for duplicity based on the serialized volumes instead of just name. - if v.name not in volume_name_set: - volume_name_set.add(v.name) - volumes.append(K8sHelper.convert_k8s_obj_to_json(v)) + if v['name'] not in volume_name_set: + volume_name_set.add(v['name']) + volumes.append(v) volumes.sort(key=lambda x: x['name']) return volumes @@ -570,7 +420,7 @@ def _compile(self, pipeline_func): for op in p.ops.values(): sanitized_name = K8sHelper.sanitize_k8s_name(op.name) op.name = sanitized_name - for param in op.inputs + op.argument_inputs: + for param in op.inputs: # + op.argument_inputs: param.name = K8sHelper.sanitize_k8s_name(param.name) if param.op_name: param.op_name = K8sHelper.sanitize_k8s_name(param.op_name) @@ -590,7 +440,6 @@ def _compile(self, pipeline_func): op.file_outputs = sanitized_file_outputs sanitized_ops[sanitized_name] = op p.ops = sanitized_ops - workflow = self._create_pipeline_workflow(args_list_with_defaults, p) return workflow diff --git a/sdk/python/kfp/dsl/__init__.py b/sdk/python/kfp/dsl/__init__.py index fd4953642f7..413faab5c69 100644 --- a/sdk/python/kfp/dsl/__init__.py +++ b/sdk/python/kfp/dsl/__init__.py @@ -15,7 +15,7 @@ from ._pipeline_param import PipelineParam from ._pipeline import Pipeline, pipeline, get_pipeline_conf -from ._container_op import ContainerOp, SideCar +from ._container_op import ContainerOp, Sidecar from ._ops_group import OpsGroup, ExitHandler, Condition from ._component import python_component #TODO: expose the component decorator when ready \ No newline at end of file diff --git a/sdk/python/kfp/dsl/_container_op.py b/sdk/python/kfp/dsl/_container_op.py index 7fecbb0babc..88c59eca8b1 100644 --- a/sdk/python/kfp/dsl/_container_op.py +++ b/sdk/python/kfp/dsl/_container_op.py @@ -12,352 +12,819 @@ # See the License for the specific language governing permissions and # limitations under the License. +import re +from typing import Any, Dict, List, TypeVar, Union +from kubernetes.client.models import ( + V1Container, V1EnvVar, V1EnvFromSource, V1SecurityContext, V1Probe, + V1ResourceRequirements, V1VolumeDevice, V1VolumeMount, V1ContainerPort, + V1Lifecycle) from . import _pipeline from . import _pipeline_param -from ._pipeline_param import _extract_pipelineparams -from ._metadata import ComponentMeta -import re -from typing import Dict - - -class ContainerBase(object): - """Represents a container template in a pod""" - - def __init__(self, name: str, image: str): - """ - Args: - name: the name of the container specified as a DNS_LABEL. - image: the sidecar container image name, such as 'rabbitmq:latest'. 
- """ - self.name = name - self.image = image - self.resource_limits = {} - self.resource_requests = {} - self.volume_mounts = [] - self.env_variables = [] - - def _validate_memory_string(self, memory_string): - """Validate a given string is valid for memory request or limit.""" - - if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$', memory_string) is None: - raise ValueError('Invalid memory string. Should be an integer, or integer followed ' - 'by one of "E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki"') - - def _validate_cpu_string(self, cpu_string): - "Validate a given string is valid for cpu request or limit." - - if re.match(r'^[0-9]+m$', cpu_string) is not None: - return - - try: - float(cpu_string) - except ValueError: - raise ValueError('Invalid cpu string. Should be float or integer, or integer followed ' - 'by "m".') - - def _validate_positive_number(self, str_value, param_name): - "Validate a given string is in positive integer format." - - try: - int_value = int(str_value) - except ValueError: - raise ValueError('Invalid {}. Should be integer.'.format(param_name)) - - if int_value <= 0: - raise ValueError('{} must be positive integer.'.format(param_name)) - - def add_resource_limit(self, resource_name, value): - """Add the resource limit of the container. - - Args: - resource_name: The name of the resource. It can be cpu, memory, etc. - value: The string value of the limit. - """ - - self.resource_limits[resource_name] = value - return self - - def add_resource_request(self, resource_name, value): - """Add the resource request of the container. - - Args: - resource_name: The name of the resource. It can be cpu, memory, etc. - value: The string value of the request. - """ - - self.resource_requests[resource_name] = value - return self - - def set_memory_request(self, memory): - """Set memory request (minimum) for this operator. - - Args: - memory: a string which can be a number or a number followed by one of - "E", "P", "T", "G", "M", "K". - """ - - self._validate_memory_string(memory) - return self.add_resource_request("memory", memory) - - def set_memory_limit(self, memory): - """Set memory limit (maximum) for this operator. - - Args: - memory: a string which can be a number or a number followed by one of - "E", "P", "T", "G", "M", "K". - """ - self._validate_memory_string(memory) - return self.add_resource_limit("memory", memory) - - def set_cpu_request(self, cpu): - """Set cpu request (minimum) for this operator. - Args: - cpu: A string which can be a number or a number followed by "m", which means 1/1000. - """ +# generics +T = TypeVar('T') +# type alias: either a string or a list of string +StringOrStringList = Union[str, List[str]] - self._validate_cpu_string(cpu) - return self.add_resource_request("cpu", cpu) - def set_cpu_limit(self, cpu): - """Set cpu limit (maximum) for this operator. +# util functions +def as_list(value: Any, if_none: Union[None, List] = None) -> List: + """Convert any value except None to a list if not already a list.""" + if value is None: + return if_none + return value if isinstance(value, list) else [value] - Args: - cpu: A string which can be a number or a number followed by "m", which means 1/1000. 
- """ - self._validate_cpu_string(cpu) - return self.add_resource_limit("cpu", cpu) +def create_and_append(current_list: Union[List[T], None], item: T) -> List[T]: + """Create a list (if needed) and appends an item to it.""" + current_list = current_list or [] + current_list.append(item) + return current_list - def set_gpu_limit(self, gpu, vendor = "nvidia"): - """Set gpu limit for the operator. This function add '.com/gpu' into resource limit. - Note that there is no need to add GPU request. GPUs are only supposed to be specified in - the limits section. See https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus/. - Args: - gpu: A string which must be a positive number. - vendor: Optional. A string which is the vendor of the requested gpu. The supported values - are: 'nvidia' (default), and 'amd'. - """ - - self._validate_positive_number(gpu, 'gpu') - if vendor != 'nvidia' and vendor != 'amd': - raise ValueError('vendor can only be nvidia or amd.') - - return self.add_resource_limit("%s.com/gpu" % vendor, gpu) - - def add_volume_mount(self, volume_mount): - """Add volume to the container - - Args: - volume_mount: Kubernetes volume mount - For detailed spec, check volume mount definition - https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume_mount.py - """ - - self.volume_mounts.append(volume_mount) - return self - - def add_env_variable(self, env_variable): - """Add environment variable to the container. - - Args: - env_variable: Kubernetes environment variable - For detailed spec, check environment variable definition - https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_env_var.py +class Container(V1Container): """ + A wrapper over k8s container definition object (io.k8s.api.core.v1.Container), + which is used to represent the `container` property in argo's workflow + template (io.argoproj.workflow.v1alpha1.Template). + + `Container` class also comes with utility functions to set and update the + the various properties for a k8s container definition. + + NOTE: A notable difference is that `name` is not required and will not be + processed for `Container` (in contrast to `V1Container` where `name` is a + required property). + + See: + - https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_container.py + - https://github.com/argoproj/argo/blob/master/api/openapi-spec/swagger.json + + + Example: - self.env_variables.append(env_variable) - return self + from kfp.dsl import ContainerOp + from kubernetes.client.models import V1EnvVar + + # creates a operation + op = ContainerOp(name='bash-ops', i + mage='busybox:latest', + command=['echo'], + arguments=['$MSG']) -class SideCar(ContainerBase): - """Represents a sidecar container to be provisioned together with a docker container image for ContainerOp.""" + # returns a `Container` object from `ContainerOp` + # and add an environment variable to `Container` + op.container.add_env_variable(V1EnvVar(name='MSG', value='hello world')) - def __init__(self, name: str, image: str, command: str=None, arguments: str=None): """ - Args: - name: the name of the sidecar. - image: the sidecar container image name, such as 'rabbitmq:latest' - command: the command to run in the container. - If None, uses default CMD in defined in container. - arguments: the arguments of the command. The command can include "%s" and supply - a PipelineParam as the string replacement. For example, ('echo %s' % input_param). 
- At container run time the argument will be 'echo param_value'. """ - # name will be generated when attached to ContainerOps - super().__init__(name, image) - self.command = command - self.arguments = arguments - - matches = [] - for arg in (command or []) + (arguments or []): - match = re.findall(r'{{pipelineparam:op=([\w\s_-]*);name=([\w\s_-]+);value=(.*?)}}', str(arg)) - matches += match - - self.argument_inputs = [_pipeline_param.PipelineParam(x[1], x[0], x[2]) - for x in list(set(matches))] - - -class ContainerOp(ContainerBase): - """Represents an op implemented by a docker container image.""" - - def __init__(self, name: str, image: str, command: str=None, arguments: str=None, - file_outputs : Dict[str, str]=None, is_exit_handler=False): - """Create a new instance of ContainerOp. - - Args: - name: the name of the op. It does not have to be unique within a pipeline - because the pipeline will generates a unique new name in case of conflicts. - image: the container image name, such as 'python:3.5-jessie' - command: the command to run in the container. - If None, uses default CMD in defined in container. - arguments: the arguments of the command. The command can include "%s" and supply - a PipelineParam as the string replacement. For example, ('echo %s' % input_param). - At container run time the argument will be 'echo param_value'. - file_outputs: Maps output labels to local file paths. At pipeline run time, - the value of a PipelineParam is saved to its corresponding local file. It's - one way for outside world to receive outputs of the container. - is_exit_handler: Whether it is used as an exit handler. + Attributes: + swagger_types (dict): The key is attribute name + and the value is attribute type. + attribute_map (dict): The key is attribute name + and the value is json key in definition. """ - if not _pipeline.Pipeline.get_default_pipeline(): - raise ValueError('Default pipeline not defined.') - - valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$' - if not re.match(valid_name_regex, name): - raise ValueError('Only letters, numbers, spaces, "_", and "-" are allowed in name. Must begin with letter: %s' % (name)) - - # human_name must exist to construct containerOps name - self.human_name = name - super().__init__(_pipeline.Pipeline.get_default_pipeline().add_op(self, is_exit_handler), image) - - self.command = command - self.arguments = arguments - self.is_exit_handler = is_exit_handler - self.node_selector = {} - self.volumes = [] - self.pod_annotations = {} - self.pod_labels = {} - self.num_retries = 0 - self.sidecars = [] - - matches = [] - for arg in (command or []) + (arguments or []): - match = re.findall(r'{{pipelineparam:op=([\w\s_-]*);name=([\w\s_-]+);value=(.*?)}}', str(arg)) - matches += match - - self.argument_inputs = [_pipeline_param.PipelineParam(x[1], x[0], x[2]) - for x in list(set(matches))] - self.file_outputs = file_outputs - self.dependent_op_names = [] - - self.inputs = [] - if self.argument_inputs: - self.inputs += self.argument_inputs - - self.outputs = {} - if file_outputs: - self.outputs = {name: _pipeline_param.PipelineParam(name, op_name=self.name) - for name in file_outputs.keys()} - - self.output=None - if len(self.outputs) == 1: - self.output = list(self.outputs.values())[0] - - def apply(self, mod_func): - """Applies a modifier function to self. The function should return the passed object. - This is needed to chain "extention methods" to this class. 
+ # remove `name` from swagger_types so `name` is not generated in the JSON + swagger_types = { + key: value + for key, value in V1Container.swagger_types.items() if key != 'name' + } + attribute_map = { + key: value + for key, value in V1Container.attribute_map.items() if key != 'name' + } + + def __init__(self, image: str, command: List[str], args: List[str], + **kwargs): + """Creates a new instance of `Container`. + + Args: + image {str}: image to use, e.g. busybox:latest + command {List[str]}: entrypoint array. Not executed within a shell. + args {List[str]}: arguments to entrypoint. + **kwargs: keyword arguments for `V1Container` + """ + # set name to '' if name is not provided + # k8s container MUST have a name + # argo workflow template does not need a name for container def + if not kwargs.get('name'): + kwargs['name'] = '' + + super(Container, self).__init__( + image=image, command=command, args=args, **kwargs) + + def _validate_memory_string(self, memory_string): + """Validate a given string is valid for memory request or limit.""" + + if isinstance(memory_string, _pipeline_param.PipelineParam): + if memory_string.value: + memory_string = memory_string.value + else: + return + + if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$', + memory_string) is None: + raise ValueError( + 'Invalid memory string. Should be an integer, or integer followed ' + 'by one of "E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki"') + + def _validate_cpu_string(self, cpu_string): + "Validate a given string is valid for cpu request or limit." + + if isinstance(cpu_string, _pipeline_param.PipelineParam): + if cpu_string.value: + cpu_string = cpu_string.value + else: + return + + if re.match(r'^[0-9]+m$', cpu_string) is not None: + return + + try: + float(cpu_string) + except ValueError: + raise ValueError( + 'Invalid cpu string. Should be float or integer, or integer followed ' + 'by "m".') + + def _validate_positive_number(self, str_value, param_name): + "Validate a given string is in positive integer format." + + if isinstance(str_value, _pipeline_param.PipelineParam): + if str_value.value: + str_value = str_value.value + else: + return + + try: + int_value = int(str_value) + except ValueError: + raise ValueError( + 'Invalid {}. Should be integer.'.format(param_name)) + + if int_value <= 0: + raise ValueError('{} must be positive integer.'.format(param_name)) + + def add_resource_limit(self, resource_name, value): + """Add the resource limit of the container. + + Args: + resource_name: The name of the resource. It can be cpu, memory, etc. + value: The string value of the limit. + """ + + self.resources = self.resources or V1ResourceRequirements() + self.resources.limits = self.resources.limits or {} + self.resources.limits.update({resource_name: value}) + return self + + def add_resource_request(self, resource_name, value): + """Add the resource request of the container. + + Args: + resource_name: The name of the resource. It can be cpu, memory, etc. + value: The string value of the request. + """ + + self.resources = self.resources or V1ResourceRequirements() + self.resources.requests = self.resources.requests or {} + self.resources.requests.update({resource_name: value}) + return self + + def set_memory_request(self, memory): + """Set memory request (minimum) for this operator. + + Args: + memory: a string which can be a number or a number followed by one of + "E", "P", "T", "G", "M", "K". 
+ """ + + self._validate_memory_string(memory) + return self.add_resource_request("memory", memory) + + def set_memory_limit(self, memory): + """Set memory limit (maximum) for this operator. + + Args: + memory: a string which can be a number or a number followed by one of + "E", "P", "T", "G", "M", "K". + """ + self._validate_memory_string(memory) + return self.add_resource_limit("memory", memory) + + def set_cpu_request(self, cpu): + """Set cpu request (minimum) for this operator. + + Args: + cpu: A string which can be a number or a number followed by "m", which means 1/1000. + """ + + self._validate_cpu_string(cpu) + return self.add_resource_request("cpu", cpu) + + def set_cpu_limit(self, cpu): + """Set cpu limit (maximum) for this operator. + + Args: + cpu: A string which can be a number or a number followed by "m", which means 1/1000. + """ + + self._validate_cpu_string(cpu) + return self.add_resource_limit("cpu", cpu) + + def set_gpu_limit(self, gpu, vendor="nvidia"): + """Set gpu limit for the operator. This function add '.com/gpu' into resource limit. + Note that there is no need to add GPU request. GPUs are only supposed to be specified in + the limits section. See https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus/. + + Args: + gpu: A string which must be a positive number. + vendor: Optional. A string which is the vendor of the requested gpu. The supported values + are: 'nvidia' (default), and 'amd'. + """ + + self._validate_positive_number(gpu, 'gpu') + if vendor != 'nvidia' and vendor != 'amd': + raise ValueError('vendor can only be nvidia or amd.') + + return self.add_resource_limit("%s.com/gpu" % vendor, gpu) + + def add_volume_mount(self, volume_mount): + """Add volume to the container + + Args: + volume_mount: Kubernetes volume mount + For detailed spec, check volume mount definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume_mount.py + """ + + if not isinstance(volume_mount, V1VolumeMount): + raise ValueError( + 'invalid argument. Must be of instance `V1VolumeMount`.') - Example: - from kfp.gcp import use_gcp_secret - task = ( - train_op(...) - .set_memory_request('1GB') - .apply(use_gcp_secret('user-gcp-sa')) - .set_memory_limit('2GB') - ) + self.volume_mounts = create_and_append(self.volume_mounts, + volume_mount) + return self + + def add_volume_devices(self, volume_device): + """ + Add a block device to be used by the container. + + Args: + volume_device: Kubernetes volume device + For detailed spec, volume device definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume_device.py + """ + + if not isinstance(volume_device, V1VolumeDevice): + raise ValueError( + 'invalid argument. Must be of instance `V1VolumeDevice`.') + + self.volume_devices = create_and_append(self.volume_devices, + volume_device) + return self + + def add_env_variable(self, env_variable): + """Add environment variable to the container. + + Args: + env_variable: Kubernetes environment variable + For detailed spec, check environment variable definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_env_var.py + """ + + if not isinstance(env_variable, V1EnvVar): + raise ValueError( + 'invalid argument. Must be of instance `V1EnvVar`.') + + self.env = create_and_append(self.env, env_variable) + return self + + def add_env_from(self, env_from): + """Add a source to populate environment variables int the container. 
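+
+        Example (illustrative; 'my-config' is a hypothetical ConfigMap name)::
+
+            from kubernetes.client.models import V1EnvFromSource, V1ConfigMapEnvSource
+            op.container.add_env_from(V1EnvFromSource(
+                config_map_ref=V1ConfigMapEnvSource(name='my-config')))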
+ + Args: + env_from: Kubernetes environment from source + For detailed spec, check environment from source definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_env_var_source.py + """ + + if not isinstance(env_from, V1EnvFromSource): + raise ValueError( + 'invalid argument. Must be of instance `V1EnvFromSource`.') + + self.env_from = create_and_append(self.env_from, env_from) + return self + + def set_image_pull_policy(self, image_pull_policy): + """Set image pull policy for the container. + + Args: + image_pull_policy: One of `Always`, `Never`, `IfNotPresent`. + """ + if image_pull_policy not in ['Always', 'Never', 'IfNotPresent']: + raise ValueError( + 'Invalid imagePullPolicy. Must be one of `Always`, `Never`, `IfNotPresent`.' + ) + + self.image_pull_policy = image_pull_policy + return self + + def add_port(self, container_port): + """Add a container port to the container. + + Args: + container_port: Kubernetes container port + For detailed spec, check container port definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_container_port.py + """ + + if not isinstance(container_port, V1ContainerPort): + raise ValueError( + 'invalid argument. Must be of instance `V1ContainerPort`.') + + self.ports = create_and_append(self.ports, container_port) + return self + + def set_security_context(self, security_context): + """Set security configuration to be applied on the container. + + Args: + security_context: Kubernetes security context + For detailed spec, check security context definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_security_context.py + """ + + if not isinstance(security_context, V1SecurityContext): + raise ValueError( + 'invalid argument. Must be of instance `V1SecurityContext`.') + + self.security_context = security_context + return self + + def set_stdin(self, stdin=True): + """ + Whether this container should allocate a buffer for stdin in the container + runtime. If this is not set, reads from stdin in the container will always + result in EOF. + + Args: + stdin: boolean flag + """ + + self.stdin = stdin + return self + + def set_stdin_once(self, stdin_once=True): + """ + Whether the container runtime should close the stdin channel after it has + been opened by a single attach. When stdin is true the stdin stream will + remain open across multiple attach sessions. If stdinOnce is set to true, + stdin is opened on container start, is empty until the first client attaches + to stdin, and then remains open and accepts data until the client + disconnects, at which time stdin is closed and remains closed until the + container is restarted. If this flag is false, a container processes that + reads from stdin will never receive an EOF. + + Args: + stdin_once: boolean flag + """ + + self.stdin_once = stdin_once + return self + + def set_termination_message_path(self, termination_message_path): + """ + Path at which the file to which the container's termination message will be + written is mounted into the container's filesystem. Message written is + intended to be brief final status, such as an assertion failure message. + Will be truncated by the node if greater than 4096 bytes. The total message + length across all containers will be limited to 12kb. 
+ + Args: + termination_message_path: path for the termination message + """ + self.termination_message_path = termination_message_path + return self + + def set_termination_message_policy(self, termination_message_policy): + """ + Indicate how the termination message should be populated. File will use the + contents of terminationMessagePath to populate the container status message + on both success and failure. FallbackToLogsOnError will use the last chunk + of container log output if the termination message file is empty and the + container exited with an error. The log output is limited to 2048 bytes or + 80 lines, whichever is smaller. + + Args: + termination_message_policy: `File` or `FallbackToLogsOnError` + """ + if termination_message_policy not in ['File', 'FallbackToLogsOnError']: + raise ValueError( + 'terminationMessagePolicy must be `File` or `FallbackToLogsOnError`' + ) + self.termination_message_policy = termination_message_policy + return self + + def set_tty(self, tty=True): + """ + Whether this container should allocate a TTY for itself, also requires + 'stdin' to be true. + + Args: + tty: boolean flag + """ + + self.tty = tty + return self + + def set_readiness_probe(self, readiness_probe): + """ + Set a readiness probe for the container. + + Args: + readiness_probe: Kubernetes readiness probe + For detailed spec, check probe definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_probe.py + """ + + if not isinstance(readiness_probe, V1Probe): + raise ValueError( + 'invalid argument. Must be of instance `V1Probe`.') + + self.readiness_probe = readiness_probe + return self + + def set_liveness_probe(self, liveness_probe): + """ + Set a liveness probe for the container. + + Args: + liveness_probe: Kubernetes liveness probe + For detailed spec, check probe definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_probe.py + """ + + if not isinstance(liveness_probe, V1Probe): + raise ValueError( + 'invalid argument. Must be of instance `V1Probe`.') + + self.liveness_probe = liveness_probe + return self + + def set_lifecycle(self, lifecycle): + """ + Setup a lifecycle config for the container. + + Args: + lifecycle: Kubernetes lifecycle + For detailed spec, lifecycle definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_lifecycle.py + """ + + if not isinstance(lifecycle, V1Lifecycle): + raise ValueError( + 'invalid argument. Must be of instance `V1Lifecycle`.') + + self.lifecycle = lifecycle + return self + + +class Sidecar(Container): """ - return mod_func(self) + Represents an argo workflow sidecar (io.argoproj.workflow.v1alpha1.Sidecar) + to be used in `sidecars` property in argo's workflow template + (io.argoproj.workflow.v1alpha1.Template). - def after(self, op): - """Specify explicit dependency on another op.""" - self.dependent_op_names.append(op.name) - return self + `Sidecar` inherits from `Container` class with an addition of `mirror_volume_mounts` + attribute (`mirrorVolumeMounts` property). 
- def add_volume(self, volume): - """Add K8s volume to the container + See https://github.com/argoproj/argo/blob/master/api/openapi-spec/swagger.json - Args: - volume: Kubernetes volumes - For detailed spec, check volume definition - https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume.py - """ - - self.volumes.append(volume) - return self - - def add_node_selector_constraint(self, label_name, value): - """Add a constraint for nodeSelector. Each constraint is a key-value pair label. For the - container to be eligible to run on a node, the node must have each of the constraints appeared - as labels. - - Args: - label_name: The name of the constraint label. - value: The value of the constraint label. - """ - - self.node_selector[label_name] = value - return self - - def add_pod_annotation(self, name: str, value: str): - """Adds a pod's metadata annotation. + Example - Args: - name: The name of the annotation. - value: The value of the annotation. - """ + from kfp.dsl import ContainerOp, Sidecar - self.pod_annotations[name] = value - return self - def add_pod_label(self, name: str, value: str): - """Adds a pod's metadata label. + # creates a `ContainerOp` and adds a redis `Sidecar` + op = (ContainerOp(name='foo-op', image='busybox:latest') + .add_sidecar( + Sidecar(name='redis', image='redis:alpine'))) - Args: - name: The name of the label. - value: The value of the label. """ - - self.pod_labels[name] = value - return self - - def set_retry(self, num_retries: int): - """Sets the number of times the task is retried until it's declared failed. - - Args: - num_retries: Number of times to retry on failures. """ - - self.num_retries = num_retries - return self - - def add_sidecar(self, sidecar: SideCar): - """Add a sidecar to the ContainerOps. - - Args: - sidecar: SideCar object. + Attributes: + swagger_types (dict): The key is attribute name + and the value is attribute type. + attribute_map (dict): The key is attribute name + and the value is json key in definition. """ - self.sidecars.append(sidecar) - - def __repr__(self): - return str({self.__class__.__name__: self.__dict__}) - - def _set_metadata(self, metadata): - '''_set_metadata passes the containerop the metadata information - Args: - metadata (ComponentMeta): component metadata - ''' - if not isinstance(metadata, ComponentMeta): - raise ValueError('_set_medata is expecting ComponentMeta.') - self._metadata = metadata \ No newline at end of file + # adds `mirror_volume_mounts` to `Sidecar` swagger definition + # NOTE inherits definition from `V1Container` rather than `Container` + # because `Container` has no `name` property. + swagger_types = dict( + **V1Container.swagger_types, mirror_volume_mounts='bool') + + attribute_map = dict( + **V1Container.attribute_map, mirror_volume_mounts='mirrorVolumeMounts') + + def __init__(self, + name: str, + image: str, + command: StringOrStringList = None, + args: StringOrStringList = None, + mirror_volume_mounts: bool = None, + **kwargs): + """Creates a new instance of `Sidecar`. + + Args: + name {str}: unique name for the sidecar container + image {str}: image to use for the sidecar container, e.g. redis:alpine + command {StringOrStringList}: entrypoint array. Not executed within a shell. + args {StringOrStringList}: arguments to the entrypoint. + mirror_volume_mounts {bool}: MirrorVolumeMounts will mount the same + volumes specified in the main container to the sidecar (including artifacts), + at the same mountPaths. 
This enables the dind daemon to partially see the same
+ filesystem as the main container in order to use features such as docker
+ volume binding.
+ **kwargs: keyword arguments available for `Container`
+
+ """
+ super().__init__(
+ name=name,
+ image=image,
+ command=as_list(command),
+ args=as_list(args),
+ **kwargs)
+
+ self.mirror_volume_mounts = mirror_volume_mounts
+
+ def set_mirror_volume_mounts(self, mirror_volume_mounts=True):
+ """
+ Setting mirrorVolumeMounts to true will mount the same volumes specified
+ in the main container to the sidecar (including artifacts), at the same
+ mountPaths. This enables the dind daemon to partially see the same filesystem
+ as the main container in order to use features such as docker volume
+ binding.
+
+ Args:
+ mirror_volume_mounts: boolean flag
+ """
+
+ self.mirror_volume_mounts = mirror_volume_mounts
+ return self
+
+ @property
+ def inputs(self):
+ """A list of PipelineParam found in the Sidecar object."""
+ return _pipeline_param.extract_pipelineparams_from_any(self)
+
+
+class ContainerOp(object):
+ """Represents an op implemented by a container image."""
+
+ # list of attributes that might have pipeline params - used to generate
+ # the input parameters during compilation.
+ # Excludes `file_outputs` and `outputs` as they are handled separately
+ # in the compilation process to generate the DAGs and task io parameters.
+ attrs_with_pipelineparams = [
+ '_container', 'node_selector', 'volumes', 'pod_annotations',
+ 'pod_labels', 'num_retries', 'sidecars'
+ ]
+
+ def __init__(self,
+ name: str,
+ image: str,
+ command: StringOrStringList = None,
+ arguments: StringOrStringList = None,
+ file_outputs: Dict[str, str] = None,
+ is_exit_handler=False):
+ """Create a new instance of ContainerOp.
+
+ Args:
+ name: the name of the op. It does not have to be unique within a pipeline
+ because the pipeline will generate a unique new name in case of conflicts.
+ image: the container image name, such as 'python:3.5-jessie'
+ command: the command to run in the container.
+ If None, uses the default CMD defined in the container.
+ arguments: the arguments of the command. The command can include "%s" and supply
+ a PipelineParam as the string replacement. For example, ('echo %s' % input_param).
+ At container run time the argument will be 'echo param_value'.
+ file_outputs: Maps output labels to local file paths. At pipeline run time,
+ the value of a PipelineParam is saved to its corresponding local file. It's
+ one way for the outside world to receive outputs of the container.
+ is_exit_handler: Whether it is used as an exit handler.
+ """
+ if not _pipeline.Pipeline.get_default_pipeline():
+ raise ValueError('Default pipeline not defined.')
+
+ valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$'
+ if not re.match(valid_name_regex, name):
+ raise ValueError(
+ 'Only letters, numbers, spaces, "_", and "-" are allowed in name.
Must begin with letter: %s'
+ % (name))
+
+ # convert to list if not a list
+ command = as_list(command)
+ arguments = as_list(arguments)
+
+ # human_name must exist to construct the containerOp name
+ self.human_name = name
+ # actual name for argo workflow
+ self.name = _pipeline.Pipeline.get_default_pipeline().add_op(
+ self, is_exit_handler)
+
+ # `container` prop in `io.argoproj.workflow.v1alpha1.Template`
+ self._container = Container(
+ image=image, args=arguments, command=command)
+ # for chaining, and returning back to `ContainerOp` when updating `Container`
+ # i.e.
+ # op._container.set_image_pull_policy('Always').parent == op # True
+ setattr(self._container, "parent", self)
+
+ # NOTE for backward compatibility (remove in future?)
+ # proxy old ContainerOp callables to Container
+
+ # attributes to NOT proxy
+ ignore_set = frozenset(['to_dict', 'to_str'])
+
+ # decorator func to proxy a method in `Container` into `ContainerOp`
+ def _proxy(proxy_attr):
+ """Decorator func to proxy to ContainerOp.container"""
+
+ def _decorated(*args, **kwargs):
+ ret = getattr(self._container, proxy_attr)(*args, **kwargs)
+ if ret == self._container:
+ return self
+ return ret
+
+ return _decorated
+
+ # iter thru container and attach a proxy func to the container method
+ for attr_to_proxy in dir(self._container):
+ func = getattr(self._container, attr_to_proxy)
+ # only proxy public callables
+ if hasattr(func,
+ '__call__') and (attr_to_proxy[0] != '_'
+ ) and attr_to_proxy not in ignore_set:
+ setattr(self, attr_to_proxy, _proxy(attr_to_proxy))
+
+ # TODO: proper k8s definitions so that `convert_k8s_obj_to_json` can be used?
+ # `io.argoproj.workflow.v1alpha1.Template` properties
+ self.node_selector = {}
+ self.volumes = []
+ self.pod_annotations = {}
+ self.pod_labels = {}
+ self.num_retries = 0
+ self.sidecars = []
+
+ # attributes specific to `ContainerOp`
+ self._inputs = []
+ self.file_outputs = file_outputs
+ self.dependent_op_names = []
+ self.is_exit_handler = is_exit_handler
+
+ self.outputs = {}
+ if file_outputs:
+ self.outputs = {
+ name: _pipeline_param.PipelineParam(name, op_name=self.name)
+ for name in file_outputs.keys()
+ }
+
+ self.output = None
+ if len(self.outputs) == 1:
+ self.output = list(self.outputs.values())[0]
+
+ @property
+ def inputs(self):
+ """List of PipelineParams that will be converted into input parameters
+ (io.argoproj.workflow.v1alpha1.Inputs) for the argo workflow.
+ """
+ # iterate thru and extract all the `PipelineParam` in `ContainerOp` when
+ # called the 1st time (because there are in-place updates to `PipelineParam`
+ # during compilation - remove in-place updates for easier debugging?)
+ if not self._inputs:
+ self._inputs = []
+ # TODO replace with proper k8s obj?
+ for key in self.attrs_with_pipelineparams:
+ self._inputs += [
+ param for param in _pipeline_param.
+ extract_pipelineparams_from_any(getattr(self, key))
+ ]
+ return self._inputs
+
+ @inputs.setter
+ def inputs(self, value):
+ # to support in-place updates
+ self._inputs = value
+
+ @property
+ def container(self):
+ """`Container` object that represents the `container` property in
+ `io.argoproj.workflow.v1alpha1.Template`. Can be used to update the
+ container configurations.
+
+ Example:
+ import kfp.dsl as dsl
+ from kubernetes.client.models import V1EnvVar
+
+ @dsl.pipeline(name='example_pipeline')
+ def immediate_value_pipeline():
+ op1 = (dsl.ContainerOp(name='example', image='nginx:alpine')
+ .container
+ .add_env_variable(V1EnvVar(name='HOST', value='foo.bar'))
+ .add_env_variable(V1EnvVar(name='PORT', value='80'))
+ .parent # return the parent `ContainerOp`
+ )
+ """
+ return self._container
+
+ def apply(self, mod_func):
+ """Applies a modifier function to self. The function should return the passed object.
+ This is needed to chain "extension methods" to this class.
+
+ Example:
+ from kfp.gcp import use_gcp_secret
+ task = (
+ train_op(...)
+ .set_memory_request('1GB')
+ .apply(use_gcp_secret('user-gcp-sa'))
+ .set_memory_limit('2GB')
+ )
+ """
+ return mod_func(self)
+
+ def after(self, op):
+ """Specify explicit dependency on another op."""
+ self.dependent_op_names.append(op.name)
+ return self
+
+ def add_volume(self, volume):
+ """Add K8s volume to the container.
+
+ Args:
+ volume: Kubernetes volume
+ For detailed spec, check volume definition
+ https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume.py
+ """
+ self.volumes.append(volume)
+ return self
+
+ def add_node_selector_constraint(self, label_name, value):
+ """Add a constraint for nodeSelector. Each constraint is a key-value pair label. For the
+ container to be eligible to run on a node, the node must have each of the constraints as
+ labels.
+
+ Args:
+ label_name: The name of the constraint label.
+ value: The value of the constraint label.
+ """
+
+ self.node_selector[label_name] = value
+ return self
+
+ def add_pod_annotation(self, name: str, value: str):
+ """Adds a pod's metadata annotation.
+
+ Args:
+ name: The name of the annotation.
+ value: The value of the annotation.
+ """
+
+ self.pod_annotations[name] = value
+ return self
+
+ def add_pod_label(self, name: str, value: str):
+ """Adds a pod's metadata label.
+
+ Args:
+ name: The name of the label.
+ value: The value of the label.
+ """
+
+ self.pod_labels[name] = value
+ return self
+
+ def set_retry(self, num_retries: int):
+ """Sets the number of times the task is retried until it's declared failed.
+
+ Args:
+ num_retries: Number of times to retry on failures.
+ """
+
+ self.num_retries = num_retries
+ return self
+
+ def add_sidecar(self, sidecar: Sidecar):
+ """Add a sidecar to the ContainerOp.
+
+ Args:
+ sidecar: Sidecar object.
+ """
+
+ self.sidecars.append(sidecar)
+ return self
+
+ def __repr__(self):
+ return str({self.__class__.__name__: self.__dict__})
diff --git a/sdk/python/kfp/dsl/_pipeline_param.py b/sdk/python/kfp/dsl/_pipeline_param.py
index a2c0d67714c..27cfd831d57 100644
--- a/sdk/python/kfp/dsl/_pipeline_param.py
+++ b/sdk/python/kfp/dsl/_pipeline_param.py
@@ -14,14 +14,13 @@
 import re
-from collections import namedtuple
-
-
+from collections import namedtuple, Iterable
+from typing import List
 # TODO: Move this to a separate class
 # For now, this identifies a condition with only "==" operator supported.
 ConditionOperator = namedtuple('ConditionOperator', 'operator operand1 operand2')
-def _extract_pipelineparams(payloads: str or list[str]):
+def _extract_pipelineparams(payloads: str or List[str]):
 """_extract_pipelineparam extract a list of PipelineParam instances from the payload string.
 Note: this function removes all duplicate matches.
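Taken together, the chained `ContainerOp`/`Sidecar` API above is meant to compose as in the following minimal sketch (illustrative only, not part of the patch; the pipeline, op, and image names are made up, and it assumes this patch is applied):

    import kfp.dsl as dsl
    import kfp.compiler as compiler
    from kubernetes.client.models import V1EnvVar

    @dsl.pipeline(name='sidecar-demo', description='illustrative example')
    def sidecar_demo():
        # sidecar that serves on localhost:5678 inside the same pod
        echo = dsl.Sidecar(name='echo', image='hashicorp/http-echo',
                           args=['-text="hello world"'])
        op = (dsl.ContainerOp(name='download',
                              image='busybox',
                              command=['sh', '-c'],
                              arguments=['wget localhost:5678 -O /tmp/out.txt'],
                              file_outputs={'out': '/tmp/out.txt'})
              .add_sidecar(echo))  # attach the sidecar to the op
        # k8s-level settings live on `op.container`; `parent` chains back to the op
        op.container.add_env_variable(V1EnvVar(name='MSG', value='hello'))

    compiler.Compiler().compile(sidecar_demo, 'sidecar_demo.tar.gz')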
@@ -37,6 +36,54 @@ def _extract_pipelineparams(payloads: str or list[str]):
 matches += re.findall(r'{{pipelineparam:op=([\w\s_-]*);name=([\w\s_-]+);value=(.*?)}}', payload)
 return [PipelineParam(x[1], x[0], x[2]) for x in list(set(matches))]
+
+def extract_pipelineparams_from_any(payload) -> List['PipelineParam']:
+ """Recursively extract PipelineParam instances or serialized strings from any object or list of objects.
+
+ Args:
+ payload (str or k8s_obj or list[str or k8s_obj]): a string or a list
+ of strings that contain serialized pipelineparams, or a k8s definition
+ object.
+ Returns:
+ List[PipelineParam]
+ """
+ if not payload:
+ return []
+
+ # PipelineParam
+ if isinstance(payload, PipelineParam):
+ return [payload]
+
+ # str
+ if isinstance(payload, str):
+ return _extract_pipelineparams(payload)
+
+ # list or tuple or iterable
+ if isinstance(payload, list) or isinstance(payload, tuple) or isinstance(payload, Iterable):
+ pipeline_params = []
+ for item in payload:
+ pipeline_params += extract_pipelineparams_from_any(item)
+ return list(set(pipeline_params))
+
+ # dict
+ if isinstance(payload, dict):
+ pipeline_params = []
+ for item in payload.values():
+ pipeline_params += extract_pipelineparams_from_any(item)
+ return list(set(pipeline_params))
+
+ # k8s object
+ if hasattr(payload, 'swagger_types') and isinstance(payload.swagger_types, dict):
+ pipeline_params = []
+ for key in payload.swagger_types.keys():
+ pipeline_params += extract_pipelineparams_from_any(getattr(payload, key))
+
+ return list(set(pipeline_params))
+
+ # return empty list
+ return []
+
+
 class PipelineParam(object):
 """Representing a future value that is passed between pipeline components.
@@ -66,9 +113,19 @@ def __init__(self, name: str, op_name: str=None, value: str=None):
 if op_name and value:
 raise ValueError('op_name and value cannot be both set.')
- self.op_name = op_name
 self.name = name
- self.value = value
+ # ensure value is None even if empty string or empty list
+ # so that serialization and deserialization remain consistent
+ # (i.e. None => '' => None)
+ self.op_name = op_name if op_name else None
+ self.value = value if value else None
+
+ @property
+ def full_name(self):
+ """Unique name in the argo yaml for the PipelineParam"""
+ if self.op_name:
+ return self.op_name + '-' + self.name
+ return self.name
+
 def __str__(self):
 """String representation.
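Because a `PipelineParam` serializes to a `{{pipelineparam:op=...;name=...;value=...}}` placeholder via `str()`, the new `extract_pipelineparams_from_any` can recover params from strings nested anywhere inside a k8s object. The following is a hedged sketch of the intended behavior (not part of the patch; the param and container names are made up):

    from kubernetes.client.models import V1Container, V1EnvVar
    from kfp.dsl import PipelineParam
    from kfp.dsl._pipeline_param import extract_pipelineparams_from_any

    tag = PipelineParam(name='tag')                       # pipeline-level param
    word = PipelineParam(name='word', op_name='counter')  # an op's output param

    container = V1Container(
        name='main',
        image='busybox:%s' % tag,   # param serialized into a plain string
        env=[V1EnvVar(name='WORD', value=str(word))])

    params = extract_pipelineparams_from_any(container)
    assert sorted(p.name for p in params) == ['tag', 'word']

    # `full_name` prefixes the producing op, giving a unique name in the argo yaml
    assert word.full_name == 'counter-word'
    assert tag.full_name == 'tag'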
diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index 97218e0fd2a..de7e2534457 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -203,7 +203,6 @@ def _test_py_compile(self, file_base_name): with open(os.path.join(test_data_dir, file_base_name + '.yaml'), 'r') as f: golden = yaml.load(f) compiled = self._get_yaml_from_tar(target_tar) - self.maxDiff = None self.assertEqual(golden, compiled) finally: @@ -213,6 +212,10 @@ def test_py_compile_basic(self): """Test basic sequential pipeline.""" self._test_py_compile('basic') + def test_py_compile_with_sidecars_and_pipelineparams(self): + """Test pipeline with_sidecars and pipelineparams in any k8s attributes.""" + self._test_py_compile('with_sidecars_and_pipelineparams') + def test_py_compile_condition(self): """Test a pipeline with conditions.""" self._test_py_compile('coin') diff --git a/sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.py b/sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.py new file mode 100644 index 00000000000..0c06ae22a26 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.py @@ -0,0 +1,111 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import kfp.dsl as dsl +import kfp.gcp as gcp + + +class GetFrequentWordOp(dsl.ContainerOp): + """A get frequent word class representing a component in ML Pipelines. + + The class provides a nice interface to users by hiding details such as container, + command, arguments. + """ + + def __init__(self, name, message): + """Args: + name: An identifier of the step which needs to be unique within a pipeline. + message: a dsl.PipelineParam object representing an input message. + """ + super(GetFrequentWordOp, self).__init__( + name=name, + image='python:3.5-jessie', + command=['sh', '-c'], + arguments=[ + 'python -c "from collections import Counter; ' + 'words = Counter(\'%s\'.split()); print(max(words, key=words.get))" ' + '| tee /tmp/message.txt' % message + ], + file_outputs={'word': '/tmp/message.txt'}) + + +class SaveMessageOp(dsl.ContainerOp): + """A class representing a component in ML Pipelines. + + It saves a message to a given output_path. + """ + + def __init__(self, name, message, output_path): + """Args: + name: An identifier of the step which needs to be unique within a pipeline. + message: a dsl.PipelineParam object representing the message to be saved. + output_path: a dsl.PipelineParam object representing the GCS path for output file. + """ + super(SaveMessageOp, self).__init__( + name=name, + image='google/cloud-sdk', + command=['sh', '-c'], + arguments=[ + 'echo %s | tee /tmp/results.txt | gsutil cp /tmp/results.txt %s' + % (message, output_path) + ]) + + +class ExitHandlerOp(dsl.ContainerOp): + """A class representing a component in ML Pipelines. 
+ """ + + def __init__(self, name): + super(ExitHandlerOp, self).__init__( + name=name, + image='python:3.5-jessie', + command=['sh', '-c'], + arguments=['echo exit!']) + + +@dsl.pipeline( + name='Save Most Frequent', + description='Get Most Frequent Word and Save to GCS') +def save_most_frequent_word(message: str, + outputpath: str, + cpu_limit='0.5', + gpu_limit='2', + mirror=True, + sidecar_image_tag='latest'): + """A pipeline function describing the orchestration of the workflow.""" + + exit_op = ExitHandlerOp('exiting') + with dsl.ExitHandler(exit_op): + counter = GetFrequentWordOp(name='get-Frequent', message=message) + counter.set_memory_request('200M') + + saver = SaveMessageOp( + name='save', message=counter.output, output_path=outputpath) + + # update k8s container definition with pipeline params + (saver.container + .set_cpu_limit(cpu_limit) + .set_gpu_limit(gpu_limit) + .set_image_pull_policy("Always")) + + saver.add_node_selector_constraint('cloud.google.com/gke-accelerator', + 'nvidia-tesla-k80') + saver.apply( + gcp.use_tpu(tpu_cores=8, tpu_resource='v2', tf_version='1.12')) + + # add sidecar with str-based PipelineParam, as well as PipelineParam to k8s properties + saver.add_sidecar( + dsl.Sidecar('busybox', + 'busybox:%s' % sidecar_image_tag) + .set_mirror_volume_mounts(mirror)) diff --git a/sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.yaml b/sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.yaml new file mode 100644 index 00000000000..34c85b2ace4 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.yaml @@ -0,0 +1,227 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+apiVersion: argoproj.io/v1alpha1 +spec: + entrypoint: save-most-frequent + onExit: exiting + arguments: + parameters: + - name: message + - name: outputpath + - name: cpu-limit + value: '0.5' + - name: gpu-limit + value: '2' + - name: mirror + value: 'True' + - name: sidecar-image-tag + value: latest + serviceAccountName: pipeline-runner + templates: + - name: exit-handler-1 + inputs: + parameters: + - name: message + - name: mirror + - name: outputpath + - name: sidecar-image-tag + dag: + tasks: + - arguments: + parameters: + - name: message + value: "{{inputs.parameters.message}}" + template: get-frequent + name: get-frequent + - dependencies: + - get-frequent + arguments: + parameters: + - name: get-frequent-word + value: "{{tasks.get-frequent.outputs.parameters.get-frequent-word}}" + - name: mirror + value: "{{inputs.parameters.mirror}}" + - name: outputpath + value: "{{inputs.parameters.outputpath}}" + - name: sidecar-image-tag + value: "{{inputs.parameters.sidecar-image-tag}}" + template: save + name: save + - outputs: + artifacts: + - path: "/mlpipeline-ui-metadata.json" + name: mlpipeline-ui-metadata + s3: + insecure: true + endpoint: minio-service.kubeflow:9000 + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact + bucket: mlpipeline + - path: "/mlpipeline-metrics.json" + name: mlpipeline-metrics + s3: + insecure: true + endpoint: minio-service.kubeflow:9000 + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact + bucket: mlpipeline + container: + command: + - sh + - "-c" + image: python:3.5-jessie + args: + - echo exit! 
+ name: exiting + - outputs: + artifacts: + - path: "/mlpipeline-ui-metadata.json" + name: mlpipeline-ui-metadata + s3: + insecure: true + endpoint: minio-service.kubeflow:9000 + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact + bucket: mlpipeline + - path: "/mlpipeline-metrics.json" + name: mlpipeline-metrics + s3: + insecure: true + endpoint: minio-service.kubeflow:9000 + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact + bucket: mlpipeline + parameters: + - valueFrom: + path: "/tmp/message.txt" + name: get-frequent-word + container: + command: + - sh + - "-c" + image: python:3.5-jessie + resources: + requests: + memory: 200M + args: + - python -c "from collections import Counter; words = Counter('{{inputs.parameters.message}}'.split()); + print(max(words, key=words.get))" | tee /tmp/message.txt + inputs: + parameters: + - name: message + name: get-frequent + - sidecars: + - mirrorVolumeMounts: "{{inputs.parameters.mirror}}" + image: busybox:{{inputs.parameters.sidecar-image-tag}} + name: busybox + metadata: + annotations: + tf-version.cloud-tpus.google.com: '1.12' + inputs: + parameters: + - name: get-frequent-word + - name: mirror + - name: outputpath + - name: sidecar-image-tag + container: + imagePullPolicy: Always + command: + - sh + - "-c" + resources: + limits: + cloud-tpus.google.com/v2: '8' + nvidia.com/gpu: "{{inputs.parameters.gpu-limit}}" + cpu: "{{inputs.parameters.cpu-limit}}" + image: google/cloud-sdk + args: + - echo {{inputs.parameters.get-frequent-word}} | tee /tmp/results.txt | gsutil + cp /tmp/results.txt {{inputs.parameters.outputpath}} + nodeSelector: + cloud.google.com/gke-accelerator: nvidia-tesla-k80 + outputs: + artifacts: + - path: "/mlpipeline-ui-metadata.json" + name: mlpipeline-ui-metadata + s3: + insecure: true + endpoint: minio-service.kubeflow:9000 + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact + bucket: mlpipeline + - path: "/mlpipeline-metrics.json" + name: mlpipeline-metrics + s3: + insecure: true + endpoint: minio-service.kubeflow:9000 + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact + bucket: mlpipeline + name: save + - name: save-most-frequent + inputs: + parameters: + - name: message + - name: mirror + - name: outputpath + - name: sidecar-image-tag + dag: + tasks: + - arguments: + parameters: + - name: message + value: "{{inputs.parameters.message}}" + - name: mirror + value: "{{inputs.parameters.mirror}}" + - name: outputpath + value: "{{inputs.parameters.outputpath}}" + - name: sidecar-image-tag + value: "{{inputs.parameters.sidecar-image-tag}}" + template: exit-handler-1 + name: exit-handler-1 + - name: exiting + template: exiting +kind: Workflow +metadata: + generateName: save-most-frequent- diff --git a/sdk/python/tests/dsl/container_op_tests.py b/sdk/python/tests/dsl/container_op_tests.py index 957694d59c4..8f37df3ac21 100644 --- a/sdk/python/tests/dsl/container_op_tests.py +++ b/sdk/python/tests/dsl/container_op_tests.py 
@@ -13,7 +13,7 @@
 # limitations under the License.
-from kfp.dsl import Pipeline, PipelineParam, ContainerOp
+from kfp.dsl import Pipeline, PipelineParam, ContainerOp, Sidecar
 import unittest
 class TestContainerOp(unittest.TestCase):
@@ -23,14 +23,19 @@ def test_basic(self):
 with Pipeline('somename') as p:
 param1 = PipelineParam('param1')
 param2 = PipelineParam('param2')
- op1 = ContainerOp(name='op1', image='image',
+ op1 = (ContainerOp(name='op1', image='image',
 arguments=['%s hello %s %s' % (param1, param2, param1)],
 file_outputs={'out1': '/tmp/b'})
+ .add_sidecar(Sidecar(name='sidecar1', image='image1'))
+ .add_sidecar(Sidecar(name='sidecar2', image='image2')))
 self.assertCountEqual([x.name for x in op1.inputs], ['param1', 'param2'])
 self.assertCountEqual(list(op1.outputs.keys()), ['out1'])
 self.assertCountEqual([x.op_name for x in op1.outputs.values()], ['op1'])
 self.assertEqual(op1.output.name, 'out1')
+ self.assertCountEqual([sidecar.name for sidecar in op1.sidecars], ['sidecar1', 'sidecar2'])
+ self.assertCountEqual([sidecar.image for sidecar in op1.sidecars], ['image1', 'image2'])
+
 def test_after_op(self):
 """Test duplicate ops."""
diff --git a/sdk/python/tests/dsl/pipeline_param_tests.py b/sdk/python/tests/dsl/pipeline_param_tests.py
index ed403aa6516..489c08e880b 100644
--- a/sdk/python/tests/dsl/pipeline_param_tests.py
+++ b/sdk/python/tests/dsl/pipeline_param_tests.py
@@ -12,9 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+from kubernetes.client.models import V1Container, V1EnvVar
 from kfp.dsl import PipelineParam
-from kfp.dsl._pipeline_param import _extract_pipelineparams
+from kfp.dsl._pipeline_param import _extract_pipelineparams, extract_pipelineparams_from_any
 import unittest
@@ -37,8 +37,8 @@ def test_str_repr(self):
 p = PipelineParam(name='param3', value='value3')
 self.assertEqual('{{pipelineparam:op=;name=param3;value=value3}}', str(p))
- def test_extract_pipelineparam(self):
- """Test _extract_pipeleineparam."""
+ def test_extract_pipelineparams(self):
+ """Test _extract_pipelineparams."""
 p1 = PipelineParam(name='param1', op_name='op1')
 p2 = PipelineParam(name='param2')
@@ -49,4 +49,19 @@ def test_extract_pipelineparam(self):
 self.assertListEqual([p1, p2, p3], params)
 payload = [str(p1) + stuff_chars + str(p2), str(p2) + stuff_chars + str(p3)]
 params = _extract_pipelineparams(payload)
- self.assertListEqual([p1, p2, p3], params)
\ No newline at end of file
+ self.assertListEqual([p1, p2, p3], params)
+
+ def test_extract_pipelineparams_from_any(self):
+ """Test extract_pipelineparams_from_any."""
+ p1 = PipelineParam(name='param1', op_name='op1')
+ p2 = PipelineParam(name='param2')
+ p3 = PipelineParam(name='param3', value='value3')
+ stuff_chars = ' between '
+ payload = str(p1) + stuff_chars + str(p2) + stuff_chars + str(p3)
+
+ container = V1Container(name=p1,
+ image=p2,
+ env=[V1EnvVar(name="foo", value=payload)])
+
+ params = extract_pipelineparams_from_any(container)
+ self.assertListEqual(sorted([p1, p2, p3]), sorted(params))
From 6773893a4a4417ec33510a6e861e5346afd91517 Mon Sep 17 00:00:00 2001
From: eterna2
Date: Fri, 15 Mar 2019 20:48:26 +0800
Subject: [PATCH 04/12] WIP: fix conflicts and bugs with recent master.
TODO: more complex template with pipeline params --- sdk/python/kfp/compiler/_op_to_template.py | 17 +- sdk/python/kfp/components/_dsl_bridge.py | 2 +- sdk/python/kfp/dsl/__init__.py | 2 +- sdk/python/kfp/dsl/_container_op.py | 96 ++++++-- sdk/python/kfp/dsl/_pipeline_param.py | 8 +- sdk/python/tests/compiler/compiler_tests.py | 13 +- .../tests/compiler/testdata/pipelineparams.py | 42 ++++ .../compiler/testdata/pipelineparams.yaml | 142 +++++++++++ sdk/python/tests/compiler/testdata/sidecar.py | 39 +++ .../tests/compiler/testdata/sidecar.yaml | 120 +++++++++ .../with_sidecars_and_pipelineparams.py | 111 --------- .../with_sidecars_and_pipelineparams.yaml | 227 ------------------ .../tests/components/test_components.py | 12 +- sdk/python/tests/components/test_python_op.py | 1 - sdk/python/tests/dsl/container_op_tests.py | 41 +++- 15 files changed, 487 insertions(+), 386 deletions(-) create mode 100644 sdk/python/tests/compiler/testdata/pipelineparams.py create mode 100644 sdk/python/tests/compiler/testdata/pipelineparams.yaml create mode 100644 sdk/python/tests/compiler/testdata/sidecar.py create mode 100644 sdk/python/tests/compiler/testdata/sidecar.yaml delete mode 100644 sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.py delete mode 100644 sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.yaml diff --git a/sdk/python/kfp/compiler/_op_to_template.py b/sdk/python/kfp/compiler/_op_to_template.py index dfbb002c7da..8535c5dfb51 100644 --- a/sdk/python/kfp/compiler/_op_to_template.py +++ b/sdk/python/kfp/compiler/_op_to_template.py @@ -29,12 +29,8 @@ def _get_pipelineparam(payload: str) -> List[str]: payload {str}: string """ - matches = re.findall( - r'{{pipelineparam:op=([\w\s_-]*);name=([\w\s_-]+);value=(.*?)}}', - payload) - return [ - dsl.PipelineParam(x[1], x[0], x[2]) for x in list(set(matches)) - ] + matches = dsl._match_serialized_pipelineparam(payload) + return [dsl.PipelineParam(x[1], x[0], x[2]) for x in list(set(matches))] def _sanitize_pipelineparam(param: dsl.PipelineParam, in_place=True): @@ -117,7 +113,7 @@ def _process_obj(obj: Any, map_to_tmpl_var: dict): # k8s_obj if hasattr(obj, 'swagger_types') and isinstance(obj.swagger_types, dict): # process everything inside recursively - for key in obj.swagger_types.keys(): + for key in obj.swagger_types.keys(): setattr(obj, key, _process_obj(getattr(obj, key), map_to_tmpl_var)) # return json representation of the k8s obj return K8sHelper.convert_k8s_obj_to_json(obj) @@ -179,7 +175,7 @@ def _parameters_to_json(params: List[dsl.PipelineParam]): def _inputs_to_json(inputs_params: List[dsl.PipelineParam], _artifacts=None): """Converts a list of PipelineParam into an argo `inputs` JSON obj.""" parameters = _parameters_to_json(inputs_params) - return {'parameters': parameters } if parameters else None + return {'parameters': parameters} if parameters else None def _outputs_to_json(outputs: Dict[str, dsl.PipelineParam], @@ -225,6 +221,7 @@ def _build_conventional_artifact(name): }, } + # TODO: generate argo python classes from swagger and use convert_k8s_obj_to_json?? 
def _op_to_template(op: dsl.ContainerOp): """Generate template given an operator inherited from dsl.ContainerOp.""" @@ -247,13 +244,13 @@ def _op_to_template(op: dsl.ContainerOp): } # inputs - inputs = _inputs_to_json(op.inputs) + inputs = _inputs_to_json(processed_op.inputs) if inputs: template['inputs'] = inputs # outputs template['outputs'] = _outputs_to_json(op.outputs, op.file_outputs, - output_artifacts) + output_artifacts) # node selector if processed_op.node_selector: diff --git a/sdk/python/kfp/components/_dsl_bridge.py b/sdk/python/kfp/components/_dsl_bridge.py index 90a9c75a818..eb29b095b16 100644 --- a/sdk/python/kfp/components/_dsl_bridge.py +++ b/sdk/python/kfp/components/_dsl_bridge.py @@ -169,7 +169,7 @@ def _create_container_op_from_resolved_task(name:str, container_image:str, comma if env: from kubernetes import client as k8s_client for name, value in env.items(): - task.add_env_variable(k8s_client.V1EnvVar(name=name, value=value)) + task.container.add_env_variable(k8s_client.V1EnvVar(name=name, value=value)) if need_dummy: _dummy_pipeline.__exit__() diff --git a/sdk/python/kfp/dsl/__init__.py b/sdk/python/kfp/dsl/__init__.py index 413faab5c69..4359a8060db 100644 --- a/sdk/python/kfp/dsl/__init__.py +++ b/sdk/python/kfp/dsl/__init__.py @@ -13,7 +13,7 @@ # limitations under the License. -from ._pipeline_param import PipelineParam +from ._pipeline_param import PipelineParam, _match_serialized_pipelineparam from ._pipeline import Pipeline, pipeline, get_pipeline_conf from ._container_op import ContainerOp, Sidecar from ._ops_group import OpsGroup, ExitHandler, Condition diff --git a/sdk/python/kfp/dsl/_container_op.py b/sdk/python/kfp/dsl/_container_op.py index 1c81cf03ed5..6f7c46904bc 100644 --- a/sdk/python/kfp/dsl/_container_op.py +++ b/sdk/python/kfp/dsl/_container_op.py @@ -13,7 +13,8 @@ # limitations under the License. import re -from typing import Any, Dict, List, TypeVar, Union +import warnings +from typing import Any, Dict, List, TypeVar, Union, Callable from kubernetes.client.models import ( V1Container, V1EnvVar, V1EnvFromSource, V1SecurityContext, V1Probe, V1ResourceRequirements, V1VolumeDevice, V1VolumeMount, V1ContainerPort, @@ -30,6 +31,35 @@ # util functions +def deprecation_warning(func: Callable, op_name: str, + container_name: str) -> Callable: + """Decorator function to give a pending deprecation warning""" + + def _wrapped(*args, **kwargs): + warnings.warn( + '`dsl.ContainerOp.%s` will be removed in future releases. ' + 'Use `dsl.ContainerOp.container.%s` instead.' % + (op_name, container_name), PendingDeprecationWarning) + return func(*args, **kwargs) + + return _wrapped + + +def _proxy_container_op_props(cls): + # properties mapping to proxy: ContainerOps. => Container. 
+ prop_map = dict(image='image', env_variables='env')
+ # iterate and create class properties
+ for op_prop, container_prop in prop_map.items():
+ # bind the current prop name with a default arg to avoid Python's
+ # late binding of closure variables inside a loop
+ getter = deprecation_warning(
+ lambda self, prop=container_prop: getattr(self._container, prop),
+ op_prop, container_prop)
+ setter = deprecation_warning(
+ lambda self, value, prop=container_prop: setattr(
+ self._container, prop, value), op_prop, container_prop)
+ setattr(cls, op_prop, property(getter, setter))
+ return cls
+
+
 def as_list(value: Any, if_none: Union[None, List] = None) -> List:
 """Convert any value except None to a list if not already a list."""
 if value is None:
@@ -599,6 +629,7 @@ def __init__(self,
 image: str,
 command: StringOrStringList = None,
 arguments: StringOrStringList = None,
+ sidecars: List[Sidecar] = None,
 file_outputs: Dict[str, str] = None,
 is_exit_handler=False):
 """Create a new instance of ContainerOp.
@@ -655,20 +686,21 @@ def _proxy(proxy_attr):
 """Decorator func to proxy to ContainerOp.container"""
 def _decorated(*args, **kwargs):
+ # execute method
 ret = getattr(self._container, proxy_attr)(*args, **kwargs)
 if ret == self._container:
 return self
 return ret
- return _decorated
+ return deprecation_warning(_decorated, proxy_attr, proxy_attr)
 # iter thru container and attach a proxy func to the container method
 for attr_to_proxy in dir(self._container):
 func = getattr(self._container, attr_to_proxy)
- # only proxy public callables
- if hasattr(func,
- '__call__') and (attr_to_proxy[0] != '_'
- ) and attr_to_proxy not in ignore_set:
+ # ignore private methods
+ if hasattr(func, '__call__') and (attr_to_proxy[0] != '_') and (
+ attr_to_proxy not in ignore_set):
+ # only proxy public callables
 setattr(self, attr_to_proxy, _proxy(attr_to_proxy))
 # TODO: proper k8s definitions so that `convert_k8s_obj_to_json` can be used?
@@ -678,7 +710,7 @@ def _decorated(*args, **kwargs):
 self.pod_annotations = {}
 self.pod_labels = {}
 self.num_retries = 0
- self.sidecars = []
+ self.sidecars = sidecars or []
 # attributes specific to `ContainerOp`
 self._inputs = []
@@ -697,7 +729,22 @@ def _decorated(*args, **kwargs):
 self.output = None
 if len(self.outputs) == 1:
 self.output = list(self.outputs.values())[0]
-
+
+ @property
+ def command(self):
+ return self._container.command
+
+ @command.setter
+ def command(self, value):
+ self._container.command = as_list(value)
+
+ @property
+ def arguments(self):
+ return self._container.args
+
+ @arguments.setter
+ def arguments(self, value):
+ self._container.args = as_list(value)
 @property
 def inputs(self):
@@ -715,6 +762,8 @@ def inputs(self):
 param for param in _pipeline_param.
extract_pipelineparams_from_any(getattr(self, key)) ] + # keep only unique + self._inputs = list(set(self._inputs)) return self._inputs @inputs.setter @@ -831,7 +880,7 @@ def add_sidecar(self, sidecar: Sidecar): def __repr__(self): return str({self.__class__.__name__: self.__dict__}) - + def _set_metadata(self, metadata): '''_set_metadata passes the containerop the metadata information and configures the right output @@ -839,16 +888,21 @@ def _set_metadata(self, metadata): metadata (ComponentMeta): component metadata ''' if not isinstance(metadata, ComponentMeta): - raise ValueError('_set_medata is expecting ComponentMeta.') - self._metadata = metadata + raise ValueError('_set_medata is expecting ComponentMeta.') + + self._metadata = metadata + if self.file_outputs: - for output in self.file_outputs.keys(): - output_type = self.outputs[output].param_type - for output_meta in self._metadata.outputs: - if output_meta.name == output: - output_type = output_meta.param_type - self.outputs[output].param_type = output_type - - self.output=None - if len(self.outputs) == 1: - self.output = list(self.outputs.values())[0] \ No newline at end of file + for output in self.file_outputs.keys(): + output_type = self.outputs[output].param_type + for output_meta in self._metadata.outputs: + if output_meta.name == output: + output_type = output_meta.param_type + self.outputs[output].param_type = output_type + + self.output = None + if len(self.outputs) == 1: + self.output = list(self.outputs.values())[0] + + +ContainerOp = _proxy_container_op_props(ContainerOp) \ No newline at end of file diff --git a/sdk/python/kfp/dsl/_pipeline_param.py b/sdk/python/kfp/dsl/_pipeline_param.py index bab16a53f61..e2b4a885b61 100644 --- a/sdk/python/kfp/dsl/_pipeline_param.py +++ b/sdk/python/kfp/dsl/_pipeline_param.py @@ -14,7 +14,7 @@ import re -from collections import namedtuple, Iterable +from collections import namedtuple from typing import List from ._metadata import TypeMeta @@ -71,10 +71,10 @@ def extract_pipelineparams_from_any(payload) -> List['PipelineParam']: # str if isinstance(payload, str): - return _extract_pipelineparams(payload) + return list(set(_extract_pipelineparams(payload))) - # list or tuple or iterable - if isinstance(payload, list) or isinstance(payload, tuple) or isinstance(payload, Iterable): + # list or tuple + if isinstance(payload, list) or isinstance(payload, tuple): pipeline_params = [] for item in payload: pipeline_params += extract_pipelineparams_from_any(item) diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index fb59522e81e..d95597995e1 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -207,6 +207,9 @@ def _test_py_compile(self, file_base_name): with open(os.path.join(test_data_dir, file_base_name + '.yaml'), 'r') as f: golden = yaml.load(f) compiled = self._get_yaml_from_tar(target_tar) + import json + with open('/pipelines/sdk/python/dist/'+file_base_name+'.yaml', 'w') as f: + json.dump(compiled, f) self.maxDiff = None self.assertEqual(golden, compiled) finally: @@ -216,9 +219,13 @@ def test_py_compile_basic(self): """Test basic sequential pipeline.""" self._test_py_compile('basic') - def test_py_compile_with_sidecars_and_pipelineparams(self): - """Test pipeline with_sidecars and pipelineparams in any k8s attributes.""" - self._test_py_compile('with_sidecars_and_pipelineparams') + def test_py_compile_with_sidecar(self): + """Test pipeline with sidecar.""" + 
self._test_py_compile('sidecar') + + def test_py_compile_with_pipelineparams(self): + """Test pipeline with multiple pipeline params.""" + self._test_py_compile('pipelineparams') def test_py_compile_condition(self): """Test a pipeline with conditions.""" diff --git a/sdk/python/tests/compiler/testdata/pipelineparams.py b/sdk/python/tests/compiler/testdata/pipelineparams.py new file mode 100644 index 00000000000..30dfce836bd --- /dev/null +++ b/sdk/python/tests/compiler/testdata/pipelineparams.py @@ -0,0 +1,42 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import kfp.dsl as dsl +from kubernetes import client as k8s_client +from kubernetes.client.models import V1EnvVar + + +@dsl.pipeline(name='PipelineParams', description='A pipeline with multiple pipeline params.') +def pipelineparams_pipeline(tag: str = 'latest', sleep_ms: int = 10): + + echo = dsl.Sidecar( + name='echo', + image='hashicorp/http-echo:%s' % tag, + args=['-text="hello world"']) + + op1 = dsl.ContainerOp( + name='download', + image='busybox:%s' % tag, + command=['sh', '-c'], + arguments=['sleep %s; wget localhost:5678 -O /tmp/results.txt' % sleep_ms], + sidecars=[echo], + file_outputs={'downloaded': '/tmp/results.txt'}) + + op2 = dsl.ContainerOp( + name='echo', + image='library/bash', + command=['sh', '-c'], + arguments=['echo $MSG %s' % op1.output]) + + op2.container.add_env_variable(V1EnvVar(name='MSG', value='pipelineParams: ')) diff --git a/sdk/python/tests/compiler/testdata/pipelineparams.yaml b/sdk/python/tests/compiler/testdata/pipelineparams.yaml new file mode 100644 index 00000000000..d9bbcc5cca2 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/pipelineparams.yaml @@ -0,0 +1,142 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+--- +apiVersion: argoproj.io/v1alpha1 +metadata: + generateName: pipelineparams- +spec: + entrypoint: pipelineparams + arguments: + parameters: + - name: tag + value: latest + - name: sleep-ms + value: '10' + templates: + - name: download + inputs: + parameters: + - name: sleep-ms + - name: tag + container: + image: busybox:{{inputs.parameters.tag}} + args: + - sleep {{inputs.parameters.sleep-ms}}; wget localhost:5678 -O /tmp/results.txt + command: + - sh + - "-c" + outputs: + artifacts: + - name: mlpipeline-ui-metadata + path: "/mlpipeline-ui-metadata.json" + s3: + endpoint: minio-service.kubeflow:9000 + insecure: true + bucket: mlpipeline + accessKeySecret: + name: mlpipeline-minio-artifact + key: accesskey + secretKeySecret: + name: mlpipeline-minio-artifact + key: secretkey + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz + - name: mlpipeline-metrics + path: "/mlpipeline-metrics.json" + s3: + endpoint: minio-service.kubeflow:9000 + insecure: true + bucket: mlpipeline + accessKeySecret: + name: mlpipeline-minio-artifact + key: accesskey + secretKeySecret: + name: mlpipeline-minio-artifact + key: secretkey + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz + parameters: + - name: download-downloaded + valueFrom: + path: "/tmp/results.txt" + sidecars: + - image: hashicorp/http-echo:{{inputs.parameters.tag}} + name: echo + args: + - -text="hello world" + - name: echo + inputs: + parameters: + - name: download-downloaded + container: + image: library/bash + args: + - echo $MSG {{inputs.parameters.download-downloaded}} + command: + - sh + - "-c" + env: + - name: MSG + value: 'pipelineParams: ' + outputs: + artifacts: + - name: mlpipeline-ui-metadata + path: "/mlpipeline-ui-metadata.json" + s3: + endpoint: minio-service.kubeflow:9000 + insecure: true + bucket: mlpipeline + accessKeySecret: + name: mlpipeline-minio-artifact + key: accesskey + secretKeySecret: + name: mlpipeline-minio-artifact + key: secretkey + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz + - name: mlpipeline-metrics + path: "/mlpipeline-metrics.json" + s3: + endpoint: minio-service.kubeflow:9000 + insecure: true + bucket: mlpipeline + accessKeySecret: + name: mlpipeline-minio-artifact + key: accesskey + secretKeySecret: + name: mlpipeline-minio-artifact + key: secretkey + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz + - name: pipelineparams + inputs: + parameters: + - name: sleep-ms + - name: tag + dag: + tasks: + - name: download + arguments: + parameters: + - name: sleep-ms + value: "{{inputs.parameters.sleep-ms}}" + - name: tag + value: "{{inputs.parameters.tag}}" + template: download + - dependencies: + - download + arguments: + parameters: + - name: download-downloaded + value: "{{tasks.download.outputs.parameters.download-downloaded}}" + name: echo + template: echo + serviceAccountName: pipeline-runner +kind: Workflow diff --git a/sdk/python/tests/compiler/testdata/sidecar.py b/sdk/python/tests/compiler/testdata/sidecar.py new file mode 100644 index 00000000000..effe265f2d3 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/sidecar.py @@ -0,0 +1,39 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import kfp.dsl as dsl +from kubernetes import client as k8s_client + + +@dsl.pipeline(name='Sidecar', description='A pipeline with sidecars.') +def sidecar_pipeline(): + + echo = dsl.Sidecar( + name='echo', + image='hashicorp/http-echo', + args=['-text="hello world"']) + + op1 = dsl.ContainerOp( + name='download', + image='busybox', + command=['sh', '-c'], + arguments=['sleep 10; wget localhost:5678 -O /tmp/results.txt'], + sidecars=[echo], + file_outputs={'downloaded': '/tmp/results.txt'}) + + op2 = dsl.ContainerOp( + name='echo', + image='library/bash', + command=['sh', '-c'], + arguments=['echo %s' % op1.output]) diff --git a/sdk/python/tests/compiler/testdata/sidecar.yaml b/sdk/python/tests/compiler/testdata/sidecar.yaml new file mode 100644 index 00000000000..edd8a5c0b3d --- /dev/null +++ b/sdk/python/tests/compiler/testdata/sidecar.yaml @@ -0,0 +1,120 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+kind: Workflow +metadata: + generateName: sidecar- +apiVersion: argoproj.io/v1alpha1 +spec: + arguments: + parameters: [] + templates: + - outputs: + artifacts: + - name: mlpipeline-ui-metadata + path: "/mlpipeline-ui-metadata.json" + s3: + endpoint: minio-service.kubeflow:9000 + secretKeySecret: + name: mlpipeline-minio-artifact + key: secretkey + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz + bucket: mlpipeline + accessKeySecret: + name: mlpipeline-minio-artifact + key: accesskey + insecure: true + - name: mlpipeline-metrics + path: "/mlpipeline-metrics.json" + s3: + endpoint: minio-service.kubeflow:9000 + secretKeySecret: + name: mlpipeline-minio-artifact + key: secretkey + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz + bucket: mlpipeline + accessKeySecret: + name: mlpipeline-minio-artifact + key: accesskey + insecure: true + parameters: + - name: download-downloaded + valueFrom: + path: "/tmp/results.txt" + name: download + sidecars: + - image: hashicorp/http-echo + name: echo + args: + - -text="hello world" + container: + image: busybox + args: + - sleep 10; wget localhost:5678 -O /tmp/results.txt + command: + - sh + - "-c" + - outputs: + artifacts: + - name: mlpipeline-ui-metadata + path: "/mlpipeline-ui-metadata.json" + s3: + endpoint: minio-service.kubeflow:9000 + secretKeySecret: + name: mlpipeline-minio-artifact + key: secretkey + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz + bucket: mlpipeline + accessKeySecret: + name: mlpipeline-minio-artifact + key: accesskey + insecure: true + - name: mlpipeline-metrics + path: "/mlpipeline-metrics.json" + s3: + endpoint: minio-service.kubeflow:9000 + secretKeySecret: + name: mlpipeline-minio-artifact + key: secretkey + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz + bucket: mlpipeline + accessKeySecret: + name: mlpipeline-minio-artifact + key: accesskey + insecure: true + name: echo + inputs: + parameters: + - name: download-downloaded + container: + image: library/bash + args: + - echo {{inputs.parameters.download-downloaded}} + command: + - sh + - "-c" + - name: sidecar + dag: + tasks: + - name: download + template: download + - arguments: + parameters: + - name: download-downloaded + value: "{{tasks.download.outputs.parameters.download-downloaded}}" + name: echo + dependencies: + - download + template: echo + serviceAccountName: pipeline-runner + entrypoint: sidecar diff --git a/sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.py b/sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.py deleted file mode 100644 index 0c06ae22a26..00000000000 --- a/sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.py +++ /dev/null @@ -1,111 +0,0 @@ -# Copyright 2018 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import kfp.dsl as dsl -import kfp.gcp as gcp - - -class GetFrequentWordOp(dsl.ContainerOp): - """A get frequent word class representing a component in ML Pipelines. 
- - The class provides a nice interface to users by hiding details such as container, - command, arguments. - """ - - def __init__(self, name, message): - """Args: - name: An identifier of the step which needs to be unique within a pipeline. - message: a dsl.PipelineParam object representing an input message. - """ - super(GetFrequentWordOp, self).__init__( - name=name, - image='python:3.5-jessie', - command=['sh', '-c'], - arguments=[ - 'python -c "from collections import Counter; ' - 'words = Counter(\'%s\'.split()); print(max(words, key=words.get))" ' - '| tee /tmp/message.txt' % message - ], - file_outputs={'word': '/tmp/message.txt'}) - - -class SaveMessageOp(dsl.ContainerOp): - """A class representing a component in ML Pipelines. - - It saves a message to a given output_path. - """ - - def __init__(self, name, message, output_path): - """Args: - name: An identifier of the step which needs to be unique within a pipeline. - message: a dsl.PipelineParam object representing the message to be saved. - output_path: a dsl.PipelineParam object representing the GCS path for output file. - """ - super(SaveMessageOp, self).__init__( - name=name, - image='google/cloud-sdk', - command=['sh', '-c'], - arguments=[ - 'echo %s | tee /tmp/results.txt | gsutil cp /tmp/results.txt %s' - % (message, output_path) - ]) - - -class ExitHandlerOp(dsl.ContainerOp): - """A class representing a component in ML Pipelines. - """ - - def __init__(self, name): - super(ExitHandlerOp, self).__init__( - name=name, - image='python:3.5-jessie', - command=['sh', '-c'], - arguments=['echo exit!']) - - -@dsl.pipeline( - name='Save Most Frequent', - description='Get Most Frequent Word and Save to GCS') -def save_most_frequent_word(message: str, - outputpath: str, - cpu_limit='0.5', - gpu_limit='2', - mirror=True, - sidecar_image_tag='latest'): - """A pipeline function describing the orchestration of the workflow.""" - - exit_op = ExitHandlerOp('exiting') - with dsl.ExitHandler(exit_op): - counter = GetFrequentWordOp(name='get-Frequent', message=message) - counter.set_memory_request('200M') - - saver = SaveMessageOp( - name='save', message=counter.output, output_path=outputpath) - - # update k8s container definition with pipeline params - (saver.container - .set_cpu_limit(cpu_limit) - .set_gpu_limit(gpu_limit) - .set_image_pull_policy("Always")) - - saver.add_node_selector_constraint('cloud.google.com/gke-accelerator', - 'nvidia-tesla-k80') - saver.apply( - gcp.use_tpu(tpu_cores=8, tpu_resource='v2', tf_version='1.12')) - - # add sidecar with str-based PipelineParam, as well as PipelineParam to k8s properties - saver.add_sidecar( - dsl.Sidecar('busybox', - 'busybox:%s' % sidecar_image_tag) - .set_mirror_volume_mounts(mirror)) diff --git a/sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.yaml b/sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.yaml deleted file mode 100644 index 34c85b2ace4..00000000000 --- a/sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.yaml +++ /dev/null @@ -1,227 +0,0 @@ -# Copyright 2018 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. -apiVersion: argoproj.io/v1alpha1 -spec: - entrypoint: save-most-frequent - onExit: exiting - arguments: - parameters: - - name: message - - name: outputpath - - name: cpu-limit - value: '0.5' - - name: gpu-limit - value: '2' - - name: mirror - value: 'True' - - name: sidecar-image-tag - value: latest - serviceAccountName: pipeline-runner - templates: - - name: exit-handler-1 - inputs: - parameters: - - name: message - - name: mirror - - name: outputpath - - name: sidecar-image-tag - dag: - tasks: - - arguments: - parameters: - - name: message - value: "{{inputs.parameters.message}}" - template: get-frequent - name: get-frequent - - dependencies: - - get-frequent - arguments: - parameters: - - name: get-frequent-word - value: "{{tasks.get-frequent.outputs.parameters.get-frequent-word}}" - - name: mirror - value: "{{inputs.parameters.mirror}}" - - name: outputpath - value: "{{inputs.parameters.outputpath}}" - - name: sidecar-image-tag - value: "{{inputs.parameters.sidecar-image-tag}}" - template: save - name: save - - outputs: - artifacts: - - path: "/mlpipeline-ui-metadata.json" - name: mlpipeline-ui-metadata - s3: - insecure: true - endpoint: minio-service.kubeflow:9000 - key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz - accessKeySecret: - key: accesskey - name: mlpipeline-minio-artifact - secretKeySecret: - key: secretkey - name: mlpipeline-minio-artifact - bucket: mlpipeline - - path: "/mlpipeline-metrics.json" - name: mlpipeline-metrics - s3: - insecure: true - endpoint: minio-service.kubeflow:9000 - key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz - accessKeySecret: - key: accesskey - name: mlpipeline-minio-artifact - secretKeySecret: - key: secretkey - name: mlpipeline-minio-artifact - bucket: mlpipeline - container: - command: - - sh - - "-c" - image: python:3.5-jessie - args: - - echo exit! 
- name: exiting - - outputs: - artifacts: - - path: "/mlpipeline-ui-metadata.json" - name: mlpipeline-ui-metadata - s3: - insecure: true - endpoint: minio-service.kubeflow:9000 - key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz - accessKeySecret: - key: accesskey - name: mlpipeline-minio-artifact - secretKeySecret: - key: secretkey - name: mlpipeline-minio-artifact - bucket: mlpipeline - - path: "/mlpipeline-metrics.json" - name: mlpipeline-metrics - s3: - insecure: true - endpoint: minio-service.kubeflow:9000 - key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz - accessKeySecret: - key: accesskey - name: mlpipeline-minio-artifact - secretKeySecret: - key: secretkey - name: mlpipeline-minio-artifact - bucket: mlpipeline - parameters: - - valueFrom: - path: "/tmp/message.txt" - name: get-frequent-word - container: - command: - - sh - - "-c" - image: python:3.5-jessie - resources: - requests: - memory: 200M - args: - - python -c "from collections import Counter; words = Counter('{{inputs.parameters.message}}'.split()); - print(max(words, key=words.get))" | tee /tmp/message.txt - inputs: - parameters: - - name: message - name: get-frequent - - sidecars: - - mirrorVolumeMounts: "{{inputs.parameters.mirror}}" - image: busybox:{{inputs.parameters.sidecar-image-tag}} - name: busybox - metadata: - annotations: - tf-version.cloud-tpus.google.com: '1.12' - inputs: - parameters: - - name: get-frequent-word - - name: mirror - - name: outputpath - - name: sidecar-image-tag - container: - imagePullPolicy: Always - command: - - sh - - "-c" - resources: - limits: - cloud-tpus.google.com/v2: '8' - nvidia.com/gpu: "{{inputs.parameters.gpu-limit}}" - cpu: "{{inputs.parameters.cpu-limit}}" - image: google/cloud-sdk - args: - - echo {{inputs.parameters.get-frequent-word}} | tee /tmp/results.txt | gsutil - cp /tmp/results.txt {{inputs.parameters.outputpath}} - nodeSelector: - cloud.google.com/gke-accelerator: nvidia-tesla-k80 - outputs: - artifacts: - - path: "/mlpipeline-ui-metadata.json" - name: mlpipeline-ui-metadata - s3: - insecure: true - endpoint: minio-service.kubeflow:9000 - key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz - accessKeySecret: - key: accesskey - name: mlpipeline-minio-artifact - secretKeySecret: - key: secretkey - name: mlpipeline-minio-artifact - bucket: mlpipeline - - path: "/mlpipeline-metrics.json" - name: mlpipeline-metrics - s3: - insecure: true - endpoint: minio-service.kubeflow:9000 - key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz - accessKeySecret: - key: accesskey - name: mlpipeline-minio-artifact - secretKeySecret: - key: secretkey - name: mlpipeline-minio-artifact - bucket: mlpipeline - name: save - - name: save-most-frequent - inputs: - parameters: - - name: message - - name: mirror - - name: outputpath - - name: sidecar-image-tag - dag: - tasks: - - arguments: - parameters: - - name: message - value: "{{inputs.parameters.message}}" - - name: mirror - value: "{{inputs.parameters.mirror}}" - - name: outputpath - value: "{{inputs.parameters.outputpath}}" - - name: sidecar-image-tag - value: "{{inputs.parameters.sidecar-image-tag}}" - template: exit-handler-1 - name: exit-handler-1 - - name: exiting - template: exiting -kind: Workflow -metadata: - generateName: save-most-frequent- diff --git a/sdk/python/tests/components/test_components.py b/sdk/python/tests/components/test_components.py index 1d133d131ec..17a9b91047c 100644 --- a/sdk/python/tests/components/test_components.py +++ 
b/sdk/python/tests/components/test_components.py @@ -34,9 +34,9 @@ def _test_load_component_from_file(self, component_path: str): self.assertEqual(task1.human_name, 'Add') self.assertEqual(task_factory1.__doc__.strip(), 'Returns sum of two arguments') - self.assertEqual(task1.image, 'python:3.5') - self.assertEqual(task1.arguments[0], str(arg1)) - self.assertEqual(task1.arguments[1], str(arg2)) + self.assertEqual(task1.container.image, 'python:3.5') + self.assertEqual(task1.container.args[0], str(arg1)) + self.assertEqual(task1.container.args[1], str(arg2)) def test_load_component_from_yaml_file(self): _this_file = Path(__file__).resolve() @@ -68,7 +68,7 @@ def test_load_component_from_url(self): arg2 = 5 task1 = task_factory1(arg1, arg2) assert task1.human_name == component_dict['name'] - assert task1.image == component_dict['implementation']['container']['image'] + assert task1.container.image == component_dict['implementation']['container']['image'] assert task1.arguments[0] == str(arg1) assert task1.arguments[1] == str(arg2) @@ -83,7 +83,7 @@ def test_loading_minimal_component(self): task_factory1 = comp.load_component(text=component_text) task1 = task_factory1() - assert task1.image == component_dict['implementation']['container']['image'] + assert task1.container.image == component_dict['implementation']['container']['image'] @unittest.expectedFailure def test_fail_on_duplicate_input_names(self): @@ -482,7 +482,7 @@ def test_handling_env(self): import kfp with kfp.dsl.Pipeline('Dummy'): #Forcing the TaskSpec conversion to ContainerOp task1 = task_factory1() - actual_env = {env_var.name: env_var.value for env_var in task1.env_variables} + actual_env = {env_var.name: env_var.value for env_var in task1.container.env} expected_env = {'key1': 'value 1', 'key2': 'value 2'} self.assertDictEqual(expected_env, actual_env) diff --git a/sdk/python/tests/components/test_python_op.py b/sdk/python/tests/components/test_python_op.py index f82a0946d62..447dabe1ca1 100644 --- a/sdk/python/tests/components/test_python_op.py +++ b/sdk/python/tests/components/test_python_op.py @@ -49,7 +49,6 @@ def helper_test_2_in_1_out_component_using_local_call(self, func, op): task = op(arg1, arg2) full_command = task.command + task.arguments - process = subprocess.run(full_command) output_path = list(task.file_outputs.values())[0] diff --git a/sdk/python/tests/dsl/container_op_tests.py b/sdk/python/tests/dsl/container_op_tests.py index 8f37df3ac21..8982b3cbecb 100644 --- a/sdk/python/tests/dsl/container_op_tests.py +++ b/sdk/python/tests/dsl/container_op_tests.py @@ -13,8 +13,12 @@ # limitations under the License. 
-from kfp.dsl import Pipeline, PipelineParam, ContainerOp, Sidecar
+import warnings
 import unittest
 
+from kubernetes.client.models import V1EnvVar, V1VolumeMount
+
+from kfp.dsl import Pipeline, PipelineParam, ContainerOp, Sidecar
+
 
 class TestContainerOp(unittest.TestCase):
 
@@ -44,3 +48,38 @@ def test_after_op(self):
         op2 = ContainerOp(name='op2', image='image')
         op2.after(op1)
         self.assertCountEqual(op2.dependent_op_names, [op1.name])
+
+
+    def test_deprecation_warnings(self):
+        """Test deprecation warnings."""
+        with Pipeline('somename') as p:
+            op = ContainerOp(name='op1', image='image')
+
+        with self.assertWarns(PendingDeprecationWarning):
+            op.env_variables = [V1EnvVar(name="foo", value="bar")]
+
+        with self.assertWarns(PendingDeprecationWarning):
+            op.image = 'image2'
+
+        with self.assertWarns(PendingDeprecationWarning):
+            op.set_memory_request('10M')
+
+        with self.assertWarns(PendingDeprecationWarning):
+            op.set_memory_limit('10M')
+
+        with self.assertWarns(PendingDeprecationWarning):
+            op.set_cpu_request('100m')
+
+        with self.assertWarns(PendingDeprecationWarning):
+            op.set_cpu_limit('1')
+
+        with self.assertWarns(PendingDeprecationWarning):
+            op.set_gpu_limit('1')
+
+        with self.assertWarns(PendingDeprecationWarning):
+            op.add_env_variable(V1EnvVar(name="foo", value="bar"))
+
+        with self.assertWarns(PendingDeprecationWarning):
+            op.add_volume_mount(V1VolumeMount(
+                mount_path='/secret/gcp-credentials',
+                name='gcp-credentials'))
\ No newline at end of file

From 2dc83117fbc26eaa48fd8afecc7fc622bec86665 Mon Sep 17 00:00:00 2001
From: eterna2
Date: Fri, 22 Mar 2019 11:40:13 +0800
Subject: [PATCH 05/12] fix proxy args

---
 sdk/python/kfp/dsl/_container_op.py        | 32 ++++++++++++-----
 sdk/python/tests/dsl/container_op_tests.py | 40 +++++++++++-----------
 2 files changed, 44 insertions(+), 28 deletions(-)

diff --git a/sdk/python/kfp/dsl/_container_op.py b/sdk/python/kfp/dsl/_container_op.py
index 6f7c46904bc..233a5e6f759 100644
--- a/sdk/python/kfp/dsl/_container_op.py
+++ b/sdk/python/kfp/dsl/_container_op.py
@@ -45,17 +45,29 @@ def _wrapped(*args, **kwargs):
     return _wrapped
 
 
-def _proxy_container_op_props(cls):
+def _create_getter_setter(prop):
+    """Create a tuple of getter and setter methods for a property in `Container`."""
+    def _getter(self):
+        return getattr(self._container, prop)
+    def _setter(self, value):
+        return setattr(self._container, prop, value)
+    return _getter, _setter
+
+
+def _proxy_container_op_props(cls: "ContainerOp"):
+    """Takes the `ContainerOp` class and proxies the PendingDeprecation properties
+    in `ContainerOp` to the `Container` instance.
+    """
     # properties mapping to proxy: ContainerOps. => Container.
     prop_map = dict(image='image', env_variables='env')
 
     # iterate and create class props
     for op_prop, container_prop in prop_map.items():
-        getter = deprecation_warning(
-            lambda self: getattr(self._container, container_prop), op_prop,
-            container_prop)
-        setter = deprecation_warning(
-            lambda self, value: setattr(self._container, container_prop, value
-                                        ), op_prop, container_prop)
+        # create getter and setter
+        _getter, _setter = _create_getter_setter(container_prop)
+
+        # decorate with deprecation warning
+        getter = deprecation_warning(_getter, op_prop, container_prop)
+        setter = deprecation_warning(_setter, op_prop, container_prop)
+
+        # update attributes with properties
         setattr(cls, op_prop, property(getter, setter))
 
     return cls
 
@@ -537,7 +549,6 @@ class Sidecar(Container):
 
         from kfp.dsl import ContainerOp, Sidecar
 
-
         # creates a `ContainerOp` and adds a redis `Sidecar`
         op = (ContainerOp(name='foo-op', image='busybox:latest')
                 .add_sidecar(
@@ -630,6 +641,7 @@ def __init__(self,
                  command: StringOrStringList = None,
                  arguments: StringOrStringList = None,
                  sidecars: List[Sidecar] = None,
+                 container_kwargs: Dict = None,
                  file_outputs: Dict[str, str] = None,
                  is_exit_handler=False):
         """Create a new instance of ContainerOp.
@@ -643,6 +655,10 @@ def __init__(self,
           arguments: the arguments of the command. The command can include "%s" and supply
             a PipelineParam as the string replacement. For example, ('echo %s' % input_param).
             At container run time the argument will be 'echo param_value'.
+          sidecars: the list of `Sidecar` objects describing the sidecar containers to deploy
+            together with the `main` container.
+          container_kwargs: the dict of additional keyword arguments to pass to the
+            op's `Container` definition.
           file_outputs: Maps output labels to local file paths. At pipeline run time,
             the value of a PipelineParam is saved to its corresponding local file. It's
             one way for outside world to receive outputs of the container.
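A minimal usage sketch of the two arguments documented above, `sidecars` and `container_kwargs`. The pipeline name and env values are illustrative only, and `Sidecar(..., args=...)` is assumed to be accepted the same way it appears in the compiled sidecar.yaml golden file:

import kfp.dsl as dsl
from kubernetes.client.models import V1EnvVar


@dsl.pipeline(name='sidecar-demo', description='sketch of sidecars and container_kwargs')
def sidecar_demo():
    op = dsl.ContainerOp(
        name='download',
        image='busybox',
        command=['sh', '-c'],
        arguments=['sleep 10; wget localhost:5678 -O /tmp/results.txt'],
        # sidecar containers are deployed in the same pod as the `main` container
        sidecars=[dsl.Sidecar(name='echo',
                              image='hashicorp/http-echo',
                              args=['-text="hello world"'])],
        # extra keyword arguments are forwarded verbatim to the k8s `Container` definition
        container_kwargs={'env': [V1EnvVar(name='FOO', value='bar')]},
        file_outputs={'downloaded': '/tmp/results.txt'})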
diff --git a/sdk/python/tests/dsl/container_op_tests.py b/sdk/python/tests/dsl/container_op_tests.py index 8982b3cbecb..9035dacd9e4 100644 --- a/sdk/python/tests/dsl/container_op_tests.py +++ b/sdk/python/tests/dsl/container_op_tests.py @@ -55,31 +55,31 @@ def test_deprecation_warnings(self): with Pipeline('somename') as p: op = ContainerOp(name='op1', image='image') - with self.assertWarns(PendingDeprecationWarning): - op.env_variables = [V1EnvVar(name="foo", value="bar")] + with self.assertWarns(PendingDeprecationWarning): + op.env_variables = [V1EnvVar(name="foo", value="bar")] - with self.assertWarns(PendingDeprecationWarning): - op.image = 'image2' + with self.assertWarns(PendingDeprecationWarning): + op.image = 'image2' - with self.assertWarns(PendingDeprecationWarning): - op.set_memory_request('10M') + with self.assertWarns(PendingDeprecationWarning): + op.set_memory_request('10M') - with self.assertWarns(PendingDeprecationWarning): - op.set_memory_limit('10M') + with self.assertWarns(PendingDeprecationWarning): + op.set_memory_limit('10M') - with self.assertWarns(PendingDeprecationWarning): - op.set_cpu_request('100m') + with self.assertWarns(PendingDeprecationWarning): + op.set_cpu_request('100m') - with self.assertWarns(PendingDeprecationWarning): - op.set_cpu_limit('1') + with self.assertWarns(PendingDeprecationWarning): + op.set_cpu_limit('1') - with self.assertWarns(PendingDeprecationWarning): - op.set_gpu_limit('1') + with self.assertWarns(PendingDeprecationWarning): + op.set_gpu_limit('1') - with self.assertWarns(PendingDeprecationWarning): - op.add_env_variable(V1EnvVar(name="foo", value="bar")) + with self.assertWarns(PendingDeprecationWarning): + op.add_env_variable(V1EnvVar(name="foo", value="bar")) - with self.assertWarns(PendingDeprecationWarning): - op.add_volume_mount(V1VolumeMount( - mount_path='/secret/gcp-credentials', - name='gcp-credentials')) \ No newline at end of file + with self.assertWarns(PendingDeprecationWarning): + op.add_volume_mount(V1VolumeMount( + mount_path='/secret/gcp-credentials', + name='gcp-credentials')) \ No newline at end of file From 3f11fc60544683a654af66f77319e9241fd30dd4 Mon Sep 17 00:00:00 2001 From: eterna2 Date: Fri, 22 Mar 2019 14:32:37 +0800 Subject: [PATCH 06/12] Fixed to work with latest master head --- sdk/python/kfp/compiler/_op_to_template.py | 70 ++----------------- sdk/python/kfp/dsl/_pipeline_param.py | 36 ++++++++-- sdk/python/tests/compiler/compiler_tests.py | 3 - .../tests/components/test_components.py | 2 +- 4 files changed, 39 insertions(+), 72 deletions(-) diff --git a/sdk/python/kfp/compiler/_op_to_template.py b/sdk/python/kfp/compiler/_op_to_template.py index 8535c5dfb51..d230fa4fdf2 100644 --- a/sdk/python/kfp/compiler/_op_to_template.py +++ b/sdk/python/kfp/compiler/_op_to_template.py @@ -22,51 +22,6 @@ T = TypeVar('T') -def _get_pipelineparam(payload: str) -> List[str]: - """Get a list of `PipelineParam` from a string. - - Args: - payload {str}: string - """ - - matches = dsl._match_serialized_pipelineparam(payload) - return [dsl.PipelineParam(x[1], x[0], x[2]) for x in list(set(matches))] - - -def _sanitize_pipelineparam(param: dsl.PipelineParam, in_place=True): - """Sanitize the name and op_name of a PipelineParam. - - Args: - params: a PipelineParam to sanitize - in_place: if set, do an in-place update to PipelineParm, otherwise return a - new instance of PipelineParam. 
- """ - if in_place: - param.name = K8sHelper.sanitize_k8s_name(param.name) - param.op_name = K8sHelper.sanitize_k8s_name( - param.op_name) if param.op_name else param.op_name - return param - - return dsl.PipelineParam( - K8sHelper.sanitize_k8s_name(param.name), - K8sHelper.sanitize_k8s_name(param.op_name), param.value) - - -def _sanitize_pipelineparams( - params: Union[dsl.PipelineParam, List[dsl.PipelineParam]], - in_place=True): - """Sanitize the name(s) of a PipelineParam (or a list of PipelineParam) and - return a list of sanitized PipelineParam. - - Args: - params: a PipelineParam or a list of PipelineParam to sanitize - in_place: if set, do an in-place update to the PipelineParm, otherwise return - new instances of PipelineParam. - """ - params = params if isinstance(params, list) else [params] - return [_sanitize_pipelineparam(param, in_place) for param in params] - - def _process_obj(obj: Any, map_to_tmpl_var: dict): """Recursively sanitize and replace any PipelineParam (instances and serialized strings) in the object with the corresponding template variables @@ -80,14 +35,12 @@ def _process_obj(obj: Any, map_to_tmpl_var: dict): # serialized str might be unsanitized if isinstance(obj, str): # get signature - pipeline_params = _get_pipelineparam(obj) - if not pipeline_params: + param_tuples = dsl._match_serialized_pipelineparam(obj) + if not param_tuples: return obj # replace all unsanitized signature with template var - for param in pipeline_params: - pattern = str(param) - sanitized = str(_sanitize_pipelineparam(param)) - obj = re.sub(pattern, map_to_tmpl_var[sanitized], obj) + for param_tuple in param_tuples: + obj = re.sub(param_tuple.pattern, map_to_tmpl_var[param_tuple.pattern], obj) # list if isinstance(obj, list): @@ -139,19 +92,10 @@ def _process_container_ops(op: dsl.ContainerOp): dsl.ContainerOp """ - # tmp map: unsanitized rpr -> sanitized PipelineParam - # in-place sanitize of all PipelineParam (except outputs and file_outputs) - _map = { - str(param): _sanitize_pipelineparam(param, in_place=True) - for param in op.inputs - } - - # map: unsanitized pipeline param rpr -> template var string - # used to replace unsanitized pipeline param strings with the corresponding - # template var strings + # map param's (unsanitized pattern or serialized str pattern) -> input param var str map_to_tmpl_var = { - key: '{{inputs.parameters.%s}}' % param.full_name - for key, param in _map.items() + (param.pattern or str(param)): '{{inputs.parameters.%s}}' % param.full_name + for param in op.inputs } # process all attr with pipelineParams except inputs and outputs parameters diff --git a/sdk/python/kfp/dsl/_pipeline_param.py b/sdk/python/kfp/dsl/_pipeline_param.py index 9e26b5173e6..4accbe81f9a 100644 --- a/sdk/python/kfp/dsl/_pipeline_param.py +++ b/sdk/python/kfp/dsl/_pipeline_param.py @@ -22,7 +22,15 @@ # TODO: Move this to a separate class # For now, this identifies a condition with only "==" operator supported. ConditionOperator = namedtuple('ConditionOperator', 'operator operand1 operand2') -PipelineParamTuple = namedtuple('PipelineParamTuple', 'name op value type') +PipelineParamTuple = namedtuple('PipelineParamTuple', 'name op value type pattern') + + +def sanitize_k8s_name(name): + """From _make_kubernetes_name + sanitize_k8s_name cleans and converts the names in the workflow. 
+    """
+    return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-')
+
 
 def _match_serialized_pipelineparam(payload: str):
   """_match_serialized_pipelineparam matches the serialized pipelineparam.
@@ -38,9 +46,21 @@ def _match_serialized_pipelineparam(payload: str):
   param_tuples = []
   for match in matches:
     if len(match) == 3:
-      param_tuples.append(PipelineParamTuple(name=match[1], op=match[0], value=match[2], type=''))
+      pattern = '{{pipelineparam:op=%s;name=%s;value=%s}}' % (match[0], match[1], match[2])
+      param_tuples.append(PipelineParamTuple(
+          name=sanitize_k8s_name(match[1]),
+          op=sanitize_k8s_name(match[0]),
+          value=match[2],
+          type='',
+          pattern=pattern))
     elif len(match) == 4:
-      param_tuples.append(PipelineParamTuple(name=match[1], op=match[0], value=match[2], type=match[3]))
+      pattern = '{{pipelineparam:op=%s;name=%s;value=%s;type=%s;}}' % (match[0], match[1], match[2], match[3])
+      param_tuples.append(PipelineParamTuple(
+          name=sanitize_k8s_name(match[1]),
+          op=sanitize_k8s_name(match[0]),
+          value=match[2],
+          type=match[3],
+          pattern=pattern))
   return param_tuples
 
 def _extract_pipelineparams(payloads: str or List[str]):
@@ -59,7 +79,11 @@ def _extract_pipelineparams(payloads: str or List[str]):
     param_tuples += _match_serialized_pipelineparam(payload)
   pipeline_params = []
   for param_tuple in list(set(param_tuples)):
-    pipeline_params.append(PipelineParam(param_tuple.name, param_tuple.op, param_tuple.value, TypeMeta.deserialize(param_tuple.type)))
+    pipeline_params.append(PipelineParam(param_tuple.name,
+                                         param_tuple.op,
+                                         param_tuple.value,
+                                         TypeMeta.deserialize(param_tuple.type),
+                                         pattern=param_tuple.pattern))
   return pipeline_params
 
 
@@ -118,7 +142,7 @@ class PipelineParam(object):
   value passed between components.
   """
 
-  def __init__(self, name: str, op_name: str=None, value: str=None, param_type: TypeMeta=TypeMeta()):
+  def __init__(self, name: str, op_name: str=None, value: str=None, param_type: TypeMeta=TypeMeta(), pattern: str=None):
    """Create a new instance of PipelineParam.
    Args:
      name: name of the pipeline parameter.
@@ -129,6 +153,7 @@ def __init__(self, name: str, op_name: str=None, value: str=None, param_type: Ty
      value: The actual value of the PipelineParam. If provided, the PipelineParam is
        "resolved" immediately. For now, we support string only.
      param_type: the type of the PipelineParam.
+      pattern: the serialized string pattern from which this pipeline parameter was created.
    Raises: ValueError in name or op_name contains invalid characters, or both op_name
    and value are set.
""" @@ -147,6 +172,7 @@ def __init__(self, name: str, op_name: str=None, value: str=None, param_type: Ty self.op_name = op_name if op_name else None self.value = value if value else None self.param_type = param_type + self.pattern = pattern @property def full_name(self): diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index d95597995e1..9233c9b278e 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -207,9 +207,6 @@ def _test_py_compile(self, file_base_name): with open(os.path.join(test_data_dir, file_base_name + '.yaml'), 'r') as f: golden = yaml.load(f) compiled = self._get_yaml_from_tar(target_tar) - import json - with open('/pipelines/sdk/python/dist/'+file_base_name+'.yaml', 'w') as f: - json.dump(compiled, f) self.maxDiff = None self.assertEqual(golden, compiled) finally: diff --git a/sdk/python/tests/components/test_components.py b/sdk/python/tests/components/test_components.py index c1ae9622e1f..83c0e73aaf0 100644 --- a/sdk/python/tests/components/test_components.py +++ b/sdk/python/tests/components/test_components.py @@ -33,7 +33,7 @@ def _test_load_component_from_file(self, component_path: str): task1 = task_factory1(arg1, arg2) self.assertEqual(task1.human_name, 'Add') - self.assertEqual(task_factory1.__doc__.strip(), 'Returns sum of two arguments') + self.assertEqual(task_factory1.__doc__.strip(), 'Add\nReturns sum of two arguments') self.assertEqual(task1.container.image, 'python:3.5') self.assertEqual(task1.container.args[0], str(arg1)) self.assertEqual(task1.container.args[1], str(arg2)) From ae64f8410956efcb8dfade6a5657ee40dce74e89 Mon Sep 17 00:00:00 2001 From: eterna2 Date: Fri, 22 Mar 2019 14:43:46 +0800 Subject: [PATCH 07/12] Added container_kwargs to ContainerOp to pass in k8s container kwargs --- sdk/python/kfp/dsl/_container_op.py | 3 ++- sdk/python/tests/dsl/container_op_tests.py | 8 +++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/sdk/python/kfp/dsl/_container_op.py b/sdk/python/kfp/dsl/_container_op.py index 233a5e6f759..3a2032f70ae 100644 --- a/sdk/python/kfp/dsl/_container_op.py +++ b/sdk/python/kfp/dsl/_container_op.py @@ -684,8 +684,9 @@ def __init__(self, self, is_exit_handler) # `container` prop in `io.argoproj.workflow.v1alpha1.Template` + container_kwargs = container_kwargs or {} self._container = Container( - image=image, args=arguments, command=command) + image=image, args=arguments, command=command, **container_kwargs) # for chaining, and returning back to `ContainerOp` when updating `Container` # i.e # op._container.set_image_policy('Always').parent == op # True diff --git a/sdk/python/tests/dsl/container_op_tests.py b/sdk/python/tests/dsl/container_op_tests.py index 9035dacd9e4..840b54c2709 100644 --- a/sdk/python/tests/dsl/container_op_tests.py +++ b/sdk/python/tests/dsl/container_op_tests.py @@ -29,6 +29,8 @@ def test_basic(self): param2 = PipelineParam('param2') op1 = (ContainerOp(name='op1', image='image', arguments=['%s hello %s %s' % (param1, param2, param1)], + sidecars=[Sidecar(name='sidecar0', image='image0')], + container_kwargs={'env': [V1EnvVar(name='env1', value='value1')]}, file_outputs={'out1': '/tmp/b'}) .add_sidecar(Sidecar(name='sidecar1', image='image1')) .add_sidecar(Sidecar(name='sidecar2', image='image2'))) @@ -37,9 +39,9 @@ def test_basic(self): self.assertCountEqual(list(op1.outputs.keys()), ['out1']) self.assertCountEqual([x.op_name for x in op1.outputs.values()], ['op1']) 
self.assertEqual(op1.output.name, 'out1') - self.assertCountEqual([sidecar.name for sidecar in op1.sidecars], ['sidecar1', 'sidecar2']) - self.assertCountEqual([sidecar.image for sidecar in op1.sidecars], ['image1', 'image2']) - + self.assertCountEqual([sidecar.name for sidecar in op1.sidecars], ['sidecar0', 'sidecar1', 'sidecar2']) + self.assertCountEqual([sidecar.image for sidecar in op1.sidecars], ['image0', 'image1', 'image2']) + self.assertCountEqual([env.name for env in op1.container.env], ['env1']) def test_after_op(self): """Test duplicate ops.""" From d2b8f2d05012dd005397b1b3b70f23676ff997d0 Mon Sep 17 00:00:00 2001 From: eterna2 Date: Fri, 22 Mar 2019 15:03:19 +0800 Subject: [PATCH 08/12] Fix comment bug, updated with example in ContainerOp docstring --- sdk/python/kfp/dsl/_container_op.py | 35 ++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/sdk/python/kfp/dsl/_container_op.py b/sdk/python/kfp/dsl/_container_op.py index 3a2032f70ae..8bccf95f9d9 100644 --- a/sdk/python/kfp/dsl/_container_op.py +++ b/sdk/python/kfp/dsl/_container_op.py @@ -111,8 +111,8 @@ class Container(V1Container): # creates a operation - op = ContainerOp(name='bash-ops', i - mage='busybox:latest', + op = ContainerOp(name='bash-ops', + image='busybox:latest', command=['echo'], arguments=['$MSG']) @@ -624,7 +624,36 @@ def inputs(self): class ContainerOp(object): - """Represents an op implemented by a container image.""" + """ + Represents an op implemented by a container image. + + Example + + from kfp import dsl + from kubernetes.client.models import V1EnvVar + + + @dsl.pipeline( + name='foo', + description='hello world') + def foo_pipeline(tag: str, pull_image_policy: str): + + # any attributes can be parameterized (both serialized string or actual PipelineParam) + op = dsl.ContainerOp(name='foo', + image='busybox:' % tag, + # pass in sidecars list + sidecars=[dsl.Sidecar('print', 'busybox:latest', command='echo "hello"')], + # pass in k8s container kwargs + container_kwargs={'env': [V1EnvVar('foo', 'bar')]}) + + # set `imagePullPolicy` property for `container` with `PipelineParam` + op.container.set_pull_image_policy(pull_image_policy) + + # add sidecar with parameterized image tag + # sidecar follows the argo sidecar swagger spec + op.add_sidecar(dsl.Sidecar('redis', 'redis:' % tag).set_image_pull_policy('Always')) + + """ # list of attributes that might have pipeline params - used to generate # the input parameters during compilation. From 8f144f8d24d824c4663829f036637208f4b17306 Mon Sep 17 00:00:00 2001 From: eterna2 Date: Fri, 22 Mar 2019 15:58:25 +0800 Subject: [PATCH 09/12] fix copyright year --- sdk/python/kfp/compiler/_op_to_template.py | 2 +- sdk/python/kfp/dsl/_container_op.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/kfp/compiler/_op_to_template.py b/sdk/python/kfp/compiler/_op_to_template.py index d230fa4fdf2..648fb9cb258 100644 --- a/sdk/python/kfp/compiler/_op_to_template.py +++ b/sdk/python/kfp/compiler/_op_to_template.py @@ -1,4 +1,4 @@ -# Copyright 2018 Google LLC +# Copyright 2019 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
diff --git a/sdk/python/kfp/dsl/_container_op.py b/sdk/python/kfp/dsl/_container_op.py index 8bccf95f9d9..8adf0f6b41c 100644 --- a/sdk/python/kfp/dsl/_container_op.py +++ b/sdk/python/kfp/dsl/_container_op.py @@ -1,4 +1,4 @@ -# Copyright 2018 Google LLC +# Copyright 2019 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. From 9db1858cb529737cad6250dd4a31164dbeb4e965 Mon Sep 17 00:00:00 2001 From: eterna2 Date: Wed, 27 Mar 2019 22:58:56 +0800 Subject: [PATCH 10/12] expose match_serialized_pipelineparam as public for compiler to process serialized pipeline params --- sdk/python/kfp/compiler/_op_to_template.py | 2 +- sdk/python/kfp/compiler/compiler.py | 5 ----- sdk/python/kfp/dsl/__init__.py | 2 +- sdk/python/kfp/dsl/_pipeline_param.py | 6 +++--- 4 files changed, 5 insertions(+), 10 deletions(-) diff --git a/sdk/python/kfp/compiler/_op_to_template.py b/sdk/python/kfp/compiler/_op_to_template.py index 648fb9cb258..967fa2ca814 100644 --- a/sdk/python/kfp/compiler/_op_to_template.py +++ b/sdk/python/kfp/compiler/_op_to_template.py @@ -35,7 +35,7 @@ def _process_obj(obj: Any, map_to_tmpl_var: dict): # serialized str might be unsanitized if isinstance(obj, str): # get signature - param_tuples = dsl._match_serialized_pipelineparam(obj) + param_tuples = dsl.match_serialized_pipelineparam(obj) if not param_tuples: return obj # replace all unsanitized signature with template var diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index 6f13289e8e6..d0cf92a1c78 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -23,7 +23,6 @@ from ._k8s_helper import K8sHelper from ._op_to_template import _op_to_template -from ..dsl._pipeline_param import _match_serialized_pipelineparam from ..dsl._metadata import TypeMeta class Compiler(object): @@ -432,10 +431,6 @@ def _compile(self, pipeline_func): for op in p.ops.values(): sanitized_name = K8sHelper.sanitize_k8s_name(op.name) op.name = sanitized_name - for param in op.inputs: # + op.argument_inputs: - param.name = K8sHelper.sanitize_k8s_name(param.name) - if param.op_name: - param.op_name = K8sHelper.sanitize_k8s_name(param.op_name) for param in op.outputs.values(): param.name = K8sHelper.sanitize_k8s_name(param.name) if param.op_name: diff --git a/sdk/python/kfp/dsl/__init__.py b/sdk/python/kfp/dsl/__init__.py index 4359a8060db..0cbcb5e8fbd 100644 --- a/sdk/python/kfp/dsl/__init__.py +++ b/sdk/python/kfp/dsl/__init__.py @@ -13,7 +13,7 @@ # limitations under the License. -from ._pipeline_param import PipelineParam, _match_serialized_pipelineparam +from ._pipeline_param import PipelineParam, match_serialized_pipelineparam from ._pipeline import Pipeline, pipeline, get_pipeline_conf from ._container_op import ContainerOp, Sidecar from ._ops_group import OpsGroup, ExitHandler, Condition diff --git a/sdk/python/kfp/dsl/_pipeline_param.py b/sdk/python/kfp/dsl/_pipeline_param.py index 4accbe81f9a..650da9df6dc 100644 --- a/sdk/python/kfp/dsl/_pipeline_param.py +++ b/sdk/python/kfp/dsl/_pipeline_param.py @@ -32,8 +32,8 @@ def sanitize_k8s_name(name): return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-') -def _match_serialized_pipelineparam(payload: str): - """_match_serialized_pipelineparam matches the serialized pipelineparam. +def match_serialized_pipelineparam(payload: str): + """match_serialized_pipelineparam matches the serialized pipelineparam. 
Args: payloads (str): a string that contains the serialized pipelineparam. @@ -76,7 +76,7 @@ def _extract_pipelineparams(payloads: str or List[str]): payloads = [payloads] param_tuples = [] for payload in payloads: - param_tuples += _match_serialized_pipelineparam(payload) + param_tuples += match_serialized_pipelineparam(payload) pipeline_params = [] for param_tuple in list(set(param_tuples)): pipeline_params.append(PipelineParam(param_tuple.name, From 7eb56b02add6e602c2ecc8baf3595b777a417746 Mon Sep 17 00:00:00 2001 From: eterna2 Date: Wed, 27 Mar 2019 23:04:19 +0800 Subject: [PATCH 11/12] fixed pydoc example and removed unnecessary ContainerOp.container.parent --- sdk/python/kfp/dsl/_container_op.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/sdk/python/kfp/dsl/_container_op.py b/sdk/python/kfp/dsl/_container_op.py index 8adf0f6b41c..d0dd56a1854 100644 --- a/sdk/python/kfp/dsl/_container_op.py +++ b/sdk/python/kfp/dsl/_container_op.py @@ -640,7 +640,7 @@ def foo_pipeline(tag: str, pull_image_policy: str): # any attributes can be parameterized (both serialized string or actual PipelineParam) op = dsl.ContainerOp(name='foo', - image='busybox:' % tag, + image='busybox:%s' % tag, # pass in sidecars list sidecars=[dsl.Sidecar('print', 'busybox:latest', command='echo "hello"')], # pass in k8s container kwargs @@ -651,7 +651,7 @@ def foo_pipeline(tag: str, pull_image_policy: str): # add sidecar with parameterized image tag # sidecar follows the argo sidecar swagger spec - op.add_sidecar(dsl.Sidecar('redis', 'redis:' % tag).set_image_pull_policy('Always')) + op.add_sidecar(dsl.Sidecar('redis', 'redis:%s' % tag).set_image_pull_policy('Always')) """ @@ -716,10 +716,6 @@ def __init__(self, container_kwargs = container_kwargs or {} self._container = Container( image=image, args=arguments, command=command, **container_kwargs) - # for chaining, and returning back to `ContainerOp` when updating `Container` - # i.e - # op._container.set_image_policy('Always').parent == op # True - setattr(self._container, "parent", self) # NOTE for backward compatibility (remove in future?) # proxy old ContainerOp callables to Container @@ -950,5 +946,6 @@ def _set_metadata(self, metadata): if len(self.outputs) == 1: self.output = list(self.outputs.values())[0] - +# proxy old ContainerOp properties to ContainerOp.container +# with PendingDeprecationWarning. ContainerOp = _proxy_container_op_props(ContainerOp) \ No newline at end of file From 4f621c2a242a36adf34ecc0a75022da58c4b359f Mon Sep 17 00:00:00 2001 From: eterna2 Date: Wed, 27 Mar 2019 23:46:44 +0800 Subject: [PATCH 12/12] Fix conflicts in compiler tests --- sdk/python/tests/compiler/compiler_tests.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index df30f443479..e54dc378fa0 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -260,11 +260,11 @@ def test_py_compile_basic(self): def test_py_compile_with_sidecar(self): """Test pipeline with sidecar.""" - self._test_py_compile('sidecar') + self._test_py_compile_yaml('sidecar') def test_py_compile_with_pipelineparams(self): """Test pipeline with multiple pipeline params.""" - self._test_py_compile('pipelineparams') + self._test_py_compile_yaml('pipelineparams') def test_py_compile_condition(self): """Test a pipeline with conditions."""
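For reference, a short sketch of how the now-public helper from PATCH 10 pairs with the `pattern` field added in PATCH 06. It assumes the 3-field serialized form shown in the patch matches an empty value, and the `'%s-%s'` join of op and name is an assumption that mirrors `full_name` and the parameter names in the golden YAML (e.g. 'download-downloaded'):

from kfp import dsl

payload = 'echo {{pipelineparam:op=download;name=downloaded;value=}}'
for t in dsl.match_serialized_pipelineparam(payload):
    # `t.pattern` holds the exact serialized substring, so a plain string
    # replacement suffices; `t.name` and `t.op` are already sanitized k8s names.
    payload = payload.replace(t.pattern, '{{inputs.parameters.%s-%s}}' % (t.op, t.name))

print(payload)  # echo {{inputs.parameters.download-downloaded}}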