irpas技术客

kafka问题 Aattempt to heart beat failed since the group is rebalancing_zhang.yao

网络 4338

kafka 2.11 attempt to heart beat failed since the group is rebalancing 生产环境遇到kafka 2.11 重平衡问题,记录 为了解决问题,先还原此报错 ?

window下搭建kafka单节点

https://kafka.apache.org/downloads 2.1.1版本 下载后解压 修改 config目录下 zookeeper.properties **dataDir **指定zk数据存放目录(默认是linux目录结构) 修改 config目录下 server.properties log **log.dirs **指定kafka日志存放目录(默认是linux目录结构) ?

启动zk .\bin\windows\zookeeper-server-start.bat config\zookeeper.properties

zk端口默认2181 ?

启动kafka .\bin\windows\kafka-server-start.bat config\server.properties

kafka默认端口 9092 ?

使用kafka_toole工具连接测试: 测试成功,单节点搭建完成

模拟程序生产者和消费者

开发程序,模拟生产者和消费者

# 引入jar包 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> 配置文件 spring.kafka.producer.bootstrap-servers=localhost:9092 # 配置kafka地址 spring.kafka.consumer.max-poll-records=1 #每次拉取一条数据 方便测试 spring.kafka.consumer.heartbeat-interval=100 # 心跳时间100ms 生产者

使用KafkaTemplate controller和service代码省略

package com.example.kafka_test.service.impl; import com.example.kafka_test.service.ProducerService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import static com.example.kafka_test.KafkaConstant.TOPIC_NAME; /** * @Author: Zy * @Date: 2021/11/25 10:00 */ @Service public class ProducerServiceImpl implements ProducerService { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Override public void sendMsg(String msg) { kafkaTemplate.send(TOPIC_NAME, msg); } } 消费者 package com.example.kafka_test.service.impl; import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import static com.example.kafka_test.KafkaConstant.CONSUMER_GROUP; import static com.example.kafka_test.KafkaConstant.TOPIC_NAME; /** * @Author: Zy * @Date: 2021/11/25 10:13 */ @Slf4j @Component public class ConsumerServiceImpl { @KafkaListener(topics = TOPIC_NAME, groupId = CONSUMER_GROUP) public void consumerMsg(String msg) { try { # 休眠 防止消费速度快 无法观察日志 Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } log.info(msg); } } 测试1-重平衡的触发 造数据

使用postman调用生产者造数据

启动第一个消费者

正常启动,且开始消费

启动第二个消费者

同样的分区,同样的消费者组 启动成功后,发现第一个消费者大量输出重平衡日志,且重平衡期间数据并没有消费

总结

消费者的数量变动会触发Rebalance 重平衡期间数据不消费 ?

测试2-还原报错

Attempt to heart beat failed since the group is rebalancing ?

修改配置参数 spring.kafka.consumer.heartbeat-interval=100 # 每100ms发送一次心跳 spring.kafka.consumer.properties.max.poll.interval.ms=9000 #最大每9000ms拉取一次数据 spring.kafka.consumer.properties.session.timeout.ms=30000 #超过此时间没有发送心跳,则认为消费者死亡,剔除组,并触发rebalance

修改后像测试1一样重启,发现大量报错![image.png](https://img-blog.csdnimg.cn/img_convert/427ef7f0f4905a582a30bab7b68bb7c1.png#clientId=u2e6e65cc-8171-4&from=paste&height=296&id=u186d515a&margin=[object Object]&name=image.png&originHeight=296&originWidth=1834&originalType=binary&ratio=1&size=95316&status=done&style=none&taskId=u3e3f0995-2de6-4f0a-b3cb-374988c7bc8&width=1834)

还原报错成功,总结一下 ?

总结 错误原因:

Attempt to heart beat failed since the group is rebalancing 先分析一下这句话,发送心跳请求失败,消费者组正在重平衡 也就是说触发这个问题的条件有两个:

发送心跳消费者重平衡 发送心跳请求:

在kafka 0.11版本之前,心跳请求是跟poll()请求一起发送的,即拉取一次数据发送一次心跳 在kafka 0.11版本之后,心跳请求是单独的线程,由 spring.kafka.consumer.heartbeat-interval 参数控制心跳请求的间隔时间 ?

重平衡:

触发重平衡的情况如下:

有新的consumer加入旧的consumer挂了coordinator挂了,集群选举出新的coordinatortopic的partition新加consumer调用unsubscrible(),取消topic的订阅

经过以上分析,可以得到结果; 由于拉取数据消费过慢,两次poll之间的时间超过了session.timeout.ms,被认为此消费者已死亡,触发了rebalance,而在rebalance的过程中,发送心跳请求导致的报错. ?

解决办法: 调高心跳请求的间隔时间 heartbeat-interval 此项可不调整,因为心跳时间偏大的情况下,也可能触发rebalance调高超时时间 session.timeout.ms减少每次poll的拉取数据量 max-poll-records 防止每次poll拉取的数据处理的时间过长,导致超时

其中最好的办法就是2 3 最重要的就是poll到的数据要在session.timeout.ms时间内处理完.


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #kafka问题 #Aattempt #To #Heart #beat #failed #since #The