RabbitMQ延迟机制(TTL)— SpringBoot

1.延迟队列概念

  • 什么是延迟队列

    • 延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
  • RabbitMQ如何实现延迟队列?

    • AMQP协议和RabbitMQ队列本身没有直接支持延迟队列功能,但是可以通过TTL(Time To Live)特性模拟出延迟队列的功能。
  • 消息的TTL(Time To Live)

    • 消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间.
  • 实现延迟队列

    • 延迟任务通过消息的TTL来实现。我们需要建立2个队列,一个用于发送消息,一个用于消息过期后的转发目标队列。
    • 生产者输出消息到Queue1,并且这个消息是设置有有效时间的,比如60s。消息会在Queue1中等待60s,如果没有消费者收掉的话,它就是被转发到Queue2,Queue2有消费者,收到处理延迟任务。

2.创建延迟交换机

  • 创建路由交换机
  • 创建死信队列
  • 创建死信转发队列
  • 交换机队列绑定

3.SpringBoot实现延迟队列

  • 添加MQ依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
    </dependency>
  • 在application.yml配置RabbitMQ服务器连接属性

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    spring:
    application:
    name: mq-sender-demo
    rabbitmq:
    host: 47.96.11.185
    port: 5672
    username: ytao
    password: admin123
    virtual-host: wfx_host
    # 手动ACK 不开启自动ACK模式,目的是防止报错后未正确处理消息丢失 默认 为 none
    listener:
    simple:
    acknowledge-mode: manual
  • 生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    @Component
    @Slf4j
    public class RabbitProduct{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendDelayMessage(List<Integer> list) {
    //这里的消息可以是任意对象,无需额外配置,直接传即可
    log.info("===============延时队列生产消息====================");
    log.info("发送时间:{},发送内容:{}", LocalDateTime.now(), list.toString());
    this.rabbitTemplate.convertAndSend(
    "delay_exchange",
    "delay_key",
    list,
    message -> {
    //注意这里时间要是字符串形式
    message.getMessageProperties().setExpiration("60000");
    return message;
    }
    );
    log.info("{}ms后执行", 60000);
    }
    }
  • 消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    @Component
    @Slf4j
    public class RabbitConsumer {
    @Autowired
    private CcqCustomerCfgService ccqCustomerCfgService;

    /**
    * 默认情况下,如果没有配置手动ACK, 那么Spring Data AMQP 会在消息消费完毕后自动帮我们去ACK
    * 存在问题:如果报错了,消息不会丢失,但是会无限循环消费,一直报错,如果开启了错误日志很容易就吧磁盘空间耗完
    * 解决方案:手动ACK,或者try-catch 然后在 catch 里面将错误的消息转移到其它的系列中去
    * spring.rabbitmq.listener.simple.acknowledge-mode = manual
    * @param list 监听的内容
    */
    @RabbitListener(queues = "receive_queue")
    public void cfgUserReceiveDealy(List<Integer> list, Message message, Channel channel) throws IOException {
    log.info("===============接收队列接收消息====================");
    log.info("接收时间:{},接受内容:{}", LocalDateTime.now(), list.toString());
    //通知 MQ 消息已被接收,可以ACK(从队列中删除)了
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    try {
    dosomething.....
    } catch (Exception e) {
    log.error("============消费失败,尝试消息补发再次消费!==============");
    log.error(e.getMessage());
    /**
    * basicRecover方法是进行补发操作,
    * 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到,
    * 设置为false是只补发给当前的consumer
    */
    channel.basicRecover(false);
    }
    }
    }