Java道经第1卷 - 第7阶 - 并发编程(五)
传送门:JB1-7-并发编程(一)
传送门:JB1-7-并发编程(二)
传送门:JB1-7-并发编程(三)
传送门:JB1-7-并发编程(四)
传送门:JB1-7-并发编程(五)
文章目录
S09. 线程池
E01. 线程池基础概念
心法:阿里巴巴 Java 开发手册中指出,线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,一方面是线程的创建更加规范,可以合理控制开辟线程的数量,一方面线程的细节管理交给线程池处理,优化了资源的开销。
线程池使用场景:一般情况下,一个请求需要至少开启一个线程来执行具体的请求内容,若请求数量特别多,且请求内容处理的时间非常短,则会造成频繁的创建和销毁线程的过程,系统开销大,此时建议使用线程池来解决该问题。
线程池优点:
- 线程池可以维护多个线程的生命周期。
- 线程池可以通过线程复用来提高线程的利用率。
- 线程池可以提前创建好一批线程,在请求到达时线程直接获取,避免了线程的频繁创建和销毁。
- 线程池中可以适当地调整线程数,以对请求进行限流,从而可以防止资源不足。
线程池缺点:
- 线程池本身也是需要维护的,当线程数量特别少,或者执行时间特别长的时候,不建议使用。
- 线程池容易遭受的并发风险,比如同步错误,与池有关的死锁,资源不足和线程泄漏。
线程池使用流程:
- 创建一个线程池,并进行基础的配置:
- 可以通过
ThreadPoolExecutor
类来创建,推荐。 - 可以通过
Executors
类来创建,是前者的上层封装,不灵活,不推荐。
- 可以通过
- 将 Runnable 或 Callable 线程任务提交到线程池中。
- 线程池自动为该任务分配线程并负责该线程从启动,运行到销毁的全部生命周期活动。
线程池任务提交相关 API 方法:以下两个提交任务的方法均不会阻塞当前所在线程:
threadPool.execute(线程任务)
:- 提交 Runnable 任务到线程池,无返回值。
- 该方法不支持提交 Callable 任务到线程池。
threadPool.submit(线程任务)
:- 提交 Runnable 任务到线程池,返回一个
Future<?>
实例,实例中数据为空。 - 提交 Callable 任务到线程池,返回一个
Future<返回值类型>
实例。
- 提交 Runnable 任务到线程池,返回一个
threadPool.shutdown()
:不再新增线程,发出停止信号,等所有线程执行完毕,关闭线程池,节省资源。threadPool.shutdownNow()
:不再新增线程,立即关闭线程池,节省资源。
1. Runnable线程任务
心法:Runnable 函数式线程接口,内部仅包含一个
run()
方法,无返回值,不抛出异常。
Runnable 和线程池配合使用流程如下:
武技:测试将 Runnable 任务提交到线程池
package pool;
public class RunnableThreadPoolTest {
@SneakyThrows
@Test
public void testExecute() {
// 创建一个可缓存的线程池
ExecutorService threadPool = Executors.newCachedThreadPool();
System.out.println("0s: 线程池创建完毕");
// 创建一个Runnable任务
Runnable runnableTask = () -> {
try {
System.out.println("0s: runnable任务睡眠");
TimeUnit.SECONDS.sleep(2L);
System.out.println("2s: runnable任务执行完毕");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};
// 将Runnable任务异步提交到线程池中,该方法不会阻塞主线程
threadPool.execute(runnableTask);
System.out.println("0s: 若我立刻输出,则证明 execute 方法不阻塞主线程");
// 不再新增线程,发出停止信号,等所有线程执行完毕,关闭线程池,节省资源
threadPool.shutdown();
System.out.println("2s: 关闭线程池");
System.out.println("2s: 主线程退出");
}
}
2. Callable线程任务
心法:Callable 函数式线程接口,内部仅包含一个
call()
方法,有返回值,会抛出 Exception 异常。
Callable 和线程池配合使用流程如下:
武技:测试将 Callable 任务提交到线程池
package pool;
public class CallableThreadPoolTest {
@SneakyThrows
@Test
public void testSubmit() {
// 创建一个可缓存的线程池
ExecutorService threadPool = Executors.newCachedThreadPool();
System.out.println("0s: 线程池创建完毕");
// 创建一个Callable任务
Callable<Integer> callableTask = () -> {
System.out.println("0s: callable任务睡眠");
TimeUnit.SECONDS.sleep(2L);
System.out.println("2s: callable任务执行完毕");
return 9527;
};
// 将Callable任务异步提交到线程池中,该方法不会阻塞主线程
Future<Integer> callableResult = threadPool.submit(callableTask);
System.out.println("0s: 若我立刻输出,则证明 submit 方法不阻塞");
// 获取任务方法的返回值,该方法会阻塞主线程,直到获取到结果
System.out.println("2s: 获取到call方法返回值 -> " + callableResult.get());
System.out.println("2s: 若我立刻输出,则证明 get 方法不阻塞");
// 不再新增线程,发出停止信号,等所有线程执行完毕,关闭线程池,节省资源
threadPool.shutdown();
System.out.println("2s: 关闭线程池");
System.out.println("2s: 主线程退出");
}
}
3. FutureTask
心法:j.u.c.FutureTask 类既实现了 Runnable 接口,又实现了 Future 接口,所以它既能作为一个线程类,又能直接存储 Callable 任务的结果。
FutureTask 相关 API 方法:
new FutureTask<>(Callable任务)
:创建一个 FutureTask 实例。new Thread(futureTask)
:FutureTask 可以直接作为一个实现了 Runnable 接口的线程类。futureTask.get()
:获取任务方法的返回值,该方法会阻塞主线程,直到获取到结果。
武技:测试 FutureTask 类
package pool;
/** @author 周航宇 */
public class FutureTaskTest {
@SneakyThrows
@Test
public void testFutureTask() {
System.out.println("0s: 子线程启动");
// 创建一个FutureTask实例,参数是一个Callable任务
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
System.out.println("0s: Callable任务睡眠");
TimeUnit.SECONDS.sleep(2L);
System.out.println("2s: Callable任务结束,返回9527");
System.out.println("2s: 子线程退出");
return 9527;
});
// futureTask实现了Runnable接口,是一个线程类
new Thread(futureTask).start();
// futureTask实现了Future接口,线程体方法支持返回值
// 获取线程中call()的返回值,这个方法会阻塞主线程,直到获取到结果
Integer callableResult = futureTask.get();
System.out.println("2s: call方法的返回值: " + callableResult);
System.out.println("2s: 主线程退出");
}
}
4. CompletableFuture
心法:j.u.c.CompletableFuture 类可以对多个任务进行异步执行和统一管理。
CompletableFuture 常用 API 方法:
CompletableFuture.supplyAsync(线程任务)
:异步执行一个 Callable 线程任务,要返回一个Future<返回值类型>
结果。CompletableFuture.runAsync(线程任务)
:异步执行一个 Runnable 线程任务,即使没有返回值,也要返回一个Future<Void>
结果。CompletableFuture.allOf(FutureA, FutureB ...)
:汇总所有的 Future 结果,得到一个新的Future<Void>
结果。
武技:测试 CompletableFuture 异步调用耗时情况
- 开发 6 个任务方法,3 个带返回值,3 个不带返回值:
package pool;
/** @author 周航宇 */
public class CompletableFutureTest {
@SneakyThrows
private String taskA() {
TimeUnit.SECONDS.sleep(1L);
System.out.println("1s: taskA执行完毕");
return "A";
}
@SneakyThrows
private String taskB() {
TimeUnit.SECONDS.sleep(2L);
System.out.println("2s: taskB执行完毕");
return "B";
}
@SneakyThrows
private String taskC() {
TimeUnit.SECONDS.sleep(3L);
System.out.println("3s: taskC执行完毕");
return "C";
}
@SneakyThrows
private void taskD() {
TimeUnit.SECONDS.sleep(1L);
System.out.println("1s: taskD执行完毕");
}
@SneakyThrows
private void taskE() {
TimeUnit.SECONDS.sleep(2L);
System.out.println("2s: taskE执行完毕");
}
@SneakyThrows
private void taskF() {
TimeUnit.SECONDS.sleep(3L);
System.out.println("3s: taskF执行完毕");
}
}
- 测试同步调用 6 个任务时的总耗时:
package pool;
/** @author 周航宇 */
public class CompletableFutureTest {
/** 同步执行6个任务,计算总耗时 */
@SneakyThrows
@Test
public void testTask() {
long start = System.currentTimeMillis();
System.out.println("1s: taskA返回值: " + taskA());
System.out.println("2s: taskB返回值: " + taskB());
System.out.println("3s: taskC返回值: " + taskC());
taskD();
taskE();
taskF();
long end = System.currentTimeMillis();
System.out.println("耗时:" + (end - start));
}
}
- 测试使用 CompletableFuture 异步调用 6 个任务的总耗时:
package pool;
/** @author 周航宇 */
public class CompletableFutureTest {
/** 使用CompletableFuture异步执行6个任务,计算总耗时 */
@SneakyThrows
@Test
public void testTaskByCompletableFuture() {
long start = System.currentTimeMillis();
// supplyAsync(): 执行三个带返回值的异步任务
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(this::taskA);
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(this::taskB);
CompletableFuture<String> futureC = CompletableFuture.supplyAsync(this::taskC);
// runAsync(): 执行三个不带返回值的异步任务
CompletableFuture<Void> futureD = CompletableFuture.runAsync(this::taskD);
CompletableFuture<Void> futureE = CompletableFuture.runAsync(this::taskE);
CompletableFuture<Void> futureF = CompletableFuture.runAsync(this::taskF);
// 任务汇总: 当六个future全部完成之后,汇总到一个新的future实例中
CompletableFuture<Void> future = CompletableFuture.allOf(
futureA, futureB, futureC, futureD, futureE, futureF);
// 让任务汇总工作插主线程的队
future.join();
// 当futureA/futureB/futureC任务均执行完毕后,获取其返回值
if (futureA.isDone() && futureB.isDone() && futureC.isDone()) {
System.out.println("taskA返回值: " + futureA.get());
System.out.println("taskB返回值: " + futureB.get());
System.out.println("taskC返回值: " + futureC.get());
}
long end = System.currentTimeMillis();
// 计算总耗时
System.out.println("耗时:" + (end - start));
}
}
E02. ThreadPoolExecutor
心法:j.u.c.ThreadPoolExecutor 用于创建线程池,每个线程池内部都维护了一个 HashSet 结构的线程集合和一个任务队列。
1. 线程池核心参数
心法 ThreadPoolExecutor 构造器拥有 7 个参数。
参数具体如下:
int corePoolSize
: 核心线程数,线程池维护的最少线程数量,即使空闲也不会给归还给 OS。int maximumPoolSize
:线程池维护的最多线程数量,即当核心线程不够了,最大能拓展到多少。long keepAliveTime
:非核心线程所允许的最大的空闲时间,某个线程的空闲时间如果超过此指定值,会被归还给 OS。TimeUnit unit
:最大的空闲时间单位。BlockingQueue<Runnable> workQueue
:线程池所使用的工作队列类型。ThreadFactory threadFactory
:线程工厂,用于产生线程,可以自定义,默认使用DefaultThreadFactory 类。RejectedExecutionHandler handler
:线程池对拒绝任务采取的拒绝策略,默认使用 AbortPolicy 策略类。
武技:测试通过 ThreadPoolExecutor 创建线程池
package pool;
/** @author 周航宇 */
public class ThreadPoolExecutorTest {
@SneakyThrows
@Test
public void testThreadPoolExecutor() {
// 创建一个线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
// 核心线程数,线程池维护的最少线程数量,即使空闲也不会给归还给OS
2,
// 线程池维护的最多线程数量,即当核心线程不够了,最大能拓展到多少
4,
// 非核心线程所允许的最大的空闲时间,某个线程的空闲时间如果超过此指定值,会被归还给OS
3, TimeUnit.SECONDS,
// 线程池所使用的工作队列类型,即BlockingQueue类型
new ArrayBlockingQueue<>(3),
// 线程工厂,用于产生线程,默认使用DefaultThreadFactory
Executors.defaultThreadFactory(),
// 线程池对拒绝任务采取的拒绝策略,默认使用AbortPolicy
new ThreadPoolExecutor.AbortPolicy()
);
// Runnable线程任务使用 execute() 提交到线程池中
threadPool.execute(() -> System.out.println("runnable.."));
TimeUnit.SECONDS.sleep(2L);
// Callable线程任务使用 submit() 提交到线程池中
Future<Integer> future = threadPool.submit(() -> {
System.out.println("callable..");
return 9527;
});
// 获取Callable任务实例的返回值
System.out.println(future.get());
// 关闭线程池
threadPool.shutdown();
}
}
2. 线程池处理任务流程
心法:ThreadPoolExecutor 接收任务整体过程中,若某个线程执行完任务,则会优先从等待队列中拉取任务继续执行,而新来的任务继续进入等待队列。
流程模拟:假设我们设置核心线程数为 2,最大线程数为 4,任务队列最大值为 2,那么接收任务的流程如下图:
3. 核心参数设计原则
心法:CPU 密集型任务多就用小线程池(避免过度线程切换,加重 CPU 消耗),IO 密集型任务多就用大线程池(让 CPU 在等待 IO 时,有其他线程去处理别的任务,充分利用 CPU 时间)。
CPU 密集型任务:CPU 利用率很高,但 IO 读写次数很低,比如数据加密,文件压缩,复杂计算等。
IO 密集型任务:CPU 利用率很低,但 IO 读写很耗时,比如文件读写,数据库通信,网络通信等。
ThreadPoolExecutor 中核心参数设计的公式:假设执行单个任务平均耗时 0.1 秒,系统最大产生 1000 个任务,每秒平均产生 100 个任务,若需要这 100 个任务可以在 1 秒内完成,则:
- 核心线程数:
总时间 * 平均任务数 * 单个任务平均耗时
,即1 * 100 * 0.1 = 10
个。 - 队列容量:
核心线程数 / 单个任务平均耗时 * 2
,即10 / 0.1 * 2 = 200
个。 - 最大线程数:
(最大任务数 - 队列容量) * 单个任务平均耗时
,即(1000 - 200) * 0.1 = 80
个。
4. CachedThreadPool
心法:CachedThreadPool 是 Executors 框架中的内置线程池,底层使用 ThreadPoolExecutor 工具。
CachedThreadPool 基本参数如下:
- 核心线程数:0
- 最大线程数:Integer 最大值,容易 OOM。
- 最大空闲时间:60 秒
- 工作队列:SynchronousQueue
武技:测试 CachedThreadPool 内置线程池
package pool;
/** @author 周航宇 */
public class CachedThreadPoolTest {
@SneakyThrows
@Test
public void testCachedThreadPool() {
// 创建一个缓存线程池
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0, j = 10; i < j; i++) {
TimeUnit.SECONDS.sleep(1L);
executorService.execute(() -> System.out.println("hello!"));
}
TimeUnit.SECONDS.sleep(20L);
}
}
5. FixedThreadPool
心法:FixedThreadPool 是 Executors 框架中的内置线程池,底层使用 ThreadPoolExecutor 工具。
FixedThreadPool 基本参数如下:
- 核心线程数:需要在构造时指定。
- 最大线程数:与核心线程数一致。
- 最大空闲时间:0 秒
- 工作队列:LinkedBlockingQueue,最大容量 Integer 最大值,容易 OOM。
武技:测试 FixedThreadPool 内置线程池
package pool;
/** @author 周航宇 */
public class FixedThreadPoolTest {
@SneakyThrows
@Test
public void testFixedThreadPool() {
// 创建一个固定线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
// 每隔1秒提交一个任务到线程池中
for (int i = 0, j = 10; i < j; i++) {
TimeUnit.SECONDS.sleep(1L);
executorService.execute(() -> System.out.println("hello!"));
}
TimeUnit.SECONDS.sleep(20L);
}
}
6. ScheduledThreadPool
心法:ScheduledThreadPool 是 Executors 框架中的内置线程池,底层使用 ThreadPoolExecutor 工具。
ScheduledThreadPool 基本参数如下:
- 核心线程数:需要在构造时指定。
- 最大线程数:Integer 最大值,容易 OOM。
- 最大空闲时间:10 毫秒
- 工作队列:DelayedWorkQueue
武技:测试 ScheduledThreadPool 内置线程池
package pool;
/** @author 周航宇 */
public class ScheduledThreadPoolTest {
@SneakyThrows
@Test
public void testSchedule() {
// 获取当前时间中的秒
System.out.println(Calendar.getInstance().get(Calendar.SECOND));
// 创建一个定时线程池
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
// 延迟2秒后执行任务
executorService.schedule(() -> {
System.out.println(Calendar.getInstance().get(Calendar.SECOND));
}, 2, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(20L);
}
@SneakyThrows
@Test
public void testScheduleAtFixedRate() {
// 获取当前时间中的秒
System.out.println(Calendar.getInstance().get(Calendar.SECOND));
// 创建一个定时线程池
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
// 延迟2秒后执行任务,然后从上一个任务开始执行时始计算,每隔3秒执行一次
executorService.scheduleAtFixedRate(() -> {
try {
TimeUnit.SECONDS.sleep(2L);
System.out.println(Calendar.getInstance().get(Calendar.SECOND));
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 2, 3, TimeUnit.SECONDS);
System.out.println(System.in.read());
}
@SneakyThrows
@Test
public void testScheduleWithFixedDelay() {
// 获取当前时间中的秒
System.out.println(Calendar.getInstance().get(Calendar.SECOND));
// 创建一个定时线程池
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
// 延迟2秒后执行任务,然后从上一个任务执行结束时始计算,每隔3秒执行一次
executorService.scheduleWithFixedDelay(() -> {
try {
TimeUnit.SECONDS.sleep(2L);
System.out.println(Calendar.getInstance().get(Calendar.SECOND));
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 2, 3, TimeUnit.SECONDS);
System.out.println(System.in.read());
}
}
7. SingleThreadExecutor
心法:SingleThreadExecutor 是 Executors 框架中的内置线程池,底层使用 ThreadPoolExecutor 工具。
SingleThreadExecutor 基本参数如下:
- 核心线程数:1
- 最大线程数:1
- 最大空闲时间:0 秒
- 工作队列:LinkedBlockingQueue,最大容量 Integer 最大值,容易 OOM。
武技:测试 SingleThreadExecutor 内置线程池
package pool;
/** @author 周航宇 */
public class SingleThreadExecutorTest {
@SneakyThrows
@Test
public void testSingleThreadExecutor() {
// 创建单线程线程池
ExecutorService executorService = Executors.newSingleThreadExecutor();
// 每隔1秒提交一个任务到线程池中
for (int i = 0, j = 10; i < j; i++) {
TimeUnit.SECONDS.sleep(1L);
executorService.execute(() -> System.out.println("hello!"));
}
TimeUnit.SECONDS.sleep(20L);
}
}
8. 线程池拒绝策略
心法:线程池拒绝策略指的是当总线程数已达最大时,对新申请提交的任务的拒绝方式。
ThreadPoolExecutor 内置的拒绝策略包含以下四种:
threadPoolExecutor.AbortPolicy()
:直接抛出异常,阻止系统正常工作。 2.threadPoolExecutor.DiscardPolicy()
:啥事都不干,直接把任务丢弃。 3.threadPoolExecutor.DiscardOldestPolicy()
:丢弃等待队列中的第一个任务(最老),让新来的任务入队。threadPoolExecutor.CallerRunsPolicy()
:只要线程池没有关闭,该策略直接在调用者线程中执行当前被丢弃的任务,即哪个线程调用了execute()/submit()
方法,哪个线程执行这个任务。
武技:测试 ThreadPoolExecutor 内置的拒绝策略
- 开发测试类:测试 AbortPolicy 策略:
package pool;
/** @author 周航宇 */
public class AbortPolicyTest {
@ToString
private static class MyTask implements Runnable {
private int i;
private MyTask(int i) {
this.i = i;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + ": " + i);
TimeUnit.SECONDS.sleep(10L);
} catch (Exception e) {
e.printStackTrace();
}
}
}
@SneakyThrows
@Test
public void testAbortPolicy() {
// 创建线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2, 4, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(4),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
// 提交8个任务,将线程池中的核心线程,工作队列和拓展线程都占满
for (int i = 0, j = 8; i < j; i++) {
threadPool.execute(new MyTask(i));
}
// 查看工作队列
System.out.println("queue:" + threadPool.getQueue());
// 再次提交一个任务
System.out.println("再次提交一个100号任务");
threadPool.execute(new MyTask(100));
// 查看工作队列
System.out.println("queue:" + threadPool.getQueue());
// 关闭线程池
threadPool.shutdown();
// 阻止主线程
TimeUnit.SECONDS.sleep(20L);
}
}
- 开发测试类:测试 DiscardPolicy 策略:
package pool;
/** @author 周航宇 */
public class DiscardPolicyTest {
@ToString
private static class MyTask implements Runnable {
private int i;
private MyTask(int i) {
this.i = i;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + ": " + i);
TimeUnit.SECONDS.sleep(10L);
} catch (Exception e) {
e.printStackTrace();
}
}
}
@SneakyThrows
@Test
public void testDiscardPolicy() {
// 创建线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2, 4, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(4),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy());
// 提交8个任务,将线程池中的核心线程,工作队列和拓展线程都占满
for (int i = 0, j = 8; i < j; i++) {
threadPool.execute(new MyTask(i));
}
// 查看工作队列
System.out.println("queue:" + threadPool.getQueue());
// 再次提交一个任务
System.out.println("再次提交一个100号任务");
threadPool.execute(new MyTask(100));
// 查看工作队列
System.out.println("queue:" + threadPool.getQueue());
// 关闭线程池
threadPool.shutdown();
// 阻止主线程
TimeUnit.SECONDS.sleep(20L);
}
}
- 开发测试类:测试 DiscardOldestPolicy 策略:
package pool;
/** @author 周航宇 */
public class DiscardOldestPolicyTest {
@ToString
private static class MyTask implements Runnable {
private int i;
private MyTask(int i) {
this.i = i;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + ": " + i);
TimeUnit.SECONDS.sleep(10L);
} catch (Exception e) {
e.printStackTrace();
}
}
}
@SneakyThrows
@Test
public void testDiscardOldestPolicy() {
// 创建线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2, 4, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(4),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
// 提交8个任务,将线程池中的核心线程,工作队列和拓展线程都占满
for (int i = 0, j = 8; i < j; i++) {
threadPool.execute(new MyTask(i));
}
// 查看工作队列
System.out.println("queue:" + threadPool.getQueue());
// 再次提交一个任务
System.out.println("再次提交一个100号任务");
threadPool.execute(new MyTask(100));
// 查看工作队列
System.out.println("queue:" + threadPool.getQueue());
// 关闭线程池
threadPool.shutdown();
// 阻止主线程
TimeUnit.SECONDS.sleep(20L);
}
}
- 开发测试类:测试 CallerRunsPolicy 策略:
package pool;
/** @author 周航宇 */
public class CallerRunsPolicyTest {
@ToString
private static class MyTask implements Runnable {
private int i;
private MyTask(int i) {
this.i = i;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + ": " + i);
TimeUnit.SECONDS.sleep(10L);
} catch (Exception e) {
e.printStackTrace();
}
}
}
@SneakyThrows
@Test
public void testCallerRunsPolicy() {
// 创建线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2, 4, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(4),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
// 提交8个任务,将线程池中的核心线程,工作队列和拓展线程都占满
for (int i = 0, j = 8; i < j; i++) {
threadPool.execute(new MyTask(i));
}
// 查看工作队列
System.out.println("queue:" + threadPool.getQueue());
// 再次提交一个任务
System.out.println("再次提交一个100号任务");
threadPool.execute(new MyTask(100));
// 查看工作队列
System.out.println("queue:" + threadPool.getQueue());
// 关闭线程池
threadPool.shutdown();
// 阻止主线程
TimeUnit.SECONDS.sleep(20L);
}
}
E03. 自定义线程池
心法:自定义线程池流程
武技:自定义线程池
1. 自定义线程任务类
package pool;
/** @author 周航宇 */
public class MyThreadPoolTest {
/** 自定义线程任务类 */
@Data
private static class MyTask implements Runnable {
private String taskName;
MyTask(String taskName) {
this.taskName = taskName;
}
@SneakyThrows
@Override
public void run() {
// 打印哪个线程正在执行哪个任务
System.out.println(Thread.currentThread().getName() + " 正在执行 " + taskName + "任务");
// 模拟任务耗时3L
TimeUnit.SECONDS.sleep(10L);
}
}
}
2. 自定义线程工厂类
package pool;
/** @author 周航宇 */
public class MyThreadPoolTest {
/** 自定义线程工厂 */
private static class MyThreadFactory implements ThreadFactory {
private final AtomicInteger mThreadNum = new AtomicInteger(1);
@Override
public Thread newThread(@NonNull Runnable r) {
Thread thread = new Thread(r, "自定义线程-" + mThreadNum.getAndIncrement());
System.out.println(thread.getName() + " 被创建");
return thread;
}
}
}
3. 自定义拒绝策略类
package pool;
/** @author 周航宇 */
public class MyThreadPoolTest {
/** 自定义拒绝策略 */
private static class MyRejectedPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 可以将未能处理的任务保存到Redis,MQ或者DB中,在其他时间进行处理
System.out.println(((MyTask) r).getTaskName() + " 因线程池耗尽而被拒绝..");
}
}
}
4. 测试自定义线程池
package pool;
/** @author 周航宇 */
public class MyThreadPoolTest {
@Test
@SneakyThrows
public void testMyThreadPool() {
// 创建一个线程池实例: 使用自定义的线程工厂和拒绝策略
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2, 4, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
new MyThreadFactory(),
new MyRejectedPolicy());
// 向线程池提交8个任务
// 核心线程执行2个任务,任务队列留存2个任务,拓展线程执行2个任务,其余2个任务被拒绝
for (int i = 1, j = 8; i <= j; i++) {
threadPool.execute(new MyTask("任务" + i));
}
// 阻止主线程结束
TimeUnit.SECONDS.sleep(20L);
}
}
Java道经第1卷 - 第7阶 - 并发编程(五)
传送门:JB1-7-并发编程(一)
传送门:JB1-7-并发编程(二)
传送门:JB1-7-并发编程(三)
传送门:JB1-7-并发编程(四)
传送门:JB1-7-并发编程(五)