Skip to content

Commit

Permalink
[ML][Pipelines] Provide API to get node predecessors for @pipeline (Azure#29848)
Browse files Browse the repository at this point in the history

* support get data owner

* add more tests

* update implementation

* add API and tests

* add tests for yaml job

* fix comment

* remove get_predecessors
  • Loading branch information
D-W- authored Apr 17, 2023
1 parent eb40e17 commit c35ccfa
Show file tree
Hide file tree
Showing 8 changed files with 499 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ def _map_internal_output_type(_meta):
meta=output_meta,
owner="pipeline",
description=self._args_description.get(key, None),
# store original node output to be able to trace back to inner node from a pipeline output builder.
binding_output=value,
)
# copy node level output setting to pipeline output
copy_output_setting(source=value._owner.outputs[value._port_name], target=pipeline_output)
Expand Down
43 changes: 43 additions & 0 deletions sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/_io/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,43 @@ def _deepcopy(self):
meta=self._meta,
)

def _get_data_owner(self) -> Optional["BaseNode"]:
    """Return the node that produces this input's data, or None for a literal value.

    Note: This only works for @pipeline, not for YAML pipeline.
    Note: When a node's input comes from a sub pipeline's output, the inner
    step is returned as the owner, e.g.

        @pipeline
        def sub_pipeline():
            inner_node = component_func()
            return inner_node.outputs

        @pipeline
        def root_pipeline():
            pipeline_node = sub_pipeline()
            node = copy_files_component_func(input_dir=pipeline_node.outputs.output_dir)
            owner = node.inputs.input_dir._get_data_owner()
            assert owner == pipeline_node.nodes[0]

    :return: The owning node, or None when the input is a literal value.
    :rtype: Optional[BaseNode]
    """
    # local imports to avoid a circular dependency at module load time
    from azure.ai.ml.entities import Pipeline
    from azure.ai.ml.entities._builders import BaseNode

    def _resolve_data_owner(data) -> Optional["BaseNode"]:
        if isinstance(data, BaseNode) and not isinstance(data, Pipeline):
            return data
        # NOTE: the original code used `while` here, but the body returns
        # unconditionally so it never looped; `if` states the intent clearly.
        if isinstance(data, PipelineInput):
            # a pipeline input's original value (a literal value or another
            # node's output) is stored in _original_data, so recurse into it
            return _resolve_data_owner(data._original_data)
        if isinstance(data, NodeOutput):
            if isinstance(data._owner, Pipeline):
                # for input from subgraph's output, trace back to inner node
                return _resolve_data_owner(data._binding_output)
            # for input from another node's output, return the node
            return _resolve_data_owner(data._owner)
        # literal value: there is no owning node
        return None

    return _resolve_data_owner(self._data)


class NodeOutput(InputOutputBase, PipelineExpressionMixin):
"""Define one output of a Component."""
Expand All @@ -348,6 +385,7 @@ def __init__(
*,
data: Optional[Union[Output, str]] = None,
owner: Optional[Union["BaseComponent", "PipelineJob"]] = None,
binding_output: Optional["NodeOutput"] = None,
**kwargs,
):
"""Initialize an Output of a component.
Expand All @@ -365,6 +403,8 @@ def __init__(
:type mode: str
:param owner: The owner component of the output, used to calculate binding.
:type owner: Union[azure.ai.ml.entities.BaseNode, azure.ai.ml.entities.PipelineJob]
:param binding_output: The node output bound to pipeline output, only available for pipeline.
:type binding_output: azure.ai.ml.entities.NodeOutput
:param kwargs: A dictionary of additional configuration parameters.
:type kwargs: dict
:raises ~azure.ai.ml.exceptions.ValidationException: Raised if object cannot be successfully validated.
Expand All @@ -387,6 +427,8 @@ def __init__(
self._assert_name_and_version()

self._is_control = meta.is_control if meta is not None else None
# store original node output to be able to trace back to inner node from a pipeline output builder.
self._binding_output = binding_output

@property
def is_control(self) -> str:
Expand Down Expand Up @@ -536,6 +578,7 @@ def _deepcopy(self):
data=copy.copy(self._data),
owner=self._owner,
meta=self._meta,
binding_output=self._binding_output,
)


Expand Down
3 changes: 3 additions & 0 deletions sdk/ml/azure-ai-ml/azure/ai/ml/entities/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,3 +478,6 @@ def copy_output_setting(source: Union["Output", "NodeOutput"], target: "NodeOutp
return
if source._data:
target._data = copy.deepcopy(source._data)
# copy pipeline component output's node output to subgraph builder
if source._binding_output is not None:
target._binding_output = source._binding_output
54 changes: 54 additions & 0 deletions sdk/ml/azure-ai-ml/tests/dsl/_util.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import contextlib
from typing import Union, List

from azure.ai.ml._schema.pipeline import PipelineJobSchema
from azure.ai.ml._schema.pipeline.pipeline_component import PipelineJobsField
from azure.ai.ml.entities import PipelineJob, PipelineComponent, Pipeline
from azure.ai.ml.entities._builders import BaseNode
from azure.ai.ml.entities._job.pipeline._io import NodeInput

_DSL_TIMEOUT_SECOND = 20 * 60 # timeout for dsl's tests, unit in second.

Expand All @@ -15,3 +19,53 @@ def include_private_preview_nodes_in_pipeline():
yield
finally:
PipelineJobSchema._declared_fields["jobs"] = original_jobs


def expand_pipeline_nodes(pipeline: Union[PipelineJob, PipelineComponent]):
    """Recursively flatten a pipeline's jobs into a flat list of leaf nodes.

    Every sub-graph (Pipeline node) is expanded in place, so the returned list
    contains only non-pipeline nodes, in traversal order.

    :param pipeline: The pipeline job or pipeline component to expand.
    :return: A flat list of all non-pipeline nodes.
    :raises RuntimeError: If a Pipeline node's component is not a PipelineComponent.
    """
    expanded = []
    for job in pipeline.jobs.values():
        if not isinstance(job, Pipeline):
            expanded.append(job)
            continue
        component = job.component
        if not isinstance(component, PipelineComponent):
            raise RuntimeError(
                "Pipeline component must be a PipelineComponent object, but got {}".format(type(component))
            )
        # recurse into the sub-graph and splice its leaf nodes in place
        expanded.extend(expand_pipeline_nodes(component))
    return expanded


def get_predecessors(node) -> List[BaseNode]:
    """Return the list of predecessor nodes for the given node.

    Note: Only non-control flow nodes in @pipeline are supported.
    Note: For a sub-graph node, we trace back to the inner node that actually
    produced the data and return that inner node.

    Example:
        @pipeline
        def sub_pipeline():
            inner_node = component_func()
            return inner_node.outputs

        @pipeline
        def root_pipeline():
            pipeline_node = sub_pipeline()
            node1 = component_func(input1=pipeline_node.outputs.output1)
            node2 = component_func(
                input1=pipeline_node.outputs.output1,
                input2=node1.outputs.output1,
            )
            # pipeline_node.get_predecessors() will return []
            # node1.get_predecessors() will return [inner_node]
            # node2.get_predecessors() will return [inner_node, node1]

    :param node: The node whose predecessors are collected.
    :return: De-duplicated list of predecessor nodes, in input order.
    :rtype: List[BaseNode]
    """
    # use {id: instance} dict to avoid nodes with component and parameters being duplicated
    predecessors = {}
    # iterate values directly - the input name is not needed here
    for input_value in node.inputs.values():
        if not isinstance(input_value, NodeInput):
            # literal parameters and other non-port inputs have no owner
            continue
        owner = input_value._get_data_owner()
        if owner is not None:
            predecessors[owner._instance_id] = owner
    return list(predecessors.values())
110 changes: 109 additions & 1 deletion sdk/ml/azure-ai-ml/tests/dsl/unittests/test_dsl_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
from test_configs.dsl_pipeline import data_binding_expression
from test_utilities.utils import assert_job_cancel, omit_with_wildcard, prepare_dsl_curated

from .._util import _DSL_TIMEOUT_SECOND
from .._util import _DSL_TIMEOUT_SECOND, get_predecessors

tests_root_dir = Path(__file__).parent.parent.parent
components_dir = tests_root_dir / "test_configs/components/"
Expand Down Expand Up @@ -3226,6 +3226,114 @@ def pipeline_func(param: str = "2"):
# check if all the fields are correctly serialized
pipeline_job.component._get_anonymous_hash()

def test_get_predecessors(self):
    """Validate get_predecessors for direct, cross-node, sub-pipeline and outer-graph bindings.

    Assertions run inside the @dsl.pipeline functions because predecessor
    tracing relies on the in-memory graph built during pipeline definition.
    """
    # component with two inputs and two outputs, so one node can feed
    # another through multiple ports
    component_yaml = components_dir / "2in2out.yaml"
    component_func = load_component(source=component_yaml)

    # case1.1: predecessor from same node
    @dsl.pipeline()
    def pipeline1():
        node1 = component_func()
        node1.name = "node1"
        assert get_predecessors(node1) == []
        node2 = component_func(input1=node1.outputs.output1, input2=node1.outputs.output2)
        # both inputs come from node1, so it must appear only once
        assert ["node1"] == [n.name for n in get_predecessors(node2)]
        return node1.outputs

    pipeline1()

    # case1.2: predecessor from different node
    @dsl.pipeline()
    def pipeline2():
        node1 = component_func()
        node1.name = "node1"
        assert get_predecessors(node1) == []

        node2 = component_func()
        node2.name = "node2"
        assert get_predecessors(node2) == []

        # NOTE: rebinding the name node2; predecessors are resolved from the
        # bound outputs of the original node1/node2 instances
        node2 = component_func(input1=node1.outputs.output1, input2=node2.outputs.output2)
        assert ["node1", "node2"] == [n.name for n in get_predecessors(node2)]
        return node2.outputs

    pipeline2()

    # case 2.1: predecessor from same sub pipeline
    @dsl.pipeline()
    def pipeline3():
        sub1 = pipeline1()
        node3 = component_func(input1=sub1.outputs.output1, input2=sub1.outputs.output2)
        # traces through the sub-graph boundary to the inner node
        assert ["node1"] == [n.name for n in get_predecessors(node3)]

    pipeline3()

    # case 2.2: predecessor from different sub pipeline
    @dsl.pipeline()
    def pipeline4():
        sub1 = pipeline1()
        sub2 = pipeline2()
        node3 = component_func(input1=sub1.outputs.output1, input2=sub2.outputs.output2)
        assert ["node1", "node2"] == [n.name for n in get_predecessors(node3)]

    pipeline4()

    # case 3.1: predecessor from different outer node
    @dsl.pipeline()
    def sub_pipeline_1(input1: Input, input2: Input):
        node1 = component_func(input1=input1, input2=input2)
        # pipeline inputs resolve to the outer nodes bound at the call site
        assert ["outer1", "outer2"] == [n.name for n in get_predecessors(node1)]

    @dsl.pipeline()
    def pipeline5():
        outer1 = component_func()
        outer1.name = "outer1"
        outer2 = component_func()
        outer2.name = "outer2"
        sub_pipeline_1(input1=outer1.outputs.output1, input2=outer2.outputs.output2)

    pipeline5()

    # case 3.2: predecessor from same outer node
    @dsl.pipeline()
    def sub_pipeline_2(input1: Input, input2: Input):
        node1 = component_func(input1=input1, input2=input2)
        assert ["outer1"] == [n.name for n in get_predecessors(node1)]

    @dsl.pipeline()
    def pipeline6():
        outer1 = component_func()
        outer1.name = "outer1"
        sub_pipeline_2(input1=outer1.outputs.output1, input2=outer1.outputs.output2)

    pipeline6()

    # case 3.3: predecessor from outer literal value
    @dsl.pipeline()
    def sub_pipeline_3(input1: Input, input2: Input):
        node1 = component_func(input1=input1, input2=input2)
        # literal Input() values have no owning node
        assert [] == [n.name for n in get_predecessors(node1)]

    @dsl.pipeline()
    def pipeline7():
        sub_pipeline_3(input1=Input(), input2=Input())

    pipeline7()

    # case 3.4: predecessor from outer subgraph
    @dsl.pipeline()
    def sub_pipeline_4(input1: Input, input2: Input):
        node1 = component_func(input1=input1, input2=input2)
        # traces through both the outer and the inner sub-graph boundaries
        assert ["node1", "node2"] == [n.name for n in get_predecessors(node1)]

    @dsl.pipeline()
    def pipeline8():
        sub1 = pipeline1()
        sub2 = pipeline2()
        sub_pipeline_4(input1=sub1.outputs.output1, input2=sub2.outputs.output2)

    pipeline8()

def test_pipeline_singularity_strong_type(self, mock_singularity_arm_id: str):
component_yaml = "./tests/test_configs/components/helloworld_component_singularity.yml"
component_func = load_component(component_yaml)
Expand Down
Loading

0 comments on commit c35ccfa

Please sign in to comment.