irpas技术客

java客户端发消息到kafka_逆风飞翔的小叔_java kafka发送消息

大大的周 6941

前言

下面记录下如何使用kafka的java客户端向kafka的broker发送消息

1、导入maven依赖

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.0.0</version> </dependency>

2、发送方式1,直接发送

比如我们提前创建好了一个名为 ”zcy222“的topic,测试的数据将发到这个topic中

import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class Producer1 { public static void main(String[] args) throws Exception { // 1. 创建 kafka 生产者的配置对象 Properties properties = new Properties(); // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 3. 创建 kafka 生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); System.out.println("开始发送数据"); // 4. 调用 send 方法,发送消息 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>("zcy222","congge " + i)); } // 5. 关闭资源 kafkaProducer.close(); } }

连接上该topic的shell窗口的消费者,便于观察

运行上面的代码,通过控制台日志以及shell窗口可以看到,消息成功发送到kafka的broker

3、发送方式2,带有回调函数

在某些场景下,需要确保数据准确无误的发送到kafka的broker,即消息一定要成功发送到broker,这种情况下,可以通过回调函数的方式

import org.apache.kafka.clients.producer.*; import java.util.Properties; public class ProducerCallBack { public static void main(String[] args) throws Exception { // 1. 创建 kafka 生产者的配置对象 Properties properties = new Properties(); // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 3. 创建 kafka 生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); System.out.println("开始发送数据"); // 4. 调用 send 方法,发送消息 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>("zcy222", "congge " + i), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception e) { if(e == null){ System.out.println("发送成功"); System.out.println("主题:" + metadata.topic()); System.out.println("分区:" + metadata.partition()); } } }); } // 5. 关闭资源 kafkaProducer.close(); } }

通过这种方式,可以在回调函数中,获取到更多的信息,比如发送失败的原因,发送数据到达的broker分区等等

运行这段代码,观察效果

4、发送方式3,同步发送

上面2种发送消息的方式,默认情况下,属于异步发送,即消息从生产端发送到kafka的broker后,无需等待broker的ack就完成了

但是某些情况下,为确保消息的精准投放,可以采用同步发送,但是同步发送可能会减弱kafka的消息发送的吞吐量

/** * 同步发送 */ public class ProducerSync { public static void main(String[] args) throws Exception { // 1. 创建 kafka 生产者的配置对象 Properties properties = new Properties(); // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 3. 创建 kafka 生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); System.out.println("开始发送数据"); // 4. 调用 send 方法,发送消息 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>("zcy222","congge " + i)).get(); } // 5. 关闭资源 kafkaProducer.close(); } }


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #JAVA #kafka发送消息