什么是RabbitMQ-ACK机制?
作者:程序员马丁
在线博客:https://open8gu.com
大话面试,技术同学面试必备的八股文小册,以精彩回答应对深度问题,助力你在面试中拿个offer。
回答话术
RabbitMQ 的 ACK(Message Acknowledgement)是一种消息传递确认的机制,用于确保消息从队列中正确地被消费者接收和处理。当消费者从队列中接收到消息后,它需要向 RabbitMQ 发送一个 ACK 信号来确认消息已被成功处理。
如果消费者处理失败或在处理过程中出现异常,可以选择不发送 ACK 或发送一个 Negative ACK,RabbitMQ 会根据这些信号决定是否需要重新将消息放入队列中,以便其他消费者处理。这个机制确保了消息的可靠性和一致性,防止消息在处理过程中丢失。
下图为手动 ACK 图例:
问题详解
1. 消息消费出错重试
对于 SpringBoot 集成 RabbitMQ,可以使用 Spring Retry 或其他重试机制来配置。在消费者方法上,添加 @Retryable 注解,并在配置文件中指定重试策略,例如重试次数、延迟等。如果消息处理失败,Spring Retry 会自动触发重试。
消息消费出错重试伪代码如下:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
@Service
public class MyConsumer {
@RabbitListener(queues = "myQueue")
@Retryable(maxAttempts = 3) // 设置最大重试次数
public void handleMessage(String message) throws Exception {
// 处理消息的业务逻辑
if (message.equals("error")) {
throw new Exception("处理消息失败");
}
// 处理成功,不抛出异常即表示确认消息
}
}
当然,更常见的是使用 RabbitMQ 内置的消息重试机制来实现消息消费失败,重新发送到队列。
RabbitMQ 确实有内置的重试机制,通常称为"消息重试"或"消息退回"。这个机制基于以下几个重要属性和设置:
1.1. 消息退回 Message Rejection
当消费者无法处理一条消息时,可以选择将该消息退回(reject)。这告诉 RabbitMQ 将消息返回到原始队列。消息退回时,可以选择是否重新排队以进行重试。
1.2. 重试次数 Retry Count
RabbitMQ 允许我们配置队列的最大重试次数。如果一条消息被拒绝并重新排队的次数超过了这个配置的最大重试次数,消息将会被丢弃或移到死信队列。
1.3. 死信队列 Dead Letter Queue
死信队列是用于存储无法被消费者处理的消息的队列。当消息被拒绝并达到最大重试次数时,消息可以被移动到死信队列。我们可以为每个队列配置关联的死信队列。
要想实现消息重试,可以使用以下代码实现:
// 配置队列和死信队列
@Bean
public Queue myQueue() {
return QueueBuilder.durable("myQueue")
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", "dead-letter-queue")
.build();
}
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("dead-letter-queue").build();
}
// 配置消息监听器
@RabbitListener(queues = "myQueue")
public void handleMessage(Message message, Channel channel) throws IOException {
try {
// 处理消息的业务逻辑
// 如果处理失败,可以选择将消息拒绝并重新排队
throw new RuntimeException("处理消息失败");
} catch (Exception ex) {
// 处理失败时,将消息退回到原始队列以进行重试
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
在上述示例中,配置了一个队列 myQueue
,并配置了它的死信队列 dead-letter-queue
。当消息处理失败时,通过 channel.basicNack()
方法将消息退回原始队列以进行重试。如果消息达到最大重试次数,它将被移动到死信队列。
当然,你也可以将消息退回到原队列,这取决于具体的需求。
2. 开启了手动 ACK ,但是没有确认,会怎么样?
如果忘记确认或者确认失败,可能会导致系统出现一些未知的问题,问题的类型具体取决于应用程序的代码逻辑。比如列举以下可能会发生的一些情况:
- 消息重复处理:如果应用程序没有确认消息,RabbitMQ 将认为消息未被消费,然后会将消息重新发送到队列中,等待重新处理。这可能导致消息被多次处理,从而引发数据重复或不一致的问题。
- 消息堆积:如果应用程序持续没有手动确认消息,未确认的消息会不断地堆积在队列中,导致队列中积压大量未处理的消息,可能导致系统资源不足,影响性能。
- 消息丢失:如果应用程序没有手动确认消息,而且消息已被接收但未被处理,那么在重新发送之前,消息可能已被丢失或被其他应用程序处理。
为了避免这些问题,需要考虑以下规避办法:
- 编写可靠的消息处理逻辑,确保消息处理幂等性,这样即使消息被多次处理也不会导致问题。
- 实现异常处理机制,当消息处理失败时,能够记录错误并在适当的时候手动确认消息,以防止消息无限重试。
- 使用消息的 TTL(Time To Live)来设置消息的有效期,避免长时间未被确认的消息一直堆积在队列中。
- 使用消息的死信队列(DLQ)来处理无法被正确处理的消息,以便进一步分析和处理问题。
- 采用监控和警报系统来实时监视消息队列的状态和应用程序的行为,以及在出现问题时及时采取措施。
使用 RabbitMQ 时,手动确认消息虽然是一种灵活的方式,但也需要谨慎处理,以确保消息处理的可靠性和一致性。
3. SpringBoot 集成 RabbitMQ 的消息确认
RabbitMQ 提供了消息确认机制,确保消息在生产者和消费者之间的可靠传递。消息确认分为两部分:消息发送确认和消息接收确认。
3.1. 准备环境
在 SpringBoot 项目中引入 spring-boot-starter-amqp
依赖包。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
修改 application.properties 配置,开启消息确认机制。
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 发送者开启 Confirm 确认机制
spring.rabbitmq.publisher-confirms=true
# 发送者开启 Return 确认机制
spring.rabbitmq.publisher-returns=true
# 设置消费端手动 ACK
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重试
spring.rabbitmq.listener.simple.retry.enabled=true
定义 Exchange 和 Queue,将队列绑定在交换机上。
@Configuration
public class QueueConfig {
@Bean(name = "confirmTestQueue")
public Queue confirmTestQueue() {
return new Queue("confirm_test_queue", true, false, false);
}
@Bean(name = "confirmTestExchange")
public FanoutExchange confirmTestExchange() {
return new FanoutExchange("confirmTestExchange");
}
@Bean
public Binding confirmTestFanoutExchangeAndQueue(
@Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
@Qualifier("confirmTestQueue") Queue confirmTestQueue) {
return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
}
}
3.2. 消息发送确认
3.2.1. ConfirmCallback 确认模式
使用 ConfirmCallback 确认模式来确认消息是否成功发送到 RabbitMQ Broker。
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("消息发送异常!");
} else {
log.info("消息已确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
}
}
}
3.2.2. ReturnCallback 退回模式
使用 ReturnCallback 退回模式来处理消息投递到队列失败的情况。
@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息投递失败,replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
}
}
3.2.3. 消息发送时动态设置
在消息发送时设置 Confirm 和 Return 回调,并确保消息持久化。
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ConfirmCallbackService confirmCallbackService;
@Autowired
private ReturnCallbackService returnCallbackService;
public void sendMessage(String exchange, String routingKey, Object msg) {
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(confirmCallbackService);
rabbitTemplate.setReturnCallback(returnCallbackService);
rabbitTemplate.convertAndSend(exchange, routingKey, msg,
message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
},
new CorrelationData(UUID.randomUUID().toString()));
}
消息生产者确认流程基本流程如下图:
3.3. 消息接收确认
使用 @RabbitHandler
注解标注的方法来处理接收到的消息,并进行手动 ACK 确认。
@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {
@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {
try {
log.info("接收到消息:{}", msg);
// 处理业务逻辑
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.error("消息已重复处理失败,拒绝再次接收...");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
} else {
log.error("消息即将再次返回队列处理...");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}
3.4. 功能测试总结
进行消息发送测试,确保消息确认机制生效,同时观察 RabbitMQ 的消息流转是否正确,以及模拟异常情况查看重发幂等策略是否健全。
其次,不要忘记消息确认,确保在消费消息后手动 ACK,否则消息会一直存在,导致重复消费。
小心消息无限投递的情况,处理业务异常时要谨慎,建议设置消息重试次数和异常处理策略,以防止消息无限循环投递。
确保消息消费具有幂等性,可以通过将消息持久化并使用唯一性属性进行校验来保证。