Dynamic Pipeline #2627
Comments
Thanks for this @noklam -- I was looking back at the technical discussion recently and wanted to flag it again, so I'm pleased you did. Would you be comfortable writing this blog post? Maybe we can put it in one of the next couple of sprints? |
@stichbury Yes! I am happy to write something about this. |
The Miro board linked above seems not to be viewable by the public? |
@desmond-dsouza Sorry about that! You are right, the link is private, as this is still a draft and there is some internal discussion on the Miro board. I will try to post some screenshots here once we have discussed it. Do you have any views on this? The board is mainly examples; I am trying to go through all the Slack & Discord questions to summarise what the user problems are. |
Another question on dynamic pipelines https://www.linen.dev/s/kedro/t/12647827/hello-thank-you-for-this-great-library-i-am-a-ds-working-in-#d170c710-8e6a-4c56-a623-058c3ec33da7
|
@astrojuanlu This sort of runtime-generated DAG is not currently supported. There are a few possible ways to get around this:
We usually advise avoiding these kinds of conditional DAGs as much as possible, because they get very complicated once you have multiple switches and they are difficult to debug. Having a conditional DAG is not much different from having a
The challenge here is that the returned dataset may be different or not compatible at all. I haven't done this before, but it should be more flexible to do it with the Python API. The code may look like this:

```python
# deployment_script.py
result = session.run(tag="some_pipeline")
if result["score"] >= 0.5:
    session.run(pipeline="deploy")
else:
    session.run(pipeline="retrain")
```

It does mean that you may lose certain features and cannot use the Kedro CLI to run this, so use it sparingly. |
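For a fuller picture, here is a hedged sketch of what such an orchestration script could look like end to end. The `evaluate`, `deploy`, and `retrain` pipeline names and the free `score` output are assumptions about the project, and recent Kedro versions expect one run per session, hence the two sessions:

```python
# deployment_script.py -- illustrative sketch only; the pipeline names and the
# "score" free output are assumptions about the project, not Kedro built-ins.
from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

project_path = Path.cwd()
bootstrap_project(project_path)

# First run: session.run() returns the pipeline's free (uncatalogued) outputs.
with KedroSession.create(project_path=project_path) as session:
    result = session.run(pipeline_name="evaluate")

# A session is expected to run only once, so create a new one for the branch.
next_pipeline = "deploy" if result["score"] >= 0.5 else "retrain"
with KedroSession.create(project_path=project_path) as session:
    session.run(pipeline_name=next_pipeline)
```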
For me, if we're going to do this properly we need some sort of
|
Cross-posting a solution some of our users are using.
A detailed solution can be found here. |
@noklam To pitch in here, thank you for sharing my solution on this issue tracker! You asked for an example repository; you can find it here: https://github.com/Lodewic/kedro-dynamic-pipeline-hook-example |
@datajoely we are currently looking for a way to access parameters from the pipeline registry. In one of our projects (not sure exactly which Kedro version was current when it was implemented, definitely <0.16) we had this as a workaround:
We are looking for a current "kedroic" way of handling this. I am looking into @Lodewic 's implementation to see how well it fits our use case. It looks very promising, but a more out-of-the-box solution might be preferable, as it requires a certain amount of knowledge about Kedro's inner workings which we probably shouldn't expect from general users. |
This is neat @inigohidalgo ! |
This looks like the 0.16.x or 0.17.x style of creating pipelines; I actually don't know what happened and why we moved away from it. It used to be possible to access parameters in |
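For anyone hitting this today, one commonly shared workaround is to instantiate a config loader directly inside the registry module. A minimal sketch, assuming the default `conf/base` + `conf/local` layout; this bypasses the session, so treat it as a stopgap rather than an official pattern:

```python
# pipeline_registry.py (sketch) -- read parameters outside a KedroSession so
# they can drive how pipelines are assembled; paths and env names are assumptions.
from kedro.config import OmegaConfigLoader


def _load_parameters(conf_source: str = "conf") -> dict:
    config_loader = OmegaConfigLoader(
        conf_source=conf_source, base_env="base", default_run_env="local"
    )
    return config_loader["parameters"]


parameters = _load_parameters()
# e.g. loop over parameters["experiments"] here to build namespaced pipelines
```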
Relevant: https://getindata.com/blog/kedro-dynamic-pipelines/ by @marrrcin

In other news, found this on the Dagster Slack:
|
After sharing the blog post on #1606, I was thinking that we should find a more descriptive name for the use case addressed in it. "Dynamic pipelines" seems to imply that the pipelines themselves have some data-dependent runtime behavior or data-dependent structure (the 2 buckets originally devised by @noklam), but taking a pipeline and reusing it with different configurations is hardly "dynamic". We should call this "pipeline reuse" or investigate how other projects (DVC Pipelines, MLFlow recipes, Airflow DAGs) call this concept. |
In the CI/CD world this sort of thing is often called a matrix job; in our examples we want to run something like an "experiment array". We should note that Hydra calls this a multi-run, and it also has integrations with "sweepers", which is closer to what we're doing here. The next question is how we can make something like Optuna work for our purposes. |
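To make the "experiment array" idea concrete, here is a minimal sketch of a matrix expanded into namespaced pipelines at creation time; the grid values and all node and dataset names are hypothetical:

```python
# pipeline.py (sketch) -- each grid cell becomes a namespaced copy of the same
# training node; the DAG is fixed once create_pipeline() returns.
from functools import partial
from itertools import product

from kedro.pipeline import Pipeline, node, pipeline


def train_and_score(features, learning_rate: float, depth: int):
    """Placeholder training node."""
    return {"learning_rate": learning_rate, "depth": depth}


def create_pipeline(**kwargs) -> Pipeline:
    experiments = []
    for i, (lr, depth) in enumerate(product([0.01, 0.1], [3, 5])):  # the matrix axes
        experiments.append(
            pipeline(
                [
                    node(
                        partial(train_and_score, learning_rate=lr, depth=depth),
                        inputs="features",
                        outputs="metrics",
                        name=f"train_and_score_{i}",  # partials need an explicit name
                    )
                ],
                namespace=f"experiment_{i}",
                inputs={"features": "features"},  # share one un-namespaced input
            )
        )
    return sum(experiments)
```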
When I've used Optuna within Kedro I've defined the search space from config, but the output was basically just a pickle of the finished Optuna run alongside a JSON with the optimal hyperparameters. Which of Optuna's features would you see as useful for Kedro? |
@inigohidalgo it's more that Hydra counts Optuna, Ax and Nevergrad in the 'sweeper' category |
Today @datajoely recommended @marrrcin's approach as an alternative to Ray Tune for parameter sweeps https://linen-slack.kedro.org/t/16014653/hello-very-much-new-to-the-ml-world-i-m-trying-to-setup-a-fr#e111a9d2-188c-4cb3-8a64-37f938ad21ff

Are we confident that the DX offered by this approach can compete with this?

```python
search_space = {
    "a": tune.grid_search([0.001, 0.01, 0.1, 1.0]),
    "b": tune.choice([1, 2, 3]),
}

tuner = tune.Tuner(objective, param_space=search_space)
```
|
No, but it does provide a budget version of it - this is what I was saying about the lack of integration with dedicated "sweepers" in this comment |
Yup - I think there are two categories as Nok says at the top:
|
I've been a bit outside this discussion, although I'm super interested in the topic. To make sure I understand the two options, here is my use case: I have a pipeline which predicts total demand for a product on a given day, with the day specified as an input parameter to the pipeline. Some days the prediction will fail due to data issues, but once those issues are fixed we would like to see how the model would have performed. To do this, we have a backfill pipeline set up which loads the predictions dataset, checks for gaps, and launches a pipeline run for each missing day. This pipeline, as I've described it, is more an example of the second (harder) category, right? Since the structure of the final pipeline depends on the state of a dataset. If, on the other hand, I simply wanted to define a pipeline which loops through the last 10 days and runs for each of them, regardless of the status of the predictions dataset, would that be an example of the first category, where I am just defining a pipeline in a for loop, constructing it from today's date and a configured number of days (10 in the example) to go back?
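For the simpler of the two cases described above (looping over the last N days at pipeline-creation time, independent of the predictions dataset), a hedged sketch; the names and the hard-coded day count are placeholders for what would normally come from config:

```python
# pipeline.py (sketch) -- the number of copies is decided when the pipeline is
# created, so this is "dynamic construction", not runtime behaviour.
from datetime import date, timedelta

from kedro.pipeline import Pipeline, node, pipeline


def predict_demand(features):
    """Placeholder prediction node."""
    return features


N_DAYS_BACK = 10  # in a real project this would be read from configuration


def create_pipeline(**kwargs) -> Pipeline:
    daily_pipelines = []
    for i in range(1, N_DAYS_BACK + 1):
        day = (date.today() - timedelta(days=i)).isoformat()
        daily_pipelines.append(
            pipeline(
                [node(predict_demand, inputs="features", outputs="predictions")],
                namespace=f"day_{i}",  # catalog entries like "day_1.predictions"
                inputs={"features": "features"},  # shared, un-namespaced input
                tags=[day],  # tag each copy with its date, e.g. for filtered runs
            )
        )
    return sum(daily_pipelines)
```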
Docs/content update:
|
Another use case, which I'm not sure where it would fall: I have a time-series problem where I compute a lot of lags, rolling statistics, etc. When designing my training pipeline, I have a target number of days I want my master table to include. Because of the way lags work in pandas, we need to pad our initial queries by the maximum lag length, as otherwise we would get nulls at the start. This maximum would then be an input to some initial nodes which filter SQL tables. Technically there is no "data" dependency, since it is based purely on prespecified parameters, but there is a point where a "max" or something needs to be calculated. |
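Because the padding depends only on parameters, it can be computed in an ordinary node without making the pipeline structure dynamic; a hedged sketch with made-up table, dataset, and parameter names:

```python
# nodes.py (sketch) -- the maximum lookback is derived from configuration, so
# the DAG stays static; only node inputs are computed at run time.
from kedro.pipeline import Pipeline, node, pipeline


def compute_padding_days(lags: list, rolling_windows: list) -> int:
    """Pad the query window by the largest lookback used downstream."""
    return max(list(lags) + list(rolling_windows))


def build_filter_query(run_date: str, padding_days: int) -> str:
    """Build the SQL used by the initial extraction nodes."""
    return (
        "SELECT * FROM sales "
        f"WHERE sale_date >= DATE '{run_date}' - INTERVAL '{padding_days}' DAY"
    )


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                compute_padding_days,
                inputs=["params:lags", "params:rolling_windows"],
                outputs="padding_days",
            ),
            node(
                build_filter_query,
                inputs=["params:run_date", "padding_days"],
                outputs="filter_query",
            ),
        ]
    )
```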
On this last point @inigohidalgo a lot of users ask can I run something like |
Hi everyone, I wanted to share a somewhat hacky method we implemented for creating a dynamic pipeline. Our pipeline required reprocessing a dataset for previous dates based on a runtime parameter.

Modular Pipelines and Namespaces

First, we leveraged modular pipelines and namespaces to create a dynamic reprocessing capability. The goal was to reprocess datasets for previous dates without rerunning certain parts of the pipeline (specifically the feature engineering boxes, labelled FE1, FE2, and FE3). The reprocessing pipelines are built in a loop:

```python
pipes = []
for i in range(1, 6):
    t_version = pipeline(
        pipe=check_requirements + shape_target + shape_master_table,
        namespace=f"t-{i}",
        tags=["delta_t"],
    )
    pipes.append(t_version)

t_n_pipelines = sum(pipes)
```

In this setup, each reprocessing pipeline gets its own namespace (t-1 through t-5). Next, we created these entries in the catalog:
```yaml
# A type of SQLQueryDataset used in "Some ETL" box
## t=0 / Original Version
EX_DATASET:
  type: "${_datasets.sqlscript}"
  credentials: oracle_credentials
  filepath: queries/EX_DATASET.sql
  query_args:
    run_date: ${runtime_params:run_date}
    use_case: ${runtime_params:use_case}

## t=Δ Version
"{namespace}.EX_DATASET":
  type: "${_datasets.sqlscript}"
  credentials: oracle_credentials
  filepath: queries/EX_DATASET.sql
  query_args:
    run_date: "{namespace}"
    use_case: ${runtime_params:use_case}

# The same for other types like D_n or the resulting GenMT
## t=0 / Original Version
D1:
  type: "${_datasets.parquet}"
  filepath: "${_azure_base_path}/04_feature/${runtime_params:use_case}/${runtime_params:run_date}/D1.parquet"
  credentials: azure_credentials

## t=Δ Version
"{namespace}.D1":
  type: "${_datasets.parquet}"
  filepath: "${_azure_base_path}/04_feature/${runtime_params:use_case}/{namespace}/D1.parquet"
  credentials: azure_credentials
```

We initially thought this approach would suffice if we could somehow perform a nested interpolation of the

So Hooks 🪝...

Since hooks are stateful objects (see Kedro issue 2690), we created a
Feel free to provide any feedback or suggestions. Thank you! |
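To make the "stateful hook" idea above concrete, here is a minimal, hypothetical sketch (not the author's actual implementation): the hook object is instantiated once, so a value captured in one hook call can be reused in later ones.

```python
# hooks.py (sketch) -- class, attribute, and dataset-naming conventions here
# are illustrative only.
from kedro.framework.hooks import hook_impl


class RunDateHook:
    """Carries the run_date runtime parameter across hook calls."""

    def __init__(self):
        self.run_date = None  # state lives on the hook instance

    @hook_impl
    def after_context_created(self, context):
        # Capture the runtime parameter once, when the context is created.
        self.run_date = context.params.get("run_date")

    @hook_impl
    def before_dataset_loaded(self, dataset_name):
        # Later hook calls can reuse the stored value, e.g. to work out which
        # concrete date a namespaced "t-i" dataset corresponds to.
        if self.run_date and dataset_name.startswith("t-"):
            print(f"{dataset_name} resolved relative to run_date={self.run_date}")
```

The instance would be registered once in `settings.py` (e.g. `HOOKS = (RunDateHook(),)`), which is what makes the shared state possible.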
Thank you for such a clear write up @gitgud5000 - I'm so keen to make this use-case ergonomic and this is so helpful |
This is a really cool use of filtering and namespaces, thanks for sharing @gitgud5000. This is just a thought, not at all a practical change in your case as it only addresses a subset of the behaviour you are building, but: one of the major "needs" you're solving with the |
I think it's safe to say that, out of the 2 types of "dynamic pipelines" described by @noklam , "dynamic pipeline creation" is already possible, and "dynamic pipeline behavior" is something that we have always been very wary of. Please keep the discussion going, but I'll move this to the appropriate place. |
This issue was moved to a discussion.
You can continue the conversation there. Go to discussion →
Introduction
We had a discussion about dynamic pipelines in #1993, also partly related to #1963. This issue summarises that discussion and lays out the work that we need to do.
Related Issues:
A high-level, short overview of the problem(s) you are designing a solution for.
Background
Dynamic pipelines have been one of the most frequently asked-about topics. There are various solutions, but they are often case-by-case. As a result, the solutions come in all shapes and sizes, and it has been asked whether Kedro can provide a feature for this.
What is "Dynamic Pipeline"
When people refer to "Dynamic Pipeline", they are often not talking about the same thing. We need to make a clear distinction between the different meanings before we start to build a solution.
We can roughly categorise them into two buckets:
Dynamic construction of Pipeline (easier)
Examples of these are:
Dynamic behavior at runtime (harder)
Examples of these are:
Why is it challenging for Kedro Users?
It needs experience with Kedro: often you need to combine advanced features, i.e. TemplatedConfig + Jinja in the catalog + some for loop in your pipeline.py. In addition, each of the use cases needs a different solution. Part of Kedro's value proposition is standardisation, but there is no well-known pattern for these solutions, and they are hard to reason about and debug with Jinja.
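For illustration, this is roughly what the Jinja-in-catalog pattern referred to above looks like (a sketch in the style of the legacy TemplatedConfigLoader-era docs; dataset names and paths are made up):

```yaml
# catalog.yml (sketch) -- a Jinja loop stamping out one dataset entry per variant
{% for day in range(1, 6) %}
predictions_t_{{ day }}:
  type: pandas.ParquetDataSet
  filepath: data/07_model_output/predictions_t_{{ day }}.parquet
{% endfor %}
```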
What's not in scope
These two types of pipelines are fundamentally different from the "data-centric" approach of Kedro
What are some possible solutions?
Follow-up
Reference