目 录CONTENT

文章目录

RocketMQ如何支持多事务消息

zhouzz
2024-09-29 / 0 评论 / 0 点赞 / 6 阅读 / 14661 字
温馨提示:
本文最后更新于 2024-09-29,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

1. 问题背景

在实际开发中,我们常常会面临多事务消息的场景,例如在订单模块中,用户支付后需要调用库存服务进行库存扣减,而在订单确认收货后需要调用用户服务实现积分赠送。这两个业务逻辑都需要通过事务消息来保证分布式事务。

为了处理这种情况,我们可能会考虑在订单模块中创建两个事务消息监听器,分别用于处理库存扣减和积分赠送的事务处理和事务回查。

//处理订单支付的事务监听器
@Component
@Slf4j
public class OrderPaidTransactionListener implements RocketMQLocalTransactionListener {
  @Override
  public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
    ......
    //处理订单支付逻辑
   }

  @Override
  public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
      ......
      //检查订单处理逻辑
   }
}

//处理订单收货的事务监听器
@Component
@Slf4j
public class OrderReceivedTransactionListener implements RocketMQLocalTransactionListener {
  @Override
  public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
    ......
   }

  @Override
  public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
      ......
   }
}

然而,当我们信心满满地完成业务逻辑编写并启动服务时,可能会遇到如下错误:rocketMQTemplate already exists RocketMQLocalTransactionListener

在rocketmq-spring-boot-starter版本低于2.1.0的项目中,可以使用多个 @RocketMQTransactionListener 监听不同的 txProducerGroup 来发送不同类型的事务消息到topic。然而,从 RocketMQ-Spring 2.1.0 版本开始,注解 @RocketMQTransactionListener 不能设置 txProducerGroup、ak、sk,这些值均需与对应的 RocketMQTemplate 保持一致。通过阅读源码 RocketMQTransactionConfiguration#registerTransactionListener() 方法,也可得知在RocketMQ如果已经存在了 RocketMQTransactionListener 则会出现上述错误。

2. 如何解决

发送消息体时增加自定义header,或者自定义消息体增加类别的区分都可以。

比如:

MessageBuilder.withPayload(message).setHeader("trans_source", "OrderPaid").build();

MessageBuilder.withPayload(message).setHeader("trans_source", "OrderReceived").build();

这样在事务监听器中根据消息获取 header,然后根据 trans_source的不同进行不同的业务处理。不过这样有很多 if 判断。

如何将 很多if 判断去掉呢?

这就下面说的 工厂模式 + 策略模式消除 if。

为了在保证系统只有一个 RocketMQTransactionListener 的前提下实现多事务消息,我们可以将 RocketMQLocalTransactionListener 不处理具体业务逻辑,而是将其作为一个分发器使用。

在生产者发送事务消息时指定对应的事务处理器 ,并将事务处理器放置在消息头上发送出去,在 RocketMQTransactionListener 中根据消息头选择具体的事务处理器来实现业务逻辑。

3.具体实现

3.1 定义事务消息处理接口

首先,定义公共的事务消息处理接口,所有事务消息都实现此接口而非 RocketMQ 默认的 RocketMQLocalTransactionListener。

public interface EnhanceTransactionMessageHandler {

    /**
     * 执行本地事务
     * @param message 消息体
     * @param arg     参数
     * @return 执行结果
     */
    RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg);

    /**
     * 检查本地执行状态
     * @param message 消息体
     * @return 执行结果
     */
    RocketMQLocalTransactionState checkLocalTransaction(Message message);
}

3.2 修改事务消息发送工具类,指定消息处理器

public <T extends BaseMessage> TransactionSendResult sendTransaction(
            String topic, String tag, T message, String messageListenerBeanName) {
    if (messageListener == null) {
        throw new IllegalArgumentException("transactionMessageListener must not null");
    }
    String destination = buildDestination(topic, tag);
    Message<T> sendMessage = MessageBuilder.withPayload(message)
            .setHeader(RocketMQHeaders.KEYS, message.getKey())
            .setHeader(SOURCE_HEADER, message.getSource())
            .setHeader(TRANSACTION_MESSAGE_HEADER, messageListenerBeanName)
            .build();
    TransactionSendResult sendResult = rocketMqTemplate.sendMessageInTransaction(destination, sendMessage, null);
    log.info("[{}]事务消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
    return sendResult;
}

3.3 修改RocketMQ事务消息监听器

@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor
public class DefaultRocketMQTransactionListener implements RocketMQLocalTransactionListener {
    /**
     * 构造器注入
     * final + `@RequiredArgsConstructor` 结合,能使Spring对handlerHashMap进行属性注入
     */
    final Map<String, EnhanceTransactionMessageHandler> handlerHashMap;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        log.info("消费者收到事务消息[{}]", JSONObject.toJSON(message));
        String listenerName = (String) message.getHeaders().get(TRANSACTION_MESSAGE_HEADER);
        if (null == listenerName) {
            throw new RuntimeException("not params transactionMessageListener");
        }
        RocketMQLocalTransactionState state;
        try {
            EnhanceTransactionMessageHandler messageHandler = handlerHashMap.get(listenerName);
            if (null == messageHandler) {
                throw new RuntimeException("not match condition TransactionMessageHandler");
            }
            state = messageHandler.executeLocalTransaction(message, arg);
        } catch (Exception e) {
            log.error("rocket transaction message executeLocal error:{}", e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return state;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        log.info("消费者收到事务回查消息[{}]", JSONObject.toJSONString(message.getHeaders()));
        String listenerName = (String) message.getHeaders().get(TRANSACTION_MESSAGE_HEADER);
        if (null == listenerName) {
            throw new RuntimeException("not params transactionMessageListener");
        }
        RocketMQLocalTransactionState state;
        try {
            EnhanceTransactionMessageHandler messageHandler = handlerHashMap.get(listenerName);
            if (null == messageHandler) {
                throw new RuntimeException("not match condition TransactionMessageHandler");
            }
            state = messageHandler.checkLocalTransaction(message);
        } catch (Exception e) {
            log.error("rocket transaction message executeLocal error:{}", e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return state;
    }
}

在上述代码中,根据消息头中的TRANSACTION_MESSAGE_HEADER参数选择对应的事务处理器来处理事务消息。

3.4 如何使用

所有的事务消息处理逻辑都实现 TransactionMessageHandler 接口,以订单支付的处理逻辑为例

@Slf4j
@Component(value = "orderPaidTransactionMessageHandler")
public class OrderPaidTransactionMessageHandler implements EnhanceTransactionMessageHandler {

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        String jsonStr = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
        log.info("事务提交,消息[{}]正常处理", jsonStr);

        // 返回COMMIT状态的消息会立即被消费者消费到
        // RocketMQLocalTransactionState.COMMIT
        return null;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        String jsonStr = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
        log.info("调用回查本地事务接口,消息:[{}]", jsonStr);

        return RocketMQLocalTransactionState.COMMIT;
    }
}

事务消息发送逻辑,指定事务处理器

messageService.sendTransaction("order_paid", "orderPay", message, "orderPaidTransactionMessageHandler");

6.小结

本文解决了在 RocketMQ 2.1.0 版本以后,无法简单使用多个 @RocketMQTransactionListener 的问题。通过引入事务消息处理接口 TransactionMessageHandler,我们将原有的事务处理器改造成了一个分发器,使得在 DailyMart 项目中可以轻松处理多事务消息的场景。

0

评论区