计算机系统应用教程网站

网站首页 > 技术文章 正文

rabbitmq的学习笔记

btikc 2025-02-07 16:40:34 技术文章 12 ℃ 0 评论

在win10环境下安装RabbitMQ的步骤

第一步:下载并安装erlang

  • 原因:RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装Rabbit MQ的前提是安装Erlang。
  • 下载地址:http://www.erlang.org/downloads
  • 下载后双击安装完之后陪erlang的环境变量w
  • 最后在cmd窗口中输入erl,若出现下面图片所示情况说明erlang安装成功

第二步:下载并安装RabbitMQ

  • 下载地址:http://www.rabbitmq.com/download.html
  • 双击下载后的.exe文件,安装过程与erlang的安装过程相同。
  • RabbitMQ安装好后接下来安装RabbitMQ-Plugins。打开命令行cd,输入RabbitMQ的sbin目录。
  • 然后在后面输入rabbitmq-plugins enable rabbitmq_management命令进行安装
  • 最后打开sbin目录,双击rabbitmq-server.bat运行rabbitmq
  • 访问管理台的监听端口15672就可以看到控制台页面了


简单队列消息代码示例

public class Producter {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置IP
        factory.setHost("127.0.0.1");
        //设置端口
        factory.setPort(5672);
        //用户名
        factory.setUsername("guest");
        //密码
        factory.setPassword("guest");

        //建立tcp连接
        Connection connection = factory.newConnection();

        //创建通信信道
        Channel channel = connection.createChannel();

        /**
         * 创建一个队列
         * 参数说明:
         * 1.队列名称
         * 2.是否持久化,true:持久化,false:不持久化,默认false这里是队列持久化,并非消息
         * 3.是否消息共享,true:被多个队列消费,false:只被一个队列消费
         * 4.是否自动删除
         * 5.其他参数
         */
        channel.queueDeclare(MqContents.QUEUE_NAME, false, false, false, null);

        /**
         * 发送消息
         * 参数说明:
         * 1.交换机名称
         * 2.队列名或者routingKey
         * 3.其他参数(比如持久化)
         * 4.消息体
         */
        channel.basicPublish("", MqContents.QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, "你好世界".getBytes("utf-8"));
        System.out.println("发送完毕");
    }
}
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置IP
        factory.setHost("127.0.0.1");
        //设置端口
        factory.setPort(5672);
        //用户名
        factory.setUsername("guest");
        //密码
        factory.setPassword("guest");

        //建立tcp连接
        Connection connection = factory.newConnection();

        //创建通信信道
        Channel channel = connection.createChannel();


        DeliverCallback deliverCallback =  (messageTag, delivery)->{
            System.out.println(new String(delivery.getBody(),"utf-8"));
        };
        CancelCallback cancelCallback = message -> {
            System.out.println(message);
        };

        /**
         * 消费消息
         * 参数说明:
         * 1.要消费的队列名
         * 2.是否自动应答
         * 3.消费成功的回调
         * 4.取消消费的回调
         */
        channel.basicConsume(MqContents.QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

但是简单队列有个缺点,简单队列是一一对应的关系,即点对点,一个生产者对应一个消费者,按照这个逻辑,如果我们有一些比较耗时的任务,也就意味着需要大量的时间才能处理完毕,显然简单队列模式并不能满足我们的工作需求,再来看看工作队列。

工作队列模式

工作队列:用来将耗时的任务分发给多个消费者(工作者)

主要解决问题:处理资源密集型任务,并且还要等他完成。有了工作队列,我们就可以将具体的工作放到后面去做,将工作封装为一个消息,发送到队列中,一个工作进程就可以取出消息并完成工作。如果启动了多个工作进程,那么工作就可以在多个进程间共享。

工作队列也称为公平性队列模式:

循环分发,假如我们拥有两个消费者,默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者,平均而言,每个消费者将获得相同数量的消息,这种分发消息的方式称为轮询

代码示例:

public class Producter {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置IP
        factory.setHost("127.0.0.1");
        //设置端口
        factory.setPort(5672);
        //用户名
        factory.setUsername("guest");
        //密码
        factory.setPassword("guest");

        //建立tcp连接
        Connection connection = factory.newConnection();

        //创建通信信道
        Channel channel = connection.createChannel();

        /**
         * 创建一个队列
         * 参数说明:
         * 1.队列名称
         * 2.是否持久化,true:持久化,false:不持久化,默认false这里是队列持久化,并非消息
         * 3.是否消息共享,true:被多个队列消费,false:只被一个队列消费
         * 4.是否自动删除
         * 5.其他参数
         */
        channel.queueDeclare(MqContents.QUEUE_NAME, false, false, false, null);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String str = scanner.next();
            /**
             * 发送消息
             * 参数说明:
             * 1.交换机名称
             * 2.队列名或者routingKey
             * 3.其他参数(比如持久化)
             * 4.消息体
             */
            channel.basicPublish("", MqContents.QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, str.getBytes("utf-8"));
        }
}
}

public class Worker1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置IP
        factory.setHost("127.0.0.1");
        //设置端口
        factory.setPort(5672);
        //用户名
        factory.setUsername("guest");
        //密码
        factory.setPassword("guest");

        //建立tcp连接
        Connection connection = factory.newConnection();

        //创建通信信道
        Channel channel = connection.createChannel();


        DeliverCallback deliverCallback =  (messageTag, delivery)->{
            System.out.println(new String(delivery.getBody(),"utf-8"));
            /**
             * 手动应答
             * 参数说明:
             * 1.消息的标记
             * 2.是否批量应答
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = message -> {
            System.out.println(message);
        };
        System.out.println("工作者1已启动");

        //0是公平分发,1是不公平;大于1是预期值,消息堆积在信道的最大数
        channel.basicQos(1);

        /**
         * 消费消息
         * 参数说明:
         * 1.要消费的队列名
         * 2.是否自动应答
         * 3.消费成功的回调
         * 4.取消消费的回调
         */
        channel.basicConsume(MqContents.QUEUE_NAME, false, deliverCallback, cancelCallback);
    }
}

public class Worker2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置IP
        factory.setHost("127.0.0.1");
        //设置端口
        factory.setPort(5672);
        //用户名
        factory.setUsername("guest");
        //密码
        factory.setPassword("guest");

        //建立tcp连接
        Connection connection = factory.newConnection();

        //创建通信信道
        Channel channel = connection.createChannel();


        DeliverCallback deliverCallback =  (messageTag, delivery)->{
            System.out.println(new String(delivery.getBody(),"utf-8"));
            /**
             * 手动应答
             * 参数说明:
             * 1.消息的标记
             * 2.是否批量应答
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = message -> {
            System.out.println(message);
        };
        System.out.println("工作者2已启动");

        //0是公平分发,1是不公平;大于1是预期值,消息堆积在信道的最大数
        channel.basicQos(1);

        /**
         * 消费消息
         * 参数说明:
         * 1.要消费的队列名
         * 2.是否自动应答
         * 3.消费成功的回调
         * 4.取消消费的回调
         */
        channel.basicConsume(MqContents.QUEUE_NAME, false, deliverCallback, cancelCallback);
    }
}

消息应答

执行一个任务可能需要花费几秒钟,你可能会担心如果一个消费者在执行任务过程中挂掉了。一旦RabbitMQ将消息分发给了消费者,就会从内存中删除。在这种情况下,如果正在执行任务的消费者宕机,会丢失正在处理的消息和分发给这个消费者但尚未处理的消息。

但是,我们不想丢失任何任务,如果有一个消费者挂掉了,那么我们应该将分发给它的任务交付给另一个消费者去处理。

为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。

如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会丢失任何消息了。

没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会重新投递。即使处理一条消息会花费很长的时间。

自动应答模式需要在高吞吐量数据传输安全性方面做权衡, 仅适用于消费者高效并以某种速率能处理这些消息的情况下进行。

核心代码:

/**
 * 消费消息
 * 参数说明:
 * 1.要消费的队列名
 * 2.是否自动应答
 * 3.消费成功的回调
 * 4.取消消费的回调
 */
channel.basicConsume(MqContents.QUEUE_NAME, false, deliverCallback, cancelCallback);
其中第三个参数设置为false是手动应答,true是自动应答

发布确认

原因同消息的确认差不多,让生产者可以感知消息是否发送成功,保证不丢失。

代码:

//开启发布确认
channel.confirmSelect();

/**单个发布确认
 * 发送消息
 * 参数说明:
 * 1.交换机名称
 * 2.队列名或者routingKey
 * 3.其他参数(比如持久化)
 * 4.消息体
 */
channel.basicPublish("", MqContents.QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, "你好世界".getBytes("utf-8"));
boolean flag = channel.waitForConfirms();
if (flag) {
    System.out.println("发送完毕");
} else {
    System.out.println("发送失败");
}


/**批量发布确认
 * 发送消息
 * 参数说明:
 * 1.交换机名称
 * 2.队列名或者routingKey
 * 3.其他参数(比如持久化)
 * 4.消息体
 */
for(int i = 1; i <= 1000; i++){
    channel.basicPublish("", MqContents.QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, "你好世界".getBytes("utf-8"));
    //每一百条确认一次
    if (i % 100 == 0) {
        boolean flag = channel.waitForConfirms();
        if (flag) {
            System.out.println("发送完毕");
        } else {
            System.out.println("发送失败");
        }
    }
}




ConfirmCallback ackConfirmCallback = (v1, v2) -> {
    System.out.println("成功的消息:" + v1);
};
ConfirmCallback nackConfirmCallback = (v1, v2) -> {
    System.out.println("失败的消息:" + v1);
};

//准备消息的监听器,参数说明:1.消息成功的监听器;2.消息失败的监听器;
channel.addConfirmListener(ackConfirmCallback, nackConfirmCallback);

String message = "你好世界";

/**异步发布确认
 * 发送消息
 * 参数说明:
 * 1.交换机名称
 * 2.队列名或者routingKey
 * 3.其他参数(比如持久化)
 * 4.消息体
 */
channel.basicPublish("", MqContents.QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("utf-8"));

’队列与消息的持久化

问题原因描述:

1.当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间。在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者死掉了。

这种情况要使用消息接收确认机制,可以执行上次宕机的消费者没有完成的事情。

2.在默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的。当RabbitMQ死掉了或者重启了,上次创建的队列、消息都不会保存。

这种情况可以使用RabbitMQ提供的消息队列的持久化机制。

要注意队列与消息都需持久化,才能确保消息的不丢失。

代码:

/**
 * 创建一个队列
 * 参数说明:
 * 1.队列名称
 * 2.是否持久化,true:持久化,false:不持久化,默认false这里是队列持久化,并非消息
 * 3.是否消息共享,true:被多个队列消费,false:只被一个队列消费
 * 4.是否自动删除
 * 5.其他参数
 */
channel.queueDeclare(MqContents.QUEUE_NAME, false, false, false, null);

//生产者发消息时设置消息持久化MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish(MqContents.TOPIC_EXHANGE, cron, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("utf-8"));

消息的不公平分发

问题原因描述:

在RabbitMQ中,队列向消费者发送消息,如果没有设置Qos的值,那么队列中有多少消息就发送多少消息给消费者,完全不管消费者是否能够消费完,这样可能就会形成大量未ack的消息在缓存区堆积,因为这些消息未收到消费者发送的ack,所以只能暂时存储在缓存区中,等待ack,然后删除对应消息。这样的话,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量,一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。消息应答和 QoS 预取值对用户吞吐量有重大影响。通常,增加预取将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的RAM 消耗(随机存取存储器)应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的

代码:

//0是公平分发,1是不公平;大于1是预期值,消息堆积在信道的最大数
channel.basicQos(1);

死信队列

产生死信的来源大致有如下几种:

  • 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
  • 消息TTL过期
  • 队列达到最大长度(队列满了,无法再添加数据到mq中)

应用场景:用户下单30分钟没有支付系统取消订单,会议前15分钟发邮件提醒相关人员等等。

延时队列

利用可以利用本身自带的死信队列机制实现,缺陷是,消息在队列里,有先进先出的限制,若前面的消息过期时间没有到,那么这个消息也不会放到死信队列中

mq插件弥补延时队列的缺陷

安装mq的插件,让消息积压在交换机那边,不会有先进先出的限制,等ttl时间到了直接放到死信队列里

安装插件的命令:rabbitmq-plugins enable
rabbitmq_delayed_message_exchange

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表