计算机系统应用教程网站

网站首页 > 技术文章 正文

RabbitMQ-消息确认机制(事务+confirm)

btikc 2024-09-01 15:51:39 技术文章 12 ℃ 0 评论

1.RabbitMQ-消息确认机制(事务+confirm)

在rabbitMQ中我们可通过持久化数据,解决rabbitMQ因为服务器异常造成的数据丢失

问题:生产者将消息发送出去之后,消息是否到底rabbitMQ服务器?

默认情况下不可得知的

有两种方式可以获得状态

AMQP协议:AMQP实现了事务机制

Confirm模式

2.事务机制

TxSelect TxCommit TxRollBack

TxSelect : 用于将当前channel设置成transation模式

TxCommit : 用于提交事务

TxRollBack : 用于回滚事务

生产者

package com.ithzk.rabbitmq.transation;

import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import javax.sound.midi.Soundbank;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**

* @author hzk

* @date 2018/3/10

*/

public class TxSend {

private final static String QUEUE_NAME = "test_queue_tx";

public static void main(String[] args) throws IOException, TimeoutException {

Connection connection = RabbitMQConnectionUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME,false,false,false,null);

String msg = "hello tx";

try {

channel.txSelect();

channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());

int ex = 1/0;

channel.txCommit();

} catch (Exception e) {

e.printStackTrace();

channel.txRollback();

System.out.println("Send msg rollback");

}

System.out.println("Send msg"+msg);

channel.close();

connection.close();

}

}

消费者

package com.ithzk.rabbitmq.transation;

import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;

import com.rabbitmq.client.*;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**

* @author hzk

* @date 2018/3/10

*/

public class TxRecv1 {

private final static String QUEUE_NAME = "test_queue_tx";

public static void main(String[] args) throws IOException, TimeoutException {

//获取连接

Connection connection = RabbitMQConnectionUtils.getConnection();

//从连接中获取频道

final Channel channel = connection.createChannel();

//声明队列

channel.queueDeclare(QUEUE_NAME,false,false,false,null);

//保证一次只发一个

channel.basicQos(1);

boolean autoAck = true;

channel.basicConsume(QUEUE_NAME,autoAck,new DefaultConsumer(channel){

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String msg = new String(body, "utf-8");

System.out.println("[1] Recv tx msg:" + msg);

}

});

System.out.println("[Consumer 1 start]");

}

}

3.Confirm模式

3.1 生产者端Confirm模式的实现原理

生产者将信道设置成Confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,标识到这个序列号之前的所有消息都已经得到了处理

Confirm模式最大的好处是在于是异步处理

Nack(Negative ACKnowledgment,否定回答)

开启confrim模式

channel.confirmSelect()

编程模式:

1.普通 发送一条调用waitForConfirms()

2.批量 发送一批调用waitForConfirms()

3.异步confirm模式:提供一个回调

普通单条发送

生产者

package com.ithzk.rabbitmq.confirm;

import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**

* 普通模式

* @author hzk

* @date 2018/3/10

*/

public class ConfrimSend {

private final static String QUEUE_NAME = "test_queue_confrim_one";

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

Connection connection = RabbitMQConnectionUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME,false,false,false,null);

//生产者调用confirmSelect() 将channel设为confirm模式

channel.confirmSelect();

String msg = "hello confirm one";

channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());

if (!channel.waitForConfirms()){

System.out.println("message send fail!");

}else{

System.out.println("Send msg"+msg);

}

channel.close();

connection.close();

}

}

消费者

package com.ithzk.rabbitmq.confirm;

import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;

import com.rabbitmq.client.*;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**

* @author hzk

* @date 2018/3/10

*/

public class ConfirmRecv1 {

private final static String QUEUE_NAME = "test_queue_confrim_one";

public static void main(String[] args) throws IOException, TimeoutException {

//获取连接

Connection connection = RabbitMQConnectionUtils.getConnection();

//从连接中获取频道

final Channel channel = connection.createChannel();

//声明队列

channel.queueDeclare(QUEUE_NAME,false,false,false,null);

boolean autoAck = true;

channel.basicConsume(QUEUE_NAME,autoAck,new DefaultConsumer(channel){

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String msg = new String(body, "utf-8");

System.out.println("[1] Recv tx msg:" + msg);

}

});

System.out.println("[Consumer 1 start]");

}

}

多条批量

发完之后再确认

生产者

package com.ithzk.rabbitmq.confirm;

import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**

* 普通模式

* @author hzk

* @date 2018/3/10

*/

public class ConfrimSend2 {

private final static String QUEUE_NAME = "test_queue_confrim_one";

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

Connection connection = RabbitMQConnectionUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME,false,false,false,null);

//生产者调用confirmSelect() 将channel设为confirm模式

channel.confirmSelect();

String msg = "hello confirm batch";

//批量发送

for (int i = 0 ;i < 50 ; i++){

channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());

}

//确认

if (!channel.waitForConfirms()){

System.out.println("message send fail!");

}else{

System.out.println("Send msg"+msg);

}

channel.close();

connection.close();

}

}

消费者

package com.ithzk.rabbitmq.confirm;

import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;

import com.rabbitmq.client.*;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**

* @author hzk

* @date 2018/3/10

*/

public class ConfirmRecv1 {

private final static String QUEUE_NAME = "test_queue_confrim_one";

public static void main(String[] args) throws IOException, TimeoutException {

//获取连接

Connection connection = RabbitMQConnectionUtils.getConnection();

//从连接中获取频道

final Channel channel = connection.createChannel();

//声明队列

channel.queueDeclare(QUEUE_NAME,false,false,false,null);

boolean autoAck = true;

channel.basicConsume(QUEUE_NAME,autoAck,new DefaultConsumer(channel){

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String msg = new String(body, "utf-8");

System.out.println("[1] Recv tx msg:" + msg);

}

});

System.out.println("[Consumer 1 start]");

}

}

异步Confrim模式

Channel对象提供的ConfrimListener()回调方法只包含deliveryTag(当前Channel发送的消息序号),我们需要子集为每一个Channel维护一个unconfirm的消息序列号集合,每publish一条数据,集合中的元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构

生产者

package com.ithzk.rabbitmq.confirm;

import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.ConfirmListener;

import com.rabbitmq.client.Connection;

import java.io.IOException;

import java.util.Collections;

import java.util.SortedMap;

import java.util.SortedSet;

import java.util.TreeSet;

import java.util.concurrent.TimeoutException;

/**

* 普通模式

* @author hzk

* @date 2018/3/10

*/

public class ConfrimSend3 {

private final static String QUEUE_NAME = "test_queue_confrim_one";

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

Connection connection = RabbitMQConnectionUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME,false,false,false,null);

//生产者调用confirmSelect() 将channel设为confirm模式

channel.confirmSelect();

//存放未确认的消息

final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

//通道添加监听

channel.addConfirmListener(new ConfirmListener() {

//没有问题的handleAck

@Override

public void handleAck(long deliveryTag, boolean multiple) throws IOException {

if(multiple){

System.out.println("handleAck multiple");

confirmSet.headSet(deliveryTag+1).clear();

}else{

System.out.println("handleAck multiple false");

confirmSet.remove(deliveryTag);

}

}

//handleNack 3s 10s xxx.. 重试

@Override

public void handleNack(long deliveryTag, boolean multiple) throws IOException {

if(multiple){

System.out.println("handleNack multiple");

confirmSet.headSet(deliveryTag+1).clear();

}else{

System.out.println("handleNack multiple false");

confirmSet.remove(deliveryTag);

}

}

});

String msg = "hello confirm async";

while (true){

long seqNo = channel.getNextPublishSeqNo();

channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());

confirmSet.add(seqNo);

}

}

}

消费者

package com.ithzk.rabbitmq.confirm;

import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;

import com.rabbitmq.client.*;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**

* @author hzk

* @date 2018/3/10

*/

public class ConfirmRecv1 {

private final static String QUEUE_NAME = "test_queue_confrim_one";

public static void main(String[] args) throws IOException, TimeoutException {

//获取连接

Connection connection = RabbitMQConnectionUtils.getConnection();

//从连接中获取频道

final Channel channel = connection.createChannel();

//声明队列

channel.queueDeclare(QUEUE_NAME,false,false,false,null);

boolean autoAck = true;

channel.basicConsume(QUEUE_NAME,autoAck,new DefaultConsumer(channel){

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String msg = new String(body, "utf-8");

System.out.println("[1] Recv tx msg:" + msg);

}

});

System.out.println("[Consumer 1 start]");

}

}

ps:tx模式效率比较差 建议使用后两种

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表