各位观众老爷,大家好!今天咱们聊点硬核的,关于 MySQL 和 Kafka 勾搭成奸…哦不,是强强联合的秘密武器:Debezium!
开场白:数据这厮,太不老实了!
在互联网世界里,数据就是金钱,谁掌握了数据,谁就掌握了财富密码(误)。但是,数据这玩意儿它不老实啊,它总是在不停地变化,就像你女朋友的心情一样。
特别是对于 MySQL 这种关系型数据库来说,数据变更简直是家常便饭。增删改查,一刻不停。那问题来了,其他系统怎么实时感知到这些变化呢?难道要不停地轮询 MySQL,问它:“你变了吗?你变了吗?” 这也太傻了吧!不仅浪费资源,还延迟贼高。
所以,我们需要一种更优雅的方式,让 MySQL 主动告诉我们:“老子变了!快来看!” 这就是 CDC (Change Data Capture) 的魅力所在。
第一幕:Binlog 的秘密花园
要实现 CDC,首先得找到数据变化的源头。在 MySQL 里面,这个源头就是 Binlog (Binary Log)。
Binlog 记录了所有对 MySQL 数据库的修改操作,包括 INSERT、UPDATE、DELETE 等等。它就像一个日记本,详细记录了数据库发生的每一件事。
想要开启 Binlog,需要在 MySQL 的配置文件 (my.cnf 或 my.ini) 中进行设置,比如:
[mysqld]
log_bin = mysql-bin
binlog_format = ROW
server_id = 1
log_bin = mysql-bin
: 启用 Binlog,并指定 Binlog 文件的前缀为mysql-bin
。binlog_format = ROW
: 指定 Binlog 的格式为ROW
。这个格式会记录每一行数据的变化,更适合 CDC。还有另外两种格式STATEMENT
和MIXED
,STATEMENT
记录的是SQL语句,MIXED
则是两种的混合. 优先推荐ROW
格式。server_id = 1
: 指定 MySQL 服务器的唯一 ID。在集群环境中,每个服务器的 ID 必须不同。
重启 MySQL 服务后,Binlog 就开始工作了。你可以通过 SHOW BINARY LOGS;
命令查看当前的 Binlog 文件列表。
第二幕:Debezium 闪亮登场
有了 Binlog,接下来就是 Debezium 的表演时间了!Debezium 是一个开源的 CDC 平台,它能够实时地捕获数据库的变化,并将这些变化以事件的形式发送到 Kafka 中。
Debezium 的核心组件是 Connector,它负责连接到数据库,读取 Binlog,并将数据变化转换为 Kafka 事件。Debezium 支持多种数据库,包括 MySQL、PostgreSQL、MongoDB 等等。
咱们今天的主角是 MySQL Connector,它能够读取 MySQL 的 Binlog,并将数据变化转换为 Kafka 事件。
第三幕:Debezium 的安装与配置
首先,你需要下载 Debezium 的 MySQL Connector 的插件。可以从 Debezium 官网下载,也可以使用 Maven 或 Gradle 来管理依赖。
其次,你需要将 Connector 插件安装到 Kafka Connect 中。Kafka Connect 是 Kafka 提供的一个用于连接外部系统的框架。
假设你已经安装了 Kafka 和 Kafka Connect,并且 Debezium Connector 插件已经安装到 Kafka Connect 的插件目录中。接下来,你需要创建一个 Connector 配置,告诉 Kafka Connect 如何连接到 MySQL 数据库,以及如何处理数据变化。
这是一个 Connector 配置的例子 (JSON 格式):
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "your_mysql_host",
"database.port": "3306",
"database.user": "your_mysql_user",
"database.password": "your_mysql_password",
"database.server.id": "85744",
"database.server.name": "inventory",
"database.include.list": "inventory",
"table.include.list": "inventory.products",
"database.history.kafka.bootstrap.servers": "your_kafka_brokers",
"database.history.kafka.topic": "schema-changes.inventory",
"include.schema.changes": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false"
}
}
让我们来解读一下这个配置:
connector.class
: 指定 Connector 的类名,这里是io.debezium.connector.mysql.MySqlConnector
,表示使用 MySQL Connector。tasks.max
: 指定 Connector 的最大任务数,这里是1
,表示只使用一个任务。database.hostname
: 指定 MySQL 服务器的主机名。database.port
: 指定 MySQL 服务器的端口号。database.user
: 指定连接 MySQL 服务器的用户名。database.password
: 指定连接 MySQL 服务器的密码。database.server.id
: 指定 MySQL 服务器的唯一 ID,必须与 MySQL 配置文件中的server_id
保持一致。database.server.name
: 指定 MySQL 服务器的逻辑名称,这个名称会作为 Kafka Topic 的前缀。database.include.list
: 指定需要捕获的数据库列表,这里是inventory
,表示只捕获inventory
数据库的变化。table.include.list
: 指定需要捕获的表列表,这里是inventory.products
,表示只捕获inventory
数据库的products
表的变化。database.history.kafka.bootstrap.servers
: 指定 Kafka Broker 的地址。database.history.kafka.topic
: 指定用于存储数据库 Schema 历史信息的 Kafka Topic。include.schema.changes
: 指定是否将 Schema 变化也发送到 Kafka 中。key.converter
和value.converter
: 指定 Kafka Key 和 Value 的序列化和反序列化方式。这里使用JsonConverter
,表示使用 JSON 格式。key.converter.schemas.enable
和value.converter.schemas.enable
: 指定是否在 JSON 中包含 Schema 信息。
将这个配置文件保存为 inventory-connector.json
,然后使用 Kafka Connect 的 REST API 创建 Connector:
curl -X POST -H "Content-Type: application/json"
-d @inventory-connector.json
http://your_kafka_connect_host:8083/connectors
如果一切顺利,Kafka Connect 就会启动 Debezium MySQL Connector,开始监听 MySQL 的 Binlog,并将数据变化发送到 Kafka 中。
第四幕:Kafka 事件的结构
Debezium 发送到 Kafka 中的事件是 JSON 格式的,它包含以下几个部分:
schema
: 描述事件的 Schema 信息。payload
: 包含实际的数据变化。
对于 INSERT 事件,payload
包含 before
和 after
两个字段。before
字段为空,after
字段包含插入后的数据。
对于 UPDATE 事件,payload
也包含 before
和 after
两个字段。before
字段包含更新前的数据,after
字段包含更新后的数据。
对于 DELETE 事件,payload
包含 before
和 after
两个字段。before
字段包含删除前的数据,after
字段为空。
这是一个 INSERT 事件的例子:
{
"schema": {
"type": "struct",
"fields": [
// ... Schema 信息 ...
],
"optional": false,
"name": "inventory.inventory.products.Envelope"
},
"payload": {
"before": null,
"after": {
"id": 101,
"name": "Amazing Product",
"description": "This is an amazing product.",
"weight": 1.23
},
"source": {
"version": "1.9.7.Final",
"connector": "mysql",
"name": "inventory",
"ts_ms": 1678886400000,
"snapshot": "false",
"db": "inventory",
"sequence": null,
"table": "products",
"server_id": 85744,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 154
},
"op": "c",
"ts_ms": 1678886400000,
"transaction": null
}
}
before
:null
,因为这是一个 INSERT 事件。after
: 包含插入后的数据,包括id
、name
、description
和weight
字段。source
: 包含事件的来源信息,包括 Debezium 版本、Connector 类型、数据库名称、表名称等等。op
: 表示操作类型,c
表示 CREATE (INSERT)。ts_ms
: 表示事件发生的时间戳。
第五幕:事件驱动架构的威力
有了 Kafka 中的数据变化事件,我们就可以构建事件驱动架构 (Event-Driven Architecture, EDA) 了。
EDA 是一种软件架构模式,它基于事件的发布和订阅来实现系统之间的解耦。在 EDA 中,系统组件通过发布事件来通知其他组件发生了什么事情,而其他组件可以订阅这些事件,并根据事件的内容做出相应的处理。
使用 Debezium 和 Kafka 构建 EDA 的流程如下:
- MySQL 数据库发生数据变化。
- Debezium MySQL Connector 捕获这些变化,并将它们转换为 Kafka 事件。
- Kafka 将这些事件存储在 Kafka Topic 中。
- 其他系统组件订阅 Kafka Topic,接收数据变化事件。
- 系统组件根据事件的内容做出相应的处理,例如更新缓存、发送通知、触发业务逻辑等等。
这种架构的优点是:
- 实时性: 数据变化可以实时地传播到其他系统组件。
- 解耦性: 系统组件之间通过事件进行通信,不需要直接依赖彼此。
- 可扩展性: 可以很容易地添加或删除系统组件,而不会影响其他组件的运行。
第六幕:代码实战:消费 Kafka 事件
为了更好地理解如何使用 Kafka 事件,咱们来写一个简单的 Java 程序,消费 Kafka Topic 中的事件,并将它们打印到控制台上。
首先,你需要添加 Kafka Consumer 的依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.1</version>
</dependency>
然后,你可以编写以下代码:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerDemo {
public static void main(String[] args) {
// 1. 配置 Kafka Consumer
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your_kafka_brokers");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 2. 创建 Kafka Consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 3. 订阅 Kafka Topic
consumer.subscribe(Collections.singletonList("inventory.inventory.products"));
// 4. 循环消费 Kafka 事件
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: " + record.value());
}
}
}
}
这段代码做了以下几件事:
- 配置 Kafka Consumer,包括 Kafka Broker 的地址、Consumer Group 的 ID、Key 和 Value 的反序列化方式等等。
- 创建 Kafka Consumer。
- 订阅 Kafka Topic,这里订阅的是
inventory.inventory.products
Topic,它包含了inventory
数据库的products
表的数据变化事件。 - 循环消费 Kafka 事件,并将每个事件的内容打印到控制台上。
运行这段代码,你就可以看到 Kafka Topic 中的数据变化事件了。
第七幕:Debezium 的高级特性
Debezium 除了基本的 CDC 功能之外,还提供了一些高级特性,例如:
- Schema Evolution: 自动处理数据库 Schema 的变化。当数据库的 Schema 发生变化时,Debezium 会自动更新 Kafka 事件的 Schema,并通知消费者。
- Snapshotting: 在 Connector 启动时,可以创建一个数据库的快照,并将快照数据发送到 Kafka 中。这可以用于初始化下游系统的数据。
- Filtering and Transformation: 可以对数据变化事件进行过滤和转换,例如只捕获某些特定的列,或者将数据转换为不同的格式。
- Heartbeat Monitoring: 可以定期发送心跳消息到 Kafka 中,用于监控 Connector 的运行状态。
第八幕:总结与展望
今天咱们一起学习了 MySQL 和 Kafka 的 CDC 技术,以及 Debezium 的使用。Debezium 是一个强大的 CDC 平台,它可以帮助你实时地捕获数据库的变化,并将这些变化以事件的形式发送到 Kafka 中。
使用 Debezium 和 Kafka 构建事件驱动架构,可以实现系统之间的解耦,提高系统的实时性和可扩展性。
当然,Debezium 还有很多其他的特性和用法,等待你去探索和发现。希望今天的讲座能够帮助你入门 CDC 技术,并在实际项目中应用它。
表格总结:Debezium 配置参数
参数名 | 描述 |
---|---|
connector.class |
指定 Connector 的类名 |
tasks.max |
指定 Connector 的最大任务数 |
database.hostname |
指定 MySQL 服务器的主机名 |
database.port |
指定 MySQL 服务器的端口号 |
database.user |
指定连接 MySQL 服务器的用户名 |
database.password |
指定连接 MySQL 服务器的密码 |
database.server.id |
指定 MySQL 服务器的唯一 ID |
database.server.name |
指定 MySQL 服务器的逻辑名称,会作为 Kafka Topic 的前缀 |
database.include.list |
指定需要捕获的数据库列表 |
table.include.list |
指定需要捕获的表列表 |
database.history.kafka.bootstrap.servers |
指定 Kafka Broker 的地址 |
database.history.kafka.topic |
指定用于存储数据库 Schema 历史信息的 Kafka Topic |
include.schema.changes |
指定是否将 Schema 变化也发送到 Kafka 中 |
key.converter |
指定 Kafka Key 的序列化和反序列化方式 |
value.converter |
指定 Kafka Value 的序列化和反序列化方式 |
key.converter.schemas.enable |
指定是否在 Kafka Key 的 JSON 中包含 Schema 信息 |
value.converter.schemas.enable |
指定是否在 Kafka Value 的 JSON 中包含 Schema 信息 |
好啦,今天的讲座就到这里,希望大家有所收获!下次再见!