目 录CONTENT

文章目录

RocketMQ中死信消息详解

zhouzz
2024-09-28 / 0 评论 / 0 点赞 / 17 阅读 / 27476 字
温馨提示:
本文最后更新于 2024-09-29,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

1.简介

消息消费异常时会自动进行消费重试,达到最大重试次数后还未成功,则消息会转为死信状态。

RocketMQ 支持将这些死信消息保存至指定Topic,方便后续进行业务恢复或回溯。

本文介绍死信消息的应用场景、死信策略、使用建议和自定义构建死信队列。

2.应用场景

典型死信消息处理场景

消息重试失败后,您可以选择将死信消息存储到指定的死信Topic中,通过另外创建一个ConsumerGroup消费这些死信消息来处理异常链路或分析死信消息。

典型错误使用死信消息场景

处理死信消息时,如果将死信消息多层流转,转储回原Topic,会引起死信消息再次被循环重试,可能会引起雪崩效应。

3.死信策略

3.1 什么时候消息会转为死信状态

消息重试达到最大重试次数后还没有被成功消费,消息将不再被投递,转为死信状态。

20240928224716.png

3.2 死信消息保存规则

云消息队列 RocketMQ 版默认不保存死信消息,消息转为死信状态后将被丢弃

您可以通过控制台开启死信消息保存功能,开启后,死信消息将被存储至指定的Topic中,该Topic被称为死信Topic。具体操作,请参见配置死信消息保存规则。

死信消息是作为一条新的消息被存储到死信Topic中,其消息属性变更如下:

  • Message ID:死信消息转储到死信Topic后会生成新的Message ID。
  • 用户自定义属性、消息体等用户设置的信息不变。
  • 死信消息的保存时长:从进入死信Topic后重新开始计算。例如,某条消息生产者发送到服务端的时间为13∶00∶00,2个小时后(15∶00∶00)该消息消费失败且重试失败被存储至死信Topic,则该消息的保存时长从15∶00∶00开始计算。

4.使用建议

4.1 消费者如何获取原Topic信息

  • 方案一:在设置死信Topic时,将死信Topic和原Topic进行一一对应。

例如,原消息Topic为testTopic,则将死信Topic设置为DLQ-testTopic。

  • 方案二:将原Topic的信息设置到消息的自定义属性中。示例如下:
messageBuilder.addProperty("originalTopic","testTopic")

4.2 死信消息和主业务区分处理

死信消息是正常业务流程中消息重试仍然失败的消息,需要和主业务流程分开处理,避免影响正常业务。

死信Topic和原消息发送Topic不能相同,避免死信消息通过循环配置流转回原Topic,导致无法消费的消息影响正常的ConsumerGroup业务,引起业务雪崩。

避免使用生产流程中的ConsumerGroup去消费死信消息,影响正常消息的消费。

4.3 消息在监听器中如何ACK

4.3.1 java API

4.3.2 springboot rocketmq

在Springboot 集成rocketmq中,监听器如下:

@Slf4j
@Component
@RocketMQMessageListener(topic = "topic_order", consumerGroup = "consumer_order",
        messageModel = MessageModel.CLUSTERING)
public class OrderConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String msg) {
        log.info("consumer收到消息 [{}]", msg);
    }
}

我们只要关注自己的业务逻辑实现即可,不过业务方法执行抛异常时,不可自己捕获,而要让其抛出来。

原因如下:

DefaultRocketMQListenerContainer实现了MessageListenerConcurrently方法,它会循环调用rocketMQListener.onMessage,

  • onMessage()方法正常执行后,就会返回 CONSUME_SUCCESS 状态,即 ACK。
  • 出现异常会设置delayLevelWhenNextConsume,然后立即返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,让MQ服务重新延迟再次发送消息。
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {

    @SuppressWarnings("unchecked")
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt messageExt : msgs) {
            log.debug("received msg: {}", messageExt);
            try {
                long now = System.currentTimeMillis();
                rocketMQListener.onMessage(doConvertMessage(messageExt));
                long costTime = System.currentTimeMillis() - now;
                log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
            } catch (Exception e) {
                log.warn("consume message failed. messageExt:{}", messageExt, e);
                context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

当某个消息重复执行了16次后,rocketmq就会把这条消息判断为死信消息,转向到死信队列中去。简而言之,就是消息不能执行,MQ就会把它转移到另一个队列中,不能对原有的队列产生影响。

5.自定义构建死信队列

实现二次封装需要创建一个自定义的starter,这样其他项目只需要依赖此starter即可使用封装功能。

简单做法:

  • 1.用模板方法封装原来的 onMessage()方法,里面会加入日志打印,异常处理。
  • 2.当onMessage()方法执行正常时,和原来的逻辑一样;
  • 3.当onMessage()方法执行异常时,会有个判断异常是否捕获。
    • 若抛出异常,则和原本的执行逻辑一样,让Spring返回RECONSUME_LATER状态。
    • 若捕获异常,则会重新生成消息,发送给MQ,这里有个次数限制,超过3次,则会转发到另一个自定义普通队列中(即自定义的死信队列)。

以上做法,都是基于 RocketMQ没有开启死信队列,如果死信队列是开启的,则只要监听死信队列这个就可以了。

5.1 消息实体类的封装

@Data
public abstract class BaseMessage {
    /**
     * 业务键,用于RocketMQ控制台查看消费情况
     */
    protected String key;

    /**
     * 发送消息来源,用于排查问题
     */
    protected String source = "";

    /**
     * 发送消息类型(普通消息默认空,重试消息,死信消息)
     */
    protected String msgType = "";

    /**
     * 发送时间
     */
    protected LocalDateTime sendTime = LocalDateTime.now();

    /**
     * 重试次数,用于判断重试次数,超过重试次数发送异常警告
     */
    protected Integer retryTimes = 0;

    /**
     * 源主题
     */
    private String originalTopic;

    /**
     * 源tag
     */
    private String originalTag;

    /**
     * 错误信息
     */
    private String error;
}

后面所有发送的消息实体都需要继承此实体类。

5.2 消息发送工具类的封装

@Slf4j
public class RocketMqEnhanceTemplate {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public RocketMQTemplate getTemplate() {
        return rocketMQTemplate;
    }

    /**
     * 根据系统上下文自动构建隔离后的topic
     * 构建目的地
     */
    private String buildDestination(String topic, String tag) {
        return topic + ":" + tag;
    }

    /**
     * 发送同步消息
     */
    public <T extends BaseMessage> SendResult send(String topic, String tag, T message) {
        // 注意分隔符
        return send(buildDestination(topic, tag), message);
    }

    private <T extends BaseMessage> SendResult send(String destination, T message) {
        // 设置业务键,此处根据公共的参数进行处理
        // 更多的其它基础业务处理...
        Message<T> sendMessage = MessageBuilder.withPayload(message)
                .setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
        SendResult sendResult = rocketMQTemplate.syncSend(destination, sendMessage);
        // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集
        log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message),
                JSONObject.toJSON(sendResult));
        return sendResult;
    }

    /**
     * 发送延迟消息
     */
    public <T extends BaseMessage> SendResult send(String topic, String tag, T message, int delayLevel) {
        return send(buildDestination(topic, tag), message, delayLevel);
    }

    private <T extends BaseMessage> SendResult send(String destination, T message, int delayLevel) {
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
        SendResult sendResult = rocketMQTemplate.syncSend(destination, sendMessage, 3000, delayLevel);
        log.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
        return sendResult;
    }
}

这里封装了一个消息发送类,实现了日志记录以及自动重建topic的功能(即生产者实现环境隔离),后面项目中只需要注入RocketMQEnhanceTemplate来实现消息的发送。

5.3 消费者的封装

@Slf4j
public abstract class EnhanceMessageHandler<T extends BaseMessage> {

    /**
     * 默认重试次数(超过3次需要进入死信队列,这个需要在使用时申请创建死信队列)
     */
    private static final int MAX_RETRY_TIMES = 3;

    /**
     * 截取错误信息最大长度
     */
    private static final int ERROR_MSG_MAX_LENGTH = 100;

    /**
     * 延时等级
     */
    private static final int DELAY_LEVEL = MessageLevelEnum.ONE_MINUTE.getLevel();

    @Resource
    private RocketMqEnhanceTemplate rocketMQEnhanceTemplate;

    /**
     * 消息处理
     *
     * @param message 待处理消息
     * @throws Exception 消费异常
     */
    protected abstract void handleMessage(T message) throws Exception;

    /**
     * 使用模板模式构建消息消费框架,这里会由父类打印日志,可自由扩展或删减
     */
    public void dispatchMessage(T message) {
        log.info("消费者收到消息[{}]", JSONObject.toJSON(message));
        // 超过最大重试次数时调用子类方法处理
        if (message.getRetryTimes() > getMaxRetryTimes()) {
            handleMaxRetriesExceeded(message);
            return;
        }
        try {
            long now = System.currentTimeMillis();
            handleMessage(message);
            long costTime = System.currentTimeMillis() - now;
            log.info("消息{}消费成功,耗时[{}ms]", message.getKey(), costTime);
        } catch (Exception e) {
            log.error("消息{}消费异常", message.getKey(), e);
            // 是捕获异常还是抛出,由子类决定
            if (throwException()) {
                // 抛出异常,由 DefaultMessageListenerConcurrently 类处理
                // 消息让 rocketmq 重试,消费正常则ACK;消费异常,在 rocketmq 中重试次数达到16次会进入死信队列
                throw new RuntimeException(e);
            }
            //最后一次设置异常消息
            if (message.getRetryTimes() > getMaxRetryTimes() - 1) {
                if (e.getMessage().length() > ERROR_MSG_MAX_LENGTH) {
                    message.setError(e.getMessage().substring(0, ERROR_MSG_MAX_LENGTH));
                } else {
                    message.setError(e.getMessage());
                }
            }
            // 此时是捕获异常,说明不开启重试机制,本次消息默认ACK
            handleRetry(message);
        }
    }

    /**
     * 重试
     * @param message
     */
    private void handleRetry(T message) {
        // 获取子类RocketMQMessageListener注解拿到topic和tag
        RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
        if (annotation == null) {
            throw new RuntimeException("重试消息发送失败");
        }
        //重新构建消息体
        message.setMsgType(MessageTypeEnum.RETRY.getCode());
        //消费次数+1
        message.setRetryTimes(message.getRetryTimes() + 1);
        SendResult sendResult;
        try {
            // 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)
            sendResult = rocketMQEnhanceTemplate.send(
                    annotation.topic(), annotation.selectorExpression(), message, getDelayLevel());
        } catch (Exception ex) {
            // 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息
            //由生产者直接发送
            throw new RuntimeException(ex);
        }
        // 发送失败的处理就是不进行ACK,继续由RocketMQ重试
        if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
            throw new RuntimeException("重试消息发送失败");
        }
    }

    /**
     * 超过重试次数消息,发送到死信队列
     * <br/>
     * 1.死信Topic和原消息发送Topic不能相同,避免死信消息通过循环配置流转回原Topic,
     * 导致无法消费的消息影响正常的ConsumerGroup业务,引起业务雪崩。
     * <br/>
     * 2.避免使用生产流程中的ConsumerGroup去消费死信消息,影响正常消息的消费。
     * @param message 待处理消息
     */
    protected void handleMaxRetriesExceeded(T message) {
        // 如果消息发送不成功,则再次重新发送,
        // 如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)
        RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
        if (annotation == null) {
            throw new RuntimeException("重试消息发送失败");
        }

        // 这里简单处理,直接保存主题.
        // 但这是重新发送主题,会使订阅该主题的消费者都会再次消费该消息.
        String deadLetterTopic = ENHANCE_DEAD_LETTER_QUEUE_PREFIX + annotation.topic();
        String tag = annotation.selectorExpression();

        //重新构建消息体
        message.setOriginalTopic(annotation.topic());
        message.setOriginalTag(tag);
        message.setMsgType(MessageTypeEnum.DEAD_LETTER.getCode());

        rocketMQEnhanceTemplate.send(deadLetterTopic, tag, message);
    }

    /**
     * 最大重试次数
     *
     * @return 最大重试次数, 默认5次
     */
    protected int getMaxRetryTimes() {
        return MAX_RETRY_TIMES;
    }

    /**
     * isRetry开启时,重新入队延迟时间
     *
     * @return -1:立即入队重试
     */
    protected int getDelayLevel() {
        return DELAY_LEVEL;
    }

    /**
     * 消费异常时是否抛出异常
     * true: 则由rocketmq机制自动重试(默认)
     * false: 捕获异常,开启消息再次延迟发送
     */
    protected boolean throwException() {
        return true;
    }
}

使用模版设计模式定义了消息消费的骨架,实现了日志打印,异常重试等公共逻辑,业务处理则交由子类实现。

5.4 基础配置类

@Configuration
public class RocketMqEnhanceAutoConfiguration {

    @Bean
    public RocketMqEnhanceTemplate rocketMqEnhanceTemplate() {
        return new RocketMqEnhanceTemplate();
    }

    /**
     * 解决RocketMQ Jackson不支持Java时间类型配置
     */
    @Bean
    @Primary
    public RocketMQMessageConverter enhanceRocketMQMessageConverter() {
        RocketMQMessageConverter converter = new RocketMQMessageConverter();
        CompositeMessageConverter compositeMessageConverter =
                (CompositeMessageConverter) converter.getMessageConverter();
        List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();
        for (MessageConverter messageConverter : messageConverterList) {
            if (messageConverter instanceof MappingJackson2MessageConverter) {
                MappingJackson2MessageConverter jackson2MessageConverter =
                        (MappingJackson2MessageConverter) messageConverter;
                ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
                objectMapper.registerModules(new JavaTimeModule());
            }
        }
        return converter;
    }
}

5.5 定义加载文件

在项目中的 resources目录中创建 META-INF\spring.factories文件:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.zhouzz.common.rocketmq.config.RocketMqEnhanceAutoConfiguration

之后就可以在其他项目中导入使用了。

5.6 如何使用

@Slf4j
@RocketMQMessageListener
public class DemoMessageListener extends EnhanceMessageHandler<DemoMessage> implements RocketMQListener<DemoMessage> {

    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(DemoMessage demoMessage) {
        // 这里会由父类打印日志
        super.dispatchMessage(demoMessage);
    }

    @Override
    public void handleMessage(DemoMessage message) throws Exception {
        // 此时这里才是最终的业务处理
    }

    @Override
    protected boolean throwException() {
        // 当返回 false,出现异常时,不抛出异常。
        // 由自定义逻辑重试3次,3次后进入自定义的死信队列(这个与rocketMQ定义的死信队列不是同一个)
        return false;
    }
}

6.小结

0

评论区