首页 > 编程开发 > java教程网 > kafka教程

Kafka发送消费的路由

2016-10-08 07:26:23| 发布: | 浏览: 657

Producer发送消息到broker时,会根据Paritition机制选择将其存储到哪一个Partition。如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的提高了吞吐率。可以在$KAFKA_HOME/config/server.properties中通过配置项num.partitions来指定新建Topic的默认Partition数量,也可在创建Topic时通过参数指定,同时也可以在Topic创建之后通过Kafka提供的工具修改。


在发送一条消息时,可以指定这条消息的keyProducer根据这个keyPartition机制来判断应该将这条消息发送到哪个ParitionParitition机制可以通过指定Producerparitition.class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。本例中如果key可以被解析为整数则将对应的整数与Partition总数取余,该消息会被发送到该数对应的Partition。(每个Parition都会有个序号,序号从0开始)


import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
 
public class JasonPartitioner<T> implements Partitioner {
 
    public JasonPartitioner(VerifiableProperties verifiableProperties) {}
     
    @Override
    public int partition(Object key, int numPartitions) {
        try {
            int partitionNum = Integer.parseInt((String) key);
            return Math.abs(Integer.parseInt((String) key) % numPartitions);
        } catch (Exception e) {
            return Math.abs(key.hashCode() % numPartitions);
        }
    }
}


如果将上例中的类作为partition.class,并通过如下代码发送20条消息(key分别为0123)至topic3(包含4Partition)。


public void sendMessage() throws InterruptedException{
for(int i = 1; i <= 5; i++){
      List messageList = new ArrayList<KeyedMessage<String, String>>();
      for(int j = 0; j < 4; j++){
          messageList.add(new KeyedMessage<String, String>("topic2", j+"", "The " + i + " message for key " + j));
      }
      producer.send(messageList);
}
producer.close();
}


key相同的消息会被发送并存储到同一个partition里,而且key的序号正好和Partition序号相同。(Partition序号从0开始,本例中的key也从0开始)。下图所示是通过Java程序调用Consumer后打印出的消息列表。



转自:http://www.jasongj.com/2015/03/10/KafkaColumn1/


如非特别注明,本站内容均为领悟书生原创,转载请务必注明作者和原始出处。
本文地址:http://www.656463.com/kafka/Y3Irae.htm

相关专题

  • dubbo教程

    dubbo教程

    DUBBO是一个分布式服务框架,致力于提供高性能和透明化的RPC远程服务调用方案,是阿里巴巴SOA服务化治理方案的核心框架,本教程带你入门学习dubbo框架的相关知识

  • RabbitMQ 教程

    RabbitMQ 教程

    RabbitMQ是一个开源的,在AMQP基础上完整的,可复用的企业消息系统。支持主流的操作系统,Linux、Windows、MacOX等。多种开发语言支持,Java、Python、Ruby、.NET、PHP、C/C++、node.js等。本教程带你学习RabbitMQ环境搭建、RabbitMQ简单队列、work队列、发布订阅消息、各种Exchange应用、与spring整合等

  • java8新特征详解

    java8新特征详解

    本教程详细讲解Java8的新特新,结合简单的示例代码来讲解如何使用默认接口方法,lambda表达式,方法引用以及多重Annotation,同时你将会学到JAVA8最新的API上的改进,比如流,函数式接口,Map以及全新的日期API

  • WebSocket教程

    WebSocket教程

    WebSocket 规范的目标是在浏览器中实现和服务器端双向通信.双向通信可以拓展浏览器上的应用类型,例如实时的数据推送(股票行情),游戏,聊天/im 等.本教程通过java简单的示例带你快速学会WebSocket编程

  • json教程

    json教程

    JSON 即 JavaScript Object Natation,它是一种轻量级的数据交换格式,非常适合于服务器与 JavaScript 的交互。本专题详细讲解json、jackson、fastjson等工具包的操作教程

  • junit教程

    junit教程

    junit基本介绍,hamcrest和testSuite介绍,基于测试开发讲解和cobertura框架介绍,stub和mock简介,dbunit的使用,dbunit实际运用,easymock的使用,easymock的实际应用,利用easymock测试简单的servlet,cactus的使用,基于Jetty的cactus的使用

  • kafka分布式消息队列

    kafka分布式消息队列

    kafka是一种分布式消息队列,同类产品有rabbitmq、activemq。kafka常常结合storm等流式大数据处理框架使用。kafka是目前相当流程的消息队列框架。

  • i18n 国际化

    i18n 国际化

    国际化(internationalization)又称为 i18n(读法为i 18 n,据说是因为internationalization(国际化)这个单词从i到n之间有18个英文字母,i18n的名字由此而来)

  • freemarker教程

    freemarker教程

    freemarker是现在企业中用得最多的模板引擎,可以根据模板生成相当的静态页面等

  • Struts2教程

    Struts2教程

    Struts2,Struts2教程,Struts2学习,Struts2实例,Struts2视频教程

  • Hibernate教程

    Hibernate教程

    Hibernate,Hibernate教程,Hibernate学习,Hibernate实例,Hibernate视频教程

  • spring教程

    spring教程

    spring,spring教程,spring学习,spring实例,spring视频教程

  • java视频教程

    java视频教程

    java,视频教程,java视频教程,java web 视频教程,java基础视频教程