Airflow is a commonly used orchestrator that helps you schedule, run and monitor all kinds of workflows. Because workflows are defined in Python, it offers a lot of freedom to define tasks and connect to APIs in different domains. It also provides many extensions for scheduling workloads on other platforms such as Kubernetes.
Thanks to those qualities, Airflow is also often used to manage tasks in machine learning projects. However, there is another tool that additionally provides a pipeline abstraction and excels at organizing ML-related tasks: Kedro.
Can the two be used together? Yes, they can! In this article I'll show you how to do this and how to make it efficient and seamlessly automated!
Exploring available tools - Kedro-Airflow plugin
For this task I decided to use the official Kedro-Airflow plugin, as it recently received an update that lets you customize DAG generation with your own Jinja templates.
The plugin is very simple in structure; in fact, it does only one job: it generates DAG code for a given Kedro pipeline from a Jinja template and passes parameters to that template.
Here's everything it has to offer:
Kedro-Airflow's quick start and challenges
The first thing I did was to read the documentation and go through the quick start steps on a local Airflow environment set up with docker-compose. The provided example template creates a KedroOperator, which is just another variant of PythonOperator that executes Kedro nodes in separate processes spawned from session.run(). I quickly formed an opinion about the quick start setup: the example given there is impractical, as it is flawed in a few ways that I wanted to avoid in my solution:
- First, it assumes that Airflow and Kedro know about each other. I would prefer to isolate these two environments so that I don't need to import Kedro in Airflow or Airflow in Kedro. As managing dependencies in Airflow is challenging, it would be better to avoid this problem altogether.
- Second, it follows that both would need similar machine specifications, as they would be executed in the same environment.
- Third, as the code would be executed by the same processes, it would need to be shared in the form of packages. In this setup Airflow runs in a Docker image, so I'd have to either rebuild and redeploy this image every time the Airflow or Kedro project code changes, OR additionally manage lots of virtual Python environments somewhere and ship new versions of the micro-packaged Kedro pipelines there whenever the code changes.
Working on the solution
At first I had in mind a solution using either DockerOperator or KubernetesPodOperator to achieve isolated and scalable execution. As my target was Google Cloud, I worked with managed Airflow (Cloud Composer) backed by a GKE Autopilot cluster, as both are native to GCP. Naturally, that led to the choice of GKEStartPodOperator (GKEPodOperator for short), provided by Google to work with GKE. It inherits from KubernetesPodOperator and provides the same functionality, with the added bonus of handling and hiding the Google authentication needed to authorize with the GKE cluster. It amused me that the same provider package also offers operators to create and delete clusters, as if creating a computation cluster just to run one task on it were a good idea... I guess it could be if you run one big task infrequently, as provisioning a cluster costs nothing in GCP, while you pay for the time it is up. Either way, these operators are helpful in tutorials, as they handle GKE clusters for you.
While the Kedro project itself had no need for Airflow requirements, it was useful to have a separate environment with the apache-airflow[google,kubernetes] and kubernetes Python packages installed to check the validity of the generated DAGs. The kubernetes package is used here to define and validate Kubernetes object descriptions, mainly machine resource specifications.
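As a cheap first pass before loading the generated files into Airflow's DagBag in that environment, a stdlib-only check like the one below can catch templates that render into invalid Python. It is a sketch: check_dag_syntax and the directory layout are hypothetical, and it only parses the files, so import errors, cycles and bad operator arguments still need Airflow's own validation.

```python
import ast
from pathlib import Path
from typing import Dict


def check_dag_syntax(dag_dir: str) -> Dict[str, str]:
    """Return {file_name: error} for generated DAG files that are not valid Python.

    Hypothetical helper: it only parses the files; nothing is imported or executed,
    so Airflow does not need to be installed for this step.
    """
    errors: Dict[str, str] = {}
    for path in sorted(Path(dag_dir).glob("*.py")):
        try:
            ast.parse(path.read_text(), filename=str(path))
        except SyntaxError as exc:
            errors[path.name] = f"line {exc.lineno}: {exc.msg}"
    return errors
```

Calling it on a directory such as airflow_dags returns an empty dict when every generated file parses, which makes it easy to fail a CI step early.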
Setting up the environment
The next step, after choosing the tools for the job, was to set up the environment for Composer and GKE. At first I did it in the GCP Console UI; however, I wanted easily reproducible results, so I wrote some Terraform code to quickly provision and destroy the environment, using the official Google modules. Given their complexity and warnings, though, I'd seriously consider whether to use them beyond demo purposes. I also wanted to set up MLflow for experiment tracking; you can check out this other blog post on how to set it up on Google Cloud Run.
Allowing communication between MLflow and the GKE cluster required some additional effort, which is outside the scope of this article. In short, MLflow is secured with an IAP proxy, so we needed service accounts with access to it, and we made the Airflow executors (here: GKE Pods) use those service accounts (Workload Identity is the go-to mechanism here). You can find more details in the README of this demo's repository.
Deep-dive into Kedro-Airflow plugin
Here I'll show in detail how to use the plugin and how to customize it to your needs. Some inputs are handled for us by the plugin. Here's how it calls our Jinja template to fill it in:
```python
template.stream(
    dag_name=package_name,
    dependencies=dependencies,
    env=env,
    pipeline_name=pipeline_name,
    package_name=package_name,
    pipeline=pipeline,
    **dag_config,
).dump(str(target_path))
```
Here env is the Kedro environment, pipeline is the Kedro pipeline object, dag_config is the dictionary of parameters passed to the template, and dependencies is a dictionary of parent-child relationships between nodes defined by the pipeline. Parameters can be passed either on the command line when invoking the create command or through an airflow config file. The plugin loads the config file with its _load_config function (where we can also see the file patterns it looks for by default):
```python
def _load_config(context: KedroContext, pipeline_name: str) -> dict[str, Any]:
    if "airflow" not in context.config_loader.config_patterns.keys():
        context.config_loader.config_patterns.update(
            {"airflow": ["airflow*", "airflow/**"]}
        )
    ...
    try:
        config_airflow = context.config_loader["airflow"]
    ...
```
All those parameters will be visible as variables in Jinja, available to use in our template. Let's get down to it and configure it!
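Before configuring, one input is worth making concrete: dependencies. Kedro's Pipeline.node_dependencies maps each node to the set of nodes it depends on (its parents), and a parent-to-children mapping like the one the template receives can be obtained by inverting it. The sketch below does that inversion with plain node-name strings standing in for Kedro Node objects (an assumption so it runs without Kedro; invert_node_dependencies is a hypothetical name). The example edges come from the Spaceflights data processing pipeline.

```python
from collections import defaultdict
from typing import Dict, List, Set


def invert_node_dependencies(node_dependencies: Dict[str, Set[str]]) -> Dict[str, List[str]]:
    """Turn {child: {parents}} into {parent: [children]}.

    Plain strings stand in for Kedro Node objects in this sketch.
    """
    dependencies: Dict[str, List[str]] = defaultdict(list)
    for node, parent_nodes in node_dependencies.items():
        for parent in sorted(parent_nodes):  # sorted only to keep the output deterministic
            dependencies[parent].append(node)
    return dict(dependencies)


# child -> parents, as in the Spaceflights data processing pipeline
node_dependencies = {
    "preprocess_companies_node": set(),
    "preprocess_shuttles_node": set(),
    "create_model_input_table_node": {"preprocess_companies_node", "preprocess_shuttles_node"},
}
print(invert_node_dependencies(node_dependencies))
# {'preprocess_companies_node': ['create_model_input_table_node'], 'preprocess_shuttles_node': ['create_model_input_table_node']}
```

In the template, each parent/child pair from such a mapping becomes one Airflow edge (parent >> child).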
Configuring Kedro-Airflow for a real-world use case
Here's my conf/base/airflow.yml defining the parameters:
```yaml
default:
  grouping_prefix: "airflow:"
  resources_tag_prefix: "machine:"
  grouping: true
  gcp_project_id: "gid-labs-mlops-sandbox"
  gcp_region: "europe-west1"
  gcp_gke_cluster_name: "europe-west1-test-environme-d1ea8bdc-gke"
  k8s_namespace: "airflow-ml-jobs"
  k8s_service_account: "composer-airflow"
  docker_image: "europe-west1-docker.pkg.dev/gid-labs-mlops-sandbox/images/spaceflights-airflow"
  start_date: [2023, 1, 1]
  max_active_runs: 2
  schedule_interval: "@once"
  catchup: false
  owner: "airflow"
  depends_on_past: false
  email_on_failure: false
  email_on_retry: false
  retries: 0
  retry_delay: 5

data_science:
  owner: "airflow-ds"
```
In this config we can define any custom variables we want; how they are used will become clear once we look at the Jinja template. The parameters defined here configure Airflow behavior, point to the GKE cluster location, define parameters of the k8s pod template and supplement pipelines with additional informative tags.
Parameters can be defined under default, which applies to all pipelines, and under a pipeline's name, which overrides the defaults for that pipeline. We use the Spaceflights starter as a starting point, so we have the __default__, data_science and data_processing pipelines.
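The override semantics can be sketched as a shallow dictionary merge: start from default, then apply the pipeline's own section on top. This is an illustration of the behavior described above, not the plugin's code; resolve_dag_config is a hypothetical name, and the shallow merge is an assumption (a nested mapping under a pipeline key would replace the default one wholesale).

```python
from typing import Any, Dict


def resolve_dag_config(config_airflow: Dict[str, Dict[str, Any]], pipeline_name: str) -> Dict[str, Any]:
    """Defaults first, then the pipeline-specific section overrides matching keys (shallow merge)."""
    dag_config: Dict[str, Any] = {}
    dag_config.update(config_airflow.get("default", {}))
    dag_config.update(config_airflow.get(pipeline_name, {}))
    return dag_config


# A trimmed-down version of conf/base/airflow.yml, already parsed into Python
config_airflow = {
    "default": {"owner": "airflow", "retries": 0, "grouping": True},
    "data_science": {"owner": "airflow-ds"},
}

print(resolve_dag_config(config_airflow, "data_science"))
# {'owner': 'airflow-ds', 'retries': 0, 'grouping': True}
print(resolve_dag_config(config_airflow, "__default__"))
# {'owner': 'airflow', 'retries': 0, 'grouping': True}
```

Pipelines without their own section, such as __default__ and data_processing here, simply get the defaults.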
The DAG template can be found here. The main points are:
- we use Jinja loops to pass information about Kedro node names, tags and their dependencies as dictionaries of “raw data”,
- then we use config parameters to configure a DAG representing the given pipeline,
- in this process we translate Kedro nodes into Airflow tasks, using GKEPodOperator for each node and passing a docker command that runs only the selected nodes in each step; all of them use the same Docker image, built from our Kedro project repository,
- during node translation we use slugify to sanitize strings so that they are accepted regardless of the character restrictions of the Kubernetes API,
- then we define sets of standard machine resources and select the correct one for each node based on a special "machine:..." tag (as a convention),
- we link the DAG nodes together based on the passed dependency information,
- lastly, we add one extra node in front of the starting nodes, using a special pipeline that handles MLflow session creation and passes the MLflow run id to the other nodes via the Airflow XCom mechanism.
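The naming and resource-selection steps can be sketched in plain Python. This is not the template from the repository: k8s_safe_name is a hypothetical stand-in for slugify that applies a conservative Kubernetes naming rule (a DNS-1123 label: lowercase alphanumerics and "-", at most 63 characters, starting and ending with an alphanumeric), and the MACHINES profiles are made-up values. In a real DAG the selected dictionary would still have to be turned into a kubernetes V1ResourceRequirements object for the operator.

```python
import re
from typing import Dict, Iterable

# Hypothetical machine profiles; real values depend on your cluster and workloads
MACHINES: Dict[str, Dict[str, Dict[str, str]]] = {
    "small": {"requests": {"cpu": "500m", "memory": "1Gi"}},
    "medium": {"requests": {"cpu": "2", "memory": "4Gi"}},
    "large": {"requests": {"cpu": "4", "memory": "16Gi"}},
}


def k8s_safe_name(text: str, max_length: int = 63) -> str:
    """Lowercase, replace runs of disallowed characters with '-', trim to a DNS-1123 label."""
    name = re.sub(r"[^a-z0-9-]+", "-", text.lower())
    name = name[:max_length].strip("-")  # must start and end with an alphanumeric
    if not name:
        raise ValueError(f"cannot build a Kubernetes name from {text!r}")
    return name


def select_machine(tags: Iterable[str], prefix: str = "machine:", default: str = "small") -> Dict[str, Dict[str, str]]:
    """Return the profile of the first known '<prefix><size>' tag (sorted order), else the default."""
    for tag in sorted(tags):
        size = tag[len(prefix):]
        if tag.startswith(prefix) and size in MACHINES:
            return MACHINES[size]
    return MACHINES[default]


print(k8s_safe_name("pod-data_science-split_data_node"))  # pod-data-science-split-data-node
print(select_machine({"airflow:split", "machine:medium"}))  # {'requests': {'cpu': '2', 'memory': '4Gi'}}
```

Sorting the tags makes the choice deterministic when a node (or group) carries several machine tags, which is exactly the situation worth warning about.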
Making the DAGs efficient by Kedro node grouping
By default, every Kedro node is translated into one node of the target framework, here an Airflow DAG task. As of version 0.18.13, Kedro still does not support any encapsulation of nodes into groups (and neither do most of its plugins). In Kedro pipelines you want high node granularity, so that each node is responsible for only one thing and is easily testable and reusable. Fine-grained tasks in a single process have almost no overhead, as memory can be shared between the nodes. In Airflow (using Docker images), however, you want as few nodes as possible to reduce the overhead of creating and destroying pods. More nodes also mean more time wasted on serializing data and passing it between them. So how should you handle that?
What could give us more control over how the pipeline is structured?
Tags! Tags are a great way to group nodes together and define their properties.
We've got all the pieces of the solution at hand. In Kedro we can execute only selected nodes using tag filtering, e.g.:
```bash
kedro run --tags data_processing
```
We’re going to use a convention of special tags for this purpose. By default, we’ll consider tags starting with “airflow:” to be grouping tags, with the group name being the text after the prefix.
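A small sketch of this convention and of what a tag-filtered run would pick up. Both helpers are hypothetical and use plain dictionaries instead of Kedro Node objects; nodes_selected_by_tags imitates the documented any-of behavior of kedro run --tags when several tags are given.

```python
from typing import Dict, Iterable, List, Optional, Set

GROUPING_PREFIX = "airflow:"


def group_name(tags: Iterable[str], prefix: str = GROUPING_PREFIX) -> Optional[str]:
    """Text after the prefix of the first grouping tag (sorted order), or None if ungrouped."""
    for tag in sorted(tags):
        if tag.startswith(prefix):
            return tag[len(prefix):]
    return None


def nodes_selected_by_tags(node_tags: Dict[str, Set[str]], *selected: str) -> List[str]:
    """Keep the nodes carrying at least one of the selected tags."""
    wanted = set(selected)
    return sorted(name for name, tags in node_tags.items() if tags & wanted)


node_tags = {
    "create_model_input_table_node": {"airflow:split", "machine:medium"},
    "split_data_node": {"airflow:split", "machine:medium"},
    "train_model_node": {"machine:medium"},
}
print(group_name(node_tags["split_data_node"]))  # split
print(nodes_selected_by_tags(node_tags, "airflow:split"))
# ['create_model_input_table_node', 'split_data_node']
```

So a task running kedro run with --tags airflow:split would execute both split-related nodes in one container.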
As each node runs in a Docker container, all that's left is to determine whether nodes are grouped or not and pass the correct command: either run a single node or a group of nodes with a tag. Ideally, this processing would happen in a plugin hook at DAG generation time, but the plugin does not offer such hooks yet. The next best thing would be to implement it in Jinja, but that would result in a complex and unmaintainable template. So the remaining option is to use the power of Python and Airflow and embed the code doing that work inside the DAG definition. I've implemented a few functions that create a new node mapping and update the tags based on the grouping tags.
The code is as follows:
```python
from typing import Tuple


def group_nodes_with_tags(node_tags: dict, grouping_prefix: str = "airflow:") -> Tuple[dict, dict]:
    # Every node maps to itself unless it carries a grouping tag
    group_translator = {k: k for k in node_tags.keys()}
    # Grouping tag -> set of nodes that carry it
    tag_groups = dict()
    for node, tags in node_tags.items():
        for tag in tags:
            if tag.startswith(grouping_prefix):
                if tag not in tag_groups:
                    tag_groups[tag] = set()
                tag_groups[tag].add(node)
                group_translator[node] = tag
    return group_translator, tag_groups


def get_tasks_from_dependencies(node_dependencies: dict, group_translator: dict) -> Tuple[set, dict]:
    # Translate node-level parent -> children edges into task-level (group-level) edges
    group_dependencies = {}
    task_names = set()
    for parent, children in node_dependencies.items():
        if group_translator[parent] not in group_dependencies:
            group_dependencies[group_translator[parent]] = set()
        this_group_deps = group_dependencies[group_translator[parent]]
        task_names.add(group_translator[parent])
        for child in children:
            # Edges inside one group disappear, as the group runs in a single pod
            if group_translator[child] != group_translator[parent]:
                this_group_deps.add(group_translator[child])
            task_names.add(group_translator[child])
    return task_names, group_dependencies


def update_node_tags(node_tags: dict, tag_groups: dict) -> dict:
    # A group inherits the union of the tags of all its member nodes
    node_tags.update({group: {tag for node in tag_groups[group] for tag in node_tags[node]} for group in tag_groups})
    return node_tags

...

group_translator, tag_groups = group_nodes_with_tags(node_tags)
task_names, group_dependencies = get_tasks_from_dependencies(node_dependencies, group_translator)
update_node_tags(node_tags, tag_groups)
```
By convention, I've decided to make this mechanism optional and, for convenience, to use the prefix defined in the config file. Then, based on the results of grouping, each DAG task runs either a single node or a group of them:
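```python
# Excerpt of the operator arguments in the DAG template (other arguments omitted).
# name[len(GROUPING_PREFIX):] removes the prefix itself; str.lstrip would strip a set of characters.
task_id=name[len(GROUPING_PREFIX):] if name.startswith(GROUPING_PREFIX) else name,
# Grouped tasks run every node with the grouping tag; ungrouped tasks run a single node by name
cmds=["python", "-m", "kedro", "run", "--pipeline", pipeline_name, "--tags", name, "--env", "{{ env | default('local') }}"]
if name.startswith(GROUPING_PREFIX)
else ["python", "-m", "kedro", "run", "--pipeline", pipeline_name, "--nodes", name, "--env", "{{ env | default('local') }}"],
name=f"pod-{ slugify(pipeline_name) }-{ slugify(name[len(GROUPING_PREFIX):] if name.startswith(GROUPING_PREFIX) else name) }",
```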
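The prefix handling deserves a closer look, because str.lstrip removes any leading characters that belong to the given set rather than a prefix: "airflow:foo".lstrip("airflow:") returns an empty string, since every character of "foo" is also in the set, and an ungrouped node such as "admin_node" would lose its leading "a". The hypothetical helpers below strip the prefix explicitly and build the same two command variants as the fragment above, without needing Airflow or Kedro.

```python
from typing import List, Tuple


def strip_group_prefix(name: str, prefix: str) -> str:
    """Remove the prefix only when the name starts with it."""
    return name[len(prefix):] if name.startswith(prefix) else name


def build_task(pipeline_name: str, name: str, env: str = "local", prefix: str = "airflow:") -> Tuple[str, List[str]]:
    """Return (task_id, container command) for a grouped task or a single Kedro node."""
    selector = ["--tags", name] if name.startswith(prefix) else ["--nodes", name]
    cmds = ["python", "-m", "kedro", "run", "--pipeline", pipeline_name, *selector, "--env", env]
    return strip_group_prefix(name, prefix), cmds


print(repr("airflow:foo".lstrip("airflow:")))  # ''
print(build_task("data_science", "airflow:split"))
# ('split', ['python', '-m', 'kedro', 'run', '--pipeline', 'data_science', '--tags', 'airflow:split', '--env', 'local'])
```

On Python 3.9+ str.removeprefix does the same job as strip_group_prefix.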
Node grouping gotchas
And that's it!
... or is it? What if we make a mistake in our tagging and the DAG stops being a DAG (a cycle is introduced)? Airflow's DAG validation would then complain that the DAG is invalid. But the mistake can also be obscured, as Kedro's dependencies are hidden inside grouped nodes and are not visible from Airflow's perspective after translation. To lessen the burden of debugging, it would be nice to add tag validation to the DAG creation process. As mentioned before, this plugin has no hooks available (as of Kedro 0.18.13), so the next best place for the validation is the register_pipelines function.
Now the kedro airflow create command will result in an error or a warning with the following message if we make a mistake in tagging:
```
[09/29/23 18:48:00] INFO Validating pipelines tagging...
WARNING Group airflow:split has multiple machine tags, this may cause unexpected behavior in which machine is used for the group, please use only one machine tag per group
ERROR Pipeline __default__ has invalid grouping that creates a cycle in its grouping tags regarding nodes: {'train_model_node', '__start__', 'airflow:split'}
```
(“__start__” is a virtual node, added to simplify the algorithm, that points to all other nodes.)
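A check of this kind can be sketched with the standard library's graphlib (Python 3.9+). This is not the author's validation code: find_group_cycle is a hypothetical helper that collapses nodes into their groups, as the generated DAG does, and asks graphlib.TopologicalSorter whether the collapsed graph is still acyclic. It does not need a virtual __start__ node, and it assigns a node with several grouping tags to the alphabetically first one, which may differ from the grouping code above.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Optional, Set


def find_group_cycle(
    node_dependencies: Dict[str, Set[str]],  # parent -> children
    node_tags: Dict[str, Set[str]],
    prefix: str = "airflow:",
) -> Optional[List[str]]:
    """Return the task names forming a cycle in the grouped graph, or None if it is still a DAG."""

    def task_of(node: str) -> str:
        groups = sorted(t for t in node_tags.get(node, ()) if t.startswith(prefix))
        return groups[0] if groups else node

    # TopologicalSorter expects: task -> set of predecessor tasks
    graph: Dict[str, Set[str]] = {}
    for parent, children in node_dependencies.items():
        graph.setdefault(task_of(parent), set())
        for child in children:
            if task_of(child) != task_of(parent):  # edges inside a group disappear
                graph.setdefault(task_of(child), set()).add(task_of(parent))
    try:
        TopologicalSorter(graph).prepare()  # raises CycleError if the graph has a cycle
    except CycleError as err:
        return err.args[1]  # the list of nodes in the detected cycle
    return None


deps = {
    "split_data_node": {"train_model_node", "evaluate_model_node"},
    "train_model_node": {"evaluate_model_node"},
}
# Mistake: evaluate_model_node is put into the same group as split_data_node,
# although train_model_node sits between them
mistagged = {
    "split_data_node": {"airflow:split"},
    "train_model_node": {"machine:medium"},
    "evaluate_model_node": {"airflow:split"},
}
print(sorted(set(find_group_cycle(deps, mistagged))))  # ['airflow:split', 'train_model_node']
```

Inside register_pipelines, a non-None result could be logged as an error or raised as an exception before the pipelines are returned.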
Using the plugin and getting the results
If you've made it this far, thanks for reading. Now you get to see screenshots of this solution in action. You can find more detailed instructions in the project's README file. Here's the pipeline definition for reference:
Kedro Spaceflights pipelines:
```python
from kedro.pipeline import node

# data_processing pipeline
[
    node(
        func=preprocess_companies,
        inputs="companies",
        outputs="preprocessed_companies",
        name="preprocess_companies_node",
        tags=["airflow:companies"],
    ),
    node(
        func=preprocess_shuttles,
        inputs="shuttles",
        outputs="preprocessed_shuttles",
        name="preprocess_shuttles_node",
        tags=["airflow:shuttles", "machine:medium"],
    ),
    node(
        func=create_model_input_table,
        inputs=["preprocessed_shuttles", "preprocessed_companies", "reviews"],
        outputs="model_input_table",
        name="create_model_input_table_node",
        tags=["airflow:split", "machine:medium"],
    ),
]
...
# data_science pipeline
[
    node(
        func=split_data,
        inputs=["model_input_table", "params:model_options"],
        outputs=["X_train", "X_test", "y_train", "y_test"],
        name="split_data_node",
        tags=["airflow:split", "machine:medium"],
    ),
    node(
        func=train_model,
        inputs=["X_train", "y_train"],
        outputs="regressor",
        name="train_model_node",
        tags=["machine:medium"],
    ),
    node(
        func=evaluate_model,
        inputs=["regressor", "X_test", "y_test"],
        outputs=None,
        name="evaluate_model_node",
        tags=["machine:medium"],
    ),
]
```
Pay special attention to the tags. They translate into the following Kedro pipeline:
We need a Docker image, so we build it and push it to the Docker registry in GCP in one go:
Then we create the DAG using the plugin and copy it to the Composer's DAGs bucket:
After a few minutes we should see our DAG in the Composer UI. We can trigger it manually and see the following results:
This was done with grouping disabled. Now let's enable it and see the difference:
In this example we use the grouping feature to rename single nodes (each forming a one-node group) with the "airflow:companies" and "airflow:shuttles" tags. Then we group the model input table creation and the data split into one node with the "airflow:split" tag.
Here's a side-by-side comparison of the generated tags with and without grouping:
Zooming in on one node, let's observe how the node's tags, name and machine tag translate to the pod's parameters.
A small issue with Airflow KubernetesPodOperator XCom
Let's have a look at GKE to observe how nodes translate to pods:
We can see that the pod of the node pushing data through Airflow XCom is sometimes left running for a long time. That's due to how slow the XCom mechanism can be in KubernetesPodOperator: the sidecar container that exports the data essentially waits for a signal to be killed after the data is read, but it can be left hanging for longer. This mechanism is also the reason why, in this small example, the node that creates a session in MLflow takes the longest to process (around 5 minutes). In many scenarios this is not an issue, but in retrospect it could be improved. The MLflow session creation could either be done directly in Airflow with a PythonOperator (accepting the MLflow library as an external dependency in Airflow), or XCom could be replaced with a storage bucket as the communication medium.
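For context, the container side of that XCom mechanism is simple: with do_xcom_push=True, KubernetesPodOperator expects the container to write its result as JSON to /airflow/xcom/return.json, and the sidecar reads it from there. A minimal sketch of what the MLflow session node could do at the end of its run (push_xcom and the mlflow_run_id key are hypothetical; the directory is a parameter so the function can be tried outside a pod):

```python
import json
from pathlib import Path
from typing import Any


def push_xcom(value: Any, xcom_dir: str = "/airflow/xcom") -> Path:
    """Write the task result where the KubernetesPodOperator XCom sidecar looks for it."""
    target = Path(xcom_dir) / "return.json"
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(json.dumps(value))  # must be JSON-serializable
    return target
```

Inside the pod this would be called as push_xcom({"mlflow_run_id": run_id}); with a bucket as the medium, the same payload would be written to an agreed object path instead, and no sidecar would need to be waited for.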
Conclusion
With some effort, the Kedro-Airflow plugin can automate your chores away, so your engineers can forget about DAG translation and stay focused on more creative tasks! The only step left is to include the DAG creation and upload process in your CI tools.
We hope you've enjoyed this article and found it useful. If you have any questions or suggestions, please contact us at hello@getindata.com