爱程序网

Disruptor 分析

来源: 阅读:

通过分析如下代码,大致了解Disruptor的原理

 1 public static void main(String[] args)throws Exception{
 2         EventFactory<LongEvent> eventFactory = new LongEventFactory();
 3         int ringBufferSize = 1024;
 4         ExecutorService executors = Executors.newCachedThreadPool();
 5         final Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executors,ProducerType.SINGLE, new BlockingWaitStrategy());    
 6         //final Disruptor<IntEvent> disruptor = new Disruptor<IntEvent>(eventFactory, ringBufferSize, executors,ProducerType.MULTI, new BlockingWaitStrategy()); //多生产者
 7         Consumer consumer1 = new Consumer();
 8         Consumer consumer2 = new Consumer();
 9         EventHandlerGroup<LongEvent> firstLevel = disruptor.handleEventsWith(consumer1,consumer2);
10         Consumer consumer21 = new Consumer();
11         Consumer consumer22 = new Consumer();
12         EventHandlerGroup<LongEvent> secondLevel = firstLevel.then(consumer21,consumer22);
13         disruptor.start();    //启动disruptor,consumer开始等待,消费数据
14         Producer producer = new Producer(disruptor);
15         
16         //启动生产者
17         new Thread(producer).start();
18     }

 1. 第2行代码  EventFactory<LongEvent> eventFactory = new LongEventFactory();

 数据工厂类构造单个数据,disruptor使用此工厂类预分配数据。

 2. 第5行代码 final Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executors,ProducerType.SINGLE, new BlockingWaitStrategy());

 预分配数据,构建RingBuffer,指定生产者类型(单生产者、多生产者)、消费者执行的线程池、生产者等待可发布数据空间和消费者等待可消费数据的策略。

 

 3. 第9行代码 EventHandlerGroup<LongEvent> firstLevel = disruptor.handleEventsWith(consumer1,consumer2);

   每个消费者Handler都会被封装为一个Processor,其可消费序号由其sequence barrier决定。

 4. 第12行代码 EventHandlerGroup<LongEvent> secondLevel = firstLevel.then(consumer21,consumer22);

 5. 第13行代码 disruptor.start(); //启动disruptor,consumer开始等待,消费数据

 6. 第14-17行代码,创建启动生产者,发布数据

 

关于爱程序网 - 联系我们 - 广告服务 - 友情链接 - 网站地图 - 版权声明 - 人才招聘 - 帮助