延遲隊列存儲的對象是對應(yīng)的延遲消息,所謂的延遲消息是指當(dāng)消息被發(fā)送以后,并不想讓消費者立刻拿到消息,而是等待特定時間后,消費者才能拿到這個消息進(jìn)行消費
利用RabbitMq
的TTL
和死信隊列 來實現(xiàn)延時消費。
如果設(shè)置的是隊列統(tǒng)一過期時間放到死信隊列,沒有什么問題。
(資料圖片)
如果是延時時間設(shè)置到每條消息上的。而不是給隊列的。
實現(xiàn)方式為消息存活時間為動態(tài)用戶頁面可配置的。
這就導(dǎo)致了一個問題:
先用一條消息的存活時間是1天。后面又進(jìn)了一條消息存活時間是1小時。
結(jié)果一小時到了,發(fā)現(xiàn)這條消息并沒有被轉(zhuǎn)發(fā)到消費延時過期消息的隊列。
原因是盡管ttl是設(shè)給每條消息的。但是本質(zhì)上,所有延時消息都還在一個隊列里,對它過期時間的檢測也是從頭部開始的。
它不會檢測每一條消息是否過期。而是順序檢測。
如果first in
的消息過期時間很長,會導(dǎo)致它阻塞后進(jìn)的消息。
不僅無法實現(xiàn)真正的過期時間。還會導(dǎo)致,一個大的過期時間的先進(jìn)的消息,會堆積一堆后進(jìn)的過期時間短的消息。
問題解決
這個時候可以使用rabbitMq的一個插件:rabbitmq_delayed_message_exchange
一段時間以來,人們一直在尋找用RabbitMQ實現(xiàn)延遲消息的傳遞方法,到目前為止,公認(rèn)的解決方案是混合使用TTL和DLX。而rabbitmq_delayed_message_exchange插件就是基于此來實現(xiàn)的,RabbitMQ延遲消息插件新增了一種新的交換器類型,消息通過這種交換器路由就可以實現(xiàn)延遲發(fā)送
插件安裝
需要根據(jù)自己的rabbitMq選擇對應(yīng)的版本。我rabbitMq的版本是RabbitMQ 3.11.0
,對應(yīng)的插件版本就是:3.11.1
基于Linux
--1、cd到rabbitmq默認(rèn)安裝位置cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/plugins--2、通過ftp工具將插件上傳到此目錄下--3、開啟插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange--4、重啟MQ服務(wù)systemctl restart rabbitmq-server
基于Docker
--1、通過ftp工具將插件上傳到Linux服務(wù)器的根目錄下--2、拷貝到docker中rabbitmq插件目錄下,rabbitmq_delayed_message_exchange-3.9.0.ez(下載包的全名)docker cp /rabbitmq_delayed_message_exchange-3.9.0.ez 容器ID:/plugins--3、進(jìn)入容器docker exec -it 容器id /bin/bash--4、查看插件是否存在(確保2中的操作已經(jīng)將插件拷貝過來了)cd pluginsls |grep delay--5、開啟插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange--6、退出容器exit--7、重啟MQ服務(wù)docker restart 容器ID
安裝成功
web界面新建交換機選擇類型出現(xiàn)紅框標(biāo)注即表示成功
代碼實現(xiàn)
1:springBoot配置
@Configurationpublic class DelayRabbitmqConfig { /** * 聲明延遲隊列 * @return */ @Bean public Queue delayQueue(){ return new Queue(QueueConstant.DelayQueue, true,false,false); } /** * 聲明延遲自定義交換機類型 * @return */ @Bean public CustomExchange delayCustomExchange(){ HashMap args = new HashMap<>();// 設(shè)置 x-delayed-type 為 direct,當(dāng)然也可以是 topic 等 發(fā)送消息時設(shè)置消息頭 headers 的 x-delay 屬性,即延遲時間,如果不設(shè)置消息將會立即投遞 args.put("x-delayed-type","direct"); return new CustomExchange(ExchangeConstant.DelayCustomerExchange, "x-delayed-message",true,false,args); } /** * 綁定延遲交換機和隊列 * @return */ @Bean public Binding delayQueueAndCustomExchange(){ return BindingBuilder.bind(delayQueue()) .to(delayCustomExchange()).with(RoutingKeyConstant.DelayCustomerRoutingKey).noargs(); }}
springMvc配置
引入依賴: xmlns:util="http://www.springframework.org/schema/util" http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsd
代碼實現(xiàn)
//消息發(fā)送final MessagePostProcessor messagePostProcessor = new MyMessagePostProcessor(Integer.valueOf(ttl.toString()));DisTimingPushDto disTimingPushDto = new DisTimingPushDto();disTimingPushDto.setOrderId(dispense.getOrderId());disTimingPushDto.setPushTime(disDispense.getPushTime());rabbitTemplate.convertAndSend(MsgQueueEnum.TIMING_PUSH.getExchangeName(), MsgQueueEnum.TIMING_PUSH.getQueueName(), disTimingPushDto, messagePostProcessor);//每條消息時間配置import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;/** * 延遲消息處理器 Processor * @author king * @date 2022年12月28日 11:14 */public class MyMessagePostProcessor implements MessagePostProcessor { /** * 消息延遲時間,單位:毫秒 */ private final Integer TTL; public MyMessagePostProcessor(final Integer ttl) { this.TTL = ttl; } @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(TTL); return message; }}
關(guān)鍵詞: RabbitMQ