在win10环境下安装RabbitMQ的步骤
第一步:下载并安装erlang
- 原因:RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装Rabbit MQ的前提是安装Erlang。
- 下载地址:http://www.erlang.org/downloads
- 下载后双击安装完之后陪erlang的环境变量w
- 最后在cmd窗口中输入erl,若出现下面图片所示情况说明erlang安装成功
第二步:下载并安装RabbitMQ
- 下载地址:http://www.rabbitmq.com/download.html
- 双击下载后的.exe文件,安装过程与erlang的安装过程相同。
- RabbitMQ安装好后接下来安装RabbitMQ-Plugins。打开命令行cd,输入RabbitMQ的sbin目录。
- 然后在后面输入rabbitmq-plugins enable rabbitmq_management命令进行安装
- 最后打开sbin目录,双击rabbitmq-server.bat运行rabbitmq
- 访问管理台的监听端口15672就可以看到控制台页面了
简单队列消息代码示例
public class Producter {
public static void main(String[] args) throws IOException, TimeoutException {
//创建工厂
ConnectionFactory factory = new ConnectionFactory();
//设置IP
factory.setHost("127.0.0.1");
//设置端口
factory.setPort(5672);
//用户名
factory.setUsername("guest");
//密码
factory.setPassword("guest");
//建立tcp连接
Connection connection = factory.newConnection();
//创建通信信道
Channel channel = connection.createChannel();
/**
* 创建一个队列
* 参数说明:
* 1.队列名称
* 2.是否持久化,true:持久化,false:不持久化,默认false这里是队列持久化,并非消息
* 3.是否消息共享,true:被多个队列消费,false:只被一个队列消费
* 4.是否自动删除
* 5.其他参数
*/
channel.queueDeclare(MqContents.QUEUE_NAME, false, false, false, null);
/**
* 发送消息
* 参数说明:
* 1.交换机名称
* 2.队列名或者routingKey
* 3.其他参数(比如持久化)
* 4.消息体
*/
channel.basicPublish("", MqContents.QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, "你好世界".getBytes("utf-8"));
System.out.println("发送完毕");
}
}
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建工厂
ConnectionFactory factory = new ConnectionFactory();
//设置IP
factory.setHost("127.0.0.1");
//设置端口
factory.setPort(5672);
//用户名
factory.setUsername("guest");
//密码
factory.setPassword("guest");
//建立tcp连接
Connection connection = factory.newConnection();
//创建通信信道
Channel channel = connection.createChannel();
DeliverCallback deliverCallback = (messageTag, delivery)->{
System.out.println(new String(delivery.getBody(),"utf-8"));
};
CancelCallback cancelCallback = message -> {
System.out.println(message);
};
/**
* 消费消息
* 参数说明:
* 1.要消费的队列名
* 2.是否自动应答
* 3.消费成功的回调
* 4.取消消费的回调
*/
channel.basicConsume(MqContents.QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
但是简单队列有个缺点,简单队列是一一对应的关系,即点对点,一个生产者对应一个消费者,按照这个逻辑,如果我们有一些比较耗时的任务,也就意味着需要大量的时间才能处理完毕,显然简单队列模式并不能满足我们的工作需求,再来看看工作队列。
工作队列模式
工作队列:用来将耗时的任务分发给多个消费者(工作者)
主要解决问题:处理资源密集型任务,并且还要等他完成。有了工作队列,我们就可以将具体的工作放到后面去做,将工作封装为一个消息,发送到队列中,一个工作进程就可以取出消息并完成工作。如果启动了多个工作进程,那么工作就可以在多个进程间共享。
工作队列也称为公平性队列模式:
循环分发,假如我们拥有两个消费者,默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者,平均而言,每个消费者将获得相同数量的消息,这种分发消息的方式称为轮询。
代码示例:
public class Producter {
public static void main(String[] args) throws IOException, TimeoutException {
//创建工厂
ConnectionFactory factory = new ConnectionFactory();
//设置IP
factory.setHost("127.0.0.1");
//设置端口
factory.setPort(5672);
//用户名
factory.setUsername("guest");
//密码
factory.setPassword("guest");
//建立tcp连接
Connection connection = factory.newConnection();
//创建通信信道
Channel channel = connection.createChannel();
/**
* 创建一个队列
* 参数说明:
* 1.队列名称
* 2.是否持久化,true:持久化,false:不持久化,默认false这里是队列持久化,并非消息
* 3.是否消息共享,true:被多个队列消费,false:只被一个队列消费
* 4.是否自动删除
* 5.其他参数
*/
channel.queueDeclare(MqContents.QUEUE_NAME, false, false, false, null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String str = scanner.next();
/**
* 发送消息
* 参数说明:
* 1.交换机名称
* 2.队列名或者routingKey
* 3.其他参数(比如持久化)
* 4.消息体
*/
channel.basicPublish("", MqContents.QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, str.getBytes("utf-8"));
}
}
}
public class Worker1 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建工厂
ConnectionFactory factory = new ConnectionFactory();
//设置IP
factory.setHost("127.0.0.1");
//设置端口
factory.setPort(5672);
//用户名
factory.setUsername("guest");
//密码
factory.setPassword("guest");
//建立tcp连接
Connection connection = factory.newConnection();
//创建通信信道
Channel channel = connection.createChannel();
DeliverCallback deliverCallback = (messageTag, delivery)->{
System.out.println(new String(delivery.getBody(),"utf-8"));
/**
* 手动应答
* 参数说明:
* 1.消息的标记
* 2.是否批量应答
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
CancelCallback cancelCallback = message -> {
System.out.println(message);
};
System.out.println("工作者1已启动");
//0是公平分发,1是不公平;大于1是预期值,消息堆积在信道的最大数
channel.basicQos(1);
/**
* 消费消息
* 参数说明:
* 1.要消费的队列名
* 2.是否自动应答
* 3.消费成功的回调
* 4.取消消费的回调
*/
channel.basicConsume(MqContents.QUEUE_NAME, false, deliverCallback, cancelCallback);
}
}
public class Worker2 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建工厂
ConnectionFactory factory = new ConnectionFactory();
//设置IP
factory.setHost("127.0.0.1");
//设置端口
factory.setPort(5672);
//用户名
factory.setUsername("guest");
//密码
factory.setPassword("guest");
//建立tcp连接
Connection connection = factory.newConnection();
//创建通信信道
Channel channel = connection.createChannel();
DeliverCallback deliverCallback = (messageTag, delivery)->{
System.out.println(new String(delivery.getBody(),"utf-8"));
/**
* 手动应答
* 参数说明:
* 1.消息的标记
* 2.是否批量应答
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
CancelCallback cancelCallback = message -> {
System.out.println(message);
};
System.out.println("工作者2已启动");
//0是公平分发,1是不公平;大于1是预期值,消息堆积在信道的最大数
channel.basicQos(1);
/**
* 消费消息
* 参数说明:
* 1.要消费的队列名
* 2.是否自动应答
* 3.消费成功的回调
* 4.取消消费的回调
*/
channel.basicConsume(MqContents.QUEUE_NAME, false, deliverCallback, cancelCallback);
}
}
消息应答
执行一个任务可能需要花费几秒钟,你可能会担心如果一个消费者在执行任务过程中挂掉了。一旦RabbitMQ将消息分发给了消费者,就会从内存中删除。在这种情况下,如果正在执行任务的消费者宕机,会丢失正在处理的消息和分发给这个消费者但尚未处理的消息。
但是,我们不想丢失任何任务,如果有一个消费者挂掉了,那么我们应该将分发给它的任务交付给另一个消费者去处理。
为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。
如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会丢失任何消息了。
没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会重新投递。即使处理一条消息会花费很长的时间。
自动应答模式需要在高吞吐量数据传输安全性方面做权衡, 仅适用于消费者高效并以某种速率能处理这些消息的情况下进行。
核心代码:
/**
* 消费消息
* 参数说明:
* 1.要消费的队列名
* 2.是否自动应答
* 3.消费成功的回调
* 4.取消消费的回调
*/
channel.basicConsume(MqContents.QUEUE_NAME, false, deliverCallback, cancelCallback);
其中第三个参数设置为false是手动应答,true是自动应答
发布确认
原因同消息的确认差不多,让生产者可以感知消息是否发送成功,保证不丢失。
代码:
//开启发布确认
channel.confirmSelect();
/**单个发布确认
* 发送消息
* 参数说明:
* 1.交换机名称
* 2.队列名或者routingKey
* 3.其他参数(比如持久化)
* 4.消息体
*/
channel.basicPublish("", MqContents.QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, "你好世界".getBytes("utf-8"));
boolean flag = channel.waitForConfirms();
if (flag) {
System.out.println("发送完毕");
} else {
System.out.println("发送失败");
}
/**批量发布确认
* 发送消息
* 参数说明:
* 1.交换机名称
* 2.队列名或者routingKey
* 3.其他参数(比如持久化)
* 4.消息体
*/
for(int i = 1; i <= 1000; i++){
channel.basicPublish("", MqContents.QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, "你好世界".getBytes("utf-8"));
//每一百条确认一次
if (i % 100 == 0) {
boolean flag = channel.waitForConfirms();
if (flag) {
System.out.println("发送完毕");
} else {
System.out.println("发送失败");
}
}
}
ConfirmCallback ackConfirmCallback = (v1, v2) -> {
System.out.println("成功的消息:" + v1);
};
ConfirmCallback nackConfirmCallback = (v1, v2) -> {
System.out.println("失败的消息:" + v1);
};
//准备消息的监听器,参数说明:1.消息成功的监听器;2.消息失败的监听器;
channel.addConfirmListener(ackConfirmCallback, nackConfirmCallback);
String message = "你好世界";
/**异步发布确认
* 发送消息
* 参数说明:
* 1.交换机名称
* 2.队列名或者routingKey
* 3.其他参数(比如持久化)
* 4.消息体
*/
channel.basicPublish("", MqContents.QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("utf-8"));
’队列与消息的持久化
问题原因描述:
1.当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间。在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者死掉了。
这种情况要使用消息接收确认机制,可以执行上次宕机的消费者没有完成的事情。
2.在默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的。当RabbitMQ死掉了或者重启了,上次创建的队列、消息都不会保存。
这种情况可以使用RabbitMQ提供的消息队列的持久化机制。
要注意队列与消息都需持久化,才能确保消息的不丢失。
代码:
/**
* 创建一个队列
* 参数说明:
* 1.队列名称
* 2.是否持久化,true:持久化,false:不持久化,默认false这里是队列持久化,并非消息
* 3.是否消息共享,true:被多个队列消费,false:只被一个队列消费
* 4.是否自动删除
* 5.其他参数
*/
channel.queueDeclare(MqContents.QUEUE_NAME, false, false, false, null);
//生产者发消息时设置消息持久化MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish(MqContents.TOPIC_EXHANGE, cron, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("utf-8"));
消息的不公平分发
问题原因描述:
在RabbitMQ中,队列向消费者发送消息,如果没有设置Qos的值,那么队列中有多少消息就发送多少消息给消费者,完全不管消费者是否能够消费完,这样可能就会形成大量未ack的消息在缓存区堆积,因为这些消息未收到消费者发送的ack,所以只能暂时存储在缓存区中,等待ack,然后删除对应消息。这样的话,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量,一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。消息应答和 QoS 预取值对用户吞吐量有重大影响。通常,增加预取将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的RAM 消耗(随机存取存储器)应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的
代码:
//0是公平分发,1是不公平;大于1是预期值,消息堆积在信道的最大数
channel.basicQos(1);
死信队列
产生死信的来源大致有如下几种:
- 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
- 消息TTL过期
- 队列达到最大长度(队列满了,无法再添加数据到mq中)
应用场景:用户下单30分钟没有支付系统取消订单,会议前15分钟发邮件提醒相关人员等等。
延时队列
利用可以利用本身自带的死信队列机制实现,缺陷是,消息在队列里,有先进先出的限制,若前面的消息过期时间没有到,那么这个消息也不会放到死信队列中
mq插件弥补延时队列的缺陷
安装mq的插件,让消息积压在交换机那边,不会有先进先出的限制,等ttl时间到了直接放到死信队列里
安装插件的命令:rabbitmq-plugins enable
rabbitmq_delayed_message_exchange
本文暂时没有评论,来添加一个吧(●'◡'●)