计算机系统应用教程网站

网站首页 > 技术文章 正文

Rabbitmq消费端实战 rabbitmq官网

btikc 2024-10-17 08:46:39 技术文章 7 ℃ 0 评论

消费者如何确认消费?

为什么要确认消费?默认情况下消费者在拿到rabbitmq的消息时已经自动确认这条消息已经消费了,讲白话就是rabbitmq的队列里就会删除这条消息了,但是我们实际开发中难免会遇到这种情况,比如说拿到这条消息发现我处理不了比如说参数不对,又比如说我当前这个系统出问题了,暂时不能处理这个消息,但是这个消息已经被你消费掉了 rabbitmq的队列里也删除掉了,你自己这边又处理不了,那么,这个消息就被遗弃了。这种情况在实际开发中是不合理的,rabbitmq提供了解决这个问题的方案,也就是我们上面所说的confirm模式只是我们刚刚讲的是发送方的这次我们来讲消费方的。

首先我们在消费者这边(再强调一遍我这里建议大家消费者和生产者分两个项目来做,包括我自己就是这样的,虽然一个项目也可以,我觉得分开的话容易理解一点)

设置一下消息确认为手动确认:

当然我们要对我们的消费者监听器进行一定的配置的话,我们需要先实例一个监听器的Container 也就是容器,那么我们的监听器(一个消费者里面可以实例多个监听器)可以指定这个容器那么我们只需要对这个Container(容器)进行配置就可以了

首先得声明一个容器并且在容器里面指定消息确认为手动确认:AcknowledgeMode关于这个类就是一个简单的枚举类我们来看看:


AcknowledgeMode关于这个类就是一个简单的枚举类我们来看看:



3个状态不确认手动确认自动确认

我们刚刚配置的就是中间那个手动确认既然现在是手动确认了

那么我们在处理完这条消息之后得使这条消息确认:


正常情况下的效果,我就不演示给大家看了,这里给大家看一个如果忘记退回消息的效果:

这里我把消息确认的代码注释掉:



然后调用生产者发送一条消息我们来看管理页面:



这里能看到有一条消息在rabbitmq当中而且状态是ready

然后我们使用消费者来消费掉他注意这里我们故意没有告诉rabbitmq我们消费成功了来看看效果

这里消费的结果打印就不截图了还是来看管理页面:


就算我们消费端消费了下次但是能看到这条消息还是会在rabbitmq当中只是他的状态为 unacked 就是未确认

这就是我们刚刚说的那种情况无论消费成功与否一定要通知rabbitmq 不然就会这样一直囤积在rabbitmq当中直到连接断开为止.

消息预取

扯完消息确认我们来讲一下刚刚所说的批量处理的问题

什么情况下会遇到批量处理的问题呢?

在这里就要先扯一下rabbitmq的消息发放机制了

rabbitmq 默认他会最快以轮询的机制吧队列所有的消息发送给所有客户端(如果消息没确认的话他会添加一个Unacked的标识上图已经看过了)

那么这种机制会有什么问题呢,对于Rabbitmq来讲这样子能最快速地使自己不会囤积消息而对性能造成影响,但是对于我们整个系统来讲,这种机制会带来很多问题,比如说我一个队列有2个人同时在消费,而且他们处理能力不同,我打个最简单的比方有100个订单消息需要处理(消费)现在有消费者A 和消费者B ,消费者A消费一条消息的速度是 10ms 消费者B 消费一条消息的速度是15ms (当然这里只是打比方)那么 rabbitmq 会默认给消费者A B 一人50条消息让他们消费但是消费者A 他500ms 就可以消费完所有的消息并且处于空闲状态而消费者B需要750ms 才能消费完如果从性能上来考虑的话这100条消息消费完的时间一共是750ms(因为2个人同时在消费)但是如果在消费者A消费完的时候能把这个空闲的性能用来和B一起消费剩下的信息的话,那么这处理速度就会快非常多。

这个例子可能有点抽象,我们通过代码来演示一下

我往Rabbitmq生产100条消息由2个消费者来消费其中我们让一个消费者在消费的时候休眠0.5秒(模拟处理业务的延迟)另外一个消费者正常消费我们来看看效果:

正常的那个消费者会一瞬间吧所有消息(50条)全部消费完(因为我们计算机处理速度非常快)下图是加了延迟的消费者:



可能我笔记里面你看不出效果,这个你自己测试就会发现其中一个消费者很快就处理完自己的消息了另外一个消费者还在慢慢的处理其实这样严重影响了我们的性能了。

其实讲了这么多那如何来解决这个问题呢?

我刚刚解释过了造成这个原因的根本就是rabbitmq消息的发放机制导致的,那么我们现在来讲一下解决方案: 消息预取

什么是消息预取?讲白了以前是rabbitmq一股脑吧所有消息都均发给所有的消费者(不管你受不受得了)而现在是在我消费者消费之前先告诉rabbitmq 我一次能消费多少数据等我消费完了之后告诉rabbitmq rabbitmq再给我发送数据

在代码中如何体现?

在使用消息预取前要注意一定要设置为手动确认消息,原因参考上面划重点的那句话。

因为我们刚刚设置过了这里就不贴代码了,完了之后设置一下我们预取消息的数量一样是在容器(Container)里面设置:



那么设置完之后是什么效果呢?还是刚刚那个例子还是2个消费者因为会在消费者返回消息的确认之后 rabbitmq才会继续发送消息给客户端而且客户端的消息累计量不会超过我们刚刚设置预取的数量,所以我们再运行同样的例子的话会发现 A消费者消费完99条消息了 B消费者才消费1条(因为B消费者休眠了0.5秒才消费完{返回消息确认} 但是0.5秒之内A消费者就已经把所有消息消费完毕了当然如果计算机处理速度较慢这个结果可能会有差异,效果大概就是A消费者会处理大量消息)

我这里的效果就是B消费者只消费一条消息 A消费者就消费完了,效果图就不发了这里同学们尽量自己测试一下或者改变一下参数看看效果。

关于这个预取的数量如何设置呢?我们发现如果设置为1 能极大的利用客户端的性能(我消费完了就可以赶紧消费下一条不会导致忙得很忙闲得很闲)但是,我们每消费一条消息就要通知一次rabbitmq 然后再取出新的消息,这样对于rabbitmq的性能来讲是非常不合理的所以这个参数要根据业务情况设置

我根据我查阅到的资料然后加以测试,这个数值的大小与性能成正比但是有上限,与数据可靠性,以及我们刚刚所说的客户端的利用率成反比大概如下图:



那么批量确认,就是对于我们预取的消息,进行统一的确认。

死信交换机

我们来看一段代码:

channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);

我们上面解释过这个代码是消息处理失败的确认然后第三个参数我有解释过是消息是否返回到原队列,那么问题来了,如果没有返回给原队列那么这条消息就被作废了?

rabbitmq考虑到了这个问题提供了解决方案:死信交换机(有些人可能叫作垃圾回收器,垃圾交换机等)

死信交换机有什么用呢?在创建队列的时候可以给这个队列附带一个交换机,那么这个队列作废的消息就会被重新发到附带的交换机,然后让这个交换机重新路由这条消息

理论是这样,代码如下:



大概是这样的一个效果:


其实我们刚刚发现所谓死信交换机,只是对应的队列设置了对应的交换机是死信交换机,对于交换机来讲,他还是一个普通的交换机。

下面会列出rabbitmq的常用配置:

队列配置:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表