构建Disruptor实例-生产消费模型完成整个入门示例

2019-05-20 23:30|来源: 网路

1、初始化Disruptor,构建Disruptor只要需要以下几个参数

    1 eventFactory: 消息(event)工厂对象

    2 ringBufferSize: 容器的长度

    3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler

    4 ProducerType: 单生产者 还是 多生产者

    5 waitStrategy: 等待策略

示例代码:

Disruptor<OrderEvent> disruptor = new Disruptor<>(orderEventFactory,
        ringBufferSize,
        executor,
        ProducerType.SINGLE,
        new BlockingWaitStrategy());


2、初始化好Disruptor之后,通过该对象的handleEventsWith添加消费者的监听

3、然后启动Disruptor实例

4、往RingBuffer中生产数据,完成生产消费模型


具体代码如下:

package com656463.quickstart;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class Main {
    public static void main(String[] args) {
        // 参数准备工作
        OrderEventFactory orderEventFactory = new OrderEventFactory();
        int ringBufferSize = 4;
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        /**
         * 1 eventFactory: 消息(event)工厂对象
         * 2 ringBufferSize: 容器的长度
         * 3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler
         * 4 ProducerType: 单生产者 还是 多生产者
         * 5 waitStrategy: 等待策略
         */
        //1. 实例化disruptor对象
        Disruptor<OrderEvent> disruptor = new Disruptor<>(orderEventFactory,
                ringBufferSize,
                executor,
                ProducerType.SINGLE,
                new BlockingWaitStrategy());

        //2. 添加消费者的监听 (构建disruptor 与 消费者的一个关联关系)
        disruptor.handleEventsWith(new OrderEventHandler());

        //3. 启动disruptor
        disruptor.start();

        //4. 获取实际存储数据的容器: RingBuffer
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        OrderEventProducer producer = new OrderEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long i = 0; i < 5; i++) {
            bb.putLong(0, i);
            producer.sendData(bb);
        }

        disruptor.shutdown();
        executor.shutdown();

    }
}


整个Disruptor的入门程序完成,接下来深入理解Disruptor核心API

相关问答

更多

从LinkedBlockingQueue迁移到LMAX的Disruptor(Migrating from LinkedBlockingQueue to LMAX' Disruptor)

Mentaqueue提供了一个单一的生产者单一消费者队列基于相同的想法 - http://mentaqueue.soliveirajr.com/Page.mtw ,你可以检查代码,但我从来没有使用它自己。 开箱即用的Disruptor提供了两种技术 - 我不会进入代码,但可以根据需要进行操作。 它允许对事件处理程序进行排序,并且可以对其进行配置,以便每个处理程序将并行处理所有请求; 每个请求由每个处理程序处理。 一个Worker Pool实现,它允许一个工作线程池来处理一个请求; 每个请求将从线程 ...

异步生产者/消费者(Async Producer/Consumer)

我认为这应该有效: private static BlockingCollection<string> _itemsToProcess = new BlockingCollection<string>(); static void Main(string[] args) { InsertWorker(); GenerateItems(10, 1000); _itemsToProcess.CompleteAdding(); ...

在linux下用c语言实现用多进程同步方法演示“生产者-消费者”问题

这个问题需要的知识主要包括: 1 多进程间进行通信; 2 使用同步信号量(semaphore)和互斥信号量(mutex)进行数据保护。 参考代码如下,可以参照注释辅助理解: #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #include <semaphore.h> #define N 2 // 消费者或者生产者的数目 #define M 10 // 缓冲数目 int in = 0; / ...

java从入门到精通实例版

每次看到这种java从入门到精通的书籍,我都会想,世界上真的有这种书吗?所以只要这样取名的书籍我从来不看,没有意思,另外说到书籍,国内的java入门书籍好一点的真少,我推荐java核心技术第九版,共两卷,有源码,这种书才是好书中的好书。 不好意思,这里只能上传一卷

生产者/消费者线程使用队列(Producer/Consumer threads using a Queue)

Java 5+具有您需要的所有工具。 你会想: 把你所有的生产者放在一个ExecutorService ; 把你所有的消费者放在另一个ExecutorService ; 如有必要,使用BlockingQueue在两者之间进行通信。 (3)因为从我的经验来看,这是一个不必要的步骤。 你所做的就是向消费者执行者服务提交新的任务。 所以: final ExecutorService producers = Executors.newFixedThreadPool(100); final Executor ...