
Commit

[KED-2436] Create modular pipeline visualisation MVP using the tags concept (#421)

* [KED-1951] Backend to send modular pipelines to kedro viz (#394)

* WIP add modular pipelines

* Expose modular pipelines and add testing data

* Lint

* undo push of package-lock

* Revert package lock

* Fix lint

* Return modular_pipelines in pipeline data endpoint for selected pipeline + update test data

* Address comments on PR

* Cleanup and lint

* Add modular pipelines to datasets and parameter nodes. Some refactoring for clarity

* Temporarily skip js tests to make sure all python stuff works

* Put back JS tests for CI

* First iteration of addressing comments on PR

* Correctly deduce modular pipeline from dataset

* Add all modular pipelines for all nodes

* Check that dataset namespace is actually a modular pipeline

* Undo check if namespace is modular pipeline

* updated FE tests to match new animals dataset (#401)

* Add modular pipelines to parameter nodes (#402)

* WIP add modular pipelines

* Expose modular pipelines and add testing data

* Lint

* undo push of package-lock

* Revert package lock

* Revert package lock

* Fix lint

* Return modular_pipelines in pipeline data endpoint for selected pipeline + update test data

* Address comments on PR

* Cleanup and lint

* Add modular pipelines to datasets and parameter nodes. Some refactoring for clarity

* Temporarily skip js tests to make sure all python stuff works

* Put back JS tests for CI

* First iteration of addressing comments on PR

* Correctly deduce modular pipeline from dataset

* Add all modular pipelines for all nodes

* Check that dataset namespace is actually a modular pipeline

* Undo check if namespace is modular pipeline

* Add modular pipelines for parameter nodes

* Verify that modular pipelines listed are actual modular pipelines

* Temporarily disable JS tests to make sure other steps pass

* Put back JS tests, all other checks pass ✅

* Update package/kedro_viz/server.py

Co-authored-by: Ivan Danov <[email protected]>

* Address review comments

* Treat dataset node modular pipelines the same as task node modular pipelines.

Co-authored-by: Ivan Danov <[email protected]>

* Send correct format for modular pipelines from /pipelines endpoint (#408)

* Modular pipeline tags concept (#410)

* set up store functions for incoming modular pipeline data

* added additional test for modular pipeline

* set up flag for modular pipeline

* set up selector to get list of modular pipelines and nodes

* add check for node modular pipeline data in selector

* set up modular pipeline on sidebar

* refactor node-list to enable change of both modular pipeline and tags

* further setup reducer and node selection

* added item label check

* hide modular pipeline features behind a flag

* fix failing tests and set up new data set

* added tests for modular pipeline actions

* further revisions

* enable indented styling for lazy list

* update readme to explain modular pipeline and tag filtering behaviour

* Fix pylint

* updates as per PR comments

* further adjustments per PR comments

* update tests to reflect latest PR changes

* refactor getItemLabel in node-list-row-list

* fix spelling in random-data

* further refactoring of getItemLabel

Co-authored-by: Merel Theisen <[email protected]>

* quick fix to ensure selector works for pipelines with no defined modular pipelines

* delete unneeded selector

* delete unneeded selector

* Bugfix: Ensure JSON backwards-compatibility

The application should still work without throwing an error, even when "modular_pipelines" is not present in the JSON dataset

Co-authored-by: Merel Theisen <[email protected]>
Co-authored-by: Ivan Danov <[email protected]>
Co-authored-by: Merel Theisen <[email protected]>
Co-authored-by: Richard Westenra <[email protected]>
5 people authored Apr 16, 2021
1 parent a72a3dd commit d35b1d8
Showing 28 changed files with 1,050 additions and 296 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -107,6 +107,7 @@ The following flags are available to toggle experimental features:

- `oldgraph` - From release v3.8.0. Display old version of graph (dagre algorithm) without improved graphing algorithm. (default `false`)
- `sizewarning` - From release v3.9.1. Show a warning before rendering very large graphs. (default `true`)
- `modularpipeline` - From release v3.11.0. Enables filtering of nodes by modular pipelines. Note that selecting both modular pipeline and tag filters will only return nodes that belong to both categories. (default `false`)

Note that the newgraph flag has been removed from v3.8.0 onwards and its behaviour is now the default. Should there be issues with your project, see the oldgraph flag above.

2 changes: 1 addition & 1 deletion package.json
@@ -122,4 +122,4 @@
"not op_mini all"
],
"snyk": true
}
}
6 changes: 2 additions & 4 deletions package/features/steps/cli_steps.py
@@ -70,8 +70,7 @@ def create_config_file_with_example(context):

@given("I have run a non-interactive kedro new")
def create_project_from_config_file(context):
"""Behave step to run kedro new given the config I previously created.
"""
"""Behave step to run kedro new given the config I previously created."""
res = run(
[context.kedro, "new", "-c", str(context.config_file)],
env=context.env,
@@ -82,8 +81,7 @@ def create_project_from_config_file(context):

@given("I have run a non-interactive kedro new with {starter} starter")
def create_project_with_starter(context, starter):
"""Behave step to run kedro new given the config I previously created.
"""
"""Behave step to run kedro new given the config I previously created."""
res = run(
[
context.kedro,
187 changes: 147 additions & 40 deletions package/kedro_viz/server.py
@@ -296,7 +296,15 @@ def _pretty_name(name: str) -> str:
return " ".join(parts)


def format_pipelines_data(pipelines: Dict[str, "Pipeline"]) -> Dict[str, list]:
def _pretty_modular_pipeline_name(modular_pipeline: str) -> str:
"""Takes the namespace of a modular pipeline and prettifies the
last part to show as the modular pipeline name."""
chunks = modular_pipeline.split(".")
last_chunk = chunks[-1]
return _pretty_name(last_chunk)
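
# Illustrative doctest-style sketch (not in the committed file); output assumes _pretty_name
# capitalises underscore-separated words as defined earlier in this module:
# >>> _pretty_modular_pipeline_name("pipeline1.data_science")
# 'Data Science'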


def format_pipelines_data(pipelines: Dict[str, "Pipeline"]) -> Dict[str, Any]:
"""
Format pipelines and catalog data from Kedro for kedro-viz.
@@ -317,6 +325,8 @@ def format_pipelines_data(pipelines: Dict[str, "Pipeline"]) -> Dict[str, list]:
# keep track of node_id -> set(child_node_ids) for layers sorting
node_dependencies = defaultdict(set)
tags = set()
# keep track of modular pipelines
modular_pipelines = set()

for pipeline_key, pipeline in pipelines.items():
pipelines_list.append({"id": pipeline_key, "name": _pretty_name(pipeline_key)})
@@ -328,6 +338,7 @@ def format_pipelines_data(pipelines: Dict[str, "Pipeline"]) -> Dict[str, list]:
tags,
edges_list,
nodes_list,
modular_pipelines,
)

# sort tags
@@ -342,18 +353,39 @@ def format_pipelines_data(pipelines: Dict[str, "Pipeline"]) -> Dict[str, list]:
else pipelines_list[0]["id"]
)

sorted_modular_pipelines = _sort_and_format_modular_pipelines(modular_pipelines)
_remove_non_modular_pipelines(nodes_list, modular_pipelines)

return {
"nodes": nodes_list,
"edges": edges_list,
"tags": sorted_tags,
"layers": sorted_layers,
"pipelines": pipelines_list,
"selected_pipeline": selected_pipeline,
"modular_pipelines": sorted_modular_pipelines,
}


def _is_namespace_param(namespace: str) -> bool:
"""Returns whether a dataset namespace is a parameter"""
def _remove_non_modular_pipelines(nodes_list, modular_pipelines):
"""Check parameter nodes only contain existing modular pipelines from the task nodes
and remove those listed that aren't modular pipelines.
Args:
nodes_list: List of all nodes.
modular_pipelines: Set of modular pipelines for all nodes.
"""
for node in nodes_list:
if node["type"] == "parameters" and node["modular_pipelines"]:
pipes = [
pipe for pipe in node["modular_pipelines"] if pipe in modular_pipelines
]
node["modular_pipelines"] = sorted(pipes)
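
# Illustrative sketch (not in the committed file): given a hypothetical parameters node
# {"type": "parameters", "modular_pipelines": ["pipeline1", "not_a_pipeline"]} and
# modular_pipelines == {"pipeline1"}, the helper above prunes the node's list to ["pipeline1"].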


def _is_dataset_param(namespace: str) -> bool:
"""Returns whether a dataset is a parameter"""
return namespace.lower().startswith("param")
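
# Doctest-style sketch of the predicate above (not in the committed file):
# >>> _is_dataset_param("params:train.learning_rate")
# True
# >>> _is_dataset_param("parameters")
# True
# >>> _is_dataset_param("model_input_table")
# False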


@@ -366,6 +398,7 @@ def format_pipeline_data(
tags: Set[str],
edges_list: List[dict],
nodes_list: List[dict],
modular_pipelines: Set[str],
) -> None:
"""Format pipeline and catalog data from Kedro for kedro-viz.
@@ -376,12 +409,13 @@ def format_pipeline_data(
node_dependencies: Dictionary of id and node dependencies.
edges_list: List of all edges.
nodes_list: List of all nodes.
modular_pipelines: Set of modular pipelines for all nodes.
"""
# keep_track of {data_set_namespace -> set(tags)}
namespace_tags = defaultdict(set)
# keep track of {data_set_namespace -> layer it belongs to}
namespace_to_layer = {}
# keep_track of {dataset_full_name -> set(tags)}
dataset_name_tags = defaultdict(set)
# keep track of {dataset_full_name -> layer it belongs to}
dataset_name_to_layer = {}

dataset_to_layer = _construct_layer_mapping()

@@ -390,6 +424,11 @@ def format_pipeline_data(
task_id = _hash(str(node))
tags.update(node.tags)
_JSON_NODES[task_id] = {"type": "task", "obj": node}

# Modular pipelines the current node is part of.
node_modular_pipelines = _expand_namespaces(node.namespace)
modular_pipelines.update(node_modular_pipelines)

if task_id not in nodes:
nodes[task_id] = {
"type": "task",
@@ -398,72 +437,124 @@ def format_pipeline_data(
"full_name": getattr(node, "_func_name", str(node)),
"tags": sorted(node.tags),
"pipelines": [pipeline_key],
"modular_pipelines": sorted(node_modular_pipelines),
}
nodes_list.append(nodes[task_id])
else:
nodes[task_id]["pipelines"].append(pipeline_key)

for data_set in node.inputs:
namespace = data_set.split("@")[0]
namespace_to_layer[namespace] = dataset_to_layer.get(data_set)
namespace_id = _hash(namespace)
edge = {"source": namespace_id, "target": task_id}
dataset_full_name = data_set.split("@")[0]
dataset_name_to_layer[dataset_full_name] = dataset_to_layer.get(data_set)
dataset_id = _hash(dataset_full_name)
edge = {"source": dataset_id, "target": task_id}
if edge not in edges_list:
edges_list.append(edge)
namespace_tags[namespace].update(node.tags)
node_dependencies[namespace_id].add(task_id)
dataset_name_tags[dataset_full_name].update(node.tags)
node_dependencies[dataset_id].add(task_id)

# if it is a parameter, add it to the node's data
if _is_namespace_param(namespace):
if "parameters" not in _JSON_NODES[task_id]:
_JSON_NODES[task_id]["parameters"] = {}

if namespace == "parameters":
_JSON_NODES[task_id]["parameters"] = _get_dataset_data_params(
namespace
).load()
else:
parameter_name = namespace.replace("params:", "")
parameter_value = _get_dataset_data_params(namespace).load()
_JSON_NODES[task_id]["parameters"][parameter_name] = parameter_value
if _is_dataset_param(dataset_full_name):
_add_parameter_data_to_node(dataset_full_name, task_id)

for data_set in node.outputs:
namespace = data_set.split("@")[0]
namespace_to_layer[namespace] = dataset_to_layer.get(data_set)
namespace_id = _hash(namespace)
edge = {"source": task_id, "target": namespace_id}
dataset_full_name = data_set.split("@")[0]
dataset_name_to_layer[dataset_full_name] = dataset_to_layer.get(data_set)
dataset_id = _hash(dataset_full_name)
edge = {"source": task_id, "target": dataset_id}
if edge not in edges_list:
edges_list.append(edge)
namespace_tags[namespace].update(node.tags)
node_dependencies[task_id].add(namespace_id)
dataset_name_tags[dataset_full_name].update(node.tags)
node_dependencies[task_id].add(dataset_id)

# Parameters and data
for namespace, tag_names in sorted(namespace_tags.items()):
is_param = _is_namespace_param(namespace)
node_id = _hash(namespace)
for dataset_full_name, tag_names in sorted(dataset_name_tags.items()):
is_param = _is_dataset_param(dataset_full_name)
node_id = _hash(dataset_full_name)

_JSON_NODES[node_id] = {
"type": "parameters" if is_param else "data",
"obj": _get_dataset_data_params(namespace),
"obj": _get_dataset_data_params(dataset_full_name),
}
if is_param and namespace != "parameters":

parameter_name = ""
if is_param and dataset_full_name != "parameters":
parameter_name = dataset_full_name.replace("params:", "")
# Add "parameter_name" key only for "params:" prefix.
_JSON_NODES[node_id]["parameter_name"] = namespace.replace("params:", "")
_JSON_NODES[node_id]["parameter_name"] = parameter_name

if is_param:
dataset_modular_pipelines = _expand_namespaces(
_get_namespace(parameter_name)
)
else:
dataset_modular_pipelines = _expand_namespaces(
_get_namespace(dataset_full_name)
)
modular_pipelines.update(dataset_modular_pipelines)

if node_id not in nodes:
nodes[node_id] = {
"type": "parameters" if is_param else "data",
"id": node_id,
"name": _pretty_name(namespace),
"full_name": namespace,
"name": _pretty_name(dataset_full_name),
"full_name": dataset_full_name,
"tags": sorted(tag_names),
"layer": namespace_to_layer[namespace],
"layer": dataset_name_to_layer[dataset_full_name],
"pipelines": [pipeline_key],
"modular_pipelines": dataset_modular_pipelines,
}
nodes_list.append(nodes[node_id])
else:
nodes[node_id]["pipelines"].append(pipeline_key)


def _expand_namespaces(namespace):
"""
Expand a node's namespace to the modular pipelines this node belongs to.
For example, if the node's namespace is: "pipeline1.data_science"
it should be expanded to: ["pipeline1", "pipeline1.data_science"]
"""
if not namespace:
return []
namespace_list = []
namespace_chunks = namespace.split(".")
prefix = ""
for chunk in namespace_chunks:
if prefix:
prefix = f"{prefix}.{chunk}"
else:
prefix = chunk
namespace_list.append(prefix)
return namespace_list
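
# Doctest-style sketch of _expand_namespaces (not in the committed file):
# >>> _expand_namespaces("pipeline1.data_science")
# ['pipeline1', 'pipeline1.data_science']
# >>> _expand_namespaces(None)
# []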


def _add_parameter_data_to_node(dataset_namespace, task_id):
if "parameters" not in _JSON_NODES[task_id]:
_JSON_NODES[task_id]["parameters"] = {}

if dataset_namespace == "parameters":
_JSON_NODES[task_id]["parameters"] = _get_dataset_data_params(
dataset_namespace
).load()
else:
parameter_name = dataset_namespace.replace("params:", "")
parameter_value = _get_dataset_data_params(dataset_namespace).load()
_JSON_NODES[task_id]["parameters"][parameter_name] = parameter_value


def _get_namespace(dataset_full_name):
"""
Extract the namespace from the full dataset/parameter name.
"""
if "." in dataset_full_name:
# The last part of the namespace is the actual name of the dataset
# e.g. in pipeline1.data_science.a, "pipeline1.data_science" indicates
# the modular pipelines and "a" the name of the dataset.
return dataset_full_name.rsplit(".", 1)[0]
return None
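
# Doctest-style sketch of _get_namespace (not in the committed file):
# >>> _get_namespace("pipeline1.data_science.model_input_table")
# 'pipeline1.data_science'
# >>> _get_namespace("model_input_table") is None
# True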


def _get_dataset_data_params(namespace: str):
if KEDRO_VERSION.match(">=0.16.0"):
try:
@@ -484,6 +575,16 @@ def _get_parameter_values(node: Dict) -> Any:
return parameter_values


def _sort_and_format_modular_pipelines(modular_pipelines):
return [
{
"id": modular_pipeline,
"name": _pretty_modular_pipeline_name(modular_pipeline),
}
for modular_pipeline in sorted(modular_pipelines)
]
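
# Illustrative sketch (not in the committed file); the "name" values assume _pretty_name capitalisation:
# >>> _sort_and_format_modular_pipelines({"pipeline1.data_science", "pipeline1"})
# [{'id': 'pipeline1', 'name': 'Pipeline1'}, {'id': 'pipeline1.data_science', 'name': 'Data Science'}]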


@app.route("/api/main")
def nodes_json():
"""Serve the data from all Kedro pipelines in the project.
@@ -501,17 +602,21 @@ def pipeline_data(pipeline_id):

pipeline_node_ids = set()
pipeline_nodes = []
modular_pipelines = set()

for node in _DATA["nodes"]:
if pipeline_id in node["pipelines"]:
pipeline_node_ids.add(node["id"])
pipeline_nodes.append(node)
modular_pipelines.update(node["modular_pipelines"])

pipeline_edges = []
for edge in _DATA["edges"]:
if {edge["source"], edge["target"]} <= pipeline_node_ids:
pipeline_edges.append(edge)

sorted_modular_pipelines = _sort_and_format_modular_pipelines(modular_pipelines)

return jsonify(
{
"nodes": pipeline_nodes,
@@ -520,6 +625,7 @@ def pipeline_data(pipeline_id):
"layers": _DATA["layers"],
"pipelines": _DATA["pipelines"],
"selected_pipeline": current_pipeline["id"],
"modular_pipelines": sorted_modular_pipelines,
}
)

@@ -723,6 +829,7 @@ def _call_viz(
# pylint: disable=invalid-name
if __name__ == "__main__": # pragma: no cover
import argparse

from kedro.framework.startup import _get_project_metadata

parser = argparse.ArgumentParser(description="Launch a development viz server")