计算机系统应用教程网站

网站首页 > 技术文章 正文

每日学习~RabbitMQ消息应答机制 rabbit mq五种消息模型

btikc 2024-10-17 08:46:38 技术文章 7 ℃ 0 评论

概念

(1)RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到;(RabbitMQ默认是自动应答)

(2)消息应答是为了保证消息发送过程中不丢失,消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了

自动应答的问题

自动应答应用场景:仅适用在消费者可以高效并能够快速处理这些消息的情况下使用;

存在的问题:存在消息丢失的情况;

  • 1、消息在接收到之前,消费者那边出现连接或者channel关闭,那么消息就丢失了;
  • 2、生产者那边传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死;

消息应答方法

  • Channel.basicAck(用于肯定确认):RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了;
  • Channel.basicNack(用于否定确认)
  • Channel.basicReject(用于否定确认):与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了

basicAck方法中Multiple的解释

multiple是一个boolean值

  • ture:表示批量应答 channel 上未应答的消息
  • false:只会应答对应tag的消息(这个tag就是一个消息标记,basicAck方法中的第一个入参)

消息自动重新入队

1.消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ会将消息进行重新排队;

2.如果有其他消费者可以处理,则RabbitMQ会进行消息分发,即使某个消费者偶尔死亡,也可以确保消息不丢失;

消息手动应答代码

默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答,代码如图所示:

(1)消息生产者

import com.lwt.study.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.util.Scanner;

public class Task02 {
    private static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] argv) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
            Scanner sc = new Scanner(System.in);
            System.out.println("请输入信息");
            while (sc.hasNext()) {
                String message = sc.nextLine();
                channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
                System.out.println("生产者发出消息" + message);
            }
        }
    }
}

(2)消费者1

public class Work03 {
    private static final String ACK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1 等待接收消息处理时间较短");
        //消息消费的时候如何处理消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            SleepUtils.sleep(1);
            System.out.println("接收到消息:" + message);
            /**
             * 1.消息标记 tag
             * 2.是否批量应答未应答消息
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        //采用手动应答
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        });
    }
}

(3)消费者2

public class Work04 {
    private static final String ACK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C2 等待接收消息处理时间较长");
        //消息消费的时候如何处理消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            SleepUtils.sleep(30);
            System.out.println("接收到消息:" + message);
            /**
             * 1.消息标记 tag
             * 2.是否批量应答未应答消息
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        //采用手动应答
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        });
    }
}

注:消费者2和消费者1唯一的区别就是消费者2消费时间比较长;

(4)运行结果

1.生产者发送gg消息,此时由work04工作线程处理,但是处理耗时比较长,此时停掉work04消息,在控制台可发现有unacked未应答数据;

2.发现gg消息被Work03消费掉了,说明未进行ack应答的消息重新进入队列,并分配给可以其他消费者处理

今天的学习就到这里啦,rabbitmq相关的学习都会放到下面的代码里~

代码地址:StudyPromote: 学习项目

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表