线程安全问题

1. 线程安全性

什么是线程安全性?

我们可以这么理解,我们所写的代码在并发情况下使用时,总是能表现出正确的行为;反之,未实现线程安全的代码,表现的行为是不可预知的,有可能正确,而绝大多数的情况下是错误的。正如 Java 语言规范在《Chapter 17. Threads and Locks》所说的:

线程的行为(尤其是在未正确同步的情况下)可能会造成混淆并且违反直觉。本章描述了多线程程序的语义。它包括规则,通过读取多个线程更新的共享内存可以看到值。

如果要实现线程安全性,就要保证我们的类是线程安全的。在《Java 并发编程实战》中,定义“类是线程安全的”如下:

当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替执行,并且在调用代码中不需要任何额外的同步或者协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。

如何实现线程安全?

线程封闭

实现好的并发是一件困难的事情,所以很多时候我们都想躲避并发。避免并发最简单的方法就是线程封闭。

线程封闭就是把对象封装到一个线程里,只有这一个线程能看到此对象。那么这个对象就算不是线程安全的也不会出现任何安全问题。

栈封闭

栈封闭是我们编程当中遇到的最多的线程封闭。简单来说,就是局部变量。多个线程访问一个方法,此方法中的局部变量都会被拷贝一份到线程栈中。所以局部变量是不被多个线程所共享的,也就不会出现并发问题。因此,能用局部变量就别用全局的变量,全局变量容易引起并发问题。

ThreadLocal

ThreadLocal 是实现线程封闭的最好方法。ThreadLocal 内部维护了一个 Map,Map 的 key 是每个线程的名称,而 Map 的值就是我们要封闭的对象。每个线程中的对象都对应着 Map 中一个值,也就是 ThreadLocal 利用 Map 实现了对象的线程封闭。

无状态的类

没有任何成员变量的类,就叫无状态的类,这种类一定是线程安全的。

如果这个类的成员方法的参数传入了对象,也是线程安全的。

让类不可变

让状态不可变,加 final 关键字,对于一个类,所有的成员变量应该是私有的,同时只要有可能,所有的成员变量应该加上 final 关键字。

但是要注意,一旦类的成员变量中有对象,上述的 final 关键字保证不可变并不能保证类的安全性。
final 修饰的对象是引用不能被修改,但是对象的状态是可以被修改的。
因为在多线程下,虽然对象的引用不可变,但是对象在堆上的实例是有可能被多个线程同时修改的,没有正确处理的情况下,对象实例在堆中的数据是不可预知的。

加锁和 CAS

我们最常使用的保证线程安全的手段,包括:

  • 使用 synchronized 关键字
  • 使用显式锁
  • 使用各种原子变量
  • 修改数据时使用 CAS 机制

2. 死锁

概念

死锁是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁。

  1. 死锁是必然发生在多操作者(M>=2 个)争夺多个资源(N>=2 个,且 N<=M)才会发生这种情况。
  2. 争夺资源的顺序不对,如果争夺资源的顺序是一样的,也不会产生死锁;
  3. 争夺者对拿到的资源不放手。

死锁代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class DeadlockExample {
// 定义两个对象作为锁
private static final Object resource1 = new Object();
private static final Object resource2 = new Object();

public static void main(String[] args) {
// 创建第一个线程
Thread thread1 = new Thread(() -> {
// 线程 1 先获取 resource1 的锁
synchronized (resource1) {
System.out.println("Thread 1: Holding resource 1...");
try {
// 模拟一些操作
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread 1: Waiting for resource 2...");
// 线程 1 尝试获取 resource2 的锁
synchronized (resource2) {
System.out.println("Thread 1: Holding resource 1 and 2...");
}
}
});

// 创建第二个线程
Thread thread2 = new Thread(() -> {
// 线程 2 先获取 resource2 的锁
synchronized (resource2) {
System.out.println("Thread 2: Holding resource 2...");
try {
// 模拟一些操作
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread 2: Waiting for resource 1...");
// 线程 2 尝试获取 resource1 的锁
synchronized (resource1) {
System.out.println("Thread 2: Holding resource 2 and 1...");
}
}
});

// 启动两个线程
thread1.start();
thread2.start();
}
}
专业术语

产生死锁的四个必要条件

  1. 互斥条件:进程对所分配到的资源进行排它性使用,即在一段时间内某资源只由一个进程占用。
  2. 请求和保持条件:进程已经持有至少一个资源,但又提出新的资源请求,而该资源已被其它进程占有,此时请求进程阻塞,但又对自己已获得的其它资源保持不放。
  3. 不剥夺条件:进程已获得的资源,在未使用完之前,不能被剥夺。
  4. 环路等待条件:发生死锁时,必然存在一个进程—资源的环形链。

避免死锁的常见方法

  1. 打破互斥条件:改造独占性资源为虚拟资源,大部分资源已无法改造。
  2. 打破不可抢占条件:当一进程占有一独占性资源后又申请一独占性资源而无法满足,则退出原占有的资源。
  3. 打破占有且申请条件:采用资源预先分配策略。
  4. 打破循环等待条件:实现资源有序分配策略,对所有设备实现分类编号。

避免死锁常见的算法有:

  • 有序资源分配法
  • 银行家算法

避免死锁

找到死锁

通过 jstack 命令找到死锁的线程,然后通过 jstack -l pid 找到死锁的线程堆栈信息。

有序资源分配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class SafeOperate {

private static Object No13 = new Object();//第一个锁
private static Object No14 = new Object();//第二个锁
private static Object tieLock = new Object();//第三把锁

public void transfer(Object first,Object second)
throws InterruptedException {

int firstHash = System.identityHashCode(first);
int secondHash = System.identityHashCode(second);

if(firstHash<secondHash){
synchronized (first){
System.out.println(Thread.currentThread().getName()+" get "+first);
Thread.sleep(100);
synchronized (second){
System.out.println(Thread.currentThread().getName()+" get "+second);
}
}
}else if(secondHash<firstHash){
synchronized (second){
System.out.println(Thread.currentThread().getName()+" get"+second);
Thread.sleep(100);
synchronized (first){
System.out.println(Thread.currentThread().getName()+" get"+first);
}
}
}else{
synchronized (tieLock){
synchronized (first){
synchronized (second){
System.out.println(Thread.currentThread().getName()+" get"+first);
System.out.println(Thread.currentThread().getName()+" get"+second);
}
}
}
}
}
}

拿不到锁释放资源:尝试拿锁的机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public class TryLock {
private static Lock No13 = new ReentrantLock();//第一个锁
private static Lock No14 = new ReentrantLock();//第二个锁

//先尝试拿No13 锁,再尝试拿No14锁,No14锁没拿到,连同No13 锁一起释放掉
private static void A() throws InterruptedException {
String threadName = Thread.currentThread().getName();
Random r = new Random();
while(true){
if(No13.tryLock()){
System.out.println(threadName +" get 13");
try{
if(No14.tryLock()){
try{
System.out.println(threadName +" get 14");
System.out.println("A do work------------");
break;
}finally{
No14.unlock();
}
}
}finally {
No13.unlock();
}

}
Thread.sleep(r.nextInt(3));
}
}

//先尝试拿No14锁,再尝试拿No13锁,No13锁没拿到,连同No14锁一起释放掉
private static void B() throws InterruptedException {
String threadName = Thread.currentThread().getName();
Random r = new Random();
while(true){
if(No14.tryLock()){
System.out.println(threadName +" get 14");
try{
if(No13.tryLock()){
try{
System.out.println(threadName +" get 13");
System.out.println("B do work------------");
break;
}finally{
No13.unlock();
}
}
}finally {
No14.unlock();
}

}
Thread.sleep(r.nextInt(3));
}
}

private static class AThread extends Thread{

private String name;

public AThread(String name) {
this.name = name;
}

public void run(){
Thread.currentThread().setName(name);
try {
A();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
Thread.currentThread().setName("B线程");
AThread aThread = new AThread("A线程");
aThread.start();
try {
B();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

3. 其他安全问题

活锁

两个线程在尝试拿锁的机制中,发生多个线程之间互相谦让,不断发生同一个线程总是拿到同一把锁,在尝试拿另一把锁时因为拿不到,而将本来已经持有的锁释放的过程。

解决办法:每个线程休眠随机数,错开拿锁的时间。

线程饥饿

低优先级的线程,总是拿不到执行时间。

单例模式

4. 线程安全的单例模式

双重检查锁定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 懒汉式-双重检查
*/
public class SingleDcl {
private volatile static SingleDcl singleDcl;
//私有化
private SingleDcl(){
}

public static SingleDcl getInstance(){
if (singleDcl == null){ //第一次检查,不加锁
System.out.println(Thread.currentThread()+" is null");
synchronized(SingleDcl.class){ //加锁
if (singleDcl == null){ //第二次检查,加锁情况下
System.out.println(Thread.currentThread()+" is null");
//内存中分配空间 1
//空间初始化 2
//把这个空间的地址给我们的引用 3
singleDcl = new SingleDcl();
}
}
}
return singleDcl;
}
}

单例模式推荐实现

懒汉式(类初始化模式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 懒汉式-延迟初始化占位类模式
*/
public class SingleInit {
private SingleInit(){}

private static class InstanceHolder{
private static SingleInit instance = new SingleInit();
}

public static SingleInit getInstance(){
return InstanceHolder.instance;
}

}

饿汉式(枚举方式)

1
2
3
4
5
6
7
8
9
10
/**
* 饿汉式
* 枚举
*/
public class SingleEHan {
private SingleEHan(){}
private static SingleEHan singleDcl = new SingleEHan();

}

这种方式可以防止反序列化破坏单例。