并发编程基础

1. 基础概念

在正式学习 Java 的并发编程之前,还有几个并发编程的基础概念我们需要熟悉和学习。

进程和线程

进程

我们常听说的是应用程序,也就是 app,由指令和数据组成。但是当我们不运行一个具体的 app 时,这些应用程序就是放在磁盘(也包括 U 盘、远程网络存储等等)上的一些二进制的代码。一旦我们运行这些应用程序,指令要运行,数据要读写,就必须将指令加载至 CPU,数据加载至内存。在指令运行过程中还需要用到磁盘、网络等设备,从这种角度来说,进程就是用来加载指令、管理内存、管理 IO 的。

当一个程序被运行,从磁盘加载这个程序的代码至内存,这时就开启了一个进程。

进程可以视为程序的一个实例。大部分程序可以同时运行多个实例进程(例如记事本、画图、浏览器 等),也有的程序只能启动一个实例进程(例如网易云音乐、360 安全卫士等)。显然,程序是死的、静态的,进程是活的、动态的。进程可以分为系统进程和用户进程。凡是用于完成操作系统的各种功能的进程就是系统进程,它们就是处于运行状态下的操作系统本身,用户进程就是所有由你启动的进程。

站在操作系统的角度,进程是程序运行资源分配(以内存为主)的最小单位

线程

一个机器中肯定会运行很多的程序,CPU 又是有限的,怎么让有限的 CPU 运行这么多程序呢?就需要一种机制在程序之间进行协调,也就所谓 CPU 调度。线程则是 CPU 调度的最小单位

线程必须依赖于进程而存在,线程是进程中的一个实体,是 CPU 调度和分派的基本单位,它是比进程更小的、能独立运行的基本单位。线程自己基本上不拥有系统资源,只有在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他线程共享进程所拥有的全部资源。一个进程可以拥有多个线程,一个线程必须有一个父进程。线程,有时也被称为轻量级进程(Lightweight Process,LWP),早期 Linux 的线程实现几乎就是复用的进程,后来才独立出自己的 API。

Java 线程的无处不在

Java 中不管任何程序都必须启动一个 main 函数的主线程;Java Web 开发里定时任务、定时器、JSP 和 Servlet、异步消息处理机制、远程访问接口 RM 等,任何一个监听事件,onclick 的触发事件等都离不开线程和并发的知识。

什么是进程间的通信?
同一台计算机的进程通信称为 IPC(Inter-process communication),不同计算机之间的进程通信被称为 RPC(Remote Procedure Call),需要通过网络,并遵守共同的协议,比如大家熟悉的 Dubbo 就是一个 RPC 框架,而 Http 协议也经常用在 RPC 上,比如 SpringCloud 微服务。
进程间通信有几种方式?

  1. 管道,分为匿名管道(pipe)及命名管道(named pipe):匿名管道可用于具有亲缘关系的父子进程间的通信,命名管道除了具有管道所具有的功能外,它还允许无亲缘关系进程间的通信。
  2. 信号(signal):信号是在软件层次上对中断机制的一种模拟,它是比较复杂的通信方式,用于通知进程有某事件发生,一个进程收到一个信号与处理器收到一个中断请求效果上可以说是一致的。
  3. 消息队列(message queue):消息队列是消息的链接表,它克服了上两种通信方式中信号量有限的缺点,具有写权限得进程可以按照一定得规则向消息 队列中添加新信息;对消息队列有读权限得进程则可以从消息队列中读取信息。
  4. 共享内存(shared memory):可以说这是最有用的进程间通信方式。它使得多个进程可以访问同一块内存空间,不同进程可以及时看到对方进程中对共享内存中数据得更新。这种方式需要依靠某种同步操作,如互斥锁和信号量等。
  5. 信号量(semaphore):主要作为进程之间及同一种进程的不同线程之间得同步和互斥手段。
  6. 套接字(socket):这是一种更为一般得进程间通信机制,它可用于网络中不同机器之间的进程间通信,应用非常广泛。同一机器中的进程还可以使用Unix domain socket(比如同一机器中 MySQL 中的控制台 mysql shell 和 MySQL 服 务程序的连接),这种方式不需要经过网络协议栈,不需要打包拆包、计算校验和、维护序号和应答等,比纯粹基于网络的进程间通信肯定效率更高。

CPU 核心数和线程数的关系

前面说过,目前主流 CPU 都是多核的,线程是 CPU 调度的最小单位。同一时刻,一个 CPU 核心只能运行一个线程,也就是 CPU 内核和同时运行的线程数是 1:1 的关系,也就是说 8 核 CPU 同时可以执行 8 个线程的代码。但 Intel 引入超线程技术后,产生了逻辑处理器的概念,使核心数与线程数形成 1:2 的关系。6核12线程中12线程就是逻辑处理器数。

在 Java 中可以通过 Runtime.getRuntime().availableProcessors() 获取当前的 CPU 核心数,注意这个核心数指的是逻辑处理器数。

上下文切换(Context Switch)

上下文切换是指 CPU 在多个进程(或线程)之间切换时,必须保存当前线程的状态(即上下文),并加载下一个线程的状态。

  • 上下文切换的步骤
  1. 暂停当前进程,并保存其 CPU 状态(即上下文)到内存中。
  2. 从内存中加载下一个进程的上下文,并恢复到 CPU 的寄存器中。
  3. 恢复程序计数器指示的位置,以便线程从被中断的地方继续执行。
  • 上下文切换的成本
    上下文切换是计算密集型的,需要拷贝数据到寄存器和缓存中。一次上下文切换大约需要 5000~20000 个时钟周期,相对于简单的指令执行,这个成本非常高。

并行和并发

  • 并发(Concurrent)

并发是指在同一时间段内,应用可以交替执行不同的任务。即便在单核 CPU 上,多个线程之间也通过快速切换模拟了“并行”执行的效果。

  • 并行(Parallel)

并行是指在同一时间点内,应用可以同时执行多个任务。例如,吃饭时可以边吃饭边看视频,这两件事情可以同时进行。

2. 认识 Java 里的线程

Java 程序天生就是多线程的

一个 Java 程序从 main() 方法开始执行,然后按照既定的代码逻辑执行,看似没有其他线程参与,但实际上 Java 程序天生就是多线程程序,因为执行 main() 方法的是一个名称为 main 的线程。

而一个 Java 程序的运行即使没有用户自己开启的线程,实际也有很多 JVM 自行启动的线程,一般来说有:

  • [6] Monitor Ctrl-Break:监控 Ctrl-Break 中断信号的线程
  • [5] Attach Listener:用于内存 dump、线程 dump、类信息统计、获取系统属性等的线程
  • [4] Signal Dispatcher:分发处理发送给 JVM 信号的线程
  • [3] Finalizer:调用对象 finalize 方法的线程
  • [2] Reference Handler:清除 Reference 的线程
  • [1] main:用户程序入口线程(main 线程)

尽管这些线程根据不同的 JDK 版本会有所差异,但依然证明了 Java 程序天生就是多线程的。

线程的启动与中止

启动线程

启动线程的方式有:

  1. X extends Thread,然后 X.start() 启动
  2. X implements Runnable,然后交给 Thread 运行
详细代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class NewThread {
/*扩展自Thread类*/
private static class UseThread extends Thread{
@Override
public void run() {
super.run();
SleepTools.second(1);
// do my work;
System.out.println("I am extendec Thread");
}
}

/*实现Runnable接口*/
private static class UseRunnable implements Runnable{

@Override
public void run() {
// do my work;
System.out.println("I am implements Runnable");
}
}

public static void main(String[] args)
throws InterruptedException, ExecutionException {
UseThread useThread = new UseThread();
useThread.start();
//useThread.start();start方法只能调用一次,否则会抛出异常
//useThread.run();仅仅是调用对象的run方法,而不会启动新线程,直接在main线程中执行

UseRunnable useRunnable = new UseRunnable();
new Thread(useRunnable).start();
System.out.println("main end");

}
}

Thread 和 Runnable 的区别

  • Thread 是 Java 中对线程的唯一抽象。
  • Runnable 只是对任务(业务逻辑)的抽象。
  • Thread 可以接受任意一个 Runnable 的实例并执行。

Callable、Future 和 FutureTask

Runnable 是一个接口,在它里面只声明了一个 run() 方法,由于 run() 方法返回值为 void 类型,因此执行完任务之后无法返回任何结果。

Callable 位于 java.util.concurrent 包下,它是一个接口,声明了 call() 方法,返回的结果类型是泛型 V
Future 用于控制具体的 RunnableCallable 任务的执行结果,支持取消、查询是否完成、获取结果。

  • get() 方法阻塞直到任务返回结果。
    FutureTask 实现了 RunnableFuture 接口,RunnableFuture 继承了 RunnableFuture 接口。
  • FutureTask 可以作为 Runnable 被线程执行,也可以作为 Future 得到 Callable 的返回值。
  • Thread 不支持构造方法中传递Callable 的实例,需要用 FutureTask 包装 Callable,然后交给 Thread 执行,以获取执行结果。

callable

新启线程有几种方式?

在 Java 中有两种方式创建线程:

  1. 通过继承 Thread 类;
  2. 通过实现 Runnable 接口。

本质上 Java 中实现线程只有一种方式,都是通过 new Thread() 创建线程对象,调用 Thread#start() 启动线程。

  • 基于 Callable 接口的方式,最终会通过 FutureTask 包装成 Runnable,然后交给 Thread 执行,因此可以视为与实现 Runnable 接口的方式相同。
  • 线程池技术本质上是资源的复用,与新启线程的方式无关。

中止线程

线程自然终止

线程自然终止的方式:

  • run 执行完成了。
  • 线程抛出了一个未处理的异常导致线程提前结束。

stop()suspend() 方法

suspend()resume()stop() 方法是过期的方法,不建议使用。

  • suspend() 会导致线程占有的资源(如锁)无法释放,容易引发死锁。
  • stop() 方法在终结一个线程时不会保证资源的正常释放,可能导致程序进入不确定状态。

安全的中止方法

  • 通过其他线程调用 interrupt() 方法对目标线程进行中断操作。
  • 中断并不意味着立即停止线程,线程会通过检查自身的中断标志位来响应中断请求。
  • 线程可以使用 isInterrupted() 方法判断是否被中断,也可以通过 Thread.interrupted() 检查当前线程是否被中断,Thread.interrupted() 会同时将中断标志位清除为 false
  • 如果线程处于阻塞状态(如调用了 sleepjoinwait 等),在中断标志为 true 时,会抛出 InterruptedException 异常,并清除中断标志。

不建议自定义取消标志位来中止线程的运行,因为在调用阻塞方法时可能无法及时检测到取消标志,使用中断机制更加高效。

注意:处于死锁状态的线程无法被中断。

深入理解 run()start()

  • Thread 类是 Java 里对线程概念的抽象。通过 new Thread() 创建的是 Thread 的实例,但并没有真正的操作系统线程。
  • 只有调用 start() 方法后,操作系统才会挂起该线程并真正启动。
  • start() 方法会将线程放入就绪队列,等待 CPU 分配,分配到 CPU 后才会执行实现的 run() 方法。且start() 方法不能重复调用,若重复调用会抛出异常。
  • run() 方法是线程的业务逻辑实现,实际上与任何类的成员方法没有区别。run() 方法可以被单独调用,但这并不会启动新的线程,run() 只是作为普通方法执行。

3. 深入学习 Java 的线程

线程的状态/生命周期

Java 中线程的状态分为 6 种:

  1. 初始(NEW):新创建了一个线程对象,但还没有调用 start() 方法。
  2. 运行(RUNNABLE):Java 线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”。
    • 线程对象创建后,其他线程(比如 main 线程)调用了该对象的 start() 方法。
    • 该状态的线程位于可运行线程池中,等待被线程调度选中,获取 CPU 的使用权,此时处于就绪状态(ready)。就绪状态的线程在获得 CPU 时间片后变为运行中状态(running)。
  3. 阻塞(BLOCKED):表示线程阻塞于锁。只有synchronized关键字修饰的方法或代码块会导致阻塞。
  4. 等待(WAITING):进入该状态的线程需要等待其他线程做出一些特定动作(通知或中断)。
  5. 超时等待(TIMED_WAITING):该状态不同于 WAITING,它可以在指定的时间后自行返回。
  6. 终止(TERMINATED):表示该线程已经执行完毕。

状态之间的变迁如下图所示:
线程状态

掌握这些状态可以让我们在进行 Java 程序调优时提供很大的帮助。

其他的线程相关方法

  • yield() 方法:使当前线程让出 CPU 占有权,但让出的时间是不可设定的。不会释放锁资源。执行 yield() 的线程可能在进入到就绪状态后再次被操作系统选中执行。
    比如,ConcurrentHashMap#initTable 方法中就使用了这个方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)

Thread.yield(); // lost initialization race; just spin
// 线程安全的HashMap初始化时不分配内存,只有在第一个put操作时才会分配内存
// 所以初始化操作将很快,如果同时有多个线程初始化,为了避免阻塞或者等待这些操作
// 引发的上下文切换等等开销,使用yield方法保证只有一个线程初始化

else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
  • wait() / notify() / notifyAll()

线程的优先级

在 Java 线程中,通过一个整型成员变量 priority 来控制优先级,优先级的范围从 1~10。线程构建时可以通过 setPriority(int) 方法修改优先级,默认优先级是 5。

优先级高的线程分配时间片的数量要多于优先级低的线程。设置线程优先级时,针对频繁阻塞(休眠或者 I/O 操作)的线程需要设置较高优先级,而偏重计算(需要较多 CPU 时间或偏运算)的线程则设置较低的优先级,确保处理器不会被独占。

线程的调度

线程调度是指系统为线程分配 CPU 使用权的过程,主要调度方式有两种:

  • 协同式线程调度 (Cooperative Threads-Scheduling):线程执行的时间由线程本身来控制,线程主动通知系统切换。一个线程结束后,系统才会调度下一个线程。没有线程同步问题。
  • 抢占式线程调度 (Preemptive Threads-Scheduling):线程的执行时间及是否切换由系统决定。Java 使用抢占式调度。

Java 中,Thread.yield() 可以让出 CPU 执行时间,但线程获取执行时间的控制是有限的。线程唯一可以使用的手段是设置优先级。

线程和协程

为什么 Java 线程调度是抢占式调度?

语言层面的线程实现模式主要有三种方式:

  1. 内核线程实现 (1:1 实现):线程由操作系统内核直接支持。
  2. 用户线程实现 (1:N 实现):线程由用户空间中的线程库实现。
  3. 用户线程加轻量级进程 (N:M 实现):结合内核线程和用户线程的优点。
详细说明

内核线程实现
使用内核线程实现的方式也被称为 1: 1 实现。 内核线程(Kernel-LevelThread, KLT) 就是直接由操作系统内核(Kernel, 下称内核) 支持的线程,这种线程由内核来完成线程切换, 内核通过操纵调度器(Scheduler) 对线程进行调度, 并负责将线程的任务映射到各个处理器上。由于内核线程的支持,每个线程都成为一个独立的调度单元,即使其中某一个在系统调用中被阻塞了,也不会影响整个进程继续工作,相关的调度工作也不需要额外考虑,已经由操作系统处理了。局限性:首先,由于是基于内核线程实现的,所以各种线程操作,如创建、析构及同步,都需要进行系统调用。而系统调用的代价相对较高, 需要在用户态(User Mode)和内核态(Kernel Mode)中来回切换。其次,每个语言层面的线程都需要有一个内核线程的支持,因此要消耗一定的内核资源(如内核线程的栈空间),因此一个系统支持的线程数量是有限的。
用户线程实现
严格意义上的用户线程指的是完全建立在用户空间的线程库上,系统内核不能感知到用户线程的存在及如何实现的。用户线程的建立、同步、销毁和调度完全在用户态中完成,不需要内核的帮助。 如果程序实现得当, 这种线程不需要切换到内核态, 因此操作可以是非常快速且低消耗的, 也能够支持规模更大的线程数量, 部分高性能数据库中的多线程就是由用户线程实现的。用户线程的优势在于不需要系统内核支援,劣势也在于没有系统内核的支援,所有的线程操作都需要由用户程序自己去处理。线程的创建、销毁、切换和调度都是用户必须考虑的问题,而且由于操作系统只把处理器资源分配到进程,那诸如“阻塞如何处理”“多处理器系统中如何将线程映射到其他处理器上”这类问题解决起来将会异常困难, 甚至有些是不可能实现的。 因为使用用户线程实现的程序通常都比较复杂,所以一般的应用程序都不倾向使用用户线程。Java 语言曾经使用过用户线程,最终又放弃了。 但是近年来许多新的、以高并发为卖点的编程语言又普遍支持了用户线程,譬如 Golang。
混合实现
线程除了依赖内核线程实现和完全由用户程序自己实现之外, 还有一种将内核线程与用户线程一起使用的实现方式, 被称为 N:M 实现。 在这种混合实现下, 既存在用户线程, 也存在内核线程。用户线程还是完全建立在用户空间中, 因此用户线程的创建、 切换、 析构等操作依然廉价, 并且可以支持大规模的用户线程并发。同样又可以使用内核提供的线程调度功能及处理器映射,并且用户线程的系统调用要通过内核线程来完成。在这种混合模式中, 用户线程与轻量级进程的数量比是不定的,是 N:M 的关系。

Java 线程的实现

Java 早期使用用户线程模型(1:N),但从 JDK 1.3 起,主流 Java 虚拟机采用内核线程(1:1)模型。

在 HotSpot JVM 中,每个 Java 线程都直接映射到操作系统的原生线程,由操作系统调度。Java 中的线程调度就是抢占式调度。线程优先级通过操作系统的原生线程实现,所以最终的调度由操作系统决定。

详细说明

HotSpot 的每一个 Java 线程都是直接映射到一个操作系统原生线程来实现的,而且中间没有额外的间接结构, 所以 HotSpot 自己是不会去干涉线程调度的,全权交给底下的操作系统去处理。所以,这就是我们说 Java 线程调度是抢占式调度的原因。而且 Java 中的线程优先级是通过映射到操作系统的原生线程上实现的,所以线程的调度最终取决于操作系统,操作系统中线程的优先级有时并不能和 Java 中的一一对应,所以Java 优先级并不是特别靠谱。

协程

协程(Coroutine)是一种用户级线程,它具有轻量级的特点,能够提供更高的并发性能。协程常用于处理大量的 I/O 操作,因为它能够有效减少上下文切换的开销。

协程的主要优势是轻量,栈空间较小,与传统的内核线程相比,协程的内存消耗非常低。

Java 中的协程

在 Java 中,协程的实现主要通过 纤程(Fiber) 进行,OpenJDK 在 2018 年创建了 Loom 项目,旨在为 Java 引入用户线程模型,以支持更高效的并发编程。

Quasar:一个流行的 Java 协程库,使用字节码注入技术实现协程。它能够大幅度提高并发性能,尤其适用于处理大量的 I/O 操作。

JDK19 的虚拟线程

JDK 19 引入了轻量级虚拟线程(也就是协程),但此特性目前还处于预览版,无法在生产环境中使用。虚拟线程的设计目的是减少线程调度的开销,支持更高并发,尤其适合处理大量 I/O 操作的场景。

要使用 JDK19 的虚拟线程,需要通过相应的配置,以下是启动虚拟线程的基本命令:

1
-javaagent:D:\Maven\repository\co\paralleluniverse\quasar-core\0.7.9\quasar-core-0.7.9.jar

守护线程

Daemon(守护)线程是一种支持型线程,因为它主要被用作程序中后台调度以及支持性工作。
当一个 Java 虚拟机中不存在非 Daemon 线程的时候,Java 虚拟机将会退出。可以通过调用 Thread.setDaemon(true)将线程设置为 Daemon 线程。我们一般用不上,比如垃圾回收线程就是 Daemon 线程。
Daemon 线程被用作完成支持性工作,但是在 Java 虚拟机退出时 Daemon 线程中的 finally 块并不一定会执行。在构建 Daemon 线程时,不能依靠 finally 块中。
所以操作一些文件等资源释放等工作,最好不要放在 Daemon 线程中。

4. 线程间的通信和协调、协作

很多时候,孤零零的一个线程工作并没有太多用处,更多时候,我们是很多线程一起工作,且这些线程间进行通信或配合着完成某项工作,这就离不开线程间的通信和协调、协作。

管道输入输出流

我们已经知道,进程间有好几种通信机制,其中包括了管道。其实 Java 的线程里也有类似的管道机制,用于线程之间的数据传输,而传输的媒介为内存。

设想这么一个应用场景:通过 Java 应用生成文件,然后需要将文件上传到云端,比如:

  1. 页面点击导出后,后台触发导出任务,将 MySQL 中的数据根据导出条件查询出来,生成 Excel 文件,并将文件上传到 OSS,最后发布一个下载文件的链接。
  2. 和银行及金融机构对接时,从本地某个数据源查询数据后,上报 XML 格式的数据,给到指定的 FTP 或 OSS 的某个目录下也是类似的。

一般做法是,先将文件写入到本地磁盘,再从文件磁盘读出来上传到云盘,但通过 Java 中的管道输入输出流一步到位,可以避免写入磁盘这一步。

Java 中的管道输入/输出流主要包括如下 4 种具体实现:

  • PipedOutputStreamPipedInputStream(面向字节)
  • PipedReaderPipedWriter(面向字符)
详细代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Piped {
public static void main(String[] args) throws Exception {
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();
/* 将输出流和输入流进行连接,否则在使用时会抛出IOException*/
out.connect(in);
Thread printThread = new Thread(new Print(in), "PrintThread");
printThread.start();
int receive = 0;
try {
/*将键盘的输入,用输出流接受,在实际的业务中,可以将文件流导给输出流*/
while ((receive = System.in.read()) != -1){
out.write(receive);
}
} finally {
out.close();
}
}

static class Print implements Runnable {
private PipedReader in;
public Print(PipedReader in) {
this.in = in;
}

@Override
public void run() {
int receive = 0;
try {
/*输入流从输出流接收数据,并在控制台显示
*在实际的业务中,可以将输入流直接通过网络通信写出 */
while ((receive = in.read()) != -1){
System.out.print((char) receive);
}
} catch (IOException ex) {
}
}
}
}

join 方法

现在有 T1、T2、T3 三个线程,如何保证 T2 在 T1 执行完后执行,T3 在 T2 执行完后执行?

答:用 Thread#join 方法。在 T3 中调用 T2.join(),在 T2 中调用 T1.join()

join() 将指定的线程加入到当前线程。可以将两个交替执行的线程合并为顺序执行。例如,在线程 B 中调用线程 A 的 join() 方法,直到线程 A 执行完毕,才会继续执行线程 B 剩下的代码。

synchronized 内置锁

线程开始运行时,拥有自己的栈空间,按照既定的代码一步一步地执行,直到终止。但如果多个线程能够相互配合完成工作,包括数据共享和协同处理,这将带来巨大的价值。
Java 支持多个线程同时访问一个对象或对象的成员变量,但多个线程同时访问同一个变量会导致不可预料的结果。
关键字 synchronized 可以修饰方法或者作为同步块来使用,它主要确保多个线程在同一个时刻只允许有一个线程处于方法或同步块中,保证了线程对变量访问的可见性和排他性。

对象锁和类锁:

  • 对象锁:用于对象实例方法,或一个对象实例上的锁。
  • 类锁:用于类的静态方法或类的 class 对象上的锁。

对同一个变量操作时,被用来做锁的对象必须是同一个对象。 否则,锁无效。

需要注意的是,类锁仅在概念上存在,实际锁住的是每个类对应的 class 对象,每个类只有一个 class 对象,因此每个类只有一个类锁。
当对同一个变量操作时,类锁和对象(非 class 对象)锁混用也同样毫无用处。

错误的加锁和原因分析

详细代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class TestIntegerSyn {

public static void main(String[] args) throws InterruptedException {
Worker worker=new Worker(1);
//Thread.sleep(50);
for(int i=0;i<5;i++) {
new Thread(worker).start();
}
}

private static class Worker implements Runnable{

private Integer i;
private Object o = new Object();

public Worker(Integer i) {
this.i=i;
}

@Override
public void run() {
synchronized (i) {
Thread thread=Thread.currentThread();
System.out.println(thread.getName()+"--@"
+System.identityHashCode(i));
i++;
System.out.println(thread.getName()+"-------[i="+i+"]-@"
+System.identityHashCode(i));
SleepTools.ms(3000);
System.out.println(thread.getName()+"-------[i="+i+"]--@"
+System.identityHashCode(i));
}
}
}
}

执行结果将看到 i 的取值出现乱序或重复取值的现象。

原因:虽然我们对 i 进行了加锁,但当反编译该类的 class 文件后可以看到,i++ 实际上是返回了一个新的 Integer 对象。每个线程实际加锁的是不同的 Integer 对象,因此加锁无效。

volatile 关键字,最轻量的通信/同步机制

volatile 保证了不同线程对该变量操作时的可见性,即一个线程修改了某个变量的值,其他线程会立即看到这个新值。

不加 volatile 时,子线程无法感知主线程修改了 ready 的值,从而不会退出循环;加了 volatile 后,子线程可以感知主线程修改了 ready 的值,迅速退出循环。

volatile 不能保证数据在多个线程下同时写时的线程安全(不能保证原子性),volatile 最适用的场景是一个线程写,多个线程读。

等待/通知机制

线程间相互配合,完成某项工作。例如,一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应操作。整个过程开始于一个线程,最终执行由另一个线程完成。前者是生产者,后者是消费者。
(比如后续的Sql连接池示例,释放连接的线程就是生产者,获取连接的线程就是消费者)

简单的方法是让消费者线程不断地循环检查变量是否符合预期。如果条件满足,退出 while 循环,完成消费者工作。然而,这存在如下问题:

  1. 难以确保及时性。
  2. 难以降低开销。若降低睡眠时间(如休眠 1 毫秒),消费者能更迅速地发现条件变化,但也可能消耗更多的处理器资源,造成无端浪费。
详细解释

问题 1:难以确保及时性

原因:线程只能在固定的时间间隔(如 Thread.sleep(1000))检查变量,而不是在变量变化的瞬间被通知。

1
2
3
4
5
6
7
8
while (!conditionMet) {
// 不断检查条件
try {
Thread.sleep(1000); // 每隔 1 秒检查一次
} catch (InterruptedException e) {
e.printStackTrace();
}
}

为什么不及时?

  • 如果变量 conditionMetsleep(1000) 期间发生变化,消费者线程仍然需要等到下一次检查(最长可能需要 1 秒)。
  • 这样,条件可能早已满足,但线程仍然要等一个周期,导致响应延迟

问题 2:难以降低 CPU 开销

原因:如果减少 sleep 时间(如 Thread.sleep(1)),线程会更频繁地检查变量,但这会导致 CPU 资源浪费。

1
2
3
4
5
6
7
8
while (!conditionMet) {
// 频繁检查
try {
Thread.sleep(1); // 休眠 1 毫秒
} catch (InterruptedException e) {
e.printStackTrace();
}
}

为什么会消耗过多 CPU?

  • 线程会每 1 毫秒检查一次 conditionMet,如果 conditionMet 需要很长时间才能满足,线程会浪费大量 CPU 资源在无效的轮询上。
  • 即使 conditionMet 很快满足,大量的CPU 上下文切换仍然会造成性能浪费

等待/通知机制的优势:

通过 wait()notify() 方法,一个线程 A 可以进入等待状态,另一个线程 B 在某个时刻通知线程 A,从而避免了不必要的资源浪费。

  • notify():通知一个在对象上等待的线程,使其从 wait 方法返回,前提是该线程已获得对象的锁。
  • notifyAll():通知所有等待在该对象上的线程。
  • wait():调用该方法的线程进入 WAITING 状态,只有等待其他线程的通知或被中断才会返回。

等待和通知的标准范式:

等待方

  1. 获取对象的锁。
  2. 如果条件不满足,调用对象的 wait() 方法。被通知后,仍要检查条件。
  3. 条件满足则执行对应的逻辑。
1
2
3
4
5
6
synchronized (lock) {
while (condition) {
lock.wait();
}
// 执行对应的逻辑
}

通知方

  1. 获取对象的锁。
  2. 改变条件。
  3. 通知所有等待在对象上的线程。
1
2
3
4
synchronized (lock) {
// 改变条件
lock.notify(); // 或 lock.notifyAll();
}

notify 和 notifyAll 应该用谁

尽可能用 notifyall(),谨慎使用 notify(),因为 notify()只会唤醒一个线程,我们无法确保被唤醒的这个线程一定就是我们需要唤醒的线程。

为什么 wait()notify() 要在同步块中调用?

这是因为 Java API 强制要求在同步方法或同步块中调用 wait()notify()。如果不这么做,会抛出 IllegalMonitorStateException 异常。

假如我们有两个线程,一个消费者线程,一个生产者线程。生产者线程的任务可以简化成将 count 加一,然后唤醒消费者;消费者则是将 count 减一,当 count 减到 0 时,消费者会陷入睡眠。

生产者伪代码:

1
2
count+1;
notify();

消费者伪代码:

1
2
3
while(count <= 0)
wait();
count--;

问题出在上述代码中,虽然生产者和消费者各自有两步操作,这两部操作是非原子的,但它们的操作顺序容易受到线程调度的影响,造成竞态条件。具体问题如下:

  1. 生产者操作:

    • count + 1
    • notify()
  2. 消费者操作:

    • 检查 count 是否小于等于 0,如果是则调用 wait()
    • 减去 count

假设初始时 count = 0,然后消费者线程检查 count 的值,发现它小于等于 0,进入 wait()。此时,生产者线程执行了 count + 1notify(),但是由于上下文切换,消费者线程并没有进入 wait() 完全睡眠,而是被生产者线程唤醒,导致 notify() 被丢失(因为消费者还没睡,生产者已经发出通知)。这样,消费者会错误地执行接下来的代码,导致 count 的值变得不一致,形成丢失通知的问题。

为了避免这种问题,最有效的办法是让消费者和生产者竞争同一个锁。这样,确保生产者和消费者在访问 count 时,是互斥的,避免线程调度过程中引发的竞态条件。消费者线程检查 count 到调用 wait() 的这段代码,也需要在同步块中进行,确保 wait() 调用前 count 的值不会被改变。

为什么在循环中检查等待条件?

等待状态的线程可能会收到错误警报或伪唤醒。如果不在循环中检查等待条件,程序就可能在没有满足结束条件的情况下退出。因此,在等待线程醒来时,必须重新检查等待条件。
比如在数据库连接池中,等待状态的获取连接线程收到通知,但此时可能有其他线程已经获取了连接,因此需要检查连接池内是否还有连接。

调用 yield() 、sleep()、wait()、notify()等方法对锁有何影响?

yield() 、sleep()被调用后,都不会释放当前线程所持有的锁。
调用 wait()方法后,会释放当前线程持有的锁,而且当前被唤醒后,会重新去竞争锁,锁竞争到后才会执行 wait 方法后面的代码。
调用 notify()系列方法后,对锁无影响,线程只有在 syn 同步代码执行完后才会自然而然的释放锁,所以 notify()系列方法一般都是 syn 同步代码的最后一行。

等待超时模式实现一个连接池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class DBPool {

/*容器,存放连接*/
private static LinkedList<Connection> pool = new LinkedList<Connection>();

/*限制了池的大小=20*/
public DBPool(int initialSize) {
if (initialSize > 0) {
for (int i = 0; i < initialSize; i++) {
pool.addLast(SqlConnectImpl.fetchConnection());
}
}
}

/*释放连接,通知其他的等待连接的线程*/
public void releaseConnection(Connection connection) {
if (connection != null) {
synchronized (pool){
pool.addLast(connection);
//通知其他等待连接的线程
pool.notifyAll();
}
}
}

/*获取*/
// 在mills内无法获取到连接,将会返回null 1S
public Connection fetchConnection(long mills)
throws InterruptedException {
synchronized (pool){
//永不超时
if(mills<=0){
while(pool.isEmpty()){
pool.wait();
}
return pool.removeFirst();
}else{
/*超时时刻*/
long future = System.currentTimeMillis()+mills;
/*等待时长*/
long remaining = mills;
while(pool.isEmpty()&&remaining>0){
pool.wait(remaining);
/*唤醒一次,重新计算等待时长*/
remaining = future-System.currentTimeMillis();
}
Connection connection = null;
if(!pool.isEmpty()){
connection = pool.removeFirst();
}
return connection;
}
}
}
}
CompleteableFuture

CompleteableFuture

Java 1.5 版本引入了 Future,它是运算结果的占位符,提供了 get()get(long timeout, TimeUnit unit) 两个方法来获取运算结果。

然而,Future 有一些缺点:

  1. 阻塞:调用 get() 会阻塞,直到计算完成。
  2. 链式调用和结果聚合处理:难以链式调用多个 Future 完成长时间计算,并聚合结果。
  3. 异常处理:没有提供异常处理机制。

JDK 1.8 新增了 CompletableFuture,解决了这些问题。CompletableFuture 实现了 Future<T>CompletionStage<T> 接口。它允许链式调用、附加回调函数,且提供了异常处理机制。

CompletableFuture 的几种关键方法:

  • 创建:除了 new CompletableFuture<>(),还可以通过工厂方法创建实例。
  • 获取结果的方法
    • get():阻塞获取计算结果。
    • get(long timeout, TimeUnit unit):阻塞获取计算结果,超时抛出异常。
    • join():返回计算结果,抛出未检查异常。
    • getNow(T valueIfAbsent):若结果已计算,则返回结果,否则返回 valueIfAbsent

CompletableFuture 的辅助方法:

  • allOf():当所有 CompletableFuture 完成时执行计算。
  • anyOf():当任意一个 CompletableFuture 完成时执行计算。

常用的 then 方法:

  • thenApply():转换类,将结果转换为另一种类型。
  • thenAccept():消费类,消费当前计算结果。
  • thenRun():执行类,执行一个 Runnable 操作。

结合操作:

  • thenCombine():将两个 CompletableFuture 的结果结合起来进行计算。
  • runAfterBoth():两个任务都执行完时,执行一个操作。

异常处理:

  • exceptionally():出现异常时的补偿操作。