Nowadays, everybody talks about data. Generating, collecting, analyzing and learning from data is crucial for modern products, as data closes the feedback loop between a product and its usage. Data can be used simply for monitoring a system and identifying issues, or it can be a first-class citizen that the product is built on. Strictly speaking, some products are data: predictions or recommendations learnt from the other users of the system.
There are numerous issues associated with building systems based on a continuous flow of data. One of them is building data pipelines that push data through different systems and transformations, so that the results are eventually piped back into the product. There are two fundamentally different ways of building these systems, batching and streaming, each with its own challenges.
Regardless of the chosen approach, one of the biggest challenges in building data pipelines is minimizing the cost of maintaining and operating them. Apache Airflow has many characteristics that make it an attractive candidate for building a reliable (or even self-healing) system, making it possible to focus on what matters (the actual data transformation) without being distracted by operational issues.
Apache Airflow is a technology originally developed by Airbnb to facilitate building batch processing workflows. It is a scalable, distributed workflow scheduling system that models workflows as Directed Acyclic Graphs (DAGs) and provides a rich set of concepts and tools, among them operators for executing actions and sensors for watching resources. Airflow comes packaged with ready-made components for integrating with common data stores and systems, and it can be easily extended to support custom actions. Airflow workflows, or DAGs, are implemented in Python and therefore integrate seamlessly with most Python code. Simply put, I would describe Airflow as a “Scalable, Semi-data-aware Jenkins on Steroids”.
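To make the DAG idea concrete, here is a small sketch in plain Python using the standard library's `graphlib` (Python 3.9+), not Airflow itself. The task names are hypothetical; the point is that a DAG fixes which tasks must finish before others may start, and tasks whose dependencies are all satisfied can run in parallel, which is roughly what the scheduler has to work out.

```python
from graphlib import TopologicalSorter, CycleError


def execution_waves(dependencies):
    """Group tasks into waves: every task in a wave depends only on tasks in earlier waves.

    `dependencies` maps each task name to the set of task names it depends on.
    Raises graphlib.CycleError if the graph is not acyclic.
    """
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks that could run in parallel right now
        waves.append(ready)
        sorter.done(*ready)
    return waves


# Hypothetical pipeline: extract, then clean and validate in parallel, then load.
pipeline = {
    "clean": {"extract"},
    "validate": {"extract"},
    "load": {"clean", "validate"},
}
print(execution_waves(pipeline))
# [['extract'], ['clean', 'validate'], ['load']]
```

A cyclic definition such as `{"a": {"b"}, "b": {"a"}}` is rejected with `CycleError`, mirroring why Airflow insists on acyclic workflows.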
Once deployed, an Airflow cluster can be reused by multiple teams within an organization, enabling them to automate their workflows. Here I will share the lessons we learnt deploying Airflow into an AWS Elastic Container Service (ECS) cluster, along with the associated challenges. Unfortunately, I cannot share details about how Airflow is used at my company, so I will focus on deployment, observability and integration issues.
Apache Airflow is a distributed system. Every node can potentially execute any task, and one should not assume affinity between tasks and nodes unless it is configured explicitly. Apache Airflow is also “semi”-data-aware: it does not propagate any data through the pipeline, yet it has well-defined mechanisms for propagating metadata through the workflow via XComs. Airflow records the state of executed tasks, reports failures, retries when necessary, and allows entire pipelines or parts of them to be scheduled for execution via backfill.
Airflow consists of multiple components. Not all of them have to be deployed (one can skip Flower and the Webserver, for instance), but all of them come in handy when setting up and debugging the system.
| Component | Role |
|---|---|
| Flower and Webserver | User frontends for monitoring and interacting with the Airflow cluster. |
| Scheduler | Continuously polls DAGs and schedules tasks; monitors execution. |
| Workers | Execute tasks from the Queue and report results back to the Queue. |
| Shared Filesystem | Synchronizes DAGs between all cluster nodes. |
| Queue | Reliably queues scheduled tasks and task results. |
| Metadb | Stores execution history, workflow and other metadata. |
Airflow workers can be based on Celery, Dask Distributed, Apache Mesos, or Kubernetes. Here I will focus on distribution via Celery, as it is one of the most straightforward ways to get Airflow up and running, provided you don’t already have other clusters to integrate with.
If you already have a Kubernetes cluster or can use AWS EKS, I would advise taking a closer look at the Kubernetes executor: it was only recently introduced in Airflow 1.10, but it seems the most natural candidate for modern containerized deployments.
Airflow in AWS
The following diagram shows a simplified architecture of an Airflow cluster built using AWS services and the Celery backend:
AWS ECS makes it easier to deploy and manage Airflow components throughout their lifecycle, while AWS RDS and ElastiCache provide managed PostgreSQL and Redis instances. EFS provides shared file storage that all Airflow nodes can mount, avoiding the need for more complex synchronization of DAGs between the cluster nodes.
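The components above still have to be pointed at each other. One way to do that in ECS is through environment variables in the task definitions, since Airflow reads any `AIRFLOW__{SECTION}__{KEY}` variable as an override of the corresponding configuration option. The sketch below builds such an environment for a Celery deployment. The section and key names follow Airflow 1.10 (later releases moved some of them, e.g. `sql_alchemy_conn` to a `[database]` section), and all endpoints and credentials are hypothetical placeholders for the RDS and ElastiCache hosts.

```python
def airflow_env_var(section, key):
    """Airflow treats AIRFLOW__{SECTION}__{KEY} as an override for `key` in `[section]`."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def celery_environment(metadb_url, broker_url, result_backend):
    """Environment shared by scheduler, webserver and worker containers (Airflow 1.10 names)."""
    settings = {
        ("core", "executor"): "CeleryExecutor",
        ("core", "sql_alchemy_conn"): metadb_url,  # RDS PostgreSQL (metadb)
        ("celery", "broker_url"): broker_url,  # ElastiCache Redis (queue)
        ("celery", "result_backend"): result_backend,
    }
    return {airflow_env_var(section, key): value for (section, key), value in settings.items()}


# Hypothetical endpoints and credentials, for illustration only.
env = celery_environment(
    metadb_url="postgresql+psycopg2://airflow:secret@metadb.example.internal:5432/airflow",
    broker_url="redis://queue.example.internal:6379/0",
    result_backend="db+postgresql://airflow:secret@metadb.example.internal:5432/airflow",
)
for name, value in sorted(env.items()):
    print(f"{name}={value}")
```

Each pair would end up in the `environment` list of the container definitions, so every component reads the same configuration.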
Workflow tasks can vary in resource consumption and duration. A single Airflow worker can execute multiple tasks in parallel, as determined by the worker's `worker_concurrency` setting. If any of those tasks requires substantial system resources, it will starve the other tasks running in parallel, in the worst case leading to no work being done at all due to contention on CPU, memory or other resources.
To avoid this situation, workers can be assigned to different queues using the `--queues` parameter, and their concurrency can be controlled with the `--concurrency` option. Together, these options make it possible to create worker pools that specialize in certain kinds of workload.
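For example, a general-purpose pool could run many light tasks side by side, while a separate pool serves a queue for memory-hungry tasks one at a time. The sketch below only assembles the worker commands. The pool names, queue names and concurrency values are hypothetical, and the command form is the Airflow 1.10 CLI (`airflow worker`), which Airflow 2 renamed to `airflow celery worker`. Tasks are routed to a queue through the `queue` argument that every operator accepts.

```python
import shlex

# Hypothetical pools: queue names and concurrency values are illustrative only.
WORKER_POOLS = {
    "general": {"queues": ["default"], "concurrency": 16},
    "heavy": {"queues": ["memory_intensive"], "concurrency": 1},
}


def worker_command(queues, concurrency):
    """Build an Airflow 1.10 Celery worker command serving the given queues."""
    return [
        "airflow", "worker",
        "--queues", ",".join(queues),  # comma-separated list of queues to consume
        "--concurrency", str(concurrency),  # number of task slots in this worker
    ]


for pool_name, pool in WORKER_POOLS.items():
    print(pool_name, shlex.join(worker_command(pool["queues"], pool["concurrency"])))
# general airflow worker --queues default --concurrency 16
# heavy airflow worker --queues memory_intensive --concurrency 1
```

Each pool can then map onto its own ECS service, so the number of workers in each pool can be sized independently.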
When provisioning AWS resources through Terraform, it is convenient to separate compute, storage, IAM and other resources from the ECS services and task definitions. This can be done by creating multiple Terraform “stacks”, which are provisioned separately and have separate states. Stacks can refer to each other through the `terraform_remote_state` data source. This approach makes it possible to separate infrastructure and software updates, and it helps establish reliable CI/CD environments.
The heartbeat of the Airflow cluster can be implemented as a DAG in Airflow itself. The DAG can probe whether it can discover the necessary resources and libraries and access all services and networks, with each check packaged as a separate task running in parallel. On success, the DAG can echo a success token that is picked up by CloudWatch metrics and alarms.
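A minimal sketch of the kinds of probes such a heartbeat DAG could run, in plain Python. In the DAG, each probe would be wrapped in its own task (for instance a `PythonOperator`), with the reporting step running after all of them. The token string, and matching it with a CloudWatch Logs metric filter, are assumptions for illustration.

```python
import importlib.util
import socket

# Hypothetical token; a CloudWatch Logs metric filter could match it to count healthy heartbeats.
HEARTBEAT_TOKEN = "AIRFLOW_HEARTBEAT_OK"


def can_import(module_name):
    """Probe: is a library importable on this worker?"""
    try:
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        return False


def can_connect(host, port, timeout=3.0):
    """Probe: can this worker open a TCP connection to host:port?"""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def report_heartbeat(results):
    """Final step: echo the token only if every probe succeeded."""
    healthy = all(results.values())
    if healthy:
        print(HEARTBEAT_TOKEN)
    return healthy
```

A probe set might check that a database driver is importable and that the metadb and the Redis broker accept connections on their ports; which checks matter depends on what the cluster's workflows rely on.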
Every task in a DAG incurs some scheduling and reporting latency. In my experience, a DAG of four simple operations, each taking at most a few seconds, could take as long as 75 seconds to execute, depending on scheduler load. If such a DAG is scheduled to repeat every minute with a 60-second timeout, it will never complete. Therefore, it is sometimes preferable to create a DAG consisting of a single operator that performs all the actions, trading observability and parallel execution for reduced overhead.
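A sketch of that trade-off: instead of one task per check, a single callable (for example, the `python_callable` of one `PythonOperator`) runs all checks concurrently in threads and still reports each result, which recovers some of the parallelism. The function name and default timeout are illustrative assumptions; checks that overrun the timeout are reported as failed, but their threads are not interrupted.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def run_checks_in_one_task(checks, timeout=30.0):
    """Run every check concurrently within one task and return {name: passed}.

    `checks` maps a name to a zero-argument callable returning a truthy value on success.
    A check that raises, returns a falsy value, or is still running after `timeout`
    seconds counts as failed (its thread is left to finish on its own).
    """
    results = {name: False for name in checks}
    pool = ThreadPoolExecutor(max_workers=max(1, len(checks)))
    futures = {pool.submit(check): name for name, check in checks.items()}
    done, _not_done = wait(futures, timeout=timeout)
    for future in done:
        if future.exception() is None:
            results[futures[future]] = bool(future.result())
    pool.shutdown(wait=False)
    return results
```

Inside the operator, the callable could raise an exception whenever any result is `False`, so the single task fails and the success token is never echoed.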
One of the strangest and slightly annoying concepts in Airflow is the start date. Every DAG has one, and if the DAG attribute `catchup` is set to `True`, Airflow will schedule a DAG run for each missed time slot since the start date. If this is not desirable, one way to deal with it is to always initialize DAGs with `catchup=False`, or to set `catchup_by_default=False` in the `[scheduler]` section of the Airflow configuration.
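To get a feel for how many runs catch-up can produce, here is a simplified model in Python: with a fixed schedule interval, a run is created for every interval that has fully elapsed since the start date. It deliberately ignores cron expressions, time zones and already existing runs, and the dates are made up. In a DAG file, disabling this amounts to passing `catchup=False` to the `DAG` constructor.

```python
from datetime import datetime, timedelta


def catchup_runs(start_date, schedule_interval, now):
    """Start of every schedule interval that has fully elapsed between start_date and now.

    With catchup enabled, each of these would get its own DAG run (simplified model).
    """
    runs = []
    interval_start = start_date
    while interval_start + schedule_interval <= now:
        runs.append(interval_start)
        interval_start += schedule_interval
    return runs


# A daily DAG whose start_date lies ten days in the past at deployment time.
runs = catchup_runs(datetime(2019, 1, 1), timedelta(days=1), now=datetime(2019, 1, 11))
print(len(runs), runs[0], runs[-1])
# 10 2019-01-01 00:00:00 2019-01-10 00:00:00
```

With an hourly schedule, the same ten days would yield 240 runs, which is why a start date far in the past can flood the scheduler.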
Monitoring Airflow Cluster
AWS provides CPU and memory utilization monitoring for ECS services via CloudWatch at one-minute intervals. This is enough to monitor the cluster during normal operation, but it is not always sufficient for understanding what happens while debugging, or for capacity planning for new resource-intensive workflows.
cAdvisor is a great tool from the Kubernetes family that can also run in ECS. To run it on every node, simply create a service with a `desiredCount` matching the maximum number of nodes in the auto-scaling group, and specify the `distinctInstance` ECS service placement constraint.
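As a sketch, these are the keyword arguments one could pass to boto3's ECS `create_service` call for such a service. The cluster name, task definition and node count are hypothetical, and the cAdvisor task definition itself (image, port mapping, host volume mounts) is assumed to exist already.

```python
def cadvisor_service_params(cluster, task_definition, max_nodes):
    """Arguments for boto3 ECS create_service: at most one cAdvisor task per container instance."""
    return {
        "cluster": cluster,
        "serviceName": "cadvisor",
        "taskDefinition": task_definition,
        "desiredCount": max_nodes,  # maximum size of the auto-scaling group
        "launchType": "EC2",
        "placementConstraints": [{"type": "distinctInstance"}],
    }


params = cadvisor_service_params("airflow-cluster", "cadvisor:1", max_nodes=6)
print(params["placementConstraints"])
# [{'type': 'distinctInstance'}]
```

With credentials configured, `boto3.client("ecs").create_service(**params)` would create the service. While the group runs fewer instances than `desiredCount`, the surplus tasks simply cannot be placed.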
Every cluster node will then expose cAdvisor on the port allocated to it, providing CPU, memory and network utilization graphs covering the last minute at one-second update intervals, for the entire node or any running container. cAdvisor is cgroups-aware, so it takes container limits into account.
In addition to monitoring instance resources, one could also create CloudWatch metrics for:
- Number of tasks sent to Celery and executed by workers.
- Number of successful and failed tasks.
- Number of Errors, Warnings and Tracebacks appearing in system logs.
All of these metrics can be created easily by parsing Airflow logs, and they provide useful information about cluster operation and can raise alarms if anything breaks.
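A sketch of the parsing step, assuming Airflow's default `log_format` (`[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s`). In AWS the same matching could also be expressed as CloudWatch Logs metric filters; the Python version just makes the patterns explicit. The counter names and sample lines are made up.

```python
import re
from collections import Counter

# Matches the level field of the default log format, e.g. "{models.py:42} ERROR - ...".
LEVEL_PATTERN = re.compile(r"\}\s+(ERROR|WARNING)\s+-\s")
TRACEBACK_MARKER = "Traceback (most recent call last):"


def count_log_events(lines):
    """Count error lines, warning lines and Python tracebacks in an iterable of log lines."""
    counts = Counter({"ERROR": 0, "WARNING": 0, "TRACEBACK": 0})
    for line in lines:
        match = LEVEL_PATTERN.search(line)
        if match:
            counts[match.group(1)] += 1
        if TRACEBACK_MARKER in line:
            counts["TRACEBACK"] += 1
    return dict(counts)


sample = [
    "[2019-01-01 10:00:00,123] {taskinstance.py:1047} ERROR - Task failed",
    "Traceback (most recent call last):",
    "[2019-01-01 10:00:01,456] {models.py:42} WARNING - Slow query",
    "[2019-01-01 10:00:02,789] {cli.py:10} INFO - Done",
]
print(count_log_events(sample))
# {'ERROR': 1, 'WARNING': 1, 'TRACEBACK': 1}
```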
Cluster Auto Scaling
If the workload is not evenly distributed over time, autoscaling can help scale the cluster in and out depending on the projected load. One way to do this is to assume that every task uses 100% of a worker's resources, and to bring workers up and down depending on the number of tasks in the queue.
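Under that assumption the scaling rule is simple arithmetic: the number of workers needed is the number of queued tasks divided by the number of tasks one worker runs at a time, rounded up and clamped to the group's bounds. The function and its default bounds are illustrative; it is the kind of calculation a scaling policy or a small Lambda function would perform.

```python
import math


def desired_worker_count(queued_tasks, tasks_per_worker=1, min_workers=0, max_workers=10):
    """Workers needed if every task occupies one full worker slot, clamped to [min, max]."""
    needed = math.ceil(queued_tasks / tasks_per_worker)
    return max(min_workers, min(max_workers, needed))


print(desired_worker_count(3))  # 3
print(desired_worker_count(25))  # 10 (capped at max_workers)
print(desired_worker_count(5, tasks_per_worker=2))  # 3
```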
Airflow does not publish the number of tasks in its queues. This number is, however, easily available from the metadb through a query like `SELECT state, count(*) FROM task_instance WHERE queue = '...' GROUP BY state`. To make it a trigger for autoscaling, you can create a Lambda function that publishes the query results as a CloudWatch metric.
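A minimal sketch of the core of such a Lambda function. It runs a variant of the query that also groups by queue (so one call covers all queues) and turns the rows into the `MetricData` list expected by CloudWatch's `PutMetricData` API. The metric name, dimension names and sample rows are assumptions; `conn` is any DB-API connection, and an in-memory SQLite table stands in for the PostgreSQL metadb so the example runs locally.

```python
import sqlite3

QUERY = "SELECT queue, state, count(*) FROM task_instance GROUP BY queue, state"


def task_counts(conn):
    """(queue, state, count) rows from the metadb; NULL queue/state become 'none'."""
    cursor = conn.cursor()
    try:
        cursor.execute(QUERY)
        return [(queue or "none", state or "none", count) for queue, state, count in cursor.fetchall()]
    finally:
        cursor.close()


def to_metric_data(counts, metric_name="TaskInstances"):
    """Shape rows as the MetricData list accepted by CloudWatch put_metric_data."""
    return [
        {
            "MetricName": metric_name,
            "Dimensions": [
                {"Name": "Queue", "Value": queue},
                {"Name": "State", "Value": state},
            ],
            "Value": count,
            "Unit": "Count",
        }
        for queue, state, count in counts
    ]


# Local illustration only: a tiny in-memory table standing in for the metadb.
demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE task_instance (task_id TEXT, queue TEXT, state TEXT)")
demo.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?)",
    [("a", "default", "queued"), ("b", "default", "queued"),
     ("c", "default", "running"), ("d", "heavy", None)],
)
print(sorted(task_counts(demo)))
# [('default', 'queued', 2), ('default', 'running', 1), ('heavy', 'none', 1)]
```

In the Lambda handler, the result would be published with `boto3.client("cloudwatch").put_metric_data(Namespace="Airflow", MetricData=...)`, where the namespace is a placeholder.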
This approach works well for scaling out, but scaling in creates a problem: it is not known when it is safe to stop ECS services or instances. ECS instances can use Auto Scaling lifecycle hooks to postpone termination while containers are still running; ECS services, however, need another approach.
One possible solution to this problem requires instrumenting Airflow workflows. At the end of each task, workers need to check whether any more tasks are scheduled and, if not:
- Use AWS APIs to decrease the `desiredCount` of the worker services.
- Immediately exit the worker process.
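A sketch of the first step, assuming a boto3 ECS client and a count of still-scheduled tasks (for example, from the metadb query described earlier). The decision is kept in a pure function so it is easy to test; cluster and service names are up to the caller. How the worker process is then stopped depends on how it is launched, so that step is left out, and two workers finishing at the same moment could both decrement the count.

```python
def next_desired_count(current_desired, pending_tasks, min_count=0):
    """New desiredCount for the worker service, or None if work is still pending."""
    if pending_tasks > 0:
        return None
    return max(min_count, current_desired - 1)


def scale_in_if_idle(ecs, cluster, service, pending_tasks):
    """Lower the service's desiredCount by one when nothing is left to run.

    `ecs` is a boto3 ECS client, e.g. boto3.client("ecs").
    Returns True if the caller should now exit its worker process.
    """
    if pending_tasks > 0:
        return False  # more work is queued; keep this worker alive
    response = ecs.describe_services(cluster=cluster, services=[service])
    current = response["services"][0]["desiredCount"]
    new_count = next_desired_count(current, pending_tasks)
    if new_count == current:
        return False  # already at the floor; nothing to change
    ecs.update_service(cluster=cluster, service=service, desiredCount=new_count)
    return True
```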
This way, by the time AWS examines what actions to take to account for the change, the number of running workers will already match the desired count and nothing will be done.
Distributing Worker Logs
Airflow workers run a tiny webserver that serves worker logs to the Airflow webserver while tasks are executing. This is a very convenient feature, but it comes at a cost: each worker has to be reachable via a statically assigned port and its hostname from any node in the cluster that could run the webserver. Thus:
- One is forced to use the `awsvpc` network mode so that workers receive a proper hostname (otherwise, the hostname is the container name, which is not discoverable within the VPC network), or to provide a custom `hostname_callable` setting in the Airflow configuration.
- Multiple workers cannot run on the same ECS instance due to the static host port allocation.
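For the `hostname_callable` route, here is a sketch of a callable that reports the EC2 host's private IP address instead of the container name. That address is reachable from other nodes as long as the worker log port (8793 by default) is mapped to the same static host port. It assumes instance metadata is reachable via IMDSv1 (instances that require IMDSv2 need a session token first) and falls back to `socket.getfqdn`. With the Airflow 1.10 `module:function` format it would be configured as something like `hostname_callable = ecs_hostname:instance_private_ip`, where the module name is hypothetical.

```python
import socket
import urllib.request

# EC2 instance metadata (IMDSv1). Instances enforcing IMDSv2 need a session token first.
LOCAL_IPV4_URL = "http://169.254.169.254/latest/meta-data/local-ipv4"


def _fetch_local_ipv4(timeout=1.0):
    with urllib.request.urlopen(LOCAL_IPV4_URL, timeout=timeout) as response:
        return response.read().decode("ascii").strip()


def instance_private_ip(fetch=_fetch_local_ipv4):
    """Zero-argument hostname callable: the EC2 host's private IP, else the FQDN."""
    try:
        return fetch()
    except OSError:  # urllib's URLError and socket timeouts are OSError subclasses
        return socket.getfqdn()
```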
Instead, it is possible to redirect Airflow worker logs to S3 as described here. This way you won’t be able to access them during task execution, but you gain much more operational flexibility.
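A sketch of the corresponding settings, expressed as `AIRFLOW__{SECTION}__{KEY}` environment variables for the ECS task definitions. The option names are the Airflow 1.10 ones in the `[core]` section (Airflow 2 moved them to `[logging]`); the bucket path is hypothetical, and `aws_default` is Airflow's default AWS connection id.

```python
def remote_logging_env(base_log_folder, conn_id="aws_default"):
    """Environment overrides that send task logs to S3 (Airflow 1.10 [core] option names)."""
    if not base_log_folder.startswith("s3://"):
        raise ValueError("expected an s3:// URL, e.g. s3://my-bucket/airflow/logs")
    return {
        "AIRFLOW__CORE__REMOTE_LOGGING": "True",
        "AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER": base_log_folder,
        "AIRFLOW__CORE__REMOTE_LOG_CONN_ID": conn_id,
    }


print(remote_logging_env("s3://example-airflow-logs/logs"))
```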
Apache Airflow is a very powerful framework and, like any distributed system, it is not entirely straightforward to set up. Once available, however, it quickly becomes the tool of choice for implementing data-driven workflows, mostly thanks to its simplicity and developer friendliness. I hope this post has shared some insight into deploying Airflow in AWS ECS, as I had a lot of fun figuring out the nuts and bolts of it.