RabbitMQ(9)消息延迟实现.md
RabbitMQ延迟机制(TTL)— SpringBoot
1.延迟队列概念
什么是延迟队列
- 延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
RabbitMQ如何实现延迟队列?
- AMQP协议和RabbitMQ队列本身没有直接支持延迟队列功能,但是可以通过TTL(Time To Live)特性模拟出延迟队列的功能。
消息的TTL(Time To Live)
- 消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间.
实现延迟队列
- 延迟任务通过消息的TTL来实现。我们需要建立2个队列,一个用于发送消息,一个用于消息过期后的转发目标队列。
- 生产者输出消息到Queue1,并且这个消息是设置有有效时间的,比如60s。消息会在Queue1中等待60s,如果没有消费者收掉的话,它就是被转发到Queue2,Queue2有消费者,收到处理延迟任务。
2.创建延迟交换机
- 创建路由交换机
- 创建死信队列
- 创建死信转发队列
- 交换机队列绑定
3.SpringBoot实现延迟队列
添加MQ依赖
1
2
3
4
5
6
7
8
9<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>在application.yml配置RabbitMQ服务器连接属性
1
2
3
4
5
6
7
8
9
10
11
12
13spring:
application:
name: mq-sender-demo
rabbitmq:
host: 47.96.11.185
port: 5672
username: ytao
password: admin123
virtual-host: wfx_host
# 手动ACK 不开启自动ACK模式,目的是防止报错后未正确处理消息丢失 默认 为 none
listener:
simple:
acknowledge-mode: manual生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class RabbitProduct{
private RabbitTemplate rabbitTemplate;
public void sendDelayMessage(List<Integer> list) {
//这里的消息可以是任意对象,无需额外配置,直接传即可
log.info("===============延时队列生产消息====================");
log.info("发送时间:{},发送内容:{}", LocalDateTime.now(), list.toString());
this.rabbitTemplate.convertAndSend(
"delay_exchange",
"delay_key",
list,
message -> {
//注意这里时间要是字符串形式
message.getMessageProperties().setExpiration("60000");
return message;
}
);
log.info("{}ms后执行", 60000);
}
}消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class RabbitConsumer {
private CcqCustomerCfgService ccqCustomerCfgService;
/**
* 默认情况下,如果没有配置手动ACK, 那么Spring Data AMQP 会在消息消费完毕后自动帮我们去ACK
* 存在问题:如果报错了,消息不会丢失,但是会无限循环消费,一直报错,如果开启了错误日志很容易就吧磁盘空间耗完
* 解决方案:手动ACK,或者try-catch 然后在 catch 里面将错误的消息转移到其它的系列中去
* spring.rabbitmq.listener.simple.acknowledge-mode = manual
* @param list 监听的内容
*/
public void cfgUserReceiveDealy(List<Integer> list, Message message, Channel channel) throws IOException {
log.info("===============接收队列接收消息====================");
log.info("接收时间:{},接受内容:{}", LocalDateTime.now(), list.toString());
//通知 MQ 消息已被接收,可以ACK(从队列中删除)了
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
try {
dosomething.....
} catch (Exception e) {
log.error("============消费失败,尝试消息补发再次消费!==============");
log.error(e.getMessage());
/**
* basicRecover方法是进行补发操作,
* 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到,
* 设置为false是只补发给当前的consumer
*/
channel.basicRecover(false);
}
}
}
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 LT的编程笔记!