计算机系统应用教程网站

网站首页 > 技术文章 正文

springboot+rabbitmq+消息发送确认

btikc 2024-10-17 08:46:39 技术文章 6 ℃ 0 评论

首先说一下队列中的消息是怎么存储的,rabbitmq没有开启消息的持久化的话,消息存储在内存中,如果此时重启服务器,那么消息将会丢失,如果开启持久化则会牺牲性能,响应时间和吞吐量。

开启持久化需要做以下3步:

  • 生产者在生产消息的时候,将消息的投递模式(MessageDeliveryMode)设置为2(持久)

  • 发送到持久化的交换器(配置了durable=true)

  • 到达持久化的队列(配置了durable=true)

在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的(重点),也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎么解决呢?

RabbitMQ为我们提供了两种方式:

方式一:通过AMQP事务机制实现,这也是从AMQP协议层面提供的解决方案;

方式二:通过将channel设置成confirm模式来实现

这里我们探讨一下confirm模式的实现

生产者确认模式实现原理:

生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理;

confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息;

特别强调,这是生产者和rabbit服务器之间的互动,没有消费者的联系

1、导入jar包

消息生产者

不论是创建消息消费者或生产者都需要ConnectionFactory,如下面的操作

参考截图

但是在springboot中,不是必须这样写,也可以再application.properties文件中设置

这样同样可以实现上面的效果,特别注意:spring.rabbitmq.publisher-confirms=true,必须指定,设置为true才可以进行消息的回调

RabbitTemplate

通过使用RabbitTemplate来对开发者提供API操作,其中setConfirmCallback()方法中就是设置返回确认消息的,当有消息回调时都会调用confim方法

在发送消息时通过调用RabbitTemplate中的如下方法

exchange:交换机名称 routingKey:路由关键字 object:发送的消息内容 correlationData:消息ID

调用方式例子:

也可以这么写发送的方法:

这里说明一下,convertAndSend方法时默认发的持久化的信息,进去源码可以看到第三个参数是Message对象,而实例化Message对象的时候需要提供MessageProperties类对象,在MessageProperties类型默认的为持久化消息

在回调函数中打印的日志:

另外说一下,这里只是说了一下生产者这边的代码,但是没有创建Exchange和queue所以还不能发送,可以在消费者这端创建,也可以在生产者端创建

具体可以参考:springboot整合rabbitmq

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表