JB1-7-并发编程(五)

Java道经第1卷 - 第7阶 - 并发编程(五)


传送门:JB1-7-并发编程(一)
传送门:JB1-7-并发编程(二)
传送门:JB1-7-并发编程(三)
传送门:JB1-7-并发编程(四)
传送门:JB1-7-并发编程(五)

S09. 线程池

E01. 线程池基础概念

心法:阿里巴巴 Java 开发手册中指出,线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,一方面是线程的创建更加规范,可以合理控制开辟线程的数量,一方面线程的细节管理交给线程池处理,优化了资源的开销。

线程池使用场景:一般情况下,一个请求需要至少开启一个线程来执行具体的请求内容,若请求数量特别多,且请求内容处理的时间非常短,则会造成频繁的创建和销毁线程的过程,系统开销大,此时建议使用线程池来解决该问题。

线程池优点

  1. 线程池可以维护多个线程的生命周期。
  2. 线程池可以通过线程复用来提高线程的利用率。
  3. 线程池可以提前创建好一批线程,在请求到达时线程直接获取,避免了线程的频繁创建和销毁。
  4. 线程池中可以适当地调整线程数,以对请求进行限流,从而可以防止资源不足。

线程池缺点

  1. 线程池本身也是需要维护的,当线程数量特别少,或者执行时间特别长的时候,不建议使用。
  2. 线程池容易遭受的并发风险,比如同步错误,与池有关的死锁,资源不足和线程泄漏。

线程池使用流程

  1. 创建一个线程池,并进行基础的配置:
    • 可以通过 ThreadPoolExecutor 类来创建,推荐。
    • 可以通过 Executors 类来创建,是前者的上层封装,不灵活,不推荐。
  2. 将 Runnable 或 Callable 线程任务提交到线程池中。
  3. 线程池自动为该任务分配线程并负责该线程从启动,运行到销毁的全部生命周期活动。

线程池任务提交相关 API 方法:以下两个提交任务的方法均不会阻塞当前所在线程:

  • threadPool.execute(线程任务)
    • 提交 Runnable 任务到线程池,无返回值。
    • 该方法不支持提交 Callable 任务到线程池。
  • threadPool.submit(线程任务)
    • 提交 Runnable 任务到线程池,返回一个 Future<?> 实例,实例中数据为空。
    • 提交 Callable 任务到线程池,返回一个 Future<返回值类型> 实例。
  • threadPool.shutdown():不再新增线程,发出停止信号,等所有线程执行完毕,关闭线程池,节省资源。
  • threadPool.shutdownNow():不再新增线程,立即关闭线程池,节省资源。

1. Runnable线程任务

心法:Runnable 函数式线程接口,内部仅包含一个 run() 方法,无返回值,不抛出异常。

Runnable 和线程池配合使用流程如下

创建
Runnable任务
创建
线程池
将Runnable任务
提交到线程池
该过程
不阻塞
线程池分配线程
执行Runnable任务
该过程
不阻塞

武技:测试将 Runnable 任务提交到线程池

package pool;

public class RunnableThreadPoolTest {

    @SneakyThrows
    @Test
    public void testExecute() {
        // 创建一个可缓存的线程池
        ExecutorService threadPool = Executors.newCachedThreadPool();
        System.out.println("0s: 线程池创建完毕");

        // 创建一个Runnable任务
        Runnable runnableTask = () -> {
            try {
                System.out.println("0s: runnable任务睡眠");
                TimeUnit.SECONDS.sleep(2L);
                System.out.println("2s: runnable任务执行完毕");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        };

        // 将Runnable任务异步提交到线程池中,该方法不会阻塞主线程
        threadPool.execute(runnableTask);
        System.out.println("0s: 若我立刻输出,则证明 execute 方法不阻塞主线程");

        // 不再新增线程,发出停止信号,等所有线程执行完毕,关闭线程池,节省资源
        threadPool.shutdown();
        System.out.println("2s: 关闭线程池");
        System.out.println("2s: 主线程退出");
    }
}

2. Callable线程任务

心法:Callable 函数式线程接口,内部仅包含一个 call() 方法,有返回值,会抛出 Exception 异常。

Callable 和线程池配合使用流程如下

创建
Callable任务
创建
线程池
将Callable任务
提交到线程池
该过程
不阻塞
线程池分配线程
执行Callable任务
该过程
不阻塞
线程池封装结果
到Future中并返回
从Future中
获取返回值
该过程
阻塞

武技:测试将 Callable 任务提交到线程池

package pool;

public class CallableThreadPoolTest {


    @SneakyThrows
    @Test
    public void testSubmit() {
        // 创建一个可缓存的线程池
        ExecutorService threadPool = Executors.newCachedThreadPool();
        System.out.println("0s: 线程池创建完毕");

        // 创建一个Callable任务
        Callable<Integer> callableTask = () -> {
            System.out.println("0s: callable任务睡眠");
            TimeUnit.SECONDS.sleep(2L);
            System.out.println("2s: callable任务执行完毕");
            return 9527;
        };

        // 将Callable任务异步提交到线程池中,该方法不会阻塞主线程
        Future<Integer> callableResult = threadPool.submit(callableTask);
        System.out.println("0s: 若我立刻输出,则证明 submit 方法不阻塞");

        // 获取任务方法的返回值,该方法会阻塞主线程,直到获取到结果
        System.out.println("2s: 获取到call方法返回值 -> " + callableResult.get());
        System.out.println("2s: 若我立刻输出,则证明 get 方法不阻塞");

        // 不再新增线程,发出停止信号,等所有线程执行完毕,关闭线程池,节省资源
        threadPool.shutdown();
        System.out.println("2s: 关闭线程池");
        System.out.println("2s: 主线程退出");
    }
}

3. FutureTask

心法:j.u.c.FutureTask 类既实现了 Runnable 接口,又实现了 Future 接口,所以它既能作为一个线程类,又能直接存储 Callable 任务的结果。

FutureTask 相关 API 方法

  • new FutureTask<>(Callable任务):创建一个 FutureTask 实例。
  • new Thread(futureTask):FutureTask 可以直接作为一个实现了 Runnable 接口的线程类。
  • futureTask.get():获取任务方法的返回值,该方法会阻塞主线程,直到获取到结果。

武技:测试 FutureTask 类

package pool;

/** @author 周航宇 */
public class FutureTaskTest {

    @SneakyThrows  
	@Test  
	public void testFutureTask() {  
	  
	    System.out.println("0s: 子线程启动");  
	  
	    // 创建一个FutureTask实例,参数是一个Callable任务  
	    FutureTask<Integer> futureTask = new FutureTask<>(() -> {  
	        System.out.println("0s: Callable任务睡眠");  
	        TimeUnit.SECONDS.sleep(2L);  
	        System.out.println("2s: Callable任务结束,返回9527");  
	        System.out.println("2s: 子线程退出");  
	        return 9527;  
	    });  
	  
	    // futureTask实现了Runnable接口,是一个线程类  
	    new Thread(futureTask).start();  
	  
	    // futureTask实现了Future接口,线程体方法支持返回值  
	    // 获取线程中call()的返回值,这个方法会阻塞主线程,直到获取到结果  
	    Integer callableResult = futureTask.get();  
	    System.out.println("2s: call方法的返回值: " + callableResult);  
	    System.out.println("2s: 主线程退出");  
	}
}

4. CompletableFuture

心法:j.u.c.CompletableFuture 类可以对多个任务进行异步执行和统一管理。

CompletableFuture 常用 API 方法

  • CompletableFuture.supplyAsync(线程任务):异步执行一个 Callable 线程任务,要返回一个 Future<返回值类型> 结果。
  • CompletableFuture.runAsync(线程任务):异步执行一个 Runnable 线程任务,即使没有返回值,也要返回一个 Future<Void> 结果。
  • CompletableFuture.allOf(FutureA, FutureB ...):汇总所有的 Future 结果,得到一个新的 Future<Void> 结果。

在这里插入图片描述

武技:测试 CompletableFuture 异步调用耗时情况

  1. 开发 6 个任务方法,3 个带返回值,3 个不带返回值:
package pool;

/** @author 周航宇 */
public class CompletableFutureTest {

    @SneakyThrows
    private String taskA() {
        TimeUnit.SECONDS.sleep(1L);
        System.out.println("1s: taskA执行完毕");
        return "A";
    }

    @SneakyThrows
    private String taskB() {
        TimeUnit.SECONDS.sleep(2L);
        System.out.println("2s: taskB执行完毕");
        return "B";
    }

    @SneakyThrows
    private String taskC() {
        TimeUnit.SECONDS.sleep(3L);
        System.out.println("3s: taskC执行完毕");
        return "C";
    }

    @SneakyThrows
    private void taskD() {
        TimeUnit.SECONDS.sleep(1L);
        System.out.println("1s: taskD执行完毕");
    }

    @SneakyThrows
    private void taskE() {
        TimeUnit.SECONDS.sleep(2L);
        System.out.println("2s: taskE执行完毕");
    }

    @SneakyThrows
    private void taskF() {
        TimeUnit.SECONDS.sleep(3L);
        System.out.println("3s: taskF执行完毕");
    }
}
  1. 测试同步调用 6 个任务时的总耗时:
package pool;

/** @author 周航宇 */
public class CompletableFutureTest {

	/** 同步执行6个任务,计算总耗时 */
    @SneakyThrows
    @Test
    public void testTask() {
        long start = System.currentTimeMillis();
        System.out.println("1s: taskA返回值: " + taskA());
        System.out.println("2s: taskB返回值: " + taskB());
        System.out.println("3s: taskC返回值: " + taskC());
        taskD();
        taskE();
        taskF();
        long end = System.currentTimeMillis();
        System.out.println("耗时:" + (end - start));
    }
}
  1. 测试使用 CompletableFuture 异步调用 6 个任务的总耗时:
package pool;

/** @author 周航宇 */
public class CompletableFutureTest {

    /** 使用CompletableFuture异步执行6个任务,计算总耗时 */
    @SneakyThrows
    @Test
    public void testTaskByCompletableFuture() {
        long start = System.currentTimeMillis();

        // supplyAsync(): 执行三个带返回值的异步任务
        CompletableFuture<String> futureA = CompletableFuture.supplyAsync(this::taskA);
        CompletableFuture<String> futureB = CompletableFuture.supplyAsync(this::taskB);
        CompletableFuture<String> futureC = CompletableFuture.supplyAsync(this::taskC);

        // runAsync(): 执行三个不带返回值的异步任务
        CompletableFuture<Void> futureD = CompletableFuture.runAsync(this::taskD);
        CompletableFuture<Void> futureE = CompletableFuture.runAsync(this::taskE);
        CompletableFuture<Void> futureF = CompletableFuture.runAsync(this::taskF);

        // 任务汇总: 当六个future全部完成之后,汇总到一个新的future实例中
        CompletableFuture<Void> future = CompletableFuture.allOf(
                futureA, futureB, futureC, futureD, futureE, futureF);

        // 让任务汇总工作插主线程的队
        future.join();

        // 当futureA/futureB/futureC任务均执行完毕后,获取其返回值
        if (futureA.isDone() && futureB.isDone() && futureC.isDone()) {
            System.out.println("taskA返回值: " + futureA.get());
            System.out.println("taskB返回值: " + futureB.get());
            System.out.println("taskC返回值: " + futureC.get());
        }

		long end = System.currentTimeMillis();  
		  
		// 计算总耗时  
		System.out.println("耗时:" + (end - start));
    }
}

E02. ThreadPoolExecutor

心法:j.u.c.ThreadPoolExecutor 用于创建线程池,每个线程池内部都维护了一个 HashSet 结构的线程集合和一个任务队列。

在这里插入图片描述

1. 线程池核心参数

心法 ThreadPoolExecutor 构造器拥有 7 个参数。

参数具体如下

  1. int corePoolSize: 核心线程数,线程池维护的最少线程数量,即使空闲也不会给归还给 OS。
  2. int maximumPoolSize:线程池维护的最多线程数量,即当核心线程不够了,最大能拓展到多少。
  3. long keepAliveTime:非核心线程所允许的最大的空闲时间,某个线程的空闲时间如果超过此指定值,会被归还给 OS。
  4. TimeUnit unit:最大的空闲时间单位。
  5. BlockingQueue<Runnable> workQueue:线程池所使用的工作队列类型。
  6. ThreadFactory threadFactory:线程工厂,用于产生线程,可以自定义,默认使用DefaultThreadFactory 类。
  7. RejectedExecutionHandler handler:线程池对拒绝任务采取的拒绝策略,默认使用 AbortPolicy 策略类。

武技:测试通过 ThreadPoolExecutor 创建线程池

package pool;

/** @author 周航宇 */
public class ThreadPoolExecutorTest {

    @SneakyThrows
    @Test
    public void testThreadPoolExecutor() {

        // 创建一个线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                // 核心线程数,线程池维护的最少线程数量,即使空闲也不会给归还给OS
                2,
                // 线程池维护的最多线程数量,即当核心线程不够了,最大能拓展到多少
                4,
                // 非核心线程所允许的最大的空闲时间,某个线程的空闲时间如果超过此指定值,会被归还给OS
                3, TimeUnit.SECONDS,
                // 线程池所使用的工作队列类型,即BlockingQueue类型
                new ArrayBlockingQueue<>(3),
                // 线程工厂,用于产生线程,默认使用DefaultThreadFactory
                Executors.defaultThreadFactory(),
                // 线程池对拒绝任务采取的拒绝策略,默认使用AbortPolicy
                new ThreadPoolExecutor.AbortPolicy()
        );

        // Runnable线程任务使用 execute() 提交到线程池中
        threadPool.execute(() -> System.out.println("runnable.."));

        TimeUnit.SECONDS.sleep(2L);

        // Callable线程任务使用 submit() 提交到线程池中
        Future<Integer> future = threadPool.submit(() -> {
            System.out.println("callable..");
            return 9527;
        });

        // 获取Callable任务实例的返回值
        System.out.println(future.get());

        // 关闭线程池
        threadPool.shutdown();
    }
}

2. 线程池处理任务流程

心法:ThreadPoolExecutor 接收任务整体过程中,若某个线程执行完任务,则会优先从等待队列中拉取任务继续执行,而新来的任务继续进入等待队列。

流程模拟:假设我们设置核心线程数为 2,最大线程数为 4,任务队列最大值为 2,那么接收任务的流程如下图:

线程池
首次创建
队列和集合均为空
任务A提交
到线程池
启动1个核心线程T1
处理任务A
核心线程
T1
任务B提交
到线程池
启动1个核心线程T2
处理任务B
核心线程
T1, T2
已满
任务C提交
到线程池
将任务C加入队列
任务队列
任务C
核心线程
T1, T2
已满
任务D提交
到线程池
将任务D加入队列
任务队列
任务C, 任务D
已满
核心线程
T1, T2
已满
任务E提交
到线程池
拓展一个临时线程T3
处理任务E
任务队列
任务C, 任务D
已满
核心线程
T1, T2
已满
临时线程
T3
任务F提交
到线程池
拓展一个临时线程T4
处理任务F
任务队列
任务C, 任务D
已满
核心线程
T1, T2
已满
临时线程
T3, T4
已满
任务G提交
到线程池
对任务G执行拒绝策略

3. 核心参数设计原则

心法:CPU 密集型任务多就用小线程池(避免过度线程切换,加重 CPU 消耗),IO 密集型任务多就用大线程池(让 CPU 在等待 IO 时,有其他线程去处理别的任务,充分利用 CPU 时间)。

CPU 密集型任务:CPU 利用率很高,但 IO 读写次数很低,比如数据加密,文件压缩,复杂计算等。

IO 密集型任务:CPU 利用率很低,但 IO 读写很耗时,比如文件读写,数据库通信,网络通信等。

ThreadPoolExecutor 中核心参数设计的公式:假设执行单个任务平均耗时 0.1 秒,系统最大产生 1000 个任务,每秒平均产生 100 个任务,若需要这 100 个任务可以在 1 秒内完成,则:

  • 核心线程数总时间 * 平均任务数 * 单个任务平均耗时,即 1 * 100 * 0.1 = 10 个。
  • 队列容量核心线程数 / 单个任务平均耗时 * 2,即 10 / 0.1 * 2 = 200 个。
  • 最大线程数(最大任务数 - 队列容量) * 单个任务平均耗时,即 (1000 - 200) * 0.1 = 80 个。

4. CachedThreadPool

心法:CachedThreadPool 是 Executors 框架中的内置线程池,底层使用 ThreadPoolExecutor 工具。

CachedThreadPool 基本参数如下

  • 核心线程数:0
  • 最大线程数:Integer 最大值,容易 OOM。
  • 最大空闲时间:60 秒
  • 工作队列:SynchronousQueue

武技:测试 CachedThreadPool 内置线程池

package pool;

/** @author 周航宇 */
public class CachedThreadPoolTest {

    @SneakyThrows
    @Test
    public void testCachedThreadPool() {
        // 创建一个缓存线程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0, j = 10; i < j; i++) {
            TimeUnit.SECONDS.sleep(1L);
            executorService.execute(() -> System.out.println("hello!"));
        }
        TimeUnit.SECONDS.sleep(20L);
    }
}

5. FixedThreadPool

心法:FixedThreadPool 是 Executors 框架中的内置线程池,底层使用 ThreadPoolExecutor 工具。

FixedThreadPool 基本参数如下

  • 核心线程数:需要在构造时指定。
  • 最大线程数:与核心线程数一致。
  • 最大空闲时间:0 秒
  • 工作队列:LinkedBlockingQueue,最大容量 Integer 最大值,容易 OOM。

武技:测试 FixedThreadPool 内置线程池

package pool;

/** @author 周航宇 */
public class FixedThreadPoolTest {

    @SneakyThrows
    @Test
    public void testFixedThreadPool() {
        // 创建一个固定线程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        // 每隔1秒提交一个任务到线程池中
        for (int i = 0, j = 10; i < j; i++) {
            TimeUnit.SECONDS.sleep(1L);
            executorService.execute(() -> System.out.println("hello!"));
        }
        TimeUnit.SECONDS.sleep(20L);
    }
}

6. ScheduledThreadPool

心法:ScheduledThreadPool 是 Executors 框架中的内置线程池,底层使用 ThreadPoolExecutor 工具。

ScheduledThreadPool 基本参数如下

  • 核心线程数:需要在构造时指定。
  • 最大线程数:Integer 最大值,容易 OOM。
  • 最大空闲时间:10 毫秒
  • 工作队列:DelayedWorkQueue

武技:测试 ScheduledThreadPool 内置线程池

package pool;

/** @author 周航宇 */
public class ScheduledThreadPoolTest {

    @SneakyThrows
    @Test
    public void testSchedule() {
        // 获取当前时间中的秒
        System.out.println(Calendar.getInstance().get(Calendar.SECOND));
        // 创建一个定时线程池
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
        // 延迟2秒后执行任务
        executorService.schedule(() -> {
            System.out.println(Calendar.getInstance().get(Calendar.SECOND));
        }, 2, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(20L);
    }

    @SneakyThrows
    @Test
    public void testScheduleAtFixedRate() {
        // 获取当前时间中的秒
        System.out.println(Calendar.getInstance().get(Calendar.SECOND));
        // 创建一个定时线程池
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        // 延迟2秒后执行任务,然后从上一个任务开始执行时始计算,每隔3秒执行一次
        executorService.scheduleAtFixedRate(() -> {
            try {
                TimeUnit.SECONDS.sleep(2L);
                System.out.println(Calendar.getInstance().get(Calendar.SECOND));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 2, 3, TimeUnit.SECONDS);

        System.out.println(System.in.read());
    }

    @SneakyThrows
    @Test
    public void testScheduleWithFixedDelay() {
        // 获取当前时间中的秒
        System.out.println(Calendar.getInstance().get(Calendar.SECOND));
        // 创建一个定时线程池
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        // 延迟2秒后执行任务,然后从上一个任务执行结束时始计算,每隔3秒执行一次
        executorService.scheduleWithFixedDelay(() -> {
            try {
                TimeUnit.SECONDS.sleep(2L);
                System.out.println(Calendar.getInstance().get(Calendar.SECOND));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 2, 3, TimeUnit.SECONDS);

        System.out.println(System.in.read());
    }
}

7. SingleThreadExecutor

心法:SingleThreadExecutor 是 Executors 框架中的内置线程池,底层使用 ThreadPoolExecutor 工具。

SingleThreadExecutor 基本参数如下

  • 核心线程数:1
  • 最大线程数:1
  • 最大空闲时间:0 秒
  • 工作队列:LinkedBlockingQueue,最大容量 Integer 最大值,容易 OOM。

武技:测试 SingleThreadExecutor 内置线程池

package pool;

/** @author 周航宇 */
public class SingleThreadExecutorTest {

    @SneakyThrows
    @Test
    public void testSingleThreadExecutor() {
        // 创建单线程线程池
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        // 每隔1秒提交一个任务到线程池中
        for (int i = 0, j = 10; i < j; i++) {
            TimeUnit.SECONDS.sleep(1L);
            executorService.execute(() -> System.out.println("hello!"));
        }
        TimeUnit.SECONDS.sleep(20L);
    }
}

8. 线程池拒绝策略

心法:线程池拒绝策略指的是当总线程数已达最大时,对新申请提交的任务的拒绝方式。

ThreadPoolExecutor 内置的拒绝策略包含以下四种

  1. threadPoolExecutor.AbortPolicy():直接抛出异常,阻止系统正常工作。 2. threadPoolExecutor.DiscardPolicy():啥事都不干,直接把任务丢弃。 3. threadPoolExecutor.DiscardOldestPolicy():丢弃等待队列中的第一个任务(最老),让新来的任务入队。
  2. threadPoolExecutor.CallerRunsPolicy():只要线程池没有关闭,该策略直接在调用者线程中执行当前被丢弃的任务,即哪个线程调用了 execute()/submit() 方法,哪个线程执行这个任务。

武技:测试 ThreadPoolExecutor 内置的拒绝策略

  1. 开发测试类:测试 AbortPolicy 策略:
package pool;

/** @author 周航宇 */
public class AbortPolicyTest {

    @ToString
    private static class MyTask implements Runnable {
        private int i;

        private MyTask(int i) {
            this.i = i;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + ": " + i);
                TimeUnit.SECONDS.sleep(10L);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @SneakyThrows
    @Test
    public void testAbortPolicy() {

        // 创建线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(4),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

        // 提交8个任务,将线程池中的核心线程,工作队列和拓展线程都占满
        for (int i = 0, j = 8; i < j; i++) {
            threadPool.execute(new MyTask(i));
        }

        // 查看工作队列
        System.out.println("queue:" + threadPool.getQueue());

        // 再次提交一个任务
        System.out.println("再次提交一个100号任务");
        threadPool.execute(new MyTask(100));

        // 查看工作队列
        System.out.println("queue:" + threadPool.getQueue());

        // 关闭线程池
        threadPool.shutdown();

        // 阻止主线程
        TimeUnit.SECONDS.sleep(20L);
    }
}
  1. 开发测试类:测试 DiscardPolicy 策略:
package pool;

/** @author 周航宇 */
public class DiscardPolicyTest {

    @ToString
    private static class MyTask implements Runnable {
        private int i;

        private MyTask(int i) {
            this.i = i;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + ": " + i);
                TimeUnit.SECONDS.sleep(10L);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @SneakyThrows
    @Test
    public void testDiscardPolicy() {

        // 创建线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(4),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardPolicy());

        // 提交8个任务,将线程池中的核心线程,工作队列和拓展线程都占满
        for (int i = 0, j = 8; i < j; i++) {
            threadPool.execute(new MyTask(i));
        }

        // 查看工作队列
        System.out.println("queue:" + threadPool.getQueue());

        // 再次提交一个任务
        System.out.println("再次提交一个100号任务");
        threadPool.execute(new MyTask(100));

        // 查看工作队列
        System.out.println("queue:" + threadPool.getQueue());

        // 关闭线程池
        threadPool.shutdown();

        // 阻止主线程
        TimeUnit.SECONDS.sleep(20L);
    }
}
  1. 开发测试类:测试 DiscardOldestPolicy 策略:
package pool;

/** @author 周航宇 */
public class DiscardOldestPolicyTest {

    @ToString
    private static class MyTask implements Runnable {
        private int i;

        private MyTask(int i) {
            this.i = i;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + ": " + i);
                TimeUnit.SECONDS.sleep(10L);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @SneakyThrows
    @Test
    public void testDiscardOldestPolicy() {

        // 创建线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(4),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        // 提交8个任务,将线程池中的核心线程,工作队列和拓展线程都占满
        for (int i = 0, j = 8; i < j; i++) {
            threadPool.execute(new MyTask(i));
        }

        // 查看工作队列
        System.out.println("queue:" + threadPool.getQueue());

        // 再次提交一个任务
        System.out.println("再次提交一个100号任务");
        threadPool.execute(new MyTask(100));

        // 查看工作队列
        System.out.println("queue:" + threadPool.getQueue());

        // 关闭线程池
        threadPool.shutdown();

        // 阻止主线程
        TimeUnit.SECONDS.sleep(20L);
    }
}
  1. 开发测试类:测试 CallerRunsPolicy 策略:
package pool;

/** @author 周航宇 */
public class CallerRunsPolicyTest {

    @ToString
    private static class MyTask implements Runnable {
        private int i;

        private MyTask(int i) {
            this.i = i;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + ": " + i);
                TimeUnit.SECONDS.sleep(10L);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @SneakyThrows
    @Test
    public void testCallerRunsPolicy() {

        // 创建线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(4),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());

        // 提交8个任务,将线程池中的核心线程,工作队列和拓展线程都占满
        for (int i = 0, j = 8; i < j; i++) {
            threadPool.execute(new MyTask(i));
        }

        // 查看工作队列
        System.out.println("queue:" + threadPool.getQueue());

        // 再次提交一个任务
        System.out.println("再次提交一个100号任务");
        threadPool.execute(new MyTask(100));

        // 查看工作队列
        System.out.println("queue:" + threadPool.getQueue());

        // 关闭线程池
        threadPool.shutdown();

        // 阻止主线程
        TimeUnit.SECONDS.sleep(20L);
    }
}

E03. 自定义线程池

心法:自定义线程池流程

自定义任务类
自定义线程工厂
自定义拒绝策略
测试

武技:自定义线程池

1. 自定义线程任务类

package pool;

/** @author 周航宇 */
public class MyThreadPoolTest {

    /** 自定义线程任务类 */
    @Data
    private static class MyTask implements Runnable {

        private String taskName;

        MyTask(String taskName) {
            this.taskName = taskName;
        }

        @SneakyThrows
        @Override
        public void run() {
            // 打印哪个线程正在执行哪个任务
            System.out.println(Thread.currentThread().getName() + " 正在执行 " + taskName + "任务");
            // 模拟任务耗时3L
            TimeUnit.SECONDS.sleep(10L);
        }
    }
}

2. 自定义线程工厂类

package pool;

/** @author 周航宇 */
public class MyThreadPoolTest {

	/** 自定义线程工厂 */
	private static class MyThreadFactory implements ThreadFactory {
	
	    private final AtomicInteger mThreadNum = new AtomicInteger(1);
	
	    @Override
	    public Thread newThread(@NonNull Runnable r) {
	        Thread thread = new Thread(r, "自定义线程-" + mThreadNum.getAndIncrement());
	        System.out.println(thread.getName() + " 被创建");
	        return thread;
	    }
	}
}

3. 自定义拒绝策略类

package pool;

/** @author 周航宇 */
public class MyThreadPoolTest {

    /** 自定义拒绝策略 */
    private static class MyRejectedPolicy implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // 可以将未能处理的任务保存到Redis,MQ或者DB中,在其他时间进行处理
            System.out.println(((MyTask) r).getTaskName() + " 因线程池耗尽而被拒绝..");
        }
    }
}

4. 测试自定义线程池

package pool;

/** @author 周航宇 */
public class MyThreadPoolTest {

	@Test
	@SneakyThrows
	public void testMyThreadPool() {
	
	    // 创建一个线程池实例: 使用自定义的线程工厂和拒绝策略
	    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
	            2, 4, 10, TimeUnit.SECONDS,
	            new ArrayBlockingQueue<>(2),
	            new MyThreadFactory(),
	            new MyRejectedPolicy());
	
	    // 向线程池提交8个任务
	    // 核心线程执行2个任务,任务队列留存2个任务,拓展线程执行2个任务,其余2个任务被拒绝
	    for (int i = 1, j = 8; i <= j; i++) {
	        threadPool.execute(new MyTask("任务" + i));
	    }
	
	    // 阻止主线程结束
	    TimeUnit.SECONDS.sleep(20L);
	}
}

Java道经第1卷 - 第7阶 - 并发编程(五)


传送门:JB1-7-并发编程(一)
传送门:JB1-7-并发编程(二)
传送门:JB1-7-并发编程(三)
传送门:JB1-7-并发编程(四)
传送门:JB1-7-并发编程(五)

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值