① 如何通过java程序提交yarn的MapRece计算任务
1、在程序中,我将文件读入格式设定为WholeFileInputFormat,即不对文件进行切分。
2、为了控制rece的处理过程,map的输出键的格式为组合键格式。与常规的<key,value>不同,这里变为了<textpair,value>,TextPair的格式为<key1,key2>。
3、为了适应组合键,重新设定了分组函数,即GroupComparator。分组规则为,只要TextPair中的key1相同(不要求key2相同),则数据被分配到一个rece容器中。这样,当相同key1的数据进入rece容器后,key2起到了一个数据标识的作用
② 用Java写MapRece,用python和R,哪种更适合从事数据行业,做数据...
必然python啊,不过R也很好。python更加灵活,但是R是这一方面的功能一点不弱。但是我感觉很多算法拿python实现会更容易,而且python更好学,语法更简洁。具体看个人。
③ 如何解决map /rece程序执行时卡住现象
一、首先要知道此前提转载若在windows的Eclipse工程中直接启动maprec程序,需要先把hadoop集群的配置目录下的xml都拷贝到src目录下,让程序自动读取集群的地址后去进行分布式运行(您也可以自己写java代码去设置job的configuration属性)。若不拷贝,工程中bin目录没有完整的xml配置文件,则windows执行的maprece程序全部通过本机的jvm执行,作业名也是带有“local"字眼的作业,如job_local2062122004_0001。这不是真正的分布式运行maprece程序。估计得研究org.apache.hadoop.conf.Configuration的源码,反正xml配置文件会影响执行maprece使用的文件系统是本机的windows文件系统还是远程的hdfs系统;还有影响执行maprece的mapper和recer的是本机的jvm还是集群里面机器的jvm二、本文的结论第一点就是:windows上执行maprece,必须打jar包到所有slave节点才能正确分布式运行maprece程序。(我有个需求是要windows上触发一个maprece分布式运行)第二点就是:linux上,只需拷贝jar文件到集群master上,执行命令hadoopjarPackage.jarMainClassName即可分布式运行maprece程序。第三点就是:推荐使用附一,实现了自动打jar包并上传,分布式执行的maprece程序。附一、推荐使用此方法:实现了自动打jar包并上传,分布式执行的maprece程序:请先参考博文五篇:Hadoop作业提交分析(一)~~(五)引用博文的附件中EJob.java到你的工程中,然后main中添加如下方法和代码。publicstaticFilecreatePack()throwsIOException{FilejarFile=EJob.createTempJar("bin");ClassLoaderclassLoader=EJob.getClassLoader();Thread.currentThread().setContextClassLoader(classLoader);returnjarFile;}在作业启动代码中使用打包:Jobjob=Job.getInstance(conf,"testAnaAction");添加:StringjarPath=createPack().getPath();job.setJar(jarPath);即可实现直接runasjavaapplication在windows跑分布式的maprece程序,不用手工上传jar文件。附二、得出结论的测试过程(未有空看书,只能通过愚笨的测试方法得出结论了)一.直接通过windows上Eclipse右击main程序的java文件,然后"runasapplication"或选择hadoop插件"runonhadoop"来触发执行MapRece程序的测试。1,如果不打jar包到进集群任意linux机器上,它报错如下:[work]2012-06-2515:42:47,360-org.apache.hadoop.maprece.Job-10244[main]INFOorg.apache.hadoop.maprece.Job-map0%rece0%[work]2012-06-2515:42:52,223-org.apache.hadoop.maprece.Job-15107[main]INFOorg.apache.hadoop.maprece.Job-TaskId:attempt_1403517983686_0056_m_000000_0,Status:FAILEDError:java.lang.RuntimeException:java.lang.ClassNotFoundException:ClassbookCount.BookCount$BookCountMappernotfoundatorg.apache.hadoop.conf.Configuration.getClass(Configuration.java:1720)atorg.apache.hadoop.maprece.task.JobContextImpl.getMapperClass(JobContextImpl.java:186)atorg.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:721)atorg.apache.hadoop.mapred.MapTask.run(MapTask.java:339)atorg.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)atjava.security.AccessController.doPrivileged(NativeMethod)atjavax.security.auth.Subject.doAs(Subject.java:415)atorg.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)atorg.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)Causedby:java.lang.ClassNotFoundException:ClassbookCount.BookCount$BookCountMappernotfoundatorg.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1626)atorg.apache.hadoop.conf.Configuration.getClass(Configuration.java:1718)8more#Error:后重复三次2012-06-2515:44:53,234-org.apache.hadoop.maprece.Job-37813[main]INFOorg.apache.hadoop.maprece.Job-map100%rece100%现象就是:报错,无进度,无运行结果。2,拷贝jar包到“只是”集群master的$HADOOP_HOME/share/hadoop/maprece/目录上,直接通过windows的eclipse"runasapplication"和通过hadoop插件"runonhadoop"来触发执行,它报错同上。现象就是:报错,无进度,无运行结果。3,拷贝jar包到集群某些slave的$HADOOP_HOME/share/hadoop/maprece/目录上,直接通过windows的eclipse"runasapplication"和通过hadoop插件"runonhadoop"来触发执行和报错:Error:java.lang.RuntimeException:java.lang.ClassNotFoundException:ClassbookCount.BookCount$BookCountMappernotfoundatorg.apache.hadoop.conf.Configuration.getClass(Configuration.java:1720)atorg.apache.hadoop.maprece.task.JobContextImpl.getMapperClass(JobContextImpl.java:186)和报错:Error:java.lang.RuntimeException:java.lang.ClassNotFoundException:ClassbookCount.BookCount$BookCountRecernotfound现象就是:有报错,但仍然有进度,有运行结果。4,拷贝jar包到集群所有slave的$HADOOP_HOME/share/hadoop/maprece/目录上,直接通过windows的eclipse"runasapplication"和通过hadoop插件"runonhadoop"来触发执行:现象就是:无报错,有进度,有运行结果。第一点结论就是:windows上执行maprece,必须打jar包到所有slave节点才能正确分布式运行maprece程序。二在Linux上的通过以下命令触发MapRece程序的测试。hadoopjar$HADOOP_HOME/share/hadoop/maprece/bookCount.jarbookCount.BookCount1,只拷贝到master,在master上执行。现象就是:无报错,有进度,有运行结果。2,拷贝随便一个slave节点,在slave上执行。现象就是:无报错,有进度,有运行结果。但某些节点上运行会报错如下,且运行结果。:14/06/2516:44:02INFOmaprece.JobSubmitter:Cleaningupthestagingarea/tmp/hadoop-yarn/staging/hser/.staging/job_1403517983686_0071Exceptioninthread"main"java.lang.NoSuchFieldError:DEFAULT_MAPREDUCE_APPLICATION_CLASSPATHatorg.apache.hadoop.maprece.v2.util.MRApps.setMRFrameworkClasspath(MRApps.java:157)atorg.apache.hadoop.maprece.v2.util.MRApps.setClasspath(MRApps.java:198)atorg.apache.hadoop.mapred.YARNRunner.(YARNRunner.java:443)atorg.apache.hadoop.mapred.YARNRunner.submitJob(YARNRunner.java:283)atorg.apache.hadoop.maprece.JobSubmitter.submitJobInternal(JobSubmitter.java:415)atorg.apache.hadoop.maprece.Job$10.run(Job.java:1268)atorg.apache.hadoop.maprece.Job$10.run(Job.java:1265)atjava.security.AccessController.doPrivileged(NativeMethod)atjavax.security.auth.Subject.doAs(Subject.java:415)atorg.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)atorg.apache.hadoop.maprece.Job.submit(Job.java:1265)atorg.apache.hadoop.maprece.Job.waitForCompletion(Job.java:1286)atcom.etrans.anaSpeed.AnaActionMr.run(AnaActionMr.java:207)atorg.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)atcom.etrans.anaSpeed.AnaActionMr.main(AnaActionMr.java:44)atsun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethod)atsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)atsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)atjava.lang.reflect.Method.invoke(Method.java:606)atorg.apache.hadoop.util.RunJar.main(RunJar.java:212)第二点结论就是:Linux上,只需拷贝jar文件到集群master上,执行命令hadoopjarPackage.jarMainClassName即可分布式运行maprece程序。
④ hadoop 2.2.0 maprece java.lang.NullPointerException
能再具体点吗,感觉问题出在创建临时目录时,JobSubmissionFiles.getStagingDir,FilterFileSystem.mkdirs,RawLocalFileSystem.setPermission,可能是权限问题,话说在如果可以的话最好不要在Eclipse下跑Maprece,打包到集群最好
⑤ java maprece 统计出现次数
以时间为key,访问的url和访问者组合到一起作为value。按key分组,然后每组统计访问次数以及访问人数
⑥ 求maprece(JAVA)读取文件然后处理数据的代码
package test;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
public class Test {
public static void main(String[] args) {
Test t = new Test();
t.maprece("c:\\a.txt");
}
public void maprece(String filepath){
if(filepath==null||"".equals(filepath)){
System.out.println("文件名错误!");
return;
}
List<String> list = new ArrayList<String>();
File f = new File(filepath);
BufferedReader reader=null;
try {
reader = new BufferedReader(new FileReader(f));
String tempString = null;
// 一次读入一行,直到读入null为文件结束
while ((tempString = reader.readLine()) != null) {
list.add(tempString);
}
reader.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if (reader != null) {
try {
reader.close();
} catch (Exception e1) {
}
}
}
for(int i=0,length=list.size();i<length;i+=3){
String name = list.get(i).split(":")[1];
String id = list.get(i+1).split(":")[1];
String address = list.get(i+2).split(":")[1];
System.out.println("name:"+name+",id:"+id+",address:"+address);
}
}
}
⑦ 如何通过Java程序提交yarn的maprece计算任务
由于项目需求,需要通过Java程序提交Yarn的MapRece的计算任务。与一般的通过Jar包提交MapRece任务不同,通过程序提交MapRece任务需要有点小变动,详见以下代码。
以下为MapRece主程序,有几点需要提一下:
1、在程序中,我将文件读入格式设定为WholeFileInputFormat,即不对文件进行切分。
2、为了控制rece的处理过程,map的输出键的格式为组合键格式。与常规的<key,value>不同,这里变为了<TextPair,Value>,TextPair的格式为<key1,key2>。
3、为了适应组合键,重新设定了分组函数,即GroupComparator。分组规则为,只要TextPair中的key1相同(不要求key2相同),则数据被分配到一个rece容器中。这样,当相同key1的数据进入rece容器后,key2起到了一个数据标识的作用。
⑧ 为什么spark支持多种语言编程,而maprece只支持java
Java开发系统级别软件的优势,我觉得是不言而喻的,这应该选择的首要原因。
加上Apache社区从来都是Java主打的。
而且从易用性和普及性考虑,一个开源软件在10年前用scala开发真的是作。
⑨ 如何将java类对象作为maprece中map函数的输入
1.首先介绍一下wordcount 早maprece框架中的 对应关系
大家都知道 maprece 分为 map 和rece 两个部分,那么在wordcount例子中,很显然 对文件word 计数部分为map,对 word 数量累计部分为 rece;
大家都明白 map接受一个参数,经过map处理后,将处理结果作为rece的入参分发给rece,然后在rece中统计了word 的数量,最终输出到输出结果;
但是初看遇到的问题:
一、map的输入参数是个 Text之类的 对象,并不是 file对象
二、rece中并没有if-else之类的判断语句 ,来说明 这个word 数量 加 一次,那个word 加一次。那么这个判断到底只是在 map中已经区分了 还是在rece的时候才判断的
三、map过程到底做了什么,rece过程到底做了什么?为什么它能够做到多个map多个rece?
一、
1. 怎么将 文件参数 传递 到 job中呢?
在 client 我们调用了FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
实际上 addInputPath 做了以下的事情(将文件路径加载到了conf中)
public static void addInputPath(Job job,
Path path) throws IOException {
Configuration conf = job.getConfiguration();
path = path.getFileSystem(conf).makeQualified(path);
String dirStr = StringUtils.escapeString(path.toString());
String dirs = conf.get(INPUT_DIR);
conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
}
我们再来看看 FileInputFormat 是做什么用的, FileInputFormat 实现了 InputFormat 接口 ,这个接口是hadoop用来接收客户端输入参数的。所有的输入格式都继承于InputFormat,这是一个抽象类,其子类有专门用于读取普通文件的FileInputFormat,用来读取数据库的DBInputFormat等等。
我们会看到 在 InputFormat 接口中 有getSplits方法,也就是说分片操作实际上实在 map之前 就已经做好了
List<InputSplit>getSplits(JobContext job)
Generate the list of files and make them into FileSplits.
具体实现参考 FileInputFormat getSplits 方法:
上面是FileInputFormat的getSplits()方法,它首先得到分片的最小值minSize和最大值maxSize,它们会被用来计算分片大小。可以通过设置mapred.min.split.size和mapred.max.split.size来设置。splits链表用来存储计算得到的输入分片,files则存储作为由listStatus()获取的输入文件列表。然后对于每个输入文件,判断是否可以分割,通过computeSplitSize计算出分片大小splitSize,计算方法是:Math.max(minSize, Math.min(maxSize, blockSize));也就是保证在minSize和maxSize之间,且如果minSize<=blockSize<=maxSize,则设为blockSize。然后我们根据这个splitSize计算出每个文件的inputSplits集合,然后加入分片列表splits中。注意到我们生成InputSplit的时候按上面说的使用文件路径,分片起始位置,分片大小和存放这个文件的hosts列表来创建。最后我们还设置了输入文件数量:maprece.input.num.files。
二、计算出来的分片有时怎么传递给 map呢 ?对于单词数量如何累加?
我们使用了 就是InputFormat中的另一个方法createRecordReader() 这个方法:
RecordReader:
RecordReader是用来从一个输入分片中读取一个一个的K -V 对的抽象类,我们可以将其看作是在InputSplit上的迭代器。我们从API接口中可以看到它的一些方法,最主要的方法就是nextKeyvalue()方法,由它获取分片上的下一个K-V 对。
可以看到接口中有:
public abstract boolean nextKeyValue() throws IOException, InterruptedException;
public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
public abstract float getProgress() throws IOException, InterruptedException;
public abstract void close() throws IOException;
FileInputFormat<K,V>
Direct Known Subclasses:
CombineFileInputFormat, KeyValueTextInputFormat, NLineInputFormat, SequenceFileInputFormat, TextInputFormat
对于 wordcount 测试用了 NLineInputFormat和 TextInputFormat 实现类
在 InputFormat 构建一个 RecordReader 出来,然后调用RecordReader initialize 的方法,初始化RecordReader 对象
那么 到底 Map是怎么调用 的呢? 通过前边我们 已经将 文件分片了,并且将文件分片的内容存放到了RecordReader中,
下面继续看看这些RecordReader是如何被MapRece框架使用的
终于 说道 Map了 ,我么如果要实现Map 那么 一定要继承 Mapper这个类
public abstract class Context
implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
protected void setup(Context context) throws IOException, InterruptedException
protected void map(KEYIN key, VALUEIN value, Context context) throws IOException,InterruptedException { }
protected void cleanup(Context context ) throws IOException, InterruptedException { }
public void run(Context context) throws IOException, InterruptedException { }
我们写MapRece程序的时候,我们写的mapper都要继承这个Mapper.class,通常我们会重写map()方法,map()每次接受一个K-V对,然后我们对这个K-V对进行处理,再分发出处理后的数据。我们也可能重写setup()以对这个map task进行一些预处理,比如创建一个List之类的;我们也可能重写cleanup()方法对做一些处理后的工作,当然我们也可能在cleanup()中写出K-V对。举个例子就是:InputSplit的数据是一些整数,然后我们要在mapper中算出它们的和。我们就可以在先设置个sum属性,然后map()函数处理一个K-V对就是将其加到sum上,最后在cleanup()函数中调用context.write(key,value);
最后我们看看Mapper.class中的run()方法,它相当于map task的驱动,我们可以看到run()方法首先调用setup()进行初始操作,然后对每个context.nextKeyValue()获取的K-V对,就调用map()函数进行处理,最后调用cleanup()做最后的处理。事实上,从context.nextKeyValue()就是使用了相应的RecordReader来获取K-V对的。
我们看看Mapper.class中的Context类,它继承与MapContext,使用了一个RecordReader进行构造。下面我们再看这个MapContext。
public MapContextImpl(Configuration conf, TaskAttemptID taskid,
RecordReader<KEYIN,VALUEIN> reader,
RecordWriter<KEYOUT,VALUEOUT> writer,
OutputCommitter committer,
StatusReporter reporter,
InputSplit split) {
super(conf, taskid, writer, committer, reporter);
this.reader = reader;
this.split = split;
}
RecordReader 看来是在这里构造出来了, 那么 是谁调用这个方法,将这个承载着关键数据信息的 RecordReader 传过来了 ?
我们可以想象 这里 应该被框架调用的可能性比较大了,那么maprece 框架是怎么分别来调用map和rece呢?
还以为分析完map就完事了,才发现这里仅仅是做了maprece 框架调用前的一些准备工作,
还是继续分析 下 maprece 框架调用吧:
1.在 job提交 任务之后 首先由jobtrack 分发任务,
在 任务分发完成之后 ,执行 task的时候,这时 调用了 maptask 中的 runNewMapper
在这个方法中调用了 MapContextImpl, 至此 这个map 和框架就可以联系起来了。
⑩ hadoop和maprece是一种什么关系
hadoop是依据maprece的原理,用Java语言实现的分布式处理机制。
Hadoop是一个能够对大量数据进行分布式处理的软件框架,实现了Google的MapRece编程模型和框架,能够把应用程序分割成许多的小的工作单元,并把这些单元放到任何集群节点上执行。
MapRece是Hadoop中的一个数据运算核心模块,MapRece通过JobClient生成任务运行文件,并在JobTracker进行调度指派TaskTracker完成任务。
(10)mapreceJava扩展阅读
1、MapRece分布式计算框架原型:
MapRece分布式计算模型是由Google提出,主要用于搜索领域,解决海量数据的计算问题Apache对其做了开源实现,整合在hadoop中实现通用分布式数据计算。
MR由两个阶段组成:Map和Rece,用户只需要实现map()和rece()两个函数,即可实现分布式计算,非常简单。大大简化了分布式并发处理程序的开发。
Map阶段就是进行分段处理。
Rece阶段就是进行汇总处理。汇总之后还可以进行数据的一系列美化操作,然后再输出。
2、MapRece组件介绍:
JobClient:用于把用户的作业任务生成Job的运行包,并存放到HDFS中。
JobinProgress:把Job运行包分解成MapTask和ReceTask并存放于TaskTracker中。
JobTracker(Master):进行调度管理TaskTracker执行任务。
TaskTracker(Slave):执行分配下来的Map计算或Rece计算任务。