RabbitMQ之路由和通配符模式,附源码注释讲解

虚拟现实
• 阅读 410

昨天写了关于RabbitMQ的上篇:全网首发,做第一人纯源码讲解RabbitMQ实践,收藏吧

昨天分享的是在日常开发过程中的一些基础操作,像RabbitMQ的简单应用、工作队列模式、发布/订阅模式,但是老大交给我的重任,我只写这点怎么可能,所以,我这不是就来了呀,路由模式、通配符模式两种模式也是在日常的开发过程中被使用的,昨天因为篇幅原因没有整理,今天整理出来分享给大家,希望对正在学习的大家有所帮助。

每一个源码所包含的内容我已经在代码中做好注释了,希望大家看完这两篇文章,五种模式之后能自己去实现一下,不实现你只能是会看,不属于你自己

好了,RabbitMQ的实战我暂时就整理了这么多,关注我,后期我会更新其他的技术文章,关注公众号:Java架构师联盟,每日更新技术好文

下面来看正事


路由模式(Routing)

消息过滤接收

RabbitMQ之路由和通配符模式,附源码注释讲解

特点:

  • 生产者发送消息到交换机,同时指定routingKey
  • 一个交换机绑定多个队列,每个队列设置routingKey,并且,一个队列可以设置多个routingKey
  • 交换机通过routingKey来决定把消息转发给哪些队列
  • 其实就是发布订阅模式的加强版。

二、使用步骤

引入库

代码如下(示例):

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
</dependency>

生产者

public class Product {

    private final static String QUEUE_1 = "queue_1";
    private final static String QUEUE_2 = "queue_2";
    // 交换机名称
    private final static String EXCHANGE_NAME = "exchange_dirct_routing";
​
    // 设置两个routingKey
    private final static String ROUTINGKEY_1 = "info_key";
    private final static String ROUTINGKEY_2 = "error_key";
​
    public static void main(String[] args) throws IOException, TimeoutException {
        // 连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("106.**.**.82");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("root");
        factory.setPassword("root");
        Connection connection = null;
        Channel channel = null;
        try{
            connection = factory.newConnection();
            channel = connection.createChannel();
            // 声明交换机
            /**
             * 1. 交换机名称
             * 2、交换机类型:
             *  FANOUT:对应的模式就是 发布/订阅模式
             *  DIRECT:对应 路由(Routing) 的工作模式
             *  TOPIC:对应 Topics 工作模式
             *  HEADERS: 对应 HEADERS 工作模式
             */
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            
            // 声明队列
            channel.queueDeclare(QUEUE_1,false,false,false, null);
            channel.queueDeclare(QUEUE_2, false, false, false,null);
            // 交换机队列绑定
            /**
             * 1 queue 队列名称
             * 2 exchange 交换机名称
             * 3 routingKey 路由Key
             */
            // 队列(QUEUE_1)绑定routingKey_1
            channel.queueBind(QUEUE_1,EXCHANGE_NAME,ROUTINGKEY_1);
            // 队列(QUEUE_2)绑定routingKey_2和routingKey_1
            channel.queueBind(QUEUE_2,EXCHANGE_NAME,ROUTINGKEY_2);
            channel.queueBind(QUEUE_2,EXCHANGE_NAME,ROUTINGKEY_1);
​
            String message = "这是发送的message biubiubiu~";
​
            // 参数:String exchange, String routingKey, BasicProperties props, byte[] body
            /**
             * exchange:交换机 如果不指定(""),就默认交换机
             * routingKey:路由key;交换机根据路由key将消息转发到指定的队列,如果使用默认交换机,routingKey为队列名称
             * props:额外属性
             * body:消息内容
             */
            for(int i=0;i<4;i++){
                // 修改这里的routingkey 你可以看到不同的效果喔
                // channel.basicPublish(EXCHANGE_NAME,ROUTINGKEY_1,null,message.getBytes());
                channel.basicPublish(EXCHANGE_NAME,ROUTINGKEY_2,null,message.getBytes());
                System.out.println("发送了message:"+message+", routingKey: "+ROUTINGKEY_2);
            }
        } catch (TimeoutException | IOException e) {
            e.printStackTrace();
        } finally {
            channel.close();
            connection.close();
        }
    }
}

消费者1

/**
 * 对应QUEUE_1的消费者
 */
public class Consumer_1 {
​
    private final static String QUEUE_1 = "queue_1";
    private final static String QUEUE_2 = "queue_2";
    private final static String EXCHANGE_NAME = "exchange_dirct_routing";
​
    private final static String ROUTINGKEY_1 = "info_key";
    private final static String ROUTINGKEY_2 = "error_key";
​
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("106.**.**.82");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("root");
        factory.setPassword("root");
​
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            // 声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // 声明队列
            channel.queueDeclare(QUEUE_1,false,false,false, null);
//            channel.queueDeclare(QUEUE_2, false, false, false,null);
​
            // 交换机队列绑定
            channel.queueBind(QUEUE_1,EXCHANGE_NAME,ROUTINGKEY_1);
​
//            channel.queueBind(QUEUE_2,EXCHANGE_NAME,ROUTINGKEY_2);
//            channel.queueBind(QUEUE_2,EXCHANGE_NAME,ROUTINGKEY_1);
​
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                    String message = new String (body, StandardCharsets.UTF_8);
                    System.out.println("receive:" + message);
                }
            };
​
            // 接收消息 监听队列
            // 参数:String queue, boolean autoAck, Consumer callback
            /**
             * queue:队列
             * autoAck:自动回复:当消费者接收到消息后,告诉mq消息已经接收。TRUE:自动回复,false:编程回复
             * callback:消费方法,当消费者接收消息执行的方法。
             */
            channel.basicConsume(QUEUE_1,true,consumer);
​
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

消费者2

public class Consumer_2 {
    private final static String QUEUE_1 = "queue_1";
    private final static String QUEUE_2 = "queue_2";
    private final static String EXCHANGE_NAME = "exchange_dirct_routing";
​
    private final static String ROUTINGKEY_1 = "info_key";
    private final static String ROUTINGKEY_2 = "error_key";
​
    public static void main(String[] args) {
​
        ConnectionFactory factory = new ConnectionFactory();
​
        factory.setHost("106.**.**.82");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("root");
        factory.setPassword("root");
​
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            // 声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // 声明队列
            channel.queueDeclare(QUEUE_2, false, false, false,null);
​
            // 交换机队列绑定
            /**
             * 1 queue 队列名称
             * 2 exchange 交换机名称
             * 3 routingKey 路由Key
             */
            channel.queueBind(QUEUE_2,EXCHANGE_NAME,ROUTINGKEY_2);
            channel.queueBind(QUEUE_2,EXCHANGE_NAME,ROUTINGKEY_1);
​
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                    String message = new String (body, StandardCharsets.UTF_8);
                    System.out.println("receive:" + message);
                }
            };
​
            // 接收消息 监听队列
            // 参数:String queue, boolean autoAck, Consumer callback
            /**
             * queue:队列
             * autoAck:自动回复:当消费者接收到消息后,告诉mq消息已经接收。TRUE:自动回复,false:编程回复
             * callback:消费方法,当消费者接收消息执行的方法。
             */
            channel.basicConsume(QUEUE_2,true,consumer);
​
        }catch (Exception ignored){
            ignored.printStackTrace();
        }
    }
​
}

通配符模式(Topic)

路由模式,routingKey采用通配符模式。

RabbitMQ之路由和通配符模式,附源码注释讲解

特点:

  • 跟routing模式相比,Topics模式是通配符匹配
* : 只能匹配一个词,例如: info.*,他可以匹配 info.hello,info.world
# : 可以匹配多个词,例如: info.#,它可以匹配 info.hello.world,info.xx,info
# 可以匹配空

二、使用步骤

引入库

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
</dependency>

生产者

两个队列 支付宝 和 微信, 有些人想要支付宝,有些人想要微信 ,而有些人全都要
public class TopicProduct {
    // 两个队列 支付宝 和 微信, 有些人想要支付宝,有些人想要微信 有些人全都要
    private final static String QUEUE_1 = "queue_topic_alipay";
    private final static String QUEUE_2 = "queue_topic_wechat";
    private final static String EXCHANGE_NAME = "exchange_topic";
​
    /**
     * 可以匹配 info.alipay,info.alipay.wechat...
     * 当 info.alipay.wechat 可以同时匹配两个路由key
     */
    private final static String ROUTINGKEY_1 = "info.#.alipay.#";
    private final static String ROUTINGKEY_2 = "info.#.wechat.#";
​
    public static void main(String[] args) throws IOException, TimeoutException {
        // 连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("106.**.**.82");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("root");
        factory.setPassword("root");
        Connection connection = null;
        Channel channel = null;
        try{
            connection = factory.newConnection();
            channel = connection.createChannel();
            // 声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            // 声明队列
            channel.queueDeclare(QUEUE_1,false,false,false,null);
            channel.queueDeclare(QUEUE_2,false,false,false,null);
​
            /* 交换机队列绑定
             * 1 queue 队列名称
             * 2 exchange 交换机名称
             * 3 routingKey 路由Key
             */
            channel.queueBind(QUEUE_1,EXCHANGE_NAME,ROUTINGKEY_1);
            channel.queueBind(QUEUE_2,EXCHANGE_NAME,ROUTINGKEY_2);
​
            String message = "";
​
            // 参数:String exchange, String routingKey, BasicProperties props, byte[] body
            /**
             * exchange:交换机 如果不指定(""),就默认交换机
             * routingKey:路由key;交换机根据路由key将消息转发到指定的队列,如果使用默认交换机,routingKey为队列名称
             * props:额外属性
             * body:消息内容
             */
            message = "这个是给喜欢 AliPay 的朋友们的~";
            channel.basicPublish(EXCHANGE_NAME,"info.alipay",null,message.getBytes());
            System.out.println(new Date()+":发送了message:"+message+", Topic routingKey: "+ROUTINGKEY_1);
​
            message = "喜欢 wechat 的朋友,接好了";
            channel.basicPublish(EXCHANGE_NAME,"info.wechat",null,message.getBytes());
            System.out.println(new Date()+":发送了:"+message+", Topic routingKey:"+ROUTINGKEY_2);
​
            message = "同时喜欢 ALIPAY 和 WECHAT 的朋友接好了";
            channel.basicPublish(EXCHANGE_NAME,"info.alipay.wechat",null,message.getBytes());
            System.out.println(new Date()+":发送了:"+message+",Topic routingKeys:"+ROUTINGKEY_2+","+ROUTINGKEY_1);
​
        } catch (TimeoutException | IOException e) {
            e.printStackTrace();
        } finally {
            channel.close();
            connection.close();
        }
    }
}

消费者1

喜欢alipay的
public class Consumer_Topic_1 {
    private final static String QUEUE_1 = "queue_topic_alipay";
    //private final static String QUEUE_2 = "queue_topic_wechat";
    private final static String EXCHANGE_NAME = "exchange_topic";
    private final static String ROUTINGKEY_1 = "info.#.alipay.#";
    //private final static String ROUTINGKEY_2 = "info.#.wechat.#";
​
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("106.**.**.82");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("root");
        factory.setPassword("root");
​
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            // 声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            // 声明队列
            channel.queueDeclare(QUEUE_1,false,false,false, null);
​
            // 交换机队列绑定
            channel.queueBind(QUEUE_1,EXCHANGE_NAME,ROUTINGKEY_1);
​
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                    String message = new String (body, StandardCharsets.UTF_8);
                    System.out.println("receive:" + message);
                }
            };
​
            // 接收消息 监听队列
            // 参数:String queue, boolean autoAck, Consumer callback
            /**
             * queue:队列
             * autoAck:自动回复:当消费者接收到消息后,告诉mq消息已经接收。TRUE:自动回复,false:编程回复
             * callback:消费方法,当消费者接收消息执行的方法。
             */
            channel.basicConsume(QUEUE_1,true,consumer);
​
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
​

输出如下:

receive:这个是给喜欢 AliPay 的朋友们的~
receive:喜欢 ALIPAY 和 WECHAT 接好了

消费者2

喜欢wechat
public class Consumer_Topic_2 {
    // private final static String QUEUE_1 = "queue_topic_alipay";
    private final static String QUEUE_2 = "queue_topic_wechat";
    private final static String EXCHANGE_NAME = "exchange_topic";
​
    // private final static String ROUTINGKEY_1 = "info.#.alipay.#";
    private final static String ROUTINGKEY_2 = "info.#.wechat.#";
​
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("106.**.**.82");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("root");
        factory.setPassword("root");
​
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            // 声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            // 声明队列
            channel.queueDeclare(QUEUE_2, false, false, false,null);
​
            // 交换机队列绑定
            /**
             * 1 queue 队列名称
             * 2 exchange 交换机名称
             * 3 routingKey 路由Key
             */
            channel.queueBind(QUEUE_2,EXCHANGE_NAME,ROUTINGKEY_2);
​
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                    String message = new String (body, StandardCharsets.UTF_8);
                    System.out.println("receive:" + message);
                }
            };
​
            // 接收消息 监听队列
            // 参数:String queue, boolean autoAck, Consumer callback
            /**
             * queue:队列
             * autoAck:自动回复:当消费者接收到消息后,告诉mq消息已经接收。TRUE:自动回复,false:编程回复
             * callback:消费方法,当消费者接收消息执行的方法。
             */
            channel.basicConsume(QUEUE_2,true,consumer);
​
        }catch (Exception ignored){
            ignored.printStackTrace();
        }
    }
}
​
​

输出如下:

receive:喜欢 wechat 的朋友,接好了
receive:喜欢 ALIPAY 和 WECHAT 接好了
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
美凌格栋栋酱 美凌格栋栋酱
6个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Peter20 Peter20
4年前
mysql中like用法
like的通配符有两种%(百分号):代表零个、一个或者多个字符。\(下划线):代表一个数字或者字符。1\.name以"李"开头wherenamelike'李%'2\.name中包含"云",“云”可以在任何位置wherenamelike'%云%'3\.第二个和第三个字符是0的值wheresalarylike'\00%'4\
Wesley13 Wesley13
3年前
FLV文件格式
1.        FLV文件对齐方式FLV文件以大端对齐方式存放多字节整型。如存放数字无符号16位的数字300(0x012C),那么在FLV文件中存放的顺序是:|0x01|0x2C|。如果是无符号32位数字300(0x0000012C),那么在FLV文件中的存放顺序是:|0x00|0x00|0x00|0x01|0x2C。2.  
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
PHP创建多级树型结构
<!lang:php<?php$areaarray(array('id'1,'pid'0,'name''中国'),array('id'5,'pid'0,'name''美国'),array('id'2,'pid'1,'name''吉林'),array('id'4,'pid'2,'n
Wesley13 Wesley13
3年前
Java日期时间API系列36
  十二时辰,古代劳动人民把一昼夜划分成十二个时段,每一个时段叫一个时辰。二十四小时和十二时辰对照表:时辰时间24时制子时深夜11:00凌晨01:0023:0001:00丑时上午01:00上午03:0001:0003:00寅时上午03:00上午0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这