好的,我们开始今天的讲座。主题是Elasticsearch 8.17 Java API Client在响应式链中BulkIngester背压触发Retry时请求重复的问题,以及如何通过BulkRetryListener与幂等性去重解决。
背景
在构建高吞吐量的Elasticsearch数据索引管道时,BulkIngester 是一个非常有用的工具。 它允许你批量提交文档,从而显著提高索引速度。 然而,在高负载情况下,Elasticsearch集群可能会出现资源瓶颈,导致 BulkIngester 产生背压。 背压通常通过触发重试机制来处理。 然而,简单地重试所有失败的请求可能会导致重复文档的问题,特别是在某些情况下,例如网络问题导致请求实际上已经成功,但客户端没有收到确认。
我们的目标是理解在响应式链中使用 BulkIngester 时重试机制如何工作,并探讨如何使用 BulkRetryListener 和幂等性去重来避免重复文档。
问题分析:BulkIngester和响应式链的重试机制
BulkIngester 基于 Elasticsearch Java API Client 构建,后者支持响应式编程模型。 在响应式链中,重试通常由 retryWhen 操作符处理。 当 BulkIngester 遇到错误时(例如,由于 Elasticsearch 集群过载),它会发出一个错误信号。 retryWhen 操作符会捕获这个错误,并根据配置的策略(例如,指数退避)决定是否重试。
关键问题在于,在重试过程中,我们必须确保不会重复索引相同的文档。 这意味着我们需要一种机制来识别和过滤重复的请求。
示例代码:简单的BulkIngester与响应式链
首先,让我们看一个简单的使用 BulkIngester 和响应式链的示例:
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import jakarta.json.Json;
import jakarta.json.JsonObject;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class BulkIngesterExample {
public static void main(String[] args) throws IOException, InterruptedException {
// 1. Create an Elasticsearch client
ElasticsearchClient client = createEsClient();
// 2. Define the index name
String indexName = "my-index";
// 3. Create a Flux of JSON documents
Flux<JsonObject> documentFlux = Flux.fromStream(
IntStream.range(0, 100).mapToObj(i -> {
String id = UUID.randomUUID().toString();
return Json.createObjectBuilder()
.add("id", id)
.add("field1", "value" + i)
.add("field2", i)
.build();
})
);
// 4. Process the documents using BulkIngester with retry
processDocuments(client, indexName, documentFlux);
Thread.sleep(5000); // Wait for the process to complete
}
private static ElasticsearchClient createEsClient() {
RestClient restClient = RestClient
.builder(new HttpHost("localhost", 9200))
.build();
return new ElasticsearchClient(restClient.getLowLevelClient(), new JacksonJsonpMapper());
}
private static void processDocuments(ElasticsearchClient client, String indexName, Flux<JsonObject> documentFlux) {
documentFlux
.publishOn(Schedulers.boundedElastic()) // Offload processing to a dedicated scheduler
.window(10) // Batch documents into groups of 10
.flatMap(batch -> batch.collectList()) // Collect each batch into a list
.flatMap(documents -> {
try {
return bulkIndex(client, indexName, documents);
} catch (IOException e) {
return reactor.core.publisher.Mono.error(e);
}
})
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)) // Retry up to 3 times with exponential backoff
.filter(throwable -> throwable instanceof IOException) // Only retry IOExceptions
.onRetry(retrySignal -> System.out.println("Retrying bulk index operation, attempt: " + retrySignal.totalRetries() + 1)))
.subscribe(
response -> {
if (response.errors()) {
System.err.println("Bulk index operation failed with errors:");
for (BulkResponseItem item : response.items()) {
if (item.error() != null) {
System.err.println(" Error for document " + item.id() + ": " + item.error());
}
}
} else {
System.out.println("Bulk index operation successful. Took: " + response.took());
}
},
error -> System.err.println("Error during bulk index operation: " + error),
() -> System.out.println("Bulk index operation completed.")
);
}
private static reactor.core.publisher.Mono<BulkResponse> bulkIndex(ElasticsearchClient client, String indexName, List<JsonObject> documents) throws IOException {
return reactor.core.publisher.Mono.fromCallable(() -> {
return client.bulk(b -> {
documents.forEach(doc -> b.index(idx -> idx
.index(indexName)
.id(doc.getString("id")) // Assuming 'id' field exists
.document(doc)
));
return b;
});
});
}
}
在这个示例中,我们创建了一个 Flux 来生成 JSON 文档,然后使用 window 操作符将它们分批处理。 flatMap 操作符用于调用 bulkIndex 方法,该方法将文档批量索引到 Elasticsearch 中。 retryWhen 操作符用于在发生 IOException 时重试批量索引操作。
问题:重复文档
如果 bulkIndex 方法在 Elasticsearch 集群过载时失败,retryWhen 操作符将重试该操作。 但是,如果 Elasticsearch 实际上已经索引了部分或全部文档,则重试将导致重复文档。 在生产环境中,网络延迟或其他瞬时问题会导致同样的结果,即使客户端最终接收到了错误信息。
解决方案1:BulkRetryListener
BulkRetryListener 接口允许你拦截和修改批量请求的重试行为。 你可以使用 BulkRetryListener 来记录重试尝试,或者在重试之前修改请求。
然而,BulkRetryListener 本身并不能解决重复文档的问题。 它只是提供了一个钩子,让你可以在重试之前执行一些自定义逻辑。 你仍然需要实现一些额外的机制来避免重复文档。
解决方案2:幂等性去重
幂等性是指一个操作无论执行多少次,其结果都是相同的。 换句话说,如果一个操作是幂等的,那么你可以安全地重试该操作,而不用担心会产生副作用。
为了使批量索引操作具有幂等性,我们可以使用以下方法:
-
使用文档 ID: 确保每个文档都有一个唯一的 ID。 Elasticsearch 使用文档 ID 来确定是否已经存在该文档。 如果你尝试索引具有相同 ID 的文档,Elasticsearch 将覆盖现有文档。 在上面的代码示例中,我们已经使用了 UUID 作为文档 ID。 这是确保文档 ID 唯一的常用方法。
-
使用
_version和if_seq_no+if_primary_term: Elasticsearch 提供了乐观锁机制,允许你仅在文档的当前版本与预期版本匹配时才更新文档。 你可以使用_version字段来指定预期版本。 或者,你可以使用if_seq_no和if_primary_term字段,它们提供了更强大的并发控制。
让我们修改上面的代码示例,以使用 if_seq_no 和 if_primary_term 来实现幂等性:
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import jakarta.json.Json;
import jakarta.json.JsonObject;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class BulkIngesterExampleIdempotent {
public static void main(String[] args) throws IOException, InterruptedException {
// 1. Create an Elasticsearch client
ElasticsearchClient client = createEsClient();
// 2. Define the index name
String indexName = "my-index";
// 3. Create a Flux of JSON documents
Flux<JsonObject> documentFlux = Flux.fromStream(
IntStream.range(0, 100).mapToObj(i -> {
String id = UUID.randomUUID().toString();
return Json.createObjectBuilder()
.add("id", id)
.add("field1", "value" + i)
.add("field2", i)
.build();
})
);
// 4. Process the documents using BulkIngester with retry
processDocuments(client, indexName, documentFlux);
Thread.sleep(5000); // Wait for the process to complete
}
private static ElasticsearchClient createEsClient() {
RestClient restClient = RestClient
.builder(new HttpHost("localhost", 9200))
.build();
return new ElasticsearchClient(restClient.getLowLevelClient(), new JacksonJsonpMapper());
}
private static void processDocuments(ElasticsearchClient client, String indexName, Flux<JsonObject> documentFlux) {
documentFlux
.publishOn(Schedulers.boundedElastic()) // Offload processing to a dedicated scheduler
.window(10) // Batch documents into groups of 10
.flatMap(batch -> batch.collectList()) // Collect each batch into a list
.flatMap(documents -> {
try {
return bulkIndex(client, indexName, documents);
} catch (IOException e) {
return reactor.core.publisher.Mono.error(e);
}
})
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)) // Retry up to 3 times with exponential backoff
.filter(throwable -> throwable instanceof IOException) // Only retry IOExceptions
.onRetry(retrySignal -> System.out.println("Retrying bulk index operation, attempt: " + retrySignal.totalRetries() + 1)))
.subscribe(
response -> {
if (response.errors()) {
System.err.println("Bulk index operation failed with errors:");
for (BulkResponseItem item : response.items()) {
if (item.error() != null) {
System.err.println(" Error for document " + item.id() + ": " + item.error());
}
}
} else {
System.out.println("Bulk index operation successful. Took: " + response.took());
}
},
error -> System.err.println("Error during bulk index operation: " + error),
() -> System.out.println("Bulk index operation completed.")
);
}
private static reactor.core.publisher.Mono<BulkResponse> bulkIndex(ElasticsearchClient client, String indexName, List<JsonObject> documents) throws IOException {
return reactor.core.publisher.Mono.fromCallable(() -> {
return client.bulk(b -> {
documents.forEach(doc -> {
String id = doc.getString("id");
// Try to update existing document, if not exists, create new
b.update(ub -> ub
.index(indexName)
.id(id)
.action(a -> a.doc(doc).docAsUpsert(true)) //Upsert
);
});
return b;
});
});
}
}
在这个修改后的示例中,我们使用 update API和 doc_as_upsert来实现幂等性。
高级方案:使用外部存储进行去重
在某些情况下,仅依靠 Elasticsearch 的幂等性机制可能不够。 例如,你可能需要处理以下情况:
- 文档 ID 不是唯一的。
- 你无法使用
_version或if_seq_no+if_primary_term。 - 你需要在索引之前执行一些复杂的去重逻辑。
在这种情况下,你可以使用外部存储(例如 Redis 或数据库)来跟踪已索引的文档。 在索引文档之前,你可以查询外部存储以查看该文档是否已索引。 如果是,则跳过索引操作。
表格:不同方案的比较
| 方案 | 优点 | 缺点 | 适用场景 |
|---|