We would like to announce the dbt-flink-adapter, which allows pipelines defined in SQL in a dbt project to run on Apache Flink. Find out what the advantages of dbt and Apache Flink are, what drove our GetInData Streaming Labs team to create the adapter, and how to build a real-time analytics pipeline. We also deal with the myth that real-time analytics is not worth the cost. Ready?
In the world of analytics, an efficient, stable and reliable ETL process plays a crucial role when it comes to delivering data products of the highest value. It is no surprise, then, that dbt was announced as a potential game changer for analytics engineering, and is considered today as the state-of-the-art tool for building data pipelines. dbt, the data transformation framework, has been designed so that every person with an analytical background and SQL fluency can take ownership of the ETL process. Thanks to dbt's functionality, analysts can not only model the data they are going to work with, but also apply software engineering best practices: data quality monitoring, code inspection, and reviewing and documenting at the project level. These are not just slogans. The framework has grown in popularity amongst smaller and larger players, gathering a very strong and friendly community. With constant upgrades and feature improvements, this is not going to change soon. Where dbt is still weak is in its support for building real-time analytics pipelines.
Apache Flink is a powerful big data processing framework that has quickly gained popularity in recent years, as more and more companies have started using it in production at scale. One of the main reasons for its success is its ability to handle both batch and stream processing on a single platform. This means that users can process and analyze real-time data streams as well as historical data, all within the same framework. Additionally, Flink's high-performance and low-latency capabilities make it a strong choice for use cases such as real-time analytics, machine learning and complex event processing. Another key advantage of Flink is its support for a wide range of data sources and sinks, including Kafka, HDFS, S3, RDBMS and NoSQL databases, making it easy to integrate with the existing data infrastructure.
Apache Flink is production ready at any scale, from small jobs run directly on your work computer to huge clusters with thousands of nodes. One example of a company that has successfully used Flink in production at scale is Alibaba. This e-commerce giant uses Flink to process massive amounts of data generated by its online marketplaces and to power its real-time data analytics systems. Another example is Netflix, which uses Flink to process the billions of events generated by its streaming service.
Out of the box, the user can define Apache Flink pipelines using the DataStream API in Java/Scala/Python or using SQL, for both batch and stream processing. However, as Apache Flink does not behave like a typical database, it has been rather hard for non-programmers to execute these SQL statements, so Flink alone does not empower analytics engineers to take full ownership of ETL processes. So what if we could run pipelines defined in dbt on Apache Flink? This way we would enable all analytics engineers who know dbt to define real-time pipelines. At the same time, all users of Flink SQL would get a very powerful and popular tool to maintain their pipeline projects: dbt.
At GetInData Streaming Labs we solved just that. We are announcing an adapter for dbt that allows pipelines defined in SQL in a dbt project to run on Apache Flink (GitHub: getindata/dbt-flink-adapter). It is similar to other adapters that run dbt pipelines on Snowflake, Databricks, BigQuery and other data warehouses. In some way it turns Apache Flink into a data warehouse-like execution engine!
What’s more, this adapter also supports the typical batch processing pipelines on Flink. This, leveraging the broad library of connectors, both as sources and destinations (Elasticsearch, Kafka, DynamoDB, Kinesis, JDBC and others) makes this a very powerful ETL (and we mean all Extract, Transform, and Load operations) system driven by dbt.
Here is a diagram that illustrates how dbt, Flink and various storage services work together. As you can see, we leverage a very valuable new feature of Flink, called the Flink SQL Gateway, which provides an API to execute SQL queries on a Flink cluster.
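To make the role of the SQL Gateway more concrete, here is a minimal sketch of how a client could talk to its REST API from Python. It is not the adapter's implementation. It assumes the gateway's v1 REST endpoints for opening a session and submitting a statement, and the default localhost:8083 address used later in this tutorial; the function names are made up for illustration.

```python
import json
import urllib.request

# Assumption: the SQL Gateway listens on the tutorial's default host and port.
GATEWAY_URL = "http://localhost:8083"
JSON_HEADERS = {"Content-Type": "application/json"}


def open_session_request(base_url=GATEWAY_URL):
    """Build (but do not send) the request that opens a gateway session."""
    return urllib.request.Request(
        f"{base_url}/v1/sessions",
        data=json.dumps({}).encode("utf-8"),
        headers=JSON_HEADERS,
        method="POST",
    )


def execute_statement_request(session_handle, sql, base_url=GATEWAY_URL):
    """Build (but do not send) the request that submits one SQL statement."""
    return urllib.request.Request(
        f"{base_url}/v1/sessions/{session_handle}/statements",
        data=json.dumps({"statement": sql}).encode("utf-8"),
        headers=JSON_HEADERS,
        method="POST",
    )


def send(request):
    """Send a prepared request and decode the JSON response body."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))
```

With a gateway running, send(open_session_request()) should return a JSON document containing a sessionHandle, which can then be passed to execute_statement_request together with a SQL statement.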
There are, however, differences in how a dbt model that runs in real time on Apache Flink has to be created and operated, which we will explore in the next part of this article.
You can find the full example on GitHub: gliter/dbt-flink-adapter-example
We are going to build a real-time analytics pipeline that consumes two streams of data. The first represents transactions on banking customer accounts, and the second contains click events from the banking application. We are going to create a simple filtering operation that could be used, for example, to alert a customer when a large loan is taken out; we are going to produce a new stream that joins data from the two input streams; and we are going to create a pipeline that calculates the daily spending of bank customers.
During this tutorial, we will be using dbt-cli with a dbt-flink-adapter installed in the Python virtual environment. We will also use Kafka and Flink instances run on your machine with docker-compose.
Create a virtual environment and install dbt-flink-adapter
$ mkdir -p ~/projects/dbt
$ cd ~/projects/dbt
$ python3 -m venv ~/.virtualenvs/dbt-example1
$ source ~/.virtualenvs/dbt-example1/bin/activate
$ pip3 install dbt-flink-adapter
Once the installation is complete, we can create a dbt project; you can accept the default values when prompted. Then remove the default models created by dbt init.
$ dbt init
Enter a name for your project (letters, digits, underscore): example1
Which database would you like to use?
[1] flink
Enter a number: 1
host (Flink SQL Gateway host) [localhost]:
port [8083]:
session_name [test_session]:
database (Flink catalog) [default_catalog]:
schema (Flink database) [default_database]:
threads (1 or more) [1]:
$ cd example1
$ rm -r models/example
We also need an Apache Flink instance with SQL Gateway and for the purpose of this tutorial a one node instance of Apache Kafka.
$ wget https://raw.githubusercontent.com/gliter/dbt-flink-adapter-example/main/docker-compose.yml
$ docker-compose up
Create the clickstream, init-balance, trx, high-loan, joined-data and daily-spending topics in Kafka
$ wget https://raw.githubusercontent.com/gliter/dbt-flink-adapter-example/main/recreate-topics.sh
$ chmod +x recreate-topics.sh
$ ./recreate-topics.sh
After these steps you should have the following files, containers and topics
$ tree
.
├── analyses
├── dbt_project.yml
├── macros
├── models
├── README.md
├── seeds
├── snapshots
└── tests
$ docker-compose ps
Name Command State Ports
------------------------------------------------------------------------------------------------------------------------------
example1_jobmanager_1 /docker-entrypoint.sh jobm ... Up 6123/tcp, 0.0.0.0:8081->8081/tcp,:::8081->8081/tcp
example1_kafka_1 /etc/confluent/docker/run Up 0.0.0.0:9092->9092/tcp,:::9092->9092/tcp
example1_sql-gateway_1 /bin/sh -c /docker-entrypo ... Up 6123/tcp, 8081/tcp, 0.0.0.0:8083->8083/tcp,:::8083->8083/tcp
example1_taskmanager_1 /docker-entrypoint.sh task ... Up 6123/tcp, 8081/tcp
example1_zookeeper_1 /etc/confluent/docker/run Up 2181/tcp, 2888/tcp, 3888/tcp
$ kafka-topics --bootstrap-server localhost:9092 --list
clickstream
daily-spending
high-loan
init-balance
joined-data
trx
You can also open http://localhost:8081 in your browser to see the running Apache Flink instance.
We are approaching the first fundamental difference in how we should think about the dbt model for Apache Flink in contrast to typical models.
When we connect to a Flink instance there is no data there [1]; it is just a processing engine, and for persistence we need to connect it to other services. In our tutorial we will be using Apache Kafka. The source definition is used not only to name and describe the sources in our dbt model, but also to instantiate a connector in Flink. Because of this, when running a model you will see in the logs that it executes a CREATE TABLE statement, which is simply the Flink SQL way of creating a connector.
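As a rough illustration of that idea, the sketch below renders a Flink SQL CREATE TABLE statement from a source description. This is not the adapter's template, and the exact statement in your logs may differ; the function name is hypothetical, and the column names and properties simply mirror the clickstream source defined in the next step.

```python
def render_create_table(name, columns, connector_properties, watermark_column=None):
    """Render an illustrative Flink SQL CREATE TABLE statement.

    columns is a list of (column_name, data_type) pairs and
    connector_properties a dict of connector options.
    """
    body = [f"  `{column}` {data_type}" for column, data_type in columns]
    if watermark_column is not None:
        # Simplest strategy: the watermark equals the event timestamp,
        # which means out-of-order events are not tolerated.
        body.append(f"  WATERMARK FOR `{watermark_column}` AS `{watermark_column}`")
    options = [f"  '{key}' = '{value}'" for key, value in connector_properties.items()]
    return (
        f"CREATE TABLE {name} (\n"
        + ",\n".join(body)
        + "\n) WITH (\n"
        + ",\n".join(options)
        + "\n)"
    )


statement = render_create_table(
    "clickstream",
    [("event_timestamp", "TIMESTAMP(3)"), ("user_id", "DECIMAL"), ("event", "STRING")],
    {
        "connector": "kafka",
        "topic": "clickstream",
        "properties.bootstrap.servers": "kafka:29092",
        "value.format": "json",
    },
    watermark_column="event_timestamp",
)
print(statement)
```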
We can use the dbt general configuration mechanism to extract the configuration common to all sources into the dbt_project.yml file.
Add to the dbt_project.yml file
sources:
  example1:
    +type: streaming
    +default_connector_properties:
      connector: 'kafka'
      properties.bootstrap.servers: 'kafka:29092'
      scan.startup.mode: 'earliest-offset'
      value.format: 'json'
      value.json.encode.decimal-as-plain-number: 'true'
      value.json.timestamp-format.standard: 'ISO-8601'
      properties.group.id: 'dbt'
Create a file sources.yml under the models directory
version: 2
sources:
  - name: kafka
    tables:
      - name: clickstream
        config:
          connector_properties:
            topic: 'clickstream'
          watermark:
            column: event_timestamp
            strategy: event_timestamp
        columns:
          - name: event_timestamp
            data_type: TIMESTAMP(3)
          - name: user_id
            data_type: DECIMAL
          - name: event
            data_type: STRING
      - name: trx
        config:
          connector_properties:
            topic: 'trx'
          watermark:
            column: event_timestamp
            strategy: event_timestamp
        columns:
          - name: event_timestamp
            data_type: TIMESTAMP(3)
          - name: user_id
            data_type: DECIMAL
          - name: source
            data_type: STRING
          - name: target
            data_type: STRING
          - name: amount
            data_type: DECIMAL
          - name: deposit_balance_after_trx
            data_type: DECIMAL
          - name: credit_balance_after_trx
            data_type: DECIMAL
Alongside the typical dbt source definitions there are a few Flink-specific configurations:
type defines whether the source should be used as batch or as streaming.
connector_properties defines the connector configuration; more on this in the Flink documentation for the Kafka connector. As you might have noticed, the key default_connector_properties in dbt_project.yml is not exactly the same as the one used in the source configuration. The reason is that the dbt generic configuration mechanism always overrides the entire value. The adapter adds this special handling so that the effective connector properties are the result of merging the common configuration from dbt_project.yml with the configuration specified for the individual source.
watermark is a Flink mechanism for dealing with time-based aggregations and out-of-order events. In this example we declare that the column event_timestamp contains the timestamp used for time-based operations, and we use a very simple strategy that does not allow out-of-order events. More on this topic in the Flink documentation: Timely Stream Processing and CREATE Statements.
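The merge behaviour described for connector_properties can be pictured with a few lines of Python. This is only a sketch of the idea, not the adapter's code; the function name is hypothetical and the values are taken from the configuration above.

```python
def merge_connector_properties(defaults, overrides):
    """Combine project-wide defaults with per-source properties.

    Keys from overrides win; keys that appear only in defaults are kept.
    The inputs are left unmodified.
    """
    merged = dict(defaults)
    merged.update(overrides)
    return merged


# From dbt_project.yml (default_connector_properties), shortened.
defaults = {
    "connector": "kafka",
    "properties.bootstrap.servers": "kafka:29092",
    "value.format": "json",
}
# From sources.yml (connector_properties of the clickstream source).
source_properties = {"topic": "clickstream"}

effective = merge_connector_properties(defaults, source_properties)
print(effective)
```

If both settings used the same key, dbt's own mechanism would replace the whole dictionary, and the clickstream source would end up with only the topic property.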
We currently support two types of materialization: table and view. Again, the fundamental difference is visible here: Flink is a processing engine, so for data persistence it has to rely on other services. When we want to materialize our model as a table, we need to use a connector and provide all the necessary connector properties. With view materialization, the model is treated as a partial instruction on how to create a pipeline and won’t be executed until a model materialized as a table uses it.
Modify the dbt_project.yml file
models:
  example1:
    # Config indicated by + and applies to all files under models/example/
    +materialized: table
    +type: streaming
    +database: default_catalog
    +schema: default_database
    +default_connector_properties:
      connector: 'kafka'
      properties.bootstrap.servers: 'kafka:29092'
      scan.startup.mode: 'earliest-offset'
      value.format: 'json'
      value.json.encode.decimal-as-plain-number: 'true'
      value.json.timestamp-format.standard: 'ISO-8601'
      properties.group.id: 'dbt'
Create a file models.yml under the models directory
version: 2
models:
  - name: high_loan
    config:
      connector_properties:
        topic: 'high-loan'
  - name: joined_data
    config:
      materialized: view
  - name: joined_data_output
    config:
      connector_properties:
        topic: 'joined-data'
  - name: daily_spending
    config:
      type: streaming
      connector_properties:
        topic: 'daily-spending'
Once we have our model metadata, it's time to define the models themselves:
models/high_loan.sql
select *
from {{ source('kafka', 'trx') }}
where source = 'credit'
and target = 'deposit'
and amount > 5000
Here we consume data from the source and apply simple filtering.
models/joined_data.sql
select cs.event_timestamp, cs.user_id, cs.event, trx.source, trx.target, trx.amount, trx.deposit_balance_after_trx, trx.credit_balance_after_trx
from {{ source('kafka', 'clickstream') }} as cs
join {{ source('kafka', 'trx') }} as trx
on cs.user_id = trx.user_id
and cs.event_timestamp = trx.event_timestamp
We are joining two streams of data with the naive assumption that the transaction and the click event happened at exactly the same time for a given user. This is called an interval join (see Joins in the Flink documentation); in our case the interval is 0, so the timestamps have to match exactly on the left and right side.
This can be something new for people without experience with real-time analytics. When working with continuously flowing streams of data, we cannot go back or forward in time to scan all events and do an arbitrary join. We can, for example, hold on to some data from one stream, wait for data from the second stream, and then join and emit an event. The problem is how much data we want to hold, and for how long. In this interval join example, we track the progress of both streams, and once we know that the event timestamps in both streams have passed time T, we can drop all the state that we keep for events with a time less than T.
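The state handling described above can be sketched as a toy simulation in plain Python. It only illustrates the idea and is not how Flink implements interval joins: timestamps are plain integers, events are assumed to arrive in order on each side, and the class and method names are invented for this example.

```python
from collections import defaultdict


class ExactTimeJoin:
    """Toy join of two streams on (user_id, timestamp) with state cleanup."""

    def __init__(self):
        # Buffered payloads per side, keyed by (user_id, timestamp).
        self.state = {"left": defaultdict(list), "right": defaultdict(list)}
        # Highest timestamp seen so far on each side.
        self.progress = {"left": None, "right": None}

    def process(self, side, user_id, timestamp, payload):
        """Handle one event and return the joined rows it produces."""
        other = "right" if side == "left" else "left"
        key = (user_id, timestamp)
        matches = self.state[other].get(key, [])
        if side == "left":
            joined = [(user_id, timestamp, payload, match) for match in matches]
        else:
            joined = [(user_id, timestamp, match, payload) for match in matches]
        # Keep the event in case a matching one arrives later on the other side.
        self.state[side][key].append(payload)
        seen = self.progress[side]
        self.progress[side] = timestamp if seen is None else max(seen, timestamp)
        self._drop_expired_state()
        return joined

    def _drop_expired_state(self):
        if None in self.progress.values():
            return  # one side has not produced anything yet
        # Both streams have reached safe_time, so older events cannot match.
        safe_time = min(self.progress.values())
        for buffered in self.state.values():
            for key in [k for k in buffered if k[1] < safe_time]:
                del buffered[key]

    def buffered_events(self):
        return sum(len(events) for side in self.state.values() for events in side.values())


join = ExactTimeJoin()
output = []
output += join.process("left", 1, 100, "take_loan")  # click, nothing to join yet
output += join.process("right", 1, 100, 7051)        # transaction, matches the click
output += join.process("left", 9, 200, "income")
output += join.process("right", 9, 200, 1332)        # both sides passed 200
print(output, join.buffered_events())
```

After the last event both sides have reached timestamp 200, so the two events buffered for timestamp 100 are dropped and only the events at timestamp 200 remain in state.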
models/joined_data_output.sql
select *
from {{ ref('joined_data') }}
The joined_data model is materialized as a view, so we create another model that materializes it as a table and, in effect, emits it to the output Kafka topic.
models/daily_spending.sql
select window_start, window_end, user_id, sum(amount) as daily_spending
from table(
tumble(table {{ ref('joined_data') }}, descriptor(event_timestamp), interval '1' days)
)
where event = 'payment'
group by window_start, window_end, user_id
Flink can also do aggregations over time windows. In this example we calculate, for each user, the total amount of payments in 24-hour windows. Because it is a real-time stream, we cannot do a simple lookup and aggregation over the data. We can, however, use a trick similar to the one used for joins. Flink will hold data until a specific trigger, then aggregate it and emit an event. In this case it will start collecting all events from time T, and once it observes an event with a timestamp bigger than T+1 day [2], it will know that it has collected all the events for the 24-hour window, so it will do the aggregation and emit an event.
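The trigger logic can be pictured with another toy simulation in plain Python. It is a sketch under simplifying assumptions (events arrive in timestamp order, windows are aligned to midnight, and a window is treated as complete once an event at or past its end is seen), not Flink's windowing implementation; the names and amounts are made up.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(days=1)


def window_start(timestamp):
    """Start of the daily tumbling window that contains the timestamp."""
    return timestamp.replace(hour=0, minute=0, second=0, microsecond=0)


class DailySpending:
    """Toy tumbling-window sum per user, emitted when a window closes."""

    def __init__(self):
        # Running sums keyed by (window_start, user_id).
        self.sums = defaultdict(int)

    def process(self, timestamp, user_id, amount):
        """Add one payment and return the windows closed by its timestamp."""
        self.sums[(window_start(timestamp), user_id)] += amount
        # The event timestamp acts as the watermark: every window that ends
        # at or before it is complete and can be emitted.
        closed = sorted(key for key in self.sums if key[0] + WINDOW <= timestamp)
        emitted = []
        for start, user in closed:
            emitted.append((start, start + WINDOW, user, self.sums.pop((start, user))))
        return emitted


aggregation = DailySpending()
emitted = []
emitted += aggregation.process(datetime(2022, 11, 29, 9, 0), 7, 40)
emitted += aggregation.process(datetime(2022, 11, 29, 18, 30), 7, 32)
# The first event of the next day closes the window for 2022-11-29.
emitted += aggregation.process(datetime(2022, 11, 30, 0, 5), 1, 10)
print(emitted)
```

Nothing is emitted for user 1 yet, because no event has arrived that would close the window for 2022-11-30.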
To run our models, we need to execute
$ dbt run
...
Finished running 3 table models, 1 view model, 1 hook in 0 hours 0 minutes and 5.55 seconds (5.55s).
Completed successfully
Done. PASS=4 WARN=0 ERROR=0 SKIP=0 TOTAL=4
The model was executed and the dbt process has finished but where are our outputs? In a typical dbt model, running it will do all the transformations defined in it but in the case of a Flink real-time use case, we have created and deployed streaming pipelines.
We can now open Flink UI http://localhost:8081/ and we should see 3 jobs deployed.
We can also open Kafka console consumers to monitor output topics.
$ docker run --net=host confluentinc/cp-kafka:latest kafka-console-consumer --bootstrap-server localhost:9092 --topic high-loan --from-beginning
$ docker run --net=host confluentinc/cp-kafka:latest kafka-console-consumer --bootstrap-server localhost:9092 --topic joined-data --from-beginning
$ docker run --net=host confluentinc/cp-kafka:latest kafka-console-consumer --bootstrap-server localhost:9092 --topic daily-spending --from-beginning
The second reason why we don’t have any outputs is because we don’t have any inputs yet.
For this tutorial we have prepared two ways of creating input data. The first uses the dbt seed mechanism to load pre-generated data from csv files, and the second generates data and loads it directly into the Kafka topics using doge-datagen.
dbt supports seed functionality that allows for loading data stored in csv into tables. In our case this will use the Flink Kafka connector to load data into Kafka topics.
Similar to sources and models, we need to first define the connector properties.
Add to the dbt_project.yml file
seeds:
  example1:
    +default_connector_properties:
      connector: 'kafka'
      properties.bootstrap.servers: 'kafka:29092'
      scan.startup.mode: 'earliest-offset'
      value.format: 'json'
      value.json.encode.decimal-as-plain-number: 'true'
      value.json.timestamp-format.standard: 'ISO-8601'
      properties.group.id: 'dbt'
Create a seeds.yml file under the seeds directory.
version: 2
seeds:
  - name: clickstream
    config:
      connector_properties:
        topic: 'clickstream'
  - name: trx
    config:
      connector_properties:
        topic: 'trx'
Next download csv files
$ cd seeds
$ wget https://raw.githubusercontent.com/gliter/dbt-flink-adapter-example/main/seeds/clickstream.csv
$ wget https://raw.githubusercontent.com/gliter/dbt-flink-adapter-example/main/seeds/trx.csv
and execute
$ dbt seed
...
Finished running 2 seeds, 1 hook in 0 hours 0 minutes and 5.86 seconds (5.86s).
Completed successfully
Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2
You will also notice 2 completed jobs in the Flink UI; these were used to insert the data into the Kafka topics.
For the purpose of this example, the seed data was generated by doge-datagen and later extracted to csv files. We wrote about doge-datagen in the blog post Data online generation for event stream processing.
To use it, execute:
$ git clone https://github.com/gliter/dbt-flink-adapter-example.git
$ cd dbt-flink-adapter-example
$ pip3 install doge-datagen
$ python3 ./datagen/doge_example_dbt.py
Doge-datagen will generate data and emit it into the Kafka topics. If you would like to extract data from Kafka into csv for later use with dbt seed command, you can use https://raw.githubusercontent.com/gliter/dbt-flink-adapter-example/main/get-seed-from-kafka.sh script.
Once data is pushed into input Kafka topics, your already deployed pipelines should process them and emit events into output topics.
$ docker run --net=host confluentinc/cp-kafka:latest kafka-console-consumer --bootstrap-server localhost:9092 --topic high-loan --from-beginning
{"event_timestamp":"2022-11-28T22:42:42.762","user_id":1,"source":"credit","target":"deposit","amount":7051,"deposit_balance_after_trx":12399,"credit_balance_after_trx":-15675}
{"event_timestamp":"2022-12-01T08:39:42.762","user_id":6,"source":"credit","target":"deposit","amount":7617,"deposit_balance_after_trx":18545,"credit_balance_after_trx":-16185}
...
$ docker run --net=host confluentinc/cp-kafka:latest kafka-console-consumer --bootstrap-server localhost:9092 --topic joined-data --from-beginning
{"event_timestamp":"2022-11-28T22:42:42.762","user_id":1,"event":"take_loan","source":"credit","target":"deposit","amount":7051,"deposit_balance_after_trx":12399,"credit_balance_after_trx":-15675}
{"event_timestamp":"2022-11-28T23:35:42.762","user_id":9,"event":"income","source":"ext","target":"deposit","amount":1332,"deposit_balance_after_trx":3419,"credit_balance_after_trx":-7826}
...
$ docker run --net=host confluentinc/cp-kafka:latest kafka-console-consumer --bootstrap-server localhost:9092 --topic daily-spending --from-beginning
{"window_start":"2022-11-29T00:00:00","window_end":"2022-11-30T00:00:00","user_id":7,"daily_spending":72}
{"window_start":"2022-11-29T00:00:00","window_end":"2022-11-30T00:00:00","user_id":1,"daily_spending":112}
dbt also allows you to execute assertions in the form of tests, to validate that input data or model output does not contain abnormal values. In this example we will show how to write a singular dbt test, that is, a select statement stored in its own file. A dbt test is considered passed when its select does not find any rows.
Create no_negative_income.sql under the tests directory
select /** fetch_timeout_ms(5000) mode('streaming') */
*
from {{ ref('joined_data')}}
where
event = 'income'
and amount < 0
To understand the additional configuration in the comment, we first need to understand what is happening here. We are trying to execute a test against streaming data, which we cannot freely look up and for which we can never tell when it is complete. Moreover, SQL has no way of setting the execution mode to batch or streaming. The settings inside a comment that starts with a double star are processed by the dbt-flink-adapter and not by Flink itself.
This directive tells the adapter to run the query in streaming mode and to run it for 5 seconds. If nothing is found within 5 seconds, the test is considered passed. Because the underlying joined_data model has scan.startup.mode set to earliest-offset, it will read all the data available in Kafka. This setting is not needed to emit data to the output, but it is required so that the same connector definition can be reused to read data from the Kafka topic.
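To show how such directives could be picked out of a query, here is a small regular-expression sketch in Python. It is an assumption about one possible approach, not the adapter's actual parser; the function and pattern names are hypothetical, and only the name(value) form shown in the test above is handled.

```python
import re

# A comment that opens with a double star, e.g. /** mode('streaming') */
HINT_BLOCK = re.compile(r"/\*\*(.*?)\*/", re.DOTALL)
# A single directive of the form name(value) or name('value')
HINT = re.compile(r"(\w+)\(\s*'?([^')]*)'?\s*\)")


def parse_hints(sql):
    """Return the directives found in double-star comments as a dict of strings."""
    hints = {}
    for block in HINT_BLOCK.findall(sql):
        for name, value in HINT.findall(block):
            hints[name] = value.strip()
    return hints


test_sql = """
select /** fetch_timeout_ms(5000) mode('streaming') */
  *
from joined_data
where event = 'income' and amount < 0
"""
hints = parse_hints(test_sql)
print(hints)
```

An ordinary comment such as /* note */ is ignored, because it does not start with a double star.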
To run tests execute
$ dbt test
...
Flink adapter: Fetched results from Flink: []
Flink adapter: Returned results from adapter: [(0, False, False)]
1 of 1 PASS no_negative_income ................................................. [PASS in 5.55s]
Finished running 1 test, 1 hook in 0 hours 0 minutes and 6.34 seconds (6.34s).
Completed successfully
Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
Another interesting thing is that Flink returned no results, yet the adapter did return an entry. What happens is that the test query is wrapped in a count(*) statement, but for a streaming query, no entries in the stream means that no count is emitted, not even a count equal to 0. Because of this, our adapter has to synthesize a result to satisfy dbt.
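The synthesized result can be sketched as follows. The empty-result case mirrors the log output above; the function name is hypothetical, and the handling of a non-empty result (keeping the most recent row of an updating aggregate) is an assumption made for this illustration rather than a description of the adapter.

```python
def to_dbt_test_result(fetched_rows):
    """Turn rows fetched from Flink into the single row dbt expects.

    Each row has the shape (failures, should_warn, should_error).
    """
    if not fetched_rows:
        # An empty stream emits no count at all, so report zero failures.
        return [(0, False, False)]
    # Assumption: a streaming aggregate may be emitted several times as it
    # updates, so the most recent row carries the current count.
    return [fetched_rows[-1]]


print(to_dbt_test_result([]))
```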
Our interaction with the Flink cluster is done in sessions; any table or view created in one session will not be visible in another session. By default, a session is only valid for 10 minutes. Because of that, if you run dbt test more than 10 minutes after dbt run, it will fail, and in the Flink logs you will find that it cannot find the joined_data table. Currently, the only way to run the test in that situation is to rerun the entire model.
The session handle is stored in the ~/.dbt/flink-session.yml file; if you want to force a new session, you can simply delete that file.
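If you prefer to script this step, a minimal Python helper could look like the sketch below. The helper name is made up; the default path is the one mentioned above, and the demonstration runs against a temporary file so that it does not touch a real session.

```python
import tempfile
from pathlib import Path

DEFAULT_SESSION_FILE = Path.home() / ".dbt" / "flink-session.yml"


def force_new_session(session_file=DEFAULT_SESSION_FILE):
    """Delete the stored session file; return True if a file was removed."""
    if session_file.exists():
        session_file.unlink()
        return True
    return False


# Demonstration on a throwaway file instead of the real session file.
with tempfile.TemporaryDirectory() as tmp_dir:
    demo_file = Path(tmp_dir) / "flink-session.yml"
    demo_file.write_text("placeholder")  # the content does not matter here
    removed = force_new_session(demo_file)
    removed_again = force_new_session(demo_file)
print(removed, removed_again)
```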
We believe that the project in its current shape can already be useful for companies that have dbt and/or Flink in their stack. But because there are significant differences between how Flink (execution only) and databases (execution and storage) work, and between the real-time (unbounded) and batch (bounded) execution models, we are still working out solutions to the following challenges:
In the current iteration, the dbt-flink-adapter allows the creation of and deployment of streaming pipelines and batch jobs on the Flink cluster. What is not provided right now is a full lifecycle management of streaming pipelines. To get there, first we need to start using persisted Catalogs so any table or view that we materialize in Flink can be referenced at a later time, long past the session lifetime.
The next problem is that streaming jobs far outlive dbt model execution. We are going to need a way to determine which Flink job was deployed from which model, so we can replace it during an upgrade. This is something foreign to dbt, as the ETL process run by it never needs to be upgraded mid-execution. For that we are going to provide some kind of meta storage, either inside or outside the Flink cluster.
The last important thing for lifecycle management will be utilities and cleanup tools, so we can support the removal of pipelines from production and test environments.
Before dbt, ETL transformation processes were often hidden somewhere in databases and deployed manually, with documentation and code scattered across different company wikis. dbt brought software engineering best practices into ETL: data quality monitoring, code inspection, and reviewing and documenting at the project level. With that, analytics engineering teams can be engaged in those transformations through their entire lifecycle, from ideation to production. Today the story with Flink is somewhat similar. Software engineers can already leverage its full capability and build streaming pipelines and batch workflows. With Flink SQL it can also be used by SQL-fluent analytics engineers and analysts. But currently there is no easy way for them to maintain high-quality code, or to deploy and manage such pipelines in production.
By combining dbt and Flink, analytics engineers can start to create, manage, deploy and maintain real-time analytical pipelines in a familiar environment. Additionally, thanks to a wide portfolio of Flink connectors, they can build all phases of ETL in one SQL-driven tool: extracting from sources, transforming data and loading or delivering it to other systems. Besides that, thanks to its support for batch workflows and SQL, it can replace existing ETL processes with very minor changes.
In the current iteration, the adapter supports the official SQL Gateway embedded in open-source Flink. But with minor changes we plan to extend it with the ability to run pipelines on managed Flink runtimes.
Our vision is that with this integration we dispel the myth that real-time analytics is not worth the cost. We believe that the future of data is real-time or at least data will be processed in streaming. We just need to get tooling right and easy to use and that is what we want to help achieve with this project.
[1] - Flink is a stateful processing engine and does store data - a state - needed for its processing but it is not intended for other applications to read or manipulate that state.
[2] - This is only in a simple case using the watermark strategy that does not allow out-of-order events. See Timely Stream Processing in the Flink documentation.
We would like to thank the following GetInData Streaming Labs team members for their contributions to dbt-flink-adapter: Marcin Kacperek, Kosma Grochowski.