Tutorial
25 min read

Deploying efficient Kedro pipelines on GCP Composer / Airflow with node grouping & MLflow

Airflow is a commonly used orchestrator that helps you schedule, run and monitor all kinds of workflows. Thanks to Python, it offers lots of freedom of expression to define all kinds of tasks and connect APIs in different domains. It also provides many extensions to schedule workloads on other platforms such as Kubernetes.

Thanks to those qualities, Airflow is often also used to manage tasks in machine learning projects. However, there is another tool that additionally provides a pipeline abstraction and excels at organizing ML-related tasks - Kedro.

Can the two be used together? Yes, they can! In this article I'll show you how to do this and how to make it efficient and seamlessly automated!

Exploring available tools - Kedro-Airflow plugin

For this task I decided to use the official Kedro plugin, as it has recently received an update that lets you customize DAG generation with your own jinja templates.

The plugin is very simple in structure; in fact, it does one job only - it generates DAG code for a given Kedro pipeline and passes parameters to it through a jinja template.

Here's everything it has to offer:

  • kedro airflow create - Create an Airflow DAG for a project

    • -p, --pipeline TEXT - Name of the registered pipeline to convert. If not set, the '__default__' pipeline is used.
    • -e, --env TEXT - Kedro configuration environment name. Defaults to local.
    • -t, --target-dir DIRECTORY - The directory path to store the generated Airflow dags
    • -j, --jinja-file FILE - The template file for the generated Airflow dags
    • --params TEXT - Specify extra parameters that you want to pass to the context initializer. Items must be separated by a comma, keys - by colon or equals sign, example: param1=value1, param2=value2. Each parameter is split by the first comma, so parameter values are allowed to contain colons, parameter keys are not. To pass a nested dictionary as a parameter, separate keys by '.', example: param_group.param1:value1.

Kedro-Airflow's quick start and challenges

The first thing I did was to read the documentation and go through the quick start steps on a locally established Airflow environment with docker-compose. The provided example template creates a KedroOperator - a custom operator (in the spirit of a PythonOperator) that executes Kedro nodes by spawning a Kedro session and calling session.run() inside the Airflow worker. I quickly formed my opinion about the quick start setup - the example given there is impractical (I'll show a simplified sketch of that operator right after the list below), as it is flawed in a few ways that I'd like to avoid in my solution:

  • First, it assumes that Airflow and Kedro know about each other. I would prefer to isolate these two environments, so that I don't need to import Kedro in Airflow or Airflow in Kedro. Since managing dependencies in Airflow is challenging, it is better to avoid this problem altogether.
  • Second, it follows that both would have to run on machines with similar specifications, as they would be executed in the same environment.
  • Third, as the code would be executed by the same processes, it would need to be shared in the form of packages. In this setup Airflow runs in a docker image, so I'd have to either re-build and re-run this image every time the Airflow or Kedro project code changes, OR additionally manage lots of virtual Python environments somewhere and ship new versions of the micro-packaged Kedro pipelines there whenever the code changes.
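For reference, here's a simplified sketch of what that operator boils down to - reconstructed from the behavior described above rather than copied from the template, so details will vary between plugin versions:

from airflow.models import BaseOperator
from kedro.framework.project import configure_project
from kedro.framework.session import KedroSession


class KedroOperator(BaseOperator):
    def __init__(self, package_name, pipeline_name, node_name, project_path, env, **kwargs):
        super().__init__(**kwargs)
        self.package_name = package_name
        self.pipeline_name = pipeline_name
        self.node_name = node_name
        self.project_path = project_path
        self.env = env

    def execute(self, context):
        # The whole Kedro project has to be importable by the Airflow worker
        configure_project(self.package_name)
        with KedroSession.create(project_path=self.project_path, env=self.env) as session:
            session.run(self.pipeline_name, node_names=[self.node_name])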

Working on the solution

So at first I had in mind to use either the DockerOperator or the KubernetesPodOperator to achieve that isolation and scalability of execution. As my target was Google Cloud, I decided to work with managed Airflow (GCP Composer) backed by a GKE Autopilot cluster, as they are native to GCP. Naturally, that led to the choice of the GKEPodOperator provided by Google to work with GKE. It inherits from the KubernetesPodOperator and provides the same functionality, with the added bonus of handling and hiding the Google authentication mechanisms needed to authorize with the GKE cluster for you. It amused me that this provider also ships create/delete cluster operators, as if spinning up a compute cluster just to run a single task were a good idea... I guess it can be if you run one big task infrequently, since provisioning a cluster costs nothing in GCP and you only pay for its upkeep time. It is at least handy in the tutorial, as it handles GKE clusters for you.
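To make that concrete, here is a minimal, hedged sketch of how a single Kedro node can be launched on GKE this way (in recent Google provider versions the class is named GKEStartPodOperator; all project, cluster and image values below are placeholders):

from airflow.providers.google.cloud.operators.kubernetes_engine import GKEStartPodOperator

# (this would live inside a `with DAG(...):` block in the generated file)
preprocess_companies = GKEStartPodOperator(
    task_id="preprocess-companies-node",
    project_id="my-gcp-project",                 # placeholder
    location="europe-west1",                     # placeholder
    cluster_name="my-gke-cluster",               # placeholder
    name="pod-data-processing-preprocess-companies-node",
    namespace="airflow-ml-jobs",
    service_account_name="composer-airflow",
    image="europe-west1-docker.pkg.dev/my-gcp-project/images/spaceflights-airflow",
    cmds=["python", "-m", "kedro", "run", "--pipeline", "data_processing",
          "--nodes", "preprocess_companies_node", "--env", "base"],
    is_delete_operator_pod=True,                 # clean the pod up once the task finishes
)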

While I had no need for Airflow requirements in the Kedro project, it was useful to have another environment with the apache-airflow[google,kubernetes] and kubernetes Python packages installed to check the validity of the generated DAGs. The kubernetes package is used here to define and validate Kubernetes object descriptions, mainly machine resource specifications.
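For example, a quick sanity check in that environment can simply load the generated files the way the scheduler would and fail on import errors (the dag_folder path is a placeholder):

from airflow.models import DagBag

# Load the generated DAG files exactly the way the scheduler would
dag_bag = DagBag(dag_folder="airflow_dags/", include_examples=False)

assert not dag_bag.import_errors, f"Broken DAGs: {dag_bag.import_errors}"
for dag_id, dag in dag_bag.dags.items():
    print(dag_id, [task.task_id for task in dag.tasks])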

Setting up the environment

The next step, after choosing the tools for the job, was to set up the environment for Composer and GKE. At first I did it in the GCP Console UI, but I wanted easily reproducible results, so I wrote some terraform code to quickly provision and destroy the environment. I also wanted to set up MLflow for experiment tracking - you can check out this other blog post on how to set it up on Google Cloud Run. I used the official Google terraform modules, however given their complexity and warnings, I'd seriously consider whether to use them outside of demo purposes.


Allowing communication between MLflow and the GKE cluster required some additional effort that is outside the scope of this article. To keep it short: MLflow is secured with an IAP proxy, so we needed service accounts with access to it and had to make the Airflow executors (here: GKE Pods) use those service accounts (Workload Identity is the go-to mechanism here). You can find more details on this in the repository README of this demo.


Deep-dive into Kedro-Airflow plugin

Here I'll show you in detail how to use the plugin and how to customize it to your needs. The plugin handles some inputs for us; here's how it calls our jinja template to fill it:

template.stream(
    dag_name=package_name,
    dependencies=dependencies,
    env=env,
    pipeline_name=pipeline_name,
    package_name=package_name,
    pipeline=pipeline,
    **dag_config,
).dump(str(target_path))

Here env is the Kedro environment, pipeline is the Kedro pipeline object, dag_config is the dictionary of parameters passed to the template and dependencies is a dictionary of parent-child relationships between the nodes defined by the pipeline. Parameters can be passed either on the command line when invoking the creation command or via an airflow params config file. The config file is loaded with the plugin's _load_config function (which also shows the file pattern it looks for by default):

def _load_config(context: KedroContext, pipeline_name: str) -> dict[str, Any]:
    # Set the default pattern for `airflow` if not provided in `settings.py`
    if "airflow" not in context.config_loader.config_patterns.keys():
        context.config_loader.config_patterns.update(  # pragma: no cover
            {"airflow": ["airflow*", "airflow/**"]}
        )
    ...
    try:
        config_airflow = context.config_loader["airflow"]
    ...
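Before moving on to the template configuration, it helps to picture what the dependencies argument from the snippet above contains: conceptually it maps each node to the nodes that consume its outputs. For the Spaceflights pipelines it looks roughly like this (illustrative only, with node names standing in for the actual Node objects):

dependencies = {
    "preprocess_companies_node": ["create_model_input_table_node"],
    "preprocess_shuttles_node": ["create_model_input_table_node"],
    "create_model_input_table_node": ["split_data_node"],
    "split_data_node": ["train_model_node", "evaluate_model_node"],
    "train_model_node": ["evaluate_model_node"],
}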

All those parameters will be visible as variables in jinja, available to use in our template. Let's get down to it and configure it!

Configuring Kedro-Airflow for a real-world use case

Here's my conf/base/airflow.yml defining the parameters:

default:
    grouping_prefix: "airflow:"
    resources_tag_prefix: "machine:"
    # When grouping is enabled, nodes tagged with the grouping prefix are grouped into the same Airflow DAG node for shared execution
    # Make sure the grouping_prefix is not a prefix of any node name, that every node carries at most one tag with this prefix and that the nodes within a group are not disjoint in the pipeline graph
    grouping: true
    gcp_project_id: "gid-labs-mlops-sandbox"
    gcp_region: "europe-west1"
    gcp_gke_cluster_name: "europe-west1-test-environme-d1ea8bdc-gke"
    k8s_namespace: "airflow-ml-jobs"
    k8s_service_account: "composer-airflow"
    docker_image: "europe-west1-docker.pkg.dev/gid-labs-mlops-sandbox/images/spaceflights-airflow"
    start_date: [2023, 1, 1]
    max_active_runs: 2
    # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
    schedule_interval: "@once" # null
    catchup: false
    # Default settings applied to all tasks
    owner: "airflow"
    depends_on_past: false
    email_on_failure: false
    email_on_retry: false
    retries: 0
    retry_delay: 5

# Arguments specific to the pipeline (overrides the parameters above)
data_science:
    owner: "airflow-ds"

In this config we can define any custom variables we want; how they are used will become clear once we view the jinja template. The parameters defined here configure Airflow behavior, point to the GKE cluster location, define parameters of the k8s pod template and supplement pipelines with additional informative tags.

Sets of parameters can be defined under default, applied to all pipelines, and under pipeline-specific keys, where the pipeline's name overrides the defaults. We use the Spaceflights starter as a starting point, so we have the __default__, data_science and data_processing pipelines.
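The merge itself is simple; conceptually (this is an illustration of the behavior, not the plugin's exact code) the per-pipeline config is resolved roughly like this:

def resolve_dag_config(airflow_config: dict, pipeline_name: str) -> dict:
    # Start from the shared defaults, then let the pipeline-specific section
    # (keyed by the pipeline name) override individual values
    dag_config = dict(airflow_config.get("default", {}))
    dag_config.update(airflow_config.get(pipeline_name, {}))
    return dag_config


# resolve_dag_config(config, "data_science")["owner"] == "airflow-ds"
# resolve_dag_config(config, "data_science")["retries"] == 0   (inherited from default)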

The DAG template can be found here. The main points are:

  • we use jinja loops to pass information about Kedro node names, tags and their dependencies as dictionaries with "raw data",
  • then we use config parameters to configure a DAG representing the given pipeline,
  • in this process we translate Kedro nodes into Airflow DAG nodes, using a GKEPodOperator for each of them and passing a docker command that runs only the selected nodes in each step - every task uses the same docker image built from our Kedro project repository,
  • during node translation we use slugify to sanitize strings so they are accepted regardless of the character restrictions of the Kubernetes API,
  • then we define sets of standard machine resources and select the correct one for each node based on a special "machine:..." tag (as a convention; see the sketch after this list),
  • we link the DAG nodes together based on the passed information about dependencies,
  • lastly, we add one extra node in front of the starting nodes, using a special pipeline to handle MLflow session creation and pass the MLflow run id to the other nodes via the Airflow XCom mechanism.
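As for the machine resources mentioned above, here is a hedged sketch of that selection logic - the preset names and sizes are placeholders of my own, the real values live in the template:

from kubernetes.client import models as k8s

# Illustrative presets only - the actual sizes are whatever the template defines
MACHINE_PRESETS = {
    "machine:small": k8s.V1ResourceRequirements(
        requests={"cpu": "500m", "memory": "1Gi"},
        limits={"cpu": "1", "memory": "2Gi"},
    ),
    "machine:medium": k8s.V1ResourceRequirements(
        requests={"cpu": "1", "memory": "4Gi"},
        limits={"cpu": "2", "memory": "8Gi"},
    ),
}


def resources_for(tags: set, default: str = "machine:small") -> k8s.V1ResourceRequirements:
    # Pick the first "machine:" tag found on the node; the validation described
    # later in this post warns when a group ends up with more than one of them
    machine_tags = [tag for tag in tags if tag.startswith("machine:")]
    return MACHINE_PRESETS[machine_tags[0]] if machine_tags else MACHINE_PRESETS[default]

The selected V1ResourceRequirements object is then handed to the pod operator (for example via its container_resources argument).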

Making the DAGs efficient with Kedro node grouping

By default, every Kedro node is translated into one node of the other framework - here, an Airflow DAG node. As of version 0.18.13, Kedro still does not support any encapsulation of nodes into groups (and neither do most of its plugins). For Kedro pipelines you want high granularity of nodes, making each responsible for only one thing, so they are easily testable and reusable. Fine-grained task division within a single process has almost no overhead, as memory can be shared between the nodes. However, in Airflow (using docker images) you want as few nodes as possible to reduce the overhead of pod creation and destruction. More nodes also mean more time wasted on data serialization and communication between them. So how should you handle that?

What could give us more control over how the pipeline is structured?

Tags! Tags are a great way to group nodes together and define their properties.

We've got all the pieces of the solution at hand. In Kedro we can execute only selected nodes using the tag filtering mechanism, e.g.:

kedro run --tags data_processing

We're going to use a convention of special tags for this purpose. By default we'll consider tags starting with "airflow:" as grouping tags, with the name of the group being the text after the prefix.
As each node is run in a docker container, all that's left is to determine whether nodes are grouped or not and pass the correct command: either run a single node or a group of nodes selected by a tag. Ideally this processing would happen in a plugin hook at DAG generation time, but the plugin doesn't offer such an option yet. The next best thing would be to do it with jinja programming, but that would result in a rather complex and unmaintainable template. The remaining option is to use the power of Python and Airflow and embed the code doing that work inside the DAG definition itself. So I've implemented a few functions that create the new node mapping and update the tags based on the grouping tags.

The code is as follows:

from typing import Tuple


def group_nodes_with_tags(node_tags: dict, grouping_prefix: str = "airflow:") -> Tuple[dict, dict]:
    # Helper dictionary that maps each node to the group (or node) it belongs to
    group_translator = {k: k for k in node_tags.keys()}
    # Dict of groups and the nodes they consist of
    tag_groups = dict()
    for node, tags in node_tags.items():
        for tag in tags:
            if tag.startswith(grouping_prefix):
                if tag not in tag_groups:
                    tag_groups[tag] = set()
                tag_groups[tag].add(node)
                group_translator[node] = tag
    return group_translator, tag_groups


def get_tasks_from_dependencies(node_dependencies: dict, group_translator: dict) -> Tuple[set, dict]:
    # Calculate the graph structure after grouping nodes by their grouping tags
    group_dependencies = {}
    task_names = set()
    for parent, children in node_dependencies.items():
        if group_translator[parent] not in group_dependencies:
            group_dependencies[group_translator[parent]] = set()
        this_group_deps = group_dependencies[group_translator[parent]]
        task_names.add(group_translator[parent])
        for child in children:
            if group_translator[child] != group_translator[parent]:
                this_group_deps.add(group_translator[child])
            task_names.add(group_translator[child])
    return task_names, group_dependencies


def update_node_tags(node_tags: dict, tag_groups: dict) -> dict:
    # Tags of a new group node are the union of its members' tags
    node_tags.update({group: set(tag for node in tag_groups[group] for tag in node_tags[node]) for group in tag_groups})
    return node_tags
...
group_translator, tag_groups = group_nodes_with_tags(node_tags)
task_names, group_dependencies = get_tasks_from_dependencies(node_dependencies, group_translator)
update_node_tags(node_tags, tag_groups)
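For illustration, here is roughly what these helpers produce for the tagged Spaceflights nodes shown later in this post (a trimmed-down input; evaluate_model_node omitted for brevity):

node_tags = {
    "preprocess_companies_node": {"airflow:companies"},
    "preprocess_shuttles_node": {"airflow:shuttles", "machine:medium"},
    "create_model_input_table_node": {"airflow:split", "machine:medium"},
    "split_data_node": {"airflow:split", "machine:medium"},
    "train_model_node": {"machine:medium"},
}
node_dependencies = {
    "preprocess_companies_node": ["create_model_input_table_node"],
    "preprocess_shuttles_node": ["create_model_input_table_node"],
    "create_model_input_table_node": ["split_data_node"],
    "split_data_node": ["train_model_node"],
}

group_translator, tag_groups = group_nodes_with_tags(node_tags)
task_names, group_dependencies = get_tasks_from_dependencies(node_dependencies, group_translator)
# task_names         -> {"airflow:companies", "airflow:shuttles", "airflow:split", "train_model_node"}
# group_dependencies -> {"airflow:companies": {"airflow:split"},
#                        "airflow:shuttles": {"airflow:split"},
#                        "airflow:split": {"train_model_node"}}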

By convention I've decided to make this mechanism optional and, for convenience, to use the prefix defined in the config file. Then, based on the results of grouping, the DAG chooses whether to run a single node or a whole group of them:

task_id=name.lstrip(GROUPING_PREFIX),
cmds=["python", "-m", "kedro", "run", "--pipeline", pipeline_name, "--tags", name, "--env", "{{ env | default(local) }}"] if name.startswith(GROUPING_PREFIX)
else ["python", "-m", "kedro", "run", "--pipeline", pipeline_name, "--nodes", name, "--env", "{{ env | default(local) }}"],
name=f"pod-{ slugify(pipeline_name) }-{ slugify(name.lstrip(GROUPING_PREFIX)) }",

Node grouping gotchas

And that's it!

... or is it? What if we make a mistake in our tagging and the DAG stops being a DAG (a cycle is introduced)? Well, then Airflow's DAG validation would shout at us for defining an incorrect DAG. But the mistake can also be obscured, as Kedro's dependencies are hidden inside grouped nodes and not visible from Airflow's perspective after translation. So, to lessen the debugging burden, it is nice to add tag validation code to the DAG creation process. As I've mentioned before, we don't have hooks available for this plugin (as of Kedro 0.18.13), so the next best place for it is the register_pipelines function.
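I won't reproduce the whole validation here, but a rough sketch of the cycle-check part, building on the grouping helpers shown earlier (my own simplified take, not the exact code from the repository), could look like this:

def find_cycle(group_dependencies: dict) -> set:
    # Recursive DFS with a "currently visiting" set; returns the tasks on the
    # offending path if the grouped graph contains a cycle, otherwise an empty set
    visiting, visited = set(), set()

    def visit(task) -> bool:
        if task in visited:
            return False
        if task in visiting:
            return True
        visiting.add(task)
        if any(visit(child) for child in group_dependencies.get(task, ())):
            return True
        visiting.remove(task)
        visited.add(task)
        return False

    for task in list(group_dependencies):
        if visit(task):
            return set(visiting)
    return set()


# group_dependencies comes from get_tasks_from_dependencies shown earlier
cycle_nodes = find_cycle(group_dependencies)
if cycle_nodes:
    raise ValueError(f"Invalid grouping creates a cycle regarding nodes: {cycle_nodes}")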

Now the kedro airflow create command will result in an error or warning with the following message, should we make a mistake in tagging:

[09/29/23 18:48:00] INFO     Validating pipelines tagging...
                    WARNING  Group airflow:split has multiple machine tags, this may cause unexpected behavior in which machine is used for the group, please use only one machine tag per group
                    ERROR    Pipeline __default__ has invalid grouping that creates a cycle in its grouping tags regarding nodes: {'train_model_node', '__start__', 'airflow:split'}

(“__start__” is a virtual node, added for simplicity of the algorithm, that points to all the other nodes.)

Using the plugin and getting the results

If you've made it this far, thanks for reading. Now you get to see the images of this solution in action. You can find more detailed instructions in the project's README file. Here's the pipeline definition for reference:

Kedro Spaceflights pipelines:

   [ # data processing pipeline
            node(
                func=preprocess_companies,
                inputs="companies",
                outputs="preprocessed_companies",
                name="preprocess_companies_node",
                tags=["airflow:companies"]
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles",
                outputs="preprocessed_shuttles",
                name="preprocess_shuttles_node",
                tags=["airflow:shuttles", "machine:medium"]
            ),
            node(
                func=create_model_input_table,
                inputs=["preprocessed_shuttles", "preprocessed_companies", "reviews"],
                outputs="model_input_table",
                name="create_model_input_table_node",
                tags=["airflow:split", "machine:medium"]
            ),
        ]
        ...
        [ # data science pipeline
            node(
                func=split_data,
                inputs=["model_input_table", "params:model_options"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                name="split_data_node",
                tags=["airflow:split", "machine:medium"]
            ),
            node(
                func=train_model,
                inputs=["X_train", "y_train"],
                outputs="regressor",
                name="train_model_node",
                tags=["machine:medium"]
            ),
            node(
                func=evaluate_model,
                inputs=["regressor", "X_test", "y_test"],
                outputs=None,
                name="evaluate_model_node",
                tags=["machine:medium"]
            ),
        ]

Pay special attention to the tags. This translates to the following Kedro pipeline:

[Image: Kedro pipeline visualization with the grouping and machine tags]

We need the docker image, so we build & ship it to the Docker registry in GCP in one go:

[Image: the docker image in the GCP registry]

Then we create the DAG using the plugin and copy it to the Composer's DAGs bucket:

[Image: the Composer DAGs bucket]
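If you prefer to script that upload instead of clicking through the console, a minimal sketch with the google-cloud-storage client could look like this (project, bucket and file names are placeholders):

from google.cloud import storage

# Composer exposes its DAGs folder as a GCS bucket - upload the generated file there
client = storage.Client(project="my-gcp-project")               # placeholder project
bucket = client.bucket("europe-west1-my-composer-env-bucket")   # placeholder bucket
blob = bucket.blob("dags/spaceflights_airflow_dag.py")
blob.upload_from_filename("airflow_dags/spaceflights_airflow_dag.py")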

After a few minutes we should see our DAG in the Composer UI. We can trigger it manually and see the following results:

[Image: the Spaceflights DAG in the Composer/Airflow UI]

This was done with grouping disabled. Now let's enable it and see the difference:

[Image: the grouped Spaceflights DAG]

In this example we use the grouping feature to change the names of single nodes (each of them defines a one-node group) with "airflow:companies" and "airflow:shuttles". Then we group the model input table creation and the data split into one node with the "airflow:split" tag.

Here's a side-by-side comparison of the generated tags with and without grouping:

[Images: the generated DAGs without and with grouping]

Zooming in on one node, let's observe how the node's tags, name and machine tag translate to the pod's parameters.

[Image: pod parameters of a single node in Kubernetes]

A small issue with the Airflow KubernetesPodOperator XCom

Let's have a look at GKE to observe how nodes translate to pods:

[Image: the pods in GKE]

We can see that the Airflow XCom node is sometimes left running for a long time - that's due to how slow the XCom mechanism can be in the KubernetesPodOperator. The sidecar container on the exporting side essentially waits for a signal to be killed after the data is read, but it can be left hanging for longer. This mechanism is also the reason why, in this small example, the node that creates the MLflow session takes the longest to process (around 5 minutes). In many scenarios this is not an issue. In retrospect, this could be improved: the MLflow session creation could either be done directly in Airflow with a PythonOperator (accepting the MLflow library as an extra dependency in Airflow), or the communication mechanism could be replaced with a bucket as the medium instead of XCom.
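For the record, the PythonOperator alternative mentioned above could be as small as the sketch below (my own hedged illustration; the tracking URI and experiment name are placeholders, and the returned run id is pushed to XCom automatically as the task's return value):

import mlflow
from airflow.operators.python import PythonOperator


def create_mlflow_run(**_) -> str:
    # Open the parent MLflow run up front; downstream pods pick the id up from XCom
    mlflow.set_tracking_uri("https://mlflow.example.com")   # placeholder URI
    mlflow.set_experiment("spaceflights")                   # placeholder experiment
    with mlflow.start_run() as run:
        return run.info.run_id


# (inside the DAG definition)
create_mlflow_run_task = PythonOperator(
    task_id="create-mlflow-run",
    python_callable=create_mlflow_run,
)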

Conclusion

The Kedro-Airflow plugin, with some effort, can automate your chores away, letting your engineers forget about DAG translation tasks and stay focused on more creative work! The only step left is to include the DAG creation and upload process in your CI tools.

We hope you've enjoyed this article and found it useful. If you have any questions or suggestions, please contact us at hello@getindata.com

