On Kubernetes, we run many sophisticated workflows for data processing, model training, model serving, and metrics and evaluation. For such use cases, Apache Airflow is an excellent tool for programmatically managing the workflows while also exposing a friendly UI.
One nice thing about Airflow is that it’s extremely configurable and extensible. But it also means that there are usually several ways of doing the same thing. Here are a few examples:
In this post we will try to go through the lessons and best practices we have learned from using Airflow with Kubernetes at Benevolent.
Airflow now offers Operators and Executors for running your workload on a Kubernetes cluster: the KubernetesPodOperator and the KubernetesExecutor.
While a DAG (Directed Acyclic Graph) describes how to run a workflow of tasks, an Airflow Operator defines what gets done by a task. The KubernetesPodOperator works with the Kubernetes Python Client to run a task by launching a pod, which allows the user to have full control over the run-time environment, resources, and security. This is achieved by providing a Docker image and corresponding configurations of the pod.
This has the beneficial side effect of separating the orchestration logic (which we leave to Airflow) from the execution logic (which we leave to Docker images that run in Kubernetes). This separation makes it easier to test, debug, and evolve your system. The snippet below shows an example. In this case, if we want to modify the logic of task_1, we modify the code in benevolent/image, release a new versioned image tag, say v2, and the only thing that needs to change in the Airflow DAG is the image tag.
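A minimal sketch of such a task definition follows, assuming the Airflow 1.10 contrib import path for the KubernetesPodOperator. The namespace, pod name, command, and start date are hypothetical placeholders; only the image name and task_1 come from the text above. The operator arguments are built by a plain function so that the one thing that varies between releases, the image tag, is explicit.

```python
def task_1_kwargs(image_tag):
    """Build the keyword arguments for the KubernetesPodOperator that runs task_1."""
    return {
        "task_id": "task_1",
        "name": "task-1",                    # pod name (hypothetical)
        "namespace": "default",              # assumption: pods run in "default"
        "image": "benevolent/image:{}".format(image_tag),
        "cmds": ["python", "-m", "task_1"],  # hypothetical entrypoint in the image
        "get_logs": True,                    # stream pod logs into the Airflow task log
    }


def build_dag(image_tag):
    """Create the DAG; Airflow is imported lazily so the helper above needs no Airflow."""
    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

    dag = DAG("benevolent_dag", start_date=datetime(2020, 1, 1), schedule_interval=None)
    KubernetesPodOperator(dag=dag, **task_1_kwargs(image_tag))
    return dag
```

In a DAG file one would call `dag = build_dag("v2")` at module level so that Airflow discovers it; releasing new task logic then amounts to changing `"v1"` to `"v2"`.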
For BenevolentAI, this is particularly important because we constantly improve our software and regularly manage complex dependencies between microservices. We enforce that all business logic should be implemented in Docker images and launched only with a KubernetesPodOperator rather than using other operators such as the PythonOperator or BashOperator.
Alongside the KubernetesPodOperator, it is also important to have a suitable Executor to execute the task in a dynamic and efficient manner. Airflow Executors decide the way task instances get run. At BenevolentAI, we run long-running and computationally intensive tasks like model training and batch inference every day. But we also run smaller short-lived tasks such as creating a directory or pushing training artifacts to S3, so we need both flexibility and scalability in an Executor. In the end we decided on two executors: the CeleryExecutor and the KubernetesExecutor. Both run distributed tasks, but they differ in the resources they have available and the way in which they utilize those resources to complete a task.
The CeleryExecutor requires you to stand up at least one worker node that gets assigned tasks to execute through a message queue like Redis. Having multiple workers always available makes it scalable because it can deal with increased load by scaling horizontally; it is also flexible because if one of the workers goes down, the task can be allocated to another worker. However, this high availability comes at an increased cost because it is difficult to fine-tune the number of workers and the CPU/RAM resources that a worker requests based on the specific task it’s executing.
The KubernetesExecutor, on the other hand, does not require you to stand up these workers in advance, but instead creates a new worker pod for every task instance that needs to be executed. This means that when traffic grows, we can scale up to meet demand by using more resources; and when traffic decreases, we can scale back down to conserve resources. This offers us the flexibility and scalability that we need while improving the utilization of our resources. Additionally, the KubernetesExecutor allows the user to make task-specific configurations on the worker pods. Recently, we have had success in tuning our Airflow setup to use the new KubernetesExecutor. We have already seen improved utilization of our cluster resources, which has made for some happy Site Reliability Engineers!
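As a rough sketch of what task-specific configuration can look like, the helper below builds a per-task `executor_config`, assuming the Airflow 1.10 format (a dictionary keyed by `"KubernetesExecutor"`; later Airflow versions use a different format). The helper name and all resource values are hypothetical.

```python
def k8s_executor_config(request_cpu, request_memory, limit_cpu=None, limit_memory=None):
    """Build a per-task executor_config in the Airflow 1.10 KubernetesExecutor format."""
    resources = {"request_cpu": request_cpu, "request_memory": request_memory}
    # Limits are optional: only set them when the caller asks for them.
    if limit_cpu is not None:
        resources["limit_cpu"] = limit_cpu
    if limit_memory is not None:
        resources["limit_memory"] = limit_memory
    return {"KubernetesExecutor": resources}


# A short-lived housekeeping task needs very little (hypothetical values).
SMALL_TASK = k8s_executor_config("100m", "128Mi")

# A heavier task can request more for its own worker pod (hypothetical values).
HEAVY_TASK = k8s_executor_config("4", "16Gi", limit_memory="32Gi")
```

Either dictionary would then be passed to an operator through its `executor_config` argument, so each worker pod requests only what its task needs.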
Using the KubernetesPodOperator with the KubernetesExecutor, we run cost-effective and resource-efficient Airflow DAGs that dynamically scale and effectively decouple orchestration from execution.
We want to put multiple versions of the same DAG in an Airflow server. Let’s say our DAG is called benevolent_dag. We use the packaged DAG feature to isolate the source code for each version in a zipped file in order to make the DAG folder look clean as shown below.
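To illustrate the packaging step, here is a small sketch that zips one version of a DAG so that dags.py sits at the root of the archive, which is where Airflow looks for DAG files in a packaged DAG. The directory and archive names (`benevolent_dag_v1`, `benevolent_dag_v1.zip`) and the placeholder file contents are assumptions for illustration.

```python
import os
import tempfile
import zipfile


def package_dag(source_dir, zip_path):
    """Zip a DAG's source tree, keeping paths relative to source_dir."""
    with zipfile.ZipFile(zip_path, "w") as archive:
        for root, _dirs, files in os.walk(source_dir):
            for filename in sorted(files):
                full_path = os.path.join(root, filename)
                archive.write(full_path, os.path.relpath(full_path, source_dir))
    return zip_path


# Build a throwaway source tree that mimics one version of the DAG.
workdir = tempfile.mkdtemp()
source_dir = os.path.join(workdir, "benevolent_dag_v1")
os.makedirs(os.path.join(source_dir, "foo"))
for relative_path in ("dags.py", "foo/__init__.py", "foo/bar.py"):
    with open(os.path.join(source_dir, relative_path), "w") as handle:
        handle.write("# placeholder\n")

zip_path = package_dag(source_dir, os.path.join(workdir, "benevolent_dag_v1.zip"))
with zipfile.ZipFile(zip_path) as archive:
    packaged_files = sorted(archive.namelist())
```

Each version then becomes a single zip file in the DAG folder, with its own copy of the foo package inside.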
There are some ways we want the multi-versioned DAGs to behave, for example:
There is one problem though. Note that in both versions of dags.py we import the bar sub-module from the foo module, but the two bar modules actually have different code despite sharing the same name. The problem is that by default the foo.bar module will only be loaded once. In our case, depending on the order in which the DAGs are loaded, either bar.py from v2 will be used by the DAG from v1 or the opposite, so one version ends up running the other version's code. This is due to the way that Airflow loads DAGs: it uses Python’s importlib.import_module to load the DAGs as Python modules, in the same process in which the Airflow webserver or scheduler runs. Because imported modules are cached in the global dictionary sys.modules, our module foo.bar will not be loaded again if it already exists in sys.modules, even if it has different code.
One seemingly obvious solution is to rename the sub-modules so that they are unique across DAGs. For example, versions can be appended to module names as shown below. However, this is not feasible for DAGs with complex dependencies because all the corresponding import statements need to be changed.
Unable to find a more elegant solution, we decided to hack the module loading mechanism to solve the problem. What we want to achieve is that every time foo.bar is imported, Python loads the module that is local to the current script instead of reusing a copy in sys.modules that was loaded from somewhere else. We accomplished this by adding the following code before importing the modules.
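The caching behaviour can be reproduced outside Airflow with a short, self-contained sketch. It uses plain directories instead of zipped DAGs, and the `VERSION` constant is a hypothetical stand-in for the differing code in the two bar.py files.

```python
import importlib
import os
import sys
import tempfile


def make_version(root, version):
    """Create <root>/<version>/foo/bar.py exposing a VERSION constant."""
    package_dir = os.path.join(root, version, "foo")
    os.makedirs(package_dir)
    open(os.path.join(package_dir, "__init__.py"), "w").close()
    with open(os.path.join(package_dir, "bar.py"), "w") as handle:
        handle.write("VERSION = {!r}\n".format(version))
    return os.path.join(root, version)


# Start from a clean slate in case a module named foo was imported earlier.
for name in [n for n in sys.modules if n == "foo" or n.startswith("foo.")]:
    del sys.modules[name]

root = tempfile.mkdtemp()
v1_dir = make_version(root, "v1")
v2_dir = make_version(root, "v2")

# First "DAG": v1 is first on the path, so foo.bar resolves to v1's code.
sys.path.insert(0, v1_dir)
seen_by_v1 = importlib.import_module("foo.bar").VERSION

# Second "DAG": v2 is now first on the path, but foo.bar is already in sys.modules,
# so the cached v1 module is returned and v2's bar.py is never read.
sys.path.insert(0, v2_dir)
seen_by_v2 = importlib.import_module("foo.bar").VERSION

# Tidy up so the demo does not leak into the rest of the process.
sys.path.remove(v1_dir)
sys.path.remove(v2_dir)
for name in [n for n in sys.modules if n == "foo" or n.startswith("foo.")]:
    del sys.modules[name]
```

Here both `seen_by_v1` and `seen_by_v2` end up as `"v1"`: the second import silently picks up the first version's code.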
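One way such a hack could look is sketched below; it is an illustration of the idea rather than our exact code. The helper name `import_local` is hypothetical, and the demo uses plain directories and a `VERSION` constant instead of real zipped DAGs. The helper evicts the package from sys.modules and puts the local directory at the front of sys.path before importing.

```python
import importlib
import os
import sys
import tempfile


def import_local(module_name, local_dir):
    """Import module_name from local_dir, ignoring copies cached from elsewhere."""
    top_level = module_name.split(".")[0]
    # Evict the package and all of its sub-modules from the module cache.
    for name in list(sys.modules):
        if name == top_level or name.startswith(top_level + "."):
            del sys.modules[name]
    # Make sure local_dir wins the path lookup.
    if local_dir in sys.path:
        sys.path.remove(local_dir)
    sys.path.insert(0, local_dir)
    importlib.invalidate_caches()
    return importlib.import_module(module_name)


def make_version(root, version):
    """Create <root>/<version>/foo/bar.py exposing a VERSION constant."""
    package_dir = os.path.join(root, version, "foo")
    os.makedirs(package_dir)
    open(os.path.join(package_dir, "__init__.py"), "w").close()
    with open(os.path.join(package_dir, "bar.py"), "w") as handle:
        handle.write("VERSION = {!r}\n".format(version))
    return os.path.join(root, version)


root = tempfile.mkdtemp()
v1_dir = make_version(root, "v1")
v2_dir = make_version(root, "v2")

# Each "DAG" imports foo.bar relative to its own directory.
bar_v1 = import_local("foo.bar", v1_dir)
bar_v2 = import_local("foo.bar", v2_dir)
```

Each caller keeps a reference to its own module object, so `bar_v1.VERSION` is `"v1"` and `bar_v2.VERSION` is `"v2"` regardless of load order. In a DAG file, `local_dir` would presumably be derived from `__file__`. The trade-off is that every load re-imports the package, and any other code relying on the cached foo module would be affected.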
Now we can enjoy the benefits described at the beginning of this section, while waiting for a more elegant and out-of-the-box solution from Airflow 2.0.
BashOperator and PythonOperator are probably the two most frequently used operators in Airflow, and they are also the most talked about in articles.
They are convenient and quick, until your tasks have logic that is more complicated than the hello-world examples. The code becomes untestable because execution logic and orchestration logic are interleaved in the DAG code, which forces developers to mock a running Airflow environment in order to unit test a task.
In the earlier section we talked about separating the orchestration logic from execution logic by using KubernetesPodOperator. This design makes testing easier as well, as shown by our Airflow testing strategy below:
While we can also imagine more aspirational test strategies, such as mocking transient failures in DAG execution and making sure DAGs are robust enough to recover from them, we have found that the testing strategy above is good enough to keep things running for our needs.
It’s relatively easy to use the stable/airflow Helm chart to deploy Airflow to Kubernetes. What follows are our example Helm values for the configurations described above.