Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor modular pipelines #1941

Closed
wants to merge 15 commits into from

Conversation

ravi-kumar-pilla
Copy link
Contributor

@ravi-kumar-pilla ravi-kumar-pilla commented Jun 10, 2024

Description

Resolves #1899 , #1814

Development notes

To ease review process for - #1897 , created the below PRs

QA notes

Example modular pipeline tree:

"modular_pipelines": {
        "__root__": {
            "id": "__root__",
            "name": "__root__",
            "inputs": [],
            "outputs": [],
            "children": [
                {
                    "id": "feature_engineering",
                    "type": "modularPipeline"
                },
                {
                    "id": "b5609df0",
                    "type": "parameters"
                },
                {
                    "id": "f6d9538c",
                    "type": "data"
                },
            …
            ]
        },
        "feature_engineering": {
            "id": "feature_engineering",
            "name": "feature_engineering",
            "inputs": [
                "abed6a4d",
                "f063cc82",
            …
            ],
            "outputs": [
                "23c94afb",
                "1e3cc50a"
            …
            ],
            "children": [
                {
                    "id": "8e4f1015",
                    "type": "data"
                },
                {
                    "id": "04ba733a",
                    "type": "task"
                },
            …
            ]
        },
        …
    }

Current issues in constructing the modular pipeline tree:

  1. How we determine internal_inputs/outputs, external_inputs/outputs based on namespace and not on what kedro returns. Since datasets do not have a namespace (i.e., only kedro node and pipeline have namespaces) this raised issues in determining the actual inputs/outputs of a nested modular pipeline.
  2. Inheriting input/output datasets to parent modular pipeline when nested. This made few datasets to appear in the root modular pipeline even though they are not free output datasets.
  3. Readability/Maintenance issues in case of nested modular pipelines, as we did not define rules in adding a modular pipeline child, inputs and outputs for a modular pipeline
  4. On the UI, modular pipeline focus was missing associated inputs/outputs from getting highlighted in the node menu as dataset nodes do not have namespace, the associated modular_pipelines were always empty.

Incorrect rendering of nodes :

Issues raised by users -

How does this PR resolve the issues:

  1. Determines inputs/outputs to a modular pipeline based on what kedro returns.
  2. Removes the concept of internal/external inputs/outputs datasets for modular pipelines. There are only inputs/outputs for a modular pipeline. (Thanks to @idanov)
  3. Creates helper functions with rules, to deal with adding inputs/outputs and children to a modular pipeline.
  4. Populates modular pipeline tree before creating task/data nodes, which eliminates the need to calculate modular pipelines while creating the nodes using namespaces

Core parts that changed:

  1. Added helper methods populate_tree, add_children, _add_datasets_as_children, _add_children_to_parent_pipeline to ModularPipelinesRepository. (Thanks to @rashidakanchwala)
  2. While adding each KedroPipeline to Kedro-Viz data repositories, DataAccessManager calls populate_tree to resolve the construction of modular_pipelines_tree for the registered pipeline
  3. Inputs/Outputs for a modular pipeline are calculated using public apis available via Kedro (inputs(), outputs(), all_outputs(), only_nodes_with_namespace())
  4. Calculating children now have set of rules defined in the docstrings of add_children and other helper functions

Code Flow doc:

Please find further information at Refactor_Modular_Pipelines.docx

Modular Pipelines UI Rendering:

UseCase 1: When a modular pipeline output (dataset_3) is used as an input to another function of the same modular pipeline.

def create_pipeline(**kwargs) -> Pipeline:
    new_pipeline = pipeline(
        [
            node(lambda x: x,
                 inputs="dataset_in",
                 outputs="dataset_1",
                 name="step1"),
            node(lambda x: x,
                 inputs="dataset_1",
                 outputs="dataset_2",
                 name="step2"),
            node(lambda x: x,
                 inputs="dataset_2",
                 outputs="dataset_3",
                 name="step3"),
            node(lambda x: x,
                 inputs="dataset_3",
                 outputs="dataset_out",
                 name="step4"
            )
        ],
            namespace="main_pipeline",
        inputs=None,
        outputs={"dataset_out", "dataset_3"}
    )
    return new_pipeline

Before:

image

After:

image

UseCase 2: When a nested modular pipeline output (dataset_3) is used as an input to the outer modular pipeline

def create_pipeline(**kwargs) -> Pipeline:

    sub_pipeline = pipeline(
        [
            node(lambda x: x,
                 inputs="dataset_1",
                 outputs="dataset_2",
                 name="step2"),
            node(lambda x: x,
                 inputs="dataset_2",
                 outputs="dataset_3",
                 name="step3"),
        ],
        inputs={"dataset_1"},
        outputs={"dataset_3"},
        namespace="sub_pipeline"
    )
    new_pipeline = pipeline(
        [
            node(lambda x: x,
                 inputs="dataset_in",
                 outputs="dataset_1",
                 name="step1"),
            sub_pipeline,
            node(lambda x: x,
                 inputs="dataset_1",
                 outputs="dataset_1_2",
                 name="step1_2"),
            node(lambda x: x,
                 inputs="dataset_3",
                 outputs="dataset_4",
                 name="step4"
            )
        ],
            namespace="main_pipeline",
        inputs=None,
        outputs={"dataset_3","dataset_4"}
    )
    return new_pipeline

Before:

image

After:

image


UseCase 3: When a nested modular pipeline output (dataset_3) is used as an input to the outer modular pipeline and also used as an input to another external modular pipeline

def create_pipeline(**kwargs) -> Pipeline:

    sub_pipeline = pipeline(
        [
            node(lambda x: x,
                 inputs="dataset_1",
                 outputs="dataset_2",
                 name="step2"),
            node(lambda x: x,
                 inputs="dataset_2",
                 outputs="dataset_3",
                 name="step3"),
        ],
        inputs={"dataset_1"},
        outputs={"dataset_3"},
        namespace="sub_pipeline"
    )
    new_pipeline = pipeline(
        [
            node(lambda x: x,
                 inputs="dataset_in",
                 outputs="dataset_1",
                 name="step1"),
            sub_pipeline,
            node(lambda x: x,
                 inputs="dataset_1",
                 outputs="dataset_1_2",
                 name="step1_2"),
            node(lambda x: x,
                 inputs="dataset_3",
                 outputs="dataset_4",
                 name="step4"
            )
        ],
            namespace="main_pipeline",
        inputs=None,
        outputs={"dataset_3","dataset_4"}
    )

    other = pipeline([
        node(lambda x: x,
                 inputs="dataset_3",
                 outputs="dataset_5",
                 name="step5"
            )
    ],
    namespace="other_pipeline",
    inputs={"dataset_3"},
    outputs={"dataset_5"}
    )

    return new_pipeline + other

Before:

image

After:

image

UseCase 4: When an output of a namespace function (using node namespaces) (dataset_7, dataset_9) is an input to another function in the same namespace

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=lambda dataset_1, dataset_2: (dataset_1, dataset_2),
                inputs=["dataset_1", "dataset_2"],
                outputs="dataset_3",
                name="first_node",
            ),
            node(
                func=lambda dataset_1, dataset_2: (dataset_1, dataset_2),
                inputs=["dataset_3", "dataset_4"],
                outputs="dataset_5",
                name="second_node",
            ),
            node(
                func=lambda dataset_1, dataset_2: (dataset_1, dataset_2),
                inputs=["dataset_5", "dataset_6"],
                outputs="dataset_7", 
                name="third_node",
                namespace="namespace_prefix_1",
            ),
            node(
                func=lambda dataset_1, dataset_2: (dataset_1, dataset_2),
                inputs=["dataset_7", "dataset_8"],
                outputs="dataset_9",
                name="fourth_node",
                namespace="namespace_prefix_1",
            ),
            node(
                func=lambda dataset_1, dataset_2: (dataset_1, dataset_2),
                inputs=["dataset_9", "dataset_10"],
                outputs="dataset_11",
                name="fifth_node",
                namespace="namespace_prefix_1",
            ),
        ]
    )

Before:

image

After:

image

UseCase 5: When an output of a nested modular pipeline (model_inputs) is an input to another nested modular pipeline

def create_pipeline(**kwargs) -> Pipeline:
    data_processing_pipeline = pipeline(
        [
            node(
                lambda x: x,
                inputs=["raw_data"],
                outputs="model_inputs",
                name="process_data",
                tags=["split"],
            )
        ],
        namespace="uk.data_processing",
        outputs="model_inputs",
    )
    data_science_pipeline = pipeline(
        [
            node(
                lambda x: x,
                inputs=["model_inputs"],
                outputs="model",
                name="train_model",
                tags=["train"],
            )
        ],
        namespace="uk.data_science",
        inputs="model_inputs",
    )
    return data_processing_pipeline + data_science_pipeline

Before:

image

After:

image

UseCase 6: Nested namespace pipelines with single input (input_to_processing) and single output (output_from_processing)

def _get_generic_pipe() -> Pipeline:
    return Pipeline([
        node(
            func=lambda x: x,
            inputs="input_df",
            outputs="output_df",
        ),
    ])


def create_pipeline(**kwargs) -> Pipeline:
    pipe = Pipeline([
        pipeline(
            pipe=_get_generic_pipe(),
            inputs={"input_df": "input_to_processing"},
            outputs={"output_df": "post_first_pipe"},
            namespace="first_processing_step",
        ),
        pipeline(
            pipe=_get_generic_pipe(),
            inputs={"input_df": "post_first_pipe"},
            outputs={"output_df": "output_from_processing"},
            namespace="second_processing_step",
        ),
    ])
    return pipeline(
        pipe=pipe,
        inputs="input_to_processing",
        outputs="output_from_processing",
        namespace="processing",
    )

Before:

image

After:

image

Modular Pipelines expand and collapse in action:

Before:

UseCase 1-4:

UseCase1-4

UseCase 5-6:

UseCase5-6

After:

UseCase 1-4:

UseCase1-4_after

UseCase 5-6:

UseCase5-6_after

Checklist

  • Read the contributing guidelines
  • Opened this PR as a 'Draft Pull Request' if it is work-in-progress
  • Updated the documentation to reflect the code changes
  • Added new entries to the RELEASE.md file
  • Added tests to cover my changes

Signed-off-by: ravi-kumar-pilla <[email protected]>
Signed-off-by: ravi-kumar-pilla <[email protected]>
Signed-off-by: ravi-kumar-pilla <[email protected]>
Signed-off-by: ravi-kumar-pilla <[email protected]>
@ravi-kumar-pilla ravi-kumar-pilla changed the title Refactor Modular/Namespace Pipelines Refactor modular pipelines Jun 11, 2024
Signed-off-by: ravi-kumar-pilla <[email protected]>
@ravi-kumar-pilla ravi-kumar-pilla marked this pull request as ready for review June 12, 2024 20:21
@ravi-kumar-pilla ravi-kumar-pilla marked this pull request as draft June 12, 2024 20:22
@ravi-kumar-pilla ravi-kumar-pilla marked this pull request as ready for review June 17, 2024 16:00
@ravi-kumar-pilla
Copy link
Contributor Author

Hi Team,

Please let me know if the split PRs are hard to review. I could not find a better way to split as the initial changes were interlinked. If the PR as a whole is easy to review, I am happy to shift this info to #1897 . Thank you

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Refactor Modular Pipelines in the Kedro-viz backend
2 participants