在之前的文章中,我们对 Java
线程有了一个初步的了解,通过多线程设计能够大大提升系统功能。但多线程同样也带来一个新的问题,我们都知道 Java 的线程强依赖于系统调度,频繁的创建线程显然造成了一定的性能循环。
因此,线程与数据库连接类似,同样具备资源池化能力,即线程池。通过线程池可以实现资源的最大化利用,当一个线程完结之后并不会被立马销毁,而是放回线程池,在新的任务被提交时即可复用此线程,省去新建以及初始化线程所带来的开销。
本文就深入了解 Java
中为我们提供了哪些线程池以及各自所具备的特性。
一、Executor
1. 基础接口
Executor
为所有线程线程池的顶级抽象类,提供了 execute()
方法用于提交线程子任务。
public void executorDemo() {
Executor executor = Executors.newSingleThreadExecutor();
executor.execute(() -> {
System.out.println("Task 1.");
});
}
在通过线程池提供任务时若线程任务中存在状态值变更操作时一定需要注意异常的情况,因为线程池子任务在抛出异常时并不会中断主线程的执行,而没有手动进行 try catch
则会发生无法预料的结果。
如下两个线程池示例中,在 demo1()
中并无法在主线程中捕获到异常信息,而会被线程池吞掉,在 demo2()
中通过显式的捕获即可正常拦截异常信息。
@Test
public void demo1() {
Executor executor = Executors.newSingleThreadExecutor();
executor.execute(() -> {
throw new RuntimeException("You can't receive this message");
});
}
@Test
public void demo2() {
Executor executor = Executors.newSingleThreadExecutor();
executor.execute(() -> {
try {
throw new RuntimeException("You can receive this message");
} catch (Exception e) {
System.out.println("Info" + e.getMessage());
}
});
}
2. 进阶接口
ExecutorService
扩展了 Executor
接口, 支持有返回值的线程与管理线程生命周期。即可通过 submit()
与 invokeAny()
提交 Runnable
与 Callable
两类有返回值线程任务。
同时 ExecutorService
提供了 shutdown()、shutdownNow()、isShutdown()
等方法用于控制线程池的启停,当关闭线程池后无法继续提交线程任务,但已提交的线程任务仍然继续执行,待所有任务完成后则释放线程池资源。
3. 线程类型
(1) 单线程池
通过 newSingleThreadExecutor()
创建线程数为一的线程池,当提前的任务大于 1
,其余任务将进入等待。
如下示例中每间隔 2
秒才会打印一次。
public void demo1() {
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
int finalI = i;
executor.submit(() -> {
System.out.println("Task-" + finalI + " running.");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
}
(2) 固定线程池
通过 newFixedThreadPool(n)
创建指定线程数的线程池,线程池大小在初始化声明之后便无法改变。
如下示例中每间隔 2
秒打印 3
条信息。
public void demo2() {
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 6; i++) {
int finalI = i;
executor.submit(() -> {
System.out.println("Task-" + finalI + " running.");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
// 关闭线程池
executor.shutdown();
}
(3) 自适应线程池
通过 newCachedThreadPool()
创建自适应线程数连接池,根据提交的任务数量自行增减。
如下示例中将一次性打印 5
条信息。
public void demo3() {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
int finalI = i;
executor.submit(() -> {
System.out.println("Task-" + finalI + " running.");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
}
二、定时线程池
ScheduledExecutorService
在 ExecutorService
的基础上再次进行扩展,可用于创建定时线程任务资源池。
1. 创建示例
通过 schedule()
方法即可提交定时线程任务,下面通过示例进行详细介绍。
通过 newScheduledThreadPool()
创建定时任务线程池资源。
public void scheduledPoolDemo1() throws Exception {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 当前时间 3 秒后执行一次
int interval = 3;
executor.schedule(() -> {
System.out.println("Task-1 running.");
}, interval, TimeUnit.SECONDS);
// 等待子线程结果
TimeUnit.SECONDS.sleep(6);
}
2. 线程重复
除了延迟触发,也可通过 scheduleAtFixedRate()
与 scheduleWithFixedDelay()
设置线程任务定期往复执行,
public void scheduledPoolDemo2() {
int start = 3;
int interval = 3;
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 当前时间 1 秒后执行, 且每隔 3 秒重复执行
// (间隔时间:上一次任务开始时计时)
executor.scheduleAtFixedRate(new Task("fixed-rate"), start, interval, TimeUnit.SECONDS);
// 当前时间 1 秒后执行, 上一任务执行完成 3 秒后重复执行
// (间隔时间:上一次任务结束时计时)
executor.scheduleWithFixedDelay(new Task("fixed-delay"), start, interval, TimeUnit.SECONDS);
executor.shutdown();
}
需要注意 scheduleAtFixedRate
与 scheduleWithFixedDelay
的区别。
scheduleAtFixedRate()
间隔时间是从上一个任务开始时计算,无论上个任务是否已经结束。scheduleWithFixedDelay()
间隔时间是从上个任务完成时开始计算,只有当上个任务结束才会开始计时。
三、ThreadPoolExecutor
ThreadPoolExecutor
是开发中使用相对较为频繁的线程资源池,其提供一系列初始化参数用于控制线程池的资源占用,下面重点进行介绍。
1. 核心参数
ThreadPoolExecutor
线程池的核心配置参数参考下表:
方法 | 作用 |
---|---|
getPoolSize() | 获取线程池当前的线程数量。 |
getActiveCount() | 获取当前线程池中正在执行任务的线程数量。 |
getTaskCount() | 获取线程池已经执行的和未执行的任务总数。 |
getCompletedTaskCount() | 获取线程池已完成的任务数量,该值小于等于。 |
getLargestPoolSize() | 获取线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。 |
2. 执行流程
当在代码中向线程池提交一个线程任务时,系统将会根据下述流程决定线程的创建销毁与否。
(Ⅰ) 判断线程池中当前存活线程数量是否达到了
corePoolSize
- 达到 :进入下一步。
- 未达到 :新建线程运行此任务,且任务结束后将该线程保留在线程池中,不做销毁处理。
(Ⅱ) 判断工作队列 (
queueCapacity
) 是否已满。
- 已满 :进入下一步。
- 未满 :将新的任务提交到工作队列中进入等待。
(Ⅲ) 判断线程池中存活的线程数量是否达到了
maxPoolSize
- 达到 :使用
饱和策略
来处理这个任务。 - 未达到 :新建一个工作线程来执行这个任务。
需要注意一点:在线程池中的线程数量超过 corePoolSize
时,每当有线程的空闲时间超过了 keepAliveTime
,这个线程就会被终止,直到线程池中线程的数量不大于 corePoolSize
为止。
3. 阻塞队列
线程池的等待队列用于保存待执行的任务的阻塞队列,包含以下四类:
(Ⅰ) ArrayBlockingQueue
有界阻塞队列。基于数组的先进先出队列 (FIFO)
,创建时必须指定大小。
(Ⅱ) LinkedBlockingQueue
无界阻塞队列,基于链表的先进先出队列 (FIFO)
,如果创建时没有指定此队列大小,则默认为 Integer.MAX_VALUE
。
吞吐量通常要高于 ArrayBlockingQueue
,Executors.newFixedThreadPool
使用了这个队列。若使用 LinkedBlockingQueue
则 maximumPoolSize
将不起作用,线程池能创建的最大线程数为 corePoolSize
,因为任务等待队列是无界队列。
(Ⅲ) SynchronousQueue
不会保存提交的任务,而是将直接新建一个线程来执行新来的任务,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。
SynchronousQueue
吞吐量通常要高于 LinkedBlockingQueue
, Executors.newCachedThreadPool
使用了这个队列。
(Ⅳ) PriorityBlockingQueue
PriorityBlockingQueue
是具有优先级的无界阻塞队列。
4. 饱和策略
通过饱和策略用于管理当线程资源已满时如何处理新提交的线程任务。
策略 | 描述 |
---|---|
AbortPolicy | 中止策略,默认的饱和策略,该策略将抛出未检查的 RejectedExecutionException。 |
DiscardPolicy | 抛弃策略,会悄悄抛弃该任务,当新提交的任务无法保存到队列中等待执行时。 |
DiscardOldestPolicy | 抛弃最旧的策略,会抛弃下一个将被执行的任务,然后尝试重新提交新的任务。 |
CallerRunsPolicy | 该策略既不会抛弃任务,也不会抛出异常,而是由调用线程(提交任务的线程)处理该任务。 |
5. 创建示例
具体的 ThreadPoolExecutor
线程池创建示例如下:
public void threadPoolDemo() throws InterruptedException {
// 核心线程数
int coreSize = 4;
// 最大线程数量
int maxSize = 8;
// 线程保持活动的时间
long keepAliveTime = 60;
// keepAliveTime 的时间单位
TimeUnit unit = TimeUnit.SECONDS;
// 保存等待执行的任务的阻塞队列
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(10);
// 线程工厂, 可通过线程工厂设置线程名字
ThreadFactory threadFactory = Executors.defaultThreadFactory();
// 线程饱和策略
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, keepAliveTime, unit,
workQueue, threadFactory, handler);
// 任务提交
for (int i = 0; i < 3; i++) {
executor.execute(new Task("Task - " + i));
}
}
四、Future类
1. 基本介绍
众所周知,线程任务在提交之后执行是异步的,因此其需要载体接收最终的执行结果,Future
类正如字面意思为未来可能的值,用于声明线程返回值。
例如在通过线程池通过 submit()
提交任务时,其返回结果即为 Future<T>
泛型,当任务执行完成之后可通过 get()
方法获取结果。
public void futureDemo() {
ExecutorService executor = Executors.newCachedThreadPool();
// submit the task
Future<String> future = executor.submit(() -> {
return "Task message from thread pool.";
});
try {
// 判断任务是否结束
boolean status = future.isDone();
System.out.println("status: " + status);
// 获取结果,进入阻塞
String result1 = future.get();
System.out.println("result1: " + result1);
// 等待后获取结果,阻塞指定时长后未取到退出
String result2 = future.get(5, TimeUnit.SECONDS);
System.out.println("result2: " + result2);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
2. 任务取消
当提交一个线程任务时可通过 cancel()
方法取消该任务,但注意其并非实时,仅是发送了一个信号通知线程取消该任务。
public void cancelDemo() throws Exception {
Future<?> future = executor.submit(() -> {
try {
int i = 0;
while (true) {
TimeUnit.MILLISECONDS.sleep(200);
System.out.println(++i);
}
} catch (Exception e) {
if (e instanceof InterruptedException) {
System.out.println("Task was Interrupt.");
}
e.printStackTrace();
}
});
TimeUnit.MILLISECONDS.sleep(1000);
boolean cancel = future.cancel(true);
System.out.println("Is cancel: " + cancel);
}
五、ForkJoinPool
1. 基本介绍
ForkJoinPool
是 Java SE 7
中引入的一个用于实现“分而治之”并行算法的线程池,通过递归式的并行任务处理,将一个大任务拆分成若干个小任务,并这些小任务放到多个处理器上并行处理,最后将结果合并。
ForkJoinPool
支持工作窃取算法,即当一个线程的任务执行完毕后会随机选择其他线程的任务进行处理,有效地利用了多核 CPU
的性能,从而以达到任务的均衡执行。
2. 方法参数
ForkJoinPool
无参构造函数创建的线程池资源核心线程数默认为 CPU
的核心数的两倍。
如果不想使用默认的核心线程数,可以通过 new ForkJoinPool(n)
指定核心线程数大小。
方法 | 描述 |
---|---|
getPoolSize() | 获取已创建启用的线程数。 |
getActiveThreadCount() | 获取执行任务中的线程数。 |
getParallelism() | 获取并行数,为 CPU 核心数两倍。 |
public void forkJoinDemo() {
// 创建默认 forkjoin 线程池
ForkJoinPool forkJoinPool = new ForkJoinPool()
for (int i = 0; i < 5; i++) {
// 提交线程任务
forkJoinPool.submit(() -> {
try {
TimeUnit.MILLISECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
3. 任务结果
与普通线程池的 Feature
类似,在 ForkJoinPool
中返回的结果为 ForkJoinTask
。
ForkJoinTask
拥有两个实现类:RecursiveTask
与 RecursiveAction
,二者最核心的区别在于后者没有返回值。
(1) RecursiveTask
RecursiveTask
所提交任务有返回值。
public void forkJoinTaskDemo() throws Exception {
ForkJoinTask<String> task1 = new RecursiveTask<String>() {
@Override
protected String compute() {
return "RecursiveTask compute.";
}
};
System.out.println("result: " + task1.get());
}
(2) RecursiveAction
RecursiveAction
所提交任务无返回值。
public void forkJoinTaskDemo() throws Exception {
ForkJoinTask<Void> task2 = new RecursiveAction() {
@Override
protected String compute() {
System.out.println("RecursiveAction compute.");
}
};
}
六、CompletableFuture
CompletableFuture
实现扩展了 Future
接口,默认使用 Fork-Join
线程池框架。
1. supplyAsync()
CompletableFuture
提供了 supplyAsync()
与 runAsync()
两种任务提交方式,二者区别在于后者没有返回值。
通过 handle()
回调方法可实现对子任务执行状态监控处理,注意使用 runAsync()
时 handle()
也只能捕获到时候异常而无返回值。
方法 | 作用 |
---|---|
get() | 获取任务执行结果,此时线程阻塞。 |
get(long, timeunit) | 获取任务执行结果,指定时间未取到则继续向下执行。 |
join() | 阻塞队列,与常见阻塞不同其同时返回任务结果。 |
handle() | 同步阻塞处理任务执行结果与异常。 |
handleAsync() | 异步非阻塞处理任务执行结果与异常。 |
public void completableFutureDemo() {
//
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
int time = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "f1: " + time;
});
// join(): block and return the value
String data = f1.join();
System.out.println(data);
f1.handle((res, e) -> {
if (e != null) {
System.out.println("Error: " + e.getMessage());
} else {
System.out.println("Successful: " + res);
}
return null;
});
System.out.println("Quit program.");
}
2. thenApply()
通过 thenApply()
关联任务,当任务一结束时才能触发任务二。
其中 res
变量为第一个线程方法执行结果。
public void thenApplyDemo() {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
System.err.println(e.getMessage());
}
return "Task-1";
});
CompletableFuture<String> future = f1.thenApply(res -> {
String.format("params = %s", res)
});
// params = Task-1
System.out.println(future.join());
}
3. thenRun()
thenRun()
与 thenApply()
类似用于关联任务,但其执行没有返回值。
public void thenRunDemo() {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
System.err.println(e.getMessage());
}
return "Task-1";
});
CompletableFuture<String> future = f1.thenApply(() -> {
System.out.println("Trigger function")
});
// null
System.out.println(future.join());
}
4. applyToEither()
applyToEither()
可同时传入两个任务,无论哪个任务任务结果都会触发任务三。
如下述示例中无论是 f1.applyToEither(f2)
还是 f2.applyToEither(f1)
最终输出的结果的 future = Task-1
,因为 f1
总是在在 f2
前执行完成。
public void applyToEitherDemo() {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
System.err.println(e.getMessage());
}
return "Task-1";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
System.err.println(e.getMessage());
}
return "Task-2";
});
CompletableFuture<String> future1 = f1.applyToEither(f2, res -> {
// res = futures[0] result
return String.format("future = %s.", res);
});
// future = Task-1
System.out.println(future1.join());
CompletableFuture<String> future2 = f2.applyToEither(f1, res -> {
// res = futures[0] result
return String.format("future = %s.", res);
});
// future = Task-1
System.out.println(future2.join());
}
5. thenCombine()
thenCombine()
与 thenApply()
类似可以关联任务定义执行顺序。
public void combineDemo() {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
System.err.println(e.getMessage());
}
return "Task-1";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
System.err.println(e.getMessage());
}
return "Task-2";
});
CompletableFuture<String> future = f1.thenCombine(f2, (res1, res2) -> {
// res1 = f1 result
// res2 = f2 result
return String.format("f1 = %s, f2 = %s ", res1, res2);
});
System.out.println(future.join());
}