kafka的kafka 消费者组组该怎么删除

请登录查看
消费群是多线程或多机器的Apache Kafka主题。消费者群体消费者可以使用相同的 group.id 加入群组一个组的最大并行度是组中的消费者数量←不是分区。Kafka将主题的分区分配给组中的使用者,以便每个分区仅由组中的一个使用者使用。Kafka保证消息只能被组中的一个消费者读取。消费者可以按照消息存储在日志中的顺序查看消息。重新平衡消费者添加更多进程/线程将导致Kafka重新平衡。 如果任何消费者或代理无法向ZooKeeper发送心跳,则可以通过Kafka集群重新配置。 在此重新平衡期间,Kafka将分配可用分区到可用线程,可能将分区移动到另一个进程。import java.util.P
import java.util.A
import org.apache.kafka.clients.consumer.KafkaC
import org.apache.kafka.clients.consumer.ConsumerR
import org.apache.kafka.clients.consumer.ConsumerR
public class ConsumerGroup {
public static void main(String[] args) throws Exception {
if(args.length & 2){
System.out.println(&Usage: consumer &topic& &groupname&&);
String topic = args[0].toString();
String group = args[1].toString();
Properties props = new Properties();
props.put(&bootstrap.servers&, &localhost:9092&);
props.put(&group.id&, group);
props.put(&enable.auto.commit&, &true&);
props.put(&auto.commit.interval.ms&, &1000&);
props.put(&session.timeout.ms&, &30000&);
props.put(&key.deserializer&,
&org.apache.kafka.common.serializa-tion.StringDeserializer&);
props.put(&value.deserializer&,
&org.apache.kafka.common.serializa-tion.StringDeserializer&);
KafkaConsumer&String, String& consumer = new KafkaConsumer&String, String&(props);
consumer.subscribe(Arrays.asList(topic));
System.out.println(&Subscribed to topic & & topic);
int i = 0;
while (true) {
ConsumerRecords&String, String& records = con-sumer.poll(100);
for (ConsumerRecord&String, String& record : records)
System.out.printf(&offset = %d, key = %s, value = %s\n&,
record.offset(), record.key(), record.value());
汇编javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*& ConsumerGroup.java
执行&&java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*&:.
ConsumerGroup &topic-name& my-group
&&java -cp &/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*&:.
ConsumerGroup &topic-name& my-group
在这里,我们为两个消费者创建了一个示例组名称为 my-group 。 同样,您可以在组中创建您的组和消费者数量。输入打开生产者CLI并发送一些消息 - Test consumer group 01
Test consumer group 02
第一个过程的输出Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01
第二个过程的输出Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02
现在希望你能通过使用Java客户端演示了解SimpleConsumer和ConsumeGroup。 现在,您了解如何使用Java客户端发送和接收消息。 让我们在下一章继续Kafka与大数据技术的集成。
意见反馈:
联系方式:
广告等垃圾信息
不友善内容
违反法律法规的内容
不宜公开讨论的政治内容kafka的消费者组该怎么删除? - 知乎13被浏览<strong class="NumberBoard-itemValue" title="分享邀请回答0添加评论分享收藏感谢收起kafka基于ZK消费者模型分析kafka基于ZK消费者模型分析畅想科技百家号消费者客户端设计基本思想a),同一个Partition不会同时被一个以上的消费者消费,更详细的说:b),同一个Partition只能被一个消费组中的一个消费者消费c),一个消费者可以消费多个Partitiond),一个Partition只会指定给一个消费者e),每一个消费组共享所有Partition,也就是同一个消费组下消费者对Partition是互斥的,而不同消费组之间是共享的f),一个Partition只会分配给一个消费者线程简单阐述下kafka的发布订阅和队列模型a),发布订阅模型:同一条数据会会被所有的消费组获取,每个消费组只有一个消费者可以消费,前提每个消费组只有一个消费者b),队列模型:同上,如果一个消费组有多个消费者,同时只有一个消费组订阅该主题则实现实现单播消费组模型如图所示:(官方图片)基于ZK设计流程ZK作为去中心化的集群模式,在消费者模型解决的核心问题是需要消费者知道哪些生产者是可用的,即在kafka中消费者需要知道集群中哪些节点和哪些分片可用首先需要存储数据在ZK中,消费进度在使用ZK作为协调者的时候,消费进度会定时的保存在ZK上,在获取新的Partition时候新的消费者都会从ZK中获取数据进行数据消费除此之外消费组的成员列表,主题和Partition也保存在ZK中,ZK会在消费者组节点下注册消费者子节点。启动流程如下:消费者在启动时就需要指定消费组和需要依赖的ZK集群,连接ZK后需要获取分配的Partition,然后创建对应的Partition消息流读取数据ZK会监听Partition的变化,消费组的变化和会话超时信息,触发新的事件进行Partition的重新分配。负责消费Partition的每个消费者都是一个消费进程,该进程也可以创建多个线程消费分区数据,ConcurrentMessageListenerContainer为多线程消息监听类例如:一个消费者在同一个主题下创建三个个线程消费同一个主题的三个Partition和三个消费者(每个消费者一个消费线程)消费该主题的最终结果是一样的。每一个消费者都会针对每个主题创建多个线程,每一个线程对应一个队列和消息流一个线程容许被分配给多个Partition,多个Partition会共用同一个队列和消息流。每个消费者在启动的时候都会订阅三种事件,如上所说:会话超时,消费组变化和主题变化事件:关于消费者和Partition变化后如何实现消费的再平衡,有个基本思路,消费组中的消费者发生变化如退出或者加入某一消费组,则该消费组已分配Partition的数量和节点需要进行重新分配,但消费进度会进行传递,避免重复消费,即消费组成员变化引起所有消费者发生Rebalance,且消费者在Rebalance前后分配到的Partition会完全不同,消费者和Partition的再分配主要由ZKREBALANCELISTENER类负责处理,详细流程如下:a),停止拉取线程防止数据重复b),分配之前会先删除原有ZK上保存的相关信息,如果没有进行删除操作则很有可能遇到同一Partition被多个消费者消费造成数据消费混乱c),为所有Partition重新分配消费者d),在分配Partition成功后则启动创建拉取线程其中关键点阐述如下:a),每个消费者只有在获取到Partition后才能拉取数据,才知道从哪里拉取数据,在拉取之前还需要知道消费进度,即读取的Partition偏移量,Partition信息对象会根据从ZK中拉取的Partition,队列和偏移量数据进行对象的构造,Partition信息被用到拉取线程中,这样才能在分区被重新分配后保证各自消费的消息平滑迁移和过渡b),拉取线程将数据填充到队列后,消息流方可以从队列中迭代出数据用于消费者消费,拉取线程也会针对同一主题多个节点的多个Partition进行网络优化和请求合并但不完全等同于生产者线程优化策略。c),拉取线程在备份副本的使用上和消费者拉取管理器上的使用上有所不同:消费者的使用上主要采用阻塞方式,并放入对应的消费队列而针对备份副本主要是为了做数据同步,目的不同,则采用异步的方式进行数据的拉取,存储方式采用的和主节点保持一致d),消息被放入队列中以数据块的方式进行存储,一个数据块对应一个Partition的消息集合总结基于ZK监听器流程为高级API采用的方式,即0.9之前的设计流程,0.9之后使用Java对该架构进行了一次重构,主要变化在减少了SCALE和ZK的依赖,使代码工程更加简洁,核心改变在于使用了kafka自己的分组协调机制来代替ZK的监听器机制启动方式由原来的指定ZK集群替换成需要指定kafka集群,这样在触发分配分区和提交分区偏移量的时候,就发送给kafka协调者进行处理,提交分区偏移量则只会发送给协调者,后续会做更详细的解析本文仅代表作者观点,不代表百度立场。系作者授权百家号发表,未经许可不得转载。畅想科技百家号最近更新:简介:精彩科技,任您翻阅,欢迎关注。作者最新文章相关文章kafka的消费者组该怎么删除_百度知道
kafka的消费者组该怎么删除
我有更好的答案
没有删除消费者的需求 除非队列迁移 因为我们消费者的group是固定的如果你有这种需求的话 调用zk api删除就行了
为您推荐:
其他类似问题
&#xe675;换一换
回答问题,赢新手礼包&#xe6b9;
个人、企业类
违法有害信息,请在下方选择后提交
色情、暴力
我们会通过消息、邮箱等方式尽快将举报结果通知您。新版本kafka
中新消费者的offset 的保存时间如何控制?
offset的保存时间无需关心。你应该关心数据的删除时间吧。
半兽人之家
您还未填写推送消息的邮箱,放心,此邮箱仅通知你提出的问题是否有人回答!

我要回帖

更多关于 kafka 删除消费者 的文章

 

随机推荐