注意:
消息的确认机制跟消息的处理机制要先分清楚,消息的确认机制是生产者发投递消息后,broker会回调生产者的ComfirmListener,来告知生产者发送的消息有没有成功到达borker。
消息的处理机制是生产者投递的消息已经成功到达broker,但是会出现一下两种情况:
情况一: broker中根本没有对应的exchange交换机来接受该消息
情况二:broker上有对应的exchange,但是找不到对应的routingkey,路由不到指定的队列中
针对以上的这两种情况而产生了消息处理机制来解决上面出现的问题(消息不可达问题)
处理方法:
第一步:生产端定义一个ReturnListener来处理不可达的消息
第二步:生产端的mandatory设置为true,那么如果产生不可达的消息,broker就会调用生产端的ReturnListener进行处理,如果mandatory设置为false,broker就会自动删除消息
Return Listener消息处理机制流程图:
代码演示
生产者:
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(1234);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//设置return listernr
channel.addReturnListener(new AngleReturnListener(){
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws Exception {
System.out.println("记录不可达消息........................");
System.out.println("replaycode="+replyCode);
System.out.println("replyText="+replyText);
System.out.println("exchange="+exchange);
System.out.println("routingKey="+routingKey);
System.out.println("properties="+properties);
System.out.println("body="+new String(body));
});
//可达消息
channel.basicPublish("order.return.exchange","order.return.key",false,null,"return listener test".getBytes());
//不可达消息,调用return listener
/**
@param exchange the exchange to publish the message to
* @param routingKey the routing key
* @param mandatory true if the 'mandatory' flag is to be set
* @param props other properties for the message - routing headers etc
* @param body the message body
*/
channel.basicPublish("order.return.exchange","order.return.key1",true,null,"return listener test2".getBytes());
//不可达消息 mq-broker自动删除模
channel.basicPublish("order.return.exchange","order.return.key2",false,null,"return listener test3".getBytes());}
消费者:
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(1234);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 定义交换机
channel.exchangeDeclare("order.return.exchange","direct",true,true,false,null);
// 定义队列
channel.queueDeclare("order.return.queue",true,false,true,null);
// 定义绑定关系
channel.queueBind("order.return.queue","order.return.exchange","order.return.key");
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
// 监听队列进行消费
channel.basicConsume("order.return.queue",true,queueingConsumer);
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println(new String(delivery.getBody()));
}
}
上一篇:消息确认机制ComfirmListener
本文暂时没有评论,来添加一个吧(●'◡'●)