irpas技术客

SpringCloudAlibaba学习-06-SpringCloud整合Kafka入门(一)_Fuly1024_springcloud整合kafka

网络投稿 3455

参考: https://blog.csdn.net/u011019141/article/details/108743342 https://blog.csdn.net/qq12547345/article/details/119531607 https://blog.csdn.net/JinXYan/article/details/90813592 部署kafka服务, 使用docker-compose部署 docker-compose内容如下 (kafka依赖zookeeper所以会同时部署zookeeper)

# Single-node Kafka plus the ZooKeeper instance it depends on, for local development.
version: "3.7"
services:
  zookeeper_server:
    image: wurstmeister/zookeeper
    container_name: zookeeper_server
    ports:
      # Quoted, as recommended for Compose port mappings, so YAML never parses them as numbers.
      - "2181:2181"
    volumes:
      - ./data:/data
    logging:
      driver: json-file
      options:
        max-size: "50M" # maximum size of a single container log file before it is rotated
        max-file: "100" # maximum number of rotated log files to keep
  kafka_server:
    image: wurstmeister/kafka
    container_name: kafka_server
    ports:
      # Only the OUTSIDE listener (9092) is published to the host.
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper_server:2181
      # Two listeners: INSIDE on 9093 and OUTSIDE on 9092, both plaintext.
      KAFKA_LISTENERS: INSIDE://:9093,OUTSIDE://:9092
      # OUTSIDE is advertised as localhost:9092, so clients are expected to run on the Docker host.
      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      # NOTE(review): inter-broker traffic uses the OUTSIDE listener; fine for a single broker,
      # revisit before adding more brokers.
      KAFKA_INTER_BROKER_LISTENER_NAME: OUTSIDE
    volumes:
      - ./kafka-logs:/kafka
    logging:
      driver: json-file
      options:
        max-size: "50M" # maximum size of a single container log file before it is rotated
        max-file: "100" # maximum number of rotated log files to keep
    depends_on:
      - zookeeper_server

项目启动后如果出现连接 Kafka 的报错,请检查 hosts 文件中是否包含下面这一行(一般默认已经存在): 127.0.0.1 localhost

本来想弄一个publisher模块和subscriber模块,网上的东西复现下来各种问题,于是就先弄个能跑的demo

项目结构: 依赖文件pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.demo.springcloud_05_kafka</groupId>
    <artifactId>kafka_publisher</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka_publisher</name>
    <description>kafka</description>

    <properties>
        <spring.cloud.version>Hoxton.SR9</spring.cloud.version>
        <spring.boot.version>2.3.2.RELEASE</spring.boot.version>
        <!-- NOTE(review): not referenced anywhere in this POM; kept for a future Spring Cloud Alibaba BOM import. -->
        <spring.cloud.alibaba.version>2.2.6.RELEASE</spring.cloud.alibaba.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- No explicit version: the Spring Boot parent manages the Lombok version,
             so the outdated 1.18.0 pin is dropped. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- Kafka via Spring Cloud Stream; versions come from the spring-cloud BOM below. -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <!-- spring-boot -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <!-- spring-cloud -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring.cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

application.yml配置文件

# HTTP port of the demo application.
server:
  port: 8581

spring:
  application:
    name: kafka_app
  cloud:
    stream:
      kafka:
        binder:
          # Broker address; matches the OUTSIDE listener published by the docker-compose file.
          brokers: localhost:9092
          # NOTE(review): recent Kafka binder versions may no longer read ZooKeeper settings;
          # confirm against the binder version in use before relying on this key.
          zk-nodes: localhost:2181
          autoCreateTopics: true
          requiredAcks: 1
          autoAddPartitions: true
      bindings:
        # Consumer side: the default "input" channel reads from the same topic the producer writes to.
        input:
          destination: stream-demo
        # Producer side: uses the default "output" channel provided by Spring Cloud Stream;
        # custom output channels are covered later.
        output:
          destination: stream-demo # topic the messages are sent to
          content-type: text/plain # payload format; must be set on the sending side, not on the receiver

项目文件 KafkaPublisherApplication.java

import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; @SpringBootApplication //@EnableBinding(Source.class) public class KafkaPublisherApplication { public static void main(String[] args) { SpringApplication.run(KafkaPublisherApplication.class, args); } }

消息接收端 KafkaMessageReceiveListener.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;

import java.util.Date;

/**
 * Consumer side of the demo: binds the default {@link Sink} input channel and logs
 * every message that arrives on it.
 */
@Slf4j
@EnableBinding(value = Sink.class)
public class KafkaMessageReceiveListener {

    /**
     * Receives a message from the default input channel and logs it.
     *
     * @param message the incoming message (headers and String payload); logged as-is
     */
    @StreamListener(Sink.INPUT)
    public void receive(Message<String> message) {
        // Log the channel this listener is actually bound to instead of a hard-coded,
        // unrelated channel name.
        log.info("{}订阅告警消息:通道 = {},data = {}", new Date(), Sink.INPUT, message);
    }
}

消息发送端:KafkaMessageSender.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * Producer side of the demo: binds the default {@link Source} output channel and
 * publishes String payloads to it.
 */
@Slf4j
@Component
@EnableBinding(Source.class)
public class KafkaMessageSender {

    @Autowired
    private Source channel;

    /**
     * Wraps the given text in a message and sends it through the default output channel.
     *
     * @param message payload to publish; must not be {@code null}
     *                ({@code MessageBuilder.withPayload} rejects a null payload)
     */
    public void sendToDefaultChannel(String message) {
        boolean accepted = channel.output().send(MessageBuilder.withPayload(message).build());
        if (!accepted) {
            // send() reports failure via its return value; surface it instead of dropping it silently.
            log.warn("Message was not accepted by the output channel: {}", message);
        }
    }
}

发送消息端 KafkaSenderController.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * HTTP endpoint used to trigger message publication for the demo.
 */
@Slf4j
@RestController
public class KafkaSenderController {

    /** Number of times each request publishes the message (same as the original four explicit calls). */
    private static final int SEND_COUNT = 4;

    @Autowired
    private KafkaMessageSender sender;

    /**
     * Publishes the given text to Kafka {@value #SEND_COUNT} times.
     *
     * <p>The parameter is declared as a required request parameter, so a request without
     * {@code message} is rejected as a bad request instead of failing later with a
     * null-payload error inside the sender.
     *
     * @param message text to publish, taken from the {@code message} query parameter
     */
    @GetMapping("/send")
    public void testKafkaMessageSend(@RequestParam("message") String message) {
        log.info("message:{}", message);
        for (int i = 0; i < SEND_COUNT; i++) {
            sender.sendToDefaultChannel(message);
        }
    }
}

然后启动项目,在浏览器中访问 http://localhost:8581/send?message=1234521313123 ,就可以在控制台日志中看到接收到的消息了(每次请求会发送 4 条相同的消息)。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #参考