分布式事务就是这么简单解决RocketMQ方案

小编:管理员 198阅读 2022.08.02

前言

现在比较流行的分布式架构而言,它虽然带来一系列好处,比如支持高并发,高可用集群。同时它也带来一系列的挑战,今天我们将的就是其中一种挑战 - 分布式事务

在传统的all in项目中单数据源的事务一致性依赖于单机事务,但是如果上升到分布式项目中,那么保证事务的一致性仅仅依靠单机事务是不能实现的,这时候就依赖于分布式事务。

介绍

目前业界比较主流的分布式事务解决方法大概可以分为两种

  • 强一致性
  • 最终一致性
强一致性

主要解决方法代表有 2PC 、 Tcc 适用于 金融交易场景

最终一致性

主要解决方法代表有 RocketMQ事务消息 适用于常见的积分订单场景,1、比如创建订单 2、如果订单创建成功 3、增加买家积分 不管中途发生了什么 只要订单成功,那么买家的积分就一定要增加。保证最终一致性

实现架构术语介绍
  • HALF MESSAGE : 事务消息 也称半消息 标识该消息处于"暂时不能投递"状态,不会被Comsumer所消费,待服务端收到生成者对该消息的commit或者rollback响应后,消息会被正常投递或者回滚(丢弃)消息
  • RMQ_SYS_TRANS_HALF_TOPIC :半消息在被投递到Mq服务器后,会存储于Topic为RMQ_SYS_TRANS_HALF_TOPIC的消费队列中
  • RMQ_SYS_TRANS_OP_HALF_TOPIC : 在半消息被commit或者rollback处理后,会存储到Topic为RMQ_SYS_TRANS_OP_HALF_TOPIC的队列中,标识半消息已被处理

在RocketMQ中 核心思路就是 **两段提交 定时回查 **

流程图如下:

image-20190726114320498

1、首先事务发起者 给RocketMQ发送一个半消息

2、RocketMQ响应事务发起者 半消息发送成功

3、事务发起者提交本地事务

4、根据本地事务运行结果 响应RocketMQ 半消息是commit还是rollback

5、如果没有收到第4步通知,则RocketMQ回查事务发起者。

6、事务发起者收到回查通知检查本地消息状态

7、将回查结果返回RocketMQ 根据结果commit/rollback半消息

8、如果broker收到commit 则将半消息从 trans_half队列提交到真正的业务队列中。如果收到rollback或者半消息过期 则提交到trans_op_half队列中。

9、如果半消息被commit 则消息订阅方法能读取消费该消息,只要保证下游消费失败重试,即可保证消息最终一致性。

分析一下 可能遇到的场景

1、半消息发送成功,本地事务运行失败。rollback半消息,下游业务无感知,正常。

2、半消息发送成功,本地事务运行成功。但是第4步通知broker由于网络原因发送失败,但是broker有轮询机制,根据唯一id查询本地事务状态,从而提交半消息。

通过以上几步就实现了RocketMQ的事务消息。

实例

这里通过一个实例来讲一下RocketMQ实现分布式事务具体编码。

场景: 下单场景,当订单支付成功之后,对应的买家对应的账号需要增加积分。(暂时不考虑物流 库存简单分析。)

很明显两个服务, 1、订单服务 2、积分服务

用户付款成之后 1、修改订单状态已支付 2、通知积分服务 给对应的买家涨积分。

实体结构

订单

/**
 * @author yukong
 * @date 2019-07-25 15:18
 * 订单  省略其他字段
 */
@Data
public class Order {

    /**
     * 订单号
     */
    private String orderNo;

    /**
     * 买家id
     */
    private Integer buyerId;

    /**
     * 支付状态 0 已支付 1 未支付 2 已超时
     */
    private Integer payStatus;

    /**
     * 下单日期
     */
    private Date createDate;

    /**
     * 金额
     */
    private Long amount;



}
复制

积分添加记录

/**
 * @author yukong
 * @date 2019-07-25 15:32
 * 积分添加记录表
 */
@Data
public class PointRecord {

    /**
     * 订单号
     */
    private String orderNo;

    /**
     * 用户id
     */
    private Integer userId;

}
复制

首先我们需要实现业务代码,也是修改订单状态,然后记录一条积分添加记录(可以用于事务回查,判断本地事务是否允许成功)。

/**
 * @author yukong
 * @date 2019-07-25 15:14
 */
@Service("payService")
@Slf4j
public class PayService {


    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private PointRecordMapper pointRecordMapper;

    /**
     * 支付功能:
     *  如果支付成功 则下游业务 也就是积分服务对应的账号需要增加积分
     *  如果支付失败,则下游业务无感知
     */
    @Transactional(rollbackFor = Exception.class)
    public void pay(String orderNo, Integer buyerId) {
        // 1、构造积分添加记录表
        PointRecord record = new PointRecord();
        record.setOrderNo(orderNo);
        record.setUserId(buyerId);
        // 2、存入数据库
        pointRecordMapper.insert(record);
         // 3、修改订单状态 为已支付
        Order order = new Order();
        order.setOrderNo(orderNo);
        order.setBuyerId(buyerId);
        //4、 更新订单信息
        orderMapper.updateOrder(order);

        log.info("执行本地事务,pay() ");
    }

    public Boolean checkPayStatus(String orderNo) {
        // 根据判断是否有PointRecord这个记录来 确实是否支付成成功 用于事务回查判断本地事务是否执行成功
        return Objects.nonNull(pointRecordMapper.getPointRecordByOrderNo(orderNo));
    }

}
复制

接下来要实现事务发起者的代码,也是就是半消息发送者。

/**
 * @author yukong
 * @date 2019-07-25 14:48
 * 事务消息生产者
 */
@Component
@Slf4j
public class TransactionProducer implements InitializingBean {

    private  TransactionMQProducer producer;

    @Autowired
    private RocketMQProperties rocketMQProperties;

    @Autowired
    private TransactionListener transactionListener;


    /**
     * 构造生产者
     * @throws Exception
     */
    @Override
    public void afterPropertiesSet() throws Exception {

        producer = new TransactionMQProducer(rocketMQProperties.getTransactionProducerGroupName());
        producer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr());
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("transaction-thread-name-%s").build();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(30), threadFactory);
        producer.setExecutorService(executor);

        producer.setTransactionListener(transactionListener);

        producer.start();

    }

    /**
     * 真正的事物消息发送者
     */
    public void send() throws JsonProcessingException, UnsupportedEncodingException, MQClientException {

        ObjectMapper objectMapper = new ObjectMapper();

        // 模拟接受前台的支付请求
        String orderNo = UUID.randomUUID().toString();
        Integer userId = 1;
        // 构造发送的事务 消息
        PointRecord record = new PointRecord();
        record.setUserId(userId);
        record.setOrderNo(orderNo);

        Message message = new Message(rocketMQProperties.getTopic(), "", record.getOrderNo(),
                objectMapper.writeValueAsString(record).getBytes(RemotingHelper.DEFAULT_CHARSET));

        producer.sendMessageInTransaction(message, null);

        log.info("发送事务消息, topic = {}, body = {}", rocketMQProperties.getTopic(), record);
    }
}
复制

紧接着我们要实现,事务消息的二段提交与事务消息回查本地事务状态的编码。

/**
 * @author yukong
 * @date 2019-07-25 15:08
 * 事务消息 回调监听器
 */
@Component
@Slf4j
public class PointTransactionListener implements TransactionListener {

    @Autowired
    private PayService payService;



    /**
     * 根据消息发送的结果 判断是否执行本地事务
     * @param message
     * @param o
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        // 根据本地事务执行成与否判断 事务消息是否需要commit与 rollback
        ObjectMapper objectMapper = new ObjectMapper();
        LocalTransactionState state = LocalTransactionState.UNKNOW;
        try {
            PointRecord record = objectMapper.readValue(message.getBody(), PointRecord.class);
            payService.pay(record.getOrderNo(), record.getUserId());
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        } catch (UnsupportedEncodingException e) {
            log.error("反序列化消息 不支持的字符编码:{}", e);
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        } catch (IOException e) {
            log.error("反序列化消息失败 io异常:{}", e);
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return state;
    }

    /**
     * RocketMQ 回调 根据本地事务是否执行成功 告诉broker 此消息是否投递成功
     * @param messageExt
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        ObjectMapper objectMapper = new ObjectMapper();
        LocalTransactionState state = LocalTransactionState.UNKNOW;
        PointRecord record = null;
        try {
            record = objectMapper.readValue(messageExt.getBody(), PointRecord.class);
        } catch (IOException e) {
            log.error("回调检查本地事务状态异常: ={}", e);

        }
        try {
            //根据是否有transaction_id对应转账记录 来判断事务是否执行成功
            boolean isCommit = payService.checkPayStatus(record.getOrderNo());
            if (isCommit) {
                state = LocalTransactionState.COMMIT_MESSAGE;
            } else {
                state = LocalTransactionState.ROLLBACK_MESSAGE;
            }
        } catch (Exception e) {
            log.error("回调检查本地事务状态异常: ={}", e);
        }
        return state;

    }
}
复制

这样我们就实现了分布式事务的最终一致性。

具体消费方代码就不写了,只要上游本地事务运行成功,且事务消息成功投递给对应的topic,这样下游业务对于上游是无感知,所以消费方只要保证幂等性即可。

关联标签: