From b675e0272b617f23c3cac435c321aa7cfb7d6275 Mon Sep 17 00:00:00 2001 From: Ilias Katsakioris Date: Fri, 10 May 2019 03:25:57 +0300 Subject: [PATCH] Remove cops and rops pipeline attributes (#1298) * Remove the separated dictionaries for ContainerOps and ResourceOps * Fix the sanitization performed by the compiler to iterate through ops dict and do type-check for the special fields file_outputs and attribute_outputs Signed-off-by: Ilias Katsakioris --- sdk/python/kfp/compiler/compiler.py | 37 +++++++---------------------- sdk/python/kfp/dsl/_pipeline.py | 6 ----- 2 files changed, 9 insertions(+), 34 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index 8b9d3cfd6c6..f3953dcab8d 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -601,8 +601,8 @@ def _compile(self, pipeline_func): arg.value = default.value if isinstance(default, dsl.PipelineParam) else default # Sanitize operator names and param names - sanitized_cops = {} - for op in p.cops.values(): + sanitized_ops = {} + for op in p.ops.values(): sanitized_name = K8sHelper.sanitize_k8s_name(op.name) op.name = sanitized_name for param in op.outputs.values(): @@ -614,38 +614,19 @@ def _compile(self, pipeline_func): op.output.op_name = K8sHelper.sanitize_k8s_name(op.output.op_name) if op.dependent_names: op.dependent_names = [K8sHelper.sanitize_k8s_name(name) for name in op.dependent_names] - if op.file_outputs is not None: + if isinstance(op, dsl.ContainerOp) and op.file_outputs is not None: sanitized_file_outputs = {} for key in op.file_outputs.keys(): sanitized_file_outputs[K8sHelper.sanitize_k8s_name(key)] = op.file_outputs[key] op.file_outputs = sanitized_file_outputs - sanitized_cops[sanitized_name] = op - p.cops = sanitized_cops - p.ops = dict(sanitized_cops) - - # Sanitize operator names and param names of ResourceOps - sanitized_rops = {} - for rop in p.rops.values(): - sanitized_name = K8sHelper.sanitize_k8s_name(rop.name) - rop.name = sanitized_name - for param in rop.outputs.values(): - param.name = K8sHelper.sanitize_k8s_name(param.name) - if param.op_name: - param.op_name = K8sHelper.sanitize_k8s_name(param.op_name) - if rop.output is not None: - rop.output.name = K8sHelper.sanitize_k8s_name(rop.output.name) - rop.output.op_name = K8sHelper.sanitize_k8s_name(rop.output.op_name) - if rop.dependent_names: - rop.dependent_names = [K8sHelper.sanitize_k8s_name(name) for name in rop.dependent_names] - if rop.attribute_outputs is not None: + elif isinstance(op, dsl.ResourceOp) and op.attribute_outputs is not None: sanitized_attribute_outputs = {} - for key in rop.attribute_outputs.keys(): + for key in op.attribute_outputs.keys(): sanitized_attribute_outputs[K8sHelper.sanitize_k8s_name(key)] = \ - rop.attribute_outputs[key] - rop.attribute_outputs = sanitized_attribute_outputs - sanitized_rops[sanitized_name] = rop - p.rops = sanitized_rops - p.ops.update(dict(sanitized_rops)) + op.attribute_outputs[key] + op.attribute_outputs = sanitized_attribute_outputs + sanitized_ops[sanitized_name] = op + p.ops = sanitized_ops workflow = self._create_pipeline_workflow(args_list_with_defaults, p) return workflow diff --git a/sdk/python/kfp/dsl/_pipeline.py b/sdk/python/kfp/dsl/_pipeline.py index 5de27328c0a..2037063d917 100644 --- a/sdk/python/kfp/dsl/_pipeline.py +++ b/sdk/python/kfp/dsl/_pipeline.py @@ -110,8 +110,6 @@ def __init__(self, name: str): """ self.name = name self.ops = {} - self.cops = {} - self.rops = {} # Add the root group. self.groups = [_ops_group.OpsGroup('pipeline', name=name)] self.group_id = 0 @@ -148,10 +146,6 @@ def add_op(self, op: _container_op.BaseOp, define_only: bool): op_name = _make_name_unique_by_adding_index(op.human_name, list(self.ops.keys()), ' ') self.ops[op_name] = op - if isinstance(op, _container_op.ContainerOp): - self.cops[op_name] = op - elif isinstance(op, _resource_op.ResourceOp): - self.rops[op_name] = op if not define_only: self.groups[-1].ops.append(op)