MySQL的Binlog事件:如何利用binlog事件流实现数据变更订阅并推送到消息队列?

MySQL Binlog 事件流:数据变更订阅与消息队列推送

各位同学,大家好!今天我们来聊聊MySQL Binlog,以及如何利用Binlog事件流实现数据变更订阅并将其推送到消息队列,从而构建实时性更高、响应更快的系统。

Binlog,全称Binary Log,是MySQL数据库用于记录所有更改数据的语句的二进制文件。它主要用于数据库的复制、恢复和审计。但今天,我们关注的是它作为数据变更事件源的能力。通过解析Binlog,我们可以捕获数据库的每一次增、删、改操作,并将其转化为事件,进而进行实时处理。

一、Binlog 的基本概念与配置

在深入代码之前,我们需要了解一些 Binlog 的基本概念,并进行必要的配置。

  • Binlog 的作用:
    • 数据复制: MySQL 主从复制依赖 Binlog。
    • 数据恢复: 通过 Binlog 可以进行时间点恢复。
    • 审计: 记录数据库的所有变更操作。
  • Binlog 的格式:
    • Statement: 记录SQL语句。 缺点:某些语句(如包含UUID()NOW()等函数的语句)在主从服务器上执行结果可能不一致。
    • Row: 记录每一行数据的变更。 优点:数据一致性高。 缺点:日志量大。
    • Mixed: 混合模式,MySQL会根据语句选择Statement或Row模式。
  • Binlog 的配置:

要启用 Binlog,需要在 MySQL 的配置文件(通常是 my.cnfmy.ini)中进行如下配置:

[mysqld]
log-bin=mysql-bin  # 启用 Binlog,并指定 Binlog 文件的前缀
binlog_format=ROW   # 设置 Binlog 的格式为 Row
server_id=1         # 设置服务器的唯一 ID,主从服务器 ID 不能重复
#expire_logs_days=7  # 设置 Binlog 的过期时间(可选)
#binlog_do_db=your_database_name # 只记录指定数据库的 Binlog(可选)

配置完成后,需要重启 MySQL 服务才能生效。

  • 查看 Binlog 配置:

可以通过以下 SQL 语句查看 Binlog 的配置:

SHOW VARIABLES LIKE 'log_bin%';
SHOW VARIABLES LIKE 'binlog_format%';
SHOW VARIABLES LIKE 'server_id%';

二、Binlog 事件流的读取与解析

接下来,我们需要读取并解析 Binlog 事件流。这里我们使用 Java 语言,并借助 Canal 开源组件来实现。Canal 是阿里巴巴开源的基于数据库增量日志解析的服务,它可以模拟 MySQL slave 的交互协议,伪装成 MySQL slave,向 MySQL master 请求 dump 协议,然后解析 Binlog。

  1. 引入 Canal 依赖:

在 Maven 项目中,添加 Canal 的客户端依赖:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.6</version> <!-- 使用最新版本 -->
</dependency>
  1. 编写 Canal 客户端代码:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {

    public static void main(String[] args) {
        // Canal Server 地址
        String address = "127.0.0.1";
        int port = 11111;  //Canal Server监听端口
        String destination = "example"; // Canal Server 配置的 destination

        // 创建 Canal 连接器
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(address, port), destination, "", "");

        try {
            // 连接 Canal Server
            connector.connect();
            // 订阅所有数据库和表
            connector.subscribe(".*\..*"); //正则表达式,订阅所有库的所有表
            // 回滚到未进行 ack 的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();

            while (true) {
                // 获取指定数量的 entries,batchSize 代表批量处理的条数
                Message message = connector.getWithoutAck(100);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // 处理中断异常
                    }
                } else {
                    printEntry(message.getEntries());
                }

                // 提交确认,消费成功后调用
                connector.ack(batchId);
            }

        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else { // UPDATE
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}
  1. Canal Server 的配置:

Canal Client 需要连接 Canal Server 才能获取 Binlog 事件。需要下载 Canal Server 的安装包,并进行配置。 Canal Server 的配置文件通常位于 conf/example/canal.properties。 需要修改以下配置:

canal.instance.master.address=127.0.0.1:3306  # MySQL 地址
canal.instance.dbUsername=canal           # MySQL 用户名
canal.instance.dbPassword=canal           # MySQL 密码
canal.instance.connectionCharset=UTF-8
canal.instance.tsdb.enable=false  # 是否启用时间序列数据库,这里设置为 false

还需要在 conf/example/instance.properties 中配置需要监听的数据库和表:

canal.instance.filter.regex=your_database_name\..*  # 监听的数据库和表,使用正则表达式

重要提示:

  • 确保 Canal Server 能够连接到 MySQL 数据库。
  • 在 MySQL 中创建 Canal Server 连接所需的账户并赋予相应的权限。通常需要 REPLICATION SLAVEREPLICATION CLIENT 权限。
  • Canal Server 的运行需要 Zookeeper 的支持,确保 Zookeeper 正常运行。
  1. 运行 Canal Client 和 Server:

先启动 Canal Server,然后再运行 Canal Client。 Canal Client 会连接到 Canal Server,并开始接收 Binlog 事件。

三、数据变更事件的解析与转换

Canal Client 接收到的是 CanalEntry 格式的事件,我们需要将其解析成更易于处理的数据结构,例如 JSON。

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.protocol.CanalEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EventParser {

    public static String parse(CanalEntry.Entry entry) {
        if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
            return null;
        }

        CanalEntry.RowChange rowChange;
        try {
            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
        }

        CanalEntry.EventType eventType = rowChange.getEventType();
        String databaseName = entry.getHeader().getSchemaName();
        String tableName = entry.getHeader().getTableName();

        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            if (eventType == CanalEntry.EventType.DELETE) {
                return buildJson(databaseName, tableName, eventType.toString(), convertColumns(rowData.getBeforeColumnsList()));
            } else if (eventType == CanalEntry.EventType.INSERT) {
                return buildJson(databaseName, tableName, eventType.toString(), convertColumns(rowData.getAfterColumnsList()));
            } else if (eventType == CanalEntry.EventType.UPDATE) {
                Map<String, Object> before = convertColumns(rowData.getBeforeColumnsList());
                Map<String, Object> after = convertColumns(rowData.getAfterColumnsList());
                Map<String, Object> data = new HashMap<>();
                data.put("before", before);
                data.put("after", after);
                return buildJson(databaseName, tableName, eventType.toString(), data);
            }
        }
        return null;
    }

    private static Map<String, Object> convertColumns(List<CanalEntry.Column> columns) {
        Map<String, Object> data = new HashMap<>();
        for (CanalEntry.Column column : columns) {
            data.put(column.getName(), column.getValue());
        }
        return data;
    }

    private static String buildJson(String databaseName, String tableName, String eventType, Object data) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("database", databaseName);
        jsonObject.put("table", tableName);
        jsonObject.put("eventType", eventType);
        jsonObject.put("data", data);
        return jsonObject.toJSONString();
    }
}

在 Canal Client 的 printEntry 方法中,调用 EventParser.parse(entry) 方法,将 CanalEntry 转换为 JSON 字符串。

四、将事件推送到消息队列

现在,我们已经将 Binlog 事件解析成了 JSON 格式,接下来需要将这些事件推送到消息队列。这里我们以 Kafka 为例。

  1. 引入 Kafka 依赖:

在 Maven 项目中,添加 Kafka 的客户端依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.1</version> <!-- 使用最新版本 -->
</dependency>
  1. 编写 Kafka 生产者代码:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerService {

    private final String topic;
    private final KafkaProducer<String, String> producer;

    public KafkaProducerService(String topic, String bootstrapServers) {
        this.topic = topic;
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 可选配置,提高吞吐量
        props.put(ProducerConfig.ACKS_CONFIG, "1"); // 允许少量数据丢失,性能更高
        props.put(ProducerConfig.LINGER_MS_CONFIG, "20"); // 批量发送,提高吞吐量
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384 * 4); // 16KB batch size
        this.producer = new KafkaProducer<>(props);
    }

    public void sendMessage(String message) throws ExecutionException, InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
        // 异步发送消息
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.err.println("发送消息失败: " + exception.getMessage());
            } else {
                System.out.println("发送消息成功,Offset: " + metadata.offset());
            }
        });

        //producer.flush();  //确保所有消息发送到Kafka,性能差,生产环境不建议使用。
    }

    public void close() {
        producer.close();
    }
}
  1. 在 Canal Client 中使用 Kafka 生产者:

在 Canal Client 的 printEntry 方法中,获取到 JSON 格式的事件后,将其发送到 Kafka 消息队列。

// ... (其他代码)

String eventJson = EventParser.parse(entry);
if (eventJson != null) {
    try {
        kafkaProducer.sendMessage(eventJson);
    } catch (Exception e) {
        System.err.println("Failed to send message to Kafka: " + e.getMessage());
    }
}

// ... (其他代码)

在使用 Kafka 生产者之前,需要先创建 KafkaProducerService 的实例,并配置 Kafka 的地址和 Topic 名称。

private static KafkaProducerService kafkaProducer;

public static void main(String[] args) {
    // ... (其他代码)
    String kafkaBootstrapServers = "127.0.0.1:9092"; // Kafka Broker 地址
    String kafkaTopic = "mysql_binlog_events"; // Kafka Topic 名称
    kafkaProducer = new KafkaProducerService(kafkaTopic, kafkaBootstrapServers);
    // ... (其他代码)
}

最后,在程序结束时,关闭 Kafka 生产者。

finally {
    connector.disconnect();
    if(kafkaProducer != null){
        kafkaProducer.close();
    }
}

五、代码示例与逻辑解释

下面是一个完整的示例,展示了如何使用 Canal Client 读取 Binlog 事件,并将其推送到 Kafka 消息队列:

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.kafka.clients.producer.KafkaProducer;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.*;

import java.util.concurrent.ExecutionException;

public class Main {

    private static KafkaProducerService kafkaProducer;

    public static void main(String[] args) {
        // Canal Server 地址
        String address = "127.0.0.1";
        int port = 11111;
        String destination = "example";

        String kafkaBootstrapServers = "127.0.0.1:9092";
        String kafkaTopic = "mysql_binlog_events";
        kafkaProducer = new KafkaProducerService(kafkaTopic, kafkaBootstrapServers);

        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(address, port), destination, "", "");

        try {
            connector.connect();
            connector.subscribe(".*\..*");
            connector.rollback();

            while (true) {
                Message message = connector.getWithoutAck(100);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    printEntry(message.getEntries());
                }

                connector.ack(batchId);
            }

        } finally {
            connector.disconnect();
            if(kafkaProducer != null){
                kafkaProducer.close();
            }
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChange.getEventType();
            String eventJson = EventParser.parse(entry);
            if (eventJson != null) {
                try {
                    kafkaProducer.sendMessage(eventJson);
                    System.out.println("Sent message to Kafka: " + eventJson);
                } catch (Exception e) {
                    System.err.println("Failed to send message to Kafka: " + e.getMessage());
                }
            }

        }
    }

    static class EventParser {

        public static String parse(CanalEntry.Entry entry) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                return null;
            }

            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChange.getEventType();
            String databaseName = entry.getHeader().getSchemaName();
            String tableName = entry.getHeader().getTableName();

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    return buildJson(databaseName, tableName, eventType.toString(), convertColumns(rowData.getBeforeColumnsList()));
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    return buildJson(databaseName, tableName, eventType.toString(), convertColumns(rowData.getAfterColumnsList()));
                } else if (eventType == CanalEntry.EventType.UPDATE) {
                    Map<String, Object> before = convertColumns(rowData.getBeforeColumnsList());
                    Map<String, Object> after = convertColumns(rowData.getAfterColumnsList());
                    Map<String, Object> data = new HashMap<>();
                    data.put("before", before);
                    data.put("after", after);
                    return buildJson(databaseName, tableName, eventType.toString(), data);
                }
            }
            return null;
        }

        private static Map<String, Object> convertColumns(List<CanalEntry.Column> columns) {
            Map<String, Object> data = new HashMap<>();
            for (CanalEntry.Column column : columns) {
                data.put(column.getName(), column.getValue());
            }
            return data;
        }

        private static String buildJson(String databaseName, String tableName, String eventType, Object data) {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("database", databaseName);
            jsonObject.put("table", tableName);
            jsonObject.put("eventType", eventType);
            jsonObject.put("data", data);
            return jsonObject.toJSONString();
        }
    }

    static class KafkaProducerService {

        private final String topic;
        private final KafkaProducer<String, String> producer;

        public KafkaProducerService(String topic, String bootstrapServers) {
            this.topic = topic;
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.ACKS_CONFIG, "1");
            props.put(ProducerConfig.LINGER_MS_CONFIG, "20");
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384 * 4);
            this.producer = new KafkaProducer<>(props);
        }

        public void sendMessage(String message) throws ExecutionException, InterruptedException {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("发送消息失败: " + exception.getMessage());
                } else {
                    System.out.println("发送消息成功,Offset: " + metadata.offset());
                }
            });
        }

        public void close() {
            producer.close();
        }
    }
}

逻辑解释:

  1. Canal Client 连接 Canal Server: 通过 CanalConnector 连接到 Canal Server,并订阅指定的数据库和表。
  2. 接收 Binlog 事件: Canal Client 循环接收 Binlog 事件,并进行处理。
  3. 解析 Binlog 事件: 使用 EventParser 将 CanalEntry 格式的事件解析成 JSON 格式。
  4. 推送消息到 Kafka: 使用 KafkaProducerService 将 JSON 格式的事件推送到 Kafka 消息队列。
  5. 异常处理: 对可能发生的异常进行捕获和处理,例如连接失败、解析错误、发送失败等。
  6. 资源释放: 在程序结束时,关闭 Canal 连接和 Kafka 生产者,释放资源。

六、可能遇到的问题与解决方案

  • Canal Server 连接失败: 检查 Canal Server 的配置是否正确,例如 MySQL 地址、用户名、密码等。 确保 MySQL 已经开启 Binlog,并且 Canal Server 拥有足够的权限。
  • Canal Client 无法接收到 Binlog 事件: 检查 Canal Client 订阅的数据库和表是否正确。 检查 MySQL 的 Binlog 是否有更新。
  • Kafka 消息发送失败: 检查 Kafka Broker 的地址是否正确。 检查 Kafka Topic 是否存在。 检查 Kafka 生产者是否有足够的权限。
  • 数据丢失: 调整 Kafka 的 acks 参数,以提高数据可靠性。 使用 Canal 的 rollbackack 机制,确保消息被正确处理。
  • 性能问题: 增加 Canal Client 的批量处理大小。 调整 Kafka 的生产者参数,例如 linger.msbatch.size,以提高吞吐量。

七、更进一步:数据一致性、容错与监控

  • 数据一致性: Canal 提供了多种一致性保证级别,可以根据实际需求进行选择。
  • 容错: 可以使用 Canal 集群,提高 Canal 服务的可用性。 可以使用 Kafka 的副本机制,提高 Kafka 消息队列的可靠性。
  • 监控: 监控 Canal Server 和 Kafka Broker 的运行状态,及时发现并解决问题。 可以使用 Prometheus 和 Grafana 等工具进行监控。

八、总结

今天我们学习了如何利用MySQL Binlog事件流实现数据变更订阅并将其推送到消息队列。 主要步骤包括:配置Binlog、使用Canal读取和解析Binlog事件、将事件转换为JSON格式、使用Kafka将事件推送到消息队列。 这将允许构建响应迅速,高度可扩展的实时数据管道,为各种用例(如缓存更新、搜索索引、数据分析等)提供动力。 这种架构可以构建更实时、更具响应性的应用程序。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注