网站首页 > 技术文章 正文
消息确认
完成一项任务可能需要几秒钟。您可能想知道如果其中一个消费者开始一项长期任务并且只完成了部分任务而死去会发生什么。使用我们当前的代码,一旦 RabbitMQ 将消息传递给消费者,它会立即将其标记为删除。在这种情况下,如果你杀死一个消费者,我们将丢失它刚刚处理的消息。我们还将丢失所有发送给该特定工作人员但尚未处理的消息(如已经此消费者队列的消息)。
但是我们不想丢失任何任务。如果一个消费者死亡,我们希望将任务交付给另一个消费者。
为了确保消息永远不会丢失,RabbitMQ 支持 “消息确认”。消费者发回一个确认,告诉 RabbitMQ 一个特定的消息已经被接收、并处理完成, RabbitMQ 可以自由地删除它。
如果消费者在没有发送 ack 的情况下死亡(其通道关闭、连接关闭或 TCP 连接丢失),RabbitMQ 将理解消息未完全处理并将重新排队。如果同时有其他消费者在线,它会迅速将其重新发送给另一个消费者。这样,即使消费者偶尔死亡,您也可以确保不会丢失任何消息。
在所有当前支持的 RabbitMQ 版本中,对消费者交付确认强制执行超时。这有助于检测从不确认交付的错误(卡住)消费者。这样的消费者可能会影响节点的磁盘数据压缩,并可能将节点驱动到磁盘空间之外。
如果消费者在超过超时值(默认为 30 分钟)内未确认其交付,则其通道将因PRECONDITION_FAILED通道异常而关闭。该错误将由消费者连接到的节点记录。该渠道上所有消费者的所有未完成交付将被重新排队。
超时值可在/etc/rabbitmq/rabbitmq.conf中配置(以毫秒为单位):
默认情况下,手动消息确认是打开的。在前面的示例中,我们通过autoAck=true 标志明确地关闭了它。一旦我们完成了一项任务,是时候将此标志设置为false并从工作人员那里发送适当的确认。
官方截图:
示例代码:
消息持久化
我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。
当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非你告诉它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久的。
首先,我们需要确保队列能够在 RabbitMQ 节点重启后继续存在。为此,我们需要将其声明为持久的:
虽然这个命令本身是正确的,但它在我们目前的设置中不起作用。那是因为我们已经定义了一个名为hello的队列 ,它不是持久的。RabbitMQ 不允许您使用不同的参数重新定义现有队列,并且会向任何尝试这样做的程序返回错误。但是有一个快速的解决方法 - 让我们声明一个具有不同名称的队列,例如task_queue。
此queueDeclare更改需要同时应用于生产者和消费者代码。
至此,我们确定即使 RabbitMQ 重启, task_queue队列也不会丢失。现在我们需要将我们的消息标记为持久 - 通过将MessageProperties(它实现BasicProperties)设置为值PERSISTENT_TEXT_PLAIN。
关于消息持久性的注意事项:
- 将消息标记为持久性并不能完全保证消息不会丢失。虽然它告诉 RabbitMQ 将消息保存到磁盘,但是当 RabbitMQ 接受消息并且还没有保存它时,仍然有很短的时间窗口。此外,RabbitMQ 不会对每条消息都执行fsync(2) ——它可能只是保存到缓存中而不是真正写入磁盘。持久性保证并不强,但对于我们简单的任务队列来说已经绰绰有余了。如果您需要更强的保证,那么您可以使用 发布者确认(publish/confirms模式)。
公平调度
您可能已经注意到调度仍然不能完全按照我们的意愿工作。例如,在有两个工人的情况下,当所有奇数消息很重而偶数消息都很轻时,一个工人将一直很忙,另一个工人几乎不做任何工作。好吧,RabbitMQ 对此一无所知,仍然会均匀地发送消息。
发生这种情况是因为 RabbitMQ 只是在消息进入队列时分派消息。它不查看消费者未确认消息的数量。它只是盲目地将第 n 个消息发送给第 n 个消费者。
为了解决这个问题,我们可以使用带有prefetchCount = 1设置的basicQos方法 。这告诉 RabbitMQ 一次不要给一个 worker 多条消息。或者,换句话说,在工作人员处理并确认之前的消息之前,不要向工作人员发送新消息。相反,它将把它分派给下一个不忙的工人-即不公平调度:
预取值: prefetchCount与源码解析
此值用于设置每一次RebbitMQ会给这个信道发送的消息的数量,也即允许这个信道上,没有被确认最大的消息数量。
查看Channel源代码,basicQos有三个重载:
prefetchSize:
只接收预取数量的:
同时接收prefetchSize可以接收到字节的总数量:
及接收preferCount和global的:
承前示例,设置多个消费者的basicQos(1)后,可见,由于消费者0处理的速度比较快,所以,RabbitMQ给消费者0发送的信息也最多:
完整代码:
package wj.rabbitmq.workqueue;
import cn.hutool.core.thread.ThreadUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;
import wj.mq.utils.ConnUtils;
import java.util.Random;
@Slf4j
public class Receiver {
public static void main(String[] args) throws Exception {
final String queueName = "WorkQueue";
int consumerCount = 1; //定义消费者个数
if (args.length >= 1) {
String countStr = args[0];
if (countStr.matches("\\d+")) {
consumerCount = Integer.parseInt(countStr);
}
}
Connection con = ConnUtils.newConnection();
long time = System.currentTimeMillis();
Random random = new Random();
for (int i = 0; i < consumerCount; i++) {
Thread thread = new Thread(() -> {
final String name = Thread.currentThread().getName();
try {
Channel channel = con.createChannel();
//预取值,即消费者没有确认消息的最大值
channel.basicQos(1);
channel.queueDeclare(queueName, true, false,
false, null);
DeliverCallback callback = (consumerTag, message) -> {
try {
String msg = new String(message.getBody(), "UTF-8");
long sleep = 1000 * random.nextInt(10);
ThreadUtil.sleep(sleep);//休眠Nms
log.info("{} 处理完成:{},处理用时:{}", name, msg, sleep);
//最后一个参数,是否批量确认
long tag = message.getEnvelope().getDeliveryTag();
channel.basicAck(tag,false);
}catch (Exception e){
//最后一个参数:是否重新返回队列
long tag = message.getEnvelope().getDeliveryTag();
channel.basicNack(tag,false,true);
e.printStackTrace();
}
};
CancelCallback cancelCallback = consumerTag -> {
//ignore
};
boolean autoAck = false;//将自动确认设置为false
channel.basicConsume(queueName,autoAck,callback,cancelCallback);
} catch (Exception e) {
e.printStackTrace();
}
});
thread.setName("消费者" + i);
thread.start();
}
}
}
截图:
启动命令:
C:/ > java -cp <yourPackage>.Receiver
Spring的不公平调度
只需要在配置文件中添加:spring.rabbitmq.listener.simple.prefetch=1,即可以实现在springboot项目中的非公平分发。修改后的配置文件:
默认情况下,RabbitMQ 会按顺序将每条消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。在这种模式下,调度不一定完全按照我们的意愿工作。例如,在有两个工人的情况下,当所有奇数消息很重而偶数消息都很轻时,一个工人将一直很忙,另一个工人几乎不做任何工作。好吧,RabbitMQ 对此一无所知,仍然会均匀地发送消息。
发生这种情况是因为 RabbitMQ 只是在消息进入队列时分派消息。它不查看消费者未确认消息的数量。它只是盲目地将第 n 个消息发送给第 n 个消费者。
但是,“公平调度”是 Spring AMQP 的默认配置。AbstractMessageListenerContainer将DEFAULT_PREFETCH_COUNT的 值定义为 250。如果DEFAULT_PREFETCH_COUNT设置为 1,则行为将是如上所述的循环传递。
- 设置spring.rabbitmq.listener.simple.prefetch=1。
- 调整消费者2的休眠时间为0
- 依然保留消费者2的休眠时间为随机。
- 最后输出的效果如下,因为消费者2运行的速度更快,所以消费者2获取了更多的资源(注意消息必须是顺序输出的)。
排他性
在向 AMQP 0-9-1 客户端注册消费者时,可以将独占标志 设置为 true 以请求消费者成为目标队列中的唯一消费者。仅当当时没有消费者注册到队列时,调用才会成功。这允许确保一次只有一个消费者从队列中消费。
如果独占消费者被取消或死亡,这是应用程序负责注册一个新消费者以继续从队列中消费。
如果需要独占消费和消费连续性, 单一活跃消费者可能更合适。
测试:
- 先注册一个消费者,再启动一个消费者。
- 后面的消费者应该注册不成功。
- 同时,任何其他的channel都不可以再使用queueDeclare声明这个队列。
- 但生产者可以在不声明的情况下,向这个队列发送数据。
- 其他消费者,如果也没使用queueDeclare声明队列,但试图通过basicConsume消费这个排他的队列时,依然也会抛出异常。
后续消费者,声明或消费这个排他队列时,抛出的异常为:
channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'WorkQueue' in vhost '/'. It could be originally declared on another connection or the exclusive property value does not match that of the original declaration., class-id=60, method-id=20)
单一活跃消费者
单个活动消费者允许从队列中一次只拥有一个消费者,并在活动消费者被取消或死亡的情况下故障转移到另一个注册消费者。当消息必须按照它们到达队列的相同顺序被消费和处理时,只使用一个消费者很有用。
典型的事件序列如下:
- 声明了一个队列,并且一些消费者大致同时注册到它。
- 第一个注册的消费者成为单一的活跃消费者:消息被分派给它,而其他消费者被忽略。
- 单个活跃消费者由于某种原因被取消或干脆死亡。其中一个注册的消费者成为新的单一活动消费者,并且消息现在被分派给它。换句话说,队列会自动故障转移到另一个消费者。
- 请注意,如果未启用单个活动消费者功能,则消息将使用轮询方式发送给所有消费者。
声明队列时可以启用单个活动消费者,将 x-single-active-consumer参数设置为true,例如使用 Java 客户端:
官方参考地址:https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams/
注意,如果声明了单一活跃的参数,则必须所有生产者和消费者都需要添加这个参数,否则启动会失败。
测试代码:
生产者添加代码:
消费者添加代码,注意以下模拟某个消费者,在处理到第5条信息时,抛出异常,让当前消费者宕机:
运行两个或更多消费者,查看运行的输出,从以下的这个输出可以看出:
- 消费者2先注册为第一个消费者,所以它拥有优先消费权。
- 但消费者2处理到第5个信息时,因抛出异常而终止,所以也没有确认消息6。
- RabbitMQ将后续的消息发送的消费者0,继续处理消息。
消费者的优先级
可以通过x-priority=int来设置消费者的优先级:
设置优先级后,通过UI查看消费者状态:
测试向队列发送1...10个数据,由于消费者2的优先级高,所以消费者会获取到更多的消息:
消费示例代码:
package wj.rabbitmq.workqueue;
import com.rabbitmq.client.*;
import wj.mq.utils.ConnUtils;
import java.util.HashMap;
import java.util.Map;
/**
* 通过x-priority设置消费者优化级
*/
public class ConsumerPriority {
private static final String QUEUE_NAME = "Priority_Test_Queue";
public static void main(String[] args) throws Exception {
final Connection connection = ConnUtils.newConnection();
//直接声明两个消费者,设置两个消费者的优先级不同
for (int i = 1; i <= 2; i++) {
Thread thread = new Thread(() -> {
try {
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true,
false, false, null);
//獲取,name爲1或2
final String name = Thread.currentThread().getName();
final Integer priority = Integer.parseInt(name) * 10;
//聲明消息接收對象
DeliverCallback msgCallBack = (consumerTag, message) -> {
System.err.println("线程"+name+
",优先级:"+priority+",处理消息:"+
new String(message.getBody()));
};
//设置优先级别
Map<String, Object> params = new HashMap<>();
params.put("x-priority", priority);
params.put("name","消费者"+name);
channel.basicConsume(QUEUE_NAME, true,
params, msgCallBack, consumerTag -> {});
} catch (Exception e) {
e.printStackTrace();
}
});
thread.setName("" + i);
thread.start();
}
}
}
截图,画出核心部分代码:
你可以尝试删除x-priority优先级的设置后,再运行程序, RabbitMQ还是会平均分配消息给所有的消费者。
猜你喜欢
- 2024-10-17 一次教会你如何解决RabbitMQ消息丢失问题
- 2024-10-17 RabbitMQ消息可靠性分析和应用 rabbitmq消息数据类型
- 2024-10-17 基于本地消息表实现MQ最终一致性 本地消息表(异步确保)
- 2024-10-17 MQ怎么确保不丢数据 mq防止数据丢失
- 2024-10-17 Java面试必备!RabbitMQ 常用知识点总结,纯手绘23张图带你拿下
- 2024-10-17 SpringBoot+RabbitMQ 实现 RPC 调用
- 2024-10-17 springboot+rabbitmq+消息发送确认
- 2024-10-17 Rabbitmq消费端实战 rabbitmq官网
- 2024-10-17 每日学习~RabbitMQ消息应答机制 rabbit mq五种消息模型
- 2024-10-17 一文搞懂消息推送技术选型 消息推送的几种实现方式
你 发表评论:
欢迎- 11-19零基础学习!数据分析分类模型「支持向量机」
- 11-19机器学习 | 算法笔记(三)- 支持向量机算法以及代码实现
- 11-19我以前一直没有真正理解支持向量机,直到我画了一张图
- 11-19研一小姑娘分享机器学习之SVM支持向量机
- 11-19[机器学习] sklearn支持向量机
- 11-19支持向量机
- 11-19初探支持向量机:用大白话解释、原理详解、Python实现
- 11-19支持向量机的核函数
- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)