首页 > 编程开发 > java教程网 > kafka教程

使用spring-integration-kafka操作kafka

2016-10-05 07:08:35| 发布: | 浏览: 1473

依赖

<!-- Spring Integration Kafka adapter; the 1.2.x line targets the old 0.8.x-era
     Kafka high-level consumer/producer and the XML namespace used below. -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>1.2.0.RELEASE</version>
</dependency>



消息生产者

消息生产者spring配置spring-integration-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Producer-side Spring Integration wiring: messages sent to the "inputToKafka"
     channel are forwarded by the outbound adapter to Kafka topic "test1". -->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
    xmlns:int="http://www.springframework.org/schema/integration"
    xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/integration
       http://www.springframework.org/schema/integration/spring-integration.xsd
       http://www.springframework.org/schema/integration/kafka
       http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
       ">
        
    <!-- Supplies ${bootstrap.servers} used by the producer-configuration below. -->
    <context:property-placeholder location="classpath:kafka.properties" />
 
    <!-- Channel the test code sends to; consumed by the outbound adapter. -->
    <int:channel id="inputToKafka" />
 
    <!-- Bridges "inputToKafka" to Kafka via the producer context; starts with the context. -->
    <int-kafka:outbound-channel-adapter
        kafka-producer-context-ref="kafkaProducerContext" auto-startup="true"
        channel="inputToKafka" order="1">
    </int-kafka:outbound-channel-adapter>
 
    <!-- Raw Kafka producer tuning properties (retries, metadata refresh, send buffer). -->
    <bean id="producerProperties"
        class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="topic.metadata.refresh.interval.ms">3600000</prop>
                <prop key="message.send.max.retries">5</prop>
                <prop key="send.buffer.bytes">5242880</prop>
            </props>
        </property>
    </bean>
 
    <!-- Shared serializer for both message keys and String payloads. -->
    <bean id="stringSerializer" class="org.apache.kafka.common.serialization.StringSerializer" />
 
    <!-- Maps String key/value pairs onto topic "test1" at ${bootstrap.servers}. -->
    <int-kafka:producer-context id="kafkaProducerContext"
        producer-properties="producerProperties">
        <int-kafka:producer-configurations>
            <int-kafka:producer-configuration
                broker-list="${bootstrap.servers}" key-serializer="stringSerializer"
                value-class-type="java.lang.String" value-serializer="stringSerializer"
                topic="test1" />
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>
</beans>



创建一个生产者测试类ProducerTest.java,往topic=test1中发送一些消息


package com._656463.demo.kafka.springintegrationkafka;
 
import java.util.Random;
 
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath*:spring/spring-integration-producer.xml")
public class ProducerTest {

    /** Channel wired to the outbound Kafka adapter (bean "inputToKafka"). */
    @Autowired
    @Qualifier("inputToKafka")
    private MessageChannel channel;

    /** Sends 100 random String payloads to topic "test1", keyed by their index. */
    @Test
    public void test1() {
        Random random = new Random();
        int index = 0;
        while (index < 100) {
            String payload = "Message-" + random.nextInt();
            channel.send(MessageBuilder.withPayload(payload)
                    .setHeader("messageKey", String.valueOf(index))
                    .setHeader("topic", "test1")
                    .build());
            index++;
        }
    }
}



消息消费者

消费者的spring配置spring-integration-consumer.xml


<?xml version="1.0" encoding="UTF-8"?>
<!-- Consumer-side Spring Integration wiring: the inbound adapter polls Kafka
     (via ZooKeeper, 0.8.x high-level consumer) into "inputFromKafka", which is
     drained by the kafkaService bean's processMessage method. -->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
    xsi:schemaLocation="
      http://www.springframework.org/schema/integration/kafka 
      http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
      http://www.springframework.org/schema/integration 
      http://www.springframework.org/schema/integration/spring-integration.xsd
      http://www.springframework.org/schema/beans 
      http://www.springframework.org/schema/beans/spring-beans.xsd">
 
    <!-- Channel that receives the polled message batches. -->
    <int:channel id="inputFromKafka" />
 
    <!-- ZooKeeper coordinates for the 0.8.x high-level consumer (timeouts in ms). -->
    <int-kafka:zookeeper-connect id="zookeeperConnect"
        zk-connect="192.168.56.101:2181" zk-connection-timeout="6000"
        zk-session-timeout="6000" zk-sync-time="2000" />
 
    <!-- NOTE(review): auto-startup="false" means this adapter does NOT start on
         context refresh; the demo relies on an explicit context.start() in
         KafkaConsumerTest — confirm before reusing this config elsewhere. -->
    <int-kafka:inbound-channel-adapter
        id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
        auto-startup="false" channel="inputFromKafka">
        <int:poller fixed-delay="1" time-unit="MILLISECONDS" />
    </int-kafka:inbound-channel-adapter>
 
    <!-- Decodes raw Kafka bytes into Strings for both keys and values. -->
    <bean id="kafkaDecoder"
        class="org.springframework.integration.kafka.serializer.common.StringDecoder">
    </bean>
 
    <!-- Raw consumer tuning; auto.offset.reset=smallest replays from the earliest offset. -->
    <bean id="consumerProperties"
        class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="auto.offset.reset">smallest</prop>
                <prop key="socket.receive.buffer.bytes">10485760</prop> <!-- 10M -->
                <prop key="fetch.message.max.bytes">5242880</prop>
                <prop key="auto.commit.interval.ms">1000</prop>
            </props>
        </property>
    </bean>
 
    <!-- Endpoint that consumes batches from "inputFromKafka" via KafkaService.processMessage. -->
    <bean id="kafkaService" class="com._656463.demo.kafka.springintegrationkafka.KafkaService" />
    <int:outbound-channel-adapter channel="inputFromKafka" ref="kafkaService" method="processMessage" />
     
    <!-- Consumer group "default1" reading topic "test1" with 4 streams,
         up to 5000 messages per poll. -->
    <int-kafka:consumer-context id="consumerContext"
        consumer-timeout="1000" zookeeper-connect="zookeeperConnect"
        consumer-properties="consumerProperties">
        <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration
                group-id="default1" value-decoder="kafkaDecoder" key-decoder="kafkaDecoder"
                max-messages="5000">
                <int-kafka:topic id="test1" streams="4" />
            </int-kafka:consumer-configuration>
        </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>
</beans>

此配置中创建了一个kafkaService,使用该类的processMessage方法消费


KafkaService.java


package com._656463.demo.kafka.springintegrationkafka;
 
import java.util.Map;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class KafkaService {
    static final Logger logger = LoggerFactory.getLogger(KafkaService.class);

    /**
     * Consumes one polled batch from the inbound Kafka adapter.
     *
     * @param msgs batch keyed by topic name; each value maps a partition-like
     *             integer key to the decoded String payload (shape produced by
     *             the consumer-context config — TODO confirm key semantics)
     */
    public void processMessage(Map<String, Map<Integer, String>> msgs) {
        // Guard against an empty poll so we don't log spurious batches.
        if (msgs == null || msgs.isEmpty()) {
            return;
        }
        for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) {
            // Use parameterized logging (no string concatenation) and route the
            // former System.out.println through the logger as well.
            logger.info("Consumer Message received, Suchit Topic:{}", entry.getKey());
            for (String msg : entry.getValue().values()) {
                logger.info("Suchit Consumed Message: {}", msg);
            }
        }
    }
}


测试消费者的时候,只要把spring容器启动即可

package com._656463.demo.kafka.springintegrationkafka;
 
import org.springframework.beans.BeansException;
import org.springframework.context.support.ClassPathXmlApplicationContext;
 
public class KafkaConsumerTest {
    /**
     * Boots the consumer context and blocks the main thread so the polling
     * threads keep running. Explicit context.start() is required because the
     * inbound adapter is declared with auto-startup="false".
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context;
        try {
            context = new ClassPathXmlApplicationContext(
                    "classpath:spring/spring-integration-consumer.xml");
            context.start();
        } catch (BeansException e) {
            // Fail fast: the original code fell through and blocked forever
            // even though no context existed and nothing could be consumed.
            e.printStackTrace();
            return;
        }

        // Park the main thread; nothing ever notifies, so we wait until interrupted.
        synchronized (KafkaConsumerTest.class) {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    KafkaConsumerTest.class.wait();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and exit instead of spinning.
                    Thread.currentThread().interrupt();
                }
            }
        }
        context.close();
    }
}



书生参考网络整理


如非特别注明,本站内容均为领悟书生原创,转载请务必注明作者和原始出处。
本文地址:http://www.656463.com/kafka/EnQzYn.htm

相关专题

  • dubbo教程

    dubbo教程

    DUBBO是一个分布式服务框架,致力于提供高性能和透明化的RPC远程服务调用方案,是阿里巴巴SOA服务化治理方案的核心框架,本教程带你入门学习dubbo框架的相关知识

  • RabbitMQ 教程

    RabbitMQ 教程

    RabbitMQ是一个开源的,在AMQP基础上完整的,可复用的企业消息系统。支持主流的操作系统,Linux、Windows、MacOX等。多种开发语言支持,Java、Python、Ruby、.NET、PHP、C/C++、node.js等。本教程带你学习RabbitMQ环境搭建、RabbitMQ简单队列、work队列、发布订阅消息、各种Exchange应用、与spring整合等

  • java8新特征详解

    java8新特征详解

    本教程详细讲解Java8的新特性,结合简单的示例代码来讲解如何使用默认接口方法,lambda表达式,方法引用以及多重Annotation,同时你将会学到JAVA8最新的API上的改进,比如流,函数式接口,Map以及全新的日期API

  • WebSocket教程

    WebSocket教程

    WebSocket 规范的目标是在浏览器中实现和服务器端双向通信.双向通信可以拓展浏览器上的应用类型,例如实时的数据推送(股票行情),游戏,聊天/im 等.本教程通过java简单的示例带你快速学会WebSocket编程

  • json教程

    json教程

    JSON 即 JavaScript Object Natation,它是一种轻量级的数据交换格式,非常适合于服务器与 JavaScript 的交互。本专题详细讲解json、jackson、fastjson等工具包的操作教程

  • junit教程

    junit教程

    junit基本介绍,hamcrest和testSuite介绍,基于测试开发讲解和cobertura框架介绍,stub和mock简介,dbunit的使用,dbunit实际运用,easymock的使用,easymock的实际应用,利用easymock测试简单的servlet,cactus的使用,基于Jetty的cactus的使用

  • kafka分布式消息队列

    kafka分布式消息队列

    kafka是一种分布式消息队列,同类产品有rabbitmq、activemq。kafka常常结合storm等流式大数据处理框架使用。kafka是目前相当流行的消息队列框架。

  • i18n 国际化

    i18n 国际化

    国际化(internationalization)又称为 i18n(读法为i 18 n,据说是因为internationalization(国际化)这个单词从i到n之间有18个英文字母,i18n的名字由此而来)

  • freemarker教程

    freemarker教程

    freemarker是现在企业中用得最多的模板引擎,可以根据模板生成相当的静态页面等

  • Struts2教程

    Struts2教程

    Struts2,Struts2教程,Struts2学习,Struts2实例,Struts2视频教程

  • Hibernate教程

    Hibernate教程

    Hibernate,Hibernate教程,Hibernate学习,Hibernate实例,Hibernate视频教程

  • spring教程

    spring教程

    spring,spring教程,spring学习,spring实例,spring视频教程

  • java视频教程

    java视频教程

    java,视频教程,java视频教程,java web 视频教程,java基础视频教程