Java线程池详解


当我们需要使用线程时,就需要每次手动进行创建,显然很费时费力,线程池的作用就是简化这一步骤。

你可以简单把线程池理解为出租公司,里面的每一辆出租车就是一个线程,当有顾客(任务)上门时,顾客可以直接使用(提交任务),当使用介绍后再还回(关闭线程)。

在 Java 中为此提供了四大类线程池类型,下面分别进行介绍。

一、Executor

Executor 为所有线程线程池的顶级抽象类,提供了 execute() 方法用于提交线程子任务。

public void executorDemo() {
    Executor executor = Executors.newSingleThreadExecutor();
    executor.execute(() -> {
        System.out.println("Task 1.");
    });
}

在通过线程池提供任务时若线程任务中存在状态值变更操作时一定需要注意异常的情况,因为线程池子任务在抛出异常时并不会中断主线程的执行,而没有手动进行 try catch 则会发生无法预料的结果。

如下两个线程池示例中,在 demo1() 中并无法在主线程中捕获到异常信息,而会被线程池吞掉,在 demo2() 中通过显式的捕获即可正常拦截异常信息。

@Test
public void demo1() {
    Executor executor = Executors.newSingleThreadExecutor();
    executor.execute(() -> {
        throw new RuntimeException("You can't receive this message");
    });
}

@Test
public void demo2() {
    Executor executor = Executors.newSingleThreadExecutor();
    executor.execute(() -> {
        try {
            throw new RuntimeException("You can receive this message");
        } catch (Exception e) {
            System.out.println("Info" + e.getMessage());
        }
    });
}

二、ExecutorService

1. 基本介绍

ExecutorService 扩展了 Executor 接口, 支持有返回值的线程与管理线程生命周期。即可通过 submit()invokeAny() 提交 RunnableCallable 两类有返回值线程任务。

同时 ExecutorService 提供了 shutdown()、shutdownNow()、isShutdown() 等方法用于控制线程池的启停,当关闭线程池后无法继续提交线程任务,但已提交的线程任务仍然继续执行,待所有任务完成后则释放线程池资源。

2. 创建示例

(1) 单线程池

通过 newSingleThreadExecutor() 创建线程数为一的线程池,当提前的任务大于 1 ,其余任务将进入等待。

如下示例中每间隔 2 秒才会打印一次。

public void demo1() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    for (int i = 0; i < 5; i++) {
        int finalI = i;
        executor.submit(() -> {
            System.out.println("Task-" + finalI + " running.");
            try {
                 TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
    }
}  
(2) 固定线程池

通过 newFixedThreadPool(n) 创建指定线程数的线程池,线程池大小在初始化声明之后便无法改变。

如下示例中每间隔 2 秒打印 3 条信息。

public void demo2() {
    ExecutorService executor = Executors.newFixedThreadPool(3);
    for (int i = 0; i < 6; i++) {
        int finalI = i;
        executor.submit(() -> {
            System.out.println("Task-" + finalI + " running.");
            try {
                 TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
    }

    // 关闭线程池
    executor.shutdown();
}  
(3) 自适应线程池

通过 newCachedThreadPool() 创建自适应线程数连接池,根据提交的任务数量自行增减。

如下示例中将一次性打印 5 条信息。

public void demo3() {
    ExecutorService executor = Executors.newCachedThreadPool();
    for (int i = 0; i < 5; i++) {
        int finalI = i;
        executor.submit(() -> {
            System.out.println("Task-" + finalI + " running.");
            try {
                 TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
    }
}

三、定时线程池

ScheduledExecutorServiceExecutorService 的基础上再次进行扩展,可用于创建定时线程任务资源池。

1. 创建示例

通过 schedule() 方法即可提交定时线程任务,下面通过示例进行详细介绍。

通过 newScheduledThreadPool() 创建定时任务线程池资源。

public void scheduledPoolDemo1() throws Exception {
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    // 当前时间 3 秒后执行一次
    int interval = 3;
    executor.schedule(() -> {
        System.out.println("Task-1 running.");
    }, interval, TimeUnit.SECONDS);
    
    // 等待子线程结果
    TimeUnit.SECONDS.sleep(6);
}

2. 线程重复

除了延迟触发,也可通过 scheduleAtFixedRate()scheduleWithFixedDelay() 设置线程任务定期往复执行,

public void scheduledPoolDemo2() {
    int start = 3;
    int interval = 3;
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    // 当前时间 1 秒后执行, 且每隔 3 秒重复执行
    // (间隔时间:上一次任务开始时计时)
    executor.scheduleAtFixedRate(new Task("fixed-rate"), start, interval, TimeUnit.SECONDS);

    // 当前时间 1 秒后执行, 上一任务执行完成 3 秒后重复执行
    // (间隔时间:上一次任务结束时计时)
    executor.scheduleWithFixedDelay(new Task("fixed-delay"), start, interval, TimeUnit.SECONDS);
    executor.shutdown();
}

需要注意 scheduleAtFixedRate scheduleWithFixedDelay 的区别。

  • scheduleAtFixedRate()
    间隔时间是从上一个任务开始时计算,无论上个任务是否已经结束。
  • scheduleWithFixedDelay()
    间隔时间是从上个任务完成时开始计算,只有当上个任务结束才会开始计时。
    scheduleAtFixedRate 与 scheduleWithFixedDelay

四、ThreadPoolExecutor

1. 核心参数

ThreadPoolExecutor 是开发中最为常用的线程资源池,其提供一系列初始化参数用于控制线程池的资源占用,下面重点进行介绍。

方法 作用
getPoolSize() 获取线程池当前的线程数量。
getActiveCount() 获取当前线程池中正在执行任务的线程数量。
getTaskCount() 获取线程池已经执行的和未执行的任务总数。
getCompletedTaskCount() 获取线程池已完成的任务数量,该值小于等于。
getLargestPoolSize() 获取线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。

2. 执行流程

当在代码中向线程池提交一个线程任务时,系统将会根据下述流程决定线程的创建销毁与否。

(Ⅰ) 判断线程池中当前存活线程数量是否达到了 corePoolSize

  • 达到 :进入下一步。
  • 未达到 :新建线程运行此任务,且任务结束后将该线程保留在线程池中,不做销毁处理。

(Ⅱ) 判断工作队列 (queueCapacity) 是否已满。

  • 已满 :进入下一步。
  • 未满 :将新的任务提交到工作队列中进入等待。

(Ⅲ) 判断线程池中存活的线程数量是否达到了 maxPoolSize

  • 达到 :使用 饱和策略 来处理这个任务。
  • 未达到 :新建一个工作线程来执行这个任务。

需要注意一点:在线程池中的线程数量超过 corePoolSize 时,每当有线程的空闲时间超过了 keepAliveTime ,这个线程就会被终止,直到线程池中线程的数量不大于 corePoolSize 为止。

3. 阻塞队列

线程池的等待队列用于保存待执行的任务的阻塞队列,包含以下四类:

(Ⅰ) ArrayBlockingQueue

有界阻塞队列。基于数组的先进先出队列 (FIFO),创建时必须指定大小。

(Ⅱ) LinkedBlockingQueue

无界阻塞队列,基于链表的先进先出队列 (FIFO),如果创建时没有指定此队列大小,则默认为 Integer.MAX_VALUE

吞吐量通常要高于 ArrayBlockingQueueExecutors.newFixedThreadPool 使用了这个队列。若使用 LinkedBlockingQueuemaximumPoolSize 将不起作用,线程池能创建的最大线程数为 corePoolSize ,因为任务等待队列是无界队列。

(Ⅲ) SynchronousQueue

不会保存提交的任务,而是将直接新建一个线程来执行新来的任务,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。

SynchronousQueue 吞吐量通常要高于 LinkedBlockingQueueExecutors.newCachedThreadPool 使用了这个队列。

(Ⅳ) PriorityBlockingQueue

PriorityBlockingQueue 是具有优先级的无界阻塞队列。

4. 饱和策略

通过饱和策略用于管理当线程资源已满时如何处理新提交的线程任务。

策略 描述
AbortPolicy 中止策略,默认的饱和策略,该策略将抛出未检查的 RejectedExecutionException。
DiscardPolicy 抛弃策略,会悄悄抛弃该任务,当新提交的任务无法保存到队列中等待执行时。
DiscardOldestPolicy 抛弃最旧的策略,会抛弃下一个将被执行的任务,然后尝试重新提交新的任务。
CallerRunsPolicy 该策略既不会抛弃任务,也不会抛出异常,而是由调用线程(提交任务的线程)处理该任务。

5. 创建示例

具体的 ThreadPoolExecutor 线程池创建示例如下:

public void threadPoolDemo() throws InterruptedException {
    // 核心线程数
    int coreSize = 4;
    // 最大线程数量
    int maxSize = 8;
    // 线程保持活动的时间
    long keepAliveTime = 60;
    // keepAliveTime 的时间单位
    TimeUnit unit = TimeUnit.SECONDS;
    // 保存等待执行的任务的阻塞队列
    BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(10);
    // 线程工厂, 可通过线程工厂设置线程名字
    ThreadFactory threadFactory = Executors.defaultThreadFactory();
    // 线程饱和策略
    RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, keepAliveTime, unit,
                workQueue, threadFactory, handler);
    for (int i = 0; i < 3; i++) {
        executor.execute(new Task("Task - " + i));
    }
}

五、ForkJoinPool

1. 基本介绍

ForkJoinPoolJava SE 7 中引入的一个用于实现“分而治之”并行算法的线程池。它可以处理递归式的并行任务,将一个大任务拆分成若干个小任务,将这些小任务放到多个处理器上并行处理,最后将结果合并。

ForkJoinPool 支持工作窃取算法,即当一个线程的任务执行完毕后会随机选择其他线程的任务进行处理,从而以达到任务的均衡执行,有效地利用了多核 CPU 的性能,因此被广泛应用于大规模的并行计算和多线程编程领域。

2. 创建示例

ForkJoinPool 无参构造函数创建的线程池资源核心线程数默认为 CPU 的核心数的两倍。

如果不想使用默认的核心线程数,可以通过 new ForkJoinPool(n) 指定核心线程数大小。

public void forkJoinDemo() {
    // 创建默认 forkjoin 线程池
    ForkJoinPool forkJoinPool = new ForkJoinPool()
    for (int i = 0; i < 5; i++) {
        forkJoinPool.submit(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    // 线程池基本信息获取
    int size = forkJoinPool.getPoolSize();
    System.out.println("size: " + size);
    int active = forkJoinPool.getActiveThreadCount();
    System.out.println("active: " + active);
    long count = forkJoinPool.getStealCount();
    System.out.println("stealCount: " + count);
    long parallelism = forkJoinPool.getParallelism();
    System.out.println("parallelism: " + parallelism);
}

3. 线程任务

普通线程任务实现了 RunnableCallable 接口,而 RecursiveTaskRecursiveTask 继承自 ForkJoinTask , 又因 ForkJoinTask 实现了 FutureComparable 接口,故可获得异步计算的结果和比较操作。

RecursiveTaskRecursiveTask 可通过调用 fork() 方法启动新的子任务,然后调用 join() 方法等待子任务的结果并将其合并,可以更好地利用多核 CPU 的优势。

(1) RecursiveTask

RecursiveTask 所提交任务有返回值。

public void forkJoinTaskDemo() throws Exception {
    ForkJoinTask<String> task1 = new RecursiveTask<String>() {
        @Override
        protected String compute() {
            System.out.println("RecursiveTask compute.");
            return "RecursiveTask compute.";
        }
    };
}
(2) RecursiveAction

RecursiveAction 所提交任务无返回值。

public void forkJoinTaskDemo() throws Exception {
    ForkJoinTask<Void> task2 = new RecursiveAction() {
        @Override
        protected String compute() {
            System.out.println("RecursiveAction compute.");
        }
    };
}

六、Future类

1. 基础示例

Future 类正如字面意思为未来可能的值,用于声明线程返回值。

其同样提供了一系列接口方法,相关示例如下:

public void futureDemo() {
    ExecutorService executor = Executors.newCachedThreadPool();
    // submit the task
    Future<String> future = executor.submit(() -> {
        System.out.println("Task message from thread pool.");
    });

    try {
        // 判断任务是否结束
        boolean status = future.isDone();
        System.out.println("status: " + status);

        // 获取结果,进入阻塞
        String result1 = future.get();
        System.out.println("result1: " + result1);

        // 等待后获取结果,阻塞指定时长后未取到退出
        String result2 = future.get(5, TimeUnit.SECONDS);
        System.out.println("result2: " + result2);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

2. 任务取消

当提交一个线程任务时可通过 cancel() 方法取消该任务,但注意其并非实时,仅是发送了一个信号通知线程取消该任务。

public void cancelDemo() throws Exception {
    Future<?> future = executor.submit(() -> {
        try {
            int i = 0;
            while (true) {
                TimeUnit.MILLISECONDS.sleep(200);
                System.out.println(++i);
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                System.out.println("Task was Interrupt.");
            }
            e.printStackTrace();
        }
    });
    TimeUnit.MILLISECONDS.sleep(1000);
    boolean cancel = future.cancel(true);
    System.out.println("Is cancel: " + cancel);
}

七、CompletableFuture

CompletableFuture 实现扩展了 Future 接口,默认使用 Fork-Join 线程池框架。

1. supplyAsync()

CompletableFuture 提供了 supplyAsync()runAsync() 两种任务提交方式,二者区别在于后者没有返回值。

通过 handle() 回调方法可实现对子任务执行状态监控处理,注意使用 runAsync()handle() 也只能捕获到时候异常而无返回值。

方法 作用
get() 获取任务执行结果,此时线程阻塞。
get(long, timeunit) 获取任务执行结果,指定时间未取到则继续向下执行。
join() 阻塞队列,与常见阻塞不同其同时返回任务结果。
handle() 同步阻塞处理任务执行结果与异常。
handleAsync() 异步非阻塞处理任务执行结果与异常。
public void completableFutureDemo() {
    // 
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
        int time = new Random().nextInt(3);
        try {
            TimeUnit.SECONDS.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "f1: " + time;
    });
    
    // join(): block and return the value
    String data = f1.join();
    System.out.println(data);

    f1.handle((res, e) -> {
        if (e != null) {
            System.out.println("Error: " + e.getMessage());
        } else {
            System.out.println("Successful: " + res);
        }
        return null;
    });
    System.out.println("Quit program.");
}

2. thenApply()

通过 thenApply() 关联任务,当任务一结束时才能触发任务二。

其中 res 变量为第一个线程方法执行结果。

public void thenApplyDemo() {
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            System.err.println(e.getMessage());
        }
        return "Task-1";
    });
    CompletableFuture<String> future = f1.thenApply(res -> {
        String.format("params = %s", res)
    });
    // params = Task-1
    System.out.println(future.join());
}

3. thenRun()

thenRun()thenApply() 类似用于关联任务,但其执行没有返回值。

public void thenRunDemo() {
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            System.err.println(e.getMessage());
        }
        return "Task-1";
    });
    CompletableFuture<String> future = f1.thenApply(() -> {
        System.out.println("Trigger function")
    });
    // null
    System.out.println(future.join());
}

4. applyToEither()

applyToEither() 可同时传入两个任务,无论哪个任务任务结果都会触发任务三。

如下述示例中无论是 f1.applyToEither(f2) 还是 f2.applyToEither(f1) 最终输出的结果的 future = Task-1,因为 f1 总是在在 f2 前执行完成。

public void applyToEitherDemo() {
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            System.err.println(e.getMessage());
        }
        return "Task-1";
    });
    CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            System.err.println(e.getMessage());
        }
        return "Task-2";
    });


    CompletableFuture<String> future1 = f1.applyToEither(f2, res -> {
        // res = futures[0] result
        return String.format("future = %s.", res);
    });
    // future = Task-1
    System.out.println(future1.join());

    CompletableFuture<String> future2 = f2.applyToEither(f1, res -> {
        // res = futures[0] result
        return String.format("future = %s.", res);
    });
    // future = Task-1
    System.out.println(future2.join());
}

5. thenCombine()

thenCombine()thenApply() 类似可以关联任务定义执行顺序。

public void combineDemo() {
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            System.err.println(e.getMessage());
        }
        return "Task-1";
    });
    CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            System.err.println(e.getMessage());
        }
        return "Task-2";
    });

    CompletableFuture<String> future = f1.thenCombine(f2, (res1, res2) -> {
        // res1 = f1 result
        // res2 = f2 result
        return String.format("f1 = %s, f2 = %s ", res1, res2);
    });
    System.out.println(future.join());
}

文章作者: 烽火戏诸诸诸侯
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 烽火戏诸诸诸侯 !
  目录