disruptor学习笔记
LMAX Disruptor是一个高性能的线程间消息库。
核心组件
- Ring Buffer: Disruptor底层数据结构实现,核心类,是线程间交换数据的中转地;
- Sequence: 序号,声明一个序号,用于跟踪ringbuffer中任务的变化和消费者的消费情况;
- Sequencer: Sequencer是disruptor的真正核心。序号管理器,负责消费者/生产者各自序号、序号栅栏的管理和协调;
- Sequence Barrier: 序号栅栏,管理和协调生产者的游标序号和各个消费者的序号,确保生产者不会覆盖消费者未来得及处理的消息,确保存在依赖的消费者之间能够按照正确的顺序处理;
- Wait Strategy: 等待策略,决定了消费者如何等待事件。
- Event : 生产者传递给消费者的数据单元。
- EventProcessor: 事件处理器,监听RingBuffer的事件,并消费可用事件,从RingBuffer读取的事件会交由实际的生产者实现类来消费;它会一直侦听下一个可用的序号,直到该序号对应的事件已经准备好。
- EventHandler: 业务处理器,是实际消费者的接口,完成具体的业务逻辑实现,第三方实现该接口;代表着消费者。
- Producer: 生产者接口,第三方线程充当该角色,producer向RingBuffer写入事件(实际调用Sequencer方法)。对于ringbuffer内的存储空间而言,实际上的生产者是Sequencer。
点击查看大图
如何使用
使用例子来自 官方demo
event
首先,定义将携带数据的事件。
1 | public class LongEvent{ |
为了让Disruptor能够预先分配这些事件,我们需要一个工厂EventFactory。
1 | import com.lmax.disruptor.EventFactory; |
consumer
1 | import com.lmax.disruptor.EventHandler; |
producer
1 | import com.lmax.disruptor.RingBuffer; |
Disruptor的3.0版本,添加了更丰富的Lambda风格的API,以帮助开发人员将此复杂性封装在Ring Buffer中,因此3.0之后发布消息的首选方法是通过API的Event Publisher / Event Translator部分。例如:
1 | import com.lmax.disruptor.RingBuffer; |
这种方法的另一个优点是翻译器代码可以被拉入一个单独的类中,并可以轻松地单独进行单元测试。Disruptor提供了许多不同的接口(EventTranslator,EventTranslatorOneArg,EventTranslatorTwoArg等),可以实现这些接口以提供翻译。
example
1 | import com.lmax.disruptor.dsl.Disruptor; |
源码分析
时序图
点击查看大图
DSL 类图
点击查看大图
com.lmax.disruptor.dsl
包描述了disruptor的领域模型。
- Disruptor: 对外暴露的门面类,提供start(),stop(),消费者事件注册,生产者事件发布等api;
- RingBuffer: 对生产者提供下一序号获取、entry元素获取、entry数据更改等api;
- EventHandler: 消费者的接口定义,提供onEvent()方法,负责具体业务逻辑实现;
- EventHandlerGroup——业务处理器分组,管理多个业务处理器的依赖关系,提供then()、before()、after()等api;
- ConsumerRepository: 维持
EventHandler
s 和EventProcessor
s 关系的仓库;
Sequence
这里对于Sequence的核心就是value这个volatile long类型的变量,它就是代表下一个位置。p1~p15这些是为了避免CPU伪共享的出现(false sharing)。在value前后添加,保证每个CacheLine中只有一个Disruptor中的Sequence。
Java最初被设计为一种安全的受控环境。尽管如此,Java HotSpot还是包含了一个“后门”,提供了一些可以直接操控内存和线程的低层次操作。这个后门类——sun.misc.Unsafe——被JDK广泛用于自己的包中,如java.nio和java.util.concurrent。但是丝毫不建议在生产环境中使用这个后门。因为这个API十分不安全、不轻便、而且不稳定。这个不安全的类提供了一个观察HotSpot JVM内部结构并且可以对其进行修改。有时它可以被用来在不适用C++调试的情况下学习虚拟机内部结构,有时也可以被拿来做性能监控和开发工具。
1 | try{ |
可以通过反射获取Unsafe静态final实例。
下面详细分析Sequence源码:
1 | public class Sequence extends RhsPadding { |
Producer(Sequencer)
- Cursored接口:实现此接口的类,可以理解为,记录某个sequence的类。例如,生产者在生产消息时,需要知道当前ringBuffer下一个生产的位置,这个位置需要更新,每次更新,需要访问getCursor来定位。
- Sequenced接口:实现此接口类,可以理解为,实现一个有序的存储结构,也就是RingBuffer的一个特性。一个Producer,在生产Event时,先获取下一位置的Sequence,之后填充Event,填充好后再publish,这之后,这个Event就可以被消费处理了。
- getBufferSize获取ringBuffer的大小
- hasAvailableCapacity判断空间是否足够
- remainingCapacity获取ringBuffer的剩余空间
- next申请下一个或者n个sequence(value)作为生产event的位置
- tryNext尝试申请下一个或者n个sequence(value)作为生产event的位置,容量不足会抛出InsufficientCapacityException
- publish发布Event
- Sequencer接口:Sequencer接口,扩展了Cursored和Sequenced接口。
- INITIAL_CURSOR_VALUE: -1 为 sequence的起始值
- claim: 申请一个特殊的Sequence,只有设定特殊起始值的ringBuffer时才会使用(一般是多个生产者时才会使用)
- isAvailable:非阻塞,验证一个sequence是否已经被published并且可以消费.
- addGatingSequences:将这些sequence加入到需要跟踪处理的gatingSequences中.
- removeGatingSequence:移除某个sequence.
- newBarrier:给定一串需要跟踪的sequence,创建SequenceBarrier。SequenceBarrier是用来给多消费者确定消费位置是否可以消费用的.
- getMinimumSequence:获取这个ringBuffer的gatingSequences中最小的一个sequence.
- getHighestPublishedSequence:获取最高可以读取的Sequence.
抽象类AbstractSequencer实现Sequencer这个接口,定义了5个域.
1 | // 是用来原子更新gatingSequences 的工具类 |
AbstractSequencer的构造函数中对bufferSize做了约束,只能是大于1的2的幂。
1 | public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) |
对于newBarrier,返回的是一个ProcessingSequenceBarrier:SequenceBarrier之后会详细说明,这里我们可以理解为用来协调消费者消费的对象。例如消费者A依赖于消费者B,就是消费者A一定要后于消费者B消费,也就是A只能消费B消费过的,也就是A的sequence一定要小于B的。这个Sequence的协调,通过A和B设置在同一个SequenceBarrier上实现。同时,我们还要保证所有的消费者只能消费被Publish过的。
Disruptor分为单生产者和多生产者,先来关注下单生产者的核心类SingleProducerSequencer。
1 | /** |
RingBuffer的头由AbstractSequencer中名字为Cursor的Sequence对象维护,用来协调生产者向RingBuffer中填充数据。表示队列尾的Sequence,是由消费者维护。这样的话,队列尾的维护就是无锁的。但是,在生产者方确定RingBuffer是否已满就需要跟踪更多信息。为此,AbstractSequencer中有个属性Sequence[] gatingSequences是用来跟踪相关Sequence。
下面用一个例子来描述工作流程:
RingBuffer: size = 4,假设有不断通过SequenceBarrier消费的消费者。SingleProducerSequencer的gatingSequences数组内保存这一个指向某个Sequence的引用,同时这个Sequence也会被SequenceBarrier更新以表示消费者消费到哪里了。(disruptor注册消费者时,会把消费者维护的队列尾的Sequence放到gatingSequences数组中。)
field | SingleProducerSequencer |
---|---|
nextValue | -1 |
cachedValue | -1 |
cursor | -1 |
gatingSequences | [{Sequence:-1}] |
- SingleProducerSequencer这时生产两个Event,要放入RingBuffer。则假设先调用hasAvailableCapacity(2)判断下。代码流程是:
wrapPoint = (nextValue + requiredCapacity) - bufferSize = (-1 + 2) - 4 = -3 ; -3 < cachedValue
,所以不用检查gateSequences直接返回true。假设next(2)成功,之后调用publish更新cursor,这样消费者调用isAvailable根据Cursor就可以判断,sequence:0和sequence:1可以消费了。 - 假设这之后,消费者消费了一个Event,更新Sequence为0.
field | SingleProducerSequencer |
---|---|
nextValue | 1 |
cachedValue | -1 |
cursor | 1 |
gatingSequences | [{Sequence:0}] |
- 生产者要生产四个Event,调用hasAvailableCapacity(4)检查。代码流程是:
wrapPoint = (nextValue + requiredCapacity) - bufferSize = (1 + 4) - 4 = 1 ;1 > cachedValue
。所以要重新检查,这时最小的Sequence是0,但是1 仍然大于最小的Sequence,所以更新cachedValue,返回false。
下面看看多生产者核心类MultiProducerSequencer:
MultiProducerSequencer是多生产者类,线程安全,与单一生产者不同的是,这里的cursor不再是可以消费的标记,而是多线程生产者抢占的标记。可以消费的sequence由availableBuffer来判断标识。这个类没有缓存行填充。
1 | private static final Unsafe UNSAFE = Util.getUnsafe(); |
RingBuffer
disruptor构造时会创建ringbuffer实例(调用RingBuffer.create
)。在RingBuffer.create
方法中,会根据不同的生产者情况(多个或单个)创建对应的序号管理器,然后是调用RingBuffer的构造方法,并返回。
1 | public static <E> RingBuffer<E> create( |
下面来看看 RingBuffer的构造过程。
1 | public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> |
再来看看RingBuffer实现的接口:
1 | public interface DataProvider<T>{ |
EventSequencer接口没有自己的方法,只是为了将Sequencer和DataProvider合起来。
EventSink代表RingBuffer是一个以Event槽为基础的数据结构。同时实现EventSequencer和EventSink代表RingBuffer是一个以Event槽为基础元素保存的数据结构。
EventSink接口的主要方法都是发布Event,发布一个Event的流程是:
- 申请下一个Sequence
- 申请成功则获取对应槽的Event
- 初始化并填充对应槽的Event
- 发布Event。
这里,初始化,填充Event是通过实现EventTranslator,EventTranslatorOneArg,EventTranslatorTwoArg,EventTranslatorThreeArg,EventTranslatorVararg这些EventTranslator来做的。我们看下EventTranslator,EventTranslatorOneArg和EventTranslatorVararg的源码:
1 | public interface EventTranslator<T> |
他们由生产者用户实现,将Event初始化并填充。在发布一条Event的时候,这些Translator的translate方法会被调用。在translate方法初始化并填充Event。对于EventTranslator,translate方法只接受Event和Sequence作为参数,对于其他的,都还会接受一个或多个参数用来初始化并填充Event。
EventSink接口是用来发布Event的,在发布的同时,调用绑定的Translator来初始化并填充Event。EventSink接口的大部分方法接受不同的Translator来处理Event:
1 | public interface EventSink<E> { |
RingBufferd的控制大部分是通过sequencer来实现的。
EventHandler
EventHandler只有一个onEvent方法,OnEvent抽象了事件处理逻辑。
1 | public interface EventHandler<T>{ |
WorkHandler类似于EventHandler,主要用于WorkerPool。
1 | public interface WorkHandler<T>{ |
WaitStrategy
在Disruptor中,有很多需要等待的情况。等待方法的具体实现是由WaitStrategy实现。
1 | public interface WaitStrategy { |
BlockingWaitStrategy
BlockingWaitStrategy是一种利用锁和等待机制的WaitStrategy,CPU消耗少,但是延迟比较高。
1 | public final class BlockingWaitStrategy implements WaitStrategy{ |
SleepingWaitStrategy
SleepingWaitStrategy是另一种较为平衡CPU消耗与延迟的WaitStrategy,在不同次数的重试后,采用不同的策略选择继续尝试或者让出CPU或者sleep。这种策略延迟不均匀。
1 | public final class SleepingWaitStrategy implements WaitStrategy { |
YieldingWaitStrategy
YieldingWaitStrategy 会使用100%CPU,当别的线程请求CPU资源时,会易于放弃CPU占用。
1 | public final class YieldingWaitStrategy implements WaitStrategy{ |
BusySpinWaitStrategy
BusySpinWaitStrategy是一种延迟最低,最耗CPU的策略。通常用于消费线程数小于CPU数的场景。
1 | public final class BusySpinWaitStrategy implements WaitStrategy{ |
剩下的WaitStrategy此处就不详细说明。
SequenceBarrier
序号栅栏(SequenceBarrier)和序号(Sequence)搭配使用,协调和管理消费者与生产者的工作节奏,避免了锁和CAS的使用。在Disruptor3.0中,各个消费者和生产者持有自己的序号,这些序号的变化必须满足如下基本条件:
- 消费者序号数值必须小于生产者序号数值;
- 消费者序号数值必须小于其前置(依赖关系)消费者的序号数值;
- 生产者序号数值不能大于消费者中最小的序号数值,以避免生产者速度过快,将还未来得及消费的消息覆盖。
第三点在生产者一节中已经详细分析。
SequenceBarrier只有一个实现类,就是ProcessingSequenceBarrier。
1 | final class ProcessingSequenceBarrier implements SequenceBarrier |
EventProcessor
EventProcessor抽象了消费方式(如何触发消费方法)。通过类依赖关系可以发现,EventProcessor都是实现了Runnable接口,可以把它们当做线程处理。
BatchEventProcessor
1 | public final class BatchEventProcessor<T> implements EventProcessor{ |
WorkProcessor
1 | public void run() |
WorkerPool
1 | public final class WorkerPool<T>{ |
参考资料:
- 官方wiki
- 线程间共享数据无需竞争
- 并发框架Disruptor译文
- Disruptor3.0的实现细节
- 高并发数据结构Disruptor解析