自定义 InputFormat:驯服非标准数据源的艺术
各位观众,各位听众,欢迎来到“数据驯兽师”课堂!我是你们的导游,数据魔法师,即将带领大家探索“自定义 InputFormat”这片神秘而充满挑战的土地。今天,我们要学习的不是如何用键盘砸电脑(虽然有时候很想这么干),而是如何优雅地驯服那些桀骜不驯的,非标准数据源。
想象一下,你是一位考古学家,发现了埋藏千年的古墓。里面没有规整的石板,没有统一的文字,只有形状各异的陶片,上面刻着你从未见过的符号。这些陶片就是我们的非标准数据源,而InputFormat 就是你手里的工具,帮助你挖掘、整理、破译这些信息,最终还原历史的真相。
一、 为什么我们需要自定义 InputFormat?
首先,我们来回答一个灵魂拷问:为什么需要自定义 InputFormat?Hadoop 已经提供了那么多的默认 InputFormat,比如 TextInputFormat
、SequenceFileInputFormat
、AvroKeyInputFormat
,难道还不够用吗?
答案是:图样图森破!世界上的数据千奇百怪,就像恋爱一样,你永远不知道下一秒会遇到什么样的人(或者数据)。默认的 InputFormat 只能处理标准格式的数据,比如文本文件,序列文件等。但现实世界的数据往往是这样的:
- 自定义格式的文本文件: 比如,每一行数据不是简单的用逗号分隔,而是用一些奇奇怪怪的符号分隔,甚至用固定长度来分割。
- XML 文件: 层次结构复杂,需要特定的解析方式。
- JSON 文件: 结构灵活,但处理起来也需要一些技巧。
- 数据库: 需要通过 JDBC 连接读取数据。
- NoSQL 数据库: 比如 MongoDB、HBase,需要特定的客户端来访问。
- 二进制文件: 比如图片、音频、视频,需要进行解码。
- 甚至,你需要从一个远古时代的磁带机里读取数据! (好吧,这个有点夸张了,但数据世界的可能性是无限的!)
如果你的数据源不符合 Hadoop 默认 InputFormat 的规范,那么你就需要自定义 InputFormat,来告诉 Hadoop 如何读取、解析这些数据。
二、 InputFormat 的核心组件:三大天王
自定义 InputFormat 并不是什么高深的魔法,它其实就是一个接口,你需要实现其中的几个核心组件。你可以把它们想象成三个武功盖世的大侠,分别是:
-
InputSplit
: 数据切分侠。他负责将输入数据分割成多个小的逻辑块,每个块对应一个 Map Task。 你可以想象他是一位经验老道的切肉师傅,将一块巨大的牛肉(输入数据)分割成大小合适的肉块(InputSplit),方便后续的处理。 -
RecordReader
: 数据读取侠。他负责从 InputSplit 中读取数据,并将数据转换成键值对(Key-Value Pair),供 Map Task 使用。 他就像一位辛勤的矿工,从矿山(InputSplit)中挖掘出金子(Key-Value Pair),交给后续的加工厂(Map Task)。 -
getSplits()
: 分配任务侠。他负责根据输入数据的大小和分片策略,生成 InputSplit 列表。 他就像一位调度员,根据任务的规模和资源情况,合理分配任务给不同的工人(Map Task)。
这三大天王各司其职,协同作战,共同完成数据读取的任务。
三、 自定义 InputFormat 的步骤:步步为营
现在,我们来一步步地学习如何自定义 InputFormat。
1. 定义 InputSplit:确定数据分割的策略
InputSplit
类是 Hadoop 中表示输入数据分片的基本单元。你需要根据你的数据源的特性,来决定如何分割数据。
- 如果是文本文件, 你可以按照行来分割,也可以按照文件大小来分割。
- 如果是 XML 文件, 你可以按照 XML 文档的结构来分割,比如以根节点为单位分割。
- 如果是数据库, 你可以按照表中的记录来分割,也可以按照分区来分割。
你需要创建一个自定义的 InputSplit
类,继承自 org.apache.hadoop.mapreduce.InputSplit
抽象类,并实现以下方法:
```java
public class MyInputSplit extends InputSplit implements Writable {
private String filePath;
private long start;
private long length;
public MyInputSplit() {}
public MyInputSplit(String filePath, long start, long length) {
this.filePath = filePath;
this.start = start;
this.length = length;
}
public String getFilePath() {
return filePath;
}
public long getStart() {
return start;
}
public long getLength() {
return length;
}
@Override
public long getLength() throws IOException, InterruptedException {
return length;
}
@Override
public String[] getLocations() throws IOException, InterruptedException {
// 返回存储该 InputSplit 的数据节点的 hostname
return new String[] {}; // 可以根据实际情况返回
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(filePath);
out.writeLong(start);
out.writeLong(length);
}
@Override
public void readFields(DataInput in) throws IOException {
this.filePath = in.readUTF();
this.start = in.readLong();
this.length = in.readLong();
}
}
```
**表格:`MyInputSplit` 类成员变量说明**
| 成员变量 | 数据类型 | 含义 |
| ----------- | -------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `filePath` | String | 文件路径。存储当前 InputSplit 对应的数据文件路径。 |
| `start` | long | 起始位置。表示当前 InputSplit 在文件中起始的字节偏移量。 |
| `length` | long | 长度。表示当前 InputSplit 包含的字节数。 |
2. 定义 RecordReader:读取数据并转换成键值对
RecordReader
类是 Hadoop 中用于读取 InputSplit
中的数据,并将数据转换成键值对的关键组件。你需要创建一个自定义的 RecordReader
类,继承自 org.apache.hadoop.mapreduce.RecordReader
抽象类,并实现以下方法:
```java
public class MyRecordReader extends RecordReader<LongWritable, Text> {
private MyInputSplit inputSplit;
private FileSystem fs;
private Path filePath;
private InputStreamReader reader;
private BufferedReader bufferedReader;
private LongWritable key = new LongWritable();
private Text value = new Text();
private long start;
private long end;
private long currentPosition;
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
this.inputSplit = (MyInputSplit) split;
Configuration conf = context.getConfiguration();
filePath = new Path(inputSplit.getFilePath());
fs = filePath.getFileSystem(conf);
start = inputSplit.getStart();
end = start + inputSplit.getLength();
FSDataInputStream fileIn = fs.open(filePath);
fileIn.seek(start); // 定位到 InputSplit 的起始位置
reader = new InputStreamReader(fileIn, "UTF-8");
bufferedReader = new BufferedReader(reader);
this.currentPosition = start;
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
String line = bufferedReader.readLine();
if (line != null && currentPosition < end) {
key.set(currentPosition); // 设置 Key 为当前行的起始位置
value.set(line); // 设置 Value 为当前行的内容
currentPosition += line.length() + 1; // 更新当前位置
return true;
} else {
return false;
}
}
@Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f, (float) (currentPosition - start) / (end - start));
}
}
@Override
public void close() throws IOException {
if (bufferedReader != null) {
bufferedReader.close();
}
if (reader != null) {
reader.close();
}
}
}
```
**表格:`MyRecordReader` 类方法说明**
| 方法名 | 返回值 | 参数 | 含义 |
| --------------- | -------- | ------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `initialize()` | void | `InputSplit split`, `TaskAttemptContext context` | 初始化 RecordReader。 接收一个 InputSplit 对象和一个 TaskAttemptContext 对象。在这里,你需要打开输入流,定位到 InputSplit 的起始位置,并进行必要的初始化操作。 |
| `nextKeyValue()` | boolean | 无 | 读取下一个 Key-Value Pair。 如果成功读取到下一个 Key-Value Pair,则返回 true,否则返回 false。 这是 RecordReader 的核心方法,负责从输入流中读取数据,并将数据转换成 Key-Value Pair。你需要根据你的数据源的特性,来实现这个方法。 |
| `getCurrentKey()` | KEYIN | 无 | 获取当前的 Key。 返回当前 Key-Value Pair 的 Key。 |
| `getCurrentValue()` | VALUEIN | 无 | 获取当前的 Value。 返回当前 Key-Value Pair 的 Value。 |
| `getProgress()` | float | 无 | 获取读取进度。 返回一个 0 到 1 之间的浮点数,表示读取进度。 |
| `close()` | void | 无 | 关闭 RecordReader。 关闭输入流,释放资源。 |
3. 定义 InputFormat:整合 InputSplit 和 RecordReader
InputFormat
类是 Hadoop 中用于描述输入数据格式的接口。你需要创建一个自定义的 InputFormat
类,继承自 org.apache.hadoop.mapreduce.InputFormat
抽象类,并实现以下方法:
```java
public class MyInputFormat extends InputFormat<LongWritable, Text> {
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
Path inputPath = new Path(conf.get("input.path"));
FileSystem fs = inputPath.getFileSystem(conf);
FileStatus fileStatus = fs.getFileStatus(inputPath);
long fileLength = fileStatus.getLen();
long blockSize = fileStatus.getBlockSize();
long splitSize = Math.max(1, Math.min(blockSize, fileLength)); // 确保 splitSize 至少为 1
List<InputSplit> splits = new ArrayList<>();
long start = 0;
while (start < fileLength) {
long length = Math.min(splitSize, fileLength - start);
MyInputSplit split = new MyInputSplit(inputPath.toString(), start, length);
splits.add(split);
start += length;
}
return splits;
}
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
return new MyRecordReader();
}
}
```
**表格:`MyInputFormat` 类方法说明**
| 方法名 | 返回值 | 参数 | 含义 |
| --------------- | --------------------------- | ----------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `getSplits()` | `List<InputSplit>` | `JobContext context` | 生成 InputSplit 列表。 根据输入数据的大小和分片策略,生成 InputSplit 列表。 这是 InputFormat 的核心方法之一,负责将输入数据分割成多个小的逻辑块,每个块对应一个 Map Task。你需要根据你的数据源的特性,来实现这个方法。 |
| `createRecordReader()` | `RecordReader<KEYIN, VALUEIN>` | `InputSplit split`, `TaskAttemptContext context` | 创建 RecordReader。 根据 InputSplit 创建一个 RecordReader 对象,用于读取 InputSplit 中的数据。 |
4. 配置 MapReduce 作业:指定自定义 InputFormat
最后,你需要配置你的 MapReduce 作业,告诉 Hadoop 使用你自定义的 InputFormat。
```java
Configuration conf = new Configuration();
conf.set("input.path", "hdfs://your_hdfs_path/input.txt"); // 设置输入文件路径
Job job = Job.getInstance(conf, "MyJob");
job.setJarByClass(MyInputFormat.class);
job.setInputFormatClass(MyInputFormat.class); // 设置 InputFormat 类
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(conf.get("input.path")));
FileOutputFormat.setOutputPath(job, new Path("hdfs://your_hdfs_path/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
```
四、 实战演练:驯服 XML 这只小怪兽
现在,我们来一个实战演练,以 XML 文件为例,演示如何自定义 InputFormat。
假设我们有一个 XML 文件,内容如下:
<books>
<book>
<title>The Lord of the Rings</title>
<author>J.R.R. Tolkien</author>
</book>
<book>
<title>The Hobbit</title>
<author>J.R.R. Tolkien</author>
</book>
</books>
我们的目标是读取 XML 文件,提取每个 book 的 title 和 author。
1. 定义 XMLInputSplit:以 book 节点为单位分割
我们可以创建一个 XMLInputSplit
类,以 <book>
节点为单位分割 XML 文件。
public class XMLInputSplit extends InputSplit implements Writable {
private String filePath;
private long start;
private long end;
public XMLInputSplit() {}
public XMLInputSplit(String filePath, long start, long end) {
this.filePath = filePath;
this.start = start;
this.end = end;
}
// ...省略 getter 方法和 write/readFields 方法
}
2. 定义 XMLRecordReader:读取 XML 节点并解析数据
我们可以创建一个 XMLRecordReader
类,读取 XML 节点,并使用 XML 解析器(比如 DOM 或 SAX)提取 title 和 author。
public class XMLRecordReader extends RecordReader<Text, Text> {
private XMLInputSplit inputSplit;
private FileSystem fs;
private Path filePath;
private InputStreamReader reader;
private BufferedReader bufferedReader;
private Text key = new Text();
private Text value = new Text();
private long start;
private long end;
private long currentPosition;
private boolean processed = false;
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
// ...省略初始化代码
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!processed) {
try {
StringBuilder xmlRecord = new StringBuilder();
String line;
while ((line = bufferedReader.readLine()) != null) {
xmlRecord.append(line).append("n");
if (line.contains("</book>")) {
break; // 找到 </book> 结束标签
}
}
if (xmlRecord.length() > 0) {
String xmlString = xmlRecord.toString();
// 使用 XML 解析器解析 XML 数据
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder = factory.newDocumentBuilder();
Document document = builder.parse(new ByteArrayInputStream(xmlString.getBytes()));
document.getDocumentElement().normalize();
// 提取 title 和 author
NodeList titleList = document.getElementsByTagName("title");
NodeList authorList = document.getElementsByTagName("author");
if (titleList.getLength() > 0 && authorList.getLength() > 0) {
String title = titleList.item(0).getTextContent();
String author = authorList.item(0).getTextContent();
key.set(title);
value.set(author);
processed = true;
return true;
}
}
return false;
} catch (ParserConfigurationException | SAXException e) {
throw new IOException("Error parsing XML", e);
}
}
return false;
}
// ...省略其他方法
}
3. 定义 XMLInputFormat:整合 XMLInputSplit 和 XMLRecordReader
我们可以创建一个 XMLInputFormat
类,整合 XMLInputSplit
和 XMLRecordReader
。
public class XMLInputFormat extends InputFormat<Text, Text> {
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
// ...省略 getSplits 方法
}
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
return new XMLRecordReader();
}
}
4. 配置 MapReduce 作业:指定自定义 XMLInputFormat
最后,我们需要配置 MapReduce 作业,指定使用自定义的 XMLInputFormat
。
Configuration conf = new Configuration();
conf.set("input.path", "hdfs://your_hdfs_path/books.xml");
Job job = Job.getInstance(conf, "XMLJob");
job.setJarByClass(XMLInputFormat.class);
job.setInputFormatClass(XMLInputFormat.class);
job.setMapperClass(XMLMapper.class);
job.setReducerClass(XMLReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path(conf.get("input.path")));
FileOutputFormat.setOutputPath(job, new Path("hdfs://your_hdfs_path/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
五、 总结:数据驯兽师的自我修养
恭喜你,已经成功掌握了自定义 InputFormat 的基本技巧! 🎉 现在,你已经可以驯服一些简单的非标准数据源了。
但是,要成为一名真正的数据驯兽师,还需要不断学习,不断实践。以下是一些建议:
- 深入理解 Hadoop 的 InputFormat 机制: 了解 Hadoop 如何处理输入数据,如何进行数据切分,如何读取数据。
- 掌握各种数据格式的解析方法: 熟悉 XML、JSON 等数据格式的解析方法,可以使用 DOM、SAX、Jackson 等工具。
- 善于利用开源库: 有很多开源库可以帮助你处理非标准数据源,比如 Apache Tika 可以用于提取各种文件格式的文本内容。
- 多写代码,多调试: 实践是检验真理的唯一标准,只有多写代码,多调试,才能真正掌握自定义 InputFormat 的技巧。
- 保持一颗好奇心: 数据世界是不断变化的,要保持一颗好奇心,不断探索新的数据源,学习新的技术。
记住,数据驯兽师的最高境界不是征服数据,而是与数据和谐共处,从中发现价值,创造价值。 愿你也能成为一名优秀的数据驯兽师,在数据的海洋中自由翱翔! 🚀
感谢大家的观看,下次再见! 👋