20200202 ActiveMQ 3. Java编码实现ActiveMQ通讯

Stella981
• 阅读 281

ActiveMQ 3. Java编码实现ActiveMQ通讯

3.1. 队列(Queue)

目的地(Destination)分为:

  • 点对点的队列(Queue)
  • 一对多的主题(Topic)

3.1.1. 上手代码

  1. pom.xml

    org.apache.activemq activemq-all 5.15.9 org.apache.xbean xbean-spring 3.16
  2. 生产者代码

    public class JmsProducer {

    public static final String ACTIVEMQ_URL = "tcp://192.168.181.128:61616/";
    public static final String QUEUE_NAME = "queue01";
    
    public static void main(String[] args) throws JMSException {
        // 1. 创建连接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2. 通过连接工厂,获得连接Connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
    
        // 3. 创建会话Session
        // 两个参数,第一个是事务控制,第二个是签收控制
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4. 创建目的地(具体是队列queue或主题topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5. 创建消息的生产者
        MessageProducer messageProducer = session.createProducer(queue);
        // 6. 通过消息生产者发送消息
        for (int i = 0; i < 3; i++) {
            // 7. 创建消息
            TextMessage textMessage = session.createTextMessage("msg---" + i);
            // 8. 发送给MQ
            messageProducer.send(textMessage);
        }
        // 9. 关闭资源
        messageProducer.close();
        session.close();
        connection.close();
    
        System.out.println("*****消息发布到MQ完成*****");
    }
    

    }

  3. 消费者代码

    public class JmsConsumer {

    public static final String ACTIVEMQ_URL = "tcp://192.168.181.128:61616/";
    public static final String QUEUE_NAME = "queue01";
    
    public static void main(String[] args) throws JMSException {
        // 1. 创建连接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2. 通过连接工厂,获得连接Connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
    
        // 3. 创建会话Session
        // 两个参数,第一个是事务控制,第二个是签收控制
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4. 创建目的地(具体是队列queue或主题topic)
        Queue queue = session.createQueue(QUEUE_NAME);
    
        // 5. 创建消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);
        while (true) {
            TextMessage textMessage = (TextMessage) messageConsumer.receive();
            if (textMessage != null) {
                System.out.println("*****消费者收到消息:" + textMessage.getText());
            } else {
                break;
            }
        }
    
        // 6. 关闭资源
        messageConsumer.close();
        session.close();
        connection.close();
    }
    

    }

3.1.2. receive()方法说明

// 收到消息前一直阻塞进程
javax.jms.MessageConsumer#receive()
// 超时后不再阻塞进程
javax.jms.MessageConsumer#receive(long timeout)

3.1.3. 消费者监听器方式接收消息

监听器方式属于异步非阻塞方式,所以需要手动阻塞进程

messageConsumer.setMessageListener(new MessageListener() {
    @SneakyThrows
    @Override
    public void onMessage(Message message) {
        if (null != message && message instanceof TextMessage) {
            System.out.println("消费者监听器监听到消息***********" + ((TextMessage) message).getText());
        }
    }
});
// 手动阻塞进程
System.in.read();

3.1.4. 消费者三大消费情况

  1. 先生产,只启动1号消费者。问题:1号消费者可以消费消息吗?

    可以

  2. 先生产,先启动1号消费者,再启动2号消费者。问题:2号消费者可以消费消息吗?

    1号消费者可以消费消息;2号消费者不可以消费消息;

  3. 先启动2个消费者,再生产6条消息。问题:消费情况如何?

    2个消费者各消费一半消息;

3.1.5. 两种消费方式

  1. 同步阻塞方式(receive()

  2. 异步非阻塞方式(消费者监听器onMessage()

3.1.6. 点对点消息传递域的特点

  1. 每个消息只能有一个消费者,类似1对1的关系,类似于快递

  2. 消息的消费者和生产者没有时间上的相关性,类似于短信

  3. 消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息

3.2. 主题(Topic)

3.2.1. 发布订阅消息传递域的特点

  1. 每个消息可以有多个消费者,属于一对多的关系
  2. 生产者和消费者有时间上的相关性,订阅一个主题的消费者只能消费自它订阅之后发布的消息
  3. 生产者生产时,topic不保存消息,它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者

JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时的消息。一句话,类似微信公众号订阅

3.2.2. 上手代码

测试时要先启动消费者,后启动生产者。

  1. 生产者代码

    public class JmsProducer_Topic {

    public static final String ACTIVEMQ_URL = "tcp://192.168.181.128:61616/";
    public static final String TOPIC_NAME = "topic01";
    
    public static void main(String[] args) throws JMSException {
        // 1. 创建连接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2. 通过连接工厂,获得连接Connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
    
        // 3. 创建会话Session
        // 两个参数,第一个是事务控制,第二个是签收控制
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4. 创建目的地(具体是队列queue或主题topic)
        Topic topic = session.createTopic(TOPIC_NAME);
        // 5. 创建消息的生产者
        MessageProducer messageProducer = session.createProducer(topic);
        // 6. 通过消息生产者发送消息
        for (int i = 0; i < 3; i++) {
            // 7. 创建消息
            TextMessage textMessage = session.createTextMessage("topic---" + i);
            // 8. 发送给MQ
            messageProducer.send(textMessage);
        }
        // 9. 关闭资源
        messageProducer.close();
        session.close();
        connection.close();
    
        System.out.println("*****topic消息发布到MQ完成*****");
    }
    

    }

  2. 消费者代码

    public class JmsConsumer_Topic {

    public static final String ACTIVEMQ_URL = "tcp://192.168.181.128:61616/";
    public static final String TOPIC_NAME = "topic01";
    
    public static void main(String[] args) throws JMSException, IOException {
        System.out.println("我是1号消费者");
        // System.out.println("我是2号消费者");
        // System.out.println("我是3号消费者");
    
        // 1. 创建连接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2. 通过连接工厂,获得连接Connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
    
        // 3. 创建会话Session
        // 两个参数,第一个是事务控制,第二个是签收控制
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4. 创建目的地(具体是队列queue或主题topic)
        Topic topic = session.createTopic(TOPIC_NAME);
    
        // 5. 创建消费者
        MessageConsumer messageConsumer = session.createConsumer(topic);
    
    
        messageConsumer.setMessageListener(new MessageListener() {
            @SneakyThrows
            @Override
            public void onMessage(Message message) {
                if (null != message && message instanceof TextMessage) {
                    System.out.println("消费者监听器监听到 TOPIC 消息***********" + ((TextMessage) message).getText());
                }
            }
        });
        // 手动阻塞进程
        System.in.read();
    
        // 6. 关闭资源
        messageConsumer.close();
        session.close();
        connection.close();
    }
    

    }

3.3. 两种模式比较

比较项目

Topic 模式

Queue模式

工作模式

订阅-发布”模式,如果当前没有订阅者,消息将会被丢弃;如果有多个订阅者,那么这些订阅者都会收到消息

"负载均衡"模式,如果当前没有消费者,消息也不会丢弃;如果有多个消费者,那么一条消息只会发送给其中一个消费者,并且要求消费者ack消息

有无状态

无状态

Queue数据默认会在MQ服务器上以文件形式保存。也可以配置成DB存储

传递完整性

如果没有订阅者,消息会被丢弃

消息不会被丢弃

处理效率

由于消息要按照订阅者数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异

由于一条消息只发送给一个消费者,所以就算消费者再多,性能也不会明显降低。当然不同消息协议的具体性能也是有差异的

点赞
收藏
评论区
推荐文章
秃头王路飞 秃头王路飞
4个月前
webpack5手撸vue2脚手架
webpack5手撸vue相信工作个12年的小伙伴们在面试的时候多多少少怕被问到关于webpack方面的知识,本菜鸟最近闲来无事,就尝试了手撸了下vue2的脚手架,第一次发帖实在是没有经验,望海涵。languageJavaScript"name":"vuecliversion2","version":"1.0.0","desc
浅梦一笑 浅梦一笑
4个月前
初学 Python 需要安装哪些软件?超级实用,小白必看!
编程这个东西是真的奇妙。对于懂得的人来说,会觉得这个工具是多么的好用、有趣,而对于小白来说,就如同大山一样。其实这个都可以理解,大家都是这样过来的。那么接下来就说一下python相关的东西吧,并说一下我对编程的理解。本人也是小白一名,如有不对的地方,还请各位大神指出01名词解释:如果在编程方面接触的比较少,那么对于软件这一块,有几个名词一定要了解,比如开发环
技术小男生 技术小男生
4个月前
linux环境jdk环境变量配置
1:编辑系统配置文件vi/etc/profile2:按字母键i进入编辑模式,在最底部添加内容:JAVAHOME/opt/jdk1.8.0152CLASSPATH.:$JAVAHOME/lib/dt.jar:$JAVAHOME/lib/tools.jarPATH$JAVAHOME/bin:$PATH3:生效配置
光头强的博客 光头强的博客
4个月前
Java面向对象试题
1、请创建一个Animal动物类,要求有方法eat()方法,方法输出一条语句“吃东西”。创建一个接口A,接口里有一个抽象方法fly()。创建一个Bird类继承Animal类并实现接口A里的方法输出一条有语句“鸟儿飞翔”,重写eat()方法输出一条语句“鸟儿吃虫”。在Test类中向上转型创建b对象,调用eat方法。然后向下转型调用eat()方
刚刚好 刚刚好
4个月前
css问题
1、在IOS中图片不显示(给图片加了圆角或者img没有父级)<div<imgsrc""/</divdiv{width:20px;height:20px;borderradius:20px;overflow:h
blmius blmius
1年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
小森森 小森森
4个月前
校园表白墙微信小程序V1.0 SayLove -基于微信云开发-一键快速搭建,开箱即用
后续会继续更新,敬请期待2.0全新版本欢迎添加左边的微信一起探讨!项目地址:(https://www.aliyun.com/activity/daily/bestoffer?userCodesskuuw5n)\2.Bug修复更新日历2.情侣脸功能大家不要使用了,现在阿里云的接口已经要收费了(土豪请随意),\\和注意
晴空闲云 晴空闲云
4个月前
css中box-sizing解放盒子实际宽高计算
我们知道传统的盒子模型,如果增加内边距padding和边框border,那么会撑大整个盒子,造成盒子的宽度不好计算,在实务中特别不方便。boxsizing可以设置盒模型的方式,可以很好的设置固定宽高的盒模型。盒子宽高计算假如我们设置如下盒子:宽度和高度均为200px,那么这会这个盒子实际的宽高就都是200px。但是当我们设置这个盒子的边框和内间距的时候,那
艾木酱 艾木酱
3个月前
快速入门|使用MemFire Cloud构建React Native应用程序
MemFireCloud是一款提供云数据库,用户可以创建云数据库,并对数据库进行管理,还可以对数据库进行备份操作。它还提供后端即服务,用户可以在1分钟内新建一个应用,使用自动生成的API和SDK,访问云数据库、对象存储、用户认证与授权等功能,可专
helloworld_28799839 helloworld_28799839
4个月前
常用知识整理
Javascript判断对象是否为空jsObject.keys(myObject).length0经常使用的三元运算我们经常遇到处理表格列状态字段如status的时候可以用到vue
helloworld_34035044 helloworld_34035044
6个月前
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为