目的

Spring 本身的线程池很简单,每次用到线程都会开辟一个新的线程,效率不高,因此创建一个健壮的线程池,尽量重用现用线程,减少对象创建、消亡的开销是很有必要的。

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Slf4j
@Configuration
@ConfigurationProperties(prefix = "task.pool")
@Setter
@EnableAsync
public class TaskExecutorConfig {

/**
* 核心线程池大小
*/
private int corePoolSize;

/**
* 最大线程池大小
*/
private int maxPoolSize;

/**
* 活跃时间
*/
private int keepAliveSeconds;

/**
* 队列容量
*/
private int queueCapacity;

@Bean
public ThreadPoolTaskExecutor taskExecutorPool() {
// 标记此处-1,下方会使用匿名类对原生线程池进行拓展
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setKeepAliveSeconds(keepAliveSeconds);
executor.setQueueCapacity(queueCapacity);
// 当线程池达到最大时,不再开启新线程,使用调用者所在线程执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}

分析

1、添加 Spring 配置

使用类配置注解,并设置前缀,这样在对象的成员变量会自动注入 spring 的配置文件中的键值对。

1
2
3
@Configuration
@ConfigurationProperties(prefix = "executor.pool")
@Setter

对应的配置文件,当然需要使用 setter 方法进行赋值:

1
2
3
4
5
6
executor:
pool:
core-pool-size: 20
max-pool-size: 40
keep-alive-seconds: 300
queue-capacity: 50

需要的依赖:

1
2
3
4
5
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>

2、开启异步支持

使用 @EnableAsync  注解,也可以在启动类上添加。全局只要一个即可开启异步。

1
@EnableAsync

使用

交由 Spring 托管,直接使用依赖注入,注入名称默认为  @Bean  标记的方法名。

1
2
3
4
@Resource(name = "taskExecutorPool")
private ThreadPoolTaskExecutor taskExecutor;
// 或
@Async("taskExecutorPool")

拓展配置

当前线程在执行或提交时,不会显示的打印出线程池状态,为了实现这一点,此处通过匿名子类的方式拓展 spring#ThreadPoolTaskExecutor。自然,单独实现子类也可以达成效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// 此处代码对应上方标记-1
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() {

private void showThreadPoolInfo(String prefix){
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
log.info("{}{},线程数: [{}], 已完成线程数: [{}], 活跃线程数: [{}], 队列大小: [{}]",
this.getThreadNamePrefix(),
prefix,
threadPoolExecutor.getTaskCount(),
threadPoolExecutor.getCompletedTaskCount(),
threadPoolExecutor.getActiveCount(),
threadPoolExecutor.getQueue().size());
}
@Override
public void execute(Runnable task) {
showThreadPoolInfo("执行线程");
super.execute(task);
}

// 该方法可忽略,源码中并未使用 startTimeout 参数,直接调用 execute(task)
@Override
public void execute(Runnable task, long startTimeout) {
showThreadPoolInfo("执行线程");
super.execute(task, startTimeout);
}

@Override
public Future<?> submit(Runnable task) {
showThreadPoolInfo("提交线程");
return super.submit(task);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
showThreadPoolInfo("提交线程");
return super.submit(task);
}

@Override
public ListenableFuture<?> submitListenable(Runnable task) {
showThreadPoolInfo("提交监听");
return super.submitListenable(task);
}

@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
showThreadPoolInfo("提交监听");
return super.submitListenable(task);
}
};

评论