From c35ccfaf97888e5f3ad6affeb4c6f81c38a76f0f Mon Sep 17 00:00:00 2001 From: Han Wang Date: Mon, 17 Apr 2023 13:14:04 +0800 Subject: [PATCH] [ML][Pipelines] Provide API to get node predecessors for @pipeline (#29848) * support get data owner * add more tests * update implementation * add API and tests * add tests for yaml job * fix comment * remove get_predecessors --- .../ai/ml/dsl/_pipeline_component_builder.py | 2 + .../ai/ml/entities/_job/pipeline/_io/base.py | 43 +++ .../azure-ai-ml/azure/ai/ml/entities/_util.py | 3 + sdk/ml/azure-ai-ml/tests/dsl/_util.py | 54 ++++ .../tests/dsl/unittests/test_dsl_pipeline.py | 110 +++++++- .../tests/dsl/unittests/test_io_builder.py | 259 +++++++++++++++++- .../unittests/test_pipeline_job_entity.py | 10 + .../test_configs/components/2in2out.yaml | 20 ++ 8 files changed, 499 insertions(+), 2 deletions(-) create mode 100644 sdk/ml/azure-ai-ml/tests/test_configs/components/2in2out.yaml diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py index d1a09b3215f3..b951b719ffae 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py @@ -278,6 +278,8 @@ def _map_internal_output_type(_meta): meta=output_meta, owner="pipeline", description=self._args_description.get(key, None), + # store original node output to be able to trace back to inner node from a pipeline output builder. + binding_output=value, ) # copy node level output setting to pipeline output copy_output_setting(source=value._owner.outputs[value._port_name], target=pipeline_output) diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/_io/base.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/_io/base.py index da8558e53c17..1400b886c320 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/_io/base.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/_io/base.py @@ -337,6 +337,43 @@ def _deepcopy(self): meta=self._meta, ) + def _get_data_owner(self) -> Optional["BaseNode"]: + """Return the node if Input is from another node's output. Returns None if for literal value. + Note: This only works for @pipeline, not for YAML pipeline. + + Note: Inner step will be returned as the owner when node's input is from sub pipeline's output. + @pipeline + def sub_pipeline(): + inner_node = component_func() + return inner_node.outputs + + @pipeline + def root_pipeline(): + pipeline_node = sub_pipeline() + node = copy_files_component_func(input_dir=pipeline_node.outputs.output_dir) + owner = node.inputs.input_dir._get_data_owner() + assert owner == pipeline_node.nodes[0] + """ + from azure.ai.ml.entities import Pipeline + from azure.ai.ml.entities._builders import BaseNode + + def _resolve_data_owner(data) -> Optional["BaseNode"]: + if isinstance(data, BaseNode) and not isinstance(data, Pipeline): + return data + while isinstance(data, PipelineInput): + # for pipeline input, it's original value(can be literal value or another node's output) + # is stored in _original_data + return _resolve_data_owner(data._original_data) + if isinstance(data, NodeOutput): + if isinstance(data._owner, Pipeline): + # for input from subgraph's output, trace back to inner node + return _resolve_data_owner(data._binding_output) + # for input from another node's output, return the node + return _resolve_data_owner(data._owner) + return None + + return _resolve_data_owner(self._data) + class NodeOutput(InputOutputBase, PipelineExpressionMixin): """Define one output of a Component.""" @@ -348,6 +385,7 @@ def __init__( *, data: Optional[Union[Output, str]] = None, owner: Optional[Union["BaseComponent", "PipelineJob"]] = None, + binding_output: Optional["NodeOutput"] = None, **kwargs, ): """Initialize an Output of a component. @@ -365,6 +403,8 @@ def __init__( :type mode: str :param owner: The owner component of the output, used to calculate binding. :type owner: Union[azure.ai.ml.entities.BaseNode, azure.ai.ml.entities.PipelineJob] + :param binding_output: The node output bound to pipeline output, only available for pipeline. + :type binding_output: azure.ai.ml.entities.NodeOutput :param kwargs: A dictionary of additional configuration parameters. :type kwargs: dict :raises ~azure.ai.ml.exceptions.ValidationException: Raised if object cannot be successfully validated. @@ -387,6 +427,8 @@ def __init__( self._assert_name_and_version() self._is_control = meta.is_control if meta is not None else None + # store original node output to be able to trace back to inner node from a pipeline output builder. + self._binding_output = binding_output @property def is_control(self) -> str: @@ -536,6 +578,7 @@ def _deepcopy(self): data=copy.copy(self._data), owner=self._owner, meta=self._meta, + binding_output=self._binding_output, ) diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_util.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_util.py index ead43310317e..6fb5d6cd8245 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_util.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_util.py @@ -478,3 +478,6 @@ def copy_output_setting(source: Union["Output", "NodeOutput"], target: "NodeOutp return if source._data: target._data = copy.deepcopy(source._data) + # copy pipeline component output's node output to subgraph builder + if source._binding_output is not None: + target._binding_output = source._binding_output diff --git a/sdk/ml/azure-ai-ml/tests/dsl/_util.py b/sdk/ml/azure-ai-ml/tests/dsl/_util.py index 9e82d8b09551..824a74ca6ea7 100644 --- a/sdk/ml/azure-ai-ml/tests/dsl/_util.py +++ b/sdk/ml/azure-ai-ml/tests/dsl/_util.py @@ -1,7 +1,11 @@ import contextlib +from typing import Union, List from azure.ai.ml._schema.pipeline import PipelineJobSchema from azure.ai.ml._schema.pipeline.pipeline_component import PipelineJobsField +from azure.ai.ml.entities import PipelineJob, PipelineComponent, Pipeline +from azure.ai.ml.entities._builders import BaseNode +from azure.ai.ml.entities._job.pipeline._io import NodeInput _DSL_TIMEOUT_SECOND = 20 * 60 # timeout for dsl's tests, unit in second. @@ -15,3 +19,53 @@ def include_private_preview_nodes_in_pipeline(): yield finally: PipelineJobSchema._declared_fields["jobs"] = original_jobs + + +def expand_pipeline_nodes(pipeline: Union[PipelineJob, PipelineComponent]): + """Expand pipeline nodes to a list of nodes. All sub-graphs will be expanded.""" + nodes = [] + for node in pipeline.jobs.values(): + if isinstance(node, Pipeline): + pipeline_component = node.component + if not isinstance(pipeline_component, PipelineComponent): + raise RuntimeError( + "Pipeline component must be a PipelineComponent object, but got {}".format(type(pipeline_component)) + ) + nodes.extend(expand_pipeline_nodes(pipeline_component)) + else: + nodes.append(node) + return nodes + + +def get_predecessors(node) -> List[BaseNode]: + """Return list of predecessors for current node. + + Note: Only non-control flow nodes in @pipeline are supported. + Node: For sub-graph node, we will trace back to inner node and return. + Example: + @pipeline + def sub_pipeline(): + inner_node = component_func() + return inner_node.outputs + @pipeline + def root_pipeline(): + pipeline_node = sub_pipeline() + node1 = component_func(input1=pipeline_node.outputs.output1) + node2 = component_func( + input1=pipeline_node.outputs.output1 + input2=node1.outputs.output1 + ) + # pipeline_node.get_predecessors() will return [] + # node1.get_predecessors() will return [inner_node] + # node2.get_predecessors() will return [inner_node, node1] + """ + + # use {id: instance} dict to avoid nodes with component and parameters being duplicated + predecessors = {} + for _, input_value in node.inputs.items(): + if not isinstance(input_value, NodeInput): + continue + owner = input_value._get_data_owner() + if owner is not None: + predecessors[owner._instance_id] = owner + return list(predecessors.values()) diff --git a/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_dsl_pipeline.py b/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_dsl_pipeline.py index ef08e0f274ca..d391069d644e 100644 --- a/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_dsl_pipeline.py +++ b/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_dsl_pipeline.py @@ -53,7 +53,7 @@ from test_configs.dsl_pipeline import data_binding_expression from test_utilities.utils import assert_job_cancel, omit_with_wildcard, prepare_dsl_curated -from .._util import _DSL_TIMEOUT_SECOND +from .._util import _DSL_TIMEOUT_SECOND, get_predecessors tests_root_dir = Path(__file__).parent.parent.parent components_dir = tests_root_dir / "test_configs/components/" @@ -3226,6 +3226,114 @@ def pipeline_func(param: str = "2"): # check if all the fields are correctly serialized pipeline_job.component._get_anonymous_hash() + def test_get_predecessors(self): + component_yaml = components_dir / "2in2out.yaml" + component_func = load_component(source=component_yaml) + + # case1.1: predecessor from same node + @dsl.pipeline() + def pipeline1(): + node1 = component_func() + node1.name = "node1" + assert get_predecessors(node1) == [] + node2 = component_func(input1=node1.outputs.output1, input2=node1.outputs.output2) + assert ["node1"] == [n.name for n in get_predecessors(node2)] + return node1.outputs + + pipeline1() + + # case1.2: predecessor from different node + @dsl.pipeline() + def pipeline2(): + node1 = component_func() + node1.name = "node1" + assert get_predecessors(node1) == [] + + node2 = component_func() + node2.name = "node2" + assert get_predecessors(node2) == [] + + node2 = component_func(input1=node1.outputs.output1, input2=node2.outputs.output2) + assert ["node1", "node2"] == [n.name for n in get_predecessors(node2)] + return node2.outputs + + pipeline2() + + # case 2.1: predecessor from same sub pipeline + @dsl.pipeline() + def pipeline3(): + sub1 = pipeline1() + node3 = component_func(input1=sub1.outputs.output1, input2=sub1.outputs.output2) + assert ["node1"] == [n.name for n in get_predecessors(node3)] + + pipeline3() + + # case 2.2: predecessor from different sub pipeline + @dsl.pipeline() + def pipeline4(): + sub1 = pipeline1() + sub2 = pipeline2() + node3 = component_func(input1=sub1.outputs.output1, input2=sub2.outputs.output2) + assert ["node1", "node2"] == [n.name for n in get_predecessors(node3)] + + pipeline4() + + # case 3.1: predecessor from different outer node + @dsl.pipeline() + def sub_pipeline_1(input1: Input, input2: Input): + node1 = component_func(input1=input1, input2=input2) + assert ["outer1", "outer2"] == [n.name for n in get_predecessors(node1)] + + @dsl.pipeline() + def pipeline5(): + outer1 = component_func() + outer1.name = "outer1" + outer2 = component_func() + outer2.name = "outer2" + sub_pipeline_1(input1=outer1.outputs.output1, input2=outer2.outputs.output2) + + pipeline5() + + # case 3.2: predecessor from same outer node + @dsl.pipeline() + def sub_pipeline_2(input1: Input, input2: Input): + node1 = component_func(input1=input1, input2=input2) + assert ["outer1"] == [n.name for n in get_predecessors(node1)] + + @dsl.pipeline() + def pipeline6(): + outer1 = component_func() + outer1.name = "outer1" + sub_pipeline_2(input1=outer1.outputs.output1, input2=outer1.outputs.output2) + + pipeline6() + + # case 3.3: predecessor from outer literal value + @dsl.pipeline() + def sub_pipeline_3(input1: Input, input2: Input): + node1 = component_func(input1=input1, input2=input2) + assert [] == [n.name for n in get_predecessors(node1)] + + @dsl.pipeline() + def pipeline7(): + sub_pipeline_3(input1=Input(), input2=Input()) + + pipeline7() + + # case 3.4: predecessor from outer subgraph + @dsl.pipeline() + def sub_pipeline_4(input1: Input, input2: Input): + node1 = component_func(input1=input1, input2=input2) + assert ["node1", "node2"] == [n.name for n in get_predecessors(node1)] + + @dsl.pipeline() + def pipeline8(): + sub1 = pipeline1() + sub2 = pipeline2() + sub_pipeline_4(input1=sub1.outputs.output1, input2=sub2.outputs.output2) + + pipeline8() + def test_pipeline_singularity_strong_type(self, mock_singularity_arm_id: str): component_yaml = "./tests/test_configs/components/helloworld_component_singularity.yml" component_func = load_component(component_yaml) diff --git a/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_io_builder.py b/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_io_builder.py index a5215bd5bd75..442cb9c78f97 100644 --- a/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_io_builder.py +++ b/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_io_builder.py @@ -9,7 +9,7 @@ from azure.ai.ml.entities._job.pipeline._io.base import _resolve_builders_2_data_bindings from azure.ai.ml.exceptions import UserErrorException -from .._util import _DSL_TIMEOUT_SECOND +from .._util import _DSL_TIMEOUT_SECOND, expand_pipeline_nodes tests_root_dir = Path(__file__).parent.parent.parent components_dir = tests_root_dir / "test_configs/components/" @@ -20,6 +20,18 @@ ] +def assert_node_owners_expected(pipeline_job, expected_owners: dict, input_name: str): + nodes = expand_pipeline_nodes(pipeline_job) + + actual_owners = {} + for node in nodes: + owner = node.inputs[input_name]._get_data_owner() + if owner: + owner = owner.name + actual_owners[node.name] = owner + assert actual_owners == expected_owners + + @pytest.mark.usefixtures("enable_pipeline_private_preview_features") @pytest.mark.timeout(_DSL_TIMEOUT_SECOND) @pytest.mark.unittest @@ -197,3 +209,248 @@ def pipeline_func(int_param: int): "Type " "is not supported for operation bool()." ) + + def test_input_get_data_owner(self): + component_yaml = components_dir / "helloworld_component.yml" + component_func1 = load_component(source=component_yaml) + + # case1: node input from another node's output + @pipeline + def another_nodes_output(): + node1 = component_func1(component_in_number=1, component_in_path=Input(path="test_path")) + node1.name = "node1" + node2 = component_func1(component_in_number=2, component_in_path=node1.outputs.component_out_path) + node2.name = "node2" + assert node2.inputs.component_in_path._get_data_owner().name == "node1" + + assert_node_owners_expected( + pipeline_job=another_nodes_output(), + expected_owners={"node1": None, "node2": "node1"}, + input_name="component_in_path", + ) + + # case2.1: node input from pipeline input, which has literal value + @pipeline + def literal_pipeline_val(component_in_path: Input): + node2 = component_func1(component_in_number=2, component_in_path=component_in_path) + node2.name = "node2" + assert node2.inputs.component_in_path._get_data_owner() == None + + assert_node_owners_expected( + pipeline_job=literal_pipeline_val(component_in_path=Input(path="test_path")), + expected_owners={"node2": None}, + input_name="component_in_path", + ) + + # case2.2: node input from pipeline input, which is from another node's output + @pipeline + def sub_pipeline(component_in_path: Input): + node2 = component_func1(component_in_number=2, component_in_path=component_in_path) + node2.name = "node2" + assert node2.inputs.component_in_path._get_data_owner().name == "node1" + + @pipeline + def parent_pipeline(): + node1 = component_func1(component_in_number=1, component_in_path=Input(path="test_path")) + node1.name = "node1" + sub_pipeline(component_in_path=node1.outputs.component_out_path) + + assert_node_owners_expected( + pipeline_job=parent_pipeline(), + expected_owners={"node1": None, "node2": "node1"}, + input_name="component_in_path", + ) + + # case2.3: node input from pipeline input, which is from subgraph's output + @pipeline + def sub_pipeline1(component_in_path: Input): + node1 = component_func1(component_in_number=2, component_in_path=component_in_path) + return node1.outputs + + @pipeline + def sub_pipeline2(component_in_path: Input): + node2 = component_func1(component_in_number=2, component_in_path=component_in_path) + assert node2.inputs.component_in_path._get_data_owner().name == "node1" + + @pipeline + def parent_pipeline(): + src = component_func1(component_in_number=1, component_in_path=Input(path="test_path")) + sub1 = sub_pipeline1(component_in_path=src) + sub_pipeline2(component_in_path=sub1.outputs.component_out_path) + + assert_node_owners_expected( + pipeline_job=parent_pipeline(), + expected_owners={"src": None, "node1": "src", "node2": "node1"}, + input_name="component_in_path", + ) + + # case3.1: node input from subgraph's output, which is from a normal node + @pipeline + def sub_pipeline(component_in_path: Input): + sub_node = component_func1(component_in_number=2, component_in_path=component_in_path) + return sub_node.outputs + + @pipeline + def parent_pipeline(): + node1 = sub_pipeline(component_in_path=Input(path="test_path")) + node3 = component_func1(component_in_number=3, component_in_path=node1.outputs.component_out_path) + assert node3.inputs.component_in_path._get_data_owner().name == "sub_node" + return node3 + + assert_node_owners_expected( + pipeline_job=parent_pipeline(), + expected_owners={"sub_node": None, "node3": "sub_node"}, + input_name="component_in_path", + ) + + # case3.2: node input from subgraph's output, which is from another subgraph + @pipeline + def sub_pipeline_1(component_in_path: Input): + sub_node_1 = component_func1(component_in_number=2, component_in_path=component_in_path) + return sub_node_1.outputs + + @pipeline + def sub_pipeline_2(component_in_path: Input): + sub_node_2 = sub_pipeline_1(component_in_path=component_in_path) + return sub_node_2.outputs + + @pipeline + def parent_pipeline(): + node1 = sub_pipeline_2(component_in_path=Input(path="test_path")) + node3 = component_func1(component_in_number=3, component_in_path=node1.outputs.component_out_path) + assert node3.inputs.component_in_path._get_data_owner().name == "sub_node_1" + return node3 + + assert_node_owners_expected( + pipeline_job=parent_pipeline(), + expected_owners={"sub_node_1": None, "node3": "sub_node_1"}, + input_name="component_in_path", + ) + + def test_input_get_data_owner_multiple_subgraph(self): + component_yaml = components_dir / "helloworld_component.yml" + component_func1 = load_component(source=component_yaml) + + @pipeline + def sub_pipeline(component_in_path: Input): + inner_node = component_func1(component_in_number=2, component_in_path=component_in_path) + return inner_node.outputs + + @pipeline + def parent_pipeline(): + node1 = component_func1(component_in_number=1, component_in_path=Input(path="test_path1")) + node1.name = "node1" + sub1 = sub_pipeline(component_in_path=node1.outputs.component_out_path) + after1 = component_func1(component_in_path=sub1.outputs.component_out_path) + source_of_branch_1 = after1.inputs.component_in_path._get_data_owner() + assert source_of_branch_1.name == "inner_node" + + node2 = component_func1(component_in_number=3, component_in_path=Input(path="test_path2")) + node2.name = "node2" + sub2 = sub_pipeline(component_in_path=node2.outputs.component_out_path) + after2 = component_func1(component_in_path=sub2.outputs.component_out_path) + source_of_branch_2 = after2.inputs.component_in_path._get_data_owner() + assert source_of_branch_2.name == "inner_node" + + # subgraph called twice, source for each branch should not be the same + assert source_of_branch_1._instance_id != source_of_branch_2._instance_id + # one is from node1, the other is from node2 + assert source_of_branch_1.inputs.component_in_path._get_data_owner().name == "node1" + assert source_of_branch_2.inputs.component_in_path._get_data_owner().name == "node2" + + parent_pipeline() + + def test_input_get_data_owner_multiple_level_pipeline(self): + component_yaml = components_dir / "helloworld_component.yml" + component_func1 = load_component(source=component_yaml) + + # case1: multi-level pipeline input + @pipeline + def pipeline_level1(component_in_path: Input): + node1 = component_func1(component_in_path=component_in_path) + assert node1.inputs.component_in_path._get_data_owner().name == "src" + return node1.outputs + + @pipeline + def pipeline_level2(component_in_path: Input): + node2 = pipeline_level1(component_in_path=component_in_path) + return node2.outputs + + @pipeline + def pipeline_level3(): + src = component_func1(component_in_path=Input(path="test_path")) + src.name = "src" + node3 = pipeline_level2(component_in_path=src) + return node3.outputs + + assert_node_owners_expected( + pipeline_job=pipeline_level3(), + expected_owners={"src": None, "node1": "src"}, + input_name="component_in_path", + ) + + # case2: multi-level pipeline output + @pipeline + def pipeline_level1(): + node1 = component_func1(component_in_path=Input(path="test_path")) + node1.name = "node1" + return node1.outputs + + @pipeline + def pipeline_level2(): + node2 = pipeline_level1() + return node2.outputs + + @pipeline + def pipeline_level3(): + node3 = pipeline_level2() + node3.name = "node3" + dst = component_func1(component_in_path=node3.outputs.component_out_path) + assert dst.inputs.component_in_path._get_data_owner().name == "node1" + return dst.outputs + + assert_node_owners_expected( + pipeline_job=pipeline_level3(), + expected_owners={"node1": None, "dst": "node1"}, + input_name="component_in_path", + ) + + def test_input_get_data_owner_complex_case(self): + component_yaml = components_dir / "helloworld_component.yml" + component_func1 = load_component(source=component_yaml) + + @pipeline + def sub_pipeline(component_in_path: Input): + inner_node = component_func1(component_in_path=component_in_path) + # node input from pipeline input and it's actual value is from another node's output + assert inner_node.inputs.component_in_path._get_data_owner().name == "node1" + # node input left empty, owner should be None + assert inner_node.inputs.component_in_number._get_data_owner() is None + return inner_node.outputs + + @pipeline + def my_pipeline(): + node1 = component_func1(component_in_number=1, component_in_path=Input(path="test_path")) + node1.name = "node1" + # node input literal value, don't have owner + assert node1.inputs.component_in_number._get_data_owner() is None + assert node1.inputs.component_in_path._get_data_owner() is None + + node2 = sub_pipeline(component_in_path=node1.outputs.component_out_path) + node2.name = "node2" + # node input from another node's output + assert node2.inputs.component_in_path._get_data_owner().name == "node1" + + node3 = component_func1(component_in_path=node2.outputs.component_out_path) + node3.name = "node3" + # node input from another (pipeline) node's output + assert node3.inputs.component_in_path._get_data_owner().name == "inner_node" + return node3.outputs + + my_pipeline = my_pipeline() + + assert_node_owners_expected( + pipeline_job=my_pipeline, + expected_owners={"node1": None, "inner_node": "node1", "node3": "inner_node"}, + input_name="component_in_path", + ) diff --git a/sdk/ml/azure-ai-ml/tests/pipeline_job/unittests/test_pipeline_job_entity.py b/sdk/ml/azure-ai-ml/tests/pipeline_job/unittests/test_pipeline_job_entity.py index c03b03e308ed..1e0439287bd8 100644 --- a/sdk/ml/azure-ai-ml/tests/pipeline_job/unittests/test_pipeline_job_entity.py +++ b/sdk/ml/azure-ai-ml/tests/pipeline_job/unittests/test_pipeline_job_entity.py @@ -6,6 +6,8 @@ import yaml from marshmallow import ValidationError from pytest_mock import MockFixture + +from dsl._util import get_predecessors from test_utilities.utils import omit_with_wildcard, verify_entity_load_and_dump from azure.ai.ml import MLClient, dsl, load_component, load_job @@ -2092,3 +2094,11 @@ def test_pipeline_job_automl_with_job_tier_in_pipeline(self) -> None: # similar to sweep job, automl job job_tier value is also lowercase. rest_obj = pipeline_job._to_rest_object() assert rest_obj.properties.jobs["text_ner_node"]["queue_settings"] == {"job_tier": "spot"} + + def test_get_predecessors_for_pipeline_job(self) -> None: + test_path = "./tests/test_configs/pipeline_jobs/helloworld_pipeline_job_with_component_output.yml" + pipeline: PipelineJob = load_job(source=test_path) + # get_predecessors is not supported for YAML job + assert get_predecessors(pipeline.jobs["hello_world_component_1"]) == [] + assert get_predecessors(pipeline.jobs["hello_world_component_2"]) == [] + assert get_predecessors(pipeline.jobs["merge_component_outputs"]) == [] diff --git a/sdk/ml/azure-ai-ml/tests/test_configs/components/2in2out.yaml b/sdk/ml/azure-ai-ml/tests/test_configs/components/2in2out.yaml new file mode 100644 index 000000000000..fd22bf0e4394 --- /dev/null +++ b/sdk/ml/azure-ai-ml/tests/test_configs/components/2in2out.yaml @@ -0,0 +1,20 @@ +$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json +name: one_in_two_out +display_name: One In Two Out (Split) +version: 0.0.1 +type: command +inputs: + input1: + type: uri_folder + input2: + type: uri_folder +outputs: + output1: + type: uri_folder + output2: + type: uri_folder +environment: azureml:AzureML-sklearn-1.0-ubuntu20.04-py38-cpu:33 +command: >- + echo ${{inputs.input1}} && + echo ${{outputs.output1}} && + echo ${{outputs.output2}}