MySQL云原生与分布式之:`MySQL`的`Binlog Stream`:其在`Flink`和`Kafka`数据同步中的应用。

MySQL Binlog Stream:Flink & Kafka 数据同步实战

大家好,今天我们来聊聊 MySQL 的 Binlog Stream,以及它在 Flink 和 Kafka 数据同步中的应用。Binlog(Binary Log)是 MySQL 数据库服务器记录所有更改数据库数据的语句的一种二进制格式的日志。它记录了数据库的所有更新事件,包括插入、更新、删除等操作。而 Binlog Stream,顾名思义,就是将这些 Binlog 事件以流的形式推送出去,这为我们构建实时数据管道提供了基础。

1. Binlog 的基本概念

在深入实践之前,我们先来了解一下 Binlog 的几个关键概念:

  • Format(格式): Binlog 有三种常见的格式:
    • STATEMENT: 记录执行的 SQL 语句。这种格式的优点是日志量小,但缺点是在某些情况下可能导致数据不一致,例如使用了 NOW() 函数或者不确定性的函数。
    • ROW: 记录每一行数据的变化。这种格式的优点是数据一致性高,但缺点是日志量大,特别是对于批量更新操作。
    • MIXED: 混合模式,MySQL 会根据不同的 SQL 语句自动选择 STATEMENT 或 ROW 格式。
  • Position(位置): Binlog 文件中的位置,用于标识读取的起始点。
  • File(文件): Binlog 文件名,用于标识读取的 Binlog 文件。
  • Event(事件): Binlog 中记录的每一个操作,例如插入、更新、删除等。

2. 为什么要使用 Binlog Stream 进行数据同步?

传统的 ETL (Extract, Transform, Load) 流程通常采用批量方式,定时从数据库中抽取数据,然后进行转换和加载。这种方式存在以下缺点:

  • 延迟高: 数据更新到数据仓库的时间间隔较长,无法满足实时分析的需求。
  • 资源消耗大: 每次抽取都需要扫描全表或大量数据,对数据库造成压力。
  • 实时性差: 无法实时反映数据库中的数据变化。

而 Binlog Stream 则可以解决这些问题:

  • 实时性高: 能够近乎实时地捕获数据库中的数据变化。
  • 资源消耗低: 只需读取 Binlog,避免了全表扫描。
  • 可靠性高: Binlog 记录了所有的数据库更新操作,保证了数据的一致性。

3. Binlog Stream 在 Flink 和 Kafka 中的应用架构

我们通常使用以下架构来实现 MySQL 数据到 Flink 和 Kafka 的同步:

+-----------------+      +-----------------+      +-----------------+      +-----------------+      +-----------------+
|   MySQL Server  |----->|  Binlog Connector|----->|      Kafka      |----->|       Flink      |----->|  Data Warehouse |
+-----------------+      +-----------------+      +-----------------+      +-----------------+      +-----------------+
                       |                   |      |                   |      |                   |
                       |  (Debezium, Canal)|      |   (Optional)    |      |   (Processing)   |
                       +-------------------+      +-------------------+      +-------------------+
  • MySQL Server: 存储业务数据的 MySQL 数据库。
  • Binlog Connector: 负责读取 MySQL 的 Binlog,并将其转换为可以被 Kafka 或 Flink 处理的格式。常用的工具有 Debezium 和 Canal。
  • Kafka: 作为消息队列,用于缓冲 Binlog 事件,并提供数据持久化和解耦的功能。
  • Flink: 作为流处理引擎,用于对 Binlog 事件进行实时处理,例如数据转换、清洗、聚合等。
  • Data Warehouse: 存储处理后的数据的目标数据库,例如 ClickHouse、HBase 等。

4. 使用 Debezium Connector for MySQL

Debezium 是一个开源的分布式平台,用于捕获数据库的变更。它提供了一个 MySQL Connector,可以方便地读取 MySQL 的 Binlog。

4.1 配置 MySQL

首先,需要配置 MySQL 启用 Binlog,并设置相应的格式。

-- 1. 开启 Binlog
SET GLOBAL log_bin = ON;

-- 2. 设置 Binlog 格式 (推荐 ROW 或 MIXED)
SET GLOBAL binlog_format = ROW;

-- 3. 设置 server_id (每个 MySQL 实例必须唯一)
SET GLOBAL server_id = 123456;

-- 4. 授权 Debezium 用户
CREATE USER 'debezium'@'%' IDENTIFIED BY 'your_password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;

重要提示: server_id必须唯一,否则会导致复制错误。 在生产环境中,强烈建议使用独立的 Debezium 用户,并只授予必要的权限。

4.2 配置 Debezium Connector

Debezium Connector 可以以多种方式部署,例如作为 Kafka Connect 的一部分,或者作为独立的服务。 这里我们以 Kafka Connect 为例。

首先,需要下载 Debezium Connector 的相关 JAR 包,并将其放到 Kafka Connect 的插件目录中。

然后,创建一个 Kafka Connect 的配置文件,例如 debezium.json

{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "your_mysql_host",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "your_password",
    "database.server.id": "85744",
    "database.server.name": "your_mysql_server",
    "database.include.list": "your_database_name",
    "table.include.list": "your_database_name.your_table_name",
    "database.history.kafka.bootstrap.servers": "your_kafka_bootstrap_servers",
    "database.history.kafka.topic": "your_database_history_topic",
    "decimal.handling.mode": "string",
    "snapshot.mode": "initial"
  }
}

配置项说明:

配置项 说明
connector.class Debezium MySQL Connector 的类名。
tasks.max 并行任务数量。
database.hostname MySQL 服务器的 hostname。
database.port MySQL 服务器的端口号。
database.user 用于连接 MySQL 的用户名。
database.password 用于连接 MySQL 的密码。
database.server.id MySQL 服务器的 ID,必须与 MySQL 实例中的 server_id 保持一致。
database.server.name Debezium 用于标识该 MySQL 连接器的唯一名称。
database.include.list 要捕获的数据库列表,多个数据库之间用逗号分隔。
table.include.list 要捕获的表列表,格式为 database_name.table_name,多个表之间用逗号分隔。
database.history.kafka.bootstrap.servers 用于存储数据库 schema 历史的 Kafka 集群的 bootstrap servers。
database.history.kafka.topic 用于存储数据库 schema 历史的 Kafka topic 名称。
decimal.handling.mode 如何处理 Decimal 类型的数据。可选值包括 double, string, precise
snapshot.mode 如何进行初始快照。可选值包括 initial, schema_only, never, initial_onlyinitial 表示在启动时进行一次全量快照,并将后续的增量数据写入 Kafka。 schema_only 表示只进行 schema 快照,不读取数据。 never 表示不进行快照,只读取增量数据。 initial_only 表示只进行一次全量快照,不读取增量数据。

然后,使用 Kafka Connect REST API 启动 Connector:

curl -X POST -H "Content-Type: application/json" 
     -d @debezium.json 
     http://your_kafka_connect_host:8083/connectors

4.3 Debezium 输出的 Kafka 消息格式

Debezium 输出的 Kafka 消息采用 JSON 格式,包含以下几个部分:

  • schema: 描述数据的 schema 信息。
  • payload: 包含实际的数据内容。

Payload 中通常包含以下字段:

  • before: 修改前的数据。
  • after: 修改后的数据。
  • source: 描述数据来源的信息,例如数据库名称、表名、Binlog 文件名、位置等。
  • op: 操作类型,例如 c (create), u (update), d (delete), r (read)。
  • ts_ms: 事件发生的时间戳。

例如,对于一个插入操作,payload 可能会是这样的:

{
  "schema": {
    "type": "struct",
    "fields": [
      // ... schema details ...
    ],
    "optional": false,
    "name": "your_database_name.your_table_name.Value"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1,
      "name": "test",
      "age": 20
    },
    "source": {
      "version": "1.9.7.Final",
      "connector": "mysql",
      "name": "your_mysql_server",
      "ts_ms": 1678886400000,
      "snapshot": "false",
      "db": "your_database_name",
      "sequence": null,
      "table": "your_table_name",
      "server_id": 123456,
      "gtid": null,
      "file": "mysql-bin.000001",
      "pos": 1234
    },
    "op": "c",
    "ts_ms": 1678886400000,
    "transaction": null
  }
}

5. 使用 Flink 处理 Binlog 事件

接下来,我们使用 Flink 来处理 Kafka 中的 Binlog 事件。

5.1 添加 Flink Kafka Connector 依赖

首先,在 Flink 项目中添加 Kafka Connector 的依赖。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>

5.2 创建 Flink Source

然后,创建一个 Flink Source,从 Kafka 中读取 Binlog 事件。

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class FlinkBinlogConsumer {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "your_kafka_bootstrap_servers");
        properties.setProperty("group.id", "flink_consumer_group");

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "your_kafka_topic",
                new SimpleStringSchema(),
                properties
        );

        DataStream<String> stream = env.addSource(kafkaConsumer);

        // Process the data stream
        stream.print();

        env.execute("Flink Binlog Consumer");
    }
}

5.3 解析 Binlog 事件

接下来,我们需要解析从 Kafka 中读取的 JSON 格式的 Binlog 事件。可以使用 JSON 解析库,例如 Jackson 或 Gson。

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.MapFunction;

public class JsonParser implements MapFunction<String, JsonNode> {
    private static final ObjectMapper mapper = new ObjectMapper();

    @Override
    public JsonNode map(String value) throws Exception {
        return mapper.readTree(value);
    }
}

将 JSON 字符串解析为 JsonNode 对象。

5.4 处理 Binlog 事件

然后,我们可以根据 op 字段来判断操作类型,并进行相应的处理。

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.api.common.functions.MapFunction;

public class BinlogEventHandler implements MapFunction<JsonNode, String> {

    @Override
    public String map(JsonNode value) throws Exception {
        String op = value.get("payload").get("op").asText();
        JsonNode after = value.get("payload").get("after");
        JsonNode before = value.get("payload").get("before");

        switch (op) {
            case "c":
                // Handle create event
                return "Insert: " + after.toString();
            case "u":
                // Handle update event
                return "Update: Before=" + before.toString() + ", After=" + after.toString();
            case "d":
                // Handle delete event
                return "Delete: " + before.toString();
            case "r":
                // Handle read event (usually not needed for CDC)
                return "Read: " + after.toString();
            default:
                return "Unknown operation: " + op;
        }
    }
}

5.5 将处理后的数据写入目标数据库

最后,将处理后的数据写入目标数据库,例如 ClickHouse 或 HBase。可以使用 Flink 的 Sink Connector 来实现。

例如,写入 ClickHouse:

import org.apache.flink.streaming.api.datastream.DataStream;
import ru.ivi.opensource.flinkclickhouse.ClickhouseSink;
import ru.ivi.opensource.flinkclickhouse.config.ClickhouseSinkConfig;

import java.util.Properties;

public class ClickHouseWriter {

    public static void writeToClickHouse(DataStream<String> stream) {
        Properties properties = new Properties();
        properties.setProperty("driver", "ru.yandex.clickhouse.ClickHouseDriver");
        properties.setProperty("url", "jdbc:clickhouse://your_clickhouse_host:8123/your_database_name");
        properties.setProperty("username", "your_clickhouse_user");
        properties.setProperty("password", "your_clickhouse_password");
        properties.setProperty("batchsize", "1000");
        properties.setProperty("flushinterval", "1000");

        ClickhouseSinkConfig sinkConfig = new ClickhouseSinkConfig.ClickhouseSinkConfigBuilder()
                .withJdbcProperties(properties)
                .withRetry(5)
                .build();

        ClickhouseSink<String> clickhouseSink = new ClickhouseSink<>(
                "your_table_name",
                sinkConfig
        );

        stream.addSink(clickhouseSink);
    }
}

5.6 完整 Flink 代码示例

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class FlinkBinlogPipeline {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "your_kafka_bootstrap_servers");
        properties.setProperty("group.id", "flink_consumer_group");

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "your_kafka_topic",
                new SimpleStringSchema(),
                properties
        );

        DataStream<String> stream = env.addSource(kafkaConsumer);

        DataStream<JsonNode> jsonStream = stream.map(new JsonParser());

        DataStream<String> processedStream = jsonStream.map(new BinlogEventHandler());

        // Write to ClickHouse (example)
        // ClickHouseWriter.writeToClickHouse(processedStream);

        processedStream.print();

        env.execute("Flink Binlog Pipeline");
    }
}

注意: 上面的代码只是一个简单的示例,实际应用中需要根据具体的业务需求进行调整。 例如:需要根据实际的表结构来定义 Flink 的 DataStream 的 Schema,并且需要处理各种异常情况。

6. 使用 Canal 进行数据同步

Canal 是阿里巴巴开源的基于数据库增量日志解析,提供增量数据订阅&消费的组件。 相比 Debezium,Canal 更轻量级,并且对 MySQL 的兼容性更好。

6.1 Canal Server 安装与配置

首先,下载 Canal Server 并解压。

然后,修改 Canal Server 的配置文件 conf/example/instance.properties

canal.instance.mysql.slaveId=1234
canal.instance.master.address=your_mysql_host:3306
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=1234
canal.instance.master.username=canal
canal.instance.master.password=your_password
canal.instance.connectionCharset=UTF-8
canal.instance.tsdb.enable=false

配置项说明:

  • canal.instance.mysql.slaveId: Canal Server 的 Slave ID,必须与 MySQL 实例中的 server_id 不同。
  • canal.instance.master.address: MySQL 服务器的地址和端口。
  • canal.instance.master.journal.name: Binlog 文件名。
  • canal.instance.master.position: Binlog 位置。
  • canal.instance.master.username: 用于连接 MySQL 的用户名。
  • canal.instance.master.password: 用于连接 MySQL 的密码。
  • canal.instance.connectionCharset: 连接 MySQL 的字符集。
  • canal.instance.tsdb.enable: 是否启用时间序列数据库。

6.2 Canal Client 开发

Canal Client 可以使用 Java SDK 来开发。

首先,添加 Canal Client 的依赖:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.6</version>
</dependency>

然后,编写 Canal Client 代码:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {

    public static void main(String[] args) throws Exception {
        // 创建 Canal 连接器
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(AddressUtils.getHostAddress(), 11111),
                "example",
                "canal",
                "canal"
        );

        int batchSize = 1000;

        try {
            connector.connect();
            connector.subscribe(".*\..*");
            connector.rollback();

            while (true) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // ignore
                    }
                } else {
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                 CanalEntry.RowChange rowChange;
                try {
                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                            e);
                }

                CanalEntry.EventType eventType = rowChange.getEventType();
                System.out.println(String.format("================> binlog[%s:%s] ,name[%s,%s] , eventType :%s",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                        eventType));

                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    if (eventType == CanalEntry.EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == CanalEntry.EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("-------&gt; before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------&gt; after");
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

代码说明:

  • CanalConnector: Canal 连接器,用于连接 Canal Server。
  • connector.connect(): 连接 Canal Server。
  • connector.subscribe(".*\..*"): 订阅所有数据库的所有表。 可以根据实际需求修改订阅规则。
  • connector.getWithoutAck(batchSize): 获取指定数量的数据。
  • connector.ack(batchId): 提交确认。
  • connector.rollback(batchId): 处理失败, 回滚数据。

6.3 Canal 与 Kafka 集成

可以将 Canal Client 获取到的数据发送到 Kafka,供 Flink 或其他应用程序消费。

可以使用 Kafka Producer API 来实现。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerExample {

    public static void sendToKafka(String message) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "your_kafka_bootstrap_servers");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("your_kafka_topic", message));

        producer.close();
    }
}

在 Canal Client 中调用 KafkaProducerExample.sendToKafka(message) 方法,将数据发送到 Kafka。

7. 总结

本文介绍了 MySQL Binlog Stream 的基本概念和应用,以及如何使用 Debezium 和 Canal 进行数据同步,并使用 Flink 处理 Binlog 事件。 通过这些技术,我们可以构建实时的数据管道,满足实时分析的需求。

8. 关键步骤概括

  • 配置 MySQL 启用 Binlog,设置适当的格式。
  • 选择合适的 Binlog Connector (Debezium 或 Canal),并进行配置。
  • 使用 Flink 从 Kafka 中读取 Binlog 事件,进行解析和处理。
  • 将处理后的数据写入目标数据库,例如 ClickHouse 或 HBase。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注