pentiumchen's Column

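Disruptor is a high-throughput asynchronous processing framework; LMAX claims it can process 6 million orders per second with it. This post analyzes Disruptor starting from a simple example. The code below is based on version 2.10.4. The API may differ between versions, so treat the code as reference only.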

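    private static void testDisruptor() {
        RingBuffer<MyEvent> ringBuffer = new RingBuffer<MyEvent>(
                new EventFactory<MyEvent>() {
                    @Override
                    public MyEvent newInstance() {
                        return new MyEvent();
                    }
                },
                new SingleThreadedClaimStrategy(RING_SIZE),
                new BlockingWaitStrategy());

        // SequenceBarrier
        SequenceBarrier barrier1 = ringBuffer.newBarrier();

        // Register an EventProcessor
        BatchEventProcessor<MyEvent> processor1 = new BatchEventProcessor<MyEvent>(
                ringBuffer, barrier1, new EventHandler<MyEvent>() {
                    @Override
                    public void onEvent(MyEvent event, long sequence,
                                        boolean endOfBatch) throws Exception {
                        System.out.println(event.s + "A:" + event.i
                                + ":Thread.id-" + Thread.currentThread().getId());
                    }
                });

        // Gating sequences must be set before publishing: Sequencer.next()
        // (shown further down) throws a NullPointerException otherwise.
        ringBuffer.setGatingSequences(Util.getSequencesFor(processor1));

        publishEvent(ringBuffer);
        EXECUTOR.execute(processor1);
    }

    private static void publishEvent(RingBuffer<MyEvent> ringBuffer) {
        final EventPublisher<MyEvent> eventPublisher = new EventPublisher<MyEvent>(ringBuffer);
        EXECUTOR_0.execute(new Runnable() {
            @Override
            public void run() {
                for (long i = 0; i < Long.MAX_VALUE; i++) {
                    final long number = i;
                    eventPublisher.publishEvent(new EventTranslator<MyEvent>() {
                        @Override
                        public void translateTo(MyEvent event, long sequence) {
                            event.s = "a" + number;
                            event.i = "b" + number;
                        }
                    });
                }
            }
        });
    }

Let's look at RingBuffer first. RingBuffer is the core of the Disruptor framework, and its clever design is the key to Disruptor's high performance. RingBuffer is a circular queue backed by an array. Compared with LinkedBlockingQueue, a thread-safe queue backed by a linked list, RingBuffer never removes its element objects, which makes it GC-friendly, and array element access is faster than walking a linked list. RingBuffer also has advantages over ArrayBlockingQueue, a thread-safe queue that is likewise array-based. Here is the RingBuffer code; to save space some code is omitted and only the key parts are listed:

    public final class RingBuffer<T> extends Sequencer {
        …
    }

    public class Sequencer {
        /** Set to -1 as sequence starting point */
        public static final long INITIAL_CURSOR_VALUE = -1L;

        private final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
        private Sequence[] gatingSequences;

        private final ClaimStrategy claimStrategy;
        private final WaitStrategy waitStrategy;

        /**
         * Construct a Sequencer with the selected strategies.
         *
         * @param claimStrategy for those claiming sequences.
         * @param waitStrategy for those waiting on sequences.
         */
        public Sequencer(final ClaimStrategy claimStrategy, final WaitStrategy waitStrategy) {
            this.claimStrategy = claimStrategy;
            this.waitStrategy = waitStrategy;
        }

        /**
         * Claim the next event in sequence for publishing.
         *
         * @return the claimed sequence value
         */
        public long next() {
            if (null == gatingSequences) {
                throw new NullPointerException("gatingSequences must be set before claiming sequences");
            }
            return claimStrategy.incrementAndGet(gatingSequences);
        }

        …

        private void publish(final long sequence, final int batchSize) {
            claimStrategy.serialisePublishing(sequence, cursor, batchSize);
            waitStrategy.signalAllWhenBlocking();
        }
    }

A producer thread goes through two steps when publishing data to the RingBuffer: it calls next to advance and obtain the position where data can be written, and, once the slot has been filled, it calls publish to make that position visible to consumers. The next method delegates to incrementAndGet on the ClaimStrategy interface. With a single-threaded write strategy the implementation class is SingleThreadedClaimStrategy; with a multi-threaded write strategy it is AbstractMultithreadedClaimStrategy. Here is the code of the single-threaded strategy, SingleThreadedClaimStrategy:

    public final class SingleThreadedClaimStrategy implements ClaimStrategy {
        private final int bufferSize;
        private final PaddedLong minGatingSequence = new PaddedLong(Sequencer.INITIAL_CURSOR_VALUE);
        private final PaddedLong claimSequence = new PaddedLong(Sequencer.INITIAL_CURSOR_VALUE);

        …

        @Override
        public long incrementAndGet(final Sequence[] dependentSequences) {
            long nextSequence = claimSequence.get() + 1L;
            claimSequence.set(nextSequence);
            waitForFreeSlotAt(nextSequence, dependentSequences);
            return nextSequence;
        }

        @Override
        public long incrementAndGet(final int delta, final Sequence[] dependentSequences) {
            long nextSequence = claimSequence.get() + delta;
            claimSequence.set(nextSequence);
            waitForFreeSlotAt(nextSequence, dependentSequences);
            return nextSequence;
        }

        @Override
        public void serialisePublishing(final long sequence, final Sequence cursor, final int batchSize) {
            cursor.set(sequence);
        }

        …

        private void waitForFreeSlotAt(final long sequence, final Sequence[] dependentSequences) {
            final long wrapPoint = sequence - bufferSize;
            if (wrapPoint > minGatingSequence.get()) {
                long minSequence;
                while (wrapPoint > (minSequence = getMinimumSequence(dependentSequences))) {
                    LockSupport.parkNanos(1L);
                }
                minGatingSequence.set(minSequence);
            }
        }
    }

incrementAndGet does just two things:

1. It adds 1 to the write position (the claimSequence variable) to get the current write position. Because this is the single-threaded strategy, no locking is needed.
2. It checks whether the computed write position is actually writable. What does that mean? RingBuffer is circular: once data has been published to a slot, the slot can only be overwritten with new data after every consumer has consumed it. So if the producer publishes too quickly, it may well go a full lap around the ring and find that the data in the slot ahead has not been consumed yet. In that case the producer thread has to park (here via repeated LockSupport.parkNanos(1L) calls) until the consumers catch up and free the slot.

After all that rambling and a pile of code, let's summarize what advantages RingBuffer really has over ArrayBlockingQueue. ArrayBlockingQueue was analyzed in the earlier post "学学JUC(一)– BlockingQueue" (Learning JUC, Part 1: BlockingQueue). It has three mutable pieces of state: the head pointer (takeIndex), the tail pointer (putIndex), and the queue length (count). Under multi-threaded concurrency all three are potential points of contention. RingBuffer has only one: the write position (claimSequence). It has no length variable, and the read pointers are maintained by the consumers (covered later). Under the single-writer strategy even the write position is not contended. So under high concurrency RingBuffer can be expected to outperform ArrayBlockingQueue, and the same advantages hold against LinkedBlockingQueue.

In the single-writer SingleThreadedClaimStrategy code above, the claimSequence variable is of type PaddedLong. Let's see what that is:

    public final class PaddedLong extends MutableLong {
        public volatile long p1, p2, p3, p4, p5, p6 = 7L;
        …
    }

    public class MutableLong {
        private long value = 0L;
        …
    }

This looks like odd code: PaddedLong declares six long variables, p1 to p6, that appear to serve no purpose. Why declare variables that are never used? After pestering Google and Baidu for a while I finally understood the reason, and it has to do with the CPU cache. The CPU cache works in units of cache lines, usually 64 bytes each. When the CPU loads data into the cache, it also loads the variables at adjacent addresses until the cache line is full. This causes a problem: once several variables totalling 64 bytes sit on the same cache line, a write to any one of them by one core invalidates the whole line in the other cores' caches, so every variable on that line has to be reloaded there, including the ones that did not change. This phenomenon has a dedicated name, false sharing; the experts have a fairly detailed write-up on it, False Sharing. The six long variables exist purely to fill up the cache line, so that the value field gets a cache line to itself and false sharing is eliminated. (Hats off to the experts for their pursuit of extreme performance; it seems that writing high-quality code also requires understanding computer architecture.)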
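To make the padding idea concrete, here is a minimal sketch rather than anything taken from Disruptor: the class names PaddedCounterSketch, PlainCounter and PaddedCounter and the method timeTwoWriters are made up for illustration. Two threads each increment their own counter, once with bare counters and once with counters carrying six unused long fields in the style of PaddedLong. The JVM does not guarantee field layout or where objects are allocated, so the padding is best effort and the timings will vary from machine to machine.

```java
// Hypothetical illustration of cache-line padding; not Disruptor source code.
final class PaddedCounterSketch {

    /** A bare counter: two instances allocated together may share one cache line. */
    static class PlainCounter {
        volatile long value;
    }

    /**
     * The same counter plus six unused longs, in the style of PaddedLong.
     * The padding is never read; it only takes up space so that neighbouring
     * counters are less likely to land on the same 64-byte cache line.
     * Field layout is up to the JVM, so this is best effort (an assumption).
     */
    static final class PaddedCounter extends PlainCounter {
        volatile long p1, p2, p3, p4, p5, p6 = 7L;
    }

    /** Runs two threads, each incrementing its own counter; returns elapsed nanoseconds. */
    static long timeTwoWriters(final PlainCounter a, final PlainCounter b, final long iterations) {
        Thread t1 = new Thread(() -> increment(a, iterations));
        Thread t2 = new Thread(() -> increment(b, iterations));
        long start = System.nanoTime();
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for writers", e);
        }
        return System.nanoTime() - start;
    }

    // Each counter has exactly one writer thread, so value++ loses no updates.
    private static void increment(PlainCounter counter, long iterations) {
        for (long i = 0; i < iterations; i++) {
            counter.value++;
        }
    }

    public static void main(String[] args) {
        long n = 20_000_000L;
        long plain = timeTwoWriters(new PlainCounter(), new PlainCounter(), n);
        long padded = timeTwoWriters(new PaddedCounter(), new PaddedCounter(), n);
        System.out.println("plain  counters: " + plain / 1_000_000 + " ms");
        System.out.println("padded counters: " + padded / 1_000_000 + " ms");
    }
}
```

On hardware where the two bare counters happen to share a cache line, the padded run would typically be faster, but a single run like this is not a reliable benchmark; it only shows the shape of the technique.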
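Going back to the claim step, the wrap-point check in waitForFreeSlotAt can also be tried out in isolation. The sketch below is a simplified, single-threaded model with made-up names (WrapPointSketch, minimumSequence, canClaim); it uses plain long values in place of Disruptor's Sequence objects and only reproduces the comparison, not the parking loop.

```java
// Hypothetical helper names; only the comparison mirrors waitForFreeSlotAt.
final class WrapPointSketch {

    /** Lowest sequence among the consumers: the slowest consumer gates the producer. */
    static long minimumSequence(long[] consumerSequences) {
        long min = Long.MAX_VALUE;
        for (long s : consumerSequences) {
            min = Math.min(min, s);
        }
        return min;
    }

    /**
     * True when the producer may write nextSequence without overwriting a slot
     * that some consumer has not read yet. The real code parks and retries
     * while this condition is false.
     */
    static boolean canClaim(long nextSequence, int bufferSize, long[] consumerSequences) {
        long wrapPoint = nextSequence - bufferSize;
        return wrapPoint <= minimumSequence(consumerSequences);
    }

    public static void main(String[] args) {
        int bufferSize = 4;
        long[] consumers = {-1L, -1L}; // -1 means nothing consumed yet

        // First lap: sequences 0..3 fit into the empty ring, sequence 4 would wrap.
        for (long seq = 0; seq <= 4; seq++) {
            System.out.println("claim " + seq + " -> " + canClaim(seq, bufferSize, consumers));
        }

        // One consumer races ahead, the other is still at the start: still blocked.
        consumers[0] = 2L;
        System.out.println("claim 4, consumers at 2 and -1 -> " + canClaim(4, bufferSize, consumers));

        // Once the slowest consumer has processed sequence 0, slot 0 can be reused.
        consumers[1] = 0L;
        System.out.println("claim 4, consumers at 2 and 0 -> " + canClaim(4, bufferSize, consumers));
    }
}
```

The point of the model is that the producer is gated by the slowest consumer, which is why the real implementation takes the minimum over all dependent sequences before deciding whether to wait.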
