Kafka is a distributed, partitioned, replicated commit-log service. It provides the functionality of a messaging system, but with a unique design. Kafka is a high-throughput distributed publish-subscribe messaging system with the following features:
(1) Message persistence through an O(1) disk data structure that maintains stable performance over long periods, even with terabytes of stored messages.
(2) High throughput: even on very modest hardware, Kafka can handle hundreds of thousands of messages per second.
(3) Support for partitioning messages across Kafka servers and consumer clusters.
(4) Support for parallel data loading into Hadoop.
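To make the publish-subscribe model concrete, here is a minimal producer sketch against the 0.8-era Java API that this article builds; the broker address and topic name are placeholder assumptions:

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");            // placeholder broker list
        props.put("serializer.class", "kafka.serializer.StringEncoder"); // encode String payloads
        Producer<String, String> producer =
            new Producer<String, String>(new ProducerConfig(props));
        // Publish one message to a (placeholder) topic; subscribers pull it independently
        producer.send(new KeyedMessage<String, String>("test-topic", "hello kafka"));
        producer.close();
    }
}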
1. Building with the script bundled with Kafka
Once you have downloaded the Kafka source, it ships with a gradlew script that we can use to build it:
# wget http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka-0.8.1.1-src.tgz
# tar -zxf kafka-0.8.1.1-src.tgz
# cd kafka-0.8.1.1-src
# ./gradlew releaseTarGz
Running the command above will fail with the following exception:
:core:signArchives FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':core:signArchives'.
> Cannot perform signing task ':core:signArchives' because it
has no configured signatory

* Try:
Run with --stacktrace option to get the stack trace. Run with
--info or --debug option to get more log output.

BUILD FAILED
This is a known bug (https://issues.apache.org/jira/browse/KAFKA-1297); build with the following command instead:
./gradlew releaseTarGzAll -x signArchives
The build should now succeed (expect plenty of warnings along the way). We can also target a specific Scala version during the build:
./gradlew -PscalaVersion=2.10.3 releaseTarGz -x signArchives
When the build finishes, core/build/distributions/ will contain kafka_2.10-0.8.1.1.tgz, which is the same as the tarball downloadable from the web and can be used directly.
2. Building with sbt
We can also build Kafka with sbt, as follows:
# git clone https://git-wip-us.apache.org/repos/asf/kafka.git
# cd kafka
# git checkout -b 0.8 remotes/origin/0.8
# ./sbt update
[info] [SUCCESSFUL ] org.eclipse.jdt#core;3.1.1!core.jar (2243ms)
[info] downloading http://repo1.maven.org/maven2/ant/ant/1.6.5/ant-1.6.5.jar ...
[info] [SUCCESSFUL ] ant#ant;1.6.5!ant.jar (1150ms)
[info] Done updating.
[info] Resolving org.apache.hadoop#hadoop-core;0.20.2 ...
[info] Done updating.
[info] Resolving com.yammer.metrics#metrics-annotation;2.2.0 ...
[info] Done updating.
[info] Resolving com.yammer.metrics#metrics-annotation;2.2.0 ...
[info] Done updating.
[success] Total time: 168 s, completed Jun 18, 2014 6:51:38 PM

# ./sbt package
[info] Set current project to Kafka (in build file:/export1/spark/kafka/)
Getting Scala 2.8.0 ...
:: retrieving :: org.scala-sbt#boot-scala
        confs: [default]
        3 artifacts copied, 0 already retrieved (14544kB/27ms)
[success] Total time: 1 s, completed Jun 18, 2014 6:52:37 PM
For Kafka 0.8 and later you also need to run:
# ./sbt assembly-package-dependency
[info] Loading project definition from /export1/spark/kafka/project
[warn] Multiple resolvers having different access mechanism configured with
same name 'sbt-plugin-releases'. To avoid conflict, Remove duplicate project
resolvers (`resolvers`) or rename publishing resolver (`publishTo`).
[info] Set current project to Kafka (in build file:/export1/spark/kafka/)
[warn] Credentials file /home/wyp/.m2/.credentials does not exist
[info] Including slf4j-api-1.7.2.jar
[info] Including metrics-annotation-2.2.0.jar
[info] Including scala-compiler.jar
[info] Including scala-library.jar
[info] Including slf4j-simple-1.6.4.jar
[info] Including metrics-core-2.2.0.jar
[info] Including snappy-java-1.0.4.1.jar
[info] Including zookeeper-3.3.4.jar
[info] Including log4j-1.2.15.jar
[info] Including zkclient-0.3.jar
[info] Including jopt-simple-3.2.jar
[warn] Merging 'META-INF/NOTICE' with strategy 'rename'
[warn] Merging 'org/xerial/snappy/native/README' with strategy 'rename'
[warn] Merging 'META-INF/maven/org.xerial.snappy/snappy-java/LICENSE'
with strategy 'rename'
[warn] Merging 'LICENSE.txt' with strategy 'rename'
[warn] Merging 'META-INF/LICENSE' with strategy 'rename'
[warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
[warn] Strategy 'discard' was applied to a file
[warn] Strategy 'rename' was applied to 5 files
[success] Total time: 3 s, completed Jun 18, 2014 6:53:41 PM
Of course, we can also specify the Scala version in sbt:
sbt "++2.10.3 update"
sbt "++2.10.3 package"
sbt "++2.10.3 assembly-package-dependency"
② What to do when a Java Kafka consumer cannot read messages
The program below connects to a web server and downloads a specified URL from it, using the HTTP protocol directly. It defines an output stream that the downloaded URL content is written to, and obtains its input and output streams from a socket:

import java.io.*;
import java.net.*;

public class HttpClient {
    public static void main(String[] args) {
        try {
            // Check the arguments
            if ((args.length != 1) && (args.length != 2))
                throw new IllegalArgumentException("Wrong number of args");
            // The output stream the downloaded URL content is written to
            OutputStream to_file;
            if (args.length == 2) to_file = new FileOutputStream(args[1]);
            else to_file = System.out;
            // Use the URL class to parse the user-specified URL into parts
            URL url = new URL(args[0]);
            String protocol = url.getProtocol();
            if (!protocol.equals("http")) // check for a supported protocol
                throw new IllegalArgumentException("Must use 'http:' protocol");
            String host = url.getHost();
            int port = url.getPort();
            if (port == -1) port = 80; // use the default port if none given
            String filename = url.getFile();
            // Open a network socket connection to the given host and port
            Socket socket = new Socket(host, port);
            // Get input and output streams from the socket
            InputStream from_server = socket.getInputStream();
            PrintWriter to_server = new PrintWriter(socket.getOutputStream());

            // Send an HTTP GET command to the web server, naming the file to download
            // (uses an old and very simple version of the HTTP protocol)
            to_server.print("GET " + filename + "\n\n");
            to_server.flush(); // send it right away
            // Now read the server's response and write it to the file
            byte[] buffer = new byte[4096];
            int bytes_read;
            while ((bytes_read = from_server.read(buffer)) != -1)
                to_file.write(buffer, 0, bytes_read);
            // When the server closes the connection, close our streams too
            socket.close();
            to_file.close();
        }
        catch (Exception e) { // report any errors
            System.err.println(e);
            System.err.println("Usage: java HttpClient <URL> [<filename>]");
        }
    }
}
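Back to the Kafka side of the question: with the 0.8 high-level consumer, reading messages typically looks like the following minimal sketch. The ZooKeeper address, group id, and topic name are placeholder assumptions; setting auto.offset.reset to smallest makes a new consumer group start from the earliest offset, a frequent fix when a consumer appears to read nothing:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder ZooKeeper address
        props.put("group.id", "test-group");              // placeholder consumer group
        props.put("auto.offset.reset", "smallest");       // read from the beginning if no committed offset
        ConsumerConnector consumer =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        // Ask for one stream for the (placeholder) topic and iterate over it
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            consumer.createMessageStreams(Collections.singletonMap("test-topic", 1));
        ConsumerIterator<byte[], byte[]> it = streams.get("test-topic").get(0).iterator();
        while (it.hasNext())
            System.out.println(new String(it.next().message()));
    }
}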
③ Java error: Exception in thread "main" java.lang.UnsupportedClassVersionError
This error occurs when a Java class file compiled with a newer JDK is run on an older JVM.
1. The fix is to make sure the JVM (the java command) and the JDK (the javac command) versions match. On Linux, run java -version and javac -version on the command line and check that the versions agree. Assume here that both are 1.7.
2. If they already match but the problem persists, then you are almost certainly not compiling with javac on the command line but with an IDE such as Eclipse or NetBeans, since many IDEs bundle their own compiler instead of using the one installed on the system. For Eclipse, set the JDK version in the project properties: right-click the project --> Properties --> Java Compiler --> Enable project specific settings --> set Compiler compliance level to 1.7, i.e. the same version as the JVM (the version java -version reports).
3. In short, if you build with an IDE, first check whether its bundled JDK version matches the Java version installed on the system. The error is generally caused by a JDK/JVM version mismatch; run java -version and javac -version at the command line to compare, and if they differ, adjust JAVA_HOME so they agree.
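As a quick check from code, using nothing beyond the standard library, the running JVM's version and the class-file format it accepts can be printed like this:

public class VersionCheck {
    public static void main(String[] args) {
        // java.class.version is the highest class-file format this JVM accepts
        // (e.g. 51.0 for Java 7); UnsupportedClassVersionError means some class
        // file was compiled for a higher number than this.
        System.out.println("java.version       = " + System.getProperty("java.version"));
        System.out.println("java.class.version = " + System.getProperty("java.class.version"));
    }
}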
Background:
Java exception handling
Exceptions are errors in a program, but not all errors are exceptions, and some errors can be avoided. Exceptions arise for many reasons, typically including:
1. The user entered invalid data.
2. A file to be opened does not exist.
3. The connection dropped during network communication, or the JVM ran out of memory.
Some of these exceptions are caused by user mistakes, some by programming errors, and others by physical failures. To understand how Java exception handling works, you need to know the following three categories:
1. Checked exceptions: the classic checked exceptions are caused by user error or conditions the programmer cannot foresee. For example, an exception is raised when opening a file that does not exist; such exceptions cannot simply be ignored at compile time.
2. Runtime exceptions: exceptions the programmer could have avoided. Unlike checked exceptions, runtime exceptions can be ignored at compile time.
3. Errors: errors are not exceptions but problems beyond the programmer's control. They are usually ignored in code; for example, an error occurs when the stack overflows, and it cannot be detected at compile time.
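A small self-contained sketch of the first two categories (the file name is a placeholder):

import java.io.FileInputStream;
import java.io.IOException;

public class ExceptionDemo {
    public static void main(String[] args) {
        try {
            // Checked exception: the compiler forces a try/catch (or a throws clause)
            FileInputStream in = new FileInputStream("no-such-file.txt");
            in.close();
        } catch (IOException e) {
            System.err.println("Checked exception caught: " + e);
        }

        try {
            // Runtime exception: compiles cleanly, fails only at run time
            Object boxed = Integer.valueOf(42);
            String s = (String) boxed; // throws ClassCastException
            System.out.println(s);
        } catch (ClassCastException e) {
            System.err.println("Runtime exception caught: " + e);
        }
    }
}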
④ The Kafka metric kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs returns no value and cannot be found in jconsole
Kafka monitoring
kafka-web-console
https://github.com/claudemamo/kafka-web-console
Deploying sbt:
http://www.scala-sbt.org/0.13/tutorial/Manual-Installation.html
http://www.scala-sbt.org/release/tutorial/zh-cn/Installing-sbt-on-Linux.html
KafkaOffsetMonitor
https://github.com/quantifind/KafkaOffsetMonitor/releases/tag/v0.2.0
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk localhost:12181 --port 8080 --refresh 5.minutes --retain 1.day
Kafka monitoring with Mx4jLoader
Without modifying the source, add the mx4j-tools-3.0.1 jar under lib.
After starting the server, visit ip:8082.
/**
* If mx4j-tools is in the classpath call maybeLoad to load the HTTP interface of mx4j.
*
* The default port is 8082. To override that provide e.g. -Dmx4jport=8083
* The default listen address is 0.0.0.0. To override that provide -Dmx4jaddress=127.0.0.1
* This feature must be enabled with -Dmx4jenable=true
*
* This is a Scala port of org.apache.cassandra.utils.Mx4jTool written by Ran Tavory for CASSANDRA-1068
* */
The JMX monitoring metrics are listed below (metric, MBean name, and notes/expected value):
Message in rate: "kafka.server":name="AllTopicsMessagesInPerSec",type="BrokerTopicMetrics" (aggregate message traffic across all topics)
Byte in rate: "kafka.server":name="AllTopicsBytesInPerSec",type="BrokerTopicMetrics"
Request rate: "kafka.network":name="{Produce|Fetch-consumer|Fetch-follower}-RequestsPerSec",type="RequestMetrics"
Byte out rate: "kafka.server":name="AllTopicsBytesOutPerSec",type="BrokerTopicMetrics"
Log flush rate and time: "kafka.log":name="LogFlushRateAndTimeMs",type="LogFlushStats"
Number of under-replicated partitions (|ISR| < |all replicas|): "kafka.server":name="UnderReplicatedPartitions",type="ReplicaManager" (expected value: 0)
Is controller active on broker: "kafka.controller":name="ActiveControllerCount",type="KafkaController" (only one broker in the cluster should have 1)
Leader election rate: "kafka.controller":name="LeaderElectionRateAndTimeMs",type="ControllerStats" (non-zero when there are broker failures)
Unclean leader election rate: "kafka.controller":name="UncleanLeaderElectionsPerSec",type="ControllerStats" (expected value: 0)
Partition counts: "kafka.server":name="PartitionCount",type="ReplicaManager" (mostly even across brokers)
Leader replica counts: "kafka.server":name="LeaderCount",type="ReplicaManager" (mostly even across brokers)
ISR shrink rate: "kafka.server":name="ISRShrinksPerSec",type="ReplicaManager" (if a broker goes down, the ISR for some of the partitions will shrink; when that broker is up again, the ISR expands once the replicas are fully caught up. Other than that, the expected value for both the ISR shrink and expansion rates is 0)
ISR expansion rate: "kafka.server":name="ISRExpandsPerSec",type="ReplicaManager" (see above)
Max lag in messages between follower and leader replicas: "kafka.server":name="([-.\w]+)-MaxLag",type="ReplicaFetcherManager" (number of messages a replica lags behind)
Lag in messages per follower replica: "kafka.server":name="([-.\w]+)-ConsumerLag",type="FetcherLagMetrics" (number of messages a replica lags behind)
Requests waiting in the producer purgatory: "kafka.server":name="PurgatorySize",type="ProducerRequestPurgatory"
Requests waiting in the fetch purgatory: "kafka.server":name="PurgatorySize",type="FetchRequestPurgatory"
Request total time: "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-TotalTimeMs",type="RequestMetrics"
Time the request waits in the request queue: "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-QueueTimeMs",type="RequestMetrics"
Time the request is processed at the leader: "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-LocalTimeMs",type="RequestMetrics"
Time the request waits for the follower: "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-RemoteTimeMs",type="RequestMetrics"
Time to send the response: "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-ResponseSendTimeMs",type="RequestMetrics"
Number of messages the consumer lags behind the producer: "kafka.consumer":name="([-.\w]+)-MaxLag",type="ConsumerFetcherManager"
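These MBeans can also be read programmatically instead of through jconsole. A minimal probe sketch follows, assuming the broker was started with JMX enabled (e.g. JMX_PORT=9999, a placeholder) and using the 0.8-style quoted ObjectName format shown in the table above; note that Yammer Metrics MBeans may not appear until the code path that records them has actually run:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class KafkaJmxProbe {
    public static void main(String[] args) throws Exception {
        // Connect to the broker's JMX endpoint (placeholder host and port)
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        MBeanServerConnection conn = connector.getMBeanServerConnection();
        // 0.8-style quoted ObjectName, matching the naming in the table above
        ObjectName flush = new ObjectName(
            "\"kafka.log\":type=\"LogFlushStats\",name=\"LogFlushRateAndTimeMs\"");
        // A Yammer Metrics timer exposes attributes such as Count and MeanRate
        System.out.println("flush count = " + conn.getAttribute(flush, "Count"));
        connector.close();
    }
}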
⑤ Why does "cannot be cast to java.lang.String" occur
This is plainly a type-cast error: an Integer cannot be cast to a String.
If the Kafka Java API reports a similar error when sending messages:
Exception in thread "main" java.lang.ClassCastException: java.lang.String cannot be cast to
then the serializer class must match the type of the messages being sent. For example, to send string messages, add the following to the producer configuration:
Properties props = new Properties();
props.put("serializer.class", "kafka.serializer.StringEncoder");
String messages can then be sent.
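If the payload is not a String, the same idea applies: implement the 0.8 Encoder interface for your type and register that class instead. A hypothetical IntegerEncoder sketch follows (the class and package names are illustrative, not part of Kafka):

package com.example;

import java.nio.ByteBuffer;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

// Register with: props.put("serializer.class", "com.example.IntegerEncoder");
public class IntegerEncoder implements Encoder<Integer> {
    // Kafka instantiates encoders reflectively through this constructor signature
    public IntegerEncoder(VerifiableProperties props) { }

    public byte[] toBytes(Integer value) {
        // Encode the Integer as 4 big-endian bytes
        return ByteBuffer.allocate(4).putInt(value).array();
    }
}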
⑥ FusionInsight Kafka reports an error when sending data
An error occurs when the program runs.
Change the two relevant properties in server.properties under the config folder of the Kafka installation directory, then restart Kafka.
FusionInsight is a distributed data-processing system that provides large-capacity data storage, query, and analysis capabilities.