Java 线程池

1. 线程池的基本组成

Java 线程池是通过 ThreadPoolExecutor 类实现的。我们来看下线程池的主要参数以及如何使用它。

参数说明:

  • corePoolSize:核心线程池大小。线程池中核心线程的数量,默认情况下会一直存在,即使没有任务需要执行。如果线程池中的核心线程数目还没有达到该值,线程池会创建新的线程来处理任务。

  • maximumPoolSize:最大线程池大小。线程池中能够容纳的最大线程数。若当前有任务需要执行且线程池中线程数达到了 corePoolSize,线程池会根据任务队列的情况来决定是否创建新的线程,直到达到最大线程池大小。

  • keepAliveTime:线程空闲时间。对于超出 corePoolSize 但没有被销毁的线程,如果在一定时间内没有新的任务到来,线程将会被销毁。

  • unit:时间单位,通常用 TimeUnit.SECONDS 或其他单位。

  • workQueue:工作队列。当线程池中的线程都忙时,任务会被放入此队列中等待。常见的队列有:ArrayBlockingQueueLinkedBlockingQueue 等。

  • threadFactory:线程工厂。用来创建新线程的工厂。你可以自定义线程的创建逻辑。

  • handler:拒绝策略。当线程池达到最大线程数且工作队列已满时,新的任务会根据该策略进行处理。常见的拒绝策略有:

    • AbortPolicy(默认):直接抛出 RejectedExecutionException
    • CallerRunsPolicy:在调用者线程中执行任务。
    • DiscardPolicy:丢弃当前任务。
    • DiscardOldestPolicy:丢弃任务队列中最旧的任务。

线程池执行流程

线程池执行流程

  1. 当我们提交任务,线程池会根据corePoolSize大小创建线程来执行任务;
  2. 当任务的数量超过corePoolSize数量,后续的任务将会进入阻塞队列阻塞排队;
  3. 当阻塞队列也满了之后,那么将会继续创建(maximumPoolSize-corePoolSize)个数
    量的线程来执行任务,如果任务处理完成,maximumPoolSize-corePoolSize个额外创
    建的线程等待 keepAliveTime之后被自动销毁;
  4. 如果达到maximumPoolSize,阻塞队列还是满的状态,那么将根据不同的拒绝策略
    进行拒绝处理;

2. 代码讲解

下面的代码是一个 Java 线程池的示例,基于 Executor 框架。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class ThreadPoolExample {

public static void main(String[] args) {
// 创建线程池,核心线程数为1,最大线程数为1,线程空闲时间60秒,使用一个大小为1的阻塞队列
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, // corePoolSize
1, // maximumPoolSize
60, // keepAliveTime
TimeUnit.SECONDS, // 时间单位
new ArrayBlockingQueue<Runnable>(1), // 阻塞队列
new MyThreadFactory("pay-thread-pool-"), // 自定义线程工厂
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);

// 提交3个任务到线程池
for (int i = 0; i < 3; i++) {
threadPoolExecutor.execute(new MyRunnable(i));
}

// 关闭线程池
threadPoolExecutor.shutdown();
}

// 自定义线程工厂
static class MyThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1); // 线程编号
private final String namePrefix; // 线程名称前缀

public MyThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
if (t.isDaemon()) t.setDaemon(false); // 设置为非守护线程
if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); // 设置线程优先级
return t;
}
}

// 自定义任务
static class MyRunnable implements Runnable {
private int i;

public MyRunnable(int i) {
this.i = i;
}

@Override
public void run() {
System.out.println(this.i + " 任务执行...... " + Thread.currentThread().getName());
}
}
}

3. 代码解释:

创建线程池:

1
2
3
4
5
6
7
8
9
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, // corePoolSize
1, // maximumPoolSize
60, // keepAliveTime
TimeUnit.SECONDS, // 时间单位
new ArrayBlockingQueue<Runnable>(1), // 阻塞队列,最大容量为1
new MyThreadFactory("pay-thread-pool-"), // 自定义线程工厂
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
  • 创建一个线程池 ThreadPoolExecutor,核心线程数和最大线程数都设置为 1。线程池使用 ArrayBlockingQueue 阻塞队列,容量为 1。
  • 任务提交后,如果线程池中没有空闲线程,且队列已满,则使用 CallerRunsPolicy 策略,意味着任务将在调用者线程中执行。

提交任务:

1
2
3
for (int i = 0; i < 3; i++) {
threadPoolExecutor.execute(new MyRunnable(i));
}
  • 这里通过 threadPoolExecutor.execute() 方法提交了 3 个任务到线程池。
  • 每个任务是 MyRunnable 的实例,执行时会输出任务编号和当前线程名称。

线程工厂:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static class MyThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

public MyThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
if (t.isDaemon()) t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
  • 自定义线程工厂 MyThreadFactory 用于创建线程。通过 AtomicInteger 来保证线程编号的唯一性,并设置线程名称的前缀。
  • 线程默认设置为非守护线程,并且优先级设置为正常。

任务执行:

1
2
3
4
5
6
7
8
9
10
11
12
static class MyRunnable implements Runnable {
private int i;

public MyRunnable(int i) {
this.i = i;
}

@Override
public void run() {
System.out.println(this.i + " 任务执行...... " + Thread.currentThread().getName());
}
}
  • MyRunnable 实现了 Runnable 接口,它的 run() 方法会输出任务编号以及当前执行任务的线程名。

关闭线程池:

1
threadPoolExecutor.shutdown();
  • 调用 shutdown() 方法关闭线程池。线程池中的线程将继续执行已经提交的任务,但不再接收新的任务。

运行结果:

1
2
3
0 任务执行...... pay-thread-pool-1
1 任务执行...... pay-thread-pool-2
2 任务执行...... pay-thread-pool-3
  • 线程池中的每个线程都会依次执行任务,任务执行时会输出任务编号和当前线程的名称。

4. 核心线程数设置

1. 公式参考:

  • 公式:Nthreads = Ncpu * Ucpu * (1 + W/C)
    Ncpu:CPU的核心数。
    Ucpu:CPU的使用率(0~1之间的值)。
    W:线程等待时间。(比如线程等待I/O操作的时间)
    C:线程计算时间。(比如线程执行for循环的时间)

  • 举例:
    8 * 100% * (1 + 60/40) = 20
    8 * 100% * (1 + 80/20) = 40

2. CPU 密集型任务:

  • 对于 CPU 密集型任务,主要是执行计算或解压、压缩等需要大量 CPU 资源的任务。对于这种任务类型,线程池的核心线程数通常设置为 CPU 核心数 + 1
  • 这样做的目的是避免线程空闲时浪费 CPU 资源。当一个任务完成后,可以用多出的线程来防止 CPU 空闲,提高处理效率。

3. I/O 密集型任务:

  • 对于 I/O 密集型任务,主要是执行与 I/O 操作相关的任务,比如数据库操作、文件读取、网络通信等。此类任务不会特别消耗 CPU 资源,因此可以配置更多的线程来处理。
  • 对于 I/O 密集型任务,线程池的核心线程数通常设置为 CPU 核心数 * 2。这样做是因为线程在进行 I/O 操作时不会占用 CPU,CPU 可以交给其他线程使用。

总结:

  • ThreadPoolExecutor 是实现线程池的核心类,支持多个配置参数。
  • 通过 execute() 方法提交任务,线程池会根据核心线程数和最大线程数来管理线程的生命周期。
  • 任务提交时,线程池首先尝试使用核心线程执行,如果核心线程已满,则根据队列大小和最大线程池数来创建新线程。
  • 如果线程池中的线程数达到了最大线程数且队列已满,任务会被拒绝,取决于指定的拒绝策略。