RabbitMQ 发布订阅模式(Publish、Subscribe)

2016-07-05|来源:

1、1个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的


消息生产者

private final static String EXCHANGE_NAME = "test_exchange_fanout";
@Test
public void testSend() throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明exchange
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    // 消息内容
    String message = "订阅消息";
    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    channel.close();
    connection.close();
}

注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为交换机没有存储消息的能力,消息只能存在在队列中。


消息消费者

private final static String QUEUE_NAME_1 = "test_queue_exchange_1";
private final static String QUEUE_NAME_2 = "test_queue_exchange_2";
@Test
public void testRecv1() throws Exception {
    recv(QUEUE_NAME_1,EXCHANGE_NAME);
}
@Test
public void testRecv2() throws Exception {
    recv(QUEUE_NAME_2,EXCHANGE_NAME);
}
public void recv(String queueName,String exchangeName) throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(queueName, false, false, false, null);
    // 绑定队列到交换机
    channel.queueBind(queueName, exchangeName, "");
    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(queueName, false, consumer);
    // 获取消息
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");
        Thread.sleep(10);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}


注:如果交换机没有启动起来,会报异常
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'test_exchange_fanout' in vhost '/testvhost', class-id=50, method-id=20)
启动消费者之后, 在管理后台可以看到队列和交换机的绑定关系:

生产者再发一次消息,两个消费者都能获取该消息。


书生整理于网络


相关问答

更多

redis发布订阅模式 publish数据存在哪

redis发布订阅,当然必然会有一个内存队列,暂时缓存。然后发给订阅者,如果订阅者没准备好,那么会错过这条信息。如果期间发生宕机,重启后队列中的数据就会丢失

spring-rabbit-1.3.5.release.jar支持什么版本的rabbitmq

这个版本很老了,在maven仓库http://mvnrepository.com查了下,1.3.5.RELEASE 是在 2014年6月发布的,你可以是使用3.2.x或3.3.x版本的rabbitmq,下载地址: http://mvnrepository.com/artifact/com.rabbitmq/amqp-client

net redis 和rabbitmq 有什么区别

RabbitMQ RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 Redis 是一个Key-Value的NoSQL数据库,开发维护很活跃,虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。 具体对比 可靠消费 Redis:没有相应的机制保证 ...

如何使用redis实现订阅发布模式

从上面的官方解释上来看,它的玩法有一点像现实生活中我们听收音机一个道理,要想听收音机,我们要做什么?肯定就是调频啦,只有在正 确的频道上面,我们才能听得到好听的节目,所以说subscribe首先要订阅一个频道(channel),下面我举个例子,开两个client,分别订阅着 msg 这个频道,比如下面这样: 2.publish 到现在为止,这两个subscibe都在监视着msg这个频道,接下来,如果msg频道有消息传出,必定会被subscribe接收到,先我们还是看看 redis手册上怎么用这个命 ...

netty和rabbitmq的区别

netty和rabbitmq层次的问题: 我知道netty是tcp通信框架,rabbitmq是基于tcp通信封装的一种消息队列。如果包含套节字的话他们之间的关系层次是 socket/nio ---> netty ---> rabbitmq 这种,不知道我理解的有没有错误。 netty和rabbitmq 替换关系: 在不考虑数据解析序列化的前提下,单对单的,不涉及延时:能用netty的地方是不是可以用rabbitmq?能用rabbitmq的地方是不是可以使用netty+protobuf替换。

专题教程

JAVA概述
第一部分:java入门基础
第二部分:java常用类
第三部分:jdbc系列教程
第四部分:java高级特征
快速入门
Gson教程
快速了解 jdk8 新特征

相关文章

更多

最近更新

更多