Kafka发送消费的路由

2018-10-08|来源:

Producer发送消息到broker时,会根据Paritition机制选择将其存储到哪一个Partition。如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的提高了吞吐率。可以在$KAFKA_HOME/config/server.properties中通过配置项num.partitions来指定新建Topic的默认Partition数量,也可在创建Topic时通过参数指定,同时也可以在Topic创建之后通过Kafka提供的工具修改。


在发送一条消息时,可以指定这条消息的keyProducer根据这个keyPartition机制来判断应该将这条消息发送到哪个ParitionParitition机制可以通过指定Producerparitition.class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。本例中如果key可以被解析为整数则将对应的整数与Partition总数取余,该消息会被发送到该数对应的Partition。(每个Parition都会有个序号,序号从0开始)


import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
 
public class JasonPartitioner<T> implements Partitioner {
 
    public JasonPartitioner(VerifiableProperties verifiableProperties) {}
     
    @Override
    public int partition(Object key, int numPartitions) {
        try {
            int partitionNum = Integer.parseInt((String) key);
            return Math.abs(Integer.parseInt((String) key) % numPartitions);
        } catch (Exception e) {
            return Math.abs(key.hashCode() % numPartitions);
        }
    }
}


如果将上例中的类作为partition.class,并通过如下代码发送20条消息(key分别为0123)至topic3(包含4Partition)。


public void sendMessage() throws InterruptedException{
for(int i = 1; i <= 5; i++){
      List messageList = new ArrayList<KeyedMessage<String, String>>();
      for(int j = 0; j < 4; j++){
          messageList.add(new KeyedMessage<String, String>("topic2", j+"", "The " + i + " message for key " + j));
      }
      producer.send(messageList);
}
producer.close();
}


key相同的消息会被发送并存储到同一个partition里,而且key的序号正好和Partition序号相同。(Partition序号从0开始,本例中的key也从0开始)。下图所示是通过Java程序调用Consumer后打印出的消息列表。



转自:http://www.jasongj.com/2015/03/10/KafkaColumn1/


相关问答

更多

kafka查看消费了多少条数据

前面应该还有个数据生产者,比如flume. flume负责生产数据,发送至kafka。 spark streaming作为消费者,实时的从kafka中获取数据进行计算。 计算结果保存至redis,供实时推荐使用。 flume+kafka+spark+redis是实时数据收集与计算的一套经典架构...

flume发送数据到kafka如何设置异步发送

前面应该还有个数据生产者,比如flume. flume负责生产数据,发送至kafka。 spark streaming作为消费者,实时的从kafka中获取数据进行计算。 计算结果保存至redis,供实时推荐使用。 flume+kafka+spark+redis是实时数据收集与计算的一套经典架构...

kafka查看消费了多少条数据

如何查看目前的消费者是否已经读到最新的数据: kafka-run-class.sh kafka.tools.ConsumerOffsetChecker #kafka查看topic各个分区的消息的信息 kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group ** --topic *** --zookeeper *:2181,*:2181,*:2181/kafka --zookeeper 那里是指kafka在zk中的path,即使zk有 ...

如何查看kafka消费者信息

在Kafak中国社区的qq群中,这个问题被提及的比例是相当高的,这也是Kafka用户最常碰到的问题之一。本文结合Kafka源码试图对该问题相关的因素进行探讨。希望对大家有所帮助。怎么确定分区数?“我应该选择几个分区?”——如果你在Kafka中国社区的群里,这样的问题你会经常碰到的。不过有些遗憾的是,我们似乎并没有很权威的答案能够解答这样的问题。其实这也不奇怪,毕竟这样的问题通常都是没有固定答案的。Kafka官网上标榜自己是"high-throughputdistributedmessagingsy ...

如何用Java向kafka发送json数据

使用了receivers来接收数据,利用的是Kafka高层次的消费者api

kafka分布式消息队列

相关文章

更多

最近更新

更多