MySQL高级讲座篇之:如何设计一个基于`binlog`的异构数据同步系统?

大家好,我是老王,今天咱来聊聊如何搞一个基于binlog的异构数据同步系统。这玩意儿听起来高大上,其实说白了,就是把MySQL数据库里发生的变化,实时或者准实时地同步到另一个不同类型的数据库或者系统里。

为什么需要异构数据同步?

首先,咱们得搞清楚为啥需要这玩意儿。你想啊,现在谁家还没几个数据库啊?MySQL搞业务,Redis搞缓存,Elasticsearch搞搜索,MongoDB搞文档存储…数据分散在各个地方,想要做个报表、搞个分析,或者做一个跨库的业务功能,那不得累死个人?

所以,我们需要一个“搬运工”,把数据从一个地方搬到另一个地方,最好是自动的、实时的。这样,我们就能在不同的系统里用上最新的数据,发挥各自的优势。

举个例子:

  • 业务系统(MySQL) -> 搜索系统(Elasticsearch): 用户在业务系统里修改了商品信息,同步到Elasticsearch后,用户就能立即搜到最新的商品信息。
  • 业务系统(MySQL) -> 数据仓库(Hive/ClickHouse): 将业务数据同步到数据仓库,方便做数据分析和报表。
  • MySQL -> MongoDB: 将MySQL中的结构化数据转换为MongoDB的文档数据,方便某些特定场景的应用。

binlog:我们的数据来源

要实现数据同步,最关键的就是要知道数据发生了什么变化。MySQL的binlog就是干这个的。binlog记录了数据库的所有更改操作,包括增删改查(增删改需要开启binlog_format=ROW)。

简单来说,binlog就像是MySQL的“日记本”,它把所有发生的事情都记录下来了。我们只需要读取这个“日记本”,就能知道数据库里发生了什么。

异构数据同步系统的架构

一个典型的基于binlog的异构数据同步系统,一般包含以下几个组件:

  1. binlog监听器 (Binlog Listener): 负责连接MySQL数据库,读取binlog,并将binlog事件解析成结构化的数据。
  2. 数据转换器 (Data Transformer): 负责将解析后的数据转换成目标数据库可以接受的格式。例如,将MySQL的数据类型转换成Elasticsearch的数据类型。
  3. 数据写入器 (Data Writer): 负责将转换后的数据写入到目标数据库。
  4. 配置中心 (Configuration Center): 管理整个系统的配置,例如MySQL的连接信息、目标数据库的连接信息、数据转换规则等。
  5. 监控系统 (Monitoring System): 监控整个系统的运行状态,例如binlog的读取进度、数据同步的延迟、错误日志等。

可以用下面的表格描述:

组件名称 职责 技术选型建议
binlog监听器 连接MySQL,读取并解析binlog事件。 Canal (阿里巴巴开源),Debezium
数据转换器 将解析后的数据转换成目标数据库可接受的格式。 自定义代码 (Java, Python, Go),ETL工具 (Kettle, DataX)
数据写入器 将转换后的数据写入到目标数据库。 JDBC, 各数据库的官方驱动,ORM框架 (MyBatis, Hibernate),Bulk API (Elasticsearch)
配置中心 管理系统配置,例如数据库连接信息,数据转换规则。 ZooKeeper, etcd, Consul, Apollo
监控系统 监控系统运行状态,例如binlog读取进度,数据同步延迟。 Prometheus + Grafana, ELK (Elasticsearch, Logstash, Kibana), 自定义监控脚本

动手实现一个简单的异构数据同步系统 (MySQL -> Elasticsearch)

为了方便理解,咱们用Java语言,手撸一个最简单的异构数据同步系统,实现MySQL到Elasticsearch的数据同步。

1. 引入依赖

首先,在pom.xml文件中引入相关的依赖:

<dependencies>
    <dependency>
        <groupId>com.github.shyiko</groupId>
        <artifactId>mysql-binlog-connector-java</artifactId>
        <version>0.25.1</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.17.6</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.32</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.32</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.13.0</version>
    </dependency>
</dependencies>

这里用到了:

  • mysql-binlog-connector-java: 用于连接MySQL并读取binlog
  • elasticsearch-rest-high-level-client: 用于连接Elasticsearch并写入数据。
  • slf4j-apislf4j-simple: 用于日志记录。
  • jackson-databind: 用于JSON序列化和反序列化。

2. binlog监听器 (Binlog Listener)

创建一个BinlogListener类,负责连接MySQL并读取binlog

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class BinlogListener {

    private static final Logger logger = LoggerFactory.getLogger(BinlogListener.class);

    private final String host;
    private final int port;
    private final String username;
    private final String password;
    private final String database;
    private final DataWriter dataWriter;

    private final Map<Long, String> tableMap = new HashMap<>();

    public BinlogListener(String host, int port, String username, String password, String database, DataWriter dataWriter) {
        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
        this.database = database;
        this.dataWriter = dataWriter;
    }

    public void start() throws IOException {
        BinaryLogClient client = new BinaryLogClient(host, port, username, password);
        client.setServerId(123456); // 设置一个唯一的server id

        client.registerEventListener(event -> {
            EventType eventType = event.getHeader().getEventType();

            if (eventType == EventType.TABLE_MAP) {
                TableMapEventData tableMapEventData = event.getData();
                tableMap.put(tableMapEventData.getTableId(), tableMapEventData.getTable());
            }
            else if (eventType == EventType.WRITE_ROWS) {
                WriteRowsEventData writeRowsEventData = event.getData();
                String tableName = tableMap.get(writeRowsEventData.getTableId());
                if(database.equals("your_database_name") && "your_table_name".equals(tableName)) { // 只监听特定的数据库和表
                    writeRowsEventData.getRows().forEach(row -> {
                        try {
                            dataWriter.write("your_index_name", row);
                        } catch (IOException e) {
                            logger.error("Error writing data to Elasticsearch", e);
                        }
                    });
                }
            }
            else if (eventType == EventType.UPDATE_ROWS) {
                UpdateRowsEventData updateRowsEventData = event.getData();
                String tableName = tableMap.get(updateRowsEventData.getTableId());
                if(database.equals("your_database_name") && "your_table_name".equals(tableName)) { // 只监听特定的数据库和表
                    updateRowsEventData.getRows().forEach(row -> {
                        try {
                           dataWriter.update("your_index_name", row.getValue().get(1)); // 假设id在第二个位置
                        } catch (IOException e) {
                            logger.error("Error updating data to Elasticsearch", e);
                        }
                    });
                }
            }
            else if (eventType == EventType.DELETE_ROWS) {
                DeleteRowsEventData deleteRowsEventData = event.getData();
                String tableName = tableMap.get(deleteRowsEventData.getTableId());
                if(database.equals("your_database_name") && "your_table_name".equals(tableName)) { // 只监听特定的数据库和表
                    deleteRowsEventData.getRows().forEach(row -> {
                        try {
                            dataWriter.delete("your_index_name", row);
                        } catch (IOException e) {
                            logger.error("Error deleting data to Elasticsearch", e);
                        }
                    });
                }
            }
        });

        try {
            client.connect();
            logger.info("Connected to MySQL binlog");
        } catch (IOException e) {
            logger.error("Error connecting to MySQL binlog", e);
            throw e;
        }
    }

    public static void main(String[] args) throws IOException {
        // 替换成你的MySQL配置
        String host = "localhost";
        int port = 3306;
        String username = "root";
        String password = "password";
        String database = "your_database_name";

        // 初始化DataWriter
        DataWriter dataWriter = new DataWriter("localhost", 9200); // 替换成你的Elasticsearch配置
        dataWriter.init();

        // 启动Binlog监听器
        BinlogListener binlogListener = new BinlogListener(host, port, username, password, database, dataWriter);
        binlogListener.start();
    }
}

这个类做了以下几件事:

  • 连接MySQL数据库。
  • 注册一个事件监听器,监听binlog事件。
  • 根据事件类型,判断是否是增删改操作。
  • 如果是增删改操作,将数据发送给DataWriter进行处理。
  • tableMap记录了tableId和tableName的映射关系,方便我们根据tableId找到对应的表名。
  • 重要: 请替换your_database_nameyour_table_name以及Elasticsearch的配置为你自己的实际配置。

3. 数据写入器 (Data Writer)

创建一个DataWriter类,负责将数据写入到Elasticsearch:

import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentType;

public class DataWriter {

    private static final Logger logger = LoggerFactory.getLogger(DataWriter.class);

    private final String host;
    private final int port;
    private RestHighLevelClient client;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public DataWriter(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void init() {
        client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost(host, port, "http")));
    }

    public void write(String index, List<?> row) throws IOException {
        try {
            // 将List转换为Map,你需要根据你的表结构进行映射
            // 这里只是一个简单的示例,你需要根据你的实际情况进行修改
            Map<String, Object> data = convertListToMap(row);

            IndexRequest request = new IndexRequest(index)
                    .source(objectMapper.writeValueAsString(data), XContentType.JSON);

            client.index(request, RequestOptions.DEFAULT);
            logger.info("Wrote data to Elasticsearch index: {}", index);
        } catch (IOException e) {
            logger.error("Error writing data to Elasticsearch", e);
            throw e;
        }
    }

    public void update(String index, Object id) throws IOException {
        try {
            // 根据ID从数据库查询最新的数据,这里只是一个占位符,你需要根据你的实际情况进行实现
            Map<String, Object> data = fetchLatestDataFromDatabase(id);

            UpdateRequest request = new UpdateRequest(index, id.toString())
                    .doc(objectMapper.writeValueAsString(data), XContentType.JSON);

            client.update(request, RequestOptions.DEFAULT);
            logger.info("Updated data to Elasticsearch index: {}, id: {}", index, id);
        } catch (IOException e) {
            logger.error("Error updating data to Elasticsearch", e);
            throw e;
        }
    }

    public void delete(String index, List<?> row) throws IOException {
        try {
            // 从List中提取ID,你需要根据你的表结构进行映射
            // 这里只是一个简单的示例,你需要根据你的实际情况进行修改
            Object id = extractIdFromList(row);

            DeleteRequest request = new DeleteRequest(index, id.toString());

            client.delete(request, RequestOptions.DEFAULT);
            logger.info("Deleted data from Elasticsearch index: {}, id: {}", index, id);
        } catch (IOException e) {
            logger.error("Error deleting data from Elasticsearch", e);
            throw e;
        }
    }

    private Map<String, Object> convertListToMap(List<?> row) {
        // 这里需要根据你的表结构进行转换
        // 这是一个简单的示例,假设你的表有三个字段:id, name, age
        // 你需要根据你的实际情况进行修改
        Map<String, Object> data = new java.util.HashMap<>();
        data.put("id", row.get(0));
        data.put("name", row.get(1));
        data.put("age", row.get(2));
        return data;
    }

    private Object extractIdFromList(List<?> row) {
        // 这里需要根据你的表结构提取ID
        // 这是一个简单的示例,假设ID在第一个位置
        // 你需要根据你的实际情况进行修改
        return row.get(0);
    }

    private Map<String, Object> fetchLatestDataFromDatabase(Object id) {
       // 在这里实现从数据库查询最新数据的逻辑
       // 这是一个占位符,你需要根据你的实际情况进行实现
       // 例如,你可以使用JDBC连接数据库,执行SQL查询
        Map<String, Object> data = new java.util.HashMap<>();
        data.put("id", id);
        data.put("name", "Updated Name");
        data.put("age", 99);
        return data;
    }

    public void close() throws IOException {
        if (client != null) {
            client.close();
        }
    }
}

这个类做了以下几件事:

  • 连接Elasticsearch。
  • 将数据转换成Elasticsearch可以接受的JSON格式。
  • 将数据写入到Elasticsearch。
  • 重要: convertListToMapextractIdFromListfetchLatestDataFromDatabase方法是占位符,需要根据你的实际表结构和业务逻辑进行修改。

4. 运行程序

  1. 确保你的MySQL开启了binlog,并且binlog_format设置为ROW
  2. 确保你的Elasticsearch已经启动。
  3. 运行BinlogListenermain方法。
  4. 在MySQL数据库里对your_table_name表进行增删改操作。
  5. 观察Elasticsearch,看看数据是否同步过来了。

进阶:更完善的异构数据同步系统

上面的例子只是一个最简单的原型,实际生产环境的异构数据同步系统要复杂得多。我们需要考虑以下几个方面:

  1. 数据转换规则: 不同的数据库数据类型不一样,我们需要定义一套数据转换规则,将MySQL的数据类型转换成目标数据库可以接受的数据类型。例如,MySQL的VARCHAR类型可以转换成Elasticsearch的keyword类型。
  2. 错误处理: 数据同步过程中可能会出现各种错误,例如网络连接错误、数据转换错误、数据写入错误等。我们需要一套完善的错误处理机制,能够自动重试、记录错误日志、发送告警等。
  3. 事务一致性: 如果多个表之间存在外键关联,我们需要保证数据同步的事务一致性。例如,如果一个订单包含了订单头和订单行,我们需要保证订单头和订单行要么都同步成功,要么都同步失败。
  4. 性能优化: 数据同步可能会对MySQL数据库的性能产生影响。我们需要采取一些性能优化措施,例如批量读取binlog、并发写入目标数据库、使用缓存等。
  5. 监控和告警: 我们需要一套完善的监控和告警系统,能够实时监控数据同步的进度、延迟、错误率等,并在出现异常情况时及时发出告警。
  6. Schema变更处理: 当MySQL的表结构发生变化时,我们需要能够自动检测到这些变化,并更新数据同步的配置和转换规则。

总结

异构数据同步是一个复杂的工程,需要考虑很多细节。但是,只要我们掌握了binlog的原理,理解了异构数据同步系统的架构,就能一步一步地构建出一个稳定、高效、可靠的数据同步系统。记住,这玩意儿没有银弹,只有不断地学习和实践,才能真正掌握它。

希望这次讲座对大家有所帮助。下次有机会再跟大家聊聊其他技术。谢谢大家!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注