计算机系统应用教程网站

网站首页 > 技术文章 正文

Springboot+RabbitMQ+ACK机制((全局、局部)、消费方确认)

btikc 2024-09-04 03:12:08 技术文章 11 ℃ 0 评论

消息确认机制

为了保证消息从队列可靠的达到消费者,RabbitMQ 提供了 消息确认机制 (Message Acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之后在删除)。当 autoAck 参数等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

采用消息确认机制,只要将自动ack设置为false,消费者有足够的时间处理消息数据,消费者可以在处理完后续的业务逻辑后再进行提交ack,确保消息确实是被消费了,防止服务宕机可能导致的消息丢失。而MQ会一直等待消费者手动提交ack!!

在rabbitmq管理页面上可以详细看到队列中的消息情况:

  • ready: 队列中存在的消息,可以提供给消费者的消息数量
  • Unacked: 表示是发送给消费者了,但是消费者还未将ack反馈给MQ的消息数量(一般只有设置了手动ack时,当消费者获取到消息时才会有值)

自动ACK: 消费者 配置中如果是自动ack机制, MQ将消息发送给消费者后直接就将消息给删除了 ,这个的前提条件是消费者程序没有出现异常,如果消费者接收消息后处理时出现异常,那么MQ将会尝试重发消息给消费者直至达到了 消费者服务 中配置的最大重试次数后将会直接抛出异常不再重试。

手动ACK: 消费者 设置了手动ACK机制后,可以显式的提交/拒绝消息(这一步骤叫做发送ACK),如果消息被消费后正常被提交了ack,那么此消息可以说是流程走完了,然后MQ将此消息从队列中删除。而如果消息被消费后被拒绝了,消费者可选择让MQ重发此消息或者让MQ直接移除此消息。后面可以使用死信队列来进行接收这些被消费者拒绝的消息,再进行后续的业务处理。

RabbitMQ 消息确认机制分为两大类:发送方确认、接收方确认。

其中发送方确认又分为:生产者到交换机到确认、交换机到队列的确认。(借用下大佬的图)

发送方确认

  • ConfirmCallback()方法,是一个回调方法,生产者将消息发送给Broker(RabbitMQ服务),然后Broker给回调生产者的ConfirmCallback()方法告知生产者消息是否接收到。 也就是确认消息是否正常到达 Exchange 中。
  • # 我们需要在生产者中添加配置,表示开启发布者确认(注意新旧版本) spring.rabbitmq.publisher-confirm-type=correlated # 新版本 spring.rabbitmq.publisher-confirms=true # 老版本
  • ReturnCallback()方法同样是一个回调方法,是交换机和队列之间的消息确认方式。启动消息失败返回,此方法是在交换器路由不到队列时触发回调,这个可以不使用,因为交换器和队列是在代码里绑定的,如果消息成功投递到 Broker 后几乎不存在绑定队列失败,除非你代码写错了
  • # 在生产者中配置,表示发布者返回 spring.rabbitmq.publisher-returns=true

使用

application.yml

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    # 消息确认(ACK)
    publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
    publisher-returns: true #确认消息已发送到队列(Queue)

理解:springboot中需要给RabbitTemplate设置一些方法的回调即可。

通常情况下我们可以直接在配置类中设置好这些东西,但是可能由于某些业务需求,并不是所有的消息都使用常用的方式,也可以将我们的消息发送服务实现接口然后重写这些回调。

配置类方式(全局方式):

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
 
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);
    //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
    rabbitTemplate.setMandatory(true);

    //确认消息送到交换机(Exchange)回调
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
 
        System.out.println("\n确认消息送到交换机(Exchange)结果:");
        System.out.println("相关数据:" + correlationData);
        System.out.println("是否成功:" + ack);
        System.out.println("错误原因:" + cause);
    });

    //确认消息送到队列(Queue)回调
    rabbitTemplate.setReturnsCallback(returnedMessage -> {
 
        System.out.println("\n确认消息送到队列(Queue)结果:");
        System.out.println("发生消息:" + returnedMessage.getMessage());
        System.out.println("回应码:" + returnedMessage.getReplyCode());
        System.out.println("回应信息:" + returnedMessage.getReplyText());
        System.out.println("交换机:" + returnedMessage.getExchange());
        System.out.println("路由键:" + returnedMessage.getRoutingKey());
    });
    return rabbitTemplate;
}

以接口的形式访问发送一下。注意:确认消息送到队列(Queue)回调,只有在出现错误时才回调。

发送服务类实现(局部方式)

将发送的服务类实现接口,实现回调

@Service
public class SendMessageService implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
 
    private static Logger logger = LoggerFactory.getLogger(SendMessageService.class);

    @Autowired
    public RabbitTemplate rabbitTemplate;

    public void sendMessage(String str){
 
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(this);
        rabbitTemplate.setConfirmCallback(this);
        // CorrelationData构造函数中的id可以随便写,但是必须要非null而且是唯一的
        rabbitTemplate.convertAndSend("exchange","routingKey", str,new CorrelationData(UUID.randomUUID().toString()));
    }

    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
 
        System.out.println("sender return success" + returnedMessage.toString());
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
 
        logger.info("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");

        if (!b) {
 
            logger.error("消息发送异常!");
            // 进行处理
        } else {
 
            logger.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), b, s);
        }
    }
}

接口方式访问下。没问题

需要注意的是:配置类方式和局部方式只能选择其一,如果一个RabbitTemplate设置了两个或者多个ConfirmCallback/ReturnCallback,会报错的不支持。类似这样的报错:Only one ConfirmCallback/ReturnCallback is supported by each RabbitTemplate。在开发过程中需要注意!!

接收方确认

消费者确认发生在监听队列的消费者处理业务失败,如:发生了异常,不符合要求的数据等,这些场景我们就需要手动处理,比如重新发送或者丢弃。

RabbitMQ 消息确认机制(ACK)默认是自动确认的,自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,假如你用回滚了也只是保证了数据的一致性,但是消息还是丢了,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。

消息确认模式有:

  1. AcknowledgeMode.NONE:自动确认。
  2. AcknowledgeMode.AUTO:根据情况确认。
  3. AcknowledgeMode.MANUAL:手动确认。

消费者收到消息后,手动调用 Basic.Ack 或 Basic.Nack 或 Basic.Reject 后,RabbitMQ 收到这些消息后,才认为本次投递完成。

  • Basic.Ack 命令:用于确认当前消息。
  • Basic.Nack 命令:用于否定当前消息(批量拒绝) 。
  • Basic.Reject 命令:用于拒绝当前消息(单量拒绝)。

配置,注意是simple模式的ack还是direct模式,或者两个都设置上

server:
  port: 9000

spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual
      direct:
        acknowledge-mode: manual
    # 集群配置,集群配置时使用 rabbitmq.addresses即可,不用配置rabbitmq.port rabbitmq.host了
    addresses: 192.168.0.101:5672,192.168.0.101:5673,192.168.0.101:5673

消费者接收数据

其中在调用basiAck或basicNack时必须要携带一个tag,它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。而在接收者方法上使用 @Header(AmqpHeaders.DELIVERY_TAG) 可以直接获取到这个tag。

@Component
public class MQConsumer {
 
    
    @Autowired
    private DispatcherService dispatcherService;
    
    @RabbitListener(queues = "order.queue")
    public void messageConsumer(String orderMsg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
 
        try {
 
            System.out.println("消息:" + orderMsg);
            JSONObject order = JSONObject.parseObject(orderMsg);
            String orderId = order.getString("orderId");
            // 派单处理
            dispatcherService.dispatch(orderId);

            System.out.println(1 / 0); // 出现异常
            // 手动确认
            channel.basicAck(tag, false);
        } catch (Exception e) {
 
            // 如果出现异常的情况下 根据实际情况重发
            // 重发一次后,丢失
            // 参数1:消息的tag
            // 参数2:多条处理
            // 参数3:重发
                // false 不会重发,会把消息打入到死信队列
                // true 重发,建议不使用try/catch 否则会死循环
            
            // 手动拒绝消息
            channel.basicNack(tag, false, false);
        }
    }
    
}

消息的接收者也可使用普通类实现 ChannelAwareMessageListener 接口,重写方法完成,这种是直接全局性接收的。没有最好的,只有最合适的,根据项目情况选择全局接收还是单个类接收自己监听的。

/**
 * 接收者
 *
 **/
@Component
public class Consumer implements ChannelAwareMessageListener
{
 
    @Override
    public void onMessage(Message message, Channel channel) throws Exception
    {
 
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try
        {
 
            if ("queue_name".equals(message.getMessageProperties().getConsumerQueue()))
            {
 
                System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());
                System.out.println("接收消息: " + new String(message.getBody(), "UTF-8"));
                System.out.println("执行queue_name中的消息的业务处理流程......");
            }
 
            if ("fanout.A".equals(message.getMessageProperties().getConsumerQueue()))
            {
 
                System.out.println("消费的消息来自的队列名为:" + message.getMessageProperties().getConsumerQueue());
                System.out.println("接收消息: " + new String(message.getBody(), "UTF-8"));
                System.out.println("执行fanout.A中的消息的业务处理流程......");
            }
 			// 手动提交ack,并且批量确认消息
            channel.basicAck(deliveryTag, true);
        }
        catch (Exception e)
        {
 
            e.printStackTrace();
 
            /**
             * 拒绝消息,参数说明:
             * long deliveryTag:唯一标识 ID。
             * boolean requeue:如果 requeue 参数设置为 true,
             * 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
             * 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
             * 而不会把它发送给新的消费者。
             */
            channel.basicReject(deliveryTag, true);
        }
    }
}

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表