构建Disruptor实例-生产消费模型完成整个入门示例

2019-05-20 23:30|来源: 网路

1、初始化Disruptor,构建Disruptor只要需要以下几个参数

    1 eventFactory: 消息(event)工厂对象

    2 ringBufferSize: 容器的长度

    3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler

    4 ProducerType: 单生产者 还是 多生产者

    5 waitStrategy: 等待策略

示例代码:

Disruptor<OrderEvent> disruptor = new Disruptor<>(orderEventFactory,
        ringBufferSize,
        executor,
        ProducerType.SINGLE,
        new BlockingWaitStrategy());


2、初始化好Disruptor之后,通过该对象的handleEventsWith添加消费者的监听

3、然后启动Disruptor实例

4、往RingBuffer中生产数据,完成生产消费模型


具体代码如下:

package com656463.quickstart;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class Main {
    public static void main(String[] args) {
        // 参数准备工作
        OrderEventFactory orderEventFactory = new OrderEventFactory();
        int ringBufferSize = 4;
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        /**
         * 1 eventFactory: 消息(event)工厂对象
         * 2 ringBufferSize: 容器的长度
         * 3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler
         * 4 ProducerType: 单生产者 还是 多生产者
         * 5 waitStrategy: 等待策略
         */
        //1. 实例化disruptor对象
        Disruptor<OrderEvent> disruptor = new Disruptor<>(orderEventFactory,
                ringBufferSize,
                executor,
                ProducerType.SINGLE,
                new BlockingWaitStrategy());

        //2. 添加消费者的监听 (构建disruptor 与 消费者的一个关联关系)
        disruptor.handleEventsWith(new OrderEventHandler());

        //3. 启动disruptor
        disruptor.start();

        //4. 获取实际存储数据的容器: RingBuffer
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        OrderEventProducer producer = new OrderEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long i = 0; i < 5; i++) {
            bb.putLong(0, i);
            producer.sendData(bb);
        }

        disruptor.shutdown();
        executor.shutdown();

    }
}


整个Disruptor的入门程序完成,接下来深入理解Disruptor核心API

相关问答

更多

LMAX Disruptor事件中的类字段是否需要易变?(Do class fields within an LMAX Disruptor event need to be volatile?)

彼得的评论提供了一个很好的线索,事实上,是的,有涉及的记忆围栏。 您可以在Sequence类中看到putOrderedLong()compareAndSet()等等。 每个都强制执行内存排序。 有关更多详细信息,请参阅源代码 。 Peter's comment gives a good clue, and in fact, yes there are memory fences involved. You can see a putOrderedLong() compareAndSet() an

解决LMAX Disruptor模式中的消费者(eventProcessor)问题(Solution to slow consumer(eventProcessor) issue in LMAX Disruptor pattern)

一般来说,使用WorkerPool允许多个池化工作线程在单个消费者上工作,如果您具有独立且可能具有可变持续时间的任务(例如:一些短任务,一些更长),这是很好的。 另一个选择是让多个独立工作者对事件进行并行处理,但每个工作者只处理模N个工作者(例如2个线程,一个线程处理奇数,一个线程处理甚至事件ID)。 如果您具有一致的持续时间处理任务,并且允许批处理也能非常有效地工作,则此方法很有用。 另一件需要考虑的事情是消费者可以进行“批处理”,这在审计中尤其有用。 如果您的消费者有10个事件在等待,而不是

从LinkedBlockingQueue迁移到LMAX的Disruptor(Migrating from LinkedBlockingQueue to LMAX' Disruptor)

Mentaqueue提供了一个单一的生产者单一消费者队列基于相同的想法 - http://mentaqueue.soliveirajr.com/Page.mtw ,你可以检查代码,但我从来没有使用它自己。 开箱即用的Disruptor提供了两种技术 - 我不会进入代码,但可以根据需要进行操作。 它允许对事件处理程序进行排序,并且可以对其进行配置,以便每个处理程序将并行处理所有请求; 每个请求由每个处理程序处理。 一个Worker Pool实现,它允许一个工作线程池来处理一个请求; 每个请求将从线

我应该同步访问disruptor Next / Publish方法吗?(should I synchronize access to disruptor Next/Publish methods?)

我是disruptor-net的作者。 破坏程序是并发集合,因此一旦正确配置,您不必应用任何锁定。 在您的情况下,您应该使用MultiThreadedClaimStrategy初始化RingBuffer,因为您有多个生成器并使用示例3发布到环形缓冲区。 也就是说,我不鼓励在生产代码中使用disruptor-net,我暂时把它移植到我的业余时间并且没有在生产代码中使用它,它需要进一步测试。 也就是说,.NET并发队列明显快于Java队列,因此我建议使用ConcurrentQueue作为disrut

LMAX的破坏者模式如何工作?(How does LMAX's disruptor pattern work?)

Google Code项目确实提到了一个关于环形缓冲区实施的技术文件 ,但是对于想要了解其工作原理的人来说,这是一个有点干燥,学术和艰难的任务。 然而,有一些博客文章已经开始以更可读的方式解释内部的内容。 有关环形缓冲区的解释是破坏者模式的核心, 描述消费者障碍 (与从中断者读取的部分)以及处理多个生产者的一些信息 。 Disruptor的最简单的描述是:它是以最有效的方式在线程之间发送消息的一种方法。 它可以用作队列的替代品,但它也与SEDA和演员共享许多功能。 与队列相比: Disrupto