Ⅰ 高高分!!C++,高手,MFCVC
针对你提的问题,我想,你应该还是有以下的误区 没有解决!
对于多线程
一个基本的概念就是同时对多个任务加以控制。许多程序设计问题都要求程序能够停下手 头的工作,改为处理其他一些问题,再返回主进程。可以通过多种途径达到这个目的。最开始的时候,那些 拥有机器低级知识的程序员编写一些“中断服务例程”,主进程的暂停是通过硬件级的中断实现的。尽管这 是一种有用的方法,但编出的程序很难移植,由此造成了另一类的代价高昂问题。 有些时候,中断对那些实时性很强的任务来说是很有必要的。但还存在其他许多问题,它们只要求将问题划 分进入独立运行的程序片断中,使整个程序能更迅速地响应用户的请求。在一个程序中,这些独立运行的片 断叫作“线程”(Thread),利用它编程的概念就叫作“多线程处理”。多线程处理一个常见的例子就是用 户界面。利用线程,用户可按下一个按钮,然后程序会立即作出响应,而不是让用户等待程序完成了当前任 务以后才开始响应。 最开始,线程只是用于分配单个处理器的处理时间的一种工具。但假如操作系统本身支持多个处理器,那么 每个线程都可分配给一个不同的处理器,真正进入“并行运算”状态。从程序设计语言的角度看,多线程操 作最有价值的特性之一就是程序员不必关心到底使用了多少个处理器。程序在逻辑意义上被分割为数个线 程;假如机器本身安装了多个处理器,那么程序会运行得更快,毋需作出任何特殊的调校。 根据前面的论述,大家可能感觉线程处理非常简单。但必须注意一个问题:共享资源!如果有多个线程同时 运行,而且它们试图访问相同的资源,就会遇到一个问题。举个例子来说,两个进程不能将信息同时发送给 一台打印机。为解决这个问题,对那些可共享的资源来说(比如打印机),它们在使用期间必须进入锁定状 态。所以一个线程可将资源锁定,在完成了它的任务后,再解开(释放)这个锁,使其他线程可以接着使用 同样的资源。 多线程是为了同步完成多项任务,不是为了提高运行效率,而是为了提高资源使用效率来提高系统的效率。线程是在同一时间需要完成多项任务的时候实现的。
对于STL,
从根本上说,STL是一些“容器”的集合,这些“容器”有list,vector,set,map等,STL也是算法和其他一些组件的集合。这里的“容器”和算法的集合指的是世界上很多聪明人很多年的杰作。STL的目的是标准化组件,这样就不用重新开发,可以使用现成的组件。STL现在是C++的一部分,因此不用额外安装什麽。它被内建在你的编译器之内。
Ⅱ 并行计算模型的C3模型
C3模型假定处理机不能同时发送和接收消息,它对超步的性能分析分为两部分:计算单元CU,依赖于本地计算量;通信单元COU,依赖与处理机发送和接收数据的多少、消息的延迟及通信引起的拥挤量。该模型考虑了两种路由(存储转发路由和虫蚀寻径路由)和两种发送/接收原语(阻塞和无阻塞)对COU的影响。 (1)用Cl和Cp来度量网络的拥挤对算法性能的影响;
(2)考虑了不同路由和不同发送或接收原语对通信的影响;
(3)不需要用户指定调度细节,就可以评估超步的时间复杂性;
(4)类似于H-PRAM模型的层次结构,C3模型给编程者提供了K级路由算法的思路,即系统被分为K级子系统,各级子系统的操作相互独立,用超步代替了H-PRAM中的Sub PRAM进行分割。 (1)Cl度量的前题假设为同一通信对中的2个处理机要分别位于网络对分后的不同子网络内;
(2)模型假设了网络带宽等于处理机带宽,这影响了正确描述可扩展系统;
(3)在K级算法中,处理机间顺序可以由多种排列,但C3模型不能区分不同排列的难易程度。
Ⅲ cpu调度的基本方式
我们知道,程序需要获得CPU的资源才能被调度和执行,那么当一个进程由于某种原因放弃CPU然后进入阻塞状态,下一个获得CPU资源去被调度执行的进程会是谁呢?下图中,进程1因为阻塞放弃CPU资源,此时,进程2刚IO操作结束,可以获得CPU资源去被调度,进程3的时间片轮转结束,也同样可以获得CPU资源去被调度,那么,此时的操作系统应该安排哪个进程去获得CPU资源呢?这就涉及到我们操作系统的CPU调度策略了。
根据生活中的例子,我们很容易想到以下两种策略CPU调度的直观想法:1.FIFO谁先进入,先调度谁,这是一种非常简单有效的方法,就好比我们去饭堂打饭,谁先到就给谁先打饭。但是这种策略会遇到一个问题:如果遇到一个很小的任务,但是它是最后进入的,那么必须得前面一大堆任务结束完后才能执行这个小小的任务,这样就感觉很不划算呀!因为我只是简简单单的一个小任务,但是从打开这个任务到结束这个任务要很久。这显然不符合我们的需求,因而我们会想到第2种策略,就是先调度小任务,后调度大任务。2.Priority很简单,就是任务短的优先执行,但是此时又有问题了,任务虽然短,但是它的执行时间不一定短,就好比在一个银行业务中,客户填写一个表,这是一个非常短的任务吧——就单单填个表,但是这个表很长很长,那么这个短任务它的执行时间就很长了,我们怎么知道这个短的任务将来会执行多长的时间呢?所以,这样的策略还是依然有问题。那么,面对诸多的场景,如何设计调度算法呢?首先,我们要明白我们的算法应该让什么更好呢?面对客户:银行调度算法的设计目标应该是用户满意;而面对进程:CPU调度的目标应该是进程满意。那怎么才能让进程满意呢?那就是时间了。进程希望尽早地结束任务,这就是周转时间(从任务到达到任务结束)要短,而且希望用户的操作能够尽快地被响应,这就是响应时间(从操作发生到响应)要短。而且系统内耗时间要少,吞吐量(任务的完成量)要大,系统需要把更多的时间用在任务的执行上,而不能老是去做无关紧要的事情,例如:频繁切换任务,切换栈,分配资源等事情。同时,系统还要去合理地调配任务。那么,CPU的调度策略如何做到合理呢?首先得明白系统中有以下的几种矛盾。1.吞吐量和响应时间之间有矛盾响应时间小=>切换次数多=>系统内耗大=>吞吐量小由于需要较短的响应时间,那么就得频繁地切换任务,这样系统的很多时间都花在切换任务上面了,系统的内耗大了,吞吐量就小了。2.前台任务和后台任务的关注点不同前台任务关注响应时间,后台任务关注周转时间。前台任务例如我们的word文档,我们打一个字,需要立马显示在文档中,这就是word文档这个任务关注的是响应时间;而后台任务中,例如我们的javac编译java代码,它的周转时间要小,即该任务从进入到结束所花的时间要小,即编译完成的时间要小。http://3.IO约束型任务和CPU约束型任务各有各的特点IO约束型任务就是使用CPU的时间较少,进行IO操作的时间较长,CPU约束型的任务就是使用CPU的时间较长。因此,要做到合理,需要折中、综合考虑以上的几种矛盾。由此,产生了一些CPU的调度算法,在下一节我们将重点讲述这些CPU调度算法。
关注小鲸融创,一起深度学习金融科技!
编辑于 2019-12-11 · 着作权归作者所有
赞同 1
评论
展开全部
Ⅳ 作业调度算法:编写并调度一个多道程序系统的作业调度模拟程序
回答:benben
新手
5月8日 08:33 作业调度的方法一般有:先来先服务算法,短程作业优先算法,响应比高者优先算法等等把
他就是把你要处理的总的作业,根据系统允许并行的工作得道数和系统的可利用的资源,调入内存的一种算法,如果要简单地说就是挑选最有者的过程!
Ⅳ 最佳调度问题:假设有n个任务由k个并行工作的机器来完成。完成任务i需要的时间为Ti。试设计一个算法找出完
Node{
int Path[n]; //节点对应的解空间树的路径,即到该节点为止的策略记录
int T[k]; //在本策略下的每台机器的运行时间
int Time; //本策略的总执行时间,为每台机器运行时间的最大值
int length; //本节点的深度,即当前处理的作业
}
Proc BestDispatch(int n,int k,int t[])
Node Boot,X,P,result; //Boot为根节点,result保存最优解
int f; //记录当前最优解的执行时间
f=n*max(t[]); //初始化f
Boot.T[n]={0};
Boot.Time=0;
Boot.Path[n]={0};
Boot.length=0; //初始化根节点
AddHeap(Boot); //根节点加入堆中,堆中元素按照Time值由小到大排序
While !Heap.empty() do
P=DeleteMinHeap(); //P为当前优先级最高的点
for i=1 to k do //扩展P的k个子节点
X=Newnode(P.Path[],P.T[],P.length+1);
X.Path[X.length]=i;
X.T[i]=X.T[i]+t[X.length];
X.Time=max(X.T[]);
if X.length==n then //X为叶节点
if X.Time<f then //X的执行时间小于已知最优解
f=X.Time; //将X设为最优解
result=X;
end{if}
else //X为中间节点
if X.Time<f then
AddHeap(X);
end{if}//X的当前执行时间小于已知最优解则加入堆中,否则剪去
end{if}
end{for}
end{while}
end{ BestDispatch }
Ⅵ 线程的并行是真并行吗
如果一台计算机有n个处理器,那么就有n个线程真正同时运行。单CPU计算机是伪并行,按照某种调度算法,多个线程轮流运行。
Ⅶ 并行处理计算机系统的结构原理
并行处理计算机的结构主要有流水线方式 、多功能部件方式 、阵列方式、多处理机方式和数据流方式。 将指令的执行过程分解为若干段,每段进行一部分处理。一条指令顺序流过所有段即执行完毕获得结果。当本条指令在本段已被处理完毕而进入下段时,下条指令即可流入本段。因此,在整个流水线上可以同时处理若干条指令。若各段的执行时间均为一个时钟节拍,则在正常情况下每拍可以输出一个结果,即完成一条指令。这就可加快处理机的速度。
程序中相邻指令的相关性会影响流水线处理机效率的发挥。例如,条件转移指令在上条指令执行完以前,有时不能确定后继指令;又如本条指令需要用上条指令的结果作为操作数等,都将中断流水线而使效率下降。 一台处理机由多个相同的处理部件和一个统一的控制器组成。这个控制器解释指令并传送操作命令至全部处理部件。各处理部件按照控制器的命令同时进行完全相同的操作。阵列处理机又可分为浮点阵列处理机和位片式阵列处理机两类。
ILLIAC-Ⅳ机属于浮点阵列处理机,包括64个完全相同的处理部件(PU)和一个公共的控制部件(CU)。每个处理部件包括一个能执行64位浮点操作的处理单元(PE)和一个容量为2k字的存储器(PM)。64个处理部件排列成8×8阵列。每个处理部件与四邻处理部件均有直接数据通路。 在直接耦合多处理机系统中,实现处理机与存储器、处理机与处理机之间连接的互连网络十分重要。互连网络有三种主要形式。
①总线结构:总线结构是多处理机系统中最为简单的网络结构。实际的多处理机系统的互连网络,往往是在总线结构的基础上发展起来的(图3)。
②交叉开关结构:交叉开关由纵横开关阵列组成,将横向的处理机与纵向的存储器模块连接起来(图4)。
③多端口存储器结构:把交叉开关结构中的各交叉点上的开关移到相应存储器的接口内部,形成多端口存储器结构。 数据流处理机是受到人们重视的高度并行的处理机。它虽保留了存储程序的做法,但在主要原理上已与诺依曼计算机结构不同。它不按程序计数器指出的指令顺序执行程序,只要所需操作数全部具备,指令即可被执行,亦即程序的执行不是由控制流驱动,而是由数据流驱动。
数据流处理机是以语言为基础的处理机。它使用数据流程序图作为用户语言与计算机结构之间的接口。数据流程序图用能动框表示 。每个能动框有多个域 ,分别存放操作码、操作数和目标地址。数据流程序以能动框集合的方式保存在能动存储器中。当某条指令可以执行时,相应的能动框地址便被送入指令排队器。读取部件则按地址从存储器中取出该能动框,形成操作包,送至操作部件进行处理,产生结果包。修改部件根据结果包的目标地址将结果数据送至规定的能动框作为操作数,并将具备操作数的指令的地址送至指令排队器。指令排队器中的指令均具备执行条件,因而只需增加部件数量或增强部件流水程度 , 就可以高速并行执行。此外,还可将多个指令处理单元连接成数据流多处理机系统,进一步提高处理能力。
并行算法和并行语言
提高并行处理效率的关键之一是并行算法。算法须适应计算机的结构。如果一种算法所表达出来的并行度与计算机的并行度基本一致,便能提高计算机的解题效率。
在向量计算机中,提高并行度的主要问题在于把可并行处理的操作数用向量表示。许多常用的数值计算法,如数列求和、矩阵乘、高斯消元、快速傅里叶变换等,已成功地在向量计算机上实现了并行处理。较为通行的并行语言基本上是FORTRAN语言的扩展。
在多处理机系统中,提高程序并行性的关键,是把任务分解成足够多的可同时操作的进程。在程序语言中,还须扩充能明确表达进程并发性的语句,以便程序运行时能为相应的控制机构提供控制和管理手段,其中包括并行任务的派生、通信和调度。ADA 语言为描述多处理机并行程序结构提供了必要的语句。为适应数据流计算机而出现的若干数据流语言如Id语言和VAL语言已经在试用。 其重要特点是把数组看成是值而不是目标。用数据流语言编写的程序能够自然地表达出最大的运算并行性。
Ⅷ Python进程之并行与并发的区别
并行 :
当系统有一个以上CPU时,则进程的操作有可能非并发。当一个CPU执行一个进程时,另一个CPU可以执行另一个进程,两个进程互不抢占CPU资源,可以同时进行,这种方式我们称之为并行。
并发 :
当有多个进程在操作时,如果系统只有一个CPU,则它根本不可能真正同时执行一个以上的进程,它只能把CPU运行时间划分成若干个时间段,再将时间 段分配给各个进程执行,在一个时间段的进程代码运行时,其它进程处于挂起状,这种方式我们称之为并发。
区别:
并发和并行是即相似又有区别的两个概念,并行是指两个或者多个事件在同一时刻同时执行,而并发是指两个或多个事件通过时间片轮流被执行。在多道程序环境下,并发性是指在一段时间内宏观上有多个程序在同时运行,但在单核CPU中,同一时刻仅能有一道程序执行,故微观上这些程序只能是分时地交替执行。倘若在计算机中有多个CPU,则这些可以并发执行的程序便可被分配到多个处理机上,实现并行执行,即利用每个处理机来处理一个可并发执行的程序,这样,多个程序便可以同时执行。
相关推荐:《Python视频教程》
进程的状态如下图所示
在了解其他概念之前,我们首先要了解进程的几个状态。在程序运行的过程中,由于被操作系统的调度算法控制,程序会进入几个状态:就绪,运行和阻塞。
(1)就绪(Ready)状态
当进程已分配到除CPU以外的所有必要的资源,只要获得处理机便可立即执行,这时的进程状态称为就绪状态。
(2)执行/运行(Running)状态当进程已获得处理机,其程序正在处理机上执行,此时的进程状态称为执行状态。
(3)阻塞(Blocked)状态正在执行的进程,由于等待某个事件发生而无法执行时,便放弃处理机而处于阻塞状态。引起进程阻塞的事件可有多种,例如,等待I/O完成、申请缓冲区不能满足、等待信件(信号)等。
相关推荐:
一文带你读懂Python中的进程
Ⅸ 在多核CPU下,同一进程下的多个线程可以并行运行吗
CPU在某一个时间点上确实只能执行一个线程,但是多线程不是由于多核或者双核才叫多线程。
是由于,很多个线程在并行执行的时候,CPU根据一定的线程调度算法,频繁的进行线程切换,当正在执行的一个线程需要进行IO操作或者需要访问内存的时候,CPU完全可以放弃该线程,转而调度线程就绪队列上的其他线程,被放弃的线程则进入阻塞状态,IO操作或者访问内存操作结束之后,该线程可以进入线程就绪队列上。
人们通常意义上的多线程指的是,由于CPU根据一定的线程调度算法来切换线程,所以在一个时间段上,可以看做很多线程在并发执行。
其实还是在某一个时间点上只有一个线程在运行罢了。
Ⅹ hadoop并行过程,是由什么机制来进行控制
可以只用一行代码来运行MapRece作业:JobClient.runJon(conf),Job作业运行时参与的四个实体:
1.JobClient 写代码,配置作业,提交作业。
2.JobTracker:初始化作业,分配作业,协调作业运行。这是一个java程序,主类是JobTracker。
3.TaskTracker:运行作业划分后的任务,即分配数据分配上执行Map或Rece任务。
4.HDFS:保存作业数据、配置信息等,保存作业结果。
Map/Rece 作业总体执行流程:
代码编写 ----> 作业配置 ---->作业提交---->Map任务分配和执行---->处理中间结果----> Rece任务分配与执行----> 输出结果
而对于每个作业的执行,又包含:
输入准备---->任务执行---->输出结果
作业提交JobClient:
JobClient的runJob方法产生一个Jobclient实例并调用其submitJob方法,然后runJob开始循环吗,并在循环中调用getTaskCompetionEvents方法,获得TaskCompletionEvent实例,每秒轮询作业进度(后面有介绍进度和状态更新),把进度写到控制台,作业完成后显示作业计数器,若失败,则把错误记录到控制台。
submitJob方法作业提交的过程:
1.向JobTracker请求一个新的JobId。
2.检查作业相关路径,如果路径不正确就会返回错误。
3.计算作业输入分片及其划分信息。
4.将作业运行需要的资源(jar文件、配置文件等)复制到Shared HDFS,并
复制多个副本(参数控制,默认值为10)供tasktracker访问,也会将计算的分片复制到HDFS。
5.调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行。
作业的初始化JobTracker:
JobTracker收到submitJob方法调用后,会把调用放入到一个内部队列,由作业调度器(Job scheler)进行调度并对其初始化。Job初始化即创建一个作业对象。
当作业被调度后,JobTracker会创建一个代表这个作业的JobInProgress对象,并将任务和记录信息封装在这个对象中,以便跟踪任务状态和进程。
初始化过程就是JobInProgress对象的initTasks方法进行初始化的。
初始化步骤:
1.从HDFS中读取作业对应的job.split信息,为后面的初始化做好准备。
2.创建并初始化map和rece任务。根据数据分片信息中的个数确定map task的个数,然后为每个map task生成一个TaskInProgress对象来处理数据分片,先将其放入nonRunningMapCache,以便JobTracker分配任务的时候使用。接下来根据JobConf中的mapred.rece.tasks属性利用setNumReceTasks()方法设置rece task的数量,然后同map task创建方式。
3.最后就是创建两个初始化task,进行map和rece的初始化。
任务的分配JobTracker:
消息传递HeartBeat: tasktracker运行一个简单循环定期发送心跳(heartbeat)给JobTracker。由心跳告知JobTracker自己是否存活,同时作为消息通道传递其它信息(请求新task)。作为心跳的一部分,tasktracker会指明自己是否已准备好运行新的任务,如果是,jobtracker会分配它一个任务。
分配任务所属于的作业:在Jobtracker分配任务前需先确定任务所在的作业。后面会介绍到各种作业调度算法,默认是一个FIFO的作业调度。
分配Map和Rece任务:tasktracker有固定数量的任务槽,一个tasktracker可以同时运行多个Map和Rece任务,但其准确的数量由tasktracker的核的数量和内存大小决定。默认调度器会先填满Map任务槽,再填Rece任务槽。jobtracker会选择距离离分片文件最近的tasktracker,最理想情况下,任务是数据本地化(data-local)的,当然也可以是机架本地化(rack-local),如果不是本地化的,那么他们就需要从其他机架上检索数据。Rece任务分配很简单,jobtracker会简单的从待运行的rece任务列表中选取下一个来执行,不用考虑数据本地化。
任务的执行TaskTracker:
TaskTracker收到新任务后,就要在本地运行任务了,运行任务的第一步就是通过localizedJob将任务本地化所需要的注入配置、数据、程序等信息进行本地化。
1.本地化数据:从共享文件系统将job.split 、job.jar (在分布式缓存中)复制本地,将job配置信息写入job.xml。
2.新建本地工作目录:tasktracker会加压job.jar文件到本工作目录。
3.调用launchTaskForJob方法发布任务(其中会新建TaskRunner实例运行任务),如果是Map任务就启用MapTaskRunner,对于Rece就是ReceTaskRunner。
在这之后,TaskRunner会启用一个新的JVM来运行每个Map/Rece任务,防止程序原因而导致tasktracker崩溃,但不同任务间重用JVM还是可以的,后续会讲到任务JVM重用。
对于单个Map,任务执行的简单流程是:
1.分配任务执行参数
2.在Child临时文件中添加map任务信息(Child是运行Map和Rece任务的主进程)
3.配置log文件夹,配置map任务的通信和输出参数
4.读取input split,生成RecordReader读取数据
5.为Map生成MapRunnable,依次从RecordReader中接收数据,并调用Map函数进行处理。
6.最后将map函数的输出调用collect收集到MapOutputBuffer(参数控制其大小)中。
Streaming和Pipes:
Streaming和Pipes都运行特殊的Map和Rece任务,目的是运行用户提供的可执行程序并与之通信。
Streaming:使用标准输入输出Streaming与进程进行通信。
Pipes:用来监听套接字,会发送一个端口号给C++程序,两者便可建立链接。
进度和状态更新:
一个作业和它的任务都有状态(status),其中包括:运行成功失败状态、Map/Rece进度、作业计数器值、状态消息。
状态消息与客户端的通信:
1.对于Map任务Progress的追踪:progress是已经处理完的输入所占的比例。
2.对于Rece:稍复杂,rece任务分三个阶段(每个阶段占1/3),复制、排序和Rece处理,若rece已执行一半的输入的话,那么任务进度便是1/3+1/3+1/6=5/6。
3.任务计数器:任务有一组计数器,负责对任务运行各个事件进行计数。
4.任务进度报告:如果任务报告了进度,便会设置一个标记以表明状态将被发送到tasktracker。有一个独立线程每隔三秒检查一次此标记,如果已设置,则告知tasktracker当前状态。
5.tasktracker进度报告:tasktracker会每隔5秒(这个心跳是由集群大小决定,集群越大时间会越长)发送heartbeat到jobtracker,并且tasktracker运行的所有状态都会在调用中被发送到jobtracker。
6.jobtracker合并各任务报告:产生一个表明所有运行作业机器所含任务状态的全局视图。
前面提到的JobClient就是通过每秒查询JobTracker来接收最新状态,而且客户端JobClient的getJob方法可以得到一个RunningJob的实例,其包含了作业的所以状态信息。
作业的完成:
当jobtracker收到作业最后一个任务已完成的通知后,便把作业状态设置成成功。JobClient查询状态时,便知道任务已成功完成,于是JobClient打印一条消息告知用户,然后从runJob方法返回。
如果jobtracker有相应设置,也会发送一个Http作业通知给客户端,希望收到回调指令的客户端可以通过job.end.notification.url属性来进行设置。
jobtracker情况作业的工作状态,指示tasktracker也清空作业的工作状态,如删除中间输出。
失败
实际情况下,用户的代码存在软件错误进程会崩溃,机器也会产生故障,但Hadoop能很好的应对这些故障并完成作业。
1.任务失败
子任务异常:如Map/Rece任务中的用户代码抛出异常,子任务JVM进程会在退出前向父进程tasktracker发送错误报告,错误被记录用户日志。tasktracker会将此次task attempt标记为tailed,并释放这个任务槽运行另外一个任务。
子进程JVM突然退出:可能由于JVM bug导致用户代码造成的某些特殊原因导致JVM退出,这种情况下,tasktracker会注意到进程已经退出,并将此次尝试标记为failed。
任务挂起:一旦tasktracker注意一段时间没有收到进度更新,便会将任务标记为failed,JVM子进程将被自动杀死。任务失败间隔时间通常为10分钟,可以以作业或者集群为基础设置过期时间,参数为mapred.task.timeout。注意:如果参数值设置为0,则挂起的任务永远不会释放掉它的任务槽,随着时间的推移会降低整个集群的效率。
任务失败尝试次数:jobtracker得知一个tasktracker失败后,它会重新调度该任务执行,当然,jobtracker会尝试避免重新调度失败过的tasktracker任务。如果一个任务尝试次数超过4次,它将不再被重试。这个值是可以设置的,对于Map任务,参数是mapred.map.max.attempts,对于rece任务,则由mapred.rece.max.attempts属性控制。如果次数超过限制,整个作业都会失败。当然,有时我们不希望少数几个任务失败就终止运行的整个作业,因为即使有些任务失败,作业的一些结果可能还是有用的,这种情况下,可以为作业设置在不触发作业失败情况下的允许任务失败的最大百分比,Map任务和Rece任务可以独立控制,参数为mapred.max.map.failures.percent 和mapred.max.rece.failures.percent。
任务尝试中止(kill):任务终止和任务失败不同,task attempt可以中止是因为他是一个推测副本或因为它所处的tasktracker失败,导致jobtracker将它上面的所有task attempt标记为killed。被终止的task attempt不会被计入任务运行尝试次数,因为尝试中止并不是任务的错。
2.tasktracker失败
tasktracker由于崩溃或者运行过慢而失败,他将停止向jobtracker发送心跳(或很少发送心跳)。jobtracker注意已停止发送心跳的tasktracker(过期时间由参数mapred.tasktracker.expiry.interval设置,单位毫秒),并将它从等待调度的tasktracker池中移除。如果是未完成的作业,jobtracker会安排次tasktracker上已经运行成功的Map任务重新运行,因为此时rece任务已无法访问(中间输出存放在失败的tasktracker的本地文件系统上)。
即使tasktracker没有失败,也有可能被jobtracker列入黑名单。如果tasktracker上面的失败任务数量远远高于集群的平均失败任务次数,他就会被列入黑名单,被列入黑名单的tasktracker可以通过重启从jobtracker黑名单中移除。
3.jobtracker失败
老版本的JobTracker失败属于单点故障,这种情况下作业注定失败。
作业调度:
早期作业调度FIFO:按作业提交顺序先进先出。可以设置优先级,通过设置mapred.job.priority属性或者JobClient的setJobPriority()方法制定优先级(优先级别:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW)。注意FIFO调度算法不支持抢占(preemption),所以高优先级作业仍然会被那些已经开始的长时间运行的低优先级作业所阻塞。
Fair Scheler:目标是让每个用户公平地共享集群能力。当集群存在很多作业时,空闲的任务槽会以”让每个用户共享集群“的方式进行分配。默认每个用户都有自己的作业池。FairScheler支持抢占,所以,如果一个池在特定的一段时间未得到公平地资源共享,它会终止池中得到过多的资源任务,以便把任务槽让给资源不足的池。FairScheler是一个后续模块,使用它需要将其jar文件放在Hadoop的类路径下。可以通过参数map.red.jobtracker.taskScheler属性配置(值为org.apache.hadoop.mapred.FairScheler)
Capacity Scheler:
集群由很多队列组成,每个队列都有一个分配能力,这一点与FairScheler类似,只不过在每个队列内部,作业根据FIFO方式进行调度。本质上说,Capacity Scheler允许用户或组织为每个用户模拟一个独立使用FIFO的集群。
shuffle和排序:
MapRece确保每个Recer的输入都是按键排序的。系统执行排序的过程-将map输出作为输入传给recer的过程称为shuffle。shuffle属于不断被优化和改进的代码库的一部分,从许多方面来看,shuffle是MapRece的心脏。
整个shuffle的流程应该是这样:
map结果划分partition 排序sort 分割spill 合并同一划分 合并同一划分 合并结果排序 rece处理 输出
Map端:
写入缓冲区:Map函数的输出,是由collector处理的,它并不是简单的将结果写到磁盘。它利用缓冲的方式写到内存,并处于效率的考虑进行预排序。每个map都有一个环形的内存缓冲区,用于任务输出,默认缓冲区大小为100MB(由参数io.sort.mb调整),一旦缓冲区内容达到阈值(默认0.8),后台进程边开始把内容写到磁盘(spill),在写磁盘过程中,map输出继续被写到缓冲区,但如果缓冲区被填满,map会阻塞知道写磁盘过程完成。写磁盘将按照轮询方式写到mapred.local.dir属性制定的作业特定子目录中。
写出缓冲区:collect将缓冲区的内容写出时,会调用sortAndSpill函数,这个函数作用主要是创建spill文件,按照key值对数据进行排序,按照划分将数据写入文件,如果配置了combiner类,会先调用combineAndSpill函数再写文件。sortAndSpill每被调用一次,就会写一个spill文件。
合并所有Map的spill文件:TaskTracker会在每个map任务结束后对所有map产生的spill文件进行merge,merge规则是根据分区将各个spill文件中数据同一分区中的数据合并在一起,并写入到一个已分区且排序的map输出文件中。待唯一的已分区且已排序的map输出文件写入最后一条记录后,map端的shuffle阶段就结束了。
在写磁盘前,线程首先根据数据最终要传递到的recer把数据划分成响应的分区(partition),在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上运行。
内存达到溢出写的阈值时,就会新建一个溢出写文件,因为map任务完成其最后一个输出记录之后,会有几个溢出写文件。在任务完成前,溢出写文件会被合并成一个已分区且已排序的输出文件。配置属性io.sort.facor控制一次最多能合并多少流,默认值是10。
如果已经指定combiner,并且写次数至少为3(通过min.mum.spills.for.combine设置)时,则combiner就会在输出文件写到磁盘之前运行。运行combiner的意义在于使map输出更紧凑,舍得写到本地磁盘和传给recer的数据更少。
写磁盘时压缩:写磁盘时压缩会让写的速度更快,节约磁盘空间,并且减少传给recer的数据量。默认情况下,输出是不压缩的,但可以通过设置mapred.compress.map.output值为true,就可以启用压缩。使用的压缩库是由mapred.map.output.compression.codec制定。
recer获得文件分区的工作线程:recer通过http方式得到输出文件的分区,用于文件分区的工作线程数量由tracker.http.threads属性指定,此设置针对的是每个tasktracker,而不是每个map任务槽。默认值为40,在大型集群上此值可以根据需要而增加。
Rece端:
复制阶段:rece会定期向JobTracker获取map的输出位置,一旦拿到输出位置,rece就会从对应的TaskTracker上复制map输出到本地(如果map输出很小,则会被复制到TaskTracker节点的内存中,否则会被让如磁盘),而不会等到所有map任务结束(当然这个也有参数控制)。
合并阶段:从各个TaskTracker上复制的map输出文件(无论在磁盘还是内存)进行整合,并维持数据原来的顺序。
Rece阶段:从合并的文件中顺序拿出一条数据进行rece函数处理,然后将结果输出到本地HDFS。
Map的输出文件位于运行map任务的tasktracker的本地磁盘,现在,tasktracker要为分区文件运行rece任务。每个任务完成时间可能不同,但是只要有一个任务完成,rece任务就开始复制其输出,这就是rece任务的复制阶段( phase)。rece任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,可以通过mapred.rece.parallel.copies属性设置。
Recer如何得知从哪个tasktracker获得map输出:map任务完成后会通知其父tasktracker状态已更新,tasktracker进而通知(通过heart beat)jobtracker。因此,JobTracker就知道map输出和tasktracker之间的映射关系,recer中的一个线程定期询问jobtracker以便获知map输出位置。由于recer有可能失败,因此tasktracker并没有在第一个recer检索到map输出时就立即从磁盘上删除它们,相反他会等待jobtracker告示它可以删除map输出时才删除,这是作业完成后最后执行的。
如果map输出很小,则会被直接复制到rece tasktracker的内存缓冲区(大小由mapred.job.shuffle.input.buffer.percent控制,占堆空间的百分比),否则,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent)
或达到map输出阈值大小(mapred.inmem.threadhold),则合并后溢出写到磁盘中。
随着磁盘上副本增多,后台线程会将他们合并为更大的、排好序的文件。注意:为了合并,压缩的map输出必须在内存中被解压缩。
排序阶段:复制阶段完成后,rece任务会进入排序阶段,更确切的说是合并阶段,这个阶段将合并map输出,维持其顺序排列。合并是循环进行的,由合并因子决定每次合并的输出文件数量。但让有可能会产生中间文件。
rece阶段:在最后rece阶段,会直接把排序好的文件输入rece函数,不会对中间文件进行再合并,最后的合并即可来自内存,也可来自磁盘。此阶段的输出会直接写到文件系统,一般为hdfs。
细节:这里合并是并非平均合并,比如有40个文件,合并因子为10,我们并不是每趟合并10个,合并四趟。而是第一趟合并4个,后三趟合并10,在最后一趟中4个已合并的文件和余下6个未合并会直接并入rece。