
Change Data Capture by JDBC with FlinkSQL

These days, Big Data and Business Intelligence platforms are among the fastest-growing areas of computer science. Companies want to extract knowledge from their data and analyze it in real time to make data-driven business decisions. At GetInData, we tackle these challenges to tame our clients' data and provide best-in-class solutions for extracting knowledge from data in real time.

Complex event processing is an approach that opens new opportunities for companies that want to monitor, analyze and respond to events occurring throughout the organization. Today I would like to share our experience with CEP by presenting a new Flink connector developed by GetInData. In the example, we will create a simple Flink job that captures changes from a SQL database and runs pattern recognition on data streams from many sources using FlinkSQL.

Before we start, I will define some concepts, which will be used in the article.


Flink and FlinkSQL

Flink is an open-source framework for stream processing that is well suited to complex event processing. It supports low-latency stream processing at large scale. FlinkSQL is a language provided by Flink that allows you to write complex data pipelines without a single line of Java or Scala code. If you know SQL, you will be able to learn FlinkSQL and build your pipelines quickly. Flink provides connectors (sources and sinks), which allow us to treat data from external systems such as Kafka, Elasticsearch or PostgreSQL as tables that FlinkSQL can process.

Complex Event Processing (CEP)

The term "complex event processing" covers methods of analyzing patterns and relationships between streamed events. When done in real time, it can provide advanced insight into the data flowing through the system.

Change Data Capture (CDC)

CDC is a method of recognizing when data in a source system has changed and capturing these changes for further processing. For example, you can use CDC to capture data changes in your SQL database and produce a stream of events, which describe data changes. The stream consists of a sequence of events, which describe insert, update, delete operations performed on database rows. Moreover, the CDC stream can be processed by Flink, which allows us to run complex analytics jobs like complex event processing or pattern recognition.
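To make the idea of a change stream more concrete, here is a minimal sketch in Python. It assumes we can compare two snapshots of a table keyed by primary key and emit one event per changed row. The function name `diff_snapshots` and the event tuples are hypothetical and only illustrate the shape of a CDC stream; log-based tools derive the same kind of events from the transaction log rather than from snapshots.

```python
from typing import Dict, Iterator, Tuple

Row = Dict[str, object]
Snapshot = Dict[int, Row]  # primary key -> row contents


def diff_snapshots(old: Snapshot, new: Snapshot) -> Iterator[Tuple[str, int, Row]]:
    """Yield (operation, key, row) events describing how `old` became `new`."""
    for key, row in new.items():
        if key not in old:
            yield ("insert", key, row)
        elif old[key] != row:
            yield ("update", key, row)
    for key, row in old.items():
        if key not in new:
            yield ("delete", key, row)


# Two hypothetical snapshots of the same table, taken at different times.
before = {1: {"customer_id": "3"}, 2: {"customer_id": "7"}}
after = {1: {"customer_id": "4"}, 3: {"customer_id": "9"}}

events = list(diff_snapshots(before, after))
for event in events:
    print(event)
```

Each printed tuple corresponds to one event in the stream: row 1 was updated, row 3 was inserted and row 2 was deleted.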

Use Case

I believe that the easiest way to understand an approach is to show it in action. Therefore, I created a simple use case inspired by real business needs from a product owner of a mobile banking application.

Let's start with a user story: 

As a business analyst, I want to check the marketing campaign's effectiveness in promoting quick loans using the mobile application. 

I want to have a view of users who:

  • carried out a transaction for amount > 1000
  • used the mobile application within 3 days of the transaction
  • didn't take a loan
  • used the mobile application again within 7 days from the previous usage

Data are stored in many sources:

  • Kafka has topics with transactions and mobile application events
  • PostgreSQL contains information about user loans

The view has to be updated in near real-time.
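Before reaching for Flink, it can help to spell the rule out as plain code. The sketch below is only a batch-style reference check for a single customer, not the streaming solution. The `Event` type and its labels ("trx", "click", "loan") are hypothetical, and it assumes that "didn't take a loan" means no loan between the transaction and the second application usage.

```python
from datetime import datetime, timedelta
from typing import List, NamedTuple


class Event(NamedTuple):
    type: str            # "trx", "click" or "loan" (hypothetical labels)
    ts: datetime
    amount: float = 0.0  # only meaningful for "trx" events


def matches_user_story(events: List[Event]) -> bool:
    """Return True if one customer's events satisfy the four conditions."""
    events = sorted(events, key=lambda e: e.ts)
    for i, trx in enumerate(events):
        if trx.type != "trx" or trx.amount <= 1000:
            continue
        clicks = []
        for e in events[i + 1:]:
            if e.type == "loan":
                break  # a loan was taken, so this transaction cannot match
            if e.type == "click":
                clicks.append(e)
                if len(clicks) == 2:
                    break
        if (len(clicks) == 2
                and clicks[0].ts <= trx.ts + timedelta(days=3)
                and clicks[1].ts <= clicks[0].ts + timedelta(days=7)):
            return True
    return False


matching = [
    Event("trx", datetime(2021, 4, 1), 2000),
    Event("click", datetime(2021, 4, 2, 12)),
    Event("click", datetime(2021, 4, 4, 12)),
]
not_matching = [
    Event("trx", datetime(2021, 4, 1), 400),
    Event("click", datetime(2021, 5, 2, 12)),
]
print(matches_user_story(matching), matches_user_story(not_matching))
```

A check like this needs all events for a customer in one place and in time order, which is exactly what becomes hard when the data lives in Kafka and PostgreSQL and has to be evaluated continuously.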

We will use Flink and pattern recognition from FlinkSQL to build a solution that meets the business requirements. Flink provides a Kafka connector, which treats a topic as a table in FlinkSQL. It allows us to process information about transactions and mobile application events. Capturing changes from the database, however, is a more challenging problem: we need to transform data changes from the SQL database into a stream of events. Several tools on the market can help with the CDC problem, so let's have a look at them.


Debezium

Debezium is one of the most popular open-source CDC tools, maintained by Red Hat. It captures data changes from database transaction logs and publishes the corresponding events to Kafka.

Pros:

  • does not have an impact on database performance
  • captures data in real time
  • can capture deletes
  • never misses an event (it captures every change in the table)

Cons:

  • difficult setup - it requires using Kafka
  • needs access to the database transaction log (e.g. the binlog)
  • supports only a few databases
  • some old versions of databases do not support CDC

Ververica Flink CDC Connectors

Ververica provides flink-cdc-connectors, which can easily be used with Flink to capture data changes. The connector integrates Debezium as its CDC engine, so it doesn't require extra effort to set up a full Debezium stack.

Pros:

  • features provided by Debezium, but without setting up a "full environment"

Cons:

  • supports only MySQL (5.7, 8.0.x), PostgreSQL (9.6, 10, 11, 12) and MongoDB (4.0, 4.2, 5.0)
  • needs access to the database transaction log (e.g. the binlog)
  • some old versions of databases do not support CDC

Flink Connector JDBC

A connector that allows us to read data from and write data to SQL databases directly in FlinkSQL. It is one of the official connectors maintained by Apache Flink.

Pros:

  • allows us to write results into SQL databases
  • one of the official Flink connectors, so no third-party component is needed

Cons:

  • reads data from a table only once - the connector does not provide CDC
  • supports only a few databases (MySQL, PostgreSQL, Derby)

GetInData CDC by JDBC Connector 

A connector developed by GetInData for CDC purposes. It allows us to capture changes from SQL databases by periodically querying the tables.

Pros:

  • does not require additional components, as Debezium does
  • can easily be reused with any database - just provide the SQL queries for the CDC and a JDBC driver
  • uses pure SQL for CDC, so it doesn't require any special database configuration

Cons:

  • does not support delete operations
  • some CDC strategies require additional columns, e.g. last_modify_date
  • may have an impact on the database, because it reads data with a periodically run SQL query
  • refreshes data in near real time - the CDC logic runs at specific intervals
  • may miss a CDC event for frequently changing rows (e.g. a row inserted and deleted before the connector refreshes the data)
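The trade-offs above are easier to see in code. The following is a minimal sketch of query-based CDC, assuming a monotonically increasing ordering column (`id`). It is not the connector's implementation: the `poll_new_rows` helper is hypothetical, and an in-memory SQLite database stands in for PostgreSQL so that the example is self-contained.

```python
import sqlite3
from typing import List, Tuple


def poll_new_rows(con: sqlite3.Connection, last_id: int) -> Tuple[List[tuple], int]:
    """Fetch rows whose ordering column is greater than the last value seen."""
    rows = con.execute(
        "SELECT id, customer_id FROM v_loan WHERE id > ? ORDER BY id",
        (last_id,),
    ).fetchall()
    if rows:
        last_id = rows[-1][0]  # remember how far we have read
    return rows, last_id


con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE v_loan (id INTEGER PRIMARY KEY, customer_id TEXT)")

# First polling cycle: one new row is picked up.
con.execute("INSERT INTO v_loan (id, customer_id) VALUES (1, '3')")
first_batch, last_id = poll_new_rows(con, last_id=0)

# Row 2 is inserted and deleted between two polls, so it is never observed.
con.execute("INSERT INTO v_loan (id, customer_id) VALUES (2, '7')")
con.execute("DELETE FROM v_loan WHERE id = 2")
con.execute("INSERT INTO v_loan (id, customer_id) VALUES (3, '9')")
second_batch, last_id = poll_new_rows(con, last_id)

print(first_batch, second_batch)
```

In a real deployment the poll would run at a fixed interval, which is where both the extra load on the database and the near-real-time (rather than real-time) latency come from.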

Custom connector overview

We used the Table API provided by Flink to develop our CDC connector. Flink provides interfaces that a custom connector must implement in order to expose an external data source as a table, which can then be processed using FlinkSQL. Flink won't modify any external data while executing a query. Instead, the Flink execution engine uses the table definition saved in a CatalogTable to read the data from the source during query execution.

For more details on how to write custom connectors, please check the Flink documentation.


In this example, I want to show you how to use the GetInData CDC by JDBC Connector with pattern recognition in FlinkSQL to meet the business requirements of our user story.

Before we start building Flink jobs, I want to define the data model used in the example. 

On Kafka, we are going to include the following topics:

Topic | Example payload | Description
trx | {"cif":"3", "amount": 200, "ts": "2021-05-03 00:00:00"} | Contains information about transactions carried out by the user.
clickstream | {"cif":"3", "type":"click", "ts": "2021-05-03 00:00:05"} | Contains information about user behaviour in the mobile application.
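As a quick illustration of the message format, here is a small Python sketch that decodes a transaction message into a typed record. The `Transaction` type and `parse_trx` helper are hypothetical, and the timestamp format is assumed from the example payload.

```python
import json
from datetime import datetime
from typing import NamedTuple


class Transaction(NamedTuple):
    cif: str       # customer identifier
    amount: float
    ts: datetime


def parse_trx(raw: bytes) -> Transaction:
    """Decode one JSON message from the trx topic into a typed record."""
    data = json.loads(raw)
    return Transaction(
        cif=data["cif"],
        amount=float(data["amount"]),
        ts=datetime.strptime(data["ts"], "%Y-%m-%d %H:%M:%S"),
    )


record = parse_trx(b'{"cif":"3", "amount": 200, "ts": "2021-05-03 00:00:00"}')
print(record)
```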

In PostgreSQL, we are going to have tables:

Table: v_loan
Description: Contains information about loans taken by the user.

CREATE TABLE v_loan
( id serial constraint v_loan_pk primary key,
  customer_id varchar(10),
  account_id varchar(30),
  decision_dttm timestamp
);

In this example, I want to set up our environment by using docker-compose. The script will set up a Flink cluster, Kafka and Postgres in Docker containers.

version: "3"

services:
  jobmanager:
    image: flink:1.13.2-scala_2.12-java11
    hostname: jobmanager
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      JOB_MANAGER_RPC_ADDRESS: "jobmanager"
    networks:
      - flink_jdbc_connector

  taskmanager:
    image: flink:1.13.2-scala_2.12-java11
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - jobmanager:jobmanager
    environment:
      JOB_MANAGER_RPC_ADDRESS: "jobmanager"
    networks:
      - flink_jdbc_connector

  postgres:
    image: postgres
    environment:
      POSTGRES_PASSWORD: example
    volumes:
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    ports:
      - "5432:5432"
    networks:
      - flink_jdbc_connector

  # The zookeeper service that kafka depends on is not defined in this listing.
  kafka:
    image: wurstmeister/kafka:2.12-2.4.0
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - flink_jdbc_connector

networks:
  flink_jdbc_connector:
    driver: bridge

It might be challenging to set up a complete environment with an example mobile application, so I prepared a small Python script that we can use to simulate user behaviour. The script simulates two scenarios: the happy path, which meets the user story, and the unhappy path, which should not match the pattern recognition query.

import json
import time
from datetime import datetime, timezone

from kafka import KafkaProducer
from sqlalchemy import create_engine
from sqlalchemy.sql import text

kafka_server = "local-dev_kafka_1:9092"
producer = KafkaProducer(bootstrap_servers=kafka_server)


def send_to_kafka(topic: str, record: bytes):
    producer.send(topic, record)


def generate_clickstream_record(cif: str, eventType: str, ts: datetime):
    # Publish a single mobile application event to the clickstream topic.
    payload = {'cif': cif, 'type': eventType, 'ts': ts.replace(tzinfo=timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")}
    send_to_kafka('clickstream', json.dumps(payload).encode('utf-8'))


def generate_trx_record(cif: str, amountPLN: float, ts: datetime):
    # Publish a single transaction to the trx topic.
    payload = {'cif': cif, 'amount': amountPLN, 'ts': ts.replace(tzinfo=timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")}
    send_to_kafka('trx', json.dumps(payload).encode('utf-8'))


engine = create_engine('postgresql+pg8000://postgres:example@postgres:5432/postgres')
con = engine.connect()


def generate_loan(cif: str, day: datetime):
    # Insert a loan row; the id column takes its default (serial) value.
    stmt_loan = text("""
        INSERT INTO v_loan
        VALUES (default, :cif, :account, :day)
    """)
    data = {'cif': cif, 'account': cif, 'day': day}
    # On SQLAlchemy 2.x, also call con.commit() to persist the row.
    con.execute(stmt_loan, data)


def generate_scenario_happy_path(cif: str):
    # A transaction over 1000 followed by two application events within the time limits.
    generate_trx_record(cif, 2000, datetime(2021, 4, 1, 0, 0, 0))
    generate_clickstream_record(cif, 'click', datetime(2021, 4, 2, 12, 0, 0))
    generate_clickstream_record(cif, 'click', datetime(2021, 4, 4, 12, 0, 0))


def generate_scenario_not_happy_path(cif: str):
    # A transaction below the threshold and a single, late application event.
    generate_trx_record(cif, 400, datetime(2021, 4, 1, 0, 0, 0))
    generate_clickstream_record(cif, 'click', datetime(2021, 5, 2, 12, 0, 0))

The first thing we need to do before building a pattern recognition pipeline is define data sources in FlinkSQL. To connect to data sources, we use connectors. Connectors allow us to treat data stored in PostgreSQL and Kafka as tables.

CREATE TABLE v_loan (
    id INT,
    customer_id   VARCHAR,
    account_id    VARCHAR,
    decision_dttm AS PROCTIME()
) WITH (
      'connector' = 'jdbc-cdc',
      'url' = 'jdbc:postgresql://local-dev_postgres_1:5432/postgres',
      'table-name' = 'v_loan',
      'username' = 'postgres',
      'password' = 'example',
      'cdc.strategy' = 'SIMPLE',
      'cdc.simple-strategy.ordering-columns' = 'id'
);

CREATE TABLE trx (
     cif STRING,
     amount DOUBLE,
     ts  AS PROCTIME()
) WITH (
      'connector' = 'kafka',
      'topic' = 'trx',
      'properties.bootstrap.servers' = 'local-dev_kafka_1:9092',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json'
);

CREATE TABLE clickstream (
     cif STRING,
     type STRING,
     ts  AS PROCTIME()
) WITH (
      'connector' = 'kafka',
      'topic' = 'clickstream',
      'properties.bootstrap.servers' = 'local-dev_kafka_1:9092',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json'
);

CREATE VIEW events AS
SELECT UUID()                               AS event_id,
       customer_id                          AS customer_id,
       'loan_event'                         AS type,
       CAST(account_id AS STRING)           AS payload,
       decision_dttm                        AS ts
FROM v_loan
UNION ALL
SELECT UUID()                   AS event_id,
       cif                      AS customer_id,
       'trx_event'              AS type,
       CAST(amount AS STRING)   AS payload,
       ts                       AS ts
FROM trx
UNION ALL
SELECT UUID()              AS event_id,
       cif                 AS customer_id,
       'clickstream_event' AS type,
       type                AS payload,
       ts                  AS ts
FROM clickstream;
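The idea behind these SELECT statements is to map every source onto one common event shape (event_id, customer_id, type, payload, ts), so that a single query can reason about all of them. The same normalization can be sketched in plain Python. The `Event` type, the helper names and the sample rows below are hypothetical and only mirror the column names used above.

```python
import uuid
from datetime import datetime
from typing import Any, Dict, List, NamedTuple


class Event(NamedTuple):
    event_id: str
    customer_id: str
    type: str
    payload: str
    ts: datetime


def new_event(customer_id: str, type_: str, payload: Any, ts: datetime) -> Event:
    """Build a unified event with a random id and a string payload."""
    return Event(str(uuid.uuid4()), customer_id, type_, str(payload), ts)


def unify(loans: List[Dict], trxs: List[Dict], clicks: List[Dict]) -> List[Event]:
    """Map rows from all three sources onto one schema, ordered by time."""
    events = (
        [new_event(r["customer_id"], "loan_event", r["account_id"], r["ts"]) for r in loans]
        + [new_event(r["cif"], "trx_event", r["amount"], r["ts"]) for r in trxs]
        + [new_event(r["cif"], "clickstream_event", r["type"], r["ts"]) for r in clicks]
    )
    return sorted(events, key=lambda e: e.ts)


events = unify(
    loans=[{"customer_id": "3", "account_id": "3", "ts": datetime(2021, 4, 3)}],
    trxs=[{"cif": "3", "amount": 2000, "ts": datetime(2021, 4, 1)}],
    clicks=[{"cif": "3", "type": "click", "ts": datetime(2021, 4, 2, 12)}],
)
for e in events:
    print(e.type, e.payload, e.ts)
```

Casting every payload to a string keeps the schema uniform, at the cost of having to convert it back when a condition needs the numeric value.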

This example shows how to define pattern recognition by using FlinkSQL. 

Firstly, we need to define our events.

  • TRX - an event that describes a transaction with an amount greater than 1000.
  • APP_1 - an event representing the user's interaction with the application within 3 days after the transaction.
  • APP_2 - an event representing the user's next interaction with the application within 7 days after APP_1.
  • NO_LOAN - an event that tells us that the user didn't take a loan.

Secondly, we need to specify the order of events and expected output. We look for a pattern from the user story. To define the pattern, we use the pattern expression from the Flink documentation. The pattern expression syntax is quite similar to a regular expression syntax.

SELECT *
FROM events
        MATCH_RECOGNIZE (
            PARTITION BY customer_id
            ORDER BY ts
            MEASURES
                TRX.event_id    AS trx_event_id,
                TRX.customer_id AS trx_customer_id,
                TRX.type        AS trx_type,
                TRX.payload     AS trx_payload,
                TRX.ts          AS trx_ts,
                APP_1.event_id    AS app_1_event_id,
                APP_1.customer_id AS app_1_customer_id,
                APP_1.type        AS app_1_type,
                APP_1.payload     AS app_1_payload,
                APP_1.ts          AS app_1_ts
            ONE ROW PER MATCH
            -- The PATTERN clause is missing from this listing; the line below is an
            -- assumed reconstruction based on the user story.
            PATTERN (TRX NO_LOAN*? APP_1 NO_LOAN*? APP_2)
            DEFINE
                TRX                 AS TRX.type = 'trx_event' AND TRX.payload > 1000,
                APP_1               AS APP_1.type = 'clickstream_event' AND APP_1.ts < TRX.ts + INTERVAL '3' DAY,
                APP_2               AS APP_2.type = 'clickstream_event' AND APP_2.ts > APP_1.ts AND APP_2.ts < APP_1.ts + INTERVAL '7' DAY,
                NO_LOAN             AS NO_LOAN.type <> 'loan_event'
        ) MR;

As we can see, the pattern recognition query works as expected. 
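To illustrate the similarity between pattern expressions and regular expressions, here is a rough analogy in Python. It assumes a hypothetical encoding of one customer's events as a string with one letter per event in time order, and it ignores the time constraints (3 and 7 days), which a regular expression cannot express. It is an analogy only, not how Flink evaluates patterns.

```python
import re

# Hypothetical one-letter encoding of events, in time order:
#   T = transaction over 1000 (TRX), t = smaller transaction,
#   A = application event (APP_1 / APP_2), L = loan taken
# "[^L]*?" plays the role of "any events that are not a loan" (NO_LOAN),
# matched reluctantly so that the first application event is taken as APP_1.
PATTERN = re.compile(r"T[^L]*?A[^L]*?A")


def has_match(encoded_events: str) -> bool:
    """Return True if the encoded event sequence contains the pattern."""
    return PATTERN.search(encoded_events) is not None


print(has_match("TAA"))   # transaction, then two application events
print(has_match("TALA"))  # a loan is taken between the two application events
print(has_match("tA"))    # transaction below the threshold
```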

Conclusions - Complex Event Processing with Flink

Flink is a powerful platform for building real-time data processing platforms, which can be fed from many sources. Using GetInData CDC by JDBC connector, we can start extracting knowledge from legacy applications and implementing "data-driven culture" in an organization. 

Data is one of your company's most valuable assets, and when skillfully used, it allows you to take your business to a new level.

We plan to publish the connector code as an open source project for future work, so stay tuned!

If you would like to know more about Complex Event Processing, check our CEP Platform.

21 October 2021
