irpas技术客

Kafka的分区规则(轮询分区、黏性分区)/ 生产者实现生产数据的负载均衡_在上树的路上_kafka生产者负载均衡

网络 8205

1.分区参数partition说明

在说Kafka分区规则前,先看一下partition的计算方法以确定数据的分区。从而使topic加上分区编号构建分区对象,将数据写入该分区中。代码如下。

int partition = this.partition(record, serializedKey, serializedValue, cluster); tp = new TopicPartition(record.topic(), partition);

record是生产者数据对象,它的全参构建方法代码如下,通过参数Integer partition可以指定数据的传入分区。

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) { this(topic, partition, timestamp, key, value, (Iterable)null); }

下面是对于传入参数的处理。根据传入的ProducerRecord对象,判断是否有指定数据传入分区,如果没有指定,则默认使用KafkaProducer分区规则;如果指定了,则直接返回指定分区。

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { Integer partition = record.partition(); return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); } 2.默认分区规则 如果指定了key:按照key的hash/mur取余分区的个数,来写入对应分区如果没有指定key:按照黏性分区写入 public class DefaultPartitioner implements Partitioner { private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache(); public DefaultPartitioner() { } public void configure(Map<String, ?> configs) { } //*客官~~~来~~看这边看这边看这边* public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //首先判断是否有传入key if (keyBytes == null) { //如果没有,则返回**黏性分区** return this.stickyPartitionCache.partition(topic, cluster); } else { //获取这个topic的所有分区对象 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); //获取分区个数 int numPartitions = partitions.size(); //使用key的mur值取余分区个数(和哈希取余的方法类似)得到当前key对应value的写入分区 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } public void close() { } public void onNewBatch(String topic, Cluster cluster, int prevPartition) { this.stickyPartitionCache.nextPartition(topic, cluster, prevPartition); } } 3.轮询分区

在2.4版本之前,没有黏性分区,默认的是轮询分区。 它的原理很简单:就是轮着来给分区。

Topicpartkeyvaluetopic101value1topic112value2topic123value3topic104value4topic115value5topic126value6

优点:数据分配均衡 缺点:性能较差

性能差的原因:

Kafka生产者写入数据过程step1:先将数据放入一个批次中,判断是否达到条件,达到条件才将整个批次的数据写入kafka 批次满了【batch.size】达到一定时间【linger.ms】 step2:根据数据属于哪个分区,就与分区构建一个连接,发送这个分区的批次的数据 第一条数据:先构建0分区的连接,第二条不是0分区的,所以直接构建一个批次,发送第—条第二条数据:先构建1分区的连接,第三条不是1分区的,所以直接构建一个批次,发送第二条…每条数据需要构建一个批次,9条数据,9个批次,每个批次只有—条数据,每个批次又得构建连接,性能很差 总结:轮询分区会将一条数据构成一个批次,每个批次都需要构建一个连接,性能差希望:批次少,每个批次数据量多,性能比较好

源码:

public class RoundRobinPartitioner implements Partitioner { private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap(); public RoundRobinPartitioner() { } public void configure(Map<String, ?> configs) { } public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); int nextValue = this.nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (!availablePartitions.isEmpty()) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return ((PartitionInfo)availablePartitions.get(part)).partition(); } else { return Utils.toPositive(nextValue) % numPartitions; } } private int nextValue(String topic) { AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> { return new AtomicInteger(0); }); return counter.getAndIncrement(); } public void close() { } } 4.黏性分区

2.4版本后,默认分区改为黏性分区

设计:实现少批次多数据规则:判断缓存中是否有这个topic的分区连接,如果有,直接使用,如果没有随机写入一个分区,并且放入缓存优点:性能好,整体数据的分配相对均衡缺点:分配没有绝对均衡思想总结:随机选择分区将一个批次的数据放入缓存,随机与一个分区构建连接,将数据全部传入分区,清空缓存。然后再将一个批次的数据放入缓存,随机·······

数据写入过程:

第一次:将所有数据随机选择一个分区,全部写入这个分区中,将这次的分区编号放入缓存中 Topicpartkeyvaluetopic111value1topic112value2topic113value3topic114value4topic115value5topic116value6
第二次开始根据缓存中是否有上一次的编号 有:直接使用上一次的编号没有:重新选择一个

源码:

public class StickyPartitionCache { private final ConcurrentMap<String, Integer> indexCache = new ConcurrentHashMap(); public StickyPartitionCache() { } public int partition(String topic, Cluster cluster) { Integer part = (Integer)this.indexCache.get(topic); return part == null ? this.nextPartition(topic, cluster, -1) : part; } public int nextPartition(String topic, Cluster cluster, int prevPartition) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); Integer oldPart = (Integer)this.indexCache.get(topic); Integer newPart = oldPart; if (oldPart != null && oldPart != prevPartition) { return (Integer)this.indexCache.get(topic); } else { List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); Integer random; if (availablePartitions.size() < 1) { //这边-------------这边 //核心思想:随机选择一个分区 random = Utils.toPositive(ThreadLocalRandom.current().nextInt()); newPart = random % partitions.size(); } else if (availablePartitions.size() == 1) { newPart = ((PartitionInfo)availablePartitions.get(0)).partition(); } else { while(newPart == null || newPart.equals(oldPart)) { random = Utils.toPositive(ThreadLocalRandom.current().nextInt()); newPart = ((PartitionInfo)availablePartitions.get(random % availablePartitions.size())).partition(); } } if (oldPart == null) { this.indexCache.putIfAbsent(topic, newPart); } else { this.indexCache.replace(topic, prevPartition, newPart); } return (Integer)this.indexCache.get(topic); } } } 5.总结

Kafka中生产数据的分区规则是什么? Kafka生产者怎么实现生产数据的负载均衡? 为什么生产数据的方式不同,分区规则就不一样?

先判断是否指定了分区如果指定了,就写入指定的分区再判断是否指定了Key如果指定了Key,按照Key的mur取余分区个数来决定如果没有指定Key,按照黏性分区


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #kafka生产者负载均衡 #1 #先判断是否指定了分区 #2 #如果指定了就写入指定的分区 #3