1. 首页
  2. kafka实践

8. kafka分区

分区策略

构造KafkaProducer代码如下:

  Properties props = new Properties();
    props.put("bootstrap.servers", "10.0.55.229:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // 定义分区策略(默认实现: org.apache.kafka.clients.producer.internals.DefaultPartitioner)
    props.put("partitioner.class", "com.afei.kafka.KafkaCustomPartitioner");
    kafkaProducer = new KafkaProducer<>(props);

属性partitioner.class就是决定消息如何分区的,默认实现类是DefaultPartitioner,源码注释如下:

  The default partitioning strategy:
    If a partition is specified in the record, use it;
    If no partition is specified but a key is present choose a partition based on a hash of the key;
    If no partition or key is present choose a partition in a round-robin fashion;

源码分析

在调用send()方法发送消息时,会调用如下代码选择分区:

  int partition = partition(record, serializedKey, serializedValue, cluster);

partition()方法源码如下:

  /**
     * computes partition for given record.
     * if the record has partition returns the value otherwise
     * calls configured partitioner class to compute the partition.
     */
    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        // 如果构造的ProducerRecord指定了partition,则发送到指定的分区;否则调用partitioner选择一个分区。(如果通过参数partitioner.class指定了自定义分区实现则用其选择分区,否则用默认的DefaultPartitioner选择分区)
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }

DefaultPartitioner即默认分区选取策略的源码如下:


public class DefaultPartitioner implements Partitioner { private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>(); @Override public void configure(Map<String, ?> configs) { } /** * Compute the partition for the given record. * * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes serialized key to partition on (or null if no key) * @param value The value to partition on or null * @param valueBytes serialized value to partition on or null * @param cluster The current cluster metadata */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // 得到topic所有的分区信息 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); // 得到topic分区数量 int numPartitions = partitions.size(); // 如果发送的消息ProducerRecord没有key的话 if (keyBytes == null) { // 得到topic对应的值,由值的计算方式(counter.getAndIncrement())可知,每个topic下每个客户端采取轮询策略往分区中写入消息 int nextValue = nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); // 如果有效的分区集合不为空,那么轮询有效的分区写入消息(即有效分区集合优先原则) if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // 如果有效的分区集合为空,那么轮询无效的分区(即全部分区)写入消息 // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // 如果发送的消息ProducerRecord有key的话, 根据key的murmur2 hash结果对分区数量取模就是最终选择的分区(所以如果想让消息有序,只需给消息指定相同的key即可,有相同的key,就会有相同的hash值,就会选择相同的分区,同一个分区的消息又是有序的) // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } private int nextValue(String topic) { // 每个topic对应不同的counter AtomicInteger counter = topicCounterMap.get(topic); if (null == counter) { // 如果topic还从没没发过消息,或者客户端重启,那么counter为空,那么给这个topic初始化一个AtomicInteger counter = new AtomicInteger(ThreadLocalRandom.current().nextInt()); AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter); if (currentCounter != null) { counter = currentCounter; } } return counter.getAndIncrement(); } @Override public void close() {} }

通过设置相同key来保证消息有序性,这里可能还会有一点小小的缺陷。例如消息发送设置了重试机制,并且异步发送,消息A和B设置相同的key,业务上A先发,B后发。由于网络或者其他原因A发送失败,B发送成功;A由于发送失败就会重试且重试成功,这时候消息顺序B在前A在后,与业务发送顺序不一致。如果需要解决这个问题,需要设置参数max.in.flight.requests.per.connection=1,其含义是限制客户端在单个连接上能够发送的未响应请求的个数。设置此值是1表示kafka broker在响应请求之前client不能再向同一个broker发送请求,这个参数默认值是5。官方文档说明如下,这个参数如果大于1,由于重试消息顺序可能重排:

  The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).

自定义

如过要自定义分区实现非常简单,只需要自定义类实现接口org.apache.kafka.clients.producer.Partitioner,并作如下配置即可:

  props.put("partitioner.class", "com.afei.kafka.KafkaCustomPartitioner");

KafkaCustomPartitioner就是自定义实现类,假定分区策略如下:

  分区要求至少5个,如果key以vip开头,表示是重要消息,重要消息在除了最后两个分区之外的分区中遍历寻找分区并写入;否则,表示是一般消息,一般消息只在最后两个分区中遍历寻找分区并写入;

分区实现的核心源码如下:

  /**
     * @author afei
     * @version 1.0.0
     * @since 2018年06月18日
     */
    public class KafkaCustomPartitioner implements Partitioner{

        /**
         * 保存普通消息topic与其对应的编号(每次取编号时需要自增),通过编号能够得到目标分区
         */
        private final ConcurrentMap<String, AtomicInteger> normalTopicCounterMap = new ConcurrentHashMap<>();
        /**
         * 保存重要消息topic与其对应的编号(每次取编号时需要自增),通过编号能够得到目标分区
         */
        private final ConcurrentMap<String, AtomicInteger> importTopicCounterMap = new ConcurrentHashMap<>();

        private static final int MIN_PARTITION_NUM = 5;

        @Override
        public int partition(String topic,
                             Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes,
                             Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            // MIN_PARTITION_NUM即需求定义的要求的最小的分区数5
            if (numPartitions< MIN_PARTITION_NUM){
                throw new IllegalStateException("The num of partition must be greater than "+ MIN_PARTITION_NUM);
            }

            int index;
            boolean vipMsg = keyBytes != null && new String(keyBytes).startsWith("vip");
            int nextValue = this.nextValue(topic, vipMsg);
            if (vipMsg) {
                // 重要消息在除了最后两个分区之外的分区中遍历寻找分区并写入
                index = Utils.toPositive(nextValue) % (numPartitions-2);
            } else {
                // 一般消息只在最后两个分区中遍历寻找分区并写入
                index = Utils.toPositive(nextValue) % 2 + (numPartitions-2);
            }
            System.out.println("topic = "+topic+" ,index = "+index+" ,vip?"+vipMsg+", key = "+new String(keyBytes));
            return index;
        }

        private int nextValue(String topic, boolean vip) {
            AtomicInteger counter = vip? importTopicCounterMap.get(topic): normalTopicCounterMap.get(topic);
            if (null == counter) {
                counter = new AtomicInteger(0);
                AtomicInteger currentCounter = vip? importTopicCounterMap.putIfAbsent(topic, counter): normalTopicCounterMap.putIfAbsent(topic, counter);
                if (currentCounter != null) {
                    counter = currentCounter;
                }
            }
            return counter.getAndIncrement();
        }

        @Override
        public void close() {

        }

        @Override
        public void configure(Map<String, ?> configs) {

        }
    }

作者:阿飞的博客

来源:https://www.jianshu.com/p/849e22317a7f


JS中文网,Javascriptc中文网是中国领先的新一代开发者社区和专业的技术媒体,一个帮助开发者成长的社区,是给开发者用的 Hacker News,技术文章由为你筛选出最优质的干货,其中包括:Android、iOS、前端、后端等方面的内容。目前已经覆盖和服务了超过 300 万开发者,你每天都可以在这里找到技术世界的头条内容。

本文著作权归作者所有,如若转载,请注明出处

转载请注明:文章转载自「 Java极客技术学习 」https://www.javajike.com

标题:8. kafka分区

链接:https://www.javajike.com/article/1792.html

« 9. kafka shell脚本用法详解
7. kafka序列化&反序列化»

相关推荐

QR code