RabbitMQ(8)消息可靠性.md
-消息发送成功”);
}
public void handleNack(long l, boolean b) throws IOException {
System.out.println(“—-消息发送失败”);
}
});
channel.close();
connection.close();
1 | ###### 2.1.4 return机制 |
2.2 SpringBoot应用消息确认
2.2.1 配置application.yml(其他配置省略)
1 | spring: |
2.2.2 开启confirm和return监听
1 | package com.qfedu.mq_producer.utils; |
3.避免消息重复消费
重复消费消息,会对非幂等行操作造成问题
重复消费消息的原因是,消费者没有给RabbitMQ一个ack
为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,
id-0(正在执行业务)
id-1(执行业务成功)
如果ack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。
极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。
3.1普通Maven项目避免重复消费
生产者,发送消息时,指定messageId
1
2
3
4
5
6AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(1) //指定消息书否需要持久化 1 - 需要持久化 2 - 不需要持久化
.messageId(UUID.randomUUID().toString())
.build();
String msg = "Hello-World!";
channel.basicPublish("","HelloWorld",true,properties,msg.getBytes());消费者,在消费消息时,根据具体业务逻辑去操作redis
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21DefaultConsumer consume = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
Jedis jedis = new Jedis("192.168.199.109",6379);
String messageId = properties.getMessageId();
//1. setnx到Redis中,默认指定value-0
String result = jedis.set(messageId, "0", "NX", "EX", 10);
if(result != null && result.equalsIgnoreCase("OK")) {
System.out.println("接收到消息:" + new String(body, "UTF-8"));
//2. 消费成功,set messageId 1
jedis.set(messageId,"1");
channel.basicAck(envelope.getDeliveryTag(),false);
}else {
//3. 如果1中的setnx失败,获取key对应的value,如果是0,return,如果是1
String s = jedis.get(messageId);
if("1".equalsIgnoreCase(s)){
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
}
};
3.2 SpringBoot应用避免重复消费
3.2.1 导入依赖
1 | <dependency> |
3.2.2 编写配置文件
1 | spring: |
3.2.3 修改生产者
1 |
|
3.2.4 修改消费者
1 |
|
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 LT的编程笔记!