kafka教程

kafka是一种分布式消息队列,同类产品有rabbitmq、activemq。kafka常常结合storm等流式大数据处理框架使用。kafka是目前相当流行的消息队列框架。

使用spring-integration-kafka操作kafka

2016-10-05| 发布: | 浏览: 1803 |保存PDF

依赖

<!-- Maven coordinates for the spring-integration-kafka extension used by the
     XML configurations below (1.x line, which provides the <int-kafka:*> namespace). -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>1.2.0.RELEASE</version>
</dependency>



消息生产者

消息生产者spring配置spring-integration-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Producer side: any message sent to the "inputToKafka" channel is published
     to kafka topic "test1" by the outbound channel adapter defined below. -->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
    xmlns:int="http://www.springframework.org/schema/integration"
    xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/integration
       http://www.springframework.org/schema/integration/spring-integration.xsd
       http://www.springframework.org/schema/integration/kafka
       http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
       ">
        
    <!-- Externalized settings (e.g. ${bootstrap.servers}) come from kafka.properties. -->
    <context:property-placeholder location="classpath:kafka.properties" />
 
    <!-- Channel the application writes to (see ProducerTest). -->
    <int:channel id="inputToKafka" />
 
    <!-- Bridges the "inputToKafka" channel to kafka via the producer context below. -->
    <int-kafka:outbound-channel-adapter
        kafka-producer-context-ref="kafkaProducerContext" auto-startup="true"
        channel="inputToKafka" order="1">
    </int-kafka:outbound-channel-adapter>
 
    <!-- Native kafka producer properties shared by every producer configuration. -->
    <bean id="producerProperties"
        class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="topic.metadata.refresh.interval.ms">3600000</prop>
                <prop key="message.send.max.retries">5</prop>
                <prop key="send.buffer.bytes">5242880</prop>
            </props>
        </property>
    </bean>
 
    <!-- Serializer reused for both message keys and values (plain strings here). -->
    <bean id="stringSerializer" class="org.apache.kafka.common.serialization.StringSerializer" />
 
    <!-- One producer-configuration per target topic: this one sends String
         key/value pairs to topic "test1" on the brokers in ${bootstrap.servers}. -->
    <int-kafka:producer-context id="kafkaProducerContext"
        producer-properties="producerProperties">
        <int-kafka:producer-configurations>
            <int-kafka:producer-configuration
                broker-list="${bootstrap.servers}" key-serializer="stringSerializer"
                value-class-type="java.lang.String" value-serializer="stringSerializer"
                topic="test1" />
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>
</beans>



创建一个生产者测试类ProducerTest.java,往topic=test1中创建一些消息


package com._656463.demo.kafka.springintegrationkafka;
 
import java.util.Random;
 
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath*:spring/spring-integration-producer.xml")
public class ProducerTest {

    /** Channel wired to the kafka outbound adapter (bean id "inputToKafka"). */
    @Autowired
    @Qualifier("inputToKafka")
    private MessageChannel channel;

    @Test
    public void test1() {
        // Publish 100 messages with random payloads to topic "test1"; the
        // "messageKey" header carries the loop index as the kafka message key.
        Random random = new Random();
        for (int index = 0; index < 100; index++) {
            String payload = "Message-" + random.nextInt();
            channel.send(MessageBuilder.withPayload(payload)
                    .setHeader("messageKey", String.valueOf(index))
                    .setHeader("topic", "test1")
                    .build());
        }
    }
}



消息消费者

消费者的spring配置spring-integration-consumer.xml


<?xml version="1.0" encoding="UTF-8"?>
<!-- Consumer side: the inbound adapter polls kafka (located via zookeeper) and
     drops decoded message batches onto "inputFromKafka", which is routed to
     KafkaService.processMessage by the outbound channel adapter below. -->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
    xsi:schemaLocation="
      http://www.springframework.org/schema/integration/kafka 
      http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
      http://www.springframework.org/schema/integration 
      http://www.springframework.org/schema/integration/spring-integration.xsd
      http://www.springframework.org/schema/beans 
      http://www.springframework.org/schema/beans/spring-beans.xsd">
 
    <!-- Channel the inbound adapter publishes consumed batches to. -->
    <int:channel id="inputFromKafka" />
 
    <!-- Zookeeper connection used by the consumer to locate brokers/offsets. -->
    <int-kafka:zookeeper-connect id="zookeeperConnect"
        zk-connect="192.168.56.101:2181" zk-connection-timeout="6000"
        zk-session-timeout="6000" zk-sync-time="2000" />
 
    <!-- Polls kafka every millisecond. NOTE(review): auto-startup="false", so
         consumption does not begin on context refresh; the demo relies on an
         explicit context.start() (see KafkaConsumerTest) - confirm lifecycle. -->
    <int-kafka:inbound-channel-adapter
        id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
        auto-startup="false" channel="inputFromKafka">
        <int:poller fixed-delay="1" time-unit="MILLISECONDS" />
    </int-kafka:inbound-channel-adapter>
 
    <!-- Decoder reused for both keys and values (byte[] -> String). -->
    <bean id="kafkaDecoder"
        class="org.springframework.integration.kafka.serializer.common.StringDecoder">
    </bean>
 
    <!-- Native kafka consumer properties; "smallest" starts from the earliest
         available offset when no committed offset exists for the group. -->
    <bean id="consumerProperties"
        class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="auto.offset.reset">smallest</prop>
                <prop key="socket.receive.buffer.bytes">10485760</prop> <!-- 10M -->
                <prop key="fetch.message.max.bytes">5242880</prop>
                <prop key="auto.commit.interval.ms">1000</prop>
            </props>
        </property>
    </bean>
 
    <!-- Routes messages arriving on "inputFromKafka" to KafkaService.processMessage. -->
    <bean id="kafkaService" class="com._656463.demo.kafka.springintegrationkafka.KafkaService" />
    <int:outbound-channel-adapter channel="inputFromKafka" ref="kafkaService" method="processMessage" />
     
    <!-- Consumer group "default1" reading topic "test1" with 4 streams,
         delivering at most 5000 messages per receive. -->
    <int-kafka:consumer-context id="consumerContext"
        consumer-timeout="1000" zookeeper-connect="zookeeperConnect"
        consumer-properties="consumerProperties">
        <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration
                group-id="default1" value-decoder="kafkaDecoder" key-decoder="kafkaDecoder"
                max-messages="5000">
                <int-kafka:topic id="test1" streams="4" />
            </int-kafka:consumer-configuration>
        </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>
</beans>

此配置中创建了一个kafkaService,使用该类的processMessage方法消费


KafkaService.java


package com._656463.demo.kafka.springintegrationkafka;
 
import java.util.Map;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class KafkaService {
    /** Message sink invoked by the outbound-channel-adapter in spring-integration-consumer.xml. */
    private static final Logger logger = LoggerFactory.getLogger(KafkaService.class);

    /**
     * Consumes one batch of kafka messages delivered by the inbound adapter.
     *
     * @param msgs map of topic name to an inner map of decoded payloads
     *             (inner key presumably the partition id - TODO confirm
     *             against the spring-integration-kafka 1.x adapter contract)
     */
    public void processMessage(Map<String, Map<Integer, String>> msgs) {
        for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) {
            System.out.println("Consumer Message received: ");
            // Parameterized SLF4J logging avoids building the string when the
            // INFO level is disabled (original concatenated unconditionally).
            logger.info("Suchit Topic:{}", entry.getKey());
            for (String msg : entry.getValue().values()) {
                logger.info("Suchit Consumed Message: {}", msg);
            }
        }
    }
}


测试消费者的时候,只需启动Spring容器即可:

package com._656463.demo.kafka.springintegrationkafka;
 
import org.springframework.beans.BeansException;
import org.springframework.context.support.ClassPathXmlApplicationContext;
 
public class KafkaConsumerTest {
    /**
     * Boots the consumer Spring context and parks the main thread so the kafka
     * inbound adapter can keep polling on its own poller threads.
     */
    public static void main(String[] args) {
        try {
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                    "classpath:spring/spring-integration-consumer.xml");
            context.start();
        } catch (BeansException e) {
            // Context failed to load: report and exit. The original fell through
            // and blocked forever on a consumer that was never started.
            e.printStackTrace();
            return;
        }

        // Park the main thread; nothing ever notifies, so we block until interrupted.
        synchronized (KafkaConsumerTest.class) {
            boolean parked = true;
            while (parked) {
                try {
                    KafkaConsumerTest.class.wait();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop blocking so the JVM can
                    // exit cleanly (original swallowed the interrupt and re-waited).
                    Thread.currentThread().interrupt();
                    parked = false;
                }
            }
        }
    }
}



书生参考网络整理


系列教程

大家都在看

热门访问