自定义 InputFormat:处理非标准数据源的技巧与实践

自定义 InputFormat:驯服非标准数据源的艺术

各位观众,各位听众,欢迎来到“数据驯兽师”课堂!我是你们的导游,数据魔法师,即将带领大家探索“自定义 InputFormat”这片神秘而充满挑战的土地。今天,我们要学习的不是如何用键盘砸电脑(虽然有时候很想这么干),而是如何优雅地驯服那些桀骜不驯的,非标准数据源。

想象一下,你是一位考古学家,发现了埋藏千年的古墓。里面没有规整的石板,没有统一的文字,只有形状各异的陶片,上面刻着你从未见过的符号。这些陶片就是我们的非标准数据源,而InputFormat 就是你手里的工具,帮助你挖掘、整理、破译这些信息,最终还原历史的真相。

一、 为什么我们需要自定义 InputFormat?

首先,我们来回答一个灵魂拷问:为什么需要自定义 InputFormat?Hadoop 已经提供了那么多的默认 InputFormat,比如 TextInputFormatSequenceFileInputFormatAvroKeyInputFormat,难道还不够用吗?

答案是:图样图森破!世界上的数据千奇百怪,就像恋爱一样,你永远不知道下一秒会遇到什么样的人(或者数据)。默认的 InputFormat 只能处理标准格式的数据,比如文本文件,序列文件等。但现实世界的数据往往是这样的:

  • 自定义格式的文本文件: 比如,每一行数据不是简单的用逗号分隔,而是用一些奇奇怪怪的符号分隔,甚至用固定长度来分割。
  • XML 文件: 层次结构复杂,需要特定的解析方式。
  • JSON 文件: 结构灵活,但处理起来也需要一些技巧。
  • 数据库: 需要通过 JDBC 连接读取数据。
  • NoSQL 数据库: 比如 MongoDB、HBase,需要特定的客户端来访问。
  • 二进制文件: 比如图片、音频、视频,需要进行解码。
  • 甚至,你需要从一个远古时代的磁带机里读取数据! (好吧,这个有点夸张了,但数据世界的可能性是无限的!)

如果你的数据源不符合 Hadoop 默认 InputFormat 的规范,那么你就需要自定义 InputFormat,来告诉 Hadoop 如何读取、解析这些数据。

二、 InputFormat 的核心组件:三大天王

自定义 InputFormat 并不是什么高深的魔法,它其实就是一个接口,你需要实现其中的几个核心组件。你可以把它们想象成三个武功盖世的大侠,分别是:

  1. InputSplit: 数据切分侠。他负责将输入数据分割成多个小的逻辑块,每个块对应一个 Map Task。 你可以想象他是一位经验老道的切肉师傅,将一块巨大的牛肉(输入数据)分割成大小合适的肉块(InputSplit),方便后续的处理。

  2. RecordReader: 数据读取侠。他负责从 InputSplit 中读取数据,并将数据转换成键值对(Key-Value Pair),供 Map Task 使用。 他就像一位辛勤的矿工,从矿山(InputSplit)中挖掘出金子(Key-Value Pair),交给后续的加工厂(Map Task)。

  3. getSplits(): 分配任务侠。他负责根据输入数据的大小和分片策略,生成 InputSplit 列表。 他就像一位调度员,根据任务的规模和资源情况,合理分配任务给不同的工人(Map Task)。

这三大天王各司其职,协同作战,共同完成数据读取的任务。

三、 自定义 InputFormat 的步骤:步步为营

现在,我们来一步步地学习如何自定义 InputFormat。

1. 定义 InputSplit:确定数据分割的策略

InputSplit 类是 Hadoop 中表示输入数据分片的基本单元。你需要根据你的数据源的特性,来决定如何分割数据。

  • 如果是文本文件, 你可以按照行来分割,也可以按照文件大小来分割。
  • 如果是 XML 文件, 你可以按照 XML 文档的结构来分割,比如以根节点为单位分割。
  • 如果是数据库, 你可以按照表中的记录来分割,也可以按照分区来分割。

你需要创建一个自定义的 InputSplit 类,继承自 org.apache.hadoop.mapreduce.InputSplit 抽象类,并实现以下方法:

```java
public class MyInputSplit extends InputSplit implements Writable {

    private String filePath;
    private long start;
    private long length;

    public MyInputSplit() {}

    public MyInputSplit(String filePath, long start, long length) {
        this.filePath = filePath;
        this.start = start;
        this.length = length;
    }

    public String getFilePath() {
        return filePath;
    }

    public long getStart() {
        return start;
    }

    public long getLength() {
        return length;
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
        return length;
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
        // 返回存储该 InputSplit 的数据节点的 hostname
        return new String[] {}; // 可以根据实际情况返回
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(filePath);
        out.writeLong(start);
        out.writeLong(length);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.filePath = in.readUTF();
        this.start = in.readLong();
        this.length = in.readLong();
    }
}
```

**表格:`MyInputSplit` 类成员变量说明**

| 成员变量  | 数据类型 | 含义                                                                                                                                                                                                                                                                  |
| ----------- | -------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `filePath`  | String   | 文件路径。存储当前 InputSplit 对应的数据文件路径。                                                                                                                                                                                                                            |
| `start`     | long     | 起始位置。表示当前 InputSplit 在文件中起始的字节偏移量。                                                                                                                                                                                                                          |
| `length`    | long     | 长度。表示当前 InputSplit 包含的字节数。                                                                                                                                                                                                                                  |

2. 定义 RecordReader:读取数据并转换成键值对

RecordReader 类是 Hadoop 中用于读取 InputSplit 中的数据,并将数据转换成键值对的关键组件。你需要创建一个自定义的 RecordReader 类,继承自 org.apache.hadoop.mapreduce.RecordReader 抽象类,并实现以下方法:

```java
public class MyRecordReader extends RecordReader<LongWritable, Text> {

    private MyInputSplit inputSplit;
    private FileSystem fs;
    private Path filePath;
    private InputStreamReader reader;
    private BufferedReader bufferedReader;
    private LongWritable key = new LongWritable();
    private Text value = new Text();
    private long start;
    private long end;
    private long currentPosition;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.inputSplit = (MyInputSplit) split;
        Configuration conf = context.getConfiguration();
        filePath = new Path(inputSplit.getFilePath());
        fs = filePath.getFileSystem(conf);
        start = inputSplit.getStart();
        end = start + inputSplit.getLength();
        FSDataInputStream fileIn = fs.open(filePath);
        fileIn.seek(start); // 定位到 InputSplit 的起始位置
        reader = new InputStreamReader(fileIn, "UTF-8");
        bufferedReader = new BufferedReader(reader);
        this.currentPosition = start;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        String line = bufferedReader.readLine();
        if (line != null && currentPosition < end) {
            key.set(currentPosition); // 设置 Key 为当前行的起始位置
            value.set(line); // 设置 Value 为当前行的内容
            currentPosition += line.length() + 1; // 更新当前位置
            return true;
        } else {
            return false;
        }
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (float) (currentPosition - start) / (end - start));
        }
    }

    @Override
    public void close() throws IOException {
        if (bufferedReader != null) {
            bufferedReader.close();
        }
        if (reader != null) {
            reader.close();
        }
    }
}
```

**表格:`MyRecordReader` 类方法说明**

| 方法名          | 返回值   | 参数                                  | 含义                                                                                                                                                                                                                                                                                                                                                     |
| --------------- | -------- | ------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `initialize()`  | void     | `InputSplit split`, `TaskAttemptContext context` | 初始化 RecordReader。 接收一个 InputSplit 对象和一个 TaskAttemptContext 对象。在这里,你需要打开输入流,定位到 InputSplit 的起始位置,并进行必要的初始化操作。                                                                                                                                                                                                                                                                                                                              |
| `nextKeyValue()` | boolean  | 无                                    | 读取下一个 Key-Value Pair。 如果成功读取到下一个 Key-Value Pair,则返回 true,否则返回 false。 这是 RecordReader 的核心方法,负责从输入流中读取数据,并将数据转换成 Key-Value Pair。你需要根据你的数据源的特性,来实现这个方法。                                                                                                                                                                                                                                                                          |
| `getCurrentKey()`  | KEYIN  | 无                                    | 获取当前的 Key。  返回当前 Key-Value Pair 的 Key。                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
| `getCurrentValue()` | VALUEIN | 无                                    | 获取当前的 Value。 返回当前 Key-Value Pair 的 Value。                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
| `getProgress()`   | float    | 无                                    | 获取读取进度。 返回一个 0 到 1 之间的浮点数,表示读取进度。                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
| `close()`         | void     | 无                                    | 关闭 RecordReader。 关闭输入流,释放资源。                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |

3. 定义 InputFormat:整合 InputSplit 和 RecordReader

InputFormat 类是 Hadoop 中用于描述输入数据格式的接口。你需要创建一个自定义的 InputFormat 类,继承自 org.apache.hadoop.mapreduce.InputFormat 抽象类,并实现以下方法:

```java
public class MyInputFormat extends InputFormat<LongWritable, Text> {

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        Path inputPath = new Path(conf.get("input.path"));
        FileSystem fs = inputPath.getFileSystem(conf);
        FileStatus fileStatus = fs.getFileStatus(inputPath);
        long fileLength = fileStatus.getLen();
        long blockSize = fileStatus.getBlockSize();
        long splitSize = Math.max(1, Math.min(blockSize, fileLength)); // 确保 splitSize 至少为 1
        List<InputSplit> splits = new ArrayList<>();
        long start = 0;
        while (start < fileLength) {
            long length = Math.min(splitSize, fileLength - start);
            MyInputSplit split = new MyInputSplit(inputPath.toString(), start, length);
            splits.add(split);
            start += length;
        }
        return splits;
    }

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new MyRecordReader();
    }
}
```

**表格:`MyInputFormat` 类方法说明**

| 方法名          | 返回值                      | 参数                                | 含义                                                                                                                                                                                                                                                                                                                                                     |
| --------------- | --------------------------- | ----------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `getSplits()`   | `List<InputSplit>`          | `JobContext context`                | 生成 InputSplit 列表。 根据输入数据的大小和分片策略,生成 InputSplit 列表。 这是 InputFormat 的核心方法之一,负责将输入数据分割成多个小的逻辑块,每个块对应一个 Map Task。你需要根据你的数据源的特性,来实现这个方法。                                                                                                                                                                                                                                                                         |
| `createRecordReader()` | `RecordReader<KEYIN, VALUEIN>` | `InputSplit split`, `TaskAttemptContext context` | 创建 RecordReader。  根据 InputSplit 创建一个 RecordReader 对象,用于读取 InputSplit 中的数据。                                                                                                                                                                                                                                                                                                                                                                            |

4. 配置 MapReduce 作业:指定自定义 InputFormat

最后,你需要配置你的 MapReduce 作业,告诉 Hadoop 使用你自定义的 InputFormat。

```java
Configuration conf = new Configuration();
conf.set("input.path", "hdfs://your_hdfs_path/input.txt"); // 设置输入文件路径
Job job = Job.getInstance(conf, "MyJob");
job.setJarByClass(MyInputFormat.class);
job.setInputFormatClass(MyInputFormat.class); // 设置 InputFormat 类
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(conf.get("input.path")));
FileOutputFormat.setOutputPath(job, new Path("hdfs://your_hdfs_path/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
```

四、 实战演练:驯服 XML 这只小怪兽

现在,我们来一个实战演练,以 XML 文件为例,演示如何自定义 InputFormat。

假设我们有一个 XML 文件,内容如下:

<books>
  <book>
    <title>The Lord of the Rings</title>
    <author>J.R.R. Tolkien</author>
  </book>
  <book>
    <title>The Hobbit</title>
    <author>J.R.R. Tolkien</author>
  </book>
</books>

我们的目标是读取 XML 文件,提取每个 book 的 title 和 author。

1. 定义 XMLInputSplit:以 book 节点为单位分割

我们可以创建一个 XMLInputSplit 类,以 <book> 节点为单位分割 XML 文件。

public class XMLInputSplit extends InputSplit implements Writable {

    private String filePath;
    private long start;
    private long end;

    public XMLInputSplit() {}

    public XMLInputSplit(String filePath, long start, long end) {
        this.filePath = filePath;
        this.start = start;
        this.end = end;
    }

    // ...省略 getter 方法和 write/readFields 方法
}

2. 定义 XMLRecordReader:读取 XML 节点并解析数据

我们可以创建一个 XMLRecordReader 类,读取 XML 节点,并使用 XML 解析器(比如 DOM 或 SAX)提取 title 和 author。

public class XMLRecordReader extends RecordReader<Text, Text> {

    private XMLInputSplit inputSplit;
    private FileSystem fs;
    private Path filePath;
    private InputStreamReader reader;
    private BufferedReader bufferedReader;
    private Text key = new Text();
    private Text value = new Text();
    private long start;
    private long end;
    private long currentPosition;
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // ...省略初始化代码
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            try {
                StringBuilder xmlRecord = new StringBuilder();
                String line;
                while ((line = bufferedReader.readLine()) != null) {
                    xmlRecord.append(line).append("n");
                    if (line.contains("</book>")) {
                        break; // 找到 </book> 结束标签
                    }
                }

                if (xmlRecord.length() > 0) {
                    String xmlString = xmlRecord.toString();

                    // 使用 XML 解析器解析 XML 数据
                    DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
                    DocumentBuilder builder = factory.newDocumentBuilder();
                    Document document = builder.parse(new ByteArrayInputStream(xmlString.getBytes()));
                    document.getDocumentElement().normalize();

                    // 提取 title 和 author
                    NodeList titleList = document.getElementsByTagName("title");
                    NodeList authorList = document.getElementsByTagName("author");

                    if (titleList.getLength() > 0 && authorList.getLength() > 0) {
                        String title = titleList.item(0).getTextContent();
                        String author = authorList.item(0).getTextContent();

                        key.set(title);
                        value.set(author);
                        processed = true;
                        return true;
                    }
                }
                return false;

            } catch (ParserConfigurationException | SAXException e) {
                throw new IOException("Error parsing XML", e);
            }
        }
        return false;
    }

    // ...省略其他方法
}

3. 定义 XMLInputFormat:整合 XMLInputSplit 和 XMLRecordReader

我们可以创建一个 XMLInputFormat 类,整合 XMLInputSplitXMLRecordReader

public class XMLInputFormat extends InputFormat<Text, Text> {

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
        // ...省略 getSplits 方法
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new XMLRecordReader();
    }
}

4. 配置 MapReduce 作业:指定自定义 XMLInputFormat

最后,我们需要配置 MapReduce 作业,指定使用自定义的 XMLInputFormat

Configuration conf = new Configuration();
conf.set("input.path", "hdfs://your_hdfs_path/books.xml");
Job job = Job.getInstance(conf, "XMLJob");
job.setJarByClass(XMLInputFormat.class);
job.setInputFormatClass(XMLInputFormat.class);
job.setMapperClass(XMLMapper.class);
job.setReducerClass(XMLReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path(conf.get("input.path")));
FileOutputFormat.setOutputPath(job, new Path("hdfs://your_hdfs_path/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);

五、 总结:数据驯兽师的自我修养

恭喜你,已经成功掌握了自定义 InputFormat 的基本技巧! 🎉 现在,你已经可以驯服一些简单的非标准数据源了。

但是,要成为一名真正的数据驯兽师,还需要不断学习,不断实践。以下是一些建议:

  • 深入理解 Hadoop 的 InputFormat 机制: 了解 Hadoop 如何处理输入数据,如何进行数据切分,如何读取数据。
  • 掌握各种数据格式的解析方法: 熟悉 XML、JSON 等数据格式的解析方法,可以使用 DOM、SAX、Jackson 等工具。
  • 善于利用开源库: 有很多开源库可以帮助你处理非标准数据源,比如 Apache Tika 可以用于提取各种文件格式的文本内容。
  • 多写代码,多调试: 实践是检验真理的唯一标准,只有多写代码,多调试,才能真正掌握自定义 InputFormat 的技巧。
  • 保持一颗好奇心: 数据世界是不断变化的,要保持一颗好奇心,不断探索新的数据源,学习新的技术。

记住,数据驯兽师的最高境界不是征服数据,而是与数据和谐共处,从中发现价值,创造价值。 愿你也能成为一名优秀的数据驯兽师,在数据的海洋中自由翱翔! 🚀

感谢大家的观看,下次再见! 👋

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注