计算机系统应用教程网站

网站首页 > 技术文章 正文

SpringBoot RabbitMQ消息可靠发送与接收

btikc 2024-09-01 15:52:04 技术文章 12 ℃ 0 评论

环境: springboot2.2.11.RELEASE + RabbitMQ3.7.4

RabbitMQ在以下情况会出现消息的丢失:

  • 交换机、队列、消息未持久化,mq重启后会出现消息丢失。
  • 生产者发出的消息第一步是投递到交换机,这一步可能因为网络原因导致失败。
  • 消息正常投递到交换机后,通过路由key路由到队列的时候出现失败。(没有符合的队列)
  • 代码层面,配置层面,考虑不全导致消息丢失。
  • 消费端接收到相关消息之后,消费端还没来得及处理消息,消费端机器就宕机了,此时消息如果处理不当会有丢失风险。

RabbitMQ中有两种方案来实现消息的可靠发送,分别如下:

RabbitMQ的事物机制

RabbitMQ的生产者确认机制

准备环境:分别建立交换机和队列

1、交换机test-exchange 持久化,类型为topic

2、队列test-queue test-queue 持久化。

3、交换机与队列绑定, Routing key = tm.#

方案1:事物机制

pom.xml依赖

<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

application.yml配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        concurrency: 5
        maxConcurrency: 10
        prefetch: 5
        retry:
          enabled: true
          initialInterval: 3000
          maxAttempts: 3
        defaultRequeueRejected: fals

这里配置很简单主要就是消息监听并发数配置,重试相关的配置。

RabbitConfig.java 配置RabbitMQ

@Configuration
public class RabbitConfig {
	
	private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class) ;
	
	@Bean
	public RabbitTemplate customRabbitTemplate(ConnectionFactory connectionFactory) {
		RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory) ;
		rabbitTemplate.setChannelTransacted(true) ;
		return rabbitTemplate ;
	}
	
	@Bean("rabbitTransactionManager")
    public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
	
}

这里必须配置rabbit事物Bean,因为我们的发送消息的方法添加注解事物时需要设置事务管理器。

rabbitTemplate.setChannelTransacted(true) ;这行代码必须设置为true开启事物功能。

MessageSend.java 消息发送

@Component
public class MessageSend {
	
	private static Logger logger = LoggerFactory.getLogger(MessageSend.class) ;
	
	@Resource
	private RabbitTemplate rabbitTemplate ;
	
	@Transactional(rollbackFor = {Exception.class}, transactionManager = "rabbitTransactionManager")
	public void send(String msg) {
		logger.info("准备发送消息:{}", msg);
		rabbitTemplate.convertAndSend("test-exchange", "tm.1", msg) ;
		if ("1".equals(msg)) {
			throw new RuntimeException("消息内容不真实") ;
		}
	}
	
}

注意这里要给发送消息的方法添加@Transactional,同时还要设置事物管理器的Bean Name。

MessageController.java 接口发送。

@RestController
@RequestMapping("/messages")
public class MessageController {
	
	@Resource
	private MessageSend ms ;
	
	@GetMapping("/send")
	public Object send(String msg) {
		ms.send(msg) ;
		return "success" ;
	}
	
}

测试:

先发送一条正常的消息

http://localhost:8080/messages/send?msg=123

控制台输出:



rabbitmq接受到了我们的消息。

接着发送消息内容为1的消息,当消息内容为1时,发送程序会抛出异常,看看消息是否能够回滚。

http://localhost:8080/messages/send?msg=1



rabbit并没有收到消息,说明消息回滚了。

接下来把@Transactional注解去掉后看看


rabbit收到了消息,这也说明前面配置的事物是生效的。

到这里基于事物机制的消息可靠传输就完了,接下来看看基于消息确认机制。


方案2:消息确认机制

application.yml

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: test
    publisherConfirmType: correlated
    publisherReturns: true
    listener:
      simple:
        concurrency: 5
        maxConcurrency: 10
        prefetch: 5
        acknowledgeMode: MANUAL
        retry:
          enabled: true
          initialInterval: 3000
          maxAttempts: 3
        defaultRequeueRejected: false 

publisherConfirmType取值说明:

  • NONE值是禁用发布确认模式,是默认值
  • CORRELATED值是发布消息成功到交换器后会触发回调方法,如1示例
  • SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;

注意:在springboot2.2.0.RELEASE版本之前amqp正式支持的属性是(spring.rabbitmq.publisher-confirm),用来配置消息发送到交换器之后是否触发回调方法,在2.2.0及之后该属性过期使用spring.rabbitmq.publisher-confirm-type属性配置代替,用来配置更多的确认类型;

publisherReturns: 可以在消息没有被路由到指定的queue时将消息返回,而不是丢弃。简单说就是消息不能正确送达到队列将回调。

RabbitConfig.java 配置

@Configuration
public class RabbitConfig {
	
	private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class) ;
	
	@Bean
	public RabbitTemplate customRabbitTemplate(ConnectionFactory connectionFactory) {
		RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory) ;
		// rabbitTemplate.setChannelTransacted(true) ;
		rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
			@Override
			public void confirm(CorrelationData correlationData, boolean ack, String cause) {
				if (ack) {
					logger.info("消息发送成功, 关联数据的ID:{}", correlationData.getId()) ;
		        } else {
		        	logger.error("消息发送失败, 失败原因:{}", cause) ;
		        }
			}
		}) ;
		rabbitTemplate.setMandatory(true) ;
		rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
			@Override
			public void returnedMessage(Message message, int replyCode, String replyText, 
					String exchange, String routingKey) {
				logger.error("发送出现错误,请检查exchange:{}, routingKey: {}是否配置正确, 消息内容:{}", 
						exchange, 
						routingKey,
						new String(message.getBody(), Charset.forName("UTF-8"))) ;
			}
		}) ;
		return rabbitTemplate ;
	}
	
	@Bean("rabbitTransactionManager")
    public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
	
} 

rabbitTemplate.setChannelTransacted(true) ; 这个代码一定要注释了或者不要设置或者设置为false;rabbitmq中的事物机制和消息确认机制是互斥的。。。

setConfirmCallback方法:只要消息到达了交换机ack会true(这时候它是不管该交换机是否与队列有绑定),当消息无法到达交换机(如:交换机不存在)ack为false。

setReturnCallback方法:用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调(也就是说到达了交换机,但是没有队列与该交换机绑定)

MessageSend.java 发送程序

@Component
public class MessageSend {
	
	private static Logger logger = LoggerFactory.getLogger(MessageSend.class) ;
	
	@Resource
	private RabbitTemplate rabbitTemplate ;
	
	public void sendConfirm(String msg) {
		logger.info("准备发送消息:{}", msg);
		rabbitTemplate.convertAndSend("test-exchange1", "tm.1", msg) ;
	}
	
}

注意:这里我故意把交换机名字写错加了一个1,就是为了测试用。

MessageController.java 发送程序

@RestController
@RequestMapping("/messages")
public class MessageController {
	
	@Resource
	private MessageSend ms ;
	
	@GetMapping("/send")
	public Object send(String msg) {
		ms.send(msg) ;
		return "success" ;
	}
	
	@GetMapping("/send2")
	public Object send2(String msg) {
		ms.sendConfirm(msg) ;
		return "success" ;
	}
	
}

测试:

上面我把交换机名字已经故意写错了,test-exchange1,实际是:test-exchange。

访问:/messages/send2?msg=1


控制台输出了错误信息,这些信息就是在setConfirmCallback方法中写的,这证明了当消息无法到达交换机时就被触发。

接下来把test-exchange1改正确:把key改错误

public void sendConfirm(String msg) {
		logger.info("准备发送消息:{}", msg);
		rabbitTemplate.convertAndSend("test-exchange", "tm1.1", msg) ;
	}

测试:


当交换机正确的时候setConfirmCallback方法的执行是正确的ack为true。但是同时setReturnCallback方法也被调用了,因为我们把key写错了,消息没法路由到正确的queue上,所以回调了。

到此 两种可靠消息发送就结束了,接下来简单说下接受消息。

首先是配置中我们要把消息的确认设置为手动:

acknowledgeMode: MANUAL

接受消息的方法:

@RabbitListener(queues= {"test-queue"})
	@RabbitHandler
	public void listner(Message message, Channel channel) {
		System.out.println("接受到消息.....income") ;
		byte[] body = message.getBody() ;
		MessageProperties mps = message.getMessageProperties() ;
			String content = new String(body, Charset.forName("UTF-8")) ;
			try {
				System.out.println("接受到消息来自交换机: 【" + mps.getReceivedExchange() + "】, 队列:【" + mps.getConsumerQueue() + "】:\n\t\t内容: " + content) ;
				channel.basicAck(message.getMessageProperties().getDeliveryTag(), true) ;
			} catch (Exception e) {
				e.printStackTrace();
        channel.basicReject(mps.getDeliveryTag(), false) ;
			}
	}

这里通过 basicAck应答消息,basicReject拒绝消息。通过手动确认的机制来确保消息的正常消费。


完毕!!!

大家帮忙转发,给个关注呗,谢谢

SpringBoot开发自己的@Enable功能

Java线上CPU100% 问题排查

Restful API设计规范

SpringBoot开发自己的Starter

SpringCloud Hystrix实现资源隔离应用

SpringMVC参数统一验证方法

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表