使用spring-integration-kafka操作kafka

2016-10-05|来源:

依赖

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>1.2.0.RELEASE</version>
</dependency>



消息生产者

消息生产者spring配置spring-integration-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
    xmlns:int="http://www.springframework.org/schema/integration"
    xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/integration
       http://www.springframework.org/schema/integration/spring-integration.xsd
       http://www.springframework.org/schema/integration/kafka
       http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
       ">
        
    <!-- Resolves placeholders such as ${bootstrap.servers} from kafka.properties on the classpath -->
    <context:property-placeholder location="classpath:kafka.properties" />
 
    <!--kafka config -->
    <!-- Channel the application writes to; the outbound adapter below consumes it -->
    <int:channel id="inputToKafka" />
 
    <!-- Bridges messages from inputToKafka to Kafka using the producer context below.
         The target topic and message key are taken from the "topic" and "messageKey" headers. -->
    <int-kafka:outbound-channel-adapter
        kafka-producer-context-ref="kafkaProducerContext" auto-startup="true"
        channel="inputToKafka" order="1">
    </int-kafka:outbound-channel-adapter>
 
    <!-- Native Kafka producer settings passed through to the producer -->
    <bean id="producerProperties"
        class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="topic.metadata.refresh.interval.ms">3600000</prop>
                <prop key="message.send.max.retries">5</prop>
                <prop key="send.buffer.bytes">5242880</prop>
            </props>
        </property>
    </bean>
 
    <!-- Shared serializer for both keys and values (String payloads) -->
    <bean id="stringSerializer" class="org.apache.kafka.common.serialization.StringSerializer" />
 
    <!-- Producer configuration: String key/value serialization, default topic "test1",
         broker list resolved from kafka.properties -->
    <int-kafka:producer-context id="kafkaProducerContext"
        producer-properties="producerProperties">
        <int-kafka:producer-configurations>
            <int-kafka:producer-configuration
                broker-list="${bootstrap.servers}" key-serializer="stringSerializer"
                value-class-type="java.lang.String" value-serializer="stringSerializer"
                topic="test1" />
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>
</beans>



创建一个生产者测试类ProducerTest.java,往topic=test1中创建一些消息


package com._656463.demo.kafka.springintegrationkafka;
 
import java.util.Random;
 
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath*:spring/spring-integration-producer.xml")
public class ProducerTest {

    /** Channel wired to the Kafka outbound adapter declared in spring-integration-producer.xml. */
    @Autowired
    @Qualifier("inputToKafka")
    private MessageChannel channel;

    /**
     * Publishes 100 messages with random payloads to topic "test1".
     * The outbound adapter routes each message using the "messageKey"
     * and "topic" headers set here.
     */
    @Test
    public void test1() {
        Random random = new Random();
        int sent = 0;
        while (sent < 100) {
            String payload = "Message-" + random.nextInt();
            channel.send(MessageBuilder
                    .withPayload(payload)
                    .setHeader("messageKey", String.valueOf(sent))
                    .setHeader("topic", "test1")
                    .build());
            sent++;
        }
    }
}



消息消费者

消费者的spring配置spring-integration-consumer.xml


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
    xsi:schemaLocation="
      http://www.springframework.org/schema/integration/kafka 
      http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
      http://www.springframework.org/schema/integration 
      http://www.springframework.org/schema/integration/spring-integration.xsd
      http://www.springframework.org/schema/beans 
      http://www.springframework.org/schema/beans/spring-beans.xsd">
 
    <!-- Channel that receives batches polled from Kafka by the inbound adapter -->
    <int:channel id="inputFromKafka" />
 
    <!-- ZooKeeper connection used by the high-level consumer (hard-coded address;
         consider externalizing it like ${bootstrap.servers} in the producer config) -->
    <int-kafka:zookeeper-connect id="zookeeperConnect"
        zk-connect="192.168.56.101:2181" zk-connection-timeout="6000"
        zk-session-timeout="6000" zk-sync-time="2000" />
 
    <!-- Polls Kafka via consumerContext and publishes batches to inputFromKafka.
         NOTE(review): auto-startup="false" — presumably started by the explicit
         context.start() call in KafkaConsumerTest; confirm it actually starts. -->
    <int-kafka:inbound-channel-adapter
        id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
        auto-startup="false" channel="inputFromKafka">
        <int:poller fixed-delay="1" time-unit="MILLISECONDS" />
    </int-kafka:inbound-channel-adapter>
 
    <!-- Decodes raw Kafka bytes into Strings for both keys and values -->
    <bean id="kafkaDecoder"
        class="org.springframework.integration.kafka.serializer.common.StringDecoder">
    </bean>
 
    <!-- Native Kafka consumer settings; auto.offset.reset=smallest starts from the
         earliest available offset when no committed offset exists -->
    <bean id="consumerProperties"
        class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="auto.offset.reset">smallest</prop>
                <prop key="socket.receive.buffer.bytes">10485760</prop> <!-- 10M -->
                <prop key="fetch.message.max.bytes">5242880</prop>
                <prop key="auto.commit.interval.ms">1000</prop>
            </props>
        </property>
    </bean>
 
    <!-- Hands every batch arriving on inputFromKafka to KafkaService.processMessage -->
    <bean id="kafkaService" class="com._656463.demo.kafka.springintegrationkafka.KafkaService" />
    <int:outbound-channel-adapter channel="inputFromKafka" ref="kafkaService" method="processMessage" />
     
    <!-- Consumer configuration: group "default1", topic "test1" read with 4 streams,
         up to 5000 messages per poll -->
    <int-kafka:consumer-context id="consumerContext"
        consumer-timeout="1000" zookeeper-connect="zookeeperConnect"
        consumer-properties="consumerProperties">
        <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration
                group-id="default1" value-decoder="kafkaDecoder" key-decoder="kafkaDecoder"
                max-messages="5000">
                <int-kafka:topic id="test1" streams="4" />
            </int-kafka:consumer-configuration>
        </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>
</beans>

此配置中创建了一个kafkaService,使用该类的processMessage方法消费


KafkaService.java


package com._656463.demo.kafka.springintegrationkafka;
 
import java.util.Map;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
/**
 * Message handler invoked by the Spring Integration outbound channel adapter.
 * Receives a batch keyed by topic name; the inner map is presumably
 * partition-number -> decoded payload (NOTE(review): confirm against the
 * inbound adapter's batch format).
 */
public class KafkaService {
    static final Logger logger = LoggerFactory.getLogger(KafkaService.class);

    /**
     * Logs every message in the batch, grouped by topic.
     *
     * @param msgs topic name -> (partition -> message payload); may be empty
     */
    public void processMessage(Map<String, Map<Integer, String>> msgs) {
        for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) {
            // Use parameterized SLF4J logging instead of string concatenation /
            // System.out so output is uniform and formatting cost is deferred.
            logger.info("Consumer message received, topic: {}", entry.getKey());
            for (String msg : entry.getValue().values()) {
                logger.info("Consumed message: {}", msg);
            }
        }
    }
}


测试消费者的时候,只要把Spring容器启动起来即可:

package com._656463.demo.kafka.springintegrationkafka;
 
import org.springframework.beans.BeansException;
import org.springframework.context.support.ClassPathXmlApplicationContext;
 
/**
 * Bootstraps the consumer application context and keeps the JVM alive so the
 * Kafka inbound adapter can poll in the background.
 */
public class KafkaConsumerTest {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context;
        try {
            context = new ClassPathXmlApplicationContext(
                    "classpath:spring/spring-integration-consumer.xml");
        } catch (BeansException e) {
            // Original code fell through into the wait loop even when the
            // context failed to load; exit instead — there is nothing to run.
            e.printStackTrace();
            return;
        }
        // Start non-auto-startup lifecycle beans (the inbound adapter is
        // declared with auto-startup="false").
        context.start();
        // Close the context cleanly on JVM shutdown (Ctrl+C / SIGTERM).
        context.registerShutdownHook();

        // Park the main thread indefinitely; polling happens on adapter threads.
        synchronized (KafkaConsumerTest.class) {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    KafkaConsumerTest.class.wait();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the loop condition sees it
                    // and the method unwinds instead of swallowing the signal.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}



书生参考网络整理


相关问答

更多

spring-integration-kafka会自动启动kafka服务吗

Jafka/KafkaKafka是Apache下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台

在spring怎么读取kafka文件

Outbound Channel Adapter用来发送消息到Kafka。 消息从Spring Integration Channel中读取。 你可以在Spring application context指定这个channel。 一旦配置好这个Channel,就可以利用这个Channel往Kafka发消息。 明显地,Spring Integration特定的消息发送给这个Adaptor,然后发送前在内部被转为Kafka消息。当前的版本要求你必须指定消息key和topic作为头部数据 (header),消息 ...

kafka集群是否启动成功?

你的问题在于如何让一个程序一直在后台运行,还是只是窗口运行, (1)如果要kafka进程一直常驻并且在后台不关闭,启动如下: cd /home/kafka/kafka_2.10-0.9.0.0/ nohup bin/kafka-server-start.sh config/server.properties & (2)如果只是想窗口运行,启动如下: cd /home/kafka/kafka_2.10-0.9.0.0/ bin/kafka-server-start.sh config/server. ...

kafka脱离了zookeeper可以集群吗

不可以,kafka必须要依赖一个zookeeper集群才能运行。kafka系群里面各个broker都是通过zookeeper来同步topic列表以及其它broker列表的,一旦连不上zookeeper,kafka也就无法工作。

kafka librdkafka怎么安装使用

答:这个是scala的编译库和运行时库(kafka是用sbt管理依赖的),所以建议你使用sbt,会自动下载所有依赖

专题教程

JAVA概述
第一部分:java入门基础
第二部分:java常用类
第三部分:jdbc系列教程
第四部分:java高级特征
Gson教程
快速了解 jdk8 新特征

相关文章

更多

最近更新

更多