51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

工作队列详解

# (一)RabbitMQ工作队列模型结构 {#一-rabbitmq工作队列模型结构}

工作队列的模型相比简单队列增加了消费者的数量。

生产者提供消息到消息队列中,消费者可以去获取队列中的消息。在工作队列中默认采用轮询分发的方式将消息分发给消费者。所谓轮询分发,就是指不管消费者处理消息的速度是快是慢,都按照顺序轮流把消息发给消费者。

# (二)工作队列实践(轮询分发) {#二-工作队列实践-轮询分发}

使用工作队列的代码和简单队列基本一致,只不过多了几个消费者

# 2.1 创建工具类 {#_2-1-创建工具类}

创建工具类的代码在前一篇博客中已经讲到了,这里直接贴上代码:

public class ConnectionUtil {
    public static Connection getConnection() throws IOException, TimeoutException {
        //定义一个连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //设置服务地址
        factory.setHost("127.0.0.1");
        //设置AMQP端口
        factory.setPort(5672);
        //设置VHOSTS
        factory.setVirtualHost("/vhosts_sdxb");
        //设置用户名
        factory.setUsername("user_sdxb");
        factory.setPassword("123456");
        return factory.newConnection();
    }
}

# 2.2 创建生产者 {#_2-2-创建生产者}

public class Send {
    private static final String QUEUE_NAME="work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.获取连接
        Connection connection = ConnectionUtil.getConnection();
        //2.创建通道
        Channel channel = connection.createChannel();
        //3.创建队列声明
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        for (int i=0;i<50;i++){
            String msg="i="+i;
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        }
        channel.close();
        connection.close();
    }
}

# 2.3 创建消费者一 {#_2-3-创建消费者一}

为了体现消费者处理消息的快慢,我在两个消费者中分别设置线程休眠1s和2s

public class Receive1 {
    private static final String QUEUE_NAME="work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //创建队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //创建消费者监听方法
        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body,"utf-8");
                System.out.println(msg);
                try {
                    //设置睡眠实践1s
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //监听队列
        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}

# 2.4 创建消费者二 {#_2-4-创建消费者二}

public class Receive2 {
    private static final String QUEUE_NAME="work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body,"utf-8");
                System.out.println(msg);
                try {
                    //设置睡眠时间2s
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}

分别将两个消费者运行起来,然后运行生产者发送50条消息,可以发现虽然两个消费者处理消息的能力有快有慢,但是得到的消息都是25条,下面展示消费者1获取的消息部分截图。

# (三)公平分发(Fair dispatch) {#三-公平分发-fair-dispatch}

在某些场景下轮询分发是不合理的,因此工作队列还有公平分发的方式,所谓公平分发,就是能者多劳,处理消息快的人获得消息多,处理消息慢的人获得消息少。公平分发的实现只需要对代码做一些修改:

# 3.1 修改生产者 {#_3-1-修改生产者}

对于生产者,只需要对通道增加一条限制,限制通道发送给同一个消费者不得超过一条消息,也就是只有当消费者处理完一条消息以后才会发第二条消息给它。使用channel.basicQos();方法,设置参数为1表示限制一次不超过1条消息。

public class Send {
    private static final String QUEUE_NAME="work_queue_fair";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //限制通道发送给同一个消费者不得超过一条消息
        int prefenchCount=1;
        channel.basicQos(prefenchCount);
        for (int i=0;i<50;i++){
            String msg="i="+i;
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        }
        channel.close();
        connection.close();
    }
}

# 3.2 修改消费者 {#_3-2-修改消费者}

对于消费者,需要修改三处地方,第一处和生产者一样修改通道的限制信息;第二处关闭消费者的自动应答;第三处设置手动回执,即处理完一条消息后手动发送处理完成的指令给队列。

//保证一次只分发一次
channel.basicQos(1);
//设置手动回执
channel.basicAck(envelope.getDeliveryTag(),false);
//关闭自动应答
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);

以下是修改后的消费者代码

public class Receive1 {
    private static final String QUEUE_NAME="work_queue_fair";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //保证一次只分发一次
        channel.basicQos(1);
        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body,"utf-8");
                System.out.println(msg);
                //设置手动回执
                channel.basicAck(envelope.getDeliveryTag(),false);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //关闭自动应答
        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

设置完之后工作队列就变成了公平分发方式,测试结果:

# 3.3 关于自动应答 {#_3-3-关于自动应答}

在前面修改消费者代码的时候,我们关闭了自动应答

boolean autoAck=false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);

这是basicConsume的第二个参数

当autoAck=true时,表示开启自动应答,一旦rabbitmq将队列中的消息发送给消费者,这个消息就会从队列中消失。但是如果此时消费者挂掉了,那么这条消息也就彻底消失了。

当autoAck=false时,关闭自动应答,rabbitmq将队列中的消息发送给消费者,只有当消费者返回确认之后,队列中的消息才会被删除。

赞(6)
未经允许不得转载:工具盒子 » 工作队列详解