1、MR基本定义
参考百度百科定义,简要概括如下:
MapReduce是分布式的计算框架或者解决方案,大致有基本内容:
- 1)首先MapReduce重点是工作在集群的节点上,而非在单台服务器上做计算、做统计等
- 2)MapReduce把用户提交的任务以分布式放在多个节点上执行,自动划分计算数据和计算任务等,这些并行计算涉及到的系统底层的复杂细节交由MR框架负责处理。换句话:数据开发人员只需要定义“我要统计词频”的“任务”后,提交给MR框架即可,坐等计算结果,至于MR是如何从多台服务上找数据文件、计算统计、缓存中间计算结果、存储最终技术结果、CPU、内存、IO网络资源使用等,数据开发人员都无需关注,MR自己会处理。
2、HDFS和MR的关系
HDFS和MR共同组成Hadoop分布式系统体系结构的核心。HDFS在集群上实现了分布式文件系统,MR在集群上实现了分布式计算和任务处理。HDFS在MR任务处理过程中提供了文件操作和存储等支持,MR在HDFS的基础上实现了任务的分发、计算、跟踪等工作,并收集结果,二者相互作用,完成分布式集群的主要任务。
3、MR组件架构
MapReduce框架设计为4部分:Client、JobTracker、TaskTracker以及Map Task & Reduce Task。具体内容如下:
1)Client 客户端
Hadoop中,把用户提交的MapReduce程序成为“Job”(作业),每一个 Job 都会在用户端通过 Client 类将应用程序以及配置参数 Configuration 打包成 jar文件存储在 HDFS,也就是这个MR程序 jar包在集群中每个节点都存放在一份,而且是存放在hdfs文件系统上,并把jar包路径告诉 JobTracker ,由它创建每一个 Task(即 Map Task 和 Reduce Task) 将它们分发到各个 TaskTracker 服务中去执行。
2)JobTracker
JobTracke负责资源监控和作业调度。JobTracker 监控所有TaskTracker 与job的健康状况,当然TaskTracker也会主动上报自己的情况,若JobTracker一旦发现某个TaskTracker失败,就将相应的任务转移到其他健康TaskTracker节点继续干活;同时,JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器Task Scheduler,而调度器会在TaskTracker所在节点资源出现空闲时,选择合适的任务工作交由这些空闲的TaskTracker来干活。这里说的调度器,就是我们后面要讨论的YARN,在hadoop 1,MR框架还需要运行资源调度和管理服务,在hadoop 2中,这个资源调度功能在MR框架中剥离出来,交由更专业的YARN框架统一处理。
3)TaskTracker
TaskTracker 会周期性地通过Heartbeat 将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker 发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)。TaskTracker 使用“slot”等量划分本节点上的资源量。“slot”代表计算资源(CPU、内存等)。一个Task 获取到一个slot 后才有机会运行,而Hadoop 调度器的作用就是将各个TaskTracker 上的空闲slot 分配给Task 使用。slot 分为Map slot 和Reduce slot 两种,分别供Map Task 和Reduce Task 使用。TaskTracker 通过slot 数目(可配置参数)限定Task 的并发度。
4)Map Task & Reduce Task
Map Task、Reduce Task由Datanode节点上的TaskTracker 启动,HDFS 以固定大小的block 为基本单位存储数据,而对于MapReduce的处理单位是:split。split 是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。正因为屏蔽了物理文件层,在逻辑层,数据文件划分方法完全由数据开发者自行决定,这就是为何MR通过再次封装使用split来组织数据块的原因。这就像Linux LVM逻辑,多个物理卷PV组成一个大的卷组VG(或者物理卷池),逻辑卷LV在VG的上面,因为物理卷被逻辑化,因此可以由用户自行划分区或或者调整分区大小。
split 的多少决定了Map Task 的数目,每个split 只会交给一个Map Task 处理。
Split 和 Block的关系如下图所示:
4、MR运行流程
4.1 分区原理
1)、首先我们需要了解MR的分区partition原理,它是参考了一致性hash算法的概念:
Hash算法是为了保证数据均匀的分布,例如:有3个目录,编号分别为:0、1、2;现在有12个文件,如何把12个文件平均存放到3个目录下呢?按Hash算法的做法是,将12个文件从0开始编号,得到这样的一个数组:
[0,1,2,3,4,5,6,7,8,9,10,11],每个文件所存放目录号=文件序号 mod 3,任何数字对3取模,最终得到的结果都是0,1,2。
最后将取模结果为0的文件放入0号目录下,结果为1的文件放入1号目录下,结果为2的文件放入2号目录,12个文件最终均匀的分布到3个目录下:
0、3、6、9、12号的文件存放在0号目录下
1、4、7、10号的文件存放在1号目录下
2、5、8、11号的文件存放在2号目录下
MR的分区partition就是这样的过程,partition_num=hash(key)mod N2)MR为何做partition?
在进行MapReduce计算时,经常的需求是把最终的输出数据分到不同的文件中,例如:
按照年度(季度、月等)划分的话,需要把同一年份的数据放到一个文件中,因此不同年份对应不同的文件;
按性别业务划分,需要把同一性别的数据放到一个文件中,因此不同性别的数据放在不同的文件上。
从逆向来看:这些最终的输出是我们需要的数据,是由Reducer任务产生,而如果要得到最终输出的多个数据文件,意味着有同样数量的Reducer任务在跑;Reducer任务的数据来自于Mapper任务,也就说Mapper任务要划分数据,对于不同的Map阶段的数据分配给不同的Reducer任务运行。Mapper任务划分数据的过程就称作Partition,负责实现划分数据的类称作Partitioner。
4.2 MapReduce统计词频的MapReduce流程
该图统计词频流程主要分为以下几步。
第1步:假设一个文件有三行英文句子作为 MapReduce 的Input(输入),这里经过 Splitting 过程把文件分割为3块。分割后的3块数据就可以并行处理,每一块交给一个 map 线程处理,对应3个map task。
第2步:每个 map 线程中,以每个单词为key,以1作为词频数value,并建结果写到磁盘上,而非内存中(spark计算框架则会把此计算结果放在内存中,减少IO加速计算)。
第3步:每个 map 的输出要经过 shuffling(混洗),将相同的单词key放在同一分区,然后交给 reduce task处理。
第4步:reduce 接受到 shuffling 后的数据(reduce去读相应的map计算结果文件), 会将相同的单词进行合并,得到每个单词的词频数,最后将统计好的每个单词的词频数作为输出结果。
Map Reduce具体实现
==Map阶段==
Map 阶段是由一定数量的 Map Task 组成。这些 Map Task 可以同时运行,每个Task所使用的计算资源由slot决定,每个 Map Task又是由以下三个部分组成。
1)、对输入数据格式进行解析的一个组件:InputFormat。因为不同的数据可能存储的数据格式不一样,这就需要有一个 InputFormat 组件来解析这些数据的存放格式。默认情况下,它提供了一个 TextInputFormat 来读取输入数据格式(就像在Pandas中,使用read_csv()方法读取csv格式数据,用read_excel()读取Excel格式的数据)。TextInputFormat 会将文件的每一行解释成(key,value),key代表每行偏移量,value代表每行数据内容。 通常不需要自定义 InputFormat,因为 MapReduce 提供了很多种InputFormat的实现,可直接引用。
2)、对输入数据进一步处理:Mapper——这个 Mapper 计算指具体业务逻辑,因为不同的业务对数据有不同的处理。
3)、数据分组:Partitioner。Mapper 数据处理之后输出导reduce之前,输出key会经过 Partitioner 分组选择不同的reduce。默认的情况下,Partitioner 会对 map 输出的key进行hash取模,比如有3个Reduce Task,它就是模(mod)3,如果key的hash值为0对应第0个 Reduce Task,hash值1对应第1个Reduce Task,hash值2对应第2个Reduce Task。如上图第一组map的bear和第二组map的bear hash值一样,因此被shuffling到同一组,然后交给第一个 reduce 来处理。
==Reduce 阶段==
Reduce 阶段由一定数量的 Reduce Task 组成。这些 Reduce Task 可以同时运行,每个 Reduce Task又是由以下四个部分组成。
1)、数据远程(http方式)或本地拷贝。Reduce Task 通过http拷贝每个 map 处理的结果,从每个 map 中读取一部分结果。每个 Reduce Task 拷贝哪些数据,是由上面 Partitioner 决定的(具体由reducer向ApplicationMaster请求拷贝数据的安排信息)。(这里要求服务器直接的网卡连接至少是Gb级别以上的光纤链路,提供拷贝数据的传输吞吐量)
2)、数据按照key排序。Reduce Task 读取完数据后,要按照key进行排序。按照key排序后,相同的key被分到一组,交给同一个 Reduce Task 处理。
3)、数据处理:Reducer。以WordCount为例,相同的单词key分到一组,交个同一个Reducer处理,这样就实现了对每个单词的词频统计。
4)、数据输出格式:OutputFormat。Reducer 统计的结果,将按照 OutputFormat 格式输出。默认情况下的输出格式为 TextOutputFormat,以WordCount为例,这里的key为单词,value为词频数。
InputFormat、Mapper、Partitioner、Reducer和OutputFormat 都可以数据开发者自行实现,通常情况下,只需要实现 Mapper和Reducer。(数据开发者主要任务:集合数据业务需求,专注编写mapper和reducer二十年,该工作性质类似在关系型数据库中,数据分析岗根据市场运营提的数据报表需求写sql)
4.3 Shuffle(混洗)与reducer拉取数据细节
从4.2节内容可知,在map到reduce的过程,有一个环节为shuffling,那么shuffle是如何运作的呢?reducer又是如何从mapper节点去拷贝中间数据呢?按以下图示说明:
当map任务将数据output时,不仅仅是将结果输出到磁盘,它是将其写入内存缓冲区域,并进行一些预分类。
1)、Map阶段
首先map任务的output过程是一个环状的内存缓冲区,缓冲区的大小默认为100MB(可通过修改配置项mpareduce.task.io.sort.mb进行修改),当写入内存的大小到达一定比例,默认为80%(可通过mapreduce.map.sort.spill.percent配置项修改),便开始将这些缓存数据spill溢出写入磁盘。
在写入磁盘之前,线程将会指定数据写入与reduce相应的patitions中,最终传送给reduce。在每个partition中,后台线程将会在内存中进行Key的排序,(如果代码中有combiner方法,则会在output时就进行sort排序,这里,如果只有少于3个写入磁盘的文件,combiner将会在outputfile前启动,如果只有一个或两个,那么将不会调用)
这里将map输出的结果进行压缩会大大减少磁盘IO与网络传输的开销(配置参数mapreduce.map .output.compress 设置为true,如果使用第三方压缩jar,可通过mapreduce.map.output.compress.codec进行设置)
随后这些paritions输出文件将会通过HTTP发送至reducers,传送的最大启动线程通过mapreduce.shuffle.max.threads进行配置,如果mapper跟reducer同在一台服务器上,则reducer无需通过网络,直接读取本地文件,效率更高。
2)、The Reduce Side
首先上面每个节点的map都将结果写入了本地磁盘中,现在reduce需要将map的结果通过集群拉取过来,这里要注意的是,需要等到所有map任务结束后,reduce才会对map的结果进行拷贝,由于reduce函数有少数几个复制线程,以至于它可以同时拉取多个map的输出结果。默认的为5个线程(可通过修改配置mapreduce.reduce.shuffle.parallelcopies来修改其个数)
这里有个问题,那么reducers怎么知道从哪些机器拉取数据呢? 当所有map的任务结束后,applicationMaster通过心跳机制(heartbeat mechanism),由它负责mapping的输出结果与机器host的信息。所以reducer会定时的通过一个线程访问ApplicationMaster请求map的输出结果。
Map的结果将会被拷贝到reduce task的JVM的内存中(内存大小可在mapreduce.reduce.shuffle.input.buffer.percent中设置)如果不够用,则会写入磁盘。当内存缓冲区的大小到达一定比例时(可通过mapreduce.reduce.shuffle.merge.percent设置)或map的输出结果文件过多时(可通过配置mapreduce.reduce.merge.inmen.threshold),将会触发合并(merged)随之写入磁盘。
这时要注意,所有的map结果这时都是被压缩过的,需要先在内存中进行解压缩,以便后续合并它们。(合并最终文件的数量可通过mapreduce.task.io.sort.factor进行配置) 最终reduce进行运算进行输出。
以上内容参考博文:shuffle和reducer拷贝mapper数据的细节