MapReduce 作业的输入分片(Input Split)机制

各位观众,各位朋友,大家好!今天咱们来聊聊一个听起来高大上,实则挺接地气的话题:MapReduce 作业的输入分片(Input Split)机制。如果你是数据领域的初学者,别担心,我会尽量用最幽默风趣的方式,把这个概念掰开了、揉碎了,喂到你嘴里。如果你是老司机,也欢迎来交流,看看咱们的理解是不是在同一频道上。

开场白:话说数据洪流与分而治之

想象一下,你面前堆着一座喜马拉雅山般高的文件,里面塞满了各种数据,你要从中找出所有“小明”同学的生日。如果让你一个人啃,估计啃到天荒地老也啃不完。这时候,聪明的你一定会想到:能不能找几个小伙伴,大家一人分一块,一起啃呢?

这,就是“分而治之”的思想,也是MapReduce的核心理念。而“输入分片(Input Split)”,就是把这座数据喜马拉雅山分成一块块、方便小伙伴们啃的小山头。

第一幕:什么是Input Split?

Input Split,中文可以翻译成“输入分片”或者“输入切片”,它是MapReduce框架中,数据输入的最小单元。注意,是最小单元!它定义了Map Task(小伙伴,也就是Mapper)要处理的数据范围。

你可以把它想象成:

  • 披萨饼:Input Split就是被切成一块块的披萨,每个Mapper负责吃一块。🍕
  • 书本:Input Split就是书中的一个章节,每个Mapper负责阅读一个章节。📖
  • 豆腐块:Input Split就是一大块豆腐被切成一个个小豆腐块,每个Mapper负责炒一个豆腐块。 (嗯…也许有点奇怪的比喻,但意思到了就行!) 😅

关键点: Input Split只是一个逻辑概念,它只是定义了数据的起始位置和长度,并没有实际存储数据。它更像是指向数据的指针或者索引。

第二幕:InputFormat闪亮登场!

那么,是谁负责切披萨?谁负责分章节?谁负责切豆腐呢?(好吧,我承认豆腐的比喻有点上瘾了…)

答案是:InputFormat! 它是MapReduce框架中,负责生成Input Split的组件。

InputFormat是一个抽象类,它定义了两个核心方法:

  1. getSplits(JobContext job): 这个方法负责将输入数据分割成多个Input Split。它会根据输入数据的大小、格式,以及用户配置的参数(比如分片大小)来决定如何划分。这个方法返回一个List<InputSplit>,包含了所有切好的披萨块的信息(起始位置、长度、所在文件等等)。
  2. createRecordReader(InputSplit split, TaskAttemptContext context): 这个方法负责创建一个RecordReader对象。RecordReader是真正读取Input Split中数据的组件,它将数据解析成一个个的<key, value>键值对,供Mapper进行处理。

常见的InputFormat:

InputFormat 功能
TextInputFormat 用于处理文本文件。默认情况下,它会将文本文件按行分割成Input Split,每行作为一个记录(Record)。Key是行的偏移量,Value是行的内容。
KeyValueTextInputFormat 用于处理键值对形式的文本文件。它会将每行文本按照分隔符(默认是制表符)分割成Key和Value。
SequenceFileInputFormat 用于处理SequenceFile格式的文件。SequenceFile是Hadoop中一种二进制文件格式,用于存储键值对。
NLineInputFormat 用于处理文本文件,但是可以指定每个Input Split包含多少行数据。比如,你可以设置每个Input Split包含1000行数据。
MultipleInputs 允许一个MapReduce作业处理多个不同格式的输入文件。你可以为每个输入文件指定不同的InputFormat。
DBInputFormat 用于从关系型数据库中读取数据。
CombineTextInputFormat 用于处理大量小文件。它会将多个小文件合并成一个Input Split,以减少Mapper的数量,提高作业的效率。(后面会详细讲这个)

第三幕:分片策略:一切为了效率!

InputFormat生成Input Split的策略,直接影响MapReduce作业的效率。一个好的分片策略应该尽量做到:

  • 均匀性: 每个Input Split的大小应该尽量接近,避免出现某些Mapper处理的数据量过大,而另一些Mapper早早完成任务的情况(也就是所谓的“数据倾斜”)。
  • 局部性: Input Split应该尽量包含位于同一HDFS数据块(Block)上的数据,这样Mapper就可以从本地读取数据,避免网络传输,提高效率。(这就是所谓的“数据本地性”)

更详细的分片策略:

  1. FileInputFormat的分片策略: (TextInputFormat, KeyValueTextInputFormat等都继承自它)

    • FileInputFormat会先将输入文件分割成多个逻辑分片,每个分片对应一个InputSplit。
    • 分片大小的计算: 分片大小由以下几个因素决定:
      • mapreduce.input.fileinputformat.split.minsize:最小分片大小
      • mapreduce.input.fileinputformat.split.maxsize:最大分片大小
      • dfs.blocksize:HDFS块大小
      • 计算公式: splitSize = max(minSize, min(maxSize, blockSize))。 也就是说,分片大小会取这三个值的中间值。
    • 分片边界的确定:
      • FileInputFormat会尽量保证分片边界是完整的行。也就是说,它不会将一行数据分割到两个不同的Input Split中。
      • 如果一个文件的大小小于splitSize,那么该文件会被作为一个单独的Input Split。
      • 如果一个文件的大小大于splitSize,那么FileInputFormat会将该文件分割成多个Input Split,每个Input Split的大小接近splitSize
  2. CombineTextInputFormat的分片策略:

    • CombineTextInputFormat专门用于处理大量小文件。
    • 它的核心思想是将多个小文件合并成一个Input Split,以减少Mapper的数量。
    • 工作流程:
      • CombineTextInputFormat会遍历所有输入文件,将它们的大小加起来。
      • 当累加的大小超过mapreduce.input.fileinputformat.split.maxsize时,它会创建一个新的Input Split,并将剩余的文件加入到下一个Input Split中。
      • 这样,就可以将多个小文件合并到一个Input Split中,减少Mapper的数量,提高作业的效率。

举个栗子🌰:

假设我们有一个文本文件data.txt,大小为200MB,HDFS块大小为128MB,mapreduce.input.fileinputformat.split.minsize设置为64MB,mapreduce.input.fileinputformat.split.maxsize设置为256MB。

那么,根据公式splitSize = max(minSize, min(maxSize, blockSize)),可以计算出splitSize = max(64MB, min(256MB, 128MB)) = 128MB

因此,data.txt会被分割成两个Input Split:

  • Input Split 1:起始位置0,长度128MB
  • Input Split 2:起始位置128MB,长度72MB

第四幕:RecordReader:数据的忠实搬运工

有了Input Split,我们还需要一个搬运工,把里面的数据搬出来,交给Mapper。这个搬运工就是RecordReader!

RecordReader负责从Input Split中读取数据,并将数据解析成一个个的<key, value>键值对。

不同的InputFormat会使用不同的RecordReader。比如,TextInputFormat使用LineRecordReader,它会将Input Split中的每一行文本作为一个记录,Key是行的偏移量,Value是行的内容。

RecordReader的核心方法:

  • initialize(InputSplit split, TaskAttemptContext context): 初始化RecordReader,传入Input Split和TaskAttemptContext。
  • nextKeyValue(): 读取下一个键值对。如果读取成功,返回true;如果已经读取到Input Split的末尾,返回false
  • getCurrentKey(): 返回当前键。
  • getCurrentValue(): 返回当前值。
  • getProgress(): 返回读取进度。
  • close(): 关闭RecordReader,释放资源。

举个栗子🌰:

假设我们使用TextInputFormat来处理文本文件,LineRecordReader会将Input Split中的每一行文本作为一个记录。

例如,Input Split的内容如下:

hello world
hadoop is awesome

那么,LineRecordReader会生成两个键值对:

  • <0, "hello world">
  • <12, "hadoop is awesome">

Key是行的偏移量,Value是行的内容。

第五幕:Input Split与数据本地性

数据本地性是MapReduce的一个重要优化策略。它的核心思想是:尽量让Mapper处理位于同一HDFS数据块(Block)上的数据,避免网络传输,提高效率。

Input Split的设计就考虑到了数据本地性。InputFormat在生成Input Split时,会尽量将位于同一HDFS数据块上的数据划分到同一个Input Split中。

这样,Mapper就可以从本地读取数据,而不需要通过网络从其他节点读取数据。

第六幕:自定义InputFormat和RecordReader

Hadoop提供了很多内置的InputFormat和RecordReader,但是有时候,我们需要处理一些特殊格式的数据,或者需要实现一些特殊的逻辑,这时候,我们就需要自定义InputFormat和RecordReader。

自定义InputFormat的步骤:

  1. 创建一个类,继承自InputFormat
  2. 实现getSplits()方法,用于生成Input Split。
  3. 实现createRecordReader()方法,用于创建RecordReader。

自定义RecordReader的步骤:

  1. 创建一个类,实现RecordReader接口。
  2. 实现initialize()方法,用于初始化RecordReader。
  3. 实现nextKeyValue()方法,用于读取下一个键值对。
  4. 实现getCurrentKey()方法,用于返回当前键。
  5. 实现getCurrentValue()方法,用于返回当前值。
  6. 实现getProgress()方法,用于返回读取进度。
  7. 实现close()方法,用于关闭RecordReader。

第七幕:总结与展望

今天,我们一起学习了MapReduce作业的输入分片(Input Split)机制。我们了解了Input Split的概念、InputFormat的作用、分片策略、RecordReader的工作原理,以及如何自定义InputFormat和RecordReader。

希望通过今天的学习,你对MapReduce的数据输入有了更深入的理解。理解了Input Split机制,你才能更好地优化MapReduce作业,提高作业的效率。

未来,随着数据量的不断增长,MapReduce的输入分片机制也会不断发展和完善。例如,一些新的InputFormat可以处理更复杂的数据格式,一些新的分片策略可以更好地利用数据本地性。

总之,Input Split机制是MapReduce框架中一个非常重要的组成部分。掌握Input Split机制,是成为一名优秀的Hadoop开发者的必备技能。

感谢大家的收听!咱们下期再见!👋

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注