irpas技术客

java kafka使用_ws-wang_java kafka使用

未知 613

消息队列的应用场景, 异步处理, 系统解耦,日志处理 常用消息队列比较

特性 ActiveMQ RabbitMQ Kafka RocketMQ 所属 Apache Mozilla Apache Apache/Ali 成熟度 成熟 成熟 成熟 比较成熟 生产者-消费者模式 支持 支持 支持 支持 发布-订阅 支持 支持 支持 支持 REQUEST-REPLY 支持 支持 - 支持 API完备性 高 高 高 低(静态配置) 多语言支持 支持 语言无关 支持 支持 单机呑吐量 万级 万级 十万级 十万级(最高) 消息延迟 - 微秒级 毫秒级 - 可用性 高(主从) 高(主从) 非常高(分布式) 高 消息丢失 - 低 理论上不会丢失 - 消息重复 - 可控制 理论上会有重复 - 事务 支持 不支持 支持 支持 文档的完备性 高 高 高 中 首次部署难度 - 低 中 高

不论成成熟度、社区、性能、可靠性,Kafka都是非常好的一款产品

集群环境搭建

http://kafka.apache.org/downloads下载地址 Kafka计划使用内嵌的KRaft替代ZooKeeper,是一个非常大的进步,因为像ES之类的分布式系统,这种集群meta信息的同步,都是自循环的,而且更快。 2.8以上版本在config目录下,多了一个叫做kraft的目录,里面包含着一套新的配置文件,可以直接摒弃对ZK的依赖。 但是kraft不完善,目前不要在线上环境开启这个功能,还是用ZK

三台kafka,三台zookeeper tar -zxvf kafka_2.12-3.0.0.tgz 修改 server.properties # 指定broker的id broker.id=0 # 指定Kafka数据的位置 log.dirs=/kafka/data # 配置zk的三个节点 zookeeper.connect=ip1:2181,ip2:2181,ip3:2181 复制到另外服务器 scp -r srcpath ip:path 修改另外服务器的broker.id分别为1和2 如果需要远程访问修改 advertised.listeners=PLAINTEXT://ip:9194 配置KAFKA_HOME环境变量 vim /etc/profile export KAFKA_HOME=/opt/kafka_2.12-3.0.0 export PATH=:$PATH:${KAFKA_HOME} source /etc/profile # 启动Kafka nohup kafka-server-start.sh ../config/server.properties & 常用命令 创建topic,test的主题 kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 --topic test 查看目前Kafka中的主题 kafka-topics.sh --list --bootstrap-server localhost:9092 删除主题 kafka-topics.sh --bootstrap-server localhost:2181 --delete --topic test 创建消息 kafka-console-producer.sh --broker-list localhost:9092 --topic test 消费消息(即查看) kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

可以使用Kafka Tools图形界面连接kafka

kafka主要概念

zookeeper: ZK用来管理和协调broker,并且存储了Kafka的元数据(例如:有多少topic、partition、consumer),Kafka正在逐步想办法将ZooKeeper剥离,自己来管理自己

broker: Kafka实例,一个Kafka的集群通常由多个broker组成

主题Topic: 主题是一个逻辑概念,用于生产者发布数据,消费者拉取数据 Kafka中的主题必须要有标识符,而且是唯一的,没有数量上的限制

分区Partition: 主题被分为多个分区

producer: 生产者负责将数据推送给broker的topic 生产者分区写入策略 1.轮询分区策略 2.随机分区策略 3.按key分区分配策略 4.自定义分区策略 实现 my implements Partitioner props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, my.class.getName());

consumer: 消费者负责从broker的topic中拉取数据

consumer group: 一个消费者组可以包含多个消费者 一个消费者组有一个唯一的ID 组内的消费者一起消费主题的数据(即一个数据只能被同一个消费者组中一个消费者消费一次) 消费者组Rebalance机制: 是Kafka中确保Consumer group下所有的consumer如何分配订阅topic每个分区的机制 消费者客户端参数partition.asssignment.strategy可以配置多个分配策略 触发的时机有: 1.消费者组中consumer的个数发生变化, 2.订阅的topic个数发生变化 3.订阅的topic分区数发生变化 发生Rebalance时会对consumer group产生非常严重的影响,Rebalance的过程中所有的消费者都将停止工作,直到Rebalance完成。 消费者分区分配策略: Range范围分配策略是Kafka默认的分配策略,将分区数平均分给消费者(如1,2给消费者1和3,4给消费者2) 配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RangeAssignor RoundRobin轮询策略,轮询方式逐个将分区分配给每个消费者 配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RoundRobinAssignor Stricky粘性分配策略 1.分区分配尽可能均匀 2.在发生rebalance的时候,分区的分配尽可能与上一次分配保持相同 org.apache.kafka.clients.consumer.strategyStickyAssignor 副本Replicas: 副本可以确保某个服务器出现故障时,确保数据依然可用

偏移量offset: offset记录着下一条将要发送给Consumer的消息的序号 默认Kafka将offset存储在ZooKeeper中 在一个分区中,消息是按顺序存储,每次在分区消费都有一个递增的id。这个就是偏移量offset auto.offset.reset参数指定了在没有偏移量可提交时或者请求的偏移量在broker上不存在时,消费者如何读取 earliest:消费者会从分区的开始位置读取数据 latest:消费者会从分区的末尾开始读取数据 enable.auto.commit参数true,false让消费者基于任务调度自动提交偏移量,也可以在代码里手动提交偏移量 auto.commit.interval.ms:此参数与enable.auto.commit有直接的联系,如果选择了自动提交偏移量,可以通过此参数配置提交的频度,默认值是每5秒钟提交一次

producer的ACKs参数 acks参数指定了在集群中有多少个分区副本收到消息,kafka producer才会认为消息是被写入成功。 有三种值可以设置,分别是0,1,和all. acks=0是kafkaProducer在客户端,只要把消息发送出去,不管那条数据有没有在Partition Leader上落到磁盘,就直接认为这个消息发送成功 acks=1是Partition Leader接收到消息而且写入本地磁盘了,就认为成功了,不管其他的Follower有没有同步过去这条消息了 acks=all是Partition Leader接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都要把消息同步过去,才能认为这条消息是写入成功了

Kafka-Eagle简介

是一款结合了目前Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具 官网地址:https://·mon.Cluster; import org.apache.kafka.common.PartitionInfo; import java.time.Duration; import java.util.*; import java.util.concurrent.Future; public class kafka { public static void main(String[] args) throws Exception { //product( ); consumer(); } /** * 生产者 */ public static void product( ) throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.200.128:9092");//多个逗号隔开 props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //ProducerConfig.RETRIES_CONFIG重试次数 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String> (props); for (int i = 0; i < 1; i++) { Future<RecordMetadata> test = kafkaProducer.send(new ProducerRecord<String, String>("test", "first" + i)); System.out.println(test.get()); System.out.println("send "+i+" ok"); } kafkaProducer.close(); } /** * 消费者 */ public static void consumer( ) { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.200.128:9092"); props.put("group.id", "test"); //消费者自动提交offset值 props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //max.poll.records拉取大小 /**max.poll.interval.ms poll()调用之间的最大延迟.这为消费者在获取更多记录之前可以闲置的时间设置了上限. 如果在此超时到期之前未调用poll(),则认为使用者失败,并且该组将重新平衡以便将分区重新分配给另一个成员.*/ //heartbeat.interval.ms指定心跳包发送频率,即间隔多长时间发送一次心跳包,优化该值的设置可以减少Rebalance操作,默认时间为3秒; //session.timeout.ms:用于检测消费者故障的超时.消费者定期发送心跳以指示其活跃性 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String> (props); //订阅下要消费的topic kafkaConsumer.subscribe(Arrays.asList("test")); while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : consumerRecords) { System.out.println("消费的数据为:" + record.value()); } } // 手动提交offset值 //props.put("enable.auto.commit", "false"); //将所有已接收的记录标记为已提交kafkaConsumer.commitSync(); //消费完每个分区之后手动提交每个分区offset /* while(true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000)); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(record.offset() + ": " + record.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() -1).offset(); kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } }*/ //指定分区数据进行消费,主题与分区订阅只能二选一,当手动管理消费分区时,即使GroupID是一样的,Kafka的组协调器都将不再起作用 /* String topic = "test"; TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); kafkaConsumer.assign(Arrays.asList(partition0, partition1)); while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }*/ } } /** * 生费者分区规则 * kafka当中支持以下四种数据的分区方式 * 第一种分区策略,如果既没有指定分区号,也没有指定数据key,那么就会使用轮询的方式将数据均匀的发送到不同的分区里面去 * ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", "msg"); * 第二种分区策略 如果没有指定分区号,指定了数据key,通过key.hashCode % numPartitions来计算数据究竟会保存在哪一个分区里面 * ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", "key", "msg"); * 第三种分区策略:如果指定了分区号,那么就会将数据直接写入到对应的分区里面去 * ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", 0, "key", "msg" ); * 第四种分区策略:自定义分区策略 * implements Partitioner * props.put("partitioner.class", "w.KafkaCustomPartitioner"); */ class KafkaCustomPartitioner implements Partitioner { @Override public void configure(Map<String, ?> configs) { } @Override public int partition(String topic, Object arg1, byte[] keyBytes, Object arg3, byte[] arg4, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int partitionNum = partitions.size(); Random random = new Random(); int partition = random.nextInt(partitionNum); return partition; } @Override public void close() { } }


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #JAVA #kafka使用 #消息队列的应用场景 #异步处理