计算机系统应用教程网站

网站首页 > 技术文章 正文

RabbitMQ基础(消费者的消息确认模式)

btikc 2024-09-01 15:52:05 技术文章 12 ℃ 0 评论

9.1 YAML配置

spring:
  rabbitmq:
    host: 192.168.133.128
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual  # 开启手动ACK消费者端
        prefetch: 2
server:
  port: 8081

9.2 代码实现

9.2.1 队列配置

该案例主要是使用Topic模式进行介绍。下面这段代码主要是定义了交换机的名称、队列名称、路由Key的表达式。将队列和交换机进行了一个绑定通过routing key

/**
 * rabbitmq 配置类
 */
@Configuration
public class RabbitMQConfig {
    public static final String EXCHANGE_NAME = "TOPIC_BOOT_EXCHANGE";
    public static final String QUEUE_NAME = "TOPIC_BOOT_QUEUE";
    public static final String ROUTEING_KEY_NAME = "TOPIC_BOOT.*";

    // 1、交换机
    @Bean(name = "bootExChange")
    public Exchange bootExChange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    // 2、队列
    @Bean(name = "bootQueue")
    public Queue bootQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    // 3、队列和交换机绑定
    /**
     * 需要知道要哪个交换机和哪个队列
     * 需要知道这个交换机和队列是和谁一起绑定
     * 设置routeingKey
     */
    @Bean
    public Binding bootBindingExchangeAndQueue(@Qualifier("bootExChange") Exchange exchange, @Qualifier("bootQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTEING_KEY_NAME).noargs();
    }
    }

9.2.2 生产者

import com.dream.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class SpringRabbitProducerTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 测试topic发送
     */
    @Test
    public void sender() {
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "TOPIC_BOOT.DREAM", "com.dream topic spring rabbitmq ....");
    }
}


9.2.2 消费者

默认情况下消费者是进行自动ACK的,但是这种情况下辖如果业务消息处理发生异常那么就会出现业务Bug导致出现问题,那么我们可以通过手动ACK的方式来确认消费者收到消息,如果发生异常进行手动处理。这里模拟了一个异常,在catch中手动处理了这个异常消息, 调用了channel.basicNack();或者调用channel.basicReject();进行了手动确认ACK,这里我们可以手动的处理异常也可以将消息重新发回给队列对消息进行重新的消费。

/**
 * rabbitmq 消息消费监听
 */
@Component
public class RabbitConsumerListener {
    
    /**
     * 消费者的手动ACK和自动ACK,默认自动的签收
     * 如果需要手动签收就需要进行设置 acknowledge-mode: manual
     * 如果消息成功接受就需要调用 channel.basicAck(); 进行接收
     * 如果消息接受失败就需要调用 channel.basicNack(); 拒绝接收消息,broker重新发送
     */
    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    @SneakyThrows
    public void listenerTopicQueueManualACK(Message message, Channel channel) {
        // 获取消息标记
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            // 模拟发生异常将消息打回到消息队列
            int a = 1 / 0;
            System.out.println("listenerTopicQueue ===> " + message);
            System.out.println("listenerTopicQueue ===> " + new String(message.getBody()));

            /**
             * 参数介绍:
             * long deliveryTag  消息标记
             * boolean multiple  是否允许一次处理多条消息
             */
            channel.basicAck(tag, false);
        } catch (Exception ex) {
            ex.printStackTrace();
            /**
             * 参数介绍:
             * long deliveryTag  消息标记
             * boolean multiple  是否允许一次处理多条消息
             * boolean requeue   表示是否消息重新回到消息队列,如果是true,broker会重新发送消息给消费端
             */
            channel.basicNack(tag, false, true);

            // channel.basicReject(tag,true);
            // 上面这个basicReject方法跟basicNack差不多,但是basicReject不允许多个一起处理
        }
    }
}

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表