分类:
编程软件 Java 中间件 Elasticsearch 前端开发 kafka 云计算大数据 Lua zookeeper PHP 网络与运维 项目构建管理 ActiveMQ 办公应用软件 Memcached RabbitMQ Sentinel Nacos
南玻医疗 南玻医疗 南玻医疗 南玻医疗 南玻医疗 南玻医疗 南玻医疗 南玻医疗 南玻医疗 南玻医疗 南玻医疗 南玻医疗 南玻医疗 南玻医疗 南玻医疗 南玻医疗 南玻医疗 南玻医疗

构建Disruptor实例-生产消费模型完成整个入门示例

2019-05-20 23:30|来源: 网路

1、初始化Disruptor,构建Disruptor只要需要以下几个参数

    1 eventFactory: 消息(event)工厂对象

    2 ringBufferSize: 容器的长度

    3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler

    4 ProducerType: 单生产者 还是 多生产者

    5 waitStrategy: 等待策略

示例代码:

Disruptor<OrderEvent> disruptor = new Disruptor<>(orderEventFactory,
        ringBufferSize,
        executor,
        ProducerType.SINGLE,
        new BlockingWaitStrategy());


2、初始化好Disruptor之后,通过该对象的handleEventsWith添加消费者的监听

3、然后启动Disruptor实例

4、往RingBuffer中生产数据,完成生产消费模型


具体代码如下:

package com656463.quickstart;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class Main {
    public static void main(String[] args) {
        // 参数准备工作
        OrderEventFactory orderEventFactory = new OrderEventFactory();
        int ringBufferSize = 4;
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        /**
         * 1 eventFactory: 消息(event)工厂对象
         * 2 ringBufferSize: 容器的长度
         * 3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler
         * 4 ProducerType: 单生产者 还是 多生产者
         * 5 waitStrategy: 等待策略
         */
        //1. 实例化disruptor对象
        Disruptor<OrderEvent> disruptor = new Disruptor<>(orderEventFactory,
                ringBufferSize,
                executor,
                ProducerType.SINGLE,
                new BlockingWaitStrategy());

        //2. 添加消费者的监听 (构建disruptor 与 消费者的一个关联关系)
        disruptor.handleEventsWith(new OrderEventHandler());

        //3. 启动disruptor
        disruptor.start();

        //4. 获取实际存储数据的容器: RingBuffer
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        OrderEventProducer producer = new OrderEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long i = 0; i < 5; i++) {
            bb.putLong(0, i);
            producer.sendData(bb);
        }

        disruptor.shutdown();
        executor.shutdown();

    }
}


整个Disruptor的入门程序完成,接下来深入理解Disruptor核心API

相关问答

更多

LMAX Disruptor事件中的类字段是否需要易变?(Do class fields within an LMAX Disruptor event need to be volatile?)

彼得的评论提供了一个很好的线索,事实上,是的,有涉及的记忆围栏。 您可以在Sequence类中看到putOrderedLong()compareAndSet()等等。 每个都强制执行内存排序。 有关更多详细信息,请参阅源代码 。 Peter's comment gives a good clue, and in fact, yes there are memory fences involved. You can see a putOrderedLong() compareAndSet() an

解决LMAX Disruptor模式中的消费者(eventProcessor)问题(Solution to slow consumer(eventProcessor) issue in LMAX Disruptor pattern)

一般来说,使用WorkerPool允许多个池化工作线程在单个消费者上工作,如果您具有独立且可能具有可变持续时间的任务(例如:一些短任务,一些更长),这是很好的。 另一个选择是让多个独立工作者对事件进行并行处理,但每个工作者只处理模N个工作者(例如2个线程,一个线程处理奇数,一个线程处理甚至事件ID)。 如果您具有一致的持续时间处理任务,并且允许批处理也能非常有效地工作,则此方法很有用。 另一件需要考虑的事情是消费者可以进行“批处理”,这在审计中尤其有用。 如果您的消费者有10个事件在等待,而不是

从LinkedBlockingQueue迁移到LMAX的Disruptor(Migrating from LinkedBlockingQueue to LMAX' Disruptor)

Mentaqueue提供了一个单一的生产者单一消费者队列基于相同的想法 - http://mentaqueue.soliveirajr.com/Page.mtw ,你可以检查代码,但我从来没有使用它自己。 开箱即用的Disruptor提供了两种技术 - 我不会进入代码,但可以根据需要进行操作。 它允许对事件处理程序进行排序,并且可以对其进行配置,以便每个处理程序将并行处理所有请求; 每个请求由每个处理程序处理。 一个Worker Pool实现,它允许一个工作线程池来处理一个请求; 每个请求将从线

LMAX Disruptor - 什么决定批量大小?(LMAX Disruptor - what determines the batch size?)

批量大小完全取决于可用元素的数量。 所以如果现在有更多的元素可用,那么它将被包含在批处理中。 例如,如果Disruptor调用您的代码并且队列中只有一个元素,那么您将使用endOfBatch = true获得一个调用。 如果队列中有8个元素,那么它将收集全部8个元素,并将其发送到一个批次中。 你可以在下面的代码中看到队列中的“可用”条目被获取,这可能比“下一个”条目多得多。 例如,你现在是5,等待6号插槽,然后3个事件到达,可用的将是8,并且你将在一批中接收多个呼叫(对于6,7,8)。 http

我应该同步访问disruptor Next / Publish方法吗?(should I synchronize access to disruptor Next/Publish methods?)

我是disruptor-net的作者。 破坏程序是并发集合,因此一旦正确配置,您不必应用任何锁定。 在您的情况下,您应该使用MultiThreadedClaimStrategy初始化RingBuffer,因为您有多个生成器并使用示例3发布到环形缓冲区。 也就是说,我不鼓励在生产代码中使用disruptor-net,我暂时把它移植到我的业余时间并且没有在生产代码中使用它,它需要进一步测试。 也就是说,.NET并发队列明显快于Java队列,因此我建议使用ConcurrentQueue作为disrut

相关文章

更多

最近更新

更多