SpringBoot整合RocketMQ
one piece 29 0

1.maven依赖

  <!-- RocketMQ Spring Boot starter dependency -->
  <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.0</version>
        </dependency>

2.配置文件yml

# RocketMQ client settings
rocketmq:
  # NameServer address.
  # NOTE(review): the commonly documented form is plain host:port (127.0.0.1:9876);
  # confirm that the http:// prefix is accepted by the client version in use.
  name-server: http://127.0.0.1:9876
  producer:
    # Producer group name
    group: GROUP

3.顺序发送消息

    // RocketMQ template injected by Spring; the send methods below go through it.
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends an ordered message carrying the given tag and business key.
     *
     * <p>RocketMQ-Spring takes the message tag from the destination string
     * ({@code "topic:tag"}); a {@code RocketMQHeaders.TAGS} header set on an
     * outgoing message is not applied as the tag, so the tag is appended to the
     * destination here.
     *
     * <p>When {@code key} is non-empty it is stored as the message key and also
     * used as the hash key for ordered sending, so messages sharing a key are
     * routed to the same queue. Without a key there is nothing to shard on and a
     * plain synchronous send is performed instead.
     *
     * @param topic   destination topic
     * @param message message payload
     * @param tag     message tag; {@code null} or empty sends without a tag
     * @param key     business key; {@code null} or empty disables ordered routing
     */
    public void send(String topic, String message, String tag, String key) {
        boolean hasTag = tag != null && !tag.isEmpty();
        boolean hasKey = key != null && !key.isEmpty();
        String destination = hasTag ? topic + ":" + tag : topic;

        if (hasKey) {
            // Same hash key -> same queue, which is what preserves ordering.
            rocketMQTemplate.syncSendOrderly(destination,
                    MessageBuilder.withPayload(message)
                            .setHeader(RocketMQHeaders.KEYS, key)
                            .build(),
                    key);
        } else {
            rocketMQTemplate.send(destination,
                    MessageBuilder.withPayload(message).build());
        }
    }

4.异步发送消息

 /**
     * Sends a message asynchronously and logs the outcome from the send callback.
     *
     * @param topic destination topic
     * @param msg   message payload
     */
    public void sendASyncMsg(String topic, String msg) {
        rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // Success callback
                log.info("异步发送消息成功:{}", JSON.toJSONString(sendResult));
            }

            @Override
            public void onException(Throwable e) {
                // Failure callback: log at error level and pass the throwable
                // as the last argument so the stack trace is not lost.
                log.error("异步发送消息失败:{}", e.getMessage(), e);
            }
        });
    }

5.延时消息

     * 发送异步延迟消息
     * 消息内容为json格式
     */
    public void sendAsyncMsgByJsonDelay(String topic, String json,int level) throws JsonProcessingException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
        Message message = new Message(topic, json.getBytes(Charset.forName("utf-8")));
        //设置延迟等级 0不延时从1开始分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 
        message.setDelayTimeLevel(level);
        //发送异步消息
        this.rocketMQTemplate.getProducer().send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println(throwable.getMessage());
            }
        });
    }

6.分组消费者

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * Consumer subscribed to {@code test-topic} in consumer group {@code test_group}.
 * Each received message is written to the log.
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "test_group")
public class TestConsumer implements RocketMQListener<String> {
    /**
     * Handles one received message.
     *
     * @param message message body as a string
     */
    @Override
    public void onMessage(String message) {
        // Parameterized logging instead of string concatenation.
        log.info("received message: {}", message);
    }
}

7.tag消费:同一消费组内所有消费者的订阅关系(topic 与 tag)必须一致,因此订阅不同 tag 需使用不同的消费组。第一个tag消费者

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * tag消费消息
 * @author 
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "group1",selectorExpression ="tag1",selectorType = SelectorType.TAG)
public class TagOneConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("received message: " + message);
    }

}

第二个tag消费者

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * tag消费消息
 * @author 
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "group2",selectorExpression ="tag2",selectorType = SelectorType.TAG)
public class TagTwoConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("received message: " + message);
    }
}

8.消费多个tag消息,用||分隔

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * tag消费消息
 * @author 
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "group",selectorExpression ="tag1||tag2",selectorType = SelectorType.TAG)
public class TagConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("received message: " + message);
    }
}
评论区