While learning Kafka and Kafka Streams, I wanted to find out how to capture database change events and source them into a Kafka topic. In this post we will capture MySQL change events with Debezium, source them into Kafka topics, and consume them with kafka-console-consumer.
What you need to start with [Note: I did all of this on an Ubuntu 18.04 machine]:
- Apache Kafka + ZooKeeper [Kafka tar file: kafka_2.12-2.4.0, i.e. Kafka 2.4.0 built for Scala 2.12]
- Debezium 0.9.2 [the MySQL connector plugin tar file]
- MySQL 5.7 installed on your machine
With these in place, we are ready to start.
Steps we will follow:
- Unzip/untar all downloaded resources
- Configure MySQL
- Configure Apache Kafka
- Connector setup
- Test the data streaming
==================== Step 1: ====================
i). Untar your Kafka tar file in a suitable location, such as a dedicated folder on any drive, with this command:
tar xvzf your_filename.tar.gz
ii). Inside the extracted Kafka folder, untar the Debezium tar file you downloaded. [You can untar it in a separate path too!]
iii). Your Debezium folder must have the contents listed below:
antlr4-runtime-4.7.jar
CONTRIBUTE.md
debezium-connector-mysql-0.9.2.Final.jar
debezium-ddl-parser-0.9.2.Final.jar
mysql-binlog-connector-java-0.19.0.jar
protobuf-java-2.6.1.jar
CHANGELOG.md
COPYRIGHT.txt
debezium-core-0.9.2.Final.jar
LICENSE.txt
mysql-connector-java-8.0.13.jar
README.md
==================== Step 2: ====================
Now we have to enable the binary log so that the database change events can be captured. To do that, find your MySQL my.cnf file [in my case it was at /etc/mysql/my.cnf]. Open it with root permission and add these lines:
[mysqld]
server-id = 123
log_bin = mysql-bin
binlog_format = row
binlog_row_image = full
expire_logs_days = 10
Set server-id to any value that is unique within your MySQL setup. Save the file and exit.
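Before restarting MySQL, it can help to double-check the file. The following is only a minimal sketch in Python, assuming the settings live in a plain [mysqld] section of one file; the function name check_mysqld_settings and the REQUIRED table are made up for this post, and the check only reads the text of my.cnf, not the running server.

```python
import configparser

# Settings this post adds to my.cnf; values are compared case-insensitively.
REQUIRED = {"binlog_format": "row", "binlog_row_image": "full"}


def check_mysqld_settings(cnf_text):
    """Return a list of problems found in the [mysqld] section of a my.cnf text."""
    # Drop !include / !includedir directives, which configparser cannot parse.
    lines = [ln for ln in cnf_text.splitlines() if not ln.lstrip().startswith("!")]
    parser = configparser.ConfigParser(
        allow_no_value=True, strict=False, interpolation=None
    )
    parser.read_string("\n".join(lines))
    if not parser.has_section("mysqld"):
        return ["no [mysqld] section"]
    # MySQL accepts '-' and '_' interchangeably in option names.
    opts = {k.replace("-", "_"): (v or "").strip() for k, v in parser.items("mysqld")}
    problems = []
    if not opts.get("server_id", "").isdigit():
        problems.append("server-id is missing or not a number")
    if "log_bin" not in opts:
        problems.append("log_bin is not set")
    for key, want in REQUIRED.items():
        if opts.get(key, "").lower() != want:
            problems.append(f"{key} should be {want}")
    return problems
```

You could call it as check_mysqld_settings(open("/etc/mysql/my.cnf").read()); an empty list means the settings from this step were found.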
Restart MySQL server:
sudo /etc/init.d/mysql restart
# or with this command
sudo service mysql restart
Now open a terminal and check that the server-id was picked up. Run the command below and look for server-id in its output; it should match the value you set in my.cnf:
mysqld --verbose --help
Now check that the binary log is enabled:
mysqladmin variables -uroot -p | grep log_bin
Remove the -p flag if your root user has no password.
In the output, the log_bin variable should be ON.
Log in to the MySQL server and create a dedicated user for the connector we are going to set up:
mysql -u root -p
Remove the -p option if your root user has no password.
Then create the debezium user with password dbz [the password can be anything]:
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
MySQL config is done now!
==================== Step 3: ====================
cd to your Kafka directory and follow the steps below in order:
i). Start ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
ii). Start the default Kafka broker:
bin/kafka-server-start.sh config/server.properties
Wait a little while for the broker to finish starting up.
iii). Create a topic named test:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
iv). Inside the kafka-dir/config path, open the connect-standalone.properties file and add this:
plugin.path=/your_debezium_folder_container_folder_path
v). This is not the Debezium folder itself but the folder that contains it, so be careful. In my case, I put the Debezium folder inside the Kafka folder, so my path was:
plugin.path=/media/t***t/projects/playground/kafka/kafka_2.12-2.4.0
Save the file and check that
key.converter.schemas.enable=true
value.converter.schemas.enable=true
are set to true.
vi). Change the topic name to “test” in the connect-console-source.properties and connect-console-sink.properties files [not mandatory].
Now start Kafka Connect in standalone mode:
bin/connect-standalone.sh config/connect-standalone.properties config/connect-console-source.properties config/connect-console-sink.properties
==================== Step 4: ====================
Kafka Connect setup [last but very important]:
Register the Debezium MySQL connector with Kafka Connect by sending this curl request:
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "42",
"database.server.name": "test",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "dbhistory.test" ,
"include.schema.changes": "true"
}
}'
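Most fields here are self-explanatory. database.server.name is the logical name of this MySQL server, and it becomes the prefix of every topic the connector writes to. database.server.id is the ID the connector uses when it joins MySQL as a replication client, so it must be unique and must not be the same as the server-id you set in your my.cnf file [here it is 42, while my.cnf uses 123].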
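If you prefer to generate the request body instead of editing JSON by hand, here is a rough sketch in Python. It only mirrors the fields used in the curl request above; the function name build_connector_request and its argument names are made up for this post, and naming the history topic after the server name is just the convention I followed here.

```python
import json


def build_connector_request(name, db_host, db_port, db_user, db_password,
                            connector_server_id, server_name,
                            bootstrap_servers="localhost:9092"):
    """Build the JSON body for POST /connectors, mirroring the fields in this post."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "database.hostname": db_host,
            "database.port": str(db_port),
            "database.user": db_user,
            "database.password": db_password,
            "database.server.id": str(connector_server_id),
            "database.server.name": server_name,
            "database.history.kafka.bootstrap.servers": bootstrap_servers,
            # Naming the history topic after the server name is this sketch's convention.
            "database.history.kafka.topic": f"dbhistory.{server_name}",
            "include.schema.changes": "true",
        },
    }


# Same values as the curl request in this post.
body = build_connector_request(
    "mysql-connector", "localhost", 3306, "debezium", "dbz", 42, "test"
)
print(json.dumps(body, indent=2))
```

You can save the printed JSON to a file and send it with curl using -d @yourfile.json in place of the inline body.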
To check that your connector setup is OK, run this curl request to check its status [it needs jq installed]:
curl -s "http://localhost:8083/connectors" | jq '.[]' |
xargs -I{} curl -s "http://localhost:8083/connectors/{}/status" |
jq -c -M '[.name,.connector.state,.tasks[].state] | join(":|:")' |
column -s : -t | tr -d \" | sort
If everything is OK, you will see RUNNING for both the connector state and the task state.
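The same check can be done without jq. This is a small sketch in Python that reads the status document returned by the Kafka Connect REST API; the function names are made up for this post, and the sample document in the usage note is hand-written to match the shape of the response, not copied from my machine.

```python
import json
import urllib.request


def fetch_status(name, base_url="http://localhost:8083"):
    """GET /connectors/<name>/status from the Kafka Connect REST API."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


def summarize_status(status):
    """Return (name, connector_state, [task_states]) from a status document."""
    tasks = [t["state"] for t in status.get("tasks", [])]
    return status["name"], status["connector"]["state"], tasks


def is_healthy(status):
    """True only when the connector and at least one task all report RUNNING."""
    _, connector_state, tasks = summarize_status(status)
    return (
        connector_state == "RUNNING"
        and bool(tasks)
        and all(state == "RUNNING" for state in tasks)
    )
```

For example, is_healthy(fetch_status("mysql-connector")) should be True once the connector from this step is up. A status like {"name": "mysql-connector", "connector": {"state": "RUNNING"}, "tasks": [{"id": 0, "state": "FAILED"}]} would give False.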
Assuming your connector setup is okay, check the topic list. You will see one topic per table, named with the database.server.name from the connector config, then the database name, then the table name. For example, we set database.server.name to test, so if a database dbName has tables t1, t2, t3, the topics will be:
test.dbName.t1, test.dbName.t2, etc…
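The naming rule is simple enough to write down as code. This is a small sketch in Python, assuming the prefix is the database.server.name value [the "name":"test" field in the source block of the events further down points the same way] and that neither the server name nor the database name contains a dot; the function names are made up for this post.

```python
def table_topic(server_name, database, table):
    """Build the topic name for one table: <server name>.<database>.<table>."""
    return f"{server_name}.{database}.{table}"


def split_table_topic(topic):
    """Split a table topic back into (server_name, database, table).

    Assumes the server name and database name contain no dots.
    """
    server_name, database, table = topic.split(".", 2)
    return server_name, database, table


print(table_topic("test", "dbName", "t1"))
print(split_table_topic("test.mydb.mydb_organization"))
```

The second helper is handy when one consumer reads several table topics and needs to know which table an event came from.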
List all topics:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
To test streaming, pick any topic named after one of your tables and run the console consumer client:
bin/kafka-console-consumer.sh --bootstrap-server "localhost:9092" --topic "test.mydb.mydb_organization" --from-beginning
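Now update or insert a row in that table, and you will get events like these [check the after key for your changes]. The first one is an update event [op is u] on mydb_organization, the second one is an insert event [op is c] on oauth_code: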
{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int64",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":true,
"field":"code"
},
{
"type":"int32",
"optional":true,
"default":1,
"field":"object_version_id"
},
{
"type":"string",
"optional":true,
"field":"type"
},
{
"type":"string",
"optional":true,
"field":"name_en"
},
{
"type":"string",
"optional":true,
"field":"name_bn"
},
{
"type":"string",
"optional":true,
"field":"address"
},
{
"type":"string",
"optional":true,
"field":"email"
},
{
"type":"string",
"optional":true,
"field":"contact"
},
{
"type":"int16",
"optional":true,
"field":"status"
},
{
"type":"string",
"optional":true,
"field":"short_name"
}
],
"optional":true,
"name":"test.mydb.mydb_organization.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int64",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":true,
"field":"code"
},
{
"type":"int32",
"optional":true,
"default":1,
"field":"object_version_id"
},
{
"type":"string",
"optional":true,
"field":"type"
},
{
"type":"string",
"optional":true,
"field":"name_en"
},
{
"type":"string",
"optional":true,
"field":"name_bn"
},
{
"type":"string",
"optional":true,
"field":"address"
},
{
"type":"string",
"optional":true,
"field":"email"
},
{
"type":"string",
"optional":true,
"field":"contact"
},
{
"type":"int16",
"optional":true,
"field":"status"
},
{
"type":"string",
"optional":true,
"field":"short_name"
}
],
"optional":true,
"name":"test.mydb.mydb_organization.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":true,
"field":"version"
},
{
"type":"string",
"optional":true,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"server_id"
},
{
"type":"int64",
"optional":false,
"field":"ts_sec"
},
{
"type":"string",
"optional":true,
"field":"gtid"
},
{
"type":"string",
"optional":false,
"field":"file"
},
{
"type":"int64",
"optional":false,
"field":"pos"
},
{
"type":"int32",
"optional":false,
"field":"row"
},
{
"type":"boolean",
"optional":true,
"default":false,
"field":"snapshot"
},
{
"type":"int64",
"optional":true,
"field":"thread"
},
{
"type":"string",
"optional":true,
"field":"db"
},
{
"type":"string",
"optional":true,
"field":"table"
},
{
"type":"string",
"optional":true,
"field":"query"
}
],
"optional":false,
"name":"io.debezium.connector.mysql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
}
],
"optional":false,
"name":"test.mydb.mydb_organization.Envelope"
},
"payload":{
"before":{
"id":5,
"code":"500",
"object_version_id":0,
"type":"ORG_NAME",
"name_en":"werwer",
"name_bn":null,
"address":"fdgd",
"email":"a@b.com",
"contact":null,
"status":1,
"short_name":"rewerwerwer"
},
"after":{
"id":5,
"code":"500",
"object_version_id":0,
"type":"ORG_NAME",
"name_en":"test org",
"name_bn":null,
"address":"my demo add",
"email":"a@b.com",
"contact":null,
"status":1,
"short_name":"rewerwerwer"
},
"source":{
"version":"0.9.2.Final",
"connector":"mysql",
"name":"test",
"server_id":123,
"ts_sec":1581402039,
"gtid":null,
"file":"mysql-bin.000011",
"pos":5106,
"row":0,
"snapshot":false,
"thread":65,
"db":"mydb",
"table":"mydb_organization",
"query":null
},
"op":"u",
"ts_ms":1581402039640
}
}
{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":true,
"field":"code"
},
{
"type":"bytes",
"optional":true,
"field":"authentication"
}
],
"optional":true,
"name":"test.mydb.oauth_code.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":true,
"field":"code"
},
{
"type":"bytes",
"optional":true,
"field":"authentication"
}
],
"optional":true,
"name":"test.mydb.oauth_code.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":true,
"field":"version"
},
{
"type":"string",
"optional":true,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"server_id"
},
{
"type":"int64",
"optional":false,
"field":"ts_sec"
},
{
"type":"string",
"optional":true,
"field":"gtid"
},
{
"type":"string",
"optional":false,
"field":"file"
},
{
"type":"int64",
"optional":false,
"field":"pos"
},
{
"type":"int32",
"optional":false,
"field":"row"
},
{
"type":"boolean",
"optional":true,
"default":false,
"field":"snapshot"
},
{
"type":"int64",
"optional":true,
"field":"thread"
},
{
"type":"string",
"optional":true,
"field":"db"
},
{
"type":"string",
"optional":true,
"field":"table"
},
{
"type":"string",
"optional":true,
"field":"query"
}
],
"optional":false,
"name":"io.debezium.connector.mysql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
}
],
"optional":false,
"name":"test.mydb.oauth_code.Envelope"
},
"payload":{
"before":null,
"after":{
"code":"22",
"authentication":"MTIyMzMz"
},
"source":{
"version":"0.9.2.Final",
"connector":"mysql",
"name":"test",
"server_id":123,
"ts_sec":1581401679,
"gtid":null,
"file":"mysql-bin.000011",
"pos":4792,
"row":0,
"snapshot":false,
"thread":65,
"db":"mydb",
"table":"oauth_code",
"query":null
},
"op":"c",
"ts_ms":1581401679228
}
}
Sorry, the response is big, but I am including it in full for your convenience.
Now you can do whatever you want with your data: parse it, modify it, sink it to another database, etc.
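As a starting point for parsing, here is a minimal sketch in Python that reads one event as printed by the console consumer and reports what changed. It only looks at the payload part and ignores the schema; the function names and the OPS table are made up for this post. The bytes helper relies on the JSON converter writing bytes columns as base64 strings, which is what the authentication value above looks like.

```python
import base64
import json

# Debezium operation codes mapped to readable names.
OPS = {"c": "create", "u": "update", "d": "delete", "r": "read"}


def describe(event):
    """Return a short label such as 'update on mydb.mydb_organization'."""
    payload = event["payload"]
    source = payload["source"]
    op = OPS.get(payload["op"], payload["op"])
    return f'{op} on {source["db"]}.{source["table"]}'


def changed_fields(event):
    """Map each changed column to a (before, after) pair of values."""
    payload = event["payload"]
    before = payload.get("before") or {}  # null for inserts
    after = payload.get("after") or {}    # null for deletes
    return {
        key: (before.get(key), after.get(key))
        for key in sorted(set(before) | set(after))
        if before.get(key) != after.get(key)
    }


def decode_bytes(value):
    """Decode a bytes column that arrived as a base64 string."""
    return base64.b64decode(value)


def handle_line(line):
    """Parse one line of consumer output and return (label, changes)."""
    event = json.loads(line)
    return describe(event), changed_fields(event)
```

For the update event above, changed_fields reports only address and name_en, since the other columns are the same in before and after.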
Hope this will help!