Skip to main content

RabbitMQ如何实现延迟队列?

作者:程序员马丁

在线博客:https://open8gu.com

note

大话面试,技术同学面试必备的八股文小册,以精彩回答应对深度问题,助力你在面试中拿个offer。

面试话术

RabbitMQ 普通队列的消息希望早点被处理,生产后如果有空闲的消费端会“马上”消费,而延时队列则希望消息被在指定的时间进行处理。简单说,延时队列就是用来存放需要在指定时间被处理的消息的队列。

延迟队列的应用场景非常多,比如工作中常用的:

  • 用户下订单十分钟未支付,自动取消该订单。
  • 用户系统注册三天后未登录,触发短信提醒用户。
  • 用户系统内发起退款三天未处理,触发运营人员通知行为。
  • 预定会议十分钟前通知所有参会人员。

1. TTL (Time To Live)

TTL 是 RabbitMQ 中一个高级特性,可以使一条消息或一个队列具备消息最大存活时间限制(单位为毫秒),如果一个队列开启了 TTL 或者一条消息开启了 TTL 属性,那么这条消息在设置的 TTL 的时间内没有被消费,则会成为“死信”。

RabbitMQ 会将“死信”转发到死信交换机,通过消息的路由键则可以实现路由到不同的死信队列。则消息者只需要绑定并监听指定的死信队列即可实现延迟消费。

交换机(Exchange):交换机是一个消息传递中的路由器,它根据特定的规则(如主题或队列名称)接收消息,并将它们分发到一个或多个队列中。

路由键(routeingKey):用来决定消息应该被发送到哪个特定的队列或消费者,类似于邮件系统中的地址标签,确保消息通过交换机时正确投递到指定的队列。

1-1 (1).jpg

这里需要注意的是,有两种方式设置 TTL:一种是设置整个队列的统一过期时间,另一种是为每条消息单独设置 TTL。

当选择为每条消息单独设定 TTL 时,会出现了一个问题。比如,先发送了一条消息,它的 TTL 是 1 天,然后又发送了一条 TTL 只有 1 小时的消息。按理说,1 小时后,第二条消息应该已经过期,转移到死信队列中去,但实际上不会。

原因在于,虽然我们可以为每条消息单独设置 TTL,但所有这些消息还是在同一个队列里(队列是有序的)。队列在检测消息是否过期时,是按顺序从队头开始检查的。不会检查每条消息是否过期。这意味着,如果队列前面的消息设置了很长的 TTL,它就会阻塞后面的消息被检查。结果就是,一个 TTL 较长的消息不仅没能按预期过期,还会导致许多 TTL 较短的后续消息堆积在它后面,无法及时处理。

队列:一种先进先出(FIFO, First-In-First-Out)的线性数据结构,它允许在一端(队尾)添加元素,在另一端(队头)移除元素。

2. RabbitMQ 插件实现延迟队列

利用 RabbitMQ 提供的官方插件:rabbitmq-delayed-message-exchange,同样可以实现延迟消息,并解决上述问题。插件底层通过实现扩展点来增强 RabbitMQ 来实现延迟队列,是一种更加常用和灵活的方式。

插件下载地址:RabbitMQ 插件 sourl.cn/aze3iV

rabbitmq_delayed_message_exchange+%281%29.jpg

插件底层实现原理是通过为消息添加时间戳,使用优先队列按照预定发送时间排序消息,定期检查优先队列中是否到达预定的发送时间,并定期检查优先队列中的消息,查看是否有消息已经到了预定的发送时间,一旦到达即移出队列并发送到目标交换机进行路由。

插件下载地址:Community Plugins sourl.cn/u8L7jG

问题详解

虽然 TTL 存在繁琐、堆积等不足,但并不是一无是处。

有些公司的 RabbitMQ 因为版本、安全以及规模,或者为了追求稳定性,可能并不具备在 Server 端安装插件的条件。再或者已经形成的开发规范有固定的一套使用 TTL 来处理延迟消息的模板,需要分情况去判断。不管是使用哪种手段来实现消息延迟到达的目的,都需要清楚这两种手段背后的底层原理。

如果没有上述情况限制,插件依然是目前最主流的解决方案。下面给大家描述下如何使用插件来实现延迟消息:

服务端安装步骤

下载 rabbitmq-delayed-message-exchange 插件,将插件按照文档放入服务端的 plugins 目录,执行以下命令加载插件即可。

rabbitmq-plugins directories -s
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

代码中使用

为了简化,只提供了部分代码。

// 声明延迟交换机
@Bean
public CustomExchange delayedMessageExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(exchangeDelayName, "x-delayed-message",true, false,args);
}

// 声明延迟队列
@Bean
public Queue delayedMessageQueue() {
return new Queue(queueDelayName);
}

// 绑定交换机和队列
@Bean
public Binding delayedBinding(CustomExchange delayedMessageExchange, Queue delayedMessageQueue) {
return BindingBuilder
.bind(delayedMessageQueue)
.to(delayedMessageExchange)
.with("routeingKey")
.noargs();
}

// 使用
@Autowired
private AmqpTemplate amqpTemplate;

public void createOrder(Order order) {
// 设置订单未支付自动取消时间为 10 分钟
long delay = 10 * 60 * 1000;

// 发送延迟消息
amqpTemplate.convertAndSend(delayedMessageExchange.getName(), "routeingKey", order,
message -> {
message.getMessageProperties().setDelay(delay);
return message;
});

}

应该声明几个延迟交换机?

如果项目中需要使用多种类型的延迟消息,那么可以声明多个延迟交换机,否则可以仅声明一个延迟交换机。这样可以简化配置,并提高性能。

结合延迟交换机的插件内部实现原理不难看出,每一个延迟交换机都会对 RabbitMQ 的服务端产生开销,所以需要权衡利弊,在性能和可维护性上做考量。

插件的限制和安全性

rabbitmq-delayed-message-exchange 插件内部将消息存储在一个本地表中,只在当前节点上有一个磁盘副本,因此可以在节点重启后仍然存在。另外,当前设计不适合处理大量的延迟消息,例如成千上万或数百万条。

如果已经使用该插件,禁用此插件时需要注意所有未交付的延迟消息将丢失。