51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

发布-订阅模型详解

# (一)发布-订阅模型(Publish/Subscribe) {#一-发布-订阅模型-publish-subscribe}

发布订阅模型的结构图如下所示:

和前两个的模型结构不同,在发布订阅模型中多了一个X(exchange),exchange是一个交换机,生产者不是直接将消息发送给队列,而是先发送给交换机。消费者可以通过队列去订阅这个交换机,每个消费者对应于自己的一个队列。

这个结构就好像我们订阅微信公众号一样,作者将文章发送到自己的公众号上,只有订阅过该公众号的人才能收到消息。因此这个模型被称为发布-订阅模型。

# (二)发布-订阅模型实践 {#二-发布-订阅模型实践}

发布订阅模型中多了交换机的存在,而我们在rabbitmq的可视化界面中就见到过exchange

继续通过代码展示该模型:

# 2.1 工具类 {#_2-1-工具类}

工具类和之前都一样,不做介绍了

public class ConnectionUtil {
    public static Connection getConnection() throws IOException, TimeoutException {
        //定义一个连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //设置服务地址
        factory.setHost("127.0.0.1");
        //设置AMQP端口
        factory.setPort(5672);
        //设置VHOSTS
        factory.setVirtualHost("/vhosts_sdxb");
        //设置用户名
        factory.setUsername("user_sdxb");
        factory.setPassword("123456");
        return factory.newConnection();
    }
}

# 2.2 生产者 {#_2-2-生产者}

public class Sent {
    private static final String EXCHANGE="ps_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //绑定交换机,设置类型为fanout
        channel.exchangeDeclare(EXCHANGE,"fanout");
        String msg="hello world";
        channel.basicPublish(EXCHANGE,"",null,msg.getBytes());
        channel.close();
        connection.close();
    }
}

生产者在发布订阅模型中不再绑定队列,而是绑定交换机。exchange的种类有4中,分别是Direct 、Fanout 、Topic、Headers。接下来会做详细介绍。

# 2.3 消费者一 {#_2-3-消费者一}

public class Receive1 {
    private static final String QUEUE_NAME="ps_queue1";
    private static final String EXCHANGE="ps_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //将队列和交换机绑定
        channel.queueBind(QUEUE_NAME,EXCHANGE,"");
        //保证一次只分发一次
        channel.basicQos(1);
        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body,"utf-8");
                System.out.println(msg);
                //设置手动回执
                channel.basicAck(envelope.getDeliveryTag(),false);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //关闭自动回复
        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

消费者的代码和之前一样,唯一的区别是增加了队列和交换机的绑定channel.queueBind();

# 2.4 消费者二 {#_2-4-消费者二}

public class Receive2 {
    private static final String QUEUE_NAME="ps_queue2";
    private static final String EXCHANGE="ps_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //将队列和交换机绑定
        channel.queueBind(QUEUE_NAME,EXCHANGE,"");
        //保证一次只分发一次
        channel.basicQos(1);
        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body,"utf-8");
                System.out.println(msg);
                //设置手动回执
                channel.basicAck(envelope.getDeliveryTag(),false);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //关闭自动回复
        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

# 2.5 运行项目 {#_2-5-运行项目}

由于此时rabbitmq中不存在名称为ps_exchange的交换机,因此我们可以手动在rabbitmq的可视化界面中创建,也可以运行一次生产者来创建交换机。接着运行两个消费者和生产者,可以看到生产者发送出去的消息被消费者收到。

观察此时的可视化界面,可以看到该交换机上已经绑定了两个队列:

# (三)Exchange类型介绍 {#三-exchange类型介绍}

exchange的种类有4中,分别是Direct 、Fanout 、Topic、Headers。接下来会做详细介绍。

# 3.1 Fanout(不处理路由键) {#_3-1-fanout-不处理路由键}

直接将消息路由到所有绑定的队列中,无须对消息的routingkey进行匹配操作。即交换机将消息从生产者获取之后,直接发给订阅的队列。

# 3.2 Direct(处理路由键) {#_3-2-direct-处理路由键}

交换机和队列绑定时会设置路由键(routingkey),当消息从生产者发送给交换机时也会发送一个路由键。只有当这两个路由键相同时,交换机才会把消息发送给队列。

# 3.3 Topic(可以有通配符) {#_3-3-topic-可以有通配符}

Topic和Direct类似,只不过Direct要求路由键完全相同,但是Topic可以使用通配符进行匹配,如#,*

# 3.4 header(根据header匹配) {#_3-4-header-根据header匹配}

在发布消息的时候就需要传入header值,其中的header就是binding时的arguments参数

赞(4)
未经允许不得转载:工具盒子 » 发布-订阅模型详解