Java多线程笔记(5)-线程池的使用

线程池概述

线程池指的是一种容纳多个线程的容器。线程池中的线程可以重复使用,省去了频繁创建和销毁对象的操作。线程池的核心思想在于线程的复用,一个线程可以被重复使用,用来处理多个任务。线程池有以下的作用:

  1. 降低资源消耗。减少了创建和销毁线程的次数,使得创建出来的工作线程得到重复利用
  2. 提高系统的响应速度。当有任务来临的时候,可以直接使用线程池中的线程,而不用等待线程的创建
  3. 提高线程的可管理性。使用线程池可以对线程进行统一的分配,管理和监控

Java中的线程池,基本的继承关系如下,其中的不同部分会在后续进行说明

ThreadPoolExecutor

ThreadPoolExecutor是Java中提供的线程池类,通过这个类我们可以很容易地创建出一个线程池。

线程池创建

构造方法

ThreadPoolExecutor中完整的构造方法如下所示,其中一共有7个参数:

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

不同参数对应线程池工作流程中的不同过程,我们首先描述线程池的工作流程。

  1. 线程池中刚开始没有线程,当一个任务提交给线程池之后,线程池会创建一个新线程来执行任务
  2. 当一个线程完成任务之后,会从队列中取得下一个任务来执行
  3. 当线程数达到corePoolSize并且没有线程空闲,这时候再加入任务,新增加的任务会被加入阻塞队列workQueue中排队,直到有空闲的线程
  4. 如果队列选择了有界队列,那么任务数量超过队列大小时,会创建maximumPoolSize - corePoolSize数目的线程来救急
  5. 如果线程数目已经达到了maximumPoolSize,仍然有新任务,这时候会执行拒绝策略
  6. 当高峰过去之后,超过corePoolSize的救急线程如果一段时间内没有任务执行,那么需要结束来节省资源。这个时间由keepAliveTimeunit来控制
  7. 至于线程工厂参数threadFactory,主要还是用来创建新线程以及进行新线程的命名

提交任务的优先级:核心线程 > 阻塞队列 > 救急线程

执行任务的优先级:核心线程 > 救急线程 > 阻塞队列

线程池中的任务被包装成一个内部类Worker,其中的线程都存储在一个HashSet中

1
private final HashSet<Worker> workers = new HashSet<Worker>();

阻塞队列

阻塞队列与普通队列的不同点在于阻塞队列中提供了阻塞添加以及阻塞删除方法,并且能够保证线程安全

  • 阻塞添加 put():当阻塞队列元素已满时,添加队列元素的线程会被阻塞,直到队列元素不满时才重新唤醒线程执行
  • 阻塞删除 take():在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空再执行删除操作(一般会返回被删除的元素)

核心方法如下:

方法类型 抛出异常 特殊值 阻塞 超时
插入(尾部) add(e) offer(e) put(e) offer(e,time,unit)
移除(头部) remove() poll() take() poll(time,unit)
检查(队首元素) element() peek() 不可用 不可用
  • 抛出异常组:
    • 当阻塞队列满时,往队列中 add 插入元素会抛出异常
    • 当阻塞队列空时,从队列中 remove 移除元素会抛出异常
  • 特殊值组:
    • 插入方法:返回是否插入成功,成功插入返回true,否则false
    • 移除方法:如果成功则返回出队列元素,队列没有就返回 null
  • 阻塞组:
    • 当阻塞队列满时,生产者继续往队列里 put 元素,队列会一直阻塞生产线程直到队列有空间 put 数据或响应中断退出
    • 当阻塞队列空时,消费者线程试图从队列里 take 元素,队列会一直阻塞消费者线程直到队列中有可用元素
  • 超时退出:插入和移除不满足条件时,会等待一定时间

阻塞队列又可以分为有界队列和无界队列:

  • 有界队列:指的是有固定大小的队列
  • 无界队列:没有设置固定大小的队列,可以直接将元素入队直到溢出(最大值为Integer.MAX_VALUE

java.util.concurrent.BlockingQueue 接口有以下阻塞队列的实现:

  • ArrayBlockQueue:由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue:由链表结构组成的无界(默认大小 Integer.MAX_VALUE)的阻塞队列,当然也支持指定大小成为有界队列
  • PriorityBlockQueue:支持优先级排序的无界阻塞队列
  • DelayedWorkQueue:使用优先级队列实现的延迟无界阻塞队列
  • SynchronousQueue:不存储元素的阻塞队列,每一个生产线程会阻塞到有一个 put 的线程放入元素为止
  • LinkedTransferQueue:由链表结构组成的无界阻塞队列
  • LinkedBlockingDeque:由链表结构组成的双向阻塞队列

拒绝策略

拒绝策略在Java中对应的是RejectExecutionHandler接口,这个接口中只有下面一个抽象方法,其中实现具体的拒绝策略:

1
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);

在JDK中也给我们提供了下面4个实现:

  • AbortPolicy:默认策略,让调用者抛出RejectedExecutionExecption异常
  • CallerRunsPolicy:让调用者来执行任务
  • DiscardPolicy:放弃本次任务的执行
  • DiscardOldestPolicy:放弃队列中最早的任务,把当前任务加入队列中尝试再次提交当前任务

Executors工具类

在Executors工具类中,给我们提供了许多模式的线程池创建方式。当然实际上只是对ThreadPoolExecutor类的创建方法进行了一层封装。常见的线程池有下面三种:

newFixedThreadPool:固定线程数目的线程池

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
  • 核心线程数=最大线程数,无救急线程,从而也不需要超时时间
  • 阻塞队列为LinkedBlockingQueue,无界,默认大小为Integer.MAX_VALUE,可以存放任意数量的任务,在任务比较多的时候会造成内存溢出OOM
  • 适用于任务量已知,相对耗时的长期任务

newCachedThreadPool:带有缓冲的线程池

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
  • 核心线程数为0,最大线程数为Integer.MAX_VALUE,即创建的所有线程都是救急线程
  • 救急线程的存活时间为60s
  • 可能出现一次性创建大量线程的情况,导致内存溢出OOM
  • 阻塞队列为SynchronousQueue,没有容量,对于每一个执行take的线程,会阻塞到有一个put线程放入元素位置
  • 适合任务数量比较密集,但是每个任务执行时间较短的情况

newSingleThreadExecutor:只有一个线程的单线程池

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
  • 线程池中的线程数固定为1,保证所有任务按照指定顺序执行
  • 阻塞队列为LinkedBlockingQueue,无界队列。任务数多于1的时候,多余任务会放入无界队列中排队
  • 任务执行完毕,唯一的线程也不会释放

newSingleThreadExecutor() 对比 单线程:

  • 单线程可以保证串行执行任务,但是如果任务执行失败则没有任何补救措施
  • SingleThreadExecutor则会新建一个线程,保证线程池的正常工作

newSingleThreadExecutor() 对比 newFiexdThreadPool(1)

  • newSingleThreadExecutor() 线程个数始终为1,不能修改。这是因为它的方法中使用了FinalizableDelegatedExecutorService进行包装,对外只暴露了ExecutorService接口
  • newFiexdThreadPool(1) 初始创建线程池中线程个数为1,但是可以进行修改。因为它的方法中暴露的是ThreadPoolExecutor对象,其中提供了setCorePoolSize方法

线程池使用

任务提交

线程任务可以是Runnable或者Callable,二者的区别在于Runnable没有返回值,不能抛出异常;而Callable支持返回值和泛型,可以抛出异常。

线程池中任务提交相关的API如下所示:

方法 说明
void execute(Runnable command) 执行Runnable类型任务
Future<?> submit(Runnable task) 提交任务 task
Future submit(Callable task) 提交任务 task,用返回值 Future 获得任务执行结果
List<Future> invokeAll(Collection<? extends Callable> tasks) 提交 tasks任务列表中的所有任务,返回任务执行结果Future组成的列表
List<Future> invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) 提交 tasks 中所有任务,返回任务执行结果Future组成的列表。
可以设置超时时间。超时会取消提交列表中没有执行完的任务,并抛出超时异常
T invokeAny(Collection<? extends Callable> tasks) 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消

execute对比submit:

  • 二者都是线程池中提交任务的方法
  • execute只能执行Runnable类型的任务,没有返回值。会直接抛出任务执行过程时的异常
  • submit既能够提交Runnable类型的任务,也能够提交Callable类型的任务,底层是封装成FutureTask,然后调用execute来执行。在执行过程中会吞掉异常,通过FutureTask的get方法可以将任务执行时的异常重新抛出

线程池关闭

方法 说明
void shutdown() 将线程池状态变为 SHUTDOWN,等待任务执行完后关闭线程池,不会接收新任务,但已提交任务会执行完。
调用该方法的线程不会被阻塞
List shutdownNow() 将线程池状态变为 STOP,用 interrupt 中断正在执行的任务,直接关闭线程池,并将队列中的任务返回
boolean isShutdown() 判断线程池的状态,如果是SHUTDOWN则返回 true
boolean isTerminated() 判断线程池的状态,如果是TERMINATED则返回true
boolean awaitTermination(long timeout, TimeUnit unit) 调用 shutdown 后,调用线程不会阻塞等待所有任务运行结束
如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待

线程池状态

在ThreadPoolExecutor中使用一个AtomicInteger来保存线程池的相关状态,其中高3位表示线程池的状态,低29位表示线程的数量。

使用一个原子整数将状态和数量保存在一起,为的是减少CAS的操作次数,只用一次CAS原子操作就可以进行赋值

1
2
3
4
5
6
7
8
9
10
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

这些状态说明如下:

状态 高3位 接收新任务 处理阻塞任务队列 说明
RUNNING 111 Y Y
SHUTDOWN 000 N Y 不接收新任务,但处理阻塞队列剩余任务
STOP 001 N N 中断正在执行的任务,并抛弃阻塞队列中的任务
TIDYING 010 - - 任务全执行完毕,活动线程为 0 即将进入终结
TERMINATED 011 - - 终止状态

注意事项

使用注意

在线程池的使用方面,阿里巴巴Java开发手册中有如下两点相关要求:

  1. 线程资源必须通过线程池来提供,不允许在应用中自行显式创建线程
  2. 线程池不允许使用Executors工具类去创建,而应该通过ThreadPoolExecutor来创建,明确指定线程池的运行规则,规避资源耗尽的风险

第一点使用线程池而非线程。使用线程池的好处是能够减少创建和销毁线程的消耗,降低系统资源的开销。如果不使用线程池,有可能造成系统创建大量同类线程而导致内存耗尽或者切换频繁的问题

第二点使用ThreadPoolExecutor,明确各个参数以及运行规则。因为Executors返回的线程池对象中一些参数是固定的,例如无界的阻塞队列,可能造成大量线程的创建,导致OOM

线程数目设置

如果线程池线程数量太小,有大量请求需要处理的时候,系统的响应就会很慢,甚至可能出现任务队列大量堆积任务导致内存溢出的情况;如果线程池线程数量太多,大量线程争取CPU资源,导致大量的上下文切换,影响效率

对于CPU密集型任务来说,一般设置为N+1。N为CPU核数。多出来的一个线程可以在某些线程阻塞的时候顶上,充分利用空闲时间;对于IO密集型任务来说,一般设置为2N。频繁出现IO阻塞的话,可以多配置一些线程来利用CPU

ScheduledThreadPoolExecutor

任务调度

我们程序的一种需求是能够定时执行,定时执行可以通过一些脚本来实现,但是Java中也提供了一些类有对应的功能。ScheduledThreadPoolExecutor就是其中一种。在介绍调度线程池之前,我们可以简单了解一下在它出现之前,Java中用来定时执行任务的类 Timer

Timer可以实现定时功能,它简单易用,但是由于所有的任务都是通过同一个线程来调度,因此所有的任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响之后的任务。(加入Timer执行的任务需要封装成TimerTask)

如下面的代码所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
@Override
public void run() {
log.debug("task1");
}
};
TimerTask task2 = new TimerTask() {
@Override
public void run() {
log.debug("task2");
}
};
timer.schedule(task1, 1000);
timer.schedule(task2, 1000);

正常情况输出如下:

1
2
20:46:35.621 [Timer-0] c.ThreadTest - task1
20:46:35.621 [Timer-0] c.ThreadTest - task2

如果任务1中出现异常,会影响任务2的执行

1
2
// 在任务1中添加
int i = 1 / 0;

输出报错:

1
2
3
4
5
20:49:36.887 [Timer-0] c.ThreadTest - task1
Exception in thread "Timer-0" java.lang.ArithmeticException: / by zero
at ThreadTest$1.run(ThreadTest.java:20)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)

如果任务1中出现延迟,也会影响任务2的执行

1
2
// 在任务1中添加
Thread.sleep(2000);

输出如下:但是我们希望任务2是从当前延时1000ms后执行,这里被任务1的延时影响了

1
2
20:50:23.973 [Timer-0] c.ThreadTest - task1
20:50:25.985 [Timer-0] c.ThreadTest - task2

调度线程池概述

调度线程池ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,通过创建线程来执行定时任务,解决了上面使用Timer过程中可能遇到的问题

构造方法如下:

1
2
3
4
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
  • 使用内部类 ScheduledFutureTask 封装任务
  • 使用内部类 DelayedWorkQueue 作为线程池队列

常用方法

ScheduledFuture<?> schedule(Runnable/Callable<V>, long delay, TimeUnit u)延迟执行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
ScheduledThreadPoolExecutor poolExecutor = new ScheduledThreadPoolExecutor(2);
poolExecutor.schedule(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("task 1");
int i = 1 / 0;
}, 1000, TimeUnit.MILLISECONDS);
poolExecutor.schedule(() -> {
log.debug("task 2");
}, 1000, TimeUnit.MILLISECONDS);

输出如下:任务1的异常和延时并不会影响其他任务。(异常不会在控制台上打印)

1
2
21:04:00.987 [pool-1-thread-2] c.ThreadTest - task 2
21:04:02.994 [pool-1-thread-1] c.ThreadTest - task 1

ScheduledFuture<?> scheduleAtFixedRate(Runnable/Callable<V>, long initialDelay, long period, TimeUnit unit):定时执行周期任务,不考虑执行的耗时,参数为初始延迟时间、间隔时间、时间单位

1
2
3
4
5
6
7
8
9
10
ScheduledThreadPoolExecutor poolExecutor = new ScheduledThreadPoolExecutor(1);
log.debug("start");
poolExecutor.scheduleAtFixedRate(() -> {
log.debug("running...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 1, 1, TimeUnit.SECONDS);

输出如下:执行耗时大于周期间隔,发现2s执行一次:

1
2
3
4
5
6
7
8
9
21:06:49.291 [main] c.ThreadTest - start
21:06:50.352 [pool-1-thread-1] c.ThreadTest - running...
21:06:52.366 [pool-1-thread-1] c.ThreadTest - running...
21:06:54.379 [pool-1-thread-1] c.ThreadTest - running...
21:06:56.380 [pool-1-thread-1] c.ThreadTest - running...
21:06:58.388 [pool-1-thread-1] c.ThreadTest - running...
21:07:00.403 [pool-1-thread-1] c.ThreadTest - running...
21:07:02.416 [pool-1-thread-1] c.ThreadTest - running...
21:07:04.422 [pool-1-thread-1] c.ThreadTest - running...

ScheduledFuture<?> scheduleWithFixedDelay(Runnable/Callable<V>, long initialDelay, long delay, TimeUnit unit):定时执行周期任务,考虑执行的耗时,参数为初始延迟时间、间隔时间、时间单位

1
2
3
4
5
6
7
8
9
10
ScheduledThreadPoolExecutor poolExecutor = new ScheduledThreadPoolExecutor(1);
log.debug("start");
poolExecutor.scheduleWithFixedDelay(() -> {
log.debug("running...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 1, 1, TimeUnit.SECONDS);

输出如下:执行耗时大于周期间隔,发现3s执行一次:

1
2
3
4
5
6
7
8
21:11:06.562 [main] c.ThreadTest - start
21:11:07.619 [pool-1-thread-1] c.ThreadTest - running...
21:11:10.635 [pool-1-thread-1] c.ThreadTest - running...
21:11:13.662 [pool-1-thread-1] c.ThreadTest - running...
21:11:16.677 [pool-1-thread-1] c.ThreadTest - running...
21:11:19.699 [pool-1-thread-1] c.ThreadTest - running...
21:11:22.713 [pool-1-thread-1] c.ThreadTest - running...
21:11:25.743 [pool-1-thread-1] c.ThreadTest - running...

scheduleAtFixedRate 对比 scheduleWithFixedDelay:

  • 前者的间隔时间和执行耗时是同时计算的,如果执行耗时大于间隔时间,则等到执行完毕后就会立即执行下一个周期,不会再次等待
  • 后者的间隔时间和执行耗时是先后计算的,执行完毕之后再计算间隔,间隔之后再执行下一个周期

Fork-Join 线程池

概述

Fork-Join是JDK1.7加入的新的线程池的实现,它体现的是一种分治思想,适用于能够进行任务拆分的CPU密集型运算。所谓的任务拆分,就是将一个大任务拆分成算法上相同的小任务,直至不能拆分,可以直接求解。与递归相关的一些计算,如归并排序,斐波那契数列等,都可以用分治思想进行求解。

Fork-Join在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率。

  • Fork-Join使用ForkJoinPool来启动,这是一个特殊的线程池,默认会创建与CPU核心数大小相同的线程池
  • 提交给ForkJoinPool任务需要继承RecursiveTask(有返回值)或RecursiveAction(没有返回值)

使用示例

利用ForkJoinPool来完成1~n的累加,我们需要提交对应的任务类task,并且这个任务类需要继承RecursiveTask或者RecursiveAction,在类中重写计算逻辑,在计算逻辑中进行分治的拆分和结果合并

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(3);
log.debug("result: {}", forkJoinPool.invoke(new MyTask(1, 5)));
}

public static class MyTask extends RecursiveTask<Integer> {
private int start;
private int end;

public MyTask(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
log.debug("compute from {} to {}", start, end);
// 按照二分进行拆分
if (start == end) {
return start;
} else if (start + 1 == end) {
return start + end;
} else {
int mid = (start + end) / 2;
// fork
MyTask task1 = new MyTask(start, mid);
MyTask task2 = new MyTask(mid + 1, end);
task1.fork();
task2.fork();

// join
return task1.join() + task2.join();
}
}
}

输出如下:其中,每次新建的MyTask相当于加入线程池中的任务,可能由线程池中任一线程来执行

1
2
3
4
5
6
21:29:32.889 [ForkJoinPool-1-worker-1] c.ThreadTest - compute from 1 to 5
21:29:32.889 [ForkJoinPool-1-worker-1] c.ThreadTest - compute from 1 to 3
21:29:32.889 [ForkJoinPool-1-worker-1] c.ThreadTest - compute from 1 to 2
21:29:32.889 [ForkJoinPool-1-worker-3] c.ThreadTest - compute from 4 to 5
21:29:32.889 [ForkJoinPool-1-worker-2] c.ThreadTest - compute from 3 to 3
21:29:32.889 [main] c.ThreadTest - result: 15

由于这种方式需要用户自定义实现分治和合并操作,仍然有一定的门槛,后续Java提供了Stream API,其中的并行流也可以达到类似的效果


Java多线程笔记(5)-线程池的使用
http://example.com/2022/09/18/Java多线程笔记-5-线程池的使用/
作者
EverNorif
发布于
2022年9月18日
许可协议