“How can I generate Kedro pipelines dynamically?” is one of the most frequently asked questions on the Kedro Slack. As a member of Kedro’s Technical Steering Committee, I see it come up there all the time.
Use cases for dynamic pipelines in Kedro usually fall into one of the following categories:
Implementing “core” ML pipelines that could be configured and re-used for various business use cases.
Example industry applications:
- Retail: this may involve building a sales forecasting model per store or per product, or creating personalised recommendation systems for different customer segments.
- Finance: this might involve designing risk assessment models for different types of loans or credit products.
Automatically performing multiple experiments to evaluate which model configuration performs the best. These experiments may differ in terms of used features, model parameters or even the types of models being used.
At first glance the problem seems to be trivial - since we’re in the world of Python (the engine of the current Gen AI boom), anything should be possible. It indeed is, but if you stick to the Kedro principles of building maintainable and modular Data Science code - the problem becomes trickier than it looks.
In this post I will guide you through the process of implementing dynamic pipelines in Kedro, while still sticking to the framework’s main concepts and principles.
Why are dynamic pipelines in Kedro difficult?
On one hand, Kedro does have the concepts of modular pipelines - they allow the recycling of the same pipeline structure multiple times. At the same time they enable the user to change the inputs, outputs and parameters of each instance. This makes them reusable within the same codebase and shareable across projects.
Let’s take a look at an example modular pipeline (adapted from the official Spaceflights tutorial):
from kedro.pipeline import Pipeline, node, pipeline

# Node functions as defined in the Spaceflights starter
from .nodes import evaluate_model, split_data, train_model


def create_pipeline(**kwargs) -> Pipeline:
    data_science_pipeline = pipeline(
        [
            node(
                func=split_data,
                inputs=["model_input_table", "params:model_options"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                name="split_data_node",
            ),
            node(
                func=train_model,
                inputs=["X_train", "y_train"],
                outputs="regressor",
                name="train_model_node",
            ),
            node(
                func=evaluate_model,
                inputs=["regressor", "X_test", "y_test"],
                outputs=None,
                name="evaluate_model_node",
            ),
        ]
    )
    # Same structure, instantiated twice under different namespaces
    baseline = pipeline(
        data_science_pipeline,
        parameters={"params:model_options": "params:model_options"},
        inputs={"model_input_table": "model_input_table"},
        namespace="baseline",
    )
    candidate = pipeline(
        data_science_pipeline,
        inputs={"model_input_table": "model_input_table"},
        tags=["candidate"],
        namespace="candidate",
    )
    return baseline + candidate
The idea here is to have the same pipeline run twice, on different sets of parameters. That’s perfectly fine in the Kedro world - you define the parameters twice in parameters.yml, like this:
model_options:
  test_size: 0.2
  random_state: 3
  features:
    - engines
    - passenger_capacity
    - crew
    - d_check_complete
    - moon_clearance_complete
    - iata_approved
    - company_rating
    - review_scores_rating
candidate:
  model_options:
    test_size: 0.2
    random_state: 666
    features:
      - engines
      - passenger_capacity
      - crew
      - d_check_complete
      - moon_clearance_complete
Now you’re good to go! Are you though?
What if you wanted to re-use a modular pipeline not twice, but 10 or maybe 100 times? You would have to expand parameters.yml tenfold or a hundredfold, either generating the entries somehow or copy-pasting them over and over again, even though you only wanted to change a small portion of the parameters - a standard day-to-day case you will encounter at work as a Data Scientist or ML Engineer.
parameters.yml is the first problem; the second is the create_pipeline code. At the point when create_pipeline is invoked in the Kedro project execution lifecycle, the parameters are not yet available, so you cannot use them to generate a variable number of modular pipelines.
In total, three places need to “know” how many times you want to use the modular pipeline, and the third is catalog.yml. Since every modular pipeline is namespaced, all data catalog entries also need to be namespaced, so instead of this:
preprocessed_companies:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/preprocessed_companies.pq
preprocessed_shuttles:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/preprocessed_shuttles.pq
model_input_table:
  type: pandas.ParquetDataSet
  filepath: data/03_primary/model_input_table.pq
You have to have this:
baseline.preprocessed_companies:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/preprocessed_companies.pq
baseline.preprocessed_shuttles:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/preprocessed_shuttles.pq
baseline.model_input_table:
  type: pandas.ParquetDataSet
  filepath: data/03_primary/model_input_table.pq
candidate.preprocessed_companies:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/candidate/preprocessed_companies.pq
candidate.preprocessed_shuttles:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/candidate/preprocessed_shuttles.pq
candidate.model_input_table:
  type: pandas.ParquetDataSet
  filepath: data/03_primary/candidate/model_input_table.pq
The larger the pipeline, the larger catalog.yml gets, and the higher the probability of messing up one of the filepath entries.
From modular pipelines into dynamic pipelines
Now that you know the background (and you’re probably here because you’ve looked up “Kedro dynamic pipelines” and you just want the code), let me show you how you can solve the following use cases for dynamic pipelines in Kedro:
- Use case 1: You have a pipeline that you want to re-run on a dataset that evolves over time - e.g. a forecasting model with monthly data tables, where one month consumes data from the previous month and so on.
- Use case 2: You have a set of similar model training experiments with similar parameters that you want to run in parallel. Model parameters, used features, target columns or types of models could vary in different experiments.
- Use case 3: You want to implement a “core” / “reusable” pipeline that could be configured for multiple business use cases and be run multiple times.
In “our method” we solve the main issues of dealing with dynamic pipelines in Kedro, with very little custom code and without any additional plugins! The method we present here is more of a project workflow proposal with a few additions to stitch everything together.
Everything was just recently made possible thanks to the following features of Kedro (all available from 0.18.13):
- OmegaConfigLoader with custom resolvers
- Dataset factories
- Modular pipelines with namespaces
- Centralised settings.py
Let me guide you through the process step by step.
1. Modify settings.py
There are a few crucial, project-wide settings you have to enable. The first is to use OmegaConfigLoader instead of the standard config loader (note that this will be the default starting from Kedro 0.19.0).
from kedro.config import OmegaConfigLoader
CONFIG_LOADER_CLASS = OmegaConfigLoader
The next thing is to create a custom OmegaConf resolver named merge that will perform a deep-merge of two Python dictionaries - you will soon see why.
from copy import deepcopy

import omegaconf


def merge_dicts(dict1, dict2):
    """
    Recursively merge two dictionaries.

    Args:
        dict1 (dict): The first dictionary to merge.
        dict2 (dict): The second dictionary to merge.

    Returns:
        dict: The merged dictionary.
    """
    result = deepcopy(dict1)
    for key, value in dict2.items():
        if (
            key in result
            and isinstance(result[key], omegaconf.dictconfig.DictConfig)
            and isinstance(value, omegaconf.dictconfig.DictConfig)
        ):
            # Both sides are nested configs: merge them recursively
            result[key] = merge_dicts(result[key], value)
        else:
            # Otherwise the value from dict2 wins
            result[key] = value
    return result


# Register the function as the "merge" resolver used in parameters.yml
CONFIG_LOADER_ARGS = {
    "custom_resolvers": {"merge": merge_dicts},
}
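To see why a deep merge is needed rather than a plain dictionary update, here is a minimal sketch that applies the same recursion to ordinary Python dictionaries. It is an illustration only: deep_merge and the sample values are hypothetical, and the resolver in settings.py checks for DictConfig instead of dict.

```python
from copy import deepcopy


def deep_merge(base: dict, overrides: dict) -> dict:
    """Plain-dict stand-in for merge_dicts (hypothetical helper)."""
    result = deepcopy(base)
    for key, value in overrides.items():
        if isinstance(result.get(key), dict) and isinstance(value, dict):
            # Both sides are mappings: merge them key by key.
            result[key] = deep_merge(result[key], value)
        else:
            # Scalars and lists are replaced wholesale.
            result[key] = deepcopy(value)
    return result


# Made-up sample values, chosen only to show the difference
base = {"test_size": 0.2, "model_params": {"alpha": 1.0, "fit_intercept": True}}
overrides = {"model_params": {"alpha": 0.5}}

shallow = {**base, **overrides}     # the nested dict is replaced as a whole
deep = deep_merge(base, overrides)  # the nested dict is merged key by key

print(shallow["model_params"])
print(deep["model_params"])
```

With the shallow update, fit_intercept is lost because the whole model_params dictionary is swapped out; the deep merge keeps it and only changes alpha.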
The last and most important addition to settings.py is the project-wide “declaration” of the pipelines that you want to have. It will be used both in create_pipeline downstream and during the validation of the parameters.
DYNAMIC_PIPELINES_MAPPING = {
    "reviews_predictor": ["base", "test1"],
    "price_predictor": ["base", "candidate1", "candidate2", "candidate3"],
}
This dictionary declares the namespaces (keys) that have dynamically generated pipelines. In the example above, the pipeline in namespace = reviews_predictor will have 2 instances and the pipeline in namespace = price_predictor will have 4 instances. All 6 instances will use the same Kedro pipeline structure.
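As a quick sanity check, the mapping can be expanded into the full list of namespace and variant combinations. This is a small sketch; expand_instances is a hypothetical helper name and is not part of Kedro.

```python
DYNAMIC_PIPELINES_MAPPING = {
    "reviews_predictor": ["base", "test1"],
    "price_predictor": ["base", "candidate1", "candidate2", "candidate3"],
}


def expand_instances(mapping: dict) -> list:
    """List every '<namespace>.<variant>' pair declared in the mapping."""
    return [
        f"{namespace}.{variant}"
        for namespace, variants in mapping.items()
        for variant in variants
    ]


instances = expand_instances(DYNAMIC_PIPELINES_MAPPING)
print(len(instances))
for instance in instances:
    print(instance)
```

The resulting strings are the same ones that will later be used as pipeline namespaces.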
2. Adjusting the parameters.yml
Up to this point, parameters.yml contained a lot of duplication and was error-prone. Thanks to OmegaConfigLoader and the merge resolver (defined above), the parameters.yml file can be refactored to resemble class inheritance from OOP:
# The model_options below are "base" options for all pipelines
model_options:
  test_size: 0.2
  random_state: 3
  target: costs
  features:
    - engines
    - passenger_capacity
    - crew
    - d_check_complete
    - moon_clearance_complete
    - iata_approved
    - company_rating
    - review_scores_rating
  model: sklearn.linear_model.LinearRegression
  model_params: {}

# -------------------------------
# Pipeline-specific configuration
price_predictor:
  _overrides:
    target: price
  model_options: ${merge:${model_options},${._overrides}}

  base:
    model_options: ${..model_options}

  candidate1:
    _overrides:
      features:
        - engines
        - passenger_capacity
        - crew
        - d_check_complete
        - company_rating
    model_options: ${merge:${..model_options},${._overrides}}

  candidate2:
    _overrides:
      model_params:
        gamma: 2.5
    model_options: ${merge:${..model_options},${._overrides}}

  candidate3:
    _overrides:
      model: sklearn.ensemble.RandomForestRegressor
      model_params:
        max_depth: 1.0
    model_options: ${merge:${..model_options},${._overrides}}
The new parameters.yml combines built-in OmegaConf reference resolving (here: ${..model_options} and ${._overrides}) with the merge resolver, which allows you to point to different parts of the configuration and re-use them. By declaring the parameters structure like this:
<namespace>:
  <variant - e.g. model variant / experiment name>:
    _overrides: {} # what to override from root configuration
    # any specific parameter, e.g. model_options, used in Kedro pipeline
    model_options: ${merge:${..model_options},${._overrides}}
you effectively simulate the inheritance of parameters and you are able to re-use and/or override some/all values. During runtime, the resolvers will run and the configuration provided to the Kedro nodes will contain the actual values.
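The inheritance chain can be traced by hand with plain dictionaries. The sketch below is an approximation of what the resolvers produce for price_predictor.candidate3, not the OmegaConf machinery itself: merge is a hypothetical plain-dict stand-in for the merge resolver, and the features list is left out for brevity.

```python
from copy import deepcopy


def merge(base: dict, overrides: dict) -> dict:
    """Plain-dict stand-in for the merge resolver (hypothetical helper)."""
    result = deepcopy(base)
    for key, value in overrides.items():
        if isinstance(result.get(key), dict) and isinstance(value, dict):
            result[key] = merge(result[key], value)
        else:
            result[key] = deepcopy(value)
    return result


# Root model_options (features omitted for brevity)
root_options = {
    "test_size": 0.2,
    "random_state": 3,
    "target": "costs",
    "model": "sklearn.linear_model.LinearRegression",
    "model_params": {},
}

# Level 1: price_predictor overrides the target
price_predictor_options = merge(root_options, {"target": "price"})

# Level 2: candidate3 overrides the model on top of price_predictor
candidate3_options = merge(
    price_predictor_options,
    {
        "model": "sklearn.ensemble.RandomForestRegressor",
        "model_params": {"max_depth": 1.0},
    },
)

print(candidate3_options)
```

candidate3 ends up with the target inherited from price_predictor, the split settings inherited from the root, and its own model and model_params.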
3. Use dataset factories
The Kedro dataset factories feature enables you to write the “generic” configuration and minimise the redundancy of catalog entries by associating the datasets used in your project's pipelines with dataset factory patterns. How does it work? You specify the “placeholders” which will be filled with actual runtime values during Kedro project execution.
In our case, we would like to potentially save all trained models from all our pipelines - so in this simple scenario with 2 namespaces and 6 variants in total:
    "reviews_predictor": ["base", "test1"],
    "price_predictor": ["base", "candidate1", "candidate2", "candidate3"],
you would have to have a total of 6 catalog entries - and that's just for a single node that outputs some data!
Thanks to the dataset factories, instead of:
price_predictor.base.regressor:
  type: pickle.PickleDataSet
  filepath: data/06_models/price_predictor/base/regressor.pickle
  versioned: true
price_predictor.candidate1.regressor:
  type: pickle.PickleDataSet
  filepath: data/06_models/price_predictor/candidate1/regressor.pickle
  versioned: true
price_predictor.candidate2.regressor:
  type: pickle.PickleDataSet
  filepath: data/06_models/price_predictor/candidate2/regressor.pickle
  versioned: true
# ... and 3 more entries
you can just have a single entry:
"{namespace}.{variant}.regressor":
  type: pickle.PickleDataSet
  filepath: data/06_models/{namespace}/{variant}/regressor.pickle
  versioned: true
At runtime, when Kedro reaches the node that outputs the regressor, it will take its namespace (e.g. price_predictor.candidate1), match it against the pattern from the data catalog ("{namespace}.{variant}.regressor") and fill the {namespace} and {variant} placeholders with price_predictor and candidate1 respectively.
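The matching step can be pictured with a few lines of standard-library Python. This is a simplified emulation based on regular expressions, not Kedro's actual implementation (which also has to pick between several candidate patterns); match_factory is a hypothetical helper name.

```python
import re


def match_factory(pattern: str, dataset_name: str):
    """Return the placeholder values if dataset_name fits pattern, else None."""
    # Splitting on "{name}" yields literal text at even indices
    # and placeholder names at odd indices.
    parts = re.split(r"\{(\w+)\}", pattern)
    regex = "".join(
        re.escape(part) if index % 2 == 0 else f"(?P<{part}>.+?)"
        for index, part in enumerate(parts)
    )
    found = re.fullmatch(regex, dataset_name)
    return found.groupdict() if found else None


pattern = "{namespace}.{variant}.regressor"
filepath_template = "data/06_models/{namespace}/{variant}/regressor.pickle"

values = match_factory(pattern, "price_predictor.candidate1.regressor")
filepath = filepath_template.format(**values)
no_match = match_factory(pattern, "price_predictor.model_input_table")

print(values)
print(filepath)
print(no_match)
```

A dataset name that does not fit the pattern, such as price_predictor.model_input_table, simply yields no match and would need its own catalog entry or pattern.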
If you’re following along with the Spaceflights starter, make sure to also add the following entry:
"{namespace}.model_input_table":
  type: pandas.ParquetDataSet
  filepath: data/03_primary/{namespace}/model_input_table.pq
4. Change standard pipelines to modular pipelines
The final part is to actually generate the pipelines. Let’s start with the data_processing pipeline from the Spaceflights starter:
from kedro.pipeline import Pipeline, node, pipeline

# Replace <project_name> with the name of your project's Python package
from <project_name> import settings

# Node functions as defined in the Spaceflights starter
from .nodes import create_model_input_table, preprocess_companies, preprocess_shuttles


def create_pipeline(**kwargs) -> Pipeline:
    data_processing = pipeline(
        [
            node(
                func=preprocess_companies,
                inputs="companies",
                outputs="preprocessed_companies",
                name="preprocess_companies_node",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles",
                outputs="preprocessed_shuttles",
                name="preprocess_shuttles_node",
            ),
            node(
                func=create_model_input_table,
                inputs=["preprocessed_shuttles", "preprocessed_companies", "reviews"],
                outputs="model_input_table",
                name="create_model_input_table_node",
            ),
        ]
    )
    pipes = []
    # One data_processing instance per namespace, tagged with all of its variants
    for namespace, variants in settings.DYNAMIC_PIPELINES_MAPPING.items():
        pipes.append(
            pipeline(
                data_processing,
                # Raw inputs are shared, so they keep their non-namespaced names
                inputs={
                    "companies": "companies",
                    "shuttles": "shuttles",
                    "reviews": "reviews",
                },
                namespace=namespace,
                tags=variants,
            )
        )
    return sum(pipes)
And the data_science pipeline:
from kedro.pipeline import Pipeline, node, pipeline

# Replace <project_name> with the name of your project's Python package
from <project_name> import settings

# verbose_params is a debugging helper assumed to live next to the other node functions
from .nodes import evaluate_model, split_data, train_model, verbose_params


def create_pipeline(**kwargs) -> Pipeline:
    data_science_pipeline = pipeline(
        [
            node(
                func=verbose_params,
                inputs=["params:model_options"],
                outputs=None,
                name="debug_node",
            ),
            node(
                func=split_data,
                inputs=["model_input_table", "params:model_options"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                name="split_data_node",
            ),
            node(
                func=train_model,
                inputs=["X_train", "y_train"],
                outputs="regressor",
                name="train_model_node",
            ),
            node(
                func=evaluate_model,
                inputs=["regressor", "X_test", "y_test"],
                outputs=None,
                name="evaluate_model_node",
            ),
        ]
    )
    pipes = []
    # One data_science instance per (namespace, variant) pair
    for namespace, variants in settings.DYNAMIC_PIPELINES_MAPPING.items():
        for variant in variants:
            pipes.append(
                pipeline(
                    data_science_pipeline,
                    # Every variant reads the table produced by its namespace's data_processing
                    inputs={"model_input_table": f"{namespace}.model_input_table"},
                    namespace=f"{namespace}.{variant}",
                    tags=[variant, namespace],
                )
            )
    return sum(pipes)
Note how we import the settings to look up the declarations of dynamic pipelines. Thanks to the use of Kedro’s settings for that, the pipelines are declared in one centralised place and can be reused throughout the whole project.
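It helps to see which dataset and parameter names each data_science instance ends up with once the namespace is applied. The sketch below only mimics the naming convention (a namespace prefix for datasets, a prefix after params: for parameters, explicitly mapped inputs left as given); namespaced_name is a hypothetical helper and does not call Kedro.

```python
def namespaced_name(name: str, namespace: str, explicit: dict) -> str:
    """Mimic how a dataset or parameter name looks inside a namespaced pipeline."""
    if name in explicit:
        # Explicitly mapped inputs/outputs use the name given in the mapping.
        return explicit[name]
    if name.startswith("params:"):
        # Parameters keep the "params:" prefix, followed by the namespace.
        return "params:" + namespace + "." + name[len("params:"):]
    return f"{namespace}.{name}"


namespace = "price_predictor.candidate1"
explicit = {"model_input_table": "price_predictor.model_input_table"}

resolved = {
    name: namespaced_name(name, namespace, explicit)
    for name in ["model_input_table", "params:model_options", "X_train", "regressor"]
}

for original, final in resolved.items():
    print(f"{original} -> {final}")
```

This is why the parameters are nested under price_predictor and candidate1 in parameters.yml, and why the catalog pattern for the trained model has the form "{namespace}.{variant}.regressor".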
5. Running the dynamic pipelines in Kedro
The way the namespaces and tags are configured here is really important. The data_processing pipeline is generated for price_predictor and reviews_predictor, while the data_science pipeline has multiple variants (i.e. different model configurations) for each dynamic instance we create: base, candidate1, candidate2 etc.
Why?
Thanks to this, we’re able to run the pipelines in the following ways:
- Run everything for one of the namespaces, e.g. kedro run --namespace price_predictor: this will run the data_processing pipeline once and all variants of the data_science pipeline: base, candidate1, candidate2 etc.
- Run only a single variant: kedro run --namespace price_predictor.candidate1 will only run the data_science pipeline in the candidate1 variant, skipping the rest. This requires the data_processing pipeline to be run first in order to consume input data.
- Run only a single variant and also run the data_processing pipeline: kedro run --namespace price_predictor --tags candidate1
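The three run modes follow from how namespace and tag filters combine. The following sketch emulates that selection with plain Python, assuming a node is kept when it sits in the requested namespace (or one of its sub-namespaces) and carries at least one of the requested tags. Each pipeline instance is collapsed into a single hypothetical label; build_instances and select are illustrative helpers, not Kedro functions.

```python
DYNAMIC_PIPELINES_MAPPING = {
    "reviews_predictor": ["base", "test1"],
    "price_predictor": ["base", "candidate1", "candidate2", "candidate3"],
}


def build_instances(mapping: dict) -> list:
    """Return (label, namespace, tags) tuples mirroring the two create_pipeline functions."""
    instances = []
    for namespace, variants in mapping.items():
        # data_processing: one instance per namespace, tagged with every variant
        instances.append((f"{namespace}.data_processing", namespace, set(variants)))
        for variant in variants:
            # data_science: one instance per variant, tagged with variant and namespace
            instances.append(
                (
                    f"{namespace}.{variant}.data_science",
                    f"{namespace}.{variant}",
                    {variant, namespace},
                )
            )
    return instances


def select(instances: list, namespace=None, tags=None) -> list:
    """Keep instances inside the namespace that carry at least one of the tags."""
    chosen = []
    for label, instance_namespace, instance_tags in instances:
        in_namespace = (
            namespace is None
            or instance_namespace == namespace
            or instance_namespace.startswith(namespace + ".")
        )
        has_tag = tags is None or bool(instance_tags & set(tags))
        if in_namespace and has_tag:
            chosen.append(label)
    return chosen


instances = build_instances(DYNAMIC_PIPELINES_MAPPING)

everything = select(instances, namespace="price_predictor")
single_variant = select(instances, namespace="price_predictor.candidate1")
variant_with_processing = select(instances, namespace="price_predictor", tags=["candidate1"])

print(everything)
print(single_variant)
print(variant_with_processing)
```

Tagging data_processing with every variant of its namespace is what makes the third command work: the candidate1 tag matches both the shared processing step and the single data_science variant.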
Simple and maintainable
This approach to creating dynamic pipelines in Kedro is in line with project maintainability and does not require any hacking or straying from “the Kedro way of doing things”. Since the pipeline execution is dynamic while the pipeline structure remains quasi-dynamic, all of the existing tools and plug-ins built on Kedro should work as they do already, e.g. you can preview the pipelines and parameters using Kedro-Viz.
Follow-ups for the geeks
If you’re a Kedro Geek (like me 😀) there are a few additional topics you can dig into:
Summary
Finally, Kedro has reached a set of features that allows us to create dynamic pipelines. All the small additions made during the 0.18.x release cycle made the framework even more flexible, while it remains a mature building block of MLOps platforms and projects.
Many thanks to Artur Dobrogowski who implemented and tested a large chunk of this project.