Apache Kafka Connectors 开发与高可用部署:构建健壮的流式 ETL

好的,各位听众,欢迎来到今天的Kafka Connectors“相声”专场! 咳咳,不对,是技术讲座!今天咱们的主题是:Apache Kafka Connectors 开发与高可用部署:构建健壮的流式 ETL。

各位都知道,数据就像金矿,埋在各种犄角旮旯里,等着我们去挖掘。而Kafka Connectors就像是我们的矿铲、传送带,负责把数据从各个源头(比如数据库、文件、API等)挖掘出来,再安全地运送到Kafka这个数据高速公路上。有了它,我们才能方便地进行实时数据分析、构建流式应用,最终把数据变成真金白银。💰💰💰

所以,Kafka Connectors的重要性不言而喻。今天,我们就来好好唠唠嗑,看看怎么打造一个既能挖得快,又能抗得住各种风吹雨打的Kafka Connectors。

第一部分:Kafka Connectors:你的数据搬运工

首先,让我们来认识一下Kafka Connectors这位“搬运工”。

  • 什么是Kafka Connectors?

简单来说,Kafka Connectors是一个框架,它允许你构建可重用的、可配置的连接器(Connectors),用于将数据导入(Source Connector)或导出(Sink Connector)Kafka。

想象一下,你要把MySQL数据库里的数据实时同步到Kafka。如果没有Kafka Connectors,你可能需要自己写一大堆代码,处理各种数据库连接、数据转换、错误处理等等。但是有了Kafka Connectors,你只需要配置一下现成的MySQL Connector,就能轻松搞定。简直是懒人福音!😇

  • Connectors的两大阵营:Source & Sink

就像一支军队,Connectors也分为两大阵营:

*   **Source Connector(源连接器):** 负责把数据从外部系统(比如数据库、文件系统、API)读取到Kafka。它就像是矿山的挖掘机,源源不断地把矿石(数据)挖出来。
*   **Sink Connector(目标连接器):** 负责把数据从Kafka写入到外部系统(比如Elasticsearch、HDFS、数据库)。它就像是矿石加工厂,把Kafka里的矿石(数据)加工成各种产品。
  • Connectors的核心概念

    • Connector: 连接器的实例,负责定义连接的配置,比如连接哪个数据库,使用哪个表等等。
    • Task: 连接器中的任务,负责实际的数据传输工作。一个Connector可以包含多个Task,从而实现并行处理,提高吞吐量。Task就像是挖掘机的驾驶员,负责操作挖掘机进行挖掘。
    • Converter: 负责数据格式转换。Kafka Connector支持多种数据格式,比如JSON、Avro、Protobuf等。Converter就像是翻译官,负责把不同语言的数据翻译成统一的语言。
    • Transform: 负责数据转换和过滤。可以在数据写入Kafka之前,对数据进行一些处理,比如字段重命名、类型转换、数据过滤等。Transform就像是数据美容师,负责把数据打扮得漂漂亮亮的。

第二部分:Connector开发:手把手教你打造专属搬运工

好了,认识了Kafka Connectors的基本概念,接下来我们就来动手打造一个属于自己的Connector。这里我们以Source Connector为例,演示如何从一个简单的REST API读取数据到Kafka。

  • 准备工作

    • 开发环境: 你需要一个Java IDE(比如IntelliJ IDEA或Eclipse),以及Maven或Gradle构建工具。
    • Kafka环境: 你需要一个运行中的Kafka集群。
    • REST API: 你需要一个可以提供数据的REST API。为了方便演示,我们可以使用一个免费的在线API,比如https://jsonplaceholder.typicode.com/todos
  • 创建Connector项目

    使用Maven或Gradle创建一个新的Java项目。

  • 添加依赖

    pom.xmlbuild.gradle文件中添加Kafka Connect API的依赖。

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>connect-api</artifactId>
        <version>${kafka.version}</version>
    </dependency>
  • 编写Connector代码

    我们需要创建三个类:

    • RestSourceConnector Connector的入口类,负责配置和启动Task。
    • RestSourceTask Task的实现类,负责实际的数据读取和写入Kafka。
    • RestSourceConnectorConfig 配置类,负责定义Connector的配置参数。

    RestSourceConnectorConfig.java

    import org.apache.kafka.common.config.AbstractConfig;
    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.common.config.ConfigDef.Importance;
    import org.apache.kafka.common.config.ConfigDef.Type;
    
    import java.util.Map;
    
    public class RestSourceConnectorConfig extends AbstractConfig {
    
        public static final String TOPIC_CONFIG = "topic";
        private static final String TOPIC_DOC = "Topic to write to";
    
        public static final String API_URL_CONFIG = "api.url";
        private static final String API_URL_DOC = "The URL of the REST API to consume";
    
        public static final String POLL_INTERVAL_MS_CONFIG = "poll.interval.ms";
        private static final String POLL_INTERVAL_MS_DOC = "The interval in milliseconds to poll the REST API";
        private static final int POLL_INTERVAL_MS_DEFAULT = 5000;
    
        public RestSourceConnectorConfig(Map<?, ?> originals) {
            super(configDef(), originals);
        }
    
        public static ConfigDef configDef() {
            return new ConfigDef()
                    .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, TOPIC_DOC)
                    .define(API_URL_CONFIG, Type.STRING, Importance.HIGH, API_URL_DOC)
                    .define(POLL_INTERVAL_MS_CONFIG, Type.INT, POLL_INTERVAL_MS_DEFAULT, Importance.MEDIUM, POLL_INTERVAL_MS_DOC);
        }
    
        public String getTopic() {
            return getString(TOPIC_CONFIG);
        }
    
        public String getApiUrl() {
            return getString(API_URL_CONFIG);
        }
    
        public int getPollIntervalMs() {
            return getInt(POLL_INTERVAL_MS_CONFIG);
        }
    }

    RestSourceConnector.java

    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.connect.connector.Task;
    import org.apache.kafka.connect.source.SourceConnector;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    public class RestSourceConnector extends SourceConnector {
    
        private Map<String, String> configProps;
    
        @Override
        public String version() {
            return "1.0"; // 版本号
        }
    
        @Override
        public void start(Map<String, String> props) {
            configProps = props;
        }
    
        @Override
        public Class<? extends Task> taskClass() {
            return RestSourceTask.class;
        }
    
        @Override
        public List<Map<String, String>> taskConfigs(int maxTasks) {
            List<Map<String, String>> taskConfigs = new ArrayList<>();
            for (int i = 0; i < maxTasks; i++) {
                taskConfigs.add(configProps);
            }
            return taskConfigs;
        }
    
        @Override
        public void stop() {
            // Nothing to do since Tasks manage themselves.
        }
    
        @Override
        public ConfigDef config() {
            return RestSourceConnectorConfig.configDef();
        }
    }

    RestSourceTask.java

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import java.time.Instant;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    
    public class RestSourceTask extends SourceTask {
    
        private static final Logger log = LoggerFactory.getLogger(RestSourceTask.class);
    
        private String apiUrl;
        private String topic;
        private int pollIntervalMs;
        private HttpClient httpClient;
        private ObjectMapper objectMapper;
    
        private static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
                .field("userId", Schema.INT32_SCHEMA)
                .field("id", Schema.INT32_SCHEMA)
                .field("title", Schema.STRING_SCHEMA)
                .field("completed", Schema.BOOLEAN_SCHEMA)
                .build();
    
        @Override
        public String version() {
            return "1.0";
        }
    
        @Override
        public void start(Map<String, String> props) {
            RestSourceConnectorConfig config = new RestSourceConnectorConfig(props);
            apiUrl = config.getApiUrl();
            topic = config.getTopic();
            pollIntervalMs = config.getPollIntervalMs();
            httpClient = HttpClient.newHttpClient();
            objectMapper = new ObjectMapper();
        }
    
        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            try {
                HttpRequest request = HttpRequest.newBuilder()
                        .uri(URI.create(apiUrl))
                        .build();
    
                HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
                String responseBody = response.body();
    
                List<SourceRecord> records = new ArrayList<>();
                JsonNode jsonArray = objectMapper.readTree(responseBody);
    
                jsonArray.forEach(jsonNode -> {
                    Struct value = new Struct(VALUE_SCHEMA)
                            .put("userId", jsonNode.get("userId").asInt())
                            .put("id", jsonNode.get("id").asInt())
                            .put("title", jsonNode.get("title").asText())
                            .put("completed", jsonNode.get("completed").asBoolean());
    
                    SourceRecord record = new SourceRecord(
                            Collections.singletonMap("api", apiUrl),  // source partition
                            null,                                    // source offset
                            topic,                                  // topic
                            VALUE_SCHEMA,                             // value schema
                            value                                     // value
                    );
                    records.add(record);
                });
    
                Thread.sleep(pollIntervalMs); // 控制API请求频率
                return records;
    
            } catch (IOException e) {
                log.error("Error while fetching data from API: {}", e.getMessage());
                return null;
            }
        }
    
        @Override
        public void stop() {
            // Clean resources if needed
        }
    }
  • 打包Connector

    使用Maven或Gradle将项目打包成一个JAR文件。

  • 安装Connector

    将JAR文件复制到Kafka Connect的插件目录(plugin.path配置)。

  • 配置Connector

    创建一个Connector配置文件,比如rest-source.properties

    name=rest-source-connector
    connector.class=RestSourceConnector
    tasks.max=1
    topic=rest-api-topic
    api.url=https://jsonplaceholder.typicode.com/todos
    poll.interval.ms=5000 # 每5秒钟请求一次API
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
  • 启动Connector

    使用Kafka Connect REST API启动Connector:

    curl -X POST -H "Content-Type: application/json" 
         --data @rest-source.properties 
         http://localhost:8083/connectors
  • 验证结果

    使用Kafka Consumer消费rest-api-topic主题的数据,验证Connector是否正常工作。

    kafka-console-consumer --bootstrap-server localhost:9092 --topic rest-api-topic --from-beginning

    如果一切顺利,你应该能看到从REST API读取的数据被成功写入Kafka。🎉🎉🎉

第三部分:高可用部署:让你的搬运工永不停歇

一个好的搬运工不仅要能搬得快,还要能抗得住各种突发情况。所以,高可用部署对于Kafka Connectors来说至关重要。

  • Kafka Connect的两种部署模式

    • Standalone Mode(独立模式): 只有一个Kafka Connect进程。这种模式简单易用,适合开发测试环境。但是,如果这个进程挂了,所有Connector都会停止工作。
    • Distributed Mode(分布式模式): 多个Kafka Connect进程组成一个集群。这种模式具有高可用性和可扩展性,适合生产环境。即使某个进程挂了,其他进程仍然可以继续工作。
  • 分布式模式的架构

    在分布式模式下,Kafka Connect集群依赖于Kafka本身来实现配置管理、状态管理和任务分配。

    • Configuration Topic: 用于存储Connector的配置信息。
    • Offset Topic: 用于存储Source Connector的偏移量信息,保证数据不会重复消费。
    • Status Topic: 用于存储Connector和Task的状态信息。
    • Group ID: 用于标识Kafka Connect集群。同一个Group ID的进程属于同一个集群。
  • 高可用部署的策略

    • 多节点部署: 部署多个Kafka Connect进程,组成一个集群。
    • 负载均衡: 使用负载均衡器(比如Nginx或HAProxy)将流量分发到不同的Kafka Connect进程。
    • 监控和告警: 监控Kafka Connect进程的状态,及时发现和处理问题。
    • 自动重启: 使用进程管理工具(比如Systemd或Supervisor)自动重启失败的进程。
    • 滚动升级: 逐步升级Kafka Connect集群的进程,避免服务中断。
  • 配置分布式模式

    要启用分布式模式,需要在connect-distributed.properties文件中配置以下参数:

    bootstrap.servers=localhost:9092
    group.id=my-connect-cluster
    config.storage.topic=connect-config
    offset.storage.topic=connect-offsets
    status.storage.topic=connect-status
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    offset.flush.interval.ms=10000
    rest.port=8083

    然后,使用以下命令启动Kafka Connect进程:

    ./bin/connect-distributed.sh config/connect-distributed.properties

    启动多个进程,组成一个集群。

第四部分:最佳实践:打造高效稳定的流式ETL

最后,我们来分享一些使用Kafka Connectors的最佳实践,帮助你打造高效稳定的流式ETL。

  • 选择合适的Connector: 根据你的数据源和目标系统,选择合适的Connector。尽量使用官方或社区维护的Connector,避免重复造轮子。
  • 合理配置Connector: 根据你的业务需求,合理配置Connector的参数,比如tasks.maxpoll.interval.msbatch.size等。
  • 监控Connector状态: 使用Kafka Connect REST API或监控工具,监控Connector的状态,及时发现和处理问题。
  • 处理错误和异常: Connector可能会遇到各种错误和异常,比如网络连接失败、数据格式错误等。要做好错误处理和异常处理,保证数据的完整性和一致性。
  • 使用Transforms进行数据转换: 使用Transforms对数据进行转换和过滤,可以简化下游应用的开发。
  • 测试和验证: 在生产环境部署之前,一定要进行充分的测试和验证,保证Connector的稳定性和可靠性。
  • 版本控制和管理: 对Connector的配置和代码进行版本控制和管理,方便回滚和升级。

总结

今天,我们一起学习了Kafka Connectors的开发和高可用部署。希望通过今天的“相声”,哦不,是技术讲座,大家对Kafka Connectors有了更深入的了解。记住,Kafka Connectors是构建健壮的流式ETL的关键组件,掌握它,你就能轻松驾驭数据,让数据为你创造价值!

最后,祝大家工作顺利,天天开心! 谢谢大家!😊

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注