Java线程间通讯概述
副标题#e#
这个故事源自一个很简朴的想法:建设一个对开拓人员友好的、简朴轻量的线程间通讯框架,完全不 用锁、同步器、信号量、期待和通知,在Java里开拓一个轻量、无锁的线程内通讯框架;而且也没有行列 、动静、事件或任何其他并发专用的术语或东西。
只用普通的老式Java接话柄现POJO的通讯。
它大概跟Akka的范例化actor雷同,但作为一个必需超等轻量,而且要针对单台多核计较机举办优化的 新框架,谁人大概有点过了。
当actor超过差异JVM实例(在同一台呆板上,或漫衍在网络上的差异呆板上)的历程界线时,Akka框 架很善于处理惩罚历程间的通讯。
但对付那种只需要线程间通讯的小型项目而言,用Akka范例化actor大概有点儿像用牛刀杀鸡,不外类 型化actor仍然是一种抱负的实现方法。
我花了几天时间,用动态署理,阻塞行列缓和存线程池建设了一个办理方案。
图一是这个框架的高条理架构:
图一: 框架的高条理架构
SPSC行列是指单一出产者/单一消费者行列。MPSC行列是指多出产者/单一消费者行列。
派发线程认真吸收Actor线程发送的动静,并把它们派发到对应的SPSC行列中去。
吸收到动静的Actor线程用个中的数据挪用相应的actor实例中的要领。借助其他actor的署理,actor 实例可以将动静发送到MPSC行列中,然后动静会被发送给方针actor线程。
我建设了一个简朴的例子来测试,就是下面这个打乒乓球的措施:
public interface PlayerA ( void pong(long ball); //发完就忘的要领挪用 } public interface PlayerB { void ping(PlayerA playerA, long ball); //发完就忘的要领挪用 } public class PlayerAImpl implements PlayerA { @Override public void pong(long ball) { } } public class PlayerBImpl implements PlayerB { @Override public void ping(PlayerA playerA, long ball) { playerA.pong(ball); } } public class PingPongExample { public void testPingPong() { // 打点器埋没了线程间通讯的巨大性 // 节制actor署理,actor实现和线程 ActorManager manager = new ActorManager(); // 在打点器内注册actor实现 manager.registerImpl(PlayerAImpl.class); manager.registerImpl(PlayerBImpl.class); //建设actor署理。署剖析将要领挪用转换成内部动静。 //会在线程间发给特定的actor实例。 PlayerA playerA = manager.createActor(PlayerA.class); PlayerB playerB = manager.createActor(PlayerB.class); for(int i = 0; i < 1000000; i++) { playerB.ping(playerA, i); } }
#p#副标题#e#
颠末测试,速度约莫在每秒500,000 次乒/乓阁下;还不错吧。然而跟单线程的运行速度比起来,我突 然就感受没那么好了。在 单线程 中运行的代码每秒速度能到达20亿 (2,681,850,373) !
居然差了5,000 多倍。太让我失望了。在大大都环境下,单线程代码的结果都比多线程代码更高效。
我开始找原因,想看看我的乒乓球运带动们为什么这么慢。颠末一番调研和测试,我发明是阻塞行列 的问题,我用来在actor间通报动静的行列影响了机能。
图 2: 只有一个出产者和一个消费者的SPSC行列
所以我提倡了一场比赛,要将它换成Java里最快的行列。我发明白Nitsan Wakart的 博客 。他发了几篇文章先容单一出产者/单一消费者 (SPSC)无锁行列的实现。这些文章受到了Martin Thompson的演讲 终极机能的无锁算法的开导 。
跟基于私有锁的行列对比,无锁行列的机能更优。在基于锁的行列中,当一个线程获得锁时,其它线 程就要等着锁被释放。而在无锁的算法中,某个出产者线程出产动静时不会阻塞其它出产者线程,消费者 也不会被其它读取行列的消费者阻塞。
在Martin Thompson的演讲以及在Nitsan的博客中先容的SPSC行列的机能的确令人难以置信 —— 高出了100M ops/sec。比JDK的并刊行列实现还要快10倍 (在4核的 Intel Core i7 上的机能约莫在 8M ops/sec 阁下)。
我怀着极大的期望,将所有actor上毗连的链式阻塞行列都换成了无锁的SPSC行列。惋惜,在吞吐量上 的机能测试并没有像我预期的那样呈现大幅晋升。不外很快我就意识到,瓶颈并不在SPSC行列上,而是在 多个出产者/单一消费者(MPSC)哪里。
用SPSC行列做MPSC行列的任务并不那么简朴;在做put操纵时,多个出产者大概会包围掉互相的值。 SPSC 行列就没有节制多个出产者put操纵的代码。所以即便换成最快的SPSC行列,也办理不了我的问题。
为了处理惩罚多个出产者/单一消费者的环境,我抉择启用LMAX Disruptor ——一个基于环形缓冲区的高机能进 程间动静库。
图3: 单一出产者和单一消费者的LMAX Disruptor
#p#分页标题#e#
借助Disruptor,很容易实现低延迟、高吞吐量的线程间动静通讯。它还为出产者和消费者的差异组合 提供了差异的用例。几个线程可以互不阻塞地读取环形缓冲中的动静:
图 4: 单一出产者和两个消费者的LMAX Disruptor
下面是有多个出产者写入环形缓冲区,多个消费者从中读打动静的场景。
图 5: 两个出产者和两个消费者的LMAX Disruptor
颠末对机能测试的快速搜索,我找到了 三个宣布者和一个消费者的吞吐量测试。 这个真是正合我意,它给出了下面这个功效:
在3 个出产者/1个 消费者场景下, Disruptor要比LinkedBlockingQueue快两倍多。然而这跟我所期 望的机能上晋升10倍仍有很大差距。
这让我以为很沮丧,而且我的大脑一直在搜寻办理方案。就像掷中注定一样,我最近不在跟人拼车上 下班,而是改乘地铁了。溘然灵光一闪,我的大脑开始将车站跟出产者消费者对应起来。在一个车站里, 既有出产者(车和下车的人),也有消费者(同一辆车和上车的人)。
我建设了 Railway类,并用AtomicLong追踪从一站到下一站的列车。我先从简朴的场景开始,只有一 辆车的铁轨。
public class RailWay { private final Train train = new Train(); // stationNo追踪列车并界说哪个车站吸收到了列车 private final AtomicInteger stationIndex = new AtomicInteger(); // 会有多个线程会见这个要领,并期待特定车站上的列车 public Train waitTrainOnStation(final int stationNo) { while (stationIndex.get() % stationCount != stationNo) { Thread.yield(); // 为担保高吞吐量的动静通报,这个是必需的。 //但在期待列车时它会耗损CPU周期 } // 只有站号便是stationIndex.get() % stationCount时,这个忙轮回才会返回 return train; } // 这个要领通过增加列车的站点索引将这辆列车移到下一站 public void sendTrain() { stationIndex.getAndIncrement(); } }
为了测试,我用的条件跟在Disruptor机能测试顶用的一样,而且也是测的SPSC行列——测 试在线程间通报long值。我建设了下面这个Train类,个中包括了一个long数组:
public class Train { // public static int CAPACITY = 2*1024; private final long[] goodsArray; // 传输运输货品的数组 private int index; public Train() { goodsArray = new long[CAPACITY]; } public int goodsCount() { //返回货品数量 return index; } public void addGoods(long i) { // 向列车中添加条目 goodsArray[index++] = i; } public long getGoods(int i) { //从列车中移走条目 index--; return goodsArray[i]; } }
然后我写了一个简朴的测试 :两个线程通过列车相互通报long值。
图 6: 利用单辆列车的单一出产者和单一消费者Railway
public void testRailWay() { final Railway railway = new Railway(); final long n = 20000000000l; //启动一个消费者历程 new Thread() { long lastValue = 0; @Override public void run() { while (lastValue < n) { Train train = railway.waitTrainOnStation(1); //在#1站等列车 int count = train.goodsCount(); for (int i = 0; i < count; i++) { lastValue = train.getGoods(i); // 卸货 } railway.sendTrain(); //将当前列车送到第一站 } } }.start(); final long start = System.nanoTime(); long i = 0; while (i < n) { Train train = railway.waitTrainOnStation(0); // 在#0站等列车 int capacity = train.getCapacity(); for (int j = 0; j < capacity; j++) { train.addGoods((int)i++); // 将货品装到列车上 } railway.sendTrain(); if (i % 100000000 == 0) { //每隔100M个条目丈量一次机能 final long duration = System.nanoTime() - start; final long ops = (i * 1000L * 1000L * 1000L) / duration; System.out.format("ops/sec = %,d\n", ops); System.out.format("trains/sec = %,d\n", ops / Train.CAPACITY); System.out.format("latency nanos = %.3f%n\n", duration / (float)(i) * (float)Train.CAPACITY); } } }
在差异的列车容量下运行这个测试,功效惊着我了:
在列车容量到达32,768时,两个线程传送动静的吞吐量到达了767,028,751 ops/sec。比Nitsan博客中 的SPSC行列快了几倍。
#p#分页标题#e#
继承按铁路列车这个思路思考,我想知道假如有两辆列车会怎么样?我以为应该能提高吞吐量,同时 还能低落延迟。每个车站城市有它本身的列车。当一辆列车在第一个车站装货时 ,第二辆列车会在第二个车站卸货,反之亦然。
图 7: 利用两辆列车的单一出产者和单一消费者Railway
下面是吞吐量的功效:
功效是惊人的;比单辆列车的功效快了1.4倍多。列车容量为一时,延迟从192.6纳秒低落到133.5纳秒 ;这显然是一个令人激昂的迹象。
因此我的尝试还没竣事。列车容量为2048的两个线程通报动静的延迟为2,178.4 纳秒,这太高了。我 在想如何低落它,建设一个有许多辆列车 的例子:
图 8: 利用多辆列车的单一出产者和单一消费者Railway
查察本栏目
我还把列车容量降到了1个long值,开始玩起了列车数量。下面是测试功效:
用32,768 列车在线程间发送一个long值的延迟低落到了13.9 纳秒。通过调解列车数量和列车容量, 当延时不那么高,吞吐量不那么低时,吞吐量和延时就到达了最佳均衡。
对付单一出产者和单一消费者(SPSC)而言,这些数值很棒;但我们怎么让它在有多个出产者和消费者 时也能生效呢?谜底很简朴,添加更多的车站!
图 9:一个出产者和两个消费者的Railway
每个线程都等着下一趟列车,装货/卸货,然后把列车送到下一站。在出产者往列车上装货时,消费者 在从列车上卸货。列车周而复始地从一个车站转到另一个车站。
为了测试单一出产者/多消费者(SPMC) 的环境,我建设了一个有8个车站的Railway测试。 一个车站属于一个出产者,而另 外7个车站属于消费者。功效是:
列车数量 = 256 ,列车容量 = 32:
ops/sec = 116,604,397 延迟(纳秒) = 274.4
列车数量= 32,列车容量= 256:
ops/sec = 432,055,469 延迟(纳秒) = 592.5
如你所见,即便有8个事情线程,测试给出的功效也相当好– 32辆容量为256个long的列车吞吐量为 432,055,469 ops/sec。在测试期间,所有CPU内核的负载都是100%。
图 10:在测试有8个车站的Railway 期间的CPU 利用环境
在玩这个Railway算法时,我险些忘了我最初的方针:晋升多出产者/单消费者环境下的机能。
图 11:三个出产者和一个消费者的 Railway
我建设了3个出产者和1个消费者的新测试。 每辆列车一站一站地转圈,而每个出产者只给每辆车装1/3容量的货。消费者取出每辆车上三个出产者给 出的全部三项货品。机能测试给出的平均功效如下所示:
ops/sec = 162,597,109 列车/秒 = 54,199,036 延迟(纳秒) = 18.5
功效相当棒。出产者和消费者事情的速度高出了160M ops/sec。
为了填补差别,下面给出沟通环境下的Disruptor功效- 3个出产者和1个消费者:
Run 0, Disruptor=11,467,889 ops/sec Run 1, Disruptor=11,280,315 ops/sec Run 2, Disruptor=11,286,681 ops/sec Run 3, Disruptor=11,254,924 ops/sec
下面是另一个批量动静的Disruptor 3P:1C 测试 (10 条动静每批):
Run 0, Disruptor=116,009,280 ops/sec Run 1, Disruptor=128,205,128 ops/sec Run 2, Disruptor=101,317,122 ops/sec Run 3, Disruptor=98,716,683 ops/sec;
最后是用带LinkedBlockingQueue 实现的Disruptor 在3P:1C场景下的测试功效:
Run 0, BlockingQueue=4,546,281 ops/sec Run 1, BlockingQueue=4,508,769 ops/sec Run 2, BlockingQueue=4,101,386 ops/sec Run 3, BlockingQueue=4,124,561 ops/sec
如你所见,Railway方法的平均吞吐量是162,597,109 ops/sec,而Disruptor在同样的环境下的最好结 果只有128,205,128 ops/sec。至于 LinkedBlockingQueue,最好的功效只有4,546,281 ops/sec。
Railway算法为事件批处理惩罚提供了一种可以显著增加吞吐量的浅易步伐。通过调解列车容量或列车数量 ,很容易告竣想要的吞吐量/延迟。
别的, 当同一个线程可以用来消费动静,处理惩罚它们并向环中返回功效时,通过殽杂出产者和消费者, Railway也能用来处理惩罚巨大的环境:
图 12: 殽杂出产者和消费者的Railway
最后,我会提供一个颠末优化的超高吞吐量 单出产者/单消费者测试:
图 13:单个出产者和单个消费者的Railway
#p#分页标题#e#
它的平均功效为:吞吐量高出每秒15亿 (1,569,884,271)次操纵,延迟为1.3 微秒。如你所见,本文 开头描写的谁人局限沟通的单线程测试的功效是每秒2,681,850,373。
你本身想想结论是什么吧。
我但愿未来再写一篇文章,阐发如何用Queue和 BlockingQueue接口支持Railway算法,用来处理惩罚差异 的出产者和消费者组合。