SpringBoot集成RocketMQ

Stella981
• 阅读 489

实战,用案例来说话

前面已经说了JMS和RocketMQ一些概念和安装,下面使用SpringBoot来亲身操作一下.

生产者的操作

  1. SpringBoot项目创建完成,引入依赖是第一步:

    org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test org.apache.rocketmq rocketmq-client 4.3.0
  2. 创建生产者是第二步,生产者必须依赖于生产组,而且需要指定nameServer

    @Component public class PayProducer {

    /**
     * 生产组,生产者必须在生产组内
     */
    private String producerGroup = "pay_group";
    
    /**
     * 端口
     */
    private String nameServer = "39.106.214.179:9876";
    
    
    private DefaultMQProducer producer;
    
    public PayProducer() {
        producer = new DefaultMQProducer(producerGroup);
        // 指定nameServer地址,多个地址之间以 ; 隔开
        producer.setNamesrvAddr(nameServer);
        start();
    }
    
    public DefaultMQProducer getProducer() {
        return producer;
    }
    
    /**
     * 对象在使用之前必须调用一次,并且只能初始化一次
     */
    public void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 一般在应用上下文,使用上下文监听器,进行关闭
     */
    public void shutdown() {
        producer.shutdown();
    }
    

    }

  3. 创建Controller进行测试发送消息,必须要指定topic,消息依赖于主题

    @RestController public class PayController {

    @Autowired
    private PayProducer payProducer;
    
    /**
     * topic,消息依赖于topic
     */
    private static final String topic = "pay_test_topic";
    
    
    @RequestMapping("/api/v1/pay_cb")
    public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        // 创建消息  主题   二级分类   消息内容好的字节数组
        Message message = new Message(topic, "taga", ("hello rocketMQ " + text).getBytes());
    
        SendResult send = payProducer.getProducer().send(message);
    
        System.out.println(send);
    
        return new HashMap<>();
    }
    

    }

  4. 采坑记录

    • 上面完成就可以启动项目了,访问之后报错了:

      MQClientException: No route info of this topic, TopicTest1 这个的原因就是Broker禁止自动创建Topic且用户没有通过手动方式创建Topic, 或者是broker与Nameserver网络不通 解决: 使用手动创建Topic,在RocketMQ控制台的主题中创建就好,最主要的是指定topic name,如下图 出现创建不了的情况往下看 如果还出现这个问题,请关闭防火墙

    SpringBoot集成RocketMQ

    • 这次说下上面可能创建不了的问题,前面说了安装开放安全组,这次就是因为rocketMQ虚拟的端口问题,需要开放10909,也就是说ECS最终开放的端口号: 8080,10911,9876,10909

    • 继续采坑

      org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout

      这个问题是阿里云服务器存在多个网卡,rocketMQ会根据当前网卡选择一个IP使用,我们需要制定一个IP: 路径是: /usr/local/software/rocketmq/distribution/target/apache-rocketmq vim ./conf/broker.conf 添加配置: brokerIP1=公网IP 重新启动: nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf & tail -f nohup.out

    • 其他问题

      https://blog.csdn.net/qq_14853889/article/details/81053145
      https://blog.csdn.net/wangmx1993328/article/details/81588217#%E5%BC%82%E5%B8%B8%E8%AF%B4%E6%98%8E
      https://www.jianshu.com/p/bfd6d849f156
      https://blog.csdn.net/wangmx1993328/article/details/81588217
      

消费者操作

  1. 在前一个项目的基础上,将公共内容提取出来,创建一个JsmConfig的类,来声明公共内容:

    public class JmsConfig {
    
        /**
         * 端口
         */
        public static final String NAME_SERVER = "39.106.214.179:9876";
    
        /**
         * topic,消息依赖于topic
         */
        public static final String TOPIC = "pay_test_topic";
    }
    
  2. 生产者内容变为

    @Component
    public class PayProducer {
    
        /**
         * 生产组,生产者必须在生产组内
         */
        private String producerGroup = "pay_producer_group";
    
        private DefaultMQProducer producer;
    
        public PayProducer() {
            producer = new DefaultMQProducer(producerGroup);
            // 指定nameServer地址,多个地址之间以 ; 隔开
            producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
            start();
        }
    
        public DefaultMQProducer getProducer() {
            return producer;
        }
    
        /**
         * 对象在使用之前必须调用一次,并且只能初始化一次
         */
        public void start() {
            try {
                this.producer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 一般在应用上下文,使用上下文监听器,进行关闭
         */
        public void shutdown() {
            producer.shutdown();
        }
    
    }
    
  3. 创建消费者

    @Component
    public class PayConsumer {
    
        private DefaultMQPushConsumer consumer;
    
        private String consumerGroup = "pay_consumer_group";
    
        public PayConsumer() throws MQClientException {
            consumer = new DefaultMQPushConsumer(consumerGroup);
            consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
            // 设置消费地点,从最后一个进行消费(其实就是消费策略)
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            // 订阅主题的哪些标签
            consumer.subscribe(JmsConfig.TOPIC, "*");
            // 注册监听器
            consumer.registerMessageListener((MessageListenerConcurrently)
                    (msgs, context) -> {
                        try {
                            // 获取Message
                            Message msg = msgs.get(0);
                            System.out.printf("%s Receive New Messages: %s %n",
                                    Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
                            String topic = msg.getTopic();
                            String body = new String(msg.getBody(), "utf-8");
                            // 标签
                            String tags = msg.getTags();
                            String keys = msg.getKeys();
                            System.out.println("topic=" + topic + ", tags=" + tags + ",keys=" + keys + ", msg=" + body);
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        } catch (UnsupportedEncodingException e) {
                            e.printStackTrace();
                            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                        }
                    });
            consumer.start();
            System.out.println("Consumer Listener");
        }
    
    }
    
  4. Controller的变化:

    @RestController
    public class PayController {
    
        @Autowired
        private PayProducer payProducer;
    
    
    
        @RequestMapping("/api/v1/pay_cb")
        public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
            // 创建消息  主题   二级分类   消息内容好的字节数组
            Message message = new Message(JmsConfig.TOPIC, "taga", ("hello rocketMQ " + text).getBytes());
    
            SendResult send = payProducer.getProducer().send(message);
    
            System.out.println(send);
    
            return new HashMap<>();
        }
    
    }
    

梳理一下整个流程,生产者存在于生产组,所以生产组很重要,创建生产者需要指定生产组.消费者同理,创建消费者也需要指定消费组. 并且二者都需要指定NameServer. 有了生产者就要发送消息,也就是Message,创建Message需要指定Topic,二级分类和消息体等信息. 那消费者如何获取呢? 无非就是绑定Topic和二级分类就可以,这就是整个流程. 中间少说了消息的存放,消息是在broker中,这个相当于仓库,所以就是生产者生产消息到Broker,Consumer从Broker中获取消息进行消费.

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Stella981 Stella981
2年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Easter79 Easter79
2年前
SpringBoot集成RocketMQ
实战,用案例来说话前面已经说了JMS和RocketMQ一些概念和安装,下面使用SpringBoot来亲身操作一下.生产者的操作1.SpringBoot项目创建完成,引入依赖是第一步:<dependency<groupIdorg.springframework.boot</groupId
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
4个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这