
Kedro Dynamic Pipelines

“How can I generate Kedro pipelines dynamically?” is one of the most commonly asked questions on the Kedro Slack. As a member of Kedro’s Technical Steering Committee, I see it come up there a lot.

Use cases for dynamic pipelines in Kedro usually fall into the following categories:

  • Implementing “core” ML pipelines that could be configured and re-used for various business use cases.
    Example industry applications:

    • Retail: this may involve building a sales forecasting model per store or per product, or creating personalised recommendation systems for different customer segments.
    • Finance: this might involve designing risk assessment models for different types of loans or credit products.
  • Automatically performing multiple experiments to evaluate which model configuration performs the best. These experiments may differ in terms of used features, model parameters or even the types of models being used.

At first glance the problem seems to be trivial - since we’re in the world of Python (the engine of the current Gen AI boom), anything should be possible. It indeed is, but if you stick to the Kedro principles of building maintainable and modular Data Science code - the problem becomes trickier than it looks.

In this post I will guide you through the process of implementing dynamic pipelines in Kedro, while still sticking to the framework’s main concepts and principles.

Why are dynamic pipelines in Kedro difficult?

On the one hand, Kedro does have the concept of modular pipelines - they allow the same pipeline structure to be recycled multiple times. At the same time they enable the user to change the inputs, outputs and parameters of each instance. This makes them reusable within the same codebase and shareable across projects.

Let’s take a look at an example modular pipeline (adapted from the official Spaceflights tutorial):

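from kedro.pipeline import Pipeline, node, pipeline

from .nodes import evaluate_model, split_data, train_model


def create_pipeline(**kwargs) -> Pipeline: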
    data_science_pipeline = pipeline(
        [
            node(
                func=split_data,
                inputs=["model_input_table", "params:model_options"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                name="split_data_node",
            ),
            node(
                func=train_model,
                inputs=["X_train", "y_train"],
                outputs="regressor",
                name="train_model_node",
            ),
            node(
                func=evaluate_model,
                inputs=["regressor", "X_test", "y_test"],
                outputs=None,
                name="evaluate_model_node",
            ),
        ]
    )
    baseline = pipeline(
        data_science_pipeline,
        parameters={"params:model_options": "params:model_options"},
        inputs={"model_input_table": "model_input_table",},
        namespace="baseline",
    )
    candidate = pipeline(
        data_science_pipeline,
        inputs={"model_input_table": "model_input_table",},
        tags=["candidate"],
        namespace="candidate"
    )
    return baseline + candidate

The idea here is to have the same pipeline run twice, on different sets of parameters. That’s perfectly fine in the Kedro world - you define the parameters twice in parameters.yml, like this:

model_options:
  test_size: 0.2
  random_state: 3
  features:
    - engines
    - passenger_capacity
    - crew
    - d_check_complete
    - moon_clearance_complete
    - iata_approved
    - company_rating
    - review_scores_rating

candidate:
  model_options:
    test_size: 0.2
    random_state: 666
    features:
      - engines
      - passenger_capacity
      - crew
      - d_check_complete
      - moon_clearance_complete

Now you’re good to go! Are you though?

What if you wanted to re-use a modular pipeline not twice, but 10 or maybe 100 times? You would have to expand parameters.yml 10 or 100 times over, either generating the entries somehow or copy-pasting them again and again, even though you only wanted to change a portion of the parameters - a standard day-to-day case you will encounter at work as a Data Scientist or ML Engineer.

parameters.yml is the first problem; the second is the create_pipeline code. At the point when create_pipeline is invoked in the Kedro project execution lifecycle, the parameters are not yet available, so you cannot use them to generate a variable number of modular pipelines.

The third place that needs to “know” how many times you want to use the modular pipeline is catalog.yml. Since every modular pipeline is namespaced, all data catalog entries also need to be namespaced, so instead of this:

preprocessed_companies:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/preprocessed_companies.pq

preprocessed_shuttles:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/preprocessed_shuttles.pq

model_input_table:
  type: pandas.ParquetDataSet
  filepath: data/03_primary/model_input_table.pq

You have to have this:

baseline.preprocessed_companies:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/preprocessed_companies.pq

baseline.preprocessed_shuttles:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/preprocessed_shuttles.pq

baseline.model_input_table:
  type: pandas.ParquetDataSet
  filepath: data/03_primary/model_input_table.pq

candidate.preprocessed_companies:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/candidate/preprocessed_companies.pq

candidate.preprocessed_shuttles:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/candidate/preprocessed_shuttles.pq

candidate.model_input_table:
  type: pandas.ParquetDataSet
  filepath: data/03_primary/candidate/model_input_table.pq

The larger the pipeline, the larger catalog.yml gets, and the higher the probability of messing up one of the filepaths.


From modular pipelines into dynamic pipelines

Now that you know the background (and you’re probably here because you’ve looked up “Kedro dynamic pipelines” and you just want the code), let me show you how you can solve the following use cases for dynamic pipelines in Kedro:

  • Use case 1: You have a pipeline that you want to re-run on a dataset that evolves over time - e.g. a forecasting model with monthly data tables, where each month consumes data from the previous month and so on.
  • Use case 2: You have a set of similar model training experiments with similar parameters that you want to run in parallel. Model parameters, used features, target columns or types of models could vary in different experiments.
  • Use case 3: You want to implement a “core” / “reusable” pipeline that could be configured for multiple business use cases and be run multiple times.

In “our method” we solve the main issues of dealing with dynamic pipelines in Kedro, with very little custom code and without any additional plugins! The method we present here is more of a project workflow proposal with a few additions to stitch everything together.

Everything was just recently made possible thanks to the following features of Kedro (all available from 0.18.13):

  • OmegaConfigLoader with custom resolvers
  • Dataset factories
  • Modular pipelines with namespaces
  • Centralised settings.py

Let me guide you through the process step by step.

1. Modify settings.py

There are a few crucial, project-wide settings you have to enable. The first is to use OmegaConfigLoader instead of the standard config loader (note that this will be default starting from Kedro 0.19.0).

from kedro.config import OmegaConfigLoader
CONFIG_LOADER_CLASS = OmegaConfigLoader

The next thing is to create a custom OmegaConf resolver named merge that will perform a deep-merge of two Python dictionaries - you will soon see why.

from copy import deepcopy

def merge_dicts(dict1, dict2):
    """
    Recursively merge two dictionaries.

    Args:
        dict1 (dict): The first dictionary to merge.
        dict2 (dict): The second dictionary to merge.

    Returns:
        dict: The merged dictionary.
    """
    result = deepcopy(dict1)
    for key, value in dict2.items():
        if key in result and isinstance(result[key], dict) and isinstance(value, dict):
            result[key] = merge_dicts(result[key], value)
        else:
            result[key] = value
    return result


CONFIG_LOADER_ARGS = {
    "custom_resolvers": {
        "merge": merge_dicts,
    }
}
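
To get a feel for what the merge function does before wiring it into the configuration, here is a small standalone check. It repeats merge_dicts so that it runs on its own and uses plain Python dicts as stand-ins for the resolved configuration; the values in base and overrides are made up for illustration only.

```python
from copy import deepcopy


def merge_dicts(dict1, dict2):
    """Recursively merge dict2 into a copy of dict1 (same logic as in settings.py)."""
    result = deepcopy(dict1)
    for key, value in dict2.items():
        if key in result and isinstance(result[key], dict) and isinstance(value, dict):
            result[key] = merge_dicts(result[key], value)
        else:
            result[key] = value
    return result


# Hypothetical values, chosen only to show the three cases
base = {
    "target": "costs",
    "features": ["engines", "crew"],
    "model_params": {"fit_intercept": True},
}
overrides = {
    "target": "price",                    # scalar: replaced
    "features": ["engines"],              # list: replaced as a whole, not appended to
    "model_params": {"positive": True},   # nested dict: merged key by key
}

merged = merge_dicts(base, overrides)
print(merged)
print(base)  # the first argument is left untouched thanks to deepcopy
```

Note that lists are replaced rather than concatenated, so a variant that overrides features has to list every feature it wants.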

The last and most important addition to settings.py is the project-wide “declaration” of the pipelines that you want to have. It will be used both in create_pipeline downstream and during the validation of the parameters.

DYNAMIC_PIPELINES_MAPPING = {
    "reviews_predictor": ["base", "test1"],
    "price_predictor": ["base", "candidate1", "candidate2", "candidate3"],
}

This dictionary declares the namespaces (keys) that have dynamically generated pipelines and the variants (values) of each. In the example above, the pipeline in the reviews_predictor namespace will have 2 instances and the pipeline in the price_predictor namespace will have 4 instances. All 6 instances will use the same Kedro pipeline structure.
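
As a quick sanity check, the fully qualified instance names implied by the mapping can be listed with a few lines of plain Python. The instances variable is just an illustrative name; the same pairs are later used as namespaces when the pipelines are generated.

```python
DYNAMIC_PIPELINES_MAPPING = {
    "reviews_predictor": ["base", "test1"],
    "price_predictor": ["base", "candidate1", "candidate2", "candidate3"],
}

# One "<namespace>.<variant>" name per pipeline instance
instances = [
    f"{namespace}.{variant}"
    for namespace, variants in DYNAMIC_PIPELINES_MAPPING.items()
    for variant in variants
]

for name in instances:
    print(name)
print(f"{len(instances)} instances in total")
```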


2. Adjusting the parameters.yml

Up to this point, parameters.yml had a lot of duplication and was error-prone. Thanks to OmegaConfigLoader and the merge resolver (defined above), the parameters.yml file can be refactored to resemble class inheritance from OOP:

# The model_options below are "base" options for all pipelines

model_options:
  test_size: 0.2
  random_state: 3
  target: costs
  features:
    - engines
    - passenger_capacity
    - crew
    - d_check_complete
    - moon_clearance_complete
    - iata_approved
    - company_rating
    - review_scores_rating
  model: sklearn.linear_model.LinearRegression
  model_params: {}

# -------------------------------
# Pipeline-specific configuration

price_predictor:
  _overrides:
    target: price
  model_options: ${merge:${model_options},${._overrides}}

  base:
    model_options: ${..model_options}
  
  candidate1:
    _overrides:
      features:
      - engines
      - passenger_capacity
      - crew
      - d_check_complete
      - company_rating
    model_options: ${merge:${..model_options},${._overrides}}

  candidate2:
    _overrides:
      model_params:
        gamma: 2.5
    model_options: ${merge:${..model_options},${._overrides}}

  candidate3:
    _overrides:
      model: sklearn.ensemble.RandomForestRegressor
      model_params:
        max_depth: 1
    model_options: ${merge:${..model_options},${._overrides}}

The new parameters.yml cleverly combines the use of built-in OmegaConf reference resolving (here: ${..model_options} and ${._overrides}) with the merge resolver, which allows you to point to different parts of the configuration and re-use them. By declaring the parameters structure like this:

<namespace>:
    <variant - e.g. model variant / experiment name>:
        _overrides: {} # what to override from root configuration

        # any specific parameter, e.g. model_options, used in Kedro pipeline
        model_options: ${merge:${..model_options},${._overrides}}

you effectively simulate the inheritance of parameters and you are able to re-use and/or override some/all values. During runtime, the resolvers will run and the configuration provided to the Kedro nodes will contain the actual values.
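
The inheritance chain can be traced by hand. The sketch below mimics, in plain Python and without OmegaConf, what the nested ${merge:...} expressions produce for price_predictor.candidate2: root options first, then the namespace-level overrides, then the variant-level overrides. The resolve helper is a hypothetical name and the root options are shortened (features omitted) for brevity.

```python
from copy import deepcopy
from functools import reduce


def merge_dicts(dict1, dict2):
    """Recursively merge dict2 into a copy of dict1 (same logic as the merge resolver)."""
    result = deepcopy(dict1)
    for key, value in dict2.items():
        if key in result and isinstance(result[key], dict) and isinstance(value, dict):
            result[key] = merge_dicts(result[key], value)
        else:
            result[key] = value
    return result


def resolve(*layers):
    """Apply each layer of overrides on top of the previous result."""
    return reduce(merge_dicts, layers, {})


# Root-level model_options (shortened)
root_model_options = {
    "test_size": 0.2,
    "random_state": 3,
    "target": "costs",
    "model": "sklearn.linear_model.LinearRegression",
    "model_params": {},
}
price_predictor_overrides = {"target": "price"}          # price_predictor._overrides
candidate2_overrides = {"model_params": {"gamma": 2.5}}  # price_predictor.candidate2._overrides

base_options = resolve(root_model_options, price_predictor_overrides)
candidate2_options = resolve(
    root_model_options, price_predictor_overrides, candidate2_overrides
)

print(base_options)
print(candidate2_options)
```

Each variant therefore only states what differs from its parent, and everything else is inherited.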

3. Use dataset factories

The Kedro dataset factories feature enables you to write the “generic” configuration and minimise the redundancy of catalog entries by associating the datasets used in your project's pipelines with dataset factory patterns. How does it work? You specify the “placeholders” which will be filled with actual runtime values during Kedro project execution.

In our case, we would like to potentially save all trained models from all our pipelines - so in this simple scenario with 2 namespaces and 6 variants:

"reviews_predictor": ["base", "test1"],
 "price_predictor": ["base", "candidate1", "candidate2", "candidate3"],

you would need a total of 6 catalog entries - and that's just for a single node that outputs some data!

Thanks to the dataset factories, instead of:

price_predictor.base.regressor:
  type: pickle.PickleDataSet
  filepath: data/06_models/price_predictor/base/regressor.pickle
  versioned: true

price_predictor.candidate1.regressor:
  type: pickle.PickleDataSet
  filepath: data/06_models/price_predictor/candidate1/regressor.pickle
  versioned: true

price_predictor.candidate2.regressor:
  type: pickle.PickleDataSet
  filepath: data/06_models/price_predictor/candidate2/regressor.pickle
  versioned: true

# ... and 3 more entries

you can just have a single entry:

"{namespace}.{variant}.regressor":
  type: pickle.PickleDataSet
  filepath: data/06_models/{namespace}/{variant}/regressor.pickle
  versioned: true

At runtime, when Kedro reaches the node that outputs the regressor, it will take the namespaced dataset name (e.g. price_predictor.candidate1.regressor), match it against the pattern from the data catalog, "{namespace}.{variant}.regressor", and fill the {namespace} and {variant} placeholders with price_predictor and candidate1 respectively.
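
The matching step can be illustrated with a few lines of standard-library Python. This is only a sketch of the idea and not how Kedro implements it internally; match_pattern is a hypothetical helper, and it makes the simplifying assumption that a placeholder never spans a dot.

```python
import re
from typing import Dict, Optional


def match_pattern(pattern: str, dataset_name: str) -> Optional[Dict[str, str]]:
    """Return the placeholder values if dataset_name fits the pattern, else None."""
    # Split into literal text and {placeholder} tokens, keeping the tokens
    parts = re.split(r"(\{\w+\})", pattern)
    regex = "".join(
        f"(?P<{part[1:-1]}>[^.]+)" if re.fullmatch(r"\{\w+\}", part) else re.escape(part)
        for part in parts
    )
    match = re.fullmatch(regex, dataset_name)
    return match.groupdict() if match else None


pattern = "{namespace}.{variant}.regressor"
filepath_template = "data/06_models/{namespace}/{variant}/regressor.pickle"

fields = match_pattern(pattern, "price_predictor.candidate1.regressor")
print(fields)
print(filepath_template.format(**fields))

# A dataset that does not fit the pattern is simply not matched
print(match_pattern(pattern, "price_predictor.model_input_table"))
```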

Make sure to also add the following entry:

"{namespace}.model_input_table":
  type: pandas.ParquetDataSet
  filepath: data/03_primary/{namespace}/model_input_table.pq

if you’re following along this blogpost with the Spaceflights starter.

4. Change standard pipelines to modular pipelines

The final part is to actually generate the pipelines. Let’s start with the data_processing pipeline from the Spaceflights starter:

from kedro.pipeline import Pipeline, node, pipeline

from <project_name> import settings

def create_pipeline(**kwargs) -> Pipeline:
    data_processing = pipeline(
        [
            node(
                func=preprocess_companies,
                inputs="companies",
                outputs="preprocessed_companies",
                name="preprocess_companies_node",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles",
                outputs="preprocessed_shuttles",
                name="preprocess_shuttles_node",
            ),
            node(
                func=create_model_input_table,
                inputs=["preprocessed_shuttles", "preprocessed_companies", "reviews"],
                outputs="model_input_table",
                name="create_model_input_table_node",
            ),
        ]
    )

    pipes = []
    for namespace in settings.DYNAMIC_PIPELINES_MAPPING.keys():
        pipes.append(
            pipeline(
                data_processing,
                inputs={
                    "companies": "companies",
                    "shuttles": "shuttles",
                    "reviews": "reviews",
                },
                namespace=namespace,
                tags=settings.DYNAMIC_PIPELINES_MAPPING[namespace],
            )
        )
    return sum(pipes)

And the data_science pipeline:

from kedro.pipeline import Pipeline, node, pipeline

from <project_name> import settings

def create_pipeline(**kwargs) -> Pipeline:
    data_science_pipeline = pipeline(
        [
            node(
                func=verbose_params,
                inputs=["params:model_options"],
                outputs=None,
                name="debug_node",
            ),
            node(
                func=split_data,
                inputs=["model_input_table", "params:model_options"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                name="split_data_node",
            ),
            node(
                func=train_model,
                inputs=["X_train", "y_train"],
                outputs="regressor",
                name="train_model_node",
            ),
            node(
                func=evaluate_model,
                inputs=["regressor", "X_test", "y_test"],
                outputs=None,
                name="evaluate_model_node",
            ),
        ]
    )
    pipes = []
    for namespace, variants in settings.DYNAMIC_PIPELINES_MAPPING.items():
        for variant in variants:
            pipes.append(
                pipeline(
                    data_science_pipeline,
                    inputs={"model_input_table": f"{namespace}.model_input_table"},
                    namespace=f"{namespace}.{variant}",
                    tags=[variant, namespace],
                )
            )
    return sum(pipes)


Note how we import the settings to look up the declarations of dynamic pipelines. Because they live in Kedro’s settings, the pipelines are declared in one centralised place and can be reused throughout the whole project.
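
One example of such reuse is checking that every declared variant actually has parameters. The sketch below is a hypothetical helper, not part of Kedro: find_missing_model_options and the hand-written params dictionary (a stand-in for the loaded parameters.yml from step 2) are assumptions made for illustration.

```python
DYNAMIC_PIPELINES_MAPPING = {
    "reviews_predictor": ["base", "test1"],
    "price_predictor": ["base", "candidate1", "candidate2", "candidate3"],
}


def find_missing_model_options(mapping, params):
    """List every '<namespace>.<variant>' that has no model_options in params."""
    missing = []
    for namespace, variants in mapping.items():
        for variant in variants:
            variant_params = params.get(namespace, {}).get(variant, {})
            if "model_options" not in variant_params:
                missing.append(f"{namespace}.{variant}")
    return missing


# Stand-in for the loaded parameters: only price_predictor is configured,
# mirroring the parameters.yml shown in step 2
params = {
    "price_predictor": {
        "base": {"model_options": {}},
        "candidate1": {"model_options": {}},
        "candidate2": {"model_options": {}},
        "candidate3": {"model_options": {}},
    }
}

print(find_missing_model_options(DYNAMIC_PIPELINES_MAPPING, params))
```

With the parameters from step 2 alone, the check reports the two reviews_predictor variants, which is a reminder that every namespace in the mapping needs its own block in parameters.yml.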

5. Running the dynamic pipelines in Kedro

The way the namespaces and tags are configured here is really important. The data_processing pipeline is generated for price_predictor and reviews_predictor while the data_science pipeline has multiple variants (i.e. different model configurations) for each dynamic instance we create: base, candidate1, candidate2 etc.

Why?

Thanks to this, we’re able to run the pipelines in the following ways:

  • Run everything for one of the namespaces, e.g. kedro run --namespace price_predictor: this will run the data_processing pipeline once and all variants of the data_science pipeline: base, candidate1, candidate2 etc.
  • Run only the single variant: kedro run --namespace price_predictor.candidate1 will only run the data_science pipeline in the candidate1 variant, skipping the rest. This requires the data_processing pipeline to be run first in order to consume input data.
  • Run only the single variant and also run the data_processing pipeline: kedro run --namespace price_predictor --tags candidate1
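
To see why these three commands select different sets of nodes, the filtering can be mimicked in plain Python. This is an illustrative model only, under the assumption that a namespace filter keeps nodes in that namespace or any nested one and that a tag filter keeps nodes carrying at least one requested tag; NodeStub, build_nodes and select are hypothetical names, not Kedro APIs.

```python
from dataclasses import dataclass
from typing import FrozenSet, Iterable, List, Optional

DYNAMIC_PIPELINES_MAPPING = {
    "reviews_predictor": ["base", "test1"],
    "price_predictor": ["base", "candidate1", "candidate2", "candidate3"],
}

DATA_PROCESSING_STEPS = ("preprocess_companies_node", "preprocess_shuttles_node",
                         "create_model_input_table_node")
DATA_SCIENCE_STEPS = ("debug_node", "split_data_node", "train_model_node",
                      "evaluate_model_node")


@dataclass(frozen=True)
class NodeStub:
    name: str
    namespace: str
    tags: FrozenSet[str]


def build_nodes(mapping) -> List[NodeStub]:
    """Recreate namespaces and tags the way the two create_pipeline functions assign them."""
    nodes = []
    for namespace, variants in mapping.items():
        for step in DATA_PROCESSING_STEPS:
            # data_processing: one instance per namespace, tagged with all its variants
            nodes.append(NodeStub(f"{namespace}.{step}", namespace, frozenset(variants)))
        for variant in variants:
            for step in DATA_SCIENCE_STEPS:
                # data_science: one instance per variant, tagged with variant and namespace
                full_namespace = f"{namespace}.{variant}"
                nodes.append(NodeStub(f"{full_namespace}.{step}", full_namespace,
                                      frozenset({variant, namespace})))
    return nodes


def select(nodes: Iterable[NodeStub], namespace: Optional[str] = None,
           tags: Iterable[str] = ()) -> List[NodeStub]:
    """Keep nodes inside the namespace (or nested in it) that carry any requested tag."""
    wanted = set(tags)
    return [
        n for n in nodes
        if (namespace is None or n.namespace == namespace
            or n.namespace.startswith(namespace + "."))
        and (not wanted or wanted & n.tags)
    ]


nodes = build_nodes(DYNAMIC_PIPELINES_MAPPING)
everything = select(nodes, namespace="price_predictor")
single_variant = select(nodes, namespace="price_predictor.candidate1")
variant_with_processing = select(nodes, namespace="price_predictor", tags=["candidate1"])

print(len(everything), len(single_variant), len(variant_with_processing))
```

Tagging the data_processing nodes with every variant name is what lets the third command pick up both the shared preprocessing and the chosen variant.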

Simple and maintainable

This approach to creating dynamic pipelines in Kedro is in line with project maintainability and does not require any hacking or departing from “the Kedro way of doing things”. Since the pipeline execution is dynamic and the pipeline structure remains quasi-dynamic, all of the existing tools and plug-ins built on Kedro should work as they already do, e.g. you can preview the pipelines and parameters using Kedro-Viz:

[Image: Kedro Dynamic Pipelines]

Follow-ups for the geeks

If you’re a Kedro Geek (like me 😀) there are a few additional topics you can dig into:

Summary

Finally, Kedro has reached a set of features that allows us to create dynamic pipelines. All the small bits that were added during the 0.18.x release cycle made the framework even more flexible, while it remains a mature building block of MLOps platforms and projects.

Many thanks to Artur Dobrogowski who implemented and tested a large chunk of this project.

12 October 2023
