SpringCloud Stream生产者配置RabbitMq的动态路由键

Easter79
• 阅读 656

在写这个文章前不得不吐槽目前国内一些blog的文章,尽是些复制粘贴的文章,提到点上但没任何的深入和例子。.........

经过测试下来总结一下RabbitMQ的Exchange的特性:

1、direct

生产者可以指定路由键,消费者可以指定路由键,但不能讲路由键设置为#(全部)。

2、topic

生产者可以指定路由键,消费者可以指定路由键,也可以不指定(或者是#)。

3、fanout

生产者和消费都忽略路由键。

在现实的场景里,通常是生产者会生产多个路由键的消费,然后多个消费来消费指定路由键的消息,但通常生产者的生产代码是同一份,如何在发消息的时候动态指定当前消息的路由键呢?

例子:门店平台系统集中处理多个门店的数据,然后分别将不同门店的数据发送到不同的门店(即:A门店只消费属于A门店的消息,B门店只消费属于B的消息)

看例子:

application.yml

 1 spring:    
 2     cloud:
 3         stream:
 4             # 设置默认的binder
 5             default-binder: pos
 6             binders:
 7                 scm:
 8                     type: rabbit
 9                     environment:
10                         spring:
11                           rabbitmq:
12                             # 连接到scm的host和exchange
13                             virtual-host: scm
14             pos:
15                 type: rabbit
16                 environment:
17                     spring:
18                         rabbitmq:
19                             # 连接到pos的host和exchange
20                             virtual-host: pos
21                     
22             shop:
23                 type: rabbit
24                   environment:
25                     spring:
26                       rabbitmq:
27                         # 连接到shop的host和exchange
28                         virtual-host: shop
29             
30           bindings:
31             # ---------消息消费------------
32 
33             # 集单开始生产消费
34             order_set_start_produce_input:
35               binder: pos
36               destination: POS_ORDER_SET_STRAT_PRODUCE
37               group: pos_group
38               
39             # 门店ID为1的消费者    
40             shop_consumer_input_1:
41                 binder:    shop
42                 destination: POS_ORDER_SET_STRAT_PRODUCE
43                 group: shop_1_group
44      
45               
46             #-----------消息生产-----------
47             # 集单开始生产通知生产
48             order_set_start_produce_output:
49               binder: pos
50               destination: POS_ORDER_SET_STRAT_PRODUCE
51             
52           rabbit:
53             bindings:
54               # 集单开始生产消费者
55               order_set_start_produce_input:
56                 consumer:
57                   exchangeType: topic
58                   autoBindDlq: true
59                   republishToDlq: true
60                   deadLetterExchange: POS_ORDER_SET_STRAT_PRODUCE_POS_DLX
61                   #bindingRoutingKey: '#'
62               # 门店1的消费者
63               shop_consumer_input_1:
64                 consumer:
65                   exchangeType: topic
66                   autoBindDlq: true
67                   republishToDlq: true
68                   deadLetterExchange: POS_ORDER_SET_STRAT_PRODUCE_SHOP_1_DLX
69                   bindingRoutingKey: 1
70                   deadLetterRoutingKey: 1
71 
72               # 生产者配置              
73               order_set_start_produce_output:
74                 producer:
75                   exchangeType: topic
76                   routingKeyExpression: headers.shopId
77                   # routingKeyExpression: headers['shopId']

上面的配置文件配置了一个动态的基于shopId做路由的生产者配置,一个消费全部路由键的消费者,如果要配置指定路由键的可以在配置文件里设置bindingRoutingKey属性的值。

生产者java代码:

import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.longge.pos.production.mq.dto.OrderSetProductionMsg;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MqSendUtil {
    private static MessageChannel orderSetStartProduceChannel;

    public static void setSfOrderCreateChannel(MessageChannel channel) {
        sfOrderCreateProduceChannel = channel;
    }
    
    public static void sendOrderSetPrintMsg(OrderSetProductionMsg msg) {
        // add kv pair - routingkeyexpression (which matches 'type') will then evaluate
        // and add the value as routing key
        log.info("发送开始生产的MQ:{}", JSONObject.toJSONString(msg));
        orderSetStartProduceChannel.send(MessageBuilder.withPayload(JSON.toJSONString(msg)).setHeader("shopId", msg.getOrderSet().getShopId()).build());
        //orderSetStartProduceChannel.send(MessageBuilder.withPayload(JSON.toJSONString(msg)).build());
    }
}

动态路由的核心在于上面那个红色的字体的地方,这个是和配置文件里的  routingKeyExpression 的配置是匹配的。

SpringCloud Stream生产者配置RabbitMq的动态路由键

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Karen110 Karen110
2年前
一篇文章带你了解JavaScript日期
日期对象允许您使用日期(年、月、日、小时、分钟、秒和毫秒)。一、JavaScript的日期格式一个JavaScript日期可以写为一个字符串:ThuFeb02201909:59:51GMT0800(中国标准时间)或者是一个数字:1486000791164写数字的日期,指定的毫秒数自1970年1月1日00:00:00到现在。1\.显示日期使用
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Stella981 Stella981
2年前
MQ对比之RabbitMQ & Redis
消息队列选择:RabbitMQ&RedisRabbitMQRabbitMQ是一个由erlang开发的AMQP(AdvancedMessageQueue)的开源实现的产品,RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Stella981 Stella981
2年前
Docker 部署SpringBoot项目不香吗?
  公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为★“星标”!这样才不会错过每日进阶架构文章呀。  !(http://dingyue.ws.126.net/2020/0920/b00fbfc7j00qgy5xy002kd200qo00hsg00it00cj.jpg)  2
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
Easter79
Easter79
Lv1
今生可爱与温柔,每一样都不能少。
文章
2.8k
粉丝
5
获赞
1.2k