跳至主要內容

MR

张启忻大约 9 分钟MRHadoop

核心功能

应用程序通常会通过提供map和reduce来实现 Mapper和Reducer接口,它们组成作业的核心。

Mapper

Mapper将输入键值对(key/value pair)映射到一组中间格式的键值对集合。

Reducer

Reducer将与一个key关联的一组中间数值集归约(reduce)为一个更小的数值集。

Partitioner

Partitioner用于划分键值空间(key space)。

Reporter

Reporter是用于Map/Reduce应用程序报告进度,设定应用级别的状态消息, 更新Counters(计数器)的机制。

OutputCollector

OutputCollector是一个Map/Reduce框架提供的用于收集 Mapper或Reducer输出数据的通用机制 (包括中间输出结果和作业的输出结果)。

输入

InputFormat 为Map/Reduce作业描述输入的细节规范。

Map/Reduce框架根据作业的InputFormat来:

  1. 检查作业输入的有效性。
  2. 把输入文件切分成多个逻辑InputSplit实例, 并把每一实例分别分发给一个 Mapper。
  3. 提供RecordReader的实现,这个RecordReader从逻辑InputSplit中获得输入记录, 这些记录将由Mapper处理。

InputSplit

InputSplit 是一个单独的Mapper要处理的数据块。一般的InputSplit 是字节样式输入,然后由RecordReader处理并转化成记录样式。
FileSplit 是默认的InputSplit。 它把 map.input.file 设定为输入文件的路径,输入文件是逻辑分块文件。

RecordReader

RecordReader 从InputSlit读入<key, value>对。
一般的,RecordReader 把由InputSplit 提供的字节样式的输入文件,转化成由Mapper处理的记录样式的文件。 因此RecordReader负责处理记录的边界情况和把数据表示成keys/values对形式。

输出

OutputFormat 描述Map/Reduce作业的输出样式。

Map/Reduce框架根据作业的OutputFormat来:

  1. 检验作业的输出,例如检查输出路径是否已经存在。
  2. 提供一个RecordWriter的实现,用来输出作业结果。 输出文件保存在FileSystem上。

任务的 Side-Effect File

在一些应用程序中,子任务需要产生一些side-file,这些文件与作业实际输出结果的文件不同。

RecordWriter

RecordWriter 生成<key, value> 对到输出文件。
RecordWriter的实现把作业的输出结果写到 FileSystem。

其他工具或特性

Counters

Counters 是多个由Map/Reduce框架或者应用程序定义的全局计数器。 每一个Counter可以是任何一种 Enum类型。同一特定Enum类型的Counter可以汇集到一个组,其类型为Counters.Group。

应用程序可以定义任意(Enum类型)的Counters并且可以通过 map 或者 reduce方法中的 Reporter.incrCounter(Enum, long)或者 Reporter.incrCounter(String, String, long) 更新。之后框架会汇总这些全局counters。

DistributedCache

DistributedCache 可将具体应用相关的、大尺寸的、只读的文件有效地分布放置。

DistributedCache 是Map/Reduce框架提供的功能,能够缓存应用程序所需的文件 (包括文本,档案文件,jar文件等)。

Tool

Tool 接口支持处理常用的Hadoop命令行选项。

Tool 是Map/Reduce工具或应用的标准。应用程序应只处理其定制参数, 要把标准命令行选项通过 ToolRunner.run(Tool, String[]) 委托给 GenericOptionsParser处理。

IsolationRunner

IsolationRunner 是帮助调试Map/Reduce程序的工具。

Profiling

Profiling是一个工具,它使用内置的java profiler工具进行分析获得(2-3个)map或reduce样例运行分析报告。

调试

Map/Reduce框架能够运行用户提供的用于调试的脚本程序。 当map/reduce任务失败时,用户可以通过运行脚本在任务日志(例如任务的标准输出、标准错误、系统日志以及作业配置文件)上做后续处理工作。用户提供的调试脚本程序的标准输出和标准错误会输出为诊断文件。如果需要的话这些输出结果也可以打印在用户界面上。

JobControl

JobControl是一个工具,它封装了一组Map/Reduce作业以及他们之间的依赖关系。

数据压缩

Hadoop Map/Reduce框架为应用程序的写入文件操作提供压缩工具,这些工具可以为map输出的中间数据和作业最终输出数据(例如reduce的输出)提供支持。

MR 工作机制

MapTask 工作机制

  1. 问题引出 MapTask 的并行度决定 map 阶段的任务处理并发度,进而影响到整个 job 的处理速度。那么,MapTask 并行任务是否越多越好呢?
  2. MapTask 并行度决定机制 一个 Job 的 map 阶段 MapTask 并行度(个数),由客户端提交 Job 时的切片个数决定。切片(逻辑上的切分)大小默认等于 128M,和 block 大小相等,原因是如果不按照 block 大小进行切分,可能会涉及到一些不同节点之间数据的传输。
    总结
  1. read 阶段 读取数据并行度决定机制生成 key-value
  2. map 阶段 将读取的 key-value 进行处理,生成新的 key-value
  3. collect 阶段 将 map 的数据写到环形缓冲区(分区)中
  4. spill 溢写阶段 环形缓冲区数据满 80% 后溢写磁盘,只不过溢写之前需要进行排序
  5. combine 阶段 合并小文件(而不是执行 Combiner 业务逻辑):归并排序,将一些多次产生的小文件进行合并,形成一个大文件 【注意】MapTask 的数量是由切片数决定的,虽然 MapTask 不能直接设置,但是我们可以通过设置切片个数去完成 MapTask 数量的指定

    详细步骤
  1. Read 阶段: MapTask 通过用户编写的 RecordReader,从输入 InputSplit 中解析出一个个 key/value。
  2. Map 阶段: 该节点主要是将解析出的 key/value 交给用户编写 map() 函数处理,并产生一系列新的 key/value。
  3. Collect 阶段: 在用户编写 map() 函数中,当数据处理完成后,一般会调用 OutputCollector.collect() 输出结果。在该函数内部,它会将生成的 key/value 分区(调用Partitioner),并写入一个环形内存缓冲区中。
  4. Spill 阶段:即“溢写”,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
    溢写阶段详情:
    步骤1: 利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号 partition 进行排序,然后按照 key 进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照 key 有序。 步骤2: 按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件 output/spillN.out(N表示当前溢写次数)中。如果用户设置了 Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。 步骤3: 将分区数据的元信息写到内存索引数据结构 SpillRecord 中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前爱你内存索引大小超过 1MB,则将内存索引写到文件 output/spillN.out.index 中。
  5. Combiner阶段: 当所有数据处理完成后,MapTask 对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
    当所有数据处理完后,MapTask 会将所有临时文件合并生成一个大文件,并保存到文件 output/file.out 中,同时生成相应的索引文件 output/file.out.index。
    在进行文件合并过程中,MapTask 以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并 io.sort.factor(默认100)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。
    让每个 MapTask 最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

ReduceTask 工作机制

总结

  1. copy 阶段: 将不同的 MapTask 计算出来的数据拷贝到指定的 ReduceTask,默认是放在内存中的。如果内存容量不足,那么写在磁盘中
  2. merge 阶段: 主要将从不同 MapTask 拷贝过来的数据文件进行一次合并,形成一个整体的大文件
  3. sort 阶段: 分组排序阶段。主要是用来保证 key 相同的逻辑判断。sort 阶段可以不存在
  4. reduce 阶段: 将一组相同的 key 执行一次业务逻辑即可 【注意】ReduceTask 的数量和 MapTask 的数量不一样,ReduceTask 数量可以手动指定,但是数量制定是有一定要求的。一般默认情况下,ReduceTask 的数量应该和 map,collect 阶段写出的分区数目对应。

    详细步骤
  1. Copy 阶段: ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
  2. Merge 阶段: 在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
  3. Sort 阶段: 按照 MapReduce 语义,用户编写 reduce() 函数输入数据是按照 key 进行聚集的一组数据。为了将 key 相同的数据聚在一起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此,ReduceTask 只需对所有数据进行一次归并排序即可。
  4. Reduce 阶段: reduce() 函数将计算结果写到 HDFS 上。 【注意】 从 Map 的 collect 阶段到 reduce 的 sort 阶段统称为 shuffle 阶段
    其中 Map 中的 collect、spill、combine 阶段称之为 Map 阶段的 shuffle Reduce 中的 copy、merge、sort 阶段称之为 Reduce 阶段的 shuffle

设置 ReduceTask

reducetask 的并行度同样影响整个 job 的执行并发度和执行效率,但与 maptask 的并发数由切片数决定不同,reducetask 数量的决定是可以直接手动设置:

// 默认值是1,手动设置为4
job.setNumReduceTasks(4);