各位观众,各位朋友,大家好!今天咱们来聊聊一个听起来高大上,实则挺接地气的话题:MapReduce 作业的输入分片(Input Split)机制。如果你是数据领域的初学者,别担心,我会尽量用最幽默风趣的方式,把这个概念掰开了、揉碎了,喂到你嘴里。如果你是老司机,也欢迎来交流,看看咱们的理解是不是在同一频道上。
开场白:话说数据洪流与分而治之
想象一下,你面前堆着一座喜马拉雅山般高的文件,里面塞满了各种数据,你要从中找出所有“小明”同学的生日。如果让你一个人啃,估计啃到天荒地老也啃不完。这时候,聪明的你一定会想到:能不能找几个小伙伴,大家一人分一块,一起啃呢?
这,就是“分而治之”的思想,也是MapReduce的核心理念。而“输入分片(Input Split)”,就是把这座数据喜马拉雅山分成一块块、方便小伙伴们啃的小山头。
第一幕:什么是Input Split?
Input Split,中文可以翻译成“输入分片”或者“输入切片”,它是MapReduce框架中,数据输入的最小单元。注意,是最小单元!它定义了Map Task(小伙伴,也就是Mapper)要处理的数据范围。
你可以把它想象成:
- 披萨饼:Input Split就是被切成一块块的披萨,每个Mapper负责吃一块。🍕
- 书本:Input Split就是书中的一个章节,每个Mapper负责阅读一个章节。📖
- 豆腐块:Input Split就是一大块豆腐被切成一个个小豆腐块,每个Mapper负责炒一个豆腐块。 (嗯…也许有点奇怪的比喻,但意思到了就行!) 😅
关键点: Input Split只是一个逻辑概念,它只是定义了数据的起始位置和长度,并没有实际存储数据。它更像是指向数据的指针或者索引。
第二幕:InputFormat闪亮登场!
那么,是谁负责切披萨?谁负责分章节?谁负责切豆腐呢?(好吧,我承认豆腐的比喻有点上瘾了…)
答案是:InputFormat! 它是MapReduce框架中,负责生成Input Split的组件。
InputFormat是一个抽象类,它定义了两个核心方法:
getSplits(JobContext job)
: 这个方法负责将输入数据分割成多个Input Split。它会根据输入数据的大小、格式,以及用户配置的参数(比如分片大小)来决定如何划分。这个方法返回一个List<InputSplit>
,包含了所有切好的披萨块的信息(起始位置、长度、所在文件等等)。createRecordReader(InputSplit split, TaskAttemptContext context)
: 这个方法负责创建一个RecordReader
对象。RecordReader
是真正读取Input Split中数据的组件,它将数据解析成一个个的<key, value>
键值对,供Mapper进行处理。
常见的InputFormat:
InputFormat | 功能 |
---|---|
TextInputFormat |
用于处理文本文件。默认情况下,它会将文本文件按行分割成Input Split,每行作为一个记录(Record)。Key是行的偏移量,Value是行的内容。 |
KeyValueTextInputFormat |
用于处理键值对形式的文本文件。它会将每行文本按照分隔符(默认是制表符)分割成Key和Value。 |
SequenceFileInputFormat |
用于处理SequenceFile格式的文件。SequenceFile是Hadoop中一种二进制文件格式,用于存储键值对。 |
NLineInputFormat |
用于处理文本文件,但是可以指定每个Input Split包含多少行数据。比如,你可以设置每个Input Split包含1000行数据。 |
MultipleInputs |
允许一个MapReduce作业处理多个不同格式的输入文件。你可以为每个输入文件指定不同的InputFormat。 |
DBInputFormat |
用于从关系型数据库中读取数据。 |
CombineTextInputFormat |
用于处理大量小文件。它会将多个小文件合并成一个Input Split,以减少Mapper的数量,提高作业的效率。(后面会详细讲这个) |
第三幕:分片策略:一切为了效率!
InputFormat生成Input Split的策略,直接影响MapReduce作业的效率。一个好的分片策略应该尽量做到:
- 均匀性: 每个Input Split的大小应该尽量接近,避免出现某些Mapper处理的数据量过大,而另一些Mapper早早完成任务的情况(也就是所谓的“数据倾斜”)。
- 局部性: Input Split应该尽量包含位于同一HDFS数据块(Block)上的数据,这样Mapper就可以从本地读取数据,避免网络传输,提高效率。(这就是所谓的“数据本地性”)
更详细的分片策略:
-
FileInputFormat的分片策略: (TextInputFormat, KeyValueTextInputFormat等都继承自它)
- FileInputFormat会先将输入文件分割成多个逻辑分片,每个分片对应一个InputSplit。
- 分片大小的计算: 分片大小由以下几个因素决定:
mapreduce.input.fileinputformat.split.minsize
:最小分片大小mapreduce.input.fileinputformat.split.maxsize
:最大分片大小dfs.blocksize
:HDFS块大小- 计算公式:
splitSize = max(minSize, min(maxSize, blockSize))
。 也就是说,分片大小会取这三个值的中间值。
- 分片边界的确定:
- FileInputFormat会尽量保证分片边界是完整的行。也就是说,它不会将一行数据分割到两个不同的Input Split中。
- 如果一个文件的大小小于
splitSize
,那么该文件会被作为一个单独的Input Split。 - 如果一个文件的大小大于
splitSize
,那么FileInputFormat会将该文件分割成多个Input Split,每个Input Split的大小接近splitSize
。
-
CombineTextInputFormat的分片策略:
- CombineTextInputFormat专门用于处理大量小文件。
- 它的核心思想是将多个小文件合并成一个Input Split,以减少Mapper的数量。
- 工作流程:
- CombineTextInputFormat会遍历所有输入文件,将它们的大小加起来。
- 当累加的大小超过
mapreduce.input.fileinputformat.split.maxsize
时,它会创建一个新的Input Split,并将剩余的文件加入到下一个Input Split中。 - 这样,就可以将多个小文件合并到一个Input Split中,减少Mapper的数量,提高作业的效率。
举个栗子🌰:
假设我们有一个文本文件data.txt
,大小为200MB,HDFS块大小为128MB,mapreduce.input.fileinputformat.split.minsize
设置为64MB,mapreduce.input.fileinputformat.split.maxsize
设置为256MB。
那么,根据公式splitSize = max(minSize, min(maxSize, blockSize))
,可以计算出splitSize = max(64MB, min(256MB, 128MB)) = 128MB
。
因此,data.txt
会被分割成两个Input Split:
- Input Split 1:起始位置0,长度128MB
- Input Split 2:起始位置128MB,长度72MB
第四幕:RecordReader:数据的忠实搬运工
有了Input Split,我们还需要一个搬运工,把里面的数据搬出来,交给Mapper。这个搬运工就是RecordReader!
RecordReader负责从Input Split中读取数据,并将数据解析成一个个的<key, value>
键值对。
不同的InputFormat会使用不同的RecordReader。比如,TextInputFormat
使用LineRecordReader
,它会将Input Split中的每一行文本作为一个记录,Key是行的偏移量,Value是行的内容。
RecordReader的核心方法:
initialize(InputSplit split, TaskAttemptContext context)
: 初始化RecordReader,传入Input Split和TaskAttemptContext。nextKeyValue()
: 读取下一个键值对。如果读取成功,返回true
;如果已经读取到Input Split的末尾,返回false
。getCurrentKey()
: 返回当前键。getCurrentValue()
: 返回当前值。getProgress()
: 返回读取进度。close()
: 关闭RecordReader,释放资源。
举个栗子🌰:
假设我们使用TextInputFormat
来处理文本文件,LineRecordReader
会将Input Split中的每一行文本作为一个记录。
例如,Input Split的内容如下:
hello world
hadoop is awesome
那么,LineRecordReader
会生成两个键值对:
<0, "hello world">
<12, "hadoop is awesome">
Key是行的偏移量,Value是行的内容。
第五幕:Input Split与数据本地性
数据本地性是MapReduce的一个重要优化策略。它的核心思想是:尽量让Mapper处理位于同一HDFS数据块(Block)上的数据,避免网络传输,提高效率。
Input Split的设计就考虑到了数据本地性。InputFormat在生成Input Split时,会尽量将位于同一HDFS数据块上的数据划分到同一个Input Split中。
这样,Mapper就可以从本地读取数据,而不需要通过网络从其他节点读取数据。
第六幕:自定义InputFormat和RecordReader
Hadoop提供了很多内置的InputFormat和RecordReader,但是有时候,我们需要处理一些特殊格式的数据,或者需要实现一些特殊的逻辑,这时候,我们就需要自定义InputFormat和RecordReader。
自定义InputFormat的步骤:
- 创建一个类,继承自
InputFormat
。 - 实现
getSplits()
方法,用于生成Input Split。 - 实现
createRecordReader()
方法,用于创建RecordReader。
自定义RecordReader的步骤:
- 创建一个类,实现
RecordReader
接口。 - 实现
initialize()
方法,用于初始化RecordReader。 - 实现
nextKeyValue()
方法,用于读取下一个键值对。 - 实现
getCurrentKey()
方法,用于返回当前键。 - 实现
getCurrentValue()
方法,用于返回当前值。 - 实现
getProgress()
方法,用于返回读取进度。 - 实现
close()
方法,用于关闭RecordReader。
第七幕:总结与展望
今天,我们一起学习了MapReduce作业的输入分片(Input Split)机制。我们了解了Input Split的概念、InputFormat的作用、分片策略、RecordReader的工作原理,以及如何自定义InputFormat和RecordReader。
希望通过今天的学习,你对MapReduce的数据输入有了更深入的理解。理解了Input Split机制,你才能更好地优化MapReduce作业,提高作业的效率。
未来,随着数据量的不断增长,MapReduce的输入分片机制也会不断发展和完善。例如,一些新的InputFormat可以处理更复杂的数据格式,一些新的分片策略可以更好地利用数据本地性。
总之,Input Split机制是MapReduce框架中一个非常重要的组成部分。掌握Input Split机制,是成为一名优秀的Hadoop开发者的必备技能。
感谢大家的收听!咱们下期再见!👋