计算机系统应用教程网站

网站首页 > 技术文章 正文

RabbitMQ 持久化和权重分配消息 rabbitmq的持久化和确认机制

btikc 2024-10-02 12:21:05 技术文章 13 ℃ 0 评论

1、RabbitMQ 持久化简介

刚刚我们已经看到了如何处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化。

2、队列如何实现持久化

之前我们创建的队列都是非持久化的,rabbitmq 如果重启的话,该队列就会被删除掉,如果
要队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化

但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新
创建一个持久化的队列,不然就会出现错误

以下为控制台中 非持久化持久化 队列的 UI 显示区:

这个时候即使重启 rabbitmq 队列也依然存在

3、消息实现持久化

要想让消息实现非持久化需要在消息生产者修改代码

message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);SpringBoot默认是持久化,如果想设置非持久化,可以修改参数。

将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。

4、默认权重分配

在最开始的时候我们用到 RabbitMQ 分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是RabbitMQ 并不知道这种情况它依然很公平的进行分发。

为了避免这种情况,我们可以在消费者中设置参数


意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变其他存储任务的策略。

5、自定义权重分配

根据不同服务器的大小来自定义分配权重来减少服务器之间的压力

5.1、修改权重相关代码

package com.littyxin.message;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author :littyxin
 * @date :Created in 2023/12/30 16:24
 * @description:
 */
@Component
public class RabbitConfig {

    @Autowired
    CachingConnectionFactory cachingConnectionFactory;


    @Bean(name="limitContainerFactory")
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(){
        SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cachingConnectionFactory);
        factory.setPrefetchCount(3);  // 每次最多拿3个,等这三个处理完之后,再去队列中拿第二批,起到了限流的作用
        return factory;
    }

    @Autowired
    private WorkConsumer listener1;

    @Bean("limitContainerFactory2")
    public SimpleMessageListenerContainer container1() throws IOException, TimeoutException {
        SimpleMessageListenerContainer factory = new SimpleMessageListenerContainer();
        factory.setQueueNames("work"); // The queue name for this consumer.
        factory.setConnectionFactory(cachingConnectionFactory);
        factory.setMessageListener(new MessageListenerAdapter(listener1, "receive1")); // Use a MessageListenerAdapter to delegate the message handling to a method in MyMessageListener1.
        factory.setPrefetchCount(2); // Set the Prefetch Count for this consumer to 50.
        return factory;
    }

    @Bean("limitContainerFactory3")
    public SimpleMessageListenerContainer rabbitListenerContainerFactory2() {
        SimpleMessageListenerContainer factory = new SimpleMessageListenerContainer();
        factory.setQueueNames("work"); // The queue name for this consumer.
        factory.setConnectionFactory(cachingConnectionFactory);
        factory.setMessageListener(new MessageListenerAdapter(listener1, "receive2")); // Use a MessageListenerAdapter to delegate the message handling to a method in MyMessageListener1.
        factory.setPrefetchCount(1); // 设置第二个消费者的Prefetch Count为100
        return factory;
    }

}

5.2、运行

结果就可以看出管用了

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表