kafka教程

kafka是一种分布式消息队列,同类产品有rabbitmq、activemq。kafka常常结合storm等流式大数据处理框架使用。kafka是目前相当流程的消息队列框架。

Kafka发送消费的路由

2016-10-08| 发布: | 浏览: 1046 |保存PDF

Producer发送消息到broker时,会根据Paritition机制选择将其存储到哪一个Partition。如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的提高了吞吐率。可以在$KAFKA_HOME/config/server.properties中通过配置项num.partitions来指定新建Topic的默认Partition数量,也可在创建Topic时通过参数指定,同时也可以在Topic创建之后通过Kafka提供的工具修改。


在发送一条消息时,可以指定这条消息的keyProducer根据这个keyPartition机制来判断应该将这条消息发送到哪个ParitionParitition机制可以通过指定Producerparitition.class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。本例中如果key可以被解析为整数则将对应的整数与Partition总数取余,该消息会被发送到该数对应的Partition。(每个Parition都会有个序号,序号从0开始)


import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
 
public class JasonPartitioner<T> implements Partitioner {
 
    public JasonPartitioner(VerifiableProperties verifiableProperties) {}
     
    @Override
    public int partition(Object key, int numPartitions) {
        try {
            int partitionNum = Integer.parseInt((String) key);
            return Math.abs(Integer.parseInt((String) key) % numPartitions);
        } catch (Exception e) {
            return Math.abs(key.hashCode() % numPartitions);
        }
    }
}


如果将上例中的类作为partition.class,并通过如下代码发送20条消息(key分别为0123)至topic3(包含4Partition)。


public void sendMessage() throws InterruptedException{
for(int i = 1; i <= 5; i++){
      List messageList = new ArrayList<KeyedMessage<String, String>>();
      for(int j = 0; j < 4; j++){
          messageList.add(new KeyedMessage<String, String>("topic2", j+"", "The " + i + " message for key " + j));
      }
      producer.send(messageList);
}
producer.close();
}


key相同的消息会被发送并存储到同一个partition里,而且key的序号正好和Partition序号相同。(Partition序号从0开始,本例中的key也从0开始)。下图所示是通过Java程序调用Consumer后打印出的消息列表。



转自:http://www.jasongj.com/2015/03/10/KafkaColumn1/


系列教程

大家都在看

热门访问