构建Disruptor实例-生产消费模型完成整个入门示例

2019-05-20 23:30|来源: 网路

1、初始化Disruptor,构建Disruptor只要需要以下几个参数

    1 eventFactory: 消息(event)工厂对象

    2 ringBufferSize: 容器的长度

    3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler

    4 ProducerType: 单生产者 还是 多生产者

    5 waitStrategy: 等待策略

示例代码:

Disruptor<OrderEvent> disruptor = new Disruptor<>(orderEventFactory,
        ringBufferSize,
        executor,
        ProducerType.SINGLE,
        new BlockingWaitStrategy());


2、初始化好Disruptor之后,通过该对象的handleEventsWith添加消费者的监听

3、然后启动Disruptor实例

4、往RingBuffer中生产数据,完成生产消费模型


具体代码如下:

package com656463.quickstart;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class Main {
    public static void main(String[] args) {
        // 参数准备工作
        OrderEventFactory orderEventFactory = new OrderEventFactory();
        int ringBufferSize = 4;
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        /**
         * 1 eventFactory: 消息(event)工厂对象
         * 2 ringBufferSize: 容器的长度
         * 3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler
         * 4 ProducerType: 单生产者 还是 多生产者
         * 5 waitStrategy: 等待策略
         */
        //1. 实例化disruptor对象
        Disruptor<OrderEvent> disruptor = new Disruptor<>(orderEventFactory,
                ringBufferSize,
                executor,
                ProducerType.SINGLE,
                new BlockingWaitStrategy());

        //2. 添加消费者的监听 (构建disruptor 与 消费者的一个关联关系)
        disruptor.handleEventsWith(new OrderEventHandler());

        //3. 启动disruptor
        disruptor.start();

        //4. 获取实际存储数据的容器: RingBuffer
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        OrderEventProducer producer = new OrderEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long i = 0; i < 5; i++) {
            bb.putLong(0, i);
            producer.sendData(bb);
        }

        disruptor.shutdown();
        executor.shutdown();

    }
}


整个Disruptor的入门程序完成,接下来深入理解Disruptor核心API

相关问答

更多

生产者消费者程序问题!

不是断了,是程序停止了!

从LinkedBlockingQueue迁移到LMAX的Disruptor(Migrating from LinkedBlockingQueue to LMAX' Disruptor)

Mentaqueue提供了一个单一的生产者单一消费者队列基于相同的想法 - http://mentaqueue.soliveirajr.com/Page.mtw ,你可以检查代码,但我从来没有使用它自己。 开箱即用的Disruptor提供了两种技术 - 我不会进入代码,但可以根据需要进行操作。 它允许对事件处理程序进行排序,并且可以对其进行配置,以便每个处理程序将并行处理所有请求; 每个请求由每个处理程序处理。 一个Worker Pool实现,它允许一个工作线程池来处理一个请求; 每个请求将从线程 ...

通用.Net生产者/消费者(Generic .Net Producer/Consumer)

微软CCR包含你所需要的大部分内容。 以下是一些代码示例和使用说明。 Microsoft CCR contains much of what you need. Here are some code samples and usage notes.

如果实现生产者/消费者模型,那么最好的数据结构是什么?(If implementation a producer/consumer model, what's the best data structure to use?)

如果您在多线程环境中使用队列,我建议使用ConcurrentLinkedQueue 。 这将为您管理所有同步。 If you are using a queue inside of a multithreaded environment, I suggest a ConcurrentLinkedQueue. This will manage all the synchronization for you.

像场景一样的生产者消费者模型不起作用(producer consumer model like scenario is not working)

首先,不要使用Timer和TimerTask 。 使用ExecutorService进行多线程处理。 并在Singleton类中使用Eager Initialization。 或者双重检查null的锁定,以使Singleton真正单身。 FTPClientPolling.java public class FTPClientPolling { private static FTPClientPolling instance = new FTPClientPolling(); p ...

简单的生产者 - 消费者示例C#Dictionary(Simple producer-consumer example C# Dictionary)

如果我理解你的算法,它应该是: main.Add(key, part[key]) If I understand your algorithm, it should be: main.Add(key, part[key])

我该如何修复这个“不完全同步”的消费者生产者示例(How can I fix this “not quite synchronized” consumer producer example)

您的消费者必须await 持有锁 (作为方法状态的javadoc )。 另外,你不应该使用tryLock ,你应该只使用lock 。 如果锁定获取失败,如果要执行其他操作,则只使用tryLock 。 在您的情况下,如果锁定获取失败,您只需尝试再次获取它。 Your consumer must await while holding the lock (as the javadocs for the method state). also, you shouldn't be using tryLoc ...

LMAX Disruptor与JMS提供商(LMAX Disruptor vs JMS Provider)

主要区别在于Disruptor设计用于同一个过程。 为什么? 出于性能原因(简答)。 更长的答案是,如果你不小心使用JMS接口的额外开销,套接字连接,锁定和多线程将有更高的开销,使Disruptor相形见绌。 快速JMS服务每秒可处理超过20,000条消息,但破坏程序的设计目的是处理2000万条消息的速率。 要实现这一点,这意味着您无法执行JMS假定的某些事情。 (往上看) The main difference is that the Disruptor is designed to work ...

LMAX Disruptor事件中的类字段是否需要易变?(Do class fields within an LMAX Disruptor event need to be volatile?)

彼得的评论提供了一个很好的线索,事实上,是的,有涉及的记忆围栏。 您可以在Sequence类中看到putOrderedLong()compareAndSet()等等。 每个都强制执行内存排序。 有关更多详细信息,请参阅源代码 。 Peter's comment gives a good clue, and in fact, yes there are memory fences involved. You can see a putOrderedLong() compareAndSet() and ...

解决LMAX Disruptor模式中的消费者(eventProcessor)问题(Solution to slow consumer(eventProcessor) issue in LMAX Disruptor pattern)

一般来说,使用WorkerPool允许多个池化工作线程在单个消费者上工作,如果您具有独立且可能具有可变持续时间的任务(例如:一些短任务,一些更长),这是很好的。 另一个选择是让多个独立工作者对事件进行并行处理,但每个工作者只处理模N个工作者(例如2个线程,一个线程处理奇数,一个线程处理甚至事件ID)。 如果您具有一致的持续时间处理任务,并且允许批处理也能非常有效地工作,则此方法很有用。 另一件需要考虑的事情是消费者可以进行“批处理”,这在审计中尤其有用。 如果您的消费者有10个事件在等待,而不是独 ...