Optimise pipeline addition and creation (#3730)
* Create toposort groups only when needed
* Ensure that the suggest resume test has no node ordering requirement
* Ensure stable toposorting by grouping and ungrouping the result
* Delay toposorting until pipeline.nodes is used
* Avoid using .nodes when topological order or new copy is unneeded
* Copy the nodes only if tags are provided
* Remove unnecessary condition in self.nodes

Signed-off-by: Ivan Danov <[email protected]>
idanov authored Mar 26, 2024
1 parent 2095e24 commit 177b93a
Showing 2 changed files with 37 additions and 55 deletions.
1 change: 1 addition & 0 deletions RELEASE.md
@@ -6,6 +6,7 @@
* Kedro commands now work from any subdirectory within a Kedro project.
* Kedro CLI now provides a better error message when project commands are run outside of a project i.e. `kedro run`.
* Dropped the dependency on `toposort` in favour of the built-in `graphlib` module.
+* Improve the performance of `Pipeline` object creation and summing.

## Bug fixes and other changes
* Updated `kedro pipeline create` and `kedro pipeline delete` to read the base environment from the project settings.
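The changelog context above notes that the third-party `toposort` dependency was replaced by Python's built-in `graphlib` module, which the pipeline changes below rely on. As a hedged, standalone illustration of that standard-library API (the example graph and task names are invented, not taken from this commit):

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on (its predecessors).
dependencies = {
    "preprocess": set(),
    "split": {"preprocess"},
    "train": {"split"},
    "evaluate": {"train"},
}

# static_order() yields every task after all of its dependencies and
# raises graphlib.CycleError if the graph contains a cycle.
print(list(TopologicalSorter(dependencies).static_order()))
# e.g. ['preprocess', 'split', 'train', 'evaluate']
```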
91 changes: 36 additions & 55 deletions kedro/pipeline/pipeline.py
@@ -143,7 +143,10 @@ def __init__(
        _validate_transcoded_inputs_outputs(nodes_chain)
        _tags = set(_to_list(tags))

-        tagged_nodes = [n.tag(_tags) for n in nodes_chain]
+        if _tags:
+            tagged_nodes = [n.tag(_tags) for n in nodes_chain]
+        else:
+            tagged_nodes = nodes_chain

        self._nodes_by_name = {node.name: node for node in tagged_nodes}
        _validate_unique_outputs(tagged_nodes)
@@ -162,7 +165,17 @@ def __init__(
                self._nodes_by_output[_strip_transcoding(output)] = node

        self._nodes = tagged_nodes
-        self._toposorted_nodes = _toposort(self.node_dependencies)
+        self._toposorter = TopologicalSorter(self.node_dependencies)
+
+        # test for circular dependencies without executing the toposort for efficiency
+        try:
+            self._toposorter.prepare()
+        except CycleError as exc:
+            message = f"Circular dependencies exist among these items: {exc.args[1]}"
+            raise CircularDependencyError(message) from exc
+
+        self._toposorted_nodes: list[Node] = []
+        self._toposorted_groups: list[list[Node]] = []

    def __repr__(self) -> str:  # pragma: no cover
        """Pipeline ([node1, ..., node10 ...], name='pipeline_name')"""
@@ -179,7 +192,7 @@ def __repr__(self) -> str:  # pragma: no cover
    def __add__(self, other: Any) -> Pipeline:
        if not isinstance(other, Pipeline):
            return NotImplemented
-        return Pipeline(set(self.nodes + other.nodes))
+        return Pipeline(set(self._nodes + other._nodes))

    def __radd__(self, other: Any) -> Pipeline:
        if isinstance(other, int) and other == 0:
@@ -189,17 +202,17 @@ def __radd__(self, other: Any) -> Pipeline:
    def __sub__(self, other: Any) -> Pipeline:
        if not isinstance(other, Pipeline):
            return NotImplemented
-        return Pipeline(set(self.nodes) - set(other.nodes))
+        return Pipeline(set(self._nodes) - set(other._nodes))

    def __and__(self, other: Any) -> Pipeline:
        if not isinstance(other, Pipeline):
            return NotImplemented
-        return Pipeline(set(self.nodes) & set(other.nodes))
+        return Pipeline(set(self._nodes) & set(other._nodes))

    def __or__(self, other: Any) -> Pipeline:
        if not isinstance(other, Pipeline):
            return NotImplemented
-        return Pipeline(set(self.nodes + other.nodes))
+        return Pipeline(set(self._nodes + other._nodes))

    def all_inputs(self) -> set[str]:
        """All inputs for all nodes in the pipeline.
@@ -208,7 +221,7 @@ def all_inputs(self) -> set[str]:
            All node input names as a Set.
        """
-        return set.union(set(), *(node.inputs for node in self.nodes))
+        return set.union(set(), *(node.inputs for node in self._nodes))

    def all_outputs(self) -> set[str]:
        """All outputs of all nodes in the pipeline.
@@ -217,7 +230,7 @@ def all_outputs(self) -> set[str]:
            All node outputs.
        """
-        return set.union(set(), *(node.outputs for node in self.nodes))
+        return set.union(set(), *(node.outputs for node in self._nodes))

    def _remove_intermediates(self, datasets: set[str]) -> set[str]:
        intermediate = {_strip_transcoding(i) for i in self.all_inputs()} & {
@@ -347,6 +360,9 @@ def nodes(self) -> list[Node]:
            The list of all pipeline nodes in topological order.
        """
+        if not self._toposorted_nodes:
+            self._toposorted_nodes = [n for group in self.grouped_nodes for n in group]
+
        return list(self._toposorted_nodes)

    @property
@@ -360,7 +376,13 @@ def grouped_nodes(self) -> list[list[Node]]:
        """

-        return _group_toposorted(self._toposorted_nodes, self.node_dependencies)
+        if not self._toposorted_groups:
+            while self._toposorter:
+                group = sorted(self._toposorter.get_ready())
+                self._toposorted_groups.append(group)
+                self._toposorter.done(*group)
+
+        return [list(group) for group in self._toposorted_groups]

    def only_nodes(self, *node_names: str) -> Pipeline:
        """Create a new ``Pipeline`` which will contain only the specified
@@ -416,7 +438,7 @@ def only_nodes_with_namespace(self, node_namespace: str) -> Pipeline:
        """
        nodes = [
            n
-            for n in self.nodes
+            for n in self._nodes
            if n.namespace and n.namespace.startswith(node_namespace)
        ]
        if not nodes:
@@ -675,7 +697,7 @@ def only_nodes_with_tags(self, *tags: str) -> Pipeline:
        of the tags provided are being copied.
        """
        unique_tags = set(tags)
-        nodes = [node for node in self.nodes if unique_tags & node.tags]
+        nodes = [node for node in self._nodes if unique_tags & node.tags]
        return Pipeline(nodes)

    def filter(  # noqa: PLR0913
@@ -759,7 +781,7 @@ def filter(  # noqa: PLR0913
        # would give different outcomes depending on the order of filter methods:
        # only_nodes and then from_inputs would give node1, while only_nodes and then
        # from_inputs would give node1 and node3.
-        filtered_pipeline = Pipeline(self.nodes)
+        filtered_pipeline = Pipeline(self._nodes)
        for subset_pipeline in subset_pipelines:
            filtered_pipeline &= subset_pipeline

@@ -778,7 +800,7 @@ def tag(self, tags: str | Iterable[str]) -> Pipeline:
        Returns:
            New ``Pipeline`` object with nodes tagged.
        """
-        nodes = [n.tag(tags) for n in self.nodes]
+        nodes = [n.tag(tags) for n in self._nodes]
        return Pipeline(nodes)

    def to_json(self) -> str:
@@ -790,7 +812,7 @@ def to_json(self) -> str:
                "outputs": list(n.outputs),
                "tags": list(n.tags),
            }
-            for n in self.nodes
+            for n in self._nodes
        ]
        pipeline_versioned = {
            "kedro_version": kedro.__version__,
@@ -883,47 +905,6 @@ def _validate_transcoded_inputs_outputs(nodes: list[Node]) -> None:
        )


-def _group_toposorted(
-    toposorted: Iterable[Node], deps: dict[Node, set[Node]]
-) -> list[list[Node]]:
-    """Group already toposorted nodes into independent toposorted groups"""
-    processed: set[Node] = set()
-    groups = []
-    group = []
-    for x in toposorted:
-        if set(deps.get(x, set())) <= processed:
-            group.append(x)
-        elif group:
-            processed |= set(group)
-            groups.append(sorted(group))
-            group = [x]
-
-    if group:
-        groups.append(sorted(group))
-    return groups
-
-
-def _toposort(node_dependencies: dict[Node, set[Node]]) -> list[Node]:
-    """Topologically sort (order) nodes such that no node depends on
-    a node that appears earlier in the list.
-    Raises:
-        CircularDependencyError: When it is not possible to topologically order
-            provided nodes.
-    Returns:
-        The list of nodes in order of execution.
-    """
-    try:
-        sorter = TopologicalSorter(node_dependencies)
-        # Ensure stable toposort by sorting the nodes in a group
-        groups = _group_toposorted(sorter.static_order(), node_dependencies)
-        return [n for group in groups for n in group]
-    except CycleError as exc:
-        message = f"Circular dependencies exist among these items: {exc.args[1]}"
-        raise CircularDependencyError(message) from exc
-
-
class CircularDependencyError(Exception):
    """Raised when it is not possible to provide a topological execution
    order for nodes, due to a circular dependency existing in the node
