`
ahua186186
  • 浏览: 555341 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

@Async核心实现1 --------AsyncExecutionAspectSupport

阅读更多
基本原理:

通过spring的扩展接口AbstractBeanFactoryAwareAdvisingPostProcessor,初始化自定义的切面AsyncAnnotationAdvisor来实现方法的代理,最后通过自定义方法拦截器AsyncExecutionInterceptor实现异步执行。

核心实现是:
1.继续接口MethodInterceptor
2.获取自定义注解:getExecutorQualifier



public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
    public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";
    private static final boolean completableFuturePresent = ClassUtils.isPresent("java.util.concurrent.CompletableFuture", AsyncExecutionInterceptor.class.getClassLoader());
    protected final Log logger;
    private final Map<Method, AsyncTaskExecutor> executors;
    private volatile Executor defaultExecutor;
    private AsyncUncaughtExceptionHandler exceptionHandler;
    private BeanFactory beanFactory;

    public AsyncExecutionAspectSupport(Executor defaultExecutor) {
        this(defaultExecutor, new SimpleAsyncUncaughtExceptionHandler());
    }

    public AsyncExecutionAspectSupport(Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
        this.logger = LogFactory.getLog(this.getClass());
        this.executors = new ConcurrentHashMap(16);
        this.defaultExecutor = defaultExecutor;
        this.exceptionHandler = exceptionHandler;
    }

    public void setExecutor(Executor defaultExecutor) {
        this.defaultExecutor = defaultExecutor;
    }

    public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
    }

    public void setBeanFactory(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
    }

    protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
        AsyncTaskExecutor executor = (AsyncTaskExecutor)this.executors.get(method);
        if (executor == null) {
            String qualifier = this.getExecutorQualifier(method);
            Executor targetExecutor;
            if (StringUtils.hasLength(qualifier)) {
                targetExecutor = this.findQualifiedExecutor(this.beanFactory, qualifier);
            } else {
                targetExecutor = this.defaultExecutor;
                if (targetExecutor == null) {
                    Map var5 = this.executors;
                    synchronized(this.executors) {
                        if (this.defaultExecutor == null) {
                            this.defaultExecutor = this.getDefaultExecutor(this.beanFactory);
                        }

                        targetExecutor = this.defaultExecutor;
                    }
                }
            }

            if (targetExecutor == null) {
                return null;
            }

            executor = targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor)targetExecutor : new TaskExecutorAdapter(targetExecutor);
            this.executors.put(method, executor);
        }

        return (AsyncTaskExecutor)executor;
    }

    protected abstract String getExecutorQualifier(Method var1);

    protected Executor findQualifiedExecutor(BeanFactory beanFactory, String qualifier) {
        if (beanFactory == null) {
            throw new IllegalStateException("BeanFactory must be set on " + this.getClass().getSimpleName() + " to access qualified executor '" + qualifier + "'");
        } else {
            return (Executor)BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier);
        }
    }

    protected Executor getDefaultExecutor(BeanFactory beanFactory) {
        if (beanFactory != null) {
            try {
                return (Executor)beanFactory.getBean(TaskExecutor.class);
            } catch (NoUniqueBeanDefinitionException var6) {
                this.logger.debug("Could not find unique TaskExecutor bean", var6);

                try {
                    return (Executor)beanFactory.getBean("taskExecutor", Executor.class);
                } catch (NoSuchBeanDefinitionException var4) {
                    if (this.logger.isInfoEnabled()) {
                        this.logger.info("More than one TaskExecutor bean found within the context, and none is named 'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly as an alias) in order to use it for async processing: " + var6.getBeanNamesFound());
                    }
                }
            } catch (NoSuchBeanDefinitionException var7) {
                this.logger.debug("Could not find default TaskExecutor bean", var7);

                try {
                    return (Executor)beanFactory.getBean("taskExecutor", Executor.class);
                } catch (NoSuchBeanDefinitionException var5) {
                    this.logger.info("No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either");
                }
            }
        }

        return null;
    }

    protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
        if (completableFuturePresent) {
            Future<Object> result = AsyncExecutionAspectSupport.CompletableFutureDelegate.processCompletableFuture(returnType, task, executor);
            if (result != null) {
                return result;
            }
        }

        if (ListenableFuture.class.isAssignableFrom(returnType)) {
            return ((AsyncListenableTaskExecutor)executor).submitListenable(task);
        } else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        } else {
            executor.submit(task);
            return null;
        }
    }

    protected void handleError(Throwable ex, Method method, Object... params) throws Exception {
        if (Future.class.isAssignableFrom(method.getReturnType())) {
            ReflectionUtils.rethrowException(ex);
        } else {
            try {
                this.exceptionHandler.handleUncaughtException(ex, method, params);
            } catch (Throwable var5) {
                this.logger.error("Exception handler for async method '" + method.toGenericString() + "' threw unexpected exception itself", var5);
            }
        }

    }

    @UsesJava8
    private static class CompletableFutureDelegate {
        private CompletableFutureDelegate() {
        }

        public static <T> Future<T> processCompletableFuture(Class<?> returnType, final Callable<T> task, Executor executor) {
            return !CompletableFuture.class.isAssignableFrom(returnType) ? null : CompletableFuture.supplyAsync(new Supplier<T>() {
                public T get() {
                    try {
                        return task.call();
                    } catch (Throwable var2) {
                        throw new CompletionException(var2);
                    }
                }
            }, executor);
        }
    }
}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics