技术:SpringBoot(2.2.6.RELEASE)+RabbitMQ
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
spring:
rabbitmq:
host: x.x.x.x
port: 5672
username: admin
password: admin
publisherConfirmType: correlated #确认消息与之关联一起
publisherReturns: true
listener:
simple:
concurrency: 5
maxConcurrency: 10
prefetch: 5
acknowledgeMode: MANUAL #手动应答
retry:
enabled: true
initialInterval: 3000
maxAttempts: 3
defaultRequeueRejected: false
RabbitTemplate配置
@Configuration
public class RabbitmqConfig {
private Logger logger = LoggerFactory.getLogger(RabbitmqConfig.class) ;
@Bean
public RabbitTemplate customRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory) ;
// 只要消息到达了交换机ack会true(这时候它是不管该交换机是否与队列有绑定),当消息无法到达交换机(如:交换机不存在)ack为false
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
logger.info("消息发送成功.") ;
} else {
logger.info("消息发送失败, 失败原因:" + cause) ;
}
}
}) ;
// ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调(也就是说到达了交换机,但是没有队列与该交换机绑定)
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText,
String exchange, String routingKey) {
logger.info(new String(message.getBody()) + "\n" + replyCode + "\n" + replyText +
"\n" + exchange + "\n" + routingKey) ;
}
});
rabbitTemplate.setMandatory(true) ;
return rabbitTemplate ;
}
}
本文暂时没有评论,来添加一个吧(●'◡'●)