RabbitMQ 路由模式(Routing)-使用 direct Exchange

2018-07-06|来源:


任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue。
1、处理路由键
2、需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
3、一般情况可以使用rabbitMQ自带的Exchange:" "(该Exchange的名字为空字符串,下文称其为default Exchange)。
4、这种模式下不需要将Exchange进行任何绑定(binding)操作
5、消息传递时需要一个“RouteKey”,可以简单的理解为要发送到的队列名字。
6、如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。
Channel channel = connection.createChannel();  
channel.exchangeDeclare("exchangeName", "direct"); //direct fanout topic  
channel.queueDeclare("queueName");  
channel.queueBind("queueName", "exchangeName", "routingKey");  
byte[] messageBodyBytes = "hello world".getBytes();  
//需要绑定路由键  
channel.basicPublish("exchangeName", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);



生产者

1、创建一个 direct模式的exchange(交换机)
channel.exchange_declare(exchange='direct_logs',type='direct')
2、向交换机中发送消息
channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
private final static String EXCHANGE_NAME = "test_exchange_direct";
@Test
public void testSend() throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明exchange
    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    // 消息内容
    String message = "这是消息A";
    channel.basicPublish(EXCHANGE_NAME, "A", null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    channel.close();
    connection.close();
}


消息者
1、声明队列
channel.queueDeclare(QUEUE_NAME_1, false, false, false, null);
2、创建消息与交换机的绑定
channel.queue_bind(exchange=exchange_name,queue=queue_name)
channel.queue_bind(exchange=exchange_name,queue=queue_name, routing_key='black')
3、生产者发出消息后,匹配的路由key会接收到消息


消费者1

private final static String QUEUE_NAME_1 = "test_queue_direct_1";
@Test
public void testRecv1() throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME_1, false, false, false, null);
    // 绑定队列到交换机
    channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "A");
    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(QUEUE_NAME_1, false, consumer);
    // 获取消息
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");
        Thread.sleep(10);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}


消息者2
private final static String QUEUE_NAME_2 = "test_queue_direct_2";
@Test
public void testRecv2() throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME_2, false, false, false, null);
    // 绑定队列到交换机
    channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "B");
    channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "A");
    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(QUEUE_NAME_2, false, consumer);
    // 获取消息
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");
        Thread.sleep(10);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}


书生整理于网络


相关问答

更多

如何监控RabbitMQ的性能

测试 rabbitmq 的性能方法如下: 1、声明7个具有不同属性的queue,分别和名为test_exchage的exchange进行绑定(因为exchange为fanout类型,所以测试代码中的routing_key其实是不起作用的); 2、向exchange发送具有persistent属性的消息(delivery_mode=2); 3、创建7个消费者分别从上述7个queue中获取消息;

graphic engine编程(direct3D)

www.gameres.com 到这个网站找,很多

netty和rabbitmq的区别

netty和rabbitmq层次的问题: 我知道netty是tcp通信框架,rabbitmq是基于tcp通信封装的一种消息队列。如果包含套节字的话他们之间的关系层次是 socket/nio ---> netty ---> rabbitmq 这种,不知道我理解的有没有错误。 netty和rabbitmq 替换关系: 在不考虑数据解析序列化的前提下,单对单的,不涉及延时:能用netty的地方是不是可以用rabbitmq?能用rabbitmq的地方是不是可以使用netty+protobuf替换。

rabbitmq的work模式怎么整合到spring中

abbitMQ主要有三种交换器:direct、fanout、topic direct就是一对一传输 fanout就是匹配传输 topic就是主题分发传输 还是看代码吧 maven构建:pom.xml [html] view plain copy <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLS ...

rabbitmq 消费者在javaweb中的使用

这个,监听都是这样,得循环等待,那用多线程吧,一个线程监听队列,并存储队列信息。另一个线程处理这个信息。

RabbitMQ 教程

相关文章

更多

最近更新

更多