Basic Usage of Debezium (Using MySQL as an Example)

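  • Original content from the GreatSQL community. Do not use it without authorization; to republish, contact the editor and credit the source.
  • GreatSQL is a Chinese-developed branch of MySQL and is used in the same way as MySQL.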

I. Introduction to Debezium

From the official website:

Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.

Put simply, Debezium captures every row-level change in a database and publishes the changes as an ordered stream of events.

II. Basic Usage

The following uses MySQL as an example to show basic Debezium usage.

1. Preparing MySQL

  1. Create a MySQL user with the required privileges, for example:
CREATE USER 'dbz'@'%' IDENTIFIED BY 'dbzpwd';

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbz'@'%';
  2. Check whether binary logging (log-bin) is enabled:
SELECT variable_value AS "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin';

-- If this fails with "The 'INFORMATION_SCHEMA.GLOBAL_VARIABLES' feature is disabled...",
-- run the following statement first, then repeat the query: set global show_compatibility_56=on;
-- On MySQL 8.0, where this table no longer exists, SHOW VARIABLES LIKE 'log_bin'; returns the same information.

If the value is OFF, edit the MySQL configuration file along these lines:

server-id         = 223344      # required
log_bin           = mysql-bin   # base name of the binlog file sequence
binlog_format     = ROW         # must be ROW
binlog_row_image  = FULL        # must be FULL
expire_logs_days  = 10          # adjust to your environment
  3. Prepare the database and table:
create database inventory;
create table inventory.a (id bigint primary key auto_increment, name varchar(32));
insert into inventory.a values (null, 'n1'),(null, 'n2'),(null, 'n3');
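
The binlog settings from step 2 can be sanity-checked before Debezium is started. The following is a minimal sketch, not part of Debezium or MySQL: `BinlogConfigCheck` and its methods are hypothetical names, the input is assumed to be plain `key = value` text with optional `#` comments, and only the four settings this article marks as required are checked. It is deliberately strict, so a missing key is reported even where the server default might already be suitable.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper for illustration; not part of Debezium or MySQL.
final class BinlogConfigCheck {

    // Parses "key = value  # comment" lines into a map. Keys are lower-cased
    // and '-' is normalized to '_' because MySQL accepts both spellings.
    static Map<String, String> parse(String text) {
        Map<String, String> settings = new LinkedHashMap<>();
        for (String line : text.split("\\R")) {
            int hash = line.indexOf('#');
            String body = (hash >= 0 ? line.substring(0, hash) : line).trim();
            int eq = body.indexOf('=');
            if (eq <= 0) {
                continue; // blank line, comment, or section header such as [mysqld]
            }
            String key = body.substring(0, eq).trim()
                    .toLowerCase(Locale.ROOT).replace('-', '_');
            settings.put(key, body.substring(eq + 1).trim());
        }
        return settings;
    }

    // Returns a list of problems; an empty list means the settings look usable.
    static List<String> problems(Map<String, String> s) {
        List<String> out = new ArrayList<>();
        if (!s.containsKey("server_id")) out.add("server-id is missing");
        if (!s.containsKey("log_bin")) out.add("log_bin is missing");
        if (!"ROW".equalsIgnoreCase(s.get("binlog_format"))) out.add("binlog_format must be ROW");
        if (!"FULL".equalsIgnoreCase(s.get("binlog_row_image"))) out.add("binlog_row_image must be FULL");
        return out;
    }

    public static void main(String[] args) {
        String cnf = String.join("\n",
                "server-id         = 223344   # required",
                "log_bin           = mysql-bin",
                "binlog_format     = ROW",
                "binlog_row_image  = FULL",
                "expire_logs_days  = 10");
        System.out.println(problems(parse(cnf))); // prints []
    }
}
```

The check only reads text; the authoritative values are the ones the running server reports.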

2. Writing the Program

2.1. Project dependencies (Maven)

pom.xml

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${version.debezium}</version>
</dependency>

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${version.debezium}</version>
</dependency>

At the time of writing, the latest stable Debezium release is 1.9.5.Final.

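2.2. Prepare the database and table

Skip this step if the database and table were already created in section 1; the statements below produce the same three rows with explicit ids.

create database inventory;
create table inventory.a (id bigint primary key, name varchar(32));
insert into inventory.a values (1, 'n1'),(2, 'n2'),(3, 'n3');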

2.3. Writing the code

package com.greatdb.dbzdemo;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;

/**
 * @author wang.jianwen
 * @version 1.0
 * @date 2022/07/29
 */
public class DebeziumTest {

    private static DebeziumEngine<ChangeEvent<String, String>> engine;

    public static void main(String[] args) throws Exception {
        final Properties props = new Properties();
        props.setProperty("name", "dbz-engine");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");

        // offset config begin - store the processed binlog offsets in a file
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", "/tmp/dbz/storage/mysql_offsets.dat");
        props.setProperty("offset.flush.interval.ms", "0");
        // offset config end

        props.setProperty("database.server.name", "mysql-connector");
        props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
        props.setProperty("database.history.file.filename", "/tmp/dbz/storage/mysql_dbhistory.txt");

        props.setProperty("database.server.id", "122112"); // must differ from the MySQL server-id
        props.setProperty("database.hostname", "tmg"); // replace with your MySQL host
        props.setProperty("database.port", "3306");
        props.setProperty("database.user", "mysqluser"); // replace with your user, e.g. the dbz account from section 1
        props.setProperty("database.password", "mysqlpw");
        props.setProperty("database.include.list", "inventory"); // database(s) to capture
        props.setProperty("table.include.list", "inventory.a"); // table(s) to capture

        props.setProperty("snapshot.mode", "initial"); // full snapshot first, then incremental changes

        // Create the Debezium engine from the configuration above; events are emitted as JSON strings
        engine = DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(record -> {
                    System.out.println(record); // print to the console
                })
                .using((success, message, error) -> {
                    if (error != null) {
                        // error callback
                        System.out.println("------------error, message:" + message + " exception:" + error);
                    }
                    closeEngine(engine);
                })
                .build();

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
        addShutdownHook(engine);
        awaitTermination(executor);

        System.out.println("------------main finished.");
    }

    private static void closeEngine(DebeziumEngine<ChangeEvent<String, String>> engine) {
        try {
            engine.close();
        } catch (IOException ignored) {
        }
    }

    private static void addShutdownHook(DebeziumEngine<ChangeEvent<String, String>> engine) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> closeEngine(engine)));
    }

    private static void awaitTermination(ExecutorService executor) {
        if (executor != null) {
            try {
                executor.shutdown();
                while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
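
The run, close, and await sequence used above can be tried without a database. The sketch below is an illustration under stated assumptions: `FakeEngine` is a hypothetical stand-in that, like `DebeziumEngine`, is both `Runnable` and `Closeable`, but it emits no change events, and `runThenClose` is a made-up helper name. It only shows why `awaitTermination` returns once `close()` has been called.

```java
import java.io.Closeable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for the engine lifecycle only; FakeEngine is hypothetical and
// produces nothing resembling real change events.
final class EngineLifecycleDemo {

    static final class FakeEngine implements Runnable, Closeable {
        private final CountDownLatch stop = new CountDownLatch(1);
        final AtomicInteger polls = new AtomicInteger();

        @Override
        public void run() {
            try {
                // Keep "polling" until close() is called from another thread.
                while (!stop.await(10, TimeUnit.MILLISECONDS)) {
                    polls.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public void close() {
            stop.countDown();
        }
    }

    // Runs the engine on its own thread, closes it after runMillis,
    // and reports whether the executor terminated within five seconds.
    static boolean runThenClose(FakeEngine engine, long runMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
        executor.shutdown(); // accept no new tasks; the running engine continues
        try {
            Thread.sleep(runMillis);
            engine.close();
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        FakeEngine engine = new FakeEngine();
        System.out.println("terminated=" + runThenClose(engine, 100));
    }
}
```

In the article's program, the JVM shutdown hook plays the role of the explicit `engine.close()` call here.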

3. Testing

Once the program is running, the console prints:

... (omitted)
EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":1}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":1,"name":"n1"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005186,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005191,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=1,name=n1},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005186,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005191}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":2}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"n2"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005195,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=2,name=n2},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005195,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":3}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":3,"name":"n3"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005196,"snapshot":"last","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=3,name=n3},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005196,snapshot=last,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
... (omitted)

The initial snapshot of all existing rows has been emitted. The key parts are:

..."payload":{"before":null,"after":{"id":1,"name":"n1"}..."op":"r"...
..."payload":{"before":null,"after":{"id":2,"name":"n2"}..."op":"r"...
..."payload":{"before":null,"after":{"id":3,"name":"n3"}..."op":"r"...
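
The `op` field is what separates snapshot reads from later changes. As a rough sketch, the helper below pulls the code out of an event value and translates it; `OpCodes`, `extractOp`, and `describe` are hypothetical names, and the regular expression is an assumption made to keep the example free of dependencies. A JSON parser would be more robust in practice, since a captured table with its own column named `op` could confuse this pattern.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of the Debezium API.
final class OpCodes {

    // Matches the payload's "op":"x" pair. The schema section only contains
    // "field":"op", which this pattern does not match.
    private static final Pattern OP = Pattern.compile("\"op\":\"([a-z])\"");

    // Returns the one-letter op code, or null if none is found.
    static String extractOp(String eventValueJson) {
        Matcher m = OP.matcher(eventValueJson);
        return m.find() ? m.group(1) : null;
    }

    // Translates the op codes that appear in this article.
    static String describe(String op) {
        if (op == null) {
            return "unknown";
        }
        switch (op) {
            case "r": return "read (snapshot)";
            case "c": return "create (insert)";
            case "u": return "update";
            case "d": return "delete";
            default:  return "unknown";
        }
    }

    public static void main(String[] args) {
        String value = "{\"payload\":{\"before\":null,"
                + "\"after\":{\"id\":1,\"name\":\"n1\"},\"op\":\"r\"}}";
        System.out.println(describe(extractOp(value))); // prints "read (snapshot)"
    }
}
```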
  • Next, insert a row:
insert into inventory.a values (4, 'n4');

Console output:

..."payload":{"before":null,"after":{"id":4,"name":"n4"}..."op":"c"...
  • Update a row:
update inventory.a set name = 'n4-upd' where id = 4;

Console output:

..."payload":{"before":{"id":4,"name":"n4"},"after":{"id":4,"name":"n4-upd"}..."op":"u"...
  • Delete a row:
delete from inventory.a where id = 1;

Console output:

..."payload":{"before":{"id":1,"name":"n1"},"after":null..."op":"d"...
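
Because the events arrive in the order the changes occurred, a consumer can replay them to rebuild the table's current state. The sketch below does this for the six events shown in this section. It is an assumption-laden simplification: `ChangeReplay` and `Change` are hypothetical names, and each event is reduced to its `op`, the row id, and the `name` value after the change, whereas real events carry the full `before` and `after` rows.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model for illustration; real events carry full before/after rows.
final class ChangeReplay {

    static final class Change {
        final String op;    // "r", "c", "u" or "d"
        final long id;      // primary key of the affected row
        final String name;  // value of "name" after the change; null for deletes

        Change(String op, long id, String name) {
            this.op = op;
            this.id = id;
            this.name = name;
        }
    }

    // Applies the events in order and returns the resulting id -> name table.
    static Map<Long, String> replay(List<Change> events) {
        Map<Long, String> table = new TreeMap<>();
        for (Change e : events) {
            switch (e.op) {
                case "r": // snapshot read
                case "c": // insert
                case "u": // update
                    table.put(e.id, e.name);
                    break;
                case "d": // delete
                    table.remove(e.id);
                    break;
                default:
                    throw new IllegalArgumentException("unexpected op: " + e.op);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> events = Arrays.asList(
                new Change("r", 1, "n1"),
                new Change("r", 2, "n2"),
                new Change("r", 3, "n3"),
                new Change("c", 4, "n4"),
                new Change("u", 4, "n4-upd"),
                new Change("d", 1, null));
        System.out.println(replay(events)); // prints {2=n2, 3=n3, 4=n4-upd}
    }
}
```

Applying the same events in a different order could give a different result, which is why the ordering guarantee matters to consumers.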

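III. Summary

This article used MySQL as an example to walk through basic Debezium usage in code. After common insert, update, and delete operations on MySQL data, Debezium captured each row change, recorded the row's data before and after the change, and exposed the changes as an event stream that external consumers can read and process.

Reference: https://debezium.io/documentation/reference/1.8/index.html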


Source: https://www.cnblogs.com/greatsql/p/16607170.html