Elasticsearch深度分页scroll内存溢出?search_after与PointInTime快照优化

Elasticsearch 深度分页难题:Scroll 内存溢出与 Search_After/Point In Time 快照优化

大家好,今天我们来聊聊 Elasticsearch 中深度分页的问题,以及如何利用 scrollsearch_afterPoint in Time (PIT) 快照来优化深度分页,特别是避免内存溢出。

深度分页的挑战:为什么 from/size 不靠谱?

在 Elasticsearch 中,最简单的分页方式就是使用 fromsize 参数。from 指定起始文档的位置,size 指定返回的文档数量。例如:

GET /my_index/_search
{
  "from": 1000,
  "size": 10
}

这段代码会跳过前 1000 个文档,然后返回接下来的 10 个文档。看起来很简单,但当 from 的值变得非常大时,问题就来了。

  • 性能瓶颈: Elasticsearch 需要检索 from + size 个文档,然后在内存中排序,最后丢弃 from 个文档,只返回 size 个。这在 from 值很大时会消耗大量的 CPU 和内存资源。
  • index.max_result_window 限制: Elasticsearch 为了防止深度分页带来的性能问题,默认限制了 from + size 的最大值,由 index.max_result_window 参数控制,默认为 10000。这意味着如果你想分页超过 10000 条数据,就需要采用其他方式。
  • 分页不准确: 在高并发的写入场景下,当你在分页的过程中,数据还在不断写入,这就会导致分页的数据不准确,出现重复或者遗漏的情况。

因此,对于深度分页,我们不能简单地依赖 from/size

scroll:游标式分页的早期方案

scroll API 允许你创建一个游标,用于遍历整个数据集。它的工作方式是:

  1. 初始化 Scroll: 发送一个 _search 请求,并指定 scroll 参数,例如 1m(表示游标的有效期为 1 分钟)。
  2. 获取 Scroll ID: Elasticsearch 会返回一个 scroll_id,用于后续的请求。
  3. 持续 Scroll: 使用 _search/scroll API 和 scroll_id 来获取下一批数据。每次请求都会更新游标的位置。
  4. 清除 Scroll: 当你完成遍历后,应该清除 scroll_id,释放资源。

代码示例:

初始化 Scroll:

GET /my_index/_search?scroll=1m
{
  "query": {
    "match_all": {}
  },
  "size": 1000
}

响应:

{
  "_scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAD4WYm9laVYtZnduQW94cVJFdkJQQndyZw==",
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 10000,
      "relation": "eq"
    },
    "max_score": 1.0,
    "hits": [
      {
        "_index": "my_index",
        "_type": "_doc",
        "_id": "1",
        "_score": 1.0,
        "_source": {
          "field1": "value1"
        }
      },
      // ... more hits
    ]
  }
}

持续 Scroll:

POST /_search/scroll
{
  "scroll": "1m",
  "scroll_id": "DXF1ZXJ5QW5kRmV0Y2hBAAAAAAAAAD4WYm9laVYtZnduQW94cVJFdkJQQndyZw=="
}

响应:

{
  "_scroll_id": "DXF1ZXJ5QW5kRmV0Y2hBAAAAAAAAAD8GYm9laVYtZnduQW94cVJFdkJQQndyZw==",
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 10000,
      "relation": "eq"
    },
    "max_score": 1.0,
    "hits": [
      {
        "_index": "my_index",
        "_type": "_doc",
        "_id": "1001",
        "_score": 1.0,
        "_source": {
          "field1": "value1001"
        }
      },
      // ... more hits
    ]
  }
}

清除 Scroll:

DELETE /_search/scroll
{
  "scroll_id": ["DXF1ZXJ5QW5kRmV0Y2hBAAAAAAAAAD4WYm9laVYtZnduQW94cVJFdkJQQndyZw==", "DXF1ZXJ5QW5kRmV0Y2hBAAAAAAAAAD8GYm9laVYtZnduQW94cVJFdkJQQndyZw=="]
}

或者清除所有scroll

DELETE /_search/scroll/_all

scroll 的优点:

  • 可以遍历大量数据,不受 index.max_result_window 的限制。

scroll 的缺点:

  • 资源消耗: scroll 会创建一个快照,并保持索引段的打开状态,以便在后续请求中访问数据。这会消耗大量的服务器资源,特别是当有多个 scroll 并发执行时,容易导致内存溢出。
  • 实时性问题: scroll 创建的是一个时间点的快照,后续对数据的修改不会反映到 scroll 的结果中。这意味着 scroll 不适合实时性要求高的场景。
  • 无状态性: scroll 是一个游标,需要客户端维护 scroll_id,如果客户端崩溃,scroll 就无法继续,需要重新初始化。

search_after:基于上一页排序值的深度分页

search_after 是一种基于上一页排序值的深度分页方式。它的工作原理是:

  1. 首次查询: 执行一个正常的 _search 请求,并指定排序字段。
  2. 获取排序值: 从结果中获取最后一个文档的排序值。
  3. 使用 search_after 在后续的请求中使用 search_after 参数,将上一个文档的排序值传递给 Elasticsearch。Elasticsearch 会从这个排序值之后开始搜索。

代码示例:

首次查询:

GET /my_index/_search
{
  "size": 10,
  "sort": [
    {"id": "asc"}
  ]
}

响应:

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 10000,
      "relation": "eq"
    },
    "max_score": null,
    "hits": [
      {
        "_index": "my_index",
        "_type": "_doc",
        "_id": "1",
        "_score": null,
        "_source": {
          "id": 1,
          "field1": "value1"
        },
        "sort": [
          1
        ]
      },
      {
        "_index": "my_index",
        "_type": "_doc",
        "_id": "2",
        "_score": null,
        "_source": {
          "id": 2,
          "field1": "value2"
        },
        "sort": [
          2
        ]
      },
      // ... more hits
    ]
  }
}

使用 search_after

GET /my_index/_search
{
  "size": 10,
  "sort": [
    {"id": "asc"}
  ],
  "search_after": [
    10  // 上一个文档的排序值
  ]
}

search_after 的优点:

  • 性能更好: search_after 不需要维护游标,每次请求都是一个独立的搜索,避免了 scroll 的资源消耗。
  • 实时性更好: search_after 每次请求都会执行搜索,可以反映最新的数据变化。

search_after 的缺点:

  • 需要排序字段: search_after 必须指定排序字段,并且排序字段的值必须是唯一的。如果排序字段的值不唯一,可能会导致分页结果不准确。
  • 排序字段的选择: 选择合适的排序字段很重要。如果排序字段的基数太低,可能会导致性能问题。
  • 无法跳页: search_after 只能按顺序分页,无法像 from/size 那样跳到任意页。

Point in Time (PIT):快照与 search_after 的结合

Point in Time (PIT) 是 Elasticsearch 7.10 引入的新特性,它可以创建一个索引的快照,并允许你在这个快照上执行搜索。PIT 可以与 search_after 结合使用,以解决 scroll 的资源消耗和实时性问题。

PIT 的工作方式是:

  1. 创建 PIT: 发送一个 POST 请求到 /_pit 端点,指定要创建快照的索引。
  2. 获取 PIT ID: Elasticsearch 会返回一个 pit_id,用于后续的请求。
  3. 使用 PIT ID:_search 请求中使用 pit 参数,指定 pit_id
  4. 结合 search_after 使用 search_after 参数进行分页。
  5. 关闭 PIT: 当你完成遍历后,应该关闭 pit_id,释放资源。

代码示例:

创建 PIT:

POST /my_index/_pit?keep_alive=1m

响应:

{
  "id": "ER1rMEt4WEdYQmdJR2l0dWRRVEF3Zw=="
}

使用 PIT ID 和 search_after

GET /_search
{
  "size": 10,
  "sort": [
    {"id": "asc"}
  ],
  "pit": {
    "id": "ER1rMEt4WEdYQmdJR2l0dWRRVEF3Zw==",
    "keep_alive": "1m"
  },
  "search_after": [
    10
  ]
}

关闭 PIT:

DELETE /_pit
{
  "id": "ER1rMEt4WEdYQmdJR2l0dWRRVEF3Zw=="
}

PIT 的优点:

  • 快照一致性: PIT 保证了在快照创建之后,对数据的修改不会反映到搜索结果中,提供了数据一致性。
  • search_after 完美结合: PIT 可以与 search_after 结合使用,避免了 scroll 的资源消耗和实时性问题。
  • 简化深度分页: PIT 简化了深度分页的实现,只需要维护 pit_id 和排序值即可。

PIT 的缺点:

  • 快照开销: 创建和维护快照需要一定的开销,但通常比 scroll 小得多。
  • 需要排序字段: 仍然需要指定排序字段,并且排序字段的值必须是唯一的。

总结:选择合适的分页方案

特性 from/size scroll search_after PIT + search_after
深度分页 不支持 支持 支持 支持
性能 较差,资源消耗高 较好,每次请求都是独立的搜索 较好,每次请求都是独立的搜索,但需要维护快照
实时性 差,基于快照 好,每次请求都会执行搜索 中等,基于快照,但比 scroll 更灵活
数据一致性 强,基于快照 弱,可能出现重复或遗漏 强,基于快照
资源消耗 高,需要维护游标和快照 低,不需要维护游标 较低,需要维护快照,但通常比 scroll 小得多
实现复杂度 简单 较复杂,需要维护 scroll_id 较复杂,需要维护排序值 较复杂,需要维护 pit_id 和排序值
适用场景 小数据量分页 离线数据处理,对实时性要求不高,能接受数据不一致性 实时性要求较高,数据量较大,能接受数据不一致性 实时性要求不高,数据量较大,要求数据一致性,能接受快照开销
排序字段 可选 可选 必须,且值必须唯一 必须,且值必须唯一
是否可以跳页 可以 不可以 不可以 不可以

选择合适的分页方案取决于你的具体需求,例如数据量、实时性要求、数据一致性要求以及可接受的资源消耗等。

  • 对于小数据量,可以使用 from/size
  • 对于离线数据处理,可以使用 scroll
  • 对于实时性要求较高,可以使用 search_after
  • 对于数据一致性要求较高,可以使用 PIT + search_after

避免 Scroll 内存溢出

在使用 scroll API 时,需要特别注意避免内存溢出。以下是一些建议:

  1. 设置合理的 scroll 有效期: scroll 的有效期由 scroll 参数指定,例如 1m 表示 1 分钟。如果你的应用程序处理数据的速度比较慢,可以适当延长 scroll 的有效期。但是,过长的有效期会占用更多的资源,容易导致内存溢出。因此,应该根据实际情况选择合适的 scroll 有效期。
  2. 减小 size size 参数指定每次 scroll 请求返回的文档数量。减小 size 可以减少每次请求的内存消耗,降低内存溢出的风险。
  3. 及时清除 scroll_id 当你完成遍历后,应该及时清除 scroll_id,释放资源。可以使用 DELETE /_search/scroll API 来清除 scroll_id
  4. 监控 Elasticsearch 集群的资源使用情况: 使用 Elasticsearch 的监控工具,例如 Kibana,可以监控集群的 CPU、内存、磁盘等资源使用情况。如果发现内存使用率过高,应该及时采取措施,例如减小 size、缩短 scroll 有效期、增加集群节点等。
  5. 避免长时间运行的 scroll 尽量避免长时间运行的 scroll,因为长时间运行的 scroll 会占用大量的资源,容易导致内存溢出。可以将大数据集拆分成多个小数据集,分别进行 scroll
  6. 使用更高效的查询: 优化你的查询语句,使其能够更快地返回结果。更高效的查询可以减少 scroll 的时间,降低内存消耗。
  7. 考虑使用 PIT + search_after 如果你的应用程序对数据一致性要求较高,可以考虑使用 PIT + search_after 代替 scrollPIT + search_after 的资源消耗通常比 scroll 小得多,可以有效地避免内存溢出。

实践案例:电商订单分页

假设我们有一个电商平台,需要对订单进行分页显示。订单数据存储在 Elasticsearch 中,包含以下字段:

  • order_id:订单 ID,Long 类型,唯一。
  • user_id:用户 ID,Long 类型。
  • order_time:下单时间,Date 类型。
  • order_amount:订单金额,Double 类型。

现在我们需要按照下单时间倒序分页显示订单数据,每页显示 10 条数据。

使用 PIT + search_after 的代码示例:

创建 PIT:

// Java code using Elasticsearch High Level REST Client
RestHighLevelClient client = new RestHighLevelClient(
        RestClient.builder(
                new HttpHost("localhost", 9200, "http")));

// Create Point in Time request
OpenPointInTimeRequest openPointInTimeRequest = new OpenPointInTimeRequest("orders", TimeValue.timeValueMinutes(1));
OpenPointInTimeResponse openPointInTimeResponse = client.openPointInTime(openPointInTimeRequest, RequestOptions.DEFAULT);

String pitId = openPointInTimeResponse.getId();

System.out.println("PIT ID: " + pitId);

分页查询:

// Java code using Elasticsearch High Level REST Client
// Initial search
SearchRequest searchRequest = new SearchRequest("orders");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.size(10);
searchSourceBuilder.sort(new FieldSortBuilder("order_time").order(SortOrder.DESC));
searchSourceBuilder.sort(new FieldSortBuilder("order_id").order(SortOrder.ASC)); // Add a tiebreaker
searchRequest.source(searchSourceBuilder);

PointInTimeBuilder pointInTimeBuilder = new PointInTimeBuilder(pitId, TimeValue.timeValueMinutes(1));
searchSourceBuilder.pointInTimeBuilder(pointInTimeBuilder);

SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

SearchHit[] hits = searchResponse.getHits().getHits();

// Process first page of results
for (SearchHit hit : hits) {
    System.out.println(hit.getSourceAsString());
}

Object[] searchAfter = hits[hits.length - 1].getSortValues(); // Store for next query

// Subsequent searches using search_after
for (int i = 0; i < 5; i++) { // Fetch 5 more pages
    SearchRequest nextSearchRequest = new SearchRequest("orders");
    SearchSourceBuilder nextSearchSourceBuilder = new SearchSourceBuilder();
    nextSearchSourceBuilder.size(10);
    nextSearchSourceBuilder.sort(new FieldSortBuilder("order_time").order(SortOrder.DESC));
    nextSearchSourceBuilder.sort(new FieldSortBuilder("order_id").order(SortOrder.ASC)); // Add a tiebreaker
    nextSearchSourceBuilder.searchAfter(searchAfter);
    nextSearchSourceBuilder.pointInTimeBuilder(pointInTimeBuilder);
    nextSearchRequest.source(nextSearchSourceBuilder);

    SearchResponse nextSearchResponse = client.search(nextSearchRequest, RequestOptions.DEFAULT);
    SearchHit[] nextHits = nextSearchResponse.getHits().getHits();

    // Process next page of results
    for (SearchHit hit : nextHits) {
        System.out.println(hit.getSourceAsString());
    }

    if (nextHits.length > 0) {
        searchAfter = nextHits[nextHits.length - 1].getSortValues();
    } else {
        break; // No more results
    }
}

关闭 PIT:

// Java code using Elasticsearch High Level REST Client
// Close Point in Time
ClosePointInTimeRequest closePointInTimeRequest = new ClosePointInTimeRequest();
closePointInTimeRequest.setPointInTimeId(pitId);
ClosePointInTimeResponse closePointInTimeResponse = client.closePointInTime(closePointInTimeRequest, RequestOptions.DEFAULT);

boolean isClosed = closePointInTimeResponse.isSuccessful();
System.out.println("PIT closed: " + isClosed);

client.close();

在这个示例中,我们首先创建了一个 PIT,然后使用 search_after 进行分页查询。每次查询都指定了 pit_id 和排序值。最后,我们关闭了 PIT,释放资源。

注意:

  • 为了保证排序的唯一性,我们使用了 order_id 作为辅助排序字段。
  • 在实际应用中,你需要根据自己的数据结构和业务需求选择合适的排序字段。
  • PIT 的有效期应该根据实际情况进行调整。

根据需求选择合适的分页策略

我们讨论了 Elasticsearch 中深度分页的各种策略,包括 from/sizescrollsearch_afterPoint in Time (PIT)。每种策略都有其优缺点,选择合适的策略取决于你的具体需求。from/size 简单易用,但只适用于小数据量分页。scroll 可以遍历大量数据,但资源消耗较高,实时性较差。search_after 性能较好,实时性较好,但需要指定排序字段,并且排序字段的值必须是唯一的。PIT + search_after 结合了 PITsearch_after 的优点,既保证了数据一致性,又避免了 scroll 的资源消耗。

希望今天的分享能够帮助你更好地理解 Elasticsearch 中的深度分页问题,并选择合适的解决方案。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注