构建Disruptor实例-生产消费模型完成整个入门示例

2019-05-20 23:30|来源: 网路

1、初始化Disruptor,构建Disruptor只要需要以下几个参数

    1 eventFactory: 消息(event)工厂对象

    2 ringBufferSize: 容器的长度

    3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler

    4 ProducerType: 单生产者 还是 多生产者

    5 waitStrategy: 等待策略

示例代码:

Disruptor<OrderEvent> disruptor = new Disruptor<>(orderEventFactory,
        ringBufferSize,
        executor,
        ProducerType.SINGLE,
        new BlockingWaitStrategy());


2、初始化好Disruptor之后,通过该对象的handleEventsWith添加消费者的监听

3、然后启动Disruptor实例

4、往RingBuffer中生产数据,完成生产消费模型


具体代码如下:

package com656463.quickstart;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class Main {
    public static void main(String[] args) {
        // 参数准备工作
        OrderEventFactory orderEventFactory = new OrderEventFactory();
        int ringBufferSize = 4;
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        /**
         * 1 eventFactory: 消息(event)工厂对象
         * 2 ringBufferSize: 容器的长度
         * 3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler
         * 4 ProducerType: 单生产者 还是 多生产者
         * 5 waitStrategy: 等待策略
         */
        //1. 实例化disruptor对象
        Disruptor<OrderEvent> disruptor = new Disruptor<>(orderEventFactory,
                ringBufferSize,
                executor,
                ProducerType.SINGLE,
                new BlockingWaitStrategy());

        //2. 添加消费者的监听 (构建disruptor 与 消费者的一个关联关系)
        disruptor.handleEventsWith(new OrderEventHandler());

        //3. 启动disruptor
        disruptor.start();

        //4. 获取实际存储数据的容器: RingBuffer
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        OrderEventProducer producer = new OrderEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long i = 0; i < 5; i++) {
            bb.putLong(0, i);
            producer.sendData(bb);
        }

        disruptor.shutdown();
        executor.shutdown();

    }
}


整个Disruptor的入门程序完成,接下来深入理解Disruptor核心API

相关问答

更多

从LinkedBlockingQueue迁移到LMAX的Disruptor(Migrating from LinkedBlockingQueue to LMAX' Disruptor)

Mentaqueue提供了一个单一的生产者单一消费者队列基于相同的想法 - http://mentaqueue.soliveirajr.com/Page.mtw ,你可以检查代码,但我从来没有使用它自己。 开箱即用的Disruptor提供了两种技术 - 我不会进入代码,但可以根据需要进行操作。 它允许对事件处理程序进行排序,并且可以对其进行配置,以便每个处理程序将并行处理所有请求; 每个请求由每个处理程序处理。 一个Worker Pool实现,它允许一个工作线程池来处理一个请求; 每个请求将从线程 ...

Spring JMS生产者和消费者互动(Spring JMS Producer and Consumer interaction)

您需要在生产者端和消费者端使用org.springframework.jms.core.JmsTemplate.sendAndReceive(...)使用请求/回复,容器中设置的MessageListener方法必须返回一个值。 或者让你的生产者订阅ActiveMQ.Advisory.MessageConsumed.Queue ,当消息通过访问诸如orignalMessageId属性被消费时被通知,请看这里http://activemq.apache.org/advisory-message.h

线程锁和条件变量,生产者使用者示例(Thread Locks and Condition Variables, The Producer Consumer Example)

有关完整示例,请参阅BlockingQueue的java api中的生产者 - 消费者示例。 代码中有几个错误。 首先生产者和消费者不使用相同的队列,例如有两个队列实例。 其次notify和wait方法也在不同的对象上运行。 让你的例子工作需要几件事情: 只有一个队列 线程安全处理队列 处理通知并等待同一个对象 生产者代码可以被重写为: public void produce() { int i = 0; while (i < 100) { synchronize

具有Qt信号/插槽的生产者/消费者(Producers/consumers with Qt signals/slots)

我不认为信号/插槽机制在这里是合适的,因为每个信号都分配给所有连接的插槽。 这意味着如果您使用信号/插槽机制作为“工作队列”,则不会对消费者进行任何负载分配,而是所有消费者都可以执行相同(重复)的工作。 一个更好的机制是使用容器作为工作队列(生产者向容器添加项目,消费者删除它们),使用QMutex避免并发问题,使用一个(或两个,如果要强加最大大小) QWaitCondition来阻止没有工作的消费者。 I don't think the signal/slot mechanism is appr

异步生产者/消费者(Async Producer/Consumer)

我认为这应该有效: private static BlockingCollection<string> _itemsToProcess = new BlockingCollection<string>(); static void Main(string[] args) { InsertWorker(); GenerateItems(10, 1000); _itemsToProcess.CompleteAdding(); ...