Elasticsearch 8.17 Java API Client在响应式链中BulkIngester背压触发Retry时请求重复?BulkRetryListener与幂等性去重

好的,我们开始今天的讲座。主题是Elasticsearch 8.17 Java API Client在响应式链中BulkIngester背压触发Retry时请求重复的问题,以及如何通过BulkRetryListener与幂等性去重解决。

背景

在构建高吞吐量的Elasticsearch数据索引管道时,BulkIngester 是一个非常有用的工具。 它允许你批量提交文档,从而显著提高索引速度。 然而,在高负载情况下,Elasticsearch集群可能会出现资源瓶颈,导致 BulkIngester 产生背压。 背压通常通过触发重试机制来处理。 然而,简单地重试所有失败的请求可能会导致重复文档的问题,特别是在某些情况下,例如网络问题导致请求实际上已经成功,但客户端没有收到确认。

我们的目标是理解在响应式链中使用 BulkIngester 时重试机制如何工作,并探讨如何使用 BulkRetryListener 和幂等性去重来避免重复文档。

问题分析:BulkIngester和响应式链的重试机制

BulkIngester 基于 Elasticsearch Java API Client 构建,后者支持响应式编程模型。 在响应式链中,重试通常由 retryWhen 操作符处理。 当 BulkIngester 遇到错误时(例如,由于 Elasticsearch 集群过载),它会发出一个错误信号。 retryWhen 操作符会捕获这个错误,并根据配置的策略(例如,指数退避)决定是否重试。

关键问题在于,在重试过程中,我们必须确保不会重复索引相同的文档。 这意味着我们需要一种机制来识别和过滤重复的请求。

示例代码:简单的BulkIngester与响应式链

首先,让我们看一个简单的使用 BulkIngester 和响应式链的示例:

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import jakarta.json.Json;
import jakarta.json.JsonObject;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BulkIngesterExample {

    public static void main(String[] args) throws IOException, InterruptedException {
        // 1. Create an Elasticsearch client
        ElasticsearchClient client = createEsClient();

        // 2. Define the index name
        String indexName = "my-index";

        // 3. Create a Flux of JSON documents
        Flux<JsonObject> documentFlux = Flux.fromStream(
                IntStream.range(0, 100).mapToObj(i -> {
                    String id = UUID.randomUUID().toString();
                    return Json.createObjectBuilder()
                            .add("id", id)
                            .add("field1", "value" + i)
                            .add("field2", i)
                            .build();
                })
        );

        // 4. Process the documents using BulkIngester with retry
        processDocuments(client, indexName, documentFlux);

        Thread.sleep(5000);  // Wait for the process to complete
    }

    private static ElasticsearchClient createEsClient() {
        RestClient restClient = RestClient
                .builder(new HttpHost("localhost", 9200))
                .build();

        return new ElasticsearchClient(restClient.getLowLevelClient(), new JacksonJsonpMapper());
    }

    private static void processDocuments(ElasticsearchClient client, String indexName, Flux<JsonObject> documentFlux) {
        documentFlux
                .publishOn(Schedulers.boundedElastic()) // Offload processing to a dedicated scheduler
                .window(10) // Batch documents into groups of 10
                .flatMap(batch -> batch.collectList()) // Collect each batch into a list
                .flatMap(documents -> {
                    try {
                        return bulkIndex(client, indexName, documents);
                    } catch (IOException e) {
                        return reactor.core.publisher.Mono.error(e);
                    }
                })
                .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)) // Retry up to 3 times with exponential backoff
                        .filter(throwable -> throwable instanceof IOException) // Only retry IOExceptions
                        .onRetry(retrySignal -> System.out.println("Retrying bulk index operation, attempt: " + retrySignal.totalRetries() + 1)))
                .subscribe(
                        response -> {
                            if (response.errors()) {
                                System.err.println("Bulk index operation failed with errors:");
                                for (BulkResponseItem item : response.items()) {
                                    if (item.error() != null) {
                                        System.err.println("  Error for document " + item.id() + ": " + item.error());
                                    }
                                }
                            } else {
                                System.out.println("Bulk index operation successful. Took: " + response.took());
                            }
                        },
                        error -> System.err.println("Error during bulk index operation: " + error),
                        () -> System.out.println("Bulk index operation completed.")
                );
    }

    private static reactor.core.publisher.Mono<BulkResponse> bulkIndex(ElasticsearchClient client, String indexName, List<JsonObject> documents) throws IOException {
        return reactor.core.publisher.Mono.fromCallable(() -> {
            return client.bulk(b -> {
                documents.forEach(doc -> b.index(idx -> idx
                        .index(indexName)
                        .id(doc.getString("id")) // Assuming 'id' field exists
                        .document(doc)
                ));
                return b;
            });
        });
    }
}

在这个示例中,我们创建了一个 Flux 来生成 JSON 文档,然后使用 window 操作符将它们分批处理。 flatMap 操作符用于调用 bulkIndex 方法,该方法将文档批量索引到 Elasticsearch 中。 retryWhen 操作符用于在发生 IOException 时重试批量索引操作。

问题:重复文档

如果 bulkIndex 方法在 Elasticsearch 集群过载时失败,retryWhen 操作符将重试该操作。 但是,如果 Elasticsearch 实际上已经索引了部分或全部文档,则重试将导致重复文档。 在生产环境中,网络延迟或其他瞬时问题会导致同样的结果,即使客户端最终接收到了错误信息。

解决方案1:BulkRetryListener

BulkRetryListener 接口允许你拦截和修改批量请求的重试行为。 你可以使用 BulkRetryListener 来记录重试尝试,或者在重试之前修改请求。

然而,BulkRetryListener 本身并不能解决重复文档的问题。 它只是提供了一个钩子,让你可以在重试之前执行一些自定义逻辑。 你仍然需要实现一些额外的机制来避免重复文档。

解决方案2:幂等性去重

幂等性是指一个操作无论执行多少次,其结果都是相同的。 换句话说,如果一个操作是幂等的,那么你可以安全地重试该操作,而不用担心会产生副作用。

为了使批量索引操作具有幂等性,我们可以使用以下方法:

  1. 使用文档 ID: 确保每个文档都有一个唯一的 ID。 Elasticsearch 使用文档 ID 来确定是否已经存在该文档。 如果你尝试索引具有相同 ID 的文档,Elasticsearch 将覆盖现有文档。 在上面的代码示例中,我们已经使用了 UUID 作为文档 ID。 这是确保文档 ID 唯一的常用方法。

  2. 使用 _versionif_seq_no + if_primary_term Elasticsearch 提供了乐观锁机制,允许你仅在文档的当前版本与预期版本匹配时才更新文档。 你可以使用 _version 字段来指定预期版本。 或者,你可以使用 if_seq_noif_primary_term 字段,它们提供了更强大的并发控制。

让我们修改上面的代码示例,以使用 if_seq_noif_primary_term 来实现幂等性:

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import jakarta.json.Json;
import jakarta.json.JsonObject;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BulkIngesterExampleIdempotent {

    public static void main(String[] args) throws IOException, InterruptedException {
        // 1. Create an Elasticsearch client
        ElasticsearchClient client = createEsClient();

        // 2. Define the index name
        String indexName = "my-index";

        // 3. Create a Flux of JSON documents
        Flux<JsonObject> documentFlux = Flux.fromStream(
                IntStream.range(0, 100).mapToObj(i -> {
                    String id = UUID.randomUUID().toString();
                    return Json.createObjectBuilder()
                            .add("id", id)
                            .add("field1", "value" + i)
                            .add("field2", i)
                            .build();
                })
        );

        // 4. Process the documents using BulkIngester with retry
        processDocuments(client, indexName, documentFlux);

        Thread.sleep(5000);  // Wait for the process to complete
    }

    private static ElasticsearchClient createEsClient() {
        RestClient restClient = RestClient
                .builder(new HttpHost("localhost", 9200))
                .build();

        return new ElasticsearchClient(restClient.getLowLevelClient(), new JacksonJsonpMapper());
    }

    private static void processDocuments(ElasticsearchClient client, String indexName, Flux<JsonObject> documentFlux) {
        documentFlux
                .publishOn(Schedulers.boundedElastic()) // Offload processing to a dedicated scheduler
                .window(10) // Batch documents into groups of 10
                .flatMap(batch -> batch.collectList()) // Collect each batch into a list
                .flatMap(documents -> {
                    try {
                        return bulkIndex(client, indexName, documents);
                    } catch (IOException e) {
                        return reactor.core.publisher.Mono.error(e);
                    }
                })
                .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)) // Retry up to 3 times with exponential backoff
                        .filter(throwable -> throwable instanceof IOException) // Only retry IOExceptions
                        .onRetry(retrySignal -> System.out.println("Retrying bulk index operation, attempt: " + retrySignal.totalRetries() + 1)))
                .subscribe(
                        response -> {
                            if (response.errors()) {
                                System.err.println("Bulk index operation failed with errors:");
                                for (BulkResponseItem item : response.items()) {
                                    if (item.error() != null) {
                                        System.err.println("  Error for document " + item.id() + ": " + item.error());
                                    }
                                }
                            } else {
                                System.out.println("Bulk index operation successful. Took: " + response.took());
                            }
                        },
                        error -> System.err.println("Error during bulk index operation: " + error),
                        () -> System.out.println("Bulk index operation completed.")
                );
    }

    private static reactor.core.publisher.Mono<BulkResponse> bulkIndex(ElasticsearchClient client, String indexName, List<JsonObject> documents) throws IOException {
        return reactor.core.publisher.Mono.fromCallable(() -> {
            return client.bulk(b -> {
                documents.forEach(doc -> {
                    String id = doc.getString("id");
                    // Try to update existing document, if not exists, create new
                    b.update(ub -> ub
                            .index(indexName)
                            .id(id)
                            .action(a -> a.doc(doc).docAsUpsert(true)) //Upsert
                    );
                });
                return b;
            });
        });
    }
}

在这个修改后的示例中,我们使用 update API和 doc_as_upsert来实现幂等性。

高级方案:使用外部存储进行去重

在某些情况下,仅依靠 Elasticsearch 的幂等性机制可能不够。 例如,你可能需要处理以下情况:

  • 文档 ID 不是唯一的。
  • 你无法使用 _versionif_seq_no + if_primary_term
  • 你需要在索引之前执行一些复杂的去重逻辑。

在这种情况下,你可以使用外部存储(例如 Redis 或数据库)来跟踪已索引的文档。 在索引文档之前,你可以查询外部存储以查看该文档是否已索引。 如果是,则跳过索引操作。

表格:不同方案的比较

方案 优点 缺点 适用场景

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注