目的
Spring 本身的线程池很简单,每次用到线程都会开辟一个新的线程,效率不高,因此创建一个健壮的线程池,尽量重用现用线程,减少对象创建、消亡的开销是很有必要的。
实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| @Slf4j @Configuration @ConfigurationProperties(prefix = "task.pool") @Setter @EnableAsync public class TaskExecutorConfig {
private int corePoolSize;
private int maxPoolSize;
private int keepAliveSeconds;
private int queueCapacity;
@Bean public ThreadPoolTaskExecutor taskExecutorPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setKeepAliveSeconds(keepAliveSeconds); executor.setQueueCapacity(queueCapacity);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } }
|
分析
1、添加 Spring 配置
使用类配置注解,并设置前缀,这样在对象的成员变量会自动注入 spring 的配置文件中的键值对。
1 2 3
| @Configuration @ConfigurationProperties(prefix = "executor.pool") @Setter
|
对应的配置文件,当然需要使用 setter 方法进行赋值:
1 2 3 4 5 6
| executor: pool: core-pool-size: 20 max-pool-size: 40 keep-alive-seconds: 300 queue-capacity: 50
|
需要的依赖:
1 2 3 4 5
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency>
|
2、开启异步支持
使用 @EnableAsync
注解,也可以在启动类上添加。全局只要一个即可开启异步。
使用
交由 Spring 托管,直接使用依赖注入,注入名称默认为 @Bean
标记的方法名。
1 2 3 4
| @Resource(name = "taskExecutorPool") private ThreadPoolTaskExecutor taskExecutor;
@Async("taskExecutorPool")
|
拓展配置
当前线程在执行或提交时,不会显示的打印出线程池状态,为了实现这一点,此处通过匿名子类的方式拓展 spring#ThreadPoolTaskExecutor
。自然,单独实现子类也可以达成效果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() {
private void showThreadPoolInfo(String prefix){ ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor(); log.info("{}{},线程数: [{}], 已完成线程数: [{}], 活跃线程数: [{}], 队列大小: [{}]", this.getThreadNamePrefix(), prefix, threadPoolExecutor.getTaskCount(), threadPoolExecutor.getCompletedTaskCount(), threadPoolExecutor.getActiveCount(), threadPoolExecutor.getQueue().size()); } @Override public void execute(Runnable task) { showThreadPoolInfo("执行线程"); super.execute(task); }
@Override public void execute(Runnable task, long startTimeout) { showThreadPoolInfo("执行线程"); super.execute(task, startTimeout); }
@Override public Future<?> submit(Runnable task) { showThreadPoolInfo("提交线程"); return super.submit(task); }
@Override public <T> Future<T> submit(Callable<T> task) { showThreadPoolInfo("提交线程"); return super.submit(task); }
@Override public ListenableFuture<?> submitListenable(Runnable task) { showThreadPoolInfo("提交监听"); return super.submitListenable(task); }
@Override public <T> ListenableFuture<T> submitListenable(Callable<T> task) { showThreadPoolInfo("提交监听"); return super.submitListenable(task); } };
|