RabbitMQ topic Exchange使用

2018-07-06|来源:



 任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上
 1、这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。
 2、这种模式需要RouteKey,也许要提前绑定Exchange与Queue。
 3、在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。
 4、“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。
 5、同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。


生产者
private final static String EXCHANGE_NAME = "test_exchange_topic";
@Test
public void testSend() throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明exchange
    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    // 消息内容
    String message = "用户数据操作";
    channel.basicPublish(EXCHANGE_NAME, "user.insert", null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    channel.close();
    connection.close();
}


消费者1
private final static String QUEUE_NAME_1 = "test_queue_topic_1";
@Test
public void testRecv1() throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME_1, false, false, false, null);
    // 绑定队列到交换机
    channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "user.update");
    channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "user.delete");
    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(QUEUE_NAME_1, false, consumer);
    // 获取消息
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");
        Thread.sleep(10);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}


消费者2
private final static String QUEUE_NAME_2 = "test_queue_topic_2";
@Test
public void testRecv2() throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME_2, false, false, false, null);
    // 绑定队列到交换机
    channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "user.#");
    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(QUEUE_NAME_2, false, consumer);
    // 获取消息
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");
        Thread.sleep(10);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}


书生整理于网络


相关问答

更多

netty和rabbitmq的区别

netty和rabbitmq层次的问题: 我知道netty是tcp通信框架,rabbitmq是基于tcp通信封装的一种消息队列。如果包含套节字的话他们之间的关系层次是 socket/nio ---> netty ---> rabbitmq 这种,不知道我理解的有没有错误。 netty和rabbitmq 替换关系: 在不考虑数据解析序列化的前提下,单对单的,不涉及延时:能用netty的地方是不是可以用rabbitmq?能用rabbitmq的地方是不是可以使用netty+protobuf替换。

如何监控RabbitMQ的性能

测试 rabbitmq 的性能方法如下: 1、声明7个具有不同属性的queue,分别和名为test_exchage的exchange进行绑定(因为exchange为fanout类型,所以测试代码中的routing_key其实是不起作用的); 2、向exchange发送具有persistent属性的消息(delivery_mode=2); 3、创建7个消费者分别从上述7个queue中获取消息;

rabbitmq 消费者在javaweb中的使用

这个,监听都是这样,得循环等待,那用多线程吧,一个线程监听队列,并存储队列信息。另一个线程处理这个信息。

为什么我要放弃 RabbitMQ

1. 需要“消息延迟”功能 这对我们来说是很重要的业务需求。当顾客订了一个服务,首先我们会发送描述相信指令的短信,然后我们会在两分钟后发送第二条描述详情的短信,而不是两条一起发送。我们希望通过这样,留给用户阅读的时间(每次间隔2分钟),然后把他们放入处理递交的队列中。可是用现在的设计,在递交到RabbitMQ的时候,设置延迟是不可能的。因为开发者把RabbitMQ设计成立刻发送消息,而不是延迟后再发送。 我本可以使用Java SDK内建的“DelayQueue”,然后把消息递交到RabbitMQ, ...

为什么我要放弃 RabbitMQ

1. 需要“消息延迟”功能 这对我们来说是很重要的业务需求。当顾客订了一个服务,首先我们会发送描述相信指令的短信,然后我们会在两分钟后发送第二条描述详情的短信,而不是两条一起发送。我们希望通过这样,留给用户阅读的时间(每次间隔2分钟),然后把他们放入处理递交的队列中。可是用现在的设计,在递交到RabbitMQ的时候,设置延迟是不可能的。因为开发者把RabbitMQ设计成立刻发送消息,而不是延迟后再发送。 我本可以使用Java SDK内建的“DelayQueue”,然后把消息递交到RabbitMQ, ...

RabbitMQ 教程

相关文章

更多

最近更新

更多