RabbitMQ 简单队列

2019-03-05 22:34|来源: 网路

生产者将消息发送到队列,消费者从队列中获取消息。


P:消息的生产者
C:消息的消费者
红色:队列
首先引用rabbitmq的客户端程序所依赖的jar包:
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.2</version>
</dependency>


获取rabbitmq连接
public static Connection getConnection() throws Exception {
    //定义连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    //设置服务地址
    factory.setHost("localhost");
    //端口
    factory.setPort(5672);
    //设置账号信息,用户名、密码、vhost
    factory.setVirtualHost("/testvhost");
    factory.setUsername("test");
    factory.setPassword("test");
    // 通过工程获取连接
    Connection connection = factory.newConnection();
    return connection;
}



生产者发送消息

private final static String QUEUE_NAME = "test_queue";
@Test
public void testSend() throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    // 从连接中创建通道
    Channel channel = connection.createChannel();
    // 声明(创建)队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 消息内容
    String message = "Hello World!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    //关闭通道和连接
    channel.close();
    connection.close();
}


执行流程
1、获取一个mq连接
Connection connection = ConnectionUtil.getConnection();
2、根据连接创建通道
Channel channel = connection.createChannel();
3、声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
4、消息内容
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
点击上面的队列名称,查询具体的队列中的信息


消费者从队列中获取消息

@Test
public void testRecv() throws Exception{
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列
    channel.basicConsume(QUEUE_NAME, true, consumer);
    // 获取消息
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");
    }
}


书生整理于网络


相关问答

更多

在RabbitMQ中更改现有队列的路由键(Changing the routing key of an existing queue in RabbitMQ)

队列没有路由密钥。 队列只是消息所在的位置。 路由密钥位于交换和队列之间的绑定中 您需要在交换和队列之间创建新绑定,并在不再需要时删除旧绑定 a queue does not have a routing key. a queue is just a place where messages sit. the routing key lives in the binding between an exchange and a queue you need to create a new bindi ...

RabbitMQ重新排序消息(RabbitMQ reordering messages)

由于没有答复,我想我做得很好;) 无论如何,在与其他利益相关者讨论要求后,决定现在可以放弃LIFO要求。 当涉及到它时,我们可以担心。 我们可能最终采用的解决方案是让工作人员打开第二个队列,让主人可以使用该队列让工作人员知道要忽略哪些工作,并提供额外的控制/监视信息(无论如何我们都需要这些信息)。 实现AMQP 1.0规范的RabbitMQ也可能对此有所帮助。 所以我会把这个问题标记为现在的答案。 其他人仍然可以自由添加或改进。 Since there is no reply I guess I ...

RabbitMQ RPC跨多个rabbitMQ实例(RabbitMQ RPC across multiple rabbitMQ instances)

看看org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory。 它将允许您为不同的虚拟主机或不同的rabbitmq实例创建多个连接工厂。 我们将其用于多租户rabbitmq应用程序。 Take a look at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory. It will allow you to ...

RabbitMQ集群用例没有HA(RabbitMQ clustering use case w/o HA)

通过拥有更多服务器,您可以获得更多吞吐量,能够接受更多连接的客户端等等。 尽管声明了资源的位置,但非HA群集仍能够查看群集中所有节点中的资源。 By having more servers you can get more throughput, be able to accept more connected clients and so on. The non HA cluster is able to see resources in all nodes in the clusters, d ...

使用RabbitMQ与Plone - 芹菜还是不?(Using RabbitMQ with Plone - Celery or not?)

首先,问问你自己,如果你需要RabbitMQ的功能,或者只想用Plone在Python中完成一些异步任务。 如果你真的不需要RabbitMQ,你可以看看David Glick的要点:如何将Celery和Plone集成在一起(并且仍然使用RabbitMQ和Celery): https://gist.github.com/davisagli/5824662 https://gist.github.com/davisagli/5824709 您还可以查看collective.taskqueue (没有C ...

相关文章

更多

最近更新

更多