JMS

开源布道者
• 阅读 1452

JMS

Java消息服务应用程序接口,是Java平台中面向消息中间件的一套规范的Java API接口,用于在两个应用程序之间或分布式系统中发送消息,进行异步通信。它不是消息协议,它是与具体平台无关的API。可以使用JMS API来连接支持AMQP、STOMP等协议的消息中间件产品,比如ActiveMQ、RabbitMQT等。
JMS
1.体系架构
(1)点对点模型
应用程序由队列(Queue)、发送者(Sender)和接收者(Receiver)组成。
特点:

  • 每天消息只有一个接收者,消息一旦被接收就不再保留在消息队列中。
  • 发送者和接收者之间是异步的。
  • 一个队列上可能有多个接收者在监听,但是消息只能被队列中一个接收者接收。
  • 队列按照消息服务器将消息放入队列的先后顺序依次传送给消费者。
  • 当接收者收到消息时,会发送确认收到通知。

(2)发布/订阅模型
应用程序由主题(Topic)、发布者(Publisher)和订阅者(Subscriber)组成。
特点:

  • 每条消息可以有多个订阅者。
  • 发布者可订阅者有时间上的依赖。一般情况下,某个主题的订阅者需要在创建了订阅之后才能接收到消息,而且为了接收消息订阅者必须保持运行的状态。
  • JMS允许订阅者创建一个可持久化的订阅,这样即使订阅者没有运行也能接收到所订阅的消息。
  • 每条消息都会传递给该主题下的所有订阅者。
  • 通常发布者意识不到哪一个订阅者正在接收消息。

2.基本概念

  • 生产者(Producer):创建并发送消息的JMS客户端,点对点模型中就是发送者,在发布/订阅中就是发布者。
  • 消费者(Consumer):接收消息的JMS客户端,在点对点模型中就是接收者,在发布/订阅中就是订阅者。
  • 客户端(Client):生产或消费消息的基于Java的应用程序或对象。
  • 队列(Queue):一个容纳被发送的等待订阅的消息区域,它是点对点模型中的队列。
  • 主题(Topic):一种支持发送消息给多个订阅者的机制。它是发布/订阅模型中的主题。
  • 消息(MESSAGE):JMS客户端之间传递的数据对象。

3.编程接口
(1)ConnectionFactory接口(连接工厂)
创建Connection对象的工厂,根据不同的消息类型分出队列连接工厂和主题连接工厂,分别对应QueueConnectionFactory和TopicConnectionFactory。
(2)Destination接口(目的地)
对于消息生产者,Destination是某个队列或某个主题(消息目的地);对于消息消费者来说,它的Destination也是摸个队列或某个主题(消息来源),所以是Queue或Topic类型的对象。
(3)Connection接口(连接)
客户端和JMS系统之间建立的连接,可以产生一个或多个Session,也有两种类型:QueueConnection和TopicConnection。
(4)Session接口(会话)
实际操作消息的接口,用于发送和接收消息。因为会话是单线程的,所以消息时按照发送的顺序一个个接收的。可以通过session创建生产者,消费者,消息等。同时提供了事务的功能。Session也分为两种类型:QueueSession和TopicSession。
(5)MessageProducer接口(消息生产者)
由Session创建并用于将消息发送到Destination。消息生产者有两种类型:QueueSender和TopicPublisher。
(6)MessageConsumer接口(消息消费者)
由Session创建并用于将消息发送到Destination。消息消费者有两种类型:QueueReceiver和TopicSubscriber
(7)Message接口(消息)
在消费者和生产者之间传送的对象。
(8)MessageListener(消息监听器)
如果注册了消息监听器,那么当消息到达时将自动调用监听器的onMessage方法。
4.JMS1.1示例
(1)引入依赖

<!-- ActiveMQ依赖 -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.3</version>
</dependency>

(2)消息生产者

public class QueueProducer {
    /**
     * 默认用户名
     */
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * 默认密码
     */
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    /**
     * 默认连接地址
     */
    public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            // 创建连接
            Connection connection = connectionFactory.createConnection();
            // 启动连接
            connection.start();
            // 创建会话
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            // 创建队列,需要指定队列名称,消息生产者和消费者将根据它来发送、接收对应的消息
            Queue myTestQueue = session.createQueue("activemq-queue-test1");
            // 消息生产者
            MessageProducer producer = session.createProducer(myTestQueue);
            // 创建一个消息对象
            TextMessage message = session.createTextMessage("测试点对点的一条消息");
            // 发送一条消息
            producer.send(message);
            // 提交事务
            session.commit();
            // 关闭资源
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

(3)消息消费者

public class QueueConsumer {
    /**
     * 默认用户名
     */
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * 默认密码
     */
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    /**
     * 默认连接地址
     */
    public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            // 创建连接
            Connection connection = connectionFactory.createConnection();
            // 启动连接
            connection.start();
            // 创建会话
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            // 创建队列,需要指定队列名称,消息生产者和消费者将根据它来发送、接收对应的消息
            Queue myTestQueue = session.createQueue("activemq-queue-test1");
            // 消息消费者
            MessageConsumer consumer = session.createConsumer(myTestQueue);
            // 消费者实现监听接口消费消息
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        TextMessage textMessage = (TextMessage)message;
                        System.out.println(textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    try {
                        session.commit();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
           // 主线程休眠100秒,使消息消费者对象能继续存活一段时间,从而能监听到消息
            Thread.sleep(100 * 1000);
            // 关闭资源
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

4.JMS2.0概述
主要进行易用性的改进,由三个新接口组成

  • JMSContext:替换JMS1.1中的Connection和Session。
  • JMSProducer:替换MessageProducer,支持链式操作配置消息。
  • JMSConsumer:替换MessageConsumer。
public void sendMessageJMS20(ConnectionFactory connectionFactory, Queue queue, String text) {
        try(JMSContext context = connectionFactory.createContext();) {
            context.createProducer().send(queue, text);
        } catch (JMSRuntimeException ex) {
            ex.printStackTrace();
        }
    }

可以看到只需要创建JMSContext对象,JMSContxt实现了AutoCloseable接口,因此可以不用finally去手动关闭,无需创建TextMessage对象。

点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
3年前
ActiveMQ的使用
ActiveMQ是Apache出品的开源消息总线。完全支持JMS1.1规范首先我们要了解一下JMSJMS简介Java消息服务(JavaMessageService,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异
Wesley13 Wesley13
3年前
JavaAPI
1.ActiveMQ是什么ActiveMQ是一个消息队列应用服务器(推送服务器)。支持JMS规范。1.1JMS概述全称:JavaMessageService,即为Java消息服务,是一套java消息服务的API标准。(标准即接口)实现了JMS标准的系统,称之
Stella981 Stella981
3年前
Spring Boot实践
一.认识JMS1.1概述对于JMS,百度百科,是这样介绍的:JMS即Java消息服务(JavaMessageService)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提
Stella981 Stella981
3年前
RabbitMQ 消息中间件搭建详解
1.RabbitMQ简介消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信。RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包
Stella981 Stella981
3年前
AvctiveMQ和RabbitMQ的区别
ActiveMQ:传统的消息队列,使用Java语言编写。基于JMS(JavaMessageService),采用多线程并发,资源消耗比较大。支持P2P和发布订阅两种模式。RabbitMQ:是使用Erlang语言开发的开源消息队列系统。基于AMQP协议来实现的。AMQP的主要特征是面向消息、队列、路由(
Stella981 Stella981
3年前
RabbitMQ学习:RabbitMQ的基本概念及RabbitMQ使用场景(二)
1、RabbitMQ的基本概念RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统
Stella981 Stella981
3年前
RabbitMQ基础概念详细介绍
RabbitMQ简介AMQP,即AdvancedMessageQueuingProtocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。R
Stella981 Stella981
3年前
RabbitMQ学习:安装RabbitMQ及RabbitMQ的初步配置(一)
RabbitMQ基础含义RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。
Wesley13 Wesley13
3年前
ActiveMQ学习笔记(1)——JMS的概念
1.面向消息的中间件1.1什么是MOM   面向消息的中间件,MessageOrientedMiddleware,简称MOM,中文简称消息中间件,利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。  一个MOM系统,通常会包括客户端(Clients)、消息(Messa
Stella981 Stella981
3年前
JMS(Java消息服务)与消息队列ActiveMQ基本使用(一)
最近的项目中用到了mq,之前自己一直在码农一样的照葫芦画瓢。最近几天研究了下,把自己所有看下来的文档和了解总结一下。一.认识JMS1.概述对于JMS,百度百科,是这样介绍的:JMS即Java消息服务(JavaMessageService)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两