Disruptor是什么?
Disruptor是一个高性能的异步处理框架,一个轻量级的JMS,和JDK中的BlockingQueue有相似处,但是它的处理速度非常快,获得2011年程序框架创新大奖,号称“一个线程一秒钟可以处理600W个订单”(这有点吓人吧),并且Disruptor不仅仅只有buffer,它提供的功能非常强大,比如它可以帮助我们轻松构建数据流处理(比如一个数据先交给A和B这2个消费者并行处理后再交给C处理,是不是有点想起storm这种流处理,实际上strom的底层就是应用了disruptor来实现worker内部threads的通信)。本文将使用disruptor最新版3.3.6进行介绍,可以在
https://github.com/LMAX-Exchange/disruptor/releases 下载最新的JAR包开始disruptor之旅吧。
轮胎:RingBuffer
RingBuffer,环形缓冲区,在disruptor中扮演着非常重要的角色,理解RingBuffer的结构有利于我们理解disruptor为什么这么快、无锁的实现方式、生产者/消费者模式的实现细节。如下图所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bV2qKzfA-1591960121369)(https://upload-images.jianshu.io/upload_images/22478635-fa10d4054e44f757?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]
数组
这个类似于轮胎的东西实际上就是一个数组,使用数组的好处当然是由于预加载的缘故使得访问比链表要快的多。
序号
RingBuffer中元素拥有序号的概念,并且序号是一直增长的,这怎么理解?比如RingBuffer大小为10,那么序号从0开始增长,当到9的时候,相当于转了一圈,如果继续增长的话,那么将覆盖0号元素。也即是说通过 序号%SIZE 来定位元素,实现set/get操作。这里也发现了和队列不同的一个方式,就是不存在元素的删除操作,只有覆盖而已,实际上RingBuffer的这种特别的环形存储方式,使得不需要花费大量的时间用于内存清理/垃圾回收。
由于涉及到取模操作,为了CPU进行位运算更加高效,RingBuffer的大小应该是2的N次方。
无锁的机制
在生产者/消费者模式下,disruptor号称“无锁并行框架”(要知道BlockingQueue是利用了Lock锁机制来实现的),这是怎么做到的呢?下面我们来具体分析下:
一个生产者 + 一个消费者
生产者维护一个生产指针P,消费者维护一个消费者指针C,当然P和C本质上就是序号。2者各操作各的,不需要锁,仅仅需要注意的是生产者和消费者的速度问题,当然这个在disruptor内部已经为我们做了处理,就是判断一下P和C之间不能超过一圈的大小。
一个生产者 + 多个消费者
多个消费者当然持有多个消费指针C1,C2,…,消费者依据C进行各自读取数据,只需要保证生产者的速度“协调”最慢的消费者的速度,就是那个不能超出一圈的概念。此时也不需要进行锁定。
多个生产者 + N个消费者
很显然,无论生产者有几个,生产者指针P只能存在一个,否则数据就乱套了。那么多个生产者之间共享一个P指针,在disruptor中实际上是利用了CAS机制来保证多线程的数据安全,也没有使用到锁。
Disruptor初体验:简单的生产者和消费者
业务数据对象POJO(Event)
public class Order {
/* 订单ID */
private long id; /* 订单信息 */
private String info; /* 订单价格 */
private double price; public long getId()
{
return(id);
} public void setId( long id )
{
this.id = id;
} public String getInfo()
{
return(info);
} public void setInfo( String info )
{
this.info = info;
} public double getPrice()
{
return(price);
} public void setPrice( double price )
{
this.price = price;
}
}
业务数据工厂(Factory)
public class OrderFactory implements EventFactory {
@Override
public Object newInstance()
{
System.out.println( "OrderFactory.newInstance" );
return(new Order() );
}
}
事件处理器(Handler,即消费者处理逻辑)
public class OrderHandler implements EventHandler{
@Override
public void onEvent( Order order, long l, boolean b ) throws Exception
{
System.out.println( Thread.currentThread().getName() + " 消费者处理中:" + l );
order.setInfo( "info" + order.getId() );
order.setPrice( Math.random() );
}
}
Mainpublic class Main {
public static void main( String[] args ) throws InterruptedException
{
/* 创建订单工厂 */
OrderFactory orderFactory = new OrderFactory(); /* ringbuffer的大小 */
int RINGBUFFER_SIZE = 1024; /* 创建disruptor */
Disruptor disruptor = new Disruptor( orderFactory, RINGBUFFER_SIZE, Executors.defaultThreadFactory() ); /* 设置事件处理器 即消费者 */
disruptor.handleEventsWith( new OrderHandler() ); disruptor.start();
RingBuffer ringBuffer = disruptor.getRingBuffer();
/* -------------生产数据 */
for ( int i = 0; i
运行结果:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-V25Xei7s-1591960121372)(https://upload-images.jianshu.io/upload_images/22478635-fc42aec1fcc42b38?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]
说明:
其实上面的结果已经很明显的说明了,在初始阶段构造Disruptor的时候,会调用工厂Factory去实例化RingBuffer中的Event数据对象。
另外在构造Disruptor的时候,在3.3.6之前使用的是API:
到了3.3.6这些API都不推荐使用了,即不再推荐传入Executor这样的线程池,而是推荐传入ThreadFactory线程工厂。这样的话,关闭disruptor就会自动关闭Executor线程池,而不需要像以前那样必须在关闭disruptor的时候再关闭线程池了。
构造Disruptor时,需要注意ProducerType(SINGLE or MULTI 指示是单个生产者还是多个生产者模式)、WaitStrategy(策略选择,决定了消费者如何等待生产者)。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xosRRHuN-1591960121378)(https://upload-images.jianshu.io/upload_images/22478635-2569d715193c3998?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]
单独使用RingBuffer:WorkerPool
如果场景比较简单,我们完全可以不用创建Disruptor,而是仅仅使用RingBuffer功能。
public static void main( String[] args ) throws InterruptedException
{
ExecutorService executor = Executors.newFixedThreadPool( 3 );
RingBuffer ringBuffer = RingBuffer.create( ProducerType.SINGLE, new OrderFactory(), 1024, new YieldingWaitStrategy() );
WorkerPool workerPool = new WorkerPool( ringBuffer, ringBuffer.newBarrier(), new IgnoreExceptionHandler(), new OrderHandler() ); workerPool.start( executor );
/* -------------生产数据 */
for ( int i = 0; i
实际上是利用WorkerPool辅助连接消费者。
一个生产者+多个消费者
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wboXIggd-1591960121380)(https://upload-images.jianshu.io/upload_images/22478635-764f278a5cf381cb?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]
public static void main( String[] args ) throws InterruptedException
{
/* 创建订单工厂 */
OrderFactory orderFactory = new OrderFactory(); /* ringbuffer的大小 */
int RINGBUFFER_SIZE = 1024; /* 创建disruptor */
Disruptor disruptor = new Disruptor( orderFactory, RINGBUFFER_SIZE, Executors.defaultThreadFactory() ); /* 设置事件处理器 即消费者 */
EventHandlerGroup eventHandlerGroup = disruptor.handleEventsWith( new OrderHandler(), new OrderHandler2() );
eventHandlerGroup.then( new OrderHandler3() );
disruptor.start(); RingBuffer ringBuffer = disruptor.getRingBuffer();
/* -------------生产数据 */
for ( int i = 0; i
运行结果:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lbaEDZcZ-1591960121382)(https://upload-images.jianshu.io/upload_images/22478635-00beb3bb71654ffa?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]
生产者生产了3条消息,一个消费者线程1消费了这3条数据,另一个消费者线程2也消费了这3条数据,2者是并行的,待消费者线程1和2完毕后,3条数据交给消费者线程3处理。
如果我们想顺序的按照A->B->C呢?
public class Order {
/* 订单ID */
private long id; /* 订单信息 */
private String info; /* 订单价格 */
private double price; public long getId()
{
return(id);
} public void setId( long id )
{
this.id = id;
} public String getInfo()
{
return(info);
} public void setInfo( String info )
{
this.info = info;
} public double getPrice()
{
return(price);
} public void setPrice( double price )
{
this.price = price;
}
}
如果我们想六边形操作呢?
Handler1 h1 = new Handler1();
Handler2 h2 = new Handler2();
Handler3 h3 = new Handler3();
Handler4 h4 = new Handler4();
Handler5 h5 = new Handler5();
disruptor.handleEventsWith( h1, h2 );
disruptor.after( h1 ).handleEventsWith( h4 );
disruptor.after( h2 ).handleEventsWith( h5 );
disruptor.after( h4, h5 ).handleEventsWith( h3 );
到这里相信你对Disruptor已经有所了解了,那么多个生产者多个消费者如何实现呢,其实和上面的代码非常类似,无非是多个生产者都持有RingBuffer可以publish而已。
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net
文章和代码已经归档至【Github仓库:https://github.com/timerring/java-tutorial 】或者公众号【AIShareLab】回复 java 也可获取。 程序框架图 代码实现 数据库 — 创建满汉楼的数据库 CREATE …