聊聊RocketMQTransactionAnnotationProcessor
小编:管理员 198阅读 2022.07.29
本文主要研究一下RocketMQTransactionAnnotationProcessor
RocketMQTransactionAnnotationProcessorrocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/config/RocketMQTransactionAnnotationProcessor.java
public class RocketMQTransactionAnnotationProcessor implements BeanPostProcessor, Ordered, ApplicationContextAware { private final static Logger log = LoggerFactory.getLogger(RocketMQTransactionAnnotationProcessor.class); private ApplicationContext applicationContext; private final Set复制> nonProcessedClasses = Collections.newSetFromMap(new ConcurrentHashMap , Boolean>(64)); private TransactionHandlerRegistry transactionHandlerRegistry; public RocketMQTransactionAnnotationProcessor(TransactionHandlerRegistry transactionHandlerRegistry) { this.transactionHandlerRegistry = transactionHandlerRegistry; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { return bean; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if (!this.nonProcessedClasses.contains(bean.getClass())) { Class> targetClass = AopUtils.getTargetClass(bean); RocketMQTransactionListener listener = AnnotationUtils.findAnnotation(targetClass, RocketMQTransactionListener.class); this.nonProcessedClasses.add(bean.getClass()); if (listener == null) { // for quick search log.trace("No @RocketMQTransactionListener annotations found on bean type: {}", bean.getClass()); } else { try { processTransactionListenerAnnotation(listener, bean); } catch (MQClientException e) { log.error("Failed to process annotation " + listener, e); throw new BeanCreationException("Failed to process annotation " + listener, e); } } } return bean; } private void processTransactionListenerAnnotation(RocketMQTransactionListener listener, Object bean) throws MQClientException { if (transactionHandlerRegistry == null) { throw new MQClientException("Bad usage of @RocketMQTransactionListener, " + "the class must work with RocketMQTemplate", null); } if (!RocketMQLocalTransactionListener.class.isAssignableFrom(bean.getClass())) { throw new MQClientException("Bad usage of @RocketMQTransactionListener, " + "the class must implement interface RocketMQLocalTransactionListener", null); } TransactionHandler transactionHandler = new TransactionHandler(); transactionHandler.setBeanFactory(this.applicationContext.getAutowireCapableBeanFactory()); transactionHandler.setName(listener.txProducerGroup()); transactionHandler.setBeanName(bean.getClass().getName()); transactionHandler.setListener((RocketMQLocalTransactionListener) bean); transactionHandler.setCheckExecutor(listener.corePoolSize(), listener.maximumPoolSize(), listener.keepAliveTime(), listener.blockingQueueSize()); RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), listener.accessKey(), listener.secretKey()); if (Objects.nonNull(rpcHook)) { transactionHandler.setRpcHook(rpcHook); } else { log.debug("Access-key or secret-key not configure in " + listener + "."); } transactionHandlerRegistry.registerTransactionHandler(transactionHandler); } @Override public int getOrder() { return LOWEST_PRECEDENCE; } }
- RocketMQTransactionAnnotationProcessor实现了BeanPostProcessor, Ordered, ApplicationContextAware接口
- postProcessAfterInitialization方法会查找标记有RocketMQTransactionListener注解的bean,然后执行processTransactionListenerAnnotation方法
- processTransactionListenerAnnotation方法会创建transactionHandler,然后执行transactionHandlerRegistry.registerTransactionHandler进行注册
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/config/TransactionHandler.java
class TransactionHandler { private String name; private String beanName; private RocketMQLocalTransactionListener bean; private BeanFactory beanFactory; private ThreadPoolExecutor checkExecutor; private RPCHook rpcHook; public String getBeanName() { return beanName; } public void setBeanName(String beanName) { this.beanName = beanName; } public String getName() { return name; } public void setName(String name) { this.name = name; } public RPCHook getRpcHook() { return rpcHook; } public void setRpcHook(RPCHook rpcHook) { this.rpcHook = rpcHook; } public BeanFactory getBeanFactory() { return beanFactory; } public void setBeanFactory(BeanFactory beanFactory) { this.beanFactory = beanFactory; } public void setListener(RocketMQLocalTransactionListener listener) { this.bean = listener; } public RocketMQLocalTransactionListener getListener() { return this.bean; } public void setCheckExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, int blockingQueueSize) { this.checkExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(blockingQueueSize)); } public ThreadPoolExecutor getCheckExecutor() { return checkExecutor; } }复制
- TransactionHandler包含了name、beanName、bean、beanFactory、checkExecutor、rpcHook属性
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
public class TransactionHandlerRegistry implements DisposableBean { private RocketMQTemplate rocketMQTemplate; private final Set复制listenerContainers = new ConcurrentSet<>(); public TransactionHandlerRegistry(RocketMQTemplate template) { this.rocketMQTemplate = template; } @Override public void destroy() throws Exception { listenerContainers.clear(); } public void registerTransactionHandler(TransactionHandler handler) throws MQClientException { if (listenerContainers.contains(handler.getName())) { throw new MQClientException(-1, String .format("The transaction name [%s] has been defined in TransactionListener [%s]", handler.getName(), handler.getBeanName())); } listenerContainers.add(handler.getName()); rocketMQTemplate.createAndStartTransactionMQProducer(handler.getName(), handler.getListener(), handler.getCheckExecutor(), handler.getRpcHook()); } }
- TransactionHandlerRegistry实现了DisposableBean接口,其clear方法直接清空listenerContainers;registerTransactionHandler方法会往listenerContainers添加该handler的name,然后执行rocketMQTemplate.createAndStartTransactionMQProducer来创建并启动TransactionMQProducer
- RocketMQTransactionAnnotationProcessor实现了BeanPostProcessor, Ordered, ApplicationContextAware接口
- postProcessAfterInitialization方法会查找标记有RocketMQTransactionListener注解的bean,然后执行processTransactionListenerAnnotation方法
- processTransactionListenerAnnotation方法会创建transactionHandler,然后执行transactionHandlerRegistry.registerTransactionHandler进行注册
- RocketMQTransactionAnnotationProcessor
相关推荐
- 【RocketMQ系列】RocketMQ集群,RocketMQ-on-DLedger集群 本文RocketMQ系列第四篇,主要介绍RocketMQ集群及如何部署自动容灾切换的 RocketMQ-on-DLedger Group。RocketMQ集群搭建ROcketMQ集群搭建有以下几种方案:「单Master模式」「多Master模式」「多Master多Slave模式-异步复制」「多Master多Slave模式-同步双写」其…
- 3DMAX提示和技巧 本主题标识使用 Civil View 的一些重要提示和技巧。常规使用屏幕分辨率至少为 1280x1024 的 Civil View。低于此分辨率时,一些面板将占用过多屏幕空间。 将视口设置为线框显示以达到最佳性能。 要尽可能简化用户界面,请在单个视口中工作并关闭 3ds Max 命令面…