『壹』 kafka是幹嘛的
Kafka是由Apache軟體基金會開發的一個開源流處理平台,由Scala和java編寫。Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者在網站中的所有動作流數據。
這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網路上的許多社會功能的一個關鍵因素。 這些數據通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。
對於像Hadoop一樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行載入機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。
主要特性
Kafka是一種高吞吐量 的分布式發布訂閱消息系統,有如下特性:
通過O(1)的磁碟數據結構提供消息的持久化,這種結構對於即使數以TB的消息存儲也能夠保持長時間的穩定性能。
高吞吐量:即使是非常普通的硬體Kafka也可以支持每秒數百萬的消息。
支持通過Kafka伺服器和消費機集群來分區消息。
支持Hadoop並行數據載入。
Kafka通過官網發布了最新版本3.0.0。
以上內容來自 網路-kafka
『貳』 kafka消費者組數量較大對性能有什麼影響
在Kafak中國社區的群中,這個問題被提及的比例是相當高的,這也是Kafka用戶最常碰到的問題之一。本文結合Kafka源碼試圖對該問題相關的因素進行探討。希望對大家有所幫助。
怎麼確定分區數?
「我應該選擇幾個分區?」——如果你在Kafka中國社區的群里,這樣的問題你會經常碰到的。不過有些遺憾的是,我們似乎並沒有很權威的答案能夠解答這樣的問題。其實這也不奇怪,畢竟這樣的問題通常都是沒有固定答案的。Kafka上標榜自己是"high-throughput distributed messaging system",即一個高吞吐量的分布式消息引擎。那麼怎麼達到高吞吐量呢?Kafka在底層摒棄了Java堆緩存機制,採用了操作系統級別的頁緩存,同時將隨機寫操作改為順序寫,再結合Zero-Copy的特性極大地改善了IO性能。但是,這只是一個方面,畢竟單機優化的能力是有上限的。如何通過水平擴展甚至是線性擴展來進一步提升吞吐量呢? Kafka就是使用了分區(partition),通過將topic的消息打散到多個分區並分布保存在不同的broker上實現了消息處理(不管是procer還是consumer)的高吞吐量。
Kafka的生產者和消費者都可以多線程地並行操作,而每個線程處理的是一個分區的數據。因此分區實際上是調優Kafka並行度的最小單元。對於procer而言,它實際上是用多個線程並發地向不同分區所在的broker發起Socket連接同時給這些分區發送消息;而consumer呢,同一個消費組內的所有consumer線程都被指定topic的某一個分區進行消費(具體如何確定consumer線程數目我們後面會詳細說明)。所以說,如果一個topic分區越多,理論上整個集群所能達到的吞吐量就越大。
但分區是否越多越好呢?顯然也不是,因為每個分區都有自己的開銷:
一、客戶端/伺服器端需要使用的內存就越多
先說說客戶端的情況。Kafka 0.8.2之後推出了Java版的全新的procer,這個procer有個參數batch.size,默認是16KB。它會為每個分區緩存消息,一旦滿了就打包將消息批量發出。看上去這是個能夠提升性能的設計。不過很顯然,因為這個參數是分區級別的,如果分區數越多,這部分緩存所需的內存佔用也會更多。假設你有10000個分區,按照默認設置,這部分緩存需要佔用約157MB的內存。而consumer端呢?我們拋開獲取數據所需的內存不說,只說線程的開銷。如果還是假設有10000個分區,同時consumer線程數要匹配分區數(大部分情況下是最佳的消費吞吐量配置)的話,那麼在consumer client就要創建10000個線程,也需要創建大約10000個Socket去獲取分區數據。這裡面的線程切換的開銷本身已經不容小覷了。
伺服器端的開銷也不小,如果閱讀Kafka源碼的話可以發現,伺服器端的很多組件都在內存中維護了分區級別的緩存,比如controller,FetcherManager等,因此分區數越多,這種緩存的成本越久越大。
二、文件句柄的開銷
每個分區在底層文件系統都有屬於自己的一個目錄。該目錄下通常會有兩個文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager會為每個broker都保存這兩個文件句柄(file handler)。很明顯,如果分區數越多,所需要保持打開狀態的文件句柄數也就越多,最終可能會突破你的ulimit -n的限制。
三、降低高可用性
Kafka通過副本(replica)機制來保證高可用。具體做法就是為每個分區保存若干個副本(replica_factor指定副本數)。每個副本保存在不同的broker上。期中的一個副本充當leader 副本,負責處理procer和consumer請求。其他副本充當follower角色,由Kafka controller負責保證與leader的同步。如果leader所在的broker掛掉了,contorller會檢測到然後在zookeeper的幫助下重選出新的leader——這中間會有短暫的不可用時間窗口,雖然大部分情況下可能只是幾毫秒級別。但如果你有10000個分區,10個broker,也就是說平均每個broker上有1000個分區。此時這個broker掛掉了,那麼zookeeper和controller需要立即對這1000個分區進行leader選舉。比起很少的分區leader選舉而言,這必然要花更長的時間,並且通常不是線性累加的。如果這個broker還同時是controller情況就更糟了。
說了這么多「廢話」,很多人肯定已經不耐煩了。那你說到底要怎麼確定分區數呢?答案就是:視情況而定。基本上你還是需要通過一系列實驗和測試來確定。當然測試的依據應該是吞吐量。雖然LinkedIn這篇文章做了Kafka的基準測試,但它的結果其實對你意義不大,因為不同的硬體、、負載情況測試出來的結果必然不一樣。我經常碰到的問題類似於,說每秒能到10MB,為什麼我的procer每秒才1MB? —— 且不說硬體條件,最後發現他使用的消息體有1KB,而的基準測試是用100B測出來的,因此根本沒有可比性。不過你依然可以遵循一定的步驟來嘗試確定分區數:創建一個只有1個分區的topic,然後測試這個topic的procer吞吐量和consumer吞吐量。假設它們的值分別是Tp和Tc,單位可以是MB/s。然後假設總的目標吞吐量是Tt,那麼分區數 = Tt / max(Tp, Tc)
Tp表示procer的吞吐量。測試procer通常是很容易的,因為它的邏輯非常簡單,就是直接發送消息到Kafka就好了。Tc表示consumer的吞吐量。測試Tc通常與應用的關系更大, 因為Tc的值取決於你拿到消息之後執行什麼操作,因此Tc的測試通常也要麻煩一些。
另外,Kafka並不能真正地做到線性擴展(其實任何系統都不能),所以你在規劃你的分區數的時候最好多規劃一下,這樣未來擴展時候也更加方便。
消息-分區的分配
默認情況下,Kafka根據傳遞消息的key來進行分區的分配,即hash(key) % numPartitions,如下圖所示:
def partition(key: Any, numPartitions: Int): Int = {
Utils.abs(key.hashCode) % numPartitions
}
這就保證了相同key的消息一定會被路由到相同的分區。如果你沒有指定key,那麼Kafka是如何確定這條消息去往哪個分區的呢?
if(key == null) { // 如果沒有指定key
val id = sendPartitionPerTopicCache.get(topic) // 先看看Kafka有沒有緩存的現成的分區Id
id match {
case Some(partitionId) =>
partitionId // 如果有的話直接使用這個分區Id就好了
case None => // 如果沒有的話,
val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) //找出所有可用分區的leader所在的broker
if (availablePartitions.isEmpty)
throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
val index = Utils.abs(Random.nextInt) % availablePartitions.size // 從中隨機挑一個
val partitionId = availablePartitions(index).partitionId
sendPartitionPerTopicCache.put(topic, partitionId) // 更新緩存以備下一次直接使用
partitionId
}
}
可以看出,Kafka幾乎就是隨機找一個分區發送無key的消息,然後把這個分區號加入到緩存中以備後面直接使用——當然了,Kafka本身也會清空該緩存(默認每10分鍾或每次請求topic元數據時)
如何設定consumer線程數
我個人的觀點,如果你的分區數是N,那麼最好線程數也保持為N,這樣通常能夠達到最大的吞吐量。超過N的配置只是浪費系統資源,因為多出的線程不會被分配到任何分區。讓我們來看看具體Kafka是如何分配的。
topic下的一個分區只能被同一個consumer group下的一個consumer線程來消費,但反之並不成立,即一個consumer線程可以消費多個分區的數據,比如Kafka提供的ConsoleConsumer,默認就只是一個線程來消費所有分區的數據。——其實ConsoleConsumer可以使用通配符的功能實現同時消費多個topic數據,但這和本文無關。
再討論分配策略之前,先說說KafkaStream——它是consumer的關鍵類,提供了遍歷方法用於consumer程序調用實現數據的消費。其底層維護了一個阻塞隊列,所以在沒有新消息到來時,consumer是處於阻塞狀態的,表現出來的狀態就是consumer程序一直在等待新消息的到來。——你當然可以配置成帶超時的consumer,具體參看參數consumer.timeout.ms的用法。
下面說說 Kafka提供的兩種分配策略: range和roundrobin,由參數partition.assignment.strategy指定,默認是range策略。本文只討論range策略。所謂的range其實就是按照階段平均分配。舉個例子就明白了,假設你有10個分區,P0 ~ P9,consumer線程數是3, C0 ~ C2,那麼每個線程都分配哪些分區呢?
C0 消費分區 0, 1, 2, 3
C1 消費分區 4, 5, 6
C2 消費分區 7, 8, 9
具體演算法就是:
val nPartsPerConsumer = curPartitions.size / curConsumers.size // 每個consumer至少保證消費的分區數
val nConsumersWithExtraPart = curPartitions.size % curConsumers.size // 還剩下多少個分區需要單獨分配給開頭的線程們
for (consumerThreadId <- consumerThreadIdSet) { // 對於每一個consumer線程
val myConsumerPosition = curConsumers.indexOf(consumerThreadId) //算出該線程在所有線程中的位置,介於[0, n-1]
assert(myConsumerPosition >= 0)
// startPart 就是這個線程要消費的起始分區數
val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
// nParts 就是這個線程總共要消費多少個分區
val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
}
針對於這個例子,nPartsPerConsumer就是10/3=3,nConsumersWithExtraPart為10%3=1,說明每個線程至少保證3個分區,還剩下1個分區需要單獨分配給開頭的若干個線程。這就是為什麼C0消費4個分區,後面的2個線程每個消費3個分區,具體過程詳見下面的Debug截圖信息:
ctx.myTopicThreadIds
nPartsPerConsumer = 10 / 3 = 3
nConsumersWithExtraPart = 10 % 3 = 1
第一次:
myConsumerPosition = 1
startPart = 1 * 3 + min(1, 1) = 4 ---也就是從分區4開始讀
nParts = 3 + (if (1 + 1 > 1) 0 else 1) = 3 讀取3個分區, 即4,5,6
第二次:
myConsumerPosition = 0
startPart = 3 * 0 + min(1, 0) =0 --- 從分區0開始讀
nParts = 3 + (if (0 + 1 > 1) 0 else 1) = 4 讀取4個分區,即0,1,2,3
第三次:
myConsumerPosition = 2
startPart = 3 * 2 + min(2, 1) = 7 --- 從分區7開始讀
nParts = 3 + if (2 + 1 > 1) 0 else 1) = 3 讀取3個分區,即7, 8, 9
至此10個分區都已經分配完畢
說到這里,經常有個需求就是我想讓某個consumer線程消費指定的分區而不消費其他的分區。坦率來說,目前Kafka並沒有提供自定義分配策略。做到這點很難,但仔細想一想,也許我們期望Kafka做的事情太多了,畢竟它只是個消息引擎,在Kafka中加入消息消費的邏輯也許並不是Kafka該做的事情。
不消費問題
第一步:參看消費者的基本情況
查看mwbops系統,【Consumer監控】-->【對應的consumerId】
如果offset數字一直在動,說明一直在消費,說明不存在問題,return;
如果offset數字一直不動,看Owner是不是有值存在
如果Owner是空,說明消費端的程序已經跟Kafka斷開連接,應該排查消費端是否正常,return;
如果Owner不為空,就是有上圖上面的類似於 bennu_index_benuprdapp02--fa-0 的文字,繼續看下面內容
第二步:查看消費端的程序代碼
一般的消費代碼是這樣的
看看自己的消費代碼裡面,存不存在處理消息的時候出異常的情況
如果有,需要try-catch一下,其實不論有沒有異常,都用try-catch包一下最好,如下面代碼
return;
原因:如果在處理消息的時候有異常出現,又沒有進行處理,那麼while循環就會跳出,線程會結束,所以不會再去取消息,就是消費停止了。
第三步:查看消費端的配置
消費代碼中一般以以下方式創建Consumer
消費端有一個配置,叫 fetch.message.max.bytes,默認是1M,此時如果有消息大於1M,會發生停止消費的情況。
此時,在配置中增加 props.put("fetch.message.max.bytes", "10 * 1024 * 1024"); 即可
return;
原因:目前Kafka集群配置的運行最大的消息大小是10M,如果客戶端配置的運行接收的消息是1M,跟Kafka服務端配置的不一致,
則消息大於1M的情況下,消費端就無法消費,導致一直卡在這一條消息,現象就是消費停止。
『叄』 kafka的group怎麼能接著消費
很早以前我們組里的Intern寫過一個Patch用來GC舊的consumer metadata from ZK:
[KAFKA-559] Garbage collect old consumer metadata entries
這個最終沒有merge進code base,不過你可以考慮拿過來改一改自己用。
此外就是新版本0.9裡面新增的Java New Consumer API,去掉了ZK的依賴,直接用Kafka Server做Coordination,在這里只要一個consumer group裡面所有的member都掉線後,這個group之後就會被自動GC掉,包括它的offsets等等。這個新版本Consumer已經用在LI快兩年了。當然要用這個新client你也需要upgrade你的Kafka Server到0.9版本:
Consumer Client Re-Design
『肆』 用Kafka和Java搭建的項目,Kafka管理中心在什麼情況下會重復發送消息消費端的程序接收到消息,進入方法
非手動提交offset
消費者只要讀取到數據,就會修改offset,不需要方法體執行完
手動提交
需要手動提交代碼執行完畢
針對你的問題,情況有很多種可能。
你是否開啟手動提交offset
你的消費者,有幾個?是否是同一個組?
『伍』 多個消費者要消費kafka相同數據怎麼辦
在Kafak中國社區的qq群中,這個問題被提及的比例是相當高的,這也是Kafka用戶最常碰到的問題之一。本文結合Kafka源碼試圖對該問題相關的因素進行探討。希望對大家有所幫助。怎麼確定分區數?「我應該選擇幾個分區?」——如果你在Kafka中國社區的群里,這樣的問題你會經常碰到的。不過有些遺憾的是,我們似乎並沒有很權威的答案能夠解答這樣的問題。其實這也不奇怪,畢竟這樣的問題通常都是沒有固定答案的。Kafka官網上標榜自己是"high-throughput distributed messaging system",即一個高吞吐量的分布式消息引擎。那麼怎麼達到高吞吐量呢?Kafka在底層摒棄了Java堆緩存機制,採用了操作系統級別的頁緩存,同時將隨機寫操作改為順序寫,再結合Zero-Copy的特性極大地改善了IO性能。但是,這只是一個方面,畢竟單機優化的能力是有上限的。如何通過水平擴展甚至是線性擴展來進一步提升吞吐量呢? Kafka就是使用了分區partition,通過將topic的消息打散到多個分區並分布保存在不同的broker上實現了消息處理不管是procer還是consumer的高吞吐量。Kafka的生產者和消費者都可以多線程地並行操作,而每個線程處理的是一個分區的數據。因此分區實際上是調優Kafka並行度的最小單元。對於procer而言,它實際上是用多個線程並發地向不同分區所在的broker發起Socket連接同時給這些分區發送消息;而consumer呢,同一個消費組內的所有consumer線程都被指定topic的某一個分區進行消費具體如何確定consumer線程數目我們後面會詳細說明。所以說,如果一個topic分區越多,理論上整個集群所能達到的吞吐量就越大。但分區是否越多越好呢?顯然也不是,因為每個分區都有自己的開銷:一、客戶端/伺服器端需要使用的內存就越多先說說客戶端的情況。Kafka 082之後推出了Java版的全新的procer,這個procer有個參數batchsize,默認是16KB。它會為每個分區緩存消息,一旦滿了就打包將消息批量發出。看上去這是個能夠提升性能的設計。不過很顯然,因為這個參數是分區級別的,如果分區數越多,這部分緩存所需的內存佔用也會更多。假設你有10000個分區,按照默認設置,這部分緩存需要佔用約157MB的內存。而consumer端呢?我們拋開獲取數據所需的內存不說,只說線程的開銷。如果還是假設有10000個分區,同時consumer線程數要匹配分區數大部分情況下是最佳的消費吞吐量配置的話,那麼在consumer client就要創建10000個線程,也需要創建大約10000個Socket去獲取分區數據。這裡面的線程切換的開銷本身已經不容小覷了。伺服器端的開銷也不小,如果閱讀Kafka源碼的話可以發現,伺服器端的很多組件都在內存中維護了分區級別的緩存,比如controller,FetcherManager等,因此分區數越多,這種緩存的成本越久越大。二、文件句柄的開銷每個分區在底層文件系統都有屬於自己的一個目錄。該目錄下通常會有兩個文件: base_offsetlog和base_offsetindex。Kafak的controller和ReplicaManager會為每個broker都保存這兩個文件句柄file handler。很明顯,如果分區數越多,所需要保持打開狀態的文件句柄數也就越多,最終可能會突破你的ulimit -n的限制。三、降低高可用性Kafka通過副本replica機制來保證高可用。具體做法就是為每個分區保存若干個副本replica_factor指定副本數。每個副本保存在不同的broker上。期中的一個副本充當leader 副本,負責處理procer和consumer請求。其他副本充當follower角色,由Kafka controller負責保證與leader的同步。如果leader所在的broker掛掉了,contorller會檢測到然後在zookeeper的幫助下重選出新的leader——這中間會有短暫的不可用時間窗口,雖然大部分情況下可能只是幾毫秒級別。但如果你有10000個分區,10個broker,也就是說平均每個broker上有1000個分區。此時這個broker掛掉了,那麼zookeeper和controller需要立即對這1000個分區進行leader選舉。比起很少的分區leader選舉而言,這必然要花更長的時間,並且通常不是線性累加的。如果這個broker還同時是controller情況就更糟了。說了這么多「廢話」,很多人肯定已經不耐煩了。那你說到底要怎麼確定分區數呢?答案就是:視情況而定。基本上你還是需要通過一系列實驗和測試來確定。當然測試的依據應該是吞吐量。雖然LinkedIn這篇文章做了Kafka的基準測試,但它的結果其實對你意義不大,因為不同的硬體、軟體、負載情況測試出來的結果必然不一樣。我經常碰到的問題類似於,官網說每秒能到10MB,為什麼我的procer每秒才1MB? —— 且不說硬體條件,最後發現他使用的消息體有1KB,而官網的基準測試是用100B測出來的,因此根本沒有可比性。不過你依然可以遵循一定的步驟來嘗試確定分區數:創建一個只有1個分區的topic,然後測試這個topic的procer吞吐量和consumer吞吐量。假設它們的值分別是Tp和Tc,單位可以是MB/s。然後假設總的目標吞吐量是Tt,那麼分區數 = Tt / maxTp, TcTp表示procer的吞吐量。測試procer通常是很容易的,因為它的邏輯非常簡單,就是直接發送消息到Kafka就好了。Tc表示consumer的吞吐量。測試Tc通常與應用的關系更大, 因為Tc的值取決於你拿到消息之後執行什麼操作,因此Tc的測試通常也要麻煩一些。另外,Kafka並不能真正地做到線性擴展其實任何系統都不能,所以你在規劃你的分區數的時候最好多規劃一下,這樣未來擴展時候也更加方便。消息-分區的分配默認情況下,Kafka根據傳遞消息的key來進行分區的分配,即hashkey
『陸』 kafka 怎樣查看kafka狀態
輸入以下代碼即可查看kafka狀態:
BROKER_HOST是kafka server的ip地址,PORTt是server的監聽埠。多個host port之間用逗號隔開。
第一條命令是獲取group列表,一般而言,應用是知道消費者group的,通常在應用的配置里,如果已知,該步驟可以省略。
第二條命令是查看具體的消費者group的詳情信息,需要給出group的名稱。
Kafka是由Apache軟體基金會開發的一個開源流處理平台,由Scala和Java編寫。Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者在網站中的所有動作流數據。
這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網路上的許多社會功能的一個關鍵因素。 這些數據通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。
對於像Hadoop一樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行載入機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。
『柒』 java工程kafka傳遞自定義對象,消費端獲取到的是null
3. 啟服務
3.1 啟zookeeper
啟zk兩種式第種使用kafka自帶zk
bin/zookeeper-server-start.sh config/zookeeper.properties&
另種使用其zookeeper位於本機位於其址種情況需要修改config面sercer.properties面zookeeper址
例zookeeper.connect=10.202.4.179:2181
3.2 啟 kafka
bin/kafka-server-start.sh config/server.properties
4.創建topic
bin/kafka-topics.sh --create --zookeeper 10.202.4.179:2181 --replication-factor 1 --partitions 1 --topic test
創建名testtopic副本區
通list命令查看剛剛創建topic
bin/kafka-topics.sh -list -zookeeper 10.202.4.179:2181
5.啟procer並發送消息啟procer
bin/kafka-console-procer.sh --broker-list localhost:9092 --topic test
啟發送消息
比
test
hello boy
按Ctrl+C退發送消息
6.啟consumer
bin/kafka-console-consumer.sh --zookeeper 10.202.4.179:2181 --topic test --from-beginning
啟consumerconsole看procer發送消息
啟兩終端發送消息接受消息
都行查看zookeeper進程kafkatopic步步排查原吧
『捌』 kafka是幹嘛的
Kafka是由Apache軟體基金會開發的一個開源流處理平台,Scala和Java編寫。Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者在網站中的所有動作流數據。這種動作是在現代網路上的許多社會功能的一個關鍵因素。這些數據通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。對於像Hadoop一樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行載入機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。
『玖』 kafka在java應用中怎麼設置每次只消費一條消息
:數據直接從通信網關過來?那最近每個map得到的數據是怎麼區分的是發數據端按規則把數據配發到每個map?咱整過的一個例子是多個map同時從一張數據表取數進行數據處理在hdfs的輸入目錄給每個map指定一個輸入文件map讀取這個輸入文件