愚以为,Java 并发编程的难点在于,反常识!因为并发、并行本身,是有悖于我们大脑的工作模式的,也就是说,我们长期的写码不得不去思考多线程可能带来的问题,而一旦将这一个一个的线程组合起来,奇妙的 bug 发生了...
Ch0-并发编程基础
并发编程 Bug 的源头
线程安全性
首先我们要明确,线程安全性的定义中,最核心的概念就是正确性!!!(重要的事情感叹三遍)。当正确性无法得到保证时,Bug 便出现了。
一个线程安全的类,就是当多个线程访问它时,这个类始终都能表现出正确的行为。
原子性、可见性与有序性
原子性
- 给用户提供了字节码指令:monitorenter 和 monitorexit 来隐式的使用 lock 和 unlock。
- 这两个字节码反映到 Java 代码中就是同步块:synchronized。
可见性
- 当一条线程修改了共享变量的值,其他线程可以立即得知这个修改。
- 实现方式:在变量修改后将新值同步回主内存,在变量读取前从主内存刷新变量值的方式实现,依赖主内存作为传输媒质。
- 可以保证可见性的关键字:
volatile
:通过 volatile 的特殊规则;synchronized
:通过“对一个变量执行 unlock 操作前,必须将该变量同步回主内存”这条规则。final
:被 final 修饰的字段,一旦完成了初始化,其他线程就能看到它,并且它也不会再变了。- 即只要不可变对象被正确的构建出来(没有发生 this 引用溢出),它就是线程安全的。
有序性
- 如果在本线程内观察,所有操作都是有序的。
- 即 Java 内存模型会保证重排序后的执行,在线程内看起来和串行的效果是一样的。
- 如果在一个线程观察另一个线程,所有操作都是无序的。
- 可以保证有序性的关键字:
- volatile:本身禁止指令重排序;
- synchronized:通过保证线程的串行执行来保证有序性,因为“线程内表现为串行的语义”。
- 如果两个操作之间缺乏 Happens-Before 规则,那么 JVM 可对它们任意地排序。
00-Java内存模型
为了更好的理解 Java 是如何实现 按需禁用缓存和编译优化 的,我们首先需要对 Java 的内存模型有一个初步的了解。
Java 内存模型简介
Java 内存模型主要由以下三部分构成:1 个主内存、n 个线程、n 个工作内存(与线程一一对应),数据就在它们三者之间来回倒腾。那么怎么倒腾呢?靠的是 Java 提供给我们的 8 个原子操作:lock
、unlock
、read
、load
、use
、assign
、store
、write
,其操作流程示意图如下:
一个变量从主内存拷贝到工作内存,再从工作内存同步回主内存的流程为:
|主内存| -> read -> load -> |工作内存| -> use -> |Java线程| -> assign -> |工作内存| -> store -> write -> |主内存|
其中,8 个原子操作的详细介绍如下。
Java 内存模型中的 8 个原子操作
lock
:作用于主内存,把一个变量标识为一个线程独占状态。unlock
:作用于主内存,释放一个处于锁定状态的变量。read
:作用于主内存,把一个变量的值从主内存传输到线程工作内存中,供之后的load
操作使用。load
:作用于工作内存,把read
操作从主内存中得到的变量值放入工作内存的变量副本中。use
:作用于工作内存,把工作内存中的一个变量传递给执行引擎,虚拟机遇到使用变量值的字节码指令时会执行。assign
:作用于工作内存,把一个从执行引擎得到的值赋给工作内存的变量,虚拟机遇到给变量赋值的字节码指令时会执行。store
:作用于工作内存,把工作内存中的一个变量传送到主内存中,供之后的write
操作使用。write
:作用于主内存,把store
操作从工作内存中得到的变量值存入主内存的变量中。
8 个原子操作的执行规则
有关变量拷贝过程的规则
- 不允许
read
和load
,store
和write
单独出现 - 不允许线程丢弃它最近的
assign
操作,即工作内存变化之后必须把该变化同步回主内存中 - 不允许一个线程在没有
assign
的情况下将工作内存同步回主内存中,也就是说,只有虚拟机遇到变量赋值的字节码时才会将工作内存同步回主内存 - 新的变量只能从主内存中诞生,即不能在工作内存中使用未被
load
和assign
的变量,一个变量在use
和store
前一定先经过了load
和assign
有关加锁的规则
- 一个变量在同一时刻只允许一个线程对其进行
lock
操作,但是可以被一个线程多次lock
(锁的可重入) - 对一个变量进行
lock
操作会清空这个变量在工作内存中的值,然后在执行引擎使用这个变量时,需要通过assign
或load
重新对这个变量进行初始化 - 对一个变量执行
unlock
前,必须将该变量同步回主内存中,即执行store
和write
操作 - 一个变量没有被
lock
,就不能被unlock
,也不能去unlock
一个被其他线程lock
的变量
可见性问题 -> 有序性问题
通过上图可以发现,Java 线程只能操作自己的工作内存,其对变量的所有操作(读取、赋值等)都必须在工作内存中进行,不能直接读写主内存中的变量。这就有可能会导致可见性问题:
- 因为对于主内存中的变量 A,其在不同的线程的工作内存中可能存在不同的副本 A1、A2、A3。
- 不同线程的
read
和load
、store
和write
不一定是连续执行的,中间可以插入其他命令。Java 只能保证read
和load
、store
和write
的执行对于一个线程而言是连续的,但是并不保证不同线程的read
和load
、store
和write
的执行是连续的,如下图:假设有两个线程 A 和 B,其中线程 A 在写入共享变量,线程 B 要读取共享变量,我们想让线程 A 先完成写入,线程 B 再完成读取。此时即便我们是按照 “线程 A 写入 -> 线程 B 读取” 的顺序开始执行的,真实的执行顺序也可能是这样的:storeA -> readB -> writeA -> loadB
,这将导致线程 B 读取的是变量的旧值,而非线程 A 修改过的新值。也就是说,线程 A 修改变量的执行先于线程 B 操作了,但这个操作对于线程 B 而言依旧是不可见的。
那么如何解决这个问题呢?通过上述的分析可以发现,可见性问题的本身,也是由于不同线程之间的执行顺序得不到保证导致的,因此我们也可以将它的解决和有序性合并,即对 Java 一些指令的操作顺序进行限制,这样既保证了有序性,有解决了可见性。
于是乎,Java 给出了一些命令执行的顺序规范,也就是大名鼎鼎 Happens-Before 规则。
Happens-Before 规则
根据语义,Happens-Before,就是即便是对于不同的线程,前面的操作也应该发生在后面操作的前面,也就是说,Happens-Before 规则保证:前面的操作的结果对后面的操作一定是可见的。
Happens-Before 规则本质上是一种顺序约束规范,用来约束编译器的优化行为。就是说,为了执行效率,我们允许编译器的优化行为,但是为了保证程序运行的正确性,我们要求编译器优化后需要满足 Happens-Before 规则。
根据类别,我们将 Happens-Before 规则分为了以下 4 类:
- 操作的顺序:
- 程序顺序规则: 如果代码中操作 A 在操作 B 之前,那么同一个线程中 A 操作一定在 B 操作前执行,即在本线程内观察,所有操作都是有序的。
- 传递性: 在同一个线程中,如果 A 先于 B ,B 先于 C 那么 A 必然先于 C。
- 锁和 volatile:
- 监视器锁规则: 监视器锁的解锁操作必须在同一个监视器锁的加锁操作前执行。
- volatile 变量规则: 对 volatile 变量的写操作必须在对该变量的读操作前执行,保证时刻读取到这个变量的最新值。
- 线程和中断:
- 线程启动规则:
Thread#start()
方法一定先于该线程中执行的操作。 - 线程结束规则: 线程的所有操作先于线程的终结。
- 中断规则: 假设有线程 A,其他线程 interrupt A 的操作先于检测 A 线程是否中断的操作,即对一个线程的
interrupt()
操作和interrupted()
等检测中断的操作同时发生,那么interrupt()
先执行。
- 线程启动规则:
- 对象生命周期相关:
- 终结器规则: 对象的构造函数执行先于
finalize()
方法。
- 终结器规则: 对象的构造函数执行先于
我第一次看到 Happens-Before 规则是在《Java 并发编程实战》这本书上,看的时候很是懵逼,根本不知道这一溜规则是用来干什么的。现在想来,这其实就像是玩推理游戏一样,一般会给定一些一定成立的前提,然后我们根据这些前提再推理其他的结论。而 Happens-Before 规则就相当于这些给定的前提。
由于并发程序执行的过程中,有太多的不确定性了,使得我们很难推断和分析这些跑在机器上的程序,到底是怎么运行的?而 Happens-Before 规则的作用,就是辅助我们推理程序的实际运行的。
就是说,Java 的开发者已经向我们承诺了,Java 严格准守了 Happens-Before 规则中的每一条,至于具体怎么实现的,我们可以不去深究。作为使用这门语言的应用开发者,我们只需知道这些规则是一定成立的,并且通过这些规则能推断、理解程序的运行结果就可以啦。
volatile 的实现原理
这里比较有趣的是有关 volatile 的规则,volatile 变量有以下两个特点:
- 保证对所有线程的可见性。
- 禁止指令重排序优化。
我之前一直以为,如果一个变量被标记成了 volatile 变量,那么这个变量的值就不会被加载进线程的工作内存中,而是直接在主内存上进行读写。
实际上并不是这样的,因为这样我们需要为 volatile 变量的读写设置一套特殊的规则,这显然是不合适。即使是 volatile 变量,也是从工作内存中读取的,只是它有特殊的操作顺序规定,使得看起来像是直接在主内存中读写。
Happens-Before 规则中要求,对 volatile 变量的写操作必须在对该变量的读操作前执行,这个规则听起来很容易,那实际上是如何实现的呢?解决方法分两步:
- 保证动作发生;
- 保证动作按正确的顺序发生。
1. 保证动作发生
首先,在对 volatile 变量进行读取和写入操作,必须去主内存拉取最新值,或是将最新值更新进主内存,不能只更新进工作内存而不将操作同步进主内存,即在执行 read
、load
、use
、assign
、store
、write
操作时:
use
操作必须与load
、read
操作同时出现,不能只use
,不load
、read
。use
<-load
<-read
assign
操作必须与store
、write
操作同时出现,不能只assign
,不store
、write
。assign
->store
->write
此时,我们已经保证了将变量的最新值时刻同步进主内存的动作发生了,接下来,我们需要保证这个动作,对于不同的线程,满足 volatile 变量的 Happens-Before 规则:对变量的写操作必须在对该变量的读操作前执行。
2. 保证动作按正确的顺序发生
其实,导致这个执行顺序问题的主要原因在于,这个读写 volatile 变量的操作不是一气呵成的,它不是原子的!无论是读还是写,它都分成了 3 个命令(use
<- load
<- read
或 assign
-> store
-> write
),这就导致了,你能保证 assignA
发生在 useB
之前,但你根本不能保证 writeA
也发生在 useB
之前,而如果 writeA
不发生在 useB
之前,主内存中的数据就是旧的,线程 B 就读不到最新值!
所以,我觉得这句话应当换一个理解方式:假设我是一个写操作,你发生在我之前的读操作可以随便执行,各个分解命令先于我还是后于我都无所谓。但是,你发生在我之后的读操作,必须等我把 3 个命令都执行完,才能执行!不许偷偷把一些指令排到我的最后一个指令的前面去。 这才是 “对变量的写操作必须在对该变量的读操作前执行” 的本质。
volatile 的真实实现
那么 Java 是如何利用现有的工具,实现了上述的两个效果的呢?
答案是:它巧妙的利用了 lock
操作的特点,通过 观察对 volatile 变量的赋值操作的反编译代码,我们发现,在执行了变量赋值操作之后,额外加了一行:
lock addl $0x0,(%esp)
这一句的意思是:给 ESP 寄存器 +0
,这是一个无意义的空操作,重点在 lock
上:
- 保证动作发生:
lock
指令会将当前 CPU 的 Cache 写入内存,并无效化其他 CPU 的 Cache,相当于在执行了assign
后,又进行了store
->write
;- 这使得其他 CPU 可以立即看见 volatile 变量的修改,因为其他 CPU 在读取 volatile 变量时,会发现自己的缓存过期了,于是会去主内存中拉取最新的 volatile 变量值,也就被迫在
use
前进行一次read
->load
。
- 保证动作顺序:
lock
的存在相当于一个内存屏障,使得在重排序时,不能把后面的指令排在内存屏障之前。
这个实现是不是十分的巧妙呀~
01-Java线程-使用篇
如何在 Java 中使用多线程
继承 Thread 类
// 自定义线程对象
class MyThread extends Thread {
public void run() {
// 线程需要执行的代码
}
}
// 创建线程对象并启动线程
MyThread myThread = new MyThread();
myThread.start();
Java 的 Thread 类中,所有关键方法都是 native 的,说明这些方法无法使用平台无关的手段实现。
实现 Runnable 接口
// 实现 Runnable 接口
class Runner implements Runnable {
@Override
public void run() {
// 线程需要执行的代码
}
}
// 创建线程对象并启动线程
Thread thread = new Thread(new Runner());
thread.start();
实现 Callable 接口
Callable 接口的使用需要搭配线程池
线程数配置原则
一般我们使用多线程是为了提高性能,这里的性能一般指的是 延迟 和 吞吐量,我们的目标是降低延迟,提高吞吐量。
- 延迟: 发出请求到收到响应这个过程的时间。
- 吞吐量: 单位时间内能处理请求的数量。
为了降低延迟,提高吞吐量,一般有以下两种常用手段:
- 优化算法。
- 将硬件的性能发挥到极致。
在并发编程领域,提升性能本质上就是提升硬件的利用率,尽可能的将硬件的能力压榨到极致。也就是说,我们的目标是让 CPU 时刻保持着 100% 的利用率,一刻也不停歇的工作着!
然而,线程也不是越多越好的,当一个 CPU 上同时又多个线程运行时,我们所看到的多个线程并行运行其实是一种伪并行,在同一时刻,真正运行的线程其实只有一个,只不过 CPU 在多个线程的运行之间不停的切换,让我们看起来好像是这些个线程在同时运行罢了。然而,线程运行的切换不是没有代价的,每次切换时,我们首先需要保存当前线程的上下文,然后再将下一个线程的上下文设置好。这个过程也是要消耗 CPU 时间的,如果 CPU 将大量的时间都花在了切换线程上,而非执行线程的任务上,那就得不偿失了。
在线程切换中,上下文一般指 CPU 寄存器和程序计数器中的内容。
那么我们应当创建多少线程合适呢?这要视线程执行的任务类型而定了。
一般我们的任务有以下两种类型:CPU 密集型的任务 和 I/O 密集型的任务,并且它们之间有着本质的区别:
-
CPU 密集型的任务:
最佳线程数 = CPU 核数 + 1
- 大多数时间里,只要在运行就有产出;
- 因此希望一个任务一直运行到底再运行下一个,而不是将时间耗费到线程的切换上(即上下文切换)。
- 后面的 “+1” 是为了一旦线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,以保证 CPU 的利用率。
-
I/O 密集型的任务:
最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]
当然,以上对于线程数量配置的公式只是一个参考,Java 的老版本对于 Docker 容器的支持还不是那么的好,比如有时程序员在 Docker 容器中直接使用 Runtime.getRuntime().availableProcessors() * 2
配置线程池大小,对于早期版本的 JVM,这个 Runtime.getRuntime().availableProcessors()
会忽略 cgroup 的限制,返回实际物理机的 CPU 数,而当同一物理机上的有好多容器都进行了这样的线程池大小设置操作时,有一下子开启好多线程的风险,可能会导致物理机的崩溃,这种场景我们需要额外注意!
线程间的通信方式
选择通信
-
synchronized 和 volatile 关键字
- 这两个关键字可以保障线程对变量访问的可见性
-
等待/通知机制
- 详见
Ch3-Java并发高级主题/00-Java中的锁.md
- 详见
-
Thread#join()
- 如果一个线程 A 执行了
threadA.join()
,那么只有当线程 A 执行完之后,threadA.join()
之后的语句才会继续执行,类似于创建 A 的线程要等待 A 执行完后才继续执行; - 使用 join 方法中线程被中断的效果 == 使用 wait 方法中线程被中断的效果,即会抛出 InterruptedException。因为 join 方法内部就是用 wait 方法实现的;
- join 还有一个带参数的方法:
join(long)
,这个方法是等待传入的参数的毫秒数,如果计时过程中等待的方法执行完了,就接着往下执行,如果计时结束等待的方法还没有执行完,就不再继续等待,而是往下执行。join(long)
和sleep(long)
的区别- 如果等待的方法提前结束,
join(long)
不会再计时了,而是往下执行,而sleep(long)
一定要等待够足够的毫秒数; join(long)
会释放锁,sleep(long)
不会释放锁,原因是join(long)
方法内部是用wait(long)
方法实现的。
- 如果等待的方法提前结束,
- 如果一个线程 A 执行了
-
管道流:
PipedInputStream
&PipedOutputStream
public class PipedStreamDemo { public static PipedInputStream in = new PipedInputStream(); public static PipedOutputStream out = new PipedOutputStream(); public static void send() { new Thread() { @Override public void run() { byte[] bytes = new byte[2000]; while (true) { try { out.write(bytes, 0, 2000); System.out.println("Send Success"); } catch (IOException e) { System.out.println("Send Failed"); e.printStackTrace(); } } } }.start(); } public static void receive() { new Thread() { @Override public void run() { byte[] bytes = new byte[100]; int len = 0; while (true) { try { len = in.read(bytes, 0, 100); System.out.println("len = " + len); } catch (IOException e) { System.out.println("Receive Failed"); e.printStackTrace(); } } } }.start(); } public static void main(String[] args) { try { in.connect(out); } catch (IOException e) { e.printStackTrace(); } receive(); send(); } }
选择不通信
有时,你也可以选择不通信,将变量封闭在线程内部,使用 ThreadLocal 可以实现这一效果。
02-Java线程-原理篇
线程的调度
-
协同式线程调度:
线程的执行时间由线程本身来控制,线程执行完自己的任务之后,主动通知系统切换到另一个线程。
- 优点: 实现简单,没有线程同步的问题。
- 缺点: 线程执行时间不可控,如果一个线程编写有问题一直无法结束,程序会一直阻塞在那里。
-
抢占式线程调度:
每个线程由系统分配执行时间,系统决定切不切换线程。
- Java 使用的线程调度方式就是这种!(鬼知道你会把程序写成什么样子,所以还是安全第一)
线程的实现原理
三种线程的实现方式
使用内核线程实现
内核线程(KLT),就是直接由操作系统支持的线程,不过当然不是我们的程序可以直接去操作操作系统的进程(那样实在是太危险了),而是程序可以通过调用内核线程的一种高级接口 —— 轻量级进程(LWP),来操作内核进程。也就是说,LWP 和 KLT 之间是 1:1 的关系,因此我们也称这种模型为一对一的线程模型。
这类似于一种代理模式,LWP 就是代理对象,而 KLT 则是被代理对象,我们把任务请求发给代理人 LWP,然后 LWP 会通过调用真实具备执行任务能力的被代理人 KLT 去执行任务。
优点和缺点
- 优点:
- 每个 LWP 都是一个独立的调度单元,即便有一个 LWP 在调用过程中阻塞了,也不会影响整个进程继续工作,系统的稳定性会比较好。
- 线程的调度和各种操作都委托给了操作系统,所以实现上比较简单。
- 缺点:
- 各种线程操作(创建、析构、同步等)都需要进行系统调用,而系统调用的代价较高,需要在用户态和内核态中来回切换,这会消耗掉一些时间。
- 每个 LWP 都需要一个 KLT 支持,也就是说,每个 LWP 都会消耗掉一部分内核资源(内核线程和栈空间),因此系统可以支持的 LWP 数量是有限的。
在早期的 Linux 系统里,是没有线程支持的,也就是说,操作系统的资源分配单元是进程,执行单元也是进程(不像现在的执行单元是线程),相当于每个进程都是单线程的。每个进程还是像现在这样,有自己的内存、文件描述符和 I/O 端口等,其他进程不能访问你的,同时你也不能访问其他进程的。然后每个进程在 CPU 上分时执行。
后来,人们发现并发程序的好了,觉得以进程为执行单元有点太繁琐了,每次切换执行单元需要切换的上下文太多了,于是就引入了线程的实现。
其实线程的概念很在就有了,不过在操作系统中大规模使用要比概念的提出晚了 20 年左右。
使用用户线程实现
狭义上,用户线程(UT)指的是完全建立在用户空间的线程,即操作系统是感知不到线程的存在的,它只知道那个掌管着这些 UT 的进程 P。因此,进程和 UT 之间的比例为 1:N。
优点和缺点
- 优点:
- UT 的创建、同步、销毁、调度都是在用户态完成的,完全不需要切换到内核态,因此各种线程操作可以是非常快速和低消耗的。
- 由于进程和 UT 之间的比例为 1:N,所以可以支持更大规模的 UT 数量。
- 缺点:
- 由于没有系统内核的支持,所以所有的线程操作都需要自己实现,这就使得 UT 的实现程序一般都比较复杂,而且事实证明,我们很难实现的比操作系统好。
- 现在使用 UT 的程序越来越少了,Java 和 Ruby 等语言都曾使用过 UT ,最后都放弃了...
- 由于没有系统内核的支持,所以所有的线程操作都需要自己实现,这就使得 UT 的实现程序一般都比较复杂,而且事实证明,我们很难实现的比操作系统好。
使用用户线程加轻量级进程
这种模式下,既存在用户线程,也存在轻量级线程。在这种模型下:
- UT 还是只存在于用户空间,因此线程的创建、同步、销毁的消耗依旧很小,同时也可以支持很多线程并发。
- 对于线程的调度,则通过 LWP 作为 UT 和 KLT 之间的桥梁,这样就可以使用操作系统提供的线程调度功能和处理器映射了。
- UT 的系统调用要通过 LWP 完成,大大降低了整个进程被完全阻塞的风险。
- UT 和 LWP 之间的比例是不确定的,即为 N:M 的关系。
一些 UNIX 操作系统提供了 N:M 线程模型的实现,如 Solaris、HP-UX,不过一般并不主流。
Java 线程的实现
根据上一节的描述,是不是看起来后两种实现方法从原理上讲更好、更高级、更牛逼的样子,不过实际情况可能和我们想的不太一样,复杂的实现往往会带来更多的问题(这是因为,想要体现复杂方法的优势,首先需要它能被完美的执行,而由于其复杂性,如何让它完美的执行这个事情本身,就是一个问题),所以有时候,往往是那些效率并没有被发挥到极致的简单方法,更受大家青睐。
JDK 1.2 之前,Java 线程是基于名为 Green Thread 的用户线程实现的,JDK 1.2 之后,被替换为基于操作系统原生线程模型来实现。对于目前的 JDK 版本,这将取决于操作系统支持怎样的线程模型,虚拟机规范中并没有规定 Java 线程必须使用哪种线程模型来实现。
对于 Sun JDK,它的 Windows 和 Linux 平台使用的都是 1:1 的线程模型,即第一种线程实现方式,每条 Java 线程都会被映射进一条轻量级进程中,因为 Windows 和 Linux 系统提供的线程模型就是一对一的。
看起来还是简单的实现更易掌控,更受欢迎呀!这也很符合 Unix 小而美的哲学呀~
03-Java线程-概念篇
线程的生命周期状态
想要了解线程,我们可以先从它的程(人)生轨迹入手,也就是它的生命周期,线程是操作系统中的一个重要概念,而 Java 线程本质上只是对操作系统的线程进行了封装, 所以我们先来看看通用的线程生命周期是什么样的,再来看看 Java 都对其进行了怎样的封装。
通用的线程生命周期
首先,通用的线程生命周期模型将线程的状态分为了以下五种:
- 初始状态:
- 线程仅仅在编程语言层面被创建,在操作系统中并没有被创建,因此还不能被分配 CPU 资源;
- 相当于现在只是在 Java 中 new 了个
Thread
对象,还没调用start()
方法。
- 可运行状态:
- 真正的操作系统线程此时已经成功被创建,线程已经可以被分配 CPU 资源了。
- 运行状态:
- 当有空闲的 CPU 资源时,操作系统会将其分配给一个处于可运行状态的线程,可运行状态的线程一旦被分配的 CPU,它的状态将变为运行状态。
- 休眠状态:
- 运行状态的线程如果调用了某个阻塞式 API(如以阻塞方式读文件),那么这个线程将变为休眠状态,并放弃自己的 CPU 使用权;
- 当它的阻塞状态结束了,它的状态会变为可运行状态,等待再次被分配 CPU 资源。
- 终止状态:
- 当线程执行完或出现异常时,它就会进入终止状态,这是一个终态(只进不出的饕餮状态),就是挂了。
它们之间的状态流转关系为:
Java 中线程的生命周期
对于 Java 线程而言,它对操作系统的线程状态划分略有不满。
首先,它觉得可运行状态和运行状态还搞两个状态有点麻烦,这两个状态只对操作系统调度层面有用,而 Java 秉承不重复造轮子的原则,将线程的调度交给操作系统处理了。所以,它把这两个状态合并成一个状态,即 RUNNABLE。
接下来,它又嫌弃休眠状态的分类太过笼统,所以它又细化了休眠状态,根据休眠的原因,将其细分为了:BLOCKED、WAITING、TIME_WAITING 状态。
最后,Java 将线程的生命周期状态改成了如下形式:
接下来,我们将这些状态之间的流转规则进行探究,主要有以下三条链路:
RUNNABLE -> BLOCKED/WAITING/TIME_WAITING
NEW -> RUNNABLE
RUNNABLE -> TERMINATED
线程的生命周期状态转换
可运行/运行状态 -> 休眠状态
RUNNABLE -> BLOCKED
:- 线程等待 synchronized 的隐式锁时,触发该状态转换。
RUNNABLE -> WAITING
:- 有三种场景会触发这种转换:
- 已获取 synchronized 隐式锁的线程,调用无参数的
Object.wait()
方法。 - 调用无参数的
Thread.join()
方法。- 因为
Thread.join()
其实是通过调用线程对象本身的wait(0)
方法实现的。
- 因为
- 调用
LockSupport.park()
方法。
- 已获取 synchronized 隐式锁的线程,调用无参数的
- 有三种场景会触发这种转换:
RUNNABLE -> TIMED_WAITING
:- 有五种场景会触发这种转换:
- 调用带超时参数的
Thread.sleep(long millis)
方法; - 获得 synchronized 隐式锁的线程,调用带超时参数的
Object.wait(long timeout)
方法; - 调用带超时参数的
Thread.join(long millis)
方法; - 调用带超时参数的
LockSupport.parkNanos(Object blocker, long deadline)
方法; - 调用带超时参数的
LockSupport.parkUntil(long deadline)
方法。
- 调用带超时参数的
- 有五种场景会触发这种转换:
LockSupport 对象说明:
Java 并发包中的锁,都是基于该对象实现的,其使用方法如下:
- 调用
LockSupport.park()
方法,当前线程会阻塞,线程的状态会从 RUNNABLE 转换到 WAITING。
初始状态 -> 可运行/运行状态
Java 刚创建出来的 Thread thread
对象就是 NEW 状态,调用了 thread.start()
方法后,线程就进入了 RUNNABLE 状态。
可运行/运行状态 -> 中止状态
- 线程执行完
run()
方法后,会自动转换到 TERMINATED 状态。 - 线程在执行
run()
方法的时,有异常抛出,线程也会被终止。
如何强制中断
run()
方法的执行?当
run()
方法中调用了一个耗时很长的方法时,我们等的不耐烦了,此时我们需要强制中断run()
方法的执行。在 Java 的 Thread 中,倒是给我们提供了一个
stop()
方法,不过该方法已经被标记为@Deprecated
的了,官方已经不推荐我们使用它了。不推荐它的原因是它太危险了,
stop()
方法不会给线程任何处理后事的机会,直接就杀掉线程,如果此时线程正好持有 ReentrantLock 锁,它被干掉后会导致这个锁永远不会被释放,这实在是太危险了。一种比较优雅的取消线程的方式是 中断,即调用
interrupt()
方法,这个方法并没做什么实质上的事情,它相当于只是给线程打上了一个标记,而后我们可以通过一些手段(如调用Thread.interrupted()
方法)来检测当前线程是否被打上了中断标记,来决定如何终止线程。if (Thread.interrupted()) { // Clears interrupted status! // do something like lock.unlock() throw new InterruptedException(); }
Ch1-保证线程安全的两个角度
想要保证并发访问的安全性,我们可以从以下两个不同的角度进行思考:
- 访问状态变量时使用同步,变 “多线程” 为 “单线程”,即避免多个线程在同一时刻访问相同数据;
- 确保被访问的对象是线程安全的;
除了以上两种方式以外,还有一种方式是不在线程之间共享任何变量,我们可以使用无状态变量,它有如下特点:
- 不包含任何域;
- 不包含任何其他类中域的引用;
- 计算过程中的临时状态仅保存在线程的操作数栈上,不会被保存在堆中。
注:Servlet 就是典型的无状态变量,因为请求的处理要保证并发性。
线程安全性主要解决了如何避免多个线程在同一时刻访问同一个数据的问题,它主要通过加锁的方式,使得多个线程排成一队,一个一个的访问数据,也是由于这个原因,通过这种方式保证线程安全会对应用的性能产生影响。
01-线程的安全性
加锁:synchronized
synchronized 是互斥锁,也就是说,在同一时刻,它只允许一个线程拿到锁对象,它有如下两种使用方法:
使用方法
对于存在线程安全问题的代码,我们可以用 synchronized 修饰它,然后无论有多少个线程同时到达这段代码,线程们都只能排成一列,一个一个的去执行它,就像地道战中的 “关口”,只能容一个人爬过去。
synchronized 有两种修饰代码块的方式:修饰代码块和修饰方法。
修饰代码块:
synchronized (lock对象) {
// 同步代码块
}
修饰方法:
采用修饰方法的方式使用 synchronized,不意味着就不需要指定锁对象了,只不过 Java 为我们隐式指定了。
修饰普通方法:(锁是调用方法的对象)
synchronized public void getValue() {...}
// 相当于:
class X {
synchronized(this) public void getValue() {...}
}
修饰静态方法:(锁是该方法所在的 Class 对象)
synchronized public static void getValue() {...}
// 相当于:
class X {
synchronized(X.class) public void getValue() {...}
}
synchronized 锁是可重入的
拿到锁的线程可以再次拿到锁,这意味着获取锁的操作粒度是“线程”。
可重入锁的一种实现方式:
- 为每个锁关联一个获取该锁的次数的计数器 count,和一个所有者线程;
- count = 0 时,说明没有线程持有该锁;
- 当一个线程获取一个未被持有的锁时,JVM 记下锁的持有者,并 count = 1;
- 当这个线程再次获取锁时,count++;
- 当线程退出同步代码块时,count--;
- 当 count 再次减为 0 时,锁被释放。
如何减小 synchronized 对应用性能的影响
- 将不影响共享状态且执行时间较长的操作放在同步代码块外面,尽量让同步代码块中放的是一些执行时间比较短的操作,让持有锁的时间尽可能短。
- 执行时间较长的计算或者可能无法快速完成的操作时(如:网络I/O或控制台I/O操作),一定不要持有锁!
synchronized 的原理
简单解释(通过 Happens-Before)
Happens-Before 中有一条关于锁的规则:监视器锁的解锁操作必须在同一个监视器锁的加锁操作前执行。我们知道,Java 中的 Happens-Before 规则表达的是:前面一个操作的结果对后续操作是可见的。这条规则说明,前一个线程的解锁操作对后一个线程的加锁操作是可见的,即前一个线程在临界区修改的共享变量,对后续进入临界区的线程是可见的。
这里的 “临界区” 表示被 synchronized 修饰的代码块。
真正的原理
JVM 是基于进入和退出 Monitor 对象来实现方法同步和代码块同步的。代码块的同步是通过monitorenter
和monitorexit
实现的,方法同步使用的是另一种方式,细节在JVM规范中并没有详细说明。
monitorenter
会被插入到同步代码块开始的位置,而monitorexit
会被插入到方法结束的位置或者异常处(并不是同步代码块结束的位置),JVM保证每个monitorenter
都会有一个monitorexit
与之对应。
当一个线程执行到monitorenter
指令时,会尝试获取对象对应的 monitor 的所有权,任何对象都有一个 monitor 与之关联,当一个 monitor 被持有后,该对象所保护的区域将处于锁定状态,因为其他线程这时不能持有 monitor。
那么接下来问题来了,锁对象到底被存在哪里呢?synchronized 用的锁是存在 Java 对象的对象头中的,我们先来介绍一下对象头是什么。
对象头
Java 的对象头主要包含几部分:
长度 | 内容 | 说明 |
---|---|---|
32/64 bit | Mark Word | 存储对象的hashCode和锁信息 |
32/64 bit | Class Metadata Address | 存储对象类型数据的指针 |
32 bit | Array length | 数组的长度(数组对象才有) |
Monitor Record
Monitor Record是线程私有的数据结构,每一个线程都有一个可用monitor record列表,同时还有一个全局的可用列表。每一个被锁住的对象都会和一个monitor record关联(对象头的 MarkWord中的 LockWord 指向 monitor record 的起始地址),同时 monitor record 中有一个 Owner 字段存放拥有该锁的线程的唯一标识,表示该锁被这个线程占用。
如下为 Monitor Record 的内部结构:
monitor record 元素 | 说明 |
---|---|
Owner | 初始时为 null,表示当前没有任何线程拥有该 monitor,当线程成功拥有该锁后保存线程唯一标识,当锁被释放时又设置为 null |
EntryQ | 阻塞所有试图锁住 monitor record 失败的线程 |
RcThis | 表示 blocked 或 waiting 在该 monitor record 上的所有线程的个数 |
Nest | 实现重入锁的计数 |
HashCode | 保存从对象头拷贝过来的 HashCode 值 |
Candidate | 0 表示没有需要唤醒的线程,1 表示要唤醒一个继任线程来竞争锁 |
锁优化
Synchronized是通过对象内部的一个叫做监视器锁(monitor)来实现的,监视器锁本质又是依赖于底层的操作系统的Mutex Lock(互斥锁)来实现的。而操作系统实现线程之间的切换需要从用户态转换到核心态,这个成本非常高,状态之间的转换需要相对比较长的时间,这就是为什么Synchronized效率低的原因。因此,这种依赖于操作系统Mutex Lock所实现的锁我们称之为“重量级锁”。
Java SE 1.6为了减少获得锁和释放锁带来的性能消耗,引入了“偏向锁”和“轻量级锁”:锁一共有4种状态,级别从低到高依次是:无锁状态、偏向锁状态、轻量级锁状态和重量级锁状态。锁可以升级但不能降级。
偏向锁
引入背景: 大多数情况下,锁不仅不存在竞争,而且总是由同一个线程多次获得,为了让线程获得锁的代价更低而引入了偏向锁。
-
偏向锁的获取:
判断是否为可偏向状态
- 如果为可偏向状态,则判断线程ID是否是当前线程,如果是进入同步块;
- 如果线程ID并未指向当前线程,利用CAS操作竞争锁,如果竞争成功,将Mark Word中线程ID更新为当前线程ID,进入同步块
- 如果竞争失败,等待全局安全点,准备撤销偏向锁,根据线程是否处于活动状态,决定是转换为无锁状态还是升级为轻量级锁。
- 当锁对象第一次被线程获取的时候,虚拟机会把对象头中的标志位设置为“01”,即偏向模式。同时使用CAS操作把获取到这个锁的线程ID记录在对象的Mark Word中,如果CAS操作成功。持有偏向锁的线程以后每次进入这个锁相关的同步块时,虚拟机都可以不再进行任何同步操作。
-
偏向锁的释放:
偏向锁使用了遇到竞争才释放锁的机制。偏向锁的撤销需要等待全局安全点,然后它会首先暂停拥有偏向锁的线程,然后判断线程是否还活着,如果线程还活着,则升级为轻量级锁,否则,将锁设置为无锁状态。
轻量级锁
这种锁实现的背后基于这样一种假设,即在真实的情况下我们程序中的大部分同步代码一般都处于无锁竞争状态(即单线程执行环境),在无锁竞争的情况下完全可以避免调用操作系统层面的重量级互斥锁,取而代之的是在monitorenter和monitorexit中只需要依靠一条CAS原子指令就可以完成锁的获取及释放。当存在锁竞争的情况下,执行CAS指令失败的线程将调用操作系统互斥锁进入到阻塞状态,当锁被释放的时候被唤醒
-
加锁过程:
- 在代码进入同步块的时候,如果此对象没有被锁定(锁标志位为“01”状态),虚拟机首先在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,用于存储对象目前Mark Word的拷贝(官方把这份拷贝加了一个Displaced前缀,即Displaced Mark Word)。然后虚拟机使用CAS操作尝试将对象的Mark Word更新为指向锁记录(Lock Record)的指针。如果更新成功,那么这个线程就拥有了该对象的锁,并且对象的Mark Word标志位转变为“00”,即表示此对象处于轻量级锁定状态;如果更新失败,虚拟机首先会检查对象的Mark Word是否指向当前线程的栈帧,如果是说明当前线程已经拥有了这个对象的锁,那就可以直接进入同步块中执行,否则说明这个锁对象已经被其他线程占有了。如果有两条以上的线程竞争同一个锁,那轻量级锁不再有效,要膨胀为重量级锁,锁标志变为“10”,Mark Word中存储的就是指向重量级锁的指针,而后面等待的线程也要进入阻塞状态。
-
解锁过程:
- 如果对象的Mark Word仍然指向线程的锁记录,那就用CAS操作将对象当前的Mark Word与线程栈帧中的Displaced Mark Word交换回来,如果替换成功,整个同步过程就完成了。如果替换失败,说明有其他线程尝试过获取该锁,那就要在释放锁的同时,唤醒被挂起的线程。
- 如果没有竞争,轻量级锁使用CAS操作避免了使用互斥量的开销,但如果存在竞争,除了互斥量的开销外,还额外发生了CAS操作,因此在有竞争的情况下,轻量级锁比传统重量级锁开销更大。
自旋锁
互斥同步对性能影响最大的是阻塞的实现,挂起线程和恢复线程的操作都需要转入到内核态中完成,这些操作给系统的并发性能带来很大的压力。
于是在阻塞之前,我们让线程执行一个忙循环(自旋),看看持有锁的线程是否释放锁,如果很快释放锁,则没有必要进行阻塞。
锁消除
锁消除是指虚拟机即时编译器(JIT)在运行时,对一些代码上要求同步,但是检测到不可能发生数据竞争的锁进行消除。
锁粗化
如果虚拟机检测到有这样一串零碎的操作都对同一个对象加锁,将会把加锁同步的范围扩展(粗化)到整个操作序列的外部。
参考
- https://blog.csdn.net/u012465296/article/details/53022317
- https://blog.csdn.net/chenssy/article/details/54883355
02-对象的安全共享
这一节主要研究如何安全的让对象可以被多个线程访问。
可见性
概述
- **定义:**当一条线程修改了共享变量的值,其他线程可以立即得知这个修改。
- **实现方式:**在变量修改后将新值同步回主内存,在变量读取前从主内存刷新变量值的方式实现,依赖主内存作为传输媒质。
- 可以保证可见性的关键字:
- volatile:通过 volatile 的特殊规则;
- synchronized:通过“对一个变量执行unlock操作前,必须将该变量同步回主内存”这条规则;
- final:被 final 修饰的字段,一旦完成了初始化,其他线程就能看到它,并且它也不会再变了;
- 只要不可变对象被正确的构建出来(没有发生 this 引用溢出),它就是线程安全的。
失去可见性的危害
-
用来作为状态判断的量被其他线程修改了,运行的那个线程就是看不见……
public class NoVisibility { private static boolean ready; private static int number; private static class ReaderThread extends Thread { @Override public void run() { while (!ready) { Thread.yield(); // 放弃当前CPU资源 } System.out.println(number); } } public static void main(String[] args) { new ReaderThread().start(); number = 42; ready = true; } } /* 这段代码可能有一下三种输出: 1. 正确输出42 2. 持续循环下去,如ReaderThread放弃当前CPU资源后,立即再次抢到CPU资源 3. 输出0,因为在没有同步的情况下,Java编译器,处理器及运行时会对操作顺序进行重排序, 所以number = 42;和ready = true;这两句的执行顺序可能会互换, 导致ready为true时,number还没被赋值为42 */
-
非原子的64位操作:读到的数高位已经被改了,低位还没来得及改
对抗可见性问题的法宝:volatile!
volatile 变量是用来确保将变量的更新操作通知给其他线程的,即**在读取 volatile 变量时,总会返回最新写入的值!**是一种比synchronized 更轻量级的同步机制。
特点
- 该变量保证对所有线程的可见性
- 禁止指令重排序优化
两个特点的实现原理!
- 在 volatile 变量的赋值操作的反编译代码中,在执行了赋值操作之后加了一行:
lock addl $0x0,(%esp)
- 这一句的意思是:给 ESP 寄存器 +0,是一个空操作,重点在 lock 上
- 首先 lock 的存在相当于一个内存屏障,使得重排序时,不能把后面的指令排在内存屏障之前
- 同时,lock 指令会将当前 CPU 的 Cache 写入内存,并无效化其他 CPU 的 Cache,相当于对 Cache 中的变量做了一次 store -> write 操作
- 这使得其他 CPU 可以立即看见 volatile 变量的修改,因为其他 CPU 在读取 volatile 变量前会先从主内存中读取 volatile 变量,即进行一次 read -> load 操作
Java 内存模型中对 volatile 变量定义的特殊规则
在对 volatile 变量执行 read、load、use、assign、store、write操作时:
- use 操作必须与 load、read 操作同时出现
- use <- load <- read
- assign 操作必须与 store、write 操作同时出现
- assign -> store -> write
- 同一个线程进行如下两套动作,可以保证:如果 A 先于 B 执行,那么 P 先于 Q 执行
- 第一套:A (use/assign) -> F (load/store) -> P (read/write)
- 第二套:B (use/assign) -> G (load/store) -> Q (read/write)
与 synchronized 的区别
- synchronized:既保证可见性,又保证原子性
- volatile:只保证可见性(所以count++原子性无法保证, 因为volatile只保证赋值时的三个原子操作是整体, 但是这个语句包括了count+1 和count被赋值两个操作, 在多线程环境中可能被打乱)
应用场景:
- 常见的应用:
- 某个操作完成的标志
- 发生中断的标志
- volatile变量不适用的场景:
- 运算结果依赖于该变量当前的值,比如
i++
- 运算结果依赖于其他状态变量
- 运算结果依赖于该变量当前的值,比如
通过确保状态不被发布来保证安全性
发布与溢出
发布
使对象能在当前作用域之外使用。
发布方法:
- 最简单的发布方法:
public static
- 将指向该对象的引用保存到其他代码可以访问的地方
- 在某个非私有的方法中返回该引用
- 将引用传递到其他类的方法中
溢出
发布了不该发布的对象。
一个简单的溢出过程
class UnsafeStates {
private String[] states = new String[] {"AB", "CD"};
public String[] getStates() {
return states; // 可以通过这个方法得到states,然后就可以随便修改states,就逸出了
}
}
this 引用溢出!
- 产生原因
- 在一个对象的构造方法中启动了一个线程,并在这个线程的 public 方法中调用了这个对象的方法,相当于将还没构造好的对象的 this 实例泄露了。
- 解决方法
- 私有化构造方法,只在构造方法中写新线程的代码但不 start,然后写一个工厂方法 newInstance 来创建实例,在工厂方法中先调用构造函数创建实例,再启动线程,这样就不会把一个还没有构造好的对象发布出去了。
我的理解:我们需要保证别的线程拿到我们的对象时,要是一个完整的执行完构造函数的对象,不能是一个构造函数执行了一半的对象。
线程封闭
栈封闭
使用局部变量,并保证这个局部变量不溢出。
ThreadLocal 类
类似于对应线程的全局变量,但是每一个线程维护一个自己的该变量对应值。
ThreadLocal 实现原理
- 每一个 ThreadLocal 都有一个唯一的的 ThreadLocalHashCode;
- 每一个线程中有一个专门保存这个 HashCode 的
Map<ThreadLocalHashCode, 对应变量的值>
; - 当
ThreadLocal#get()
时,实际上是当前线程先拿到这个 ThreadLocal 对象的 ThreadLocalHashCode,然后通过这个 ThreadLocalHashCode 去自己内部的 Map 中去取值。- 即每个线程对应的变量不是存储在 ThreadLocal 对象中的,而是存在当前线程对象中的,线程自己保管封存在自己内部的变量,达到线程封闭的目的。
- 也就是说,ThreadLocal 对象并不负责保存数据,它只是一个访问入口。
不可变对象
定义
- 对象创建后,其状态不能被修改
- 对象是正确创建的(无 this 引用逸出)
使用方法
- 因为对象是不可变的,所以多个线程可以放心大胆的同时访问
- 当这个对象中的状态需要被改变时,之间废掉当前的对象,new 一个新对象代替现在的旧对象
final 域!
final 域是我们用来构造不可变对象的一个利器,因为被它修饰的域一旦被初始化后,就是不可修改的了,不过这有一个前提,就是如果 final 修饰的是一个对象引用,必须保证这个对象也是不可变对象才行,否则 final 只能保证这个引用一直指向一个固定的对象,但这个对象自己的状态是可以改变的,所以一个所有域都是 final 的对象也不一定是不可变对象。
2 种初始化方式
final i = 42;
final i; // 之后在每一个构造函数中给i赋值
注意:对于含有 final 域的对象,JVM 必须保证对象的初始引用在构造函数之后执行,不能乱序执行(也就是说,一旦得到了对象的引用,那么这个对象的 final 域一定是已经完成了初始化的)!
安全发布对象
保证发布的对象的初始化构造过程不会受到任何其他线程干扰,就像加了锁一样,被创建它的线程构造好了,在发布给其他线程。
常用发布模式
- 在静态块中初始化一个对象引用
- 将对象引用保存到 volatile 类型的域或者 AtomicReference 对象中
- 将对象引用保存到某个正确构造的对象的 final 域中
- 将对象引用保存到一个被锁保护的域中
最简单和安全的发布方式
public static Holder holder = new Holder(42);
可以保证安全的原因:静态变量的赋值操作在加载类的初始化阶段完成,包含在<clinit>()
方法的执行过程中,因此这个过程受到 JVM 内部的的同步机制保护,可以用来安全发布对象。
Java 提供的可以安全发布对象的容器
- Map
- HashTable
- ConcurrentMap
- Collections.SynchronizedMap
- 使用
Collections.synchronizedMap(Map<K,V> m)
获得 - 所有方法都被 synchronized 修饰
- 使用
- List
- Vector
- CopyOnWriteArrayList
- CopyOnWriteSet
- Collections.SynchronizedSet
- 使用
Collections.synchronizedSet(Set<T> s)
获得 - 所有方法都被 synchronized 修饰
- 使用
- Queue
- BlockQueue
- ConcurrentLinkedQueue
Ch2-构造安全的并发应用程序
想要设计线程安全的模块是需要考虑许多情况的,我们一般先设计出一些比较小的,线程安全的模块,然后通过一些组合模式,将现有的线程安全组件组合为规模更大的组件或者程序,我们将在 “如何构造线程安全类” 这一节介绍这些模式。
除此之外,我们还可以通过使用 Java 类库中现有的并发基础构建模块构建线程安全的应用程序,我们将在 “常用的并发基础构建模块” 中对其进行介绍。
最后,我们将介绍最省心的使用线程的方式,就是使用线程池。
01-如何构造线程安全类
实例封闭
通过封装简化线程安全类的实现过程,因为封装之后,能够访问被封装对象的所有代码路径都是已知的,即通过封装限制对象被访问的方式。
在 Java 类库中的应用
ArrayList -> Collections.SynchronizedList
HashMap -> Collections.SynchronizedMap
这些类把线程不安全类封装到自己内部,然后将所有的接口方法都实现为同步方法(加上synchronized关键字修饰),并将调用请求转发到底层容器上(就是调用它封装进去的线程不安全类的相应方法),相当于给线程不安全类所有暴露在外的线程不安全方法都加上了synchronized修饰,是装饰者模式的一种应用。
Java 监视器模式
就是把类中所有能访问对象可变状态的方法都加上 synchronized 修饰(简单粗暴),虽然简单,但是一旦被加锁的方法是一个费时操作,会影响应用程序的性能甚至出现错误。以下是一个 Java 监视器模式的典型例子:
/* Java监视器模式的典型例子 */
public class Counter {
private long value = 0;
public synchronized long getValue() {
return value;
}
public synchronized long increment() {
if (value == Long.MAX_VALUE) {
throw new IllegalStateException("counter overflow");
}
return ++value;
}
}
线程安全的委托
示例:构建线程安全的车辆追踪器及优化
接下来,我们将使用下面这个车辆追踪器的示例对本节内容进行讲解,下面的这个只是一个初级版,在之后讲解了新的方法之后,我们会在这个初级的版本上得到两种进化版本。
首先,我们有一个线程不安全的 Point 类,用来表示车辆的坐标。
public class MutablePoint {
public int x, y;
public MutablePoint() {
x = 0;
y = 0;
}
public MutablePoint(MutablePoint p) {
this(p.x, p.y);
}
public MutablePoint(int x, int y) {
this.x = x;
this.y = y;
}
}
先使用 Java 监视器模式,即简单粗暴的在所有会改变 MonitorVehicleTracker 的 locations 域的方法上都加上 synchronized 修饰,来达到线程安全的目的。
public class MonitorVehicleTracker {
private final Map<String, MutablePoint> locations;
public MonitorVehicleTracker(Map<String, MutablePoint> locations) {
this.locations = deepCopy(locations);
}
public synchronized Map<String, MutablePoint> getLocations() {
// 当locations比较大时,这步是一个耗时操作,会长时间占用锁
// 会出现车辆位置已变,但返回信息保持不变的错误
return deepCopy(locations);
}
public synchronized MutablePoint getLocation(String id) {
MutablePoint loc = locations.get(id);
return loc == null ? null : new MutablePoint(loc);
}
public synchronized void setLocation(String id, int x, int y) throws IllegalAccessException {
MutablePoint loc = locations.get(id);
if (loc == null) {
throw new IllegalAccessException("No such ID: " + id);
}
loc.x = x;
loc.y = y;
}
// 当locations.size()比较大时,这个方法将会是一个十分费时的操作
public static Map<String, MutablePoint> deepCopy(Map<String, MutablePoint> m) {
Map<String, MutablePoint> result = new HashMap<String, MutablePoint>();
for (String id : m.keySet()) {
result.put(id, m.get(id));
}
return result;
}
}
这个车辆追踪器最大的问题就是 Point 类是一个易变的线程不安全类,这导致我们不得不在 MonitorVehicleTracker 中加入大量的同步代码,所以我们考虑从修改 Point 类入手(所以说,构建大的线程安全模块,应该从构建小的线程安全模块入手),对于这个错误,我们有两种解决思路:
- 直接把 Point 变为一个不可变对象;
- 构建一个可变但是线程安全的 Point 类,即给 Point 类中的 get 和 set 方法上加上同步,然后我们在 MonitorVehicleTracker 中就不用再使用同步了,相当于缩小了同步代码块的大小。
第一种思路:把 Point 变为一个不可变对象
我们修改 Point 类如下:
public class ImmutablePoint {
public final int x, y;
public ImmutablePoint(int x, int y) {
this.x = x;
this.y = y;
}
}
我们可以在车辆追踪器中这样使用:
public class DelegatingVehicleTracker {
private final Map<String, ImmutablePoint> locations;
private final Map<String, ImmutablePoint> unmodifiableMap;
public DelegatingVehicleTracker(Map<String, ImmutablePoint> pointMap) {
// 通过使用ConrentHashMap来保证locations的读写安全
locations = new ConcurrentHashMap<String, ImmutablePoint>(pointMap);
unmodifiableMap = Collections.unmodifiableMap(locations);
}
public Map<String, ImmutablePoint> getLocations() {
return unmodifiableMap;
}
public ImmutablePoint getLocation(String id) {
return locations.get(id);
}
public void setLocation(String id, int x, int y) throws IllegalAccessException { // 不同!
// 这里直接new一个新的ImmutablePoint对象替代原理的对象
if (locations.replace(id, new ImmutablePoint(x, y)) == null) {
throw new IllegalAccessException("No such ID: " + id);
}
}
}
看源码补充:
Collections.unmodifiableMap(Map<? extend K, ? extend V> m)
- 返回一个不可修改的Map,这个 Map 的实现是
Collections.unmodifiableMap(Map<? extend K, ? extend V> m)
- 这个类是 Map 的线程安全装饰类,具体实现为把传入的 Map m 保存在自己的域中,然后把所有的能修改该 Map 的方法的实现改成:
throw new UnsupportedOperationException();
第二种思路:构建一个可变但是线程安全的 Point 类
把 Point 变成是线程安全的可变类:
public class SafePoint {
private int x, y;
public SafePoint(int[] a) {
this(a[0], a[1]);
}
public SafePoint(SafePoint point) {
this(point.get());
}
public SafePoint(int x, int y) {
this.x = x;
this.y = y;
}
public synchronized int[] get() {
return new int[]{x, y};
}
public synchronized void set(int x, int y) {
this.x = x;
this.y = y;
}
}
PublishingVehicleTracker 实现:
public class PublishingVehicleTracker {
private final Map<String, SafePoint> locations;
private final Map<String, SafePoint> unmodifiableMap;
public PublishingVehicleTracker(Map<String, SafePoint> pointMap) {
locations = new ConcurrentHashMap<String, SafePoint>(pointMap);
unmodifiableMap = Collections.unmodifiableMap(locations);
}
public Map<String, SafePoint> getLocations() {
return unmodifiableMap;
}
public SafePoint getLocation(String id) {
return locations.get(id);
}
public void setLocation(String id, int x, int y) throws IllegalAccessException { // 不同!
// 因为Point已经改成线程安全的了,我们可以通过Point自己的set和get方法放心大胆的修改它
SafePoint loc = locations.get(id);
if (loc == null) {
throw new IllegalAccessException("No such ID: " + id);
}
loc.set(x, y);
}
}
如何在现有线程安全类中添加功能
方法一:继承(extends)
不好,因为有的状态不对子类公开。
方法二:装饰类
在装饰类里放个线程安全的 List,然后在写个加锁的扩展方法 putIfAbsent,注意要用 list 当锁,不然锁不一致!
public class ListHelper<E> {
public List<E> list = Collections.synchronizedList(new ArrayList<E>());
public boolean putIfAbsent(E x) {
synchronized (list) {
boolean contains = list.contains(x);
if (!contains) {
list.add(x);
}
return !contains;
}
}
}
**缺点:**通过添加一个原子操作的扩展类是脆弱的,因为它将类的加锁代码分布到了多个类中
- List 原有方法的加锁代码在 Collections.SynchronizedList 的代码中
- 新加的 putIfAbsent 方法的加锁代码在 ListHelper 中
方法三:组合(最好)
将 List 的操作委托给底层的 list 实例,并把这些方法都实现为 synchronized 的,然后添加一个新的 synchronized 方法putIfAbsent,然后客户代码不会再直接使用 list 对象,而是通过 ImproveList 来操纵它,这样加锁代码就都在一个类中了,同时底层的 list 实现也不用必须是线程安全的。(就是要写的代码有点多……)
public class ImproveList<E> implements List<E> {
private final List<E> list;
public ImproveList(List<E> list) {
this.list = list;
}
public synchronized boolean putIfAbsent(E x) {
boolean contains = list.contains(x);
if (!contains) {
list.add(x);
}
return !contains;
}
/* 剩下的是List本来的方法,都加上synchronized,然后内部调用底层list实现 */
@Override
public synchronized int size() {
return list.size();
}
// ...
}
02-常用的并发基础构建模块
同步容器类
是一类将对应容器的状态都封装起来,并对每个共有方法都进行同步(加synchronized关键字修饰)的类,相当于让所有对容器的状态的访问串行化,虽然安全但是并发性差。
- Vector
- HashTable
对容器进行迭代操作时,我们要考虑它是不是会被其他的线程修改,如果是我们自己写代码,可以考虑通过如下方式对容器的迭代操作加锁:
synchronized (vector) {
for (int i = 0; i < vector.size(); i++)
doSomething(vector.get(i));
}
不过 Java 自己的同步容器类并没有考虑并发修改的问题,它主要采用了一种及时失败的方法,即一旦容器被其他线程修改,它就会抛出异常,例如 Vector 类,它的内部实现是这样的:
Vector#iterator()
会返回一个 Vector 的内部类Itr implement Iterator<E>
,在 Itr 的next()
和remove()
方法中有如下代码:
synchronized (Vector.this) { // 类名.this:在内部类中,要用到外围类的this对象,使用“外围类名.this”
checkForComodification(); // 在进行next和remove操作前,会先检查以下容器是否被修改
...
}
/* checkForComodification()方法 */
final void checkForComodification() {
if (modCount != expectedModCount) // 在Itr的成员变量中有一个:int exceptedModCount = modCount;
throw new ConcurrentModificationException(); // 如果容器被修改了,modCount会变
}
因此,我们在调用 Vector 的如下方法时,要小心,因为它们会隐式的调用 Vector 的迭代操作。
- toString
- hashCode
- equals
- containsAll
- removeAll
- retainAll
- 容器作为参数的构造函数
并发容器类
通过上一小节的分析,我们发现同步容器类的性能实在太差,所以我们可以通过并发容器类代替同步容器类,来提高系统的可伸缩性。我们主要介绍ConcurrentHashMap
和CopyOnWriteArrayList
。
ConcurrentHashMap
特点
- ConcorrentHashMap 实现了 ConcorrentMap 接口,能在并发环境实现更高的吞吐量,而在单线程环境中只损失很小的性能;
- 采用分段锁,使得任意数量的读取线程可以并发地访问 Map,一定数量的写入线程可以并发地修改 Map;
- 不会抛出 ConcorrentModificationException,它返回迭代器具有“弱一致性”,即可以容忍并发修改,但不保证将修改操作反映给容器;
- size() 的返回结果可能已经过期,只是一个估计值,不过 size() 和 isEmpty() 方法在并发环境中用的也不多;
- 提供了许多原子的复合操作:
V putIfAbsent(K key, V value);
:K 没有相应映射才插入boolean remove(K key, V value);
:K 被映射到 V 才移除boolean replace(K key, V oldValue, V newValue);
:K 被映射到 oldValue 时才替换为 newValue
ConcurrentHashMap 内部结构:
- 在构造的时候,Segment 的数量由所谓的 concurrentcyLevel 决定,默认是 16;
- Segment 是基于 ReentrantLock 的扩展实现的,在 put 的时候,会对修改的区域加锁。
锁分段实现原理
锁分段: 不同线程在同一数据的不同部分上不会互相干扰,例如,ConcurrentHashMap 支持 16 个并发的写入器,是用 16 个锁来实现的。它的实现原理如下:
-
使用了一个包含 16 个锁的数组,每个锁保护所有散列桶的 1/16,其中第 N 个散列桶由第(N % 16)个锁来保护;
-
这大约能把对于锁的请求减少到原来的 1/16,也是 ConcurrentHashMap 最多能支持 16 个线程同时写入的原因;
-
对于 ConcurrentHashMap 的 size() 操作,为了避免枚举每个元素,ConcurrentHashMap 为每个分段都维护了一个独立的计数,并通过每个分段的锁来维护这个值,而不是维护一个全局计数;
-
代码示例:
public class StripedMap { // 同步策略:buckets[n]由locks[n % N_LOCKS]保护 private static final int N_LOCKS = 16; private final Node[] buckets; private final Object[] locks; // N_LOCKS个锁 private static class Node { Node next; Object key; Object value; } public StripedMap(int numBuckets) { buckets = new Node[numBuckets]; locks = new Object[N_LOCKS]; for (int i = 0; i < N_LOCKS; i++) locks[i] = new Object(); } private final int hash(Object key) { return Math.abs(key.hashCode() % buckets.length); } public Object get(Object key) { int hash = hash(key); synchronized (locks[hash % N_LOCKS]) { // 分段加锁 for (Node m = buckets[hash]; m != null; m = m.next) if (m.key.equals(key)) return m.value; } return null; } public void clear() { for (int i = 0; i < buckets.length; i++) { synchronized (locks[i % N_LOCKS]) { // 分段加锁 buckets[i] = null; } } } }
注意
- 关于 put 操作:
- 是否需要扩容
- 在插入元素前判断是否需要扩容,
- 比 HashMap 的插入元素后判断是否需要扩容要好,因为可以插入元素后,Map 扩容,之后不再有新的元素插入,Map就进行了一次无效的扩容
- 如何扩容
- 先创建一个容量是原来的2倍的数组,然后将原数组中的元素进行再散列后插入新数组中
- 为了高效,ConcurrentHashMap 只对某个 segment 进行扩容
- 是否需要扩容
- 关于 size 操作:
- 存在问题:如果不进行同步,只是计算所有 Segment 维护区域的 size 总和,那么在计算的过程中,可能有新的元素 put 进来,导致结果不准确,但如果对所有的 Segment 加锁,代价又过高。
- 解决方法:重试机制,通过获取两次来试图获取 size 的可靠值,如果没有监控到发生变化,即
Segment.modCount
没有变化,就直接返回,否则获取锁进行操作。
CopyOnWriteArrayList
-
只要正确发布了这个 list,它就是不可变的了,所以随便并发访问,当需要修改时,就创建一个新的容器副本替代原来的,以实现可变性;
-
应用于迭代操作远多于修改操作的情形,如:事件通知系统,分发通知时需要迭代已注册监听器链表,并调用每一个监听器,一般注册和注销事件监听器的操作远少于接收事件通知的操作。
并发工具类
可以根据自身状态协调线程的控制流:
- 生产者消费者模式:阻塞队列(BlockingQueue)
- 并发流程控制:
- 闭锁(CountDownLatch)
- 栅栏(Barrier)
- 信号量(Semaphore)
- 线程间的数据交换:交换者(Exchanger)
BlockingQueue
BlockingQueue 提供了可阻塞的 put 和 take 方法:(都是阻塞方法,会抛出 InterruptException 异常)
- 如果队列为空,take 方法一直被阻塞,直到队列中出现一个可用元素
- 如果队列已满,put 方法一直被阻塞,直到队列中出现可用空间
是设计 “生产者 -- 消费者模式” 的利器!
Java 中支持的阻塞队列
阻塞队列类 | 结构 | 界 | 特点 |
---|---|---|---|
ArrayBlockingQueue | 数组 | 有 | FIFO |
LinkedBlockingQueue | 链表 | 有 | FIFO |
PriorityBlockingQueue | 优先队列 | 无 | 按优先级先后出队 |
DelayQueue | 使用优先队列实现 | 无 | 向队列中 put 元素时指定多久才能从队列中获取当前元素,只有当延时时间到了,才能从队列中获取该元素,队列元素要实现 Delayed 接口,可以用来设计缓存系统 |
SynchronousQueue | 不存储元素的阻塞队列 | 有 | 每一个 put 操作等待一个 take 操作,否则无法继续添加元素 |
LinkedTransferQueue | 链表 | 无 | transfer() :如果当前有在等待接收元素的消费者,可以把新元素直接给消费者,没有则阻塞;tryTransfer() :如果没有消费者等待会返回 false;它们的区别就在于会不会立即返回 |
LinkedBlockDeque | 链表(双向队列) | 有 | 双向队列可用来实现工作密取模式,即如果一个消费者完成了自己的 Deque 中的全部任务,它可以偷偷的去其他消费者的 Deque 的尾部获取工作,以保证所有线程都处于忙碌状态,可应用于爬虫。 |
阻塞队列的实现原理
对于阻塞队列的实现原理,我们最关注的是其通知模式的实现,即 BlockingQueue 是如何在队列满时通知 put 操作等待,和如何在队列空时通知 take 操作等待的。
我们可以通过阅读 ArrayBlockingQueue 的源码得知:
- ArrayBlockingQueue 中有一个 ReentrantLock lock;
- 这个 lock 给我们提供了两个 Condition:notEmpty 和 notFull;
- take 操作中,会以 while 循环的方式轮询 count == items.length,如果为 true,就 notFull.await(),这个阻塞状态需要通过 dequeue 方法中的 notFull.signal() 来解除;
- put 操作中,会以 while 循环的方式轮询 count == 0,如果为 true,就 notEmpty.await(),这个阻塞状态需要通过 enqueue 方法中的 notEmpty.signal() 来解除。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
int count; // 队列中元素的个数
final ReentrantLock lock; // 下面的两个Condition绑定在这个锁上
private final Condition notEmpty; // 用来等待take的条件
private final Condition notFull; // 用来等待put的条件
public ArrayBlockingQueue(int capacity, boolean fair) {
// 省略...
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 加可中断锁
try {
while (count == items.length)
notFull.await(); // 轮询count值,等待count < items.length
enqueue(e); // 包含notEmpty.signal();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); // 轮询count值,等待count > 0
return dequeue(); // 包含notFull.signal();
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal(); // 会唤醒在等待的take操作
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); // 会唤醒在等待的put操作
return x;
}
}
CountDownLatch
可以让一个或多个线程等待其他线程操作完成在继续执行,不可以循环使用,只能使用一次。
API
public CountDownLatch(int count); // 参数count为计数值
// 调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行,或等待中线程中断
public void await() throws InterruptedException;
// 和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException;
public void countDown(); // 将count值减1
使用 CountDownLatch 替代 join()
public class CountDownLatchAndJoin {
static CountDownLatch countDownLatch = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
new Thread() {
@Override
public void run() {
System.out.println(1);
countDownLatch.countDown();
System.out.println(2);
countDownLatch.countDown();
}
}.start();
countDownLatch.await();
System.out.println("Main Finished");
}
}
/*
输出:
1
2
Main Finished // main线程会等待main启动的线程执行完再结束
*/
CyclicBarrier
可以让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,让所有线程通过,并且这个屏障可以循环使用(这点和 CountDownLatch 很不同)。
API
/**
* parties指让多少个线程或者任务等待至barrier状态
* barrierAction为当这些线程都达到barrier状态时会执行的内容
*/
public CyclicBarrier(int parties, Runnable barrierAction); // 常用
public CyclicBarrier(int parties);
public int await()
throws InterruptedException, BrokenBarrierException;
public int await(long timeout, TimeUnit unit)
throws InterruptedException, BrokenBarrierException, TimeoutException;
Demo
public class CyclicBarrierDemo2 {
static CyclicBarrier barrier = new CyclicBarrier(2, new After());
public static void main(String[] args) {
new Thread() {
@Override
public void run() {
System.out.println("In thread");
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
System.out.println("In main");
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Finish.");
}
static class After implements Runnable {
@Override
public void run() {
System.out.println("All reach barrier.");
}
}
}
/*
输出:
In main // main线程到达屏障之后会被阻塞
In thread
All reach barrier. // thread到达屏障之后会执行After的run
Main finish // 然后被阻塞的main线程和thread线程才会继续执行下去
Thread finish
*/
Semaphore
用来控制同时访问特定资源的线程数量。
API
// 参数permits表示许可数目,即同时可以允许多少线程进行访问,默认是非公平的
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可
public Semaphore(int permits, boolean fair) {
sync = (fair) ? new FairSync(permits) : new NonfairSync(permits);
}
/* 会阻塞等待的acquire方法 */
public void acquire() throws InterruptedException; // 获取一个许可
public void acquire(int permits) throws InterruptedException; // 获取permits个许可
public void release(); // 释放一个许可
public void release(int permits); // 释放permits个许可
/* 会阻塞但不等待,立即返回的acquire方法 */
// 尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire() { }
// 尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException { }
// 尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire(int permits) { }
// 尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException { }
Demo
public class SemaphoreDemo2 {
private static final int THREAD_COUNT = 10;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore semaphore = new Semaphore(2);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("save data");
Thread.sleep(1000);
semaphore.release();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
threadPool.shutdown();
}
}
/*
结果会两个两个的蹦出:save data,说明同时只有两个线程能拿到资源
*/
Exchanger
一个用于两个线程间交换数据的工具类。如果第一个线程先执行了exchange(V)
方法,它会阻塞在那里,等待第二个线程执行exchange(V)
方法,exchange(V)
会返回另一个线程传入的数据。
API
public Exchanger();
public V exchange(V x)
throws InterruptedException;
public V exchange(V x, long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException;
Demo
public class ExchangeDemo {
private static Exchanger<String> exch = new Exchanger<>();
private static ExecutorService pool = Executors.newFixedThreadPool(2);
// 用来保证线程池在两个线程执行完之后再关闭
private static CountDownLatch latch = new CountDownLatch(2);
public static void main(String[] args) {
pool.execute(new Runnable() {
@Override
public void run() {
try {
String data = "第一个线程的结果";
Thread.sleep(100);
String res = exch.exchange(data);
System.out.println("我是第一个线程,我收到另一个线程的结果为:" + res);
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
pool.execute(new Runnable() {
@Override
public void run() {
try {
String data = "第二个线程的结果";
Thread.sleep(1000);
String res = exch.exchange(data);
System.out.println("我是第二个线程,我收到另一个线程的结果为:" + res);
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
try {
latch.await(); // 等待两线程执行完,然后关闭线程池
} catch (Exception e) {
e.printStackTrace();
}
pool.shutdown();
}
}
03-线程池的使用
为什么要使用线程池
我们希望应用程序是这样的:
- 在正常的负载下,服务器应用程序应该同时表现出良好的吞吐量和快速的响应性;
- 当负荷过载时,应用程序的性能应该是逐渐降低,而不是直接失败。
如果不使用线程池,为每一个任务都创建一个线程来执行,我们将会遇到如下问题:
- 线程的创建和销毁都需要时间,会延迟请求的处理,并且消耗系统资源;
- 尤其是内存,还会增加 GC 的压力;同时在系统崩溃的临界点,如果多创建一个线程,就会导致系统崩溃,而不是性能的缓慢下降。
- 如果线程数超过了 CPU 数,增加线程反而会降低性能,因为会出现频繁的上下文切换。
合理使用线程池的好处:
- 降低资源消耗:可以重复使用已经创建好的线程
- 提高响应速度:任务到达时,可以不需要等待线程创建的时间
- 提高线程的可管理性
线程池中的工作线程是如何实现线程复用的?
一个线程一般在执行完任务后就结束了,怎么再让他执行下一个任务呢?
线程重用的核心是,我们知道,Thread.start()只能调用一次,一旦这个调用结束,则该线程就到了stop状态,不能再次调用start。
则要达到复用的目的,则必须从Runnable接口的run()方法上入手,可以这样设计这个Runnable.run()方法(就叫外面的run()方法):
它本质上是个无限循环,跑的过程中不断检查我们是否有新加入的子Runnable对象(就叫内部的runnable:run()吧,它就是用来实现我们自己的任务),有就调一下我们的run(),其实就一个大run()把其它小run()#1,run()#2,...给串联起来了,基本原理就这么简单不停地处理我们提交的Runnable任务。
public void run() {
while(true) {
if(tasks available) {
Runnable task = taskqueue.dequeue();
task.run();
} else {
// wait or whatever
}
}
}
Executor 框架概述
通过 Executor 框架,我们可以将工作单元(Runnable & Callable)与执行机制(Executor)分离,即将任务的提交和任务的执行分离。
Executor 框架主要由 3 大部分组成
- 任务: 实现接口:Runnable 接口或 Callable 接口。
- 任务的执行:包括任务执行机制的核心接口 Executor,以及继承自 Executor 的 ExecutorService 接口。
- Executor 框架有两个关键类实现了 ExecutorService 接口
- ThreadPoolExecutor
- ScheduledThreadPoolExecutor
- Executor 框架有两个关键类实现了 ExecutorService 接口
- 异步计算的结果 :接口 Future 和实现 Future 接口的 FutureTask 类。
Executor 是基于生产者 -- 消费者模式的
- 提交任务的操作相当于生产者(生成待完成的工作单元)
- 执行任务的线程则相当于消费者(执行完这些工作单元)
Executor 框架的主要类和接口
Executor 接口: 框架的基础,线程池都是实现自它的子接口的。
public interface Executor {
void execute(Runnable command);
}
Executor 框架的主要成员:
- ThreadPoolExecutor
- ScheduledThreadPoolExecutor
- Future 接口 & FutureTask 实现类
- Executors 工厂类
接下来,我们将一一介绍这些 Executor 框架的组件。在介绍这些组件之前,我们先来看一下线程池要如何使用。
线程池的基本使用方法
public class ThreadPoolDemo {
public static void main(String[] args) throws InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(5);
CountDownLatch latch = new CountDownLatch(15); // 用来判断线程池是否可以关闭
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
System.out.println("线程开始");
Thread.sleep(1000);
System.out.println("线程结束");
latch.countDown();
} catch (InterruptedException e) {
}
}
};
for (int i = 0; i < 15; i++) {
pool.execute(runnable);
}
latch.await(); // 等待线程池中的线程运行完毕
System.out.println("finish");
pool.shutdown();
}
}
以上 Demo 运行之后,15 个线程不会一下执行完,而是会 5 个 5 个的往外蹦。
ThreadPoolExecutor
ThreadPoolExecutor 是线程池的核心实现类,用来执行被提交的任务。一般通过 Executors 工具类创建,我们可以通过 Executor 创建如下三种 ThreadPoolExecutor:
- FixedThreadPool
- CacheThreadPool
- SingleThreadExecutor
接下来我们将分别介绍它们。
首先,我们需要介绍一下 ThreadPoolExecutor 的构造方法,因为以上三种 ThreadPoolExecutor 其实都是被赋予了不同的构造参数的 ThreadPoolExecutor 对象。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) { ... }
参数说明:
参数 | 描述 |
---|---|
corePoolSize | 核心线程池大小,即没有执行任务时的线程池大小,只有在工作队列满了的情况下才会创建超出这个数量的线程 |
maximumPoolSize | 最大线程池的大小 |
keepAliveTime | 某个线程的空闲时间超过了存活时间,那么将被标记为可回收的 |
BlockingQueue | 用来暂时保存任务的工作队列 |
RejectedExecutionHandler | 当 ThreadPoolExecutor 已经关闭或者达到了最大线程池大小并且工作队列已满时,调用 execute() 方法会调用 RejectedExecutionHandler handler 的 rejectedExecution(Runnable r, ThreadPoolExecutor executor); 方法 |
FixedThreadPool
特点: 固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,如果某个线程由于发生了未预期的 Exception 而结束,那么线程池会补充一个新的线程。
创建方法:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, // 线程池大小不可扩展
0L, TimeUnit.MILLISECONDS, // 多余线程会被立即终止
new LinkedBlockingQueue<Runnable>());
// 使用容量为 Integer.MAX_VALUE 的工作队列
// 由于使用了无界队列,不会拒绝任务,所以不会调用 handler
}
CacheThreadPool
特点: 可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。
创建方法:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // 初始为0,线程池中的线程数是无界的
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
注意:
- 池中不会有空闲线程,也不会有等待的线程
- 一旦任务到达的速度大于线程池处理任务的速度,就会创建一个新的线程给任务
- 与另外两个线程池不同的地方在于,这个工作队列并不是用来放还没有执行的任务的,而是用来放执行过任务后空闲下的线程的,空闲下来的线程会被:
SynchronousQueue#poll(keepAliveTime, TimeUnit.NANOSECONDS)
poll 到工作队列中等待 60s,如果这 60s 有新的任务到达了,这个线程就被派出去执行任务,如果没有,就销毁。
SingleThreadPool
特点: 单个工作者线程来执行任务,如果这个线程异常结束,会创建另一个线程来替代。能确保依照任务在队列中的顺序来串行执行。
创建方法:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, // 线程池的大小固定为1
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
// 使用容量为 Integer.MAX_VALUE 的工作队列
}
Remarks
- 在创建 ThreadPoolExecutor 初期,线程并不会立即启动,而是等到有任务提交时才会启动,除非调用 prestartAllCoreThreads
- 将线程池的 corePoolSize 设置为 0 且不使用 SynchronousQueue 作为工作队列会产生的奇怪行为:只有当线程池的工作队列被填满后,才会开始执行任务
- 产生原因:如果线程池中的线程数量等于线程池的基本大小,那么仅当在工作队列已满的情况下ThreadPoolExecutor才会创建新的线程,如果线程池的基本大小为零并且其工作队列有一定的容量,那么当把任务提交给该线程池时,只有当线程池的工作队列被填满后,才会开始执行任务,因为这个时候才会创建新的线程,在此之前,线程池只有在工作队列中等待任务,没有执行任务的线程。
ScheduledThreadPoolExecutor
特点: 可以在给定的延迟后运行命令,或者定期执行命令。比 Timer 更灵活,功能更强大。
创建方法: 也是通过 Executor 工具创建。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
然后可以通过 schedule 方法提交线程到线程池:
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit)
实现原理:
- 使用 DelayWorkQueue 作为工作队列,ScheduledThreadPoolExecutor 会把待执行的任务 ScheduledFutureTask 放到工作队列中
- ScheduledFutureTask 中有以下 3 个主要的成员变量:
- long time:表示该任务将要被执行的具体时间;
- long sequenceNumber:表示任务被添加到 ScheduledThreadPoolExecutor 中的序号;
- long period:表示任务执行的间隔周期。
- 任务执行的过程:
- 线程从 DelayWorkQueue 中获取到期的任务;
- 执行这个任务;
- 修改这个任务的 time 为下一次的执行时间;
- 将该任务再次 add 进 DelayWorkQueue。
对比 Timer(Timer 的缺陷)
- Timer 在执行所有定时任务时只会创建一个线程。如果有一个任务执行时间太长导致它后面的任务超时,那么后面超时的任务会立即执行,从而破坏了其他 TimerTask 的准时执行。线程池能弥补这个缺陷,因为它可以提供多个线程来执行延时任务和周期任务。
- 线程泄漏:Timer 线程并不捕获未检查异常,当 TimerTask 抛出未检查的异常时将终止定时线程。这种情况下,整个 Timer都会被取消,将导致已经被调度但尚未执行的 TimerTask 将不会再执行,新的任务也不能被调度。
ThreadPoolExecutor 补充内容
BlockingQueue<Runnable> workQueue
的设置
- 无界队列
- 使用无界队列的线程池
- newFixedThreadPool
- newSingleThreadExecutor
- BlockingQueue 选择
- 无界的 LinkedBlockingQueue
- 使用无界队列的线程池
- 有界队列 (可以避免资源耗尽,队列满了的处理方法请看下小节:
RejectedExecutionHandler handler
的设置)- 只有当任务相互独立时,为线程池或工作队列设置界限才是合理的。如果任务之间存在依赖性,那么有界的线程池或队列就可能导致线程“饥饿”死锁问题
- BlockingQueue 选择
- ArrayBlockingQueue
- 有界的 LinkedBlockingQueue
- PriorityBlockingQueue
- BlockingQueue 选择
- 只有当任务相互独立时,为线程池或工作队列设置界限才是合理的。如果任务之间存在依赖性,那么有界的线程池或队列就可能导致线程“饥饿”死锁问题
- 同步移交 (SynchronousQueue)
- newCachedThreadPool 中使用
- 对于非常大的或者无界的线程池,可以通过使用 SynchronousQueue 来避免任务排队
- SynchronousQueue 不是一个真正的队列,而是一种在线程之间进行移交的机制
- 要将一个元素放入 SynchronousQueue 中,必须有另一个线程正在等待接受这个元素。如果没有线程正在等待,并且线程池的当前大小小于最大值,那么 ThreadPoolExecutor 将创建一个新的线程,否则这个任务将被拒绝。
RejectedExecutionHandler handler
的设置`
JDK 提供了 4 种 RejectedExecutionHandler 接口的实现,它们都是以 ThreadPoolExecutor 类的静态内部类的形式定义的,它们的具体实现以及拒绝策略如下:
-
AbortPolicy (默认)(Abort:流产)
-
抛出未检查的 RejectedExecutionException,调用者自己捕获处理
-
实现:
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); // 抛异常! } }
-
这个是 ThreadPoolExecutor 的默认的 RejectedExecutionHandle handler,ThreadPoolExecutor 中有一个:
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
如果调用 ThreadPoolExecutor 的构造方法时没有给出 RejectedExecutionHandle 参数的话,它就会将上面的 defaultHandler 作为参数构造 ThreadPoolExecutor 对象,像这样:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); // 默认传入了 defaultHandler }
-
-
DiscardPolicy (Discard:抛弃)
-
抛弃新提交的任务
-
实现:它的
rejectedExecution
方法啥都没干……public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
-
-
DiscardOldestPolicy
-
抛弃下一个被执行的任务,然后重新尝试提交任务
-
实现:
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { // 先判断线程池关没 e.getQueue().poll(); // 丢到等待队列中下一个要被执行的任务 e.execute(r); // 重新尝试提交新来的任务 } } }
-
不要和 PriorityBlockingQueue 一起使用,会丢失优先级最高的任务
-
-
CallerRunsPolicy (既不抛出异常,也不抛弃任务)
-
它不会在线程池中执行该任务,而是在调用 execute 提交这个任务的线程执行
-
如当主线程提交了任务时,任务队列已满,此时该任务会在主线程中执行。这样主线程在一段时间内不会提交任务给线程池,使得工作者线程有时间来处理完正在执行的任务
-
可以实现服务器在高负载下的性能缓慢降低
-
提交任务的应用程序被拿去执行任务了,不会返回 accept,TCP 层的请求队列会被填满而抛弃请求,客户端才会反应过来,即可以通过 TCP 层来缓冲一下
-
实现:
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { // 直接在把它提交来的线程调用它的 run 方法,相当于没有新建一个线程来执行它, // 而是直接在提交它的线程执行它,这样负责提交任务的线程一段时间内不会提交新的任务来 r.run(); } } }
-
ThreadPoolExecutor 的饱和策略可以通过调用 setRejectedExecutionHandler 来修改。
Future 接口 & FutureTask 实现类
Future 接口 & FutureTask 实现类表示异步运算的结果,截至至 Java 8,FutureTask 是 Future 接口唯一的实现类。
FutureTask 的状态迁移:
FutureTask 的 get 和 cancel 执行效果:
Future 的 get 方法对于任务的状态的不同表现:
- 任务已完成:立即返回结果或抛出异常。
- 任务未完成:阻塞直到任务完成。
- 任务抛出异常:将异常封装为 ExecutionException 后再次抛出,ExecutionException 异常可以通过 getCause() 方法获取被封装的初始异常。
- 任务被取消:抛出 CancallationException 异常,这是个 RuntimeException,需要显式 catch。
Runnable 接口 & Callable 接口
Java 创建线程的三种方式:
- extends Thread
- implements Runnable
- implements Callable
Callable 接口是比 Runnable 更好的基本任务表示形式,它任务主入口点 call 将返回一个值或者抛出一个异常。
public interface Callable<V> {
V call() throws Exception;
}
// 如果想要使用无返回值的 Callable,可以使用 Callable<Void>
Executor 的生命周期
Executor 的生命周期方法:
public interface ExecutorService extends Executor {
// shutdown方法将执行平缓的关闭过程:
// 不再接受新的任务,同时等待已经提交的任务执行完成(包括那些还未开始执行的任务)
void shutdown();
// 执行粗暴的关闭过程:
// 它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务
List<Runnable> shutdownNow();
boolean isshutdown();
// 返回ExecutorService是否已经终止
boolean isTerminated();
// 等待ExecutorService到达终止状态,一般调用完它之后立即调用shutdown
boolean awaitTermination(long timeout,TimeUnit unit)
throws InterruptedException;
// ...
}
ExecutorService的生命周期的3种状态:
- 运行: 初始创建的时候。
- 关闭:调用 shutdown 和 shutdownNow,关闭任务
- 在 ExecutorService 关闭后提交的任务将由“拒绝执行处理器(RejectedExecutionHandler)”来处理,它会抛弃任务,或者使得 execute 方法抛出一个未检查的 RejectedExecutionException(RuntimeException异常)
- 已终止:等所有任务都完成后,ExecutorService 将转入终止状态
- 可以调用 awaitTermination 等待 ExecutorService 到达终止状态
- 可以通过调用 isTerminated 来轮询 ExecutorService 是否已经终止
设置线程池的大小
线程池过大过小的缺点
- 过大
- 大量线程将在很少的 CPU 资源上发生竞争
- 大量空闲线程会耗费内存,导致资源耗尽
- 过小
- CPU 闲置,系统吞吐率下降
线程池大小的设置
-
计算密集型任务:N = N_cpu + 1
- 加 1 的原因:当有一个线程偶尔故障时,额外的那个线程可以立即补上,保证CPU时钟不会被浪费
-
包含 I/O 或其他阻塞操作:N = N_cpu * U_cpu * (1 + W / C)
- N_cpu:CPU 的个数
- U_cpu:目标 CPU 利用率
- W / C:等待时间 (Wait) / 计算时间 (Compute)
- 获取 CPU 数目的方法:
int N_CPUS = Runtime.getRuntime().availableProcessors();
04-安全取消任务
简单的任务取消方法
最简单的任务取消方法:自己设置一个取消标记:private volatile boolean cancelled;
,然后在运行任务过程中不断的循环判断这个标记,然后用另一个线程改变这个标记,一旦当前线程检测到这个取消标记发生变化,就退出停止执行。
示例:
public class PrimeGenerator implements Runnable {
private static ExecutorService exec = Executors.newCachedThreadPool();
private final List<BigInteger> primes = new ArrayList<BigInteger>();
private volatile boolean cancelled;
public void run() {
BigInteger p = BigInteger.ONE;
while (!cancelled) { // 这里循环判断这个标记
p = p.nextProbablePrime();
synchronized (this) {
primes.add(p);
}
}
}
public void cancel() {
cancelled = true; // 用另一个线程设置这个取消标记
}
public synchronized List<BigInteger> get() {
return new ArrayList<BigInteger>(primes);
}
}
不过这种方法是有缺陷的,一旦正在执行的任务发生了阻塞,并且该阻塞状态一直没有解除,那么它将不再有机会判断取消标记,这样即使令 cancelled = true
了,需要被取消的线程也检测不到。就像下面这段代码这样:
class BrokenPrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
private volatile boolean cancelled = false;
BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!cancelled)
// 如果队列已经满了,而且也没有消费者从队列中take元素了
// 这个线程将一直卡在这里,没有机会去判断cancelled的状态
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {}
}
public void cancel() {
cancelled = true;
}
}
中断!!!
通过上一节的讲解我们发现,自己设置一个标记用来判断线程是否要被取消实在不太好用,所以我们就不自己定义一个 boolean 表示线程的状态了,而是直接用 Java 给我们提供的中断机制,其实,每个线程都有一个 boolean 类型的中断状态。当中断线程时,这个线程的中断状态将被设置为 true,也就是说: 中断并不会真正地停止一个正在运行的线程,只是将被中断线程的中断标记设置为 true,然后由线程自己在一个合适的时刻检查自己的中断标记中断自己(这些时刻也被称为取消点),这样可以防止线程在不应该被中断的地方强制停止执行。
中断方法
方法 | 说明 |
---|---|
public void interrupt() | 中断目标线程(将当前线程的中断标记设置为 true) |
public boolean isInterrupted() | 返回目标线程的中断状态,执行后中断标记还保持它原来的值 |
public static boolean interrupted() | 返回目标线程的中断状态,执行后将中断标记设置为为 false |
注意:
interrupted()
方法是能清除中断状态的唯一方法;- 在调用
interrupted()
返回值为 true 时,除非我们想要屏蔽这个中断,否则必须对它进行处理。有以下两种处理方式:(这两种方法也是正确的中断处理方法!)
- 抛出
InterruptedException
- 再次调用
interrupt()
方法,将中断标记恢复为 true
中断是如何解决简单的任务取消方法中的阻塞问题的?
阻塞方法一般都会 throws InterruptedException
,它们在中断标记被修改并且被它们检测到后会:
- 清除中断标记,即把中断标记设置为 false;
- 抛出 InterruptedException 异常,停止线程执行。
注意: JVM 并不保证阻塞方法检测到中断的速度,但在实际情况中响应速度还是非常快的。
ThreadPoolExecutor 拥有的线程检测到中断时的操作
检查线程池是否正在关闭:
- 如果是,在结束之前执行一些线程池清理工作;
- 如果不是,创建一个新线程将线程池恢复到合理的规模。
Future 实现计时运行
需求: 给一个 Runnable r 和时间 long timeout,解决“最多花 timeout 分钟运行 Runnable,没运行完就取消”这种要求。
private static final ExecutorService cancelExec = Executors.newCachedThreadPool();
public static void timedRun(Runnable r, long timeout, TimeUnit unit) {
Future<?> task = cancelExec.submit(r);
try {
task.get(timeout, unit);
} catch (TimeoutException e) {
// 如果超时,抛出超时异常
} catch (ExecutionException e) {
// 如果任务运行出现了异常,抛出任务的异常
throw launderThrowable(e.getCause());
} finally {
// 如果任务已经结束,这句没影响
// 如果任务还在运行,这句会中断任务
task.cancel(true);
}
}
安全停止基于线程的服务
线程的所有权
线程的所有者: 创建这个线程的类(线程池是其工作者线程的所有者,如果要中断这些线程,要通过线程池)
线程的所有权是不可传递的:
- 应用程序可以拥有服务,服务也可以拥有工作者线程,但应用程序并不能拥有工作者线程,因此应用程序不能直接停止工作者线程,而是要通过服务来停止。
- 对于持有线程的服务,只要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期方法。 服务可以通过生命周期方法关闭它所拥有的线程。
一个有问题的日志服务
public class LogWriter {
private final BlockingQueue<String> queue;
private final LoggerThread logger;
private final PrintWriter writer;
private boolean isShutdown; // 新加一个关闭判断
private static final int CAPACITY = 1000;
public LogWriter(Writer writer) {
this.queue = new LinkedBlockingQueue<String>(CAPACITY);
this.logger = new LoggerThread(writer);
}
public void start() {
logger.start();
}
public void log(String msg) throws InterruptedException {
if (!isShutdown) { // 如果关了,就抛异常
queue.put(msg); // 但是这一行和上一行不是一个原子操作,有可能会丢失日志
// 我们又不能给这个方法加synchronized,因为给一个阻塞方法加锁是很危险的!
} else {
throw new IllegalStateException("logger is shut down");
}
}
private class LoggerThread extends Thread {
public LoggerThread(Writer writer) {
this.writer = new PrintWriter(writer, true); // autoflush
}
public void run() {
try {
while (true)
writer.println(queue.take());
} catch (InterruptedException ignored) {
} finally {
writer.close();
}
}
}
}
方法一:BlockingQueue + isShutdown + count
public class LogService {
private final BlockingQueue<String> queue;
private final LoggerThread loggerThread;
private final PrintWriter writer;
private boolean isShutdown;
// 这个计数器的作用是:如果queue满了,有线程阻塞着,
// 它可以保证所有在关日志前添加的日志都记录了再真正关闭日志服务
private int reservations;
public LogService(Writer writer) {
this.queue = new LinkedBlockingQueue<String>();
this.loggerThread = new LoggerThread();
this.writer = new PrintWriter(writer);
}
public void start() {
loggerThread.start();
}
public void stop() {
synchronized (this) {
isShutdown = true;
}
loggerThread.interrupt();
}
public void log(String msg) throws InterruptedException {
synchronized (this) {
if (isShutdown)
throw new IllegalStateException(/*...*/);
++reservations; // 记录待处理的日志数量
}
queue.put(msg);
}
private class LoggerThread extends Thread {
public void run() {
try {
while (true) {
try {
synchronized (LogService.this) {
if (isShutdown && reservations == 0)
break; // 只有当isShutdown == true并且没有待处理的日志时才能关闭日志服务
}
String msg = queue.take();
synchronized (LogService.this) {
--reservations; // 处理完一个,待处理的日志数就-1
}
writer.println(msg);
} catch (InterruptedException e) { /* retry */
}
}
} finally {
writer.close();
}
}
}
}
方法二:线程池 ExecutorService
ExecutorService 的关闭方法:
- shutdown
- 会把当前执行的和尚未启动的任务清单中的程序执行完再关闭
- 关闭速度慢但安全
- shutdownNow
- 首先关闭当前正在执行的任务,然后任何返回任务清单中尚未执行的任务
使用 SingleThreadExecutor 线程池构建日志服务:
public class LogService {
private final ExecutorService exec = Executors.newSingleThreadExecutor();
private final PrintWriter writer;
public void start() {}
public void stop() {
try {
// 下两行经常一起用!
exec.shutdown();
exec.awaitTermination(TIMEOUT, UNIT);
} finally {
write.close();
}
}
public void log(String msg) {
try {
exec.execute(new WriteTask(msg));
} catch (RejectedExecutionException e) {}
}
}
毒丸对象
另一种关闭生产者一消费者服务的方式就是使用“毒丸(Poison Pill)”对象。 “毒丸”是指一个放在队列上的对象,其含义是:“当得到这个对象时,立即停止。 在 FIFO(先进先出)队列中,“毒丸”对象将确保消费者在关闭之前首先完成队列中的所有工作,在提交“毒丸”对象之前提交的所有工作都会被处理,而生产者在提交了“毒丸”对象后,将不会再提交任何工作。
注意:
- 只有在无界队列中,“毒丸”对象才能可靠地工作,否则可能无法将毒丸对象 put 到队列中去。
- 只有在生产者和消费者的数量都已知的情况下,才可以使用“毒丸”对象,否则无法判断应该使用几个毒丸对象。
- 扩展到多个生产者:每个生产者都向队列中放入一个“毒丸”对象,并且消费者仅当在接收到生产者数量个“毒丸”对象时才停止。
- 扩展到多个消费者的情况:生产者将消费者数量个“毒丸”对象放入队列。
处理 RuntimeException
RuntimeException 通常不会被捕捉,是导致线程死亡的最主要原因。
处理方法
方法一:在线程池内部构建一个工作者线程。如果任务抛出了一个未检查异常,那么它将使线程终结,但会首先通知框架该线程已经终结。
public void run() {
Throwable thrown = null;
try {
while (!isInterrupted()) {
runTask(getTaskFromWorkQueue());
}
} catch (Throwable e) {
thrown = e;
} finally {
threadExited(this, thrown);
}
}
方法二: implements Thread.UncaughtException
接口
public interface UncaughtExceptionHandler {
void uncaughtException(Thread t, Throwable e);
}
令会抛出 RuntimeException 异常的的类实现 UncaughtException 接口,这样当该类抛出未捕获的异常时,会执行 uncaughtException(Thread t, Throwable e)
方法,我们可以在这个方法中将错误信息写入日志,或者尝试重新启动线程,关闭应用程序等。
注意: 只有通过 execute 提交的任务,才能将它抛出的异常交给未捕获异常处理器,而通过 submit 提交的任务,无论是抛出的未检查异常还是已检查异常,都将被认为是任务返回状态的一部分。如果一个由 submit 提交的任务由于抛出了异常而结束,那么这个异常将被 Future.get 封装在 ExecutionException 中重新抛出。
JVM 关闭
关闭钩子
JVM 关闭流程:
- 在正常关闭中,JVM 首先调用所有已注册的关闭钩子(Shutdown Hook)。关闭钩子是指通过
Runtime.addShutdownHook(Thread)
注册的但尚未开始的线程。JVM并不能保证关闭钩子的调用顺序。 - 当被强行关闭时,只是关闭JVM,而不会运行关闭钩子。
应用:通过注册一个关闭钩子来停止日志服务
public void start(){
Runtime.getRuntime().addshutdownHook(new Thread() {
public void run() {
try { Logservice. this. stop(); }
catch (InterruptedException ignored) {}
}
});
}
注意:Runtime 是个单例类
守护线程 Daemon
定义: 一种特殊的线程,当进程中不存在非守护线程了,守护线程自动销毁。
应用: 垃圾回收线程,当进程中没有非守护线程了,垃圾回收线程就没有存在的必要了,会自动销毁。
设置方法: thread.setDaemon(true);
Ch3-Java并发高级
00-Java中的锁
Lock 接口
在 Java SE 5 后,Java 并发包 java.util.concurrent 中新增了 Lock
接口及其相关实现类,如:ReentrantLock
来实现锁的功能,它提供了与 synchronized
相似的同步功能,不过在使用时需要显示的获取锁和释放锁,虽然从这个角度来看,使用 Lock 接口更为麻烦,不过我们可以通过 Lock 接口的实现类,实现以下功能:
- 尝试非阻塞的获取锁,即
tryLock()
tryLock()
方法会尝试非阻塞的获取锁,即调用方法后不管能否取到锁都会立即返回,不会被阻塞
- 能被中断的获取锁
- 与 synchronied 不同,获取到锁的线程能相应中断,当线程被中断时,中断异常会被抛出,并且锁也会被释放
- 超时获取锁,即
tryLock(long time, TimeUnit unit)
Lock
的标准使用示例
Lock lock = new ReentrantLock();
lock.lock();
try {
// 同步代码块
} finally {
lock.unlock(); // 千万不能忘记在finally块中释放锁
}
Lock
的 API
/* 构造方法 */
public ReentrantLock(boolean fair) { // fair默认是false
sync = fair ? new FairSync() : new NonfairSync();
}
/* 重要方法 */
void lock()
void lockInterruptibly() throws InterruptedException
boolean tryLock()
boolean tryLock(long time, TimeUnit unit) throws InterruptedException
void unlock()
Condition newCondition()
接下来,我们将对 Lock 的重要方法进行介绍。
3 个高级的 lock 方法
轮询锁: tryLock()
-
只有在锁没有被其他线程拿到时才获取锁,然后返回 true,否则返回 false,会立即返回,不会阻塞
-
不是可中断锁
-
可以避免锁顺序死锁的发生
-
我们知道,死锁发生的一个典型示例就是锁顺序死锁,即(假设我们要进行一个转账操作)
public boolean transferMoney(Account fromAcct, Account toAcct, double money) { synchronized (fromAcct) { synchronized (toAcct) { // 转账 } } } // 调用 final Account A = new Account(); final Account B = new Account(); new Thread() { public void run() { transferMoney(A, B, 100) } }.start(); new Thread() { public void run() { transferMoney(B, A, 100) } }.start(); // 两个线程在进行方向相反的转账操作,及容易发生死锁!
-
我们可以通过
tryLock()
的方式来避免锁顺序死锁public boolean transferMoney(Account fromAcct, Account toAcct, double money) { long fixedDelay = getFixedDelayComponentNanos(timeout, unit); // 固定时延部分 long randMod = getRandomDelayModulusNanos(timeout, unit); // 随机时延部分 long stopTime = System.nanoTime() + unit.toNanos(timeout); // 过期时间 while (true) { if (fromAcct.lock.tryLock()) { try { if (toAcct.lock.tryLock()) { // 如果失败了,该线程会放开已经持有的锁,避免了死锁发生 try { // 转账 } finally { toAcct.lock.unlock(); } } } finally { fromAcct.lock.unlock(); } } if (System.nanoTime() < stopTime) // 检查是否超时 return false; NANOSECONDS.sleep(fixedDelay + rnd.nextLong() % randMod); // 等待一定时长,防止陷入活锁 } }
-
定时锁: tryLock(long time, TimeUnit unit)
- 定时锁是可中断锁,你看它是能
throw InterruptedException
的,能抛出 InterruptedException 的方法都是阻塞方法 - 等待 timeout 时间,再去 tryLock 锁
中断锁: lockInterruptibly()
- 能在获得锁的同时保持对中断的响应,即在调用 lockInterruptibly() 获得锁之后,如果线程被 interrupt() 打上了中断标记,会抛中断异常
- 相当于在同步代码块中加入了取消点
公平锁与非公平锁: ReentrantLock(boolean fair)
- 公平锁: 在有线程持有锁和有线程在队列中等待锁的时候,会将新发出请求的线程放入队列中,而不是立即执行它,也就是说,获取锁的顺序和线程请求锁的顺序是一样的。
- 非公平锁: 只当有线程持有锁时,新发出请求的线程才被放入队列中,如果新的线程到达时没有线程持有锁,但队列中有等待的线程(比如队列中的线程还在启动中,还没有拿到锁),这时新请求锁的线程会先于队列中的线程获取锁。
- 非公平锁性能更优的原因:
- 恢复一个被挂起的线程到这个线程真正运行起来之间,存在着巨大时时延
- 在等待队列中的线程被恢复的超长时延里,如果正好进来了一个线程获取锁,非公平锁会让这个新进来的线程先执行,它很有可以能等待队列中的线程恢复运行前就执行完了,相当于时间不变的情况下,利用等待线程的恢复运行时延,多执行了一个线程
- 只要当线程运行时间长,或锁的请求频率比较低时,才适合使用公平锁
Condition: newCondition()
在介绍 Condition 前,我们要先来介绍以下为什么需要 Condition,因此,我们需要先来介绍一下 “等待/通知机制”。
等待/通知机制
主要方法: 这些方法都是 Object 类的方法,因为 synchronized 可以将任意一个对象作为锁。
wait() // 使调用该方法的线程释放锁,从运行状态中退出,进入等待队列,直到接到通知或者被中断
wait(long timeout) // 等待time毫秒内是否有线程对其进行了唤醒,如果超过这个时间则自动唤醒
notify() // 随机唤醒等待队列中等待锁的一个线程,使该线程退出等待队列,进入可运行状态
notifyAll() // 使所有线程退出等待队列,进入可运行状态,执行的顺序由JVM决定
注意
- 在调用以上这些方法时,如果它们没有持有适当的锁,即不在同步代码块中,会抛出 IllegalMonitorStateException 异常(RuntimeException,不用 catch),同时调用 wait() 和 notify() 的方法也必须是同一个对象
- 最好使用 notifyAll() 来唤醒等待线程,不然很容易发生死锁
wait 的线程是如何被其对应的 notify 通知到的?(等待/通知机制实现原理)!!!
- 每个锁对象都又两个队列,一个是就绪队列,一个是阻塞队列
- 就绪队列中存储了将要获得锁的线程,阻塞队列中存储了被阻塞的线程
- 一个线程被唤醒后,会进入就绪队列,等待 CPU 调度
- 一个线程被 wait 后就会进入阻塞队列,等待其他线程调用 notify,它才会被选中进入就绪队列,等待被 CPU 调度
条件队列的标准使用形式:
void stateDependentMethod() throws InterruptedException {
synchronized (lock) {
while(!conditionPredicate())
lock.wait(); // 一个条件队列可能与多个条件相关,
// 我们并不知道notifyAll是针对哪一个条件的,
// 为了防止wait被过早唤醒,wait必须放在循环中!
}
}
void stateAwakeMethod() {
synchronized (lock) {
lock.notifyAll(); // 不要使用notify!!!
// 一旦有一个notify错误的在wait前执行了,
// 将会有一个wait永远无法被唤醒!
}
}
使用 wait 和 notifyAll 实现可重新关闭的阀门:巧妙的用法!!!
public class ThreadGate {
@GuardedBy("this") private boolean isOpen;
@GuardedBy("this") private int generation;
public synchronized void close() {
isOpen = false;
}
public synchronized void open() {
++generation;
isOpen = true;
notifyAll();
}
public synchronized void await() throws InterruptedException {
int arrivalGeneration = generation;
// 如果阀门打开后很快就关闭了,那么这个while循环可能检测不到isOpen为true的状态,
// 会一直阻塞在这里;添加一个generation,在open时该变它的值,
// 这样只要open了一次,这个while循环就一直为false了,一定会放行线程!
while (!isOpen && arrivalGeneration == generation)
wait();
}
}
通过观察上面:条件队列的标准使用形式,我们发现 “等待/通知机制” 由于不能指定条件,使用起来是很不方便的,因为我们不能控制 notify 唤醒的 wait 到底是哪一个,可能会导致:提前唤醒了正在 wait 的线程,然后本应该被唤醒的 wait 却没有被唤醒。这种时候,我们可以通过 Condition
来实现 wait 和 notify 的分堆,防止 notify 唤醒别人的 wait。
Condition 接口
/* 获取Condition的方法 */
protected final Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
/* Condition接口中的方法 */
void await() throws InterruptedException; // 相当于wait()
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException; // 相当于wait(long timeout)
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal(); // 相当于notify()
void signalAll(); // 相当于notifyAll()
有了 Condition 后,我们就可以选择使用 signal()
而不是 signalAll()
了,因为我们不用担心 signal()
会唤醒其他 await()
然后错过自己本该唤醒的 await()
了。这个时候使用 signal()
,每次只会唤醒一个线程,能降低锁的竞争,减少上下问切换的次数,性能是要比 signalAll()
好的。
synchronized 和 ReentrantLock 的选择
- 选择方式:
- 只有当我们需要如下高级功能时才使用 ReentrantLock,否则优先使用 synchronized
- 可轮询、可定时、可中断的锁
- 公平锁
- 非块结构的锁
- 只有当我们需要如下高级功能时才使用 ReentrantLock,否则优先使用 synchronized
- 优先选择 synchronized 的原因:
- Java 6开始,ReenstrantLock 和内置锁的性能相差不大
- synchronized 是 JVM 的内置属性,未来更有可能对 synchronized 进行性能优化,如对线程封闭的锁对象的锁消除,增加锁的粒度等
- ReenstrantLock 危险性更高(如忘记在 finally 块中 lock.unlock() 了,会导致锁永远无法被释放,出现问题,极难 debug)
- 许多现有程序中已使用了 synchronized,两种方式混合使用比较易错
读写锁:ReentrantReadWriteLock
特点: 支持读操作并发执行,涉及到写操作时才线程间互斥执行。
方法:
- 获得读锁:
lock.readLock().lock()
- 释放读锁:
lock.readLock().unlock()
- 获得写锁:
lock.writeLock().lock()
- 释放写锁:
lock.writeLock().unlock()
01-Java中13个原子操作类
分类
- 原子更新基本类型
AtomicBoolean
AtomicInteger
AtomicLong
- 原子更新数组
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
- 原子更新引用类型
AtomicReference
AtomicReferenceFieldUpdater
:原子更新引用类型里的字段AtomicMarkableReference
:原子更新带有标记位的引用类型
- 原子更新字段类
AtomicIntegerFieldUpdater
:原子更新 Integer 字段的更新器AtomicLongFieldUpdater
:原子更新长 Long 字段的更新器AtomicStampedReference
:原子更新带版本号的引用类型。即将整数的版本号值与引用关联起来,每一次更新都会改变版本号值,可以用来解决 CAS 中的 ABA 问题
AtomicInteger
的常用方法
// 原子方式将数值加delta,并返回
public final int addAndGet(int delta)
// 如果oldValue == expect,将它更新为update
public final boolean compareAndSet(int expect, int update)
// 类似i++操作,返回的是旧值
public final int getAndIncrement()
// 最终会设置为newValue,使用lazySet设置后,可能在之后的一段中,其他线程读到的还是旧值
public final void lazySet(int newValue)
// 原子方式设置新值,并返回旧值
public final int getAndSet(int newValue)
使用方法:
public class AtomicIntegerDemo {
static AtomicInteger ai = new AtomicInteger(1);
public static void main(String[] args) {
System.out.println(ai.getAndIncrement());
System.out.println(ai.get());
}
}
AtomicIntegerArray
的常用方法
// 对数组索引为i的元素进行addAndGet(int delta)操作
public final int addAndGet(int i, int delta)
// 对数组索引为i的元素进行compareAndSet(int expect, int update)操作
public final boolean compareAndSet(int i, int expect, int update)
使用方法:
public class AtomicIntegerArrayDemo {
static int[] value = new int[] {1, 2};
static AtomicIntegerArray ai = new AtomicIntegerArray(value);
public static void main(String[] args) {
ai.getAndSet(0, 3)
System.out.println(ai.get(0));
System.out.println(value[0]);
}
}
AtomicReference
使用示例
public class AtomicReferenceDemo {
public static AtomicReference<User> ar =
new AtomicReference<User>();
public static void main(String[] args) {
User user = new User("user", 15);
ar.set(user);
System.out.println(ar.get().name);
System.out.println(ar.get().old);
}
public static class User {
public String name;
public int old;
public User(String name, int old) {
this.name = name;
this.old = old;
}
}
}
AtomicIntegerFieldUpdater
使用示例
想要原子的更新字段需要两步:
public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass, String fieldName)
得到更新器实例,设置想要进行原子更新的类和具体的属性- 更新类的字段必须使用
public volatile
修饰
示例:
public class AtomicIntegerFieldUpdaterDemo {
private static AtomicIntegerFieldUpdater<User> updater =
AtomicIntegerFieldUpdater.newUpdater(User.class, "old");
public static void main(String[] args) {
User user = new User("user", 10);
System.out.println(updater.getAndIncrement(user));
System.out.println(user.old);
}
public static class User {
public String name;
public volatile int old;
public User(String name, int old) {
this.name = name;
this.old = old;
}
}
}
02-非阻塞同步机制
对比锁与非阻塞同步机制
锁
- 优势: 线程之间存在竞争时,锁能自动处理竞争问题,即让一个线程拿到锁执行,然后阻塞其他线程。
- 劣势:线程被阻塞到恢复执行的过程中存在很大的性能开销。
- 有一些智能的 JVM 会根据之前的操作对锁的持有时间的长短,判断是自旋等待还是挂起线程,以提高性能。
非阻塞同步:CAS(Compare and Set)比较并设置
- 输入:
- 需要读写的内存位置 V
- 我们认为这个位置现在的值 A
- 想要写入的新值 B
- 输出: V 位置以前的值(无论写入操作是否成功)
- 含义: 我们认为 V 处的值应该是 A,如果是,把 V 处的值改为 B,如果不是则不修改,然后把 V 处现在的值返回给我。
乐观锁与悲观锁
锁与 CAS 分别对应着悲观锁与乐观锁这两种不同的锁。它们的定义如下:
- 悲观锁
- 就是独占锁,假设最坏情况,同一时刻只允许一个线程执行
- 适合写多读少,锁竞争严重的情况 (当资源竞争严重时,CAS 大概率会自旋,会浪费 CPU 资源)
- 乐观锁
- 借助冲突检查机制判断在更新状态的过程中有没有其他线程修改状态,如果有,更新操作失败,可以选择重试
- 适合读多写少,资源竞争少的情况 (资源竞争少时,使用 synchronized 同步锁会进行线程的阻塞和唤醒,而 CAS 不需要切换线程,并且自旋概率较低)
非阻塞算法
我们一般通过使用原子变量类来实现非阻塞同步算法,因为它们有 compareAndSet 方法
非阻塞计数器(基于 1 个CAS)
public class CasCounter {
private SimulatedCAS value;
public int getValue() {
return value.get();
}
public int increment() {
int v;
// 以下3行为CAS的标准使用方式:
// 1. 使用do-while循环,在do中先获取oldValue值
// 2. 在while的判断中进行CAS操作,并将返回值与do语句块中获取的oldValue比较
// 3. 直到CAS成功才结束循环
do {
v = value.get();
} while (v != value.compareAndSwap(v, v + 1));
return v + 1;
}
}
非阻塞栈(基于 1 个CAS)
通过链表实现,链表头是栈顶元素,在进行 push 和 pop 操作时,判断栈顶元素是否发生了了改变,以此为依据判断该栈是否被修改过。因为栈被修改的话,变的只能是栈顶元素,所以我们只需要通过 CAS 维护一个栈顶元素即可。
public class ConcurrentStack <E> {
AtomicReference<Node<E>> top = new AtomicReference<Node<E>>(); // 栈顶元素
public void push(E item) {
Node<E> newHead = new Node<E>(item);
Node<E> oldHead;
do {
oldHead = top.get();
newHead.next = oldHead;
} while (!top.compareAndSet(oldHead, newHead));
}
public E pop() {
Node<E> oldHead;
Node<E> newHead;
do {
oldHead = top.get();
if (oldHead == null)
return null;
newHead = oldHead.next;
} while (!top.compareAndSet(oldHead, newHead));
return oldHead.item;
}
private static class Node <E> {
public final E item;
public Node<E> next;
public Node(E item) {
this.item = item;
}
}
}
非阻塞链表队列(基于 2 个CAS)!!!
非阻塞链表队列的实现要比栈复杂很多,因为它的插入和删除节点的操作需要修改两个指针,也就是说需要两个CAS!
链表队列的插入操作需要更新以下两个指针:
- 当前最后一个元素的 next 指针
- 尾结点指针
在这两个操作中间,链表队列处在一种中间状态。
解决方法:
- 可以通过检查
tail.next
是否为空来判断队列当前的状态。tail.next
为空,链表队列处于稳定状态tail.next
不为空,链表队列处于中间状态
- 对于处于中间状态的链表队列,我们就进行
tail = tail.next
,提前结束其他线程正在进行的插入操作,提前使队列恢复稳定状态。
public class LinkedQueue <E> {
private static class Node <E> {
final E item;
final AtomicReference<LinkedQueue.Node<E>> next;
public Node(E item, LinkedQueue.Node<E> next) {
this.item = item;
this.next = new AtomicReference<LinkedQueue.Node<E>>(next);
}
}
private final LinkedQueue.Node<E> dummy = new LinkedQueue.Node<E>(null, null);
private final AtomicReference<LinkedQueue.Node<E>> head
= new AtomicReference<LinkedQueue.Node<E>>(dummy);
private final AtomicReference<LinkedQueue.Node<E>> tail
= new AtomicReference<LinkedQueue.Node<E>>(dummy);
public boolean put(E item) {
LinkedQueue.Node<E> newNode = new LinkedQueue.Node<E>(item, null);
while (true) {
LinkedQueue.Node<E> curTail = tail.get();
LinkedQueue.Node<E> tailNext = curTail.next.get();
if (curTail == tail.get()) {
if (tailNext != null) {
// 队列处于中间状态,推进尾结点
tail.compareAndSet(curTail, tailNext);
} else {
// 队列处于稳定状态,尝试插入新的节点
if (curTail.next.compareAndSet(null, newNode)) {
// 队列插入节点成功,尝试更新尾结点,注意:这时,尾结点有可能已经被其他节点更新好了
tail.compareAndSet(curTail, newNode);
return true;
}
}
}
}
}
}
ABA 问题
问题描述: V 处的值经历了 A -> B -> A
的变化后,也认为是发生了变化的,而传统的 CAS 是无法发现这种变化的。
解决方法:
- 使用
AtomicStampedReference
的int stamp
版本号判断数据是否被修改过 - 使用
AtomicMarkableReference
的boolean marked
判断数据是否被修改过
03-AQS框架
AbstractQueuedSynchronizer 框架 (AQS)
AQS 框架是用来构建锁或其他同步组件的基础框架,其核心思想为: 如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,AQS 通过 CLH 队列实现了这种机制。 其实现原理为: 使用了一个 int 成员变量表示同步状态,然后通过内置的 FIFO 队列来完场资源获取线程的排队工作 。使用 AQS 能简单高效地构造出大量的同步器,如:
- ReentrantLock
- Semaphore
- CountDownLatch
- FutureTask
- ReentrantReadWriteLock
CLH (Craig,Landin,and Hagersten) 队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)实现锁分配的,并且这个队列遵循 FIFO 原则。
AQS 框架的方法
在构建同步器的过程中,我们主要依赖于一下几类操作:
- 状态更改操作:
protected final int getState()
protected final void setState(int newState)
protected final boolean compareAndSetState(int expect, int update)
- 获取和释放操作:
- 独占式:
public final void acquire(int arg)
public final boolean release(int arg)
- 共享式:
public final void acquireShared(int arg)
public final boolean releaseShared(int arg)
- 独占式:
- try 获取和释放操作:模板方法,extends AbstractQueuedSynchronizer 时需要按需修改的方法。
- 独占式:
protected boolean tryAcquire(int arg)
protected boolean tryRelease(int arg)
- 共享式:
protected int tryAcquireShared(int arg)
protected boolean tryReleaseShared(int arg)
- 独占式:
- 判断同步器是否被当前线程独占:
protected boolean isHeldExclusively()
AQS 使用方法
-
在要构建的同步类中加一个私有静态内部类:
private class Sync extends AbstractQueuedSynchronizer
-
在子类中覆盖 AQS 的 try 前缀等方法,这样 Sync 将在执行获取和释放方法时,调用这些被子类覆盖了的 try 方法来判断某个操作是否能执行(模板方法模式,就是基于继承该类,然后根据需要重写模板方法)
-
一个 AQS 实现简单闭锁的示例:
public class OneShotLatch { private final Sync sync = new Sync(); public void signal() { sync.releaseShared(0); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(0); } private class Sync extends AbstractQueuedSynchronizer { protected int tryAcquireShared(int ignored) { // Succeed if latch is open (state == 1), else fail return (getState() == 1) ? 1 : -1; } protected boolean tryReleaseShared(int ignored) { setState(1); // Latch is now open return true; // Other threads may now be able to acquire } } }
Ch4-对比 HashMap & HashTable & TreeMap
基本区别
它们都是最常见的 Map 实现,是以键值对的形式存储数据的容器类型。
- HashTable
- 线程安全,不支持 null 作为键或值,它的线程安全是通过在所有方法 public 方法上加 synchronized 实现的,所以性能很差,很少使用。
- HashMap
- 不是线程安全的,但是支持 null 作为键或值,是绝大部分利用键值对存取场景的首选,put 和 get 基本可以达到常数级别的时间复杂度。
- TreeMap
- 基于红黑树的一种提供顺序访问的 Map,它的 get,put,remove 等操作是 O(log(n)) 级别的时间复杂度的(因为要保证顺序),具体的排序规则可以由 Comparator 指定:
public TreeMap(Comparator<? super K> comparator)
。
- 基于红黑树的一种提供顺序访问的 Map,它的 get,put,remove 等操作是 O(log(n)) 级别的时间复杂度的(因为要保证顺序),具体的排序规则可以由 Comparator 指定:
它们的简单类图如下:
在对 Map 的顺序没有要求的情况下,HashMap 基本是最好的选择,不过 HashMap 的性能十分依赖于 hashCode 的有效性,所以必须满足:
- equals 判断相等的对象的 hashCode 一定相等
- 重写了 hashCode 必须重写 equals
我们注意到,除了 TreeMap,LinkedHashMap 也可以保证某种顺序,它们的 区别 如下:
- LinkedHashMap:提供的遍历顺序符合插入顺序,是通过为 HashEntry 维护一个双向链表实现的。
- TreeMap:顺序由键的顺序决定,依赖于 Comparator。
HashMap 源码分析
HashMap 内部结构
HashMap 的内部结构如下:
解决哈希冲突的常用方法:
- 开放地址法:出现冲突时,以当前哈希值为基础,产生另一个哈希值。
- 再哈希法:同时构造多个不同的哈希函数,发生冲突就换一个哈希方法。
- 链地址法:将哈希地址相同的元素放在一个链表中,然后把这个链表的表头放在哈希表的对应位置。
- 建立公共溢出区:将哈希表分为基本表和溢出表两部分,凡是和基本表发生冲突的元素,一律填入溢出表。
HashMap 采用的是链表地址法,不过如果由一个位置的链表比较长了(超过阈值 8 了),链表会被改造为树形结构以提高查找性能。
这个桶数组并没有在 HashMap 的构造函数中初始化好,只是设置了容量(默认初始容量为 16),应该是采用了 lazy-load 原则。
public HashMap(int initialCapacity, float loadFactor){
// ...
this.loadFactor = loadFactor;
this.threshold = tableSizeFor(initialCapacity);
}
接下来,我们看一下 put 方法:
public V put(K key, V value) {
return putVal(hash(key), key, value, false, true);
}
可以看到,put 方法调用了 putVal 方法:
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
// lazy-load,tab要是空的,用resize初始化它
// resize既要负责初始化,又要负责在容量不够时扩容
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
// 无哈希冲突,直接new一个节点放到tab[i]就行
// 具体键值对在哈希表中的位置:i = (n - 1) & hash
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
// 该key存在,直接修改value就行
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
// 当前hashCode下面挂的已经是颗树了,用树的插入方式插入新节点
else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
// 当前hashCode下面挂的还是个链表,不过保不齐会变成颗树
else {
// ...
if (binCount >= TREEIFY_THRESHOLD - 1) // 链表要变树啦!
treeifyBin(tab, hash);
// ...
}
}
++modCount;
if (++size > threshold) // 容量不够了,扩容
resize();
}
分析:
-
key 的 hashCode 用的并不是 key 自己的 hashCode,而是通过 HashMap 内部的一个 hash 方法另算的。那么为什么要另算一个 hashCode 呢?这是因为: 有些数据计算出的哈希值差异主要在高位,而 HashMap 里的哈希寻址是忽略容量以上的高位的,这种处理可以有效避免这种情况下的哈希碰撞。
static final int hash(Object key) { int h; return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16); }
-
resize 方法: (重点!这个方法和以前写的有点不一样了……)现在的写法不会出现链表扩容时发生死循环了,以前的写法相当于将 oldTab 上的 Node 一个一个卸下来,然用头插法的方式插入到 newTab 的对应位置,因为用的是头插法,会给链表倒序,这种倒序导致了在多线程时,链表的两个 Node 的 next 可能会互相指向对方,出现死循环(详见此文)。现在的方法是使用尾插法,即不会改变链表原来在 oldTab 挂着的时候的相对顺序,在
oldTab[j]
处的链表会根据 hash 值分成 lo 和 hi 两个链表,然后分别挂在 newTab 的newTab[j]
和newTab[j + oldCap]
两个不同的位置。final Node<K,V>[] resize() { Node<K,V>[] oldTab = table; // oldTab 的长度,一定是 2 的幂,也就是说,二进制只有一位为 1 int oldCap = (oldTab == null) ? 0 : oldTab.length; int oldThr = threshold; int newCap, newThr = 0; if (oldCap > 0) { // MAXIMUM_CAPACITY = 1 << 30,如果超过这个容量就扩不了容了 if (oldCap >= MAXIMUM_CAPACITY) { threshold = Integer.MAX_VALUE; return oldTab; } // newCap = oldCap << 1,容量变成原来的 2 倍 else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY && oldCap >= DEFAULT_INITIAL_CAPACITY) newThr = oldThr << 1; // double threshold } else if (oldThr > 0) // initial capacity was placed in threshold newCap = oldThr; else { // zero initial threshold signifies using defaults newCap = DEFAULT_INITIAL_CAPACITY; newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY); } if (newThr == 0) { float ft = (float)newCap * loadFactor; newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ? (int)ft : Integer.MAX_VALUE); } threshold = newThr; @SuppressWarnings({"rawtypes","unchecked"}) Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap]; table = newTab; // 把 oldTab 中的数据移到 newTab 中,这里是要进行 rehash 的 if (oldTab != null) { for (int j = 0; j < oldCap; ++j) { Node<K,V> e; if ((e = oldTab[j]) != null) { // 把 oldTab 中的非 null 元素放到 newTab 去 oldTab[j] = null; // 把链表从 oldTab[j] 上取下来 if (e.next == null) // oldTab[j] 处只有一个元素 newTab[e.hash & (newCap - 1)] = e; else if (e instanceof TreeNode) // oldTab[j] 处是一颗树 ((TreeNode<K,V>)e).split(this, newTab, j, oldCap); else { // oldTab[j] 处是一个长度不超过 8 链表 Node<K,V> loHead = null, loTail = null; Node<K,V> hiHead = null, hiTail = null; Node<K,V> next; do { next = e.next; /* 重点!!! 下面将根据 (e.hash & oldCap) == 0 将原来 oldTab[j] 处的链表分成 lo 和 hi 两个链表,为什么要这么分呢? 因为挂在 oldTab[j] 处的节点都是 hash % oldCap == j 的,但是现在, hash % newCap 的结果有了以下两种可能: - hash % newCap == j; - hash % newCap == j + oldCap。 如何区分这两种情况呢?就是通过 (e.hash & oldCap) == 0 来区分的, - 如果 (e.hash & oldCap) == 0,为 hash % newCap == j; - 如果 (e.hash & oldCap) != 0,为 hash % newCap == j + oldCap。 */ if ((e.hash & oldCap) == 0) { if (loTail == null) // 第一次执行 do-while 循环 loHead = e; // 用 loHead 记录 oldTab[j] 处链表的第一个 Node else // 非第一次执行 do-while 循环 loTail.next = e; // 把当前节点 e 挂到 lo 链表上 loTail = e; // 移动 lo 链表的尾结点指针到当前节点 e } else { // hi 链表的处理方式和上面的 lo 链表一样 if (hiTail == null) hiHead = e; else hiTail.next = e; hiTail = e; } } while ((e = next) != null); if (loTail != null) { // 如果 lo 链表不为空 loTail.next = null; newTab[j] = loHead; // 把 lo 链表挂到 newTab[j] 上 } if (hiTail != null) { // 如果 hi 链表不为空 hiTail.next = null; newTab[j + oldCap] = hiHead; // 把 hi 链表挂到 newTab[j + oldCap] 上 } } } } } return newTab; }
容量、负载因子和树化
容量和负载因子决定了桶数组中的桶数量,如果桶太多了会浪费空间,但桶太少又会影响性能。我们要保证:
负载因子 * 容量 > 元素数量 && 容量要是 2 的倍数
对于负载因子:
- 如果没有特别需求,不要轻易更改;
- 如果需要调整,不要超过 0.75,否则会显著增加冲突;
- 如果使用太小的负载因子,也要同时调整容量,否则可能会频繁扩容,影响性能。
那么为什么要树化呢?
这本质上是一个安全问题,我们知道如果同一个哈希值对应位置的链表太长,会极大的影响性能,而在现实世界中,构造哈希冲突的数据并不是十分复杂的事情,恶意代码可以利用这些数据与服务端进行交互,会导致服务端 CPU 大量占用,形成哈希碰撞拒绝服务攻击。
Ch5-活跃性与性能
00-避免活跃性危险
死锁
产生的原因: 每个线程拥有其他线程需要的资源,同时又等待其他线程已经拥有的资源,并且所有线程在获得所有需要的资源之前都不会放弃已经拥有的资源。
检查死锁的技巧:
- 不同的线程会用到两个相同的锁;
- 并且不同线程获取锁的顺序不同。
锁顺序死锁(最常见)
产生原因: 两个线程试图以不同的顺序来获得相同的两个及以上的锁。
静态顺序死锁:
private final Object left = new Object();
private final Object right = new Object();
new Thread() {
public void run() {
synchronized (left) {
synchronized (right) {
doSomething();
}
}
}
}.start();
new Thread() {
public void run() {
synchronized (right) {
synchronized (left) {
doSomething();
}
}
}
}.start();
动态顺序死锁:
public static void transferMoney(Account fromAccount,
Account toAccount,
double money) {
synchronized (fromAccount) {
synchronized (toAccount) {
// 转账
}
}
}
Account account1 = new Account();
Account account2 = new Account();
new Thread() {
public void run() {
transferMoney(account1, account2, 100.0);
}
}.start();
new Thread() {
public void run() {
transferMoney(account2, account1, 100.0);
}
}.start();
资源死锁
- 两个线程持有对方需要的资源并不放开自己拿到的资源。
- 例子:单线程 Executor,等待的资源在工作队列中。
避免死锁的方法
- 避免一个线程同时获取多个锁;
- 避免一个线程在锁内占有多个资源,尽量保证每个锁只占有一个资源;
- 使用定时锁,即
lock.tryLock(timeout)
,这样拿不到锁就放弃,不会发生死锁一直卡在那里; - 对于数据库锁,加锁和解锁必须在同一个数据库连接中,否则可能会解锁失败。
饥饿
- 线程由于无法访问它所需要的资源而不能继续执行。
- 最常见资源就是 CPU 时钟周期 。如果在 Java 应用程序中对线程的优先级使用不当,或者在持有锁时执行一些无法结束的结构(例如无限循环,或者无限制地等待某个资源),那么也可能导致饥饿,因为其他需要这个锁的线程将无法得到它。
- 尽量不要改变线程的优先级 。只要改变了线程的优先级,程序的行为就将与平台相关,并且会导致发生饥饿问题的风险。你经常能发现某个程序会在一些奇怪的地方调用 Thread.sleep 或 Thread.yield,这是因为该程序试图克服优先级调整问题或响应性问题,试图让低优先级的线程执行更多的时间。
活锁
- 当多个相互协作的线程都对彼此进行响应从而修改各自的状态,并使得任何一个线程都无法继续执行时,就发生了活锁。这就像两个过于礼貌的人在半路上面对面地相遇:他们彼此都让出对方的路,然而又在另一条路上相遇了,因此他们就这样反复地避让下去。
- 解决方法: 在重试机制中引入随机性。
资源限制的挑战
什么是资源限制?
资源限制指在进行并发编程时,程序的执行速度受限于计算机硬件资源。比如服务器的带宽只有 2 MB/s,某个资源的下载速度是 1 MB/s,那么开 10 个线程下载,下载速度也不会变成 10 MB/s。
资源限制会引发的问题?
并发编程中,将代码执行速度加快的原则是将代码串行执行的部分变成并发执行,但是如果本该并发执行的部分,由于资源限制的原因,仍然在串行执行,此时,由于多个线程间竞争的缘故,增加了上下文切换次数以及资源调度的时间,反而会比串行执行的还慢。
如何解决资源限制问题?
对于硬件资源限制,可以使用集群并行执行程序,让程序在多台机器上运行,让不同的机器处理不同的数据。例如:可以通过 “数据 ID % 机器数”,计算出该数据应该在编号为多少的机器上运行。
对于软件资源限制,可以使用资源池将资源复用。例如:使用连接池将数据库和 Socket 连接复用。
01-Java程序的性能评估
应用程序性能评估
- 运行速度:指定的任务单元需要多久才能处理完。
- 响应性:请求从发出到完成的时间(时延)
- 处理能力:在计算资源一定的情况下,能完成多少工作。
- 生产量:一定时间能处理多少数据
- 吞吐量:一组任务中已完成任务所占的比例
- 可伸缩性:增加资源(通常是CPU)后吞吐量的提升
Amdahl 定律
Amdahl 定律描述: 在增加计算资源的情况下,程序在理论上能实现的最高加速比。用于分析多处理器下的性能。
任务中的串行部分:
- 从任务队列中取任务
- 处理多个线程的运算结果
加速比上界公式:
公式解释:
如果程序中完全不存在串行部分 F,那么加速比最大为 N,因为存在串行部分,所以要用 1 除以 串行部分加上可以用 N 个处理器并行执行的并行部分:(1 - F) / N。
线程引入的开销
上下文切换
- 产生原因: 可运行的线程数大于CPU的数量,那么操作系统最终会将某个正在运行的线程调度出来,从而使其他线程能够使用CPU。
- 切换的过程: 保存当前运行线程的执行上下文,并将新调度进来的线程的执行上下文设置为当前上下文。当任务在运行和阻塞这两个状态之间转换时,就相当于一次上下文切换。
- 判断方式: 如果内核占用率较高(超过10%),那么通常表示调度活动发生得很频繁,这很可能是由 I/O 或竞争锁导致的阻塞引起的。
- 如何减少上下文切换?
- 无锁并发编程,如将数据 ID 按照 Hash 算法取模分段,不同线程处理不同段数据;
- CAS 非阻塞同步机制;
- 避免创建不需要的线程;
- 协程:在单线程中实现多线程的调度,并在单线程中维持多个任务间的切换。
阻塞(锁竞争失败)
JVM 在实现阻塞操作时有两种方法
- 自旋等待(循环不断尝试获取锁直到成功): 适合等待时间短,不会发生上下文切换。
- 操作系统挂起被阻塞线程:适合等待时间长。
- 会发生两次上下文切换
- 交换出去一次
- 切换回来一次
- 会发生必要的操作系统操作和缓存操作
- 会发生两次上下文切换
减少锁的竞争
锁竞争会导致:串行操作会降低可伸缩性,并且会发生两次上下文切换会降低性能。
三种降低锁竞争程度的方式
减少锁的持有时间
public class BetterAttributeStore {
@GuardedBy("this") private final Map<String, String>
attributes = new HashMap<String, String>();
public boolean userLocationMatches(String name, String regexp) {
String key = "users." + name + ".location";
String location;
synchronized (this) { // 把本来加在方法上的synchronized的范围缩小
location = attributes.get(key);
}
if (location == null)
return false;
else
return Pattern.matches(regexp, location);
}
}
降低锁的请求频率:减小锁的粒度,即锁分解和锁分段!!!
即把一个锁拆分成多个锁,但是会增大发生死锁的危险。
锁分解: 不同数据使用不同的锁。
public class ServerStatusAfterSplit {
// 以下两个成员用不同的锁保护
@GuardedBy("users") public final Set<String> users;
@GuardedBy("queries") public final Set<String> queries;
public ServerStatusAfterSplit() {
users = new HashSet<String>();
queries = new HashSet<String>();
}
public void addUser(String u) {
synchronized (users) {
users.add(u);
}
}
public void addQuery(String q) {
synchronized (queries) {
queries.add(q);
}
}
}
锁分段: 不同线程在同一数据的不同部分上不会互相干扰,常用于集合。不过会导致实现集合的独占访问困难且开销高。ConcurrentHashMap
就是通过锁分段,使用 16 个锁保护数据的不同部分,达到最大可以支持 16 个线程并发写入的操作的。
public class StripedMap {
// 同步策略:buckets[n]由locks[n % N_LOCKS]保护
private static final int N_LOCKS = 16;
private final Node[] buckets;
private final Object[] locks; // N_LOCKS个锁
private static class Node {
Node next;
Object key;
Object value;
}
public StripedMap(int numBuckets) {
buckets = new Node[numBuckets];
locks = new Object[N_LOCKS];
for (int i = 0; i < N_LOCKS; i++)
locks[i] = new Object();
}
private final int hash(Object key) {
return Math.abs(key.hashCode() % buckets.length);
}
public Object get(Object key) {
int hash = hash(key);
synchronized (locks[hash % N_LOCKS]) { // 分段加锁
for (Node m = buckets[hash]; m != null; m = m.next)
if (m.key.equals(key))
return m.value;
}
return null;
}
public void clear() {
for (int i = 0; i < buckets.length; i++) {
synchronized (locks[i % N_LOCKS]) { // 分段加锁
buckets[i] = null;
}
}
}
}
使用有协调机制的独占锁,允许更高的并发性
- 并发容器
- 读写锁 ReadWriteLock:多个读取操作单个写入操作情况下的加锁规则
- 不可变对象
- 原子变量
监测CPU的利用率
CPU 没有得到充分利用的原因
- 负载不充分: 测试程序没有足够多的负载
- I/O 密集:花费了大量的时间等待 I/O 或网络通信的结果,CPU 利用率自然不高
- 判断磁盘 I/O 操作是否密集
- 监测网络的通信流量级别
- 外部限制: 使用分析工具或数据库管理工具判断等待外部服务的结果需要多少时间
- 锁竞争:
- 使用分析工具分析程序的锁竞争程度,以及是哪些锁存在激烈竞争
- 随机取样,触发线程缓存,在其中查找发生锁竞争的线程
如何提高程序的可伸缩性
- vmstat 输出:查看当前处于可运行状态,但由于没有足够的 CPU 数而没有运行的线程数
- 如果 CPU 的利用率很高,在等待的线程也很多,这时增加 CPU 数会提高程序的性能
应用示例:减小日志方法的上下文切换开销
在服务器应用程序中,发生阻塞的原因之一就是在处理请求时产生各种日志消息。减少日志的上下文切换次数可以提高吞吐量。
方法:
- 调用 log 方法的线程将不会因为等待输出流的锁或者 I/O 完成而被阻塞,它们只将消息放入队列,然后就返回到各自的任务中。即通过将 I/O 操作从处理请求的线程中分离出来,缩短处理请求的平均服务时间。
- 虽然在消息队列上可能会发生竞争,但 put 操作相对于记录日志的 I/O 操作是一种更为轻量级的操作。
- 然后将 I/O 操作移到了另一个用户感知不到开销的线程上,即有一个消费者线程不断的在后台从消息队列中取任务,自己在后台慢慢做。
比喻:
- 类似于两种不同救火方案之间的差异:
- 第一种方案是所有人排成一队,一个人按顺序浇一桶水灭火;
- 第二种方案是每个人都拿着一个水桶去救火;
- 第二种方法看起来很好,但是其在水源和着火点存在更大的竞争,效率其实更低。好比中断会干扰人们的工作并降低效率,阻塞和上下文切换同样会干扰线程的正常执行。
02-并发程序测试的陷阱
并发程序测试的陷阱
- 垃圾回收
- 垃圾回收操作不可预测
- 解决方法:
- 测试运行时,GC一次都不执行
- 测试运行时,GC多次运行,需要足够长的时间(几分钟)[更好]
- 动态编译
- JVM 是解释执行和编译执行相结合的。某个类第一次执行时,JVM 会通过解释字节码的方式执行它,在某个时刻,如果一个方法运行的次数足够多,动态编译器会将它编译为机器码,这样代码便由解释执行变为了编译执行(因为编译执行更快)。除此之外,代码还有可能被反编译和重新编译,并且编译的过程也是要消耗时间的。
- 解决方法:
- 运行足够长的时间,使得编译时间和解释执行的时间可以忽略
- 预先运行一段时间代码再开始测试,保证在测试前,代码已被完全编译
- 运行程序时使用命令行选项:-xx: +PrintCompilation,会在动态编译运行时输出一条信息,可以通过这条消息验证动态编译是在测试运行前执行的
- 无用代码消除
- 在 -server 模式下会比 -client 模式下运行的更好,因为 -server 模式更易于通过优化消除无用代码,如果我们要测试的代码被当成无用代码被消除了,测试就没有意义了
- 解决方法:计算某个对象中域的散列值,并将它与
System.nanoTime()
进行比较if (foo.x.hashCode() == System.nanoTime()) System.out.print(" ");
- 对代码路径的不真实采样
- 运行时,编译器会根据收集到的信息对已编译的代码进行优化,意味着在编译某个程序的方法M时生成的代码,可能与编译另一个程序中的方法M生成的代码不同,从而产生测试的差距。
- 不真实的竞争程度
如何检查一个多生产者多消费者模式的正确性
- 维护两个
AtomicInteger
:putSum
和takeSum
- 把数放入队列,并加到
putSum
- 把数从队列中取出,并加到
takeSum
- 检查:
putSum == takeSum
- 通过
CyclicBarrier
保证线程的并发
public class PutTakeTest extends TestCase {
protected static final ExecutorService pool = Executors.newCachedThreadPool();
protected CyclicBarrier barrier;
protected final SemaphoreBoundedBuffer<Integer> bb;
protected final int nTrials, nPairs;
protected final AtomicInteger putSum = new AtomicInteger(0);
protected final AtomicInteger takeSum = new AtomicInteger(0);
public static void main(String[] args) throws Exception {
new PutTakeTest(10, 10, 100000).test(); // sample parameters
pool.shutdown();
}
public PutTakeTest(int capacity, int npairs, int ntrials) {
this.bb = new SemaphoreBoundedBuffer<Integer>(capacity);
this.nTrials = ntrials;
this.nPairs = npairs;
// 使用CyclicBarrier保证生产者和消费者线程的并发执行
this.barrier = new CyclicBarrier(npairs * 2 + 1);
}
void test() {
try {
for (int i = 0; i < nPairs; i++) {
pool.execute(new Producer());
pool.execute(new Consumer());
}
barrier.await(); // 阻塞主线程,等待所有线程就绪
barrier.await(); // 等待所有线程执行完成,
// 即生产者和消费者线程中的第二个await执行完
// 相当于循环使用了两次CyclicBarrier
assertEquals(putSum.get(), takeSum.get());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// 测试用的随机数生成代码
// 使用方法:通过前一个seed生成后一个seed
// int seed = (this.hashCode() ^ (int) System.nanoTime());
// seed = xorShift(seed);
static int xorShift(int y) {
y ^= (y << 6);
y ^= (y >>> 21);
y ^= (y << 7);
return y;
}
class Producer implements Runnable {
public void run() {
try {
int seed = (this.hashCode() ^ (int) System.nanoTime());
int sum = 0;
barrier.await(); // 通知主线程生产者线程开始执行准备就绪
for (int i = nTrials; i > 0; --i) {
bb.put(seed);
sum += seed;
seed = xorShift(seed);
}
putSum.getAndAdd(sum);
barrier.await(); // 通知主线程生产者线程执行结束
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
class Consumer implements Runnable {
public void run() {
try {
barrier.await(); // 通知主线程消费者线程开始执行准备就绪
int sum = 0;
for (int i = nTrials; i > 0; --i) {
sum += bb.take();
}
takeSum.getAndAdd(sum);
barrier.await(); // 通知主线程消费者线程执行结束
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
阻塞队列性能比较
LinkedBlockingQueue
的可伸缩性要优于ArrayBlockingQueue
。- 违背常理:因为链表元素在插入元素时要分配一个链表节点对象,理论上应该更费时间才对。
- 原因:与基于数组的队列相比,链表队列的 put 和 take 方法支持更高的访问,因为一些优化后的链表队列能将队列的头节点的更新操作和尾节点的更新操作分离开来。