irpas技术客

springboot+kafka详细_起个名字总是说已存在_springboot+kafka

网络投稿 2203

集成kafka

1.引入kafka maven依赖

<!-- kafka包 --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!-- fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency> <!-- lang字符串工具 --> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.4</version> <scope>provided</scope> </dependency>

2.服务器安装Kafka (1).下载kafka压缩包

https://kafka.apache.org/downloads

(2).安装Kafka

tar -zxvf kafka_2.13-3.0.0.tgz

(3).修改kafka的配置

# listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://127.0.0.1:9092 advertised.listeners=PLAINTEXT://127.0.0.1:9092 log.dirs=/usr/local/kafka_2.11-0.9.0.1/kafka-logs//

(4).修改kafka内置zookeeper的配置

dataDir=/usr/local/kafka/zookeeper dataLogDir=/usr/local/kafka/zookeeper-logs # the port at which the clients will connect clientPort=2181 # disable the per-ip limit on the number of connections since this is a non-production config maxClientCnxns=100 tickTime=2000 initLimit=10 syncLimit=5 # Disable the admi

(5).开启zookeeper和kafka

bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties 为了方便可以自己创建一个启动脚本 进入kafka目录下 输入命令:vi kafkaStart.sh 添加内容为: #!/bin/bash #启动zookeeper /DATA/kafka/kafka_2.12-2.0.0/bin/zookeeper-server-start.sh /DATA/kafka/kafka_2.12-2.0.0/config/zookeeper.properties & sleep 3 #默默等3秒后执行 #启动kafka /DATA/kafka/kafka_2.12-2.0.0/bin/kafka-server-start.sh /DATA/kafka/kafka_2.12-2.0.0/config/server.properties & broker.id:当前机器在集群中的唯一标识。例如有三台Kafka主机,则分别配置为1,2,3。 listeners:服务监听端口。 advertised.listeners:提供给生产者,消费者的端口号,即外部访问地址。默认为listeners的值。 zookeeper.connect:zookeeper连接地址。如有集群配置,每台Kafka主机都需要连接全部zookeeper服务

启动结果

(6).创建topic主题

./kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 -partitions 3 -replication-factor 1

(7).启动生产者连接主题

./kafka-console-producer.sh --broker-list localhost:9092 --topic test

(8).启动消费者连接主题

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

3.springboot开始集成Kafka,添加配置

#kafka spring.applicationname=kafka-tutorial # 指定kafka 代理地址,可以多个 spring.kafka.bootstrap-servers=172.31.111.11:9092 spring.kafka.producer.retries: 0 # 每次批量发送消息的数量 spring.kafka.producer.batch-size: 16384 # 缓存容量 spring.kafka.producer.buffer-memory: 33554432 # 指定消息key和消息体的编解码方式 spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer # 指定默认消费者group id spring.kafka.consumer.group-id=demo spring.kafka.consumer.auto-commit-interval=100 spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true # 指定消息key和消息体的编解码方式 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 指定listener容器中的线程数,用于提高并发量 spring.kafka.listener.concurrency=3

4.创建测试bean包,创建kafka生产者与消费者 (1).创建生产者KafkaProduction

import com.alibaba.fastjson.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; /** * kafka 生产者 * @author boxin * @date 2022年03月02日 20:32 */ @Component public class KafkaProduction <T>{ private Logger logger = LoggerFactory.getLogger(KafkaProduction.class); @Autowired private KafkaTemplate<String, Object> kafkaTemplate; /** * 发送消息 * @param obj 消息体 * @param topics 消息主题 */ public void send(T obj,String topics) { String jsonObj = JSON.toJSONString(obj); logger.info("----kafka---- message = {}", jsonObj); //发送消息 ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topics, jsonObj); future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { @Override public void onFailure(Throwable throwable) { logger.info("KafkaProduction: kafka to be sent:" + throwable.getMessage()); } @Override public void onSuccess(SendResult<String, Object> stringObjectSendResult) { //成功消费 //TODO 业务处理 logger.info("KafkaProduction: The message has be sent successfully:"); logger.info("KafkaProduction: =============== result: " + stringObjectSendResult.toString()); } }); } }

(2).创建KafkaTopicsConstant常量类

public interface IKafkaSenderService { /** * 发送主题为test的消息 */ public void send(); }

(3).创建消费者KafkaConsumer

import com.iflytek.bim.cop.contant.KafkaTopicsConstant; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.Optional; @Component public class KafkaConsumer { private Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); /** * 监听kafka.tut 的topic为test_topics的消息 * @param record */ @KafkaListener(topics = KafkaTopicsConstant.TEST_TOPICS) public void listen(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); logger.info("KafkaConsumer 接收: ================= Topic:" + record.topic()); logger.info("KafkaConsumer 接收: ================= Record:" + record); logger.info("KafkaConsumer 接收: ================= Message:" + message); } } }

(4).发送实现

import com.iflytek.bim.cop.component.KafkaProduction; import com.iflytek.bim.cop.contant.KafkaTopicsConstant; import com.iflytek.bim.cop.domain.entity.User; import org.springframework.beans.factory.annotation.Autowired; /** * @author boxin * @date 2022年03月02日 20:50 */ public class KafkaSenderServiceImpl implements IKafkaSenderService { @Autowired private KafkaProduction<User> kafkaProduction; @Override public void send() { User user = new User(); user.setLoginName("I am is a panda"); user.setPassword("5588996633"); kafkaProduction.send(user, KafkaTopicsConstant.TEST_TOPICS); } }

(5).测试Kafka

@ResponseBody @RequestMapping("/") public void kafkatest() { kafkaSenderService.send(); }

(6).Kafka测试结果


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #springbootkafka #集成kafka1引入kafka #Maven依赖 #amplt #kafka包 #ampgt