① hadoop集群中hadoop需要启动哪些进程
启动Hadoop
启动Hadoop集群需要启动HDFS集群和Map/Rece集群。
格式化一个新的分布式文件系统:
$ bin/hadoop namenode -format
在分配的NameNode上,运行下面的命令启动HDFS:
$ bin/start-dfs.sh
bin/start-dfs.sh脚本会参照NameNode上${HADOOP_CONF_DIR}/slaves文件的内容,在所有列出的slave上启动DataNode守护进程。
在分配的JobTracker上,运行下面的命令启动Map/Rece:
$ bin/start-mapred.sh
bin/start-mapred.sh脚本会参照JobTracker上${HADOOP_CONF_DIR}/slaves文件的内容,在所有列出的slave上启动TaskTracker守护进程。
-----来自官方文档
② 大数据:Hadoop入门
什么是大数据:
(1.)大数据是指在一定时间内无法用常规软件对其内容进行抓取,管理和处理的数据集合,简而言之就是数据量非常大,大到无法用常规工具进行处理,如关系型数据库,数据仓库等。这里“大”是一个什么量级呢?如在阿里巴巴每天处理数据达到20PB(即20971520GB).
2.大数据的特点:
(1.)体量巨大。按目前的发展趋势来看,大数据的体量已经到达PB级甚至EB级。
(2.)大数据的数据类型多样,以非结构化数据为主,如网络杂志,音频,视屏,图片,地理位置信息,交易数据,社交数据等。
(3.)价值密度低。有价值的数据仅占到总数据的一小部分。比如一段视屏中,仅有几秒的信息是有价值的。
(4.)产生和要求处理速度快。这是大数据区与传统数据挖掘最显着的特征。
3.除此之外还有其他处理系统可以处理大数据。
Hadoop (开源)
Spark(开源)
Storm(开源)
MongoDB(开源)
IBM PureDate(商用)
Oracle Exadata(商用)
SAP Hana(商用)
Teradata AsterData(商用)
EMC GreenPlum(商用)
HP Vertica(商用)
注:这里我们只介绍Hadoop。
二:Hadoop体系结构
Hadoop来源:
Hadoop源于Google在2003到2004年公布的关于GFS(Google File System),MapRece和BigTable的三篇论文,创始人Doug Cutting。Hadoop现在是Apache基金会顶级项目,“
Hadoop”一个虚构的名字。由Doug Cutting的孩子为其黄色玩具大象所命名。
Hadoop的核心:
(1.)HDFS和MapRece是Hadoop的两大核心。通过HDFS来实现对分布式储存的底层支持,达到高速并行读写与大容量的储存扩展。
(2.)通过MapRece实现对分布式任务进行处理程序支持,保证高速分区处理数据。
3.Hadoop子项目:
(1.)HDFS:分布式文件系统,整个Hadoop体系的基石。
(2.)MapRece/YARN:并行编程模型。YARN是第二代的MapRece框架,从Hadoop 0.23.01版本后,MapRece被重构,通常也称为MapRece V2,老MapRece也称为 MapRece V1。
(3.)Hive:建立在Hadoop上的数据仓库,提供类似SQL语音的查询方式,查询Hadoop中的数据,
(5.)HBase:全称Hadoop Database,Hadoop的分布式的,面向列的数据库,来源于Google的关于BigTable的论文,主要用于随机访问,实时读写的大数据。
(6.)ZooKeeper:是一个为分布式应用所设计的协调服务,主要为用户提供同步,配置管理,分组和命名等服务,减轻分布式应用程序所承担的协调任务。
还有其它特别多其它项目这里不做一一解释了。
三:安装Hadoop运行环境
用户创建:
(1.)创建Hadoop用户组,输入命令:
groupadd hadoop
(2.)创建hser用户,输入命令:
useradd –p hadoop hser
(3.)设置hser的密码,输入命令:
passwd hser
按提示输入两次密码
(4.)为hser用户添加权限,输入命令:
#修改权限
chmod 777 /etc/sudoers
#编辑sudoers
Gedit /etc/sudoers
#还原默认权限
chmod 440 /etc/sudoers
先修改sudoers 文件权限,并在文本编辑窗口中查找到行“root ALL=(ALL)”,紧跟后面更新加行“hser ALL=(ALL) ALL”,将hser添加到sudoers。添加完成后切记还原默认权限,否则系统将不允许使用sudo命令。
(5.)设置好后重启虚拟机,输入命令:
Sudo reboot
重启后切换到hser用户登录
安装JDK
(1.)下载jdk-7u67-linux-x64.rpm,并进入下载目录。
(2.)运行安装命令:
Sudo rpm –ivh jdk-7u67-linux-x64.rpm
完成后查看安装路径,输入命令:
Rpm –qa jdk –l
记住该路径,
(3.)配置环境变量,输入命令:
Sudo gedit /etc/profile
打开profile文件在文件最下面加入如下内容
export java_HOME=/usr/java/jdk.7.0.67
export CLASSPATH=$ JAVA_HOME/lib:$ CLASSPATH
export PATH=$ JAVA_HOME/bin:$PATH
保存后关闭文件,然后输入命令使环境变量生效:
Source /etc/profile
(4.)验证JDK,输入命令:
Java –version
若出现正确的版本则安装成功。
配置本机SSH免密码登录:
(1.)使用ssh-keygen 生成私钥与公钥文件,输入命令:
ssh-keygen –t rsa
(2.)私钥留在本机,公钥发给其它主机(现在是localhost)。输入命令:
ssh--id localhost
(3.)使用公钥来登录输入命令:
ssh localhost
配置其它主机SSH免密登录
(1.)克隆两次。在VMware左侧栏中选中虚拟机右击,在弹出的快捷键菜单中选中管理---克隆命令。在克隆类型时选中“创建完整克隆”,单击“下一步”,按钮直到完成。
(2.)分别启动并进入三台虚拟机,使用ifconfig查询个主机IP地址。
(3.)修改每台主机的hostname及hosts文件。
步骤1:修改hostname,分别在各主机中输入命令。
Sudo gedit /etc/sysconfig/network
步骤2:修改hosts文件:
sudo gedit /etc/hosts
步骤3:修改三台虚拟机的IP
第一台对应node1虚拟机的IP:192.168.1.130
第二台对应node2虚拟机的IP:192.168.1.131
第三台对应node3虚拟机的IP:192.168.1.132
(4.)由于已经在node1上生成过密钥对,所有现在只要在node1上输入命令:
ssh--id node2
ssh--id node3
这样就可以将node1的公钥发布到node2,node3。
(5.)测试SSH,在node1上输入命令:
ssh node2
#退出登录
exit
ssh node3
exit
四:Hadoop完全分布式安装
1. Hadoop有三种运行方式:
(1.)单机模式:无须配置,Hadoop被视为一个非分布式模式运行的独立Java进程
(2.)伪分布式:只有一个节点的集群,这个节点即是Master(主节点,主服务器)也是Slave(从节点,从服务器),可在此单节点上以不同的java进程模拟分布式中的各类节点
(3.)完全分布式:对于Hadoop,不同的系统会有不同的节点划分方式。
2.安装Hadoop
(1.)获取Hadoop压缩包hadoop-2.6.0.tar.gz,下载后可以使用VMWare Tools通过共享文件夹,或者使用Xftp工具传到node1。进入node1 将压缩包解压到/home/hser目录下,输入命令:
#进入HOME目录即:“/home/hser”
cd ~
tar –zxvf hadoop-2.6.0.tar.gz
(2.)重命名hadoop输入命令:
mv hadoop-2.6.0 hadoop
(3.)配置Hadoop环境变量,输入命令:
Sudo gedit /etc/profile
将以下脚本加到profile内:
#hadoop
export HADOOP_HOME=/home/hser/hadoop
export PATH=$HADOOP_HOME/bin:$PATH
保存关闭,最后输入命令使配置生效
source /etc/profile
注:node2,和node3都要按照以上配置进行配置。
3.配置Hadoop
(1.)hadoop-env.sh文件用于指定JDK路径。输入命令:
[hser@node1 ~]$ cd ~/hadoop/etc/hadoop
[hser@node1 hadoop]$ gedit hadoop-env.sh
然后增加如下内容指定jDK路径。
export JAVA_HOME=/usr/java/jdk1.7.0_67
(2.)打开指定JDK路径,输入命令:
export JAVA_HOME=/usr/java/jdk1.7.0_67
(4.)core-site.xml:该文件是Hadoop全局配置,打开并在
③ 启动Hadoop后,执行jps命令发现某些进程启动不了怎么办
咨询记录 · 回答于2021-10-12
④ 把hadoop/bin添加到path下 以后随便什么地方都可以直接执行start-all,sh具体
设置临时环境变量(只在单个终端 内起作用)
1、首先设置HADOOPHOME环境变量,执行命令:export HADOOPHOME=/../hadoop/hadoop-2.5.2/bin (后面参数为Hadoop安装位置下的bin目录)---删除环境变量的命令为 unset HADOOPHOME
2、把HADOOPHOME添加到PATH变量中,执行命令:PATH=$PATH:$HADOOPHOME(当然可以省略第一步,直接把路径添加到PATH中也可以)至此可以在任何位置操作Hadoop的命令了
设置永久环境变量
通过修改 /etc/profile 文件来修改环境变量
修改位置如下:
# Path manipulation
if [ "$EUID" = "0" ]; then
pathmunge /sbin
pathmunge /usr/sbin
pathmunge /usr/local/sbin
else
pathmunge /usr/local/sbin after
pathmunge /usr/sbin after
pathmunge /sbin after
pathmunge /home/hadoop/hadoop-2.5.2/bin
fi
其中 pathmunge /home/hadoop/hadoop-2.5.2/bin为要添加的,
然后执行命令:source /etc/profile 使其生效
执行命令:echo $PATH 查看就会多出/home/hadoop/hadoop-2.5.2/bin这个路径,就可以在任意地址下执行Hadoop的命令了
⑤ hadoop中命令经常含有-fs,-dfs,fs和dfs有什么区别作用是什么
You can see definitions of the two commands (hadoop fs & hadoop dfs) in
可以看一下hadoop的源代码
$HADOOP_HOME/bin/hadoop
...elif [ "$COMMAND" = "datanode" ] ; then CLASS='org.apache.hadoop.hdfs.server.datanode.DataNode' HADOOP_OPTS="$HADOOP_OPTS $HADOOP_DATANODE_OPTS"elif [ "$COMMAND" = "fs" ] ; then CLASS=org.apache.hadoop.fs.FsShell HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"elif [ "$COMMAND" = "dfs" ] ; then CLASS=org.apache.hadoop.fs.FsShell HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"elif [ "$COMMAND" = "dfsadmin" ] ; then CLASS=org.apache.hadoop.hdfs.tools.DFSAdmin HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"...
So, they are exactly the same.
所以,发现两者是完全一样的功能。
谢谢
⑥ hadoop已经安装好的软件,怎么运行,调用
首先确认你安装好的你的集群已经安装好,hdfs、mr和yarn组件均安装成功;
使用服务启动命令看服务是否可以启动成功
使用命令测试集群hadoop fs 相关命令等
如果你要实际去用的话,就搞清楚你的需求,比如新建目录、上传文件、目录文件授权、文件下载等。MR的话就需要自己写一些简单的程序、yarn主要是做任务注册与资源分批的,也可以实际配一下。
如果你有额外安装hive和hbase的话,自己可以去新建表、然后做一些自主查询。
⑦ ambari部署hadoop集群以后怎么运行hadoop实例
1、我这里用的是三个虚拟机 centos 6.4 64位操作系统,一台做Master,两台做Slave
2、三台机器都安装jdk,我这里的版本是7_79,同样是64位的
3、修改hostname(很关键,因为只有生成的ssh-keygen会用到)在/etc/sysconfig/network文件里面修改,同时添加hosts文件,在/etc/hosts文件中修改,我这里三个节点的这两个文件分别
4、生成ssh无密钥远程访问文件,并且放到都要在三个节点里面统一:(提前将三台节点的防火墙都关闭,一定要注意)
这里统一都使用root用户来操作,分别在三个节点执行命令:ssh-keygen -t rsa,然后三下回车即可,就会在 /root/.ssh/目录下生成两个文件:
然后分别全部执行命令保存公共密钥:cp id_rsa.pub authorized_keys,然后 /root/.ssh/ 目录下就是这样:
然后分别在Slaver1和Slaver2的 /root/.ssh/
目录下,将密钥文件远程拷贝到Master中,使用命
令:scp authorized_keys [email protected]:/root/.ssh/s1_keys和
scp authorized_keys [email protected]:/root/.ssh/s2_keys。
然后Master节点的/root/.ssh/ 目录下就会是这样:
然后分别将两个拷贝过来的密钥文件追加到Master的 authorized_keys文件之后,用命令:cat s1_keys >> authorized_keys 和 cat s2_keys >> authorized_keys
最
后再将Master的
authorized_keys文件远程拷贝覆盖到两个Slaver去,命令
是:scp authorized_keys [email protected]:/root/.ssh/authorized_keys
和 scp authorized_keys [email protected]:/root/.ssh/authorized_keys
。
这样就弄完了,自己可以测试一下三个节点之间能否相互登录:ssh Master.busymonkey,退出用:exit。
5、然后就可以在Master节点上安装ambari了,这里我装的是最小系统,所以可能要先安装wget,用:yum install wget来安装。
然后获取 Ambari 的公共库文件(public repository)。登录到 Linux 主机并执行下面的命令(也可以自己手工下载):
wget http://public-repo-1.hortonworks.com/ambari/centos6/2.x/updates/2.0.1/ambari.repo
将下载的 ambari.repo 文件拷贝到 Linux 的系统目录/etc/yum.repos.d/。拷贝完后,我们需要获取该公共库的所有的源文件列表。依次执行以下命令。
yum clean all
yum list|grep ambari
如图 1 所示:
如果可以看到 Ambari 的对应版本的安装包列表,说明公共库已配置成功。然后就可以安装 Ambari 的 package 了。执行下面的命令安装 Ambari Server 到该机器。
yum install ambari-server
待安装完成后,便需要对 Ambari Server 做一个简单的配置。执行下面的命令。
amari-server setup
在这个交互式的设置中,采用默认配置即可。Ambari 会使用 Postgres 数据库,默认会安装并使用 Oracle 的 JDK。默认设置了
Ambari GUI 的登录用户为 admin/admin。并且指定 Ambari Server 的运行用户为 root。
[root@hadoop1 ~]# ambari-server setup
Using python /usr/bin/python2.6
Setup ambari-server
Checking SELinux...
SELinux status is 'enabled'
SELinux mode is 'enforcing'
Temporarily disabling SELinux
WARNING: SELinux is set to 'permissive' mode and temporarily disabled.
OK to continue [y/n] (y)? y
Customize user account for ambari-server daemon [y/n] (n)?
Adjusting ambari-server permissions and ownership...
Checking iptables...
WARNING: iptables is running. Confirm the necessary Ambari ports are accessible. Refer to the Ambari documentation for more details on ports.
OK to continue [y/n] (y)?
Checking JDK...
[1] Oracle JDK 1.7
[2] Oracle JDK 1.6
[3] - Custom JDK
==============================================================================
Enter choice (1): 1
To download the Oracle JDK and the Java Cryptography Extension (JCE) Policy Files you must accept the license terms found at http://www.oracle.com/technetwork/java/javase/terms/license/index.html and not accepting will cancel the Ambari Server setup and you must install the JDK and JCE files manually.
Do you accept the Oracle Binary Code License Agreement [y/n] (y)?
Downloading JDK from http://public-repo-1.hortonworks.com/ARTIFACTS/jdk-7u67-linux-x64.tar.gz to /var/lib/ambari-server/resources/jdk-7u67-linux-x64.tar.gz
只有大部分都是默认的,就可以了。
简单的 setup 配置完成后。就可以启动 Ambari 了。运行下面的命令。
ambari-server start
当成功启动 Ambari Server 之后,便可以从浏览器登录,默认的端口为 8080。以本文环境为例,在浏览器的地址栏输入
http://zwshen37.example.com:8080,登录密码为 admin/admin。登入 Ambari 之后的页面如下图。
⑧ 启动hadoop中have命令
Hadoop集群启动命令。
1、启动NameNode,DataNode。
2、启动JournalNode,JournalNode在hdfs-site.xml中指定editslog存储的位置,主备NameNode共享数据,方便同步。
3)、启动DFSZKFailoverController,HA会启用ZooKeeperFailoverController。
4、启动YARN守护进程ResourceManager,NodeManager。
⑨ 如何在Windows下面运行hadoop的MapRece程序
1. 首先登入hadoop 集群里面的一个节点, 创建一个java源文件, 偷懒起见, 基本盗用官方的word count (因为本文的目的是教会你如何快编写和运行一个MapRece程序, 而不是如何写好一个功能齐全的MapRece程序)
内容如下:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.maprece.Job;
import org.apache.hadoop.maprece.Mapper;
import org.apache.hadoop.maprece.Recer;
import org.apache.hadoop.maprece.lib.input.FileInputFormat;
import org.apache.hadoop.maprece.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class myword {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumRecer
extends Recer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void rece(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println('Usage: wordcount <in> <out>');
System.exit(2);
}
Job job = new Job(conf, 'word count');
job.setJarByClass(myword.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumRecer.class);
job.setRecerClass(IntSumRecer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
与官方版本相比, 主要做了两处修改
1) 为了简单起见,去掉了开头的 package org.apache.hadoop.examples;
2) 将类名从 WordCount 改为 myword, 以体现是我们自己的工作成果 :)
2. 拿到hadoop 运行的class path, 主要为编译所用
运行命令
hadoop classpath
保存打出的结果,本文用的hadoop 版本是Pivotal 公司的Pivotal hadoop, 例子:
/etc/gphd/hadoop/conf:/usr/lib/gphd/hadoop/lib/*:/usr/lib/gphd/hadoop/.//*:/usr/lib/gphd/hadoop-hdfs/./:/usr/lib/gphd/hadoop-hdfs/lib/*:/usr/lib/gphd/hadoop-hdfs/.//*:/usr/lib/gphd/hadoop-yarn/lib/*:/usr/lib/gphd/hadoop-yarn/.//*:/usr/lib/gphd/hadoop-maprece/lib/*:/usr/lib/gphd/hadoop-maprece/.//*::/etc/gphd/pxf/conf::/usr/lib/gphd/pxf/pxf-core.jar:/usr/lib/gphd/pxf/pxf-api.jar:/usr/lib/gphd/publicstage:/usr/lib/gphd/gfxd/lib/gemfirexd.jar::/usr/lib/gphd/zookeeper/zookeeper.jar:/usr/lib/gphd/hbase/lib/hbase-common.jar:/usr/lib/gphd/hbase/lib/hbase-protocol.jar:/usr/lib/gphd/hbase/lib/hbase-client.jar:/usr/lib/gphd/hbase/lib/hbase-thrift.jar:/usr/lib/gphd/hbase/lib/htrace-core-2.01.jar:/etc/gphd/hbase/conf::/usr/lib/gphd/hive/lib/hive-service.jar:/usr/lib/gphd/hive/lib/libthrift-0.9.0.jar:/usr/lib/gphd/hive/lib/hive-metastore.jar:/usr/lib/gphd/hive/lib/libfb303-0.9.0.jar:/usr/lib/gphd/hive/lib/hive-common.jar:/usr/lib/gphd/hive/lib/hive-exec.jar:/usr/lib/gphd/hive/lib/postgresql-jdbc.jar:/etc/gphd/hive/conf::/usr/lib/gphd/sm-plugins/*:
3. 编译
运行命令
javac -classpath xxx ./myword.java
xxx部分就是上一步里面取到的class path
运行完此命令后, 当前目录下会生成一些.class 文件, 例如:
myword.class myword$IntSumRecer.class myword$TokenizerMapper.class
4. 将class文件打包成.jar文件
运行命令
jar -cvf myword.jar ./*.class
至此, 目标jar 文件成功生成
5. 准备一些文本文件, 上传到hdfs, 以做word count的input
例子:
随意创建一些文本文件, 保存到mapred_test 文件夹
运行命令
hadoop fs -put ./mapred_test/
确保此文件夹成功上传到hdfs 当前用户根目录下
6. 运行我们的程序
运行命令
hadoop jar ./myword.jar myword mapred_test output
顺利的话, 此命令会正常进行, 一个MapRece job 会开始工作, 输出的结果会保存在 hdfs 当前用户根目录下的output 文件夹里面。
至此大功告成!
如果还需要更多的功能, 我们可以修改前面的源文件以达到一个真正有用的MapRece job。
但是原理大同小异, 练手的话, 基本够了。
一个抛砖引玉的简单例子, 欢迎板砖。
⑩ 如何在hadoop2.5.2使用命令行编译打包运行自己的maprece程序
网上的 MapRece WordCount 教程对于如何编译 WordCount.Java 几乎是一笔带过… 而有写到的,大多又是 0.20 等旧版本版本的做法,即 javac -classpath /usr/local/Hadoop/hadoop-1.0.1/hadoop-core-1.0.1.jar WordCount.java,但较新的 2.X 版本中,已经没有 hadoop-core*.jar 这个文件,因此编辑和打包自己的 MapRece 程序与旧版本有所不同。
本文以 Hadoop 2.7.2 环境下的 WordCount 实例来介绍 2.x 版本中如何编辑自己的 MapRece 程序。
编译、打包 Hadoop MapRece 程序
我们将 Hadoop 的 classhpath 信息添加到 CLASSPATH 变量中,在 ~/.bashrc 中增加如下几行:
[html] view plain
export HADOOP_HOME=/usr/local/hadoop
export CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath):$CLASSPATH
别忘了执行 source ~/.bashrc 使变量生效,接着就可以通过 javac 命令编译 WordCount.java 了(使用的是 Hadoop 源码中的 WordCount.java,源码在文本最后面):javac WordCount.java
编译时会有警告,可以忽略。编译后可以看到生成了几个 .class 文件。
接着把 .class 文件打包成 jar,才能在 Hadoop 中运行:
[html] view plain
jar -cvf WordCount.jar ./WordCount*.class
开始运行:
[html] view plain
hadoop jar WordCount.jar WordCount input output//hdfs上的input文件夹,命令执行所在位置为WordCount.jar同一目录
因为程序中声明了
package ,所以在命令中也要 org.apache.hadoop.examples 写完整:
[html] view plain
hadoop jar WordCount.jar org.apache.hadoop.examples.WordCount input output
查看:
[html] view plain
hadoop fs -cat /output/part-r-00000
WordCount.java 源码
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.maprece.Job;
import org.apache.hadoop.maprece.Mapper;
import org.apache.hadoop.maprece.Recer;
import org.apache.hadoop.maprece.lib.input.FileInputFormat;
import org.apache.hadoop.maprece.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumRecer
extends Recer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void rece(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumRecer.class);
job.setRecerClass(IntSumRecer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}