These days, companies getting into Big Data are free to compose their technology stack from a huge variety of available solutions. Even though needs may differ from one company to another, one essential element remains the same: workflow and data processing orchestration. In this article I would like to introduce and describe two ways of deploying an Airflow cluster on AWS. Each way is HA-oriented, to provide maximum reliability of the cluster.
This article assumes basic knowledge of AWS services.
What is Apache Airflow?
Airflow was initially developed by Airbnb, but it later became an Apache Incubator project. According to its website, Airflow is a “platform created by the community to programmatically author, schedule and monitor workflows”. In other words, it provides a set of features to define, create, schedule, execute and monitor data workflows. It does sound pretty powerful, doesn’t it? Nowadays, in this Big Data driven era, Airflow is getting more and more attention all over the world. Of course, the Big Data world is not the only application for this software. Many companies, such as Slack, 9GAG or Walmart, report using Airflow in their technology stack. This is all because of its robustness and the flexibility it gains from being written and configured in Python.
Imagine a simple pipeline that loads data from AWS S3, spins up an ephemeral EMR cluster, processes the data with Flink or Spark, extracts the necessary features and stores the results in a database or a data warehouse. In Airflow it can all be set up as one Python file. Airflow helps you with the scheduling and execution of complex pipelines and supplies an easy way to monitor and maintain them. Due to the great community of Airflow and its contributors, it is really hard to find a service that Airflow would not be able to cooperate with.
Airflow has some core ideas that are crucial in understanding how it orchestrates the pipelines. In this article, I would like to focus only on those relevant in providing high availability.
DAG: An acronym for Directed Acyclic Graph. It is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. A DAG is defined in a Python script, which represents the DAG’s structure (tasks and their dependencies) as code.
Task: Defines a unit of work within a DAG. A single task is represented as a single node in the DAG graph. It is also written in Python and is usually part of the DAG file.
Operator: An operator represents a single, ideally idempotent, task. Operators determine exactly what is done and which actions are performed during a task execution.
Scheduler: The Scheduler monitors all tasks and all DAGs and triggers the task instances whose dependencies have been met. It is designed to run as a persistent service in an Airflow production environment. It is the brain behind all actions taken by Airflow. The Scheduler uses the configured Executor to run tasks that are ready.
Webserver: A Python Flask web application used to manage DAGs: pause/unpause, trigger, change and monitor the execution status.
Message Broker: Used only with the Celery Executor. It is usually a RabbitMQ or Redis service that allows Celery to create a reliable distributed message queue. When the Celery Executor is used, Airflow workers consume tasks from this queue and update each task’s status in the Result Backend database.
Metadata and Result Backend databases: The Metadata database is where all DAG-related information is stored: runs, configuration, failed executions, etc. The Result Backend database is a metadata database for the Celery service. It stores information about messages, their statuses and execution times. Multiple DB engines can be used, for example MySQL or PostgreSQL.
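To make the DAG and Scheduler ideas above more concrete, here is a minimal sketch using only the Python standard library (Python 3.9+). It is not Airflow code and not how the Airflow Scheduler is implemented; it is a toy model of "trigger the tasks whose dependencies have been met". The task names and the `run_pipeline` helper are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: each key is a task, each value is the set of
# upstream tasks that must finish before it may start.
PIPELINE = {
    "load_from_s3": set(),
    "start_emr_cluster": set(),
    "process_data": {"load_from_s3", "start_emr_cluster"},
    "store_results": {"process_data"},
}


def run_pipeline(dag, execute):
    """Run every task exactly once, never before all of its upstream tasks."""
    sorter = TopologicalSorter(dag)
    sorter.prepare()  # raises graphlib.CycleError if the graph is not acyclic
    order = []
    while sorter.is_active():
        # get_ready() returns the tasks whose dependencies have been met.
        for task in sorted(sorter.get_ready()):
            execute(task)
            order.append(task)
            sorter.done(task)  # unblocks the downstream tasks
    return order


if __name__ == "__main__":
    print(run_pipeline(PIPELINE, execute=lambda task: None))
```

In a real deployment the `execute` step is what the Executor hands over to a worker, and the bookkeeping done here in memory lives in the Metadata database.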
How do we make Airflow highly available?
Although Airflow provides multiple ways of configuring distributed DAG processing, the project does not document an official HA setup. However, the solution can be at your fingertips. As we all know, a service is only as highly available as its least available component. We are going to look at two implementation methods, describing each element of their architecture and presenting ways to keep them fault-tolerant.
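The "least available component" rule can be illustrated with a little arithmetic. The sketch below assumes that components fail independently, which is a simplification, and all availability figures are hypothetical numbers chosen for illustration, not measurements of any AWS service.

```python
from math import prod


def serial_availability(components):
    """Availability of a chain in which every component must be up."""
    return prod(components.values())


def replicated_availability(single, replicas):
    """Availability of N independent replicas when one healthy replica is enough."""
    return 1 - (1 - single) ** replicas


# Hypothetical per-component availabilities, for illustration only.
CLUSTER = {
    "scheduler": 0.99,
    "broker": 0.999,
    "metadata_db": 0.9995,
    "workers": 0.999,
}

if __name__ == "__main__":
    print(f"whole chain:      {serial_availability(CLUSTER):.4f}")
    print(f"weakest link:     {min(CLUSTER.values()):.4f}")
    print(f"two replicas of the weakest link: {replicated_availability(0.99, 2):.4f}")
```

Under these assumptions the whole chain is slightly less available than its weakest component, and replicating that weakest component is the cheapest way to raise the total. This is why the rest of the article goes through the components one by one.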
With the Celery Distributed Task Queue
After analyzing Airflow’s core elements and possible configurations, and going through multiple blog posts about production-grade implementations, we concluded that we should use the Celery Executor.
Breaking this architecture down, we have the following components:
Workers: Execute the assigned tasks. Airflow Celery workers consume from Celery’s queue broker and execute the commands stored in the messages locally. A worker can be anything that is capable of executing Python code, but each worker must have access to the DAGs code. There are two main approaches:
If you have any specific requirements for your pipeline, such as high CPU/RAM utilization or GPU computing tasks, it is recommended to deploy Airflow workers as AWS EC2 instances. Instances can be deployed separately for each worker or as an Auto Scaling group.
If not, use AWS Fargate. Since Fargate platform version 1.4.0, it is possible to attach a shared EFS volume to Fargate tasks, so the requirement of code access can be easily fulfilled. Fargate tasks can be autoscaled the same way as EC2 instances, but they come in predefined sizes.
However, it is also possible to have a hybrid approach. Using the Celery Executor gives you the ability to define queues inside your distributed queue. This basically means that you can have multiple groups of workers, each consuming from different queues. The DAG nature of Airflow allows you to assign every task to a different queue. For example, when you have a pipeline with heavy GPU calculations and you don’t want the whole pipeline to run on that particular worker, it is possible to run the non-GPU tasks on different workers (possibly Fargate tasks or smaller/cheaper EC2 instances). The task logs can be persisted on EBS disks or an EFS volume (in the case of Fargate), but in the AWS world you can also configure Airflow workers to write logs directly to S3.
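The hybrid approach can be sketched as a toy model in plain Python. In Airflow itself the queue is chosen with the `queue` argument of an operator, and a Celery worker is told which queues to consume with the `-q`/`--queues` option of the worker command. Everything else here (task names, worker names, the `dispatch` helper) is hypothetical and only mimics the routing behaviour.

```python
from collections import defaultdict, deque

# Hypothetical tasks; "default" and "gpu" are assumed queue names.
TASKS = [
    {"task_id": "extract", "queue": "default"},
    {"task_id": "train_model", "queue": "gpu"},
    {"task_id": "publish_report", "queue": "default"},
]

# Hypothetical workers and the queues each one listens on.
WORKERS = {
    "fargate-worker": ["default"],
    "gpu-ec2-worker": ["gpu"],
}


def dispatch(tasks, workers):
    """Return {worker: [task_id, ...]} after every queue has been drained."""
    queues = defaultdict(deque)
    for task in tasks:
        queues[task["queue"]].append(task["task_id"])

    executed = {name: [] for name in workers}
    for name, subscribed in workers.items():
        for queue_name in subscribed:
            # A worker only ever sees messages from queues it subscribed to.
            while queues[queue_name]:
                executed[name].append(queues[queue_name].popleft())
    return executed


if __name__ == "__main__":
    for worker, task_ids in dispatch(TASKS, WORKERS).items():
        print(worker, task_ids)
```

The point of the design is that the expensive GPU instance only receives the one task that needs it, while everything else lands on the cheaper workers.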
Scheduler: Responsible for adding the necessary tasks to the queue. Unfortunately, the scheduler was not designed to have multiple instances running in parallel. In an Airflow setup with a Celery backend, there is therefore only one producer publishing to the broker at any given time. Nonetheless, the scheduler is not a critical point for high availability. Even if it is down, workers keep consuming from the queue, tasks are executed and their status is updated in the Result Backend database (as long as there is something to consume). This is also a good reason to keep the RabbitMQ/Redis broker on separate instances from the Scheduler. If having a single scheduler instance is still a concern, there are open-source solutions for setting up a failover controller.
Web server: The HTTP server provides access to DAG and task status information. It is installed by default and usually started on the same machine as the scheduler. Its only dependency is access to the Metadata Database, so running more than one instance behind an Elastic Load Balancer should do the trick for HA.
Metadata Database: Contains information about the status of tasks, DAGs, Variables, connections, etc. This one is simple. Deploy this database as a Multi-AZ instance of AWS RDS with backups and capacity matching your needs.
Celery, composed of:
Broker: Stores commands for execution. As stated above, the two recommended message brokers are RabbitMQ and Redis. Both are supported by Celery out of the box and both support the queues mechanism. For the reasons described for the scheduler, it is recommended to have those services deployed as a separate cluster. RabbitMQ has a plugin for performing automatic peer discovery on AWS, so when deployed as an Auto Scaling group, the cluster becomes self-maintained. A good article about this can be found here.
Result Backend: Stores the status of completed commands. This is the same situation as the Metadata Database: a Multi-AZ instance in AWS RDS should do the trick.
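The components above are wired together through Airflow configuration. Airflow reads any option from an environment variable named `AIRFLOW__{SECTION}__{KEY}`, so one way to sketch the wiring is a small helper that builds those variables. The option names below are the ones used by Airflow 1.10-era releases (newer releases have moved some of them to other sections), and every hostname, user and database name is a placeholder, not a real endpoint.

```python
def celery_executor_env(metadata_db_url, broker_url, result_backend_url):
    """Environment variables pointing Airflow at externally hosted services."""
    return {
        "AIRFLOW__CORE__EXECUTOR": "CeleryExecutor",
        # Metadata Database, e.g. a Multi-AZ RDS instance.
        "AIRFLOW__CORE__SQL_ALCHEMY_CONN": metadata_db_url,
        # Message broker (RabbitMQ or Redis), deployed as a separate cluster.
        "AIRFLOW__CELERY__BROKER_URL": broker_url,
        # Celery Result Backend, e.g. a second database on RDS.
        "AIRFLOW__CELERY__RESULT_BACKEND": result_backend_url,
    }


# Placeholder endpoints, for illustration only.
ENV = celery_executor_env(
    metadata_db_url="postgresql+psycopg2://airflow:CHANGE_ME@metadata-db.example.internal:5432/airflow",
    broker_url="amqp://airflow:CHANGE_ME@rabbitmq.example.internal:5672/airflow",
    result_backend_url="db+postgresql://airflow:CHANGE_ME@results-db.example.internal:5432/celery",
)

if __name__ == "__main__":
    for key, value in sorted(ENV.items()):
        print(f"{key}={value}")
```

The same set of variables has to be present on the scheduler, the web server and every worker, which is one reason to keep them in a single place such as a launch template or a task definition.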
As you can see from all of the descriptions above, the Airflow cluster can be very flexible, reliable and configured to match almost every pipeline’s needs. On the other hand, it requires multiple components to be deployed, configured and maintained. If this aspect is a problem, check the Kubernetes Executor!
The Kubernetes Executor, introduced in the Airflow 1.10 release line, changes the scheduling method fundamentally. The biggest difference is that the RabbitMQ + Celery messaging queue is completely removed from the scheduling chain. Each task is spawned and executed as a separate pod on your Kubernetes cluster. Pods are independent of each other and the Airflow master is responsible for tracking the execution status. This is a very important difference, because pods do not have access to each other’s resources. However, there are a couple of aspects that need to be discussed with regard to high availability.
Cluster deployment: If we want to go down the Kubernetes route, the very first thing we need is a Kubernetes cluster. I am pretty sure that these days you already have one, but if not, one must be deployed. The AWS EKS control plane is highly available by design. After deploying a cluster in EKS, it’s necessary to add node groups. Node groups are sets of EKS-managed cluster nodes, but one node group can only consist of one size of EC2 instance. If varied instance types are required, add multiple node groups. Those groups can also be autoscaled. I recommend using eksctl, the official CLI for EKS. A cluster with multiple node groups can be defined as a YAML file and deployed with just one command. The main advantage of eksctl is that it provisions all EKS-related resources, such as IAM roles, EC2 instance profiles and EC2 security groups, so you don’t have to write CloudFormation or Terraform templates yourself.
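As a rough illustration of a cluster with several node groups, the sketch below builds an eksctl `ClusterConfig` document in Python and prints it as JSON. The cluster name, region, group names, instance types and sizes are all hypothetical. Since JSON is a subset of YAML, the output should be usable as an eksctl config file, but treat that as an assumption and check the result against the eksctl config schema for your version.

```python
import json


def node_group(name, instance_type, min_size, max_size):
    """One managed node group; every node in it shares the same instance type."""
    return {
        "name": name,
        "instanceType": instance_type,
        "minSize": min_size,
        "maxSize": max_size,
        "desiredCapacity": min_size,
    }


def cluster_config(name, region, node_groups):
    """Top-level eksctl ClusterConfig document."""
    return {
        "apiVersion": "eksctl.io/v1alpha5",
        "kind": "ClusterConfig",
        "metadata": {"name": name, "region": region},
        "managedNodeGroups": node_groups,
    }


# Hypothetical cluster: a general-purpose group and a small GPU group.
CONFIG = cluster_config(
    "airflow",
    "eu-west-1",
    [
        node_group("general", "m5.large", min_size=2, max_size=6),
        node_group("gpu", "p3.2xlarge", min_size=1, max_size=2),
    ],
)

if __name__ == "__main__":
    print(json.dumps(CONFIG, indent=2))
```

Saved to a file, such a document is what gets passed to `eksctl create cluster -f <file>`.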
Master node placement: A single or replicated (with a failover controller) master instance can be kept outside the EKS cluster, but in this case all of its EC2 instances must be permitted, through IAM and Airflow configuration, to access the EKS cluster, spawn Kubernetes pods, check their status, etc. A much smarter solution is to deploy the master instance directly to Kubernetes! Readiness and liveness probes can take care of keeping the master always running, and it can be easily exposed to your VPC through a load balancer. Set your Service type to type: LoadBalancer and don’t forget to add the following annotations: service.beta.kubernetes.io/aws-load-balancer-internal: "true" and service.beta.kubernetes.io/aws-load-balancer-type: nlb (or a load balancer type of your choice). This way all responsibilities for maintaining the master service can be delegated to AWS/Kubernetes services.
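The Service described above could look roughly like the manifest built below. The two annotations and the LoadBalancer type come from the text. The Service name, the pod selector label and the port (8080, the Airflow web server default) are assumptions made for the example.

```python
import json


def internal_nlb_service(name, selector, port=8080):
    """Kubernetes Service manifest exposing the master inside the VPC only."""
    return {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {
            "name": name,
            "annotations": {
                # Annotation values must be strings, hence "true" in quotes.
                "service.beta.kubernetes.io/aws-load-balancer-internal": "true",
                "service.beta.kubernetes.io/aws-load-balancer-type": "nlb",
            },
        },
        "spec": {
            "type": "LoadBalancer",
            "selector": selector,
            "ports": [{"port": port, "targetPort": port}],
        },
    }


# Hypothetical name and label for the Airflow master pods.
SERVICE = internal_nlb_service("airflow-master", {"app": "airflow-master"})

if __name__ == "__main__":
    # kubectl accepts JSON manifests as well as YAML ones.
    print(json.dumps(SERVICE, indent=2))
```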
Volumes: Master and worker instances must have access to the DAGs code. This requirement is executor-independent. In Kubernetes we can deploy a PersistentVolume and a PersistentVolumeClaim to create a shared space for the Python code. Those volumes can be backed by EBS storage or an EFS share. As Python code is just a set of text files, 1 GB should be enough. To persist task execution logs, if the S3 option is not configured or not available, the same implementation path can be taken, but with a much larger volume. Keeping separate volumes for logs and DAGs is also good practice, because filling up the logs volume won’t affect Airflow operability and vice versa.
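Separate claims for DAGs and logs can be sketched as follows. The claim names, the log volume size and the storage class name `efs-sc` are hypothetical. The sketch also assumes an EFS-backed storage class, because the `ReadWriteMany` access mode (several pods on different nodes mounting the same volume) is not available on EBS volumes.

```python
import json


def shared_volume_claim(name, size, storage_class):
    """PersistentVolumeClaim that many pods can mount at the same time."""
    return {
        "apiVersion": "v1",
        "kind": "PersistentVolumeClaim",
        "metadata": {"name": name},
        "spec": {
            # ReadWriteMany assumes a shared file system such as EFS.
            "accessModes": ["ReadWriteMany"],
            "storageClassName": storage_class,
            "resources": {"requests": {"storage": size}},
        },
    }


# One small claim for the DAG code, one larger claim for task logs.
CLAIMS = [
    shared_volume_claim("airflow-dags", "1Gi", "efs-sc"),
    shared_volume_claim("airflow-logs", "20Gi", "efs-sc"),
]

if __name__ == "__main__":
    for claim in CLAIMS:
        print(json.dumps(claim, indent=2))
```

With two claims, a logs volume that runs full cannot prevent pods from reading the DAG code.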
Database: With the Kubernetes Executor, the Celery Result Backend database is no longer required. All metadata is stored in the default Airflow database. Most Airflow Helm charts provision the database as a Postgres container in a StatefulSet, but keeping high availability in mind, an RDS Postgres instance is a wiser choice. The RDS service offers clustering, read replicas, backups and much more. A Multi-AZ replicated instance should be a bulletproof solution.
As can be clearly seen from the description, Airflow on an EKS cluster takes a very different form than Airflow with a Celery deployment. At first sight, it looks simpler on the infrastructure layer, but on the other hand, there is an additional abstraction layer in the form of Kubernetes. What is more, the Kubernetes-based implementation offers less configuration flexibility: there is no strict queue implementation, and data exchange between subsequent task executions is more difficult.
In this article, the two main approaches to deploying highly available Airflow clusters in the Amazon Web Services cloud were introduced. The first way is to deploy it with a Celery message queue, to distribute tasks across multiple nodes, where nodes can be grouped into multiple queues and each queue can be served by a different server size/type. This way adds a lot of flexibility to the pipelines. If you need a more generic and versatile approach, choose the second way. Airflow with an EKS cluster is easier to deploy and easier to maintain, while providing the same set of functions and features. If both ways have some drawbacks for you, a more complex, but more powerful implementation exists. An Airflow cluster with a Celery backend can also be deployed on an EKS cluster, taking advantage of both Kubernetes features and Celery flexibility. But that’s a topic for a very different article.
17 November 2020
Big Data DevOps Engineer