大家好,我是老王,今天咱来聊聊如何搞一个基于binlog
的异构数据同步系统。这玩意儿听起来高大上,其实说白了,就是把MySQL数据库里发生的变化,实时或者准实时地同步到另一个不同类型的数据库或者系统里。
为什么需要异构数据同步?
首先,咱们得搞清楚为啥需要这玩意儿。你想啊,现在谁家还没几个数据库啊?MySQL搞业务,Redis搞缓存,Elasticsearch搞搜索,MongoDB搞文档存储…数据分散在各个地方,想要做个报表、搞个分析,或者做一个跨库的业务功能,那不得累死个人?
所以,我们需要一个“搬运工”,把数据从一个地方搬到另一个地方,最好是自动的、实时的。这样,我们就能在不同的系统里用上最新的数据,发挥各自的优势。
举个例子:
- 业务系统(MySQL) -> 搜索系统(Elasticsearch): 用户在业务系统里修改了商品信息,同步到Elasticsearch后,用户就能立即搜到最新的商品信息。
- 业务系统(MySQL) -> 数据仓库(Hive/ClickHouse): 将业务数据同步到数据仓库,方便做数据分析和报表。
- MySQL -> MongoDB: 将MySQL中的结构化数据转换为MongoDB的文档数据,方便某些特定场景的应用。
binlog
:我们的数据来源
要实现数据同步,最关键的就是要知道数据发生了什么变化。MySQL的binlog
就是干这个的。binlog
记录了数据库的所有更改操作,包括增删改查(增删改需要开启binlog_format=ROW
)。
简单来说,binlog
就像是MySQL的“日记本”,它把所有发生的事情都记录下来了。我们只需要读取这个“日记本”,就能知道数据库里发生了什么。
异构数据同步系统的架构
一个典型的基于binlog
的异构数据同步系统,一般包含以下几个组件:
binlog
监听器 (Binlog Listener): 负责连接MySQL数据库,读取binlog
,并将binlog
事件解析成结构化的数据。- 数据转换器 (Data Transformer): 负责将解析后的数据转换成目标数据库可以接受的格式。例如,将MySQL的数据类型转换成Elasticsearch的数据类型。
- 数据写入器 (Data Writer): 负责将转换后的数据写入到目标数据库。
- 配置中心 (Configuration Center): 管理整个系统的配置,例如MySQL的连接信息、目标数据库的连接信息、数据转换规则等。
- 监控系统 (Monitoring System): 监控整个系统的运行状态,例如
binlog
的读取进度、数据同步的延迟、错误日志等。
可以用下面的表格描述:
组件名称 | 职责 | 技术选型建议 |
---|---|---|
binlog 监听器 |
连接MySQL,读取并解析binlog 事件。 |
Canal (阿里巴巴开源),Debezium |
数据转换器 | 将解析后的数据转换成目标数据库可接受的格式。 | 自定义代码 (Java, Python, Go),ETL工具 (Kettle, DataX) |
数据写入器 | 将转换后的数据写入到目标数据库。 | JDBC, 各数据库的官方驱动,ORM框架 (MyBatis, Hibernate),Bulk API (Elasticsearch) |
配置中心 | 管理系统配置,例如数据库连接信息,数据转换规则。 | ZooKeeper, etcd, Consul, Apollo |
监控系统 | 监控系统运行状态,例如binlog 读取进度,数据同步延迟。 |
Prometheus + Grafana, ELK (Elasticsearch, Logstash, Kibana), 自定义监控脚本 |
动手实现一个简单的异构数据同步系统 (MySQL -> Elasticsearch)
为了方便理解,咱们用Java语言,手撸一个最简单的异构数据同步系统,实现MySQL到Elasticsearch的数据同步。
1. 引入依赖
首先,在pom.xml
文件中引入相关的依赖:
<dependencies>
<dependency>
<groupId>com.github.shyiko</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.25.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.17.6</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.32</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.0</version>
</dependency>
</dependencies>
这里用到了:
mysql-binlog-connector-java
: 用于连接MySQL并读取binlog
。elasticsearch-rest-high-level-client
: 用于连接Elasticsearch并写入数据。slf4j-api
和slf4j-simple
: 用于日志记录。jackson-databind
: 用于JSON序列化和反序列化。
2. binlog
监听器 (Binlog Listener)
创建一个BinlogListener
类,负责连接MySQL并读取binlog
:
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class BinlogListener {
private static final Logger logger = LoggerFactory.getLogger(BinlogListener.class);
private final String host;
private final int port;
private final String username;
private final String password;
private final String database;
private final DataWriter dataWriter;
private final Map<Long, String> tableMap = new HashMap<>();
public BinlogListener(String host, int port, String username, String password, String database, DataWriter dataWriter) {
this.host = host;
this.port = port;
this.username = username;
this.password = password;
this.database = database;
this.dataWriter = dataWriter;
}
public void start() throws IOException {
BinaryLogClient client = new BinaryLogClient(host, port, username, password);
client.setServerId(123456); // 设置一个唯一的server id
client.registerEventListener(event -> {
EventType eventType = event.getHeader().getEventType();
if (eventType == EventType.TABLE_MAP) {
TableMapEventData tableMapEventData = event.getData();
tableMap.put(tableMapEventData.getTableId(), tableMapEventData.getTable());
}
else if (eventType == EventType.WRITE_ROWS) {
WriteRowsEventData writeRowsEventData = event.getData();
String tableName = tableMap.get(writeRowsEventData.getTableId());
if(database.equals("your_database_name") && "your_table_name".equals(tableName)) { // 只监听特定的数据库和表
writeRowsEventData.getRows().forEach(row -> {
try {
dataWriter.write("your_index_name", row);
} catch (IOException e) {
logger.error("Error writing data to Elasticsearch", e);
}
});
}
}
else if (eventType == EventType.UPDATE_ROWS) {
UpdateRowsEventData updateRowsEventData = event.getData();
String tableName = tableMap.get(updateRowsEventData.getTableId());
if(database.equals("your_database_name") && "your_table_name".equals(tableName)) { // 只监听特定的数据库和表
updateRowsEventData.getRows().forEach(row -> {
try {
dataWriter.update("your_index_name", row.getValue().get(1)); // 假设id在第二个位置
} catch (IOException e) {
logger.error("Error updating data to Elasticsearch", e);
}
});
}
}
else if (eventType == EventType.DELETE_ROWS) {
DeleteRowsEventData deleteRowsEventData = event.getData();
String tableName = tableMap.get(deleteRowsEventData.getTableId());
if(database.equals("your_database_name") && "your_table_name".equals(tableName)) { // 只监听特定的数据库和表
deleteRowsEventData.getRows().forEach(row -> {
try {
dataWriter.delete("your_index_name", row);
} catch (IOException e) {
logger.error("Error deleting data to Elasticsearch", e);
}
});
}
}
});
try {
client.connect();
logger.info("Connected to MySQL binlog");
} catch (IOException e) {
logger.error("Error connecting to MySQL binlog", e);
throw e;
}
}
public static void main(String[] args) throws IOException {
// 替换成你的MySQL配置
String host = "localhost";
int port = 3306;
String username = "root";
String password = "password";
String database = "your_database_name";
// 初始化DataWriter
DataWriter dataWriter = new DataWriter("localhost", 9200); // 替换成你的Elasticsearch配置
dataWriter.init();
// 启动Binlog监听器
BinlogListener binlogListener = new BinlogListener(host, port, username, password, database, dataWriter);
binlogListener.start();
}
}
这个类做了以下几件事:
- 连接MySQL数据库。
- 注册一个事件监听器,监听
binlog
事件。 - 根据事件类型,判断是否是增删改操作。
- 如果是增删改操作,将数据发送给
DataWriter
进行处理。 tableMap
记录了tableId和tableName的映射关系,方便我们根据tableId找到对应的表名。- 重要: 请替换
your_database_name
和your_table_name
以及Elasticsearch的配置为你自己的实际配置。
3. 数据写入器 (Data Writer)
创建一个DataWriter
类,负责将数据写入到Elasticsearch:
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentType;
public class DataWriter {
private static final Logger logger = LoggerFactory.getLogger(DataWriter.class);
private final String host;
private final int port;
private RestHighLevelClient client;
private final ObjectMapper objectMapper = new ObjectMapper();
public DataWriter(String host, int port) {
this.host = host;
this.port = port;
}
public void init() {
client = new RestHighLevelClient(
RestClient.builder(
new HttpHost(host, port, "http")));
}
public void write(String index, List<?> row) throws IOException {
try {
// 将List转换为Map,你需要根据你的表结构进行映射
// 这里只是一个简单的示例,你需要根据你的实际情况进行修改
Map<String, Object> data = convertListToMap(row);
IndexRequest request = new IndexRequest(index)
.source(objectMapper.writeValueAsString(data), XContentType.JSON);
client.index(request, RequestOptions.DEFAULT);
logger.info("Wrote data to Elasticsearch index: {}", index);
} catch (IOException e) {
logger.error("Error writing data to Elasticsearch", e);
throw e;
}
}
public void update(String index, Object id) throws IOException {
try {
// 根据ID从数据库查询最新的数据,这里只是一个占位符,你需要根据你的实际情况进行实现
Map<String, Object> data = fetchLatestDataFromDatabase(id);
UpdateRequest request = new UpdateRequest(index, id.toString())
.doc(objectMapper.writeValueAsString(data), XContentType.JSON);
client.update(request, RequestOptions.DEFAULT);
logger.info("Updated data to Elasticsearch index: {}, id: {}", index, id);
} catch (IOException e) {
logger.error("Error updating data to Elasticsearch", e);
throw e;
}
}
public void delete(String index, List<?> row) throws IOException {
try {
// 从List中提取ID,你需要根据你的表结构进行映射
// 这里只是一个简单的示例,你需要根据你的实际情况进行修改
Object id = extractIdFromList(row);
DeleteRequest request = new DeleteRequest(index, id.toString());
client.delete(request, RequestOptions.DEFAULT);
logger.info("Deleted data from Elasticsearch index: {}, id: {}", index, id);
} catch (IOException e) {
logger.error("Error deleting data from Elasticsearch", e);
throw e;
}
}
private Map<String, Object> convertListToMap(List<?> row) {
// 这里需要根据你的表结构进行转换
// 这是一个简单的示例,假设你的表有三个字段:id, name, age
// 你需要根据你的实际情况进行修改
Map<String, Object> data = new java.util.HashMap<>();
data.put("id", row.get(0));
data.put("name", row.get(1));
data.put("age", row.get(2));
return data;
}
private Object extractIdFromList(List<?> row) {
// 这里需要根据你的表结构提取ID
// 这是一个简单的示例,假设ID在第一个位置
// 你需要根据你的实际情况进行修改
return row.get(0);
}
private Map<String, Object> fetchLatestDataFromDatabase(Object id) {
// 在这里实现从数据库查询最新数据的逻辑
// 这是一个占位符,你需要根据你的实际情况进行实现
// 例如,你可以使用JDBC连接数据库,执行SQL查询
Map<String, Object> data = new java.util.HashMap<>();
data.put("id", id);
data.put("name", "Updated Name");
data.put("age", 99);
return data;
}
public void close() throws IOException {
if (client != null) {
client.close();
}
}
}
这个类做了以下几件事:
- 连接Elasticsearch。
- 将数据转换成Elasticsearch可以接受的JSON格式。
- 将数据写入到Elasticsearch。
- 重要:
convertListToMap
、extractIdFromList
和fetchLatestDataFromDatabase
方法是占位符,需要根据你的实际表结构和业务逻辑进行修改。
4. 运行程序
- 确保你的MySQL开启了
binlog
,并且binlog_format
设置为ROW
。 - 确保你的Elasticsearch已经启动。
- 运行
BinlogListener
的main
方法。 - 在MySQL数据库里对
your_table_name
表进行增删改操作。 - 观察Elasticsearch,看看数据是否同步过来了。
进阶:更完善的异构数据同步系统
上面的例子只是一个最简单的原型,实际生产环境的异构数据同步系统要复杂得多。我们需要考虑以下几个方面:
- 数据转换规则: 不同的数据库数据类型不一样,我们需要定义一套数据转换规则,将MySQL的数据类型转换成目标数据库可以接受的数据类型。例如,MySQL的
VARCHAR
类型可以转换成Elasticsearch的keyword
类型。 - 错误处理: 数据同步过程中可能会出现各种错误,例如网络连接错误、数据转换错误、数据写入错误等。我们需要一套完善的错误处理机制,能够自动重试、记录错误日志、发送告警等。
- 事务一致性: 如果多个表之间存在外键关联,我们需要保证数据同步的事务一致性。例如,如果一个订单包含了订单头和订单行,我们需要保证订单头和订单行要么都同步成功,要么都同步失败。
- 性能优化: 数据同步可能会对MySQL数据库的性能产生影响。我们需要采取一些性能优化措施,例如批量读取
binlog
、并发写入目标数据库、使用缓存等。 - 监控和告警: 我们需要一套完善的监控和告警系统,能够实时监控数据同步的进度、延迟、错误率等,并在出现异常情况时及时发出告警。
- Schema变更处理: 当MySQL的表结构发生变化时,我们需要能够自动检测到这些变化,并更新数据同步的配置和转换规则。
总结
异构数据同步是一个复杂的工程,需要考虑很多细节。但是,只要我们掌握了binlog
的原理,理解了异构数据同步系统的架构,就能一步一步地构建出一个稳定、高效、可靠的数据同步系统。记住,这玩意儿没有银弹,只有不断地学习和实践,才能真正掌握它。
希望这次讲座对大家有所帮助。下次有机会再跟大家聊聊其他技术。谢谢大家!