㈠ 如何利用pykafka遠程消費 zookeeper+kafka集群 python腳本
#從kafka消費
#consumer_area = topic_area.get_simple_consumer(auto_offset_reset=OffsetType.LATEST)
#從ZOOKEEPER消費
consumer_area = topic_area.get_balanced_consumer(
consumer_group=b'zs_download_04', # 自己命令
auto_offset_reset=OffsetType.LATEST,#在consumer_group存在的情況下,設置此變數,表示從最新的開始取
#auto_offset_reset=OffsetType.EARLIEST,
#reset_offset_on_start=True,
auto_commit_enable=True,
#auto_commit_interval_ms=1,
zookeeper_connect=ZK_LIST
)
㈡ from pykafka import kafkaclient什麼意思
不是一個意思,前者來自於你的系統本身,後者來自於你的存儲卡。
㈢ 如何重新讀取kafka集群topic
Kafka是由LinkedIn設計的一個高吞吐量、分布式、基於發布訂閱模式的消息系統,使用Scala編寫,它以可水平
擴展、可靠性、非同步通信和高吞吐率等特性而被廣泛使用。目前越來越多的開源分布式處理系統都支持與Kafka集成,其中Spark
Streaming作為後端流引擎配合Kafka作為前端消息系統正成為當前流處理系統的主流架構之一。
然而,當下越來越多的安全漏
洞、數據泄露等問題的爆發,安全正成為系統選型不得不考慮的問題,Kafka由於其安全機制的匱乏,也導致其在數據敏感行業的部署存在嚴重的安全隱患。本
文將圍繞Kafka,先介紹其整體架構和關鍵概念,再深入分析其架構之中存在的安全問題,最後分享下Transwarp在Kafka安全性上所做的工作及
其使用方法。
Kafka架構與安全
首先,我們來了解下有關Kafka的幾個基本概念:
Topic:Kafka把接收的消息按種類劃分,每個種類都稱之為Topic,由唯一的Topic Name標識。
Procer:向Topic發布消息的進程稱為Procer。
Consumer:從Topic訂閱消息的進程稱為Consumer。
Broker:Kafka集群包含一個或多個伺服器,這種伺服器被稱為Broker。
Kafka的整體架構如下圖所示,典型的Kafka集群包含一組發布消息的Procer,一組管理Topic的Broker,和一組訂閱消息的
Consumer。Topic可以有多個分區,每個分區只存儲於一個Broker。Procer可以按照一定的策略將消息劃分給指定的分區,如簡單的
輪詢各個分區或者按照特定欄位的Hash值指定分區。Broker需要通過ZooKeeper記錄集群的所有Broker、選舉分區的Leader,記錄
Consumer的消費消息的偏移量,以及在Consumer Group發生變化時進行relalance.
Broker接收和發送消息是被動的:由Procer主動發送消息,Consumer主動拉取消息。
然而,分析Kafka框架,我們會發現以下嚴重的安全問題:
網路中的任何一台主機,都可以通過啟動Broker進程而加入Kafka集群,能夠接收Procer的消息,能夠篡改消息並發送給Consumer。
網路中的任何一台主機,都可以啟動惡意的Procer/Consumer連接到Broker,發送非法消息或拉取隱私消息數據。
Broker不支持連接到啟用Kerberos認證的ZooKeeper集群,沒有對存放在ZooKeeper上的數據設置許可權。任意用戶都能夠直接訪問ZooKeeper集群,對這些數據進行修改或刪除。
Kafka中的Topic不支持設置訪問控制列表,任意連接到Kafka集群的Consumer(或Procer)都能對任意Topic讀取(或發送)消息。
隨著Kafka應用場景越來越廣泛,特別是一些數據隱私程度較高的領域(如道路交通的視頻監控),上述安全問題的存在猶如一顆定時炸彈,一旦內網被黑客
入侵或者內部出現惡意用戶,所有的隱私數據(如車輛出行記錄)都能夠輕易地被竊取,而無需攻破Broker所在的伺服器。
Kafka安全設計
基於上述分析,Transwarp從以下兩個方面增強Kafka的安全性:
身份認證(Authentication):設計並實現了基於Kerberos和基於IP的兩種身份認證機制。前者為強身份認證,相比於後者具有更好的安全性,後者適用於IP地址可信的網路環境,相比於前者部署更為簡便。
許可權控制(Authorization):設計並實現了Topic級別的許可權模型。Topic的許可權分為READ(從Topic拉取數據)、WRITE(向Topic中生產數據)、CREATE(創建Topic)和DELETE(刪除Topic)。
Broker啟動時,需要使用配置文件中的身份和密鑰文件向KDC(Kerberos伺服器)認證,認證通過則加入Kafka集群,否則報錯退出。
Procer(或Consumer)啟動後需要經過如下步驟與Broker建立安全的Socket連接:
Procer向KDC認證身份,通過則得到TGT(票證請求票證),否則報錯退出
Procer使用TGT向KDC請求Kafka服務,KDC驗證TGT並向Procer返回SessionKey(會話密鑰)和ServiceTicket(服務票證)
Procer
使用SessionKey和ServiceTicket與Broker建立連接,Broker使用自身的密鑰解密ServiceTicket,獲得與
Procer通信的SessionKey,然後使用SessionKey驗證Procer的身份,通過則建立連接,否則拒絕連接。
ZooKeeper需要啟用Kerberos認證模式,保證Broker或Consumer與其的連接是安全的。
Topic的訪問控制列表(ACL)存儲於ZooKeeper中,存儲節點的路徑為/acl/<topic>/<user>,
節點數據為R(ead)、W(rite)、C(reate)、D(elete)許可權的集合,如/acl/transaction/jack節點的數據為
RW,則表示用戶jack能夠對transaction這個topic進行讀和寫。
另外,kafka為特權用戶,只有kafka用戶能夠賦予/取消許可權。因此,ACL相關的ZooKeeper節點許可權為kafka具有所有許可權,其他用戶不具有任何許可權。
構建安全的Kafka服務
首先,我們為Broker啟用Kerberos認證模式,配置文件為/etc/kafka/conf/server.properties,安全相關的參數如下所示:
其中,authentication參數表示認證模式,可選配置項為simple, kerberos和ipaddress,默認為simple。當認證模式為kerberos時,需要額外配置賬戶屬性principal和對應的密鑰文件路徑keytab.
認證模式為ipaddress時,Procer和Consumer創建時不需要做任何改變。而認證模式為kerberos時,需要預先創建好相應的principal和keytab,並使用API進行登錄,樣例代碼如下所示:
public class SecureProcer extends Thread {
private final kafka.javaapi.procer.Procer<Integer, String> procer;
private final String topic;
private final Properties props = new Properties();
public SecureProcer(String topic) {
AuthenticationManager.setAuthMethod("kerberos");
AuthenticationManager.login("procer1", "/etc/procer1.keytab");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list",
"172.16.1.190:9092,172.16.1.192:9092,172.16.1.193:9092");
// Use random partitioner. Don't need the key type. Just set it to Integer.
// The message is of type String.
procer = new kafka.javaapi.procer.Procer<Integer, String>(
new ProcerConfig(props));
this.topic = topic;
}
㈣ 如何在kafka-python和confluent-kafka之間做出選擇
kafka-python:蠻荒的西部
kafka-python是最受歡迎的Kafka Python客戶端。我們過去使用時從未出現過任何問題,在我的《敏捷數據科學2.0》一書中我也用過它。然而在最近這個項目中,它卻出現了一個嚴重的問題。我們發現,當以文檔化的方式使用KafkaConsumer、Consumer迭代式地從消息隊列中獲取消息時,最終到達主題topic的由Consumer攜帶的消息通常會丟失。我們通過控制台Consumer的分析驗證了這一點。
需要更詳細說明的是,kafka-python和KafkaConsumer是與一個由SSL保護的Kafka服務(如Aiven Kafka)一同使用的,如下面這樣:
kafka_consumer = KafkaConsumer(
topic,
enable_auto_commit=True,
group_id=group_id,
bootstrap_servers=config.kafka.host,
api_version=(0, 10),
security_protocol='SSL',
ssl_check_hostname=True,
ssl_cafile=config.kafka.ca_pem,
ssl_certfile=config.kafka.service_cert,
ssl_keyfile=config.kafka.service_key
)
for message in kafka_consumer:
application_message = json.loads(message.value.decode())
...
當以這樣的推薦方式使用時,KafkaConsumer會丟失消息。但有一個變通方案,就是保留所有消息。這個方案是Kafka服務提供商Aiven support提供給我們的。它看起來像這樣:
while True:
raw_messages = consumer.poll(timeout_ms=1000, max_records=5000)
for topic_partition, messages in raw_messages.items():
application_message = json.loads(message.value.decode())
...
雖然這個變通方案可能有用,但README中的方法會丟棄消息使我對其失去興趣。所以我找到了一個替代方案。
confluent-kafka:企業支持
發現coufluent-kafka Python模塊時,我感到無比驚喜。它既能做librdkafka的外封裝,又非常小巧。librdkafka是一個用C語言寫的kafka庫,它是Go和.NET的基礎。更重要的是,它由Confluent公司支持。我愛開源,但是當「由非正式社區擁有或支持」這種方式效果不行的時候,或許該考慮給替代方案印上公章、即該由某個公司擁有或支持了。不過,我們並未購買商業支持。我們知道有人會維護這個庫的軟體質量,而且可以選擇買或不買商業支持,這一點真是太棒了。
用confluent-kafka替換kafka-python非常簡單。confluent-kafka使用poll方法,它類似於上面提到的訪問kafka-python的變通方案。
kafka_consumer = Consumer(
{
"api.version.request": True,
"enable.auto.commit": True,
"group.id": group_id,
"bootstrap.servers": config.kafka.host,
"security.protocol": "ssl",
"ssl.ca.location": config.kafka.ca_pem,
"ssl.certificate.location": config.kafka.service_cert,
"ssl.key.location": config.kafka.service_key,
"default.topic.config": {"auto.offset.reset": "smallest"}
}
)
consumer.subscribe([topic])
# Now loop on the consumer to read messages
running = True
while running:
message = kafka_consumer.poll()
application_message = json.load(message.value.decode())
kafka_consumer.close()
現在我們能收到所有消息了。我並不是說kafka-python工具不好,我相信社區會對它的問題做出反應並解決。但從現在開始,我會一直堅持使用confluent-kafka。
開源治理
開源是強大的,但是涉及到復雜的「大數據」和NoSQL工具時,通常需要有一家大公司在背後推動工具的開發。這樣你就知道,如果那個公司可以使用工具,那麼該工具應該擁有很好的基本功能。它的出現可能是非正式的,就像某公司發布類似FOSS的項目一樣,但也可能是正式的,就像某公司為工具提供商業支持一樣。當然,從另一個角度來看,如果一家與開源社區作對的公司負責開發某個工具,你便失去了控制權。你的意見可能無關緊要,除非你是付費客戶。
理想情況是採取開源治理,就像Apache基金會一樣,還有就是增加可用的商業支持選項。這對互聯網上大部分的免費軟體來說根本不可能。限制自己只使用那些公司蓋章批准後的工具將非常限制你的自由。這對於一些商店可能是正確選擇,但對於我們不是。我喜歡工具測試,如果工具很小,而且只專心做一件事,我就會使用它。
信任開源
對於更大型的工具,以上決策評估過程更為復雜。通常,我會看一下提交問題和貢獻者的數量,以及最後一次commit的日期。我可能會問朋友某個工具的情況,有時也會在推特上問。當你進行嗅探檢查後從Github選擇了一個項目,即說明你信任社區可以產出好的工具。對於大多數工具來說,這是沒問題的。
但信任社區可能存在問題。對於某個特定的工具,可能並沒有充分的理由讓你信任社區可以產出好的軟體。社區在目標、經驗和開源項目的投入時間方面各不相同。選擇工具時保持審慎態度十分重要,不要讓理想蒙蔽了判斷。
㈤ kafka python topic 多少數據
您好,希望以下回答能幫助您 我只想說還是換個系統比較好,win7裝tornado特別容易出問題,XP就可以 如您還有疑問可繼續追問。
㈥ 如何使用python 連接kafka 並獲取數據
連接 kafka 的庫有兩種類型,一種是直接連接 kafka 的,存儲 offset 的事情要自己在客戶端完成。還有一種是先連接 zookeeper 然後再通過 zookeeper 獲取 kafka 的 brokers 信息, offset 存放在 zookeeper 上面,由 zookeeper 來協調。
我現在使用 samsa 這個 highlevel 庫
Procer示例
from kazoo.client import KazooClientfrom samsa.cluster import Clusterzookeeper = KazooClient()zookeeper.start()cluster = Cluster(zookeeper)topic = cluster.topics['topicname']topic.publish('msg')
** Consumer示例 **
from kazoo.client import KazooClientfrom samsa.cluster import Clusterzookeeper = KazooClient()zookeeper.start()cluster = Cluster(zookeeper)topic = cluster.topics['topicname']consumer = topic.subscribe('groupname')for msg in consumer:
print msg
Tip
consumer 必需在 procer 向 kafka 的 topic 裡面提交數據後才能連接,否則會出錯。
在 Kafka 中一個 consumer 需要指定 groupname , groue 中保存著 offset 等信息,新開啟一個 group 會從 offset 0 的位置重新開始獲取日誌。
kafka 的配置參數中有個 partition ,默認是 1 ,這個會對數據進行分區,如果多個 consumer 想連接同個 group 就必需要增加 partition , partition 只能大於 consumer 的數量,否則多出來的 consumer 將無法獲取到數據。
㈦ python kafka 能從一個位置開始讀topic嗎
孕育嘲濾展翅蚊燦
㈧ kafka獲取數據的幾種方式
一、基於Receiver的方式
這種方式使用Receiver來獲取數據。Receiver是使用Kafka的高層次Consumer API來實現的。receiver從Kafka中獲取的數據都是存儲在Spark Executor的內存中的,然後Spark Streaming啟動的job會去處理那些數據。
然而,在默認的配置下,這種方式可能會因為底層的失敗而丟失數據。如果要啟用高可靠機制,讓數據零丟失,就必須啟用Spark Streaming的預寫日誌機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka數據寫入分布式文件系統(比如HDFS)上的預寫日誌中。所以,即使底層節點出現了失敗,也可以使用預寫日誌中的數據進行恢復。
如何進行Kafka數據源連接
1、在maven添加依賴
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.10</artifactId> <version>1.4.1</version></dependency>
2、scala代碼
val kafkaStream = {val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"val kafkaParams = Map("zookeeper.connect" -> "zookeeper1:2181","group.id" -> "spark-streaming-test","zookeeper.connection.timeout.ms" -> "1000")val inputTopic = "input-topic"val numPartitionsOfInputTopic = 5val streams = (1 to numPartitionsOfInputTopic) map { _ =>KafkaUtils.createStream(ssc, kafkaParams, Map(inputTopic -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2)}val unifiedStream = ssc.union(streams)val sparkProcessingParallelism = 1 // You'd probably pick a higher value than 1 in proction.unifiedStream.repartition(sparkProcessingParallelism)}
需要注意的要點
1、Kafka中的topic的partition,與Spark中的RDD的partition是沒有關系的。所以,在KafkaUtils.createStream()中,提高partition的數量,只會增加一個Receiver中,讀取partition的線程的數量。不會增加Spark處理數據的並行度。
2、可以創建多個Kafka輸入DStream,使用不同的consumer group和topic,來通過多個receiver並行接收數據。
3、如果基於容錯的文件系統,比如HDFS,啟用了預寫日誌機制,接收到的數據都會被復制一份到預寫日誌中。因此,在KafkaUtils.createStream()中,設置的持久化級別是StorageLevel.MEMORY_AND_DISK_SER。
二、基於Direct的方式
這種新的不基於Receiver的直接方式,是在Spark 1.3中引入的,從而能夠確保更加健壯的機制。替代掉使用Receiver來接收數據後,這種方式會周期性地查詢Kafka,來獲得每個topic+partition的最新的offset,從而定義每個batch的offset的范圍。當處理數據的job啟動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset范圍的數據。
這種方式有如下優點:
1、簡化並行讀取:如果要讀取多個partition,不需要創建多個輸入DStream然後對它們進行union操作。Spark會創建跟Kafka partition一樣多的RDD partition,並且會並行從Kafka中讀取數據。所以在Kafka partition和RDD partition之間,有一個一對一的映射關系。
2、高性能:如果要保證零數據丟失,在基於receiver的方式中,需要開啟WAL機制。這種方式其實效率低下,因為數據實際上被復制了兩份,Kafka自己本身就有高可靠的機制,會對數據復制一份,而這里又會復制一份到WAL中。而基於direct的方式,不依賴Receiver,不需要開啟WAL機制,只要Kafka中作了數據的復制,那麼就可以通過Kafka的副本進行恢復。
3、一次且僅一次的事務機制:
基於receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數據的傳統方式。這種方式配合著WAL機制可以保證數據零丟失的高可靠性,但是卻無法保證數據被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。
基於direct的方式,使用kafka的簡單api,Spark Streaming自己就負責追蹤消費的offset,並保存在checkpoint中。Spark自己一定是同步的,因此可以保證數據是消費一次且僅消費一次。
scala連接代碼
val topics = Set("teststreaming")val brokers = "bdc46.hexun.com:9092,bdc53.hexun.com:9092,bdc54.hexun.com:9092" val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")// Create a direct stream val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)val events = kafkaStream.flatMap(line => {Some(line.toString())})
三、總結:兩種方式在生產中都有廣泛的應用,新api的Direct應該是以後的首選方式。