MySQL Binlog 事件流:数据变更订阅与消息队列推送
各位同学,大家好!今天我们来聊聊MySQL Binlog,以及如何利用Binlog事件流实现数据变更订阅并将其推送到消息队列,从而构建实时性更高、响应更快的系统。
Binlog,全称Binary Log,是MySQL数据库用于记录所有更改数据的语句的二进制文件。它主要用于数据库的复制、恢复和审计。但今天,我们关注的是它作为数据变更事件源的能力。通过解析Binlog,我们可以捕获数据库的每一次增、删、改操作,并将其转化为事件,进而进行实时处理。
一、Binlog 的基本概念与配置
在深入代码之前,我们需要了解一些 Binlog 的基本概念,并进行必要的配置。
- Binlog 的作用:
- 数据复制: MySQL 主从复制依赖 Binlog。
- 数据恢复: 通过 Binlog 可以进行时间点恢复。
- 审计: 记录数据库的所有变更操作。
- Binlog 的格式:
- Statement: 记录SQL语句。 缺点:某些语句(如包含
UUID()
、NOW()
等函数的语句)在主从服务器上执行结果可能不一致。 - Row: 记录每一行数据的变更。 优点:数据一致性高。 缺点:日志量大。
- Mixed: 混合模式,MySQL会根据语句选择Statement或Row模式。
- Statement: 记录SQL语句。 缺点:某些语句(如包含
- Binlog 的配置:
要启用 Binlog,需要在 MySQL 的配置文件(通常是 my.cnf
或 my.ini
)中进行如下配置:
[mysqld]
log-bin=mysql-bin # 启用 Binlog,并指定 Binlog 文件的前缀
binlog_format=ROW # 设置 Binlog 的格式为 Row
server_id=1 # 设置服务器的唯一 ID,主从服务器 ID 不能重复
#expire_logs_days=7 # 设置 Binlog 的过期时间(可选)
#binlog_do_db=your_database_name # 只记录指定数据库的 Binlog(可选)
配置完成后,需要重启 MySQL 服务才能生效。
- 查看 Binlog 配置:
可以通过以下 SQL 语句查看 Binlog 的配置:
SHOW VARIABLES LIKE 'log_bin%';
SHOW VARIABLES LIKE 'binlog_format%';
SHOW VARIABLES LIKE 'server_id%';
二、Binlog 事件流的读取与解析
接下来,我们需要读取并解析 Binlog 事件流。这里我们使用 Java 语言,并借助 Canal 开源组件来实现。Canal 是阿里巴巴开源的基于数据库增量日志解析的服务,它可以模拟 MySQL slave 的交互协议,伪装成 MySQL slave,向 MySQL master 请求 dump 协议,然后解析 Binlog。
- 引入 Canal 依赖:
在 Maven 项目中,添加 Canal 的客户端依赖:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.6</version> <!-- 使用最新版本 -->
</dependency>
- 编写 Canal 客户端代码:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
public class CanalClient {
public static void main(String[] args) {
// Canal Server 地址
String address = "127.0.0.1";
int port = 11111; //Canal Server监听端口
String destination = "example"; // Canal Server 配置的 destination
// 创建 Canal 连接器
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(address, port), destination, "", "");
try {
// 连接 Canal Server
connector.connect();
// 订阅所有数据库和表
connector.subscribe(".*\..*"); //正则表达式,订阅所有库的所有表
// 回滚到未进行 ack 的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
connector.rollback();
while (true) {
// 获取指定数量的 entries,batchSize 代表批量处理的条数
Message message = connector.getWithoutAck(100);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// 处理中断异常
}
} else {
printEntry(message.getEntries());
}
// 提交确认,消费成功后调用
connector.ack(batchId);
}
} finally {
connector.disconnect();
}
}
private static void printEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChange = null;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
CanalEntry.EventType eventType = rowChange.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else { // UPDATE
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
- Canal Server 的配置:
Canal Client 需要连接 Canal Server 才能获取 Binlog 事件。需要下载 Canal Server 的安装包,并进行配置。 Canal Server 的配置文件通常位于 conf/example/canal.properties
。 需要修改以下配置:
canal.instance.master.address=127.0.0.1:3306 # MySQL 地址
canal.instance.dbUsername=canal # MySQL 用户名
canal.instance.dbPassword=canal # MySQL 密码
canal.instance.connectionCharset=UTF-8
canal.instance.tsdb.enable=false # 是否启用时间序列数据库,这里设置为 false
还需要在 conf/example/instance.properties
中配置需要监听的数据库和表:
canal.instance.filter.regex=your_database_name\..* # 监听的数据库和表,使用正则表达式
重要提示:
- 确保 Canal Server 能够连接到 MySQL 数据库。
- 在 MySQL 中创建 Canal Server 连接所需的账户并赋予相应的权限。通常需要
REPLICATION SLAVE
和REPLICATION CLIENT
权限。 - Canal Server 的运行需要 Zookeeper 的支持,确保 Zookeeper 正常运行。
- 运行 Canal Client 和 Server:
先启动 Canal Server,然后再运行 Canal Client。 Canal Client 会连接到 Canal Server,并开始接收 Binlog 事件。
三、数据变更事件的解析与转换
Canal Client 接收到的是 CanalEntry 格式的事件,我们需要将其解析成更易于处理的数据结构,例如 JSON。
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.protocol.CanalEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class EventParser {
public static String parse(CanalEntry.Entry entry) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
return null;
}
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
CanalEntry.EventType eventType = rowChange.getEventType();
String databaseName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
return buildJson(databaseName, tableName, eventType.toString(), convertColumns(rowData.getBeforeColumnsList()));
} else if (eventType == CanalEntry.EventType.INSERT) {
return buildJson(databaseName, tableName, eventType.toString(), convertColumns(rowData.getAfterColumnsList()));
} else if (eventType == CanalEntry.EventType.UPDATE) {
Map<String, Object> before = convertColumns(rowData.getBeforeColumnsList());
Map<String, Object> after = convertColumns(rowData.getAfterColumnsList());
Map<String, Object> data = new HashMap<>();
data.put("before", before);
data.put("after", after);
return buildJson(databaseName, tableName, eventType.toString(), data);
}
}
return null;
}
private static Map<String, Object> convertColumns(List<CanalEntry.Column> columns) {
Map<String, Object> data = new HashMap<>();
for (CanalEntry.Column column : columns) {
data.put(column.getName(), column.getValue());
}
return data;
}
private static String buildJson(String databaseName, String tableName, String eventType, Object data) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("database", databaseName);
jsonObject.put("table", tableName);
jsonObject.put("eventType", eventType);
jsonObject.put("data", data);
return jsonObject.toJSONString();
}
}
在 Canal Client 的 printEntry
方法中,调用 EventParser.parse(entry)
方法,将 CanalEntry 转换为 JSON 字符串。
四、将事件推送到消息队列
现在,我们已经将 Binlog 事件解析成了 JSON 格式,接下来需要将这些事件推送到消息队列。这里我们以 Kafka 为例。
- 引入 Kafka 依赖:
在 Maven 项目中,添加 Kafka 的客户端依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version> <!-- 使用最新版本 -->
</dependency>
- 编写 Kafka 生产者代码:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaProducerService {
private final String topic;
private final KafkaProducer<String, String> producer;
public KafkaProducerService(String topic, String bootstrapServers) {
this.topic = topic;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 可选配置,提高吞吐量
props.put(ProducerConfig.ACKS_CONFIG, "1"); // 允许少量数据丢失,性能更高
props.put(ProducerConfig.LINGER_MS_CONFIG, "20"); // 批量发送,提高吞吐量
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384 * 4); // 16KB batch size
this.producer = new KafkaProducer<>(props);
}
public void sendMessage(String message) throws ExecutionException, InterruptedException {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
// 异步发送消息
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("发送消息失败: " + exception.getMessage());
} else {
System.out.println("发送消息成功,Offset: " + metadata.offset());
}
});
//producer.flush(); //确保所有消息发送到Kafka,性能差,生产环境不建议使用。
}
public void close() {
producer.close();
}
}
- 在 Canal Client 中使用 Kafka 生产者:
在 Canal Client 的 printEntry
方法中,获取到 JSON 格式的事件后,将其发送到 Kafka 消息队列。
// ... (其他代码)
String eventJson = EventParser.parse(entry);
if (eventJson != null) {
try {
kafkaProducer.sendMessage(eventJson);
} catch (Exception e) {
System.err.println("Failed to send message to Kafka: " + e.getMessage());
}
}
// ... (其他代码)
在使用 Kafka 生产者之前,需要先创建 KafkaProducerService 的实例,并配置 Kafka 的地址和 Topic 名称。
private static KafkaProducerService kafkaProducer;
public static void main(String[] args) {
// ... (其他代码)
String kafkaBootstrapServers = "127.0.0.1:9092"; // Kafka Broker 地址
String kafkaTopic = "mysql_binlog_events"; // Kafka Topic 名称
kafkaProducer = new KafkaProducerService(kafkaTopic, kafkaBootstrapServers);
// ... (其他代码)
}
最后,在程序结束时,关闭 Kafka 生产者。
finally {
connector.disconnect();
if(kafkaProducer != null){
kafkaProducer.close();
}
}
五、代码示例与逻辑解释
下面是一个完整的示例,展示了如何使用 Canal Client 读取 Binlog 事件,并将其推送到 Kafka 消息队列:
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.kafka.clients.producer.KafkaProducer;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.*;
import java.util.concurrent.ExecutionException;
public class Main {
private static KafkaProducerService kafkaProducer;
public static void main(String[] args) {
// Canal Server 地址
String address = "127.0.0.1";
int port = 11111;
String destination = "example";
String kafkaBootstrapServers = "127.0.0.1:9092";
String kafkaTopic = "mysql_binlog_events";
kafkaProducer = new KafkaProducerService(kafkaTopic, kafkaBootstrapServers);
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(address, port), destination, "", "");
try {
connector.connect();
connector.subscribe(".*\..*");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(100);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
printEntry(message.getEntries());
}
connector.ack(batchId);
}
} finally {
connector.disconnect();
if(kafkaProducer != null){
kafkaProducer.close();
}
}
}
private static void printEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChange = null;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
CanalEntry.EventType eventType = rowChange.getEventType();
String eventJson = EventParser.parse(entry);
if (eventJson != null) {
try {
kafkaProducer.sendMessage(eventJson);
System.out.println("Sent message to Kafka: " + eventJson);
} catch (Exception e) {
System.err.println("Failed to send message to Kafka: " + e.getMessage());
}
}
}
}
static class EventParser {
public static String parse(CanalEntry.Entry entry) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
return null;
}
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
CanalEntry.EventType eventType = rowChange.getEventType();
String databaseName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
return buildJson(databaseName, tableName, eventType.toString(), convertColumns(rowData.getBeforeColumnsList()));
} else if (eventType == CanalEntry.EventType.INSERT) {
return buildJson(databaseName, tableName, eventType.toString(), convertColumns(rowData.getAfterColumnsList()));
} else if (eventType == CanalEntry.EventType.UPDATE) {
Map<String, Object> before = convertColumns(rowData.getBeforeColumnsList());
Map<String, Object> after = convertColumns(rowData.getAfterColumnsList());
Map<String, Object> data = new HashMap<>();
data.put("before", before);
data.put("after", after);
return buildJson(databaseName, tableName, eventType.toString(), data);
}
}
return null;
}
private static Map<String, Object> convertColumns(List<CanalEntry.Column> columns) {
Map<String, Object> data = new HashMap<>();
for (CanalEntry.Column column : columns) {
data.put(column.getName(), column.getValue());
}
return data;
}
private static String buildJson(String databaseName, String tableName, String eventType, Object data) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("database", databaseName);
jsonObject.put("table", tableName);
jsonObject.put("eventType", eventType);
jsonObject.put("data", data);
return jsonObject.toJSONString();
}
}
static class KafkaProducerService {
private final String topic;
private final KafkaProducer<String, String> producer;
public KafkaProducerService(String topic, String bootstrapServers) {
this.topic = topic;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.LINGER_MS_CONFIG, "20");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384 * 4);
this.producer = new KafkaProducer<>(props);
}
public void sendMessage(String message) throws ExecutionException, InterruptedException {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("发送消息失败: " + exception.getMessage());
} else {
System.out.println("发送消息成功,Offset: " + metadata.offset());
}
});
}
public void close() {
producer.close();
}
}
}
逻辑解释:
- Canal Client 连接 Canal Server: 通过 CanalConnector 连接到 Canal Server,并订阅指定的数据库和表。
- 接收 Binlog 事件: Canal Client 循环接收 Binlog 事件,并进行处理。
- 解析 Binlog 事件: 使用 EventParser 将 CanalEntry 格式的事件解析成 JSON 格式。
- 推送消息到 Kafka: 使用 KafkaProducerService 将 JSON 格式的事件推送到 Kafka 消息队列。
- 异常处理: 对可能发生的异常进行捕获和处理,例如连接失败、解析错误、发送失败等。
- 资源释放: 在程序结束时,关闭 Canal 连接和 Kafka 生产者,释放资源。
六、可能遇到的问题与解决方案
- Canal Server 连接失败: 检查 Canal Server 的配置是否正确,例如 MySQL 地址、用户名、密码等。 确保 MySQL 已经开启 Binlog,并且 Canal Server 拥有足够的权限。
- Canal Client 无法接收到 Binlog 事件: 检查 Canal Client 订阅的数据库和表是否正确。 检查 MySQL 的 Binlog 是否有更新。
- Kafka 消息发送失败: 检查 Kafka Broker 的地址是否正确。 检查 Kafka Topic 是否存在。 检查 Kafka 生产者是否有足够的权限。
- 数据丢失: 调整 Kafka 的
acks
参数,以提高数据可靠性。 使用 Canal 的rollback
和ack
机制,确保消息被正确处理。 - 性能问题: 增加 Canal Client 的批量处理大小。 调整 Kafka 的生产者参数,例如
linger.ms
和batch.size
,以提高吞吐量。
七、更进一步:数据一致性、容错与监控
- 数据一致性: Canal 提供了多种一致性保证级别,可以根据实际需求进行选择。
- 容错: 可以使用 Canal 集群,提高 Canal 服务的可用性。 可以使用 Kafka 的副本机制,提高 Kafka 消息队列的可靠性。
- 监控: 监控 Canal Server 和 Kafka Broker 的运行状态,及时发现并解决问题。 可以使用 Prometheus 和 Grafana 等工具进行监控。
八、总结
今天我们学习了如何利用MySQL Binlog事件流实现数据变更订阅并将其推送到消息队列。 主要步骤包括:配置Binlog、使用Canal读取和解析Binlog事件、将事件转换为JSON格式、使用Kafka将事件推送到消息队列。 这将允许构建响应迅速,高度可扩展的实时数据管道,为各种用例(如缓存更新、搜索索引、数据分析等)提供动力。 这种架构可以构建更实时、更具响应性的应用程序。