Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。
Java内置队列
介绍Disruptor之前,我们先来看一看常用的线程安全的内置队列有什么问题。Java的内置队列如下表所示。
队列 有界性 锁 数据结构
ArrayBlockingQueue bounded 加锁 arraylist
LinkedBlockingQueue optionally-bounded 加锁 linkedlist
ConcurrentLinkedQueue unbounded 无锁 linkedlist
LinkedTransferQueue unbounded 无锁 linkedlist
PriorityBlockingQueue unbounded 加锁 heap
DelayQueue unbounded 加锁 heap
队列的底层一般分成三种:数组、链表和堆。其中,堆一般情况下是为了实现带有优先级特性的队列,暂且不考虑。
我们就从数组和链表两种数据结构来看,基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全;基于链表的线程安全队列分成LinkedBlockingQueue和ConcurrentLinkedQueue两大类,前者也通过锁的方式来实现线程安全,而后者以及上面表格中的LinkedTransferQueue都是通过原子变量compare and swap(以下简称“CAS”)这种不加锁的方式来实现的。
通过不加锁的方式实现的队列都是无界的(无法保证队列的长度在确定的范围内);而加锁的方式,可以实现有界队列。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列;同时,为了减少Java的垃圾回收对系统性能的影响,会尽量选择array/heap格式的数据结构。这样筛选下来,符合条件的队列就只有ArrayBlockingQueue。
Disruptor论文中讲述了一个实验:
这个测试程序调用了一个函数,该函数会对一个64位的计数器循环自增5亿次。
机器环境:2.4G 6核
运算: 64位的计数器累加5亿次
Method Time (ms)
Single thread 300
Single thread with CAS 5,700
Single thread with lock 10,000
Single thread with volatile write 4,700
Two threads with CAS 30,000
Two threads with lock 224,000
CAS操作比单线程无锁慢了1个数量级;有锁且多线程并发的情况下,速度比单线程无锁慢3个数量级。可见无锁速度最快。
单线程情况下,不加锁的性能 > CAS操作的性能 > 加锁的性能。
在多线程情况下,为了保证线程安全,必须使用CAS或锁,这种情况下,CAS的性能超过锁的性能,前者大约是后者的8倍。
综上可知,加锁的性能是最差的。
Disruptor的设计方案
Disruptor通过以下设计来解决队列速度慢的问题:
环形数组结构
为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。
元素位置定位
数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。
无锁设计
每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。
一个生产者
写数据
生产者单线程写数据的流程比较简单:
申请写入m个元素;
若是有m个元素可以写入,则返回最大的序列号。这儿主要判断是否会覆盖未读的元素;
若是返回的正确,则生产者开始写入元素。
图5 单个生产者生产过程示意图
多个生产者
多个生产者的情况下,会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor的解决方法是,每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可。
但是会遇到一个新问题:如何防止读取的时候,读到还未写的元素。Disruptor在多个生产者的情况下,引入了一个与Ring Buffer大小相同的buffer:available Buffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。
下面分读数据和写数据两种情况介绍。
读数据
生产者多线程写入的情况会复杂很多:
申请读取到序号n;
若writer cursor >= n,这时仍然无法确定连续可读的最大下标。从reader cursor开始读取available Buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置;
消费者读取元素。
如下图所示,读线程读到下标为2的元素,三个线程Writer1/Writer2/Writer3正在向RingBuffer相应位置写数据,写线程被分配到的最大元素下标是11。
读线程申请读取到下标从3到11的元素,判断writer cursor>=11。然后开始读取availableBuffer,从3开始,往后读取,发现下标为7的元素没有生产成功,于是WaitFor(11)返回6。
然后,消费者读取下标从3到6共计4个元素。
图6 多个生产者情况下,消费者消费过程示意图
写数据
多个生产者写入的时候:
申请写入m个元素;
若是有m个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间;
生产者写入元素,写入元素的同时设置available Buffer里面相应的位置,以标记自己哪些位置是已经写入成功的。
如下图所示,Writer1和Writer2两个线程写入数组,都申请可写的数组空间。Writer1被分配了下标3到下表5的空间,Writer2被分配了下标6到下标9的空间。
Writer1写入下标3位置的元素,同时把available Buffer相应位置置位,标记已经写入成功,往后移一位,开始写下标4位置的元素。Writer2同样的方式。最终都写入完成。
图7 多个生产者情况下,生产者生产过程示意图
下面忽略数组的环形结构,介绍一下如何实现无锁设计。整个过程通过原子变量CAS,保证操作的线程安全。
防止不同生产者对同一段空间写入的代码,如下所示:
public long tryNext(int n) throws InsufficientCapacityException
{
if (n 0");
}
long current;
long next;
do
{
current = cursor.get();
next = current + n;
if (!hasAvailableCapacity(gatingSequences, n, current))
{
throw InsufficientCapacityException.INSTANCE;
}
}
while (!cursor.compareAndSet(current, next));
return next;
}
通过do/while循环的条件cursor.compareAndSet(current, next),来判断每次申请的空间是否已经被其他生产者占据。假如已经被占据,该函数会返回失败,While循环重新执行,申请写入空间。
消费者的流程与生产者非常类似,这儿就不多描述了。
总结
Disruptor通过精巧的无锁设计实现了在高并发情形下的高性能。
在美团点评内部,很多高并发场景借鉴了Disruptor的设计,减少竞争的强度。其设计思想可以扩展到分布式场景,通过无锁设计,来提升服务性能。
代码样例
package com.vrv.linkdood_disruptor;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.YieldingWaitStrategy;
/**
* 说 明:开始
*
* @author 作者名:冯龙淼
* E-mail:fenglongmiao@vrvmail.com.cn
*
* @version 版 本 号:1.0.
* 创建时间:2018年1月22日 下午12:12:31
*/
public class DemoMain {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
final int BUFFER_SIZE=1024;
final int THREAD_NUMBERS=4;
/*
* createSingleProducer创建一个单生产者的RingBuffer,
* 第一个参数叫EventFactory,从名字上理解就是“事件工厂”,其实它的职责就是产生数据填充RingBuffer的区块。
* 第二个参数是RingBuffer的大小,它必须是2的指数倍 目的是为了将求模运算转为&运算提高效率
* 第三个参数是RingBuffer的生产都在没有可用区块的时候(可能是消费者(或者说是事件处理器) 太慢了)的等待策略
*/
final RingBuffer ringBuffer = RingBuffer.createSingleProducer(new EventFactory() {
@Override
public TradeTransaction newInstance() {
return new TradeTransaction();
} // PhasedBackoffWaitStrategy YieldingWaitStrategy
}, BUFFER_SIZE,new YieldingWaitStrategy());
//创建线程池
ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);
//创建SequenceBarrier
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
//创建消息处理器
BatchEventProcessor transProcessor = new BatchEventProcessor(
ringBuffer, sequenceBarrier, new TradeTransactionInDBHandler());
//这一部的目的是让RingBuffer根据消费者的状态 如果只有一个消费者的情况可以省略
ringBuffer.addGatingSequences(transProcessor.getSequence());
//把消息处理器提交到线程池
executors.submit(transProcessor);
//如果存大多个消费者 那重复执行上面3行代码 把TradeTransactionInDBHandler换成其它消费者类
Future> future=executors.submit(new Callable() {
@Override
public Void call() throws Exception {
long seq = 0;
for(int i=0;i说 明:数据实体类
*
* @author 作者名:冯龙淼
* E-mail:fenglongmiao@vrvmail.com.cn
*
* @version 版 本 号:1.0.
* 创建时间:2018年1月22日 下午12:10:58
*/
//DEMO中使用的 消息全假定是一条交易
public class TradeTransaction {
private String id;//交易ID
private double price;//交易金额
public TradeTransaction() {
}
public TradeTransaction(String id, double price) {
super();
this.id = id;
this.price = price;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
@Override
public String toString() {
return "TradeTransaction [id=" + id + ", price=" + price + "]";
}
} package com.vrv.linkdood_disruptor;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
/**
* 说 明:消费实现类
*
* @author 作者名:冯龙淼
* E-mail:fenglongmiao@vrvmail.com.cn
*
* @version 版 本 号:1.0.
* 创建时间:2018年1月22日 下午12:11:33
*/
public class TradeTransactionInDBHandler implements EventHandler,WorkHandler {
/** 交易条数 */
private AtomicLong count = new AtomicLong(0);
@Override
public void onEvent(TradeTransaction event, long sequence,
boolean endOfBatch) throws Exception {
this.onEvent(event);
}
@Override
public void onEvent(TradeTransaction event) throws Exception {
//这里做具体的消费逻辑
event.setId(UUID.randomUUID().toString());//简单生成下ID
System.out.println(event.getId()+"&& price:"+event.getPrice()+"count: "+count.getAndIncrement());
}
}
切记:一定要在设置值的地方加上
try{
}finally{
},
否则如果数据发布不成功,最后数据会逐渐填满ringbuffer,最后后面来的数据根本没有办法调用可用空间,导致方法阻塞,占用CPU和内存,无法释放资源,最后导致服务器死机
注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。
Disruptor 还提供另外一种形式的调用来简化以上操作,并确保 publish 总是得到调用。
public class LongEvent
{
private long value;
public void set(long value)
{
this.value = value;
}
}
static class Translator implements EventTranslatorOneArg{
@Override
publicvoid translateTo(LongEvent event, long sequence, Long data) {
event.set(data);
}
}
public static Translator TRANSLATOR = new Translator();
public staticvoid publishEvent2(Disruptor disruptor) {
// 发布事件;
RingBuffer ringBuffer = disruptor.getRingBuffer();
long data = getEventDataxxxx();//获取要通过事件传递的业务数据;
ringBuffer.publishEvent(TRANSLATOR,data);
}
多个生产者
在构建Disruptor实例的时候,需要指定生产者是单生产者(ProducerType.SINGLE)还是多生产者(ProducerType.MULTI)
多个消费者
(类型1) 多个消费者每个消费者都有机会消费相同数据,使用handleEventsWith方法
class ComsumerHandler implements EventHandler{
private int no;
public ComsumerHandler(int no) {
this.no=no;
}
@Override
public void onEvent(ResultTxt resultTxt, long sequence, boolean endOfBatch)
throws Exception {
System.out.println(no+" data commming......"+resultTxt.getBarCode());
}
}
//设置多个消费者
disruptor.handleEventsWith(new ComsumerHandler(1),new ComsumerHandler(2));
(类型2) 多个消费者,每个消费者消费不同数据。也就是说每个消费者竞争数据,竞争到消费,其他消费者没有机会。使用handleEventsWithWorkerPool方法class ComsumerHandler implements WorkHandler{
private int no;
public ComsumerHandler(int no) {
this.no=no;
}
@Override
public void onEvent(ResultTxt event)
throws Exception {
System.out.println(no+" data commming......"+event.getBarCode());
}
}
//多个消费者,每个消费者竞争消费不同数据
disruptor.handleEventsWithWorkerPool(new ComsumerHandler(1),new ComsumerHandler(2));
等待策略
生产者的等待策略
暂时只有休眠1ns。
LockSupport.parkNanos(1);
消费者的等待策略
名称 措施 适用场景
BlockingWaitStrategy 加锁 CPU资源紧缺,吞吐量和延迟并不重要的场景
BusySpinWaitStrategy 自旋 通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的CPU的场景下使用
PhasedBackoffWaitStrategy 自旋 + yield + 自定义策略 CPU资源紧缺,吞吐量和延迟并不重要的场景
SleepingWaitStrategy 自旋 + yield + sleep 性能和CPU资源之间有很好的折中。延迟不均匀
TimeoutBlockingWaitStrategy 加锁,有超时限制 CPU资源紧缺,吞吐量和延迟并不重要的场景
YieldingWaitStrategy 自旋 + yield + 自旋 性能和CPU资源之间有很好的折中。延迟比较均匀
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net
相关推荐: resourcesapplication.properties 常用配置
端口号 server.port=10000 应用的上下文路径(项目路径) server.servlet.context-path=/allModel 指定POJO 扫描包来让mybatis 自动扫描到自定义的POJO mybatis.type-aliases-…