51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

RabbitMQ消息确认机制

# (一)概述 {#一-概述}

rabbitmq在使用过程中会遇到一个问题:生产者将消息发送出去后,消息有没有达到rabbitmq,默认是不知道的。

有两种解决方式:1.AMQP实现事务机制;2.Confirm模式

# (二)事务机制 {#二-事务机制}

事务机制通过三段代码控制事务的执行:

  1. channel.txSelect(); 将当前channel设置成transaction
  2. channel.txCommit(); 提交事务
  3. channel.txRollback(); 事务回滚

如果生产者因为一些错误没有将事务发送出去,那就会触发事务回滚机制,以达到消息确认的目的。

通过简单队列实现事务机制:

生产者

public class Sent {
    private static final String QUEUE_NAME="tx_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        String msg="hello";
        try{
            //1.开启事务
            channel.txSelect();
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            System.out.println("消息发送成功");
            //2.提交事务
            channel.txCommit();
        }catch (Exception e){
            //3.事务回滚
            channel.txRollback();
            System.out.println("channel rollback");
        }
    }
}

消费者

public class Receive {
    private static final String QUEUE_NAME="tx_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("receive:" + msg);
            }
        };
        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}

但是每个消息都创建一个事务很耗时,并且降低rabbit的吞吐量。

# (三)Confirm模式 {#三-confirm模式}

生产者将channel设置成confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了。

Confirm模式有同步和异步两种

# 3.1 同步 {#_3-1-同步}

channel.confirmSelect(); 开启Confirm模式
单条同步:队列收到消息后会返回 waitForConfirms()
if(channel.waitForConfirms()==true)发送成功 else 发送失败
批量同步:发送批量数据
channel.waitForConfirmsOrDie():有一条发送失败触发IOException

同步是指一条数据发送出去后直到收到回复,下面一条数据才能发送。

当然也可以一批数据一起发送,直到回到回复,后面一批数据才能发送。

通过代码模拟,还是用简单队列.

单条同步生产者:

public class Sent {
    private static final String QUEUE_NAME="confirm_queue";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        String msg="hello";
        //开启confirm
        channel.confirmSelect();
        channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        //判断是否收到waitForConfirms
        if (!channel.waitForConfirms()){
            System.out.println("发送失败");
        }else {
            System.out.println("发送成功");
        }
    }
}

消费者的代码和之前一致就不贴了。

批量同步生产者

public class Sent1 {
    private static final String QUEUE_NAME="confirm_queue1";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.confirmSelect();
        for (int i=0;i<10;i++){
            String msg="i:"+i;
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        }
        //只要有一个消息未确认就会IOException
        channel.waitForConfirmsOrDie();
        System.out.println("全部发送成功");
    }
}

# 3.2 异步 {#_3-2-异步}

异步是指发送数据之后,不用等待返回消息,而是由异步监听。每条消息发送时都会有一个deliveryTag,由异步监听来确认是否送达

channel.confirmSelect(); 开启Confirm模式
channel.addConfirmListener()异步监听发送方确认模式;

通过代码实践:

public class Sent2 {
    private static final String QUEUE_NAME="confirm_queue2";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //开启confirm模式
        channel.confirmSelect();
        for (int i=0;i<10;i++){
            String msg="i:"+i;
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        }
        //异步监听确认和未确认的消息
       channel.addConfirmListener(new ConfirmListener() {
           //确认的消息
           public void handleAck(long deliveryTag, boolean multiple) throws IOException {
               System.out.println("确认消息Tag:"+deliveryTag+"是否批量确认:"+multiple);
           }
           //未确认的消息
           public void handleNack(long deliveryTag, boolean multiple) throws IOException {
               System.out.println("未确认消息Tag:"+deliveryTag);
           }
       });
    }
}

multiple的意思是是否对消息进行了批量确认,上面一段代码的运行结果:

赞(7)
未经允许不得转载:工具盒子 » RabbitMQ消息确认机制