These days, Big Data and Business Intelligence platforms are one of the fastest-growing areas of computer science. Companies want to extract knowledge from their data and analyze it in real time to make data-driven business decisions. At GetInData we tackle these challenges to tame our clients' data and provide best-in-class solutions that extract knowledge from data in real time.
Complex event processing is an innovative approach that opens new opportunities for companies which want to monitor, analyze and respond to events occurring throughout the organization. Today I would like to share our experience with CEP by presenting a new Flink connector that GetInData has developed. In the example, we will create a simple Flink job that captures changes from a SQL database and runs pattern recognition on data streams from many sources using FlinkSQL.
Before we start, I will define some concepts, which will be used in the article.
Flink is an open-source framework well suited to complex event processing. It supports low-latency stream processing on a large scale. Furthermore, FlinkSQL is a language provided by Flink which allows you to write complex data pipelines without a single line of Java or Scala code. If you know SQL, you will be able to learn FlinkSQL and build your pipelines quickly. Flink provides "connectors" and "sinks", which allow us to treat data from external systems like Kafka, Elasticsearch or PostgreSQL as tables that FlinkSQL can process.
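Before diving into the use case, here is a minimal, illustrative FlinkSQL pipeline. The table name, columns and the built-in datagen connector are placeholders for this sketch only; they are not part of the example developed later in the article.

CREATE TABLE orders (
    order_id   INT,
    amount     DOUBLE,
    order_time AS PROCTIME()          -- processing-time attribute, no external timestamp needed
) WITH (
    -- 'datagen' is a built-in connector that produces random rows, handy for local experiments
    'connector' = 'datagen'
);

-- A continuous query: count orders above 100 per one-minute processing-time window
SELECT TUMBLE_START(order_time, INTERVAL '1' MINUTE) AS window_start,
       COUNT(*)                                      AS big_orders
FROM orders
WHERE amount > 100
GROUP BY TUMBLE(order_time, INTERVAL '1' MINUTE);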
The term "complex event processing" defines methods of analyzing pattern relationships between streamed events. When done in real-time, it can provide an advanced insight further into the data processing system.
CDC (Change Data Capture) is a method of recognizing when data in a source system has changed and capturing these changes for further processing. For example, you can use CDC to capture data changes in your SQL database and produce a stream of events describing those changes. The stream consists of a sequence of events that describe the insert, update and delete operations performed on database rows. Moreover, the CDC stream can be processed by Flink, which allows us to run complex analytics jobs like complex event processing or pattern recognition.
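As a rough illustration, a CDC source could produce the following changelog for a single database row. The notation uses Flink's row kinds (+I insert, -U/+U update retraction and new value, -D delete); the column values are made up for this sketch.

-- +I (id=1, amount=100)   the row is inserted
-- -U (id=1, amount=100)   the row is updated: the old value is retracted...
-- +U (id=1, amount=150)   ...and the new value is emitted
-- -D (id=1, amount=150)   the row is deleted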
I believe that the easiest way to understand an approach is to show it in action. Therefore, I created a simple use case inspired by real business needs from a product owner of a mobile banking application.
Let's start with a user story:
As a business analyst, I want to check the marketing campaign's effectiveness in promoting quick loans using the mobile application.
I want to have a view of users who:
- performed a transaction for an amount greater than 1000,
- opened the mobile application within 3 days of that transaction,
- opened the mobile application again within the following 7 days,
- did not take a loan in the meantime.
Data are stored in many sources:
- transactions are published on a Kafka topic,
- mobile application (clickstream) events are published on another Kafka topic,
- information about loans taken is stored in a PostgreSQL database.
The view has to be updated in near real-time.
We will use Flink and pattern recognition from FlinkSQL to build a solution that meets these business requirements. Flink provides a Kafka connector, which treats a topic as a table in FlinkSQL. It allows us to process information about transactions and mobile application events; however, capturing changes from the database is a more challenging problem. We need to transform data changes in the SQL database into a stream of events. There are several tools on the market which can help us with the CDC problem, so let's have a look at them.
Debezium is one of the most popular open-source CDC tools, maintained by Red Hat. Debezium captures data changes from database transaction logs and publishes appropriate events to Kafka.
Pros:
Cons:
Ververica provides flink-cdc-connectors, which can easily be used with Flink to capture data changes. In addition, the connector integrates Debezium as its CDC engine, so it doesn't require the extra effort of setting up a full Debezium stack.
Pros:
Cons:
The Flink JDBC connector allows us to write and read data from SQL databases directly in FlinkSQL. It is one of the official connectors maintained by Apache Flink.
Pros:
Cons:
The GetInData CDC by JDBC connector was developed by GetInData for CDC purposes. The connector allows us to read data from SQL databases by periodically polling the tables.
Pros:
Cons:
We used the Table API provided by Flink to develop our CDC connector. Flink provides interfaces which must be implemented with custom, user-specific logic to expose an external data source as a table. The table can then be processed using FlinkSQL. Flink won't modify any external data while executing a query. Instead, the Flink execution engine uses the table definition saved in a CatalogTable to read the data from the source during query execution.
For more details on how to write custom connectors, please check the Flink documentation.
In this example, I want to show you how to use the GetInData CDC by JDBC connector together with pattern recognition in FlinkSQL to meet our user story's business requirements.
Before we start building Flink jobs, I want to define the data model used in the example.
On Kafka, we are going to have the following topics:
Topic | Example payload | Description |
---|---|---|
trx | {"cif":"3", "amount": 200, "ts": "2021-05-03 00:00:00"} | Contains information about transactions carried out by the user. |
clickstream | {"cif":"3", "type":"click", "ts": "2021-05-03 00:00:05"} | Contains information about user behaviour in the mobile application. |
In PostgreSQL, we are going to have tables:
Table | Schema | Description |
---|---|---|
v_loan | CREATE TABLE v_loan ( id serial constraint v_loan_pk primary key, customer_id varchar(10), account_id varchar(30), decision_dttm timestamp ); | Contains information about loans taken by the user |
To set up our environment for this example, we will use docker-compose. The script below will start a Flink cluster, Kafka and Postgres in Docker containers.
version: "3"
services:
jobmanager:
image: flink:1.13.2-scala_2.12-java11
hostname: jobmanager
expose:
- "6123"
ports:
- "8081:8081"
command: jobmanager
environment:
JOB_MANAGER_RPC_ADDRESS: "jobmanager"
networks:
- flink_jdbc_connector
taskmanager:
image: flink:1.13.2-scala_2.12-java11
expose:
- "6121"
- "6122"
depends_on:
- jobmanager
command: taskmanager
links:
- jobmanager:jobmanager
environment:
JOB_MANAGER_RPC_ADDRESS: "jobmanager"
networks:
- flink_jdbc_connector
postgres:
image: postgres
environment:
POSTGRES_PASSWORD: example
volumes:
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
ports:
- "5432:5432"
networks:
- flink_jdbc_connector
kafka:
image: wurstmeister/kafka:2.12-2.4.0
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
networks:
- flink_jdbc_connector
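  # The kafka service above points at a 'zookeeper' host (KAFKA_ZOOKEEPER_CONNECT) but no such
  # service was defined; a minimal ZooKeeper service is sketched here (image and port are assumptions).
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    networks:
      - flink_jdbc_connector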
networks:
flink_jdbc_connector:
driver: bridge
It might be challenging to set up a complete environment with an example mobile application, so I prepared a small Python script which we can use to simulate user behaviour. The script simulates two scenarios: the happy path, which meets the user story, and the unhappy path, which should not match the pattern recognition query.
import json
import time
from kafka import KafkaProducer
from datetime import datetime, timezone
from sqlalchemy import create_engine
from sqlalchemy.sql import text
kafka_server = "local-dev_kafka_1:9092"
producer = KafkaProducer(bootstrap_servers=kafka_server)
def send_to_kafka(topic: str, record: bytes):
producer.send(topic, record)
producer.flush()
def generate_clickstream_record(cif: str, eventType: str, ts: datetime):
payload = {'cif': cif, 'type': eventType, 'ts': ts.replace(tzinfo=timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")}
send_to_kafka('clickstream', json.dumps(payload).encode('utf-8'))
def generate_trx_record(cif: str, amountPLN: float, ts: datetime):
payload = {'cif': cif, 'amount': amountPLN, 'ts': ts.replace(tzinfo=timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")}
send_to_kafka('trx', json.dumps(payload).encode('utf-8'))
engine = create_engine('postgresql+pg8000://postgres:example@postgres:5432/postgres')
con = engine.connect()
def generate_loan(cif: str, day: datetime):
stmt_loan = text("""
INSERT INTO V_LOAN
VALUES (default, :cif, :account, :day)
""")
data = {'cif': cif, 'account': cif, 'day': day}
con.execute(stmt_loan, **data)
def generate_scenario_happy_path(cif: str):
generate_trx_record(cif, 2000, datetime(2021, 4, 1, 0, 0, 0))
generate_clickstream_record(cif, 'click', datetime(2021, 4, 2, 12, 0, 0))
generate_clickstream_record(cif, 'click', datetime(2021, 4, 4, 12, 0, 0))
generate_scenario_happy_path("happy")
def generate_scenario_not_happy_path(cif: str):
generate_trx_record(cif, 400, datetime(2021, 4, 1, 0, 0, 0))
generate_clickstream_record(cif, 'click', datetime(2021, 5, 2, 12, 0, 0))
generate_scenario_not_happy_path("nothapp")
The first thing we need to do before building a pattern recognition pipeline is to define the data sources in FlinkSQL. To connect to data sources we use connectors, which allow us to treat data stored in PostgreSQL and Kafka as tables.
CREATE TABLE v_loan
(
id INT,
customer_id VARCHAR,
account_id VARCHAR,
decision_dttm AS PROCTIME(),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc-cdc',
'url' = 'jdbc:postgresql://local-dev_postgres_1:5432/postgres',
'table-name' = 'v_loan',
'username' = 'postgres',
'password' = 'example',
'cdc.strategy' = 'SIMPLE',
'cdc.simple-strategy.ordering-columns' = 'id'
);
CREATE TABLE trx (
cif STRING,
amount DOUBLE,
ts AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'trx',
'properties.bootstrap.servers' = 'local-dev_kafka_1:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
CREATE TABLE clickstream (
cif STRING,
type STRING,
ts AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'clickstream',
'properties.bootstrap.servers' = 'local-dev_kafka_1:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
CREATE VIEW events AS
SELECT UUID() AS event_id,
customer_id AS customer_id,
'loan_event' AS type,
CAST(account_id AS STRING) AS payload,
decision_dttm AS ts
FROM v_loan
UNION ALL
SELECT UUID() AS event_id,
cif AS customer_id,
'trx_event' AS type,
CAST(amount AS STRING) AS payload,
ts AS ts
FROM trx
UNION ALL
SELECT UUID() AS event_id,
cif AS customer_id,
'clickstream_event' AS type,
type AS payload,
ts AS ts
FROM clickstream;
This example shows how to define pattern recognition by using FlinkSQL.
Firstly, we need to define our events.
Secondly, we need to specify the order of events and the expected output. We look for the pattern from the user story. To define it, we use the pattern expression described in the Flink documentation; its syntax is quite similar to regular expression syntax.
SELECT *
FROM events
MATCH_RECOGNIZE(
PARTITION BY customer_id
ORDER BY ts
MEASURES
TRX.event_id AS trx_event_id,
TRX.customer_id AS trx_customer_id,
TRX.type AS trx_type,
TRX.payload AS trx_payload,
TRX.ts AS trx_ts,
APP_1.event_id AS app_1_event_id,
APP_1.customer_id AS app_1_customer_id,
APP_1.type AS app_1_type,
APP_1.payload AS app_1_payload,
APP_1.ts AS app_1_ts
ONE ROW PER MATCH
PATTERN (TRX APP_1 NOT_LOAN*? APP_2) WITHIN INTERVAL '10' DAY
DEFINE
TRX AS TRX.type = 'trx_event' AND TRX.payload > 1000,
APP_1 AS APP_1.type = 'clickstream_event' AND APP_1.ts < TRX.ts + INTERVAL '3' DAY,
APP_2 AS APP_2.type = 'clickstream_event' AND APP_2.ts > APP_1.ts AND APP_2.ts < APP_1.ts + INTERVAL '7' DAY,
NOT_LOAN AS NOT_LOAN.type <> 'loan_event'
) MR;
As we can see, the pattern recognition query matches only the happy-path scenario, so it works as expected.
Flink is a powerful framework for building real-time data processing platforms that can be fed from many sources. Using the GetInData CDC by JDBC connector, we can start extracting knowledge from legacy applications and implementing a "data-driven culture" in an organization.
Data is one of your company's most valuable assets, and when skillfully used, it allows you to take your business to a new level.
As future work, we plan to publish the connector code as an open-source project, so stay tuned!
If you would like to know more about Complex Event Processing, check our CEP Platform.