RabbitMQ Work模式消息队列

2018-07-05|来源:

一个生产者、多个消费者。 一个消息只能被一个消费者获取。


生产者发布消息

private final static String QUEUE_NAME = "test_queue_work";
@Test
public void testSend() throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    for (int i = 0; i < 50; i++) {
        // 消息内容
        String message = "" + i;
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        Thread.sleep(i * 10);
    }
    channel.close();
    connection.close();
}


消息者1
@Test
public void testRecv1() throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(QUEUE_NAME, false, consumer);
    // 获取消息
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");
        //休眠
        Thread.sleep(10);
        // 返回确认状态
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}


消费者2
@Test
public void testRecv2() throws Exception{
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成状态
    channel.basicConsume(QUEUE_NAME, false, consumer);
    // 获取消息
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");
        // 休眠1秒
        Thread.sleep(1000);
        //反馈消息的消费状态
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}


为了测试消费者获取到的消息情况,先将两个消费者启动后,再启动生产者
消费者1打印的消息
消费者2打印的消息
[x] Received '0'
[x] Received '2'
[x] Received '4'
[x] Received '6'
[x] Received '8'
[x] Received '10'
[x] Received '12'
[x] Received '14'
[x] Received '16'
[x] Received '18'
[x] Received '20'
[x] Received '22'
[x] Received '24'
[x] Received '26'
[x] Received '28'
[x] Received '30'
[x] Received '32'
[x] Received '34'
[x] Received '36'
[x] Received '38'
[x] Received '40'
[x] Received '42'
[x] Received '44'
[x] Received '46'
[x] Received '48'
[x] Received '1'
[x] Received '3'
[x] Received '5'
[x] Received '7'
[x] Received '9'
[x] Received '11'
[x] Received '13'
[x] Received '15'
[x] Received '17'
[x] Received '19'
[x] Received '21'
[x] Received '23'
[x] Received '25'
[x] Received '27'
[x] Received '29'
[x] Received '31'
[x] Received '33'
[x] Received '35'
[x] Received '37'
[x] Received '39'
[x] Received '41'
[x] Received '43'
[x] Received '45'
[x] Received '47'
[x] Received '49'
测试结果:
   消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。
   消费者1和消费者2获取到的消息的数量是相同的,一个是奇数一个是偶数。


但是由于消费者1每次消费后只休息10毫秒,而消费者2每次消费后只休息1秒,这样是不合理的,应该是消费者1要比消费者2获取到的消息多才对。


此时可以使用channel.basicQos(1);来设置同一时刻服务器只会发一条消息给消费者,解决“能者多劳”问题,以消费者1为例:

@Test
public void testRecv1() throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(QUEUE_NAME, false, consumer);
    // 获取消息
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");
        //休眠
        Thread.sleep(10);
        // 返回确认状态
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}


执行结果:
消费者1打印的消息
消费者2打印的消息
[x] Received '0'
[x] Received '2'
[x] Received '3'
[x] Received '4'
[x] Received '5'
[x] Received '6'
[x] Received '7'
[x] Received '8'
[x] Received '9'
[x] Received '10'
[x] Received '11'
[x] Received '12'
[x] Received '13'
[x] Received '14'
[x] Received '15'
[x] Received '17'
[x] Received '18'
[x] Received '19'
[x] Received '20'
[x] Received '21'
[x] Received '22'
[x] Received '24'
[x] Received '25'
[x] Received '26'
[x] Received '27'
[x] Received '28'
[x] Received '30'
[x] Received '31'
[x] Received '32'
[x] Received '33'
[x] Received '35'
[x] Received '36'
[x] Received '37'
[x] Received '39'
[x] Received '40'
[x] Received '41'
[x] Received '43'
[x] Received '44'
[x] Received '45'
[x] Received '47'
[x] Received '48'
[x] Received '49'
[x] Received '1'
[x] Received '16'
[x] Received '23'
[x] Received '29'
[x] Received '34'
[x] Received '38'
[x] Received '42'
[x] Received '46'


书生整理于网络


相关问答

更多

java如何获取rabbitmq队列中消息数量

下面是RabbitMQ的消息确认机制:“为了确保消息不会丢失,RabbitMQ支持消息确认机制。客户端在接受到消息并处理完后,可以发送一个ack消息给RabbitMQ,告诉它该消息可以安全的删除了。假如客户端在发送ack之前意外死掉了,那么RabbitMQ会将消息投递到下一个consumer客户端。如果有多个consumer客户端,RabbitMQ在投递消息时是轮询的。RabbitMQ如何判断客户端死掉了?唯一根据是客户端连接是否断开。这里没有超时机制,也就是说客户端可以处理一个消息很长时间,只要 ...

netty和rabbitmq的区别

netty和rabbitmq层次的问题: 我知道netty是tcp通信框架,rabbitmq是基于tcp通信封装的一种消息队列。如果包含套节字的话他们之间的关系层次是 socket/nio ---> netty ---> rabbitmq 这种,不知道我理解的有没有错误。 netty和rabbitmq 替换关系: 在不考虑数据解析序列化的前提下,单对单的,不涉及延时:能用netty的地方是不是可以用rabbitmq?能用rabbitmq的地方是不是可以使用netty+protobuf替换。

Windows10安装RabbitMQ ,安装完成所有命令都报“系统找不到指定文件”

rabbitMQ是一个在AMQP协议标准基础上完整的,可服用的企业消息系统。它遵循Mozilla Public License开源协议,采用 Erlang 实现的工业级的消息队列(MQ)服务器,Rabbit MQ 是建立在Erlang OTP平台上。

rabbitmq 消费者在javaweb中的使用

这个,监听都是这样,得循环等待,那用多线程吧,一个线程监听队列,并存储队列信息。另一个线程处理这个信息。

rabbitmq消息真的可以持久化吗

消息传递的速度:用MSMQ/RabbitMq,等带持久化功能的队列组件;如果嫌太慢,就用ZeroMq(无消息持久化功能),但可以达到30W消息每秒;事件持久化的速度:由于事件都是.

RabbitMQ 教程

相关文章

更多

最近更新

更多