各位观众老爷们,晚上好!我是今天的主讲人,江湖人称“代码界的老司机”。今天咱们聊聊MySQL全文搜索那些事儿,以及如何跟Elasticsearch这个“洋玩意儿”搞好关系,实现搜索功能的华丽升级。
第一章:MySQL全文搜索的爱与痛
话说MySQL也算是个老实人,啥活都愿意干。但要说到全文搜索,它就有点力不从心了。
1.1 初识MySQL全文搜索
MySQL 从5.6版本开始支持InnoDB引擎的全文索引(FULLTEXT index),之前只能在MyISAM引擎上用。这玩意儿能让你在TEXT
类型的字段里搜索关键词,听起来是不是很厉害?
1.2 MySQL全文搜索的优点
- 简单易用: 创建和使用全文索引都比较简单,SQL语句就能搞定。
- 内置支持: 无需安装额外的插件或软件,MySQL自带的功能。
1.3 MySQL全文搜索的局限性
但是,但是,但是!重要的事情说三遍。MySQL的全文搜索,有很多限制,简直让人抓狂:
- 性能问题: 面对海量数据,搜索速度简直慢到怀疑人生。
- 功能简陋: 不支持中文分词,对英文的支持也比较弱,不支持拼写纠错、近义词、权重等高级功能。
- 词库限制: 自带的停用词库很有限,很多常用词都被当成停用词过滤掉了。
- 排序方式单一: 只能按照相关度排序,无法自定义排序规则。
- 更新代价高昂: 数据更新频繁时,全文索引的维护成本很高。
举个例子,假设咱们有个articles
表,存储文章内容:
CREATE TABLE articles (
id INT PRIMARY KEY AUTO_INCREMENT,
title VARCHAR(255) NOT NULL,
content TEXT NOT NULL
);
-- 创建全文索引
ALTER TABLE articles ADD FULLTEXT INDEX article_content_index (title, content);
-- 插入一些数据
INSERT INTO articles (title, content) VALUES
('MySQL的那些事', 'MySQL是一种流行的关系型数据库管理系统。'),
('Elasticsearch入门', 'Elasticsearch是一个基于Lucene的分布式搜索和分析引擎。'),
('MySQL与Elasticsearch的结合', '如何将MySQL的数据同步到Elasticsearch,实现更强大的搜索功能。');
-- 使用全文搜索
SELECT * FROM articles WHERE MATCH (title, content) AGAINST ('MySQL' IN NATURAL LANGUAGE MODE);
上面的例子很简单,但如果数据量很大,或者搜索的关键词比较复杂,MySQL的全文搜索就会显得很吃力。 比如要搜索包含“MySQL”和“Elasticsearch”的文章,或者要按照文章的点击量排序,MySQL就无能为力了。
第二章:Elasticsearch闪亮登场
这时候,就需要请出我们的主角之一:Elasticsearch!
2.1 Elasticsearch是什么?
Elasticsearch(简称ES)是一个基于Lucene的分布式搜索和分析引擎。简单来说,它就是个专门搞搜索的,而且是专家级别的那种。
2.2 Elasticsearch的优点
- 高性能: 基于倒排索引,搜索速度飞快。
- 功能强大: 支持各种高级搜索功能,如分词、拼写纠错、近义词、权重等。
- 可扩展性: 可以轻松扩展到数百台服务器,处理PB级别的数据。
- 灵活的查询语言: 提供了丰富的查询DSL,可以构建各种复杂的查询。
- 实时性: 支持近实时搜索,数据更新后可以很快被搜索到。
2.3 Elasticsearch的核心概念
- Index(索引): 相当于MySQL的数据库。
- Type(类型): 相当于MySQL的表。(注意:ES 7.x 已经移除 Type 的概念,统一为
_doc
) - Document(文档): 相当于MySQL的行,存储具体的数据。
- Field(字段): 相当于MySQL的列。
- Mapping(映射): 定义了文档的结构,包括字段的类型、分词器等。
第三章:MySQL与Elasticsearch的协同作战
既然MySQL和Elasticsearch各有千秋,那么最好的方式就是让他们协同作战,发挥各自的优势。
3.1 数据同步方案
要让Elasticsearch能搜索MySQL的数据,首先需要把数据同步过去。常见的同步方案有以下几种:
- Logstash: 一个强大的数据收集引擎,可以从MySQL读取数据,然后写入Elasticsearch。
- Canal: 阿里巴巴开源的MySQL binlog解析工具,可以实时捕获MySQL的数据变更,然后同步到Elasticsearch。
- 自定义程序: 自己编写程序,监听MySQL的binlog,然后将数据同步到Elasticsearch。
3.2 使用Logstash同步数据
Logstash的配置比较简单,只需要编写一个配置文件,指定输入源(MySQL)和输出目标(Elasticsearch)即可。
input {
jdbc {
jdbc_driver_library => "/path/to/mysql-connector-java-8.0.28.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/your_database"
jdbc_user => "your_user"
jdbc_password => "your_password"
statement => "SELECT * FROM articles WHERE update_time > :sql_last_value"
use_column_value => true
tracking_column => "update_time"
tracking_column_type => "timestamp"
schedule => "* * * * *" # 每分钟执行一次
record_last_run => true
last_run_metadata_path => "/path/to/logstash_last_run"
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "articles"
document_id => "%{id}" # 使用id作为文档的_id
action => "update"
doc_as_upsert => true
}
stdout { codec => rubydebug }
}
上面的配置文件的意思:
- input: 使用jdbc插件从MySQL读取数据。
jdbc_driver_library
: MySQL驱动的jar包路径。jdbc_driver_class
: MySQL驱动类名。jdbc_connection_string
: MySQL连接字符串。jdbc_user
: MySQL用户名。jdbc_password
: MySQL密码。statement
: SQL查询语句,使用update_time
字段作为增量同步的依据。use_column_value
: 使用字段的值作为增量同步的依据。tracking_column
: 增量同步的字段。tracking_column_type
: 增量同步的字段类型。schedule
: 定时执行的时间间隔,这里是每分钟执行一次。record_last_run
: 记录上次执行的时间。last_run_metadata_path
: 记录上次执行时间的文件路径。
- output: 使用elasticsearch插件将数据写入Elasticsearch。
hosts
: Elasticsearch的地址。index
: Elasticsearch的索引名称。document_id
: Elasticsearch的文档id,这里使用MySQL的id。action
: Elasticsearch的操作类型,这里使用update,如果文档不存在则插入。doc_as_upsert
: 如果文档不存在,则将整个文档作为新文档插入。stdout
: 将数据输出到控制台,方便调试。
3.3 使用Canal同步数据
Canal的原理是模拟MySQL的slave,解析binlog,然后将数据同步到Elasticsearch。Canal的配置相对复杂一些,但性能更高,实时性更好。
Canal的配置主要包括:
- Canal Server配置: 指定MySQL的连接信息、binlog的位置等。
- Canal Client配置: 指定Elasticsearch的连接信息、数据同步的规则等。
具体配置可以参考Canal的官方文档。
3.4 自定义程序同步数据
如果觉得Logstash和Canal都太重了,也可以自己编写程序来同步数据。
// Java代码示例,使用MySQL binlog connector
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class BinlogSync {
public static void main(String[] args) throws IOException {
BinaryLogClient client = new BinaryLogClient("localhost", 3306, "your_user", "your_password");
client.setServerId(1234); // 必须指定一个唯一的server id
Map<Long, String> tableMap = new HashMap<>();
client.registerEventListener(event -> {
EventType eventType = event.getHeader().getEventType();
if (eventType == EventType.TABLE_MAP) {
TableMapEventData tableMapEventData = event.getData();
tableMap.put(tableMapEventData.getTableId(), tableMapEventData.getTable());
}
if (eventType == EventType.WRITE_ROWS) {
WriteRowsEventData writeRowsEventData = event.getData();
String table = tableMap.get(writeRowsEventData.getTableId());
if ("articles".equals(table)) {
writeRowsEventData.getRows().forEach(row -> {
// 将数据写入Elasticsearch
System.out.println("Insert: " + row);
// TODO: Implement Elasticsearch index operation
});
}
} else if (eventType == EventType.UPDATE_ROWS) {
UpdateRowsEventData updateRowsEventData = event.getData();
String table = tableMap.get(updateRowsEventData.getTableId());
if ("articles".equals(table)) {
updateRowsEventData.getRows().forEach(row -> {
// 将数据更新到Elasticsearch
System.out.println("Update: " + row);
// TODO: Implement Elasticsearch update operation
});
}
} else if (eventType == EventType.DELETE_ROWS) {
DeleteRowsEventData deleteRowsEventData = event.getData();
String table = tableMap.get(deleteRowsEventData.getTableId());
if ("articles".equals(table)) {
deleteRowsEventData.getRows().forEach(row -> {
// 从Elasticsearch删除数据
System.out.println("Delete: " + row);
// TODO: Implement Elasticsearch delete operation
});
}
}
});
client.connect();
}
}
上面的代码使用了mysql-binlog-connector-java
库来监听MySQL的binlog。当有数据变更时,将数据同步到Elasticsearch。
注意: 自定义程序需要处理各种异常情况,比如网络中断、连接失败等。
3.5 Elasticsearch Mapping配置
在将数据同步到Elasticsearch之前,需要先定义Mapping,指定字段的类型和分词器。
PUT /articles
{
"mappings": {
"properties": {
"id": {
"type": "integer"
},
"title": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"content": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
}
}
}
}
上面的Mapping配置:
id
: 字段类型为integer。title
和content
: 字段类型为text,使用ik_max_word
作为索引时的分词器,使用ik_smart
作为搜索时的分词器。
注意: ik_max_word
和ik_smart
是常用的中文分词器,需要安装elasticsearch-analysis-ik
插件。
3.6 查询优化
数据同步到Elasticsearch之后,就可以使用Elasticsearch的查询DSL进行搜索了。
GET /articles/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"title": "MySQL"
}
},
{
"match": {
"content": "Elasticsearch"
}
}
]
}
},
"sort": [
{
"id": {
"order": "desc"
}
}
]
}
上面的查询DSL:
- 使用
bool
查询,组合多个match
查询。 must
表示必须匹配的条件。sort
表示按照id
字段降序排序。
第四章:注意事项
- 数据一致性: 在使用Logstash或Canal同步数据时,需要考虑数据一致性的问题。可以使用事务消息、两阶段提交等方案来保证数据一致性。
- 性能优化: Elasticsearch的性能优化是一个复杂的话题,需要根据具体的业务场景进行调整。可以考虑调整分片数、副本数、缓存大小等参数。
- 监控: 需要对MySQL和Elasticsearch进行监控,及时发现和解决问题。
第五章:总结
MySQL的全文搜索功能虽然简单易用,但在性能和功能上存在很多局限性。Elasticsearch作为一个专业的搜索引擎,可以弥补MySQL的不足,实现更强大的搜索功能。通过Logstash、Canal或自定义程序,可以将MySQL的数据同步到Elasticsearch,然后使用Elasticsearch的查询DSL进行搜索。
好了,今天的讲座就到这里。希望大家有所收获,谢谢! 散会!