irpas技术客

多守护进程使用 kafka 注意事项(更新中)_潘广宇_kafka守护进程

irpas 4840

推荐Weary

问题1:消息处理问题

存在两个守护进程,如果需要A、B两个守护进程每次获取的消息是一样的,则只需要使用同一个group_id读取同一个分区即可。

如果只读取一个分区,则两个守护进程会同时获取相同的信息,比如生产者在一个topic生产了信息1,2,3,4,5,6,则两个守护进程都会同时获取到1,2,3,4,5,6

如果希望两个守护进程是同时消费,也就是不会重复消费,这时候需要修改topic的分区,改成多个分区,比如2(单台服务器也支持多个分区)

/usr/local/kafka/bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 2 --topic mytopic ### 查看当前话题的配置情况 /usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic mytopic

生产者生产的消息会被均衡分发到两个分区,注意:两个分区获得的消息数量不一定是一样的,生产者产出10条信息,有可能分区1存储4条信息,分区2存储6条信息。

修改完成后,同时执行两个守护进程,两者得到的信息是不一样的: 守护进程1:读取分区1的内容 守护进程2:读取分区2的内容

这里修改分区后,我在单机上遇到一个问题,某个守护进程读取第二个分区的时候,会报错:

Local: Unknown partition

这时候需要检查函数读取的分区是否正确,比如使用php-rdkafka下面这两个方法,都需要修改 partition 参数。

public RdKafka\ConsumerTopic::consumeStart ( integer $partition , integer $offset ) : void public RdKafka\Message RdKafka\ConsumerTopic::consume ( integer $partition , integer $timeout_ms )

问题2:某项业务的守护进程(多守护进程)需要回滚到指针b处,重新消费消息

首先,需要找到消息的offset,然后回滚到offset重新消费信息,如何找到消息的offset,可以参考这篇文章:Kafka 查找某个消息内容在分片中的 offset_技术博主-CSDN博客

找到offset之后,在rdkafka只需要修改offset参数即可:

public RdKafka\ConsumerTopic::consumeStart ( integer $partition , integer $offset ) : void

使用shell,只需要指定 --offset 参数即可:

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.201:9092 --topic mytopic --partition 1 --offset 49

问题3:在守护进程里如何实时获取到某个topic的某个分片最大offset?

$message = $topic->consume($this->partition, 10000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: $this->max_offset = $message->offset; break; ... //@todo 在获取到最新的一条message消息中,offset为当前消息的最大offset,把它记录到全局变量即可

?


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #kafka守护进程 #2 #3 #4 #5