Tutorial
17 min read

Change Data Capture by JDBC with FlinkSQL

These days, Big Data and Business Intelligence platforms are one of the fastest-growing areas of computer science. Companies want to extract knowledge from their data and analyze it in real-time to make data-driven business decisions. At GetInData we struggle with these challenges to tame our client's data and provide best-in-class solutions to extract knowledge from data in real-time.

Complex event processing is an innovative approach which opens new opportunities for companies that want to monitor and analyze, and respond to events occurring throughout the organization. Today I would like to share our experience with CEP by presenting a new Flink connector, which GetInData has developed. In the example, we will create a simple Flink job, which can capture changes from a SQL database and run pattern recognition on data streams from many sources using FlinkSQL.

Before we start, I will define some concepts, which will be used in the article.

Overview

Flink and FlinkSQL

Flink is an open-source framework to combat the subject of complex event processing. It supports low-latency stream processing on a large scale. Furthermore, FlinkSQL is a language provided by Flink, which allows you to write complex data pipelines without using a single line of Java or Scala code. If you know SQL, you will be able to learn FlinkSQL and build your pipelines quickly. Flink provides "connectors" and "sinks", which allow us to treat data from external systems like Kafka, ElasticSearch or PostgreSQL, as a table, which FlinkSQL can process.

Complex Event Processing (CEP)

The term "complex event processing" defines methods of analyzing pattern relationships between streamed events. When done in real-time, it can provide an advanced insight further into the data processing system.

Change Data Capture (CDC)

CDC is a method of recognizing when data in a source system has changed and capturing these changes for further processing. For example, you can use CDC to capture data changes in your SQL database and produce a stream of events, which describe data changes. The stream consists of a sequence of events, which describe insert, update, delete operations performed on database rows. Moreover, the CDC stream can be processed by Flink, which allows us to run complex analytics jobs like complex event processing or pattern recognition.

Use Case

I believe that the easiest way to understand an approach is to show it in action. Therefore, I created a simple use case inspired by real business needs from a product owner of a mobile banking application.

Let's start with a user story: 

As a business analyst, I want to check the marketing campaign's effectiveness in promoting quick loans using the mobile application. 

I want to have a view of users who:

  • carried out a transaction for amount > 1000
  • used the mobile application within 3 days of the transaction
  • didn't take a loan
  • used the mobile application again within 7 days from the previous usage

Data are stored in many sources:

  • Kafka has topics with transactions and mobile application events
  • PostgreSQL contains information about user loans

The view has to be updated in near real-time.

We will use Flink and pattern recognition from FlinkSQL to build a solution that can meet the business requirements. Flink provides a connector to Kafka, treating a topic as a table in FlinkSQL. It allows us to process information about transactions and mobile application events, however capturing changes from DB is a more challenging problem. We need to transform data changes from the SQL databases as a stream of events. We have several tools on the market, which can help us with the CDC problem, so let's have a look at them.

Debezium

This is one of the most popular open-source CDC tools, maintained by Red Hat. Debezium captures data changes from DB transaction logs and publish appropriate events on Kafka.

Pros:

  • does not have an impact on the DB performance
  • capture data in a real-time
  • can capture deletes
  • never misses an event (it captures every change in the table)

Cons:

  • difficult setup - it requires using Kafka
  • needs access to a database binlog
  • supports only a few databases
  • some old versions of databases do not support CDC

Ververica Flink CDC Connectors

Ververica provides flink-cdc-connectors, which can easily be used with Flink to capture data changes. In addition, the connector has integrated Debezium as a CDC engine, so it doesn't require extra effort to set up a full Debezium stack.

Pros:

  • features provided by Debezium, but without setting up a "full environment."

Cons:

  • supports only MySQL (5.7, 8.0.x), PostgreSQL (9.6, 10, 11, 12) and MongoDB (4.0, 4.2, 5.0)
  • needs access to a database binlog
  • some old versions of databases do not support CDC

Flink Connector JDBC

Connector, which allows us to write and read data from SQL databases directly in the FlinkSQL. It is one of the official connectors maintained by Apache Flink.

Pros:

  • allows us to write results into SQL databases
  • built-in to Flink, no need to add anything

Cons:

  • reads data from a table only once - the connector does not provide CDC
  • supports only a few DBs (MySQL, PostgreSQL, Derby)

GetInData CDC by JDBC Connector 

Connector developed by GetInData for CDC purpose. The connector allows us to read data from SQL databases by periodically reading data from tables.

Pros:

  • does not require additional components, as Debezium does.
  • can easily be reused with any database. Just provide SQL queries for the CDC and JDBC Driver.
  • uses pure SQL for CDC, so it doesn't require the special configuration of a database.

Cons:

  • does not support delete operations.
  • some CDC strategies require additional columns eg. last_modify_date.
  • may have an impact on a database because it reads data by a periodically run SQL query.
  • refreshes data in near real-time. The CDC logic is run at specific intervals.
  • may omit an CDC event in frequently changing rows (insertion and deletion of a row, before the connector refreshes data). 

Custom connector overview

We used the Table API provided by Flink to develop our CDC connector. Flink provides interfaces, which must be implemented by a custom user-specific logic to treat external data sources like a table. Next, the table can be processed by using FlinkSQL. Flink won't modify any external data while executing a query. Instead, the Flink execution engine uses a table definition saved in a CatalogTable, to read all of the data from the source during query execution. 

For more details on how to write custom connectors, please check the Flink documentation.

Example

In this example, I want to show you how to used GetInData CDC by JDBC Connector with pattern recognition in FlinkSQL, which meets our user story's business requirements.

Before we start building Flink jobs, I want to define the data model used in the example. 

On Kafka, we are going to include the following topics:

TopixExample payloadDescription
trx{"cif":"3", "amount": 200, "ts": "2021-05-03 00:00:00"}Contains information about transactions carried out by the user.
clikstream{"cif":"3", "type":"click", "ts": "2021-05-03 00:00:05"}Contains information about user behaviour in the mobile application

In PostgreSQL, we are going to have tables:

TableSchemaDescription
v_loanCREATE TABLE v_loan
( id serial constraint v_loan_pk primary key,
customer_id varchar(10),
account_id varchar(30),
decision_dttm timestamp
);
Contains information about loans taken by the user

In this example, I want to set up our environment by using docker-compose. The script will set up a Flink cluster, Kafka and Postgres in Docker containers.

version: "3"

services:

  jobmanager:
    image: flink:1.13.2-scala_2.12-java11
    hostname: jobmanager
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      JOB_MANAGER_RPC_ADDRESS: "jobmanager"
    networks:
      - flink_jdbc_connector

  taskmanager:
    image: flink:1.13.2-scala_2.12-java11
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - jobmanager:jobmanager
    environment:
      JOB_MANAGER_RPC_ADDRESS: "jobmanager"
    networks:
      - flink_jdbc_connector

  postgres:
    image: postgres
    environment:
      POSTGRES_PASSWORD: example
    volumes:
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    ports:
      - "5432:5432"
    networks:
      - flink_jdbc_connector

  kafka:
    image: wurstmeister/kafka:2.12-2.4.0
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - flink_jdbc_connector

networks:
  flink_jdbc_connector:
    driver: bridge

It might be challenging to set up a complete environment with an example mobile application, so I prepared a small python script, which we can use to simulate user behaviour. The script will simulate two scenarios - the happy path, which meets the user story and the unhappy path, which should not match the pattern recognition query.

import json
import time
from kafka import KafkaProducer
from datetime import datetime, timezone
from sqlalchemy import create_engine
from sqlalchemy.sql import text

kafka_server = "local-dev_kafka_1:9092"
producer = KafkaProducer(bootstrap_servers=kafka_server)


def send_to_kafka(topic: str, record: bytes):
    producer.send(topic, record)
    producer.flush()
    
def generate_clickstream_record(cif: str, eventType: str, ts: datetime):
    payload = {'cif': cif, 'type': eventType, 'ts': ts.replace(tzinfo=timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")}
    send_to_kafka('clickstream', json.dumps(payload).encode('utf-8'))
  
def generate_trx_record(cif: str, amountPLN: float, ts: datetime):
    payload = {'cif': cif, 'amount': amountPLN, 'ts': ts.replace(tzinfo=timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")}
    send_to_kafka('trx', json.dumps(payload).encode('utf-8'))
    
engine = create_engine('postgresql+pg8000://postgres:example@postgres:5432/postgres')
con = engine.connect()

def generate_loan(cif: str, day: datetime):
    stmt_loan = text("""
        INSERT INTO V_LOAN 
        VALUES (default, :cif, :account, :day)
    """)
    data = {'cif': cif, 'account': cif, 'day': day}
    con.execute(stmt_loan, **data)
    
def generate_scenario_happy_path(cif: str):
    generate_trx_record(cif, 2000, datetime(2021, 4, 1, 0, 0, 0))
    generate_clickstream_record(cif, 'click', datetime(2021, 4, 2, 12, 0, 0))
    generate_clickstream_record(cif, 'click', datetime(2021, 4, 4, 12, 0, 0))

generate_scenario_happy_path("happy")

def generate_scenario_not_happy_path(cif: str):
    generate_trx_record(cif, 400, datetime(2021, 4, 1, 0, 0, 0))
    generate_clickstream_record(cif, 'click', datetime(2021, 5, 2, 12, 0, 0))

generate_scenario_not_happy_path("nothapp")


The first thing we need to do before building a pattern recognition pipeline is define data sources in FlinkSQL. To connect to data sources, we use connectors. Connectors allow us to treat data stored in PostgreSQL and Kafka as tables.

CREATE TABLE v_loan
(
    id INT,
    customer_id   VARCHAR,
    account_id    VARCHAR,
    decision_dttm AS PROCTIME(),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
      'connector' = 'jdbc-cdc',
      'url' = 'jdbc:postgresql://local-dev_postgres_1:5432/postgres',
      'table-name' = 'v_loan',
      'username' = 'postgres',
      'password' = 'example',
      'cdc.strategy' = 'SIMPLE',
      'cdc.simple-strategy.ordering-columns' = 'id'
      );

CREATE TABLE trx (
     cif STRING,
     amount DOUBLE,
     ts  AS PROCTIME()
) WITH (
      'connector' = 'kafka',
      'topic' = 'trx',
      'properties.bootstrap.servers' = 'local-dev_kafka_1:9092',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json'
      );

CREATE TABLE clickstream (
     cif STRING,
     type STRING,
     ts  AS PROCTIME()
) WITH (
      'connector' = 'kafka',
      'topic' = 'clickstream',
      'properties.bootstrap.servers' = 'local-dev_kafka_1:9092',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json'
      );

CREATE VIEW events AS
SELECT UUID()                               AS event_id,
       customer_id                          AS customer_id,
       'loan_event'                         AS type,
       CAST(account_id AS STRING)           AS payload,
       decision_dttm                        AS ts
FROM v_loan
UNION ALL
SELECT UUID()                   AS event_id,
       cif                      AS customer_id,
       'trx_event'              AS type,
       CAST(amount AS STRING)   AS payload,
       ts                       AS ts
FROM trx
UNION ALL
SELECT UUID()              AS event_id,
       cif                 AS customer_id,
       'clickstream_event' AS type,
       type                AS payload,
       ts                  AS ts
FROM clickstream;

This example shows how to define pattern recognition by using FlinkSQL. 

Firstly, we need to define our events.

  • TRX - an event, which describes a transaction with an amount greater than 1000.
  • APP_1 - an event representing the user interaction with the application within 3 days after the transaction.
  • NO_LOAN - an event, which informs usthat the user didn't take a loan.

Secondly, we need to specify the order of events and expected output. We look for a pattern from the user story. To define the pattern, we use the pattern expression from the Flink documentation. The pattern expression syntax is quite similar to a regular expression syntax.

SELECT *
FROM events
        MATCH_RECOGNIZE(
            PARTITION BY customer_id
            ORDER BY ts
            MEASURES
                TRX.event_id    AS trx_event_id,
                TRX.customer_id AS trx_customer_id,
                TRX.type        AS trx_type,
                TRX.payload     AS trx_payload,
                TRX.ts          AS trx_ts,
                APP_1.event_id    AS app_1_event_id,
                APP_1.customer_id AS app_1_customer_id,
                APP_1.type        AS app_1_type,
                APP_1.payload     AS app_1_payload,
                APP_1.ts          AS app_1_ts
            ONE ROW PER MATCH
            PATTERN (TRX APP_1 NOT_LOAN*? APP_2) WITHIN INTERVAL '10' DAY
            DEFINE
                TRX                 AS TRX.type = 'trx_event' AND TRX.payload > 1000,
                APP_1               AS APP_1.type = 'clickstream_event' AND APP_1.ts < TRX.ts + INTERVAL '3' DAY,
                APP_2               AS APP_2.type = 'clickstream_event' AND APP_2.ts > APP_1.ts AND APP_2.ts < APP_1.ts + INTERVAL '7' DAY,
                NO_LOAN             AS NOT_LOAN.type <> 'loan_event'
        ) MR;

As we can see, the pattern recognition query works as expected. 

Conclusions - Complex Event Processing with Flink

Flink is a powerful platform for building real-time data processing platforms, which can be fed from many sources. Using GetInData CDC by JDBC connector, we can start extracting knowledge from legacy applications and implementing "data-driven culture" in an organization. 

Data is one of your company's most valuable assets, and when skillfully used, it allows you to take your business to a new level.

We plan to publish the connector code as an open source project for future work, so stay tuned!

If you would like to know more about Complex Event Processing, check our CEP Platform.

streaming
big data
apache flink
CEP
stream processing
21 October 2021

Want more? Check our articles

8e8a6167
Big Data Event

A Review of the Presentations at the DataMass Gdańsk Summit 2022

The 4th edition of DataMass, and the first one we have had the pleasure of co-organizing, is behind us. We would like to thank all the speakers for…

Read more
data modelling looker pdt vs dbt getindata 2
Tutorial

Data Modelling in Looker: PDT vs DBT

A data-driven approach helps companies to make decisions based on facts rather than perceptions. One of the main elements that  supports this approach…

Read more
getindata cover nifi ingestion nologo
Tutorial

Apache NiFi - why do data engineers love it and hate it at the same time? Blog Series Introduction

Learning new technologies is like falling in love. At the beginning, you enjoy it totally and it is like wearing pink glasses that prevent you from…

Read more
e commerce chatbot llmobszar roboczy 1 4
Tutorial

How to build an e-commerce shopping assistant (chatbot) with LLMs

In the dynamic world of e-commerce, providing exceptional customer service is no longer an option – it's a necessity. The rise of online shopping has…

Read more
data enrichtment flink sql using http connector flink getindata big data blog notext
Tutorial

Data Enrichment in Flink SQL using HTTP Connector For Flink - Part Two

In part one of this blog post series, we have presented a business use case which inspired us to create an HTTP connector for Flink SQL. The use case…

Read more
getindator beautiful magi lake with data visualization under th 04d517e5 6cb7 49b2 af1a 77884a44a1eb
Tutorial

Data lakehouse with Snowflake Iceberg tables - introduction

Snowflake has officially entered the world of Data Lakehouses! What is a data lakehouse, where would such solutions be a perfect fit and how could…

Read more

Contact us

Interested in our solutions?
Contact us!

Together, we will select the best Big Data solutions for your organization and build a project that will have a real impact on your organization.


What did you find most impressive about GetInData?

They did a very good job in finding people that fitted in Acast both technically as well as culturally.
Type the form or send a e-mail: hello@getindata.com
The administrator of your personal data is GetInData Poland Sp. z o.o. with its registered seat in Warsaw (02-508), 39/20 Pulawska St. Your data is processed for the purpose of provision of electronic services in accordance with the Terms & Conditions. For more information on personal data processing and your rights please see Privacy Policy.

By submitting this form, you agree to our Terms & Conditions and Privacy Policy