
Idea + Maven + Spring Cloud Project Setup Series -- 8: Integrating Kafka (spring-kafka)


Preface: This article builds on a project that already has Nacos integrated; if you still need to integrate Nacos, see: https://blog.csdn.net/l123lgx/article/details/121624988. It also assumes Kafka is already deployed on a server; for deployment steps, see: https://blog.csdn.net/l123lgx/article/details/122047659

1 Kafka introduction: The Spring for Apache Kafka (spring-kafka) project applies core Spring concepts to the development of Kafka-based messaging solutions.

Official documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/

2 Integrating Kafka with Spring Cloud:

2.1 Add the spring-kafka dependency:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.0</version>
</dependency>

2.2 Kafka configuration file in Nacos:

spring:
  kafka:
    # Kafka broker address
    bootstrap-servers: kafkaip:9092
    # Consumer configuration
    consumer:
      # Whether listener containers start automatically (read by KafkaConfig below)
      autoStartup: false
      # Consumer group
      group-id: consumer-1
      properties:
        # Where to start consuming when there is no committed offset; earliest = from the beginning
        auto-offset-reset: earliest
        # Whether offsets are committed automatically; disabled here in favor of manual commits
        enable-auto-commit: false
        # Heartbeat interval
        heartbeat-interval-ms: 2000
        # Auto-commit interval; only used when enable-auto-commit is true
        auto-commit-interval-ms: 500
        # Maximum number of records returned per poll
        max-poll-records: 10
        # Session timeout
        session-timeout-ms: 6000
        # Security settings
        security-protocol: SASL_PLAINTEXT
        sasl-mechanism: PLAIN
        # SASL login module implementation plus username and password
        sasl-jaas-config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="12345678";
    # Producer configuration
    producer:
      # Producer client id
      client-id: producer-1
      properties:
        # Number of retries on send failure
        retries: 3
        # Batch size in bytes
        batch-size: 16384
        # 32 MB buffer for batching
        buffer-memory: 33554432
        # A message counts as sent only once all in-sync replicas have written it
        acks: all
        # Security settings
        security-protocol: SASL_PLAINTEXT
        sasl-mechanism: PLAIN
        sasl-jaas-config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="12345678";
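Before wiring anything into Spring, it can help to verify that the broker address and SASL credentials above actually work. A minimal connectivity check, using only the kafka-clients AdminClient that spring-kafka pulls in transitively (the kafkaip:9092 address and admin/12345678 credentials are the placeholders from the config above):

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.config.SaslConfigs;

public class KafkaConnectivityCheck {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "kafkaip:9092");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"admin\" password=\"12345678\";");
        try (AdminClient admin = AdminClient.create(props)) {
            // listTopics() fails fast if the address or credentials are wrong
            System.out.println("Topics: " + admin.listTopics().names().get());
        }
    }
}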

2.3 Kafka configuration class KafkaConfig.java:

import com.google.common.collect.Maps;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.Map;

/**
 * Kafka producer and consumer configuration.
 *
 * @Date 2020/12/24 16:25
 * @Author lgx
 * @Version 1.0
 */
@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.properties.enable-auto-commit}")
    private Boolean autoCommit;
    @Value("${spring.kafka.consumer.properties.auto-commit-interval-ms}")
    private Integer autoCommitInterval;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.consumer.properties.max-poll-records}")
    private Integer maxPollRecords;
    @Value("${spring.kafka.consumer.properties.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.consumer.properties.security-protocol:PLAINTEXT}")
    private String securityProtocol;
    @Value("${spring.kafka.consumer.properties.sasl-mechanism:GSSAPI}")
    private String saslMechanism;
    @Value("${spring.kafka.consumer.properties.sasl-jaas-config:null}")
    private String saslJaasConfig;
    @Value("${spring.kafka.consumer.autoStartup:false}")
    private boolean autoStartup;
    @Value("${spring.kafka.producer.properties.retries}")
    private Integer retries;
    @Value("${spring.kafka.producer.properties.batch-size}")
    private Integer batchSize;
    @Value("${spring.kafka.producer.properties.buffer-memory}")
    private Integer bufferMemory;
    @Value("${spring.kafka.producer.properties.acks}")
    private String acks;
    @Value("${spring.kafka.producer.client-id}")
    private String clientId;
    @Value("${spring.kafka.producer.properties.security-protocol:PLAINTEXT}")
    private String securityProtocolProducer;
    @Value("${spring.kafka.producer.properties.sasl-mechanism:GSSAPI}")
    private String saslMechanismProducer;
    @Value("${spring.kafka.producer.properties.sasl-jaas-config:null}")
    private String saslJaasConfigProducer;

    /**
     * Producer configuration.
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = Maps.newHashMap();
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put("security.protocol", securityProtocolProducer);
        props.put("sasl.mechanism", saslMechanismProducer);
        props.put("sasl.jaas.config", saslJaasConfigProducer);
        return props;
    }

    /**
     * Producer factory.
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Producer template.
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplateCustomer() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Consumer configuration.
     */
    @Bean
    public Map<String, Object> batchConsumerConfigs() {
        Map<String, Object> props = Maps.newHashMap();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        // Note: these two timeouts are hardcoded here and override the values in the Nacos config
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        props.put("security.protocol", securityProtocol);
        props.put("sasl.mechanism", saslMechanism);
        props.put("sasl.jaas.config", saslJaasConfig);
        return props;
    }

    /**
     * Batch listener container factory.
     */
    @Bean
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(batchConsumerConfigs()));
        // Enable batch consumption; the batch size is capped by ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        factory.setConcurrency(1);
        // Offsets are committed manually by the listener, immediately on acknowledge
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // Honor the autoStartup flag from the Nacos config
        factory.setAutoStartup(autoStartup);
        return factory;
    }
}
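A design note: with Spring Boot on the classpath, everything under spring.kafka.* is already bound to the auto-configured KafkaProperties bean, so the @Value wiring above can be avoided entirely. A minimal sketch, assuming Spring Boot 2.x auto-configuration (the class name KafkaPropertiesBasedConfig is illustrative):

import java.util.Map;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaPropertiesBasedConfig {

    /**
     * Builds the producer factory straight from spring.kafka.producer.*,
     * including the properties map carrying the SASL settings.
     */
    @Bean
    public ProducerFactory<String, String> producerFactory(KafkaProperties kafkaProperties) {
        Map<String, Object> props = kafkaProperties.buildProducerProperties();
        return new DefaultKafkaProducerFactory<>(props);
    }
}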

3 Testing:

3.1 Producer test, KafkaTestProducerController.java:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

/**
 * Producer test endpoint.
 *
 * @Date 2020/11/24 15:00
 * @Author lgx
 * @Version 1.0
 */
@Slf4j
@RestController
public class KafkaTestProducerController {

    @Autowired
    @Qualifier("kafkaTemplateCustomer")
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping(value = "message/send", method = RequestMethod.GET,
            produces = "application/json;charset=utf-8")
    public Map<String, Object> send(@RequestParam String msg) {
        Map<String, Object> mapData = new HashMap<>();
        try {
            // Send the same message three times so the batch listener has something to batch
            kafkaTemplate.send("test1", msg);
            kafkaTemplate.send("test1", msg);
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test1", msg);
            // Block until the broker acknowledges, so send failures surface in this request
            future.get();
            mapData.put("success", true);
        } catch (Exception e) {
            log.error(e.getMessage());
            mapData.put("success", false);
            mapData.put("errorMsg", e.getMessage());
        }
        return mapData;
    }
}
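future.get() blocks the web request for the full send round-trip. If blocking is undesirable, the ListenableFuture returned by KafkaTemplate in spring-kafka 2.x also accepts a non-blocking callback. A minimal sketch, continuing from the send(...) method above (the log messages are illustrative):

// Non-blocking alternative: register a callback instead of calling get()
future.addCallback(
        result -> log.info("Sent to {} at offset {}",
                result.getRecordMetadata().topic(),
                result.getRecordMetadata().offset()),
        ex -> log.error("Send failed", ex));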

3.2 Consumer test, KafkaTestConsumer.java:

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Consumer test listener.
 *
 * @Date 2021/12/17 17:06
 * @Author lgx
 * @Version 1.0
 */
@Slf4j
@Component
public class KafkaTestConsumer {

    @KafkaListener(topics = "test1", containerFactory = "batchFactory")
    public void articleConsumerWx(List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer) {
        long start = System.currentTimeMillis();
        records.forEach(record -> System.out.println("record.value() = " + record.value()));
        // Commit the offsets manually (asynchronously; commitSync() is the blocking variant)
        consumer.commitAsync();
        long elapsed = System.currentTimeMillis() - start;
        log.debug("Consumed {} records in {} ms", records.size(), elapsed);
    }
}
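Since batchFactory sets AckMode.MANUAL_IMMEDIATE, the more idiomatic spring-kafka approach is to let the container commit via an Acknowledgment parameter instead of touching the raw Consumer. A minimal alternative sketch of the same listener (the method name consumeWithAck is illustrative):

import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

// ...inside the same @Component class as above
@KafkaListener(topics = "test1", containerFactory = "batchFactory")
public void consumeWithAck(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
    records.forEach(record -> System.out.println("record.value() = " + record.value()));
    // With MANUAL_IMMEDIATE, the container commits this batch's offsets right here
    ack.acknowledge();
}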

Reference: https://docs.spring.io/spring-kafka/docs/current/reference/html


