Kafka Stream API: MySQL CDC to Apache Kafka with Debezium

dipu11
Aug 25, 2021

I was learning Kafka, then moved on to Kafka Streams, and along the way I learned how to capture database change events and source them into Kafka topics. Here we are going to learn how to capture MySQL database change events, source them into a Kafka topic with Kafka Connect and Debezium, and consume them with kafka-console-consumer.

What you need to get started [Note: I did all of this on an Ubuntu 18.04 machine]:

  1. Apache Kafka + ZooKeeper [Kafka tar file: kafka_2.12-2.4.0, i.e. Kafka 2.4.0 built for Scala 2.12; ZooKeeper ships inside it]
  2. Debezium 0.9.2 [MySQL connector plugin archive]
  3. MySQL 5.7 installed on your PC

Now we are all set to get started.

Steps we will follow:

  1. Unzip/untar all downloaded resources
  2. Configure MySQL
  3. Configure Apache Kafka
  4. Set up the connector
  5. Test how data streaming works

==================== Step 1: ====================

i). Untar your Kafka tar file in a suitable location, for example a dedicated folder on any drive, with this command:

tar xvzf your_filename.tar.gz

ii). Inside the extracted Kafka folder, untar the Debezium tar file you downloaded from the link above. [You can untar it to a separate path too!]

iii). Check the folder structure. Your Debezium folder must contain the files listed below:

antlr4-runtime-4.7.jar
CONTRIBUTE.md
debezium-connector-mysql-0.9.2.Final.jar
debezium-ddl-parser-0.9.2.Final.jar
mysql-binlog-connector-java-0.19.0.jar
protobuf-java-2.6.1.jar
CHANGELOG.md
COPYRIGHT.txt
debezium-core-0.9.2.Final.jar
LICENSE.txt
mysql-connector-java-8.0.13.jar
README.md
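If you want to double-check the extracted folder, here is a minimal Python sketch (a helper of my own, not part of Kafka or Debezium) that reports which of the jars listed above are missing. The default folder path in the script is an assumption; pass the path where you actually extracted Debezium.

```python
from pathlib import Path

# Jars shipped in the Debezium 0.9.2 MySQL connector archive (as listed above).
EXPECTED_JARS = [
    "antlr4-runtime-4.7.jar",
    "debezium-connector-mysql-0.9.2.Final.jar",
    "debezium-core-0.9.2.Final.jar",
    "debezium-ddl-parser-0.9.2.Final.jar",
    "mysql-binlog-connector-java-0.19.0.jar",
    "mysql-connector-java-8.0.13.jar",
    "protobuf-java-2.6.1.jar",
]


def missing_jars(debezium_dir):
    """Return the expected jar names that are not present in debezium_dir."""
    present = {p.name for p in Path(debezium_dir).glob("*.jar")}
    return [jar for jar in EXPECTED_JARS if jar not in present]


if __name__ == "__main__":
    import sys

    # Hypothetical default: a debezium-connector-mysql folder inside the Kafka folder.
    folder = sys.argv[1] if len(sys.argv) > 1 else "kafka_2.12-2.4.0/debezium-connector-mysql"
    missing = missing_jars(folder)
    print("All jars present" if not missing else f"Missing: {missing}")
```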

==================== Step 2: ====================

Now we have to enable the binary log (binlog) so that database change events can be captured. To do that, find your MySQL my.cnf file [in my case it was at /etc/mysql/my.cnf] and open it with root permissions. Add these lines:

[mysqld]
server-id = 123
log_bin = mysql-bin
binlog_format = row
binlog_row_image = full
expire_logs_days = 10

Set server-id to any unique value. Save the file and exit.
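If you want to double-check the edit before restarting MySQL, here is a minimal Python sketch (a helper of my own, built only on configparser) that flags missing or wrong binlog settings in the [mysqld] section. It drops MySQL's !include/!includedir lines, which configparser cannot read, and it does not follow the files those lines point to, so settings placed in conf.d are not seen.

```python
import configparser


def check_mysqld_section(cnf_text):
    """Return a list of problems with the binlog settings in a my.cnf text."""
    # configparser cannot handle MySQL's !include/!includedir directives, so skip them.
    lines = [ln for ln in cnf_text.splitlines() if not ln.lstrip().startswith("!")]
    parser = configparser.ConfigParser(allow_no_value=True, strict=False, interpolation=None)
    parser.read_string("\n".join(lines))
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]
    # MySQL treats '-' and '_' in option names as equivalent; normalise to '_'.
    opts = {k.replace("-", "_"): (v or "").strip() for k, v in parser["mysqld"].items()}
    problems = []
    if not opts.get("server_id", "").isdigit() or int(opts["server_id"]) == 0:
        problems.append("server_id must be a non-zero integer")
    if "log_bin" not in opts:
        problems.append("log_bin is not set")
    if opts.get("binlog_format", "").lower() != "row":
        problems.append("binlog_format should be set to ROW")
    # FULL is the MySQL 5.7 default, so a missing binlog_row_image is fine.
    if opts.get("binlog_row_image", "full").lower() != "full":
        problems.append("binlog_row_image should be FULL")
    return problems


if __name__ == "__main__":
    import sys

    path = sys.argv[1] if len(sys.argv) > 1 else "/etc/mysql/my.cnf"
    with open(path, encoding="utf-8") as f:
        print(check_mysqld_section(f.read()) or "binlog settings look OK")
```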

Restart the MySQL server:

sudo /etc/init.d/mysql restart
# or with this command
sudo service mysql restart

Now open a terminal and check that the server-id was picked up: run the command below and confirm that the server-id in its output matches the value you set in my.cnf:

mysqld --verbose --help | grep server-id

Now check that the binary log is enabled:

mysqladmin variables -uroot -p | grep log_bin

Remove the -p flag if your root user has no password.

In the output, log_bin should be ON.
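Since the grep above only shows log_bin, here is a small sketch (my own helper, not a MySQL tool) that reads the full mysqladmin variables table from stdin and checks log_bin and server_id together. It assumes the usual "| name | value |" table layout and would be confused by values that themselves contain a "|". The script name in the usage comment is hypothetical.

```python
def parse_mysqladmin_variables(output):
    """Turn the '| name | value |' table printed by `mysqladmin variables` into a dict."""
    variables = {}
    for line in output.splitlines():
        line = line.strip()
        if not line.startswith("|"):
            continue  # skip the +----+ border lines and anything else
        cells = [cell.strip() for cell in line.strip("|").split("|")]
        if len(cells) == 2 and cells[0] != "Variable_name":
            variables[cells[0]] = cells[1]
    return variables


def binlog_ready(variables, expected_server_id):
    """True if the binlog is on and server_id matches the value set in my.cnf."""
    return variables.get("log_bin") == "ON" and variables.get("server_id") == str(expected_server_id)


if __name__ == "__main__":
    import sys

    # Usage: mysqladmin variables -uroot -p | python3 check_binlog.py 123
    expected = sys.argv[1] if len(sys.argv) > 1 else "123"
    variables = parse_mysqladmin_variables(sys.stdin.read())
    print("binlog ready" if binlog_ready(variables, expected) else "check log_bin / server_id")
```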

Log in to the MySQL server and create a dedicated user for the connector we are going to set up:

mysql -u root -p

Remove the -p option if your root user has no password.

Then create the debezium user with password dbz [the password can be anything]:

mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
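To confirm the grants took effect, you can run SHOW GRANTS FOR 'debezium'; in the MySQL shell. The sketch below (a hypothetical helper of my own) compares one global GRANT line from that output with the five privileges granted above. It only understands the simple single-line form MySQL 5.7 prints, and it treats ALL PRIVILEGES as covering everything.

```python
# The privileges granted to the debezium user above.
REQUIRED_PRIVILEGES = {
    "SELECT",
    "RELOAD",
    "SHOW DATABASES",
    "REPLICATION SLAVE",
    "REPLICATION CLIENT",
}


def missing_privileges(grant_line):
    """Return the required privileges that one SHOW GRANTS line does not cover globally."""
    upper = grant_line.strip().upper()
    if not upper.startswith("GRANT ") or " ON *.* " not in upper:
        # Not a global (ON *.*) grant, so it covers none of the required privileges.
        return sorted(REQUIRED_PRIVILEGES)
    privileges_part = upper[len("GRANT "):upper.index(" ON *.* ")]
    granted = {p.strip() for p in privileges_part.split(",")}
    if "ALL PRIVILEGES" in granted:
        return []
    return sorted(REQUIRED_PRIVILEGES - granted)


if __name__ == "__main__":
    line = "GRANT SELECT, RELOAD ON *.* TO 'debezium'@'%'"
    print(missing_privileges(line) or "all required privileges granted")
```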

MySQL config is done now!

==================== Step 3: ====================

cd into your Kafka directory and follow the steps below in order:

i). Start ZooKeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

ii). Start the default Kafka broker:

bin/kafka-server-start.sh config/server.properties

and wait until the broker has fully started.
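The broker log tells you when it is up, but if you script these steps you need some way to wait. A rough sketch (a helper of my own): poll until the broker's port accepts TCP connections. localhost:9092 is assumed from the default config/server.properties, and an open port is only an approximation of "fully started", not a guarantee.

```python
import socket
import time


def wait_for_port(host, port, timeout_s=60.0, interval_s=1.0):
    """Poll until a TCP connection to host:port succeeds or timeout_s elapses.

    An open port only means something is listening; it is a rough readiness
    signal, not proof that the broker has finished starting up.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval_s):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval_s)


if __name__ == "__main__":
    # localhost:9092 is the listener of the default config/server.properties.
    print("broker port open" if wait_for_port("localhost", 9092) else "timed out")
```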

iii). Create a topic named test:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

iv). Inside the kafka-dir/config folder, open the connect-standalone.properties file and add this:

plugin.path=/path/to/the/folder/containing/your/debezium/folder

v). This is not the Debezium folder itself but the folder that contains it, so be careful. In my case, I put the Debezium folder inside the Kafka folder, so my path was:

plugin.path=/media/t***t/projects/playground/kafka/kafka_2.12-2.4.0
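Kafka Connect treats every entry of plugin.path as a top-level folder and loads each of its child folders as one plugin, which is why the path must be the parent of the Debezium folder. A minimal sketch (a helper of my own) that checks a plugin.path value the same way: it looks one level down for a folder holding the Debezium MySQL connector jar. Pointing it at the Debezium folder itself finds nothing, which mirrors the mistake warned about above.

```python
from pathlib import Path


def find_debezium_plugin(plugin_path):
    """Return the child folders of plugin_path entries that hold the Debezium MySQL jar.

    plugin.path is a comma-separated list; each entry should be the folder
    that *contains* the plugin folders, not a plugin folder itself.
    """
    hits = []
    for entry in plugin_path.split(","):
        root = Path(entry.strip())
        if not root.is_dir():
            continue  # ignore entries that do not exist
        for child in root.iterdir():
            if child.is_dir() and any(child.glob("debezium-connector-mysql-*.jar")):
                hits.append(child)
    return hits


if __name__ == "__main__":
    import sys

    # Pass the same value you put in plugin.path.
    found = find_debezium_plugin(sys.argv[1])
    print(found or "No Debezium plugin folder found: is plugin.path the parent folder?")
```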

Save the file and check that

key.converter.schemas.enable=true
value.converter.schemas.enable=true

are both set to true.
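With schemas.enable=true, the JsonConverter wraps every message in a {"schema": ..., "payload": ...} envelope, which is why the consumer output later in this post has both parts. Here is a minimal sketch (my own simplified reader, not Kafka's own parser) that reads connect-standalone.properties and reports plugin.path and the two schema flags. It handles plain key=value lines only.

```python
def read_properties(text):
    """Parse simple key=value lines, skipping blank lines and #/! comments.

    Simplified on purpose: it ignores the ':' separator, line continuations
    and escapes that full Java .properties files also allow.
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def schemas_enabled(props):
    """True if both JSON converters are told to embed schemas in each message."""
    keys = ("key.converter.schemas.enable", "value.converter.schemas.enable")
    return all(props.get(key, "").lower() == "true" for key in keys)


if __name__ == "__main__":
    # Run from the Kafka directory.
    with open("config/connect-standalone.properties", encoding="utf-8") as f:
        props = read_properties(f.read())
    print("plugin.path =", props.get("plugin.path"))
    print("schemas enabled:", schemas_enabled(props))
```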

vi). Change the topic name to "test" in the connect-console-source.properties and connect-console-sink.properties files (the topic property in the source file, topics in the sink file) [not mandatory].

Now start Kafka Connect in standalone mode (this also starts the console source and sink connectors):

bin/connect-standalone.sh config/connect-standalone.properties config/connect-console-source.properties config/connect-console-sink.properties

==================== Step 4: ====================

Kafka Connect setup [last but very important]:

Register the Debezium MySQL connector through the Kafka Connect REST API (port 8083 by default) with this curl request:

curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" http://localhost:8083/connectors/ \
  -d '{
    "name": "mysql-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "localhost",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.server.id": "42",
      "database.server.name": "test",
      "database.history.kafka.bootstrap.servers": "localhost:9092",
      "database.history.kafka.topic": "dbhistory.test",
      "include.schema.changes": "true"
    }
  }'
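If you prefer Python over curl, here is a minimal sketch of the same request using only the standard library (urllib.request). The CONNECTOR dict simply mirrors the JSON above; the request is only sent when the file is run as a script, and urlopen raises urllib.error.HTTPError for error responses, for example when a connector with that name already exists.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083/connectors/"

# Same connector definition as the curl request above.
CONNECTOR = {
    "name": "mysql-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "localhost",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "42",
        "database.server.name": "test",
        "database.history.kafka.bootstrap.servers": "localhost:9092",
        "database.history.kafka.topic": "dbhistory.test",
        "include.schema.changes": "true",
    },
}


def build_register_request(connector, url=CONNECT_URL):
    """Build (but do not send) the POST request that registers the connector."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )


if __name__ == "__main__":
    request = build_register_request(CONNECTOR)
    # Needs a running Kafka Connect worker; 201 means the connector was created.
    with urllib.request.urlopen(request) as response:
        print(response.status, response.read().decode("utf-8"))
```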

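Most fields are self-explanatory, with two to watch. database.server.id is not the MySQL server's id: it is the numeric ID the connector uses when it joins the MySQL cluster as a replication client, so it must be unique and must differ from the server-id you set in my.cnf (here 42 vs 123). database.server.name (test) is a logical name that Debezium uses as the prefix of every topic it creates.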
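A tiny sanity check of the server-id rule, as a sketch (the function name is my own): it only compares the connector's ID with one MySQL server-id, whereas in a real replication setup the ID must be unique across every server and replica.

```python
def check_server_ids(connector_config, mysql_server_id):
    """Return a problem message, or None if database.server.id looks usable."""
    client_id = str(connector_config.get("database.server.id", ""))
    if not client_id.isdigit() or int(client_id) == 0:
        return "database.server.id must be a positive integer"
    # The connector acts as a replication client, so its ID must not clash
    # with the MySQL server's own server-id.
    if int(client_id) == int(mysql_server_id):
        return "database.server.id must differ from the MySQL server-id"
    return None


if __name__ == "__main__":
    print(check_server_ids({"database.server.id": "42"}, 123) or "server ids OK")
```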

To check that your connector setup is OK, run this command (it needs jq installed). It lists every connector registered with the worker, followed by its state and the states of its tasks:

curl -s "http://localhost:8083/connectors" \
  | jq -r '.[]' \
  | xargs -I{} curl -s "http://localhost:8083/connectors/{}/status" \
  | jq -r '[.name, .connector.state, .tasks[].state] | join(" | ")' \
  | sort

If everything is OK, you will see the connector's state and its task state both reported as RUNNING.
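The same check in Python, as a minimal sketch using only the standard library: it reads the body of GET /connectors/mysql-connector/status and reports healthy only when the connector and all of its tasks are RUNNING. The field names (name, connector.state, tasks[].state) are the ones the jq filter above reads; the helper names are my own.

```python
import json
import urllib.request


def summarize_status(status_json):
    """Return (name, connector_state, task_states) from a /connectors/<name>/status body."""
    status = json.loads(status_json)
    task_states = [task["state"] for task in status.get("tasks", [])]
    return status["name"], status["connector"]["state"], task_states


def is_healthy(status_json):
    """True only if the connector and every one of its tasks report RUNNING."""
    _, connector_state, task_states = summarize_status(status_json)
    return connector_state == "RUNNING" and bool(task_states) and all(
        state == "RUNNING" for state in task_states
    )


if __name__ == "__main__":
    # Assumes the worker's REST API on localhost:8083 and the connector name used above.
    url = "http://localhost:8083/connectors/mysql-connector/status"
    with urllib.request.urlopen(url) as response:
        body = response.read().decode("utf-8")
    print(summarize_status(body), "healthy" if is_healthy(body) else "NOT healthy")
```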

Assuming your connector setup is OK, now check the topic list. You will see one topic per table, named <database.server.name>.<database>.<table>. In the config above database.server.name is test, so if the database dbName has tables t1, t2 and t3, the topics will be:

test.dbName.t1, test.dbName.t2, etc.

List all topics:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
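Besides the per-table topics, the list also contains others, such as dbhistory.test (the schema history topic from the connector config) and, because include.schema.changes is true, a topic named after database.server.name itself that receives schema change events. A minimal sketch (helper names are my own) that applies the naming rule and keeps only the per-table topics from the --list output:

```python
def change_event_topic(server_name, database, table):
    """Debezium MySQL naming: <database.server.name>.<database>.<table>."""
    return f"{server_name}.{database}.{table}"


def table_topics(all_topics, server_name):
    """Keep only the per-table change-event topics for one logical server name."""
    prefix = server_name + "."
    return sorted(
        topic
        for topic in all_topics
        if topic.startswith(prefix) and topic[len(prefix):].count(".") == 1
    )


if __name__ == "__main__":
    import sys

    # Usage: bin/kafka-topics.sh --bootstrap-server localhost:9092 --list | python3 table_topics.py test
    server = sys.argv[1] if len(sys.argv) > 1 else "test"
    for topic in table_topics((line.strip() for line in sys.stdin), server):
        print(topic)
```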

To test streaming, pick any topic named after one of your tables and run the console consumer client:

bin/kafka-console-consumer.sh --bootstrap-server "localhost:9092" --topic "test.mydb.mydb_organization" --from-beginning

Now update or insert a row in that table and you will see messages like the ones below [check the after field for your changes]:

{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int64",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":true,
"field":"code"
},
{
"type":"int32",
"optional":true,
"default":1,
"field":"object_version_id"
},
{
"type":"string",
"optional":true,
"field":"type"
},
{
"type":"string",
"optional":true,
"field":"name_en"
},
{
"type":"string",
"optional":true,
"field":"name_bn"
},
{
"type":"string",
"optional":true,
"field":"address"
},
{
"type":"string",
"optional":true,
"field":"email"
},
{
"type":"string",
"optional":true,
"field":"contact"
},
{
"type":"int16",
"optional":true,
"field":"status"
},
{
"type":"string",
"optional":true,
"field":"short_name"
}
],
"optional":true,
"name":"test.mydb.mydb_organization.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int64",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":true,
"field":"code"
},
{
"type":"int32",
"optional":true,
"default":1,
"field":"object_version_id"
},
{
"type":"string",
"optional":true,
"field":"type"
},
{
"type":"string",
"optional":true,
"field":"name_en"
},
{
"type":"string",
"optional":true,
"field":"name_bn"
},
{
"type":"string",
"optional":true,
"field":"address"
},
{
"type":"string",
"optional":true,
"field":"email"
},
{
"type":"string",
"optional":true,
"field":"contact"
},
{
"type":"int16",
"optional":true,
"field":"status"
},
{
"type":"string",
"optional":true,
"field":"short_name"
}
],
"optional":true,
"name":"test.mydb.mydb_organization.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":true,
"field":"version"
},
{
"type":"string",
"optional":true,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"server_id"
},
{
"type":"int64",
"optional":false,
"field":"ts_sec"
},
{
"type":"string",
"optional":true,
"field":"gtid"
},
{
"type":"string",
"optional":false,
"field":"file"
},
{
"type":"int64",
"optional":false,
"field":"pos"
},
{
"type":"int32",
"optional":false,
"field":"row"
},
{
"type":"boolean",
"optional":true,
"default":false,
"field":"snapshot"
},
{
"type":"int64",
"optional":true,
"field":"thread"
},
{
"type":"string",
"optional":true,
"field":"db"
},
{
"type":"string",
"optional":true,
"field":"table"
},
{
"type":"string",
"optional":true,
"field":"query"
}
],
"optional":false,
"name":"io.debezium.connector.mysql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
}
],
"optional":false,
"name":"test.mydb.mydb_organization.Envelope"
},
"payload":{
"before":{
"id":5,
"code":"500",
"object_version_id":0,
"type":"ORG_NAME",
"name_en":"werwer",
"name_bn":null,
"address":"fdgd",
"email":"a@b.com",
"contact":null,
"status":1,
"short_name":"rewerwerwer"
},
"after":{
"id":5,
"code":"500",
"object_version_id":0,
"type":"ORG_NAME",
"name_en":"test org",
"name_bn":null,
"address":"my demo add",
"email":"a@b.com",
"contact":null,
"status":1,
"short_name":"rewerwerwer"
},
"source":{
"version":"0.9.2.Final",
"connector":"mysql",
"name":"test",
"server_id":123,
"ts_sec":1581402039,
"gtid":null,
"file":"mysql-bin.000011",
"pos":5106,
"row":0,
"snapshot":false,
"thread":65,
"db":"mydb",
"table":"mydb_organization",
"query":null
},
"op":"u",
"ts_ms":1581402039640
}
}
{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":true,
"field":"code"
},
{
"type":"bytes",
"optional":true,
"field":"authentication"
}
],
"optional":true,
"name":"test.mydb.oauth_code.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":true,
"field":"code"
},
{
"type":"bytes",
"optional":true,
"field":"authentication"
}
],
"optional":true,
"name":"test.mydb.oauth_code.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":true,
"field":"version"
},
{
"type":"string",
"optional":true,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"server_id"
},
{
"type":"int64",
"optional":false,
"field":"ts_sec"
},
{
"type":"string",
"optional":true,
"field":"gtid"
},
{
"type":"string",
"optional":false,
"field":"file"
},
{
"type":"int64",
"optional":false,
"field":"pos"
},
{
"type":"int32",
"optional":false,
"field":"row"
},
{
"type":"boolean",
"optional":true,
"default":false,
"field":"snapshot"
},
{
"type":"int64",
"optional":true,
"field":"thread"
},
{
"type":"string",
"optional":true,
"field":"db"
},
{
"type":"string",
"optional":true,
"field":"table"
},
{
"type":"string",
"optional":true,
"field":"query"
}
],
"optional":false,
"name":"io.debezium.connector.mysql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
}
],
"optional":false,
"name":"test.mydb.oauth_code.Envelope"
},
"payload":{
"before":null,
"after":{
"code":"22",
"authentication":"MTIyMzMz"
},
"source":{
"version":"0.9.2.Final",
"connector":"mysql",
"name":"test",
"server_id":123,
"ts_sec":1581401679,
"gtid":null,
"file":"mysql-bin.000011",
"pos":4792,
"row":0,
"snapshot":false,
"thread":65,
"db":"mydb",
"table":"oauth_code",
"query":null
},
"op":"c",
"ts_ms":1581401679228
}
}

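Sorry, the output is long, but I am including it for your convenience. The first message is an update (op "u") on mydb_organization: before holds the old row and after the new one. The second is an insert (op "c") into oauth_code, so before is null; it comes from the test.mydb.oauth_code topic.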

Now you can do whatever you want with your data: parse it, modify it, sink it to another database, etc.
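As a starting point for parsing, here is a minimal sketch (my own helper, standard library only) that summarizes one change event as printed by the console consumer: the operation, the source table, and which fields differ between before and after. It assumes the schema-plus-payload JSON shown above and skips the "null" tombstone messages Debezium emits after deletes. The JsonConverter writes BYTES columns as base64, so the authentication value MTIyMzMz above decodes to b"122333".

```python
import base64
import json
import sys

# Debezium op codes; "r" marks rows read during the initial snapshot.
OPS = {"c": "create", "u": "update", "d": "delete", "r": "read"}


def describe_change(message_value):
    """Summarize one change event written by JsonConverter with schemas enabled."""
    payload = json.loads(message_value)["payload"]
    before = payload.get("before") or {}  # null for inserts
    after = payload.get("after") or {}  # null for deletes
    changed = {
        field: (before.get(field), after.get(field))
        for field in sorted(set(before) | set(after))
        if before.get(field) != after.get(field)
    }
    source = payload["source"]
    return {
        "op": OPS.get(payload["op"], payload["op"]),
        "table": f'{source["db"]}.{source["table"]}',
        "changed": changed,
    }


def decode_bytes_field(value):
    """JsonConverter writes BYTES columns as base64 text; turn it back into bytes."""
    return None if value is None else base64.b64decode(value)


if __name__ == "__main__":
    # Usage (hypothetical script name):
    # bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test.mydb.mydb_organization | python3 describe_changes.py
    for line in sys.stdin:
        line = line.strip()
        if line and line != "null":  # skip blank lines and delete tombstones
            print(describe_change(line))
```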

Hope this will help!

Written by dipu11

Software Engineer. Prefers to work on backend technologies such as (but not limited to): Java, Spring, RDBMS, ES, JS, jQuery
