导航:首页 > 编程语言 > javakafka消费

javakafka消费

发布时间:2022-05-18 07:35:44

‘壹’ kafka是干嘛的

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。

这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

主要特性

Kafka是一种高吞吐量 的分布式发布订阅消息系统,有如下特性:

通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。

高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。

支持通过Kafka服务器和消费机集群来分区消息。

支持Hadoop并行数据加载。

Kafka通过官网发布了最新版本3.0.0。

以上内容来自 网络-kafka

‘贰’ kafka消费者组数量较大对性能有什么影响

在Kafak中国社区的群中,这个问题被提及的比例是相当高的,这也是Kafka用户最常碰到的问题之一。本文结合Kafka源码试图对该问题相关的因素进行探讨。希望对大家有所帮助。
怎么确定分区数?
“我应该选择几个分区?”——如果你在Kafka中国社区的群里,这样的问题你会经常碰到的。不过有些遗憾的是,我们似乎并没有很权威的答案能够解答这样的问题。其实这也不奇怪,毕竟这样的问题通常都是没有固定答案的。Kafka上标榜自己是"high-throughput distributed messaging system",即一个高吞吐量的分布式消息引擎。那么怎么达到高吞吐量呢?Kafka在底层摒弃了Java堆缓存机制,采用了操作系统级别的页缓存,同时将随机写操作改为顺序写,再结合Zero-Copy的特性极大地改善了IO性能。但是,这只是一个方面,毕竟单机优化的能力是有上限的。如何通过水平扩展甚至是线性扩展来进一步提升吞吐量呢? Kafka就是使用了分区(partition),通过将topic的消息打散到多个分区并分布保存在不同的broker上实现了消息处理(不管是procer还是consumer)的高吞吐量。
Kafka的生产者和消费者都可以多线程地并行操作,而每个线程处理的是一个分区的数据。因此分区实际上是调优Kafka并行度的最小单元。对于procer而言,它实际上是用多个线程并发地向不同分区所在的broker发起Socket连接同时给这些分区发送消息;而consumer呢,同一个消费组内的所有consumer线程都被指定topic的某一个分区进行消费(具体如何确定consumer线程数目我们后面会详细说明)。所以说,如果一个topic分区越多,理论上整个集群所能达到的吞吐量就越大。
但分区是否越多越好呢?显然也不是,因为每个分区都有自己的开销:
一、客户端/服务器端需要使用的内存就越多
先说说客户端的情况。Kafka 0.8.2之后推出了Java版的全新的procer,这个procer有个参数batch.size,默认是16KB。它会为每个分区缓存消息,一旦满了就打包将消息批量发出。看上去这是个能够提升性能的设计。不过很显然,因为这个参数是分区级别的,如果分区数越多,这部分缓存所需的内存占用也会更多。假设你有10000个分区,按照默认设置,这部分缓存需要占用约157MB的内存。而consumer端呢?我们抛开获取数据所需的内存不说,只说线程的开销。如果还是假设有10000个分区,同时consumer线程数要匹配分区数(大部分情况下是最佳的消费吞吐量配置)的话,那么在consumer client就要创建10000个线程,也需要创建大约10000个Socket去获取分区数据。这里面的线程切换的开销本身已经不容小觑了。
服务器端的开销也不小,如果阅读Kafka源码的话可以发现,服务器端的很多组件都在内存中维护了分区级别的缓存,比如controller,FetcherManager等,因此分区数越多,这种缓存的成本越久越大。
二、文件句柄的开销
每个分区在底层文件系统都有属于自己的一个目录。该目录下通常会有两个文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager会为每个broker都保存这两个文件句柄(file handler)。很明显,如果分区数越多,所需要保持打开状态的文件句柄数也就越多,最终可能会突破你的ulimit -n的限制。
三、降低高可用性
Kafka通过副本(replica)机制来保证高可用。具体做法就是为每个分区保存若干个副本(replica_factor指定副本数)。每个副本保存在不同的broker上。期中的一个副本充当leader 副本,负责处理procer和consumer请求。其他副本充当follower角色,由Kafka controller负责保证与leader的同步。如果leader所在的broker挂掉了,contorller会检测到然后在zookeeper的帮助下重选出新的leader——这中间会有短暂的不可用时间窗口,虽然大部分情况下可能只是几毫秒级别。但如果你有10000个分区,10个broker,也就是说平均每个broker上有1000个分区。此时这个broker挂掉了,那么zookeeper和controller需要立即对这1000个分区进行leader选举。比起很少的分区leader选举而言,这必然要花更长的时间,并且通常不是线性累加的。如果这个broker还同时是controller情况就更糟了。
说了这么多“废话”,很多人肯定已经不耐烦了。那你说到底要怎么确定分区数呢?答案就是:视情况而定。基本上你还是需要通过一系列实验和测试来确定。当然测试的依据应该是吞吐量。虽然LinkedIn这篇文章做了Kafka的基准测试,但它的结果其实对你意义不大,因为不同的硬件、、负载情况测试出来的结果必然不一样。我经常碰到的问题类似于,说每秒能到10MB,为什么我的procer每秒才1MB? —— 且不说硬件条件,最后发现他使用的消息体有1KB,而的基准测试是用100B测出来的,因此根本没有可比性。不过你依然可以遵循一定的步骤来尝试确定分区数:创建一个只有1个分区的topic,然后测试这个topic的procer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位可以是MB/s。然后假设总的目标吞吐量是Tt,那么分区数 = Tt / max(Tp, Tc)
Tp表示procer的吞吐量。测试procer通常是很容易的,因为它的逻辑非常简单,就是直接发送消息到Kafka就好了。Tc表示consumer的吞吐量。测试Tc通常与应用的关系更大, 因为Tc的值取决于你拿到消息之后执行什么操作,因此Tc的测试通常也要麻烦一些。
另外,Kafka并不能真正地做到线性扩展(其实任何系统都不能),所以你在规划你的分区数的时候最好多规划一下,这样未来扩展时候也更加方便。
消息-分区的分配
默认情况下,Kafka根据传递消息的key来进行分区的分配,即hash(key) % numPartitions,如下图所示:
def partition(key: Any, numPartitions: Int): Int = {
Utils.abs(key.hashCode) % numPartitions
}

这就保证了相同key的消息一定会被路由到相同的分区。如果你没有指定key,那么Kafka是如何确定这条消息去往哪个分区的呢?
if(key == null) { // 如果没有指定key
val id = sendPartitionPerTopicCache.get(topic) // 先看看Kafka有没有缓存的现成的分区Id
id match {
case Some(partitionId) =>
partitionId // 如果有的话直接使用这个分区Id就好了
case None => // 如果没有的话,
val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) //找出所有可用分区的leader所在的broker
if (availablePartitions.isEmpty)
throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
val index = Utils.abs(Random.nextInt) % availablePartitions.size // 从中随机挑一个
val partitionId = availablePartitions(index).partitionId
sendPartitionPerTopicCache.put(topic, partitionId) // 更新缓存以备下一次直接使用
partitionId
}
}

可以看出,Kafka几乎就是随机找一个分区发送无key的消息,然后把这个分区号加入到缓存中以备后面直接使用——当然了,Kafka本身也会清空该缓存(默认每10分钟或每次请求topic元数据时)
如何设定consumer线程数
我个人的观点,如果你的分区数是N,那么最好线程数也保持为N,这样通常能够达到最大的吞吐量。超过N的配置只是浪费系统资源,因为多出的线程不会被分配到任何分区。让我们来看看具体Kafka是如何分配的。
topic下的一个分区只能被同一个consumer group下的一个consumer线程来消费,但反之并不成立,即一个consumer线程可以消费多个分区的数据,比如Kafka提供的ConsoleConsumer,默认就只是一个线程来消费所有分区的数据。——其实ConsoleConsumer可以使用通配符的功能实现同时消费多个topic数据,但这和本文无关。
再讨论分配策略之前,先说说KafkaStream——它是consumer的关键类,提供了遍历方法用于consumer程序调用实现数据的消费。其底层维护了一个阻塞队列,所以在没有新消息到来时,consumer是处于阻塞状态的,表现出来的状态就是consumer程序一直在等待新消息的到来。——你当然可以配置成带超时的consumer,具体参看参数consumer.timeout.ms的用法。
下面说说 Kafka提供的两种分配策略: range和roundrobin,由参数partition.assignment.strategy指定,默认是range策略。本文只讨论range策略。所谓的range其实就是按照阶段平均分配。举个例子就明白了,假设你有10个分区,P0 ~ P9,consumer线程数是3, C0 ~ C2,那么每个线程都分配哪些分区呢?
C0 消费分区 0, 1, 2, 3
C1 消费分区 4, 5, 6
C2 消费分区 7, 8, 9
具体算法就是:
val nPartsPerConsumer = curPartitions.size / curConsumers.size // 每个consumer至少保证消费的分区数
val nConsumersWithExtraPart = curPartitions.size % curConsumers.size // 还剩下多少个分区需要单独分配给开头的线程们

for (consumerThreadId <- consumerThreadIdSet) { // 对于每一个consumer线程
val myConsumerPosition = curConsumers.indexOf(consumerThreadId) //算出该线程在所有线程中的位置,介于[0, n-1]
assert(myConsumerPosition >= 0)
// startPart 就是这个线程要消费的起始分区数
val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
// nParts 就是这个线程总共要消费多少个分区
val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)

}

针对于这个例子,nPartsPerConsumer就是10/3=3,nConsumersWithExtraPart为10%3=1,说明每个线程至少保证3个分区,还剩下1个分区需要单独分配给开头的若干个线程。这就是为什么C0消费4个分区,后面的2个线程每个消费3个分区,具体过程详见下面的Debug截图信息:
ctx.myTopicThreadIds

nPartsPerConsumer = 10 / 3 = 3
nConsumersWithExtraPart = 10 % 3 = 1

第一次:
myConsumerPosition = 1
startPart = 1 * 3 + min(1, 1) = 4 ---也就是从分区4开始读
nParts = 3 + (if (1 + 1 > 1) 0 else 1) = 3 读取3个分区, 即4,5,6
第二次:
myConsumerPosition = 0
startPart = 3 * 0 + min(1, 0) =0 --- 从分区0开始读
nParts = 3 + (if (0 + 1 > 1) 0 else 1) = 4 读取4个分区,即0,1,2,3
第三次:
myConsumerPosition = 2
startPart = 3 * 2 + min(2, 1) = 7 --- 从分区7开始读
nParts = 3 + if (2 + 1 > 1) 0 else 1) = 3 读取3个分区,即7, 8, 9
至此10个分区都已经分配完毕

说到这里,经常有个需求就是我想让某个consumer线程消费指定的分区而不消费其他的分区。坦率来说,目前Kafka并没有提供自定义分配策略。做到这点很难,但仔细想一想,也许我们期望Kafka做的事情太多了,毕竟它只是个消息引擎,在Kafka中加入消息消费的逻辑也许并不是Kafka该做的事情。

不消费问题
第一步:参看消费者的基本情况

查看mwbops系统,【Consumer监控】-->【对应的consumerId】

如果offset数字一直在动,说明一直在消费,说明不存在问题,return;
如果offset数字一直不动,看Owner是不是有值存在
如果Owner是空,说明消费端的程序已经跟Kafka断开连接,应该排查消费端是否正常,return;
如果Owner不为空,就是有上图上面的类似于 bennu_index_benuprdapp02--fa-0 的文字,继续看下面内容
第二步:查看消费端的程序代码
一般的消费代码是这样的

看看自己的消费代码里面,存不存在处理消息的时候出异常的情况
如果有,需要try-catch一下,其实不论有没有异常,都用try-catch包一下最好,如下面代码

return;
原因:如果在处理消息的时候有异常出现,又没有进行处理,那么while循环就会跳出,线程会结束,所以不会再去取消息,就是消费停止了。
第三步:查看消费端的配置
消费代码中一般以以下方式创建Consumer

消费端有一个配置,叫 fetch.message.max.bytes,默认是1M,此时如果有消息大于1M,会发生停止消费的情况。
此时,在配置中增加 props.put("fetch.message.max.bytes", "10 * 1024 * 1024"); 即可
return;
原因:目前Kafka集群配置的运行最大的消息大小是10M,如果客户端配置的运行接收的消息是1M,跟Kafka服务端配置的不一致,
则消息大于1M的情况下,消费端就无法消费,导致一直卡在这一条消息,现象就是消费停止。

‘叁’ kafka的group怎么能接着消费

很早以前我们组里的Intern写过一个Patch用来GC旧的consumer metadata from ZK:

[KAFKA-559] Garbage collect old consumer metadata entries

这个最终没有merge进code base,不过你可以考虑拿过来改一改自己用。

此外就是新版本0.9里面新增的Java New Consumer API,去掉了ZK的依赖,直接用Kafka Server做Coordination,在这里只要一个consumer group里面所有的member都掉线后,这个group之后就会被自动GC掉,包括它的offsets等等。这个新版本Consumer已经用在LI快两年了。当然要用这个新client你也需要upgrade你的Kafka Server到0.9版本:

Consumer Client Re-Design

‘肆’ 用Kafka和Java搭建的项目,Kafka管理中心在什么情况下会重复发送消息消费端的程序接收到消息,进入方法

非手动提交offset

消费者只要读取到数据,就会修改offset,不需要方法体执行完

手动提交

需要手动提交代码执行完毕

针对你的问题,情况有很多种可能。

  1. 你是否开启手动提交offset

  2. 你的消费者,有几个?是否是同一个组?

‘伍’ 多个消费者要消费kafka相同数据怎么办

在Kafak中国社区的qq群中,这个问题被提及的比例是相当高的,这也是Kafka用户最常碰到的问题之一。本文结合Kafka源码试图对该问题相关的因素进行探讨。希望对大家有所帮助。怎么确定分区数?“我应该选择几个分区?”——如果你在Kafka中国社区的群里,这样的问题你会经常碰到的。不过有些遗憾的是,我们似乎并没有很权威的答案能够解答这样的问题。其实这也不奇怪,毕竟这样的问题通常都是没有固定答案的。Kafka官网上标榜自己是"high-throughput distributed messaging system",即一个高吞吐量的分布式消息引擎。那么怎么达到高吞吐量呢?Kafka在底层摒弃了Java堆缓存机制,采用了操作系统级别的页缓存,同时将随机写操作改为顺序写,再结合Zero-Copy的特性极大地改善了IO性能。但是,这只是一个方面,毕竟单机优化的能力是有上限的。如何通过水平扩展甚至是线性扩展来进一步提升吞吐量呢? Kafka就是使用了分区partition,通过将topic的消息打散到多个分区并分布保存在不同的broker上实现了消息处理不管是procer还是consumer的高吞吐量。Kafka的生产者和消费者都可以多线程地并行操作,而每个线程处理的是一个分区的数据。因此分区实际上是调优Kafka并行度的最小单元。对于procer而言,它实际上是用多个线程并发地向不同分区所在的broker发起Socket连接同时给这些分区发送消息;而consumer呢,同一个消费组内的所有consumer线程都被指定topic的某一个分区进行消费具体如何确定consumer线程数目我们后面会详细说明。所以说,如果一个topic分区越多,理论上整个集群所能达到的吞吐量就越大。但分区是否越多越好呢?显然也不是,因为每个分区都有自己的开销:一、客户端/服务器端需要使用的内存就越多先说说客户端的情况。Kafka 082之后推出了Java版的全新的procer,这个procer有个参数batchsize,默认是16KB。它会为每个分区缓存消息,一旦满了就打包将消息批量发出。看上去这是个能够提升性能的设计。不过很显然,因为这个参数是分区级别的,如果分区数越多,这部分缓存所需的内存占用也会更多。假设你有10000个分区,按照默认设置,这部分缓存需要占用约157MB的内存。而consumer端呢?我们抛开获取数据所需的内存不说,只说线程的开销。如果还是假设有10000个分区,同时consumer线程数要匹配分区数大部分情况下是最佳的消费吞吐量配置的话,那么在consumer client就要创建10000个线程,也需要创建大约10000个Socket去获取分区数据。这里面的线程切换的开销本身已经不容小觑了。服务器端的开销也不小,如果阅读Kafka源码的话可以发现,服务器端的很多组件都在内存中维护了分区级别的缓存,比如controller,FetcherManager等,因此分区数越多,这种缓存的成本越久越大。二、文件句柄的开销每个分区在底层文件系统都有属于自己的一个目录。该目录下通常会有两个文件: base_offsetlog和base_offsetindex。Kafak的controller和ReplicaManager会为每个broker都保存这两个文件句柄file handler。很明显,如果分区数越多,所需要保持打开状态的文件句柄数也就越多,最终可能会突破你的ulimit -n的限制。三、降低高可用性Kafka通过副本replica机制来保证高可用。具体做法就是为每个分区保存若干个副本replica_factor指定副本数。每个副本保存在不同的broker上。期中的一个副本充当leader 副本,负责处理procer和consumer请求。其他副本充当follower角色,由Kafka controller负责保证与leader的同步。如果leader所在的broker挂掉了,contorller会检测到然后在zookeeper的帮助下重选出新的leader——这中间会有短暂的不可用时间窗口,虽然大部分情况下可能只是几毫秒级别。但如果你有10000个分区,10个broker,也就是说平均每个broker上有1000个分区。此时这个broker挂掉了,那么zookeeper和controller需要立即对这1000个分区进行leader选举。比起很少的分区leader选举而言,这必然要花更长的时间,并且通常不是线性累加的。如果这个broker还同时是controller情况就更糟了。说了这么多“废话”,很多人肯定已经不耐烦了。那你说到底要怎么确定分区数呢?答案就是:视情况而定。基本上你还是需要通过一系列实验和测试来确定。当然测试的依据应该是吞吐量。虽然LinkedIn这篇文章做了Kafka的基准测试,但它的结果其实对你意义不大,因为不同的硬件、软件、负载情况测试出来的结果必然不一样。我经常碰到的问题类似于,官网说每秒能到10MB,为什么我的procer每秒才1MB? —— 且不说硬件条件,最后发现他使用的消息体有1KB,而官网的基准测试是用100B测出来的,因此根本没有可比性。不过你依然可以遵循一定的步骤来尝试确定分区数:创建一个只有1个分区的topic,然后测试这个topic的procer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位可以是MB/s。然后假设总的目标吞吐量是Tt,那么分区数 = Tt / maxTp, TcTp表示procer的吞吐量。测试procer通常是很容易的,因为它的逻辑非常简单,就是直接发送消息到Kafka就好了。Tc表示consumer的吞吐量。测试Tc通常与应用的关系更大, 因为Tc的值取决于你拿到消息之后执行什么操作,因此Tc的测试通常也要麻烦一些。另外,Kafka并不能真正地做到线性扩展其实任何系统都不能,所以你在规划你的分区数的时候最好多规划一下,这样未来扩展时候也更加方便。消息-分区的分配默认情况下,Kafka根据传递消息的key来进行分区的分配,即hashkey

‘陆’ kafka 怎样查看kafka状态

输入以下代码即可查看kafka状态:

BROKER_HOST是kafka server的ip地址,PORTt是server的监听端口。多个host port之间用逗号隔开。

第一条命令是获取group列表,一般而言,应用是知道消费者group的,通常在应用的配置里,如果已知,该步骤可以省略。

第二条命令是查看具体的消费者group的详情信息,需要给出group的名称。

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。

这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

‘柒’ java工程kafka传递自定义对象,消费端获取到的是null

3. 启服务
3.1 启zookeeper
启zk两种式第种使用kafka自带zk
bin/zookeeper-server-start.sh config/zookeeper.properties&
另种使用其zookeeper位于本机位于其址种情况需要修改config面sercer.properties面zookeeper址
例zookeeper.connect=10.202.4.179:2181
3.2 启 kafka
bin/kafka-server-start.sh config/server.properties
4.创建topic
bin/kafka-topics.sh --create --zookeeper 10.202.4.179:2181 --replication-factor 1 --partitions 1 --topic test
创建名testtopic副本区
通list命令查看刚刚创建topic
bin/kafka-topics.sh -list -zookeeper 10.202.4.179:2181
5.启procer并发送消息启procer
bin/kafka-console-procer.sh --broker-list localhost:9092 --topic test
启发送消息

test
hello boy
按Ctrl+C退发送消息
6.启consumer
bin/kafka-console-consumer.sh --zookeeper 10.202.4.179:2181 --topic test --from-beginning
启consumerconsole看procer发送消息
启两终端发送消息接受消息
都行查看zookeeper进程kafkatopic步步排查原吧

‘捌’ kafka是干嘛的

Kafka是由Apache软件基金会开发的一个开源流处理平台,Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。这种动作是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

‘玖’ kafka在java应用中怎么设置每次只消费一条消息

:数据直接从通信网关过来?那最近每个map得到的数据是怎么区分的是发数据端按规则把数据配发到每个map?咱整过的一个例子是多个map同时从一张数据表取数进行数据处理在hdfs的输入目录给每个map指定一个输入文件map读取这个输入文件

阅读全文

与javakafka消费相关的资料

热点内容
24bit高频精品解压音乐 浏览:181
api程序员遇到更新 浏览:298
程序员程序运行搞笑图 浏览:772
秦思怎么下载app 浏览:691
发抖音怎么发自己的APP网站 浏览:362
androidinbitmap 浏览:774
lzma源码使用 浏览:748
ibm服务器湖南经销商云服务器 浏览:991
正规模板建站配云服务器商家 浏览:875
安卓清楚缓存命令 浏览:379
汽车压缩机电磁离合器损坏怎么修 浏览:507
怎么提取安卓软件 浏览:596
单片机和主机高速传文件 浏览:480
男生直发加密需要剃光头吗 浏览:826
qtdesignerlinux 浏览:433
命令的几要素 浏览:934
代理服务器地址怎么知道 浏览:174
汉语命令形 浏览:193
ACG官网下载的游戏怎么解压 浏览:965
stata交叉项命令 浏览:471