更新時間:2022-08-25 來源:黑馬程序員 瀏覽量:
目錄
- [Spring的Async注解線程池擴展方案]
- [目錄]
- [1. 擴展目的]
- [2. 擴展實現]
- [2.1 擴展Async注解的執行攔截器`AnnotationAsyncExecutionInterceptor`]
- [2.2 擴展Async注解的Spring代理顧問`AsyncAnnotationAdvisor`]
- [2.3 擴展Async注解的 Spring Bean 后置處理器`AsyncAnnotationBeanPostProcessor`]
- [2.4 擴展代理異步配置類`ProxyAsyncConfiguration`]
- [2.5 擴展異步代理配置選擇器`AsyncConfigurationSelector`]
- [2.6 擴展異步啟動注解`@EnableAsync`]
- [3. 額外擴展:給`@Async`注解代理指定線程池]
擴展目的
1. 異步調用,改用Spring提供的`@Aysnc`注解實現,代替手寫線程池執行。
2. 在實際場景中,可能會遇到需要將主線程的一些個性化參數、變量、數據傳遞到子線程中使用的需求。
3. `InheritableThreadLocal`可以解決子線程繼承父線程值的需求,但是它存在一些問題。
1. `SessionUser.SESSION_USER`是中臺提供,無法修改。
2. `InheritableThreadLocal`在線程池機制應用中并不友好,不及時在子線程中清除的話,會造成線程安全問題。
實現思路有兩種:
1. 針對`ThreadLocal`進行擴展,并說服中臺統一改用擴展后的`ThreadLocal`。
2. 針對`@EnableAsync`和`@Async`注解進行擴展,將手動copy的代碼寫入到Spring代理類中。
第一種要跟中臺打交道,就很煩,能夠天平自己獨立解決,就自己解決。第二種會是一個不錯的選擇,擴展實現也并不困難。
2. 擴展實現
2.1 擴展Async注解的執行攔截器`AnnotationAsyncExecutionInterceptor`
類全名:`org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor`
從調試記錄可以分析得出`AnnotationAsyncExecutionInterceptor#invoke`方法,正是創建異步任務并且執行異步任務的核心代碼所在,我們要做的就是重寫這個方法,將父線程的運行參數手動copy到子線程任務體中。
2.2 擴展Async注解的Spring代理顧問`AsyncAnnotationAdvisor`
我們依靠追蹤`AnnotationAsyncExecutionInterceptor`的構造方法調用,定位到了它。
全類名:`org.springframework.scheduling.annotation.AsyncAnnotationAdvisor`
> 補充說明:代理顧問(`Advisor`)、建議(`Advice`)以及Spring代理實現原理
>
> Spring `@EnableAsync`默認的代理模式是 JDK 代理,代理機制如下:
>
> Spring 一個 Bean 會在 `BeanPostProcessor#postProcessAfterInitialization()`這個生命周期環節,遍歷所有的`BeanPostProcessor`實例,判斷Bean是否符合代理條件,如果符合代理條件,就給 Bean 代理對象中追加建議(`Advice`)對象,這樣就完成了代理。
>
> 而建議(`Advice`)對象是由顧問(`Advisor`)對象創建和提供。
>
> 上一小節提到的異步執行攔截器`AnnotationAsyncExecutionInterceptor`就是實現了`Advice`接口的類。
在`@Async`注解的代理過程中,異步執行攔截器`AnnotationAsyncExecutionInterceptor`就是通過`AsyncAnnotationAdvisor#buildAdvice`方法創建的。
所以,當我們想要將擴展的新的異步執行攔截器`LibraAnnotationAsyncExecutionInterceptor`用起來,則需要相應的,還要把`AsyncAnnotationAdvisor#buildAdvice`方法重寫。
2.3 擴展Async注解的 Spring Bean 后置處理器`AsyncAnnotationBeanPostProcessor`
我們依靠追蹤`AsyncAnnotationAdvisor`的構造方法調用,定位到了它。
類全名:`org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor`
這個沒什么好說的,Spring Bean 的生命周期其中一環。是 Spring Bean 實現代理的起點。
開發人員可以自定義一個`BeanPostProcessor`類,把它注冊到 Bean 容器中,它就會自動生效,并將后續的每一個 Bean 實例進行條件判斷以及進行代理。
我們要重寫的方法是:`AsyncAnnotationBeanPostProcessor#setBeanFactory`。這個方法構造了異步代理顧問`AsyncAnnotationAdvisor`對象。
2.4 擴展代理異步配置類`ProxyAsyncConfiguration`
`AsyncAnnotationBeanPostProcessor`不是一般的 Spring Bean。它有幾個限制,導致它不能直接通過`@Component`或者`@Configuration`來創建實例。
`AsyncAnnotationBeanPostProcessor`僅僅是實現了基于 JDK 代理,如果開發決定另外一種(基于ASPECTJ編織),那么它就應該受到某種條件判斷來進行 Bean 實例化。
2. `AsyncAnnotationBeanPostProcessor`還需要配置指定的線程池、排序等等屬性,所以無法直接使用`@Component`注解注冊為 Bean。
我們閱讀一下`@EnableAsync`注解源碼:
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(AsyncConfigurationSelector.class) public @interface EnableAsync { Class<? extends Annotation> annotation() default Annotation.class; boolean proxyTargetClass() default false; AdviceMode mode() default AdviceMode.PROXY; int order() default Ordered.LOWEST_PRECEDENCE; } ```
進一步閱讀`AsyncConfigurationSelector`的源碼:
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> { private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"; /** * 分別為EnableAsync.mode()的PROXY和ASPECTJ值返回{@link ProxyAsyncConfiguration}或{@code AspectJAsyncConfiguration} 。 */ @Override @Nullable public String[] selectImports(AdviceMode adviceMode) { switch (adviceMode) { case PROXY: return new String[] {ProxyAsyncConfiguration.class.getName()}; case ASPECTJ: return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME}; default: return null; } } } ```
謎底揭曉,`ProxyAsyncConfiguration`原來是在這里開始注冊到 Spring 容器中的。
Spring Boot 啟動后,會根據`@EnableAsync`注解的`mode()`方法的具體值,來決定整個Spring的 Bean 代理機制。
既然 Spring 代理機制只會有一種,所以,也就只會在兩種機制的配置類中選擇其中一個來進行實例化。
而默認`EnableAsync$mode()`默認值是`AdviceMode.PROXY`,所以默認采用 JDK 代理機制。
2.5 擴展異步代理配置選擇器`AsyncConfigurationSelector`
類全名:`org.springframework.scheduling.annotation.AsyncConfigurationSelector`
2.6 擴展異步啟動注解`@EnableAsync`
類全名:`org.springframework.scheduling.annotation.EnableAsync`
3. 額外擴展:給`@Async`注解代理指定線程池
`@Async`會自動根據類型`TaskExecutor.class`從 Spring Bean 容器中找一個已經實例化的異步任務執行器(線程池)。如果找不到,則另尋他路,嘗試從 Spring Bean 容器中查找名稱為`taskExecutor`的`Executor.class`實例。最后都還是未找到呢,就默認自動`new`一個`SimpleAsyncTaskExecutor`來用。
> 補充說明:`TaskExecutor.class`是Spring定義的,而`Executor.class`JDK定義的。
場景:其他小伙伴、或者舊代碼已經實現過了一個線程池,但是這個線程池,是個`Executor.class`類型,且 Bean 實例名稱不是`taskExecutor`(假設是`libraThreadPool`),正常情況下`@Async`根本無法找到它。
需求:通過配置,將`@Async`的默認線程池,指定為名為`libraThreadPool`的`Executor.class`類型線程池。
我們只需要注冊一個實現`AsyncConfigurer`接口的配置類
`org.springframework.scheduling.annotation.AbstractAsyncConfiguration#setConfigurers`:
/** * Collect any {@link AsyncConfigurer} beans through autowiring. */ @Autowired(required = false) void setConfigurers(Collection<AsyncConfigurer> configurers) { if (CollectionUtils.isEmpty(configurers)) { return; } if (configurers.size() > 1) { throw new IllegalStateException("Only one AsyncConfigurer may exist"); } AsyncConfigurer configurer = configurers.iterator().next(); this.executor = configurer::getAsyncExecutor; this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler; } ```