① 如何通過java程序提交yarn的MapRece計算任務
1、在程序中,我將文件讀入格式設定為WholeFileInputFormat,即不對文件進行切分。
2、為了控制rece的處理過程,map的輸出鍵的格式為組合鍵格式。與常規的<key,value>不同,這里變為了<textpair,value>,TextPair的格式為<key1,key2>。
3、為了適應組合鍵,重新設定了分組函數,即GroupComparator。分組規則為,只要TextPair中的key1相同(不要求key2相同),則數據被分配到一個rece容器中。這樣,當相同key1的數據進入rece容器後,key2起到了一個數據標識的作用
② 用Java寫MapRece,用python和R,哪種更適合從事數據行業,做數據...
必然python啊,不過R也很好。python更加靈活,但是R是這一方面的功能一點不弱。但是我感覺很多演算法拿python實現會更容易,而且python更好學,語法更簡潔。具體看個人。
③ 如何解決map /rece程序執行時卡住現象
一、首先要知道此前提轉載若在windows的Eclipse工程中直接啟動maprec程序,需要先把hadoop集群的配置目錄下的xml都拷貝到src目錄下,讓程序自動讀取集群的地址後去進行分布式運行(您也可以自己寫java代碼去設置job的configuration屬性)。若不拷貝,工程中bin目錄沒有完整的xml配置文件,則windows執行的maprece程序全部通過本機的jvm執行,作業名也是帶有「local"字眼的作業,如job_local2062122004_0001。這不是真正的分布式運行maprece程序。估計得研究org.apache.hadoop.conf.Configuration的源碼,反正xml配置文件會影響執行maprece使用的文件系統是本機的windows文件系統還是遠程的hdfs系統;還有影響執行maprece的mapper和recer的是本機的jvm還是集群裡面機器的jvm二、本文的結論第一點就是:windows上執行maprece,必須打jar包到所有slave節點才能正確分布式運行maprece程序。(我有個需求是要windows上觸發一個maprece分布式運行)第二點就是:linux上,只需拷貝jar文件到集群master上,執行命令hadoopjarPackage.jarMainClassName即可分布式運行maprece程序。第三點就是:推薦使用附一,實現了自動打jar包並上傳,分布式執行的maprece程序。附一、推薦使用此方法:實現了自動打jar包並上傳,分布式執行的maprece程序:請先參考博文五篇:Hadoop作業提交分析(一)~~(五)引用博文的附件中EJob.java到你的工程中,然後main中添加如下方法和代碼。publicstaticFilecreatePack()throwsIOException{FilejarFile=EJob.createTempJar("bin");ClassLoaderclassLoader=EJob.getClassLoader();Thread.currentThread().setContextClassLoader(classLoader);returnjarFile;}在作業啟動代碼中使用打包:Jobjob=Job.getInstance(conf,"testAnaAction");添加:StringjarPath=createPack().getPath();job.setJar(jarPath);即可實現直接runasjavaapplication在windows跑分布式的maprece程序,不用手工上傳jar文件。附二、得出結論的測試過程(未有空看書,只能通過愚笨的測試方法得出結論了)一.直接通過windows上Eclipse右擊main程序的java文件,然後"runasapplication"或選擇hadoop插件"runonhadoop"來觸發執行MapRece程序的測試。1,如果不打jar包到進集群任意linux機器上,它報錯如下:[work]2012-06-2515:42:47,360-org.apache.hadoop.maprece.Job-10244[main]INFOorg.apache.hadoop.maprece.Job-map0%rece0%[work]2012-06-2515:42:52,223-org.apache.hadoop.maprece.Job-15107[main]INFOorg.apache.hadoop.maprece.Job-TaskId:attempt_1403517983686_0056_m_000000_0,Status:FAILEDError:java.lang.RuntimeException:java.lang.ClassNotFoundException:ClassbookCount.BookCount$BookCountMappernotfoundatorg.apache.hadoop.conf.Configuration.getClass(Configuration.java:1720)atorg.apache.hadoop.maprece.task.JobContextImpl.getMapperClass(JobContextImpl.java:186)atorg.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:721)atorg.apache.hadoop.mapred.MapTask.run(MapTask.java:339)atorg.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)atjava.security.AccessController.doPrivileged(NativeMethod)atjavax.security.auth.Subject.doAs(Subject.java:415)atorg.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)atorg.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)Causedby:java.lang.ClassNotFoundException:ClassbookCount.BookCount$BookCountMappernotfoundatorg.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1626)atorg.apache.hadoop.conf.Configuration.getClass(Configuration.java:1718)8more#Error:後重復三次2012-06-2515:44:53,234-org.apache.hadoop.maprece.Job-37813[main]INFOorg.apache.hadoop.maprece.Job-map100%rece100%現象就是:報錯,無進度,無運行結果。2,拷貝jar包到「只是」集群master的$HADOOP_HOME/share/hadoop/maprece/目錄上,直接通過windows的eclipse"runasapplication"和通過hadoop插件"runonhadoop"來觸發執行,它報錯同上。現象就是:報錯,無進度,無運行結果。3,拷貝jar包到集群某些slave的$HADOOP_HOME/share/hadoop/maprece/目錄上,直接通過windows的eclipse"runasapplication"和通過hadoop插件"runonhadoop"來觸發執行和報錯:Error:java.lang.RuntimeException:java.lang.ClassNotFoundException:ClassbookCount.BookCount$BookCountMappernotfoundatorg.apache.hadoop.conf.Configuration.getClass(Configuration.java:1720)atorg.apache.hadoop.maprece.task.JobContextImpl.getMapperClass(JobContextImpl.java:186)和報錯:Error:java.lang.RuntimeException:java.lang.ClassNotFoundException:ClassbookCount.BookCount$BookCountRecernotfound現象就是:有報錯,但仍然有進度,有運行結果。4,拷貝jar包到集群所有slave的$HADOOP_HOME/share/hadoop/maprece/目錄上,直接通過windows的eclipse"runasapplication"和通過hadoop插件"runonhadoop"來觸發執行:現象就是:無報錯,有進度,有運行結果。第一點結論就是:windows上執行maprece,必須打jar包到所有slave節點才能正確分布式運行maprece程序。二在Linux上的通過以下命令觸發MapRece程序的測試。hadoopjar$HADOOP_HOME/share/hadoop/maprece/bookCount.jarbookCount.BookCount1,只拷貝到master,在master上執行。現象就是:無報錯,有進度,有運行結果。2,拷貝隨便一個slave節點,在slave上執行。現象就是:無報錯,有進度,有運行結果。但某些節點上運行會報錯如下,且運行結果。:14/06/2516:44:02INFOmaprece.JobSubmitter:Cleaningupthestagingarea/tmp/hadoop-yarn/staging/hser/.staging/job_1403517983686_0071Exceptioninthread"main"java.lang.NoSuchFieldError:DEFAULT_MAPREDUCE_APPLICATION_CLASSPATHatorg.apache.hadoop.maprece.v2.util.MRApps.setMRFrameworkClasspath(MRApps.java:157)atorg.apache.hadoop.maprece.v2.util.MRApps.setClasspath(MRApps.java:198)atorg.apache.hadoop.mapred.YARNRunner.(YARNRunner.java:443)atorg.apache.hadoop.mapred.YARNRunner.submitJob(YARNRunner.java:283)atorg.apache.hadoop.maprece.JobSubmitter.submitJobInternal(JobSubmitter.java:415)atorg.apache.hadoop.maprece.Job$10.run(Job.java:1268)atorg.apache.hadoop.maprece.Job$10.run(Job.java:1265)atjava.security.AccessController.doPrivileged(NativeMethod)atjavax.security.auth.Subject.doAs(Subject.java:415)atorg.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)atorg.apache.hadoop.maprece.Job.submit(Job.java:1265)atorg.apache.hadoop.maprece.Job.waitForCompletion(Job.java:1286)atcom.etrans.anaSpeed.AnaActionMr.run(AnaActionMr.java:207)atorg.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)atcom.etrans.anaSpeed.AnaActionMr.main(AnaActionMr.java:44)atsun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethod)atsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)atsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)atjava.lang.reflect.Method.invoke(Method.java:606)atorg.apache.hadoop.util.RunJar.main(RunJar.java:212)第二點結論就是:Linux上,只需拷貝jar文件到集群master上,執行命令hadoopjarPackage.jarMainClassName即可分布式運行maprece程序。
④ hadoop 2.2.0 maprece java.lang.NullPointerException
能再具體點嗎,感覺問題出在創建臨時目錄時,JobSubmissionFiles.getStagingDir,FilterFileSystem.mkdirs,RawLocalFileSystem.setPermission,可能是許可權問題,話說在如果可以的話最好不要在Eclipse下跑Maprece,打包到集群最好
⑤ java maprece 統計出現次數
以時間為key,訪問的url和訪問者組合到一起作為value。按key分組,然後每組統計訪問次數以及訪問人數
⑥ 求maprece(JAVA)讀取文件然後處理數據的代碼
package test;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
public class Test {
public static void main(String[] args) {
Test t = new Test();
t.maprece("c:\\a.txt");
}
public void maprece(String filepath){
if(filepath==null||"".equals(filepath)){
System.out.println("文件名錯誤!");
return;
}
List<String> list = new ArrayList<String>();
File f = new File(filepath);
BufferedReader reader=null;
try {
reader = new BufferedReader(new FileReader(f));
String tempString = null;
// 一次讀入一行,直到讀入null為文件結束
while ((tempString = reader.readLine()) != null) {
list.add(tempString);
}
reader.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if (reader != null) {
try {
reader.close();
} catch (Exception e1) {
}
}
}
for(int i=0,length=list.size();i<length;i+=3){
String name = list.get(i).split(":")[1];
String id = list.get(i+1).split(":")[1];
String address = list.get(i+2).split(":")[1];
System.out.println("name:"+name+",id:"+id+",address:"+address);
}
}
}
⑦ 如何通過Java程序提交yarn的maprece計算任務
由於項目需求,需要通過Java程序提交Yarn的MapRece的計算任務。與一般的通過Jar包提交MapRece任務不同,通過程序提交MapRece任務需要有點小變動,詳見以下代碼。
以下為MapRece主程序,有幾點需要提一下:
1、在程序中,我將文件讀入格式設定為WholeFileInputFormat,即不對文件進行切分。
2、為了控制rece的處理過程,map的輸出鍵的格式為組合鍵格式。與常規的<key,value>不同,這里變為了<TextPair,Value>,TextPair的格式為<key1,key2>。
3、為了適應組合鍵,重新設定了分組函數,即GroupComparator。分組規則為,只要TextPair中的key1相同(不要求key2相同),則數據被分配到一個rece容器中。這樣,當相同key1的數據進入rece容器後,key2起到了一個數據標識的作用。
⑧ 為什麼spark支持多種語言編程,而maprece只支持java
Java開發系統級別軟體的優勢,我覺得是不言而喻的,這應該選擇的首要原因。
加上Apache社區從來都是Java主打的。
而且從易用性和普及性考慮,一個開源軟體在10年前用scala開發真的是作。
⑨ 如何將java類對象作為maprece中map函數的輸入
1.首先介紹一下wordcount 早maprece框架中的 對應關系
大家都知道 maprece 分為 map 和rece 兩個部分,那麼在wordcount例子中,很顯然 對文件word 計數部分為map,對 word 數量累計部分為 rece;
大家都明白 map接受一個參數,經過map處理後,將處理結果作為rece的入參分發給rece,然後在rece中統計了word 的數量,最終輸出到輸出結果;
但是初看遇到的問題:
一、map的輸入參數是個 Text之類的 對象,並不是 file對象
二、rece中並沒有if-else之類的判斷語句 ,來說明 這個word 數量 加 一次,那個word 加一次。那麼這個判斷到底只是在 map中已經區分了 還是在rece的時候才判斷的
三、map過程到底做了什麼,rece過程到底做了什麼?為什麼它能夠做到多個map多個rece?
一、
1. 怎麼將 文件參數 傳遞 到 job中呢?
在 client 我們調用了FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
實際上 addInputPath 做了以下的事情(將文件路徑載入到了conf中)
public static void addInputPath(Job job,
Path path) throws IOException {
Configuration conf = job.getConfiguration();
path = path.getFileSystem(conf).makeQualified(path);
String dirStr = StringUtils.escapeString(path.toString());
String dirs = conf.get(INPUT_DIR);
conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
}
我們再來看看 FileInputFormat 是做什麼用的, FileInputFormat 實現了 InputFormat 介面 ,這個介面是hadoop用來接收客戶端輸入參數的。所有的輸入格式都繼承於InputFormat,這是一個抽象類,其子類有專門用於讀取普通文件的FileInputFormat,用來讀取資料庫的DBInputFormat等等。
我們會看到 在 InputFormat 介面中 有getSplits方法,也就是說分片操作實際上實在 map之前 就已經做好了
List<InputSplit>getSplits(JobContext job)
Generate the list of files and make them into FileSplits.
具體實現參考 FileInputFormat getSplits 方法:
上面是FileInputFormat的getSplits()方法,它首先得到分片的最小值minSize和最大值maxSize,它們會被用來計算分片大小。可以通過設置mapred.min.split.size和mapred.max.split.size來設置。splits鏈表用來存儲計算得到的輸入分片,files則存儲作為由listStatus()獲取的輸入文件列表。然後對於每個輸入文件,判斷是否可以分割,通過computeSplitSize計算出分片大小splitSize,計算方法是:Math.max(minSize, Math.min(maxSize, blockSize));也就是保證在minSize和maxSize之間,且如果minSize<=blockSize<=maxSize,則設為blockSize。然後我們根據這個splitSize計算出每個文件的inputSplits集合,然後加入分片列表splits中。注意到我們生成InputSplit的時候按上面說的使用文件路徑,分片起始位置,分片大小和存放這個文件的hosts列表來創建。最後我們還設置了輸入文件數量:maprece.input.num.files。
二、計算出來的分片有時怎麼傳遞給 map呢 ?對於單詞數量如何累加?
我們使用了 就是InputFormat中的另一個方法createRecordReader() 這個方法:
RecordReader:
RecordReader是用來從一個輸入分片中讀取一個一個的K -V 對的抽象類,我們可以將其看作是在InputSplit上的迭代器。我們從API介面中可以看到它的一些方法,最主要的方法就是nextKeyvalue()方法,由它獲取分片上的下一個K-V 對。
可以看到介面中有:
public abstract boolean nextKeyValue() throws IOException, InterruptedException;
public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
public abstract float getProgress() throws IOException, InterruptedException;
public abstract void close() throws IOException;
FileInputFormat<K,V>
Direct Known Subclasses:
CombineFileInputFormat, KeyValueTextInputFormat, NLineInputFormat, SequenceFileInputFormat, TextInputFormat
對於 wordcount 測試用了 NLineInputFormat和 TextInputFormat 實現類
在 InputFormat 構建一個 RecordReader 出來,然後調用RecordReader initialize 的方法,初始化RecordReader 對象
那麼 到底 Map是怎麼調用 的呢? 通過前邊我們 已經將 文件分片了,並且將文件分片的內容存放到了RecordReader中,
下面繼續看看這些RecordReader是如何被MapRece框架使用的
終於 說道 Map了 ,我么如果要實現Map 那麼 一定要繼承 Mapper這個類
public abstract class Context
implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
protected void setup(Context context) throws IOException, InterruptedException
protected void map(KEYIN key, VALUEIN value, Context context) throws IOException,InterruptedException { }
protected void cleanup(Context context ) throws IOException, InterruptedException { }
public void run(Context context) throws IOException, InterruptedException { }
我們寫MapRece程序的時候,我們寫的mapper都要繼承這個Mapper.class,通常我們會重寫map()方法,map()每次接受一個K-V對,然後我們對這個K-V對進行處理,再分發出處理後的數據。我們也可能重寫setup()以對這個map task進行一些預處理,比如創建一個List之類的;我們也可能重寫cleanup()方法對做一些處理後的工作,當然我們也可能在cleanup()中寫出K-V對。舉個例子就是:InputSplit的數據是一些整數,然後我們要在mapper中算出它們的和。我們就可以在先設置個sum屬性,然後map()函數處理一個K-V對就是將其加到sum上,最後在cleanup()函數中調用context.write(key,value);
最後我們看看Mapper.class中的run()方法,它相當於map task的驅動,我們可以看到run()方法首先調用setup()進行初始操作,然後對每個context.nextKeyValue()獲取的K-V對,就調用map()函數進行處理,最後調用cleanup()做最後的處理。事實上,從context.nextKeyValue()就是使用了相應的RecordReader來獲取K-V對的。
我們看看Mapper.class中的Context類,它繼承與MapContext,使用了一個RecordReader進行構造。下面我們再看這個MapContext。
public MapContextImpl(Configuration conf, TaskAttemptID taskid,
RecordReader<KEYIN,VALUEIN> reader,
RecordWriter<KEYOUT,VALUEOUT> writer,
OutputCommitter committer,
StatusReporter reporter,
InputSplit split) {
super(conf, taskid, writer, committer, reporter);
this.reader = reader;
this.split = split;
}
RecordReader 看來是在這里構造出來了, 那麼 是誰調用這個方法,將這個承載著關鍵數據信息的 RecordReader 傳過來了 ?
我們可以想像 這里 應該被框架調用的可能性比較大了,那麼maprece 框架是怎麼分別來調用map和rece呢?
還以為分析完map就完事了,才發現這里僅僅是做了maprece 框架調用前的一些准備工作,
還是繼續分析 下 maprece 框架調用吧:
1.在 job提交 任務之後 首先由jobtrack 分發任務,
在 任務分發完成之後 ,執行 task的時候,這時 調用了 maptask 中的 runNewMapper
在這個方法中調用了 MapContextImpl, 至此 這個map 和框架就可以聯系起來了。
⑩ hadoop和maprece是一種什麼關系
hadoop是依據maprece的原理,用Java語言實現的分布式處理機制。
Hadoop是一個能夠對大量數據進行分布式處理的軟體框架,實現了Google的MapRece編程模型和框架,能夠把應用程序分割成許多的小的工作單元,並把這些單元放到任何集群節點上執行。
MapRece是Hadoop中的一個數據運算核心模塊,MapRece通過JobClient生成任務運行文件,並在JobTracker進行調度指派TaskTracker完成任務。
(10)mapreceJava擴展閱讀
1、MapRece分布式計算框架原型:
MapRece分布式計算模型是由Google提出,主要用於搜索領域,解決海量數據的計算問題Apache對其做了開源實現,整合在hadoop中實現通用分布式數據計算。
MR由兩個階段組成:Map和Rece,用戶只需要實現map()和rece()兩個函數,即可實現分布式計算,非常簡單。大大簡化了分布式並發處理程序的開發。
Map階段就是進行分段處理。
Rece階段就是進行匯總處理。匯總之後還可以進行數據的一系列美化操作,然後再輸出。
2、MapRece組件介紹:
JobClient:用於把用戶的作業任務生成Job的運行包,並存放到HDFS中。
JobinProgress:把Job運行包分解成MapTask和ReceTask並存放於TaskTracker中。
JobTracker(Master):進行調度管理TaskTracker執行任務。
TaskTracker(Slave):執行分配下來的Map計算或Rece計算任務。