1.简介
消息消费异常时会自动进行消费重试,达到最大重试次数后还未成功,则消息会转为死信状态。
RocketMQ 支持将这些死信消息保存至指定Topic,方便后续进行业务恢复或回溯。
本文介绍死信消息的应用场景、死信策略、使用建议和自定义构建死信队列。
2.应用场景
典型死信消息处理场景
消息重试失败后,您可以选择将死信消息存储到指定的死信Topic中,通过另外创建一个ConsumerGroup消费这些死信消息来处理异常链路或分析死信消息。
典型错误使用死信消息场景
处理死信消息时,如果将死信消息多层流转,转储回原Topic,会引起死信消息再次被循环重试,可能会引起雪崩效应。
3.死信策略
3.1 什么时候消息会转为死信状态
消息重试达到最大重试次数后还没有被成功消费,消息将不再被投递,转为死信状态。
3.2 死信消息保存规则
云消息队列 RocketMQ 版默认不保存死信消息,消息转为死信状态后将被丢弃。
您可以通过控制台开启死信消息保存功能,开启后,死信消息将被存储至指定的Topic中,该Topic被称为死信Topic。具体操作,请参见配置死信消息保存规则。
死信消息是作为一条新的消息被存储到死信Topic中,其消息属性变更如下:
- Message ID:死信消息转储到死信Topic后会生成新的Message ID。
- 用户自定义属性、消息体等用户设置的信息不变。
- 死信消息的保存时长:从进入死信Topic后重新开始计算。例如,某条消息生产者发送到服务端的时间为13∶00∶00,2个小时后(15∶00∶00)该消息消费失败且重试失败被存储至死信Topic,则该消息的保存时长从15∶00∶00开始计算。
4.使用建议
4.1 消费者如何获取原Topic信息
- 方案一:在设置死信Topic时,将死信Topic和原Topic进行一一对应。
例如,原消息Topic为testTopic,则将死信Topic设置为DLQ-testTopic。
- 方案二:将原Topic的信息设置到消息的自定义属性中。示例如下:
messageBuilder.addProperty("originalTopic","testTopic")
4.2 死信消息和主业务区分处理
死信消息是正常业务流程中消息重试仍然失败的消息,需要和主业务流程分开处理,避免影响正常业务。
死信Topic和原消息发送Topic不能相同,避免死信消息通过循环配置流转回原Topic,导致无法消费的消息影响正常的ConsumerGroup业务,引起业务雪崩。
避免使用生产流程中的ConsumerGroup去消费死信消息,影响正常消息的消费。
4.3 消息在监听器中如何ACK
4.3.1 java API
4.3.2 springboot rocketmq
在Springboot 集成rocketmq中,监听器如下:
@Slf4j
@Component
@RocketMQMessageListener(topic = "topic_order", consumerGroup = "consumer_order",
messageModel = MessageModel.CLUSTERING)
public class OrderConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
log.info("consumer收到消息 [{}]", msg);
}
}
我们只要关注自己的业务逻辑实现即可,不过业务方法执行抛异常时,不可自己捕获,而要让其抛出来。
原因如下:
DefaultRocketMQListenerContainer实现了MessageListenerConcurrently方法,它会循环调用rocketMQListener.onMessage,
- onMessage()方法正常执行后,就会返回 CONSUME_SUCCESS 状态,即 ACK。
- 出现异常会设置delayLevelWhenNextConsume,然后立即返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,让MQ服务重新延迟再次发送消息。
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
@SuppressWarnings("unchecked")
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
rocketMQListener.onMessage(doConvertMessage(messageExt));
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) {
log.warn("consume message failed. messageExt:{}", messageExt, e);
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
当某个消息重复执行了16次后,rocketmq就会把这条消息判断为死信消息,转向到死信队列中去。简而言之,就是消息不能执行,MQ就会把它转移到另一个队列中,不能对原有的队列产生影响。
5.自定义构建死信队列
实现二次封装需要创建一个自定义的starter,这样其他项目只需要依赖此starter即可使用封装功能。
简单做法:
- 1.用模板方法封装原来的 onMessage()方法,里面会加入日志打印,异常处理。
- 2.当onMessage()方法执行正常时,和原来的逻辑一样;
- 3.当onMessage()方法执行异常时,会有个判断异常是否捕获。
- 若抛出异常,则和原本的执行逻辑一样,让Spring返回RECONSUME_LATER状态。
- 若捕获异常,则会重新生成消息,发送给MQ,这里有个次数限制,超过3次,则会转发到另一个自定义普通队列中(即自定义的死信队列)。
以上做法,都是基于 RocketMQ没有开启死信队列,如果死信队列是开启的,则只要监听死信队列这个就可以了。
5.1 消息实体类的封装
@Data
public abstract class BaseMessage {
/**
* 业务键,用于RocketMQ控制台查看消费情况
*/
protected String key;
/**
* 发送消息来源,用于排查问题
*/
protected String source = "";
/**
* 发送消息类型(普通消息默认空,重试消息,死信消息)
*/
protected String msgType = "";
/**
* 发送时间
*/
protected LocalDateTime sendTime = LocalDateTime.now();
/**
* 重试次数,用于判断重试次数,超过重试次数发送异常警告
*/
protected Integer retryTimes = 0;
/**
* 源主题
*/
private String originalTopic;
/**
* 源tag
*/
private String originalTag;
/**
* 错误信息
*/
private String error;
}
后面所有发送的消息实体都需要继承此实体类。
5.2 消息发送工具类的封装
@Slf4j
public class RocketMqEnhanceTemplate {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public RocketMQTemplate getTemplate() {
return rocketMQTemplate;
}
/**
* 根据系统上下文自动构建隔离后的topic
* 构建目的地
*/
private String buildDestination(String topic, String tag) {
return topic + ":" + tag;
}
/**
* 发送同步消息
*/
public <T extends BaseMessage> SendResult send(String topic, String tag, T message) {
// 注意分隔符
return send(buildDestination(topic, tag), message);
}
private <T extends BaseMessage> SendResult send(String destination, T message) {
// 设置业务键,此处根据公共的参数进行处理
// 更多的其它基础业务处理...
Message<T> sendMessage = MessageBuilder.withPayload(message)
.setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
SendResult sendResult = rocketMQTemplate.syncSend(destination, sendMessage);
// 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集
log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message),
JSONObject.toJSON(sendResult));
return sendResult;
}
/**
* 发送延迟消息
*/
public <T extends BaseMessage> SendResult send(String topic, String tag, T message, int delayLevel) {
return send(buildDestination(topic, tag), message, delayLevel);
}
private <T extends BaseMessage> SendResult send(String destination, T message, int delayLevel) {
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
SendResult sendResult = rocketMQTemplate.syncSend(destination, sendMessage, 3000, delayLevel);
log.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
return sendResult;
}
}
这里封装了一个消息发送类,实现了日志记录以及自动重建topic的功能(即生产者实现环境隔离),后面项目中只需要注入RocketMQEnhanceTemplate来实现消息的发送。
5.3 消费者的封装
@Slf4j
public abstract class EnhanceMessageHandler<T extends BaseMessage> {
/**
* 默认重试次数(超过3次需要进入死信队列,这个需要在使用时申请创建死信队列)
*/
private static final int MAX_RETRY_TIMES = 3;
/**
* 截取错误信息最大长度
*/
private static final int ERROR_MSG_MAX_LENGTH = 100;
/**
* 延时等级
*/
private static final int DELAY_LEVEL = MessageLevelEnum.ONE_MINUTE.getLevel();
@Resource
private RocketMqEnhanceTemplate rocketMQEnhanceTemplate;
/**
* 消息处理
*
* @param message 待处理消息
* @throws Exception 消费异常
*/
protected abstract void handleMessage(T message) throws Exception;
/**
* 使用模板模式构建消息消费框架,这里会由父类打印日志,可自由扩展或删减
*/
public void dispatchMessage(T message) {
log.info("消费者收到消息[{}]", JSONObject.toJSON(message));
// 超过最大重试次数时调用子类方法处理
if (message.getRetryTimes() > getMaxRetryTimes()) {
handleMaxRetriesExceeded(message);
return;
}
try {
long now = System.currentTimeMillis();
handleMessage(message);
long costTime = System.currentTimeMillis() - now;
log.info("消息{}消费成功,耗时[{}ms]", message.getKey(), costTime);
} catch (Exception e) {
log.error("消息{}消费异常", message.getKey(), e);
// 是捕获异常还是抛出,由子类决定
if (throwException()) {
// 抛出异常,由 DefaultMessageListenerConcurrently 类处理
// 消息让 rocketmq 重试,消费正常则ACK;消费异常,在 rocketmq 中重试次数达到16次会进入死信队列
throw new RuntimeException(e);
}
//最后一次设置异常消息
if (message.getRetryTimes() > getMaxRetryTimes() - 1) {
if (e.getMessage().length() > ERROR_MSG_MAX_LENGTH) {
message.setError(e.getMessage().substring(0, ERROR_MSG_MAX_LENGTH));
} else {
message.setError(e.getMessage());
}
}
// 此时是捕获异常,说明不开启重试机制,本次消息默认ACK
handleRetry(message);
}
}
/**
* 重试
* @param message
*/
private void handleRetry(T message) {
// 获取子类RocketMQMessageListener注解拿到topic和tag
RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
if (annotation == null) {
throw new RuntimeException("重试消息发送失败");
}
//重新构建消息体
message.setMsgType(MessageTypeEnum.RETRY.getCode());
//消费次数+1
message.setRetryTimes(message.getRetryTimes() + 1);
SendResult sendResult;
try {
// 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)
sendResult = rocketMQEnhanceTemplate.send(
annotation.topic(), annotation.selectorExpression(), message, getDelayLevel());
} catch (Exception ex) {
// 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息
//由生产者直接发送
throw new RuntimeException(ex);
}
// 发送失败的处理就是不进行ACK,继续由RocketMQ重试
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
throw new RuntimeException("重试消息发送失败");
}
}
/**
* 超过重试次数消息,发送到死信队列
* <br/>
* 1.死信Topic和原消息发送Topic不能相同,避免死信消息通过循环配置流转回原Topic,
* 导致无法消费的消息影响正常的ConsumerGroup业务,引起业务雪崩。
* <br/>
* 2.避免使用生产流程中的ConsumerGroup去消费死信消息,影响正常消息的消费。
* @param message 待处理消息
*/
protected void handleMaxRetriesExceeded(T message) {
// 如果消息发送不成功,则再次重新发送,
// 如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)
RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
if (annotation == null) {
throw new RuntimeException("重试消息发送失败");
}
// 这里简单处理,直接保存主题.
// 但这是重新发送主题,会使订阅该主题的消费者都会再次消费该消息.
String deadLetterTopic = ENHANCE_DEAD_LETTER_QUEUE_PREFIX + annotation.topic();
String tag = annotation.selectorExpression();
//重新构建消息体
message.setOriginalTopic(annotation.topic());
message.setOriginalTag(tag);
message.setMsgType(MessageTypeEnum.DEAD_LETTER.getCode());
rocketMQEnhanceTemplate.send(deadLetterTopic, tag, message);
}
/**
* 最大重试次数
*
* @return 最大重试次数, 默认5次
*/
protected int getMaxRetryTimes() {
return MAX_RETRY_TIMES;
}
/**
* isRetry开启时,重新入队延迟时间
*
* @return -1:立即入队重试
*/
protected int getDelayLevel() {
return DELAY_LEVEL;
}
/**
* 消费异常时是否抛出异常
* true: 则由rocketmq机制自动重试(默认)
* false: 捕获异常,开启消息再次延迟发送
*/
protected boolean throwException() {
return true;
}
}
使用模版设计模式定义了消息消费的骨架,实现了日志打印,异常重试等公共逻辑,业务处理则交由子类实现。
5.4 基础配置类
@Configuration
public class RocketMqEnhanceAutoConfiguration {
@Bean
public RocketMqEnhanceTemplate rocketMqEnhanceTemplate() {
return new RocketMqEnhanceTemplate();
}
/**
* 解决RocketMQ Jackson不支持Java时间类型配置
*/
@Bean
@Primary
public RocketMQMessageConverter enhanceRocketMQMessageConverter() {
RocketMQMessageConverter converter = new RocketMQMessageConverter();
CompositeMessageConverter compositeMessageConverter =
(CompositeMessageConverter) converter.getMessageConverter();
List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();
for (MessageConverter messageConverter : messageConverterList) {
if (messageConverter instanceof MappingJackson2MessageConverter) {
MappingJackson2MessageConverter jackson2MessageConverter =
(MappingJackson2MessageConverter) messageConverter;
ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
objectMapper.registerModules(new JavaTimeModule());
}
}
return converter;
}
}
5.5 定义加载文件
在项目中的 resources
目录中创建 META-INF\spring.factories
文件:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.zhouzz.common.rocketmq.config.RocketMqEnhanceAutoConfiguration
之后就可以在其他项目中导入使用了。
5.6 如何使用
@Slf4j
@RocketMQMessageListener
public class DemoMessageListener extends EnhanceMessageHandler<DemoMessage> implements RocketMQListener<DemoMessage> {
/**
* 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
*/
@Override
public void onMessage(DemoMessage demoMessage) {
// 这里会由父类打印日志
super.dispatchMessage(demoMessage);
}
@Override
public void handleMessage(DemoMessage message) throws Exception {
// 此时这里才是最终的业务处理
}
@Override
protected boolean throwException() {
// 当返回 false,出现异常时,不抛出异常。
// 由自定义逻辑重试3次,3次后进入自定义的死信队列(这个与rocketMQ定义的死信队列不是同一个)
return false;
}
}
评论区