ActiveMQ接受byte消息正确姿势。

Wesley13
• 阅读 544

致那些年,被ActiveMQ浪费的青春。

ActiveMQ接受byte消息正确姿势。

网上有很多很多小朋友分享的文章,青春比较多的可以去多看看。

曾经我上传了一个病毒在一个群里,一位网友下载后电脑坏了。于是他在群里说:好东西,大家快去下载试试。真心不错。

或者国人总喜欢把一篇错误的文章转载几百次来误导下一位学习者寻求心里平衡。

网上一切文章关于ByteMessage的读取方式都是转为字符串。或者直接写文件。我就纳闷了,要是转为字符串就完事,那我闲着蛋疼发byte[]?

那么如果我传输的内容是一段byte[]呢。比如说是压缩后的json、图片等等呢?

苦寻无果,自己包装一下又怕影响效率,于是看了一下ActiveMQByteMessage的代码,贴上如下解决方案

局部代码:

        BytesMessage bm = (BytesMessage) message;
        Object[] params = null;
        //调用initializeReading方法初始化数据
        invoke(bm, "initializeReading", params);
        //读取dataIn字段内容(即实际内容)
        DataInputStream dataInputStream = (DataInputStream)PropertUtil.getFieldValue(bm, "dataIn");
        //inputstream转为byte数组。百度一下input2byte找到内容copy过来即可,不贴代码
        byte[] data = FileUtils.input2byte(dataInputStream);
        System.out.println("接收消息 BytesMessage:\t"+data);

全部代码:

@Resource
    private JmsTemplate jt ;

    public void MQTrigger() {
        SysThreadHandle.sysThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                // 接收消息
                while (true) {
                    try {
                        Message message = (Message) jt.receive();
                        // 如果是文本消息
                        if (message instanceof TextMessage) {
                            TextMessage tm = (TextMessage) message;
                            System.out.println("接受消息  textMessage:\t" + tm.getText());
                        }

                        // 如果是Map消息
                        if (message instanceof MapMessage) {
                            MapMessage mm = (MapMessage) message;
                            System.out.println("接受消息 textMessage:\t" + mm.getString("msgId"));
                        }
                        // 如果是bytes消息
                        if (message instanceof BytesMessage) {
                            BytesMessage bm = (BytesMessage) message;
                            Object[] params = null;
                            invoke(bm, "initializeReading", params);
                            DataInputStream dataInputStream = (DataInputStream) PropertUtil.getFieldValue(bm, "dataIn");
                            byte[] data = FileUtils.input2byte(dataInputStream);
                            System.out.println("接收消息 BytesMessage:\t"+data);
                        }

                        // 如果是Stream消息
                        if (message instanceof StreamMessage) {
                            StreamMessage sm = (StreamMessage) message;
                            System.out.println(sm.readString());
                            System.out.println(sm.readInt());
                        }

                        // 如果是Object消息
                        if (message instanceof ObjectMessage) {
                            ObjectMessage om = (ObjectMessage) message;
                            Object object = om.getObject();
                            System.out.println("接受消息  ObjectMessage:\t" + object);
                        }

                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
    }

    public static Object invoke(Object beanOrClass, String methodName, Object... params) {
        try {
            Class<?> targeClass = beanOrClass.getClass();
            Method[] methods = targeClass.getDeclaredMethods();
            for (Method method : methods) {
                if (methodName.equals(method.getName())) {
                    method.setAccessible(true);
                    return method.invoke(beanOrClass, params);
                }
            }
            return null;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }

    }

    /**
     * 获取字段值,支持点属性
     * 
     * @param bean
     * @param paraName
     * @return
     */
    public static Object getFieldValue(Object bean, String paraName) {
        if (StringUtil.isNullOrEmpty(bean)) {
            return null;
        }
        Field[] fs = bean.getClass().getDeclaredFields();
        for (Field f : fs) {
            try {
                if (paraName.equals(f.getName())) {
                    f.setAccessible(true);
                    return f.get(bean);
                }
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        return null;
    }

    
    @Override
    public void afterPropertiesSet() throws Exception {
        MQTrigger();
    }

思路说明:

Message有一个getBodyLength的方法,但是经过测试发现,长度少了几个。

不知道mq为何要如此封装,让我们这么难以取出来。

于是看了一下ActiveMQByteMessage源码。发现调用initializeReading方法初始化data数据。

有一个dataIn的字段,保存着消息内容,ByteMessage里面就是DataInputStream对象。至此,转为byte[]轻而易举。

Java交流群:218481849

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Irene181 Irene181
3年前
用 Python 爬取 QQ 空间说说和相册
文|某某白米饭来源:Python技术「ID:pythonall」QQ空间在2005年被腾讯开发,已经经历了15个年头,在还没有微信的年代,看网友发表的心情、心事、照片大多都在QQ空间的里。它承载了80、90后的大量青春,下面我们一起用selenium模块导出说说和相册回忆青春吧安装seleniumselenium是一
Wesley13 Wesley13
2年前
java将前端的json数组字符串转换为列表
记录下在前端通过ajax提交了一个json数组的字符串,在后端如何转换为列表。前端数据转化与请求varcontracts{id:'1',name:'yanggb合同1'},{id:'2',name:'yanggb合同2'},{id:'3',name:'yang
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Stella981 Stella981
2年前
Android蓝牙连接汽车OBD设备
//设备连接public class BluetoothConnect implements Runnable {    private static final UUID CONNECT_UUID  UUID.fromString("0000110100001000800000805F9B34FB");
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
7个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这