# (一)概述 {#一-概述}
rabbitmq在使用过程中会遇到一个问题:生产者将消息发送出去后,消息有没有达到rabbitmq,默认是不知道的。
有两种解决方式:1.AMQP实现事务机制;2.Confirm模式
# (二)事务机制 {#二-事务机制}
事务机制通过三段代码控制事务的执行:
- channel.txSelect(); 将当前channel设置成transaction
- channel.txCommit(); 提交事务
- channel.txRollback(); 事务回滚
如果生产者因为一些错误没有将事务发送出去,那就会触发事务回滚机制,以达到消息确认的目的。
通过简单队列实现事务机制:
生产者
public class Sent {
private static final String QUEUE_NAME="tx_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String msg="hello";
try{
//1.开启事务
channel.txSelect();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println("消息发送成功");
//2.提交事务
channel.txCommit();
}catch (Exception e){
//3.事务回滚
channel.txRollback();
System.out.println("channel rollback");
}
}
}
消费者
public class Receive {
private static final String QUEUE_NAME="tx_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("receive:" + msg);
}
};
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
但是每个消息都创建一个事务很耗时,并且降低rabbit的吞吐量。
# (三)Confirm模式 {#三-confirm模式}
生产者将channel设置成confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了。
Confirm模式有同步和异步两种
# 3.1 同步 {#_3-1-同步}
channel.confirmSelect(); 开启Confirm模式
单条同步:队列收到消息后会返回 waitForConfirms()
if(channel.waitForConfirms()==true)发送成功 else 发送失败
批量同步:发送批量数据
channel.waitForConfirmsOrDie():有一条发送失败触发IOException
同步是指一条数据发送出去后直到收到回复,下面一条数据才能发送。
当然也可以一批数据一起发送,直到回到回复,后面一批数据才能发送。
通过代码模拟,还是用简单队列.
单条同步生产者:
public class Sent {
private static final String QUEUE_NAME="confirm_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String msg="hello";
//开启confirm
channel.confirmSelect();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
//判断是否收到waitForConfirms
if (!channel.waitForConfirms()){
System.out.println("发送失败");
}else {
System.out.println("发送成功");
}
}
}
消费者的代码和之前一致就不贴了。
批量同步生产者
public class Sent1 {
private static final String QUEUE_NAME="confirm_queue1";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
for (int i=0;i<10;i++){
String msg="i:"+i;
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
}
//只要有一个消息未确认就会IOException
channel.waitForConfirmsOrDie();
System.out.println("全部发送成功");
}
}
# 3.2 异步 {#_3-2-异步}
异步是指发送数据之后,不用等待返回消息,而是由异步监听。每条消息发送时都会有一个deliveryTag,由异步监听来确认是否送达
channel.confirmSelect(); 开启Confirm模式
channel.addConfirmListener()异步监听发送方确认模式;
通过代码实践:
public class Sent2 {
private static final String QUEUE_NAME="confirm_queue2";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//开启confirm模式
channel.confirmSelect();
for (int i=0;i<10;i++){
String msg="i:"+i;
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
}
//异步监听确认和未确认的消息
channel.addConfirmListener(new ConfirmListener() {
//确认的消息
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("确认消息Tag:"+deliveryTag+"是否批量确认:"+multiple);
}
//未确认的消息
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("未确认消息Tag:"+deliveryTag);
}
});
}
}
multiple的意思是是否对消息进行了批量确认,上面一段代码的运行结果: