irpas技术客

MapReduce on Yarn(包含MapReduce执行详细流程)_CLAY157

大大的周 2247

1. MapReduce 1.1 MapReduce任务在Yarn中执行流程

MapReduce作为一种分布式计算框架,它在Yarn中执行的流程为:

(1)客户端提交job;

细节: ① org.apache.hadoop.mapreduce.Job类配置job; ② mapred-site.xml中mapreduce.framework.name配置为yarn时,yarn协议会激活; ③ job.waitForCompletion(true):将job提交到Yarn集群,并等待执行完成; ④ 客户端代码在客户端JVM中运行;

(2)job跟ResourceManager交互获取任务元数据:如application id;

细节: ①委托 OutputFormat检查输出目录是否已存在; ②计算InputSplits;

(3)job将相关资源拷贝至HDFS(jar包、配置文件、input splits),并且允许其他job获取;(4)job将application提交到ResourceManager;

job初始化细节: ①ResourceManager收到新application的请求; ②ResourceManager委派内部的Scheduler为ApplicationMaster提供container;(MR的ApplicationMaster为MRAppMaster) ③MRAppMaster跟Resource Manager协商后初始化object,并且运行job;

(5)ResourceManager选择一个NodeManager,并分配资源让其新建一个container运行MRAppMaster;

MRAppMaster初始化细节: ①创建内部bookkeeping对象,监控进程; ②恢复Input Splits,客户端创建splits,并拷贝到HDFS; ③创建tasks:每个split创建一个Map tasks,Reduce tasks根据mapreduce.job.reduces配置决定; ④决定怎么运行task:特别小的任务会直接在MRAppMaster的JVM中运行,这种任务称为uberized、uber;在NodeManager运行tasks; ⑤执行tasks;

(6)NodeManager分配给MRAppMaster container,MRAppMaster 执行并协调MapReduce job;(7)MRAppMaster从HDFS获取运行MapReduce作业的资源(步骤③的资源);(8)MRAppMaster跟ResourceManager协商,获取资源(ResourceManager会分配一个资源较为充足的NodeManager);(9)MRAppMaster要求分配到的NodeManager上执行Map和Reduce tasks;(10)NodeManager创建YarnChild容器,运行tasks;(11)YarnChild从HDFS获取需要运行Map或Reduce任务的job资源;(12)YarnChild运行Map或Reduce任务;

ps:YarnChild会向MRAppMaster实时汇报任务进展,MRAppMaster会向客户端和ResourceManager汇报任务运行进展;

整体流程如下图所示:

1.2 客户端提交Job流程 …… # MR代码中配置好,Job后会通过以下代码进行提交 /** * TODO 提交 job.waitForCompletion */ System.exit(job.waitForCompletion(true) ? 0 : 1); job.waitForCompletion的执行流程如图所示:

详细的每一步的流程已在图中体现,最后到了ApplicationClientProtocolPBClientImpl后会根据ResourceManager的逻辑选择合适的nodemanager进行任务的分配;

1.3 MapReduce任务流程 MapReduce任务: 这里以最简单的MR WordCount举例:

①输入:一个文本文件; ②分片(split): 把输入的文件切分成split,具体逻辑如下: ((double) bytesRemaining)/splitSize > SPLIT_SLOP

其中bytesRemaining是文件的字节长度,bytesRemaining要满足以上条件,每次增加splitSize个字节长度; splitSize是以下三个参数的中间值; mapreduce.input.fileinputformat.split.maxsize mapreduce.input.fileinputformat.split.minsize blockSize SPLIT_SLOP一般是1.1; 切分的split存储在集合中: List splits = new ArrayList(); 主要是以FileSplit的形式存储在集合中:其中有文件路径及文件名,开始到结束的字节数,hostname等多种信息; splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); ①、②步在MapReduce任务开始之前会完成;

③每个split对应一个MapTask; ④Map做好相应逻辑后,spill到环形缓冲区(环形缓冲区可以同时读和写,读:Map的结果,写:往磁盘写排序后的内容); ⑤环形缓冲区的内容,经过快速排序后溢出到本地磁盘中; ⑥磁盘中的文件经过几轮归并排序,Merge成一个大的文件; ⑦步骤⑥的结果会向MRAppMaster进行汇报,然后MRAppMaster会通知ReduceTask启动; ⑧Reduce阶段拉取几个MapTask⑥步中的partition值一样的大文件,然后又进行几轮归并排序,最后合成一个或多个大文件写出到output文件夹中; 详细过程已在图中描述:

核心难点在于环形缓冲区的设计,后续有时间单独写一篇文章进行讲解;

原创作品,禁止转载!!!感谢支持~


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #MapReduce #on #1 #MapReduce11