RabbitMq TTL+死信队列 延迟消息问题记录
原因是尽管ttl是设给每条消息的。但是本质上,所有延时消息都还在一个队列里,对它过期时间的检测也是从头部开始的。
延迟队列存储的对象是对应的延迟消息,所谓的延迟消息是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费
利用RabbitMq
的TTL
和死信队列 来实现延时消费。
如果设置的是队列统一过期时间放到死信队列,没有什么问题。
(资料图片仅供参考)
如果是延时时间设置到每条消息上的。而不是给队列的。
实现方式为消息存活时间为动态用户页面可配置的。
先用一条消息的存活时间是1天。后面又进了一条消息存活时间是1小时。
结果一小时到了,发现这条消息并没有被转发到消费延时过期消息的队列。
原因是尽管ttl是设给每条消息的。但是本质上,所有延时消息都还在一个队列里,对它过期时间的检测也是从头部开始的。
它不会检测每一条消息是否过期。而是顺序检测。
如果first in
的消息过期时间很长,会导致它阻塞后进的消息。
不仅无法实现真正的过期时间。还会导致,一个大的过期时间的先进的消息,会堆积一堆后进的过期时间短的消息。
这个时候可以使用rabbitMq的一个插件:rabbitmq_delayed_message_exchange
一段时间以来,人们一直在寻找用RabbitMQ实现延迟消息的传递方法,到目前为止,公认的解决方案是混合使用TTL和DLX。而rabbitmq_delayed_message_exchange插件就是基于此来实现的,RabbitMQ延迟消息插件新增了一种新的交换器类型,消息通过这种交换器路由就可以实现延迟发送
需要根据自己的rabbitMq选择对应的版本。我rabbitMq的版本是RabbitMQ 3.11.0
,对应的插件版本就是:3.11.1
--1、cd到rabbitmq默认安装位置cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/plugins--2、通过ftp工具将插件上传到此目录下--3、开启插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange--4、重启MQ服务systemctl restart rabbitmq-server
--1、通过ftp工具将插件上传到Linux服务器的根目录下--2、拷贝到docker中rabbitmq插件目录下,rabbitmq_delayed_message_exchange-3.9.0.ez(下载包的全名)docker cp /rabbitmq_delayed_message_exchange-3.9.0.ez 容器ID:/plugins--3、进入容器docker exec -it 容器id /bin/bash--4、查看插件是否存在(确保2中的操作已经将插件拷贝过来了)cd pluginsls |grep delay--5、开启插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange--6、退出容器exit--7、重启MQ服务docker restart 容器ID
web界面新建交换机选择类型出现红框标注即表示成功
@Configurationpublic class DelayRabbitmqConfig { /** * 声明延迟队列 * @return */ @Bean public Queue delayQueue(){ return new Queue(QueueConstant.DelayQueue, true,false,false); } /** * 声明延迟自定义交换机类型 * @return */ @Bean public CustomExchange delayCustomExchange(){ HashMap args = new HashMap<>();// 设置 x-delayed-type 为 direct,当然也可以是 topic 等 发送消息时设置消息头 headers 的 x-delay 属性,即延迟时间,如果不设置消息将会立即投递 args.put("x-delayed-type","direct"); return new CustomExchange(ExchangeConstant.DelayCustomerExchange, "x-delayed-message",true,false,args); } /** * 绑定延迟交换机和队列 * @return */ @Bean public Binding delayQueueAndCustomExchange(){ return BindingBuilder.bind(delayQueue()) .to(delayCustomExchange()).with(RoutingKeyConstant.DelayCustomerRoutingKey).noargs(); }}
引入依赖: xmlns:util="http://www.springframework.org/schema/util" http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsd
//消息发送final MessagePostProcessor messagePostProcessor = new MyMessagePostProcessor(Integer.valueOf(ttl.toString()));DisTimingPushDto disTimingPushDto = new DisTimingPushDto();disTimingPushDto.setOrderId(dispense.getOrderId());disTimingPushDto.setPushTime(disDispense.getPushTime());rabbitTemplate.convertAndSend(MsgQueueEnum.TIMING_PUSH.getExchangeName(), MsgQueueEnum.TIMING_PUSH.getQueueName(), disTimingPushDto, messagePostProcessor);//每条消息时间配置import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;/** * 延迟消息处理器 Processor * @author king * @date 2022年12月28日 11:14 */public class MyMessagePostProcessor implements MessagePostProcessor { /** * 消息延迟时间,单位:毫秒 */ private final Integer TTL; public MyMessagePostProcessor(final Integer ttl) { this.TTL = ttl; } @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(TTL); return message; }}
关键词: RabbitMQ
原因是尽管ttl是设给每条消息的。但是本质上,所有延时消息都还在一个队列里,对它过期时间的检测也是从头部开始的。
1、循环泵指装置中输送反应、吸收、分离、吸收液再生的循环液用泵。文章到此就分享结束,希望对大家有所帮助。
02月26日18时海南屯昌疫情数据阳了以后为什么会腰疼?应该怎么办?以下为详情!一、02月26日18时屯昌疫情数据概览
1、东润枫景一期位于北京市朝阳朝阳公园,由北京天鸿永业房地产开发有限责任公司建成,共计房屋156户。2、小区物业公司
赵继伟:我们打得还是有些紧乔帅要求我们用团队的力量去竞争,乔帅,赵继伟,亚预赛,中国男篮,新疆男篮,中国体育联赛,中国篮球联赛,奥林匹克运动会
是故愿生彼,阿弥陀佛国。此二句,结成上观察十七种庄严国土成就,所以愿生。释器世间清净,讫之于上。——昙鸾大师《往生论注》是故愿生彼,
1、宁波香格里拉大酒店是位于著名的海港城市——宁波市中心的五星级豪华大酒店。2、酒店波浪形的建筑与城市的中文名称“海定则
青壮年一般高烧阶段不超过3天,到第4、第5天会明显好转。如高烧持续3天以上,且没有缓解趋势,建议前往医院就诊。一、02月26日10时吉林白山通
1、一、傅雷的爱子情深“亲爱的孩子,你走后第二天,就想写信,怕你嫌烦,也就罢了。2、可是没一天不想着你,每天清早六七
1、电动剃须刀:利用电力带动刀片,剃剪胡须和鬓发的整容电器。2、1930年在美国问世。3、电动剃须刀按刀片动作方式分为旋
1、直接把魔兽版本换成1 24版的,比如DOTA6 66的就只能在1 24版的读出来,而1 20版的就提示该地图过大。2
1、我肚子疼,大便拉不出来。2、可能是肠梗阻。3、肠梗阻的典型临床表现是腹痛、腹胀和停止排便排气。4、分为机械性肠梗阻、
1、早在新石器时代,野菜就是人类采集的对象之一。2、中国已发掘的一些新石器时代遗址,如浙江余姚河姆渡遗址中出土有大量
1、缩小PDF文件的方法很简单。这个回答介绍了三种减少的方法。在百度浏览器中进入在线压缩的PDF,选择PDF文件,点击打
1、《天堂的欲望》原名为《天堂的匿名使者》分为上、下部,上部为(天堂的欲望)下部为(天堂的幸福)。2、由湖北映月影视文化
山东观唐温泉(国际)度假村,位于临沂汤泉省级旅游度假区,地理位置优势明显,公铁交通便利发达。这座度假村毗邻沂河而建,温泉
“这是给我们最狠的一记耳光,此刻的事情说明他根本不关心我们。”美国有毒化学列车翻车地东巴勒斯坦市的市长特伦特·康纳威在接受媒体采访时
1、桩基础是通过承台把若干根桩的顶部联结成整体,共同承受动静荷载的一种深基础,而桩是设置于土中的竖直或倾斜的基础构件。2
为深入贯彻落实省、市、区关于烟花爆竹安全管理相关要求和规定,扎实做好2023年全区烟花爆竹经营安全管理工作,切实提高烟花爆竹从业人员安全
1、我是PEP版的,希望能对你有所帮助一、名词复数的规则变化名词复数的规则变化构成法例词A 在一般情况
1、乐山锦江新嘉州文化旅游有限责任公司于2019年03月19日成立。2、法定代表人黄汀江,公司经营范围包括:酒店管理
耕地净增有望实现历史性突破——湖南自然资源服务打好“发展六仗”系列报道之三
夏弥来为大家解答以下的问题,袋式除尘器,说一说袋式除尘器的简介,现在让我们一起来看看吧!1、袋式除尘器是一种干式滤尘装置
1、二圣宫位于陕西省高陵县张卜乡龙胡村。2、年代是清代。3、陕西省第五批文物保护单位。文章到此就分享结束,希望对大家
1、MIPS(MillionInstructionsPerSecond):单字长定点指令平均执行速度Millio
Copyright 2015-2022 太平洋酒业网 版权所有 备案号:豫ICP备2022016495号-17 联系邮箱:93 96 74 66 9@qq.com