Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design. Kafka is a high-throughput distributed publish-subscribe messaging system with the following characteristics:
(1) It persists messages through an O(1) disk data structure that maintains stable performance over long periods, even with terabytes of stored messages.
(2) High throughput: even on very modest hardware, Kafka can handle hundreds of thousands of messages per second.
(3) It supports partitioning messages across Kafka servers and consumer clusters.
(4) It supports parallel data loading into Hadoop.
1. Building Kafka with the bundled Gradle script
The Kafka source tree ships with a gradlew wrapper script, which we can use to build the source:
# wget http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka-0.8.1.1-src.tgz
# tar -zxf kafka-0.8.1.1-src.tgz
# cd kafka-0.8.1.1-src
# ./gradlew releaseTarGz
Running the above command produces the following build error:
:core:signArchives FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':core:signArchives'.
> Cannot perform signing task ':core:signArchives' because it
has no configured signatory

* Try:
Run with --stacktrace option to get the stack trace. Run with
--info or --debug option to get more log output.

BUILD FAILED
This is a known bug (https://issues.apache.org/jira/browse/KAFKA-1297); you can work around it by building with the following command instead:
./gradlew releaseTarGzAll -x signArchives
The build will now succeed (a large number of warnings will be printed during compilation). We can also specify which Scala version to build against:
./gradlew -PscalaVersion=2.10.3 releaseTarGz -x signArchives
After the build finishes, a kafka_2.10-0.8.1.1.tgz file is generated under core/build/distributions/. It is identical to the release tarball downloaded from the website and can be used directly.
2. Building with sbt
We can also build Kafka with sbt, as follows:
# git clone https://git-wip-us.apache.org/repos/asf/kafka.git
# cd kafka
# git checkout -b 0.8 remotes/origin/0.8
# ./sbt update
[info] [SUCCESSFUL ] org.eclipse.jdt#core;3.1.1!core.jar (2243ms)
[info] downloading http://repo1.maven.org/maven2/ant/ant/1.6.5/ant-1.6.5.jar ...
[info] [SUCCESSFUL ] ant#ant;1.6.5!ant.jar (1150ms)
[info] Done updating.
[info] Resolving org.apache.hadoop#hadoop-core;0.20.2 ...
[info] Done updating.
[info] Resolving com.yammer.metrics#metrics-annotation;2.2.0 ...
[info] Done updating.
[info] Resolving com.yammer.metrics#metrics-annotation;2.2.0 ...
[info] Done updating.
[success] Total time: 168 s, completed Jun 18, 2014 6:51:38 PM

# ./sbt package
[info] Set current project to Kafka (in build file:/export1/spark/kafka/)
Getting Scala 2.8.0 ...
:: retrieving :: org.scala-sbt#boot-scala
confs: [default]
3 artifacts copied, 0 already retrieved (14544kB/27ms)
[success] Total time: 1 s, completed Jun 18, 2014 6:52:37 PM
For Kafka 0.8 and later you also need to run the following command:
# ./sbt assembly-package-dependency
[info] Loading project definition from /export1/spark/kafka/project
[warn] Multiple resolvers having different access mechanism configured with
same name 'sbt-plugin-releases'. To avoid conflict, Remove duplicate project
resolvers (`resolvers`) or rename publishing resolver (`publishTo`).
[info] Set current project to Kafka (in build file:/export1/spark/kafka/)
[warn] Credentials file /home/wyp/.m2/.credentials does not exist
[info] Including slf4j-api-1.7.2.jar
[info] Including metrics-annotation-2.2.0.jar
[info] Including scala-compiler.jar
[info] Including scala-library.jar
[info] Including slf4j-simple-1.6.4.jar
[info] Including metrics-core-2.2.0.jar
[info] Including snappy-java-1.0.4.1.jar
[info] Including zookeeper-3.3.4.jar
[info] Including log4j-1.2.15.jar
[info] Including zkclient-0.3.jar
[info] Including jopt-simple-3.2.jar
[warn] Merging 'META-INF/NOTICE' with strategy 'rename'
[warn] Merging 'org/xerial/snappy/native/README' with strategy 'rename'
[warn] Merging 'META-INF/maven/org.xerial.snappy/snappy-java/LICENSE'
with strategy 'rename'
[warn] Merging 'LICENSE.txt' with strategy 'rename'
[warn] Merging 'META-INF/LICENSE' with strategy 'rename'
[warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
[warn] Strategy 'discard' was applied to a file
[warn] Strategy 'rename' was applied to 5 files
[success] Total time: 3 s, completed Jun 18, 2014 6:53:41 PM
Of course, we can also specify the Scala version within sbt:
sbt "++2.10.3 update"
sbt "++2.10.3 package"
sbt "++2.10.3 assembly-package-dependency"
② What to do when a Java Kafka consumer reads no messages
The program below connects to a web server and downloads the content at a specified URL, speaking the HTTP protocol directly. It defines an output stream to which the downloaded content is written, and obtains its input and output streams through a socket:

import java.io.*;
import java.net.*;

public class HttpClient {
    public static void main(String[] args) {
        try {
            // Check the arguments
            if ((args.length != 1) && (args.length != 2))
                throw new IllegalArgumentException("Wrong number of args");
            // Output stream that the downloaded URL content is written to
            OutputStream to_file;
            if (args.length == 2) to_file = new FileOutputStream(args[1]);
            else to_file = System.out;
            // Use the URL class to parse the user-specified URL into parts
            URL url = new URL(args[0]);
            String protocol = url.getProtocol();
            if (!protocol.equals("http"))  // check that the protocol is supported
                throw new IllegalArgumentException("Must use 'http:' protocol");
            String host = url.getHost();
            int port = url.getPort();
            if (port == -1) port = 80;  // use the default port if none was given
            String filename = url.getFile();
            // Open a network socket connection to the given host and port
            Socket socket = new Socket(host, port);
            // Obtain input and output streams through the socket
            InputStream from_server = socket.getInputStream();
            PrintWriter to_server = new PrintWriter(socket.getOutputStream());

            // Send an HTTP GET command to the server, naming the file to download;
            // this uses an old, very simple version of the HTTP protocol
            to_server.print("GET " + filename + "\n\n");
            to_server.flush();  // send it right away
            // Read the server's response and write it to the file
            byte[] buffer = new byte[4096];
            int bytes_read;
            while ((bytes_read = from_server.read(buffer)) != -1)
                to_file.write(buffer, 0, bytes_read);
            // When the server closes the connection, close our streams too
            socket.close();
            to_file.close();
        } catch (Exception e) {  // report the error
            System.err.println(e);
            System.err.println("Usage: java HttpClient <URL> [<filename>]");
        }
    }
}
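The code above, however, is a generic HTTP download example and never touches Kafka. For the question as actually asked: with the 0.8-era high-level consumer, a frequent cause of reading no messages is that auto.offset.reset defaults to largest, so a consumer group started after the messages were produced sees none of them. Below is a minimal sketch of a correctly configured high-level consumer; the ZooKeeper address (localhost:2181), group id, and topic name test are assumptions for illustration:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class SimpleStringConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");  // assumed ZooKeeper address
        props.put("group.id", "test-group");               // assumed, fresh group id
        // Start from the earliest offset when the group has no committed offset;
        // the default ("largest") skips everything produced before the consumer started
        props.put("auto.offset.reset", "smallest");
        ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("test", 1);                      // one stream for topic "test"
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                consumer.createMessageStreams(topicCountMap);
        // Block and print each message as it arrives
        for (MessageAndMetadata<byte[], byte[]> msg : streams.get("test").get(0)) {
            System.out.println(new String(msg.message()));
        }
    }
}

If messages still do not arrive, also check that the topic name matches exactly and that the consumer and producer point at the same ZooKeeper/broker cluster.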
③ Java compilation produces Exception in thread "main" java.lang.UnsupportedClassVersionError
This error occurs when a .class file compiled by a newer JDK is run on an older JVM.
1. The fix is to make sure the JVM (the java command) and the JDK (the javac command) are the same version. On Linux, run java -version and javac -version on the command line and check that the versions match. Assume here that both are 1.7.
2. If they match but the problem persists, you are most likely not compiling with javac on the command line but with an IDE such as Eclipse or NetBeans, many of which bundle their own compiler rather than using the one installed on the system. In Eclipse, set the JDK version in the project properties: right-click the project --> Properties --> Java Compiler --> Enable project specific settings --> set "Compiler compliance level" to 1.7, i.e. the same version as the JVM (the version shown by java -version).
3. In short, if you build with an IDE, first make sure the IDE's bundled JDK version matches the Java version on the system. The error is generally caused by a JDK/JVM version mismatch; check with java -version and javac -version, and if they differ, adjust JAVA_HOME so they agree.
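To see the mismatch directly, here is a small hypothetical diagnostic (the class name and usage are illustrative, not from the original answer) that prints the running JVM version and the class-file version of a compiled .class file; a class file starts with the magic number 0xCAFEBABE followed by the minor and major version:

import java.io.DataInputStream;
import java.io.FileInputStream;

public class ClassVersionCheck {
    public static void main(String[] args) throws Exception {
        System.out.println("Running JVM: " + System.getProperty("java.version"));
        try (DataInputStream in = new DataInputStream(new FileInputStream(args[0]))) {
            int magic = in.readInt();             // should be 0xCAFEBABE
            int minor = in.readUnsignedShort();
            int major = in.readUnsignedShort();   // 50 = Java 6, 51 = Java 7, 52 = Java 8
            System.out.printf("Class file %s: magic=%08X, version %d.%d%n",
                              args[0], magic, major, minor);
        }
    }
}

If the printed major version is higher than the running JVM supports, recompile with a matching -source/-target or upgrade the JVM.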
Further reading:
Java exception handling
Exceptions are errors in a program, but not every error is an exception, and some errors can be avoided. Exceptions arise for many reasons, usually falling into these categories:
1. The user entered invalid data.
2. A file to be opened does not exist.
3. A network connection dropped during communication, or the JVM ran out of memory.
Some of these exceptions are caused by user error, some by programming error, and others by physical failures. To understand how Java exception handling works, you need to know the three kinds of exceptions, illustrated in the sketch after this list:
1. Checked exceptions: the most representative checked exceptions are caused by user error or by conditions the programmer cannot foresee. For example, an exception occurs when opening a file that does not exist; such exceptions cannot simply be ignored at compile time.
2. Runtime exceptions: exceptions the programmer could have avoided. Unlike checked exceptions, runtime exceptions may be ignored at compile time.
3. Errors: errors are not exceptions but problems beyond the programmer's control. Errors are usually ignored in code; for example, an Error is raised when the stack overflows, and errors cannot be detected at compile time either.
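A short illustrative sketch of the distinction (class and file names are hypothetical):

import java.io.FileInputStream;
import java.io.FileNotFoundException;

public class ExceptionKinds {
    public static void main(String[] args) {
        // Checked exception: the compiler forces the caller to catch or declare it
        try {
            new FileInputStream("no-such-file.txt");
        } catch (FileNotFoundException e) {
            System.err.println("checked: " + e);
        }
        // Runtime (unchecked) exception: compiles fine without any try/catch
        try {
            Object o = Integer.valueOf(42);
            String s = (String) o;  // throws ClassCastException at runtime
        } catch (ClassCastException e) {
            System.err.println("unchecked: " + e);
        }
        // Errors such as StackOverflowError or OutOfMemoryError are normally not caught
    }
}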
④ The Kafka metric kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs returns no value and cannot be found in jconsole
Kafka monitoring tools:
kafka-web-console
https://github.com/claudemamo/kafka-web-console
Deploying sbt:
http://www.scala-sbt.org/0.13/tutorial/Manual-Installation.html
http://www.scala-sbt.org/release/tutorial/zh-cn/Installing-sbt-on-Linux.html
KafkaOffsetMonitor
https://github.com/quantifind/KafkaOffsetMonitor/releases/tag/v0.2.0
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk localhost:12181 --port 8080 --refresh 5.minutes --retain 1.day
Kafka monitoring via Mx4jLoader
No source changes are required; just add the mx4j-tools-3.0.1 jar under lib.
After starting the server, visit ip:8082 (note the -Dmx4jenable=true flag required by the comment below).
/**
* If mx4j-tools is in the classpath call maybeLoad to load the HTTP interface of mx4j.
*
* The default port is 8082. To override that provide e.g. -Dmx4jport=8083
* The default listen address is 0.0.0.0. To override that provide -Dmx4jaddress=127.0.0.1
* This feature must be enabled with -Dmx4jenable=true
*
* This is a Scala port of org.apache.cassandra.utils.Mx4jTool written by Ran Tavory for CASSANDRA-1068
* */
The JMX monitoring metrics are listed below (parameter, MBean name, description):

Message in rate  "kafka.server":name="AllTopicsMessagesInPerSec",type="BrokerTopicMetrics"  Aggregate incoming message rate across all topics
Byte in rate  "kafka.server":name="AllTopicsBytesInPerSec",type="BrokerTopicMetrics"
Request rate  "kafka.network":name="{Produce|Fetch-consumer|Fetch-follower}-RequestsPerSec",type="RequestMetrics"
Byte out rate  "kafka.server":name="AllTopicsBytesOutPerSec",type="BrokerTopicMetrics"
Log flush rate and time  "kafka.log":name="LogFlushRateAndTimeMs",type="LogFlushStats"
# of under replicated partitions (|ISR| < |all replicas|)  "kafka.server":name="UnderReplicatedPartitions",type="ReplicaManager"  Should be 0
Is controller active on broker  "kafka.controller":name="ActiveControllerCount",type="KafkaController"  Only one broker in the cluster should have 1
Leader election rate  "kafka.controller":name="LeaderElectionRateAndTimeMs",type="ControllerStats"  Non-zero when there are broker failures
Unclean leader election rate  "kafka.controller":name="UncleanLeaderElectionsPerSec",type="ControllerStats"  Should be 0
Partition counts  "kafka.server":name="PartitionCount",type="ReplicaManager"  Mostly even across brokers
Leader replica counts  "kafka.server":name="LeaderCount",type="ReplicaManager"  Mostly even across brokers
ISR shrink rate  "kafka.server":name="ISRShrinksPerSec",type="ReplicaManager"  If a broker goes down, the ISR for some of the partitions will shrink. When that broker is up again, the ISR will expand once the replicas are fully caught up. Other than that, the expected value for both the ISR shrink rate and expansion rate is 0.
ISR expansion rate  "kafka.server":name="ISRExpandsPerSec",type="ReplicaManager"  See above
Max lag in messages between follower and leader replicas  "kafka.server":name="([-.\w]+)-MaxLag",type="ReplicaFetcherManager"  Number of messages a replica lags behind
Lag in messages per follower replica  "kafka.server":name="([-.\w]+)-ConsumerLag",type="FetcherLagMetrics"  Number of messages a replica lags behind
Requests waiting in the producer purgatory  "kafka.server":name="PurgatorySize",type="ProducerRequestPurgatory"
Requests waiting in the fetch purgatory  "kafka.server":name="PurgatorySize",type="FetchRequestPurgatory"
Request total time  "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-TotalTimeMs",type="RequestMetrics"
Time the request waits in the request queue  "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-QueueTimeMs",type="RequestMetrics"
Time the request is processed at the leader  "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-LocalTimeMs",type="RequestMetrics"
Time the request waits for the follower  "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-RemoteTimeMs",type="RequestMetrics"
Time to send the response  "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-ResponseSendTimeMs",type="RequestMetrics"
Number of messages the consumer lags behind the producer  "kafka.consumer":name="([-.\w]+)-MaxLag",type="ConsumerFetcherManager"
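To query one of these MBeans programmatically rather than through jconsole, start the broker with a JMX port exposed (e.g. JMX_PORT=9999 before bin/kafka-server-start.sh) and connect with a standard JMX client. Below is a minimal sketch under those assumptions; the quoted MBean name follows the 0.8-era style shown in the table, and "Count" is assumed to be one of the timer's attributes. Note that LogFlushStats values may only appear after at least one log flush has actually happened:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class KafkaJmxProbe {
    public static void main(String[] args) throws Exception {
        // Assumes the broker was started with JMX_PORT=9999
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // 0.8-era Kafka registers MBean names with literal quotes
            ObjectName name = new ObjectName(
                    "\"kafka.log\":type=\"LogFlushStats\",name=\"LogFlushRateAndTimeMs\"");
            // "Count" is a typical attribute of a metrics timer MBean (assumption)
            System.out.println("Count = " + conn.getAttribute(name, "Count"));
        } finally {
            connector.close();
        }
    }
}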
⑤ Why does "cannot be cast to java.lang.String" occur
This is clearly a type-conversion error: an Integer cannot be cast to a String.
If a similar error appears when sending messages to Kafka via its Java API:
Exception in thread "main" java.lang.ClassCastException: java.lang.String cannot be cast to
then we need to specify a serializer class that matches the type of message being sent. For example, to send string messages, add the following setting when constructing the producer:
Properties props = new Properties();
props.put("serializer.class", "kafka.serializer.StringEncoder");
With this setting, string messages can be sent.
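A minimal end-to-end sketch with the 0.8-era producer API (the broker address localhost:9092 and topic name test are assumptions):

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class StringProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");  // assumed broker address
        // Without this line the default encoder expects byte[], and passing a
        // String to send() fails with a ClassCastException
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("test", "hello kafka"));
        producer.close();
    }
}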
⑥ FusionInsight Kafka reports an error when sending data
The error appears when the program runs.
Two properties in server.properties, under the config directory of the Kafka installation, need to be changed, after which Kafka must be restarted.
FusionInsight is a distributed data processing system that provides large-capacity data storage, query, and analysis capabilities.