-消息发送成功”);
}
public void handleNack(long l, boolean b) throws IOException {
System.out.println(“—-消息发送失败”);
}
});

channel.close();
connection.close();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
###### 2.1.4 return机制

- 发送消息之前开启return机制
- 发送消息时指定mandatory参数为true
- 由于return机制是异步处理,所以在发送消息之后不关闭channel
```java
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String message = "Hello World!";

channel.addReturnListener(new ReturnListener() {
public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties,
byte[] bytes) throws IOException {
System.out.println("消息未分发到队列中");
}
});
//channel.basicPublish("", "queue1", null, message.getBytes());
channel.basicPublish("ex2", "c", true,null, message.getBytes());

2.2 SpringBoot应用消息确认

2.2.1 配置application.yml(其他配置省略)
1
2
3
4
spring:
rabbitmq:
publisher-confirm-type: simple
publisher-returns: true

2.2.2 开启confirm和return监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.qfedu.mq_producer.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

@Component
public class PublisherConfireAndReturnConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

Logger logger = LoggerFactory.getLogger(PublisherConfireAndReturnConfig.class);

@Resource
private RabbitTemplate rabbitTemplate;

@PostConstruct
public void initMethod(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}

@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
if(ack){
logger.info("--------消息发送(到交换机)成功");
}else{
logger.warn("--------消息发送(到交换机)失败");
}
}

@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
logger.info("~~~~~~~~消息发送到交换机但未分发到队列!!!");
}
}

3.避免消息重复消费

重复消费消息,会对非幂等行操作造成问题
重复消费消息的原因是,消费者没有给RabbitMQ一个ack

为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,
id-0(正在执行业务)
id-1(执行业务成功)
如果ack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。
极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。

3.1普通Maven项目避免重复消费

  • 生产者,发送消息时,指定messageId

    1
    2
    3
    4
    5
    6
    AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
    .deliveryMode(1) //指定消息书否需要持久化 1 - 需要持久化 2 - 不需要持久化
    .messageId(UUID.randomUUID().toString())
    .build();
    String msg = "Hello-World!";
    channel.basicPublish("","HelloWorld",true,properties,msg.getBytes());
  • 消费者,在消费消息时,根据具体业务逻辑去操作redis

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    DefaultConsumer consume = new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    Jedis jedis = new Jedis("192.168.199.109",6379);
    String messageId = properties.getMessageId();
    //1. setnx到Redis中,默认指定value-0
    String result = jedis.set(messageId, "0", "NX", "EX", 10);
    if(result != null && result.equalsIgnoreCase("OK")) {
    System.out.println("接收到消息:" + new String(body, "UTF-8"));
    //2. 消费成功,set messageId 1
    jedis.set(messageId,"1");
    channel.basicAck(envelope.getDeliveryTag(),false);
    }else {
    //3. 如果1中的setnx失败,获取key对应的value,如果是0,return,如果是1
    String s = jedis.get(messageId);
    if("1".equalsIgnoreCase(s)){
    channel.basicAck(envelope.getDeliveryTag(),false);
    }
    }
    }
    };

3.2 SpringBoot应用避免重复消费

3.2.1 导入依赖
1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
3.2.2 编写配置文件
1
2
3
4
spring:
redis:
host: 47.96.11.185
port: 6379
3.2.3 修改生产者
1
2
3
4
5
6
@Test
void contextLoads() throws IOException {
CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!",messageId);
System.in.read();
}
3.2.4 修改消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Autowired
private StringRedisTemplate redisTemplate;


@RabbitListener(queues = "boot-queue")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
//0. 获取MessageId
String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
//1. 设置key到Redis
if(redisTemplate.opsForValue().setIfAbsent(messageId,"0",10, TimeUnit.SECONDS)) {
//2. 消费消息
System.out.println("接收到消息:" + msg);

//3. 设置key的value为1
redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
//4. 手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else {
//5. 获取Redis中的value即可 如果是1,手动ack
if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
}