可靠消息最终一致性 本地消息表、RocketMQ 事务消息方案

小编:管理员 286阅读 2022.07.29

一、可靠消息最终一致性事务概述

可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致。 此方案是利用消息中间件完成,如下图:

事务发起方(消息生产方)将消息发给消息中间件,事务参与方从消息中间件接收消息,事务参与方(消息消费方)和消息中间件之间都是通过网络通信,由于网络通信的不确定性会导致分布式事务问题。因此可靠消息最终一致性方案要解决以下几个问题: 【1】本地事务与消息发送的原子性问题:事务发起方在本地事务执行成功后消息必须发出去,否则就丢弃消息。即实现本地事务和消息发送的原子性,要么都成功,要么都失败。本地事务与消息发送的原子性问题是实现可靠消息最终一致性方案的关键问题。先来尝试下这种操作,先发送消息,再操作数据库这种情况下无法保证数据库操作与发送消息的一致性,因为可能发送消息成功,据库操作失败。

begin transaction; 
    //1.发送MQ 
    //2.数据库操作 
commit transation;
复制

第二种方案,先进行数据库操作,再发送消息:这种情况下貌似没有问题,如果发送 MQ消息失败,就会抛出异常,导致数据库事务回滚。但如果是超时异常,数据库回滚,但 MQ其实已经正常发送了,同样会导致不一致。

begin transaction; 
    //1.数据库操作 
    //2.发送MQ 
commit transation;
复制

【2】事务参与方接收消息的可靠性:事务参与方必须能够从消息队列接收到消息,如果接收消息失败可以重复接收消息。 【3】消息重复消费的问题:由于步骤2的存在,若某一个消费节点超时但是消费成功,此时消息中间件会重复投递此消息,就导致了消息的重复消费。要解决消息重复消费的问题就要实现事务参与方的方法幂等性。

二、解决方案【本地消息表方案

本地消息表这个方案最初是 eBay提出的,此方案的核心是通过本地事务保证数据业务操作和消息的一致性,然后通过定时任务将消息发送至消息中间件,待确认消息发送给消费方成功再将消息删除。 下面以注册送积分为例来说明:下例共有两个微服务交互,用户服务和积分服务,用户服务负责添加用户,积分服务负责增加积分。

【交互流程如下】:【1】用户注册 :用户服务在本地事务新增用户和增加 “积分消息日志”。(用户表和消息表通过本地事务保证一致)下边是伪代码,这种情况下,本地数据库操作与存储积分消息日志处于同一个事务中,本地数据库操作与记录消息日志操作具备原子性。

begin transaction; 
    //1.新增用户 
    //2.存储积分消息日志 
commit transation;
复制

【2】定时任务扫描日志:如何保证将消息发送给消息队列呢?经过第一步消息已经写到消息日志表中,可以启动独立的线程,定时对消息日志表中的消息进行扫描并发送至消息中间件,在消息中间件反馈发送成功后删除该消息日志,否则等待定时任务下一周期重试。 【3】消费消息:如何保证消费者一定能消费到消息呢?这里可以使用 MQ的ack(即消息确认)机制,消费者监听MQ,如果消费者接收到消息并且业务处理完成后向MQ 发送ack(即消息确认),此时说明消费者正常消费消息完成,MQ将不再向消费者推送消息,否则消费者会不断重试向消费者来发送消息。积分服务接收到”增加积分“消息,开始增加积分,积分增加成功后向消息中间件回应ack,否则消息中间件将重复投递此消息。由于消息会重复投递,积分服务的”增加积分“功能需要实现幂等性。

三、解决方案【RocketMQ事务消息方案

RocketMQ 是一个来自阿里巴巴的分布式消息中间件,于 2012 年开源,并在 2017 年正式成为 Apache 顶级项目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在 RocketMQ 之上,并且最近几年的双十一大促中,RocketMQ 都有抢眼表现。Apache RocketMQ 4.3之后的版本正式支持事务消息,为分布式事务实现提供了便利性支持。RocketMQ 事务消息设计主要为解决 Producer 端的消息发送与本地事务执行的原子性问题,RocketMQ 的设计中 broker 与 producer 端的双向通信能力,使得 broker 天生可以作为一个事务协调者存在;而 RocketMQ本身提供的存储机制为事务消息提供了持久化能力;RocketMQ 的高可用机制以及可靠消息设计则为事务消息在系统发生异常时依然能够保证达成事务的最终一致性。在 RocketMQ 4.3后实现了完整的事务消息,实际上是对本地消息表的一个封装,将本地消息表移动到了 MQ内部,解决Producer 端的消息发送与本地事务执行的原子性问题。

执行流程如下为方便理解我们还以注册送积分的例子来描述整个流程。Producer 即MQ发送方,本例中是用户服务,负责新增用户。MQ订阅方即消息消费方,本例中是积分服务,负责新增积分。 【1】Producer 发送事务消息:Producer (MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费到的。本例中,Producer 发送 ”增加积分消息“ 到MQ Server。 【2】MQ Server回应消息发送成功:MQ Server接收到 Producer 发送给的消息则回应发送成功。表示 MQ已接收到消息。 【3】Producer 执行本地事务:Producer 端执行业务代码逻辑,通过本地数据库事务控制。本例中,Producer 执行添加用户操作。 【4】消息投递:若 Producer 本地事务执行成功则自动向 MQServer发送 commit消息,MQ Server接收到 Commit消息后将“增加积分消息” 状态标记为可消费,此时MQ订阅方(积分服务)即正常消费消息。若Producer 本地事务执行失败则自动向 MQServer发送 Rollback消息,MQ Server接收到 Rollback消息后将删除“增加积分消息”。MQ订阅方(积分服务)消费消息,消费成功则向MQ回应ack,否则将重复接收消息。这里 ack默认自动回应,即程序执行正常则自动回应ack。 【5】事务回查:如果执行 Producer端本地事务过程中,执行端挂掉,或者超时,MQ Server将会不停的询问同组的其他 Producer来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。以上主干流程已由RocketMQ实现,对用户侧来说,用户需要分别实现本地事务执行以及本地事务回查方法,因此只需关注本地事务的执行状态(维护本地事务状态表)即可。 RoacketMQ提供 RocketMQLocalTransactionListener接口:

public interface RocketMQLocalTransactionListener {
    /**发送prepare消息成功此方法被回调,该方法用于执行本地事务
    * @param msg 回传的消息,利用transactionId即可获取到该消息的唯一Id
    * @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到
    * @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
    */
    RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg);

     /**@param msg 通过获取transactionId来判断这条消息的本地事务执行状态
      * @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
      */
    RocketMQLocalTransactionState checkLocalTransaction(Message msg);
 }
复制

【6】发送事务消息:以下是 RocketMQ提供用于发送事务消息的API:

TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
//设置TransactionListener实现
producer.setTransactionListener(transactionListener);
//发送事务消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
复制四、RocketMQ实现可靠消息最终一致性事务

业务说明】通过 RocketMQ中间件实现可靠消息最终一致性分布式事务,模拟两个账户的转账交易过程。两个账户在分别在不同的银行(张三在 bank1、李四在 bank2),bank1、bank2是两个微服务。交易过程是,张三给李四转账指定金额。 上述交易步骤,张三扣减金额与给 bank2发转账消息,两个操作必须是一个整体性的事务。

【核心代码】:程序技术架构如下:

【交互流程如下】:【1】Bank1向 MQ Server发送转账消息; 【2】Bank1执行本地事务,扣减金额; 【3】Bank2接收消息,执行本地事务,添加金额; 【数据库】:在bank1、bank2数据库中新增 de_duplication,交易记录表(去重表),用于交易幂等控制。

DROP TABLE IF EXISTS `de_duplication`; 
CREATE TABLE `de_duplication` ( 
    `tx_no` varchar(64) COLLATE utf8_bin NOT NULL, 
    `create_time` datetime(0) NULL DEFAULT NULL, 
    PRIMARY KEY (`tx_no`) USING BTREE 
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
复制

版本依赖在父工程中指定了rocketmq-spring-boot-starter的版本


    org.apache.rocketmq
    rocketmq-spring-boot-starter
    2.0.2
复制

配置rocketMQ在application-local.propertis 中配置 rocketMQ nameServer地址及生产组。

rocketmq.producer.group = producer_bank2
rocketmq.name-server = 127.0.0.1:9876
复制

张三服务层代码

import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * @author Administrator
 * @version 1.0
 **/
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {

    @Autowired
    AccountInfoDao accountInfoDao;

    @Autowired
    RocketMQTemplate rocketMQTemplate;


    //向mq发送转账消息
    @Override
    public void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {

        //将accountChangeEvent转成json
        JSONObject jsonObject =new JSONObject();
        jsonObject.put("accountChange",accountChangeEvent);
        String jsonString = jsonObject.toJSONString();
        //生成message类型
        Message message = MessageBuilder.withPayload(jsonString).build();
        //发送一条事务消息
        /**
         * String txProducerGroup 生产组
         * String destination topic,
         * Message message, 消息内容
         * Object arg 参数
         */
        rocketMQTemplate.sendMessageInTransaction("producer_group_txmsg_bank1","topic_txmsg",message,null);

    }

    //更新账户,扣减金额
    @Override
    @Transactional
    public void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
        //幂等判断,txNo是在Ctroller中生成的 UUID,全局唯一
        if(accountInfoDao.isExistTx(accountChangeEvent.getTxNo())>0){
            return ;
        }
        //扣减金额
        accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount() * -1);
        //添加事务日志
        accountInfoDao.addTx(accountChangeEvent.getTxNo());
        if(accountChangeEvent.getAmount() == 3){
            throw new RuntimeException("人为制造异常");
        }
    }
}
复制

张三RocketMQLocalTransactionListener编写 RocketMQLocalTransactionListener接口实现类,实现执行本地事务和事务回查两个方法。

import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.transaction.annotation.Transactional;

/**
 * @author Administrator
 * @version 1.0
 **/
@Component
@Slf4j
//生产者组与发送消息时定义组相同
@RocketMQTransactionListener(txProducerGroup = "producer_group_txmsg_bank1")
public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {

    @Autowired
    AccountInfoService accountInfoService;

    @Autowired
    AccountInfoDao accountInfoDao;

    //事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调
    @Override
    @Transactional
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {

        try {
            //解析message,转成AccountChangeEvent
            String messageString = new String((byte[]) message.getPayload());
            JSONObject jsonObject = JSONObject.parseObject(messageString);
            String accountChangeString = jsonObject.getString("accountChange");
            //将accountChange(json)转成AccountChangeEvent
            AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
            //执行本地事务,扣减金额
            accountInfoService.doUpdateAccountBalance(accountChangeEvent);
            //当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit消息,mq将消息的状态改为可消费
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    //事务状态回查,查询是否扣减金额
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        //解析message,转成AccountChangeEvent
        String messageString = new String((byte[]) message.getPayload());
        JSONObject jsonObject = JSONObject.parseObject(messageString);
        String accountChangeString = jsonObject.getString("accountChange");
        //将accountChange(json)转成AccountChangeEvent
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
        //事务id
        String txNo = accountChangeEvent.getTxNo();
        int existTx = accountInfoDao.isExistTx(txNo);
        if(existTx>0){
            return RocketMQLocalTransactionState.COMMIT;
        }else{
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}
复制

李四服务层代码

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * @author Administrator
 * @version 1.0
 **/
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {

    @Autowired
    AccountInfoDao accountInfoDao;

    //更新账户,增加金额
    @Override
    @Transactional
    public void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) {
        log.info("bank2更新本地账号,账号:{},金额:{}",accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
        if(accountInfoDao.isExistTx(accountChangeEvent.getTxNo())>0){
            return ;
        }
        //增加金额
        accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
        //添加事务记录,用于幂等
        accountInfoDao.addTx(accountChangeEvent.getTxNo());
        if(accountChangeEvent.getAmount() == 4){
            throw new RuntimeException("人为制造异常");
        }
    }
}
复制

【MQ监听类】:通过实现 RocketMQListener接口监听目标 Topic

import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;

/**
 * @author Administrator
 * @version 1.0
 **/
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "consumer_group_txmsg_bank2",topic = "topic_txmsg")
public class TxmsgConsumer implements RocketMQListener {

    @Autowired
    AccountInfoService accountInfoService;

    //接收消息
    @Override
    public void onMessage(String message) {
        log.info("开始消费消息:{}",message);
        //解析消息
        JSONObject jsonObject = JSONObject.parseObject(message);
        String accountChangeString = jsonObject.getString("accountChange");
        //转成AccountChangeEvent
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
        //设置账号为李四的
        accountChangeEvent.setAccountNo("2");
        //更新本地账户,增加金额
        accountInfoService.addAccountInfoBalance(accountChangeEvent);
    }
}
复制五、总结

可靠消息最终一致性就是保证消息从生产方经过消息中间件传递到消费方的一致性,本案例使用了 RocketMQ作为消息中间件,RocketMQ主要解决了两个功能: 【1】本地事务与消息发送的原子性问题; 【2】事务参与方接收消息的可靠性; 可靠消息最终一致性事务适合执行周期长且实时性要求不高的场景。引入消息机制后,同步的事务操作变为基于消息执行的异步操作, 避免了分布式事务中的同步阻塞操作的影响,并实现了两个服务的解耦。

关联标签: