java 编程技术异步通信

贾探春
• 阅读 690

一、分布式的业务场景

1 、如何高效完成各个分布式系统的协作

通过消息队列来达到异步解耦的效果,减少了程序之间的阻塞等待时间,降低了因为服务之间调用的依赖风险。

2、消息的弊端?如何解决?

消息队列的问题在于不确定性,java培训不能绝对保证消息的准确到达,所以要引入延迟、周期性的主动轮询,来发现未到达的消息,从而进行补偿。

二、消息队列简介
消息队列,也叫消息中间件。消息的传输过程中保存消息的容器。

消息队列都解决了什么问题?

异步

2、并行

3、解耦

4、排队(削峰填谷)

5 弊端:不确定性和延迟

解决方案:最终一致

消息模式
点对点

订阅

三 消息队列工具 ActiveMQ
1 、简介

同类产品: RabbitMQ 、 Kafka、Redis(List)

1.1 对比 RabbitMQ

最接近的同类型产品,经常拿来比较,性能伯仲之间,基本上可以互相替代。最主要区别是二者的协议不同 RabbitMQ 的协议是 AMQP(Advanced Message Queueing Protoco),而 ActiveMQ 使用的是 JMS(Java Messaging Service )协议。顾名思义 JMS 是针对 Java 体系的传输协议,队列两端必须有 JVM,所以如果开发环境都是 java 的话推荐使用 ActiveMQ,可以用 Java 的一些对象进行传递比如 Map、BLob、Stream 等。而 AMQP 通用行较强,非 java 环境经常使用,传输内容就是标准字符串。

另外一点就是 RabbitMQ 用 Erlang 开发,安装前要装 Erlang 环境,比较麻烦。ActiveMQ 解压即可用不用任何安装。

1.2 对比 KafKa

Kafka 性能超过 ActiveMQ 等传统 MQ 工具,集群扩展性好。

弊端是:

在传输过程中可能会出现消息重复的情况,

不保证发送顺序

一些传统 MQ 的功能没有,比如消息的事务功能。

所以通常用 Kafka 处理大数据日志。

1.3 对比 Redis

其实 Redis 本身利用 List 可以实现消息队列的功能,但是功能很少,而且队列体积较大时性能会急剧下降。对于数据量不大、业务简单的场景可以使用。

2 安装 ActiveMQ

拷贝 apache-activemq-5.14.4-bin.tar.gz 到 Linux 服务器的/opt 下

解压缩 tar -zxvfapache-activemq-5.14.4-bin.tar.gz

重命名 mv apache-activemq-5.14.4 activemq

vim /opt/activemq/bin/activemq

增加两行

JAVA_HOME="/opt/jdk1.8.0_152"JAVA_CMD="/opt/jdk1.8.0_152/bin

注册服务

ln -s /opt/activemq/bin/activemq /etc/init.d/activemqchkconfig --add activemq
复制代码

启动服务

service activemq start

关闭服务

service activemq stop

通过 netstat 查看端口

activemq 两个重要的端口,一个是提供消息队列的默认端口:61616

另一个是控制台端口 8161

通过控制台测试

启动消费端

进入网页控制台

账号/密码默认: admin/admin

点击 Queues

观察客户端

在 Java 中使用消息队列
3.1 在 gmall-service-util 中导入依赖坐标

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.15.2</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency>

3.2 producer 端

public static void main(String[] args) {ConnectionFactory connect = new ActiveMQConnectionFactory("tcp://192.168.67.163:61616");try {Connection connection = connect.createConnection();connection.start();//第一个值表示是否使用事务,如果选择true,第二个值相当于选择0Session session = connection.createSession(true, Session.SESSION_TRANSACTED);Queue testqueue = session.createQueue("TEST1");MessageProducer producer = session.createProducer(testqueue);TextMessage textMessage=new ActiveMQTextMessage();textMessage.setText("今天天气真好!");producer.setDeliveryMode(DeliveryMode.PERSISTENT);producer.send(textMessage);session.commit();connection.close();} catch (JMSException e) {e.printStackTrace();}}

3.3 consumer

public static void main(String[] args) {ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.67.163:61616");try {Connection connection = connect.createConnection();connection.start();//第一个值表示是否使用事务,如果选择true,第二个值相当于选择0Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Destination testqueue = session.createQueue("TEST1");MessageConsumer consumer = session.createConsumer(testqueue);consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {if(message instanceof TextMessage){try {String text = ((TextMessage) message).getText();System.out.println(text);//session.rollback();} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}});}catch (Exception e){e.printStackTrace();}}

3.4 关于事务控制

3.5 持久化与非持久化

通过 producer.setDeliveryMode(DeliveryMode.PERSISTENT) 进行设置

持久化的好处就是当 activemq 宕机的话,消息队列中的消息不会丢失。非持久化会丢失。但是会消耗一定的性能。

四 与 springboot 整合
1 配置类 ActiveMQConfig

@Configurationpublic class ActiveMQConfig {@Value("${spring.activemq.broker-url:disabled}")String brokerURL ;@Value("${activemq.listener.enable:disabled}")String listenerEnable;@Beanpublic ActiveMQUtil getActiveMQUtil() throws JMSException {if(brokerURL.equals("disabled")){return null;}ActiveMQUtil activeMQUtil=new ActiveMQUtil();activeMQUtil.init(brokerURL);return activeMQUtil;}//定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂@Bean(name = "jmsQueueListener")public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory ) {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();if(!listenerEnable.equals("true")){return null;}factory.setConnectionFactory(activeMQConnectionFactory);//设置并发数factory.setConcurrency("5");//重连间隔时间 factory.setRecoveryInterval(5000L);factory.setSessionTransacted(false);factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);return factory;}@Beanpublic ActiveMQConnectionFactory activeMQConnectionFactory ( ){ActiveMQConnectionFactory activeMQConnectionFactory =new ActiveMQConnectionFactory( brokerURL);return activeMQConnectionFactory;}}

2 工具类 ActiveMQUtil
public class ActiveMQUtil {PooledConnectionFactory pooledConnectionFactory=null;public ConnectionFactory init(String brokerUrl) {ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);//加入连接池pooledConnectionFactory=new PooledConnectionFactory(factory);//出现异常时重新连接pooledConnectionFactory.setReconnectOnException(true);//pooledConnectionFactory.setMaxConnections(5);pooledConnectionFactory.setExpiryTimeout(10000);return pooledConnectionFactory;}public ConnectionFactory getConnectionFactory(){return pooledConnectionFactory;}}

五 在支付业务模块中应用
1 支付成功通知

支付模块利用消息队列通知订单系统,支付成功

在支付模块中配置 application.properties

spring.activemq.broker-url=tcp://mq.server.com:61616
复制代码

在 PaymentServiceImpl 中增加发送方法:

public void sendPaymentResult(String orderId,String result){ConnectionFactory connectionFactory = activeMQUtil.getConnectionFactory();Connection connection=null;try {connection = connectionFactory.createConnection();connection.start();Session session = connection.createSession(true, Session.SESSION_TRANSACTED);Queue paymentResultQueue = session.createQueue("PAYMENT_RESULT_QUEUE");MapMessage mapMessage=new ActiveMQMapMessage();mapMessage.setString("orderId",orderId);mapMessage.setString("result",result);MessageProducer producer = session.createProducer(paymentResultQueue);producer.send(mapMessage);session.commit();
复制代码

producer.close();session.close();connection.close();} catch (JMSException e) {e.printStackTrace();}}

在 PaymentController 中增加一个方法用来测试

@RequestMapping("sendResult")@ResponseBodypublic String sendPaymentResult(@RequestParam("orderId") String orderId){paymentService.sendPaymentResult(orderId,"success" );return "has been sent";}

在浏览器中访问:

查看队列内容:有一个在队列中没有被消费的消息。

2 订单模块消费消息

application.properties

spring.activemq.broker-url=tcp://mq.server.com:61616activemq.listener.enable=true
复制代码

订单消息消息后要更新订单状态,先准备好订单状态更新的方法

public void updateProcessStatus(String orderId , ProcessStatus processStatus, Map<String,String>... paramMaps) {OrderInfo orderInfo = new OrderInfo();orderInfo.setId(orderId);orderInfo.setOrderStatus(processStatus.getOrderStatus());orderInfo.setProcessStatus(processStatus);//动态增加需要补充更新的属性if (paramMaps != null && paramMaps.length > 0) {Map<String, String> paramMap = paramMaps[0];for (Map.Entry<String, String> entry : paramMap.entrySet()) {String properties = entry.getKey();String value = entry.getValue();try {BeanUtils.setProperty(orderInfo, properties, value);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}}}orderInfoMapper.updateByPrimaryKeySelective(orderInfo);}
复制代码

消息队列的消费端

@JmsListener(destination = "PAYMENT_RESULT_QUEUE",containerFactory = "jmsQueueListener")public void consumePaymentResult(MapMessage mapMessage) throws JMSException {String orderId = mapMessage.getString("orderId");String result = mapMessage.getString("result");if(!"success".equals(result)){orderService.updateProcessStatus( orderId , ProcessStatus.PAY_FAIL);}else{orderService.updateProcessStatus( orderId , ProcessStatus.PAID);} orderService.sendOrderResult(orderId);}
复制代码

3 订单模块发送减库存通知

订单模块除了接收到请求改变单据状态,还要发送库存系统

查看看《库存管理系统接口手册》中【减库存的消息队列消费端接口】中的描述,组织相应的消息数据进行传递。

@Transactionalpublic void sendOrderResult(String orderId){OrderInfo orderInfo = getOrderInfo(orderId);Map<String, Object> messageMap = initWareOrderMessage(orderInfo);String wareOrderJson= JSON.toJSONString(messageMap);Session session = null;try {Connection conn = activeMQUtil.getConnection();session = conn.createSession(true, Session.SESSION_TRANSACTED);Queue queue = session.createQueue("ORDER_RESULT_QUEUE");MessageProducer producer = session.createProducer(queue);TextMessage message =new ActiveMQTextMessage();message.setText(wareOrderJson);producer.send(message);updateProcessStatus(orderInfo.getId(), ProcessStatus.NOTIFIED_WARE);session.commit();producer.close();conn.close();} catch (JMSException e) {e.printStackTrace();}}
复制代码

针对接口手册中需要的消息进行组织

public Map<String,Object> initWareOrderMessage( OrderInfo orderInfo ) {//准备发送到仓库系统的订单String wareId = orderInfo.getWareId();HashMap<String, Object> hashMap = new HashMap<>();hashMap.put("orderId", orderInfo.getId());hashMap.put("consignee", orderInfo.getConsignee());hashMap.put("consigneeTel", orderInfo.getConsigneeTel());hashMap.put("orderComment", orderInfo.getOrderComment());hashMap.put("orderBody", orderInfo.getOrderSubject());hashMap.put("deliveryAddress", orderInfo.getDeliveryAddress());hashMap.put("paymentWay", "2");//1 货到付款 2 在线支付hashMap.put("wareId",wareId);List<HashMap<String, String>> details = new ArrayList<>();List<OrderDetail> orderDetailList = orderInfo.getOrderDetailList();for (OrderDetail orderDetail : orderDetailList) {HashMap<String, String> detailMap = new HashMap<>();detailMap.put("skuId", orderDetail.getSkuId());detailMap.put("skuNum", "" + orderDetail.getSkuNum());detailMap.put("skuName", orderDetail.getSkuName());details.add(detailMap);}hashMap.put("details", details);return hashMap;}
复制代码

4 消费减库存结果

给仓库系统发送减库存消息后,还要接受减库存成功或者失败的消息。

同样根据《库存管理系统接口手册》中【商品减库结果消息】的说明完成。消费该消息的消息队列监听程序。

接受到消息后主要做的工作就是更新订单状态。

@JmsListener(destination = "SKU_DEDUCT_QUEUE",containerFactory = "jmsQueueListener")public void consumeSkuDeduct(MapMessage mapMessage) throws JMSException {String orderId = mapMessage.getString("orderId");String status = mapMessage.getString("status");if("DEDUCTED".equals(status)){orderService.updateProcessStatus( orderId , ProcessStatus.WAITING_DELEVER);return ;}else{orderService.updateProcessStatus( orderId , ProcessStatus.STOCK_EXCEPTION);return ;}}
复制代码

最后一次支付完成后,所有业务全部走通应该可以在订单列表中,查看到对应的订单是待发货状态。

关键词:java培训

点赞
收藏
评论区
推荐文章
Easter79 Easter79
4年前
spring上下文的异步Event事件
在实际开发中,我们经常会需要做一件事:在完成某一个动作之后,需要另外以同步或者异步的方式去通知另外的对象去完成额外的操作,比如:当用户下单成功之后,需要发异步消息到给到邮件系统发邮件(短信)通知用户。(这里就涉及到异步消息的概念)消息队列是我们用来解决系统与系统之间异步与解耦的极佳实践工具,而在应用内部这个级别上,有时候也会需要这样的异步消息通知机制
Centos7安装RabbitMQ详细教程 - 附带软件基本解释 - CSDN博客
MQ引言什么是MQMQ:messageQueue翻译为消息队列,通过典型的生产者和消费者模型不断向消息队列中生产消息,消费者不断从队列中获取消息。因为消息的生产和消费都是一部的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现了系统之间的解耦。别名是消息中间件,通过利用高效的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系
Stella981 Stella981
4年前
Spring Boot(七):RabbitMQ 详解
一、RabbitMQ简介RabbitMQ即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。消息中间件在互联网公司的使用中越来越多,消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的
Stella981 Stella981
4年前
RabbitMQ 消息中间件搭建详解
1.RabbitMQ简介消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信。RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包
Wesley13 Wesley13
4年前
MQ 消息队列
1、场景作用削峰填谷,异步解耦。2、如何保证消息不被重复消费呢?这个问题可以换个思路,保证消息重复消费,其实是保证程序的幂等性。无论消息如何重复,程序运行的结果是一致的。比如消费消息后做数据库插入操作,为了防止消息重复消费,可以在插入前先查询一下有没有对应的数据。3、怎么保证从消息队列里拿到的数据按顺序执
专为小白打造—Kafka一篇文章从入门到入土 | 京东云技术团队
一、什么是KafkaMQ消息队列作为最常用的中间件之一,其主要特性有:解耦、异步、限流/削峰。Kafka和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka还提供了大多数消息系
MQ消息乱序问题解析与实战解决方案
作者:京东物流刘浩1.背景在分布式系统中,消息队列(MQ)是实现系统解耦、异步通信的重要工具。然而,MQ消费时出现的消息乱序问题,经常会对业务逻辑的正确执行和系统稳定性产生不良影响。本文将详细探讨MQ消息乱序问题的根源,并提供一系列在实际应用中可行的解决方