环境: springboot2.2.11.RELEASE + RabbitMQ3.7.4
RabbitMQ在以下情况会出现消息的丢失:
- 交换机、队列、消息未持久化,mq重启后会出现消息丢失。
- 生产者发出的消息第一步是投递到交换机,这一步可能因为网络原因导致失败。
- 消息正常投递到交换机后,通过路由key路由到队列的时候出现失败。(没有符合的队列)
- 代码层面,配置层面,考虑不全导致消息丢失。
- 消费端接收到相关消息之后,消费端还没来得及处理消息,消费端机器就宕机了,此时消息如果处理不当会有丢失风险。
RabbitMQ中有两种方案来实现消息的可靠发送,分别如下:
RabbitMQ的事物机制
RabbitMQ的生产者确认机制
准备环境:分别建立交换机和队列
1、交换机test-exchange 持久化,类型为topic
2、队列test-queue test-queue 持久化。
3、交换机与队列绑定, Routing key = tm.#
方案1:事物机制
pom.xml依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
application.yml配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
listener:
simple:
concurrency: 5
maxConcurrency: 10
prefetch: 5
retry:
enabled: true
initialInterval: 3000
maxAttempts: 3
defaultRequeueRejected: fals
这里配置很简单主要就是消息监听并发数配置,重试相关的配置。
RabbitConfig.java 配置RabbitMQ
@Configuration
public class RabbitConfig {
private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class) ;
@Bean
public RabbitTemplate customRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory) ;
rabbitTemplate.setChannelTransacted(true) ;
return rabbitTemplate ;
}
@Bean("rabbitTransactionManager")
public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
}
这里必须配置rabbit事物Bean,因为我们的发送消息的方法添加注解事物时需要设置事务管理器。
rabbitTemplate.setChannelTransacted(true) ;这行代码必须设置为true开启事物功能。
MessageSend.java 消息发送
@Component
public class MessageSend {
private static Logger logger = LoggerFactory.getLogger(MessageSend.class) ;
@Resource
private RabbitTemplate rabbitTemplate ;
@Transactional(rollbackFor = {Exception.class}, transactionManager = "rabbitTransactionManager")
public void send(String msg) {
logger.info("准备发送消息:{}", msg);
rabbitTemplate.convertAndSend("test-exchange", "tm.1", msg) ;
if ("1".equals(msg)) {
throw new RuntimeException("消息内容不真实") ;
}
}
}
注意这里要给发送消息的方法添加@Transactional,同时还要设置事物管理器的Bean Name。
MessageController.java 接口发送。
@RestController
@RequestMapping("/messages")
public class MessageController {
@Resource
private MessageSend ms ;
@GetMapping("/send")
public Object send(String msg) {
ms.send(msg) ;
return "success" ;
}
}
测试:
先发送一条正常的消息
http://localhost:8080/messages/send?msg=123
控制台输出:
rabbitmq接受到了我们的消息。
接着发送消息内容为1的消息,当消息内容为1时,发送程序会抛出异常,看看消息是否能够回滚。
http://localhost:8080/messages/send?msg=1
rabbit并没有收到消息,说明消息回滚了。
接下来把@Transactional注解去掉后看看
rabbit收到了消息,这也说明前面配置的事物是生效的。
到这里基于事物机制的消息可靠传输就完了,接下来看看基于消息确认机制。
方案2:消息确认机制
application.yml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: test
publisherConfirmType: correlated
publisherReturns: true
listener:
simple:
concurrency: 5
maxConcurrency: 10
prefetch: 5
acknowledgeMode: MANUAL
retry:
enabled: true
initialInterval: 3000
maxAttempts: 3
defaultRequeueRejected: false
publisherConfirmType取值说明:
- NONE值是禁用发布确认模式,是默认值
- CORRELATED值是发布消息成功到交换器后会触发回调方法,如1示例
- SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
注意:在springboot2.2.0.RELEASE版本之前amqp正式支持的属性是(spring.rabbitmq.publisher-confirm),用来配置消息发送到交换器之后是否触发回调方法,在2.2.0及之后该属性过期使用spring.rabbitmq.publisher-confirm-type属性配置代替,用来配置更多的确认类型;
publisherReturns: 可以在消息没有被路由到指定的queue时将消息返回,而不是丢弃。简单说就是消息不能正确送达到队列将回调。
RabbitConfig.java 配置
@Configuration
public class RabbitConfig {
private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class) ;
@Bean
public RabbitTemplate customRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory) ;
// rabbitTemplate.setChannelTransacted(true) ;
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
logger.info("消息发送成功, 关联数据的ID:{}", correlationData.getId()) ;
} else {
logger.error("消息发送失败, 失败原因:{}", cause) ;
}
}
}) ;
rabbitTemplate.setMandatory(true) ;
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText,
String exchange, String routingKey) {
logger.error("发送出现错误,请检查exchange:{}, routingKey: {}是否配置正确, 消息内容:{}",
exchange,
routingKey,
new String(message.getBody(), Charset.forName("UTF-8"))) ;
}
}) ;
return rabbitTemplate ;
}
@Bean("rabbitTransactionManager")
public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
}
rabbitTemplate.setChannelTransacted(true) ; 这个代码一定要注释了或者不要设置或者设置为false;rabbitmq中的事物机制和消息确认机制是互斥的。。。
setConfirmCallback方法:只要消息到达了交换机ack会true(这时候它是不管该交换机是否与队列有绑定),当消息无法到达交换机(如:交换机不存在)ack为false。
setReturnCallback方法:用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调(也就是说到达了交换机,但是没有队列与该交换机绑定)
MessageSend.java 发送程序
@Component
public class MessageSend {
private static Logger logger = LoggerFactory.getLogger(MessageSend.class) ;
@Resource
private RabbitTemplate rabbitTemplate ;
public void sendConfirm(String msg) {
logger.info("准备发送消息:{}", msg);
rabbitTemplate.convertAndSend("test-exchange1", "tm.1", msg) ;
}
}
注意:这里我故意把交换机名字写错加了一个1,就是为了测试用。
MessageController.java 发送程序
@RestController
@RequestMapping("/messages")
public class MessageController {
@Resource
private MessageSend ms ;
@GetMapping("/send")
public Object send(String msg) {
ms.send(msg) ;
return "success" ;
}
@GetMapping("/send2")
public Object send2(String msg) {
ms.sendConfirm(msg) ;
return "success" ;
}
}
测试:
上面我把交换机名字已经故意写错了,test-exchange1,实际是:test-exchange。
访问:/messages/send2?msg=1
控制台输出了错误信息,这些信息就是在setConfirmCallback方法中写的,这证明了当消息无法到达交换机时就被触发。
接下来把test-exchange1改正确:把key改错误
public void sendConfirm(String msg) {
logger.info("准备发送消息:{}", msg);
rabbitTemplate.convertAndSend("test-exchange", "tm1.1", msg) ;
}
测试:
当交换机正确的时候setConfirmCallback方法的执行是正确的ack为true。但是同时setReturnCallback方法也被调用了,因为我们把key写错了,消息没法路由到正确的queue上,所以回调了。
到此 两种可靠消息发送就结束了,接下来简单说下接受消息。
首先是配置中我们要把消息的确认设置为手动:
acknowledgeMode: MANUAL
接受消息的方法:
@RabbitListener(queues= {"test-queue"})
@RabbitHandler
public void listner(Message message, Channel channel) {
System.out.println("接受到消息.....income") ;
byte[] body = message.getBody() ;
MessageProperties mps = message.getMessageProperties() ;
String content = new String(body, Charset.forName("UTF-8")) ;
try {
System.out.println("接受到消息来自交换机: 【" + mps.getReceivedExchange() + "】, 队列:【" + mps.getConsumerQueue() + "】:\n\t\t内容: " + content) ;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true) ;
} catch (Exception e) {
e.printStackTrace();
channel.basicReject(mps.getDeliveryTag(), false) ;
}
}
这里通过 basicAck应答消息,basicReject拒绝消息。通过手动确认的机制来确保消息的正常消费。
完毕!!!
大家帮忙转发,给个关注呗,谢谢
本文暂时没有评论,来添加一个吧(●'◡'●)