使用kafka-clients api操作Kafka

2018-10-04|来源:

引入kafka-clients相关依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
        <version>0.9.0.1</version>
    <exclusions>
        <exclusion>
            <artifactId>jmxri</artifactId>
            <groupId>com.sun.jmx</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jms</artifactId>
            <groupId>javax.jms</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jmxtools</artifactId>
            <groupId>com.sun.jdmk</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.5</version>
    <exclusions>
        <exclusion>
            <groupId>com.sun.jmx</groupId>
            <artifactId>jmxri</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.sun.jdmk</groupId>
            <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.jms</groupId>
            <artifactId>jms</artifactId>
        </exclusion>
    </exclusions>
</dependency>


消息生产者

package com._656463.demo.kafka.simple;
 
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
/**
 * 简单的消息生产者
 */
public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zk.connect", "master:2181,slave1:2181,slave2:2181");
        // serializer.class为消息的序列化类
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // 配置metadata.broker.list, 为了高可用, 最好配两个broker实例
        props.put("metadata.broker.list", "master:9092,slave1:9092,slave2:9092");
        // ACK机制, 消息发送需要kafka服务端确认
        props.put("request.required.acks", "1");
 
        props.put("num.partitions", "3");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        for (int i = 0; i < 10; i++) {
            SimpleDateFormat formatter = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss SSS");
            Date curDate = new Date(System.currentTimeMillis());
            String str = formatter.format(curDate);
 
            String msg = "test" + i + "=" + str;
            String key = i + "";
             
            /**
             * KeyedMessage<K, V>,K对应Partition Key的类型,V对应消息本身的类型
             * topic: "test", key: "key", message: "message"
             */
            producer.send(new KeyedMessage<String, String>("test1",key, msg));
        }
    }
}


消息消费者

package com._656463.demo.kafka.simple;
 
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
 
/**
 * 消息消费端
 */
public class SimpleConsumer extends Thread {
 
    private final ConsumerConnector consumer;
    private final String topic;
 
    public static void main(String[] args) {
        SimpleConsumer consumerThread = new SimpleConsumer("test1");
        consumerThread.start();
    }
 
    public SimpleConsumer(String topic) {
        consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }
 
    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        // 设置zookeeper的链接地址
        props.put("zookeeper.connect", "master:2181,slave1:2181,slave2:2181");
        // 设置group id
        props.put("group.id", "1");
        // kafka的group 消费记录是保存在zookeeper上的, 但这个信息在zookeeper上不是实时更新的, 需要有个间隔时间更新
        props.put("auto.commit.interval.ms", "1000");
        props.put("zookeeper.session.timeout.ms", "10000");
        return new ConsumerConfig(props);
    }
 
    public void run() {
        // 设置Topic=>Thread Num映射关系, 构建具体的流
        Map<String, Integer> topickMap = new HashMap<String, Integer>();
        topickMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topickMap);
        KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        System.out.println("*********Results********");
        while (it.hasNext()) {
            System.err.println("get data:" + new String(it.next().message()));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}


运行程序可以看到,刚好10条信息,没有丢失。不过消息因为均衡的原因,并非是有序的,在Kafka只提供了分区内部的有序性,不能跨partition. 每个分区的有序性,结合按Keypartition的能力对大多应用都够用了。

书生参考于网络整理


相关问答

更多

kafka中如何根据时间获取topic中消息的offset??

我不生产答案,我只是当一回Stackoverflow的搬运工。今天刚好在Stackoverflow查Kafka的 一个问题,顺带看到的。 For finding the start offset to read in Kafka 0.8 Simple Consumer example they say Kafka includes two constants to help, kafka.api.OffsetRequest.EarliestTime() finds the beginning of ...

kafka集群是否启动成功?

你的问题在于如何让一个程序一直在后台运行,还是只是窗口运行, (1)如果要kafka进程一直常驻并且在后台不关闭,启动如下: cd /home/kafka/kafka_2.10-0.9.0.0/ nohup bin/kafka-server-start.sh config/server.properties & (2)如果只是想窗口运行,启动如下: cd /home/kafka/kafka_2.10-0.9.0.0/ bin/kafka-server-start.sh config/server. ...

kafka脱离了zookeeper可以集群吗

不可以,kafka必须要依赖一个zookeeper集群才能运行。kafka系群里面各个broker都是通过zookeeper来同步topic列表以及其它broker列表的,一旦连不上zookeeper,kafka也就无法工作。

kafka的 broker都有什么操作

activemq基础的消息系统kafka分布式消息系统jafka是kafka的java版本(kafka是scala写的)

kafka apache 使用在什么场合

1、Messaging 对于一些常规的消息系统,kafka是个不错的选择;partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势.不过到目前为止,我们应该很清楚认识到,kafka并没有提供JMS中的"事务性""消息传输担保(消息确认机制)""消息分组"等企业级特性;kafka只能使用作为"常规"的消息系统,在一定程度上,尚未确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等) 2、Websit activity tracking kafka可以作为" ...

kafka分布式消息队列

相关文章

更多

最近更新

更多