
Troubleshooting a Kafka "Topic xxx not present in metadata" Error Caused by a Partition Misconfiguration (author: 奔跑的蜗牛)


Exception stack trace

```
[17343C60D7EF4C23A2EE4E973F22AAAD] ERROR [] o.s.k.s.LoggingProducerListener error:254 - Exception thrown when sending a message with key='null' and payload='{"bizId":"property","extendMap":{"stationCode":"21097-01","clientId":"00187dd33c02:21097@21097-01","...' to topic iot_edge_property and partition 1:
org.apache.kafka.common.errors.TimeoutException: Topic iot_edge_property not present in metadata after 60000 ms.
00:38:59.944, [a25f869d4afcdecf a25f869d4afcdecf] [TID:N/A] [MQTT Call: 17343C60D7EF4C23A2EE4E973F22AAAD] ERROR [] c.g.s.b.k.p.KafKaProducerClient send:76 - send mq message error msgId:f1f5de02f4c248c196691184c035336b topic:iot_edge_property
org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic iot_edge_property not present in metadata after 60000 ms.
	at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:570)
	at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:372)
	at cn.g2link.seg.base.kafka.producter.KafKaProducerClient.send(KafKaProducerClient.java:67)
	at cn.g2link.seg.base.kafka.producter.KafKaProducerClient.sendFailSave(KafKaProducerClient.java:17
```

Analysis

Hypothesis 1: Is it a network problem?

Tested the connection; it was fine. Ruled out.

Hypothesis 2: Is a Jackson dependency missing?

Searching the web for this exception, the answers fairly uniformly blame a missing Jackson dependency. But this project had already been through several rounds of testing without ever hitting this problem, so that explanation did not fit our situation. Ruled out.

Hypothesis 3: Is the production Kafka version different from, and incompatible with, the test environment's version?

Asked ops to check; the versions matched. Ruled out.

Hypothesis 4: Is the payload too large, causing the send to time out?

I wrote a small demo that sends messages to the production Kafka cluster.

Sender client code

The partition count here had been confirmed with ops as 2 before go-live, so it was left unchanged.

```java
/**
 * Sender client
 **/
public class ProducerClient {

    /** Kafka partition count, defaults to 2 */
    @Value("${kafka.partition.count:2}")
    private int partitionCount;

    /**
     * @description: send a message
     * @date: 2020/9/8 5:02 PM
     * @param: message
     * @param: callback
     * @return: void
     */
    public void send(MQMessage message, ListenableFutureCallback callback) {
        try {
            if (Objects.isNull(message.getMessageId())) {
                // snowflake-algorithm ID
                message.setMessageId(idGenerator.nextStrValue());
            }
            // if both the key and the partition are empty, compute the partition automatically
            if (StringUtil.isBlank(message.getKey()) && StringUtil.isNull(message.getPartition())) {
                message.setPartition(Math.abs(message.getMessageId().hashCode()) % partitionCount);
            }
            // serialize the message body
            message.setMessage(getBizMessage(message));
            log.info("send mq message start,msgId:{} topic:{}", message.getMessageId(), message.getTopic());
            if (log.isDebugEnabled()) {
                log.debug("send mq message, msgId:{} topic:{} message:{}", message.getMessageId(),
                        message.getTopic(), JSON.toJSONString(message));
            }
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message.getTopic(),
                    message.getPartition(), message.getKey(), JSON.toJSONString(message));
            if (Objects.nonNull(callback)) {
                future.addCallback(callback);
            }
            log.info("send mq message end,msgId:{} topic:{}", message.getMessageId(), message.getTopic());
        } catch (Exception e) {
            log.error("send mq message error msgId:{} topic:{}", message.getMessageId(), message.getTopic(), e);
            throw new BizException(BizExceptionCode.SEND_MQ_ERROR, e);
        }
    }
}
```

Message sending example

One test endpoint with a simple payload, and another with the original payload.

```java
@RestController
@RequestMapping(value = "/example/test/")
@Api(tags = "Test endpoints")
@Log4j2
public class TestController {

    /** Simple-payload test */
    @PostMapping(value = "/testSendMq")
    @ApiOperation(value = "send mq test")
    public void testSendMq() {
        if (BooleanUtil.intConvertBoolean(testEnable)) {
            Map<String, Object> msg = Maps.newHashMap();
            msg.put("userId", "0001");
            MQMessage mqMessage = new MQMessage();
            mqMessage.setTopic("test_topic1231a");
            mqMessage.setMessageId(idGenerator.nextStrValue());
            mqMessage.setStoreRecords(true);
            mqMessage.setFailRetry(false);
            mqMessage.setMessage(msg);
            producerClient.sendFailSave(mqMessage);
        }
    }

    /** Original-payload test */
    @PostMapping(value = "/testSendMq2")
    @ApiOperation(value = "send mq test")
    public void testSendMq2() {
        if (BooleanUtil.intConvertBoolean(testEnable)) {
            String parkCode = "21097", stationCode = "21097-01";
            String emqMsgStr = "{\n" +
                    "\t\"messageId\": \"f1f5de02f4c248c196691184c035336b\",\n" +
                    "\t\"extendMap\": {\n" +
                    "\t\t\"clientId\": \"00187dd33c02\",\n" +
                    "\t\t\"parkCode\": \"21097\",\n" +
                    "\t\t\"stationCode\": \"21097-01\"\n" +
                    "\t},\n" +
                    "\t\"timestamp\": 1626287494277,\n" +
                    "\t\"bizId\": \"property\",\n" +
                    "\t\"message\": {\n" +
                    "\t\t\"deviceCode\": \"21097000641\",\n" +
                    "\t\t\"name\": \"测试数据\",\n" +
                    "\t\t\"areaCode\": \"1212121212\",\n" +
                    "\t\t\"deviceType\": \"xxxx1212\",\n" +
                    "\t\t\"properties\": [{\n" +
                    "\t\t\t\"name\": \"高度1\",\n" +
                    "\t\t\t\"identifier\": \"tank_height\",\n" +
                    "\t\t\t\"value\": \"6\",\n" +
                    "\t\t\t\"valueRefDevices\": null,\n" +
                    "\t\t\t\"timestamp\": 1626280679529,\n" +
                    "\t\t\t\"changed\": 0\n" +
                    "\t\t}],\n" +
                    "\t\t\"traceId\": \"\",\n" +
                    "\t\t\"mock\": 0\n" +
                    "\t},\n" +
                    "\t\"resend\": true\n" +
                    "}";
            log.info("got parkCode {}, stationCode {}, parkCode hash {}", parkCode, stationCode, parkCode.hashCode());
            MQMessage mqMessage = JSON.parseObject(emqMsgStr, MQMessage.class);
            Map<String, Object> extendMap = mqMessage.getExtendMap();
            if (Objects.isNull(extendMap)) {
                extendMap = Maps.newHashMap();
            }
            extendMap.put("parkCode", "21097");
            extendMap.put("stationCode", "21097-01");
            mqMessage.setExtendMap(extendMap);
            mqMessage.setTopic("test_topic1231a");
            mqMessage.setStoreRecords(true);
            mqMessage.setFailRetry(false);
            producerClient.sendFailSave(mqMessage);
        }
    }
}
```

1. Ran the simple-payload test: success.

2. Ran the original-payload endpoint: failed...

3. Ran the simple-payload test again: now it failed too...

After waiting a while, the simple-payload test succeeded again...

With the problem reproduced, it became much easier to pin down. The initial theory was that the original payload was too large and blocked the send queue until it timed out, which would also make a subsequent simple payload time out. With this in hand we went to the ops team to check the Kafka configuration, and suddenly noticed that the topic's partition count on the broker was actually 1, while we remembered our client configuration was 2.

Hypothesis 5: Is the partition count misconfigured?

Since Kafka's actual partition count was 1 while our client was configured with 2, we changed kafka.partition.count=1 and retested. Problem solved.
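Beyond correcting the config value, the partition-selection logic could also be made defensive so that an out-of-range index falls back to Kafka's own partitioner (passing a null partition to KafkaTemplate.send lets Kafka choose). This is only a sketch of that idea; the class and method names (PartitionGuard, safePartition) are hypothetical, not part of the project's code.

```java
// Hypothetical defensive helper: only pass an explicit partition to
// KafkaTemplate.send(topic, partition, key, data) when it is a valid
// index for the topic; a null partition delegates to Kafka's partitioner.
public class PartitionGuard {

    /**
     * @param requested            the partition index the client computed (may be null)
     * @param actualPartitionCount the topic's real partition count on the broker
     * @return a safe partition index, or null to let Kafka decide
     */
    public static Integer safePartition(Integer requested, int actualPartitionCount) {
        if (requested == null || requested < 0 || requested >= actualPartitionCount) {
            return null; // absent or out of range: fall back to Kafka's partitioner
        }
        return requested;
    }

    public static void main(String[] args) {
        // Mirrors the incident: the client computed partition 1,
        // but the topic had only 1 partition (valid index: 0).
        System.out.println(safePartition(1, 1)); // null -> delegate to Kafka
        System.out.println(safePartition(0, 1)); // 0 is valid
    }
}
```

With a guard like this, a stale partitionCount setting degrades to Kafka-chosen partitioning instead of a 60-second metadata timeout per send.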

(The culprit was the partition-selection logic in ProducerClient.send shown earlier: with kafka.partition.count=2 it computed Math.abs(messageId.hashCode()) % 2, routing roughly half of all messages to partition 1, which did not exist on the topic.)

Summary

Putting all of the evidence together, the failure was caused by a Kafka partition misconfiguration. When sending, the client set a partition index that did not exist on the topic, so the producer could never find that partition in the fetched metadata, and each send failed with a TimeoutException after the 60-second metadata wait.
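The root cause can be illustrated without a broker. Below is a minimal sketch of the client's partition arithmetic; choosePartition is a stand-in name for the expression inside ProducerClient.send, not an actual method of that class.

```java
public class PartitionCalcDemo {

    // Same expression as in ProducerClient.send: hash the message id,
    // then take it modulo the *configured* partition count.
    static int choosePartition(String messageId, int partitionCount) {
        return Math.abs(messageId.hashCode()) % partitionCount;
    }

    public static void main(String[] args) {
        // With the client configured for 2 partitions, ids whose hash is odd
        // map to partition 1. The real topic had only 1 partition (index 0),
        // so those sends wait on metadata for partition 1 until they time out.
        System.out.println(choosePartition("f1f5de02f4c248c196691184c035336b", 2));

        // With the corrected count of 1, every id maps to partition 0.
        System.out.println(choosePartition("f1f5de02f4c248c196691184c035336b", 1)); // 0

        // Side note: Math.abs(Integer.MIN_VALUE) is still negative, so this
        // expression can even yield a negative partition for an unlucky hash
        // when partitionCount is not a power of two.
        System.out.println(Math.abs(Integer.MIN_VALUE) < 0); // true
    }
}
```

This is also why the symptom was intermittent: ids hashing to partition 0 went through, while ids hashing to partition 1 timed out.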


